Connection pool and execute refactor
simonw committed Nov 15, 2019
1 parent 8fc9a5d commit dba49c0
Showing 3 changed files with 277 additions and 106 deletions.
datasette/app.py: 90 changes (20 additions, 70 deletions)
@@ -24,13 +24,11 @@

from .utils import (
QueryInterrupted,
Results,
escape_css_string,
escape_sqlite,
get_plugins,
module_from_path,
sqlite3,
sqlite_timelimit,
to_css_class,
)
from .utils.asgi import (
@@ -42,13 +40,12 @@
asgi_send_json,
asgi_send_redirect,
)
from .tracer import trace, AsgiTracer
from .tracer import AsgiTracer
from .plugins import pm, DEFAULT_PLUGINS
from .version import __version__

app_root = Path(__file__).parent.parent

connections = threading.local()
MEMORY = object()

ConfigOption = collections.namedtuple("ConfigOption", ("name", "default", "help"))
@@ -336,6 +333,25 @@ def prepare_connection(self, conn):
# pylint: disable=no-member
pm.hook.prepare_connection(conn=conn)

async def execute(
self,
db_name,
sql,
params=None,
truncate=False,
custom_time_limit=None,
page_size=None,
log_sql_errors=True,
):
return await self.databases[db_name].execute(
sql,
params=params,
truncate=truncate,
custom_time_limit=custom_time_limit,
page_size=page_size,
log_sql_errors=log_sql_errors,
)

async def expand_foreign_keys(self, database, table, column, values):
"Returns dict mapping (column, value) -> label"
labeled_fks = {}
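
The execute() added above is now just a thin wrapper: the threading, time limits and truncation logic that previously lived on the Datasette class (see the removed block further down) moves onto the per-database objects in self.databases. Those objects live in datasette/database.py, which is one of the three changed files but is not shown in this excerpt. The following is only a minimal sketch of the shape that delegation target might take, modelled on the removed app.py code; the Database name, constructor arguments and per-thread connection cache are assumptions, not the commit's actual connection-pool implementation.

# Minimal sketch only. datasette/database.py is changed by this commit but is
# not shown in this excerpt; everything below is modelled on the removed
# app.py code, not on the commit's actual pool implementation.
import asyncio
import threading

from .utils import sqlite3


class Database:
    def __init__(self, ds, path):
        self.ds = ds  # back-reference to the Datasette instance
        self.path = path
        self._connections = threading.local()  # one connection per executor thread

    def connect(self):
        # The pre-existing database objects already exposed connect() (the
        # removed code calls self.databases[db_name].connect()); a read-only
        # connection is sketched here only so the example is complete.
        return sqlite3.connect("file:{}?mode=ro".format(self.path), uri=True)

    async def execute_against_connection_in_thread(self, fn):
        def in_thread():
            conn = getattr(self._connections, "conn", None)
            if conn is None:
                conn = self.connect()
                self.ds.prepare_connection(conn)
                self._connections.conn = conn
            return fn(conn)

        return await asyncio.get_event_loop().run_in_executor(
            self.ds.executor, in_thread
        )

    async def execute(
        self,
        sql,
        params=None,
        truncate=False,
        custom_time_limit=None,
        page_size=None,
        log_sql_errors=True,
    ):
        # Same logic as the removed Datasette.execute() shown further down,
        # reading page_size, sql_time_limit_ms and max_returned_rows from
        # self.ds instead of self.
        ...
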
@@ -477,72 +493,6 @@ def table_metadata(self, database, table):
.get(table, {})
)

async def execute_against_connection_in_thread(self, db_name, fn):
def in_thread():
conn = getattr(connections, db_name, None)
if not conn:
conn = self.databases[db_name].connect()
self.prepare_connection(conn)
setattr(connections, db_name, conn)
return fn(conn)

return await asyncio.get_event_loop().run_in_executor(self.executor, in_thread)

async def execute(
self,
db_name,
sql,
params=None,
truncate=False,
custom_time_limit=None,
page_size=None,
log_sql_errors=True,
):
"""Executes sql against db_name in a thread"""
page_size = page_size or self.page_size

def sql_operation_in_thread(conn):
time_limit_ms = self.sql_time_limit_ms
if custom_time_limit and custom_time_limit < time_limit_ms:
time_limit_ms = custom_time_limit

with sqlite_timelimit(conn, time_limit_ms):
try:
cursor = conn.cursor()
cursor.execute(sql, params or {})
max_returned_rows = self.max_returned_rows
if max_returned_rows == page_size:
max_returned_rows += 1
if max_returned_rows and truncate:
rows = cursor.fetchmany(max_returned_rows + 1)
truncated = len(rows) > max_returned_rows
rows = rows[:max_returned_rows]
else:
rows = cursor.fetchall()
truncated = False
except sqlite3.OperationalError as e:
if e.args == ("interrupted",):
raise QueryInterrupted(e, sql, params)
if log_sql_errors:
print(
"ERROR: conn={}, sql = {}, params = {}: {}".format(
conn, repr(sql), params, e
)
)
raise

if truncate:
return Results(rows, truncated, cursor.description)

else:
return Results(rows, False, cursor.description)

with trace("sql", database=db_name, sql=sql.strip(), params=params):
results = await self.execute_against_connection_in_thread(
db_name, sql_operation_in_thread
)
return results

def register_renderers(self):
""" Register output renderers which output data in custom formats. """
# Built-in renderers
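
For callers, nothing changes: Datasette.execute() keeps the same signature and forwards every argument to the per-database object, so existing call sites in the view code continue to work unchanged. A hypothetical call site, for illustration only (the database name, table and SQL are made up, not part of this commit):

# Illustrative only: a hypothetical caller of the refactored execute() method.
async def lookup(datasette):
    results = await datasette.execute(
        "fixtures",
        "select * from facetable where state = :state",
        {"state": "CA"},
        truncate=True,          # fetch at most max_returned_rows and set the truncated flag
        custom_time_limit=200,  # milliseconds; applied only if lower than sql_time_limit_ms
    )
    return results.rows         # Results bundles rows, the truncated flag and cursor.description
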
