-
Notifications
You must be signed in to change notification settings - Fork 94
/
Copy pathhot_reload.rs
93 lines (86 loc) · 3.73 KB
/
hot_reload.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use buildstructor::Builder;
use camino::Utf8PathBuf;
use futures::StreamExt;
use tap::TapFallible;
use crate::{subtask::SubtaskHandleStream, utils::effect::write_file::WriteFile};
use super::config::RouterConfig;
use rover_std::{debugln, errln, infoln};
/// Updates that [`HotReloadWatcher`] accepts as its subtask input.
pub enum RouterUpdateEvent {
/// A new schema; the watcher writes the bytes of `schema` to its schema path.
SchemaChanged { schema: String },
/// A new router config; the watcher writes the bytes of `config.inner()` to its config path.
ConfigChanged { config: RouterConfig },
}
/// Outcome reports that [`HotReloadWatcher`] emits as its subtask output after each write attempt.
#[derive(Debug)]
pub enum HotReloadEvent {
/// Result of writing the router config file: `Ok(())` on success, otherwise the boxed write error.
// NOTE(review): `#[allow(unused)]` suggests the payload is not read by consumers yet — confirm.
ConfigWritten(#[allow(unused)] Result<(), Box<dyn std::error::Error + Send>>),
/// Result of writing the schema file: `Ok(())` on success, otherwise the boxed write error.
// NOTE(review): `#[allow(unused)]` suggests the payload is not read by consumers yet — confirm.
SchemaWritten(#[allow(unused)] Result<(), Box<dyn std::error::Error + Send>>),
}
/// Subtask that persists router schema/config updates to disk through a `WriteFile` implementation.
///
/// Construction goes through the `buildstructor` `Builder` derive.
#[derive(Builder)]
pub struct HotReloadWatcher<WriteF> {
// Destination path for the router config contents.
config: Utf8PathBuf,
// Destination path for the schema contents.
schema: Utf8PathBuf,
// File-writing implementation; cloned once when the subtask is started.
write_file_impl: WriteF,
}
impl<WriteF> SubtaskHandleStream for HotReloadWatcher<WriteF>
where
    WriteF: WriteFile + Send + Clone + 'static,
{
    type Input = RouterUpdateEvent;
    type Output = HotReloadEvent;

    /// Spawns a Tokio task that drains `input` and, for every update, writes the
    /// new schema or config to its destination path and reports the write
    /// outcome on `sender`.
    ///
    /// Returns the abort handle of the spawned task. The task finishes on its
    /// own once `input` yields `None`.
    fn handle(
        self,
        sender: tokio::sync::mpsc::UnboundedSender<Self::Output>,
        mut input: futures::stream::BoxStream<'static, Self::Input>,
    ) -> tokio::task::AbortHandle {
        /// Forwards `event` to the subscriber. A failed send is only logged so
        /// that the watcher keeps processing subsequent updates.
        fn notify(
            sender: &tokio::sync::mpsc::UnboundedSender<HotReloadEvent>,
            event: HotReloadEvent,
        ) {
            let _ = sender.send(event).tap_err(|err| {
                tracing::error!("Unable to send message. Error: {:?}", err)
            });
        }

        let writer = self.write_file_impl.clone();

        let task = tokio::task::spawn(async move {
            while let Some(update) = input.next().await {
                match update {
                    RouterUpdateEvent::ConfigChanged { config } => {
                        match writer
                            .write_file(&self.config, config.inner().as_bytes())
                            .await
                        {
                            Err(err) => {
                                // Render the user-facing text before the error is moved into the event.
                                let error_message =
                                    format!("Router config failed to update. {}", &err);
                                notify(&sender, HotReloadEvent::ConfigWritten(Err(Box::new(err))));
                                errln!("{}", error_message);
                            }
                            Ok(_) => {
                                notify(&sender, HotReloadEvent::ConfigWritten(Ok(())));
                                infoln!("Router config updated.");
                                debugln!("{}", config.inner());
                            }
                        }
                    }
                    RouterUpdateEvent::SchemaChanged { schema } => {
                        // Schema writes are reported through the channel only; nothing is printed.
                        let outcome: Result<(), Box<dyn std::error::Error + Send>> =
                            match writer.write_file(&self.schema, schema.as_bytes()).await {
                                Ok(_) => Ok(()),
                                Err(err) => Err(Box::new(err)),
                            };
                        notify(&sender, HotReloadEvent::SchemaWritten(outcome));
                    }
                }
            }
        });

        task.abort_handle()
    }
}