Automatically select Hadoop build to use for Spark (#323)
* automatically select hadoop build to use for spark

* revert unrelated change

* missing space

* fix template

* clarify file name in help text

* only append file name if not already specified

* show download URLs in debug output

* remove debug print

* warn on old download source format

* source hadoop build profile automatically

* add change log

* clarify dist.apache.org locations

* can't support without-hadoop now

* add test for spark_hadoop_build_version

* show full dist.apache.org url in config template
nchammas authored Feb 21, 2021
1 parent 1e5fcb9 commit 55ad7b9
Showing 6 changed files with 114 additions and 30 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -8,9 +8,11 @@

* [#296]: Added support for launching clusters into private VPCs.
* [#324]: Flintrock now supports S3 URLs as a download source for Hadoop or Spark. This makes it easy to host your own copies of the Hadoop and Spark release builds in a private bucket.
* [#323]: Flintrock now automatically selects the correct build of Spark to use, based on the version of Hadoop/HDFS that you specify.

[#296]: https://github.com/nchammas/flintrock/pull/296
[#324]: https://github.com/nchammas/flintrock/pull/324
[#323]: https://github.com/nchammas/flintrock/pull/323

### Changed

14 changes: 7 additions & 7 deletions flintrock/config.yaml.template
@@ -7,19 +7,19 @@ services:
    # - can be http, https, or s3 URL
    # - must contain a {v} template corresponding to the version
    # - Spark must be pre-built
    # - must be a tar.gz file
    # download-source: "https://www.example.com/files/spark/{v}/spark-{v}.tgz"
    # download-source: "s3://some-bucket/spark/{v}/spark-{v}.tgz"
    # - files must be named according to the release pattern shown here: https://dist.apache.org/repos/dist/release/spark/
    # download-source: "https://www.example.com/files/spark/{v}/"
    # download-source: "s3://some-bucket/spark/{v}/"
    # executor-instances: 1
  hdfs:
    version: 3.3.0
    # optional; defaults to download from a dynamically selected Apache mirror
    # - can be http, https, or s3 URL
    # - must contain a {v} template corresponding to the version
    # - must be a .tar.gz file
    # download-source: "https://www.example.com/files/hadoop/{v}/hadoop-{v}.tar.gz"
    # download-source: "http://www-us.apache.org/dist/hadoop/common/hadoop-{v}/hadoop-{v}.tar.gz"
    # download-source: "s3://some-bucket/hadoop/{v}/hadoop-{v}.tar.gz"
    # - files must be named according to the release pattern shown here: https://dist.apache.org/repos/dist/release/hadoop/common/
    # download-source: "https://www.example.com/files/hadoop/{v}/"
    # download-source: "http://www-us.apache.org/dist/hadoop/common/hadoop-{v}/"
    # download-source: "s3://some-bucket/hadoop/{v}/"

provider: ec2

47 changes: 39 additions & 8 deletions flintrock/flintrock.py
@@ -29,6 +29,7 @@
    NothingToDo,
    Error)
from flintrock import __version__
from .util import spark_hadoop_build_version
from .services import HDFS, Spark  # TODO: Remove this dependency.

FROZEN = getattr(sys, 'frozen', False)
@@ -174,13 +175,36 @@ def configure_log(debug: bool):


def build_hdfs_download_url(ctx, param, value):
    hdfs_download_url = value.format(v=ctx.params['hdfs_version'])
    return hdfs_download_url
    hdfs_version = ctx.params['hdfs_version']
    if value.endswith('.gz') or value.endswith('.tgz'):
        logger.warning(
            "Hadoop download source appears to point to a file, not a directory. "
            "Flintrock will not try to determine the correct file to download based on "
            "the Hadoop version."
        )
        hdfs_download_url = value
    else:
        hdfs_download_url = (value.rstrip('/') + '/hadoop-{v}.tar.gz')
    return hdfs_download_url.format(v=hdfs_version)
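For illustration, a minimal sketch (not part of the commit; the version below is a hypothetical example) of what this callback produces for a directory-style Hadoop download source:

# Sketch only: mirrors the directory branch of build_hdfs_download_url.
base = 'https://www.apache.org/dyn/closer.lua?action=download&filename=hadoop/common/hadoop-{v}/'
url = (base.rstrip('/') + '/hadoop-{v}.tar.gz').format(v='3.3.0')
# url == 'https://www.apache.org/dyn/closer.lua?action=download&filename=hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz'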


def build_spark_download_url(ctx, param, value):
    spark_download_url = value.format(v=ctx.params['spark_version'])
    return spark_download_url
    spark_version = ctx.params['spark_version']
    hadoop_version = ctx.params['hdfs_version']
    hadoop_build_version = spark_hadoop_build_version(hadoop_version)
    if value.endswith('.gz') or value.endswith('.tgz'):
        logger.warning(
            "Spark download source appears to point to a file, not a directory. "
            "Flintrock will not try to determine the correct file to download based on "
            "the Spark and Hadoop versions."
        )
        spark_download_url = value
    else:
        spark_download_url = (value.rstrip('/') + '/spark-{v}-bin-{hv}.tgz')
    return spark_download_url.format(
        v=spark_version,
        hv=hadoop_build_version,
    )
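Similarly, a sketch (hypothetical versions) of how the Spark callback combines the Spark version with the Hadoop build suffix chosen by spark_hadoop_build_version:

# Sketch only: Spark 3.1.1 paired with a Hadoop 3.x cluster selects the hadoop3.2 build.
base = 'https://www.apache.org/dyn/closer.lua?action=download&filename=spark/spark-{v}/'
url = (base.rstrip('/') + '/spark-{v}-bin-{hv}.tgz').format(v='3.1.1', hv='hadoop3.2')
# url == 'https://www.apache.org/dyn/closer.lua?action=download&filename=spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz'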


def validate_download_source(url):
@@ -259,9 +283,12 @@ def cli(cli_context, config, provider, debug):
@click.option('--hdfs-download-source',
              help=(
                  "URL to download Hadoop from. If an S3 URL, Flintrock will use the "
                  "AWS CLI from the cluster nodes to download it."
                  "AWS CLI from the cluster nodes to download it. "
                  "Flintrock will append the appropriate file name to the end "
                  "of the URL based on the Apache release file names here: "
                  "https://dist.apache.org/repos/dist/release/hadoop/common/"
              ),
              default='https://www.apache.org/dyn/closer.lua?action=download&filename=hadoop/common/hadoop-{v}/hadoop-{v}.tar.gz',
              default='https://www.apache.org/dyn/closer.lua?action=download&filename=hadoop/common/hadoop-{v}/',
              show_default=True,
              callback=build_hdfs_download_url)
@click.option('--install-spark/--no-install-spark', default=True)
@@ -276,9 +303,13 @@ def cli(cli_context, config, provider, debug):
@click.option('--spark-download-source',
              help=(
                  "URL to download Spark from. If an S3 URL, Flintrock will use the "
                  "AWS CLI from the cluster nodes to download it."
                  "AWS CLI from the cluster nodes to download it. "
                  "Flintrock will append the appropriate file "
                  "name to the end of the URL based on the selected Hadoop version and "
                  "Apache release file names here: "
                  "https://dist.apache.org/repos/dist/release/spark/"
              ),
              default='https://www.apache.org/dyn/closer.lua?action=download&filename=spark/spark-{v}/spark-{v}-bin-hadoop3.2.tgz',
              default='https://www.apache.org/dyn/closer.lua?action=download&filename=spark/spark-{v}/',
              show_default=True,
              callback=build_spark_download_url)
@click.option('--spark-git-commit',
58 changes: 44 additions & 14 deletions flintrock/services.py
Expand Up @@ -17,6 +17,7 @@
get_formatted_template,
)
from .ssh import ssh_check_output
from .util import spark_hadoop_build_version

FROZEN = getattr(sys, 'frozen', False)

@@ -152,17 +153,28 @@ def __init__(self, *, version, download_source):
        self.manifest = {'version': version, 'download_source': download_source}

    def install(
            self,
            ssh_client: paramiko.client.SSHClient,
            cluster: FlintrockCluster):
        logger.info("[{h}] Installing HDFS...".format(
            h=ssh_client.get_transport().getpeername()[0]))
            self,
            ssh_client: paramiko.client.SSHClient,
            cluster: FlintrockCluster,
    ):
        logger.info(
            "[{h}] Installing HDFS..."
            .format(h=ssh_client.get_transport().getpeername()[0])
        )

        with ssh_client.open_sftp() as sftp:
            sftp.put(
                localpath=os.path.join(SCRIPTS_DIR, 'download-package.py'),
                remotepath='/tmp/download-package.py')

        logger.debug(
            "[{h}] Downloading Hadoop from: {s}"
            .format(
                h=ssh_client.get_transport().getpeername()[0],
                s=self.download_source,
            )
        )

        ssh_check_output(
            client=ssh_client,
            command="""
@@ -326,18 +338,29 @@ def __init__(
            'git_repository': git_repository}

    def install(
            self,
            ssh_client: paramiko.client.SSHClient,
            cluster: FlintrockCluster):
        logger.info("[{h}] Installing Spark...".format(
            h=ssh_client.get_transport().getpeername()[0]))
            self,
            ssh_client: paramiko.client.SSHClient,
            cluster: FlintrockCluster,
    ):
        logger.info(
            "[{h}] Installing Spark..."
            .format(h=ssh_client.get_transport().getpeername()[0])
        )

        if self.version:
            with ssh_client.open_sftp() as sftp:
                sftp.put(
                    localpath=os.path.join(SCRIPTS_DIR, 'download-package.py'),
                    remotepath='/tmp/download-package.py')

            logger.debug(
                "[{h}] Downloading Spark from: {s}"
                .format(
                    h=ssh_client.get_transport().getpeername()[0],
                    s=self.download_source,
                )
            )

            ssh_check_output(
                client=ssh_client,
                command="""
@@ -346,7 +369,6 @@ def install(
                    # version=self.version,
                    download_source=self.download_source.format(v=self.version),
                ))

        else:
            ssh_check_output(
                client=ssh_client,
@@ -355,6 +377,16 @@
                sudo yum install -y git
                sudo yum install -y java-devel
                """)

            logger.debug(
                "[{h}] Cloning Spark at {c} from: {s}"
                .format(
                    h=ssh_client.get_transport().getpeername()[0],
                    c=self.git_commit,
                    s=self.git_repository,
                )
            )

            ssh_check_output(
                client=ssh_client,
                command="""
@@ -370,9 +402,7 @@
""".format(
repo=shlex.quote(self.git_repository),
commit=shlex.quote(self.git_commit),
# Hardcoding this here until we figure out a better way to handle
# the supported build profiles.
hadoop_short_version='2.7',
hadoop_short_version=spark_hadoop_build_version(self.hadoop_version),
))
ssh_check_output(
client=ssh_client,
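The collapsed context above is where the git-build path previously hardcoded hadoop_short_version='2.7'; a hedged sketch of the effect of the change (the configured version is a hypothetical example):

# Sketch only: with HDFS 3.3.0 configured, the git-build path now receives
# 'hadoop3.2' instead of the hardcoded '2.7'.
hadoop_short_version = spark_hadoop_build_version('3.3.0')  # -> 'hadoop3.2'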
13 changes: 13 additions & 0 deletions flintrock/util.py
@@ -59,3 +59,16 @@ def duration_to_expiration(duration_string):
    expiration = datetime.now(tz=timezone.utc) + duration_to_timedelta(duration_string)

    return expiration


def spark_hadoop_build_version(hadoop_version: str) -> str:
    """
    Given a Hadoop version, determine the Hadoop build of Spark to use.
    """
    hadoop_version = tuple(map(int, hadoop_version.split('.')))
    if hadoop_version < (2, 7):
        return 'hadoop2.6'
    elif (2, 7) <= hadoop_version < (3, 0):
        return 'hadoop2.7'
    elif (3, 0) <= hadoop_version:
        return 'hadoop3.2'
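As an illustration (not part of the commit), a few hypothetical inputs and the Spark builds they map to under this function:

# Illustrative only; the version inputs are hypothetical examples.
assert spark_hadoop_build_version('2.6.5') == 'hadoop2.6'
assert spark_hadoop_build_version('2.8.4') == 'hadoop2.7'
assert spark_hadoop_build_version('3.3.0') == 'hadoop3.2'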
10 changes: 9 additions & 1 deletion tests/test_util.py
@@ -1,5 +1,9 @@
from datetime import datetime, timedelta, timezone
from flintrock.util import duration_to_timedelta, duration_to_expiration
from flintrock.util import (
    duration_to_timedelta,
    duration_to_expiration,
    spark_hadoop_build_version,
)
from freezegun import freeze_time


@@ -14,3 +18,7 @@ def test_duration_to_timedelta():
@freeze_time("2012-01-14")
def test_duration_to_expiration():
    assert duration_to_expiration('5m') == datetime.now(tz=timezone.utc) + timedelta(minutes=5)


def test_spark_hadoop_build_version():
    assert spark_hadoop_build_version('3.1.3') == 'hadoop3.2'
