bark/movement/
manager.rs

1use std::cmp::PartialEq;
2use std::collections::HashMap;
3use std::sync::Arc;
4
5use chrono::DateTime;
6use tokio::sync::RwLock;
7
8use crate::movement::{Movement, MovementId, MovementStatus, MovementSubsystem};
9use crate::movement::error::MovementError;
10use crate::movement::update::MovementUpdate;
11use crate::persist::BarkPersister;
12use crate::subsystem::SubsystemId;
13
14/// A minimalist helper class to handle movement registration and updating based on unique
15/// [SubsystemId] values.
16pub struct MovementManager {
17	db: Arc<dyn BarkPersister>,
18	subsystem_ids: RwLock<HashMap<SubsystemId, String>>,
19	active_movements: RwLock<HashMap<MovementId, Arc<RwLock<Movement>>>>,
20}
21
22impl MovementManager {
23	/// Creates an instances of the [MovementManager].
24	pub fn new(db: Arc<dyn BarkPersister>) -> Self {
25		Self {
26			db,
27			subsystem_ids: RwLock::new(HashMap::new()),
28			active_movements: RwLock::new(HashMap::new()),
29		}
30	}
31
32	/// Registers a subsystem with the movement manager. Subsystems are identified using unique
33	/// names, to maintain this guarantee a unique [SubsystemId] will be generated and returned by
34	/// this function. Future calls to register or modify movements must provide this ID.
35	pub async fn register_subsystem(&self, name: String) -> anyhow::Result<SubsystemId, MovementError> {
36		let exists = self.subsystem_ids.read().await.iter().any(|(_, n)| n == &name);
37		if exists {
38			Err(MovementError::SubsystemError {
39				name, error: "Subsystem already registered".into(),
40			})
41		} else {
42			let mut ids = self.subsystem_ids.write().await;
43			let id = SubsystemId::new(ids.len() as u32);
44				ids.insert(id, name);
45				Ok(id)
46			}
47	}
48
49	/// Similar to [MovementManager::new_movement_at] but it sets the [Movement::created_at] field
50	/// to the current time.
51	pub async fn new_movement(
52		&self,
53		subsystem_id: SubsystemId,
54		movement_kind: String,
55	) -> anyhow::Result<MovementId, MovementError> {
56		self.new_movement_at(subsystem_id, movement_kind, chrono::Local::now()).await
57	}
58
59	/// Begins the process of creating a new movement. This newly created movement will be defaulted
60	/// to a [MovementStatus::Pending] state. It can then be updated by using [MovementUpdate] in
61	/// combination with [MovementManager::update_movement].
62	///
63	/// [MovementManager::finish_movement] can be used once a movement has finished (whether
64	/// successful or not).
65	///
66	/// Parameters:
67	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
68	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
69	///   "receive", "round".
70	/// - at: The timestamp to set the [MovementTimestamp::created_at] field to.
71	///
72	/// Errors:
73	/// - If the subsystem ID is not recognized.
74	/// - If a database error occurs.
75	pub async fn new_movement_at(
76		&self,
77		subsystem_id: SubsystemId,
78		movement_kind: String,
79		at: DateTime<chrono::Local>,
80	) -> anyhow::Result<MovementId, MovementError> {
81		self.db.create_new_movement(
82			MovementStatus::Pending,
83			&MovementSubsystem {
84				name: self.get_subsystem_name(subsystem_id).await?,
85				kind: movement_kind,
86			},
87			at,
88		).map_err(|e| MovementError::CreationError { e })
89	}
90
91	/// Similar to [MovementManager::new_finished_movement_at] but it sets the
92	/// [Movement::created_at] field to the current time.
93	pub async fn new_finished_movement(
94		&self,
95		subsystem_id: SubsystemId,
96		movement_kind: String,
97		status: MovementStatus,
98		details: MovementUpdate,
99	) -> anyhow::Result<MovementId, MovementError> {
100		self.new_finished_movement_at(
101			subsystem_id, movement_kind, status, details, chrono::Local::now(),
102		).await
103	}
104
105	/// Creates and marks a [Movement] as finished based on the given parameters. This is useful for
106	/// one-shot movements where the details are known at time of creation, an example would be when
107	/// receiving funds asynchronously from a third party.
108	///
109	/// Parameters:
110	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
111	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
112	///   "receive", "round".
113	/// - status: The [MovementStatus] to set. This can't be [MovementStatus::Pending].
114	/// - details: Contains information about the movement, e.g. what VTXOs were consumed or
115	///   produced.
116	/// - at: The timestamp to set the [Movement::time] field to.
117	///
118	/// Errors:
119	/// - If the subsystem ID is not recognized.
120	/// - If [MovementStatus::Pending] is given.
121	/// - If a database error occurs.
122	pub async fn new_finished_movement_at(
123		&self,
124		subsystem_id: SubsystemId,
125		movement_kind: String,
126		status: MovementStatus,
127		details: MovementUpdate,
128		at: DateTime<chrono::Local>,
129	) -> anyhow::Result<MovementId, MovementError> {
130		if status == MovementStatus::Pending {
131			return Err(MovementError::IncorrectStatus { status: status.as_str().into() });
132		}
133		let id = self.new_movement_at(subsystem_id, movement_kind, at).await?;
134		let mut movement = self.db.get_movement_by_id(id)
135			.map_err(|e| MovementError::LoadError { id, e })?;
136		details.apply_to(&mut movement, at);
137		movement.status = status;
138		movement.time.completed_at = Some(at);
139		self.db.update_movement(&movement)
140			.map_err(|e| MovementError::PersisterError { id, e })?;
141		Ok(id)
142	}
143
144	/// Similar to [MovementManager::update_movement_at] but it sets the
145	/// [MovementTimestamp::updated_at] field to the current time.
146	pub async fn update_movement(
147		&self,
148		id: MovementId,
149		update: MovementUpdate,
150	) -> anyhow::Result<(), MovementError> {
151		self.update_movement_at(id, update, chrono::Local::now()).await
152	}
153
154	/// Updates a movement with the given parameters.
155	///
156	/// See also: [MovementManager::create_movement] and [MovementManager::finish_movement]
157	///
158	/// Parameters:
159	/// - id: The ID of the movement previously created by [MovementManager::new_movement].
160	/// - update: Specifies properties to set on the movement. `Option` fields will be ignored if
161	///   they are `None`. `Some` will result in that particular field being overwritten.
162	/// - at: The timestamp to set the [MovementTimestamp::completed_at] field to.
163	///
164	/// Errors:
165	/// - If the [MovementId] is not recognized.
166	/// - If a movement is not [MovementStatus::Pending].
167	/// - If a database error occurs.
168	pub async fn update_movement_at(
169		&self,
170		id: MovementId,
171		update: MovementUpdate,
172		at: DateTime<chrono::Local>,
173	) -> anyhow::Result<(), MovementError> {
174
175		// Ensure the movement is loaded.
176		self.load_movement_into_cache(id).await?;
177
178		// Apply the update to the movement.
179		update.apply_to(&mut *self.get_movement_lock(id).await?.write().await, at);
180
181		// Persist the changes using a read lock.
182		let lock = self.get_movement_lock(id).await?;
183		let movement = lock.read().await;
184		self.db.update_movement(&movement)
185			.map_err(|e| MovementError::PersisterError { id, e })?;
186
187		// Drop the movement if it's in a finished state as this was likely a one-time update.
188		if movement.status != MovementStatus::Pending {
189			self.unload_movement_from_cache(id).await?;
190		}
191		Ok(())
192	}
193
194	/// Similar to [MovementManager::finish_movement] but it sets the
195	/// [MovementTimestamp::completed_at] field to the current time.
196	pub async fn finish_movement(
197		&self,
198		id: MovementId,
199		new_status: MovementStatus,
200	) -> anyhow::Result<(), MovementError> {
201		self.finish_movement_at(id, new_status, chrono::Local::now()).await
202	}
203
204	/// Finalizes a movement, setting it to the given [MovementStatus].
205	///
206	/// See also: [MovementManager::create_movement] and [MovementManager::update_movement]
207	///
208	/// Parameters:
209	/// - id: The ID of the movement previously created by [MovementManager::new_movement].
210	/// - new_status: The final [MovementStatus] to set. This can't be [MovementStatus::Pending].
211	/// - at: The timestamp to set the [MovementTimestamp::completed_at] field to.
212	///
213	/// Errors:
214	/// - If the movement ID is not recognized.
215	/// - If [MovementStatus::Pending] is given.
216	/// - If a database error occurs.
217	pub async fn finish_movement_at(
218		&self,
219		id: MovementId,
220		new_status: MovementStatus,
221		at: DateTime<chrono::Local>,
222	) -> anyhow::Result<(), MovementError> {
223		if new_status == MovementStatus::Pending {
224			return Err(MovementError::IncorrectStatus { status: new_status.as_str().into() });
225		}
226
227		// Ensure the movement is loaded.
228		self.load_movement_into_cache(id).await?;
229
230		// Update the status and persist it.
231		let lock = self.get_movement_lock(id).await?;
232		let mut movement = lock.write().await;
233		movement.status = new_status;
234		movement.time.completed_at = Some(at);
235		self.db.update_movement(&*movement)
236			.map_err(|e| MovementError::PersisterError { id, e })?;
237		self.unload_movement_from_cache(id).await
238	}
239
240	async fn get_movement_lock(
241		&self,
242		id: MovementId,
243	) -> anyhow::Result<Arc<RwLock<Movement>>, MovementError> {
244		self.active_movements
245			.read()
246			.await
247			.get(&id)
248			.cloned()
249			.ok_or(MovementError::CacheError { id })
250	}
251
252	async fn get_subsystem_name(&self, id: SubsystemId) -> anyhow::Result<String, MovementError> {
253		self.subsystem_ids
254			.read()
255			.await
256			.get(&id)
257			.cloned()
258			.ok_or(MovementError::InvalidSubsystemId { id })
259	}
260
261	async fn load_movement_into_cache(&self, id: MovementId) -> anyhow::Result<(), MovementError> {
262		if self.active_movements.read().await.contains_key(&id) {
263			return Ok(());
264		}
265		// Acquire a write lock and check if another thread already loaded the movement.
266		let mut movements = self.active_movements.write().await;
267		if movements.contains_key(&id) {
268			return Ok(());
269		}
270		let movement = self.db.get_movement_by_id(id)
271			.map_err(|e| MovementError::LoadError { id, e })?;
272		movements.insert(id, Arc::new(RwLock::new(movement)));
273		Ok(())
274	}
275
276	async fn unload_movement_from_cache(&self, id: MovementId) -> anyhow::Result<(), MovementError> {
277		let mut lock = self.active_movements.write().await;
278		lock.remove(&id);
279		Ok(())
280	}
281}
282
/// The final state a [MovementGuard] assigns to its [Movement] when the guard is dropped without
/// having been finished or stopped.
///
/// See [MovementGuard::set_on_drop_status] for how to select it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OnDropStatus {
	/// The [Movement] will be marked as [MovementStatus::Cancelled].
	Cancelled,
	/// The [Movement] will be marked as [MovementStatus::Failed].
	Failed,
}
293
294impl From<OnDropStatus> for MovementStatus {
295	fn from(status: OnDropStatus) -> Self {
296		match status {
297			OnDropStatus::Cancelled => MovementStatus::Cancelled,
298			OnDropStatus::Failed => MovementStatus::Failed,
299		}
300	}
301}
302
303/// A RAII helper class to ensure that pending movements get marked as finished in case an error
304/// occurs. You can construct a guard for an existing [Movement] with [MovementGuard::new].
305/// Alternatively, a [MovementGuard] can be coupled to a movement using
306/// [MovementGuard::new_movement].
307///
308/// When the [MovementGuard] is dropped from the stack, it will finalize the movement according to
309/// the configured [OnDropStatus] unless [MovementGuard::finish] has already been called.
310pub struct MovementGuard {
311	id: MovementId,
312	manager: Arc<MovementManager>,
313	on_drop: OnDropStatus,
314	has_finished: bool,
315}
316
317impl<'a> MovementGuard {
318	/// Constructs a [MovementGuard] to manage a pre-existing [Movement].
319	///
320	/// Parameters:
321	/// - id: The ID of the [Movement] to update.
322	/// - manager: A reference to the [MovementManager] so the guard can update the [Movement].
323	pub fn new(
324		id: MovementId,
325		manager: Arc<MovementManager>,
326	) -> Self {
327		Self {
328			id,
329			manager,
330			on_drop: OnDropStatus::Failed,
331			has_finished: false,
332		}
333	}
334
335	/// Constructs a [MovementGuard] and creates a new [Movement] for the guard to manage.
336	///
337	/// See [MovementManager::new_movement] for more information.
338	///
339	/// Parameters:
340	/// - manager: A reference to the [MovementManager] so the guard can update the [Movement].
341	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
342	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
343	///   "receive", "round".
344	pub async fn new_movement(
345		manager: Arc<MovementManager>,
346		subsystem_id: SubsystemId,
347		movement_kind: String,
348	) -> anyhow::Result<Self, MovementError> {
349		let id = manager.new_movement(subsystem_id, movement_kind).await?;
350		Ok(Self {
351			id,
352			manager,
353			on_drop: OnDropStatus::Failed,
354			has_finished: false,
355		})
356	}
357
358	/// Similar to [MovementGuard::new_movement] with the ability to set a custom timestamp.
359	///
360	/// Parameters:
361	/// - manager: A reference to the [MovementManager] so the guard can update the [Movement].
362	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
363	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
364	///   "receive", "round".
365	/// - at: The timestamp to set the [MovementTimestamp::created_at] field to.
366	pub async fn new_movement_at(
367		manager: Arc<MovementManager>,
368		subsystem_id: SubsystemId,
369		movement_kind: String,
370		at: DateTime<chrono::Local>,
371	) -> anyhow::Result<Self, MovementError> {
372		let id = manager.new_movement_at(subsystem_id, movement_kind, at).await?;
373		Ok(Self {
374			id,
375			manager,
376			on_drop: OnDropStatus::Failed,
377			has_finished: false,
378		})
379	}
380
381	/// Gets the [MovementId] stored by this guard.
382	pub fn id(&self) -> MovementId {
383		self.id
384	}
385
386	/// Sets a different [OnDropStatus] to apply to the movement upon dropping the [MovementGuard].
387	///
388	/// Parameters:
389	/// - on_drop: Determines what status the movement will be set to when the guard is dropped.
390	pub fn set_on_drop_status(&mut self, status: OnDropStatus) {
391		self.on_drop = status;
392	}
393
394	/// Applies an update to the managed [Movement].
395	///
396	/// See [MovementManager::update_movement] for more information.
397	///
398	/// Parameters:
399	/// - update: Specifies properties to set on the movement. `Option` fields will be ignored if
400	///   they are `None`. `Some` will result in that particular field being overwritten.
401	pub async fn apply_update(
402		&self,
403		update: MovementUpdate,
404	) -> anyhow::Result<(), MovementError> {
405		self.manager.update_movement(self.id, update).await
406	}
407
408	/// Similar to [MovementGuard::apply_update] with the ability to set a custom timestamp.
409	///
410	/// Parameters:
411	/// - update: Specifies properties to set on the movement. `Option` fields will be ignored if
412	///   they are `None`. `Some` will result in that particular field being overwritten.
413	/// - at: The timestamp to set the [MovementTimestamp::completed_at] field to.
414	pub async fn apply_update_at(
415		&self,
416		update: MovementUpdate,
417		at: DateTime<chrono::Local>,
418	) -> anyhow::Result<(), MovementError> {
419		self.manager.update_movement_at(self.id, update, at).await
420	}
421
422	/// Finalizes a movement, setting it to the given [MovementStatus]. If the [MovementGuard] is
423	/// dropped after calling this function, no further changes will be made to the [Movement].
424	///
425	/// See [MovementManager::finish_movement] for more information.
426	///
427	/// Parameters:
428	/// - status: The final [MovementStatus] to set. Must not be [MovementStatus::Pending].
429	pub async fn finish(
430		&mut self,
431		status: MovementStatus,
432	) -> anyhow::Result<(), MovementError> {
433		self.manager.finish_movement(self.id, status).await?;
434		self.has_finished = true;
435		Ok(())
436	}
437
438	/// Finalizes a movement, setting it to the given [MovementStatus]. If the [MovementGuard] is
439	/// dropped after calling this function, no further changes will be made to the [Movement].
440	///
441	/// See [MovementManager::finish_movement] for more information.
442	///
443	/// Parameters:
444	/// - status: The final [MovementStatus] to set. Must not be [MovementStatus::Pending].
445	/// - at: The timestamp to set the [MovementTimestamp::completed_at] field to.
446	pub async fn finish_at(
447		&mut self,
448		status: MovementStatus,
449		at: DateTime<chrono::Local>,
450	) -> anyhow::Result<(), MovementError> {
451		self.manager.finish_movement_at(self.id, status, at).await?;
452		self.has_finished = true;
453		Ok(())
454	}
455
456	/// Prevents the guard from making further changes to the movement after being dropped. Manual
457	/// actions such as [MovementGuard::apply_update] will continue to work.
458	pub fn stop(&mut self) {
459		self.has_finished = true;
460	}
461}
462
463impl Drop for MovementGuard {
464	fn drop(&mut self) {
465		if !self.has_finished {
466			// Asynchronously mark the movement as finished since we are being dropped.
467			let manager = self.manager.clone();
468			let id = self.id;
469			let on_drop = self.on_drop;
470			tokio::spawn(async move {
471				if let Err(e) = manager.finish_movement(id, on_drop.into()).await {
472					log::error!("An error occurred in MovementGuard::drop(): {:#}", e);
473				}
474			});
475		}
476	}
477}