Skip to content

Commit

Permalink
Merge pull request #4 from Cerebellum-Network/feature/multiple-blockc…
Browse files Browse the repository at this point in the history
…hains

Base processor and data duplication removal
  • Loading branch information
khssnv authored Aug 9, 2024
2 parents 06eb8d1 + 9452a10 commit 52f71c7
Show file tree
Hide file tree
Showing 12 changed files with 115 additions and 92 deletions.
39 changes: 13 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
# Cere Squid Indexer

This is an indexer for [Cerebellum Network](https://www.cere.network/) built using [Subsquid](https://subsquid.io/) [squid-sdk](https://github.com/subsquid/squid-sdk).

## Indexed data

1. Balance transfers,
2. DDC Customers bucket owners relation to bucket IDs.
This is an indexer for [Cerebellum Network](https://www.cere.network/) blockchain built using [Subsquid](https://subsquid.io/) [squid-sdk](https://github.com/subsquid/squid-sdk).

## Usage

Expand All @@ -16,14 +11,8 @@ Please [install](https://docs.subsquid.io/squid-cli/installation/) it before pro
# 1. Install dependencies
npm ci

# 2. Start target Postgres database from `docker-compose.yml` and detach
# 2. Start target Postgres database, processor, and GraphQL server from `docker-compose.yml` and detach
sqd up

# 3. Apply database migrations, load .env, and start the squid processor
sqd process

# 4. Start the GraphQL server
sqd serve
```

A GraphiQL playground will be available at [127.0.0.1:4350/graphql](http://localhost:4350/graphql).
Expand Down Expand Up @@ -51,25 +40,19 @@ All database changes are applied through migration files located at `db/migratio
It is all [TypeORM](https://typeorm.io/#/migrations) under the hood.

```bash
# Update database connection credentials in .env and load them.
source .env

# Connect to database, analyze its state and generate migration to match the target schema.
# The target schema is derived from entity classes generated earlier.
# Don't forget to compile your entity classes beforehand!
npx squid-typeorm-migration generate

# Create template file for custom database changes
npx squid-typeorm-migration create

# Apply database migrations from `db/migrations`
npx squid-typeorm-migration apply

# Revert the last performed migration
npx squid-typeorm-migration revert
sqd generate
```

### 4. Extend processor

Now extend the processor by adding your event listeners and RPC requests to `getEventsInfo` in `src/main.ts` to compose
in-memory data for the indexer and persist it in the `processor.run` callback.
Create a new processor, or extend an existing one, with new event listeners and RPC requests in `src/processors`.
Then add the processor's state persistence to the `processor.run` callback.

## Project conventions

Expand All @@ -91,5 +74,9 @@ specification file with the runtime metadata.
Create blockchain specification file for `typegen.json` using [substrate-metadata-explorer](https://github.com/subsquid/squid-sdk/tree/master/substrate/substrate-metadata-explorer).

```bash
npx @subsquid/substrate-metadata-explorer --rpc $RPC_WS --out specs/out.jsonl
# Export new runtime metadata
npx @subsquid/substrate-metadata-explorer --rpc $RPC_WS --fromBlock $FROM_BLOCK_NUMBER --out specs/out.jsonl

# Add new runtime metadata to existing file used in `typegen.json` and regenerate types.
npx squid-substrate-typegen ./typegen.json
```
2 changes: 1 addition & 1 deletion commands.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
},
"migration:generate": {
"description": "Generate a DB migration matching the TypeORM entities",
"deps": ["build", "migration:clean"],
"deps": ["build"],
"cmd": ["squid-typeorm-migration", "generate"]
},
"migration:clean": {
Expand Down
15 changes: 15 additions & 0 deletions db/migrations/1723182802615-Data.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module.exports = class Data1723182802615 {
name = 'Data1723182802615'

async up(db) {
await db.query(`DROP INDEX "public"."IDX_16f64a1893b3288a5481259a36"`)
await db.query(`ALTER TABLE "ddc_bucket" DROP COLUMN "bucket_id"`)
}

async down(db) {
await db.query(`CREATE UNIQUE INDEX "IDX_16f64a1893b3288a5481259a36" ON "ddc_bucket" ("bucket_id") `)
await db.query(`ALTER TABLE "ddc_bucket" ADD "bucket_id" numeric`)
await db.query(`UPDATE "ddc_bucket" SET "bucket_id" = CAST("id" as numeric)`)
await db.query(`ALTER TABLE "ddc_bucket" ALTER COLUMN "bucket_id" SET NOT NULL`)
}
}
1 change: 0 additions & 1 deletion schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ enum DdcNodeMode {
type DdcBucket @entity {
id: ID!

bucketId: BigInt! @index @unique
ownerId: Account! @index
clusterId: DdcCluster! @index

Expand Down
24 changes: 13 additions & 11 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,23 @@ processor.run(new TypeormDatabase({ supportHotBlocks: true }), async (ctx) => {
logger.debug(
`Received event ${event.name} at block ${block.height} (${block.hash})`,
)
await cereBalancesProcessor.process(event, block)
await ddcBalancesProcessor.process(event, block)
await ddcClustersProcessor.process(event, block)
await ddcNodesProcessor.process(event, block)
await ddcBucketsProcessor.process(event, block)

await Promise.all([
cereBalancesProcessor.process(event, block),
ddcBalancesProcessor.process(event, block),
ddcClustersProcessor.process(event, block),
ddcNodesProcessor.process(event, block),
ddcBucketsProcessor.process(event, block),
])
}
}

// retrieving state from processors
const accountToCereBalance = cereBalancesProcessor.getState()
const accountToDdcBalance = ddcBalancesProcessor.getState()
const ddcClusters = ddcClustersProcessor.getState()
const ddcNodes = ddcNodesProcessor.getState()
const ddcBuckets = ddcBucketsProcessor.getState()
const accountToCereBalance = cereBalancesProcessor.state
const accountToDdcBalance = ddcBalancesProcessor.state
const ddcClusters = ddcClustersProcessor.state
const ddcNodes = ddcNodesProcessor.state
const ddcBuckets = ddcBucketsProcessor.state

// create missing accounts
const accounts = new Map<string, Account>()
Expand Down Expand Up @@ -222,7 +225,6 @@ processor.run(new TypeormDatabase({ supportHotBlocks: true }), async (ctx) => {
ddcBucketEntities.push(
new DdcBucket({
id: bucket.bucketId.toString(),
bucketId: bucket.bucketId,
ownerId: accounts.get(bucket.ownerId),
clusterId: cluster,
isPublic: bucket.isPublic,
Expand Down
6 changes: 1 addition & 5 deletions src/model/generated/ddcBucket.model.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Entity as Entity_, Column as Column_, PrimaryColumn as PrimaryColumn_, BigIntColumn as BigIntColumn_, Index as Index_, ManyToOne as ManyToOne_, BooleanColumn as BooleanColumn_} from "@subsquid/typeorm-store"
import {Entity as Entity_, Column as Column_, PrimaryColumn as PrimaryColumn_, ManyToOne as ManyToOne_, Index as Index_, BooleanColumn as BooleanColumn_, BigIntColumn as BigIntColumn_} from "@subsquid/typeorm-store"
import {Account} from "./account.model"
import {DdcCluster} from "./ddcCluster.model"

Expand All @@ -11,10 +11,6 @@ export class DdcBucket {
@PrimaryColumn_()
id!: string

@Index_({unique: true})
@BigIntColumn_({nullable: false})
bucketId!: bigint

@Index_()
@ManyToOne_(() => Account, {nullable: true})
ownerId!: Account
Expand Down
20 changes: 12 additions & 8 deletions src/processors/cereBalancesProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@ import {
throwUnsupportedStorageSpec,
toCereAddress,
} from '../utils'
import { BaseProcessor } from './processor'

export class CereBalancesProcessor {
private state = new Map<string, bigint>()
type State = Map<string, bigint>

export class CereBalancesProcessor extends BaseProcessor<State> {
constructor() {
super(new Map<string, bigint>())
}

private async processBalancesEvent(accountId: string, block: BlockHeader) {
try {
Expand All @@ -32,26 +37,25 @@ export class CereBalancesProcessor {
throwUnsupportedStorageSpec(block)
}
if (accountInStorage) {
this.state.set(
this._state.set(
toCereAddress(accountId),
accountInStorage.data.free,
)
} else {
logStorageError('account', accountId, block)
}
} catch (error) {
if (error?.toString() === 'Error: Unexpected EOF' || error?.toString() === 'Error: Unprocessed data left') {
if (
error?.toString() === 'Error: Unexpected EOF' ||
error?.toString() === 'Error: Unprocessed data left'
) {
// some accounts in old blocks can not be parsed, just ignore them
} else {
throw error
}
}
}

getState(): Map<string, bigint> {
return this.state
}

async process(event: Event, block: BlockHeader) {
switch (event.name) {
case events.balances.endowed.name: {
Expand Down
15 changes: 8 additions & 7 deletions src/processors/ddcBalancesProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@ import {
throwUnsupportedStorageSpec,
toCereAddress,
} from '../utils'
import { BaseProcessor } from './processor'

export class DdcBalancesProcessor {
private state = new Map<string, bigint>()
type State = Map<string, bigint>

export class DdcBalancesProcessor extends BaseProcessor<State> {
constructor() {
super(new Map<string, bigint>())
}

private async processDdcCustomersBalancesEvents(
accountId: string,
Expand All @@ -24,16 +29,12 @@ export class DdcBalancesProcessor {
throwUnsupportedStorageSpec(block)
}
if (accountInStorage) {
this.state.set(toCereAddress(accountId), accountInStorage.active)
this._state.set(toCereAddress(accountId), accountInStorage.active)
} else {
logStorageError('DDC Customer ledger', accountId, block)
}
}

getState(): Map<string, bigint> {
return this.state
}

async process(event: Event, block: BlockHeader) {
switch (event.name) {
case events.ddcCustomers.deposited.name: {
Expand Down
15 changes: 8 additions & 7 deletions src/processors/ddcBucketsProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
throwUnsupportedStorageSpec,
toCereAddress,
} from '../utils'
import { BaseProcessor } from './processor'

export interface DdcBucketInfo {
ownerId: string
Expand All @@ -19,8 +20,12 @@ export interface DdcBucketInfo {
numberOfGets: bigint
}

export class DdcBucketsProcessor {
private state = new Map<bigint, DdcBucketInfo>()
type State = Map<bigint, DdcBucketInfo>

export class DdcBucketsProcessor extends BaseProcessor<State> {
constructor() {
super(new Map<bigint, DdcBucketInfo>())
}

private async processDdcBucketsEvents(
bucketId: bigint,
Expand Down Expand Up @@ -107,16 +112,12 @@ export class DdcBucketsProcessor {
}
if (bucketInfo) {
bucketInfo.ownerId = toCereAddress(bucketInfo.ownerId)
this.state.set(bucketId, bucketInfo)
this._state.set(bucketId, bucketInfo)
} else {
logStorageError('bucket', bucketId, block)
}
}

getState(): Map<bigint, DdcBucketInfo> {
return this.state
}

async process(event: Event, block: BlockHeader) {
switch (event.name) {
case events.ddcCustomers.bucketCreated.name: {
Expand Down
15 changes: 8 additions & 7 deletions src/processors/ddcClustersProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
toCereAddress,
} from '../utils'
import { DdcClusterStatus } from '../model'
import { BaseProcessor } from './processor'

export interface DdcClusterInfo {
id: string
Expand All @@ -30,8 +31,12 @@ export interface DdcClusterInfo {
status: DdcClusterStatus
}

export class DdcClustersProcessor {
private state = new Map<string, DdcClusterInfo>()
type State = Map<string, DdcClusterInfo>

export class DdcClustersProcessor extends BaseProcessor<State> {
constructor() {
super(new Map<string, DdcClusterInfo>())
}

private newClusterInfo(
clusterId: string,
Expand Down Expand Up @@ -173,16 +178,12 @@ export class DdcClustersProcessor {
clusterInfo.unitPerGetRequest =
clusterGovParams.unitPerGetRequest
}
this.state.set(clusterId, clusterInfo)
this._state.set(clusterId, clusterInfo)
} else {
logStorageError('DDC cluster', clusterId, block)
}
}

getState(): Map<string, DdcClusterInfo> {
return this.state
}

async process(event: Event, block: BlockHeader) {
switch (event.name) {
case events.ddcClusters.clusterCreated.name: {
Expand Down
Loading

0 comments on commit 52f71c7

Please sign in to comment.