Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to download from S3 buckets #671

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions create-notice.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,19 @@ function main {
add_license "py-cpuinfo" "https://raw.githubusercontent.com/workhorsy/py-cpuinfo/master/LICENSE"
add_license "tabulate" "https://bitbucket.org/astanin/python-tabulate/raw/03182bf9b8a2becbc54d17aa7e3e7dfed072c5f5/LICENSE"
add_license "thespian" "https://raw.githubusercontent.com/kquick/Thespian/master/LICENSE.txt"
add_license "boto3" "https://raw.githubusercontent.com/boto/boto3/develop/LICENSE"

# transitive dependencies
# Jinja2 -> Markupsafe
add_license "Markupsafe" "https://raw.githubusercontent.com/pallets/markupsafe/master/LICENSE.rst"
# elasticsearch -> urllib3
add_license "urllib3" "https://raw.githubusercontent.com/shazow/urllib3/master/LICENSE.txt"
# boto3 -> s3transfer
add_license "s3transfer" "https://raw.githubusercontent.com/boto/s3transfer/develop/LICENSE.txt"
# boto3 -> jmespath
add_license "jmespath" "https://raw.githubusercontent.com/jmespath/jmespath.py/develop/LICENSE.txt"
# boto3 -> botocore
add_license "botocore" "https://raw.githubusercontent.com/boto/botocore/develop/LICENSE.txt"
}

main
2 changes: 1 addition & 1 deletion docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ The ``corpora`` section contains all document corpora that are used by this trac

Each entry in the ``documents`` list consists of the following properties:

* ``base-url`` (optional): A http or https URL that points to the root path where Rally can obtain the corresponding source file.
* ``base-url`` (optional): An HTTP(S) or S3 URL that points to the root path where Rally can obtain the corresponding source file. Rally can also download data from private S3 buckets if access is properly `configured <https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html#configuration>`_.
* ``source-format`` (optional, default: ``bulk``): Defines in which format Rally should interpret the data file specified by ``source-file``. Currently, only ``bulk`` is supported.
* ``source-file`` (mandatory): File name of the corresponding documents. For local use, this file can be a ``.json`` file. If you provide a ``base-url`` we recommend that you provide a compressed file here. The following extensions are supported: ``.zip``, ``.bz2``, ``.gz``, ``.tar``, ``.tar.gz``, ``.tgz`` or ``.tar.bz2``. It must contain exactly one JSON file with the same name. The preferred file extension for our official tracks is ``.bz2``.
* ``includes-action-and-meta-data`` (optional, defaults to ``false``): Defines whether the documents file contains already an action and meta-data line (``true``) or only documents (``false``).
Expand Down
93 changes: 69 additions & 24 deletions esrally/utils/net.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,39 +65,84 @@ def finish(self):
self.p.finish()


# This function is not meant to be called externally and only exists for unit tests
def _download_from_s3_bucket(bucket_name, bucket_path, local_path, expected_size_in_bytes=None, progress_indicator=None):
    # lazily initialize S3 support - we might not need it
    import boto3.s3.transfer

    class S3ProgressAdapter:
        # Adapts boto3's per-chunk byte-count callback to Rally's
        # (bytes_read, total_bytes) progress-indicator protocol.
        def __init__(self, size, progress):
            self._total_bytes = size
            self._report = progress
            self._seen_bytes = 0

        def __call__(self, bytes_amount):
            self._seen_bytes += bytes_amount
            self._report(self._seen_bytes, self._total_bytes)

    adapter = None
    if progress_indicator:
        adapter = S3ProgressAdapter(expected_size_in_bytes, progress_indicator)
    # threads are disabled so progress callbacks arrive in order
    transfer_config = boto3.s3.transfer.TransferConfig(use_threads=False)
    bucket = boto3.resource("s3").Bucket(bucket_name)
    bucket.download_file(bucket_path, local_path, Callback=adapter, Config=transfer_config)


def download_s3(url, local_path, expected_size_in_bytes=None, progress_indicator=None):
    """
    Downloads a single object from an S3 bucket to the provided local path.

    :param url: An S3 URL of the form ``s3://bucket-name/path/to/object``.
    :param local_path: The local file name of the file that should be downloaded.
    :param expected_size_in_bytes: The expected object size in bytes, if known.
    :param progress_indicator: A callable taking ``(bytes_read, total_bytes)``.
        If not provided, no progress is shown.
    :return: ``expected_size_in_bytes`` unchanged (S3 downloads do not derive the
        size themselves).
    """
    logger = logging.getLogger(__name__)

    # strip the "s3://" scheme prefix, then split "<bucket>/<object path>".
    bucket_and_path = url[len("s3://"):]
    # partition (instead of find/slice) keeps the bucket name intact even for a
    # degenerate URL without any object path, where find("/") would return -1
    # and silently truncate the last character of the bucket name.
    bucket, _, bucket_path = bucket_and_path.partition("/")

    logger.info("Downloading from S3 bucket [%s] and path [%s] to [%s].", bucket, bucket_path, local_path)
    _download_from_s3_bucket(bucket, bucket_path, local_path, expected_size_in_bytes, progress_indicator)

    return expected_size_in_bytes


