Skip to content

Commit

Permalink
Process file URL by splitting it
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Sep 27, 2022
1 parent 3e819dd commit 9bb7ea1
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 16 deletions.
23 changes: 18 additions & 5 deletions singer_sdk/helpers/_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from contextlib import contextmanager
from dataclasses import asdict, dataclass, field
from typing import IO, TYPE_CHECKING, Any, ClassVar, Generator
from urllib.parse import ParseResult, parse_qs, urlencode, urlparse
from urllib.parse import ParseResult, urlencode, urlparse

import fs

Expand Down Expand Up @@ -119,18 +119,31 @@ def from_dict(cls, data: dict[str, Any]) -> StorageTarget:
"""
return cls(**data)

@staticmethod
def split_url(url: str) -> tuple[str, str]:
"""Split a URL into a head and tail pair.
Args:
url: The URL to split.
Returns:
A tuple of the head and tail parts of the URL.
"""
return fs.path.split(url)

@classmethod
def from_url(cls, url: ParseResult) -> StorageTarget:
"""Create a storage target from a URL.
def from_url(cls, url: str) -> StorageTarget:
"""Create a storage target from a file URL.
Args:
url: The URL to create the storage target from.
Returns:
The created storage target.
"""
new_url = url._replace(path="", query="")
return cls(root=new_url.geturl(), params=parse_qs(url.query))
parsed_url = urlparse(url)
new_url = parsed_url._replace(query="")
return cls(root=new_url.geturl())

@property
def fs_url(self) -> ParseResult:
Expand Down
9 changes: 4 additions & 5 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from logging import Logger
from types import MappingProxyType
from typing import IO, Any, Mapping, Sequence
from urllib.parse import urlparse

from dateutil import parser
from jsonschema import Draft4Validator, FormatChecker
Expand Down Expand Up @@ -451,16 +450,16 @@ def process_batch_files(
storage: StorageTarget | None = None

for path in files:
url = urlparse(path)
head, tail = StorageTarget.split_url(path)

if self.batch_config:
storage = self.batch_config.storage
else:
storage = StorageTarget.from_url(url)
storage = StorageTarget.from_url(head)

if encoding.format == BatchFileFormat.JSONL:
with storage.fs(create=False) as fs:
with fs.open(url.path, mode="rb") as file:
with storage.fs(create=False) as batch_fs:
with batch_fs.open(tail, mode="rb") as file:
if encoding.compression == "gzip":
file = gzip_open(file)
context = {"records": [json.loads(line) for line in file]}
Expand Down
21 changes: 15 additions & 6 deletions tests/core/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,18 @@ def test_storage_get_url():
assert url.replace("\\", "/").endswith("root_dir/prefix--file.jsonl.gz")


def test_storage_from_url():
url = urlparse("s3://bucket/path/to/file?region=us-east-1")
target = StorageTarget.from_url(url)
assert target.root == "s3://bucket"
assert target.prefix is None
assert target.params == {"region": ["us-east-1"]}
@pytest.mark.parametrize(
"file_url,root",
[
pytest.param(
"file:///Users/sdk/path/to/file",
"file:///Users/sdk/path/to",
id="local",
),
],
)
def test_storage_from_url(file_url: str, root: str):
"""Test storage target from URL."""
head, _ = StorageTarget.split_url(file_url)
target = StorageTarget.from_url(head)
assert target.root == root

0 comments on commit 9bb7ea1

Please sign in to comment.