From 5c1b358e22e02f2936fddfd6433845259870f6fc Mon Sep 17 00:00:00 2001 From: sukun Date: Thu, 8 Jun 2023 14:13:00 +0530 Subject: [PATCH 1/2] swarm: implement blackhole detection --- dashboards/swarm/swarm.json | 237 ++++++++++++++++++++++ p2p/net/swarm/black_hole_detector.go | 208 +++++++++++++++++++ p2p/net/swarm/black_hole_detector_test.go | 171 ++++++++++++++++ p2p/net/swarm/dial_worker.go | 7 +- p2p/net/swarm/swarm.go | 26 +++ p2p/net/swarm/swarm_dial.go | 15 ++ p2p/net/swarm/swarm_dial_test.go | 54 +++++ p2p/net/swarm/swarm_metrics.go | 42 ++++ p2p/net/swarm/swarm_metrics_test.go | 11 + 9 files changed, 768 insertions(+), 3 deletions(-) create mode 100644 p2p/net/swarm/black_hole_detector.go create mode 100644 p2p/net/swarm/black_hole_detector_test.go diff --git a/dashboards/swarm/swarm.json b/dashboards/swarm/swarm.json index 3a1d875059..0c41bee76e 100644 --- a/dashboards/swarm/swarm.json +++ b/dashboards/swarm/swarm.json @@ -35,6 +35,18 @@ "name": "Prometheus", "version": "1.0.0" }, + { + "type": "panel", + "id": "stat", + "name": "Stat", + "version": "" + }, + { + "type": "panel", + "id": "state-timeline", + "name": "State timeline", + "version": "" + }, { "type": "panel", "id": "timeseries", @@ -3026,8 +3038,233 @@ ], "title": "Dials per connection", "type": "piechart" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 84 + }, + "id": 44, + "panels": [], + "title": "Black Hole Detection", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "fixed" + }, + "custom": { + "fillOpacity": 76, + "lineWidth": 0, + "spanNulls": true + }, + "mappings": [ + { + "options": { + "0": { + "color": "green", + "index": 0, + "text": "Allowed" + }, + "1": { + "color": "purple", + "index": 1, + "text": "Blocked" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null 
+ } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 24, + "x": 0, + "y": 85 + }, + "id": 46, + "options": { + "alignValue": "center", + "legend": { + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "mergeValues": true, + "rowHeight": 0.9, + "showValue": "always", + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "libp2p_swarm_black_hole_filter_state{instance=~\"$instance\"}", + "legendFormat": "{{instance}} {{name}}", + "range": true, + "refId": "A" + } + ], + "title": "Black Hole Filter State", + "type": "state-timeline" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "fixedColor": "purple", + "mode": "fixed" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 91 + }, + "id": 49, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "value_and_name" + }, + "pluginVersion": "9.3.6", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "libp2p_swarm_black_hole_filter_next_request_allowed_after{instance=~\"$instance\"}", + "legendFormat": "{{instance}}: {{name}}", + "range": true, + "refId": "A" + } + ], + "title": "Black Hole Filter Requests Till Next Probe", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "max": 10, + "min": 0, + "thresholds": { + "mode": 
"absolute", + "steps": [ + { + "color": "purple", + "value": null + }, + { + "color": "green", + "value": 1 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 91 + }, + "id": 47, + "options": { + "orientation": "vertical", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showThresholdLabels": false, + "showThresholdMarkers": true + }, + "pluginVersion": "9.3.6", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "exemplar": false, + "expr": "libp2p_swarm_black_hole_filter_success_fraction{instance=~\"$instance\"} * 100", + "instant": true, + "legendFormat": "{{instance}} {{name}}", + "range": false, + "refId": "A" + } + ], + "title": "Black Hole Filter Success Percentage", + "type": "gauge" } ], + "refresh": false, "schemaVersion": 37, "style": "dark", "tags": [], diff --git a/p2p/net/swarm/black_hole_detector.go b/p2p/net/swarm/black_hole_detector.go new file mode 100644 index 0000000000..3c63868ac1 --- /dev/null +++ b/p2p/net/swarm/black_hole_detector.go @@ -0,0 +1,208 @@ +package swarm + +import ( + "sync" + + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +type outcome int + +const ( + outcomeSuccess outcome = iota + outcomeFailed +) + +type blackHoleState int + +const ( + blackHoleStateAllowed blackHoleState = iota + blackHoleStateBlocked +) + +// blackHoleFilter provides black hole filtering logic for dials. On detecting a black holed +// network environment, subsequent dials are blocked and only 1 dial every n requests is allowed. +// This should be used in conjunction with an UDP or IPv6 address filter to detect UDP or +// IPv6 black hole. +// Requests are blocked if the success fraction in the last n outcomes is less than +// minSuccessFraction. 
If a request succeeds in Blocked state, the filter state is reset and n +// subsequent requests are allowed before reevaluating black hole status. Evaluating over n +// outcomes avoids situations where a dial was cancelled because a competing dial succeeded, +// the address was unreachable, and other false negatives. +type blackHoleFilter struct { + // n is the minimum number of completed dials required before we start blocking. + // Every nth request is allowed irrespective of the status of the detector. + n int + // minSuccessFraction is the minimum success fraction required to allow dials. + minSuccessFraction float64 + // name for the detector. + name string + + // requests counts number of dial requests up to n. Resets to 0 every nth request. + requests int + // outcomes of the last `n` allowed dials + outcomes []outcome + // successes is the count of successful dials in outcomes + successes int + // failures is the count of failed dials in outcomes + failures int + // state is the current state of the detector + state blackHoleState + + mu sync.Mutex + metricsTracer MetricsTracer +} + +// RecordOutcome records the outcome of a dial. A successful dial will change the state +// of the filter to Allowed. A failed dial only blocks subsequent requests if the success +// fraction over the last n outcomes is less than the minSuccessFraction of the filter. +func (b *blackHoleFilter) RecordOutcome(success bool) { + if b == nil { + return + } + + b.mu.Lock() + defer b.mu.Unlock() + + if b.state == blackHoleStateBlocked && success { + // If the call succeeds in a blocked state we reset to allowed. + // This is better than slowly accumulating values till we cross the minSuccessFraction + // threshold since a blackhole is a binary property. 
+ b.reset() + return + } + + if success { + b.successes++ + b.outcomes = append(b.outcomes, outcomeSuccess) + } else { + b.failures++ + b.outcomes = append(b.outcomes, outcomeFailed) + } + + if len(b.outcomes) > b.n { + if b.outcomes[0] == outcomeSuccess { + b.successes-- + } else { + b.failures-- + } + b.outcomes = b.outcomes[1 : b.n+1] + } + + b.updateState() + b.trackMetrics() +} + +func (b *blackHoleFilter) IsAllowed() (state blackHoleState, isAllowed bool) { + if b == nil { + return blackHoleStateAllowed, true + } + + b.mu.Lock() + defer b.mu.Unlock() + b.requests++ + if b.requests == b.n { + b.requests = 0 + } + b.trackMetrics() + return b.state, (b.state == blackHoleStateAllowed) || (b.requests == 0) +} + +func (b *blackHoleFilter) reset() { + b.successes = 0 + b.failures = 0 + b.outcomes = b.outcomes[:0] + b.updateState() +} + +func (b *blackHoleFilter) updateState() { + st := b.state + successFraction := 0.0 + if len(b.outcomes) < b.n { + b.state = blackHoleStateAllowed + } else { + successFraction = float64(b.successes) / float64(b.successes+b.failures) + if successFraction >= b.minSuccessFraction { + b.state = blackHoleStateAllowed + } else { + b.state = blackHoleStateBlocked + } + } + if st != b.state { + if b.state == blackHoleStateAllowed { + log.Debugf("%s blackHoleDetector state changed to Allowed", b.name) + } else { + log.Debugf("%s blackHoleDetector state changed to Blocked. Success fraction is %0.3f", b.name, successFraction) + } + } +} + +func (b *blackHoleFilter) trackMetrics() { + if b.metricsTracer == nil { + return + } + successFraction := 0.0 + if b.successes+b.failures != 0 { + successFraction = float64(b.successes) / float64(b.successes+b.failures) + } + b.metricsTracer.UpdatedBlackHoleFilterState( + b.name, + b.state, + b.n-b.requests, + successFraction, + ) +} + +// blackHoleDetector provides UDP and IPv6 black hole detection using a `blackHoleFilter` +// for each. 
For details of the black hole detection logic see `blackHoleFilter` +type blackHoleDetector struct { + udp, ipv6 *blackHoleFilter +} + +func (d *blackHoleDetector) IsAllowed(addr ma.Multiaddr) bool { + if !manet.IsPublicAddr(addr) { + return true + } + + udpState, udpAllowed := blackHoleStateAllowed, true + if d.udp != nil && isProtocolAddr(addr, ma.P_UDP) { + udpState, udpAllowed = d.udp.IsAllowed() + } + + ipv6State, ipv6Allowed := blackHoleStateAllowed, true + if d.ipv6 != nil && isProtocolAddr(addr, ma.P_IP6) { + ipv6State, ipv6Allowed = d.ipv6.IsAllowed() + } + + // Allow all probes irrespective of the state of the other filter + if (udpState == blackHoleStateBlocked && udpAllowed) || + (ipv6State == blackHoleStateBlocked && ipv6Allowed) { + return true + } + return (udpAllowed && ipv6Allowed) +} + +// RecordOutcome updates the state of the relevant `blackHoleFilter` for addr +func (d *blackHoleDetector) RecordOutcome(addr ma.Multiaddr, success bool) { + if !manet.IsPublicAddr(addr) { + return + } + if d.udp != nil && isProtocolAddr(addr, ma.P_UDP) { + d.udp.RecordOutcome(success) + } + if d.ipv6 != nil && isProtocolAddr(addr, ma.P_IP6) { + d.ipv6.RecordOutcome(success) + } +} + +func newBlackHoleDetector(detectUDP, detectIPv6 bool, mt MetricsTracer) *blackHoleDetector { + d := &blackHoleDetector{} + if detectUDP { + d.udp = &blackHoleFilter{n: 100, minSuccessFraction: 0.01, name: "UDP", metricsTracer: mt} + } + if detectIPv6 { + d.ipv6 = &blackHoleFilter{n: 100, minSuccessFraction: 0.01, name: "IPv6", metricsTracer: mt} + } + return d +} diff --git a/p2p/net/swarm/black_hole_detector_test.go b/p2p/net/swarm/black_hole_detector_test.go new file mode 100644 index 0000000000..e53a800241 --- /dev/null +++ b/p2p/net/swarm/black_hole_detector_test.go @@ -0,0 +1,171 @@ +package swarm + +import ( + "testing" + + ma "github.com/multiformats/go-multiaddr" +) + +func TestBlackHoleFilterReset(t *testing.T) { + n := 10 + bhd := &blackHoleFilter{n: n, minSuccessFraction: 
0.05, name: "test"} + var i = 0 + // calls up to threshold should be allowed + for i = 1; i <= n; i++ { + if _, isAllowed := bhd.IsAllowed(); !isAllowed { + t.Fatalf("expected calls up to minDials to be allowed") + } + bhd.RecordOutcome(false) + } + // after threshold calls every nth call should be allowed + for i = n + 1; i < 42; i++ { + _, isAllowed := bhd.IsAllowed() + if (i%n == 0 && !isAllowed) || (i%n != 0 && isAllowed) { + t.Fatalf("expected every nth dial to be allowed") + } + } + + bhd.RecordOutcome(true) + // check if calls up to threshold are allowed after success + for i = 0; i < n; i++ { + if _, isAllowed := bhd.IsAllowed(); !isAllowed { + t.Fatalf("expected black hole detector state to reset after success") + } + bhd.RecordOutcome(false) + } + + // next call should be refused + if _, isAllowed := bhd.IsAllowed(); isAllowed { + t.Fatalf("expected dial to be blocked") + } +} + +func TestBlackHoleDetector(t *testing.T) { + n := 10 + bhd := &blackHoleFilter{n: n, minSuccessFraction: 0.4, name: "test"} + var i = 0 + // 5 success and 5 fails + for i = 1; i <= 5; i++ { + bhd.RecordOutcome(true) + } + for i = 1; i <= 5; i++ { + bhd.RecordOutcome(false) + } + + if _, isAllowed := bhd.IsAllowed(); !isAllowed { + t.Fatalf("expected dial to be allowed") + } + // 4 success and 6 fails + bhd.RecordOutcome(false) + + if _, isAllowed := bhd.IsAllowed(); !isAllowed { + t.Fatalf("expected dial to be allowed") + } + // 3 success and 7 fails + bhd.RecordOutcome(false) + + // should be blocked + if _, isAllowed := bhd.IsAllowed(); isAllowed { + t.Fatalf("expected dial to be blocked") + } + + bhd.RecordOutcome(true) + // 5 success and 5 fails + for i = 1; i <= 5; i++ { + bhd.RecordOutcome(true) + } + for i = 1; i <= 5; i++ { + bhd.RecordOutcome(false) + } + + if _, isAllowed := bhd.IsAllowed(); !isAllowed { + t.Fatalf("expected dial to be allowed") + } + // 4 success and 6 fails + bhd.RecordOutcome(false) + + if _, isAllowed := bhd.IsAllowed(); !isAllowed { + 
t.Fatalf("expected dial to be allowed") + } + // 3 success and 7 fails + bhd.RecordOutcome(false) + + // should be blocked + if _, isAllowed := bhd.IsAllowed(); isAllowed { + t.Fatalf("expected dial to be blocked") + } + +} + +func TestBlackHoleDetectorInApplicableAddress(t *testing.T) { + bhd := newBlackHoleDetector(true, true, nil) + addr := ma.StringCast("/ip4/127.0.0.1/tcp/1234") + for i := 0; i < 1000; i++ { + if !bhd.IsAllowed(addr) { + t.Fatalf("expect dials to inapplicable address to always be allowed") + } + bhd.RecordOutcome(addr, false) + } +} + +func TestBlackHoleDetectorUDP(t *testing.T) { + bhd := newBlackHoleDetector(true, true, nil) + addr := ma.StringCast("/ip4/1.2.3.4/udp/1234") + for i := 0; i < 100; i++ { + bhd.RecordOutcome(addr, false) + } + if bhd.IsAllowed(addr) { + t.Fatalf("expect dial to be be blocked") + } + + bhd = newBlackHoleDetector(false, true, nil) + for i := 0; i < 100; i++ { + bhd.RecordOutcome(addr, false) + } + if !bhd.IsAllowed(addr) { + t.Fatalf("expected dial to be be allowed when UDP detection is disabled") + } +} + +func TestBlackHoleDetectorIPv6(t *testing.T) { + bhd := newBlackHoleDetector(true, true, nil) + addr := ma.StringCast("/ip6/1::1/tcp/1234") + for i := 0; i < 100; i++ { + bhd.RecordOutcome(addr, false) + } + if bhd.IsAllowed(addr) { + t.Fatalf("expect dial to be be blocked") + } + + bhd = newBlackHoleDetector(true, false, nil) + for i := 0; i < 100; i++ { + bhd.RecordOutcome(addr, false) + } + if !bhd.IsAllowed(addr) { + t.Fatalf("expected dial to be be allowed when IPv6 detection is disabled") + } +} + +func TestBlackHoleDetectorProbes(t *testing.T) { + bhd := &blackHoleDetector{ + udp: &blackHoleFilter{n: 2, minSuccessFraction: 0.5}, + ipv6: &blackHoleFilter{n: 3, minSuccessFraction: 0.5}, + } + udp6Addr := ma.StringCast("/ip6/1::1/udp/1234/quic-v1") + for i := 0; i < 3; i++ { + bhd.RecordOutcome(udp6Addr, false) + } + for i := 1; i < 100; i++ { + isAllowed := bhd.IsAllowed(udp6Addr) + if i%2 == 0 || i%3 == 0 
{ + if !isAllowed { + t.Fatalf("expected probe to be allowed irrespective of the state of other black hole filter") + } + } else { + if isAllowed { + t.Fatalf("expected dial to be blocked") + } + } + } + +} diff --git a/p2p/net/swarm/dial_worker.go b/p2p/net/swarm/dial_worker.go index 5688494f49..a8252c9680 100644 --- a/p2p/net/swarm/dial_worker.go +++ b/p2p/net/swarm/dial_worker.go @@ -295,8 +295,8 @@ loop: ad.dialRankingDelay = now.Sub(ad.createdAt) err := w.s.dialNextAddr(ad.ctx, w.peer, ad.addr, w.resch) if err != nil { - // the actual dial happens in a different go routine. An err here - // only happens in case of backoff. handle that. + // Errored without attempting a dial. This happens in case of + // backoff or black hole. w.dispatchError(ad, err) } else { // the dial was successful. update inflight dials @@ -358,7 +358,8 @@ loop: } // it must be an error -- add backoff if applicable and dispatch - if res.Err != context.Canceled && !w.connected { + // ErrDialRefusedBlackHole shouldn't end up here, just a safety check + if res.Err != ErrDialRefusedBlackHole && res.Err != context.Canceled && !w.connected { // we only add backoff if there has not been a successful connection // for consistency with the old dialer behavior. 
w.s.backf.AddBackoff(w.peer, res.Addr) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 0bffab99a2..65f3294837 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -108,6 +108,24 @@ func WithDialRanker(d network.DialRanker) Option { } } +// WithNoIPv6BlackHoleDetection configures swarm to not do any black hole detection for +// IPv6 addresses +func WithNoIPv6BlackHoleDetection() Option { + return func(s *Swarm) error { + s.disableIPv6BlackHoleDetection = true + return nil + } +} + +// WithNoUDPBlackHoleDetection configures swarm to not do any black hole detection for +// UDP addresses +func WithNoUDPBlackHoleDetection() Option { + return func(s *Swarm) error { + s.disableUDPBlackHoleDetection = true + return nil + } +} + // Swarm is a connection muxer, allowing connections to other peers to // be opened and closed, while still using the same Chan for all // communication. The Chan sends/receives Messages, which note the @@ -173,6 +191,10 @@ type Swarm struct { metricsTracer MetricsTracer dialRanker network.DialRanker + + disableIPv6BlackHoleDetection bool + disableUDPBlackHoleDetection bool + bhd *blackHoleDetector } // NewSwarm constructs a Swarm. 
@@ -209,8 +231,12 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts } s.dsync = newDialSync(s.dialWorkerLoop) + s.limiter = newDialLimiter(s.dialAddr) s.backf.init(s.ctx) + + s.bhd = newBlackHoleDetector(!s.disableUDPBlackHoleDetection, !s.disableIPv6BlackHoleDetection, s.metricsTracer) + return s, nil } diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index f0c941320e..91b3e866a9 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -39,6 +39,9 @@ var ( // been dialed too frequently ErrDialBackoff = errors.New("dial backoff") + // ErrDialRefusedBlackHole is returned when we are in a black holed environment + ErrDialRefusedBlackHole = errors.New("dial refused because of black hole") + // ErrDialToSelf is returned if we attempt to dial our own peer ErrDialToSelf = errors.New("dial to self attempted") @@ -393,6 +396,15 @@ func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, } } + // Check if dial to the address is black holed. + // This is the best place to have this check since we want to ensure that periodic + // probes allowed by the black hole detector are actually dialed. If this check is + // done before the dial prioritisation logic, we might not dial the address because + // a higher priority address succeeded. 
+ if !s.bhd.IsAllowed(addr) { + return ErrDialRefusedBlackHole + } + // start the dial s.limitedDial(ctx, p, addr, resch) @@ -484,6 +496,9 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (tra start := time.Now() connC, err := tpt.Dial(ctx, addr, p) + + s.bhd.RecordOutcome(addr, err == nil) + if err != nil { if s.metricsTracer != nil { s.metricsTracer.FailedDialing(addr, err) diff --git a/p2p/net/swarm/swarm_dial_test.go b/p2p/net/swarm/swarm_dial_test.go index f22144ee8c..bfe84c7fdb 100644 --- a/p2p/net/swarm/swarm_dial_test.go +++ b/p2p/net/swarm/swarm_dial_test.go @@ -330,3 +330,57 @@ func TestAddrsForDialFiltering(t *testing.T) { }) } } + +func TestBlackHoledAddrBlocked(t *testing.T) { + resolver, err := madns.NewResolver() + if err != nil { + t.Fatal(err) + } + s := newTestSwarmWithResolver(t, resolver) + defer s.Close() + + n := 3 + s.bhd.udp = &blackHoleFilter{n: n, minSuccessFraction: 0.5, name: "UDP"} + + // all dials to the address will fail. RFC6666 Discard Prefix + addr := ma.StringCast("/ip6/0100::1/udp/54321/quic-v1") + + p, err := test.RandPeerID() + if err != nil { + t.Error(err) + } + s.Peerstore().AddAddr(p, addr, peerstore.PermanentAddrTTL) + + // do 1 extra dial to ensure that the blackHoleDetector state is updated since it + // happens in a different goroutine + for i := 0; i < n+1; i++ { + s.backf.Clear(p) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + conn, err := s.DialPeer(ctx, p) + if err == nil || conn != nil { + t.Fatalf("expected dial to fail") + } + cancel() + } + s.backf.Clear(p) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + conn, err := s.DialPeer(ctx, p) + if conn != nil { + t.Fatalf("expected dial to be blocked") + } + dialError, ok := err.(*DialError) + if !ok { + t.Fatalf("expected to receive an error of type *DialError, got %T", err) + } + isBlackHoleErr := false + for _, err := range dialError.DialErrors { + 
if err.Cause == ErrDialRefusedBlackHole { + isBlackHoleErr = true + break + } + } + if !isBlackHoleErr { + t.Fatalf("expected to receive ErrDialRefusedBlackHole %s", err) + } +} diff --git a/p2p/net/swarm/swarm_metrics.go b/p2p/net/swarm/swarm_metrics.go index 3110217f81..dfa0698b3e 100644 --- a/p2p/net/swarm/swarm_metrics.go +++ b/p2p/net/swarm/swarm_metrics.go @@ -85,6 +85,30 @@ var ( Buckets: []float64{0.001, 0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.75, 1, 2}, }, ) + blackHoleFilterState = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: metricNamespace, + Name: "black_hole_filter_state", + Help: "State of the black hole filter", + }, + []string{"name"}, + ) + blackHoleFilterSuccessFraction = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: metricNamespace, + Name: "black_hole_filter_success_fraction", + Help: "Fraction of successful dials among the last n requests", + }, + []string{"name"}, + ) + blackHoleFilterNextRequestAllowedAfter = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: metricNamespace, + Name: "black_hole_filter_next_request_allowed_after", + Help: "Number of requests after which the next request will be allowed", + }, + []string{"name"}, + ) collectors = []prometheus.Collector{ connsOpened, keyTypes, @@ -94,6 +118,9 @@ var ( connHandshakeLatency, dialsPerPeer, dialRankingDelay, + blackHoleFilterSuccessFraction, + blackHoleFilterState, + blackHoleFilterNextRequestAllowedAfter, } ) @@ -104,6 +131,7 @@ type MetricsTracer interface { FailedDialing(ma.Multiaddr, error) DialCompleted(success bool, totalDials int) DialRankingDelay(d time.Duration) + UpdatedBlackHoleFilterState(name string, state blackHoleState, nextProbeAfter int, successFraction float64) } type metricsTracer struct{} @@ -235,3 +263,17 @@ func (m *metricsTracer) DialCompleted(success bool, totalDials int) { func (m *metricsTracer) DialRankingDelay(d time.Duration) { dialRankingDelay.Observe(d.Seconds()) } + +func (m *metricsTracer) 
UpdatedBlackHoleFilterState(name string, state blackHoleState, + nextProbeAfter int, successFraction float64) { + tags := metricshelper.GetStringSlice() + defer metricshelper.PutStringSlice(tags) + + *tags = append(*tags, name) + + blackHoleFilterState.WithLabelValues(*tags...).Set(float64(state)) + blackHoleFilterSuccessFraction.WithLabelValues(*tags...).Set(successFraction) + if state == blackHoleStateBlocked { + blackHoleFilterNextRequestAllowedAfter.WithLabelValues(*tags...).Set(float64(nextProbeAfter)) + } +} diff --git a/p2p/net/swarm/swarm_metrics_test.go b/p2p/net/swarm/swarm_metrics_test.go index 0e13048a99..25e13f3213 100644 --- a/p2p/net/swarm/swarm_metrics_test.go +++ b/p2p/net/swarm/swarm_metrics_test.go @@ -78,6 +78,9 @@ func TestMetricsNoAllocNoCover(t *testing.T) { ma.StringCast("/ip4/1.2.3.4/udp/2345"), } + bhfNames := []string{"udp", "ipv6", "tcp", "icmp"} + bhfState := []blackHoleState{blackHoleStateAllowed, blackHoleStateBlocked} + tests := map[string]func(){ "OpenedConnection": func() { mt.OpenedConnection(randItem(directions), randItem(keys), randItem(connections), randItem(addrs)) @@ -91,6 +94,14 @@ func TestMetricsNoAllocNoCover(t *testing.T) { "FailedDialing": func() { mt.FailedDialing(randItem(addrs), randItem(errors)) }, "DialCompleted": func() { mt.DialCompleted(mrand.Intn(2) == 1, mrand.Intn(10)) }, "DialRankingDelay": func() { mt.DialRankingDelay(time.Duration(mrand.Intn(1e10))) }, + "UpdatedBlackHoleFilterState": func() { + mt.UpdatedBlackHoleFilterState( + randItem(bhfNames), + randItem(bhfState), + mrand.Intn(100), + mrand.Float64(), + ) + }, } for method, f := range tests { From 41808dfe60aa4a88decdf20721057d4a281924ec Mon Sep 17 00:00:00 2001 From: sukun Date: Wed, 28 Jun 2023 00:51:29 +0530 Subject: [PATCH 2/2] address review comments --- dashboards/swarm/swarm.json | 29 ++- p2p/net/swarm/black_hole_detector.go | 225 ++++++++++++--------- p2p/net/swarm/black_hole_detector_test.go | 226 +++++++++++----------- 
p2p/net/swarm/dial_worker.go | 4 + p2p/net/swarm/swarm.go | 24 +-- p2p/net/swarm/swarm_dial.go | 18 +- p2p/net/swarm/swarm_dial_test.go | 19 +- p2p/net/swarm/swarm_metrics.go | 4 +- 8 files changed, 295 insertions(+), 254 deletions(-) diff --git a/dashboards/swarm/swarm.json b/dashboards/swarm/swarm.json index 0c41bee76e..9d15e6f0ef 100644 --- a/dashboards/swarm/swarm.json +++ b/dashboards/swarm/swarm.json @@ -3071,13 +3071,18 @@ { "options": { "0": { - "color": "green", + "color": "blue", "index": 0, - "text": "Allowed" + "text": "Probing" }, "1": { - "color": "purple", + "color": "green", "index": 1, + "text": "Allowed" + }, + "2": { + "color": "purple", + "index": 2, "text": "Blocked" } }, @@ -3145,7 +3150,17 @@ "fixedColor": "purple", "mode": "fixed" }, - "mappings": [], + "mappings": [ + { + "options": { + "0": { + "index": 0, + "text": "-" + } + }, + "type": "value" + } + ], "thresholds": { "mode": "absolute", "steps": [ @@ -3169,7 +3184,7 @@ "colorMode": "value", "graphMode": "none", "justifyMode": "auto", - "orientation": "auto", + "orientation": "horizontal", "reduceOptions": { "calcs": [ "lastNotNull" @@ -3218,7 +3233,7 @@ }, { "color": "green", - "value": 1 + "value": 5 } ] } @@ -3302,6 +3317,6 @@ "timezone": "", "title": "libp2p Swarm", "uid": "a15PyhO4z", - "version": 6, + "version": 7, "weekStart": "" } \ No newline at end of file diff --git a/p2p/net/swarm/black_hole_detector.go b/p2p/net/swarm/black_hole_detector.go index 3c63868ac1..0c415080e0 100644 --- a/p2p/net/swarm/black_hole_detector.go +++ b/p2p/net/swarm/black_hole_detector.go @@ -1,52 +1,70 @@ package swarm import ( + "fmt" "sync" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" ) -type outcome int +type blackHoleState int const ( - outcomeSuccess outcome = iota - outcomeFailed + blackHoleStateProbing blackHoleState = iota + blackHoleStateAllowed + blackHoleStateBlocked ) -type blackHoleState int +func (st blackHoleState) String() string { + switch 
st { + case blackHoleStateProbing: + return "Probing" + case blackHoleStateAllowed: + return "Allowed" + case blackHoleStateBlocked: + return "Blocked" + default: + return fmt.Sprintf("Unknown %d", st) + } +} + +type blackHoleResult int const ( - blackHoleStateAllowed blackHoleState = iota - blackHoleStateBlocked + blackHoleResultAllowed blackHoleResult = iota + blackHoleResultProbing + blackHoleResultBlocked ) -// blackHoleFilter provides black hole filtering logic for dials. On detecting a black holed -// network environment, subsequent dials are blocked and only 1 dial every n requests is allowed. -// This should be used in conjunction with an UDP or IPv6 address filter to detect UDP or -// IPv6 black hole. -// Requests are blocked if the success fraction in the last n outcomes is less than -// minSuccessFraction. If a request succeeds in Blocked state, the filter state is reset and n -// subsequent requests are allowed before reevaluating black hole status. Evaluating over n -// outcomes avoids situations where a dial was cancelled because a competing dial succeeded, -// the address was unreachable, and other false negatives. +// blackHoleFilter provides black hole filtering for dials. This filter should be used in +// concert with a UDP or IPv6 address filter to detect UDP or IPv6 black hole. In a black +// holed environment, dial requests are blocked and only periodic probes to check the +// state of the black hole are allowed. +// +// Requests are blocked if the number of successes in the last n dials is less than +// minSuccesses. If a request succeeds in Blocked state, the filter state is reset and n +// subsequent requests are allowed before reevaluating black hole state. Dials cancelled +// when some other concurrent dial succeeded are counted as failures. A sufficiently large +// n prevents false negatives in such cases. type blackHoleFilter struct { - // n is the minimum number of completed dials required before we start blocking. 
- // Every nth request is allowed irrespective of the status of the detector. + // n serves the dual purpose of being the minimum number of requests after which we + // probe the state of the black hole in blocked state and the minimum number of + // completed dials required before evaluating black hole state. n int - // minSuccessFraction is the minimum success fraction required to allow dials. - minSuccessFraction float64 + // minSuccesses is the minimum number of successes required in the last n dials + // to consider we are not blocked. + minSuccesses int // name for the detector. name string - // requests counts number of dial requests up to n. Resets to 0 every nth request. + // requests counts number of dial requests to peers. We handle requests at a peer + // level and record results at individual address dial level. requests int - // outcomes of the last `n` allowed dials - outcomes []outcome + // dialResults of the last `n` dials. A successful dial is true. + dialResults []bool // successes is the count of successful dials in outcomes successes int - // failures is the count of failed dials in outcomes - failures int // state is the current state of the detector state blackHoleState @@ -54,14 +72,10 @@ type blackHoleFilter struct { metricsTracer MetricsTracer } -// RecordOutcome records the outcome of a dial. A successful dial will change the state +// RecordResult records the outcome of a dial. A successful dial will change the state // of the filter to Allowed. A failed dial only blocks subsequent requests if the success // fraction over the last n outcomes is less than the minSuccessFraction of the filter. 
-func (b *blackHoleFilter) RecordOutcome(success bool) { - if b == nil { - return - } - +func (b *blackHoleFilter) RecordResult(success bool) { b.mu.Lock() defer b.mu.Unlock() @@ -75,66 +89,58 @@ func (b *blackHoleFilter) RecordOutcome(success bool) { if success { b.successes++ - b.outcomes = append(b.outcomes, outcomeSuccess) - } else { - b.failures++ - b.outcomes = append(b.outcomes, outcomeFailed) } + b.dialResults = append(b.dialResults, success) - if len(b.outcomes) > b.n { - if b.outcomes[0] == outcomeSuccess { + if len(b.dialResults) > b.n { + if b.dialResults[0] { b.successes-- - } else { - b.failures-- } - b.outcomes = b.outcomes[1 : b.n+1] + b.dialResults = b.dialResults[1:] } b.updateState() b.trackMetrics() } -func (b *blackHoleFilter) IsAllowed() (state blackHoleState, isAllowed bool) { - if b == nil { - return blackHoleStateAllowed, true - } - +// HandleRequest returns the result of applying the black hole filter for the request. +func (b *blackHoleFilter) HandleRequest() blackHoleResult { b.mu.Lock() defer b.mu.Unlock() + b.requests++ - if b.requests == b.n { - b.requests = 0 - } + b.trackMetrics() - return b.state, (b.state == blackHoleStateAllowed) || (b.requests == 0) + + if b.state == blackHoleStateAllowed { + return blackHoleResultAllowed + } else if b.state == blackHoleStateProbing || b.requests%b.n == 0 { + return blackHoleResultProbing + } else { + return blackHoleResultBlocked + } } func (b *blackHoleFilter) reset() { b.successes = 0 - b.failures = 0 - b.outcomes = b.outcomes[:0] + b.dialResults = b.dialResults[:0] + b.requests = 0 b.updateState() } func (b *blackHoleFilter) updateState() { st := b.state - successFraction := 0.0 - if len(b.outcomes) < b.n { + + if len(b.dialResults) < b.n { + b.state = blackHoleStateProbing + } else if b.successes >= b.minSuccesses { b.state = blackHoleStateAllowed } else { - successFraction = float64(b.successes) / float64(b.successes+b.failures) - if successFraction >= b.minSuccessFraction { - b.state = 
blackHoleStateAllowed - } else { - b.state = blackHoleStateBlocked - } + b.state = blackHoleStateBlocked } + if st != b.state { - if b.state == blackHoleStateAllowed { - log.Debugf("%s blackHoleDetector state changed to Allowed", b.name) - } else { - log.Debugf("%s blackHoleDetector state changed to Blocked. Success fraction is %0.3f", b.name, successFraction) - } + log.Debugf("%s blackHoleDetector state changed from %s to %s", b.name, st, b.state) } } @@ -142,67 +148,110 @@ func (b *blackHoleFilter) trackMetrics() { if b.metricsTracer == nil { return } + + nextRequestAllowedAfter := 0 + if b.state == blackHoleStateBlocked { + nextRequestAllowedAfter = b.n - (b.requests % b.n) + } + successFraction := 0.0 - if b.successes+b.failures != 0 { - successFraction = float64(b.successes) / float64(b.successes+b.failures) + if len(b.dialResults) > 0 { + successFraction = float64(b.successes) / float64(len(b.dialResults)) } + b.metricsTracer.UpdatedBlackHoleFilterState( b.name, b.state, - b.n-b.requests, + nextRequestAllowedAfter, successFraction, ) } // blackHoleDetector provides UDP and IPv6 black hole detection using a `blackHoleFilter` -// for each. For details of the black hole detection logic see `blackHoleFilter` +// for each. For details of the black hole detection logic see `blackHoleFilter`. +// +// black hole filtering is done at a peer dial level to ensure that periodic probes to +// detect change of the black hole state are actually dialed and are not skipped +// because of dial prioritisation logic. 
type blackHoleDetector struct { udp, ipv6 *blackHoleFilter } -func (d *blackHoleDetector) IsAllowed(addr ma.Multiaddr) bool { - if !manet.IsPublicAddr(addr) { - return true +// FilterAddrs filters the peer's addresses removing black holed addresses +func (d *blackHoleDetector) FilterAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { + hasUDP, hasIPv6 := false, false + for _, a := range addrs { + if !manet.IsPublicAddr(a) { + continue + } + if isProtocolAddr(a, ma.P_UDP) { + hasUDP = true + } + if isProtocolAddr(a, ma.P_IP6) { + hasIPv6 = true + } } - udpState, udpAllowed := blackHoleStateAllowed, true - if d.udp != nil && isProtocolAddr(addr, ma.P_UDP) { - udpState, udpAllowed = d.udp.IsAllowed() + udpRes := blackHoleResultAllowed + if d.udp != nil && hasUDP { + udpRes = d.udp.HandleRequest() } - ipv6State, ipv6Allowed := blackHoleStateAllowed, true - if d.ipv6 != nil && isProtocolAddr(addr, ma.P_IP6) { - ipv6State, ipv6Allowed = d.ipv6.IsAllowed() + ipv6Res := blackHoleResultAllowed + if d.ipv6 != nil && hasIPv6 { + ipv6Res = d.ipv6.HandleRequest() } - // Allow all probes irrespective of the state of the other filter - if (udpState == blackHoleStateBlocked && udpAllowed) || - (ipv6State == blackHoleStateBlocked && ipv6Allowed) { - return true - } - return (udpAllowed && ipv6Allowed) + return ma.FilterAddrs( + addrs, + func(a ma.Multiaddr) bool { + if !manet.IsPublicAddr(a) { + return true + } + // allow all UDP addresses while probing irrespective of IPv6 black hole state + if udpRes == blackHoleResultProbing && isProtocolAddr(a, ma.P_UDP) { + return true + } + // allow all IPv6 addresses while probing irrespective of UDP black hole state + if ipv6Res == blackHoleResultProbing && isProtocolAddr(a, ma.P_IP6) { + return true + } + + if udpRes == blackHoleResultBlocked && isProtocolAddr(a, ma.P_UDP) { + return false + } + if ipv6Res == blackHoleResultBlocked && isProtocolAddr(a, ma.P_IP6) { + return false + } + return true + }, + ) } -// RecordOutcome updates the state of 
the relevant `blackHoleFilter` for addr -func (d *blackHoleDetector) RecordOutcome(addr ma.Multiaddr, success bool) { +// RecordResult updates the state of the relevant `blackHoleFilter`s for addr +func (d *blackHoleDetector) RecordResult(addr ma.Multiaddr, success bool) { if !manet.IsPublicAddr(addr) { return } if d.udp != nil && isProtocolAddr(addr, ma.P_UDP) { - d.udp.RecordOutcome(success) + d.udp.RecordResult(success) } if d.ipv6 != nil && isProtocolAddr(addr, ma.P_IP6) { - d.ipv6.RecordOutcome(success) + d.ipv6.RecordResult(success) } } func newBlackHoleDetector(detectUDP, detectIPv6 bool, mt MetricsTracer) *blackHoleDetector { d := &blackHoleDetector{} + + // A black hole is a binary property. On a network if UDP dials are blocked or there is + // no IPv6 connectivity, all dials will fail. So a low success rate of 5 out of 100 dials + // is good enough. if detectUDP { - d.udp = &blackHoleFilter{n: 100, minSuccessFraction: 0.01, name: "UDP", metricsTracer: mt} + d.udp = &blackHoleFilter{n: 100, minSuccesses: 5, name: "UDP", metricsTracer: mt} } if detectIPv6 { - d.ipv6 = &blackHoleFilter{n: 100, minSuccessFraction: 0.01, name: "IPv6", metricsTracer: mt} + d.ipv6 = &blackHoleFilter{n: 100, minSuccesses: 5, name: "IPv6", metricsTracer: mt} } return d } diff --git a/p2p/net/swarm/black_hole_detector_test.go b/p2p/net/swarm/black_hole_detector_test.go index e53a800241..564fc07767 100644 --- a/p2p/net/swarm/black_hole_detector_test.go +++ b/p2p/net/swarm/black_hole_detector_test.go @@ -1,171 +1,179 @@ package swarm import ( + "fmt" "testing" ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" ) func TestBlackHoleFilterReset(t *testing.T) { n := 10 - bhd := &blackHoleFilter{n: n, minSuccessFraction: 0.05, name: "test"} + bhf := &blackHoleFilter{n: n, minSuccesses: 2, name: "test"} var i = 0 - // calls up to threshold should be allowed + // calls up to n should be probing for i = 1; i <= n; i++ { - if _, isAllowed := bhd.IsAllowed();
!isAllowed { - t.Fatalf("expected calls up to minDials to be allowed") + if bhf.HandleRequest() != blackHoleResultProbing { + t.Fatalf("expected calls up to n to be probes") } - bhd.RecordOutcome(false) + bhf.RecordResult(false) } - // after threshold calls every nth call should be allowed + + // after threshold calls every nth call should be a probe for i = n + 1; i < 42; i++ { - _, isAllowed := bhd.IsAllowed() - if (i%n == 0 && !isAllowed) || (i%n != 0 && isAllowed) { - t.Fatalf("expected every nth dial to be allowed") + result := bhf.HandleRequest() + if (i%n == 0 && result != blackHoleResultProbing) || (i%n != 0 && result != blackHoleResultBlocked) { + t.Fatalf("expected every nth dial to be a probe") } } - bhd.RecordOutcome(true) - // check if calls up to threshold are allowed after success + bhf.RecordResult(true) + // check if calls up to n are probes again for i = 0; i < n; i++ { - if _, isAllowed := bhd.IsAllowed(); !isAllowed { + if bhf.HandleRequest() != blackHoleResultProbing { t.Fatalf("expected black hole detector state to reset after success") } - bhd.RecordOutcome(false) + bhf.RecordResult(false) } - // next call should be refused - if _, isAllowed := bhd.IsAllowed(); isAllowed { + // next call should be blocked + if bhf.HandleRequest() != blackHoleResultBlocked { t.Fatalf("expected dial to be blocked") } } -func TestBlackHoleDetector(t *testing.T) { +func TestBlackHoleFilterSuccessFraction(t *testing.T) { n := 10 - bhd := &blackHoleFilter{n: n, minSuccessFraction: 0.4, name: "test"} - var i = 0 - // 5 success and 5 fails - for i = 1; i <= 5; i++ { - bhd.RecordOutcome(true) - } - for i = 1; i <= 5; i++ { - bhd.RecordOutcome(false) - } - - if _, isAllowed := bhd.IsAllowed(); !isAllowed { - t.Fatalf("expected dial to be allowed") - } - // 4 success and 6 fails - bhd.RecordOutcome(false) - - if _, isAllowed := bhd.IsAllowed(); !isAllowed { - t.Fatalf("expected dial to be allowed") - } - // 3 success and 7 fails - bhd.RecordOutcome(false) - - // should 
be blocked - if _, isAllowed := bhd.IsAllowed(); isAllowed { - t.Fatalf("expected dial to be blocked") - } - - bhd.RecordOutcome(true) - // 5 success and 5 fails - for i = 1; i <= 5; i++ { - bhd.RecordOutcome(true) - } - for i = 1; i <= 5; i++ { - bhd.RecordOutcome(false) - } - - if _, isAllowed := bhd.IsAllowed(); !isAllowed { - t.Fatalf("expected dial to be allowed") - } - // 4 success and 6 fails - bhd.RecordOutcome(false) - - if _, isAllowed := bhd.IsAllowed(); !isAllowed { - t.Fatalf("expected dial to be allowed") - } - // 3 success and 7 fails - bhd.RecordOutcome(false) - - // should be blocked - if _, isAllowed := bhd.IsAllowed(); isAllowed { - t.Fatalf("expected dial to be blocked") + tests := []struct { + minSuccesses, successes int + result blackHoleResult + }{ + {minSuccesses: 5, successes: 5, result: blackHoleResultAllowed}, + {minSuccesses: 3, successes: 3, result: blackHoleResultAllowed}, + {minSuccesses: 5, successes: 4, result: blackHoleResultBlocked}, + {minSuccesses: 5, successes: 7, result: blackHoleResultAllowed}, + {minSuccesses: 3, successes: 1, result: blackHoleResultBlocked}, + {minSuccesses: 0, successes: 0, result: blackHoleResultAllowed}, + {minSuccesses: 10, successes: 10, result: blackHoleResultAllowed}, + } + for i, tc := range tests { + t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) { + bhf := blackHoleFilter{n: n, minSuccesses: tc.minSuccesses} + for i := 0; i < tc.successes; i++ { + bhf.RecordResult(true) + } + for i := 0; i < n-tc.successes; i++ { + bhf.RecordResult(false) + } + got := bhf.HandleRequest() + if got != tc.result { + t.Fatalf("expected %d got %d", tc.result, got) + } + }) } - } func TestBlackHoleDetectorInApplicableAddress(t *testing.T) { bhd := newBlackHoleDetector(true, true, nil) - addr := ma.StringCast("/ip4/127.0.0.1/tcp/1234") + addrs := []ma.Multiaddr{ + ma.StringCast("/ip4/1.2.3.4/tcp/1234"), + ma.StringCast("/ip4/1.2.3.4/tcp/1233"), + ma.StringCast("/ip6/::1/udp/1234/quic-v1"), + 
ma.StringCast("/ip4/192.168.1.5/udp/1234/quic-v1"), + } for i := 0; i < 1000; i++ { - if !bhd.IsAllowed(addr) { - t.Fatalf("expect dials to inapplicable address to always be allowed") + filteredAddrs := bhd.FilterAddrs(addrs) + require.ElementsMatch(t, addrs, filteredAddrs) + for j := 0; j < len(addrs); j++ { + bhd.RecordResult(addrs[j], false) } - bhd.RecordOutcome(addr, false) } } -func TestBlackHoleDetectorUDP(t *testing.T) { - bhd := newBlackHoleDetector(true, true, nil) - addr := ma.StringCast("/ip4/1.2.3.4/udp/1234") - for i := 0; i < 100; i++ { - bhd.RecordOutcome(addr, false) - } - if bhd.IsAllowed(addr) { - t.Fatalf("expect dial to be be blocked") - } - - bhd = newBlackHoleDetector(false, true, nil) +func TestBlackHoleDetectorUDPDisabled(t *testing.T) { + bhd := newBlackHoleDetector(false, true, nil) + publicAddr := ma.StringCast("/ip4/1.2.3.4/udp/1234/quic-v1") + privAddr := ma.StringCast("/ip4/192.168.1.5/udp/1234/quic-v1") for i := 0; i < 100; i++ { - bhd.RecordOutcome(addr, false) - } - if !bhd.IsAllowed(addr) { - t.Fatalf("expected dial to be be allowed when UDP detection is disabled") + bhd.RecordResult(publicAddr, false) } + addrs := []ma.Multiaddr{publicAddr, privAddr} + require.ElementsMatch(t, addrs, bhd.FilterAddrs(addrs)) } -func TestBlackHoleDetectorIPv6(t *testing.T) { - bhd := newBlackHoleDetector(true, true, nil) - addr := ma.StringCast("/ip6/1::1/tcp/1234") - for i := 0; i < 100; i++ { - bhd.RecordOutcome(addr, false) - } - if bhd.IsAllowed(addr) { - t.Fatalf("expect dial to be be blocked") - } - - bhd = newBlackHoleDetector(true, false, nil) +func TestBlackHoleDetectorIPv6Disabled(t *testing.T) { + bhd := newBlackHoleDetector(true, false, nil) + publicAddr := ma.StringCast("/ip6/1::1/tcp/1234") + privAddr := ma.StringCast("/ip6/::1/tcp/1234") + addrs := []ma.Multiaddr{publicAddr, privAddr} for i := 0; i < 100; i++ { - bhd.RecordOutcome(addr, false) - } - if !bhd.IsAllowed(addr) { - t.Fatalf("expected dial to be be allowed when IPv6 
detection is disabled") + bhd.RecordResult(publicAddr, false) } + require.ElementsMatch(t, addrs, bhd.FilterAddrs(addrs)) } func TestBlackHoleDetectorProbes(t *testing.T) { bhd := &blackHoleDetector{ - udp: &blackHoleFilter{n: 2, minSuccessFraction: 0.5}, - ipv6: &blackHoleFilter{n: 3, minSuccessFraction: 0.5}, + udp: &blackHoleFilter{n: 2, minSuccesses: 1, name: "udp"}, + ipv6: &blackHoleFilter{n: 3, minSuccesses: 1, name: "ipv6"}, } udp6Addr := ma.StringCast("/ip6/1::1/udp/1234/quic-v1") + addrs := []ma.Multiaddr{udp6Addr} for i := 0; i < 3; i++ { - bhd.RecordOutcome(udp6Addr, false) + bhd.RecordResult(udp6Addr, false) } for i := 1; i < 100; i++ { - isAllowed := bhd.IsAllowed(udp6Addr) + filteredAddrs := bhd.FilterAddrs(addrs) if i%2 == 0 || i%3 == 0 { - if !isAllowed { + if len(filteredAddrs) == 0 { t.Fatalf("expected probe to be allowed irrespective of the state of other black hole filter") } } else { - if isAllowed { - t.Fatalf("expected dial to be blocked") + if len(filteredAddrs) != 0 { + t.Fatalf("expected dial to be blocked %s", filteredAddrs) } } } } + +func TestBlackHoleDetectorAddrFiltering(t *testing.T) { + udp6Pub := ma.StringCast("/ip6/1::1/udp/1234/quic-v1") + udp6Pri := ma.StringCast("/ip6/::1/udp/1234/quic-v1") + upd4Pub := ma.StringCast("/ip4/1.2.3.4/udp/1234/quic-v1") + udp4Pri := ma.StringCast("/ip4/192.168.1.5/udp/1234/quic-v1") + tcp6Pub := ma.StringCast("/ip6/1::1/tcp/1234/quic-v1") + tcp6Pri := ma.StringCast("/ip6/::1/tcp/1234/quic-v1") + tcp4Pub := ma.StringCast("/ip4/1.2.3.4/tcp/1234/quic-v1") + tcp4Pri := ma.StringCast("/ip4/192.168.1.5/tcp/1234/quic-v1") + + makeBHD := func(udpBlocked, ipv6Blocked bool) *blackHoleDetector { + bhd := &blackHoleDetector{ + udp: &blackHoleFilter{n: 100, minSuccesses: 10, name: "udp"}, + ipv6: &blackHoleFilter{n: 100, minSuccesses: 10, name: "ipv6"}, + } + for i := 0; i < 100; i++ { + bhd.RecordResult(upd4Pub, !udpBlocked) + } + for i := 0; i < 100; i++ { + bhd.RecordResult(tcp6Pub, !ipv6Blocked) + } + 
return bhd + } + + allInput := []ma.Multiaddr{udp6Pub, udp6Pri, upd4Pub, udp4Pri, tcp6Pub, tcp6Pri, + tcp4Pub, tcp4Pri} + + udpBlockedOutput := []ma.Multiaddr{udp6Pri, udp4Pri, tcp6Pub, tcp6Pri, tcp4Pub, tcp4Pri} + bhd := makeBHD(true, false) + require.ElementsMatch(t, udpBlockedOutput, bhd.FilterAddrs(allInput)) + + ip6BlockedOutput := []ma.Multiaddr{udp6Pri, upd4Pub, udp4Pri, tcp6Pri, tcp4Pub, tcp4Pri} + bhd = makeBHD(false, true) + require.ElementsMatch(t, ip6BlockedOutput, bhd.FilterAddrs(allInput)) + + bothBlockedOutput := []ma.Multiaddr{udp6Pri, udp4Pri, tcp6Pri, tcp4Pub, tcp4Pri} + bhd = makeBHD(true, true) + require.ElementsMatch(t, bothBlockedOutput, bhd.FilterAddrs(allInput)) +} diff --git a/p2p/net/swarm/dial_worker.go b/p2p/net/swarm/dial_worker.go index a8252c9680..0334ac863e 100644 --- a/p2p/net/swarm/dial_worker.go +++ b/p2p/net/swarm/dial_worker.go @@ -363,7 +363,11 @@ loop: // we only add backoff if there has not been a successful connection // for consistency with the old dialer behavior. w.s.backf.AddBackoff(w.peer, res.Addr) + } else if res.Err == ErrDialRefusedBlackHole { + log.Errorf("SWARM BUG: unexpected ErrDialRefusedBlackHole while dialing peer %s to addr %s", + w.peer, res.Addr) } + w.dispatchError(ad, res.Err) // Only schedule next dial on error. 
// If we scheduleNextDial on success, we will end up making one dial more than diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 65f3294837..416bfd8cda 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -108,24 +108,6 @@ func WithDialRanker(d network.DialRanker) Option { } } -// WithNoIPv6BlackHoleDetection configures swarm to not do any black hole detection for -// IPv6 addresses -func WithNoIPv6BlackHoleDetection() Option { - return func(s *Swarm) error { - s.disableIPv6BlackHoleDetection = true - return nil - } -} - -// WithNoUDPBlackHoleDetection configures swarm to not do any black hole detection for -// UDP addresses -func WithNoUDPBlackHoleDetection() Option { - return func(s *Swarm) error { - s.disableUDPBlackHoleDetection = true - return nil - } -} - // Swarm is a connection muxer, allowing connections to other peers to // be opened and closed, while still using the same Chan for all // communication. The Chan sends/receives Messages, which note the @@ -192,9 +174,7 @@ type Swarm struct { dialRanker network.DialRanker - disableIPv6BlackHoleDetection bool - disableUDPBlackHoleDetection bool - bhd *blackHoleDetector + bhd *blackHoleDetector } // NewSwarm constructs a Swarm. @@ -235,7 +215,7 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts s.limiter = newDialLimiter(s.dialAddr) s.backf.init(s.ctx) - s.bhd = newBlackHoleDetector(!s.disableUDPBlackHoleDetection, !s.disableIPv6BlackHoleDetection, s.metricsTracer) + s.bhd = newBlackHoleDetector(true, true, s.metricsTracer) return s, nil } diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 91b3e866a9..83799f56f7 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -396,15 +396,6 @@ func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, } } - // Check if dial to the address is black holed. 
- // This is the best place to have this check since we want to ensure that periodic - // probes allowed by the black hole detector are actually dialed. If this check is - // done before the dial prioritisation logic, we might not dial the address because - // a higher priority address succeeded. - if !s.bhd.IsAllowed(addr) { - return ErrDialRefusedBlackHole - } - // start the dial s.limitedDial(ctx, p, addr, resch) @@ -446,9 +437,13 @@ func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) []ma.Mul // filter addresses we cannot dial addrs = ma.FilterAddrs(addrs, s.canDial) + // filter low priority addresses among the addresses we can dial addrs = filterLowPriorityAddresses(addrs) + // remove black holed addrs + addrs = s.bhd.FilterAddrs(addrs) + return ma.FilterAddrs(addrs, func(addr ma.Multiaddr) bool { return !ma.Contains(ourAddrs, addr) }, // TODO: Consider allowing link-local addresses @@ -497,7 +492,10 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (tra start := time.Now() connC, err := tpt.Dial(ctx, addr, p) - s.bhd.RecordOutcome(addr, err == nil) + // We're recording any error as a failure here. + // Notably, this also applies to cancelations (i.e. if another dial attempt was faster). + // This is ok since the black hole detector uses a very low threshold (5%). + s.bhd.RecordResult(addr, err == nil) if err != nil { if s.metricsTracer != nil { diff --git a/p2p/net/swarm/swarm_dial_test.go b/p2p/net/swarm/swarm_dial_test.go index bfe84c7fdb..2f6b3f8c4d 100644 --- a/p2p/net/swarm/swarm_dial_test.go +++ b/p2p/net/swarm/swarm_dial_test.go @@ -340,10 +340,10 @@ func TestBlackHoledAddrBlocked(t *testing.T) { defer s.Close() n := 3 - s.bhd.udp = &blackHoleFilter{n: n, minSuccessFraction: 0.5, name: "UDP"} + s.bhd.ipv6 = &blackHoleFilter{n: n, minSuccesses: 1, name: "IPv6"} // all dials to the address will fail. 
RFC6666 Discard Prefix - addr := ma.StringCast("/ip6/0100::1/udp/54321/quic-v1") + addr := ma.StringCast("/ip6/0100::1/tcp/54321/") p, err := test.RandPeerID() if err != nil { @@ -369,18 +369,7 @@ func TestBlackHoledAddrBlocked(t *testing.T) { if conn != nil { t.Fatalf("expected dial to be blocked") } - dialError, ok := err.(*DialError) - if !ok { - t.Fatalf("expected to receive an error of type *DialError, got %T", err) - } - isBlackHoleErr := false - for _, err := range dialError.DialErrors { - if err.Cause == ErrDialRefusedBlackHole { - isBlackHoleErr = true - break - } - } - if !isBlackHoleErr { - t.Fatalf("expected to receive ErrDialRefusedBlackHole %s", err) + if err != ErrNoGoodAddresses { + t.Fatalf("expected to receive an error of type *DialError, got %s of type %T", err, err) } } diff --git a/p2p/net/swarm/swarm_metrics.go b/p2p/net/swarm/swarm_metrics.go index dfa0698b3e..28564e9e54 100644 --- a/p2p/net/swarm/swarm_metrics.go +++ b/p2p/net/swarm/swarm_metrics.go @@ -273,7 +273,5 @@ func (m *metricsTracer) UpdatedBlackHoleFilterState(name string, state blackHole blackHoleFilterState.WithLabelValues(*tags...).Set(float64(state)) blackHoleFilterSuccessFraction.WithLabelValues(*tags...).Set(successFraction) - if state == blackHoleStateBlocked { - blackHoleFilterNextRequestAllowedAfter.WithLabelValues(*tags...).Set(float64(nextProbeAfter)) - } + blackHoleFilterNextRequestAllowedAfter.WithLabelValues(*tags...).Set(float64(nextProbeAfter)) }