Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

iterate RemoteChannels #31177

Closed
GregPlowman opened this issue Feb 26, 2019 · 2 comments
Closed

iterate RemoteChannels #31177

GregPlowman opened this issue Feb 26, 2019 · 2 comments
Labels
iteration Involves iteration or the iteration protocol parallelism Parallel or distributed computation

Comments

@GregPlowman
Copy link
Contributor

Is there any reason why we can't iterate a RemoteChannel.

The documentation for Channel shows an example using a for x in channel loop, whereas the analogous example for RemoteChannel uses a while true loop. Presumably this is because iterate is not defined for RemoteChannel.

Channel (https://docs.julialang.org/en/latest/manual/parallel-computing/#Channels-1)

function do_work()
           for job_id in jobs
               exec_time = rand()
               sleep(exec_time)                # simulates elapsed time doing actual work
                                               # typically performed externally.
               put!(results, (job_id, exec_time))
           end
       end;

RemoteChannel (https://docs.julialang.org/en/latest/manual/parallel-computing/#Channels-and-RemoteChannels-1)

@everywhere function do_work(jobs, results) # define work function everywhere
           while true
               job_id = take!(jobs)
               exec_time = rand()
               sleep(exec_time) # simulates elapsed time doing actual work
               put!(results, (job_id, exec_time, myid()))
           end
       end

Naively copying the iterate code from Channel to RemoteChannel appears to work for a simple case:

using Distributed

function Base.iterate(c::RemoteChannel, state=nothing)
    try
        return (take!(c), nothing)
    catch e
        if isa(e, InvalidStateException) && e.state==:closed
            return nothing
        else
            rethrow(e)
        end
    end
end

Base.IteratorSize(::Type{<:RemoteChannel}) = Base.SizeUnknown()

c = Channel{Int}(10)
foreach(i->put!(c, i), 1:3)
close(c)
data = [i for i in c]

rc = RemoteChannel(() -> Channel{Int}(10))
foreach(i->put!(rc, i), 1:3)
close(rc)
data = [i for i in rc]
@tkf
Copy link
Member

tkf commented Dec 15, 2019

ref #33555 (my PR)

@brenhinkeller brenhinkeller added parallelism Parallel or distributed computation iteration Involves iteration or the iteration protocol labels Nov 21, 2022
@vtjnash
Copy link
Member

vtjnash commented Feb 11, 2024

Added in #48515

@vtjnash vtjnash closed this as completed Feb 11, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
iteration Involves iteration or the iteration protocol parallelism Parallel or distributed computation
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants