Skip to content

Commit

Permalink
Replace some futures_util APIs with std variants
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini committed Feb 1, 2025
1 parent 5b26369 commit 1dcc9a5
Show file tree
Hide file tree
Showing 12 changed files with 35 additions and 51 deletions.
5 changes: 2 additions & 3 deletions sqlx-core/src/io/write_and_flush.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::error::Error;
use futures_core::Future;
use futures_util::ready;
use sqlx_rt::AsyncWrite;
use std::future::Future;
use std::io::{BufRead, Cursor};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

// Atomic operation that writes the full buffer to the stream, flushes the stream, and then
// clears the buffer (even if either of the two previous operations failed).
Expand Down
3 changes: 1 addition & 2 deletions sqlx-core/src/net/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ use std::future::Future;
use std::io;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

use bytes::BufMut;
use futures_core::ready;

pub use buffered::{BufferedSocket, WriteBuffer};

Expand Down
3 changes: 1 addition & 2 deletions sqlx-core/src/net/tls/util.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::net::Socket;

use std::io::{self, Read, Write};
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

use futures_core::ready;
use futures_util::future;

pub struct StdSocket<S> {
Expand Down
16 changes: 5 additions & 11 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::sync::{AsyncSemaphore, AsyncSemaphoreReleaser};

use std::cmp;
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::task::Poll;
Expand Down Expand Up @@ -130,19 +131,12 @@ impl<DB: Database> PoolInner<DB> {
// This is just going to cause unnecessary churn in `acquire()`.
.filter(|_| self.size() < self.options.max_connections);

let acquire_self = self.semaphore.acquire(1).fuse();
let mut close_event = self.close_event();
let mut acquire_self = pin!(self.semaphore.acquire(1).fuse());
let mut close_event = pin!(self.close_event());

if let Some(parent) = parent {
let acquire_parent = parent.0.semaphore.acquire(1);
let parent_close_event = parent.0.close_event();

futures_util::pin_mut!(
acquire_parent,
acquire_self,
close_event,
parent_close_event
);
let mut acquire_parent = pin!(parent.0.semaphore.acquire(1));
let mut parent_close_event = pin!(parent.0.close_event());

let mut poll_parent = false;

Expand Down
6 changes: 3 additions & 3 deletions sqlx-core/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -565,11 +565,11 @@ impl CloseEvent {
.await
.map_or(Ok(()), |_| Err(Error::PoolClosed))?;

futures_util::pin_mut!(fut);
let mut fut = pin!(fut);

// I find that this is clearer in intent than `futures_util::future::select()`
// or `futures_util::select_biased!{}` (which isn't enabled anyway).
futures_util::future::poll_fn(|cx| {
std::future::poll_fn(|cx| {
// Poll `fut` first as the wakeup event is more likely for it than `self`.
if let Poll::Ready(ret) = fut.as_mut().poll(cx) {
return Poll::Ready(Ok(ret));
Expand Down
5 changes: 2 additions & 3 deletions sqlx-mysql/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use sqlx_core::database::Database;
use sqlx_core::describe::Describe;
use sqlx_core::executor::Executor;
use sqlx_core::transaction::TransactionManager;
use std::future;
use std::{future, pin::pin};

sqlx_core::declare_driver_with_optional_migrate!(DRIVER = MySql);

Expand Down Expand Up @@ -113,8 +113,7 @@ impl AnyConnectionBackend for MySqlConnection {

Box::pin(async move {
let arguments = arguments?;
let stream = self.run(query, arguments, persistent).await?;
futures_util::pin_mut!(stream);
let mut stream = pin!(self.run(query, arguments, persistent).await?);

while let Some(result) = stream.try_next().await? {
if let Either::Right(row) = result {
Expand Down
7 changes: 3 additions & 4 deletions sqlx-mysql/src/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use either::Either;
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_core::Stream;
use futures_util::{pin_mut, TryStreamExt};
use std::{borrow::Cow, sync::Arc};
use futures_util::TryStreamExt;
use std::{borrow::Cow, pin::pin, sync::Arc};

impl MySqlConnection {
async fn prepare_statement<'c>(
Expand Down Expand Up @@ -263,8 +263,7 @@ impl<'c> Executor<'c> for &'c mut MySqlConnection {

Box::pin(try_stream! {
let arguments = arguments?;
let s = self.run(sql, arguments, persistent).await?;
pin_mut!(s);
let mut s = pin!(self.run(sql, arguments, persistent).await?);

while let Some(v) = s.try_next().await? {
r#yield!(v);
Expand Down
5 changes: 2 additions & 3 deletions sqlx-postgres/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_util::{stream, StreamExt, TryFutureExt, TryStreamExt};
use std::future;
use std::{future, pin::pin};

use sqlx_core::any::{
Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow,
Expand Down Expand Up @@ -115,8 +115,7 @@ impl AnyConnectionBackend for PgConnection {

Box::pin(async move {
let arguments = arguments?;
let stream = self.run(query, arguments, 1, persistent, None).await?;
futures_util::pin_mut!(stream);
let mut stream = pin!(self.run(query, arguments, 1, persistent, None).await?);

if let Some(Either::Right(row)) = stream.try_next().await? {
return Ok(Some(AnyRow::try_from(&row)?));
Expand Down
10 changes: 4 additions & 6 deletions sqlx-postgres/src/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use crate::{
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_core::Stream;
use futures_util::{pin_mut, TryStreamExt};
use futures_util::TryStreamExt;
use sqlx_core::arguments::Arguments;
use sqlx_core::Either;
use std::{borrow::Cow, sync::Arc};
use std::{borrow::Cow, pin::pin, sync::Arc};

async fn prepare(
conn: &mut PgConnection,
Expand Down Expand Up @@ -393,8 +393,7 @@ impl<'c> Executor<'c> for &'c mut PgConnection {

Box::pin(try_stream! {
let arguments = arguments?;
let s = self.run(sql, arguments, 0, persistent, metadata).await?;
pin_mut!(s);
let mut s = pin!(self.run(sql, arguments, 0, persistent, metadata).await?);

while let Some(v) = s.try_next().await? {
r#yield!(v);
Expand All @@ -420,8 +419,7 @@ impl<'c> Executor<'c> for &'c mut PgConnection {

Box::pin(async move {
let arguments = arguments?;
let s = self.run(sql, arguments, 1, persistent, metadata).await?;
pin_mut!(s);
let mut s = pin!(self.run(sql, arguments, 1, persistent, metadata).await?);

// With deferred constraints we need to check all responses as we
// could get a OK response (with uncommitted data), only to get an
Expand Down
13 changes: 7 additions & 6 deletions sqlx-sqlite/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use sqlx_core::database::Database;
use sqlx_core::describe::Describe;
use sqlx_core::executor::Executor;
use sqlx_core::transaction::TransactionManager;
use std::pin::pin;

sqlx_core::declare_driver_with_optional_migrate!(DRIVER = Sqlite);

Expand Down Expand Up @@ -105,12 +106,12 @@ impl AnyConnectionBackend for SqliteConnection {
let args = arguments.map(map_arguments);

Box::pin(async move {
let stream = self
.worker
.execute(query, args, self.row_channel_size, persistent, Some(1))
.map_ok(flume::Receiver::into_stream)
.await?;
futures_util::pin_mut!(stream);
let mut stream = pin!(
self.worker
.execute(query, args, self.row_channel_size, persistent, Some(1))
.map_ok(flume::Receiver::into_stream)
.await?
);

if let Some(Either::Right(row)) = stream.try_next().await? {
return Ok(Some(AnyRow::try_from(&row)?));
Expand Down
8 changes: 3 additions & 5 deletions sqlx-sqlite/src/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use sqlx_core::describe::Describe;
use sqlx_core::error::Error;
use sqlx_core::executor::{Execute, Executor};
use sqlx_core::Either;
use std::future;
use std::{future, pin::pin};

impl<'c> Executor<'c> for &'c mut SqliteConnection {
type Database = Sqlite;
Expand Down Expand Up @@ -56,13 +56,11 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {
let persistent = query.persistent() && arguments.is_some();

Box::pin(async move {
let stream = self
let mut stream = pin!(self
.worker
.execute(sql, arguments, self.row_channel_size, persistent, Some(1))
.map_ok(flume::Receiver::into_stream)
.try_flatten_stream();

futures_util::pin_mut!(stream);
.try_flatten_stream());

while let Some(res) = stream.try_next().await? {
if let Either::Right(row) = res {
Expand Down
5 changes: 2 additions & 3 deletions tests/postgres/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use sqlx::{Column, Connection, Executor, Row, Statement, TypeInfo};
use sqlx_core::{bytes::Bytes, error::BoxDynError};
use sqlx_test::{new, pool, setup_if_needed};
use std::env;
use std::pin::Pin;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -637,8 +637,7 @@ async fn pool_smoke_test() -> anyhow::Result<()> {
let pool = pool.clone();
sqlx_core::rt::spawn(async move {
while !pool.is_closed() {
let acquire = pool.acquire();
futures::pin_mut!(acquire);
let mut acquire = pin!(pool.acquire());

// poll the acquire future once to put the waiter in the queue
future::poll_fn(move |cx| {
Expand Down

0 comments on commit 1dcc9a5

Please sign in to comment.