Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Remove goprocess #118

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 14 additions & 22 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

bsmonitor "github.com/ipfs/go-bitswap/monitor"
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"

decision "github.com/ipfs/go-bitswap/decision"
Expand All @@ -27,8 +28,6 @@ import (
exchange "github.com/ipfs/go-ipfs-exchange-interface"
logging "github.com/ipfs/go-log"
metrics "github.com/ipfs/go-metrics-interface"
process "github.com/jbenet/goprocess"
procctx "github.com/jbenet/goprocess/context"
peer "github.com/libp2p/go-libp2p-peer"
)

Expand All @@ -54,6 +53,8 @@ var (
provideKeysBufferSize = 2048
provideWorkerMax = 6

shutdownTimeout = time.Duration(0)

// the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)
Expand Down Expand Up @@ -81,10 +82,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
" this bitswap").Histogram(metricsBuckets)

px := process.WithTeardown(func() error {
return nil
})

peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network)
}
Expand All @@ -102,11 +99,12 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
return bssrs.New(ctx)
}

monitor := bsmonitor.New(shutdownTimeout)
bs := &Bitswap{
monitor: monitor,
blockstore: bstore,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
Expand All @@ -123,21 +121,16 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
network.SetDelegate(bs)

// Start up bitswaps async worker routines
bs.startWorkers(ctx, px)

// bind the context and process.
// do it over here to avoid closing before all setup is done.
go func() {
<-px.Closing() // process closes first
cancelFunc()
}()
procctx.CloseAfterContext(px, ctx) // parent cancelled first
bs.startWorkers(ctx)

bs.monitor.LinkContextCancellation(ctx, cancelFunc)
bs.monitor.Start()
return bs
}

// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
monitor *bsmonitor.Monitor
// the wantlist tracks global wants for bitswap
wm *bswm.WantManager

