diff --git a/sqlx-core/src/io/write_and_flush.rs b/sqlx-core/src/io/write_and_flush.rs index 9e7824af81..8a0db31293 100644 --- a/sqlx-core/src/io/write_and_flush.rs +++ b/sqlx-core/src/io/write_and_flush.rs @@ -1,10 +1,9 @@ use crate::error::Error; -use futures_core::Future; -use futures_util::ready; use sqlx_rt::AsyncWrite; +use std::future::Future; use std::io::{BufRead, Cursor}; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; // Atomic operation that writes the full buffer to the stream, flushes the stream, and then // clears the buffer (even if either of the two previous operations failed). diff --git a/sqlx-core/src/net/socket/mod.rs b/sqlx-core/src/net/socket/mod.rs index 6b09d318f7..d11f15884e 100644 --- a/sqlx-core/src/net/socket/mod.rs +++ b/sqlx-core/src/net/socket/mod.rs @@ -2,10 +2,9 @@ use std::future::Future; use std::io; use std::path::Path; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use bytes::BufMut; -use futures_core::ready; pub use buffered::{BufferedSocket, WriteBuffer}; diff --git a/sqlx-core/src/net/tls/util.rs b/sqlx-core/src/net/tls/util.rs index 02a16ef5e1..ddbc7a58f2 100644 --- a/sqlx-core/src/net/tls/util.rs +++ b/sqlx-core/src/net/tls/util.rs @@ -1,9 +1,8 @@ use crate::net::Socket; use std::io::{self, Read, Write}; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; -use futures_core::ready; use futures_util::future; pub struct StdSocket { diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index bbcc43134e..2066364a8e 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -10,6 +10,7 @@ use crate::sync::{AsyncSemaphore, AsyncSemaphoreReleaser}; use std::cmp; use std::future::Future; +use std::pin::pin; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::task::Poll; @@ -130,19 +131,12 @@ impl PoolInner { // This is just going to cause unnecessary churn in `acquire()`. .filter(|_| self.size() < self.options.max_connections); - let acquire_self = self.semaphore.acquire(1).fuse(); - let mut close_event = self.close_event(); + let mut acquire_self = pin!(self.semaphore.acquire(1).fuse()); + let mut close_event = pin!(self.close_event()); if let Some(parent) = parent { - let acquire_parent = parent.0.semaphore.acquire(1); - let parent_close_event = parent.0.close_event(); - - futures_util::pin_mut!( - acquire_parent, - acquire_self, - close_event, - parent_close_event - ); + let mut acquire_parent = pin!(parent.0.semaphore.acquire(1)); + let mut parent_close_event = pin!(parent.0.close_event()); let mut poll_parent = false; diff --git a/sqlx-core/src/pool/mod.rs b/sqlx-core/src/pool/mod.rs index e998618413..042bc5c7bc 100644 --- a/sqlx-core/src/pool/mod.rs +++ b/sqlx-core/src/pool/mod.rs @@ -56,7 +56,7 @@ use std::fmt; use std::future::Future; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; @@ -565,11 +565,11 @@ impl CloseEvent { .await .map_or(Ok(()), |_| Err(Error::PoolClosed))?; - futures_util::pin_mut!(fut); + let mut fut = pin!(fut); // I find that this is clearer in intent than `futures_util::future::select()` // or `futures_util::select_biased!{}` (which isn't enabled anyway). - futures_util::future::poll_fn(|cx| { + std::future::poll_fn(|cx| { // Poll `fut` first as the wakeup event is more likely for it than `self`. if let Poll::Ready(ret) = fut.as_mut().poll(cx) { return Poll::Ready(Ok(ret)); diff --git a/sqlx-mysql/src/any.rs b/sqlx-mysql/src/any.rs index 0466bfc0a4..e01e41d68e 100644 --- a/sqlx-mysql/src/any.rs +++ b/sqlx-mysql/src/any.rs @@ -16,7 +16,7 @@ use sqlx_core::database::Database; use sqlx_core::describe::Describe; use sqlx_core::executor::Executor; use sqlx_core::transaction::TransactionManager; -use std::future; +use std::{future, pin::pin}; sqlx_core::declare_driver_with_optional_migrate!(DRIVER = MySql); @@ -113,8 +113,7 @@ impl AnyConnectionBackend for MySqlConnection { Box::pin(async move { let arguments = arguments?; - let stream = self.run(query, arguments, persistent).await?; - futures_util::pin_mut!(stream); + let mut stream = pin!(self.run(query, arguments, persistent).await?); while let Some(result) = stream.try_next().await? { if let Either::Right(row) = result { diff --git a/sqlx-mysql/src/connection/executor.rs b/sqlx-mysql/src/connection/executor.rs index d7f8fcfa14..d93aac0d68 100644 --- a/sqlx-mysql/src/connection/executor.rs +++ b/sqlx-mysql/src/connection/executor.rs @@ -21,8 +21,8 @@ use either::Either; use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; use futures_core::Stream; -use futures_util::{pin_mut, TryStreamExt}; -use std::{borrow::Cow, sync::Arc}; +use futures_util::TryStreamExt; +use std::{borrow::Cow, pin::pin, sync::Arc}; impl MySqlConnection { async fn prepare_statement<'c>( @@ -263,8 +263,7 @@ impl<'c> Executor<'c> for &'c mut MySqlConnection { Box::pin(try_stream! { let arguments = arguments?; - let s = self.run(sql, arguments, persistent).await?; - pin_mut!(s); + let mut s = pin!(self.run(sql, arguments, persistent).await?); while let Some(v) = s.try_next().await? { r#yield!(v); diff --git a/sqlx-postgres/src/any.rs b/sqlx-postgres/src/any.rs index efa9a044bc..a7b30fb65b 100644 --- a/sqlx-postgres/src/any.rs +++ b/sqlx-postgres/src/any.rs @@ -5,7 +5,7 @@ use crate::{ use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; use futures_util::{stream, StreamExt, TryFutureExt, TryStreamExt}; -use std::future; +use std::{future, pin::pin}; use sqlx_core::any::{ Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow, @@ -115,8 +115,7 @@ impl AnyConnectionBackend for PgConnection { Box::pin(async move { let arguments = arguments?; - let stream = self.run(query, arguments, 1, persistent, None).await?; - futures_util::pin_mut!(stream); + let mut stream = pin!(self.run(query, arguments, 1, persistent, None).await?); if let Some(Either::Right(row)) = stream.try_next().await? { return Ok(Some(AnyRow::try_from(&row)?)); diff --git a/sqlx-postgres/src/connection/executor.rs b/sqlx-postgres/src/connection/executor.rs index 97503a5004..076c4209f6 100644 --- a/sqlx-postgres/src/connection/executor.rs +++ b/sqlx-postgres/src/connection/executor.rs @@ -15,10 +15,10 @@ use crate::{ use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; use futures_core::Stream; -use futures_util::{pin_mut, TryStreamExt}; +use futures_util::TryStreamExt; use sqlx_core::arguments::Arguments; use sqlx_core::Either; -use std::{borrow::Cow, sync::Arc}; +use std::{borrow::Cow, pin::pin, sync::Arc}; async fn prepare( conn: &mut PgConnection, @@ -393,8 +393,7 @@ impl<'c> Executor<'c> for &'c mut PgConnection { Box::pin(try_stream! { let arguments = arguments?; - let s = self.run(sql, arguments, 0, persistent, metadata).await?; - pin_mut!(s); + let mut s = pin!(self.run(sql, arguments, 0, persistent, metadata).await?); while let Some(v) = s.try_next().await? { r#yield!(v); @@ -420,8 +419,7 @@ impl<'c> Executor<'c> for &'c mut PgConnection { Box::pin(async move { let arguments = arguments?; - let s = self.run(sql, arguments, 1, persistent, metadata).await?; - pin_mut!(s); + let mut s = pin!(self.run(sql, arguments, 1, persistent, metadata).await?); // With deferred constraints we need to check all responses as we // could get a OK response (with uncommitted data), only to get an diff --git a/sqlx-sqlite/src/any.rs b/sqlx-sqlite/src/any.rs index 01600d9931..2cc5855405 100644 --- a/sqlx-sqlite/src/any.rs +++ b/sqlx-sqlite/src/any.rs @@ -17,6 +17,7 @@ use sqlx_core::database::Database; use sqlx_core::describe::Describe; use sqlx_core::executor::Executor; use sqlx_core::transaction::TransactionManager; +use std::pin::pin; sqlx_core::declare_driver_with_optional_migrate!(DRIVER = Sqlite); @@ -105,12 +106,12 @@ impl AnyConnectionBackend for SqliteConnection { let args = arguments.map(map_arguments); Box::pin(async move { - let stream = self - .worker - .execute(query, args, self.row_channel_size, persistent, Some(1)) - .map_ok(flume::Receiver::into_stream) - .await?; - futures_util::pin_mut!(stream); + let mut stream = pin!( + self.worker + .execute(query, args, self.row_channel_size, persistent, Some(1)) + .map_ok(flume::Receiver::into_stream) + .await? + ); if let Some(Either::Right(row)) = stream.try_next().await? { return Ok(Some(AnyRow::try_from(&row)?)); diff --git a/sqlx-sqlite/src/connection/executor.rs b/sqlx-sqlite/src/connection/executor.rs index 541a4f7d4d..1f6ce7726f 100644 --- a/sqlx-sqlite/src/connection/executor.rs +++ b/sqlx-sqlite/src/connection/executor.rs @@ -8,7 +8,7 @@ use sqlx_core::describe::Describe; use sqlx_core::error::Error; use sqlx_core::executor::{Execute, Executor}; use sqlx_core::Either; -use std::future; +use std::{future, pin::pin}; impl<'c> Executor<'c> for &'c mut SqliteConnection { type Database = Sqlite; @@ -56,13 +56,11 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { let persistent = query.persistent() && arguments.is_some(); Box::pin(async move { - let stream = self + let mut stream = pin!(self .worker .execute(sql, arguments, self.row_channel_size, persistent, Some(1)) .map_ok(flume::Receiver::into_stream) - .try_flatten_stream(); - - futures_util::pin_mut!(stream); + .try_flatten_stream()); while let Some(res) = stream.try_next().await? { if let Either::Right(row) = res { diff --git a/tests/postgres/postgres.rs b/tests/postgres/postgres.rs index 86707e23e6..7de4a9cdc6 100644 --- a/tests/postgres/postgres.rs +++ b/tests/postgres/postgres.rs @@ -9,7 +9,7 @@ use sqlx::{Column, Connection, Executor, Row, Statement, TypeInfo}; use sqlx_core::{bytes::Bytes, error::BoxDynError}; use sqlx_test::{new, pool, setup_if_needed}; use std::env; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::sync::Arc; use std::time::Duration; @@ -637,8 +637,7 @@ async fn pool_smoke_test() -> anyhow::Result<()> { let pool = pool.clone(); sqlx_core::rt::spawn(async move { while !pool.is_closed() { - let acquire = pool.acquire(); - futures::pin_mut!(acquire); + let mut acquire = pin!(pool.acquire()); // poll the acquire future once to put the waiter in the queue future::poll_fn(move |cx| {