def download_http(url, local_path, expected_size_in_bytes=None, progress_indicator=None):
    """
    Downloads a single file via HTTP(S) from ``url`` to ``local_path``.

    If the server sends a ``Content-Length`` header it is used to fill in a
    missing ``expected_size_in_bytes`` and to drive the progress indicator.

    :return: The expected size in bytes (possibly derived from ``Content-Length``).
    """
    request = __http().request("GET", url, preload_content=False, retries=10,
                               timeout=urllib3.Timeout(connect=45, read=240))
    with request as response, open(local_path, "wb") as out_file:
        if response.status > 299:
            raise urllib.error.HTTPError(url, response.status, "", None, None)
        # The header may be absent or malformed; treat the size as unknown then.
        # noinspection PyBroadException
        try:
            content_length = int(response.getheader("Content-Length"))
            if expected_size_in_bytes is None:
                expected_size_in_bytes = content_length
        except BaseException:
            content_length = None

        bytes_read = 0
        for chunk in response.stream(2 ** 16):
            out_file.write(chunk)
            bytes_read += len(chunk)
            if progress_indicator and content_length:
                progress_indicator(bytes_read, content_length)
    return expected_size_in_bytes


def download(url, local_path, expected_size_in_bytes=None, progress_indicator=None):
"""
Downloads a single file from a URL to the provided local path.

:param url: The remote URL specifying one file that should be downloaded. May be either a HTTP or HTTPS URL.
:param url: The remote URL specifying one file that should be downloaded. May be either a HTTP, HTTPS or S3 URL.
:param local_path: The local file name of the file that should be downloaded.
:param expected_size_in_bytes: The expected file size in bytes if known. It will be used to verify that all data have been downloaded.
:param progress_indicator A callable that can be use to report progress to the user. It is expected to take two parameters
``bytes_read`` and ``total_bytes``. If not provided, no progress is shown. Note that ``total_bytes`` is derived from
the ``Content-Length`` header and not from the parameter ``expected_size_in_bytes``.
    :param progress_indicator: A callable that can be used to report progress to the user. It is expected to take two parameters
``bytes_read`` and ``total_bytes``. If not provided, no progress is shown. Note that ``total_bytes`` is derived from
the ``Content-Length`` header and not from the parameter ``expected_size_in_bytes`` for downloads via HTTP(S).
"""
tmp_data_set_path = local_path + ".tmp"
try:
with __http().request("GET", url, preload_content=False, retries=10,
timeout=urllib3.Timeout(connect=45, read=240)) as r, open(tmp_data_set_path, "wb") as out_file:
if r.status > 299:
raise urllib.error.HTTPError(url, r.status, "", None, None)
# noinspection PyBroadException
try:
size_from_content_header = int(r.getheader("Content-Length"))
if expected_size_in_bytes is None:
expected_size_in_bytes = size_from_content_header
except BaseException:
size_from_content_header = None

chunk_size = 2 ** 16
bytes_read = 0

for chunk in r.stream(chunk_size):
out_file.write(chunk)
bytes_read += len(chunk)
if progress_indicator and size_from_content_header:
progress_indicator(bytes_read, size_from_content_header)
if url.startswith("s3"):
expected_size_in_bytes = download_s3(url, tmp_data_set_path, expected_size_in_bytes, progress_indicator)
else:
expected_size_in_bytes = download_http(url, tmp_data_set_path, expected_size_in_bytes, progress_indicator)
except BaseException:
if os.path.isfile(tmp_data_set_path):
os.remove(tmp_data_set_path)
Expand Down
8 changes: 7 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,13 @@ def str_from_file(name):
# "setproctitle==1.1.10",
# always use the latest version, these are certificate files...
# License: MPL 2.0
"certifi"
"certifi",
# License: Apache 2.0
# transitive dependencies:
# botocore: Apache 2.0
# jmespath: MIT
# s3transfer: Apache 2.0
"boto3==1.9.120"
]

tests_require = [
Expand Down
35 changes: 35 additions & 0 deletions tests/utils/net_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import random
import unittest.mock as mock
from unittest import TestCase

from esrally.utils import net


class NetTests(TestCase):
    # Mocking boto3 objects directly is too complex, so all S3 access lives in a
    # single helper function which we patch here instead.
    def test_download_from_s3_bucket(self):
        maybe_size = random.choice([None, random.randint(0, 1000)])
        maybe_progress = random.choice([None, "some progress indicator"])

        with mock.patch("esrally.utils.net._download_from_s3_bucket") as download:
            net.download_s3("s3://mybucket.elasticsearch.org/data/documents.json.bz2",
                            "/tmp/documents.json.bz2", maybe_size, maybe_progress)

        download.assert_called_once_with("mybucket.elasticsearch.org", "data/documents.json.bz2",
                                         "/tmp/documents.json.bz2", maybe_size, maybe_progress)