Merge pull request #39 from jsphpl/master
disable multiprocessing if unavailable
piskvorky committed Dec 3, 2015
2 parents 1fe93a2 + 5ab4965 commit 57ac71e
Showing 1 changed file with 26 additions and 7 deletions.
33 changes: 26 additions & 7 deletions smart_open/smart_open_lib.py
@@ -22,7 +22,6 @@
"""

import logging
-import multiprocessing.pool
import os
import subprocess
import sys
@@ -37,6 +36,18 @@

logger = logging.getLogger(__name__)

+# Multiprocessing is unavailable in App Engine (and possibly other sandboxes).
+# The only method currently relying on it is s3_iter_bucket, which is instructed
+# not to use it by the NO_MULTIPROCESSING flag.
+try:
+    import multiprocessing.pool
+except ImportError:
+    logger.warning("multiprocessing could not be imported and won't be used")
+    NO_MULTIPROCESSING = True
+    from itertools import imap
+else:
+    NO_MULTIPROCESSING = False
+
S3_MIN_PART_SIZE = 50 * 1024**2 # minimum part size for S3 multipart uploads
WEBHDFS_MIN_PART_SIZE = 50 * 1024**2 # minimum part size for HDFS multipart uploads
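The guarded import added above keeps the module importable on runtimes that lack multiprocessing (App Engine is the case named in the comment). Below is a minimal standalone sketch of the same fallback pattern; `parallel_map` is a hypothetical helper written here for illustration, not part of smart_open:

import logging

logger = logging.getLogger(__name__)

# Guard the import: some sandboxes ship a Python without the multiprocessing
# module, so fall back to lazy serial iteration instead of a worker pool.
try:
    import multiprocessing.pool
except ImportError:
    logger.warning("multiprocessing could not be imported and won't be used")
    NO_MULTIPROCESSING = True
    from itertools import imap  # lazy serial map (Python 2)
else:
    NO_MULTIPROCESSING = False


def parallel_map(func, iterable, workers=16):
    # Hypothetical helper (not in smart_open): map lazily, in parallel when a
    # process pool is available, serially otherwise.
    if NO_MULTIPROCESSING:
        return imap(func, iterable)
    pool = multiprocessing.pool.Pool(processes=workers)
    # NOTE: the real s3_iter_bucket terminates its pool once the iterator is
    # consumed; a long-lived helper like this would need the same cleanup.
    return pool.imap_unordered(func, iterable)

The fallback trades parallelism for portability but keeps the contract the same: both paths yield results lazily, so calling code iterates identically.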

@@ -578,7 +589,8 @@ def s3_iter_bucket(bucket, prefix='', accept_key=lambda key: True, key_limit=Non
    If `key_limit` is given, stop after yielding out that many results.

    The keys are processed in parallel, using `workers` processes (default: 16),
-    to speed up downloads greatly.
+    to speed up downloads greatly. If multiprocessing is not available
+    (i.e. NO_MULTIPROCESSING is True), this parameter is ignored.

    Example::
@@ -593,13 +605,18 @@ def s3_iter_bucket(bucket, prefix='', accept_key=lambda key: True, key_limit=Non
      ... print key, len(content)
    """
-    logger.info("iterating over keys from %s with %i workers" % (bucket, workers))

    total_size, key_no = 0, -1
    keys = (key for key in bucket.list(prefix=prefix) if accept_key(key.name))

-    pool = multiprocessing.pool.Pool(processes=workers)
-    for key_no, (key, content) in enumerate(pool.imap_unordered(s3_iter_bucket_process_key, keys)):
+    if NO_MULTIPROCESSING:
+        logger.info("iterating over keys from %s without multiprocessing" % bucket)
+        iterator = imap(s3_iter_bucket_process_key, keys)
+    else:
+        logger.info("iterating over keys from %s with %i workers" % (bucket, workers))
+        pool = multiprocessing.pool.Pool(processes=workers)
+        iterator = pool.imap_unordered(s3_iter_bucket_process_key, keys)

+    for key_no, (key, content) in enumerate(iterator):
        if key_no % 1000 == 0:
            logger.info("yielding key #%i: %s, size %i (total %.1fMB)" %
                (key_no, key, len(content), total_size / 1024.0 ** 2))
@@ -611,7 +628,9 @@ def s3_iter_bucket(bucket, prefix='', accept_key=lambda key: True, key_limit=Non
        if key_limit is not None and key_no + 1 >= key_limit:
            # we were asked to output only a limited number of keys => we're done
            break
-    pool.terminate()
+
+    if not NO_MULTIPROCESSING:
+        pool.terminate()

    logger.info("processed %i keys, total size %i" % (key_no + 1, total_size))
