bb8/
api.rs

1use std::borrow::Cow;
2use std::error;
3use std::fmt;
4use std::future::Future;
5use std::marker::PhantomData;
6use std::ops::{Deref, DerefMut};
7use std::pin::Pin;
8use std::time::Duration;
9
10use crate::inner::PoolInner;
11use crate::internals::Conn;
12
13/// A generic connection pool.
14pub struct Pool<M>
15where
16    M: ManageConnection,
17{
18    pub(crate) inner: PoolInner<M>,
19}
20
21impl<M> Clone for Pool<M>
22where
23    M: ManageConnection,
24{
25    fn clone(&self) -> Self {
26        Pool {
27            inner: self.inner.clone(),
28        }
29    }
30}
31
32impl<M> fmt::Debug for Pool<M>
33where
34    M: ManageConnection,
35{
36    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
37        f.write_fmt(format_args!("Pool({:?})", self.inner))
38    }
39}
40
41impl<M: ManageConnection> Pool<M> {
42    /// Returns a `Builder` instance to configure a new pool.
43    pub fn builder() -> Builder<M> {
44        Builder::new()
45    }
46
47    /// Retrieves a connection from the pool.
48    pub async fn get(&self) -> Result<PooledConnection<'_, M>, RunError<M::Error>> {
49        self.inner.get().await
50    }
51
52    /// Retrieves an owned connection from the pool
53    ///
54    /// Using an owning `PooledConnection` makes it easier to leak the connection pool. Therefore, [`Pool::get`]
55    /// (which stores a lifetime-bound reference to the pool) should be preferred whenever possible.
56    pub async fn get_owned(&self) -> Result<PooledConnection<'static, M>, RunError<M::Error>> {
57        Ok(PooledConnection {
58            conn: self.get().await?.take(),
59            pool: Cow::Owned(self.inner.clone()),
60            state: ConnectionState::Present,
61        })
62    }
63
64    /// Get a new dedicated connection that will not be managed by the pool.
65    /// An application may want a persistent connection (e.g. to do a
66    /// postgres LISTEN) that will not be closed or repurposed by the pool.
67    ///
68    /// This method allows reusing the manager's configuration but otherwise
69    /// bypassing the pool
70    pub async fn dedicated_connection(&self) -> Result<M::Connection, M::Error> {
71        self.inner.connect().await
72    }
73
74    /// Returns information about the current state of the pool.
75    pub fn state(&self) -> State {
76        self.inner.state()
77    }
78
79    /// Adds a connection to the pool.
80    ///
81    /// If the connection is broken, or the pool is at capacity, the
82    /// connection is not added and instead returned to the caller in Err.
83    pub fn add(&self, conn: M::Connection) -> Result<(), AddError<M::Connection>> {
84        self.inner.try_put(conn)
85    }
86}
87
88/// Information about the state of a `Pool`.
89#[derive(Debug)]
90#[non_exhaustive]
91pub struct State {
92    /// The number of connections currently being managed by the pool.
93    pub connections: u32,
94    /// The number of idle connections.
95    pub idle_connections: u32,
96    /// Statistics about the historical usage of the pool.
97    pub statistics: Statistics,
98}
99
100/// Statistics about the historical usage of the `Pool`.
101#[derive(Debug, Default)]
102#[non_exhaustive]
103pub struct Statistics {
104    /// Total gets performed that did not have to wait for a connection.
105    pub get_direct: u64,
106    /// Total gets performed that had to wait for a connection available.
107    pub get_waited: u64,
108    /// Total gets performed that timed out while waiting for a connection.
109    pub get_timed_out: u64,
110    /// Total time accumulated waiting for a connection.
111    pub get_wait_time: Duration,
112    /// Total connections created.
113    pub connections_created: u64,
114    /// Total connections that were closed due to be in broken state.
115    pub connections_closed_broken: u64,
116    /// Total connections that were closed due to be considered invalid.
117    pub connections_closed_invalid: u64,
118    /// Total connections that were closed because they reached the max
119    /// lifetime.
120    pub connections_closed_max_lifetime: u64,
121    /// Total connections that were closed because they reached the max
122    /// idle timeout.
123    pub connections_closed_idle_timeout: u64,
124}
125
126/// A builder for a connection pool.
127#[derive(Debug)]
128pub struct Builder<M: ManageConnection> {
129    /// The maximum number of connections allowed.
130    pub(crate) max_size: u32,
131    /// The minimum idle connection count the pool will attempt to maintain.
132    pub(crate) min_idle: Option<u32>,
133    /// Whether or not to test the connection on checkout.
134    pub(crate) test_on_check_out: bool,
135    /// The maximum lifetime, if any, that a connection is allowed.
136    pub(crate) max_lifetime: Option<Duration>,
137    /// The duration, if any, after which idle_connections in excess of `min_idle` are closed.
138    pub(crate) idle_timeout: Option<Duration>,
139    /// The duration to wait to start a connection before giving up.
140    pub(crate) connection_timeout: Duration,
141    /// Enable/disable automatic retries on connection creation.
142    pub(crate) retry_connection: bool,
143    /// The error sink.
144    pub(crate) error_sink: Box<dyn ErrorSink<M::Error>>,
145    /// The time interval used to wake up and reap connections.
146    pub(crate) reaper_rate: Duration,
147    /// Queue strategy (FIFO or LIFO)
148    pub(crate) queue_strategy: QueueStrategy,
149    /// User-supplied trait object responsible for initializing connections
150    pub(crate) connection_customizer: Option<Box<dyn CustomizeConnection<M::Connection, M::Error>>>,
151    _p: PhantomData<M>,
152}
153
154/// bb8's queue strategy when getting pool resources
155#[derive(Debug, Default, Clone, Copy)]
156pub enum QueueStrategy {
157    /// First in first out
158    /// This strategy behaves like a queue
159    /// It will evenly spread load on all existing connections, resetting their idle timeouts, maintaining the pool size
160    #[default]
161    Fifo,
162    /// Last in first out
163    /// This behaves like a stack
164    /// It will use the most recently used connection and help to keep the total pool size small by evicting idle connections
165    Lifo,
166}
167
168impl<M: ManageConnection> Default for Builder<M> {
169    fn default() -> Self {
170        Builder {
171            max_size: 10,
172            min_idle: None,
173            test_on_check_out: true,
174            max_lifetime: Some(Duration::from_secs(30 * 60)),
175            idle_timeout: Some(Duration::from_secs(10 * 60)),
176            connection_timeout: Duration::from_secs(30),
177            retry_connection: true,
178            error_sink: Box::new(NopErrorSink),
179            reaper_rate: Duration::from_secs(30),
180            queue_strategy: QueueStrategy::default(),
181            connection_customizer: None,
182            _p: PhantomData,
183        }
184    }
185}
186
187impl<M: ManageConnection> Builder<M> {
188    /// Constructs a new `Builder`.
189    ///
190    /// Parameters are initialized with their default values.
191    #[must_use]
192    pub fn new() -> Self {
193        Builder::default()
194    }
195
196    /// Sets the maximum number of connections managed by the pool.
197    ///
198    /// Defaults to 10.
199    ///
200    /// # Panics
201    ///
202    /// Will panic if `max_size` is 0.
203    #[must_use]
204    pub fn max_size(mut self, max_size: u32) -> Self {
205        assert!(max_size > 0, "max_size must be greater than zero!");
206        self.max_size = max_size;
207        self
208    }
209
210    /// Sets the minimum idle connection count maintained by the pool.
211    ///
212    /// If set, the pool will try to maintain at least this many idle
213    /// connections at all times, while respecting the value of `max_size`.
214    ///
215    /// Defaults to None.
216    #[must_use]
217    pub fn min_idle(mut self, min_idle: impl Into<Option<u32>>) -> Self {
218        self.min_idle = min_idle.into();
219        self
220    }
221
222    /// If true, the health of a connection will be verified through a call to
223    /// `ManageConnection::is_valid` before it is provided to a pool user.
224    ///
225    /// Defaults to true.
226    #[must_use]
227    pub fn test_on_check_out(mut self, test_on_check_out: bool) -> Self {
228        self.test_on_check_out = test_on_check_out;
229        self
230    }
231
232    /// Sets the maximum lifetime of connections in the pool.
233    ///
234    /// If set, connections will be closed at the next reaping after surviving
235    /// past this duration.
236    ///
237    /// If a connection reaches its maximum lifetime while checked out it will be
238    /// closed when it is returned to the pool.
239    ///
240    /// Defaults to 30 minutes.
241    ///
242    /// # Panics
243    ///
244    /// Will panic if `max_lifetime` is 0.
245    #[must_use]
246    pub fn max_lifetime(mut self, max_lifetime: impl Into<Option<Duration>>) -> Self {
247        let max_lifetime = max_lifetime.into();
248        assert_ne!(
249            max_lifetime,
250            Some(Duration::from_secs(0)),
251            "max_lifetime must be greater than zero!"
252        );
253        self.max_lifetime = max_lifetime;
254        self
255    }
256
257    /// Sets the idle timeout used by the pool.
258    ///
259    /// If set, idle connections in excess of `min_idle` will be closed at the
260    /// next reaping after remaining idle past this duration.
261    ///
262    /// Defaults to 10 minutes.
263    ///
264    /// # Panics
265    ///
266    /// Will panic if `idle_timeout` is 0.
267    #[must_use]
268    pub fn idle_timeout(mut self, idle_timeout: impl Into<Option<Duration>>) -> Self {
269        let idle_timeout = idle_timeout.into();
270        assert_ne!(
271            idle_timeout,
272            Some(Duration::from_secs(0)),
273            "idle_timeout must be greater than zero!"
274        );
275        self.idle_timeout = idle_timeout;
276        self
277    }
278
279    /// Sets the connection timeout used by the pool.
280    ///
281    /// Futures returned by `Pool::get` will wait this long before giving up and
282    /// resolving with an error.
283    ///
284    /// Defaults to 30 seconds.
285    ///
286    /// # Panics
287    ///
288    /// Will panic if `connection_timeout` is 0.
289    #[must_use]
290    pub fn connection_timeout(mut self, connection_timeout: Duration) -> Self {
291        assert!(
292            connection_timeout > Duration::from_secs(0),
293            "connection_timeout must be non-zero"
294        );
295        self.connection_timeout = connection_timeout;
296        self
297    }
298
299    /// Instructs the pool to automatically retry connection creation if it fails, until the `connection_timeout` has expired.
300    ///
301    /// Useful for transient connectivity errors like temporary DNS resolution failure
302    /// or intermittent network failures. Some applications however are smart enough to
303    /// know that the server is down and retries won't help (and could actually hurt recovery).
304    /// In that case, it's better to disable retries here and let the pool error out.
305    ///
306    /// Defaults to enabled.
307    #[must_use]
308    pub fn retry_connection(mut self, retry: bool) -> Self {
309        self.retry_connection = retry;
310        self
311    }
312
313    /// Set the sink for errors that are not associated with any particular operation
314    /// on the pool. This can be used to log and monitor failures.
315    ///
316    /// Defaults to `NopErrorSink`.
317    #[must_use]
318    pub fn error_sink(mut self, error_sink: Box<dyn ErrorSink<M::Error>>) -> Self {
319        self.error_sink = error_sink;
320        self
321    }
322
323    /// Used by tests
324    #[allow(dead_code)]
325    #[must_use]
326    pub fn reaper_rate(mut self, reaper_rate: Duration) -> Self {
327        self.reaper_rate = reaper_rate;
328        self
329    }
330
331    /// Sets the queue strategy to be used by the pool
332    ///
333    /// Defaults to `Fifo`.
334    #[must_use]
335    pub fn queue_strategy(mut self, queue_strategy: QueueStrategy) -> Self {
336        self.queue_strategy = queue_strategy;
337        self
338    }
339
340    /// Set the connection customizer to customize newly checked out connections
341    #[must_use]
342    pub fn connection_customizer(
343        mut self,
344        connection_customizer: Box<dyn CustomizeConnection<M::Connection, M::Error>>,
345    ) -> Self {
346        self.connection_customizer = Some(connection_customizer);
347        self
348    }
349
350    fn build_inner(self, manager: M) -> Pool<M> {
351        if let Some(min_idle) = self.min_idle {
352            assert!(
353                self.max_size >= min_idle,
354                "min_idle must be no larger than max_size"
355            );
356        }
357
358        Pool {
359            inner: PoolInner::new(self, manager),
360        }
361    }
362
363    /// Consumes the builder, returning a new, initialized `Pool`.
364    ///
365    /// The `Pool` will not be returned until it has established its configured
366    /// minimum number of connections, or it times out.
367    pub async fn build(self, manager: M) -> Result<Pool<M>, M::Error> {
368        let pool = self.build_inner(manager);
369        pool.inner.start_connections().await.map(|()| pool)
370    }
371
372    /// Consumes the builder, returning a new, initialized `Pool`.
373    ///
374    /// Unlike `build`, this does not wait for any connections to be established
375    /// before returning.
376    pub fn build_unchecked(self, manager: M) -> Pool<M> {
377        let p = self.build_inner(manager);
378        p.inner.spawn_start_connections();
379        p
380    }
381}
382
383/// A trait which provides connection-specific functionality.
384pub trait ManageConnection: Sized + Send + Sync + 'static {
385    /// The connection type this manager deals with.
386    type Connection: Send + 'static;
387    /// The error type returned by `Connection`s.
388    type Error: fmt::Debug + Send + 'static;
389
390    /// Attempts to create a new connection.
391    fn connect(&self) -> impl Future<Output = Result<Self::Connection, Self::Error>> + Send;
392    /// Determines if the connection is still connected to the database.
393    fn is_valid(
394        &self,
395        conn: &mut Self::Connection,
396    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
397    /// Synchronously determine if the connection is no longer usable, if possible.
398    fn has_broken(&self, conn: &mut Self::Connection) -> bool;
399}
400
401/// A trait which provides functionality to initialize a connection
402pub trait CustomizeConnection<C: Send + 'static, E: 'static>:
403    fmt::Debug + Send + Sync + 'static
404{
405    /// Called with connections immediately after they are returned from
406    /// `ManageConnection::connect`.
407    ///
408    /// The default implementation simply returns `Ok(())`. If this method returns an
409    /// error, it will be forwarded to the configured error sink.
410    fn on_acquire<'a>(
411        &'a self,
412        _connection: &'a mut C,
413    ) -> Pin<Box<dyn Future<Output = Result<(), E>> + Send + 'a>> {
414        Box::pin(async { Ok(()) })
415    }
416}
417
418/// A smart pointer wrapping a connection.
419pub struct PooledConnection<'a, M>
420where
421    M: ManageConnection,
422{
423    pool: Cow<'a, PoolInner<M>>,
424    conn: Option<Conn<M::Connection>>,
425    pub(crate) state: ConnectionState,
426}
427
428impl<'a, M> PooledConnection<'a, M>
429where
430    M: ManageConnection,
431{
432    pub(crate) fn new(pool: &'a PoolInner<M>, conn: Conn<M::Connection>) -> Self {
433        Self {
434            pool: Cow::Borrowed(pool),
435            conn: Some(conn),
436            state: ConnectionState::Present,
437        }
438    }
439
440    pub(crate) fn take(mut self) -> Option<Conn<M::Connection>> {
441        self.state = ConnectionState::Extracted;
442        self.conn.take()
443    }
444}
445
446impl<M> Deref for PooledConnection<'_, M>
447where
448    M: ManageConnection,
449{
450    type Target = M::Connection;
451
452    fn deref(&self) -> &Self::Target {
453        &self.conn.as_ref().unwrap().conn
454    }
455}
456
457impl<M> DerefMut for PooledConnection<'_, M>
458where
459    M: ManageConnection,
460{
461    fn deref_mut(&mut self) -> &mut M::Connection {
462        &mut self.conn.as_mut().unwrap().conn
463    }
464}
465
466impl<M> fmt::Debug for PooledConnection<'_, M>
467where
468    M: ManageConnection,
469    M::Connection: fmt::Debug,
470{
471    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
472        fmt::Debug::fmt(&self.conn.as_ref().unwrap().conn, fmt)
473    }
474}
475
476impl<M> Drop for PooledConnection<'_, M>
477where
478    M: ManageConnection,
479{
480    fn drop(&mut self) {
481        if let ConnectionState::Extracted = self.state {
482            return;
483        }
484
485        debug_assert!(self.conn.is_some(), "incorrect state {:?}", self.state);
486        if let Some(conn) = self.conn.take() {
487            self.pool.as_ref().put_back(conn, self.state);
488        }
489    }
490}
491
492#[derive(Debug, Clone, Copy)]
493pub(crate) enum ConnectionState {
494    Present,
495    Extracted,
496    Invalid,
497}
498
499/// bb8's error type.
500#[derive(Debug, Clone, PartialEq, Eq)]
501pub enum RunError<E> {
502    /// An error returned from user code.
503    User(E),
504    /// bb8 attempted to get a connection but the provided timeout was exceeded.
505    TimedOut,
506}
507
508impl<E> fmt::Display for RunError<E>
509where
510    E: error::Error + 'static,
511{
512    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
513        match *self {
514            RunError::User(ref err) => write!(f, "{err}"),
515            RunError::TimedOut => write!(f, "Timed out in bb8"),
516        }
517    }
518}
519
520impl<E> error::Error for RunError<E>
521where
522    E: error::Error + 'static,
523{
524    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
525        match *self {
526            RunError::User(ref err) => Some(err),
527            RunError::TimedOut => None,
528        }
529    }
530}
531
532impl<E> From<E> for RunError<E>
533where
534    E: error::Error,
535{
536    fn from(error: E) -> Self {
537        Self::User(error)
538    }
539}
540
541/// Error type returned by `Pool::add(conn)`
542#[derive(Debug, Clone, PartialEq, Eq)]
543pub enum AddError<C> {
544    /// The connection was broken before it could be added.
545    Broken(C),
546    /// Unable to add the connection to the pool due to insufficient capacity.
547    NoCapacity(C),
548}
549
550impl<E: error::Error + 'static> fmt::Display for AddError<E> {
551    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
552        match *self {
553            AddError::Broken(_) => write!(f, "The connection was broken before it could be added"),
554            AddError::NoCapacity(_) => write!(
555                f,
556                "Unable to add the connection to the pool due to insufficient capacity"
557            ),
558        }
559    }
560}
561
562impl<E: error::Error + 'static> error::Error for AddError<E> {
563    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
564        None
565    }
566}
567
568/// A trait to receive errors generated by connection management that aren't
569/// tied to any particular caller.
570pub trait ErrorSink<E>: fmt::Debug + Send + Sync + 'static {
571    /// Receive an error
572    fn sink(&self, error: E);
573
574    /// Clone this sink.
575    fn boxed_clone(&self) -> Box<dyn ErrorSink<E>>;
576}
577
578/// An `ErrorSink` implementation that does nothing.
579#[derive(Debug, Clone, Copy)]
580pub struct NopErrorSink;
581
582impl<E> ErrorSink<E> for NopErrorSink {
583    fn sink(&self, _: E) {}
584
585    fn boxed_clone(&self) -> Box<dyn ErrorSink<E>> {
586        Box::new(*self)
587    }
588}