diff --git a/.gitignore b/.gitignore
index 6d7240c92..08f355c90 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,7 @@
# Ignore Python cache files after running tests
*/__pycache__/
+
+# Ignore all bazel-* symlinks. There is no exhaustive list, since the symlink
+# names depend on the name of the directory the repository is cloned into.
+/bazel-*
\ No newline at end of file
diff --git a/BUILD b/BUILD
new file mode 100644
index 000000000..989329f4b
--- /dev/null
+++ b/BUILD
@@ -0,0 +1,91 @@
+package(default_visibility = ["//visibility:public"])
+
+test_suite(
+ name = "DataprocInitActionsTestSuite",
+ tests = [
+ ":test_cloud_sql_proxy",
+ ":test_dr_elephant",
+ ":test_hive_hcatalog",
+ ":test_starburst_presto",
+ "//bigtable:test_bigtable",
+ "//conda:test_conda",
+ "//connectors:test_connectors",
+ "//datalab:test_datalab",
+ "//drill:test_drill",
+ "//flink:test_flink",
+ "//ganglia:test_ganglia",
+ "//gpu:test_gpu",
+ "//hbase:test_hbase",
+ "//hue:test_hue",
+ "//jupyter:test_jupyter",
+ "//kafka:test_kafka",
+ "//livy:test_livy",
+ "//oozie:test_oozie",
+ "//presto:test_presto",
+ "//ranger:test_ranger",
+ "//rapids:test_rapids",
+ "//rstudio:test_rstudio",
+ "//solr:test_solr",
+ "//tez:test_tez",
+ "//tony:test_tony",
+ ],
+)
+
+py_test(
+ name = "test_cloud_sql_proxy",
+ size = "enormous",
+ srcs = ["cloud-sql-proxy/test_cloud_sql_proxy.py"],
+ data = ["cloud-sql-proxy/cloud-sql-proxy.sh"],
+ local = True,
+ shard_count = 3,
+ deps = [
+ ":pyspark_metastore_test",
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
+
+py_test(
+ name = "test_dr_elephant",
+ size = "enormous",
+ srcs = ["dr-elephant/test_dr_elephant.py"],
+ data = ["dr-elephant/dr-elephant.sh"],
+ local = True,
+ shard_count = 2,
+ deps = [
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
+
+py_test(
+ name = "test_hive_hcatalog",
+ size = "enormous",
+ srcs = ["hive-hcatalog/test_hive_hcatalog.py"],
+ data = ["hive-hcatalog/hive-hcatalog.sh"],
+ local = True,
+ shard_count = 6,
+ deps = [
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
+
+py_test(
+ name = "test_starburst_presto",
+ size = "enormous",
+ srcs = ["starburst-presto/test_starburst_presto.py"],
+ data = ["starburst-presto/presto.sh"],
+ local = True,
+ shard_count = 4,
+ deps = [
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
+
+py_library(
+ name = "pyspark_metastore_test",
+ testonly = True,
+ srcs = ["cloud-sql-proxy/pyspark_metastore_test.py"],
+)
diff --git a/WORKSPACE b/WORKSPACE
new file mode 100644
index 000000000..4cabdd028
--- /dev/null
+++ b/WORKSPACE
@@ -0,0 +1,25 @@
+load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+
+http_archive(
+ name = "rules_python",
+ sha256 = "aa96a691d3a8177f3215b14b0edc9641787abaaa30363a080165d06ab65e1161",
+ url = "https://github.com/bazelbuild/rules_python/releases/download/0.0.1/rules_python-0.0.1.tar.gz",
+)
+
+http_archive(
+ name = "io_abseil_py",
+ sha256 = "9b324bc96587819a1420c592cf5d54424456645719f1de6bb22f7045bb6dbc6b",
+ strip_prefix = "abseil-py-master",
+ url = "https://github.com/abseil/abseil-py/archive/master.zip",
+)
+
+http_archive(
+ name = "six_archive",
+ build_file = "@io_abseil_py//third_party:six.BUILD",
+ sha256 = "105f8d68616f8248e24bf0e9372ef04d3cc10104f1980f54d57b2ce73a5ad56a",
+ strip_prefix = "six-1.10.0",
+ urls = [
+ "http://mirror.bazel.build/pypi.python.org/packages/source/s/six/six-1.10.0.tar.gz",
+ "https://pypi.python.org/packages/source/s/six/six-1.10.0.tar.gz",
+ ],
+)
diff --git a/bigtable/BUILD b/bigtable/BUILD
new file mode 100644
index 000000000..50ca95e83
--- /dev/null
+++ b/bigtable/BUILD
@@ -0,0 +1,20 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_bigtable",
+ size = "enormous",
+ srcs = ["test_bigtable.py"],
+ data = ["bigtable.sh"],
+ local = True,
+ shard_count = 3,
+ deps = [
+ ":run_hbase_commands",
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
+
+py_library(
+ name = "run_hbase_commands",
+ srcs = ["run_hbase_commands.py"],
+)
diff --git a/bigtable/test_bigtable.py b/bigtable/test_bigtable.py
index ff6b7bfbf..55c42301d 100644
--- a/bigtable/test_bigtable.py
+++ b/bigtable/test_bigtable.py
@@ -10,9 +10,9 @@
See: https://cloud.google.com/bigtable/docs/cbt-overview
"""
import os
-import unittest
-from parameterized import parameterized
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -48,13 +48,14 @@ def tearDown(self):
def _validate_bigtable(self):
_, stdout, _ = self.assert_command(
'cbt -instance {} count test-bigtable '.format(self.db_name))
- self.assertEqual(int(float(stdout)), 4,
- "Invalid BigTable instance count")
+ self.assertEqual(
+ int(float(stdout)), 4, "Invalid BigTable instance count")
def verify_instance(self, name):
self.upload_test_file(
- os.path.join(os.path.dirname(os.path.abspath(__file__)),
- self.TEST_SCRIPT_FILE_NAME), name)
+ os.path.join(
+ os.path.dirname(os.path.abspath(__file__)),
+ self.TEST_SCRIPT_FILE_NAME), name)
self.assert_instance_command(
name, "python {}".format(self.TEST_SCRIPT_FILE_NAME))
self._validate_bigtable()
@@ -64,21 +65,14 @@ def verify_instance(self, name):
admin commands are provided from text file.
"""
- @parameterized.expand(
- [
- ("SINGLE", "1.2", ["m"]),
- ("STANDARD", "1.2", ["m"]),
- ("HA", "1.2", ["m-0"]),
- ("SINGLE", "1.3", ["m"]),
- ("STANDARD", "1.3", ["m"]),
- ("HA", "1.3", ["m-0"]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_bigtable(self, configuration, dataproc_version, machine_suffixes):
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- metadata=self.metadata)
+ @parameterized.parameters(
+ ("SINGLE", ["m"]),
+ ("STANDARD", ["m"]),
+ ("HA", ["m-0"]),
+ )
+ def test_bigtable(self, configuration, machine_suffixes):
+ self.createCluster(
+ configuration, self.INIT_ACTIONS, metadata=self.metadata)
for machine_suffix in machine_suffixes:
self.verify_instance("{}-{}".format(self.getClusterName(),
@@ -86,4 +80,4 @@ def test_bigtable(self, configuration, dataproc_version, machine_suffixes):
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/cloud-sql-proxy/test_cloud_sql.py b/cloud-sql-proxy/test_cloud_sql_proxy.py
similarity index 70%
rename from cloud-sql-proxy/test_cloud_sql.py
rename to cloud-sql-proxy/test_cloud_sql_proxy.py
index cf3028f2a..a0e948b8e 100644
--- a/cloud-sql-proxy/test_cloud_sql.py
+++ b/cloud-sql-proxy/test_cloud_sql_proxy.py
@@ -1,8 +1,8 @@
import json
import logging
-import unittest
-from parameterized import parameterized
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -50,31 +50,23 @@ def __submit_pyspark_job(self, cluster_name):
cluster_name, 'pyspark',
'{}/{}'.format(self.INIT_ACTIONS_REPO, self.TEST_SCRIPT_FILE_NAME))
- @parameterized.expand(
- [
- ("SINGLE", "1.2"),
- ("STANDARD", "1.2"),
- ("HA", "1.2"),
- ("SINGLE", "1.3"),
- ("STANDARD", "1.3"),
- ("HA", "1.3"),
- ("SINGLE", "1.4"),
- ("STANDARD", "1.4"),
- ("HA", "1.4"),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_cloud_sql_proxy(self, configuration, dataproc_version):
+ @parameterized.parameters(
+ "SINGLE",
+ "STANDARD",
+ "HA",
+ )
+ def test_cloud_sql_proxy(self, configuration):
metadata = 'hive-metastore-instance={}:{}'.format(
self.PROJECT_METADATA, self.DB_NAME)
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- machine_type="n1-standard-2",
- metadata=metadata,
- scopes='sql-admin')
+ self.createCluster(
+ configuration,
+ self.INIT_ACTIONS,
+ machine_type="n1-standard-2",
+ metadata=metadata,
+ scopes='sql-admin')
self.verify_cluster(self.getClusterName())
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/cloudbuild/Dockerfile b/cloudbuild/Dockerfile
index 58f0b0156..e191e8cf5 100644
--- a/cloudbuild/Dockerfile
+++ b/cloudbuild/Dockerfile
@@ -7,13 +7,10 @@ RUN useradd -m -d /home/ia-tests -s /bin/bash ia-tests
COPY --chown=ia-tests:ia-tests . /init-actions
-# Install Pip
-RUN apt-get -y update
-RUN apt-get -y install python3-pip
-
-# Install fastunit
-WORKDIR /tmp/fastunit
-RUN git clone https://github.com/ityoung/python3-fastunit.git .
-RUN python3 setup.py install
+# Install Bazel:
+# https://docs.bazel.build/versions/master/install-ubuntu.html
+RUN echo "deb [arch=amd64] http://storage.googleapis.com/bazel-apt stable jdk1.8" | tee /etc/apt/sources.list.d/bazel.list
+RUN curl https://bazel.build/bazel-release.pub.gpg | apt-key add -
+RUN apt-get update && apt-get install -y openjdk-8-jdk python3-setuptools bazel
USER ia-tests
diff --git a/cloudbuild/cloudbuild.yaml b/cloudbuild/cloudbuild.yaml
index ee10263f4..0c34ebb39 100644
--- a/cloudbuild/cloudbuild.yaml
+++ b/cloudbuild/cloudbuild.yaml
@@ -1,15 +1,42 @@
steps:
# Create Docker image from regular Dockerfile
- name: 'gcr.io/cloud-builders/docker'
+ id: 'docker-build'
args: ['build', '--tag=gcr.io/$PROJECT_ID/init-actions-image:$BUILD_ID', '-f', 'cloudbuild/Dockerfile', '.']
+
# Push Docker image to GCR
- name: 'gcr.io/cloud-builders/docker'
+ id: 'gcr-push'
args: ['push', 'gcr.io/$PROJECT_ID/init-actions-image:$BUILD_ID']
- # Run presubmit script
+ # Run presubmit tests in parallel for 1.2 image
+ - name: 'gcr.io/cloud-builders/kubectl'
+ id: 'dataproc-1.2-tests'
+ waitFor: ['gcr-push']
+ entrypoint: 'bash'
+ args: ['cloudbuild/run-presubmit-on-k8s.sh', 'gcr.io/$PROJECT_ID/init-actions-image:$BUILD_ID', '$BUILD_ID', '1.2']
+ env:
+ - 'COMMIT_SHA=$COMMIT_SHA'
+ - 'CLOUDSDK_COMPUTE_ZONE=us-central1-f'
+ - 'CLOUDSDK_CONTAINER_CLUSTER=init-actions-presubmit'
+
+ # Run presubmit tests in parallel for 1.3 image
+ - name: 'gcr.io/cloud-builders/kubectl'
+ id: 'dataproc-1.3-tests'
+ waitFor: ['gcr-push']
+ entrypoint: 'bash'
+ args: ['cloudbuild/run-presubmit-on-k8s.sh', 'gcr.io/$PROJECT_ID/init-actions-image:$BUILD_ID', '$BUILD_ID', '1.3']
+ env:
+ - 'COMMIT_SHA=$COMMIT_SHA'
+ - 'CLOUDSDK_COMPUTE_ZONE=us-central1-f'
+ - 'CLOUDSDK_CONTAINER_CLUSTER=init-actions-presubmit'
+
+ # Run presubmit tests in parallel for 1.4 image
- name: 'gcr.io/cloud-builders/kubectl'
+ id: 'dataproc-1.4-tests'
+ waitFor: ['gcr-push']
entrypoint: 'bash'
- args: ['cloudbuild/run-presubmit-on-k8s.sh', 'gcr.io/$PROJECT_ID/init-actions-image:$BUILD_ID', '$BUILD_ID']
+ args: ['cloudbuild/run-presubmit-on-k8s.sh', 'gcr.io/$PROJECT_ID/init-actions-image:$BUILD_ID', '$BUILD_ID', '1.4']
env:
- 'COMMIT_SHA=$COMMIT_SHA'
- 'CLOUDSDK_COMPUTE_ZONE=us-central1-f'
diff --git a/cloudbuild/presubmit.sh b/cloudbuild/presubmit.sh
index 47f9af828..d3ab0b409 100644
--- a/cloudbuild/presubmit.sh
+++ b/cloudbuild/presubmit.sh
@@ -2,6 +2,9 @@
set -euxo pipefail
+# Declare global variable for passing tests between functions
+declare -a TESTS_TO_RUN
+
configure_gcloud() {
gcloud config set core/disable_prompts TRUE
gcloud config set compute/zone us-central1-f
@@ -21,10 +24,6 @@ configure_gcloud_ssh_key() {
chmod 600 "${HOME}/.ssh/google_compute_engine"
}
-install_test_dependencies() {
- pip3 install -r integration_tests/requirements.txt
-}
-
# Fetches master branch from GitHub and "resets" local changes to be relative to it,
# so we can diff what changed relatively to master branch.
initialize_git_repo() {
@@ -50,14 +49,13 @@ determine_tests_to_run() {
echo "Changed files: ${CHANGED_FILES[*]}"
# Determines init actions directories that were changed
- RUN_ALL_TESTS=false
declare -a changed_dirs
for changed_file in "${CHANGED_FILES[@]}"; do
local changed_dir="${changed_file/\/*/}/"
# Run all tests if common directories were changed
if [[ ${changed_dir} =~ ^(integration_tests/|util/|cloudbuild/)$ ]]; then
echo "All tests will be run: '${changed_dir}' was changed"
- RUN_ALL_TESTS=true
+ TESTS_TO_RUN=(":DataprocInitActionsTestSuite")
return 0
fi
# Hack to workaround empty array expansion on old versions of Bash.
@@ -68,42 +66,34 @@ determine_tests_to_run() {
done
echo "Changed directories: ${changed_dirs[*]}"
- # Determines what tests in changed init action directories to run
+  # Determines Bazel test targets to run for changed init action directories
for changed_dir in "${changed_dirs[@]}"; do
- local tests_in_dir
- if ! tests_in_dir=$(compgen -G "${changed_dir}test*.py"); then
- echo "ERROR: presubmit failed - cannot find tests inside '${changed_dir}' directory"
- exit 1
+ # NOTE: The ::-1 removes the trailing '/'
+ local test_name=${changed_dir::-1}
+    # Some of our py_tests (those with dashes in their names) are defined in the top-level directory
+ if [[ $test_name == *"-"* ]]; then
+ local test_target=":test_${test_name//-/_}"
+ else
+ local test_target="${test_name}:test_${test_name}"
fi
- declare -a tests_array
- mapfile -t tests_array < <(echo "${tests_in_dir}")
- TESTS_TO_RUN+=("${tests_array[@]}")
+ TESTS_TO_RUN+=("${test_target}")
done
echo "Tests: ${TESTS_TO_RUN[*]}"
}
run_tests() {
- export INTERNAL_IP_SSH=true
- if [[ $RUN_ALL_TESTS == true ]]; then
- # Run all init action tests
- python3 -m fastunit -v */test_*.py
- else
- # Run tests for the init actions that were changed
- python3 -m fastunit -v "${TESTS_TO_RUN[@]}"
- fi
+ bazel test --jobs=15 --local_cpu_resources=15 --local_ram_resources=$((15 * 1024)) \
+ --action_env=INTERNAL_IP_SSH=true --test_output=errors \
+ --test_arg="--image_version=${IMAGE_VERSION}" "${TESTS_TO_RUN[@]}"
}
main() {
cd /init-actions
configure_gcloud
configure_gcloud_ssh_key
- install_test_dependencies
initialize_git_repo
determine_tests_to_run
run_tests
}
-# Declare global variable for passing tests between functions
-declare -a TESTS_TO_RUN
-
main
diff --git a/cloudbuild/run-presubmit-on-k8s.sh b/cloudbuild/run-presubmit-on-k8s.sh
index 6afb43dca..a5056ba40 100644
--- a/cloudbuild/run-presubmit-on-k8s.sh
+++ b/cloudbuild/run-presubmit-on-k8s.sh
@@ -4,14 +4,16 @@ set -euxo pipefail
readonly IMAGE=$1
readonly BUILD_ID=$2
+readonly DATAPROC_IMAGE_VERSION=$3
-readonly POD_NAME=presubmit-${BUILD_ID/_/-}
+readonly POD_NAME=presubmit-${DATAPROC_IMAGE_VERSION//./-}-${BUILD_ID//_/-}
gcloud container clusters get-credentials "${CLOUDSDK_CONTAINER_CLUSTER}"
kubectl run "${POD_NAME}" --generator=run-pod/v1 --image="$IMAGE" \
- --requests "cpu=2,memory=2Gi" --restart=Never \
+ --requests "cpu=4,memory=12Gi" --restart=Never \
--env="COMMIT_SHA=$COMMIT_SHA" \
+ --env="IMAGE_VERSION=$DATAPROC_IMAGE_VERSION" \
--command -- bash /init-actions/cloudbuild/presubmit.sh
trap 'kubectl delete pods "${POD_NAME}"' EXIT
diff --git a/conda/BUILD b/conda/BUILD
new file mode 100644
index 000000000..b85acff40
--- /dev/null
+++ b/conda/BUILD
@@ -0,0 +1,26 @@
+package(default_visibility = ["//visibility:public"])
+
+exports_files(["bootstrap-conda.sh"])
+
+py_test(
+ name = "test_conda",
+ size = "enormous",
+ srcs = ["test_conda.py"],
+ data = [
+ "bootstrap-conda.sh",
+ "install-conda-env.sh",
+ ],
+ local = True,
+ shard_count = 2,
+ deps = [
+ ":get_sys_exec",
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
+
+py_library(
+ name = "get_sys_exec",
+ testonly = True,
+ srcs = ["get-sys-exec.py"],
+)
diff --git a/conda/get-sys-exec.py b/conda/get-sys-exec.py
index fa2de5b1f..f53514a82 100644
--- a/conda/get-sys-exec.py
+++ b/conda/get-sys-exec.py
@@ -14,6 +14,6 @@
else:
sc = pyspark.SparkContext()
distData = sc.parallelize(range(100))
- python_distros = distData.map(lambda x: sys.executable).distinct().collect(
- )
+ python_distros = distData.map(
+ lambda x: sys.executable).distinct().collect()
print(python_distros)
diff --git a/conda/test_conda.py b/conda/test_conda.py
index 553d1ce1e..83fef4fc1 100644
--- a/conda/test_conda.py
+++ b/conda/test_conda.py
@@ -1,7 +1,7 @@
import json
-import unittest
-from parameterized import parameterized
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -49,34 +49,27 @@ def _verify_pip_packages(self, instance, pip_packages):
@staticmethod
def _parse_packages(stdout):
- return set(l.split()[0] for l in stdout.splitlines()
- if not l.startswith("#"))
+ return set(
+ l.split()[0] for l in stdout.splitlines() if not l.startswith("#"))
- @parameterized.expand(
- [
- ("STANDARD", "1.2", "3.7", [], []),
- ("STANDARD", "1.2", "3.7", CONDA_PKGS, PIP_PKGS),
- ("STANDARD", "1.3", "3.7", [], []),
- ("STANDARD", "1.3", "3.7", CONDA_PKGS, PIP_PKGS),
- ("STANDARD", "1.4", "3.7", [], []),
- ("STANDARD", "1.4", "3.7", CONDA_PKGS, PIP_PKGS),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_conda(self, configuration, dataproc_version, expected_python,
- conda_packages, pip_packages):
+ @parameterized.parameters(
+ ("STANDARD", [], []),
+ ("STANDARD", CONDA_PKGS, PIP_PKGS),
+ )
+ def test_conda(self, configuration, conda_packages, pip_packages):
metadata = "'CONDA_PACKAGES={},PIP_PACKAGES={}'".format(
" ".join(conda_packages), " ".join(pip_packages))
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- machine_type="n1-standard-2",
- metadata=metadata)
+ self.createCluster(
+ configuration,
+ self.INIT_ACTIONS,
+ machine_type="n1-standard-2",
+ metadata=metadata)
instance_name = self.getClusterName() + "-m"
- self._verify_python_version(instance_name, expected_python)
+ self._verify_python_version(instance_name, "3.7")
self._verify_pip_packages(instance_name, pip_packages)
self._verify_conda_packages(instance_name, conda_packages)
if __name__ == "__main__":
- unittest.main()
+ absltest.main()
diff --git a/connectors/BUILD b/connectors/BUILD
new file mode 100644
index 000000000..22068fa2b
--- /dev/null
+++ b/connectors/BUILD
@@ -0,0 +1,14 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_connectors",
+ size = "enormous",
+ srcs = ["test_connectors.py"],
+ data = ["connectors.sh"],
+ local = True,
+ shard_count = 6,
+ deps = [
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
diff --git a/connectors/test_connectors.py b/connectors/test_connectors.py
index 1a43e395e..3640b5c5b 100644
--- a/connectors/test_connectors.py
+++ b/connectors/test_connectors.py
@@ -1,6 +1,6 @@
-import unittest
-
-from parameterized import parameterized
+import pkg_resources
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -12,71 +12,51 @@ class ConnectorsTestCase(DataprocTestCase):
BQ_CONNECTOR_VERSION = "1.0.0"
GCS_CONNECTOR_VERSION = "2.0.0"
- def verify_instance(self, name, dataproc_version, connector,
- connector_version):
+ def verify_instance(self, name, connector, connector_version):
self.__submit_pig_job(
name, "sh test -f {}/{}-hadoop2-{}.jar".format(
- self.__connectors_dir(dataproc_version), connector,
- connector_version))
+ self.__connectors_dir(), connector, connector_version))
self.__submit_pig_job(
- name, "sh test -L {}/{}.jar".format(
- self.__connectors_dir(dataproc_version), connector,
- connector_version))
+ name, "sh test -L {}/{}.jar".format(self.__connectors_dir(),
+ connector, connector_version))
- @staticmethod
- def __connectors_dir(dataproc_version):
- if dataproc_version < "1.4":
+ def __connectors_dir(self):
+ if self.getImageVersion() < pkg_resources.parse_version("1.4"):
return "/usr/lib/hadoop/lib"
return "/usr/local/share/google/dataproc/lib"
def __submit_pig_job(self, cluster_name, job):
self.assert_dataproc_job(cluster_name, 'pig', "-e '{}'".format(job))
- @parameterized.expand(
- [
- ("SINGLE", "1.2"),
- ("STANDARD", "1.2"),
- ("HA", "1.2"),
- ("SINGLE", "1.3"),
- ("STANDARD", "1.3"),
- ("HA", "1.3"),
- ("SINGLE", "1.4"),
- ("STANDARD", "1.4"),
- ("HA", "1.4"),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_gcs_connector(self, configuration, dataproc_version):
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- metadata="gcs-connector-version={}".format(
- self.GCS_CONNECTOR_VERSION))
- self.verify_instance(self.getClusterName(), dataproc_version,
- "gcs-connector", self.GCS_CONNECTOR_VERSION)
-
- @parameterized.expand(
- [
- ("SINGLE", "1.2"),
- ("STANDARD", "1.2"),
- ("HA", "1.2"),
- ("SINGLE", "1.3"),
- ("STANDARD", "1.3"),
- ("HA", "1.3"),
- ("SINGLE", "1.4"),
- ("STANDARD", "1.4"),
- ("HA", "1.4"),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_bq_connector(self, configuration, dataproc_version):
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- metadata="bigquery-connector-version={}".format(
- self.BQ_CONNECTOR_VERSION))
- self.verify_instance(self.getClusterName(), dataproc_version,
- "bigquery-connector", self.BQ_CONNECTOR_VERSION)
+ @parameterized.parameters(
+ "SINGLE",
+ "STANDARD",
+ "HA",
+ )
+ def test_gcs_connector(self, configuration):
+ self.createCluster(
+ configuration,
+ self.INIT_ACTIONS,
+ metadata="gcs-connector-version={}".format(
+ self.GCS_CONNECTOR_VERSION))
+ self.verify_instance(self.getClusterName(), "gcs-connector",
+ self.GCS_CONNECTOR_VERSION)
+
+ @parameterized.parameters(
+ "SINGLE",
+ "STANDARD",
+ "HA",
+ )
+ def test_bq_connector(self, configuration):
+ self.createCluster(
+ configuration,
+ self.INIT_ACTIONS,
+ metadata="bigquery-connector-version={}".format(
+ self.BQ_CONNECTOR_VERSION))
+ self.verify_instance(self.getClusterName(), "bigquery-connector",
+ self.BQ_CONNECTOR_VERSION)
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/datalab/BUILD b/datalab/BUILD
new file mode 100644
index 000000000..9d3d71a8c
--- /dev/null
+++ b/datalab/BUILD
@@ -0,0 +1,18 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_datalab",
+ size = "enormous",
+ srcs = ["test_datalab.py"],
+ data = [
+ "datalab.sh",
+ "//conda:bootstrap-conda.sh",
+ "//docker:docker.sh",
+ ],
+ local = True,
+ shard_count = 2,
+ deps = [
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
diff --git a/datalab/test_datalab.py b/datalab/test_datalab.py
index 4d6433c43..ba9b3645c 100644
--- a/datalab/test_datalab.py
+++ b/datalab/test_datalab.py
@@ -1,6 +1,6 @@
-import unittest
-
-from parameterized import parameterized
+import pkg_resources
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -15,28 +15,25 @@ def verify_instance(self, name):
name, "curl {} -L {}:8080 | grep 'Google Cloud DataLab'".format(
"--retry 10 --retry-delay 10 --retry-connrefused", name))
- @parameterized.expand(
- [
- ("STANDARD", "1.2", ["m"], "python2"),
- ("STANDARD", "1.2", ["m"], "python3"),
- ("STANDARD", "1.3", ["m"], "python2"),
- ("STANDARD", "1.3", ["m"], "python3"),
- ("STANDARD", "1.4", ["m"], "python3-default"),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_datalab(self, configuration, dataproc_version, machine_suffixes,
- python):
+ @parameterized.parameters(
+ ("STANDARD", ["m"], "python2"),
+ ("STANDARD", ["m"], "python3"),
+ )
+ def test_datalab(self, configuration, machine_suffixes, python):
init_actions = self.INIT_ACTIONS
metadata = 'INIT_ACTIONS_REPO={}'.format(self.INIT_ACTIONS_REPO)
- if python == "python3":
- init_actions = self.PYTHON_3_INIT_ACTIONS + init_actions
-
- self.createCluster(configuration,
- init_actions,
- dataproc_version,
- metadata=metadata,
- scopes='cloud-platform',
- timeout_in_minutes=30)
+ if self.getImageVersion() <= pkg_resources.parse_version("1.3"):
+ if python == "python3":
+ init_actions = self.PYTHON_3_INIT_ACTIONS + init_actions
+ elif python == "python2":
+ return
+
+ self.createCluster(
+ configuration,
+ init_actions,
+ metadata=metadata,
+ scopes='cloud-platform',
+ timeout_in_minutes=30)
for machine_suffix in machine_suffixes:
self.verify_instance("{}-{}".format(self.getClusterName(),
@@ -44,4 +41,4 @@ def test_datalab(self, configuration, dataproc_version, machine_suffixes,
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/docker/BUILD b/docker/BUILD
new file mode 100644
index 000000000..fdc1c8bd0
--- /dev/null
+++ b/docker/BUILD
@@ -0,0 +1,3 @@
+package(default_visibility = ["//visibility:public"])
+
+exports_files(["docker.sh"])
diff --git a/dr-elephant/test_dr_elephant.py b/dr-elephant/test_dr_elephant.py
index 0a937e92c..06483b35b 100644
--- a/dr-elephant/test_dr_elephant.py
+++ b/dr-elephant/test_dr_elephant.py
@@ -1,6 +1,5 @@
-import unittest
-
-from parameterized import parameterized
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -26,23 +25,16 @@ def verify_instance(self, instance_name):
instance_name,
             verify_cmd_fmt.format(instance_name, "QuasiMonteCarlo"))
- @parameterized.expand(
- [
- ("STANDARD", "1.2", ["m"]),
- ("HA", "1.2", ["m-0"]),
- ("STANDARD", "1.3", ["m"]),
- ("HA", "1.3", ["m-0"]),
- ("STANDARD", "1.4", ["m"]),
- ("HA", "1.4", ["m-0"]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_dr_elephant(self, configuration, dataproc_version,
- machine_suffixes):
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- timeout_in_minutes=30,
- machine_type="n1-standard-2")
+ @parameterized.parameters(
+ ("STANDARD", ["m"]),
+ ("HA", ["m-0"]),
+ )
+ def test_dr_elephant(self, configuration, machine_suffixes):
+ self.createCluster(
+ configuration,
+ self.INIT_ACTIONS,
+ timeout_in_minutes=30,
+ machine_type="n1-standard-2")
# Submit a job to check if statistic is generated
self.assert_dataproc_job(
@@ -57,4 +49,4 @@ def test_dr_elephant(self, configuration, dataproc_version,
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/drill/BUILD b/drill/BUILD
new file mode 100644
index 000000000..afada39f5
--- /dev/null
+++ b/drill/BUILD
@@ -0,0 +1,18 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_drill",
+ size = "enormous",
+ srcs = ["test_drill.py"],
+ data = [
+ "drill.sh",
+ "validate.sh",
+ "//zookeeper:zookeeper.sh",
+ ],
+ local = True,
+ shard_count = 3,
+ deps = [
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
diff --git a/drill/test_drill.py b/drill/test_drill.py
index 9aa5a61cb..cd6368dae 100644
--- a/drill/test_drill.py
+++ b/drill/test_drill.py
@@ -1,7 +1,7 @@
import os
-import unittest
-from parameterized import parameterized
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -14,8 +14,9 @@ class DrillTestCase(DataprocTestCase):
def verify_instance(self, name, drill_mode, target_node):
self.upload_test_file(
- os.path.join(os.path.dirname(os.path.abspath(__file__)),
- self.TEST_SCRIPT_FILE_NAME), name)
+ os.path.join(
+ os.path.dirname(os.path.abspath(__file__)),
+ self.TEST_SCRIPT_FILE_NAME), name)
self.__run_bash_test_file(name, drill_mode, target_node)
self.remove_test_script(self.TEST_SCRIPT_FILE_NAME, name)
@@ -24,24 +25,17 @@ def __run_bash_test_file(self, name, drill_mode, target_node):
name, "sudo bash {} {} {}".format(self.TEST_SCRIPT_FILE_NAME,
drill_mode, target_node))
- @parameterized.expand(
- [
- ("SINGLE", "1.3-deb9", [("m", "m")]),
- ("SINGLE", "1.2-deb9", [("m", "m")]),
- ("STANDARD", "1.3-deb9", [("m", "w-0"), ("m", "m")]),
- ("STANDARD", "1.2-deb9", [("m", "w-0"), ("m", "m")]),
- ("HA", "1.3-deb9", [("m-0", "w-0"), ("w-0", "m-1")]),
- ("HA", "1.2-deb9", [("m-0", "w-0"), ("w-0", "m-1")]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_drill(self, configuration, dataproc_version, verify_options):
+ @parameterized.parameters(
+ ("SINGLE", [("m", "m")]),
+ ("STANDARD", [("m", "w-0"), ("m", "m")]),
+ ("HA", [("m-0", "w-0"), ("w-0", "m-1")]),
+ )
+ def test_drill(self, configuration, verify_options):
init_actions = self.INIT_ACTIONS
if configuration == "STANDARD":
init_actions = self.INIT_ACTIONS_FOR_STANDARD + init_actions
- self.createCluster(configuration,
- init_actions,
- dataproc_version,
- machine_type="n1-standard-2")
+ self.createCluster(
+ configuration, init_actions, machine_type="n1-standard-2")
drill_mode = "DISTRIBUTED"
if configuration == "SINGLE":
@@ -56,4 +50,4 @@ def test_drill(self, configuration, dataproc_version, verify_options):
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/flink/BUILD b/flink/BUILD
new file mode 100644
index 000000000..d2617b252
--- /dev/null
+++ b/flink/BUILD
@@ -0,0 +1,17 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_flink",
+ size = "enormous",
+ srcs = ["test_flink.py"],
+ data = [
+ "flink.sh",
+ "validate.sh",
+ ],
+ local = True,
+ shard_count = 5,
+ deps = [
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
diff --git a/flink/test_flink.py b/flink/test_flink.py
index 47a10c527..2d6fccbf4 100644
--- a/flink/test_flink.py
+++ b/flink/test_flink.py
@@ -1,7 +1,7 @@
import os
-import unittest
-from parameterized import parameterized
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -24,48 +24,34 @@ def __run_test_file(self, name, yarn_session):
name, "bash {} {}".format(self.TEST_SCRIPT_FILE_NAME,
yarn_session))
- @parameterized.expand(
- [
- ("STANDARD", "1.2", ["m"]),
- ("HA", "1.2", ["m-0", "m-1", "m-2"]),
- ("STANDARD", "1.3", ["m"]),
- ("HA", "1.3", ["m-0", "m-1", "m-2"]),
- ("STANDARD", "1.4", ["m"]),
- ("HA", "1.4", ["m-0", "m-1", "m-2"]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_flink(self, configuration, dataproc_version, machine_suffixes):
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- machine_type="n1-standard-2")
+ @parameterized.parameters(
+ ("STANDARD", ["m"]),
+ ("HA", ["m-0", "m-1", "m-2"]),
+ )
+ def test_flink(self, configuration, machine_suffixes):
+ self.createCluster(
+ configuration, self.INIT_ACTIONS, machine_type="n1-standard-2")
for machine_suffix in machine_suffixes:
self.verify_instance("{}-{}".format(self.getClusterName(),
machine_suffix))
- @parameterized.expand(
- [
- ("STANDARD", "1.2", ["m"]),
- ("HA", "1.2", ["m-0", "m-1", "m-2"]),
- ("SINGLE", "1.3", ["m"]),
- ("STANDARD", "1.3", ["m"]),
- ("HA", "1.3", ["m-0", "m-1", "m-2"]),
- ("STANDARD", "1.4", ["m"]),
- ("HA", "1.4", ["m-0", "m-1", "m-2"]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
+ @parameterized.parameters(
+ ("STANDARD", ["m"]),
+ ("HA", ["m-0", "m-1", "m-2"]),
+ ("SINGLE", ["m"]),
+ )
def test_flink_with_optional_metadata(self, configuration,
- dataproc_version, machine_suffixes):
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- machine_type="n1-standard-2",
- metadata="flink-start-yarn-session=false")
+ machine_suffixes):
+ self.createCluster(
+ configuration,
+ self.INIT_ACTIONS,
+ machine_type="n1-standard-2",
+ metadata="flink-start-yarn-session=false")
for machine_suffix in machine_suffixes:
- self.verify_instance("{}-{}".format(self.getClusterName(),
- machine_suffix),
- yarn_session=False)
+ self.verify_instance(
+ "{}-{}".format(self.getClusterName(), machine_suffix),
+ yarn_session=False)
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/ganglia/BUILD b/ganglia/BUILD
new file mode 100644
index 000000000..5b6f3bd52
--- /dev/null
+++ b/ganglia/BUILD
@@ -0,0 +1,22 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_ganglia",
+ size = "enormous",
+ srcs = ["test_ganglia.py"],
+ data = [
+ "ganglia.sh",
+ ],
+ local = True,
+ shard_count = 3,
+ deps = [
+ ":verify_ganglia_running",
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
+
+py_library(
+ name = "verify_ganglia_running",
+ srcs = ["verify_ganglia_running.py"],
+)
diff --git a/ganglia/test_ganglia.py b/ganglia/test_ganglia.py
index 63fbf26a8..26e03f9b0 100644
--- a/ganglia/test_ganglia.py
+++ b/ganglia/test_ganglia.py
@@ -1,7 +1,7 @@
import os
-import unittest
-from parameterized import parameterized
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -23,25 +23,17 @@ def verify_instance(self, name):
name, "python3 {}".format(self.TEST_SCRIPT_FILE_NAME))
self.remove_test_script(self.TEST_SCRIPT_FILE_NAME, name)
- @parameterized.expand(
- [
- ("SINGLE", "1.2", ["m"]),
- ("STANDARD", "1.2", ["m", "w-0"]),
- ("HA", "1.2", ["m-0", "m-1", "m-2", "w-0"]),
- ("SINGLE", "1.3", ["m"]),
- ("STANDARD", "1.3", ["m", "w-0"]),
- ("HA", "1.3", ["m-0", "m-1", "m-2", "w-0"]),
- ("SINGLE", "1.4", ["m"]),
- ("STANDARD", "1.4", ["m", "w-0"]),
- ("HA", "1.4", ["m-0", "m-1", "m-2", "w-0"]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_ganglia(self, configuration, dataproc_version, machine_suffixes):
- self.createCluster(configuration, self.INIT_ACTIONS, dataproc_version)
+ @parameterized.parameters(
+ ("SINGLE", ["m"]),
+ ("STANDARD", ["m", "w-0"]),
+ ("HA", ["m-0", "m-1", "m-2", "w-0"]),
+ )
+ def test_ganglia(self, configuration, machine_suffixes):
+ self.createCluster(configuration, self.INIT_ACTIONS)
for machine_suffix in machine_suffixes:
self.verify_instance("{}-{}".format(self.getClusterName(),
machine_suffix))
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/gpu/BUILD b/gpu/BUILD
new file mode 100644
index 000000000..7dacb6ce4
--- /dev/null
+++ b/gpu/BUILD
@@ -0,0 +1,14 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_gpu",
+ size = "enormous",
+ srcs = ["test_gpu.py"],
+ data = ["install_gpu_driver.sh"],
+ local = True,
+ shard_count = 3,
+ deps = [
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
diff --git a/gpu/test_gpu.py b/gpu/test_gpu.py
new file mode 100644
index 000000000..ca5dc2af4
--- /dev/null
+++ b/gpu/test_gpu.py
@@ -0,0 +1,73 @@
+from absl.testing import absltest
+from absl.testing import parameterized
+
+from integration_tests.dataproc_test_case import DataprocTestCase
+
+
+class NvidiaGpuDriverTestCase(DataprocTestCase):
+ COMPONENT = 'gpu'
+ INIT_ACTIONS = ['gpu/install_gpu_driver.sh']
+ MASTER_GPU_TYPE = 'type=nvidia-tesla-v100'
+ WORKER_GPU_TYPE = 'type=nvidia-tesla-v100'
+
+ def verify_instance(self, name):
+ self.assert_instance_command(name, "nvidia-smi")
+
+ def verify_instance_gpu_agent(self, name):
+ self.assert_instance_command(
+ name, "systemctl status gpu_utilization_agent.service")
+
+ @parameterized.parameters(
+ ("STANDARD", ["m", "w-0"], MASTER_GPU_TYPE, WORKER_GPU_TYPE), )
+ def test_install_gpu(self, configuration, machine_suffixes,
+ master_accelerator, worker_accelerator):
+ init_actions = self.INIT_ACTIONS
+ self.createCluster(
+ configuration,
+ init_actions,
+ beta=True,
+ master_accelerator=master_accelerator,
+ worker_accelerator=worker_accelerator)
+ for machine_suffix in machine_suffixes:
+ self.verify_instance("{}-{}".format(self.getClusterName(),
+ machine_suffix))
+
+ @parameterized.parameters(
+ ("STANDARD", ["m", "w-0"], MASTER_GPU_TYPE, WORKER_GPU_TYPE), )
+ def test_install_gpu_no_agent(self, configuration, machine_suffixes,
+ master_accelerator, worker_accelerator):
+ init_actions = self.INIT_ACTIONS
+ self.createCluster(
+ configuration,
+ init_actions,
+ beta=True,
+ master_accelerator=master_accelerator,
+ worker_accelerator=worker_accelerator,
+ metadata='install_gpu_agent=false')
+ for machine_suffix in machine_suffixes:
+ self.verify_instance("{}-{}".format(self.getClusterName(),
+ machine_suffix))
+
+ @parameterized.parameters(
+ ("STANDARD", ["m", "w-0"], MASTER_GPU_TYPE, WORKER_GPU_TYPE), )
+ def test_install_gpu_agent(self, configuration, machine_suffixes,
+ master_accelerator, worker_accelerator):
+
+ init_actions = self.INIT_ACTIONS
+ self.createCluster(
+ configuration,
+ init_actions,
+ beta=True,
+ master_accelerator=master_accelerator,
+ worker_accelerator=worker_accelerator,
+ metadata='install_gpu_agent=true',
+ scopes='https://www.googleapis.com/auth/monitoring.write')
+ for machine_suffix in machine_suffixes:
+ self.verify_instance("{}-{}".format(self.getClusterName(),
+ machine_suffix))
+ self.verify_instance_gpu_agent("{}-{}".format(
+ self.getClusterName(), machine_suffix))
+
+
+if __name__ == '__main__':
+ absltest.main()
diff --git a/gpu/test_install_gpu_driver.py b/gpu/test_install_gpu_driver.py
deleted file mode 100644
index 773babd69..000000000
--- a/gpu/test_install_gpu_driver.py
+++ /dev/null
@@ -1,101 +0,0 @@
-import unittest
-from parameterized import parameterized
-
-from integration_tests.dataproc_test_case import DataprocTestCase
-
-
-class NvidiaGpuDriverTestCase(DataprocTestCase):
- COMPONENT = 'gpu'
- INIT_ACTIONS = ['gpu/install_gpu_driver.sh']
- MASTER_GPU_TYPE = 'type=nvidia-tesla-v100'
- WORKER_GPU_TYPE = 'type=nvidia-tesla-v100'
-
- def verify_instance(self, name):
- self.assert_instance_command(name, "nvidia-smi")
-
- def verify_instance_gpu_agent(self, name):
- self.assert_instance_command(
- name, "systemctl status gpu_utilization_agent.service")
-
- @parameterized.expand(
- [
- ("STANDARD", "1.2", ["m", "w-0"
- ], MASTER_GPU_TYPE, WORKER_GPU_TYPE),
- ("STANDARD", "1.3", ["m", "w-0"
- ], MASTER_GPU_TYPE, WORKER_GPU_TYPE),
- ("STANDARD", "1.4", ["m", "w-0"
- ], MASTER_GPU_TYPE, WORKER_GPU_TYPE),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_install_gpu(self, configuration, dataproc_version,
- machine_suffixes, master_accelerator,
- worker_accelerator):
- init_actions = self.INIT_ACTIONS
- self.createCluster(configuration,
- init_actions,
- dataproc_version,
- beta=True,
- master_accelerator=master_accelerator,
- worker_accelerator=worker_accelerator)
- for machine_suffix in machine_suffixes:
- self.verify_instance("{}-{}".format(self.getClusterName(),
- machine_suffix))
-
- @parameterized.expand(
- [
- ("STANDARD", "1.2", ["m", "w-0"
- ], MASTER_GPU_TYPE, WORKER_GPU_TYPE),
- ("STANDARD", "1.3", ["m", "w-0"
- ], MASTER_GPU_TYPE, WORKER_GPU_TYPE),
- ("STANDARD", "1.4", ["m", "w-0"
- ], MASTER_GPU_TYPE, WORKER_GPU_TYPE),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_install_gpu_no_agent(self, configuration, dataproc_version,
- machine_suffixes, master_accelerator,
- worker_accelerator):
- init_actions = self.INIT_ACTIONS
- self.createCluster(configuration,
- init_actions,
- dataproc_version,
- beta=True,
- master_accelerator=master_accelerator,
- worker_accelerator=worker_accelerator,
- metadata='install_gpu_agent=false')
- for machine_suffix in machine_suffixes:
- self.verify_instance("{}-{}".format(self.getClusterName(),
- machine_suffix))
-
- @parameterized.expand(
- [
- ("STANDARD", "1.2", ["m", "w-0"
- ], MASTER_GPU_TYPE, WORKER_GPU_TYPE),
- ("STANDARD", "1.3", ["m", "w-0"
- ], MASTER_GPU_TYPE, WORKER_GPU_TYPE),
- ("STANDARD", "1.4", ["m", "w-0"
- ], MASTER_GPU_TYPE, WORKER_GPU_TYPE),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_install_gpu_agent(self, configuration, dataproc_version,
- machine_suffixes, master_accelerator,
- worker_accelerator):
-
- init_actions = self.INIT_ACTIONS
- self.createCluster(
- configuration,
- init_actions,
- dataproc_version,
- beta=True,
- master_accelerator=master_accelerator,
- worker_accelerator=worker_accelerator,
- metadata='install_gpu_agent=true',
- scopes='https://www.googleapis.com/auth/monitoring.write')
- for machine_suffix in machine_suffixes:
- self.verify_instance("{}-{}".format(self.getClusterName(),
- machine_suffix))
- self.verify_instance_gpu_agent("{}-{}".format(
- self.getClusterName(), machine_suffix))
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/hbase/BUILD b/hbase/BUILD
new file mode 100644
index 000000000..854f02c60
--- /dev/null
+++ b/hbase/BUILD
@@ -0,0 +1,17 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_hbase",
+ size = "enormous",
+ srcs = ["test_hbase.py"],
+ data = [
+ "hbase.sh",
+ "//zookeeper:zookeeper.sh",
+ ],
+ local = True,
+ shard_count = 6,
+ deps = [
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
diff --git a/hbase/test_hbase.py b/hbase/test_hbase.py
index 402ec3d5a..9c2057bf9 100644
--- a/hbase/test_hbase.py
+++ b/hbase/test_hbase.py
@@ -1,6 +1,5 @@
-import unittest
-
-from parameterized import parameterized
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -28,60 +27,37 @@ def verify_instance(self, name):
'org.apache.hadoop.hbase.IntegrationTestsDriver',
'org.apache.hadoop.hbase.mapreduce.IntegrationTestImportTsv'))
- @parameterized.expand(
- [
- ("SINGLE", "1.2", ["m"]),
- ("STANDARD", "1.2", ["m"]),
- ("HA", "1.2", ["m-0"]),
- ("SINGLE", "1.3", ["m"]),
- ("STANDARD", "1.3", ["m"]),
- ("HA", "1.3", ["m-0"]),
- ("SINGLE", "1.4", ["m"]),
- ("STANDARD", "1.4", ["m"]),
- ("HA", "1.4", ["m-0"]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_hbase(self, configuration, dataproc_version, machine_suffixes):
+ @parameterized.parameters(
+ ("SINGLE", ["m"]),
+ ("STANDARD", ["m"]),
+ ("HA", ["m-0"]),
+ )
+ def test_hbase(self, configuration, machine_suffixes):
init_actions = self.INIT_ACTIONS
if configuration != "HA":
init_actions = self.INIT_ACTIONS_FOR_NOT_HA + init_actions
- self.createCluster(configuration,
- init_actions,
- dataproc_version,
- machine_type="n1-standard-2")
+ self.createCluster(
+ configuration, init_actions, machine_type="n1-standard-2")
for machine_suffix in machine_suffixes:
self.verify_instance("{}-{}".format(self.getClusterName(),
machine_suffix))
- @parameterized.expand(
- [
- ("SINGLE", "1.2", ["m"]),
- ("STANDARD", "1.2", ["m"]),
- ("HA", "1.2", ["m-0"]),
- ("SINGLE", "1.3", ["m"]),
- ("STANDARD", "1.3", ["m"]),
- ("HA", "1.3", ["m-0"]),
- ("SINGLE", "1.4", ["m"]),
- ("STANDARD", "1.4", ["m"]),
- ("HA", "1.4", ["m-0"]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_hbase_on_gcs(self, configuration, dataproc_version,
- machine_suffixes):
+ @parameterized.parameters(
+ ("SINGLE", ["m"]),
+ ("STANDARD", ["m"]),
+ ("HA", ["m-0"]),
+ )
+ def test_hbase_on_gcs(self, configuration, machine_suffixes):
init_actions = self.INIT_ACTIONS
if configuration != "HA":
init_actions = self.INIT_ACTIONS_FOR_NOT_HA + init_actions
- test_dir = "{}-{}-{}".format(configuration.lower(),
- dataproc_version.replace(".", "-"),
- self.random_str())
- metadata = 'hbase-root-dir=gs://{}/{}'.format(self.GCS_BUCKET,
- test_dir)
- self.createCluster(configuration,
- init_actions,
- dataproc_version,
- metadata=metadata,
- machine_type="n1-standard-2")
+ metadata = 'hbase-root-dir=gs://{}/test-dir'.format(self.GCS_BUCKET)
+ self.createCluster(
+ configuration,
+ init_actions,
+ metadata=metadata,
+ machine_type="n1-standard-2")
for machine_suffix in machine_suffixes:
self.verify_instance("{}-{}".format(self.getClusterName(),
@@ -89,4 +65,5 @@ def test_hbase_on_gcs(self, configuration, dataproc_version,
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
+
diff --git a/hive-hcatalog/test_hive.py b/hive-hcatalog/test_hive_hcatalog.py
similarity index 77%
rename from hive-hcatalog/test_hive.py
rename to hive-hcatalog/test_hive_hcatalog.py
index 1f8d86d6e..e9771bf60 100644
--- a/hive-hcatalog/test_hive.py
+++ b/hive-hcatalog/test_hive_hcatalog.py
@@ -1,7 +1,8 @@
import json
import random
-import unittest
-from parameterized import parameterized
+
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -56,23 +57,18 @@ def __submit_hive_job(self, cluster_name, job, should_repeat_job=False):
status = stdout_dict.get("status", {}).get("state")
return status, stderr
- @parameterized.expand(
- [
- ("SINGLE", "1.2", False),
- ("STANDARD", "1.2", False),
- ("HA", "1.2", False),
- ("SINGLE", "1.3", True),
- ("STANDARD", "1.3", True),
- ("HA", "1.3", True),
- ("SINGLE", "1.4", True),
- ("STANDARD", "1.4", True),
- ("HA", "1.4", True),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_hive(self, configuration, dataproc_version, should_repeat_job):
- self.createCluster(configuration, self.INIT_ACTIONS, dataproc_version)
+ @parameterized.parameters(
+ ("SINGLE", False),
+ ("STANDARD", False),
+ ("HA", False),
+ ("SINGLE", True),
+ ("STANDARD", True),
+ ("HA", True),
+ )
+ def test_hive(self, configuration, should_repeat_job):
+ self.createCluster(configuration, self.INIT_ACTIONS)
self.verify_instance(self.getClusterName(), should_repeat_job)
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/hue/BUILD b/hue/BUILD
new file mode 100644
index 000000000..d5fce54d6
--- /dev/null
+++ b/hue/BUILD
@@ -0,0 +1,14 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_hue",
+ size = "enormous",
+ srcs = ["test_hue.py"],
+ data = ["hue.sh"],
+ local = True,
+ shard_count = 3,
+ deps = [
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
diff --git a/hue/test_hue.py b/hue/test_hue.py
index c61822580..7df063184 100644
--- a/hue/test_hue.py
+++ b/hue/test_hue.py
@@ -1,6 +1,5 @@
-import unittest
-
-from parameterized import parameterized
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -8,7 +7,6 @@
class HueTestCase(DataprocTestCase):
COMPONENT = 'hue'
INIT_ACTIONS = ['hue/hue.sh']
- TEST_SCRIPT_FILE_NAME = 'verify_hue_running.py'
def verify_instance(self, instance_name):
verify_cmd_fmt = '''
@@ -24,25 +22,17 @@ def verify_instance(self, instance_name):
             verify_cmd_fmt.format(instance_name,
                                   "Query. Explore. Repeat."))
- @parameterized.expand(
- [
- ("SINGLE", "1.2", ["m"]),
- ("STANDARD", "1.2", ["m"]),
- ("HA", "1.2", ["m-0"]),
- ("SINGLE", "1.3", ["m"]),
- ("STANDARD", "1.3", ["m"]),
- ("HA", "1.3", ["m-0"]),
- ("SINGLE", "1.4", ["m"]),
- ("STANDARD", "1.4", ["m"]),
- ("HA", "1.4", ["m-0"]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_hue(self, configuration, dataproc_version, machine_suffixes):
- self.createCluster(configuration, self.INIT_ACTIONS, dataproc_version)
+ @parameterized.parameters(
+ ("SINGLE", ["m"]),
+ ("STANDARD", ["m"]),
+ ("HA", ["m-0"]),
+ )
+ def test_hue(self, configuration, machine_suffixes):
+ self.createCluster(configuration, self.INIT_ACTIONS)
for machine_suffix in machine_suffixes:
self.verify_instance("{}-{}".format(self.getClusterName(),
machine_suffix))
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/integration_tests/BUILD b/integration_tests/BUILD
new file mode 100644
index 000000000..8c3991ce5
--- /dev/null
+++ b/integration_tests/BUILD
@@ -0,0 +1,11 @@
+package(default_visibility = ["//visibility:public"])
+
+py_library(
+ name = "dataproc_test_case",
+ testonly = True,
+ srcs = ["dataproc_test_case.py"],
+ deps = [
+ "@io_abseil_py//absl/flags",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
diff --git a/integration_tests/README.MD b/integration_tests/README.MD
index d4d80f00c..66e37267e 100644
--- a/integration_tests/README.MD
+++ b/integration_tests/README.MD
@@ -1,28 +1,28 @@
# Integration tests framework
-This directory contains integration tests framework that runs init-actions on real Dataproc cluster
-and validates if software is properly installed.
+This directory contains an integration test framework that runs init actions on
+a real Dataproc cluster and validates that the software is properly installed.
## Requirements
-In order to run tests you need `Python 3.5` virtualenv and libs specified in `requirements.txt`.
+
+To run the tests you need Python with [setuptools](https://pypi.org/project/setuptools/)
+installed and [Bazel](https://bazel.build/).
You also need to have `gcloud` CLI configured to use your GCP project.
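+
+For example, a minimal `gcloud` setup might look like this (a sketch only; the
+project ID is a placeholder for your own GCP project):
+```bash
+# Authenticate and point gcloud at the project the test clusters should run in.
+gcloud auth login
+gcloud config set project my-gcp-project
+# Optional: pick a default zone (the presubmit script uses us-central1-f).
+gcloud config set compute/zone us-central1-f
+```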
## Running tests
-To run all tests just type `python -m unittest` from project root.
-
-To run test for specific init action use its package name and test name:
- `python -m unittest hive-hcatalog.test_hive`
-
-## Parallel tests
-By default, tests are executed synchronously - this is standard `unittest` module behaviour and works well for
-real unit tests.
-However here tests are taking significant amount of time and running them one by one
-sometimes takes up to 2 hours to test whole component.
+To run all tests against the default image version, run this command from the project root:
+```bash
+bazel test :DataprocInitActionsTestSuite --jobs=25
+```
-This can be speed up by using [python3-fastunit](https://github.com/ityoung/python3-fastunit)
-package. Once installed (there is no PIP version) tests can be run as simple as before, but switching
-`unittest` module to `fastunit`:
+To run all tests against a specific image version, run this command from the project root:
+```bash
+bazel test :DataprocInitActionsTestSuite --test_arg=--image_version=1.4
+```
- `python -m fastunit hive-hcatalog.test_hive`
\ No newline at end of file
+To run tests for a specific initialization action, use its package name and test name:
+```bash
+bazel test gpu:test_gpu
+```
diff --git a/integration_tests/dataproc_test_case.py b/integration_tests/dataproc_test_case.py
index 7e8dd9abb..fa642dbba 100644
--- a/integration_tests/dataproc_test_case.py
+++ b/integration_tests/dataproc_test_case.py
@@ -1,30 +1,30 @@
-import os
-import re
-import sys
+import datetime
import json
+import logging
+import os
import random
+import re
import string
-import logging
-import datetime
-import unittest
import subprocess
+import sys
from threading import Timer
-BASE_TEST_CASE = unittest.TestCase
-PARALLEL_RUN = False
-if "fastunit" in sys.modules:
- import fastunit
- BASE_TEST_CASE = fastunit.TestCase
- PARALLEL_RUN = True
+import pkg_resources
+from absl import flags
+from absl.testing import parameterized
logging.basicConfig(level=os.getenv("LOG_LEVEL", logging.INFO))
+FLAGS = flags.FLAGS
+flags.DEFINE_string('image_version', '1.3', 'Dataproc image version, e.g. 1.3')
+FLAGS(sys.argv)
+
INTERNAL_IP_SSH = os.getenv("INTERNAL_IP_SSH", "false").lower() == "true"
DEFAULT_TIMEOUT = 15 # minutes
-class DataprocTestCase(BASE_TEST_CASE):
+class DataprocTestCase(parameterized.TestCase):
DEFAULT_ARGS = {
"SINGLE": [
"--single-node",
@@ -72,7 +72,6 @@ def setUpClass(cls):
def createCluster(self,
configuration,
init_actions,
- dataproc_version,
metadata=None,
scopes=None,
properties=None,
@@ -85,7 +84,7 @@ def createCluster(self,
boot_disk_size="50GB"):
self.name = "test-{}-{}-{}-{}".format(
self.COMPONENT, configuration.lower(),
- dataproc_version.replace(".", "-"), self.datetime_str())[:46]
+ FLAGS.image_version.replace(".", "-"), self.datetime_str())[:46]
self.name += "-{}".format(self.random_str(size=4))
self.cluster_version = None
@@ -101,8 +100,8 @@ def createCluster(self,
args.append("--scopes={}".format(scopes))
if metadata:
args.append("--metadata={}".format(metadata))
- if dataproc_version:
- args.append("--image-version={}".format(dataproc_version))
+ if FLAGS.image_version:
+ args.append("--image-version={}".format(FLAGS.image_version))
if timeout_in_minutes:
args.append("--initialization-action-timeout={}m".format(
timeout_in_minutes))
@@ -144,13 +143,18 @@ def stage_init_actions(self, project):
staging_dir = "{}/{}-{}".format(bucket, self.datetime_str(),
self.random_str())
- self.assert_command(
- "gsutil -q -m rsync -r -x '.git*|.idea*' ./ {}/".format(
- staging_dir))
+ self.assert_command("gsutil -q -m rsync -r -x '.git*|.idea*' ./ {}/".
+ format(staging_dir))
return staging_dir
def tearDown(self):
+ try:
+ self.name
+ except AttributeError:
+ logging.info("Skipping cluster delete: name undefined")
+ return
+
ret_code, _, stderr = self.run_command(
"gcloud dataproc clusters delete {} --region={} --quiet --async".
format(self.name, self.REGION))
@@ -161,6 +165,10 @@ def tearDown(self):
def getClusterName(self):
return self.name
+ @staticmethod
+ def getImageVersion():
+ return pkg_resources.parse_version(FLAGS.image_version)
+
def upload_test_file(self, testfile, name):
self.assert_command('gcloud compute scp {} {}:'.format(testfile, name))
@@ -281,16 +289,3 @@ def run_command(cmd, timeout_in_minutes=DEFAULT_TIMEOUT):
logging.debug("Ran %s: retcode: %d, stdout: %s, stderr: %s", cmd,
p.returncode, stdout, stderr)
return p.returncode, stdout, stderr
-
- @staticmethod
- def generate_verbose_test_name(testcase_func, param_num, param):
- return "{} [mode={}, version={}, random_prefix={}]".format(
- testcase_func.__name__, param.args[0],
- param.args[1].replace(".", "_"), DataprocTestCase.random_str())
-
-
-if __name__ == '__main__':
- if PARALLEL_RUN:
- fastunit.main()
- else:
- unittest.main()
diff --git a/integration_tests/requirements.txt b/integration_tests/requirements.txt
deleted file mode 100644
index 4581b850c..000000000
--- a/integration_tests/requirements.txt
+++ /dev/null
@@ -1 +0,0 @@
-parameterized==0.7.0
diff --git a/jupyter/BUILD b/jupyter/BUILD
new file mode 100644
index 000000000..249c92812
--- /dev/null
+++ b/jupyter/BUILD
@@ -0,0 +1,22 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_jupyter",
+ size = "enormous",
+ srcs = ["test_jupyter.py"],
+ data = [
+ "jupyter.sh",
+ "launch-jupyter-interface.sh",
+ "//conda:bootstrap-conda.sh",
+ "//util:utils.sh",
+ ] + glob([
+ "internal/*",
+ "kernels/*",
+ ]),
+ local = True,
+ shard_count = 4,
+ deps = [
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
diff --git a/jupyter/test_jupyter.py b/jupyter/test_jupyter.py
index 667d69e4f..b3680cbe2 100644
--- a/jupyter/test_jupyter.py
+++ b/jupyter/test_jupyter.py
@@ -1,6 +1,5 @@
-import unittest
-
-from parameterized import parameterized
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -15,53 +14,40 @@ def verify_instance(self, name, jupyter_port):
jupyter_port)
self.assert_instance_command(name, verify_cmd)
- @parameterized.expand(
- [
- ("SINGLE", "1.2", ["m"]),
- ("STANDARD", "1.2", ["m"]),
- ("SINGLE", "1.3", ["m"]),
- ("STANDARD", "1.3", ["m"]),
- ("SINGLE", "1.4", ["m"]),
- ("STANDARD", "1.4", ["m"]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_jupyter(self, configuration, dataproc_version, machine_suffixes):
+ @parameterized.parameters(
+ ("SINGLE", ["m"]),
+ ("STANDARD", ["m"]),
+ )
+ def test_jupyter(self, configuration, machine_suffixes):
metadata = 'INIT_ACTIONS_REPO={}'.format(self.INIT_ACTIONS_REPO)
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- metadata=metadata,
- timeout_in_minutes=15,
- machine_type="n1-standard-2")
+ self.createCluster(
+ configuration,
+ self.INIT_ACTIONS,
+ metadata=metadata,
+ timeout_in_minutes=15,
+ machine_type="n1-standard-2")
for machine_suffix in machine_suffixes:
self.verify_instance(
"{}-{}".format(self.getClusterName(), machine_suffix), "8123")
- @parameterized.expand(
- [
- ("SINGLE", "1.2", ["m"]),
- ("STANDARD", "1.2", ["m"]),
- ("SINGLE", "1.3", ["m"]),
- ("STANDARD", "1.3", ["m"]),
- ("SINGLE", "1.4", ["m"]),
- ("STANDARD", "1.4", ["m"]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_jupyter_with_metadata(self, configuration, dataproc_version,
- machine_suffixes):
+ @parameterized.parameters(
+ ("SINGLE", ["m"]),
+ ("STANDARD", ["m"]),
+ )
+ def test_jupyter_with_metadata(self, configuration, machine_suffixes):
jupyter_port = "8125"
metadata = 'INIT_ACTIONS_REPO={}'.format(self.INIT_ACTIONS_REPO)
metadata += ',JUPYTER_PORT={},JUPYTER_CONDA_PACKAGES={}'.format(
jupyter_port, "numpy:pandas:scikit-learn")
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- metadata=metadata,
- timeout_in_minutes=15,
- machine_type="n1-standard-2")
+ self.createCluster(
+ configuration,
+ self.INIT_ACTIONS,
+ metadata=metadata,
+ timeout_in_minutes=15,
+ machine_type="n1-standard-2")
for machine_suffix in machine_suffixes:
self.verify_instance(
@@ -70,4 +56,4 @@ def test_jupyter_with_metadata(self, configuration, dataproc_version,
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/kafka/BUILD b/kafka/BUILD
new file mode 100644
index 000000000..703adf67a
--- /dev/null
+++ b/kafka/BUILD
@@ -0,0 +1,17 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_kafka",
+ size = "enormous",
+ srcs = ["test_kafka.py"],
+ data = [
+ "kafka.sh",
+ "validate.sh",
+ ],
+ local = True,
+ shard_count = 1,
+ deps = [
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
diff --git a/kafka/test_kafka.py b/kafka/test_kafka.py
index 169b57884..133735254 100644
--- a/kafka/test_kafka.py
+++ b/kafka/test_kafka.py
@@ -1,7 +1,7 @@
import os
-import unittest
-from parameterized import parameterized
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -13,8 +13,9 @@ class KafkaTestCase(DataprocTestCase):
def verify_instance(self, name):
self.upload_test_file(
- os.path.join(os.path.dirname(os.path.abspath(__file__)),
- self.TEST_SCRIPT_FILE_NAME), name)
+ os.path.join(
+ os.path.dirname(os.path.abspath(__file__)),
+ self.TEST_SCRIPT_FILE_NAME), name)
self.__run_test_script(name)
self.remove_test_script(self.TEST_SCRIPT_FILE_NAME, name)
@@ -22,21 +23,15 @@ def __run_test_script(self, name):
self.assert_instance_command(
name, "bash {}".format(self.TEST_SCRIPT_FILE_NAME))
- @parameterized.expand(
- [
- ("HA", "1.2", ["m-0", "m-1", "m-2"]),
- ("HA", "1.3", ["m-0", "m-1", "m-2"]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_kafka(self, configuration, dataproc_version, machine_suffixes):
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- machine_type="n1-standard-2")
+ @parameterized.parameters(
+ ("HA", ["m-0", "m-1", "m-2"]), )
+ def test_kafka(self, configuration, machine_suffixes):
+ self.createCluster(
+ configuration, self.INIT_ACTIONS, machine_type="n1-standard-2")
for machine_suffix in machine_suffixes:
self.verify_instance("{}-{}".format(self.getClusterName(),
machine_suffix))
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/livy/BUILD b/livy/BUILD
new file mode 100644
index 000000000..8d0f73735
--- /dev/null
+++ b/livy/BUILD
@@ -0,0 +1,21 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_livy",
+ size = "enormous",
+ srcs = ["test_livy.py"],
+ data = ["livy.sh"],
+ local = True,
+ shard_count = 3,
+ deps = [
+ ":verify_livy_running",
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
+
+py_library(
+ name = "verify_livy_running",
+ testonly = True,
+ srcs = ["verify_livy_running.py"],
+)
diff --git a/livy/test_livy.py b/livy/test_livy.py
index 93f9cf6f8..0bfa875b1 100644
--- a/livy/test_livy.py
+++ b/livy/test_livy.py
@@ -1,7 +1,7 @@
import os
-import unittest
-from parameterized import parameterized
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -13,8 +13,9 @@ class LivyTestCase(DataprocTestCase):
def _verify_instance(self, name):
self.upload_test_file(
- os.path.join(os.path.dirname(os.path.abspath(__file__)),
- self.TEST_SCRIPT_FILE_NAME), name)
+ os.path.join(
+ os.path.dirname(os.path.abspath(__file__)),
+ self.TEST_SCRIPT_FILE_NAME), name)
self._run_python_test_file(name)
self.remove_test_script(self.TEST_SCRIPT_FILE_NAME, name)
@@ -26,25 +27,17 @@ def _run_python_test_file(self, name):
self.assert_instance_command(
name, "sudo python3 {}".format(self.TEST_SCRIPT_FILE_NAME))
- @parameterized.expand(
- [
- ("SINGLE", "1.2", ["m"]),
- ("STANDARD", "1.2", ["m"]),
- ("HA", "1.2", ["m-0", "m-1", "m-2"]),
- ("SINGLE", "1.3", ["m"]),
- ("STANDARD", "1.3", ["m"]),
- ("HA", "1.3", ["m-0", "m-1", "m-2"]),
- ("SINGLE", "1.4", ["m"]),
- ("STANDARD", "1.4", ["m"]),
- ("HA", "1.4", ["m-0", "m-1", "m-2"]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_livy(self, configuration, dataproc_version, machine_suffixes):
- self.createCluster(configuration, self.INIT_ACTIONS, dataproc_version)
+ @parameterized.parameters(
+ ("SINGLE", ["m"]),
+ ("STANDARD", ["m"]),
+ ("HA", ["m-0", "m-1", "m-2"]),
+ )
+ def test_livy(self, configuration, machine_suffixes):
+ self.createCluster(configuration, self.INIT_ACTIONS)
for machine_suffix in machine_suffixes:
self._verify_instance("{}-{}".format(self.getClusterName(),
machine_suffix))
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/livy/verify_livy_running.py b/livy/verify_livy_running.py
index fb9608d04..624528548 100644
--- a/livy/verify_livy_running.py
+++ b/livy/verify_livy_running.py
@@ -16,9 +16,10 @@ class Livy:
headers = {'Content-Type': 'application/json'}
def create_session(self):
- resp = requests.post(self.host + '/sessions',
- data=json.dumps(self.session_data),
- headers=self.headers)
+ resp = requests.post(
+ self.host + '/sessions',
+ data=json.dumps(self.session_data),
+ headers=self.headers)
self.session_url = self.host + resp.headers['Location']
def wait_for_session_idle(self):
@@ -36,9 +37,8 @@ def wait_for_session_idle(self):
exit(1)
def submit_job(self, data):
- requests.post(self.statements_url,
- data=json.dumps(data),
- headers=self.headers)
+ requests.post(
+ self.statements_url, data=json.dumps(data), headers=self.headers)
def validate_job_result(self, expected):
wait_seconds_remain = WAIT_SECONDS
diff --git a/oozie/BUILD b/oozie/BUILD
new file mode 100644
index 000000000..1c8a0bea4
--- /dev/null
+++ b/oozie/BUILD
@@ -0,0 +1,17 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_oozie",
+ size = "enormous",
+ srcs = ["test_oozie.py"],
+ data = [
+ "oozie.sh",
+ "validate.sh",
+ ],
+ local = True,
+ shard_count = 3,
+ deps = [
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
diff --git a/oozie/test_oozie.py b/oozie/test_oozie.py
index 9d2966167..98d902c0d 100644
--- a/oozie/test_oozie.py
+++ b/oozie/test_oozie.py
@@ -1,7 +1,7 @@
import os
-import unittest
-from parameterized import parameterized
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -23,29 +23,21 @@ def __run_test_file(self, name):
self.assert_instance_command(
name, "bash {}".format(self.TEST_SCRIPT_FILE_NAME))
- @parameterized.expand(
- [
- ("SINGLE", "1.2", ["m"]),
- ("STANDARD", "1.2", ["m"]),
- ("HA", "1.2", ["m-0", "m-1", "m-2"]),
- ("SINGLE", "1.3", ["m"]),
- ("STANDARD", "1.3", ["m"]),
- ("HA", "1.3", ["m-0", "m-1", "m-2"]),
- ("SINGLE", "1.4", ["m"]),
- ("STANDARD", "1.4", ["m"]),
- ("HA", "1.4", ["m-0", "m-1", "m-2"]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_oozie(self, configuration, dataproc_version, machine_suffixes):
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- machine_type="n1-standard-4",
- boot_disk_size="200GB")
+ @parameterized.parameters(
+ ("SINGLE", ["m"]),
+ ("STANDARD", ["m"]),
+ ("HA", ["m-0", "m-1", "m-2"]),
+ )
+ def test_oozie(self, configuration, machine_suffixes):
+ self.createCluster(
+ configuration,
+ self.INIT_ACTIONS,
+ machine_type="n1-standard-4",
+ boot_disk_size="200GB")
for machine_suffix in machine_suffixes:
self.verify_instance("{}-{}".format(self.getClusterName(),
machine_suffix))
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/presto/BUILD b/presto/BUILD
new file mode 100644
index 000000000..db66ad285
--- /dev/null
+++ b/presto/BUILD
@@ -0,0 +1,14 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_presto",
+ size = "enormous",
+ srcs = ["test_presto.py"],
+ data = ["presto.sh"],
+ local = True,
+ shard_count = 4,
+ deps = [
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
diff --git a/presto/test_presto.py b/presto/test_presto.py
index abd6b6371..9f8e81a03 100644
--- a/presto/test_presto.py
+++ b/presto/test_presto.py
@@ -1,7 +1,7 @@
import random
-import unittest
-from parameterized import parameterized
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -31,12 +31,11 @@ def __create_schema_via_hive(self, name, schema):
def __verify_schema_via_presto(self, name, schema):
query = "show schemas;"
_, stdout, _ = self.assert_instance_command(
- name,
- "presto --catalog=hive --execute='{}' --output-format TSV".format(
- query))
+ name, "presto --catalog=hive --execute='{}' --output-format TSV".
+ format(query))
schemas = str(stdout).split("\n")
- self.assertIn(schema, schemas,
- "Schema {} not found in {}".format(schema, schemas))
+ self.assertIn(schema, schemas, "Schema {} not found in {}".format(
+ schema, schemas))
def __create_table(self, name, table, schema):
query = "create table {}(number int) STORED AS SEQUENCEFILE;".format(
@@ -79,44 +78,28 @@ def __verify_workers_count(self, name, workers, server_param=""):
"Bad number of workers. Expected: {}\tFound: {}".format(
workers, stdout))
- @parameterized.expand(
- [
- ("SINGLE", "1.2", ["m"], 1, 0),
- ("STANDARD", "1.2", ["m"], 1, 2),
- ("HA", "1.2", ["m-0"], 1, 2),
- ("SINGLE", "1.3", ["m"], 1, 0),
- ("STANDARD", "1.3", ["m"], 1, 2),
- ("HA", "1.3", ["m-0"], 1, 2),
- ("SINGLE", "1.4", ["m"], 1, 0),
- ("STANDARD", "1.4", ["m"], 1, 2),
- ("HA", "1.4", ["m-0"], 1, 2),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_presto(self, configuration, dataproc_version, machine_suffixes,
- coordinators, workers):
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- machine_type="n1-standard-2")
+ @parameterized.parameters(
+ ("SINGLE", ["m"], 1, 0),
+ ("STANDARD", ["m"], 1, 2),
+ ("HA", ["m-0"], 1, 2),
+ )
+ def test_presto(self, configuration, machine_suffixes, coordinators,
+ workers):
+ self.createCluster(
+ configuration, self.INIT_ACTIONS, machine_type="n1-standard-2")
for machine_suffix in machine_suffixes:
self.verify_instance(
"{}-{}".format(self.getClusterName(), machine_suffix),
coordinators, workers)
- @parameterized.expand(
- [
- ("SINGLE", "1.2", ["m"], 1, 0),
- ("SINGLE", "1.3", ["m"], 1, 0),
- ("SINGLE", "1.4", ["m"], 1, 0),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_presto_custom_port(self, configuration, dataproc_version,
- machine_suffixes, coordinators, workers):
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- machine_type="n1-standard-2",
- metadata="presto-port=8060")
+ @parameterized.parameters(("SINGLE", ["m"], 1, 0))
+ def test_presto_custom_port(self, configuration, machine_suffixes,
+ coordinators, workers):
+ self.createCluster(
+ configuration,
+ self.INIT_ACTIONS,
+ machine_type="n1-standard-2",
+ metadata="presto-port=8060")
for machine_suffix in machine_suffixes:
machine_name = "{}-{}".format(self.getClusterName(),
machine_suffix)
@@ -127,4 +110,4 @@ def test_presto_custom_port(self, configuration, dataproc_version,
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/ranger/BUILD b/ranger/BUILD
new file mode 100644
index 000000000..8d311c444
--- /dev/null
+++ b/ranger/BUILD
@@ -0,0 +1,24 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_ranger",
+ size = "enormous",
+ srcs = ["test_ranger.py"],
+ data = [
+ "ranger.sh",
+ "//solr:solr.sh",
+ ],
+ local = True,
+ shard_count = 3,
+ deps = [
+ ":verify_ranger",
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
+
+py_library(
+ name = "verify_ranger",
+ testonly = True,
+ srcs = ["verify_ranger.py"],
+)
diff --git a/ranger/test_ranger.py b/ranger/test_ranger.py
index 4337435db..045d4cb0e 100644
--- a/ranger/test_ranger.py
+++ b/ranger/test_ranger.py
@@ -1,9 +1,10 @@
"""This module provides testing functionality of the Apache Ranger Init Action.
"""
-import unittest
import os
-from parameterized import parameterized
+import pkg_resources
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -15,8 +16,9 @@ class RangerTestCase(DataprocTestCase):
def verify_instance(self, name):
self.upload_test_file(
- os.path.join(os.path.dirname(os.path.abspath(__file__)),
- self.TEST_SCRIPT_FILE_NAME), name)
+ os.path.join(
+ os.path.dirname(os.path.abspath(__file__)),
+ self.TEST_SCRIPT_FILE_NAME), name)
self.__run_test_script(name)
self.remove_test_script(self.TEST_SCRIPT_FILE_NAME, name)
@@ -24,18 +26,19 @@ def __run_test_script(self, name):
self.assert_instance_command(
name, "python {}".format(self.TEST_SCRIPT_FILE_NAME))
- @parameterized.expand(
- [
- ("SINGLE", "1.3", ["m"]),
- ("STANDARD", "1.3", ["m"]),
- ("HA", "1.3", ["m-0"]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_ranger(self, configuration, dataproc_version, machine_suffixes):
+ @parameterized.parameters(
+ ("SINGLE", ["m"]),
+ ("STANDARD", ["m"]),
+ ("HA", ["m-0"]),
+ )
+ def test_ranger(self, configuration, machine_suffixes):
+ # Init action supported on Dataproc 1.3+
+ if self.getImageVersion() < pkg_resources.parse_version("1.3"):
+ return
+
self.createCluster(
configuration,
self.INIT_ACTIONS,
- dataproc_version,
machine_type="n1-standard-2",
metadata="ranger-port={},default-admin-password={}".format(
6080, "dataproc2019"))
@@ -45,4 +48,4 @@ def test_ranger(self, configuration, dataproc_version, machine_suffixes):
if __name__ == "__main__":
- unittest.main()
+ absltest.main()
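Note: the Ranger test above, and the RAPIDS, Tez, and TonY tests further down, gate on self.getImageVersion() rather than enumerating Dataproc versions. A short illustration of why pkg_resources.parse_version is used for that comparison instead of raw strings (the version values here are illustrative):

import pkg_resources

# parse_version() returns objects that compare numerically component by
# component, so "1.10" is correctly treated as newer than "1.3".
assert pkg_resources.parse_version("1.10") > pkg_resources.parse_version("1.3")

# Plain string comparison gets this wrong: "1.10" sorts before "1.3".
assert "1.10" < "1.3"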
diff --git a/rapids/BUILD b/rapids/BUILD
new file mode 100644
index 000000000..ccf6cbc3d
--- /dev/null
+++ b/rapids/BUILD
@@ -0,0 +1,21 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_rapids",
+ size = "enormous",
+ srcs = ["test_rapids.py"],
+ data = ["rapids.sh"] + glob(["internal/*"]),
+ local = True,
+ shard_count = 1,
+ deps = [
+ ":verify_rapids",
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
+
+py_library(
+ name = "verify_rapids",
+ testonly = True,
+ srcs = ["verify_rapids.py"],
+)
diff --git a/rapids/test_rapids.py b/rapids/test_rapids.py
index 33fff3dce..41d32cf44 100644
--- a/rapids/test_rapids.py
+++ b/rapids/test_rapids.py
@@ -1,7 +1,8 @@
import os
-import unittest
-from parameterized import parameterized
+import pkg_resources
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -13,8 +14,9 @@ class RapidsTestCase(DataprocTestCase):
def verify_instance(self, name):
self.upload_test_file(
- os.path.join(os.path.dirname(os.path.abspath(__file__)),
- self.TEST_SCRIPT_FILE_NAME), name)
+ os.path.join(
+ os.path.dirname(os.path.abspath(__file__)),
+ self.TEST_SCRIPT_FILE_NAME), name)
self.__run_test_script(name)
self.remove_test_script(self.TEST_SCRIPT_FILE_NAME, name)
@@ -23,25 +25,23 @@ def __run_test_script(self, name):
self.TEST_SCRIPT_FILE_NAME)
self.assert_instance_command(name, verify_cmd)
- @parameterized.expand(
- [
- ("STANDARD", "1.3", ["m"]),
- ("STANDARD", "1.4", ["m"]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_rapids(self, configuration, dataproc_version, machine_suffixes):
- metadata = 'INIT_ACTIONS_REPO={}'.format(self.INIT_ACTIONS_REPO)
+ @parameterized.parameters(("STANDARD", ["m"]))
+ def test_rapids(self, configuration, machine_suffixes):
+ # Init action supported on Dataproc 1.3+
+ if self.getImageVersion() < pkg_resources.parse_version("1.3"):
+ return
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- metadata=metadata,
- beta=True,
- master_accelerator='type=nvidia-tesla-p100',
- worker_accelerator='type=nvidia-tesla-p100',
- optional_components='ANACONDA',
- machine_type='n1-standard-2',
- timeout_in_minutes=20)
+ metadata = 'INIT_ACTIONS_REPO={}'.format(self.INIT_ACTIONS_REPO)
+ self.createCluster(
+ configuration,
+ self.INIT_ACTIONS,
+ metadata=metadata,
+ beta=True,
+ master_accelerator='type=nvidia-tesla-p100',
+ worker_accelerator='type=nvidia-tesla-p100',
+ optional_components='ANACONDA',
+ machine_type='n1-standard-2',
+ timeout_in_minutes=20)
for machine_suffix in machine_suffixes:
self.verify_instance("{}-{}".format(self.getClusterName(),
@@ -49,4 +49,4 @@ def test_rapids(self, configuration, dataproc_version, machine_suffixes):
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/rstudio/BUILD b/rstudio/BUILD
new file mode 100644
index 000000000..85b4e43b2
--- /dev/null
+++ b/rstudio/BUILD
@@ -0,0 +1,14 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_rstudio",
+ size = "enormous",
+ srcs = ["test_rstudio.py"],
+ data = ["rstudio.sh"],
+ local = True,
+ shard_count = 4,
+ deps = [
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
diff --git a/rstudio/test_rstudio.py b/rstudio/test_rstudio.py
index fa1270470..b43597b7d 100644
--- a/rstudio/test_rstudio.py
+++ b/rstudio/test_rstudio.py
@@ -1,7 +1,5 @@
-import random
-import unittest
-
-from parameterized import parameterized
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -10,24 +8,21 @@ class RStudioTestCase(DataprocTestCase):
COMPONENT = 'rstudio'
INIT_ACTIONS = ['rstudio/rstudio.sh']
- @parameterized.expand(
- [
- ("SINGLE", "1.2", "rstudio", "password"),
- ("SINGLE", "1.3", "rstudio", "password"),
- ("SINGLE", "1.3", "", "password"), # default username
- ("SINGLE", "1.3", "rstudio", ""), # no auth
- ("SINGLE", "1.3", "", ""), # default username and no auth
- ("SINGLE", "1.4", "rstudio", "password"),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_rstudio(self, configuration, dataproc_version, user, password):
+ @parameterized.parameters(
+ ("SINGLE", "rstudio", "password"),
+ ("SINGLE", "", "password"), # default username
+ ("SINGLE", "rstudio", ""), # no auth
+ ("SINGLE", "", ""), # default username and no auth
+ )
+ def test_rstudio(self, configuration, user, password):
metadata = "rstudio-password={}".format(password)
if user:
metadata += ",rstudio-user={}".format(user)
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- metadata=metadata)
+ self.createCluster(configuration, self.INIT_ACTIONS, metadata=metadata)
instance_name = self.getClusterName() + "-m"
self.assert_instance_command(
instance_name, "curl http://{}:8787".format(instance_name))
+
+
+if __name__ == '__main__':
+ absltest.main()
diff --git a/solr/BUILD b/solr/BUILD
new file mode 100644
index 000000000..bddcfa963
--- /dev/null
+++ b/solr/BUILD
@@ -0,0 +1,23 @@
+package(default_visibility = ["//visibility:public"])
+
+exports_files(["solr.sh"])
+
+py_test(
+ name = "test_solr",
+ size = "enormous",
+ srcs = ["test_solr.py"],
+ data = ["solr.sh"],
+ local = True,
+ shard_count = 3,
+ deps = [
+ ":verify_solr",
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
+
+py_library(
+ name = "verify_solr",
+ testonly = True,
+ srcs = ["verify_solr.py"],
+)
diff --git a/solr/test_solr.py b/solr/test_solr.py
index 1c0b8ce92..2e709d48a 100644
--- a/solr/test_solr.py
+++ b/solr/test_solr.py
@@ -3,9 +3,9 @@
"""
import os
-import unittest
-from parameterized import parameterized
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -17,8 +17,9 @@ class SolrTestCase(DataprocTestCase):
def verify_instance(self, name):
self.upload_test_file(
- os.path.join(os.path.dirname(os.path.abspath(__file__)),
- self.TEST_SCRIPT_FILE_NAME), name)
+ os.path.join(
+ os.path.dirname(os.path.abspath(__file__)),
+ self.TEST_SCRIPT_FILE_NAME), name)
self.__run_test_script(name)
self.remove_test_script(self.TEST_SCRIPT_FILE_NAME, name)
@@ -26,22 +27,17 @@ def __run_test_script(self, name):
self.assert_instance_command(
name, "python3 {}".format(self.TEST_SCRIPT_FILE_NAME))
- @parameterized.expand(
- [
- ("SINGLE", "1.2", ["m"]),
- ("STANDARD", "1.2", ["m"]),
- ("HA", "1.2", ["m-0"]),
- ("SINGLE", "1.3", ["m"]),
- ("STANDARD", "1.3", ["m"]),
- ("HA", "1.3", ["m-0"]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_solr(self, configuration, dataproc_version, machine_suffixes):
- self.createCluster(configuration, self.INIT_ACTIONS, dataproc_version)
+ @parameterized.parameters(
+ ("SINGLE", ["m"]),
+ ("STANDARD", ["m"]),
+ ("HA", ["m-0"]),
+ )
+ def test_solr(self, configuration, machine_suffixes):
+ self.createCluster(configuration, self.INIT_ACTIONS)
for machine_suffix in machine_suffixes:
self.verify_instance("{}-{}".format(self.getClusterName(),
machine_suffix))
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/starburst-presto/test_presto.py b/starburst-presto/test_starburst_presto.py
similarity index 67%
rename from starburst-presto/test_presto.py
rename to starburst-presto/test_starburst_presto.py
index 60d928893..23dbc86ae 100644
--- a/starburst-presto/test_presto.py
+++ b/starburst-presto/test_starburst_presto.py
@@ -1,7 +1,7 @@
import random
-import unittest
-from parameterized import parameterized
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -31,12 +31,11 @@ def __create_schema_via_hive(self, name, schema):
def __verify_schema_via_presto(self, name, schema):
query = "show schemas;"
_, stdout, _ = self.assert_instance_command(
- name,
- "presto --catalog=hive --execute='{}' --output-format TSV".format(
- query))
+ name, "presto --catalog=hive --execute='{}' --output-format TSV".
+ format(query))
schemas = str(stdout).split("\n")
- self.assertIn(schema, schemas,
- "Schema {} not found in {}".format(schema, schemas))
+ self.assertIn(schema, schemas, "Schema {} not found in {}".format(
+ schema, schemas))
def __create_table(self, name, table, schema):
query = "create table {}(number int) STORED AS SEQUENCEFILE;".format(
@@ -79,45 +78,29 @@ def __verify_workers_count(self, name, workers, server_param=""):
"Bad number of workers. Expected: {}\tFound: {}".format(
workers, stdout))
- @parameterized.expand(
- [
- ("SINGLE", "1.2", ["m"], 1, 0),
- ("STANDARD", "1.2", ["m"], 1, 2),
- ("HA", "1.2", ["m-0"], 1, 2),
- ("SINGLE", "1.3", ["m"], 1, 0),
- ("STANDARD", "1.3", ["m"], 1, 2),
- ("HA", "1.3", ["m-0"], 1, 2),
- ("SINGLE", "1.4", ["m"], 1, 0),
- ("STANDARD", "1.4", ["m"], 1, 2),
- ("HA", "1.4", ["m-0"], 1, 2),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_starburst_presto(self, configuration, dataproc_version,
- machine_suffixes, coordinators, workers):
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- machine_type="n1-standard-2")
+ @parameterized.parameters(
+ ("SINGLE", ["m"], 1, 0),
+ ("STANDARD", ["m"], 1, 2),
+ ("HA", ["m-0"], 1, 2),
+ )
+ def test_starburst_presto(self, configuration, machine_suffixes,
+ coordinators, workers):
+ self.createCluster(
+ configuration, self.INIT_ACTIONS, machine_type="n1-standard-2")
for machine_suffix in machine_suffixes:
self.verify_instance(
"{}-{}".format(self.getClusterName(), machine_suffix),
coordinators, workers)
- @parameterized.expand(
- [
- ("SINGLE", "1.2", ["m"], 1, 0),
- ("SINGLE", "1.3", ["m"], 1, 0),
- ("SINGLE", "1.4", ["m"], 1, 0),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_starburst_presto_custom_port(self, configuration,
- dataproc_version, machine_suffixes,
- coordinators, workers):
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- machine_type="n1-standard-2",
- metadata="presto-port=8060")
+ @parameterized.parameters(
+ ("SINGLE", ["m"], 1, 0), )
+ def test_starburst_presto_custom_port(
+ self, configuration, machine_suffixes, coordinators, workers):
+ self.createCluster(
+ configuration,
+ self.INIT_ACTIONS,
+ machine_type="n1-standard-2",
+ metadata="presto-port=8060")
for machine_suffix in machine_suffixes:
machine_name = "{}-{}".format(self.getClusterName(),
machine_suffix)
@@ -128,4 +111,4 @@ def test_starburst_presto_custom_port(self, configuration,
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/tez/BUILD b/tez/BUILD
new file mode 100644
index 000000000..a0c27c84f
--- /dev/null
+++ b/tez/BUILD
@@ -0,0 +1,21 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_tez",
+ size = "enormous",
+ srcs = ["test_tez.py"],
+ data = ["tez.sh"],
+ local = True,
+ shard_count = 3,
+ deps = [
+ ":verify_tez",
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
+
+py_library(
+ name = "verify_tez",
+ testonly = True,
+ srcs = ["verify_tez.py"],
+)
diff --git a/tez/test_tez.py b/tez/test_tez.py
index 2bfd0ad20..480926c7b 100644
--- a/tez/test_tez.py
+++ b/tez/test_tez.py
@@ -8,9 +8,10 @@
flag. Test script is executed on every master node.
"""
import os
-import unittest
-from parameterized import parameterized
+import pkg_resources
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -32,42 +33,27 @@ def __run_test_script(self, name):
self.assert_instance_command(
name, "python {}".format(self.TEST_SCRIPT_FILE_NAME))
- @parameterized.expand(
- [
- ("SINGLE", "1.2", ["m"]),
- ("STANDARD", "1.2", ["m"]),
- ("HA", "1.2", ["m-0", "m-1", "m-2"]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_tez(self, configuration, dataproc_version, machine_suffixes):
- self.createCluster(configuration, self.INIT_ACTIONS, dataproc_version)
- for machine_suffix in machine_suffixes:
- self.verify_instance("{}-{}".format(self.getClusterName(),
- machine_suffix))
+ @parameterized.parameters(
+ ("SINGLE", ["m"]),
+ ("STANDARD", ["m"]),
+ ("HA", ["m-0", "m-1", "m-2"]),
+ )
+ def test_tez(self, configuration, machine_suffixes):
+ init_actions = self.INIT_ACTIONS
+ properties = None
+ if self.getImageVersion() >= pkg_resources.parse_version("1.3"):
+            # Remove init action - Tez is installed by default on Dataproc 1.3+
+ init_actions = []
+ tez_classpath = "/etc/tez/conf:/usr/lib/tez/*:/usr/lib/tez/lib/*"
+ properties = "'hadoop-env:HADOOP_CLASSPATH={}:{}'".format(
+ "${HADOOP_CLASSPATH}", tez_classpath)
+
+ self.createCluster(configuration, init_actions, properties=properties)
- @parameterized.expand(
- [
- ("SINGLE", "1.3", ["m"]),
- ("STANDARD", "1.3", ["m"]),
- ("HA", "1.3", ["m-0", "m-1", "m-2"]),
- ("SINGLE", "1.4", ["m"]),
- ("STANDARD", "1.4", ["m"]),
- ("HA", "1.4", ["m-0", "m-1", "m-2"]),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_tez_on_image_version_1_3(self, configuration, dataproc_version,
- machine_suffixes):
- tez_classpath = "/etc/tez/conf:/usr/lib/tez/*:/usr/lib/tez/lib/*"
- self.createCluster(
- configuration,
- init_actions=[],
- dataproc_version=dataproc_version,
- properties="'hadoop-env:HADOOP_CLASSPATH={}:{}'".format(
- "${HADOOP_CLASSPATH}", tez_classpath))
for machine_suffix in machine_suffixes:
self.verify_instance("{}-{}".format(self.getClusterName(),
machine_suffix))
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/tez/verify_tez.py b/tez/verify_tez.py
index e87857b2e..204186221 100644
--- a/tez/verify_tez.py
+++ b/tez/verify_tez.py
@@ -7,15 +7,13 @@
def main():
copy_test_file_cmd = shlex.split(
'hadoop fs -copyFromLocal /usr/lib/tez/LICENSE-MIT /tmp/LICENSE-MIT')
- copy_test_file_out = subprocess.Popen(copy_test_file_cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
+ copy_test_file_out = subprocess.Popen(
+ copy_test_file_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
health_check_cmd = shlex.split(
'hadoop jar /usr/lib/tez/tez-examples.jar orderedwordcount /tmp/LICENSE-MIT /tmp/tez-out'
)
- health_check_out = subprocess.Popen(health_check_cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
+ health_check_out = subprocess.Popen(
+ health_check_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
info = health_check_out.communicate()[0]
if 'DAG completed. FinalState=SUCCEEDED' not in info:
raise Exception('Running tez job failed.')
diff --git a/tony/BUILD b/tony/BUILD
new file mode 100644
index 000000000..fb56d8818
--- /dev/null
+++ b/tony/BUILD
@@ -0,0 +1,14 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "test_tony",
+ size = "enormous",
+ srcs = ["test_tony.py"],
+ data = ["tony.sh"],
+ local = True,
+ shard_count = 2,
+ deps = [
+ "//integration_tests:dataproc_test_case",
+ "@io_abseil_py//absl/testing:parameterized",
+ ],
+)
diff --git a/tony/test_tony.py b/tony/test_tony.py
index 0d41c6bb1..84ea38e9c 100644
--- a/tony/test_tony.py
+++ b/tony/test_tony.py
@@ -1,6 +1,6 @@
-import unittest
-
-from parameterized import parameterized
+import pkg_resources
+from absl.testing import absltest
+from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
@@ -9,19 +9,20 @@ class TonYTestCase(DataprocTestCase):
COMPONENT = 'tony'
INIT_ACTIONS = ['tony/tony.sh']
- @parameterized.expand(
- [
- ("STANDARD", "1.2"),
- ("STANDARD", "1.3"),
- ("STANDARD", "1.4"),
- ],
- testcase_func_name=DataprocTestCase.generate_verbose_test_name)
- def test_tony(self, configuration, dataproc_version):
- self.createCluster(configuration,
- self.INIT_ACTIONS,
- dataproc_version,
- timeout_in_minutes=30,
- machine_type="n1-standard-4")
+ @parameterized.parameters(
+ "SINGLE",
+ "STANDARD",
+ )
+ def test_tony(self, configuration):
+ # Init action supported on Dataproc 1.3+
+ if self.getImageVersion() < pkg_resources.parse_version("1.3"):
+ return
+
+ self.createCluster(
+ configuration,
+ self.INIT_ACTIONS,
+ timeout_in_minutes=30,
+ machine_type="n1-standard-4")
# Verify cluster using TensorFlow job
self.assert_dataproc_job(
@@ -53,4 +54,4 @@ def test_tony(self, configuration, dataproc_version):
if __name__ == '__main__':
- unittest.main()
+ absltest.main()
diff --git a/util/BUILD b/util/BUILD
new file mode 100644
index 000000000..c72804eea
--- /dev/null
+++ b/util/BUILD
@@ -0,0 +1,3 @@
+package(default_visibility = ["//visibility:public"])
+
+exports_files(["utils.sh"])
diff --git a/zookeeper/BUILD b/zookeeper/BUILD
new file mode 100644
index 000000000..51cbece99
--- /dev/null
+++ b/zookeeper/BUILD
@@ -0,0 +1,3 @@
+package(default_visibility = ["//visibility:public"])
+
+exports_files(["zookeeper.sh"])