Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add test proxy #836

Merged
merged 34 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
aa1e40e
support emulator in data client
daniel-sanche Jul 17, 2023
b059139
added test proxy files
daniel-sanche Jul 17, 2023
3ed2168
cleaned up noxfile
daniel-sanche Jul 17, 2023
5199839
moved protos to subdir
daniel-sanche Jul 17, 2023
4e14bf3
close client
daniel-sanche Jul 17, 2023
cbb95c9
moved handlers into subdir
daniel-sanche Jul 17, 2023
f43aac1
reverted close
daniel-sanche Jul 18, 2023
91fc1e6
removed user agent
daniel-sanche Jul 18, 2023
06e5276
removed go submodule
daniel-sanche Jul 18, 2023
62b8e48
fixied typo
daniel-sanche Jul 20, 2023
237e051
removed unneeded files
daniel-sanche Jul 20, 2023
868ff2e
removed duplicate client handler legacy
daniel-sanche Jul 20, 2023
21a5077
Merge branch 'v3' into test_proxy2
daniel-sanche Aug 16, 2023
02f0c09
addressed PR comments
daniel-sanche Aug 16, 2023
456caba
ran blacken
daniel-sanche Aug 16, 2023
f3627c1
Merge branch 'v3' into test_proxy2
daniel-sanche Aug 16, 2023
bcc02d7
fix method name
daniel-sanche Aug 16, 2023
5e7c156
added missing import
daniel-sanche Aug 17, 2023
604d3d8
added conformance tests to kokoro
daniel-sanche Aug 17, 2023
14f359d
added conformance to nox sessions
daniel-sanche Aug 17, 2023
858c57d
Revert unwwanted noxfile changes
daniel-sanche Aug 17, 2023
36a3153
added missing run_tests file
daniel-sanche Aug 17, 2023
07b39b1
changed conformance test kokoro configs
daniel-sanche Aug 17, 2023
5d90478
ran blacken
daniel-sanche Aug 17, 2023
b69da5a
install golang for conformance tests
daniel-sanche Aug 17, 2023
df3ea47
update before attempting install
daniel-sanche Aug 17, 2023
8dcd444
changed go install method
daniel-sanche Aug 17, 2023
94a8684
moved go installation to run_tests
daniel-sanche Aug 17, 2023
72b8d1b
fixed failing conformance tests
daniel-sanche Aug 17, 2023
8496211
Merge branch 'v3' into test_proxy2
daniel-sanche Aug 17, 2023
71ba0ea
fixed read rows test error
daniel-sanche Aug 17, 2023
94e98db
fixed conformance test errors
daniel-sanche Aug 17, 2023
320d157
download go locally instead of installing to system
daniel-sanche Aug 18, 2023
5064870
fixed lint issue
daniel-sanche Aug 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions google/cloud/bigtable/data/_async/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import warnings
import sys
import random
import os

from collections import namedtuple

