From cf2ad6085b64733321d6a7accff305bd0a540d80 Mon Sep 17 00:00:00 2001 From: Bart De Smet Date: Thu, 13 Apr 2017 21:49:09 -0700 Subject: [PATCH 1/2] Misc. improvements to concurrency types --- .../System.Reactive/Concurrency/AsyncLock.cs | 5 +- .../Concurrency/CatchScheduler.cs | 19 ++-- .../ConcurrencyAbstractionLayer.cs | 5 +- ...ConcurrencyAbstractionLayerImpl.Windows.cs | 22 +---- .../ConcurrencyAbstractionLayerImpl.cs | 54 +++-------- .../Concurrency/CurrentThreadScheduler.cs | 44 +++------ .../Concurrency/DefaultScheduler.cs | 22 +++-- .../DisableOptimizationsScheduler.cs | 2 +- .../Concurrency/EventLoopScheduler.cs | 12 +-- .../Concurrency/HistoricalScheduler.cs | 38 ++++---- .../System.Reactive/Concurrency/IScheduler.cs | 2 +- .../Concurrency/ISchedulerLongRunning.cs | 2 +- .../Concurrency/ISchedulerPeriodic.cs | 2 - .../System.Reactive/Concurrency/IStopwatch.cs | 2 - .../Concurrency/IStopwatchProvider.cs | 4 +- .../Concurrency/ImmediateScheduler.cs | 30 +++++-- .../Concurrency/LocalScheduler.TimerQueue.cs | 45 ++++------ .../Concurrency/LocalScheduler.cs | 25 ++---- .../Concurrency/NewThreadScheduler.cs | 20 ++--- .../Concurrency/ScheduledItem.cs | 90 +++++++------------ .../Concurrency/Scheduler.Async.cs | 48 +++++----- .../Concurrency/Scheduler.Recursive.cs | 32 ++++--- .../Scheduler.Services.Emulation.cs | 37 +++++--- .../Concurrency/Scheduler.Services.cs | 55 +++++------- .../Concurrency/Scheduler.Simple.cs | 10 +-- .../Concurrency/Scheduler.Wrappers.cs | 6 +- .../System.Reactive/Concurrency/Scheduler.cs | 69 +++----------- .../Concurrency/SchedulerDefaults.cs | 10 +-- .../Concurrency/SchedulerOperation.cs | 20 ++--- .../Concurrency/SchedulerQueue.cs | 25 ++---- .../Concurrency/SchedulerWrapper.cs | 5 +- .../Concurrency/Synchronization.ObserveOn.cs | 16 ++-- .../Synchronization.Synchronize.cs | 14 +-- .../Concurrency/Synchronization.cs | 14 +-- .../SynchronizationContextScheduler.cs | 12 ++- .../Concurrency/TaskHelpers.cs | 2 +- .../Concurrency/TaskPoolScheduler.cs | 33 +++---- .../Concurrency/Thread.Stub.cs | 13 ++- .../ThreadPoolScheduler.Windows.cs | 46 +++++----- .../Concurrency/ThreadPoolScheduler.cs | 45 ++++------ .../VirtualTimeScheduler.Extensions.cs | 10 +-- .../Concurrency/VirtualTimeScheduler.cs | 59 ++++++------ .../Concurrency/CoreDispatcherScheduler.cs | 2 +- .../Concurrency/DispatcherScheduler.cs | 2 +- 44 files changed, 421 insertions(+), 609 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/AsyncLock.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/AsyncLock.cs index 74e891c9d6..4f6f918cf4 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/AsyncLock.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/AsyncLock.cs @@ -21,7 +21,7 @@ public sealed class AsyncLock : IDisposable /// processed by the owner. /// /// Action to queue for execution. - /// is null. + /// is null. public void Wait(Action action) { if (action == null) @@ -46,7 +46,9 @@ public void Wait(Action action) lock (queue) { if (queue.Count > 0) + { work = queue.Dequeue(); + } else { isAcquired = false; @@ -65,6 +67,7 @@ public void Wait(Action action) queue.Clear(); hasFaulted = true; } + throw; } } diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/CatchScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/CatchScheduler.cs index fc3e12fe54..327fe64de2 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/CatchScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/CatchScheduler.cs @@ -7,7 +7,7 @@ namespace System.Reactive.Concurrency { - class CatchScheduler : SchedulerWrapper + internal sealed class CatchScheduler : SchedulerWrapper where TException : Exception { private readonly Func _handler; @@ -26,11 +26,8 @@ protected override Func Wrap(Func _handler; @@ -81,16 +82,14 @@ public IDisposable ScheduleLongRunning(TState state, Action _handler; diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/ConcurrencyAbstractionLayer.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/ConcurrencyAbstractionLayer.cs index e6466410b9..03d0ee9e66 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/ConcurrencyAbstractionLayer.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/ConcurrencyAbstractionLayer.cs @@ -75,10 +75,7 @@ public interface IConcurrencyAbstractionLayer /// /// Gets whether long-running scheduling is supported. /// - bool SupportsLongRunning - { - get; - } + bool SupportsLongRunning { get; } /// /// Starts a new long-running thread. diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.Windows.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.Windows.cs index 69ee7a19ec..b629b7dcbd 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.Windows.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.Windows.cs @@ -3,8 +3,6 @@ // See the LICENSE file in the project root for more information. #if NO_THREAD && WINDOWS -using System; -using System.Collections.Generic; using System.Reactive.Disposables; using System.Threading; @@ -72,28 +70,16 @@ public void Sleep(TimeSpan timeout) e.Wait(); } - public IStopwatch StartStopwatch() - { - return new StopwatchImpl(); - } + public IStopwatch StartStopwatch() => new StopwatchImpl(); - public bool SupportsLongRunning - { - get { return false; } - } + public bool SupportsLongRunning => false; public void StartThread(Action action, object state) { throw new NotSupportedException(); } - private TimeSpan Normalize(TimeSpan dueTime) - { - if (dueTime < TimeSpan.Zero) - return TimeSpan.Zero; - - return dueTime; - } + private TimeSpan Normalize(TimeSpan dueTime) => dueTime < TimeSpan.Zero ? TimeSpan.Zero : dueTime; } } -#endif \ No newline at end of file +#endif diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.cs index 9825973fd2..9ef2380c3e 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.cs @@ -3,7 +3,6 @@ // See the LICENSE file in the project root for more information. #if !NO_THREAD -using System; using System.Collections.Generic; using System.Reactive.Disposables; using System.Threading; @@ -17,10 +16,7 @@ namespace System.Reactive.Concurrency // internal class /*Default*/ConcurrencyAbstractionLayerImpl : IConcurrencyAbstractionLayer { - public IDisposable StartTimer(Action action, object state, TimeSpan dueTime) - { - return new Timer(action, state, Normalize(dueTime)); - } + public IDisposable StartTimer(Action action, object state, TimeSpan dueTime) => new Timer(action, state, Normalize(dueTime)); public IDisposable StartPeriodicTimer(Action action, TimeSpan period) { @@ -47,27 +43,11 @@ public IDisposable QueueUserWorkItem(Action action, object state) return Disposable.Empty; } -#if USE_SLEEP_MS - public void Sleep(TimeSpan timeout) - { - System.Threading.Thread.Sleep((int)Normalize(timeout).TotalMilliseconds); - } -#else - public void Sleep(TimeSpan timeout) - { - System.Threading.Thread.Sleep(Normalize(timeout)); - } -#endif + public void Sleep(TimeSpan timeout) => System.Threading.Thread.Sleep(Normalize(timeout)); - public IStopwatch StartStopwatch() - { - return new StopwatchImpl(); - } + public IStopwatch StartStopwatch() => new StopwatchImpl(); - public bool SupportsLongRunning - { - get { return true; } - } + public bool SupportsLongRunning => true; public void StartThread(Action action, object state) { @@ -77,13 +57,7 @@ public void StartThread(Action action, object state) }) { IsBackground = true }.Start(); } - private static TimeSpan Normalize(TimeSpan dueTime) - { - if (dueTime < TimeSpan.Zero) - return TimeSpan.Zero; - - return dueTime; - } + private static TimeSpan Normalize(TimeSpan dueTime) => dueTime < TimeSpan.Zero ? TimeSpan.Zero : dueTime; // // Some historical context. In the early days of Rx, we discovered an issue with @@ -156,7 +130,7 @@ private static TimeSpan Normalize(TimeSpan dueTime) // symbol. // - class Timer : IDisposable + private sealed class Timer : IDisposable { private Action _action; private volatile System.Threading.Timer _timer; @@ -190,10 +164,7 @@ private void Tick(object state) } } - private bool IsTimerAssigned() - { - return _timer != null; - } + private bool IsTimerAssigned() => _timer != null; public void Dispose() { @@ -208,7 +179,7 @@ public void Dispose() } } - class PeriodicTimer : IDisposable + private sealed class PeriodicTimer : IDisposable { private Action _action; private volatile System.Threading.Timer _timer; @@ -224,10 +195,7 @@ public PeriodicTimer(Action action, TimeSpan period) _timer = new System.Threading.Timer(this.Tick, null, period, period); } - private void Tick(object state) - { - _action(); - } + private void Tick(object state) => _action(); public void Dispose() { @@ -242,7 +210,7 @@ public void Dispose() } } - class FastPeriodicTimer : IDisposable + private sealed class FastPeriodicTimer : IDisposable { private readonly Action _action; private volatile bool disposed; @@ -274,4 +242,4 @@ public void Dispose() } } } -#endif \ No newline at end of file +#endif diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/CurrentThreadScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/CurrentThreadScheduler.cs index dc15dc772b..7a913721b3 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/CurrentThreadScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/CurrentThreadScheduler.cs @@ -16,28 +16,22 @@ public sealed class CurrentThreadScheduler : LocalScheduler { private static readonly Lazy s_instance = new Lazy(() => new CurrentThreadScheduler()); - CurrentThreadScheduler() + private CurrentThreadScheduler() { } /// /// Gets the singleton instance of the current thread scheduler. /// - public static CurrentThreadScheduler Instance - { - get { return s_instance.Value; } - } + public static CurrentThreadScheduler Instance => s_instance.Value; [ThreadStatic] - static SchedulerQueue s_threadLocalQueue; + private static SchedulerQueue s_threadLocalQueue; [ThreadStatic] - static IStopwatch s_clock; + private static IStopwatch s_clock; - private static SchedulerQueue GetQueue() - { - return s_threadLocalQueue; - } + private static SchedulerQueue GetQueue() => s_threadLocalQueue; private static void SetQueue(SchedulerQueue newQueue) { @@ -61,25 +55,13 @@ private static TimeSpan Time [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1822:MarkMembersAsStatic", Justification = "Now marked as obsolete.")] [EditorBrowsable(EditorBrowsableState.Never)] [Obsolete(Constants_Core.OBSOLETE_SCHEDULEREQUIRED)] // Preferring static method call over instance method call. - public bool ScheduleRequired - { - get - { - return IsScheduleRequired; - } - } + public bool ScheduleRequired => IsScheduleRequired; /// /// Gets a value that indicates whether the caller must call a Schedule method. /// [EditorBrowsable(EditorBrowsableState.Advanced)] - public static bool IsScheduleRequired - { - get - { - return GetQueue() == null; - } - } + public static bool IsScheduleRequired => GetQueue() == null; /// /// Schedules an action to be executed after dueTime. @@ -89,7 +71,7 @@ public static bool IsScheduleRequired /// Action to be executed. /// Relative time after which to execute the action. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { if (action == null) @@ -106,14 +88,14 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun queue = new SchedulerQueue(4); queue.Enqueue(si); - CurrentThreadScheduler.SetQueue(queue); + SetQueue(queue); try { Trampoline.Run(queue); } finally { - CurrentThreadScheduler.SetQueue(null); + SetQueue(null); } } else @@ -124,7 +106,7 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun return Disposable.Create(si.Cancel); } - static class Trampoline + private static class Trampoline { public static void Run(SchedulerQueue queue) { @@ -133,14 +115,16 @@ public static void Run(SchedulerQueue queue) var item = queue.Dequeue(); if (!item.IsCanceled) { - var wait = item.DueTime - CurrentThreadScheduler.Time; + var wait = item.DueTime - Time; if (wait.Ticks > 0) { ConcurrencyAbstractionLayer.Current.Sleep(wait); } if (!item.IsCanceled) + { item.Invoke(); + } } } } diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/DefaultScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/DefaultScheduler.cs index 060a18c6c2..551ca8bc30 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/DefaultScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/DefaultScheduler.cs @@ -18,13 +18,7 @@ public sealed class DefaultScheduler : LocalScheduler, ISchedulerPeriodic /// /// Gets the singleton instance of the default scheduler. /// - public static DefaultScheduler Instance - { - get - { - return s_instance.Value; - } - } + public static DefaultScheduler Instance => s_instance.Value; private DefaultScheduler() { @@ -37,7 +31,7 @@ private DefaultScheduler() /// State passed to the action to be executed. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public override IDisposable Schedule(TState state, Func action) { if (action == null) @@ -48,7 +42,9 @@ public override IDisposable Schedule(TState state, Func { if (!d.IsDisposed) + { d.Disposable = action(this, state); + } }, null); return StableCompositeDisposable.Create( @@ -65,7 +61,7 @@ public override IDisposable Schedule(TState state, FuncAction to be executed. /// Relative time after which to execute the action. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { if (action == null) @@ -80,7 +76,9 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun var cancel = s_cal.StartTimer(_ => { if (!d.IsDisposed) + { d.Disposable = action(this, state); + } }, null, dt); return StableCompositeDisposable.Create( @@ -97,8 +95,8 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun /// Period for running the work periodically. /// Action to be executed, potentially updating the state. /// The disposable object used to cancel the scheduled recurring action (best effort). - /// is less than TimeSpan.Zero. - /// is null. + /// is less than . + /// is null. public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func action) { if (period < TimeSpan.Zero) @@ -143,7 +141,7 @@ protected override object GetService(Type serviceType) return base.GetService(serviceType); } - class LongRunning : ISchedulerLongRunning + private sealed class LongRunning : ISchedulerLongRunning { public static ISchedulerLongRunning Instance = new LongRunning(); diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/DisableOptimizationsScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/DisableOptimizationsScheduler.cs index f376afa762..8cb99c7cfe 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/DisableOptimizationsScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/DisableOptimizationsScheduler.cs @@ -7,7 +7,7 @@ namespace System.Reactive.Concurrency { - class DisableOptimizationsScheduler : SchedulerWrapper + internal sealed class DisableOptimizationsScheduler : SchedulerWrapper { private readonly Type[] _optimizationInterfaces; diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs index c4126cce7c..b2d4ee95f2 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs @@ -91,7 +91,7 @@ public EventLoopScheduler() /// Creates an object that schedules units of work on a designated thread, using the specified factory to control thread creation options. /// /// Factory function for thread creation. - /// is null. + /// is null. public EventLoopScheduler(Func threadFactory) { if (threadFactory == null) @@ -141,7 +141,7 @@ internal bool ExitIfEmpty /// Action to be executed. /// Relative time after which to execute the action. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. /// The scheduler has been disposed and doesn't accept new work. public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { @@ -181,8 +181,8 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun /// Period for running the work periodically. /// Action to be executed, potentially updating the state. /// The disposable object used to cancel the scheduled recurring action (best effort). - /// is null. - /// is less than TimeSpan.Zero. + /// is null. + /// is less than . /// The scheduler has been disposed and doesn't accept new work. public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func action) { @@ -325,7 +325,9 @@ private void Run() foreach (var item in ready) { if (!item.IsCanceled) + { item.Invoke(); + } } } @@ -362,4 +364,4 @@ private void Tick(object state) #endregion } -} \ No newline at end of file +} diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/HistoricalScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/HistoricalScheduler.cs index 387b5d6401..ffe28474b4 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/HistoricalScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/HistoricalScheduler.cs @@ -8,12 +8,12 @@ namespace System.Reactive.Concurrency { /// - /// Base class for historical schedulers, which are virtual time schedulers that use DateTimeOffset for absolute time and TimeSpan for relative time. + /// Base class for historical schedulers, which are virtual time schedulers that use for absolute time and for relative time. /// public abstract class HistoricalSchedulerBase : VirtualTimeSchedulerBase { /// - /// Creates a new historical scheduler with the minimum value of DateTimeOffset as the initial clock value. + /// Creates a new historical scheduler with the minimum value of as the initial clock value. /// protected HistoricalSchedulerBase() : base(DateTimeOffset.MinValue, Comparer.Default) @@ -51,35 +51,29 @@ protected override DateTimeOffset Add(DateTimeOffset absolute, TimeSpan relative } /// - /// Converts the absolute time value to a DateTimeOffset value. + /// Converts the absolute time value to a value. /// /// Absolute time value to convert. - /// The corresponding DateTimeOffset value. - protected override DateTimeOffset ToDateTimeOffset(DateTimeOffset absolute) - { - return absolute; - } + /// The corresponding value. + protected override DateTimeOffset ToDateTimeOffset(DateTimeOffset absolute) => absolute; /// - /// Converts the TimeSpan value to a relative time value. + /// Converts the value to a relative time value. /// - /// TimeSpan value to convert. + /// value to convert. /// The corresponding relative time value. - protected override TimeSpan ToRelative(TimeSpan timeSpan) - { - return timeSpan; - } + protected override TimeSpan ToRelative(TimeSpan timeSpan) => timeSpan; } /// - /// Provides a virtual time scheduler that uses DateTimeOffset for absolute time and TimeSpan for relative time. + /// Provides a virtual time scheduler that uses for absolute time and for relative time. /// public class HistoricalScheduler : HistoricalSchedulerBase { private readonly SchedulerQueue queue = new SchedulerQueue(); /// - /// Creates a new historical scheduler with the minimum value of DateTimeOffset as the initial clock value. + /// Creates a new historical scheduler with the minimum value of as the initial clock value. /// public HistoricalScheduler() : base() @@ -100,7 +94,7 @@ public HistoricalScheduler(DateTimeOffset initialClock) /// /// Initial value for the clock. /// Comparer to determine causality of events based on absolute time. - /// is null. + /// is null. public HistoricalScheduler(DateTimeOffset initialClock, IComparer comparer) : base(initialClock, comparer) { @@ -115,23 +109,29 @@ protected override IScheduledItem GetNext() while (queue.Count > 0) { var next = queue.Peek(); + if (next.IsCanceled) + { queue.Dequeue(); + } else + { return next; + } } + return null; } /// - /// Schedules an action to be executed at dueTime. + /// Schedules an action to be executed at . /// /// The type of the state passed to the scheduled action. /// State passed to the action to be executed. /// Action to be executed. /// Absolute time at which to execute the action. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public override IDisposable ScheduleAbsolute(TState state, DateTimeOffset dueTime, Func action) { if (action == null) diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/IScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/IScheduler.cs index d3fbd6ea54..c02e5a4b58 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/IScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/IScheduler.cs @@ -43,4 +43,4 @@ public interface IScheduler /// The disposable object used to cancel the scheduled action (best effort). IDisposable Schedule(TState state, DateTimeOffset dueTime, Func action); } -} \ No newline at end of file +} diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/ISchedulerLongRunning.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/ISchedulerLongRunning.cs index 8ec3471d02..755dfe5d6e 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/ISchedulerLongRunning.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/ISchedulerLongRunning.cs @@ -25,4 +25,4 @@ public interface ISchedulerLongRunning /// IDisposable ScheduleLongRunning(TState state, Action action); } -} \ No newline at end of file +} diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/ISchedulerPeriodic.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/ISchedulerPeriodic.cs index a328ec7c0b..917b1706bc 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/ISchedulerPeriodic.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/ISchedulerPeriodic.cs @@ -2,8 +2,6 @@ // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. -using System.Reactive.Disposables; - namespace System.Reactive.Concurrency { /// diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/IStopwatch.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/IStopwatch.cs index cdbcb8daad..5ef67ce014 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/IStopwatch.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/IStopwatch.cs @@ -2,8 +2,6 @@ // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. -using System; - namespace System.Reactive.Concurrency { /// diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/IStopwatchProvider.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/IStopwatchProvider.cs index ef7e7f1fe2..5ae2c36778 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/IStopwatchProvider.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/IStopwatchProvider.cs @@ -2,8 +2,6 @@ // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. -using System; - namespace System.Reactive.Concurrency { /* @@ -14,7 +12,7 @@ namespace System.Reactive.Concurrency */ /// - /// Provider for IStopwatch objects. + /// Provider for objects. /// public interface IStopwatchProvider { diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/ImmediateScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/ImmediateScheduler.cs index 465c6a73d6..99d6f10868 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/ImmediateScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/ImmediateScheduler.cs @@ -2,7 +2,6 @@ // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. -using System.Threading; using System.Reactive.Disposables; namespace System.Reactive.Concurrency @@ -15,17 +14,14 @@ public sealed class ImmediateScheduler : LocalScheduler { private static readonly Lazy s_instance = new Lazy(() => new ImmediateScheduler()); - ImmediateScheduler() + private ImmediateScheduler() { } /// /// Gets the singleton instance of the immediate scheduler. /// - public static ImmediateScheduler Instance - { - get { return s_instance.Value; } - } + public static ImmediateScheduler Instance => s_instance.Value; /// /// Schedules an action to be executed. @@ -66,9 +62,9 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun return action(new AsyncLockScheduler(), state); } - class AsyncLockScheduler : LocalScheduler + private sealed class AsyncLockScheduler : LocalScheduler { - AsyncLock asyncLock; + private AsyncLock asyncLock; public override IDisposable Schedule(TState state, Func action) { @@ -78,12 +74,16 @@ public override IDisposable Schedule(TState state, Func { if (!m.IsDisposed) + { m.Disposable = action(this, state); + } }); return m; @@ -95,14 +95,23 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun throw new ArgumentNullException(nameof(action)); if (dueTime.Ticks <= 0) - return Schedule(state, action); + { + return Schedule(state, action); + } + return ScheduleSlow(state, dueTime, action); + } + + private IDisposable ScheduleSlow(TState state, TimeSpan dueTime, Func action) + { var timer = ConcurrencyAbstractionLayer.Current.StartStopwatch(); var m = new SingleAssignmentDisposable(); if (asyncLock == null) + { asyncLock = new AsyncLock(); + } asyncLock.Wait(() => { @@ -113,8 +122,11 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun { ConcurrencyAbstractionLayer.Current.Sleep(sleep); } + if (!m.IsDisposed) + { m.Disposable = action(this, state); + } } }); diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/LocalScheduler.TimerQueue.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/LocalScheduler.TimerQueue.cs index ffbae2359c..15faade4d7 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/LocalScheduler.TimerQueue.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/LocalScheduler.TimerQueue.cs @@ -414,31 +414,23 @@ internal void SystemClockChanged(object sender, SystemClockChangedEventArgs args /// This type is very similar to ScheduledItem, but we need a different Invoke signature to allow customization /// of the target scheduler (e.g. when called in a recursive scheduling context, see ExecuteNextShortTermWorkItem). /// - abstract class WorkItem : IComparable, IDisposable + private abstract class WorkItem : IComparable, IDisposable { - private readonly LocalScheduler _scheduler; - private readonly DateTimeOffset _dueTime; + public readonly LocalScheduler Scheduler; + public readonly DateTimeOffset DueTime; + private readonly SingleAssignmentDisposable _disposable; private int _hasRun; public WorkItem(LocalScheduler scheduler, DateTimeOffset dueTime) { - _scheduler = scheduler; - _dueTime = dueTime; + Scheduler = scheduler; + DueTime = dueTime; + _disposable = new SingleAssignmentDisposable(); _hasRun = 0; } - public LocalScheduler Scheduler - { - get { return _scheduler; } - } - - public DateTimeOffset DueTime - { - get { return _dueTime; } - } - public void Invoke(IScheduler scheduler) { // @@ -452,33 +444,29 @@ public void Invoke(IScheduler scheduler) try { if (!_disposable.IsDisposed) + { _disposable.Disposable = InvokeCore(scheduler); + } } finally { SystemClock.Release(); } - } + } } protected abstract IDisposable InvokeCore(IScheduler scheduler); - public int CompareTo(WorkItem/*!*/ other) - { - return Comparer.Default.Compare(this._dueTime, other._dueTime); - } + public int CompareTo(WorkItem/*!*/ other) => Comparer.Default.Compare(DueTime, other.DueTime); - public void Dispose() - { - _disposable.Dispose(); - } + public void Dispose() => _disposable.Dispose(); } /// /// Represents a work item that closes over scheduler invocation state. Subtyping is /// used to have a common type for the scheduler queues. /// - sealed class WorkItem : WorkItem + private sealed class WorkItem : WorkItem { private readonly TState _state; private readonly Func _action; @@ -490,10 +478,7 @@ public WorkItem(LocalScheduler scheduler, TState state, DateTimeOffset dueTime, _action = action; } - protected override IDisposable InvokeCore(IScheduler scheduler) - { - return _action(scheduler, _state); - } + protected override IDisposable InvokeCore(IScheduler scheduler) => _action(scheduler, _state); } } -} \ No newline at end of file +} diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/LocalScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/LocalScheduler.cs index e1fd1a4a07..67d5fdda63 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/LocalScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/LocalScheduler.cs @@ -12,10 +12,7 @@ public abstract partial class LocalScheduler : IScheduler, IStopwatchProvider, I /// /// Gets the scheduler's notion of current time. /// - public virtual DateTimeOffset Now - { - get { return Scheduler.Now; } - } + public virtual DateTimeOffset Now => Scheduler.Now; /// /// Schedules an action to be executed. @@ -24,7 +21,7 @@ public virtual DateTimeOffset Now /// State passed to the action to be executed. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public virtual IDisposable Schedule(TState state, Func action) { if (action == null) @@ -51,7 +48,7 @@ public virtual IDisposable Schedule(TState state, FuncAction to be executed. /// Absolute time at which to execute the action. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public virtual IDisposable Schedule(TState state, DateTimeOffset dueTime, Func action) { if (action == null) @@ -65,18 +62,12 @@ public virtual IDisposable Schedule(TState state, DateTimeOffset dueTime /// /// New stopwatch object; started at the time of the request. /// - /// Platform-specific scheduler implementations should reimplement IStopwatchProvider to provide a more - /// efficient IStopwatch implementation (if available). + /// Platform-specific scheduler implementations should reimplement + /// to provide a more efficient implementation (if available). /// - public virtual IStopwatch StartStopwatch() - { - return ConcurrencyAbstractionLayer.Current.StartStopwatch(); - } + public virtual IStopwatch StartStopwatch() => ConcurrencyAbstractionLayer.Current.StartStopwatch(); - object IServiceProvider.GetService(Type serviceType) - { - return GetService(serviceType); - } + object IServiceProvider.GetService(Type serviceType) => GetService(serviceType); /// /// Discovers scheduler services by interface type. The base class implementation returns @@ -84,7 +75,7 @@ object IServiceProvider.GetService(Type serviceType) /// more control over service discovery, derived types can override this method. /// /// Scheduler service interface type to discover. - /// Object implementing the requested service, if available; null otherwise. + /// Object implementing the requested service, if available; null otherwise. protected virtual object GetService(Type serviceType) { #if !NO_PERF diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/NewThreadScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/NewThreadScheduler.cs index 22ed4a4173..bd5fc05de9 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/NewThreadScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/NewThreadScheduler.cs @@ -27,20 +27,14 @@ public NewThreadScheduler() /// /// Gets an instance of this scheduler that uses the default Thread constructor. /// - public static NewThreadScheduler Default - { - get - { - return s_instance.Value; - } - } + public static NewThreadScheduler Default => s_instance.Value; #if !NO_THREAD /// /// Creates an object that schedules each unit of work on a separate thread. /// /// Factory function for thread creation. - /// is null. + /// is null. public NewThreadScheduler(Func threadFactory) { if (threadFactory == null) @@ -60,7 +54,7 @@ private NewThreadScheduler(Func threadFactory) /// Action to be executed. /// Relative time after which to execute the action. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { if (action == null) @@ -78,7 +72,7 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun /// State passed to the action to be executed. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public IDisposable ScheduleLongRunning(TState state, Action action) { if (action == null) @@ -109,8 +103,8 @@ public IDisposable ScheduleLongRunning(TState state, ActionPeriod for running the work periodically. /// Action to be executed, potentially updating the state. /// The disposable object used to cancel the scheduled recurring action (best effort). - /// is null. - /// is less than TimeSpan.Zero. + /// is null. + /// is less than . public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func action) { if (period < TimeSpan.Zero) @@ -126,7 +120,7 @@ public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func< return periodic; } - class Periodic : IDisposable + private sealed class Periodic : IDisposable { private readonly IStopwatch _stopwatch; private readonly TimeSpan _period; diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/ScheduledItem.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/ScheduledItem.cs index 61d7ccbe1a..81e9e81ee2 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/ScheduledItem.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/ScheduledItem.cs @@ -22,7 +22,7 @@ public abstract class ScheduledItem : IScheduledItem, ICom /// /// Absolute time at which the work item has to be executed. /// Comparer used to compare work items based on their scheduled time. - /// is null. + /// is null. protected ScheduledItem(TAbsolute dueTime, IComparer comparer) { if (comparer == null) @@ -43,7 +43,9 @@ protected ScheduledItem(TAbsolute dueTime, IComparer comparer) public void Invoke() { if (!_disposable.IsDisposed) + { _disposable.Disposable = InvokeCore(); + } } /// @@ -59,12 +61,14 @@ public void Invoke() /// /// Work item to compare the current work item to. /// Relative ordering between this and the specified work item. - /// The inequality operators are overloaded to provide results consistent with the IComparable implementation. Equality operators implement traditional reference equality semantics. + /// The inequality operators are overloaded to provide results consistent with the implementation. Equality operators implement traditional reference equality semantics. public int CompareTo(ScheduledItem other) { - // MSDN: By definition, any object compares greater than null, and two null references compare equal to each other. - if (object.ReferenceEquals(other, null)) + // MSDN: By definition, any object compares greater than null, and two null references compare equal to each other. + if (ReferenceEquals(other, null)) + { return 1; + } return _comparer.Compare(DueTime, other.DueTime); } @@ -74,48 +78,36 @@ public int CompareTo(ScheduledItem other) /// /// The first object to compare. /// The second object to compare. - /// true if the DueTime value of left is earlier than the DueTime value of right; otherwise, false. - /// This operator provides results consistent with the IComparable implementation. - public static bool operator <(ScheduledItem left, ScheduledItem right) - { - return Comparer>.Default.Compare(left, right) < 0; - } + /// true if the value of left is earlier than the value of right; otherwise, false. + /// This operator provides results consistent with the implementation. + public static bool operator <(ScheduledItem left, ScheduledItem right) => Comparer>.Default.Compare(left, right) < 0; /// /// Determines whether one specified object is due before or at the same of a second specified object. /// /// The first object to compare. /// The second object to compare. - /// true if the DueTime value of left is earlier than or simultaneous with the DueTime value of right; otherwise, false. - /// This operator provides results consistent with the IComparable implementation. - public static bool operator <=(ScheduledItem left, ScheduledItem right) - { - return Comparer>.Default.Compare(left, right) <= 0; - } + /// true if the value of left is earlier than or simultaneous with the value of right; otherwise, false. + /// This operator provides results consistent with the implementation. + public static bool operator <=(ScheduledItem left, ScheduledItem right) => Comparer>.Default.Compare(left, right) <= 0; /// /// Determines whether one specified object is due after a second specified object. /// /// The first object to compare. /// The second object to compare. - /// true if the DueTime value of left is later than the DueTime value of right; otherwise, false. - /// This operator provides results consistent with the IComparable implementation. - public static bool operator >(ScheduledItem left, ScheduledItem right) - { - return Comparer>.Default.Compare(left, right) > 0; - } + /// true if the value of left is later than the value of right; otherwise, false. + /// This operator provides results consistent with the implementation. + public static bool operator >(ScheduledItem left, ScheduledItem right) => Comparer>.Default.Compare(left, right) > 0; /// /// Determines whether one specified object is due after or at the same time of a second specified object. /// /// The first object to compare. /// The second object to compare. - /// true if the DueTime value of left is later than or simultaneous with the DueTime value of right; otherwise, false. - /// This operator provides results consistent with the IComparable implementation. - public static bool operator >=(ScheduledItem left, ScheduledItem right) - { - return Comparer>.Default.Compare(left, right) >= 0; - } + /// true if the value of left is later than or simultaneous with the value of right; otherwise, false. + /// This operator provides results consistent with the implementation. + public static bool operator >=(ScheduledItem left, ScheduledItem right) => Comparer>.Default.Compare(left, right) >= 0; #endregion @@ -126,53 +118,38 @@ public int CompareTo(ScheduledItem other) /// /// The first object to compare. /// The second object to compare. - /// true if both are equal; otherwise, false. + /// true if both are equal; otherwise, false. /// This operator does not provide results consistent with the IComparable implementation. Instead, it implements reference equality. - public static bool operator ==(ScheduledItem left, ScheduledItem right) - { - return object.ReferenceEquals(left, right); - } + public static bool operator ==(ScheduledItem left, ScheduledItem right) => ReferenceEquals(left, right); /// /// Determines whether two specified objects are inequal. /// /// The first object to compare. /// The second object to compare. - /// true if both are inequal; otherwise, false. + /// true if both are inequal; otherwise, false. /// This operator does not provide results consistent with the IComparable implementation. Instead, it implements reference equality. - public static bool operator !=(ScheduledItem left, ScheduledItem right) - { - return !(left == right); - } + public static bool operator !=(ScheduledItem left, ScheduledItem right) => !(left == right); /// /// Determines whether a object is equal to the specified object. /// /// The object to compare to the current object. - /// true if the obj parameter is a object and is equal to the current object; otherwise, false. - public override bool Equals(object obj) - { - return object.ReferenceEquals(this, obj); - } + /// true if the obj parameter is a object and is equal to the current object; otherwise, false. + public override bool Equals(object obj) => ReferenceEquals(this, obj); /// /// Returns the hash code for the current object. /// /// A 32-bit signed integer hash code. - public override int GetHashCode() - { - return base.GetHashCode(); - } + public override int GetHashCode() => base.GetHashCode(); #endregion /// - /// Cancels the work item by disposing the resource returned by InvokeCore as soon as possible. + /// Cancels the work item by disposing the resource returned by as soon as possible. /// - public void Cancel() - { - _disposable.Dispose(); - } + public void Cancel() => _disposable.Dispose(); /// /// Gets whether the work item has received a cancellation request. @@ -200,7 +177,7 @@ public sealed class ScheduledItem : ScheduledItem /// Scheduled action. /// Time at which to run the scheduled action. /// Comparer used to compare work items based on their scheduled time. - /// or or is null. + /// or or is null. public ScheduledItem(IScheduler scheduler, TValue state, Func action, TAbsolute dueTime, IComparer comparer) : base(dueTime, comparer) { @@ -221,7 +198,7 @@ public ScheduledItem(IScheduler scheduler, TValue state, FuncState to pass to the scheduled action. /// Scheduled action. /// Time at which to run the scheduled action. - /// or is null. + /// or is null. public ScheduledItem(IScheduler scheduler, TValue state, Func action, TAbsolute dueTime) : this(scheduler, state, action, dueTime, Comparer.Default) { @@ -231,9 +208,6 @@ public ScheduledItem(IScheduler scheduler, TValue state, Func /// Cancellation resource returned by the scheduled action. - protected override IDisposable InvokeCore() - { - return _action(_scheduler, _state); - } + protected override IDisposable InvokeCore() => _action(_scheduler, _state); } } diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Async.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Async.cs index 35a29f1db1..e063f4bc4a 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Async.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Async.cs @@ -16,7 +16,7 @@ public static partial class Scheduler /// /// Scheduler to yield work on. /// Scheduler operation object to await in order to schedule the continuation. - /// is null. + /// is null. public static SchedulerOperation Yield(this IScheduler scheduler) { if (scheduler == null) @@ -32,7 +32,7 @@ public static SchedulerOperation Yield(this IScheduler scheduler) /// Scheduler to yield work on. /// Cancellation token to cancel the continuation to run. /// Scheduler operation object to await in order to schedule the continuation. - /// is null. + /// is null. public static SchedulerOperation Yield(this IScheduler scheduler, CancellationToken cancellationToken) { if (scheduler == null) @@ -48,7 +48,7 @@ public static SchedulerOperation Yield(this IScheduler scheduler, CancellationTo /// Scheduler to yield work on. /// Time when the continuation should run. /// Scheduler operation object to await in order to schedule the continuation. - /// is null. + /// is null. public static SchedulerOperation Sleep(this IScheduler scheduler, TimeSpan dueTime) { if (scheduler == null) @@ -65,7 +65,7 @@ public static SchedulerOperation Sleep(this IScheduler scheduler, TimeSpan dueTi /// Time when the continuation should run. /// Cancellation token to cancel the continuation to run. /// Scheduler operation object to await in order to schedule the continuation. - /// is null. + /// is null. public static SchedulerOperation Sleep(this IScheduler scheduler, TimeSpan dueTime, CancellationToken cancellationToken) { if (scheduler == null) @@ -81,7 +81,7 @@ public static SchedulerOperation Sleep(this IScheduler scheduler, TimeSpan dueTi /// Scheduler to yield work on. /// Time when the continuation should run. /// Scheduler operation object to await in order to schedule the continuation. - /// is null. + /// is null. public static SchedulerOperation Sleep(this IScheduler scheduler, DateTimeOffset dueTime) { if (scheduler == null) @@ -98,7 +98,7 @@ public static SchedulerOperation Sleep(this IScheduler scheduler, DateTimeOffset /// Time when the continuation should run. /// Cancellation token to cancel the continuation to run. /// Scheduler operation object to await in order to schedule the continuation. - /// is null. + /// is null. public static SchedulerOperation Sleep(this IScheduler scheduler, DateTimeOffset dueTime, CancellationToken cancellationToken) { if (scheduler == null) @@ -115,7 +115,7 @@ public static SchedulerOperation Sleep(this IScheduler scheduler, DateTimeOffset /// State to pass to the asynchronous method. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. - /// or is null. + /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, TState state, Func action) { if (scheduler == null) @@ -134,7 +134,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, TStat /// State to pass to the asynchronous method. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. - /// or is null. + /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, TState state, Func> action) { if (scheduler == null) @@ -151,7 +151,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, TStat /// Scheduler to schedule work on. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. - /// or is null. + /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, Func action) { if (scheduler == null) @@ -168,7 +168,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, FuncScheduler to schedule work on. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. - /// or is null. + /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, Func> action) { if (scheduler == null) @@ -188,7 +188,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, FuncRelative time after which to execute the action. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. - /// or is null. + /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, TState state, TimeSpan dueTime, Func action) { if (scheduler == null) @@ -208,7 +208,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, TStat /// Relative time after which to execute the action. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. - /// or is null. + /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, TState state, TimeSpan dueTime, Func> action) { if (scheduler == null) @@ -226,7 +226,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, TStat /// Relative time after which to execute the action. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. - /// or is null. + /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func action) { if (scheduler == null) @@ -244,7 +244,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueT /// Relative time after which to execute the action. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. - /// or is null. + /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func> action) { if (scheduler == null) @@ -264,7 +264,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueT /// Absolute time at which to execute the action. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. - /// or is null. + /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Func action) { if (scheduler == null) @@ -284,7 +284,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, TStat /// Absolute time at which to execute the action. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. - /// or is null. + /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Func> action) { if (scheduler == null) @@ -302,7 +302,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, TStat /// Absolute time at which to execute the action. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. - /// or is null. + /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffset dueTime, Func action) { if (scheduler == null) @@ -320,7 +320,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffse /// Absolute time at which to execute the action. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. - /// or is null. + /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffset dueTime, Func> action) { if (scheduler == null) @@ -372,7 +372,9 @@ private static IDisposable InvokeAsync(IScheduler self, TState s, Func e is OperationCanceledException); + } d.Disposable = t.Result; }, TaskContinuationOptions.ExecuteSynchronously); @@ -391,7 +393,7 @@ private static CancellationToken GetCancellationToken(this IScheduler scheduler) return cs != null ? cs.Token : CancellationToken.None; } - class CancelableScheduler : IScheduler + private sealed class CancelableScheduler : IScheduler { private readonly IScheduler _scheduler; private readonly CancellationToken _cancellationToken; @@ -407,10 +409,7 @@ public CancellationToken Token get { return _cancellationToken; } } - public DateTimeOffset Now - { - get { return _scheduler.Now; } - } + public DateTimeOffset Now => _scheduler.Now; public IDisposable Schedule(TState state, Func action) { @@ -427,6 +426,5 @@ public IDisposable Schedule(TState state, DateTimeOffset dueTime, FuncScheduler to execute the recursive action on. /// Action to execute recursively. The parameter passed to the action is used to trigger recursive scheduling of the action. /// The disposable object used to cancel the scheduled action (best effort). - /// or is null. + /// or is null. public static IDisposable Schedule(this IScheduler scheduler, Action action) { if (scheduler == null) @@ -33,7 +33,7 @@ public static IDisposable Schedule(this IScheduler scheduler, Action act /// State passed to the action to be executed. /// Action to execute recursively. The last parameter passed to the action is used to trigger recursive scheduling of the action, passing in recursive invocation state. /// The disposable object used to cancel the scheduled action (best effort). - /// or is null. + /// or is null. public static IDisposable Schedule(this IScheduler scheduler, TState state, Action> action) { if (scheduler == null) @@ -44,7 +44,7 @@ public static IDisposable Schedule(this IScheduler scheduler, TState sta return scheduler.Schedule(new Pair>> { First = state, Second = action }, InvokeRec1); } - static IDisposable InvokeRec1(IScheduler scheduler, Pair>> pair) + private static IDisposable InvokeRec1(IScheduler scheduler, Pair>> pair) { var group = new CompositeDisposable(1); var gate = new object(); @@ -62,9 +62,13 @@ static IDisposable InvokeRec1(IScheduler scheduler, Pair(IScheduler scheduler, PairAction to execute recursively. The parameter passed to the action is used to trigger recursive scheduling of the action at the specified relative time. /// Relative time after which to execute the action for the first time. /// The disposable object used to cancel the scheduled action (best effort). - /// or is null. + /// or is null. public static IDisposable Schedule(this IScheduler scheduler, TimeSpan dueTime, Action> action) { if (scheduler == null) @@ -112,7 +116,7 @@ public static IDisposable Schedule(this IScheduler scheduler, TimeSpan dueTime, /// Action to execute recursively. The last parameter passed to the action is used to trigger recursive scheduling of the action, passing in the recursive due time and invocation state. /// Relative time after which to execute the action for the first time. /// The disposable object used to cancel the scheduled action (best effort). - /// or is null. + /// or is null. public static IDisposable Schedule(this IScheduler scheduler, TState state, TimeSpan dueTime, Action> action) { if (scheduler == null) @@ -123,7 +127,7 @@ public static IDisposable Schedule(this IScheduler scheduler, TState sta return scheduler.Schedule(new Pair>> { First = state, Second = action }, dueTime, InvokeRec2); } - static IDisposable InvokeRec2(IScheduler scheduler, Pair>> pair) + private static IDisposable InvokeRec2(IScheduler scheduler, Pair>> pair) { var group = new CompositeDisposable(1); var gate = new object(); @@ -141,9 +145,13 @@ static IDisposable InvokeRec2(IScheduler scheduler, Pair(IScheduler scheduler, PairAction to execute recursively. The parameter passed to the action is used to trigger recursive scheduling of the action at the specified absolute time. /// Absolute time at which to execute the action for the first time. /// The disposable object used to cancel the scheduled action (best effort). - /// or is null. + /// or is null. public static IDisposable Schedule(this IScheduler scheduler, DateTimeOffset dueTime, Action> action) { if (scheduler == null) @@ -191,7 +199,7 @@ public static IDisposable Schedule(this IScheduler scheduler, DateTimeOffset due /// Action to execute recursively. The last parameter passed to the action is used to trigger recursive scheduling of the action, passing in the recursive due time and invocation state. /// Absolute time at which to execute the action for the first time. /// The disposable object used to cancel the scheduled action (best effort). - /// or is null. + /// or is null. public static IDisposable Schedule(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Action> action) { if (scheduler == null) @@ -202,7 +210,7 @@ public static IDisposable Schedule(this IScheduler scheduler, TState sta return scheduler.Schedule(new Pair>> { First = state, Second = action }, dueTime, InvokeRec3); } - static IDisposable InvokeRec3(IScheduler scheduler, Pair>> pair) + private static IDisposable InvokeRec3(IScheduler scheduler, Pair>> pair) { var group = new CompositeDisposable(1); var gate = new object(); @@ -220,9 +228,13 @@ static IDisposable InvokeRec3(IScheduler scheduler, Pair(IScheduler scheduler, Pair + private struct Pair { public T1 First; public T2 Second; diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Services.Emulation.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Services.Emulation.cs index 575aee92e3..7bee867327 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Services.Emulation.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Services.Emulation.cs @@ -23,8 +23,8 @@ public static partial class Scheduler /// Period for running the work periodically. /// Action to be executed, potentially updating the state. /// The disposable object used to cancel the scheduled recurring action (best effort). - /// or is null. - /// is less than TimeSpan.Zero. + /// or is null. + /// is less than . public static IDisposable SchedulePeriodic(this IScheduler scheduler, TState state, TimeSpan period, Func action) { if (scheduler == null) @@ -49,8 +49,8 @@ public static IDisposable SchedulePeriodic(this IScheduler scheduler, TS /// Period for running the work periodically. /// Action to be executed. /// The disposable object used to cancel the scheduled recurring action (best effort). - /// or is null. - /// is less than TimeSpan.Zero. + /// or is null. + /// is less than . public static IDisposable SchedulePeriodic(this IScheduler scheduler, TState state, TimeSpan period, Action action) { if (scheduler == null) @@ -73,8 +73,8 @@ public static IDisposable SchedulePeriodic(this IScheduler scheduler, TS /// Period for running the work periodically. /// Action to be executed. /// The disposable object used to cancel the scheduled recurring action (best effort). - /// or is null. - /// is less than TimeSpan.Zero. + /// or is null. + /// is less than . public static IDisposable SchedulePeriodic(this IScheduler scheduler, TimeSpan period, Action action) { if (scheduler == null) @@ -94,7 +94,7 @@ public static IDisposable SchedulePeriodic(this IScheduler scheduler, TimeSpan p /// /// Scheduler to obtain a stopwatch for. /// New stopwatch object; started at the time of the request. - /// is null. + /// is null. /// The resulting stopwatch object can have non-monotonic behavior. public static IStopwatch StartStopwatch(this IScheduler scheduler) { @@ -135,7 +135,9 @@ public static IStopwatch StartStopwatch(this IScheduler scheduler) // var swp = scheduler.AsStopwatchProvider(); if (swp != null) + { return swp.StartStopwatch(); + } return new EmulatedStopwatch(scheduler); } @@ -287,7 +289,7 @@ private static IDisposable SchedulePeriodic_(IScheduler scheduler, TStat } } - class SchedulePeriodicStopwatch + private sealed class SchedulePeriodicStopwatch { private readonly IScheduler _scheduler; private readonly TimeSpan _period; @@ -409,7 +411,9 @@ private void Tick(Action recurse) // once more. // if (shouldWaitForResume) + { _resumeEvent.WaitOne(); + } } recurse(next); @@ -424,7 +428,9 @@ private void Cancel() _runState = DISPOSED; if (!Environment.HasShutdownStarted) + { _resumeEvent.Set(); + } } } @@ -452,7 +458,9 @@ private void Suspending(object sender, HostSuspendingEventArgs args) _runState = SUSPENDED; if (!Environment.HasShutdownStarted) + { _resumeEvent.Reset(); + } } } } @@ -484,7 +492,9 @@ private void Resuming(object sender, HostResumingEventArgs args) _runState = RUNNING; if (!Environment.HasShutdownStarted) + { _resumeEvent.Set(); + } } } } @@ -504,7 +514,7 @@ private void UnregisterHostLifecycleEventHandlers() } } - class SchedulePeriodicRecursive + private sealed class SchedulePeriodicRecursive { private readonly IScheduler _scheduler; private readonly TimeSpan _period; @@ -605,14 +615,16 @@ private void Tick(int command, Action recurse) // we make tail calls to play nice with the scheduler. // if (Interlocked.Decrement(ref _pendingTickCount) > 0) + { recurse(DISPATCH_START, TimeSpan.Zero); + } break; } } } - class EmulatedStopwatch : IStopwatch + private sealed class EmulatedStopwatch : IStopwatch { private readonly IScheduler _scheduler; private readonly DateTimeOffset _start; @@ -623,10 +635,7 @@ public EmulatedStopwatch(IScheduler scheduler) _start = _scheduler.Now; } - public TimeSpan Elapsed - { - get { return Scheduler.Normalize(_scheduler.Now - _start); } - } + public TimeSpan Elapsed => Normalize(_scheduler.Now - _start); } } } diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Services.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Services.cs index e2c86bff01..1ff021ec5a 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Services.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Services.cs @@ -18,70 +18,61 @@ public static partial class Scheduler }; /// - /// Returns the ISchedulerLongRunning implementation of the specified scheduler, or null if no such implementation is available. + /// Returns the implementation of the specified scheduler, or null if no such implementation is available. /// - /// Scheduler to get the ISchedulerLongRunning implementation for. - /// The scheduler's ISchedulerLongRunning implementation if available; null otherwise. + /// Scheduler to get the implementation for. + /// The scheduler's implementation if available; null otherwise. /// /// This helper method is made available for query operator authors in order to discover scheduler services by using the required /// IServiceProvider pattern, which allows for interception or redefinition of scheduler services. /// - public static ISchedulerLongRunning AsLongRunning(this IScheduler scheduler) - { - var svc = scheduler as IServiceProvider; - if (svc != null) - return (ISchedulerLongRunning)svc.GetService(typeof(ISchedulerLongRunning)); - - return null; - } + public static ISchedulerLongRunning AsLongRunning(this IScheduler scheduler) => As(scheduler); /// - /// Returns the IStopwatchProvider implementation of the specified scheduler, or null if no such implementation is available. + /// Returns the implementation of the specified scheduler, or null if no such implementation is available. /// - /// Scheduler to get the IStopwatchProvider implementation for. - /// The scheduler's IStopwatchProvider implementation if available; null otherwise. + /// Scheduler to get the implementation for. + /// The scheduler's implementation if available; null otherwise. /// /// /// This helper method is made available for query operator authors in order to discover scheduler services by using the required /// IServiceProvider pattern, which allows for interception or redefinition of scheduler services. /// /// - /// Consider using in case a stopwatch is required, but use of emulation stopwatch based + /// Consider using in case a stopwatch is required, but use of emulation stopwatch based /// on the scheduler's clock is acceptable. Use of this method is recommended for best-effort use of the stopwatch provider /// scheduler service, where the caller falls back to not using stopwatches if this facility wasn't found. /// /// - public static IStopwatchProvider AsStopwatchProvider(this IScheduler scheduler) - { - var svc = scheduler as IServiceProvider; - if (svc != null) - return (IStopwatchProvider)svc.GetService(typeof(IStopwatchProvider)); - - return null; - } + public static IStopwatchProvider AsStopwatchProvider(this IScheduler scheduler) => As(scheduler); /// - /// Returns the IStopwatchProvider implementation of the specified scheduler, or null if no such implementation is available. + /// Returns the implementation of the specified scheduler, or null if no such implementation is available. /// - /// Scheduler to get the IStopwatchProvider implementation for. - /// The scheduler's IStopwatchProvider implementation if available; null otherwise. + /// Scheduler to get the implementation for. + /// The scheduler's implementation if available; null otherwise. /// /// /// This helper method is made available for query operator authors in order to discover scheduler services by using the required /// IServiceProvider pattern, which allows for interception or redefinition of scheduler services. /// /// - /// Consider using the Scheduler.SchedulePeriodic extension methods for IScheduler in case periodic scheduling is required and - /// emulation of periodic behavior using other scheduler services is desirable. Use of this method is recommended for best-effort - /// use of the periodic scheduling service, where the caller falls back to not using periodic scheduling if this facility wasn't - /// found. + /// Consider using the extension methods for in case periodic scheduling + /// is required and emulation of periodic behavior using other scheduler services is desirable. Use of this method is recommended + /// for best-effort use of the periodic scheduling service, where the caller falls back to not using periodic scheduling if this + /// facility wasn't found. /// /// - public static ISchedulerPeriodic AsPeriodic(this IScheduler scheduler) + public static ISchedulerPeriodic AsPeriodic(this IScheduler scheduler) => As(scheduler); + + private static T As(IScheduler scheduler) + where T : class { var svc = scheduler as IServiceProvider; if (svc != null) - return (ISchedulerPeriodic)svc.GetService(typeof(ISchedulerPeriodic)); + { + return (T)svc.GetService(typeof(T)); + } return null; } diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Simple.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Simple.cs index bc23f61848..4a4b92f934 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Simple.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Simple.cs @@ -14,7 +14,7 @@ public static partial class Scheduler /// Scheduler to execute the action on. /// Action to execute. /// The disposable object used to cancel the scheduled action (best effort). - /// or is null. + /// or is null. public static IDisposable Schedule(this IScheduler scheduler, Action action) { if (scheduler == null) @@ -32,7 +32,7 @@ public static IDisposable Schedule(this IScheduler scheduler, Action action) /// Action to execute. /// Relative time after which to execute the action. /// The disposable object used to cancel the scheduled action (best effort). - /// or is null. + /// or is null. public static IDisposable Schedule(this IScheduler scheduler, TimeSpan dueTime, Action action) { if (scheduler == null) @@ -50,7 +50,7 @@ public static IDisposable Schedule(this IScheduler scheduler, TimeSpan dueTime, /// Action to execute. /// Absolute time at which to execute the action. /// The disposable object used to cancel the scheduled action (best effort). - /// or is null. + /// or is null. public static IDisposable Schedule(this IScheduler scheduler, DateTimeOffset dueTime, Action action) { if (scheduler == null) @@ -67,7 +67,7 @@ public static IDisposable Schedule(this IScheduler scheduler, DateTimeOffset due /// Scheduler to execute the action on. /// Action to execute. /// The disposable object used to cancel the scheduled action (best effort). - /// or is null. + /// or is null. public static IDisposable ScheduleLongRunning(this ISchedulerLongRunning scheduler, Action action) { if (scheduler == null) @@ -78,7 +78,7 @@ public static IDisposable ScheduleLongRunning(this ISchedulerLongRunning schedul return scheduler.ScheduleLongRunning(action, (a, c) => a(c)); } - static IDisposable Invoke(IScheduler scheduler, Action action) + private static IDisposable Invoke(IScheduler scheduler, Action action) { action(); return Disposable.Empty; diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Wrappers.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Wrappers.cs index 636ea8603a..86e08eb5b0 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Wrappers.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Wrappers.cs @@ -11,7 +11,7 @@ public static partial class Scheduler /// /// Scheduler to disable all optimizations for. /// Proxy to the original scheduler but without any optimizations enabled. - /// is null. + /// is null. public static IScheduler DisableOptimizations(this IScheduler scheduler) { if (scheduler == null) @@ -26,7 +26,7 @@ public static IScheduler DisableOptimizations(this IScheduler scheduler) /// Scheduler to disable the specified optimizations for. /// Types of the optimization interfaces that have to be disabled. /// Proxy to the original scheduler but without the specified optimizations enabled. - /// or is null. + /// or is null. public static IScheduler DisableOptimizations(this IScheduler scheduler, params Type[] optimizationInterfaces) { if (scheduler == null) @@ -44,7 +44,7 @@ public static IScheduler DisableOptimizations(this IScheduler scheduler, params /// Scheduler to apply an exception filter for. /// Handler that's run if an exception is caught. The exception will be rethrown if the handler returns false. /// Wrapper around the original scheduler, enforcing exception handling. - /// or is null. + /// or is null. public static IScheduler Catch(this IScheduler scheduler, Func handler) where TException : Exception { diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.cs index 8f0a7c76c0..fd590c208d 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.cs @@ -21,58 +21,29 @@ public static partial class Scheduler /// /// Gets the current time according to the local machine's system clock. /// - public static DateTimeOffset Now - { - get - { - return SystemClock.UtcNow; - } - } + public static DateTimeOffset Now => SystemClock.UtcNow; /// - /// Normalizes the specified TimeSpan value to a positive value. + /// Normalizes the specified value to a positive value. /// - /// The TimeSpan value to normalize. - /// The specified TimeSpan value if it is zero or positive; otherwise, TimeSpan.Zero. - public static TimeSpan Normalize(TimeSpan timeSpan) - { - if (timeSpan.Ticks < 0) - return TimeSpan.Zero; - return timeSpan; - } + /// The value to normalize. + /// The specified TimeSpan value if it is zero or positive; otherwise, . + public static TimeSpan Normalize(TimeSpan timeSpan) => timeSpan.Ticks < 0 ? TimeSpan.Zero : timeSpan; /// /// Gets a scheduler that schedules work immediately on the current thread. /// - public static ImmediateScheduler Immediate - { - get - { - return ImmediateScheduler.Instance; - } - } + public static ImmediateScheduler Immediate => ImmediateScheduler.Instance; /// /// Gets a scheduler that schedules work as soon as possible on the current thread. /// - public static CurrentThreadScheduler CurrentThread - { - get - { - return CurrentThreadScheduler.Instance; - } - } + public static CurrentThreadScheduler CurrentThread => CurrentThreadScheduler.Instance; /// /// Gets a scheduler that schedules work on the platform's default scheduler. /// - public static DefaultScheduler Default - { - get - { - return DefaultScheduler.Instance; - } - } + public static DefaultScheduler Default => DefaultScheduler.Instance; // @@ -95,13 +66,7 @@ public static DefaultScheduler Default /// Gets a scheduler that schedules work on the thread pool. /// [Obsolete(Constants_Core.OBSOLETE_SCHEDULER_THREADPOOL)] - public static IScheduler ThreadPool - { - get - { - return s_threadPool.Value; - } - } + public static IScheduler ThreadPool => s_threadPool.Value; private static Lazy s_newThread = new Lazy(() => Initialize("NewThread")); @@ -109,13 +74,7 @@ public static IScheduler ThreadPool /// Gets a scheduler that schedules work on a new thread using default thread creation options. /// [Obsolete(Constants_Core.OBSOLETE_SCHEDULER_NEWTHREAD)] - public static IScheduler NewThread - { - get - { - return s_newThread.Value; - } - } + public static IScheduler NewThread => s_newThread.Value; private static Lazy s_taskPool = new Lazy(() => Initialize("TaskPool")); @@ -123,13 +82,7 @@ public static IScheduler NewThread /// Gets a scheduler that schedules work on Task Parallel Library (TPL) task pool using the default TaskScheduler. /// [Obsolete(Constants_Core.OBSOLETE_SCHEDULER_TASKPOOL)] - public static IScheduler TaskPool - { - get - { - return s_taskPool.Value; - } - } + public static IScheduler TaskPool => s_taskPool.Value; private static IScheduler Initialize(string name) { diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/SchedulerDefaults.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/SchedulerDefaults.cs index 04d2f3c591..ead23ef2d0 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/SchedulerDefaults.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/SchedulerDefaults.cs @@ -6,10 +6,10 @@ namespace System.Reactive.Concurrency { internal static class SchedulerDefaults { - internal static IScheduler ConstantTimeOperations { get { return ImmediateScheduler.Instance; } } - internal static IScheduler TailRecursion { get { return ImmediateScheduler.Instance; } } - internal static IScheduler Iteration { get { return CurrentThreadScheduler.Instance; } } - internal static IScheduler TimeBasedOperations { get { return DefaultScheduler.Instance; } } - internal static IScheduler AsyncConversions { get { return DefaultScheduler.Instance; } } + internal static IScheduler ConstantTimeOperations => ImmediateScheduler.Instance; + internal static IScheduler TailRecursion => ImmediateScheduler.Instance; + internal static IScheduler Iteration => CurrentThreadScheduler.Instance; + internal static IScheduler TimeBasedOperations => DefaultScheduler.Instance; + internal static IScheduler AsyncConversions => DefaultScheduler.Instance; } } diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/SchedulerOperation.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/SchedulerOperation.cs index 24ab76a8d0..258bc0334e 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/SchedulerOperation.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/SchedulerOperation.cs @@ -53,8 +53,7 @@ public SchedulerOperationAwaiter GetAwaiter() /// (Infrastructure) Scheduler operation awaiter type used by the code generated for C# await and Visual Basic Await expressions. /// [EditorBrowsable(EditorBrowsableState.Never)] - public sealed class SchedulerOperationAwaiter - : INotifyCompletion + public sealed class SchedulerOperationAwaiter : INotifyCompletion { private readonly Func _schedule; private readonly CancellationToken _cancellationToken; @@ -76,18 +75,12 @@ internal SchedulerOperationAwaiter(Func schedule, Cancellat /// /// Indicates whether the scheduler operation has completed. Returns false unless cancellation was already requested. /// - public bool IsCompleted - { - get { return _cancellationToken.IsCancellationRequested; } - } + public bool IsCompleted => _cancellationToken.IsCancellationRequested; /// /// Completes the scheduler operation, throwing an OperationCanceledException in case cancellation was requested. /// - public void GetResult() - { - _cancellationToken.ThrowIfCancellationRequested(); - } + public void GetResult() => _cancellationToken.ThrowIfCancellationRequested(); /// /// Registers the continuation with the scheduler operation. @@ -147,11 +140,8 @@ public void OnCompleted(Action continuation) private void Cancel() { - var w = _work; - if (w != null) - w.Dispose(); - + _work?.Dispose(); _continuation?.Invoke(); } } -} \ No newline at end of file +} diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/SchedulerQueue.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/SchedulerQueue.cs index 46177bd942..e079137790 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/SchedulerQueue.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/SchedulerQueue.cs @@ -39,13 +39,7 @@ public SchedulerQueue(int capacity) /// /// Gets the number of scheduled items in the scheduler queue. /// - public int Count - { - get - { - return _queue.Count; - } - } + public int Count => _queue.Count; /// /// Enqueues the specified work item to be scheduled. @@ -60,28 +54,19 @@ public void Enqueue(ScheduledItem scheduledItem) /// Removes the specified work item from the scheduler queue. /// /// Work item to be removed from the scheduler queue. - /// true if the item was found; false otherwise. - public bool Remove(ScheduledItem scheduledItem) - { - return _queue.Remove(scheduledItem); - } + /// true if the item was found; false otherwise. + public bool Remove(ScheduledItem scheduledItem) => _queue.Remove(scheduledItem); /// /// Dequeues the next work item from the scheduler queue. /// /// Next work item in the scheduler queue (removed). - public ScheduledItem Dequeue() - { - return _queue.Dequeue(); - } + public ScheduledItem Dequeue() => _queue.Dequeue(); /// /// Peeks the next work item in the scheduler queue. /// /// Next work item in the scheduler queue (not removed). - public ScheduledItem Peek() - { - return _queue.Peek(); - } + public ScheduledItem Peek() => _queue.Peek(); } } diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/SchedulerWrapper.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/SchedulerWrapper.cs index 8667a84640..986de4c2c5 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/SchedulerWrapper.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/SchedulerWrapper.cs @@ -23,10 +23,7 @@ public SchedulerWrapper(IScheduler scheduler, ConditionalWeakTable _scheduler.Now; public IDisposable Schedule(TState state, Func action) { diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.ObserveOn.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.ObserveOn.cs index 33b11c850e..77b7f121bb 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.ObserveOn.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.ObserveOn.cs @@ -8,7 +8,7 @@ namespace System.Reactive.Concurrency { - class ObserveOn : Producer + internal sealed class ObserveOn : Producer { private readonly IObservable _source; private readonly IScheduler _scheduler; @@ -43,7 +43,7 @@ protected override IDisposable Run(IObserver observer, IDisposable canc } } - class ObserveOnSink : Sink, IObserver + private sealed class ObserveOnSink : Sink, IObserver { private readonly ObserveOn _parent; @@ -85,24 +85,24 @@ public void OnError(Exception error) public void OnCompleted() { - _parent._context.Post(OnCompletedPosted, null); + _parent._context.Post(OnCompletedPosted, state: null); } private void OnNextPosted(object value) { - base._observer.OnNext((TSource)value); + _observer.OnNext((TSource)value); } private void OnErrorPosted(object error) { - base._observer.OnError((Exception)error); - base.Dispose(); + _observer.OnError((Exception)error); + Dispose(); } private void OnCompletedPosted(object ignored) { - base._observer.OnCompleted(); - base.Dispose(); + _observer.OnCompleted(); + Dispose(); } } } diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.Synchronize.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.Synchronize.cs index 2d517a69cb..719ea5389e 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.Synchronize.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.Synchronize.cs @@ -5,7 +5,7 @@ #if !NO_PERF namespace System.Reactive.Concurrency { - class Synchronize : Producer + internal sealed class Synchronize : Producer { private readonly IObservable _source; private readonly object _gate; @@ -29,7 +29,7 @@ protected override IDisposable Run(IObserver observer, IDisposable canc return _source.SubscribeSafe(sink); } - class _ : Sink, IObserver + private sealed class _ : Sink, IObserver { private readonly Synchronize _parent; private readonly object _gate; @@ -45,7 +45,7 @@ public void OnNext(TSource value) { lock (_gate) { - base._observer.OnNext(value); + _observer.OnNext(value); } } @@ -53,8 +53,8 @@ public void OnError(Exception error) { lock (_gate) { - base._observer.OnError(error); - base.Dispose(); + _observer.OnError(error); + Dispose(); } } @@ -62,8 +62,8 @@ public void OnCompleted() { lock (_gate) { - base._observer.OnCompleted(); - base.Dispose(); + _observer.OnCompleted(); + Dispose(); } } } diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs index 4beb5e2b35..c301ee6b16 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs @@ -23,7 +23,7 @@ public static class Synchronization /// Source sequence. /// Scheduler to perform subscription and unsubscription actions on. /// The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler. - /// or is null. + /// or is null. /// /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified scheduler. /// In order to invoke observer callbacks on the specified scheduler, e.g. to offload callback processing to a dedicated thread, use . @@ -57,7 +57,7 @@ public static IObservable SubscribeOn(IObservable sou /// Source sequence. /// Synchronization context to perform subscription and unsubscription actions on. /// The source sequence whose subscriptions and unsubscriptions happen on the specified synchronization context. - /// or is null. + /// or is null. /// /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified synchronization context. /// In order to invoke observer callbacks on the specified synchronization context, e.g. to post callbacks to a UI thread represented by the synchronization context, use . @@ -75,7 +75,9 @@ public static IObservable SubscribeOn(IObservable sou context.PostWithStartComplete(() => { if (!subscription.IsDisposed) + { subscription.Disposable = new ContextDisposable(context, source.SubscribeSafe(observer)); + } }); return subscription; }); @@ -92,7 +94,7 @@ public static IObservable SubscribeOn(IObservable sou /// Source sequence. /// Scheduler to notify observers on. /// The source sequence whose observations happen on the specified scheduler. - /// or is null. + /// or is null. public static IObservable ObserveOn(IObservable source, IScheduler scheduler) { if (source == null) @@ -114,7 +116,7 @@ public static IObservable ObserveOn(IObservable sourc /// Source sequence. /// Synchronization context to notify observers on. /// The source sequence whose observations happen on the specified synchronization context. - /// or is null. + /// or is null. public static IObservable ObserveOn(IObservable source, SynchronizationContext context) { if (source == null) @@ -160,7 +162,7 @@ public static IObservable ObserveOn(IObservable sourc /// The type of the elements in the source sequence. /// Source sequence. /// The source sequence whose outgoing calls to observers are synchronized. - /// is null. + /// is null. public static IObservable Synchronize(IObservable source) { if (source == null) @@ -184,7 +186,7 @@ public static IObservable Synchronize(IObservable sou /// Source sequence. /// Gate object to synchronize each observer call on. /// The source sequence whose outgoing calls to observers are synchronized on the given gate object. - /// or is null. + /// or is null. public static IObservable Synchronize(IObservable source, object gate) { if (source == null) diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/SynchronizationContextScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/SynchronizationContextScheduler.cs index 38c5aebf67..d65e387f0f 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/SynchronizationContextScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/SynchronizationContextScheduler.cs @@ -19,7 +19,7 @@ public class SynchronizationContextScheduler : LocalScheduler /// Creates an object that schedules units of work on the provided . /// /// Synchronization context to schedule units of work on. - /// is null. + /// is null. public SynchronizationContextScheduler(SynchronizationContext context) { if (context == null) @@ -34,7 +34,7 @@ public SynchronizationContextScheduler(SynchronizationContext context) /// /// Synchronization context to schedule units of work on. /// Configures whether scheduling always posts to the synchronization context, regardless whether the caller is on the same synchronization context. - /// is null. + /// is null. public SynchronizationContextScheduler(SynchronizationContext context, bool alwaysPost) { if (context == null) @@ -51,7 +51,7 @@ public SynchronizationContextScheduler(SynchronizationContext context, bool alwa /// State passed to the action to be executed. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public override IDisposable Schedule(TState state, Func action) { if (action == null) @@ -68,7 +68,9 @@ public override IDisposable Schedule(TState state, Func { if (!d.IsDisposed) + { d.Disposable = action(this, state); + } }); } @@ -83,7 +85,7 @@ public override IDisposable Schedule(TState state, FuncAction to be executed. /// Relative time after which to execute the action. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { if (action == null) @@ -91,7 +93,9 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun var dt = Scheduler.Normalize(dueTime); if (dt.Ticks == 0) + { return Schedule(state, action); + } return DefaultScheduler.Instance.Schedule(state, dt, (_, state1) => Schedule(state1, action)); } diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/TaskHelpers.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/TaskHelpers.cs index 258861ca0d..3145083b02 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/TaskHelpers.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/TaskHelpers.cs @@ -7,7 +7,7 @@ namespace System.Reactive.Concurrency { - static class TaskHelpers + internal static class TaskHelpers { private const int MAX_DELAY = int.MaxValue; diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/TaskPoolScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/TaskPoolScheduler.cs index 0ccdea400f..e82d7b9f1b 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/TaskPoolScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/TaskPoolScheduler.cs @@ -18,10 +18,10 @@ public sealed class TaskPoolScheduler : LocalScheduler, ISchedulerLongRunning, I private readonly TaskFactory taskFactory; /// - /// Creates an object that schedules units of work using the provided TaskFactory. + /// Creates an object that schedules units of work using the provided . /// /// Task factory used to create tasks to run units of work. - /// is null. + /// is null. public TaskPoolScheduler(TaskFactory taskFactory) { if (taskFactory == null) @@ -31,15 +31,9 @@ public TaskPoolScheduler(TaskFactory taskFactory) } /// - /// Gets an instance of this scheduler that uses the default TaskScheduler. + /// Gets an instance of this scheduler that uses the default . /// - public static TaskPoolScheduler Default - { - get - { - return s_instance.Value; - } - } + public static TaskPoolScheduler Default => s_instance.Value; /// /// Schedules an action to be executed. @@ -48,7 +42,7 @@ public static TaskPoolScheduler Default /// State passed to the action to be executed. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public override IDisposable Schedule(TState state, Func action) { if (action == null) @@ -96,7 +90,7 @@ public override IDisposable Schedule(TState state, FuncAction to be executed. /// Relative time after which to execute the action. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { if (action == null) @@ -104,8 +98,15 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun var dt = Scheduler.Normalize(dueTime); if (dt.Ticks == 0) + { return Schedule(state, action); + } + + return ScheduleSlow(state, dt, action); + } + private IDisposable ScheduleSlow(TState state, TimeSpan dueTime, Func action) + { var d = new MultipleAssignmentDisposable(); var ct = new CancellationDisposable(); @@ -114,7 +115,9 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun TaskHelpers.Delay(dueTime, ct.Token).ContinueWith(_ => { if (!d.IsDisposed) + { d.Disposable = action(this, state); + } }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, taskFactory.Scheduler); return d; @@ -127,7 +130,7 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun /// State passed to the action to be executed. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public IDisposable ScheduleLongRunning(TState state, Action action) { var d = new BooleanDisposable(); @@ -167,8 +170,8 @@ public override IStopwatch StartStopwatch() /// Period for running the work periodically. /// Action to be executed, potentially updating the state. /// The disposable object used to cancel the scheduled recurring action (best effort). - /// is null. - /// is less than TimeSpan.Zero. + /// is null. + /// is less than . public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func action) { if (period < TimeSpan.Zero) diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/Thread.Stub.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/Thread.Stub.cs index 72fdda7fe9..e05a5ba704 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/Thread.Stub.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/Thread.Stub.cs @@ -3,9 +3,11 @@ // See the LICENSE file in the project root for more information. #if NO_THREAD +using System.Threading.Tasks; + namespace System.Reactive.Concurrency { - class Thread + internal sealed class Thread { private readonly ThreadStart _start; @@ -19,15 +21,12 @@ public Thread(ThreadStart start) public void Start() { - System.Threading.Tasks.Task.Factory.StartNew(Run, System.Threading.Tasks.TaskCreationOptions.LongRunning); + Task.Factory.StartNew(Run, TaskCreationOptions.LongRunning); } - private void Run() - { - _start(); - } + private void Run() => _start(); } delegate void ThreadStart(); } -#endif \ No newline at end of file +#endif diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/ThreadPoolScheduler.Windows.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/ThreadPoolScheduler.Windows.cs index 5b8729f302..49982529e4 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/ThreadPoolScheduler.Windows.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/ThreadPoolScheduler.Windows.cs @@ -3,7 +3,6 @@ // See the LICENSE file in the project root for more information. #if WINDOWS -using System.Reactive.Concurrency; using System.Reactive.Disposables; using Windows.System.Threading; @@ -16,8 +15,6 @@ namespace System.Reactive.Concurrency [CLSCompliant(false)] public sealed class ThreadPoolScheduler : LocalScheduler, ISchedulerPeriodic { - private readonly WorkItemPriority _priority; - private readonly WorkItemOptions _options; private static Lazy s_default = new Lazy(() => new ThreadPoolScheduler()); /// @@ -33,8 +30,8 @@ public ThreadPoolScheduler() /// Priority for scheduled units of work. public ThreadPoolScheduler(WorkItemPriority priority) { - _priority = priority; - _options = WorkItemOptions.None; + Priority = priority; + Options = WorkItemOptions.None; } /// @@ -44,36 +41,24 @@ public ThreadPoolScheduler(WorkItemPriority priority) /// Options that configure how work is scheduled. public ThreadPoolScheduler(WorkItemPriority priority, WorkItemOptions options) { - _priority = priority; - _options = options; + Priority = priority; + Options = options; } /// /// Gets the singleton instance of the Windows Runtime thread pool scheduler. /// - public static ThreadPoolScheduler Default - { - get - { - return s_default.Value; - } - } + public static ThreadPoolScheduler Default => s_default.Value; /// /// Gets the priority at which work is scheduled. /// - public WorkItemPriority Priority - { - get { return _priority; } - } + public WorkItemPriority Priority { get; } /// /// Gets the options that configure how work is scheduled. /// - public WorkItemOptions Options - { - get { return _options; } - } + public WorkItemOptions Options { get; } /// /// Schedules an action to be executed. @@ -93,8 +78,10 @@ public override IDisposable Schedule(TState state, Func { if (!d.IsDisposed) + { d.Disposable = action(this, state); - }, _priority, _options); + } + }, Priority, Options); return new CompositeDisposable( d, @@ -119,17 +106,26 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun var dt = Scheduler.Normalize(dueTime); if (dt.Ticks == 0) + { return Schedule(state, action); + } + return ScheduleSlow(state, dt, action); + } + + private IDisposable ScheduleSlow(TState state, TimeSpan dueTime, Func action) + { var d = new SingleAssignmentDisposable(); var res = global::Windows.System.Threading.ThreadPoolTimer.CreateTimer( tpt => { if (!d.IsDisposed) + { d.Disposable = action(this, state); + } }, - dt + dueTime ); return new CompositeDisposable( @@ -184,4 +180,4 @@ public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func< } } } -#endif \ No newline at end of file +#endif diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/ThreadPoolScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/ThreadPoolScheduler.cs index 9f3504030c..e17d807513 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/ThreadPoolScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/ThreadPoolScheduler.cs @@ -21,15 +21,9 @@ public sealed class ThreadPoolScheduler : LocalScheduler, ISchedulerLongRunning, /// /// Gets the singleton instance of the CLR thread pool scheduler. /// - public static ThreadPoolScheduler Instance - { - get - { - return s_instance.Value; - } - } + public static ThreadPoolScheduler Instance => s_instance.Value; - ThreadPoolScheduler() + private ThreadPoolScheduler() { } @@ -40,7 +34,7 @@ public static ThreadPoolScheduler Instance /// State passed to the action to be executed. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public override IDisposable Schedule(TState state, Func action) { if (action == null) @@ -51,7 +45,9 @@ public override IDisposable Schedule(TState state, Func { if (!d.IsDisposed) + { d.Disposable = action(this, state); + } }, null); return d; @@ -65,7 +61,7 @@ public override IDisposable Schedule(TState state, FuncAction to be executed. /// Relative time after which to execute the action. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { if (action == null) @@ -73,7 +69,9 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun var dt = Scheduler.Normalize(dueTime); if (dt.Ticks == 0) + { return Schedule(state, action); + } return new Timer(this, state, dt, action); } @@ -85,7 +83,7 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun /// State passed to the action to be executed. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public IDisposable ScheduleLongRunning(TState state, Action action) { if (action == null) @@ -116,7 +114,7 @@ public override IStopwatch StartStopwatch() /// Period for running the work periodically. /// Action to be executed, potentially updating the state. /// The disposable object used to cancel the scheduled recurring action (best effort). - /// is null. + /// is null. /// is less than zero. public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func action) { @@ -135,7 +133,7 @@ public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func< } } - sealed class FastPeriodicTimer : IDisposable + private sealed class FastPeriodicTimer : IDisposable { private TState _state; private Func _action; @@ -170,7 +168,7 @@ public void Dispose() // below and its timer rooting behavior. // - sealed class Timer : IDisposable + private sealed class Timer : IDisposable { private readonly MultipleAssignmentDisposable _disposable; @@ -214,15 +212,9 @@ private void Tick(object state) } } - private bool IsTimerAssigned() - { - return _timer != null; - } + private bool IsTimerAssigned() => _timer != null; - public void Dispose() - { - _disposable.Dispose(); - } + public void Dispose() => _disposable.Dispose(); private void Stop() { @@ -236,13 +228,10 @@ private void Stop() } } - private IDisposable Nop(IScheduler scheduler, TState state) - { - return Disposable.Empty; - } + private IDisposable Nop(IScheduler scheduler, TState state) => Disposable.Empty; } - sealed class PeriodicTimer : IDisposable + private sealed class PeriodicTimer : IDisposable { private TState _state; private Func _action; @@ -287,4 +276,4 @@ public void Dispose() } } } -#endif \ No newline at end of file +#endif diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/VirtualTimeScheduler.Extensions.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/VirtualTimeScheduler.Extensions.cs index 1a6bf060dd..f51d766567 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/VirtualTimeScheduler.Extensions.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/VirtualTimeScheduler.Extensions.cs @@ -12,7 +12,7 @@ namespace System.Reactive.Concurrency public static class VirtualTimeSchedulerExtensions { /// - /// Schedules an action to be executed at dueTime. + /// Schedules an action to be executed at . /// /// Absolute time representation type. /// Relative time representation type. @@ -20,7 +20,7 @@ public static class VirtualTimeSchedulerExtensions /// Relative time after which to execute the action. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). - /// or is null. + /// or is null. public static IDisposable ScheduleRelative(this VirtualTimeSchedulerBase scheduler, TRelative dueTime, Action action) where TAbsolute : IComparable { @@ -33,7 +33,7 @@ public static IDisposable ScheduleRelative(this VirtualTim } /// - /// Schedules an action to be executed at dueTime. + /// Schedules an action to be executed at . /// /// Absolute time representation type. /// Relative time representation type. @@ -41,7 +41,7 @@ public static IDisposable ScheduleRelative(this VirtualTim /// Absolute time at which to execute the action. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). - /// or is null. + /// or is null. public static IDisposable ScheduleAbsolute(this VirtualTimeSchedulerBase scheduler, TAbsolute dueTime, Action action) where TAbsolute : IComparable { @@ -53,7 +53,7 @@ public static IDisposable ScheduleAbsolute(this VirtualTim return scheduler.ScheduleAbsolute(action, dueTime, Invoke); } - static IDisposable Invoke(IScheduler scheduler, Action action) + private static IDisposable Invoke(IScheduler scheduler, Action action) { action(); return Disposable.Empty; diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/VirtualTimeScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/VirtualTimeScheduler.cs index c901563a64..8db063589d 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/VirtualTimeScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/VirtualTimeScheduler.cs @@ -29,7 +29,7 @@ protected VirtualTimeSchedulerBase() /// /// Initial value for the clock. /// Comparer to determine causality of events based on absolute time. - /// is null. + /// is null. protected VirtualTimeSchedulerBase(TAbsolute initialClock, IComparer comparer) { if (comparer == null) @@ -64,20 +64,12 @@ protected VirtualTimeSchedulerBase(TAbsolute initialClock, IComparer /// /// Gets whether the scheduler is enabled to run work. /// - public bool IsEnabled - { - get; - private set; - } + public bool IsEnabled { get; private set; } /// /// Gets the comparer used to compare absolute time values. /// - protected IComparer Comparer - { - get; - private set; - } + protected IComparer Comparer { get; } /// /// Schedules an action to be executed at dueTime. @@ -114,7 +106,7 @@ public IDisposable ScheduleRelative(TState state, TRelative dueTime, Fun /// State passed to the action to be executed. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public IDisposable Schedule(TState state, Func action) { if (action == null) @@ -131,7 +123,7 @@ public IDisposable Schedule(TState state, FuncRelative time after which to execute the action. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { if (action == null) @@ -148,7 +140,7 @@ public IDisposable Schedule(TState state, TimeSpan dueTime, FuncAbsolute time at which to execute the action. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public IDisposable Schedule(TState state, DateTimeOffset dueTime, Func action) { if (action == null) @@ -171,11 +163,16 @@ public void Start() if (next != null) { if (Comparer.Compare(next.DueTime, Clock) > 0) + { Clock = next.DueTime; + } + next.Invoke(); } else + { IsEnabled = false; + } } while (IsEnabled); } } @@ -212,18 +209,23 @@ public void AdvanceTo(TAbsolute time) if (next != null && Comparer.Compare(next.DueTime, time) <= 0) { if (Comparer.Compare(next.DueTime, Clock) > 0) + { Clock = next.DueTime; + } + next.Invoke(); } else + { IsEnabled = false; + } } while (IsEnabled); Clock = time; } else { - throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Linq.CANT_ADVANCE_WHILE_RUNNING, "AdvanceTo")); + throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Linq.CANT_ADVANCE_WHILE_RUNNING, nameof(AdvanceTo))); } } @@ -250,7 +252,7 @@ public void AdvanceBy(TRelative time) } else { - throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Linq.CANT_ADVANCE_WHILE_RUNNING, "AdvanceBy")); + throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Linq.CANT_ADVANCE_WHILE_RUNNING, nameof(AdvanceBy))); } } @@ -282,10 +284,7 @@ public TAbsolute Clock /// /// Gets the scheduler's notion of current time. /// - public DateTimeOffset Now - { - get { return ToDateTimeOffset(Clock); } - } + public DateTimeOffset Now => ToDateTimeOffset(Clock); /// /// Gets the next scheduled item to be executed. @@ -294,10 +293,7 @@ public DateTimeOffset Now [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "By design. Side-effecting operation to retrieve the next element.")] protected abstract IScheduledItem GetNext(); - object IServiceProvider.GetService(Type serviceType) - { - return GetService(serviceType); - } + object IServiceProvider.GetService(Type serviceType) => GetService(serviceType); /// /// Discovers scheduler services by interface type. The base class implementation supports @@ -324,7 +320,7 @@ public IStopwatch StartStopwatch() return new VirtualTimeStopwatch(() => ToDateTimeOffset(Clock) - start); } - class VirtualTimeStopwatch : IStopwatch + private sealed class VirtualTimeStopwatch : IStopwatch { private readonly Func _getElapsed; @@ -333,10 +329,7 @@ public VirtualTimeStopwatch(Func getElapsed) _getElapsed = getElapsed; } - public TimeSpan Elapsed - { - get { return _getElapsed(); } - } + public TimeSpan Elapsed => _getElapsed(); } } @@ -363,7 +356,7 @@ protected VirtualTimeScheduler() /// /// Initial value for the clock. /// Comparer to determine causality of events based on absolute time. - /// is null. + /// is null. protected VirtualTimeScheduler(TAbsolute initialClock, IComparer comparer) : base(initialClock, comparer) { @@ -381,9 +374,13 @@ protected override IScheduledItem GetNext() { var next = queue.Peek(); if (next.IsCanceled) + { queue.Dequeue(); + } else + { return next; + } } } @@ -398,7 +395,7 @@ protected override IScheduledItem GetNext() /// Action to be executed. /// Absolute time at which to execute the action. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public override IDisposable ScheduleAbsolute(TState state, TAbsolute dueTime, Func action) { if (action == null) diff --git a/Rx.NET/Source/src/System.Reactive/Platforms/Windows/Concurrency/CoreDispatcherScheduler.cs b/Rx.NET/Source/src/System.Reactive/Platforms/Windows/Concurrency/CoreDispatcherScheduler.cs index 37b0ac5377..4509eed2d6 100644 --- a/Rx.NET/Source/src/System.Reactive/Platforms/Windows/Concurrency/CoreDispatcherScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Platforms/Windows/Concurrency/CoreDispatcherScheduler.cs @@ -203,7 +203,7 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun /// Action to be executed, potentially updating the state. /// The disposable object used to cancel the scheduled recurring action (best effort). /// is null. - /// is less than TimeSpan.Zero. + /// is less than . public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func action) { // diff --git a/Rx.NET/Source/src/System.Reactive/Platforms/Windows/Concurrency/DispatcherScheduler.cs b/Rx.NET/Source/src/System.Reactive/Platforms/Windows/Concurrency/DispatcherScheduler.cs index 74410bc8d0..caa515ddef 100644 --- a/Rx.NET/Source/src/System.Reactive/Platforms/Windows/Concurrency/DispatcherScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Platforms/Windows/Concurrency/DispatcherScheduler.cs @@ -192,7 +192,7 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun /// Action to be executed, potentially updating the state. /// The disposable object used to cancel the scheduled recurring action (best effort). /// is null. - /// is less than TimeSpan.Zero. + /// is less than . public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func action) { if (period < TimeSpan.Zero) From d6794e1b2cae79552ec3b39b4a0ccad17a66079b Mon Sep 17 00:00:00 2001 From: Bart De Smet Date: Thu, 13 Apr 2017 22:03:47 -0700 Subject: [PATCH 2/2] A few more changes --- .../Concurrency/CoreDispatcherScheduler.cs | 56 ++++++------ .../Concurrency/DispatcherScheduler.cs | 85 ++++++++----------- 2 files changed, 61 insertions(+), 80 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Platforms/Windows/Concurrency/CoreDispatcherScheduler.cs b/Rx.NET/Source/src/System.Reactive/Platforms/Windows/Concurrency/CoreDispatcherScheduler.cs index 4509eed2d6..8f86943fe7 100644 --- a/Rx.NET/Source/src/System.Reactive/Platforms/Windows/Concurrency/CoreDispatcherScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Platforms/Windows/Concurrency/CoreDispatcherScheduler.cs @@ -13,7 +13,7 @@ namespace System.Reactive.Concurrency { /// - /// Represents an object that schedules units of work on a Windows.UI.Core.CoreDispatcher. + /// Represents an object that schedules units of work on a . /// /// /// This scheduler type is typically used indirectly through the and methods that use the current Dispatcher. @@ -21,40 +21,37 @@ namespace System.Reactive.Concurrency [CLSCompliant(false)] public sealed class CoreDispatcherScheduler : LocalScheduler, ISchedulerPeriodic { - private readonly CoreDispatcher _dispatcher; - private readonly CoreDispatcherPriority _priority; - /// - /// Constructs a CoreDispatcherScheduler that schedules units of work on the given Windows.UI.Core.CoreDispatcher. + /// Constructs a that schedules units of work on the given . /// /// Dispatcher to schedule work on. - /// is null. + /// is null. public CoreDispatcherScheduler(CoreDispatcher dispatcher) { if (dispatcher == null) throw new ArgumentNullException(nameof(dispatcher)); - _dispatcher = dispatcher; - _priority = CoreDispatcherPriority.Normal; + Dispatcher = dispatcher; + Priority = CoreDispatcherPriority.Normal; } /// - /// Constructs a CoreDispatcherScheduler that schedules units of work on the given Windows.UI.Core.CoreDispatcher with the given priority. + /// Constructs a that schedules units of work on the given with the given priority. /// /// Dispatcher to schedule work on. /// Priority for scheduled units of work. - /// is null. + /// is null. public CoreDispatcherScheduler(CoreDispatcher dispatcher, CoreDispatcherPriority priority) { if (dispatcher == null) throw new ArgumentNullException(nameof(dispatcher)); - _dispatcher = dispatcher; - _priority = priority; + Dispatcher = dispatcher; + Priority = priority; } /// - /// Gets the scheduler that schedules work on the Windows.UI.Core.CoreDispatcher associated with the current Window. + /// Gets the scheduler that schedules work on the associated with the current Window. /// public static CoreDispatcherScheduler Current { @@ -69,20 +66,14 @@ public static CoreDispatcherScheduler Current } /// - /// Gets the Windows.UI.Core.CoreDispatcher associated with the CoreDispatcherScheduler. + /// Gets the associated with the . /// - public CoreDispatcher Dispatcher - { - get { return _dispatcher; } - } + public CoreDispatcher Dispatcher { get; } /// /// Gets the priority at which work is scheduled. /// - public CoreDispatcherPriority Priority - { - get { return _priority; } - } + public CoreDispatcherPriority Priority { get; } /// /// Schedules an action to be executed on the dispatcher. @@ -91,7 +82,7 @@ public CoreDispatcherPriority Priority /// State passed to the action to be executed. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public override IDisposable Schedule(TState state, Func action) { if (action == null) @@ -99,7 +90,7 @@ public override IDisposable Schedule(TState state, Func + var res = Dispatcher.RunAsync(Priority, () => { if (!d.IsDisposed) { @@ -140,14 +131,14 @@ public override IDisposable Schedule(TState state, Func - /// Schedules an action to be executed after dueTime on the dispatcher, using a Windows.UI.Xaml.DispatcherTimer object. + /// Schedules an action to be executed after on the dispatcher, using a object. /// /// The type of the state passed to the scheduled action. /// State passed to the action to be executed. /// Action to be executed. /// Relative time after which to execute the action. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { if (action == null) @@ -155,8 +146,15 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun var dt = Scheduler.Normalize(dueTime); if (dt.Ticks == 0) + { return Schedule(state, action); + } + return ScheduleSlow(state, dt, action); + } + + private IDisposable ScheduleSlow(TState state, TimeSpan dueTime, Func action) + { var d = new MultipleAssignmentDisposable(); var timer = new DispatcherTimer(); @@ -178,7 +176,7 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun } }; - timer.Interval = dt; + timer.Interval = dueTime; timer.Start(); d.Disposable = Disposable.Create(() => @@ -195,14 +193,14 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun } /// - /// Schedules a periodic piece of work on the dispatcher, using a Windows.UI.Xaml.DispatcherTimer object. + /// Schedules a periodic piece of work on the dispatcher, using a object. /// /// The type of the state passed to the scheduled action. /// Initial state passed to the action upon the first iteration. /// Period for running the work periodically. /// Action to be executed, potentially updating the state. /// The disposable object used to cancel the scheduled recurring action (best effort). - /// is null. + /// is null. /// is less than . public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func action) { diff --git a/Rx.NET/Source/src/System.Reactive/Platforms/Windows/Concurrency/DispatcherScheduler.cs b/Rx.NET/Source/src/System.Reactive/Platforms/Windows/Concurrency/DispatcherScheduler.cs index caa515ddef..4f9fc90759 100644 --- a/Rx.NET/Source/src/System.Reactive/Platforms/Windows/Concurrency/DispatcherScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Platforms/Windows/Concurrency/DispatcherScheduler.cs @@ -20,16 +20,7 @@ public class DispatcherScheduler : LocalScheduler, ISchedulerPeriodic /// Gets the scheduler that schedules work on the current . /// [Obsolete(Constants_WindowsThreading.OBSOLETE_INSTANCE_PROPERTY)] - public static DispatcherScheduler Instance - { - get - { - return new DispatcherScheduler( - - System.Windows.Threading.Dispatcher.CurrentDispatcher - ); - } - } + public static DispatcherScheduler Instance => new DispatcherScheduler(System.Windows.Threading.Dispatcher.CurrentDispatcher); /// /// Gets the scheduler that schedules work on the for the current thread. @@ -43,61 +34,48 @@ public static DispatcherScheduler Current throw new InvalidOperationException(Strings_WindowsThreading.NO_DISPATCHER_CURRENT_THREAD); return new DispatcherScheduler(dispatcher); - } } - System.Windows.Threading.Dispatcher _dispatcher; - System.Windows.Threading.DispatcherPriority _priority; - - /// - /// Constructs a DispatcherScheduler that schedules units of work on the given . + /// Constructs a that schedules units of work on the given . /// - /// Dispatcher to schedule work on. - /// is null. + /// to schedule work on. + /// is null. public DispatcherScheduler(System.Windows.Threading.Dispatcher dispatcher) { if (dispatcher == null) throw new ArgumentNullException(nameof(dispatcher)); - _dispatcher = dispatcher; - _priority = Windows.Threading.DispatcherPriority.Normal; + Dispatcher = dispatcher; + Priority = Windows.Threading.DispatcherPriority.Normal; } /// - /// Constructs a DispatcherScheduler that schedules units of work on the given at the given priority. + /// Constructs a that schedules units of work on the given at the given priority. /// - /// Dispatcher to schedule work on. + /// to schedule work on. /// Priority at which units of work are scheduled. - /// is null. + /// is null. public DispatcherScheduler(System.Windows.Threading.Dispatcher dispatcher, System.Windows.Threading.DispatcherPriority priority) { if (dispatcher == null) throw new ArgumentNullException(nameof(dispatcher)); - _dispatcher = dispatcher; - _priority = priority; + Dispatcher = dispatcher; + Priority = priority; } - /// - /// Gets the associated with the DispatcherScheduler. + /// Gets the associated with the . /// - public System.Windows.Threading.Dispatcher Dispatcher - { - get { return _dispatcher; } - } + public System.Windows.Threading.Dispatcher Dispatcher { get; } /// /// Gets the priority at which work items will be dispatched. /// - public System.Windows.Threading.DispatcherPriority Priority - { - get { return _priority; } - } - + public System.Windows.Threading.DispatcherPriority Priority { get; } /// /// Schedules an action to be executed on the dispatcher. @@ -106,7 +84,7 @@ public System.Windows.Threading.DispatcherPriority Priority /// State passed to the action to be executed. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public override IDisposable Schedule(TState state, Func action) { if (action == null) @@ -114,27 +92,29 @@ public override IDisposable Schedule(TState state, Func { if (!d.IsDisposed) + { d.Disposable = action(this, state); - }) - , _priority + } + }), + Priority ); return d; } /// - /// Schedules an action to be executed after dueTime on the dispatcher, using a object. + /// Schedules an action to be executed after on the dispatcher, using a object. /// /// The type of the state passed to the scheduled action. /// State passed to the action to be executed. /// Action to be executed. /// Relative time after which to execute the action. /// The disposable object used to cancel the scheduled action (best effort). - /// is null. + /// is null. public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { if (action == null) @@ -142,13 +122,18 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun var dt = Scheduler.Normalize(dueTime); if (dt.Ticks == 0) + { return Schedule(state, action); + } + + return ScheduleSlow(state, dt, action); + } + private IDisposable ScheduleSlow(TState state, TimeSpan dueTime, Func action) + { var d = new MultipleAssignmentDisposable(); - var timer = new System.Windows.Threading.DispatcherTimer( - _priority, _dispatcher - ); + var timer = new System.Windows.Threading.DispatcherTimer(Priority, Dispatcher); timer.Tick += (s, e) => { @@ -167,7 +152,7 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun } }; - timer.Interval = dt; + timer.Interval = dueTime; timer.Start(); d.Disposable = Disposable.Create(() => @@ -191,7 +176,7 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun /// Period for running the work periodically. /// Action to be executed, potentially updating the state. /// The disposable object used to cancel the scheduled recurring action (best effort). - /// is null. + /// is null. /// is less than . public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func action) { @@ -200,9 +185,7 @@ public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func< if (action == null) throw new ArgumentNullException(nameof(action)); - var timer = new System.Windows.Threading.DispatcherTimer( - _priority, _dispatcher - ); + var timer = new System.Windows.Threading.DispatcherTimer(Priority, Dispatcher); var state1 = state; @@ -226,4 +209,4 @@ public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func< } } } -#endif \ No newline at end of file +#endif