From e2d95e25a1df289e9c0a1f91f927edd58aa61abf Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Mon, 22 Apr 2024 10:12:21 +0200 Subject: [PATCH 01/25] try to close on existing connection, also reject Signed-off-by: Kristoffer Dalby --- hscontrol/noise.go | 9 +++++++-- hscontrol/poll.go | 41 +++++++++++++++++++++-------------------- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/hscontrol/noise.go b/hscontrol/noise.go index 7fcbc252a0..b656dbd3f5 100644 --- a/hscontrol/noise.go +++ b/hscontrol/noise.go @@ -241,7 +241,7 @@ func (ns *noiseServer) NoisePollNetMapHandler( sess.tracef("aquiring lock to check stream") ns.headscale.mapSessionMu.Lock() - if _, ok := ns.headscale.mapSessions[node.ID]; ok { + if oldSession, ok := ns.headscale.mapSessions[node.ID]; ok { // NOTE/TODO(kradalby): From how I understand the protocol, when // a client connects with stream=true, and already has a streaming // connection open, the correct way is to close the current channel @@ -266,7 +266,12 @@ func (ns *noiseServer) NoisePollNetMapHandler( defer ns.headscale.mapSessionMu.Unlock() - sess.infof("node has an open stream(%p), rejecting new stream", sess) + go func() { + oldSession.infof("mapSession (%p) is open, trying to close stream and replace with %p", oldSession, sess) + oldSession.close() + }() + + sess.infof("mapSession (%p) has an open stream, rejecting new stream", sess) mapResponseRejected.WithLabelValues("exists").Inc() return } diff --git a/hscontrol/poll.go b/hscontrol/poll.go index e3137cc6ad..7df4f9e8b0 100644 --- a/hscontrol/poll.go +++ b/hscontrol/poll.go @@ -41,11 +41,11 @@ type mapSession struct { capVer tailcfg.CapabilityVersion mapper *mapper.Mapper - serving bool - servingMu sync.Mutex + cancelChMu sync.Mutex - ch chan types.StateUpdate - cancelCh chan struct{} + ch chan types.StateUpdate + cancelCh chan struct{} + cancelChOpen bool keepAliveTicker *time.Ticker @@ -86,11 +86,9 @@ func (h *Headscale) newMapSession( capVer: req.Version, mapper: 
h.mapper, - // serving indicates if a client is being served. - serving: false, - - ch: updateChan, - cancelCh: make(chan struct{}), + ch: updateChan, + cancelCh: make(chan struct{}), + cancelChOpen: true, keepAliveTicker: time.NewTicker(keepAliveInterval + (time.Duration(rand.IntN(9000)) * time.Millisecond)), @@ -103,15 +101,20 @@ func (h *Headscale) newMapSession( } func (m *mapSession) close() { - m.servingMu.Lock() - defer m.servingMu.Unlock() - if !m.serving { + m.cancelChMu.Lock() + defer m.cancelChMu.Unlock() + + if !m.cancelChOpen { return } - m.tracef("mapSession (%p) sending message on cancel chan") - m.cancelCh <- struct{}{} - m.tracef("mapSession (%p) sent message on cancel chan") + m.tracef("mapSession (%p) sending message on cancel chan", m) + select { + case m.cancelCh <- struct{}{}: + m.tracef("mapSession (%p) sent message on cancel chan", m) + case <-time.After(30 * time.Second): + m.tracef("mapSession (%p) timed out sending close message", m) + } } func (m *mapSession) isStreaming() bool { @@ -145,15 +148,13 @@ func (m *mapSession) serve() { defer m.h.nodeNotifier.RemoveNode(m.node.ID) defer func() { - m.servingMu.Lock() - defer m.servingMu.Unlock() + m.cancelChMu.Lock() + defer m.cancelChMu.Unlock() - m.serving = false + m.cancelChOpen = false close(m.cancelCh) }() - m.serving = true - m.h.nodeNotifier.AddNode(m.node.ID, m.ch) m.h.updateNodeOnlineStatus(true, m.node) From 5b132b66b7a4bc74a67a42f2e3c5b1ba0b24d04c Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Sat, 4 May 2024 09:30:57 +0200 Subject: [PATCH 02/25] expand notifier and mapresp metrics add gauge metrics for waiting for notifier lock add gauge metrics for waiting for notifier batcher lock add gauge metrics for current pending updates add metric and count to String method count mapresps ended add metric for tracking mapresp.close returns add timeout to sendall Signed-off-by: Kristoffer Dalby --- hscontrol/metrics.go | 10 ++++++ hscontrol/notifier/metrics.go | 20 ++++++++++++ 
hscontrol/notifier/notifier.go | 60 +++++++++++++++++++++------------- hscontrol/poll.go | 10 ++++-- 4 files changed, 74 insertions(+), 26 deletions(-) diff --git a/hscontrol/metrics.go b/hscontrol/metrics.go index 9d802caf79..5a7fdd729f 100644 --- a/hscontrol/metrics.go +++ b/hscontrol/metrics.go @@ -47,6 +47,16 @@ var ( Name: "mapresponse_rejected_new_sessions_total", Help: "total count of new mapsessions rejected", }, []string{"reason"}) + mapResponseEnded = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: prometheusNamespace, + Name: "mapresponse_ended_total", + Help: "total count of new mapsessions ended", + }, []string{"reason"}) + mapResponseClosed = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: prometheusNamespace, + Name: "mapresponse_closed_total", + Help: "total count of calls to mapresponse close", + }, []string{"return"}) httpDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ Namespace: prometheusNamespace, Name: "http_duration_seconds", diff --git a/hscontrol/notifier/metrics.go b/hscontrol/notifier/metrics.go index 1cc4df2b65..f0688225b8 100644 --- a/hscontrol/notifier/metrics.go +++ b/hscontrol/notifier/metrics.go @@ -8,6 +8,11 @@ import ( const prometheusNamespace = "headscale" var ( + notifierWaitersForLock = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: prometheusNamespace, + Name: "notifier_waiters_for_lock", + Help: "gauge of waiters for the notifier lock", + }, []string{"type", "action"}) notifierWaitForLock = promauto.NewHistogramVec(prometheus.HistogramOpts{ Namespace: prometheusNamespace, Name: "notifier_wait_for_lock_seconds", @@ -29,4 +34,19 @@ var ( Name: "notifier_open_channels_total", Help: "total count open channels in notifier", }) + notifierBatcherWaitersForLock = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: prometheusNamespace, + Name: "notifier_batcher_waiters_for_lock", + Help: "gauge of waiters for the notifier batcher lock", + }, []string{"type", "action"}) + 
notifierBatcherChanges = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: prometheusNamespace, + Name: "notifier_batcher_changes_pending", + Help: "gauge of full changes pending in the notifier batcher", + }, []string{}) + notifierBatcherPatches = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: prometheusNamespace, + Name: "notifier_batcher_patches_pending", + Help: "gauge of patches pending in the notifier batcher", + }, []string{}) ) diff --git a/hscontrol/notifier/notifier.go b/hscontrol/notifier/notifier.go index 339a56f1a6..91ccfbff0f 100644 --- a/hscontrol/notifier/notifier.go +++ b/hscontrol/notifier/notifier.go @@ -40,15 +40,11 @@ func (n *Notifier) Close() { } func (n *Notifier) AddNode(nodeID types.NodeID, c chan<- types.StateUpdate) { - log.Trace().Caller().Uint64("node.id", nodeID.Uint64()).Msg("acquiring lock to add node") - defer log.Trace(). - Caller(). - Uint64("node.id", nodeID.Uint64()). - Msg("releasing lock to add node") - start := time.Now() + notifierWaitersForLock.WithLabelValues("lock", "add").Inc() n.l.Lock() defer n.l.Unlock() + notifierWaitersForLock.WithLabelValues("lock", "add").Dec() notifierWaitForLock.WithLabelValues("add").Observe(time.Since(start).Seconds()) n.nodes[nodeID] = c @@ -62,15 +58,11 @@ func (n *Notifier) AddNode(nodeID types.NodeID, c chan<- types.StateUpdate) { } func (n *Notifier) RemoveNode(nodeID types.NodeID) { - log.Trace().Caller().Uint64("node.id", nodeID.Uint64()).Msg("acquiring lock to remove node") - defer log.Trace(). - Caller(). - Uint64("node.id", nodeID.Uint64()). 
- Msg("releasing lock to remove node") - start := time.Now() + notifierWaitersForLock.WithLabelValues("lock", "remove").Inc() n.l.Lock() defer n.l.Unlock() + notifierWaitersForLock.WithLabelValues("lock", "remove").Dec() notifierWaitForLock.WithLabelValues("remove").Observe(time.Since(start).Seconds()) if len(n.nodes) == 0 { @@ -90,8 +82,10 @@ func (n *Notifier) RemoveNode(nodeID types.NodeID) { // IsConnected reports if a node is connected to headscale and has a // poll session open. func (n *Notifier) IsConnected(nodeID types.NodeID) bool { + notifierWaitersForLock.WithLabelValues("rlock", "conncheck").Inc() n.l.RLock() defer n.l.RUnlock() + notifierWaitersForLock.WithLabelValues("rlock", "conncheck").Dec() if val, ok := n.connected.Load(nodeID); ok { return val @@ -130,15 +124,11 @@ func (n *Notifier) NotifyByNodeID( update types.StateUpdate, nodeID types.NodeID, ) { - log.Trace().Caller().Str("type", update.Type.String()).Msg("acquiring lock to notify") - defer log.Trace(). - Caller(). - Str("type", update.Type.String()). 
- Msg("releasing lock, finished notifying") - start := time.Now() + notifierWaitersForLock.WithLabelValues("rlock", "notify").Inc() n.l.RLock() defer n.l.RUnlock() + notifierWaitersForLock.WithLabelValues("rlock", "notify").Dec() notifierWaitForLock.WithLabelValues("notify").Observe(time.Since(start).Seconds()) if c, ok := n.nodes[nodeID]; ok { @@ -166,29 +156,45 @@ func (n *Notifier) NotifyByNodeID( func (n *Notifier) sendAll(update types.StateUpdate) { start := time.Now() + notifierWaitersForLock.WithLabelValues("rlock", "send-all").Inc() n.l.RLock() defer n.l.RUnlock() + notifierWaitersForLock.WithLabelValues("rlock", "send-all").Dec() notifierWaitForLock.WithLabelValues("send-all").Observe(time.Since(start).Seconds()) - for _, c := range n.nodes { - c <- update - notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all").Inc() + for id, c := range n.nodes { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + select { + case <-ctx.Done(): + log.Error(). + Err(ctx.Err()). + Uint64("node.id", id.Uint64()). 
+ Msgf("update not sent, context cancelled") + notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), "send-all").Inc() + + return + case c <- update: + notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all").Inc() + } } } func (n *Notifier) String() string { + notifierWaitersForLock.WithLabelValues("rlock", "string").Inc() n.l.RLock() defer n.l.RUnlock() + notifierWaitersForLock.WithLabelValues("rlock", "string").Dec() var b strings.Builder - b.WriteString("chans:\n") + fmt.Fprintf(&b, "chans (%d):\n", len(n.nodes)) for k, v := range n.nodes { fmt.Fprintf(&b, "\t%d: %p\n", k, v) } b.WriteString("\n") - b.WriteString("connected:\n") + fmt.Fprintf(&b, "connected (%d):\n", len(n.nodes)) n.connected.Range(func(k types.NodeID, v bool) bool { fmt.Fprintf(&b, "\t%d: %t\n", k, v) @@ -230,13 +236,16 @@ func (b *batcher) close() { // addOrPassthrough adds the update to the batcher, if it is not a // type that is currently batched, it will be sent immediately. func (b *batcher) addOrPassthrough(update types.StateUpdate) { + notifierBatcherWaitersForLock.WithLabelValues("lock", "add").Inc() b.mu.Lock() defer b.mu.Unlock() + notifierBatcherWaitersForLock.WithLabelValues("lock", "add").Dec() switch update.Type { case types.StatePeerChanged: b.changedNodeIDs.Add(update.ChangeNodes...) b.nodesChanged = true + notifierBatcherChanges.WithLabelValues().Set(float64(b.changedNodeIDs.Len())) case types.StatePeerChangedPatch: for _, newPatch := range update.ChangePatches { @@ -248,6 +257,7 @@ func (b *batcher) addOrPassthrough(update types.StateUpdate) { } } b.patchesChanged = true + notifierBatcherPatches.WithLabelValues().Set(float64(len(b.patches))) default: b.n.sendAll(update) @@ -257,8 +267,10 @@ func (b *batcher) addOrPassthrough(update types.StateUpdate) { // flush sends all the accumulated patches to all // nodes in the notifier. 
func (b *batcher) flush() { + notifierBatcherWaitersForLock.WithLabelValues("lock", "flush").Inc() b.mu.Lock() defer b.mu.Unlock() + notifierBatcherWaitersForLock.WithLabelValues("lock", "flush").Dec() if b.nodesChanged || b.patchesChanged { var patches []*tailcfg.PeerChange @@ -296,8 +308,10 @@ func (b *batcher) flush() { } b.changedNodeIDs = set.Slice[types.NodeID]{} + notifierBatcherChanges.WithLabelValues().Set(0) b.nodesChanged = false b.patches = make(map[types.NodeID]tailcfg.PeerChange, len(b.patches)) + notifierBatcherPatches.WithLabelValues().Set(0) b.patchesChanged = false } } diff --git a/hscontrol/poll.go b/hscontrol/poll.go index 7df4f9e8b0..96790b71d4 100644 --- a/hscontrol/poll.go +++ b/hscontrol/poll.go @@ -105,14 +105,17 @@ func (m *mapSession) close() { defer m.cancelChMu.Unlock() if !m.cancelChOpen { + mapResponseClosed.WithLabelValues("chanclosed").Inc() return } m.tracef("mapSession (%p) sending message on cancel chan", m) select { case m.cancelCh <- struct{}{}: + mapResponseClosed.WithLabelValues("sent").Inc() m.tracef("mapSession (%p) sent message on cancel chan", m) case <-time.After(30 * time.Second): + mapResponseClosed.WithLabelValues("timeout").Inc() m.tracef("mapSession (%p) timed out sending close message", m) } } @@ -232,12 +235,15 @@ func (m *mapSession) serve() { select { case <-m.cancelCh: m.tracef("poll cancelled received") + mapResponseEnded.WithLabelValues("cancelled").Inc() return + case <-ctx.Done(): m.tracef("poll context done") + mapResponseEnded.WithLabelValues("done").Inc() return - // Consume all updates sent to node + // Consume updates sent to node case update := <-m.ch: m.tracef("received stream update: %s %s", update.Type.String(), update.Message) mapResponseUpdateReceived.WithLabelValues(update.Type.String()).Inc() @@ -304,8 +310,6 @@ func (m *mapSession) serve() { return } - // log.Trace().Str("node", m.node.Hostname).TimeDiff("timeSpent", time.Now(), startMapResp).Str("mkey", m.node.MachineKey.String()).Int("type", 
int(update.Type)).Msg("finished making map response") - // Only send update if there is change if data != nil { startWrite := time.Now() From 59c522d1f56b732c1d0870411d5a6044e94aeed6 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Sun, 5 May 2024 09:59:46 +0200 Subject: [PATCH 03/25] make timeout and rejects label node id Signed-off-by: Kristoffer Dalby --- hscontrol/metrics.go | 2 +- hscontrol/noise.go | 2 +- hscontrol/notifier/metrics.go | 2 +- hscontrol/notifier/notifier.go | 8 ++++---- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/hscontrol/metrics.go b/hscontrol/metrics.go index 5a7fdd729f..ff85263b6d 100644 --- a/hscontrol/metrics.go +++ b/hscontrol/metrics.go @@ -46,7 +46,7 @@ var ( Namespace: prometheusNamespace, Name: "mapresponse_rejected_new_sessions_total", Help: "total count of new mapsessions rejected", - }, []string{"reason"}) + }, []string{"reason", "id"}) mapResponseEnded = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: prometheusNamespace, Name: "mapresponse_ended_total", diff --git a/hscontrol/noise.go b/hscontrol/noise.go index b656dbd3f5..fb1bdf0849 100644 --- a/hscontrol/noise.go +++ b/hscontrol/noise.go @@ -272,7 +272,7 @@ func (ns *noiseServer) NoisePollNetMapHandler( }() sess.infof("mapSession (%p) has an open stream, rejecting new stream", sess) - mapResponseRejected.WithLabelValues("exists").Inc() + mapResponseRejected.WithLabelValues("exists", node.ID.NodeID().String()).Inc() return } diff --git a/hscontrol/notifier/metrics.go b/hscontrol/notifier/metrics.go index f0688225b8..3294be784f 100644 --- a/hscontrol/notifier/metrics.go +++ b/hscontrol/notifier/metrics.go @@ -23,7 +23,7 @@ var ( Namespace: prometheusNamespace, Name: "notifier_update_sent_total", Help: "total count of update sent on nodes channel", - }, []string{"status", "type", "trigger"}) + }, []string{"status", "type", "trigger", "id"}) notifierUpdateReceived = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: prometheusNamespace, 
Name: "notifier_update_received_total", diff --git a/hscontrol/notifier/notifier.go b/hscontrol/notifier/notifier.go index 91ccfbff0f..3cc728ea97 100644 --- a/hscontrol/notifier/notifier.go +++ b/hscontrol/notifier/notifier.go @@ -140,7 +140,7 @@ func (n *Notifier) NotifyByNodeID( Any("origin", types.NotifyOriginKey.Value(ctx)). Any("origin-hostname", types.NotifyHostnameKey.Value(ctx)). Msgf("update not sent, context cancelled") - notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), types.NotifyOriginKey.Value(ctx)).Inc() + notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), types.NotifyOriginKey.Value(ctx), nodeID.NodeID().String()).Inc() return case c <- update: @@ -149,7 +149,7 @@ func (n *Notifier) NotifyByNodeID( Any("origin", ctx.Value("origin")). Any("origin-hostname", ctx.Value("hostname")). Msgf("update successfully sent on chan") - notifierUpdateSent.WithLabelValues("ok", update.Type.String(), types.NotifyOriginKey.Value(ctx)).Inc() + notifierUpdateSent.WithLabelValues("ok", update.Type.String(), types.NotifyOriginKey.Value(ctx), nodeID.NodeID().String()).Inc() } } } @@ -171,11 +171,11 @@ func (n *Notifier) sendAll(update types.StateUpdate) { Err(ctx.Err()). Uint64("node.id", id.Uint64()). Msgf("update not sent, context cancelled") - notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), "send-all").Inc() + notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), "send-all", id.NodeID().String()).Inc() return case c <- update: - notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all").Inc() + notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all", id.NodeID().String()).Inc() } } } From 1f9ffaa449a961d10a3b3e21d7ea03fb01493400 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Wed, 8 May 2024 16:14:27 +0200 Subject: [PATCH 04/25] remove mapresponse map this commit removes the mapresponse map as it complicated and caused more problems than it solved. 
instead, notifier is now changed to always use the last connected nodes channel and to ensure it is not remove until that client is disconnected. as part of this, offline/route change is a bit more concervative and only get sent if the channel was removed from the notifier and not if the clients longpolling session ended (there might be more than one at the time, particularly if the client is connecting.) Signed-off-by: Kristoffer Dalby --- hscontrol/app.go | 28 ---------------- hscontrol/metrics.go | 10 ------ hscontrol/noise.go | 61 +--------------------------------- hscontrol/notifier/notifier.go | 45 ++++++++++++++++++++----- hscontrol/poll.go | 46 +++++++++++++++---------- 5 files changed, 67 insertions(+), 123 deletions(-) diff --git a/hscontrol/app.go b/hscontrol/app.go index 28211db39d..3aa6450e18 100644 --- a/hscontrol/app.go +++ b/hscontrol/app.go @@ -104,9 +104,6 @@ type Headscale struct { registrationCache *cache.Cache pollNetMapStreamWG sync.WaitGroup - - mapSessions map[types.NodeID]*mapSession - mapSessionMu sync.Mutex } var ( @@ -138,7 +135,6 @@ func NewHeadscale(cfg *types.Config) (*Headscale, error) { registrationCache: registrationCache, pollNetMapStreamWG: sync.WaitGroup{}, nodeNotifier: notifier.NewNotifier(cfg), - mapSessions: make(map[types.NodeID]*mapSession), } app.db, err = db.NewHeadscaleDatabase( @@ -729,19 +725,6 @@ func (h *Headscale) Serve() error { w.WriteHeader(http.StatusOK) w.Write([]byte(h.nodeNotifier.String())) }) - debugMux.HandleFunc("/debug/mapresp", func(w http.ResponseWriter, r *http.Request) { - h.mapSessionMu.Lock() - defer h.mapSessionMu.Unlock() - - var b strings.Builder - b.WriteString("mapresponders:\n") - for k, v := range h.mapSessions { - fmt.Fprintf(&b, "\t%d: %p\n", k, v) - } - - w.WriteHeader(http.StatusOK) - w.Write([]byte(b.String())) - }) debugMux.Handle("/metrics", promhttp.Handler()) debugHTTPServer := &http.Server{ @@ -822,17 +805,6 @@ func (h *Headscale) Serve() error { expireNodeCancel() 
expireEphemeralCancel() - trace("closing map sessions") - wg := sync.WaitGroup{} - for _, mapSess := range h.mapSessions { - wg.Add(1) - go func() { - mapSess.close() - wg.Done() - }() - } - wg.Wait() - trace("waiting for netmap stream to close") h.pollNetMapStreamWG.Wait() diff --git a/hscontrol/metrics.go b/hscontrol/metrics.go index ff85263b6d..0e18ed3464 100644 --- a/hscontrol/metrics.go +++ b/hscontrol/metrics.go @@ -37,16 +37,6 @@ var ( Name: "mapresponse_readonly_requests_total", Help: "total count of readonly requests received", }, []string{"status"}) - mapResponseSessions = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: prometheusNamespace, - Name: "mapresponse_current_sessions_total", - Help: "total count open map response sessions", - }) - mapResponseRejected = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: prometheusNamespace, - Name: "mapresponse_rejected_new_sessions_total", - Help: "total count of new mapsessions rejected", - }, []string{"reason", "id"}) mapResponseEnded = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: prometheusNamespace, Name: "mapresponse_ended_total", diff --git a/hscontrol/noise.go b/hscontrol/noise.go index fb1bdf0849..34875d1dd5 100644 --- a/hscontrol/noise.go +++ b/hscontrol/noise.go @@ -231,67 +231,8 @@ func (ns *noiseServer) NoisePollNetMapHandler( return } - sess := ns.headscale.newMapSession(req.Context(), mapRequest, writer, node) + sess := ns.headscale.newMapSession(req.Context(), mapRequest, writer, node) sess.tracef("a node sending a MapRequest with Noise protocol") - - // If a streaming mapSession exists for this node, close it - // and start a new one. 
- if sess.isStreaming() { - sess.tracef("aquiring lock to check stream") - - ns.headscale.mapSessionMu.Lock() - if oldSession, ok := ns.headscale.mapSessions[node.ID]; ok { - // NOTE/TODO(kradalby): From how I understand the protocol, when - // a client connects with stream=true, and already has a streaming - // connection open, the correct way is to close the current channel - // and replace it. However, I cannot manage to get that working with - // some sort of lock/block happening on the cancelCh in the streaming - // session. - // Not closing the channel and replacing it puts us in a weird state - // which keeps a ghost stream open, receiving keep alives, but no updates. - // - // Typically a new connection is opened when one exists as a client which - // is already authenticated reconnects (e.g. down, then up). The client will - // start auth and streaming at the same time, and then cancel the streaming - // when the auth has finished successfully, opening a new connection. - // - // As a work-around to not replacing, abusing the clients "resilience" - // by reject the new connection which will cause the client to immediately - // reconnect and "fix" the issue, as the other connection typically has been - // closed, meaning there is nothing to replace. 
- // - // sess.infof("node has an open stream(%p), replacing with %p", oldSession, sess) - // oldSession.close() - - defer ns.headscale.mapSessionMu.Unlock() - - go func() { - oldSession.infof("mapSession (%p) is open, trying to close stream and replace with %p", oldSession, sess) - oldSession.close() - }() - - sess.infof("mapSession (%p) has an open stream, rejecting new stream", sess) - mapResponseRejected.WithLabelValues("exists", node.ID.NodeID().String()).Inc() - return - } - - ns.headscale.mapSessions[node.ID] = sess - mapResponseSessions.Inc() - ns.headscale.mapSessionMu.Unlock() - sess.tracef("releasing lock to check stream") - } - sess.serve() - - if sess.isStreaming() { - sess.tracef("aquiring lock to remove stream") - ns.headscale.mapSessionMu.Lock() - defer ns.headscale.mapSessionMu.Unlock() - - delete(ns.headscale.mapSessions, node.ID) - mapResponseSessions.Dec() - - sess.tracef("releasing lock to remove stream") - } } diff --git a/hscontrol/notifier/notifier.go b/hscontrol/notifier/notifier.go index 3cc728ea97..4f56ae3b1a 100644 --- a/hscontrol/notifier/notifier.go +++ b/hscontrol/notifier/notifier.go @@ -47,6 +47,12 @@ func (n *Notifier) AddNode(nodeID types.NodeID, c chan<- types.StateUpdate) { notifierWaitersForLock.WithLabelValues("lock", "add").Dec() notifierWaitForLock.WithLabelValues("add").Observe(time.Since(start).Seconds()) + // If a channel exists, close it + if curr, ok := n.nodes[nodeID]; ok { + log.Trace().Uint64("node.id", nodeID.Uint64()).Msg("channel present, closing and replacing") + close(curr) + } + n.nodes[nodeID] = c n.connected.Store(nodeID, true) @@ -57,7 +63,11 @@ func (n *Notifier) AddNode(nodeID types.NodeID, c chan<- types.StateUpdate) { notifierNodeUpdateChans.Inc() } -func (n *Notifier) RemoveNode(nodeID types.NodeID) { +// RemoveNode removes a node and a given channel from the notifier. +// It checks that the channel is the same as currently being updated +// and ignores the removal if it is not. 
+// RemoveNode reports if the node/chan was removed. +func (n *Notifier) RemoveNode(nodeID types.NodeID, c chan<- types.StateUpdate) bool { start := time.Now() notifierWaitersForLock.WithLabelValues("lock", "remove").Inc() n.l.Lock() @@ -66,7 +76,15 @@ func (n *Notifier) RemoveNode(nodeID types.NodeID) { notifierWaitForLock.WithLabelValues("remove").Observe(time.Since(start).Seconds()) if len(n.nodes) == 0 { - return + return true + } + + // If a channel exists, close it + if curr, ok := n.nodes[nodeID]; ok { + if curr != c { + log.Trace().Uint64("node.id", nodeID.Uint64()).Msg("channel has been replaced, not removing") + return false + } } delete(n.nodes, nodeID) @@ -77,6 +95,8 @@ func (n *Notifier) RemoveNode(nodeID types.NodeID) { Int("open_chans", len(n.nodes)). Msg("Removed channel") notifierNodeUpdateChans.Dec() + + return true } // IsConnected reports if a node is connected to headscale and has a @@ -189,17 +209,26 @@ func (n *Notifier) String() string { var b strings.Builder fmt.Fprintf(&b, "chans (%d):\n", len(n.nodes)) - for k, v := range n.nodes { - fmt.Fprintf(&b, "\t%d: %p\n", k, v) + var keys []types.NodeID + n.connected.Range(func(key types.NodeID, value bool) bool { + keys = append(keys, key) + return true + }) + sort.Slice(keys, func(i, j int) bool { + return keys[i] < keys[j] + }) + + for _, key := range keys { + fmt.Fprintf(&b, "\t%d: %p\n", key, n.nodes[key]) } b.WriteString("\n") fmt.Fprintf(&b, "connected (%d):\n", len(n.nodes)) - n.connected.Range(func(k types.NodeID, v bool) bool { - fmt.Fprintf(&b, "\t%d: %t\n", k, v) - return true - }) + for _, key := range keys { + val, _ := n.connected.Load(key) + fmt.Fprintf(&b, "\t%d: %t\n", key, val) + } return b.String() } diff --git a/hscontrol/poll.go b/hscontrol/poll.go index 96790b71d4..1ba6bc6216 100644 --- a/hscontrol/poll.go +++ b/hscontrol/poll.go @@ -47,6 +47,7 @@ type mapSession struct { cancelCh chan struct{} cancelChOpen bool + keepAlive time.Duration keepAliveTicker *time.Ticker node 
*types.Node @@ -77,6 +78,8 @@ func (h *Headscale) newMapSession( } } + ka := keepAliveInterval + (time.Duration(rand.IntN(9000)) * time.Millisecond) + return &mapSession{ h: h, ctx: ctx, @@ -90,7 +93,8 @@ func (h *Headscale) newMapSession( cancelCh: make(chan struct{}), cancelChOpen: true, - keepAliveTicker: time.NewTicker(keepAliveInterval + (time.Duration(rand.IntN(9000)) * time.Millisecond)), + keepAlive: ka, + keepAliveTicker: nil, // Loggers warnf: warnf, @@ -132,6 +136,10 @@ func (m *mapSession) isReadOnlyUpdate() bool { return !m.req.Stream && m.req.OmitPeers && m.req.ReadOnly } +func (m *mapSession) resetKeepAlive() { + m.keepAliveTicker.Reset(m.keepAlive) +} + // handlePoll ensures the node gets the appropriate updates from either // polling or immediate responses. // @@ -145,10 +153,16 @@ func (m *mapSession) serve() { // Failover the node's routes if any. defer m.infof("node has disconnected, mapSession: %p", m) - defer m.pollFailoverRoutes("node closing connection", m.node) - - defer m.h.updateNodeOnlineStatus(false, m.node) - defer m.h.nodeNotifier.RemoveNode(m.node.ID) + defer func() { + // only update node status if the node channel was removed. + // in principal, it will be removed, but the client rapidly + // reconnects, the channel might be of another connection. + // In that case, it is not closed and the node is still online. + if m.h.nodeNotifier.RemoveNode(m.node.ID, m.ch) { + m.h.updateNodeOnlineStatus(false, m.node) + m.pollFailoverRoutes("node closing connection", m.node) + } + }() defer func() { m.cancelChMu.Lock() @@ -228,6 +242,8 @@ func (m *mapSession) serve() { ctx, cancel := context.WithCancel(context.WithValue(m.ctx, nodeNameContextKey, m.node.Hostname)) defer cancel() + m.keepAliveTicker = time.NewTicker(m.keepAlive) + // Loop through updates and continuously send them to the // client. 
for { @@ -244,7 +260,12 @@ func (m *mapSession) serve() { return // Consume updates sent to node - case update := <-m.ch: + case update, ok := <-m.ch: + if !ok { + m.tracef("update channel closed, streaming session is likely being replaced") + return + } + m.tracef("received stream update: %s %s", update.Type.String(), update.Message) mapResponseUpdateReceived.WithLabelValues(update.Type.String()).Inc() @@ -316,7 +337,7 @@ func (m *mapSession) serve() { _, err = m.w.Write(data) if err != nil { mapResponseSent.WithLabelValues("error", updateType).Inc() - m.errf(err, "Could not write the map response, for mapSession: %p", m) + m.errf(err, "could not write the map response(%s), for mapSession: %p", update.Type.String(), m) return } @@ -331,6 +352,7 @@ func (m *mapSession) serve() { mapResponseSent.WithLabelValues("ok", updateType).Inc() m.tracef("update sent") + m.resetKeepAlive() } case <-m.keepAliveTicker.C: @@ -409,16 +431,6 @@ func (h *Headscale) updateNodeOnlineStatus(online bool, node *types.Node) { }, node.ID) } -func closeChanWithLog[C chan []byte | chan struct{} | chan types.StateUpdate](channel C, node, name string) { - log.Trace(). - Str("handler", "PollNetMap"). - Str("node", node). - Str("channel", "Done"). 
- Msg(fmt.Sprintf("Closing %s channel", name)) - - close(channel) -} - func (m *mapSession) handleEndpointUpdate() { m.tracef("received endpoint update") From eb332996d9a5587f992f96ee9519f8b5a0055c1b Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Wed, 8 May 2024 16:31:05 +0200 Subject: [PATCH 05/25] standardise trace logging in notifier Signed-off-by: Kristoffer Dalby --- hscontrol/notifier/notifier.go | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/hscontrol/notifier/notifier.go b/hscontrol/notifier/notifier.go index 4f56ae3b1a..7299e58bf7 100644 --- a/hscontrol/notifier/notifier.go +++ b/hscontrol/notifier/notifier.go @@ -29,7 +29,7 @@ func NewNotifier(cfg *types.Config) *Notifier { } b := newBatcher(cfg.Tuning.BatchChangeDelay, n) n.b = b - // TODO(kradalby): clean this up + go b.doWork() return n } @@ -39,6 +39,12 @@ func (n *Notifier) Close() { n.b.close() } +func (n *Notifier) tracef(nID types.NodeID, msg string, args ...any) { + log.Trace(). + Uint64("node.id", nID.Uint64()). + Int("open_chans", len(n.nodes)).Msgf(msg, args...) +} + func (n *Notifier) AddNode(nodeID types.NodeID, c chan<- types.StateUpdate) { start := time.Now() notifierWaitersForLock.WithLabelValues("lock", "add").Inc() @@ -49,17 +55,14 @@ func (n *Notifier) AddNode(nodeID types.NodeID, c chan<- types.StateUpdate) { // If a channel exists, close it if curr, ok := n.nodes[nodeID]; ok { - log.Trace().Uint64("node.id", nodeID.Uint64()).Msg("channel present, closing and replacing") + n.tracef(nodeID, "channel present, closing and replacing") close(curr) } n.nodes[nodeID] = c n.connected.Store(nodeID, true) - log.Trace(). - Uint64("node.id", nodeID.Uint64()). - Int("open_chans", len(n.nodes)). 
- Msg("Added new channel") + n.tracef(nodeID, "added new channel") notifierNodeUpdateChans.Inc() } @@ -82,7 +85,7 @@ func (n *Notifier) RemoveNode(nodeID types.NodeID, c chan<- types.StateUpdate) b // If a channel exists, close it if curr, ok := n.nodes[nodeID]; ok { if curr != c { - log.Trace().Uint64("node.id", nodeID.Uint64()).Msg("channel has been replaced, not removing") + n.tracef(nodeID, "channel has been replaced, not removing") return false } } @@ -90,10 +93,7 @@ func (n *Notifier) RemoveNode(nodeID types.NodeID, c chan<- types.StateUpdate) b delete(n.nodes, nodeID) n.connected.Store(nodeID, false) - log.Trace(). - Uint64("node.id", nodeID.Uint64()). - Int("open_chans", len(n.nodes)). - Msg("Removed channel") + n.tracef(nodeID, "removed channel") notifierNodeUpdateChans.Dec() return true @@ -164,11 +164,7 @@ func (n *Notifier) NotifyByNodeID( return case c <- update: - log.Trace(). - Uint64("node.id", nodeID.Uint64()). - Any("origin", ctx.Value("origin")). - Any("origin-hostname", ctx.Value("hostname")). 
- Msgf("update successfully sent on chan") + n.tracef(nodeID, "update successfully sent on chan, origin: %s, origin-hostname: %s", ctx.Value("origin"), ctx.Value("hostname")) notifierUpdateSent.WithLabelValues("ok", update.Type.String(), types.NotifyOriginKey.Value(ctx), nodeID.NodeID().String()).Inc() } } From 9851123d7055bcb731a8f64e5f9e1ce57415bf8c Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Wed, 8 May 2024 16:33:58 +0200 Subject: [PATCH 06/25] update comments Signed-off-by: Kristoffer Dalby --- hscontrol/notifier/notifier.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hscontrol/notifier/notifier.go b/hscontrol/notifier/notifier.go index 7299e58bf7..4331c912fa 100644 --- a/hscontrol/notifier/notifier.go +++ b/hscontrol/notifier/notifier.go @@ -53,7 +53,8 @@ func (n *Notifier) AddNode(nodeID types.NodeID, c chan<- types.StateUpdate) { notifierWaitersForLock.WithLabelValues("lock", "add").Dec() notifierWaitForLock.WithLabelValues("add").Observe(time.Since(start).Seconds()) - // If a channel exists, close it + // If a channel exists, it means the node has opened a new + // connection. Close the old channel and replace it. if curr, ok := n.nodes[nodeID]; ok { n.tracef(nodeID, "channel present, closing and replacing") close(curr) @@ -82,7 +83,8 @@ func (n *Notifier) RemoveNode(nodeID types.NodeID, c chan<- types.StateUpdate) b return true } - // If a channel exists, close it + // If the channel exist, but it does not belong + // to the caller, ignore. 
if curr, ok := n.nodes[nodeID]; ok { if curr != c { n.tracef(nodeID, "channel has been replaced, not removing") From 3ed9f72707675148e8f306844ec263a71d7d268c Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Wed, 8 May 2024 20:56:22 -0400 Subject: [PATCH 07/25] format id in metrics Signed-off-by: Kristoffer Dalby --- hscontrol/notifier/notifier.go | 8 ++++---- hscontrol/types/node.go | 4 ++++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/hscontrol/notifier/notifier.go b/hscontrol/notifier/notifier.go index 4331c912fa..d10527968c 100644 --- a/hscontrol/notifier/notifier.go +++ b/hscontrol/notifier/notifier.go @@ -162,12 +162,12 @@ func (n *Notifier) NotifyByNodeID( Any("origin", types.NotifyOriginKey.Value(ctx)). Any("origin-hostname", types.NotifyHostnameKey.Value(ctx)). Msgf("update not sent, context cancelled") - notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), types.NotifyOriginKey.Value(ctx), nodeID.NodeID().String()).Inc() + notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), types.NotifyOriginKey.Value(ctx), nodeID.String()).Inc() return case c <- update: n.tracef(nodeID, "update successfully sent on chan, origin: %s, origin-hostname: %s", ctx.Value("origin"), ctx.Value("hostname")) - notifierUpdateSent.WithLabelValues("ok", update.Type.String(), types.NotifyOriginKey.Value(ctx), nodeID.NodeID().String()).Inc() + notifierUpdateSent.WithLabelValues("ok", update.Type.String(), types.NotifyOriginKey.Value(ctx), nodeID.String()).Inc() } } } @@ -189,11 +189,11 @@ func (n *Notifier) sendAll(update types.StateUpdate) { Err(ctx.Err()). Uint64("node.id", id.Uint64()). 
Msgf("update not sent, context cancelled") - notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), "send-all", id.NodeID().String()).Inc() + notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), "send-all", id.String()).Inc() return case c <- update: - notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all", id.NodeID().String()).Inc() + notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all", id.String()).Inc() } } } diff --git a/hscontrol/types/node.go b/hscontrol/types/node.go index 7a5756aeba..3ccadc3895 100644 --- a/hscontrol/types/node.go +++ b/hscontrol/types/node.go @@ -43,6 +43,10 @@ func (id NodeID) Uint64() uint64 { return uint64(id) } +func (id NodeID) String() string { + return strconv.FormatUint(id.Uint64(), util.Base10) +} + // Node is a Headscale client. type Node struct { ID NodeID `gorm:"primary_key"` From 97f8b36ccbd51c6636e0e85f31e86b7f156ce850 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Mon, 13 May 2024 09:52:48 -0400 Subject: [PATCH 08/25] add missing ts version Signed-off-by: Kristoffer Dalby --- integration/scenario.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/integration/scenario.go b/integration/scenario.go index 3f0eb7d277..e7f894333b 100644 --- a/integration/scenario.go +++ b/integration/scenario.go @@ -51,6 +51,8 @@ var ( tailscaleVersions2021 = map[string]bool{ "head": true, "unstable": true, + "1.64": true, // CapVer: 82 + "1.62": true, // CapVer: 82 "1.60": true, // CapVer: 82 "1.58": true, // CapVer: 82 "1.56": true, // CapVer: 82 From 0a69b261dc66a99b64905e27dddb45af46b77614 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Mon, 13 May 2024 13:29:18 -0400 Subject: [PATCH 09/25] send updates async so we reach select in poll Signed-off-by: Kristoffer Dalby --- hscontrol/poll.go | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/hscontrol/poll.go b/hscontrol/poll.go index 1ba6bc6216..49db8357e9 
100644 --- a/hscontrol/poll.go +++ b/hscontrol/poll.go @@ -29,11 +29,6 @@ type contextKey string const nodeNameContextKey = contextKey("nodeName") -type sessionManager struct { - mu sync.RWMutex - sess map[types.NodeID]*mapSession -} - type mapSession struct { h *Headscale req tailcfg.MapRequest @@ -173,7 +168,7 @@ func (m *mapSession) serve() { }() m.h.nodeNotifier.AddNode(m.node.ID, m.ch) - m.h.updateNodeOnlineStatus(true, m.node) + go m.h.updateNodeOnlineStatus(true, m.node) m.infof("node has connected, mapSession: %p", m) } @@ -216,14 +211,18 @@ func (m *mapSession) serve() { // From version 68, all streaming requests can be treated as read only. if m.capVer < 68 { - // Error has been handled/written to client in the func - // return - err := m.handleSaveNode() - if err != nil { - mapResponseWriteUpdatesInStream.WithLabelValues("error").Inc() - return - } - mapResponseWriteUpdatesInStream.WithLabelValues("ok").Inc() + go func() { + // Error has been handled/written to client in the func + // return + err := m.handleSaveNode() + if err != nil { + mapResponseWriteUpdatesInStream.WithLabelValues("error").Inc() + + m.close() + return + } + mapResponseWriteUpdatesInStream.WithLabelValues("ok").Inc() + }() } // Set up the client stream From 3d928379e230e0d3f4feacbc36658963e667dd3c Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Mon, 13 May 2024 13:30:08 -0400 Subject: [PATCH 10/25] expand integ test cert validity Signed-off-by: Kristoffer Dalby --- integration/hsic/hsic.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/hsic/hsic.go b/integration/hsic/hsic.go index a118b6fc6f..d2ad334cb3 100644 --- a/integration/hsic/hsic.go +++ b/integration/hsic/hsic.go @@ -747,7 +747,7 @@ func createCertificate(hostname string) ([]byte, []byte, error) { Locality: []string{"Leiden"}, }, NotBefore: time.Now(), - NotAfter: time.Now().Add(60 * time.Minute), + NotAfter: time.Now().Add(60 * time.Hour), IsCA: true, ExtKeyUsage: []x509.ExtKeyUsage{ 
x509.ExtKeyUsageClientAuth, From 789bfe7edaa2229c06ca1dc5b767655f1b4b0d6f Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Tue, 14 May 2024 11:31:31 -0400 Subject: [PATCH 11/25] only print nodes with not all up peers in test Signed-off-by: Kristoffer Dalby --- integration/scenario.go | 6 ++++-- integration/tailscale.go | 5 ++++- integration/tsic/tsic.go | 19 +++++++++++-------- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/integration/scenario.go b/integration/scenario.go index e7f894333b..83ee4495d5 100644 --- a/integration/scenario.go +++ b/integration/scenario.go @@ -425,8 +425,10 @@ func (s *Scenario) WaitForTailscaleSync() error { if err != nil { for _, user := range s.users { for _, client := range user.Clients { - peers, _ := client.PrettyPeers() - log.Println(peers) + peers, allOnline, _ := client.FailingPeersAsString() + if !allOnline { + log.Println(peers) + } } } } diff --git a/integration/tailscale.go b/integration/tailscale.go index 6bcf6073be..2ea3faa92a 100644 --- a/integration/tailscale.go +++ b/integration/tailscale.go @@ -36,5 +36,8 @@ type TailscaleClient interface { Ping(hostnameOrIP string, opts ...tsic.PingOption) error Curl(url string, opts ...tsic.CurlOption) (string, error) ID() string - PrettyPeers() (string, error) + + // FailingPeersAsString returns a formatted-ish multi-line-string of peers in the client + // and a bool indicating if the clients online count and peer count is equal. + FailingPeersAsString() (string, bool, error) } diff --git a/integration/tsic/tsic.go b/integration/tsic/tsic.go index 6ae0226a11..0e3c91f8d1 100644 --- a/integration/tsic/tsic.go +++ b/integration/tsic/tsic.go @@ -691,15 +691,18 @@ func (t *TailscaleInContainer) FQDN() (string, error) { return status.Self.DNSName, nil } -// PrettyPeers returns a formatted-ish table of peers in the client. 
-func (t *TailscaleInContainer) PrettyPeers() (string, error) { +// FailingPeersAsString returns a formatted-ish multi-line-string of peers in the client +// and a bool indicating if the clients online count and peer count is equal. +func (t *TailscaleInContainer) FailingPeersAsString() (string, bool, error) { status, err := t.Status() if err != nil { - return "", fmt.Errorf("failed to get FQDN: %w", err) + return "", false, fmt.Errorf("failed to get FQDN: %w", err) } - str := fmt.Sprintf("Peers of %s\n", t.hostname) - str += "Hostname\tOnline\tLastSeen\n" + var b strings.Builder + + fmt.Fprintf(&b, "Peers of %s\n", t.hostname) + fmt.Fprint(&b, "Hostname\tOnline\tLastSeen\n") peerCount := len(status.Peers()) onlineCount := 0 @@ -711,12 +714,12 @@ func (t *TailscaleInContainer) PrettyPeers() (string, error) { onlineCount++ } - str += fmt.Sprintf("%s\t%t\t%s\n", peer.HostName, peer.Online, peer.LastSeen) + fmt.Fprintf(&b, "%s\t%t\t%s\n", peer.HostName, peer.Online, peer.LastSeen) } - str += fmt.Sprintf("Peer Count: %d, Online Count: %d\n\n", peerCount, onlineCount) + fmt.Fprintf(&b, "Peer Count: %d, Online Count: %d\n\n", peerCount, onlineCount) - return str, nil + return b.String(), peerCount == onlineCount, nil } // WaitForNeedsLogin blocks until the Tailscale (tailscaled) instance has From c96f11dbfff15137c49c68cb63db8916c93e5894 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Fri, 17 May 2024 10:58:00 -0400 Subject: [PATCH 12/25] set a timeout for sendall nodes Signed-off-by: Kristoffer Dalby --- hscontrol/notifier/notifier.go | 37 +++++++------- hscontrol/notifier/notifier_test.go | 2 +- hscontrol/poll.go | 78 +++++++++++++---------------- hscontrol/types/config.go | 4 +- 4 files changed, 58 insertions(+), 63 deletions(-) diff --git a/hscontrol/notifier/notifier.go b/hscontrol/notifier/notifier.go index d10527968c..358cdc493f 100644 --- a/hscontrol/notifier/notifier.go +++ b/hscontrol/notifier/notifier.go @@ -20,12 +20,14 @@ type Notifier struct { nodes 
map[types.NodeID]chan<- types.StateUpdate connected *xsync.MapOf[types.NodeID, bool] b *batcher + cfg *types.Config } func NewNotifier(cfg *types.Config) *Notifier { n := &Notifier{ nodes: make(map[types.NodeID]chan<- types.StateUpdate), connected: xsync.NewMapOf[types.NodeID, bool](), + cfg: cfg, } b := newBatcher(cfg.Tuning.BatchChangeDelay, n) n.b = b @@ -104,10 +106,10 @@ func (n *Notifier) RemoveNode(nodeID types.NodeID, c chan<- types.StateUpdate) b // IsConnected reports if a node is connected to headscale and has a // poll session open. func (n *Notifier) IsConnected(nodeID types.NodeID) bool { - notifierWaitersForLock.WithLabelValues("rlock", "conncheck").Inc() - n.l.RLock() - defer n.l.RUnlock() - notifierWaitersForLock.WithLabelValues("rlock", "conncheck").Dec() + notifierWaitersForLock.WithLabelValues("lock", "conncheck").Inc() + n.l.Lock() + defer n.l.Unlock() + notifierWaitersForLock.WithLabelValues("lock", "conncheck").Dec() if val, ok := n.connected.Load(nodeID); ok { return val @@ -147,10 +149,10 @@ func (n *Notifier) NotifyByNodeID( nodeID types.NodeID, ) { start := time.Now() - notifierWaitersForLock.WithLabelValues("rlock", "notify").Inc() - n.l.RLock() - defer n.l.RUnlock() - notifierWaitersForLock.WithLabelValues("rlock", "notify").Dec() + notifierWaitersForLock.WithLabelValues("lock", "notify").Inc() + n.l.Lock() + defer n.l.Unlock() + notifierWaitersForLock.WithLabelValues("lock", "notify").Dec() notifierWaitForLock.WithLabelValues("notify").Observe(time.Since(start).Seconds()) if c, ok := n.nodes[nodeID]; ok { @@ -174,14 +176,14 @@ func (n *Notifier) NotifyByNodeID( func (n *Notifier) sendAll(update types.StateUpdate) { start := time.Now() - notifierWaitersForLock.WithLabelValues("rlock", "send-all").Inc() - n.l.RLock() - defer n.l.RUnlock() - notifierWaitersForLock.WithLabelValues("rlock", "send-all").Dec() + notifierWaitersForLock.WithLabelValues("lock", "send-all").Inc() + n.l.Lock() + defer n.l.Unlock() + 
notifierWaitersForLock.WithLabelValues("lock", "send-all").Dec() notifierWaitForLock.WithLabelValues("send-all").Observe(time.Since(start).Seconds()) for id, c := range n.nodes { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), n.cfg.Tuning.NotifierSendTimeout) defer cancel() select { case <-ctx.Done(): @@ -194,15 +196,16 @@ func (n *Notifier) sendAll(update types.StateUpdate) { return case c <- update: notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all", id.String()).Inc() + n.tracef(id, "DONE SENDING TO NODE, CHAN: %p", c) } } } func (n *Notifier) String() string { - notifierWaitersForLock.WithLabelValues("rlock", "string").Inc() - n.l.RLock() - defer n.l.RUnlock() - notifierWaitersForLock.WithLabelValues("rlock", "string").Dec() + notifierWaitersForLock.WithLabelValues("lock", "string").Inc() + n.l.Lock() + defer n.l.Unlock() + notifierWaitersForLock.WithLabelValues("lock", "string").Dec() var b strings.Builder fmt.Fprintf(&b, "chans (%d):\n", len(n.nodes)) diff --git a/hscontrol/notifier/notifier_test.go b/hscontrol/notifier/notifier_test.go index 4d61f13403..8841a46d38 100644 --- a/hscontrol/notifier/notifier_test.go +++ b/hscontrol/notifier/notifier_test.go @@ -227,7 +227,7 @@ func TestBatcher(t *testing.T) { ch := make(chan types.StateUpdate, 30) defer close(ch) n.AddNode(1, ch) - defer n.RemoveNode(1) + defer n.RemoveNode(1, ch) for _, u := range tt.updates { n.NotifyAll(context.Background(), u) diff --git a/hscontrol/poll.go b/hscontrol/poll.go index 49db8357e9..9c8e242e3e 100644 --- a/hscontrol/poll.go +++ b/hscontrol/poll.go @@ -140,39 +140,6 @@ func (m *mapSession) resetKeepAlive() { // //nolint:gocyclo func (m *mapSession) serve() { - // Register with the notifier if this is a streaming - // session - if m.isStreaming() { - // defers are called in reverse order, - // so top one is executed last. - - // Failover the node's routes if any. 
- defer m.infof("node has disconnected, mapSession: %p", m) - defer func() { - // only update node status if the node channel was removed. - // in principal, it will be removed, but the client rapidly - // reconnects, the channel might be of another connection. - // In that case, it is not closed and the node is still online. - if m.h.nodeNotifier.RemoveNode(m.node.ID, m.ch) { - m.h.updateNodeOnlineStatus(false, m.node) - m.pollFailoverRoutes("node closing connection", m.node) - } - }() - - defer func() { - m.cancelChMu.Lock() - defer m.cancelChMu.Unlock() - - m.cancelChOpen = false - close(m.cancelCh) - }() - - m.h.nodeNotifier.AddNode(m.node.ID, m.ch) - go m.h.updateNodeOnlineStatus(true, m.node) - - m.infof("node has connected, mapSession: %p", m) - } - // TODO(kradalby): A set todos to harden: // - func to tell the stream to die, readonly -> false, !stream && omitpeers -> false, true @@ -211,18 +178,16 @@ func (m *mapSession) serve() { // From version 68, all streaming requests can be treated as read only. if m.capVer < 68 { - go func() { - // Error has been handled/written to client in the func - // return - err := m.handleSaveNode() - if err != nil { - mapResponseWriteUpdatesInStream.WithLabelValues("error").Inc() + // Error has been handled/written to client in the func + // return + err := m.handleSaveNode() + if err != nil { + mapResponseWriteUpdatesInStream.WithLabelValues("error").Inc() - m.close() - return - } - mapResponseWriteUpdatesInStream.WithLabelValues("ok").Inc() - }() + m.close() + return + } + mapResponseWriteUpdatesInStream.WithLabelValues("ok").Inc() } // Set up the client stream @@ -243,6 +208,31 @@ func (m *mapSession) serve() { m.keepAliveTicker = time.NewTicker(m.keepAlive) + // Clean up the session when the client disconnects + defer func() { + m.cancelChMu.Lock() + m.cancelChOpen = false + close(m.cancelCh) + m.cancelChMu.Unlock() + + // only update node status if the node channel was removed. 
+ // in principal, it will be removed, but the client rapidly + // reconnects, the channel might be of another connection. + // In that case, it is not closed and the node is still online. + if m.h.nodeNotifier.RemoveNode(m.node.ID, m.ch) { + // Failover the node's routes if any. + m.h.updateNodeOnlineStatus(false, m.node) + m.pollFailoverRoutes("node closing connection", m.node) + } + + m.infof("node has disconnected, mapSession: %p, chan: %p", m, m.ch) + }() + + m.h.nodeNotifier.AddNode(m.node.ID, m.ch) + go m.h.updateNodeOnlineStatus(true, m.node) + + m.infof("node has connected, mapSession: %p, chan: %p", m, m.ch) + // Loop through updates and continuously send them to the // client. for { diff --git a/hscontrol/types/config.go b/hscontrol/types/config.go index bd0bfeac23..0b125aa295 100644 --- a/hscontrol/types/config.go +++ b/hscontrol/types/config.go @@ -171,6 +171,7 @@ type LogConfig struct { } type Tuning struct { + NotifierSendTimeout time.Duration BatchChangeDelay time.Duration NodeMapSessionBufferedChanSize int } @@ -232,6 +233,7 @@ func LoadConfig(path string, isFile bool) error { viper.SetDefault("ephemeral_node_inactivity_timeout", "120s") + viper.SetDefault("tuning.notifier_send_timeout", "800ms") viper.SetDefault("tuning.batch_change_delay", "800ms") viper.SetDefault("tuning.node_mapsession_buffered_chan_size", 30) @@ -640,7 +642,7 @@ func GetHeadscaleConfig() (*Config, error) { }, nil } - logConfig := GetLogConfig() + logConfig := GetLogConfig() zerolog.SetGlobalLevel(logConfig.Level) prefix4, err := PrefixV4() From e8e9290ff7d55242c1f244479b45f49acdb49ced Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Fri, 17 May 2024 12:03:17 -0400 Subject: [PATCH 13/25] split mapresp serve by streaming Signed-off-by: Kristoffer Dalby --- hscontrol/noise.go | 6 +++++- hscontrol/poll.go | 53 +++++++++++++++++++++++++--------------------- 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/hscontrol/noise.go b/hscontrol/noise.go index 
34875d1dd5..360c7045af 100644 --- a/hscontrol/noise.go +++ b/hscontrol/noise.go @@ -234,5 +234,9 @@ func (ns *noiseServer) NoisePollNetMapHandler( sess := ns.headscale.newMapSession(req.Context(), mapRequest, writer, node) sess.tracef("a node sending a MapRequest with Noise protocol") - sess.serve() + if !sess.isStreaming() { + sess.serve() + } else { + sess.serveLongPoll() + } } diff --git a/hscontrol/poll.go b/hscontrol/poll.go index 9c8e242e3e..97e6eccf45 100644 --- a/hscontrol/poll.go +++ b/hscontrol/poll.go @@ -135,10 +135,7 @@ func (m *mapSession) resetKeepAlive() { m.keepAliveTicker.Reset(m.keepAlive) } -// handlePoll ensures the node gets the appropriate updates from either -// polling or immediate responses. -// -//nolint:gocyclo +// serve handles non-streaming requests. func (m *mapSession) serve() { // TODO(kradalby): A set todos to harden: // - func to tell the stream to die, readonly -> false, !stream && omitpeers -> false, true @@ -176,7 +173,35 @@ func (m *mapSession) serve() { return } +} + +// serveLongPoll ensures the node gets the appropriate updates from either +// polling or immediate responses. +// +//nolint:gocyclo +func (m *mapSession) serveLongPoll() { + // Clean up the session when the client disconnects + defer func() { + m.cancelChMu.Lock() + m.cancelChOpen = false + close(m.cancelCh) + m.cancelChMu.Unlock() + + // only update node status if the node channel was removed. + // in principal, it will be removed, but the client rapidly + // reconnects, the channel might be of another connection. + // In that case, it is not closed and the node is still online. + if m.h.nodeNotifier.RemoveNode(m.node.ID, m.ch) { + // Failover the node's routes if any. + m.h.updateNodeOnlineStatus(false, m.node) + m.pollFailoverRoutes("node closing connection", m.node) + } + + m.infof("node has disconnected, mapSession: %p, chan: %p", m, m.ch) + }() + // From version 68, all streaming requests can be treated as read only. 
+ // TODO: Remove when we drop support for 1.48 if m.capVer < 68 { // Error has been handled/written to client in the func // return @@ -208,26 +233,6 @@ func (m *mapSession) serve() { m.keepAliveTicker = time.NewTicker(m.keepAlive) - // Clean up the session when the client disconnects - defer func() { - m.cancelChMu.Lock() - m.cancelChOpen = false - close(m.cancelCh) - m.cancelChMu.Unlock() - - // only update node status if the node channel was removed. - // in principal, it will be removed, but the client rapidly - // reconnects, the channel might be of another connection. - // In that case, it is not closed and the node is still online. - if m.h.nodeNotifier.RemoveNode(m.node.ID, m.ch) { - // Failover the node's routes if any. - m.h.updateNodeOnlineStatus(false, m.node) - m.pollFailoverRoutes("node closing connection", m.node) - } - - m.infof("node has disconnected, mapSession: %p, chan: %p", m, m.ch) - }() - m.h.nodeNotifier.AddNode(m.node.ID, m.ch) go m.h.updateNodeOnlineStatus(true, m.node) From fa97a869a5075d1f3d59e3b78fde6d495ab53a6f Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Fri, 17 May 2024 12:03:32 -0400 Subject: [PATCH 14/25] up notifier timeout Signed-off-by: Kristoffer Dalby --- hscontrol/types/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hscontrol/types/config.go b/hscontrol/types/config.go index 0b125aa295..11d308ea86 100644 --- a/hscontrol/types/config.go +++ b/hscontrol/types/config.go @@ -233,7 +233,7 @@ func LoadConfig(path string, isFile bool) error { viper.SetDefault("ephemeral_node_inactivity_timeout", "120s") - viper.SetDefault("tuning.notifier_send_timeout", "800ms") + viper.SetDefault("tuning.notifier_send_timeout", "3000ms") viper.SetDefault("tuning.batch_change_delay", "800ms") viper.SetDefault("tuning.node_mapsession_buffered_chan_size", 30) From edcd23b8978c546a0a9c4bb1d572b7d52fdae18f Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Fri, 17 May 2024 12:03:56 -0400 Subject: [PATCH 15/25] 
name integration test metrics by hostname Signed-off-by: Kristoffer Dalby --- integration/hsic/hsic.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/hsic/hsic.go b/integration/hsic/hsic.go index d2ad334cb3..19c8d6515d 100644 --- a/integration/hsic/hsic.go +++ b/integration/hsic/hsic.go @@ -397,7 +397,7 @@ func (t *HeadscaleInContainer) Shutdown() error { ) } - err = t.SaveMetrics("/tmp/control/metrics.txt") + err = t.SaveMetrics(fmt.Sprintf("/tmp/control/%s_metrics.txt", t.hostname)) if err != nil { log.Printf( "Failed to metrics from control: %s", From 9060260280f4c55faa1d1036c9b0e4b1ee0b0142 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Fri, 17 May 2024 12:38:40 -0400 Subject: [PATCH 16/25] fix not setting sendtimeout Signed-off-by: Kristoffer Dalby --- hscontrol/types/config.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hscontrol/types/config.go b/hscontrol/types/config.go index 11d308ea86..ab17cfb0ee 100644 --- a/hscontrol/types/config.go +++ b/hscontrol/types/config.go @@ -233,7 +233,7 @@ func LoadConfig(path string, isFile bool) error { viper.SetDefault("ephemeral_node_inactivity_timeout", "120s") - viper.SetDefault("tuning.notifier_send_timeout", "3000ms") + viper.SetDefault("tuning.notifier_send_timeout", "800ms") viper.SetDefault("tuning.batch_change_delay", "800ms") viper.SetDefault("tuning.node_mapsession_buffered_chan_size", 30) @@ -770,6 +770,7 @@ func GetHeadscaleConfig() (*Config, error) { // TODO(kradalby): Document these settings when more stable Tuning: Tuning{ + NotifierSendTimeout: viper.GetDuration("tuning.notifier_send_timeout"), BatchChangeDelay: viper.GetDuration("tuning.batch_change_delay"), NodeMapSessionBufferedChanSize: viper.GetInt("tuning.node_mapsession_buffered_chan_size"), }, From c76128cea66d3829d3411a8bcdb62f9fa5883862 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Fri, 17 May 2024 13:07:02 -0400 Subject: [PATCH 17/25] add env options for debug deadlock and 
config Signed-off-by: Kristoffer Dalby --- flake.nix | 4 ++-- go.mod | 2 ++ go.sum | 4 ++++ hscontrol/app.go | 17 +++++++++---- hscontrol/notifier/notifier.go | 15 +++++++++++- hscontrol/poll.go | 4 ++-- integration/general_test.go | 44 ++++++++++++++++++---------------- integration/hsic/hsic.go | 4 ++-- 8 files changed, 62 insertions(+), 32 deletions(-) diff --git a/flake.nix b/flake.nix index 94ec6150e7..5d4978ca92 100644 --- a/flake.nix +++ b/flake.nix @@ -30,8 +30,8 @@ checkFlags = ["-short"]; # When updating go.mod or go.sum, a new sha will need to be calculated, - # update this if you have a mismatch after doing a change to those files. - vendorHash = "sha256-wXfKeiJaGe6ahOsONrQhvbuMN8flQ13b0ZjxdbFs1e8="; + # update this if you have a mismatch after doing a change to those files. + vendorHash = "sha256-EorT2AVwA3usly/LcNor6r5UIhLCdj3L4O4ilgTIC2o="; subPackages = ["cmd/headscale"]; diff --git a/go.mod b/go.mod index 0e0e12afb7..e96bcc8a41 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/puzpuzpuz/xsync/v3 v3.1.0 github.com/rs/zerolog v1.32.0 github.com/samber/lo v1.39.0 + github.com/sasha-s/go-deadlock v0.3.1 github.com/spf13/cobra v1.8.0 github.com/spf13/viper v1.18.2 github.com/stretchr/testify v1.9.0 @@ -155,6 +156,7 @@ require ( github.com/opencontainers/image-spec v1.1.0 // indirect github.com/opencontainers/runc v1.1.12 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect diff --git a/go.sum b/go.sum index 309d14e7fc..a534a8e4fa 100644 --- a/go.sum +++ b/go.sum @@ -367,6 +367,8 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= github.com/philip-bui/grpc-zerolog v1.0.1 h1:EMacvLRUd2O1K0eWod27ZP5CY1iTNkhBDLSN+Q4JEvA= github.com/philip-bui/grpc-zerolog v1.0.1/go.mod h1:qXbiq/2X4ZUMMshsqlWyTHOcw7ns+GZmlqZZN05ZHcQ= github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= @@ -423,6 +425,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA= github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= +github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0= +github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM= github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= diff --git a/hscontrol/app.go b/hscontrol/app.go index 3aa6450e18..253c2671b1 100644 --- a/hscontrol/app.go +++ b/hscontrol/app.go @@ -19,6 +19,7 @@ import ( "time" "github.com/coreos/go-oidc/v3/oidc" + "github.com/davecgh/go-spew/spew" "github.com/gorilla/mux" grpcMiddleware "github.com/grpc-ecosystem/go-grpc-middleware" grpcRuntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" @@ -107,10 +108,12 @@ type Headscale struct { } var ( - profilingEnabled = envknob.Bool("HEADSCALE_PROFILING_ENABLED") + 
profilingEnabled = envknob.Bool("HEADSCALE_DEBUG_PROFILING_ENABLED") + profilingPath = envknob.String("HEADSCALE_DEBUG_PROFILING_PATH") tailsqlEnabled = envknob.Bool("HEADSCALE_DEBUG_TAILSQL_ENABLED") tailsqlStateDir = envknob.String("HEADSCALE_DEBUG_TAILSQL_STATE_DIR") tailsqlTSKey = envknob.String("TS_AUTHKEY") + dumpConfig = envknob.Bool("HEADSCALE_DEBUG_DUMP_CONFIG") ) func NewHeadscale(cfg *types.Config) (*Headscale, error) { @@ -498,14 +501,14 @@ func (h *Headscale) createRouter(grpcMux *grpcRuntime.ServeMux) *mux.Router { // Serve launches the HTTP and gRPC server service Headscale and the API. func (h *Headscale) Serve() error { - if _, enableProfile := os.LookupEnv("HEADSCALE_PROFILING_ENABLED"); enableProfile { - if profilePath, ok := os.LookupEnv("HEADSCALE_PROFILING_PATH"); ok { - err := os.MkdirAll(profilePath, os.ModePerm) + if profilingEnabled { + if profilingPath != "" { + err := os.MkdirAll(profilingPath, os.ModePerm) if err != nil { log.Fatal().Err(err).Msg("failed to create profiling directory") } - defer profile.Start(profile.ProfilePath(profilePath)).Stop() + defer profile.Start(profile.ProfilePath(profilingPath)).Stop() } else { defer profile.Start().Stop() } @@ -513,6 +516,10 @@ func (h *Headscale) Serve() error { var err error + if dumpConfig { + spew.Dump(h.cfg) + } + // Fetch an initial DERP Map before we start serving h.DERPMap = derp.GetDERPMap(h.cfg.DERP) h.mapper = mapper.NewMapper(h.db, h.cfg, h.DERPMap, h.nodeNotifier) diff --git a/hscontrol/notifier/notifier.go b/hscontrol/notifier/notifier.go index 358cdc493f..dbae73bc40 100644 --- a/hscontrol/notifier/notifier.go +++ b/hscontrol/notifier/notifier.go @@ -11,12 +11,25 @@ import ( "github.com/juanfont/headscale/hscontrol/types" "github.com/puzpuzpuz/xsync/v3" "github.com/rs/zerolog/log" + "github.com/sasha-s/go-deadlock" + "tailscale.com/envknob" "tailscale.com/tailcfg" "tailscale.com/util/set" ) +var debugDeadlock = envknob.Bool("HEADSCALE_DEBUG_DEADLOCK") +var debugDeadlockTimeout 
= envknob.RegisterDuration("HEADSCALE_DEBUG_DEADLOCK_TIMEOUT") + +func init() { + deadlock.Opts.Disable = !debugDeadlock + if debugDeadlock { + deadlock.Opts.DeadlockTimeout = debugDeadlockTimeout() + deadlock.Opts.PrintAllCurrentGoroutines = true + } +} + type Notifier struct { - l sync.RWMutex + l deadlock.Mutex nodes map[types.NodeID]chan<- types.StateUpdate connected *xsync.MapOf[types.NodeID, bool] b *batcher diff --git a/hscontrol/poll.go b/hscontrol/poll.go index 97e6eccf45..377c7c3885 100644 --- a/hscontrol/poll.go +++ b/hscontrol/poll.go @@ -9,13 +9,13 @@ import ( "net/netip" "sort" "strings" - "sync" "time" "github.com/juanfont/headscale/hscontrol/db" "github.com/juanfont/headscale/hscontrol/mapper" "github.com/juanfont/headscale/hscontrol/types" "github.com/rs/zerolog/log" + "github.com/sasha-s/go-deadlock" xslices "golang.org/x/exp/slices" "gorm.io/gorm" "tailscale.com/tailcfg" @@ -36,7 +36,7 @@ type mapSession struct { capVer tailcfg.CapabilityVersion mapper *mapper.Mapper - cancelChMu sync.Mutex + cancelChMu deadlock.Mutex ch chan types.StateUpdate cancelCh chan struct{} diff --git a/integration/general_test.go b/integration/general_test.go index db9bf83b7c..245e8f096a 100644 --- a/integration/general_test.go +++ b/integration/general_test.go @@ -1,6 +1,7 @@ package integration import ( + "context" "encoding/json" "fmt" "net/netip" @@ -15,6 +16,7 @@ import ( "github.com/rs/zerolog/log" "github.com/samber/lo" "github.com/stretchr/testify/assert" + "golang.org/x/sync/errgroup" "tailscale.com/client/tailscale/apitype" "tailscale.com/types/key" ) @@ -829,24 +831,10 @@ func TestPingAllByIPManyUpDown(t *testing.T) { "user2": len(MustTestVersions), } - headscaleConfig := map[string]string{ - "HEADSCALE_DERP_URLS": "", - "HEADSCALE_DERP_SERVER_ENABLED": "true", - "HEADSCALE_DERP_SERVER_REGION_ID": "999", - "HEADSCALE_DERP_SERVER_REGION_CODE": "headscale", - "HEADSCALE_DERP_SERVER_REGION_NAME": "Headscale Embedded DERP", - 
"HEADSCALE_DERP_SERVER_STUN_LISTEN_ADDR": "0.0.0.0:3478", - "HEADSCALE_DERP_SERVER_PRIVATE_KEY_PATH": "/tmp/derp.key", - - // Envknob for enabling DERP debug logs - "DERP_DEBUG_LOGS": "true", - "DERP_PROBER_DEBUG_LOGS": "true", - } - err = scenario.CreateHeadscaleEnv(spec, []tsic.Option{}, - hsic.WithTestName("pingallbyip"), - hsic.WithConfigEnv(headscaleConfig), + hsic.WithTestName("pingallbyipmany"), + hsic.WithEmbeddedDERPServerOnly(), hsic.WithTLS(), hsic.WithHostnameAsServerURL(), ) @@ -870,19 +858,35 @@ func TestPingAllByIPManyUpDown(t *testing.T) { success := pingAllHelper(t, allClients, allAddrs) t.Logf("%d successful pings out of %d", success, len(allClients)*len(allIps)) + wg, _ := errgroup.WithContext(context.Background()) + for run := range 3 { t.Logf("Starting DownUpPing run %d", run+1) for _, client := range allClients { - t.Logf("taking down %q", client.Hostname()) - client.Down() + c := client + wg.Go(func() error { + t.Logf("taking down %q", c.Hostname()) + return c.Down() + }) + } + + if err := wg.Wait(); err != nil { + t.Fatalf("failed to take down all nodes: %s", err) } time.Sleep(5 * time.Second) for _, client := range allClients { - t.Logf("bringing up %q", client.Hostname()) - client.Up() + c := client + wg.Go(func() error { + t.Logf("bringing up %q", c.Hostname()) + return c.Up() + }) + } + + if err := wg.Wait(); err != nil { + t.Fatalf("failed to take down all nodes: %s", err) } time.Sleep(5 * time.Second) diff --git a/integration/hsic/hsic.go b/integration/hsic/hsic.go index 19c8d6515d..0ea6d2a268 100644 --- a/integration/hsic/hsic.go +++ b/integration/hsic/hsic.go @@ -286,8 +286,8 @@ func New( } env := []string{ - "HEADSCALE_PROFILING_ENABLED=1", - "HEADSCALE_PROFILING_PATH=/tmp/profile", + "HEADSCALE_DEBUG_PROFILING_ENABLED=1", + "HEADSCALE_DEBUG_PROFILING_PATH=/tmp/profile", "HEADSCALE_DEBUG_DUMP_MAPRESPONSE_PATH=/tmp/mapresponses", } for key, value := range hsic.env { From 0cfe0dd485b485a803a3dd4b21332c9f5e760fd4 Mon Sep 17 00:00:00 
2001 From: Kristoffer Dalby Date: Fri, 17 May 2024 15:08:12 -0400 Subject: [PATCH 18/25] add debug to intg test, make high cardin metrics debug flag Signed-off-by: Kristoffer Dalby --- hscontrol/notifier/metrics.go | 13 ++++++++++++- hscontrol/notifier/notifier.go | 25 ++++++++++++++++++++----- integration/hsic/hsic.go | 3 +++ 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/hscontrol/notifier/metrics.go b/hscontrol/notifier/metrics.go index 3294be784f..983e06d363 100644 --- a/hscontrol/notifier/metrics.go +++ b/hscontrol/notifier/metrics.go @@ -3,10 +3,21 @@ package notifier import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "tailscale.com/envknob" ) const prometheusNamespace = "headscale" +var debugHighCardinalityMetrics = envknob.Bool("HEADSCALE_DEBUG_HIGH_CARDINALITY_METRICS") + +func init() { + notifierUpdateSent = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: prometheusNamespace, + Name: "notifier_update_sent_total", + Help: "total count of update sent on nodes channel", + }, []string{"status", "type", "trigger", "id"}) +} + var ( notifierWaitersForLock = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: prometheusNamespace, @@ -23,7 +34,7 @@ var ( Namespace: prometheusNamespace, Name: "notifier_update_sent_total", Help: "total count of update sent on nodes channel", - }, []string{"status", "type", "trigger", "id"}) + }, []string{"status", "type", "trigger"}) notifierUpdateReceived = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: prometheusNamespace, Name: "notifier_update_received_total", diff --git a/hscontrol/notifier/notifier.go b/hscontrol/notifier/notifier.go index dbae73bc40..ee656b0d6c 100644 --- a/hscontrol/notifier/notifier.go +++ b/hscontrol/notifier/notifier.go @@ -177,12 +177,20 @@ func (n *Notifier) NotifyByNodeID( Any("origin", types.NotifyOriginKey.Value(ctx)). Any("origin-hostname", types.NotifyHostnameKey.Value(ctx)). 
Msgf("update not sent, context cancelled") - notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), types.NotifyOriginKey.Value(ctx), nodeID.String()).Inc() + if debugHighCardinalityMetrics { + notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), types.NotifyOriginKey.Value(ctx), nodeID.String()).Inc() + } else { + notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), types.NotifyOriginKey.Value(ctx)).Inc() + } return case c <- update: n.tracef(nodeID, "update successfully sent on chan, origin: %s, origin-hostname: %s", ctx.Value("origin"), ctx.Value("hostname")) - notifierUpdateSent.WithLabelValues("ok", update.Type.String(), types.NotifyOriginKey.Value(ctx), nodeID.String()).Inc() + if debugHighCardinalityMetrics { + notifierUpdateSent.WithLabelValues("ok", update.Type.String(), types.NotifyOriginKey.Value(ctx), nodeID.String()).Inc() + } else { + notifierUpdateSent.WithLabelValues("ok", update.Type.String(), types.NotifyOriginKey.Value(ctx)).Inc() + } } } } @@ -204,12 +212,19 @@ func (n *Notifier) sendAll(update types.StateUpdate) { Err(ctx.Err()). Uint64("node.id", id.Uint64()). 
Msgf("update not sent, context cancelled") - notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), "send-all", id.String()).Inc() + if debugHighCardinalityMetrics { + notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), "send-all", id.String()).Inc() + } else { + notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), "send-all").Inc() + } return case c <- update: - notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all", id.String()).Inc() - n.tracef(id, "DONE SENDING TO NODE, CHAN: %p", c) + if debugHighCardinalityMetrics { + notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all", id.String()).Inc() + } else { + notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all").Inc() + } } } } diff --git a/integration/hsic/hsic.go b/integration/hsic/hsic.go index 0ea6d2a268..2dfee668db 100644 --- a/integration/hsic/hsic.go +++ b/integration/hsic/hsic.go @@ -289,6 +289,9 @@ func New( "HEADSCALE_DEBUG_PROFILING_ENABLED=1", "HEADSCALE_DEBUG_PROFILING_PATH=/tmp/profile", "HEADSCALE_DEBUG_DUMP_MAPRESPONSE_PATH=/tmp/mapresponses", + "HEADSCALE_DEBUG_DEADLOCK=1", + "HEADSCALE_DEBUG_DEADLOCK_TIMEOUT=5s", + "HEADSCALE_DEBUG_HIGH_CARDINALITY_METRICS=1", } for key, value := range hsic.env { env = append(env, fmt.Sprintf("%s=%s", key, value)) From 516da1c7916492969e03fa1b8d2acb06fc9aa3c6 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Fri, 17 May 2024 15:12:34 -0400 Subject: [PATCH 19/25] change how conditional metric is registered Signed-off-by: Kristoffer Dalby --- hscontrol/notifier/metrics.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/hscontrol/notifier/metrics.go b/hscontrol/notifier/metrics.go index 983e06d363..8a7a8839f0 100644 --- a/hscontrol/notifier/metrics.go +++ b/hscontrol/notifier/metrics.go @@ -10,12 +10,22 @@ const prometheusNamespace = "headscale" var debugHighCardinalityMetrics = 
envknob.Bool("HEADSCALE_DEBUG_HIGH_CARDINALITY_METRICS") +var notifierUpdateSent *prometheus.CounterVec + func init() { - notifierUpdateSent = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: prometheusNamespace, - Name: "notifier_update_sent_total", - Help: "total count of update sent on nodes channel", - }, []string{"status", "type", "trigger", "id"}) + if debugHighCardinalityMetrics { + notifierUpdateSent = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: prometheusNamespace, + Name: "notifier_update_sent_total", + Help: "total count of update sent on nodes channel", + }, []string{"status", "type", "trigger", "id"}) + } else { + notifierUpdateSent = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: prometheusNamespace, + Name: "notifier_update_sent_total", + Help: "total count of update sent on nodes channel", + }, []string{"status", "type", "trigger"}) + } } var ( @@ -30,11 +40,6 @@ var ( Help: "histogram of time spent waiting for the notifier lock", Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.5, 1, 3, 5, 10}, }, []string{"action"}) - notifierUpdateSent = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: prometheusNamespace, - Name: "notifier_update_sent_total", - Help: "total count of update sent on nodes channel", - }, []string{"status", "type", "trigger"}) notifierUpdateReceived = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: prometheusNamespace, Name: "notifier_update_received_total", From 5e5b443c69e850160b7d72718476b1c16f8ec3d2 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Fri, 17 May 2024 15:56:05 -0400 Subject: [PATCH 20/25] comment on sendall timeout Signed-off-by: Kristoffer Dalby --- hscontrol/notifier/notifier.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/hscontrol/notifier/notifier.go b/hscontrol/notifier/notifier.go index ee656b0d6c..483c3f3744 100644 --- a/hscontrol/notifier/notifier.go +++ b/hscontrol/notifier/notifier.go @@ -204,6 +204,13 @@ func (n *Notifier) 
sendAll(update types.StateUpdate) { notifierWaitForLock.WithLabelValues("send-all").Observe(time.Since(start).Seconds()) for id, c := range n.nodes { + // Whenever an update is sent to all nodes, there is a chance that the node + // has disconnected and the goroutine that was supposed to consume the update + // has shut down the channel and is waiting for the lock held here in RemoveNode. + // This means that there is potential for a deadlock which would stop all updates + // going out to clients. This timeout prevents that from happening by moving on to the + // next node if the context is cancelled. After sendAll releases the lock, the add/remove + // call will succeed and the update will go to the correct nodes on the next call. ctx, cancel := context.WithTimeout(context.Background(), n.cfg.Tuning.NotifierSendTimeout) defer cancel() select { From c03497dd88a23a45411c5f3ca6e1190edeb09211 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Fri, 17 May 2024 16:03:24 -0400 Subject: [PATCH 21/25] dump config in integ test Signed-off-by: Kristoffer Dalby --- integration/hsic/hsic.go | 1 + 1 file changed, 1 insertion(+) diff --git a/integration/hsic/hsic.go b/integration/hsic/hsic.go index 2dfee668db..5b55a0a8ab 100644 --- a/integration/hsic/hsic.go +++ b/integration/hsic/hsic.go @@ -292,6 +292,7 @@ func New( "HEADSCALE_DEBUG_DEADLOCK=1", "HEADSCALE_DEBUG_DEADLOCK_TIMEOUT=5s", "HEADSCALE_DEBUG_HIGH_CARDINALITY_METRICS=1", + "HEADSCALE_DEBUG_DUMP_CONFIG=1", } for key, value := range hsic.env { env = append(env, fmt.Sprintf("%s=%s", key, value)) From e9c6687e38738af6aca5502faeeb7c33c87a1ba3 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Fri, 17 May 2024 16:14:05 -0400 Subject: [PATCH 22/25] add debug metric for last sent to node Signed-off-by: Kristoffer Dalby --- hscontrol/metrics.go | 15 +++++++++++++++ hscontrol/poll.go | 6 ++++++ 2 files changed, 21 insertions(+) diff --git a/hscontrol/metrics.go b/hscontrol/metrics.go index 0e18ed3464..835a6aac0a 100644 
--- a/hscontrol/metrics.go +++ b/hscontrol/metrics.go @@ -7,8 +7,23 @@ import ( "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "tailscale.com/envknob" ) +var debugHighCardinalityMetrics = envknob.Bool("HEADSCALE_DEBUG_HIGH_CARDINALITY_METRICS") + +var mapResponseLastSentSeconds *prometheus.GaugeVec + +func init() { + if debugHighCardinalityMetrics { + mapResponseLastSentSeconds = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: prometheusNamespace, + Name: "mapresponse_last_sent_seconds", + Help: "last sent metric to node.id", + }, []string{"type", "id"}) + } +} + const prometheusNamespace = "headscale" var ( diff --git a/hscontrol/poll.go b/hscontrol/poll.go index 377c7c3885..557a5cd4e9 100644 --- a/hscontrol/poll.go +++ b/hscontrol/poll.go @@ -344,6 +344,9 @@ func (m *mapSession) serveLongPoll() { log.Trace().Str("node", m.node.Hostname).TimeDiff("timeSpent", time.Now(), startWrite).Str("mkey", m.node.MachineKey.String()).Msg("finished writing mapresp to node") + if debugHighCardinalityMetrics { + mapResponseLastSentSeconds.WithLabelValues(updateType, m.node.ID.String()).Set(float64(time.Now().Unix())) + } mapResponseSent.WithLabelValues("ok", updateType).Inc() m.tracef("update sent") m.resetKeepAlive() @@ -369,6 +372,9 @@ func (m *mapSession) serveLongPoll() { return } + if debugHighCardinalityMetrics { + mapResponseLastSentSeconds.WithLabelValues("keepalive", m.node.ID.String()).Set(float64(time.Now().Unix())) + } mapResponseSent.WithLabelValues("ok", "keepalive").Inc() } } From 00aa63cb0ee89c3e86a794053127372d21e1cbcf Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Mon, 20 May 2024 16:36:45 +0100 Subject: [PATCH 23/25] add 1.66 to testing Signed-off-by: Kristoffer Dalby --- integration/scenario.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/integration/scenario.go b/integration/scenario.go index 83ee4495d5..bd004247dc 100644 --- 
a/integration/scenario.go +++ b/integration/scenario.go @@ -51,10 +51,11 @@ var ( tailscaleVersions2021 = map[string]bool{ "head": true, "unstable": true, - "1.64": true, // CapVer: 82 - "1.62": true, // CapVer: 82 - "1.60": true, // CapVer: 82 - "1.58": true, // CapVer: 82 + "1.66": true, // CapVer: not checked + "1.64": true, // CapVer: not checked + "1.62": true, // CapVer: not checked + "1.60": true, // CapVer: not checked + "1.58": true, // CapVer: not checked "1.56": true, // CapVer: 82 "1.54": true, // CapVer: 79 "1.52": true, // CapVer: 79 From 00470081e893a449195372dc829d0fe484df9011 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Tue, 21 May 2024 07:54:49 +0100 Subject: [PATCH 24/25] fix issue where new clients are not always setting NetInfo Signed-off-by: Kristoffer Dalby --- hscontrol/poll.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/hscontrol/poll.go b/hscontrol/poll.go index 557a5cd4e9..d3c8211769 100644 --- a/hscontrol/poll.go +++ b/hscontrol/poll.go @@ -442,6 +442,17 @@ func (m *mapSession) handleEndpointUpdate() { m.node.ApplyPeerChange(&change) sendUpdate, routesChanged := hostInfoChanged(m.node.Hostinfo, m.req.Hostinfo) + + // The node might not set NetInfo if it has not changed and if + // the full HostInfo object is overwritten, the information is lost. + // If there is no NetInfo, keep the previous one. + // From 1.66 the client only sends it if changed: + // https://github.com/tailscale/tailscale/commit/e1011f138737286ecf5123ff887a7a5800d129a2 + // TODO(kradalby): evaluate if we need better comparing of hostinfo + // before we take the changes. 
+ if m.req.Hostinfo.NetInfo == nil { + m.req.Hostinfo.NetInfo = m.node.Hostinfo.NetInfo + } m.node.Hostinfo = m.req.Hostinfo logTracePeerChange(m.node.Hostname, sendUpdate, &change) From a49755e1aba88e9d9e41a8c687600093f96c5356 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Tue, 21 May 2024 08:02:54 +0100 Subject: [PATCH 25/25] replace docker head file with upstream and bisect note Signed-off-by: Kristoffer Dalby --- Dockerfile.tailscale-HEAD | 50 ++++++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/Dockerfile.tailscale-HEAD b/Dockerfile.tailscale-HEAD index 83ff9fe530..f78d687a40 100644 --- a/Dockerfile.tailscale-HEAD +++ b/Dockerfile.tailscale-HEAD @@ -1,21 +1,43 @@ -# This Dockerfile and the images produced are for testing headscale, -# and are in no way endorsed by Headscale's maintainers as an -# official nor supported release or distribution. +# Copyright (c) Tailscale Inc & AUTHORS +# SPDX-License-Identifier: BSD-3-Clause -FROM golang:latest +# This Dockerfile is more or less lifted from tailscale/tailscale +# to ensure a similar build process when testing the HEAD of tailscale. -RUN apt-get update \ - && apt-get install -y dnsutils git iptables ssh ca-certificates \ - && rm -rf /var/lib/apt/lists/* +FROM golang:1.22-alpine AS build-env -RUN useradd --shell=/bin/bash --create-home ssh-it-user +WORKDIR /go/src +RUN apk add --no-cache git + +# Replace `RUN git...` with `COPY` and a local checked out version of Tailscale in `./tailscale` +# to test specific commits of the Tailscale client. This is useful when trying to find out why +# something specific broke between two versions of Tailscale with for example `git bisect`. +# COPY ./tailscale . 
RUN git clone https://github.com/tailscale/tailscale.git -WORKDIR /go/tailscale +WORKDIR /go/src/tailscale + + +# see build_docker.sh +ARG VERSION_LONG="" +ENV VERSION_LONG=$VERSION_LONG +ARG VERSION_SHORT="" +ENV VERSION_SHORT=$VERSION_SHORT +ARG VERSION_GIT_HASH="" +ENV VERSION_GIT_HASH=$VERSION_GIT_HASH +ARG TARGETARCH + +RUN GOARCH=$TARGETARCH go install -ldflags="\ + -X tailscale.com/version.longStamp=$VERSION_LONG \ + -X tailscale.com/version.shortStamp=$VERSION_SHORT \ + -X tailscale.com/version.gitCommitStamp=$VERSION_GIT_HASH" \ + -v ./cmd/tailscale ./cmd/tailscaled ./cmd/containerboot + +FROM alpine:3.18 +RUN apk add --no-cache ca-certificates iptables iproute2 ip6tables curl -RUN git checkout main \ - && sh build_dist.sh tailscale.com/cmd/tailscale \ - && sh build_dist.sh tailscale.com/cmd/tailscaled \ - && cp tailscale /usr/local/bin/ \ - && cp tailscaled /usr/local/bin/ +COPY --from=build-env /go/bin/* /usr/local/bin/ +# For compat with the previous run.sh, although ideally you should be +# using build_docker.sh which sets an entrypoint for the image. +RUN mkdir /tailscale && ln -s /usr/local/bin/containerboot /tailscale/run.sh