From c69a532b64d1d6f98e6ce70a17d2691cba68ec5c Mon Sep 17 00:00:00 2001 From: Chris Rossi Date: Wed, 31 Oct 2018 15:05:02 -0400 Subject: [PATCH 1/9] First pass at ndb.eventloop.EvenLoop implementation. Does not implement RPC integration. --- ndb/src/google/cloud/ndb/eventloop.py | 223 +++++++++++++++++- ndb/tests/unit/test_eventloop.py | 317 +++++++++++++++++++++++++- 2 files changed, 530 insertions(+), 10 deletions(-) diff --git a/ndb/src/google/cloud/ndb/eventloop.py b/ndb/src/google/cloud/ndb/eventloop.py index ac867bec7fe1..64cf9ac41c9d 100644 --- a/ndb/src/google/cloud/ndb/eventloop.py +++ b/ndb/src/google/cloud/ndb/eventloop.py @@ -16,7 +16,8 @@ This should handle both asynchronous ``ndb`` objects and arbitrary callbacks. """ - +import collections +import time __all__ = [ "add_idle", @@ -30,16 +31,228 @@ ] -def add_idle(*args, **kwargs): - raise NotImplementedError +def _noop(*args, **kw): + """Do nothing.""" + +# TODO: Use utils.logging_debug when implemented +_logging_debug = _noop + + +_Event = collections.namedtuple('_Event', ( + 'when', 'callback', 'args', 'kwargs')) + + +class _Clock(object): + """A clock to determine the current time, in seconds.""" + + def now(self): + """Returns the number of seconds since epoch.""" + return time.time() + + def sleep(self, seconds): + """Sleeps for the desired number of seconds.""" + time.sleep(seconds) class EventLoop: - __slots__ = () + """Constructor. + + Args: + clock: an eventloop._Clock object. Defaults to a time-based clock. - def __init__(self, *args, **kwargs): + Fields: + current: a FIFO list of (callback, args, kwds). These callbacks + run immediately when the eventloop runs. + idlers: a FIFO list of (callback, args, kwds). Thes callbacks + run only when no other RPCs need to be fired first. + For example, AutoBatcher uses idler to fire a batch RPC even before + the batch is full. + queue: a sorted list of (absolute time in sec, callback, args, kwds), + sorted by time. These callbacks run only after the said time. + rpcs: a map from rpc to (callback, args, kwds). Callback is called + when the rpc finishes. + """ + __slots__ = ('clock', 'current', 'idlers', 'inactive', 'queue', 'rpcs') + + def __init__(self, clock=None): + self.clock = clock if clock else _Clock() + self.current = collections.deque() + self.idlers = collections.deque() + self.inactive = 0 + self.queue = [] + self.rpcs = {} + + def clear(self): + """Remove all pending events without running any.""" + while self.current or self.idlers or self.queue or self.rpcs: + current = self.current + idlers = self.idlers + queue = self.queue + rpcs = self.rpcs + _logging_debug('Clearing stale EventLoop instance...') + if current: + _logging_debug(' current = %s', current) + if idlers: + _logging_debug(' idlers = %s', idlers) + if queue: + _logging_debug(' queue = %s', queue) + if rpcs: + _logging_debug(' rpcs = %s', rpcs) + self.__init__(self.clock) + current.clear() + idlers.clear() + queue[:] = [] + rpcs.clear() + _logging_debug('Cleared') + + def insort_event_right(self, event): + """Insert event in queue, and keep it sorted by `event.when` assuming + queue is sorted. + + For events with same `event.when`, new events are inserted to the + right, to keep FIFO order. + + Args: + event: a (time in sec since unix epoch, callback, args, kwargs) + tuple. + """ + queue = self.queue + lo = 0 + hi = len(queue) + while lo < hi: + mid = (lo + hi) // 2 + if event.when < queue[mid].when: + hi = mid + else: + lo = mid + 1 + queue.insert(lo, event) + + def queue_call(self, delay, callback, *args, **kwargs): + """Schedule a function call at a specific time in the future.""" + if delay is None: + self.current.append((callback, args, kwargs)) + return + + # Times over a billion seconds are assumed to be absolute + when = self.clock.now() + delay if delay < 1e9 else delay + event = _Event(when, callback, args, kwargs) + self.insort_event_right(event) + + def queue_rpc(self, rpc, callback=None, *args, **kwds): + """Schedule an RPC with an optional callback. + + The caller must have previously sent the call to the service. + The optional callback is called with the remaining arguments. + + NOTE: If the rpc is a MultiRpc, the callback will be called once + for each sub-RPC. TODO: Is this a good idea? + """ + # TODO Integrate with gRPC raise NotImplementedError + def add_idle(self, callback, *args, **kwargs): + """Add an idle callback. + + An idle callback can return True, False or None. These mean: + + - None: remove the callback (don't reschedule) + - False: the callback did no work; reschedule later + - True: the callback did some work; reschedule soon + + If the callback raises an exception, the traceback is logged and + the callback is removed. + """ + self.idlers.append((callback, args, kwargs)) + + def run_idle(self): + """Run one of the idle callbacks. + + Returns: + True if one was called, False if no idle callback was called. + """ + if not self.idlers or self.inactive >= len(self.idlers): + return False + idler = self.idlers.popleft() + callback, args, kwargs = idler + _logging_debug('idler: %s', callback.__name__) + result = callback(*args, **kwargs) + + # See add_idle() for meaning of callback return value. + if result is None: + _logging_debug('idler %s removed', callback.__name__) + else: + if result: + self.inactive = 0 + else: + self.inactive += 1 + self.idlers.append(idler) + return True + + def _run_current(self): + """Run one current item. + + Returns: + True if one was called, False if no callback was called. + """ + if not self.current: + return False + + self.inactive = 0 + callback, args, kwargs = self.current.popleft() + _logging_debug('nowevent: %s', callback.__name__) + callback(*args, **kwargs) + return True + + def run0(self): + """Run one item (a callback or an RPC wait_any). + + Returns: + A time to sleep if something happened (may be 0); + None if all queues are empty. + """ + if self._run_current() or self.run_idle(): + return 0 + + delay = None + if self.queue: + delay = self.queue[0][0] - self.clock.now() + if delay <= 0: + self.inactive = 0 + _, callback, args, kwargs = self.queue.pop(0) + _logging_debug('event: %s', callback.__name__) + callback(*args, **kwargs) + # TODO: What if it raises an exception? + return 0 + + if self.rpcs: + raise NotImplementedError + + return delay + + def run1(self): + """Run one item (a callback or an RPC wait_any) or sleep. + + Returns: + True if something happened; False if all queues are empty. + """ + delay = self.run0() + if delay is None: + return False + if delay > 0: + self.clock.sleep(delay) + return True + + def run(self): + """Run until there's nothing left to do.""" + self.inactive = 0 + while True: + if not self.run1(): + break + + +def add_idle(*args, **kwargs): + raise NotImplementedError + def get_event_loop(*args, **kwargs): raise NotImplementedError diff --git a/ndb/tests/unit/test_eventloop.py b/ndb/tests/unit/test_eventloop.py index f3c17a21be0c..74343daa3af7 100644 --- a/ndb/tests/unit/test_eventloop.py +++ b/ndb/tests/unit/test_eventloop.py @@ -12,7 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import collections import pytest +import unittest.mock from google.cloud.ndb import eventloop import tests.unit.utils @@ -22,16 +24,304 @@ def test___all__(): tests.unit.utils.verify___all__(eventloop) -def test_add_idle(): - with pytest.raises(NotImplementedError): - eventloop.add_idle() +def _Event(when=0, what='foo', args=(), kw={}): + return eventloop._Event(when, what, args, kw) class TestEventLoop: @staticmethod - def test_constructor(): + def _make_one(**attrs): + """Use DummyClock""" + ev = eventloop.EventLoop(DummyClock()) + for name, value in attrs.items(): + setattr(ev, name, value) + return ev + + @staticmethod + @unittest.mock.patch('google.cloud.ndb.eventloop.time') + def test_constructor(time): + time.time.return_value = 42 + ev = eventloop.EventLoop() + assert ev.clock.now() == 42 + ev.clock.sleep(27) + time.sleep.assert_called_once_with(27) + + assert ev.current == collections.deque() + assert ev.idlers == collections.deque() + assert ev.inactive == 0 + assert ev.queue == [] + assert ev.rpcs == {} + + def test_clear_all(self): + ev = self._make_one() + myclock = ev.clock + ev.current.append('foo') + ev.idlers.append('bar') + ev.queue.append('baz') + ev.rpcs['qux'] = 'quux' + ev.clear() + assert not len(ev.current) + assert not len(ev.idlers) + assert not len(ev.queue) + assert not len(ev.rpcs) + assert ev.clock is myclock + + # idemptotence (branch coverage) + ev.clear() + assert not len(ev.current) + assert not len(ev.idlers) + assert not len(ev.queue) + assert not len(ev.rpcs) + assert ev.clock is myclock + + def test_clear_current(self): + """ For branch coverage. """ + ev = self._make_one() + myclock = ev.clock + ev.current.append('foo') + ev.clear() + assert not len(ev.current) + assert not len(ev.idlers) + assert not len(ev.queue) + assert not len(ev.rpcs) + assert ev.clock is myclock + + def test_clear_idlers(self): + """ For branch coverage. """ + ev = self._make_one() + myclock = ev.clock + ev.idlers.append('foo') + ev.clear() + assert not len(ev.current) + assert not len(ev.idlers) + assert not len(ev.queue) + assert not len(ev.rpcs) + assert ev.clock is myclock + + def test_insert_event_right_empty_queue(self): + ev = self._make_one() + event = _Event() + ev.insort_event_right(event) + assert ev.queue == [event] + + def test_insert_event_right_head(self): + ev = self._make_one( + queue=[_Event(1, 'bar')], + ) + ev.insort_event_right(_Event(0, 'foo')) + assert ev.queue == [ + _Event(0, 'foo'), + _Event(1, 'bar'), + ] + + def test_insert_event_right_tail(self): + ev = self._make_one( + queue=[_Event(0, 'foo')], + ) + ev.insort_event_right(_Event(1, 'bar')) + assert ev.queue == [ + _Event(0, 'foo'), + _Event(1, 'bar'), + ] + + def test_insert_event_right_middle(self): + ev = self._make_one( + queue=[ + _Event(0, 'foo'), + _Event(2, 'baz'), + ], + ) + ev.insort_event_right(_Event(1, 'bar')) + assert ev.queue == [ + _Event(0, 'foo'), + _Event(1, 'bar'), + _Event(2, 'baz'), + ] + + def test_insert_event_right_collision(self): + ev = self._make_one( + queue=[ + _Event(0, 'foo'), + _Event(1, 'bar'), + _Event(2, 'baz'), + ], + ) + ev.insort_event_right(_Event(1, 'barbar')) + assert ev.queue == [ + _Event(0, 'foo'), + _Event(1, 'bar'), + _Event(1, 'barbar'), + _Event(2, 'baz'), + ] + + def test_queue_call_now(self): + ev = self._make_one() + ev.queue_call(None, 'foo', 'bar', baz='qux') + assert list(ev.current) == [ + ('foo', ('bar',), {'baz': 'qux'}), + ] + assert not len(ev.queue) + + def test_queue_call_soon(self): + ev = self._make_one() + ev.clock.time = 5 + ev.queue_call(5, 'foo', 'bar', baz='qux') + assert not len(ev.current) + assert ev.queue == [ + _Event(10, 'foo', ('bar',), {'baz': 'qux'}), + ] + + def test_queue_call_absolute(self): + ev = self._make_one() + ev.clock.time = 5 + ev.queue_call(10e10, 'foo', 'bar', baz='qux') + assert not len(ev.current) + assert ev.queue == [ + _Event(10e10, 'foo', ('bar',), {'baz': 'qux'}), + ] + + def test_queue_rpc(self): + ev = self._make_one() + with pytest.raises(NotImplementedError): + ev.queue_rpc('rpc') + + def test_add_idle(self): + ev = self._make_one() + ev.add_idle('foo', 'bar', baz='qux') + assert list(ev.idlers) == [('foo', ('bar',), {'baz': 'qux'})] + + def test_run_idle_no_idlers(self): + ev = self._make_one() + assert ev.run_idle() is False + + def test_run_idle_all_inactive(self): + ev = self._make_one() + ev.add_idle('foo') + ev.inactive = 1 + assert ev.run_idle() is False + + def test_run_idle_remove_callback(self): + callback = unittest.mock.Mock(__name__='callback') + callback.return_value = None + ev = self._make_one() + ev.add_idle(callback, 'foo', bar='baz') + ev.add_idle('foo') + assert ev.run_idle() is True + callback.assert_called_once_with('foo', bar='baz') + assert len(ev.idlers) == 1 + assert ev.inactive == 0 + + def test_run_idle_did_work(self): + callback = unittest.mock.Mock(__name__='callback') + callback.return_value = True + ev = self._make_one() + ev.add_idle(callback, 'foo', bar='baz') + ev.add_idle('foo') + ev.inactive = 1 + assert ev.run_idle() is True + callback.assert_called_once_with('foo', bar='baz') + assert len(ev.idlers) == 2 + assert ev.inactive == 0 + + def test_run_idle_did_no_work(self): + callback = unittest.mock.Mock(__name__='callback') + callback.return_value = False + ev = self._make_one() + ev.add_idle(callback, 'foo', bar='baz') + ev.add_idle('foo') + ev.inactive = 1 + assert ev.run_idle() is True + callback.assert_called_once_with('foo', bar='baz') + assert len(ev.idlers) == 2 + assert ev.inactive == 2 + + def test_run0_nothing_to_do(self): + ev = self._make_one() + assert ev.run0() is None + + def test_run0_current(self): + callback = unittest.mock.Mock(__name__='callback') + ev = self._make_one() + ev.queue_call(None, callback, 'foo', bar='baz') + ev.inactive = 88 + assert ev.run0() == 0 + callback.assert_called_once_with('foo', bar='baz') + assert len(ev.current) == 0 + assert ev.inactive == 0 + + def test_run0_idler(self): + callback = unittest.mock.Mock(__name__='callback') + ev = self._make_one() + ev.add_idle(callback, 'foo', bar='baz') + assert ev.run0() == 0 + callback.assert_called_once_with('foo', bar='baz') + + def test_run0_next_later(self): + callback = unittest.mock.Mock(__name__='callback') + ev = self._make_one() + ev.queue_call(5, callback, 'foo', bar='baz') + ev.inactive = 88 + assert ev.run0() == 5 + callback.assert_not_called() + assert len(ev.queue) == 1 + assert ev.inactive == 88 + + def test_run0_next_now(self): + callback = unittest.mock.Mock(__name__='callback') + ev = self._make_one() + ev.queue_call(6, 'foo') + ev.queue_call(5, callback, 'foo', bar='baz') + ev.inactive = 88 + ev.clock.time = 10 + assert ev.run0() == 0 + callback.assert_called_once_with('foo', bar='baz') + assert len(ev.queue) == 1 + assert ev.inactive == 0 + + def test_run0_rpc(self): + ev = self._make_one() + ev.rpcs['foo'] = 'bar' with pytest.raises(NotImplementedError): - eventloop.EventLoop() + ev.run0() + + def test_run1_nothing_to_do(self): + ev = self._make_one() + assert ev.run1() is False + + def test_run1_has_work_now(self): + callback = unittest.mock.Mock(__name__='callback') + ev = self._make_one() + ev.queue_call(None, callback) + assert ev.run1() is True + assert ev.clock.slept_for == 0 + callback.assert_called_once_with() + + def test_run1_has_work_later(self): + callback = unittest.mock.Mock(__name__='callback') + ev = self._make_one() + ev.queue_call(5, callback) + assert ev.run1() is True + assert ev.clock.slept_for == 5 + callback.assert_not_called() + + def test_run(self): + idler = unittest.mock.Mock(__name__='idler') + idler.return_value = None + runnow = unittest.mock.Mock(__name__='runnow') + runlater = unittest.mock.Mock(__name__='runlater') + ev = self._make_one() + ev.add_idle(idler) + ev.queue_call(None, runnow) + ev.queue_call(5, runlater) + ev.run() + idler.assert_called_once_with() + runnow.assert_called_once_with() + runlater.assert_called_once_with() + + +def test_add_idle(): + with pytest.raises(NotImplementedError): + eventloop.add_idle() def test_get_event_loop(): @@ -62,3 +352,20 @@ def test_run0(): def test_run1(): with pytest.raises(NotImplementedError): eventloop.run1() + + +class DummyClock: + """Fake out clock class without having to actually read system clock or + sleep during tests. + """ + + def __init__(self, time=0): + self.time = time + self.slept_for = 0 + + def now(self): + return self.time + + def sleep(self, t): + self.time += t + self.slept_for += t From 0bcf385db125ed3b1d0d920c93127506fa96c26f Mon Sep 17 00:00:00 2001 From: Chris Rossi Date: Thu, 1 Nov 2018 09:33:05 -0400 Subject: [PATCH 2/9] Remove TODO, add reference to Issue #6360. --- ndb/src/google/cloud/ndb/eventloop.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/ndb/src/google/cloud/ndb/eventloop.py b/ndb/src/google/cloud/ndb/eventloop.py index 64cf9ac41c9d..eea91cd574ba 100644 --- a/ndb/src/google/cloud/ndb/eventloop.py +++ b/ndb/src/google/cloud/ndb/eventloop.py @@ -31,11 +31,10 @@ ] -def _noop(*args, **kw): - """Do nothing.""" +def _logging_debug(*args, **kw): + """Placeholder. -# TODO: Use utils.logging_debug when implemented -_logging_debug = _noop + See #6360.""" _Event = collections.namedtuple('_Event', ( From 9512caff2315b79c35b0abd4fdd4db6449466c1d Mon Sep 17 00:00:00 2001 From: Chris Rossi Date: Thu, 1 Nov 2018 10:02:38 -0400 Subject: [PATCH 3/9] Use to do dependency injection for time module. --- ndb/src/google/cloud/ndb/eventloop.py | 28 +++------- ndb/tests/unit/test_eventloop.py | 78 +++++++++++---------------- 2 files changed, 37 insertions(+), 69 deletions(-) diff --git a/ndb/src/google/cloud/ndb/eventloop.py b/ndb/src/google/cloud/ndb/eventloop.py index eea91cd574ba..741035fdce17 100644 --- a/ndb/src/google/cloud/ndb/eventloop.py +++ b/ndb/src/google/cloud/ndb/eventloop.py @@ -41,24 +41,9 @@ def _logging_debug(*args, **kw): 'when', 'callback', 'args', 'kwargs')) -class _Clock(object): - """A clock to determine the current time, in seconds.""" - - def now(self): - """Returns the number of seconds since epoch.""" - return time.time() - - def sleep(self, seconds): - """Sleeps for the desired number of seconds.""" - time.sleep(seconds) - - class EventLoop: """Constructor. - Args: - clock: an eventloop._Clock object. Defaults to a time-based clock. - Fields: current: a FIFO list of (callback, args, kwds). These callbacks run immediately when the eventloop runs. @@ -71,10 +56,9 @@ class EventLoop: rpcs: a map from rpc to (callback, args, kwds). Callback is called when the rpc finishes. """ - __slots__ = ('clock', 'current', 'idlers', 'inactive', 'queue', 'rpcs') + __slots__ = ('current', 'idlers', 'inactive', 'queue', 'rpcs') - def __init__(self, clock=None): - self.clock = clock if clock else _Clock() + def __init__(self): self.current = collections.deque() self.idlers = collections.deque() self.inactive = 0 @@ -97,7 +81,7 @@ def clear(self): _logging_debug(' queue = %s', queue) if rpcs: _logging_debug(' rpcs = %s', rpcs) - self.__init__(self.clock) + self.__init__() current.clear() idlers.clear() queue[:] = [] @@ -133,7 +117,7 @@ def queue_call(self, delay, callback, *args, **kwargs): return # Times over a billion seconds are assumed to be absolute - when = self.clock.now() + delay if delay < 1e9 else delay + when = time.time() + delay if delay < 1e9 else delay event = _Event(when, callback, args, kwargs) self.insort_event_right(event) @@ -214,7 +198,7 @@ def run0(self): delay = None if self.queue: - delay = self.queue[0][0] - self.clock.now() + delay = self.queue[0][0] - time.time() if delay <= 0: self.inactive = 0 _, callback, args, kwargs = self.queue.pop(0) @@ -238,7 +222,7 @@ def run1(self): if delay is None: return False if delay > 0: - self.clock.sleep(delay) + time.sleep(delay) return True def run(self): diff --git a/ndb/tests/unit/test_eventloop.py b/ndb/tests/unit/test_eventloop.py index 74343daa3af7..ac8976846914 100644 --- a/ndb/tests/unit/test_eventloop.py +++ b/ndb/tests/unit/test_eventloop.py @@ -31,21 +31,13 @@ def _Event(when=0, what='foo', args=(), kw={}): class TestEventLoop: @staticmethod def _make_one(**attrs): - """Use DummyClock""" - ev = eventloop.EventLoop(DummyClock()) + ev = eventloop.EventLoop() for name, value in attrs.items(): setattr(ev, name, value) return ev - @staticmethod - @unittest.mock.patch('google.cloud.ndb.eventloop.time') - def test_constructor(time): - time.time.return_value = 42 - ev = eventloop.EventLoop() - assert ev.clock.now() == 42 - ev.clock.sleep(27) - time.sleep.assert_called_once_with(27) - + def test_constructor(self): + ev = self._make_one() assert ev.current == collections.deque() assert ev.idlers == collections.deque() assert ev.inactive == 0 @@ -54,7 +46,6 @@ def test_constructor(time): def test_clear_all(self): ev = self._make_one() - myclock = ev.clock ev.current.append('foo') ev.idlers.append('bar') ev.queue.append('baz') @@ -64,7 +55,6 @@ def test_clear_all(self): assert not len(ev.idlers) assert not len(ev.queue) assert not len(ev.rpcs) - assert ev.clock is myclock # idemptotence (branch coverage) ev.clear() @@ -72,31 +62,26 @@ def test_clear_all(self): assert not len(ev.idlers) assert not len(ev.queue) assert not len(ev.rpcs) - assert ev.clock is myclock def test_clear_current(self): """ For branch coverage. """ ev = self._make_one() - myclock = ev.clock ev.current.append('foo') ev.clear() assert not len(ev.current) assert not len(ev.idlers) assert not len(ev.queue) assert not len(ev.rpcs) - assert ev.clock is myclock def test_clear_idlers(self): """ For branch coverage. """ ev = self._make_one() - myclock = ev.clock ev.idlers.append('foo') ev.clear() assert not len(ev.current) assert not len(ev.idlers) assert not len(ev.queue) assert not len(ev.rpcs) - assert ev.clock is myclock def test_insert_event_right_empty_queue(self): ev = self._make_one() @@ -162,18 +147,20 @@ def test_queue_call_now(self): ] assert not len(ev.queue) - def test_queue_call_soon(self): + @unittest.mock.patch('google.cloud.ndb.eventloop.time') + def test_queue_call_soon(self, time): ev = self._make_one() - ev.clock.time = 5 + time.time.return_value = 5 ev.queue_call(5, 'foo', 'bar', baz='qux') assert not len(ev.current) assert ev.queue == [ _Event(10, 'foo', ('bar',), {'baz': 'qux'}), ] - def test_queue_call_absolute(self): + @unittest.mock.patch('google.cloud.ndb.eventloop.time') + def test_queue_call_absolute(self, time): ev = self._make_one() - ev.clock.time = 5 + time.time.return_value = 5 ev.queue_call(10e10, 'foo', 'bar', baz='qux') assert not len(ev.current) assert ev.queue == [ @@ -256,7 +243,9 @@ def test_run0_idler(self): assert ev.run0() == 0 callback.assert_called_once_with('foo', bar='baz') - def test_run0_next_later(self): + @unittest.mock.patch('google.cloud.ndb.eventloop.time') + def test_run0_next_later(self, time): + time.time.return_value = 0 callback = unittest.mock.Mock(__name__='callback') ev = self._make_one() ev.queue_call(5, callback, 'foo', bar='baz') @@ -266,13 +255,15 @@ def test_run0_next_later(self): assert len(ev.queue) == 1 assert ev.inactive == 88 - def test_run0_next_now(self): + @unittest.mock.patch('google.cloud.ndb.eventloop.time') + def test_run0_next_now(self, time): + time.time.return_value = 0 callback = unittest.mock.Mock(__name__='callback') ev = self._make_one() ev.queue_call(6, 'foo') ev.queue_call(5, callback, 'foo', bar='baz') ev.inactive = 88 - ev.clock.time = 10 + time.time.return_value = 10 assert ev.run0() == 0 callback.assert_called_once_with('foo', bar='baz') assert len(ev.queue) == 1 @@ -288,23 +279,33 @@ def test_run1_nothing_to_do(self): ev = self._make_one() assert ev.run1() is False - def test_run1_has_work_now(self): + @unittest.mock.patch('google.cloud.ndb.eventloop.time') + def test_run1_has_work_now(self, time): callback = unittest.mock.Mock(__name__='callback') ev = self._make_one() ev.queue_call(None, callback) assert ev.run1() is True - assert ev.clock.slept_for == 0 + time.sleep.assert_not_called() callback.assert_called_once_with() - def test_run1_has_work_later(self): + @unittest.mock.patch('google.cloud.ndb.eventloop.time') + def test_run1_has_work_later(self, time): + time.time.return_value = 0 callback = unittest.mock.Mock(__name__='callback') ev = self._make_one() ev.queue_call(5, callback) assert ev.run1() is True - assert ev.clock.slept_for == 5 + time.sleep.assert_called_once_with(5) callback.assert_not_called() - def test_run(self): + @unittest.mock.patch('google.cloud.ndb.eventloop.time') + def test_run(self, time): + time.time.return_value = 0 + + def mock_sleep(seconds): + time.time.return_value += seconds + + time.sleep = mock_sleep idler = unittest.mock.Mock(__name__='idler') idler.return_value = None runnow = unittest.mock.Mock(__name__='runnow') @@ -352,20 +353,3 @@ def test_run0(): def test_run1(): with pytest.raises(NotImplementedError): eventloop.run1() - - -class DummyClock: - """Fake out clock class without having to actually read system clock or - sleep during tests. - """ - - def __init__(self, time=0): - self.time = time - self.slept_for = 0 - - def now(self): - return self.time - - def sleep(self, t): - self.time += t - self.slept_for += t From 5ced0a8a73c0752b2c03dbebc7c39c1e7a56257a Mon Sep 17 00:00:00 2001 From: Chris Rossi Date: Thu, 1 Nov 2018 11:09:05 -0400 Subject: [PATCH 4/9] Conform to style guides. --- ndb/src/google/cloud/ndb/eventloop.py | 90 +++++++++++++++++---------- 1 file changed, 56 insertions(+), 34 deletions(-) diff --git a/ndb/src/google/cloud/ndb/eventloop.py b/ndb/src/google/cloud/ndb/eventloop.py index 741035fdce17..94cf372ae91e 100644 --- a/ndb/src/google/cloud/ndb/eventloop.py +++ b/ndb/src/google/cloud/ndb/eventloop.py @@ -42,19 +42,23 @@ def _logging_debug(*args, **kw): class EventLoop: - """Constructor. - - Fields: - current: a FIFO list of (callback, args, kwds). These callbacks - run immediately when the eventloop runs. - idlers: a FIFO list of (callback, args, kwds). Thes callbacks - run only when no other RPCs need to be fired first. - For example, AutoBatcher uses idler to fire a batch RPC even before - the batch is full. - queue: a sorted list of (absolute time in sec, callback, args, kwds), - sorted by time. These callbacks run only after the said time. - rpcs: a map from rpc to (callback, args, kwds). Callback is called - when the rpc finishes. + """An event loop. + + Instances of ``EventLoop`` are used to coordinate single thraded execution + of tasks and RPCs scheduled asynchronously. + + Atrributes: + current (deque): a FIFO list of (callback, args, kwds). These callbacks + run immediately when the eventloop runs. + idlers (deque): a FIFO list of (callback, args, kwds). Thes callbacks + run only when no other RPCs need to be fired first. + For example, AutoBatcher uses idler to fire a batch RPC even before + the batch is full. + queue (list): a sorted list of (absolute time in sec, callback, args, + kwds), sorted by time. These callbacks run only after the said + time. + rpcs (dict): a map from RPC to (callback, args, kwds). Callback is + called when the RPC finishes. """ __slots__ = ('current', 'idlers', 'inactive', 'queue', 'rpcs') @@ -89,34 +93,43 @@ def clear(self): _logging_debug('Cleared') def insort_event_right(self, event): - """Insert event in queue, and keep it sorted by `event.when` assuming - queue is sorted. + """Insert event in queue with sorting. + + This function assumes the queue is already sorted by ``event.when`` and + inserts ``event`` in the queue, maintaining the sort. For events with same `event.when`, new events are inserted to the right, to keep FIFO order. Args: - event: a (time in sec since unix epoch, callback, args, kwargs) - tuple. + event (_Event): The event to insert. """ queue = self.queue - lo = 0 - hi = len(queue) - while lo < hi: - mid = (lo + hi) // 2 + low = 0 + high = len(queue) + while low < high: + mid = (low + high) // 2 if event.when < queue[mid].when: - hi = mid + high = mid else: - lo = mid + 1 - queue.insert(lo, event) + low = mid + 1 + queue.insert(low, event) def queue_call(self, delay, callback, *args, **kwargs): - """Schedule a function call at a specific time in the future.""" + """Schedule a function call at a specific time in the future. + + Arguments: + delay (number): Time in seconds to delay running the callback. + Times over a billion seconds are assumed to be absolute + timestamps rather than delays. + callback (callable): The function to eventually call. + *args: Positional arguments to be passed to callback. + **kwargs: Keyword arguments to be passed to callback. + """ if delay is None: self.current.append((callback, args, kwargs)) return - # Times over a billion seconds are assumed to be absolute when = time.time() + delay if delay < 1e9 else delay event = _Event(when, callback, args, kwargs) self.insort_event_right(event) @@ -127,8 +140,10 @@ def queue_rpc(self, rpc, callback=None, *args, **kwds): The caller must have previously sent the call to the service. The optional callback is called with the remaining arguments. - NOTE: If the rpc is a MultiRpc, the callback will be called once - for each sub-RPC. TODO: Is this a good idea? + .. note:: + + If the rpc is a MultiRpc, the callback will be called once + for each sub-RPC. """ # TODO Integrate with gRPC raise NotImplementedError @@ -136,6 +151,9 @@ def queue_rpc(self, rpc, callback=None, *args, **kwds): def add_idle(self, callback, *args, **kwargs): """Add an idle callback. + An idle callback is a low priority task which is executed when + there aren't other events scheduled for immediate execution. + An idle callback can return True, False or None. These mean: - None: remove the callback (don't reschedule) @@ -144,6 +162,11 @@ def add_idle(self, callback, *args, **kwargs): If the callback raises an exception, the traceback is logged and the callback is removed. + + Arguments: + callback (callable): The function to eventually call. + *args: Positional arguments to be passed to callback. + **kwargs: Keyword arguments to be passed to callback. """ self.idlers.append((callback, args, kwargs)) @@ -151,7 +174,7 @@ def run_idle(self): """Run one of the idle callbacks. Returns: - True if one was called, False if no idle callback was called. + bool: Indicates if an idle calback was called. """ if not self.idlers or self.inactive >= len(self.idlers): return False @@ -175,7 +198,7 @@ def _run_current(self): """Run one current item. Returns: - True if one was called, False if no callback was called. + bool: Indicates if an idle calback was called. """ if not self.current: return False @@ -190,8 +213,8 @@ def run0(self): """Run one item (a callback or an RPC wait_any). Returns: - A time to sleep if something happened (may be 0); - None if all queues are empty. + number: A time to sleep if something happened (may be 0); + None if all queues are empty. """ if self._run_current() or self.run_idle(): return 0 @@ -204,7 +227,6 @@ def run0(self): _, callback, args, kwargs = self.queue.pop(0) _logging_debug('event: %s', callback.__name__) callback(*args, **kwargs) - # TODO: What if it raises an exception? return 0 if self.rpcs: @@ -216,7 +238,7 @@ def run1(self): """Run one item (a callback or an RPC wait_any) or sleep. Returns: - True if something happened; False if all queues are empty. + bool: True if something happened; False if all queues are empty. """ delay = self.run0() if delay is None: From 70dbb85237f4b7ca53e3ac3deba336368e5e61f7 Mon Sep 17 00:00:00 2001 From: Chris Rossi Date: Thu, 1 Nov 2018 14:33:47 -0400 Subject: [PATCH 5/9] Replace TODO with issue in issue tracker and note in Metadata.db --- ndb/MIGRATION_NOTES.md | 3 +++ ndb/src/google/cloud/ndb/eventloop.py | 1 - 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/ndb/MIGRATION_NOTES.md b/ndb/MIGRATION_NOTES.md index 6281ae629bc5..15b47f2c4b7f 100644 --- a/ndb/MIGRATION_NOTES.md +++ b/ndb/MIGRATION_NOTES.md @@ -133,7 +133,10 @@ The primary differences come from: - There is a giant web of module interdependency, so runtime imports (to avoid import cycles) are very common. For example `model.Property` depends on `query` but `query` depends on `model`. +- Will need to sort out dependencies on old RPC implementations and port to + modern gRPC. ([Issue #6363][4]) [1]: https://github.com/GoogleCloudPlatform/datastore-ndb-python/issues/175 [2]: https://github.com/googleapis/google-cloud-python/issues/6317 [3]: https://github.com/googleapis/googleapis/blob/3afba2fd062df0c89ecd62d97f912192b8e0e0ae/google/datastore/v1/entity.proto#L203 +[4]: https://github.com/googleapis/google-cloud-python/issues/6363 diff --git a/ndb/src/google/cloud/ndb/eventloop.py b/ndb/src/google/cloud/ndb/eventloop.py index 94cf372ae91e..171f583f6e10 100644 --- a/ndb/src/google/cloud/ndb/eventloop.py +++ b/ndb/src/google/cloud/ndb/eventloop.py @@ -145,7 +145,6 @@ def queue_rpc(self, rpc, callback=None, *args, **kwds): If the rpc is a MultiRpc, the callback will be called once for each sub-RPC. """ - # TODO Integrate with gRPC raise NotImplementedError def add_idle(self, callback, *args, **kwargs): From 95a94f4bd0753a77890a3b07648b13845c296ac5 Mon Sep 17 00:00:00 2001 From: Chris Rossi Date: Thu, 1 Nov 2018 15:11:49 -0400 Subject: [PATCH 6/9] Style conformance. --- ndb/tests/unit/test_eventloop.py | 43 ++++++++++++++++---------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/ndb/tests/unit/test_eventloop.py b/ndb/tests/unit/test_eventloop.py index ac8976846914..1386170fec7d 100644 --- a/ndb/tests/unit/test_eventloop.py +++ b/ndb/tests/unit/test_eventloop.py @@ -13,9 +13,10 @@ # limitations under the License. import collections -import pytest import unittest.mock +import pytest + from google.cloud.ndb import eventloop import tests.unit.utils @@ -51,37 +52,35 @@ def test_clear_all(self): ev.queue.append('baz') ev.rpcs['qux'] = 'quux' ev.clear() - assert not len(ev.current) - assert not len(ev.idlers) - assert not len(ev.queue) - assert not len(ev.rpcs) + assert not ev.current + assert not ev.idlers + assert not ev.queue + assert not ev.rpcs # idemptotence (branch coverage) ev.clear() - assert not len(ev.current) - assert not len(ev.idlers) - assert not len(ev.queue) - assert not len(ev.rpcs) + assert not ev.current + assert not ev.idlers + assert not ev.queue + assert not ev.rpcs def test_clear_current(self): - """ For branch coverage. """ ev = self._make_one() ev.current.append('foo') ev.clear() - assert not len(ev.current) - assert not len(ev.idlers) - assert not len(ev.queue) - assert not len(ev.rpcs) + assert not ev.current + assert not ev.idlers + assert not ev.queue + assert not ev.rpcs def test_clear_idlers(self): - """ For branch coverage. """ ev = self._make_one() ev.idlers.append('foo') ev.clear() - assert not len(ev.current) - assert not len(ev.idlers) - assert not len(ev.queue) - assert not len(ev.rpcs) + assert not ev.current + assert not ev.idlers + assert not ev.queue + assert not ev.rpcs def test_insert_event_right_empty_queue(self): ev = self._make_one() @@ -145,14 +144,14 @@ def test_queue_call_now(self): assert list(ev.current) == [ ('foo', ('bar',), {'baz': 'qux'}), ] - assert not len(ev.queue) + assert not ev.queue @unittest.mock.patch('google.cloud.ndb.eventloop.time') def test_queue_call_soon(self, time): ev = self._make_one() time.time.return_value = 5 ev.queue_call(5, 'foo', 'bar', baz='qux') - assert not len(ev.current) + assert not ev.current assert ev.queue == [ _Event(10, 'foo', ('bar',), {'baz': 'qux'}), ] @@ -162,7 +161,7 @@ def test_queue_call_absolute(self, time): ev = self._make_one() time.time.return_value = 5 ev.queue_call(10e10, 'foo', 'bar', baz='qux') - assert not len(ev.current) + assert not ev.current assert ev.queue == [ _Event(10e10, 'foo', ('bar',), {'baz': 'qux'}), ] From 90ad15a7506e22d7ab145750456db5f9b04151a3 Mon Sep 17 00:00:00 2001 From: Chris Rossi Date: Thu, 1 Nov 2018 15:17:55 -0400 Subject: [PATCH 7/9] Ran "nox -s blacken". --- ndb/src/google/cloud/ndb/eventloop.py | 28 +++-- ndb/tests/unit/test_eventloop.py | 167 +++++++++++--------------- 2 files changed, 86 insertions(+), 109 deletions(-) diff --git a/ndb/src/google/cloud/ndb/eventloop.py b/ndb/src/google/cloud/ndb/eventloop.py index 171f583f6e10..39fe8342cfa3 100644 --- a/ndb/src/google/cloud/ndb/eventloop.py +++ b/ndb/src/google/cloud/ndb/eventloop.py @@ -37,8 +37,9 @@ def _logging_debug(*args, **kw): See #6360.""" -_Event = collections.namedtuple('_Event', ( - 'when', 'callback', 'args', 'kwargs')) +_Event = collections.namedtuple( + "_Event", ("when", "callback", "args", "kwargs") +) class EventLoop: @@ -60,7 +61,8 @@ class EventLoop: rpcs (dict): a map from RPC to (callback, args, kwds). Callback is called when the RPC finishes. """ - __slots__ = ('current', 'idlers', 'inactive', 'queue', 'rpcs') + + __slots__ = ("current", "idlers", "inactive", "queue", "rpcs") def __init__(self): self.current = collections.deque() @@ -76,21 +78,21 @@ def clear(self): idlers = self.idlers queue = self.queue rpcs = self.rpcs - _logging_debug('Clearing stale EventLoop instance...') + _logging_debug("Clearing stale EventLoop instance...") if current: - _logging_debug(' current = %s', current) + _logging_debug(" current = %s", current) if idlers: - _logging_debug(' idlers = %s', idlers) + _logging_debug(" idlers = %s", idlers) if queue: - _logging_debug(' queue = %s', queue) + _logging_debug(" queue = %s", queue) if rpcs: - _logging_debug(' rpcs = %s', rpcs) + _logging_debug(" rpcs = %s", rpcs) self.__init__() current.clear() idlers.clear() queue[:] = [] rpcs.clear() - _logging_debug('Cleared') + _logging_debug("Cleared") def insort_event_right(self, event): """Insert event in queue with sorting. @@ -179,12 +181,12 @@ def run_idle(self): return False idler = self.idlers.popleft() callback, args, kwargs = idler - _logging_debug('idler: %s', callback.__name__) + _logging_debug("idler: %s", callback.__name__) result = callback(*args, **kwargs) # See add_idle() for meaning of callback return value. if result is None: - _logging_debug('idler %s removed', callback.__name__) + _logging_debug("idler %s removed", callback.__name__) else: if result: self.inactive = 0 @@ -204,7 +206,7 @@ def _run_current(self): self.inactive = 0 callback, args, kwargs = self.current.popleft() - _logging_debug('nowevent: %s', callback.__name__) + _logging_debug("nowevent: %s", callback.__name__) callback(*args, **kwargs) return True @@ -224,7 +226,7 @@ def run0(self): if delay <= 0: self.inactive = 0 _, callback, args, kwargs = self.queue.pop(0) - _logging_debug('event: %s', callback.__name__) + _logging_debug("event: %s", callback.__name__) callback(*args, **kwargs) return 0 diff --git a/ndb/tests/unit/test_eventloop.py b/ndb/tests/unit/test_eventloop.py index 1386170fec7d..f319f2a9ab9a 100644 --- a/ndb/tests/unit/test_eventloop.py +++ b/ndb/tests/unit/test_eventloop.py @@ -25,7 +25,7 @@ def test___all__(): tests.unit.utils.verify___all__(eventloop) -def _Event(when=0, what='foo', args=(), kw={}): +def _Event(when=0, what="foo", args=(), kw={}): return eventloop._Event(when, what, args, kw) @@ -47,10 +47,10 @@ def test_constructor(self): def test_clear_all(self): ev = self._make_one() - ev.current.append('foo') - ev.idlers.append('bar') - ev.queue.append('baz') - ev.rpcs['qux'] = 'quux' + ev.current.append("foo") + ev.idlers.append("bar") + ev.queue.append("baz") + ev.rpcs["qux"] = "quux" ev.clear() assert not ev.current assert not ev.idlers @@ -66,7 +66,7 @@ def test_clear_all(self): def test_clear_current(self): ev = self._make_one() - ev.current.append('foo') + ev.current.append("foo") ev.clear() assert not ev.current assert not ev.idlers @@ -75,7 +75,7 @@ def test_clear_current(self): def test_clear_idlers(self): ev = self._make_one() - ev.idlers.append('foo') + ev.idlers.append("foo") ev.clear() assert not ev.current assert not ev.idlers @@ -89,92 +89,67 @@ def test_insert_event_right_empty_queue(self): assert ev.queue == [event] def test_insert_event_right_head(self): - ev = self._make_one( - queue=[_Event(1, 'bar')], - ) - ev.insort_event_right(_Event(0, 'foo')) - assert ev.queue == [ - _Event(0, 'foo'), - _Event(1, 'bar'), - ] + ev = self._make_one(queue=[_Event(1, "bar")]) + ev.insort_event_right(_Event(0, "foo")) + assert ev.queue == [_Event(0, "foo"), _Event(1, "bar")] def test_insert_event_right_tail(self): - ev = self._make_one( - queue=[_Event(0, 'foo')], - ) - ev.insort_event_right(_Event(1, 'bar')) - assert ev.queue == [ - _Event(0, 'foo'), - _Event(1, 'bar'), - ] + ev = self._make_one(queue=[_Event(0, "foo")]) + ev.insort_event_right(_Event(1, "bar")) + assert ev.queue == [_Event(0, "foo"), _Event(1, "bar")] def test_insert_event_right_middle(self): - ev = self._make_one( - queue=[ - _Event(0, 'foo'), - _Event(2, 'baz'), - ], - ) - ev.insort_event_right(_Event(1, 'bar')) + ev = self._make_one(queue=[_Event(0, "foo"), _Event(2, "baz")]) + ev.insort_event_right(_Event(1, "bar")) assert ev.queue == [ - _Event(0, 'foo'), - _Event(1, 'bar'), - _Event(2, 'baz'), + _Event(0, "foo"), + _Event(1, "bar"), + _Event(2, "baz"), ] def test_insert_event_right_collision(self): ev = self._make_one( - queue=[ - _Event(0, 'foo'), - _Event(1, 'bar'), - _Event(2, 'baz'), - ], + queue=[_Event(0, "foo"), _Event(1, "bar"), _Event(2, "baz")] ) - ev.insort_event_right(_Event(1, 'barbar')) + ev.insort_event_right(_Event(1, "barbar")) assert ev.queue == [ - _Event(0, 'foo'), - _Event(1, 'bar'), - _Event(1, 'barbar'), - _Event(2, 'baz'), + _Event(0, "foo"), + _Event(1, "bar"), + _Event(1, "barbar"), + _Event(2, "baz"), ] def test_queue_call_now(self): ev = self._make_one() - ev.queue_call(None, 'foo', 'bar', baz='qux') - assert list(ev.current) == [ - ('foo', ('bar',), {'baz': 'qux'}), - ] + ev.queue_call(None, "foo", "bar", baz="qux") + assert list(ev.current) == [("foo", ("bar",), {"baz": "qux"})] assert not ev.queue - @unittest.mock.patch('google.cloud.ndb.eventloop.time') + @unittest.mock.patch("google.cloud.ndb.eventloop.time") def test_queue_call_soon(self, time): ev = self._make_one() time.time.return_value = 5 - ev.queue_call(5, 'foo', 'bar', baz='qux') + ev.queue_call(5, "foo", "bar", baz="qux") assert not ev.current - assert ev.queue == [ - _Event(10, 'foo', ('bar',), {'baz': 'qux'}), - ] + assert ev.queue == [_Event(10, "foo", ("bar",), {"baz": "qux"})] - @unittest.mock.patch('google.cloud.ndb.eventloop.time') + @unittest.mock.patch("google.cloud.ndb.eventloop.time") def test_queue_call_absolute(self, time): ev = self._make_one() time.time.return_value = 5 - ev.queue_call(10e10, 'foo', 'bar', baz='qux') + ev.queue_call(10e10, "foo", "bar", baz="qux") assert not ev.current - assert ev.queue == [ - _Event(10e10, 'foo', ('bar',), {'baz': 'qux'}), - ] + assert ev.queue == [_Event(10e10, "foo", ("bar",), {"baz": "qux"})] def test_queue_rpc(self): ev = self._make_one() with pytest.raises(NotImplementedError): - ev.queue_rpc('rpc') + ev.queue_rpc("rpc") def test_add_idle(self): ev = self._make_one() - ev.add_idle('foo', 'bar', baz='qux') - assert list(ev.idlers) == [('foo', ('bar',), {'baz': 'qux'})] + ev.add_idle("foo", "bar", baz="qux") + assert list(ev.idlers) == [("foo", ("bar",), {"baz": "qux"})] def test_run_idle_no_idlers(self): ev = self._make_one() @@ -182,42 +157,42 @@ def test_run_idle_no_idlers(self): def test_run_idle_all_inactive(self): ev = self._make_one() - ev.add_idle('foo') + ev.add_idle("foo") ev.inactive = 1 assert ev.run_idle() is False def test_run_idle_remove_callback(self): - callback = unittest.mock.Mock(__name__='callback') + callback = unittest.mock.Mock(__name__="callback") callback.return_value = None ev = self._make_one() - ev.add_idle(callback, 'foo', bar='baz') - ev.add_idle('foo') + ev.add_idle(callback, "foo", bar="baz") + ev.add_idle("foo") assert ev.run_idle() is True - callback.assert_called_once_with('foo', bar='baz') + callback.assert_called_once_with("foo", bar="baz") assert len(ev.idlers) == 1 assert ev.inactive == 0 def test_run_idle_did_work(self): - callback = unittest.mock.Mock(__name__='callback') + callback = unittest.mock.Mock(__name__="callback") callback.return_value = True ev = self._make_one() - ev.add_idle(callback, 'foo', bar='baz') - ev.add_idle('foo') + ev.add_idle(callback, "foo", bar="baz") + ev.add_idle("foo") ev.inactive = 1 assert ev.run_idle() is True - callback.assert_called_once_with('foo', bar='baz') + callback.assert_called_once_with("foo", bar="baz") assert len(ev.idlers) == 2 assert ev.inactive == 0 def test_run_idle_did_no_work(self): - callback = unittest.mock.Mock(__name__='callback') + callback = unittest.mock.Mock(__name__="callback") callback.return_value = False ev = self._make_one() - ev.add_idle(callback, 'foo', bar='baz') - ev.add_idle('foo') + ev.add_idle(callback, "foo", bar="baz") + ev.add_idle("foo") ev.inactive = 1 assert ev.run_idle() is True - callback.assert_called_once_with('foo', bar='baz') + callback.assert_called_once_with("foo", bar="baz") assert len(ev.idlers) == 2 assert ev.inactive == 2 @@ -226,51 +201,51 @@ def test_run0_nothing_to_do(self): assert ev.run0() is None def test_run0_current(self): - callback = unittest.mock.Mock(__name__='callback') + callback = unittest.mock.Mock(__name__="callback") ev = self._make_one() - ev.queue_call(None, callback, 'foo', bar='baz') + ev.queue_call(None, callback, "foo", bar="baz") ev.inactive = 88 assert ev.run0() == 0 - callback.assert_called_once_with('foo', bar='baz') + callback.assert_called_once_with("foo", bar="baz") assert len(ev.current) == 0 assert ev.inactive == 0 def test_run0_idler(self): - callback = unittest.mock.Mock(__name__='callback') + callback = unittest.mock.Mock(__name__="callback") ev = self._make_one() - ev.add_idle(callback, 'foo', bar='baz') + ev.add_idle(callback, "foo", bar="baz") assert ev.run0() == 0 - callback.assert_called_once_with('foo', bar='baz') + callback.assert_called_once_with("foo", bar="baz") - @unittest.mock.patch('google.cloud.ndb.eventloop.time') + @unittest.mock.patch("google.cloud.ndb.eventloop.time") def test_run0_next_later(self, time): time.time.return_value = 0 - callback = unittest.mock.Mock(__name__='callback') + callback = unittest.mock.Mock(__name__="callback") ev = self._make_one() - ev.queue_call(5, callback, 'foo', bar='baz') + ev.queue_call(5, callback, "foo", bar="baz") ev.inactive = 88 assert ev.run0() == 5 callback.assert_not_called() assert len(ev.queue) == 1 assert ev.inactive == 88 - @unittest.mock.patch('google.cloud.ndb.eventloop.time') + @unittest.mock.patch("google.cloud.ndb.eventloop.time") def test_run0_next_now(self, time): time.time.return_value = 0 - callback = unittest.mock.Mock(__name__='callback') + callback = unittest.mock.Mock(__name__="callback") ev = self._make_one() - ev.queue_call(6, 'foo') - ev.queue_call(5, callback, 'foo', bar='baz') + ev.queue_call(6, "foo") + ev.queue_call(5, callback, "foo", bar="baz") ev.inactive = 88 time.time.return_value = 10 assert ev.run0() == 0 - callback.assert_called_once_with('foo', bar='baz') + callback.assert_called_once_with("foo", bar="baz") assert len(ev.queue) == 1 assert ev.inactive == 0 def test_run0_rpc(self): ev = self._make_one() - ev.rpcs['foo'] = 'bar' + ev.rpcs["foo"] = "bar" with pytest.raises(NotImplementedError): ev.run0() @@ -278,26 +253,26 @@ def test_run1_nothing_to_do(self): ev = self._make_one() assert ev.run1() is False - @unittest.mock.patch('google.cloud.ndb.eventloop.time') + @unittest.mock.patch("google.cloud.ndb.eventloop.time") def test_run1_has_work_now(self, time): - callback = unittest.mock.Mock(__name__='callback') + callback = unittest.mock.Mock(__name__="callback") ev = self._make_one() ev.queue_call(None, callback) assert ev.run1() is True time.sleep.assert_not_called() callback.assert_called_once_with() - @unittest.mock.patch('google.cloud.ndb.eventloop.time') + @unittest.mock.patch("google.cloud.ndb.eventloop.time") def test_run1_has_work_later(self, time): time.time.return_value = 0 - callback = unittest.mock.Mock(__name__='callback') + callback = unittest.mock.Mock(__name__="callback") ev = self._make_one() ev.queue_call(5, callback) assert ev.run1() is True time.sleep.assert_called_once_with(5) callback.assert_not_called() - @unittest.mock.patch('google.cloud.ndb.eventloop.time') + @unittest.mock.patch("google.cloud.ndb.eventloop.time") def test_run(self, time): time.time.return_value = 0 @@ -305,10 +280,10 @@ def mock_sleep(seconds): time.time.return_value += seconds time.sleep = mock_sleep - idler = unittest.mock.Mock(__name__='idler') + idler = unittest.mock.Mock(__name__="idler") idler.return_value = None - runnow = unittest.mock.Mock(__name__='runnow') - runlater = unittest.mock.Mock(__name__='runlater') + runnow = unittest.mock.Mock(__name__="runnow") + runlater = unittest.mock.Mock(__name__="runlater") ev = self._make_one() ev.add_idle(idler) ev.queue_call(None, runnow) From e981c0cae1cb5b4fa35cfdaa604a3993ce80d059 Mon Sep 17 00:00:00 2001 From: Chris Rossi Date: Fri, 2 Nov 2018 09:32:53 -0400 Subject: [PATCH 8/9] PEP8 --- ndb/tests/unit/test_eventloop.py | 272 +++++++++++++++---------------- 1 file changed, 136 insertions(+), 136 deletions(-) diff --git a/ndb/tests/unit/test_eventloop.py b/ndb/tests/unit/test_eventloop.py index f319f2a9ab9a..7d8d0e9a6e57 100644 --- a/ndb/tests/unit/test_eventloop.py +++ b/ndb/tests/unit/test_eventloop.py @@ -32,87 +32,87 @@ def _Event(when=0, what="foo", args=(), kw={}): class TestEventLoop: @staticmethod def _make_one(**attrs): - ev = eventloop.EventLoop() + loop = eventloop.EventLoop() for name, value in attrs.items(): - setattr(ev, name, value) - return ev + setattr(loop, name, value) + return loop def test_constructor(self): - ev = self._make_one() - assert ev.current == collections.deque() - assert ev.idlers == collections.deque() - assert ev.inactive == 0 - assert ev.queue == [] - assert ev.rpcs == {} + loop = self._make_one() + assert loop.current == collections.deque() + assert loop.idlers == collections.deque() + assert loop.inactive == 0 + assert loop.queue == [] + assert loop.rpcs == {} def test_clear_all(self): - ev = self._make_one() - ev.current.append("foo") - ev.idlers.append("bar") - ev.queue.append("baz") - ev.rpcs["qux"] = "quux" - ev.clear() - assert not ev.current - assert not ev.idlers - assert not ev.queue - assert not ev.rpcs + loop = self._make_one() + loop.current.append("foo") + loop.idlers.append("bar") + loop.queue.append("baz") + loop.rpcs["qux"] = "quux" + loop.clear() + assert not loop.current + assert not loop.idlers + assert not loop.queue + assert not loop.rpcs # idemptotence (branch coverage) - ev.clear() - assert not ev.current - assert not ev.idlers - assert not ev.queue - assert not ev.rpcs + loop.clear() + assert not loop.current + assert not loop.idlers + assert not loop.queue + assert not loop.rpcs def test_clear_current(self): - ev = self._make_one() - ev.current.append("foo") - ev.clear() - assert not ev.current - assert not ev.idlers - assert not ev.queue - assert not ev.rpcs + loop = self._make_one() + loop.current.append("foo") + loop.clear() + assert not loop.current + assert not loop.idlers + assert not loop.queue + assert not loop.rpcs def test_clear_idlers(self): - ev = self._make_one() - ev.idlers.append("foo") - ev.clear() - assert not ev.current - assert not ev.idlers - assert not ev.queue - assert not ev.rpcs + loop = self._make_one() + loop.idlers.append("foo") + loop.clear() + assert not loop.current + assert not loop.idlers + assert not loop.queue + assert not loop.rpcs def test_insert_event_right_empty_queue(self): - ev = self._make_one() + loop = self._make_one() event = _Event() - ev.insort_event_right(event) - assert ev.queue == [event] + loop.insort_event_right(event) + assert loop.queue == [event] def test_insert_event_right_head(self): - ev = self._make_one(queue=[_Event(1, "bar")]) - ev.insort_event_right(_Event(0, "foo")) - assert ev.queue == [_Event(0, "foo"), _Event(1, "bar")] + loop = self._make_one(queue=[_Event(1, "bar")]) + loop.insort_event_right(_Event(0, "foo")) + assert loop.queue == [_Event(0, "foo"), _Event(1, "bar")] def test_insert_event_right_tail(self): - ev = self._make_one(queue=[_Event(0, "foo")]) - ev.insort_event_right(_Event(1, "bar")) - assert ev.queue == [_Event(0, "foo"), _Event(1, "bar")] + loop = self._make_one(queue=[_Event(0, "foo")]) + loop.insort_event_right(_Event(1, "bar")) + assert loop.queue == [_Event(0, "foo"), _Event(1, "bar")] def test_insert_event_right_middle(self): - ev = self._make_one(queue=[_Event(0, "foo"), _Event(2, "baz")]) - ev.insort_event_right(_Event(1, "bar")) - assert ev.queue == [ + loop = self._make_one(queue=[_Event(0, "foo"), _Event(2, "baz")]) + loop.insort_event_right(_Event(1, "bar")) + assert loop.queue == [ _Event(0, "foo"), _Event(1, "bar"), _Event(2, "baz"), ] def test_insert_event_right_collision(self): - ev = self._make_one( + loop = self._make_one( queue=[_Event(0, "foo"), _Event(1, "bar"), _Event(2, "baz")] ) - ev.insort_event_right(_Event(1, "barbar")) - assert ev.queue == [ + loop.insort_event_right(_Event(1, "barbar")) + assert loop.queue == [ _Event(0, "foo"), _Event(1, "bar"), _Event(1, "barbar"), @@ -120,145 +120,145 @@ def test_insert_event_right_collision(self): ] def test_queue_call_now(self): - ev = self._make_one() - ev.queue_call(None, "foo", "bar", baz="qux") - assert list(ev.current) == [("foo", ("bar",), {"baz": "qux"})] - assert not ev.queue + loop = self._make_one() + loop.queue_call(None, "foo", "bar", baz="qux") + assert list(loop.current) == [("foo", ("bar",), {"baz": "qux"})] + assert not loop.queue @unittest.mock.patch("google.cloud.ndb.eventloop.time") def test_queue_call_soon(self, time): - ev = self._make_one() + loop = self._make_one() time.time.return_value = 5 - ev.queue_call(5, "foo", "bar", baz="qux") - assert not ev.current - assert ev.queue == [_Event(10, "foo", ("bar",), {"baz": "qux"})] + loop.queue_call(5, "foo", "bar", baz="qux") + assert not loop.current + assert loop.queue == [_Event(10, "foo", ("bar",), {"baz": "qux"})] @unittest.mock.patch("google.cloud.ndb.eventloop.time") def test_queue_call_absolute(self, time): - ev = self._make_one() + loop = self._make_one() time.time.return_value = 5 - ev.queue_call(10e10, "foo", "bar", baz="qux") - assert not ev.current - assert ev.queue == [_Event(10e10, "foo", ("bar",), {"baz": "qux"})] + loop.queue_call(10e10, "foo", "bar", baz="qux") + assert not loop.current + assert loop.queue == [_Event(10e10, "foo", ("bar",), {"baz": "qux"})] def test_queue_rpc(self): - ev = self._make_one() + loop = self._make_one() with pytest.raises(NotImplementedError): - ev.queue_rpc("rpc") + loop.queue_rpc("rpc") def test_add_idle(self): - ev = self._make_one() - ev.add_idle("foo", "bar", baz="qux") - assert list(ev.idlers) == [("foo", ("bar",), {"baz": "qux"})] + loop = self._make_one() + loop.add_idle("foo", "bar", baz="qux") + assert list(loop.idlers) == [("foo", ("bar",), {"baz": "qux"})] def test_run_idle_no_idlers(self): - ev = self._make_one() - assert ev.run_idle() is False + loop = self._make_one() + assert loop.run_idle() is False def test_run_idle_all_inactive(self): - ev = self._make_one() - ev.add_idle("foo") - ev.inactive = 1 - assert ev.run_idle() is False + loop = self._make_one() + loop.add_idle("foo") + loop.inactive = 1 + assert loop.run_idle() is False def test_run_idle_remove_callback(self): callback = unittest.mock.Mock(__name__="callback") callback.return_value = None - ev = self._make_one() - ev.add_idle(callback, "foo", bar="baz") - ev.add_idle("foo") - assert ev.run_idle() is True + loop = self._make_one() + loop.add_idle(callback, "foo", bar="baz") + loop.add_idle("foo") + assert loop.run_idle() is True callback.assert_called_once_with("foo", bar="baz") - assert len(ev.idlers) == 1 - assert ev.inactive == 0 + assert len(loop.idlers) == 1 + assert loop.inactive == 0 def test_run_idle_did_work(self): callback = unittest.mock.Mock(__name__="callback") callback.return_value = True - ev = self._make_one() - ev.add_idle(callback, "foo", bar="baz") - ev.add_idle("foo") - ev.inactive = 1 - assert ev.run_idle() is True + loop = self._make_one() + loop.add_idle(callback, "foo", bar="baz") + loop.add_idle("foo") + loop.inactive = 1 + assert loop.run_idle() is True callback.assert_called_once_with("foo", bar="baz") - assert len(ev.idlers) == 2 - assert ev.inactive == 0 + assert len(loop.idlers) == 2 + assert loop.inactive == 0 def test_run_idle_did_no_work(self): callback = unittest.mock.Mock(__name__="callback") callback.return_value = False - ev = self._make_one() - ev.add_idle(callback, "foo", bar="baz") - ev.add_idle("foo") - ev.inactive = 1 - assert ev.run_idle() is True + loop = self._make_one() + loop.add_idle(callback, "foo", bar="baz") + loop.add_idle("foo") + loop.inactive = 1 + assert loop.run_idle() is True callback.assert_called_once_with("foo", bar="baz") - assert len(ev.idlers) == 2 - assert ev.inactive == 2 + assert len(loop.idlers) == 2 + assert loop.inactive == 2 def test_run0_nothing_to_do(self): - ev = self._make_one() - assert ev.run0() is None + loop = self._make_one() + assert loop.run0() is None def test_run0_current(self): callback = unittest.mock.Mock(__name__="callback") - ev = self._make_one() - ev.queue_call(None, callback, "foo", bar="baz") - ev.inactive = 88 - assert ev.run0() == 0 + loop = self._make_one() + loop.queue_call(None, callback, "foo", bar="baz") + loop.inactive = 88 + assert loop.run0() == 0 callback.assert_called_once_with("foo", bar="baz") - assert len(ev.current) == 0 - assert ev.inactive == 0 + assert len(loop.current) == 0 + assert loop.inactive == 0 def test_run0_idler(self): callback = unittest.mock.Mock(__name__="callback") - ev = self._make_one() - ev.add_idle(callback, "foo", bar="baz") - assert ev.run0() == 0 + loop = self._make_one() + loop.add_idle(callback, "foo", bar="baz") + assert loop.run0() == 0 callback.assert_called_once_with("foo", bar="baz") @unittest.mock.patch("google.cloud.ndb.eventloop.time") def test_run0_next_later(self, time): time.time.return_value = 0 callback = unittest.mock.Mock(__name__="callback") - ev = self._make_one() - ev.queue_call(5, callback, "foo", bar="baz") - ev.inactive = 88 - assert ev.run0() == 5 + loop = self._make_one() + loop.queue_call(5, callback, "foo", bar="baz") + loop.inactive = 88 + assert loop.run0() == 5 callback.assert_not_called() - assert len(ev.queue) == 1 - assert ev.inactive == 88 + assert len(loop.queue) == 1 + assert loop.inactive == 88 @unittest.mock.patch("google.cloud.ndb.eventloop.time") def test_run0_next_now(self, time): time.time.return_value = 0 callback = unittest.mock.Mock(__name__="callback") - ev = self._make_one() - ev.queue_call(6, "foo") - ev.queue_call(5, callback, "foo", bar="baz") - ev.inactive = 88 + loop = self._make_one() + loop.queue_call(6, "foo") + loop.queue_call(5, callback, "foo", bar="baz") + loop.inactive = 88 time.time.return_value = 10 - assert ev.run0() == 0 + assert loop.run0() == 0 callback.assert_called_once_with("foo", bar="baz") - assert len(ev.queue) == 1 - assert ev.inactive == 0 + assert len(loop.queue) == 1 + assert loop.inactive == 0 def test_run0_rpc(self): - ev = self._make_one() - ev.rpcs["foo"] = "bar" + loop = self._make_one() + loop.rpcs["foo"] = "bar" with pytest.raises(NotImplementedError): - ev.run0() + loop.run0() def test_run1_nothing_to_do(self): - ev = self._make_one() - assert ev.run1() is False + loop = self._make_one() + assert loop.run1() is False @unittest.mock.patch("google.cloud.ndb.eventloop.time") def test_run1_has_work_now(self, time): callback = unittest.mock.Mock(__name__="callback") - ev = self._make_one() - ev.queue_call(None, callback) - assert ev.run1() is True + loop = self._make_one() + loop.queue_call(None, callback) + assert loop.run1() is True time.sleep.assert_not_called() callback.assert_called_once_with() @@ -266,9 +266,9 @@ def test_run1_has_work_now(self, time): def test_run1_has_work_later(self, time): time.time.return_value = 0 callback = unittest.mock.Mock(__name__="callback") - ev = self._make_one() - ev.queue_call(5, callback) - assert ev.run1() is True + loop = self._make_one() + loop.queue_call(5, callback) + assert loop.run1() is True time.sleep.assert_called_once_with(5) callback.assert_not_called() @@ -284,11 +284,11 @@ def mock_sleep(seconds): idler.return_value = None runnow = unittest.mock.Mock(__name__="runnow") runlater = unittest.mock.Mock(__name__="runlater") - ev = self._make_one() - ev.add_idle(idler) - ev.queue_call(None, runnow) - ev.queue_call(5, runlater) - ev.run() + loop = self._make_one() + loop.add_idle(idler) + loop.queue_call(None, runnow) + loop.queue_call(5, runlater) + loop.run() idler.assert_called_once_with() runnow.assert_called_once_with() runlater.assert_called_once_with() From 5c1a2eb8f444eff145786b35e47a747389c8e612 Mon Sep 17 00:00:00 2001 From: Chris Rossi Date: Fri, 2 Nov 2018 13:43:09 -0400 Subject: [PATCH 9/9] Minor doc cleanup. --- ndb/src/google/cloud/ndb/eventloop.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ndb/src/google/cloud/ndb/eventloop.py b/ndb/src/google/cloud/ndb/eventloop.py index 39fe8342cfa3..472f6d4a7ebd 100644 --- a/ndb/src/google/cloud/ndb/eventloop.py +++ b/ndb/src/google/cloud/ndb/eventloop.py @@ -55,6 +55,9 @@ class EventLoop: run only when no other RPCs need to be fired first. For example, AutoBatcher uses idler to fire a batch RPC even before the batch is full. + inactive (int): Number of consecutive idlers that were noops. Reset + to 0 whenever work is done by any callback, not necessarily by an + idler. queue (list): a sorted list of (absolute time in sec, callback, args, kwds), sorted by time. These callbacks run only after the said time. @@ -121,7 +124,7 @@ def queue_call(self, delay, callback, *args, **kwargs): """Schedule a function call at a specific time in the future. Arguments: - delay (number): Time in seconds to delay running the callback. + delay (float): Time in seconds to delay running the callback. Times over a billion seconds are assumed to be absolute timestamps rather than delays. callback (callable): The function to eventually call. @@ -214,7 +217,7 @@ def run0(self): """Run one item (a callback or an RPC wait_any). Returns: - number: A time to sleep if something happened (may be 0); + float: A time to sleep if something happened (may be 0); None if all queues are empty. """ if self._run_current() or self.run_idle():