refactor s3 submodule to minimize resource usage #569

Merged

merged 7 commits into from Dec 27, 2020
18 changes: 18 additions & 0 deletions benchmark/read_s3.py
@@ -0,0 +1,18 @@
import sys

import boto3
import smart_open

urls = [line.strip() for line in sys.stdin]

tp = {}
if 'create_session_and_resource' in sys.argv:
    tp['session'] = boto3.Session()
    tp['resource'] = tp['session'].resource('s3')
elif 'create_resource' in sys.argv:
    tp['resource'] = boto3.resource('s3')
elif 'create_session' in sys.argv:
    tp['session'] = boto3.Session()

for url in urls:
    smart_open.open(url, transport_params=tp).read()
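
The benchmark reads newline-delimited S3 URLs from stdin and picks a reuse strategy from its command-line arguments; a typical invocation (the `urls.txt` file is an assumption for illustration, not part of the PR) would be `python benchmark/read_s3.py create_resource < urls.txt`.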
26 changes: 26 additions & 0 deletions howto.md
@@ -173,6 +173,32 @@ s3.ObjectVersion(bucket_name='smart-open-versioned', object_key='demo.txt', id='

```

## How to Read from S3 Efficiently

Under the covers, `smart_open` uses the [boto3 resource API](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html) to read from S3.
By default, calling `smart_open.open` with an S3 URL will create its own boto3 session and resource.
These are expensive operations: they require both CPU time to construct the objects from a low-level API definition and memory to store the objects once they have been created.
It is possible to save both CPU time and memory by sharing the same resource across multiple `smart_open.open` calls, for example:

```
>>> import boto3
>>> import smart_open
>>> tp = {'resource': boto3.resource('s3')}
>>> urls = ['s3://commoncrawl/robots.txt'] * 3  # The URLs do not have to be identical
>>> for url in urls:
...     with smart_open.open(url, transport_params=tp) as fin:
...         print(repr(fin.readline()))
'User-Agent: *\n'
'User-Agent: *\n'
'User-Agent: *\n'
```

The above sharing is safe because it all happens within a single thread and a single process.
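
If constructing a resource up front is inconvenient, you can share a session instead, and `smart_open` will build its resource from that session. A minimal sketch, assuming the same public `commoncrawl` URL as above:

```
>>> import boto3
>>> import smart_open
>>> tp = {'session': boto3.Session()}  # smart_open creates the S3 resource from this shared session
>>> with smart_open.open('s3://commoncrawl/robots.txt', transport_params=tp) as fin:
...     print(repr(fin.readline()))
'User-Agent: *\n'
```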

## How to Work in a Parallelized Environment

Under the covers, `smart_open` uses the [boto3 resource API](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html) to read from S3.
This API is not thread-safe or multiprocess-safe.
Do not share the same `smart_open` objects, or the underlying boto3 sessions and resources, across different threads or processes.
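
One safe pattern is to create a separate session and resource inside each worker, so nothing boto3-related crosses thread boundaries. A minimal sketch under that assumption (the URL list and worker function are illustrative, not part of this PR):

```
import threading

import boto3
import smart_open

urls = ['s3://commoncrawl/robots.txt'] * 4  # illustrative work items

def read_first_line(url):
    # Each thread builds its own session and resource; nothing is shared across threads.
    tp = {'resource': boto3.Session().resource('s3')}
    with smart_open.open(url, transport_params=tp) as fin:
        fin.readline()

threads = [threading.Thread(target=read_first_line, args=(url,)) for url in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()
```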

## How to Specify the Request Payer (S3 only)

Some public buckets require you to [pay for S3 requests for the data in the bucket](https://docs.aws.amazon.com/AmazonS3/latest/dev/RequesterPaysBuckets.html).
14 changes: 6 additions & 8 deletions setup.py
@@ -35,8 +35,13 @@ def _get_version():
def read(fname):
    return io.open(os.path.join(os.path.dirname(__file__), fname), encoding='utf-8').read()

aws_deps = ['boto3']
gcp_deps = ['google-cloud-storage']
azure_deps = ['azure-storage-blob', 'azure-common', 'azure-core']
http_deps = ['requests']

tests_require = [
all_deps = aws_deps + gcp_deps + azure_deps + http_deps
tests_require = all_deps + [
    'mock',
    'moto[server]==1.3.14',  # Older versions of moto appear broken
    'pathlib2',
@@ -48,13 +53,6 @@ def read(fname):
    'pytest-rerunfailures'
]

aws_deps = ['boto3']
gcp_deps = ['google-cloud-storage']
azure_deps = ['azure-storage-blob', 'azure-common', 'azure-core']
http_deps = ['requests']

all_deps = aws_deps + gcp_deps + azure_deps + http_deps

setup(
    name='smart_open',
    version=__version__,
153 changes: 95 additions & 58 deletions smart_open/s3.py
@@ -171,20 +171,21 @@ def open_uri(uri, mode, transport_params):


def open(
        bucket_id,
        key_id,
        mode,
        version_id=None,
        buffer_size=DEFAULT_BUFFER_SIZE,
        min_part_size=DEFAULT_MIN_PART_SIZE,
        session=None,
        resource_kwargs=None,
        multipart_upload_kwargs=None,
        multipart_upload=True,
        singlepart_upload_kwargs=None,
        object_kwargs=None,
        defer_seek=False,
        ):
    bucket_id,
    key_id,
    mode,
    version_id=None,
    buffer_size=DEFAULT_BUFFER_SIZE,
    min_part_size=DEFAULT_MIN_PART_SIZE,
    session=None,
    resource=None,
    resource_kwargs=None,
    multipart_upload_kwargs=None,
    multipart_upload=True,
    singlepart_upload_kwargs=None,
    object_kwargs=None,
    defer_seek=False,
):
"""Open an S3 object for reading or writing.

Parameters
@@ -201,8 +202,13 @@ def open(
        The minimum part size for multipart uploads. For writing only.
    session: object, optional
        The S3 session to use when working with boto3.
        If you don't specify this, then smart_open will create a new session for you.
    resource: object, optional
        The S3 resource to use when working with boto3.
        If you don't specify this, then smart_open will create a new resource for you.
    resource_kwargs: dict, optional
        Keyword arguments to use when accessing the S3 resource for reading or writing.
        Keyword arguments to use when creating the S3 resource for reading or writing.
        Will be ignored if you specify the resource object explicitly.
    multipart_upload_kwargs: dict, optional
        Additional parameters to pass to boto3's initiate_multipart_upload function.
        For writing only.
@@ -242,6 +248,7 @@ def open(
            version_id=version_id,
            buffer_size=buffer_size,
            session=session,
            resource=resource,
            resource_kwargs=resource_kwargs,
            object_kwargs=object_kwargs,
            defer_seek=defer_seek,
@@ -420,30 +427,68 @@ def read(self, size=-1):
        return binary


def _initialize_boto3(rw, session, resource, resource_kwargs):
    """Create the required objects for accessing S3. Ideally, they have
    already been created for us and we can just reuse them.

    We only really need one thing: the resource. There are multiple ways of
    getting one, in order of effort:

    1) Directly from the user
    2) From the session directly specified by the user
    3) From an entirely new session

    Once we have the resource, we no longer need the session.
    """
    if resource_kwargs is None:
        resource_kwargs = {}

    if resource:
        if session:
            logger.warning('ignoring session because resource was passed explicitly')
        if resource_kwargs:
            logger.warning('ignoring resource_kwargs because resource was passed explicitly')
        rw._session = None
        rw._resource = resource
    elif session:
        rw._session = session
        rw._resource = rw._session.resource('s3', **resource_kwargs)
        rw._resource_kwargs = resource_kwargs
    else:
        rw._session = boto3.Session()
        rw._resource = rw._session.resource('s3', **resource_kwargs)
        rw._resource_kwargs = resource_kwargs


class Reader(io.BufferedIOBase):
"""Reads bytes from S3.

Implements the io.BufferedIOBase interface of the standard library."""

    def __init__(self, bucket, key, version_id=None, buffer_size=DEFAULT_BUFFER_SIZE,
                 line_terminator=constants.BINARY_NEWLINE, session=None, resource_kwargs=None,
                 object_kwargs=None, defer_seek=False):

    def __init__(
        self,
        bucket,
        key,
        version_id=None,
        buffer_size=DEFAULT_BUFFER_SIZE,
        line_terminator=constants.BINARY_NEWLINE,
        session=None,
        resource=None,
        resource_kwargs=None,
        object_kwargs=None,
        defer_seek=False,
    ):
        self._buffer_size = buffer_size

        if session is None:
            session = boto3.Session()
        if resource_kwargs is None:
            resource_kwargs = {}
        if object_kwargs is None:
            object_kwargs = {}

        self._session = session
        self._resource_kwargs = resource_kwargs
        self._object_kwargs = object_kwargs
        _initialize_boto3(self, session, resource, resource_kwargs)

        s3 = session.resource('s3', **resource_kwargs)
        self._object = s3.Object(bucket, key)
        self._object_kwargs = object_kwargs
        self._object = self._resource.Object(bucket, key)
        self._version_id = version_id

        self._raw_reader = _SeekableRawReader(
@@ -586,14 +631,16 @@ def to_boto3(self):

The created instance will re-use the session and resource parameters of
the current instance, but it will be independent: changes to the
`boto3.s3.Object` may not necessary affect the current instance.
`boto3.s3.Object` may not necessarily affect the current instance.

"""
s3 = self._session.resource('s3', **self._resource_kwargs)
if self._version_id is not None:
return s3.Object(self._object.bucket_name, self._object.key).Version(self._version_id)
return self._resource.Object(
self._object.bucket_name,
self._object.key,
).Version(self._version_id)
else:
return s3.Object(self._object.bucket_name, self._object.key)
return self._resource.Object(self._object.bucket_name, self._object.key)

    #
    # Internal methods.
@@ -647,32 +694,28 @@ class MultipartWriter(io.BufferedIOBase):
    Implements the io.BufferedIOBase interface of the standard library."""

    def __init__(
            self,
            bucket,
            key,
            min_part_size=DEFAULT_MIN_PART_SIZE,
            session=None,
            resource_kwargs=None,
            upload_kwargs=None,
            ):
        self,
        bucket,
        key,
        min_part_size=DEFAULT_MIN_PART_SIZE,
        session=None,
        resource=None,
        resource_kwargs=None,
        upload_kwargs=None,
    ):
        if min_part_size < MIN_MIN_PART_SIZE:
            logger.warning("S3 requires minimum part size >= 5MB; \
multipart upload may fail")

        if session is None:
            session = boto3.Session()
        if resource_kwargs is None:
            resource_kwargs = {}
        _initialize_boto3(self, session, resource, resource_kwargs)

        if upload_kwargs is None:
            upload_kwargs = {}

        self._session = session
        self._resource_kwargs = resource_kwargs
        self._upload_kwargs = upload_kwargs

        s3 = session.resource('s3', **resource_kwargs)
        try:
            self._object = s3.Object(bucket, key)
            self._object = self._resource.Object(bucket, key)
            self._min_part_size = min_part_size
            partial = functools.partial(self._object.initiate_multipart_upload, **self._upload_kwargs)
            self._mp = _retry_if_failed(partial)
@@ -773,8 +816,7 @@ def to_boto3(self):
        `boto3.s3.Object` may not necessarily affect the current instance.

"""
s3 = self._session.resource('s3', **self._resource_kwargs)
return s3.Object(self._object.bucket_name, self._object.key)
return self._resource.Object(self._object.bucket_name, self._object.key)

    #
    # Internal methods.
@@ -841,26 +883,21 @@ def __init__(
        bucket,
        key,
        session=None,
        resource=None,
        resource_kwargs=None,
        upload_kwargs=None,
    ):

        self._session = session
        self._resource_kwargs = resource_kwargs
        _initialize_boto3(self, session, resource, resource_kwargs)

        if session is None:
            session = boto3.Session()
        if resource_kwargs is None:
            resource_kwargs = {}
        if upload_kwargs is None:
            upload_kwargs = {}

        self._upload_kwargs = upload_kwargs

        s3 = session.resource('s3', **resource_kwargs)
        try:
            self._object = s3.Object(bucket, key)
            s3.meta.client.head_bucket(Bucket=bucket)
            self._object = self._resource.Object(bucket, key)
            self._resource.meta.client.head_bucket(Bucket=bucket)
        except botocore.client.ClientError as e:
            raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket) from e

@@ -1123,7 +1160,7 @@ def _download_key(key_name, bucket_name=None, retries=3, **session_kwargs):
        raise ValueError('bucket_name may not be None')

    #
    # https://geekpete.com/blog/multithreading-boto3/
    # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html#multithreading-and-multiprocessing
    #
    session = boto3.session.Session(**session_kwargs)
    s3 = session.resource('s3')