use snappy-framed format for compressing bellatrix+ database entries #3551

Merged · 3 commits · Mar 29, 2022
198 changes: 178 additions & 20 deletions beacon_chain/beacon_chain_db.nim
@@ -10,7 +10,7 @@
import
std/[typetraits, tables],
stew/[arrayops, assign2, byteutils, endians2, io2, objects, results],
serialization, chronicles, snappy,
serialization, chronicles, snappy, snappy/framing,
eth/db/[kvstore, kvstore_sqlite3],
./networking/network_metadata, ./beacon_chain_db_immutable,
./spec/[eth2_ssz_serialization, eth2_merkleization, forks, state_transition],
@@ -69,6 +69,9 @@ type
## 1.3 creates `kvstore` with rowid, making it quite fast, but doesn't do
## anything about existing databases. Versions after that use a separate
## file instead (V1)
##
## Starting with bellatrix, we store blocks and states using snappy framed
## encoding so as to match the `Req`/`Resp` protocols and era files ("SZ").
backend: KvStoreRef # kvstore
stateStore: KvStoreRef # state_no_validators

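As a quick reference for the new on-disk format described in the comment above, here is a minimal round-trip sketch using the same `snappy/framing` calls this file now imports; the payload is a stand-in for `SSZ.encode` output, not real block data.

```nim
import snappy/framing

let
  payload = @[byte 1, 2, 3, 4, 5]              # stand-in for SSZ.encode(value)
  framed = framingFormatCompress(payload)      # "SZ" bytes, as written to the kvstore
  restored = framingFormatUncompress(framed)   # what decodeSZSSZ hands to readSszBytes

doAssert restored == payload
```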
@@ -521,6 +524,18 @@ proc decodeSnappySSZ[T](data: openArray[byte], output: var T): bool =
err = e.msg, typ = name(T), dataLen = data.len
false

proc decodeSZSSZ[T](data: openArray[byte], output: var T): bool =
try:
let decompressed = framingFormatUncompress(data)
readSszBytes(decompressed, output, updateRoot = false)
true
except CatchableError as e:
# If the data can't be deserialized, it could be because it's from a
# version of the software that uses a different SSZ encoding
warn "Unable to deserialize data, old database?",
err = e.msg, typ = name(T), dataLen = data.len
false

proc encodeSSZ(v: auto): seq[byte] =
try:
SSZ.encode(v)
@@ -534,6 +549,14 @@ proc encodeSnappySSZ(v: auto): seq[byte] =
# In-memory encode shouldn't fail!
raiseAssert err.msg

proc encodeSZSSZ(v: auto): seq[byte] =
# https://github.com/google/snappy/blob/main/framing_format.txt
try:
framingFormatCompress(SSZ.encode(v))
except CatchableError as err:
# In-memory encode shouldn't fail!
raiseAssert err.msg

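For reference, the framing format linked above starts every stream with an identifier chunk: type byte 0xff, a 3-byte little-endian length of 6, then the ASCII payload "sNaPpY". That header is what distinguishes framed "SZ" rows from the raw-snappy rows written by `encodeSnappySSZ`. A small sanity check of that header (the constants come from the framing spec, not from this diff), using imports already present in this file:

```nim
import snappy/framing
import stew/byteutils

let framed = framingFormatCompress(@[byte 1, 2, 3])

doAssert framed[0] == 0xff'u8                 # stream identifier chunk type
doAssert framed[4 .. 9] == "sNaPpY".toBytes   # magic payload after the 3-byte length
```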
proc getRaw(db: KvStoreRef, key: openArray[byte], T: type Eth2Digest): Opt[T] =
var res: Opt[T]
proc decode(data: openArray[byte]) =
@@ -561,9 +584,7 @@ type GetResult = enum
proc getSSZ[T](db: KvStoreRef, key: openArray[byte], output: var T): GetResult =
var status = GetResult.notFound

# TODO address is needed because there's no way to express lifetimes in nim
# we'll use unsafeAddr to find the code later
var outputPtr = unsafeAddr output # callback is local, ptr wont escape
var outputPtr = addr output # callback is local, ptr wont escape
proc decode(data: openArray[byte]) =
status =
if decodeSSZ(data, outputPtr[]): GetResult.found
@@ -579,9 +600,7 @@ proc putSSZ(db: KvStoreRef, key: openArray[byte], v: auto) =
proc getSnappySSZ[T](db: KvStoreRef, key: openArray[byte], output: var T): GetResult =
var status = GetResult.notFound

# TODO address is needed because there's no way to express lifetimes in nim
# we'll use unsafeAddr to find the code later
var outputPtr = unsafeAddr output # callback is local, ptr wont escape
var outputPtr = addr output # callback is local, ptr wont escape
proc decode(data: openArray[byte]) =
status =
if decodeSnappySSZ(data, outputPtr[]): GetResult.found
@@ -594,6 +613,22 @@ proc getSnappySSZ[T](db: KvStoreRef, key: openArray[byte], output: var T): GetResult =
proc putSnappySSZ(db: KvStoreRef, key: openArray[byte], v: auto) =
db.put(key, encodeSnappySSZ(v)).expectDb()

proc getSZSSZ[T](db: KvStoreRef, key: openArray[byte], output: var T): GetResult =
var status = GetResult.notFound

var outputPtr = addr output # callback is local, ptr wont escape
proc decode(data: openArray[byte]) =
status =
if decodeSZSSZ(data, outputPtr[]): GetResult.found
else: GetResult.corrupted

discard db.get(key, decode).expectDb()

status

proc putSZSSZ(db: KvStoreRef, key: openArray[byte], v: auto) =
db.put(key, encodeSZSSZ(v)).expectDb()

proc close*(db: BeaconChainDBV0) =
discard db.stateStore.close()
discard db.backend.close()
@@ -628,11 +663,20 @@ proc putBeaconBlockSummary*(
# Summaries are too simple / small to compress, store them as plain SSZ
db.summaries.putSSZ(root.data, value)

proc putBlock*(db: BeaconChainDB, value: ForkyTrustedSignedBeaconBlock) =
proc putBlock*(
db: BeaconChainDB,
value: phase0.TrustedSignedBeaconBlock | altair.TrustedSignedBeaconBlock) =
db.withManyWrites:
db.blocks[type(value).toFork].putSnappySSZ(value.root.data, value)
db.putBeaconBlockSummary(value.root, value.message.toBeaconBlockSummary())

proc putBlock*(
db: BeaconChainDB,
value: bellatrix.TrustedSignedBeaconBlock) =
db.withManyWrites:
db.blocks[type(value).toFork].putSZSSZ(value.root.data, value)
db.putBeaconBlockSummary(value.root, value.message.toBeaconBlockSummary())

proc updateImmutableValidators*(
db: BeaconChainDB, validators: openArray[Validator]) =
# Must be called before storing a state that references the new validators
@@ -659,11 +703,18 @@ template toBeaconStateNoImmutableValidators(state: bellatrix.BeaconState):
BellatrixBeaconStateNoImmutableValidators =
isomorphicCast[BellatrixBeaconStateNoImmutableValidators](state)

proc putState*(db: BeaconChainDB, key: Eth2Digest, value: ForkyBeaconState) =
proc putState*(
db: BeaconChainDB, key: Eth2Digest,
value: phase0.BeaconState | altair.BeaconState) =
db.updateImmutableValidators(value.validators.asSeq())
db.statesNoVal[type(value).toFork()].putSnappySSZ(
key.data, toBeaconStateNoImmutableValidators(value))

proc putState*(db: BeaconChainDB, key: Eth2Digest, value: bellatrix.BeaconState) =
db.updateImmutableValidators(value.validators.asSeq())
db.statesNoVal[type(value).toFork()].putSZSSZ(
key.data, toBeaconStateNoImmutableValidators(value))

proc putState*(db: BeaconChainDB, state: ForkyHashedBeaconState) =
db.withManyWrites:
db.putStateRoot(state.latest_block_root, state.data.slot, state.root)
@@ -740,49 +791,71 @@ proc getBlock*(
# set root after deserializing (so it doesn't get zeroed)
result.get().root = key

proc getBlock*(
db: BeaconChainDB, key: Eth2Digest,
T: type altair.TrustedSignedBeaconBlock): Opt[T] =
# We only store blocks that we trust in the database
result.ok(default(T))
if db.blocks[T.toFork].getSnappySSZ(key.data, result.get) == GetResult.found:
# set root after deserializing (so it doesn't get zeroed)
result.get().root = key
else:
result.err()

proc getBlock*[
X: altair.TrustedSignedBeaconBlock | bellatrix.TrustedSignedBeaconBlock](
X: bellatrix.TrustedSignedBeaconBlock](
db: BeaconChainDB, key: Eth2Digest,
T: type X): Opt[T] =
# We only store blocks that we trust in the database
result.ok(default(T))
if db.blocks[T.toFork].getSnappySSZ(key.data, result.get) == GetResult.found:
if db.blocks[T.toFork].getSZSSZ(key.data, result.get) == GetResult.found:
# set root after deserializing (so it doesn't get zeroed)
result.get().root = key
else:
result.err()

proc getPhase0BlockSSZ(db: BeaconChainDBV0, key: Eth2Digest, data: var seq[byte]): bool =
let dataPtr = unsafeAddr data # Short-lived
proc getPhase0BlockSSZ(
db: BeaconChainDBV0, key: Eth2Digest, data: var seq[byte]): bool =
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = snappy.decode(data, maxDecompressedDbRecordSize)
except CatchableError: success = false
db.backend.get(subkey(phase0.SignedBeaconBlock, key), decode).expectDb() and success
db.backend.get(subkey(phase0.SignedBeaconBlock, key), decode).expectDb() and
success

# SSZ implementations are separate so as to avoid unnecessary data copies
proc getBlockSSZ*(
db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
T: type phase0.TrustedSignedBeaconBlock): bool =
let dataPtr = unsafeAddr data # Short-lived
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = snappy.decode(data, maxDecompressedDbRecordSize)
except CatchableError: success = false
db.blocks[BeaconBlockFork.Phase0].get(key.data, decode).expectDb() and success or
db.v0.getPhase0BlockSSZ(key, data)

proc getBlockSSZ*[
X: altair.TrustedSignedBeaconBlock | bellatrix.TrustedSignedBeaconBlock](
proc getBlockSSZ*(
db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
T: type X): bool =
let dataPtr = unsafeAddr data # Short-lived
T: type altair.TrustedSignedBeaconBlock): bool =
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = snappy.decode(data, maxDecompressedDbRecordSize)
except CatchableError: success = false
db.blocks[T.toFork].get(key.data, decode).expectDb() and success

proc getBlockSSZ*(
db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
T: type bellatrix.TrustedSignedBeaconBlock): bool =
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = framingFormatUncompress(data)
except CatchableError: success = false
db.blocks[T.toFork].get(key.data, decode).expectDb() and success

proc getBlockSSZ*(
db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
fork: BeaconBlockFork): bool =
@@ -794,9 +867,53 @@ proc getBlockSSZ*(
of BeaconBlockFork.Bellatrix:
getBlockSSZ(db, key, data, bellatrix.TrustedSignedBeaconBlock)

proc getBlockSZ*(
db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
T: type phase0.TrustedSignedBeaconBlock): bool =
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = framingFormatCompress(
snappy.decode(data, maxDecompressedDbRecordSize))
except CatchableError: success = false
db.blocks[BeaconBlockFork.Phase0].get(key.data, decode).expectDb() and success or
db.v0.getPhase0BlockSSZ(key, data)

proc getBlockSZ*(
Review comment (Contributor):
Why are getBlockSZ(phase0) and getBlockSZ(altair) separate functions? Bellatrix differs in that it does assign rather than snappy.decode.

Reply (Member, author):
phase0 reads from an additional table.. yeah, there's better ways to factor the code I'm sure but ..

db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
T: type altair.TrustedSignedBeaconBlock): bool =
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = framingFormatCompress(
snappy.decode(data, maxDecompressedDbRecordSize))
except CatchableError: success = false
db.blocks[T.toFork].get(key.data, decode).expectDb() and success

proc getBlockSZ*(
db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
T: type bellatrix.TrustedSignedBeaconBlock): bool =
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
assign(dataPtr[], data)
db.blocks[T.toFork].get(key.data, decode).expectDb() and success

proc getBlockSZ*(
db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
fork: BeaconBlockFork): bool =
case fork
of BeaconBlockFork.Phase0:
getBlockSZ(db, key, data, phase0.TrustedSignedBeaconBlock)
of BeaconBlockFork.Altair:
getBlockSZ(db, key, data, altair.TrustedSignedBeaconBlock)
of BeaconBlockFork.Bellatrix:
getBlockSZ(db, key, data, bellatrix.TrustedSignedBeaconBlock)

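The split that the review comment above asks about comes down to what is already on disk: pre-bellatrix rows hold raw (unframed) snappy, so serving "SZ" bytes means decompressing and re-compressing with framing, while bellatrix+ rows are stored framed and can be returned verbatim (phase0 additionally falls back to the legacy V0 table). A rough sketch of that convergence: the helper name and size limit are illustrative, not from the PR.

```nim
import snappy, snappy/framing

const maxDecompressedDbRecordSize = 64 * 1024 * 1024  # illustrative limit only

proc storedBytesToSZ(stored: seq[byte], preBellatrix: bool): seq[byte] =
  ## Turn a raw database row into snappy-framed ("SZ") bytes.
  if preBellatrix:
    # pre-bellatrix rows: raw snappy on disk - decompress, then re-frame
    result = framingFormatCompress(
      snappy.decode(stored, maxDecompressedDbRecordSize))
  else:
    # bellatrix+ rows: already framed - hand back unchanged
    result = stored
```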
proc getStateOnlyMutableValidators(
immutableValidators: openArray[ImmutableValidatorData2],
store: KvStoreRef, key: openArray[byte], output: var ForkyBeaconState,
store: KvStoreRef, key: openArray[byte],
output: var (phase0.BeaconState | altair.BeaconState),
rollback: RollbackProc): bool =
## Load state into `output` - BeaconState is large so we want to avoid
## re-allocating it if possible
@@ -835,6 +952,47 @@ proc getStateOnlyMutableValidators(
rollback()
false

proc getStateOnlyMutableValidators(
immutableValidators: openArray[ImmutableValidatorData2],
store: KvStoreRef, key: openArray[byte], output: var bellatrix.BeaconState,
rollback: RollbackProc): bool =
## Load state into `output` - BeaconState is large so we want to avoid
## re-allocating it if possible
## Return `true` iff the entry was found in the database and `output` was
## overwritten.
## Rollback will be called only if output was partially written - if it was
## not found at all, rollback will not be called
# TODO rollback is needed to deal with bug - use `noRollback` to ignore:
# https://github.com/nim-lang/Nim/issues/14126
# TODO RVO is inefficient for large objects:
# https://github.com/nim-lang/Nim/issues/13879
Review comment (Contributor):
Is RVO still inefficient for large objects? That Nim issue is closed.

Reply (Member, author):
ah, yes.. there's nim-lang/Nim#15571 now instead

Reply (Member, author):
longer story would be that to remove the TODO, one would need to examine the C code again and see what's going on - issue might be closed but that doesn't mean it's actually fixed for our use case


case store.getSZSSZ(key, toBeaconStateNoImmutableValidators(output))
of GetResult.found:
let numValidators = output.validators.len
doAssert immutableValidators.len >= numValidators

for i in 0 ..< numValidators:
let
# Bypass hash cache invalidation
dstValidator = addr output.validators.data[i]

assign(
dstValidator.pubkey,
immutableValidators[i].pubkey.toPubKey())
assign(
dstValidator.withdrawal_credentials,
immutableValidators[i].withdrawal_credentials)

output.validators.resetCache()

true
of GetResult.notFound:
false
of GetResult.corrupted:
rollback()
false

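To make the rehydration pattern above easier to read in isolation: the state row only stores the mutable per-validator fields, and the immutable fields (pubkey, withdrawal credentials) are copied back from the shared immutable-validator table after loading. A simplified sketch with hypothetical stand-in types; the real code works directly on `BeaconState` and bypasses the hash cache.

```nim
type
  ImmutablePart = object          # stored once per validator, shared by all states
    pubkey: array[48, byte]
    withdrawal_credentials: array[32, byte]
  StoredValidator = object        # what the compressed state row keeps
    effective_balance: uint64
  FullValidator = object          # what callers of getState see
    pubkey: array[48, byte]
    withdrawal_credentials: array[32, byte]
    effective_balance: uint64

proc rehydrate(stored: seq[StoredValidator],
               immutables: seq[ImmutablePart]): seq[FullValidator] =
  # the immutable table may be longer - it accumulates validators from newer states
  doAssert immutables.len >= stored.len
  result.setLen(stored.len)
  for i in 0 ..< stored.len:
    result[i].pubkey = immutables[i].pubkey
    result[i].withdrawal_credentials = immutables[i].withdrawal_credentials
    result[i].effective_balance = stored[i].effective_balance
```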
proc getState(
db: BeaconChainDBV0,
immutableValidators: openArray[ImmutableValidatorData2],
14 changes: 12 additions & 2 deletions beacon_chain/consensus_object_pools/blockchain_dag.nim
@@ -478,15 +478,25 @@ proc getBlock*(
proc getBlockSSZ*(dag: ChainDAGRef, bid: BlockId, bytes: var seq[byte]): bool =
# Load the SSZ-encoded data of a block into `bytes`, overwriting the existing
# content
# careful: there are two snappy encodings in use, with and without framing!
# Returns true if the block is found, false if not
let fork = dag.cfg.blockForkAtEpoch(bid.slot.epoch)
dag.db.getBlockSSZ(bid.root, bytes, fork) or
(bid.slot <= dag.finalizedHead.slot and
getBlockSSZ(
dag.era, getStateField(dag.headState, historical_roots).asSeq,
bid.slot, bytes).isOk)

proc getBlockSZ*(dag: ChainDAGRef, bid: BlockId, bytes: var seq[byte]): bool =
# Load the snappy-frame-compressed ("SZ") SSZ-encoded data of a block into
# `bytes`, overwriting the existing content
# careful: there are two snappy encodings in use, with and without framing!
# Returns true if the block is found, false if not
let fork = dag.cfg.blockForkAtEpoch(bid.slot.epoch)
dag.db.getBlockSZ(bid.root, bytes, fork) or
(bid.slot <= dag.finalizedHead.slot and
getBlockSZ(
dag.era, getStateField(dag.headState, historical_roots).asSeq,
bid.slot, bytes).isOk)

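A hedged usage sketch of the two accessors (`dag` and `bid` are assumed to be in scope): `getBlockSSZ` yields plain SSZ bytes while `getBlockSZ` yields the snappy-framed form, so unframing the latter should reproduce the former.

```nim
import snappy/framing

var ssz, sz: seq[byte]
if dag.getBlockSSZ(bid, ssz) and dag.getBlockSZ(bid, sz):
  # sz is the framed ("SZ") representation of the same SSZ payload
  doAssert framingFormatUncompress(sz) == ssz
```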
proc getForkedBlock*(
dag: ChainDAGRef, bid: BlockId): Opt[ForkedTrustedSignedBeaconBlock] =

4 changes: 2 additions & 2 deletions ncli/e2store.nim
@@ -243,11 +243,11 @@ proc init*(T: type EraGroup, f: IoHandle, startSlot: Option[Slot]): Result[T, string] =
else: 0
))))

proc update*(g: var EraGroup, f: IoHandle, slot: Slot, sszBytes: openArray[byte]): Result[void, string] =
proc update*(g: var EraGroup, f: IoHandle, slot: Slot, szBytes: openArray[byte]): Result[void, string] =
doAssert slot >= g.slotIndex.startSlot
g.slotIndex.offsets[int(slot - g.slotIndex.startSlot)] =
try:
? f.appendRecord(SnappyBeaconBlock, framingFormatCompress(sszBytes))
? f.appendRecord(SnappyBeaconBlock, szBytes)
except CatchableError as e: raiseAssert e.msg # TODO fix snappy

ok()
2 changes: 1 addition & 1 deletion ncli/ncli_db.nim
@@ -487,7 +487,7 @@ proc cmdExportEra(conf: DbConf, cfg: RuntimeConfig) =
withTimer(timers[tBlocks]):
var blocks: array[SLOTS_PER_HISTORICAL_ROOT.int, BlockId]
for i in dag.getBlockRange(firstSlot.get(), 1, blocks)..<blocks.len:
if dag.getBlockSSZ(blocks[i], tmp):
if dag.getBlockSZ(blocks[i], tmp):
group.update(e2, blocks[i].slot, tmp).get()

withTimer(timers[tState]):