From ea2cc23d22e48a97fa07d083ba736ea24b0ee7ef Mon Sep 17 00:00:00 2001 From: blublinsky Date: Thu, 10 Oct 2024 19:05:39 +0100 Subject: [PATCH 1/9] added folder_transform --- .../pure_python/transform_file_processor.py | 15 ++++-- .../pure_python/transform_orchestrator.py | 42 ++++++++++------ .../runtime/transform_file_processor.py | 41 ++++++++------- .../src/data_processing/transform/__init__.py | 2 + .../transform/abstract_transform.py | 16 ++++++ .../transform/binary_transform.py | 5 +- .../transform/folder_transform.py | 50 +++++++++++++++++++ .../runtime/ray/transform_file_processor.py | 1 + .../runtime/ray/transform_orchestrator.py | 19 ++++--- .../runtime/spark/transform_file_processor.py | 5 +- .../runtime/spark/transform_orchestrator.py | 25 +++++++--- 11 files changed, 168 insertions(+), 53 deletions(-) create mode 100644 data-processing-lib/python/src/data_processing/transform/abstract_transform.py create mode 100644 data-processing-lib/python/src/data_processing/transform/folder_transform.py diff --git a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_file_processor.py b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_file_processor.py index 143835dd0a..fa3e69e4a3 100644 --- a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_file_processor.py +++ b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_file_processor.py @@ -14,7 +14,7 @@ from data_processing.data_access import DataAccessFactoryBase from data_processing.runtime import AbstractTransformFileProcessor -from data_processing.transform import AbstractBinaryTransform, TransformStatistics +from data_processing.transform import AbstractTransform, TransformStatistics from data_processing.utils import UnrecoverableException @@ -28,7 +28,8 @@ def __init__( data_access_factory: DataAccessFactoryBase, statistics: TransformStatistics, transform_params: dict[str, Any], - transform_class: 
type[AbstractBinaryTransform], + transform_class: type[AbstractTransform], + is_folder: bool, ): """ Init method @@ -36,11 +37,13 @@ def __init__( :param statistics - reference to statistics class :param transform_params - transform parameters :param transform_class: transform class + :param is_folder: folder transform flag """ # invoke superclass super().__init__( data_access_factory=data_access_factory, transform_parameters=dict(transform_params), + is_folder=is_folder, ) self.transform_params["statistics"] = statistics # Create local processor @@ -52,7 +55,8 @@ def __init__( # Create statistics self.stats = statistics - def _publish_stats(self, stats: dict[str, Any]) -> None: + +def _publish_stats(self, stats: dict[str, Any]) -> None: self.stats.add_stats(stats) @@ -65,17 +69,20 @@ def __init__( self, data_access_factory: DataAccessFactoryBase, transform_params: dict[str, Any], - transform_class: type[AbstractBinaryTransform], + transform_class: type[AbstractTransform], + is_folder: bool ): """ Init method :param data_access_factory - data access factory :param transform_params - transform parameters :param transform_class: transform class + :param is_folder: folder tranform flag """ super().__init__( data_access_factory=data_access_factory, transform_parameters=dict(transform_params), + is_folder=is_folder, ) # Add data access and statistics to the processor parameters self.transform_params["data_access"] = self.data_access diff --git a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py index 8692da29e3..153eaaf0a2 100644 --- a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py +++ b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py @@ -24,7 +24,7 @@ PythonTransformFileProcessor, PythonTransformRuntimeConfiguration, ) -from 
data_processing.transform import AbstractBinaryTransform, TransformStatistics +from data_processing.transform import AbstractBinaryTransform, TransformStatistics, AbstractFolderTransform from data_processing.utils import GB, get_logger @@ -48,8 +48,6 @@ def _execution_resources() -> dict[str, Any]: "object_store": 0, } - - def orchestrate( data_access_factory: DataAccessFactoryBase, runtime_config: PythonTransformRuntimeConfiguration, @@ -74,15 +72,21 @@ def orchestrate( return 1 # create additional execution parameters runtime = runtime_config.create_transform_runtime() + is_folder = issubclass(runtime_config.get_transform_class(), AbstractFolderTransform) try: - # Get files to process - files, profile, retries = data_access.get_files_to_process() - if len(files) == 0: - logger.error("No input files to process - exiting") - return 0 - if retries > 0: - statistics.add_stats({"data access retries": retries}) - logger.info(f"Number of files is {len(files)}, source profile {profile}") + if is_folder: + # folder transform + files = AbstractFolderTransform.get_folders(data_access=data_access) + logger.info(f"Number of folders is {len(files)}") + else: + # Get files to process + files, profile, retries = data_access.get_files_to_process() + if len(files) == 0: + logger.error("No input files to process - exiting") + return 0 + if retries > 0: + statistics.add_stats({"data access retries": retries}) + logger.info(f"Number of files is {len(files)}, source profile {profile}") # Print interval print_interval = int(len(files) / 100) if print_interval == 0: @@ -99,6 +103,7 @@ def orchestrate( data_access_factory=data_access_factory, statistics=statistics, files=files ), transform_class=runtime_config.get_transform_class(), + is_folder=is_folder, ) else: # using sequential execution @@ -111,6 +116,7 @@ def orchestrate( data_access_factory=data_access_factory, statistics=statistics, files=files ), transform_class=runtime_config.get_transform_class(), + is_folder=is_folder, ) 
status = "success" return_code = 0 @@ -157,7 +163,8 @@ def _process_transforms( data_access_factory: DataAccessFactoryBase, statistics: TransformStatistics, transform_params: dict[str, Any], - transform_class: type[AbstractBinaryTransform], + transform_class: type[AbstractTransform], + is_folder: bool, ) -> None: """ Process transforms sequentially @@ -167,9 +174,8 @@ def _process_transforms( :param data_access_factory: data access factory :param transform_params - transform parameters :param transform_class: transform class + :param is_folder: folder transform flag :return: metadata for the execution - - :return: None """ # create executor executor = PythonTransformFileProcessor( @@ -177,6 +183,7 @@ def _process_transforms( statistics=statistics, transform_params=transform_params, transform_class=transform_class, + is_folder=is_folder, ) # process data t_start = time.time() @@ -203,6 +210,7 @@ def _process_transforms_multiprocessor( data_access_factory: DataAccessFactoryBase, transform_params: dict[str, Any], transform_class: type[AbstractBinaryTransform], + is_folder: bool ) -> TransformStatistics: """ Process transforms using multiprocessing pool @@ -212,13 +220,17 @@ def _process_transforms_multiprocessor( :param data_access_factory: data access factory :param transform_params - transform parameters :param transform_class: transform class + :param is_folder: folder transform class :return: metadata for the execution """ # result statistics statistics = TransformStatistics() # create processor processor = PythonPoolTransformFileProcessor( - data_access_factory=data_access_factory, transform_params=transform_params, transform_class=transform_class + data_access_factory=data_access_factory, + transform_params=transform_params, + transform_class=transform_class, + is_folder=is_folder, ) completed = 0 t_start = time.time() diff --git a/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py 
b/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py index d4ec548d8d..1d268875f3 100644 --- a/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py +++ b/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py @@ -26,11 +26,13 @@ def __init__( self, data_access_factory: DataAccessFactoryBase, transform_parameters: dict[str, Any], + is_folder: bool = False, ): """ Init method :param data_access_factory: Data Access Factory :param transform_parameters: Transform parameters + :param is_folder: folder transform flag """ self.logger = get_logger(__name__) # validate parameters @@ -46,6 +48,7 @@ def __init__( # Add data access and statistics to the processor parameters self.transform_params = transform_parameters self.transform_params["data_access"] = self.data_access + self.is_folder = is_folder def process_file(self, f_name: str) -> None: """ @@ -58,25 +61,29 @@ def process_file(self, f_name: str) -> None: self.logger.warning("No data_access found. Returning.") return t_start = time.time() - # Read source file - filedata, retries = self.data_access.get_file(path=f_name) - if retries > 0: - self._publish_stats({"data access retries": retries}) - if filedata is None: - self.logger.warning(f"File read resulted in None for {f_name}. Returning.") - self._publish_stats({"failed_reads": 1}) - return - self._publish_stats({"source_files": 1, "source_size": len(filedata)}) + if not self.is_folder: + # Read source file only if we are processing file + filedata, retries = self.data_access.get_file(path=f_name) + if retries > 0: + self._publish_stats({"data access retries": retries}) + if filedata is None: + self.logger.warning(f"File read resulted in None for {f_name}. 
Returning.") + self._publish_stats({"failed_reads": 1}) + return + self._publish_stats({"source_files": 1, "source_size": len(filedata)}) # Process input file try: - # execute local processing - name_extension = TransformUtils.get_file_extension(f_name) self.logger.debug(f"Begin transforming file {f_name}") - out_files, stats = self.transform.transform_binary(file_name=f_name, byte_array=filedata) + if not self.is_folder: + # execute local processing + out_files, stats = self.transform.transform_binary(file_name=f_name, byte_array=filedata) + name_extension = TransformUtils.get_file_extension(f_name) + self.last_file_name = name_extension[0] + self.last_file_name_next_index = None + self.last_extension = name_extension[1] + else: + out_files, stats = self.transform.transform(folder_name=f_name) self.logger.debug(f"Done transforming file {f_name}, got {len(out_files)} files") - self.last_file_name = name_extension[0] - self.last_file_name_next_index = None - self.last_extension = name_extension[1] # save results self._submit_file(t_start=t_start, out_files=out_files, stats=stats) # Process unrecoverable exceptions @@ -95,10 +102,10 @@ def flush(self) -> None: the hook for them to return back locally stored data and their statistics. :return: None """ - if self.last_file_name is None: + if self.last_file_name is None or self.is_folder: # for some reason a given worker never processed anything. 
Happens in testing # when the amount of workers is greater than the amount of files - self.logger.debug("skipping flush, no name for file is defined") + self.logger.debug("skipping flush, no name for file is defined or this is a folder transform") return try: t_start = time.time() diff --git a/data-processing-lib/python/src/data_processing/transform/__init__.py b/data-processing-lib/python/src/data_processing/transform/__init__.py index 6af43ad609..20254e47b2 100644 --- a/data-processing-lib/python/src/data_processing/transform/__init__.py +++ b/data-processing-lib/python/src/data_processing/transform/__init__.py @@ -1,3 +1,5 @@ +from data_processing.transform.abstract_transform import AbstractTransform +from data_processing.transform.folder_transform import AbstractFolderTransform from data_processing.transform.binary_transform import AbstractBinaryTransform from data_processing.transform.table_transform import AbstractTableTransform from data_processing.transform.transform_statistics import TransformStatistics diff --git a/data-processing-lib/python/src/data_processing/transform/abstract_transform.py b/data-processing-lib/python/src/data_processing/transform/abstract_transform.py new file mode 100644 index 0000000000..89db70f42b --- /dev/null +++ b/data-processing-lib/python/src/data_processing/transform/abstract_transform.py @@ -0,0 +1,16 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +class AbstractTransform: + """ + Base class for all transform types + """ \ No newline at end of file diff --git a/data-processing-lib/python/src/data_processing/transform/binary_transform.py b/data-processing-lib/python/src/data_processing/transform/binary_transform.py index 80dff61ea8..b313aff2fe 100644 --- a/data-processing-lib/python/src/data_processing/transform/binary_transform.py +++ b/data-processing-lib/python/src/data_processing/transform/binary_transform.py @@ -10,10 +10,11 @@ # limitations under the License. ################################################################################ -from typing import Any, TypeVar +from typing import Any +from data_processing.transform import AbstractTransform -class AbstractBinaryTransform: +class AbstractBinaryTransform(AbstractTransform): """ Converts input binary file to output file(s) (binary) Sub-classes must provide the transform() method to provide the conversion of one binary files to 0 or diff --git a/data-processing-lib/python/src/data_processing/transform/folder_transform.py b/data-processing-lib/python/src/data_processing/transform/folder_transform.py new file mode 100644 index 0000000000..866e3286fa --- /dev/null +++ b/data-processing-lib/python/src/data_processing/transform/folder_transform.py @@ -0,0 +1,50 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +from typing import Any +from data_processing.data_access import data_access +from data_processing.transform import AbstractTransform + + +class AbstractFolderTransform(AbstractTransform): + """ + Converts input folder to output file(s) (binary) + Sub-classes must provide the transform() method to provide the conversion of a folder to 0 or + more new binary files and metadata. + """ + + def __init__(self, config: dict[str, Any]): + """ + Initialize based on the dictionary of configuration information. + This simply stores the given instance in this instance for later use. + """ + self.config = config + + def transform(self, folder_name: str) -> tuple[list[tuple[bytes, str]], dict[str, Any]]: + """ + Converts input folder into o or more output files. + If there is an error, an exception must be raised - exit()ing is not generally allowed. + :param folder_name: the name of the folder containing arbitrary amount of files. + :return: a tuple of a list of 0 or more tuples and a dictionary of statistics that will be propagated + to metadata. Each element of the return list, is a tuple of the transformed bytes and a string + holding the extension to be used when writing out the new bytes. + """ + raise NotImplemented() + + @staticmethod + def get_folders(data_access:data_access) -> list(str): + """ + Compute the list of folders to use. 
+ :param data_access - data access class + :return: + """ + raise NotImplemented() diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_file_processor.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_file_processor.py index e1fabb144e..cdad1309fb 100644 --- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_file_processor.py +++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_file_processor.py @@ -35,6 +35,7 @@ def __init__(self, params: dict[str, Any]): super().__init__( data_access_factory=params.get("data_access_factory", None), transform_parameters=dict(params.get("transform_params", {})), + is_folder=params.get("is_folder", False) ) # Create statistics self.stats = params.get("statistics", None) diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py index 42eba47a65..8276eb56c9 100644 --- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py +++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py @@ -16,6 +16,7 @@ import ray from data_processing.data_access import DataAccessFactoryBase +from data_processing.transform import AbstractFolderTransform from data_processing_ray.runtime.ray import ( RayTransformExecutionConfiguration, RayTransformFileProcessor, @@ -56,13 +57,18 @@ def orchestrate( # create transformer runtime runtime = runtime_config.create_transform_runtime() resources = RayUtils.get_cluster_resources() + is_folder = issubclass(runtime_config.get_transform_class(), AbstractFolderTransform) try: - # Get files to process - files, profile, retries = data_access.get_files_to_process() - if len(files) == 0: - logger.error("No input files to process - exiting") - return 0 - logger.info(f"Number of files is {len(files)}, source profile {profile}") + if 
is_folder: + # folder transform + files = AbstractFolderTransform.get_folders(data_access=data_access) + logger.info(f"Number of folders is {len(files)}") # Get files to process + else: + files, profile, retries = data_access.get_files_to_process() + if len(files) == 0: + logger.error("No input files to process - exiting") + return 0 + logger.info(f"Number of files is {len(files)}, source profile {profile}") # Print interval print_interval = int(len(files) / 100) if print_interval == 0: @@ -84,6 +90,7 @@ def orchestrate( data_access_factory=data_access_factory, statistics=statistics, files=files ), "statistics": statistics, + "is_folder": is_folder, } logger.debug("Creating actors") processors = RayUtils.create_actors( diff --git a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_file_processor.py b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_file_processor.py index d63664ac46..a0968ab1df 100644 --- a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_file_processor.py +++ b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_file_processor.py @@ -29,12 +29,15 @@ def __init__( data_access_factory: DataAccessFactoryBase, runtime_configuration: SparkTransformRuntimeConfiguration, statistics: TransformStatistics, + is_folder: bool, ): """ Init method """ super().__init__( - data_access_factory=data_access_factory, transform_parameters=runtime_configuration.get_transform_params() + data_access_factory=data_access_factory, + transform_parameters=runtime_configuration.get_transform_params(), + is_folder=is_folder, ) # Add data access ant statistics to the processor parameters self.runtime_configuration = runtime_configuration diff --git a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py index c279f2b73f..c534b685f7 100644 --- 
a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py +++ b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py @@ -18,7 +18,7 @@ import yaml from data_processing.data_access import DataAccessFactoryBase -from data_processing.transform import TransformStatistics +from data_processing.transform import TransformStatistics, AbstractFolderTransform from data_processing.utils import GB, get_logger from data_processing_spark.runtime.spark import ( SparkTransformExecutionConfiguration, @@ -117,7 +117,10 @@ def process_partition(iterator): runtime = runtime_conf.create_transform_runtime() # create file processor file_processor = SparkTransformFileProcessor( - data_access_factory=d_access_factory, runtime_configuration=runtime_conf, statistics=statistics + data_access_factory=d_access_factory, + runtime_configuration=runtime_conf, + statistics=statistics, + is_folder=is_folder, ) first = True for f in iterator: @@ -144,13 +147,19 @@ def process_partition(iterator): return list(statistics.get_execution_stats().items()) num_partitions = 0 + is_folder = issubclass(runtime_config.get_transform_class(), AbstractFolderTransform) try: - # Get files to process - files, profile, retries = data_access.get_files_to_process() - if len(files) == 0: - logger.error("No input files to process - exiting") - return 0 - logger.info(f"Number of files is {len(files)}, source profile {profile}") + if is_folder: + # folder transform + files = AbstractFolderTransform.get_folders(data_access=data_access) + logger.info(f"Number of folders is {len(files)}") # Get files to process + else: + # Get files to process + files, profile, retries = data_access.get_files_to_process() + if len(files) == 0: + logger.error("No input files to process - exiting") + return 0 + logger.info(f"Number of files is {len(files)}, source profile {profile}") # process data logger.debug("Begin processing files") # process files split by partitions 
From 4d30ac3cc8ade8e94569f52533e92260d60f5af2 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Thu, 10 Oct 2024 19:13:01 +0100 Subject: [PATCH 2/9] added folder_transform --- .../runtime/pure_python/transform_orchestrator.py | 2 +- .../python/src/data_processing/transform/folder_transform.py | 4 ++-- .../data_processing_ray/runtime/ray/transform_orchestrator.py | 2 +- .../runtime/spark/transform_orchestrator.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py index 153eaaf0a2..d51f80a8ad 100644 --- a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py +++ b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py @@ -76,7 +76,7 @@ def orchestrate( try: if is_folder: # folder transform - files = AbstractFolderTransform.get_folders(data_access=data_access) + files = AbstractFolderTransform.get_folders(d_access=data_access) logger.info(f"Number of folders is {len(files)}") else: # Get files to process diff --git a/data-processing-lib/python/src/data_processing/transform/folder_transform.py b/data-processing-lib/python/src/data_processing/transform/folder_transform.py index 866e3286fa..eca191bbb2 100644 --- a/data-processing-lib/python/src/data_processing/transform/folder_transform.py +++ b/data-processing-lib/python/src/data_processing/transform/folder_transform.py @@ -41,10 +41,10 @@ def transform(self, folder_name: str) -> tuple[list[tuple[bytes, str]], dict[str raise NotImplemented() @staticmethod - def get_folders(data_access:data_access) -> list(str): + def get_folders(d_access: data_access) -> list(str): """ Compute the list of folders to use. 
- :param data_access - data access class + :param d_access - data access class :return: """ raise NotImplemented() diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py index 8276eb56c9..a8ff957295 100644 --- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py +++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py @@ -61,7 +61,7 @@ def orchestrate( try: if is_folder: # folder transform - files = AbstractFolderTransform.get_folders(data_access=data_access) + files = AbstractFolderTransform.get_folders(d_access=data_access) logger.info(f"Number of folders is {len(files)}") # Get files to process else: files, profile, retries = data_access.get_files_to_process() diff --git a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py index c534b685f7..4a08979520 100644 --- a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py +++ b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py @@ -151,7 +151,7 @@ def process_partition(iterator): try: if is_folder: # folder transform - files = AbstractFolderTransform.get_folders(data_access=data_access) + files = AbstractFolderTransform.get_folders(d_access=data_access) logger.info(f"Number of folders is {len(files)}") # Get files to process else: # Get files to process From a772d1d876c1a5d70c75ab687a41b6b69610d9f7 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Thu, 10 Oct 2024 21:00:43 +0100 Subject: [PATCH 3/9] added folder_transform --- .../runtime/pure_python/transform_file_processor.py | 3 +-- .../runtime/pure_python/transform_orchestrator.py | 11 ++++++----- .../runtime/pure_python/transform_runtime.py | 10 +++++++++- 
.../data_processing/transform/folder_transform.py | 12 +----------- .../runtime/ray/transform_orchestrator.py | 2 +- .../runtime/ray/transform_runtime.py | 10 +++++++++- 6 files changed, 27 insertions(+), 21 deletions(-) diff --git a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_file_processor.py b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_file_processor.py index fa3e69e4a3..44ccd0ef0d 100644 --- a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_file_processor.py +++ b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_file_processor.py @@ -55,8 +55,7 @@ def __init__( # Create statistics self.stats = statistics - -def _publish_stats(self, stats: dict[str, Any]) -> None: + def _publish_stats(self, stats: dict[str, Any]) -> None: self.stats.add_stats(stats) diff --git a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py index d51f80a8ad..812be8cafc 100644 --- a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py +++ b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py @@ -24,14 +24,13 @@ PythonTransformFileProcessor, PythonTransformRuntimeConfiguration, ) -from data_processing.transform import AbstractBinaryTransform, TransformStatistics, AbstractFolderTransform +from data_processing.transform import AbstractTransform, TransformStatistics, AbstractFolderTransform from data_processing.utils import GB, get_logger logger = get_logger(__name__) -@staticmethod def _execution_resources() -> dict[str, Any]: """ Get Execution resource @@ -48,6 +47,7 @@ def _execution_resources() -> dict[str, Any]: "object_store": 0, } + def orchestrate( data_access_factory: DataAccessFactoryBase, runtime_config: PythonTransformRuntimeConfiguration, @@ -76,7 
+76,7 @@ def orchestrate( try: if is_folder: # folder transform - files = AbstractFolderTransform.get_folders(d_access=data_access) + files = runtime.get_folders(data_access=data_access) logger.info(f"Number of folders is {len(files)}") else: # Get files to process @@ -145,7 +145,8 @@ def orchestrate( "job_input_params": input_params | data_access_factory.get_input_params() | execution_config.get_input_params(), - "execution_stats": _execution_resources() | {"execution time, min": round((time.time() - start_time) / 60.0, 3)}, + "execution_stats": _execution_resources() | + {"execution time, min": round((time.time() - start_time) / 60.0, 3)}, "job_output_stats": stats, } logger.debug(f"Saving job metadata: {metadata}.") @@ -209,7 +210,7 @@ def _process_transforms_multiprocessor( print_interval: int, data_access_factory: DataAccessFactoryBase, transform_params: dict[str, Any], - transform_class: type[AbstractBinaryTransform], + transform_class: type[AbstractTransform], is_folder: bool ) -> TransformStatistics: """ diff --git a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_runtime.py b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_runtime.py index 4173154aee..478d40837c 100644 --- a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_runtime.py +++ b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_runtime.py @@ -12,7 +12,7 @@ from typing import Any -from data_processing.data_access import DataAccessFactoryBase +from data_processing.data_access import DataAccessFactoryBase, DataAccess from data_processing.transform import TransformStatistics @@ -28,6 +28,14 @@ def __init__(self, params: dict[str, Any]): """ self.params = params + def get_folders(self, data_access: DataAccess) -> list[str]: + """ + Get folders to process + :param data_access: data access + :return: list of folders to process + """ + raise NotImplemented() + def get_transform_config( self, 
data_access_factory: DataAccessFactoryBase, statistics: TransformStatistics, files: list[str] ) -> dict[str, Any]: diff --git a/data-processing-lib/python/src/data_processing/transform/folder_transform.py b/data-processing-lib/python/src/data_processing/transform/folder_transform.py index eca191bbb2..9a2fb3713a 100644 --- a/data-processing-lib/python/src/data_processing/transform/folder_transform.py +++ b/data-processing-lib/python/src/data_processing/transform/folder_transform.py @@ -11,7 +11,6 @@ ################################################################################ from typing import Any -from data_processing.data_access import data_access from data_processing.transform import AbstractTransform @@ -38,13 +37,4 @@ def transform(self, folder_name: str) -> tuple[list[tuple[bytes, str]], dict[str to metadata. Each element of the return list, is a tuple of the transformed bytes and a string holding the extension to be used when writing out the new bytes. """ - raise NotImplemented() - - @staticmethod - def get_folders(d_access: data_access) -> list(str): - """ - Compute the list of folders to use. 
- :param d_access - data access class - :return: - """ - raise NotImplemented() + raise NotImplemented() \ No newline at end of file diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py index a8ff957295..b29682997a 100644 --- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py +++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py @@ -61,7 +61,7 @@ def orchestrate( try: if is_folder: # folder transform - files = AbstractFolderTransform.get_folders(d_access=data_access) + files = runtime.get_folders(data_access=data_access) logger.info(f"Number of folders is {len(files)}") # Get files to process else: files, profile, retries = data_access.get_files_to_process() diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_runtime.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_runtime.py index 57f071406e..64479302c4 100644 --- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_runtime.py +++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_runtime.py @@ -12,7 +12,7 @@ from typing import Any -from data_processing.data_access import DataAccessFactoryBase +from data_processing.data_access import DataAccessFactoryBase, DataAccess from ray.actor import ActorHandle @@ -28,6 +28,14 @@ def __init__(self, params: dict[str, Any]): """ self.params = params + def get_folders(self, data_access: DataAccess) -> list[str]: + """ + Get folders to process + :param data_access: data access + :return: list of folders to process + """ + raise NotImplemented() + def get_transform_config( self, data_access_factory: DataAccessFactoryBase, statistics: ActorHandle, files: list[str] ) -> dict[str, Any]: From 2f28ab4457cfa1f3c5bbf109cdccc0a85b90cb6c Mon Sep 17 00:00:00 2001 From: blublinsky Date: 
Fri, 11 Oct 2024 08:48:00 +0100 Subject: [PATCH 4/9] added folder_transform --- .../runtime/spark/transform_orchestrator.py | 3 ++- .../runtime/spark/transform_runtime.py | 10 +++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py index 4a08979520..096fab2724 100644 --- a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py +++ b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py @@ -151,7 +151,8 @@ def process_partition(iterator): try: if is_folder: # folder transform - files = AbstractFolderTransform.get_folders(d_access=data_access) + runtime = runtime_config.create_transform_runtime() + files = runtime.get_folders(data_access=data_access) logger.info(f"Number of folders is {len(files)}") # Get files to process else: # Get files to process diff --git a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_runtime.py b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_runtime.py index 7b968b1e93..7410d09d1a 100644 --- a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_runtime.py +++ b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_runtime.py @@ -12,7 +12,7 @@ from typing import Any -from data_processing.data_access import DataAccessFactoryBase +from data_processing.data_access import DataAccessFactoryBase, DataAccess from data_processing.transform import TransformStatistics @@ -28,6 +28,14 @@ def __init__(self, params: dict[str, Any]): """ self.params = params + def get_folders(self, data_access: DataAccess) -> list[str]: + """ + Get folders to process + :param data_access: data access + :return: list of folders to process + """ + raise NotImplemented() + def get_transform_config( 
self, partition: int, data_access_factory: DataAccessFactoryBase, statistics: TransformStatistics ) -> dict[str, Any]: From b3588efa391c714c93c0a8f19026636b16404e34 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Fri, 11 Oct 2024 15:35:00 +0100 Subject: [PATCH 5/9] added noop testing --- .../runtime/transform_file_processor.py | 44 +++++--- .../test_support/transform/__init__.py | 13 ++- .../transform/noop_folder_transform.py | 105 ++++++++++++++++++ .../test_support/transform/noop_transform.py | 6 +- .../transform/folder_transform.py | 2 +- .../transform/transform_configuration.py | 6 +- .../transform/test_folders_noop.py | 33 ++++++ .../launch/ray/ray_test_noop_launch.py | 6 - .../ededup/ray/src/ededup_transform_ray.py | 9 +- 9 files changed, 187 insertions(+), 37 deletions(-) create mode 100644 data-processing-lib/python/src/data_processing/test_support/transform/noop_folder_transform.py create mode 100644 data-processing-lib/python/test/data_processing_tests/transform/test_folders_noop.py diff --git a/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py b/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py index 1d268875f3..4075f40bef 100644 --- a/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py +++ b/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py @@ -83,6 +83,7 @@ def process_file(self, f_name: str) -> None: self.last_extension = name_extension[1] else: out_files, stats = self.transform.transform(folder_name=f_name) + self.last_file_name = f_name self.logger.debug(f"Done transforming file {f_name}, got {len(out_files)} files") # save results self._submit_file(t_start=t_start, out_files=out_files, stats=stats) @@ -148,15 +149,21 @@ def _submit_file(self, t_start: float, out_files: list[tuple[bytes, str]], stats ) case 1: # we have exactly 1 output file - file_ext = out_files[0] - lfn = self.last_file_name - if 
self.last_file_name_next_index is not None: - lfn = f"{lfn}_{self.last_file_name_next_index}" - output_name = self.data_access.get_output_location(path=f"{lfn}{file_ext[1]}") + if self.is_folder: + # its folder + output_name = out_files[0][1] + dt = out_files[0][0] + else: + file_ext = out_files[0] + lfn = self.last_file_name + if self.last_file_name_next_index is not None: + lfn = f"{lfn}_{self.last_file_name_next_index}" + output_name = self.data_access.get_output_location(path=f"{lfn}{file_ext[1]}") + dt = file_ext[0] self.logger.debug( f"Writing transformed file {self.last_file_name}{self.last_extension} to {output_name}" ) - save_res, retries = self.data_access.save_file(path=output_name, data=file_ext[0]) + save_res, retries = self.data_access.save_file(path=output_name, data=dt) if retries > 0: self._publish_stats({"data access retries": retries}) if save_res is None: @@ -166,7 +173,7 @@ def _submit_file(self, t_start: float, out_files: list[tuple[bytes, str]], stats self._publish_stats( { "result_files": 1, - "result_size": len(file_ext[0]), + "result_size": len(dt), "processing_time": time.time() - t_start, } ) @@ -183,14 +190,21 @@ def _submit_file(self, t_start: float, out_files: list[tuple[bytes, str]], stats start_index = 0 count = len(out_files) for index in range(count): - file_ext = out_files[index] - output_name_indexed = f"{output_file_name}_{start_index + index}{file_ext[1]}" - file_sizes += len(file_ext[0]) - self.logger.debug( - f"Writing transformed file {self.last_file_name}{self.last_extension}, {index + 1} " - f"of {count} to {output_name_indexed}" - ) - save_res, retries = self.data_access.save_file(path=output_name_indexed, data=file_ext[0]) + if self.is_folder: + # its a folder + output_name_indexed = out_files[index][1] + dt = out_files[index][0] + else: + # files + file_ext = out_files[index] + output_name_indexed = f"{output_file_name}_{start_index + index}{file_ext[1]}" + self.logger.debug( + f"Writing transformed file 
{self.last_file_name}{self.last_extension}, {index + 1} " + f"of {count} to {output_name_indexed}" + ) + dt = file_ext[0] + file_sizes += len(dt) + save_res, retries = self.data_access.save_file(path=output_name_indexed, data=dt) if retries > 0: self._publish_stats({"data access retries": retries}) if save_res is None: diff --git a/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py b/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py index 0e90f7ffdc..04d6f3b0f7 100644 --- a/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py +++ b/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py @@ -1,6 +1,11 @@ -from .table_transform_test import AbstractTableTransformTest -from .binary_transform_test import AbstractBinaryTransformTest -from .noop_transform import ( +from data_processing.test_support.transform.table_transform_test import AbstractTableTransformTest +from data_processing.test_support.transform.binary_transform_test import AbstractBinaryTransformTest +from data_processing.test_support.transform.noop_transform import ( NOOPTransform, - NOOPPythonTransformConfiguration, + NOOPTransformConfiguration, + NOOPPythonTransformConfiguration ) +from data_processing.test_support.transform.noop_folder_transform import ( + NOOPFolderTransform, + NOOPFolderPythonTransformConfiguration +) \ No newline at end of file diff --git a/data-processing-lib/python/src/data_processing/test_support/transform/noop_folder_transform.py b/data-processing-lib/python/src/data_processing/test_support/transform/noop_folder_transform.py new file mode 100644 index 0000000000..5baab78587 --- /dev/null +++ b/data-processing-lib/python/src/data_processing/test_support/transform/noop_folder_transform.py @@ -0,0 +1,105 @@ +# (C) Copyright IBM Corp. 2024. 
+# Licensed under the Apache License, Version 2.0 (the “License”);
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an “AS IS” BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import time
+from typing import Any
+
+from data_processing.data_access import DataAccess
+from data_processing.runtime.pure_python import (
+    PythonTransformLauncher,
+    PythonTransformRuntimeConfiguration,
+    DefaultPythonTransformRuntime)
+from data_processing.transform import AbstractFolderTransform
+from data_processing.utils import get_logger
+from data_processing.test_support.transform import NOOPTransformConfiguration
+
+
+logger = get_logger(__name__)
+
+
+class NOOPFolderTransform(AbstractFolderTransform):
+    """
+    Implements a simple copy of the files in a folder.
+    """
+
+    def __init__(self, config: dict[str, Any]):
+        """
+        Initialize based on the dictionary of configuration information.
+        This is generally called with configuration parsed from the CLI arguments defined
+        by the companion runtime, NOOPTransformRuntime. If running inside the RayMutatingDriver,
+        these will be provided by that class with help from the RayMutatingDriver.
+        """
+        # Make sure that the param name corresponds to the name used in apply_input_params method
+        # of NOOPTransformConfiguration class
+        super().__init__(config)
+        self.sleep = config.get("sleep_sec", 1)
+        self.data_access = config.get("data_access")
+
+    def transform(self, folder_name: str) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
+        """
+        Converts input folder into 0 or more output files.
+ If there is an error, an exception must be raised - exit()ing is not generally allowed. + :param folder_name: the name of the folder containing arbitrary amount of files. + :return: a tuple of a list of 0 or more tuples and a dictionary of statistics that will be propagated + to metadata. Each element of the return list, is a tuple of the transformed bytes and a string + holding the file name to use. + """ + logger.debug(f"Transforming one folder {folder_name}") + metadata = {} + # get folder files + files, retries = self.data_access.get_folder_files(path=folder_name) + if retries > 0: + metadata |= {"data access retries": retries} + result = [()] * len(files) + index = 0 + for name, file in files.items(): + result[index] = (file, self.data_access.get_output_location(name)) + if self.sleep is not None: + logger.info(f"Sleep for {self.sleep} seconds") + time.sleep(self.sleep) + logger.info("Sleep completed - continue") + index += 1 + # Add some sample metadata. + metadata |= {"nfiles": len(files)} + return result, metadata + + +class NOOPFolderPythonRuntime(DefaultPythonTransformRuntime): + def get_folders(self, data_access: DataAccess) -> list[str]: + """ + Get folders to process + :param data_access: data access + :return: list of folders to process + """ + return [data_access.get_input_folder()] + + +class NOOPFolderPythonTransformConfiguration(PythonTransformRuntimeConfiguration): + """ + Implements the PythonTransformConfiguration for NOOP as required by the PythonTransformLauncher. + NOOP does not use a RayRuntime class so the superclass only needs the base + python-only configuration. 
+ """ + + def __init__(self): + """ + Initialization + """ + super().__init__(transform_config=NOOPTransformConfiguration(clazz=NOOPFolderTransform), + runtime_class=NOOPFolderPythonRuntime) + + +if __name__ == "__main__": + # launcher = NOOPRayLauncher() + launcher = PythonTransformLauncher(NOOPFolderPythonTransformConfiguration()) + logger.info("Launching noop transform") + launcher.launch() diff --git a/data-processing-lib/python/src/data_processing/test_support/transform/noop_transform.py b/data-processing-lib/python/src/data_processing/test_support/transform/noop_transform.py index 0dee013a40..2fea355060 100644 --- a/data-processing-lib/python/src/data_processing/test_support/transform/noop_transform.py +++ b/data-processing-lib/python/src/data_processing/test_support/transform/noop_transform.py @@ -19,7 +19,7 @@ from data_processing.runtime.pure_python.runtime_configuration import ( PythonTransformRuntimeConfiguration, ) -from data_processing.transform import AbstractTableTransform, TransformConfiguration +from data_processing.transform import AbstractTableTransform, TransformConfiguration, AbstractTransform from data_processing.utils import CLIArgumentProvider, get_logger @@ -75,10 +75,10 @@ class NOOPTransformConfiguration(TransformConfiguration): configuration with CLI args. 
""" - def __init__(self): + def __init__(self, clazz: type[AbstractTransform] = NOOPTransform): super().__init__( name=short_name, - transform_class=NOOPTransform, + transform_class=clazz, remove_from_metadata=[pwd_key], ) diff --git a/data-processing-lib/python/src/data_processing/transform/folder_transform.py b/data-processing-lib/python/src/data_processing/transform/folder_transform.py index 9a2fb3713a..caa3bfa52b 100644 --- a/data-processing-lib/python/src/data_processing/transform/folder_transform.py +++ b/data-processing-lib/python/src/data_processing/transform/folder_transform.py @@ -35,6 +35,6 @@ def transform(self, folder_name: str) -> tuple[list[tuple[bytes, str]], dict[str :param folder_name: the name of the folder containing arbitrary amount of files. :return: a tuple of a list of 0 or more tuples and a dictionary of statistics that will be propagated to metadata. Each element of the return list, is a tuple of the transformed bytes and a string - holding the extension to be used when writing out the new bytes. + holding the file name to use. 
""" raise NotImplemented() \ No newline at end of file diff --git a/data-processing-lib/python/src/data_processing/transform/transform_configuration.py b/data-processing-lib/python/src/data_processing/transform/transform_configuration.py index 033e92f2a2..a5c9ec9ad6 100644 --- a/data-processing-lib/python/src/data_processing/transform/transform_configuration.py +++ b/data-processing-lib/python/src/data_processing/transform/transform_configuration.py @@ -13,7 +13,7 @@ from argparse import ArgumentParser from typing import Any -from data_processing.transform import AbstractBinaryTransform +from data_processing.transform import AbstractTransform from data_processing.utils import CLIArgumentProvider @@ -23,7 +23,7 @@ class TransformConfiguration(CLIArgumentProvider): """ def __init__( - self, name: str, transform_class: type[AbstractBinaryTransform], remove_from_metadata: list[str] = [] + self, name: str, transform_class: type[AbstractTransform], remove_from_metadata: list[str] = [] ): """ Initialization @@ -36,7 +36,7 @@ def __init__( self.remove_from_metadata = remove_from_metadata self.params = {} - def get_transform_class(self) -> type[AbstractBinaryTransform]: + def get_transform_class(self) -> type[AbstractTransform]: """ Get the class extending AbstractBinaryTransform which implements a specific transformation. The class will generally be instantiated with a dictionary of configuration produced by diff --git a/data-processing-lib/python/test/data_processing_tests/transform/test_folders_noop.py b/data-processing-lib/python/test/data_processing_tests/transform/test_folders_noop.py new file mode 100644 index 0000000000..e0fdd86c8f --- /dev/null +++ b/data-processing-lib/python/test/data_processing_tests/transform/test_folders_noop.py @@ -0,0 +1,33 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import os + +from data_processing.test_support.launch.transform_test import ( + AbstractTransformLauncherTest, +) +from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.test_support.transform import NOOPFolderPythonTransformConfiguration + + +class TestRayNOOPTransform(AbstractTransformLauncherTest): + """ + Extends the super-class to define the test data for the tests defined there. + The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. 
+ """ + + def get_test_transform_fixtures(self) -> list[tuple]: + basedir = "../../../test-data/data_processing/python/noop/" + basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), basedir)) + launcher = PythonTransformLauncher(NOOPFolderPythonTransformConfiguration()) + fixtures = [(launcher, {"noop_sleep_sec": 0}, basedir + "/input", basedir + "/expected")] + return fixtures diff --git a/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_noop_launch.py b/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_noop_launch.py index d4cc874f01..e706a4dfad 100644 --- a/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_noop_launch.py +++ b/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_noop_launch.py @@ -12,7 +12,6 @@ import os -import pyarrow as pa from data_processing.test_support.launch.transform_test import ( AbstractTransformLauncherTest, ) @@ -20,11 +19,6 @@ from data_processing_ray.test_support.transform import NOOPRayTransformConfiguration -table = pa.Table.from_pydict({"name": pa.array(["Tom"]), "age": pa.array([23])}) -expected_table = table # We're a noop after all. -expected_metadata_list = [{"nfiles": 1, "nrows": 1}, {}] # transform() result # flush() result - - class TestRayNOOPTransform(AbstractTransformLauncherTest): """ Extends the super-class to define the test data for the tests defined there. 
diff --git a/transforms/universal/ededup/ray/src/ededup_transform_ray.py b/transforms/universal/ededup/ray/src/ededup_transform_ray.py index c0823a22ec..d90dfa7808 100644 --- a/transforms/universal/ededup/ray/src/ededup_transform_ray.py +++ b/transforms/universal/ededup/ray/src/ededup_transform_ray.py @@ -149,13 +149,12 @@ def _load_snapshots(self, data_access_factory: DataAccessFactoryBase, statistics statistics.add_stats.remote({"data access retries": retries}) self.logger.info(f"Found the following snapshot files {files.keys()}") # process snapshot files - for file in files.keys(): - # load the file + for file in files.values(): + # convert the file try: - b_hashes, _ = data_access.get_file(file) - snaps = pickle.loads(b_hashes) + snaps = pickle.loads(file) except Exception as e: - self.logger.warning(f"Failed to load hashes from file {file} with exception {e}") + self.logger.warning(f"Failed to load hashes with exception {e}") raise UnrecoverableException("failed to load hashes") request = [[] for _ in range(len(self.filters))] for h in snaps: From a0a1400069fdf02f7fbec7b029a888fcb8ec4211 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Sun, 13 Oct 2024 08:53:49 +0100 Subject: [PATCH 6/9] added noop Ray testing --- .../runtime/ray/transform_orchestrator.py | 6 +- .../test_support/transform/__init__.py | 1 + .../transform/noop_folder_transform.py | 57 +++++++++++++++++++ .../test_support/transform/noop_transform.py | 4 +- .../launch/ray/ray_test_noop_folder_launch.py | 33 +++++++++++ 5 files changed, 95 insertions(+), 6 deletions(-) create mode 100644 data-processing-lib/ray/src/data_processing_ray/test_support/transform/noop_folder_transform.py create mode 100644 data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_noop_folder_launch.py diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py index 
b29682997a..da39cbcf7a 100644 --- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py +++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py @@ -68,6 +68,9 @@ def orchestrate( if len(files) == 0: logger.error("No input files to process - exiting") return 0 + # log retries + if retries > 0: + statistics.add_stats.remote({"data access retries": retries}) logger.info(f"Number of files is {len(files)}, source profile {profile}") # Print interval print_interval = int(len(files) / 100) @@ -79,9 +82,6 @@ def orchestrate( logger.info( f"Number of workers - {preprocessing_params.n_workers} " f"with {preprocessing_params.worker_options} each" ) - # log retries - if retries > 0: - statistics.add_stats.remote({"data access retries": retries}) # create executors processor_params = { "data_access_factory": data_access_factory, diff --git a/data-processing-lib/ray/src/data_processing_ray/test_support/transform/__init__.py b/data-processing-lib/ray/src/data_processing_ray/test_support/transform/__init__.py index a6cd700f75..dd095c961f 100644 --- a/data-processing-lib/ray/src/data_processing_ray/test_support/transform/__init__.py +++ b/data-processing-lib/ray/src/data_processing_ray/test_support/transform/__init__.py @@ -1 +1,2 @@ from data_processing_ray.test_support.transform.noop_transform import NOOPRayTransformConfiguration +from data_processing_ray.test_support.transform.noop_folder_transform import NOOPFolderRayTransformConfiguration diff --git a/data-processing-lib/ray/src/data_processing_ray/test_support/transform/noop_folder_transform.py b/data-processing-lib/ray/src/data_processing_ray/test_support/transform/noop_folder_transform.py new file mode 100644 index 0000000000..9919600c43 --- /dev/null +++ b/data-processing-lib/ray/src/data_processing_ray/test_support/transform/noop_folder_transform.py @@ -0,0 +1,57 @@ +# (C) Copyright IBM Corp. 2024. 
+# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + + +from data_processing.test_support.transform import NOOPTransformConfiguration +from data_processing.test_support.transform import NOOPFolderTransform +from data_processing.utils import get_logger +from data_processing_ray.runtime.ray import ( + RayTransformLauncher, + RayTransformRuntimeConfiguration, + DefaultRayTransformRuntime +) +from data_processing.data_access import DataAccess + + +logger = get_logger(__name__) + + +class NOOPFolderPythonRuntime(DefaultRayTransformRuntime): + def get_folders(self, data_access: DataAccess) -> list[str]: + """ + Get folders to process + :param data_access: data access + :return: list of folders to process + """ + return [data_access.get_input_folder()] + + +class NOOPFolderRayTransformConfiguration(RayTransformRuntimeConfiguration): + """ + Implements the RayTransformConfiguration for NOOP as required by the RayTransformLauncher. + NOOP does not use a RayRuntime class so the superclass only needs the base + python-only configuration. 
+ """ + + def __init__(self): + """ + Initialization + """ + super().__init__(transform_config=NOOPTransformConfiguration(clazz=NOOPFolderTransform), + runtime_class=NOOPFolderPythonRuntime) + + +if __name__ == "__main__": + # launcher = NOOPRayLauncher() + launcher = RayTransformLauncher(NOOPFolderRayTransformConfiguration()) + logger.info("Launching noop transform") + launcher.launch() diff --git a/data-processing-lib/ray/src/data_processing_ray/test_support/transform/noop_transform.py b/data-processing-lib/ray/src/data_processing_ray/test_support/transform/noop_transform.py index 67cf202531..a2082c48c5 100644 --- a/data-processing-lib/ray/src/data_processing_ray/test_support/transform/noop_transform.py +++ b/data-processing-lib/ray/src/data_processing_ray/test_support/transform/noop_transform.py @@ -11,9 +11,7 @@ ################################################################################ -from data_processing.test_support.transform.noop_transform import ( - NOOPTransformConfiguration, -) +from data_processing.test_support.transform import NOOPTransformConfiguration from data_processing.utils import get_logger from data_processing_ray.runtime.ray import ( RayTransformLauncher, diff --git a/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_noop_folder_launch.py b/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_noop_folder_launch.py new file mode 100644 index 0000000000..cd61c67451 --- /dev/null +++ b/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_noop_folder_launch.py @@ -0,0 +1,33 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import os + +from data_processing.test_support.launch.transform_test import ( + AbstractTransformLauncherTest, +) +from data_processing_ray.runtime.ray import RayTransformLauncher +from data_processing_ray.test_support.transform import NOOPFolderRayTransformConfiguration + + +class TestRayNOOPTransform(AbstractTransformLauncherTest): + """ + Extends the super-class to define the test data for the tests defined there. + The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. 
+ """ + + def get_test_transform_fixtures(self) -> list[tuple]: + basedir = "../../../../test-data/data_processing/ray/noop/" + basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), basedir)) + launcher = RayTransformLauncher(NOOPFolderRayTransformConfiguration()) + fixtures = [(launcher, {"noop_sleep_sec": 0, "run_locally": True}, basedir + "/input", basedir + "/expected")] + return fixtures From c2dd53cc4c901552784979e561c1a72a272343c5 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Sun, 13 Oct 2024 09:07:48 +0100 Subject: [PATCH 7/9] added noop Spark testing --- .../transform/noop_folder_transform.py | 7 ++- .../test_support/transform/__init__.py | 1 + .../transform/noop_folder_transform.py | 53 +++++++++++++++++++ .../launch/spark/test_noop_folder_launch.py | 34 ++++++++++++ 4 files changed, 91 insertions(+), 4 deletions(-) create mode 100644 data-processing-lib/spark/src/data_processing_spark/test_support/transform/noop_folder_transform.py create mode 100644 data-processing-lib/spark/test/data_processing_spark_tests/launch/spark/test_noop_folder_launch.py diff --git a/data-processing-lib/ray/src/data_processing_ray/test_support/transform/noop_folder_transform.py b/data-processing-lib/ray/src/data_processing_ray/test_support/transform/noop_folder_transform.py index 9919600c43..1d084b58ab 100644 --- a/data-processing-lib/ray/src/data_processing_ray/test_support/transform/noop_folder_transform.py +++ b/data-processing-lib/ray/src/data_processing_ray/test_support/transform/noop_folder_transform.py @@ -11,8 +11,7 @@ ################################################################################ -from data_processing.test_support.transform import NOOPTransformConfiguration -from data_processing.test_support.transform import NOOPFolderTransform +from data_processing.test_support.transform import NOOPFolderTransform, NOOPTransformConfiguration from data_processing.utils import get_logger from data_processing_ray.runtime.ray import ( RayTransformLauncher, 
@@ -25,7 +24,7 @@ logger = get_logger(__name__) -class NOOPFolderPythonRuntime(DefaultRayTransformRuntime): +class NOOPFolderRayRuntime(DefaultRayTransformRuntime): def get_folders(self, data_access: DataAccess) -> list[str]: """ Get folders to process @@ -47,7 +46,7 @@ def __init__(self): Initialization """ super().__init__(transform_config=NOOPTransformConfiguration(clazz=NOOPFolderTransform), - runtime_class=NOOPFolderPythonRuntime) + runtime_class=NOOPFolderRayRuntime) if __name__ == "__main__": diff --git a/data-processing-lib/spark/src/data_processing_spark/test_support/transform/__init__.py b/data-processing-lib/spark/src/data_processing_spark/test_support/transform/__init__.py index 83516f9aee..041cb43d64 100644 --- a/data-processing-lib/spark/src/data_processing_spark/test_support/transform/__init__.py +++ b/data-processing-lib/spark/src/data_processing_spark/test_support/transform/__init__.py @@ -11,3 +11,4 @@ ################################################################################ from data_processing_spark.test_support.transform.noop_transform import NOOPSparkTransformConfiguration +from data_processing_spark.test_support.transform.noop_folder_transform import NOOPFolderSparkTransformConfiguration diff --git a/data-processing-lib/spark/src/data_processing_spark/test_support/transform/noop_folder_transform.py b/data-processing-lib/spark/src/data_processing_spark/test_support/transform/noop_folder_transform.py new file mode 100644 index 0000000000..9972e0f791 --- /dev/null +++ b/data-processing-lib/spark/src/data_processing_spark/test_support/transform/noop_folder_transform.py @@ -0,0 +1,53 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from data_processing.test_support.transform import NOOPFolderTransform, NOOPTransformConfiguration
+from data_processing.utils import get_logger
+from data_processing_spark.runtime.spark import SparkTransformLauncher
+from data_processing_spark.runtime.spark import SparkTransformRuntimeConfiguration, DefaultSparkTransformRuntime
+from data_processing.data_access import DataAccess
+
+
+logger = get_logger(__name__)
+
+
+class NOOPFolderSparkRuntime(DefaultSparkTransformRuntime):
+    def get_folders(self, data_access: DataAccess) -> list[str]:
+        """
+        Get folders to process
+        :param data_access: data access
+        :return: list of folders to process
+        """
+        return [data_access.get_input_folder()]
+
+
+class NOOPFolderSparkTransformConfiguration(SparkTransformRuntimeConfiguration):
+    """
+    Implements the SparkTransformRuntimeConfiguration for the folder-based NOOP transform,
+    as required by the SparkTransformLauncher. It uses NOOPFolderSparkRuntime to
+    supply the list of folders to process.
+ """ + + def __init__(self): + """ + Initialization + """ + super().__init__(transform_config=NOOPTransformConfiguration(clazz=NOOPFolderTransform), + runtime_class=NOOPFolderSparkRuntime) + + +if __name__ == "__main__": + # create launcher + launcher = SparkTransformLauncher(runtime_config=NOOPFolderSparkTransformConfiguration()) + logger.info("Launching noop transform") + # Launch the ray actor(s) to process the input + launcher.launch() diff --git a/data-processing-lib/spark/test/data_processing_spark_tests/launch/spark/test_noop_folder_launch.py b/data-processing-lib/spark/test/data_processing_spark_tests/launch/spark/test_noop_folder_launch.py new file mode 100644 index 0000000000..c8e3ce40b1 --- /dev/null +++ b/data-processing-lib/spark/test/data_processing_spark_tests/launch/spark/test_noop_folder_launch.py @@ -0,0 +1,34 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import os + +from data_processing.test_support.launch.transform_test import ( + AbstractTransformLauncherTest, +) +from data_processing_spark.runtime.spark import SparkTransformLauncher +from data_processing_spark.test_support.transform import NOOPFolderSparkTransformConfiguration + + +class TestSparkNOOPTransform(AbstractTransformLauncherTest): + """ + Extends the super-class to define the test data for the tests defined there. 
+ The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. + """ + + def get_test_transform_fixtures(self) -> list[tuple]: + basedir = "../../../../test-data" + basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), basedir)) + fixtures = [] + launcher = SparkTransformLauncher(NOOPFolderSparkTransformConfiguration()) + fixtures.append((launcher, {"noop_sleep_sec": 1}, basedir + "/input", basedir + "/expected")) + return fixtures From 59d57df4f1873767c6d0b79ba7e69010004a5dc0 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Sun, 13 Oct 2024 10:03:21 +0100 Subject: [PATCH 8/9] more data access simplifications --- .../src/data_processing/data_access/data_access.py | 5 ++++- .../data_processing/data_access/data_access_local.py | 11 ----------- .../src/data_processing/data_access/data_access_s3.py | 11 ----------- 3 files changed, 4 insertions(+), 23 deletions(-) diff --git a/data-processing-lib/python/src/data_processing/data_access/data_access.py b/data-processing-lib/python/src/data_processing/data_access/data_access.py index bba5afd2bd..51d7b54b81 100644 --- a/data-processing-lib/python/src/data_processing/data_access/data_access.py +++ b/data-processing-lib/python/src/data_processing/data_access/data_access.py @@ -358,7 +358,10 @@ def get_output_location(self, path: str) -> str: :param path: input file location :return: output file location """ - raise NotImplementedError("Subclasses should implement this!") + if self.get_output_folder() is None: + self.logger.error("Get out put location. 
S3 configuration is not provided, returning None") + return None + return path.replace(self.get_input_folder(), self.get_output_folder()) def save_table(self, path: str, table: pa.Table) -> tuple[int, dict[str, Any], int]: """ diff --git a/data-processing-lib/python/src/data_processing/data_access/data_access_local.py b/data-processing-lib/python/src/data_processing/data_access/data_access_local.py index 224e30ce86..d37e571a3e 100644 --- a/data-processing-lib/python/src/data_processing/data_access/data_access_local.py +++ b/data-processing-lib/python/src/data_processing/data_access/data_access_local.py @@ -130,17 +130,6 @@ def get_table(self, path: str) -> tuple[pa.table, int]: logger.error(f"Error reading table from {path}: {e}") return None, 0 - def get_output_location(self, path: str) -> str: - """ - Get output location based on input - :param path: input file location - :return: output file location - """ - if self.output_folder is None: - logger.error("Get output location. local configuration is not defined, returning None") - return None - return path.replace(self.input_folder, self.output_folder) - def save_table(self, path: str, table: pa.Table) -> tuple[int, dict[str, Any], int]: """ Saves a pyarrow table to a file and returns information about the operation. 
diff --git a/data-processing-lib/python/src/data_processing/data_access/data_access_s3.py b/data-processing-lib/python/src/data_processing/data_access/data_access_s3.py index 43e13bcb1c..8ddc772c52 100644 --- a/data-processing-lib/python/src/data_processing/data_access/data_access_s3.py +++ b/data-processing-lib/python/src/data_processing/data_access/data_access_s3.py @@ -126,17 +126,6 @@ def get_table(self, path: str) -> tuple[pyarrow.table, int]: self.logger.error(f"Exception reading table {path} from S3 - {e}") return None, 0 - def get_output_location(self, path: str) -> str: - """ - Get output location based on input - :param path: input file location - :return: output file location - """ - if self.output_folder is None: - self.logger.error("Get out put location. S3 configuration is not provided, returning None") - return None - return path.replace(self.input_folder, self.output_folder) - def save_table(self, path: str, table: pyarrow.Table) -> tuple[int, dict[str, Any], int]: """ Save table to a given location From 7b7736c5301f84a5635a156dfa2416f066c409f7 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Mon, 14 Oct 2024 20:56:16 +0100 Subject: [PATCH 9/9] documentation update --- data-processing-lib/doc/transforms.md | 34 +++++++++++++-------------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/data-processing-lib/doc/transforms.md b/data-processing-lib/doc/transforms.md index c8f1b74e84..fc3509ba33 100644 --- a/data-processing-lib/doc/transforms.md +++ b/data-processing-lib/doc/transforms.md @@ -1,26 +1,24 @@ # Transforms -[All transforms](../python/src/data_processing/transform/abstract_transform.py) -are generalized to operate on generically typed `DATA.` -[Ray](ray-runtime.md) and [Python](python-runtime.md) runtimes -currently support `DATA` as both byte arrays -and [pyarrow Tables](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html). -The [Spark runtime](spark-runtime.md) currently supports the native Spark `DataFrame`. 
+Transform is a basic integration unit of DPK that can be executed in any of the runtimes
+supported by DPK ([Python](python-runtime.md), [Ray](ray-runtime.md) and [Spark](spark-runtime.md)). All transforms
+are derived from the
+[AbstractTransform class](../python/src/data_processing/transform/abstract_transform.py). This class
+provides no functionality and is used only as a marker that a given class implements a transform.
+There are currently two types of transforms defined in DPK:
 
-All transforms convert their input `DATA` to a list of transformed `DATA` objects
-and optional metadata about the transformation of the `DATA` instance.
-The Transform itself need only be concerned with the conversion
-of one `DATA` instance at a time.
-Transforms, where possible, should be implemented without regard to the
-runtime it will run in or where its configuration originates.
+* [AbstractBinaryTransform](../python/src/data_processing/transform/binary_transform.py) which is a base
+class for all data transforms. Data transforms convert a file of data producing zero or more data files
+and metadata. A specific class of the binary transform is
+[AbstractTableTransform](../python/src/data_processing/transform/table_transform.py) that consumes and produces
+data files containing [pyarrow tables](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html)
+* [AbstractFolderTransform](../python/src/data_processing/transform/folder_transform.py) which is a base
+class consuming a folder (that can contain an arbitrary set of files that need to be processed together)
+and produces zero or more data files and metadata.
 
-In the discussion that follows, we'll focus on the transformation of pyarrow Tables
-using the `AbstractTableTransform` class (see below), supported by both
-the Ray and Python runtimes.
-Mapping from this tutorial to a Spark runtime can be done by using
-`data-prep-kit-spark`'s [AbstractSparkTransform](../spark/src/data_processing_spark/runtime/spark/spark_transform.py)
-which operates on a Spark DataFrame instead of a pyarrow Table.
+In the discussion that follows, we'll focus on the transformation of pyarrow Tables
+using the `AbstractTableTransform` class (see below), supported by the Ray, Spark and Python runtimes.
 
 #### AbstractTableTransform class
 [AbstractTableTransform](../python/src/data_processing/transform/table_transform.py)