
[FEAT] Create obstore store in fsspec on demand #198

Open · wants to merge 45 commits into main

Conversation

machichima

Construct the obstore store instance on demand in fsspec when calling methods. This allows automatic store creation for reads/writes across different buckets, aligning usage with fsspec conventions

construct store with from_url using protocol and bucket name
@@ -45,6 +47,9 @@ def __init__(
    self,
    store: obs.store.ObjectStore,
    *args,
    config: dict[str, Any] = {},
Member

If we allow these, store should be optional?

And before merging we should enable typing overloads for better typing. You can see how from_url is implemented.

Author

I use store here for deciding the store interface (whether it is S3Store, GCSStore, ...), so that in AsyncFsspecStore we don't need to decide the interface based on the protocol.

Maybe there's a better way of deciding the store interface?

from datetime import timedelta

import fsspec

from obstore.fsspec import AsyncFsspecStore
from obstore.store import S3Store

# assumes AsyncFsspecStore was registered for "s3" via fsspec.register_implementation
obstore_fs: AsyncFsspecStore = fsspec.filesystem(
    "s3",
    store=S3Store,
    config={
        "endpoint": "http://localhost:30002",
        "access_key_id": "minio",
        "secret_access_key": "miniostorage",
        "virtual_hosted_style_request": True,  # path contain bucket name
    },
    client_options={"timeout": "99999s", "allow_http": "true"},
    retry_config={
        "max_retries": 2,
        "backoff": {
            "base": 2,
            "init_backoff": timedelta(seconds=2),
            "max_backoff": timedelta(seconds=16),
        },
        "retry_timeout": timedelta(minutes=3),
    },
)

Author

I'll have a look at the typing later on

Member

Oh that's confusing because store is the type of the class and not an instance.

We should be able to use the from_url top-level function directly here?
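
For reference, from_url infers the store class from the URL scheme; a minimal sketch (the config kwarg shape here is an assumption):

import obstore as obs

# from_url picks S3Store, GCSStore, ... based on the URL scheme
store = obs.store.from_url("s3://my-bucket", config={"skip_signature": True})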

file_path = "/".join(path_li[1:])
return (bucket, file_path)

@lru_cache(maxsize=10)
Member

It would be nice if this cache size could be user-specified, but we can come back to it.
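
One way to make it user-configurable later would be to wrap the method per instance instead of decorating it at class scope (a sketch; max_cache_size matches the kwarg name that appears later in this PR's docstring):

from functools import lru_cache

class Example:
    def __init__(self, max_cache_size: int = 10) -> None:
        # bind a per-instance cached wrapper around the bound method
        self._construct_store = lru_cache(maxsize=max_cache_size)(self._construct_store)

    def _construct_store(self, bucket: str):
        ...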

@kylebarron
Member

Would there be one fsspec instance per cloud provider? So if you wanted to use s3 and gcs you'd make two separate instances?

@machichima
Author

machichima commented Feb 3, 2025

Would there be one fsspec instance per cloud provider? So if you wanted to use s3 and gcs you'd make two separate instances?

Based on what I know, to use fsspec, we will do:

fsspec.register_implementation("s3", AsyncFsspecStore)
fsspec.register_implementation("gs", AsyncFsspecStore)

Each will have its own AsyncFsspecStore instance already. To configure, we can use (based on my current implementation):

s3_fs: AsyncFsspecStore = fsspec.filesystem(
    "s3",
    store=S3Store,
    config={...}
)

gcs_fs: AsyncFsspecStore = fsspec.filesystem(
    "gs",
    store=GCSStore,
    config={...}
)

@kylebarron
Member

It would be nice to take out the store arg and use from_url directly. from_url will automatically construct the correct store based on the URL protocol.

@machichima force-pushed the obstore-instance-in-fsspec branch from 34f79f0 to 29464a7 on February 4, 2025 14:06
@machichima
Author

I use from_url and removed store in the latest commit. However, by doing this, we need to specify the protocol by inheriting from the AsyncFsspecStore class for each store instance. I added this here:

class S3FsspecStore(AsyncFsspecStore):
    protocol = "s3"


class GCSFsspecStore(AsyncFsspecStore):
    protocol = "gs"


class AzureFsspecStore(AsyncFsspecStore):
    protocol = "abfs"

@kylebarron
Member

Is it true that a single fsspec class can't be associated with more than one protocol? E.g. Azure has three different protocols abfs, adlfs and az, but it looks like adlfs exports three separate classes.

@kylebarron
Member

The latest PRs allow you to access the config back out of a store, which may be useful to you? You can validate that you already have a store in your cache for a specific bucket

@machichima
Author

machichima commented Feb 6, 2025

Is it true that a single fsspec class can't be associated with more than one protocol? E.g. Azure has three different protocols abfs, adlfs and az, but it looks like adlfs exports three separate classes.

I think we can if those protocols refer to the same object instance. s3fs does have 2 protocols ("s3", "s3a"), see: https://github.com/fsspec/s3fs/blob/023aecf00b5c6243ff5f8a016dac8b6af3913c6b/s3fs/core.py#L277

I think abfs, adlfs, and az have different implementations, so they export different classes. If we use them in obstore, I think we can define a class with protocols (abfs, adlfs, az), but we need to test whether they all work.
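
For example, s3fs declares both aliases on a single class (a simplified sketch of the linked source):

from fsspec.asyn import AsyncFileSystem

class S3FileSystem(AsyncFileSystem):
    protocol = ("s3", "s3a")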

@@ -104,6 +104,12 @@ def _split_path(self, path: str) -> Tuple[str, str]:
# no bucket name in path
return "", path

if path.startswith(self.protocol + "://"):
Member

Assuming that this function will always receive a URL like s3://mybucket/path/to/file, I'm inclined for this function to use urlparse instead of manually handling the parts of the URL.

Author

It will not always be s3://mybucket/path/to/file; it may also come without a protocol, like mybucket/path/to/file.

Author

machichima commented Feb 7, 2025

I use urlparse like this here, which works for both s3://mybucket/path/to/file and mybucket/path/to/file

from urllib.parse import urlparse

res = urlparse(path)
if res.scheme:
    if res.scheme != self.protocol:
        raise ValueError(f"Expect protocol to be {self.protocol}. Got {res.scheme}")
    path = res.netloc + res.path
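
For both input shapes, urlparse gives usable results (illustrative REPL output, non-relevant fields omitted):

from urllib.parse import urlparse

urlparse("s3://mybucket/path/to/file")
# ParseResult(scheme='s3', netloc='mybucket', path='/path/to/file', ...)
urlparse("mybucket/path/to/file")
# ParseResult(scheme='', netloc='', path='mybucket/path/to/file', ...)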

@kylebarron
Member

I think we can if those protocols refer to the same object instance. s3fs does have 2 protocols ("s3", "s3a"), see: fsspec/s3fs@023aecf/s3fs/core.py#L277

Oh cool! That seems to indicate that we could have a single class that defines supported protocols as:

    protocol = ("s3", "s3a", "gs", "az", "abfs", "adlfs")

Because the fsspec class used for each is the same? It's just custom kwargs that would need to be passed down for each?

@machichima
Author

Oh cool! That seems to indicate that we could have a single class that defines supported protocols as:

    protocol = ("s3", "s3a", "gs", "az", "abfs", "adlfs")

Because the fsspec class used for each is the same? It's just custom kwargs that would need to be passed down for each?

I don't think we can put all the protocols together into one class, as when using fsspec.register_implementation("s3", AsyncFsspecStore), fsspec wouldn't tell AsyncFsspecStore what the protocol is, so when constructing the store instance, we cannot get the protocol:

def _construct_store(self, bucket: str):
    return from_url(
        url=f"{self.protocol}://{bucket}",

I think the better way is to create obstore.fsspec.register("protocol"), which wraps around fsspec.register_implementation and directly sets the protocol on AsyncFsspecStore (like what's mentioned in this comment); then we do not need more classes. Let me have a try.
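
A rough sketch of that wrapper (creating the subclass dynamically via type() is one option; names are illustrative):

import fsspec

def register(protocol: str) -> None:
    fsspec.register_implementation(
        protocol,
        # subclass with `protocol` baked in, so _construct_store builds the right URL
        type(f"AsyncFsspecStore_{protocol}", (AsyncFsspecStore,), {"protocol": protocol}),
        clobber=False,
    )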

@kylebarron
Member

I did a quick look through your PR; it's really good progress but a few thoughts:

  • There are a bunch of cases where bucket, path = self._split_path(path) doesn't work because path is not in scope. E.g. in _cp_file where path1 and path2 are in scope
  • in _cp_file we need to validate that the buckets of the source and destination paths are the same
  • We need some tests for edits that happen in this PR
  • It's not clear how BufferedFileSimple works, because that subclasses from an upstream fsspec.spec.AbstractBufferedFile but doesn't touch obstore APIs at all
  • If you don't already, I'd highly suggest using a linter like https://docs.astral.sh/ruff/ in your editor, so that you can catch some of these issues before hitting CI

@machichima
Author

  • If you don't already, I'd highly suggest using a linter like https://docs.astral.sh/ruff/ in your editor, so that you can catch some of these issues before hitting CI

Thanks for the suggestion! I just added the ruff linter and removed the error for path. I also added a check validating that the bucket names from the two paths are the same.

  • It's not clear how BufferedFileSimple works, because that subclasses from an upstream fsspec.spec.AbstractBufferedFile but doesn't touch obstore APIs at all

For BufferedFileSimple, when self.fs.cat_file() is called, it dispatches to the _cat_file() function in AsyncFsspecStore.

  • We need some tests for edits that happen in this PR

Yes! I will update the test in the next few days

@Future-Outlier Future-Outlier left a comment

lgtm

@kylebarron
Member

Tests are failing; would you be able to fix that?

@machichima
Author

machichima commented Feb 21, 2025

The failing tests are XFailed ones; I don't know why GitHub is now catching those. Should I take them out?

Oh sorry, it's caused by running out of memory. I'll fix the tests then.

If registered multiple times, each with its own instance, the cache does
not work and we end up with multiple instances with the same config
@machichima
Author

machichima commented Feb 23, 2025

Tests are failing; would you be able to fix that?

Hi @kylebarron
I just added the code to clean up after each test. The OOM error is because the file system instances were not cleaned up.

@@ -296,3 +468,66 @@ def read(self, length: int = -1) -> Any:
    data = self.fs.cat_file(self.path, self.loc, self.loc + length)
    self.loc += length
    return data


def register(protocol: str | list[str], asynchronous: bool = False) -> None:
Member
Suggested change
def register(protocol: str | list[str], asynchronous: bool = False) -> None:
def register(protocol: str | list[str], *, asynchronous: bool = False) -> None:

I'm a big proponent of having a small number of positional parameters. This also allows us to make asynchronous a positional argument in the future without it being a breaking change.

@@ -296,3 +467,50 @@ def read(self, length: int = -1) -> Any:
    data = self.fs.cat_file(self.path, self.loc, self.loc + length)
    self.loc += length
    return data


def register(protocol: str | Iterable[str], *, asynchronous: bool = False) -> None:
Member

This can be a follow-up PR, but we should support calling register with no arguments, which registers for all supported backends.
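
A sketch of that follow-up behavior (SUPPORTED_PROTOCOLS is a hypothetical name):

from collections.abc import Iterable

def register(protocol: str | Iterable[str] | None = None, *, asynchronous: bool = False) -> None:
    if protocol is None:
        # register every backend obstore supports, e.g. ("s3", "gs", "abfs", ...)
        protocol = SUPPORTED_PROTOCOLS
    ...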

@kylebarron
Member

I have to run, but I can do a final review later today or tomorrow

@machichima
Author

machichima commented Feb 25, 2025

I just added type checking in register, as type annotations on parameters in Python only work for static checks and not at runtime, so we should explicitly check types here.

@kylebarron
Member

I tried a basic list operation and couldn't get it to work with this PR.

A successful list with s3fs:

import s3fs
fs = s3fs.S3FileSystem(anon=True)
path = "s3://sentinel-cogs/sentinel-s2-l2a-cogs/12/S/UF/2022/6/S2B_12SUF_20220609_0_L2A"
fs.ls(path)

prints

['sentinel-cogs/sentinel-s2-l2a-cogs/12/S/UF/2022/6/S2B_12SUF_20220609_0_L2A/AOT.tif',
 'sentinel-cogs/sentinel-s2-l2a-cogs/12/S/UF/2022/6/S2B_12SUF_20220609_0_L2A/B01.tif',
...

Trying to do this with this PR:

from obstore.fsspec import AsyncFsspecStore

# Neither of these two works
store = AsyncFsspecStore(config={"skip_signature": True})
store = AsyncFsspecStore("s3", config={"skip_signature": True})
path = "s3://sentinel-cogs/sentinel-s2-l2a-cogs/12/S/UF/2022/6/S2B_12SUF_20220609_0_L2A"
store.ls(path)

raises with

GenericError: Generic URL error: Unable to recognise URL "abstract://"

You're checking self.protocol in multiple places but it's never getting set.

Comment on lines +59 to +60
"AWS_SKIP_SIGNATURE": "True",
"AWS_ALLOW_HTTP": "true",
Member

Suggested change
"AWS_SKIP_SIGNATURE": "True",
"AWS_ALLOW_HTTP": "true",
"AWS_SKIP_SIGNATURE": True,
"AWS_ALLOW_HTTP": True,



@pytest.fixture
def s3_store_config(s3: str):
Member

Suggested change
def s3_store_config(s3: str):
def s3_store_config(s3: str) -> S3ConfigInput:

args: positional arguments passed on to the `fsspec.asyn.AsyncFileSystem`
    constructor.

Keyword Args:
    asynchronous: Set to `True` if this instance is meant to be called using
        the fsspec async API. This should only be set to true when running
        within a coroutine.
    max_cache_size (int, optional): The maximum number of stores the cache
        should keep. A cached store is kept internally for each bucket name.
        Defaults to 10.
    loop: since both fsspec/python and tokio/rust may be using loops, this
        should be kept `None` for now, and will not be used.
    batch_size: some operations on many files will batch their requests; if you
Member

kylebarron commented Feb 26, 2025

The example below in this docstring is no longer valid. Can you update it?


def __init__(
def __init__( # noqa: PLR0913
Member

It seems you want a protocol parameter here, which sets the value of protocol onto self?

"""
protocol_with_bucket = ["s3", "s3a", "gcs", "gs", "abfs", "https", "http"]

if self.protocol not in protocol_with_bucket:
Member

kylebarron commented Feb 26, 2025

What are examples of protocols that we support that are not any of the above?

(That is, why are we even doing this check?)

Comment on lines +178 to +179
if "/" not in path:
return path, ""
Member

Aren't you returning a tuple of (bucket, file_path)? Then returning path, "" doesn't make sense.

Member

It seems we'd want to error in this case? There's no way for us to infer the bucket to use.

As it stands you're searching through the string twice, once here and again below in path.split.

Instead, you can call path.split("/", 1) once. If the result is a list of length 1, then you know a "/" wasn't in the path, and then you can error.

Comment on lines +180 to +182
path_li = path.split("/")
bucket = path_li[0]
file_path = "/".join(path_li[1:])
Member

Much simpler than this is to split on only the first / character:

path = "bucket/path/to/file.txt"
path.split("/", 1)
# ["bucket", "path/to/file.txt"]

Then you don't need to split and rejoin the path

super().__init__(
    *args,
    asynchronous=asynchronous,
    loop=loop,
    batch_size=batch_size,
)

def _split_path(self, path: str) -> tuple[str, str]:
Member

The only thing this function uses from self is self.protocol. Let's move _split_path into global scope, and then we can test _split_path specifically from the test file.

We should validate that we can split the path both for URLs with a protocol and for "paths" without the protocol.
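
A sketch of the module-level version, combining the urlparse handling discussed above (names and error messages are illustrative):

from urllib.parse import urlparse

def _split_path(path: str, protocol: str) -> tuple[str, str]:
    """Split 's3://bucket/key' or 'bucket/key' into (bucket, key)."""
    res = urlparse(path)
    if res.scheme:
        if res.scheme != protocol:
            raise ValueError(f"Expect protocol to be {protocol}. Got {res.scheme}")
        path = res.netloc + res.path
    parts = path.split("/", 1)
    if len(parts) == 1:
        # no "/" in the path, so there is no way to infer the bucket
        raise ValueError(f"Cannot infer bucket name from {path!r}")
    return parts[0], parts[1]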

Comment on lines +377 to +378
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, super().info, path, **_kwargs)
Member

Why are you calling super().info? You can call super()._info, which is async, and not need to touch the running event loop at all.
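
In other words, a sketch of the suggested shape (fsspec's async counterparts are underscore-prefixed):

async def _info(self, path, **kwargs):
    return await super()._info(path, **kwargs)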

return await loop.run_in_executor(None, super().info, path, **_kwargs)

@staticmethod
def _fill_bucket_name(path: str, bucket: str) -> str:
Member

This is only used in two places. Can we just copy the f-string above and delete this helper function?
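
(Presumably the inlined form is just path = f"{bucket}/{path}" at both call sites; the exact f-string is an assumption here.)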

@kylebarron
Member

I just added type checking in register, as type annotations on parameters in Python only work for static checks and not at runtime, so we should explicitly check types here.

I'd rather not. I'd rather just rely on the static type checker (at least for the register function) and keep the code more concise. Especially since register isn't a confusing API. It shouldn't be surprising that if you call register(123), the code won't work.
