tetratto_core/database/
notifications.rs

1use oiseau::cache::Cache;
2use crate::model::socket::{CrudMessageType, PacketType, SocketMessage, SocketMethod};
3use crate::model::{Error, Result, auth::Notification, auth::User, permissions::FinePermission};
4use crate::{auto_method, DataManager};
5
6use oiseau::{PostgresRow, cache::redis::Commands};
7
8use oiseau::{execute, get, query_rows, params};
9
10impl DataManager {
11    /// Get a [`Notification`] from an SQL row.
12    pub(crate) fn get_notification_from_row(x: &PostgresRow) -> Notification {
13        Notification {
14            id: get!(x->0(i64)) as usize,
15            created: get!(x->1(i64)) as usize,
16            title: get!(x->2(String)),
17            content: get!(x->3(String)),
18            owner: get!(x->4(i64)) as usize,
19            read: get!(x->5(i32)) as i8 == 1,
20            tag: get!(x->6(String)),
21        }
22    }
23
24    auto_method!(get_notification_by_id()@get_notification_from_row -> "SELECT * FROM notifications WHERE id = $1" --name="notification" --returns=Notification --cache-key-tmpl="atto.notification:{}");
25
26    /// Get all notifications by `owner`.
27    pub async fn get_notifications_by_owner(&self, owner: usize) -> Result<Vec<Notification>> {
28        let conn = match self.0.connect().await {
29            Ok(c) => c,
30            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
31        };
32
33        let res = query_rows!(
34            &conn,
35            "SELECT * FROM notifications WHERE owner = $1 ORDER BY created DESC",
36            &[&(owner as i64)],
37            |x| { Self::get_notification_from_row(x) }
38        );
39
40        if res.is_err() {
41            return Err(Error::GeneralNotFound("notification".to_string()));
42        }
43
44        Ok(res.unwrap())
45    }
46
47    /// Get all notifications by `owner` (paginated).
48    pub async fn get_notifications_by_owner_paginated(
49        &self,
50        owner: usize,
51        batch: usize,
52        page: usize,
53    ) -> Result<Vec<Notification>> {
54        let conn = match self.0.connect().await {
55            Ok(c) => c,
56            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
57        };
58
59        let res = query_rows!(
60            &conn,
61            "SELECT * FROM notifications WHERE owner = $1 ORDER BY created DESC LIMIT $2 OFFSET $3",
62            &[&(owner as i64), &(batch as i64), &((page * batch) as i64)],
63            |x| { Self::get_notification_from_row(x) }
64        );
65
66        if res.is_err() {
67            return Err(Error::GeneralNotFound("notification".to_string()));
68        }
69
70        Ok(res.unwrap())
71    }
72
73    /// Get all notifications by `tag`.
74    pub async fn get_notifications_by_tag(&self, tag: &str) -> Result<Vec<Notification>> {
75        let conn = match self.0.connect().await {
76            Ok(c) => c,
77            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
78        };
79
80        let res = query_rows!(
81            &conn,
82            "SELECT * FROM notifications WHERE tag = $1 ORDER BY created DESC",
83            &[&tag],
84            |x| { Self::get_notification_from_row(x) }
85        );
86
87        if res.is_err() {
88            return Err(Error::GeneralNotFound("notification".to_string()));
89        }
90
91        Ok(res.unwrap())
92    }
93
94    /// Create a new notification in the database.
95    ///
96    /// # Arguments
97    /// * `data` - a mock [`Notification`] object to insert
98    pub async fn create_notification(&self, data: Notification) -> Result<()> {
99        let conn = match self.0.connect().await {
100            Ok(c) => c,
101            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
102        };
103
104        let res = execute!(
105            &conn,
106            "INSERT INTO notifications VALUES ($1, $2, $3, $4, $5, $6, $7)",
107            params![
108                &(data.id as i64),
109                &(data.created as i64),
110                &data.title,
111                &data.content,
112                &(data.owner as i64),
113                &{ if data.read { 1 } else { 0 } },
114                &data.tag
115            ]
116        );
117
118        if let Err(e) = res {
119            return Err(Error::DatabaseError(e.to_string()));
120        }
121
122        // incr notification count
123        if let Err(e) = self.incr_user_notifications(data.owner).await {
124            return Err(e);
125        };
126
127        // post event
128        let mut con = self.0.1.get_con().await;
129
130        if let Err(e) = con.publish::<String, String, ()>(
131            format!("{}/notifs", data.owner),
132            serde_json::to_string(&SocketMessage {
133                method: SocketMethod::Packet(PacketType::Crud(CrudMessageType::Create)),
134                data: serde_json::to_string(&data).unwrap(),
135            })
136            .unwrap(),
137        ) {
138            return Err(Error::MiscError(e.to_string()));
139        }
140
141        // return
142        Ok(())
143    }
144
145    pub async fn delete_notification(&self, id: usize, user: &User) -> Result<()> {
146        let notification = self.get_notification_by_id(id).await?;
147
148        if user.id != notification.owner
149            && !user.permissions.check(FinePermission::MANAGE_NOTIFICATIONS)
150        {
151            return Err(Error::NotAllowed);
152        }
153
154        let conn = match self.0.connect().await {
155            Ok(c) => c,
156            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
157        };
158
159        let res = execute!(
160            &conn,
161            "DELETE FROM notifications WHERE id = $1",
162            &[&(id as i64)]
163        );
164
165        if let Err(e) = res {
166            return Err(Error::DatabaseError(e.to_string()));
167        }
168
169        self.0.1.remove(format!("atto.notification:{}", id)).await;
170
171        // decr notification count
172        if !notification.read {
173            self.decr_user_notifications(notification.owner)
174                .await
175                .unwrap();
176        }
177
178        // post event
179        let mut con = self.0.1.get_con().await;
180
181        if let Err(e) = con.publish::<String, String, ()>(
182            format!("{}/notifs", notification.owner),
183            serde_json::to_string(&SocketMessage {
184                method: SocketMethod::Packet(PacketType::Crud(CrudMessageType::Delete)),
185                data: notification.id.to_string(),
186            })
187            .unwrap(),
188        ) {
189            return Err(Error::MiscError(e.to_string()));
190        }
191
192        // return
193        Ok(())
194    }
195
196    pub async fn delete_all_notifications(&self, user: &User) -> Result<()> {
197        let conn = match self.0.connect().await {
198            Ok(c) => c,
199            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
200        };
201
202        let res = execute!(
203            &conn,
204            "DELETE FROM notifications WHERE owner = $1",
205            &[&(user.id as i64)]
206        );
207
208        if let Err(e) = res {
209            return Err(Error::DatabaseError(e.to_string()));
210        }
211
212        self.update_user_notification_count(user.id, 0).await?;
213        Ok(())
214    }
215
216    pub async fn delete_all_notifications_by_tag(&self, user: &User, tag: &str) -> Result<()> {
217        let conn = match self.0.connect().await {
218            Ok(c) => c,
219            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
220        };
221
222        let res = execute!(
223            &conn,
224            "DELETE FROM notifications WHERE owner = $1 AND tag = $2",
225            params![&(user.id as i64), tag]
226        );
227
228        if let Err(e) = res {
229            return Err(Error::DatabaseError(e.to_string()));
230        }
231
232        Ok(())
233    }
234
235    pub async fn update_notification_read(
236        &self,
237        id: usize,
238        new_read: bool,
239        user: &User,
240    ) -> Result<()> {
241        let y = self.get_notification_by_id(id).await?;
242
243        if y.owner != user.id && !user.permissions.check(FinePermission::MANAGE_NOTIFICATIONS) {
244            return Err(Error::NotAllowed);
245        }
246
247        // ...
248        let conn = match self.0.connect().await {
249            Ok(c) => c,
250            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
251        };
252
253        let res = execute!(
254            &conn,
255            "UPDATE notifications SET read = $1 WHERE id = $2",
256            params![&{ if new_read { 1 } else { 0 } }, &(id as i64)]
257        );
258
259        if let Err(e) = res {
260            return Err(Error::DatabaseError(e.to_string()));
261        }
262
263        self.0.1.remove(format!("atto.notification:{}", id)).await;
264
265        if (y.read) && (!new_read) {
266            self.incr_user_notifications(user.id).await?;
267        } else if (!y.read) && (new_read) {
268            self.decr_user_notifications(user.id).await?;
269        }
270
271        Ok(())
272    }
273
274    pub async fn update_all_notifications_read(&self, user: &User, read: bool) -> Result<()> {
275        let notifications = self.get_notifications_by_owner(user.id).await?;
276
277        let mut changed_count: i32 = 0;
278        for notification in notifications {
279            if notification.read == read {
280                // no need to update this
281                continue;
282            }
283
284            changed_count += 1;
285
286            self.0
287                .1
288                .remove(format!("atto.notification:{}", notification.id))
289                .await;
290        }
291
292        // execute
293        let conn = match self.0.connect().await {
294            Ok(c) => c,
295            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
296        };
297
298        let res = execute!(
299            &conn,
300            "UPDATE notifications SET read = $1 WHERE owner = $2",
301            params![&{ if read { 1 } else { 0 } }, &(user.id as i64)]
302        );
303
304        if let Err(e) = res {
305            return Err(Error::DatabaseError(e.to_string()));
306        }
307
308        // use changed_count to update user counts
309        if read == false {
310            // we don't need to update when marking things as read since that should just be 0
311            self.update_user_notification_count(user.id, changed_count)
312                .await?;
313        } else {
314            self.update_user_notification_count(user.id, 0).await?;
315        }
316
317        // ...
318        Ok(())
319    }
320}