Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP. First pass at ndb.eventloop.EventLoop implementation. #6353

Merged
merged 9 commits into from
Nov 2, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
First pass at ndb.eventloop.EvenLoop implementation. Does not impleme…
…nt RPC

integration.
  • Loading branch information
Chris Rossi committed Oct 31, 2018
commit c69a532b64d1d6f98e6ce70a17d2691cba68ec5c
223 changes: 218 additions & 5 deletions ndb/src/google/cloud/ndb/eventloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

This should handle both asynchronous ``ndb`` objects and arbitrary callbacks.
"""

import collections
import time

__all__ = [
"add_idle",
Expand All @@ -30,16 +31,228 @@
]


def add_idle(*args, **kwargs):
raise NotImplementedError
def _noop(*args, **kw):
"""Do nothing."""

# TODO: Use utils.logging_debug when implemented

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

_logging_debug = _noop


_Event = collections.namedtuple('_Event', (
'when', 'callback', 'args', 'kwargs'))


class _Clock(object):
"""A clock to determine the current time, in seconds."""

def now(self):
"""Returns the number of seconds since epoch."""
return time.time()

def sleep(self, seconds):
"""Sleeps for the desired number of seconds."""
time.sleep(seconds)


class EventLoop:
__slots__ = ()
"""Constructor.

This comment was marked as spam.

This comment was marked as spam.


Args:
clock: an eventloop._Clock object. Defaults to a time-based clock.

def __init__(self, *args, **kwargs):
Fields:

This comment was marked as spam.

This comment was marked as spam.

current: a FIFO list of (callback, args, kwds). These callbacks
run immediately when the eventloop runs.

This comment was marked as spam.

This comment was marked as spam.

idlers: a FIFO list of (callback, args, kwds). Thes callbacks
run only when no other RPCs need to be fired first.
For example, AutoBatcher uses idler to fire a batch RPC even before
the batch is full.
queue: a sorted list of (absolute time in sec, callback, args, kwds),
sorted by time. These callbacks run only after the said time.
rpcs: a map from rpc to (callback, args, kwds). Callback is called
when the rpc finishes.
"""
__slots__ = ('clock', 'current', 'idlers', 'inactive', 'queue', 'rpcs')

This comment was marked as spam.

This comment was marked as spam.


def __init__(self, clock=None):
self.clock = clock if clock else _Clock()

This comment was marked as spam.

This comment was marked as spam.

self.current = collections.deque()
self.idlers = collections.deque()
self.inactive = 0
self.queue = []
self.rpcs = {}

def clear(self):

This comment was marked as spam.

"""Remove all pending events without running any."""
while self.current or self.idlers or self.queue or self.rpcs:
current = self.current
idlers = self.idlers
queue = self.queue
rpcs = self.rpcs
_logging_debug('Clearing stale EventLoop instance...')
if current:
_logging_debug(' current = %s', current)
if idlers:
_logging_debug(' idlers = %s', idlers)
if queue:
_logging_debug(' queue = %s', queue)
if rpcs:
_logging_debug(' rpcs = %s', rpcs)
self.__init__(self.clock)

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

current.clear()
idlers.clear()
queue[:] = []
rpcs.clear()
_logging_debug('Cleared')

def insort_event_right(self, event):

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

"""Insert event in queue, and keep it sorted by `event.when` assuming
queue is sorted.

This comment was marked as spam.


For events with same `event.when`, new events are inserted to the
right, to keep FIFO order.

Args:
event: a (time in sec since unix epoch, callback, args, kwargs)

This comment was marked as spam.

tuple.
"""
queue = self.queue
lo = 0
hi = len(queue)

This comment was marked as spam.

while lo < hi:
mid = (lo + hi) // 2
if event.when < queue[mid].when:
hi = mid
else:
lo = mid + 1
queue.insert(lo, event)

def queue_call(self, delay, callback, *args, **kwargs):
"""Schedule a function call at a specific time in the future."""

This comment was marked as spam.

if delay is None:
self.current.append((callback, args, kwargs))
return

# Times over a billion seconds are assumed to be absolute

This comment was marked as spam.

when = self.clock.now() + delay if delay < 1e9 else delay
event = _Event(when, callback, args, kwargs)
self.insort_event_right(event)

def queue_rpc(self, rpc, callback=None, *args, **kwds):
"""Schedule an RPC with an optional callback.

The caller must have previously sent the call to the service.
The optional callback is called with the remaining arguments.

NOTE: If the rpc is a MultiRpc, the callback will be called once

This comment was marked as spam.

This comment was marked as spam.

for each sub-RPC. TODO: Is this a good idea?
"""
# TODO Integrate with gRPC

This comment was marked as spam.

This comment was marked as spam.

raise NotImplementedError

def add_idle(self, callback, *args, **kwargs):
"""Add an idle callback.

An idle callback can return True, False or None. These mean:

- None: remove the callback (don't reschedule)
- False: the callback did no work; reschedule later
- True: the callback did some work; reschedule soon

If the callback raises an exception, the traceback is logged and
the callback is removed.

This comment was marked as spam.

This comment was marked as spam.

"""
self.idlers.append((callback, args, kwargs))

def run_idle(self):
"""Run one of the idle callbacks.

Returns:
True if one was called, False if no idle callback was called.

This comment was marked as spam.

"""
if not self.idlers or self.inactive >= len(self.idlers):
return False
idler = self.idlers.popleft()
callback, args, kwargs = idler
_logging_debug('idler: %s', callback.__name__)
result = callback(*args, **kwargs)

# See add_idle() for meaning of callback return value.
if result is None:
_logging_debug('idler %s removed', callback.__name__)
else:
if result:
self.inactive = 0
else:
self.inactive += 1
self.idlers.append(idler)
return True

def _run_current(self):
"""Run one current item.

Returns:
True if one was called, False if no callback was called.
"""
if not self.current:
return False

self.inactive = 0
callback, args, kwargs = self.current.popleft()
_logging_debug('nowevent: %s', callback.__name__)
callback(*args, **kwargs)
return True

def run0(self):
"""Run one item (a callback or an RPC wait_any).

Returns:
A time to sleep if something happened (may be 0);
None if all queues are empty.
"""
if self._run_current() or self.run_idle():
return 0

delay = None
if self.queue:
delay = self.queue[0][0] - self.clock.now()
if delay <= 0:
self.inactive = 0
_, callback, args, kwargs = self.queue.pop(0)
_logging_debug('event: %s', callback.__name__)
callback(*args, **kwargs)
# TODO: What if it raises an exception?

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

return 0

if self.rpcs:
raise NotImplementedError

This comment was marked as spam.

This comment was marked as spam.


return delay

def run1(self):
"""Run one item (a callback or an RPC wait_any) or sleep.

Returns:
True if something happened; False if all queues are empty.
"""
delay = self.run0()
if delay is None:
return False
if delay > 0:
self.clock.sleep(delay)
return True

def run(self):
"""Run until there's nothing left to do."""
self.inactive = 0
while True:
if not self.run1():
break


def add_idle(*args, **kwargs):
raise NotImplementedError


def get_event_loop(*args, **kwargs):
raise NotImplementedError
Expand Down
Loading