From bbdcb0a6418cd375500ba6adec41eb30ed054371 Mon Sep 17 00:00:00 2001 From: sukun Date: Fri, 16 Jun 2023 18:06:42 +0530 Subject: [PATCH] address review comments --- p2p/net/swarm/black_hole_detector.go | 94 ++++++++++++----------- p2p/net/swarm/black_hole_detector_test.go | 77 ++++++++++--------- p2p/net/swarm/swarm_dial.go | 4 +- 3 files changed, 92 insertions(+), 83 deletions(-) diff --git a/p2p/net/swarm/black_hole_detector.go b/p2p/net/swarm/black_hole_detector.go index 3c63868ac1..a373f8bdcc 100644 --- a/p2p/net/swarm/black_hole_detector.go +++ b/p2p/net/swarm/black_hole_detector.go @@ -7,18 +7,19 @@ import ( manet "github.com/multiformats/go-multiaddr/net" ) -type outcome int +type blackHoleState int const ( - outcomeSuccess outcome = iota - outcomeFailed + blackHoleStateAllowed blackHoleState = iota + blackHoleStateBlocked ) -type blackHoleState int +type blackHoleResult int const ( - blackHoleStateAllowed blackHoleState = iota - blackHoleStateBlocked + blackHoleResultAllowed blackHoleResult = iota + blackHoleResultProbing + blackHoleResultBlocked ) // blackHoleFilter provides black hole filtering logic for dials. On detecting a black holed @@ -41,8 +42,8 @@ type blackHoleFilter struct { // requests counts number of dial requests up to n. Resets to 0 every nth request. requests int - // outcomes of the last `n` allowed dials - outcomes []outcome + // dialResults of the last `n` allowed dials. success is true. + dialResults []bool // successes is the count of successful dials in outcomes successes int // failures is the count of failed dials in outcomes @@ -54,14 +55,10 @@ type blackHoleFilter struct { metricsTracer MetricsTracer } -// RecordOutcome records the outcome of a dial. A successful dial will change the state +// RecordResult records the outcome of a dial. A successful dial will change the state // of the filter to Allowed. A failed dial only blocks subsequent requests if the success // fraction over the last n outcomes is less than the minSuccessFraction of the filter. -func (b *blackHoleFilter) RecordOutcome(success bool) { - if b == nil { - return - } - +func (b *blackHoleFilter) RecordResult(success bool) { b.mu.Lock() defer b.mu.Unlock() @@ -75,51 +72,54 @@ func (b *blackHoleFilter) RecordOutcome(success bool) { if success { b.successes++ - b.outcomes = append(b.outcomes, outcomeSuccess) } else { b.failures++ - b.outcomes = append(b.outcomes, outcomeFailed) } + b.dialResults = append(b.dialResults, success) - if len(b.outcomes) > b.n { - if b.outcomes[0] == outcomeSuccess { + if len(b.dialResults) > b.n { + if b.dialResults[0] { b.successes-- } else { b.failures-- } - b.outcomes = b.outcomes[1 : b.n+1] + b.dialResults = b.dialResults[1 : b.n+1] } b.updateState() b.trackMetrics() } -func (b *blackHoleFilter) IsAllowed() (state blackHoleState, isAllowed bool) { - if b == nil { - return blackHoleStateAllowed, true - } - +// HandleRequest handles a new dial request for the filter. It returns a +func (b *blackHoleFilter) HandleRequest() blackHoleResult { b.mu.Lock() defer b.mu.Unlock() + b.requests++ - if b.requests == b.n { - b.requests = 0 - } + b.trackMetrics() - return b.state, (b.state == blackHoleStateAllowed) || (b.requests == 0) + + if b.state == blackHoleStateAllowed { + return blackHoleResultAllowed + } else if b.requests%b.n == 0 { + return blackHoleResultProbing + } else { + return blackHoleResultBlocked + } } func (b *blackHoleFilter) reset() { b.successes = 0 b.failures = 0 - b.outcomes = b.outcomes[:0] + b.dialResults = b.dialResults[:0] + b.requests = 0 b.updateState() } func (b *blackHoleFilter) updateState() { st := b.state successFraction := 0.0 - if len(b.outcomes) < b.n { + if len(b.dialResults) < b.n { b.state = blackHoleStateAllowed } else { successFraction = float64(b.successes) / float64(b.successes+b.failures) @@ -146,10 +146,15 @@ func (b *blackHoleFilter) trackMetrics() { if b.successes+b.failures != 0 { successFraction = float64(b.successes) / float64(b.successes+b.failures) } + + nextRequestAllowedAfter := 0 + if b.state == blackHoleStateBlocked { + nextRequestAllowedAfter = b.n - (b.requests % b.n) + } b.metricsTracer.UpdatedBlackHoleFilterState( b.name, b.state, - b.n-b.requests, + nextRequestAllowedAfter, successFraction, ) } @@ -160,44 +165,47 @@ type blackHoleDetector struct { udp, ipv6 *blackHoleFilter } -func (d *blackHoleDetector) IsAllowed(addr ma.Multiaddr) bool { +func (d *blackHoleDetector) HandleRequest(addr ma.Multiaddr) bool { if !manet.IsPublicAddr(addr) { return true } - udpState, udpAllowed := blackHoleStateAllowed, true + udpres := blackHoleResultAllowed if d.udp != nil && isProtocolAddr(addr, ma.P_UDP) { - udpState, udpAllowed = d.udp.IsAllowed() + udpres = d.udp.HandleRequest() } - ipv6State, ipv6Allowed := blackHoleStateAllowed, true + ipv6res := blackHoleResultAllowed if d.ipv6 != nil && isProtocolAddr(addr, ma.P_IP6) { - ipv6State, ipv6Allowed = d.ipv6.IsAllowed() + ipv6res = d.ipv6.HandleRequest() } // Allow all probes irrespective of the state of the other filter - if (udpState == blackHoleStateBlocked && udpAllowed) || - (ipv6State == blackHoleStateBlocked && ipv6Allowed) { + if udpres == blackHoleResultProbing || ipv6res == blackHoleResultProbing { return true } - return (udpAllowed && ipv6Allowed) + return (udpres != blackHoleResultBlocked && ipv6res != blackHoleResultBlocked) } -// RecordOutcome updates the state of the relevant `blackHoleFilter` for addr -func (d *blackHoleDetector) RecordOutcome(addr ma.Multiaddr, success bool) { +// RecordResult updates the state of the relevant `blackHoleFilter` for addr +func (d *blackHoleDetector) RecordResult(addr ma.Multiaddr, success bool) { if !manet.IsPublicAddr(addr) { return } if d.udp != nil && isProtocolAddr(addr, ma.P_UDP) { - d.udp.RecordOutcome(success) + d.udp.RecordResult(success) } if d.ipv6 != nil && isProtocolAddr(addr, ma.P_IP6) { - d.ipv6.RecordOutcome(success) + d.ipv6.RecordResult(success) } } func newBlackHoleDetector(detectUDP, detectIPv6 bool, mt MetricsTracer) *blackHoleDetector { d := &blackHoleDetector{} + + // A black hole is a binary property. On a network if UDP dials are blocked or there is + // no IPv6 connectivity, all dials will fail. So a low min success fraction like 0.01 is + // good enough. if detectUDP { d.udp = &blackHoleFilter{n: 100, minSuccessFraction: 0.01, name: "UDP", metricsTracer: mt} } diff --git a/p2p/net/swarm/black_hole_detector_test.go b/p2p/net/swarm/black_hole_detector_test.go index e53a800241..f3f269af92 100644 --- a/p2p/net/swarm/black_hole_detector_test.go +++ b/p2p/net/swarm/black_hole_detector_test.go @@ -8,90 +8,91 @@ import ( func TestBlackHoleFilterReset(t *testing.T) { n := 10 - bhd := &blackHoleFilter{n: n, minSuccessFraction: 0.05, name: "test"} + bhf := &blackHoleFilter{n: n, minSuccessFraction: 0.05, name: "test"} var i = 0 // calls up to threshold should be allowed for i = 1; i <= n; i++ { - if _, isAllowed := bhd.IsAllowed(); !isAllowed { + if bhf.HandleRequest() != blackHoleResultAllowed { t.Fatalf("expected calls up to minDials to be allowed") } - bhd.RecordOutcome(false) + bhf.RecordResult(false) } + // after threshold calls every nth call should be allowed for i = n + 1; i < 42; i++ { - _, isAllowed := bhd.IsAllowed() - if (i%n == 0 && !isAllowed) || (i%n != 0 && isAllowed) { + result := bhf.HandleRequest() + if (i%n == 0 && result != blackHoleResultProbing) || (i%n != 0 && result != blackHoleResultBlocked) { t.Fatalf("expected every nth dial to be allowed") } } - bhd.RecordOutcome(true) + bhf.RecordResult(true) // check if calls up to threshold are allowed after success for i = 0; i < n; i++ { - if _, isAllowed := bhd.IsAllowed(); !isAllowed { + if bhf.HandleRequest() != blackHoleResultAllowed { t.Fatalf("expected black hole detector state to reset after success") } - bhd.RecordOutcome(false) + bhf.RecordResult(false) } // next call should be refused - if _, isAllowed := bhd.IsAllowed(); isAllowed { + if bhf.HandleRequest() != blackHoleResultBlocked { t.Fatalf("expected dial to be blocked") } } -func TestBlackHoleDetector(t *testing.T) { +func TestBlackHoleFilterSuccessFraction(t *testing.T) { n := 10 - bhd := &blackHoleFilter{n: n, minSuccessFraction: 0.4, name: "test"} + bhf := &blackHoleFilter{n: n, minSuccessFraction: 0.4, name: "test"} var i = 0 // 5 success and 5 fails for i = 1; i <= 5; i++ { - bhd.RecordOutcome(true) + bhf.RecordResult(true) } for i = 1; i <= 5; i++ { - bhd.RecordOutcome(false) + bhf.RecordResult(false) } - if _, isAllowed := bhd.IsAllowed(); !isAllowed { + if bhf.HandleRequest() != blackHoleResultAllowed { t.Fatalf("expected dial to be allowed") } // 4 success and 6 fails - bhd.RecordOutcome(false) + bhf.RecordResult(false) - if _, isAllowed := bhd.IsAllowed(); !isAllowed { + if bhf.HandleRequest() != blackHoleResultAllowed { t.Fatalf("expected dial to be allowed") } // 3 success and 7 fails - bhd.RecordOutcome(false) + bhf.RecordResult(false) // should be blocked - if _, isAllowed := bhd.IsAllowed(); isAllowed { + if bhf.HandleRequest() != blackHoleResultBlocked { t.Fatalf("expected dial to be blocked") } - bhd.RecordOutcome(true) + bhf.RecordResult(true) // 5 success and 5 fails for i = 1; i <= 5; i++ { - bhd.RecordOutcome(true) + bhf.RecordResult(true) } for i = 1; i <= 5; i++ { - bhd.RecordOutcome(false) + bhf.RecordResult(false) } - if _, isAllowed := bhd.IsAllowed(); !isAllowed { + if bhf.HandleRequest() != blackHoleResultAllowed { t.Fatalf("expected dial to be allowed") } // 4 success and 6 fails - bhd.RecordOutcome(false) + bhf.RecordResult(false) - if _, isAllowed := bhd.IsAllowed(); !isAllowed { + if bhf.HandleRequest() != blackHoleResultAllowed { t.Fatalf("expected dial to be allowed") } // 3 success and 7 fails - bhd.RecordOutcome(false) + bhf.RecordResult(false) // should be blocked - if _, isAllowed := bhd.IsAllowed(); isAllowed { + if bhf.HandleRequest() != blackHoleResultBlocked { t.Fatalf("expected dial to be blocked") } @@ -101,10 +102,10 @@ func TestBlackHoleDetectorInApplicableAddress(t *testing.T) { bhd := newBlackHoleDetector(true, true, nil) addr := ma.StringCast("/ip4/127.0.0.1/tcp/1234") for i := 0; i < 1000; i++ { - if !bhd.IsAllowed(addr) { + if !bhd.HandleRequest(addr) { t.Fatalf("expect dials to inapplicable address to always be allowed") } - bhd.RecordOutcome(addr, false) + bhd.RecordResult(addr, false) } } @@ -112,17 +113,17 @@ func TestBlackHoleDetectorUDP(t *testing.T) { bhd := newBlackHoleDetector(true, true, nil) addr := ma.StringCast("/ip4/1.2.3.4/udp/1234") for i := 0; i < 100; i++ { - bhd.RecordOutcome(addr, false) + bhd.RecordResult(addr, false) } - if bhd.IsAllowed(addr) { + if bhd.HandleRequest(addr) { t.Fatalf("expect dial to be be blocked") } bhd = newBlackHoleDetector(false, true, nil) for i := 0; i < 100; i++ { - bhd.RecordOutcome(addr, false) + bhd.RecordResult(addr, false) } - if !bhd.IsAllowed(addr) { + if !bhd.HandleRequest(addr) { t.Fatalf("expected dial to be be allowed when UDP detection is disabled") } } @@ -131,17 +132,17 @@ func TestBlackHoleDetectorIPv6(t *testing.T) { bhd := newBlackHoleDetector(true, true, nil) addr := ma.StringCast("/ip6/1::1/tcp/1234") for i := 0; i < 100; i++ { - bhd.RecordOutcome(addr, false) + bhd.RecordResult(addr, false) } - if bhd.IsAllowed(addr) { + if bhd.HandleRequest(addr) { t.Fatalf("expect dial to be be blocked") } bhd = newBlackHoleDetector(true, false, nil) for i := 0; i < 100; i++ { - bhd.RecordOutcome(addr, false) + bhd.RecordResult(addr, false) } - if !bhd.IsAllowed(addr) { + if !bhd.HandleRequest(addr) { t.Fatalf("expected dial to be be allowed when IPv6 detection is disabled") } } @@ -153,10 +154,10 @@ func TestBlackHoleDetectorProbes(t *testing.T) { } udp6Addr := ma.StringCast("/ip6/1::1/udp/1234/quic-v1") for i := 0; i < 3; i++ { - bhd.RecordOutcome(udp6Addr, false) + bhd.RecordResult(udp6Addr, false) } for i := 1; i < 100; i++ { - isAllowed := bhd.IsAllowed(udp6Addr) + isAllowed := bhd.HandleRequest(udp6Addr) if i%2 == 0 || i%3 == 0 { if !isAllowed { t.Fatalf("expected probe to be allowed irrespective of the state of other black hole filter") diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 91b3e866a9..d86d17a921 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -401,7 +401,7 @@ func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, // probes allowed by the black hole detector are actually dialed. If this check is // done before the dial prioritisation logic, we might not dial the address because // a higher priority address succeeded. - if !s.bhd.IsAllowed(addr) { + if !s.bhd.HandleRequest(addr) { return ErrDialRefusedBlackHole } @@ -497,7 +497,7 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (tra start := time.Now() connC, err := tpt.Dial(ctx, addr, p) - s.bhd.RecordOutcome(addr, err == nil) + s.bhd.RecordResult(addr, err == nil) if err != nil { if s.metricsTracer != nil {