Skip to content

Commit

Permalink
Upgrade Tokio to version 0.3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed Nov 17, 2020
1 parent bfcbf83 commit 3bad70e
Show file tree
Hide file tree
Showing 12 changed files with 70 additions and 57 deletions.
2 changes: 1 addition & 1 deletion light-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ serde_derive = "1.0.106"
sled = "0.34.3"
static_assertions = "1.1.0"
thiserror = "1.0.15"
tokio = "0.2.20"
tokio = "0.3"

[dev-dependencies]
tendermint-testgen = { path = "../testgen"}
Expand Down
4 changes: 4 additions & 0 deletions light-client/src/components/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ pub enum IoError {
/// Task timed out.
#[error("task timed out after {} ms", .0.as_millis())]
Timeout(Duration),

/// Failed to initialize runtime
#[error("failed to initialize runtime")]
Runtime,
}

impl IoError {
Expand Down
6 changes: 1 addition & 5 deletions light-client/src/utils/block_on.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ where
F::Output: Send,
{
std::thread::spawn(move || {
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap();
let rt = tokio::runtime::Runtime::new().unwrap();

if let Some(timeout) = timeout {
let task = async { tokio::time::timeout(timeout, f).await };
Expand Down
3 changes: 1 addition & 2 deletions light-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ name = "tendermint-light-node"
path = "src/bin/tendermint-light-node/main.rs"

[dependencies]
abscissa_tokio = "0.5"
anomaly = { version = "0.2", features = [ "serializer" ] }
async-trait = "0.1"
gumdrop = "0.7"
Expand All @@ -42,7 +41,7 @@ tendermint = { version = "0.17.0-rc2", path = "../tendermint" }
tendermint-light-client = { version = "0.17.0-rc2", path = "../light-client" }
tendermint-rpc = { version = "0.17.0-rc2", path = "../rpc", features = [ "http-client" ] }
thiserror = "1.0"
tokio = { version = "0.2", features = ["full"] }
tokio = { version = "0.3", features = ["full"] }

[dependencies.abscissa_core]
version = "0.5.0"
Expand Down
4 changes: 1 addition & 3 deletions light-node/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use abscissa_core::{
application::{self, AppCell},
config, trace, Application, EntryPoint, FrameworkError, StandardPaths,
};
use abscissa_tokio::TokioComponent;

/// Application state
pub static APPLICATION: AppCell<LightNodeApp> = AppCell::new();
Expand Down Expand Up @@ -83,8 +82,7 @@ impl Application for LightNodeApp {
/// beyond the default ones provided by the framework, this is the place
/// to do so.
fn register_components(&mut self, command: &Self::Cmd) -> Result<(), FrameworkError> {
let mut components = self.framework_components(command)?;
components.push(Box::new(TokioComponent::new()?));
let components = self.framework_components(command)?;
self.state.components.register(components)
}

Expand Down
57 changes: 25 additions & 32 deletions light-node/src/commands/start.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
//! `start` subcommand - start the light node.
use std::process;

use crate::application::{app_config, APPLICATION};
use crate::application::app_config;
use crate::config::{LightClientConfig, LightNodeConfig};
use crate::rpc;
use crate::rpc::Server;
Expand Down Expand Up @@ -44,42 +42,37 @@ pub struct StartCmd {
impl Runnable for StartCmd {
/// Start the application.
fn run(&self) {
if let Err(err) = abscissa_tokio::run(&APPLICATION, async {
if let Err(e) = StartCmd::assert_init_was_run() {
if let Err(e) = StartCmd::assert_init_was_run() {
status_err!(&e);
panic!(e);
}

let supervisor = match self.construct_supervisor() {
Ok(supervisor) => supervisor,
Err(e) => {
status_err!(&e);
panic!(e);
}
};

let rpc_handler = supervisor.handle();
StartCmd::start_rpc_server(rpc_handler);

let supervisor = match self.construct_supervisor() {
Ok(supervisor) => supervisor,
Err(e) => {
status_err!(&e);
panic!(e);
let handle = supervisor.handle();
std::thread::spawn(|| supervisor.run());

loop {
match handle.verify_to_highest() {
Ok(light_block) => {
status_info!("synced to block:", light_block.height().to_string());
}
};

let rpc_handler = supervisor.handle();
StartCmd::start_rpc_server(rpc_handler);

let handle = supervisor.handle();
std::thread::spawn(|| supervisor.run());

loop {
match handle.verify_to_highest() {
Ok(light_block) => {
status_info!("synced to block:", light_block.height().to_string());
}
Err(err) => {
status_err!("sync failed: {}", err);
}
Err(err) => {
status_err!("sync failed: {}", err);
}

// TODO(liamsi): use ticks and make this configurable:
std::thread::sleep(Duration::from_millis(800));
}
}) {
status_err!("Unexpected error while running application: {}", err);
process::exit(1);

// TODO(liamsi): use ticks and make this configurable:
std::thread::sleep(Duration::from_millis(800));
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ uuid = { version = "0.8", default-features = false }
subtle-encoding = { version = "0.5", features = ["bech32-preview"] }

async-trait = { version = "0.1", optional = true }
async-tungstenite = { version="0.8", features = ["tokio-runtime"], optional = true }
async-tungstenite = { version = "0.10", features = ["tokio-runtime"], optional = true }
futures = { version = "0.3", optional = true }
http = { version = "0.2", optional = true }
hyper = { version = "0.13", optional = true }
tokio = { version = "0.2", optional = true }
tokio = { version = "0.3", features = ["time", "sync", "stream"], optional = true }
tracing = { version = "0.1", optional = true }
16 changes: 12 additions & 4 deletions rpc/src/client/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ pub struct Subscription {
impl Stream for Subscription {
type Item = Result<Event>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.event_rx.poll_recv(cx)
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.pin_get_event_rx().poll_next(cx)
}
}

Expand Down Expand Up @@ -108,6 +108,14 @@ impl Subscription {
)
})?
}

/// Pinning is structural for the underlying channels.
/// As such we can project the underlying channel as a pinned value.
///
/// See https://doc.rust-lang.org/std/pin/index.html#pinning-is-structural-for-field
fn pin_get_event_rx(self: Pin<&mut Self>) -> Pin<&mut ChannelRx<Result<Event>>> {
unsafe { self.map_unchecked_mut(|s| &mut s.event_rx) }
}
}

/// A command that can be sent to the subscription driver.
Expand Down Expand Up @@ -509,7 +517,7 @@ mod test {
}

async fn must_recv<T>(ch: &mut ChannelRx<T>, timeout_ms: u64) -> T {
let mut delay = time::delay_for(Duration::from_millis(timeout_ms));
let mut delay = time::sleep(Duration::from_millis(timeout_ms));
tokio::select! {
_ = &mut delay, if !delay.is_elapsed() => panic!("timed out waiting for recv"),
Some(v) = ch.recv() => v,
Expand All @@ -520,7 +528,7 @@ mod test {
where
T: std::fmt::Debug,
{
let mut delay = time::delay_for(Duration::from_millis(timeout_ms));
let mut delay = time::sleep(Duration::from_millis(timeout_ms));
tokio::select! {
_ = &mut delay, if !delay.is_elapsed() => (),
Some(v) = ch.recv() => panic!("got unexpected result from channel: {:?}", v),
Expand Down
19 changes: 17 additions & 2 deletions rpc/src/client/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
//! convenience methods. We also only implement unbounded channels at present.
//! In future, if RPC consumers need it, we will implement bounded channels.
use std::pin::Pin;

use crate::{Error, Result};
use futures::task::{Context, Poll};
use futures::Stream;
use tokio::sync::mpsc;

/// Constructor for an unbounded channel.
Expand Down Expand Up @@ -43,7 +46,19 @@ impl<T> ChannelRx<T> {
self.0.recv().await
}

pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.0.poll_recv(cx)
/// Pinning is structural for the underlying channel.
/// As such we can project the underlying channel as a pinned value.
///
/// See https://doc.rust-lang.org/std/pin/index.html#pinning-is-structural-for-field
fn pin_get(self: Pin<&mut Self>) -> Pin<&mut mpsc::UnboundedReceiver<T>> {
unsafe { self.map_unchecked_mut(|s| &mut s.0) }
}
}

impl<T> Stream for ChannelRx<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.pin_get().poll_next(cx)
}
}
2 changes: 1 addition & 1 deletion rpc/src/client/transport/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl WebSocketClientDriver {
pub async fn run(mut self) -> Result<()> {
let mut ping_interval =
tokio::time::interval_at(Instant::now().add(PING_INTERVAL), PING_INTERVAL);
let mut recv_timeout = tokio::time::delay_for(PING_INTERVAL);
let mut recv_timeout = tokio::time::sleep(PING_INTERVAL);
loop {
tokio::select! {
Some(res) = self.stream.next() => match res {
Expand Down
4 changes: 2 additions & 2 deletions tendermint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ k256 = { version = "0.5", optional = true, features = ["ecdsa"] }
ripemd160 = { version = "0.9", optional = true }

[dev-dependencies]
tendermint-rpc = { path = "../rpc", features = [ "http-client", "websocket-client" ] }
tokio = { version = "0.2", features = [ "macros" ] }
tendermint-rpc = { path = "../rpc", features = ["http-client", "websocket-client"] }
tokio = { version = "0.3", features = ["macros"] }

[features]
secp256k1 = ["k256", "ripemd160"]
6 changes: 3 additions & 3 deletions tendermint/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ mod rpc {
let mut cur_tx_id = 0_u32;

while !expected_tx_values.is_empty() {
let mut delay = tokio::time::delay_for(Duration::from_secs(3));
let mut delay = tokio::time::sleep(Duration::from_secs(3));
tokio::select! {
Some(res) = subs.next() => {
let ev = res.unwrap();
Expand Down Expand Up @@ -314,7 +314,7 @@ mod rpc {
.broadcast_tx_async(Transaction::from(tx.into_bytes()))
.await
.unwrap();
tokio::time::delay_for(Duration::from_millis(100)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
});

Expand All @@ -327,7 +327,7 @@ mod rpc {
);

while expected_new_blocks > 0 && !expected_tx_values.is_empty() {
let mut timeout = tokio::time::delay_for(Duration::from_secs(3));
let mut timeout = tokio::time::sleep(Duration::from_secs(3));
tokio::select! {
Some(res) = combined_subs.next() => {
let ev: Event = res.unwrap();
Expand Down

0 comments on commit 3bad70e

Please sign in to comment.