diff --git a/export.go b/export.go index af60e89cd..a1f7e0570 100644 --- a/export.go +++ b/export.go @@ -2,31 +2,70 @@ package iavl import "fmt" +// TraverseOrderType is the type of the order in which the tree is traversed. +type TraverseOrderType uint8 + +const ( + PreOrder TraverseOrderType = iota + PostOrder +) + type Exporter struct { tree *Tree out chan *Node errCh chan error } -func (tree *Tree) ExportPreOrder() *Exporter { +func (tree *Tree) Export(order TraverseOrderType) *Exporter { exporter := &Exporter{ tree: tree, out: make(chan *Node), errCh: make(chan error), } - go func() { + + go func(traverseOrder TraverseOrderType) { defer close(exporter.out) defer close(exporter.errCh) - exporter.preOrderNext(tree.root) - }() + + if traverseOrder == PostOrder { + exporter.postOrderNext(tree.root) + } else if traverseOrder == PreOrder { + exporter.preOrderNext(tree.root) + } + }(order) + return exporter } +func (e *Exporter) postOrderNext(node *Node) { + if node.isLeaf() { + e.out <- node + return + } + + left, err := node.getLeftNode(e.tree) + if err != nil { + e.errCh <- err + return + } + e.postOrderNext(left) + + right, err := node.getRightNode(e.tree) + if err != nil { + e.errCh <- err + return + } + e.postOrderNext(right) + + e.out <- node +} + func (e *Exporter) preOrderNext(node *Node) { e.out <- node if node.isLeaf() { return } + left, err := node.getLeftNode(e.tree) if err != nil { e.errCh <- err diff --git a/multitree.go b/multitree.go index f377136fc..6e9625757 100644 --- a/multitree.go +++ b/multitree.go @@ -56,7 +56,7 @@ func ImportMultiTree(pool *NodePool, version int64, path string, treeOpts TreeOp return nil, err } go func(p string) { - root, importErr := sql.ImportSnapshotFromTable(version, false) + root, importErr := sql.ImportSnapshotFromTable(version, PreOrder, false) tree := NewTree(sql, pool, mt.treeOpts) tree.root = root diff --git a/snapshot.go b/snapshot.go index 9304a359c..c9245560e 100644 --- a/snapshot.go +++ b/snapshot.go @@ -77,6 +77,7 @@ func (sql *SqliteDb) Snapshot(ctx context.Context, tree *Tree) error { type SnapshotOptions struct { StoreLeafValues bool SaveTree bool + TraverseOrder TraverseOrderType } func NewIngestSnapshotConnection(snapshotDbPath string) (*sqlite3.Conn, error) { @@ -260,93 +261,14 @@ func (sql *SqliteDb) WriteSnapshot( } var ( - step func() (*Node, error) - maybeFlush func() error - count int - uniqueVersions = make(map[int64]struct{}) + root *Node + uniqueVersions map[int64]struct{} ) - maybeFlush = func() error { - count++ - if count%snap.batchSize == 0 { - if err = snap.flush(); err != nil { - return err - } - if err = snap.prepareWrite(); err != nil { - return err - } - } - return nil + if opts.TraverseOrder == PostOrder { + root, uniqueVersions, err = snap.restorePostOrderStep(nextFn, opts.StoreLeafValues) + } else if opts.TraverseOrder == PreOrder { + root, uniqueVersions, err = snap.restorePreOrderStep(nextFn, opts.StoreLeafValues) } - - step = func() (*Node, error) { - snapshotNode, err := nextFn() - if err != nil { - return nil, err - } - ordinal := snap.ordinal - snap.ordinal++ - - node := &Node{ - key: snapshotNode.Key, - subtreeHeight: snapshotNode.Height, - nodeKey: NewNodeKey(snapshotNode.Version, uint32(ordinal)), - } - if node.subtreeHeight == 0 { - node.value = snapshotNode.Value - node.size = 1 - node._hash(snapshotNode.Version) - if !opts.StoreLeafValues { - node.value = nil - } - nodeBz, err := node.Bytes() - if err != nil { - return nil, err - } - if err = snap.snapshotInsert.Exec(ordinal, snapshotNode.Version, ordinal, nodeBz); err != nil { - return nil, err - } - if err = snap.leafInsert.Exec(snapshotNode.Version, ordinal, nodeBz); err != nil { - return nil, err - } - if err = maybeFlush(); err != nil { - return nil, err - } - return node, nil - } - - node.leftNode, err = step() - if err != nil { - return nil, err - } - node.leftNodeKey = node.leftNode.nodeKey - node.rightNode, err = step() - if err != nil { - return nil, err - } - node.rightNodeKey = node.rightNode.nodeKey - - node.size = node.leftNode.size + node.rightNode.size - node._hash(snapshotNode.Version) - node.leftNode = nil - node.rightNode = nil - - nodeBz, err := node.Bytes() - if err != nil { - return nil, err - } - if err = snap.snapshotInsert.Exec(ordinal, snapshotNode.Version, ordinal, nodeBz); err != nil { - return nil, err - } - if err = snap.treeInsert.Exec(snapshotNode.Version, ordinal, nodeBz); err != nil { - return nil, err - } - uniqueVersions[snapshotNode.Version] = struct{}{} - if err = maybeFlush(); err != nil { - return nil, err - } - return node, nil - } - root, err := step() if err != nil { return nil, err } @@ -393,12 +315,18 @@ type SnapshotNode struct { Height int8 } -func (sql *SqliteDb) ImportSnapshotFromTable(version int64, loadLeaves bool) (*Node, error) { +func (sql *SqliteDb) ImportSnapshotFromTable(version int64, traverseOrder TraverseOrderType, loadLeaves bool) (*Node, error) { read, err := sql.getReadConn() if err != nil { return nil, err } - q, err := read.Prepare(fmt.Sprintf("SELECT version, sequence, bytes FROM snapshot_%d ORDER BY ordinal", version)) + + var q *sqlite3.Stmt + if traverseOrder == PostOrder { + q, err = read.Prepare(fmt.Sprintf("SELECT version, sequence, bytes FROM snapshot_%d ORDER BY ordinal DESC", version)) + } else if traverseOrder == PreOrder { + q, err = read.Prepare(fmt.Sprintf("SELECT version, sequence, bytes FROM snapshot_%d ORDER BY ordinal ASC", version)) + } if err != nil { return nil, err } @@ -416,7 +344,12 @@ func (sql *SqliteDb) ImportSnapshotFromTable(version int64, loadLeaves bool) (*N since: time.Now(), log: log.With().Str("path", sql.opts.Path).Logger(), } - root, err := imp.queryStep() + var root *Node + if traverseOrder == PostOrder { + root, err = imp.queryStepPostOrder() + } else if traverseOrder == PreOrder { + root, err = imp.queryStepPreOrder() + } if err != nil { return nil, err } @@ -434,7 +367,7 @@ func (sql *SqliteDb) ImportSnapshotFromTable(version int64, loadLeaves bool) (*N return root, nil } -func (sql *SqliteDb) ImportMostRecentSnapshot(targetVersion int64, loadLeaves bool) (*Node, int64, error) { +func (sql *SqliteDb) ImportMostRecentSnapshot(targetVersion int64, traverseOrder TraverseOrderType, loadLeaves bool) (*Node, int64, error) { read, err := sql.getReadConn() if err != nil { return nil, 0, err @@ -479,7 +412,7 @@ func (sql *SqliteDb) ImportMostRecentSnapshot(targetVersion int64, loadLeaves bo } } - root, err := sql.ImportSnapshotFromTable(version, loadLeaves) + root, err := sql.ImportSnapshotFromTable(version, traverseOrder, loadLeaves) if err != nil { return nil, 0, err } @@ -633,6 +566,170 @@ func (snap *sqliteSnapshot) prepareWrite() error { return err } +func (snap *sqliteSnapshot) restorePostOrderStep(nextFn func() (*SnapshotNode, error), isStoreLeafValues bool) (*Node, map[int64]struct{}, error) { + var ( + snapshotNode *SnapshotNode + err error + count int + stack []*Node + uniqueVersions = make(map[int64]struct{}) + ) + + for { + snapshotNode, err = nextFn() + if err != nil || snapshotNode == nil { + break + } + + ordinal := snap.ordinal + + uniqueVersions[snapshotNode.Version] = struct{}{} + node := &Node{ + key: snapshotNode.Key, + subtreeHeight: snapshotNode.Height, + nodeKey: NewNodeKey(snapshotNode.Version, uint32(ordinal)), + } + + stackSize := len(stack) + if node.isLeaf() { + node.value = snapshotNode.Value + node.size = 1 + node._hash(snapshotNode.Version) + if !isStoreLeafValues { + node.value = nil + } + + count++ + if err := snap.writeSnapNode(node, snapshotNode.Version, count, ordinal, count); err != nil { + return nil, nil, err + } + } else if stackSize >= 2 && stack[stackSize-1].subtreeHeight < node.subtreeHeight && stack[stackSize-2].subtreeHeight < node.subtreeHeight { + node.leftNode = stack[stackSize-2] + node.leftNodeKey = node.leftNode.nodeKey + node.rightNode = stack[stackSize-1] + node.rightNodeKey = node.rightNode.nodeKey + node.size = node.leftNode.size + node.rightNode.size + node._hash(snapshotNode.Version) + stack = stack[:stackSize-2] + + node.leftNode = nil + node.rightNode = nil + + count++ + if err := snap.writeSnapNode(node, snapshotNode.Version, count, ordinal, count); err != nil { + return nil, nil, err + } + } + + stack = append(stack, node) + snap.ordinal++ + } + + if err != nil && !errors.Is(err, ErrorExportDone) { + return nil, nil, err + } + + if len(stack) != 1 { + return nil, nil, fmt.Errorf("expected stack size 1, got %d", len(stack)) + } + + return stack[0], uniqueVersions, nil +} + +func (snap *sqliteSnapshot) restorePreOrderStep(nextFn func() (*SnapshotNode, error), isStoreLeafValues bool) (*Node, map[int64]struct{}, error) { + var ( + count int + step func() (*Node, error) + uniqueVersions = make(map[int64]struct{}) + ) + + step = func() (*Node, error) { + snapshotNode, err := nextFn() + if err != nil { + return nil, err + } + + ordinal := snap.ordinal + snap.ordinal++ + + node := &Node{ + key: snapshotNode.Key, + subtreeHeight: snapshotNode.Height, + nodeKey: NewNodeKey(snapshotNode.Version, uint32(ordinal)), + } + + if node.isLeaf() { + node.value = snapshotNode.Value + node.size = 1 + node._hash(snapshotNode.Version) + if !isStoreLeafValues { + node.value = nil + } + } else { + node.leftNode, err = step() + if err != nil { + return nil, err + } + node.leftNodeKey = node.leftNode.nodeKey + node.rightNode, err = step() + if err != nil { + return nil, err + } + node.rightNodeKey = node.rightNode.nodeKey + + node.size = node.leftNode.size + node.rightNode.size + node._hash(snapshotNode.Version) + node.leftNode = nil + node.rightNode = nil + uniqueVersions[snapshotNode.Version] = struct{}{} + } + + count++ + if err := snap.writeSnapNode(node, snapshotNode.Version, ordinal, ordinal, count); err != nil { + return nil, err + } + snap.ordinal++ + + return node, nil + } + + node, err := step() + + return node, uniqueVersions, err +} + +func (snap *sqliteSnapshot) writeSnapNode(node *Node, version int64, ordinal, sequence, count int) error { + nodeBz, err := node.Bytes() + if err != nil { + return err + } + if err = snap.snapshotInsert.Exec(ordinal, version, sequence, nodeBz); err != nil { + return err + } + if snap.writeTree { + if node.isLeaf() { + if err = snap.leafInsert.Exec(version, ordinal, nodeBz); err != nil { + return err + } + } else { + if err = snap.treeInsert.Exec(version, sequence, nodeBz); err != nil { + return err + } + } + } + + if count%snap.batchSize == 0 { + if err := snap.flush(); err != nil { + return err + } + if err := snap.prepareWrite(); err != nil { + return err + } + } + + return nil +} + func rehashTree(node *Node) { if node.isLeaf() { return @@ -655,7 +752,55 @@ type sqliteImport struct { log zerolog.Logger } -func (sqlImport *sqliteImport) queryStep() (node *Node, err error) { +func (sqlImport *sqliteImport) queryStepPreOrder() (node *Node, err error) { + sqlImport.i++ + if sqlImport.i%1_000_000 == 0 { + sqlImport.log.Debug().Msgf("import: nodes=%s, node/s=%s", + humanize.Comma(sqlImport.i), + humanize.Comma(int64(float64(1_000_000)/time.Since(sqlImport.since).Seconds())), + ) + sqlImport.since = time.Now() + } + + hasRow, err := sqlImport.query.Step() + if !hasRow { + return nil, nil + } + if err != nil { + return nil, err + } + var bz sqlite3.RawBytes + var version, seq int + err = sqlImport.query.Scan(&version, &seq, &bz) + if err != nil { + return nil, err + } + nodeKey := NewNodeKey(int64(version), uint32(seq)) + node, err = MakeNode(sqlImport.pool, nodeKey, bz) + if err != nil { + return nil, err + } + + if node.isLeaf() && sqlImport.i > 1 { + if sqlImport.loadLeaves { + return node, nil + } + sqlImport.pool.Put(node) + return nil, nil + } + + node.leftNode, err = sqlImport.queryStepPreOrder() + if err != nil { + return nil, err + } + node.rightNode, err = sqlImport.queryStepPreOrder() + if err != nil { + return nil, err + } + return node, nil +} + +func (sqlImport *sqliteImport) queryStepPostOrder() (node *Node, err error) { sqlImport.i++ if sqlImport.i%1_000_000 == 0 { sqlImport.log.Debug().Msgf("import: nodes=%s, node/s=%s", @@ -692,13 +837,14 @@ func (sqlImport *sqliteImport) queryStep() (node *Node, err error) { return nil, nil } - node.leftNode, err = sqlImport.queryStep() + node.rightNode, err = sqlImport.queryStepPostOrder() if err != nil { return nil, err } - node.rightNode, err = sqlImport.queryStep() + node.leftNode, err = sqlImport.queryStepPostOrder() if err != nil { return nil, err } + return node, nil } diff --git a/tree.go b/tree.go index a76cda18f..4122d19b2 100644 --- a/tree.go +++ b/tree.go @@ -109,9 +109,9 @@ func (tree *Tree) LoadVersion(version int64) error { return nil } -func (tree *Tree) LoadSnapshot(version int64) (err error) { +func (tree *Tree) LoadSnapshot(version int64, traverseOrder TraverseOrderType) (err error) { var v int64 - tree.root, v, err = tree.sql.ImportMostRecentSnapshot(version, true) + tree.root, v, err = tree.sql.ImportMostRecentSnapshot(version, traverseOrder, true) if err != nil { return err } diff --git a/tree_test.go b/tree_test.go index 57365faa9..e4a49001b 100644 --- a/tree_test.go +++ b/tree_test.go @@ -159,8 +159,7 @@ func TestTree_Hash(t *testing.T) { opts.Until = 100 opts.UntilHash = "0101e1d6f3158dcb7221acd7ed36ce19f2ef26847ffea7ce69232e362539e5cf" - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) + _, cancel := context.WithCancel(context.Background()) testStart := time.Now() multiTree := NewMultiTree(tmpDir, TreeOptions{CheckpointInterval: 10, HeightFilter: 0}) @@ -203,25 +202,39 @@ func TestTree_Build_Load(t *testing.T) { opts.UntilHash = "3a037f8dd67a5e1a9ef83a53b81c619c9ac0233abee6f34a400fb9b9dfbb4f8d" testTreeBuild(t, mt, opts) - // export the tree at version 12,000 and import it into a sql db - ctx := context.Background() - restoreMt := NewMultiTree(t.TempDir(), TreeOptions{CheckpointInterval: 4000}) + // export the tree at version 12,000 and import it into a sql db in pre-order + traverseOrder := PreOrder + restorePreOrderMt := NewMultiTree(t.TempDir(), TreeOptions{CheckpointInterval: 4000}) for sk, tree := range multiTree.Trees { - require.NoError(t, restoreMt.MountTree(sk)) - exporter := tree.ExportPreOrder() + require.NoError(t, restorePreOrderMt.MountTree(sk)) + exporter := tree.Export(traverseOrder) - restoreTree := restoreMt.Trees[sk] - _, err := restoreTree.sql.WriteSnapshot(ctx, tree.Version(), exporter.Next, SnapshotOptions{SaveTree: true}) + restoreTree := restorePreOrderMt.Trees[sk] + _, err := restoreTree.sql.WriteSnapshot(context.Background(), tree.Version(), exporter.Next, SnapshotOptions{SaveTree: true, TraverseOrder: traverseOrder}) require.NoError(t, err) - require.NoError(t, restoreTree.LoadSnapshot(tree.Version())) + require.NoError(t, restoreTree.LoadSnapshot(tree.Version(), traverseOrder)) } + // export the tree at version 12,000 and import it into a sql db in post-order + traverseOrder = PostOrder + restorePostOrderMt := NewMultiTree(t.TempDir(), TreeOptions{CheckpointInterval: 4000}) + for sk, tree := range multiTree.Trees { + require.NoError(t, restorePostOrderMt.MountTree(sk)) + exporter := tree.Export(traverseOrder) + + restoreTree := restorePostOrderMt.Trees[sk] + _, err := restoreTree.sql.WriteSnapshot(context.Background(), tree.Version(), exporter.Next, SnapshotOptions{SaveTree: true, TraverseOrder: traverseOrder}) + require.NoError(t, err) + require.NoError(t, restoreTree.LoadSnapshot(tree.Version(), traverseOrder)) + } + require.Equal(t, restorePostOrderMt.Hash(), restorePreOrderMt.Hash()) + // play changes until version 20_000 require.NoError(t, opts.Iterator.Next()) require.Equal(t, int64(12_001), opts.Iterator.Version()) opts.Until = 20_000 opts.UntilHash = "25907b193c697903218d92fa70a87ef6cdd6fa5b9162d955a4d70a9d5d2c4824" - testTreeBuild(t, restoreMt, opts) + testTreeBuild(t, restorePostOrderMt, opts) } func TestOsmoLike_HotStart(t *testing.T) { @@ -259,7 +272,7 @@ func TestTree_Import(t *testing.T) { sql, err := NewSqliteDb(pool, SqliteDbOptions{Path: tmpDir}) require.NoError(t, err) - root, err := sql.ImportSnapshotFromTable(1, true) + root, err := sql.ImportSnapshotFromTable(1, PreOrder, true) require.NoError(t, err) require.NotNil(t, root) }