
Mechanism for writing to database via a queue #682

Closed
simonw opened this issue Feb 24, 2020 · 10 comments

simonw commented Feb 24, 2020

I've been mulling this over for a long time, and I have a new approach that I think is worth exploring.

The catch with writing to SQLite is that it should only accept one write at a time. I'm now thinking that an easy way to manage that would be with a write queue for each database which is then read by a single dedicated write thread which manages its own writable connection.
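
A minimal sketch of that idea (illustrative only, not Datasette's actual code): a single dedicated thread owns the only writable connection and drains a queue of write functions.

```python
# Sketch: one write thread per database, owning the sole writable connection.
# Everything here (names, sentinel shutdown) is illustrative.
import queue
import sqlite3
import threading

write_queue = queue.Queue()

def writer(db_path):
    conn = sqlite3.connect(db_path)  # the single writable connection
    while True:
        fn = write_queue.get()       # blocks until a write task arrives
        if fn is None:               # sentinel value shuts the thread down
            break
        fn(conn)                     # task runs with exclusive write access
    conn.close()

writer_thread = threading.Thread(target=writer, args=(":memory:",), daemon=True)
writer_thread.start()
```

Because every write funnels through the one queue, SQLite never sees two concurrent writers.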

simonw commented Feb 24, 2020

Some more detailed notes I made earlier:

Datasette would run a single write thread per database. That thread gets an exclusive connection, and a queue. Plugins can add functions to the queue which will be called and given access to that connection.

The write thread for that database is created the first time a write is attempted.

Question: should that thread have its own asyncio loop so that async techniques like httpx can be used within the thread? I think not at first - only investigate this if it turns out to be necessary in the future.

This thread will run as part of the Datasette process. This means there is always a risk that the thread will die in the middle of something because the server got restarted - so use transactions to limit risk of damage to database should that happen.

I don’t want web responses blocking waiting for stuff to happen here - so every task put on that queue will have a task ID, and that ID will be returned such that client code can poll for its completion.

Could the request block for up to 0.5s just in case the write is really fast, then return a polling token if it isn't finished yet? Looks possible - Queue.get can block with a timeout.
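
Confirming that observation: `queue.Queue.get()` accepts a timeout and raises `queue.Empty` when nothing arrives in time, which maps directly onto "wait briefly, then hand back a polling token".

```python
# Block up to 0.5s for a reply; fall back to polling if the write is slow.
import queue

reply_queue = queue.Queue()
try:
    result = reply_queue.get(timeout=0.5)  # blocks for at most 0.5s
    finished = True
except queue.Empty:
    finished = False  # return a polling token to the client instead
```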

There will be a /-/writes page which shows currently queued writes - so each one needs a human-readable description of some sort. (You can access a deque called q.queue to see what’s in there)

Stretch goal: It would be cool if write operations could optionally handle their own progress reports. That way I can do some really nice UI around what’s going on with these things.

This mechanism has a ton of potential. It may even be how we handle things like Twitter imports and suchlike - queued writing tasks.

One catch with this approach: if a plugin is reading from APIs etc., it shouldn't block writes to the database while doing so. So sticking a function in the queue that does additional time-consuming work is actually an anti-pattern. Instead, plugins should schedule their API access on the main event loop and periodically push just the updates they need to make onto the write queue.
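
Sketching that recommended shape (names like `import_from_api` are illustrative): the slow network work happens in the event loop, and only the quick INSERT itself is enqueued.

```python
# Pattern sketch: slow async fetching stays in the event loop; only tiny
# write functions go onto the write queue, so the writer is never blocked
# on network I/O.
import asyncio
import queue

async def import_from_api(fetch_items, write_queue):
    async for item in fetch_items():  # slow network work: no write lock held
        # Only the quick INSERT itself goes through the write queue.
        # item=item pins the current value into each lambda.
        write_queue.put(lambda conn, item=item: conn.execute(
            "insert into items (id) values (?)", (item,)
        ))
```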

Implementation notes

Maybe each item in the queue is a (callable, uuid, reply_queue) triple. You can do a blocking .get() on the reply_queue if you want to wait for the answer. The execution framework could look for the return value from callable() and automatically send it to reply_queue.
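
That execution framework could look something like this (a sketch of the triple-handling loop, not the eventual implementation):

```python
# Each queued item is a (callable, uuid, reply_queue) triple; the loop calls
# the callable with the write connection and forwards its return value to
# reply_queue automatically.
import queue
import sqlite3

def write_loop(db_path, task_queue):
    conn = sqlite3.connect(db_path)
    while True:
        item = task_queue.get()
        if item is None:  # sentinel: shut down
            break
        fn, task_uuid, reply_queue = item
        result = fn(conn)
        if reply_queue is not None:
            reply_queue.put(result)  # caller can do a blocking .get() on this
    conn.close()
```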

simonw commented Feb 24, 2020

Some prior art: Charles Leifer implemented a SqliteQueueDatabase class that automatically queues writes for you: https://charlesleifer.com/blog/multi-threaded-sqlite-without-the-operationalerrors/

simonw commented Feb 24, 2020

Implementation plan

Method on Database class called execute_write(sql)

Which calls .execute_write_fn(fn) - so you can instead create a function that applies a whole batch of writes and pass that instead if you need to

Throws an error if the database isn't mutable.

Add ._writer_thread thread property to Database - we start that thread the first time we need it. It blocks on ._writer_queue.get()

We write to that queue with WriteTask(fn, uuid, reply_queue) namedtuples - then time-out block awaiting reply for 0.5s

Have a .write_status(uuid) method that checks if uuid has completed

This should be enough to get it all working. MVP can skip the 0.5s timeout entirely
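
Pulling the plan together as a rough sketch (method names follow the notes above; everything else, including returning the task UUID on timeout, is illustrative):

```python
# Sketch of the implementation plan: lazily started writer thread, WriteTask
# namedtuples, a 0.5s block-then-poll on submit, and a write_status() check.
import collections
import queue
import sqlite3
import threading
import uuid

WriteTask = collections.namedtuple("WriteTask", ("fn", "uuid", "reply_queue"))

class Database:
    def __init__(self, path, is_mutable=True):
        self.path = path
        self.is_mutable = is_mutable
        self._writer_queue = queue.Queue()
        self._writer_thread = None
        self._results = {}

    def execute_write(self, sql):
        return self.execute_write_fn(lambda conn: conn.execute(sql))

    def execute_write_fn(self, fn):
        if not self.is_mutable:
            raise ValueError("Database is immutable")
        if self._writer_thread is None:
            # Writer thread starts the first time a write is attempted
            self._writer_thread = threading.Thread(
                target=self._write_loop, daemon=True
            )
            self._writer_thread.start()
        task = WriteTask(fn, str(uuid.uuid4()), queue.Queue())
        self._writer_queue.put(task)
        try:
            return task.reply_queue.get(timeout=0.5)  # fast writes return directly
        except queue.Empty:
            return task.uuid  # slow write: caller polls with this ID

    def write_status(self, task_uuid):
        # True once the write identified by task_uuid has completed
        return task_uuid in self._results

    def _write_loop(self):
        conn = sqlite3.connect(self.path)
        while True:
            task = self._writer_queue.get()
            result = task.fn(conn)
            self._results[task.uuid] = result
            task.reply_queue.put(result)
```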

But... what about that progress bar supporting stretch goal?

For that let's have each write operation that's currently in progress have total and done integer properties. So I guess we can add those to the WriteTask.

Should we have the ability to see what the currently executing write is? Seems useful.

Hopefully I can integrate https://github.com/tqdm/tqdm such that it calculates ETAs without actually trying to print to the console.

simonw commented Feb 24, 2020

I wonder if I even need the reply_queue mechanism? Are the replies from writes generally even interesting?

simonw commented Feb 24, 2020

Error handling could be tricky. Exceptions thrown in threads don't show up anywhere by default - I would need to explicitly catch them and decide what to do with them.
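
One way to handle this (an illustrative pattern, not necessarily what shipped): catch exceptions inside the write thread, send them back on the reply queue, and re-raise them in the calling thread.

```python
# Exceptions raised in the write thread are captured and shipped back on the
# reply queue, so the caller sees them instead of a silently dead thread.
import queue

def write_loop(task_queue):
    while True:
        fn, reply_queue = task_queue.get()
        try:
            reply_queue.put(("ok", fn()))
        except Exception as e:
            reply_queue.put(("error", e))

def run_write(task_queue, fn):
    reply_queue = queue.Queue()
    task_queue.put((fn, reply_queue))
    status, value = reply_queue.get()
    if status == "error":
        raise value  # the exception surfaces in the calling thread
    return value
```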

simonw commented Feb 24, 2020

I'm dropping the progress bar idea. This mechanism is supposed to guarantee exclusive access to the single write connection, which means it should be targeted by operations that are as short as possible. An operation running long enough to need a progress bar is too long!

Any implementation of progress bars for long running write operations needs to happen elsewhere in the stack.

simonw commented Feb 24, 2020

Interesting challenge: I would like to be able to "await" on queue.get() (with a timeout).

Problem is: queue.Queue() is designed for threading and cannot be awaited. asyncio.Queue can be awaited but is not meant to be used with threads.

https://stackoverflow.com/a/32894169 suggests using Janus, a thread-aware asyncio queue: https://github.com/aio-libs/janus
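
Short of adding a dependency, a stdlib workaround is to run the blocking `get()` in an executor so it can be awaited with a timeout (a sketch, not what the issue settled on):

```python
# Await a blocking queue.Queue.get(timeout=...) by running it in the default
# thread pool, so the event loop is never blocked.
import asyncio
import queue

async def async_queue_get(q, timeout):
    loop = asyncio.get_running_loop()
    try:
        # q.get(block=True, timeout=timeout), run off the event loop
        return await loop.run_in_executor(None, q.get, True, timeout)
    except queue.Empty:
        return None  # timed out
```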

simonw commented Feb 24, 2020

I tested this using the following code in a view (after from sqlite_utils import Database):

    db = next(iter(self.ds.databases.values()))
    db.execute_write_fn(
        lambda conn: Database(conn)["counter"].insert(
            {"id": 1, "count": 0}, pk="id", ignore=True
        )
    )
    db.execute_write("update counter set count = count + 1 where id = 1")

simonw commented Feb 24, 2020

I filed a question / feature request with Janus about supporting timeouts for .get() against async queues here: aio-libs/janus#240

I'm going to move ahead without needing that ability though. I figure SQLite writes are fast, and plugins can be trusted to implement just fast writes. So I'm going to support either fire-and-forget writes (they get added to the queue and a task ID is returned) or have the option to block awaiting the completion of the write (using Janus) but let callers decide which version they want. I may add optional timeouts some time in the future.

I am going to make both execute_write() and execute_write_fn() awaitable functions though, for consistency with .execute() and to give me flexibility to change how they work in the future.

I'll also add a block=True option to both of them which causes the function to wait for the write to be successfully executed - defaults to False (fire-and-forget mode).
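
A self-contained sketch of that awaitable interface, bridging the async caller to the thread-backed queue (the real Datasette code differs in the details):

```python
# Awaitable execute_write()/execute_write_fn() with fire-and-forget by
# default; block=True awaits the write's completion via the reply queue.
import asyncio
import queue
import sqlite3
import threading

class Database:
    def __init__(self, path):
        self.path = path
        self._queue = queue.Queue()
        threading.Thread(target=self._writer, daemon=True).start()

    def _writer(self):
        conn = sqlite3.connect(self.path)
        while True:
            fn, reply = self._queue.get()
            result = fn(conn)
            if reply is not None:
                reply.put(result)

    async def execute_write(self, sql, block=False):
        return await self.execute_write_fn(
            lambda conn: conn.execute(sql), block=block
        )

    async def execute_write_fn(self, fn, block=False):
        if not block:
            self._queue.put((fn, None))  # fire-and-forget
            return None
        reply = queue.Queue()
        self._queue.put((fn, reply))
        # Await the blocking reply without stalling the event loop
        return await asyncio.get_running_loop().run_in_executor(None, reply.get)
```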

simonw commented Feb 24, 2020

Moving further development to a pull request: #683
