Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: track dial cancellation reason #2532

Merged
merged 4 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 123 additions & 23 deletions dashboards/swarm/swarm.json
Original file line number Diff line number Diff line change
Expand Up @@ -2251,7 +2251,113 @@
},
"mappings": []
},
"overrides": []
"overrides": [
{
"matcher": {
"id": "byName",
"options": "canceled: concurrent dial successful"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "super-light-blue",
"mode": "fixed"
}
}
]
},
{
"matcher": {
"id": "byName",
"options": "application canceled"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "blue",
"mode": "fixed"
}
}
]
},
{
"matcher": {
"id": "byName",
"options": "canceled: other"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "dark-blue",
"mode": "fixed"
}
}
]
},
{
"matcher": {
"id": "byName",
"options": "timeout"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "orange",
"mode": "fixed"
}
}
]
},
{
"matcher": {
"id": "byName",
"options": "other"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "red",
"mode": "fixed"
}
}
]
},
{
"matcher": {
"id": "byName",
"options": "deadline"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "super-light-orange",
"mode": "fixed"
}
}
]
},
{
"matcher": {
"id": "byName",
"options": "connection refused"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "green",
"mode": "fixed"
}
}
]
}
]
},
"gridPos": {
"h": 8,
Expand All @@ -2261,10 +2367,14 @@
},
"id": 15,
"options": {
"displayLabels": [],
"legend": {
"displayMode": "list",
"displayMode": "table",
"placement": "right",
"showLegend": true
"showLegend": true,
"values": [
"percent"
]
},
"pieType": "donut",
"reduceOptions": {
Expand Down Expand Up @@ -2795,7 +2905,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "sum(increase(libp2p_swarm_dial_ranking_delay_seconds_bucket{instance=~\"$instance\",le=\"inf\"}[$__range])) - ignoring(le) sum(increase(libp2p_swarm_dial_ranking_delay_seconds_bucket{instance=~\"$instance\",le=\"0.75\"}[$__range]))",
"expr": "sum(increase(libp2p_swarm_dial_ranking_delay_seconds_bucket{instance=~\"$instance\",le=\"+Inf\"}[$__range])) - ignoring(le) sum(increase(libp2p_swarm_dial_ranking_delay_seconds_bucket{instance=~\"$instance\",le=\"0.75\"}[$__range]))",
"hide": false,
"legendFormat": ">750ms",
"range": true,
Expand Down Expand Up @@ -3194,7 +3304,7 @@
},
"textMode": "value_and_name"
},
"pluginVersion": "9.3.6",
"pluginVersion": "10.0.1",
"targets": [
{
"datasource": {
Expand Down Expand Up @@ -3233,7 +3343,7 @@
},
{
"color": "green",
"value": 5
"value": 1
}
]
}
Expand All @@ -3259,7 +3369,7 @@
"showThresholdLabels": false,
"showThresholdMarkers": true
},
"pluginVersion": "9.3.6",
"pluginVersion": "10.0.1",
"targets": [
{
"datasource": {
Expand All @@ -3279,22 +3389,12 @@
"type": "gauge"
}
],
"refresh": false,
"schemaVersion": 37,
"refresh": "",
"schemaVersion": 38,
"style": "dark",
"tags": [],
"templating": {
"list": [
{
"hide": 0,
"label": "datasource",
"name": "DS_PROMETHEUS",
"options": [],
"query": "prometheus",
"refresh": 1,
"regex": "",
"type": "datasource"
},
{
"current": {},
"datasource": {
Expand All @@ -3320,13 +3420,13 @@
]
},
"time": {
"from": "now-1h",
"from": "now-15m",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "go-libp2p Swarm",
"title": "libp2p Swarm",
"uid": "a15PyhO4z",
"version": 7,
"version": 4,
"weekStart": ""
}
}
46 changes: 26 additions & 20 deletions p2p/net/swarm/dial_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package swarm

import (
"context"
"errors"
"sync"

"github.com/libp2p/go-libp2p/core/network"
Expand All @@ -11,6 +12,9 @@ import (
// dialWorkerFunc is used by dialSync to spawn a new dial worker
type dialWorkerFunc func(peer.ID, <-chan dialRequest)

// errConcurrentDialSuccessful is used to signal that a concurrent dial succeeded
var errConcurrentDialSuccessful = errors.New("concurrent dial successful")

// newDialSync constructs a new dialSync
func newDialSync(worker dialWorkerFunc) *dialSync {
return &dialSync{
Expand All @@ -30,17 +34,12 @@ type dialSync struct {
type activeDial struct {
refCnt int

ctx context.Context
cancel func()
ctx context.Context
cancelCause func(error)

reqch chan dialRequest
}

func (ad *activeDial) close() {
ad.cancel()
close(ad.reqch)
}

func (ad *activeDial) dial(ctx context.Context) (*Conn, error) {
dialCtx := ad.ctx

Expand Down Expand Up @@ -74,11 +73,11 @@ func (ds *dialSync) getActiveDial(p peer.ID) (*activeDial, error) {
if !ok {
// This code intentionally uses the background context. Otherwise, if the first call
// to Dial is canceled, subsequent dial calls will also be canceled.
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancelCause(context.Background())
actd = &activeDial{
ctx: ctx,
cancel: cancel,
reqch: make(chan dialRequest),
ctx: ctx,
cancelCause: cancel,
reqch: make(chan dialRequest),
}
go ds.dialWorker(p, actd.reqch)
ds.dials[p] = actd
Expand All @@ -96,14 +95,21 @@ func (ds *dialSync) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
return nil, err
}

defer func() {
ds.mutex.Lock()
defer ds.mutex.Unlock()
ad.refCnt--
if ad.refCnt == 0 {
ad.close()
delete(ds.dials, p)
conn, err := ad.dial(ctx)

ds.mutex.Lock()
defer ds.mutex.Unlock()

ad.refCnt--
if ad.refCnt == 0 {
if err == nil {
ad.cancelCause(errConcurrentDialSuccessful)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? What would happen if we're just dialing one address, and that dial is successful?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That dial would have succeeded so the context cancellation has no effect on that dial. That method would have returned.

There are two options:

  1. We set call cancelCause(nil) and live with the brittle check that errors.Is(Cause(err), context.Canceled) signals concurrent successful dial. Here we override actual cancelations of parent context with errParentCanceled. This is the strategy in 055cec0 commit.
  2. Is the latest commit. We explicitly signal errConcurrentDialSuccessful when err is nil. In case the dial succeeds, this is not a problem since we will not call FailedDial for tracking metrics in that case.

} else {
ad.cancelCause(err)
}
}()
return ad.dial(ctx)
close(ad.reqch)
delete(ds.dials, p)
}

return conn, err
}
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (tra

if err != nil {
if s.metricsTracer != nil {
s.metricsTracer.FailedDialing(addr, err)
s.metricsTracer.FailedDialing(addr, err, context.Cause(ctx))
}
return nil, err
}
Expand Down
24 changes: 17 additions & 7 deletions p2p/net/swarm/swarm_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ type MetricsTracer interface {
OpenedConnection(network.Direction, crypto.PubKey, network.ConnectionState, ma.Multiaddr)
ClosedConnection(network.Direction, time.Duration, network.ConnectionState, ma.Multiaddr)
CompletedHandshake(time.Duration, network.ConnectionState, ma.Multiaddr)
FailedDialing(ma.Multiaddr, error)
FailedDialing(ma.Multiaddr, error, error)
DialCompleted(success bool, totalDials int)
DialRankingDelay(d time.Duration)
UpdatedBlackHoleFilterState(name string, state blackHoleState, nextProbeAfter int, successFraction float64)
Expand Down Expand Up @@ -216,18 +216,28 @@ func (m *metricsTracer) CompletedHandshake(t time.Duration, cs network.Connectio
connHandshakeLatency.WithLabelValues(*tags...).Observe(t.Seconds())
}

func (m *metricsTracer) FailedDialing(addr ma.Multiaddr, err error) {
func (m *metricsTracer) FailedDialing(addr ma.Multiaddr, dialErr error, cause error) {
transport := metricshelper.GetTransport(addr)
e := "other"
if errors.Is(err, context.Canceled) {
e = "canceled"
} else if errors.Is(err, context.DeadlineExceeded) {
// dial deadline exceeded or the the parent contexts deadline exceeded
if errors.Is(dialErr, context.DeadlineExceeded) || errors.Is(cause, context.DeadlineExceeded) {
e = "deadline"
} else if errors.Is(dialErr, context.Canceled) {
// dial was cancelled.
if errors.Is(cause, context.Canceled) {
// parent context was canceled
e = "application canceled"
} else if errors.Is(cause, errConcurrentDialSuccessful) {
e = "canceled: concurrent dial successful"
} else {
// something else
e = "canceled: other"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we expect this to happen at all?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think so. Didnt happen on my kubo node. I ran it for about an hour

}
} else {
nerr, ok := err.(net.Error)
nerr, ok := dialErr.(net.Error)
if ok && nerr.Timeout() {
e = "timeout"
} else if strings.Contains(err.Error(), "connect: connection refused") {
} else if strings.Contains(dialErr.Error(), "connect: connection refused") {
e = "connection refused"
}
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestMetricsNoAllocNoCover(t *testing.T) {
"CompletedHandshake": func() {
mt.CompletedHandshake(time.Duration(mrand.Intn(100))*time.Second, randItem(connections), randItem(addrs))
},
"FailedDialing": func() { mt.FailedDialing(randItem(addrs), randItem(errors)) },
"FailedDialing": func() { mt.FailedDialing(randItem(addrs), randItem(errors), randItem(errors)) },
"DialCompleted": func() { mt.DialCompleted(mrand.Intn(2) == 1, mrand.Intn(10)) },
"DialRankingDelay": func() { mt.DialRankingDelay(time.Duration(mrand.Intn(1e10))) },
"UpdatedBlackHoleFilterState": func() {
Expand Down