Skip to content

Commit e4fa2eb

Browse files
committed
refactor: Split server binding and accepting loop
1 parent 19e2011 commit e4fa2eb

File tree

3 files changed

+65
-71
lines changed

3 files changed

+65
-71
lines changed

src/instance.rs

+7-9
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010
global::Global,
1111
image::RawImage,
1212
models::{Color, Color16, DeviceConfig, InstanceConfig},
13-
servers::ServerHandle,
13+
servers::{self, ServerHandle},
1414
};
1515

1616
mod black_border_detector;
@@ -27,6 +27,8 @@ use smoothing::*;
2727

2828
#[derive(Debug, Error)]
2929
pub enum InstanceError {
30+
#[error("i/o error: {0}")]
31+
Io(#[from] std::io::Error),
3032
#[error("device error: {0}")]
3133
Device(#[from] DeviceError),
3234
#[error("recv error: {0}")]
@@ -59,22 +61,18 @@ impl Instance {
5961

6062
let config = Arc::new(config);
6163

62-
let _boblight_server = ServerHandle::spawn(
64+
let _boblight_server = servers::bind(
6365
"Boblight",
6466
config.boblight_server.clone(),
6567
global.clone(),
6668
{
6769
let instance = config.clone();
6870
move |tcp, global| {
69-
crate::servers::boblight::handle_client(
70-
tcp,
71-
tx.clone(),
72-
instance.clone(),
73-
global,
74-
)
71+
servers::boblight::handle_client(tcp, tx.clone(), instance.clone(), global)
7572
}
7673
},
77-
);
74+
)
75+
.await?;
7876

7977
Ok(Self {
8078
config,

src/main.rs

+21-15
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#[macro_use]
22
extern crate log;
33

4-
use hyperion::servers::ServerHandle;
54
use structopt::StructOpt;
65
use tokio::runtime::Builder;
76
use tokio::signal;
@@ -82,32 +81,39 @@ async fn run(opts: Opts) -> color_eyre::eyre::Result<()> {
8281

8382
// Start the Flatbuffers servers
8483
let _flatbuffers_server = if config.global.flatbuffers_server.enable {
85-
Some(ServerHandle::spawn(
86-
"Flatbuffers",
87-
config.global.flatbuffers_server.clone(),
88-
global.clone(),
89-
hyperion::servers::flat::handle_client,
90-
))
84+
Some(
85+
hyperion::servers::bind(
86+
"Flatbuffers",
87+
config.global.flatbuffers_server.clone(),
88+
global.clone(),
89+
hyperion::servers::flat::handle_client,
90+
)
91+
.await?,
92+
)
9193
} else {
9294
None
9395
};
9496

9597
// Start the JSON server
96-
let _json_server = ServerHandle::spawn(
98+
let _json_server = hyperion::servers::bind(
9799
"JSON",
98100
config.global.json_server.clone(),
99101
global.clone(),
100102
hyperion::servers::json::handle_client,
101-
);
103+
)
104+
.await?;
102105

103106
// Start the Protobuf server
104107
let _proto_server = if config.global.proto_server.enable {
105-
Some(ServerHandle::spawn(
106-
"Protobuf",
107-
config.global.proto_server.clone(),
108-
global.clone(),
109-
hyperion::servers::proto::handle_client,
110-
))
108+
Some(
109+
hyperion::servers::bind(
110+
"Protobuf",
111+
config.global.proto_server.clone(),
112+
global.clone(),
113+
hyperion::servers::proto::handle_client,
114+
)
115+
.await?,
116+
)
111117
} else {
112118
None
113119
};

src/servers.rs

+37-47
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,21 @@ pub mod flat;
1212
pub mod json;
1313
pub mod proto;
1414

15-
async fn bind<T, E, F>(
15+
pub struct ServerHandle {
16+
join_handle: JoinHandle<()>,
17+
}
18+
19+
pub async fn bind<T, E, F, H>(
20+
name: &'static str,
1621
options: T,
1722
global: Global,
18-
handle_client: impl Fn((TcpStream, SocketAddr), Global) -> F,
19-
) -> Result<(), E>
23+
handle_client: H,
24+
) -> std::io::Result<ServerHandle>
2025
where
21-
T: ServerConfig,
26+
T: ServerConfig + Send + 'static,
2227
F: futures::Future<Output = Result<(), E>> + Send + 'static,
2328
E: From<std::io::Error> + std::fmt::Display + Send + 'static,
29+
H: Fn((TcpStream, SocketAddr), Global) -> F + Send + 'static,
2430
{
2531
// Compute binding address
2632
let address = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), options.port());
@@ -31,55 +37,39 @@ where
3137
// Notify we are listening
3238
info!("server listening on {}", address);
3339

34-
loop {
35-
let incoming = listener.accept().await?;
36-
tokio::spawn({
37-
let peer_addr = incoming.1.clone();
38-
let ft = handle_client(incoming, global.clone());
40+
// Spawn accepting loop
41+
let join_handle = tokio::spawn(async move {
42+
let result: Result<(), _> = loop {
43+
match listener.accept().await {
44+
Ok(incoming) => {
45+
tokio::spawn({
46+
let peer_addr = incoming.1.clone();
47+
let ft = handle_client(incoming, global.clone());
3948

40-
async move {
41-
let result = ft.await;
49+
async move {
50+
let result = ft.await;
4251

43-
match result {
44-
Ok(_) => {
45-
info!("({}) client disconnected", peer_addr);
46-
}
47-
Err(error) => {
48-
error!("({}) client error:{}", peer_addr, error);
49-
}
52+
match result {
53+
Ok(_) => {
54+
info!("({}) client disconnected", peer_addr);
55+
}
56+
Err(error) => {
57+
error!("({}) client error:{}", peer_addr, error);
58+
}
59+
}
60+
}
61+
});
5062
}
63+
Err(error) => break Err(error),
5164
}
52-
});
53-
}
54-
}
55-
56-
pub struct ServerHandle {
57-
join_handle: JoinHandle<()>,
58-
}
59-
60-
impl ServerHandle {
61-
pub fn spawn<T, E, F, H>(
62-
name: &'static str,
63-
options: T,
64-
global: Global,
65-
handle_client: H,
66-
) -> Self
67-
where
68-
T: ServerConfig + Send + 'static,
69-
F: futures::Future<Output = Result<(), E>> + Send + 'static,
70-
E: From<std::io::Error> + std::fmt::Display + Send + 'static,
71-
H: Fn((TcpStream, SocketAddr), Global) -> F + Send + 'static,
72-
{
73-
Self {
74-
join_handle: tokio::spawn(async move {
75-
let result = bind(options, global, handle_client).await;
65+
};
7666

77-
if let Err(error) = result {
78-
error!("{} server terminated: {}", name, error);
79-
}
80-
}),
67+
if let Err(error) = result {
68+
error!("{} server terminated: {}", name, error);
8169
}
82-
}
70+
});
71+
72+
Ok(ServerHandle { join_handle })
8373
}
8474

8575
impl Drop for ServerHandle {

0 commit comments

Comments
 (0)