Expand All @@ -37,10 +38,12 @@
from google.cloud.bigtable_v2.services.bigtable.async_client import DEFAULT_CLIENT_INFO
from google.cloud.bigtable_v2.services.bigtable.transports.pooled_grpc_asyncio import (
PooledBigtableGrpcAsyncIOTransport,
PooledChannel,
)
from google.cloud.bigtable_v2.types.bigtable import PingAndWarmRequest
from google.cloud.client import ClientWithProject
from google.api_core.exceptions import GoogleAPICallError
from google.cloud.environment_vars import BIGTABLE_EMULATOR # type: ignore
from google.api_core import retry_async as retries
from google.api_core import exceptions as core_exceptions
from google.cloud.bigtable.data._async._read_rows import _ReadRowsOperationAsync
Expand Down Expand Up @@ -150,26 +153,43 @@ def __init__(
# keep track of table objects associated with each instance
# only remove instance from _active_instances when all associated tables remove it
self._instance_owners: dict[_WarmedInstanceKey, Set[int]] = {}
# attempt to start background tasks
self._channel_init_time = time.monotonic()
self._channel_refresh_tasks: list[asyncio.Task[None]] = []
try:
self._start_background_channel_refresh()
except RuntimeError:
self._emulator_host = os.getenv(BIGTABLE_EMULATOR)
if self._emulator_host is not None:
# connect to an emulator host
warnings.warn(
f"{self.__class__.__name__} should be started in an "
"asyncio event loop. Channel refresh will not be started",
"Connecting to Bigtable emulator at {}".format(self._emulator_host),
RuntimeWarning,
stacklevel=2,
)
self.transport._grpc_channel = PooledChannel(
pool_size=pool_size,
host=self._emulator_host,
insecure=True,
)
# refresh cached stubs to use emulator pool
self.transport._stubs = {}
self.transport._prep_wrapped_messages(client_info)
else:
# attempt to start background channel refresh tasks
try:
self.start_background_channel_refresh()
except RuntimeError:
warnings.warn(
f"{self.__class__.__name__} should be started in an "
"asyncio event loop. Channel refresh will not be started",
RuntimeWarning,
stacklevel=2,
)

def _start_background_channel_refresh(self) -> None:
"""
Starts a background task to ping and warm each channel in the pool
Raises:
- RuntimeError if not called in an asyncio event loop
"""
if not self._channel_refresh_tasks:
if not self._channel_refresh_tasks and not self._emulator_host:
# raise RuntimeError if there is no event loop
asyncio.get_running_loop()
for channel_idx in range(self.transport.pool_size):
Expand Down
105 changes: 105 additions & 0 deletions test_proxy/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# CBT Python Test Proxy

The CBT test proxy is intended for running conformance tests for the Cloud Bigtable Python Client.

## Start test proxy

#### running the proxy with nox

You can launch the test proxy directly using `nox`, which will handle dependency management

```
cd python-bigtable/test_proxy
nox -s test_proxy
```

The port can be configured using the `PROXY_SERVER_PORT` environment variable

```
cd python-bigtable/test_proxy
PROXY_SERVER_PORT=8080
nox -s test_proxy
```

#### running the proxy script manually

You can also run the `test_proxy.py` file directly

```
cd python-bigtable/test_proxy
python test_proxy.py
```

The port can be set by passing in an extra positional argument

```
cd python-bigtable/test_proxy
python test_proxy.py --port 8080
```

## Run the test cases

Prerequisites:
- If you have not already done so, [install golang](https://go.dev/doc/install).
- Before running tests, [launch an instance of the test proxy](#start-test-proxy)
in a separate shell session, and make note of the port

#### running the test cases with nox

You can trigger the tests directly using `nox`, which will clone the test repo locally if it doesn't exist

```
cd python-bigtable/test_proxy
nox -s conformance_tests
```

The port can be configured using the `PROXY_SERVER_PORT` environment variable

```
cd python-bigtable/test_proxy
PROXY_SERVER_PORT=8080
nox -s conformance_tests
```

#### running the test cases manually

Clone and navigate to the go test library:

```
git clone https://github.com/googleapis/cloud-bigtable-clients-test.git
cd cloud-bigtable-clients-test/tests
```


Launch the tests

```
go test -v -proxy_addr=:50055
```

## Test a released client

You can run the test proxy against a released version of the library with `nox`
by setting the `PROXY_CLIENT_VERSION` environment variable:

```
PROXY_CLIENT_VERSION=3.0.0
nox -s test_proxy
```

if unset, it will default to installing the library from source

## Test the legacy client

By default, tests are run against the new data client. You can run the test proxy against the
previous client by running it with the `--legacy-client` flag:

```
nox -s test_proxy -- --legacy-client
```

or

```
python test_proxy.py --legacy-client
```
213 changes: 213 additions & 0 deletions test_proxy/handlers/client_handler_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module contains the client handler process for proxy_server.py.
"""
import os

from google.cloud.environment_vars import BIGTABLE_EMULATOR
from google.cloud.bigtable.data import BigtableDataClientAsync


def error_safe(func):
    """
    Decorator for TestProxyClientHandler request methods.

    Catches any exception raised by the wrapped coroutine and returns it
    encoded as a dict (via encode_exception) so the error can be passed
    back to the grpc_server_process instead of crashing the handler.
    Also rejects requests made after the client has been closed.
    """
    async def wrapper(self, *args, **kwargs):
        try:
            if self.closed:
                raise RuntimeError("client is closed")
            return await func(self, *args, **kwargs)
        except Exception as e:
            # NotImplementedError is an Exception subclass, so a single
            # clause covers it; encoded exceptions should be re-raised in
            # the grpc_server_process
            return encode_exception(e)

    return wrapper


def encode_exception(exc):
    """
    Encode an exception (plus any chained cause and grouped subexceptions)
    into a plain dict that can be passed back to grpc_handler.

    The resulting dict always carries an "error" message, and a "code"
    entry when a grpc status code can be derived from the exception, its
    cause, or one of its subexceptions.
    """
    from google.api_core.exceptions import GoogleAPICallError
    encoded = {"error": f"{type(exc).__name__}: {exc}"}
    cause = exc.__cause__
    if cause:
        encoded["cause"] = encode_exception(cause)
    if hasattr(exc, "exceptions"):
        encoded["subexceptions"] = [encode_exception(sub) for sub in exc.exceptions]
    if hasattr(exc, "index"):
        encoded["index"] = exc.index
    if isinstance(exc, GoogleAPICallError):
        # prefer the grpc status code; fall back to the generic code, or -1
        if exc.grpc_status_code is not None:
            encoded["code"] = exc.grpc_status_code.value[0]
        elif exc.code is not None:
            encoded["code"] = int(exc.code)
        else:
            encoded["code"] = -1
    elif encoded.get("cause", {}).get("code"):
        # no code on this exception; inherit it from the chained cause
        encoded["code"] = encoded["cause"]["code"]
    elif encoded.get("subexceptions"):
        # otherwise look for a code among the subexceptions
        # (the last subexception with a code wins)
        for sub_encoded in encoded["subexceptions"]:
            if sub_encoded.get("code"):
                encoded["code"] = sub_encoded["code"]
    return encoded


class TestProxyClientHandler:
    """
    Implements the same methods as the grpc server, but handles the client
    library side of the request.

    Requests received in TestProxyGrpcServer are converted to a dictionary,
    and supplied to the TestProxyClientHandler methods as kwargs.
    The client response is then returned back to the TestProxyGrpcServer
    """

    def __init__(
        self,
        data_target=None,
        project_id=None,
        instance_id=None,
        app_profile_id=None,
        per_operation_timeout=None,
        **kwargs,
    ):
        # flag checked by the @error_safe decorator before each request
        self.closed = False
        # use emulator: point the client at the proxy's backend
        # NOTE(review): assumes data_target is a non-None host string;
        # os.environ assignment raises TypeError otherwise — confirm callers
        os.environ[BIGTABLE_EMULATOR] = data_target
        self.client = BigtableDataClientAsync(project=project_id)
        self.instance_id = instance_id
        self.app_profile_id = app_profile_id
        self.per_operation_timeout = per_operation_timeout

    def close(self):
        # TODO: call self.client.close()
        self.closed = True

    def _get_table(self, table_name, request_app_profile_id, kwargs):
        """
        Shared setup for the request handlers below.

        Args:
            table_name: fully-qualified table name; only the final path
                segment (the table id) is used
            request_app_profile_id: app profile id taken from the request;
                used only when the handler was not constructed with one
            kwargs: the handler's kwargs dict; mutated in place to set
                "operation_timeout" (default 20s) before being forwarded
                to the client call
        Returns:
            the Table object the request should operate on
        """
        table_id = table_name.split("/")[-1]
        app_profile_id = self.app_profile_id or request_app_profile_id
        kwargs["operation_timeout"] = kwargs.get("operation_timeout", self.per_operation_timeout) or 20
        return self.client.get_table(self.instance_id, table_id, app_profile_id)

    @staticmethod
    def _parse_mutations(mutation_dicts):
        """
        Parse a list of mutation dicts into Mutation objects, substituting
        a placeholder SetCell for entries that fail to parse.
        """
        from google.cloud.bigtable.data.mutations import Mutation, SetCell
        mutations = []
        for mut_dict in mutation_dicts:
            try:
                mutations.append(Mutation._from_dict(mut_dict))
            except ValueError:
                # invalid mutation type. Conformance test may be sending generic empty request
                mutations.append(SetCell("", "", "", -1))
        return mutations

    @error_safe
    async def ReadRows(self, request, **kwargs):
        """Read a set of rows; returns them as a list of dicts."""
        table = self._get_table(
            request["table_name"], request.get("app_profile_id", None), kwargs
        )
        result_list = await table.read_rows(request, **kwargs)
        # pack results back into protobuf-parsable format
        serialized_response = [row.to_dict() for row in result_list]
        return serialized_response

    @error_safe
    async def ReadRow(self, row_key, **kwargs):
        """Read a single row; returns the string "None" when absent."""
        # table_name arrives in kwargs for this method, not in a request dict
        # NOTE(review): app_profile_id is read but not removed from kwargs,
        # so it is also forwarded to table.read_row — confirm intended
        table = self._get_table(
            kwargs.pop("table_name"), kwargs.get("app_profile_id", None), kwargs
        )
        result_row = await table.read_row(row_key, **kwargs)
        # pack results back into protobuf-parsable format
        if result_row:
            return result_row.to_dict()
        else:
            return "None"

    @error_safe
    async def MutateRow(self, request, **kwargs):
        """Apply a list of mutations to a single row."""
        from google.cloud.bigtable.data.mutations import Mutation
        table = self._get_table(
            request["table_name"], request.get("app_profile_id", None), kwargs
        )
        row_key = request["row_key"]
        mutations = [Mutation._from_dict(d) for d in request["mutations"]]
        await table.mutate_row(row_key, mutations, **kwargs)
        return "OK"

    @error_safe
    async def BulkMutateRows(self, request, **kwargs):
        """Apply mutation entries across multiple rows."""
        from google.cloud.bigtable.data.mutations import RowMutationEntry
        table = self._get_table(
            request["table_name"], request.get("app_profile_id", None), kwargs
        )
        entry_list = [RowMutationEntry._from_dict(entry) for entry in request["entries"]]
        await table.bulk_mutate_rows(entry_list, **kwargs)
        return "OK"

    @error_safe
    async def CheckAndMutateRow(self, request, **kwargs):
        """Conditionally mutate a row based on a predicate filter."""
        table = self._get_table(
            request["table_name"], request.get("app_profile_id", None), kwargs
        )
        row_key = request["row_key"]
        # add default values for incomplete dicts, so they can still be parsed to objects
        true_mutations = self._parse_mutations(request.get("true_mutations", []))
        false_mutations = self._parse_mutations(request.get("false_mutations", []))
        predicate_filter = request.get("predicate_filter", None)
        result = await table.check_and_mutate_row(
            row_key,
            predicate_filter,
            true_case_mutations=true_mutations,
            false_case_mutations=false_mutations,
            **kwargs,
        )
        return result

    @error_safe
    async def ReadModifyWriteRow(self, request, **kwargs):
        """Atomically transform a row using append/increment rules."""
        from google.cloud.bigtable.data.read_modify_write_rules import IncrementRule
        from google.cloud.bigtable.data.read_modify_write_rules import AppendValueRule
        table = self._get_table(
            request["table_name"], request.get("app_profile_id", None), kwargs
        )
        row_key = request["row_key"]
        rules = []
        for rule_dict in request.get("rules", []):
            qualifier = rule_dict["column_qualifier"]
            # rules carrying an append_value are appends; all others increments
            if "append_value" in rule_dict:
                new_rule = AppendValueRule(
                    rule_dict["family_name"], qualifier, rule_dict["append_value"]
                )
            else:
                new_rule = IncrementRule(
                    rule_dict["family_name"], qualifier, rule_dict["increment_amount"]
                )
            rules.append(new_rule)
        result = await table.read_modify_write_row(row_key, rules, **kwargs)
        # pack results back into protobuf-parsable format
        if result:
            return result.to_dict()
        else:
            return "None"

    @error_safe
    async def SampleRowKeys(self, request, **kwargs):
        """Return sampled row keys for the requested table."""
        table = self._get_table(
            request["table_name"], request.get("app_profile_id", None), kwargs
        )
        result = await table.sample_row_keys(**kwargs)
        return result
Loading