redis/
client.rs

1use std::time::Duration;
2
3#[cfg(feature = "aio")]
4use crate::aio::{AsyncPushSender, DefaultAsyncDNSResolver};
5#[cfg(feature = "aio")]
6use crate::io::{tcp::TcpSettings, AsyncDNSResolver};
7use crate::{
8    connection::{connect, Connection, ConnectionInfo, ConnectionLike, IntoConnectionInfo},
9    types::{RedisResult, Value},
10};
11#[cfg(feature = "aio")]
12use std::pin::Pin;
13
14#[cfg(feature = "tls-rustls")]
15use crate::tls::{inner_build_with_tls, TlsCertificates};
16
17#[cfg(feature = "cache-aio")]
18use crate::caching::CacheConfig;
19#[cfg(all(feature = "cache-aio", feature = "connection-manager"))]
20use crate::caching::CacheManager;
21
/// The client type.
///
/// A `Client` stores the [`ConnectionInfo`] it was created with; connections
/// are opened on demand by methods such as [`Client::get_connection`].
#[derive(Debug, Clone)]
pub struct Client {
    /// Connection parameters handed to the connect routine whenever a
    /// connection is requested.
    pub(crate) connection_info: ConnectionInfo,
}
27
28/// The client acts as connector to the redis server.  By itself it does not
29/// do much other than providing a convenient way to fetch a connection from
30/// it.  In the future the plan is to provide a connection pool in the client.
31///
32/// When opening a client a URL in the following format should be used:
33///
34/// ```plain
35/// redis://host:port/db
36/// ```
37///
38/// Example usage::
39///
40/// ```rust,no_run
41/// let client = redis::Client::open("redis://127.0.0.1/").unwrap();
42/// let con = client.get_connection().unwrap();
43/// ```
44impl Client {
45    /// Connects to a redis server and returns a client.  This does not
46    /// actually open a connection yet but it does perform some basic
47    /// checks on the URL that might make the operation fail.
48    pub fn open<T: IntoConnectionInfo>(params: T) -> RedisResult<Client> {
49        Ok(Client {
50            connection_info: params.into_connection_info()?,
51        })
52    }
53
54    /// Instructs the client to actually connect to redis and returns a
55    /// connection object.  The connection object can be used to send
56    /// commands to the server.  This can fail with a variety of errors
57    /// (like unreachable host) so it's important that you handle those
58    /// errors.
59    pub fn get_connection(&self) -> RedisResult<Connection> {
60        connect(&self.connection_info, None)
61    }
62
63    /// Instructs the client to actually connect to redis with specified
64    /// timeout and returns a connection object.  The connection object
65    /// can be used to send commands to the server.  This can fail with
66    /// a variety of errors (like unreachable host) so it's important
67    /// that you handle those errors.
68    pub fn get_connection_with_timeout(&self, timeout: Duration) -> RedisResult<Connection> {
69        connect(&self.connection_info, Some(timeout))
70    }
71
72    /// Returns a reference of client connection info object.
73    pub fn get_connection_info(&self) -> &ConnectionInfo {
74        &self.connection_info
75    }
76
77    /// Constructs a new `Client` with parameters necessary to create a TLS connection.
78    ///
79    /// - `conn_info` - URL using the `rediss://` scheme.
80    /// - `tls_certs` - `TlsCertificates` structure containing:
81    ///     - `client_tls` - Optional `ClientTlsConfig` containing byte streams for
82    ///         - `client_cert` - client's byte stream containing client certificate in PEM format
83    ///         - `client_key` - client's byte stream containing private key in PEM format
84    ///     - `root_cert` - Optional byte stream yielding PEM formatted file for root certificates.
85    ///
86    /// If `ClientTlsConfig` ( cert+key pair ) is not provided, then client-side authentication is not enabled.
87    /// If `root_cert` is not provided, then system root certificates are used instead.
88    ///
89    /// # Examples
90    ///
91    /// ```no_run
92    /// use std::{fs::File, io::{BufReader, Read}};
93    ///
94    /// use redis::{Client, AsyncCommands as _, TlsCertificates, ClientTlsConfig};
95    ///
96    /// async fn do_redis_code(
97    ///     url: &str,
98    ///     root_cert_file: &str,
99    ///     cert_file: &str,
100    ///     key_file: &str
101    /// ) -> redis::RedisResult<()> {
102    ///     let root_cert_file = File::open(root_cert_file).expect("cannot open private cert file");
103    ///     let mut root_cert_vec = Vec::new();
104    ///     BufReader::new(root_cert_file)
105    ///         .read_to_end(&mut root_cert_vec)
106    ///         .expect("Unable to read ROOT cert file");
107    ///
108    ///     let cert_file = File::open(cert_file).expect("cannot open private cert file");
109    ///     let mut client_cert_vec = Vec::new();
110    ///     BufReader::new(cert_file)
111    ///         .read_to_end(&mut client_cert_vec)
112    ///         .expect("Unable to read client cert file");
113    ///
114    ///     let key_file = File::open(key_file).expect("cannot open private key file");
115    ///     let mut client_key_vec = Vec::new();
116    ///     BufReader::new(key_file)
117    ///         .read_to_end(&mut client_key_vec)
118    ///         .expect("Unable to read client key file");
119    ///
120    ///     let client = Client::build_with_tls(
121    ///         url,
122    ///         TlsCertificates {
123    ///             client_tls: Some(ClientTlsConfig{
124    ///                 client_cert: client_cert_vec,
125    ///                 client_key: client_key_vec,
126    ///             }),
127    ///             root_cert: Some(root_cert_vec),
128    ///         }
129    ///     )
130    ///     .expect("Unable to build client");
131    ///
132    ///     let connection_info = client.get_connection_info();
133    ///
134    ///     println!(">>> connection info: {connection_info:?}");
135    ///
136    ///     let mut con = client.get_multiplexed_async_connection().await?;
137    ///
138    ///     con.set("key1", b"foo").await?;
139    ///
140    ///     redis::cmd("SET")
141    ///         .arg(&["key2", "bar"])
142    ///         .exec_async(&mut con)
143    ///         .await?;
144    ///
145    ///     let result = redis::cmd("MGET")
146    ///         .arg(&["key1", "key2"])
147    ///         .query_async(&mut con)
148    ///         .await;
149    ///     assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
150    ///     println!("Result from MGET: {result:?}");
151    ///
152    ///     Ok(())
153    /// }
154    /// ```
155    #[cfg(feature = "tls-rustls")]
156    pub fn build_with_tls<C: IntoConnectionInfo>(
157        conn_info: C,
158        tls_certs: TlsCertificates,
159    ) -> RedisResult<Client> {
160        let connection_info = conn_info.into_connection_info()?;
161
162        inner_build_with_tls(connection_info, &tls_certs)
163    }
164}
165
/// Caching option stored in `AsyncConnectionConfig`: either a cache
/// configuration or, with the `connection-manager` feature, a cache manager.
#[cfg(feature = "cache-aio")]
#[derive(Clone)]
pub(crate) enum Cache {
    /// A cache configuration, as set by `AsyncConnectionConfig::set_cache_config`.
    Config(CacheConfig),
    /// A cache manager, as set by `AsyncConnectionConfig::set_cache_manager`.
    #[cfg(feature = "connection-manager")]
    Manager(CacheManager),
}
173
/// Options used when creating an async connection.
///
/// Every option starts out unset (see [`AsyncConnectionConfig::new`]) and is
/// filled in through the consuming `set_*` builder methods.
#[cfg(feature = "aio")]
#[derive(Default, Clone)]
pub struct AsyncConnectionConfig {
    /// Maximum time to wait for a response from the server
    pub(crate) response_timeout: Option<Duration>,
    /// Maximum time to wait for a connection to be established
    pub(crate) connection_timeout: Option<Duration>,
    /// Optional sender that receives push values
    pub(crate) push_sender: Option<std::sync::Arc<dyn AsyncPushSender>>,
    /// Optional cache configuration or cache manager
    #[cfg(feature = "cache-aio")]
    pub(crate) cache: Option<Cache>,
    /// Behavior of the underlying TCP connection
    pub(crate) tcp_settings: TcpSettings,
    /// Optional DNS resolver for the underlying TCP connection
    pub(crate) dns_resolver: Option<std::sync::Arc<dyn AsyncDNSResolver>>,
}
188
#[cfg(feature = "aio")]
impl AsyncConnectionConfig {
    /// Creates a new instance of the options with nothing set
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the maximum time to wait for a connection to be established
    pub fn set_connection_timeout(self, connection_timeout: std::time::Duration) -> Self {
        Self {
            connection_timeout: Some(connection_timeout),
            ..self
        }
    }

