Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: upkeep #586

Merged
merged 14 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
[workspace]

resolver = "2"

members = [
# "client",
"server"
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dhat-heap = []
tokio = { version = "1.28.0", features = ["full"] }
async-trait = "0.1.68"
futures = "0.3.28"
tokio-util = { version = "0.7.7", features = ["time"]}
tokio-util = { version = "0.7.10", features = ["time"]}
tokio-stream = { version = "0.1.14", features = ["net"]}

# API
Expand Down
4 changes: 2 additions & 2 deletions server/src/ban_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ async fn purge_expired_tasks(shared: Arc<Shared>) {
// state as new keys have been set to expire early. This is done by
// looping.
tokio::select! {
_ = tokio::time::sleep_until(when) => {}
_ = shared.background_task.notified() => {}
() = tokio::time::sleep_until(when) => {}
() = shared.background_task.notified() => {}
}
} else {
// There are no keys expiring in the future. Wait until the task is
Expand Down
2 changes: 1 addition & 1 deletion server/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async fn write_message(
}
}
}
_ = cancel_token.cancelled() => {
() = cancel_token.cancelled() => {
//@todo reword this
trace!("write loop hit cancellation token.");

Expand Down
2 changes: 1 addition & 1 deletion server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![cfg_attr(coverage_nightly, feature(no_coverage))]
#![cfg_attr(coverage_nightly, coverage(off))]
#![warn(clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]
//@todo fix this.
Expand Down
4 changes: 2 additions & 2 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ where
error!(cause = ?e, "Global thread {} failed.", global_name);
}
}
_ = cancel_token.cancelled() => {
() = cancel_token.cancelled() => {
info!("Global thread {} is shutting down from shutdown message.", global_name);
}

Expand Down Expand Up @@ -146,7 +146,7 @@ where
error!(cause = %err, "failed to accept");
};
},
_ = cancel_token.cancelled() => {}
() = cancel_token.cancelled() => {}
}

let start = Instant::now();
Expand Down
2 changes: 1 addition & 1 deletion server/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl<State: Clone> Session<State> {
worker_id: Uuid,
) {
//@todo has to be an easier way to reuse worker_name here
debug!(id = ?self.inner.id, "Registered Worker {worker_id} ({}) Session ID: {session_id}", worker_name.clone().unwrap_or(String::new()));
debug!(id = ?self.inner.id, "Registered Worker {worker_id} ({}) Session ID: {session_id}", worker_name.clone().unwrap_or_default());

let worker = Miner::new(
self.id().clone(),
Expand Down
4 changes: 2 additions & 2 deletions server/src/session_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl<CState: Clone> SessionList<CState> {
"Session List sending {} miners reconnect message.",
self.inner.state.len()
);
for entry in self.inner.state.iter() {
for entry in &self.inner.state {
let miner = entry.value();
if let Err(e) = miner.send_raw(msg.clone()) {
warn!(connection_id = %miner.id(), cause = %e, "Failed to send shutdown message");
Expand All @@ -105,7 +105,7 @@ impl<CState: Clone> SessionList<CState> {
);

//@todo we need to parallelize this async - now we can do it without async though.
for entry in self.inner.state.iter() {
for entry in &self.inner.state {
entry.value().shutdown();
}
}
Expand Down
6 changes: 3 additions & 3 deletions server/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,22 @@ impl<State: Clone + Send + Sync + 'static, CState: Default + Clone + Send + Sync
Ok(frame) => frame,
}
},
_ = &mut sleep => {
() = &mut sleep => {
if enabled!(Level::DEBUG) {
error!( id = &self.id.to_string(), ip = &address.to_string(), "Session Parse Frame Timeout");
}
break;
},
//@todo we might want timeouts to reduce difficulty as well here. -> That is
//handled in retarget, so let's check that out.
_ = session_cancel_token.cancelled() => {
() = session_cancel_token.cancelled() => {
//@todo work on these errors,
if enabled!(Level::DEBUG) {
error!( id = &self.id.to_string(), ip = &address.to_string(), "Session Disconnected");
}
break;
},
_ = self.cancel_token.cancelled() => {
() = self.cancel_token.cancelled() => {
// If a shutdown signal is received, return from `run`.
// This will result in the task terminating.
break;
Expand Down
Loading