Skip to content

Commit

Permalink
WIP attempt to fix tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
mmontagna committed Feb 4, 2020
1 parent 55cd147 commit 3fd2657
Showing 1 changed file with 26 additions and 17 deletions.
43 changes: 26 additions & 17 deletions lore/io/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import sys
import tempfile
import threading
import psycopg2

from datetime import datetime

Expand Down Expand Up @@ -74,6 +75,14 @@ def after_replace(func):
sqlalchemy_logger.setLevel(log_levels.get(lore.env.NAME, logging.WARN))


class ResultWrapper(object):
# Used to make psycopg2 results compatible
# with the interface provided by Connection.execute
def __init__(self, results):
self.results = results
def fetchall(self):
return self.results

class Connection(object):
UNLOAD_PREFIX = os.path.join(lore.env.NAME, 'unloads')
IAM_ROLE = os.environ.get('IAM_ROLE', None)
Expand Down Expand Up @@ -276,21 +285,7 @@ def select(self, sql=None, extract=None, filename=None, **kwargs):

@query_cached
def _select(self, sql, bindings):
if self._use_psycopg2:
try:
with self._connection.engine.raw_connection().connection as conn:
with conn.cursor() as cursor:
cursor.execute(sql, bindings)
return cursor.fetchall()
except Psycopg2OperationalError as e:
logger.warning('Reconnect and retry due to invalid connection')
self.close()
with self._connection.engine.raw_connection().connection as conn:
with conn.cursor() as cursor:
cursor.execute(sql, bindings)
return cursor.fetchall()
else:
return self.__execute(sql, bindings).fetchall()
return self.__execute(sql, bindings).fetchall()

def unload(self, sql=None, extract=None, filename=None, **kwargs):
cache = kwargs.pop('cache', False)
Expand Down Expand Up @@ -443,14 +438,28 @@ def __prepare(self, sql=None, extract=None, filename=None, **kwargs):

return sql

def __connection_execute(self, sql, bindings):
if self._use_psycopg2:
with self._connection.engine.raw_connection().connection as conn:
with conn.cursor() as cursor:
cursor.execute(sql, bindings)
try:
return ResultWrapper(cursor.fetchall())
except psycopg2.ProgrammingError as e:
if 'no results to fetch' in str(y):
return None
raise e
else:
return self._connection.execute(sql, bindings)

def __execute(self, sql, bindings):
try:
return self._connection.execute(sql, bindings)
return self.__connection_execute(sql, bindings)
except (sqlalchemy.exc.DBAPIError, Psycopg2OperationalError, SnowflakeProgrammingError) as e:
if not self._transactions and (isinstance(e, Psycopg2OperationalError) or e.connection_invalidated):
logger.warning('Reconnect and retry due to invalid connection')
self.close()
return self._connection.execute(sql, bindings)
return self.__connection_execute(sql, bindings)
elif not self._transactions and (isinstance(e, SnowflakeProgrammingError) or e.connection_invalidated):
if hasattr(e, 'msg') and e.msg and "authenticate" in e.msg.lower():
logger.warning('Reconnect and retry due to unauthenticated connection')
Expand Down

0 comments on commit 3fd2657

Please sign in to comment.