From cbf0a606ccb22d87e95ed55204bcfbc7e3b9693d Mon Sep 17 00:00:00 2001 From: ptrus Date: Wed, 29 Apr 2020 15:28:47 +0200 Subject: [PATCH] go/epochtime: add WatchLatestEpoch method Pubsub framework is extended to support subscriptions based on bounded ring channels. Bounded subscriptions are used in the new epochtime WatchLatestEpoch method. The method is similar to the existing WatchEpochs method, with the change that unread epochs get overridden with latest epoch. Registration worker is changed to use the WatchLatestEpoch method to prevent trying to register for old epochs in case the worker falls behind. --- .changelog/2876.bugfix.md | 5 + .changelog/2876.internal.1.md | 1 + .changelog/2876.internal.2.md | 4 + go/common/pubsub/pubsub.go | 41 ++++--- go/common/pubsub/pubsub_test.go | 107 ++++++++++++++---- .../tendermint/epochtime/epochtime.go | 10 +- .../epochtime_mock/epochtime_mock.go | 10 +- .../tendermint/keymanager/keymanager.go | 2 +- go/consensus/tendermint/registry/registry.go | 2 +- go/consensus/tendermint/roothash/roothash.go | 2 +- .../tendermint/scheduler/scheduler.go | 2 +- go/epochtime/api/api.go | 7 ++ go/epochtime/tests/mock_tester.go | 16 +++ go/oasis-node/cmd/debug/consim/timesource.go | 4 + go/runtime/committee/nodes.go | 2 +- go/worker/registration/worker.go | 2 +- 16 files changed, 176 insertions(+), 41 deletions(-) create mode 100644 .changelog/2876.bugfix.md create mode 100644 .changelog/2876.internal.1.md create mode 100644 .changelog/2876.internal.2.md diff --git a/.changelog/2876.bugfix.md b/.changelog/2876.bugfix.md new file mode 100644 index 00000000000..8a84f1aa930 --- /dev/null +++ b/.changelog/2876.bugfix.md @@ -0,0 +1,5 @@ +worker/registration: use WatchLatestEpoch when watching for registrations + +By using WatchLatestEpoch the worker will always try to register for latest +known epoch, which should prevent cases where registration worker fell behind +and was trying to register for past epochs. 
diff --git a/.changelog/2876.internal.1.md b/.changelog/2876.internal.1.md new file mode 100644 index 00000000000..dc18fe192cc --- /dev/null +++ b/.changelog/2876.internal.1.md @@ -0,0 +1 @@ +go/common/pubsub: support subscriptions based on bounded ring channels diff --git a/.changelog/2876.internal.2.md b/.changelog/2876.internal.2.md new file mode 100644 index 00000000000..2a0ce378276 --- /dev/null +++ b/.changelog/2876.internal.2.md @@ -0,0 +1,4 @@ +go/epochtime: add WatchLatestEpoch method + +The method is similar to the existing WatchEpochs method, with the change that +unread epochs get overridden with latest epoch. diff --git a/go/common/pubsub/pubsub.go b/go/common/pubsub/pubsub.go index 9275baca85b..6f04eb17f4d 100644 --- a/go/common/pubsub/pubsub.go +++ b/go/common/pubsub/pubsub.go @@ -13,7 +13,7 @@ type broadcastedValue struct { } type cmdCtx struct { - ch *channels.InfiniteChannel + ch channels.Channel errCh chan error onSubscribeHook OnSubscribeHook @@ -46,7 +46,7 @@ func NewContextSubscription(ctx context.Context) (context.Context, ClosableSubsc // Subscription is a Broker subscription instance. type Subscription struct { b *Broker - ch *channels.InfiniteChannel + ch channels.Channel } // Untyped returns the subscription's untyped output. Effort should be @@ -77,24 +77,34 @@ func (s *Subscription) Close() { // Broker is a pub/sub broker instance. type Broker struct { - subscribers map[*channels.InfiniteChannel]bool + subscribers map[channels.Channel]bool cmdCh chan *cmdCtx - broadcastCh *channels.InfiniteChannel + broadcastCh channels.Channel lastBroadcasted *broadcastedValue onSubscribeHook OnSubscribeHook } // OnSubscribeHook is the on-subscribe callback hook prototype. -type OnSubscribeHook func(*channels.InfiniteChannel) +type OnSubscribeHook func(channels.Channel) // Subscribe subscribes to the Broker's broadcasts, and returns a // subscription handle that can be used to receive broadcasts. 
// // Note: The returned subscription's channel will have an unbounded -// capacity. +// capacity, use SubscribeBuffered to use a bounded ring channel. func (b *Broker) Subscribe() *Subscription { - return b.SubscribeEx(nil) + return b.SubscribeEx(int64(channels.Infinity), nil) +} + +// SubscribeBuffered subscribes to the Broker's broadcasts, and returns a +// subscription handle that can be used to receive broadcasts. +// +// Buffer controls the capacity of a ring buffer - when buffer is full the +// oldest value will be discarded. In case buffer is negative (or zero) an +// unbounded channel is used. +func (b *Broker) SubscribeBuffered(buffer int64) *Subscription { + return b.SubscribeEx(buffer, nil) } // SubscribeEx subscribes to the Broker's broadcasts, and returns a @@ -102,12 +112,17 @@ func (b *Broker) Subscribe() *Subscription { // addition it also takes a per-subscription on-subscribe callback // hook. // -// Note: The returned subscription's channel will have an unbounded -// capacity. If there is a Broker wide hook set, it will be called +// Note: If there is a Broker wide hook set, it will be called // after the per-subscription hook is called. 
-func (b *Broker) SubscribeEx(onSubscribeHook OnSubscribeHook) *Subscription { +func (b *Broker) SubscribeEx(buffer int64, onSubscribeHook OnSubscribeHook) *Subscription { + var ch channels.Channel + if buffer <= 0 { + ch = channels.NewInfiniteChannel() + } else { + ch = channels.NewRingChannel(channels.BufferCap(buffer)) + } ctx := &cmdCtx{ - ch: channels.NewInfiniteChannel(), + ch: ch, errCh: make(chan error), onSubscribeHook: onSubscribeHook, isSubscribe: true, @@ -167,7 +182,7 @@ func (b *Broker) worker() { func NewBroker(pubLastOnSubscribe bool) *Broker { b := newBroker() if pubLastOnSubscribe { - b.onSubscribeHook = func(ch *channels.InfiniteChannel) { + b.onSubscribeHook = func(ch channels.Channel) { if b.lastBroadcasted != nil { ch.In() <- b.lastBroadcasted.v } @@ -192,7 +207,7 @@ func NewBrokerEx(onSubscribeHook OnSubscribeHook) *Broker { func newBroker() *Broker { return &Broker{ - subscribers: make(map[*channels.InfiniteChannel]bool), + subscribers: make(map[channels.Channel]bool), cmdCh: make(chan *cmdCtx), broadcastCh: channels.NewInfiniteChannel(), } diff --git a/go/common/pubsub/pubsub_test.go b/go/common/pubsub/pubsub_test.go index 6418afd3480..1840b286ef5 100644 --- a/go/common/pubsub/pubsub_test.go +++ b/go/common/pubsub/pubsub_test.go @@ -8,16 +8,20 @@ import ( "github.com/stretchr/testify/require" ) -const recvTimeout = 5 * time.Second +const ( + recvTimeout = 5 * time.Second + bufferSize = 5 +) func TestPubSub(t *testing.T) { - t.Run("Basic", testBasic) + t.Run("BasicInfinity", testBasicInfinity) + t.Run("BasicOverwriting", testBasicOverwriting) t.Run("PubLastOnSubscribe", testLastOnSubscribe) t.Run("SubscribeEx", testSubscribeEx) t.Run("NewBrokerEx", testNewBrokerEx) } -func testBasic(t *testing.T) { +func testBasicInfinity(t *testing.T) { broker := NewBroker(false) sub := broker.Subscribe() @@ -50,41 +54,104 @@ func testBasic(t *testing.T) { require.Len(t, broker.subscribers, 0, "Subscriber map, post Close()") } -func testLastOnSubscribe(t 
*testing.T) { - broker := NewBroker(true) - broker.Broadcast(23) +func testBasicOverwriting(t *testing.T) { + broker := NewBroker(false) - sub := broker.Subscribe() + sub := broker.SubscribeBuffered(bufferSize) typedCh := make(chan int) sub.Unwrap(typedCh) + // Test a single broadcast/receive. + broker.Broadcast(23) select { case v := <-typedCh: - require.Equal(t, 23, v, "Last Broadcast()") + require.Equal(t, 23, v, "Single Broadcast())") case <-time.After(recvTimeout): - t.Fatalf("Failed to receive value, last Broadcast() on Subscribe()") + t.Fatalf("Failed to receive value, initial Broadcast()") + } + + // Test the buffered nature of the overwriting channel. + for i := 0; i < bufferSize+10; i++ { + broker.Broadcast(i) + } + // Ensure we don't start reading before all messages are processed by the + // underlying channel. + time.Sleep(100 * time.Millisecond) + + // RingChannel prefers to write before buffering the items, so the first + // element will be instantly sent to the output channel and removed from the + // buffer so it will not get overwritten. 
+ expected := []int{ + 0, + } + for i := 10; i < bufferSize+10; i++ { + expected = append(expected, i) + } + for _, i := range expected { + select { + case v := <-typedCh: + require.Equal(t, i, v, "Buffered Broadcast()") + case <-time.After(recvTimeout): + t.Fatalf("Failed to receive value, buffered Broadcast()") + } + } + + require.NotPanics(t, func() { sub.Close() }, "Close()") + require.Len(t, broker.subscribers, 0, "Subscriber map, post Close()") +} + +func testLastOnSubscribe(t *testing.T) { + broker := NewBroker(true) + broker.Broadcast(23) + + for _, b := range []int64{ + int64(channels.Infinity), + bufferSize, + } { + sub := broker.SubscribeBuffered(b) + typedCh := make(chan int) + sub.Unwrap(typedCh) + + select { + case v := <-typedCh: + require.Equal(t, 23, v, "Last Broadcast()") + case <-time.After(recvTimeout): + t.Fatalf("Failed to receive value, last Broadcast() on Subscribe()") + } } } func testSubscribeEx(t *testing.T) { broker := NewBroker(false) - - var callbackCh *channels.InfiniteChannel - sub := broker.SubscribeEx(func(ch *channels.InfiniteChannel) { + var callbackCh channels.Channel + callback := func(ch channels.Channel) { callbackCh = ch - }) + } + + for _, b := range []int64{ + int64(channels.Infinity), + bufferSize, + } { + sub := broker.SubscribeEx(b, callback) + + require.NotNil(t, sub.ch, "Subscription, inner channel") + require.Equal(t, sub.ch, callbackCh, "Callback channel != Subscription, inner channel") + } - require.NotNil(t, sub.ch, "Subscription, inner channel") - require.Equal(t, sub.ch, callbackCh, "Callback channel != Subscription, inner channel") } func testNewBrokerEx(t *testing.T) { - var callbackCh *channels.InfiniteChannel - broker := NewBrokerEx(func(ch *channels.InfiniteChannel) { + var callbackCh channels.Channel + broker := NewBrokerEx(func(ch channels.Channel) { callbackCh = ch }) - sub := broker.Subscribe() - require.NotNil(t, sub.ch, "Subscription, inner channel") - require.Equal(t, sub.ch, callbackCh, "Callback 
channel != Subscription, inner channel") + for _, b := range []int64{ + int64(channels.Infinity), + bufferSize, + } { + sub := broker.SubscribeBuffered(b) + require.NotNil(t, sub.ch, "Subscription, inner channel") + require.Equal(t, sub.ch, callbackCh, "Callback channel != Subscription, inner channel") + } } diff --git a/go/consensus/tendermint/epochtime/epochtime.go b/go/consensus/tendermint/epochtime/epochtime.go index 07d4a9f329d..9837fb07e25 100644 --- a/go/consensus/tendermint/epochtime/epochtime.go +++ b/go/consensus/tendermint/epochtime/epochtime.go @@ -63,6 +63,14 @@ func (t *tendermintBackend) WatchEpochs() (<-chan api.EpochTime, *pubsub.Subscri return typedCh, sub } +func (t *tendermintBackend) WatchLatestEpoch() (<-chan api.EpochTime, *pubsub.Subscription) { + typedCh := make(chan api.EpochTime) + sub := t.notifier.SubscribeBuffered(1) + sub.Unwrap(typedCh) + + return typedCh, sub +} + func (t *tendermintBackend) StateToGenesis(ctx context.Context, height int64) (*api.Genesis, error) { now, err := t.GetEpoch(ctx, height) if err != nil { @@ -125,7 +133,7 @@ func New(ctx context.Context, service service.TendermintService, interval int64) base: base, epoch: base, } - r.notifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) { + r.notifier = pubsub.NewBrokerEx(func(ch channels.Channel) { r.RLock() defer r.RUnlock() diff --git a/go/consensus/tendermint/epochtime_mock/epochtime_mock.go b/go/consensus/tendermint/epochtime_mock/epochtime_mock.go index 0a8cd7ac381..0e074cc2dbc 100644 --- a/go/consensus/tendermint/epochtime_mock/epochtime_mock.go +++ b/go/consensus/tendermint/epochtime_mock/epochtime_mock.go @@ -80,6 +80,14 @@ func (t *tendermintMockBackend) WatchEpochs() (<-chan api.EpochTime, *pubsub.Sub return typedCh, sub } +func (t *tendermintMockBackend) WatchLatestEpoch() (<-chan api.EpochTime, *pubsub.Subscription) { + typedCh := make(chan api.EpochTime) + sub := t.notifier.SubscribeBuffered(1) + sub.Unwrap(typedCh) + + return typedCh, sub +} + 
func (t *tendermintMockBackend) StateToGenesis(ctx context.Context, height int64) (*api.Genesis, error) { now, err := t.GetEpoch(ctx, height) if err != nil { @@ -232,7 +240,7 @@ func New(ctx context.Context, service service.TendermintService) (api.SetableBac service: service, querier: a.QueryFactory().(*app.QueryFactory), } - r.notifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) { + r.notifier = pubsub.NewBrokerEx(func(ch channels.Channel) { r.RLock() defer r.RUnlock() diff --git a/go/consensus/tendermint/keymanager/keymanager.go b/go/consensus/tendermint/keymanager/keymanager.go index 8bc930e5dab..fcc7637b3ad 100644 --- a/go/consensus/tendermint/keymanager/keymanager.go +++ b/go/consensus/tendermint/keymanager/keymanager.go @@ -145,7 +145,7 @@ func New(ctx context.Context, service service.TendermintService) (api.Backend, e service: service, querier: a.QueryFactory().(*app.QueryFactory), } - tb.notifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) { + tb.notifier = pubsub.NewBrokerEx(func(ch channels.Channel) { statuses, err := tb.GetStatuses(ctx, consensus.HeightLatest) if err != nil { tb.logger.Error("status notifier: unable to get a list of statuses", diff --git a/go/consensus/tendermint/registry/registry.go b/go/consensus/tendermint/registry/registry.go index aff6b27cc29..2972045c80f 100644 --- a/go/consensus/tendermint/registry/registry.go +++ b/go/consensus/tendermint/registry/registry.go @@ -414,7 +414,7 @@ func New(ctx context.Context, service service.TendermintService) (api.Backend, e nodeNotifier: pubsub.NewBroker(false), nodeListNotifier: pubsub.NewBroker(true), } - tb.runtimeNotifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) { + tb.runtimeNotifier = pubsub.NewBrokerEx(func(ch channels.Channel) { wr := ch.In() runtimes, err := tb.GetRuntimes(ctx, consensus.HeightLatest) if err != nil { diff --git a/go/consensus/tendermint/roothash/roothash.go b/go/consensus/tendermint/roothash/roothash.go index 
308a34f008d..eff1384a5f6 100644 --- a/go/consensus/tendermint/roothash/roothash.go +++ b/go/consensus/tendermint/roothash/roothash.go @@ -104,7 +104,7 @@ func (tb *tendermintBackend) getLatestBlockAt(ctx context.Context, id common.Nam func (tb *tendermintBackend) WatchBlocks(id common.Namespace) (<-chan *api.AnnotatedBlock, *pubsub.Subscription, error) { notifiers := tb.getRuntimeNotifiers(id) - sub := notifiers.blockNotifier.SubscribeEx(func(ch *channels.InfiniteChannel) { + sub := notifiers.blockNotifier.SubscribeEx(-1, func(ch channels.Channel) { // Replay the latest block if it exists. notifiers.Lock() defer notifiers.Unlock() diff --git a/go/consensus/tendermint/scheduler/scheduler.go b/go/consensus/tendermint/scheduler/scheduler.go index 7d3b85cf00d..23f492756c2 100644 --- a/go/consensus/tendermint/scheduler/scheduler.go +++ b/go/consensus/tendermint/scheduler/scheduler.go @@ -182,7 +182,7 @@ func New(ctx context.Context, service service.TendermintService) (api.Backend, e service: service, querier: a.QueryFactory().(*app.QueryFactory), } - tb.notifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) { + tb.notifier = pubsub.NewBrokerEx(func(ch channels.Channel) { currentCommittees, err := tb.getCurrentCommittees() if err != nil { tb.logger.Error("couldn't get current committees. won't send them. good luck to the subscriber", diff --git a/go/epochtime/api/api.go b/go/epochtime/api/api.go index aba86b7cd12..32f15e55eb3 100644 --- a/go/epochtime/api/api.go +++ b/go/epochtime/api/api.go @@ -36,6 +36,13 @@ type Backend interface { // Upon subscription the current epoch is sent immediately. WatchEpochs() (<-chan EpochTime, *pubsub.Subscription) + // WatchLatestEpoch returns a channel that produces a stream of messages on + // epoch transitions. If an epoch transition happens before previous epoch + // is read from channel, the old epoch is overwritten. + // + // Upon subscription the current epoch is sent immediately. 
+ WatchLatestEpoch() (<-chan EpochTime, *pubsub.Subscription) + // StateToGenesis returns the genesis state at the specified block height. StateToGenesis(ctx context.Context, height int64) (*Genesis, error) } diff --git a/go/epochtime/tests/mock_tester.go b/go/epochtime/tests/mock_tester.go index 26132e9b805..9db7cb4d394 100644 --- a/go/epochtime/tests/mock_tester.go +++ b/go/epochtime/tests/mock_tester.go @@ -37,6 +37,15 @@ func EpochtimeSetableImplementationTest(t *testing.T, backend api.Backend) { t.Fatalf("failed to receive current epoch on WatchEpochs") } + latestCh, subCh := timeSource.WatchLatestEpoch() + defer subCh.Close() + select { + case e = <-latestCh: + require.Equal(epoch, e, "WatchLatestEpoch initial") + case <-time.After(recvTimeout): + t.Fatalf("failed to receive current epoch on WatchLatestEpoch") + } + epoch++ err = timeSource.SetEpoch(context.Background(), epoch) require.NoError(err, "SetEpoch") @@ -48,6 +57,13 @@ func EpochtimeSetableImplementationTest(t *testing.T, backend api.Backend) { t.Fatalf("failed to receive epoch notification after transition") } + select { + case e = <-latestCh: + require.Equal(epoch, e, "WatchLatestEpoch after set") + case <-time.After(recvTimeout): + t.Fatalf("failed to receive latest epoch after transition") + } + e, err = timeSource.GetEpoch(context.Background(), consensus.HeightLatest) require.NoError(err, "GetEpoch after set") require.Equal(epoch, e, "GetEpoch after set, epoch") diff --git a/go/oasis-node/cmd/debug/consim/timesource.go b/go/oasis-node/cmd/debug/consim/timesource.go index 664365d646f..eb5d64d7bc5 100644 --- a/go/oasis-node/cmd/debug/consim/timesource.go +++ b/go/oasis-node/cmd/debug/consim/timesource.go @@ -37,6 +37,10 @@ func (b *simTimeSource) WatchEpochs() (<-chan api.EpochTime, *pubsub.Subscriptio panic("consim/epochtime: WatchEpochs not supported") } +func (b *simTimeSource) WatchLatestEpoch() (<-chan api.EpochTime, *pubsub.Subscription) { + panic("consim/epochtime: WatchLatestEpoch not 
supported") +} + func (b *simTimeSource) StateToGenesis(ctx context.Context, height int64) (*api.Genesis, error) { // WARNING: This ignores the height because it's only used for the final // dump. diff --git a/go/runtime/committee/nodes.go b/go/runtime/committee/nodes.go index 25590916ce9..682d482f8aa 100644 --- a/go/runtime/committee/nodes.go +++ b/go/runtime/committee/nodes.go @@ -239,7 +239,7 @@ func NewNodeDescriptorWatcher(ctx context.Context, registry registry.Backend) (N ctx: ctx, logger: logging.GetLogger("runtime/committee/nodedescriptorwatcher"), } - nw.notifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) { + nw.notifier = pubsub.NewBrokerEx(func(ch channels.Channel) { nw.RLock() defer nw.RUnlock() diff --git a/go/worker/registration/worker.go b/go/worker/registration/worker.go index 455b0d5adc4..624255caa8c 100644 --- a/go/worker/registration/worker.go +++ b/go/worker/registration/worker.go @@ -206,7 +206,7 @@ func (w *Worker) registrationLoop() { // nolint: gocyclo // (re-)register the node on each epoch transition. This doesn't // need to be strict block-epoch time, since it just serves to // extend the node's expiration. - ch, sub := w.epochtime.WatchEpochs() + ch, sub := w.epochtime.WatchLatestEpoch() defer sub.Close() regFn := func(epoch epochtime.EpochTime, hook RegisterNodeHook, retry bool) error {