Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vdk-impala: upgrade code to support pydantic 2.0 #2362

Merged
merged 1 commit into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ class SlowlyChangingDimensionTypeOverwriteParams(BaseModel):
target_table: str
source_schema: str
source_view: str
check: Optional[Callable[[str], bool]]
staging_schema: Optional[str]
check: Optional[Callable[[str], bool]] = None
staging_schema: Optional[str] = None


class SlowlyChangingDimensionTypeOverwrite(TemplateArgumentsValidator):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ class FactDailySnapshotParams(BaseModel):
target_table: str
source_schema: str
source_view: str
check: Optional[Callable[[str], bool]]
staging_schema: Optional[str]
check: Optional[Callable[[str], bool]] = None
staging_schema: Optional[str] = None


class FactDailySnapshot(TemplateArgumentsValidator):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class FactDailySnapshotParams(BaseModel):
source_schema: str
source_view: str
last_arrival_ts: str
check: Optional[Callable[[str], bool]]
staging_schema: Optional[str]
check: Optional[Callable[[str], bool]] = None
staging_schema: Optional[str] = None


class FactDailySnapshot(TemplateArgumentsValidator):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from typing import List

from pydantic import BaseModel
from pydantic import validator
from pydantic import field_validator
from pydantic import FieldValidationInfo
from vdk.api.job_input import IJobInput
from vdk.plugin.impala.templates.template_arguments_validator import (
TemplateArgumentsValidator,
Expand All @@ -24,9 +25,11 @@ class LoadVersionedParams(BaseModel):
active_to_column: str = "active_to"
active_to_max_value: str = "9999-12-31"

@validator("tracked_columns", allow_reuse=True)
def passwords_match(cls, tracked_columns, values, **kwargs):
value_columns = values.get("value_columns")
@field_validator("tracked_columns")
def tracked_columns_subset_of_value_columns(
cls, tracked_columns: List[str], values: FieldValidationInfo, **kwargs
):
value_columns = values.data["value_columns"]
if type(value_columns) == list and not tracked_columns:
raise ValueError("The list must contain at least one column")
if type(value_columns) == list == type(value_columns) and not set(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pytest
from vdk.internal.core import errors
from vdk.plugin.impala import impala_plugin
from vdk.plugin.test_utils.util_funcs import cli_assert
from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner
from vdk.plugin.test_utils.util_funcs import get_test_job_path

Expand Down Expand Up @@ -51,7 +52,7 @@ def test_load_dimension_scd1(self) -> None:
"target_table": target_table,
},
)
assert not res.exception
cli_assert(not res.exception, res)
actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}")
expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{source_view}")
assert actual_rs.output and expected_rs.output
Expand All @@ -71,7 +72,7 @@ def test_load_dimension_scd1_partitioned(self) -> None:
"target_table": target_table,
},
)
assert not res.exception
cli_assert(not res.exception, res)

actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}")
expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{source_view}")
Expand Down Expand Up @@ -127,7 +128,7 @@ def test_load_dimension_scd1_checks_positive(self) -> None:
},
)

assert not res.exception
cli_assert(not res.exception, res)
actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}")
expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{source_view}")
assert actual_rs.output and expected_rs.output
Expand Down Expand Up @@ -180,7 +181,7 @@ def test_load_dimension_scd2(self) -> None:
"id_column": "id",
},
)
assert not res.exception
cli_assert(not res.exception, res)

actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}")
expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{expect_table}")
Expand Down Expand Up @@ -259,7 +260,7 @@ def test_load_versioned(self) -> None:
"updated_at_column": "updated_at",
},
)
assert not res.exception
cli_assert(not res.exception, res)

actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}")
expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{expect_table}")
Expand Down Expand Up @@ -308,7 +309,7 @@ def test_load_versioned_partitioned(self) -> None:
"updated_at_column": "updated_at",
},
)
assert not res.exception
cli_assert(not res.exception, res)

actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}")
expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{expect_table}")
Expand Down Expand Up @@ -434,7 +435,7 @@ def test_load_fact_snapshot(self) -> None:
"last_arrival_ts": "updated_at",
},
)
assert not res.exception
cli_assert(not res.exception, res)

actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}")
expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{expect_table}")
Expand Down Expand Up @@ -466,7 +467,7 @@ def test_load_fact_snapshot_empty_source(self) -> None:
},
)
# Expecting data job not to finish due to empty source.
assert not res.exception
cli_assert(not res.exception, res)
assert res.exit_code == 0

actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}")
Expand Down Expand Up @@ -498,7 +499,7 @@ def test_load_fact_snapshot_partition(self) -> None:
"last_arrival_ts": "updated_at",
},
)
assert not res.exception
cli_assert(not res.exception, res)

actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}")
expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{expect_table}")
Expand Down Expand Up @@ -560,7 +561,7 @@ def test_load_fact_snapshot_checks_positive(self) -> None:
},
)

assert not res.exception
cli_assert(not res.exception, res)
actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}")
expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{expect_table}")
assert actual_rs.output and expected_rs.output
Expand Down Expand Up @@ -724,7 +725,7 @@ def test_insert(self) -> None:
"expect_table": expect_table,
},
)
assert not res.exception
cli_assert(not res.exception, res)

actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}")
expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{expect_table}")
Expand Down Expand Up @@ -754,7 +755,7 @@ def test_insert_partition(self) -> None:
"expect_table": expect_table,
},
)
assert not res.exception
cli_assert(not res.exception, res)

actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}")
expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{expect_table}")
Expand Down Expand Up @@ -788,7 +789,7 @@ def test_insert_checks_positive(self) -> None:
},
)

assert not res.exception
cli_assert(not res.exception, res)
actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}")
expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{expect_table}")
assert actual_rs.output and expected_rs.output
Expand Down