-
Notifications
You must be signed in to change notification settings - Fork 174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding Resize Spark #630
Adding Resize Spark #630
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
# Split files | ||
# Resize files | ||
|
||
Please see the set of | ||
[transform project conventions](../../README.md) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
# Split files | ||
# Resize files | ||
|
||
Please see the set of | ||
[transform project conventions](../../README.md) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
venv/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
test-data/output | ||
output/* | ||
/output/ | ||
data-processing-lib/ | ||
data-processing-spark/ | ||
|
||
|
||
# Byte-compiled / optimized / DLL files | ||
__pycache__/ | ||
*.py[cod] | ||
*$py.class | ||
|
||
|
||
# Distribution / packaging | ||
bin/ | ||
build/ | ||
develop-eggs/ | ||
dist/ | ||
eggs/ | ||
lib/ | ||
lib64/ | ||
parts/ | ||
sdist/ | ||
var/ | ||
*.egg-info/ | ||
.installed.cfg | ||
*.egg | ||
|
||
# Installer logs | ||
pip-log.txt | ||
pip-delete-this-directory.txt | ||
|
||
# Unit test / coverage reports | ||
.tox/ | ||
htmlcov | ||
.coverage | ||
.cache | ||
nosetests.xml | ||
coverage.xml |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
ARG BASE_IMAGE=quay.io/dataprep1/data-prep-kit/data-prep-kit-spark-3.5.2:0.2.1.dev0 | ||
FROM ${BASE_IMAGE} | ||
|
||
USER root | ||
# install pytest | ||
RUN pip install --no-cache-dir pytest | ||
|
||
WORKDIR ${SPARK_HOME}/work-dir | ||
|
||
# Copy in the data processing framework source/project and install it | ||
# This is expected to be placed in the docker context before this is run (see the make image). | ||
COPY --chown=spark:root data-processing-lib-python/ data-processing-lib-python/ | ||
RUN cd data-processing-lib-python && pip install --no-cache-dir -e . | ||
COPY --chown=spark:root data-processing-lib-spark/ data-processing-lib-spark/ | ||
RUN cd data-processing-lib-spark && pip install --no-cache-dir -e . | ||
COPY --chown=spark:root python-transform/ python-transform/ | ||
RUN cd python-transform && pip install --no-cache-dir -e . | ||
|
||
COPY --chown=root:root src/ src/ | ||
COPY --chown=root:root pyproject.toml pyproject.toml | ||
RUN pip install --no-cache-dir -e . | ||
|
||
# copy in the main() entry point to the image | ||
COPY ./src/resize_transform_spark.py . | ||
|
||
# Copy in some samples | ||
COPY ./src/resize_local_spark.py local/ | ||
|
||
# copy test | ||
COPY test/ test/ | ||
COPY test-data/ test-data/ | ||
|
||
USER spark | ||
|
||
# Set environment | ||
ENV PYTHONPATH=${SPARK_HOME}/work-dir/:${PYTHONPATH} | ||
|
||
# Put these at the end since they seem to upset the docker cache. | ||
ARG BUILD_DATE | ||
ARG GIT_COMMIT | ||
LABEL build-date=$BUILD_DATE | ||
LABEL git-commit=$GIT_COMMIT | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
# Define the root of the local git clone for the common rules to be able | ||
# know where they are running from. | ||
REPOROOT=../../../.. | ||
# Include a library of common .transform.* targets which most | ||
# transforms should be able to reuse. However, feel free | ||
# to override/redefine the rules below. | ||
|
||
include $(REPOROOT)/transforms/.make.transforms | ||
|
||
TRANSFORM_NAME=resize | ||
|
||
venv:: .transforms.spark-venv | ||
|
||
test:: .transforms.spark-test | ||
|
||
clean:: .transforms.clean | ||
|
||
image:: .transforms.spark-image | ||
|
||
test-src:: .transforms.test-src | ||
|
||
setup:: .transforms.setup | ||
|
||
build:: build-dist image | ||
|
||
publish: publish-image | ||
|
||
publish-image:: .transforms.publish-image-spark | ||
|
||
# set the version of python transform that this depends on. | ||
set-versions: | ||
$(MAKE) TRANSFORM_PYTHON_VERSION=${NOOP_PYTHON_VERSION} TOML_VERSION=$(NOOP_SPARK_VERSION) .transforms.set-versions | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NOOP -> RESIZE? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
build-dist:: .defaults.build-dist | ||
|
||
publish-dist:: .defaults.publish-dist | ||
|
||
test-image:: .transforms.spark-test-image | ||
|
||
run-cli-sample: .transforms.run-cli-spark-sample | ||
|
||
run-local-sample: .transforms.run-local-sample | ||
|
||
minio-start: .minio-start | ||
|
||
kind-load-image:: .transforms.kind-load-image | ||
|
||
docker-load-image: .defaults.docker-load-image | ||
|
||
docker-save-image: .defaults.docker-save-image |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
# Resize files | ||
|
||
Please see the set of | ||
[transform project conventions](../../README.md) | ||
for details on general project conventions, transform configuration, | ||
testing and IDE set up. | ||
|
||
## Summary | ||
|
||
This is a simple transformer that is resizing the input tables to a specified size. | ||
* resizing based on in-memory size of the tables. | ||
* resized based on the number of rows in the tables. | ||
|
||
## Building | ||
|
||
A [docker file](Dockerfile) that can be used for building docker image. You can use | ||
|
||
```shell | ||
make build | ||
``` | ||
|
||
## Configuration and command line Options | ||
|
||
The set of dictionary keys holding [BlockListTransform](src/blocklist_transform.py) | ||
configuration for values are as follows: | ||
|
||
* _max_rows_per_table_ - specifies max documents per table | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To better future-proof this file, shouldn't it defer to the python readme for configuration and CLI? |
||
* _max_mbytes_per_table - specifies max size of table, according to the _size_type_ value. | ||
* _size_type_ - indicates how table size is measured. Can be one of | ||
* memory - table size is measure by the in-process memory used by the table | ||
* disk - table size is estimated as the on-disk size of the parquet files. This is an estimate only | ||
as files are generally compressed on disk and so may not be exact due varying compression ratios. | ||
This is the default. | ||
|
||
Only one of the _max_rows_per_table_ and _max_mbytes_per_table_ may be used. | ||
|
||
## Running | ||
|
||
We also provide several demos of the transform usage for different data storage options, including | ||
[local file system](src/resize_local_ray.py) and [s3](src/resize_s3_ray.py). | ||
|
||
|
||
### Launched Command Line Options | ||
When running the transform with the Ray launcher (i.e. TransformLauncher), | ||
the following command line arguments are available in addition to | ||
[the options provided by the launcher](../../../data-processing-lib/doc/launcher-options.md) and map to the configuration keys above. | ||
|
||
``` | ||
--resize_max_rows_per_table RESIZE_MAX_ROWS_PER_TABLE | ||
Max number of rows per table | ||
--resize_max_mbytes_per_table RESIZE_MAX_MBYTES_PER_TABLE | ||
Max table size (MB). Size is measured according to the --resize_size_type parameter | ||
--resize_size_type {disk,memory} | ||
Determines how memory is measured when using the --resize_max_mbytes_per_table option. | ||
'memory' measures the in-process memory footprint and | ||
'disk' makes an estimate of the resulting parquet file size. | ||
``` | ||
|
||
|
||
### Transforming data using the transform image | ||
|
||
To use the transform image to transform your data, please refer to the | ||
[running images quickstart](../../../../doc/quick-start/run-transform-image.md), | ||
substituting the name of this transform image and runtime as appropriate. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
[project] | ||
name = "dpk_resize_transform_spark" | ||
version = "0.2.2.dev0" | ||
requires-python = ">=3.10" | ||
description = "Resize Spark Transform" | ||
license = {text = "Apache-2.0"} | ||
readme = {file = "README.md", content-type = "text/markdown"} | ||
authors = [ | ||
{ name = "David Wood", email = "[email protected]" }, | ||
{ name = "Boris Lublinsky", email = "[email protected]" }, | ||
] | ||
dependencies = [ | ||
"dpk-resize-transform-python==0.2.2.dev0", | ||
"data-prep-toolkit-spark==0.2.2.dev0", | ||
] | ||
|
||
[build-system] | ||
requires = ["setuptools>=68.0.0", "wheel", "setuptools_scm[toml]>=7.1.0"] | ||
build-backend = "setuptools.build_meta" | ||
|
||
[project.optional-dependencies] | ||
dev = [ | ||
"twine", | ||
"pytest>=7.3.2", | ||
"pytest-dotenv>=0.5.2", | ||
"pytest-env>=1.0.0", | ||
"pre-commit>=3.3.2", | ||
"pytest-cov>=4.1.0", | ||
"pytest-mock>=3.10.0", | ||
"moto==5.0.5", | ||
"markupsafe==2.0.1", | ||
] | ||
|
||
[options] | ||
package_dir = ["src","test"] | ||
|
||
[options.packages.find] | ||
where = ["src/"] | ||
|
||
[tool.pytest.ini_options] | ||
# Currently we use low coverage since we have to run tests separately (see makefile) | ||
#addopts = "--cov --cov-report term-missing --cov-fail-under 25" | ||
markers = ["unit: unit tests", "integration: integration tests"] | ||
|
||
[tool.coverage.run] | ||
include = ["src/*"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
# (C) Copyright IBM Corp. 2024. | ||
# Licensed under the Apache License, Version 2.0 (the “License”); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an “AS IS” BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
################################################################################ | ||
|
||
import os | ||
import sys | ||
|
||
from data_processing.utils import ParamsUtils | ||
from data_processing_spark.runtime.spark import SparkTransformLauncher | ||
from resize_transform_spark import ResizeSparkTransformConfiguration | ||
|
||
|
||
# create parameters | ||
input_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test-data", "input")) | ||
output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "output")) | ||
local_conf = { | ||
"input_folder": input_folder, | ||
"output_folder": output_folder, | ||
} | ||
code_location = {"github": "github", "commit_hash": "12345", "path": "path"} | ||
params = { | ||
# Data access. Only required parameters are specified | ||
"data_local_config": ParamsUtils.convert_to_ast(local_conf), | ||
# execution info | ||
"runtime_parallelization": 1, | ||
"runtime_pipeline_id": "pipeline_id", | ||
"runtime_job_id": "job_id", | ||
"runtime_code_location": ParamsUtils.convert_to_ast(code_location), | ||
# resize configuration | ||
# "resize_max_mbytes_per_table": 0.02, | ||
"resize_max_rows_per_table": 300, | ||
} | ||
if __name__ == "__main__": | ||
# Set the simulated command line args | ||
sys.argv = ParamsUtils.dict_to_req(d=params) | ||
# create launcher | ||
launcher = SparkTransformLauncher(runtime_config=ResizeSparkTransformConfiguration()) | ||
# Launch the ray actor(s) to process the input | ||
launcher.launch() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
# (C) Copyright IBM Corp. 2024. | ||
# Licensed under the Apache License, Version 2.0 (the “License”); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an “AS IS” BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
################################################################################ | ||
from data_processing.utils import get_logger | ||
from data_processing_spark.runtime.spark import SparkTransformLauncher, SparkTransformRuntimeConfiguration | ||
from resize_transform import ResizeTransformConfiguration | ||
|
||
|
||
logger = get_logger(__name__) | ||
|
||
|
||
class ResizeSparkTransformConfiguration(SparkTransformRuntimeConfiguration): | ||
""" | ||
Implements the SparkTransformConfiguration for NOOP as required by the PythonTransformLauncher. | ||
NOOP does not use a RayRuntime class so the superclass only needs the base | ||
python-only configuration. | ||
""" | ||
|
||
def __init__(self): | ||
""" | ||
Initialization | ||
""" | ||
super().__init__(transform_config=ResizeTransformConfiguration()) | ||
|
||
|
||
if __name__ == "__main__": | ||
# create launcher | ||
launcher = SparkTransformLauncher(runtime_config=ResizeSparkTransformConfiguration()) | ||
logger.info("Launching resize transform") | ||
# Launch the ray actor(s) to process the input | ||
launcher.launch() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
0.2.1.dev0 is no longer used in dev. to be consistent with other spark transforms (recent change), use
latest
as the tag. Note however, that this is generally overridden from the Makefile anyway by setting BASE_IMAGE when docker build is called. But for consistency it would be nice to change tolatest
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done