Skip to content

Commit

Permalink
Push batch / transaction onto a thread-local stack inside 'with'.
Browse files Browse the repository at this point in the history
Prep. for #514.
  • Loading branch information
tseaver committed Jan 8, 2015
1 parent 77cfe10 commit e5e4518
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 28 deletions.
69 changes: 67 additions & 2 deletions gcloud/datastore/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,55 @@
# limitations under the License.

"""Create / interact with a batch of updates / deletes."""
try:
from threading import local as Local
except ImportError: # pragma: NO COVER (who doesn't have it?)
class Local(object):
"""Placeholder for non-threaded applications."""

from gcloud.datastore import _implicit_environ
from gcloud.datastore import datastore_v1_pb2 as datastore_pb


class _Batches(Local):
"""Manage a thread-local LIFO stack of active batches / transactions.
Intended for use only in :class:`gcloud.datastore.batch.Batch.__enter__`
"""
def __init__(self):
super(_Batches, self).__init__()
self._stack = []

@property
def stack(self):
"""Return a copy of our stack."""
return self._stack[:]

def _push_batch(self, batch):
"""Push a batch onto our stack.
Intended for use only in :meth:`gcloud.datastore.batch.Batch.__enter__`
:type batch: :class:`gcloud.datastore.batch.Batch` or
:class:`gcloud.datastore.transaction.Transaction`
"""
self._stack.append(batch)

def _pop_batch(self):
"""Pop a batch onto our stack.
Intended for use only in :meth:`gcloud.datastore.batch.Batch.__enter__`
:rtype: :class:`gcloud.datastore.batch.Batch` or
:class:`gcloud.datastore.transaction.Transaction`
:raises: IndexError if the stack is empty.
"""
return self._stack.pop()


_BATCHES = _Batches()


class Batch(object):
"""An abstraction representing a collected group of updates / deletes.
Expand Down Expand Up @@ -180,6 +224,13 @@ def delete(self, key):
self.connection.delete_entities(
self.dataset_id, [key_pb], mutation=self.mutation)

def begin(self):
"""No-op
Overridden by :class:`gcloud.datastore.transaction.Transaction`.
"""
pass

def commit(self):
"""Commits the batch.
Expand All @@ -197,9 +248,23 @@ def commit(self):
new_id = new_key_pb.path_element[-1].id
entity.key = entity.key.completed_key(new_id)

def rollback(self):
"""No-op
Overridden by :class:`gcloud.datastore.transaction.Transaction`.
"""
pass

def __enter__(self):
_BATCHES._push_batch(self)
self.begin()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
self.commit()
try:
if exc_type is None:
self.commit()
else:
self.rollback()
finally:
_BATCHES._pop_batch()
76 changes: 62 additions & 14 deletions gcloud/datastore/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def test_put_entity_w_partial_key(self):

self.assertEqual(
connection._saved,
(_DATASET, key._key, _PROPERTIES, (), batch.mutation))
[(_DATASET, key._key, _PROPERTIES, (), batch.mutation)])
self.assertEqual(batch._auto_id_entities, [entity])

def test_put_entity_w_completed_key(self):
Expand All @@ -121,7 +121,7 @@ def test_put_entity_w_completed_key(self):

self.assertEqual(
connection._saved,
(_DATASET, key._key, _PROPERTIES, (), batch.mutation))
[(_DATASET, key._key, _PROPERTIES, (), batch.mutation)])

def test_delete_w_partial_key(self):
_DATASET = 'DATASET'
Expand All @@ -142,7 +142,7 @@ def test_delete_w_completed_key(self):

self.assertEqual(
connection._deleted,
(_DATASET, [key._key], batch.mutation))
[(_DATASET, [key._key], batch.mutation)])

def test_commit(self):
_DATASET = 'DATASET'
Expand All @@ -151,7 +151,7 @@ def test_commit(self):

batch.commit()

self.assertEqual(connection._committed, (_DATASET, batch.mutation))
self.assertEqual(connection._committed, [(_DATASET, batch.mutation)])

def test_commit_w_auto_id_entities(self):
_DATASET = 'DATASET'
Expand All @@ -165,45 +165,91 @@ def test_commit_w_auto_id_entities(self):

batch.commit()

self.assertEqual(connection._committed, (_DATASET, batch.mutation))
self.assertEqual(connection._committed, [(_DATASET, batch.mutation)])
self.assertFalse(key._partial)
self.assertEqual(key._id, _NEW_ID)

