Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

DL query multiple ancestors in upsert. #18146

Merged
merged 1 commit into from
Jun 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 39 additions & 11 deletions chia/data_layer/data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
)
from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.db_wrapper import DBWrapper2
from chia.util.db_wrapper import SQLITE_MAX_VARIABLE_NUMBER, DBWrapper2

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -1542,16 +1542,18 @@ async def insert_batch(
raise Exception(f"Operation in batch is not insert or delete: {change}")

if len(pending_upsert_new_hashes) > 0:
to_update_hashes: Set[bytes32] = set()
for hash in pending_upsert_new_hashes.keys():
while True:
if hash in to_update_hashes:
break
to_update_hashes.add(hash)
node = await self._get_one_ancestor(hash, store_id)
if node is None:
break
hash = node.hash
to_update_hashes: Set[bytes32] = set(pending_upsert_new_hashes.keys())
to_update_queue: List[bytes32] = list(pending_upsert_new_hashes.keys())
batch_size = min(500, SQLITE_MAX_VARIABLE_NUMBER - 10)

while len(to_update_queue) > 0:
nodes = await self._get_one_ancestor_multiple_hashes(to_update_queue[:batch_size], store_id)
to_update_queue = to_update_queue[batch_size:]
for node in nodes:
if node.hash not in to_update_hashes:
to_update_hashes.add(node.hash)
to_update_queue.append(node.hash)

assert latest_local_root is not None
assert latest_local_root.node_hash is not None
new_root_hash = await self.batch_upsert(
Expand Down Expand Up @@ -1653,6 +1655,32 @@ async def _get_one_ancestor(
return None
return InternalNode.from_row(row=row)

async def _get_one_ancestor_multiple_hashes(
    self,
    node_hashes: List[bytes32],
    store_id: bytes32,
    generation: Optional[int] = None,
) -> List[InternalNode]:
    """Return the immediate (one-level-up) ancestor of each of *node_hashes*.

    For every input hash, the most recent ancestor entry with
    ``ancestors.generation <= generation`` is selected (``MAX(generation)``
    per hash), then joined back to ``node`` to materialize the rows.

    Args:
        node_hashes: Hashes whose parents are wanted. The caller is expected
            to keep this under the SQLite variable limit (see the batching in
            ``insert_batch``).
        store_id: The tree (``tree_id``) to search within.
        generation: Upper bound on ancestor-table generation; defaults to the
            store's current tree generation.

    Returns:
        The ancestor nodes found, in no guaranteed order. Hashes with no
        recorded ancestor (e.g. the root) simply produce no row.
    """
    # Avoid a pointless round-trip (and reliance on SQLite's nonstandard
    # acceptance of an empty `IN ()` list) when there is nothing to look up.
    if len(node_hashes) == 0:
        return []
    async with self.db_wrapper.reader() as reader:
        # One `?` placeholder per input hash; bound positionally below.
        node_hashes_place_holders = ",".join("?" for _ in node_hashes)
        if generation is None:
            generation = await self.get_tree_generation(store_id=store_id)
        cursor = await reader.execute(
            f"""
            SELECT * from node INNER JOIN (
                SELECT ancestors.ancestor AS hash, MAX(ancestors.generation) AS generation
                FROM ancestors
                WHERE ancestors.hash IN ({node_hashes_place_holders})
                AND ancestors.tree_id == ?
                AND ancestors.generation <= ?
                GROUP BY hash
            ) asc on asc.hash == node.hash
            """,
            [*node_hashes, store_id, generation],
        )
        rows = await cursor.fetchall()
        return [InternalNode.from_row(row=row) for row in rows]

async def build_ancestor_table_for_latest_root(self, store_id: bytes32) -> None:
async with self.db_wrapper.writer() as writer:
root = await self.get_tree_root(store_id=store_id)
Expand Down
Loading