1use std::borrow::Cow;
2use std::error;
3use std::fmt;
4use std::future::Future;
5use std::marker::PhantomData;
6use std::ops::{Deref, DerefMut};
7use std::pin::Pin;
8use std::time::Duration;
9
10use crate::inner::PoolInner;
11use crate::internals::Conn;
12
13pub struct Pool<M>
15where
16 M: ManageConnection,
17{
18 pub(crate) inner: PoolInner<M>,
19}
20
21impl<M> Clone for Pool<M>
22where
23 M: ManageConnection,
24{
25 fn clone(&self) -> Self {
26 Pool {
27 inner: self.inner.clone(),
28 }
29 }
30}
31
32impl<M> fmt::Debug for Pool<M>
33where
34 M: ManageConnection,
35{
36 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
37 f.write_fmt(format_args!("Pool({:?})", self.inner))
38 }
39}
40
41impl<M: ManageConnection> Pool<M> {
42 pub fn builder() -> Builder<M> {
44 Builder::new()
45 }
46
47 pub async fn get(&self) -> Result<PooledConnection<'_, M>, RunError<M::Error>> {
49 self.inner.get().await
50 }
51
52 pub async fn get_owned(&self) -> Result<PooledConnection<'static, M>, RunError<M::Error>> {
57 Ok(PooledConnection {
58 conn: self.get().await?.take(),
59 pool: Cow::Owned(self.inner.clone()),
60 state: ConnectionState::Present,
61 })
62 }
63
64 pub async fn dedicated_connection(&self) -> Result<M::Connection, M::Error> {
71 self.inner.connect().await
72 }
73
74 pub fn state(&self) -> State {
76 self.inner.state()
77 }
78
79 pub fn add(&self, conn: M::Connection) -> Result<(), AddError<M::Connection>> {
84 self.inner.try_put(conn)
85 }
86}
87
88#[derive(Debug)]
90#[non_exhaustive]
91pub struct State {
92 pub connections: u32,
94 pub idle_connections: u32,
96 pub statistics: Statistics,
98}
99
/// Counters and timings describing pool activity.
///
/// All fields start at zero via `Default`. The per-field notes below are read
/// off the field names; the code that updates them lives outside this block,
/// so verify details there.
#[non_exhaustive]
#[derive(Default, Debug)]
pub struct Statistics {
    /// `get` calls served directly (presumably without waiting).
    pub get_direct: u64,
    /// `get` calls that had to wait.
    pub get_waited: u64,
    /// `get` calls that timed out.
    pub get_timed_out: u64,
    /// Accumulated wait time of `get` calls.
    pub get_wait_time: Duration,
    /// Connections created.
    pub connections_created: u64,
    /// Connections closed for being broken.
    pub connections_closed_broken: u64,
    /// Connections closed for being invalid.
    pub connections_closed_invalid: u64,
    /// Connections closed for exceeding the maximum lifetime.
    pub connections_closed_max_lifetime: u64,
    /// Connections closed for exceeding the idle timeout.
    pub connections_closed_idle_timeout: u64,
}
125
126#[derive(Debug)]
128pub struct Builder<M: ManageConnection> {
129 pub(crate) max_size: u32,
131 pub(crate) min_idle: Option<u32>,
133 pub(crate) test_on_check_out: bool,
135 pub(crate) max_lifetime: Option<Duration>,
137 pub(crate) idle_timeout: Option<Duration>,
139 pub(crate) connection_timeout: Duration,
141 pub(crate) retry_connection: bool,
143 pub(crate) error_sink: Box<dyn ErrorSink<M::Error>>,
145 pub(crate) reaper_rate: Duration,
147 pub(crate) queue_strategy: QueueStrategy,
149 pub(crate) connection_customizer: Option<Box<dyn CustomizeConnection<M::Connection, M::Error>>>,
151 _p: PhantomData<M>,
152}
153
/// Queue ordering selected through [`Builder::queue_strategy`].
///
/// `Fifo` is the default. How the pool applies the ordering is decided by code
/// outside this block.
///
/// The type derives `PartialEq`/`Eq` so strategies can be compared directly
/// (for example in configuration checks or `assert_eq!`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum QueueStrategy {
    /// First in, first out (the default).
    #[default]
    Fifo,
    /// Last in, first out.
    Lifo,
}
167
168impl<M: ManageConnection> Default for Builder<M> {
169 fn default() -> Self {
170 Builder {
171 max_size: 10,
172 min_idle: None,
173 test_on_check_out: true,
174 max_lifetime: Some(Duration::from_secs(30 * 60)),
175 idle_timeout: Some(Duration::from_secs(10 * 60)),
176 connection_timeout: Duration::from_secs(30),
177 retry_connection: true,
178 error_sink: Box::new(NopErrorSink),
179 reaper_rate: Duration::from_secs(30),
180 queue_strategy: QueueStrategy::default(),
181 connection_customizer: None,
182 _p: PhantomData,
183 }
184 }
185}
186
187impl<M: ManageConnection> Builder<M> {
188 #[must_use]
192 pub fn new() -> Self {
193 Builder::default()
194 }
195
196 #[must_use]
204 pub fn max_size(mut self, max_size: u32) -> Self {
205 assert!(max_size > 0, "max_size must be greater than zero!");
206 self.max_size = max_size;
207 self
208 }
209
210 #[must_use]
217 pub fn min_idle(mut self, min_idle: impl Into<Option<u32>>) -> Self {
218 self.min_idle = min_idle.into();
219 self
220 }
221
222 #[must_use]
227 pub fn test_on_check_out(mut self, test_on_check_out: bool) -> Self {
228 self.test_on_check_out = test_on_check_out;
229 self
230 }
231
232 #[must_use]
246 pub fn max_lifetime(mut self, max_lifetime: impl Into<Option<Duration>>) -> Self {
247 let max_lifetime = max_lifetime.into();
248 assert_ne!(
249 max_lifetime,
250 Some(Duration::from_secs(0)),
251 "max_lifetime must be greater than zero!"
252 );
253 self.max_lifetime = max_lifetime;
254 self
255 }
256
257 #[must_use]
268 pub fn idle_timeout(mut self, idle_timeout: impl Into<Option<Duration>>) -> Self {
269 let idle_timeout = idle_timeout.into();
270 assert_ne!(
271 idle_timeout,
272 Some(Duration::from_secs(0)),
273 "idle_timeout must be greater than zero!"
274 );
275 self.idle_timeout = idle_timeout;
276 self
277 }
278
279 #[must_use]
290 pub fn connection_timeout(mut self, connection_timeout: Duration) -> Self {
291 assert!(
292 connection_timeout > Duration::from_secs(0),
293 "connection_timeout must be non-zero"
294 );
295 self.connection_timeout = connection_timeout;
296 self
297 }
298
299 #[must_use]
308 pub fn retry_connection(mut self, retry: bool) -> Self {
309 self.retry_connection = retry;
310 self
311 }
312
313 #[must_use]
318 pub fn error_sink(mut self, error_sink: Box<dyn ErrorSink<M::Error>>) -> Self {
319 self.error_sink = error_sink;
320 self
321 }
322
323 #[allow(dead_code)]
325 #[must_use]
326 pub fn reaper_rate(mut self, reaper_rate: Duration) -> Self {
327 self.reaper_rate = reaper_rate;
328 self
329 }
330
331 #[must_use]
335 pub fn queue_strategy(mut self, queue_strategy: QueueStrategy) -> Self {
336 self.queue_strategy = queue_strategy;
337 self
338 }
339
340 #[must_use]
342 pub fn connection_customizer(
343 mut self,
344 connection_customizer: Box<dyn CustomizeConnection<M::Connection, M::Error>>,
345 ) -> Self {
346 self.connection_customizer = Some(connection_customizer);
347 self
348 }
349
350 fn build_inner(self, manager: M) -> Pool<M> {
351 if let Some(min_idle) = self.min_idle {
352 assert!(
353 self.max_size >= min_idle,
354 "min_idle must be no larger than max_size"
355 );
356 }
357
358 Pool {
359 inner: PoolInner::new(self, manager),
360 }
361 }
362
363 pub async fn build(self, manager: M) -> Result<Pool<M>, M::Error> {
368 let pool = self.build_inner(manager);
369 pool.inner.start_connections().await.map(|()| pool)
370 }
371
372 pub fn build_unchecked(self, manager: M) -> Pool<M> {
377 let p = self.build_inner(manager);
378 p.inner.spawn_start_connections();
379 p
380 }
381}
382
/// Describes how connections of one kind are opened and checked.
///
/// Implementors must be `Send + Sync + 'static`, and the futures returned by
/// the asynchronous methods must be `Send`.
pub trait ManageConnection: Send + Sync + Sized + 'static {
    /// The connection type produced by [`connect`](Self::connect).
    type Connection: Send + 'static;
    /// The error type reported by this manager.
    type Error: fmt::Debug + Send + 'static;

    /// Attempts to open a new connection.
    fn connect(&self) -> impl Future<Output = Result<Self::Connection, Self::Error>> + Send;

    /// Asynchronously checks `conn`, resolving to `Err` when the check fails.
    fn is_valid(
        &self,
        conn: &mut Self::Connection,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Synchronously reports whether `conn` is broken.
    fn has_broken(&self, conn: &mut Self::Connection) -> bool;
}
400
/// Hook for customizing connections of type `C`, reporting errors of type `E`.
pub trait CustomizeConnection<C, E>: fmt::Debug + Send + Sync + 'static
where
    C: Send + 'static,
    E: 'static,
{
    /// Hook invoked with a mutable reference to a connection.
    ///
    /// The default implementation leaves the connection untouched and
    /// resolves to `Ok(())`.
    fn on_acquire<'a>(
        &'a self,
        _connection: &'a mut C,
    ) -> Pin<Box<dyn Future<Output = Result<(), E>> + Send + 'a>> {
        // An async block is used (rather than a ready-made future holding a
        // `Result<(), E>`) because `E` is not required to be `Send`.
        Box::pin(async { Ok::<(), E>(()) })
    }
}
417
418pub struct PooledConnection<'a, M>
420where
421 M: ManageConnection,
422{
423 pool: Cow<'a, PoolInner<M>>,
424 conn: Option<Conn<M::Connection>>,
425 pub(crate) state: ConnectionState,
426}
427
428impl<'a, M> PooledConnection<'a, M>
429where
430 M: ManageConnection,
431{
432 pub(crate) fn new(pool: &'a PoolInner<M>, conn: Conn<M::Connection>) -> Self {
433 Self {
434 pool: Cow::Borrowed(pool),
435 conn: Some(conn),
436 state: ConnectionState::Present,
437 }
438 }
439
440 pub(crate) fn take(mut self) -> Option<Conn<M::Connection>> {
441 self.state = ConnectionState::Extracted;
442 self.conn.take()
443 }
444}
445
446impl<M> Deref for PooledConnection<'_, M>
447where
448 M: ManageConnection,
449{
450 type Target = M::Connection;
451
452 fn deref(&self) -> &Self::Target {
453 &self.conn.as_ref().unwrap().conn
454 }
455}
456
457impl<M> DerefMut for PooledConnection<'_, M>
458where
459 M: ManageConnection,
460{
461 fn deref_mut(&mut self) -> &mut M::Connection {
462 &mut self.conn.as_mut().unwrap().conn
463 }
464}
465
466impl<M> fmt::Debug for PooledConnection<'_, M>
467where
468 M: ManageConnection,
469 M::Connection: fmt::Debug,
470{
471 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
472 fmt::Debug::fmt(&self.conn.as_ref().unwrap().conn, fmt)
473 }
474}
475
476impl<M> Drop for PooledConnection<'_, M>
477where
478 M: ManageConnection,
479{
480 fn drop(&mut self) {
481 if let ConnectionState::Extracted = self.state {
482 return;
483 }
484
485 debug_assert!(self.conn.is_some(), "incorrect state {:?}", self.state);
486 if let Some(conn) = self.conn.take() {
487 self.pool.as_ref().put_back(conn, self.state);
488 }
489 }
490}
491
/// State of the connection held by a `PooledConnection`, passed to the pool
/// when the guard is dropped.
#[derive(Clone, Copy, Debug)]
pub(crate) enum ConnectionState {
    /// The guard holds its connection (the state every new guard starts in).
    Present,
    /// The connection was taken out of the guard; dropping the guard returns
    /// nothing to the pool.
    Extracted,
    /// The connection is flagged as invalid (set by code outside this block).
    Invalid,
}
498
/// Error returned when checking a connection out of the pool.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RunError<E> {
    /// An error of the wrapped type `E`.
    User(E),
    /// The operation timed out.
    TimedOut,
}

