Skip to content

Commit

Permalink
ADD: Add compression parameter to DBNStore.to_file
Browse files Browse the repository at this point in the history
  • Loading branch information
nmacholl committed Sep 16, 2024
1 parent 59acae2 commit f80864b
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- Added `mode` parameter to `DBNStore.to_csv` to control the file writing mode
- Added `mode` parameter to `DBNStore.to_json` to control the file writing mode
- Added `mode` parameter to `DBNStore.to_parquet` to control the file writing mode
- Added `compression` parameter to `DBNStore.to_file` which controls the output compression format

#### Breaking changes
- Changed default write mode for `DBNStore.to_csv` to overwrite ("w")
Expand Down
33 changes: 31 additions & 2 deletions databento/common/dbnstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,7 @@ def to_file(
self,
path: PathLike[str] | str,
mode: Literal["w", "x"] = "w",
compression: Compression | str | None = None,
) -> None:
"""
Write the data to a DBN file at the given path.
Expand All @@ -1051,6 +1052,8 @@ def to_file(
The file path to write to.
mode : str, default "w"
The file write mode to use, either "x" or "w".
compression : Compression or str, optional
The compression format to write. If `None`, uses the same compression as the underlying data.
Raises
------
Expand All @@ -1062,9 +1065,35 @@ def to_file(
If path is not writable.
"""
compression = validate_maybe_enum(compression, Compression, "compression")
file_path = validate_file_write_path(path, "path", exist_ok=mode == "w")
file_path.write_bytes(self._data_source.reader.read())
self._data_source = FileDataSource(file_path)

writer: IO[bytes] | zstandard.ZstdCompressionWriter
if compression is None or compression == self.compression:
# Handle trivial case
with open(file_path, mode=f"{mode}b") as writer:
reader = self._data_source.reader
while chunk := reader.read(2**16):
writer.write(chunk)
return

if compression == Compression.ZSTD:
writer = zstandard.ZstdCompressor(
write_checksum=True,
).stream_writer(
open(file_path, mode=f"{mode}b"),
closefd=True,
)
else:
writer = open(file_path, mode=f"{mode}b")

try:
reader = self.reader

while chunk := reader.read(2**16):
writer.write(chunk)
finally:
writer.close()

def to_json(
self,
Expand Down
31 changes: 31 additions & 0 deletions tests/test_historical_bento.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from databento.common.error import BentoError
from databento.common.publishers import Dataset
from databento.common.types import DBNRecord
from databento_dbn import Compression
from databento_dbn import MBOMsg
from databento_dbn import Schema
from databento_dbn import SType
Expand Down Expand Up @@ -243,6 +244,36 @@ def test_to_file_exclusive(
dbnstore.to_file(path=dbn_path, mode="x")


@pytest.mark.parametrize(
    "compression",
    [
        Compression.NONE,
        Compression.ZSTD,
    ],
)
def test_to_file_compression(
    test_data: Callable[[Dataset, Schema], bytes],
    tmp_path: Path,
    compression: Compression,
) -> None:
    """
    Test that DBNStore.to_file honors the requested `compression` format:
    reading the written file back should report that same compression.
    """
    # Arrange
    stub_data = test_data(Dataset.GLBX_MDP3, Schema.MBO)
    dbnstore = DBNStore.from_bytes(data=stub_data)
    output_path = tmp_path / "my_test.dbn"

    # Act
    dbnstore.to_file(
        path=output_path,
        compression=compression,
    )

    # Assert
    reloaded = databento.read_dbn(output_path)
    assert reloaded.compression == compression


def test_to_csv_overwrite(
test_data: Callable[[Dataset, Schema], bytes],
tmp_path: Path,
Expand Down

0 comments on commit f80864b

Please sign in to comment.