diff --git a/ndb/MIGRATION_NOTES.md b/ndb/MIGRATION_NOTES.md index 6281ae629bc5..15b47f2c4b7f 100644 --- a/ndb/MIGRATION_NOTES.md +++ b/ndb/MIGRATION_NOTES.md @@ -133,7 +133,10 @@ The primary differences come from: - There is a giant web of module interdependency, so runtime imports (to avoid import cycles) are very common. For example `model.Property` depends on `query` but `query` depends on `model`. +- Will need to sort out dependencies on old RPC implementations and port to + modern gRPC. ([Issue #6363][4]) [1]: https://github.com/GoogleCloudPlatform/datastore-ndb-python/issues/175 [2]: https://github.com/googleapis/google-cloud-python/issues/6317 [3]: https://github.com/googleapis/googleapis/blob/3afba2fd062df0c89ecd62d97f912192b8e0e0ae/google/datastore/v1/entity.proto#L203 +[4]: https://github.com/googleapis/google-cloud-python/issues/6363 diff --git a/ndb/src/google/cloud/ndb/eventloop.py b/ndb/src/google/cloud/ndb/eventloop.py index ac867bec7fe1..472f6d4a7ebd 100644 --- a/ndb/src/google/cloud/ndb/eventloop.py +++ b/ndb/src/google/cloud/ndb/eventloop.py @@ -16,7 +16,8 @@ This should handle both asynchronous ``ndb`` objects and arbitrary callbacks. """ - +import collections +import time __all__ = [ "add_idle", @@ -30,16 +31,237 @@ ] -def add_idle(*args, **kwargs): - raise NotImplementedError +def _logging_debug(*args, **kw): + """Placeholder. + + See #6360.""" + + +_Event = collections.namedtuple( + "_Event", ("when", "callback", "args", "kwargs") +) class EventLoop: - __slots__ = () + """An event loop. + + Instances of ``EventLoop`` are used to coordinate single-threaded execution + of tasks and RPCs scheduled asynchronously. + + Attributes: + current (deque): a FIFO list of (callback, args, kwds). These callbacks + run immediately when the eventloop runs. + idlers (deque): a FIFO list of (callback, args, kwds). These callbacks + run only when no other RPCs need to be fired first. 
+ For example, AutoBatcher uses idler to fire a batch RPC even before + the batch is full. + inactive (int): Number of consecutive idlers that were noops. Reset + to 0 whenever work is done by any callback, not necessarily by an + idler. + queue (list): a sorted list of (absolute time in sec, callback, args, + kwds), sorted by time. These callbacks run only after the said + time. + rpcs (dict): a map from RPC to (callback, args, kwds). Callback is + called when the RPC finishes. + """ + + __slots__ = ("current", "idlers", "inactive", "queue", "rpcs") + + def __init__(self): + self.current = collections.deque() + self.idlers = collections.deque() + self.inactive = 0 + self.queue = [] + self.rpcs = {} + + def clear(self): + """Remove all pending events without running any.""" + while self.current or self.idlers or self.queue or self.rpcs: + current = self.current + idlers = self.idlers + queue = self.queue + rpcs = self.rpcs + _logging_debug("Clearing stale EventLoop instance...") + if current: + _logging_debug(" current = %s", current) + if idlers: + _logging_debug(" idlers = %s", idlers) + if queue: + _logging_debug(" queue = %s", queue) + if rpcs: + _logging_debug(" rpcs = %s", rpcs) + self.__init__() + current.clear() + idlers.clear() + queue[:] = [] + rpcs.clear() + _logging_debug("Cleared") + + def insort_event_right(self, event): + """Insert event in queue with sorting. - def __init__(self, *args, **kwargs): + This function assumes the queue is already sorted by ``event.when`` and + inserts ``event`` in the queue, maintaining the sort. + + For events with same `event.when`, new events are inserted to the + right, to keep FIFO order. + + Args: + event (_Event): The event to insert. 
+ """ + queue = self.queue + low = 0 + high = len(queue) + while low < high: + mid = (low + high) // 2 + if event.when < queue[mid].when: + high = mid + else: + low = mid + 1 + queue.insert(low, event) + + def queue_call(self, delay, callback, *args, **kwargs): + """Schedule a function call at a specific time in the future. + + Arguments: + delay (float): Time in seconds to delay running the callback. + Times over a billion seconds are assumed to be absolute + timestamps rather than delays. + callback (callable): The function to eventually call. + *args: Positional arguments to be passed to callback. + **kwargs: Keyword arguments to be passed to callback. + """ + if delay is None: + self.current.append((callback, args, kwargs)) + return + + when = time.time() + delay if delay < 1e9 else delay + event = _Event(when, callback, args, kwargs) + self.insort_event_right(event) + + def queue_rpc(self, rpc, callback=None, *args, **kwds): + """Schedule an RPC with an optional callback. + + The caller must have previously sent the call to the service. + The optional callback is called with the remaining arguments. + + .. note:: + + If the rpc is a MultiRpc, the callback will be called once + for each sub-RPC. + """ raise NotImplementedError + def add_idle(self, callback, *args, **kwargs): + """Add an idle callback. + + An idle callback is a low priority task which is executed when + there aren't other events scheduled for immediate execution. + + An idle callback can return True, False or None. These mean: + + - None: remove the callback (don't reschedule) + - False: the callback did no work; reschedule later + - True: the callback did some work; reschedule soon + + If the callback raises an exception, the traceback is logged and + the callback is removed. + + Arguments: + callback (callable): The function to eventually call. + *args: Positional arguments to be passed to callback. + **kwargs: Keyword arguments to be passed to callback. 
+ """ + self.idlers.append((callback, args, kwargs)) + + def run_idle(self): + """Run one of the idle callbacks. + + Returns: + bool: Indicates if an idle callback was called. + """ + if not self.idlers or self.inactive >= len(self.idlers): + return False + idler = self.idlers.popleft() + callback, args, kwargs = idler + _logging_debug("idler: %s", callback.__name__) + result = callback(*args, **kwargs) + + # See add_idle() for meaning of callback return value. + if result is None: + _logging_debug("idler %s removed", callback.__name__) + else: + if result: + self.inactive = 0 + else: + self.inactive += 1 + self.idlers.append(idler) + return True + + def _run_current(self): + """Run one current item. + + Returns: + bool: Indicates if a current callback was called. + """ + if not self.current: + return False + + self.inactive = 0 + callback, args, kwargs = self.current.popleft() + _logging_debug("nowevent: %s", callback.__name__) + callback(*args, **kwargs) + return True + + def run0(self): + """Run one item (a callback or an RPC wait_any). + + Returns: + float: A time to sleep if something happened (may be 0); + None if all queues are empty. + """ + if self._run_current() or self.run_idle(): + return 0 + + delay = None + if self.queue: + delay = self.queue[0][0] - time.time() + if delay <= 0: + self.inactive = 0 + _, callback, args, kwargs = self.queue.pop(0) + _logging_debug("event: %s", callback.__name__) + callback(*args, **kwargs) + return 0 + + if self.rpcs: + raise NotImplementedError + + return delay + + def run1(self): + """Run one item (a callback or an RPC wait_any) or sleep. + + Returns: + bool: True if something happened; False if all queues are empty. 
+ """ + delay = self.run0() + if delay is None: + return False + if delay > 0: + time.sleep(delay) + return True + + def run(self): + """Run until there's nothing left to do.""" + self.inactive = 0 + while True: + if not self.run1(): + break + + +def add_idle(*args, **kwargs): + raise NotImplementedError + def get_event_loop(*args, **kwargs): raise NotImplementedError diff --git a/ndb/tests/unit/test_eventloop.py b/ndb/tests/unit/test_eventloop.py index f3c17a21be0c..7d8d0e9a6e57 100644 --- a/ndb/tests/unit/test_eventloop.py +++ b/ndb/tests/unit/test_eventloop.py @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import collections +import unittest.mock + import pytest from google.cloud.ndb import eventloop @@ -22,16 +25,278 @@ def test___all__(): tests.unit.utils.verify___all__(eventloop) -def test_add_idle(): - with pytest.raises(NotImplementedError): - eventloop.add_idle() +def _Event(when=0, what="foo", args=(), kw={}): + return eventloop._Event(when, what, args, kw) class TestEventLoop: @staticmethod - def test_constructor(): + def _make_one(**attrs): + loop = eventloop.EventLoop() + for name, value in attrs.items(): + setattr(loop, name, value) + return loop + + def test_constructor(self): + loop = self._make_one() + assert loop.current == collections.deque() + assert loop.idlers == collections.deque() + assert loop.inactive == 0 + assert loop.queue == [] + assert loop.rpcs == {} + + def test_clear_all(self): + loop = self._make_one() + loop.current.append("foo") + loop.idlers.append("bar") + loop.queue.append("baz") + loop.rpcs["qux"] = "quux" + loop.clear() + assert not loop.current + assert not loop.idlers + assert not loop.queue + assert not loop.rpcs + + # idempotence (branch coverage) + loop.clear() + assert not loop.current + assert not loop.idlers + assert not loop.queue + assert not loop.rpcs + + def test_clear_current(self): + loop = self._make_one() + loop.current.append("foo") 
+ loop.clear() + assert not loop.current + assert not loop.idlers + assert not loop.queue + assert not loop.rpcs + + def test_clear_idlers(self): + loop = self._make_one() + loop.idlers.append("foo") + loop.clear() + assert not loop.current + assert not loop.idlers + assert not loop.queue + assert not loop.rpcs + + def test_insert_event_right_empty_queue(self): + loop = self._make_one() + event = _Event() + loop.insort_event_right(event) + assert loop.queue == [event] + + def test_insert_event_right_head(self): + loop = self._make_one(queue=[_Event(1, "bar")]) + loop.insort_event_right(_Event(0, "foo")) + assert loop.queue == [_Event(0, "foo"), _Event(1, "bar")] + + def test_insert_event_right_tail(self): + loop = self._make_one(queue=[_Event(0, "foo")]) + loop.insort_event_right(_Event(1, "bar")) + assert loop.queue == [_Event(0, "foo"), _Event(1, "bar")] + + def test_insert_event_right_middle(self): + loop = self._make_one(queue=[_Event(0, "foo"), _Event(2, "baz")]) + loop.insort_event_right(_Event(1, "bar")) + assert loop.queue == [ + _Event(0, "foo"), + _Event(1, "bar"), + _Event(2, "baz"), + ] + + def test_insert_event_right_collision(self): + loop = self._make_one( + queue=[_Event(0, "foo"), _Event(1, "bar"), _Event(2, "baz")] + ) + loop.insort_event_right(_Event(1, "barbar")) + assert loop.queue == [ + _Event(0, "foo"), + _Event(1, "bar"), + _Event(1, "barbar"), + _Event(2, "baz"), + ] + + def test_queue_call_now(self): + loop = self._make_one() + loop.queue_call(None, "foo", "bar", baz="qux") + assert list(loop.current) == [("foo", ("bar",), {"baz": "qux"})] + assert not loop.queue + + @unittest.mock.patch("google.cloud.ndb.eventloop.time") + def test_queue_call_soon(self, time): + loop = self._make_one() + time.time.return_value = 5 + loop.queue_call(5, "foo", "bar", baz="qux") + assert not loop.current + assert loop.queue == [_Event(10, "foo", ("bar",), {"baz": "qux"})] + + @unittest.mock.patch("google.cloud.ndb.eventloop.time") + def 
test_queue_call_absolute(self, time): + loop = self._make_one() + time.time.return_value = 5 + loop.queue_call(10e10, "foo", "bar", baz="qux") + assert not loop.current + assert loop.queue == [_Event(10e10, "foo", ("bar",), {"baz": "qux"})] + + def test_queue_rpc(self): + loop = self._make_one() + with pytest.raises(NotImplementedError): + loop.queue_rpc("rpc") + + def test_add_idle(self): + loop = self._make_one() + loop.add_idle("foo", "bar", baz="qux") + assert list(loop.idlers) == [("foo", ("bar",), {"baz": "qux"})] + + def test_run_idle_no_idlers(self): + loop = self._make_one() + assert loop.run_idle() is False + + def test_run_idle_all_inactive(self): + loop = self._make_one() + loop.add_idle("foo") + loop.inactive = 1 + assert loop.run_idle() is False + + def test_run_idle_remove_callback(self): + callback = unittest.mock.Mock(__name__="callback") + callback.return_value = None + loop = self._make_one() + loop.add_idle(callback, "foo", bar="baz") + loop.add_idle("foo") + assert loop.run_idle() is True + callback.assert_called_once_with("foo", bar="baz") + assert len(loop.idlers) == 1 + assert loop.inactive == 0 + + def test_run_idle_did_work(self): + callback = unittest.mock.Mock(__name__="callback") + callback.return_value = True + loop = self._make_one() + loop.add_idle(callback, "foo", bar="baz") + loop.add_idle("foo") + loop.inactive = 1 + assert loop.run_idle() is True + callback.assert_called_once_with("foo", bar="baz") + assert len(loop.idlers) == 2 + assert loop.inactive == 0 + + def test_run_idle_did_no_work(self): + callback = unittest.mock.Mock(__name__="callback") + callback.return_value = False + loop = self._make_one() + loop.add_idle(callback, "foo", bar="baz") + loop.add_idle("foo") + loop.inactive = 1 + assert loop.run_idle() is True + callback.assert_called_once_with("foo", bar="baz") + assert len(loop.idlers) == 2 + assert loop.inactive == 2 + + def test_run0_nothing_to_do(self): + loop = self._make_one() + assert loop.run0() is None + + 
def test_run0_current(self): + callback = unittest.mock.Mock(__name__="callback") + loop = self._make_one() + loop.queue_call(None, callback, "foo", bar="baz") + loop.inactive = 88 + assert loop.run0() == 0 + callback.assert_called_once_with("foo", bar="baz") + assert len(loop.current) == 0 + assert loop.inactive == 0 + + def test_run0_idler(self): + callback = unittest.mock.Mock(__name__="callback") + loop = self._make_one() + loop.add_idle(callback, "foo", bar="baz") + assert loop.run0() == 0 + callback.assert_called_once_with("foo", bar="baz") + + @unittest.mock.patch("google.cloud.ndb.eventloop.time") + def test_run0_next_later(self, time): + time.time.return_value = 0 + callback = unittest.mock.Mock(__name__="callback") + loop = self._make_one() + loop.queue_call(5, callback, "foo", bar="baz") + loop.inactive = 88 + assert loop.run0() == 5 + callback.assert_not_called() + assert len(loop.queue) == 1 + assert loop.inactive == 88 + + @unittest.mock.patch("google.cloud.ndb.eventloop.time") + def test_run0_next_now(self, time): + time.time.return_value = 0 + callback = unittest.mock.Mock(__name__="callback") + loop = self._make_one() + loop.queue_call(6, "foo") + loop.queue_call(5, callback, "foo", bar="baz") + loop.inactive = 88 + time.time.return_value = 10 + assert loop.run0() == 0 + callback.assert_called_once_with("foo", bar="baz") + assert len(loop.queue) == 1 + assert loop.inactive == 0 + + def test_run0_rpc(self): + loop = self._make_one() + loop.rpcs["foo"] = "bar" with pytest.raises(NotImplementedError): - eventloop.EventLoop() + loop.run0() + + def test_run1_nothing_to_do(self): + loop = self._make_one() + assert loop.run1() is False + + @unittest.mock.patch("google.cloud.ndb.eventloop.time") + def test_run1_has_work_now(self, time): + callback = unittest.mock.Mock(__name__="callback") + loop = self._make_one() + loop.queue_call(None, callback) + assert loop.run1() is True + time.sleep.assert_not_called() + callback.assert_called_once_with() + + 
@unittest.mock.patch("google.cloud.ndb.eventloop.time") + def test_run1_has_work_later(self, time): + time.time.return_value = 0 + callback = unittest.mock.Mock(__name__="callback") + loop = self._make_one() + loop.queue_call(5, callback) + assert loop.run1() is True + time.sleep.assert_called_once_with(5) + callback.assert_not_called() + + @unittest.mock.patch("google.cloud.ndb.eventloop.time") + def test_run(self, time): + time.time.return_value = 0 + + def mock_sleep(seconds): + time.time.return_value += seconds + + time.sleep = mock_sleep + idler = unittest.mock.Mock(__name__="idler") + idler.return_value = None + runnow = unittest.mock.Mock(__name__="runnow") + runlater = unittest.mock.Mock(__name__="runlater") + loop = self._make_one() + loop.add_idle(idler) + loop.queue_call(None, runnow) + loop.queue_call(5, runlater) + loop.run() + idler.assert_called_once_with() + runnow.assert_called_once_with() + runlater.assert_called_once_with() + + +def test_add_idle(): + with pytest.raises(NotImplementedError): + eventloop.add_idle() def test_get_event_loop():