Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(all): remove unneeded timer channel drains #3002

Merged
merged 1 commit into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions dot/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,7 @@ func (d *discovery) advertise() {

select {
case <-d.ctx.Done():
if !timer.Stop() {
<-timer.C
}
timer.Stop()
return
case <-timer.C:
logger.Debug("advertising ourselves in the DHT...")
Expand Down
4 changes: 1 addition & 3 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,9 +362,7 @@ func (s *Service) sendHandshake(peer peer.ID, hs Handshake, info *notificationsP
closeOutboundStream(info, peer, stream)
return nil, errHandshakeTimeout
case hsResponse := <-s.readHandshake(stream, info.handshakeDecoder, info.maxSize):
if !hsTimer.Stop() {
<-hsTimer.C
}
hsTimer.Stop()

if hsResponse.err != nil {
logger.Tracef("failed to read handshake from peer %s using protocol %s: %s", peer, info.protocolID, hsResponse.err)
Expand Down
4 changes: 1 addition & 3 deletions dot/network/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,7 @@ func (s *Service) createBatchMessageHandler(txnBatchCh chan *batchMessage) Notif

select {
case txnBatchCh <- data:
if !timer.Stop() {
<-timer.C
}
timer.Stop()
case <-timer.C:
logger.Debugf("transaction message %s for peer %s not included into batch", msg, peer)
}
Expand Down
4 changes: 1 addition & 3 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ func (s *Server) Stop() (err error) {
select {
case err := <-s.done:
close(s.done)
if !timeout.Stop() {
<-timeout.C
}
timeout.Stop()
if err != nil {
return err
}
Expand Down
22 changes: 6 additions & 16 deletions lib/babe/babe.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,29 +216,24 @@ func (b *Service) waitForFirstBlock() error {

const firstBlockTimeout = time.Minute * 5
timer := time.NewTimer(firstBlockTimeout)
cleanup := func() {
if !timer.Stop() {
<-timer.C
}
}

// loop until block 1
for {
select {
case block, ok := <-ch:
if !ok {
cleanup()
timer.Stop()
return errChannelClosed
}

if ok && block.Header.Number > 0 {
cleanup()
timer.Stop()
return nil
}
case <-timer.C:
return errFirstBlockTimeout
case <-b.ctx.Done():
cleanup()
timer.Stop()
return b.ctx.Err()
}
}
Expand Down Expand Up @@ -408,28 +403,23 @@ func (b *Service) handleEpoch(epoch uint64) (next uint64, err error) {

nextEpochStartTime := getSlotStartTime(nextEpochStart, b.constants.slotDuration)
epochTimer := time.NewTimer(time.Until(nextEpochStartTime))
cleanup := func() {
if !epochTimer.Stop() {
<-epochTimer.C
}
}

errCh := make(chan error)
go b.epochHandler.run(ctx, errCh)

select {
case <-b.ctx.Done():
cleanup()
epochTimer.Stop()
return 0, b.ctx.Err()
case <-b.pause:
cleanup()
epochTimer.Stop()
return 0, errServicePaused
case <-epochTimer.C:
// stop current epoch handler
cancel()
case err := <-errCh:
// TODO: errEpochPast is sent on this channel, but it doesnot get logged here
cleanup()
epochTimer.Stop()
logger.Errorf("error from epochHandler: %s", err)
}

Expand Down
12 changes: 3 additions & 9 deletions lib/babe/epoch_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,22 +103,16 @@ func (h *epochHandler) run(ctx context.Context, errCh chan<- error) {
logger.Debugf("start time of slot %d: %v", authoringSlot, startTime)
}

defer func() {
// cleanup timers if ctx was cancelled
for _, swt := range slotTimeTimers {
if !swt.timer.Stop() {
<-swt.timer.C
}
}
}()

logger.Debugf("authoring in %d slots in epoch %d", len(slotTimeTimers), h.epochNumber)

for _, swt := range slotTimeTimers {
logger.Debugf("waiting for next authoring slot %d", swt.slotNum)

select {
case <-ctx.Done():
for _, swt := range slotTimeTimers {
swt.timer.Stop()
}
return
case <-swt.timer.C:
// we must do a time correction as the slot timer sometimes is triggered
Expand Down
4 changes: 1 addition & 3 deletions lib/babe/epoch_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,7 @@ func TestEpochHandler_run(t *testing.T) {
case <-timer.C:
require.Equal(t, epochLength-(firstExecutedSlot-startSlot), callsToHandleSlot)
case err := <-errCh:
if !timer.Stop() {
<-timer.C
}
timer.Stop()
require.NoError(t, err)
}
}
15 changes: 3 additions & 12 deletions lib/grandpa/finalisation.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,14 +370,8 @@ func (f *finalisationEngine) defineRoundVotes() error {
for !precommited {
select {
case <-f.stopCh:
if !determinePrevoteTimer.Stop() {
<-determinePrevoteTimer.C
}

if !determinePrecommitTimer.Stop() {
<-determinePrecommitTimer.C
}

determinePrevoteTimer.Stop()
determinePrecommitTimer.Stop()
return nil

case <-determinePrevoteTimer.C:
Expand All @@ -389,10 +383,7 @@ func (f *finalisationEngine) defineRoundVotes() error {
if alreadyCompletable {
f.actionCh <- alreadyFinalized

if !determinePrecommitTimer.Stop() {
<-determinePrecommitTimer.C
}

determinePrecommitTimer.Stop()
return nil
}

Expand Down
4 changes: 1 addition & 3 deletions tests/rpc/system_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ func TestStableNetworkRPC(t *testing.T) { //nolint:tparallel
select {
case <-timer.C:
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
timer.Stop()
return
}
}
Expand Down
4 changes: 1 addition & 3 deletions tests/stress/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ func compareChainHeadsWithRetry(ctx context.Context, nodes node.Nodes,
select {
case <-timer.C:
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
timer.Stop()
return fmt.Errorf("%w: hashes=%v", err, hashes) // last error
}
}
Expand Down
4 changes: 1 addition & 3 deletions tests/stress/stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,9 +553,7 @@ func TestSync_SubmitExtrinsic(t *testing.T) {
select {
case <-timer.C:
case <-waitNoExtCtx.Done():
if !timer.Stop() {
<-timer.C
}
timer.Stop()
require.NoError(t, waitNoExtCtx.Err())
}
}
Expand Down