Optimize the experimental python types flag #206
trino/client.py
Outdated
    def _map_row(self, row):
        return [self._map_value(value, self.mapping_funcs[idx]) for idx, value in enumerate(row)]

    def _map_value(self, value, col_mapping_func):
        if value is None:
            return None

        try:
            return col_mapping_func(value)
        except ValueError as e:
            error_str = f"Could not convert '{value}' into the associated python type"
            raise trino.exceptions.TrinoDataError(error_str) from e
Move this to a separate class, RowMapper. If no col_mapping_funcs are provided in the constructor, return the row values as is.
Done
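For readers following the thread, here is a minimal sketch of what such a class could look like. It is not the code that was merged: the constructor signature and the `map` method name are assumptions, and a plain `ValueError` stands in for `trino.exceptions.TrinoDataError` so the snippet runs without the trino package.

```python
from typing import Any, Callable, List, Optional, Sequence


class RowMapper:
    """Hypothetical sketch: applies one pre-computed function per column."""

    def __init__(self, col_mapping_funcs: Optional[Sequence[Callable[[Any], Any]]] = None):
        self.col_mapping_funcs = list(col_mapping_funcs or [])

    def map(self, rows: List[List[Any]]) -> List[List[Any]]:
        # Without mapping functions, hand the rows back untouched.
        if not self.col_mapping_funcs:
            return rows
        return [self._map_row(row) for row in rows]

    def _map_row(self, row: List[Any]) -> List[Any]:
        return [self._map_value(value, func)
                for value, func in zip(row, self.col_mapping_funcs)]

    def _map_value(self, value: Any, func: Callable[[Any], Any]) -> Any:
        # SQL NULL stays None regardless of the column type.
        if value is None:
            return None
        try:
            return func(value)
        except ValueError as e:
            # The PR raises trino.exceptions.TrinoDataError here.
            error_str = f"Could not convert '{value}' into the associated python type"
            raise ValueError(error_str) from e
```

With this shape, `RowMapper([int, float]).map(rows)` converts each column, while `RowMapper().map(rows)` returns the rows unchanged.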
trino/client.py
Outdated
@@ -776,6 +701,11 @@ def fetch(self) -> List[List[Any]]:
        self._response_headers = response.headers
        if status.next_uri is None:
            self._finished = True

        if self._experimental_python_types:
            factory = RowMapperFactory(status.columns)
RowMapperFactory().create(...) should only be called once, when the columns are known. Maybe move this logic to the _update_state method, and store the created RowMapper instance in an instance field of TrinoQuery.
I modified the code as best I could, because the execute() function expects some rows (maybe), yet the columns may not be known at that point.
Yeah, the columns are only known when Trino starts sending data, which can be in the first execute or in any of the subsequent fetch calls. We should delay the creation of the RowMapper until the columns are present and reuse that cached RowMapper on the fetched data to get the performance benefit.
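A rough sketch of that "create late, then cache" idea, independent of the real TrinoQuery class. `QueryState`, `build_int_mapper` and the `builds` counter are hypothetical names used only for illustration; `build_int_mapper` stands in for `RowMapperFactory().create(...)`.

```python
def build_int_mapper(columns):
    # Stand-in for RowMapperFactory().create(...): every column maps to int.
    funcs = [int for _ in columns]
    return lambda rows: [[f(v) for f, v in zip(funcs, row)] for row in rows]


class QueryState:
    """Hypothetical holder for the cached row mapper."""

    def __init__(self, build_mapper):
        self._build_mapper = build_mapper
        self._row_mapper = None
        self.builds = 0  # counts how often the mapper was built

    def update(self, columns, rows):
        # Build the mapper the first time columns show up, then reuse it.
        if self._row_mapper is None and columns is not None:
            self._row_mapper = self._build_mapper(columns)
            self.builds += 1
        # While the query is still queued there are no columns and no mapper,
        # so the (empty) rows pass through unchanged.
        if self._row_mapper is None:
            return rows
        return self._row_mapper(rows)
```

Calling `update` once without columns and then several times with columns builds the mapper exactly once, which is where the performance benefit comes from.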
trino/client.py
Outdated
        if self._experimental_python_types:
            factory = RowMapperFactory(status.columns)
            rows = factory.map_rows(status.rows)
        else:
            rows = status.rows

        self._result = TrinoResult(self, rows)
Also applies to the fetch method.
-        if self._experimental_python_types:
-            factory = RowMapperFactory(status.columns)
-            rows = factory.map_rows(status.rows)
-        else:
-            rows = status.rows
-        self._result = TrinoResult(self, rows)
+        assert self._row_mapper is not None
+        self._result = TrinoResult(self, self._row_mapper(rows))
Actually, if we want to call RowMapperFactory() only once we know the columns, then we shouldn't call it in execute(). We have no guarantee that the columns are available at that point.
Correct, that's why I suggested adding this to the _update_state method, so that it is called only when you actually have the columns.
Done
Could you please add some docstrings/comments to the new classes?
    def _col_func(self, column):
        col_type = column['rawType']

        if col_type == 'array':
Can this be split into separate functions per data type?
That way we could write unit tests. That would be especially beneficial for the date and time types.
Done
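To illustrate why one function per data type helps testing, here is a small sketch. The function names are hypothetical, and the input formats (`YYYY-MM-DD` for dates, `HH:MM:SS[.fff]` for times, with at most six fractional digits) are assumptions about what the server sends rather than something stated in this thread.

```python
from datetime import date, datetime, time
from decimal import Decimal


def map_date(value: str) -> date:
    # Assumes dates arrive as ISO-style 'YYYY-MM-DD' strings.
    return datetime.strptime(value, "%Y-%m-%d").date()


def map_time(value: str) -> time:
    # Assumes an optional fractional part of at most six digits.
    pattern = "%H:%M:%S.%f" if "." in value else "%H:%M:%S"
    return datetime.strptime(value, pattern).time()


def map_decimal(value: str) -> Decimal:
    return Decimal(value)


def test_map_date():
    assert map_date("2001-08-22") == date(2001, 8, 22)


def test_map_time_with_and_without_fraction():
    assert map_time("01:23:45.123") == time(1, 23, 45, 123000)
    assert map_time("01:23:45") == time(1, 23, 45)


def test_map_decimal():
    assert map_decimal("3.14") == Decimal("3.14")
```

Each mapper can now be exercised without a running Trino server or a TrinoQuery instance.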
trino/client.py
Outdated
        if ms_size > 0:
            pattern += ".%f"

        if col_type.startswith('timestamp'):
Let's try to avoid these nested conditionals; it would improve code readability.
Done
trino/client.py
Outdated
        col_type = column['rawType']

        if col_type == 'array':
            elt_mapping_func = self._col_func(column['arguments'][0]['value'])
What does the name elt_mapping_func mean?
That's the mapping function for each element of the array.
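The element mapping can be sketched as a small recursive function. This is a simplified stand-in for `_col_func`, not the PR's implementation: it only knows `array` and `decimal`, treats every other type as pass-through, and the `None` guard for array elements is an assumption. The column dictionaries follow the `rawType` / `arguments[0]['value']` shape used in the diff.

```python
from decimal import Decimal


def col_func(column):
    """Return a mapping function for one column type signature (sketch)."""
    raw_type = column["rawType"]
    if raw_type == "array":
        # The element type signature is nested inside the first argument.
        elt_mapping_func = col_func(column["arguments"][0]["value"])
        return lambda values: [None if v is None else elt_mapping_func(v)
                               for v in values]
    if raw_type == "decimal":
        return Decimal
    # Every other type passes through unchanged in this sketch.
    return lambda value: value


# array(array(decimal)): the recursion resolves the inner types first.
decimal_type = {"rawType": "decimal", "arguments": []}
inner_array = {"rawType": "array", "arguments": [{"value": decimal_type}]}
outer_array = {"rawType": "array", "arguments": [{"value": inner_array}]}
mapped = col_func(outer_array)([["1.5", None], []])
```

Because the element function is resolved once when the column function is built, nested arrays do not pay for type inspection on every value.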
trino/client.py
Outdated
        return self._result

    def _update_state(self, status):
        self._stats.update(status.stats)
        self._update_type = status.update_type
        self._row_mapper = RowMapperFactory().create(columns=status.columns,
_update_state will be called on every request. To get the benefit of initializing the row mapper only once, we need to ensure it is only created once.
-        self._row_mapper = RowMapperFactory().create(columns=status.columns,
+        if self._row_mapper is None and status.columns is not None:
+            self._row_mapper = RowMapperFactory().create(columns=status.columns,
That way RowMapperFactory().create() would only be called once, when the columns are set.
Done
trino/client.py
Outdated
@@ -776,7 +701,8 @@ def fetch(self) -> List[List[Any]]:
         self._response_headers = response.headers
         if status.next_uri is None:
             self._finished = True
-        return status.rows
+
+        return self._row_mapper.map(status.rows) if self._row_mapper else status.rows
At this stage self._row_mapper should never be undefined. When rows are returned, the columns are also known.
Actually, _row_mapper may NOT be defined at this point. This happens when the query is queued and we're still waiting for results.
trino/client.py
Outdated
        return self._result

    def _update_state(self, status):
        self._stats.update(status.stats)
        self._update_type = status.update_type
        if not self._row_mapper:
I think it's possible that the columns are not yet known at this point. The row mapper would not be recreated in case the columns arrive later.
That is correct, but then the RowMapperFactory will just return None.
Let me defer the final review to @hashhar.
A quick pass.
Some more comments.
I'd recommend focusing on everything except temporal types for now since temporal types are a rabbit hole and need to be reviewed in more depth.
        .add_field(sql="cast(null AS VARBINARY)", python=None) \
        .add_field(sql="cast('{}' AS JSON)", python='"{}"') \
        .add_field(sql="cast('null' AS JSON)", python='"null"') \
        .add_field(sql="cast(null AS JSON)", python=None) \
Move varbinary and JSON into their own test methods.
Also, let's test some varbinary values which don't represent valid strings.
Also, an empty (not null) varbinary is a special case as well.
For JSON we should also test float values, strings, strings with special characters (", ', :), and an empty array.
Actually, right now we don't seem to apply any special treatment to JSON.
Done: each type is now split into its own test.
Either add TODO to add the missing tests or add the tests for the other values and assert that they fail.
Which tests are you talking about here? JSON tests (even though we just convert them to strings), varbinary values which represent invalid strings (e.g. non-UTF-8?), or something else?
Both.
- Add a test for varbinary which doesn't represent a string
- Add tests for JSON with:
  - JSON '3.14'
  - JSON 'a string'
  - JSON 'a " complex ' string :' (I'm not sure if I escaped that ' correctly)
  - JSON '[]'
Even if a test fails, we should assert on that failure so that when something like #214 changes behaviour we know to update the tests. It also serves as documentation of the "issues" we need to fix.
I'm not entirely sure I know exactly what you have in mind but I've added such tests.
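One possible reading of the request, sketched as tests that pin down the current behaviour so that a later change shows up as a failing test. The two mapper functions are hypothetical stand-ins: `map_varbinary` assumes varbinary values arrive base64-encoded in the JSON response, and `map_json` mirrors the "no special treatment" behaviour mentioned earlier in the thread. The real tests in the PR run SQL against a server and are not reproduced here.

```python
import base64


def map_varbinary(value: str) -> bytes:
    # Assumption: varbinary arrives base64-encoded in the JSON response.
    return base64.b64decode(value.encode("utf8"))


def map_json(value: str) -> str:
    # Mirrors the behaviour described in the thread: JSON stays a string.
    return value


def test_varbinary_that_is_not_valid_utf8():
    raw = b"\xff\xfe\x00"
    assert map_varbinary(base64.b64encode(raw).decode("ascii")) == raw


def test_empty_varbinary_is_not_null():
    assert map_varbinary("") == b""


def test_json_values_are_left_as_strings():
    # Documents today's behaviour; update this test once JSON is parsed.
    for value in ['3.14', '"a string"', '[]']:
        assert map_json(value) == value
```

The JSON test intentionally asserts the unconverted result, so it doubles as a record of the open issue.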
        return RowMapper()

    def _col_func(self, column):
        col_type = column['rawType']
What other fields does the typeSignature have available? I'm asking because startswith could match the wrong type in the presence of user-contributed types.
typeSignature has rawType and arguments fields (the latter is used for types such as decimal(3, 1)).
Then can the checks below use rawType == 'decimal' instead of the existing rawType.startswith('decimal'), etc.?
I've seen cases where the value is decimal(18, 18)
👀 Thanks. I'll need to re-read some code on the server side to have better understanding. 🙂
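One way to reconcile the two concerns, sketched under the unverified assumption (taken from the comment above) that the type name can sometimes carry its parameters, as in `decimal(18, 18)`. The helper name `base_type` and the `decimal_money` type are made up for illustration.

```python
def base_type(raw_type: str) -> str:
    """Strip any '(...)' parameter list so the name can be compared exactly."""
    return raw_type.split("(", 1)[0].strip().lower()


def is_decimal(raw_type: str) -> bool:
    # Exact match on the base name, so a user-contributed type such as the
    # made-up 'decimal_money' is not mistaken for decimal.
    return base_type(raw_type) == "decimal"
```

A prefix check would accept `decimal_money`, whereas this comparison accepts `decimal` and `decimal(18, 18)` only.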
            return lambda val: \
                [datetime.strptime(val[:dt_size] + val[dt_tz_offset:], pattern + ' %z')
                 if tz.startswith('+') or tz.startswith('-')
                 else datetime.strptime(dt[:dt_size] + dt[dt_tz_offset:], pattern)
                 .replace(tzinfo=pytz.timezone(tz))
                 for dt, tz in [val.rsplit(' ', 1)]][0]
While this is very concise, a proper function would be more readable.
Would it? The function relies on multiple variables computed before the lambda (e.g. dt_tz_offset, tz, etc.)
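For comparison, a named inner function can capture the pre-computed values through a closure just like the lambda does. The sketch below is a simplification, not a drop-in replacement: `make_timestamp_tz_parser` is a hypothetical name, it uses the standard-library `zoneinfo` module (Python 3.9+) instead of `pytz`, and it leaves out the slicing that trims fractional digits (`dt_size`, `dt_tz_offset`), so it assumes at most six fractional digits.

```python
from datetime import datetime
from zoneinfo import ZoneInfo


def make_timestamp_tz_parser(pattern: str):
    """Return a parser for values shaped like '<timestamp> <zone>'."""

    def parse(value: str) -> datetime:
        dt, tz = value.rsplit(" ", 1)
        if tz.startswith(("+", "-")):
            # Numeric offset such as +05:30: let strptime handle it via %z.
            return datetime.strptime(f"{dt} {tz}", pattern + " %z")
        # Named zone such as Europe/Paris.
        return datetime.strptime(dt, pattern).replace(tzinfo=ZoneInfo(tz))

    return parse


# The pattern is computed once per column; parse() is then called per value.
parse = make_timestamp_tz_parser("%Y-%m-%d %H:%M:%S.%f")
parsed = parse("2001-08-22 03:04:05.321 +05:30")
```

The closure keeps `pattern` available to `parse` without passing it on every call, which addresses the concern about variables computed before the lambda.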
LGTM.
@ebyhr FYI I've asked timestamp related tests to be added in separate PR to help with reviewing them.
@lpoulain Can you squash the "fixups" together?
fe93827 to 33a7cd5
@hashhar done
@lpoulain "All commits" do not need to be squashed. Just the logically related ones. For future PRs see https://github.com/trinodb/trino/blob/master/.github/DEVELOPMENT.md#git-merge-strategy
@lpoulain Please reword the commit message as:
Looks good but please reword commit message to have useful information.
Instead of checking the type for each row, check the type once for each fetch() call and compute a list of lambdas which are to be applied to the values from each row. A new RowMapperFactory class is created to wrap this behavior. The experimental_python_types flag is now processed in the TrinoQuery class instead of the TrinoResult class. Type mapping tests for each lambda which maps rows to Python types are added.
33a7cd5 to f0da713
@hashhar done
Optimizes the experimental_python_types flag. Instead of checking the type for each row, check the type once for each fetch() call and compute a list of lambdas which are to be applied to the values from each row.
- A RowMapperFactory class was created to wrap this behavior. The columns content from the status is analyzed and a list of lambda functions is created (one lambda per column).
- The experimental_python_types flag is now processed in the TrinoQuery class instead of the TrinoResult class.