redis/client.rs
1use std::time::Duration;
2
3#[cfg(feature = "aio")]
4use crate::aio::{AsyncPushSender, DefaultAsyncDNSResolver};
5#[cfg(feature = "aio")]
6use crate::io::{tcp::TcpSettings, AsyncDNSResolver};
7use crate::{
8 connection::{connect, Connection, ConnectionInfo, ConnectionLike, IntoConnectionInfo},
9 types::{RedisResult, Value},
10};
11#[cfg(feature = "aio")]
12use std::pin::Pin;
13
14#[cfg(feature = "tls-rustls")]
15use crate::tls::{inner_build_with_tls, TlsCertificates};
16
17#[cfg(feature = "cache-aio")]
18use crate::caching::CacheConfig;
19#[cfg(all(feature = "cache-aio", feature = "connection-manager"))]
20use crate::caching::CacheManager;
21
22/// The client type.
23#[derive(Debug, Clone)]
24pub struct Client {
25 pub(crate) connection_info: ConnectionInfo,
26}
27
28/// The client acts as connector to the redis server. By itself it does not
29/// do much other than providing a convenient way to fetch a connection from
30/// it. In the future the plan is to provide a connection pool in the client.
31///
32/// When opening a client a URL in the following format should be used:
33///
34/// ```plain
35/// redis://host:port/db
36/// ```
37///
38/// Example usage::
39///
40/// ```rust,no_run
41/// let client = redis::Client::open("redis://127.0.0.1/").unwrap();
42/// let con = client.get_connection().unwrap();
43/// ```
44impl Client {
45 /// Connects to a redis server and returns a client. This does not
46 /// actually open a connection yet but it does perform some basic
47 /// checks on the URL that might make the operation fail.
48 pub fn open<T: IntoConnectionInfo>(params: T) -> RedisResult<Client> {
49 Ok(Client {
50 connection_info: params.into_connection_info()?,
51 })
52 }
53
54 /// Instructs the client to actually connect to redis and returns a
55 /// connection object. The connection object can be used to send
56 /// commands to the server. This can fail with a variety of errors
57 /// (like unreachable host) so it's important that you handle those
58 /// errors.
59 pub fn get_connection(&self) -> RedisResult<Connection> {
60 connect(&self.connection_info, None)
61 }
62
63 /// Instructs the client to actually connect to redis with specified
64 /// timeout and returns a connection object. The connection object
65 /// can be used to send commands to the server. This can fail with
66 /// a variety of errors (like unreachable host) so it's important
67 /// that you handle those errors.
68 pub fn get_connection_with_timeout(&self, timeout: Duration) -> RedisResult<Connection> {
69 connect(&self.connection_info, Some(timeout))
70 }
71
72 /// Returns a reference of client connection info object.
73 pub fn get_connection_info(&self) -> &ConnectionInfo {
74 &self.connection_info
75 }
76
77 /// Constructs a new `Client` with parameters necessary to create a TLS connection.
78 ///
79 /// - `conn_info` - URL using the `rediss://` scheme.
80 /// - `tls_certs` - `TlsCertificates` structure containing:
81 /// - `client_tls` - Optional `ClientTlsConfig` containing byte streams for
82 /// - `client_cert` - client's byte stream containing client certificate in PEM format
83 /// - `client_key` - client's byte stream containing private key in PEM format
84 /// - `root_cert` - Optional byte stream yielding PEM formatted file for root certificates.
85 ///
86 /// If `ClientTlsConfig` ( cert+key pair ) is not provided, then client-side authentication is not enabled.
87 /// If `root_cert` is not provided, then system root certificates are used instead.
88 ///
89 /// # Examples
90 ///
91 /// ```no_run
92 /// use std::{fs::File, io::{BufReader, Read}};
93 ///
94 /// use redis::{Client, AsyncCommands as _, TlsCertificates, ClientTlsConfig};
95 ///
96 /// async fn do_redis_code(
97 /// url: &str,
98 /// root_cert_file: &str,
99 /// cert_file: &str,
100 /// key_file: &str
101 /// ) -> redis::RedisResult<()> {
102 /// let root_cert_file = File::open(root_cert_file).expect("cannot open private cert file");
103 /// let mut root_cert_vec = Vec::new();
104 /// BufReader::new(root_cert_file)
105 /// .read_to_end(&mut root_cert_vec)
106 /// .expect("Unable to read ROOT cert file");
107 ///
108 /// let cert_file = File::open(cert_file).expect("cannot open private cert file");
109 /// let mut client_cert_vec = Vec::new();
110 /// BufReader::new(cert_file)
111 /// .read_to_end(&mut client_cert_vec)
112 /// .expect("Unable to read client cert file");
113 ///
114 /// let key_file = File::open(key_file).expect("cannot open private key file");
115 /// let mut client_key_vec = Vec::new();
116 /// BufReader::new(key_file)
117 /// .read_to_end(&mut client_key_vec)
118 /// .expect("Unable to read client key file");
119 ///
120 /// let client = Client::build_with_tls(
121 /// url,
122 /// TlsCertificates {
123 /// client_tls: Some(ClientTlsConfig{
124 /// client_cert: client_cert_vec,
125 /// client_key: client_key_vec,
126 /// }),
127 /// root_cert: Some(root_cert_vec),
128 /// }
129 /// )
130 /// .expect("Unable to build client");
131 ///
132 /// let connection_info = client.get_connection_info();
133 ///
134 /// println!(">>> connection info: {connection_info:?}");
135 ///
136 /// let mut con = client.get_multiplexed_async_connection().await?;
137 ///
138 /// con.set("key1", b"foo").await?;
139 ///
140 /// redis::cmd("SET")
141 /// .arg(&["key2", "bar"])
142 /// .exec_async(&mut con)
143 /// .await?;
144 ///
145 /// let result = redis::cmd("MGET")
146 /// .arg(&["key1", "key2"])
147 /// .query_async(&mut con)
148 /// .await;
149 /// assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
150 /// println!("Result from MGET: {result:?}");
151 ///
152 /// Ok(())
153 /// }
154 /// ```
155 #[cfg(feature = "tls-rustls")]
156 pub fn build_with_tls<C: IntoConnectionInfo>(
157 conn_info: C,
158 tls_certs: TlsCertificates,
159 ) -> RedisResult<Client> {
160 let connection_info = conn_info.into_connection_info()?;
161
162 inner_build_with_tls(connection_info, &tls_certs)
163 }
164}
165
166#[cfg(feature = "cache-aio")]
167#[derive(Clone)]
168pub(crate) enum Cache {
169 Config(CacheConfig),
170 #[cfg(feature = "connection-manager")]
171 Manager(CacheManager),
172}
173
174/// Options for creation of async connection
175#[cfg(feature = "aio")]
176#[derive(Clone, Default)]
177pub struct AsyncConnectionConfig {
178 /// Maximum time to wait for a response from the server
179 pub(crate) response_timeout: Option<std::time::Duration>,
180 /// Maximum time to wait for a connection to be established
181 pub(crate) connection_timeout: Option<std::time::Duration>,
182 pub(crate) push_sender: Option<std::sync::Arc<dyn AsyncPushSender>>,
183 #[cfg(feature = "cache-aio")]
184 pub(crate) cache: Option<Cache>,
185 pub(crate) tcp_settings: TcpSettings,
186 pub(crate) dns_resolver: Option<std::sync::Arc<dyn AsyncDNSResolver>>,
187}
188
189#[cfg(feature = "aio")]
190impl AsyncConnectionConfig {
191 /// Creates a new instance of the options with nothing set
192 pub fn new() -> Self {
193 Self::default()
194 }
195
196 /// Sets the connection timeout
197 pub fn set_connection_timeout(mut self, connection_timeout: std::time::Duration) -> Self {
198 self.connection_timeout = Some(connection_timeout);
199 self
200 }
201
202 /// Sets the response timeout
203 pub fn set_response_timeout(mut self, response_timeout: std::time::Duration) -> Self {
204 self.response_timeout = Some(response_timeout);
205 self
206 }
207
208 /// Sets sender sender for push values.
209 ///
210 /// The sender can be a channel, or an arbitrary function that handles [crate::PushInfo] values.
211 /// This will fail client creation if the connection isn't configured for RESP3 communications via the [crate::RedisConnectionInfo::protocol] field.
212 ///
213 /// # Examples
214 ///
215 /// ```rust
216 /// # use redis::AsyncConnectionConfig;
217 /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
218 /// let config = AsyncConnectionConfig::new().set_push_sender(tx);
219 /// ```
220 ///
221 /// ```rust
222 /// # use std::sync::{Mutex, Arc};
223 /// # use redis::AsyncConnectionConfig;
224 /// let messages = Arc::new(Mutex::new(Vec::new()));
225 /// let config = AsyncConnectionConfig::new().set_push_sender(move |msg|{
226 /// let Ok(mut messages) = messages.lock() else {
227 /// return Err(redis::aio::SendError);
228 /// };
229 /// messages.push(msg);
230 /// Ok(())
231 /// });
232 /// ```
233 pub fn set_push_sender(self, sender: impl AsyncPushSender) -> Self {
234 self.set_push_sender_internal(std::sync::Arc::new(sender))
235 }
236
237 pub(crate) fn set_push_sender_internal(
238 mut self,
239 sender: std::sync::Arc<dyn AsyncPushSender>,
240 ) -> Self {
241 self.push_sender = Some(sender);
242 self
243 }
244
245 /// Sets cache config for MultiplexedConnection, check CacheConfig for more details.
246 #[cfg(feature = "cache-aio")]
247 pub fn set_cache_config(mut self, cache_config: CacheConfig) -> Self {
248 self.cache = Some(Cache::Config(cache_config));
249 self
250 }
251
252 #[cfg(all(feature = "cache-aio", feature = "connection-manager"))]
253 pub(crate) fn set_cache_manager(mut self, cache_manager: CacheManager) -> Self {
254 self.cache = Some(Cache::Manager(cache_manager));
255 self
256 }
257
258 /// Set the behavior of the underlying TCP connection.
259 pub fn set_tcp_settings(self, tcp_settings: crate::io::tcp::TcpSettings) -> Self {
260 Self {
261 tcp_settings,
262 ..self
263 }
264 }
265
266 /// Set the DNS resolver for the underlying TCP connection.
267 ///
268 /// The parameter resolver must implement the [`crate::io::AsyncDNSResolver`] trait.
269 pub fn set_dns_resolver(self, dns_resolver: impl AsyncDNSResolver) -> Self {
270 self.set_dns_resolver_internal(std::sync::Arc::new(dns_resolver))
271 }
272
273 pub(super) fn set_dns_resolver_internal(
274 mut self,
275 dns_resolver: std::sync::Arc<dyn AsyncDNSResolver>,
276 ) -> Self {
277 self.dns_resolver = Some(dns_resolver);
278 self
279 }
280}
281
282/// To enable async support you need to chose one of the supported runtimes and active its
283/// corresponding feature: `tokio-comp` or `async-std-comp`
284#[cfg(feature = "aio")]
285#[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
286impl Client {
287 /// Returns an async connection from the client.
288 #[cfg(feature = "aio")]
289 #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
290 pub async fn get_multiplexed_async_connection(
291 &self,
292 ) -> RedisResult<crate::aio::MultiplexedConnection> {
293 self.get_multiplexed_async_connection_with_config(&AsyncConnectionConfig::new())
294 .await
295 }
296
297 /// Returns an async connection from the client.
298 #[cfg(feature = "aio")]
299 #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
300 #[deprecated(note = "Use `get_multiplexed_async_connection_with_config` instead")]
301 pub async fn get_multiplexed_async_connection_with_timeouts(
302 &self,
303 response_timeout: std::time::Duration,
304 connection_timeout: std::time::Duration,
305 ) -> RedisResult<crate::aio::MultiplexedConnection> {
306 self.get_multiplexed_async_connection_with_config(
307 &AsyncConnectionConfig::new()
308 .set_connection_timeout(connection_timeout)
309 .set_response_timeout(response_timeout),
310 )
311 .await
312 }
313
314 /// Returns an async connection from the client.
315 #[cfg(feature = "aio")]
316 #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
317 pub async fn get_multiplexed_async_connection_with_config(
318 &self,
319 config: &AsyncConnectionConfig,
320 ) -> RedisResult<crate::aio::MultiplexedConnection> {
321 match Runtime::locate() {
322 #[cfg(feature = "tokio-comp")]
323 rt @ Runtime::Tokio => self
324 .get_multiplexed_async_connection_inner_with_timeout::<crate::aio::tokio::Tokio>(
325 config, rt,
326 )
327 .await,
328
329 #[cfg(feature = "async-std-comp")]
330 rt @ Runtime::AsyncStd => self.get_multiplexed_async_connection_inner_with_timeout::<
331 crate::aio::async_std::AsyncStd,
332 >(config, rt)
333 .await,
334
335 #[cfg(feature = "smol-comp")]
336 rt @ Runtime::Smol => self.get_multiplexed_async_connection_inner_with_timeout::<
337 crate::aio::smol::Smol,
338 >(config, rt)
339 .await,
340 }
341 }
342
343 /// Returns an async multiplexed connection from the client.
344 ///
345 /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
346 /// on the same underlying connection (tcp/unix socket).
347 #[cfg(feature = "tokio-comp")]
348 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
349 pub async fn get_multiplexed_tokio_connection_with_response_timeouts(
350 &self,
351 response_timeout: std::time::Duration,
352 connection_timeout: std::time::Duration,
353 ) -> RedisResult<crate::aio::MultiplexedConnection> {
354 let result = Runtime::locate()
355 .timeout(
356 connection_timeout,
357 self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
358 &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
359 ),
360 )
361 .await;
362
363 match result {
364 Ok(Ok(connection)) => Ok(connection),
365 Ok(Err(e)) => Err(e),
366 Err(elapsed) => Err(elapsed.into()),
367 }
368 }
369
370 /// Returns an async multiplexed connection from the client.
371 ///
372 /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
373 /// on the same underlying connection (tcp/unix socket).
374 #[cfg(feature = "tokio-comp")]
375 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
376 pub async fn get_multiplexed_tokio_connection(
377 &self,
378 ) -> RedisResult<crate::aio::MultiplexedConnection> {
379 self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
380 &AsyncConnectionConfig::new(),
381 )
382 .await
383 }
384
385 /// Returns an async multiplexed connection from the client.
386 ///
387 /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
388 /// on the same underlying connection (tcp/unix socket).
389 #[cfg(feature = "async-std-comp")]
390 #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
391 pub async fn get_multiplexed_async_std_connection_with_timeouts(
392 &self,
393 response_timeout: std::time::Duration,
394 connection_timeout: std::time::Duration,
395 ) -> RedisResult<crate::aio::MultiplexedConnection> {
396 let result = Runtime::locate()
397 .timeout(
398 connection_timeout,
399 self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
400 &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
401 ),
402 )
403 .await;
404
405 match result {
406 Ok(Ok(connection)) => Ok(connection),
407 Ok(Err(e)) => Err(e),
408 Err(elapsed) => Err(elapsed.into()),
409 }
410 }
411
412 /// Returns an async multiplexed connection from the client.
413 ///
414 /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
415 /// on the same underlying connection (tcp/unix socket).
416 #[cfg(feature = "async-std-comp")]
417 #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
418 pub async fn get_multiplexed_async_std_connection(
419 &self,
420 ) -> RedisResult<crate::aio::MultiplexedConnection> {
421 self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
422 &AsyncConnectionConfig::new(),
423 )
424 .await
425 }
426
427 /// Returns an async multiplexed connection from the client and a future which must be polled
428 /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
429 ///
430 /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
431 /// on the same underlying connection (tcp/unix socket).
432 /// The multiplexer will return a timeout error on any request that takes longer then `response_timeout`.
433 #[cfg(feature = "tokio-comp")]
434 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
435 pub async fn create_multiplexed_tokio_connection_with_response_timeout(
436 &self,
437 response_timeout: std::time::Duration,
438 ) -> RedisResult<(
439 crate::aio::MultiplexedConnection,
440 impl std::future::Future<Output = ()>,
441 )> {
442 self.create_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
443 &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
444 )
445 .await
446 }
447
448 /// Returns an async multiplexed connection from the client and a future which must be polled
449 /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
450 ///
451 /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
452 /// on the same underlying connection (tcp/unix socket).
453 #[cfg(feature = "tokio-comp")]
454 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
455 pub async fn create_multiplexed_tokio_connection(
456 &self,
457 ) -> RedisResult<(
458 crate::aio::MultiplexedConnection,
459 impl std::future::Future<Output = ()>,
460 )> {
461 self.create_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
462 &AsyncConnectionConfig::new(),
463 )
464 .await
465 }
466
467 /// Returns an async multiplexed connection from the client and a future which must be polled
468 /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
469 ///
470 /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
471 /// on the same underlying connection (tcp/unix socket).
472 /// The multiplexer will return a timeout error on any request that takes longer then `response_timeout`.
473 #[cfg(feature = "async-std-comp")]
474 #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
475 pub async fn create_multiplexed_async_std_connection_with_response_timeout(
476 &self,
477 response_timeout: std::time::Duration,
478 ) -> RedisResult<(
479 crate::aio::MultiplexedConnection,
480 impl std::future::Future<Output = ()>,
481 )> {
482 self.create_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
483 &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
484 )
485 .await
486 }
487
488 /// Returns an async multiplexed connection from the client and a future which must be polled
489 /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
490 ///
491 /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
492 /// on the same underlying connection (tcp/unix socket).
493 #[cfg(feature = "async-std-comp")]
494 #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
495 pub async fn create_multiplexed_async_std_connection(
496 &self,
497 ) -> RedisResult<(
498 crate::aio::MultiplexedConnection,
499 impl std::future::Future<Output = ()>,
500 )> {
501 self.create_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
502 &AsyncConnectionConfig::new(),
503 )
504 .await
505 }
506
507 /// Returns an async [`ConnectionManager`][connection-manager] from the client.
508 ///
509 /// The connection manager wraps a
510 /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
511 /// connection fails with a connection error, then a new connection is
512 /// established in the background and the error is returned to the caller.
513 ///
514 /// This means that on connection loss at least one command will fail, but
515 /// the connection will be re-established automatically if possible. Please
516 /// refer to the [`ConnectionManager`][connection-manager] docs for
517 /// detailed reconnecting behavior.
518 ///
519 /// A connection manager can be cloned, allowing requests to be sent concurrently
520 /// on the same underlying connection (tcp/unix socket).
521 ///
522 /// [connection-manager]: aio/struct.ConnectionManager.html
523 /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
524 #[cfg(feature = "connection-manager")]
525 #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
526 #[deprecated(note = "use get_connection_manager instead")]
527 pub async fn get_tokio_connection_manager(&self) -> RedisResult<crate::aio::ConnectionManager> {
528 crate::aio::ConnectionManager::new(self.clone()).await
529 }
530
531 /// Returns an async [`ConnectionManager`][connection-manager] from the client.
532 ///
533 /// The connection manager wraps a
534 /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
535 /// connection fails with a connection error, then a new connection is
536 /// established in the background and the error is returned to the caller.
537 ///
538 /// This means that on connection loss at least one command will fail, but
539 /// the connection will be re-established automatically if possible. Please
540 /// refer to the [`ConnectionManager`][connection-manager] docs for
541 /// detailed reconnecting behavior.
542 ///
543 /// A connection manager can be cloned, allowing requests to be sent concurrently
544 /// on the same underlying connection (tcp/unix socket).
545 ///
546 /// [connection-manager]: aio/struct.ConnectionManager.html
547 /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
548 #[cfg(feature = "connection-manager")]
549 #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
550 pub async fn get_connection_manager(&self) -> RedisResult<crate::aio::ConnectionManager> {
551 crate::aio::ConnectionManager::new(self.clone()).await
552 }
553
554 /// Returns an async [`ConnectionManager`][connection-manager] from the client.
555 ///
556 /// The connection manager wraps a
557 /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
558 /// connection fails with a connection error, then a new connection is
559 /// established in the background and the error is returned to the caller.
560 ///
561 /// This means that on connection loss at least one command will fail, but
562 /// the connection will be re-established automatically if possible. Please
563 /// refer to the [`ConnectionManager`][connection-manager] docs for
564 /// detailed reconnecting behavior.
565 ///
566 /// A connection manager can be cloned, allowing requests to be sent concurrently
567 /// on the same underlying connection (tcp/unix socket).
568 ///
569 /// [connection-manager]: aio/struct.ConnectionManager.html
570 /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
571 #[cfg(feature = "connection-manager")]
572 #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
573 #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
574 pub async fn get_tokio_connection_manager_with_backoff(
575 &self,
576 exponent_base: u64,
577 factor: u64,
578 number_of_retries: usize,
579 ) -> RedisResult<crate::aio::ConnectionManager> {
580 use crate::aio::ConnectionManagerConfig;
581
582 let config = ConnectionManagerConfig::new()
583 .set_exponent_base(exponent_base)
584 .set_factor(factor)
585 .set_number_of_retries(number_of_retries);
586 crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
587 }
588
589 /// Returns an async [`ConnectionManager`][connection-manager] from the client.
590 ///
591 /// The connection manager wraps a
592 /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
593 /// connection fails with a connection error, then a new connection is
594 /// established in the background and the error is returned to the caller.
595 ///
596 /// This means that on connection loss at least one command will fail, but
597 /// the connection will be re-established automatically if possible. Please
598 /// refer to the [`ConnectionManager`][connection-manager] docs for
599 /// detailed reconnecting behavior.
600 ///
601 /// A connection manager can be cloned, allowing requests to be sent concurrently
602 /// on the same underlying connection (tcp/unix socket).
603 ///
604 /// [connection-manager]: aio/struct.ConnectionManager.html
605 /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
606 #[cfg(feature = "connection-manager")]
607 #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
608 #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
609 pub async fn get_tokio_connection_manager_with_backoff_and_timeouts(
610 &self,
611 exponent_base: u64,
612 factor: u64,
613 number_of_retries: usize,
614 response_timeout: std::time::Duration,
615 connection_timeout: std::time::Duration,
616 ) -> RedisResult<crate::aio::ConnectionManager> {
617 use crate::aio::ConnectionManagerConfig;
618
619 let config = ConnectionManagerConfig::new()
620 .set_exponent_base(exponent_base)
621 .set_factor(factor)
622 .set_response_timeout(response_timeout)
623 .set_connection_timeout(connection_timeout)
624 .set_number_of_retries(number_of_retries);
625 crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
626 }
627
628 /// Returns an async [`ConnectionManager`][connection-manager] from the client.
629 ///
630 /// The connection manager wraps a
631 /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
632 /// connection fails with a connection error, then a new connection is
633 /// established in the background and the error is returned to the caller.
634 ///
635 /// This means that on connection loss at least one command will fail, but
636 /// the connection will be re-established automatically if possible. Please
637 /// refer to the [`ConnectionManager`][connection-manager] docs for
638 /// detailed reconnecting behavior.
639 ///
640 /// A connection manager can be cloned, allowing requests to be sent concurrently
641 /// on the same underlying connection (tcp/unix socket).
642 ///
643 /// [connection-manager]: aio/struct.ConnectionManager.html
644 /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
645 #[cfg(feature = "connection-manager")]
646 #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
647 #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
648 pub async fn get_connection_manager_with_backoff_and_timeouts(
649 &self,
650 exponent_base: u64,
651 factor: u64,
652 number_of_retries: usize,
653 response_timeout: std::time::Duration,
654 connection_timeout: std::time::Duration,
655 ) -> RedisResult<crate::aio::ConnectionManager> {
656 use crate::aio::ConnectionManagerConfig;
657
658 let config = ConnectionManagerConfig::new()
659 .set_exponent_base(exponent_base)
660 .set_factor(factor)
661 .set_response_timeout(response_timeout)
662 .set_connection_timeout(connection_timeout)
663 .set_number_of_retries(number_of_retries);
664 crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
665 }
666
667 /// Returns an async [`ConnectionManager`][connection-manager] from the client.
668 ///
669 /// The connection manager wraps a
670 /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
671 /// connection fails with a connection error, then a new connection is
672 /// established in the background and the error is returned to the caller.
673 ///
674 /// This means that on connection loss at least one command will fail, but
675 /// the connection will be re-established automatically if possible. Please
676 /// refer to the [`ConnectionManager`][connection-manager] docs for
677 /// detailed reconnecting behavior.
678 ///
679 /// A connection manager can be cloned, allowing requests to be sent concurrently
680 /// on the same underlying connection (tcp/unix socket).
681 ///
682 /// [connection-manager]: aio/struct.ConnectionManager.html
683 /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
684 #[cfg(feature = "connection-manager")]
685 #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
686 pub async fn get_connection_manager_with_config(
687 &self,
688 config: crate::aio::ConnectionManagerConfig,
689 ) -> RedisResult<crate::aio::ConnectionManager> {
690 crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
691 }
692
693 /// Returns an async [`ConnectionManager`][connection-manager] from the client.
694 ///
695 /// The connection manager wraps a
696 /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
697 /// connection fails with a connection error, then a new connection is
698 /// established in the background and the error is returned to the caller.
699 ///
700 /// This means that on connection loss at least one command will fail, but
701 /// the connection will be re-established automatically if possible. Please
702 /// refer to the [`ConnectionManager`][connection-manager] docs for
703 /// detailed reconnecting behavior.
704 ///
705 /// A connection manager can be cloned, allowing requests to be be sent concurrently
706 /// on the same underlying connection (tcp/unix socket).
707 ///
708 /// [connection-manager]: aio/struct.ConnectionManager.html
709 /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
710 #[cfg(feature = "connection-manager")]
711 #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
712 #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
713 pub async fn get_connection_manager_with_backoff(
714 &self,
715 exponent_base: u64,
716 factor: u64,
717 number_of_retries: usize,
718 ) -> RedisResult<crate::aio::ConnectionManager> {
719 use crate::aio::ConnectionManagerConfig;
720
721 let config = ConnectionManagerConfig::new()
722 .set_exponent_base(exponent_base)
723 .set_factor(factor)
724 .set_number_of_retries(number_of_retries);
725 crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
726 }
727
728 async fn get_multiplexed_async_connection_inner_with_timeout<T>(
729 &self,
730 config: &AsyncConnectionConfig,
731 rt: Runtime,
732 ) -> RedisResult<crate::aio::MultiplexedConnection>
733 where
734 T: crate::aio::RedisRuntime,
735 {
736 let result = if let Some(connection_timeout) = config.connection_timeout {
737 rt.timeout(
738 connection_timeout,
739 self.get_multiplexed_async_connection_inner::<T>(config),
740 )
741 .await
742 } else {
743 Ok(self
744 .get_multiplexed_async_connection_inner::<T>(config)
745 .await)
746 };
747
748 match result {
749 Ok(Ok(connection)) => Ok(connection),
750 Ok(Err(e)) => Err(e),
751 Err(elapsed) => Err(elapsed.into()),
752 }
753 }
754
755 async fn get_multiplexed_async_connection_inner<T>(
756 &self,
757 config: &AsyncConnectionConfig,
758 ) -> RedisResult<crate::aio::MultiplexedConnection>
759 where
760 T: crate::aio::RedisRuntime,
761 {
762 let (mut connection, driver) = self
763 .create_multiplexed_async_connection_inner::<T>(config)
764 .await?;
765 let handle = T::spawn(driver);
766 connection.set_task_handle(handle);
767 Ok(connection)
768 }
769
770 async fn create_multiplexed_async_connection_inner<T>(
771 &self,
772 config: &AsyncConnectionConfig,
773 ) -> RedisResult<(
774 crate::aio::MultiplexedConnection,
775 impl std::future::Future<Output = ()>,
776 )>
777 where
778 T: crate::aio::RedisRuntime,
779 {
780 let resolver = config
781 .dns_resolver
782 .as_deref()
783 .unwrap_or(&DefaultAsyncDNSResolver);
784 let con = self
785 .get_simple_async_connection::<T>(resolver, &config.tcp_settings)
786 .await?;
787 crate::aio::MultiplexedConnection::new_with_config(
788 &self.connection_info.redis,
789 con,
790 config.clone(),
791 )
792 .await
793 }
794
795 async fn get_simple_async_connection_dynamically(
796 &self,
797 dns_resolver: &dyn AsyncDNSResolver,
798 tcp_settings: &TcpSettings,
799 ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>> {
800 match Runtime::locate() {
801 #[cfg(feature = "tokio-comp")]
802 Runtime::Tokio => {
803 self.get_simple_async_connection::<crate::aio::tokio::Tokio>(
804 dns_resolver,
805 tcp_settings,
806 )
807 .await
808 }
809
810 #[cfg(feature = "async-std-comp")]
811 Runtime::AsyncStd => {
812 self.get_simple_async_connection::<crate::aio::async_std::AsyncStd>(
813 dns_resolver,
814 tcp_settings,
815 )
816 .await
817 }
818
819 #[cfg(feature = "smol-comp")]
820 Runtime::Smol => {
821 self.get_simple_async_connection::<crate::aio::smol::Smol>(
822 dns_resolver,
823 tcp_settings,
824 )
825 .await
826 }
827 }
828 }
829
830 async fn get_simple_async_connection<T>(
831 &self,
832 dns_resolver: &dyn AsyncDNSResolver,
833 tcp_settings: &TcpSettings,
834 ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>>
835 where
836 T: crate::aio::RedisRuntime,
837 {
838 Ok(
839 crate::aio::connect_simple::<T>(&self.connection_info, dns_resolver, tcp_settings)
840 .await?
841 .boxed(),
842 )
843 }
844
845 #[cfg(feature = "connection-manager")]
846 pub(crate) fn connection_info(&self) -> &ConnectionInfo {
847 &self.connection_info
848 }
849
850 /// Returns an async receiver for pub-sub messages.
851 #[cfg(feature = "aio")]
852 // TODO - do we want to type-erase pubsub using a trait, to allow us to replace it with a different implementation later?
853 pub async fn get_async_pubsub(&self) -> RedisResult<crate::aio::PubSub> {
854 let connection = self
855 .get_simple_async_connection_dynamically(
856 &DefaultAsyncDNSResolver,
857 &TcpSettings::default(),
858 )
859 .await?;
860
861 crate::aio::PubSub::new(&self.connection_info.redis, connection).await
862 }
863
864 /// Returns an async receiver for monitor messages.
865 #[cfg(feature = "aio")]
866 pub async fn get_async_monitor(&self) -> RedisResult<crate::aio::Monitor> {
867 let connection = self
868 .get_simple_async_connection_dynamically(
869 &DefaultAsyncDNSResolver,
870 &TcpSettings::default(),
871 )
872 .await?;
873 crate::aio::Monitor::new(&self.connection_info.redis, connection).await
874 }
875}
876
877#[cfg(feature = "aio")]
878use crate::aio::Runtime;
879
880impl ConnectionLike for Client {
881 fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
882 self.get_connection()?.req_packed_command(cmd)
883 }
884
885 fn req_packed_commands(
886 &mut self,
887 cmd: &[u8],
888 offset: usize,
889 count: usize,
890 ) -> RedisResult<Vec<Value>> {
891 self.get_connection()?
892 .req_packed_commands(cmd, offset, count)
893 }
894
895 fn get_db(&self) -> i64 {
896 self.connection_info.redis.db
897 }
898
899 fn check_connection(&mut self) -> bool {
900 if let Ok(mut conn) = self.get_connection() {
901 conn.check_connection()
902 } else {
903 false
904 }
905 }
906
907 fn is_open(&self) -> bool {
908 if let Ok(conn) = self.get_connection() {
909 conn.is_open()
910 } else {
911 false
912 }
913 }
914}
915
916#[cfg(test)]
917mod test {
918 use super::*;
919
920 #[test]
921 fn regression_293_parse_ipv6_with_interface() {
922 assert!(Client::open(("fe80::cafe:beef%eno1", 6379)).is_ok());
923 }
924}