Skip to content

Commit

Permalink
Implement open connection counting in test server
Browse files Browse the repository at this point in the history
  • Loading branch information
perk11 committed Feb 8, 2025
1 parent 5ca1061 commit 27cfd8c
Showing 1 changed file with 101 additions and 33 deletions.
134 changes: 101 additions & 33 deletions test-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import (
"net"
"net/http"
"os"
"sync"
"time"
)

var appStarted = false

var mainPortConnections = 0

func main() {
port := flag.String("p", "", "Main port to listen on")
healthCheckApiPort := flag.String("healthcheck-port", "", "Healthcheck API port to listen on. If not specified, healthcheck API is disabled")
Expand All @@ -38,70 +41,135 @@ func main() {
}

type HealthcheckResponse struct {
Message string `json:"message"`
Status int `json:"status"`
Message string `json:"message"`
MainPortConnections int `json:"main_port_connections"`
Status int `json:"status"`
}

func listenOnMainPort(port *string, sleepDuration *time.Duration, startupDuration *time.Duration, requestProcessingDuration *time.Duration) {
func listenOnMainPort(port *string,
sleepDuration, startupDuration, requestProcessingDuration *time.Duration) {

// Simulate pre-listening work
time.Sleep(*sleepDuration)

// Mark app as started after startupDuration
time.AfterFunc(*startupDuration, func() {
appStarted = true
})

listener, err := net.Listen("tcp", ":"+*port)
if err != nil {
fmt.Println("Error listening:", err.Error())
os.Exit(1)
}
defer func(listener net.Listener) {
err := listener.Close()
if err != nil {
fmt.Println("Failed to stop listening: ", err.Error())
defer func() {
if err := listener.Close(); err != nil {
fmt.Println("Failed to stop listening:", err.Error())
}
}(listener)
}()
fmt.Printf("Listening on port %s\n", *port)

// Accept connections in a loop
for {
conn, err := listener.Accept()
if err != nil {
fmt.Println("Error accepting: ", err.Error())
os.Exit(1)
// Log and move on rather than exiting
fmt.Println("Error accepting connection:", err.Error())
continue
}
fmt.Println("Connection received.")
var contentToWriteToSocket string
if appStarted {
if requestProcessingDuration.Nanoseconds() > 0 {
fmt.Printf("Sleeping for %s before returning pid\n", requestProcessingDuration)
time.Sleep(*requestProcessingDuration)
go handleMainPortConnection(conn, requestProcessingDuration)
}
}

func handleMainPortConnection(conn net.Conn, requestProcessingDuration *time.Duration) {
mainPortConnections++
fmt.Println("Connection received on main port.")

// We'll track if we've already decremented to avoid double decrementConnections.
var once sync.Once
decrementConnections := func() {
once.Do(func() {
mainPortConnections--
})
}

// Channel to signal that the server is closing the connection normally
srvClosed := make(chan struct{})

// Channel to signal that the client is detected to have disconnected early
clientClosed := make(chan struct{})

// Goroutine to detect client closure or read errors
go func() {
buf := make([]byte, 1)
for {
if _, err := conn.Read(buf); err != nil {
// Check if the server has already signaled a normal close
select {
case <-srvClosed:
// This means the server intentionally closed the connection first,
// so it's a normal close — do not log an error or treat it as early disconnect
return
default:
// Otherwise, the client likely closed early or a read error occurred
fmt.Println("Client disconnected early or read error:", err.Error())
decrementConnections()
close(clientClosed)
return
}
}
fmt.Println("Responding with pid")
pid := os.Getpid()
contentToWriteToSocket = fmt.Sprintf("%d", pid)
} else {
fmt.Println("Server was still starting, responding with error")
contentToWriteToSocket = fmt.Sprintf("Error, server still starting")
}
_, writeErr := conn.Write([]byte(contentToWriteToSocket))
if writeErr != nil {
fmt.Println("Error writing to connection: ", writeErr.Error())
}
}()

connectionCloseErr := conn.Close()
if connectionCloseErr != nil {
fmt.Println("Error closing connection: ", connectionCloseErr.Error())
var content string
if appStarted {
if requestProcessingDuration != nil && requestProcessingDuration.Nanoseconds() > 0 {
fmt.Printf("Sleeping for %s before returning pid\n", *requestProcessingDuration)
time.Sleep(*requestProcessingDuration)
}
fmt.Println("Responding with pid")
content = fmt.Sprintf("%d", os.Getpid())
} else {
fmt.Println("Server still starting, responding with error")
content = "Error, server still starting"
}

// Check if the client has already closed (before writing)
select {
case <-clientClosed:
// Client is gone; `mainPortConnections` is already decremented
_ = conn.Close()
return
default:
// Client still here, proceed
}

if _, err := conn.Write([]byte(content)); err != nil {
fmt.Println("Error writing to connection:", err.Error())
}

// Signal that the server is about to close the connection normally
close(srvClosed)
if err := conn.Close(); err != nil {
fmt.Println("Error closing connection:", err.Error())
}

decrementConnections()
}

func healthCheckHandler(responseWriter http.ResponseWriter, _ *http.Request) {
var response HealthcheckResponse
if appStarted {
response = HealthcheckResponse{
Message: "ok",
Status: 200,
Message: "ok",
MainPortConnections: mainPortConnections,
Status: 200,
}
} else {
response = HealthcheckResponse{
Message: "server_starting",
Status: 503,
Message: "server_starting",
MainPortConnections: mainPortConnections,
Status: 503,
}
}

Expand Down

0 comments on commit 27cfd8c

Please sign in to comment.