/// `User` prints the wrapped error; `TimedOut` prints a fixed message.
impl<E> fmt::Display for RunError<E>
where
    E: error::Error + 'static,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::User(err) => write!(f, "{}", err),
            Self::TimedOut => f.write_str("Timed out in bb8"),
        }
    }
}

/// `User` exposes the wrapped error as its source; `TimedOut` has none.
impl<E> error::Error for RunError<E>
where
    E: error::Error + 'static,
{
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match self {
            Self::User(err) => Some(err),
            Self::TimedOut => None,
        }
    }
}

/// Lets `?` lift an `E` into `RunError::User`.
impl<E> From<E> for RunError<E>
where
    E: error::Error,
{
    fn from(error: E) -> Self {
        RunError::User(error)
    }
}
540
/// Error returned by [`Pool::add`] when a connection cannot be added.
///
/// Both variants carry the rejected connection, so the caller gets it back.
///
/// `C` is a *connection* type, which normally does not implement
/// `std::error::Error`. The `Display` and `Error` impls below therefore do not
/// require that: `Display` works for every `C`, and `Error` only needs
/// `C: Debug` (because `Error` has `Debug` as a supertrait).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AddError<C> {
    /// The connection was broken before it could be added.
    Broken(C),
    /// The pool had insufficient capacity for the connection.
    NoCapacity(C),
}

