Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix handling of loading empty metadata file for queue #1042

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions src/crawlee/storage_clients/_memory/_creation_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ async def persist_metadata_if_enabled(*, data: dict, entity_directory: str, writ

# Write the metadata to the file
file_path = os.path.join(entity_directory, METADATA_FILENAME)
f = await asyncio.to_thread(open, file_path, mode='wb')
mode = 'r+b' if os.path.exists(file_path) else 'wb'
Copy link
Contributor

@Pijukatel Pijukatel Mar 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the file exists we can still open it with "wb" mode.If we open it with "r+b" mode, then we are not overwriting the whole file with new content but just changing the file starting from the beginning of the file. I doubt that is what we want.

Imagine file with content b"abc"
and you want to change it

with open(path, 'r+b') as f:
    f.write(b"x") 

-> b"xbc"

with open(path, 'wb') as f:
    f.write(b"x") 

-> b"x"

Maybe I have missed the point here, but so far it does not seem right to me.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I suppose it is because of wb + the fact that we work with files asynchronously that we have a situation where empty files are created.

We open the file in wb mode, deleting the contents and switching the asynchronous context. If the crawler is interrupted at this point, the file remains empty.

I settled on r+b mode because we always write formatted json files. These are also metadata files, so the fields in them are not changed. So I think it should work, since we will be overwriting the same number of lines each time.

f = await asyncio.to_thread(open, file_path, mode=mode)
try:
s = await json_dumps(data)
await asyncio.to_thread(f.write, s.encode('utf-8'))
Expand Down Expand Up @@ -409,9 +410,16 @@ def _determine_storage_path(
metadata_path = os.path.join(entry.path, METADATA_FILENAME)
if os.access(metadata_path, os.F_OK):
with open(metadata_path, encoding='utf-8') as metadata_file:
metadata = json.load(metadata_file)
if (id and metadata.get('id') == id) or (name and metadata.get('name') == name):
return entry.path
try:
metadata = json.load(metadata_file)
if (id and metadata.get('id') == id) or (name and metadata.get('name') == name):
return entry.path
except Exception:
logger.warning(
f'Metadata of store entry "{entry.name}" for store {name or id} could not be parsed. '
'The metadata file will be ignored.',
exc_info=True,
)

# Check for default storage directory as a last resort
if id == default_id:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,13 +334,14 @@ async def persist_record(self, record: KeyValueStoreRecord) -> None:
await asyncio.to_thread(f.close)

if self._memory_storage_client.write_metadata:
f = await asyncio.to_thread(open, record_metadata_path, mode='wb')
mode = 'r+b' if os.path.exists(record_metadata_path) else 'wb'
metadata_f = await asyncio.to_thread(open, record_metadata_path, mode=mode)

try:
record_metadata = KeyValueStoreRecordMetadata(key=record.key, content_type=record.content_type)
await asyncio.to_thread(f.write, record_metadata.model_dump_json(indent=2).encode('utf-8'))
await asyncio.to_thread(metadata_f.write, record_metadata.model_dump_json(indent=2).encode('utf-8'))
finally:
await asyncio.to_thread(f.close)
await asyncio.to_thread(metadata_f.close)

async def delete_persisted_record(self, record: KeyValueStoreRecord) -> None:
"""Delete the specified record from the key-value store."""
Expand Down
Loading