From fc2468f61ab2a3e44b5a9ac773f15e899e2efbd4 Mon Sep 17 00:00:00 2001 From: Alan Jenkins Date: Mon, 10 Aug 2015 23:25:40 +0100 Subject: [PATCH 1/5] Implement asynchronous writeback fsync() after each segment write is suboptimal :). It means you stop (cpu) processing to wait for the physical disk write. And the default segment size is 5MB. (I noticed bup avoids this issue by writing pack files of 1GB by default :). Improvements will vary depending on disk/cpu speed (I guess the worst case was when they were evenly matched). Writing 65M on SheevaPlug "NAS" went from 47s to 45s. 920M on desktop HDD (read from SSD) went from 68s to 45s --- attic/repository.py | 63 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 60 insertions(+), 3 deletions(-) diff --git a/attic/repository.py b/attic/repository.py index eed85dc436..ccf0ad58b6 100644 --- a/attic/repository.py +++ b/attic/repository.py @@ -7,6 +7,8 @@ import struct import sys from zlib import crc32 +import threading +import queue from .hashindex import NSIndex from .helpers import Error, IntegrityError, read_msgpack, write_msgpack, unhexlify, UpgradableLock @@ -377,7 +379,6 @@ def preload(self, ids): """Preload objects (only applies to remote repositories """ - class LoggedIO(object): header_fmt = struct.Struct(' Date: Tue, 11 Aug 2015 14:16:50 +0100 Subject: [PATCH 2/5] Make sure we propagate IO errors from async writeback --- attic/repository.py | 42 ++++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/attic/repository.py b/attic/repository.py index ccf0ad58b6..d359f89c49 100644 --- a/attic/repository.py +++ b/attic/repository.py @@ -589,36 +589,58 @@ class FsyncWorker(object): One fd is processed at a time. If the thread is already working, the caller will block. This provides double-buffering. + + Any exceptions (from os.fsync() or fd.close()) will be re-raised + on the next call into FsyncWorker. 
(Naturally this applies to + the .flush() and .close() methods as well as .fsync_and_close_fd()). """ def __init__(self): self.channel = Channel() - self.thread = threading.Thread(target=self._run, daemon=True) - self.thread.start() + self.exception = None + thread = threading.Thread(target=self._run, daemon=True) + thread.start() def _run(self): - while True: + while True: # worker thread loop task = self.channel.get() if task == None: - break - task() + break # thread shutdown requested + try: + task() + except Exception as e: + self.exception = e def fsync_and_close_fd(self, fd): """fsync() and close() fd in the background""" def task(): - os.fsync(fd) - fd.close() + try: + os.fsync(fd) + finally: + fd.close() + self.flush() # raise any pending exception self.channel.put(task) def flush(self): - """Wait for pending writeback""" + """Wait for any pending fsync. + + This will also make sure an IOError is re-raised + in the calling thread, if necessary. + """ def task(): pass self.channel.put(task) + if self.exception != None: + e = self.exception + self.exception = None + raise e + def close(self): - self.channel.put(None) - self.thread.join() # wait for thread to finish + try: + self.flush() + finally: + self.channel.put(None) # tell thread to shutdown class Channel(object): """A blocking channel, like in CSP or Go. 
From f5cdca56f5673392aa17a890adb59b8dc232072f Mon Sep 17 00:00:00 2001 From: Alan Jenkins Date: Wed, 12 Aug 2015 12:52:51 +0100 Subject: [PATCH 3/5] Python 3.2 compat fix in added code TypeError: __init__() got an unexpected keyword argument 'daemon' --- attic/repository.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/attic/repository.py b/attic/repository.py index d359f89c49..536f2b463e 100644 --- a/attic/repository.py +++ b/attic/repository.py @@ -598,7 +598,8 @@ class FsyncWorker(object): def __init__(self): self.channel = Channel() self.exception = None - thread = threading.Thread(target=self._run, daemon=True) + thread = threading.Thread(target=self._run) + thread.daemon = True thread.start() def _run(self): From 5abf53b8d9363bf8b5628bb34eee4fb312126548 Mon Sep 17 00:00:00 2001 From: Alan Jenkins Date: Wed, 12 Aug 2015 14:06:02 +0100 Subject: [PATCH 4/5] Style fixes in added code --- attic/repository.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/attic/repository.py b/attic/repository.py index 536f2b463e..a41b35bd42 100644 --- a/attic/repository.py +++ b/attic/repository.py @@ -603,10 +603,10 @@ def __init__(self): thread.start() def _run(self): - while True: # worker thread loop + while True: # worker thread loop task = self.channel.get() - if task == None: - break # thread shutdown requested + if task is None: + break # thread shutdown requested try: task() except Exception as e: @@ -619,7 +619,7 @@ def task(): os.fsync(fd) finally: fd.close() - self.flush() # raise any pending exception + self.flush() # raise any pending exception self.channel.put(task) def flush(self): @@ -632,7 +632,7 @@ def task(): pass self.channel.put(task) - if self.exception != None: + if self.exception is not None: e = self.exception self.exception = None raise e @@ -641,7 +641,7 @@ def close(self): try: self.flush() finally: - self.channel.put(None) # tell thread to shutdown + self.channel.put(None) # tell thread to 
shutdown class Channel(object): """A blocking channel, like in CSP or Go. @@ -659,4 +659,4 @@ def get(self): def put(self, item): self.q.put(item) - self.q.join() # wait for task_done(), in reader thread + self.q.join() # wait for task_done(), in reader thread From f7ff0455c4a94e1134398c5e0afc7a9ae7aa9994 Mon Sep 17 00:00:00 2001 From: Alan Jenkins Date: Thu, 13 Aug 2015 15:37:50 +0100 Subject: [PATCH 5/5] FsyncWorker also does flush and fadvise now: call it WritebackThread My beautiful code didn't include flush to start with, and certainly not fadvise. Now it needs a new name and verbose comments. Such is life. No semantic change here. I did move the flush into WritebackThread. However it will happen in exactly the same sequence (and still on the same thread as the writes, which sounds good). --- borg/repository.py | 45 +++++++++++++++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/borg/repository.py b/borg/repository.py index 11f99065ed..164bbcc229 100755 --- a/borg/repository.py +++ b/borg/repository.py @@ -440,7 +440,7 @@ def __init__(self, path, limit, segments_per_dir, capacity=90): self.segments_per_dir = segments_per_dir self.offset = 0 self._write_fd = None - self.writeback = FsyncWorker() + self.writeback = WritebackThread() def close(self): for segment in list(self.fds.keys()): @@ -632,20 +632,24 @@ def close_segment(self): if self._write_fd: self.segment += 1 self.offset = 0 - self._write_fd.flush() - self.writeback.fsync_and_close_fd(self._write_fd) + self.writeback.submit_fd(self._write_fd) self._write_fd = None -class FsyncWorker(object): - """os.fsync() in a background thread. +class WritebackThread(object): + """A background thread to flush buffered writes to permanent storage. One fd is processed at a time. If the thread is already working, - the caller will block. This provides double-buffering. + the caller will block. This provides double-buffering. 
Credit + probably goes to Linus Torvalds because I read this message some + time ago :). + + http://article.gmane.org/gmane.linux.kernel/988070 + http://stackoverflow.com/a/3756466/799204 - Any exceptions (from os.fsync() or fd.close()) will be re-raised - on the next call into FsyncWorker. (Naturally this applies to - the .flush() and .close() methods as well as .fsync_and_close_fd()). + Any IO exceptions will be re-raised on the next method call. + (Naturally this applies to the .flush() and .close() methods as well + as .submit_fd()). """ def __init__(self): @@ -665,22 +669,35 @@ def _run(self): except Exception as e: self.exception = e - def fsync_and_close_fd(self, fd): - """fsync() and close() fd in the background""" + def submit_fd(self, fd): + """Start flushing fd's buffered writes to permanent storage. + + This requires ownership of fd; when finished the fd will be closed. + """ + # Wait for pending writeback and re-raise any error + self.flush() + + # Flush any python buffers to kernel (effectively calls os.write()). + # This can block just like fd.write() if we're writing too fast. + # So we block the caller exactly when we want to. + fd.flush() + + # Background writeback task def task(): try: + # Trigger writeback of kernel buffers and wait for all data + # to reach permanent storage. os.fsync(fd.fileno()) if hasattr(os, 'posix_fadvise'): # python >= 3.3, only on UNIX - # tell the OS that it does not need to cache what we just wrote, + # tell the OS it does not need to cache what we just wrote, # avoids spoiling the cache for the OS and other processes. os.posix_fadvise(fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED) finally: fd.close() - self.flush() # raise any pending exception self.channel.put(task) def flush(self): - """Wait for any pending fsync. + """Wait for any pending writeback. This will also make sure an IOError is re-raised in the calling thread, if necessary.