def test_as_context_mgr_wo_error(self):
from gcloud.datastore.batch import _BATCHES
_DATASET = 'DATASET'
_PROPERTIES = {'foo': 'bar'}
connection = _Connection()
entity = _Entity(_PROPERTIES)
key = entity.key = _Key(_DATASET)

self.assertEqual(_BATCHES.stack, [])

with self._makeOne(dataset_id=_DATASET,
connection=connection) as batch:
self.assertEqual(_BATCHES.stack, [batch])
batch.put(entity)

self.assertEqual(_BATCHES.stack, [])

self.assertEqual(
connection._saved,
(_DATASET, key._key, _PROPERTIES, (), batch.mutation))
self.assertEqual(connection._committed, (_DATASET, batch.mutation))
[(_DATASET, key._key, _PROPERTIES, (), batch.mutation)])
self.assertEqual(connection._committed, [(_DATASET, batch.mutation)])

def test_as_context_mgr_nested(self):
from gcloud.datastore.batch import _BATCHES
_DATASET = 'DATASET'
_PROPERTIES = {'foo': 'bar'}
connection = _Connection()
entity1 = _Entity(_PROPERTIES)
key = entity1.key = _Key(_DATASET)
entity2 = _Entity(_PROPERTIES)
key = entity2.key = _Key(_DATASET)

self.assertEqual(_BATCHES.stack, [])

with self._makeOne(dataset_id=_DATASET,
connection=connection) as batch1:
self.assertEqual(_BATCHES.stack, [batch1])
batch1.put(entity1)
with self._makeOne(dataset_id=_DATASET,
connection=connection) as batch2:
self.assertEqual(_BATCHES.stack, [batch1, batch2])
batch2.put(entity2)

self.assertEqual(_BATCHES.stack, [batch1])

self.assertEqual(_BATCHES.stack, [])

self.assertEqual(
connection._saved,
[(_DATASET, key._key, _PROPERTIES, (), batch1.mutation),
(_DATASET, key._key, _PROPERTIES, (), batch2.mutation)]
)
self.assertEqual(connection._committed,
[(_DATASET, batch2.mutation),
(_DATASET, batch1.mutation)])

def test_as_context_mgr_w_error(self):
from gcloud.datastore.batch import _BATCHES
_DATASET = 'DATASET'
_PROPERTIES = {'foo': 'bar'}
connection = _Connection()
entity = _Entity(_PROPERTIES)
key = entity.key = _Key(_DATASET)

self.assertEqual(_BATCHES.stack, [])

try:
with self._makeOne(dataset_id=_DATASET,
connection=connection) as batch:
self.assertEqual(_BATCHES.stack, [batch])
batch.put(entity)
raise ValueError("testing")
except ValueError:
pass

self.assertEqual(_BATCHES.stack, [])

self.assertEqual(
connection._saved,
(_DATASET, key._key, _PROPERTIES, (), batch.mutation))
self.assertEqual(connection._committed, None)
[(_DATASET, key._key, _PROPERTIES, (), batch.mutation)])
self.assertEqual(connection._committed, [])


class _CommitResult(object):
Expand All @@ -226,23 +272,25 @@ def __init__(self, id):

class _Connection(object):
_marker = object()
_committed = _saved = _deleted = None
_save_result = (False, None)

def __init__(self, *new_keys):
self._commit_result = _CommitResult(*new_keys)
self._committed = []
self._saved = []
self._deleted = []

def save_entity(self, dataset_id, key_pb, properties,
exclude_from_indexes=(), mutation=None):
self._saved = (dataset_id, key_pb, properties,
tuple(exclude_from_indexes), mutation)
self._saved.append((dataset_id, key_pb, properties,
tuple(exclude_from_indexes), mutation))
return self._save_result

def delete_entities(self, dataset_id, key_pbs, mutation=None):
self._deleted = (dataset_id, key_pbs, mutation)
self._deleted.append((dataset_id, key_pbs, mutation))

def commit(self, dataset_id, mutation):
self._committed = (dataset_id, mutation)
self._committed.append((dataset_id, mutation))
return self._commit_result


Expand Down
12 changes: 0 additions & 12 deletions gcloud/datastore/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,3 @@ def commit(self):

# Clear our own ID in case this gets accidentally reused.
self._id = None

def __enter__(self):
# Don't call super() -- we have different semantics.
self.begin()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
# Don't call super() -- we have different semantics.
if exc_type is None:
self.commit()
else:
self.rollback()

0 comments on commit e5e4518

Please sign in to comment.