    /// Sets the maximum time to wait for a response from the server
    pub fn set_response_timeout(self, response_timeout: std::time::Duration) -> Self {
        Self {
            response_timeout: Some(response_timeout),
            ..self
        }
    }

    /// Sets the sender for push values.
    ///
    /// The sender can be a channel, or an arbitrary function that handles [crate::PushInfo] values.
    /// This will fail client creation if the connection isn't configured for RESP3 communications via the [crate::RedisConnectionInfo::protocol] field.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use redis::AsyncConnectionConfig;
    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    /// let config = AsyncConnectionConfig::new().set_push_sender(tx);
    /// ```
    ///
    /// ```rust
    /// # use std::sync::{Mutex, Arc};
    /// # use redis::AsyncConnectionConfig;
    /// let messages = Arc::new(Mutex::new(Vec::new()));
    /// let config = AsyncConnectionConfig::new().set_push_sender(move |msg|{
    ///     let Ok(mut messages) = messages.lock() else {
    ///         return Err(redis::aio::SendError);
    ///     };
    ///     messages.push(msg);
    ///     Ok(())
    /// });
    /// ```
    pub fn set_push_sender(self, sender: impl AsyncPushSender) -> Self {
        // Wrap the concrete sender into the shared trait object stored in the config.
        let shared: std::sync::Arc<dyn AsyncPushSender> = std::sync::Arc::new(sender);
        self.set_push_sender_internal(shared)
    }

