Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only use necessary futures components #900

Closed
wants to merge 9 commits into from
6 changes: 4 additions & 2 deletions kube-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ openssl-tls = ["openssl", "hyper-openssl"]
ws = ["client", "tokio-tungstenite", "rand", "kube-core/ws"]
oauth = ["client", "tame-oauth"]
gzip = ["client", "tower-http/decompression-gzip"]
client = ["config", "__non_core", "hyper", "http-body", "tower", "tower-http", "hyper-timeout", "pin-project", "chrono", "jsonpath_lib", "bytes", "futures", "tokio", "tokio-util", "either"]
client = ["config", "__non_core", "hyper", "http-body", "tower", "tower-http", "hyper-timeout", "pin-project", "chrono", "jsonpath_lib", "bytes", "futures-channel", "futures-util", "tokio", "tokio-util", "either"]
jsonpatch = ["kube-core/jsonpatch"]
admission = ["kube-core/admission"]
config = ["__non_core", "pem", "dirs"]
Expand All @@ -47,7 +47,8 @@ http = "0.2.5"
http-body = { version = "0.4.2", optional = true }
either = { version = "1.6.1", optional = true }
thiserror = "1.0.29"
futures = { version = "0.3.17", optional = true }
futures-channel = { version = "0.3.17", features = ["sink"], optional = true }
futures-util = { version = "0.3.17", features = ["sink"], optional = true }
Comment on lines +50 to +51
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while i like the trimming of dependencies we don't use, the use of all their peripheral crates (when futures have a facade-crate model) makes our documentation look less crisp (can no longer just point to futures, docs need to point at one or more sub-crates) and it feels slightly wrong to push that hierarchy complexity onto us + users.

would it make sense to use futures directly without default features to achieve a similar result?

Copy link
Author

@onalante-msft onalante-msft May 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes our documentation look less crisp

Another wrinkle that I was not aware of was that rustdoc does not include development dependencies when generating the links, so even having futures as a development dependency is not sufficient to keep the documentation in its current form.

would it make sense to use futures directly without default features to achieve a similar result?

I think it would be fine, but I do not have a strong opinion either way. My PR was largely predicated on the above misconception about rustdoc behavior, and I completely agree with your feeling that it seems wrong to use the futures subcrates in official documentation.

Realistically, saving 4 dependencies by using the futures subcrates directly is not all that significant, so I will close this PR for the time being unless there are any that would really like to have this included. Nonetheless, thank you for your time in reviewing the changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i do appreciate you taking the time to do this. was not aware that the ecosystem was so consistent in the futures ecosystem use (most tend to only use util + core, and occasionally channel) so i think there is real value in aligning.

I think it's actually possible to get around the documentation issue by just referring to futures even though the file we are documenting does not have it in scope. we can use the same trick we use internally by referring to a crate in dev-dependencies (and have full futures listed in dev-dependencies)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's actually possible to get around the documentation issue by just referring to futures

Interesting, I did not know that that was a possibility. I will give it a try and reopen the PR if that is alright with you.

pem = { version = "1.0.1", optional = true }
openssl = { version = "0.10.36", optional = true }
tokio-native-tls = { version = "0.3.0", optional = true }
Expand Down Expand Up @@ -78,6 +79,7 @@ default-features = false
features = []

[dev-dependencies]
futures = "0.3.17"
kube = { path = "../kube", features = ["derive", "client", "ws"], version = "<1.0.0, >=0.61.0" }
tempfile = "3.1.0"
tokio = { version = "1.14.0", features = ["full"] }
Expand Down
2 changes: 1 addition & 1 deletion kube-client/src/api/core_methods.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use either::Either;
use futures::Stream;
use futures_util::Stream;
use serde::{de::DeserializeOwned, Serialize};
use std::fmt::Debug;

Expand Down
14 changes: 6 additions & 8 deletions kube-client/src/api/portforward.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::{collections::HashMap, future::Future};

