Skip to content
This repository has been archived by the owner on Oct 22, 2024. It is now read-only.

Commit

Permalink
Overhaul channels & shutdown sequence to avoid deadlocks (paritytech#371
Browse files Browse the repository at this point in the history
)

* Make Sub writer wait for Eth listener to shut down

* Improve channel use in syncer

* Make listener wait for syncer to shut down

* Test that WorkerPool stops cleanly

* Consume single payloads channel in parachain writer

* getDefaultLogger -> defaultLogger
  • Loading branch information
Rizziepit authored Apr 26, 2021
1 parent c5e91a7 commit a0ae815
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 131 deletions.
32 changes: 28 additions & 4 deletions relayer/chain/ethereum/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Syncer struct {
headers chan<- *gethTypes.Header
loader HeaderLoader
log *logrus.Entry
newHeaders chan *gethTypes.Header
oldHeaders chan *gethTypes.Header
}

func NewSyncer(descendantsUntilFinal uint64, loader HeaderLoader, headers chan<- *gethTypes.Header, log *logrus.Entry) *Syncer {
Expand All @@ -41,6 +43,8 @@ func NewSyncer(descendantsUntilFinal uint64, loader HeaderLoader, headers chan<-
headers: headers,
loader: loader,
log: log,
newHeaders: nil,
oldHeaders: nil,
}
}

Expand All @@ -49,24 +53,44 @@ func (s *Syncer) StartSync(ctx context.Context, eg *errgroup.Group, initBlockHei
fetchFinalizedDone: false,
height: 0,
}
s.newHeaders = make(chan *gethTypes.Header)
s.oldHeaders = make(chan *gethTypes.Header)

eg.Go(func() error {
return s.pollNewHeaders(ctx, lbi)
err := s.pollNewHeaders(ctx, lbi)
close(s.newHeaders)
return err
})

lbi.Lock()
defer lbi.Unlock()
latestHeader, err := s.loader.HeaderByNumber(ctx, nil)
if err != nil {
s.log.WithError(err).Error("Failed to retrieve latest header")
close(s.headers)
return err
}
if latestHeader.Number.Uint64() > lbi.height {
lbi.height = latestHeader.Number.Uint64()
}

eg.Go(func() error {
return s.fetchFinalizedHeaders(ctx, initBlockHeight, lbi)
err := s.fetchFinalizedHeaders(ctx, initBlockHeight, lbi)
close(s.oldHeaders)
return err
})

eg.Go(func() error {
for header := range s.oldHeaders {
s.headers <- header
}

for header := range s.newHeaders {
s.headers <- header
}

close(s.headers)
return nil
})

return nil
Expand Down Expand Up @@ -106,7 +130,7 @@ func (s *Syncer) fetchFinalizedHeaders(ctx context.Context, initBlockHeight uint
case <-ctx.Done():
return ctx.Err()
default:
s.headers <- header
s.oldHeaders <- header
}
syncedUpUntil++
}
Expand Down Expand Up @@ -181,7 +205,7 @@ func (s *Syncer) forwardAncestry(ctx context.Context, hash gethCommon.Hash, olde
}
}

s.headers <- item.Header
s.newHeaders <- item.Header
item.Forwarded = true
return nil
}
Expand Down
116 changes: 66 additions & 50 deletions relayer/workers/ethrelayer/ethereum-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,22 @@ import (
"github.com/snowfork/polkadot-ethereum/relayer/contracts/outbound"
)

const MaxMessagesPerSend = 10

// EthereumListener streams the Ethereum blockchain for application events
type EthereumListener struct {
config *ethereum.Config
conn *ethereum.Connection
basicOutboundChannel *outbound.BasicOutboundChannel
incentivizedOutboundChannel *outbound.IncentivizedOutboundChannel
mapping map[common.Address]string
messages chan<- []chain.Message
headers chan<- chain.Header
payloads chan<- ParachainPayload
headerSyncer *syncer.Syncer
log *logrus.Entry
}

