Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Absolute file paths created by taps running in BATCH mode can't be processed by the Sink #999

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions samples/sample_tap_countries/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""Package entry point: lets ``python -m samples.sample_tap_countries`` run the tap."""

from samples.sample_tap_countries.countries_tap import SampleTapCountries

# Deliberately unguarded: a package's ``__main__.py`` is executed as the main
# module, so the CLI is invoked directly at the top level.
SampleTapCountries.cli()
3 changes: 3 additions & 0 deletions samples/sample_target_csv/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""Package entry point: lets ``python -m samples.sample_target_csv`` run the target."""

from samples.sample_target_csv.csv_target import SampleTargetCSV

# Deliberately unguarded: a package's ``__main__.py`` is executed as the main
# module, so the CLI is invoked directly at the top level.
SampleTargetCSV.cli()
3 changes: 3 additions & 0 deletions samples/sample_target_sqlite/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""Package entry point: lets ``python -m samples.sample_target_sqlite`` run the target."""

from samples.sample_target_sqlite import SQLiteTarget

# Deliberately unguarded: a package's ``__main__.py`` is executed as the main
# module, so the CLI is invoked directly at the top level.
SQLiteTarget.cli()
23 changes: 18 additions & 5 deletions singer_sdk/helpers/_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from contextlib import contextmanager
from dataclasses import asdict, dataclass, field
from typing import IO, TYPE_CHECKING, Any, ClassVar, Generator
from urllib.parse import ParseResult, parse_qs, urlencode, urlparse
from urllib.parse import ParseResult, urlencode, urlparse

import fs

Expand Down Expand Up @@ -119,18 +119,31 @@ def from_dict(cls, data: dict[str, Any]) -> StorageTarget:
"""
return cls(**data)

@staticmethod
def split_url(url: str) -> tuple[str, str]:
    """Break a URL into its leading location and its final component.

    The work is delegated to ``fs.path.split``.

    Args:
        url: The URL to break apart.

    Returns:
        A ``(head, tail)`` pair: the last component of ``url`` as the tail
        and everything that precedes it as the head.
    """
    head, tail = fs.path.split(url)
    return head, tail

@classmethod
def from_url(cls, url: ParseResult) -> StorageTarget:
"""Create a storage target from a URL.
def from_url(cls, url: str) -> StorageTarget:
"""Create a storage target from a file URL.

Args:
url: The URL to create the storage target from.

Returns:
The created storage target.
"""
new_url = url._replace(path="", query="")
return cls(root=new_url.geturl(), params=parse_qs(url.query))
parsed_url = urlparse(url)
new_url = parsed_url._replace(query="")
return cls(root=new_url.geturl())

@property
def fs_url(self) -> ParseResult:
Expand Down
9 changes: 4 additions & 5 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from logging import Logger
from types import MappingProxyType
from typing import IO, Any, Mapping, Sequence
from urllib.parse import urlparse

from dateutil import parser
from jsonschema import Draft4Validator, FormatChecker
Expand Down Expand Up @@ -451,16 +450,16 @@ def process_batch_files(
storage: StorageTarget | None = None

for path in files:
url = urlparse(path)
head, tail = StorageTarget.split_url(path)

if self.batch_config:
storage = self.batch_config.storage
else:
storage = StorageTarget.from_url(url)
storage = StorageTarget.from_url(head)

if encoding.format == BatchFileFormat.JSONL:
with storage.fs(create=False) as fs:
with fs.open(url.path, mode="rb") as file:
with storage.fs(create=False) as batch_fs:
with batch_fs.open(tail, mode="rb") as file:
if encoding.compression == "gzip":
file = gzip_open(file)
context = {"records": [json.loads(line) for line in file]}
Expand Down
21 changes: 15 additions & 6 deletions tests/core/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,18 @@ def test_storage_get_url():
assert url.replace("\\", "/").endswith("root_dir/prefix--file.jsonl.gz")


def test_storage_from_url():
    """Check the root, prefix and params derived from a parsed S3 URL."""
    parsed = urlparse("s3://bucket/path/to/file?region=us-east-1")
    storage = StorageTarget.from_url(parsed)

    # The path is dropped from the root; the query string becomes the params.
    assert storage.root == "s3://bucket"
    assert storage.prefix is None
    assert storage.params == {"region": ["us-east-1"]}
@pytest.mark.parametrize(
    "file_url,root",
    [
        pytest.param(
            "file:///Users/sdk/path/to/file",
            "file:///Users/sdk/path/to",
            id="local",
        ),
    ],
)
def test_storage_from_url(file_url: str, root: str):
    """Verify the storage root is the head part of a batch file URL."""
    # Only the head of the split is needed; the file name itself is ignored.
    parent_url = StorageTarget.split_url(file_url)[0]
    storage = StorageTarget.from_url(parent_url)
    assert storage.root == root