
[ray[default]==2.22.0] .write_parquet(s3_path, mode="overwrite") doesn't work properly #47799

Open
nthanapaisal opened this issue Sep 23, 2024 · 3 comments
Labels
bug Something that is supposed to be working; but isn't data Ray Data-related issues triage Needs triage (eg: priority, bug/not-bug, and owning component)

Comments


nthanapaisal commented Sep 23, 2024

What happened + What you expected to happen

Hello,

I am currently experiencing an issue trying to overwrite an existing Parquet table in S3. I have tested the basic code below in snippet 1 and it works. However, when I implement more complicated logic, it doesn't overwrite but appends instead (snippet 2).

I wonder why the basic snippet works, but not when I read with read_parquet, run map_batches, and then write_parquet?

Versions / Dependencies

ray[default]==2.22.0
Ray cluster on top of a k8s Spark cluster using a Docker image;
also tried on a Databricks ML g5.xlarge cluster (14 LTS, Spark 3.5.0)

Reproduction script

Snippet 1:

import pandas as pd
import ray

ray.init()

data = {
    'column1': [1, 2, 5],
    'column2': ['a', 'b', 'h']
}

pandas_df = pd.DataFrame(data)

ray_df = ray.data.from_pandas(pandas_df)

s3_path = "s3://path/"

ray_df.write_parquet(s3_path, mode="overwrite")

Snippet 2:

from typing import Dict

import numpy as np
import ray

ray.init()

class DummyClass:
    def __init__(self):
        pass

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        # llm inferencing here
        return {
            "key1": batch["key1"],
            "key2": batch["key2"]
        }

input_df = ray.data.read_parquet("s3://path/input_data")
ray_df = input_df.map_batches(DummyClass, concurrency=3, batch_size=64, num_gpus=1)
ray_df.write_parquet("s3://path/output", mode="overwrite")

Issue Severity

Medium: It is a significant difficulty but I can work around it.

@nthanapaisal nthanapaisal added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Sep 23, 2024
@nivibilla

@nivibilla cc


nthanapaisal commented Sep 23, 2024

more update:

  • I checked the docs, and write_parquet() doesn't even list mode as a parameter??
  • Secondly, I am trying these two snippets: they now overwrite the designated path, but each overwrites only its own rows, not the whole table. See snippets below:

Snippet 3:

import pandas as pd
import ray

ray.init()

data = {
    'column1': [1, 2, 7],
    'column2': ['a', 'b', 'l']
}

pandas_df = pd.DataFrame(data)

ray_df = ray.data.from_pandas(pandas_df)

ray_df.write_parquet("s3://path/ray-write-table3/", mode="overwrite")

ray.shutdown()

Snippet 4:

from typing import Dict

import ray

# Initialize Ray
ray.init()

class DummyClass:
    def __init__(self):
        self.new_val = "str"

    def __call__(self, s) -> Dict[str, list]:
        return {
            'column1': [1, 2, 8],
            'column2': ['a', 'b', 'p']
        }

input_df = ray.data.read_parquet("s3://path/ray-write-table3/")
predictions = input_df.map_batches(DummyClass, concurrency=1, batch_size=8, num_gpus=1)
predictions.write_parquet("s3://path/ray-write-table3/", mode="overwrite")

ray.shutdown()

[Screenshot (2024-09-23): output table showing rows 1–3 written by snippet 3 and rows 4–6 written by snippet 4]

As you can see here, if I rerun snippet 3 with different dict values, only rows 1–3 get updated. Likewise, if I rerun snippet 4, only rows 4–6 get updated.

I am so confused. Please help. I just want a way to overwrite the whole table...


nthanapaisal commented Sep 23, 2024

Another update:

If I read from S3 and return batch["col_name"] instead of a hand-coded dict (snippet 5), it behaves the same as in my first comment. It looks like each task writes its own .parquet file, but how does Ray know which .parquet file to overwrite, and why can't it overwrite all the parquet files in that directory?
Snippet 5:

from typing import Dict

import numpy as np
import ray

# Initialize Ray
ray.init()

class DummyClass:
    def __init__(self):
        self.new_val = "str"

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        return {
            'column1': batch["column1"],
            'column2': batch["column2"]
        }

input_df = ray.data.read_parquet("s3://path/ray-write-table3/")
predictions = input_df.map_batches(DummyClass, concurrency=1, batch_size=8, num_gpus=1)
print(predictions)
predictions.write_parquet("s3://path/ray-write-table3/", mode="overwrite")

ray.shutdown()

@anyscalesam anyscalesam added the data Ray Data-related issues label Sep 24, 2024