diff --git a/hscontrol/notifier/notifier.go b/hscontrol/notifier/notifier.go index ed77c40e2c..f7e8b46da0 100644 --- a/hscontrol/notifier/notifier.go +++ b/hscontrol/notifier/notifier.go @@ -162,9 +162,21 @@ func (n *Notifier) sendAll(update types.StateUpdate) { notifierWaitersForLock.WithLabelValues("rlock", "send-all").Dec() notifierWaitForLock.WithLabelValues("send-all").Observe(time.Since(start).Seconds()) - for _, c := range n.nodes { - c <- update - notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all").Inc() + for id, c := range n.nodes { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + select { + case <-ctx.Done(): + log.Error(). + Err(ctx.Err()). + Uint64("node.id", id.Uint64()). + Msgf("update not sent, context cancelled") + notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), "send-all").Inc() + + return + case c <- update: + notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all").Inc() + } } } diff --git a/hscontrol/poll.go b/hscontrol/poll.go index 7df4f9e8b0..02905372ae 100644 --- a/hscontrol/poll.go +++ b/hscontrol/poll.go @@ -233,11 +233,12 @@ func (m *mapSession) serve() { case <-m.cancelCh: m.tracef("poll cancelled received") return + case <-ctx.Done(): m.tracef("poll context done") return - // Consume all updates sent to node + // Consume updates sent to node case update := <-m.ch: m.tracef("received stream update: %s %s", update.Type.String(), update.Message) mapResponseUpdateReceived.WithLabelValues(update.Type.String()).Inc() @@ -304,8 +305,6 @@ func (m *mapSession) serve() { return } - // log.Trace().Str("node", m.node.Hostname).TimeDiff("timeSpent", time.Now(), startMapResp).Str("mkey", m.node.MachineKey.String()).Int("type", int(update.Type)).Msg("finished making map response") - // Only send update if there is change if data != nil { startWrite := time.Now()