Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimization: asynchronous writeback #150

Closed
wants to merge 7 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 107 additions & 7 deletions borg/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import struct
import sys
from zlib import crc32
import threading
import queue

from .hashindex import NSIndex
from .helpers import Error, IntegrityError, read_msgpack, write_msgpack, unhexlify
Expand Down Expand Up @@ -438,11 +440,13 @@ def __init__(self, path, limit, segments_per_dir, capacity=90):
self.segments_per_dir = segments_per_dir
self.offset = 0
self._write_fd = None
self.writeback = WritebackThread()

def close(self):
for segment in list(self.fds.keys()):
self.fds.pop(segment).close()
self.close_segment()
self.writeback.close()
self.fds = None # Just to make sure we're disabled

def segment_iterator(self, reverse=False):
Expand Down Expand Up @@ -622,16 +626,112 @@ def write_commit(self):
crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
fd.write(b''.join((crc, header)))
self.close_segment()
self.writeback.flush()

def close_segment(self):
if self._write_fd:
self.segment += 1
self.offset = 0
self._write_fd.flush()
os.fsync(self._write_fd.fileno())
if hasattr(os, 'posix_fadvise'): # python >= 3.3, only on UNIX
# tell the OS that it does not need to cache what we just wrote,
# avoids spoiling the cache for the OS and other processes.
os.posix_fadvise(self._write_fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED)
self._write_fd.close()
self.writeback.submit_fd(self._write_fd)
self._write_fd = None


class WritebackThread(object):
"""A background thread to flush buffered writes to permanent storage.

One fd is processed at a time. If the thread is already working,
the caller will block. This provides double-buffering. Credit
probably goes to Linus Torvalds because I read this message some
time ago :).

http://article.gmane.org/gmane.linux.kernel/988070
http://stackoverflow.com/a/3756466/799204

Any IO exceptions will be re-raised on the next method call.
(Naturally this applies to the .flush() and .close() methods as well
as .submit_fd()).
"""

def __init__(self):
self.channel = Channel()
self.exception = None
thread = threading.Thread(target=self._run)
thread.daemon = True
thread.start()

def _run(self):
while True: # worker thread loop
task = self.channel.get()
if task is None:
break # thread shutdown requested
try:
task()
except Exception as e:
self.exception = e

def submit_fd(self, fd):
"""Start flushing fd's buffered writes to permanent storage.

This requires ownership of fd; when finished the fd will be closed.
"""
# Wait for pending writeback and re-raise any error
self.flush()

# Flush any python buffers to kernel (effectively calls os.write()).
# This can block just like fd.write() if we're writing too fast.
# So we block the caller exactly when we want to.
fd.flush()

# Background writeback task
def task():
try:
# Trigger writeback of kernel buffers and wait for all data
# to reach permanent storage.
os.fsync(fd.fileno())
if hasattr(os, 'posix_fadvise'): # python >= 3.3, only on UNIX
# tell the OS it does not need to cache what we just wrote,
# avoids spoiling the cache for the OS and other processes.
os.posix_fadvise(fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED)
finally:
fd.close()
self.channel.put(task)

def flush(self):
"""Wait for any pending writeback.

This will also make sure an IOError is re-raised
in the calling thread, if necessary.
"""
def task():
pass
self.channel.put(task)

if self.exception is not None:
e = self.exception
self.exception = None
raise e

def close(self):
try:
self.flush()
finally:
self.channel.put(None) # tell thread to shutdown


class Channel(object):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, couldn't we just use a normal Queue with length 1 and just signal "task done" when we really have done it?

"""A blocking channel, like in CSP or Go.

This can also be considered as a Queue with zero buffer space.
"""

def __init__(self):
self.q = queue.Queue()

def get(self):
value = self.q.get()
self.q.task_done()
return value

def put(self, item):
self.q.put(item)
self.q.join() # wait for task_done(), in reader thread