-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove goprocess from Host #865
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,8 +21,6 @@ import ( | |
inat "github.com/libp2p/go-libp2p-nat" | ||
|
||
logging "github.com/ipfs/go-log" | ||
"github.com/jbenet/goprocess" | ||
goprocessctx "github.com/jbenet/goprocess/context" | ||
|
||
ma "github.com/multiformats/go-multiaddr" | ||
madns "github.com/multiformats/go-multiaddr-dns" | ||
|
@@ -68,6 +66,9 @@ const NATPortMap Option = iota | |
// * uses an identity service to send + receive node information | ||
// * uses a nat service to establish NAT port mappings | ||
type BasicHost struct { | ||
ctx context.Context | ||
ctxCancel context.CancelFunc | ||
|
||
network network.Network | ||
mux *msmux.MultistreamMuxer | ||
ids *identify.IDService | ||
|
@@ -81,8 +82,6 @@ type BasicHost struct { | |
|
||
negtimeout time.Duration | ||
|
||
proc goprocess.Process | ||
|
||
emitters struct { | ||
evtLocalProtocolsUpdated event.Emitter | ||
evtLocalAddrsUpdated event.Emitter | ||
|
@@ -128,6 +127,8 @@ type HostOpts struct { | |
|
||
// NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network. | ||
func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHost, error) { | ||
hostCtx, cancel := context.WithCancel(ctx) | ||
|
||
h := &BasicHost{ | ||
network: net, | ||
mux: msmux.NewMultistreamMuxer(), | ||
|
@@ -136,6 +137,8 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo | |
maResolver: madns.DefaultResolver, | ||
eventbus: eventbus.NewBus(), | ||
addrChangeChan: make(chan struct{}, 1), | ||
ctx: hostCtx, | ||
ctxCancel: cancel, | ||
} | ||
|
||
var err error | ||
|
@@ -146,28 +149,12 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo | |
return nil, err | ||
} | ||
|
||
h.proc = goprocessctx.WithContextAndTeardown(ctx, func() error { | ||
if h.natmgr != nil { | ||
h.natmgr.Close() | ||
} | ||
if h.cmgr != nil { | ||
h.cmgr.Close() | ||
} | ||
_ = h.emitters.evtLocalProtocolsUpdated.Close() | ||
_ = h.emitters.evtLocalAddrsUpdated.Close() | ||
return h.Network().Close() | ||
}) | ||
|
||
if opts.MultistreamMuxer != nil { | ||
h.mux = opts.MultistreamMuxer | ||
} | ||
|
||
// we can't set this as a default above because it depends on the *BasicHost. | ||
h.ids = identify.NewIDService( | ||
goprocessctx.WithProcessClosing(ctx, h.proc), | ||
h, | ||
identify.UserAgent(opts.UserAgent), | ||
) | ||
h.ids = identify.NewIDService(h, identify.UserAgent(opts.UserAgent)) | ||
|
||
if uint64(opts.NegotiationTimeout) != 0 { | ||
h.negtimeout = opts.NegotiationTimeout | ||
|
@@ -199,9 +186,33 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo | |
net.SetConnHandler(h.newConnHandler) | ||
net.SetStreamHandler(h.newStreamHandler) | ||
|
||
// setup the teardown func | ||
go func() { | ||
select { | ||
case <-hostCtx.Done(): | ||
h.teardown() | ||
} | ||
}() | ||
|
||
return h, nil | ||
} | ||
|
||
func (h *BasicHost) teardown() { | ||
if h.natmgr != nil { | ||
h.natmgr.Close() | ||
} | ||
if h.cmgr != nil { | ||
h.cmgr.Close() | ||
} | ||
if h.ids != nil { | ||
h.ids.Close() | ||
} | ||
|
||
_ = h.emitters.evtLocalProtocolsUpdated.Close() | ||
_ = h.emitters.evtLocalAddrsUpdated.Close() | ||
h.Network().Close() | ||
} | ||
|
||
// New constructs and sets up a new *BasicHost with given Network and options. | ||
// The following options can be passed: | ||
// * NATPortMap | ||
|
@@ -242,7 +253,7 @@ func New(net network.Network, opts ...interface{}) *BasicHost { | |
|
||
// Start starts background tasks in the host | ||
func (h *BasicHost) Start() { | ||
h.proc.Go(h.background) | ||
go h.background() | ||
} | ||
|
||
// newConnHandler is the remote-opened conn handler for inet.Network | ||
|
@@ -343,7 +354,7 @@ func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddresses | |
return &evt | ||
} | ||
|
||
func (h *BasicHost) background(p goprocess.Process) { | ||
func (h *BasicHost) background() { | ||
// periodically schedules an IdentifyPush to update our peers for changes | ||
// in our address set (if needed) | ||
ticker := time.NewTicker(10 * time.Second) | ||
|
@@ -356,7 +367,7 @@ func (h *BasicHost) background(p goprocess.Process) { | |
select { | ||
case <-ticker.C: | ||
case <-h.addrChangeChan: | ||
case <-p.Closing(): | ||
case <-h.ctx.Done(): | ||
return | ||
} | ||
|
||
|
@@ -812,7 +823,8 @@ func (h *BasicHost) Close() error { | |
// This: | ||
// 1. May be called multiple times. | ||
// 2. May _never_ be called if the host is stopped by the context. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. comment maybe out of date now? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have removed this comment. |
||
return h.proc.Close() | ||
h.ctxCancel() | ||
return nil | ||
Stebalien marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
type streamWrapper struct { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,6 @@ import ( | |
"sync" | ||
"time" | ||
|
||
"github.com/libp2p/go-eventbus" | ||
ic "github.com/libp2p/go-libp2p-core/crypto" | ||
"github.com/libp2p/go-libp2p-core/event" | ||
"github.com/libp2p/go-libp2p-core/helpers" | ||
|
@@ -17,6 +16,7 @@ import ( | |
"github.com/libp2p/go-libp2p-core/peerstore" | ||
"github.com/libp2p/go-libp2p-core/protocol" | ||
|
||
"github.com/libp2p/go-eventbus" | ||
pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb" | ||
|
||
ggio "github.com/gogo/protobuf/io" | ||
|
@@ -71,7 +71,8 @@ type IDService struct { | |
Host host.Host | ||
UserAgent string | ||
|
||
ctx context.Context | ||
ctx context.Context | ||
ctxCancel context.CancelFunc | ||
|
||
// connections undergoing identification | ||
// for wait purposes | ||
|
@@ -94,7 +95,7 @@ type IDService struct { | |
|
||
// NewIDService constructs a new *IDService and activates it by | ||
// attaching its stream handler to the given host.Host. | ||
func NewIDService(ctx context.Context, h host.Host, opts ...Option) *IDService { | ||
func NewIDService(h host.Host, opts ...Option) *IDService { | ||
var cfg config | ||
for _, opt := range opts { | ||
opt(&cfg) | ||
|
@@ -105,13 +106,15 @@ func NewIDService(ctx context.Context, h host.Host, opts ...Option) *IDService { | |
userAgent = cfg.userAgent | ||
} | ||
|
||
hostCtx, cancel := context.WithCancel(context.Background()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. being able to have the ID service run within a larger context continues to seem valuable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see why. It's a separate service and exposes a What are your concerns here ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if a consumer is spinning up a bunch of separate services, it's nice to have them all wired up to a central context, and then cancel that context to shut things down, rather than explicitly calling close on each service. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see where you are coming from but we've had problems in the past using contexts for cancellation across service boundaries. Please take a look at libp2p/go-libp2p-kbucket#50 (comment) & the linked discussions in that comment. The most important benefit to me is being able to control the order in which the services, sub-components etc. are shut down. |
||
s := &IDService{ | ||
Host: h, | ||
UserAgent: userAgent, | ||
|
||
ctx: ctx, | ||
ctx: hostCtx, | ||
ctxCancel: cancel, | ||
currid: make(map[network.Conn]chan struct{}), | ||
observedAddrs: NewObservedAddrSet(ctx), | ||
observedAddrs: NewObservedAddrSet(hostCtx), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we also have a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some day, yes. But not today. |
||
} | ||
|
||
// handle local protocol handler updates, and push deltas to peers. | ||
|
@@ -143,6 +146,12 @@ func NewIDService(ctx context.Context, h host.Host, opts ...Option) *IDService { | |
return s | ||
} | ||
|
||
// Close shuts down the IDService | ||
func (ids *IDService) Close() error { | ||
ids.ctxCancel() | ||
return nil | ||
} | ||
|
||
func (ids *IDService) handleEvents() { | ||
sub := ids.subscription | ||
defer func() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code has been removed.