Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: read_modify_write and check_and_mutate_row #780

Merged
merged 62 commits into from
Jun 16, 2023
Merged
Changes from 1 commit
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
1d02154
added initial implementation of mutate_rows
daniel-sanche Apr 24, 2023
ab63cba
implemented mutation models
daniel-sanche Apr 24, 2023
cf9daa5
added retries to mutate_row
daniel-sanche Apr 24, 2023
1247da4
return exception group if possible
daniel-sanche Apr 24, 2023
3b3ed8c
check for idempotence
daniel-sanche Apr 24, 2023
5d20037
initial implementation for bulk_mutations
daniel-sanche Apr 24, 2023
3d322a1
include successes in bulk mutation error message
daniel-sanche Apr 24, 2023
a31232b
fixed style checks
daniel-sanche Apr 24, 2023
8da2d65
added basic system tests
daniel-sanche Apr 24, 2023
2b89d9c
added unit tests for mutate_row
daniel-sanche Apr 25, 2023
47c5985
ran blacken
daniel-sanche Apr 25, 2023
38fdcd7
improved exceptions
daniel-sanche Apr 25, 2023
504d2d8
added bulk_mutate_rows unit tests
daniel-sanche Apr 25, 2023
b16067f
ran blacken
daniel-sanche Apr 25, 2023
3ab1405
support __new___ for exceptions for python3.11+
daniel-sanche Apr 25, 2023
0a6c0c6
added exception unit tests
daniel-sanche Apr 25, 2023
ec043cf
makde exceptions tuple
daniel-sanche Apr 26, 2023
518530e
got exceptions to print consistently across versions
daniel-sanche Apr 26, 2023
9624729
added test for 311 rich traceback
daniel-sanche Apr 27, 2023
3087081
moved retryable row mutations to new file
daniel-sanche Apr 27, 2023
9df588f
use index map
daniel-sanche Apr 27, 2023
7ed8be3
added docstring
daniel-sanche Apr 27, 2023
2536cc4
added predicate check to failed mutations
daniel-sanche Apr 27, 2023
1f6875c
added _mutate_rows tests
daniel-sanche Apr 27, 2023
1ea24e6
improved client tests
daniel-sanche Apr 27, 2023
25ca2d2
refactored to loop by raising exception
daniel-sanche Apr 28, 2023
c0787db
refactored retry deadline logic into shared wrapper
daniel-sanche Apr 28, 2023
3ed5c3d
ran black
daniel-sanche Apr 28, 2023
a91fbcb
pulled in table default timeouts
daniel-sanche Apr 28, 2023
df8a058
added tests for shared deadline parsing function
daniel-sanche Apr 28, 2023
b866b57
added tests for mutation models
daniel-sanche Apr 28, 2023
54a4d43
fixed linter errors
daniel-sanche Apr 28, 2023
bd51dc4
added tests for BulkMutationsEntry
daniel-sanche Apr 28, 2023
921b05a
improved mutations documentation
daniel-sanche Apr 28, 2023
82ea61f
refactored mutate_rows logic into helper function
daniel-sanche May 2, 2023
fa42b86
implemented callbacks for mutate_rows
daniel-sanche May 2, 2023
01a16f3
made exceptions into a tuple
daniel-sanche May 5, 2023
e6df77e
improved and tested read_modify_write_rules models
daniel-sanche May 18, 2023
2d8ee3f
implemented read_modify_write
daniel-sanche May 18, 2023
af77dc3
added unit tests
daniel-sanche May 18, 2023
ebe2f94
added system test
daniel-sanche May 18, 2023
8af5c71
added test for large values
daniel-sanche May 18, 2023
1242836
allow string for append value rule
daniel-sanche May 18, 2023
afe839c
added append value system test
daniel-sanche May 18, 2023
d0781d0
added chained value system test
daniel-sanche May 18, 2023
ef30977
support creating SetValueMutation with int
daniel-sanche May 19, 2023
6140acb
remove aborted from retryable errors
daniel-sanche May 22, 2023
36ba2b6
improved SetCell mutation
daniel-sanche May 22, 2023
b3c9017
fixed mutations tests
daniel-sanche May 22, 2023
cac9e2d
SetCell timestamps use millisecond precision
daniel-sanche May 22, 2023
34b051f
renamed BulkMutationsEntry to RowMutationEntry
daniel-sanche May 22, 2023
baf3378
implemented check_and_mutate
daniel-sanche May 22, 2023
bad11e5
added system tests
daniel-sanche May 22, 2023
1d79202
fixed test issues
daniel-sanche May 23, 2023
63ac35c
Merge branch 'v3' into mutate_rows
daniel-sanche May 24, 2023
4138c89
Merge branch 'mutate_rows' into mutate_rows_other_rpcs
daniel-sanche May 24, 2023
3c27fb7
Merge branch 'v3' into mutate_rows_other_rpcs
daniel-sanche Jun 7, 2023
b9b9dac
adjusted tests; require kwargs for check_and_mutate
daniel-sanche Jun 7, 2023
234ea6c
added metadata
daniel-sanche Jun 7, 2023
fb818d4
clean up
daniel-sanche Jun 8, 2023
c9cebc2
changed timeout values
daniel-sanche Jun 14, 2023
ef8879e
Merge branch 'v3' into mutate_rows_other_rpcs
daniel-sanche Jun 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
added bulk_mutate_rows unit tests
  • Loading branch information
