Skip to content

Commit 8bd3fe9

Browse files
committed
refactor: Add InstanceHandle type
1 parent 8163f10 commit 8bd3fe9

File tree

3 files changed

+96
-15
lines changed

3 files changed

+96
-15
lines changed

src/global.rs

+16-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::HashMap;
1+
use std::collections::{BTreeMap, HashMap};
22
use std::net::SocketAddr;
33
use std::sync::Arc;
44

@@ -12,7 +12,7 @@ pub use input_message::*;
1212
mod input_source;
1313
pub use input_source::*;
1414

15-
use crate::{component::ComponentName, models::Config};
15+
use crate::{component::ComponentName, instance::InstanceHandle, models::Config};
1616

1717
pub trait Message: Sized {
1818
type Data;
@@ -75,6 +75,14 @@ impl Global {
7575
self.0.read().await.input_tx.subscribe()
7676
}
7777

78+
pub async fn register_instance(&self, handle: InstanceHandle) {
79+
self.0.write().await.register_instance(handle);
80+
}
81+
82+
pub async fn get_instance(&self, id: i32) -> Option<InstanceHandle> {
83+
self.0.read().await.instances.get(&id).cloned()
84+
}
85+
7886
pub async fn read_config<T>(&self, f: impl FnOnce(&Config) -> T) -> T {
7987
let data = self.0.read().await;
8088
f(&data.config)
@@ -94,6 +102,7 @@ pub struct GlobalData {
94102
input_sources: HashMap<usize, Arc<InputSource<InputMessage>>>,
95103
next_input_source_id: usize,
96104
config: Config,
105+
instances: BTreeMap<i32, InstanceHandle>,
97106
}
98107

99108
impl GlobalData {
@@ -105,6 +114,7 @@ impl GlobalData {
105114
input_sources: Default::default(),
106115
next_input_source_id: 1,
107116
config: config.clone(),
117+
instances: Default::default(),
108118
}
109119
}
110120

@@ -134,4 +144,8 @@ impl GlobalData {
134144
info!("unregistered input source {}", *is);
135145
}
136146
}
147+
148+
fn register_instance(&mut self, handle: InstanceHandle) {
149+
self.instances.insert(handle.id(), handle);
150+
}
137151
}

src/instance.rs

+75-12
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
use std::sync::Arc;
22

33
use thiserror::Error;
4-
use tokio::sync::broadcast;
5-
use tokio::{select, sync::mpsc};
4+
use tokio::{
5+
select,
6+
sync::{broadcast, mpsc, oneshot},
7+
};
68

9+
use crate::api::types::PriorityInfo;
710
use crate::models::Color;
811
use crate::{
912
global::{Global, InputMessage},
@@ -37,7 +40,9 @@ pub enum InstanceError {
3740
}
3841

3942
pub struct Instance {
43+
config: Arc<InstanceConfig>,
4044
device: InstanceDevice,
45+
handle_rx: mpsc::Receiver<InstanceMessage>,
4146
receiver: broadcast::Receiver<InputMessage>,
4247
local_receiver: mpsc::Receiver<InputMessage>,
4348
muxer: PriorityMuxer,
@@ -46,7 +51,7 @@ pub struct Instance {
4651
}
4752

4853
impl Instance {
49-
pub async fn new(global: Global, config: InstanceConfig) -> Self {
54+
pub async fn new(global: Global, config: InstanceConfig) -> (Self, InstanceHandle) {
5055
let device: InstanceDevice =
5156
Device::new(&config.instance.friendly_name, config.device.clone())
5257
.await
@@ -92,14 +97,22 @@ impl Instance {
9297
None
9398
};
9499

95-
Self {
96-
device,
97-
receiver,
98-
local_receiver,
99-
muxer,
100-
core,
101-
_boblight_server,
102-
}
100+
let (tx, handle_rx) = mpsc::channel(1);
101+
let id = config.instance.id;
102+
103+
(
104+
Self {
105+
config,
106+
device,
107+
handle_rx,
108+
receiver,
109+
local_receiver,
110+
muxer,
111+
core,
112+
_boblight_server,
113+
},
114+
InstanceHandle { id, tx },
115+
)
103116
}
104117

105118
async fn on_input_message(&mut self, message: InputMessage) {
@@ -109,6 +122,19 @@ impl Instance {
109122
}
110123
}
111124

125+
pub fn id(&self) -> i32 {
126+
self.config.instance.id
127+
}
128+
129+
async fn handle_instance_message(&mut self, message: InstanceMessage) {
130+
match message {
131+
InstanceMessage::PriorityInfo(tx) => {
132+
// unwrap: the receiver should not have dropped
133+
tx.send(self.muxer.current_priorities().await).unwrap();
134+
}
135+
}
136+
}
137+
112138
pub async fn run(mut self) -> Result<(), InstanceError> {
113139
loop {
114140
select! {
@@ -123,7 +149,7 @@ impl Instance {
123149
},
124150
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
125151
// No more input messages
126-
return Ok(());
152+
break Ok(());
127153
},
128154
Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
129155
warn!("skipped {} input messages", skipped);
@@ -147,6 +173,14 @@ impl Instance {
147173
// LED data changed
148174
self.device.set_led_data(led_data).await?;
149175
},
176+
message = self.handle_rx.recv() => {
177+
if let Some(message) = message {
178+
self.handle_instance_message(message).await;
179+
} else {
180+
// If the handle is dropped, it means the instance was unregistered
181+
break Ok(());
182+
}
183+
}
150184
}
151185
}
152186
}
@@ -181,3 +215,32 @@ impl From<Result<Device, DeviceError>> for InstanceDevice {
181215
Self { inner }
182216
}
183217
}
218+
219+
#[derive(Debug)]
220+
enum InstanceMessage {
221+
PriorityInfo(oneshot::Sender<Vec<PriorityInfo>>),
222+
}
223+
224+
#[derive(Clone)]
225+
pub struct InstanceHandle {
226+
id: i32,
227+
tx: mpsc::Sender<InstanceMessage>,
228+
}
229+
230+
impl InstanceHandle {
231+
pub fn id(&self) -> i32 {
232+
self.id
233+
}
234+
235+
pub async fn current_priorities(&self) -> Vec<PriorityInfo> {
236+
let (tx, rx) = oneshot::channel();
237+
238+
// TODO: Don't unwrap and propagate?
239+
self.tx
240+
.send(InstanceMessage::PriorityInfo(tx))
241+
.await
242+
.unwrap();
243+
// unwrap: if the previous didn't fail, the instance will be there to answer
244+
rx.await.unwrap()
245+
}
246+
}

src/main.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ async fn run(opts: Opts) -> color_eyre::eyre::Result<()> {
3333

3434
// Initialize and spawn the devices
3535
for (_, inst) in &config.instances {
36-
let inst = hyperion::instance::Instance::new(global.clone(), inst.clone()).await;
36+
// Create the instance
37+
let (inst, handle) = hyperion::instance::Instance::new(global.clone(), inst.clone()).await;
38+
// Register the instance globally using its handle
39+
global.register_instance(handle).await;
40+
// Run the instance futures
3741
tokio::spawn(async move {
3842
let result = inst.run().await;
3943

0 commit comments

Comments
 (0)