From 4d82225d15220a885b08d90f31ef3f0ef0310c35 Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Tue, 15 Jan 2019 12:22:18 +0100 Subject: [PATCH 1/3] swarm/network: fix TestFetcherRetryOnTimeout race --- swarm/network/fetcher.go | 9 +++++++-- swarm/network/fetcher_test.go | 29 +++++++++++++++++++++-------- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go index 6aed57e225..9f4311a4d7 100644 --- a/swarm/network/fetcher.go +++ b/swarm/network/fetcher.go @@ -106,7 +106,7 @@ func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory { // The created Fetcher is started and returned. func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher { fetcher := NewFetcher(source, f.request, f.skipCheck) - go fetcher.run(ctx, peersToSkip) + go fetcher.run(ctx, peersToSkip, nil) return fetcher } @@ -162,7 +162,12 @@ func (f *Fetcher) Request(ctx context.Context, hopCount uint8) { // start prepares the Fetcher // it keeps the Fetcher alive within the lifecycle of the passed context -func (f *Fetcher) run(ctx context.Context, peers *sync.Map) { +// If doneC is not nil it will be closed to signal that the run function has returned. +// Not nil doneC is used in tests to synchronize cleanup operations when the test is done. 
+func (f *Fetcher) run(ctx context.Context, peers *sync.Map, doneC chan struct{}) { + if doneC != nil { + defer close(doneC) + } var ( doRequest bool // determines if retrieval is initiated in the current iteration wait *time.Timer // timer for search timeout diff --git a/swarm/network/fetcher_test.go b/swarm/network/fetcher_test.go index 3a926f4758..e9b1ab78ae 100644 --- a/swarm/network/fetcher_test.go +++ b/swarm/network/fetcher_test.go @@ -80,7 +80,7 @@ func TestFetcherSingleRequest(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go fetcher.run(ctx, peersToSkip) + go fetcher.run(ctx, peersToSkip, nil) rctx := context.Background() fetcher.Request(rctx, 0) @@ -122,7 +122,7 @@ func TestFetcherCancelStopsFetcher(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) // we start the fetcher, and then we immediately cancel the context - go fetcher.run(ctx, peersToSkip) + go fetcher.run(ctx, peersToSkip, nil) cancel() rctx, rcancel := context.WithTimeout(ctx, 100*time.Millisecond) @@ -150,7 +150,7 @@ func TestFetcherCancelStopsRequest(t *testing.T) { defer cancel() // we start the fetcher with an active context - go fetcher.run(ctx, peersToSkip) + go fetcher.run(ctx, peersToSkip, nil) rctx, rcancel := context.WithCancel(context.Background()) rcancel() @@ -190,7 +190,7 @@ func TestFetcherOfferUsesSource(t *testing.T) { defer cancel() // start the fetcher - go fetcher.run(ctx, peersToSkip) + go fetcher.run(ctx, peersToSkip, nil) rctx := context.Background() // call the Offer function with the source peer @@ -242,7 +242,7 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) { defer cancel() // start the fetcher - go fetcher.run(ctx, peersToSkip) + go fetcher.run(ctx, peersToSkip, nil) // call Request first rctx := context.Background() @@ -293,11 +293,24 @@ func TestFetcherRetryOnTimeout(t *testing.T) { }(searchTimeout) searchTimeout = 250 * time.Millisecond + // fetcherDoneC will be closed when 
fetcher run function returns. + // Context cancel defer will close the context and deferred function + // that waits for fetcherDoneC will prevent searchTimeout defer function + // to change searchTimeout before fetcher run returns. + fetcherDoneC := make(chan struct{}) + defer func() { + select { + case <-fetcherDoneC: + case <-time.After(10 * time.Second): + t.Error("fetcher run function did not finish") + } + }() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() // start the fetcher - go fetcher.run(ctx, peersToSkip) + go fetcher.run(ctx, peersToSkip, fetcherDoneC) // call the fetch function with an active context rctx := context.Background() @@ -370,7 +383,7 @@ func TestFetcherRequestQuitRetriesRequest(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go fetcher.run(ctx, peersToSkip) + go fetcher.run(ctx, peersToSkip, nil) rctx := context.Background() fetcher.Request(rctx, 0) @@ -473,7 +486,7 @@ func TestFetcherMaxHopCount(t *testing.T) { peersToSkip := &sync.Map{} - go fetcher.run(ctx, peersToSkip) + go fetcher.run(ctx, peersToSkip, nil) rctx := context.Background() fetcher.Request(rctx, maxHopCount) From f35879a4bf75b6689e095a78140179557e429105 Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Tue, 15 Jan 2019 13:28:37 +0100 Subject: [PATCH 2/3] swarm/network: fix races between different fetcher tests --- swarm/network/fetcher.go | 10 +- swarm/network/fetcher_test.go | 181 ++++++++++++++++++++++++++++++---- 2 files changed, 170 insertions(+), 21 deletions(-) diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go index 9f4311a4d7..da669b044e 100644 --- a/swarm/network/fetcher.go +++ b/swarm/network/fetcher.go @@ -106,7 +106,7 @@ func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory { // The created Fetcher is started and returned. 
func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher { fetcher := NewFetcher(source, f.request, f.skipCheck) - go fetcher.run(ctx, peersToSkip, nil) + go fetcher.run(ctx, peersToSkip) return fetcher } @@ -164,9 +164,9 @@ func (f *Fetcher) Request(ctx context.Context, hopCount uint8) { // it keeps the Fetcher alive within the lifecycle of the passed context // If doneC is not nil it will be closed to signal that the run function has returned. // Not nil doneC is used in tests to synchronize cleanup operations when the test is done. -func (f *Fetcher) run(ctx context.Context, peers *sync.Map, doneC chan struct{}) { - if doneC != nil { - defer close(doneC) +func (f *Fetcher) run(ctx context.Context, peers *sync.Map) { + if testHookFetcherRun != nil { + defer testHookFetcherRun() } var ( doRequest bool // determines if retrieval is initiated in the current iteration @@ -255,6 +255,8 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map, doneC chan struct{}) } } +var testHookFetcherRun func() + // doRequest attempts at finding a peer to request the chunk from // * first it tries to request explicitly from peers that are known to have offered the chunk // * if there are no such peers (available) it tries to request it from a peer closest to the chunk address diff --git a/swarm/network/fetcher_test.go b/swarm/network/fetcher_test.go index e9b1ab78ae..035780b4bd 100644 --- a/swarm/network/fetcher_test.go +++ b/swarm/network/fetcher_test.go @@ -77,10 +77,19 @@ func TestFetcherSingleRequest(t *testing.T) { peersToSkip.Store(p, time.Now()) } + // fetcherDoneC will be closed when fetcher run function returns. + // Waiting for fetcher run function goroutine to terminate + // prevents the race condition on searchTimeout value change by other + // tests. The fetcherDoneC defer function will wait for a proper + // goroutine cleanup. 
+ fetcherDoneC := make(chan struct{}) + defer setTestHookFetcherRun(func() { close(fetcherDoneC) })() + defer waitFetcherDone(t, fetcherDoneC) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go fetcher.run(ctx, peersToSkip, nil) + go fetcher.run(ctx, peersToSkip) rctx := context.Background() fetcher.Request(rctx, 0) @@ -119,10 +128,19 @@ func TestFetcherCancelStopsFetcher(t *testing.T) { peersToSkip := &sync.Map{} + // fetcherDoneC will be closed when fetcher run function returns. + // Waiting for fetcher run function goroutine to terminate + // prevents the race condition on searchTimeout value change by other + // tests. The fetcherDoneC defer function will wait for a proper + // goroutine cleanup. + fetcherDoneC := make(chan struct{}) + defer setTestHookFetcherRun(func() { close(fetcherDoneC) })() + defer waitFetcherDone(t, fetcherDoneC) + ctx, cancel := context.WithCancel(context.Background()) // we start the fetcher, and then we immediately cancel the context - go fetcher.run(ctx, peersToSkip, nil) + go fetcher.run(ctx, peersToSkip) cancel() rctx, rcancel := context.WithTimeout(ctx, 100*time.Millisecond) @@ -146,11 +164,20 @@ func TestFetcherCancelStopsRequest(t *testing.T) { peersToSkip := &sync.Map{} + // fetcherDoneC will be closed when fetcher run function returns. + // Waiting for fetcher run function goroutine to terminate + // prevents the race condition on searchTimeout value change by other + // tests. The fetcherDoneC defer function will wait for a proper + // goroutine cleanup. 
+ fetcherDoneC := make(chan struct{}) + defer setTestHookFetcherRun(func() { close(fetcherDoneC) })() + defer waitFetcherDone(t, fetcherDoneC) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() // we start the fetcher with an active context - go fetcher.run(ctx, peersToSkip, nil) + go fetcher.run(ctx, peersToSkip) rctx, rcancel := context.WithCancel(context.Background()) rcancel() @@ -186,11 +213,20 @@ func TestFetcherOfferUsesSource(t *testing.T) { peersToSkip := &sync.Map{} + // fetcherDoneC will be closed when fetcher run function returns. + // Waiting for fetcher run function goroutine to terminate + // prevents the race condition on searchTimeout value change by other + // tests. The fetcherDoneC defer function will wait for a proper + // goroutine cleanup. + fetcherDoneC := make(chan struct{}) + defer setTestHookFetcherRun(func() { close(fetcherDoneC) })() + defer waitFetcherDone(t, fetcherDoneC) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() // start the fetcher - go fetcher.run(ctx, peersToSkip, nil) + go fetcher.run(ctx, peersToSkip) rctx := context.Background() // call the Offer function with the source peer @@ -238,11 +274,20 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) { peersToSkip := &sync.Map{} + // fetcherDoneC will be closed when fetcher run function returns. + // Waiting for fetcher run function goroutine to terminate + // prevents the race condition on searchTimeout value change by other + // tests. The fetcherDoneC defer function will wait for a proper + // goroutine cleanup. 
+ fetcherDoneC := make(chan struct{}) + defer setTestHookFetcherRun(func() { close(fetcherDoneC) })() + defer waitFetcherDone(t, fetcherDoneC) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() // start the fetcher - go fetcher.run(ctx, peersToSkip, nil) + go fetcher.run(ctx, peersToSkip) // call Request first rctx := context.Background() @@ -298,19 +343,14 @@ func TestFetcherRetryOnTimeout(t *testing.T) { // that waits for fetcherDoneC will prevent searchTimeout defer function // to change searchTimeout before fetcher run returns. fetcherDoneC := make(chan struct{}) - defer func() { - select { - case <-fetcherDoneC: - case <-time.After(10 * time.Second): - t.Error("fetcher run function did not finish") - } - }() + defer setTestHookFetcherRun(func() { close(fetcherDoneC) })() + defer waitFetcherDone(t, fetcherDoneC) ctx, cancel := context.WithCancel(context.Background()) defer cancel() // start the fetcher - go fetcher.run(ctx, peersToSkip, fetcherDoneC) + go fetcher.run(ctx, peersToSkip) // call the fetch function with an active context rctx := context.Background() @@ -354,9 +394,21 @@ func TestFetcherFactory(t *testing.T) { peersToSkip := &sync.Map{} - fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip) + // fetcherDoneC will be closed when fetcher run function returns. + // Waiting for fetcher run function goroutine to terminate + // prevents the race condition on searchTimeout value change by other + // tests. The fetcherDoneC defer function will wait for a proper + // goroutine cleanup. 
+ fetcherDoneC := make(chan struct{}) + defer setTestHookFetcherRun(func() { close(fetcherDoneC) })() + defer waitFetcherDone(t, fetcherDoneC) - fetcher.Request(context.Background(), 0) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fetcher := fetcherFactory.New(ctx, addr, peersToSkip) + + fetcher.Request(ctx, 0) // check if the created fetchFunction really starts a fetcher and initiates a request select { @@ -380,10 +432,19 @@ func TestFetcherRequestQuitRetriesRequest(t *testing.T) { peersToSkip := &sync.Map{} + // fetcherDoneC will be closed when fetcher run function returns. + // Waiting for fetcher run function goroutine to terminate + // prevents the race condition on searchTimeout value change by other + // tests. The fetcherDoneC defer function will wait for a proper + // goroutine cleanup. + fetcherDoneC := make(chan struct{}) + defer setTestHookFetcherRun(func() { close(fetcherDoneC) })() + defer waitFetcherDone(t, fetcherDoneC) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go fetcher.run(ctx, peersToSkip, nil) + go fetcher.run(ctx, peersToSkip) rctx := context.Background() fetcher.Request(rctx, 0) @@ -481,12 +542,21 @@ func TestFetcherMaxHopCount(t *testing.T) { addr := make([]byte, 32) fetcher := NewFetcher(addr, requester.doRequest, true) + // fetcherDoneC will be closed when fetcher run function returns. + // Waiting for fetcher run function goroutine to terminate + // prevents the race condition on searchTimeout value change by other + // tests. The fetcherDoneC defer function will wait for a proper + // goroutine cleanup. 
+ fetcherDoneC := make(chan struct{}) + defer setTestHookFetcherRun(func() { close(fetcherDoneC) })() + defer waitFetcherDone(t, fetcherDoneC) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() peersToSkip := &sync.Map{} - go fetcher.run(ctx, peersToSkip, nil) + go fetcher.run(ctx, peersToSkip) rctx := context.Background() fetcher.Request(rctx, maxHopCount) @@ -498,3 +568,80 @@ func TestFetcherMaxHopCount(t *testing.T) { case <-time.After(200 * time.Millisecond): } } + +// waitFetcherDone is a helper function that waits for +// a channel to be closed. +func waitFetcherDone(t *testing.T, doneC <-chan struct{}) { + t.Helper() + select { + case _, ok := <-doneC: + if ok { + t.Error("channel struct{} received, but channel not closed") + } + case <-time.After(10 * time.Second): + t.Error("fetcher run function did not finish") + } +} + +// setTestHookFetcherRun sets testHookFetcherRun and +// returns a function that will reset it to the +// value before the change. +func setTestHookFetcherRun(h func()) (reset func()) { + current := testHookFetcherRun + reset = func() { testHookFetcherRun = current } + testHookFetcherRun = h + return reset +} + +// TestSetTestHookFetcherRun tests if setTestHookFetcherRun changes +// testHookFetcherRun function correctly and if its reset function +// resets the original function. +func TestSetTestHookFetcherRun(t *testing.T) { + // Set the current function after the test finishes. 
+ defer func(h func()) { testHookFetcherRun = h }(testHookFetcherRun) + + // expected value for the unchanged function + original := 1 + // expected value for the changed function + changed := 2 + + // this variable will be set with two different functions + var got int + + // define the original (unchanged) functions + testHookFetcherRun = func() { + got = original + } + + // set got variable + testHookFetcherRun() + + // test if got variable is set correctly + if got != original { + t.Errorf("got hook value %v, want %v", got, original) + } + + // set the new function + reset := setTestHookFetcherRun(func() { + got = changed + }) + + // set got variable + testHookFetcherRun() + + // test if got variable is set correctly to changed value + if got != changed { + t.Errorf("got hook value %v, want %v", got, changed) + } + + // set the function to the original one + reset() + + // set got variable + testHookFetcherRun() + + // test if got variable is set correctly to original value + if got != original { + t.Errorf("got hook value %v, want %v", got, original) + } +} From f8b4dbb50cda21a0e19bf93a088886974127082e Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Tue, 15 Jan 2019 13:30:35 +0100 Subject: [PATCH 3/3] swarm/network: prevent mockRequester.doRequest deadlock --- swarm/network/fetcher_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/swarm/network/fetcher_test.go b/swarm/network/fetcher_test.go index 035780b4bd..32625bfde6 100644 --- a/swarm/network/fetcher_test.go +++ b/swarm/network/fetcher_test.go @@ -52,7 +52,11 @@ func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*enode m.count++ } time.Sleep(waitTime) - m.requestC <- request + select { + case m.requestC <- request: + case <-ctx.Done(): + return nil, nil, ctx.Err() + } // if there is a Source in the request use that, if not use the global requestedPeerId source := request.Source