diff --git a/chia/data_layer/data_store.py b/chia/data_layer/data_store.py
index 17fb9e4408fb..abe18674d7de 100644
--- a/chia/data_layer/data_store.py
+++ b/chia/data_layer/data_store.py
@@ -39,7 +39,7 @@
 )
 from chia.types.blockchain_format.program import Program
 from chia.types.blockchain_format.sized_bytes import bytes32
-from chia.util.db_wrapper import DBWrapper2
+from chia.util.db_wrapper import SQLITE_MAX_VARIABLE_NUMBER, DBWrapper2
 
 log = logging.getLogger(__name__)
 
@@ -1542,16 +1542,24 @@ async def insert_batch(
                     raise Exception(f"Operation in batch is not insert or delete: {change}")
 
             if len(pending_upsert_new_hashes) > 0:
-                to_update_hashes: Set[bytes32] = set()
-                for hash in pending_upsert_new_hashes.keys():
-                    while True:
-                        if hash in to_update_hashes:
-                            break
-                        to_update_hashes.add(hash)
-                        node = await self._get_one_ancestor(hash, store_id)
-                        if node is None:
-                            break
-                        hash = node.hash
+                # Gather the upserted hashes together with every ancestor reachable from
+                # them. Ancestors are fetched in batches (one query per batch) instead of
+                # one query per hash; newly discovered ancestors are queued so that their
+                # own ancestors get looked up in a later batch.
+                to_update_hashes: Set[bytes32] = set(pending_upsert_new_hashes.keys())
+                to_update_queue: List[bytes32] = list(pending_upsert_new_hashes.keys())
+                # Leave headroom for the extra parameters the query binds besides the hashes.
+                batch_size = min(500, SQLITE_MAX_VARIABLE_NUMBER - 10)
+
+                while len(to_update_queue) > 0:
+                    nodes = await self._get_one_ancestor_multiple_hashes(to_update_queue[:batch_size], store_id)
+                    to_update_queue = to_update_queue[batch_size:]
+                    for node in nodes:
+                        # Only enqueue ancestors not seen yet so shared paths are walked once.
+                        if node.hash not in to_update_hashes:
+                            to_update_hashes.add(node.hash)
+                            to_update_queue.append(node.hash)
+
                 assert latest_local_root is not None
                 assert latest_local_root.node_hash is not None
                 new_root_hash = await self.batch_upsert(
@@ -1653,6 +1661,48 @@ async def _get_one_ancestor(
                 return None
             return InternalNode.from_row(row=row)
 
+    async def _get_one_ancestor_multiple_hashes(
+        self,
+        node_hashes: List[bytes32],
+        store_id: bytes32,
+        generation: Optional[int] = None,
+    ) -> List[InternalNode]:
+        """Fetch the ancestor node of each given hash using a single query.
+
+        Batched variant of `_get_one_ancestor`. Every entry of `node_hashes` is bound as
+        a separate SQL parameter (plus two more for `store_id` and `generation`), so
+        callers must chunk long lists to stay within SQLite's variable limit.
+
+        When `generation` is None, the value from `get_tree_generation` is used. Hashes
+        with no matching `ancestors` row contribute nothing, so the result can be
+        shorter than `node_hashes`; no ordering is guaranteed.
+        """
+        if len(node_hashes) == 0:
+            # Nothing to look up; skip the database round trip.
+            return []
+
+        async with self.db_wrapper.reader() as reader:
+            node_hashes_place_holders = ",".join("?" for _ in node_hashes)
+            if generation is None:
+                generation = await self.get_tree_generation(store_id=store_id)
+            # Only "?" placeholders are interpolated into the SQL text; all values are bound.
+            # NOTE(review): presumably MAX() picks each hash's most recent ancestor row - confirm.
+            cursor = await reader.execute(
+                f"""
+                SELECT * from node INNER JOIN (
+                    SELECT ancestors.ancestor AS hash, MAX(ancestors.generation) AS generation
+                    FROM ancestors
+                    WHERE ancestors.hash IN ({node_hashes_place_holders})
+                    AND ancestors.tree_id == ?
+                    AND ancestors.generation <= ?
+                    GROUP BY hash
+                ) asc on asc.hash == node.hash
+                """,
+                [*node_hashes, store_id, generation],
+            )
+            rows = await cursor.fetchall()
+            return [InternalNode.from_row(row=row) for row in rows]
+
     async def build_ancestor_table_for_latest_root(self, store_id: bytes32) -> None:
         async with self.db_wrapper.writer() as writer:
             root = await self.get_tree_root(store_id=store_id)