Skip to content

Commit

Permalink
Merge pull request #2531 from dhermes/revamp-iterator
Browse files Browse the repository at this point in the history
Re-architect Iterator class.
  • Loading branch information
dhermes authored Oct 14, 2016
2 parents 484d469 + d9824c6 commit 9a5ddd5
Show file tree
Hide file tree
Showing 11 changed files with 577 additions and 437 deletions.
260 changes: 154 additions & 106 deletions core/google/cloud/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@
These iterators simplify the process of paging through API responses
where the response is a list of results with a ``nextPageToken``.
To make an iterator work, just override the ``get_items_from_response``
method so that given a response (containing a page of results) it parses
those results into an iterable of the actual objects you want::
To make an iterator work, just override the ``PAGE_CLASS`` class
attribute so that a response (containing a page of results) can
be parsed into an iterable page of the actual objects you want::
class MyPage(Page):
def _item_to_value(self, item):
my_item = MyItemClass(other_arg=True)
my_item._set_properties(item)
return my_item
class MyIterator(Iterator):
def get_items_from_response(self, response):
items = response.get('items', [])
for item in items:
my_item = MyItemClass(other_arg=True)
my_item._set_properties(item)
yield my_item
PAGE_CLASS = MyPage
You then can use this to get **all** the results from a resource::
Expand All @@ -38,25 +42,114 @@ def get_items_from_response(self, response):
you find what you're looking for (resulting in possibly fewer
requests)::
>>> for item in MyIterator(...):
>>> print(item.name)
>>> if not item.is_valid:
>>> break
>>> for my_item in MyIterator(...):
... print(my_item.name)
... if not my_item.is_valid:
... break
When iterating, not every new item will send a request to the server.
To monitor these requests, track the current page of the iterator::
>>> iterator = MyIterator(...)
>>> iterator.page_number
0
>>> next(iterator)
<MyItemClass at 0x7f1d3cccf690>
>>> iterator.page_number
1
>>> iterator.page.remaining
1
>>> next(iterator)
<MyItemClass at 0x7f1d3cccfe90>
>>> iterator.page.remaining
0
>>> next(iterator)
<MyItemClass at 0x7f1d3cccffd0>
>>> iterator.page_number
2
>>> iterator.page.remaining
19
"""


import six


class Page(object):
    """Single page of results in an iterator.

    A page wraps the raw items found under ``ITEMS_KEY`` in an API
    response and hands them out one at a time, converting each with
    :meth:`_item_to_value`.

    :type parent: :class:`Iterator`
    :param parent: The iterator that owns the current page.

    :type response: dict
    :param response: The JSON API response for a page.
    """

    # Key in the response under which the page's items are stored.
    ITEMS_KEY = 'items'

    def __init__(self, parent, response):
        self._parent = parent
        # A missing key and an explicit null both mean "no items".
        items = response.get(self.ITEMS_KEY) or ()
        self._num_items = len(items)
        self._remaining = self._num_items
        self._item_iter = iter(items)

    @property
    def num_items(self):
        """Total items in the page.

        :rtype: int
        :returns: The number of items in this page of items.
        """
        return self._num_items

    @property
    def remaining(self):
        """Remaining items in the page.

        :rtype: int
        :returns: The number of items remaining in this page.
        """
        return self._remaining

    def __iter__(self):
        """The :class:`Page` is an iterator."""
        return self

    def _item_to_value(self, item):
        """Convert a raw item from the response into a native object.

        This method (along with the constructor) is the workhorse
        of this class. Subclasses will need to implement this method.

        :type item: dict
        :param item: An item to be converted to a native object.

        :raises NotImplementedError: Always
        """
        raise NotImplementedError

    def next(self):
        """Get the next value in the page.

        :rtype: object
        :returns: The result of :meth:`_item_to_value` for the next item.
        :raises StopIteration: If every item in the page has been consumed.
        """
        # ``next`` resolves to the builtin here: names bound in the class
        # body are not visible from inside method bodies.
        item = next(self._item_iter)
        # The raw item has now left the underlying iterator, so count it
        # as consumed *before* converting it. If the conversion raised
        # first, ``remaining`` would stay too high and the page would
        # look non-empty after it had actually been exhausted.
        self._remaining -= 1
        return self._item_to_value(item)

    # Alias needed for Python 2/3 support.
    __next__ = next


class Iterator(object):
"""A generic class for iterating through Cloud JSON APIs list responses.
Subclasses need to override ``PAGE_CLASS``.
:type client: :class:`google.cloud.client.Client`
:param client: The client, which owns a connection to make requests.
:type path: str
:param path: The path to query for the list of items.
:type page_token: str
:param page_token: (Optional) A token identifying a page in a result set.
Expand All @@ -65,59 +158,74 @@ class Iterator(object):
:type extra_params: dict or None
:param extra_params: Extra query string parameters for the API call.
:type path: str
:param path: The path to query for the list of items.
"""

PAGE_TOKEN = 'pageToken'
MAX_RESULTS = 'maxResults'
RESERVED_PARAMS = frozenset([PAGE_TOKEN, MAX_RESULTS])
PAGE_CLASS = Page
PATH = None

def __init__(self, client, path, page_token=None,
max_results=None, extra_params=None):
def __init__(self, client, page_token=None, max_results=None,
extra_params=None, path=None):
self.extra_params = extra_params or {}
self._verify_params()
self.max_results = max_results
self.client = client
self.path = path
self.path = path or self.PATH
# The attributes below will change over the life of the iterator.
self.page_number = 0
self.next_page_token = page_token
self.max_results = max_results
self.num_results = 0
self.extra_params = extra_params or {}
self._page = None

def _verify_params(self):
"""Verifies the parameters don't use any reserved parameter.
:raises ValueError: If a reserved parameter is used.
"""
reserved_in_use = self.RESERVED_PARAMS.intersection(
self.extra_params)
if reserved_in_use:
raise ValueError(('Using a reserved parameter',
reserved_in_use))
self._curr_items = iter(())
raise ValueError('Using a reserved parameter',
reserved_in_use)

@property
def page(self):
"""The current page of results that has been retrieved.
:rtype: :class:`Page`
:returns: The page of items that has been retrieved.
"""
return self._page

def __iter__(self):
"""The :class:`Iterator` is an iterator."""
return self

def _update_items(self):
"""Replace the current items iterator.
Intended to be used when the current items iterator is exhausted.
def _update_page(self):
"""Replace the current page.
After replacing the iterator, consumes the first value to make sure
it is valid.
Does nothing if the current page is non-null and has items
remaining.
:rtype: object
:returns: The first item in the next iterator.
:raises: :class:`~exceptions.StopIteration` if there is no next page.
"""
if self.page is not None and self.page.remaining > 0:
return
if self.has_next_page():
response = self.get_next_page_response()
items = self.get_items_from_response(response)
self._curr_items = iter(items)
return six.next(self._curr_items)
response = self._get_next_page_response()
self._page = self.PAGE_CLASS(self, response)
else:
raise StopIteration

