Drop Python 3.8, add Python 3.11 and 3.12, and use dd.from_map #81

Merged: 8 commits, Jul 18, 2024

6 changes: 3 additions & 3 deletions .github/workflows/pre-commit.yml
@@ -11,6 +11,6 @@ jobs:
name: "pre-commit hooks"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
- uses: pre-commit/action@v2.0.0
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
- uses: pre-commit/action@v3.0.1
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
@@ -21,7 +21,7 @@ jobs:
fail-fast: false
matrix:
os: ["windows-latest", "ubuntu-latest", "macos-latest"]
python-version: ["3.8", "3.9", "3.10"]
python-version: ["3.9", "3.10", "3.11", "3.12"]

steps:
- name: Checkout source
5 changes: 4 additions & 1 deletion ci/environment-3.10.yaml
@@ -4,11 +4,14 @@ channels:
- nodefaults
dependencies:
- python=3.10
- dask
- dask >=2024.3.0
- distributed
- pandas
- pyarrow
- pytest
- gcsfs
- google-cloud-bigquery>=2.11.0
- google-cloud-bigquery-storage
- pip
- pip:
- git+https://github.com/dask/dask
7 changes: 5 additions & 2 deletions ci/environment-3.8.yaml → ci/environment-3.11.yaml
@@ -3,12 +3,15 @@ channels:
- conda-forge
- nodefaults
dependencies:
- python=3.8
- dask
- python=3.11
- dask >=2024.3.0
- distributed
- pandas
- pyarrow
- pytest
- gcsfs
- google-cloud-bigquery>=2.11.0
- google-cloud-bigquery-storage
- pip
- pip:
- git+https://github.com/dask/dask
Comment on lines +16 to +17
Member Author: This is because we need dask/dask#11233 for some tests to pass. A little unfortunate. Maybe we can change the tests to avoid the timezone issue.

Contributor: We can also remove it again after the release tomorrow, so it shouldn't be an issue.

17 changes: 17 additions & 0 deletions ci/environment-3.12.yaml
@@ -0,0 +1,17 @@
name: test-environment
channels:
- conda-forge
- nodefaults
dependencies:
- python=3.12
- dask >=2024.3.0
- distributed
- pandas
- pyarrow
- pytest
- gcsfs
- google-cloud-bigquery>=2.11.0
- google-cloud-bigquery-storage
- pip
- pip:
- git+https://github.com/dask/dask
5 changes: 4 additions & 1 deletion ci/environment-3.9.yaml
@@ -4,11 +4,14 @@ channels:
- nodefaults
dependencies:
- python=3.9
- dask
- dask >=2024.3.0
- distributed
- pandas
- pyarrow
- pytest
- gcsfs
- google-cloud-bigquery>=2.11.0
- google-cloud-bigquery-storage
- pip
- pip:
- git+https://github.com/dask/dask
26 changes: 5 additions & 21 deletions dask_bigquery/core.py
@@ -5,13 +5,11 @@
from contextlib import contextmanager
from functools import partial

import dask.dataframe as dd
import gcsfs
import pandas as pd
import pyarrow
from dask.base import tokenize
from dask.dataframe.core import new_dd_object
from dask.highlevelgraph import HighLevelGraph
from dask.layers import DataFrameIOLayer
from google.api_core import client_info as rest_client_info
from google.api_core import exceptions
from google.api_core.gapic_v1 import client_info as grpc_client_info
@@ -206,19 +204,7 @@ def make_create_read_session_request():
)
meta = schema.empty_table().to_pandas(**arrow_options)

label = "read-gbq-"
output_name = label + tokenize(
project_id,
dataset_id,
table_id,
row_filter,
read_kwargs,
)

layer = DataFrameIOLayer(
output_name,
meta.columns,
[stream.name for stream in session.streams],
return dd.from_map(
Contributor: Could you pass the label into from_map to make the task prefix more descriptive?

Member Author: Done

partial(
bigquery_read,
make_create_read_session_request=make_create_read_session_request,
@@ -227,12 +213,10 @@ def make_create_read_session_request():
arrow_options=arrow_options,
credentials=credentials,
),
label=label,
[stream.name for stream in session.streams],
meta=meta,
label="read-bigquery",
)
divisions = tuple([None] * (len(session.streams) + 1))

graph = HighLevelGraph({output_name: layer}, {output_name: set()})
return new_dd_object(graph, output_name, meta, divisions)


def to_gbq(
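For reference, a minimal sketch of the dd.from_map pattern that replaces the DataFrameIOLayer/HighLevelGraph construction above; the read_stream helper and stream_names list are hypothetical stand-ins for bigquery_read and the read session's stream names, not the library code itself.

```python
from functools import partial

import dask.dataframe as dd
import pandas as pd


def read_stream(stream_name, *, row_limit=None):
    # Hypothetical stand-in for bigquery_read: each call materializes one
    # partition of the result as a pandas DataFrame.
    return pd.DataFrame({"stream": [stream_name], "rows": [row_limit or 0]})


stream_names = ["stream-0", "stream-1", "stream-2"]  # e.g. stream names from the read session
meta = pd.DataFrame({"stream": pd.Series(dtype=object), "rows": pd.Series(dtype="int64")})

# from_map maps the callable over the inputs, producing one partition per
# element; meta supplies the empty schema and label sets the task-name prefix.
ddf = dd.from_map(
    partial(read_stream, row_limit=10),
    stream_names,
    meta=meta,
    label="read-bigquery",
)

print(ddf.npartitions)  # 3
print(ddf.compute())
```

Because no divisions are passed, the resulting divisions are unknown, which is presumably why test_roundtrip below now passes check_divisions=False to assert_eq.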
56 changes: 27 additions & 29 deletions dask_bigquery/tests/test_core.py
@@ -35,14 +35,13 @@ def df():
for i in range(10)
]

yield pd.DataFrame(records)
df = pd.DataFrame(records)
df["timestamp"] = df["timestamp"].astype("datetime64[us, UTC]")
yield df
Comment on lines +38 to +40
Member Author: I've added a us cast here. Previously this had ns time resolution. From what I can tell, BigQuery only stores timestamps up to us resolution (see the docs https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp_type and Stack Overflow https://stackoverflow.com/a/44307611). Without this cast, assert_eq starts raising due to timestamp resolution mismatches.

I'll admit I'm a bit stumped here. Clearly this used to work in the past somehow.

Member Author: @tswast maybe you have a sense for any recent changes, or if I'm just wrong here.

Contributor: I started hitting similar issues at some point when I bumped pandas versions; could you be running a different version here than in the past?

Commenter: We noticed some weirdness around pandas 2.0 where we started getting microsecond precision back.

BigQuery itself hasn't changed AFAIK. We should always respond with us precision in the Arrow we return. I think it's just what pandas does with that now that's changed.

Contributor: Yeah, pandas started supporting non-nanosecond resolution with 2.0.

Member Author: Hmm okay, thanks all for clarifying. I'm inclined to just go with the small test change here. We can always handle things in a follow-up PR as needed.
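
For context, a minimal sketch of the resolution behavior discussed in this thread; it assumes pandas >= 2.0 and pyarrow, and the values are purely illustrative.

```python
import pandas as pd
import pyarrow as pa

# Illustrative only (assumes pandas >= 2.0, which added non-nanosecond
# datetime resolutions). BigQuery stores TIMESTAMP values at microsecond
# precision, so the fixture casts the expected frame to match what
# read_gbq returns.
df = pd.DataFrame({"timestamp": pd.to_datetime(["2024-07-18 12:00:00+00:00"])})
print(df["timestamp"].dtype)  # datetime64[ns, UTC] -- pandas' default resolution

df["timestamp"] = df["timestamp"].astype("datetime64[us, UTC]")
print(df["timestamp"].dtype)  # datetime64[us, UTC]

# A pyarrow schema built from this frame reflects the same resolution
# (cf. the pa.timestamp("us") change further down in this file).
print(pa.Schema.from_pandas(df).field("timestamp").type)  # timestamp[us, tz=UTC]
```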



@pytest.fixture(scope="module")
def dataset():
project_id = os.environ.get("DASK_BIGQUERY_PROJECT_ID")
if not project_id:
credentials, project_id = google.auth.default()
def dataset(project_id):
dataset_id = f"{sys.platform}_{uuid.uuid4().hex}"

with bigquery.Client() as bq_client:
@@ -110,25 +109,30 @@ def required_partition_filter_table(dataset, df):
yield project_id, dataset_id, table_id


@pytest.fixture(scope="module")
def project_id():
project_id = os.environ.get("DASK_BIGQUERY_PROJECT_ID")
if not project_id:
_, project_id = google.auth.default()

yield project_id


@pytest.fixture
def google_creds():
env_creds_file = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
if env_creds_file:
credentials = json.load(open(env_creds_file))
else:
if os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"):
credentials = json.load(open(os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")))
elif os.environ.get("DASK_BIGQUERY_GCP_CREDENTIALS"):
credentials = json.loads(os.environ.get("DASK_BIGQUERY_GCP_CREDENTIALS"))
else:
credentials, _ = google.auth.default()

yield credentials
Comment on lines +112 to 130
Member Author: These changes keep the current testing setup behavior (i.e. they support the DASK_BIGQUERY_GCP_CREDENTIALS, DASK_BIGQUERY_PROJECT_ID, and GOOGLE_APPLICATION_CREDENTIALS environment variables), but now fall back to Google's default auth if those aren't set (a better experience IMO).

This made it more straightforward for me to run things locally, while also not rocking the boat too much with our current CI setup.
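
To see the lookup order in one place, here is a minimal sketch of the fallback these fixtures now follow; the resolve_test_credentials helper is hypothetical, not part of the test suite.

```python
import json
import os

import google.auth


def resolve_test_credentials():
    """Hypothetical helper mirroring the fixtures' lookup order."""
    # 1. A service-account file pointed to by GOOGLE_APPLICATION_CREDENTIALS.
    creds_file = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if creds_file:
        with open(creds_file) as f:
            return json.load(f)
    # 2. Inline JSON credentials in DASK_BIGQUERY_GCP_CREDENTIALS.
    inline = os.environ.get("DASK_BIGQUERY_GCP_CREDENTIALS")
    if inline:
        return json.loads(inline)
    # 3. Otherwise fall back to whatever google.auth discovers
    #    (gcloud login, metadata server, ...).
    credentials, _ = google.auth.default()
    return credentials
```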



@pytest.fixture
def bucket(google_creds):
project_id = google_creds["project_id"]
env_project_id = os.environ.get("DASK_BIGQUERY_PROJECT_ID")
if env_project_id:
project_id = env_project_id

def bucket(google_creds, project_id):
bucket = f"dask-bigquery-tmp-{uuid.uuid4().hex}"

fs = gcsfs.GCSFileSystem(
project=project_id, access="read_write", token=google_creds
)
@@ -140,12 +144,7 @@ def bucket(google_creds):


@pytest.fixture
def write_dataset(google_creds):
project_id = google_creds["project_id"]
env_project_id = os.environ.get("DASK_BIGQUERY_PROJECT_ID")
if env_project_id:
project_id = env_project_id

def write_dataset(google_creds, project_id):
dataset_id = f"{sys.platform}_{uuid.uuid4().hex}"

yield google_creds, project_id, dataset_id, None
@@ -158,8 +157,7 @@ def write_dataset(google_creds):


@pytest.fixture
def write_existing_dataset(google_creds):
project_id = os.environ.get("DASK_BIGQUERY_PROJECT_ID", google_creds["project_id"])
def write_existing_dataset(google_creds, project_id):
dataset_id = "persistent_dataset"
table_id = f"table_to_write_{sys.platform}_{uuid.uuid4().hex}"

@@ -181,7 +179,7 @@ def write_existing_dataset(google_creds):
[
("name", pa.string()),
("number", pa.uint8()),
("timestamp", pa.timestamp("ns")),
("timestamp", pa.timestamp("us")),
Member Author: Corresponding change given the us resolution.

("idx", pa.uint8()),
]
),
@@ -285,14 +283,14 @@ def test_roundtrip(df, dataset_fixture, request):

ddf_out = read_gbq(project_id=project_id, dataset_id=dataset_id, table_id=table_id)
# bigquery does not guarantee ordering, so let's reindex
assert_eq(ddf.set_index("idx"), ddf_out.set_index("idx"))
assert_eq(ddf.set_index("idx"), ddf_out.set_index("idx"), check_divisions=False)


def test_read_gbq(df, table, client):
project_id, dataset_id, table_id = table
ddf = read_gbq(project_id=project_id, dataset_id=dataset_id, table_id=table_id)

assert list(ddf.columns) == ["name", "number", "timestamp", "idx"]
assert list(df.columns) == list(ddf.columns)
Member Author: Just generalizing a bit to make things more resilient to changing column names in the test DataFrame.

assert assert_eq(ddf.set_index("idx"), df.set_index("idx"))


@@ -305,7 +303,7 @@ def test_read_row_filter(df, table, client):
row_filter="idx < 5",
)

assert list(ddf.columns) == ["name", "number", "timestamp", "idx"]
assert list(df.columns) == list(ddf.columns)
assert assert_eq(ddf.set_index("idx").loc[:4], df.set_index("idx").loc[:4])


@@ -361,7 +359,7 @@ def test_read_gbq_credentials(df, dataset_fixture, request, monkeypatch):
credentials=credentials,
)

assert list(ddf.columns) == ["name", "number", "timestamp", "idx"]
assert list(df.columns) == list(ddf.columns)
assert assert_eq(ddf.set_index("idx"), df.set_index("idx"))


2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,4 +1,4 @@
dask
dask>=2024.3.0
gcsfs
google-cloud-bigquery >= 2.11.0
google-cloud-bigquery-storage
2 changes: 1 addition & 1 deletion setup.py
@@ -13,7 +13,7 @@
packages=["dask_bigquery"],
long_description=long_description,
long_description_content_type="text/markdown",
python_requires=">=3.8",
python_requires=">=3.9",
install_requires=open("requirements.txt").read().strip().split("\n"),
extras_require={"test": ["pytest", "distributed", "google-auth>=1.30.0"]},
include_package_data=True,