Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor engine status #600

Merged
merged 2 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions models/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ func NewEngineStatus() *EngineStatus {
}
}

func (e *EngineStatus) IsReady() <-chan struct{} {
func (e *EngineStatus) Ready() <-chan struct{} {
return e.ready
}

func (e *EngineStatus) IsStopped() <-chan struct{} {
func (e *EngineStatus) Stopped() <-chan struct{} {
return e.stop
}

func (e *EngineStatus) IsDone() <-chan struct{} {
func (e *EngineStatus) Done() <-chan struct{} {
return e.done
}

Expand Down
19 changes: 5 additions & 14 deletions services/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var _ models.Engine = &Engine{}
// it will just overwrite the current indexed data. Idempotency is an important
// requirement of the implementation of this engine.
type Engine struct {
*models.EngineStatus

subscriber EventSubscriber
store *pebble.Storage
blocks storage.BlockIndexer
Expand All @@ -39,7 +41,6 @@ type Engine struct {
accounts storage.AccountIndexer
log zerolog.Logger
evmLastHeight *models.SequentialHeight
status *models.EngineStatus
blocksPublisher *models.Publisher
logsPublisher *models.Publisher
collector metrics.Collector
Expand All @@ -60,31 +61,21 @@ func NewEventIngestionEngine(
log = log.With().Str("component", "ingestion").Logger()

return &Engine{
EngineStatus: models.NewEngineStatus(),

subscriber: subscriber,
store: store,
blocks: blocks,
receipts: receipts,
transactions: transactions,
accounts: accounts,
log: log,
status: models.NewEngineStatus(),
blocksPublisher: blocksPublisher,
logsPublisher: logsPublisher,
collector: collector,
}
}

// Ready signals when the engine has started.
func (e *Engine) Ready() <-chan struct{} {
return e.status.IsReady()
}

// Done signals when the engine has stopped.
func (e *Engine) Done() <-chan struct{} {
// return e.status.IsDone()
return nil
}

// Stop the engine.
func (e *Engine) Stop() {
// todo
Expand Down Expand Up @@ -113,7 +104,7 @@ func (e *Engine) Run(ctx context.Context) error {

e.log.Info().Uint64("start-cadence-height", latestCadence).Msg("starting ingestion")

e.status.MarkReady()
e.MarkReady()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Update Required: Reference to removed engine.Ready() method found in bootstrap/bootstrap.go. Please update this reference to prevent potential runtime errors.

  • bootstrap/bootstrap.go: <-engine.Ready()
🔗 Analysis chain

Ensure all references to removed Ready() and Done() methods are updated

Since the Ready() and Done() methods have been removed from the Engine struct, please verify that any external code or components that previously relied on these methods have been updated accordingly to prevent potential runtime errors.

Run the following script to check for any remaining references to the removed methods:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Search for references to the removed `Ready()` and `Done()` methods in the codebase.

# Test: Find any usage of `Ready()` method on `Engine` instances.
# Expect: No matches if all references have been updated.
rg --type go '\.Ready\(\)' --context 5

# Test: Find any usage of `Done()` method on `Engine` instances.
# Expect: No matches if all references have been updated.
rg --type go '\.Done\(\)' --context 5

Length of output: 2877


for events := range e.subscriber.Subscribe(ctx, latestCadence) {
if events.Err != nil {
Expand Down
20 changes: 8 additions & 12 deletions services/traces/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ var _ models.Engine = &Engine{}
// listens for new transaction events and then downloads and index the
// traces from the transaction execution.
type Engine struct {
*models.EngineStatus

logger zerolog.Logger
status *models.EngineStatus
blocksPublisher *models.Publisher
blocks storage.BlockIndexer
traces storage.TraceIndexer
Expand All @@ -46,7 +47,8 @@ func NewTracesIngestionEngine(
collector metrics.Collector,
) *Engine {
return &Engine{
status: models.NewEngineStatus(),
EngineStatus: models.NewEngineStatus(),

logger: logger.With().Str("component", "trace-ingestion").Logger(),
blocksPublisher: blocksPublisher,
blocks: blocks,
Expand All @@ -56,11 +58,13 @@ func NewTracesIngestionEngine(
}
}

// Run the engine.
// TODO: use the context to stop the engine.
func (e *Engine) Run(ctx context.Context) error {
// subscribe to new blocks
e.blocksPublisher.Subscribe(e)

e.status.MarkReady()
e.MarkReady()
return nil
}

Expand Down Expand Up @@ -148,13 +152,5 @@ func (e *Engine) Error() <-chan error {
}

func (e *Engine) Stop() {
e.status.MarkStopped()
}

func (e *Engine) Done() <-chan struct{} {
return e.status.IsDone()
}

func (e *Engine) Ready() <-chan struct{} {
return e.status.IsReady()
e.MarkStopped()
}
Loading