def next(self):
"""Get the next value in the iterator."""
try:
item = six.next(self._curr_items)
except StopIteration:
item = self._update_items()

self._update_page()
item = six.next(self.page)
self.num_results += 1
return item

Expand All @@ -139,7 +247,7 @@ def has_next_page(self):

return self.next_page_token is not None

def get_query_params(self):
def _get_query_params(self):
"""Getter for query parameters for the next request.
:rtype: dict
Expand All @@ -153,17 +261,15 @@ def get_query_params(self):
result.update(self.extra_params)
return result

def get_next_page_response(self):
def _get_next_page_response(self):
"""Requests the next page from the path provided.
:rtype: dict
:returns: The parsed JSON response of the next page's contents.
"""
if not self.has_next_page():
raise RuntimeError('No more pages. Try resetting the iterator.')

response = self.client.connection.api_request(
method='GET', path=self.path, query_params=self.get_query_params())
method='GET', path=self.path,
query_params=self._get_query_params())

self.page_number += 1
self.next_page_token = response.get('nextPageToken')
Expand All @@ -175,62 +281,4 @@ def reset(self):
self.page_number = 0
self.next_page_token = None
self.num_results = 0

def get_items_from_response(self, response):
"""Factory method called while iterating. This should be overridden.
This method should be overridden by a subclass. It should
accept the API response of a request for the next page of items,
and return a list (or other iterable) of items.
Typically this method will construct a Bucket or a Blob from the
page of results in the response.
:type response: dict
:param response: The response of asking for the next page of items.
"""
raise NotImplementedError


class MethodIterator(object):
    """Iterate over items produced by repeated calls to a ``list_foo`` method.

    Each call to ``method`` is expected to return a pair
    ``(items, next_page_token)``; the items are yielded one by one and
    the token is fed into the following call.

    :type method: instance method
    :param method: ``list_foo`` method of a domain object, taking as arguments
                   ``page_token``, ``page_size``, and optional additional
                   keyword arguments.

    :type page_token: string or ``NoneType``
    :param page_token: Initial page token to pass. If ``None``, fetch the
                       first page from the ``method`` API call.

    :type page_size: integer or ``NoneType``
    :param page_size: Maximum number of items to return from the ``method``
                      API call; if ``None``, uses the default for the API.

    :type max_calls: integer or ``NoneType``
    :param max_calls: Maximum number of times to make the ``method``
                      API call; if ``None``, applies no limit.

    :type kw: dict
    :param kw: optional keyword arguments to be passed to ``method``.
    """

    def __init__(self, method, page_token=None, page_size=None,
                 max_calls=None, **kw):
        self._method = method
        self._kw = kw
        self._page_size = page_size
        self._max_calls = max_calls
        # State that advances as pages are fetched.
        self._token = page_token
        self._page_num = 0

    def __iter__(self):
        """Yield every item, fetching pages from ``method`` on demand.

        Stops once ``method`` reports no further page token or the
        ``max_calls`` budget has been used up.
        """
        while True:
            limit = self._max_calls
            if limit is not None and not self._page_num < limit:
                return

            batch, next_token = self._method(
                page_token=self._token, page_size=self._page_size,
                **self._kw)
            for entry in batch:
                yield entry

            # The final page (no token) ends iteration without touching
            # the stored token or the page counter.
            if next_token is None:
                return
            self._page_num += 1
            self._token = next_token
self._page = None
Loading

0 comments on commit 9a5ddd5

Please sign in to comment.