Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove tokio01::codec #3095

Merged
merged 33 commits into from
Jul 23, 2020
Merged
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a4b0362
statsd: remove tokio01 codec
fanatid Jul 15, 2020
060105c
tokio-openssl: update with deprecate
fanatid Jul 16, 2020
4c2e0f4
test_util: remove old codec
fanatid Jul 16, 2020
124d25e
benches/files: update codec
fanatid Jul 17, 2020
1c6d856
tests/syslog: update codec
fanatid Jul 17, 2020
2aa9e4f
tests/tcp: remove codec
fanatid Jul 17, 2020
749dad7
sinks/util/unix: remove codec from test
fanatid Jul 17, 2020
a1700c9
sinks/util/unix: remove codec
fanatid Jul 17, 2020
9ece824
sinks/util/tcp: remove codec
fanatid Jul 17, 2020
db9ab6d
sources/socket/mod: remove tokio-uds
fanatid Jul 17, 2020
edc626f
sources/socket/unix: update codec
fanatid Jul 18, 2020
3d9a874
sources/util/tcp: update codec
fanatid Jul 18, 2020
b874167
sources/syslog: fix new codec
fanatid Jul 18, 2020
9ec7935
sources/syslog: update codec
fanatid Jul 18, 2020
7349085
sources/socket/udp: update codec
fanatid Jul 18, 2020
f916a55
remove crate codec01
fanatid Jul 18, 2020
b709dc7
add tokio-util/udp to features
fanatid Jul 18, 2020
3917d95
sources/statsd: update codec
fanatid Jul 18, 2020
61071b1
remove unused tokio01 features: udp, codec
fanatid Jul 18, 2020
4e3bff0
cargo-fmt
fanatid Jul 18, 2020
880f7d6
remove already enabled feature
fanatid Jul 19, 2020
d9dc853
add shutdown to sources/util/unix
fanatid Jul 19, 2020
d7ca86b
fix uds feature for tests
fanatid Jul 19, 2020
99b8809
fix uds feature again
fanatid Jul 19, 2020
e1b6d79
Merge remote-tracking branch 'upstream/master' into tokio01-codec-remove
fanatid Jul 20, 2020
3cf9fb5
fix udp socket source
fanatid Jul 20, 2020
56d8122
fix syslog test for unix socket
fanatid Jul 20, 2020
fc8cd1a
Merge remote-tracking branch 'upstream/master' into tokio01-codec-remove
fanatid Jul 21, 2020
fc4c810
remove not required delay_for in statsd
fanatid Jul 21, 2020
4e5c05c
add type annotations for long get_ref/get_mut chains
fanatid Jul 21, 2020
9c57084
add explicit type to sinks/util
fanatid Jul 22, 2020
90a2cb3
Merge remote-tracking branch 'upstream/master' into tokio01-codec-remove
fanatid Jul 22, 2020
b4bd844
Merge remote-tracking branch 'upstream/master' into tokio01-codec-remove
fanatid Jul 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 43 additions & 20 deletions src/sinks/util/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,28 @@ use crate::{
tls::{MaybeTlsConnector, MaybeTlsSettings, MaybeTlsStream, TlsConfig},
topology::config::SinkContext,
};
use bytes::Bytes;
use bytes05::Bytes;
use futures::{
compat::{Compat01As03, CompatSink},
FutureExt, TryFutureExt,
};
use futures01::{
future, stream::iter_ok, try_ready, Async, AsyncSink, Future, Poll, Sink, StartSend,
};
use serde::{Deserialize, Serialize};
use snafu::Snafu;
use std::io::{ErrorKind, Read};
use std::net::SocketAddr;
use std::time::{Duration, Instant};
use tokio01::{
codec::{BytesCodec, FramedWrite},
net::tcp::TcpStream,
timer::Delay,
use std::{
io::{ErrorKind, Read},
net::SocketAddr,
time::Duration,
};
use tokio::time::{delay_for, Delay};
use tokio01::net::tcp::TcpStream;
use tokio_retry::strategy::ExponentialBackoff;
use tokio_util::{
codec::{BytesCodec, FramedWrite},
compat::{Compat, FuturesAsyncWriteCompatExt},
};

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -79,10 +86,11 @@ enum TcpSinkState {
ResolvingDns(crate::dns::ResolverFuture),
Connecting(MaybeTlsConnector),
Connected(TcpOrTlsStream),
Backoff(Delay),
Backoff(Box<dyn Future<Item = (), Error = ()> + Send>),
MOZGIII marked this conversation as resolved.
Show resolved Hide resolved
}

type TcpOrTlsStream = FramedWrite<MaybeTlsStream<TcpStream>, BytesCodec>;
type TcpOrTlsStream =
CompatSink<FramedWrite<Compat<Compat01As03<MaybeTlsStream<TcpStream>>>, BytesCodec>, Bytes>;

impl TcpSink {
pub fn new(host: String, port: u16, resolver: Resolver, tls: MaybeTlsSettings) -> Self {
Expand Down Expand Up @@ -115,7 +123,12 @@ impl TcpSink {
}

fn next_delay(&mut self) -> Delay {
Delay::new(Instant::now() + self.backoff.next().unwrap())
delay_for(self.backoff.next().unwrap())
}

fn next_delay01(&mut self) -> Box<dyn Future<Item = (), Error = ()> + Send> {
let delay = self.next_delay();
Box::new(async move { Ok(delay.await) }.boxed().compat())
}

fn poll_connection(&mut self) -> Poll<&mut TcpOrTlsStream, ()> {
Expand All @@ -137,24 +150,24 @@ impl TcpSink {
Ok(connector) => TcpSinkState::Connecting(connector),
Err(error) => {
error!(message = "unable to connect", %error);
TcpSinkState::Backoff(self.next_delay())
TcpSinkState::Backoff(self.next_delay01())
}
}
} else {
error!("DNS resolved but there were no IP addresses.");
TcpSinkState::Backoff(self.next_delay())
TcpSinkState::Backoff(self.next_delay01())
}
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(error) => {
error!(message = "unable to resolve dns.", %error);
TcpSinkState::Backoff(self.next_delay())
TcpSinkState::Backoff(self.next_delay01())
}
},
TcpSinkState::Backoff(ref mut delay) => match delay.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
// Err can only occur if the tokio runtime has been shutdown or if more than 2^63 timers have been created
Err(err) => unreachable!(err),
Err(_) => unreachable!(),
fanatid marked this conversation as resolved.
Show resolved Hide resolved
Ok(Async::Ready(())) => {
debug!(message = "disconnected.");
TcpSinkState::Disconnected
Expand All @@ -166,12 +179,14 @@ impl TcpSink {
peer_addr: stream.peer_addr().ok(),
});
self.backoff = Self::fresh_backoff();
TcpSinkState::Connected(FramedWrite::new(stream, BytesCodec::new()))
let stream = Compat01As03::new(stream).compat_write();
let out = FramedWrite::new(stream, BytesCodec::new());
TcpSinkState::Connected(CompatSink::new(out))
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(error) => {
emit!(TcpConnectionFailed { error });
TcpSinkState::Backoff(self.next_delay())
TcpSinkState::Backoff(self.next_delay01())
}
},
TcpSinkState::Connected(ref mut connection) => return Ok(Async::Ready(connection)),
Expand All @@ -181,7 +196,7 @@ impl TcpSink {
}

impl Sink for TcpSink {
type SinkItem = Bytes;
type SinkItem = bytes::Bytes;
type SinkError = ();

fn start_send(&mut self, line: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
Expand All @@ -198,7 +213,8 @@ impl Sink for TcpSink {
//
// If this returns `WouldBlock` we know the connection is still
// valid and the write will most likely succeed.
match connection.get_mut().read(&mut [0u8; 1]) {
let stream = connection.get_mut().get_mut().get_mut().get_mut();
fanatid marked this conversation as resolved.
Show resolved Hide resolved
match stream.read(&mut [0u8; 1]) {
Err(error) if error.kind() != ErrorKind::WouldBlock => {
emit!(TcpConnectionDisconnected { error });
self.state = TcpSinkState::Disconnected;
Expand All @@ -225,13 +241,20 @@ impl Sink for TcpSink {
emit!(TcpEventSent {
byte_size: line.len()
});
let line = Bytes::copy_from_slice(&line);
Copy link
Contributor

@MOZGIII MOZGIII Jul 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This a conversion between two bytes crates in unfortunate. The extra copies are probably not that big of a hit to the performance, and it's temporary, so I guess it's ok. Just pointing it out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. The bad part of such slow update that in the future we will need to find and rid all such conversions. Also new bytes have extra copying on converting between Bytes <=> BytesMut 😞

Copy link
Contributor

@MOZGIII MOZGIII Jul 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, yet it seems kind of odd that Bytes were convertible to BytesMut before in the first place. The API seems to have been incorrect, thus we're probably using it incorrectly... We probably should do an audit of the code as part of old bytes removal - I bet there are a lot of places where we can avoid copying where we could just switch to Bytes to BytesMut (or do sth similar).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An interesting thing, will clippy catch extra operations with Bytes or not. I already started improving bytes from BytesMut => Bytes through freeze, but this pretty hard for my current Rust level. tokio-rs/bytes#418

match connection.start_send(line) {
Err(error) => {
error!(message = "connection disconnected.", %error);
self.state = TcpSinkState::Disconnected;
Ok(AsyncSink::Ready)
}
Ok(ok) => Ok(ok),
Ok(res) => Ok(match res {
AsyncSink::Ready => AsyncSink::Ready,
AsyncSink::NotReady(bytes) => {
let bytes = bytes::Bytes::from(&bytes[..]);
AsyncSink::NotReady(bytes)
}
}),
}
}
}
Expand Down