-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathObservableBusyStack.cs
157 lines (132 loc) · 4.12 KB
/
ObservableBusyStack.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
namespace MvvmScarletToolkit.Observables
{
/// <summary>
/// Will notify its owner and all subscribers via a provided action on if it contains more tokens
/// </summary>
[DebuggerDisplay("{_id}")]
public sealed class ObservableBusyStack : IObservableBusyStack
{
private readonly string _id;
private readonly ConcurrentDictionary<IObserver<bool>, object> _observers;
private readonly Action<bool> _onChanged;
private int _items;
private bool _disposed;
public ObservableBusyStack(in Action<bool> onChanged)
{
_onChanged = onChanged ?? throw new ArgumentNullException(nameof(onChanged));
_id = Convert.ToBase64String(Guid.NewGuid().ToByteArray());
_observers = new ConcurrentDictionary<IObserver<bool>, object>();
}
public void Pull()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(ObservableBusyStack));
}
var oldValue = _items > 0;
Interlocked.Decrement(ref _items);
var newValue = _items > 0;
if (oldValue.Equals(newValue))
{
return;
}
#if DEBUG
Debug.WriteLine($"ObservableBusyStack({_id}) PULL HasItems: {newValue}");
#endif
Notify(newValue);
}
public void Push()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(ObservableBusyStack));
}
var oldValue = _items > 0;
Interlocked.Increment(ref _items);
var newValue = _items > 0;
if (oldValue.Equals(newValue))
{
return;
}
#if DEBUG
Debug.WriteLine($"ObservableBusyStack({_id}) PUSH HasItems: {newValue}");
#endif
Notify(newValue);
}
private void Notify(bool hasItems)
{
NotifyOwner(hasItems);
NotifySubscribers(hasItems);
}
private void NotifySubscribers(bool newValue)
{
var observers = _observers.Keys.ToArray();
for (var i = 0; i < observers.Length; i++)
{
var observer = observers[i];
InvokeOnChanged(observer, newValue);
}
}
private void NotifyOwner(bool newValue)
{
InvokeOnChanged(newValue);
}
public IDisposable Subscribe(IObserver<bool> observer)
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(ObservableBusyStack));
}
return new DisposalToken<bool>(observer, _observers);
}
[DebuggerStepThrough]
private void InvokeOnChanged(bool newValue)
{
_onChanged(newValue);
}
[DebuggerStepThrough]
private void InvokeOnChanged(IObserver<bool> observer, bool newValue)
{
if (newValue)
{
observer.OnNext(newValue);
}
else
{
observer.OnCompleted();
}
}
/// <summary>
/// Returns a new <see cref="IDisposable"/> thats associated with <see cref="this"/> instance of a <see cref="IDisposable"/>
/// </summary>
/// <returns>a new <see cref="IDisposable"/></returns>
/// <exception cref="ObjectDisposedException"></exception>
[DebuggerStepThrough]
public IDisposable GetToken()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(ObservableBusyStack));
}
return new BusyToken(this);
}
public void Dispose()
{
if (_disposed)
{
return;
}
_observers.Clear();
_disposed = true;
}
public override string ToString()
{
return $"{_id} HasItems: {_items > 0}";
}
}
}