refactor(pathlib): Replace os.path with pathlib (#1660)
Signed-off-by: Chi-Sheng Liu <[email protected]>
Co-authored-by: Kevin Su <[email protected]>
MortalHappiness and pingsutw authored May 26, 2024
1 parent 988de2d commit 69dbe48
Showing 24 changed files with 74 additions and 82 deletions.
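The pattern applied across these files is the standard os.path → pathlib mapping. As a quick reference, a minimal sketch of the correspondences this commit relies on (illustrative code, not taken from the diff itself):

    import os.path
    from pathlib import Path

    p = "data"                  # hypothetical base directory
    joined = Path(p) / "x.csv"  # os.path.join(p, "x.csv")
    name = joined.name          # os.path.basename(...)
    stem = joined.stem          # basename without the final suffix
    parent = joined.parent      # os.path.dirname(...)

    # Path implements os.fspath(), so most file APIs accept it directly;
    # str(path) is kept where an API insists on a plain string.
    assert str(joined) == os.path.join(p, "x.csv")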
7 changes: 4 additions & 3 deletions docs/conf.py
@@ -8,14 +8,15 @@
 
 # If extensions (or modules to document with autodoc) are in another directory,
 # add these directories to sys.path here. If the directory is relative to the
-# documentation root, use os.path.abspath to make it absolute, like shown here.
+# documentation root, use pathlib.Path.resolve to make it absolute, like shown here.
 #
 import os
 import re
 import sys
+from pathlib import Path
 
-sys.path.insert(0, os.path.abspath("../"))
-sys.path.append(os.path.abspath("./_ext"))
+sys.path.insert(0, str(Path("../").resolve(strict=True)))
+sys.path.append(str(Path("./_ext").resolve(strict=True)))
 
 # -- Project information -----------------------------------------------------
 
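Worth noting: os.path.abspath and Path.resolve are not exact equivalents. abspath is pure string manipulation, while resolve follows symlinks, and with strict=True it raises FileNotFoundError when the target is missing — so the new conf.py lines assume those directories exist at build time. A small sketch of the difference:

    import os.path
    from pathlib import Path

    os.path.abspath("does/not/exist")  # fine: never touches the filesystem

    try:
        Path("does/not/exist").resolve(strict=True)
    except FileNotFoundError:
        pass  # strict=True requires the path to exist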
9 changes: 3 additions & 6 deletions examples/data_types_and_io/data_types_and_io/file.py
@@ -1,6 +1,6 @@
 import csv
-import os
 from collections import defaultdict
+from pathlib import Path
 from typing import List
 
 import flytekit
@@ -37,11 +37,8 @@ def normalize_columns(
         normalized_data[colname] = [(x - mean) / std for x in values]
 
     # write to local path
-    out_path = os.path.join(
-        flytekit.current_context().working_directory,
-        f"normalized-{os.path.basename(csv_url.path).rsplit('.')[0]}.csv",
-    )
-    with open(out_path, mode="w") as output_file:
+    out_path = Path(flytekit.current_context().working_directory) / f"normalized-{Path(csv_url.path).stem}.csv"
+    with out_path.open(mode="w") as output_file:
         writer = csv.DictWriter(output_file, fieldnames=columns_to_normalize)
         writer.writeheader()
         for row in zip(*normalized_data.values()):
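One behavioral subtlety in this hunk: the old basename(...).rsplit('.')[0] keeps everything before the first dot, while Path(...).stem strips only the last suffix, so the two differ on multi-dot names (they agree on simple ones like data.csv). An illustration with a made-up filename:

    import os.path
    from pathlib import Path

    p = "/tmp/archive.backup.csv"       # hypothetical
    os.path.basename(p).rsplit(".")[0]  # 'archive'
    Path(p).stem                        # 'archive.backup'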
12 changes: 4 additions & 8 deletions examples/data_types_and_io/data_types_and_io/folder.py
@@ -1,5 +1,4 @@
 import csv
-import os
 import urllib.request
 from collections import defaultdict
 from pathlib import Path
@@ -15,17 +14,14 @@
 @task
 def download_files(csv_urls: List[str]) -> FlyteDirectory:
     working_dir = flytekit.current_context().working_directory
-    local_dir = Path(os.path.join(working_dir, "csv_files"))
+    local_dir = Path(working_dir) / "csv_files"
     local_dir.mkdir(exist_ok=True)
 
     # get the number of digits needed to preserve the order of files in the local directory
     zfill_len = len(str(len(csv_urls)))
     for idx, remote_location in enumerate(csv_urls):
-        local_image = os.path.join(
-            # prefix the file name with the index location of the file in the original csv_urls list
-            local_dir,
-            f"{str(idx).zfill(zfill_len)}_{os.path.basename(remote_location)}",
-        )
+        # prefix the file name with the index location of the file in the original csv_urls list
+        local_image = Path(local_dir) / f"{str(idx).zfill(zfill_len)}_{Path(remote_location).name}"
         urllib.request.urlretrieve(remote_location, local_image)
     return FlyteDirectory(path=str(local_dir))
 
@@ -69,7 +65,7 @@ def normalize_all_files(
 ) -> FlyteDirectory:
     for local_csv_file, column_names, columns_to_normalize in zip(
         # make sure we sort the files in the directory to preserve the original order of the csv urls
-        [os.path.join(csv_files_dir, x) for x in sorted(os.listdir(csv_files_dir))],
+        list(sorted(Path(csv_files_dir).iterdir())),
         columns_metadata,
         columns_to_normalize_metadata,
     ):
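Path.iterdir() yields full Path objects rather than bare names, which is why the explicit join disappears; like os.listdir, it returns entries in arbitrary order, so sorted() is still required to preserve the original URL ordering. A self-contained sketch (hypothetical directory):

    import os
    from pathlib import Path

    d = Path("csv_files")
    d.mkdir(exist_ok=True)
    (d / "00_a.csv").touch()
    (d / "01_b.csv").touch()

    old = [os.path.join(d, x) for x in sorted(os.listdir(d))]  # list of str
    new = sorted(d.iterdir())                                  # list of Path, same order
    assert [str(p) for p in new] == old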
@@ -1,6 +1,6 @@
-import os
 import typing
 from dataclasses import dataclass
+from pathlib import Path
 
 import numpy as np
 import pandas as pd
@@ -87,8 +87,8 @@ def encode(
         table = pa.Table.from_arrays(df, name)
         path = ctx.file_access.get_random_remote_directory()
         local_dir = ctx.file_access.get_random_local_directory()
-        local_path = os.path.join(local_dir, f"{0:05}")
-        pq.write_table(table, local_path)
+        local_path = Path(local_dir) / f"{0:05}"
+        pq.write_table(table, str(local_path))
         ctx.file_access.upload_directory(local_dir, path)
         return literals.StructuredDataset(
             uri=path,
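The explicit str() around local_path is presumably defensive: pyarrow.parquet.write_table accepts a string path, and converting up front keeps the call safe even on pyarrow versions whose path handling predates os.PathLike. The pattern, with made-up data:

    from pathlib import Path

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({"col": [1, 2, 3]})
    local_path = Path("/tmp") / f"{0:05}"   # -> /tmp/00000 (hypothetical location)
    pq.write_table(table, str(local_path))  # str() for APIs expecting a plain string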
4 changes: 2 additions & 2 deletions examples/dolt_plugin/dolt_plugin/dolt_branch_example.py
@@ -3,9 +3,9 @@
 #
 # In this example, we'll show how to use DoltTable along with Dolt's `Branch` feature.
 # %%
-import os
 import sys
 import typing
+from pathlib import Path
 
 import pandas as pd
 from dolt_integrations.core import NewBranch
@@ -30,7 +30,7 @@
 # statement to fetch data.
 
 # %%
-doltdb_path = os.path.join(os.path.dirname(__file__), "foo")
+doltdb_path = str(Path(__file__).parent / "foo")
 
 
 def generate_confs(a: int) -> typing.Tuple[DoltConfig, DoltConfig, DoltConfig]:
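os.path.join(os.path.dirname(__file__), "foo") and Path(__file__).parent / "foo" resolve to the same location; the str() wrapper stays because db_path is presumably typed as a plain string. The equivalence, as a sketch inside any module:

    import os.path
    from pathlib import Path

    old = os.path.join(os.path.dirname(__file__), "foo")
    new = str(Path(__file__).parent / "foo")
    assert old == new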
4 changes: 2 additions & 2 deletions examples/dolt_plugin/dolt_plugin/dolt_quickstart_example.py
@@ -10,8 +10,8 @@
 # %% [markdown]
 # First, let's import the libraries.
 # %%
-import os
 import sys
+from pathlib import Path
 
 import pandas as pd
 from flytekit import task, workflow
@@ -20,7 +20,7 @@
 # %% [markdown]
 # Next, we initialize Dolt's config.
 # %%
-doltdb_path = os.path.join(os.path.dirname(__file__), "foo")
+doltdb_path = str(Path(__file__).parent / "foo")
 
 rabbits_conf = DoltConfig(
     db_path=doltdb_path,
@@ -7,7 +7,6 @@
 # %% [markdown]
 # First, let's import the libraries we will use in this example.
 # %%
-import os
 import pathlib
 
 from flytekit import Resources, kwtypes, workflow
@@ -34,7 +33,7 @@
 # %%
 nb = NotebookTask(
     name="pipeline-nb",
-    notebook_path=os.path.join(pathlib.Path(__file__).parent.absolute(), "supermarket_regression.ipynb"),
+    notebook_path=str(pathlib.Path(__file__).parent.absolute() / "supermarket_regression.ipynb"),
     inputs=kwtypes(
        n_estimators=int,
        max_depth=int,
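These notebook hunks keep the original .parent.absolute() call. For reference, Path.absolute() only anchors the path at the current working directory, while Path.resolve() also normalizes ".." components and follows symlinks — either works here since __file__ contains no "..". Sketch:

    import pathlib

    p = pathlib.Path("nb/../nb")  # hypothetical relative path
    p.absolute()  # <cwd>/nb/../nb — no normalization
    p.resolve()   # <cwd>/nb      — '..' and symlinks resolved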
@@ -8,7 +8,6 @@
 # %% [markdown]
 # First, let's import the libraries we will use in this example.
 # %%
-import os
 import pathlib
 from dataclasses import dataclass
 
@@ -46,7 +45,7 @@ class Hyperparameters(object):
 # %%
 nb = NotebookTask(
     name="eda-feature-eng-nb",
-    notebook_path=os.path.join(pathlib.Path(__file__).parent.absolute(), "supermarket_regression_1.ipynb"),
+    notebook_path=str(pathlib.Path(__file__).parent.absolute() / "supermarket_regression_1.ipynb"),
     outputs=kwtypes(dummified_data=pd.DataFrame, dataset=str),
     requests=Resources(mem="500Mi"),
 )
@@ -8,7 +8,6 @@
 # %% [markdown]
 # First, let's import the libraries we will use in this example.
 # %%
-import os
 import pathlib
 
 import pandas as pd
@@ -27,7 +26,7 @@
 # %%
 nb_1 = NotebookTask(
     name="eda-featureeng-nb",
-    notebook_path=os.path.join(pathlib.Path(__file__).parent.absolute(), "supermarket_regression_1.ipynb"),
+    notebook_path=str(pathlib.Path(__file__).parent.absolute() / "supermarket_regression_1.ipynb"),
     outputs=kwtypes(dummified_data=pd.DataFrame, dataset=str),
     requests=Resources(mem="500Mi"),
 )
@@ -40,10 +39,7 @@
 # %%
 nb_2 = NotebookTask(
     name="regression-nb",
-    notebook_path=os.path.join(
-        pathlib.Path(__file__).parent.absolute(),
-        "supermarket_regression_2.ipynb",
-    ),
+    notebook_path=str(pathlib.Path(__file__).parent.absolute() / "supermarket_regression_2.ipynb"),
     inputs=kwtypes(
         dataset=str,
         n_estimators=int,
13 changes: 6 additions & 7 deletions examples/extending/extending/custom_types.py
@@ -1,6 +1,6 @@
-import os
 import tempfile
 import typing
+from pathlib import Path
 from typing import Type
 
 from flytekit import Blob, BlobMetadata, BlobType, FlyteContext, Literal, LiteralType, Scalar, task, workflow
@@ -13,26 +13,25 @@ class MyDataset(object):
     ``MyDataset`` is a collection of files. In Flyte, this maps to a multi-part blob or directory.
     """
 
-    def __init__(self, base_dir: str = None):
+    def __init__(self, base_dir: str | None = None):
         if base_dir is None:
             self._tmp_dir = tempfile.TemporaryDirectory()
             self._base_dir = self._tmp_dir.name
             self._files = []
         else:
             self._base_dir = base_dir
-            files = os.listdir(base_dir)
-            self._files = [os.path.join(base_dir, f) for f in files]
+            self._files = list(Path(base_dir).iterdir())
 
     @property
     def base_dir(self) -> str:
         return self._base_dir
 
     @property
-    def files(self) -> typing.List[str]:
+    def files(self) -> list[os.PathLike]:
         return self._files
 
-    def new_file(self, name: str) -> str:
-        new_file = os.path.join(self._base_dir, name)
+    def new_file(self, name: str) -> os.PathLike:
+        new_file = Path(self._base_dir) / name
         self._files.append(new_file)
         return new_file
 
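Two details in this hunk are version-sensitive: str | None in a signature is evaluated at definition time and needs Python 3.10+, and the os.PathLike annotations still require os to be resolvable in the module. A version-safe variant of the same shape (a sketch, not the commit's code), using postponed annotation evaluation:

    from __future__ import annotations  # annotations are no longer evaluated at def time

    import os
    from pathlib import Path
    from typing import Optional


    class MyDataset:
        def __init__(self, base_dir: Optional[str] = None):
            self._base_dir = base_dir or ""
            self._files: list[Path] = []

        def new_file(self, name: str) -> os.PathLike:
            new_file = Path(self._base_dir) / name
            self._files.append(new_file)
            return new_file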
@@ -34,6 +34,7 @@
 import logging
 import os
 from datetime import datetime, timedelta
+from pathlib import Path
 
 import boto3
 import flytekit
@@ -173,8 +174,8 @@ def store_offline(repo_config: RepoConfig, dataframe: StructuredDataset) -> Flyt
     horse_colic_entity = Entity(name="Hospital Number")
 
     ctx = flytekit.current_context()
-    data_dir = os.path.join(ctx.working_directory, "parquet-data")
-    os.makedirs(data_dir, exist_ok=True)
+    data_dir = Path(ctx.working_directory) / "parquet-data"
+    data_dir.mkdir(parents=True, exist_ok=True)
 
     FlyteContext.current_context().file_access.get_data(
         dataframe._literal_sd.uri + "/00000",
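Path.mkdir(parents=True, exist_ok=True) is the direct counterpart of os.makedirs(path, exist_ok=True): both create missing intermediate directories and tolerate an existing target. Sketch:

    import os
    from pathlib import Path

    os.makedirs("/tmp/a/b/parquet-data", exist_ok=True)               # old style
    Path("/tmp/a/b/parquet-data").mkdir(parents=True, exist_ok=True)  # same effect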
@@ -123,7 +123,7 @@ def download_data(dataset: str) -> FlyteDirectory:
     print("==============")
 
     working_dir = flytekit.current_context().working_directory
-    data_dir = pathlib.Path(os.path.join(working_dir, "data"))
+    data_dir = pathlib.Path(working_dir) / "data"
     data_dir.mkdir(exist_ok=True)
 
     # download the dataset
@@ -584,8 +584,8 @@ def act_sigmoid_scaled(x):
     print("Best RMSPE: %f" % best_val_rmspe)
 
     # save the trained model
-    keras_model.save(os.path.join(working_dir, hp.local_checkpoint_file))
-    print("Written checkpoint to %s" % os.path.join(working_dir, hp.local_checkpoint_file))
+    keras_model.save(pathlib.Path(working_dir) / hp.local_checkpoint_file)
+    print("Written checkpoint to %s" % (pathlib.Path(working_dir) / hp.local_checkpoint_file))
     # the Estimator returns a Transformer representation of the trained model once training is complete
     return keras_model
 
@@ -612,7 +612,7 @@ def test(
     pred_df = pred_df.withColumn("Sales_pred", F.exp(pred_df.Sales_output))
 
     submission_df = pred_df.select(pred_df.Id.cast(T.IntegerType()), pred_df.Sales_pred).toPandas()
-    submission_df.sort_values(by=["Id"]).to_csv(os.path.join(working_dir, hp.local_submission_csv), index=False)
+    submission_df.sort_values(by=["Id"]).to_csv(pathlib.Path(working_dir) / hp.local_submission_csv, index=False)
     # predictions are saved to a CSV file.
     print("Saved predictions to %s" % hp.local_submission_csv)
 
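Passing Path objects straight into keras_model.save(...) and DataFrame.to_csv(...) relies on those APIs accepting os.PathLike (PEP 519) — pandas does, and recent TensorFlow/Keras releases do as well; str(path) remains the conservative fallback for anything older. For example, with pandas:

    from pathlib import Path

    import pandas as pd

    out = Path("/tmp") / "submission.csv"  # hypothetical output path
    pd.DataFrame({"Id": [1], "Sales_pred": [0.0]}).to_csv(out, index=False)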
@@ -15,8 +15,8 @@
 # %% [markdown]
 # First, let's import the required libraries.
 # %%
-import os
 import typing
+from pathlib import Path
 
 import pandas as pd
 from flytekit import Resources, kwtypes, task, workflow
@@ -65,7 +65,7 @@ def simple_task(csv_file: str) -> int:
     # If the data validation fails, this will return a ValidationError.
     result = simple_task_object(dataset=csv_file)
     print(result)
-    df = pd.read_csv(os.path.join("greatexpectations", "data", csv_file))
+    df = pd.read_csv(Path("greatexpectations") / "data" / csv_file)
     return df.shape[0]
 
 
@@ -205,7 +205,7 @@ def schema_wf() -> typing.List[str]:
 # %%
 @task
 def runtime_to_df_task(csv_file: str) -> pd.DataFrame:
-    df = pd.read_csv(os.path.join("greatexpectations", "data", csv_file))
+    df = pd.read_csv(Path("greatexpectations") / "data" / csv_file)
     return df
 
 
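The / operator chains, so a multi-segment os.path.join(a, b, c) collapses to Path(a) / b / c, and the result can be handed to pd.read_csv directly. Sketch:

    import os.path
    from pathlib import Path

    old = os.path.join("greatexpectations", "data", "example.csv")
    new = Path("greatexpectations") / "data" / "example.csv"
    assert old == str(new)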
@@ -14,7 +14,7 @@
 # %% [markdown]
 # First, let's import the required libraries.
 # %%
-import os
+from pathlib import Path
 
 import pandas as pd
 from flytekit import Resources, task, workflow
@@ -191,7 +191,7 @@ def schema_wf() -> int:
 # %%
 @task
 def runtime_to_df_task(csv_file: str) -> pd.DataFrame:
-    df = pd.read_csv(os.path.join("greatexpectations", "data", csv_file))
+    df = pd.read_csv(Path("greatexpectations") / "data" / csv_file)
     return df
 
 
@@ -27,13 +27,13 @@
 # pip install xgboost
 # ```
 
 # %%
-import os
 
 # %% [markdown]
 # First, let's import the required packages into the environment.
 # %%
 import typing
 
 # %%
+from pathlib import Path
 from typing import Tuple
@@ -210,7 +210,7 @@ def fit(loc: str, train: pd.DataFrame, val: pd.DataFrame) -> JoblibSerializedFil
     m.fit(x, y, eval_set=[(eval_x, eval_y)])
 
     working_dir = flytekit.current_context().working_directory
-    fname = os.path.join(working_dir, f"model-{loc}.joblib.dat")
+    fname = str(Path(working_dir) / f"model-{loc}.joblib.dat")
     joblib.dump(m, fname)
 
     # return the serialized model
4 changes: 2 additions & 2 deletions examples/k8s_pod_plugin/k8s_pod_plugin/pod.py
@@ -15,8 +15,8 @@
 # %% [markdown]
 # First, we import the necessary libraries for use in the following examples.
 # %%
-import os
 import time
+from pathlib import Path
 from typing import List
 
 from flytekit import Resources, TaskMetadata, dynamic, map_task, task, workflow
@@ -131,7 +131,7 @@ def pod_workflow() -> str:
 )
 def multiple_containers_pod_task() -> str:
     # The code defined in this task will get injected into the primary container.
-    while not os.path.isfile(_SHARED_DATA_PATH):
+    while not Path(_SHARED_DATA_PATH).is_file():
         time.sleep(5)
 
     with open(_SHARED_DATA_PATH, "r") as shared_message_file:
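The os.path predicates map one-to-one onto Path methods, so polling loops like the one above translate mechanically. The common mappings, as a reference sketch:

    from pathlib import Path

    p = Path("/tmp/some_path")  # hypothetical

    p.is_file()     # os.path.isfile(p)
    p.is_dir()      # os.path.isdir(p)
    p.exists()      # os.path.exists(p)
    p.is_symlink()  # os.path.islink(p)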
3 changes: 1 addition & 2 deletions examples/kfmpi_plugin/kfmpi_plugin/mpi_mnist.py
@@ -6,7 +6,6 @@
 # %% [markdown]
 # To begin, import the necessary dependencies.
 # %%
-import os
 import pathlib
 
 import flytekit
@@ -131,7 +130,7 @@ def horovod_train_task(batch_size: int, buffer_size: int, dataset_size: int) ->
         raise IgnoreOutputs("I am not rank 0")
 
     working_dir = flytekit.current_context().working_directory
-    checkpoint_prefix = pathlib.Path(os.path.join(working_dir, "checkpoint"))
+    checkpoint_prefix = pathlib.Path(working_dir) / "checkpoint"
     checkpoint.save(checkpoint_prefix)
 
     tf.keras.models.save_model(