ADD: DBNStore.to_parquet
nmacholl committed Jan 10, 2024
1 parent aef77da commit d1dd6d5
Showing 5 changed files with 168 additions and 16 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,12 @@
# Changelog

## 0.26.0 - TBD

This release adds support for transcoding DBN data into Apache Parquet.

#### Enhancements
- Added `DBNStore.to_parquet` for transcoding DBN data into Apache Parquet using `pyarrow`

## 0.25.0 - 2024-01-09

#### Breaking changes
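For context on the enhancement above, here is a minimal usage sketch of the new method. It is not part of the commit: the input and output file names are hypothetical, the top-level `DBNStore` import path is assumed, and the keyword arguments simply spell out the defaults documented in the `dbnstore.py` diff below.

```python
from databento import DBNStore

# Hypothetical input: any DBN file on disk, e.g. one downloaded with the Historical client.
store = DBNStore.from_file("glbx-mdp3-20240108.trades.dbn.zst")

# Transcode to Apache Parquet; the keyword arguments shown are the documented defaults.
store.to_parquet(
    "glbx-mdp3-20240108.trades.parquet",
    price_type="float",
    pretty_ts=True,
    map_symbols=True,
)
```

Any additional keyword arguments are forwarded to `pyarrow.parquet.ParquetWriter`, so writer options such as compression can be passed straight through.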
1 change: 1 addition & 0 deletions README.md
@@ -35,6 +35,7 @@ The minimum dependencies as found in the `pyproject.toml` are also listed below:
- databento-dbn = "0.14.2"
- numpy = ">=1.23.5"
- pandas = ">=1.5.3"
- pyarrow = ">=13.0.0"
- requests = ">=2.24.0"
- zstandard = ">=0.21.0"

103 changes: 87 additions & 16 deletions databento/common/dbnstore.py
@@ -15,6 +15,7 @@
Any,
BinaryIO,
Callable,
Final,
Literal,
Protocol,
overload,
@@ -23,6 +24,8 @@
import databento_dbn
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import zstandard
from databento_dbn import FIXED_PRICE_SCALE
from databento_dbn import Compression
@@ -51,6 +54,8 @@

logger = logging.getLogger(__name__)

PARQUET_CHUNK_SIZE: Final = 2**16

if TYPE_CHECKING:
from databento.historical.client import Historical

@@ -791,18 +796,14 @@ def to_csv(
compression : Compression or str, default `Compression.NONE`
The output compression for writing.
schema : Schema or str, optional
The schema for the csv.
The DBN schema for the csv.
This is only required when reading a DBN stream with mixed record types.
Raises
------
ValueError
If the schema for the array cannot be determined.
Notes
-----
Requires all the data to be brought up into memory to then be written.
"""
compression = validate_enum(compression, Compression, "compression")
schema = validate_maybe_enum(schema, Schema, "schema")
@@ -870,7 +871,7 @@ def to_df(
a 'symbol' column, mapping the instrument ID to its requested symbol for
every record.
schema : Schema or str, optional
The schema for the dataframe.
The DBN schema for the dataframe.
This is only required when reading a DBN stream with mixed record types.
count : int, optional
If set, instead of returning a single `DataFrame` a `DataFrameIterator`
@@ -887,7 +888,7 @@
Raises
------
ValueError
If the schema for the array cannot be determined.
If the DBN schema is unspecified and cannot be determined.
"""
schema = validate_maybe_enum(schema, Schema, "schema")
@@ -919,6 +920,81 @@ def to_df(

return df_iter

def to_parquet(
self,
path: Path | str,
price_type: Literal["fixed", "float"] = "float",
pretty_ts: bool = True,
map_symbols: bool = True,
schema: Schema | str | None = None,
**kwargs: Any,
) -> None:
"""
Write the data to a parquet file at the given path.
Parameters
----------
path : Path or str
The file path to write to.
price_type : str, default "float"
The price type to use for price fields.
If "fixed", prices will have a type of `int` in fixed decimal format; each unit representing 1e-9 or 0.000000001.
If "float", prices will have a type of `float`.
The "decimal" price type is not supported at this time.
pretty_ts : bool, default True
If all timestamp columns should be converted from UNIX nanosecond
`int` to tz-aware UTC `pyarrow.TimestampType`.
map_symbols : bool, default True
If symbology mappings from the metadata should be used to create
a 'symbol' column, mapping the instrument ID to its requested symbol for
every record.
schema : Schema or str, optional
The DBN schema for the parquet file.
This is only required when reading a DBN stream with mixed record types.
Raises
------
ValueError
If an incorrect price type is specified.
If the DBN schema is unspecified and cannot be determined.
"""
if price_type == "decimal":
raise ValueError("the 'decimal' price type is not currently supported")

schema = validate_maybe_enum(schema, Schema, "schema")
if schema is None:
if self.schema is None:
raise ValueError("a schema must be specified for mixed DBN data")
schema = self.schema

dataframe_iter = self.to_df(
price_type=price_type,
pretty_ts=pretty_ts,
map_symbols=map_symbols,
schema=schema,
count=PARQUET_CHUNK_SIZE,
)

writer = None
try:
for frame in dataframe_iter:
if writer is None:
# Initialize the writer using the first DataFrame
parquet_schema = pa.Schema.from_pandas(frame)
writer = pq.ParquetWriter(
where=path,
schema=parquet_schema,
**kwargs,
)
writer.write_table(
pa.Table.from_pandas(
frame,
schema=parquet_schema,
),
)
finally:
if writer is not None:
writer.close()

def to_file(self, path: Path | str) -> None:
"""
Write the data to a DBN file at the given path.
@@ -972,18 +1048,14 @@ def to_json(
compression : Compression or str, default `Compression.NONE`
The output compression for writing.
schema : Schema or str, optional
The schema for the json.
The DBN schema for the json.
This is only required when reading a DBN stream with mixed record types.
Raises
------
ValueError
If the schema for the array cannot be determined.
Notes
-----
Requires all the data to be brought up into memory to then be written.
"""
compression = validate_enum(compression, Compression, "compression")
schema = validate_maybe_enum(schema, Schema, "schema")
@@ -1030,7 +1102,7 @@ def to_ndarray(
Parameters
----------
schema : Schema or str, optional
The schema for the array.
The DBN schema for the array.
This is only required when reading a DBN stream with mixed record types.
count : int, optional
If set, instead of returning a single `np.ndarray` a `NDArrayIterator`
Expand All @@ -1047,7 +1119,7 @@ def to_ndarray(
Raises
------
ValueError
If the schema for the array cannot be determined.
If the DBN schema is unspecified and cannot be determined.
"""
schema = validate_maybe_enum(schema, Schema, "schema")
@@ -1329,8 +1401,7 @@ def _format_px(
if price_type == "decimal":
for field in px_fields:
df[field] = (
df[field].replace(INT64_NULL, np.nan).apply(decimal.Decimal)
/ FIXED_PRICE_SCALE
df[field].replace(INT64_NULL, np.nan).apply(decimal.Decimal) / FIXED_PRICE_SCALE
)
elif price_type == "float":
for field in px_fields:
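Unlike `to_csv` and `to_json`, which note that they require all data to be brought into memory before writing, `to_parquet` streams its output: it iterates `to_df` in chunks of `PARQUET_CHUNK_SIZE` rows, infers the Arrow schema from the first chunk, and reuses that schema for every subsequent `write_table` call. The standalone sketch below reproduces that writer pattern with toy data; the function name and example frames are illustrative only, not part of the library.

```python
from typing import Iterable

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


def write_chunks_to_parquet(frames: Iterable[pd.DataFrame], path: str) -> None:
    """Stream DataFrame chunks into one Parquet file, mirroring the to_parquet pattern."""
    writer = None
    parquet_schema = None
    try:
        for frame in frames:
            if writer is None:
                # Infer the Arrow schema from the first chunk and reuse it for the rest,
                # so every row group in the file shares a single schema.
                parquet_schema = pa.Schema.from_pandas(frame)
                writer = pq.ParquetWriter(where=path, schema=parquet_schema)
            writer.write_table(pa.Table.from_pandas(frame, schema=parquet_schema))
    finally:
        if writer is not None:
            writer.close()


# Toy example: two 2-row chunks end up as one 4-row Parquet file.
chunks = [
    pd.DataFrame({"price": [1.25, 2.50], "size": [10, 20]}),
    pd.DataFrame({"price": [3.75, 5.00], "size": [30, 40]}),
]
write_chunks_to_parquet(chunks, "example.parquet")
print(pd.read_parquet("example.parquet"))
```

Keeping the writer open across chunks is what lets the method avoid the full-materialization caveat carried by the `to_csv` and `to_json` docstrings.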
1 change: 1 addition & 0 deletions pyproject.toml
@@ -38,6 +38,7 @@ numpy = [
{version = "^1.26.0", python = "^3.12"}
]
pandas = ">=1.5.3"
pyarrow = ">=13.0.0"
requests = ">=2.24.0"
zstandard = ">=0.21.0"

72 changes: 72 additions & 0 deletions tests/test_historical_bento.py
@@ -9,6 +9,7 @@
from unittest.mock import MagicMock

import databento
import databento.common.dbnstore
import numpy as np
import pandas as pd
import pytest
@@ -439,6 +440,77 @@ def test_to_df_with_price_type_handles_null(
assert all(np.isnan(df_pretty["strike_price"]))


@pytest.mark.parametrize(
"dataset",
[
Dataset.GLBX_MDP3,
Dataset.XNAS_ITCH,
Dataset.OPRA_PILLAR,
Dataset.DBEQ_BASIC,
],
)
@pytest.mark.parametrize(
"schema",
[pytest.param(schema, id=str(schema)) for schema in Schema.variants()],
)
@pytest.mark.parametrize(
"price_type",
[
"fixed",
"float",
],
)
@pytest.mark.parametrize(
"pretty_ts",
[
True,
False,
],
)
@pytest.mark.parametrize(
"map_symbols",
[
True,
False,
],
)
def test_to_parquet(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
test_data: Callable[[Dataset, Schema], bytes],
dataset: Dataset,
schema: Schema,
price_type: Literal["fixed", "float"],
pretty_ts: bool,
map_symbols: bool,
) -> None:
# Arrange
monkeypatch.setattr(databento.common.dbnstore, "PARQUET_CHUNK_SIZE", 1)
stub_data = test_data(dataset, schema)
data = DBNStore.from_bytes(data=stub_data)
parquet_file = tmp_path / "test.parquet"

# Act
expected = data.to_df(
price_type=price_type,
pretty_ts=pretty_ts,
map_symbols=map_symbols,
)
data.to_parquet(
parquet_file,
price_type=price_type,
pretty_ts=pretty_ts,
map_symbols=map_symbols,
)
actual = pd.read_parquet(parquet_file)

# Replace None values with np.nan
actual = actual.fillna(value=np.nan)

# Assert
pd.testing.assert_frame_equal(actual, expected)


@pytest.mark.parametrize(
"expected_schema",
[pytest.param(schema, id=str(schema)) for schema in Schema.variants()],
