diff --git a/dial_queue.go b/dial_queue.go index 62fbf03e5..75cc703f6 100644 --- a/dial_queue.go +++ b/dial_queue.go @@ -110,7 +110,7 @@ type waitingCh struct { // end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency // to config.maxParallelism. func newDialQueue(params *dqParams) (*dialQueue, error) { - sq := &dialQueue{ + dq := &dialQueue{ dqParams: params, nWorkers: params.config.minParallelism, out: queue.NewChanQueue(params.ctx, queue.NewXORDistancePQ(params.target)), @@ -121,10 +121,10 @@ func newDialQueue(params *dqParams) (*dialQueue, error) { } for i := 0; i < int(params.config.minParallelism); i++ { - go sq.worker() + go dq.worker() } - go sq.control() - return sq, nil + go dq.control() + return dq, nil } func (dq *dialQueue) control() { @@ -323,7 +323,14 @@ func (dq *dialQueue) worker() { } logger.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Since(t)/time.Millisecond) waiting := len(dq.waitingCh) - dq.out.EnqChan <- p + + // by the time we're done dialling, it's possible that the context is closed, in which case there will + // be nobody listening on dq.out.EnqChan and we could block forever. + select { + case dq.out.EnqChan <- p: + case <-dq.ctx.Done(): + return + } if waiting > 0 { // we have somebody to deliver this value to, so no need to shrink. continue