/// Prints a fixed message per variant; the carried connection is never read.
impl<C> fmt::Display for AddError<C> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let message = match self {
            Self::Broken(_) => "The connection was broken before it could be added",
            Self::NoCapacity(_) => {
                "Unable to add the connection to the pool due to insufficient capacity"
            }
        };
        f.write_str(message)
    }
}

/// `AddError` never has an underlying source error.
impl<C: fmt::Debug> error::Error for AddError<C> {
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        None
    }
}
567
/// Receiver for errors of type `E`.
///
/// A sink is stored as `Box<dyn ErrorSink<E>>` in [`Builder`], so implementors
/// must be `Debug + Send + Sync + 'static`.
pub trait ErrorSink<E>: fmt::Debug + Send + Sync + 'static {
    /// Receives one error.
    fn sink(&self, error: E);

    /// Returns a boxed clone of this sink, so a `Box<dyn ErrorSink<E>>` can be
    /// duplicated even though trait objects cannot use `Clone` directly.
    fn boxed_clone(&self) -> Box<dyn ErrorSink<E>>;
}
577
/// An error sink that discards every error it receives; this is the sink a
/// default [`Builder`] starts with.
#[derive(Clone, Copy, Debug)]
pub struct NopErrorSink;
581
582impl<E> ErrorSink<E> for NopErrorSink {
583 fn sink(&self, _: E) {}
584
585 fn boxed_clone(&self) -> Box<dyn ErrorSink<E>> {
586 Box::new(*self)
587 }
588}