interop: Add metrics tracking Cross-validated blocks. (#14211)
tyler-smith authored and alcueca committed Feb 7, 2025
1 parent dcdbaba commit 3b86d7a
Showing 10 changed files with 218 additions and 39 deletions.
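At a high level, the change threads a metrics handle from the supervisor's top-level `Metricer` down into `ChainsDB`, so cross-unsafe and cross-safe head updates are exported as per-chain block-ref gauges. A minimal sketch of the resulting call path, assuming the packages touched in this diff (not every call site inside `ChainsDB` is expanded below):

```go
package main

import (
	"github.com/ethereum/go-ethereum/common"

	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-supervisor/metrics"
)

func main() {
	// Top-level supervisor metrics; after this commit it also carries a
	// RefMetricsWithChainID alongside the existing RPC metrics.
	m := metrics.NewMetrics("default")

	chainID := eth.ChainIDFromUInt64(900)
	ref := eth.BlockRef{Hash: common.Hash{0xaa}, Number: 42, Time: 10_000}

	// The supervisor backend ends up calling these whenever the cross-unsafe /
	// cross-safe heads advance for a chain.
	m.RecordCrossUnsafeRef(chainID, ref)
	m.RecordCrossSafeRef(chainID, ref)
}
```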
92 changes: 58 additions & 34 deletions op-service/metrics/ref_metrics.go
@@ -40,73 +40,97 @@ var _ RefMetricer = (*RefMetrics)(nil)
//
// ns is the fully qualified namespace, e.g. "op_node_default".
func MakeRefMetrics(ns string, factory Factory) RefMetrics {
return makeRefMetrics(ns, factory)
}

func (m *RefMetrics) RecordRef(layer string, name string, num uint64, timestamp uint64, h common.Hash) {
recordRefWithLabels(m, name, num, timestamp, h, []string{layer, name})
}

func (m *RefMetrics) RecordL1Ref(name string, ref eth.L1BlockRef) {
m.RecordRef("l1", name, ref.Number, ref.Time, ref.Hash)
}

func (m *RefMetrics) RecordL2Ref(name string, ref eth.L2BlockRef) {
m.RecordRef("l2", name, ref.Number, ref.Time, ref.Hash)
m.RecordRef("l1_origin", name, ref.L1Origin.Number, 0, ref.L1Origin.Hash)
m.RefsSeqNr.WithLabelValues(name).Set(float64(ref.SequenceNumber))
}

// RefMetricsWithChainID is a RefMetrics that includes a chain ID label.
type RefMetricsWithChainID struct {
RefMetrics
}

func MakeRefMetricsWithChainID(ns string, factory Factory) RefMetricsWithChainID {
return RefMetricsWithChainID{
RefMetrics: makeRefMetrics(ns, factory, "chain"),
}
}

func (m *RefMetricsWithChainID) RecordRef(layer string, name string, num uint64, timestamp uint64, h common.Hash, chainID eth.ChainID) {
recordRefWithLabels(&m.RefMetrics, name, num, timestamp, h, []string{layer, name, chainID.String()})
}

func (m *RefMetricsWithChainID) RecordL1Ref(name string, ref eth.L1BlockRef, chainID eth.ChainID) {
m.RecordRef("l1", name, ref.Number, ref.Time, ref.Hash, chainID)
}

func (m *RefMetricsWithChainID) RecordL2Ref(name string, ref eth.L2BlockRef, chainID eth.ChainID) {
m.RecordRef("l2", name, ref.Number, ref.Time, ref.Hash, chainID)
m.RecordRef("l1_origin", name, ref.L1Origin.Number, 0, ref.L1Origin.Hash, chainID)
m.RefsSeqNr.WithLabelValues(name, chainID.String()).Set(float64(ref.SequenceNumber))
}

// makeRefMetrics creates a new RefMetrics with the given namespace, factory, and labels.
func makeRefMetrics(ns string, factory Factory, extraLabels ...string) RefMetrics {
labels := append([]string{"layer", "type"}, extraLabels...)
seqLabels := append([]string{"type"}, extraLabels...)
return RefMetrics{
RefsNumber: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "refs_number",
Help: "Gauge representing the different L1/L2 reference block numbers",
}, []string{
"layer",
"type",
}),
}, labels),
RefsTime: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "refs_time",
Help: "Gauge representing the different L1/L2 reference block timestamps",
}, []string{
"layer",
"type",
}),
}, labels),
RefsHash: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "refs_hash",
Help: "Gauge representing the different L1/L2 reference block hashes truncated to float values",
}, []string{
"layer",
"type",
}),
}, labels),
RefsSeqNr: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "refs_seqnr",
Help: "Gauge representing the different L2 reference sequence numbers",
}, []string{
"type",
}),
}, seqLabels),
RefsLatency: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "refs_latency",
Help: "Gauge representing the different L1/L2 reference block timestamps minus current time, in seconds",
}, []string{
"layer",
"type",
}),
}, labels),
LatencySeen: make(map[string]common.Hash),
}
}

func (m *RefMetrics) RecordRef(layer string, name string, num uint64, timestamp uint64, h common.Hash) {
m.RefsNumber.WithLabelValues(layer, name).Set(float64(num))
// recordRefWithLabels implements the core logic of emitting block ref metrics.
// It's abstracted over labels to enable re-use in different contexts.
func recordRefWithLabels(m *RefMetrics, name string, num uint64, timestamp uint64, h common.Hash, labels []string) {
m.RefsNumber.WithLabelValues(labels...).Set(float64(num))
if timestamp != 0 {
m.RefsTime.WithLabelValues(layer, name).Set(float64(timestamp))
m.RefsTime.WithLabelValues(labels...).Set(float64(timestamp))
// only meter the latency when we first see this hash for the given label name
if m.LatencySeen[name] != h {
m.LatencySeen[name] = h
m.RefsLatency.WithLabelValues(layer, name).Set(float64(timestamp) - (float64(time.Now().UnixNano()) / 1e9))
m.RefsLatency.WithLabelValues(labels...).Set(float64(timestamp) - (float64(time.Now().UnixNano()) / 1e9))
}
}
// we map the first 8 bytes to a float64, so we can graph changes of the hash to find divergences visually.
// We don't do math.Float64frombits, just a regular conversion, to keep the value within a manageable range.
m.RefsHash.WithLabelValues(layer, name).Set(float64(binary.LittleEndian.Uint64(h[:])))
}

func (m *RefMetrics) RecordL1Ref(name string, ref eth.L1BlockRef) {
m.RecordRef("l1", name, ref.Number, ref.Time, ref.Hash)
}

func (m *RefMetrics) RecordL2Ref(name string, ref eth.L2BlockRef) {
m.RecordRef("l2", name, ref.Number, ref.Time, ref.Hash)
m.RecordRef("l1_origin", name, ref.L1Origin.Number, 0, ref.L1Origin.Hash)
m.RefsSeqNr.WithLabelValues(name).Set(float64(ref.SequenceNumber))
m.RefsHash.WithLabelValues(labels...).Set(float64(binary.LittleEndian.Uint64(h[:])))
}

// NoopRefMetrics can be embedded in a noop version of a metric implementation
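The refactor above keeps the existing `RefMetrics` behaviour and layers an optional chain label on top: `makeRefMetrics` accepts variadic extra labels, and `recordRefWithLabels` records against whatever label set the caller assembled. A hedged sketch of using the new chain-aware variant (the namespace, the registry wiring via `opmetrics.With`, and the label values are illustrative assumptions):

```go
package main

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/ethereum-optimism/optimism/op-service/eth"
	opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
)

func main() {
	// Factory backed by a fresh registry; opmetrics.With is assumed to be the
	// usual Factory constructor in op-service/metrics.
	factory := opmetrics.With(prometheus.NewRegistry())

	// The chain-aware variant appends a trailing "chain" label to every ref gauge.
	refMetrics := opmetrics.MakeRefMetricsWithChainID("op_supervisor_default", factory)

	chainID := eth.ChainIDFromUInt64(10)
	l2Ref := eth.L2BlockRef{
		Hash:           common.Hash{0x01},
		Number:         123,
		Time:           1_700_000_000,
		L1Origin:       eth.BlockID{Hash: common.Hash{0x02}, Number: 122},
		SequenceNumber: 3,
	}

	// Emits refs_number / refs_time / refs_hash / refs_latency with
	// {layer, type, chain} labels, plus refs_seqnr with {type, chain}.
	refMetrics.RecordL2Ref("cross_safe", l2Ref, chainID)
}
```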
12 changes: 12 additions & 0 deletions op-supervisor/metrics/metrics.go
@@ -14,6 +14,8 @@ type Metricer interface {
RecordUp()

opmetrics.RPCMetricer
RecordCrossUnsafeRef(chainID eth.ChainID, r eth.BlockRef)
RecordCrossSafeRef(chainID eth.ChainID, r eth.BlockRef)

CacheAdd(chainID eth.ChainID, label string, cacheSize int, evicted bool)
CacheGet(chainID eth.ChainID, label string, hit bool)
@@ -30,6 +32,7 @@ type Metrics struct {
factory opmetrics.Factory

opmetrics.RPCMetrics
RefMetrics opmetrics.RefMetricsWithChainID

CacheSizeVec *prometheus.GaugeVec
CacheGetVec *prometheus.CounterVec
@@ -62,6 +65,7 @@ func NewMetrics(procName string) *Metrics {
factory: factory,

RPCMetrics: opmetrics.MakeRPCMetrics(ns, factory),
RefMetrics: opmetrics.MakeRefMetricsWithChainID(ns, factory),

info: *factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
@@ -140,6 +144,14 @@ func (m *Metrics) RecordUp() {
m.up.Set(1)
}

func (m *Metrics) RecordCrossUnsafeRef(chainID eth.ChainID, ref eth.BlockRef) {
m.RefMetrics.RecordRef("l2", "cross_unsafe", ref.Number, ref.Time, ref.Hash, chainID)
}

func (m *Metrics) RecordCrossSafeRef(chainID eth.ChainID, ref eth.BlockRef) {
m.RefMetrics.RecordRef("l2", "cross_safe", ref.Number, ref.Time, ref.Hash, chainID)
}

func (m *Metrics) CacheAdd(chainID eth.ChainID, label string, cacheSize int, evicted bool) {
chain := chainIDLabel(chainID)
m.CacheSizeVec.WithLabelValues(chain, label).Set(float64(cacheSize))
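The two recorders reuse the shared ref gauges from `op-service/metrics` rather than introducing new metric families; only the `type` label (`cross_unsafe` / `cross_safe`) and the trailing `chain` label distinguish the series. A rough sketch of reading one back, assuming `prometheus/testutil` and the exported `RefMetrics` field shown above:

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/prometheus/client_golang/prometheus/testutil"

	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-supervisor/metrics"
)

func main() {
	m := metrics.NewMetrics("default")
	chainID := eth.ChainIDFromUInt64(900)

	m.RecordCrossSafeRef(chainID, eth.BlockRef{Hash: common.Hash{0xaa}, Number: 42, Time: 10_000})

	// Label order follows makeRefMetrics: layer, type, then the extra "chain" label.
	got := testutil.ToFloat64(
		m.RefMetrics.RefsNumber.WithLabelValues("l2", "cross_safe", chainID.String()))
	fmt.Println(got) // expected: 42
}
```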
3 changes: 3 additions & 0 deletions op-supervisor/metrics/noop.go
@@ -16,6 +16,9 @@ func (*noopMetrics) Document() []opmetrics.DocumentedMetric { return nil }
func (*noopMetrics) RecordInfo(version string) {}
func (*noopMetrics) RecordUp() {}

func (m *noopMetrics) RecordCrossUnsafeRef(_ eth.ChainID, _ eth.BlockRef) {}
func (m *noopMetrics) RecordCrossSafeRef(_ eth.ChainID, _ eth.BlockRef) {}

func (m *noopMetrics) CacheAdd(_ eth.ChainID, _ string, _ int, _ bool) {}
func (m *noopMetrics) CacheGet(_ eth.ChainID, _ string, _ bool) {}

2 changes: 1 addition & 1 deletion op-supervisor/supervisor/backend/backend.go
@@ -112,7 +112,7 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger,
sysCtx, sysCancel := context.WithCancel(ctx)

// create initial per-chain resources
chainsDBs := db.NewChainsDB(logger, depSet)
chainsDBs := db.NewChainsDB(logger, depSet, m)
eventSys.Register("chainsDBs", chainsDBs, event.DefaultRegisterOpts())

l1Accessor := l1access.NewL1Accessor(sysCtx, logger, nil)
108 changes: 108 additions & 0 deletions op-supervisor/supervisor/backend/backend_test.go
@@ -136,6 +136,114 @@ func TestBackendLifetime(t *testing.T) {
t.Log("stopped!")
}

func TestBackendCallsMetrics(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
mockMetrics := &MockMetrics{}
dataDir := t.TempDir()
chainA := eth.ChainIDFromUInt64(900)

// Set up mock metrics
mockMetrics.Mock.On("RecordDBEntryCount", chainA, mock.AnythingOfType("string"), mock.AnythingOfType("int64")).Return()
mockMetrics.Mock.On("RecordCrossUnsafeRef", chainA, mock.MatchedBy(func(_ eth.BlockRef) bool { return true })).Return()
mockMetrics.Mock.On("RecordCrossSafeRef", chainA, mock.MatchedBy(func(_ eth.BlockRef) bool { return true })).Return()

depSet, err := depset.NewStaticConfigDependencySet(
map[eth.ChainID]*depset.StaticConfigDependency{
chainA: {
ChainIndex: 900,
ActivationTime: 42,
HistoryMinTime: 100,
},
})
require.NoError(t, err)

cfg := &config.Config{
Version: "test",
LogConfig: oplog.CLIConfig{},
MetricsConfig: opmetrics.CLIConfig{},
PprofConfig: oppprof.CLIConfig{},
RPC: oprpc.CLIConfig{},
DependencySetSource: depSet,
SynchronousProcessors: true,
MockRun: false,
SyncSources: &syncnode.CLISyncNodes{},
Datadir: dataDir,
}

ex := event.NewGlobalSynchronous(context.Background())
b, err := NewSupervisorBackend(context.Background(), logger, mockMetrics, cfg, ex)
require.NoError(t, err)

// Assert that the metrics are called at initialization
mockMetrics.Mock.AssertCalled(t, "RecordDBEntryCount", chainA, "log", int64(0))
mockMetrics.Mock.AssertCalled(t, "RecordDBEntryCount", chainA, "local_derived", int64(0))
mockMetrics.Mock.AssertCalled(t, "RecordDBEntryCount", chainA, "cross_derived", int64(0))

// Start the backend
err = b.Start(context.Background())
require.NoError(t, err)

// Create a test block
block := eth.BlockRef{
Hash: common.Hash{0xaa},
Number: 42,
ParentHash: common.Hash{0xbb},
Time: 10000,
}

// Assert that metrics are called on safety level updates
err = b.chainDBs.UpdateCrossUnsafe(chainA, types.BlockSeal{
Hash: block.Hash,
Number: block.Number,
Timestamp: block.Time,
})
require.NoError(t, err)
mockMetrics.Mock.AssertCalled(t, "RecordCrossUnsafeRef", chainA, mock.MatchedBy(func(ref eth.BlockRef) bool {
return ref.Hash == block.Hash && ref.Number == block.Number && ref.Time == block.Time
}))

err = b.chainDBs.UpdateCrossSafe(chainA, block, block)
require.NoError(t, err)
mockMetrics.Mock.AssertCalled(t, "RecordDBEntryCount", chainA, "cross_derived", int64(1))
mockMetrics.Mock.AssertCalled(t, "RecordCrossSafeRef", chainA, mock.MatchedBy(func(ref eth.BlockRef) bool {
return ref.Hash == block.Hash && ref.Number == block.Number && ref.Time == block.Time
}))

// Stop the backend
err = b.Stop(context.Background())
require.NoError(t, err)
}

type MockMetrics struct {
mock.Mock
}

var _ Metrics = (*MockMetrics)(nil)

func (m *MockMetrics) CacheAdd(chainID eth.ChainID, label string, cacheSize int, evicted bool) {
m.Mock.Called(chainID, label, cacheSize, evicted)
}

func (m *MockMetrics) CacheGet(chainID eth.ChainID, label string, hit bool) {
m.Mock.Called(chainID, label, hit)
}

func (m *MockMetrics) RecordCrossUnsafeRef(chainID eth.ChainID, ref eth.BlockRef) {
m.Mock.Called(chainID, ref)
}

func (m *MockMetrics) RecordCrossSafeRef(chainID eth.ChainID, ref eth.BlockRef) {
m.Mock.Called(chainID, ref)
}

func (m *MockMetrics) RecordDBEntryCount(chainID eth.ChainID, kind string, count int64) {
m.Mock.Called(chainID, kind, count)
}

func (m *MockMetrics) RecordDBSearchEntriesRead(chainID eth.ChainID, count int64) {
m.Mock.Called(chainID, count)
}

type MockProcessorSource struct {
mock.Mock
}
11 changes: 11 additions & 0 deletions op-supervisor/supervisor/backend/chain_metrics.go
@@ -10,6 +10,9 @@ type Metrics interface {
CacheAdd(chainID eth.ChainID, label string, cacheSize int, evicted bool)
CacheGet(chainID eth.ChainID, label string, hit bool)

RecordCrossUnsafeRef(chainID eth.ChainID, ref eth.BlockRef)
RecordCrossSafeRef(chainID eth.ChainID, ref eth.BlockRef)

RecordDBEntryCount(chainID eth.ChainID, kind string, count int64)
RecordDBSearchEntriesRead(chainID eth.ChainID, count int64)
}
@@ -28,6 +31,14 @@ func newChainMetrics(chainID eth.ChainID, delegate Metrics) *chainMetrics {
}
}

func (c *chainMetrics) RecordCrossUnsafeRef(ref eth.BlockRef) {
c.delegate.RecordCrossUnsafeRef(c.chainID, ref)
}

func (c *chainMetrics) RecordCrossSafeRef(ref eth.BlockRef) {
c.delegate.RecordCrossSafeRef(c.chainID, ref)
}

func (c *chainMetrics) CacheAdd(label string, cacheSize int, evicted bool) {
c.delegate.CacheAdd(c.chainID, label, cacheSize, evicted)
}
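For per-chain components, the existing `chainMetrics` wrapper curries the chain ID, so callers can depend on a narrow, chain-free interface. A hedged sketch of a consumer-side view (the consumer type and names below are illustrative, not from the diff):

```go
package example

import "github.com/ethereum-optimism/optimism/op-service/eth"

// crossRefMetrics is the narrow view a per-chain component might declare;
// *chainMetrics (with the two new methods above) satisfies it.
type crossRefMetrics interface {
	RecordCrossUnsafeRef(ref eth.BlockRef)
	RecordCrossSafeRef(ref eth.BlockRef)
}

// crossHeadTracker is a hypothetical per-chain consumer.
type crossHeadTracker struct {
	m crossRefMetrics
}

func (t *crossHeadTracker) onCrossSafe(ref eth.BlockRef) {
	// ...update local state, then report the new cross-safe head...
	t.m.RecordCrossSafeRef(ref)
}
```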
15 changes: 14 additions & 1 deletion op-supervisor/supervisor/backend/db/db.go
@@ -11,6 +11,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/locks"
"github.com/ethereum-optimism/optimism/op-supervisor/metrics"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/fromda"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
@@ -84,6 +85,11 @@ var _ DerivationStorage = (*fromda.DB)(nil)

var _ LogStorage = (*logs.DB)(nil)

type Metrics interface {
RecordCrossUnsafeRef(chainID eth.ChainID, ref eth.BlockRef)
RecordCrossSafeRef(chainID eth.ChainID, ref eth.BlockRef)
}

// ChainsDB is a database that stores logs and derived-from data for multiple chains.
// it implements the LogStorage interface, as well as several DB interfaces needed by the cross package.
type ChainsDB struct {
@@ -113,14 +119,21 @@ type ChainsDB struct {

// emitter used to signal when the DB changes, for other modules to react to
emitter event.Emitter

m Metrics
}

var _ event.AttachEmitter = (*ChainsDB)(nil)

func NewChainsDB(l log.Logger, depSet depset.DependencySet) *ChainsDB {
func NewChainsDB(l log.Logger, depSet depset.DependencySet, m Metrics) *ChainsDB {
if m == nil {
m = metrics.NoopMetrics
}

return &ChainsDB{
logger: l,
depSet: depSet,
m: m,
}
}
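The hunks that actually emit these metrics from `ChainsDB` (the cross-head update paths) are not expanded in this view. A sketch of what such a call site presumably looks like, given the `Metrics` interface above — treat the helper function and its placement as assumptions:

```go
package example

import (
	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)

// reportCrossUnsafe shows the shape of the call a ChainsDB update path would
// make once a block seal becomes cross-unsafe (hypothetical helper; inside the
// real ChainsDB this is simply db.m.RecordCrossUnsafeRef(...)).
func reportCrossUnsafe(m db.Metrics, chainID eth.ChainID, seal types.BlockSeal) {
	m.RecordCrossUnsafeRef(chainID, eth.BlockRef{
		Hash:   seal.Hash,
		Number: seal.Number,
		Time:   seal.Timestamp,
	})
}
```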

5 changes: 3 additions & 2 deletions op-supervisor/supervisor/backend/db/query_test.go
@@ -6,6 +6,7 @@ import (

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-supervisor/metrics"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/common"
@@ -96,7 +97,7 @@ func TestCommonL1UnknownChain(t *testing.T) {
m1 := &mockDerivationStorage{}
m2 := &mockDerivationStorage{}
logger := testlog.Logger(t, log.LevelDebug)
chainDB := NewChainsDB(logger, sampleDepSet(t))
chainDB := NewChainsDB(logger, sampleDepSet(t), metrics.NoopMetrics)

// add a mock local derived-from storage to drive the test
chainDB.AddLocalDerivationDB(eth.ChainIDFromUInt64(900), m1)
@@ -112,7 +113,7 @@ func TestCommonL1(t *testing.T) {
m2 := &mockDerivationStorage{}
m3 := &mockDerivationStorage{}
logger := testlog.Logger(t, log.LevelDebug)
chainDB := NewChainsDB(logger, sampleDepSet(t))
chainDB := NewChainsDB(logger, sampleDepSet(t), metrics.NoopMetrics)

// add a mock local derived-from storage to drive the test
chainDB.AddLocalDerivationDB(eth.ChainIDFromUInt64(900), m1)
