diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py
index f3ed8a6026..63914c13b2 100644
--- a/flytekit/core/context_manager.py
+++ b/flytekit/core/context_manager.py
@@ -202,12 +202,10 @@ def raw_output_prefix(self) -> str:
         return self._raw_output_prefix

     @property
-    def working_directory(self) -> utils.AutoDeletingTempDir:
+    def working_directory(self) -> str:
         """
         A handle to a special working directory for easily producing temporary files.

-        TODO: Usage examples
-        TODO: This does not always return a AutoDeletingTempDir
         """
         return self._working_directory
diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py
index 7b4cfcf9ad..61ba28925d 100644
--- a/flytekit/core/data_persistence.py
+++ b/flytekit/core/data_persistence.py
@@ -25,11 +25,10 @@
 import pathlib
 import tempfile
 import typing
-from typing import Union
+from typing import Union, cast
 from uuid import UUID

 import fsspec
-from fsspec.core import strip_protocol
 from fsspec.utils import get_protocol

 from flytekit import configuration
@@ -95,7 +94,7 @@ def __init__(
         self._data_config = data_config if data_config else DataConfig.auto()
         self._default_protocol = get_protocol(raw_output_prefix)
-        self._default_remote = self.get_filesystem(self._default_protocol)
+        self._default_remote = cast(fsspec.AbstractFileSystem, self.get_filesystem(self._default_protocol))
         if os.name == "nt" and raw_output_prefix.startswith("file://"):
             raise FlyteAssertion("Cannot use the file:// prefix on Windows.")
         self._raw_output_prefix = (
@@ -113,11 +112,11 @@ def data_config(self) -> DataConfig:
         return self._data_config

     def get_filesystem(
-        self, protocol: str = None, anonymous: bool = False
+        self, protocol: typing.Optional[str] = None, anonymous: bool = False
     ) -> typing.Optional[fsspec.AbstractFileSystem]:
         if not protocol:
             return self._default_remote
-        kwargs = {}
+        kwargs = {}  # type: typing.Dict[str, typing.Any]
         if protocol == "file":
             kwargs = {"auto_mkdir": True}
         elif protocol == "s3":
@@ -134,9 +133,9 @@
         return fsspec.filesystem(protocol, **kwargs)  # type: ignore

-    def get_filesystem_for_path(self, path: str = "") -> fsspec.AbstractFileSystem:
+    def get_filesystem_for_path(self, path: str = "", anonymous: bool = False) -> fsspec.AbstractFileSystem:
         protocol = get_protocol(path)
-        return self.get_filesystem(protocol)
+        return self.get_filesystem(protocol, anonymous=anonymous)

     @staticmethod
     def is_remote(path: Union[str, os.PathLike]) -> bool:
@@ -322,7 +321,7 @@ def put_data(self, local_path: Union[str, os.PathLike], remote_path: str, is_mul
         """
         try:
             with PerformanceTimer(f"Writing ({local_path} -> {remote_path})"):
-                self.put(local_path, remote_path, recursive=is_multipart)
+                self.put(cast(str, local_path), remote_path, recursive=is_multipart)
         except Exception as ex:
             raise FlyteAssertion(
                 f"Failed to put data from {local_path} to {remote_path} (recursive={is_multipart}).\n\n"
diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py
index 03cc9a66e9..37baacef70 100644
--- a/flytekit/remote/remote.py
+++ b/flytekit/remote/remote.py
@@ -6,17 +6,20 @@
 from __future__ import annotations

 import base64
-import functools
 import hashlib
+import importlib
 import os
 import pathlib
+import tempfile
 import time
 import typing
 import uuid
+from base64 import b64encode
 from collections import OrderedDict
 from dataclasses import asdict, dataclass
 from datetime import datetime, timedelta

+import requests
 from flyteidl.admin.signal_pb2 import Signal, SignalListRequest, SignalSetRequest
 from flyteidl.core import literals_pb2 as literals_pb2
@@ -31,10 +34,15 @@
 from flytekit.core.launch_plan import LaunchPlan
 from flytekit.core.python_auto_container import PythonAutoContainerTask
 from flytekit.core.reference_entity import ReferenceSpec
+from flytekit.core.tracker import get_full_module_path
 from flytekit.core.type_engine import LiteralsResolver, TypeEngine
 from flytekit.core.workflow import WorkflowBase
 from flytekit.exceptions import user as user_exceptions
-from flytekit.exceptions.user import FlyteEntityAlreadyExistsException, FlyteEntityNotExistException
+from flytekit.exceptions.user import (
+    FlyteEntityAlreadyExistsException,
+    FlyteEntityNotExistException,
+    FlyteValueException,
+)
 from flytekit.loggers import remote_logger
 from flytekit.models import common as common_models
 from flytekit.models import filters as filter_models
@@ -62,7 +70,7 @@
 from flytekit.remote.lazy_entity import LazyEntity
 from flytekit.remote.remote_callable import RemoteEntity
 from flytekit.tools.fast_registration import fast_package
-from flytekit.tools.script_mode import fast_register_single_script, hash_file
+from flytekit.tools.script_mode import compress_single_script, hash_file
 from flytekit.tools.translator import (
     FlyteControlPlaneEntity,
     FlyteLocalEntity,
@@ -728,7 +736,23 @@ def _upload_file(
             content_md5=md5_bytes,
             filename=to_upload.name,
         )
-        self._ctx.file_access.put_data(str(to_upload), upload_location.signed_url)
+
+        encoded_md5 = b64encode(md5_bytes)
+        with open(str(to_upload), "+rb") as local_file:
+            content = local_file.read()
+            content_length = len(content)
+            rsp = requests.put(
+                upload_location.signed_url,
+                data=content,
+                headers={"Content-Length": str(content_length), "Content-MD5": encoded_md5},
+            )
+
+            if rsp.status_code != requests.codes["OK"]:
+                raise FlyteValueException(
+                    rsp.status_code,
+                    f"Request to send data {upload_location.signed_url} failed.",
+                )
+
         remote_logger.debug(
             f"Uploading {to_upload} to {upload_location.signed_url} native url {upload_location.native_url}"
         )
@@ -795,16 +819,14 @@ def register_script(
         if image_config is None:
             image_config = ImageConfig.auto_default_image()

-        upload_location, md5_bytes = fast_register_single_script(
-            source_path,
-            module_name,
-            functools.partial(
-                self.client.get_upload_signed_url,
-                project=project or self.default_project,
-                domain=domain or self.default_domain,
-                filename="scriptmode.tar.gz",
-            ),
-        )
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            archive_fname = pathlib.Path(os.path.join(tmp_dir, "script_mode.tar.gz"))
+            mod = importlib.import_module(module_name)
+            compress_single_script(source_path, str(archive_fname), get_full_module_path(mod, mod.__name__))
+            md5_bytes, upload_native_url = self._upload_file(
+                archive_fname, project or self.default_project, domain or self.default_domain
+            )
+
         serialization_settings = SerializationSettings(
             project=project,
             domain=domain,
@@ -813,7 +835,7 @@
             fast_serialization_settings=FastSerializationSettings(
                 enabled=True,
                 destination_dir=destination_dir,
-                distribution_location=upload_location.native_url,
+                distribution_location=upload_native_url,
             ),
         )
diff --git a/flytekit/tools/script_mode.py b/flytekit/tools/script_mode.py
index 29b617824c..1f3e31a382 100644
--- a/flytekit/tools/script_mode.py
+++ b/flytekit/tools/script_mode.py
@@ -1,6 +1,5 @@
 import gzip
 import hashlib
-import importlib
 import os
 import shutil
 import tarfile
@@ -8,11 +7,6 @@
 import typing
 from pathlib import Path

-from flyteidl.service import dataproxy_pb2 as _data_proxy_pb2
-
-from flytekit.core import context_manager
-from flytekit.core.tracker import get_full_module_path
-

 def compress_single_script(source_path: str, destination: str, full_module_name: str):
     """
@@ -96,24 +90,6 @@ def tar_strip_file_attributes(tar_info: tarfile.TarInfo) -> tarfile.TarInfo:
     return tar_info


-def fast_register_single_script(
-    source_path: str, module_name: str, create_upload_location_fn: typing.Callable
-) -> (_data_proxy_pb2.CreateUploadLocationResponse, bytes):
-
-    # Open a temp directory and dump the contents of the digest.
-    with tempfile.TemporaryDirectory() as tmp_dir:
-        archive_fname = os.path.join(tmp_dir, "script_mode.tar.gz")
-        mod = importlib.import_module(module_name)
-        compress_single_script(source_path, archive_fname, get_full_module_path(mod, mod.__name__))
-
-        flyte_ctx = context_manager.FlyteContextManager.current_context()
-        md5, _ = hash_file(archive_fname)
-        upload_location = create_upload_location_fn(content_md5=md5)
-        flyte_ctx.file_access.put_data(archive_fname, upload_location.signed_url)
-
-    return upload_location, md5
-
-
 def hash_file(file_path: typing.Union[os.PathLike, str]) -> (bytes, str):
     """
     Hash a file and produce a digest to be used as a version
diff --git a/flytekit/types/structured/basic_dfs.py b/flytekit/types/structured/basic_dfs.py
index 39f8d11e24..ae3e8a00d9 100644
--- a/flytekit/types/structured/basic_dfs.py
+++ b/flytekit/types/structured/basic_dfs.py
@@ -1,12 +1,18 @@
 import os
 import typing
+from pathlib import Path
 from typing import TypeVar

 import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
+from botocore.exceptions import NoCredentialsError
+from fsspec.core import split_protocol, strip_protocol
+from fsspec.utils import get_protocol

-from flytekit import FlyteContext
+from flytekit import FlyteContext, logger
+from flytekit.configuration import DataConfig
+from flytekit.core.data_persistence import s3_setup_args
 from flytekit.deck import TopFrameRenderer
 from flytekit.deck.renderer import ArrowRenderer
 from flytekit.models import literals
@@ -23,6 +29,15 @@
 T = TypeVar("T")


+def get_storage_options(cfg: DataConfig, uri: str, anon: bool = False) -> typing.Optional[typing.Dict]:
+    protocol = get_protocol(uri)
+    if protocol == "s3":
+        kwargs = s3_setup_args(cfg.s3, anon)
+        if kwargs:
+            return kwargs
+    return None
+
+
 class PandasToParquetEncodingHandler(StructuredDatasetEncoder):
     def __init__(self):
         super().__init__(pd.DataFrame, None, PARQUET)
@@ -33,6 +48,26 @@ def encode(
         self,
         ctx: FlyteContext,
         structured_dataset: StructuredDataset,
         structured_dataset_type: StructuredDatasetType,
     ) -> literals.StructuredDataset:
+        uri = typing.cast(str, structured_dataset.uri) or ctx.file_access.get_random_remote_directory()
+        if not ctx.file_access.is_remote(uri):
+            Path(uri).mkdir(parents=True, exist_ok=True)
+        path = os.path.join(uri, f"{0:05}")
+        df = typing.cast(pd.DataFrame, structured_dataset.dataframe)
+        df.to_parquet(
+            path,
+            coerce_timestamps="us",
+            allow_truncated_timestamps=False,
+            storage_options=get_storage_options(ctx.file_access.data_config, path),
+        )
+        structured_dataset_type.format = PARQUET
+        return literals.StructuredDataset(uri=uri, metadata=StructuredDatasetMetadata(structured_dataset_type))
-        path = typing.cast(str, structured_dataset.uri) or ctx.file_access.get_random_remote_directory()
-        df = typing.cast(pd.DataFrame, structured_dataset.dataframe)
@@ -53,6 +88,24 @@ def decode(
         ctx: FlyteContext,
         flyte_value: literals.StructuredDataset,
         current_task_metadata: StructuredDatasetMetadata,
     ) -> pd.DataFrame:
+        uri = flyte_value.uri
+        columns = None
+        kwargs = get_storage_options(ctx.file_access.data_config, uri)
+        if current_task_metadata.structured_dataset_type and current_task_metadata.structured_dataset_type.columns:
+            columns = [c.name for c in current_task_metadata.structured_dataset_type.columns]
+        try:
+            return pd.read_parquet(uri, columns=columns, storage_options=kwargs)
+        except NoCredentialsError:
+            logger.debug("S3 source detected, attempting anonymous S3 access")
+            kwargs = get_storage_options(ctx.file_access.data_config, uri, anon=True)
+            return pd.read_parquet(uri, columns=columns, storage_options=kwargs)
-        path = flyte_value.uri
-        local_dir = ctx.file_access.get_random_local_directory()
@@ -73,13 +126,13 @@ def encode(
         structured_dataset: StructuredDataset,
         structured_dataset_type: StructuredDatasetType,
     ) -> literals.StructuredDataset:
-        path = typing.cast(str, structured_dataset.uri) or ctx.file_access.get_random_remote_path()
-        df = structured_dataset.dataframe
-        local_dir = ctx.file_access.get_random_local_directory()
-        local_path = os.path.join(local_dir, f"{0:05}")
-        pq.write_table(df, local_path)
-        ctx.file_access.upload_directory(local_dir, path)
-        return literals.StructuredDataset(uri=path, metadata=StructuredDatasetMetadata(structured_dataset_type))
+        uri = typing.cast(str, structured_dataset.uri) or ctx.file_access.get_random_remote_directory()
+        if not ctx.file_access.is_remote(uri):
+            Path(uri).mkdir(parents=True, exist_ok=True)
+        path = os.path.join(uri, f"{0:05}")
+        filesystem = ctx.file_access.get_filesystem_for_path(path)
+        pq.write_table(structured_dataset.dataframe, strip_protocol(path), filesystem=filesystem)
+        return literals.StructuredDataset(uri=uri, metadata=StructuredDatasetMetadata(structured_dataset_type))


 class ParquetToArrowDecodingHandler(StructuredDatasetDecoder):
@@ -92,13 +145,23 @@ def decode(
         flyte_value: literals.StructuredDataset,
         current_task_metadata: StructuredDatasetMetadata,
     ) -> pa.Table:
-        path = flyte_value.uri
-        local_dir = ctx.file_access.get_random_local_directory()
-        ctx.file_access.get_data(path, local_dir, is_multipart=True)
+        uri = flyte_value.uri
+        if not ctx.file_access.is_remote(uri):
+            Path(uri).parent.mkdir(parents=True, exist_ok=True)
+        _, path = split_protocol(uri)
+
+        columns = None
         if current_task_metadata.structured_dataset_type and current_task_metadata.structured_dataset_type.columns:
             columns = [c.name for c in current_task_metadata.structured_dataset_type.columns]
-            return pq.read_table(local_dir, columns=columns)
-        return pq.read_table(local_dir)
+        try:
+            fs = ctx.file_access.get_filesystem_for_path(uri)
+            return pq.read_table(path, filesystem=fs, columns=columns)
+        except NoCredentialsError as e:
+            logger.debug("S3 source detected, attempting anonymous S3 access")
+            fs = ctx.file_access.get_filesystem_for_path(uri, anonymous=True)
+            if fs is not None:
+                return pq.read_table(path, filesystem=fs, columns=columns)
+            raise e


 StructuredDatasetTransformerEngine.register(PandasToParquetEncodingHandler(), default_format_for_type=True)
diff --git a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/__init__.py b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/__init__.py
index 68ee456ed6..c4da1b7d0d 100644
--- a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/__init__.py
+++ b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/__init__.py
@@ -8,7 +8,6 @@
    :toctree: generated/

    ArrowToParquetEncodingHandler
-   FSSpecPersistence
    PandasToParquetEncodingHandler
    ParquetToArrowDecodingHandler
    ParquetToPandasDecodingHandler
@@ -16,7 +15,6 @@

 __all__ = [
     "ArrowToParquetEncodingHandler",
-    "FSSpecPersistence",
     "PandasToParquetEncodingHandler",
     "ParquetToArrowDecodingHandler",
     "ParquetToPandasDecodingHandler",
@@ -28,7 +26,6 @@

 from .arrow import ArrowToParquetEncodingHandler, ParquetToArrowDecodingHandler
 from .pandas import PandasToParquetEncodingHandler, ParquetToPandasDecodingHandler
-from .persist import FSSpecPersistence

 S3 = "s3"
 ABFS = "abfs"
diff --git a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/arrow.py b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/arrow.py
deleted file mode 100644
index ec8d5f975e..0000000000
--- a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/arrow.py
+++ /dev/null
@@ -1,70 +0,0 @@
-import os
-import typing
-from pathlib import Path
-
-import pyarrow as pa
-import pyarrow.parquet as pq
-from botocore.exceptions import NoCredentialsError
-from flytekitplugins.fsspec.persist import FSSpecPersistence
-from fsspec.core import split_protocol, strip_protocol
-
-from flytekit import FlyteContext, logger
-from flytekit.models import literals
-from flytekit.models.literals import StructuredDatasetMetadata
-from flytekit.models.types import StructuredDatasetType
-from flytekit.types.structured.structured_dataset import (
-    PARQUET,
-    StructuredDataset,
-    StructuredDatasetDecoder,
-    StructuredDatasetEncoder,
-)
-
-
-class ArrowToParquetEncodingHandler(StructuredDatasetEncoder):
-    def __init__(self, protocol: str):
-        super().__init__(pa.Table, protocol, PARQUET)
-
-    def encode(
-        self,
-        ctx: FlyteContext,
-        structured_dataset: StructuredDataset,
-        structured_dataset_type: StructuredDatasetType,
-    ) -> literals.StructuredDataset:
-        uri = typing.cast(str, structured_dataset.uri) or ctx.file_access.get_random_remote_directory()
-        if not ctx.file_access.is_remote(uri):
-            Path(uri).mkdir(parents=True, exist_ok=True)
-        path = os.path.join(uri, f"{0:05}")
-        fp = FSSpecPersistence(data_config=ctx.file_access.data_config)
-        filesystem = fp.get_filesystem(path)
-        pq.write_table(structured_dataset.dataframe, strip_protocol(path), filesystem=filesystem)
-        return literals.StructuredDataset(uri=uri, metadata=StructuredDatasetMetadata(structured_dataset_type))
-
-
-class ParquetToArrowDecodingHandler(StructuredDatasetDecoder):
-    def __init__(self, protocol: str):
-        super().__init__(pa.Table, protocol, PARQUET)
-
-    def decode(
-        self,
-        ctx: FlyteContext,
-        flyte_value: literals.StructuredDataset,
-        current_task_metadata: StructuredDatasetMetadata,
-    ) -> pa.Table:
-        uri = flyte_value.uri
-        if not ctx.file_access.is_remote(uri):
-            Path(uri).parent.mkdir(parents=True, exist_ok=True)
-        _, path = split_protocol(uri)
-
-        columns = None
-        if current_task_metadata.structured_dataset_type and current_task_metadata.structured_dataset_type.columns:
-            columns = [c.name for c in current_task_metadata.structured_dataset_type.columns]
-        try:
-            fp = FSSpecPersistence(data_config=ctx.file_access.data_config)
-            fs = fp.get_filesystem(uri)
-            return pq.read_table(path, filesystem=fs, columns=columns)
-        except NoCredentialsError as e:
-            logger.debug("S3 source detected, attempting anonymous S3 access")
-            fs = FSSpecPersistence.get_anonymous_filesystem(uri)
-            if fs is not None:
-                return pq.read_table(path, filesystem=fs, columns=columns)
-            raise e
diff --git a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/pandas.py b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/pandas.py
deleted file mode 100644
index e4986ed9f6..0000000000
--- a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/pandas.py
+++ /dev/null
@@ -1,76 +0,0 @@
-import os
-import typing
-from pathlib import Path
-
-import pandas as pd
-from botocore.exceptions import NoCredentialsError
-from flytekitplugins.fsspec.persist import FSSpecPersistence, s3_setup_args
-
-from flytekit import FlyteContext, logger
-from flytekit.configuration import DataConfig
-from flytekit.models import literals
-from flytekit.models.literals import StructuredDatasetMetadata
-from flytekit.models.types import StructuredDatasetType
-from flytekit.types.structured.structured_dataset import (
-    PARQUET,
-    StructuredDataset,
-    StructuredDatasetDecoder,
-    StructuredDatasetEncoder,
-)
-
-
-def get_storage_options(cfg: DataConfig, uri: str) -> typing.Optional[typing.Dict]:
-    protocol = FSSpecPersistence.get_protocol(uri)
-    if protocol == "s3":
-        kwargs = s3_setup_args(cfg.s3)
-        if kwargs:
-            return kwargs
-    return None
-
-
-class PandasToParquetEncodingHandler(StructuredDatasetEncoder):
-    def __init__(self, protocol: str):
-        super().__init__(pd.DataFrame, protocol, PARQUET)
-
-    def encode(
-        self,
-        ctx: FlyteContext,
-        structured_dataset: StructuredDataset,
-        structured_dataset_type: StructuredDatasetType,
-    ) -> literals.StructuredDataset:
-        uri = typing.cast(str, structured_dataset.uri) or ctx.file_access.get_random_remote_directory()
-        if not ctx.file_access.is_remote(uri):
-            Path(uri).mkdir(parents=True, exist_ok=True)
-        path = os.path.join(uri, f"{0:05}")
-        df = typing.cast(pd.DataFrame, structured_dataset.dataframe)
-        df.to_parquet(
-            path,
-            coerce_timestamps="us",
-            allow_truncated_timestamps=False,
-            storage_options=get_storage_options(ctx.file_access.data_config, path),
-        )
-        structured_dataset_type.format = PARQUET
-        return literals.StructuredDataset(uri=uri, metadata=StructuredDatasetMetadata(structured_dataset_type))
-
-
-class ParquetToPandasDecodingHandler(StructuredDatasetDecoder):
-    def __init__(self, protocol: str):
-        super().__init__(pd.DataFrame, protocol, PARQUET)
-
-    def decode(
-        self,
-        ctx: FlyteContext,
-        flyte_value: literals.StructuredDataset,
-        current_task_metadata: StructuredDatasetMetadata,
-    ) -> pd.DataFrame:
-        uri = flyte_value.uri
-        columns = None
-        kwargs = get_storage_options(ctx.file_access.data_config, uri)
-        if current_task_metadata.structured_dataset_type and current_task_metadata.structured_dataset_type.columns:
-            columns = [c.name for c in current_task_metadata.structured_dataset_type.columns]
-        try:
-            return pd.read_parquet(uri, columns=columns, storage_options=kwargs)
-        except NoCredentialsError:
-            logger.debug("S3 source detected, attempting anonymous S3 access")
-            kwargs["anon"] = True
-            return pd.read_parquet(uri, columns=columns, storage_options=kwargs)
diff --git a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py
deleted file mode 100644
index 4fe1b22baa..0000000000
--- a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py
+++ /dev/null
@@ -1,149 +0,0 @@
-import os
-import typing
-
-import fsspec
-from fsspec.registry import known_implementations
-
-from flytekit.configuration import DataConfig, S3Config
-from flytekit.extend import DataPersistence, DataPersistencePlugins
-from flytekit.loggers import logger
-
-S3_ACCESS_KEY_ID_ENV_NAME = "AWS_ACCESS_KEY_ID"
-S3_SECRET_ACCESS_KEY_ENV_NAME = "AWS_SECRET_ACCESS_KEY"
-
-# Refer to https://github.com/fsspec/s3fs/blob/50bafe4d8766c3b2a4e1fc09669cf02fb2d71454/s3fs/core.py#L198
-# for key and secret
-_FSSPEC_S3_KEY_ID = "key"
-_FSSPEC_S3_SECRET = "secret"
-
-
-def s3_setup_args(s3_cfg: S3Config):
-    kwargs = {}
-    if S3_ACCESS_KEY_ID_ENV_NAME not in os.environ:
-        if s3_cfg.access_key_id:
-            kwargs[_FSSPEC_S3_KEY_ID] = s3_cfg.access_key_id
-
-    if S3_SECRET_ACCESS_KEY_ENV_NAME not in os.environ:
-        if s3_cfg.secret_access_key:
-            kwargs[_FSSPEC_S3_SECRET] = s3_cfg.secret_access_key
-
-    # S3fs takes this as a special arg
-    if s3_cfg.endpoint is not None:
-        kwargs["client_kwargs"] = {"endpoint_url": s3_cfg.endpoint}
-
-    return kwargs
-
-
-class FSSpecPersistence(DataPersistence):
-    """
-    This DataPersistence plugin uses fsspec to perform the IO.
-    NOTE: The put is not as performant as it can be for multiple files because of
-    https://github.com/intake/filesystem_spec/issues/724. Once this bug is fixed, we can remove the `HACK` in the put
-    method
-    """
-
-    def __init__(self, default_prefix=None, data_config: typing.Optional[DataConfig] = None):
-        super(FSSpecPersistence, self).__init__(name="fsspec-persistence", default_prefix=default_prefix)
-        self.default_protocol = self.get_protocol(default_prefix)
-        self._data_cfg = data_config if data_config else DataConfig.auto()
-
-    @staticmethod
-    def get_protocol(path: typing.Optional[str] = None):
-        if path:
-            return DataPersistencePlugins.get_protocol(path)
-        logger.info("Setting protocol to file")
-        return "file"
-
-    def get_filesystem(self, path: str) -> fsspec.AbstractFileSystem:
-        protocol = FSSpecPersistence.get_protocol(path)
-        kwargs = {}
-        if protocol == "file":
-            kwargs = {"auto_mkdir": True}
-        elif protocol == "s3":
-            kwargs = s3_setup_args(self._data_cfg.s3)
-        return fsspec.filesystem(protocol, **kwargs)  # type: ignore
-
-    def get_anonymous_filesystem(self, path: str) -> typing.Optional[fsspec.AbstractFileSystem]:
-        protocol = FSSpecPersistence.get_protocol(path)
-        if protocol == "s3":
-            kwargs = s3_setup_args(self._data_cfg.s3)
-            anonymous_fs = fsspec.filesystem(protocol, anon=True, **kwargs)  # type: ignore
-            return anonymous_fs
-        return None
-
-    @staticmethod
-    def recursive_paths(f: str, t: str) -> typing.Tuple[str, str]:
-        if not f.endswith("*"):
-            f = os.path.join(f, "*")
-        if not t.endswith("/"):
-            t += "/"
-        return f, t
-
-    def exists(self, path: str) -> bool:
-        try:
-            fs = self.get_filesystem(path)
-            return fs.exists(path)
-        except OSError as oe:
-            logger.debug(f"Error in exists checking {path} {oe}")
-            fs = self.get_anonymous_filesystem(path)
-            if fs is not None:
-                logger.debug("S3 source detected, attempting anonymous S3 exists check")
-                return fs.exists(path)
-            raise oe
-
-    def get(self, from_path: str, to_path: str, recursive: bool = False):
-        fs = self.get_filesystem(from_path)
-        if recursive:
-            from_path, to_path = self.recursive_paths(from_path, to_path)
-        try:
-            return fs.get(from_path, to_path, recursive=recursive)
-        except OSError as oe:
-            logger.debug(f"Error in getting {from_path} to {to_path} rec {recursive} {oe}")
-            fs = self.get_anonymous_filesystem(from_path)
-            if fs is not None:
-                logger.debug("S3 source detected, attempting anonymous S3 access")
-                return fs.get(from_path, to_path, recursive=recursive)
-            raise oe
-
-    def put(self, from_path: str, to_path: str, recursive: bool = False):
-        fs = self.get_filesystem(to_path)
-        if recursive:
-            from_path, to_path = self.recursive_paths(from_path, to_path)
-            # BEGIN HACK!
-            # Once https://github.com/intake/filesystem_spec/issues/724 is fixed, delete the special recursive handling
-            from fsspec.implementations.local import LocalFileSystem
-            from fsspec.utils import other_paths
-
-            lfs = LocalFileSystem()
-            try:
-                lpaths = lfs.expand_path(from_path, recursive=recursive)
-            except FileNotFoundError:
-                # In some cases, there is no file in the original directory, so we just skip copying the file to the remote path
-                logger.debug(f"there is no file in the {from_path}")
-                return
-            rpaths = other_paths(lpaths, to_path)
-            for l, r in zip(lpaths, rpaths):
-                fs.put_file(l, r)
-            return
-            # END OF HACK!!
-        return fs.put(from_path, to_path, recursive=recursive)
-
-    def construct_path(self, add_protocol: bool, add_prefix: bool, *paths) -> str:
-        path_list = list(paths)  # make type check happy
-        if add_prefix:
-            path_list.insert(0, self.default_prefix)  # type: ignore
-        path = "/".join(path_list)
-        if add_protocol:
-            return f"{self.default_protocol}://{path}"
-        return typing.cast(str, path)
-
-
-def _register():
-    logger.info("Registering fsspec known implementations and overriding all default implementations for persistence.")
-    DataPersistencePlugins.register_plugin("/", FSSpecPersistence, force=True)
-    for k, v in known_implementations.items():
-        DataPersistencePlugins.register_plugin(f"{k}://", FSSpecPersistence, force=True)
-
-
-# Registering all plugins
-_register()
diff --git a/plugins/flytekit-data-fsspec/setup.py b/plugins/flytekit-data-fsspec/setup.py
index f622ea3d48..0a52dd9026 100644
--- a/plugins/flytekit-data-fsspec/setup.py
+++ b/plugins/flytekit-data-fsspec/setup.py
@@ -4,7 +4,7 @@

 microlib_name = f"flytekitplugins-data-{PLUGIN_NAME}"

-plugin_requires = ["flytekit>=1.3.0b2,<2.0.0", "fsspec>=2021.7.0", "botocore>=1.7.48", "pandas>=1.2.0"]
+plugin_requires = []

 __version__ = "0.0.0+develop"

@@ -13,7 +13,7 @@
     version=__version__,
     author="flyteorg",
     author_email="admin@flyte.org",
-    description="This package data-plugins for flytekit, that are powered by fsspec",
+    description="This is a deprecated plugin as of flytekit 1.5",
     url="https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-data-fsspec",
     long_description=open("README.md").read(),
     long_description_content_type="text/markdown",
@@ -22,9 +22,9 @@
     install_requires=plugin_requires,
     extras_require={
         # https://github.com/fsspec/filesystem_spec/blob/master/setup.py#L36
-        "abfs": ["adlfs>=2022.2.0"],
-        "aws": ["s3fs>=2021.7.0"],
-        "gcp": ["gcsfs>=2021.7.0"],
+        "abfs": [],
+        "aws": [],
+        "gcp": [],
     },
     license="apache2",
     python_requires=">=3.8",
diff --git a/plugins/flytekit-data-fsspec/tests/test_basic_dfs.py b/plugins/flytekit-data-fsspec/tests/test_basic_dfs.py
deleted file mode 100644
index 434a763a93..0000000000
--- a/plugins/flytekit-data-fsspec/tests/test_basic_dfs.py
+++ /dev/null
@@ -1,44 +0,0 @@
-import pandas as pd
-import pyarrow as pa
-from flytekitplugins.fsspec.pandas import get_storage_options
-
-from flytekit import kwtypes, task
-from flytekit.configuration import DataConfig, S3Config
-
-try:
-    from typing import Annotated
-except ImportError:
-    from typing_extensions import Annotated
-
-
-def test_get_storage_options():
-    endpoint = "https://s3.amazonaws.com"
-
-    options = get_storage_options(DataConfig(s3=S3Config(endpoint=endpoint)), "s3://bucket/somewhere")
-    assert options == {"client_kwargs": {"endpoint_url": endpoint}}
-
-    options = get_storage_options(DataConfig(), "/tmp/file")
-    assert options is None
-
-
-cols = kwtypes(Name=str, Age=int)
-subset_cols = kwtypes(Name=str)
-
-
-@task
-def t1(
-    df1: Annotated[pd.DataFrame, cols], df2: Annotated[pa.Table, cols]
-) -> (Annotated[pd.DataFrame, subset_cols], Annotated[pa.Table, subset_cols]):
-    return df1, df2
-
-
-def test_structured_dataset_wf():
-    pd_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})
-    pa_df = pa.Table.from_pandas(pd_df)
-
-    subset_pd_df = pd.DataFrame({"Name": ["Tom", "Joseph"]})
-    subset_pa_df = pa.Table.from_pandas(subset_pd_df)
-
-    df1, df2 = t1(df1=pd_df, df2=pa_df)
-    assert df1.equals(subset_pd_df)
-    assert df2.equals(subset_pa_df)
diff --git a/plugins/flytekit-data-fsspec/tests/test_persist.py b/plugins/flytekit-data-fsspec/tests/test_persist.py
deleted file mode 100644
index 691201925b..0000000000
--- a/plugins/flytekit-data-fsspec/tests/test_persist.py
+++ /dev/null
@@ -1,134 +0,0 @@
-import os
-import pathlib
-import tempfile
-
-from flytekitplugins.fsspec.persist import FSSpecPersistence, s3_setup_args
-from fsspec.implementations.local import LocalFileSystem
-
-from flytekit.configuration import S3Config
-
-
-def test_s3_setup_args():
-    kwargs = s3_setup_args(S3Config())
-    assert kwargs == {}
-
-    kwargs = s3_setup_args(S3Config(endpoint="http://localhost:30084"))
-    assert kwargs == {"client_kwargs": {"endpoint_url": "http://localhost:30084"}}
-
-    kwargs = s3_setup_args(S3Config(access_key_id="access"))
-    assert kwargs == {"key": "access"}
-
-
-def test_get_protocol():
-    assert FSSpecPersistence.get_protocol("s3://abc") == "s3"
-    assert FSSpecPersistence.get_protocol("/abc") == "file"
-    assert FSSpecPersistence.get_protocol("file://abc") == "file"
-    assert FSSpecPersistence.get_protocol("gs://abc") == "gs"
-    assert FSSpecPersistence.get_protocol("sftp://abc") == "sftp"
-    assert FSSpecPersistence.get_protocol("abfs://abc") == "abfs"
-
-
-def test_get_anonymous_filesystem():
-    fp = FSSpecPersistence()
-    fs = fp.get_anonymous_filesystem("/abc")
-    assert fs is None
-    fs = fp.get_anonymous_filesystem("s3://abc")
-    assert fs is not None
-    assert fs.protocol == ["s3", "s3a"]
-
-
-def test_get_filesystem():
-    fp = FSSpecPersistence()
-    fs = fp.get_filesystem("/abc")
-    assert fs is not None
-    assert isinstance(fs, LocalFileSystem)
-
-
-def test_recursive_paths():
-    f, t = FSSpecPersistence.recursive_paths("/tmp", "/tmp")
-    assert (f, t) == ("/tmp/*", "/tmp/")
-    f, t = FSSpecPersistence.recursive_paths("/tmp/", "/tmp/")
-    assert (f, t) == ("/tmp/*", "/tmp/")
-    f, t = FSSpecPersistence.recursive_paths("/tmp/*", "/tmp")
-    assert (f, t) == ("/tmp/*", "/tmp/")
-
-
-def test_exists():
-    fs = FSSpecPersistence()
-    assert not fs.exists("/tmp/non-existent")
-
-    with tempfile.TemporaryDirectory() as tdir:
-        f = os.path.join(tdir, "f.txt")
-        with open(f, "w") as fp:
-            fp.write("hello")
-
-        assert fs.exists(f)
-
-
-def test_get():
-    fs = FSSpecPersistence()
-    with tempfile.TemporaryDirectory() as tdir:
-        f = os.path.join(tdir, "f.txt")
-        with open(f, "w") as fp:
-            fp.write("hello")
-
-        t = os.path.join(tdir, "t.txt")
-
-        fs.get(f, t)
-        with open(t, "r") as fp:
-            assert fp.read() == "hello"
-
-
-def test_get_recursive():
-    fs = FSSpecPersistence()
-    with tempfile.TemporaryDirectory() as tdir:
-        p = pathlib.Path(tdir)
-        d = p.joinpath("d")
-        d.mkdir()
-        f = d.joinpath(d, "f.txt")
-        with open(f, "w") as fp:
-            fp.write("hello")
-
-        o = p.joinpath("o")
-
-        t = o.joinpath(o, "f.txt")
-        fs.get(str(d), str(o), recursive=True)
-        with open(t, "r") as fp:
-            assert fp.read() == "hello"
-
-
-def test_put():
-    fs = FSSpecPersistence()
-    with tempfile.TemporaryDirectory() as tdir:
-        f = os.path.join(tdir, "f.txt")
-        with open(f, "w") as fp:
-            fp.write("hello")
-
-        t = os.path.join(tdir, "t.txt")
-
-        fs.put(f, t)
-        with open(t, "r") as fp:
-            assert fp.read() == "hello"
-
-
-def test_put_recursive():
-    fs = FSSpecPersistence()
-    with tempfile.TemporaryDirectory() as tdir:
-        p = pathlib.Path(tdir)
-        d = p.joinpath("d")
-        d.mkdir()
-        f = d.joinpath(d, "f.txt")
-        with open(f, "w") as fp:
-            fp.write("hello")
-
-        o = p.joinpath("o")
-
-        t = o.joinpath(o, "f.txt")
-        fs.put(str(d), str(o), recursive=True)
-        with open(t, "r") as fp:
-            assert fp.read() == "hello"
-
-
-def test_construct_path():
-    fs = FSSpecPersistence()
-    assert fs.construct_path(True, False, "abc") == "file://abc"
diff --git a/tests/flytekit/unit/core/tracker/test_arrow_data.py b/tests/flytekit/unit/core/tracker/test_arrow_data.py
new file mode 100644
index 0000000000..747e7f1651
--- /dev/null
+++ b/tests/flytekit/unit/core/tracker/test_arrow_data.py
@@ -0,0 +1,29 @@
+import typing
+
+import pandas as pd
+import pyarrow as pa
+from typing_extensions import Annotated
+
+from flytekit import kwtypes, task
+
+cols = kwtypes(Name=str, Age=int)
+subset_cols = kwtypes(Name=str)
+
+
+@task
+def t1(
+    df1: Annotated[pd.DataFrame, cols], df2: Annotated[pa.Table, cols]
+) -> typing.Tuple[Annotated[pd.DataFrame, subset_cols], Annotated[pa.Table, subset_cols]]:
+    return df1, df2
+
+
+def test_structured_dataset_wf():
+    pd_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})
+    pa_df = pa.Table.from_pandas(pd_df)
+
+    subset_pd_df = pd.DataFrame({"Name": ["Tom", "Joseph"]})
+    subset_pa_df = pa.Table.from_pandas(subset_pd_df)
+
+    df1, df2 = t1(df1=pd_df, df2=pa_df)
+    assert df1.equals(subset_pd_df)
+    assert df2.equals(subset_pa_df)
diff --git a/tests/flytekit/unit/remote/test_remote.py b/tests/flytekit/unit/remote/test_remote.py
index 4b8f82fb7e..5e20eaeee3 100644
--- a/tests/flytekit/unit/remote/test_remote.py
+++ b/tests/flytekit/unit/remote/test_remote.py
@@ -177,14 +177,6 @@ def test_more_stuff(mock_client):
     with tempfile.TemporaryDirectory() as tmp_dir:
         r._upload_file(pathlib.Path(tmp_dir))

-    # Test that this copies the file.
-    with tempfile.TemporaryDirectory() as tmp_dir:
-        mm = MagicMock()
-        mm.signed_url = os.path.join(tmp_dir, "tmp_file")
-        mock_client.return_value.get_upload_signed_url.return_value = mm
-
-        r._upload_file(pathlib.Path(__file__))
-
     serialization_settings = flytekit.configuration.SerializationSettings(
         project="project",
         domain="domain",
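Note on the new upload path: `FlyteRemote._upload_file` now PUTs the script archive directly to the signed URL returned by the data-proxy client, rather than routing through `file_access.put_data`. A minimal sketch of that flow, assuming a signed URL has already been obtained from `client.get_upload_signed_url` (the archive name and URL below are hypothetical placeholders):

```python
from base64 import b64encode

import requests

from flytekit.tools.script_mode import hash_file

signed_url = "https://storage.example.com/upload?sig=..."  # hypothetical signed URL

# hash_file returns (md5_bytes, hex_digest); the md5 doubles as the version key.
md5_bytes, _ = hash_file("script_mode.tar.gz")  # hypothetical archive path

with open("script_mode.tar.gz", "rb") as f:
    content = f.read()

# The Content-MD5 header must be base64-encoded, matching the content_md5
# sent when the upload location was created.
rsp = requests.put(
    signed_url,
    data=content,
    headers={"Content-Length": str(len(content)), "Content-MD5": b64encode(md5_bytes)},
)
rsp.raise_for_status()
```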
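With `FSSpecPersistence` deleted, filesystem selection and the anonymous-S3 fallback live in core flytekit's `FileAccessProvider` (`flytekit/core/data_persistence.py`), reachable via the new `anonymous=` parameter on `get_filesystem_for_path`. A sketch of how a decoder-style caller can use it, assuming the provider takes `local_sandbox_dir` and `raw_output_prefix` arguments; the sandbox path and bucket are hypothetical:

```python
from botocore.exceptions import NoCredentialsError

from flytekit.core.data_persistence import FileAccessProvider

fa = FileAccessProvider(
    local_sandbox_dir="/tmp/flyte-sandbox",  # hypothetical sandbox dir
    raw_output_prefix="s3://my-bucket/raw",  # hypothetical bucket
)

uri = "s3://my-bucket/data/00000"  # hypothetical object
try:
    fs = fa.get_filesystem_for_path(uri)
    print(fs.exists(uri))
except NoCredentialsError:
    # Mirrors the fallback in ParquetToArrowDecodingHandler.decode above.
    fs = fa.get_filesystem_for_path(uri, anonymous=True)
    print(fs.exists(uri))
```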
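The `get_storage_options` helper also moved from the plugin into `flytekit/types/structured/basic_dfs.py`, so pandas readers/writers can pass fsspec options straight through. Usage matches the deleted `test_get_storage_options`; this sketch assumes the core `s3_setup_args` keeps the endpoint-to-`client_kwargs` mapping of the deleted plugin helper:

```python
from flytekit.configuration import DataConfig, S3Config
from flytekit.types.structured.basic_dfs import get_storage_options

opts = get_storage_options(
    DataConfig(s3=S3Config(endpoint="https://s3.amazonaws.com")),
    "s3://bucket/somewhere",
)
# Per the deleted test, expected: {"client_kwargs": {"endpoint_url": "https://s3.amazonaws.com"}}

# Non-S3 URIs produce no storage options.
assert get_storage_options(DataConfig(), "/tmp/file") is None
```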