diff --git a/data-processing-lib/python/requirements.txt b/data-processing-lib/python/requirements.txt index 318d715d5..39e26a68b 100644 --- a/data-processing-lib/python/requirements.txt +++ b/data-processing-lib/python/requirements.txt @@ -5,3 +5,4 @@ mmh3 psutil polars>=1.9.0 + huggingface-hub>=0.25.2 diff --git a/data-processing-lib/python/src/data_processing/data_access/__init__.py b/data-processing-lib/python/src/data_processing/data_access/__init__.py index 1f1d77928..dee751f0c 100644 --- a/data-processing-lib/python/src/data_processing/data_access/__init__.py +++ b/data-processing-lib/python/src/data_processing/data_access/__init__.py @@ -2,6 +2,7 @@ from data_processing.data_access.data_access import DataAccess from data_processing.data_access.data_access_local import DataAccessLocal from data_processing.data_access.data_access_s3 import DataAccessS3 +from data_processing.data_access.data_access_hf import DataAccessHF from data_processing.data_access.data_access_factory_base import DataAccessFactoryBase from data_processing.data_access.data_access_factory import DataAccessFactory from data_processing.data_access.snapshot_utils import SnapshotUtils diff --git a/data-processing-lib/python/src/data_processing/data_access/data_access_factory.py b/data-processing-lib/python/src/data_processing/data_access/data_access_factory.py index 2172e3ed0..16ed5231b 100644 --- a/data-processing-lib/python/src/data_processing/data_access/data_access_factory.py +++ b/data-processing-lib/python/src/data_processing/data_access/data_access_factory.py @@ -19,6 +19,7 @@ DataAccessFactoryBase, DataAccessLocal, DataAccessS3, + DataAccessHF, ) from data_processing.utils import ParamsUtils, str2bool @@ -46,6 +47,7 @@ def __init__(self, cli_arg_prefix: str = "data_", enable_data_navigation: bool = super().__init__(cli_arg_prefix=cli_arg_prefix) self.s3_config = None self.local_config = None + self.hf_config = None self.enable_data_navigation = enable_data_navigation def add_input_params(self, parser: argparse.ArgumentParser) -> None: @@ -77,6 +79,7 @@ def add_input_params(self, parser: argparse.ArgumentParser) -> None: self.__add_data_navigation_params(parser) def __add_data_navigation_params(self, parser): + # s3 config help_example_dict = { "input_folder": [ "s3-path/your-input-bucket", @@ -93,6 +96,7 @@ def __add_data_navigation_params(self, parser): default=None, help="AST string containing input/output paths.\n" + ParamsUtils.get_ast_help_text(help_example_dict), ) + # local config help_example_dict = { "input_folder": ["./input", "Path to input folder of files to be processed"], "output_folder": ["/tmp/output", "Path to output folder of processed files"], @@ -104,6 +108,19 @@ def __add_data_navigation_params(self, parser): help="ast string containing input/output folders using local fs.\n" + ParamsUtils.get_ast_help_text(help_example_dict), ) + # hf config + help_example_dict = { + "hf_token": ["./input", "HF token required for write operation"], + "input_folder": ["./input", "Path to input folder of files to be processed"], + "output_folder": ["/tmp/output", "Path to output folder of processed files"], + } + parser.add_argument( + f"--{self.cli_arg_prefix}hf_config", + type=ast.literal_eval, + default=None, + help="ast string containing hf_token/input/output folders using hf fs.\n" + + ParamsUtils.get_ast_help_text(help_example_dict), + ) parser.add_argument( f"--{self.cli_arg_prefix}max_files", type=int, default=-1, help="Max amount of files to process" ) @@ -154,6 +171,7 @@ def 
apply_input_params(self, args: Union[dict, argparse.Namespace]) -> bool: s3_cred = arg_dict.get(f"{self.cli_arg_prefix}s3_cred", None) s3_config = arg_dict.get(f"{self.cli_arg_prefix}s3_config", None) local_config = arg_dict.get(f"{self.cli_arg_prefix}local_config", None) + hf_config = arg_dict.get(f"{self.cli_arg_prefix}hf_config", None) checkpointing = arg_dict.get(f"{self.cli_arg_prefix}checkpointing", False) max_files = arg_dict.get(f"{self.cli_arg_prefix}max_files", -1) data_sets = arg_dict.get(f"{self.cli_arg_prefix}data_sets", None) @@ -163,18 +181,20 @@ def apply_input_params(self, args: Union[dict, argparse.Namespace]) -> bool: # check which configuration (S3 or Local) is specified s3_config_specified = 1 if s3_config is not None else 0 local_config_specified = 1 if local_config is not None else 0 + hf_config_specified = 1 if hf_config is not None else 0 # check that only one (S3 or Local) configuration is specified - if s3_config_specified + local_config_specified > 1: + if s3_config_specified + local_config_specified + hf_config_specified > 1: self.logger.error( f"data factory {self.cli_arg_prefix} " f"{'S3, ' if s3_config_specified == 1 else ''}" f"{'Local ' if local_config_specified == 1 else ''}" + f"{'HF ' if hf_config_specified == 1 else ''}" "configurations specified, but only one configuration expected" ) return False - # further validate the specified configuration (S3 or Local) + # further validate the specified configuration (S3, HF or Local) if s3_config_specified == 1: if not self._validate_s3_config(s3_config=s3_config): return False @@ -188,6 +208,20 @@ def apply_input_params(self, args: Union[dict, argparse.Namespace]) -> bool: f'input path - {self.s3_config["input_folder"]}, ' f'output path - {self.s3_config["output_folder"]}' ) + elif hf_config_specified == 1: + if not self._validate_hf_config(hf_config=hf_config): + return False + self.hf_config = hf_config + self.logger.info( + f"data factory {self.cli_arg_prefix} is using HF data access: " + f"input_folder - {self.hf_config['input_folder']} " + f"output_folder - {self.hf_config['output_folder']}" + ) + elif s3_cred is not None: + if not self._validate_s3_cred(s3_credentials=s3_cred): + return False + self.s3_cred = s3_cred + self.logger.info(f"data factory {self.cli_arg_prefix} is using s3 configuration without input/output path") elif local_config_specified == 1: if not self._validate_local_config(local_config=local_config): return False @@ -197,11 +231,6 @@ def apply_input_params(self, args: Union[dict, argparse.Namespace]) -> bool: f"input_folder - {self.local_config['input_folder']} " f"output_folder - {self.local_config['output_folder']}" ) - elif s3_cred is not None: - if not self._validate_s3_cred(s3_credentials=s3_cred): - return False - self.s3_cred = s3_cred - self.logger.info(f"data factory {self.cli_arg_prefix} is using s3 configuration without input/output path") else: self.logger.info( f"data factory {self.cli_arg_prefix} " f"is using local configuration without input/output path" ) @@ -240,11 +269,10 @@ def create_data_access(self) -> DataAccess: """ Create data access based on the parameters :return: corresponding data access class """ - if self.s3_config is not None or self.s3_cred is not None: - # If S3 config or S3 credential are specified, its S3 - return DataAccessS3( - s3_credentials=self.s3_cred, - s3_config=self.s3_config, + if self.hf_config is not None: + # if hf_config is specified, it's HF + return DataAccessHF( + hf_config=self.hf_config, d_sets=self.dsets, checkpoint=self.checkpointing,
m_files=self.max_files, @@ -252,10 +280,11 @@ def create_data_access(self) -> DataAccess: files_to_use=self.files_to_use, files_to_checkpoint=self.files_to_checkpoint, ) - else: - # anything else is local data - return DataAccessLocal( - local_config=self.local_config, + if self.s3_config is not None or self.s3_cred is not None: + # If S3 config or S3 credentials are specified, it's S3 + return DataAccessS3( + s3_credentials=self.s3_cred, + s3_config=self.s3_config, d_sets=self.dsets, checkpoint=self.checkpointing, m_files=self.max_files, @@ -263,3 +292,13 @@ def create_data_access(self) -> DataAccess: files_to_use=self.files_to_use, files_to_checkpoint=self.files_to_checkpoint, ) + # anything else is local data + return DataAccessLocal( + local_config=self.local_config, + d_sets=self.dsets, + checkpoint=self.checkpointing, + m_files=self.max_files, + n_samples=self.n_samples, + files_to_use=self.files_to_use, + files_to_checkpoint=self.files_to_checkpoint, + ) diff --git a/data-processing-lib/python/src/data_processing/data_access/data_access_factory_base.py b/data-processing-lib/python/src/data_processing/data_access/data_access_factory_base.py index cef7c9657..8e011a494 100644 --- a/data-processing-lib/python/src/data_processing/data_access/data_access_factory_base.py +++ b/data-processing-lib/python/src/data_processing/data_access/data_access_factory_base.py @@ -130,8 +130,8 @@ def _validate_local_config(self, local_config: dict[str, str]) -> bool: def _validate_s3_config(self, s3_config: dict[str, str]) -> bool: """ Validate that - :param s3_config: dictionary of local config - :return: True if s3l config is valid, False otherwise + :param s3_config: dictionary of s3 config + :return: True if s3 config is valid, False otherwise """ valid_config = True if s3_config.get("input_folder", "") == "": @@ -141,3 +141,35 @@ def _validate_s3_config(self, s3_config: dict[str, str]) -> bool: valid_config = False self.logger.error(f"data access factory {self.cli_arg_prefix}: Could not find output folder in s3 config") return valid_config + + def _validate_hf_config(self, hf_config: dict[str, str]) -> bool: + """ + Validate that + :param hf_config: dictionary of hf config + :return: True if hf config is valid, False otherwise + """ + valid_config = True + if hf_config.get("hf_token", "") == "": + self.logger.warning(f"data access factory {self.cli_arg_prefix}: " + f"HF token is not defined, write operation may fail") + input_folder = hf_config.get("input_folder", "") + if input_folder == "": + valid_config = False + self.logger.error(f"data access factory {self.cli_arg_prefix}: Could not find input folder in HF config") + else: + if not input_folder.startswith("datasets/"): + valid_config = False + self.logger.error(f"data access factory {self.cli_arg_prefix}: " + f"Input folder in HF config has to start with datasets/") + + output_folder = hf_config.get("output_folder", "") + if output_folder == "": + valid_config = False + self.logger.error(f"data access factory {self.cli_arg_prefix}: Could not find output folder in HF config") + else: + if not output_folder.startswith("datasets/"): + valid_config = False + self.logger.error(f"data access factory {self.cli_arg_prefix}: " + f"Output folder in HF config has to start with datasets/") + + return valid_config diff --git a/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py b/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py new file mode 100644 index 000000000..395f08e9d --- /dev/null +++
b/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py @@ -0,0 +1,294 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import gzip +import json +import os +from pathlib import Path +from typing import Any + +import pyarrow as pa +from huggingface_hub import HfFileSystem, RepoCard +from huggingface_hub.errors import EntryNotFoundError +from data_processing.data_access import DataAccess +from data_processing.utils import TransformUtils, get_logger + + +logger = get_logger(__name__) + + +class DataAccessHF(DataAccess): + """ + Implementation of the Base Data access class for Hugging Face (HF) data access. + """ + + def __init__( + self, + hf_config: dict[str, str] = None, + d_sets: list[str] = None, + checkpoint: bool = False, + m_files: int = -1, + n_samples: int = -1, + files_to_use: list[str] = [".parquet"], + files_to_checkpoint: list[str] = [".parquet"], + ): + """ + Create data access class for HF folder-based configuration + :param hf_config: dictionary of path info + :param d_sets: list of the data sets to use + :param checkpoint: flag to return only files that do not exist in the output directory + :param m_files: max number of files to return + :param n_samples: number of files to randomly sample + :param files_to_use: file extensions of files to include + :param files_to_checkpoint: file extensions of files to use for checkpointing + """ + super().__init__(d_sets=d_sets, checkpoint=checkpoint, m_files=m_files, n_samples=n_samples, + files_to_use=files_to_use, files_to_checkpoint=files_to_checkpoint) + if hf_config is None: + self.input_folder = None + self.output_folder = None + else: + self.input_folder = hf_config["input_folder"] + if self.input_folder[-1] == "/": + self.input_folder = self.input_folder[:-1] + self.output_folder = hf_config["output_folder"] + if self.output_folder[-1] == "/": + self.output_folder = self.output_folder[:-1] + self.hf_config = hf_config + self.fs = HfFileSystem(token=hf_config["hf_token"]) + + logger.debug(f"hf input folder: {self.input_folder}") + logger.debug(f"hf output folder: {self.output_folder}") + logger.debug(f"hf data sets: {self.d_sets}") + logger.debug(f"hf checkpoint: {self.checkpoint}") + logger.debug(f"hf m_files: {self.m_files}") + logger.debug(f"hf n_samples: {self.n_samples}") + logger.debug(f"hf files_to_use: {self.files_to_use}") + logger.debug(f"hf files_to_checkpoint: {self.files_to_checkpoint}") + + def get_output_folder(self) -> str: + """ + Get output folder as a string + :return: output_folder + """ + return self.output_folder + + def get_input_folder(self) -> str: + """ + Get input folder as a string + :return: input_folder + """ + return self.input_folder + + def _get_file_size(self, path: str) -> int: + """ + Get file size in bytes + :param path: file path + :return: file size in bytes + """ + return self.fs.info(path=path)['size'] + + def _list_files_folder(self, path: str) -> tuple[list[dict[str,
Any]], int]: + """ + Get files for a given folder and all subfolders + :param path: path + :return: List of files + """ + files = sorted(self.fs.glob(path=f"{path}/**/*.*")) + res = [{"name": file, "size": self._get_file_size(file)} for file in files] + return res, 0 + + def _get_folders_to_use(self) -> tuple[list[str], int]: + """ + Convert data sets to a list of folders to use + :return: list of folders and retries + """ + folders_to_use = [] + files = self.fs.ls(path=self.input_folder) + dirs = [f['name'] for f in files if f['type'] == 'directory'] + + for file in dirs: + for s_name in self.d_sets: + if file.endswith(s_name): + folders_to_use.append(file) + break + return folders_to_use, 0 + + def get_table(self, path: str) -> tuple[pa.Table, int]: + """ + Attempts to read a PyArrow table from the given path. + + Args: + path (str): Path to the file containing the table. + + Returns: + pyarrow.Table: PyArrow table if read successfully, None otherwise. + """ + + try: + data, retries = self.get_file(path=path) + return TransformUtils.convert_binary_to_arrow(data=data), retries + except Exception as e: + logger.error(f"Error reading table from {path}: {e}") + return None, 0 + + def save_table(self, path: str, table: pa.Table) -> tuple[int, dict[str, Any], int]: + """ + Saves a pyarrow table to a file and returns information about the operation. + + Args: + table (pyarrow.Table): The pyarrow table to save. + path (str): The path to the output file. + + Returns: + tuple: A tuple containing: + - size_in_memory (int): The size of the table in memory (bytes). + - file_info (dict or None): A dictionary containing: + - name (str): The name of the file. + - size (int): The size of the file (bytes). + If saving fails, file_info will be None. + """ + # Get table size in memory + try: + # Write the table to parquet format + data = TransformUtils.convert_arrow_to_binary(table=table) + finfo, retries = self.save_file(path=path, data=data) + return len(data), finfo, retries + + except Exception as e: + logger.error(f"Error saving table to {path}: {e}") + return -1, None, 0 + + def save_job_metadata(self, metadata: dict[str, Any]) -> tuple[dict[str, Any], int]: + """ + Save metadata + :param metadata: a dictionary, containing the following keys: + "pipeline", + "job details", + "code", + "job_input_params", + "execution_stats", + "job_output_stats" + two additional elements: + "source" + "target" + are filled by the implementation + :return: a dictionary with the file information + ("name" and "size"), as returned by save_file; + in the case of failure dict is None + """ + if self.output_folder is None: + logger.error("hf configuration is not defined, can't save metadata") + return None, 0 + metadata["source"] = {"name": self.input_folder, "type": "path"} + metadata["target"] = {"name": self.output_folder, "type": "path"} + return self.save_file( + path=f"{self.output_folder}/metadata.json", + data=json.dumps(metadata, indent=2).encode(), + ) + + def get_file(self, path: str) -> tuple[bytes, int]: + """ + Gets the contents of a file as a byte array. + + Args: + path (str): The path to the file. + + Returns: + bytes: The contents of the file as a byte array; an exception is raised if the read fails.
+ """ + + try: + with self.fs.open(path=path, mode="rb") as f: + return f.read(), 0 + except Exception as e: + logger.error(f"Error reading file {path}: {e}") + raise e + + def save_file(self, path: str, data: bytes) -> tuple[dict[str, Any], int]: + """ + Saves bytes to a file and returns a dictionary with file information. + + Args: + data (bytes): The bytes data to save. + path (str): The full name of the file to save. + + Returns: + dict or None: A dictionary with "name" and "size" keys if successful, + or None if saving fails. + """ + # make sure that token is defined + if self.hf_config["hf_token"] is None: + logger.warning("Writing file is only supported when HF_TOKEN is defined") + return None, 0 + try: + with self.fs.open(path=path, mode="wb") as f: + f.write(data) + file_info = {"name": path, "size": self.fs.info(path=path)['size']} + return file_info, 0 + except Exception as e: + logger.error(f"Error saving bytes to file {path}: {e}") + return None, 0 + + def get_dataset_card(self, ds_name: str) -> RepoCard: + """ + Get the Repo card for the data set + :param ds_name: data set name in the format owner/ds_name + :return: DS card object + """ + # get file location + if ds_name[-1] == "/": + path = f"datasets/{ds_name[:-1]}/README.md" + else: + path = f"datasets/{ds_name}/README.md" + # read README file + try: + with self.fs.open(path=path, mode="r", newline="", encoding="utf-8") as f: + data = f.read() + except Exception as e: + logger.warning(f"Failted to read README file {e}") + return None + # convert README to Repo card + return RepoCard(content=data) + + def update_data_set_card(self, ds_name: str, content: str) -> None: + """ + Update Repo card + :param ds_name: data set name in the format owner/ds_name + :param content: new readme content + :return: None + """ + # make sure that token is defined + if self.hf_config["hf_token"] is None: + raise Exception("Update data set card is only supported when HF_TOKEN is defined") + # get file location + if ds_name[-1] == "/": + path = f"datasets/{ds_name[:-1]}/README.md" + else: + path = f"datasets/{ds_name}/README.md" + # delete current Readme file + try: + self.fs.rm(path=path) + except EntryNotFoundError: + # If the README file does not exist, ignore + logger.warning(f"Data set {ds_name} does not have README file") + except Exception as e: + logger.warning(f"Failted to delete README file {e}") + raise e + # write new Readme file + try: + with self.fs.open(path=path, mode="w", newline="", encoding="utf-8") as f: + f.write(content) + except Exception as e: + logger.warning(f"Failted to save README file {e}") + raise e + diff --git a/data-processing-lib/python/test/data_processing_tests/data_access/data_access_factory_hf_test.py b/data-processing-lib/python/test/data_processing_tests/data_access/data_access_factory_hf_test.py new file mode 100644 index 000000000..66713a704 --- /dev/null +++ b/data-processing-lib/python/test/data_processing_tests/data_access/data_access_factory_hf_test.py @@ -0,0 +1,60 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import sys +from argparse import ArgumentParser + +from data_processing.data_access import DataAccessFactory, DataAccessHF +from data_processing.utils import ParamsUtils + + +def test_creating_hf_data_access(): + """ + Testing creation of HF data access + :return: None + """ + input_folder = "datasets/blublinsky/test/data" + output_folder = "datasets/blublinsky/test/temp" + hf_token = "token" + hf_conf = { + "hf_token": hf_token, + "input_folder": input_folder, + "output_folder": output_folder, + } + params = {} + params["data_hf_config"] = hf_conf + params["data_num_samples"] = 3 + params["data_data_sets"] = ["ds1"] + params["data_files_to_use"] = [".nothere"] + + # Set the simulated command line args + sys.argv = ParamsUtils.dict_to_req(params) + daf = DataAccessFactory() + parser = ArgumentParser() + daf.add_input_params(parser) + args = parser.parse_args() + daf.apply_input_params(args) + + # create data_access + data_access = daf.create_data_access() + + # validate created data access + assert isinstance(data_access, DataAccessHF) + assert data_access.input_folder == input_folder + assert data_access.output_folder == output_folder + assert data_access.fs.token == hf_token + + assert data_access.n_samples == 3 + assert data_access.d_sets == ["ds1"] + assert data_access.files_to_use == [".nothere"] + + diff --git a/data-processing-lib/python/test/data_processing_tests/data_access/data_access_hf_test.py b/data-processing-lib/python/test/data_processing_tests/data_access/data_access_hf_test.py new file mode 100644 index 000000000..b8a3a7ea3 --- /dev/null +++ b/data-processing-lib/python/test/data_processing_tests/data_access/data_access_hf_test.py @@ -0,0 +1,96 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + + +from data_processing.data_access import DataAccessHF +from huggingface_hub import CardData + +hf_conf = { + "hf_token": None, + "input_folder": 'datasets/blublinsky/test/data', + "output_folder": 'datasets/blublinsky/test/temp/', +} + + +def test_hf_data_access(): + """ + Testing data access of HF data access + :return: None + """ + data_access = DataAccessHF(hf_config=hf_conf) + # get files to process + files, profile, retries = data_access.get_files_to_process() + assert len(files) == 50 + assert profile['max_file_size'] >= 135.730997085571 + assert profile['min_file_size'] >= 0.00743961334228515 + assert profile['total_file_size'] >= 269.3791465759277 + + # read tables + t_stats = [ + {"n_rows": 8, "n_columns": 11}, + {"n_rows": 424, "n_columns": 11}, + {"n_rows": 9336, "n_columns": 12}, + {"n_rows": 7, "n_columns": 11}, + {"n_rows": 1353, "n_columns": 11}, + ] + for i in range(5): + table, retries = data_access.get_table(path=files[i]) + assert table.num_rows == t_stats[i]["n_rows"] + assert table.num_columns == t_stats[i]["n_columns"] + if i == 0: + data, _ = data_access.get_file(path=files[i]) + # write to data set + output_file = data_access.get_output_location(files[i]) + res, _ = data_access.save_file(path=output_file, data=data) + assert res is None + + # get random set of files + random = data_access.get_random_file_set(n_samples=5, files=files) + assert len(random) == 5 + + +def test_hf_data_access_sets(): + """ + Testing data access of HF data access with data sets + :return: None + """ + data_access = DataAccessHF(hf_config=hf_conf, d_sets=["aai_Latn", "aba_Latn"]) + # get files to process + files, profile, retries = data_access.get_files_to_process() + assert len(files) == 4 + assert profile['max_file_size'] >= 0.501188278198242 + assert profile['min_file_size'] >= 0.00965785980224609 + assert profile['total_file_size'] >= 0.620627403259277 + + +def test_data_set_card(): + """ + Testing data set card access + :return: None + """ + # read the card + data_access = DataAccessHF(hf_config=hf_conf) + card = data_access.get_dataset_card(ds_name="blublinsky/test") + assert card.data.license == 'apache-2.0' + # update it + data = card.data.to_dict() + data["extension"] = "my_extension" + card.data = CardData(**data) + content = card.content + # save a new card (readme) + try: + data_access.update_data_set_card(ds_name="blublinsky/test", content=content) + # read it back + card = data_access.get_dataset_card(ds_name="blublinsky/test") + assert card.data.extension == "my_extension" + except Exception as e: + print(f"Exception updating card {e}. 
Did you specify hf_token?") diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py index ff6e53892..9b2503cbb 100644 --- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py +++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py @@ -37,10 +37,10 @@ class RayUtils: @staticmethod def get_available_resources( - available_cpus_gauge: Gauge = None, - available_gpus_gauge: Gauge = None, - available_memory_gauge: Gauge = None, - object_memory_gauge: Gauge = None, + available_cpus_gauge: Gauge = None, + available_gpus_gauge: Gauge = None, + available_memory_gauge: Gauge = None, + object_memory_gauge: Gauge = None, ) -> dict[str, Any]: """ Get currently available cluster resources @@ -98,7 +98,7 @@ def get_available_nodes(available_nodes_gauge: Gauge = None) -> int: @staticmethod def create_actors( - clazz: type, params: dict[str, Any], actor_options: dict[str, Any], n_actors: int, creation_delay: int = 0 + clazz: type, params: dict[str, Any], actor_options: dict[str, Any], n_actors: int, creation_delay: int = 0 ) -> list[ActorHandle]: """ Create a set of actors @@ -130,16 +130,16 @@ def operator() -> ActorHandle: @staticmethod def process_files( - executors: ActorPool, - files: list[str], - print_interval: int, - files_in_progress_gauge: Gauge, - files_completed_gauge: Gauge, - available_cpus_gauge: Gauge, - available_gpus_gauge: Gauge, - available_memory_gauge: Gauge, - object_memory_gauge: Gauge, - logger: logging.Logger, + executors: ActorPool, + files: list[str], + print_interval: int, + files_in_progress_gauge: Gauge, + files_completed_gauge: Gauge, + available_cpus_gauge: Gauge, + available_gpus_gauge: Gauge, + available_memory_gauge: Gauge, + object_memory_gauge: Gauge, + logger: logging.Logger, ) -> int: """ Process files @@ -251,4 +251,4 @@ def wait_for_execution_completion(logger: logging.Logger, replies: list[ray.Obje actor_failures += 1 not_ready = replies - 1 replies = not_ready - return actor_failures + return actor_failures \ No newline at end of file
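
Usage note (not part of the patch): a minimal sketch of how the new DataAccessHF class can be driven directly, mirroring the added tests. The repository paths and the token below are placeholders; replace them with a real HF dataset repo and, for write operations, a real token.

from data_processing.data_access import DataAccessHF

# hf_token may be None for read-only access; writes (save_file, update_data_set_card) need a real token
hf_conf = {
    "hf_token": None,  # placeholder
    "input_folder": "datasets/<owner>/<dataset>/data",  # placeholder; HF paths must start with datasets/
    "output_folder": "datasets/<owner>/<dataset>/temp",  # placeholder
}

data_access = DataAccessHF(hf_config=hf_conf)
# list the parquet files under the input folder and read the first one into a PyArrow table
files, profile, _ = data_access.get_files_to_process()
table, _ = data_access.get_table(path=files[0])
print(f"read {table.num_rows} rows and {table.num_columns} columns from {files[0]}")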