From 31133ff18456371f9492bb66718ca095e63979c0 Mon Sep 17 00:00:00 2001
From: zhangkai
Date: Thu, 12 Oct 2023 09:51:29 +0800
Subject: [PATCH] Add logs for sequencer (#14)

* add log statistics
* add reset and batch number
* total duration with milliseconds
* add batch gas used statistics
---
 sequencer/finalizer.go                      | 38 +++++++++
 sequencer/metrics/logstatistics.go          | 40 ++++++++++
 sequencer/metrics/logstatisticsimpl.go      | 85 +++++++++++++++++++++
 sequencer/metrics/logstatisticsimpl_test.go | 51 +++++++++++++
 4 files changed, 214 insertions(+)
 create mode 100644 sequencer/metrics/logstatistics.go
 create mode 100644 sequencer/metrics/logstatisticsimpl.go
 create mode 100644 sequencer/metrics/logstatisticsimpl_test.go

diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go
index ace1d827e9..ca0d3557bc 100644
--- a/sequencer/finalizer.go
+++ b/sequencer/finalizer.go
@@ -6,6 +6,7 @@ import (
 	"errors"
 	"fmt"
 	"math/big"
+	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -337,7 +338,10 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {

 		tx := f.worker.GetBestFittingTx(f.batch.remainingResources)
 		metrics.WorkerProcessingTime(time.Since(start))
+		metrics.GetLogStatistics().CumulativeTiming(metrics.GetTx, time.Since(start))
+
 		if tx != nil {
+			metrics.GetLogStatistics().CumulativeCounting(metrics.TxCounter)
 			log.Debugf("processing tx: %s", tx.Hash.Hex())
 			showNotFoundTxLog = true

@@ -350,14 +354,18 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {
 					if err == ErrEffectiveGasPriceReprocess {
 						firstTxProcess = false
 						log.Info("reprocessing tx because of effective gas price calculation: %s", tx.Hash.Hex())
+						metrics.GetLogStatistics().CumulativeCounting(metrics.ReprocessingTxCounter)
 						continue
 					} else {
 						log.Errorf("failed to process transaction in finalizeBatches, Err: %v", err)
+						metrics.GetLogStatistics().CumulativeCounting(metrics.FailTxCounter)
 						break
 					}
 				}
+				metrics.GetLogStatistics().CumulativeValue(metrics.BatchGas, int64(tx.Gas))
 				break
 			}
+
 			f.sharedResourcesMux.Unlock()
 		} else {
 			// wait for new txs
@@ -367,6 +375,7 @@
 			}
 			if f.cfg.SleepDuration.Duration > 0 {
 				time.Sleep(f.cfg.SleepDuration.Duration)
+				metrics.GetLogStatistics().CumulativeCounting(metrics.GetTxPauseCounter)
 			}
 		}

@@ -378,10 +387,18 @@

 		if f.isDeadlineEncountered() {
 			log.Infof("closing batch %d because deadline was encountered.", f.batch.batchNumber)
+			metrics.GetLogStatistics().SetTag(metrics.BatchCloseReason, "deadline")
 			f.finalizeBatch(ctx)
+			log.Infof(metrics.GetLogStatistics().Summary())
+			metrics.GetLogStatistics().ResetStatistics()
+			metrics.GetLogStatistics().UpdateTimestamp(metrics.NewRound, time.Now())
 		} else if f.isBatchFull() || f.isBatchAlmostFull() {
 			log.Infof("closing batch %d because it's almost full.", f.batch.batchNumber)
+			metrics.GetLogStatistics().SetTag(metrics.BatchCloseReason, "full")
 			f.finalizeBatch(ctx)
+			log.Infof(metrics.GetLogStatistics().Summary())
+			metrics.GetLogStatistics().ResetStatistics()
+			metrics.GetLogStatistics().UpdateTimestamp(metrics.NewRound, time.Now())
 		}

 		if err := ctx.Err(); err != nil {
@@ -421,8 +438,10 @@ func (f *finalizer) isBatchFull() bool {
 // finalizeBatch retries to until successful closes the current batch and opens a new one, potentially processing forced batches between the batch is closed and the resulting new empty batch
 func (f *finalizer) finalizeBatch(ctx context.Context) {
 	start := time.Now()
+	metrics.GetLogStatistics().SetTag(metrics.FinalizeBatchNumber, strconv.Itoa(int(f.batch.batchNumber)))
 	defer func() {
 		metrics.ProcessingTime(time.Since(start))
+		metrics.GetLogStatistics().CumulativeTiming(metrics.FinalizeBatchTiming, time.Since(start))
 	}()

 	var err error
@@ -504,6 +523,7 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) {
 	}

 	// Reprocess full batch as sanity check
+	tsReprocessFullBatch := time.Now()
 	if f.cfg.SequentialReprocessFullBatch {
 		// Do the full batch reprocess now
 		_, err := f.reprocessFullBatch(ctx, f.batch.batchNumber, f.batch.initialStateRoot, f.batch.stateRoot)
@@ -517,12 +537,15 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) {
 			_, _ = f.reprocessFullBatch(ctx, f.batch.batchNumber, f.batch.initialStateRoot, f.batch.stateRoot)
 		}()
 	}
+	metrics.GetLogStatistics().CumulativeTiming(metrics.FinalizeBatchReprocessFullBatch, time.Since(tsReprocessFullBatch))

 	// Close the current batch
+	tsCloseBatch := time.Now()
 	err = f.closeBatch(ctx)
 	if err != nil {
 		return nil, fmt.Errorf("failed to close batch, err: %w", err)
 	}
+	metrics.GetLogStatistics().CumulativeTiming(metrics.FinalizeBatchCloseBatch, time.Since(tsCloseBatch))

 	// Check if the batch is empty and sending a GER Update to the stream is needed
 	if f.streamServer != nil && f.batch.isEmpty() && f.currentGERHash != f.previousGERHash {
@@ -552,6 +575,7 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) {
 	}

 	// Metadata for the next batch
+	tsOpenBatch := time.Now()
 	stateRoot := f.batch.stateRoot
 	lastBatchNumber := f.batch.batchNumber

@@ -581,6 +605,7 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) {
 		f.processRequest.GlobalExitRoot = batch.globalExitRoot
 		f.processRequest.Transactions = make([]byte, 0, 1)
 	}
+	metrics.GetLogStatistics().CumulativeTiming(metrics.FinalizeBatchOpenBatch, time.Since(tsOpenBatch))

 	return batch, err
 }
@@ -595,6 +620,9 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first
 	start := time.Now()
 	defer func() {
 		metrics.ProcessingTime(time.Since(start))
+		if tx != nil {
+			metrics.GetLogStatistics().CumulativeTiming(metrics.ProcessingTxTiming, time.Since(start))
+		}
 	}()

 	if f.batch.isEmpty() {
@@ -685,6 +713,7 @@
 	}

 	log.Infof("processTransaction: single tx. Batch.BatchNumber: %d, BatchNumber: %d, OldStateRoot: %s, txHash: %s, GER: %s", f.batch.batchNumber, f.processRequest.BatchNumber, f.processRequest.OldStateRoot, hashStr, f.processRequest.GlobalExitRoot.String())
+	tsCommit := time.Now()
 	processBatchResponse, err := f.executor.ProcessBatch(ctx, f.processRequest, true)
 	if err != nil && errors.Is(err, runtime.ErrExecutorDBError) {
 		log.Errorf("failed to process transaction: %s", err)
@@ -704,10 +733,15 @@
 			log.Errorf("failed to update status to invalid in the pool for tx: %s, err: %s", tx.Hash.String(), err)
 		} else {
 			metrics.TxProcessed(metrics.TxProcessedLabelInvalid, 1)
+			metrics.GetLogStatistics().CumulativeCounting(metrics.ProcessingInvalidTxCounter)
 		}
 		return nil, err
 	}

+	if tx != nil {
+		metrics.GetLogStatistics().CumulativeTiming(metrics.ProcessingTxCommit, time.Since(tsCommit))
+	}
+	tsProcessResponse := time.Now()
 	oldStateRoot := f.batch.stateRoot
 	if len(processBatchResponse.Responses) > 0 && tx != nil {
 		errWg, err = f.handleProcessTransactionResponse(ctx, tx, processBatchResponse, oldStateRoot)
@@ -721,6 +755,10 @@

 	f.batch.localExitRoot = processBatchResponse.NewLocalExitRoot
 	log.Infof("processTransaction: data loaded in memory. batch.batchNumber: %d, batchNumber: %d, result.NewStateRoot: %s, result.NewLocalExitRoot: %s, oldStateRoot: %s", f.batch.batchNumber, f.processRequest.BatchNumber, processBatchResponse.NewStateRoot.String(), processBatchResponse.NewLocalExitRoot.String(), oldStateRoot.String())
+	if tx != nil {
+		metrics.GetLogStatistics().CumulativeTiming(metrics.ProcessingTxResponse, time.Since(tsProcessResponse))
+	}
+
 	return nil, nil
 }

diff --git a/sequencer/metrics/logstatistics.go b/sequencer/metrics/logstatistics.go
new file mode 100644
index 0000000000..141649d523
--- /dev/null
+++ b/sequencer/metrics/logstatistics.go
@@ -0,0 +1,40 @@
+package metrics
+
+import (
+	"time"
+)
+
+type LogTag string
+
+type LogStatistics interface {
+	CumulativeCounting(tag LogTag)
+	CumulativeValue(tag LogTag, value int64)
+	CumulativeTiming(tag LogTag, duration time.Duration)
+	SetTag(tag LogTag, value string)
+	Summary() string
+	ResetStatistics()
+
+	UpdateTimestamp(tag LogTag, tm time.Time)
+}
+
+const (
+	TxCounter             LogTag = "TxCounter"
+	GetTx                 LogTag = "GetTx"
+	GetTxPauseCounter     LogTag = "GetTxPauseCounter"
+	BatchCloseReason      LogTag = "BatchCloseReason"
+	ReprocessingTxCounter LogTag = "ReProcessingTxCounter"
+	FailTxCounter         LogTag = "FailTxCounter"
+	NewRound              LogTag = "NewRound"
+	BatchGas              LogTag = "BatchGas"
+
+	ProcessingTxTiming         LogTag = "ProcessingTxTiming"
+	ProcessingInvalidTxCounter LogTag = "ProcessingInvalidTxCounter"
+	ProcessingTxCommit         LogTag = "ProcessingTxCommit"
+	ProcessingTxResponse       LogTag = "ProcessingTxResponse"
+
+	FinalizeBatchTiming             LogTag = "FinalizeBatchTiming"
+	FinalizeBatchNumber             LogTag = "FinalizeBatchNumber"
+	FinalizeBatchReprocessFullBatch LogTag = "FinalizeBatchReprocessFullBatch"
+	FinalizeBatchCloseBatch         LogTag = "FinalizeBatchCloseBatch"
+	FinalizeBatchOpenBatch          LogTag = "FinalizeBatchOpenBatch"
+)
diff --git a/sequencer/metrics/logstatisticsimpl.go b/sequencer/metrics/logstatisticsimpl.go
new file mode 100644
index 0000000000..d74dd87596
--- /dev/null
+++ b/sequencer/metrics/logstatisticsimpl.go
@@ -0,0 +1,85 @@
+package metrics
+
+import (
+	"strconv"
+	"sync"
+	"time"
+)
+
+var instance *logStatisticsInstance
+var once sync.Once
+
+func GetLogStatistics() LogStatistics {
+	once.Do(func() {
+		instance = &logStatisticsInstance{}
+		instance.init()
+	})
+	return instance
+}
+
+type logStatisticsInstance struct {
+	timestamp  map[LogTag]time.Time
+	statistics map[LogTag]int64 // value may be a counter or a duration in milliseconds
+	tags       map[LogTag]string
+}
+
+func (l *logStatisticsInstance) init() {
+	l.timestamp = make(map[LogTag]time.Time)
+	l.statistics = make(map[LogTag]int64)
+	l.tags = make(map[LogTag]string)
+}
+
+func (l *logStatisticsInstance) CumulativeCounting(tag LogTag) {
+	l.statistics[tag]++
+}
+
+func (l *logStatisticsInstance) CumulativeValue(tag LogTag, value int64) {
+	l.statistics[tag] += value
+}
+
+func (l *logStatisticsInstance) CumulativeTiming(tag LogTag, duration time.Duration) {
+	l.statistics[tag] += duration.Milliseconds()
+}
+
+func (l *logStatisticsInstance) SetTag(tag LogTag, value string) {
+	l.tags[tag] = value
+}
+
+func (l *logStatisticsInstance) UpdateTimestamp(tag LogTag, tm time.Time) {
+	l.timestamp[tag] = tm
+}
+
+func (l *logStatisticsInstance) ResetStatistics() {
+	l.statistics = make(map[LogTag]int64)
+	l.tags = make(map[LogTag]string)
+}
+
+func (l *logStatisticsInstance) Summary() string {
+	batchTotalDuration := "-"
+	if key, ok := l.timestamp[NewRound]; ok {
+		batchTotalDuration = strconv.Itoa(int(time.Since(key).Milliseconds()))
+	}
+	processTxTiming := "ProcessTx<" + strconv.Itoa(int(l.statistics[ProcessingTxTiming])) + "ms, " +
+		"Commit<" + strconv.Itoa(int(l.statistics[ProcessingTxCommit])) + "ms>, " +
+		"ProcessResponse<" + strconv.Itoa(int(l.statistics[ProcessingTxResponse])) + "ms>>, "
+
+	finalizeBatchTiming := "FinalizeBatch<" + strconv.Itoa(int(l.statistics[FinalizeBatchTiming])) + "ms, " +
+		"ReprocessFullBatch<" + strconv.Itoa(int(l.statistics[FinalizeBatchReprocessFullBatch])) + "ms>, " +
+		"CloseBatch<" + strconv.Itoa(int(l.statistics[FinalizeBatchCloseBatch])) + "ms>, " +
+		"OpenBatch<" + strconv.Itoa(int(l.statistics[FinalizeBatchOpenBatch])) + "ms>>, "
+
+	result := "Batch<" + l.tags[FinalizeBatchNumber] + ">, " +
+		"TotalDuration<" + batchTotalDuration + "ms>, " +
+		"GasUsed<" + strconv.Itoa(int(l.statistics[BatchGas])) + ">, " +
+		"Tx<" + strconv.Itoa(int(l.statistics[TxCounter])) + ">, " +
+		"GetTx<" + strconv.Itoa(int(l.statistics[GetTx])) + "ms>, " +
+		"GetTxPause<" + strconv.Itoa(int(l.statistics[GetTxPauseCounter])) + ">, " +
+		"ReprocessTx<" + strconv.Itoa(int(l.statistics[ReprocessingTxCounter])) + ">, " +
+		"FailTx<" + strconv.Itoa(int(l.statistics[FailTxCounter])) + ">, " +
+		"InvalidTx<" + strconv.Itoa(int(l.statistics[ProcessingInvalidTxCounter])) + ">, " +
+		processTxTiming +
+		finalizeBatchTiming +
+		"BatchCloseReason<" + l.tags[BatchCloseReason] + ">"
+
+	return result
+}
diff --git a/sequencer/metrics/logstatisticsimpl_test.go b/sequencer/metrics/logstatisticsimpl_test.go
new file mode 100644
index 0000000000..c2ec294811
--- /dev/null
+++ b/sequencer/metrics/logstatisticsimpl_test.go
@@ -0,0 +1,51 @@
+package metrics
+
+import (
+	"testing"
+	"time"
+)
+
+func Test_logStatisticsInstance_Summary(t *testing.T) {
+	type fields struct {
+		timestamp  map[LogTag]time.Time
+		statistics map[LogTag]int64
+		tags       map[LogTag]string
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   string
+	}{
+		// TODO: Add test cases.
+		{"1", fields{
+			timestamp: map[LogTag]time.Time{NewRound: time.Now().Add(-time.Second)},
+			statistics: map[LogTag]int64{
+				BatchGas:                        111111,
+				TxCounter:                       10,
+				GetTx:                           time.Second.Milliseconds(),
+				GetTxPauseCounter:               2,
+				ReprocessingTxCounter:           3,
+				FailTxCounter:                   1,
+				ProcessingInvalidTxCounter:      2,
+				ProcessingTxTiming:              time.Second.Milliseconds() * 30,
+				ProcessingTxCommit:              time.Second.Milliseconds() * 10,
+				ProcessingTxResponse:            time.Second.Milliseconds() * 15,
+				FinalizeBatchTiming:             time.Second.Milliseconds() * 50,
+				FinalizeBatchReprocessFullBatch: time.Second.Milliseconds() * 20,
+				FinalizeBatchCloseBatch:         time.Second.Milliseconds() * 10,
+				FinalizeBatchOpenBatch:          time.Second.Milliseconds() * 10,
+			},
+			tags: map[LogTag]string{BatchCloseReason: "deadline", FinalizeBatchNumber: "123"},
+		}, "test"},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			l := &logStatisticsInstance{
+				timestamp:  tt.fields.timestamp,
+				statistics: tt.fields.statistics,
+				tags:       tt.fields.tags,
+			}
+			t.Log(l.Summary())
+		})
+	}
+}
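Note: with the fixture values used in the test above, Summary() produces a single log line of the following shape. The TotalDuration value depends on wall-clock time at the moment Summary() is called (the NewRound timestamp is set one second in the past, so roughly 1000 ms); treat that figure as illustrative rather than exact:

Batch<123>, TotalDuration<1000ms>, GasUsed<111111>, Tx<10>, GetTx<1000ms>, GetTxPause<2>, ReprocessTx<3>, FailTx<1>, InvalidTx<2>, ProcessTx<30000ms, Commit<10000ms>, ProcessResponse<15000ms>>, FinalizeBatch<50000ms, ReprocessFullBatch<20000ms>, CloseBatch<10000ms>, OpenBatch<10000ms>>, BatchCloseReason<deadline>

In the sequencer itself this line is emitted via log.Infof each time a batch is closed (deadline or full), after which the statistics are reset and the NewRound timestamp is updated for the next batch.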