From 27cfd8c9bd772c40c8e47a35baf3896c0fbb0884 Mon Sep 17 00:00:00 2001 From: Konstantin Pereiaslov Date: Sat, 8 Feb 2025 03:28:49 -0600 Subject: [PATCH] Implement open connection counting in test server --- test-server/main.go | 134 +++++++++++++++++++++++++++++++++----------- 1 file changed, 101 insertions(+), 33 deletions(-) diff --git a/test-server/main.go b/test-server/main.go index 1c72901..0d0aa1d 100644 --- a/test-server/main.go +++ b/test-server/main.go @@ -8,11 +8,14 @@ import ( "net" "net/http" "os" + "sync" "time" ) var appStarted = false +var mainPortConnections = 0 + func main() { port := flag.String("p", "", "Main port to listen on") healthCheckApiPort := flag.String("healthcheck-port", "", "Healthcheck API port to listen on. If not specified, healthcheck API is disabled") @@ -38,70 +41,135 @@ func main() { } type HealthcheckResponse struct { - Message string `json:"message"` - Status int `json:"status"` + Message string `json:"message"` + MainPortConnections int `json:"main_port_connections"` + Status int `json:"status"` } -func listenOnMainPort(port *string, sleepDuration *time.Duration, startupDuration *time.Duration, requestProcessingDuration *time.Duration) { +func listenOnMainPort(port *string, + sleepDuration, startupDuration, requestProcessingDuration *time.Duration) { + + // Simulate pre-listening work time.Sleep(*sleepDuration) + + // Mark app as started after startupDuration time.AfterFunc(*startupDuration, func() { appStarted = true }) + listener, err := net.Listen("tcp", ":"+*port) if err != nil { fmt.Println("Error listening:", err.Error()) os.Exit(1) } - defer func(listener net.Listener) { - err := listener.Close() - if err != nil { - fmt.Println("Failed to stop listening: ", err.Error()) + defer func() { + if err := listener.Close(); err != nil { + fmt.Println("Failed to stop listening:", err.Error()) } - }(listener) + }() fmt.Printf("Listening on port %s\n", *port) + + // Accept connections in a loop for { conn, err := listener.Accept() if err != nil { - fmt.Println("Error accepting: ", err.Error()) - os.Exit(1) + // Log and move on rather than exiting + fmt.Println("Error accepting connection:", err.Error()) + continue } - fmt.Println("Connection received.") - var contentToWriteToSocket string - if appStarted { - if requestProcessingDuration.Nanoseconds() > 0 { - fmt.Printf("Sleeping for %s before returning pid\n", requestProcessingDuration) - time.Sleep(*requestProcessingDuration) + go handleMainPortConnection(conn, requestProcessingDuration) + } +} + +func handleMainPortConnection(conn net.Conn, requestProcessingDuration *time.Duration) { + mainPortConnections++ + fmt.Println("Connection received on main port.") + + // We'll track if we've already decremented to avoid double decrementConnections. + var once sync.Once + decrementConnections := func() { + once.Do(func() { + mainPortConnections-- + }) + } + + // Channel to signal that the server is closing the connection normally + srvClosed := make(chan struct{}) + + // Channel to signal that the client is detected to have disconnected early + clientClosed := make(chan struct{}) + + // Goroutine to detect client closure or read errors + go func() { + buf := make([]byte, 1) + for { + if _, err := conn.Read(buf); err != nil { + // Check if the server has already signaled a normal close + select { + case <-srvClosed: + // This means the server intentionally closed the connection first, + // so it's a normal close — do not log an error or treat it as early disconnect + return + default: + // Otherwise, the client likely closed early or a read error occurred + fmt.Println("Client disconnected early or read error:", err.Error()) + decrementConnections() + close(clientClosed) + return + } } - fmt.Println("Responding with pid") - pid := os.Getpid() - contentToWriteToSocket = fmt.Sprintf("%d", pid) - } else { - fmt.Println("Server was still starting, responding with error") - contentToWriteToSocket = fmt.Sprintf("Error, server still starting") - } - _, writeErr := conn.Write([]byte(contentToWriteToSocket)) - if writeErr != nil { - fmt.Println("Error writing to connection: ", writeErr.Error()) } + }() - connectionCloseErr := conn.Close() - if connectionCloseErr != nil { - fmt.Println("Error closing connection: ", connectionCloseErr.Error()) + var content string + if appStarted { + if requestProcessingDuration != nil && requestProcessingDuration.Nanoseconds() > 0 { + fmt.Printf("Sleeping for %s before returning pid\n", *requestProcessingDuration) + time.Sleep(*requestProcessingDuration) } + fmt.Println("Responding with pid") + content = fmt.Sprintf("%d", os.Getpid()) + } else { + fmt.Println("Server still starting, responding with error") + content = "Error, server still starting" + } + + // Check if the client has already closed (before writing) + select { + case <-clientClosed: + // Client is gone; `mainPortConnections` is already decremented + _ = conn.Close() + return + default: + // Client still here, proceed } + + if _, err := conn.Write([]byte(content)); err != nil { + fmt.Println("Error writing to connection:", err.Error()) + } + + // Signal that the server is about to close the connection normally + close(srvClosed) + if err := conn.Close(); err != nil { + fmt.Println("Error closing connection:", err.Error()) + } + + decrementConnections() } func healthCheckHandler(responseWriter http.ResponseWriter, _ *http.Request) { var response HealthcheckResponse if appStarted { response = HealthcheckResponse{ - Message: "ok", - Status: 200, + Message: "ok", + MainPortConnections: mainPortConnections, + Status: 200, } } else { response = HealthcheckResponse{ - Message: "server_starting", - Status: 503, + Message: "server_starting", + MainPortConnections: mainPortConnections, + Status: 503, } }