server: Revert to closing contexts on Shutdown

This commit is contained in:
Adnan Maolood 2021-02-22 21:13:42 -05:00
parent c5b304216c
commit 35f7958083
2 changed files with 91 additions and 79 deletions

View File

@ -17,10 +17,6 @@ var (
// when the response status code does not permit a body. // when the response status code does not permit a body.
ErrBodyNotAllowed = errors.New("gemini: response status code does not allow body") ErrBodyNotAllowed = errors.New("gemini: response status code does not allow body")
// ErrServerClosed is returned by the Server's Serve and ListenAndServe
// methods after a call to Shutdown or Close.
ErrServerClosed = errors.New("gemini: server closed")
// ErrHandlerTimeout is returned on ResponseWriter Write calls // ErrHandlerTimeout is returned on ResponseWriter Write calls
// in handlers which have timed out. // in handlers which have timed out.
ErrHandlerTimeout = errors.New("gemini: Handler timeout") ErrHandlerTimeout = errors.New("gemini: Handler timeout")

160
server.go
View File

@ -45,10 +45,11 @@ type Server struct {
// If nil, logging is done via the log package's standard logger. // If nil, logging is done via the log package's standard logger.
ErrorLog *log.Logger ErrorLog *log.Logger
listeners map[*net.Listener]struct{} listeners map[*net.Listener]context.CancelFunc
conns map[*net.Conn]struct{} conns map[*net.Conn]context.CancelFunc
closedChan chan struct{} // closed when the server is closed closed bool // true if Closed or Shutdown called
doneChan chan struct{} // closed when no more connections are open shutdown bool // true if Shutdown called
doneChan chan struct{}
mu sync.Mutex mu sync.Mutex
} }
@ -58,18 +59,10 @@ const (
serverClosed serverClosed
) )
// closed returns a channel that's closed when the server is closed. func (srv *Server) isClosed() bool {
func (srv *Server) closed() chan struct{} {
srv.mu.Lock() srv.mu.Lock()
defer srv.mu.Unlock() defer srv.mu.Unlock()
return srv.closedLocked() return srv.closed
}
func (srv *Server) closedLocked() chan struct{} {
if srv.closedChan == nil {
srv.closedChan = make(chan struct{})
}
return srv.closedChan
} }
// done returns a channel that's closed when the server is closed and // done returns a channel that's closed when the server is closed and
@ -87,14 +80,16 @@ func (srv *Server) doneLocked() chan struct{} {
return srv.doneChan return srv.doneChan
} }
// tryFinishShutdown closes srv.done() if the server is closed and // tryCloseDone closes srv.done() if the server is closed and
// there are no active listeners or connections. // there are no active listeners or connections.
func (srv *Server) tryFinishShutdown() { func (srv *Server) tryCloseDone() {
srv.mu.Lock() srv.mu.Lock()
defer srv.mu.Unlock() defer srv.mu.Unlock()
select { srv.tryCloseDoneLocked()
case <-srv.closedLocked(): }
default:
func (srv *Server) tryCloseDoneLocked() {
if !srv.closed {
return return
} }
if len(srv.listeners) == 0 && len(srv.conns) == 0 { if len(srv.listeners) == 0 && len(srv.conns) == 0 {
@ -107,23 +102,27 @@ func (srv *Server) tryFinishShutdown() {
} }
} }
// Close immediately closes all active net.Listeners and connections. // Close immediately closes all active net.Listeners and connections
// by cancelling their contexts.
// For a graceful shutdown, use Shutdown. // For a graceful shutdown, use Shutdown.
func (srv *Server) Close() error { func (srv *Server) Close() error {
ch := srv.closed()
select {
case <-ch:
return nil
default:
close(ch)
}
srv.tryFinishShutdown()
// Force all active connections to close.
srv.mu.Lock() srv.mu.Lock()
for conn := range srv.conns { {
(*conn).Close() if srv.closed {
srv.mu.Unlock()
return nil
}
srv.closed = true
srv.tryCloseDoneLocked()
// Close all active connections and listeners.
for _, cancel := range srv.listeners {
cancel()
}
for _, cancel := range srv.conns {
cancel()
}
} }
srv.mu.Unlock() srv.mu.Unlock()
@ -134,28 +133,34 @@ func (srv *Server) Close() error {
} }
// Shutdown gracefully shuts down the server without interrupting any // Shutdown gracefully shuts down the server without interrupting any
// active connections. Shutdown works by first closing all open // active connections. Shutdown works by first cancelling the contexts
// listeners and then waiting indefinitely for connections // of all open listeners and then waiting indefinitely for connections
// to close and then shut down. // to close.
// If the provided context expires before the shutdown is complete, // If the provided context expires before the shutdown is complete,
// Shutdown returns the context's error. // Shutdown returns the context's error.
// //
// When Shutdown is called, Serve and ListenAndServe immediately // When Shutdown is called, Serve and ListenAndServe immediately
// return ErrServerClosed. Make sure the program doesn't exit and // return an error. Make sure the program doesn't exit and waits instead for
// waits instead for Shutdown to return. // Shutdown to return.
// //
// Once Shutdown has been called on a server, it may not be reused; // Once Shutdown has been called on a server, it may not be reused;
// future calls to methods such as Serve will return ErrServerClosed. // future calls to methods such as Serve will return ErrServerClosed.
func (srv *Server) Shutdown(ctx context.Context) error { func (srv *Server) Shutdown(ctx context.Context) error {
ch := srv.closed() srv.mu.Lock()
select { {
case <-ch: if srv.closed {
srv.mu.Unlock()
return nil return nil
default:
close(ch)
} }
srv.closed = true
srv.shutdown = true
srv.tryFinishShutdown() // Close all active listeners.
for _, cancel := range srv.listeners {
cancel()
}
}
srv.mu.Unlock()
// Wait for active connections to finish. // Wait for active connections to finish.
select { select {
@ -172,13 +177,13 @@ func (srv *Server) Shutdown(ctx context.Context) error {
// //
// If srv.Addr is blank, ":1965" is used. // If srv.Addr is blank, ":1965" is used.
// //
// ListenAndServe always returns a non-nil error. After Shutdown or Close, the // ListenAndServe always returns a non-nil error.
// returned error is ErrServerClosed.
func (srv *Server) ListenAndServe(ctx context.Context) error { func (srv *Server) ListenAndServe(ctx context.Context) error {
select { if srv.isClosed() {
case <-srv.closed(): // Cancel context
return ErrServerClosed ctx, cancel := context.WithCancel(ctx)
default: cancel()
return ctx.Err()
} }
addr := srv.Addr addr := srv.Addr
@ -206,18 +211,16 @@ func (srv *Server) getCertificate(h *tls.ClientHelloInfo) (*tls.Certificate, err
return srv.GetCertificate(h.ServerName) return srv.GetCertificate(h.ServerName)
} }
func (srv *Server) trackListener(l *net.Listener) bool { func (srv *Server) trackListener(l *net.Listener, cancel context.CancelFunc) bool {
srv.mu.Lock() srv.mu.Lock()
defer srv.mu.Unlock() defer srv.mu.Unlock()
select { if srv.closed {
case <-srv.closedLocked():
return false return false
default:
} }
if srv.listeners == nil { if srv.listeners == nil {
srv.listeners = make(map[*net.Listener]struct{}) srv.listeners = make(map[*net.Listener]context.CancelFunc)
} }
srv.listeners[l] = struct{}{} srv.listeners[l] = cancel
return true return true
} }
@ -231,15 +234,19 @@ func (srv *Server) deleteListener(l *net.Listener) {
// service goroutine for each. The service goroutines read requests and // service goroutine for each. The service goroutines read requests and
// then calls the appropriate Handler to reply to them. // then calls the appropriate Handler to reply to them.
// //
// Serve always returns a non-nil error and closes l. After Shutdown or Close, // Serve always returns a non-nil error and closes l.
// the returned error is ErrServerClosed.
func (srv *Server) Serve(ctx context.Context, l net.Listener) error { func (srv *Server) Serve(ctx context.Context, l net.Listener) error {
defer l.Close() defer l.Close()
if !srv.trackListener(&l) { lnctx, cancel := context.WithCancel(ctx)
return ErrServerClosed defer cancel()
if !srv.trackListener(&l, cancel) {
// Cancel context
cancel()
return lnctx.Err()
} }
defer srv.tryFinishShutdown() defer srv.tryCloseDone()
defer srv.deleteListener(&l) defer srv.deleteListener(&l)
errch := make(chan error, 1) errch := make(chan error, 1)
@ -248,12 +255,10 @@ func (srv *Server) Serve(ctx context.Context, l net.Listener) error {
}() }()
select { select {
case <-ctx.Done(): case <-lnctx.Done():
return ctx.Err() return lnctx.Err()
case err := <-errch: case err := <-errch:
return err return err
case <-srv.closed():
return ErrServerClosed
} }
} }
@ -283,13 +288,17 @@ func (srv *Server) serve(ctx context.Context, l net.Listener) error {
} }
} }
func (srv *Server) trackConn(conn *net.Conn) { func (srv *Server) trackConn(conn *net.Conn, cancel context.CancelFunc) bool {
srv.mu.Lock() srv.mu.Lock()
defer srv.mu.Unlock() defer srv.mu.Unlock()
if srv.conns == nil { if srv.closed && !srv.shutdown {
srv.conns = make(map[*net.Conn]struct{}) return false
} }
srv.conns[conn] = struct{}{} if srv.conns == nil {
srv.conns = make(map[*net.Conn]context.CancelFunc)
}
srv.conns[conn] = cancel
return true
} }
func (srv *Server) deleteConn(conn *net.Conn) { func (srv *Server) deleteConn(conn *net.Conn) {
@ -300,12 +309,19 @@ func (srv *Server) deleteConn(conn *net.Conn) {
// ServeConn serves a Gemini response over the provided connection. // ServeConn serves a Gemini response over the provided connection.
// It closes the connection when the response has been completed. // It closes the connection when the response has been completed.
// ServeConn can be used even after Shutdown or Close have been called. // Note that ServeConn will succeed even if a call to Shutdown is ongoing.
func (srv *Server) ServeConn(ctx context.Context, conn net.Conn) error { func (srv *Server) ServeConn(ctx context.Context, conn net.Conn) error {
defer conn.Close() defer conn.Close()
srv.trackConn(&conn) ctx, cancel := context.WithCancel(ctx)
defer srv.tryFinishShutdown() defer cancel()
if !srv.trackConn(&conn, cancel) {
// Cancel context
cancel()
return ctx.Err()
}
defer srv.tryCloseDone()
defer srv.deleteConn(&conn) defer srv.deleteConn(&conn)
if d := srv.ReadTimeout; d != 0 { if d := srv.ReadTimeout; d != 0 {