tetratto_core/database/
transfers.rs

1use std::collections::HashMap;
2use crate::model::{
3    auth::{Notification, User},
4    economy::{CoinTransfer, CoinTransferMethod, CoinTransferSource, Product},
5    Error, Result,
6};
7use crate::{auto_method, DataManager};
8use oiseau::{cache::Cache, execute, get, params, query_row, query_rows, PostgresRow};
9
10impl DataManager {
11    /// Get a [`CoinTransfer`] from an SQL row.
12    pub(crate) fn get_transfer_from_row(x: &PostgresRow) -> CoinTransfer {
13        CoinTransfer {
14            id: get!(x->0(i64)) as usize,
15            created: get!(x->1(i64)) as usize,
16            sender: get!(x->2(i64)) as usize,
17            receiver: get!(x->3(i64)) as usize,
18            amount: get!(x->4(i32)),
19            is_pending: get!(x->5(i32)) as i8 == 1,
20            method: serde_json::from_str(&get!(x->6(String))).unwrap(),
21            source: serde_json::from_str(&get!(x->7(String))).unwrap(),
22        }
23    }
24
25    auto_method!(get_transfer_by_id(usize as i64)@get_transfer_from_row -> "SELECT * FROM transfers WHERE id = $1" --name="transfer" --returns=CoinTransfer --cache-key-tmpl="atto.transfer:{}");
26
27    /// Fill a list of transfers with their users and product.
28    pub async fn fill_transfers(
29        &self,
30        list: Vec<CoinTransfer>,
31    ) -> Result<Vec<(User, User, Option<Product>, CoinTransfer)>> {
32        let mut out = Vec::new();
33        let mut seen_users: HashMap<usize, User> = HashMap::new();
34        let mut seen_products: HashMap<usize, Product> = HashMap::new();
35
36        for transfer in list {
37            out.push((
38                if let Some(user) = seen_users.get(&transfer.sender) {
39                    user.to_owned()
40                } else {
41                    let user = self.get_user_by_id(transfer.sender).await?;
42                    seen_users.insert(user.id, user.clone());
43                    user
44                },
45                if let Some(user) = seen_users.get(&transfer.receiver) {
46                    user.to_owned()
47                } else {
48                    let user = self.get_user_by_id(transfer.receiver).await?;
49                    seen_users.insert(user.id, user.clone());
50                    user
51                },
52                match transfer.method {
53                    CoinTransferMethod::Transfer => None,
54                    CoinTransferMethod::Purchase(id) => {
55                        if let Some(product) = seen_products.get(&id) {
56                            Some(product.to_owned())
57                        } else {
58                            match self.get_product_by_id(id).await {
59                                Ok(product) => {
60                                    seen_products.insert(product.id, product.clone());
61                                    Some(product)
62                                }
63                                Err(_) => None,
64                            }
65                        }
66                    }
67                },
68                transfer,
69            ));
70        }
71
72        Ok(out)
73    }
74
75    /// Get all transfers by user.
76    ///
77    /// # Arguments
78    /// * `id` - the ID of the user to fetch transfers for
79    /// * `batch` - the limit of items in each page
80    /// * `page` - the page number
81    pub async fn get_transfers_by_user(
82        &self,
83        id: usize,
84        batch: usize,
85        page: usize,
86    ) -> Result<Vec<CoinTransfer>> {
87        let conn = match self.0.connect().await {
88            Ok(c) => c,
89            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
90        };
91
92        let res = query_rows!(
93            &conn,
94            "SELECT * FROM transfers WHERE sender = $1 OR receiver = $1 ORDER BY created DESC LIMIT $2 OFFSET $3",
95            &[&(id as i64), &(batch as i64), &((page * batch) as i64)],
96            |x| { Self::get_transfer_from_row(x) }
97        );
98
99        if res.is_err() {
100            return Err(Error::GeneralNotFound("transfer".to_string()));
101        }
102
103        Ok(res.unwrap())
104    }
105
106    /// Get a transfer by user and method.
107    ///
108    /// # Arguments
109    /// * `id` - the ID of the user to fetch transfers for
110    /// * `method` - the transfer method
111    pub async fn get_transfer_by_sender_method(
112        &self,
113        id: usize,
114        method: CoinTransferMethod,
115    ) -> Result<CoinTransfer> {
116        let conn = match self.0.connect().await {
117            Ok(c) => c,
118            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
119        };
120
121        let res = query_row!(
122            &conn,
123            "SELECT * FROM transfers WHERE sender = $1 AND method = $2 LIMIT 1",
124            params![&(id as i64), &serde_json::to_string(&method).unwrap()],
125            |x| { Ok(Self::get_transfer_from_row(x)) }
126        );
127
128        if res.is_err() {
129            return Err(Error::GeneralNotFound("transfer".to_string()));
130        }
131
132        Ok(res.unwrap())
133    }
134
135    /// Create a new transfer in the database.
136    ///
137    /// # Arguments
138    /// * `data` - a mock [`CoinTransfer`] object to insert
139    pub async fn create_transfer(&self, data: &mut CoinTransfer, apply: bool) -> Result<usize> {
140        // check values
141        let mut sender = self.get_user_by_id(data.sender).await?;
142        let mut receiver = self.get_user_by_id(data.receiver).await?;
143
144        if sender.id == self.0.0.system_user {
145            // system user can create coins from the void
146            sender.coins = i32::MAX;
147        }
148
149        let (sender_bankrupt, receiver_bankrupt) = data.apply(&mut sender, &mut receiver);
150
151        if sender_bankrupt | receiver_bankrupt {
152            return Err(Error::MiscError(
153                "One party of this transfer cannot afford this".to_string(),
154            ));
155        }
156
157        if apply {
158            if sender.id != self.0.0.system_user {
159                self.update_user_coins(sender.id, sender.coins).await?;
160            }
161
162            self.update_user_coins(receiver.id, receiver.coins).await?;
163
164            // handle refund notification
165            if data.source == CoinTransferSource::Refund {
166                self.create_notification(Notification::new(
167                    "A coin refund has been issued to your account!".to_string(),
168                    "You've been issued a refund for a prior purchase. The product will remain in your account, but your coins have been returned.".to_string(),
169                    receiver.id,
170                ))
171                .await?;
172            }
173        } else {
174            // we haven't applied the transfer, so this must be pending
175            data.is_pending = true;
176        }
177
178        // ...
179        let conn = match self.0.connect().await {
180            Ok(c) => c,
181            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
182        };
183
184        let res = execute!(
185            &conn,
186            "INSERT INTO transfers VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
187            params![
188                &(data.id as i64),
189                &(data.created as i64),
190                &(data.sender as i64),
191                &(data.receiver as i64),
192                &data.amount,
193                &{ if data.is_pending { 1 } else { 0 } },
194                &serde_json::to_string(&data.method).unwrap(),
195                &serde_json::to_string(&data.source).unwrap(),
196            ]
197        );
198
199        if let Err(e) = res {
200            return Err(Error::DatabaseError(e.to_string()));
201        }
202
203        Ok(data.id)
204    }
205
206    /// Apply a pending transfer.
207    pub async fn apply_transfer(&self, id: usize) -> Result<()> {
208        let transfer = self.get_transfer_by_id(id).await?;
209
210        let mut sender = self.get_user_by_id(transfer.sender).await?;
211        let mut receiver = self.get_user_by_id(transfer.receiver).await?;
212        let (sender_bankrupt, receiver_bankrupt) = transfer.apply(&mut sender, &mut receiver);
213
214        if sender_bankrupt | receiver_bankrupt {
215            return Err(Error::MiscError(
216                "One party of this transfer cannot afford this".to_string(),
217            ));
218        }
219
220        self.update_user_coins(sender.id, sender.coins).await?;
221        self.update_user_coins(receiver.id, receiver.coins).await?;
222        self.update_transfer_is_pending(id, 0).await?;
223
224        self.create_notification(Notification::new(
225            "Purchase fulfilled!".to_string(),
226            format!(
227                "You've just successfully fulfilled a purchase for a [product](/product/{}).",
228                match transfer.method {
229                    CoinTransferMethod::Purchase(x) => x,
230                    _ => 0,
231                }
232            ),
233            receiver.id,
234        ))
235        .await?;
236
237        self.create_notification(Notification::new(
238            "Purchase fulfilled!".to_string(),
239            format!(
240                "Your purchase for a [product](/product/{}) has been fulfilled.",
241                match transfer.method {
242                    CoinTransferMethod::Purchase(x) => x,
243                    _ => 0,
244                }
245            ),
246            sender.id,
247        ))
248        .await?;
249
250        Ok(())
251    }
252
253    auto_method!(update_transfer_is_pending(i32) -> "UPDATE transfers SET is_pending = $1 WHERE id = $2" --cache-key-tmpl="atto.transfer:{}");
254}