Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove tokio01::codec #3095

Merged
merged 33 commits into from
Jul 23, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a4b0362
statsd: remove tokio01 codec
fanatid Jul 15, 2020
060105c
tokio-openssl: update with deprecate
fanatid Jul 16, 2020
4c2e0f4
test_util: remove old codec
fanatid Jul 16, 2020
124d25e
benches/files: update codec
fanatid Jul 17, 2020
1c6d856
tests/syslog: update codec
fanatid Jul 17, 2020
2aa9e4f
tests/tcp: remove codec
fanatid Jul 17, 2020
749dad7
sinks/util/unix: remove codec from test
fanatid Jul 17, 2020
a1700c9
sinks/util/unix: remove codec
fanatid Jul 17, 2020
9ece824
sinks/util/tcp: remove codec
fanatid Jul 17, 2020
db9ab6d
sources/socket/mod: remove tokio-uds
fanatid Jul 17, 2020
edc626f
sources/socket/unix: update codec
fanatid Jul 18, 2020
3d9a874
sources/util/tcp: update codec
fanatid Jul 18, 2020
b874167
sources/syslog: fix new codec
fanatid Jul 18, 2020
9ec7935
sources/syslog: update codec
fanatid Jul 18, 2020
7349085
sources/socket/udp: update codec
fanatid Jul 18, 2020
f916a55
remove crate codec01
fanatid Jul 18, 2020
b709dc7
add tokio-util/udp to features
fanatid Jul 18, 2020
3917d95
sources/statsd: update codec
fanatid Jul 18, 2020
61071b1
remove unused tokio01 features: udp, codec
fanatid Jul 18, 2020
4e3bff0
cargo-fmt
fanatid Jul 18, 2020
880f7d6
remove already enabled feature
fanatid Jul 19, 2020
d9dc853
add shutdown to sources/util/unix
fanatid Jul 19, 2020
d7ca86b
fix uds feature for tests
fanatid Jul 19, 2020
99b8809
fix uds feature again
fanatid Jul 19, 2020
e1b6d79
Merge remote-tracking branch 'upstream/master' into tokio01-codec-remove
fanatid Jul 20, 2020
3cf9fb5
fix udp socket source
fanatid Jul 20, 2020
56d8122
fix syslog test for unix socket
fanatid Jul 20, 2020
fc8cd1a
Merge remote-tracking branch 'upstream/master' into tokio01-codec-remove
fanatid Jul 21, 2020
fc4c810
remove not required delay_for in statsd
fanatid Jul 21, 2020
4e5c05c
add type annotations for long get_ref/get_mut chains
fanatid Jul 21, 2020
9c57084
add explicit type to sinks/util
fanatid Jul 22, 2020
90a2cb3
Merge remote-tracking branch 'upstream/master' into tokio01-codec-remove
fanatid Jul 22, 2020
b4bd844
Merge remote-tracking branch 'upstream/master' into tokio01-codec-remove
fanatid Jul 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/internal_events/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl InternalEvent for UnixSocketConnectionEstablished<'_> {

#[derive(Debug)]
pub struct UnixSocketConnectionFailure<'a> {
pub error: std::io::Error,
pub error: tokio::io::Error,
MOZGIII marked this conversation as resolved.
Show resolved Hide resolved
pub path: &'a std::path::Path,
}

Expand Down
109 changes: 57 additions & 52 deletions src/sinks/util/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,18 @@ use crate::{
sinks::{Healthcheck, RouterSink},
topology::config::SinkContext,
};
use bytes::Bytes;
use futures01::{
future, stream::iter_ok, try_ready, Async, AsyncSink, Future, Poll, Sink, StartSend,
};
use bytes05::Bytes;
use futures::{compat::CompatSink, FutureExt, TryFutureExt};
use futures01::{stream::iter_ok, try_ready, Async, AsyncSink, Future, Poll, Sink, StartSend};
use serde::{Deserialize, Serialize};
use snafu::Snafu;
use std::io;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tokio01::codec::{BytesCodec, FramedWrite};
use tokio01::timer::Delay;
use std::{path::PathBuf, time::Duration};
use tokio::{
net::UnixStream,
time::{delay_for, Delay},
};
use tokio_retry::strategy::ExponentialBackoff;
use tokio_uds::UnixStream;
use tokio_util::codec::{BytesCodec, FramedWrite};
use tracing::field;

#[derive(Deserialize, Serialize, Debug, Clone)]
Expand All @@ -41,27 +40,23 @@ impl UnixSinkConfig {

let sink =
Box::new(sink.with_flat_map(move |event| iter_ok(encode_event(event, &encoding))));
let healthcheck = unix_healthcheck(self.path.clone());
let healthcheck = healthcheck(self.path.clone()).boxed().compat();

Ok((sink, healthcheck))
Ok((sink, Box::new(healthcheck)))
}
}

#[derive(Debug, Snafu)]
enum HealthcheckError {
#[snafu(display("Connect error: {}", source))]
ConnectError { source: std::io::Error },
ConnectError { source: tokio::io::Error },
}

