Skip to content

Commit

Permalink
Merge pull request #449 from tseaver/305-lookup-run_query-eventual
Browse files Browse the repository at this point in the history
Add 'eventual' flag to 'Connection.lookup()' / 'Connection.run_query()'.
  • Loading branch information
tseaver committed Dec 19, 2014
2 parents 6b71b1a + a2a11bd commit 5723288
Show file tree
Hide file tree
Showing 2 changed files with 240 additions and 22 deletions.
91 changes: 69 additions & 22 deletions gcloud/datastore/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ def dataset(self, *args, **kwargs):
kwargs['connection'] = self
return Dataset(*args, **kwargs)

def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
def lookup(self, dataset_id, key_pbs,
missing=None, deferred=None, eventual=False):
"""Lookup keys from a dataset in the Cloud Datastore.
Maps the ``DatastoreService.Lookup`` protobuf RPC.
Expand Down Expand Up @@ -211,13 +212,20 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
by the backend as "deferred" will be copied into it.
Use only as a keyword param.
:type eventual: bool
:param eventual: If False (the default), request ``STRONG`` read
consistency. If True, request ``EVENTUAL`` read
consistency. If the connection has a current
transaction, this value *must* be false.
:rtype: list of :class:`gcloud.datastore.datastore_v1_pb2.Entity`
(or a single Entity)
:returns: The entities corresponding to the keys provided.
If a single key was provided and no results matched,
this will return None.
If multiple keys were provided and no results matched,
this will return an empty list.
:raises: ValueError if ``eventual`` is True
"""
if missing is not None and missing != []:
raise ValueError('missing must be None or an empty list')
Expand All @@ -226,6 +234,7 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
raise ValueError('deferred must be None or an empty list')

lookup_request = datastore_pb.LookupRequest()
self._set_read_options(lookup_request, eventual)

single_key = isinstance(key_pbs, datastore_pb.Key)

Expand All @@ -235,28 +244,14 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
for key_pb in key_pbs:
lookup_request.key.add().CopyFrom(key_pb)

results = []
while True: # loop against possible deferred.
lookup_response = self._rpc(dataset_id, 'lookup', lookup_request,
datastore_pb.LookupResponse)
results, missing_found, deferred_found = self._lookup(
lookup_request, dataset_id, deferred is not None)

results.extend(
[result.entity for result in lookup_response.found])
if missing is not None:
missing.extend(missing_found)

if missing is not None:
missing.extend(
[result.entity for result in lookup_response.missing])

if deferred is not None:
deferred.extend([key for key in lookup_response.deferred])
break

if not lookup_response.deferred:
break

# We have deferred keys, and the user didn't ask to know about
# them, so retry (but only with the deferred ones).
_copy_deferred_keys(lookup_request, lookup_response)
if deferred is not None:
deferred.extend(deferred_found)

if single_key:
if results:
Expand All @@ -266,7 +261,7 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):

return results

def run_query(self, dataset_id, query_pb, namespace=None):
def run_query(self, dataset_id, query_pb, namespace=None, eventual=False):
"""Run a query on the Cloud Datastore.
Maps the ``DatastoreService.RunQuery`` protobuf RPC.
Expand Down Expand Up @@ -310,8 +305,15 @@ def run_query(self, dataset_id, query_pb, namespace=None):
:type namespace: string
:param namespace: The namespace over which to run the query.
:type eventual: bool
:param eventual: If False (the default), request ``STRONG`` read
consistency. If True, request ``EVENTUAL`` read
consistency. If the connection has a current
transaction, this value *must* be false.
"""
request = datastore_pb.RunQueryRequest()
self._set_read_options(request, eventual)

if namespace:
request.partition_id.namespace = namespace
Expand Down Expand Up @@ -514,6 +516,51 @@ def delete_entities(self, dataset_id, key_pbs):

return True

def _lookup(self, lookup_request, dataset_id, stop_on_deferred):
"""Repeat lookup until all keys found (unless stop requested).
Helper method for ``lookup()``.
"""
results = []
missing = []
deferred = []
while True: # loop against possible deferred.
lookup_response = self._rpc(dataset_id, 'lookup', lookup_request,
datastore_pb.LookupResponse)

results.extend(
[result.entity for result in lookup_response.found])

missing.extend(
[result.entity for result in lookup_response.missing])

if stop_on_deferred:
deferred.extend([key for key in lookup_response.deferred])
break

if not lookup_response.deferred:
break

# We have deferred keys, and the user didn't ask to know about
# them, so retry (but only with the deferred ones).
_copy_deferred_keys(lookup_request, lookup_response)
return results, missing, deferred

def _set_read_options(self, request, eventual):
"""Validate rules for read options, and assign to the request.
Helper method for ``lookup()`` and ``run_query``.
"""
transaction = self.transaction()
if eventual and transaction:
raise ValueError('eventual must be False when in a transaction')

opts = request.read_options
if eventual:
opts.read_consistency = datastore_pb.ReadOptions.EVENTUAL
elif transaction:
opts.transaction = transaction


