@@ -11,11 +11,14 @@ use crate::{global::InputSourceError, image::RawImage, models::Color};
11
11
mod definition;
12
12
pub use definition:: * ;
13
13
14
- mod runtime;
14
+ mod providers;
15
+ pub use providers:: Providers ;
15
16
16
17
mod instance;
17
18
use instance:: * ;
18
19
20
+ use self :: providers:: { Provider , ProviderError } ;
21
+
19
22
pub struct EffectRunHandle {
20
23
ctx : Sender < ControlMessage > ,
21
24
join_handle : Option < JoinHandle < ( ) > > ,
@@ -69,52 +72,141 @@ pub enum EffectMessageKind {
69
72
SetColor { color : Color } ,
70
73
SetImage { image : Arc < RawImage > } ,
71
74
SetLedColors { colors : Arc < Vec < Color > > } ,
72
- Completed { result : Result < ( ) , pyo3:: PyErr > } ,
75
+ Completed { result : Result < ( ) , ProviderError > } ,
76
+ }
77
+
78
+ #[ derive( Default , Debug , Clone ) ]
79
+ pub struct EffectRegistry {
80
+ effects : Vec < EffectHandle > ,
73
81
}
74
82
75
- pub fn run < X : std:: fmt:: Debug + Clone + Send + ' static > (
76
- effect : & EffectDefinition ,
77
- args : serde_json:: Value ,
78
- led_count : usize ,
79
- duration : Option < chrono:: Duration > ,
80
- priority : i32 ,
81
- tx : Sender < EffectMessage < X > > ,
82
- extra : X ,
83
- ) -> Result < EffectRunHandle , RunEffectError > {
84
- // Resolve path
85
- let full_path = effect. script_path ( ) ?;
86
-
87
- // Create control channel
88
- let ( ctx, crx) = channel ( 1 ) ;
89
-
90
- // Create instance methods
91
- let methods = InstanceMethods :: new (
92
- tx. clone ( ) ,
93
- crx,
94
- led_count,
95
- duration. and_then ( |d| d. to_std ( ) . ok ( ) ) ,
96
- extra. clone ( ) ,
97
- ) ;
98
-
99
- // Run effect
100
- let join_handle = tokio:: task:: spawn ( async move {
101
- // Run the blocking task
102
- let result = tokio:: task:: spawn_blocking ( move || runtime:: run ( & full_path, args, methods) )
83
+ impl EffectRegistry {
84
+ pub fn new ( ) -> Self {
85
+ Self :: default ( )
86
+ }
87
+
88
+ pub fn iter ( & self ) -> impl Iterator < Item = & EffectDefinition > {
89
+ self . effects . iter ( ) . map ( |handle| & handle. definition )
90
+ }
91
+
92
+ pub fn find_effect ( & self , name : & str ) -> Option < & EffectHandle > {
93
+ self . effects . iter ( ) . find ( |e| e. definition . name == name)
94
+ }
95
+
96
+ pub fn len ( & self ) -> usize {
97
+ self . effects . len ( )
98
+ }
99
+
100
+ /// Add definitions to this registry
101
+ ///
102
+ /// # Parameters
103
+ ///
104
+ /// * `providers`: effect providers
105
+ /// * `definitions`: effect definitions to register
106
+ ///
107
+ /// # Returns
108
+ ///
109
+ /// Effect definitions that are not supported by any provider.
110
+ pub fn add_definitions (
111
+ & mut self ,
112
+ providers : & Providers ,
113
+ definitions : Vec < EffectDefinition > ,
114
+ ) -> Vec < EffectDefinition > {
115
+ let mut remaining = vec ! [ ] ;
116
+
117
+ for definition in definitions {
118
+ if let Some ( provider) = providers. get ( & definition. script ) {
119
+ debug ! ( provider=?provider, effect=%definition. name, "assigned provider to effect" ) ;
120
+
121
+ self . effects . push ( EffectHandle {
122
+ definition,
123
+ provider,
124
+ } ) ;
125
+ } else {
126
+ debug ! ( effect=%definition. name, "no provider for effect" ) ;
127
+
128
+ remaining. push ( definition) ;
129
+ }
130
+ }
131
+
132
+ remaining
133
+ }
134
+ }
135
+
136
+ #[ derive( Debug , Clone ) ]
137
+ pub struct EffectHandle {
138
+ pub definition : EffectDefinition ,
139
+ provider : Arc < dyn Provider > ,
140
+ }
141
+
142
+ impl EffectHandle {
143
+ pub fn run < X : std:: fmt:: Debug + Clone + Send + ' static > (
144
+ & self ,
145
+ args : serde_json:: Value ,
146
+ led_count : usize ,
147
+ duration : Option < chrono:: Duration > ,
148
+ priority : i32 ,
149
+ tx : Sender < EffectMessage < X > > ,
150
+ extra : X ,
151
+ ) -> Result < EffectRunHandle , RunEffectError > {
152
+ // Resolve path
153
+ let full_path = self . definition . script_path ( ) ?;
154
+
155
+ // Clone provider arc
156
+ let provider = self . provider . clone ( ) ;
157
+
158
+ // Create control channel
159
+ let ( ctx, crx) = channel ( 1 ) ;
160
+
161
+ // Create channel to wrap data
162
+ let ( etx, mut erx) = channel ( 1 ) ;
163
+
164
+ // Create instance methods
165
+ let methods =
166
+ InstanceMethods :: new ( etx, crx, led_count, duration. and_then ( |d| d. to_std ( ) . ok ( ) ) ) ;
167
+
168
+ // Run effect
169
+ let join_handle = tokio:: task:: spawn ( async move {
170
+ // Create the blocking task
171
+ let mut run_effect =
172
+ tokio:: task:: spawn_blocking ( move || provider. run ( & full_path, args, methods) ) ;
173
+
174
+ // Join the blocking task while forwarding the effect messages
175
+ let result = loop {
176
+ tokio:: select! {
177
+ kind = erx. recv( ) => {
178
+ if let Some ( kind) = kind {
179
+ // Add the extra marker to the message and forward it to the instance
180
+ let msg = EffectMessage { kind, extra: extra. clone( ) } ;
181
+
182
+ if let Err ( err) = tx. send( msg) . await {
183
+ // This would happen if the effect is running and the instance has
184
+ // already shutdown.
185
+ error!( err=%err, "failed to forward effect message" ) ;
186
+ return ;
187
+ }
188
+ }
189
+ }
190
+ result = & mut run_effect => {
191
+ // Unwrap blocking result
192
+ break result. expect( "failed to await blocking task" ) ;
193
+ }
194
+ }
195
+ } ;
196
+
197
+ // Send the completion, ignoring failures in case we're shutting down
198
+ tx. send ( EffectMessage {
199
+ kind : EffectMessageKind :: Completed { result } ,
200
+ extra,
201
+ } )
103
202
. await
104
- . expect ( "failed to await blocking task" ) ;
203
+ . ok ( ) ;
204
+ } ) ;
105
205
106
- // Send the completion, ignoring failures in case we're shutting down
107
- tx . send ( EffectMessage {
108
- kind : EffectMessageKind :: Completed { result } ,
109
- extra ,
206
+ Ok ( EffectRunHandle {
207
+ ctx ,
208
+ join_handle : join_handle . into ( ) ,
209
+ priority ,
110
210
} )
111
- . await
112
- . ok ( ) ;
113
- } ) ;
114
-
115
- Ok ( EffectRunHandle {
116
- ctx,
117
- join_handle : join_handle. into ( ) ,
118
- priority,
119
- } )
211
+ }
120
212
}
0 commit comments