fn unix_healthcheck(path: PathBuf) -> Healthcheck {
// Lazy to avoid immediately connecting
let check = future::lazy(move || {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the reasoning for lazy is still valid here unless you've verified that connect doesn't bind to the port until polled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good note. I actually do not understand why new futures need lazy function, because new futures not executed while not pulled, for example: https://play.rust-lang.org/?version=nightly&mode=debug&edition=2018&gist=1903bb7388f8d2965ee127bf8db2675c

Copy link
Contributor

@MOZGIII MOZGIII Jul 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why new futures need lazy function,

I think it's because Rust can't yet pass bare async closures around, thus this future::lazy.

Here: https://docs.rs/futures-util/0.3.5/src/futures_util/future/lazy.rs.html#34-38, it's very similar to poll! macro: https://docs.rs/futures-util/0.3.5/src/futures_util/async_await/poll.rs.html#13-17

UnixStream::connect(&path)
.map(|_| ())
.map_err(|source| HealthcheckError::ConnectError { source }.into())
});

Box::new(check)
async fn healthcheck(path: PathBuf) -> crate::Result<()> {
match UnixStream::connect(&path).await {
Ok(_) => Ok(()),
Err(source) => Err(HealthcheckError::ConnectError { source }.into()),
}
}

pub struct UnixSink {
Expand All @@ -72,9 +67,9 @@ pub struct UnixSink {

enum UnixSinkState {
Disconnected,
Creating(Box<dyn Future<Item = UnixStream, Error = io::Error> + Send + 'static>),
Open(FramedWrite<UnixStream, BytesCodec>),
Backoff(Delay),
Creating(Box<dyn Future<Item = UnixStream, Error = tokio::io::Error> + Send + 'static>),
Open(CompatSink<FramedWrite<UnixStream, BytesCodec>, Bytes>),
Backoff(Box<dyn Future<Item = (), Error = ()> + Send>),
}

impl UnixSink {
Expand All @@ -94,13 +89,15 @@ impl UnixSink {
}

fn next_delay(&mut self) -> Delay {
Delay::new(Instant::now() + self.backoff.next().unwrap())
delay_for(self.backoff.next().unwrap())
}

/**
* Polls for whether the underlying UnixStream is connected and ready to receive writes.
**/
fn poll_connection(&mut self) -> Poll<&mut FramedWrite<UnixStream, BytesCodec>, ()> {
fn poll_connection(
&mut self,
) -> Poll<&mut CompatSink<FramedWrite<UnixStream, BytesCodec>, Bytes>, ()> {
loop {
self.state = match self.state {
UnixSinkState::Open(ref mut stream) => {
Expand All @@ -115,27 +112,28 @@ impl UnixSink {
error,
path: &self.path
});
UnixSinkState::Backoff(self.next_delay())
let delay = self.next_delay();
let delay = Box::new(async move { Ok(delay.await) }.boxed().compat());
UnixSinkState::Backoff(delay)
}
Ok(Async::Ready(stream)) => {
emit!(UnixSocketConnectionEstablished { path: &self.path });
self.backoff = Self::fresh_backoff();
let out = FramedWrite::new(stream, BytesCodec::new());
UnixSinkState::Open(out)
UnixSinkState::Open(CompatSink::new(out))
}
},
UnixSinkState::Backoff(ref mut delay) => match delay.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
// Err can only occur if the tokio runtime has been shutdown or if more than 2^63 timers have been created
Err(err) => unreachable!(err),
Err(_) => unreachable!(),
fanatid marked this conversation as resolved.
Show resolved Hide resolved
Ok(Async::Ready(())) => UnixSinkState::Disconnected,
},
UnixSinkState::Disconnected => {
debug!(
message = "connecting",
path = &field::display(self.path.to_str().unwrap())
);
let connect_future = UnixStream::connect(&self.path);
let connect_future = UnixStream::connect(self.path.clone()).boxed().compat();
UnixSinkState::Creating(Box::new(connect_future))
}
}
Expand All @@ -144,7 +142,7 @@ impl UnixSink {
}

impl Sink for UnixSink {
type SinkItem = Bytes;
type SinkItem = bytes::Bytes;
type SinkError = ();

fn start_send(&mut self, line: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
Expand All @@ -154,20 +152,29 @@ impl Sink for UnixSink {
Err(_) => {
unreachable!(); // poll_ready() should never return an error
}
Ok(Async::Ready(connection)) => match connection.start_send(line) {
Err(error) => {
emit!(UnixSocketError {
error,
path: &self.path
});
self.state = UnixSinkState::Disconnected;
Ok(AsyncSink::Ready)
}
Ok(res) => {
emit!(UnixSocketEventSent { byte_size });
Ok(res)
Ok(Async::Ready(connection)) => {
let line = Bytes::copy_from_slice(&line);
match connection.start_send(line) {
Err(error) => {
emit!(UnixSocketError {
error,
path: &self.path
});
self.state = UnixSinkState::Disconnected;
Ok(AsyncSink::Ready)
}
Ok(res) => {
emit!(UnixSocketEventSent { byte_size });
Ok(match res {
AsyncSink::Ready => AsyncSink::Ready,
AsyncSink::NotReady(bytes) => {
let bytes = bytes::Bytes::from(&bytes[..]);
AsyncSink::NotReady(bytes)
}
})
}
}
},
}
}
}

Expand Down Expand Up @@ -208,14 +215,12 @@ mod tests {

#[tokio::test]
async fn unix_sink_healthcheck() {
let path = temp_uds_path("valid_uds");
let _listener = UnixListener::bind(&path).unwrap();
let healthcheck = unix_healthcheck(path);
assert!(healthcheck.wait().is_ok());
let good_path = temp_uds_path("valid_uds");
let _listener = UnixListener::bind(&good_path).unwrap();
assert!(healthcheck(good_path).await.is_ok());

let bad_path = temp_uds_path("no_one_listening");
let bad_healthcheck = unix_healthcheck(bad_path);
assert!(bad_healthcheck.wait().is_err());
assert!(healthcheck(bad_path).await.is_err());
}

#[test]
Expand Down