From b3c220943d16a64ff86137743ad660f04d1e9025 Mon Sep 17 00:00:00 2001
From: Daniel Sanche
Date: Mon, 27 Nov 2023 14:39:22 -0800
Subject: [PATCH] fixed lint and mypy

---
 google/cloud/bigtable/data/_async/client.py   |   6 +-
 .../bigtable/data/_metrics/data_model.py      |  57 +++--
 .../bigtable/data/_metrics/handlers/_base.py  |   4 +-
 .../data/_metrics/handlers/opentelemetry.py   |  12 +-
 .../data/_metrics/metrics_controller.py       |   4 +-
 tests/unit/data/_async/test_client.py         |   4 +-
 .../handlers/test_handler_opentelemetry.py    | 112 ++++++---
 tests/unit/data/_metrics/test_data_model.py   | 236 ++++++++++++------
 .../data/_metrics/test_metrics_controller.py  |  31 ++-
 9 files changed, 326 insertions(+), 140 deletions(-)

diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py
index a5e6ce552..b9cfc317b 100644
--- a/google/cloud/bigtable/data/_async/client.py
+++ b/google/cloud/bigtable/data/_async/client.py
@@ -1018,7 +1018,7 @@ def on_error_fn(exc):
         metadata = _make_metadata(self.table_name, self.app_profile_id)
         # trigger rpc
         await deadline_wrapped(
-            row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key,
+            row_key=row_key.encode() if isinstance(row_key, str) else row_key,
             mutations=[mutation._to_pb() for mutation in mutations_list],
             table_name=self.table_name,
             app_profile_id=self.app_profile_id,
@@ -1141,7 +1141,7 @@ async def check_and_mutate_row(
             true_mutations=true_case_list,
             false_mutations=false_case_list,
             predicate_filter=predicate._to_pb() if predicate is not None else None,
-            row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key,
+            row_key=row_key.encode() if isinstance(row_key, str) else row_key,
             table_name=self.table_name,
             app_profile_id=self.app_profile_id,
             metadata=metadata,
@@ -1199,7 +1199,7 @@ async def read_modify_write_row(

         result = await metric_wrapped(
             rules=[rule._to_pb() for rule in rules],
-            row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key,
+            row_key=row_key.encode() if isinstance(row_key, str) else row_key,
             table_name=self.table_name,
             app_profile_id=self.app_profile_id,
             metadata=metadata,
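Note on the client.py hunks above: str.encode() defaults to UTF-8, so dropping the explicit "utf-8" argument is behavior-preserving. A one-line sanity check (illustrative only, not part of the patch):

    # str.encode() uses UTF-8 by default, so both spellings produce identical bytes
    assert "row-key".encode() == "row-key".encode("utf-8")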
diff --git a/google/cloud/bigtable/data/_metrics/data_model.py b/google/cloud/bigtable/data/_metrics/data_model.py
index 32d92e2e4..dff3c4567 100644
--- a/google/cloud/bigtable/data/_metrics/data_model.py
+++ b/google/cloud/bigtable/data/_metrics/data_model.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 from __future__ import annotations

-from typing import Callable, Any, TYPE_CHECKING
+from typing import Callable, Any, cast, TYPE_CHECKING

 import time
 import os
@@ -21,7 +21,6 @@
 import logging

 from enum import Enum
-from uuid import uuid4
 from dataclasses import dataclass
 from dataclasses import field
 from grpc import StatusCode
@@ -29,12 +28,13 @@
 import google.cloud.bigtable.data.exceptions as bt_exceptions

 if TYPE_CHECKING:
-    from uuid import UUID
     from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler

 ALLOW_METRIC_EXCEPTIONS = os.getenv("BIGTABLE_METRICS_EXCEPTIONS", False)
-LOGGER = logging.getLogger(__name__) if os.getenv("BIGTABLE_METRICS_LOGS", False) else None
+LOGGER = (
+    logging.getLogger(__name__) if os.getenv("BIGTABLE_METRICS_LOGS", False) else None
+)

 DEFAULT_ZONE = "global"
 DEFAULT_CLUSTER_ID = "unspecified"
@@ -60,6 +60,7 @@ class OperationType(Enum):

 class OperationState(Enum):
     """Enum for the state of the active operation."""
+
     CREATED = 0
     ACTIVE_ATTEMPT = 1
     BETWEEN_ATTEMPTS = 2
@@ -153,8 +154,13 @@ def start_attempt(self) -> None:
         If the operation was completed or there is already an active attempt,
         will raise an exception or warning based on the value of ALLOW_METRIC_EXCEPTIONS.
         """
-        if self.state != OperationState.BETWEEN_ATTEMPTS and self.state != OperationState.CREATED:
-            return self._handle_error(INVALID_STATE_ERROR.format("start_attempt", self.state))
+        if (
+            self.state != OperationState.BETWEEN_ATTEMPTS
+            and self.state != OperationState.CREATED
+        ):
+            return self._handle_error(
+                INVALID_STATE_ERROR.format("start_attempt", self.state)
+            )

         self.active_attempt = ActiveAttemptMetric()

@@ -171,10 +177,13 @@ def add_response_metadata(self, metadata: dict[str, bytes | str]) -> None:
             - metadata: the metadata as extracted from the grpc call
         """
         if self.state != OperationState.ACTIVE_ATTEMPT:
-            return self._handle_error(INVALID_STATE_ERROR.format("add_response_metadata", self.state))
+            return self._handle_error(
+                INVALID_STATE_ERROR.format("add_response_metadata", self.state)
+            )

         if self.cluster_id is None or self.zone is None:
-            bigtable_metadata = metadata.get(BIGTABLE_METADATA_KEY)
+            # BIGTABLE_METADATA_KEY should give a binary string with cluster_id and zone
+            bigtable_metadata = cast(bytes, metadata.get(BIGTABLE_METADATA_KEY))
             if bigtable_metadata:
                 try:
                     decoded = "".join(
@@ -185,11 +194,14 @@ def add_response_metadata(self, metadata: dict[str, bytes | str]) -> None:
                     self.cluster_id = split_data[0]
                     self.zone = split_data[1]
                 except (AttributeError, IndexError):
-                    self._handle_error(f"Failed to decode {BIGTABLE_METADATA_KEY} metadata: {bigtable_metadata}")
-        timing_header = metadata.get(SERVER_TIMING_METADATA_KEY)
+                    self._handle_error(
+                        f"Failed to decode {BIGTABLE_METADATA_KEY} metadata: {bigtable_metadata!r}"
+                    )
+        # SERVER_TIMING_METADATA_KEY should give a string with the server-latency headers
+        timing_header = cast(str, metadata.get(SERVER_TIMING_METADATA_KEY))
         if timing_header:
             timing_data = SERVER_TIMING_REGEX.match(timing_header)
-            if timing_data:
+            if timing_data and self.active_attempt:
                 # convert from milliseconds to seconds
                 self.active_attempt.gfe_latency = float(timing_data.group(1)) / 1000

@@ -202,9 +214,11 @@ def attempt_first_response(self) -> None:
         active attempt already has a first response time, will raise an
         exception or warning based on the value of ALLOW_METRIC_EXCEPTIONS.
""" - if self.state != OperationState.ACTIVE_ATTEMPT: - return self._handle_error(INVALID_STATE_ERROR.format("attempt_first_response", self.state)) - elif self.active_attempt.first_response_latency is not None: + if self.state != OperationState.ACTIVE_ATTEMPT or self.active_attempt is None: + return self._handle_error( + INVALID_STATE_ERROR.format("attempt_first_response", self.state) + ) + if self.active_attempt.first_response_latency is not None: return self._handle_error("Attempt already received first response") self.active_attempt.first_response_latency = ( time.monotonic() - self.active_attempt.start_time @@ -220,8 +234,10 @@ def end_attempt_with_status(self, status: StatusCode | Exception) -> None: Args: - status: The status of the attempt. """ - if self.state != OperationState.ACTIVE_ATTEMPT: - return self._handle_error(INVALID_STATE_ERROR.format("end_attempt_with_status", self.state)) + if self.state != OperationState.ACTIVE_ATTEMPT or self.active_attempt is None: + return self._handle_error( + INVALID_STATE_ERROR.format("end_attempt_with_status", self.state) + ) new_attempt = CompletedAttemptMetric( start_time=self.active_attempt.start_time, @@ -251,8 +267,12 @@ def end_with_status(self, status: StatusCode | Exception) -> None: - status: The status of the operation. """ if self.state == OperationState.COMPLETED: - return self._handle_error(INVALID_STATE_ERROR.format("end_with_status", self.state)) - final_status = self._exc_to_status(status) if isinstance(status, Exception) else status + return self._handle_error( + INVALID_STATE_ERROR.format("end_with_status", self.state) + ) + final_status = ( + self._exc_to_status(status) if isinstance(status, Exception) else status + ) if self.state == OperationState.ACTIVE_ATTEMPT: self.end_attempt_with_status(final_status) self.was_completed = True @@ -396,6 +416,7 @@ def wrap_attempt_fn( grpc function, and will automatically extract trailing_metadata from the Call object on success. """ + async def wrapped_fn(*args, **kwargs): encountered_exc: Exception | None = None call = None diff --git a/google/cloud/bigtable/data/_metrics/handlers/_base.py b/google/cloud/bigtable/data/_metrics/handlers/_base.py index 2d69ed3d5..72f5aa550 100644 --- a/google/cloud/bigtable/data/_metrics/handlers/_base.py +++ b/google/cloud/bigtable/data/_metrics/handlers/_base.py @@ -29,5 +29,7 @@ def __init__(self, **kwargs): def on_operation_complete(self, op: CompletedOperationMetric) -> None: pass - def on_attempt_complete(self, attempt: CompletedAttemptMetric, op: ActiveOperationMetric) -> None: + def on_attempt_complete( + self, attempt: CompletedAttemptMetric, op: ActiveOperationMetric + ) -> None: pass diff --git a/google/cloud/bigtable/data/_metrics/handlers/opentelemetry.py b/google/cloud/bigtable/data/_metrics/handlers/opentelemetry.py index e9f303ba9..14cd6913a 100644 --- a/google/cloud/bigtable/data/_metrics/handlers/opentelemetry.py +++ b/google/cloud/bigtable/data/_metrics/handlers/opentelemetry.py @@ -28,8 +28,9 @@ class _OpenTelemetryInstrumentSingleton: Singleton class that holds OpenTelelmetry instrument objects, so that multiple Tables can write to the same metrics. 
""" + def __new__(cls): - if not hasattr(cls, 'instance'): + if not hasattr(cls, "instance"): cls.instance = super(_OpenTelemetryInstrumentSingleton, cls).__new__(cls) return cls.instance @@ -66,6 +67,7 @@ def __init__(self): description="A count of the number of attempts that failed to reach Google's network.", ) + class OpenTelemetryMetricsHandler(MetricsHandler): """ Maintains a set of OpenTelemetry metrics for the Bigtable client library, @@ -120,7 +122,9 @@ def on_operation_complete(self, op: CompletedOperationMetric) -> None: self.otel.operation_latencies.record(op.duration, labels) self.otel.retry_count.add(len(op.completed_attempts) - 1, labels) - def on_attempt_complete(self, attempt: CompletedAttemptMetric, op: ActiveOperationMetric): + def on_attempt_complete( + self, attempt: CompletedAttemptMetric, op: ActiveOperationMetric + ): """ Update the metrics associated with a completed attempt: - attempt_latencies @@ -135,7 +139,9 @@ def on_attempt_complete(self, attempt: CompletedAttemptMetric, op: ActiveOperati **self.shared_labels, } - self.otel.attempt_latencies.record(attempt.end_time-attempt.start_time, labels) + self.otel.attempt_latencies.record( + attempt.end_time - attempt.start_time, labels + ) if ( op.op_type == OperationType.READ_ROWS and attempt.first_response_latency is not None diff --git a/google/cloud/bigtable/data/_metrics/metrics_controller.py b/google/cloud/bigtable/data/_metrics/metrics_controller.py index e9b4b2dcb..eb4ab9726 100644 --- a/google/cloud/bigtable/data/_metrics/metrics_controller.py +++ b/google/cloud/bigtable/data/_metrics/metrics_controller.py @@ -63,7 +63,9 @@ def add_handler(self, handler: MetricsHandler) -> None: """ self.handlers.append(handler) - def create_operation(self, op_type:OperationType, **kwargs) -> ActiveOperationMetric: + def create_operation( + self, op_type: OperationType, **kwargs + ) -> ActiveOperationMetric: """ Creates a new operation and registers it with the subscribed handlers. 
""" diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index 3773326f3..76bb15a98 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -1024,7 +1024,9 @@ async def test_table_ctor(self): from google.cloud.bigtable.data._async.client import BigtableDataClientAsync from google.cloud.bigtable.data._async.client import TableAsync from google.cloud.bigtable.data._async.client import _WarmedInstanceKey - from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController + from google.cloud.bigtable.data._metrics import ( + BigtableClientSideMetricsController, + ) from google.cloud.bigtable.data._metrics import OpenTelemetryMetricsHandler expected_table_id = "table-id" diff --git a/tests/unit/data/_metrics/handlers/test_handler_opentelemetry.py b/tests/unit/data/_metrics/handlers/test_handler_opentelemetry.py index c088db1c7..652d97925 100644 --- a/tests/unit/data/_metrics/handlers/test_handler_opentelemetry.py +++ b/tests/unit/data/_metrics/handlers/test_handler_opentelemetry.py @@ -21,52 +21,66 @@ class Test_OpenTelemetryInstrumentSingleton: - def test_singleton(self): """ Should be able to create multiple instances that map to the same singleton """ - from google.cloud.bigtable.data._metrics.handlers.opentelemetry import _OpenTelemetryInstrumentSingleton + from google.cloud.bigtable.data._metrics.handlers.opentelemetry import ( + _OpenTelemetryInstrumentSingleton, + ) + instance1 = _OpenTelemetryInstrumentSingleton() instance2 = _OpenTelemetryInstrumentSingleton() assert instance1 is instance2 class TestOpenTelemetryMetricsHandler: - def setup_otel(self): """ Create a concrete MeterProvider in the environment """ from opentelemetry import metrics from opentelemetry.sdk.metrics import MeterProvider + metrics.set_meter_provider(MeterProvider()) def _make_one(self, **kwargs): from google.cloud.bigtable.data._metrics import OpenTelemetryMetricsHandler + if not kwargs: # create defaults - kwargs = {"project_id": "p", "instance_id": "i", "table_id": "t", "app_profile_id": "a"} + kwargs = { + "project_id": "p", + "instance_id": "i", + "table_id": "t", + "app_profile_id": "a", + } self.setup_otel() return OpenTelemetryMetricsHandler(**kwargs) - @pytest.mark.parametrize("metric_name,kind", [ - ("operation_latencies", "histogram"), - ("first_response_latencies", "histogram"), - ("attempt_latencies", "histogram"), - ("retry_count", "count"), - ("server_latencies", "histogram"), - ("connectivity_error_count", "count"), - # ("application_latencies", "histogram"), - # ("throttling_latencies", "histogram"), - ]) + @pytest.mark.parametrize( + "metric_name,kind", + [ + ("operation_latencies", "histogram"), + ("first_response_latencies", "histogram"), + ("attempt_latencies", "histogram"), + ("retry_count", "count"), + ("server_latencies", "histogram"), + ("connectivity_error_count", "count"), + # ("application_latencies", "histogram"), + # ("throttling_latencies", "histogram"), + ], + ) def test_ctor_creates_metrics(self, metric_name, kind): """ Make sure each expected metric is created """ from opentelemetry.metrics import Counter from opentelemetry.metrics import Histogram - from google.cloud.bigtable.data._metrics.handlers.opentelemetry import _OpenTelemetryInstrumentSingleton + from google.cloud.bigtable.data._metrics.handlers.opentelemetry import ( + _OpenTelemetryInstrumentSingleton, + ) + instance = self._make_one() assert instance.otel is _OpenTelemetryInstrumentSingleton() metric = 
@@ -82,6 +96,7 @@ def test_ctor_labels(self):
         should create dicts with client name and uid, and shared labels
         """
         from google.cloud.bigtable import __version__
+
         expected_project = "p"
         expected_instance = "i"
         expected_table = "t"
@@ -125,25 +140,35 @@ def ctor_defaults(self):
         assert "app_profile" not in instance.shared_labels
         assert len(instance.shared_labels) == 2

-    @pytest.mark.parametrize("metric_name,kind", [
-        ("first_response_latencies", "histogram"),
-        ("attempt_latencies", "histogram"),
-        ("server_latencies", "histogram"),
-        ("connectivity_error_count", "count"),
-        # ("application_latencies", "histogram"),
-        # ("throttling_latencies", "histogram"),
-    ])
+    @pytest.mark.parametrize(
+        "metric_name,kind",
+        [
+            ("first_response_latencies", "histogram"),
+            ("attempt_latencies", "histogram"),
+            ("server_latencies", "histogram"),
+            ("connectivity_error_count", "count"),
+            # ("application_latencies", "histogram"),
+            # ("throttling_latencies", "histogram"),
+        ],
+    )
     def test_attempt_update_labels(self, metric_name, kind):
         """
         test that each attempt metric is sending the set of expected labels
         """
         from google.cloud.bigtable.data._metrics.data_model import OperationType
+
         expected_op_type = OperationType.READ_ROWS
         expected_status = mock.Mock()
         expected_streaming = mock.Mock()
         # server_latencies only shows up if gfe_latency is set
         gfe_latency = 1 if metric_name == "server_latencies" else None
-        attempt = CompletedAttemptMetric(start_time=0, end_time=1, end_status=expected_status,gfe_latency=gfe_latency,first_response_latency=1)
+        attempt = CompletedAttemptMetric(
+            start_time=0,
+            end_time=1,
+            end_status=expected_status,
+            gfe_latency=gfe_latency,
+            first_response_latency=1,
+        )
         op = ActiveOperationMetric(expected_op_type, is_streaming=expected_streaming)

         instance = self._make_one()
@@ -162,15 +187,19 @@ def test_attempt_update_labels(self, metric_name, kind):
             assert k in found_labels
             assert found_labels[k] == instance.shared_labels[k]

-    @pytest.mark.parametrize("metric_name,kind", [
-        ("operation_latencies", "histogram"),
-        ("retry_count", "count"),
-    ])
+    @pytest.mark.parametrize(
+        "metric_name,kind",
+        [
+            ("operation_latencies", "histogram"),
+            ("retry_count", "count"),
+        ],
+    )
     def test_operation_update_labels(self, metric_name, kind):
         """
         test that each operation metric is sending the set of expected labels
         """
         from google.cloud.bigtable.data._metrics.data_model import OperationType
+
         expected_op_type = OperationType.READ_ROWS
         expected_status = mock.Mock()
         expected_streaming = mock.Mock()
@@ -205,7 +234,9 @@ def test_attempt_update_latency(self):
         update attempt_latencies on attempt completion
         """
         expected_latency = 123
-        attempt = CompletedAttemptMetric(start_time=0, end_time=expected_latency, end_status=mock.Mock())
+        attempt = CompletedAttemptMetric(
+            start_time=0, end_time=expected_latency, end_status=mock.Mock()
+        )
         op = ActiveOperationMetric(mock.Mock())

         instance = self._make_one()
@@ -219,12 +250,20 @@ def test_attempt_update_first_response(self):
         update first_response_latency on attempt completion
         """
         from google.cloud.bigtable.data._metrics.data_model import OperationType
+
         expected_first_response_latency = 123
-        attempt = CompletedAttemptMetric(start_time=0, end_time=1, end_status=mock.Mock(), first_response_latency=expected_first_response_latency)
+        attempt = CompletedAttemptMetric(
+            start_time=0,
+            end_time=1,
+            end_status=mock.Mock(),
+            first_response_latency=expected_first_response_latency,
+        )
         op = ActiveOperationMetric(OperationType.READ_ROWS)

         instance = self._make_one()
-        with mock.patch.object(instance.otel.first_response_latencies, "record") as record:
+        with mock.patch.object(
+            instance.otel.first_response_latencies, "record"
+        ) as record:
             instance.on_attempt_complete(attempt, op)
             assert record.call_count == 1
             assert record.call_args[0][0] == expected_first_response_latency
@@ -234,7 +273,12 @@ def test_attempt_update_server_latency(self):
         update server_latency on attempt completion
         """
         expected_latency = 456
-        attempt = CompletedAttemptMetric(start_time=0, end_time=expected_latency, end_status=mock.Mock(), gfe_latency=expected_latency)
+        attempt = CompletedAttemptMetric(
+            start_time=0,
+            end_time=expected_latency,
+            end_status=mock.Mock(),
+            gfe_latency=expected_latency,
+        )
         op = ActiveOperationMetric(mock.Mock())

         instance = self._make_one()
@@ -248,7 +292,9 @@ def test_attempt_update_connectivity_error_count(self):
         update connectivity_error_count on attempt completion
         """
         # error connectivity is logged when gfe_latency is None
-        attempt = CompletedAttemptMetric(start_time=0, end_time=1, end_status=mock.Mock(), gfe_latency=None)
+        attempt = CompletedAttemptMetric(
+            start_time=0, end_time=1, end_status=mock.Mock(), gfe_latency=None
+        )
         op = ActiveOperationMetric(mock.Mock())

         instance = self._make_one()
diff --git a/tests/unit/data/_metrics/test_data_model.py b/tests/unit/data/_metrics/test_data_model.py
index ec25490ea..18fb7ccb3 100644
--- a/tests/unit/data/_metrics/test_data_model.py
+++ b/tests/unit/data/_metrics/test_data_model.py
@@ -15,15 +15,14 @@
 import time
 import pytest
 import mock
-from uuid import UUID

 from google.cloud.bigtable.data._metrics.data_model import OperationState as State


 class TestActiveOperationMetric:
-
     def _make_one(self, *args, **kwargs):
         from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric
+
         return ActiveOperationMetric(*args, **kwargs)

     def test_ctor_defaults(self):
@@ -115,16 +114,37 @@ def test_state_machine_w_state(self):
         else:
             assert metric.state == State.CREATED

-
-    @pytest.mark.parametrize("method,args,valid_states,error_method_name", [
-        ("start", (), (State.CREATED,), None),
-        ("start_attempt", (), (State.CREATED, State.BETWEEN_ATTEMPTS), None),
-        ("add_response_metadata", ({},), (State.ACTIVE_ATTEMPT,), None),
-        ("attempt_first_response", (), (State.ACTIVE_ATTEMPT,), None),
-        ("end_attempt_with_status", (mock.Mock(),), (State.ACTIVE_ATTEMPT,), None),
-        ("end_with_status", (mock.Mock(),), (State.CREATED, State.ACTIVE_ATTEMPT,State.BETWEEN_ATTEMPTS,), None),
-        ("end_with_success", (), (State.CREATED, State.ACTIVE_ATTEMPT,State.BETWEEN_ATTEMPTS,), "end_with_status"),
-        ], ids=lambda x: x if isinstance(x, str) else "")
+    @pytest.mark.parametrize(
+        "method,args,valid_states,error_method_name",
+        [
+            ("start", (), (State.CREATED,), None),
+            ("start_attempt", (), (State.CREATED, State.BETWEEN_ATTEMPTS), None),
+            ("add_response_metadata", ({},), (State.ACTIVE_ATTEMPT,), None),
+            ("attempt_first_response", (), (State.ACTIVE_ATTEMPT,), None),
+            ("end_attempt_with_status", (mock.Mock(),), (State.ACTIVE_ATTEMPT,), None),
+            (
+                "end_with_status",
+                (mock.Mock(),),
+                (
+                    State.CREATED,
+                    State.ACTIVE_ATTEMPT,
+                    State.BETWEEN_ATTEMPTS,
+                ),
+                None,
+            ),
+            (
+                "end_with_success",
+                (),
+                (
+                    State.CREATED,
+                    State.ACTIVE_ATTEMPT,
+                    State.BETWEEN_ATTEMPTS,
+                ),
+                "end_with_status",
+            ),
+        ],
+        ids=lambda x: x if isinstance(x, str) else "",
+    )
     def test_error_invalid_states(self, method, args, valid_states, error_method_name):
         """
         each method only works for certain states. Make sure _handle_error is called for invalid states
         """
@@ -145,7 +165,10 @@ def test_error_invalid_states(self, method, args, valid_states, error_method_nam
             return_obj = getattr(metric, method)(*args)
             assert return_obj is None
             assert mock_handle_error.call_count == 1
-            assert mock_handle_error.call_args[0][0] == f"Invalid state for {error_method_name}: {state}"
+            assert (
+                mock_handle_error.call_args[0][0]
+                == f"Invalid state for {error_method_name}: {state}"
+            )

     def test_start(self):
         """
@@ -165,6 +188,7 @@ def test_start_attempt(self):
         calling start_attempt should create a new empty attempt metric
         """
         from google.cloud.bigtable.data._metrics.data_model import ActiveAttemptMetric
+
         metric = self._make_one(mock.Mock())
         assert metric.active_attempt is None
         metric.start_attempt()
@@ -176,31 +200,45 @@ def test_start_attempt(self):
         # should be in ACTIVE_ATTEMPT state after completing
         assert metric.state == State.ACTIVE_ATTEMPT

-    @pytest.mark.parametrize("start_cluster,start_zone,metadata_field,end_cluster,end_zone", [
-        (None,None, None, None, None),
-        ("orig_cluster", "orig_zone", None, "orig_cluster", "orig_zone"),
-        (None,None, b"cluster zone", "cluster", "zone"),
-        (None,None, b'\n\rus-central1-b\x12\x0ctest-cluster', "us-central1-b", "test-cluster"),
-        ("orig_cluster","orig_zone", b"new_new", "orig_cluster", "orig_zone"),
-        (None,None, b"", None, None),
-        (None,None, b"cluster zone future", "cluster", "zone"),
-        (None, "filled", b"cluster zone", "cluster", "zone"),
-        ("filled", None, b"cluster zone", "cluster", "zone"),
-    ])
-    def test_add_response_metadata_cbt_header(self, start_cluster, start_zone, metadata_field, end_cluster, end_zone):
+    @pytest.mark.parametrize(
+        "start_cluster,start_zone,metadata_field,end_cluster,end_zone",
+        [
+            (None, None, None, None, None),
+            ("orig_cluster", "orig_zone", None, "orig_cluster", "orig_zone"),
+            (None, None, b"cluster zone", "cluster", "zone"),
+            (
+                None,
+                None,
+                b"\n\rus-central1-b\x12\x0ctest-cluster",
+                "us-central1-b",
+                "test-cluster",
+            ),
+            ("orig_cluster", "orig_zone", b"new_new", "orig_cluster", "orig_zone"),
+            (None, None, b"", None, None),
+            (None, None, b"cluster zone future", "cluster", "zone"),
+            (None, "filled", b"cluster zone", "cluster", "zone"),
+            ("filled", None, b"cluster zone", "cluster", "zone"),
+        ],
+    )
+    def test_add_response_metadata_cbt_header(
+        self, start_cluster, start_zone, metadata_field, end_cluster, end_zone
+    ):
         """
         calling add_response_metadata should update fields based on grpc response metadata
         The x-goog-ext-425905942-bin field contains cluster and zone info
         """
         import grpc
+
         cls = type(self._make_one(mock.Mock()))
         with mock.patch.object(cls, "_handle_error") as mock_handle_error:
-            metric = self._make_one(mock.Mock(), cluster_id=start_cluster, zone=start_zone)
+            metric = self._make_one(
+                mock.Mock(), cluster_id=start_cluster, zone=start_zone
+            )
             metric.active_attempt = mock.Mock()
             metric.active_attempt.gfe_latency = None
             metadata = grpc.aio.Metadata()
             if metadata_field:
-                metadata['x-goog-ext-425905942-bin'] = metadata_field
+                metadata["x-goog-ext-425905942-bin"] = metadata_field
             metric.add_response_metadata(metadata)
             assert metric.cluster_id == end_cluster
             assert metric.zone == end_zone
@@ -211,10 +249,13 @@ def test_add_response_metadata_cbt_header(self, start_cluster, start_zone, metad
             # gfe latency should not be touched
             assert metric.active_attempt.gfe_latency is None

-    @pytest.mark.parametrize("metadata_field", [
-        b"cluster",
"cluster zone", # expect bytes - ]) + @pytest.mark.parametrize( + "metadata_field", + [ + b"cluster", + "cluster zone", # expect bytes + ], + ) def test_add_response_metadata_cbt_header_w_error(self, metadata_field): """ If the x-goog-ext-425905942-bin field is present, but not structured properly, @@ -223,6 +264,7 @@ def test_add_response_metadata_cbt_header_w_error(self, metadata_field): Extra fields should not result in parsingerror """ import grpc + cls = type(self._make_one(mock.Mock())) with mock.patch.object(cls, "_handle_error") as mock_handle_error: metric = self._make_one(mock.Mock()) @@ -230,31 +272,40 @@ def test_add_response_metadata_cbt_header_w_error(self, metadata_field): metric.zone = None metric.active_attempt = mock.Mock() metadata = grpc.aio.Metadata() - metadata['x-goog-ext-425905942-bin'] = metadata_field + metadata["x-goog-ext-425905942-bin"] = metadata_field metric.add_response_metadata(metadata) # should remain in ACTIVE_ATTEMPT state after completing assert metric.state == State.ACTIVE_ATTEMPT # no errors encountered assert mock_handle_error.call_count == 1 - assert mock_handle_error.call_args[0][0] == f"Failed to decode x-goog-ext-425905942-bin metadata: {metadata_field}" - - @pytest.mark.parametrize("metadata_field,expected_latency", [ - (None, None), - ("gfet4t7; dur=1000", 1), - ("gfet4t7; dur=1000.0", 1), - ("gfet4t7; dur=1000.1", 1.0001), - ("gfet4t7; dur=1000 dur=2000", 2), - ("gfet4t7; dur=0", 0), - ("gfet4t7; dur=empty", None), - ("gfet4t7;", None), - ("", None), - ]) - def test_add_response_metadata_server_timing_header(self, metadata_field, expected_latency): + assert ( + mock_handle_error.call_args[0][0] + == f"Failed to decode x-goog-ext-425905942-bin metadata: {metadata_field}" + ) + + @pytest.mark.parametrize( + "metadata_field,expected_latency", + [ + (None, None), + ("gfet4t7; dur=1000", 1), + ("gfet4t7; dur=1000.0", 1), + ("gfet4t7; dur=1000.1", 1.0001), + ("gfet4t7; dur=1000 dur=2000", 2), + ("gfet4t7; dur=0", 0), + ("gfet4t7; dur=empty", None), + ("gfet4t7;", None), + ("", None), + ], + ) + def test_add_response_metadata_server_timing_header( + self, metadata_field, expected_latency + ): """ calling add_response_metadata should update fields based on grpc response metadata The server-timing field contains gfle latency info """ import grpc + cls = type(self._make_one(mock.Mock())) with mock.patch.object(cls, "_handle_error") as mock_handle_error: metric = self._make_one(mock.Mock()) @@ -262,7 +313,7 @@ def test_add_response_metadata_server_timing_header(self, metadata_field, expect metric.active_attempt.gfe_latency = None metadata = grpc.aio.Metadata() if metadata_field: - metadata['server-timing'] = metadata_field + metadata["server-timing"] = metadata_field metric.add_response_metadata(metadata) if metric.active_attempt.gfe_latency is None: assert expected_latency is None @@ -293,7 +344,10 @@ def test_attempt_first_response(self): # calling it again should cause an error metric.attempt_first_response() assert mock_handle_error.call_count == 1 - assert mock_handle_error.call_args[0][0] == "Attempt already received first response" + assert ( + mock_handle_error.call_args[0][0] + == "Attempt already received first response" + ) # value should not be changed assert metric.active_attempt.first_response_latency == got_latency @@ -336,7 +390,9 @@ def test_end_attempt_with_status_w_exception(self): metric = self._make_one(mock.Mock()) metric.start_attempt() - with mock.patch.object(metric, "_exc_to_status", return_value=expected_status) as 
+        with mock.patch.object(
+            metric, "_exc_to_status", return_value=expected_status
+        ) as mock_exc_to_status:
             metric.end_attempt_with_status(input_status)
             assert mock_exc_to_status.call_count == 1
             assert mock_exc_to_status.call_args[0][0] == input_status
@@ -350,6 +406,7 @@ def test_end_with_status(self):
           - update handlers
         """
         from google.cloud.bigtable.data._metrics.data_model import ActiveAttemptMetric
+
         expected_attempt_start_time = 7
         expected_attempt_first_response_latency = 9
         expected_attempt_gfe_latency = 5
@@ -362,7 +419,9 @@ def test_end_with_status(self):
         is_streaming = object()
         handlers = [mock.Mock(), mock.Mock()]

-        metric = self._make_one(expected_type, handlers=handlers,start_time=expected_start_time)
+        metric = self._make_one(
+            expected_type, handlers=handlers, start_time=expected_start_time
+        )
         metric.cluster_id = expected_cluster
         metric.zone = expected_zone
         metric.is_streaming = is_streaming
@@ -394,7 +453,10 @@ def test_end_with_status(self):
         assert len(called_with.completed_attempts) == 1
         final_attempt = called_with.completed_attempts[0]
         assert final_attempt.start_time == expected_attempt_start_time
-        assert final_attempt.first_response_latency == expected_attempt_first_response_latency
+        assert (
+            final_attempt.first_response_latency
+            == expected_attempt_first_response_latency
+        )
         assert final_attempt.gfe_latency == expected_attempt_gfe_latency
         assert final_attempt.end_status == expected_status
         assert time.monotonic() - final_attempt.end_time < 0.001
@@ -409,7 +471,9 @@ def test_end_with_status_w_exception(self):
         metric = self._make_one(mock.Mock(), handlers=handlers)
         metric.start_attempt()

-        with mock.patch.object(metric, "_exc_to_status", return_value=expected_status) as mock_exc_to_status:
+        with mock.patch.object(
+            metric, "_exc_to_status", return_value=expected_status
+        ) as mock_exc_to_status:
             metric.end_with_status(input_status)
             assert mock_exc_to_status.call_count == 1
             assert mock_exc_to_status.call_args[0][0] == input_status
@@ -438,6 +502,7 @@ def test_end_on_empty_operation(self):
         Should be able to end an operation without any attempts
         """
         from grpc import StatusCode
+
         handlers = [mock.Mock()]
         metric = self._make_one(mock.Mock(), handlers=handlers)
         metric.end_with_success()
@@ -479,7 +544,7 @@ def test_build_on_error_fn(self):
             cls.build_on_error_fn(
                 mock.Mock(),
                 predicate=lambda x: pred_val,
-                wrapped_on_error=mock_wrapped_on_error
+                wrapped_on_error=mock_wrapped_on_error,
             )(input_exc)
             assert mock_wrapped_on_error.call_count == 1
             assert mock_wrapped_on_error.call_args[0][0] == input_exc
@@ -500,28 +565,37 @@ def test__exc_to_status(self):
         assert cls._exc_to_status(ValueError()) == StatusCode.UNKNOWN
         assert cls._exc_to_status(RuntimeError()) == StatusCode.UNKNOWN
         # grpc status code for grpc errors
-        assert cls._exc_to_status(core_exc.InvalidArgument('msg')) == StatusCode.INVALID_ARGUMENT
-        assert cls._exc_to_status(core_exc.NotFound('msg')) == StatusCode.NOT_FOUND
-        assert cls._exc_to_status(core_exc.AlreadyExists('msg')) == StatusCode.ALREADY_EXISTS
-        assert cls._exc_to_status(core_exc.PermissionDenied('msg')) == StatusCode.PERMISSION_DENIED
-        cause_exc = core_exc.AlreadyExists('msg')
-        w_cause = core_exc.DeadlineExceeded('msg')
+        assert (
+            cls._exc_to_status(core_exc.InvalidArgument("msg"))
+            == StatusCode.INVALID_ARGUMENT
+        )
+        assert cls._exc_to_status(core_exc.NotFound("msg")) == StatusCode.NOT_FOUND
+        assert (
+            cls._exc_to_status(core_exc.AlreadyExists("msg"))
+            == StatusCode.ALREADY_EXISTS
+        )
+        assert (
+            cls._exc_to_status(core_exc.PermissionDenied("msg"))
+            == StatusCode.PERMISSION_DENIED
+        )
+        cause_exc = core_exc.AlreadyExists("msg")
+        w_cause = core_exc.DeadlineExceeded("msg")
         w_cause.__cause__ = cause_exc
         assert cls._exc_to_status(w_cause) == StatusCode.DEADLINE_EXCEEDED
         # use cause if available
-        w_cause = ValueError('msg')
+        w_cause = ValueError("msg")
         w_cause.__cause__ = cause_exc
         cause_exc.grpc_status_code = object()
         custom_excs = [
             bt_exc.FailedMutationEntryError(1, mock.Mock(), cause=cause_exc),
             bt_exc.FailedQueryShardError(1, {}, cause=cause_exc),
-            w_cause
+            w_cause,
         ]
         for exc in custom_excs:
             assert cls._exc_to_status(exc) == cause_exc.grpc_status_code, exc
         # extract most recent exception for bigtable exception groups
         exc_groups = [
-            bt_exc._BigtableExceptionGroup('', [ValueError(), cause_exc]),
+            bt_exc._BigtableExceptionGroup("", [ValueError(), cause_exc]),
             bt_exc.RetryExceptionGroup([RuntimeError(), cause_exc]),
             bt_exc.ShardedReadRowsExceptionGroup(
                 [bt_exc.FailedQueryShardError(1, {}, cause=cause_exc)], [], 2
@@ -540,12 +614,17 @@ def test__handle_error(self):
         input_message = "test message"
         expected_message = f"Error in Bigtable Metrics: {input_message}"
         # if ALLOW_METRIC_EXCEPTIONS is set, raise the exception
-        with mock.patch("google.cloud.bigtable.data._metrics.data_model.ALLOW_METRIC_EXCEPTIONS", True):
+        with mock.patch(
+            "google.cloud.bigtable.data._metrics.data_model.ALLOW_METRIC_EXCEPTIONS",
+            True,
+        ):
             with pytest.raises(ValueError) as e:
                 type(self._make_one(object()))._handle_error(input_message)
             assert e.value.args[0] == expected_message
         # if LOGGER is populated, log the exception
-        with mock.patch("google.cloud.bigtable.data._metrics.data_model.LOGGER") as logger_mock:
+        with mock.patch(
+            "google.cloud.bigtable.data._metrics.data_model.LOGGER"
+        ) as logger_mock:
             type(self._make_one(object()))._handle_error(input_message)
             assert logger_mock.warning.call_count == 1
             assert logger_mock.warning.call_args[0][0] == expected_message
@@ -624,6 +703,7 @@ async def test_wrap_attempt_fn_success(self):
           - should call end_with_success
         """
         from grpc import StatusCode
+
         metric = self._make_one(object())
         async with metric as context:
             mock_call = mock.AsyncMock()
@@ -646,7 +726,6 @@ async def test_wrap_attempt_fn_success(self):
             assert len(metric.completed_attempts) == 1
             assert metric.completed_attempts[0].end_status == StatusCode.OK

-
     @pytest.mark.asyncio
     async def test_wrap_attempt_fn_success_extract_call_metadata(self):
         """
@@ -660,7 +739,9 @@ async def test_wrap_attempt_fn_success_extract_call_metadata(self):
             mock_call = mock_grpc_call()
             inner_fn = lambda *args, **kwargs: mock_call  # noqa
             wrapped_fn = context.wrap_attempt_fn(inner_fn, extract_call_metadata=True)
-            with mock.patch.object(metric, "add_response_metadata") as mock_add_metadata:
+            with mock.patch.object(
+                metric, "add_response_metadata"
+            ) as mock_add_metadata:
                 # make the wrapped call
                 result = await wrapped_fn()
                 assert result == mock_call
@@ -682,7 +763,9 @@ async def test_wrap_attempt_fn_failed_extract_call_metadata(self):
         metric = self._make_one(object())
         async with metric as context:
             wrapped_fn = context.wrap_attempt_fn(inner_fn, extract_call_metadata=True)
-            with mock.patch.object(metric, "add_response_metadata") as mock_add_metadata:
+            with mock.patch.object(
+                metric, "add_response_metadata"
+            ) as mock_add_metadata:
                 # make the wrapped call. expect exception when awaiting on mock_call
                 with pytest.raises(TypeError):
                     await wrapped_fn()
@@ -697,9 +780,12 @@ async def test_wrap_attempt_fn_failed_extract_call_metadata_no_mock(self):
         Make sure the metadata is accessible after a failed attempt
         """
         import grpc
+
         mock_call = mock.AsyncMock()
         mock_call.trailing_metadata.return_value = grpc.aio.Metadata()
-        mock_call.initial_metadata.return_value = grpc.aio.Metadata(("server-timing", "gfet4t7; dur=5000"))
+        mock_call.initial_metadata.return_value = grpc.aio.Metadata(
+            ("server-timing", "gfet4t7; dur=5000")
+        )
         inner_fn = lambda *args, **kwargs: mock_call  # noqa
         metric = self._make_one(object())
         async with metric as context:
@@ -716,9 +802,12 @@ async def test_wrap_attempt_fn_failed_attempt(self):
         failed attempts should call operation.end_attempt with error
         """
         from grpc import StatusCode
+
         metric = self._make_one(object())
         async with metric as context:
-            wrapped_fn = context.wrap_attempt_fn(mock.Mock(), extract_call_metadata=False)
+            wrapped_fn = context.wrap_attempt_fn(
+                mock.Mock(), extract_call_metadata=False
+            )
             # make the wrapped call. expect type error when awaiting response of mock
             with pytest.raises(TypeError):
                 await wrapped_fn()
@@ -739,14 +828,19 @@ async def test_wrap_attempt_fn_with_retry(self):
         from grpc import StatusCode
         from google.api_core.retry_async import AsyncRetry
         from google.api_core.exceptions import RetryError
+
         metric = self._make_one(object())
         with pytest.raises(RetryError):
             # should eventually fail due to timeout
             async with metric as context:
                 always_retry = lambda x: True  # noqa
-                retry_obj = AsyncRetry(predicate=always_retry, timeout=0.05, maximum=0.001)
+                retry_obj = AsyncRetry(
+                    predicate=always_retry, timeout=0.05, maximum=0.001
+                )
                 # mock.Mock will fail on await
-                double_wrapped_fn = retry_obj(context.wrap_attempt_fn(mock.Mock(), extract_call_metadata=False))
+                double_wrapped_fn = retry_obj(
+                    context.wrap_attempt_fn(mock.Mock(), extract_call_metadata=False)
+                )
                 await double_wrapped_fn()
         # make sure operation ended with expected state
         assert metric.state == State.COMPLETED
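The retry test above pins down the intended layering: the retry decorator wraps the per-attempt wrapper, so each retry is recorded as a separate attempt on a single operation. A condensed, self-contained sketch of that layering under the same assumptions the tests make (the mock RPC and predicate are illustrative):

    import asyncio
    from unittest import mock

    from google.api_core.retry_async import AsyncRetry
    from google.cloud.bigtable.data._metrics import ActiveOperationMetric

    async def main():
        # stand-in RPC: fails once, then succeeds
        rpc = mock.AsyncMock(side_effect=[ValueError("transient"), "row-data"])
        metric = ActiveOperationMetric(mock.Mock())  # op_type mocked, as in the tests
        async with metric as operation:
            retry = AsyncRetry(predicate=lambda exc: True, timeout=5.0)
            wrapped = retry(operation.wrap_attempt_fn(rpc, extract_call_metadata=False))
            assert await wrapped() == "row-data"
        # one failed attempt and one successful attempt should now be recorded
        print(len(metric.completed_attempts))

    asyncio.run(main())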
diff --git a/tests/unit/data/_metrics/test_metrics_controller.py b/tests/unit/data/_metrics/test_metrics_controller.py
index 84ec2d03a..36586a51e 100644
--- a/tests/unit/data/_metrics/test_metrics_controller.py
+++ b/tests/unit/data/_metrics/test_metrics_controller.py
@@ -14,10 +14,12 @@
 import mock

-class TestBigtableClientSideMetricsController:

+class TestBigtableClientSideMetricsController:
     def _make_one(self, *args, **kwargs):
-        from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController
+        from google.cloud.bigtable.data._metrics import (
+            BigtableClientSideMetricsController,
+        )

         return BigtableClientSideMetricsController(*args, **kwargs)

@@ -27,7 +29,9 @@ def test_ctor_defaults(self):
         """
         from google.cloud.bigtable.data._metrics import OpenTelemetryMetricsHandler

-        instance = self._make_one(project_id="p", instance_id="i", table_id="t", app_profile_id="a")
+        instance = self._make_one(
+            project_id="p", instance_id="i", table_id="t", app_profile_id="a"
+        )
         assert len(instance.handlers) == 1
         assert isinstance(instance.handlers[0], OpenTelemetryMetricsHandler)

@@ -38,8 +42,12 @@ def test_ctor_w_logging(self):
         from google.cloud.bigtable.data._metrics import OpenTelemetryMetricsHandler
         from google.cloud.bigtable.data._metrics import _StdoutMetricsHandler

-        with mock.patch("google.cloud.bigtable.data._metrics.metrics_controller.PRINT_METRICS", True):
-            controller = self._make_one(project_id="p", instance_id="i", table_id="t", app_profile_id="a")
+        with mock.patch(
+            "google.cloud.bigtable.data._metrics.metrics_controller.PRINT_METRICS", True
+        ):
+            controller = self._make_one(
+                project_id="p", instance_id="i", table_id="t", app_profile_id="a"
+            )
         assert len(controller.handlers) == 2
         assert OpenTelemetryMetricsHandler in [type(h) for h in controller.handlers]
         assert _StdoutMetricsHandler in [type(h) for h in controller.handlers]
@@ -66,13 +74,16 @@ def test_add_handler(self):

     def test_create_operation_mock(self):
         """
-        All args should be passed through, as well as the handlers
+        All args should be passed through, as well as the handlers
         """
         from google.cloud.bigtable.data._metrics import ActiveOperationMetric
+
         controller = self._make_one(handlers=[object()])
         arg = object()
         kwargs = {"a": 1, "b": 2}
-        with mock.patch("google.cloud.bigtable.data._metrics.ActiveOperationMetric.__init__") as mock_op:
+        with mock.patch(
+            "google.cloud.bigtable.data._metrics.ActiveOperationMetric.__init__"
+        ) as mock_op:
             mock_op.return_value = None
             op = controller.create_operation(arg, **kwargs)
             assert isinstance(op, ActiveOperationMetric)
@@ -81,12 +92,15 @@ def test_create_operation_mock(self):

     def test_create_operation(self):
         from google.cloud.bigtable.data._metrics import ActiveOperationMetric
+
         handler = object()
         expected_type = object()
         expected_is_streaming = True
         expected_zone = object()
         controller = self._make_one(handlers=[handler])
-        op = controller.create_operation(expected_type, is_streaming=expected_is_streaming, zone=expected_zone)
+        op = controller.create_operation(
+            expected_type, is_streaming=expected_is_streaming, zone=expected_zone
+        )
         assert isinstance(op, ActiveOperationMetric)
         assert op.op_type is expected_type
         assert op.is_streaming is expected_is_streaming
@@ -95,7 +109,6 @@ def test_create_operation(self):
         assert op.handlers[0] is handler

     def test_create_operation_multiple_handlers(self):
-        from google.cloud.bigtable.data._metrics import ActiveOperationMetric
         orig_handler = object()
         new_handler = object()
         controller = self._make_one(handlers=[orig_handler])
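One loose end for readers: SERVER_TIMING_REGEX itself is outside this diff. The parametrized server-timing test cases above are consistent with a pattern like the following — an assumed stand-in, not the library's actual definition:

    import re

    # assumed stand-in for data_model.SERVER_TIMING_REGEX; the greedy prefix means
    # the last dur= value wins, matching the "dur=1000 dur=2000" -> 2 seconds case
    SERVER_TIMING_REGEX = re.compile(r"gfet4t7;.*dur=(\d+\.?\d*)")

    timing = SERVER_TIMING_REGEX.match("gfet4t7; dur=1000 dur=2000")
    if timing:
        gfe_latency = float(timing.group(1)) / 1000  # milliseconds -> seconds
        assert gfe_latency == 2.0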