From 5d7352f6b77ed53a9f9a66b2c1a2011eef4e0f8f Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Mon, 30 Sep 2024 20:53:49 +0200 Subject: [PATCH] Refactor engine status --- models/engine.go | 6 +++--- services/ingestion/engine.go | 19 +++++-------------- services/traces/engine.go | 20 ++++++++------------ 3 files changed, 16 insertions(+), 29 deletions(-) diff --git a/models/engine.go b/models/engine.go index 4e2ade6cf..edb328689 100644 --- a/models/engine.go +++ b/models/engine.go @@ -30,15 +30,15 @@ func NewEngineStatus() *EngineStatus { } } -func (e *EngineStatus) IsReady() <-chan struct{} { +func (e *EngineStatus) Ready() <-chan struct{} { return e.ready } -func (e *EngineStatus) IsStopped() <-chan struct{} { +func (e *EngineStatus) Stopped() <-chan struct{} { return e.stop } -func (e *EngineStatus) IsDone() <-chan struct{} { +func (e *EngineStatus) Done() <-chan struct{} { return e.done } diff --git a/services/ingestion/engine.go b/services/ingestion/engine.go index de463a15b..2a9792933 100644 --- a/services/ingestion/engine.go +++ b/services/ingestion/engine.go @@ -31,6 +31,8 @@ var _ models.Engine = &Engine{} // it will just overwrite the current indexed data. Idempotency is an important // requirement of the implementation of this engine. type Engine struct { + *models.EngineStatus + subscriber EventSubscriber store *pebble.Storage blocks storage.BlockIndexer @@ -39,7 +41,6 @@ type Engine struct { accounts storage.AccountIndexer log zerolog.Logger evmLastHeight *models.SequentialHeight - status *models.EngineStatus blocksPublisher *models.Publisher logsPublisher *models.Publisher collector metrics.Collector @@ -60,6 +61,8 @@ func NewEventIngestionEngine( log = log.With().Str("component", "ingestion").Logger() return &Engine{ + EngineStatus: models.NewEngineStatus(), + subscriber: subscriber, store: store, blocks: blocks, @@ -67,24 +70,12 @@ func NewEventIngestionEngine( transactions: transactions, accounts: accounts, log: log, - status: models.NewEngineStatus(), blocksPublisher: blocksPublisher, logsPublisher: logsPublisher, collector: collector, } } -// Ready signals when the engine has started. -func (e *Engine) Ready() <-chan struct{} { - return e.status.IsReady() -} - -// Done signals when the engine has stopped. -func (e *Engine) Done() <-chan struct{} { - // return e.status.IsDone() - return nil -} - // Stop the engine. func (e *Engine) Stop() { // todo @@ -113,7 +104,7 @@ func (e *Engine) Run(ctx context.Context) error { e.log.Info().Uint64("start-cadence-height", latestCadence).Msg("starting ingestion") - e.status.MarkReady() + e.MarkReady() for events := range e.subscriber.Subscribe(ctx, latestCadence) { if events.Err != nil { diff --git a/services/traces/engine.go b/services/traces/engine.go index 45cf4daf6..fd51dfece 100644 --- a/services/traces/engine.go +++ b/services/traces/engine.go @@ -27,8 +27,9 @@ var _ models.Engine = &Engine{} // listens for new transaction events and then downloads and index the // traces from the transaction execution. type Engine struct { + *models.EngineStatus + logger zerolog.Logger - status *models.EngineStatus blocksPublisher *models.Publisher blocks storage.BlockIndexer traces storage.TraceIndexer @@ -46,7 +47,8 @@ func NewTracesIngestionEngine( collector metrics.Collector, ) *Engine { return &Engine{ - status: models.NewEngineStatus(), + EngineStatus: models.NewEngineStatus(), + logger: logger.With().Str("component", "trace-ingestion").Logger(), blocksPublisher: blocksPublisher, blocks: blocks, @@ -56,11 +58,13 @@ func NewTracesIngestionEngine( } } +// Run the engine. +// TODO: use the context to stop the engine. func (e *Engine) Run(ctx context.Context) error { // subscribe to new blocks e.blocksPublisher.Subscribe(e) - e.status.MarkReady() + e.MarkReady() return nil } @@ -148,13 +152,5 @@ func (e *Engine) Error() <-chan error { } func (e *Engine) Stop() { - e.status.MarkStopped() -} - -func (e *Engine) Done() <-chan struct{} { - return e.status.IsDone() -} - -func (e *Engine) Ready() <-chan struct{} { - return e.status.IsReady() + e.MarkStopped() }