From 26191032c7722aa302d6007f0316b56da4cf11ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Wed, 6 Dec 2023 15:25:53 -0600 Subject: [PATCH] refactor: Use ffspec --- poetry.lock | 36 ++++++++++++++++++++++- pyproject.toml | 1 + singer_sdk/contrib/batch_encoder_jsonl.py | 6 ++-- singer_sdk/helpers/_batch.py | 34 ++++++++++++++++----- 4 files changed, 66 insertions(+), 11 deletions(-) diff --git a/poetry.lock b/poetry.lock index c476d371c..1aa6bd4e1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -652,6 +652,40 @@ boto3 = ">=1.9,<2.0" fs = ">=2.4,<3.0" six = ">=1.10,<2.0" +[[package]] +name = "fsspec" +version = "2023.1.0" +description = "File-system specification" +optional = false +python-versions = ">=3.7" +files = [ + {file = "fsspec-2023.1.0-py3-none-any.whl", hash = "sha256:b833e2e541e9e8cde0ab549414187871243177feb3d344f9d27b25a93f5d8139"}, + {file = "fsspec-2023.1.0.tar.gz", hash = "sha256:fbae7f20ff801eb5f7d0bedf81f25c787c0dfac5e982d98fa3884a9cde2b5411"}, +] + +[package.extras] +abfs = ["adlfs"] +adl = ["adlfs"] +arrow = ["pyarrow (>=1)"] +dask = ["dask", "distributed"] +dropbox = ["dropbox", "dropboxdrivefs", "requests"] +entrypoints = ["importlib-metadata"] +fuse = ["fusepy"] +gcs = ["gcsfs"] +git = ["pygit2"] +github = ["requests"] +gs = ["gcsfs"] +gui = ["panel"] +hdfs = ["pyarrow (>=1)"] +http = ["aiohttp (!=4.0.0a0,!=4.0.0a1)", "requests"] +libarchive = ["libarchive-c"] +oci = ["ocifs"] +s3 = ["s3fs"] +sftp = ["paramiko"] +smb = ["smbprotocol"] +ssh = ["paramiko"] +tqdm = ["tqdm"] + [[package]] name = "furo" version = "2023.9.10" @@ -2611,4 +2645,4 @@ testing = ["pytest", "pytest-durations"] [metadata] lock-version = "2.0" python-versions = ">=3.8" -content-hash = "f68c93fa98d9ee287c69e3de2b1fa59d528f90596cf83e824164a5d12f66e571" +content-hash = "1dfe5b9d027704356027c1b9af548b612f925592de83e45b7812361b934f370b" diff --git a/pyproject.toml b/pyproject.toml index 6c1baa805..7014bbe36 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -89,6 +89,7 @@ pyarrow = { version = ">=13", optional = true } # Testing dependencies installed as optional 'testing' extras pytest = {version=">=7.2.1", optional = true} pytest-durations = {version = ">=1.2.0", optional = true} +fsspec = ">=2023.1.0" [tool.poetry.extras] docs = [ diff --git a/singer_sdk/contrib/batch_encoder_jsonl.py b/singer_sdk/contrib/batch_encoder_jsonl.py index 6ce4c8793..88f31844a 100644 --- a/singer_sdk/contrib/batch_encoder_jsonl.py +++ b/singer_sdk/contrib/batch_encoder_jsonl.py @@ -38,9 +38,10 @@ def get_batches( start=1, ): filename = f"{prefix}{sync_id}-{i}.json.gz" - with self.batch_config.storage.fs(create=True) as fs: + with self.batch_config.storage.fs() as fs: # TODO: Determine compression from config. - with fs.open(filename, "wb") as f, gzip.GzipFile( + file_url = self.batch_config.storage.file_url(filename) + with fs.open(file_url, mode="wb") as f, gzip.GzipFile( fileobj=f, mode="wb", ) as gz: @@ -48,5 +49,4 @@ def get_batches( (json.dumps(record, default=str) + "\n").encode() for record in chunk ) - file_url = fs.geturl(filename) yield [file_url] diff --git a/singer_sdk/helpers/_batch.py b/singer_sdk/helpers/_batch.py index b57002e1d..a2406ef4f 100644 --- a/singer_sdk/helpers/_batch.py +++ b/singer_sdk/helpers/_batch.py @@ -3,6 +3,7 @@ from __future__ import annotations import enum +import os.path import platform import typing as t from contextlib import contextmanager @@ -10,12 +11,10 @@ from urllib.parse import ParseResult, urlencode, urlparse import fs +import fsspec from singer_sdk._singerlib.messages import Message, SingerMessageType -if t.TYPE_CHECKING: - from fs.base import FS - DEFAULT_BATCH_SIZE = 10000 @@ -174,7 +173,11 @@ def fs_url(self) -> ParseResult: return urlparse(self.root)._replace(query=urlencode(self.params)) @contextmanager - def fs(self, **kwargs: t.Any) -> t.Generator[FS, None, None]: + def fs( + self, + *, + create: bool = False, + ) -> t.Generator[fsspec.AbstractFileSystem, None, None]: """Get a filesystem object for the storage target. Args: @@ -183,9 +186,26 @@ def fs(self, **kwargs: t.Any) -> t.Generator[FS, None, None]: Returns: The filesystem object. """ - filesystem = fs.open_fs(self.fs_url.geturl(), **kwargs) - yield filesystem - filesystem.close() + fs, url = fsspec.core.url_to_fs( + self.root, + **self.params, + ) + if create: + fs.mkdirs(url, exist_ok=True) + yield fs + + def file_url(self, filename: str) -> str: + """Get the URL of a file in the storage target. + + Args: + filename: The filename to get the URL for. + + Returns: + The URL of the file. + """ + return self.fs_url._replace( + path=os.path.join(self.fs_url.path, filename) + ).geturl() @contextmanager def open(