From cb2e6b9e323b4051cc100f2759b5c182e6a04e15 Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Fri, 18 Dec 2020 13:39:16 +0900 Subject: [PATCH 1/7] refactor s3 submodule to minimize resource usage creating sessions and resources costs time and memory --- setup.py | 14 ++--- smart_open/s3.py | 153 +++++++++++++++++++++++++++++------------------ 2 files changed, 101 insertions(+), 66 deletions(-) diff --git a/setup.py b/setup.py index 6df86ab2..8e15fdef 100644 --- a/setup.py +++ b/setup.py @@ -35,8 +35,13 @@ def _get_version(): def read(fname): return io.open(os.path.join(os.path.dirname(__file__), fname), encoding='utf-8').read() +aws_deps = ['boto3'] +gcp_deps = ['google-cloud-storage'] +azure_deps = ['azure-storage-blob', 'azure-common', 'azure-core'] +http_deps = ['requests'] -tests_require = [ +all_deps = aws_deps + gcp_deps + azure_deps + http_deps +tests_require = all_deps + [ 'mock', 'moto[server]==1.3.14', # Older versions of moto appear broken 'pathlib2', @@ -48,13 +53,6 @@ def read(fname): 'pytest-rerunfailures' ] -aws_deps = ['boto3'] -gcp_deps = ['google-cloud-storage'] -azure_deps = ['azure-storage-blob', 'azure-common', 'azure-core'] -http_deps = ['requests'] - -all_deps = aws_deps + gcp_deps + azure_deps + http_deps - setup( name='smart_open', version=__version__, diff --git a/smart_open/s3.py b/smart_open/s3.py index 1702ed35..45b16308 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -171,20 +171,21 @@ def open_uri(uri, mode, transport_params): def open( - bucket_id, - key_id, - mode, - version_id=None, - buffer_size=DEFAULT_BUFFER_SIZE, - min_part_size=DEFAULT_MIN_PART_SIZE, - session=None, - resource_kwargs=None, - multipart_upload_kwargs=None, - multipart_upload=True, - singlepart_upload_kwargs=None, - object_kwargs=None, - defer_seek=False, - ): + bucket_id, + key_id, + mode, + version_id=None, + buffer_size=DEFAULT_BUFFER_SIZE, + min_part_size=DEFAULT_MIN_PART_SIZE, + session=None, + resource=None, + resource_kwargs=None, + 
multipart_upload_kwargs=None, + multipart_upload=True, + singlepart_upload_kwargs=None, + object_kwargs=None, + defer_seek=False, +): """Open an S3 object for reading or writing. Parameters ---------- bucket_id: str The name of the bucket this object resides in. key_id: str The name of the key within the bucket. mode: str The mode for opening the object. Must be either "rb" or "wb". version_id: str, optional Versions of the same object can be accessed by their version id. buffer_size: int, optional The buffer size to use when performing I/O. min_part_size: int, optional The minimum part size for multipart uploads. For writing only. session: object, optional The S3 session to use when working with boto3. + If you don't specify this, then smart_open will create a new session for you. + resource: object, optonal + The S3 resource to use when working with boto3. + If you don't specify this, then smart_open will create a new resource for you. resource_kwargs: dict, optional - Keyword arguments to use when accessing the S3 resource for reading or writing. + Keyword arguments to use when creating the S3 resource for reading or writing. + Will be ignored if you specify the resource object explicitly. multipart_upload_kwargs: dict, optional Additional parameters to pass to boto3's initiate_multipart_upload function. For writing only. @@ -242,6 +248,7 @@ def open( version_id=version_id, buffer_size=buffer_size, session=session, + resource=resource, resource_kwargs=resource_kwargs, object_kwargs=object_kwargs, defer_seek=defer_seek, @@ -420,30 +427,68 @@ def read(self, size=-1): return binary +def _initialize_boto3(rw, session, resource, resource_kwargs): + """Create the required objects for accessing S3. Ideally, they have + already been created for us and we can just reuse them. + + We only really need one thing: the resource. There are multiple ways of + getting one, in order of effort: + + 1) Directly from the user + 2) From the session directly specified by the user + 3) From an entirely new session + + Once we have the resource, we no longer need the session. 
+ """ + if resource_kwargs is None: + resource_kwargs = {} + + if resource: + if session: + logger.warning('ignoring session because resource was passed explicitly') + if resource_kwargs: + logger.warning('ignoring resource_kwargs because resource was passed explicitly') + rw._session = None + rw._resource = resource + elif session: + rw._session = session + rw._resource = rw._session.resource('s3', **resource_kwargs) + rw._resource_kwargs = resource_kwargs + else: + rw._session = boto3.Session() + rw._resource = rw._session.resource('s3', **resource_kwargs) + rw._resource_kwargs = resource_kwargs + + class Reader(io.BufferedIOBase): """Reads bytes from S3. Implements the io.BufferedIOBase interface of the standard library.""" - def __init__(self, bucket, key, version_id=None, buffer_size=DEFAULT_BUFFER_SIZE, - line_terminator=constants.BINARY_NEWLINE, session=None, resource_kwargs=None, - object_kwargs=None, defer_seek=False): - + def __init__( + self, + bucket, + key, + version_id=None, + buffer_size=DEFAULT_BUFFER_SIZE, + line_terminator=constants.BINARY_NEWLINE, + session=None, + resource=None, + resource_kwargs=None, + object_kwargs=None, + defer_seek=False, + ): self._buffer_size = buffer_size - if session is None: - session = boto3.Session() if resource_kwargs is None: resource_kwargs = {} if object_kwargs is None: object_kwargs = {} - self._session = session - self._resource_kwargs = resource_kwargs - self._object_kwargs = object_kwargs + _initialize_boto3(self, session, resource, resource_kwargs) - s3 = session.resource('s3', **resource_kwargs) - self._object = s3.Object(bucket, key) + self._object_kwargs = object_kwargs + self._object = self._resource.Object(bucket, key) self._version_id = version_id self._raw_reader = _SeekableRawReader( @@ -586,14 +631,16 @@ def to_boto3(self): The created instance will re-use the session and resource parameters of the current instance, but it will be independent: changes to the - `boto3.s3.Object` may not necessary 
affect the current instance. + `boto3.s3.Object` may not necessarily affect the current instance. """ - s3 = self._session.resource('s3', **self._resource_kwargs) if self._version_id is not None: - return s3.Object(self._object.bucket_name, self._object.key).Version(self._version_id) + return self._resource.Object( + self._object.bucket_name, + self._object.key, + ).Version(self._version_id) else: - return s3.Object(self._object.bucket_name, self._object.key) + return self._resource.Object(self._object.bucket_name, self._object.key) # # Internal methods. @@ -647,32 +694,28 @@ class MultipartWriter(io.BufferedIOBase): Implements the io.BufferedIOBase interface of the standard library.""" def __init__( - self, - bucket, - key, - min_part_size=DEFAULT_MIN_PART_SIZE, - session=None, - resource_kwargs=None, - upload_kwargs=None, - ): + self, + bucket, + key, + min_part_size=DEFAULT_MIN_PART_SIZE, + session=None, + resource=None, + resource_kwargs=None, + upload_kwargs=None, + ): if min_part_size < MIN_MIN_PART_SIZE: logger.warning("S3 requires minimum part size >= 5MB; \ multipart upload may fail") - if session is None: - session = boto3.Session() - if resource_kwargs is None: - resource_kwargs = {} + _initialize_boto3(self, session, resource, resource_kwargs) + if upload_kwargs is None: upload_kwargs = {} - self._session = session - self._resource_kwargs = resource_kwargs self._upload_kwargs = upload_kwargs - s3 = session.resource('s3', **resource_kwargs) try: - self._object = s3.Object(bucket, key) + self._object = self._resource.Object(bucket, key) self._min_part_size = min_part_size partial = functools.partial(self._object.initiate_multipart_upload, **self._upload_kwargs) self._mp = _retry_if_failed(partial) @@ -773,8 +816,7 @@ def to_boto3(self): `boto3.s3.Object` may not necessary affect the current instance. 
""" - s3 = self._session.resource('s3', **self._resource_kwargs) - return s3.Object(self._object.bucket_name, self._object.key) + return self._resource.Object(self._object.bucket_name, self._object.key) # # Internal methods. @@ -841,26 +883,21 @@ def __init__( bucket, key, session=None, + resource=None, resource_kwargs=None, upload_kwargs=None, ): - self._session = session - self._resource_kwargs = resource_kwargs + _initialize_boto3(self, session, resource, resource_kwargs) - if session is None: - session = boto3.Session() - if resource_kwargs is None: - resource_kwargs = {} if upload_kwargs is None: upload_kwargs = {} self._upload_kwargs = upload_kwargs - s3 = session.resource('s3', **resource_kwargs) try: - self._object = s3.Object(bucket, key) - s3.meta.client.head_bucket(Bucket=bucket) + self._object = self._resource.Object(bucket, key) + self._resource.meta.client.head_bucket(Bucket=bucket) except botocore.client.ClientError as e: raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket) from e @@ -1123,7 +1160,7 @@ def _download_key(key_name, bucket_name=None, retries=3, **session_kwargs): raise ValueError('bucket_name may not be None') # - # https://geekpete.com/blog/multithreading-boto3/ + # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html#multithreading-and-multiprocessing # session = boto3.session.Session(**session_kwargs) s3 = session.resource('s3') From 0a9bee5906a1d5172afc7881300fa03ff98fe8f0 Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Fri, 18 Dec 2020 13:40:13 +0900 Subject: [PATCH 2/7] git add benchmark/read_s3.py $ time python benchmark/read_s3.py < benchmark/urls.txt real 1m20.786s user 0m8.619s sys 0m0.894s $ time python benchmark/read_s3.py create_session < benchmark/urls.txt real 1m45.826s user 0m4.554s sys 0m0.149s $ time python benchmark/read_s3.py create_resource < benchmark/urls.txt real 0m22.046s user 0m1.474s sys 0m0.065s $ time python benchmark/read_s3.py 
create_session_and_resource < benchmark/urls.txt real 0m21.086s user 0m1.496s sys 0m0.073s --- benchmark/read_s3.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 benchmark/read_s3.py diff --git a/benchmark/read_s3.py b/benchmark/read_s3.py new file mode 100644 index 00000000..b6d868cc --- /dev/null +++ b/benchmark/read_s3.py @@ -0,0 +1,19 @@ +import sys + +import boto3 +import smart_open + +urls = [line.strip() for line in sys.stdin] + +tp = {} +if 'create_session_and_resource': + tp['session'] = boto3.Session() + tp['resource'] = tp['session'].resource('s3') +elif 'create_resource' in sys.argv: + tp['resource'] = boto3.resource('s3') +elif 'create_session' in sys.argv: + tp['session'] = boto3.Session() + +for i, url in enumerate(urls, 1): + # print('%d/%d %s' % (i, len(urls), url)) + smart_open.open(url, transport_params=tp).read() From 303d873bc9edbab4c2d8962afe5068e0a0486794 Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Fri, 18 Dec 2020 13:57:01 +0900 Subject: [PATCH 3/7] fixup --- benchmark/read_s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark/read_s3.py b/benchmark/read_s3.py index b6d868cc..f5d43256 100644 --- a/benchmark/read_s3.py +++ b/benchmark/read_s3.py @@ -6,7 +6,7 @@ urls = [line.strip() for line in sys.stdin] tp = {} -if 'create_session_and_resource': +if 'create_session_and_resource' in sys.argv: tp['session'] = boto3.Session() tp['resource'] = tp['session'].resource('s3') elif 'create_resource' in sys.argv: From e5a51202da0ff3160a50743622572921c2d6f6dc Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Sat, 19 Dec 2020 08:08:16 +0900 Subject: [PATCH 4/7] Update smart_open/s3.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Radim Řehůřek --- smart_open/s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index 45b16308..3b712af6 100644 --- a/smart_open/s3.py +++ 
b/smart_open/s3.py @@ -203,7 +203,7 @@ def open( session: object, optional The S3 session to use when working with boto3. If you don't specify this, then smart_open will create a new session for you. - resource: object, optonal + resource: object, optional The S3 resource to use when working with boto3. If you don't specify this, then smart_open will create a new resource for you. resource_kwargs: dict, optional From 52e177d1c0df6ef8cc572eaa67f2b18a6fac5ff5 Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Sat, 19 Dec 2020 17:45:24 +0900 Subject: [PATCH 5/7] Update read_s3.py --- benchmark/read_s3.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/benchmark/read_s3.py b/benchmark/read_s3.py index f5d43256..61691357 100644 --- a/benchmark/read_s3.py +++ b/benchmark/read_s3.py @@ -14,6 +14,5 @@ elif 'create_session' in sys.argv: tp['session'] = boto3.Session() -for i, url in enumerate(urls, 1): - # print('%d/%d %s' % (i, len(urls), url)) +for url in urls: smart_open.open(url, transport_params=tp).read() From 4d17db76e939779434a1e24f51e9212ddccec2fa Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Sat, 26 Dec 2020 22:04:26 +0900 Subject: [PATCH 6/7] update howto --- howto.md | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/howto.md b/howto.md index 1d621623..e35410cb 100644 --- a/howto.md +++ b/howto.md @@ -173,6 +173,32 @@ s3.ObjectVersion(bucket_name='smart-open-versioned', object_key='demo.txt', id=' ``` +## How to Read from S3 Efficiently + +Under the covers, `smart_open` uses the [boto3 resource API](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html) to read from S3. +By default, calling `smart_open.open` with an S3 URL will create its own boto3 session and resource. +These are expensive operations: they require both CPU time to construct the objects from a low-level API definition, and memory to store the objects once they have been created. 
+It is possible to save both CPU time and memory by sharing the same resource across multiple `smart_open.open` calls, for example: + +``` +>>> import boto3 +>>> tp = {'resource': boto3.resoure('s3')} +>>> urls = ['s3://commoncrawl/robots.txt'] * 3 # These URLs can be unique +>>> for url in urls: +... with smart_open.open(url) as fin: +... print(fin.readline()) +'User-Agent: *\n' +'User-Agent: *\n' +``` + +The above sharing is safe because it is all happening in the same thread and subprocess. + +## How to Work in a Parallelized Environment + +Under the covers, `smart_open` uses the [boto3 resource API](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html) to read from S3. +This API is not thread-safe or multiprocess-safe. +Do not share the same `smart_open` objects across different threads or subprocesses. + ## How to Specify the Request Payer (S3 only) Some public buckets require you to [pay for S3 requests for the data in the bucket](https://docs.aws.amazon.com/AmazonS3/latest/dev/RequesterPaysBuckets.html). From 49b49b53f3728413b5a6582a472d3b2f7dcf656b Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Sun, 27 Dec 2020 16:45:52 +0900 Subject: [PATCH 7/7] update howto.md --- howto.md | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/howto.md b/howto.md index e35410cb..be3f7650 100644 --- a/howto.md +++ b/howto.md @@ -182,22 +182,28 @@ It is possible to save both CPU time and memory by sharing the same resource acr ``` >>> import boto3 ->>> tp = {'resource': boto3.resoure('s3')} ->>> urls = ['s3://commoncrawl/robots.txt'] * 3 # These URLs can be unique ->>> for url in urls: -... with smart_open.open(url) as fin: -... print(fin.readline()) -'User-Agent: *\n' -'User-Agent: *\n' +>>> from smart_open import open +>>> tp = {'resource': boto3.resource('s3')} +>>> for month in (1, 2, 3): +... url = 's3://nyc-tlc/trip data/yellow_tripdata_2020-%02d.csv' % month +... 
with open(url, transport_params=tp) as fin: +... _ = fin.readline() # skip CSV header +... print(fin.readline().strip()) +1,2020-01-01 00:28:15,2020-01-01 00:33:03,1,1.20,1,N,238,239,1,6,3,0.5,1.47,0,0.3,11.27,2.5 +1,2020-02-01 00:17:35,2020-02-01 00:30:32,1,2.60,1,N,145,7,1,11,0.5,0.5,2.45,0,0.3,14.75,0 +1,2020-03-01 00:31:13,2020-03-01 01:01:42,1,4.70,1,N,88,255,1,22,3,0.5,2,0,0.3,27.8,2.5 + ``` -The above sharing is safe because it is all happening in the same thread and subprocess. +The above sharing is safe because it is all happening in the same thread and subprocess (see below for details). ## How to Work in a Parallelized Environment Under the covers, `smart_open` uses the [boto3 resource API](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html) to read from S3. This API is not thread-safe or multiprocess-safe. Do not share the same `smart_open` objects across different threads or subprocesses. +`smart_open` will create its own session and resource objects for each individual `open` call, so you don't have to worry about managing boto3 objects. +This comes at a price: each session and resource requires CPU time to create and memory to store, so be wary of keeping hundreds of threads or subprocesses reading/writing from/to S3. ## How to Specify the Request Payer (S3 only) @@ -232,7 +238,7 @@ First, install localstack and start it: The start command is blocking, so you'll need to run it in a separate terminal session or run it in the background. Before we can read/write, we'll need to create a bucket: - $ aws --endpoint-url http://localhost:4566 s3api create-bucket --bucket-name mybucket + $ aws --endpoint-url http://localhost:4566 s3api create-bucket --bucket mybucket where `http://localhost:4566` is the default host/port that localstack uses to listen for requests.