Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: upkeep #586

Merged
merged 14 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
[workspace]

resolver = "2"

members = [
# "client",
"server"
Expand Down
1 change: 1 addition & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ unlicensed = "deny"
allow = [
"Apache-2.0",
"MIT",
"BSD-3-Clause",
# @todo see if we can get rid of this one.
"Unicode-DFS-2016"
]
Expand Down
4 changes: 2 additions & 2 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "stratum-server"
version = "5.5.4"
authors = ["Sean Kilgarriff [email protected]"]
rust-version = "1.65"
rust-version = "1.67.1"
edition = "2021"
license = "Apache-2.0 OR MIT"
description = "The server code for the Rust Stratum implementation"
Expand All @@ -26,7 +26,7 @@ dhat-heap = []
tokio = { version = "1.28.0", features = ["full"] }
async-trait = "0.1.68"
futures = "0.3.28"
tokio-util = { version = "0.7.7", features = ["time"]}
tokio-util = { version = "0.7.10", features = ["time"]}
tokio-stream = { version = "0.1.14", features = ["net"]}

# API
Expand Down
2 changes: 1 addition & 1 deletion server/src/api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub enum Error {
#[error(transparent)]
Hyper(#[from] hyper::Error),
#[error(transparent)]
AddrParseError(#[from] std::net::AddrParseError),
AddrParse(#[from] std::net::AddrParseError),
#[error(transparent)]
Io(#[from] std::io::Error),
}
10 changes: 5 additions & 5 deletions server/src/ban_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ async fn purge_expired_tasks(shared: Arc<Shared>) {
// state as new keys have been set to expire early. This is done by
// looping.
tokio::select! {
_ = tokio::time::sleep_until(when) => {}
_ = shared.background_task.notified() => {}
() = tokio::time::sleep_until(when) => {}
() = shared.background_task.notified() => {}
}
} else {
// There are no keys expiring in the future. Wait until the task is
Expand Down Expand Up @@ -329,7 +329,7 @@ mod tests {
use super::*;
use tokio_test::{assert_err, assert_ok};

#[cfg_attr(coverage_nightly, no_coverage)]
#[cfg_attr(coverage_nightly, coverage(off))]
#[tokio::test]
async fn single_ban_expires() -> anyhow::Result<()> {
let cancel_token = CancellationToken::new();
Expand All @@ -354,7 +354,7 @@ mod tests {
Ok(())
}

#[cfg_attr(coverage_nightly, no_coverage)]
#[cfg_attr(coverage_nightly, coverage(off))]
#[tokio::test]
async fn ban_extended() -> anyhow::Result<()> {
let cancel_token = CancellationToken::new();
Expand Down Expand Up @@ -400,7 +400,7 @@ mod tests {
Duration::from_millis(n)
}

#[cfg_attr(coverage_nightly, no_coverage)]
#[cfg_attr(coverage_nightly, coverage(off))]
#[tokio::test]
async fn graceful_shutdown() -> anyhow::Result<()> {
let cancel_token = CancellationToken::new();
Expand Down
2 changes: 1 addition & 1 deletion server/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async fn write_message(
}
}
}
_ = cancel_token.cancelled() => {
() = cancel_token.cancelled() => {
//@todo reword this
trace!("write loop hit cancellation token.");

Expand Down
4 changes: 2 additions & 2 deletions server/src/id_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ mod tests {
use super::*;
use tokio_test::assert_ok;

#[cfg_attr(coverage_nightly, no_coverage)]
#[cfg_attr(coverage_nightly, coverage(off))]
#[tokio::test]
async fn idx_allocations() {
let id_manager = IDManager::new(0);
Expand All @@ -120,7 +120,7 @@ mod tests {
assert_eq!(rolled_id, SessionID::from(0));
}

#[cfg_attr(coverage_nightly, no_coverage)]
#[cfg_attr(coverage_nightly, coverage(off))]
#[tokio::test]
async fn basic_idx_allocations_with_prefix() {
let id_manager = IDManager::new(9);
Expand Down
2 changes: 1 addition & 1 deletion server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![cfg_attr(coverage_nightly, feature(no_coverage))]
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
#![warn(clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]
//@todo fix this.
Expand Down
4 changes: 2 additions & 2 deletions server/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ impl<State: Clone + Send + Sync + 'static, CState: Clone + Send + Sync + 'static
global_vars: GlobalVars,
) {
let Some(endpoint) = self.routes.get(value.method()) else {
warn!("Method {} was not found", value.method());
return;
warn!("Method {} was not found", value.method());
return;
};

// if log::log_enabled!(log::Level::Trace) {
Expand Down
4 changes: 2 additions & 2 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ where
error!(cause = ?e, "Global thread {} failed.", global_name);
}
}
_ = cancel_token.cancelled() => {
() = cancel_token.cancelled() => {
info!("Global thread {} is shutting down from shutdown message.", global_name);
}

Expand Down Expand Up @@ -146,7 +146,7 @@ where
error!(cause = %err, "failed to accept");
};
},
_ = cancel_token.cancelled() => {}
() = cancel_token.cancelled() => {}
}

let start = Instant::now();
Expand Down
2 changes: 1 addition & 1 deletion server/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl<State: Clone> Session<State> {
worker_id: Uuid,
) {
//@todo has to be an easier way to reuse worker_name here
debug!(id = ?self.inner.id, "Registered Worker {worker_id} ({}) Session ID: {session_id}", worker_name.clone().unwrap_or(String::new()));
debug!(id = ?self.inner.id, "Registered Worker {worker_id} ({}) Session ID: {session_id}", worker_name.clone().unwrap_or_default());

let worker = Miner::new(
self.id().clone(),
Expand Down
4 changes: 2 additions & 2 deletions server/src/session_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl<CState: Clone> SessionList<CState> {
"Session List sending {} miners reconnect message.",
self.inner.state.len()
);
for entry in self.inner.state.iter() {
for entry in &self.inner.state {
let miner = entry.value();
if let Err(e) = miner.send_raw(msg.clone()) {
warn!(connection_id = %miner.id(), cause = %e, "Failed to send shutdown message");
Expand All @@ -105,7 +105,7 @@ impl<CState: Clone> SessionList<CState> {
);

//@todo we need to parallize this async - now we can do it without async though.
for entry in self.inner.state.iter() {
for entry in &self.inner.state {
entry.value().shutdown();
}
}
Expand Down
6 changes: 3 additions & 3 deletions server/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,22 @@ impl<State: Clone + Send + Sync + 'static, CState: Default + Clone + Send + Sync
Ok(frame) => frame,
}
},
_ = &mut sleep => {
() = &mut sleep => {
if enabled!(Level::DEBUG) {
error!( id = &self.id.to_string(), ip = &address.to_string(), "Session Parse Frame Timeout");
}
break;
},
//@todo we might want timeouts to reduce difficulty as well here. -> That is
//handled in retarget, so let's check that out.
_ = session_cancel_token.cancelled() => {
() = session_cancel_token.cancelled() => {
//@todo work on these errors,
if enabled!(Level::DEBUG) {
error!( id = &self.id.to_string(), ip = &address.to_string(), "Session Disconnected");
}
break;
},
_ = self.cancel_token.cancelled() => {
() = self.cancel_token.cancelled() => {
// If a shutdown signal is received, return from `run`.
// This will result in the task terminating.
break;
Expand Down
Loading