From 3e79ef8ef77c2fe7f4774d13fcdd33f245b882bf Mon Sep 17 00:00:00 2001
From: David Cuthbert
Date: Tue, 24 May 2016 23:14:15 +0000
Subject: [PATCH] Remove caching bits.

---
 client/setup.py      |   2 +-
 client/ublkdev/s3.py | 153 ++++++++++++++++----------------------------
 2 files changed, 57 insertions(+), 98 deletions(-)

diff --git a/client/setup.py b/client/setup.py
index e49316d..c203e7e 100755
--- a/client/setup.py
+++ b/client/setup.py
@@ -18,7 +18,7 @@
             "ubd-debug=ublkdev.ublkdev:debug",
         ]
     },
-    install_requires=["boto>=2.0", "six>=1.0", "futures>=3.0", "pylru>=1.0"],
+    install_requires=["boto>=2.0", "six>=1.0", "futures>=3.0"],
     #setup_requires=["nose>=1.0"],
 
     # PyPI information
diff --git a/client/ublkdev/s3.py b/client/ublkdev/s3.py
index 9833f13..db861c9 100644
--- a/client/ublkdev/s3.py
+++ b/client/ublkdev/s3.py
@@ -10,9 +10,8 @@
 from json import dumps as json_dumps, loads as json_loads
 import logging
 from os import environ
-from pylru import WriteThroughCacheManager
 from re import match
-from six.moves import cStringIO as StringIO
+from six.moves import cStringIO as StringIO, range
 from struct import pack, unpack
 from sys import argv, exit, stderr, stdin, stdout
 from threading import Condition, RLock, Thread
@@ -85,7 +84,7 @@ class S3Pool(object):
     def __init__(self, region, size, bucket_name, s3_kw={}):
         super(S3Pool, self).__init__()
         conns = [boto.s3.connect_to_region(region, **s3_kw)
-                 for i in xrange(size)]
+                 for i in range(size)]
         if [conn for conn in conns if conn is None]:
             raise RuntimeError("Failed to connect to S3 region %r" % region)
         self.connections = [
@@ -139,11 +138,9 @@ class UBDS3Volume(object):
     """
     A userspace block driver volume handler for S3-backed volumes.
     """
-    def __init__(self, bucket_name, devname, region, handler_threads=1,
-                 write_threads=10, cache_size=1000000, s3_kw={}):
+    def __init__(self, bucket_name, devname, region, thread_count=1, s3_kw={}):
         """
-        UBDS3Volume(bucket_name, devname, region, handler_threads=1,
-                    write_threads=10, cache_size=1000000, s3_kw={})
+        UBDS3Volume(bucket_name, devname, region, thread_count=1, s3_kw={})
 
        Create a new UBDS3Volume object.
""" @@ -152,7 +149,7 @@ def __init__(self, bucket_name, devname, region, handler_threads=1, self.devname = devname self.region = region self.ubd = None - self.handler_threads = handler_threads + self.thread_count = thread_count self.block_size = None self.encryption = None self.policy = None @@ -162,17 +159,7 @@ def __init__(self, bucket_name, devname, region, handler_threads=1, self.stop_requested = False - self.s3_pool = S3Pool(region, handler_threads + write_threads, - bucket_name, s3_kw) - - self.lock = RLock() - - # lock must be held to read or write lrucache and pending_write - self.lrucache = WriteThroughCacheManager(self, cache_size) - self.pending_write = {} - - self.write_executor = ThreadPoolExecutor(write_threads) - + self.s3_pool = S3Pool(region, thread_count, bucket_name, s3_kw) return def register(self): @@ -191,7 +178,7 @@ def register(self): def run(self): self.threads = [UBDS3Handler("handler-%d" % i, self) - for i in xrange(self.handler_threads)] + for i in range(self.thread_count)] try: for thread in self.threads: @@ -279,7 +266,7 @@ def read(self, offset, length): end_offset = self.block_size result = StringIO() - for block_id in xrange(start_block, end_block + 1): + for block_id in range(start_block, end_block + 1): block_data = self.read_block(block_id) if block_id == start_block: @@ -311,7 +298,7 @@ def write(self, offset, data): end_offset = self.block_size to_write = StringIO(data) - for block_id in xrange(start_block, end_block + 1): + for block_id in range(start_block, end_block + 1): # Do we need a read-modify-write cycle? if ((block_id == start_block and start_offset != 0) or (block_id == end_block and @@ -350,7 +337,7 @@ def trim(self, offset, length): end_offset = self.block_size to_write = StringIO(data) - for block_id in xrange(start_block, end_block + 1): + for block_id in range(start_block, end_block + 1): # Skip partial blocks if ((block_id != start_block or start_offset == 0) and (block_id != end_block or end_offset == self.block_size)): @@ -385,32 +372,6 @@ def read_block(self, block_id): Read a block of data. The bucket must be a Boto S3 object; this is required for thread safety. """ - with self.lock: - result = self.pending_write.get(block_id) - if result is not None: - return result - - result = self.lrucache.get(block_id) - if result == "": - # Trimmed block - result = "\0" * self.block_size - return result - - def write_block(self, block_id, block_data): - """ - s3handler.write_block(bucket, block_id, block_data) - - Write a block of data. The bucket must be a Boto S3 object; this is - required for thread safety. - """ - with self.lock: - self.lrucache[block_id] = block_data - - def trim_block(self, block_id): - with self.lock: - self.lrucache[block_id] = "\0" * block_size - - def read_block_from_s3(self, block_id): with self.s3_pool.get_connection() as s3: key = self.get_key_for_block(s3.bucket, block_id) sleep_time = 0.1 @@ -440,19 +401,13 @@ def read_block_from_s3(self, block_id): sleep(sleep_time) sleep_time = min(5.0, 1.5 * sleep_time) - def schedule_block_write(self, block_id, block_data): - self.write_executor.submit(self.async_write_block_to_s3, block_id, - block_data) - self.pending_write[block_id] = block_data - - def async_write_block_to_s3(self, block_id): - with self.lock: - block_data = self.pending_write.get(block_id) - if block_data is None: - # Multiple write to the same block; we've already written the - # newer data. 
-                return
-
+    def write_block(self, block_id, block_data):
+        """
+        s3handler.write_block(block_id, block_data)
+
+        Write a block of data.  A connection from the S3 pool is used for
+        thread safety.
+        """
         with self.s3_pool.get_connection() as s3:
             key = self.get_key_for_block(s3.bucket, block_id)
 
@@ -468,16 +423,9 @@ def async_write_block_to_s3(self, block_id):
 
             while True:
                 try:
-                    if block_data != "\0" * self.block-size:
-                        # Trim empty blocks
-                        key.delete()
-                    else:
-                        key.set_contents_from_string(
-                            block_data, reduced_redundancy=rr,
-                            policy=self.policy, encrypt_key=encrypt_key)
-
-                    with self.lock:
-                        del self.pending_write[block_id]
+                    key.set_contents_from_string(
+                        block_data, reduced_redundancy=rr,
+                        policy=self.policy, encrypt_key=encrypt_key)
                     return
                 except S3ResponseError as e:
@@ -494,18 +442,36 @@ def async_write_block_to_s3(self, block_id):
                 sleep(sleep_time)
                 sleep_time = min(5.0, 1.5 * sleep_time)
 
-    def __getitem__(self, block_id):
-        # Called by WriteThroughCacheManager
-        return self.read_block_from_s3(block_id)
-
-    def __setitem__(self, block_id, block_data):
-        # Called by WriteThroughCacheManager
-        return self.schedule_block_write(block_id, block_data)
+    def trim_block(self, block_id):
+        sleep_time = 0.1
+        with self.s3_pool.get_connection() as s3:
+            key = self.get_key_for_block(s3.bucket, block_id)
 
-    def __contains__(self, block_id):
-        # Called by WriteThroughCacheManager
-        return 0 <= block_id < (self.size // self.block_size)
+            while True:
+                # Delete the key if it exists
+                try:
+                    key.delete()
+                    # Deletion succeeded; stop retrying.
+                    return
+                except S3ResponseError as e:
+                    if e.status == 404:
+                        # Ignore
+                        return
+                    elif e.status < 500:
+                        log.warning("Received unexpected S3 error: %s", e,
+                                    exc_info=True)
+                        raise OSError(EIO, str(e))
+                    else:
+                        log.info("S3 temporarily unavailable (%d): %s; still "
+                                 "trying", e.status, e)
+                except EnvironmentError as e:
+                    log.info("S3 temporarily unavailable (env): %s; still "
+                             "trying", e)
+                sleep(sleep_time)
+                sleep_time = min(5.0, 1.5 * sleep_time)
+
+
 def parse_size(value, parameter_name, min=0, max=None):
     m = match(r"([0-9]+|0x[0-9a-fA-F]+)\s*"
               r"(k|kiB|M|MiB|G|GiB|T|TiB|P|PiB|E|EiB)?", value)
@@ -534,8 +500,7 @@ def main(args=None):
     proxy_user = environ.get("PROXY_USER")
     proxy_password = environ.get("PROXY_PASSWORD")
     create = False
-    handler_threads = 1
-    write_threads = 10
+    thread_count = 10
 
     if args is None:
         args = argv[1:]
@@ -548,10 +513,10 @@ def main(args=None):
     try:
         opts, args = getopt(args, "b:B:cC:e:hp:P:r:s:S:t:",
                             ["bucket=", "block-size=", "create", "encryption=",
-                             "handler-threads=", "help", "policy=",
+                             "help", "policy=",
                              "profile=", "proxy-user=", "proxy-password=",
                              "proxy-port=", "region=", "size=",
-                             "storage-class=", "suffix=", "write-threads="])
+                             "storage-class=", "suffix=", "threads="])
 
         for opt, value in opts:
             if opt in ("--bucket", "-b",):
@@ -566,8 +531,6 @@ def main(args=None):
                 if encryption != "sse-s3":
                     raise ValueError(
                         "Invalid encryption specification: %r" % value)
-            elif opt in ("--handler-threads",):
-                handler_threads = int(value)
             elif opt in ("--help", "-h",):
                 usage(stdout)
                 return 0
@@ -601,8 +564,8 @@ def main(args=None):
                     raise ValueError("Invalid storage class: %r" % value)
             elif opt in ("--suffix", "-S",):
                 suffix = value
-            elif opt in ("--write-threads",):
-                write_threads = int(value)
+            elif opt in ("--threads",):
+                thread_count = int(value)
 
         if bucket_name is None:
             raise GetoptError("--bucket-name is required")
@@ -650,8 +613,7 @@ def main(args=None):
     logging.getLogger("boto").setLevel(logging.INFO)
 
     volume = UBDS3Volume(bucket_name, devname, region=region,
-                         handler_threads=handler_threads,
-                         write_threads=write_threads,
+                         thread_count=thread_count,
                          s3_kw={'profile_name': profile,
                                 'proxy_user': proxy_user,
                                 'proxy_pass': proxy_password,})
@@ -702,9 +664,6 @@ def usage(fd=stderr):
     The k, M, and G suffixes are base-2 (k == 2**10, M == 2**20,
     G == 2 ** 30).
 
-    --handler-threads
-        Create the specified number of threads to handle block requests.
-
     --policy | -p
         Use the specified ACL policy.  This defaults to 'private'.  Valid
         values are 'private', 'public-read', 'public-read-write' (DANGEROUS),
@@ -725,8 +684,8 @@ def usage(fd=stderr):
         Object names are suffixed rather than prefixed to improve performance
         (due to the way S3 partitions the bucket keyspace).
 
-    --write-threads
-        Create the specified number of threads to handle write-back requests.
+    --threads
+        Create the specified number of threads to handle requests.
 """)
     fd.flush()
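
The read, write, and trim paths above share one retry discipline: start with a
0.1-second delay, grow it by 1.5x per attempt, cap it at 5 seconds, retry 5xx
and environment errors, and let other S3 errors surface.  A minimal standalone
sketch of that pattern follows; retry_s3 and operation are hypothetical names,
not part of this patch, and the 404 special case from trim_block is omitted.

    from time import sleep

    from boto.exception import S3ResponseError

    def retry_s3(operation, initial_delay=0.1, factor=1.5, max_delay=5.0):
        # Hypothetical helper mirroring the loops in read_block, write_block,
        # and trim_block: retry 5xx and environment errors with capped
        # exponential backoff; re-raise permanent (non-5xx) S3 errors.
        delay = initial_delay
        while True:
            try:
                return operation()
            except S3ResponseError as e:
                if e.status < 500:
                    raise  # permanent failure; do not retry
            except EnvironmentError:
                pass  # transient local failure (e.g. dropped socket); retry
            sleep(delay)
            delay = min(max_delay, factor * delay)

For example, retry_s3(key.delete) keeps retrying a flaky delete the same way
trim_block does.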
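
With the cache layer removed, a single thread_count now sizes both the handler
pool and the S3 connection pool.  A usage sketch under the signatures shown in
this patch; the bucket and device names are placeholders, and only the
existence of register() and run() is confirmed by the hunks above, so the call
order here is an assumption.

    from ublkdev.s3 import UBDS3Volume

    # Placeholder bucket/device names; region and thread_count follow the
    # defaults used by main() in this patch.
    volume = UBDS3Volume("my-ubd-bucket", "ubd0", region="us-west-2",
                         thread_count=10)
    volume.register()  # shown in the hunks above; behavior assumed
    volume.run()       # starts thread_count UBDS3Handler threads

On the command line, the single --threads option replaces the old
--handler-threads/--write-threads pair.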