Skip to content

Commit

Permalink
fixed bytes handling
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche committed Jun 19, 2023
1 parent d72caed commit ec85af2
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 37 deletions.
37 changes: 1 addition & 36 deletions test_proxy/client_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,54 +123,35 @@ async def ReadRow(self, row_key, **kwargs):

@error_safe
async def MutateRow(self, request, **kwargs):
import base64
from google.cloud.bigtable.mutations import Mutation
table_id = request["table_name"].split("/")[-1]
app_profile_id = self.app_profile_id or request.get("app_profile_id", None)
table = self.client.get_table(self.instance_id, table_id, app_profile_id)
kwargs["operation_timeout"] = kwargs.get("operation_timeout", self.per_operation_timeout) or 20
row_key = request["row_key"]
try:
# conformance tests send row keys as base64 encoded strings
row_key = base64.b64decode(row_key)
except Exception:
pass
mutations = [Mutation._from_dict(d) for d in request["mutations"]]
await table.mutate_row(row_key, mutations, **kwargs)
return "OK"

@error_safe
async def BulkMutateRows(self, request, **kwargs):
from google.cloud.bigtable.mutations import RowMutationEntry
import base64
table_id = request["table_name"].split("/")[-1]
app_profile_id = self.app_profile_id or request.get("app_profile_id", None)
table = self.client.get_table(self.instance_id, table_id, app_profile_id)
kwargs["operation_timeout"] = kwargs.get("operation_timeout", self.per_operation_timeout) or 20
# conformance tests send row keys as base64 encoded strings
for entry in request["entries"]:
try:
entry["row_key"] = base64.b64decode(entry["row_key"])
except Exception:
pass
entry_list = [RowMutationEntry._from_dict(entry) for entry in request["entries"]]
await table.bulk_mutate_rows(entry_list, **kwargs)
return "OK"

@error_safe
async def CheckAndMutateRow(self, request, **kwargs):
from google.cloud.bigtable.mutations import Mutation, SetCell
import base64
table_id = request["table_name"].split("/")[-1]
app_profile_id = self.app_profile_id or request.get("app_profile_id", None)
table = self.client.get_table(self.instance_id, table_id, app_profile_id)
kwargs["operation_timeout"] = kwargs.get("operation_timeout", self.per_operation_timeout) or 20
row_key = request["row_key"]
try:
# conformance tests send row keys as base64 encoded strings
row_key = base64.b64decode(row_key)
except Exception:
pass
# add default values for incomplete dicts, so they can still be parsed to objects
true_mutations = []
for mut_dict in request.get("true_mutations", []):
Expand Down Expand Up @@ -200,32 +181,16 @@ async def CheckAndMutateRow(self, request, **kwargs):
async def ReadModifyWriteRow(self, request, **kwargs):
from google.cloud.bigtable.read_modify_write_rules import IncrementRule
from google.cloud.bigtable.read_modify_write_rules import AppendValueRule
import base64
table_id = request["table_name"].split("/")[-1]
app_profile_id = self.app_profile_id or request.get("app_profile_id", None)
table = self.client.get_table(self.instance_id, table_id, app_profile_id)
kwargs["operation_timeout"] = kwargs.get("operation_timeout", self.per_operation_timeout) or 20
row_key = request["row_key"]
try:
# conformance tests send row keys as base64 encoded strings
row_key = base64.b64decode(row_key)
except Exception:
pass
rules = []
# conformance tests send qualifiers and values as base64 encoded strings
for rule_dict in request.get("rules", []):
qualifier = rule_dict["column_qualifier"]
try:
qualifier = base64.b64decode(qualifier)
except Exception:
pass
if "append_value" in rule_dict:
value = rule_dict["append_value"]
try:
value = base64.b64decode(value)
except Exception:
pass
new_rule = AppendValueRule(rule_dict["family_name"], qualifier, value)
new_rule = AppendValueRule(rule_dict["family_name"], qualifier, rule_dict["append_value"])
else:
new_rule = IncrementRule(rule_dict["family_name"], qualifier, rule_dict["increment_amount"])
rules.append(new_rule)
Expand Down
12 changes: 11 additions & 1 deletion test_proxy/test_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ async def client_handler_process_async(request_q, queue_pool, use_legacy_client=
Defines a process that recives Bigtable requests from a grpc_server_process,
and runs the request using a client library instance
"""
import base64
import re
import asyncio
import warnings
Expand All @@ -75,6 +76,15 @@ def format_dict(input_obj):
# check for time encodings
if re.match("^[0-9]+s$", input_obj):
return int(input_obj[:-1])
# check for encoded bytes
if re.match("^[A-Za-z0-9+/=]+$", input_obj):
try:
decoded_str = base64.b64decode(input_obj)
# if the string contains non-ascii bytes, raise exception
decoded_str.decode("ascii")
return decoded_str
except Exception:
pass
# check for int strings
try:
return int(input_obj)
Expand Down Expand Up @@ -174,4 +184,4 @@ def client_handler_process(request_q, queue_pool, legacy_client=False):
# )
# client.start()
# grpc_server_process(request_q, response_queue_pool, port)
# client.join()
client.join()

0 comments on commit ec85af2

Please sign in to comment.