Skip to content

Commit

Permalink
fixed lint and mypy
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche committed Nov 27, 2023
1 parent ea7ad81 commit b3c2209
Show file tree
Hide file tree
Showing 9 changed files with 326 additions and 140 deletions.
6 changes: 3 additions & 3 deletions google/cloud/bigtable/data/_async/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,7 @@ def on_error_fn(exc):
metadata = _make_metadata(self.table_name, self.app_profile_id)
# trigger rpc
await deadline_wrapped(
row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key,
row_key=row_key.encode() if isinstance(row_key, str) else row_key,
mutations=[mutation._to_pb() for mutation in mutations_list],
table_name=self.table_name,
app_profile_id=self.app_profile_id,
Expand Down Expand Up @@ -1141,7 +1141,7 @@ async def check_and_mutate_row(
true_mutations=true_case_list,
false_mutations=false_case_list,
predicate_filter=predicate._to_pb() if predicate is not None else None,
row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key,
row_key=row_key.encode() if isinstance(row_key, str) else row_key,
table_name=self.table_name,
app_profile_id=self.app_profile_id,
metadata=metadata,
Expand Down Expand Up @@ -1199,7 +1199,7 @@ async def read_modify_write_row(

result = await metric_wrapped(
rules=[rule._to_pb() for rule in rules],
row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key,
row_key=row_key.encode() if isinstance(row_key, str) else row_key,
table_name=self.table_name,
app_profile_id=self.app_profile_id,
metadata=metadata,
Expand Down
57 changes: 39 additions & 18 deletions google/cloud/bigtable/data/_metrics/data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,28 @@
# limitations under the License.
from __future__ import annotations

from typing import Callable, Any, TYPE_CHECKING
from typing import Callable, Any, cast, TYPE_CHECKING

import time
import os
import re
import logging

from enum import Enum
from uuid import uuid4
from dataclasses import dataclass
from dataclasses import field
from grpc import StatusCode

import google.cloud.bigtable.data.exceptions as bt_exceptions

if TYPE_CHECKING:
from uuid import UUID
from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler


ALLOW_METRIC_EXCEPTIONS = os.getenv("BIGTABLE_METRICS_EXCEPTIONS", False)
LOGGER = logging.getLogger(__name__) if os.getenv("BIGTABLE_METRICS_LOGS", False) else None
LOGGER = (
logging.getLogger(__name__) if os.getenv("BIGTABLE_METRICS_LOGS", False) else None
)

DEFAULT_ZONE = "global"
DEFAULT_CLUSTER_ID = "unspecified"
Expand All @@ -60,6 +60,7 @@ class OperationType(Enum):

class OperationState(Enum):
"""Enum for the state of the active operation."""

CREATED = 0
ACTIVE_ATTEMPT = 1
BETWEEN_ATTEMPTS = 2
Expand Down Expand Up @@ -153,8 +154,13 @@ def start_attempt(self) -> None:
If the operation was completed or there is already an active attempt,
will raise an exception or warning based on the value of ALLOW_METRIC_EXCEPTIONS.
"""
if self.state != OperationState.BETWEEN_ATTEMPTS and self.state != OperationState.CREATED:
return self._handle_error(INVALID_STATE_ERROR.format("start_attempt", self.state))
if (
self.state != OperationState.BETWEEN_ATTEMPTS
and self.state != OperationState.CREATED
):
return self._handle_error(
INVALID_STATE_ERROR.format("start_attempt", self.state)
)

self.active_attempt = ActiveAttemptMetric()

Expand All @@ -171,10 +177,13 @@ def add_response_metadata(self, metadata: dict[str, bytes | str]) -> None:
- metadata: the metadata as extracted from the grpc call
"""
if self.state != OperationState.ACTIVE_ATTEMPT:
return self._handle_error(INVALID_STATE_ERROR.format("add_response_metadata", self.state))
return self._handle_error(
INVALID_STATE_ERROR.format("add_response_metadata", self.state)
)

if self.cluster_id is None or self.zone is None:
bigtable_metadata = metadata.get(BIGTABLE_METADATA_KEY)
# BIGTABLE_METADATA_KEY should give a binary string with cluster_id and zone
bigtable_metadata = cast(bytes, metadata.get(BIGTABLE_METADATA_KEY))
if bigtable_metadata:
try:
decoded = "".join(
Expand All @@ -185,11 +194,14 @@ def add_response_metadata(self, metadata: dict[str, bytes | str]) -> None:
self.cluster_id = split_data[0]
self.zone = split_data[1]
except (AttributeError, IndexError):
self._handle_error(f"Failed to decode {BIGTABLE_METADATA_KEY} metadata: {bigtable_metadata}")
timing_header = metadata.get(SERVER_TIMING_METADATA_KEY)
self._handle_error(
f"Failed to decode {BIGTABLE_METADATA_KEY} metadata: {bigtable_metadata!r}"
)
# SERVER_TIMING_METADATA_KEY should give a string with the server-latency headers
timing_header = cast(str, metadata.get(SERVER_TIMING_METADATA_KEY))
if timing_header:
timing_data = SERVER_TIMING_REGEX.match(timing_header)
if timing_data:
if timing_data and self.active_attempt:
# convert from milliseconds to seconds
self.active_attempt.gfe_latency = float(timing_data.group(1)) / 1000

Expand All @@ -202,9 +214,11 @@ def attempt_first_response(self) -> None:
active attempt already has a first response time, will raise an
exception or warning based on the value of ALLOW_METRIC_EXCEPTIONS.
"""
if self.state != OperationState.ACTIVE_ATTEMPT:
return self._handle_error(INVALID_STATE_ERROR.format("attempt_first_response", self.state))
elif self.active_attempt.first_response_latency is not None:
if self.state != OperationState.ACTIVE_ATTEMPT or self.active_attempt is None:
return self._handle_error(
INVALID_STATE_ERROR.format("attempt_first_response", self.state)
)
if self.active_attempt.first_response_latency is not None:
return self._handle_error("Attempt already received first response")
self.active_attempt.first_response_latency = (
time.monotonic() - self.active_attempt.start_time
Expand All @@ -220,8 +234,10 @@ def end_attempt_with_status(self, status: StatusCode | Exception) -> None:
Args:
- status: The status of the attempt.
"""
if self.state != OperationState.ACTIVE_ATTEMPT:
return self._handle_error(INVALID_STATE_ERROR.format("end_attempt_with_status", self.state))
if self.state != OperationState.ACTIVE_ATTEMPT or self.active_attempt is None:
return self._handle_error(
INVALID_STATE_ERROR.format("end_attempt_with_status", self.state)
)

new_attempt = CompletedAttemptMetric(
start_time=self.active_attempt.start_time,
Expand Down Expand Up @@ -251,8 +267,12 @@ def end_with_status(self, status: StatusCode | Exception) -> None:
- status: The status of the operation.
"""
if self.state == OperationState.COMPLETED:
return self._handle_error(INVALID_STATE_ERROR.format("end_with_status", self.state))
final_status = self._exc_to_status(status) if isinstance(status, Exception) else status
return self._handle_error(
INVALID_STATE_ERROR.format("end_with_status", self.state)
)
final_status = (
self._exc_to_status(status) if isinstance(status, Exception) else status
)
if self.state == OperationState.ACTIVE_ATTEMPT:
self.end_attempt_with_status(final_status)
self.was_completed = True
Expand Down Expand Up @@ -396,6 +416,7 @@ def wrap_attempt_fn(
grpc function, and will automatically extract trailing_metadata
from the Call object on success.
"""

async def wrapped_fn(*args, **kwargs):
encountered_exc: Exception | None = None
call = None
Expand Down
4 changes: 3 additions & 1 deletion google/cloud/bigtable/data/_metrics/handlers/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,7 @@ def __init__(self, **kwargs):
def on_operation_complete(self, op: CompletedOperationMetric) -> None:
pass

def on_attempt_complete(self, attempt: CompletedAttemptMetric, op: ActiveOperationMetric) -> None:
def on_attempt_complete(
self, attempt: CompletedAttemptMetric, op: ActiveOperationMetric
) -> None:
pass
12 changes: 9 additions & 3 deletions google/cloud/bigtable/data/_metrics/handlers/opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ class _OpenTelemetryInstrumentSingleton:
Singleton class that holds OpenTelemetry instrument objects,
so that multiple Tables can write to the same metrics.
"""

def __new__(cls):
if not hasattr(cls, 'instance'):
if not hasattr(cls, "instance"):
cls.instance = super(_OpenTelemetryInstrumentSingleton, cls).__new__(cls)
return cls.instance

Expand Down Expand Up @@ -66,6 +67,7 @@ def __init__(self):
description="A count of the number of attempts that failed to reach Google's network.",
)


class OpenTelemetryMetricsHandler(MetricsHandler):
"""
Maintains a set of OpenTelemetry metrics for the Bigtable client library,
Expand Down Expand Up @@ -120,7 +122,9 @@ def on_operation_complete(self, op: CompletedOperationMetric) -> None:
self.otel.operation_latencies.record(op.duration, labels)
self.otel.retry_count.add(len(op.completed_attempts) - 1, labels)

def on_attempt_complete(self, attempt: CompletedAttemptMetric, op: ActiveOperationMetric):
def on_attempt_complete(
self, attempt: CompletedAttemptMetric, op: ActiveOperationMetric
):
"""
Update the metrics associated with a completed attempt:
- attempt_latencies
Expand All @@ -135,7 +139,9 @@ def on_attempt_complete(self, attempt: CompletedAttemptMetric, op: ActiveOperati
**self.shared_labels,
}

self.otel.attempt_latencies.record(attempt.end_time-attempt.start_time, labels)
self.otel.attempt_latencies.record(
attempt.end_time - attempt.start_time, labels
)
if (
op.op_type == OperationType.READ_ROWS
and attempt.first_response_latency is not None
Expand Down
4 changes: 3 additions & 1 deletion google/cloud/bigtable/data/_metrics/metrics_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ def add_handler(self, handler: MetricsHandler) -> None:
"""
self.handlers.append(handler)

def create_operation(self, op_type:OperationType, **kwargs) -> ActiveOperationMetric:
def create_operation(
self, op_type: OperationType, **kwargs
) -> ActiveOperationMetric:
"""
Creates a new operation and registers it with the subscribed handlers.
"""
Expand Down
4 changes: 3 additions & 1 deletion tests/unit/data/_async/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,9 @@ async def test_table_ctor(self):
from google.cloud.bigtable.data._async.client import BigtableDataClientAsync
from google.cloud.bigtable.data._async.client import TableAsync
from google.cloud.bigtable.data._async.client import _WarmedInstanceKey
from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController
from google.cloud.bigtable.data._metrics import (
BigtableClientSideMetricsController,
)
from google.cloud.bigtable.data._metrics import OpenTelemetryMetricsHandler

expected_table_id = "table-id"
Expand Down
Loading

0 comments on commit b3c2209

Please sign in to comment.