bark/movement/manager.rs
1use std::cmp::PartialEq;
2use std::collections::{HashMap, HashSet};
3use std::sync::Arc;
4
5use tokio::sync::RwLock;
6
7use crate::movement::{Movement, MovementId, MovementStatus, MovementSubsystem};
8use crate::movement::error::MovementError;
9use crate::movement::update::MovementUpdate;
10use crate::persist::BarkPersister;
11use crate::subsystem::Subsystem;
12
13/// A minimalist helper class to handle movement registration and updating based on unique
14/// [SubsystemId] values.
15pub struct MovementManager {
16 db: Arc<dyn BarkPersister>,
17 subsystem_ids: RwLock<HashSet<Subsystem>>,
18 active_movements: RwLock<HashMap<MovementId, Arc<RwLock<Movement>>>>,
19}
20
21impl MovementManager {
22 /// Creates an instances of the [MovementManager].
23 pub fn new(db: Arc<dyn BarkPersister>) -> Self {
24 Self {
25 db,
26 subsystem_ids: RwLock::new(HashSet::new()),
27 active_movements: RwLock::new(HashMap::new()),
28 }
29 }
30
31 /// Registers a subsystem with the movement manager. Subsystems are identified using unique
32 /// names, to maintain this guarantee a unique [SubsystemId] will be generated and returned by
33 /// this function. Future calls to register or modify movements must provide this ID.
34 pub async fn register_subsystem(&self, id: Subsystem) -> anyhow::Result<(), MovementError> {
35 let mut guard = self.subsystem_ids.write().await;
36 if guard.contains(&id) {
37 Err(MovementError::SubsystemError {
38 id, error: "Subsystem already registered".into(),
39 })
40 } else {
41 guard.insert(id);
42 Ok(())
43 }
44 }
45
46 /// Begins the process of creating a new movement. This newly created movement will be defaulted
47 /// to a [MovementStatus::Pending] state. It can then be updated by using [MovementUpdate] in
48 /// combination with [MovementManager::update_movement].
49 ///
50 /// [MovementManager::finish_movement] can be used once a movement has finished (whether
51 /// successful or not).
52 ///
53 /// Parameters:
54 /// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
55 /// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
56 /// "receive", "round".
57 ///
58 /// Errors:
59 /// - If the subsystem ID is not recognized.
60 /// - If a database error occurs.
61 pub async fn new_movement(
62 &self,
63 subsystem_id: Subsystem,
64 movement_kind: impl Into<String>,
65 ) -> anyhow::Result<MovementId, MovementError> {
66 self.db.create_new_movement(
67 MovementStatus::Pending,
68 &MovementSubsystem {
69 name: subsystem_id.as_name().to_string(),
70 kind: movement_kind.into(),
71 },
72 chrono::Local::now(),
73 ).await.map_err(|e| MovementError::CreationError { e })
74 }
75
76 /// Creates a new [Movement] and returns a [MovementGuard] to manage it. The guard will call
77 /// [MovementManager::finish_movement] on drop unless [MovementGuard::success] has already been
78 /// called.
79 ///
80 /// See [MovementManager::new_movement] and [MovementGuard::new] for more information.
81 ///
82 /// Parameters:
83 /// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
84 /// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
85 /// "receive", "round".
86 /// - on_drop: Determines what status the movement will be set to when the guard is dropped.
87 pub async fn new_guarded_movement(
88 self: &Arc<Self>,
89 subsystem_id: Subsystem,
90 movement_kind: impl Into<String>,
91 on_drop: OnDropStatus,
92 ) -> anyhow::Result<MovementGuard, MovementError> {
93 Ok(MovementGuard::new(
94 self.new_movement(subsystem_id, movement_kind).await?, self.clone(), on_drop,
95 ))
96 }
97
98 /// Similar to [MovementManager::new_movement] but it immediately calls
99 /// [MovementManager::update_movement] afterward.
100 ///
101 /// Parameters:
102 /// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
103 /// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
104 /// "receive", "round".
105 /// - update: Describes the initial state of the movement.
106 ///
107 /// Errors:
108 /// - If the subsystem ID is not recognized.
109 /// - If a database error occurs.
110 pub async fn new_movement_with_update(
111 &self,
112 subsystem_id: Subsystem,
113 movement_kind: impl Into<String>,
114 update: MovementUpdate,
115 ) -> anyhow::Result<MovementId, MovementError> {
116 let id = self.new_movement(subsystem_id, movement_kind).await?;
117 self.update_movement(id, update).await?;
118 Ok(id)
119 }
120
121 /// Similar to [MovementManager::new_guarded_movement] but it immediately calls
122 /// [MovementManager::update_movement] after creating the [Movement].
123 ///
124 /// Parameters:
125 /// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
126 /// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
127 /// "receive", "round".
128 /// - on_drop: Determines what status the movement will be set to when the guard is dropped.
129 /// - update: Describes the initial state of the movement.
130 ///
131 /// Errors:
132 /// - If the subsystem ID is not recognized.
133 /// - If a database error occurs.
134 pub async fn new_guarded_movement_with_update(
135 self: &Arc<Self>,
136 subsystem_id: Subsystem,
137 movement_kind: impl Into<String>,
138 on_drop: OnDropStatus,
139 update: MovementUpdate,
140 ) -> anyhow::Result<MovementGuard, MovementError> {
141 Ok(MovementGuard::new(
142 self.new_movement_with_update(subsystem_id, movement_kind, update).await?,
143 self.clone(),
144 on_drop,
145 ))
146 }
147
148 /// Creates and marks a [Movement] as finished based on the given parameters. This is useful for
149 /// one-shot movements where the details are known at the time of creation, an example would be
150 /// when receiving funds asynchronously from a third party.
151 ///
152 /// Parameters:
153 /// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
154 /// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
155 /// "receive", "round".
156 /// - status: The [MovementStatus] to set. This can't be [MovementStatus::Pending].
157 /// - details: Contains information about the movement, e.g. what VTXOs were consumed or
158 /// produced.
159 ///
160 /// Errors:
161 /// - If the subsystem ID is not recognized.
162 /// - If [MovementStatus::Pending] is given.
163 /// - If a database error occurs.
164 pub async fn new_finished_movement(
165 &self,
166 subsystem_id: Subsystem,
167 movement_kind: impl Into<String>,
168 status: MovementStatus,
169 details: MovementUpdate,
170 ) -> anyhow::Result<MovementId, MovementError> {
171 if status == MovementStatus::Pending {
172 return Err(MovementError::IncorrectPendingStatus);
173 }
174 let id = self.new_movement(subsystem_id, movement_kind).await?;
175 let mut movement = self.db.get_movement_by_id(id).await
176 .map_err(|e| MovementError::LoadError { id, e })?;
177 let at = chrono::Local::now();
178 details.apply_to(&mut movement, at);
179 movement.status = status;
180 movement.time.completed_at = Some(at);
181 self.db.update_movement(&movement).await
182 .map_err(|e| MovementError::PersisterError { id, e })?;
183 Ok(id)
184 }
185
186 /// Updates a movement with the given parameters.
187 ///
188 /// See also: [MovementManager::new_movement] and [MovementManager::finish_movement]
189 ///
190 /// Parameters:
191 /// - id: The ID of the movement previously created by [MovementManager::new_movement].
192 /// - update: Specifies properties to set on the movement. `Option` fields will be ignored if
193 /// they are `None`. `Some` will result in that particular field being overwritten.
194 ///
195 /// Errors:
196 /// - If the [MovementId] is not recognized.
197 /// - If a movement is not [MovementStatus::Pending].
198 /// - If a database error occurs.
199 pub async fn update_movement(
200 &self,
201 id: MovementId,
202 update: MovementUpdate,
203 ) -> anyhow::Result<(), MovementError> {
204 // Ensure the movement is loaded.
205 self.load_movement_into_cache(id).await?;
206
207 // Apply the update to the movement.
208 update.apply_to(&mut *self.get_movement_lock(id).await?.write().await, chrono::Local::now());
209
210 // Persist the changes using a read lock.
211 let lock = self.get_movement_lock(id).await?;
212 let movement = lock.read().await;
213 self.db.update_movement(&movement).await
214 .map_err(|e| MovementError::PersisterError { id, e })?;
215
216 // Drop the movement if it's in a finished state as this was likely a one-time update.
217 if movement.status != MovementStatus::Pending {
218 self.unload_movement_from_cache(id).await?;
219 }
220 Ok(())
221 }
222
223 /// Finalizes a movement, setting it to the given [MovementStatus].
224 ///
225 /// See also: [MovementManager::new_movement] and [MovementManager::update_movement]
226 ///
227 /// Parameters:
228 /// - id: The ID of the movement previously created by [MovementManager::new_movement].
229 /// - new_status: The final [MovementStatus] to set. This can't be [MovementStatus::Pending].
230 ///
231 /// Errors:
232 /// - If the movement ID is not recognized.
233 /// - If [MovementStatus::Pending] is given.
234 /// - If a database error occurs.
235 pub async fn finish_movement(
236 &self,
237 id: MovementId,
238 new_status: MovementStatus,
239 ) -> anyhow::Result<(), MovementError> {
240 if new_status == MovementStatus::Pending {
241 return Err(MovementError::IncorrectPendingStatus);
242 }
243
244 // Ensure the movement is loaded.
245 self.load_movement_into_cache(id).await?;
246
247 // Update the status and persist it.
248 let lock = self.get_movement_lock(id).await?;
249 let mut movement = lock.write().await;
250 movement.status = new_status;
251 movement.time.completed_at = Some(chrono::Local::now());
252 self.db.update_movement(&*movement).await
253 .map_err(|e| MovementError::PersisterError { id, e })?;
254 self.unload_movement_from_cache(id).await
255 }
256
257 /// Applies a [MovementUpdate] before finalizing the movement with
258 /// [MovementManager::finish_movement].
259 ///
260 /// Parameters:
261 /// - id: The ID of the movement previously created by [MovementManager::new_movement].
262 /// - new_status: The final [MovementStatus] to set. This can't be [MovementStatus::Pending].
263 /// - update: Contains information to apply to the movement before finalizing it.
264 ///
265 /// Errors:
266 /// - If the movement ID is not recognized.
267 /// - If [MovementStatus::Pending] is given.
268 /// - If a database error occurs.
269 pub async fn finish_movement_with_update(
270 &self,
271 id: MovementId,
272 new_status: MovementStatus,
273 update: MovementUpdate,
274 ) -> anyhow::Result<(), MovementError> {
275 self.update_movement(id, update).await?;
276 self.finish_movement(id, new_status).await
277 }
278
279 async fn get_movement_lock(
280 &self,
281 id: MovementId,
282 ) -> anyhow::Result<Arc<RwLock<Movement>>, MovementError> {
283 self.active_movements
284 .read()
285 .await
286 .get(&id)
287 .cloned()
288 .ok_or(MovementError::CacheError { id })
289 }
290
291 async fn load_movement_into_cache(&self, id: MovementId) -> anyhow::Result<(), MovementError> {
292 if self.active_movements.read().await.contains_key(&id) {
293 return Ok(());
294 }
295 // Acquire a write lock and check if another thread already loaded the movement.
296 let mut movements = self.active_movements.write().await;
297 if movements.contains_key(&id) {
298 return Ok(());
299 }
300 let movement = self.db.get_movement_by_id(id).await
301 .map_err(|e| MovementError::LoadError { id, e })?;
302 movements.insert(id, Arc::new(RwLock::new(movement)));
303 Ok(())
304 }
305
306 async fn unload_movement_from_cache(&self, id: MovementId) -> anyhow::Result<(), MovementError> {
307 let mut lock = self.active_movements.write().await;
308 lock.remove(&id);
309 Ok(())
310 }
311}
312
313/// Determines the state to set a [Movement] to when a [MovementGuard] is dropped.
314///
315/// See [MovementGuard::new] for more information.
316#[derive(Debug, Copy, Clone, PartialEq, Eq)]
317pub enum OnDropStatus {
318 /// Marks the [Movement] as [MovementStatus::Canceled].
319 Canceled,
320 /// Marks the [Movement] as [MovementStatus::Failed].
321 Failed,
322}
323
324impl From<OnDropStatus> for MovementStatus {
325 fn from(status: OnDropStatus) -> Self {
326 match status {
327 OnDropStatus::Canceled => MovementStatus::Canceled,
328 OnDropStatus::Failed => MovementStatus::Failed,
329 }
330 }
331}
332
333/// A RAII helper class to ensure that pending movements get marked as finished in case an error
334/// occurs. You can construct a guard for an existing [Movement] with [MovementGuard::new].
335/// Alternatively, a [MovementGuard] can be coupled to a movement using
336/// [MovementGuard::new].
337///
338/// When the [MovementGuard] is dropped from the stack, it will finalize the movement according to
339/// the configured [OnDropStatus] unless [MovementGuard::success] has already been called.
340pub struct MovementGuard {
341 id: MovementId,
342 manager: Arc<MovementManager>,
343 on_drop: OnDropStatus,
344 has_finished: bool,
345}
346
347impl<'a> MovementGuard {
348 /// Constructs a [MovementGuard] to manage a pre-existing [Movement].
349 ///
350 /// Parameters:
351 /// - id: The ID of the [Movement] to update.
352 /// - manager: A reference to the [MovementManager] so the guard can update the [Movement].
353 /// - on_drop: Determines what status the movement will be set to when the guard is dropped.
354 pub fn new(
355 id: MovementId,
356 manager: Arc<MovementManager>,
357 on_drop: OnDropStatus,
358 ) -> Self {
359 Self {
360 id,
361 manager,
362 on_drop,
363 has_finished: false,
364 }
365 }
366
367 /// Gets the [MovementId] stored by this guard.
368 pub fn id(&self) -> MovementId {
369 self.id
370 }
371
372 /// Sets a different [OnDropStatus] to apply to the movement upon dropping the [MovementGuard].
373 ///
374 /// Parameters:
375 /// - on_drop: Determines what status the movement will be set to when the guard is dropped.
376 pub fn set_on_drop_status(&mut self, status: OnDropStatus) {
377 self.on_drop = status;
378 }
379
380 /// Applies an update to the managed [Movement].
381 ///
382 /// See [MovementManager::update_movement] for more information.
383 ///
384 /// Parameters:
385 /// - update: Specifies properties to set on the movement. `Option` fields will be ignored if
386 /// they are `None`. `Some` will result in that particular field being overwritten.
387 pub async fn apply_update(
388 &self,
389 update: MovementUpdate,
390 ) -> anyhow::Result<(), MovementError> {
391 self.manager.update_movement(self.id, update).await
392 }
393
394 /// Same as [MovementGuard::success] but sets [Movement::status] to [MovementStatus::Canceled].
395 pub async fn cancel(&mut self) -> anyhow::Result<(), MovementError> {
396 self.stop();
397 self.manager.finish_movement(self.id, MovementStatus::Canceled).await
398 }
399
400 /// Same as [MovementGuard::success] but sets [Movement::status] to [MovementStatus::Failed].
401 pub async fn fail(&mut self) -> anyhow::Result<(), MovementError> {
402 self.stop();
403 self.manager.finish_movement(self.id, MovementStatus::Failed).await
404 }
405
406 /// Finalizes a movement, setting it to [MovementStatus::Successful]. If the [MovementGuard] is
407 /// dropped after calling this function, no further changes will be made to the [Movement].
408 ///
409 /// See [MovementManager::finish_movement] for more information.
410 pub async fn success(
411 &mut self,
412 ) -> anyhow::Result<(), MovementError> {
413 self.stop();
414 self.manager.finish_movement(self.id, MovementStatus::Successful).await
415 }
416
417 /// Prevents the guard from making further changes to the movement after being dropped. Manual
418 /// actions such as [MovementGuard::apply_update] will continue to work.
419 pub fn stop(&mut self) {
420 self.has_finished = true;
421 }
422}
423
424impl Drop for MovementGuard {
425 fn drop(&mut self) {
426 if !self.has_finished {
427 // Asynchronously mark the movement as finished since we are being dropped.
428 let manager = self.manager.clone();
429 let id = self.id;
430 let on_drop = self.on_drop;
431 tokio::spawn(async move {
432 if let Err(e) = manager.finish_movement(id, on_drop.into()).await {
433 log::error!("An error occurred in MovementGuard::drop(): {:#}", e);
434 }
435 });
436 }
437 }
438}