diff --git a/client_test.go b/client_test.go index 3a493f1..892ddda 100644 --- a/client_test.go +++ b/client_test.go @@ -62,6 +62,13 @@ func setupListener(t *testing.T) (*net.UDPConn, chan []byte) { return inSocket, received } +func TestWrongAddress(t *testing.T) { + client := NewClient("BOOM:BOOM") + if err := client.Close(); err != nil { + t.Errorf("error from close: %v", err) + } +} + func TestCommands(t *testing.T) { inSocket, received := setupListener(t) diff --git a/loops.go b/loops.go index f4f4f8d..80d692e 100644 --- a/loops.go +++ b/loops.go @@ -25,6 +25,7 @@ SOFTWARE. */ import ( + "context" "net" "sync/atomic" "time" @@ -69,6 +70,8 @@ func (t *transport) sendLoop(addr string, reconnectInterval, retryTimeout time.D reconnectC <-chan time.Time ) + defer t.shutdownWg.Done() + if reconnectInterval > 0 { reconnectTicker := time.NewTicker(reconnectInterval) defer reconnectTicker.Stop() @@ -77,7 +80,23 @@ func (t *transport) sendLoop(addr string, reconnectInterval, retryTimeout time.D RECONNECT: // Attempt to connect - sock, err = net.Dial("udp", addr) + sock, err = func() (net.Conn, error) { + // Dial with context which is aborted when client is shut down + ctx, ctxCancel := context.WithCancel(context.Background()) + defer ctxCancel() + + go func() { + select { + case <-t.shutdown: + ctxCancel() + case <-ctx.Done(): + } + }() + + var d net.Dialer + return d.DialContext(ctx, "udp", addr) + }() + if err != nil { log.Printf("[STATSD] Error connecting to server: %s", err) goto WAIT @@ -89,7 +108,6 @@ RECONNECT: // Get a buffer from the queue if !ok { _ = sock.Close() // nolint: gosec - t.shutdownWg.Done() return } @@ -117,8 +135,15 @@ RECONNECT: WAIT: // Wait for a while - time.Sleep(retryTimeout) - goto RECONNECT + select { + case <-time.After(retryTimeout): + goto RECONNECT + case <-t.shutdown: + } + + // drain send queue waiting for flush loops to terminate + for range t.sendQueue { + } } // reportLoop reports periodically number of packets lost