    /// Stores an already shared push sender in the configuration.
    pub(crate) fn set_push_sender_internal(
        self,
        sender: std::sync::Arc<dyn AsyncPushSender>,
    ) -> Self {
        Self {
            push_sender: Some(sender),
            ..self
        }
    }

    /// Sets cache config for MultiplexedConnection, check CacheConfig for more details.
    #[cfg(feature = "cache-aio")]
    pub fn set_cache_config(self, cache_config: CacheConfig) -> Self {
        Self {
            cache: Some(Cache::Config(cache_config)),
            ..self
        }
    }

    /// Stores a cache manager in the configuration, replacing any previously set cache option.
    #[cfg(all(feature = "cache-aio", feature = "connection-manager"))]
    pub(crate) fn set_cache_manager(self, cache_manager: CacheManager) -> Self {
        Self {
            cache: Some(Cache::Manager(cache_manager)),
            ..self
        }
    }

    /// Set the behavior of the underlying TCP connection.
    pub fn set_tcp_settings(mut self, tcp_settings: crate::io::tcp::TcpSettings) -> Self {
        self.tcp_settings = tcp_settings;
        self
    }

    /// Set the DNS resolver for the underlying TCP connection.
    ///
    /// The parameter resolver must implement the [`crate::io::AsyncDNSResolver`] trait.
    pub fn set_dns_resolver(self, dns_resolver: impl AsyncDNSResolver) -> Self {
        // Wrap the concrete resolver into the shared trait object stored in the config.
        let shared: std::sync::Arc<dyn AsyncDNSResolver> = std::sync::Arc::new(dns_resolver);
        self.set_dns_resolver_internal(shared)
    }

