Remove caching bits.
dacut committed May 24, 2016
1 parent d697674 commit 3e79ef8
Showing 2 changed files with 55 additions and 98 deletions.
client/setup.py (2 changes: 1 addition & 1 deletion)
@@ -18,7 +18,7 @@
"ubd-debug=ublkdev.ublkdev:debug",
]
},
install_requires=["boto>=2.0", "six>=1.0", "futures>=3.0", "pylru>=1.0"],
install_requires=["boto>=2.0", "six>=1.0", "futures>=3.0"],
#setup_requires=["nose>=1.0"],

# PyPI information
client/ublkdev/s3.py (151 changes: 54 additions & 97 deletions)
@@ -10,9 +10,8 @@
from json import dumps as json_dumps, loads as json_loads
import logging
from os import environ
-from pylru import WriteThroughCacheManager
from re import match
-from six.moves import cStringIO as StringIO
+from six.moves import cStringIO as StringIO, range
from struct import pack, unpack
from sys import argv, exit, stderr, stdin, stdout
from threading import Condition, RLock, Thread
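
The import change above is load-bearing for the rest of this diff: every xrange loop below becomes six.moves.range, which resolves to the lazy xrange on Python 2 and to the built-in range on Python 3. A minimal illustration of the shim:

    from six.moves import range

    # On Python 2 this is xrange (lazy); on Python 3 it is the built-in
    # range. Either way, iterating block ids never materializes a list.
    for block_id in range(4):
        print(block_id)
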
@@ -85,7 +84,7 @@ class S3Pool(object):
def __init__(self, region, size, bucket_name, s3_kw={}):
super(S3Pool, self).__init__()
conns = [boto.s3.connect_to_region(region, **s3_kw)
-for i in xrange(size)]
+for i in range(size)]
if [conn for conn in conns if conn is None]:
raise RuntimeError("Failed to connect to S3 region %r" % region)
self.connections = [
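
For context, S3Pool hands a fixed set of boto connections out to worker threads. Only the S3Pool(region, size, bucket_name, s3_kw) constructor and the get_connection() context manager are visible in this diff, so the queue-based checkout below is a sketch of the likely pattern, not the file's verbatim implementation:

    from contextlib import contextmanager
    from six.moves.queue import Queue

    class S3PoolSketch(object):
        def __init__(self, connections):
            self.idle = Queue()
            for conn in connections:
                self.idle.put(conn)

        @contextmanager
        def get_connection(self):
            # Block until a connection is free; return it to the pool
            # even if the caller raises.
            conn = self.idle.get()
            try:
                yield conn
            finally:
                self.idle.put(conn)
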
@@ -139,11 +138,9 @@ class UBDS3Volume(object):
"""
A userspace block driver volume handler for S3-backed volumes.
"""
-def __init__(self, bucket_name, devname, region, handler_threads=1,
-write_threads=10, cache_size=1000000, s3_kw={}):
+def __init__(self, bucket_name, devname, region, thread_count=1, s3_kw={}):
"""
-UBDS3Volume(bucket_name, devname, region, handler_threads=1,
-write_threads=10, cache_size=1000000, s3_kw={})
+UBDS3Volume(bucket_name, devname, region, thread_count=1, s3_kw={})
Create a new UBDS3Volume object.
"""
@@ -152,7 +149,7 @@ def __init__(self, bucket_name, devname, region, handler_threads=1,
self.devname = devname
self.region = region
self.ubd = None
-self.handler_threads = handler_threads
+self.thread_count = thread_count
self.block_size = None
self.encryption = None
self.policy = None
@@ -162,17 +159,7 @@ def __init__(self, bucket_name, devname, region, handler_threads=1,

self.stop_requested = False

-self.s3_pool = S3Pool(region, handler_threads + write_threads,
-bucket_name, s3_kw)

-self.lock = RLock()

-# lock must be held to read or write lrucache and pending_write
-self.lrucache = WriteThroughCacheManager(self, cache_size)
-self.pending_write = {}

-self.write_executor = ThreadPoolExecutor(write_threads)

+self.s3_pool = S3Pool(region, thread_count, bucket_name, s3_kw)
return

def register(self):
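
With the caching and write-back machinery gone, the volume owns a single connection pool sized by thread_count. A hypothetical construction call under the new signature (the bucket name, device name, and profile are invented for illustration):

    volume = UBDS3Volume("my-ubd-bucket", "ubd0", region="us-west-2",
                         thread_count=4,
                         s3_kw={"profile_name": "default"})
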
@@ -191,7 +178,7 @@ def register(self):

def run(self):
self.threads = [UBDS3Handler("handler-%d" % i, self)
-for i in xrange(self.handler_threads)]
+for i in range(self.thread_count)]

try:
for thread in self.threads:
@@ -279,7 +266,7 @@ def read(self, offset, length):
end_offset = self.block_size

result = StringIO()
-for block_id in xrange(start_block, end_block + 1):
+for block_id in range(start_block, end_block + 1):
block_data = self.read_block(block_id)

if block_id == start_block:
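
read() maps a byte range onto fixed-size blocks, then slices off the partial head and tail. A worked sketch of one way to compute those boundaries (the 512-byte block_size and the divmod formulation are illustrative, not the file's verbatim code):

    block_size = 512
    offset, length = 700, 1000          # spans blocks 1 through 3

    start_block, start_offset = divmod(offset, block_size)
    end_block, end_offset = divmod(offset + length, block_size)
    if end_offset == 0:
        # The range ends exactly on a block boundary.
        end_block -= 1
        end_offset = block_size

    assert (start_block, start_offset) == (1, 188)
    assert (end_block, end_offset) == (3, 164)
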
@@ -311,7 +298,7 @@ def write(self, offset, data):
end_offset = self.block_size

to_write = StringIO(data)
-for block_id in xrange(start_block, end_block + 1):
+for block_id in range(start_block, end_block + 1):
# Do we need a read-modify-write cycle?
if ((block_id == start_block and start_offset != 0) or
(block_id == end_block and
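
The condition being assembled above decides when write() needs a read-modify-write cycle: any write that covers only part of a block must fetch the block, splice in the new bytes, and write the whole block back. A minimal sketch of that splice (write_partial is an invented helper name; read_block and write_block are the methods in this file):

    def write_partial(self, block_id, start, new_bytes):
        # Read-modify-write: overlay new_bytes at `start` within the
        # existing block, then write the full block back.
        block = self.read_block(block_id)
        block = block[:start] + new_bytes + block[start + len(new_bytes):]
        self.write_block(block_id, block)
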
@@ -350,7 +337,7 @@ def trim(self, offset, length):
end_offset = self.block_size

to_write = StringIO(data)
-for block_id in xrange(start_block, end_block + 1):
+for block_id in range(start_block, end_block + 1):
# Skip partial blocks
if ((block_id != start_block or start_offset == 0) and
(block_id != end_block or end_offset == self.block_size)):
@@ -385,32 +372,6 @@ def read_block(self, block_id):
Read a block of data. The bucket must be a Boto S3 object; this is
required for thread safety.
"""
-with self.lock:
-result = self.pending_write.get(block_id)
-if result is not None:
-return result

-result = self.lrucache.get(block_id)
-if result == "":
-# Trimmed block
-result = "\0" * self.block_size
-return result

-def write_block(self, block_id, block_data):
-"""
-s3handler.write_block(bucket, block_id, block_data)
-Write a block of data. The bucket must be a Boto S3 object; this is
-required for thread safety.
-"""
-with self.lock:
-self.lrucache[block_id] = block_data

-def trim_block(self, block_id):
-with self.lock:
-self.lrucache[block_id] = "\0" * block_size

-def read_block_from_s3(self, block_id):
with self.s3_pool.get_connection() as s3:
key = self.get_key_for_block(s3.bucket, block_id)
sleep_time = 0.1
@@ -440,19 +401,13 @@ def read_block_from_s3(self, block_id):
sleep(sleep_time)
sleep_time = min(5.0, 1.5 * sleep_time)

-def schedule_block_write(self, block_id, block_data):
-self.write_executor.submit(self.async_write_block_to_s3, block_id,
-block_data)
-self.pending_write[block_id] = block_data

-def async_write_block_to_s3(self, block_id):
-with self.lock:
-block_data = self.pending_write.get(block_id)
-if block_data is None:
-# Multiple write to the same block; we've already written the
-# newer data.
-return

+def write_block(self, block_id, block_data):
+"""
+s3handler.write_block(bucket, block_id, block_data)
+Write a block of data. The bucket must be a Boto S3 object; this is
+required for thread safety.
+"""
with self.s3_pool.get_connection() as s3:
key = self.get_key_for_block(s3.bucket, block_id)

@@ -468,16 +423,9 @@ def async_write_block_to_s3(self, block_id):

while True:
try:
-if block_data != "\0" * self.block-size:
-# Trim empty blocks
-key.delete()
-else:
-key.set_contents_from_string(
-block_data, reduced_redundancy=rr,
-policy=self.policy, encrypt_key=encrypt_key)

-with self.lock:
-del self.pending_write[block_id]
+key.set_contents_from_string(
+block_data, reduced_redundancy=rr,
+policy=self.policy, encrypt_key=encrypt_key)

return
except S3ResponseError as e:
@@ -494,18 +442,34 @@
sleep(sleep_time)
sleep_time = min(5.0, 1.5 * sleep_time)

-def __getitem__(self, block_id):
-# Called by WriteThroughCacheManager
-return self.read_block_from_s3(block_id)

-def __setitem__(self, block_id, block_data):
-# Called by WriteThroughCacheManager
-return self.schedule_block_write(block_id, block_data)
+def trim_block(self, block_id):
+sleep_time = 0.1
+with self.s3_pool.get_connection() as s3:
+key = self.get_key_for_block(s3.bucket, block_id)

-def __contains__(self, block_id):
-# Called by WriteThroughCacheManager
-return 0 <= block_id < (self.size // self.block_size)
+while True:
+# Delete the key if it exists
+try:
+key.delete()
+except S3ResponseError as e:
+if e.status == 404:
+# Ignore
+return
+elif e.status < 500:
+log.warning("Received unexpected S3 error: %s", e,
+exc_info=True)
+raise OSError(EIO, str(e))
+else:
+log.info("S3 temporarily unavailable (%d): %s; still "
+"trying", e.status, e)
+except EnvironmentError as e:
+log.info("S3 temporarily unavailable (env): %s; still "
+"trying", e)

+sleep(sleep_time)
+sleep_time = min(5.0, 1.5 * sleep_time)
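
read_block, write_block, and trim_block now share one retry shape: attempt the S3 call, treat most 4xx responses as fatal (surfaced as EIO), treat 5xx and environment errors as transient, and back off by a factor of 1.5 capped at five seconds. A standalone sketch of the pattern (retry_s3 is an invented helper; note that it returns as soon as the call succeeds):

    from errno import EIO
    from time import sleep

    from boto.exception import S3ResponseError

    def retry_s3(operation):
        sleep_time = 0.1
        while True:
            try:
                return operation()      # success: stop retrying
            except S3ResponseError as e:
                if e.status < 500:
                    # Client-side error; retrying will not help.
                    raise OSError(EIO, str(e))
                # 5xx: S3 is temporarily unavailable; fall through.
            except EnvironmentError:
                pass                    # transient; retry
            sleep(sleep_time)
            sleep_time = min(5.0, 1.5 * sleep_time)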


def parse_size(value, parameter_name, min=0, max=None):
m = match(r"([0-9]+|0x[0-9a-fA-F]+)\s*"
r"(k|kiB|M|MiB|G|GiB|T|TiB|P|PiB|E|EiB)?", value)
@@ -534,8 +498,7 @@ def main(args=None):
proxy_user = environ.get("PROXY_USER")
proxy_password = environ.get("PROXY_PASSWORD")
create = False
-handler_threads = 1
-write_threads = 10
+thread_count = 10

if args is None:
args = argv[1:]
@@ -548,10 +511,10 @@
try:
opts, args = getopt(args, "b:B:cC:e:hp:P:r:s:S:t:",
["bucket=", "block-size=", "create", "encryption=",
"handler-threads=", "help", "policy=",
"help", "policy=",
"profile=", "proxy-user=", "proxy-password=",
"proxy-port=", "region=", "size=",
"storage-class=", "suffix=", "write-threads="])
"storage-class=", "suffix=", "threads="])

for opt, value in opts:
if opt in ("--bucket", "-b",):
@@ -566,8 +529,6 @@ def main(args=None):
if encryption != "sse-s3":
raise ValueError(
"Invalid encryption specification: %r" % value)
elif opt in ("--handler-threads",):
handler_threads = int(value)
elif opt in ("--help", "-h",):
usage(stdout)
return 0
@@ -601,8 +562,8 @@ def main(args=None):
raise ValueError("Invalid storage class: %r" % value)
elif opt in ("--suffix", "-S",):
suffix = value
elif opt in ("--write-threads",):
write_threads = int(value)
elif opt in ("--threads",):
thread_count = int(value)

if bucket_name is None:
raise GetoptError("--bucket-name is required")
@@ -650,8 +611,7 @@ def main(args=None):
logging.getLogger("boto").setLevel(logging.INFO)

volume = UBDS3Volume(bucket_name, devname, region=region,
-handler_threads=handler_threads,
-write_threads=write_threads,
+thread_count=thread_count,
s3_kw={'profile_name': profile,
'proxy_user': proxy_user,
'proxy_pass': proxy_password,})
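
The s3_kw dict is forwarded by the pool to boto.s3.connect_to_region, so the profile and proxy settings ride along with every connection it opens. A sketch of the equivalent direct call (the values are invented):

    import boto.s3

    conn = boto.s3.connect_to_region(
        "us-west-2",
        profile_name="default",   # from --profile
        proxy_user="user",        # from PROXY_USER
        proxy_pass="secret")      # from PROXY_PASSWORD
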
@@ -702,9 +662,6 @@ def usage(fd=stderr):
The k, M, and G suffixes are base-2 (k == 2**10, M == 2**20,
G == 2 ** 30).
---handler-threads <int>
-Create the specified number of threads to handle block requests.
--policy <policy> | -p <policy>
Use the specified ACL policy. This defaults to 'private'. Valid
values are 'private', 'public-read', 'public-read-write' (DANGEROUS),
@@ -725,8 +682,8 @@ def usage(fd=stderr):
Object names are suffixed rather than prefixed to improve performance
(due to the way S3 partitions the bucket keyspace).
---write-threads <int>
-Create the specified number of threads to handle write-back requests.
+--threads <int>
+Create the specified number of threads to handle requests.
""")

fd.flush()
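
Putting the surviving options together, an equivalent in-process run of main() might look like the following (the bucket, size, and trailing device-name argument are assumptions for illustration; the flags come from the usage text above):

    from ublkdev.s3 import main

    main(["--bucket", "my-ubd-bucket", "--region", "us-west-2",
          "--create", "--size", "16G", "--block-size", "4k",
          "--threads", "8", "ubd0"])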
