Skip to content

Commit

Permalink
Merge branch 'main' into fix-genesis-block-hash-for-testnet
Browse files Browse the repository at this point in the history
  • Loading branch information
franklywatson authored Oct 2, 2024
2 parents f14ad0c + 4798d65 commit 5d5a346
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 29 deletions.
6 changes: 3 additions & 3 deletions models/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ func NewEngineStatus() *EngineStatus {
}
}

func (e *EngineStatus) IsReady() <-chan struct{} {
func (e *EngineStatus) Ready() <-chan struct{} {
return e.ready
}

func (e *EngineStatus) IsStopped() <-chan struct{} {
func (e *EngineStatus) Stopped() <-chan struct{} {
return e.stop
}

func (e *EngineStatus) IsDone() <-chan struct{} {
func (e *EngineStatus) Done() <-chan struct{} {
return e.done
}

Expand Down
19 changes: 5 additions & 14 deletions services/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var _ models.Engine = &Engine{}
// it will just overwrite the current indexed data. Idempotency is an important
// requirement of the implementation of this engine.
type Engine struct {
*models.EngineStatus

subscriber EventSubscriber
store *pebble.Storage
blocks storage.BlockIndexer
Expand All @@ -39,7 +41,6 @@ type Engine struct {
accounts storage.AccountIndexer
log zerolog.Logger
evmLastHeight *models.SequentialHeight
status *models.EngineStatus
blocksPublisher *models.Publisher
logsPublisher *models.Publisher
collector metrics.Collector
Expand All @@ -60,31 +61,21 @@ func NewEventIngestionEngine(
log = log.With().Str("component", "ingestion").Logger()

return &Engine{
EngineStatus: models.NewEngineStatus(),

subscriber: subscriber,
store: store,
blocks: blocks,
receipts: receipts,
transactions: transactions,
accounts: accounts,
log: log,
status: models.NewEngineStatus(),
blocksPublisher: blocksPublisher,
logsPublisher: logsPublisher,
collector: collector,
}
}

// Ready signals when the engine has started.
func (e *Engine) Ready() <-chan struct{} {
return e.status.IsReady()
}

// Done signals when the engine has stopped.
func (e *Engine) Done() <-chan struct{} {
// return e.status.IsDone()
return nil
}

// Stop the engine.
func (e *Engine) Stop() {
// todo
Expand Down Expand Up @@ -113,7 +104,7 @@ func (e *Engine) Run(ctx context.Context) error {

e.log.Info().Uint64("start-cadence-height", latestCadence).Msg("starting ingestion")

e.status.MarkReady()
e.MarkReady()

for events := range e.subscriber.Subscribe(ctx, latestCadence) {
if events.Err != nil {
Expand Down
20 changes: 8 additions & 12 deletions services/traces/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ var _ models.Engine = &Engine{}
// listens for new transaction events and then downloads and index the
// traces from the transaction execution.
type Engine struct {
*models.EngineStatus

logger zerolog.Logger
status *models.EngineStatus
blocksPublisher *models.Publisher
blocks storage.BlockIndexer
traces storage.TraceIndexer
Expand All @@ -46,7 +47,8 @@ func NewTracesIngestionEngine(
collector metrics.Collector,
) *Engine {
return &Engine{
status: models.NewEngineStatus(),
EngineStatus: models.NewEngineStatus(),

logger: logger.With().Str("component", "trace-ingestion").Logger(),
blocksPublisher: blocksPublisher,
blocks: blocks,
Expand All @@ -56,11 +58,13 @@ func NewTracesIngestionEngine(
}
}

// Run the engine.
// TODO: use the context to stop the engine.
func (e *Engine) Run(ctx context.Context) error {
// subscribe to new blocks
e.blocksPublisher.Subscribe(e)

e.status.MarkReady()
e.MarkReady()
return nil
}

Expand Down Expand Up @@ -148,13 +152,5 @@ func (e *Engine) Error() <-chan error {
}

func (e *Engine) Stop() {
e.status.MarkStopped()
}

func (e *Engine) Done() <-chan struct{} {
return e.status.IsDone()
}

func (e *Engine) Ready() <-chan struct{} {
return e.status.IsReady()
e.MarkStopped()
}

0 comments on commit 5d5a346

Please sign in to comment.