Expand All @@ -161,8 +154,6 @@ type Bitswap struct {
// provideKeys directly feeds provide workers
provideKeys chan cid.Cid

process process.Process

// Counters for various statistics
counterLk sync.Mutex
counters *counters
Expand Down Expand Up @@ -232,7 +223,7 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error {
// @whyrusleeping, I don't know the answers you seek.
func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
select {
case <-bs.process.Closing():
case <-bs.monitor.Closed():
return errors.New("bitswap is closed")
default:
}
Expand All @@ -257,8 +248,8 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
case <-bs.monitor.Closed():
return bs.monitor.Err()
}
}
return nil
Expand Down Expand Up @@ -357,7 +348,8 @@ func (bs *Bitswap) ReceiveError(err error) {

// Close is called to shutdown Bitswap
func (bs *Bitswap) Close() error {
return bs.process.Close()
bs.monitor.Shutdown()
return nil
}

// GetWantlist returns the current local wantlist.
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ require (
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-log v0.0.1
github.com/ipfs/go-metrics-interface v0.0.1
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8
github.com/libp2p/go-libp2p v0.0.2
github.com/libp2p/go-libp2p-host v0.0.1
github.com/libp2p/go-libp2p-interface-connmgr v0.0.1
Expand All @@ -32,7 +31,7 @@ require (
github.com/multiformats/go-multiaddr v0.0.1
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 // indirect
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c // indirect
golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect
golang.org/x/sys v0.0.0-20190509141414-a5b02f93d862 // indirect
golang.org/x/text v0.3.2 // indirect
golang.org/x/tools v0.0.0-20190509153222-73554e0f7805 // indirect
)
4 changes: 1 addition & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ golang.org/x/net v0.0.0-20180524181706-dfa909b99c79/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190227160552-c95aed5357e7 h1:C2F/nMkR/9sfUTpvR3QrjBuTdvMUC/cFajkphs1YLQo=
golang.org/x/net v0.0.0-20190227160552-c95aed5357e7/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c h1:uOCk1iQW6Vc18bnC13MfzScl+wdKBmM9Y9kU7Z83/lw=
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
Expand All @@ -267,11 +266,10 @@ golang.org/x/sys v0.0.0-20190509141414-a5b02f93d862 h1:rM0ROo5vb9AdYJi1110yjWGMe
golang.org/x/sys v0.0.0-20190509141414-a5b02f93d862/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190509153222-73554e0f7805 h1:1ufBXAsTpUhSmmPXEEs5PrGQSfnBhsjAd2SmVhp9xrY=
golang.org/x/tools v0.0.0-20190509153222-73554e0f7805/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
Expand Down
167 changes: 167 additions & 0 deletions monitor/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package monitor

import (
"context"
"errors"
"sync"
"time"
)

// Monitor starts and monitors closing of tasks, providing mechanisms
// to wait for tasks to complete. It does not assume contexts as the default
// waiting mechanism, but can be linked to them
type Monitor struct {
childrenLk sync.RWMutex
children []childRoutine
wg sync.WaitGroup
closeOnce sync.Once
shutdownTimeout time.Duration
err error
started chan struct{}
closed chan struct{}
closing chan struct{}
}

// New returns a new Monitor with the given timeout duration
func New(shutdownTimeout time.Duration) *Monitor {
return &Monitor{
closed: make(chan struct{}),
closing: make(chan struct{}),
started: make(chan struct{}),
shutdownTimeout: shutdownTimeout,
children: make([]childRoutine, 0),
}
}

// Add sets up a task to monitor. It takes three parameters:
// - a function to start the task
// - a function to tell the task to stop
// - a function to wait for a task to fully complete after it's told to stop
// How the underlying task manages it's internal operations is essentially an
// unknown
func (p *Monitor) Add(startFunc func(), stopFunc func(), waitForComplete func()) {
p.childrenLk.Lock()
defer p.childrenLk.Unlock()
p.children = append(p.children, childRoutine{startFunc, stopFunc, waitForComplete})
select {
case <-p.started:
startFunc()
default:
}
}

// AddRunnable is a task that is expressed as a function called to start and
// execute a task to completion, and an interrupt function that can cause the task
// to terminate.
func (p *Monitor) AddRunnable(runnable func(), interrupt func()) {
completeChan := make(chan struct{})
start := func() {
go func() {
defer close(completeChan)
runnable()
}()
}
waitForComplete := func() {
<-completeChan
}
p.Add(start, interrupt, waitForComplete)
}

// AddCancellable is for tasks that are functions that take a context and run until
// that context is cancelled.
func (p *Monitor) AddCancellable(ctx context.Context, cancellable func(context.Context)) {
subCtx, subCancel := context.WithCancel(ctx)
p.AddRunnable(func() {
defer subCancel()
cancellable(subCtx)
}, subCancel)
}

// LinkContextCancellation links a monitor to a context, so shutdown the monitor
// cancels the context and cancelling the context shuts down the monitor
func (p *Monitor) LinkContextCancellation(ctx context.Context, cancel context.CancelFunc) {
go func() {
defer cancel()
select {
case <-p.closing:
case <-ctx.Done():
p.Shutdown()
}
}()
}

// Start initiates tasks given to the monitor. Prior to calling start, added tasks
// are not started. After calling start, added tasks start as soon as
// you add them
func (p *Monitor) Start() {
p.childrenLk.RLock()
defer p.childrenLk.RUnlock()
for _, cr := range p.children {
cr.start()
}
close(p.started)
}

// Shutdown closes a monitor and waits for its underlying tasks to complete or
// a timeout to be reached
func (p *Monitor) Shutdown() error {
p.closeOnce.Do(func() {
p.childrenLk.RLock()
defer p.childrenLk.RUnlock()
defer close(p.closed)
for _, cr := range p.children {
cr.stop()
}
close(p.closing)
p.wg.Add(len(p.children))
for _, cr := range p.children {
go func(cr childRoutine) {
cr.waitForComplete()
p.wg.Done()
}(cr)
}
completeChan := make(chan struct{})
go func() {
defer close(completeChan)
p.wg.Wait()
}()
timeoutChan := func() <-chan time.Time {
if p.shutdownTimeout == 0 {
return nil
}
return time.After(p.shutdownTimeout)
}
select {
case <-completeChan:
case <-timeoutChan():
p.err = errors.New("Timeout Exceeded")
}
})
return p.err
}

// Closing is a channel that is readable once the monitor has begun closing
func (p *Monitor) Closing() <-chan struct{} {
return p.closing
}

// Closed is a channel that is readable once the monitor is finished closing
func (p *Monitor) Closed() <-chan struct{} {
return p.closed
}

// Err is the result of the Shutdown operation, or nil is not errors occurred
func (p *Monitor) Err() error {
select {
case <-p.closed:
return p.err
default:
return nil
}
}

type childRoutine struct {
start func()
stop func()
waitForComplete func()
}
Loading