Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
Merge pull request #273 from libp2p/clean-up-self-dial
Browse files Browse the repository at this point in the history
remove redundant self-dialing check, simplify starting of dialWorkerLoop
  • Loading branch information
marten-seemann authored Sep 7, 2021
2 parents 29bd937 + 1bf84f7 commit 93f7ca9
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 63 deletions.
15 changes: 5 additions & 10 deletions dial_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
)

// DialWorerFunc is used by DialSync to spawn a new dial worker
type dialWorkerFunc func(peer.ID, <-chan dialRequest) error
// dialWorkerFunc is used by DialSync to spawn a new dial worker
type dialWorkerFunc func(peer.ID, <-chan dialRequest)

// newDialSync constructs a new DialSync
func newDialSync(worker dialWorkerFunc) *DialSync {
Expand Down Expand Up @@ -93,12 +93,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) (*activeDial, error) {
reqch: make(chan dialRequest),
ds: ds,
}

if err := ds.dialWorker(p, actd.reqch); err != nil {
cancel()
return nil, err
}

go ds.dialWorker(p, actd.reqch)
ds.dials[p] = actd
}

Expand All @@ -108,9 +103,9 @@ func (ds *DialSync) getActiveDial(p peer.ID) (*activeDial, error) {
return actd, nil
}

