From 5092fc41c77c7206a65637f3adc1c2cc4687b32a Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Tue, 29 Mar 2022 13:33:06 +0200 Subject: [PATCH] use snappy-framed format for compressing bellatrix+ database entries (#3551) `.era` files and Req/Resp protocols use framed formats - aligning the database with these makes for less recompression work overall as gossip is sent only once while req/resp repeats (potentially) - this also allows efficient pruning-to-era where snappy-recompression is the major cycle thief. --- beacon_chain/beacon_chain_db.nim | 198 ++++++++++++++++-- .../consensus_object_pools/blockchain_dag.nim | 14 +- ncli/e2store.nim | 4 +- ncli/ncli_db.nim | 2 +- tests/test_beacon_chain_db.nim | 17 +- vendor/nim-snappy | 2 +- 6 files changed, 207 insertions(+), 30 deletions(-) diff --git a/beacon_chain/beacon_chain_db.nim b/beacon_chain/beacon_chain_db.nim index 97592087b5..c7ba0af143 100644 --- a/beacon_chain/beacon_chain_db.nim +++ b/beacon_chain/beacon_chain_db.nim @@ -10,7 +10,7 @@ import std/[typetraits, tables], stew/[arrayops, assign2, byteutils, endians2, io2, objects, results], - serialization, chronicles, snappy, + serialization, chronicles, snappy, snappy/framing, eth/db/[kvstore, kvstore_sqlite3], ./networking/network_metadata, ./beacon_chain_db_immutable, ./spec/[eth2_ssz_serialization, eth2_merkleization, forks, state_transition], @@ -69,6 +69,9 @@ type ## 1.3 creates `kvstore` with rowid, making it quite fast, but doesn't do ## anything about existing databases. Versions after that use a separate ## file instead (V1) + ## + ## Starting with bellatrix, we store blocks and states using snappy framed + ## encoding so as to match the `Req`/`Resp` protocols and era files ("SZ"). backend: KvStoreRef # kvstore stateStore: KvStoreRef # state_no_validators @@ -521,6 +524,18 @@ proc decodeSnappySSZ[T](data: openArray[byte], output: var T): bool = err = e.msg, typ = name(T), dataLen = data.len false +proc decodeSZSSZ[T](data: openArray[byte], output: var T): bool = + try: + let decompressed = framingFormatUncompress(data) + readSszBytes(decompressed, output, updateRoot = false) + true + except CatchableError as e: + # If the data can't be deserialized, it could be because it's from a + # version of the software that uses a different SSZ encoding + warn "Unable to deserialize data, old database?", + err = e.msg, typ = name(T), dataLen = data.len + false + proc encodeSSZ(v: auto): seq[byte] = try: SSZ.encode(v) @@ -534,6 +549,14 @@ proc encodeSnappySSZ(v: auto): seq[byte] = # In-memory encode shouldn't fail! raiseAssert err.msg +proc encodeSZSSZ(v: auto): seq[byte] = + # https://github.com/google/snappy/blob/main/framing_format.txt + try: + framingFormatCompress(SSZ.encode(v)) + except CatchableError as err: + # In-memory encode shouldn't fail! + raiseAssert err.msg + proc getRaw(db: KvStoreRef, key: openArray[byte], T: type Eth2Digest): Opt[T] = var res: Opt[T] proc decode(data: openArray[byte]) = @@ -561,9 +584,7 @@ type GetResult = enum proc getSSZ[T](db: KvStoreRef, key: openArray[byte], output: var T): GetResult = var status = GetResult.notFound - # TODO address is needed because there's no way to express lifetimes in nim - # we'll use unsafeAddr to find the code later - var outputPtr = unsafeAddr output # callback is local, ptr wont escape + var outputPtr = addr output # callback is local, ptr wont escape proc decode(data: openArray[byte]) = status = if decodeSSZ(data, outputPtr[]): GetResult.found @@ -579,9 +600,7 @@ proc putSSZ(db: KvStoreRef, key: openArray[byte], v: auto) = proc getSnappySSZ[T](db: KvStoreRef, key: openArray[byte], output: var T): GetResult = var status = GetResult.notFound - # TODO address is needed because there's no way to express lifetimes in nim - # we'll use unsafeAddr to find the code later - var outputPtr = unsafeAddr output # callback is local, ptr wont escape + var outputPtr = addr output # callback is local, ptr wont escape proc decode(data: openArray[byte]) = status = if decodeSnappySSZ(data, outputPtr[]): GetResult.found @@ -594,6 +613,22 @@ proc getSnappySSZ[T](db: KvStoreRef, key: openArray[byte], output: var T): GetRe proc putSnappySSZ(db: KvStoreRef, key: openArray[byte], v: auto) = db.put(key, encodeSnappySSZ(v)).expectDb() +proc getSZSSZ[T](db: KvStoreRef, key: openArray[byte], output: var T): GetResult = + var status = GetResult.notFound + + var outputPtr = addr output # callback is local, ptr wont escape + proc decode(data: openArray[byte]) = + status = + if decodeSZSSZ(data, outputPtr[]): GetResult.found + else: GetResult.corrupted + + discard db.get(key, decode).expectDb() + + status + +proc putSZSSZ(db: KvStoreRef, key: openArray[byte], v: auto) = + db.put(key, encodeSZSSZ(v)).expectDb() + proc close*(db: BeaconChainDBV0) = discard db.stateStore.close() discard db.backend.close() @@ -628,11 +663,20 @@ proc putBeaconBlockSummary*( # Summaries are too simple / small to compress, store them as plain SSZ db.summaries.putSSZ(root.data, value) -proc putBlock*(db: BeaconChainDB, value: ForkyTrustedSignedBeaconBlock) = +proc putBlock*( + db: BeaconChainDB, + value: phase0.TrustedSignedBeaconBlock | altair.TrustedSignedBeaconBlock) = db.withManyWrites: db.blocks[type(value).toFork].putSnappySSZ(value.root.data, value) db.putBeaconBlockSummary(value.root, value.message.toBeaconBlockSummary()) +proc putBlock*( + db: BeaconChainDB, + value: bellatrix.TrustedSignedBeaconBlock) = + db.withManyWrites: + db.blocks[type(value).toFork].putSZSSZ(value.root.data, value) + db.putBeaconBlockSummary(value.root, value.message.toBeaconBlockSummary()) + proc updateImmutableValidators*( db: BeaconChainDB, validators: openArray[Validator]) = # Must be called before storing a state that references the new validators @@ -659,11 +703,18 @@ template toBeaconStateNoImmutableValidators(state: bellatrix.BeaconState): BellatrixBeaconStateNoImmutableValidators = isomorphicCast[BellatrixBeaconStateNoImmutableValidators](state) -proc putState*(db: BeaconChainDB, key: Eth2Digest, value: ForkyBeaconState) = +proc putState*( + db: BeaconChainDB, key: Eth2Digest, + value: phase0.BeaconState | altair.BeaconState) = db.updateImmutableValidators(value.validators.asSeq()) db.statesNoVal[type(value).toFork()].putSnappySSZ( key.data, toBeaconStateNoImmutableValidators(value)) +proc putState*(db: BeaconChainDB, key: Eth2Digest, value: bellatrix.BeaconState) = + db.updateImmutableValidators(value.validators.asSeq()) + db.statesNoVal[type(value).toFork()].putSZSSZ( + key.data, toBeaconStateNoImmutableValidators(value)) + proc putState*(db: BeaconChainDB, state: ForkyHashedBeaconState) = db.withManyWrites: db.putStateRoot(state.latest_block_root, state.data.slot, state.root) @@ -740,31 +791,44 @@ proc getBlock*( # set root after deserializing (so it doesn't get zeroed) result.get().root = key +proc getBlock*( + db: BeaconChainDB, key: Eth2Digest, + T: type altair.TrustedSignedBeaconBlock): Opt[T] = + # We only store blocks that we trust in the database + result.ok(default(T)) + if db.blocks[T.toFork].getSnappySSZ(key.data, result.get) == GetResult.found: + # set root after deserializing (so it doesn't get zeroed) + result.get().root = key + else: + result.err() + proc getBlock*[ - X: altair.TrustedSignedBeaconBlock | bellatrix.TrustedSignedBeaconBlock]( + X: bellatrix.TrustedSignedBeaconBlock]( db: BeaconChainDB, key: Eth2Digest, T: type X): Opt[T] = # We only store blocks that we trust in the database result.ok(default(T)) - if db.blocks[T.toFork].getSnappySSZ(key.data, result.get) == GetResult.found: + if db.blocks[T.toFork].getSZSSZ(key.data, result.get) == GetResult.found: # set root after deserializing (so it doesn't get zeroed) result.get().root = key else: result.err() -proc getPhase0BlockSSZ(db: BeaconChainDBV0, key: Eth2Digest, data: var seq[byte]): bool = - let dataPtr = unsafeAddr data # Short-lived +proc getPhase0BlockSSZ( + db: BeaconChainDBV0, key: Eth2Digest, data: var seq[byte]): bool = + let dataPtr = addr data # Short-lived var success = true proc decode(data: openArray[byte]) = try: dataPtr[] = snappy.decode(data, maxDecompressedDbRecordSize) except CatchableError: success = false - db.backend.get(subkey(phase0.SignedBeaconBlock, key), decode).expectDb() and success + db.backend.get(subkey(phase0.SignedBeaconBlock, key), decode).expectDb() and + success # SSZ implementations are separate so as to avoid unnecessary data copies proc getBlockSSZ*( db: BeaconChainDB, key: Eth2Digest, data: var seq[byte], T: type phase0.TrustedSignedBeaconBlock): bool = - let dataPtr = unsafeAddr data # Short-lived + let dataPtr = addr data # Short-lived var success = true proc decode(data: openArray[byte]) = try: dataPtr[] = snappy.decode(data, maxDecompressedDbRecordSize) @@ -772,17 +836,26 @@ proc getBlockSSZ*( db.blocks[BeaconBlockFork.Phase0].get(key.data, decode).expectDb() and success or db.v0.getPhase0BlockSSZ(key, data) -proc getBlockSSZ*[ - X: altair.TrustedSignedBeaconBlock | bellatrix.TrustedSignedBeaconBlock]( +proc getBlockSSZ*( db: BeaconChainDB, key: Eth2Digest, data: var seq[byte], - T: type X): bool = - let dataPtr = unsafeAddr data # Short-lived + T: type altair.TrustedSignedBeaconBlock): bool = + let dataPtr = addr data # Short-lived var success = true proc decode(data: openArray[byte]) = try: dataPtr[] = snappy.decode(data, maxDecompressedDbRecordSize) except CatchableError: success = false db.blocks[T.toFork].get(key.data, decode).expectDb() and success +proc getBlockSSZ*( + db: BeaconChainDB, key: Eth2Digest, data: var seq[byte], + T: type bellatrix.TrustedSignedBeaconBlock): bool = + let dataPtr = addr data # Short-lived + var success = true + proc decode(data: openArray[byte]) = + try: dataPtr[] = framingFormatUncompress(data) + except CatchableError: success = false + db.blocks[T.toFork].get(key.data, decode).expectDb() and success + proc getBlockSSZ*( db: BeaconChainDB, key: Eth2Digest, data: var seq[byte], fork: BeaconBlockFork): bool = @@ -794,9 +867,53 @@ proc getBlockSSZ*( of BeaconBlockFork.Bellatrix: getBlockSSZ(db, key, data, bellatrix.TrustedSignedBeaconBlock) +proc getBlockSZ*( + db: BeaconChainDB, key: Eth2Digest, data: var seq[byte], + T: type phase0.TrustedSignedBeaconBlock): bool = + let dataPtr = addr data # Short-lived + var success = true + proc decode(data: openArray[byte]) = + try: dataPtr[] = framingFormatCompress( + snappy.decode(data, maxDecompressedDbRecordSize)) + except CatchableError: success = false + db.blocks[BeaconBlockFork.Phase0].get(key.data, decode).expectDb() and success or + db.v0.getPhase0BlockSSZ(key, data) + +proc getBlockSZ*( + db: BeaconChainDB, key: Eth2Digest, data: var seq[byte], + T: type altair.TrustedSignedBeaconBlock): bool = + let dataPtr = addr data # Short-lived + var success = true + proc decode(data: openArray[byte]) = + try: dataPtr[] = framingFormatCompress( + snappy.decode(data, maxDecompressedDbRecordSize)) + except CatchableError: success = false + db.blocks[T.toFork].get(key.data, decode).expectDb() and success + +proc getBlockSZ*( + db: BeaconChainDB, key: Eth2Digest, data: var seq[byte], + T: type bellatrix.TrustedSignedBeaconBlock): bool = + let dataPtr = addr data # Short-lived + var success = true + proc decode(data: openArray[byte]) = + assign(dataPtr[], data) + db.blocks[T.toFork].get(key.data, decode).expectDb() and success + +proc getBlockSZ*( + db: BeaconChainDB, key: Eth2Digest, data: var seq[byte], + fork: BeaconBlockFork): bool = + case fork + of BeaconBlockFork.Phase0: + getBlockSZ(db, key, data, phase0.TrustedSignedBeaconBlock) + of BeaconBlockFork.Altair: + getBlockSZ(db, key, data, altair.TrustedSignedBeaconBlock) + of BeaconBlockFork.Bellatrix: + getBlockSZ(db, key, data, bellatrix.TrustedSignedBeaconBlock) + proc getStateOnlyMutableValidators( immutableValidators: openArray[ImmutableValidatorData2], - store: KvStoreRef, key: openArray[byte], output: var ForkyBeaconState, + store: KvStoreRef, key: openArray[byte], + output: var (phase0.BeaconState | altair.BeaconState), rollback: RollbackProc): bool = ## Load state into `output` - BeaconState is large so we want to avoid ## re-allocating it if possible @@ -835,6 +952,47 @@ proc getStateOnlyMutableValidators( rollback() false +proc getStateOnlyMutableValidators( + immutableValidators: openArray[ImmutableValidatorData2], + store: KvStoreRef, key: openArray[byte], output: var bellatrix.BeaconState, + rollback: RollbackProc): bool = + ## Load state into `output` - BeaconState is large so we want to avoid + ## re-allocating it if possible + ## Return `true` iff the entry was found in the database and `output` was + ## overwritten. + ## Rollback will be called only if output was partially written - if it was + ## not found at all, rollback will not be called + # TODO rollback is needed to deal with bug - use `noRollback` to ignore: + # https://github.com/nim-lang/Nim/issues/14126 + # TODO RVO is inefficient for large objects: + # https://github.com/nim-lang/Nim/issues/13879 + + case store.getSZSSZ(key, toBeaconStateNoImmutableValidators(output)) + of GetResult.found: + let numValidators = output.validators.len + doAssert immutableValidators.len >= numValidators + + for i in 0 ..< numValidators: + let + # Bypass hash cache invalidation + dstValidator = addr output.validators.data[i] + + assign( + dstValidator.pubkey, + immutableValidators[i].pubkey.toPubKey()) + assign( + dstValidator.withdrawal_credentials, + immutableValidators[i].withdrawal_credentials) + + output.validators.resetCache() + + true + of GetResult.notFound: + false + of GetResult.corrupted: + rollback() + false + proc getState( db: BeaconChainDBV0, immutableValidators: openArray[ImmutableValidatorData2], diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index bed29360a7..85c379eff6 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -478,8 +478,6 @@ proc getBlock*( proc getBlockSSZ*(dag: ChainDAGRef, bid: BlockId, bytes: var seq[byte]): bool = # Load the SSZ-encoded data of a block into `bytes`, overwriting the existing # content - # careful: there are two snappy encodings in use, with and without framing! - # Returns true if the block is found, false if not let fork = dag.cfg.blockForkAtEpoch(bid.slot.epoch) dag.db.getBlockSSZ(bid.root, bytes, fork) or (bid.slot <= dag.finalizedHead.slot and @@ -487,6 +485,18 @@ proc getBlockSSZ*(dag: ChainDAGRef, bid: BlockId, bytes: var seq[byte]): bool = dag.era, getStateField(dag.headState, historical_roots).asSeq, bid.slot, bytes).isOk) +proc getBlockSZ*(dag: ChainDAGRef, bid: BlockId, bytes: var seq[byte]): bool = + # Load the snappy-frame-compressed ("SZ") SSZ-encoded data of a block into + # `bytes`, overwriting the existing content + # careful: there are two snappy encodings in use, with and without framing! + # Returns true if the block is found, false if not + let fork = dag.cfg.blockForkAtEpoch(bid.slot.epoch) + dag.db.getBlockSZ(bid.root, bytes, fork) or + (bid.slot <= dag.finalizedHead.slot and + getBlockSZ( + dag.era, getStateField(dag.headState, historical_roots).asSeq, + bid.slot, bytes).isOk) + proc getForkedBlock*( dag: ChainDAGRef, bid: BlockId): Opt[ForkedTrustedSignedBeaconBlock] = diff --git a/ncli/e2store.nim b/ncli/e2store.nim index c2fe272a79..48f5a8954f 100644 --- a/ncli/e2store.nim +++ b/ncli/e2store.nim @@ -243,11 +243,11 @@ proc init*(T: type EraGroup, f: IoHandle, startSlot: Option[Slot]): Result[T, st else: 0 )))) -proc update*(g: var EraGroup, f: IoHandle, slot: Slot, sszBytes: openArray[byte]): Result[void, string] = +proc update*(g: var EraGroup, f: IoHandle, slot: Slot, szBytes: openArray[byte]): Result[void, string] = doAssert slot >= g.slotIndex.startSlot g.slotIndex.offsets[int(slot - g.slotIndex.startSlot)] = try: - ? f.appendRecord(SnappyBeaconBlock, framingFormatCompress(sszBytes)) + ? f.appendRecord(SnappyBeaconBlock, szBytes) except CatchableError as e: raiseAssert e.msg # TODO fix snappy ok() diff --git a/ncli/ncli_db.nim b/ncli/ncli_db.nim index 9190f5ed43..4c0819897f 100644 --- a/ncli/ncli_db.nim +++ b/ncli/ncli_db.nim @@ -487,7 +487,7 @@ proc cmdExportEra(conf: DbConf, cfg: RuntimeConfig) = withTimer(timers[tBlocks]): var blocks: array[SLOTS_PER_HISTORICAL_ROOT.int, BlockId] for i in dag.getBlockRange(firstSlot.get(), 1, blocks)..