Skip to content

Commit 3c11c84

Browse files
committed
fix: Introduce ServerHandle type, stop listening on abort
1 parent 3eb509d commit 3c11c84

File tree

3 files changed

+87
-75
lines changed

3 files changed

+87
-75
lines changed

src/instance.rs

+19-23
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@ use thiserror::Error;
44
use tokio::select;
55
use tokio::sync::broadcast;
66

7-
use crate::api::types::PriorityInfo;
8-
use crate::color::color_to16;
9-
use crate::models::Color;
107
use crate::{
11-
color::{ChannelAdjustments, ChannelAdjustmentsBuilder},
8+
api::types::PriorityInfo,
9+
color::{color_to16, ChannelAdjustments, ChannelAdjustmentsBuilder},
1210
global::Global,
1311
image::RawImage,
14-
models::{self, DeviceConfig, InstanceConfig},
12+
models::{Color, Color16, DeviceConfig, InstanceConfig},
13+
servers::ServerHandle,
1514
};
1615

1716
mod black_border_detector;
@@ -40,10 +39,11 @@ pub struct Instance {
4039
config: Arc<InstanceConfig>,
4140
device: Device,
4241
muxer: PriorityMuxer,
43-
color_data: Vec<models::Color16>,
42+
color_data: Vec<Color16>,
4443
black_border_detector: BlackBorderDetector,
4544
channel_adjustments: ChannelAdjustments,
4645
smoothing: Smoothing,
46+
_boblight_server: ServerHandle,
4747
}
4848