// DialLock initiates a dial to the given peer if there are none in progress
// Dial initiates a dial to the given peer if there are none in progress
// then waits for the dial to that peer to complete.
func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) {
func (ds *DialSync) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
ad, err := ds.getActiveDial(p)
if err != nil {
return nil, err
Expand Down
53 changes: 13 additions & 40 deletions dial_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func getMockDialFunc() (dialWorkerFunc, func(), context.Context, <-chan struct{}
dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care
dialctx, cancel := context.WithCancel(context.Background())
ch := make(chan struct{})
f := func(p peer.ID, reqch <-chan dialRequest) error {
f := func(p peer.ID, reqch <-chan dialRequest) {
defer cancel()
dfcalls <- struct{}{}
go func() {
Expand All @@ -24,7 +24,6 @@ func getMockDialFunc() (dialWorkerFunc, func(), context.Context, <-chan struct{}
req.resch <- dialResponse{conn: new(Conn)}
}
}()
return nil
}

var once sync.Once
Expand All @@ -38,14 +37,14 @@ func TestBasicDialSync(t *testing.T) {

finished := make(chan struct{}, 2)
go func() {
if _, err := dsync.DialLock(context.Background(), p); err != nil {
if _, err := dsync.Dial(context.Background(), p); err != nil {
t.Error(err)
}
finished <- struct{}{}
}()

go func() {
if _, err := dsync.DialLock(context.Background(), p); err != nil {
if _, err := dsync.Dial(context.Background(), p); err != nil {
t.Error(err)
}
finished <- struct{}{}
Expand Down Expand Up @@ -74,7 +73,7 @@ func TestDialSyncCancel(t *testing.T) {

finished := make(chan struct{})
go func() {
_, err := dsync.DialLock(ctx1, p)
_, err := dsync.Dial(ctx1, p)
if err != ctx1.Err() {
t.Error("should have gotten context error")
}
Expand All @@ -90,7 +89,7 @@ func TestDialSyncCancel(t *testing.T) {

// Add a second dialwait in so two actors are waiting on the same dial
go func() {
_, err := dsync.DialLock(context.Background(), p)
_, err := dsync.Dial(context.Background(), p)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -123,15 +122,15 @@ func TestDialSyncAllCancel(t *testing.T) {

finished := make(chan struct{})
go func() {
if _, err := dsync.DialLock(ctx, p); err != ctx.Err() {
if _, err := dsync.Dial(ctx, p); err != ctx.Err() {
t.Error("should have gotten context error")
}
finished <- struct{}{}
}()

// Add a second dialwait in so two actors are waiting on the same dial
go func() {
if _, err := dsync.DialLock(ctx, p); err != ctx.Err() {
if _, err := dsync.Dial(ctx, p); err != ctx.Err() {
t.Error("should have gotten context error")
}
finished <- struct{}{}
Expand All @@ -155,14 +154,14 @@ func TestDialSyncAllCancel(t *testing.T) {

// should be able to successfully dial that peer again
done()
if _, err := dsync.DialLock(context.Background(), p); err != nil {
if _, err := dsync.Dial(context.Background(), p); err != nil {
t.Fatal(err)
}
}

func TestFailFirst(t *testing.T) {
var count int32
f := func(p peer.ID, reqch <-chan dialRequest) error {
f := func(p peer.ID, reqch <-chan dialRequest) {
go func() {
for {
req, ok := <-reqch
Expand All @@ -178,33 +177,29 @@ func TestFailFirst(t *testing.T) {
atomic.AddInt32(&count, 1)
}
}()
return nil
}

ds := newDialSync(f)

p := peer.ID("testing")

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

_, err := ds.DialLock(ctx, p)
if err == nil {
if _, err := ds.Dial(ctx, p); err == nil {
t.Fatal("expected gophers to have eaten the modem")
}

c, err := ds.DialLock(ctx, p)
c, err := ds.Dial(ctx, p)
if err != nil {
t.Fatal(err)
}

if c == nil {
t.Fatal("should have gotten a 'real' conn back")
}
}

func TestStressActiveDial(t *testing.T) {
ds := newDialSync(func(p peer.ID, reqch <-chan dialRequest) error {
ds := newDialSync(func(p peer.ID, reqch <-chan dialRequest) {
go func() {
for {
req, ok := <-reqch
Expand All @@ -214,7 +209,6 @@ func TestStressActiveDial(t *testing.T) {
req.resch <- dialResponse{}
}
}()
return nil
})

wg := sync.WaitGroup{}
Expand All @@ -223,7 +217,7 @@ func TestStressActiveDial(t *testing.T) {

makeDials := func() {
for i := 0; i < 10000; i++ {
ds.DialLock(context.Background(), pid)
ds.Dial(context.Background(), pid)
}
wg.Done()
}
Expand All @@ -235,24 +229,3 @@ func TestStressActiveDial(t *testing.T) {

wg.Wait()
}

func TestDialSelf(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

self := peer.ID("ABC")
s := NewSwarm(ctx, self, nil, nil)
defer s.Close()

// this should fail
_, err := s.dsync.DialLock(ctx, self)
if err != ErrDialToSelf {
t.Fatal("expected error from self dial")
}

// do it twice to make sure we get a new active dial object that fails again
_, err = s.dsync.DialLock(ctx, self)
if err != ErrDialToSelf {
t.Fatal("expected error from self dial")
}
}
2 changes: 1 addition & 1 deletion dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ func TestDialSimultaneousJoin(t *testing.T) {
}
}

func TestDialSelf2(t *testing.T) {
func TestDialSelf(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down
2 changes: 1 addition & 1 deletion swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc
}
}

s.dsync = newDialSync(s.startDialWorker)
s.dsync = newDialSync(s.dialWorkerLoop)
s.limiter = newDialLimiter(s.dialAddr)
s.proc = goprocessctx.WithContext(ctx)
s.ctx = goprocessctx.OnClosingContext(s.proc)
Expand Down
13 changes: 2 additions & 11 deletions swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
ctx, cancel := context.WithTimeout(ctx, network.GetDialPeerTimeout(ctx))
defer cancel()

conn, err = s.dsync.DialLock(ctx, p)
conn, err = s.dsync.Dial(ctx, p)
if err == nil {
return conn, nil
}
Expand Down Expand Up @@ -294,16 +294,7 @@ type dialResponse struct {
err error
}

// startDialWorker starts an active dial goroutine that synchronizes and executes concurrent dials
func (s *Swarm) startDialWorker(p peer.ID, reqch <-chan dialRequest) error {
if p == s.local {
return ErrDialToSelf
}

go s.dialWorkerLoop(p, reqch)
return nil
}

// dialWorkerLoop synchronizes and executes concurrent dials to a single peer
func (s *Swarm) dialWorkerLoop(p peer.ID, reqch <-chan dialRequest) {
defer s.limiter.clearAllPeerDials(p)

Expand Down

0 comments on commit 93f7ca9

Please sign in to comment.