Skip to content

Commit

Permalink
refactor: Use fsspec
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Jan 22, 2024
1 parent c9c0b0b commit 2619103
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 11 deletions.
36 changes: 35 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pyarrow = { version = ">=13", optional = true }
# Testing dependencies installed as optional 'testing' extras
pytest = {version=">=7.2.1", optional = true}
pytest-durations = {version = ">=1.2.0", optional = true}
fsspec = ">=2023.1.0"

[tool.poetry.extras]
docs = [
Expand Down
6 changes: 3 additions & 3 deletions singer_sdk/contrib/batch_encoder_jsonl.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ def get_batches(
start=1,
):
filename = f"{prefix}{sync_id}-{i}.json.gz"
with self.batch_config.storage.fs(create=True) as fs:
with self.batch_config.storage.fs() as fs:
# TODO: Determine compression from config.
with fs.open(filename, "wb") as f, gzip.GzipFile(
file_url = self.batch_config.storage.file_url(filename)
with fs.open(file_url, mode="wb") as f, gzip.GzipFile(
fileobj=f,
mode="wb",
) as gz:
gz.writelines(
(json.dumps(record, default=str) + "\n").encode()
for record in chunk
)
file_url = fs.geturl(filename)
yield [file_url]
34 changes: 27 additions & 7 deletions singer_sdk/helpers/_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,18 @@
from __future__ import annotations

import enum
import os.path
import platform
import typing as t
from contextlib import contextmanager
from dataclasses import asdict, dataclass, field
from urllib.parse import ParseResult, urlencode, urlparse

import fs
import fsspec

from singer_sdk._singerlib.messages import Message, SingerMessageType

if t.TYPE_CHECKING:
from fs.base import FS

DEFAULT_BATCH_SIZE = 10000


Expand Down Expand Up @@ -174,7 +173,11 @@ def fs_url(self) -> ParseResult:
return urlparse(self.root)._replace(query=urlencode(self.params))

@contextmanager
def fs(self, **kwargs: t.Any) -> t.Generator[FS, None, None]:
def fs(
self,
*,
create: bool = False,
) -> t.Generator[fsspec.AbstractFileSystem, None, None]:
"""Get a filesystem object for the storage target.
Args:
Expand All @@ -183,9 +186,26 @@ def fs(self, **kwargs: t.Any) -> t.Generator[FS, None, None]:
Returns:
The filesystem object.
"""
filesystem = fs.open_fs(self.fs_url.geturl(), **kwargs)
yield filesystem
filesystem.close()
fs, url = fsspec.core.url_to_fs(
self.root,
**self.params,
)
if create:
fs.mkdirs(url, exist_ok=True)
yield fs

def file_url(self, filename: str) -> str:
    """Get the URL of a file in the storage target.

    The filename is appended to the path component of ``self.fs_url``; the
    other components of that URL (scheme, netloc, query, ...) are kept as-is.

    Args:
        filename: The filename to get the URL for.

    Returns:
        The URL of the file.
    """
    # URL paths always use forward slashes. ``os.path.join`` follows the
    # host OS convention and would insert a backslash on Windows, so join
    # with ``posixpath`` to keep the resulting URL valid on every platform.
    import posixpath

    base_url = self.fs_url
    file_path = posixpath.join(base_url.path, filename)
    return base_url._replace(path=file_path).geturl()

@contextmanager
def open(
Expand Down

0 comments on commit 2619103

Please sign in to comment.