FsyncWorker also does flush and fadvise now: call it WritebackThread
My beautiful code didn't include flush to start with, and certainly not
fadvise.  Now it needs a new name and verbose comments.  Such is life.

No semantic change here.  I did move the flush into WritebackThread.
However it will happen in exactly the same sequence (and still on the same
thread as the writes, which sounds good).
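The three stages named in this message (flush, fsync, fadvise) can be sketched on a single file, all on the calling thread. This is a minimal illustration, not code from the commit: `write_durably` and its arguments are hypothetical names, and it leaves out the background thread entirely.

```python
import os
import tempfile


def write_durably(path, data):
    """Hypothetical helper: write data, then flush -> fsync -> fadvise."""
    with open(path, 'wb') as fd:
        fd.write(data)
        # Stage 1: move Python's userspace buffer into the kernel page cache.
        fd.flush()
        # Stage 2: block until the kernel has pushed the data to storage.
        os.fsync(fd.fileno())
        # Stage 3: hint that the cached pages are no longer needed.
        # posix_fadvise exists only on some platforms (Python >= 3.3, UNIX).
        if hasattr(os, 'posix_fadvise'):
            os.posix_fadvise(fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED)


if __name__ == '__main__':
    with tempfile.TemporaryDirectory() as tmp:
        target = os.path.join(tmp, 'segment')
        write_durably(target, b'x' * 4096)
        print(os.path.getsize(target))
```

The order matters: `fd.flush()` only hands data to the kernel, so `os.fsync()` has to come after it, and the fadvise hint is only useful once the pages are clean.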
sourcejedi committed Aug 13, 2015
1 parent a6b3e6d commit f7ff045
Showing 1 changed file with 31 additions and 14 deletions.
45 changes: 31 additions & 14 deletions borg/repository.py
@@ -440,7 +440,7 @@ def __init__(self, path, limit, segments_per_dir, capacity=90):
         self.segments_per_dir = segments_per_dir
         self.offset = 0
         self._write_fd = None
-        self.writeback = FsyncWorker()
+        self.writeback = WritebackThread()

     def close(self):
         for segment in list(self.fds.keys()):
@@ -632,20 +632,24 @@ def close_segment(self):
         if self._write_fd:
             self.segment += 1
             self.offset = 0
-            self._write_fd.flush()
-            self.writeback.fsync_and_close_fd(self._write_fd)
+            self.writeback.submit_fd(self._write_fd)
             self._write_fd = None


-class FsyncWorker(object):
-    """os.fsync() in a background thread.
+class WritebackThread(object):
+    """A background thread to flush buffered writes to permanent storage.
     One fd is processed at a time. If the thread is already working,
-    the caller will block. This provides double-buffering.
+    the caller will block. This provides double-buffering. Credit
+    probably goes to Linus Torvalds because I read this message some
+    time ago :).
     http://article.gmane.org/gmane.linux.kernel/988070
     http://stackoverflow.com/a/3756466/799204
-    Any exceptions (from os.fsync() or fd.close()) will be re-raised
-    on the next call into FsyncWorker. (Naturally this applies to
-    the .flush() and .close() methods as well as .fsync_and_close_fd()).
+    Any IO exceptions will be re-raised on the next method call.
+    (Naturally this applies to the .flush() and .close() methods as well
+    as .submit_fd()).
     """

     def __init__(self):
@@ -665,22 +669,35 @@ def _run(self):
             except Exception as e:
                 self.exception = e

-    def fsync_and_close_fd(self, fd):
-        """fsync() and close() fd in the background"""
+    def submit_fd(self, fd):
+        """Start flushing fd's buffered writes to permanent storage.
+        This requires ownership of fd; when finished the fd will be closed.
+        """
+        # Wait for pending writeback and re-raise any error
+        self.flush()
+
+        # Flush any python buffers to kernel (effectively calls os.write()).
+        # This can block just like fd.write() if we're writing too fast.
+        # So we block the caller exactly when we want to.
+        fd.flush()
+
+        # Background writeback task
         def task():
             try:
+                # Trigger writeback of kernel buffers and wait for all data
+                # to reach permanent storage.
                 os.fsync(fd.fileno())
                 if hasattr(os, 'posix_fadvise'): # python >= 3.3, only on UNIX
-                    # tell the OS that it does not need to cache what we just wrote,
+                    # tell the OS it does not need to cache what we just wrote,
                     # avoids spoiling the cache for the OS and other processes.
                     os.posix_fadvise(fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED)
             finally:
                 fd.close()
-        self.flush() # raise any pending exception
         self.channel.put(task)

     def flush(self):
-        """Wait for any pending fsync.
+        """Wait for any pending writeback.
         This will also make sure an IOError is re-raised
         in the calling thread, if necessary.
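The double-buffering and deferred error reporting described in these docstrings can be sketched as a small standalone class. This is an illustration under assumptions, not the code in borg/repository.py: the diff does not show the bodies of `__init__`, `_run`, `flush` or `close`, so everything in the hypothetical `MiniWriteback` below is a guess at one way to get the documented behaviour, and the fadvise step is left out for brevity.

```python
import os
import queue
import tempfile
import threading


class MiniWriteback:
    """Hypothetical, simplified stand-in for the class in the diff."""

    def __init__(self):
        self.channel = queue.Queue()
        self.exception = None
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self):
        while True:
            task = self.channel.get()
            if task is None:          # shutdown sentinel
                self.channel.task_done()
                return
            try:
                task()
            except Exception as e:    # remember it for the calling thread
                self.exception = e
            finally:
                self.channel.task_done()

    def flush(self):
        """Wait for the task in flight, then re-raise any stored error."""
        self.channel.join()
        if self.exception is not None:
            e, self.exception = self.exception, None
            raise e

    def submit_fd(self, fd):
        """Take ownership of fd: fsync and close it in the background."""
        self.flush()   # at most one fd in flight; the caller blocks here
        fd.flush()     # Python buffers -> kernel, on the caller's thread

        def task():
            try:
                os.fsync(fd.fileno())
            finally:
                fd.close()
        self.channel.put(task)

    def close(self):
        try:
            self.flush()
        finally:
            self.channel.put(None)
            self.thread.join()


if __name__ == '__main__':
    wb = MiniWriteback()
    with tempfile.TemporaryDirectory() as tmp:
        for name in ('seg0', 'seg1'):
            fd = open(os.path.join(tmp, name), 'wb')
            fd.write(b'data')
            wb.submit_fd(fd)   # returns while fsync runs in the background
        wb.close()
```

Because `submit_fd()` calls `flush()` first, the caller can fill the next segment while the previous one is being synced, but never gets more than one segment ahead. An error raised inside the background task surfaces on the next `flush()`, `submit_fd()` or `close()` call.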
