implement `store.list_prefix` and `store._set_many` #2064
Conversation
@martindurant I'm on flaky train internet and the GitHub web API is giving me inconsistent signals about whether you have been requested for review; to be clear, I would appreciate your review :)
What matters is how we intend to call it! fsspec likes to provide full paths (but has explicit prefix interfaces) - of course it's fine to tailor the situation to what we need, but I don't really know what that is.
I may have got a notification or a few.
@@ -193,14 +193,10 @@ async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
        -------
        AsyncGenerator[str, None]
        """
        for p in (self.root / prefix).rglob("*"):
            if p.is_file():
                yield str(p)
Were we getting duplicates?
I think we were. This code path was not tested until this PR.
src/zarr/store/local.py
Outdated
  to_strip = str(self.root) + "/"
  for p in (self.root / prefix).rglob("*"):
      if p.is_file():
-         yield str(p).replace(to_strip, "")
+         yield str(p).removeprefix(to_strip)
+1
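An editorial aside on why `removeprefix` is the safer call here: `str.replace` strips every occurrence of the root string, not just the leading one, which matters if the root path happens to recur inside a key. A minimal illustration with a made-up root (`removeprefix` requires Python 3.9+):

```python
to_strip = "/data/zarr/"
key = "/data/zarr/group/data/zarr/array"  # the root string recurs inside the key

print(key.replace(to_strip, ""))   # 'grouparray' -- strips both occurrences
print(key.removeprefix(to_strip))  # 'group/data/zarr/array' -- strips only the leading one
```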
src/zarr/store/remote.py
Outdated
- for onefile in await self._fs._ls(prefix, detail=False):
-     yield onefile
+ find_str = "/".join([self.path, prefix])
+ for onefile in await self._fs._find(find_str):
The defaults for find are: maxdepth=None, withdirs=False, detail=False; maybe good to be specific.
Why is find() better than ls()? The former will return all child files, not just one level deep - is that the intent? If not, ls() ought to be generally more efficient.
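To make the difference concrete, here is a small runnable sketch using fsspec's in-memory filesystem and the sync mirrors of `_ls`/`_find` (file names are made up):

```python
import fsspec

fs = fsspec.filesystem("memory")
fs.pipe({"/root/a": b"1", "/root/sub/b": b"2", "/root/sub/c": b"3"})

# one directory level, directories included
print(fs.ls("/root", detail=False))
# ['/root/a', '/root/sub']

# every file below the prefix, recursively
print(fs.find("/root", maxdepth=None, withdirs=False, detail=False))
# ['/root/a', '/root/sub/b', '/root/sub/c']
```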
Using `find` here is merely due to my ignorance of fsspec. I will implement `ls` as you suggest.
It depends on whether you want one directory level or everything below it. When I wrote the original, I didn't know the intent.
I believe the intent here is to list everything below `prefix` (at least, that's how I'm using it).
I misunderstood your first comment. Since the intent is to use the behavior of `_find`, I'm keeping it, but adding explicit kwargs as you suggested.
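For reference, the resolved call would then look roughly like this (a sketch of the loop under discussion, assuming the surrounding `list_prefix` method on the remote store; not the exact merged code):

```python
find_str = "/".join([self.path, prefix])
for onefile in await self._fs._find(
    find_str, detail=False, maxdepth=None, withdirs=False
):
    yield onefile
```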
src/zarr/sync.py
Outdated
""" | ||
result = [] | ||
async for x in data: | ||
result.append(x) |
asyncio.gather? Like above, not much point in having coroutines if we serially wait for them.
I don't think we can use `asyncio.gather` here, because `AsyncGenerator` is not iterable. Happy to be corrected, since I don't really know asyncio very well.
And for clarification, `_collect_aiterator` largely exists for convenience in testing, because I need some way to collect async generators when debugging with `pdb`. This function is not intended for use in anything performance sensitive.
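For reference, a helper like this is all that's being discussed (a sketch matching the diff above; the exact name and return type follow the comments here):

```python
from collections.abc import AsyncIterator
from typing import TypeVar

T = TypeVar("T")

async def _collect_aiterator(data: AsyncIterator[T]) -> tuple[T, ...]:
    """Drain an async iterator one item at a time; handy under pdb,
    not intended for performance-sensitive code."""
    result = []
    async for x in data:
        result.append(x)
    return tuple(result)
```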
We should probably re-examine the use of async-iterators, though. If we can't gather() on them (seems to be true?), then they are the wrong abstraction since gather() is probably always what we actually want.
On second thoughts, maybe I'm wrong - does `async for` schedule all the coroutines at once?? Should be easy to test.
I don't think it schedules them all at once. In `[x async for x in async_generator]`, `x` is not an awaitable; it's already awaited. Since the basic model of the generator is that it's a resumable, stateful iterator, I don't think we can schedule all the tasks at once.
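A quick experiment bears this out (a self-contained sketch; `fetch` is a hypothetical stand-in for an I/O-bound store request): items of an async generator are awaited one at a time, while `asyncio.gather` schedules everything up front.

```python
import asyncio
import time

async def fetch(i: int) -> int:
    await asyncio.sleep(0.1)  # simulate an I/O-bound request
    return i

async def gen():
    for i in range(5):
        yield await fetch(i)  # each item is awaited before the next starts

async def main() -> None:
    t0 = time.perf_counter()
    serial = [x async for x in gen()]
    t1 = time.perf_counter()
    concurrent = await asyncio.gather(*(fetch(i) for i in range(5)))
    t2 = time.perf_counter()
    assert serial == concurrent
    print(f"async for: {t1 - t0:.2f}s, gather: {t2 - t1:.2f}s")  # ~0.5s vs ~0.1s

asyncio.run(main())
```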
The idea with the generators is to a) support seamless pagination and b) support pipelining (`del_prefix` will be able to take advantage of this at some point).
    store_dict = dict(zip(keys, data_buf, strict=True))
    await store._set_dict(store_dict)
    for k, v in store_dict.items():
        assert self.get(store, k).to_bytes() == v.to_bytes()
if x.to_bytes() == y.to_bytes(), does x == y?
Isn't there a multiple get? Maybe not important here.
> if x.to_bytes() == y.to_bytes(), does x == y?

No, and I suspect this might be deliberate, since in principle `Buffer` instances can have identical bytes but different devices (e.g., GPU memory vs host memory); thus `x == y` might only be true if two buffers are bytes-equal and device-equal, but I'm speculating here. @madsbk would have a better answer I think.

> Isn't there a multiple get? Maybe not important here.

There is no multiple get (nor a multiple set, nor a multiple delete).
xref: src/zarr/buffer.py in #2006
@@ -113,8 +107,8 @@ def store_kwargs(self, request) -> dict[str, str | bool]:
        raise AssertionError

    @pytest.fixture(scope="function")
-   def store(self, store_kwargs: dict[str, str | bool]) -> RemoteStore:
-       url = store_kwargs["url"]
+   async def store(self, store_kwargs: dict[str, str | bool | UPath]) -> RemoteStore:
This isn't actually async
Correct, but the class we are inheriting from defines this as an async method.
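Schematically, the override keeps the `async` signature even though its body never awaits (a sketch, not the actual test code; `store_cls` is assumed to come from the base test class):

```python
@pytest.fixture(scope="function")
async def store(self, store_kwargs: dict[str, str | bool | UPath]) -> RemoteStore:
    # nothing here needs awaiting, but the base StoreTests class
    # declares this fixture async, so the override matches it
    return self.store_cls(**store_kwargs)
```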
src/zarr/abc/store.py
Outdated
@@ -221,6 +224,13 @@ def close(self) -> None:
        self._is_open = False
        pass

+   async def _set_dict(self, dict: Mapping[str, Buffer]) -> None:
`set_many()` (analogous to `insert_many`)?
I moved away from `set_dict` and switched to `_set_many`.
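The default on the base class can then be a thin concurrent wrapper over `set`; roughly (a sketch, assuming the abstract store's `set(key, value)` coroutine; `Any` stands in for zarr's `Buffer` type):

```python
import asyncio
from collections.abc import Iterable
from typing import Any

async def _set_many(self, values: Iterable[tuple[str, Any]]) -> None:
    # fire off all writes concurrently instead of awaiting each in turn;
    # stores with real batch APIs can override this with something smarter
    await asyncio.gather(*(self.set(key, value) for key, value in values))
```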
we should try to get this in, because it fixes problematic …
This PR:

- fixes `list_prefix` for stores. `list_prefix(prefix=foo)` now consistently returns keys with the shared prefix `foo` stripped. I'd be fine altering this to return absolute keys instead.
- adds tests for `list_prefix` on the `StoreTests` base class.
- adds a `_set_dict(Mapping[str, Buffer])` method to stores that allows declaring a collection of key: value pairs to write to storage. The primary usage is to make store tests simpler via a declarative API. I also suspect that making tests simpler anticipates making other code simpler. The default implementation of `_set_dict` simply wraps `store.set`, but it's easy to imagine fancier batching / transaction implementations.
- adds tests for `_set_dict`.

TODO: