bark/movement/
manager.rs

1use std::cmp::PartialEq;
2use std::collections::{HashMap, HashSet};
3use std::sync::Arc;
4
5use tokio::sync::RwLock;
6
7use crate::movement::{Movement, MovementId, MovementStatus, MovementSubsystem};
8use crate::movement::error::MovementError;
9use crate::movement::update::MovementUpdate;
10use crate::persist::BarkPersister;
11use crate::subsystem::Subsystem;
12
13/// A minimalist helper class to handle movement registration and updating based on unique
14/// [SubsystemId] values.
15pub struct MovementManager {
16	db: Arc<dyn BarkPersister>,
17	subsystem_ids: RwLock<HashSet<Subsystem>>,
18	active_movements: RwLock<HashMap<MovementId, Arc<RwLock<Movement>>>>,
19}
20
21impl MovementManager {
22	/// Creates an instances of the [MovementManager].
23	pub fn new(db: Arc<dyn BarkPersister>) -> Self {
24		Self {
25			db,
26			subsystem_ids: RwLock::new(HashSet::new()),
27			active_movements: RwLock::new(HashMap::new()),
28		}
29	}
30
31	/// Registers a subsystem with the movement manager. Subsystems are identified using unique
32	/// names, to maintain this guarantee a unique [SubsystemId] will be generated and returned by
33	/// this function. Future calls to register or modify movements must provide this ID.
34	pub async fn register_subsystem(&self, id: Subsystem) -> anyhow::Result<(), MovementError> {
35		let mut guard = self.subsystem_ids.write().await;
36		if guard.contains(&id) {
37			Err(MovementError::SubsystemError {
38				id, error: "Subsystem already registered".into(),
39			})
40		} else {
41			guard.insert(id);
42			Ok(())
43		}
44	}
45
46	/// Begins the process of creating a new movement. This newly created movement will be defaulted
47	/// to a [MovementStatus::Pending] state. It can then be updated by using [MovementUpdate] in
48	/// combination with [MovementManager::update_movement].
49	///
50	/// [MovementManager::finish_movement] can be used once a movement has finished (whether
51	/// successful or not).
52	///
53	/// Parameters:
54	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
55	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
56	///   "receive", "round".
57	///
58	/// Errors:
59	/// - If the subsystem ID is not recognized.
60	/// - If a database error occurs.
61	pub async fn new_movement(
62		&self,
63		subsystem_id: Subsystem,
64		movement_kind: impl Into<String>,
65	) -> anyhow::Result<MovementId, MovementError> {
66		self.db.create_new_movement(
67			MovementStatus::Pending,
68			&MovementSubsystem {
69				name: subsystem_id.as_name().to_string(),
70				kind: movement_kind.into(),
71			},
72			chrono::Local::now(),
73		).await.map_err(|e| MovementError::CreationError { e })
74	}
75
76	/// Creates a new [Movement] and returns a [MovementGuard] to manage it. The guard will call
77	/// [MovementManager::finish_movement] on drop unless [MovementGuard::success] has already been
78	/// called.
79	///
80	/// See [MovementManager::new_movement] and [MovementGuard::new] for more information.
81	///
82	/// Parameters:
83	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
84	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
85	///   "receive", "round".
86	/// - on_drop: Determines what status the movement will be set to when the guard is dropped.
87	pub async fn new_guarded_movement(
88		self: &Arc<Self>,
89		subsystem_id: Subsystem,
90		movement_kind: impl Into<String>,
91		on_drop: OnDropStatus,
92	) -> anyhow::Result<MovementGuard, MovementError> {
93		Ok(MovementGuard::new(
94			self.new_movement(subsystem_id, movement_kind).await?, self.clone(), on_drop,
95		))
96	}
97
98	/// Similar to [MovementManager::new_movement] but it immediately calls
99	/// [MovementManager::update_movement] afterward.
100	///
101	/// Parameters:
102	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
103	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
104	///   "receive", "round".
105	/// - update: Describes the initial state of the movement.
106	///
107	/// Errors:
108	/// - If the subsystem ID is not recognized.
109	/// - If a database error occurs.
110	pub async fn new_movement_with_update(
111		&self,
112		subsystem_id: Subsystem,
113		movement_kind: impl Into<String>,
114		update: MovementUpdate,
115	) -> anyhow::Result<MovementId, MovementError> {
116		let id = self.new_movement(subsystem_id, movement_kind).await?;
117		self.update_movement(id, update).await?;
118		Ok(id)
119	}
120
121	/// Similar to [MovementManager::new_guarded_movement] but it immediately calls
122	/// [MovementManager::update_movement] after creating the [Movement].
123	///
124	/// Parameters:
125	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
126	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
127	///   "receive", "round".
128	/// - on_drop: Determines what status the movement will be set to when the guard is dropped.
129	/// - update: Describes the initial state of the movement.
130	///
131	/// Errors:
132	/// - If the subsystem ID is not recognized.
133	/// - If a database error occurs.
134	pub async fn new_guarded_movement_with_update(
135		self: &Arc<Self>,
136		subsystem_id: Subsystem,
137		movement_kind: impl Into<String>,
138		on_drop: OnDropStatus,
139		update: MovementUpdate,
140	) -> anyhow::Result<MovementGuard, MovementError> {
141		Ok(MovementGuard::new(
142			self.new_movement_with_update(subsystem_id, movement_kind, update).await?,
143			self.clone(),
144			on_drop,
145		))
146	}
147
148	/// Creates and marks a [Movement] as finished based on the given parameters. This is useful for
149	/// one-shot movements where the details are known at the time of creation, an example would be
150	/// when receiving funds asynchronously from a third party.
151	///
152	/// Parameters:
153	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
154	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
155	///   "receive", "round".
156	/// - status: The [MovementStatus] to set. This can't be [MovementStatus::Pending].
157	/// - details: Contains information about the movement, e.g. what VTXOs were consumed or
158	///   produced.
159	///
160	/// Errors:
161	/// - If the subsystem ID is not recognized.
162	/// - If [MovementStatus::Pending] is given.
163	/// - If a database error occurs.
164	pub async fn new_finished_movement(
165		&self,
166		subsystem_id: Subsystem,
167		movement_kind: impl Into<String>,
168		status: MovementStatus,
169		details: MovementUpdate,
170	) -> anyhow::Result<MovementId, MovementError> {
171		if status == MovementStatus::Pending {
172			return Err(MovementError::IncorrectPendingStatus);
173		}
174		let id = self.new_movement(subsystem_id, movement_kind).await?;
175		let mut movement = self.db.get_movement_by_id(id).await
176			.map_err(|e| MovementError::LoadError { id, e })?;
177		let at = chrono::Local::now();
178		details.apply_to(&mut movement, at);
179		movement.status = status;
180		movement.time.completed_at = Some(at);
181		self.db.update_movement(&movement).await
182			.map_err(|e| MovementError::PersisterError { id, e })?;
183		Ok(id)
184	}
185
186	/// Updates a movement with the given parameters.
187	///
188	/// See also: [MovementManager::new_movement] and [MovementManager::finish_movement]
189	///
190	/// Parameters:
191	/// - id: The ID of the movement previously created by [MovementManager::new_movement].
192	/// - update: Specifies properties to set on the movement. `Option` fields will be ignored if
193	///   they are `None`. `Some` will result in that particular field being overwritten.
194	///
195	/// Errors:
196	/// - If the [MovementId] is not recognized.
197	/// - If a movement is not [MovementStatus::Pending].
198	/// - If a database error occurs.
199	pub async fn update_movement(
200		&self,
201		id: MovementId,
202		update: MovementUpdate,
203	) -> anyhow::Result<(), MovementError> {
204		// Ensure the movement is loaded.
205		self.load_movement_into_cache(id).await?;
206
207		// Apply the update to the movement.
208		update.apply_to(&mut *self.get_movement_lock(id).await?.write().await, chrono::Local::now());
209
210		// Persist the changes using a read lock.
211		let lock = self.get_movement_lock(id).await?;
212		let movement = lock.read().await;
213		self.db.update_movement(&movement).await
214			.map_err(|e| MovementError::PersisterError { id, e })?;
215
216		// Drop the movement if it's in a finished state as this was likely a one-time update.
217		if movement.status != MovementStatus::Pending {
218			self.unload_movement_from_cache(id).await?;
219		}
220		Ok(())
221	}
222
223	/// Finalizes a movement, setting it to the given [MovementStatus].
224	///
225	/// See also: [MovementManager::new_movement] and [MovementManager::update_movement]
226	///
227	/// Parameters:
228	/// - id: The ID of the movement previously created by [MovementManager::new_movement].
229	/// - new_status: The final [MovementStatus] to set. This can't be [MovementStatus::Pending].
230	///
231	/// Errors:
232	/// - If the movement ID is not recognized.
233	/// - If [MovementStatus::Pending] is given.
234	/// - If a database error occurs.
235	pub async fn finish_movement(
236		&self,
237		id: MovementId,
238		new_status: MovementStatus,
239	) -> anyhow::Result<(), MovementError> {
240		if new_status == MovementStatus::Pending {
241			return Err(MovementError::IncorrectPendingStatus);
242		}
243
244		// Ensure the movement is loaded.
245		self.load_movement_into_cache(id).await?;
246
247		// Update the status and persist it.
248		let lock = self.get_movement_lock(id).await?;
249		let mut movement = lock.write().await;
250		movement.status = new_status;
251		movement.time.completed_at = Some(chrono::Local::now());
252		self.db.update_movement(&*movement).await
253			.map_err(|e| MovementError::PersisterError { id, e })?;
254		self.unload_movement_from_cache(id).await
255	}
256
257	/// Applies a [MovementUpdate] before finalizing the movement with
258	/// [MovementManager::finish_movement].
259	///
260	/// Parameters:
261	/// - id: The ID of the movement previously created by [MovementManager::new_movement].
262	/// - new_status: The final [MovementStatus] to set. This can't be [MovementStatus::Pending].
263	/// - update: Contains information to apply to the movement before finalizing it.
264	///
265	/// Errors:
266	/// - If the movement ID is not recognized.
267	/// - If [MovementStatus::Pending] is given.
268	/// - If a database error occurs.
269	pub async fn finish_movement_with_update(
270		&self,
271		id: MovementId,
272		new_status: MovementStatus,
273		update: MovementUpdate,
274	) -> anyhow::Result<(), MovementError> {
275		self.update_movement(id, update).await?;
276		self.finish_movement(id, new_status).await
277	}
278
279	async fn get_movement_lock(
280		&self,
281		id: MovementId,
282	) -> anyhow::Result<Arc<RwLock<Movement>>, MovementError> {
283		self.active_movements
284			.read()
285			.await
286			.get(&id)
287			.cloned()
288			.ok_or(MovementError::CacheError { id })
289	}
290
291	async fn load_movement_into_cache(&self, id: MovementId) -> anyhow::Result<(), MovementError> {
292		if self.active_movements.read().await.contains_key(&id) {
293			return Ok(());
294		}
295		// Acquire a write lock and check if another thread already loaded the movement.
296		let mut movements = self.active_movements.write().await;
297		if movements.contains_key(&id) {
298			return Ok(());
299		}
300		let movement = self.db.get_movement_by_id(id).await
301			.map_err(|e| MovementError::LoadError { id, e })?;
302		movements.insert(id, Arc::new(RwLock::new(movement)));
303		Ok(())
304	}
305
306	async fn unload_movement_from_cache(&self, id: MovementId) -> anyhow::Result<(), MovementError> {
307		let mut lock = self.active_movements.write().await;
308		lock.remove(&id);
309		Ok(())
310	}
311}
312
313/// Determines the state to set a [Movement] to when a [MovementGuard] is dropped.
314///
315/// See [MovementGuard::new] for more information.
316#[derive(Debug, Copy, Clone, PartialEq, Eq)]
317pub enum OnDropStatus {
318	/// Marks the [Movement] as [MovementStatus::Canceled].
319	Canceled,
320	/// Marks the [Movement] as [MovementStatus::Failed].
321	Failed,
322}
323
324impl From<OnDropStatus> for MovementStatus {
325	fn from(status: OnDropStatus) -> Self {
326		match status {
327			OnDropStatus::Canceled => MovementStatus::Canceled,
328			OnDropStatus::Failed => MovementStatus::Failed,
329		}
330	}
331}
332
333/// A RAII helper class to ensure that pending movements get marked as finished in case an error
334/// occurs. You can construct a guard for an existing [Movement] with [MovementGuard::new].
335/// Alternatively, a [MovementGuard] can be coupled to a movement using
336/// [MovementGuard::new].
337///
338/// When the [MovementGuard] is dropped from the stack, it will finalize the movement according to
339/// the configured [OnDropStatus] unless [MovementGuard::success] has already been called.
340pub struct MovementGuard {
341	id: MovementId,
342	manager: Arc<MovementManager>,
343	on_drop: OnDropStatus,
344	has_finished: bool,
345}
346
347impl<'a> MovementGuard {
348	/// Constructs a [MovementGuard] to manage a pre-existing [Movement].
349	///
350	/// Parameters:
351	/// - id: The ID of the [Movement] to update.
352	/// - manager: A reference to the [MovementManager] so the guard can update the [Movement].
353	/// - on_drop: Determines what status the movement will be set to when the guard is dropped.
354	pub fn new(
355		id: MovementId,
356		manager: Arc<MovementManager>,
357		on_drop: OnDropStatus,
358	) -> Self {
359		Self {
360			id,
361			manager,
362			on_drop,
363			has_finished: false,
364		}
365	}
366
367	/// Gets the [MovementId] stored by this guard.
368	pub fn id(&self) -> MovementId {
369		self.id
370	}
371
372	/// Sets a different [OnDropStatus] to apply to the movement upon dropping the [MovementGuard].
373	///
374	/// Parameters:
375	/// - on_drop: Determines what status the movement will be set to when the guard is dropped.
376	pub fn set_on_drop_status(&mut self, status: OnDropStatus) {
377		self.on_drop = status;
378	}
379
380	/// Applies an update to the managed [Movement].
381	///
382	/// See [MovementManager::update_movement] for more information.
383	///
384	/// Parameters:
385	/// - update: Specifies properties to set on the movement. `Option` fields will be ignored if
386	///   they are `None`. `Some` will result in that particular field being overwritten.
387	pub async fn apply_update(
388		&self,
389		update: MovementUpdate,
390	) -> anyhow::Result<(), MovementError> {
391		self.manager.update_movement(self.id, update).await
392	}
393
394	/// Same as [MovementGuard::success] but sets [Movement::status] to [MovementStatus::Canceled].
395	pub async fn cancel(&mut self) -> anyhow::Result<(), MovementError> {
396		self.stop();
397		self.manager.finish_movement(self.id, MovementStatus::Canceled).await
398	}
399
400	/// Same as [MovementGuard::success] but sets [Movement::status] to [MovementStatus::Failed].
401	pub async fn fail(&mut self) -> anyhow::Result<(), MovementError> {
402		self.stop();
403		self.manager.finish_movement(self.id, MovementStatus::Failed).await
404	}
405
406	/// Finalizes a movement, setting it to [MovementStatus::Successful]. If the [MovementGuard] is
407	/// dropped after calling this function, no further changes will be made to the [Movement].
408	///
409	/// See [MovementManager::finish_movement] for more information.
410	pub async fn success(
411		&mut self,
412	) -> anyhow::Result<(), MovementError> {
413		self.stop();
414		self.manager.finish_movement(self.id, MovementStatus::Successful).await
415	}
416
417	/// Prevents the guard from making further changes to the movement after being dropped. Manual
418	/// actions such as [MovementGuard::apply_update] will continue to work.
419	pub fn stop(&mut self) {
420		self.has_finished = true;
421	}
422}
423
424impl Drop for MovementGuard {
425	fn drop(&mut self) {
426		if !self.has_finished {
427			// Asynchronously mark the movement as finished since we are being dropped.
428			let manager = self.manager.clone();
429			let id = self.id;
430			let on_drop = self.on_drop;
431			tokio::spawn(async move {
432				if let Err(e) = manager.finish_movement(id, on_drop.into()).await {
433					log::error!("An error occurred in MovementGuard::drop(): {:#}", e);
434				}
435			});
436		}
437	}
438}