Unable to use published datasets in a different client #2336

Closed
nikhilrajrs opened this issue Nov 5, 2018 · 51 comments · Fixed by #3729 · May be fixed by #2918
Comments

@nikhilrajrs

I get the error "Inputs contain futures that were created by another client" when I try to join a published dataset from a client different from the one that originally published it. The whole flow can be summarised as follows:

Client 1
df = dd.read_csv(...)
client.persist(df)
client.publish_dataset(ds_name=df)

Client 2
df = client.get_dataset("ds_name")
df2 = dd.read_csv(...)
df2.join(df)
client.persist(df2)

Is this expected behavior?

For reference, this check was added in commit c02ea63, which is part of pull request #2227.

@VMois
Contributor

VMois commented Nov 11, 2018

Your code for publishing the dataset should look like this:

df = dd.read_csv(...)  
df = client.persist(df) 
client.publish_dataset(my_dataset=df)  

client.persist() returns futures, and you need to reference those futures when publishing the dataset, but in your flow example you call client.persist() without assigning the result to any variable. Please check the docs examples (https://distributed.readthedocs.io/en/latest/api.html#distributed.Client.publish_dataset) and add your actual code.

@nikhilrajrs
Author

nikhilrajrs commented Nov 12, 2018

Sorry for not being precise enough.
The code I use is indeed:

names = dd.read_csv('/shared-store-path/names.csv')
names = names.set_index("ID")
names = client1.persist(names)
client1.publish_dataset(names=names)

I have attached a full example that can be used to reproduce the behavior. Shown below is the output from the notebook I used to test:

from dask.distributed import Client
from dask.distributed import wait
import dask.dataframe as dd

client1 = Client("scheduler-address:8786")
client2 = Client("scheduler-address:8786")
names = dd.read_csv('/shared-store-path/names.csv')
names = names.set_index("ID")
names = client1.persist(names)
wait(names)

DoneAndNotDoneFutures(done={<Future: status: finished, type: DataFrame, key: ('sort_index-6f9583110f3b77c24727c1e970735470', 0)>}, not_done=set())

client1.publish_dataset(names=names)
roles = dd.read_csv('/shared-store-path/roles.csv')
roles = roles.set_index("ID")

names_dataset = client2.get_dataset("names")

roles = roles.join(names_dataset)
roles.head()
Role Name
ID
1 Developer John
2 Developer Doe
3 Developer Johnny
4 Developer Doey
5 Tester Hello
client2.persist(roles)
---------------------------------------------------------------------------

ValueError                                Traceback (most recent call last)

<ipython-input-20-7cb62cd7bd89> in <module>()
----> 1 client2.persist(roles)


/misc/anaconda3-amp/lib/python3.6/site-packages/distributed/client.py in persist(self, collections, optimize_graph, workers, allow_other_workers, resources, retries, priority, fifo_timeout, actors, **kwargs)
   2561                                          user_priority=priority,
   2562                                          fifo_timeout=fifo_timeout,
-> 2563                                          actors=actors)
   2564 
   2565         postpersists = [c.__dask_postpersist__() for c in collections]


/misc/anaconda3-amp/lib/python3.6/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
   2210                     msg = ("Inputs contain futures that were created by "
   2211                            "another client.")
-> 2212                     raise ValueError(msg)
   2213 
   2214             if restrictions:


ValueError: Inputs contain futures that were created by another client.

testfiles.zip

@VMois
Contributor

VMois commented Nov 12, 2018

Okay, thanks for the full example. I will try to run it tomorrow and see what happens.

@VMois
Contributor

VMois commented Nov 13, 2018

Actually, I have reproduced your error, but not using your code. It is enough to publish a dataset with one client, then with a second client (in a different process) get the dataset, modify something, and call client.persist(); it will throw the above error. I think it's a general error. My small example:

client = Client('localhost:8786')
client.restart()

df = dd.read_parquet('test_data/data.parquet')
df = client.persist(df)
client.publish_dataset(test=df)

Now, you can open a new terminal:

client = Client(...)
df = client.get_dataset('test')
new_df = df['some'] > 0.5
new_df = client.persist(new_df)
# error here...

Do you have any ideas @mrocklin ?

Python: 3.6.6
Dask: 0.20.1
Dask distributed: 1.24.1

@VMois
Contributor

VMois commented Nov 16, 2018

Any updates/suggestions about this problem @mrocklin @martindurant ?

@martindurant
Member

(I am at a conference and unlikely to be able to look into this right now)

@itozapata

itozapata commented Apr 22, 2019

Did anything ever happen with this ticket? I seem to have run into the same situation and don't see any information anywhere else about it.

My situation is that I am persisting a dataset the first time I load it, and someone else is picking it up and adding more to the graph, something akin to:

df = client.get_dataset('test')
new_df = df['some'] > 0.5
new_df = client.compute(new_df)

and it is giving me this error.

@martindurant
Member

The original posting above used versions that are now somewhat out of date. Would you mind trying with the latest versions? I just ran a very similar test and didn't see any error.

@itozapata

Sounds good. After continuing to look, I think I am getting something just slightly different; I will try to package up an example. I was able to get it working by not creating my own client and instead using get_client to make sure that the same client was being used with the client.compute call, but the behavior I was experiencing didn't seem to make sense to me.

I will attempt to get something small to illustrate the situation shortly.

@tshatrov

We're also having trouble working with this check. There's a bunch of datasets published by a loader script, and another application launches tasks that use .loc[...].compute() to retrieve a part of a published dataset as a pandas dataframe and then perform some calculations with that dataframe. The way I see it, if the published dataset contains futures and a dask-worker retrieves it, then future.client is always a different object than the client of the worker, so the future.client is self check will always fail. So how are we supposed to use published datasets from within workers?
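A rough sketch of that workflow, with placeholder names and addresses (not our real code):

from dask.distributed import Client, worker_client

def calculate(start, stop):
    # Runs on a worker: fetch a slice of the published dataset as pandas
    with worker_client() as client:
        df = client.get_dataset('prices')  # hypothetical dataset name
        return df.loc[start:stop].compute().sum()

client = Client('scheduler-address:8786')  # placeholder address
result = client.submit(calculate, '2019-01-01', '2019-01-31').result()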

@mrocklin
Member

future.client should become the client on the local worker.

@tshatrov

That's not the case; the futures from the retrieved dataframe have a client with a different id (which is different for every session):

In [1]: from distributed import Client

In [2]: cl = Client("*redacted*")

In [3]: cl.id
Out[3]: 'Client-99fd4b86-8c42-11e9-b54a-48df3757a8a0'

In [4]: df = cl.get_dataset('test')

In [5]: list(df.dask.values())[0].client.id
Out[5]: 'Client-aaa7eacc-8c42-11e9-b54a-48df3757a8a0'

In [7]: dask.__version__
Out[7]: '1.2.2'

In [9]: distributed.__version__
Out[9]: '1.28.1'

@TomAugspurger
Member

Can anyone provide a reproducible example? This doesn't do it, I suspect because we aren't getting references to the client in the task graph:

from distributed import LocalCluster, Client
import dask.array as da


def main():
    cluster = LocalCluster(dashboard_address=None)
    data = da.ones(3)

    with Client(cluster) as a:
        a.publish_dataset(data=data)
        with Client(cluster, set_as_default=False) as b:
            print(b.get_dataset('data').compute())


if __name__ == '__main__':
    main()

@TomAugspurger
Member

Oh @tshatrov, can you repeat your test in #2336 (comment) after calling optimize on the graph? That's when things would be replaced:

def _optimize_insert_futures(self, dsk, keys):
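For anyone trying this, a hedged sketch of calling that private hook directly (the signature is taken from the snippet above; __dask_graph__ and __dask_keys__ are the standard dask collection protocol methods):

# given an existing client and a dask collection df:
dsk2 = client._optimize_insert_futures(df.__dask_graph__(), df.__dask_keys__())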

@tshatrov

Steps to reproduce:

  1. Launch dask-scheduler and dask-worker pointing at dask-scheduler.
  2. Launch ipython and do the following
import dask
from dask.distributed import Client

uri = <dask-scheduler's uri>
cl = Client(uri)
ts = dask.datasets.timeseries()
ts = ts.persist()
cl.publish_dataset(timeseries=ts)

def get_ts():
    from dask.distributed import worker_client
    with worker_client() as client:
       return client.get_dataset('timeseries').compute()

cl.submit(get_ts).result()

As a result I get ValueError: Inputs contain futures that were created by another client.

I can't reproduce this on LocalCluster.

I have not figured out how to use optimize_insert_futures to make this not fail. As far as I can see, it doesn't change the dataframe.

@TomAugspurger
Member

Thanks @tshatrov. FWIW, I can't reproduce the ValueError locally with that. Perhaps others can.

@pranav-kohli

Any updates on this issue? I am facing the same problem as mentioned by @tshatrov.

@martindurant
Member

Hm, the example code also works OK for me. I wonder what might be different on your system, @pranav-kohli - what exact versions are you using?

@pranav-kohli

I am using the following setup:
dask==1.1.0
distributed==1.25.2

@mrocklin
Member

mrocklin commented Jun 24, 2019 via email

@tshatrov

tshatrov commented Jun 24, 2019

As I posted above, the problem persists on the most recently released versions.

@pranav-kohli

This is what I used to replicate the issue on the latest versions:
dask = 2.0.0
distributed = 2.0.1

import pandas as pd
import dask.dataframe as dd
from distributed import Client

def getCompute(df):
    # print(df.id)
    print(list(df.dask.values())[0].client.id)
    df.compute()

daskClient = Client('localhost:8786')
df = pd.DataFrame(data=[[1]], columns=['a'])
df = dd.from_pandas(df, npartitions=1)
df = df.persist()
print(daskClient.id)
print(list(df.dask.values())[0].client.id)
future = daskClient.submit(getCompute, df)

The printed ids are different, hence the future.client is self check fails.
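For reference, the check being tripped looks roughly like this (paraphrased; the exact message appears in the tracebacks earlier in this thread):

# inside Client._graph_to_futures, roughly:
if future.client is not self:
    raise ValueError("Inputs contain futures that were created by another client.")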

@TomAugspurger
Member

Thanks @pranav-kohli. Unfortunately this is the output I get for (a lightly modified) version of your script :/

Client-7ef457a4-9bfa-11e9-abd5-186590cd1c87
Client-7ef457a4-9bfa-11e9-abd5-186590cd1c87
import pandas as pd
from distributed import Client
import dask.dataframe as dd


def getCompute(df):
    #print(df.id)
    print(list(df.dask.values())[0].client.id)
    df.compute()


if __name__ == '__main__':
    daskClient = Client()
    df = pd.DataFrame(data=[[1]], columns=['a'])
    df = dd.from_pandas(df, npartitions=1)
    df = df.persist()
    print(daskClient.id)
    print(list(df.dask.values())[0].client.id)
    future = daskClient.submit(getCompute, df)

@pranav-kohli

pranav-kohli commented Jul 1, 2019

@TomAugspurger So actually, in the test case, we are printing the same id twice:

print(daskClient.id)
print(list(df.dask.values())[0].client.id) 

Can you check your dask worker log, which prints from the getCompute function?
My dask worker log shows a different client id.

@TomAugspurger
Member

Is anybody who's able to reproduce this issue locally able to debug further?

@mrocklin
Member

mrocklin commented Aug 2, 2019

Here is a reproducer:

from distributed.deploy.ssh2 import SSHCluster
from distributed import Client
import pandas as pd
import dask.dataframe as dd


def getCompute(df):
    return df.compute()


async def f():
    async with SSHCluster(
        hosts=["localhost", "localhost", "localhost"],
        worker_kwargs={"nthreads": 4},
        connect_kwargs={"known_hosts": None},
        asynchronous=True,
    ) as cluster:

        client = await Client("localhost:8786", asynchronous=True)
        df = pd.DataFrame(data=[[1]], columns=["a"])
        df = dd.from_pandas(df, npartitions=1)
        df = df.persist()
        future = client.submit(getCompute, df)
        await future


if __name__ == "__main__":
    import asyncio

    asyncio.get_event_loop().run_until_complete(f())

Traceback

Traceback (most recent call last):
  File "foo.py", line 28, in <module>
    asyncio.get_event_loop().run_until_complete(f())
  File "/Users/mrocklin/miniconda/envs/dev/lib/python3.7/asyncio/base_events.py", line 573, in run_until_complete
    return future.result()
  File "foo.py", line 23, in f
    await future
  File "/Users/mrocklin/workspace/distributed/distributed/client.py", line 232, in _result
    six.reraise(*exc)
  File "/Users/mrocklin/miniconda/envs/dev/lib/python3.7/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "foo.py", line 8, in getCompute
    return df.compute()
  File "/Users/mrocklin/workspace/dask/dask/base.py", line 175, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/mrocklin/workspace/dask/dask/base.py", line 446, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/mrocklin/workspace/distributed/distributed/client.py", line 2500, in get
    actors=actors,
  File "/Users/mrocklin/workspace/distributed/distributed/client.py", line 2389, in _graph_to_futures
    raise ValueError(msg)
ValueError: Inputs contain futures that were created by another client.

@jacobtomlinson
Member

I am able to reproduce this following @tshatrov's instructions. I've slightly modified it to use get_client.

import distributed
client = distributed.Client('tcp://localhost:8786')

import dask
ts = dask.datasets.timeseries()
ts = ts.persist()
client.publish_dataset(timeseries=ts)

def get_ts():
    from dask.distributed import get_client
    with get_client() as client:
        return client.get_dataset('timeseries').compute()

client.submit(get_ts).result()
ValueError: Inputs contain futures that were created by another client.

Dask and distributed 2.1.0.

mrocklin added a commit to mrocklin/distributed that referenced this issue Aug 2, 2019
This helps to normalize scheduler addresses before comparison

Fixes dask#2336
@jacobtomlinson
Member

jacobtomlinson commented Aug 7, 2019

Here is a new minimal reproducible example. The key here is that the client connects to the scheduler on a different IP than the worker.

# Run the dask scheduler on a machine
$ dask-scheduler
# Connect to that scheduler with a worker on the same machine via localhost
$ dask-worker localhost:8786
# Connect to the cluster on a different IP
import distributed
client = distributed.Client('10.1.2.3:8786')  # Or whatever the LAN IP is

# Persist some data and publish it as a dataset
import dask
df = dask.datasets.timeseries().persist()
client.publish_dataset(df=df)

# Try and grab the dataset from within a delayed task
@dask.delayed
def remote_head():
    client = distributed.get_client()
    df = client.get_dataset('df')
    return df.head()

remote_head().compute()
# This results in an error
ValueError: Inputs contain futures that were created by another client.

@crusaderky
Collaborator

I'm stumbling on the same problem on distributed 2.7.0.

Another minimal example:

import distributed
from dask import delayed

client = distributed.Client("localhost:8786")

a, = client.persist([delayed(1, pure=True)])
print(a) # Delayed('int-62645d78d66e2508256b7ab60a38b944')
print(a.compute())  # 1
print(client.compute([a])[0].result())  # 1

client.publish_dataset(foo=a)
b = client.get_dataset("foo")

print(b)  # Delayed('int-62645d78d66e2508256b7ab60a38b944')
print(b.compute()) # 1
print(client.compute([b])[0].result())  # ValueError: Inputs contain futures that were created by another client.

The issue disappears if I connect to a LocalCluster instead.

Workaround:

print(client.gather([b.dask[b.key]]))  # 1

This issue is particularly troublesome when using the asynchronous client together with non-trivial collections (read: anything other than a delayed), since the compute() method does not work (dask/dask#5580), so one would have to reassemble the output by hand from the outputs of the futures.
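For a persisted dask dataframe, that manual reassembly might look like this sketch (assuming one future per partition, and using the futures_of helper from distributed):

import pandas as pd
from distributed import futures_of

# given the client and a persisted dask dataframe df:
parts = client.gather(futures_of(df))  # one pandas object per partition
result = pd.concat(parts)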

@pranav-kohli

Still facing the same issue with Dask 2.6.0 and Distributed 2.6.0.

import pandas as pd
import dask.dataframe as dd
from distributed import Client

def getCompute(df):
    print("client id inside")
    print(list(df.dask.values())[0].client.id)
    df.compute()

daskClient = Client('localhost:8786')
df = pd.DataFrame(data=[[1]], columns=['a'])
df = dd.from_pandas(df, npartitions=1)
df = df.persist()
print(daskClient.id)
future = daskClient.submit(getCompute, df)
print(future.result())

Error:
ValueError: Inputs contain futures that were created by another client.

@jrbourbeau
Member

@pranav-kohli I'm not able to reproduce the error with the latest dask and distributed release (version 2.12.0):

In [1]: import pandas as pd
   ...: import dask.dataframe as dd
   ...: from distributed import Client
   ...:
   ...: def getCompute(df):
   ...:     print("client id inside")
   ...:     print(list(df.dask.values())[0].client.id)
   ...:     df.compute()
   ...:
   ...: daskClient = Client()
   ...: df = pd.DataFrame(data=[[1]], columns=['a'])
   ...: df = dd.from_pandas(df, npartitions=1)
   ...: df = df.persist()
   ...: print(daskClient.id)
   ...: future = daskClient.submit(getCompute, df)
   ...: print(future.result())
Client-cb589a84-63a4-11ea-9e5d-a0999b120aab
client id inside
Client-worker-ccab6e5c-63a4-11ea-9e6b-a0999b120aab
None

Can you update those packages and see if the problem persists?

@tshatrov

Isn't this code using LocalCluster? We already know this bug does not reproduce on a LocalCluster.

@martindurant
Member

LocalCluster has separate processes by default, just like any other deployment

@jrbourbeau
Member

> Isn't this code using LocalCluster? We already know this bug does not reproduce on a LocalCluster.

Ah, I missed that when reading through the previous comments. I am able to reproduce the ValueError when not using LocalCluster

@martindurant
Member

How weird! Now that should give us something to diagnose by, but I am pretty mystified.
I wonder, @jrbourbeau, do you get the problem with LocalCluster if you use spawn (which would end up with the client objects having different IDs in each worker)?

@jrbourbeau
Member

The default multiprocessing method was updated to spawn in #3461. I checked that the problem does not occur with LocalCluster when using either the spawn or forkserver multiprocessing methods.
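For reference, a minimal sketch of forcing each start method (assuming the distributed.worker.multiprocessing-method config key):

import dask
from distributed import LocalCluster, Client

dask.config.set({"distributed.worker.multiprocessing-method": "spawn"})  # or "forkserver"
with LocalCluster(processes=True) as cluster, Client(cluster) as client:
    ...  # run the reproducer from earlier in the thread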

@pborgen

pborgen commented May 9, 2020

> [quoting @pranav-kohli's example above]

I have this same issue with the latest. Anyone have any ideas?
dask==2.16.0
distributed==2.16.0

@crusaderky
Collaborator

@pborgen my PR resolves the issue for get_dataset() specifically - which is what the opening post was about. What you're doing in that snippet is sending over an arbitrary Python object which happens to contain Futures and expecting them to be recreated correctly when the Worker deserializes them - which is a different problem, albeit related.

@pborgen

pborgen commented May 10, 2020

> [quoting @crusaderky above]

Is there currently an issue to capture this?

@crusaderky
Collaborator

@pborgen there is one now: #3790

Note that, as a workaround, you can use publish, which is currently the only sanctioned way to move collections across clients:

import pandas as pd
import dask.dataframe as dd
from distributed import Client, get_client

def getCompute(name):
    df = get_client().get_dataset(name)
    df.compute()

daskClient = Client('localhost:8786')
df = pd.DataFrame(data=[[1]], columns=['a'])
df = dd.from_pandas(df, npartitions=1)
df = df.persist()
daskClient.publish_dataset(foo=df)
try:
    future = daskClient.submit(getCompute, "foo")
    future.result()
finally:
    daskClient.unpublish_dataset("foo")

The downside is that, if for any reason the Client is SIGKILL'ed or loses network connectivity before the end of the computation, you'll end up with a memory leak on the cluster.

@pborgen

pborgen commented May 11, 2020

Thanks for your help. I upgraded my dev box and all the machines that are part of the cluster to 2.16.0. I am using the dask CLI with one machine dedicated to the scheduler and 3 other dask workers. When I run with one worker everything works fine, but if I run with 2 workers I get the below error:

Error:

  File "c:\python38\lib\site-packages\dask\base.py", line 166, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "c:\python38\lib\site-packages\dask\base.py", line 444, in compute
    results = schedule(dsk, keys, **kwargs)
  File "c:\python38\lib\site-packages\distributed\client.py", line 2646, in get
    used as an optimization to avoid recomputation.
  File "c:\python38\lib\site-packages\distributed\client.py", line 2543, in _graph_to_futures
ValueError: Inputs contain futures that were created by another client.
Versions:
dask==2.16.0
distributed==2.16.0

@crusaderky
Collaborator

@pborgen run what? The POC that you posted, or my latest one with publish_dataset/get_dataset?

@pborgen

pborgen commented May 11, 2020

I am running my own code; it is pretty similar to what you posted, though. Your code seems to run fine on my cluster. I am going to try to run your code with a very large dask dataframe to see if I can reproduce.

@pborgen

pborgen commented May 11, 2020

I should say that this error only happens after things have been running for a minute or so. I am also using a ddf created from a parquet file that is 140 MB and about 40 million rows.

@pborgen

pborgen commented May 11, 2020

Inside my task I am querying this very large parquet file.

@jcrist
Member

jcrist commented May 11, 2020

@pborgen, if you can create a minimal reproducible example, please file a new issue and we can discuss there.

@pborgen

pborgen commented May 11, 2020

def task_big_data(id):
    from dask.distributed import get_client

    dataset = get_client().get_dataset('my_dataset')
    # This is where I am getting my error. It does run for a bit before
    # failing; maybe it errors when the task is run on another worker?
    df = dataset[dataset.ID == id].compute()

@pborgen

pborgen commented May 11, 2020

> [quoting @jcrist above]

Thanks for your help. I will try to get this to you today or tomorrow.

@pborgen

pborgen commented May 11, 2020

Do you know of a way to programmatically create a very large dataframe? Like at least 40 million rows and 8 columns.

@jcrist
Member

jcrist commented May 11, 2020

There are some functions for doing this in dask.datasets (e.g. dask.datasets.timeseries). You could also use dask.dataframe.from_dask_array and dask.array.random to make some large random numeric dataframes.
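For example, something along these lines (sizes and column names are illustrative):

import dask.array as da
import dask.dataframe as dd
import dask.datasets

# Synthetic time-series dataframe
ts = dask.datasets.timeseries()

# Large random numeric dataframe: 40 million rows x 8 columns
arr = da.random.random((40_000_000, 8), chunks=(1_000_000, 8))
big = dd.from_dask_array(arr, columns=list("abcdefgh"))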

@pborgen

pborgen commented May 11, 2020

Created #3791