daniel-sanche committed Apr 25, 2023
commit 504d2d8f8fd8d2d40077ebf35cafd3227b87243e
217 changes: 209 additions & 8 deletions tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -965,20 +965,221 @@ def _make_client(self, *args, **kwargs):

return BigtableDataClient(*args, **kwargs)

async def _mock_response(self, response_list):
    """
    Build a fake MutateRows response stream for the mocked gapic client.

    Each item in ``response_list`` produces one ``MutateRowsResponse.Entry``
    whose index is the item's position in the list. Items that are
    ``GoogleAPICallError`` instances become a ``Status`` carrying the error's
    string form and gRPC status code; every other item (e.g. ``None``)
    becomes a ``Status`` with code 0.

    This is a coroutine function: calling it returns a coroutine, and
    awaiting that coroutine returns an async generator that yields a single
    ``MutateRowsResponse`` holding all of the entries.
    """
    from google.cloud.bigtable_v2.types import MutateRowsResponse
    from google.rpc import status_pb2

    def as_status(outcome):
        # Anything that is not an API error is reported as code 0.
        if not isinstance(outcome, core_exceptions.GoogleAPICallError):
            return status_pb2.Status(code=0)
        # NOTE(review): value[0] is presumably the numeric gRPC code of the
        # status enum member - confirm against the grpc.StatusCode definition.
        return status_pb2.Status(
            message=str(outcome), code=outcome.grpc_status_code.value[0]
        )

    entries = [
        MutateRowsResponse.Entry(index=position, status=as_status(outcome))
        for position, outcome in enumerate(response_list)
    ]

    async def generator():
        yield MutateRowsResponse(entries=entries)

    return generator()

@pytest.mark.asyncio
@pytest.mark.parametrize(
    "mutation_arg",
    [
        [mutations.SetCell("family", b"qualifier", b"value")],
        [
            mutations.SetCell(
                "family", b"qualifier", b"value", timestamp_micros=1234567890
            )
        ],
        [mutations.DeleteRangeFromColumn("family", b"qualifier")],
        [mutations.DeleteAllFromFamily("family")],
        [mutations.DeleteAllFromRow()],
        [mutations.SetCell("family", b"qualifier", b"value")],
        [
            mutations.DeleteRangeFromColumn("family", b"qualifier"),
            mutations.DeleteAllFromRow(),
        ],
    ],
)
async def test_bulk_mutate_rows(self, mutation_arg):
    """
    Test a single-entry bulk mutation that completes with no errors.

    For each parametrized mutation list, verifies that exactly one
    mutate_rows request is sent, that it targets the expected table name,
    that its entries match the entry's dict form, and that the
    per-request timeout is forwarded as the ``timeout`` keyword argument.
    """
    expected_per_request_timeout = 19
    async with self._make_client(project="project") as client:
        async with client.get_table("instance", "table") as table:
            with mock.patch.object(
                client._gapic_client, "mutate_rows"
            ) as mock_gapic:
                # a non-error item makes the mocked entry report status code 0
                mock_gapic.return_value = self._mock_response([None])
                bulk_mutation = mutations.BulkMutationsEntry(
                    b"row_key", mutation_arg
                )
                await table.bulk_mutate_rows(
                    [bulk_mutation],
                    per_request_timeout=expected_per_request_timeout,
                )
                assert mock_gapic.call_count == 1
                # the request is passed as the first positional argument
                request = mock_gapic.call_args[0][0]
                assert (
                    request["table_name"]
                    == "projects/project/instances/instance/tables/table"
                )
                assert request["entries"] == [bulk_mutation._to_dict()]
                found_per_request_timeout = mock_gapic.call_args[1]["timeout"]
                assert found_per_request_timeout == expected_per_request_timeout


