From 63ebc1a9b4cbfab19b54b59961df7c054f2ce107 Mon Sep 17 00:00:00 2001 From: "terry.hung" Date: Thu, 18 Apr 2024 00:19:40 +0800 Subject: [PATCH 1/2] feat: add builtin-actor-event task --- chain/datasource/datasource.go | 4 + chain/indexer/integrated/processor/state.go | 3 + .../processor/state_internal_test.go | 2 +- .../integrated/processor/state_test.go | 2 +- chain/indexer/tasktype/table_tasks.go | 5 + chain/indexer/tasktype/tasks.go | 1 + chain/indexer/tasktype/tasks_test.go | 2 +- go.mod | 2 + go.sum | 4 + lens/interface.go | 5 + lens/lily/api.go | 1 + lens/lily/impl.go | 5 + lens/lily/struct.go | 5 + .../actors/builtinactor/builtinactorevents.go | 43 +++ schemas/v1/36_builtin_actor_event.go | 18 ++ tasks/api.go | 1 + tasks/messages/builtinactorevent/task.go | 265 ++++++++++++++++++ 17 files changed, 365 insertions(+), 3 deletions(-) create mode 100644 model/actors/builtinactor/builtinactorevents.go create mode 100644 schemas/v1/36_builtin_actor_event.go create mode 100644 tasks/messages/builtinactorevent/task.go diff --git a/chain/datasource/datasource.go b/chain/datasource/datasource.go index 7dffdbf9..efcc88c2 100644 --- a/chain/datasource/datasource.go +++ b/chain/datasource/datasource.go @@ -172,6 +172,10 @@ func (t *DataSource) EthGetTransactionReceipt(ctx context.Context, txHash ethtyp return t.node.EthGetTransactionReceipt(ctx, txHash) } +func (t *DataSource) GetActorEventsRaw(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) { + return t.node.GetActorEventsRaw(ctx, filter) +} + // TipSetMessageReceipts returns the blocks and messages in `pts` and their corresponding receipts from `ts` matching block order in tipset (`pts`). // TODO replace with lotus chainstore method when https://github.com/filecoin-project/lotus/pull/9186 lands func (t *DataSource) TipSetMessageReceipts(ctx context.Context, ts, pts *types.TipSet) ([]*lens.BlockMessageReceipts, error) { diff --git a/chain/indexer/integrated/processor/state.go b/chain/indexer/integrated/processor/state.go index 97ea60d2..f7b38794 100644 --- a/chain/indexer/integrated/processor/state.go +++ b/chain/indexer/integrated/processor/state.go @@ -31,6 +31,7 @@ import ( "github.com/filecoin-project/lily/tasks" "github.com/filecoin-project/lily/tasks/messageexecutions/vm" "github.com/filecoin-project/lily/tasks/messages/actorevent" + "github.com/filecoin-project/lily/tasks/messages/builtinactorevent" "github.com/filecoin-project/lily/tasks/messages/messageparam" "github.com/filecoin-project/lily/tasks/messages/receiptreturn" @@ -779,6 +780,8 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces out.TipsetsProcessors[t] = vm.NewTask(api) case tasktype.ActorEvent: out.TipsetsProcessors[t] = actorevent.NewTask(api) + case tasktype.BuiltInActorEvent: + out.TipsetsProcessors[t] = builtinactorevent.NewTask(api) case tasktype.ReceiptReturn: out.TipsetsProcessors[t] = receiptreturn.NewTask(api) diff --git a/chain/indexer/integrated/processor/state_internal_test.go b/chain/indexer/integrated/processor/state_internal_test.go index a910930e..0654cb32 100644 --- a/chain/indexer/integrated/processor/state_internal_test.go +++ b/chain/indexer/integrated/processor/state_internal_test.go @@ -52,7 +52,7 @@ func TestNewProcessor(t *testing.T) { require.Equal(t, t.Name(), proc.name) require.Len(t, proc.actorProcessors, 25) require.Len(t, proc.tipsetProcessors, 10) - require.Len(t, proc.tipsetsProcessors, 14) + require.Len(t, proc.tipsetsProcessors, 15) require.Len(t, proc.builtinProcessors, 1) require.Equal(t, gasoutput.NewTask(nil), proc.tipsetsProcessors[tasktype.GasOutputs]) diff --git a/chain/indexer/integrated/processor/state_test.go b/chain/indexer/integrated/processor/state_test.go index 95ce406c..78aa6204 100644 --- a/chain/indexer/integrated/processor/state_test.go +++ b/chain/indexer/integrated/processor/state_test.go @@ -441,6 +441,6 @@ func TestMakeProcessorsAllTasks(t *testing.T) { require.NoError(t, err) require.Len(t, proc.ActorProcessors, 25) require.Len(t, proc.TipsetProcessors, 10) - require.Len(t, proc.TipsetsProcessors, 14) + require.Len(t, proc.TipsetsProcessors, 15) require.Len(t, proc.ReportProcessors, 1) } diff --git a/chain/indexer/tasktype/table_tasks.go b/chain/indexer/tasktype/table_tasks.go index 9efe6bed..f5b39e3d 100644 --- a/chain/indexer/tasktype/table_tasks.go +++ b/chain/indexer/tasktype/table_tasks.go @@ -53,6 +53,7 @@ const ( FEVMTrace = "fevm_traces" FEVMActorDump = "fevm_actor_dumps" MinerActorDump = "miner_actor_dumps" + BuiltInActorEvent = "builtin_actor_event" ) var AllTableTasks = []string{ @@ -107,6 +108,7 @@ var AllTableTasks = []string{ FEVMTrace, FEVMActorDump, MinerActorDump, + BuiltInActorEvent, } var TableLookup = map[string]struct{}{ @@ -161,6 +163,7 @@ var TableLookup = map[string]struct{}{ FEVMTrace: {}, FEVMActorDump: {}, MinerActorDump: {}, + BuiltInActorEvent: {}, } var TableComment = map[string]string{ @@ -215,6 +218,7 @@ var TableComment = map[string]string{ FEVMTrace: ``, FEVMActorDump: ``, MinerActorDump: ``, + BuiltInActorEvent: ``, } var TableFieldComments = map[string]map[string]string{ @@ -442,4 +446,5 @@ var TableFieldComments = map[string]map[string]string{ "RawBytePower": "Claims", "TotalLockedFunds": "Locked Funds", }, + BuiltInActorEvent: {}, } diff --git a/chain/indexer/tasktype/tasks.go b/chain/indexer/tasktype/tasks.go index cb467af1..45081cbe 100644 --- a/chain/indexer/tasktype/tasks.go +++ b/chain/indexer/tasktype/tasks.go @@ -78,6 +78,7 @@ var TaskLookup = map[string][]string{ ActorEvent, MessageParam, ReceiptReturn, + BuiltInActorEvent, }, ChainEconomicsTask: { ChainEconomics, diff --git a/chain/indexer/tasktype/tasks_test.go b/chain/indexer/tasktype/tasks_test.go index 6ef4e84f..b312d852 100644 --- a/chain/indexer/tasktype/tasks_test.go +++ b/chain/indexer/tasktype/tasks_test.go @@ -101,7 +101,7 @@ func TestMakeAllTaskAliasNames(t *testing.T) { } func TestMakeAllTaskNames(t *testing.T) { - const TotalTableTasks = 51 + const TotalTableTasks = 52 actual, err := tasktype.MakeTaskNames(tasktype.AllTableTasks) require.NoError(t, err) // if this test fails it means a new task name was added, update the above test diff --git a/go.mod b/go.mod index dd66f4fb..92e49758 100644 --- a/go.mod +++ b/go.mod @@ -142,6 +142,7 @@ require ( github.com/filecoin-project/specs-actors/v8 v8.0.1 // indirect github.com/flynn/noise v1.1.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect + github.com/fxamacker/cbor/v2 v2.6.0 // indirect github.com/gammazero/deque v0.2.1 // indirect github.com/gbrlsnchs/jwt/v3 v3.0.1 // indirect github.com/gdamore/encoding v1.0.0 // indirect @@ -315,6 +316,7 @@ require ( github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect github.com/whyrusleeping/ledger-filecoin-go v0.9.1-0.20201010031517-c3dcc1bddce4 // indirect github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect + github.com/x448/float16 v0.8.4 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/zondax/hid v0.9.1 // indirect github.com/zondax/ledger-go v0.12.1 // indirect diff --git a/go.sum b/go.sum index 65e1d51e..00abe807 100644 --- a/go.sum +++ b/go.sum @@ -391,6 +391,8 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/fxamacker/cbor/v2 v2.6.0 h1:sU6J2usfADwWlYDAFhZBQ6TnLFBHxgesMrQfQgk1tWA= +github.com/fxamacker/cbor/v2 v2.6.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0= github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44HWy9Q= @@ -1702,6 +1704,8 @@ github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84 github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= +github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= +github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xorcare/golden v0.6.0/go.mod h1:7T39/ZMvaSEZlBPoYfVFmsBLmUl3uz9IuzWj/U6FtvQ= github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542 h1:oWgZJmC1DorFZDpfMfWg7xk29yEOZiXmo/wZl+utTI8= diff --git a/lens/interface.go b/lens/interface.go index 7d850d93..e8221feb 100644 --- a/lens/interface.go +++ b/lens/interface.go @@ -24,6 +24,7 @@ type API interface { StateAPI VMAPI EthModuleAPI + ActorEventAPI GetMessageExecutionsForTipSet(ctx context.Context, ts, pts *types.TipSet) ([]*MessageExecution, error) } @@ -86,6 +87,10 @@ type EthModuleAPI interface { EthGetTransactionByHash(ctx context.Context, txHash *ethtypes.EthHash) (*ethtypes.EthTx, error) } +type ActorEventAPI interface { + GetActorEventsRaw(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) +} + type MessageExecution struct { Cid cid.Cid StateRoot cid.Cid diff --git a/lens/lily/api.go b/lens/lily/api.go index fa163391..2dbd5499 100644 --- a/lens/lily/api.go +++ b/lens/lily/api.go @@ -62,6 +62,7 @@ type LilyAPI interface { ChainGetMessagesInTipset(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error) //perm:read EthGetTransactionByHash(ctx context.Context, txHash *ethtypes.EthHash) (*ethtypes.EthTx, error) //perm:read StateListActors(ctx context.Context, tsk types.TipSetKey) ([]address.Address, error) //perm:read + GetActorEventsRaw(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) //perm:read // SyncIncomingBlocks returns a channel streaming incoming, potentially not // yet synced block headers. diff --git a/lens/lily/impl.go b/lens/lily/impl.go index c5bf02b9..5a3337c3 100644 --- a/lens/lily/impl.go +++ b/lens/lily/impl.go @@ -59,6 +59,7 @@ type LilyNodeAPI struct { full.StateAPI full.SyncAPI full.EthModuleAPI + full.ActorEventAPI common.CommonAPI Events *events.Events Scheduler *schedule.Scheduler @@ -575,6 +576,10 @@ func (m *LilyNodeAPI) StateListActors(ctx context.Context, tsk types.TipSetKey) return m.StateAPI.StateListActors(ctx, tsk) } +func (m *LilyNodeAPI) GetActorEventsRaw(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) { + return m.ActorEventAPI.GetActorEventsRaw(ctx, filter) +} + // MessagesForTipSetBlocks returns messages stored in the blocks of the specified tipset, messages may be duplicated // across the returned set of BlockMessages. func (m *LilyNodeAPI) MessagesForTipSetBlocks(ctx context.Context, ts *types.TipSet) ([]*lens.BlockMessages, error) { diff --git a/lens/lily/struct.go b/lens/lily/struct.go index db381bad..2ea0b730 100644 --- a/lens/lily/struct.go +++ b/lens/lily/struct.go @@ -71,6 +71,7 @@ type LilyAPIStruct struct { ChainGetMessagesInTipset func(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error) `perm:"read"` EthGetTransactionByHash func(ctx context.Context, txHash *ethtypes.EthHash) (*ethtypes.EthTx, error) `perm:"read"` StateListActors func(ctx context.Context, tsk types.TipSetKey) ([]address.Address, error) `perm:"read"` + GetActorEventsRaw func(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) `perm:"read"` // SyncIncomingBlocks returns a channel streaming incoming, potentially not // yet synced block headers. @@ -304,3 +305,7 @@ func (s *LilyAPIStruct) EthGetTransactionByHash(ctx context.Context, txHash *eth func (s *LilyAPIStruct) SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) { return s.Internal.SyncIncomingBlocks(ctx) } + +func (s *LilyAPIStruct) GetActorEventsRaw(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) { + return s.Internal.GetActorEventsRaw(ctx, filter) +} diff --git a/model/actors/builtinactor/builtinactorevents.go b/model/actors/builtinactor/builtinactorevents.go new file mode 100644 index 00000000..0353fcd5 --- /dev/null +++ b/model/actors/builtinactor/builtinactorevents.go @@ -0,0 +1,43 @@ +package builtinactor + +import ( + "context" + + "go.opencensus.io/tag" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + + "github.com/filecoin-project/lily/metrics" + "github.com/filecoin-project/lily/model" +) + +type BuiltInActorEvent struct { + tableName struct{} `pg:"builtin_actor_events"` // nolint: structcheck + + Height int64 `pg:",pk,notnull,use_zero"` + Cid string `pg:",pk,notnull"` + Emitter string `pg:",pk,notnull"` + EventType string `pg:",pk,notnull"` + EventEntries string `pg:",type:jsonb"` + EventPayload string `pg:",type:jsonb"` +} + +func (ds *BuiltInActorEvent) Persist(ctx context.Context, s model.StorageBatch, _ model.Version) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "builtin_actor_events")) + metrics.RecordCount(ctx, metrics.PersistModel, 1) + return s.PersistModel(ctx, ds) +} + +type BuiltInActorEvents []*BuiltInActorEvent + +func (dss BuiltInActorEvents) Persist(ctx context.Context, s model.StorageBatch, _ model.Version) error { + ctx, span := otel.Tracer("").Start(ctx, "BuiltInActorEvents.Persist") + if span.IsRecording() { + span.SetAttributes(attribute.Int("count", len(dss))) + } + defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "builtin_actor_events")) + metrics.RecordCount(ctx, metrics.PersistModel, len(dss)) + return s.PersistModel(ctx, dss) +} diff --git a/schemas/v1/36_builtin_actor_event.go b/schemas/v1/36_builtin_actor_event.go new file mode 100644 index 00000000..45a0b3fb --- /dev/null +++ b/schemas/v1/36_builtin_actor_event.go @@ -0,0 +1,18 @@ +package v1 + +func init() { + patches.Register( + 36, + ` + CREATE TABLE IF NOT EXISTS {{ .SchemaName | default "public"}}.builtin_actor_events ( + height BIGINT NOT NULL, + cid TEXT, + emitter TEXT, + event_type TEXT, + event_entries JSONB, + event_payload JSONB, + PRIMARY KEY(height, cid, emitter, event_type) + ); +`, + ) +} diff --git a/tasks/api.go b/tasks/api.go index 7757d6cd..afbdf6bc 100644 --- a/tasks/api.go +++ b/tasks/api.go @@ -86,6 +86,7 @@ type DataSource interface { ChainGetMessagesInTipset(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error) EthGetTransactionByHash(ctx context.Context, txHash *ethtypes.EthHash) (*ethtypes.EthTx, error) StateListActors(ctx context.Context, tsk types.TipSetKey) ([]address.Address, error) + GetActorEventsRaw(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) SetIdRobustAddressMap(ctx context.Context, tsk types.TipSetKey) error LookupRobustAddress(ctx context.Context, idAddr address.Address, tsk types.TipSetKey) (address.Address, error) diff --git a/tasks/messages/builtinactorevent/task.go b/tasks/messages/builtinactorevent/task.go new file mode 100644 index 00000000..a619ca2a --- /dev/null +++ b/tasks/messages/builtinactorevent/task.go @@ -0,0 +1,265 @@ +package builtinactorevent + +import ( + "context" + "fmt" + "strconv" + + b64 "encoding/base64" + "encoding/json" + + "github.com/filecoin-project/lily/model" + "github.com/filecoin-project/lily/model/actors/builtinactor" + visormodel "github.com/filecoin-project/lily/model/visor" + "github.com/filecoin-project/lily/tasks" + "github.com/filecoin-project/lotus/chain/types" + "github.com/fxamacker/cbor/v2" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" +) + +var log = logging.Logger("lily/tasks/mineractordump") + +type Task struct { + node tasks.DataSource +} + +func NewTask(node tasks.DataSource) *Task { + return &Task{ + node: node, + } +} + +var ( + fields map[string][]types.ActorEventBlock + convert map[string]string +) + +type KVEvent struct { + Key string + Value string +} + +const ( + INT = "int" + STRING = "string" + CID = "cid" + BIGINT = "bigint" +) + +func init() { + // ------------ fields ------------ + const ( + VerifierBalance = "cHZlcmlmaWVyLWJhbGFuY2U=" // verifier-balance + Allocation = "amFsbG9jYXRpb24=" // allocation + AllocationRemoved = "cmFsbG9jYXRpb24tcmVtb3ZlZA==" // allocation-removed + Claim = "ZWNsYWlt" // claim + ClaimUpdated = "bWNsYWltLXVwZGF0ZWQ=" // claim-updated + ClaimRemoved = "bWNsYWltLXJlbW92ZWQ=" // claim-removed + DealPublished = "ZGVhbC1wdWJsaXNoZWQ=" // deal-published + DealActivated = "bmRlYWwtYWN0aXZhdGVk" // deal-activated + DealTerminated = "b2RlYWwtdGVybWluYXRlZA==" // deal-terminated + DealCompleted = "bmRlYWwtY29tcGxldGVk" // deal-completed + SectorPrecommitted = "c3NlY3Rvci1wcmVjb21taXR0ZWQ=" // sector-precommitted + SectorActivated = "cHNlY3Rvci1hY3RpdmF0ZWQ=" // sector-activated + SectorUpdated = "bnNlY3Rvci11cGRhdGVk" // sector-updated + SectorTerminated = "cXNlY3Rvci10ZXJtaW5hdGVk" // sector-terminated + ) + + verifierBalanceByte, _ := b64.StdEncoding.DecodeString(VerifierBalance) + allocationByte, _ := b64.StdEncoding.DecodeString(Allocation) + allocationRemovedByte, _ := b64.StdEncoding.DecodeString(AllocationRemoved) + claimByte, _ := b64.StdEncoding.DecodeString(Claim) + claimUpdatedByte, _ := b64.StdEncoding.DecodeString(ClaimUpdated) + claimRemovedByte, _ := b64.StdEncoding.DecodeString(ClaimRemoved) + dealPublishedByte, _ := b64.StdEncoding.DecodeString(DealPublished) + dealActivatedByte, _ := b64.StdEncoding.DecodeString(DealActivated) + dealTerminatedByte, _ := b64.StdEncoding.DecodeString(DealTerminated) + dealCompletedByte, _ := b64.StdEncoding.DecodeString(DealCompleted) + sectorPrecommittedByte, _ := b64.StdEncoding.DecodeString(SectorPrecommitted) + sectorActivatedByte, _ := b64.StdEncoding.DecodeString(SectorActivated) + sectorUpdatedByte, _ := b64.StdEncoding.DecodeString(SectorUpdated) + sectorTerminatedByte, _ := b64.StdEncoding.DecodeString(SectorTerminated) + + fields = map[string][]types.ActorEventBlock{ + "$type": []types.ActorEventBlock{ + {81, verifierBalanceByte}, // verifier-balance + {81, allocationByte}, // allocation + {81, allocationRemovedByte}, // allocation-removed + {81, claimByte}, // claim + {81, claimUpdatedByte}, // claim-updated + {81, claimRemovedByte}, // claim-removed + {81, dealPublishedByte}, // deal-published + {81, dealActivatedByte}, // deal-activated + {81, dealTerminatedByte}, // deal-terminated + {81, dealCompletedByte}, // deal-completed + {81, sectorPrecommittedByte}, // sector-precommitted + {81, sectorActivatedByte}, // sector-activated + {81, sectorUpdatedByte}, // sector-updated + {81, sectorTerminatedByte}, // sector-terminated + }, + } + + // ------------ convert ------------ + // https://fips.filecoin.io/FIPS/fip-0083.html + convert = map[string]string{ + "$type": STRING, + "verifier": INT, + "client": INT, + "balance": BIGINT, + "id": INT, + "provider": INT, + "piece-cid": CID, + "piece-size": INT, + "term-min": INT, + "term-max": INT, + "expiration": INT, + "term-start": INT, + "sector": INT, + "unsealed-cid": CID, + } +} +func cborValueDecode(key string, value []byte) interface{} { + var ( + resultSTR string + resultINT int + resultBIGINT types.BigInt + resultCID cid.Cid + err error + ) + + switch convert[key] { + case STRING: + err = cbor.Unmarshal(value, &resultSTR) + if err != nil { + log.Errorf("cbor.Unmarshal err: %v, key: %v", err, key) + return nil + } + return resultSTR + case INT: + err = cbor.Unmarshal(value, &resultINT) + if err != nil { + log.Errorf("cbor.Unmarshal err: %v, key: %v", err, key) + return nil + } + return resultINT + case BIGINT: + err = cbor.Unmarshal(value, &resultBIGINT) + if err != nil { + log.Errorf("cbor.Unmarshal err: %v, key: %v", err, key) + return nil + } + return resultBIGINT + case CID: + err = cbor.Unmarshal(value, &resultCID) + if err != nil { + log.Errorf("cbor.Unmarshal err: %v, key: %v", err, key) + return nil + } + return resultCID + } + + return nil +} + +func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, executed *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) { + ctx, span := otel.Tracer("").Start(ctx, "ProcessTipSets") + if span.IsRecording() { + span.SetAttributes( + attribute.String("current", current.String()), + attribute.Int64("current_height", int64(current.Height())), + attribute.String("executed", executed.String()), + attribute.Int64("executed_height", int64(executed.Height())), + attribute.String("processor", "fevm_trace"), + ) + } + defer span.End() + errs := []error{} + + tsKey := executed.Key() + filter := &types.ActorEventFilter{ + TipSetKey: &tsKey, + Fields: fields, + } + + report := &visormodel.ProcessingReport{ + Height: int64(current.Height()), + StateRoot: current.ParentState().String(), + } + + events, err := t.node.GetActorEventsRaw(ctx, filter) + if err != nil { + log.Errorf("GetActorEventsRaw[pTs: %v, pHeight: %v, cTs: %v, cHeight: %v] err: %v", executed.Key().String(), executed.Height(), current.Key().String(), current.Height(), err) + errs = append(errs, err) + } + + log.Errorf("Get the events count: %v", len(events)) + + var ( + builtInActorResult = make(builtinactor.BuiltInActorEvents, 0) + ) + + for evtIdx, event := range events { + var eventsSlice []*KVEvent + var eventType string + actorEvent := make(map[string]interface{}) + + for entryIdx, e := range event.Entries { + if e.Codec != 0x51 { // 81 + log.Warnf("Codec not equal to cbor, height: %v, evtIdx: %v, emitter: %v, entryIdx: %v, e.Codec: %v", executed.Height(), evtIdx, event.Emitter.String(), entryIdx, e.Codec) + continue + } + + var kvEvent KVEvent + kvEvent.Key = e.Key + + v := cborValueDecode(e.Key, e.Value) + switch convert[e.Key] { + case STRING: + kvEvent.Value = v.(string) + if kvEvent.Key == "$type" { + eventType = kvEvent.Value + } + case INT: + kvEvent.Value = strconv.Itoa(v.(int)) + case BIGINT: + kvEvent.Value = v.(types.BigInt).String() + case CID: + if v != nil { + kvEvent.Value = v.(string) + } + } + if kvEvent.Key != "$type" { + actorEvent[kvEvent.Key] = kvEvent.Value + } + eventsSlice = append(eventsSlice, &kvEvent) + } + + obj := builtinactor.BuiltInActorEvent{ + Height: int64(executed.Height()), + Cid: event.MsgCid.String(), + Emitter: event.Emitter.String(), + EventType: eventType, + } + + re, jsonErr := json.Marshal(eventsSlice) + if jsonErr == nil { + obj.EventEntries = string(re) + } + + payload, jsonErr := json.Marshal(actorEvent) + if jsonErr == nil { + obj.EventPayload = string(payload) + } + + builtInActorResult = append(builtInActorResult, &obj) + } + + if len(errs) > 0 { + report.ErrorsDetected = fmt.Errorf("%v", errs) + } + + return builtInActorResult, report, nil +} From ec65a0440ab508493856da79098bde4f517eb03a Mon Sep 17 00:00:00 2001 From: "terry.hung" Date: Thu, 18 Apr 2024 11:50:25 +0800 Subject: [PATCH 2/2] fix the golint issue --- chain/indexer/tasktype/tasks_test.go | 3 ++- go.mod | 2 +- storage/sql.go | 2 ++ tasks/messages/builtinactorevent/task.go | 34 +++++++++++------------- 4 files changed, 21 insertions(+), 20 deletions(-) diff --git a/chain/indexer/tasktype/tasks_test.go b/chain/indexer/tasktype/tasks_test.go index b312d852..aa1a8a7a 100644 --- a/chain/indexer/tasktype/tasks_test.go +++ b/chain/indexer/tasktype/tasks_test.go @@ -55,7 +55,8 @@ func TestMakeTaskNamesAlias(t *testing.T) { }, { taskAlias: tasktype.MessagesTask, - tasks: []string{tasktype.Message, tasktype.ParsedMessage, tasktype.Receipt, tasktype.GasOutputs, tasktype.MessageGasEconomy, tasktype.BlockMessage, tasktype.ActorEvent, tasktype.MessageParam, tasktype.ReceiptReturn}, + tasks: []string{tasktype.Message, tasktype.ParsedMessage, tasktype.Receipt, tasktype.GasOutputs, tasktype.MessageGasEconomy, tasktype.BlockMessage, tasktype.ActorEvent, tasktype.MessageParam, tasktype.ReceiptReturn, + tasktype.BuiltInActorEvent}, }, { taskAlias: tasktype.ChainEconomicsTask, diff --git a/go.mod b/go.mod index 92e49758..936b4fc6 100644 --- a/go.mod +++ b/go.mod @@ -65,6 +65,7 @@ require ( github.com/DataDog/zstd v1.4.5 github.com/cenkalti/backoff/v4 v4.2.1 github.com/filecoin-project/go-amt-ipld/v4 v4.2.0 + github.com/fxamacker/cbor/v2 v2.6.0 github.com/hibiken/asynq v0.23.0 github.com/hibiken/asynq/x v0.0.0-20220413130846-5c723f597e01 github.com/ipfs/go-ipld-format v0.6.0 @@ -142,7 +143,6 @@ require ( github.com/filecoin-project/specs-actors/v8 v8.0.1 // indirect github.com/flynn/noise v1.1.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect - github.com/fxamacker/cbor/v2 v2.6.0 // indirect github.com/gammazero/deque v0.2.1 // indirect github.com/gbrlsnchs/jwt/v3 v3.0.1 // indirect github.com/gdamore/encoding v1.0.0 // indirect diff --git a/storage/sql.go b/storage/sql.go index fba9a625..f99354d0 100644 --- a/storage/sql.go +++ b/storage/sql.go @@ -17,6 +17,7 @@ import ( "github.com/filecoin-project/lily/model" "github.com/filecoin-project/lily/model/actordumps" + "github.com/filecoin-project/lily/model/actors/builtinactor" "github.com/filecoin-project/lily/model/actors/common" "github.com/filecoin-project/lily/model/actors/datacap" init_ "github.com/filecoin-project/lily/model/actors/init" @@ -104,6 +105,7 @@ var Models = []interface{}{ (*fevm.FEVMTrace)(nil), (*actordumps.FEVMActorDump)(nil), (*actordumps.MinerActorDump)(nil), + (*builtinactor.BuiltInActorEvent)(nil), } var log = logging.Logger("lily/storage") diff --git a/tasks/messages/builtinactorevent/task.go b/tasks/messages/builtinactorevent/task.go index a619ca2a..fc1d489f 100644 --- a/tasks/messages/builtinactorevent/task.go +++ b/tasks/messages/builtinactorevent/task.go @@ -84,21 +84,21 @@ func init() { sectorTerminatedByte, _ := b64.StdEncoding.DecodeString(SectorTerminated) fields = map[string][]types.ActorEventBlock{ - "$type": []types.ActorEventBlock{ - {81, verifierBalanceByte}, // verifier-balance - {81, allocationByte}, // allocation - {81, allocationRemovedByte}, // allocation-removed - {81, claimByte}, // claim - {81, claimUpdatedByte}, // claim-updated - {81, claimRemovedByte}, // claim-removed - {81, dealPublishedByte}, // deal-published - {81, dealActivatedByte}, // deal-activated - {81, dealTerminatedByte}, // deal-terminated - {81, dealCompletedByte}, // deal-completed - {81, sectorPrecommittedByte}, // sector-precommitted - {81, sectorActivatedByte}, // sector-activated - {81, sectorUpdatedByte}, // sector-updated - {81, sectorTerminatedByte}, // sector-terminated + "$type": { + {Codec: 81, Value: verifierBalanceByte}, // verifier-balance + {Codec: 81, Value: allocationByte}, // allocation + {Codec: 81, Value: allocationRemovedByte}, // allocation-removed + {Codec: 81, Value: claimByte}, // claim + {Codec: 81, Value: claimUpdatedByte}, // claim-updated + {Codec: 81, Value: claimRemovedByte}, // claim-removed + {Codec: 81, Value: dealPublishedByte}, // deal-published + {Codec: 81, Value: dealActivatedByte}, // deal-activated + {Codec: 81, Value: dealTerminatedByte}, // deal-terminated + {Codec: 81, Value: dealCompletedByte}, // deal-completed + {Codec: 81, Value: sectorPrecommittedByte}, // sector-precommitted + {Codec: 81, Value: sectorActivatedByte}, // sector-activated + {Codec: 81, Value: sectorUpdatedByte}, // sector-updated + {Codec: 81, Value: sectorTerminatedByte}, // sector-terminated }, } @@ -195,8 +195,6 @@ func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut errs = append(errs, err) } - log.Errorf("Get the events count: %v", len(events)) - var ( builtInActorResult = make(builtinactor.BuiltInActorEvents, 0) ) @@ -228,7 +226,7 @@ func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut kvEvent.Value = v.(types.BigInt).String() case CID: if v != nil { - kvEvent.Value = v.(string) + kvEvent.Value = v.(cid.Cid).String() } } if kvEvent.Key != "$type" {