def _copy_deferred_keys(lookup_request, lookup_response):
"""Clear requested keys and copy deferred keys back in.
Expand Down
171 changes: 171 additions & 0 deletions gcloud/datastore/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,77 @@ def test_lookup_single_key_empty_response(self):
self.assertEqual(len(keys), 1)
self.assertEqual(keys[0], key_pb)

def test_lookup_single_key_empty_response_w_eventual(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.key import Key

DATASET_ID = 'DATASET'
key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf()
rsp_pb = datastore_pb.LookupResponse()
conn = self._makeOne()
URI = '/'.join([
conn.API_BASE_URL,
'datastore',
conn.API_VERSION,
'datasets',
DATASET_ID,
'lookup',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
self.assertEqual(conn.lookup(DATASET_ID, key_pb, eventual=True), None)
cw = http._called_with
self._verifyProtobufCall(cw, URI, conn)
rq_class = datastore_pb.LookupRequest
request = rq_class()
request.ParseFromString(cw['body'])
keys = list(request.key)
self.assertEqual(len(keys), 1)
self.assertEqual(keys[0], key_pb)
self.assertEqual(request.read_options.read_consistency,
datastore_pb.ReadOptions.EVENTUAL)
self.assertEqual(request.read_options.transaction, '')

def test_lookup_single_key_empty_response_w_eventual_and_transaction(self):
from gcloud.datastore.key import Key

DATASET_ID = 'DATASET'
TRANSACTION = 'TRANSACTION'
key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf()
conn = self._makeOne()
conn.transaction(TRANSACTION)
self.assertRaises(
ValueError, conn.lookup, DATASET_ID, key_pb, eventual=True)

def test_lookup_single_key_empty_response_w_transaction(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.key import Key

DATASET_ID = 'DATASET'
TRANSACTION = 'TRANSACTION'
key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf()
rsp_pb = datastore_pb.LookupResponse()
conn = self._makeOne()
conn.transaction(TRANSACTION)
URI = '/'.join([
conn.API_BASE_URL,
'datastore',
conn.API_VERSION,
'datasets',
DATASET_ID,
'lookup',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
self.assertEqual(conn.lookup(DATASET_ID, key_pb), None)
cw = http._called_with
self._verifyProtobufCall(cw, URI, conn)
rq_class = datastore_pb.LookupRequest
request = rq_class()
request.ParseFromString(cw['body'])
keys = list(request.key)
self.assertEqual(len(keys), 1)
self.assertEqual(keys[0], key_pb)
self.assertEqual(request.read_options.transaction, TRANSACTION)

def test_lookup_single_key_nonempty_response(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.key import Key
Expand Down Expand Up @@ -443,6 +514,106 @@ def test_lookup_multiple_keys_w_deferred_from_backend_but_not_passed(self):
self.assertEqual(len(keys), 1)
self.assertEqual(keys[0], key_pb2)

def test_run_query_w_eventual_no_transaction(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.query import Query

DATASET_ID = 'DATASET'
KIND = 'Nonesuch'
CURSOR = b'\x00'
q_pb = Query(KIND, DATASET_ID).to_protobuf()
rsp_pb = datastore_pb.RunQueryResponse()
rsp_pb.batch.end_cursor = CURSOR
no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS
rsp_pb.batch.more_results = no_more
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
conn = self._makeOne()
URI = '/'.join([
conn.API_BASE_URL,
'datastore',
conn.API_VERSION,
'datasets',
DATASET_ID,
'runQuery',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb,
eventual=True)
self.assertEqual(pbs, [])
self.assertEqual(end, CURSOR)
self.assertTrue(more)
self.assertEqual(skipped, 0)
cw = http._called_with
self._verifyProtobufCall(cw, URI, conn)
rq_class = datastore_pb.RunQueryRequest
request = rq_class()
request.ParseFromString(cw['body'])
self.assertEqual(request.partition_id.namespace, '')
self.assertEqual(request.query, q_pb)
self.assertEqual(request.read_options.read_consistency,
datastore_pb.ReadOptions.EVENTUAL)
self.assertEqual(request.read_options.transaction, '')

def test_run_query_wo_eventual_w_transaction(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.query import Query

DATASET_ID = 'DATASET'
KIND = 'Nonesuch'
CURSOR = b'\x00'
TRANSACTION = 'TRANSACTION'
q_pb = Query(KIND, DATASET_ID).to_protobuf()
rsp_pb = datastore_pb.RunQueryResponse()
rsp_pb.batch.end_cursor = CURSOR
no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS
rsp_pb.batch.more_results = no_more
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
conn = self._makeOne()
conn.transaction(TRANSACTION)
URI = '/'.join([
conn.API_BASE_URL,
'datastore',
conn.API_VERSION,
'datasets',
DATASET_ID,
'runQuery',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb)
self.assertEqual(pbs, [])
self.assertEqual(end, CURSOR)
self.assertTrue(more)
self.assertEqual(skipped, 0)
cw = http._called_with
self._verifyProtobufCall(cw, URI, conn)
rq_class = datastore_pb.RunQueryRequest
request = rq_class()
request.ParseFromString(cw['body'])
self.assertEqual(request.partition_id.namespace, '')
self.assertEqual(request.query, q_pb)
self.assertEqual(request.read_options.read_consistency,
datastore_pb.ReadOptions.DEFAULT)
self.assertEqual(request.read_options.transaction, TRANSACTION)

def test_run_query_w_eventual_and_transaction(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.query import Query

DATASET_ID = 'DATASET'
KIND = 'Nonesuch'
CURSOR = b'\x00'
TRANSACTION = 'TRANSACTION'
q_pb = Query(KIND, DATASET_ID).to_protobuf()
rsp_pb = datastore_pb.RunQueryResponse()
rsp_pb.batch.end_cursor = CURSOR
no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS
rsp_pb.batch.more_results = no_more
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
conn = self._makeOne()
conn.transaction(TRANSACTION)
self.assertRaises(
ValueError, conn.run_query, DATASET_ID, q_pb, eventual=True)

def test_run_query_wo_namespace_empty_result(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.query import Query
Expand Down

0 comments on commit 5723288

Please sign in to comment.