@pytest.mark.asyncio
async def test_bulk_mutate_rows_multiple_entries(self):
    """
    Test a bulk mutation containing two entries that completes with no errors.

    Verifies that both entries are sent in a single mutate_rows request,
    in the order they were passed, against the expected table name.
    """
    async with self._make_client(project="project") as client:
        async with client.get_table("instance", "table") as table:
            with mock.patch.object(
                client._gapic_client, "mutate_rows"
            ) as mock_gapic:
                # one non-error item per entry
                mock_gapic.return_value = self._mock_response([None, None])
                mutation_list = [mutations.DeleteAllFromRow()]
                entry_1 = mutations.BulkMutationsEntry(b"row_key_1", mutation_list)
                entry_2 = mutations.BulkMutationsEntry(b"row_key_2", mutation_list)
                await table.bulk_mutate_rows(
                    [entry_1, entry_2],
                )
                assert mock_gapic.call_count == 1
                request = mock_gapic.call_args[0][0]
                assert (
                    request["table_name"]
                    == "projects/project/instances/instance/tables/table"
                )
                assert request["entries"][0] == entry_1._to_dict()
                assert request["entries"][1] == entry_2._to_dict()

@pytest.mark.asyncio
@pytest.mark.parametrize(
    "exception",
    [
        core_exceptions.DeadlineExceeded,
        core_exceptions.ServiceUnavailable,
        core_exceptions.Aborted,
        core_exceptions.OutOfRange,
        core_exceptions.NotFound,
        core_exceptions.FailedPrecondition,
    ],
)
async def test_bulk_mutate_rows_idempotent_mutation_error_retries(self, exception):
    """
    Individual idempotent mutations should be retried if they fail with any error

    The mocked response reports the parametrized error for the entry on
    every call. The operation is expected to end with a
    MutationsExceptionGroup whose single FailedMutationEntryError is caused
    by a RetryExceptionGroup: its first exception is the entry error and
    its last is a DeadlineExceeded.
    """
    from google.cloud.bigtable.exceptions import (
        RetryExceptionGroup,
        FailedMutationEntryError,
        MutationsExceptionGroup,
    )

    async with self._make_client(project="project") as client:
        async with client.get_table("instance", "table") as table:
            with mock.patch.object(
                client._gapic_client, "mutate_rows"
            ) as mock_gapic:
                # side_effect as a callable builds a fresh response per call
                mock_gapic.side_effect = lambda *a, **k: self._mock_response(
                    [exception("mock")]
                )
                mutation = mutations.DeleteAllFromRow()
                entry = mutations.BulkMutationsEntry(b"row_key", [mutation])
                assert mutation.is_idempotent() is True
                # keep only the call under test inside the raises block
                with pytest.raises(MutationsExceptionGroup) as e:
                    await table.bulk_mutate_rows([entry], operation_timeout=0.05)
                assert len(e.value.exceptions) == 1
                failed_exception = e.value.exceptions[0]
                assert "non-idempotent" not in str(failed_exception)
                assert isinstance(failed_exception, FailedMutationEntryError)
                cause = failed_exception.__cause__
                assert isinstance(cause, RetryExceptionGroup)
                assert isinstance(cause.exceptions[0], exception)
                # last exception should be due to retry timeout
                assert isinstance(
                    cause.exceptions[-1], core_exceptions.DeadlineExceeded
                )

@pytest.mark.parametrize(
    "retryable_exception",
    [
        core_exceptions.DeadlineExceeded,
        core_exceptions.ServiceUnavailable,
        core_exceptions.Aborted,
    ],
)
@pytest.mark.asyncio
async def test_bulk_mutate_idempotent_retryable_errors(self, retryable_exception):
    """
    Individual idempotent mutations should be retried if the request fails with a retryable error

    Here the mocked mutate_rows call itself raises the parametrized
    exception. The resulting FailedMutationEntryError is expected to be
    caused by a RetryExceptionGroup whose first exception is that error.
    """
    from google.cloud.bigtable.exceptions import (
        RetryExceptionGroup,
        FailedMutationEntryError,
        MutationsExceptionGroup,
    )

    async with self._make_client(project="project") as client:
        async with client.get_table("instance", "table") as table:
            with mock.patch.object(
                client._gapic_client, "mutate_rows"
            ) as mock_gapic:
                # an exception instance as side_effect is raised on every call
                mock_gapic.side_effect = retryable_exception("mock")
                mutation = mutations.SetCell(
                    "family", b"qualifier", b"value", timestamp_micros=123
                )
                entry = mutations.BulkMutationsEntry(b"row_key", [mutation])
                assert mutation.is_idempotent() is True
                # keep only the call under test inside the raises block
                with pytest.raises(MutationsExceptionGroup) as e:
                    await table.bulk_mutate_rows([entry], operation_timeout=0.05)
                assert len(e.value.exceptions) == 1
                failed_exception = e.value.exceptions[0]
                assert isinstance(failed_exception, FailedMutationEntryError)
                assert "non-idempotent" not in str(failed_exception)
                cause = failed_exception.__cause__
                assert isinstance(cause, RetryExceptionGroup)
                assert isinstance(cause.exceptions[0], retryable_exception)