4949
impl Instance {
@@ -61,40 +61,36 @@ impl Instance {
6161

6262
let config = Arc::new(config);
6363

64-
// TODO: Terminate BoblightServer on Instance drop?
65-
tokio::spawn({
66-
let instance = config.clone();
67-
let config = instance.boblight_server.clone();
68-
69-
async move {
70-
let result = crate::servers::bind(config, global, move |tcp, global| {
64+
let _boblight_server = ServerHandle::spawn(
65+
"Boblight",
66+
config.boblight_server.clone(),
67+
global.clone(),
68+
{
69+
let instance = config.clone();
70+
move |tcp, global| {
7171
crate::servers::boblight::handle_client(
7272
tcp,
7373
tx.clone(),
7474
instance.clone(),
7575
global,
7676
)
77-
})
78-
.await;
79-
80-
if let Err(error) = result {
81-
error!("Boblight server terminated: {:?}", error);
8277
}
83-
}
84-
});
78+
},
79+
);
8580

8681
Ok(Self {
8782
config,
8883
device,
8984
muxer,
90-
color_data: vec![models::Color16::default(); led_count],
85+
color_data: vec![Color16::default(); led_count],
9186
black_border_detector,
9287
channel_adjustments,
9388
smoothing,
89+
_boblight_server,
9490
})
9591
}
9692

97-
fn handle_color(&mut self, color: models::Color) {
93+
fn handle_color(&mut self, color: Color) {
9894
let color = crate::color::color_to16(color);
9995
self.color_data.iter_mut().map(|x| *x = color).count();
10096
}
@@ -150,7 +146,7 @@ impl Instance {
150146
}
151147
}
152148

153-
*value = models::Color16::from_components((
149+
*value = Color16::from_components((
154150
(r_acc / cnt.max(1)).max(0).min(u16::MAX as _) as u16,
155151
(g_acc / cnt.max(1)).max(0).min(u16::MAX as _) as u16,
156152
(b_acc / cnt.max(1)).max(0).min(u16::MAX as _) as u16,

src/main.rs

+28-50
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#[macro_use]
22
extern crate log;
33

4+
use hyperion::servers::ServerHandle;
45
use structopt::StructOpt;
56
use tokio::runtime::Builder;
67
use tokio::signal;
@@ -79,60 +80,37 @@ async fn run(opts: Opts) -> color_eyre::eyre::Result<()> {
7980
});
8081
}
8182

82-
// Start the flatbuffers servers
83-
if config.global.flatbuffers_server.enable {
84-
tokio::spawn({
85-
let global = global.clone();
86-
let config = config.global.flatbuffers_server.clone();
87-
88-
async move {
89-
let result =
90-
hyperion::servers::bind(config, global, hyperion::servers::flat::handle_client)
91-
.await;
92-
93-
if let Err(error) = result {
94-
error!("Flatbuffers server terminated: {:?}", error);
95-
}
96-
}
97-
});
98-
}
83+
// Start the Flatbuffers servers
84+
let _flatbuffers_server = if config.global.flatbuffers_server.enable {
85+
Some(ServerHandle::spawn(
86+
"Flatbuffers",
87+
config.global.flatbuffers_server.clone(),
88+
global.clone(),
89+
hyperion::servers::flat::handle_client,
90+
))
91+
} else {
92+
None
93+
};
9994

10095
// Start the JSON server
101-
tokio::spawn({
102-
let global = global.clone();
103-
let config = config.global.json_server.clone();
104-
105-
async move {
106-
let result =
107-
hyperion::servers::bind(config, global, hyperion::servers::json::handle_client)
108-
.await;
109-
110-
if let Err(error) = result {
111-
error!("JSON server terminated: {:?}", error);
112-
}
113-
}
114-
});
96+
let _json_server = ServerHandle::spawn(
97+
"JSON",
98+
config.global.json_server.clone(),
99+
global.clone(),
100+
hyperion::servers::json::handle_client,
101+
);
115102

116103
// Start the Protobuf server
117-
if config.global.proto_server.enable {
118-
tokio::spawn({
119-
let global = global.clone();
120-
let config = config.global.proto_server.clone();
121-
122-
async move {
123-
let result = hyperion::servers::bind(
124-
config,
125-
global,
126-
hyperion::servers::proto::handle_client,
127-
)
128-
.await;
129-
130-
if let Err(error) = result {
131-
error!("Protobuf server terminated: {:?}", error);
132-
}
133-
}
134-
});
135-
}
104+
let _proto_server = if config.global.proto_server.enable {
105+
Some(ServerHandle::spawn(
106+
"Protobuf",
107+
config.global.proto_server.clone(),
108+
global.clone(),
109+
hyperion::servers::proto::handle_client,
110+
))
111+
} else {
112+
None
113+
};
136114

137115
// Should we continue running?
138116
let mut abort = false;

src/servers.rs

+40-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
22

3-
use tokio::net::{TcpListener, TcpStream};
3+
use tokio::{
4+
net::{TcpListener, TcpStream},
5+
task::JoinHandle,
6+
};
47

58
use crate::{global::Global, models::ServerConfig};
69

@@ -9,7 +12,7 @@ pub mod flat;
912
pub mod json;
1013
pub mod proto;
1114

12-
pub async fn bind<T, E, F>(
15+
async fn bind<T, E, F>(
1316
options: T,
1417
global: Global,
1518
handle_client: impl Fn((TcpStream, SocketAddr), Global) -> F,
@@ -49,3 +52,38 @@ where
4952
});
5053
}
5154
}
55+
56+
pub struct ServerHandle {
57+
join_handle: JoinHandle<()>,
58+
}
59+
60+
impl ServerHandle {
61+
pub fn spawn<T, E, F, H>(
62+
name: &'static str,
63+
options: T,
64+
global: Global,
65+
handle_client: H,
66+
) -> Self
67+
where
68+
T: ServerConfig + Send + 'static,
69+
F: futures::Future<Output = Result<(), E>> + Send + 'static,
70+
E: From<std::io::Error> + std::fmt::Display + Send + 'static,
71+
H: Fn((TcpStream, SocketAddr), Global) -> F + Send + 'static,
72+
{
73+
Self {
74+
join_handle: tokio::spawn(async move {
75+
let result = bind(options, global, handle_client).await;
76+
77+
if let Err(error) = result {
78+
error!("{} server terminated: {}", name, error);
79+
}
80+
}),
81+
}
82+
}
83+
}
84+
85+
impl Drop for ServerHandle {
86+
fn drop(&mut self) {
87+
self.join_handle.abort();
88+
}
89+
}

0 commit comments

Comments
 (0)