diff --git a/go.mod b/go.mod index b884efd..335d3ec 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ go 1.17 require ( github.com/spf13/cobra v1.4.0 - gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -21,6 +21,7 @@ require ( github.com/ethereum/go-ethereum v1.10.17 // indirect github.com/fatih/color v1.13.0 // indirect github.com/ferranbt/fastssz v0.0.0-20220103083642-bc5fefefa28b // indirect + github.com/go-co-op/gocron v1.15.0 // indirect github.com/go-ole/go-ole v1.2.1 // indirect github.com/go-stack/stack v1.8.0 // indirect github.com/goccy/go-yaml v1.9.5 // indirect @@ -31,13 +32,20 @@ require ( github.com/huin/goupnp v1.0.3-0.20220313090229-ca81a64b4204 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect + github.com/klauspost/compress v1.14.4 // indirect github.com/klauspost/cpuid/v2 v2.0.11 // indirect github.com/kr/pretty v0.2.1 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/minio/highwayhash v1.0.2 // indirect github.com/minio/sha256-simd v1.0.0 // indirect github.com/mitchellh/mapstructure v1.4.3 // indirect + github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a // indirect + github.com/nats-io/nats-server/v2 v2.8.4 // indirect + github.com/nats-io/nats.go v1.16.0 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/onrik/ethrpc v1.0.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.12.1 // indirect @@ -46,6 +54,7 @@ require ( github.com/prometheus/procfs v0.7.3 // indirect github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7 // indirect github.com/r3labs/sse/v2 v2.7.4 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rs/zerolog v1.26.1 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/sirupsen/logrus v1.8.1 // indirect @@ -54,10 +63,11 @@ require ( github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tklauser/numcpus v0.2.2 // indirect - golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e // indirect + golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9 // indirect + golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/protobuf v1.26.0 // indirect gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect diff --git a/go.sum b/go.sum index 9981509..256ff7a 100644 --- a/go.sum +++ b/go.sum @@ -140,6 +140,8 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE= github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24= github.com/go-chi/chi/v5 v5.0.0/go.mod h1:BBug9lr0cqtdAhsu6R4AAdvufI0/XBzAQSsUqJpoZOs= +github.com/go-co-op/gocron v1.15.0 h1:XmiPazahD9aq0/QdK5toCVHfgTXfrZ/s83RpAgzr6SM= +github.com/go-co-op/gocron v1.15.0/go.mod h1:On9zUZTv7FBeuj9D/cdYyAWcPUiLqqAx7nsPHd0EmKM= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -282,6 +284,8 @@ github.com/karalabe/usb v0.0.2/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4= +github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.11 h1:i2lw1Pm7Yi/4O6XCSyJWqEHI2MDw2FzUK6o/D21xn2A= @@ -329,6 +333,8 @@ github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsO github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= @@ -348,6 +354,16 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0= github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E= +github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I= +github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4= +github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4= +github.com/nats-io/nats.go v1.16.0 h1:zvLE7fGBQYW6MWaFaRdsgm9qT39PJDQoju+DS8KsO1g= +github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= @@ -405,6 +421,8 @@ github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1 github.com/ricochet2200/go-disk-usage/du v0.0.0-20210707232629-ac9918953285 h1:d54EL9l+XteliUfUCGsEwwuk65dmmxX85VXF+9T6+50= github.com/ricochet2200/go-disk-usage/du v0.0.0-20210707232629-ac9918953285/go.mod h1:fxIDly1xtudczrZeOOlfaUvd2OPb2qZAPuWdU2BsBTk= github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= @@ -480,9 +498,12 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e h1:1SzTfNOXwIS2oWiMF+6qu0OUDKb0dauo6MoDUQyu+yU= golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= +golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd h1:XcWmESyNjXJMLahc3mqVQJcgSTDxFxhETVlfk9uGc38= +golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -583,6 +604,7 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -657,6 +679,8 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M= +golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -821,6 +845,8 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/exporter/consensus/beacon/beacon.go b/pkg/exporter/consensus/beacon/beacon.go new file mode 100644 index 0000000..678a137 --- /dev/null +++ b/pkg/exporter/consensus/beacon/beacon.go @@ -0,0 +1,453 @@ +package beacon + +import ( + "context" + "errors" + "fmt" + "time" + + eth2client "github.com/attestantio/go-eth2-client" + v1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec" + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/go-co-op/gocron" + "github.com/nats-io/nats.go" + "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api" + "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api/types" + "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon/state" + "github.com/sirupsen/logrus" +) + +type Node interface { + // Lifecycle + // Start starts the node. + Start(ctx context.Context) error + // StartAsync starts the node asynchronously. + StartAsync(ctx context.Context) + + // Getters + // GetEpoch returns the epoch for the given epoch. + GetEpoch(ctx context.Context, epoch phase0.Epoch) (*state.Epoch, error) + // GetSlot returns the slot for the given slot. + GetSlot(ctx context.Context, slot phase0.Slot) (*state.Slot, error) + // GetSpec returns the spec for the node. + GetSpec(ctx context.Context) (*state.Spec, error) + // GetSyncState returns the sync state for the node. + GetSyncState(ctx context.Context) (*v1.SyncState, error) + // GetGenesis returns the genesis for the node. + GetGenesis(ctx context.Context) (*v1.Genesis, error) + // GetNodeVersion returns the node version. + GetNodeVersion(ctx context.Context) (string, error) + + // Subscriptions + // - Proxied Beacon events + // OnEvent is called when a beacon event is received. + OnEvent(ctx context.Context, handler func(ctx context.Context, ev *v1.Event) error) (*nats.Subscription, error) + // OnBlock is called when a block is received. + OnBlock(ctx context.Context, handler func(ctx context.Context, ev *v1.BlockEvent) error) (*nats.Subscription, error) + // OnAttestation is called when an attestation is received. + OnAttestation(ctx context.Context, handler func(ctx context.Context, ev *phase0.Attestation) error) (*nats.Subscription, error) + // OnFinalizedCheckpoint is called when a finalized checkpoint is received. + OnFinalizedCheckpoint(ctx context.Context, handler func(ctx context.Context, ev *v1.FinalizedCheckpointEvent) error) (*nats.Subscription, error) + // OnHead is called when the head is received. + OnHead(ctx context.Context, handler func(ctx context.Context, ev *v1.HeadEvent) error) (*nats.Subscription, error) + // OnChainReOrg is called when a chain reorg is received. + OnChainReOrg(ctx context.Context, handler func(ctx context.Context, ev *v1.ChainReorgEvent) error) (*nats.Subscription, error) + // OnVoluntaryExit is called when a voluntary exit is received. + OnVoluntaryExit(ctx context.Context, handler func(ctx context.Context, ev *phase0.VoluntaryExit) error) (*nats.Subscription, error) + + // - Custom events + // OnReady is called when the node is ready. + OnReady(ctx context.Context, handler func(ctx context.Context, event *ReadyEvent) error) (*nats.Subscription, error) + // OnEpochChanged is called when the current epoch changes. + OnEpochChanged(ctx context.Context, handler func(ctx context.Context, event *EpochChangedEvent) error) (*nats.Subscription, error) + // OnSlotChanged is called when the current slot changes. + OnSlotChanged(ctx context.Context, handler func(ctx context.Context, event *SlotChangedEvent) error) (*nats.Subscription, error) + // OnEpochSlotChanged is called when the current epoch or slot changes. + OnEpochSlotChanged(ctx context.Context, handler func(ctx context.Context, event *EpochSlotChangedEvent) error) (*nats.Subscription, error) + // OnBlockInserted is called when a block is inserted. + OnBlockInserted(ctx context.Context, handler func(ctx context.Context, event *BlockInsertedEvent) error) (*nats.Subscription, error) + // OnSyncStatus is called when the sync status changes. + OnSyncStatus(ctx context.Context, handler func(ctx context.Context, event *SyncStatusEvent) error) (*nats.Subscription, error) + // OnNodeVersionUpdated is called when the node version is updated. + OnNodeVersionUpdated(ctx context.Context, handler func(ctx context.Context, event *NodeVersionUpdatedEvent) error) (*nats.Subscription, error) + // OnPeersUpdated is called when the peers are updated. + OnPeersUpdated(ctx context.Context, handler func(ctx context.Context, event *PeersUpdatedEvent) error) (*nats.Subscription, error) + // OnSpecUpdated is called when the spec is updated. + OnSpecUpdated(ctx context.Context, handler func(ctx context.Context, event *SpecUpdatedEvent) error) (*nats.Subscription, error) + // OnEmptySlot is called when an empty slot is detected. + OnEmptySlot(ctx context.Context, handler func(ctx context.Context, event *EmptySlotEvent) error) (*nats.Subscription, error) +} + +// Node represents an Ethereum beacon node. It computes values based on the spec. +type node struct { + // Helpers + log logrus.FieldLogger + + // Clients + api api.ConsensusClient + client eth2client.Service + broker *nats.EncodedConn + + // Internal data stores + genesis *v1.Genesis + state *state.Container + lastEventTime time.Time + syncing *v1.SyncState + nodeVersion string + peers types.Peers +} + +func NewNode(ctx context.Context, log logrus.FieldLogger, ap api.ConsensusClient, client eth2client.Service, broker *nats.EncodedConn) Node { + return &node{ + log: log.WithField("module", "consensus/beacon"), + api: ap, + client: client, + broker: broker, + + syncing: &v1.SyncState{ + IsSyncing: false, + }, + } +} + +func (n *node) Start(ctx context.Context) error { + if err := n.bootstrap(ctx); err != nil { + return err + } + + if err := n.fetchSyncStatus(ctx); err != nil { + return err + } + + s := gocron.NewScheduler(time.Local) + + if _, err := s.Every("15s").Do(func() { + if err := n.fetchSyncStatus(ctx); err != nil { + n.log.WithError(err).Error("Failed to fetch sync status") + } + }); err != nil { + return err + } + + if _, err := s.Every("15m").Do(func() { + if err := n.fetchNodeVersion(ctx); err != nil { + n.log.WithError(err).Error("Failed to fetch node version") + } + }); err != nil { + return err + } + + if _, err := s.Every("15s").Do(func() { + if err := n.fetchPeers(ctx); err != nil { + n.log.WithError(err).Error("Failed to fetch peers") + } + }); err != nil { + return err + } + + s.StartAsync() + + return nil +} + +func (n *node) StartAsync(ctx context.Context) { + go func() { + if err := n.Start(ctx); err != nil { + n.log.WithError(err).Error("Failed to start beacon node") + } + }() +} + +func (n *node) GetEpoch(ctx context.Context, epoch phase0.Epoch) (*state.Epoch, error) { + if n.state == nil { + return nil, errors.New("state is not initialized") + } + + return n.state.GetEpoch(ctx, epoch) +} + +func (n *node) GetSlot(ctx context.Context, slot phase0.Slot) (*state.Slot, error) { + if n.state == nil { + return nil, errors.New("state is not initialized") + } + + return n.state.GetSlot(ctx, slot) +} + +func (n *node) GetSpec(ctx context.Context) (*state.Spec, error) { + if n.state == nil { + return nil, errors.New("state is not initialized") + } + + sp := n.state.Spec() + + if sp == nil { + return nil, errors.New("spec not yet available") + } + + return sp, nil +} + +func (n *node) GetSyncState(ctx context.Context) (*v1.SyncState, error) { + return n.syncing, nil +} + +func (n *node) GetGenesis(ctx context.Context) (*v1.Genesis, error) { + return n.genesis, nil +} + +func (n *node) GetNodeVersion(ctx context.Context) (string, error) { + return n.nodeVersion, nil +} + +func (n *node) bootstrap(ctx context.Context) error { + if err := n.initializeState(ctx); err != nil { + return err + } + + if err := n.subscribeDownstream(ctx); err != nil { + return err + } + + if err := n.subscribeToSelf(ctx); err != nil { + return err + } + + if err := n.publishReady(ctx); err != nil { + return err + } + + //nolint:errcheck // we dont care if this errors out since it runs indefinitely in a goroutine + go n.ensureBeaconSubscription(ctx) + + return nil +} + +func (n *node) fetchSyncStatus(ctx context.Context) error { + provider, isProvider := n.client.(eth2client.NodeSyncingProvider) + if !isProvider { + return errors.New("client does not implement eth2client.NodeSyncingProvider") + } + + status, err := provider.NodeSyncing(ctx) + if err != nil { + return err + } + + n.syncing = status + + if err := n.publishSyncStatus(ctx, status); err != nil { + return err + } + + return nil +} + +func (n *node) fetchPeers(ctx context.Context) error { + peers, err := n.api.NodePeers(ctx) + if err != nil { + return err + } + + n.peers = peers + + return n.publishPeersUpdated(ctx, peers) +} + +func (n *node) subscribeToSelf(ctx context.Context) error { + // Listen for beacon block events and insert them in to our state + if _, err := n.OnBlock(ctx, func(ctx context.Context, ev *v1.BlockEvent) error { + if n.syncing.IsSyncing { + return nil + } + + start := time.Now() + + // Grab the entire block from the beacon node + block, err := n.getBlock(ctx, fmt.Sprintf("%v", ev.Slot)) + if err != nil { + return err + } + + // Insert the beacon block into the state + if err := n.state.AddBeaconBlock(ctx, block, start); err != nil { + return err + } + + return nil + }); err != nil { + return err + } + + return nil +} + +func (n *node) subscribeDownstream(ctx context.Context) error { + if err := n.state.OnEpochSlotChanged(ctx, n.handleStateEpochSlotChanged); err != nil { + return err + } + + if err := n.state.OnBlockInserted(ctx, n.handleDownstreamBlockInserted); err != nil { + return err + } + + if err := n.state.OnEmptySlot(ctx, n.handleDownstreamEmptySlot); err != nil { + return err + } + + return nil +} + +func (n *node) fetchNodeVersion(ctx context.Context) error { + provider, isProvider := n.client.(eth2client.NodeVersionProvider) + if !isProvider { + return errors.New("client does not implement eth2client.NodeVersionProvider") + } + + version, err := provider.NodeVersion(ctx) + if err != nil { + return err + } + + n.nodeVersion = version + + return n.publishNodeVersionUpdated(ctx, version) +} + +func (n *node) handleDownstreamBlockInserted(ctx context.Context, epoch phase0.Epoch, slot state.Slot) error { + if err := n.publishBlockInserted(ctx, slot.Number()); err != nil { + return err + } + + return nil +} + +func (n *node) handleDownstreamEmptySlot(ctx context.Context, epoch phase0.Epoch, slot state.Slot) error { + if n.syncing.IsSyncing { + return nil + } + + if err := n.publishEmptySlot(ctx, slot.Number()); err != nil { + return err + } + + return nil +} + +func (n *node) handleStateEpochSlotChanged(ctx context.Context, epochNumber phase0.Epoch, slot phase0.Slot) error { + n.log.WithFields(logrus.Fields{ + "epoch": epochNumber, + "slot": slot, + }).Debug("Current epoch/slot changed") + + for i := epochNumber; i < epochNumber+1; i++ { + epoch, err := n.state.GetEpoch(ctx, i) + if err != nil { + return err + } + + if epoch.HaveProposerDuties() { + continue + } + + if n.syncing.IsSyncing { + continue + } + + if err := n.fetchEpochProposerDuties(ctx, i); err != nil { + return err + } + } + + return nil +} + +func (n *node) fetchEpochProposerDuties(ctx context.Context, epoch phase0.Epoch) error { + duties, err := n.getProserDuties(ctx, epoch) + if err != nil { + return err + } + + if err := n.state.SetProposerDuties(ctx, epoch, duties); err != nil { + return err + } + + return nil +} + +func (n *node) initializeState(ctx context.Context) error { + n.log.Info("Initializing beacon state") + + sp, err := n.getSpec(ctx) + if err != nil { + return err + } + + genesis, err := n.fetchGenesis(ctx) + if err != nil { + return err + } + + st := state.NewContainer(ctx, n.log, sp, genesis) + + if err := st.Init(ctx); err != nil { + return err + } + + n.state = &st + + n.log.Info("Beacon state initialized! Ready to serve requests...") + + return nil +} + +func (n *node) getSpec(ctx context.Context) (*state.Spec, error) { + provider, isProvider := n.client.(eth2client.SpecProvider) + if !isProvider { + return nil, errors.New("client does not implement eth2client.SpecProvider") + } + + data, err := provider.Spec(ctx) + if err != nil { + return nil, err + } + + sp := state.NewSpec(data) + + if err := n.publishSpecUpdated(ctx, &sp); err != nil { + return nil, err + } + + return &sp, nil +} + +func (n *node) getProserDuties(ctx context.Context, epoch phase0.Epoch) ([]*v1.ProposerDuty, error) { + n.log.WithField("epoch", epoch).Debug("Fetching proposer duties") + + provider, isProvider := n.client.(eth2client.ProposerDutiesProvider) + if !isProvider { + return nil, errors.New("client does not implement eth2client.ProposerDutiesProvider") + } + + duties, err := provider.ProposerDuties(ctx, epoch, nil) + if err != nil { + return nil, err + } + + return duties, nil +} + +func (n *node) getBlock(ctx context.Context, blockID string) (*spec.VersionedSignedBeaconBlock, error) { + provider, isProvider := n.client.(eth2client.SignedBeaconBlockProvider) + if !isProvider { + return nil, errors.New("client does not implement eth2client.SignedBeaconBlockProvider") + } + + signedBeaconBlock, err := provider.SignedBeaconBlock(ctx, blockID) + if err != nil { + return nil, err + } + + return signedBeaconBlock, nil +} diff --git a/pkg/exporter/consensus/beacon/event.go b/pkg/exporter/consensus/beacon/event.go new file mode 100644 index 0000000..5f50586 --- /dev/null +++ b/pkg/exporter/consensus/beacon/event.go @@ -0,0 +1,72 @@ +package beacon + +import ( + v1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api/types" + "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon/state" +) + +const ( + // Custom events derived from our pseudo beacon node + topicEpochChanged = "epoch_changed" + topicSlotChanged = "slot_changed" + topicEpochSlotChanged = "epoch_slot_changed" + topicBlockInserted = "block_inserted" + topicReady = "ready" + topicSyncStatus = "sync_status" + topicNodeVersionUpdated = "node_version_updated" + topicPeersUpdated = "peers_updated" + topicSpecUpdated = "spec_updated" + topicEmptySlot = "slot_empty" + + // Official beacon events that are proxied + topicAttestation = "attestation" + topicBlock = "block" + topicChainReorg = "chain_reorg" + topicFinalizedCheckpoint = "finalized_checkpoint" + topicHead = "head" + topicVoluntaryExit = "voluntary_exit" + topicContributionAndProof = "contribution_and_proof" + topicEvent = "raw_event" +) + +type EpochChangedEvent struct { + Epoch phase0.Epoch +} + +type SlotChangedEvent struct { + Slot phase0.Slot +} + +type EpochSlotChangedEvent struct { + Epoch phase0.Epoch + Slot phase0.Slot +} + +type BlockInsertedEvent struct { + Slot phase0.Slot +} + +type ReadyEvent struct { +} + +type SyncStatusEvent struct { + State *v1.SyncState +} + +type NodeVersionUpdatedEvent struct { + Version string +} + +type PeersUpdatedEvent struct { + Peers types.Peers +} + +type SpecUpdatedEvent struct { + Spec *state.Spec +} + +type EmptySlotEvent struct { + Slot phase0.Slot +} diff --git a/pkg/exporter/consensus/beacon/genesis.go b/pkg/exporter/consensus/beacon/genesis.go new file mode 100644 index 0000000..c38fe75 --- /dev/null +++ b/pkg/exporter/consensus/beacon/genesis.go @@ -0,0 +1,25 @@ +package beacon + +import ( + "context" + "errors" + + eth2client "github.com/attestantio/go-eth2-client" + v1 "github.com/attestantio/go-eth2-client/api/v1" +) + +func (n *node) fetchGenesis(ctx context.Context) (*v1.Genesis, error) { + provider, isProvider := n.client.(eth2client.GenesisProvider) + if !isProvider { + return nil, errors.New("client does not implement eth2client.GenesisProvider") + } + + genesis, err := provider.Genesis(ctx) + if err != nil { + return nil, err + } + + n.genesis = genesis + + return genesis, nil +} diff --git a/pkg/exporter/consensus/beacon/publisher.go b/pkg/exporter/consensus/beacon/publisher.go new file mode 100644 index 0000000..27fb298 --- /dev/null +++ b/pkg/exporter/consensus/beacon/publisher.go @@ -0,0 +1,99 @@ +package beacon + +import ( + "context" + + v1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api/types" + "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon/state" +) + +// Official beacon events that are proxied +func (n *node) publishBlock(ctx context.Context, event *v1.BlockEvent) error { + return n.broker.Publish(topicBlock, event) +} + +func (n *node) publishAttestation(ctx context.Context, event *phase0.Attestation) error { + return n.broker.Publish(topicAttestation, event) +} + +func (n *node) publishChainReOrg(ctx context.Context, event *v1.ChainReorgEvent) error { + return n.broker.Publish(topicChainReorg, event) +} + +func (n *node) publishFinalizedCheckpoint(ctx context.Context, event *v1.FinalizedCheckpointEvent) error { + return n.broker.Publish(topicFinalizedCheckpoint, event) +} + +func (n *node) publishHead(ctx context.Context, event *v1.HeadEvent) error { + return n.broker.Publish(topicHead, event) +} + +func (n *node) publishVoluntaryExit(ctx context.Context, event *phase0.VoluntaryExit) error { + return n.broker.Publish(topicVoluntaryExit, event) +} + +func (n *node) publishEvent(ctx context.Context, event *v1.Event) error { + return n.broker.Publish(topicEvent, event) +} + +// Custom Events derived from our pseudo beacon node +func (n *node) publishReady(ctx context.Context) error { + return n.broker.Publish(topicReady, nil) +} + +func (n *node) publishEpochChanged(ctx context.Context, epoch phase0.Epoch) error { + return n.broker.Publish(topicEpochChanged, &EpochChangedEvent{ + Epoch: epoch, + }) +} + +func (n *node) publishSlotChanged(ctx context.Context, slot phase0.Slot) error { + return n.broker.Publish(topicSlotChanged, &SlotChangedEvent{ + Slot: slot, + }) +} + +func (n *node) publishEpochSlotChanged(ctx context.Context, epoch phase0.Epoch, slot phase0.Slot) error { + return n.broker.Publish(topicEpochSlotChanged, &EpochSlotChangedEvent{ + Epoch: epoch, + Slot: slot, + }) +} + +func (n *node) publishBlockInserted(ctx context.Context, slot phase0.Slot) error { + return n.broker.Publish(topicBlockInserted, &BlockInsertedEvent{ + Slot: slot, + }) +} + +func (n *node) publishSyncStatus(ctx context.Context, st *v1.SyncState) error { + return n.broker.Publish(topicSyncStatus, &SyncStatusEvent{ + State: st, + }) +} + +func (n *node) publishNodeVersionUpdated(ctx context.Context, version string) error { + return n.broker.Publish(topicNodeVersionUpdated, &NodeVersionUpdatedEvent{ + Version: version, + }) +} + +func (n *node) publishPeersUpdated(ctx context.Context, peers types.Peers) error { + return n.broker.Publish(topicPeersUpdated, &PeersUpdatedEvent{ + Peers: peers, + }) +} + +func (n *node) publishSpecUpdated(ctx context.Context, spec *state.Spec) error { + return n.broker.Publish(topicSpecUpdated, &SpecUpdatedEvent{ + Spec: spec, + }) +} + +func (n *node) publishEmptySlot(ctx context.Context, slot phase0.Slot) error { + return n.broker.Publish(topicEmptySlot, &EmptySlotEvent{ + Slot: slot, + }) +} diff --git a/pkg/exporter/consensus/beacon/state/block.go b/pkg/exporter/consensus/beacon/state/block.go new file mode 100644 index 0000000..2462bae --- /dev/null +++ b/pkg/exporter/consensus/beacon/state/block.go @@ -0,0 +1,13 @@ +package state + +import ( + "time" + + "github.com/attestantio/go-eth2-client/spec" +) + +// TimedBlock is a block with a timestamp. +type TimedBlock struct { + Block *spec.VersionedSignedBeaconBlock + SeenAt time.Time +} diff --git a/pkg/exporter/consensus/beacon/state/epoch.go b/pkg/exporter/consensus/beacon/state/epoch.go new file mode 100644 index 0000000..cc48a9f --- /dev/null +++ b/pkg/exporter/consensus/beacon/state/epoch.go @@ -0,0 +1,114 @@ +package state + +import ( + "errors" + "time" + + v1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec" + "github.com/attestantio/go-eth2-client/spec/phase0" +) + +type Epoch struct { + slots Slots + Number phase0.Epoch + FirstSlot phase0.Slot + LastSlot phase0.Slot + StartTime time.Time + EndTime time.Time + Duration time.Duration + bundle BlockTimeCalculatorBundle + + haveProposerDuties bool +} + +func NewEpoch(epochNumber phase0.Epoch, slotsPerEpoch phase0.Slot, bundle BlockTimeCalculatorBundle) Epoch { + firstSlot := uint64(epochNumber) * uint64(slotsPerEpoch) + lastSlot := (firstSlot + uint64(slotsPerEpoch)) - 1 + + e := Epoch{ + slots: NewSlots(bundle), + + Number: epochNumber, + FirstSlot: phase0.Slot(firstSlot), + LastSlot: phase0.Slot(lastSlot), + StartTime: bundle.Genesis.GenesisTime.Add(time.Duration(firstSlot) * bundle.SecondsPerSlot), + EndTime: bundle.Genesis.GenesisTime.Add((time.Duration(lastSlot) * bundle.SecondsPerSlot)).Add(bundle.SecondsPerSlot), + Duration: bundle.SecondsPerSlot * time.Duration(slotsPerEpoch), + bundle: bundle, + + haveProposerDuties: false, + } + + return e +} + +func (e *Epoch) AddBlock(block *spec.VersionedSignedBeaconBlock, seenAt time.Time) error { + if block == nil { + return errors.New("block is nil") + } + + slotNumber, err := block.Slot() + if err != nil { + return err + } + + slot, err := e.slots.Get(slotNumber) + if err != nil { + return err + } + + return slot.AddBlock(&TimedBlock{ + Block: block, + SeenAt: seenAt, + }) +} + +func (e *Epoch) GetSlotProposer(slotNumber phase0.Slot) (*v1.ProposerDuty, error) { + slot, err := e.slots.Get(slotNumber) + if err != nil { + return nil, err + } + + return slot.ProposerDuty() +} + +func (e *Epoch) SetProposerDuties(duties []*v1.ProposerDuty) error { + for _, duty := range duties { + slot, err := e.slots.Get(duty.Slot) + if err != nil { + return err + } + + if err := slot.SetProposerDuty(duty); err != nil { + return err + } + } + + e.haveProposerDuties = true + + return nil +} + +func (e *Epoch) HaveProposerDuties() bool { + return e.haveProposerDuties +} + +func (e *Epoch) GetSlot(slotNumber phase0.Slot) (*Slot, error) { + return e.slots.Get(slotNumber) +} + +func (e *Epoch) InitializeSlots() error { + start := uint64(e.FirstSlot) + end := uint64(e.LastSlot) + + for i := start; i <= end; i++ { + slot := NewSlot(phase0.Slot(i), e.bundle) + + if err := e.slots.Add(&slot); err != nil { + return err + } + } + + return nil +} diff --git a/pkg/exporter/consensus/beacon/state/epochs.go b/pkg/exporter/consensus/beacon/state/epochs.go new file mode 100644 index 0000000..e7b4ba6 --- /dev/null +++ b/pkg/exporter/consensus/beacon/state/epochs.go @@ -0,0 +1,86 @@ +package state + +import ( + "errors" + "sync" + + v1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec/phase0" +) + +type Epochs struct { + spec *Spec + state map[phase0.Epoch]*Epoch + genesis *v1.Genesis + bundle BlockTimeCalculatorBundle + mu *sync.Mutex +} + +func NewEpochs(spec *Spec, genesis *v1.Genesis) Epochs { + return Epochs{ + spec: spec, + state: make(map[phase0.Epoch]*Epoch), + genesis: genesis, + bundle: BlockTimeCalculatorBundle{ + Genesis: genesis, + SecondsPerSlot: spec.SecondsPerSlot, + }, + mu: &sync.Mutex{}, + } +} + +func (e *Epochs) GetEpoch(epoch phase0.Epoch) (*Epoch, error) { + e.mu.Lock() + defer e.mu.Unlock() + + if e.state[epoch] == nil { + return nil, errors.New("epoch not found") + } + + return e.state[epoch], nil +} + +func (e *Epochs) Exists(number phase0.Epoch) bool { + e.mu.Lock() + defer e.mu.Unlock() + + _, ok := e.state[number] + + return ok +} + +func (e *Epochs) NewInitializedEpoch(number phase0.Epoch) (*Epoch, error) { + epoch := NewEpoch(number, e.spec.SlotsPerEpoch, e.bundle) + + if err := epoch.InitializeSlots(); err != nil { + return nil, err + } + + if err := e.AddEpoch(number, &epoch); err != nil { + return nil, err + } + + return &epoch, nil +} + +func (e *Epochs) AddEpoch(number phase0.Epoch, epoch *Epoch) error { + e.mu.Lock() + defer e.mu.Unlock() + + if epoch == nil { + return errors.New("epoch is nil") + } + + e.state[number] = epoch + + return nil +} + +func (e *Epochs) RemoveEpoch(number phase0.Epoch) error { + e.mu.Lock() + defer e.mu.Unlock() + + delete(e.state, number) + + return nil +} diff --git a/pkg/exporter/consensus/beacon/state/fork_epoch.go b/pkg/exporter/consensus/beacon/state/fork_epoch.go new file mode 100644 index 0000000..7220715 --- /dev/null +++ b/pkg/exporter/consensus/beacon/state/fork_epoch.go @@ -0,0 +1,66 @@ +package state + +import ( + "errors" + + "github.com/attestantio/go-eth2-client/spec/phase0" +) + +// ForkEpoch is a beacon fork that activates at a specific epoch. +type ForkEpoch struct { + Epoch phase0.Epoch + Name string +} + +// Active returns true if the fork is active at the given slot. +func (f *ForkEpoch) Active(slot, slotsPerEpoch phase0.Slot) bool { + return phase0.Epoch(int(slot)/int(slotsPerEpoch)) > f.Epoch +} + +// ForkEpochs is a list of forks that activate at specific epochs. +type ForkEpochs []ForkEpoch + +// Active returns a list of forks that are active at the given slot. +func (f *ForkEpochs) Active(slot, slotsPerEpoch phase0.Slot) []ForkEpoch { + activated := []ForkEpoch{} + + for _, fork := range *f { + if fork.Active(slot, slotsPerEpoch) { + activated = append(activated, fork) + } + } + + return activated +} + +// CurrentFork returns the current fork at the given slot. +func (f *ForkEpochs) Scheduled(slot, slotsPerEpoch phase0.Slot) []ForkEpoch { + scheduled := []ForkEpoch{} + + for _, fork := range *f { + if !fork.Active(slot, slotsPerEpoch) { + scheduled = append(scheduled, fork) + } + } + + return scheduled +} + +// CurrentFork returns the current fork at the given slot. +func (f *ForkEpochs) CurrentFork(slot, slotsPerEpoch phase0.Slot) (ForkEpoch, error) { + largest := ForkEpoch{ + Epoch: 0, + } + + for _, fork := range f.Active(slot, slotsPerEpoch) { + if fork.Active(slot, slotsPerEpoch) && fork.Epoch > largest.Epoch { + largest = fork + } + } + + if largest.Epoch == 0 { + return ForkEpoch{}, errors.New("no active fork") + } + + return largest, nil +} diff --git a/pkg/exporter/consensus/beacon/state/publisher.go b/pkg/exporter/consensus/beacon/state/publisher.go new file mode 100644 index 0000000..ec65aa5 --- /dev/null +++ b/pkg/exporter/consensus/beacon/state/publisher.go @@ -0,0 +1,43 @@ +package state + +import ( + "context" + + "github.com/attestantio/go-eth2-client/spec/phase0" +) + +func (c *Container) handleCallbackError(err error, topic string) { + if err != nil { + c.log.WithError(err).WithField("topic", topic).Error("Receieved error from subscriber callback") + } +} + +func (c *Container) publishEpochChanged(ctx context.Context, epoch phase0.Epoch) { + for _, cb := range c.callbacksEpochChanged { + c.handleCallbackError(cb(ctx, epoch), "epochs_changed") + } +} + +func (c *Container) publishSlotChanged(ctx context.Context, slot phase0.Slot) { + for _, cb := range c.callbacksSlotChanged { + c.handleCallbackError(cb(ctx, slot), "slots_changed") + } +} + +func (c *Container) publishEpochSlotChanged(ctx context.Context, epoch phase0.Epoch, slot phase0.Slot) { + for _, cb := range c.callbacksEpochSlotChanged { + c.handleCallbackError(cb(ctx, epoch, slot), "epoch_slots_changed") + } +} + +func (c *Container) publishBlockInserted(ctx context.Context, epoch phase0.Epoch, slot Slot) { + for _, cb := range c.callbacksBlockInserted { + c.handleCallbackError(cb(ctx, epoch, slot), "block_inserted") + } +} + +func (c *Container) publishEmptySlot(ctx context.Context, epoch phase0.Epoch, slot Slot) { + for _, cb := range c.callbacksEmptySlot { + c.handleCallbackError(cb(ctx, epoch, slot), "empty_slot") + } +} diff --git a/pkg/exporter/consensus/beacon/state/slot.go b/pkg/exporter/consensus/beacon/state/slot.go new file mode 100644 index 0000000..7622b6a --- /dev/null +++ b/pkg/exporter/consensus/beacon/state/slot.go @@ -0,0 +1,111 @@ +package state + +import ( + "errors" + "sync" + "time" + + v1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec/phase0" +) + +// Slot is a slot in the beacon chain. +type Slot struct { + block *TimedBlock + proposerDuty *v1.ProposerDuty + number phase0.Slot + bundle BlockTimeCalculatorBundle + mu *sync.Mutex +} + +// NewSlot returns a new slot. +func NewSlot(number phase0.Slot, bundle BlockTimeCalculatorBundle) Slot { + return Slot{ + block: nil, + proposerDuty: nil, + number: number, + mu: &sync.Mutex{}, + bundle: bundle, + } +} + +// Number returns the slot number. +func (m *Slot) Number() phase0.Slot { + return m.number +} + +// Block returns the block for the slot (if it exists). +func (m *Slot) Block() (*TimedBlock, error) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.block == nil { + return nil, errors.New("block does not exist") + } + + return m.block, nil +} + +// AddBlock adds a block to the slot. +func (m *Slot) AddBlock(timedBlock *TimedBlock) error { + m.mu.Lock() + defer m.mu.Unlock() + + if timedBlock == nil { + return errors.New("timed_block is nil") + } + + if timedBlock.Block == nil { + return errors.New("block is nil") + } + + slot, err := timedBlock.Block.Slot() + if err != nil { + return err + } + + if slot != m.number { + return errors.New("block slot does not match slot") + } + + m.block = timedBlock + + return nil +} + +// ProposerDelay calculates the amount of time it took for the proposer to publish the block. +func (m *Slot) ProposerDelay() (time.Duration, error) { + if m.block == nil { + return 0, errors.New("block does not exist") + } + + // Calculate the proposer delay for the block. + expected := m.bundle.Genesis.GenesisTime.Add(time.Duration(uint64(m.number)) * m.bundle.SecondsPerSlot) + delay := m.block.SeenAt.Sub(expected) + + return delay, nil +} + +// ProposerDuty returns the proposer duty for the slot (if it exists). +func (m *Slot) ProposerDuty() (*v1.ProposerDuty, error) { + if m.proposerDuty == nil { + return nil, errors.New("proposer duty does not exist") + } + + return m.proposerDuty, nil +} + +// SetProposerDuty sets the proposer duty for the slot. +func (m *Slot) SetProposerDuty(proposerDuty *v1.ProposerDuty) error { + if proposerDuty.Slot != m.number { + return errors.New("proposer duty slot does not match slot") + } + + m.proposerDuty = proposerDuty + + return nil +} + +func (m *Slot) MissingBlock() bool { + return m.block == nil +} diff --git a/pkg/exporter/consensus/beacon/state/slots.go b/pkg/exporter/consensus/beacon/state/slots.go new file mode 100644 index 0000000..6eb0a90 --- /dev/null +++ b/pkg/exporter/consensus/beacon/state/slots.go @@ -0,0 +1,60 @@ +package state + +import ( + "errors" + "sync" + + "github.com/attestantio/go-eth2-client/spec/phase0" +) + +// Slots is a collection of slots. +type Slots struct { + state map[phase0.Slot]*Slot + bundle BlockTimeCalculatorBundle + mu *sync.Mutex +} + +// NewSlots returns a new slots instance. +func NewSlots(bundle BlockTimeCalculatorBundle) Slots { + return Slots{ + state: make(map[phase0.Slot]*Slot), + mu: &sync.Mutex{}, + bundle: bundle, + } +} + +// Add adds a slot to the collection. +func (m *Slots) Add(slot *Slot) error { + m.mu.Lock() + defer m.mu.Unlock() + + if slot == nil { + return errors.New("slot is nil") + } + + m.state[slot.Number()] = slot + + return nil +} + +// Get returns a slot from the collection. +func (m *Slots) Get(slot phase0.Slot) (*Slot, error) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.state[slot] == nil { + return nil, errors.New("slot does not exist") + } + + return m.state[slot], nil +} + +// Delete deletes a slot from the collection. +func (m *Slots) Delete(slot phase0.Slot) error { + m.mu.Lock() + defer m.mu.Unlock() + + delete(m.state, slot) + + return nil +} diff --git a/pkg/exporter/consensus/beacon/state/spec.go b/pkg/exporter/consensus/beacon/state/spec.go new file mode 100644 index 0000000..4cfbc0a --- /dev/null +++ b/pkg/exporter/consensus/beacon/state/spec.go @@ -0,0 +1,166 @@ +package state + +import ( + "fmt" + "math/big" + "strings" + "time" + + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/spf13/cast" +) + +// Spec represents the state of the spec. +type Spec struct { + PresetBase string + ConfigName string + + DepositChainID uint64 + + SafeSlotsToUpdateJustified phase0.Slot + SlotsPerEpoch phase0.Slot + + EpochsPerSyncCommitteePeriod phase0.Epoch + MinSyncCommitteeParticipants uint64 + TargetCommitteeSize uint64 + SyncCommitteeSize uint64 + + TerminalBlockHashActivationEpoch phase0.Epoch + TerminalTotalDifficulty big.Int + + MaxValidatorsPerCommittee uint64 + BaseRewardFactor uint64 + EffectiveBalanceIncrement phase0.Gwei + MaxEffectiveBalance phase0.Gwei + MinDepositAmount phase0.Gwei + MaxAttestations uint64 + + SecondsPerEth1Block time.Duration + GenesisDelay time.Duration + SecondsPerSlot time.Duration + MaxDeposits uint64 + MinGenesisActiveValidatorCount uint64 + Eth1FollowDistance uint64 + + ForkEpochs ForkEpochs +} + +// NewSpec creates a new spec instance. +func NewSpec(data map[string]interface{}) Spec { + spec := Spec{ + ForkEpochs: ForkEpochs{}, + } + + if safeSlotsToUpdateJustified, exists := data["SAFE_SLOTS_TO_UPDATE_JUSTIFIED"]; exists { + spec.SafeSlotsToUpdateJustified = phase0.Slot(cast.ToUint64(safeSlotsToUpdateJustified)) + } + + if depositChainID, exists := data["DEPOSIT_CHAIN_ID"]; exists { + spec.DepositChainID = cast.ToUint64(depositChainID) + } + + if configName, exists := data["CONFIG_NAME"]; exists { + spec.ConfigName = cast.ToString(configName) + } + + if maxValidatorsPerCommittee, exists := data["MAX_VALIDATORS_PER_COMMITTEE"]; exists { + spec.MaxValidatorsPerCommittee = cast.ToUint64(maxValidatorsPerCommittee) + } + + if secondsPerEth1Block, exists := data["SECONDS_PER_ETH1_BLOCK"]; exists { + spec.SecondsPerEth1Block = cast.ToDuration(secondsPerEth1Block) + } + + if baseRewardFactor, exists := data["BASE_REWARD_FACTOR"]; exists { + spec.BaseRewardFactor = cast.ToUint64(baseRewardFactor) + } + + if epochsPerSyncComitteePeriod, exists := data["EPOCHS_PER_SYNC_COMMITTEE_PERIOD"]; exists { + spec.EpochsPerSyncCommitteePeriod = phase0.Epoch(cast.ToUint64(epochsPerSyncComitteePeriod)) + } + + if effectiveBalanceIncrement, exists := data["EFFECTIVE_BALANCE_INCREMENT"]; exists { + spec.EffectiveBalanceIncrement = phase0.Gwei(cast.ToUint64(effectiveBalanceIncrement)) + } + + if maxAttestations, exists := data["MAX_ATTESTATIONS"]; exists { + spec.MaxAttestations = cast.ToUint64(maxAttestations) + } + + if minSyncCommitteeParticipants, exists := data["MIN_SYNC_COMMITTEE_PARTICIPANTS"]; exists { + spec.MinSyncCommitteeParticipants = cast.ToUint64(minSyncCommitteeParticipants) + } + + if genesisDelay, exists := data["GENESIS_DELAY"]; exists { + spec.GenesisDelay = cast.ToDuration(genesisDelay) + } + + if secondsPerSlot, exists := data["SECONDS_PER_SLOT"]; exists { + spec.SecondsPerSlot = cast.ToDuration(secondsPerSlot) + } + + if maxEffectiveBalance, exists := data["MAX_EFFECTIVE_BALANCE"]; exists { + spec.MaxEffectiveBalance = phase0.Gwei(cast.ToUint64(maxEffectiveBalance)) + } + + if terminalTotalDifficulty, exists := data["TERMINAL_TOTAL_DIFFICULTY"]; exists { + ttd := cast.ToString(fmt.Sprintf("%v", terminalTotalDifficulty)) + + casted, _ := (*big.NewInt(0)).SetString(ttd, 10) + spec.TerminalTotalDifficulty = *casted + } + + if maxDeposits, exists := data["MAX_DEPOSITS"]; exists { + spec.MaxDeposits = cast.ToUint64(maxDeposits) + } + + if minGenesisActiveValidatorCount, exists := data["MIN_GENESIS_ACTIVE_VALIDATOR_COUNT"]; exists { + spec.MinGenesisActiveValidatorCount = cast.ToUint64(minGenesisActiveValidatorCount) + } + + if targetCommitteeSize, exists := data["TARGET_COMMITTEE_SIZE"]; exists { + spec.TargetCommitteeSize = cast.ToUint64(targetCommitteeSize) + } + + if syncCommitteeSize, exists := data["SYNC_COMMITTEE_SIZE"]; exists { + spec.SyncCommitteeSize = cast.ToUint64(syncCommitteeSize) + } + + if eth1FollowDistance, exists := data["ETH1_FOLLOW_DISTANCE"]; exists { + spec.Eth1FollowDistance = cast.ToUint64(eth1FollowDistance) + } + + if terminalBlockHashActivationEpoch, exists := data["TERMINAL_BLOCK_HASH_ACTIVATION_EPOCH"]; exists { + spec.TerminalBlockHashActivationEpoch = phase0.Epoch(cast.ToUint64(terminalBlockHashActivationEpoch)) + } + + if minDepositAmount, exists := data["MIN_DEPOSIT_AMOUNT"]; exists { + spec.MinDepositAmount = phase0.Gwei(cast.ToUint64(minDepositAmount)) + } + + if slotsPerEpoch, exists := data["SLOTS_PER_EPOCH"]; exists { + spec.SlotsPerEpoch = phase0.Slot(cast.ToUint64(slotsPerEpoch)) + } + + if presetBase, exists := data["PRESET_BASE"]; exists { + spec.PresetBase = cast.ToString(presetBase) + } + + for k, v := range data { + if strings.Contains(k, "_FORK_EPOCH") { + forkName := strings.ReplaceAll(k, "_FORK_EPOCH", "") + + spec.ForkEpochs = append(spec.ForkEpochs, ForkEpoch{ + Epoch: phase0.Epoch(cast.ToUint64(v)), + Name: forkName, + }) + } + } + + return spec +} + +// Validate performs basic validation of the spec. +func (s *Spec) Validate() error { + return nil +} diff --git a/pkg/exporter/consensus/beacon/state/state.go b/pkg/exporter/consensus/beacon/state/state.go new file mode 100644 index 0000000..6588144 --- /dev/null +++ b/pkg/exporter/consensus/beacon/state/state.go @@ -0,0 +1,335 @@ +package state + +import ( + "context" + "errors" + "fmt" + "time" + + v1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec" + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/sirupsen/logrus" +) + +// Container is the state container. +type Container struct { + log logrus.FieldLogger + spec *Spec + genesis *v1.Genesis + epochs Epochs + + currentEpoch phase0.Epoch + currentSlot phase0.Slot + + startedAt time.Time + + callbacksEpochChanged []func(ctx context.Context, epoch phase0.Epoch) error + callbacksSlotChanged []func(ctx context.Context, slot phase0.Slot) error + callbacksEpochSlotChanged []func(ctx context.Context, epoch phase0.Epoch, slot phase0.Slot) error + callbacksBlockInserted []func(ctx context.Context, epoch phase0.Epoch, slot Slot) error + callbacksEmptySlot []func(ctx context.Context, epoch phase0.Epoch, slot Slot) error +} + +const ( + // SurroundingEpochDistance is the number of epochs to create around the current epoch. + SurroundingEpochDistance = 1 +) + +// NewContainer creates a new state container instance +func NewContainer(ctx context.Context, log logrus.FieldLogger, sp *Spec, genesis *v1.Genesis) Container { + return Container{ + log: log.WithField("sub_module", "state"), + spec: sp, + + genesis: genesis, + + currentEpoch: 0, + currentSlot: 0, + + startedAt: time.Now(), + + epochs: NewEpochs(sp, genesis), + } +} + +var ( + ErrSpecNotInitialized = errors.New("spec not initialized") + ErrGenesisNotFetched = errors.New("genesis not fetched") +) + +// Init initializes the state container. +func (c *Container) Init(ctx context.Context) error { + if err := c.hydrateEpochs(ctx); err != nil { + return err + } + + go c.ticker(ctx) + + //nolint:errcheck // dont care about an error here. + go c.currentSlotLoop(ctx) + + return nil +} + +// Spec returns the spec for the state container. +func (c *Container) Spec() *Spec { + return c.spec +} + +func (c *Container) ticker(ctx context.Context) { + c.tick(ctx) + + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Second * 5): + c.tick(ctx) + } + } +} + +func (c *Container) currentSlotLoop(ctx context.Context) error { + for { + currentSlot := c.currentSlot + + nextSlotStartsAt := c.genesis.GenesisTime.Add(c.spec.SecondsPerSlot * time.Duration(currentSlot+1)) + + select { + case <-ctx.Done(): + return nil + case <-time.After(time.Until(nextSlotStartsAt)): + if err := c.checkForNewCurrentEpochAndSlot(ctx); err != nil { + return err + } + } + } +} + +func (c *Container) tick(ctx context.Context) { + if err := c.hydrateEpochs(ctx); err != nil { + c.log.WithError(err).Error("Failed to hydrate epochs") + } +} + +// AddBeaconBlock adds a beacon block to the state container. +func (c *Container) AddBeaconBlock(ctx context.Context, beaconBlock *spec.VersionedSignedBeaconBlock, seenAt time.Time) error { + if beaconBlock == nil { + return errors.New("beacon block is nil") + } + + // Calculate the epoch + slotNumber, err := beaconBlock.Slot() + if err != nil { + return err + } + + epochNumber := c.calculateEpochFromSlot(slotNumber) + + if exists := c.epochs.Exists(epochNumber); !exists { + return fmt.Errorf("epoch %d does not exist", epochNumber) + } + + // Get the epoch + epoch, err := c.epochs.GetEpoch(epochNumber) + if err != nil { + return err + } + + // Insert the block + //nolint:gocritic // false positive + if err = epoch.AddBlock(beaconBlock, seenAt); err != nil { + return err + } + + slot, err := epoch.GetSlot(slotNumber) + if err != nil { + return err + } + + delay, err := slot.ProposerDelay() + if err != nil { + return err + } + + proposer := "unknown" + + proposerDuty, err := slot.ProposerDuty() + if err == nil { + proposer = fmt.Sprintf("%v", proposerDuty.ValidatorIndex) + } else { + c.log.WithError(err).WithField("slot", slot).Warn("Failed to get slot proposer") + } + + c.log.WithFields(logrus.Fields{ + "epoch": epochNumber, + "slot": slotNumber, + "proposer_delay": delay.String(), + "proposer_index": proposer, + }).Debug("Inserted beacon block") + + c.publishBlockInserted(ctx, epochNumber, *slot) + + return nil +} + +func (c *Container) hydrateEpochs(ctx context.Context) error { + epoch := c.currentEpoch + + // Ensure the state has +-SurroundingEpochDistance epochs created. + for i := epoch - SurroundingEpochDistance; i <= epoch+SurroundingEpochDistance; i++ { + if _, err := c.epochs.GetEpoch(i); err != nil { + if _, err := c.createEpoch(ctx, i); err != nil { + return err + } + } + } + + return nil +} + +func (c *Container) getCurrentEpochAndSlot() (phase0.Epoch, phase0.Slot, error) { + if c.spec == nil { + return 0, 0, ErrSpecNotInitialized + } + + if c.genesis == nil { + return 0, 0, ErrGenesisNotFetched + } + + if err := c.spec.Validate(); err != nil { + return 0, 0, err + } + + // Calculate the current epoch based on genesis time. + genesis := c.genesis.GenesisTime + + currentSlot := phase0.Slot(time.Since(genesis).Seconds() / c.spec.SecondsPerSlot.Seconds()) + currentEpoch := phase0.Epoch(currentSlot / c.spec.SlotsPerEpoch) + + return currentEpoch, currentSlot, nil +} + +func (c *Container) SetProposerDuties(ctx context.Context, epochNumber phase0.Epoch, duties []*v1.ProposerDuty) error { + epoch, err := c.epochs.GetEpoch(epochNumber) + if err != nil { + return err + } + + if err := epoch.SetProposerDuties(duties); err != nil { + return err + } + + return nil +} + +func (c *Container) createEpoch(ctx context.Context, epochNumber phase0.Epoch) (*Epoch, error) { + if _, err := c.epochs.GetEpoch(epochNumber); err == nil { + return nil, fmt.Errorf("epoch %d already exists", epochNumber) + } + + epoch, err := c.epochs.NewInitializedEpoch(epochNumber) + if err != nil { + return nil, err + } + + c.log.WithField("epoch", epochNumber).Debug("Created new epoch") + + return epoch, nil +} + +func (c *Container) checkForNewCurrentEpochAndSlot(ctx context.Context) error { + epoch, slot, err := c.getCurrentEpochAndSlot() + if err != nil { + return err + } + + epochChanged := false + + if epoch != c.currentEpoch { + c.currentEpoch = epoch + + epochChanged = true + + if err := c.hydrateEpochs(ctx); err != nil { + return err + } + + // Notify the listeners of the new epoch. + go c.publishEpochChanged(ctx, epoch) + + // // Delete old epochs + previousEpoch := epoch - 5 + if err := c.DeleteEpoch(ctx, previousEpoch); err != nil { + return err + } + } + + slotChanged := false + + if slot != c.currentSlot { + previousSlot := c.currentSlot + + c.currentSlot = slot + + slotChanged = true + + // Notify the listeners of the new slot. + go c.publishSlotChanged(ctx, slot) + + // We can't safely check if the previous slot was missed if + // we potentially started up _after_ the slot had started. + // So we'll just not bother checking in that case. + if time.Since(c.startedAt) > (c.spec.SecondsPerSlot * 3) { + if err := c.checkForEmptySlot(ctx, previousSlot); err != nil { + c.log.WithError(err).Error("Failed to check for empty slot") + } + } + } + + if epochChanged || slotChanged { + // Notify the listeners of the new epoch and slot. + go c.publishEpochSlotChanged(ctx, epoch, slot) + } + + return nil +} + +func (c *Container) checkForEmptySlot(ctx context.Context, slotNumber phase0.Slot) error { + slot, err := c.GetSlot(ctx, slotNumber) + if err != nil { + return err + } + + epoch := c.calculateEpochFromSlot(slotNumber) + + if slot.MissingBlock() { + go c.publishEmptySlot(ctx, epoch, *slot) + } + + return nil +} + +// GetSlot returns the slot for the given slot number. +func (c *Container) GetSlot(ctx context.Context, slotNumber phase0.Slot) (*Slot, error) { + epoch, err := c.epochs.GetEpoch(c.calculateEpochFromSlot(slotNumber)) + if err != nil { + return nil, err + } + + return epoch.GetSlot(slotNumber) +} + +func (c *Container) calculateEpochFromSlot(slotNumber phase0.Slot) phase0.Epoch { + return phase0.Epoch(slotNumber / c.spec.SlotsPerEpoch) +} + +// GetEpoch returns the epoch for the given epoch number. +func (c *Container) GetEpoch(ctx context.Context, epochNumber phase0.Epoch) (*Epoch, error) { + return c.epochs.GetEpoch(epochNumber) +} + +func (c *Container) DeleteEpoch(ctx context.Context, epochNumber phase0.Epoch) error { + return c.epochs.RemoveEpoch(epochNumber) +} diff --git a/pkg/exporter/consensus/beacon/state/subscriber.go b/pkg/exporter/consensus/beacon/state/subscriber.go new file mode 100644 index 0000000..d06c630 --- /dev/null +++ b/pkg/exporter/consensus/beacon/state/subscriber.go @@ -0,0 +1,42 @@ +package state + +import ( + "context" + + "github.com/attestantio/go-eth2-client/spec/phase0" +) + +// OnEpochChanged is called when the current epoch changes. +func (c *Container) OnEpochChanged(ctx context.Context, cb func(ctx context.Context, epoch phase0.Epoch) error) error { + c.callbacksEpochChanged = append(c.callbacksEpochChanged, cb) + + return nil +} + +// OnSlotChanged is called when the current slot changes. +func (c *Container) OnSlotChanged(ctx context.Context, cb func(ctx context.Context, slot phase0.Slot) error) error { + c.callbacksSlotChanged = append(c.callbacksSlotChanged, cb) + + return nil +} + +// OnEpochSlotChanged is called when the current epoch or slot changes. +func (c *Container) OnEpochSlotChanged(ctx context.Context, cb func(ctx context.Context, epoch phase0.Epoch, slot phase0.Slot) error) error { + c.callbacksEpochSlotChanged = append(c.callbacksEpochSlotChanged, cb) + + return nil +} + +// OnBlockInserted is called when a block is inserted in to a slot. +func (c *Container) OnBlockInserted(ctx context.Context, cb func(ctx context.Context, epoch phase0.Epoch, slot Slot) error) error { + c.callbacksBlockInserted = append(c.callbacksBlockInserted, cb) + + return nil +} + +// OnEmptySlot is called when a slot expires without an associated block. +func (c *Container) OnEmptySlot(ctx context.Context, cb func(ctx context.Context, epoch phase0.Epoch, slot Slot) error) error { + c.callbacksEmptySlot = append(c.callbacksEmptySlot, cb) + + return nil +} diff --git a/pkg/exporter/consensus/beacon/state/types.go b/pkg/exporter/consensus/beacon/state/types.go new file mode 100644 index 0000000..0a9f4c2 --- /dev/null +++ b/pkg/exporter/consensus/beacon/state/types.go @@ -0,0 +1,13 @@ +package state + +import ( + "time" + + v1 "github.com/attestantio/go-eth2-client/api/v1" +) + +// BlockTimeCalculatorBundle is a bundle of data to help with calculating proposer delay. +type BlockTimeCalculatorBundle struct { + Genesis *v1.Genesis + SecondsPerSlot time.Duration +} diff --git a/pkg/exporter/consensus/beacon/subscriber.go b/pkg/exporter/consensus/beacon/subscriber.go new file mode 100644 index 0000000..cd97e7a --- /dev/null +++ b/pkg/exporter/consensus/beacon/subscriber.go @@ -0,0 +1,119 @@ +package beacon + +import ( + "context" + + v1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/nats-io/nats.go" +) + +func (n *node) handleSubscriberError(err error, topic string) { + if err != nil { + n.log.WithError(err).WithField("topic", topic).Error("Subscriber error") + } +} + +// Official Beacon events +func (n *node) OnBlock(ctx context.Context, handler func(ctx context.Context, event *v1.BlockEvent) error) (*nats.Subscription, error) { + return n.broker.Subscribe(topicBlock, func(event *v1.BlockEvent) { + n.handleSubscriberError(handler(ctx, event), topicBlock) + }) +} + +func (n *node) OnAttestation(ctx context.Context, handler func(ctx context.Context, event *phase0.Attestation) error) (*nats.Subscription, error) { + return n.broker.Subscribe(topicAttestation, func(event *phase0.Attestation) { + n.handleSubscriberError(handler(ctx, event), topicAttestation) + }) +} + +func (n *node) OnChainReOrg(ctx context.Context, handler func(ctx context.Context, event *v1.ChainReorgEvent) error) (*nats.Subscription, error) { + return n.broker.Subscribe(topicChainReorg, func(event *v1.ChainReorgEvent) { + n.handleSubscriberError(handler(ctx, event), topicChainReorg) + }) +} + +func (n *node) OnFinalizedCheckpoint(ctx context.Context, handler func(ctx context.Context, event *v1.FinalizedCheckpointEvent) error) (*nats.Subscription, error) { + return n.broker.Subscribe(topicFinalizedCheckpoint, func(event *v1.FinalizedCheckpointEvent) { + n.handleSubscriberError(handler(ctx, event), topicFinalizedCheckpoint) + }) +} + +func (n *node) OnHead(ctx context.Context, handler func(ctx context.Context, event *v1.HeadEvent) error) (*nats.Subscription, error) { + return n.broker.Subscribe(topicHead, func(event *v1.HeadEvent) { + n.handleSubscriberError(handler(ctx, event), topicHead) + }) +} + +func (n *node) OnVoluntaryExit(ctx context.Context, handler func(ctx context.Context, event *phase0.VoluntaryExit) error) (*nats.Subscription, error) { + return n.broker.Subscribe(topicVoluntaryExit, func(event *phase0.VoluntaryExit) { + n.handleSubscriberError(handler(ctx, event), topicVoluntaryExit) + }) +} + +func (n *node) OnEvent(ctx context.Context, handler func(ctx context.Context, event *v1.Event) error) (*nats.Subscription, error) { + return n.broker.Subscribe(topicEvent, func(event *v1.Event) { + n.handleSubscriberError(handler(ctx, event), topicEvent) + }) +} + +// Custom Events +func (n *node) OnEpochChanged(ctx context.Context, handler func(ctx context.Context, event *EpochChangedEvent) error) (*nats.Subscription, error) { + return n.broker.Subscribe(topicEpochChanged, func(event *EpochChangedEvent) { + n.handleSubscriberError(handler(ctx, event), topicEpochChanged) + }) +} + +func (n *node) OnSlotChanged(ctx context.Context, handler func(ctx context.Context, event *SlotChangedEvent) error) (*nats.Subscription, error) { + return n.broker.Subscribe(topicSlotChanged, func(event *SlotChangedEvent) { + n.handleSubscriberError(handler(ctx, event), topicSlotChanged) + }) +} + +func (n *node) OnEpochSlotChanged(ctx context.Context, handler func(ctx context.Context, event *EpochSlotChangedEvent) error) (*nats.Subscription, error) { + return n.broker.Subscribe(topicEpochSlotChanged, func(event *EpochSlotChangedEvent) { + n.handleSubscriberError(handler(ctx, event), topicEpochSlotChanged) + }) +} + +func (n *node) OnBlockInserted(ctx context.Context, handler func(ctx context.Context, event *BlockInsertedEvent) error) (*nats.Subscription, error) { + return n.broker.Subscribe(topicBlockInserted, func(event *BlockInsertedEvent) { + n.handleSubscriberError(handler(ctx, event), topicBlockInserted) + }) +} + +func (n *node) OnReady(ctx context.Context, handler func(ctx context.Context, event *ReadyEvent) error) (*nats.Subscription, error) { + return n.broker.Subscribe(topicReady, func(event *ReadyEvent) { + n.handleSubscriberError(handler(ctx, event), topicReady) + }) +} + +func (n *node) OnSyncStatus(ctx context.Context, handler func(ctx context.Context, event *SyncStatusEvent) error) (*nats.Subscription, error) { + return n.broker.Subscribe(topicSyncStatus, func(event *SyncStatusEvent) { + n.handleSubscriberError(handler(ctx, event), topicSyncStatus) + }) +} + +func (n *node) OnNodeVersionUpdated(ctx context.Context, handler func(ctx context.Context, event *NodeVersionUpdatedEvent) error) (*nats.Subscription, error) { + return n.broker.Subscribe(topicNodeVersionUpdated, func(event *NodeVersionUpdatedEvent) { + n.handleSubscriberError(handler(ctx, event), topicNodeVersionUpdated) + }) +} + +func (n *node) OnPeersUpdated(ctx context.Context, handler func(ctx context.Context, event *PeersUpdatedEvent) error) (*nats.Subscription, error) { + return n.broker.Subscribe(topicPeersUpdated, func(event *PeersUpdatedEvent) { + n.handleSubscriberError(handler(ctx, event), topicPeersUpdated) + }) +} + +func (n *node) OnSpecUpdated(ctx context.Context, handler func(ctx context.Context, event *SpecUpdatedEvent) error) (*nats.Subscription, error) { + return n.broker.Subscribe(topicSpecUpdated, func(event *SpecUpdatedEvent) { + n.handleSubscriberError(handler(ctx, event), topicSpecUpdated) + }) +} + +func (n *node) OnEmptySlot(ctx context.Context, handler func(ctx context.Context, event *EmptySlotEvent) error) (*nats.Subscription, error) { + return n.broker.Subscribe(topicEmptySlot, func(event *EmptySlotEvent) { + n.handleSubscriberError(handler(ctx, event), topicEmptySlot) + }) +} diff --git a/pkg/exporter/consensus/beacon/subscriptions.go b/pkg/exporter/consensus/beacon/subscriptions.go new file mode 100644 index 0000000..5116151 --- /dev/null +++ b/pkg/exporter/consensus/beacon/subscriptions.go @@ -0,0 +1,189 @@ +package beacon + +import ( + "context" + "errors" + "fmt" + "time" + + eth2client "github.com/attestantio/go-eth2-client" + v1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec/phase0" +) + +func (n *node) ensureBeaconSubscription(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Second * 5): + if n.client == nil { + continue + } + + // Only resubscribe if we haven't received an event after 5 minutes. + if time.Since(n.lastEventTime) < (5 * time.Minute) { + continue + } + + n.log. + WithField("last_event_time", n.lastEventTime.Local().String()). + Info("Haven't received any events for 5 minutes, re-subscribing") + + if time.Since(n.lastEventTime) > time.Minute*5 { + if err := n.subscribeToBeaconEvents(ctx); err != nil { + n.log.WithError(err).Error("Failed to subscribe to beacon") + } + + time.Sleep(time.Second * 60) + } + } + } +} + +func (n *node) subscribeToBeaconEvents(ctx context.Context) error { + provider, isProvider := n.client.(eth2client.EventsProvider) + if !isProvider { + return errors.New("client does not implement eth2client.Subscriptions") + } + + topics := []string{} + + for key, supported := range v1.SupportedEventTopics { + if key == "contribution_and_proof" { + continue + } + + if supported { + topics = append(topics, key) + } + } + + if err := provider.Events(ctx, topics, func(event *v1.Event) { + n.lastEventTime = time.Now() + + if err := n.handleEvent(ctx, event); err != nil { + n.log.Errorf("Failed to handle event: %v", err) + } + }); err != nil { + return err + } + + return nil +} + +func (n *node) handleEvent(ctx context.Context, event *v1.Event) error { + if err := n.publishEvent(ctx, event); err != nil { + n.log.WithError(err).Error("Failed to publish raw event") + } + + // If we are syncing, only forward on "head" and "block" events + if n.syncing.IsSyncing { + if event.Topic != topicBlock && event.Topic != topicHead { + return nil + } + } + + switch event.Topic { + case topicAttestation: + return n.handleAttestation(ctx, event) + case topicBlock: + return n.handleBlock(ctx, event) + case topicChainReorg: + return n.handleChainReorg(ctx, event) + case topicFinalizedCheckpoint: + return n.handleFinalizedCheckpoint(ctx, event) + case topicHead: + return n.handleHead(ctx, event) + case topicVoluntaryExit: + return n.handleVoluntaryExit(ctx, event) + case topicContributionAndProof: + return n.handleContributionAndProof(ctx, event) + + default: + return fmt.Errorf("unknown event topic %s", event.Topic) + } +} + +func (n *node) handleAttestation(ctx context.Context, event *v1.Event) error { + attestation, valid := event.Data.(*phase0.Attestation) + if !valid { + return errors.New("invalid attestation event") + } + + if err := n.publishAttestation(ctx, attestation); err != nil { + return err + } + + return nil +} + +func (n *node) handleBlock(ctx context.Context, event *v1.Event) error { + block, valid := event.Data.(*v1.BlockEvent) + if !valid { + return errors.New("invalid block event") + } + + if err := n.publishBlock(ctx, block); err != nil { + return err + } + + return nil +} + +func (n *node) handleChainReorg(ctx context.Context, event *v1.Event) error { + chainReorg, valid := event.Data.(*v1.ChainReorgEvent) + if !valid { + return errors.New("invalid chain reorg event") + } + + if err := n.publishChainReOrg(ctx, chainReorg); err != nil { + return err + } + + return nil +} + +func (n *node) handleFinalizedCheckpoint(ctx context.Context, event *v1.Event) error { + checkpoint, valid := event.Data.(*v1.FinalizedCheckpointEvent) + if !valid { + return errors.New("invalid checkpoint event") + } + + if err := n.publishFinalizedCheckpoint(ctx, checkpoint); err != nil { + return err + } + + return nil +} + +func (n *node) handleHead(ctx context.Context, event *v1.Event) error { + head, valid := event.Data.(*v1.HeadEvent) + if !valid { + return errors.New("invalid head event") + } + + if err := n.publishHead(ctx, head); err != nil { + return err + } + + return nil +} + +func (n *node) handleVoluntaryExit(ctx context.Context, event *v1.Event) error { + exit, valid := event.Data.(*phase0.VoluntaryExit) + if !valid { + return errors.New("invalid voluntary exit event") + } + + if err := n.publishVoluntaryExit(ctx, exit); err != nil { + return err + } + + return nil +} + +func (n *node) handleContributionAndProof(ctx context.Context, event *v1.Event) error { + // Do nothing for now + return nil +} diff --git a/pkg/exporter/consensus/consensus.go b/pkg/exporter/consensus/consensus.go deleted file mode 100644 index 4a8abed..0000000 --- a/pkg/exporter/consensus/consensus.go +++ /dev/null @@ -1,87 +0,0 @@ -package consensus - -import ( - "context" - "time" - - eth2client "github.com/attestantio/go-eth2-client" - "github.com/attestantio/go-eth2-client/http" - "github.com/rs/zerolog" - "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api" - "github.com/sirupsen/logrus" -) - -// Node represents a single consensus node in the ethereum network. -type Node interface { - // Name returns the name of the node. - Name() string - // URL returns the url of the node. - URL() string - // Bootstrapped returns whether the node has been bootstrapped and is ready to be used. - Bootstrapped() bool - // Bootstrap attempts to bootstrap the node (i.e. configuring clients) - Bootstrap(ctx context.Context) error - // StartMetrics starts the metrics collection. - StartMetrics(ctx context.Context) -} - -type node struct { - name string - url string - namespace string - client eth2client.Service - api api.ConsensusClient - log logrus.FieldLogger - metrics Metrics -} - -// NewConsensusNode returns a new Node instance. -func NewConsensusNode(ctx context.Context, log logrus.FieldLogger, namespace, name, url string) (Node, error) { - return &node{ - name: name, - url: url, - log: log, - namespace: namespace, - }, nil -} - -func (c *node) Name() string { - return c.name -} - -func (c *node) URL() string { - return c.url -} - -func (c *node) Bootstrap(ctx context.Context) error { - client, err := http.New(ctx, - http.WithAddress(c.url), - http.WithLogLevel(zerolog.Disabled), - ) - if err != nil { - return err - } - - c.client = client - c.api = api.NewConsensusClient(ctx, c.log, c.url) - - return nil -} - -func (c *node) Bootstrapped() bool { - _, isProvider := c.client.(eth2client.NodeSyncingProvider) - return isProvider -} - -func (c *node) StartMetrics(ctx context.Context) { - for !c.Bootstrapped() { - if err := c.Bootstrap(ctx); err != nil { - c.log.WithError(err).Error("Failed to bootstrap consensus client") - } - - time.Sleep(5 * time.Second) - } - - c.metrics = NewMetrics(c.client, c.api, c.log, c.name, c.namespace) - c.metrics.StartAsync(ctx) -} diff --git a/pkg/exporter/consensus/jobs/beacon.go b/pkg/exporter/consensus/jobs/beacon.go index ccd0ecd..dd491b1 100644 --- a/pkg/exporter/consensus/jobs/beacon.go +++ b/pkg/exporter/consensus/jobs/beacon.go @@ -11,6 +11,7 @@ import ( "github.com/attestantio/go-eth2-client/spec" "github.com/prometheus/client_golang/prometheus" "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api" + "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon" "github.com/sirupsen/logrus" ) @@ -18,6 +19,7 @@ import ( type Beacon struct { client eth2client.Service log logrus.FieldLogger + beaconNode beacon.Node Slot prometheus.GaugeVec Transactions prometheus.GaugeVec Slashings prometheus.GaugeVec @@ -29,6 +31,8 @@ type Beacon struct { ReOrgDepth prometheus.Counter HeadSlotHash prometheus.Gauge FinalityCheckpointHash prometheus.GaugeVec + EmptySlots prometheus.Counter + ProposerDelay prometheus.Histogram currentVersion string } @@ -40,13 +44,14 @@ const ( ) // NewBeacon creates a new Beacon instance. -func NewBeaconJob(client eth2client.Service, ap api.ConsensusClient, log logrus.FieldLogger, namespace string, constLabels map[string]string) Beacon { +func NewBeaconJob(client eth2client.Service, ap api.ConsensusClient, beac beacon.Node, log logrus.FieldLogger, namespace string, constLabels map[string]string) Beacon { constLabels["module"] = NameBeacon namespace += "_beacon" return Beacon{ - client: client, - log: log, + client: client, + beaconNode: beac, + log: log, Slot: *prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: namespace, @@ -168,6 +173,23 @@ func NewBeaconJob(client eth2client.Service, ap api.ConsensusClient, log logrus. "checkpoint", }, ), + ProposerDelay: prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: namespace, + Name: "proposer_delay", + Help: "The delay of the proposer.", + ConstLabels: constLabels, + Buckets: prometheus.LinearBuckets(0, 1000, 13), + }, + ), + EmptySlots: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "empty_slots_count", + Help: "The number of slots that have expired without a block proposed.", + ConstLabels: constLabels, + }, + ), } } @@ -175,15 +197,19 @@ func (b *Beacon) Name() string { return NameBeacon } -func (b *Beacon) Start(ctx context.Context) { +func (b *Beacon) Start(ctx context.Context) error { b.tick(ctx) + if err := b.setupSubscriptions(ctx); err != nil { + return err + } + go b.getInitialData(ctx) for { select { case <-ctx.Done(): - return + return ctx.Err() case <-time.After(time.Second * 5): b.tick(ctx) } @@ -194,6 +220,57 @@ func (b *Beacon) tick(ctx context.Context) { } +func (b *Beacon) setupSubscriptions(ctx context.Context) error { + if _, err := b.beaconNode.OnBlockInserted(ctx, b.handleBlockInserted); err != nil { + return err + } + + if _, err := b.beaconNode.OnChainReOrg(ctx, b.handleChainReorg); err != nil { + return err + } + + if _, err := b.beaconNode.OnEmptySlot(ctx, b.handleEmptySlot); err != nil { + return err + } + + return nil +} + +func (b *Beacon) handleEmptySlot(ctx context.Context, event *beacon.EmptySlotEvent) error { + b.log.WithField("slot", event.Slot).Debug("Empty slot detected") + + b.EmptySlots.Inc() + + return nil +} + +func (b *Beacon) handleBlockInserted(ctx context.Context, event *beacon.BlockInsertedEvent) error { + // Fetch the slot + slot, err := b.beaconNode.GetSlot(ctx, event.Slot) + if err != nil { + return err + } + + timedBlock, err := slot.Block() + if err != nil { + return err + } + + // nolint:gocritic // False positive + if err = b.handleSingleBlock("head", timedBlock.Block); err != nil { + return err + } + + delay, err := slot.ProposerDelay() + if err != nil { + return err + } + + b.ProposerDelay.Observe(float64(delay.Milliseconds())) + + return nil +} + func (b *Beacon) getInitialData(ctx context.Context) { for { if b.client == nil { @@ -201,7 +278,6 @@ func (b *Beacon) getInitialData(ctx context.Context) { continue } - b.updateBeaconBlock(ctx) b.updateFinalizedCheckpoint(ctx) break @@ -209,27 +285,16 @@ func (b *Beacon) getInitialData(ctx context.Context) { } func (b *Beacon) HandleEvent(ctx context.Context, event *v1.Event) { - if event.Topic == EventTopicBlock { - b.handleBlockEvent(ctx, event) - } - if event.Topic == EventTopicFinalizedCheckpoint { b.handleFinalizedCheckpointEvent(ctx, event) } - - if event.Topic == EventTopicChainReorg { - b.handleChainReorg(event) - } } -func (b *Beacon) handleChainReorg(event *v1.Event) { - reorg, ok := event.Data.(*v1.ChainReorgEvent) - if !ok { - return - } - +func (b *Beacon) handleChainReorg(ctx context.Context, event *v1.ChainReorgEvent) error { b.ReOrgs.Inc() - b.ReOrgDepth.Add(float64(reorg.Depth)) + b.ReOrgDepth.Add(float64(event.Depth)) + + return nil } func (b *Beacon) handleFinalizedCheckpointEvent(ctx context.Context, event *v1.Event) { @@ -251,21 +316,6 @@ func (b *Beacon) updateFinalizedCheckpoint(ctx context.Context) { } } -func (b *Beacon) updateBeaconBlock(ctx context.Context) { - if err := b.GetSignedBeaconBlock(ctx, "head"); err != nil { - b.log.WithError(err).Error("Failed to get signed beacon block") - } -} - -func (b *Beacon) handleBlockEvent(ctx context.Context, event *v1.Event) { - _, ok := event.Data.(*v1.BlockEvent) - if !ok { - return - } - - b.updateBeaconBlock(ctx) -} - func (b *Beacon) GetSignedBeaconBlock(ctx context.Context, blockID string) error { provider, isProvider := b.client.(eth2client.SignedBeaconBlockProvider) if !isProvider { @@ -323,6 +373,7 @@ func (b *Beacon) handleSingleBlock(blockID string, block *spec.VersionedSignedBe b.Attestations.Reset() b.Deposits.Reset() b.VoluntaryExits.Reset() + b.Slot.Reset() b.currentVersion = block.Version.String() } diff --git a/pkg/exporter/consensus/jobs/beacon_block.go b/pkg/exporter/consensus/jobs/beacon_block.go index c7159a6..62b2f1f 100644 --- a/pkg/exporter/consensus/jobs/beacon_block.go +++ b/pkg/exporter/consensus/jobs/beacon_block.go @@ -4,52 +4,6 @@ import ( "github.com/attestantio/go-eth2-client/spec" ) -type BeaconBlock struct { - AttesterSlashings int - ProposerSlashings int - Transactions int - Deposits int - VoluntaryExits int - Attestations int - Slot uint64 -} - -func NewBeaconBlockFromPhase0(block *spec.VersionedSignedBeaconBlock) BeaconBlock { - return BeaconBlock{ - AttesterSlashings: len(block.Phase0.Message.Body.AttesterSlashings), - ProposerSlashings: len(block.Phase0.Message.Body.ProposerSlashings), - Transactions: 0, - Deposits: len(block.Phase0.Message.Body.Deposits), - VoluntaryExits: len(block.Phase0.Message.Body.VoluntaryExits), - Attestations: len(block.Phase0.Message.Body.Attestations), - Slot: uint64(block.Phase0.Message.Slot), - } -} - -func NewBeaconBlockFromAltair(block *spec.VersionedSignedBeaconBlock) BeaconBlock { - return BeaconBlock{ - AttesterSlashings: len(block.Altair.Message.Body.AttesterSlashings), - ProposerSlashings: len(block.Altair.Message.Body.ProposerSlashings), - Transactions: 0, - Deposits: len(block.Altair.Message.Body.Deposits), - VoluntaryExits: len(block.Altair.Message.Body.VoluntaryExits), - Attestations: len(block.Altair.Message.Body.Attestations), - Slot: uint64(block.Altair.Message.Slot), - } -} - -func NewBeaconBlockFromBellatrix(block *spec.VersionedSignedBeaconBlock) BeaconBlock { - return BeaconBlock{ - AttesterSlashings: len(block.Bellatrix.Message.Body.AttesterSlashings), - ProposerSlashings: len(block.Bellatrix.Message.Body.ProposerSlashings), - Transactions: len(block.Bellatrix.Message.Body.ExecutionPayload.Transactions), - Deposits: len(block.Bellatrix.Message.Body.Deposits), - VoluntaryExits: len(block.Bellatrix.Message.Body.VoluntaryExits), - Attestations: len(block.Bellatrix.Message.Body.Attestations), - Slot: uint64(block.Bellatrix.Message.Slot), - } -} - func GetDepositCountsFromBeaconBlock(block *spec.VersionedSignedBeaconBlock) int { switch block.Version { case spec.DataVersionPhase0: diff --git a/pkg/exporter/consensus/jobs/event.go b/pkg/exporter/consensus/jobs/event.go index 02829d1..d6dff7f 100644 --- a/pkg/exporter/consensus/jobs/event.go +++ b/pkg/exporter/consensus/jobs/event.go @@ -7,7 +7,7 @@ import ( eth2client "github.com/attestantio/go-eth2-client" v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/prometheus/client_golang/prometheus" - "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api" + "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon" "github.com/sirupsen/logrus" ) @@ -17,6 +17,8 @@ type Event struct { Count prometheus.CounterVec TimeSinceLastEvent prometheus.Gauge + beacon beacon.Node + LastEventTime time.Time } @@ -25,12 +27,13 @@ const ( ) // NewEvent creates a new Event instance. -func NewEventJob(client eth2client.Service, ap api.ConsensusClient, log logrus.FieldLogger, namespace string, constLabels map[string]string) Event { +func NewEventJob(client eth2client.Service, bc beacon.Node, log logrus.FieldLogger, namespace string, constLabels map[string]string) Event { constLabels["module"] = NameEvent namespace += "_event" return Event{ - log: log, + log: log, + beacon: bc, Count: *prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, @@ -54,28 +57,34 @@ func NewEventJob(client eth2client.Service, ap api.ConsensusClient, log logrus.F } } -func (b *Event) Name() string { +func (e *Event) Name() string { return NameEvent } -func (b *Event) Start(ctx context.Context) { +func (e *Event) Start(ctx context.Context) error { + if _, err := e.beacon.OnEvent(ctx, e.HandleEvent); err != nil { + return err + } + for { select { case <-ctx.Done(): - return + return ctx.Err() case <-time.After(time.Second * 1): - b.tick(ctx) + e.tick(ctx) } } } //nolint:unparam // ctx will probably be used in the future -func (b *Event) tick(ctx context.Context) { - b.TimeSinceLastEvent.Set(float64(time.Since(b.LastEventTime).Milliseconds())) +func (e *Event) tick(ctx context.Context) { + e.TimeSinceLastEvent.Set(float64(time.Since(e.LastEventTime).Milliseconds())) } -func (b *Event) HandleEvent(ctx context.Context, event *v1.Event) { - b.Count.WithLabelValues(event.Topic).Inc() - b.LastEventTime = time.Now() - b.TimeSinceLastEvent.Set(0) +func (e *Event) HandleEvent(ctx context.Context, event *v1.Event) error { + e.Count.WithLabelValues(event.Topic).Inc() + e.LastEventTime = time.Now() + e.TimeSinceLastEvent.Set(0) + + return nil } diff --git a/pkg/exporter/consensus/jobs/forks.go b/pkg/exporter/consensus/jobs/forks.go index 9cb2855..8117171 100644 --- a/pkg/exporter/consensus/jobs/forks.go +++ b/pkg/exporter/consensus/jobs/forks.go @@ -2,27 +2,21 @@ package jobs import ( "context" - "errors" - "strings" - "time" "github.com/prometheus/client_golang/prometheus" - "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api" + "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon" "github.com/sirupsen/logrus" - "github.com/spf13/cast" - eth2client "github.com/attestantio/go-eth2-client" - v1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec/phase0" ) // Forks reports the state of any forks (previous, active or upcoming). type Forks struct { - Epochs prometheus.GaugeVec - Activated prometheus.GaugeVec - Current prometheus.GaugeVec - client eth2client.Service - log logrus.FieldLogger - previousCurrentFork string + Epochs prometheus.GaugeVec + Activated prometheus.GaugeVec + Current prometheus.GaugeVec + beacon beacon.Node + log logrus.FieldLogger } const ( @@ -30,13 +24,13 @@ const ( ) // NewForksJob returns a new Forks instance. -func NewForksJob(client eth2client.Service, ap api.ConsensusClient, log logrus.FieldLogger, namespace string, constLabels map[string]string) Forks { +func NewForksJob(beac beacon.Node, log logrus.FieldLogger, namespace string, constLabels map[string]string) Forks { constLabels["module"] = NameFork namespace += "_fork" return Forks{ - client: client, + beacon: beac, log: log, Epochs: *prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -71,8 +65,6 @@ func NewForksJob(client eth2client.Service, ap api.ConsensusClient, log logrus.F "fork", }, ), - - previousCurrentFork: "", } } @@ -80,111 +72,45 @@ func (f *Forks) Name() string { return NameFork } -func (f *Forks) Start(ctx context.Context) { - f.tick(ctx) - - for { - select { - case <-ctx.Done(): - return - case <-time.After(time.Second * 600): - f.tick(ctx) - } - } -} - -func (f *Forks) tick(ctx context.Context) { - if err := f.ForkEpochs(ctx); err != nil { - f.log.WithError(err).Error("Failed to fetch fork epochs") - } - - if err := f.GetCurrent(ctx); err != nil { - f.log.WithError(err).Error("Failed to fetch current fork") - } -} - -func (f *Forks) HandleEvent(ctx context.Context, event *v1.Event) { -} - -func (f *Forks) ForkEpochs(ctx context.Context) error { - spec, err := f.getSpec(ctx) - if err != nil { +func (f *Forks) Start(ctx context.Context) error { + if _, err := f.beacon.OnBlockInserted(ctx, func(ctx context.Context, event *beacon.BlockInsertedEvent) error { + return f.calculateCurrent(ctx, event.Slot) + }); err != nil { return err } - for k, v := range spec { - if strings.Contains(k, "_FORK_EPOCH") { - f.ObserveForkEpoch(strings.ReplaceAll(k, "_FORK_EPOCH", ""), cast.ToUint64(v)) - } - } - return nil } -func (f *Forks) GetCurrent(ctx context.Context) error { - // Get the current head slot. - provider, isProvider := f.client.(eth2client.BeaconBlockHeadersProvider) - if !isProvider { - return errors.New("client does not implement eth2client.BeaconBlockHeadersProvider") - } - - headSlot, err := provider.BeaconBlockHeader(ctx, "head") +func (f *Forks) calculateCurrent(ctx context.Context, slot phase0.Slot) error { + spec, err := f.beacon.GetSpec(ctx) if err != nil { return err } - spec, err := f.getSpec(ctx) - if err != nil { - return err - } + slotsPerEpoch := spec.SlotsPerEpoch - slotsPerEpoch := 32 - if v, ok := spec["SLOTS_PER_EPOCH"]; ok { - slotsPerEpoch = cast.ToInt(v) - } - - current := "" - currentSlot := 0 + f.Activated.Reset() + f.Epochs.Reset() - for k, v := range spec { - if strings.Contains(k, "_FORK_EPOCH") { - forkName := strings.ReplaceAll(k, "_FORK_EPOCH", "") + for _, fork := range spec.ForkEpochs { + f.Epochs.WithLabelValues(fork.Name).Set(float64(fork.Epoch)) - if int(headSlot.Header.Message.Slot)/slotsPerEpoch > cast.ToInt(v) { - f.Activated.WithLabelValues(forkName).Set(1) - } else { - f.Activated.WithLabelValues(forkName).Set(0) - } - - if currentSlot < cast.ToInt(v) { - current = forkName - currentSlot = cast.ToInt(v) - } + if fork.Active(slot, slotsPerEpoch) { + f.Activated.WithLabelValues(fork.Name).Set(1) + } else { + f.Activated.WithLabelValues(fork.Name).Set(0) } } - if current != f.previousCurrentFork { - f.Current.WithLabelValues(current).Set(1) - - if f.previousCurrentFork != "" { - f.Current.WithLabelValues(f.previousCurrentFork).Set(0) - } + current, err := spec.ForkEpochs.CurrentFork(slot, slotsPerEpoch) + if err != nil { + f.log.WithError(err).Error("Failed to set current fork") + } else { + f.Current.Reset() - f.previousCurrentFork = current + f.Current.WithLabelValues(current.Name).Set(1) } return nil } - -func (f *Forks) getSpec(ctx context.Context) (map[string]interface{}, error) { - provider, isProvider := f.client.(eth2client.SpecProvider) - if !isProvider { - return nil, errors.New("client does not implement eth2client.SpecProvider") - } - - return provider.Spec(ctx) -} - -func (f *Forks) ObserveForkEpoch(name string, epoch uint64) { - f.Epochs.WithLabelValues(name).Set(float64(epoch)) -} diff --git a/pkg/exporter/consensus/jobs/general.go b/pkg/exporter/consensus/jobs/general.go index 809026f..2572aad 100644 --- a/pkg/exporter/consensus/jobs/general.go +++ b/pkg/exporter/consensus/jobs/general.go @@ -2,26 +2,20 @@ package jobs import ( "context" - "errors" - "time" - eth2client "github.com/attestantio/go-eth2-client" - v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/prometheus/client_golang/prometheus" - "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api" "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api/types" + "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon" "github.com/sirupsen/logrus" ) // General reports general information about the node. type General struct { - client eth2client.Service - api api.ConsensusClient - log logrus.FieldLogger - NodeVersion prometheus.GaugeVec - ClientName prometheus.GaugeVec - Peers prometheus.GaugeVec - nodeVersionFetchedAt time.Time + beacon beacon.Node + log logrus.FieldLogger + NodeVersion prometheus.GaugeVec + ClientName prometheus.GaugeVec + Peers prometheus.GaugeVec } const ( @@ -29,12 +23,11 @@ const ( ) // NewGeneral creates a new General instance. -func NewGeneralJob(client eth2client.Service, ap api.ConsensusClient, log logrus.FieldLogger, namespace string, constLabels map[string]string) General { +func NewGeneralJob(beac beacon.Node, log logrus.FieldLogger, namespace string, constLabels map[string]string) General { constLabels["module"] = NameGeneral return General{ - client: client, - api: ap, + beacon: beac, log: log, NodeVersion: *prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -66,69 +59,49 @@ func (g *General) Name() string { return NameGeneral } -func (g *General) Start(ctx context.Context) { - g.tick(ctx) - - for { - select { - case <-ctx.Done(): - return - case <-time.After(time.Second * 15): - g.tick(ctx) - } +func (g *General) Start(ctx context.Context) error { + if _, err := g.beacon.OnNodeVersionUpdated(ctx, func(ctx context.Context, event *beacon.NodeVersionUpdatedEvent) error { + g.observeNodeVersion(ctx, event.Version) + return nil + }); err != nil { + return err } -} - -func (g *General) HandleEvent(ctx context.Context, event *v1.Event) { - -} -func (g *General) tick(ctx context.Context) { - if err := g.GetNodeVersion(ctx); err != nil { - g.log.WithError(err).Error("Failed to get node version") - } + if _, err := g.beacon.OnPeersUpdated(ctx, func(ctx context.Context, event *beacon.PeersUpdatedEvent) error { + g.Peers.Reset() - if err := g.GetPeers(ctx); err != nil { - g.log.WithError(err).Error("Failed to get peers") - } -} + for _, state := range types.PeerStates { + for _, direction := range types.PeerDirections { + g.Peers.WithLabelValues(state, direction).Set(float64(len(event.Peers.ByStateAndDirection(state, direction)))) + } + } -func (g *General) GetNodeVersion(ctx context.Context) error { - if time.Since(g.nodeVersionFetchedAt) < (30 * time.Minute) { return nil + }); err != nil { + return err } - provider, isProvider := g.client.(eth2client.NodeVersionProvider) - if !isProvider { - return errors.New("client does not implement eth2client.NodeVersionProvider") - } - - version, err := provider.NodeVersion(ctx) - if err != nil { + if err := g.initialFetch(ctx); err != nil { return err } - g.NodeVersion.Reset() - g.NodeVersion.WithLabelValues(version).Set(1) - - g.nodeVersionFetchedAt = time.Now() - return nil } -func (g *General) GetPeers(ctx context.Context) error { - peers, err := g.api.NodePeers(ctx) +func (g *General) initialFetch(ctx context.Context) error { + version, err := g.beacon.GetNodeVersion(ctx) if err != nil { return err } - g.Peers.Reset() - - for _, state := range types.PeerStates { - for _, direction := range types.PeerDirections { - g.Peers.WithLabelValues(state, direction).Set(float64(len(peers.ByStateAndDirection(state, direction)))) - } - } + g.observeNodeVersion(ctx, version) return nil } + +func (g *General) observeNodeVersion(ctx context.Context, version string) { + g.log.WithField("version", version).Debug("Got node version") + + g.NodeVersion.Reset() + g.NodeVersion.WithLabelValues(version).Set(1) +} diff --git a/pkg/exporter/consensus/jobs/spec.go b/pkg/exporter/consensus/jobs/spec.go index bc34113..7f5bdb9 100644 --- a/pkg/exporter/consensus/jobs/spec.go +++ b/pkg/exporter/consensus/jobs/spec.go @@ -2,22 +2,18 @@ package jobs import ( "context" - "errors" - "fmt" "math/big" "time" - eth2client "github.com/attestantio/go-eth2-client" - v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/prometheus/client_golang/prometheus" - "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api" + "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon" + "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon/state" "github.com/sirupsen/logrus" - "github.com/spf13/cast" ) // Spec reports metrics about the configured consensus spec. type Spec struct { - client eth2client.Service + beacon beacon.Node log logrus.FieldLogger SafeSlotsToUpdateJustified prometheus.Gauge DepositChainID prometheus.Gauge @@ -50,14 +46,14 @@ const ( ) // NewSpecJob returns a new Spec instance. -func NewSpecJob(client eth2client.Service, ap api.ConsensusClient, log logrus.FieldLogger, namespace string, constLabels map[string]string) Spec { +func NewSpecJob(bc beacon.Node, log logrus.FieldLogger, namespace string, constLabels map[string]string) Spec { constLabels["module"] = NameSpec namespace += "_spec" return Spec{ - client: client, log: log, + beacon: bc, SafeSlotsToUpdateJustified: prometheus.NewGauge( prometheus.GaugeOpts{ Namespace: namespace, @@ -259,144 +255,76 @@ func (s *Spec) Name() string { return NameSpec } -func (s *Spec) Start(ctx context.Context) { +func (s *Spec) Start(ctx context.Context) error { + if _, err := s.beacon.OnSpecUpdated(ctx, func(ctx context.Context, event *beacon.SpecUpdatedEvent) error { + return s.observeSpec(ctx, event.Spec) + }); err != nil { + return err + } + s.tick(ctx) for { select { case <-ctx.Done(): - return - case <-time.After(time.Second * 600): + return ctx.Err() + case <-time.After(time.Minute * 5): s.tick(ctx) } } } func (s *Spec) tick(ctx context.Context) { - if err := s.GetSpec(ctx); err != nil { + if err := s.getSpec(ctx); err != nil { s.log.WithError(err).Error("Failed to fetch spec") } } -func (s *Spec) HandleEvent(ctx context.Context, event *v1.Event) { +func (s *Spec) observeSpec(ctx context.Context, spec *state.Spec) error { + s.ConfigName.Reset() + s.ConfigName.WithLabelValues(spec.ConfigName).Set(1) + + s.PresetBase.Reset() + s.PresetBase.WithLabelValues(spec.PresetBase).Set(1) + + s.SafeSlotsToUpdateJustified.Set(float64(spec.SafeSlotsToUpdateJustified)) + s.DepositChainID.Set(float64(spec.DepositChainID)) + s.MaxValidatorsPerCommittee.Set(float64(spec.MaxValidatorsPerCommittee)) + // nolint:unconvert // false positive + s.SecondsPerEth1Block.Set(float64(spec.SecondsPerEth1Block.Seconds())) + s.BaseRewardFactor.Set(float64(spec.BaseRewardFactor)) + s.EpochsPerSyncCommitteePeriod.Set(float64(spec.EpochsPerSyncCommitteePeriod)) + s.EffectiveBalanceIncrement.Set(float64(spec.EffectiveBalanceIncrement)) + s.MaxAttestations.Set(float64(spec.MaxAttestations)) + s.MinSyncCommitteeParticipants.Set(float64(spec.MinSyncCommitteeParticipants)) + // nolint:unconvert // false positive + s.GenesisDelay.Set(float64(spec.GenesisDelay.Seconds())) + // nolint:unconvert // false positive + s.SecondsPerSlot.Set(float64(spec.SecondsPerSlot.Seconds())) + s.MaxEffectiveBalance.Set(float64(spec.MaxEffectiveBalance)) + s.MaxDeposits.Set(float64(spec.MaxDeposits)) + s.MinGenesisActiveValidatorCount.Set(float64(spec.MinGenesisActiveValidatorCount)) + s.TargetCommitteeSize.Set(float64(spec.TargetCommitteeSize)) + s.SyncCommitteeSize.Set(float64(spec.SyncCommitteeSize)) + s.Eth1FollowDistance.Set(float64(spec.Eth1FollowDistance)) + s.TerminalBlockHashActivationEpoch.Set(float64(spec.TerminalBlockHashActivationEpoch)) + s.MinDepositAmount.Set(float64(spec.MinDepositAmount)) + s.SlotsPerEpoch.Set(float64(spec.SlotsPerEpoch)) + + trillion := big.NewInt(1e12) + divided := new(big.Int).Div(&spec.TerminalTotalDifficulty, trillion) + asFloat, _ := new(big.Float).SetInt(divided).Float64() + s.TerminalTotalDifficultyTrillions.Set(asFloat) + s.TerminalTotalDifficulty.Set(float64(spec.TerminalTotalDifficulty.Uint64())) + return nil } -func (s *Spec) GetSpec(ctx context.Context) error { - provider, isProvider := s.client.(eth2client.SpecProvider) - if !isProvider { - return errors.New("client does not implement eth2client.SpecProvider") - } - - spec, err := provider.Spec(ctx) +func (s *Spec) getSpec(ctx context.Context) error { + spec, err := s.beacon.GetSpec(ctx) if err != nil { return err } - s.Update(spec) - - return nil -} - -func (s *Spec) Update(spec map[string]interface{}) { - if safeSlotsToUpdateJustified, exists := spec["SAFE_SLOTS_TO_UPDATE_JUSTIFIED"]; exists { - s.SafeSlotsToUpdateJustified.Set(cast.ToFloat64(safeSlotsToUpdateJustified)) - } - - if depositChainID, exists := spec["DEPOSIT_CHAIN_ID"]; exists { - s.DepositChainID.Set(cast.ToFloat64(depositChainID)) - } - - if configName, exists := spec["CONFIG_NAME"]; exists { - s.ConfigName.WithLabelValues(cast.ToString(configName)).Set(1) - } - - if maxValidatorsPerCommittee, exists := spec["MAX_VALIDATORS_PER_COMMITTEE"]; exists { - s.MaxValidatorsPerCommittee.Set(cast.ToFloat64(maxValidatorsPerCommittee)) - } - - if secondsPerEth1Block, exists := spec["SECONDS_PER_ETH1_BLOCK"]; exists { - s.SecondsPerEth1Block.Set(float64(cast.ToDuration(secondsPerEth1Block))) - } - - if baseRewardFactor, exists := spec["BASE_REWARD_FACTOR"]; exists { - s.BaseRewardFactor.Set(cast.ToFloat64(baseRewardFactor)) - } - - if epochsPerSyncComitteePeriod, exists := spec["EPOCHS_PER_SYNC_COMMITTEE_PERIOD"]; exists { - s.EpochsPerSyncCommitteePeriod.Set(cast.ToFloat64(epochsPerSyncComitteePeriod)) - } - - if effectiveBalanceIncrement, exists := spec["EFFECTIVE_BALANCE_INCREMENT"]; exists { - s.EffectiveBalanceIncrement.Set(cast.ToFloat64(effectiveBalanceIncrement)) - } - - if maxAttestations, exists := spec["MAX_ATTESTATIONS"]; exists { - s.MaxAttestations.Set(cast.ToFloat64(maxAttestations)) - } - - if minSyncCommitteeParticipants, exists := spec["MIN_SYNC_COMMITTEE_PARTICIPANTS"]; exists { - s.MinSyncCommitteeParticipants.Set(cast.ToFloat64(minSyncCommitteeParticipants)) - } - - if genesisDelay, exists := spec["GENESIS_DELAY"]; exists { - s.GenesisDelay.Set(float64(cast.ToDuration(genesisDelay))) - } - - if secondsPerSlot, exists := spec["SECONDS_PER_SLOT"]; exists { - s.SecondsPerSlot.Set(float64(cast.ToDuration(secondsPerSlot))) - } - - if maxEffectiveBalance, exists := spec["MAX_EFFECTIVE_BALANCE"]; exists { - s.MaxEffectiveBalance.Set(cast.ToFloat64(maxEffectiveBalance)) - } - - if terminalTotalDifficulty, exists := spec["TERMINAL_TOTAL_DIFFICULTY"]; exists { - ttd := cast.ToString(fmt.Sprintf("%v", terminalTotalDifficulty)) - - asBigInt, success := big.NewInt(0).SetString(ttd, 10) - if success { - trillion := big.NewInt(1e12) - divided := new(big.Int).Div(asBigInt, trillion) - asFloat, _ := new(big.Float).SetInt(divided).Float64() - s.TerminalTotalDifficultyTrillions.Set(asFloat) - s.TerminalTotalDifficulty.Set(float64(asBigInt.Uint64())) - } - } - - if maxDeposits, exists := spec["MAX_DEPOSITS"]; exists { - s.MaxDeposits.Set(cast.ToFloat64((maxDeposits))) - } - - if minGenesisActiveValidatorCount, exists := spec["MIN_GENESIS_ACTIVE_VALIDATOR_COUNT"]; exists { - s.MinGenesisActiveValidatorCount.Set(cast.ToFloat64(minGenesisActiveValidatorCount)) - } - - if targetCommitteeSize, exists := spec["TARGET_COMMITTEE_SIZE"]; exists { - s.TargetCommitteeSize.Set(cast.ToFloat64(targetCommitteeSize)) - } - - if syncCommitteeSize, exists := spec["SYNC_COMMITTEE_SIZE"]; exists { - s.SyncCommitteeSize.Set(cast.ToFloat64(syncCommitteeSize)) - } - - if eth1FollowDistance, exists := spec["ETH1_FOLLOW_DISTANCE"]; exists { - s.Eth1FollowDistance.Set(cast.ToFloat64(eth1FollowDistance)) - } - - if terminalBlockHashActivationEpoch, exists := spec["TERMINAL_BLOCK_HASH_ACTIVATION_EPOCH"]; exists { - s.TerminalBlockHashActivationEpoch.Set(cast.ToFloat64(terminalBlockHashActivationEpoch)) - } - - if minDepositAmount, exists := spec["MIN_DEPOSIT_AMOUNT"]; exists { - s.MinDepositAmount.Set(cast.ToFloat64(minDepositAmount)) - } - - if slotsPerEpoch, exists := spec["SLOTS_PER_EPOCH"]; exists { - s.SlotsPerEpoch.Set(cast.ToFloat64(slotsPerEpoch)) - } - - if presetBase, exists := spec["PRESET_BASE"]; exists { - s.PresetBase.WithLabelValues(cast.ToString(presetBase)).Set(1) - } + return s.observeSpec(ctx, spec) } diff --git a/pkg/exporter/consensus/jobs/syncstatus.go b/pkg/exporter/consensus/jobs/syncstatus.go index b9c3a30..c01f4b4 100644 --- a/pkg/exporter/consensus/jobs/syncstatus.go +++ b/pkg/exporter/consensus/jobs/syncstatus.go @@ -2,19 +2,15 @@ package jobs import ( "context" - "errors" - "time" - eth2client "github.com/attestantio/go-eth2-client" - v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/prometheus/client_golang/prometheus" - "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api" + "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon" "github.com/sirupsen/logrus" ) // Sync reports metrics on the sync status of the node. type Sync struct { - client eth2client.Service + beacon beacon.Node log logrus.FieldLogger Percentage prometheus.Gauge EstimatedHighestSlot prometheus.Gauge @@ -28,13 +24,13 @@ const ( ) // NewSyncJob returns a new Sync instance. -func NewSyncJob(client eth2client.Service, ap api.ConsensusClient, log logrus.FieldLogger, namespace string, constLabels map[string]string) Sync { +func NewSyncJob(beac beacon.Node, log logrus.FieldLogger, namespace string, constLabels map[string]string) Sync { constLabels["module"] = NameSync namespace += "_sync" return Sync{ - client: client, + beacon: beac, log: log, Percentage: prometheus.NewGauge( prometheus.GaugeOpts{ @@ -83,72 +79,32 @@ func (s *Sync) Name() string { return NameSync } -func (s *Sync) Start(ctx context.Context) { - s.tick(ctx) +func (s *Sync) Start(ctx context.Context) error { + if _, err := s.beacon.OnSyncStatus(ctx, func(ctx context.Context, event *beacon.SyncStatusEvent) error { + status := event.State - for { - select { - case <-ctx.Done(): - return - case <-time.After(time.Second * 15): - s.tick(ctx) - } - } -} + s.Distance.Set(float64(status.SyncDistance)) + s.HeadSlot.Set(float64(status.HeadSlot)) + s.ObserveSyncIsSyncing(status.IsSyncing) -func (s *Sync) HandleEvent(ctx context.Context, event *v1.Event) { -} + estimatedHighestHeadSlot := status.SyncDistance + status.HeadSlot + s.EstimatedHighestSlot.Set(float64(estimatedHighestHeadSlot)) -func (s *Sync) tick(ctx context.Context) { - if err := s.GetSyncState(ctx); err != nil { - s.log.WithError(err).Error("failed to get sync state") - } -} + percent := (float64(status.HeadSlot) / float64(estimatedHighestHeadSlot) * 100) + if !status.IsSyncing { + percent = 100 + } -func (s *Sync) GetSyncState(ctx context.Context) error { - provider, isProvider := s.client.(eth2client.NodeSyncingProvider) - if !isProvider { - return errors.New("client does not implement eth2client.NodeSyncingProvider") - } + s.Percentage.Set(percent) - status, err := provider.NodeSyncing(ctx) - if err != nil { + return nil + }); err != nil { return err } - s.ObserveSyncDistance(uint64(status.SyncDistance)) - s.ObserveSyncHeadSlot(uint64(status.HeadSlot)) - s.ObserveSyncIsSyncing(status.IsSyncing) - - estimatedHighestHeadSlot := status.SyncDistance + status.HeadSlot - s.ObserveSyncEstimatedHighestSlot(uint64(estimatedHighestHeadSlot)) - - percent := (float64(status.HeadSlot) / float64(estimatedHighestHeadSlot) * 100) - if !status.IsSyncing { - percent = 100 - } - - s.ObserveSyncPercentage(percent) - return nil } -func (s *Sync) ObserveSyncPercentage(percent float64) { - s.Percentage.Set(percent) -} - -func (s *Sync) ObserveSyncEstimatedHighestSlot(slot uint64) { - s.EstimatedHighestSlot.Set(float64(slot)) -} - -func (s *Sync) ObserveSyncHeadSlot(slot uint64) { - s.HeadSlot.Set(float64(slot)) -} - -func (s *Sync) ObserveSyncDistance(slots uint64) { - s.Distance.Set(float64(slots)) -} - func (s *Sync) ObserveSyncIsSyncing(syncing bool) { if syncing { s.IsSyncing.Set(1) diff --git a/pkg/exporter/consensus/metrics.go b/pkg/exporter/consensus/metrics.go index e496936..9220502 100644 --- a/pkg/exporter/consensus/metrics.go +++ b/pkg/exporter/consensus/metrics.go @@ -9,6 +9,7 @@ import ( v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/prometheus/client_golang/prometheus" "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api" + "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon" "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/jobs" "github.com/sirupsen/logrus" ) @@ -33,7 +34,7 @@ type metrics struct { } // NewMetrics returns a new metrics object. -func NewMetrics(client eth2client.Service, ap api.ConsensusClient, log logrus.FieldLogger, nodeName, namespace string) Metrics { +func NewMetrics(client eth2client.Service, ap api.ConsensusClient, beac beacon.Node, log logrus.FieldLogger, nodeName, namespace string) Metrics { constLabels := make(prometheus.Labels) constLabels["ethereum_role"] = "consensus" constLabels["node_name"] = nodeName @@ -41,12 +42,12 @@ func NewMetrics(client eth2client.Service, ap api.ConsensusClient, log logrus.Fi m := &metrics{ log: log, client: client, - generalMetrics: jobs.NewGeneralJob(client, ap, log, namespace, constLabels), - specMetrics: jobs.NewSpecJob(client, ap, log, namespace, constLabels), - syncMetrics: jobs.NewSyncJob(client, ap, log, namespace, constLabels), - forkMetrics: jobs.NewForksJob(client, ap, log, namespace, constLabels), - beaconMetrics: jobs.NewBeaconJob(client, ap, log, namespace, constLabels), - eventMetrics: jobs.NewEventJob(client, ap, log, namespace, constLabels), + generalMetrics: jobs.NewGeneralJob(beac, log, namespace, constLabels), + specMetrics: jobs.NewSpecJob(beac, log, namespace, constLabels), + syncMetrics: jobs.NewSyncJob(beac, log, namespace, constLabels), + forkMetrics: jobs.NewForksJob(beac, log, namespace, constLabels), + beaconMetrics: jobs.NewBeaconJob(client, ap, beac, log, namespace, constLabels), + eventMetrics: jobs.NewEventJob(client, beac, log, namespace, constLabels), } prometheus.MustRegister(m.generalMetrics.NodeVersion) @@ -98,6 +99,8 @@ func NewMetrics(client eth2client.Service, ap api.ConsensusClient, log logrus.Fi prometheus.MustRegister(m.beaconMetrics.ReOrgDepth) prometheus.MustRegister(m.beaconMetrics.FinalityCheckpointHash) prometheus.MustRegister(m.beaconMetrics.HeadSlotHash) + prometheus.MustRegister(m.beaconMetrics.ProposerDelay) + prometheus.MustRegister(m.beaconMetrics.EmptySlots) prometheus.MustRegister(m.eventMetrics.Count) prometheus.MustRegister(m.eventMetrics.TimeSinceLastEvent) @@ -106,12 +109,42 @@ func NewMetrics(client eth2client.Service, ap api.ConsensusClient, log logrus.Fi } func (m *metrics) StartAsync(ctx context.Context) { - go m.generalMetrics.Start(ctx) - go m.specMetrics.Start(ctx) - go m.syncMetrics.Start(ctx) - go m.forkMetrics.Start(ctx) - go m.beaconMetrics.Start(ctx) - go m.eventMetrics.Start(ctx) + go func() { + if err := m.generalMetrics.Start(ctx); err != nil { + m.log.Errorf("Failed to start general metrics: %v", err) + } + }() + + go func() { + if err := m.specMetrics.Start(ctx); err != nil { + m.log.Errorf("Failed to start spec metrics: %v", err) + } + }() + + go func() { + if err := m.syncMetrics.Start(ctx); err != nil { + m.log.Errorf("Failed to start sync metrics: %v", err) + } + }() + + go func() { + if err := m.forkMetrics.Start(ctx); err != nil { + m.log.Errorf("Failed to start fork metrics: %v", err) + } + }() + + go func() { + if err := m.beaconMetrics.Start(ctx); err != nil { + m.log.Errorf("Failed to start beacon metrics: %v", err) + } + }() + + go func() { + if err := m.eventMetrics.Start(ctx); err != nil { + m.log.Errorf("Failed to start event metrics: %v", err) + } + }() + go m.subscriptionLoop(ctx) } @@ -169,10 +202,5 @@ func (m *metrics) startSubscriptions(ctx context.Context) error { } func (m *metrics) handleEvent(ctx context.Context, event *v1.Event) { - m.generalMetrics.HandleEvent(ctx, event) - m.specMetrics.HandleEvent(ctx, event) - m.syncMetrics.HandleEvent(ctx, event) - m.forkMetrics.HandleEvent(ctx, event) m.beaconMetrics.HandleEvent(ctx, event) - m.eventMetrics.HandleEvent(ctx, event) } diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go index fdd6e60..115ed8e 100644 --- a/pkg/exporter/exporter.go +++ b/pkg/exporter/exporter.go @@ -2,12 +2,21 @@ package exporter import ( "context" + "errors" "fmt" "net/http" "strings" + "time" + eth2client "github.com/attestantio/go-eth2-client" + ehttp "github.com/attestantio/go-eth2-client/http" + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/rs/zerolog" "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus" + "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api" + "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon" "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/disk" "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/execution" "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/pair" @@ -27,44 +36,67 @@ type Exporter interface { // NewExporter returns a new Exporter instance func NewExporter(log logrus.FieldLogger, conf *Config) Exporter { return &exporter{ - log: log.WithField("component", "exporter"), - config: conf, + log: log.WithField("component", "exporter"), + config: conf, + namespace: "eth", } } type exporter struct { - log logrus.FieldLogger - config *Config - consensus consensus.Node + // Helpers + namespace string + log logrus.FieldLogger + config *Config + // Exporters + consensus consensus.Metrics execution execution.Node diskUsage disk.UsageMetrics pairMetrics pair.Metrics + + // Nats + broker *server.Server + brokerConn *nats.EncodedConn + + // Clients + beacon beacon.Node + client eth2client.Service + api api.ConsensusClient } func (e *exporter) Init(ctx context.Context) error { e.log.Info("Initializing...") - namespace := "eth" + natsServer, err := server.NewServer(&server.Options{}) + if err != nil { + return err + } - if e.config.Consensus.Enabled { - e.log.Info("Initializing consensus...") + e.broker = natsServer - consensusNode, err := consensus.NewConsensusNode(ctx, e.log.WithField("exporter", "consensus"), fmt.Sprintf("%s_con", namespace), e.config.Consensus.Name, e.config.Consensus.URL) - if err != nil { - return err - } + // Start the nats server via goroutine + go e.broker.Start() - if err := consensusNode.Bootstrap(ctx); err != nil { - e.log.WithError(err).Error("failed to bootstrap consnesus node") - } + if !e.broker.ReadyForConnections(15 * time.Second) { + return errors.New("nats server failed to start") + } + + nc, err := nats.Connect(e.broker.ClientURL()) + if err != nil { + return err + } - e.consensus = consensusNode + // Create a NATS encoded connection to the nats server + conn, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) + if err != nil { + return err } + e.brokerConn = conn + if e.config.Execution.Enabled { e.log.WithField("modules", strings.Join(e.config.Execution.Modules, ", ")).Info("Initializing execution...") - executionNode, err := execution.NewExecutionNode(ctx, e.log.WithField("exporter", "execution"), fmt.Sprintf("%s_exe", namespace), e.config.Execution.Name, e.config.Execution.URL, e.config.Execution.Modules) + executionNode, err := execution.NewExecutionNode(ctx, e.log.WithField("exporter", "execution"), fmt.Sprintf("%s_exe", e.namespace), e.config.Execution.Name, e.config.Execution.URL, e.config.Execution.Modules) if err != nil { return err } @@ -79,7 +111,7 @@ func (e *exporter) Init(ctx context.Context) error { if e.config.DiskUsage.Enabled { e.log.Info("Initializing disk usage...") - diskUsage, err := disk.NewUsage(ctx, e.log.WithField("exporter", "disk"), fmt.Sprintf("%s_disk", namespace), e.config.DiskUsage.Directories) + diskUsage, err := disk.NewUsage(ctx, e.log.WithField("exporter", "disk"), fmt.Sprintf("%s_disk", e.namespace), e.config.DiskUsage.Directories) if err != nil { return err } @@ -87,17 +119,6 @@ func (e *exporter) Init(ctx context.Context) error { e.diskUsage = diskUsage } - if e.config.Pair.Enabled && e.config.Execution.Enabled && e.config.Consensus.Enabled { - e.log.Info("Initializing pair...") - - pairMetrics, err := pair.NewMetrics(ctx, e.log.WithField("exporter", "pair"), fmt.Sprintf("%s_pair", namespace), e.config.Consensus.URL, e.config.Execution.URL) - if err != nil { - return err - } - - e.pairMetrics = pairMetrics - } - return nil } @@ -106,8 +127,22 @@ func (e *exporter) Config(ctx context.Context) *Config { } func (e *exporter) Serve(ctx context.Context, port int) error { + e.log. + WithField("consensus_url", e.config.Consensus.URL). + WithField("execution_url", e.execution.URL()). + Info(fmt.Sprintf("Starting metrics server on :%v", port)) + + http.Handle("/metrics", promhttp.Handler()) + + go func() { + err := http.ListenAndServe(fmt.Sprintf(":%v", port), nil) + if err != nil { + e.log.Fatal(err) + } + }() + if e.config.Execution.Enabled { - e.log.Info("Starting execution metrics...") + e.log.WithField("execution_url", e.execution.URL()).Info("Starting execution metrics...") go e.execution.StartMetrics(ctx) } @@ -118,26 +153,119 @@ func (e *exporter) Serve(ctx context.Context, port int) error { go e.diskUsage.StartAsync(ctx) } + if e.config.Pair.Enabled && e.config.Execution.Enabled && e.config.Consensus.Enabled { + if err := e.ensureConsensusClients(ctx); err != nil { + e.log.Fatal(err) + } + + if _, err := e.beacon.OnReady(ctx, func(ctx context.Context, event *beacon.ReadyEvent) error { + e.pairMetrics.StartAsync(ctx) + return nil + }); err != nil { + e.log.WithError(err).Error("Failed to subscribe to beacon node ready event") + } + + if err := e.startPairExporter(ctx); err != nil { + e.log.WithError(err).Error("failed to start pair metrics") + + e.log.Fatal(err) + } + } + if e.config.Consensus.Enabled { - e.log.Info("Starting consensus metrics...") + if err := e.ensureConsensusClients(ctx); err != nil { + e.log.Fatal(err) + } - go e.consensus.StartMetrics(ctx) + if err := e.startConsensusExporter(ctx); err != nil { + e.log.WithError(err).Error("failed to start consensus") + + e.log.Fatal(err) + } + + if _, err := e.beacon.OnReady(ctx, func(ctx context.Context, event *beacon.ReadyEvent) error { + e.consensus.StartAsync(ctx) + + return nil + }); err != nil { + e.log.WithError(err).Error("Failed to subscribe to beacon node ready event") + } + + e.beacon.StartAsync(ctx) } - if e.config.Pair.Enabled && e.config.Execution.Enabled && e.config.Consensus.Enabled { - e.log.Info("Starting pair metrics...") + return nil +} - go e.pairMetrics.StartAsync(ctx) +func (e *exporter) bootstrapConsensusClients(ctx context.Context) error { + client, err := ehttp.New(ctx, + ehttp.WithAddress(e.config.Consensus.URL), + ehttp.WithLogLevel(zerolog.Disabled), + ) + if err != nil { + return err } - e.log. - WithField("consensus_url", e.consensus.URL()). - WithField("execution_url", e.execution.URL()). - Info(fmt.Sprintf("Starting metrics server on :%v", port)) + e.client = client + e.api = api.NewConsensusClient(ctx, e.log, e.config.Consensus.URL) + e.beacon = beacon.NewNode(ctx, e.log, e.api, e.client, e.brokerConn) - http.Handle("/metrics", promhttp.Handler()) + return nil +} - err := http.ListenAndServe(fmt.Sprintf(":%v", port), nil) +func (e *exporter) ensureConsensusClients(ctx context.Context) error { + for { + if e.client != nil { + _, isProvider := e.client.(eth2client.NodeSyncingProvider) + if isProvider { + break + } + } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + time.Sleep(1 * time.Second) + + if err := e.bootstrapConsensusClients(ctx); err != nil { + e.log.WithError(err).Error("failed to bootstrap consensus node") - return err + continue + } + + break + } + } + + return nil +} + +func (e *exporter) startConsensusExporter(ctx context.Context) error { + if err := e.ensureConsensusClients(ctx); err != nil { + return err + } + + e.log.Info("Starting consensus metrics...") + + conMetrics := consensus.NewMetrics(e.client, e.api, e.beacon, e.log.WithField("exporter", "consensus"), e.config.Consensus.Name, fmt.Sprintf("%s_con", e.namespace)) + + e.consensus = conMetrics + + return nil +} + +func (e *exporter) startPairExporter(ctx context.Context) error { + if err := e.ensureConsensusClients(ctx); err != nil { + return err + } + + pairMetrics, err := pair.NewMetrics(ctx, e.log.WithField("exporter", "pair"), fmt.Sprintf("%s_pair", e.namespace), e.beacon, e.config.Execution.URL) + if err != nil { + return err + } + + e.pairMetrics = pairMetrics + + return nil } diff --git a/pkg/exporter/pair/pair.go b/pkg/exporter/pair/pair.go index 9df2fca..0fef37a 100644 --- a/pkg/exporter/pair/pair.go +++ b/pkg/exporter/pair/pair.go @@ -3,18 +3,14 @@ package pair import ( "context" "errors" - "fmt" "math/big" "time" - eth2client "github.com/attestantio/go-eth2-client" - "github.com/attestantio/go-eth2-client/http" "github.com/ethereum/go-ethereum/ethclient" "github.com/onrik/ethrpc" "github.com/prometheus/client_golang/prometheus" - "github.com/rs/zerolog" + "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon" "github.com/sirupsen/logrus" - "github.com/spf13/cast" ) // Metrics reports pair metrics @@ -29,30 +25,27 @@ type pair struct { consensusMechanism *prometheus.GaugeVec executionClient *ethclient.Client - consensusClient eth2client.Service + beacon beacon.Node ethrpcClient *ethrpc.EthRPC bootstrapped bool - consensusURL string executionURL string - totalDifficulty *big.Int - terminalTotalDifficulty *big.Int - networkID *big.Int + totalDifficulty *big.Int + networkID *big.Int networkIDFetchedAt time.Time - ttdFetchedAt time.Time tdFetchedAt time.Time } // NewMetrics returns a new Metrics instance. -func NewMetrics(ctx context.Context, log logrus.FieldLogger, namespace, consensusURL, executionURL string) (Metrics, error) { +func NewMetrics(ctx context.Context, log logrus.FieldLogger, namespace string, beac beacon.Node, executionURL string) (Metrics, error) { p := &pair{ log: log, executionURL: executionURL, - consensusURL: consensusURL, - consensusClient: nil, + beacon: beac, + executionClient: nil, ethrpcClient: nil, @@ -70,8 +63,7 @@ func NewMetrics(ctx context.Context, log logrus.FieldLogger, namespace, consensu }, ), - totalDifficulty: big.NewInt(0), - terminalTotalDifficulty: big.NewInt(0), + totalDifficulty: big.NewInt(0), } prometheus.MustRegister(p.consensusMechanism) @@ -80,16 +72,6 @@ func NewMetrics(ctx context.Context, log logrus.FieldLogger, namespace, consensu } func (p *pair) Bootstrap(ctx context.Context) error { - consenusClient, err := http.New(ctx, - http.WithAddress(p.consensusURL), - http.WithLogLevel(zerolog.Disabled), - ) - if err != nil { - return err - } - - p.consensusClient = consenusClient - executionClient, err := ethclient.Dial(p.executionURL) if err != nil { return err @@ -117,12 +99,6 @@ func (p *pair) StartAsync(ctx context.Context) { } } - if time.Since(p.ttdFetchedAt) > 15*time.Minute { - if err := p.fetchTerminalTotalDifficulty(ctx); err != nil { - p.log.WithError(err).Error("Failed to fetch terminal total difficulty") - } - } - if time.Since(p.tdFetchedAt) > 12*time.Second { if err := p.fetchTotalDifficulty(ctx); err != nil { p.log.WithError(err).Error("Failed to fetch total difficulty") @@ -174,45 +150,16 @@ func (p *pair) fetchNetworkID(ctx context.Context) error { return nil } -func (p *pair) fetchTerminalTotalDifficulty(ctx context.Context) error { - provider, isProvider := p.consensusClient.(eth2client.SpecProvider) - if !isProvider { - return errors.New("client does not implement eth2client.SpecProvider") - } - - spec, err := provider.Spec(ctx) +func (p *pair) deriveConsensusMechanism(ctx context.Context) error { + spec, err := p.beacon.GetSpec(ctx) if err != nil { return err } - terminalTotalDifficulty, exists := spec["TERMINAL_TOTAL_DIFFICULTY"] - if !exists { - return errors.New("TERMINAL_TOTAL_DIFFICULTY not found in spec") - } - - ttd := cast.ToString(fmt.Sprintf("%v", terminalTotalDifficulty)) - - asBigInt, success := big.NewInt(0).SetString(ttd, 10) - if !success { - return errors.New("TERMINAL_TOTAL_DIFFICULTY not a valid integer") - } - - p.terminalTotalDifficulty = asBigInt - - p.ttdFetchedAt = time.Now() - - return nil -} - -func (p *pair) deriveConsensusMechanism(ctx context.Context) error { if p.totalDifficulty == big.NewInt(0) { return errors.New("total difficulty not fetched") } - if p.terminalTotalDifficulty == big.NewInt(0) { - return errors.New("terminal total difficulty not fetched") - } - if p.networkID == big.NewInt(0) { return errors.New("network ID not fetched") } @@ -224,7 +171,7 @@ func (p *pair) deriveConsensusMechanism(ctx context.Context) error { consensusMechanism = value } - if p.totalDifficulty.Cmp(p.terminalTotalDifficulty) >= 0 { + if p.totalDifficulty.Cmp(&spec.TerminalTotalDifficulty) >= 0 { consensusMechanism = ProofOfStake }