.execute_write() and .execute_write_fn() methods on Database (#683)
Closes #682.
simonw authored Feb 25, 2020
1 parent 411056c commit a093c5f
Showing 8 changed files with 282 additions and 95 deletions.
64 changes: 60 additions & 4 deletions datasette/database.py
@@ -1,7 +1,10 @@
import asyncio
import contextlib
from pathlib import Path
import janus
import queue
import threading
import uuid

from .tracer import trace
from .utils import (
@@ -30,6 +33,8 @@ def __init__(self, ds, path=None, is_mutable=False, is_memory=False):
self.hash = None
self.cached_size = None
self.cached_table_counts = None
self._write_thread = None
self._write_queue = None
if not self.is_mutable:
p = Path(path)
self.hash = inspect_hash(p)
@@ -41,18 +46,60 @@ def __init__(self, ds, path=None, is_mutable=False, is_memory=False):
for key, value in self.ds.inspect_data[self.name]["tables"].items()
}

def connect(self):
def connect(self, write=False):
if self.is_memory:
return sqlite3.connect(":memory:")
# mode=ro or immutable=1?
if self.is_mutable:
qs = "mode=ro"
qs = "?mode=ro"
else:
qs = "immutable=1"
qs = "?immutable=1"
assert not (write and not self.is_mutable)
if write:
qs = ""
return sqlite3.connect(
"file:{}?{}".format(self.path, qs), uri=True, check_same_thread=False
"file:{}{}".format(self.path, qs), uri=True, check_same_thread=False
)

async def execute_write(self, sql, params=None, block=False):
def _inner(conn):
with conn:
return conn.execute(sql, params or [])

return await self.execute_write_fn(_inner, block=block)

async def execute_write_fn(self, fn, block=False):
task_id = uuid.uuid5(uuid.NAMESPACE_DNS, "datasette.io")
if self._write_queue is None:
self._write_queue = queue.Queue()
if self._write_thread is None:
self._write_thread = threading.Thread(
target=self._execute_writes, daemon=True
)
self._write_thread.start()
reply_queue = janus.Queue()
self._write_queue.put(WriteTask(fn, task_id, reply_queue))
if block:
result = await reply_queue.async_q.get()
if isinstance(result, Exception):
raise result
else:
return result
else:
return task_id

def _execute_writes(self):
# Infinite looping thread that protects the single write connection
# to this database
conn = self.connect(write=True)
while True:
task = self._write_queue.get()
try:
result = task.fn(conn)
except Exception as e:
result = e
task.reply_queue.sync_q.put(result)

async def execute_against_connection_in_thread(self, fn):
def in_thread():
conn = getattr(connections, self.name, None)
@@ -326,3 +373,12 @@ def __repr__(self):
if tags:
tags_str = " ({})".format(", ".join(tags))
return "<Database: {}{}>".format(self.name, tags_str)


class WriteTask:
__slots__ = ("fn", "task_id", "reply_queue")

def __init__(self, fn, task_id, reply_queue):
self.fn = fn
self.task_id = task_id
self.reply_queue = reply_queue
2 changes: 1 addition & 1 deletion docs/changelog.rst
@@ -9,7 +9,7 @@ Changelog
0.36 (2020-02-21)
-----------------

* The ``datasette`` object passed to plugins now has API documentation: :ref:`datasette`. (`#576 <https://github.com/simonw/datasette/issues/576>`__)
* The ``datasette`` object passed to plugins now has API documentation: :ref:`internals_datasette`. (`#576 <https://github.com/simonw/datasette/issues/576>`__)
* New methods on ``datasette``: ``.add_database()`` and ``.remove_database()`` - :ref:`documentation <datasette_add_database>`. (`#671 <https://github.com/simonw/datasette/issues/671>`__)
* ``prepare_connection()`` plugin hook now takes optional ``datasette`` and ``database`` arguments - :ref:`plugin_hook_prepare_connection`. (`#678 <https://github.com/simonw/datasette/issues/678>`__)
* Added three new plugins and one new conversion tool to the :ref:`ecosystem`.
83 changes: 0 additions & 83 deletions docs/datasette.rst

This file was deleted.

2 changes: 1 addition & 1 deletion docs/index.rst
@@ -34,7 +34,7 @@ Contents
introspection
custom_templates
plugins
datasette
internals
contributing
changelog

148 changes: 148 additions & 0 deletions docs/internals.rst
@@ -0,0 +1,148 @@
.. _internals:

Internals for plugins
=====================

Many :ref:`plugin_hooks` are passed objects that provide access to internal Datasette functionality. The interface to these objects should not be considered stable (at least until Datasette 1.0) with the exception of methods that are documented on this page.

.. _internals_datasette:

Datasette class
~~~~~~~~~~~~~~~

This object is an instance of the ``Datasette`` class, passed to many plugin hooks as an argument called ``datasette``.

.. _datasette_plugin_config:

.plugin_config(plugin_name, database=None, table=None)
------------------------------------------------------

``plugin_name`` - string
The name of the plugin to look up configuration for. Usually this is something similar to ``datasette-cluster-map``.

``database`` - None or string
The database the user is interacting with.

``table`` - None or string
The table the user is interacting with.

This method lets you read plugin configuration values that were set in ``metadata.json``. See :ref:`plugins_plugin_config` for full details of how this method should be used.
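
For example, a plugin could read its own configuration like this - a minimal sketch in which the plugin name, database and table are purely illustrative:

.. code-block:: python

    # Hypothetical names - substitute your own plugin, database and table
    config = datasette.plugin_config(
        "datasette-cluster-map", database="fixtures", table="roadside_attractions"
    )
    if config:
        latitude_column = config.get("latitude_column", "latitude")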

.. _datasette_render_template:

.render_template(template, context=None, request=None)
------------------------------------------------------

``template`` - string
The template file to be rendered, e.g. ``my_plugin.html``. Datasette will search for this file first in the ``--template-dir=`` location, if it was specified; then in the plugin's bundled templates; and finally in Datasette's set of default templates.

``context`` - None or a Python dictionary
The context variables to pass to the template.

``request`` - request object or None
If you pass a Datasette request object here it will be made available to the template.

Renders a `Jinja template <https://jinja.palletsprojects.com/en/2.11.x/>`__ using Datasette's preconfigured instance of Jinja and returns the resulting string. The template will have access to Datasette's default template functions and any functions that have been made available by other plugins.
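
For example - ``my_plugin.html`` and the context values here are illustrative, and depending on your Datasette version the call may need to be awaited:

.. code-block:: python

    # Render a hypothetical my_plugin.html template bundled with a plugin
    rendered = datasette.render_template(
        "my_plugin.html", {"message": "Hello from my plugin"}
    )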

.. _datasette_add_database:

.add_database(name, db)
-----------------------

``name`` - string
The unique name to use for this database. Also used in the URL.

``db`` - datasette.database.Database instance
The database to be attached.

The ``datasette.add_database(name, db)`` method lets you add a new database to the current Datasette instance. This database will then be served at a URL path that matches the ``name`` parameter, e.g. ``/mynewdb/``.

The ``db`` parameter should be an instance of the ``datasette.database.Database`` class. For example:

.. code-block:: python

    from datasette.database import Database

    datasette.add_database("my-new-database", Database(
        datasette,
        path="path/to/my-new-database.db",
        is_mutable=True
    ))

This will add a mutable database from the provided file path.

The ``Database()`` constructor takes four arguments: the ``datasette`` instance you are attaching the database to, a ``path=`` to the SQLite file, and the optional ``is_mutable`` and ``is_memory`` arguments.

Use ``is_mutable`` if it is possible that updates will be made to that database - otherwise Datasette will open it in immutable mode and any changes could cause undesired behavior.

Use ``is_memory`` if the connection is to an in-memory SQLite database.

.. _datasette_remove_database:

.remove_database(name)
----------------------

``name`` - string
The name of the database to be removed.

This removes a database that has been previously added. ``name=`` is the unique name of that database, also used in the URL for it.
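
For example, to detach the database that was attached in the ``.add_database()`` example above:

.. code-block:: python

    # "my-new-database" matches the name used in add_database() above
    datasette.remove_database("my-new-database")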

.. _internals_database:

Database class
~~~~~~~~~~~~~~

Instances of the ``Database`` class can be used to execute queries against attached SQLite databases, and to run introspection against their schemas.

SQLite only allows one database connection to write at a time. Datasette handles this for you by maintaining a queue of writes to be executed against a given database. Plugins can submit write operations to this queue and they will be executed in the order in which they are received.

.. _database_execute_write:

await db.execute_write(sql, params=None, block=False)
-----------------------------------------------------

This method can be used to queue up a non-SELECT SQL query to be executed against a single write connection to the database.

You can pass additional SQL parameters as a tuple or list.

By default queries are considered to be "fire and forget" - they will be added to the queue and executed in a separate thread while your code can continue to do other things. The method will return a UUID representing the queued task.

If you pass ``block=True`` this behaviour changes: the method will block until the write operation has completed, and the return value will be the return from calling ``conn.execute(...)`` using the underlying ``sqlite3`` Python library.
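
For example - here ``db`` is assumed to be a ``Database`` instance and the ``logs`` table is illustrative:

.. code-block:: python

    # Fire-and-forget: returns a task UUID immediately, the write happens
    # later on the dedicated write thread
    task_id = await db.execute_write(
        "insert into logs (message) values (?)", ["hello"]
    )

    # block=True waits for the write to complete and returns the result of
    # the underlying conn.execute(...) call
    cursor = await db.execute_write(
        "insert into logs (message) values (?)", ["world"], block=True
    )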

.. _database_execute_write_fn:

await db.execute_write_fn(fn, block=False)
------------------------------------------

This method works like ``.execute_write()``, but instead of a SQL statement you give it a callable Python function. This function will be queued up and then called when the write connection is available, passing that connection as the argument to the function.

The function can then perform multiple actions, safe in the knowledge that it has exclusive access to the single writable connection as long as it is executing.

For example:

.. code-block:: python

    def my_action(conn):
        conn.execute("delete from some_table")
        conn.execute("delete from other_table")

    await database.execute_write_fn(my_action)

This method is fire-and-forget, queueing your function to be executed and then allowing your code after the call to ``.execute_write_fn()`` to continue running while the underlying thread waits for an opportunity to run your function. A UUID representing the queued task will be returned.

If you pass ``block=True`` your calling code will block until the function has been executed. The return value to the ``await`` will be the return value of your function.

If your function raises an exception and you specified ``block=True``, that exception will be propagated up to the ``await`` line. With ``block=False`` any exceptions will be silently ignored.

Here's an example of ``block=True`` in action:

.. code-block:: python

    def my_action(conn):
        conn.execute("delete from some_table where id > 5")
        return conn.execute("select count(*) from some_table").fetchone()[0]

    try:
        num_rows_left = await database.execute_write_fn(my_action, block=True)
    except Exception as e:
        print("An error occurred:", e)