Skip to content

Commit

Permalink
swarm: track dial cancellation reason
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Aug 27, 2023
1 parent 3441712 commit 3665bd0
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 18 deletions.
30 changes: 20 additions & 10 deletions p2p/net/swarm/dial_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package swarm

import (
"context"
"errors"
"fmt"
"sync"

"github.com/libp2p/go-libp2p/core/network"
Expand All @@ -11,6 +13,10 @@ import (
// dialWorkerFunc is used by dialSync to spawn a new dial worker.
// It is called with the ID of the peer being dialed and a receive-only
// channel on which the dialRequest values for that peer are delivered.
type dialWorkerFunc func(peer.ID, <-chan dialRequest)

// errParentContextCanceled is used with cancelCause to cancel the activeDial when the parent context is canceled.
// This helps distinguish between a nil error and context.Canceled when inspecting the context with context.Cause.
var errParentContextCanceled = errors.New("parent context cancelled")

// newDialSync constructs a new dialSync
func newDialSync(worker dialWorkerFunc) *dialSync {
return &dialSync{
Expand All @@ -30,14 +36,17 @@ type dialSync struct {
type activeDial struct {
refCnt int

ctx context.Context
cancel func()
ctx context.Context
cancelCause func(error)

reqch chan dialRequest
}

func (ad *activeDial) close() {
ad.cancel()
func (ad *activeDial) close(err error) {
if errors.Is(err, context.Canceled) {
err = fmt.Errorf("%w: %w", errParentContextCanceled, context.Canceled)
}
ad.cancelCause(err)
close(ad.reqch)
}

Expand Down Expand Up @@ -74,11 +83,11 @@ func (ds *dialSync) getActiveDial(p peer.ID) (*activeDial, error) {
if !ok {
// This code intentionally uses the background context. Otherwise, if the first call
// to Dial is canceled, subsequent dial calls will also be canceled.
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancelCause(context.Background())
actd = &activeDial{
ctx: ctx,
cancel: cancel,
reqch: make(chan dialRequest),
ctx: ctx,
cancelCause: cancel,
reqch: make(chan dialRequest),
}
go ds.dialWorker(p, actd.reqch)
ds.dials[p] = actd
Expand All @@ -101,9 +110,10 @@ func (ds *dialSync) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
defer ds.mutex.Unlock()
ad.refCnt--
if ad.refCnt == 0 {
ad.close()
ad.close(err)
delete(ds.dials, p)
}
}()
return ad.dial(ctx)
conn, err := ad.dial(ctx) // updated err is used in deferred func
return conn, err
}
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (tra

if err != nil {
if s.metricsTracer != nil {
s.metricsTracer.FailedDialing(addr, err)
s.metricsTracer.FailedDialing(addr, err, context.Cause(ctx))
}
return nil, err
}
Expand Down
26 changes: 19 additions & 7 deletions p2p/net/swarm/swarm_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ type MetricsTracer interface {
OpenedConnection(network.Direction, crypto.PubKey, network.ConnectionState, ma.Multiaddr)
ClosedConnection(network.Direction, time.Duration, network.ConnectionState, ma.Multiaddr)
CompletedHandshake(time.Duration, network.ConnectionState, ma.Multiaddr)
FailedDialing(ma.Multiaddr, error)
FailedDialing(ma.Multiaddr, error, error)
DialCompleted(success bool, totalDials int)
DialRankingDelay(d time.Duration)
UpdatedBlackHoleFilterState(name string, state blackHoleState, nextProbeAfter int, successFraction float64)
Expand Down Expand Up @@ -216,18 +216,30 @@ func (m *metricsTracer) CompletedHandshake(t time.Duration, cs network.Connectio
connHandshakeLatency.WithLabelValues(*tags...).Observe(t.Seconds())
}

func (m *metricsTracer) FailedDialing(addr ma.Multiaddr, err error) {
func (m *metricsTracer) FailedDialing(addr ma.Multiaddr, dialErr error, cause error) {
transport := metricshelper.GetTransport(addr)
e := "other"
if errors.Is(err, context.Canceled) {
e = "canceled"
} else if errors.Is(err, context.DeadlineExceeded) {
// dial deadline exceeded or the parent context's deadline exceeded
if errors.Is(dialErr, context.DeadlineExceeded) || errors.Is(cause, context.DeadlineExceeded) {
e = "deadline"
} else if errors.Is(dialErr, context.Canceled) {
// dial was cancelled.

if errors.Is(cause, errParentContextCanceled) {
// parent was canceled
e = "canceled"
} else if errors.Is(cause, context.Canceled) {
// another parallel dial succeeded
e = "canceled: dial successful"
} else {
// something else
e = "canceled: other"
}
} else {
nerr, ok := err.(net.Error)
nerr, ok := dialErr.(net.Error)
if ok && nerr.Timeout() {
e = "timeout"
} else if strings.Contains(err.Error(), "connect: connection refused") {
} else if strings.Contains(dialErr.Error(), "connect: connection refused") {
e = "connection refused"
}
}
Expand Down

0 comments on commit 3665bd0

Please sign in to comment.