New interface for EventSubscriptions #491

Open
rargulati opened this issue Jan 8, 2019 · 4 comments

Comments

@rargulati
Contributor

PR #474 introduces a way for us to unsubscribe from callbacks. Typically, our callbacks are attached to long-running processes (i.e., Ethereum watchers: goroutines that parse event logs and signal back on a channel). Unfortunately, the implementation has quite a bit of code duplication (no generics!), and the unsubscribe isn't deferred.

@Shadowfiend proposed the following:

```
// <T> is a template placeholder: code generation would emit one interface
// per event type, e.g. DKGResultPublicationEventSubscription.
type <T>EventSubscription interface {
    OnEvent(handler func(T)) EventSubscription                            // unsubscribe just runs the unsubscribe
    OnEventContext(ctx context.Context, handler func(T)) EventSubscription // unsubscribe unsubscribes + handles context
    Pipe(sink chan T) EventSubscription                                   // unsubscribe does the unsubscribe + closes chan
    PipeContext(ctx context.Context, sink chan T) EventSubscription       // unsubscribe unsubscribes + closes chan + handles context
}

type EventSubscription interface {
    Unsubscribe()
}

type Chain interface {
    // ...
    DKGResultPublication() DKGResultPublicationEventSubscription
}
```

```go
// Callback style:
subscription := chainRelay.DKGResultPublication().OnEvent(...)
defer subscription.Unsubscribe()

// Alternatively, channel style:
publishChan := make(chan *event.DKGResultPublication)
pipeSubscription := chainRelay.DKGResultPublication().Pipe(publishChan)
defer pipeSubscription.Unsubscribe()
```

This presents a better way forward (less duplication, deferrable unsubscribes, etc.). I attempted this with a bit of code generation but gave up halfway through for lack of time. Let's pick this up again.

@pdyraga
Member

pdyraga commented Mar 13, 2019

While working on this card, we need to ensure our subscriptions are handled properly. We have some technical debt to pay off in the Ethereum integration layer: a lot of code duplication, some hairy mutex synchronization, and some complex conditions that prevent us from writing to closed channels. All of it needs to be sorted out as part of this card.

It covers not only designing and implementing the mechanism but also porting all of the existing code to the new solution.

Some potential inspirations: https://blog.golang.org/pipelines

@pdyraga
Member

pdyraga commented Mar 14, 2019

  • PoC - 3 days
  • code generation - 2 days
  • concurrency tests - 3 days
  • Ethereum API update
  • local API update
  • review all subscriptions, and make sure we unsubscribe where needed - 5 days for the 3 items above

~13 days in total

@Shadowfiend
Contributor

Picked this guy up. Also, a possibly related issue.

@pdyraga
Member

pdyraga commented Oct 22, 2019

After 12+ hours of executing relay requests locally, I observed a fatal failure of the client:

```
18:54:58.095  INFO keep-relay: [member:1,channel:579d0,state:*entry.entrySubmissionState] transitioning to a new state at block: [1308] machine.go:132
18:54:58.103  INFO keep-entry: [member:3] waiting for block [1314] to submit submission.go:123
18:54:58.104  INFO keep-entry: [member:5] waiting for block [1320] to submit submission.go:123
18:54:58.104  INFO keep-entry: [member:1] waiting for block [1308] to submit submission.go:123
18:54:58.176  INFO keep-entry: [member:1] submitting relay entry [61152635787414383325787830737202178302596509978190241205488548435566954490300] on behalf of group [[156 40 248 74 9 203 234 18 81 157 111 165 216 64 28 223 29 28 187 194 135 137 156 29 108 226 13 127 1 106 234 9 13 136 248 230 75 57 11 152 162 145 250 234 201 79 250 223 72 219 137 104 251 238 195 5 223 152 181 28 75 37 230 234]] submission.go:83
18:55:11.832  INFO keep-entry: [member:5] leaving; relay entry submitted by other member submission.go:103
18:55:11.832  INFO keep-entry: [member:3] submitting relay entry [61152635787414383325787830737202178302596509978190241205488548435566954490300] on behalf of group [[156 40 248 74 9 203 234 18 81 157 111 165 216 64 28 223 29 28 187 194 135 137 156 29 108 226 13 127 1 106 234 9 13 136 248 230 75 57 11 152 162 145 250 234 201 79 250 223 72 219 137 104 251 238 195 5 223 152 181 28 75 37 230 234]] submission.go:83
18:55:11.832  INFO keep-relay: [member:5,channel:579d0,state:*entry.entrySubmissionState] transitioned to new state machine.go:171
18:55:11.832  INFO keep-relay: [member:5,channel:579d0,state:*entry.entrySubmissionState] reached final state at block: [1308] machine.go:99
panic: send on closed channel

goroutine 2866 [running]:
github.com/keep-network/keep-core/pkg/beacon/relay/entry.(*relayEntrySubmitter).submitRelayEntry.func1(0xc00067c6e0)
	/Users/piotr/go/src/github.com/keep-network/keep-core/pkg/beacon/relay/entry/submission.go:45 +0x3e
github.com/keep-network/keep-core/pkg/chain/ethereum.(*ethereumChain).OnSignatureSubmitted.func1(0xc0007085e0, 0xc00012a340, 0x40, 0x40, 0xc000708680, 0xc0007086a0, 0x522)
	/Users/piotr/go/src/github.com/keep-network/keep-core/pkg/chain/ethereum/ethereum.go:236 +0x13f
github.com/keep-network/keep-core/pkg/chain/gen/contract.(*KeepRandomBeaconOperator).WatchSignatureSubmitted.func1(0xc0008c49c0, 0xc00078e150, 0xc0006701c0, 0x4b624c0, 0xc0005df280, 0x4a81158)
	/Users/piotr/go/src/github.com/keep-network/keep-core/pkg/chain/gen/contract/KeepRandomBeaconOperator.go:2315 +0x80
created by github.com/keep-network/keep-core/pkg/chain/gen/contract.(*KeepRandomBeaconOperator).WatchSignatureSubmitted
	/Users/piotr/go/src/github.com/keep-network/keep-core/pkg/chain/gen/contract/KeepRandomBeaconOperator.go:2305 +0x24f
```

Peers connected to that node lost it:

```
Connection to bootstrap peer [16Uiu2HAm82kFx5PMHWUARfKhPhg9gQVdasiySjZaPPsNUjuE59TV] failed. Retrying...
Connection to bootstrap peer [16Uiu2HAm82kFx5PMHWUARfKhPhg9gQVdasiySjZaPPsNUjuE59TV] failed. Retrying...
Connection to bootstrap peer [16Uiu2HAm82kFx5PMHWUARfKhPhg9gQVdasiySjZaPPsNUjuE59TV] failed. Retrying...
Connection to bootstrap peer [16Uiu2HAm82kFx5PMHWUARfKhPhg9gQVdasiySjZaPPsNUjuE59TV] failed. Retrying...
```
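The trace shows the `OnSignatureSubmitted` callback goroutine sending on a channel that had already been closed. One common remedy, described in the Go pipelines article linked above, is that the receiver never closes the data channel; it closes a separate `done` channel instead, and senders `select` on both. A minimal sketch of that pattern with hypothetical names (`submitter`, `onSignatureSubmitted`, `stop`), not the actual keep-core code:

```go
package main

import (
	"fmt"
	"sync"
)

// submitter is a hypothetical stand-in for a component waiting for a
// chain event that is delivered through a callback.
type submitter struct {
	events   chan uint64   // event data; never closed by the receiver
	done     chan struct{} // closed to tell senders to stop
	stopOnce sync.Once
}

func newSubmitter() *submitter {
	return &submitter{
		events: make(chan uint64),
		done:   make(chan struct{}),
	}
}

// onSignatureSubmitted is the callback a chain watcher goroutine may invoke
// at any time, even after the submitter has finished.
func (s *submitter) onSignatureSubmitted(blockNumber uint64) {
	select {
	case s.events <- blockNumber: // delivered to the waiting receiver
	case <-s.done: // receiver is gone: drop the event instead of panicking
	}
}

// stop may safely be called more than once (e.g. deferred and explicit).
func (s *submitter) stop() {
	s.stopOnce.Do(func() { close(s.done) })
}

func main() {
	s := newSubmitter()
	go s.onSignatureSubmitted(1308)
	fmt.Println("received event at block", <-s.events)
	s.stop()

	// A late callback now returns instead of panicking with
	// "send on closed channel".
	s.onSignatureSubmitted(1314)
	fmt.Println("late event dropped")
}
```

Because nothing ever closes `events`, a late send can only block, and the closed `done` channel guarantees it cannot block forever.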

nkuba added a commit to keep-network/keep-common that referenced this issue Jan 28, 2021
New Ethereum event subscription API, background event pull loop

# Overview 

There are two major changes to Ethereum subscription API proposed here:
- new subscription API with `OnEvent` and `Pipe` functions,
- background monitoring loop pulling past events from the chain.

The first change should make some handler logic easier to implement and help avoid the complex logic that led to bugs such as keep-network/keep-core#1333 or keep-network/keep-core#2052.

The second change should improve client responsiveness for operators running their nodes against Ethereum deployments that are not very reliable on the event delivery front.
  
This code has been integrated with the ECDSA keep client in the `keep-ecdsa` repository and can be tested there on the `pipe-it` branch (keep-network/keep-ecdsa#671).

# New API

The event subscription API has been refactored to resemble the proposal from keep-network/keep-core#491. The new event subscription mechanism allows installing an event callback handler with the `OnEvent` function as well as piping events from a subscription to a channel with the `Pipe` function.

Example usage of `OnEvent`:
```go
handlerFn := func(
	submittingMember common.Address,
	conflictingPublicKey []byte,
	blockNumber uint64,
) {
	// (...)
}
subscription := keepContract.ConflictingPublicKeySubmitted(
	nil, // default SubscribeOpts
	nil, // no filtering on submitting member
).OnEvent(handlerFn)
defer subscription.Unsubscribe() // stop receiving events when done
```

The same subscription but with a `Pipe`:
```go
sink := make(chan *abi.BondedECDSAKeepConflictingPublicKeySubmitted)

subscription := keepContract.ConflictingPublicKeySubmitted(
	nil, // default SubscribeOpts
	nil, // no filtering on submitting member
).Pipe(sink)
defer subscription.Unsubscribe() // stop receiving events when done
```

Currently, all our event subscriptions use function handlers. This is convenient in some cases but not in others. For example, the `OnBondedECDSAKeepCreated` handler in the ECDSA client works perfectly fine as a function: it triggers the protocol and does not have to keep monitoring the state of the chain. On the other hand, the `OnDKGResultSubmitted` handler in the beacon client needs to monitor the chain and abort result publication if another node has already published the result. In that case, the code would be better structured around a channel-based subscription, which would allow listening for block counter events and DKG-result-submitted events in a single for-select loop.
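To illustrate the for-select structure described above, here is a minimal sketch. The `resultSubmitted` type, the `waitForResultOrDeadline` function, and the channels are hypothetical; in a real client, `blocks` would be fed by the block counter and `results` by a `Pipe` subscription.

```go
package main

import "fmt"

// resultSubmitted is a hypothetical event: some member has already
// published the DKG result on chain.
type resultSubmitted struct {
	submitter   int
	blockNumber uint64
}

// waitForResultOrDeadline reports whether this member should publish the
// result itself (its eligibility block arrived first) or leave because
// another member has already published it.
func waitForResultOrDeadline(
	blocks <-chan uint64, // e.g. fed by a block counter
	results <-chan *resultSubmitted, // e.g. fed by a Pipe subscription
	eligibleAt uint64,
) bool {
	for {
		select {
		case block := <-blocks:
			if block >= eligibleAt {
				return true // our turn to publish
			}
		case result, ok := <-results:
			if !ok {
				// The subscription closed the channel; a nil channel is
				// never selected, so keep waiting on blocks only.
				results = nil
				continue
			}
			fmt.Printf("member %d published at block %d; leaving\n",
				result.submitter, result.blockNumber)
			return false
		}
	}
}

func main() {
	blocks := make(chan uint64, 2)
	results := make(chan *resultSubmitted, 1)

	blocks <- 1310
	blocks <- 1312
	results <- &resultSubmitted{submitter: 5, blockNumber: 1312}

	// Prints the "leaving" line, then "publish ourselves: false".
	fmt.Println("publish ourselves:", waitForResultOrDeadline(blocks, results, 1320))
}
```

Both exit conditions live in one loop, so there is no shared flag or mutex between a callback and the publishing code.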

# Background monitoring loop

Some nodes in the network run against Ethereum setups that are not particularly reliable at delivering events. When events are not delivered, nodes do not start key generation or do not participate in redemption signing. Another problem is the stability of the event subscription mechanism (see #62). If the WebSocket connection drops too often, the resubscription mechanism is not enough to receive events emitted while the connection was in a weird, stale state.

To address this problem, we introduce a background loop that periodically pulls past events from the chain alongside the regular `watchLogs` subscription. How often events are pulled and how many past blocks each pull covers can be configured with `SubscribeOpts` parameters.

This way, even if an event is missed by the `watchLogs` subscription for whatever reason, it should be picked up later by the background monitoring loop. An extremely important implication of this change is that handlers should have logic in place to de-duplicate received events, even if a lot of time has passed between receiving the original event and the duplicate.

I have experimented with various options here, including de-duplicating events in the chain implementation layer, but none of them proved successful: a correct de-duplication algorithm requires domain knowledge about the specific event type and about the circumstances in which an identical event emitted later should or should not be treated as a duplicate.

De-duplicator implementations should be added to `keep-core` and `keep-ecdsa` clients and are out of the scope of `keep-common` and this PR.
nkuba added a commit to keep-network/keep-ecdsa that referenced this issue Jan 29, 2021
Incorporating the new Ethereum event subscription API and background event pull loop

Refs #680

Depends on #663
Depends on keep-network/keep-common#63

# Overview 

There are two major changes to Ethereum subscriptions proposed here, as a result of the changes implemented in keep-network/keep-common#63:
- new subscription API with `OnEvent` and `Pipe` functions,
- background monitoring loop pulling past events from the chain.

The first change should make some handler logic easier to implement and help avoid the complex logic that led to bugs such as keep-network/keep-core#1333 or keep-network/keep-core#2052. The ECDSA keep client is currently not much affected by this change, but this may change once proper event deduplication is implemented.

The second change should improve client responsiveness for operators running their nodes against Ethereum deployments that are not very reliable on the event delivery front. This should hopefully improve the SLA of some mainnet operators of the ECDSA client.


# New API

The event subscription API has been refactored in keep-network/keep-common#63 to resemble the proposal from keep-network/keep-core#491. The new event subscription mechanism allows installing an event callback handler with the `OnEvent` function as well as piping events from a subscription to a channel with the `Pipe` function.

Example usage of `OnEvent`:
```go
handlerFn := func(
	submittingMember common.Address,
	conflictingPublicKey []byte,
	blockNumber uint64,
) {
	// (...)
}
subscription := keepContract.ConflictingPublicKeySubmitted(
	nil, // default SubscribeOpts
	nil, // no filtering on submitting member
).OnEvent(handlerFn)
defer subscription.Unsubscribe() // stop receiving events when done
```

The same subscription but with a `Pipe`:
```go
sink := make(chan *abi.BondedECDSAKeepConflictingPublicKeySubmitted)

subscription := keepContract.ConflictingPublicKeySubmitted(
	nil, // default SubscribeOpts
	nil, // no filtering on submitting member
).Pipe(sink)
defer subscription.Unsubscribe() // stop receiving events when done
```

Currently, all our event subscriptions in the ECDSA client use function handlers. The code has been adjusted to the new API but still uses function handlers. This may change in the future, depending on the individual use case.

# Background monitoring loop

Some nodes in the network run against Ethereum setups that are not particularly reliable at delivering events. When events are not delivered, nodes do not start key generation or do not participate in redemption signing. Another problem is the stability of the event subscription mechanism (see #663). If the WebSocket connection drops too often, the resubscription mechanism is not enough to receive events emitted while the connection was in a weird, stale state.

To address this problem, keep-network/keep-common#63 introduces a background loop that periodically pulls past events from the chain alongside the regular `watchLogs` subscription. How often events are pulled and how many past blocks each pull covers can be configured with `SubscribeOpts` parameters.

This way, even if an event is missed by the `watchLogs` subscription for whatever reason, it should be picked up later by the background monitoring loop. An extremely important implication of this change is that handlers should have logic in place to de-duplicate received events, even if a lot of time has passed between receiving the original event and the duplicate.

This part has been implemented in `event_deduplicator.go` for four events:
- opened keep (key generation requested),
- redemption signature requested,
- keep closed,
- keep archived.

tBTC-specific events are covered separately in #679.

The only event not covered by deduplication is the conflicting public key submitted event, but it does not look to me like something that needs deduplication, since the subscription is canceled immediately after this event is received.

pdyraga removed this from the v1.5.0 milestone Jun 2, 2022