Skip to content

Commit b00727e

Browse files
authored
fix(reload): restart api server based on topology (#17958)
This PR attempts to resolve: #13508 I'm not married to switching out the `Runtime` parameters for `Handle`s, it just seemed like the easiest way to get something that could spawn tasks into the `TopologyController` was `Handle::current()`, and that required shifting the parameter types to match. Let me know if another route would be preferred.
1 parent 3b91662 commit b00727e

File tree

3 files changed

+50
-9
lines changed

3 files changed

+50
-9
lines changed

src/api/server.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use async_graphql::{
99
Data, Request, Schema,
1010
};
1111
use async_graphql_warp::{graphql_protocol, GraphQLResponse, GraphQLWebSocket};
12+
use tokio::runtime::Handle;
1213
use tokio::sync::oneshot;
1314
use warp::{filters::BoxedFilter, http::Response, ws::Ws, Filter, Reply};
1415

@@ -31,13 +32,13 @@ impl Server {
3132
config: &config::Config,
3233
watch_rx: topology::WatchRx,
3334
running: Arc<AtomicBool>,
34-
runtime: &tokio::runtime::Runtime,
35+
handle: &Handle,
3536
) -> crate::Result<Self> {
3637
let routes = make_routes(config.api.playground, watch_rx, running);
3738

3839
let (_shutdown, rx) = oneshot::channel();
3940
// warp uses `tokio::spawn` and so needs us to enter the runtime context.
40-
let _guard = runtime.enter();
41+
let _guard = handle.enter();
4142
let (addr, server) = warp::serve(routes)
4243
.try_bind_with_graceful_shutdown(
4344
config.api.address.expect("No socket address"),
@@ -57,7 +58,7 @@ impl Server {
5758
schema::components::update_config(config);
5859

5960
// Spawn the server in the background.
60-
runtime.spawn(server);
61+
handle.spawn(server);
6162

6263
Ok(Self { _shutdown, addr })
6364
}

src/app.rs

+8-6
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use crate::{
3838
use std::os::unix::process::ExitStatusExt;
3939
#[cfg(windows)]
4040
use std::os::windows::process::ExitStatusExt;
41+
use tokio::runtime::Handle;
4142

4243
pub static WORKER_THREADS: OnceNonZeroUsize = OnceNonZeroUsize::new();
4344

@@ -122,13 +123,13 @@ impl ApplicationConfig {
122123

123124
/// Configure the API server, if applicable
124125
#[cfg(feature = "api")]
125-
pub fn setup_api(&self, runtime: &Runtime) -> Option<api::Server> {
126+
pub fn setup_api(&self, handle: &Handle) -> Option<api::Server> {
126127
if self.api.enabled {
127128
match api::Server::start(
128129
self.topology.config(),
129130
self.topology.watch(),
130131
std::sync::Arc::clone(&self.topology.running),
131-
runtime,
132+
handle,
132133
) {
133134
Ok(api_server) => {
134135
emit!(ApiStarted {
@@ -159,7 +160,8 @@ impl Application {
159160
}
160161

161162
pub fn prepare_start() -> Result<(Runtime, StartedApplication), ExitCode> {
162-
Self::prepare().and_then(|(runtime, app)| app.start(&runtime).map(|app| (runtime, app)))
163+
Self::prepare()
164+
.and_then(|(runtime, app)| app.start(runtime.handle()).map(|app| (runtime, app)))
163165
}
164166

165167
pub fn prepare() -> Result<(Runtime, Self), ExitCode> {
@@ -208,13 +210,13 @@ impl Application {
208210
))
209211
}
210212

211-
pub fn start(self, runtime: &Runtime) -> Result<StartedApplication, ExitCode> {
213+
pub fn start(self, handle: &Handle) -> Result<StartedApplication, ExitCode> {
212214
// Any internal_logs sources will have grabbed a copy of the
213215
// early buffer by this point and set up a subscriber.
214216
crate::trace::stop_early_buffering();
215217

216218
emit!(VectorStarted);
217-
runtime.spawn(heartbeat::heartbeat());
219+
handle.spawn(heartbeat::heartbeat());
218220

219221
let Self {
220222
require_healthy,
@@ -224,7 +226,7 @@ impl Application {
224226

225227
let topology_controller = SharedTopologyController::new(TopologyController {
226228
#[cfg(feature = "api")]
227-
api_server: config.setup_api(runtime),
229+
api_server: config.setup_api(handle),
228230
topology: config.topology,
229231
config_paths: config.config_paths.clone(),
230232
require_healthy,

src/topology/controller.rs

+38
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::sync::Arc;
33
#[cfg(feature = "enterprise")]
44
use futures_util::future::BoxFuture;
55
use futures_util::FutureExt as _;
6+
67
use tokio::sync::{Mutex, MutexGuard};
78

89
#[cfg(feature = "api")]
@@ -14,6 +15,7 @@ use crate::config::enterprise::{
1415
use crate::internal_events::{
1516
VectorConfigLoadError, VectorRecoveryError, VectorReloadError, VectorReloaded,
1617
};
18+
1719
use crate::{config, topology::RunningTopology};
1820

1921
#[derive(Clone, Debug)]
@@ -93,6 +95,42 @@ impl TopologyController {
9395
}
9496
}
9597

98+
// Start the api server or disable it, if necessary
99+
#[cfg(feature = "api")]
100+
if !new_config.api.enabled {
101+
if let Some(server) = self.api_server.take() {
102+
debug!("Dropping api server.");
103+
drop(server)
104+
}
105+
} else if self.api_server.is_none() {
106+
use crate::internal_events::ApiStarted;
107+
use crate::topology::ReloadOutcome::FatalError;
108+
use std::sync::atomic::AtomicBool;
109+
use tokio::runtime::Handle;
110+
111+
debug!("Starting api server.");
112+
113+
self.api_server = match api::Server::start(
114+
self.topology.config(),
115+
self.topology.watch(),
116+
Arc::<AtomicBool>::clone(&self.topology.running),
117+
&Handle::current(),
118+
) {
119+
Ok(api_server) => {
120+
emit!(ApiStarted {
121+
addr: new_config.api.address.unwrap(),
122+
playground: new_config.api.playground
123+
});
124+
125+
Some(api_server)
126+
}
127+
Err(e) => {
128+
error!("An error occurred that Vector couldn't handle: {}.", e);
129+
return FatalError;
130+
}
131+
}
132+
}
133+
96134
match self.topology.reload_config_and_respawn(new_config).await {
97135
Ok(true) => {
98136
#[cfg(feature = "api")]

0 commit comments

Comments
 (0)