func NewEthereumListener(
config *ethereum.Config,
conn *ethereum.Connection,
messages chan<- []chain.Message,
headers chan<- chain.Header,
payloads chan<- ParachainPayload,
log *logrus.Entry,
) *EthereumListener {
return &EthereumListener{
Expand All @@ -47,63 +44,81 @@ func NewEthereumListener(
basicOutboundChannel: nil,
incentivizedOutboundChannel: nil,
mapping: make(map[common.Address]string),
messages: messages,
headers: headers,
payloads: payloads,
headerSyncer: nil,
log: log,
}
}

func (li *EthereumListener) Start(cxt context.Context, eg *errgroup.Group, initBlockHeight uint64, descendantsUntilFinal uint64) error {
closeWithError := func(err error) error {
li.log.Info("Shutting down listener...")
close(li.payloads)
return err
}

hcs, err := ethereum.NewHeaderCacheState(
eg,
initBlockHeight,
&ethereum.DefaultBlockLoader{Conn: li.conn},
nil,
)
if err != nil {
return err
return closeWithError(err)
}

basicOutboundChannel, err := outbound.NewBasicOutboundChannel(common.HexToAddress(li.config.Channels.Basic.Outbound), li.conn.GetClient())
if err != nil {
return err
return closeWithError(err)
}
li.basicOutboundChannel = basicOutboundChannel

incentivizedOutboundChannel, err := outbound.NewIncentivizedOutboundChannel(common.HexToAddress(li.config.Channels.Incentivized.Outbound), li.conn.GetClient())
if err != nil {
return err
return closeWithError(err)
}
li.incentivizedOutboundChannel = incentivizedOutboundChannel

li.mapping[common.HexToAddress(li.config.Channels.Basic.Outbound)] = "BasicInboundChannel.submit"
li.mapping[common.HexToAddress(li.config.Channels.Incentivized.Outbound)] = "IncentivizedInboundChannel.submit"

headersIn := make(chan *gethTypes.Header, 5)
li.headerSyncer = syncer.NewSyncer(
descendantsUntilFinal,
syncer.NewHeaderLoader(li.conn.GetClient()),
headersIn,
li.log,
)

eg.Go(func() error {
err := li.pollEventsAndHeaders(cxt, initBlockHeight, descendantsUntilFinal, hcs)
if li.messages != nil {
close(li.messages)
err := li.processEventsAndHeaders(cxt, initBlockHeight, descendantsUntilFinal, headersIn, hcs)

// Ensures the context is canceled so that the channel below is
// closed by the syncer
eg.Go(func() error { return err })

// Avoid deadlock if the syncer is still trying to send a header
for range headersIn {
li.log.Debug("Discarded header")
}
close(li.headers)
return err

return closeWithError(err)
})

return nil
}

func (li *EthereumListener) pollEventsAndHeaders(
func (li *EthereumListener) processEventsAndHeaders(
ctx context.Context,
initBlockHeight uint64,
descendantsUntilFinal uint64,
headers <-chan *gethTypes.Header,
hcs *ethereum.HeaderCacheState,
) error {
headers := make(chan *gethTypes.Header, 5)
headerEg, headerCtx := errgroup.WithContext(ctx)

headerSyncer := syncer.NewSyncer(descendantsUntilFinal, syncer.NewHeaderLoader(li.conn.GetClient()), headers, li.log)

li.log.Info("Syncing headers starting...")
err := headerSyncer.StartSync(headerCtx, headerEg, initBlockHeight-1)
err := li.headerSyncer.StartSync(headerCtx, headerEg, initBlockHeight-1)
if err != nil {
li.log.WithError(err).Error("Failed to start header sync")
return err
Expand All @@ -112,23 +127,22 @@ func (li *EthereumListener) pollEventsAndHeaders(
for {
select {
case <-ctx.Done():
li.log.Info("Shutting down listener...")
return ctx.Err()
case <-headerCtx.Done():
li.log.Info("Shutting down listener...")
return ctx.Err()
case gethheader := <-headers:
err := li.forwardHeader(hcs, gethheader)
if err != nil {
return err
return headerCtx.Err()
case gethheader, ok := <-headers:
if !ok {
return nil
}

if li.messages == nil {
li.log.Info("Not polling events since channel is nil")
header, err := li.makeOutgoingHeader(hcs, gethheader)
if err != nil {
return err
}

// Don't attempt to forward events prior to genesis block
if descendantsUntilFinal > gethheader.Number.Uint64() {
li.payloads <- ParachainPayload{header: header}
continue
}

Expand All @@ -151,7 +165,12 @@ func (li *EthereumListener) pollEventsAndHeaders(
}
events = append(events, incentivizedEvents...)

li.forwardEvents(ctx, hcs, events)
messages, err := li.makeOutgoingMessages(ctx, hcs, events)
if err != nil {
return err
}

li.payloads <- ParachainPayload{header: header, messages: messages}
}
}
}
Expand Down Expand Up @@ -200,8 +219,12 @@ func (li *EthereumListener) queryIncentivizedEvents(contract *outbound.Incentivi
return events, nil
}

func (li *EthereumListener) forwardEvents(ctx context.Context, hcs *ethereum.HeaderCacheState, events []*etypes.Log) error {
messages := make([]chain.Message, len(events))
func (li *EthereumListener) makeOutgoingMessages(
ctx context.Context,
hcs *ethereum.HeaderCacheState,
events []*etypes.Log,
) ([]*chain.EthereumOutboundMessage, error) {
messages := make([]*chain.EthereumOutboundMessage, len(events))

for i, event := range events {
receiptTrie, err := hcs.GetReceiptTrie(ctx, event.BlockHash)
Expand All @@ -211,7 +234,7 @@ func (li *EthereumListener) forwardEvents(ctx context.Context, hcs *ethereum.Hea
"blockNumber": event.BlockNumber,
"txHash": event.TxHash.Hex(),
}).WithError(err).Error("Failed to get receipt trie for event")
return err
return nil, err
}

msg, err := ethereum.MakeMessageFromEvent(li.mapping, event, receiptTrie, li.log)
Expand All @@ -222,30 +245,26 @@ func (li *EthereumListener) forwardEvents(ctx context.Context, hcs *ethereum.Hea
"blockNumber": event.BlockNumber,
"txHash": event.TxHash.Hex(),
}).WithError(err).Error("Failed to generate message from ethereum event")
return err
return nil, err
}

messages[i] = msg
if (i+1)%MaxMessagesPerSend == 0 || i == len(events)-1 {
start := i + 1 - MaxMessagesPerSend
if i == len(events)-1 {
start = i - (i % MaxMessagesPerSend)
}
li.messages <- messages[start : i+1]
}
}

return nil
return messages, nil
}

func (li *EthereumListener) forwardHeader(hcs *ethereum.HeaderCacheState, gethheader *gethTypes.Header) error {
func (li *EthereumListener) makeOutgoingHeader(
hcs *ethereum.HeaderCacheState,
gethheader *gethTypes.Header,
) (*chain.Header, error) {
cache, err := hcs.GetEthashproofCache(gethheader.Number.Uint64())
if err != nil {
li.log.WithFields(logrus.Fields{
"blockHash": gethheader.Hash().Hex(),
"blockNumber": gethheader.Number,
}).WithError(err).Error("Failed to get ethashproof cache for header")
return err
return nil, err
}

header, err := ethereum.MakeHeaderFromEthHeader(gethheader, cache, li.log)
Expand All @@ -254,10 +273,7 @@ func (li *EthereumListener) forwardHeader(hcs *ethereum.HeaderCacheState, gethhe
"blockHash": gethheader.Hash().Hex(),
"blockNumber": gethheader.Number,
}).WithError(err).Error("Failed to generate header from ethereum header")
return err
} else {
li.headers <- *header
return nil, err
}

return nil
return header, nil
}
14 changes: 4 additions & 10 deletions relayer/workers/ethrelayer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/sirupsen/logrus"

"github.com/snowfork/go-substrate-rpc-client/v2/types"
"github.com/snowfork/polkadot-ethereum/relayer/chain"
"github.com/snowfork/polkadot-ethereum/relayer/chain/ethereum"
"github.com/snowfork/polkadot-ethereum/relayer/chain/parachain"
"github.com/snowfork/polkadot-ethereum/relayer/crypto/sr25519"
Expand Down Expand Up @@ -52,23 +51,18 @@ func (w *Worker) Start(ctx context.Context, eg *errgroup.Group) error {
return nil
})

// channel for messages from ethereum
ethMessages := make(chan []chain.Message, 1)
// channel for headers from ethereum (it's a blocking channel so that we
// can guarantee that a header is forwarded before we send dependent messages)
ethHeaders := make(chan chain.Header)
// channel for payloads from ethereum
payloads := make(chan ParachainPayload, 1)

listener := NewEthereumListener(
w.ethconfig,
w.ethconn,
ethMessages,
ethHeaders,
payloads,
w.log,
)
writer := NewParachainWriter(
w.paraconn,
ethMessages,
ethHeaders,
payloads,
w.log,
)

Expand Down
Loading

0 comments on commit a0ae815

Please sign in to comment.