use bytes::{Buf, Bytes};
use futures::{
channel::{mpsc, oneshot},
future, FutureExt, SinkExt, StreamExt,
};
use futures_channel::{mpsc, oneshot};
use futures_util::{future, FutureExt, SinkExt, StreamExt};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, DuplexStream};
use tokio_tungstenite::{tungstenite as ws, WebSocketStream};
Expand All @@ -28,11 +26,11 @@ pub enum Error {

/// Failed to forward bytes from Pod.
#[error("failed to forward bytes from Pod: {0}")]
ForwardFromPod(#[source] futures::channel::mpsc::SendError),
ForwardFromPod(#[source] mpsc::SendError),

/// Failed to forward bytes to Pod.
#[error("failed to forward bytes to Pod: {0}")]
ForwardToPod(#[source] futures::channel::mpsc::SendError),
ForwardToPod(#[source] mpsc::SendError),

/// Failed to write bytes from Pod.
#[error("failed to write bytes from Pod: {0}")]
Expand Down Expand Up @@ -196,7 +194,7 @@ async fn to_pod_loop(
}

async fn from_pod_loop<S>(
mut ws_stream: futures::stream::SplitStream<WebSocketStream<S>>,
mut ws_stream: futures_util::stream::SplitStream<WebSocketStream<S>>,
mut sender: mpsc::Sender<Message>,
) -> Result<(), Error>
where
Expand Down Expand Up @@ -231,7 +229,7 @@ where
async fn forwarder_loop<S>(
ports: &[u16],
mut receiver: mpsc::Receiver<Message>,
mut ws_sink: futures::stream::SplitSink<WebSocketStream<S>, ws::Message>,
mut ws_sink: futures_util::stream::SplitSink<WebSocketStream<S>, ws::Message>,
mut writers: Vec<tokio::io::WriteHalf<DuplexStream>>,
mut error_senders: Vec<Option<ErrorSender>>,
) -> Result<(), Error>
Expand Down
4 changes: 2 additions & 2 deletions kube-client/src/api/remote_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::future::Future;

use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status;

use futures::{
channel::oneshot,
use futures_channel::oneshot;
use futures_util::{
future::{
select,
Either::{Left, Right},
Expand Down
2 changes: 1 addition & 1 deletion kube-client/src/api/subresource.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::Bytes;
use futures::Stream;
use futures_util::Stream;
use serde::de::DeserializeOwned;
use std::fmt::Debug;

Expand Down
2 changes: 1 addition & 1 deletion kube-client/src/client/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
};

use chrono::{DateTime, Duration, Utc};
use futures::future::BoxFuture;
use futures_util::future::BoxFuture;
use http::{
header::{InvalidHeaderValue, AUTHORIZATION},
HeaderValue, Request,
Expand Down
2 changes: 1 addition & 1 deletion kube-client/src/client/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
task::{Context, Poll},
};

use futures::stream::Stream;
use futures_util::Stream;
use http_body::Body;
use pin_project::pin_project;

Expand Down
2 changes: 1 addition & 1 deletion kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! retrieve the resources served by the kubernetes API.
use bytes::Bytes;
use either::{Either, Left, Right};
use futures::{self, Stream, StreamExt, TryStream, TryStreamExt};
use futures_util::{Stream, StreamExt, TryStream, TryStreamExt};
use http::{self, Request, Response, StatusCode};
use hyper::Body;
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
Expand Down
6 changes: 4 additions & 2 deletions kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ features = ["k8s-openapi/v1_23"]
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
futures = "0.3.17"
futures-channel = { version = "0.3.17", features = ["sink"] }
futures-util = { version = "0.3.17", features = ["sink"] }
kube-client = { path = "../kube-client", version = "=0.72.0", default-features = false, features = ["jsonpatch", "client"] }
derivative = "2.1.1"
serde = "1.0.130"
Expand All @@ -41,9 +42,10 @@ version = "0.14.0"
default-features = false

[dev-dependencies]
futures = "0.3.17"
kube = { path = "../kube", features = ["derive", "client", "runtime"], version = "<1.0.0, >=0.60.0" }
serde_json = "1.0.68"
tokio = { version = "1.14.0", features = ["full", "test-util"] }
tokio = { version = "1.14.0", features = ["macros", "rt-multi-thread", "test-util"] }
rand = "0.8.0"
schemars = "0.8.6"

Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/controller/future_hash_map.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::{Future, FutureExt, Stream};
use futures_util::{Future, FutureExt, Stream};
use std::{
collections::HashMap,
hash::Hash,
Expand Down
20 changes: 10 additions & 10 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ use crate::{
};
use backoff::backoff::Backoff;
use derivative::Derivative;
use futures::{
channel,
use futures_util::{
future::{self, BoxFuture},
stream, Future, FutureExt, SinkExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
};
Expand Down Expand Up @@ -208,7 +207,7 @@ impl Display for ReconcileReason {
///
/// The `queue` indicates which objects should be reconciled. For the core objects this will usually be
/// the [`reflector`] (piped through [`trigger_self`]). If your core objects own any subobjects then you
/// can also make them trigger reconciliations by [merging](`futures::stream::select`) the [`reflector`]
/// can also make them trigger reconciliations by [merging](futures_util::stream::select) the [`reflector`]
/// with a [`watcher`](watcher()) or [`reflector`](reflector()) for the subobject.
///
/// This is the "hard-mode" version of [`Controller`], which allows you some more customization
Expand All @@ -229,9 +228,10 @@ where
QueueStream::Ok: Into<ReconcileRequest<K>>,
QueueStream::Error: std::error::Error + 'static,
{
let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel();
let (scheduler_shutdown_tx, scheduler_shutdown_rx) = futures_channel::oneshot::channel();
let err_context = context.clone();
let (scheduler_tx, scheduler_rx) = channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(100);
let (scheduler_tx, scheduler_rx) =
futures_channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(100);
// Create a stream of ObjectRefs that need to be reconciled
trystream_try_via(
// input: stream combining scheduled tasks and user specified inputs event
Expand Down Expand Up @@ -577,7 +577,7 @@ where
///
/// ```
/// # async {
/// use futures::stream::StreamExt;
/// use futures::StreamExt;
/// use k8s_openapi::api::core::v1::ConfigMap;
/// use kube::{
/// Client,
Expand Down Expand Up @@ -691,7 +691,7 @@ where
#[must_use]
pub fn shutdown_on_signal(mut self) -> Self {
async fn shutdown_signal() {
futures::future::select(
futures_util::future::select(
tokio::signal::ctrl_c().map(|_| ()).boxed(),
#[cfg(unix)]
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
Expand All @@ -701,12 +701,12 @@ where
.boxed(),
// Assume that ctrl_c is enough on non-Unix platforms (such as Windows)
#[cfg(not(unix))]
futures::future::pending::<()>(),
futures_util::future::pending::<()>(),
)
.await;
}

let (graceful_tx, graceful_rx) = channel::oneshot::channel();
let (graceful_tx, graceful_rx) = futures_channel::oneshot::channel();
self.graceful_shutdown_selector
.push(graceful_rx.map(|_| ()).boxed());
self.forceful_shutdown_selector.push(
Expand Down Expand Up @@ -757,7 +757,7 @@ where
StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
.take_until(future::select_all(self.graceful_shutdown_selector)),
)
.take_until(futures::future::select_all(self.forceful_shutdown_selector))
.take_until(futures_util::future::select_all(self.forceful_shutdown_selector))
}
}

Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/controller/runner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::future_hash_map::FutureHashMap;
use crate::scheduler::{ScheduleRequest, Scheduler};
use futures::{Future, Stream, StreamExt};
use futures_util::{Future, Stream, StreamExt};
use pin_project::pin_project;
use std::{
hash::Hash,
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/finalizer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Finalizer helper for [`Controller`](crate::Controller) reconcilers
use crate::controller::Action;
use futures::{TryFuture, TryFutureExt};
use futures_util::{TryFuture, TryFutureExt};
use json_patch::{AddOperation, PatchOperation, RemoveOperation, TestOperation};
use kube_client::{
api::{Patch, PatchParams},
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub mod store;

pub use self::object_ref::{Extra as ObjectRefExtra, ObjectRef};
use crate::watcher;
use futures::{Stream, TryStreamExt};
use futures_util::{Stream, TryStreamExt};
use kube_client::Resource;
use std::hash::Hash;
pub use store::{store, Store};
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Delays and deduplicates [`Stream`] items

use futures::{stream::Fuse, Stream, StreamExt};
use futures_util::{stream::Fuse, Stream, StreamExt};
use pin_project::pin_project;
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/utils/event_flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use core::{
pin::Pin,
task::{Context, Poll},
};
use futures::{ready, Stream, TryStream};
use futures_util::{ready, Stream, TryStream};
use pin_project::pin_project;

#[pin_project]
Expand Down
4 changes: 2 additions & 2 deletions kube-runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub use stream_backoff::StreamBackoff;
pub use watch_ext::WatchStreamExt;

use crate::watcher;
use futures::{
use futures_util::{
pin_mut,
stream::{self, Peekable},
Future, FutureExt, Stream, StreamExt, TryStream, TryStreamExt,
Expand Down Expand Up @@ -55,7 +55,7 @@ pub fn try_flatten_touched<K, S: TryStream<Ok = watcher::Event<K>>>(
/// Allows splitting a `Stream` into several streams that each emit a disjoint subset of the input stream's items,
/// like a streaming variant of pattern matching.
///
/// NOTE: The cases MUST be reunited into the same final stream (using `futures::stream::select` or similar),
/// NOTE: The cases MUST be reunited into the same final stream (using [`futures_util::stream::select`] or similar),
/// since cases for rejected items will *not* register wakeup correctly, and may otherwise lose items and/or deadlock.
///
/// NOTE: The whole set of cases will deadlock if there is ever an item that no live case wants to consume.
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/utils/stream_backoff.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{pin::Pin, task::Poll};

use backoff::backoff::Backoff;
use futures::{Future, Stream, TryStream};
use futures_util::{Future, Stream, TryStream};
use pin_project::pin_project;
use tokio::time::{sleep, Instant, Sleep};

Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/utils/watch_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
};
use backoff::backoff::Backoff;

use futures::{Stream, TryStream};
use futures_util::{Stream, TryStream};

/// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector)
pub trait WatchStreamExt: Stream {
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/wait.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Waits for objects to reach desired states
use futures::TryStreamExt;
use futures_util::TryStreamExt;
use kube_client::{Api, Resource};
use serde::de::DeserializeOwned;
use std::fmt::Debug;
Expand Down
8 changes: 4 additions & 4 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use crate::utils::ResetTimerBackoff;
use backoff::{backoff::Backoff, ExponentialBackoff};
use derivative::Derivative;
use futures::{stream::BoxStream, Stream, StreamExt};
use futures_util::{self, stream::BoxStream, Stream, StreamExt};
use kube_client::{
api::{ListParams, Resource, ResourceExt, WatchEvent},
Api,
Expand Down Expand Up @@ -217,8 +217,8 @@ async fn step<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
///
/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll.
/// You can apply your own backoff by not polling the stream for a duration after errors.
/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as
/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
/// Keep in mind that some [`TryStream`](futures_util::TryStream) combinators (such as
/// [`try_for_each`](futures_util::TryStreamExt::try_for_each) and [`try_concat`](futures_util::TryStreamExt::try_concat))
/// will terminate eagerly as soon as they receive an [`Err`].
///
/// This is intended to provide a safe and atomic input interface for a state store like a [`reflector`].
Expand Down Expand Up @@ -264,7 +264,7 @@ pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
api: Api<K>,
list_params: ListParams,
) -> impl Stream<Item = Result<Event<K>>> + Send {
futures::stream::unfold(
futures_util::stream::unfold(
(api, list_params, State::Empty),
|(api, list_params, state)| async {
let (event, state) = step(&api, &list_params, state).await;
Expand Down
2 changes: 1 addition & 1 deletion kube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ version = "0.14.0"
default-features = false

[dev-dependencies]
tokio = { version = "1.14.0", features = ["full"] }
futures = "0.3.17"
tokio = { version = "1.14.0", features = ["macros", "rt-multi-thread"] }
serde_json = "1.0.68"
validator = { version = "0.15.0", features = ["derive"] }
serde = { version = "1.0.130", features = ["derive"] }
Expand Down