@pytest.mark.asyncio
@pytest.mark.parametrize(
    "retryable_exception",
    [
        core_exceptions.DeadlineExceeded,
        core_exceptions.ServiceUnavailable,
        core_exceptions.Aborted,
    ],
)
async def test_bulk_mutate_rows_idempotent_retryable_errors(self, retryable_exception):
    """
    Non-idempotent mutations should never be retried

    NOTE: despite the test's name, the mutation used here is asserted to
    be non-idempotent. The entry error is expected to surface directly as
    the cause of the FailedMutationEntryError (not wrapped in a retry
    group), and the error text is expected to mention "non-idempotent".
    """
    from google.cloud.bigtable.exceptions import (
        FailedMutationEntryError,
        MutationsExceptionGroup,
    )

    async with self._make_client(project="project") as client:
        async with client.get_table("instance", "table") as table:
            with mock.patch.object(
                client._gapic_client, "mutate_rows"
            ) as mock_gapic:
                # side_effect as a callable builds a fresh response per call
                mock_gapic.side_effect = lambda *a, **k: self._mock_response(
                    [retryable_exception("mock")]
                )
                mutation = mutations.SetCell("family", b"qualifier", b"value")
                entry = mutations.BulkMutationsEntry(b"row_key", [mutation])
                assert mutation.is_idempotent() is False
                # keep only the call under test inside the raises block
                with pytest.raises(MutationsExceptionGroup) as e:
                    await table.bulk_mutate_rows([entry], operation_timeout=0.2)
                assert len(e.value.exceptions) == 1
                failed_exception = e.value.exceptions[0]
                assert isinstance(failed_exception, FailedMutationEntryError)
                assert "non-idempotent" in str(failed_exception)
                cause = failed_exception.__cause__
                assert isinstance(cause, retryable_exception)

@pytest.mark.parametrize(
    "non_retryable_exception",
    [
        core_exceptions.OutOfRange,
        core_exceptions.NotFound,
        core_exceptions.FailedPrecondition,
        RuntimeError,
        ValueError,
    ],
)
@pytest.mark.asyncio
async def test_bulk_mutate_rows_non_retryable_errors(self, non_retryable_exception):
    """
    If the request fails with a non-retryable error, mutations should not be retried

    The mocked mutate_rows call itself raises the parametrized exception.
    Even though the mutation is idempotent, the exception is expected to
    be the direct cause of the FailedMutationEntryError.
    """
    from google.cloud.bigtable.exceptions import (
        FailedMutationEntryError,
        MutationsExceptionGroup,
    )

    async with self._make_client(project="project") as client:
        async with client.get_table("instance", "table") as table:
            with mock.patch.object(
                client._gapic_client, "mutate_rows"
            ) as mock_gapic:
                # an exception instance as side_effect is raised on every call
                mock_gapic.side_effect = non_retryable_exception("mock")
                mutation = mutations.SetCell(
                    "family", b"qualifier", b"value", timestamp_micros=123
                )
                entry = mutations.BulkMutationsEntry(b"row_key", [mutation])
                assert mutation.is_idempotent() is True
                # keep only the call under test inside the raises block
                with pytest.raises(MutationsExceptionGroup) as e:
                    await table.bulk_mutate_rows([entry], operation_timeout=0.2)
                assert len(e.value.exceptions) == 1
                failed_exception = e.value.exceptions[0]
                assert isinstance(failed_exception, FailedMutationEntryError)
                assert "non-idempotent" not in str(failed_exception)
                cause = failed_exception.__cause__
                assert isinstance(cause, non_retryable_exception)