From e35d98613605ae6c428b68ff51bc789b287e7ad0 Mon Sep 17 00:00:00 2001 From: Michiel De Smet Date: Sat, 13 Aug 2022 01:38:04 +0200 Subject: [PATCH] Acknowledge reception of data in `TrinoResult` --- trino/client.py | 20 +++++++++----------- trino/dbapi.py | 2 +- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/trino/client.py b/trino/client.py index 211ce9c0..b8dafcd9 100644 --- a/trino/client.py +++ b/trino/client.py @@ -594,7 +594,8 @@ class TrinoResult(object): def __init__(self, query, rows=None, experimental_python_types: bool = False): self._query = query - self._rows = rows or [] + # Initial rows from the first POST request + self._rows = rows self._rownumber = 0 self._experimental_python_types = experimental_python_types @@ -603,20 +604,17 @@ def rownumber(self) -> int: return self._rownumber def __iter__(self): - # Initial fetch from the first POST request - for row in self._rows: - self._rownumber += 1 - yield self._map_row(self._experimental_python_types, row, self._query.columns) - self._rows = None - - # Subsequent fetches from GET requests until next_uri is empty. - while not self._query.finished: - rows = self._query.fetch() - for row in rows: + # A query only transitions to a FINISHED state when the results are fully consumed: + # The reception of the data is acknowledged by calling the next_uri before exposing the data through dbapi. + while not self._query.finished or self._rows is not None: + next_rows = self._query.fetch() if not self._query.finished else None + for row in self._rows: self._rownumber += 1 logger.debug("row %s", row) yield self._map_row(self._experimental_python_types, row, self._query.columns) + self._rows = next_rows + @property def response_headers(self): return self._query.response_headers diff --git a/trino/dbapi.py b/trino/dbapi.py index 44813168..70fb43bb 100644 --- a/trino/dbapi.py +++ b/trino/dbapi.py @@ -322,7 +322,7 @@ def _prepare_statement(self, operation, statement_name): operation=operation ) - # Send prepare statement. Copy the _request object to avoid poluting the + # Send prepare statement. Copy the _request object to avoid polluting the # one that is going to be used to execute the actual operation. query = trino.client.TrinoQuery(copy.deepcopy(self._request), sql=sql, experimental_python_types=self._experimental_pyton_types)