From f7ff0455c4a94e1134398c5e0afc7a9ae7aa9994 Mon Sep 17 00:00:00 2001 From: Alan Jenkins Date: Thu, 13 Aug 2015 15:37:50 +0100 Subject: [PATCH] FsyncWorker also does flush and fadvise now: call it WritebackThread My beautiful code didn't include flush to start with, and certainly not fadvise. Now it needs a new name and verbose comments. Such is life. No semantic change here. I did move the flush into WritebackThread. However it will happen in exactly the same sequence (and still on the same thread as the writes, which sounds good). --- borg/repository.py | 45 +++++++++++++++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/borg/repository.py b/borg/repository.py index 11f99065ed..164bbcc229 100755 --- a/borg/repository.py +++ b/borg/repository.py @@ -440,7 +440,7 @@ def __init__(self, path, limit, segments_per_dir, capacity=90): self.segments_per_dir = segments_per_dir self.offset = 0 self._write_fd = None - self.writeback = FsyncWorker() + self.writeback = WritebackThread() def close(self): for segment in list(self.fds.keys()): @@ -632,20 +632,24 @@ def close_segment(self): if self._write_fd: self.segment += 1 self.offset = 0 - self._write_fd.flush() - self.writeback.fsync_and_close_fd(self._write_fd) + self.writeback.submit_fd(self._write_fd) self._write_fd = None -class FsyncWorker(object): - """os.fsync() in a background thread. +class WritebackThread(object): + """A background thread to flush buffered writes to permanent storage. One fd is processed at a time. If the thread is already working, - the caller will block. This provides double-buffering. + the caller will block. This provides double-buffering. Credit + probably goes to Linus Torvalds because I read this message some + time ago :). + + http://article.gmane.org/gmane.linux.kernel/988070 + http://stackoverflow.com/a/3756466/799204 - Any exceptions (from os.fsync() or fd.close()) will be re-raised - on the next call into FsyncWorker. (Naturally this applies to - the .flush() and .close() methods as well as .fsync_and_close_fd()). + Any IO exceptions will be re-raised on the next method call. + (Naturally this applies to the .flush() and .close() methods as well + as .submit_fd()). """ def __init__(self): @@ -665,22 +669,35 @@ def _run(self): except Exception as e: self.exception = e - def fsync_and_close_fd(self, fd): - """fsync() and close() fd in the background""" + def submit_fd(self, fd): + """Start flushing fd's buffered writes to permanent storage. + + This requires ownership of fd; when finished the fd will be closed. + """ + # Wait for pending writeback and re-raise any error + self.flush() + + # Flush any python buffers to kernel (effectively calls os.write()). + # This can block just like fd.write() if we're writing too fast. + # So we block the caller exactly when we want to. + fd.flush() + + # Background writeback task def task(): try: + # Trigger writeback of kernel buffers and wait for all data + # to reach permanent storage. os.fsync(fd.fileno()) if hasattr(os, 'posix_fadvise'): # python >= 3.3, only on UNIX - # tell the OS that it does not need to cache what we just wrote, + # tell the OS it does not need to cache what we just wrote, # avoids spoiling the cache for the OS and other processes. os.posix_fadvise(fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED) finally: fd.close() - self.flush() # raise any pending exception self.channel.put(task) def flush(self): - """Wait for any pending fsync. + """Wait for any pending writeback. This will also make sure an IOError is re-raised in the calling thread, if necessary.