Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler with Recurse/Inner #1014

Closed

Conversation

benjchristensen
Copy link
Member

API changes as per #997

Usage looks like this:

import java.util.concurrent.TimeUnit;

import rx.Scheduler.Inner;
import rx.Scheduler.Recurse;
import rx.functions.Action1;
import rx.schedulers.Schedulers;

public class Test {

    public static void main(String args[]) {

        Schedulers.newThread().schedule(new Action1<Recurse>() {

            @Override
            public void call(Recurse inner) {
                System.out.println("do stuff");
                // recurse
                inner.schedule(this);
            }

        });

        Schedulers.newThread().schedule(recurse -> {
            System.out.println("do stuff");
            recurse.schedule();
        });

        Schedulers.newThread().schedule(recurse -> {
            System.out.println("do stuff");
            recurse.schedule(1000, TimeUnit.MILLISECONDS);
        });

        Schedulers.newThread().schedule(recurse -> {
            recurse.schedule(re -> {
                System.out.println("do more stuff");
            });
        });

        Inner inner = Schedulers.newThread().createInner();
        inner.schedule(re -> {
            System.out.println("do stuff");
            re.schedule(r -> {
                System.out.println("do more stuff");
            });
        });

    }
}

Code outline:

screen shot 2014-04-02 at 11 08 31 pm

@cloudbees-pull-request-builder

RxJava-pull-requests #941 ABORTED

@akarnokd
Copy link
Member

akarnokd commented Apr 3, 2014

I would go for a combined Inner and Recurse scheduler by hiding the current inner action in a ThreadLocal variable. Here is an example Scheduler infrastructure and a NewThreadScheduler implemented like this. I'm not certain, however, that this ThreadLocal logic works for a TrampolineScheduler or TestScheduler.

@benjchristensen
Copy link
Member Author

The problem I've had with combining them is that when first creating an Inner there is no Action to invoke. Using the ThreadLocal to store the current action doesn't solve this, it would just make it non-obvious why inner.schedule() doesn't work the first time, and the API would be odd that I can get an Inner with a schedule() method even though nothing has been invoked yet.

The reason for this scenario is that retrieving an Inner via createInner() is needed to make use cases like observeOn less awkward. Here is the example where Inner is retrieved before an Action is executed in observeOn:

        protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                if (recursiveScheduler == null) {
                    recursiveScheduler = scheduler.createInner();
                    add(recursiveScheduler);
                }
                recursiveScheduler.schedule(new Action1<Recurse>() {

                    @Override
                    public void call(Recurse inner) {
                        pollQueue();
                    }

                });
            }
        }

The reason is that the recursion happens externally (the operator is doing it) rather than internally (inside the Action<Inner>/Action<Recurse>.

Therefore, we have use cases where Inner is used before Recurse makes sense, so the API is not appropriate when they are combined.

@zsxwing
Copy link
Member

zsxwing commented Apr 4, 2014

Now the solution is passing a scheduler to the user. I just got an opposite idea. Can we let the user tell us how to schedule. For example,

public class Scheduler {

    public final Subscription schedule(Func1<ScheduleAction, ScheduleAction> action);
    public final Subscription schedule(Func1<ScheduleAction, ScheduleAction> action, final long delayTime, final TimeUnit unit);
    public final Subscription schedulePeriodically(Func1<ScheduleAction, ScheduleAction> action, long initialDelay, long period, TimeUnit unit);
    public abstract Inner createInner(); // for advanced use cases like `observeOn`
    public int degreeOfParallelism();
    public long now();

    public static final class ScheduleAction {

        private static final none = new ScheduleAction();

        public static final ScheduleAction none() {
            return none;
        }

        public ScheduleAction(Action1<Recurse> action, long initialDelay, long period, TimeUnit unit) {
            ...
        }
        ...
    }

    // now mostly an implementation detail except for advanced use cases
    public abstract static class Inner implements Subscription {
        public abstract void schedule(ScheduleAction);
        public long now();
    }

    public static void main(String[] args) {
        Scheduler scheduler = ...;

        // recursively schedule the current action
        scheduler.schedule(new Func1<ScheduleAction, ScheduleAction>() {
            public ScheduleAction call(ScheduleAction action) {
                ...
                return action;
            }
        });

        // create a new ScheduleAction
        scheduler.schedule(new Func1<ScheduleAction, ScheduleAction>() {
            public ScheduleAction call(ScheduleAction action) {
                ...
                return new ScheduleAction(...);
            }
        });

        // quit the recursive schedule process
        scheduler.schedule(new Func1<ScheduleAction, ScheduleAction>() {
            public ScheduleAction call(ScheduleAction action) {
                ...
                return ScheduleAction.none();
            }
        });
    }

}

We send the current ScheduleAction to the user, and the user needs to return a ScheduleAction to tell Scheduler how to schedule the next action.

@zsxwing
Copy link
Member

zsxwing commented Apr 4, 2014

What's more, if we change ScheduleAction to ScheduleAction<T> to allow the user returns a T value via ScheduleAction, that may be convenient in some situation.

@benjchristensen
Copy link
Member Author

@zsxwing I don't understand what the Func1<ScheduledAction, ScheduledAction> signature buys us as opposed to recurse which doesn't require the developer to create new Recurse or ScheduledAction types. It seems more complicated that the Recurse method signatures.

@benjchristensen
Copy link
Member Author

Discussion of the signatures are continuing at #997

@zsxwing
Copy link
Member

zsxwing commented Apr 8, 2014

Agreed. I gave up the Func1 idea.

@benjchristensen benjchristensen deleted the scheduler-0.18 branch April 11, 2014 20:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants