
Merge branch 'dev' into dev1-singlepackage-p3.12
touma-I committed Oct 11, 2024
2 parents 33bae0e + 34ade66 commit f5e1ac7
Showing 21 changed files with 573 additions and 17 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -141,7 +141,7 @@ The matrix below shows the combination of modules and supported runtimes. Al
| [Filter on annotations](transforms/universal/filter/python/README.md) | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: |
| [Profiler](transforms/universal/profiler/ray/README.md) | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: |
| [Resize](transforms/universal/resize/python/README.md) | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: |
| [HAP](transforms/universal/hap/python/README.md) | :white_check_mark: | | | |
| [HAP](transforms/universal/hap/python/README.md) | :white_check_mark: | :white_check_mark: | | |
| [Tokenizer](transforms/universal/tokenization/python/README.md) | :white_check_mark: | :white_check_mark: | | :white_check_mark: |
| **Language-only** | | | | |
| [Language identification](transforms/language/lang_id/python/README.md) | :white_check_mark: | :white_check_mark: | | :white_check_mark: |
8 changes: 5 additions & 3 deletions data-processing-lib/doc/spark-runtime.md
@@ -41,9 +41,11 @@ of this parameter:

## Transforms

* [SparkTransformRuntimeConfiguration](../spark/src/data_processing_spark/transform/runtime_configuration.py) allows
to configure transform to use PySpark

* [SparkTransformRuntimeConfiguration](../spark/src/data_processing_spark/runtime/spark/runtime_configuration.py)
allows configuring the transform to use PySpark. In addition to the features of its base class
[TransformRuntimeConfiguration](../python/src/data_processing/runtime/runtime_configuration.py),
this class provides the `get_bcast_params()` method for retrieving very large configuration settings. Before starting
the transform execution, the Spark runtime broadcasts these settings to all the workers.
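
For illustration, here is a minimal sketch of a transform-specific runtime configuration that uses this hook. The class name, the block-list path, and the use of `DataAccess.get_file()` to fetch the raw bytes are assumptions for the example, not part of this commit:

```python
from typing import Any

from data_processing.data_access import DataAccessFactoryBase
from data_processing_spark.runtime.spark import SparkTransformRuntimeConfiguration


class BlockListSparkRuntimeConfiguration(SparkTransformRuntimeConfiguration):
    """Hypothetical runtime configuration that ships a large domain block list to all workers."""

    def get_bcast_params(self, data_access_factory: DataAccessFactoryBase) -> dict[str, Any]:
        # Runs once on the driver, after Spark initialization and before parallelize().
        # The returned dictionary is broadcast to every worker and merged into the
        # transform parameters for each partition.
        data_access = data_access_factory.create_data_access()
        content, _ = data_access.get_file("blocklists/blocked_domains.txt")  # assumed path
        return {"blocked_domains": set(content.decode("utf-8").splitlines())}
```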

## Runtime

3 changes: 2 additions & 1 deletion data-processing-lib/spark/pyproject.toml
@@ -13,7 +13,8 @@ authors = [
dependencies = [
"data-prep-toolkit==0.2.2.dev1",
"pyspark>=3.5.2",
"psutil>=6.0.0"
"psutil>=6.0.0",
"PyYAML>=6.0.2"
]

[project_urls]
@@ -10,6 +10,9 @@
# limitations under the License.
################################################################################

from typing import Any

from data_processing.data_access import DataAccessFactoryBase
from data_processing.runtime import TransformRuntimeConfiguration
from data_processing.transform import TransformConfiguration
from data_processing_spark.runtime.spark import DefaultSparkTransformRuntime
@@ -29,6 +32,16 @@ def __init__(
super().__init__(transform_config=transform_config)
self.runtime_class = runtime_class

def get_bcast_params(self, data_access_factory: DataAccessFactoryBase) -> dict[str, Any]:
"""Allows retrieving and broadcasting to all the workers very large
configuration parameters, like the list of document IDs to remove for
fuzzy dedup, or the list of blocked web domains for block listing. This
function is called by the spark runtime after spark initialization, and
before spark_context.parallelize()
:param data_access_factory - creates data_access object to download the large config parameter
"""
return {}

def create_transform_runtime(self) -> DefaultSparkTransformRuntime:
"""
Create transform runtime with the parameters captured during apply_input_params()
@@ -10,24 +10,69 @@
# limitations under the License.
################################################################################

import os
import socket
import time
import traceback
from datetime import datetime

import yaml
from data_processing.data_access import DataAccessFactoryBase
from data_processing.transform import TransformStatistics
from data_processing.utils import GB, get_logger
from data_processing_spark.runtime.spark import (
SparkTransformExecutionConfiguration,
SparkTransformFileProcessor,
SparkTransformRuntimeConfiguration,
SparkTransformExecutionConfiguration,
)
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession


logger = get_logger(__name__)


def _init_spark(runtime_config: SparkTransformRuntimeConfiguration) -> SparkSession:
server_port_https = int(os.getenv("KUBERNETES_SERVICE_PORT_HTTPS", "-1"))
if server_port_https == -1:
# running locally
spark_config = {"spark.driver.host": "127.0.0.1"}
return SparkSession.builder.appName(runtime_config.get_name()).config(map=spark_config).getOrCreate()
else:
# running in Kubernetes, use spark_profile.yml and
# environment variables for configuration
server_port = os.environ["KUBERNETES_SERVICE_PORT"]
master_url = f"k8s://https://kubernetes.default:{server_port}"

# Read Spark configuration profile
config_filepath = os.path.abspath(
os.path.join(os.getenv("SPARK_HOME"), "work-dir", "config", "spark_profile.yml")
)
with open(config_filepath, "r") as config_fp:
spark_config = yaml.safe_load(os.path.expandvars(config_fp.read()))
spark_config["spark.submit.deployMode"] = "client"

# configure the executor pods from template
executor_pod_template_file = os.path.join(
os.getenv("SPARK_HOME"),
"work-dir",
"src",
"templates",
"spark-executor-pod-template.yml",
)
spark_config["spark.kubernetes.executor.podTemplateFile"] = executor_pod_template_file
spark_config["spark.kubernetes.container.image.pullPolicy"] = "Always"

# Pass the driver IP address to the workers for callback
myservice_url = socket.gethostbyname(socket.gethostname())
spark_config["spark.driver.host"] = myservice_url
spark_config["spark.driver.bindAddress"] = "0.0.0.0"
spark_config["spark.decommission.enabled"] = True
logger.info(f"Launching Spark Session with configuration\n" f"{yaml.dump(spark_config, indent=2)}")
app_name = spark_config.get("spark.app.name", "my-spark-app")
return SparkSession.builder.master(master_url).appName(app_name).config(map=spark_config).getOrCreate()


def orchestrate(
runtime_config: SparkTransformRuntimeConfiguration,
execution_configuration: SparkTransformExecutionConfiguration,
@@ -45,14 +90,17 @@
logger.info(f"orchestrator started at {start_ts}")
# create data access
data_access = data_access_factory.create_data_access()
bcast_params = runtime_config.get_bcast_params(data_access_factory)
if data_access is None:
logger.error("No DataAccess instance provided - exiting")
return 1
# initialize Spark
conf = SparkConf().setAppName(runtime_config.get_name()).set("spark.driver.host", "127.0.0.1")
sc = SparkContext(conf=conf)
spark_session = _init_spark(runtime_config)
sc = spark_session.sparkContext
# broadcast
spark_runtime_config = sc.broadcast(runtime_config)
daf = sc.broadcast(data_access_factory)
spark_bcast_params = sc.broadcast(bcast_params)

def process_partition(iterator):
"""
@@ -63,6 +111,7 @@ def process_partition(iterator):
# local statistics dictionary
statistics = TransformStatistics()
# create transformer runtime
bcast_params = spark_bcast_params.value
d_access_factory = daf.value
runtime_conf = spark_runtime_config.value
runtime = runtime_conf.create_transform_runtime()
@@ -77,8 +126,11 @@ def process_partition(iterator):
logger.debug(f"partition {f}")
# add additional parameters
transform_params = (
runtime.get_transform_config(partition=int(f[1]), data_access_factory=d_access_factory,
statistics=statistics))
runtime.get_transform_config(
partition=int(f[1]), data_access_factory=d_access_factory, statistics=statistics
)
| bcast_params
)
# create transform with partition number
file_processor.create_transform(transform_params)
first = False
@@ -128,7 +180,7 @@ def process_partition(iterator):
memory = 0.0
for i in range(executors.size()):
memory += executors.toList().apply(i)._2()._1()
resources = {"cpus": cpus, "gpus": 0, "memory": round(memory/GB, 2), "object_store": 0}
resources = {"cpus": cpus, "gpus": 0, "memory": round(memory / GB, 2), "object_store": 0}
input_params = runtime_config.get_transform_metadata() | execution_configuration.get_input_params()
metadata = {
"pipeline": execution_configuration.pipeline_id,
@@ -143,7 +195,8 @@
"execution_stats": {
"num partitions": num_partitions,
"execution time, min": round((time.time() - start_time) / 60, 3),
} | resources,
}
| resources,
"job_output_stats": stats,
}
logger.debug(f"Saving job metadata: {metadata}.")
@@ -34,19 +34,29 @@ def get_transform_config(
"""
Get the dictionary of configuration that will be provided to the transform's initializer.
This is the opportunity for this runtime to create a new set of configuration based on the
config/params provided to this instance's initializer. This may include the addition
of new configuration data such as ray shared memory, new actors, etc, that might be needed and
expected by the transform in its initializer and/or transform() methods.
config/params provided to this instance's initializer.
:param partition - the partition assigned to this worker, needed by transforms like doc_id
:param data_access_factory - data access factory class being used by the RayOrchestrator.
:param statistics - reference to statistics actor
:return: dictionary of transform init params
"""
return self.params

def get_bcast_params(self, data_access_factory: DataAccessFactoryBase) -> dict[str, Any]:
"""Allows retrieving and broadcasting to all the workers very large
configuration parameters, like the list of document IDs to remove for
fuzzy dedup, or the list of blocked web domains for block listing. This
function is called by the spark runtime after spark initialization, and
before spark_context.parallelize()
:param data_access_factory - creates data_access object to download the large config parameter
"""
return {}

def compute_execution_stats(self, stats: TransformStatistics) -> None:
"""
Update/augment the given statistics object with runtime-specific additions/modifications.
This method does not return a value; the job execution statistics are generally reported
as metadata by the Spark Orchestrator.
:param stats: output of statistics as aggregated across all calls to all transforms.
:return: job execution statistics. These are generally reported as metadata by the Ray Orchestrator.
"""
pass
pass
42 changes: 42 additions & 0 deletions transforms/universal/hap/ray/Dockerfile
@@ -0,0 +1,42 @@
ARG BASE_IMAGE=docker.io/rayproject/ray:2.24.0-py310
FROM ${BASE_IMAGE}

RUN pip install --upgrade --no-cache-dir pip

# install pytest
RUN pip install --no-cache-dir pytest

# Copy and install data processing libraries
# These are expected to be placed in the docker context before this is run (see the make image).
COPY --chown=ray:users data-processing-lib-python/ data-processing-lib-python/
RUN cd data-processing-lib-python && pip install --no-cache-dir -e .
COPY --chown=ray:users data-processing-lib-ray/ data-processing-lib-ray/
RUN cd data-processing-lib-ray && pip install --no-cache-dir -e .
COPY --chown=ray:users python-transform/ python-transform/
RUN cd python-transform && pip install --no-cache-dir -e .

#COPY requirements.txt requirements.txt
#RUN pip install --no-cache-dir -r requirements.txt

COPY --chown=ray:users src/ src/
COPY --chown=ray:users pyproject.toml pyproject.toml
RUN pip install --no-cache-dir -e .

# copy the main() entry point to the image
COPY ./src/hap_transform_ray.py .

# copy some of the samples in
COPY ./src/hap_local_ray.py local/

# copy test
COPY test/ test/
COPY test-data/ test-data/

# Set environment
ENV PYTHONPATH /home/ray

# Put these at the end since they seem to upset the docker cache.
ARG BUILD_DATE
ARG GIT_COMMIT
LABEL build-date=$BUILD_DATE
LABEL git-commit=$GIT_COMMIT
58 changes: 58 additions & 0 deletions transforms/universal/hap/ray/Makefile
@@ -0,0 +1,58 @@
# Define the root of the local git clone for the common rules to be able
# to know where they are running from.
REPOROOT=../../../..
# Include a library of common .transform.* targets which most
# transforms should be able to reuse. However, feel free
# to override/redefine the rules below.
include $(REPOROOT)/transforms/.make.transforms

TRANSFORM_NAME=hap

BASE_IMAGE=${RAY_BASE_IMAGE}
HAP_PYTHON_VERSION= $(DPK_VERSION)

venv:: .transforms.ray-venv

install:: pip install -r requirements.txt

test:: .transforms.ray-test

clean:: .transforms.clean

image:: .transforms.ray-image

test-src:: .transforms.test-src

setup:: .transforms.setup

test-image:: .transforms.ray-test-image

build:: build-dist image

publish: publish-image

publish-image:: .transforms.publish-image-ray

setup:: .transforms.setup

# The distribution version is the same as the image version.
set-versions:
$(MAKE) TRANSFORM_PYTHON_VERSION=$(HAP_PYTHON_VERSION) TOML_VERSION=$(HAP_PYTHON_VERSION) .transforms.set-versions

build-dist:: set-versions .defaults.build-dist

publish-dist:: .defaults.publish-dist

run-cli-sample: .transforms.run-cli-ray-sample

run-local-sample: .transforms.run-local-ray-sample

run-s3-sample: .transforms.run-s3-ray-sample

minio-start: .minio-start

kind-load-image:: .transforms.kind-load-image

docker-load-image: .defaults.docker-load-image

docker-save-image: .defaults.docker-save-image
20 changes: 20 additions & 0 deletions transforms/universal/hap/ray/README.md
@@ -0,0 +1,20 @@
# Hate, Abuse, and Profanity (HAP) Annotation
# HAP Transform for Ray
Please see the set of
[transform project conventions](../../../README.md#transform-project-conventions)
for details on general project conventions, transform configuration,
testing and IDE set up.

## Summary
This project wraps the [hap transform](../python) with a Ray runtime.

## Configuration and command line Options

Configuration and command line options are the same as for the base python transform.

## Running

### Launched Command Line Options
In addition to the options available to the transform as defined [here](../python/README.md), the set of
[ray launcher](../../../../data-processing-lib/doc/ray-launcher-options.md) options is also available.
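
As a rough sketch, the Ray runtime can typically be invoked from Python as shown below. The configuration class name `HAPRayTransformConfiguration`, the folder paths, and the flag values are assumptions; check `src/hap_transform_ray.py` and the launcher options linked above for the actual names:

```python
import sys

from data_processing_ray.runtime.ray import RayTransformLauncher
from hap_transform_ray import HAPRayTransformConfiguration  # assumed export name

if __name__ == "__main__":
    # Emulate the command line: run locally on the bundled test data.
    sys.argv = [
        "hap_transform_ray.py",
        "--run_locally", "True",
        "--data_local_config", "{'input_folder': 'test-data/input', 'output_folder': 'output'}",
    ]
    RayTransformLauncher(HAPRayTransformConfiguration()).launch()
```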