From 6afe5a7f618636f7e85ad6238af6bf7259f13198 Mon Sep 17 00:00:00 2001
From: Daniel Mitterdorfer
Date: Fri, 29 Mar 2019 08:55:05 +0100
Subject: [PATCH] Add ability to download from S3 buckets

With this commit we add the ability to download from S3 buckets. A
download URL must start with the scheme `s3://` to be recognized as an
S3 bucket. Provided that S3 access is properly configured, Rally can
also handle downloads from private S3 buckets.
---
 create-notice.sh        |  7 ++++
 docs/track.rst          |  2 +-
 esrally/utils/net.py    | 93 ++++++++++++++++++++++++++++++-----------
 setup.py                |  8 +++-
 tests/utils/net_test.py | 35 ++++++++++++++++
 5 files changed, 119 insertions(+), 26 deletions(-)
 create mode 100644 tests/utils/net_test.py

diff --git a/create-notice.sh b/create-notice.sh
index 83a3da784..fbd97d3b2 100755
--- a/create-notice.sh
+++ b/create-notice.sh
@@ -49,12 +49,19 @@ function main {
     add_license "py-cpuinfo" "https://raw.githubusercontent.com/workhorsy/py-cpuinfo/master/LICENSE"
     add_license "tabulate" "https://bitbucket.org/astanin/python-tabulate/raw/03182bf9b8a2becbc54d17aa7e3e7dfed072c5f5/LICENSE"
     add_license "thespian" "https://raw.githubusercontent.com/kquick/Thespian/master/LICENSE.txt"
+    add_license "boto3" "https://raw.githubusercontent.com/boto/boto3/develop/LICENSE"

     # transitive dependencies
     # Jinja2 -> Markupsafe
     add_license "Markupsafe" "https://raw.githubusercontent.com/pallets/markupsafe/master/LICENSE.rst"
     # elasticsearch -> urllib3
     add_license "urllib3" "https://raw.githubusercontent.com/shazow/urllib3/master/LICENSE.txt"
+    # boto3 -> s3transfer
+    add_license "s3transfer" "https://raw.githubusercontent.com/boto/s3transfer/develop/LICENSE.txt"
+    # boto3 -> jmespath
+    add_license "jmespath" "https://raw.githubusercontent.com/jmespath/jmespath.py/develop/LICENSE.txt"
+    # boto3 -> botocore
+    add_license "botocore" "https://raw.githubusercontent.com/boto/botocore/develop/LICENSE.txt"
 }

 main
diff --git a/docs/track.rst b/docs/track.rst
index bf632cce7..0d2efc39f 100644
--- a/docs/track.rst
+++ b/docs/track.rst
@@ -220,7 +220,7 @@ The ``corpora`` section contains all document corpora that are used by this track

 Each entry in the ``documents`` list consists of the following properties:

-* ``base-url`` (optional): A http or https URL that points to the root path where Rally can obtain the corresponding source file.
+* ``base-url`` (optional): An HTTP(S) or S3 URL that points to the root path where Rally can obtain the corresponding source file. Rally can also download data from private S3 buckets if access is properly `configured `_.
 * ``source-format`` (optional, default: ``bulk``): Defines in which format Rally should interpret the data file specified by ``source-file``. Currently, only ``bulk`` is supported.
 * ``source-file`` (mandatory): File name of the corresponding documents. For local use, this file can be a ``.json`` file. If you provide a ``base-url`` we recommend that you provide a compressed file here. The following extensions are supported: ``.zip``, ``.bz2``, ``.gz``, ``.tar``, ``.tar.gz``, ``.tgz`` or ``.tar.bz2``. It must contain exactly one JSON file with the same name. The preferred file extension for our official tracks is ``.bz2``.
 * ``includes-action-and-meta-data`` (optional, defaults to ``false``): Defines whether the documents file contains already an action and meta-data line (``true``) or only documents (``false``).
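To make "properly configured" concrete: Rally itself needs no S3-specific settings, because boto3 resolves credentials from the standard AWS locations. A minimal sketch of checking access to a private bucket (bucket and key names are hypothetical examples, not part of this patch):

    import boto3

    # boto3 picks up credentials automatically from the environment
    # (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY), ~/.aws/credentials, or an IAM role.
    s3 = boto3.resource("s3")
    # raises botocore.exceptions.ClientError if the credentials do not grant access
    s3.meta.client.head_object(Bucket="mybucket", Key="data/documents.json.bz2")
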
diff --git a/esrally/utils/net.py b/esrally/utils/net.py
index 2a6ad6b5d..699420d73 100644
--- a/esrally/utils/net.py
+++ b/esrally/utils/net.py
@@ -65,39 +65,84 @@ def finish(self):
         self.p.finish()


+# Not meant to be called externally; it is a separate function only so that unit tests can mock it easily
+def _download_from_s3_bucket(bucket_name, bucket_path, local_path, expected_size_in_bytes=None, progress_indicator=None):
+    # lazily initialize S3 support - we might not need it
+    import boto3.s3.transfer
+
+    # adapts boto3's cumulative byte-count callback to Rally's (bytes_read, total_bytes) progress interface
+    class S3ProgressAdapter:
+        def __init__(self, size, progress):
+            self._expected_size_in_bytes = size
+            self._progress = progress
+            self._bytes_read = 0
+
+        def __call__(self, bytes_amount):
+            self._bytes_read += bytes_amount
+            self._progress(self._bytes_read, self._expected_size_in_bytes)
+
+    s3 = boto3.resource("s3")
+    progress_callback = S3ProgressAdapter(expected_size_in_bytes, progress_indicator) if progress_indicator else None
+    s3.Bucket(bucket_name).download_file(bucket_path, local_path,
+                                         Callback=progress_callback,
+                                         Config=boto3.s3.transfer.TransferConfig(use_threads=False))
+
+
+def download_s3(url, local_path, expected_size_in_bytes=None, progress_indicator=None):
+    logger = logging.getLogger(__name__)
+
+    bucket_and_path = url[5:]  # strip the "s3://" scheme prefix
+    bucket_end_index = bucket_and_path.find("/")
+    bucket = bucket_and_path[:bucket_end_index]
+    # everything after the first "/" is the object path within the bucket
+    bucket_path = bucket_and_path[bucket_end_index + 1:]
+
+    logger.info("Downloading from S3 bucket [%s] and path [%s] to [%s].", bucket, bucket_path, local_path)
+    _download_from_s3_bucket(bucket, bucket_path, local_path, expected_size_in_bytes, progress_indicator)
+
+    return expected_size_in_bytes
+
+
+def download_http(url, local_path, expected_size_in_bytes=None, progress_indicator=None):
+    with __http().request("GET", url, preload_content=False, retries=10,
+                          timeout=urllib3.Timeout(connect=45, read=240)) as r, open(local_path, "wb") as out_file:
+        if r.status > 299:
+            raise urllib.error.HTTPError(url, r.status, "", None, None)
+        # noinspection PyBroadException
+        try:
+            size_from_content_header = int(r.getheader("Content-Length"))
+            if expected_size_in_bytes is None:
+                expected_size_in_bytes = size_from_content_header
+        except BaseException:
+            size_from_content_header = None
+
+        chunk_size = 2 ** 16
+        bytes_read = 0
+
+        for chunk in r.stream(chunk_size):
+            out_file.write(chunk)
+            bytes_read += len(chunk)
+            if progress_indicator and size_from_content_header:
+                progress_indicator(bytes_read, size_from_content_header)
+    return expected_size_in_bytes
+
+
 def download(url, local_path, expected_size_in_bytes=None, progress_indicator=None):
     """
     Downloads a single file from a URL to the provided local path.

-    :param url: The remote URL specifying one file that should be downloaded. May be either a HTTP or HTTPS URL.
+    :param url: The remote URL specifying one file that should be downloaded. May be either an HTTP, HTTPS or S3 URL.
     :param local_path: The local file name of the file that should be downloaded.
     :param expected_size_in_bytes: The expected file size in bytes if known. It will be used to verify that all data have been
     downloaded.
-    :param progress_indicator A callable that can be use to report progress to the user. It is expected to take two parameters
-    ``bytes_read`` and ``total_bytes``. If not provided, no progress is shown. Note that ``total_bytes`` is derived from
-    the ``Content-Length`` header and not from the parameter ``expected_size_in_bytes``.
+    :param progress_indicator: A callable that can be used to report progress to the user. It is expected to take two parameters
+    ``bytes_read`` and ``total_bytes``. If not provided, no progress is shown. Note that ``total_bytes`` is derived from
+    the ``Content-Length`` header and not from the parameter ``expected_size_in_bytes`` for downloads via HTTP(S).
     """
     tmp_data_set_path = local_path + ".tmp"
     try:
-        with __http().request("GET", url, preload_content=False, retries=10,
-                              timeout=urllib3.Timeout(connect=45, read=240)) as r, open(tmp_data_set_path, "wb") as out_file:
-            if r.status > 299:
-                raise urllib.error.HTTPError(url, r.status, "", None, None)
-            # noinspection PyBroadException
-            try:
-                size_from_content_header = int(r.getheader("Content-Length"))
-                if expected_size_in_bytes is None:
-                    expected_size_in_bytes = size_from_content_header
-            except BaseException:
-                size_from_content_header = None
-
-            chunk_size = 2 ** 16
-            bytes_read = 0
-
-            for chunk in r.stream(chunk_size):
-                out_file.write(chunk)
-                bytes_read += len(chunk)
-                if progress_indicator and size_from_content_header:
-                    progress_indicator(bytes_read, size_from_content_header)
+        if url.startswith("s3://"):
+            expected_size_in_bytes = download_s3(url, tmp_data_set_path, expected_size_in_bytes, progress_indicator)
+        else:
+            expected_size_in_bytes = download_http(url, tmp_data_set_path, expected_size_in_bytes, progress_indicator)
     except BaseException:
         if os.path.isfile(tmp_data_set_path):
             os.remove(tmp_data_set_path)
diff --git a/setup.py b/setup.py
index b733ca8c2..1095adc18 100644
--- a/setup.py
+++ b/setup.py
@@ -64,7 +64,13 @@ def str_from_file(name):
     # "setproctitle==1.1.10",
     # always use the latest version, these are certificate files...
     # License: MPL 2.0
-    "certifi"
+    "certifi",
+    # License: Apache 2.0
+    # transitive dependencies:
+    # botocore: Apache 2.0
+    # jmespath: MIT
+    # s3transfer: Apache 2.0
+    "boto3==1.9.120"
 ]

 tests_require = [
diff --git a/tests/utils/net_test.py b/tests/utils/net_test.py
new file mode 100644
index 000000000..395d9d12e
--- /dev/null
+++ b/tests/utils/net_test.py
@@ -0,0 +1,35 @@
+# Licensed to Elasticsearch B.V. under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Elasticsearch B.V. licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#	http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import random
+import unittest.mock as mock
+from unittest import TestCase
+
+from esrally.utils import net
+
+
+class NetTests(TestCase):
+    # Mocking boto3 objects directly is too complex, so the boto3-specific code lives in a helper function and we mock that instead
+    @mock.patch("esrally.utils.net._download_from_s3_bucket")
+    def test_download_from_s3_bucket(self, download):
+        expected_size = random.choice([None, random.randint(0, 1000)])
+        progress_indicator = random.choice([None, "some progress indicator"])
+
+        net.download_s3("s3://mybucket.elasticsearch.org/data/documents.json.bz2", "/tmp/documents.json.bz2",
+                        expected_size, progress_indicator)
+        download.assert_called_once_with("mybucket.elasticsearch.org", "data/documents.json.bz2",
+                                         "/tmp/documents.json.bz2", expected_size, progress_indicator)
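For reference, a minimal usage sketch of the new code path, assuming this patch is applied (the bucket name, local path, and size below are hypothetical examples):

    from esrally.utils import net

    def print_progress(bytes_read, total_bytes):
        print("downloaded {}/{} bytes".format(bytes_read, total_bytes))

    # With an "s3://" URL, download() dispatches to download_s3(). For S3 the progress
    # total comes from expected_size_in_bytes (there is no Content-Length header to
    # consult), so we pass it explicitly.
    net.download("s3://mybucket/data/documents.json.bz2", "/tmp/documents.json.bz2",
                 expected_size_in_bytes=250000000, progress_indicator=print_progress)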