From 3b4d1427f25cf7649729681b016b66269dad451b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Tue, 26 Aug 2025 12:05:57 +0000 Subject: [PATCH 1/2] Hide DisconnectPool The only thing a user could do with it is await, so there is no need to expose the type. --- src/conn/pool/futures/disconnect_pool.rs | 8 ++++++-- src/conn/pool/futures/mod.rs | 3 ++- src/conn/pool/mod.rs | 4 ++-- src/lib.rs | 2 +- tests/exports.rs | 2 +- 5 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/conn/pool/futures/disconnect_pool.rs b/src/conn/pool/futures/disconnect_pool.rs index b8a07d4d..fa7050fc 100644 --- a/src/conn/pool/futures/disconnect_pool.rs +++ b/src/conn/pool/futures/disconnect_pool.rs @@ -29,13 +29,13 @@ use std::sync::{atomic, Arc}; /// are dropped or disonnected. Also all pending and new `GetConn`'s will resolve to error. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct DisconnectPool { +struct DisconnectPool { pool_inner: Arc, drop: Option>>, } impl DisconnectPool { - pub(crate) fn new(pool: Pool) -> Self { + fn new(pool: Pool) -> Self { Self { pool_inner: pool.inner, drop: Some(pool.drop), @@ -73,3 +73,7 @@ impl Future for DisconnectPool { } } } + +pub(crate) async fn disconnect_pool(pool: Pool) -> Result<(), Error> { + DisconnectPool::new(pool).await +} diff --git a/src/conn/pool/futures/mod.rs b/src/conn/pool/futures/mod.rs index 00842994..0ef2f698 100644 --- a/src/conn/pool/futures/mod.rs +++ b/src/conn/pool/futures/mod.rs @@ -6,8 +6,9 @@ // option. All files in the project carrying such notice may not be copied, // modified, or distributed except according to those terms. +pub use self::get_conn::GetConn; pub(super) use self::get_conn::GetConnInner; -pub use self::{disconnect_pool::DisconnectPool, get_conn::GetConn}; +pub(crate) use disconnect_pool::disconnect_pool; mod disconnect_pool; mod get_conn; diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index de685441..8690f15b 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -287,8 +287,8 @@ impl Pool { /// /// **Note:** This Future won't resolve until all active connections, taken from it, /// are dropped or disonnected. Also all pending and new `GetConn`'s will resolve to error. - pub fn disconnect(self) -> DisconnectPool { - DisconnectPool::new(self) + pub async fn disconnect(self) -> Result<()> { + disconnect_pool(self).await } /// A way to return connection taken from a pool. diff --git a/src/lib.rs b/src/lib.rs index 4ad9d735..8060d837 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -558,7 +558,7 @@ pub use crate::connection_like::{Connection, ToConnectionResult}; /// Futures used in this crate pub mod futures { - pub use crate::conn::pool::futures::{DisconnectPool, GetConn}; + pub use crate::conn::pool::futures::GetConn; } /// Traits used in this crate diff --git a/tests/exports.rs b/tests/exports.rs index 6f9feef8..2e736f40 100644 --- a/tests/exports.rs +++ b/tests/exports.rs @@ -1,7 +1,7 @@ #[allow(unused_imports)] use mysql_async::{ consts, from_row, from_row_opt, from_value, from_value_opt, - futures::{DisconnectPool, GetConn}, + futures::GetConn, params, prelude::{ BatchQuery, FromRow, FromValue, GlobalHandler, Protocol, Query, Queryable, StatementLike, From 5af6738ea9b0abc31cd5ce5a6b803bdd5064b05b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Tue, 26 Aug 2025 16:45:02 +0000 Subject: [PATCH 2/2] Convert disconnect_pool to an async function --- src/conn/pool/futures/disconnect_pool.rs | 73 ++++++------------------ 1 file changed, 19 insertions(+), 54 deletions(-) diff --git a/src/conn/pool/futures/disconnect_pool.rs b/src/conn/pool/futures/disconnect_pool.rs index fa7050fc..631a923b 100644 --- a/src/conn/pool/futures/disconnect_pool.rs +++ b/src/conn/pool/futures/disconnect_pool.rs @@ -7,73 +7,38 @@ // modified, or distributed except according to those terms. use std::{ - future::Future, - pin::Pin, + future::poll_fn, + sync::atomic, task::{Context, Poll}, }; -use futures_core::ready; -use tokio::sync::mpsc::UnboundedSender; - use crate::{ - conn::pool::{Inner, Pool, QUEUE_END_ID}, + conn::pool::{Pool, QUEUE_END_ID}, error::Error, - Conn, }; -use std::sync::{atomic, Arc}; - -/// Future that disconnects this pool from a server and resolves to `()`. +/// Disconnect this pool from a server and resolves to `()`. /// -/// **Note:** This Future won't resolve until all active connections, taken from it, +/// **Note:** This won't resolve until all active connections, taken from the poll, /// are dropped or disonnected. Also all pending and new `GetConn`'s will resolve to error. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -struct DisconnectPool { - pool_inner: Arc, - drop: Option>>, -} - -impl DisconnectPool { - fn new(pool: Pool) -> Self { - Self { - pool_inner: pool.inner, - drop: Some(pool.drop), - } - } -} +pub(crate) async fn disconnect_pool(pool: Pool) -> Result<(), Error> { + let inner = pool.inner; + let drop = pool.drop; -impl Future for DisconnectPool { - type Output = Result<(), Error>; + inner.close.store(true, atomic::Ordering::Release); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.pool_inner.close.store(true, atomic::Ordering::Release); - let mut exchange = self.pool_inner.exchange.lock().unwrap(); - exchange.spawn_futures_if_needed(&self.pool_inner); + let f = |cx: &mut Context| { + let mut exchange = inner.exchange.lock().unwrap(); + exchange.spawn_futures_if_needed(&inner); exchange.waiting.push(cx.waker().clone(), QUEUE_END_ID); - drop(exchange); + Poll::Ready(()) + }; + poll_fn(f).await; - if self.pool_inner.closed.load(atomic::Ordering::Acquire) { - Poll::Ready(Ok(())) - } else { - match self.drop.take() { - Some(drop) => match drop.send(None) { - Ok(_) => { - // Recycler is alive. Waiting for it to finish. - ready!(Box::pin(drop.closed()).as_mut().poll(cx)); - Poll::Ready(Ok(())) - } - Err(_) => { - // Recycler seem dead. No one will wake us. - Poll::Ready(Ok(())) - } - }, - None => Poll::Pending, - } - } + if !inner.closed.load(atomic::Ordering::Acquire) && drop.send(None).is_ok() { + // Recycler is alive. Wait for it to finish. + drop.closed().await; } -} -pub(crate) async fn disconnect_pool(pool: Pool) -> Result<(), Error> { - DisconnectPool::new(pool).await + Ok(()) }