From cfc143238ec326d775a2157ea77678dd4f51c5a4 Mon Sep 17 00:00:00 2001
From: Joseph Paul
Date: Thu, 5 Nov 2015 16:59:19 +0100
Subject: [PATCH 1/2] disable multiprocessing if unavailable

---
 smart_open/smart_open_lib.py | 35 ++++++++++++++++++++++++++++-------
 1 file changed, 28 insertions(+), 7 deletions(-)

diff --git a/smart_open/smart_open_lib.py b/smart_open/smart_open_lib.py
index 22f056a1..12821c63 100644
--- a/smart_open/smart_open_lib.py
+++ b/smart_open/smart_open_lib.py
@@ -22,7 +22,6 @@
 """

 import logging
-import multiprocessing.pool
 import os
 import subprocess
 import sys
@@ -37,6 +36,18 @@

 logger = logging.getLogger(__name__)

+# Multiprocessing is unavailable in App Engine (and possibly other sandboxes).
+# The only method currently relying on it is s3_iter_bucket, which is told
+# not to use it via the NO_MULTIPROCESSING flag.
+try:
+    import multiprocessing.pool
+except ImportError:
+    logger.warning("multiprocessing could not be imported and won't be used")
+    NO_MULTIPROCESSING = True
+    from itertools import imap
+else:
+    NO_MULTIPROCESSING = False
+
 S3_MIN_PART_SIZE = 50 * 1024**2  # minimum part size for S3 multipart uploads
 WEBHDFS_MIN_PART_SIZE = 50 * 1024**2  # minimum part size for HDFS multipart uploads

@@ -578,7 +589,8 @@ def s3_iter_bucket(bucket, prefix='', accept_key=lambda key: True, key_limit=Non
     If `key_limit` is given, stop after yielding out that many results.

     The keys are processed in parallel, using `workers` processes (default: 16),
-    to speed up downloads greatly.
+    to speed up downloads greatly. If multiprocessing is not available
+    (NO_MULTIPROCESSING is True), this parameter is ignored.

     Example::

@@ -593,13 +605,18 @@ def s3_iter_bucket(bucket, prefix='', accept_key=lambda key: True, key_limit=Non
       ...     print key, len(content)

     """
-    logger.info("iterating over keys from %s with %i workers" % (bucket, workers))
-
     total_size, key_no = 0, -1
     keys = (key for key in bucket.list(prefix=prefix) if accept_key(key.name))

-    pool = multiprocessing.pool.Pool(processes=workers)
-    for key_no, (key, content) in enumerate(pool.imap_unordered(s3_iter_bucket_process_key, keys)):
+    if NO_MULTIPROCESSING:
+        logger.info("iterating over keys from %s without multiprocessing" % bucket)
+        iterator = imap(s3_iter_bucket_process_key, keys)
+    else:
+        logger.info("iterating over keys from %s with %i workers" % (bucket, workers))
+        pool = multiprocessing.pool.Pool(processes=workers)
+        iterator = pool.imap_unordered(s3_iter_bucket_process_key, keys)
+
+    for key_no, (key, content) in enumerate(iterator):
         if key_no % 1000 == 0:
             logger.info("yielding key #%i: %s, size %i (total %.1fMB)" %
                 (key_no, key, len(content), total_size / 1024.0 ** 2))
@@ -611,7 +628,11 @@ def s3_iter_bucket(bucket, prefix='', accept_key=lambda key: True, key_limit=Non
         if key_limit is not None and key_no + 1 >= key_limit:
             # we were asked to output only a limited number of keys => we're done
             break
-    pool.terminate()
+
+    try:
+        pool.terminate()
+    except UnboundLocalError:
+        pass  # pool not defined when in NO_MULTIPROCESSING mode

     logger.info("processed %i keys, total size %i" % (key_no + 1, total_size))
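
[Editor's note, not part of the patch series: the try/except/else import guard
introduced above is a general pattern for optional dependencies. A minimal
standalone sketch of the same feature-detection idea; `parallel_map` is a
hypothetical helper for illustration, not a smart_open function:

    import logging

    logger = logging.getLogger(__name__)

    # Detect at import time whether multiprocessing works in this runtime;
    # sandboxes such as App Engine raise ImportError here.
    try:
        import multiprocessing.pool
    except ImportError:
        logger.warning("multiprocessing could not be imported and won't be used")
        NO_MULTIPROCESSING = True
    else:
        NO_MULTIPROCESSING = False

    def parallel_map(func, iterable, workers=16):
        # Fall back to a plain serial loop when no pool is available.
        if NO_MULTIPROCESSING:
            return [func(item) for item in iterable]
        pool = multiprocessing.pool.Pool(processes=workers)
        try:
            return pool.map(func, iterable)
        finally:
            pool.terminate()

Because the flag is computed once at module import, every caller sees the same
decision and the call sites stay free of repeated feature tests.]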
From 5ab4965853fc8c646c4b75dfe1185b96cf8546c2 Mon Sep 17 00:00:00 2001
From: Joseph Paul
Date: Tue, 1 Dec 2015 15:11:00 +0100
Subject: [PATCH 2/2] check against NO_MULTIPROCESSING instead of try/except
 to terminate the worker pool in s3_iter_bucket

---
 smart_open/smart_open_lib.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/smart_open/smart_open_lib.py b/smart_open/smart_open_lib.py
index 12821c63..76602c69 100644
--- a/smart_open/smart_open_lib.py
+++ b/smart_open/smart_open_lib.py
@@ -629,10 +629,8 @@ def s3_iter_bucket(bucket, prefix='', accept_key=lambda key: True, key_limit=Non
         if key_limit is not None and key_no + 1 >= key_limit:
             # we were asked to output only a limited number of keys => we're done
             break

-    try:
+    if not NO_MULTIPROCESSING:
         pool.terminate()
-    except UnboundLocalError:
-        pass  # pool not defined when in NO_MULTIPROCESSING mode

     logger.info("processed %i keys, total size %i" % (key_no + 1, total_size))
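
[Editor's note, not part of the patch series: the flag check reads better than
catching UnboundLocalError, which would also mask a real bug that leaves `pool`
unassigned. An alternative sketch that needs no flag at the cleanup site,
wrapping the pool in try/finally; `iter_parallel` is a hypothetical helper, not
a smart_open function:

    import multiprocessing.pool

    def iter_parallel(func, items, workers=16):
        # Yield func(item) results as they complete, in arbitrary order.
        pool = multiprocessing.pool.Pool(processes=workers)
        try:
            for result in pool.imap_unordered(func, items):
                yield result
        finally:
            # Runs even when the consumer breaks out early and the
            # generator is closed, so the pool is always terminated.
            pool.terminate()
]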