Skip to content

Commit

Permalink
Initial WIP for execute_write_fn, refs #682
Browse files Browse the repository at this point in the history
  • Loading branch information
simonw committed Feb 24, 2020
1 parent b031fe9 commit 39d1a0a
Showing 1 changed file with 59 additions and 4 deletions.
63 changes: 59 additions & 4 deletions datasette/database.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import asyncio
import contextlib
from pathlib import Path
import queue
import threading
import uuid

from .tracer import trace
from .utils import (
Expand Down Expand Up @@ -30,6 +32,8 @@ def __init__(self, ds, path=None, is_mutable=False, is_memory=False):
self.hash = None
self.cached_size = None
self.cached_table_counts = None
self._write_thread = None
self._write_queue = None
if not self.is_mutable:
p = Path(path)
self.hash = inspect_hash(p)
Expand All @@ -41,18 +45,51 @@ def __init__(self, ds, path=None, is_mutable=False, is_memory=False):
for key, value in self.ds.inspect_data[self.name]["tables"].items()
}

def connect(self):
def connect(self, write=False):
if self.is_memory:
return sqlite3.connect(":memory:")
# mode=ro or immutable=1?
if self.is_mutable:
qs = "mode=ro"
qs = "?mode=ro"
else:
qs = "immutable=1"
qs = "?immutable=1"
assert not (write and not self.is_mutable)
if write:
qs = ""
return sqlite3.connect(
"file:{}?{}".format(self.path, qs), uri=True, check_same_thread=False
"file:{}{}".format(self.path, qs), uri=True, check_same_thread=False
)

def execute_write(self, sql, params=None, timeout=0.5):
return self.execute_write_fn(
lambda conn: conn.execute(sql, params or []), timeout=timeout
)

def execute_write_fn(self, fn, timeout=0.5):
task_id = uuid.uuid5(uuid.NAMESPACE_DNS, "datasette.io")
if self._write_queue is None:
self._write_queue = queue.Queue()
if self._write_thread is None:
self._write_thread = threading.Thread(
target=self._execute_writes, daemon=True
)
self._write_thread.start()
reply_queue = queue.Queue()
self._write_queue.put(WriteTask(fn, task_id, reply_queue))
try:
reply = reply_queue.get(timeout=timeout)
return WriteResponse(uuid, reply)
except queue.Empty:
return WriteResponse(uuid, in_progress=True)

def _execute_writes(self):
# Looks after the write connection to this database.
# Runs in a thread.
conn = self.connect(write=True)
while True:
task = self._write_queue.get()
task.reply_queue.put(task.fn(conn))

async def execute_against_connection_in_thread(self, fn):
def in_thread():
conn = getattr(connections, self.name, None)
Expand Down Expand Up @@ -326,3 +363,21 @@ def __repr__(self):
if tags:
tags_str = " ({})".format(", ".join(tags))
return "<Database: {}{}>".format(self.name, tags_str)


class WriteTask:
__slots__ = ("fn", "task_id", "reply_queue")

def __init__(self, fn, task_id, reply_queue):
self.fn = fn
self.task_id = task_id
self.reply_queue = reply_queue


class WriteResponse:
__slots__ = ("uuid", "reply", "in_progress")

def __init__(self, uuid, reply=None, in_progress=False):
self.uuid = uuid
self.reply = reply
self.in_progress = in_progress

0 comments on commit 39d1a0a

Please sign in to comment.