    /// Stores an already shared DNS resolver in the configuration.
    pub(super) fn set_dns_resolver_internal(
        self,
        dns_resolver: std::sync::Arc<dyn AsyncDNSResolver>,
    ) -> Self {
        Self {
            dns_resolver: Some(dns_resolver),
            ..self
        }
    }
}
281
/// To enable async support you need to choose one of the supported runtimes and activate its
/// corresponding feature: `tokio-comp`, `async-std-comp` or `smol-comp`
284#[cfg(feature = "aio")]
285#[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
286impl Client {
287    /// Returns an async connection from the client.
288    #[cfg(feature = "aio")]
289    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
290    pub async fn get_multiplexed_async_connection(
291        &self,
292    ) -> RedisResult<crate::aio::MultiplexedConnection> {
293        self.get_multiplexed_async_connection_with_config(&AsyncConnectionConfig::new())
294            .await
295    }
296
297    /// Returns an async connection from the client.
298    #[cfg(feature = "aio")]
299    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
300    #[deprecated(note = "Use `get_multiplexed_async_connection_with_config` instead")]
301    pub async fn get_multiplexed_async_connection_with_timeouts(
302        &self,
303        response_timeout: std::time::Duration,
304        connection_timeout: std::time::Duration,
305    ) -> RedisResult<crate::aio::MultiplexedConnection> {
306        self.get_multiplexed_async_connection_with_config(
307            &AsyncConnectionConfig::new()
308                .set_connection_timeout(connection_timeout)
309                .set_response_timeout(response_timeout),
310        )
311        .await
312    }
313
314    /// Returns an async connection from the client.
315    #[cfg(feature = "aio")]
316    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
317    pub async fn get_multiplexed_async_connection_with_config(
318        &self,
319        config: &AsyncConnectionConfig,
320    ) -> RedisResult<crate::aio::MultiplexedConnection> {
321        match Runtime::locate() {
322            #[cfg(feature = "tokio-comp")]
323            rt @ Runtime::Tokio => self
324                .get_multiplexed_async_connection_inner_with_timeout::<crate::aio::tokio::Tokio>(
325                    config, rt,
326                )
327                .await,
328
329            #[cfg(feature = "async-std-comp")]
330            rt @ Runtime::AsyncStd => self.get_multiplexed_async_connection_inner_with_timeout::<
331                crate::aio::async_std::AsyncStd,
332            >(config, rt)
333            .await,
334
335            #[cfg(feature = "smol-comp")]
336            rt @ Runtime::Smol => self.get_multiplexed_async_connection_inner_with_timeout::<
337                crate::aio::smol::Smol,
338            >(config, rt)
339            .await,
340        }
341    }
342
343    /// Returns an async multiplexed connection from the client.
344    ///
345    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
346    /// on the same underlying connection (tcp/unix socket).
347    #[cfg(feature = "tokio-comp")]
348    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
349    pub async fn get_multiplexed_tokio_connection_with_response_timeouts(
350        &self,
351        response_timeout: std::time::Duration,
352        connection_timeout: std::time::Duration,
353    ) -> RedisResult<crate::aio::MultiplexedConnection> {
354        let result = Runtime::locate()
355            .timeout(
356                connection_timeout,
357                self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
358                    &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
359                ),
360            )
361            .await;
362
363        match result {
364            Ok(Ok(connection)) => Ok(connection),
365            Ok(Err(e)) => Err(e),
366            Err(elapsed) => Err(elapsed.into()),
367        }
368    }
369
370    /// Returns an async multiplexed connection from the client.
371    ///
372    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
373    /// on the same underlying connection (tcp/unix socket).
374    #[cfg(feature = "tokio-comp")]
375    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
376    pub async fn get_multiplexed_tokio_connection(
377        &self,
378    ) -> RedisResult<crate::aio::MultiplexedConnection> {
379        self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
380            &AsyncConnectionConfig::new(),
381        )
382        .await
383    }
384
385    /// Returns an async multiplexed connection from the client.
386    ///
387    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
388    /// on the same underlying connection (tcp/unix socket).
389    #[cfg(feature = "async-std-comp")]
390    #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
391    pub async fn get_multiplexed_async_std_connection_with_timeouts(
392        &self,
393        response_timeout: std::time::Duration,
394        connection_timeout: std::time::Duration,
395    ) -> RedisResult<crate::aio::MultiplexedConnection> {
396        let result = Runtime::locate()
397            .timeout(
398                connection_timeout,
399                self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
400                    &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
401                ),
402            )
403            .await;
404
405        match result {
406            Ok(Ok(connection)) => Ok(connection),
407            Ok(Err(e)) => Err(e),
408            Err(elapsed) => Err(elapsed.into()),
409        }
410    }
411
412    /// Returns an async multiplexed connection from the client.
413    ///
414    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
415    /// on the same underlying connection (tcp/unix socket).
416    #[cfg(feature = "async-std-comp")]
417    #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
418    pub async fn get_multiplexed_async_std_connection(
419        &self,
420    ) -> RedisResult<crate::aio::MultiplexedConnection> {
421        self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
422            &AsyncConnectionConfig::new(),
423        )
424        .await
425    }
426
427    /// Returns an async multiplexed connection from the client and a future which must be polled
428    /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
429    ///
430    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
431    /// on the same underlying connection (tcp/unix socket).
432    /// The multiplexer will return a timeout error on any request that takes longer then `response_timeout`.
433    #[cfg(feature = "tokio-comp")]
434    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
435    pub async fn create_multiplexed_tokio_connection_with_response_timeout(
436        &self,
437        response_timeout: std::time::Duration,
438    ) -> RedisResult<(
439        crate::aio::MultiplexedConnection,
440        impl std::future::Future<Output = ()>,
441    )> {
442        self.create_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
443            &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
444        )
445        .await
446    }
447
448    /// Returns an async multiplexed connection from the client and a future which must be polled
449    /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
450    ///
451    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
452    /// on the same underlying connection (tcp/unix socket).
453    #[cfg(feature = "tokio-comp")]
454    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
455    pub async fn create_multiplexed_tokio_connection(
456        &self,
457    ) -> RedisResult<(
458        crate::aio::MultiplexedConnection,
459        impl std::future::Future<Output = ()>,
460    )> {
461        self.create_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
462            &AsyncConnectionConfig::new(),
463        )
464        .await
465    }
466
467    /// Returns an async multiplexed connection from the client and a future which must be polled
468    /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
469    ///
470    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
471    /// on the same underlying connection (tcp/unix socket).
472    /// The multiplexer will return a timeout error on any request that takes longer then `response_timeout`.
473    #[cfg(feature = "async-std-comp")]
474    #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
475    pub async fn create_multiplexed_async_std_connection_with_response_timeout(
476        &self,
477        response_timeout: std::time::Duration,
478    ) -> RedisResult<(
479        crate::aio::MultiplexedConnection,
480        impl std::future::Future<Output = ()>,
481    )> {
482        self.create_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
483            &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
484        )
485        .await
486    }
487
488    /// Returns an async multiplexed connection from the client and a future which must be polled
489    /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
490    ///
491    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
492    /// on the same underlying connection (tcp/unix socket).
493    #[cfg(feature = "async-std-comp")]
494    #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
495    pub async fn create_multiplexed_async_std_connection(
496        &self,
497    ) -> RedisResult<(
498        crate::aio::MultiplexedConnection,
499        impl std::future::Future<Output = ()>,
500    )> {
501        self.create_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
502            &AsyncConnectionConfig::new(),
503        )
504        .await
505    }
506
507    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
508    ///
509    /// The connection manager wraps a
510    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
511    /// connection fails with a connection error, then a new connection is
512    /// established in the background and the error is returned to the caller.
513    ///
514    /// This means that on connection loss at least one command will fail, but
515    /// the connection will be re-established automatically if possible. Please
516    /// refer to the [`ConnectionManager`][connection-manager] docs for
517    /// detailed reconnecting behavior.
518    ///
519    /// A connection manager can be cloned, allowing requests to be sent concurrently
520    /// on the same underlying connection (tcp/unix socket).
521    ///
522    /// [connection-manager]: aio/struct.ConnectionManager.html
523    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
524    #[cfg(feature = "connection-manager")]
525    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
526    #[deprecated(note = "use get_connection_manager instead")]
527    pub async fn get_tokio_connection_manager(&self) -> RedisResult<crate::aio::ConnectionManager> {
528        crate::aio::ConnectionManager::new(self.clone()).await
529    }
530
531    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
532    ///
533    /// The connection manager wraps a
534    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
535    /// connection fails with a connection error, then a new connection is
536    /// established in the background and the error is returned to the caller.
537    ///
538    /// This means that on connection loss at least one command will fail, but
539    /// the connection will be re-established automatically if possible. Please
540    /// refer to the [`ConnectionManager`][connection-manager] docs for
541    /// detailed reconnecting behavior.
542    ///
543    /// A connection manager can be cloned, allowing requests to be sent concurrently
544    /// on the same underlying connection (tcp/unix socket).
545    ///
546    /// [connection-manager]: aio/struct.ConnectionManager.html
547    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
548    #[cfg(feature = "connection-manager")]
549    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
550    pub async fn get_connection_manager(&self) -> RedisResult<crate::aio::ConnectionManager> {
551        crate::aio::ConnectionManager::new(self.clone()).await
552    }
553
554    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
555    ///
556    /// The connection manager wraps a
557    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
558    /// connection fails with a connection error, then a new connection is
559    /// established in the background and the error is returned to the caller.
560    ///
561    /// This means that on connection loss at least one command will fail, but
562    /// the connection will be re-established automatically if possible. Please
563    /// refer to the [`ConnectionManager`][connection-manager] docs for
564    /// detailed reconnecting behavior.
565    ///
566    /// A connection manager can be cloned, allowing requests to be sent concurrently
567    /// on the same underlying connection (tcp/unix socket).
568    ///
569    /// [connection-manager]: aio/struct.ConnectionManager.html
570    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
571    #[cfg(feature = "connection-manager")]
572    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
573    #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
574    pub async fn get_tokio_connection_manager_with_backoff(
575        &self,
576        exponent_base: u64,
577        factor: u64,
578        number_of_retries: usize,
579    ) -> RedisResult<crate::aio::ConnectionManager> {
580        use crate::aio::ConnectionManagerConfig;
581
582        let config = ConnectionManagerConfig::new()
583            .set_exponent_base(exponent_base)
584            .set_factor(factor)
585            .set_number_of_retries(number_of_retries);
586        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
587    }
588
589    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
590    ///
591    /// The connection manager wraps a
592    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
593    /// connection fails with a connection error, then a new connection is
594    /// established in the background and the error is returned to the caller.
595    ///
596    /// This means that on connection loss at least one command will fail, but
597    /// the connection will be re-established automatically if possible. Please
598    /// refer to the [`ConnectionManager`][connection-manager] docs for
599    /// detailed reconnecting behavior.
600    ///
601    /// A connection manager can be cloned, allowing requests to be sent concurrently
602    /// on the same underlying connection (tcp/unix socket).
603    ///
604    /// [connection-manager]: aio/struct.ConnectionManager.html
605    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
606    #[cfg(feature = "connection-manager")]
607    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
608    #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
609    pub async fn get_tokio_connection_manager_with_backoff_and_timeouts(
610        &self,
611        exponent_base: u64,
612        factor: u64,
613        number_of_retries: usize,
614        response_timeout: std::time::Duration,
615        connection_timeout: std::time::Duration,
616    ) -> RedisResult<crate::aio::ConnectionManager> {
617        use crate::aio::ConnectionManagerConfig;
618
619        let config = ConnectionManagerConfig::new()
620            .set_exponent_base(exponent_base)
621            .set_factor(factor)
622            .set_response_timeout(response_timeout)
623            .set_connection_timeout(connection_timeout)
624            .set_number_of_retries(number_of_retries);
625        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
626    }
627
628    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
629    ///
630    /// The connection manager wraps a
631    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
632    /// connection fails with a connection error, then a new connection is
633    /// established in the background and the error is returned to the caller.
634    ///
635    /// This means that on connection loss at least one command will fail, but
636    /// the connection will be re-established automatically if possible. Please
637    /// refer to the [`ConnectionManager`][connection-manager] docs for
638    /// detailed reconnecting behavior.
639    ///
640    /// A connection manager can be cloned, allowing requests to be sent concurrently
641    /// on the same underlying connection (tcp/unix socket).
642    ///
643    /// [connection-manager]: aio/struct.ConnectionManager.html
644    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
645    #[cfg(feature = "connection-manager")]
646    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
647    #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
648    pub async fn get_connection_manager_with_backoff_and_timeouts(
649        &self,
650        exponent_base: u64,
651        factor: u64,
652        number_of_retries: usize,
653        response_timeout: std::time::Duration,
654        connection_timeout: std::time::Duration,
655    ) -> RedisResult<crate::aio::ConnectionManager> {
656        use crate::aio::ConnectionManagerConfig;
657
658        let config = ConnectionManagerConfig::new()
659            .set_exponent_base(exponent_base)
660            .set_factor(factor)
661            .set_response_timeout(response_timeout)
662            .set_connection_timeout(connection_timeout)
663            .set_number_of_retries(number_of_retries);
664        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
665    }
666
667    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
668    ///
669    /// The connection manager wraps a
670    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
671    /// connection fails with a connection error, then a new connection is
672    /// established in the background and the error is returned to the caller.
673    ///
674    /// This means that on connection loss at least one command will fail, but
675    /// the connection will be re-established automatically if possible. Please
676    /// refer to the [`ConnectionManager`][connection-manager] docs for
677    /// detailed reconnecting behavior.
678    ///
679    /// A connection manager can be cloned, allowing requests to be sent concurrently
680    /// on the same underlying connection (tcp/unix socket).
681    ///
682    /// [connection-manager]: aio/struct.ConnectionManager.html
683    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
684    #[cfg(feature = "connection-manager")]
685    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
686    pub async fn get_connection_manager_with_config(
687        &self,
688        config: crate::aio::ConnectionManagerConfig,
689    ) -> RedisResult<crate::aio::ConnectionManager> {
690        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
691    }
692
693    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
694    ///
695    /// The connection manager wraps a
696    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
697    /// connection fails with a connection error, then a new connection is
698    /// established in the background and the error is returned to the caller.
699    ///
700    /// This means that on connection loss at least one command will fail, but
701    /// the connection will be re-established automatically if possible. Please
702    /// refer to the [`ConnectionManager`][connection-manager] docs for
703    /// detailed reconnecting behavior.
704    ///
705    /// A connection manager can be cloned, allowing requests to be be sent concurrently
706    /// on the same underlying connection (tcp/unix socket).
707    ///
708    /// [connection-manager]: aio/struct.ConnectionManager.html
709    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
710    #[cfg(feature = "connection-manager")]
711    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
712    #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
713    pub async fn get_connection_manager_with_backoff(
714        &self,
715        exponent_base: u64,
716        factor: u64,
717        number_of_retries: usize,
718    ) -> RedisResult<crate::aio::ConnectionManager> {
719        use crate::aio::ConnectionManagerConfig;
720
721        let config = ConnectionManagerConfig::new()
722            .set_exponent_base(exponent_base)
723            .set_factor(factor)
724            .set_number_of_retries(number_of_retries);
725        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
726    }
727
728    async fn get_multiplexed_async_connection_inner_with_timeout<T>(
729        &self,
730        config: &AsyncConnectionConfig,
731        rt: Runtime,
732    ) -> RedisResult<crate::aio::MultiplexedConnection>
733    where
734        T: crate::aio::RedisRuntime,
735    {
736        let result = if let Some(connection_timeout) = config.connection_timeout {
737            rt.timeout(
738                connection_timeout,
739                self.get_multiplexed_async_connection_inner::<T>(config),
740            )
741            .await
742        } else {
743            Ok(self
744                .get_multiplexed_async_connection_inner::<T>(config)
745                .await)
746        };
747
748        match result {
749            Ok(Ok(connection)) => Ok(connection),
750            Ok(Err(e)) => Err(e),
751            Err(elapsed) => Err(elapsed.into()),
752        }
753    }
754
755    async fn get_multiplexed_async_connection_inner<T>(
756        &self,
757        config: &AsyncConnectionConfig,
758    ) -> RedisResult<crate::aio::MultiplexedConnection>
759    where
760        T: crate::aio::RedisRuntime,
761    {
762        let (mut connection, driver) = self
763            .create_multiplexed_async_connection_inner::<T>(config)
764            .await?;
765        let handle = T::spawn(driver);
766        connection.set_task_handle(handle);
767        Ok(connection)
768    }
769
770    async fn create_multiplexed_async_connection_inner<T>(
771        &self,
772        config: &AsyncConnectionConfig,
773    ) -> RedisResult<(
774        crate::aio::MultiplexedConnection,
775        impl std::future::Future<Output = ()>,
776    )>
777    where
778        T: crate::aio::RedisRuntime,
779    {
780        let resolver = config
781            .dns_resolver
782            .as_deref()
783            .unwrap_or(&DefaultAsyncDNSResolver);
784        let con = self
785            .get_simple_async_connection::<T>(resolver, &config.tcp_settings)
786            .await?;
787        crate::aio::MultiplexedConnection::new_with_config(
788            &self.connection_info.redis,
789            con,
790            config.clone(),
791        )
792        .await
793    }
794
795    async fn get_simple_async_connection_dynamically(
796        &self,
797        dns_resolver: &dyn AsyncDNSResolver,
798        tcp_settings: &TcpSettings,
799    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>> {
800        match Runtime::locate() {
801            #[cfg(feature = "tokio-comp")]
802            Runtime::Tokio => {
803                self.get_simple_async_connection::<crate::aio::tokio::Tokio>(
804                    dns_resolver,
805                    tcp_settings,
806                )
807                .await
808            }
809
810            #[cfg(feature = "async-std-comp")]
811            Runtime::AsyncStd => {
812                self.get_simple_async_connection::<crate::aio::async_std::AsyncStd>(
813                    dns_resolver,
814                    tcp_settings,
815                )
816                .await
817            }
818
819            #[cfg(feature = "smol-comp")]
820            Runtime::Smol => {
821                self.get_simple_async_connection::<crate::aio::smol::Smol>(
822                    dns_resolver,
823                    tcp_settings,
824                )
825                .await
826            }
827        }
828    }
829
830    async fn get_simple_async_connection<T>(
831        &self,
832        dns_resolver: &dyn AsyncDNSResolver,
833        tcp_settings: &TcpSettings,
834    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>>
835    where
836        T: crate::aio::RedisRuntime,
837    {
838        Ok(
839            crate::aio::connect_simple::<T>(&self.connection_info, dns_resolver, tcp_settings)
840                .await?
841                .boxed(),
842        )
843    }
844
845    #[cfg(feature = "connection-manager")]
846    pub(crate) fn connection_info(&self) -> &ConnectionInfo {
847        &self.connection_info
848    }
849
850    /// Returns an async receiver for pub-sub messages.
851    #[cfg(feature = "aio")]
852    // TODO - do we want to type-erase pubsub using a trait, to allow us to replace it with a different implementation later?
853    pub async fn get_async_pubsub(&self) -> RedisResult<crate::aio::PubSub> {
854        let connection = self
855            .get_simple_async_connection_dynamically(
856                &DefaultAsyncDNSResolver,
857                &TcpSettings::default(),
858            )
859            .await?;
860
861        crate::aio::PubSub::new(&self.connection_info.redis, connection).await
862    }
863
864    /// Returns an async receiver for monitor messages.
865    #[cfg(feature = "aio")]
866    pub async fn get_async_monitor(&self) -> RedisResult<crate::aio::Monitor> {
867        let connection = self
868            .get_simple_async_connection_dynamically(
869                &DefaultAsyncDNSResolver,
870                &TcpSettings::default(),
871            )
872            .await?;
873        crate::aio::Monitor::new(&self.connection_info.redis, connection).await
874    }
875}
876
877#[cfg(feature = "aio")]
878use crate::aio::Runtime;
879
880impl ConnectionLike for Client {
881    fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
882        self.get_connection()?.req_packed_command(cmd)
883    }
884
885    fn req_packed_commands(
886        &mut self,
887        cmd: &[u8],
888        offset: usize,
889        count: usize,
890    ) -> RedisResult<Vec<Value>> {
891        self.get_connection()?
892            .req_packed_commands(cmd, offset, count)
893    }
894
895    fn get_db(&self) -> i64 {
896        self.connection_info.redis.db
897    }
898
899    fn check_connection(&mut self) -> bool {
900        if let Ok(mut conn) = self.get_connection() {
901            conn.check_connection()
902        } else {
903            false
904        }
905    }
906
907    fn is_open(&self) -> bool {
908        if let Ok(conn) = self.get_connection() {
909            conn.is_open()
910        } else {
911            false
912        }
913    }
914}
915
#[cfg(test)]
mod test {
    use super::*;

    /// An IPv6 address carrying an interface suffix (`%eno1`) must be accepted
    /// by `Client::open`.
    #[test]
    fn regression_293_parse_ipv6_with_interface() {
        let address = ("fe80::cafe:beef%eno1", 6379);
        let opened = Client::open(address);
        assert!(opened.is_ok());
    }
}