1use std::cmp::PartialEq;
2use std::collections::HashMap;
3use std::sync::Arc;
4
5use chrono::DateTime;
6use tokio::sync::RwLock;
7
8use crate::movement::{Movement, MovementId, MovementStatus, MovementSubsystem};
9use crate::movement::error::MovementError;
10use crate::movement::update::MovementUpdate;
11use crate::persist::BarkPersister;
12use crate::subsystem::SubsystemId;
13
/// Creates, updates and finishes [`Movement`] records on behalf of registered subsystems.
///
/// Movements are stored through a [`BarkPersister`]. Movements that are being updated are
/// additionally kept in an in-memory cache so concurrent callers share a single locked copy.
pub struct MovementManager {
    /// Persistence backend used to create, load and update movements.
    db: Arc<dyn BarkPersister>,
    /// Names of the registered subsystems, keyed by the ID handed out at registration.
    subsystem_ids: RwLock<HashMap<SubsystemId, String>>,
    /// Cache of movements loaded from the persister, each behind its own lock.
    active_movements: RwLock<HashMap<MovementId, Arc<RwLock<Movement>>>>,
}
21
22impl MovementManager {
23 pub fn new(db: Arc<dyn BarkPersister>) -> Self {
25 Self {
26 db,
27 subsystem_ids: RwLock::new(HashMap::new()),
28 active_movements: RwLock::new(HashMap::new()),
29 }
30 }
31
32 pub async fn register_subsystem(&self, name: String) -> anyhow::Result<SubsystemId, MovementError> {
36 let exists = self.subsystem_ids.read().await.iter().any(|(_, n)| n == &name);
37 if exists {
38 Err(MovementError::SubsystemError {
39 name, error: "Subsystem already registered".into(),
40 })
41 } else {
42 let mut ids = self.subsystem_ids.write().await;
43 let id = SubsystemId::new(ids.len() as u32);
44 ids.insert(id, name);
45 Ok(id)
46 }
47 }
48
49 pub async fn new_movement(
52 &self,
53 subsystem_id: SubsystemId,
54 movement_kind: String,
55 ) -> anyhow::Result<MovementId, MovementError> {
56 self.new_movement_at(subsystem_id, movement_kind, chrono::Local::now()).await
57 }
58
59 pub async fn new_movement_at(
76 &self,
77 subsystem_id: SubsystemId,
78 movement_kind: String,
79 at: DateTime<chrono::Local>,
80 ) -> anyhow::Result<MovementId, MovementError> {
81 self.db.create_new_movement(
82 MovementStatus::Pending,
83 &MovementSubsystem {
84 name: self.get_subsystem_name(subsystem_id).await?,
85 kind: movement_kind,
86 },
87 at,
88 ).map_err(|e| MovementError::CreationError { e })
89 }
90
91 pub async fn new_finished_movement(
94 &self,
95 subsystem_id: SubsystemId,
96 movement_kind: String,
97 status: MovementStatus,
98 details: MovementUpdate,
99 ) -> anyhow::Result<MovementId, MovementError> {
100 self.new_finished_movement_at(
101 subsystem_id, movement_kind, status, details, chrono::Local::now(),
102 ).await
103 }
104
105 pub async fn new_finished_movement_at(
123 &self,
124 subsystem_id: SubsystemId,
125 movement_kind: String,
126 status: MovementStatus,
127 details: MovementUpdate,
128 at: DateTime<chrono::Local>,
129 ) -> anyhow::Result<MovementId, MovementError> {
130 if status == MovementStatus::Pending {
131 return Err(MovementError::IncorrectStatus { status: status.as_str().into() });
132 }
133 let id = self.new_movement_at(subsystem_id, movement_kind, at).await?;
134 let mut movement = self.db.get_movement_by_id(id)
135 .map_err(|e| MovementError::LoadError { id, e })?;
136 details.apply_to(&mut movement, at);
137 movement.status = status;
138 movement.time.completed_at = Some(at);
139 self.db.update_movement(&movement)
140 .map_err(|e| MovementError::PersisterError { id, e })?;
141 Ok(id)
142 }
143
144 pub async fn update_movement(
147 &self,
148 id: MovementId,
149 update: MovementUpdate,
150 ) -> anyhow::Result<(), MovementError> {
151 self.update_movement_at(id, update, chrono::Local::now()).await
152 }
153
154 pub async fn update_movement_at(
169 &self,
170 id: MovementId,
171 update: MovementUpdate,
172 at: DateTime<chrono::Local>,
173 ) -> anyhow::Result<(), MovementError> {
174
175 self.load_movement_into_cache(id).await?;
177
178 update.apply_to(&mut *self.get_movement_lock(id).await?.write().await, at);
180
181 let lock = self.get_movement_lock(id).await?;
183 let movement = lock.read().await;
184 self.db.update_movement(&movement)
185 .map_err(|e| MovementError::PersisterError { id, e })?;
186
187 if movement.status != MovementStatus::Pending {
189 self.unload_movement_from_cache(id).await?;
190 }
191 Ok(())
192 }
193
194 pub async fn finish_movement(
197 &self,
198 id: MovementId,
199 new_status: MovementStatus,
200 ) -> anyhow::Result<(), MovementError> {
201 self.finish_movement_at(id, new_status, chrono::Local::now()).await
202 }
203
204 pub async fn finish_movement_at(
218 &self,
219 id: MovementId,
220 new_status: MovementStatus,
221 at: DateTime<chrono::Local>,
222 ) -> anyhow::Result<(), MovementError> {
223 if new_status == MovementStatus::Pending {
224 return Err(MovementError::IncorrectStatus { status: new_status.as_str().into() });
225 }
226
227 self.load_movement_into_cache(id).await?;
229
230 let lock = self.get_movement_lock(id).await?;
232 let mut movement = lock.write().await;
233 movement.status = new_status;
234 movement.time.completed_at = Some(at);
235 self.db.update_movement(&*movement)
236 .map_err(|e| MovementError::PersisterError { id, e })?;
237 self.unload_movement_from_cache(id).await
238 }
239
240 async fn get_movement_lock(
241 &self,
242 id: MovementId,
243 ) -> anyhow::Result<Arc<RwLock<Movement>>, MovementError> {
244 self.active_movements
245 .read()
246 .await
247 .get(&id)
248 .cloned()
249 .ok_or(MovementError::CacheError { id })
250 }
251
252 async fn get_subsystem_name(&self, id: SubsystemId) -> anyhow::Result<String, MovementError> {
253 self.subsystem_ids
254 .read()
255 .await
256 .get(&id)
257 .cloned()
258 .ok_or(MovementError::InvalidSubsystemId { id })
259 }
260
261 async fn load_movement_into_cache(&self, id: MovementId) -> anyhow::Result<(), MovementError> {
262 if self.active_movements.read().await.contains_key(&id) {
263 return Ok(());
264 }
265 let mut movements = self.active_movements.write().await;
267 if movements.contains_key(&id) {
268 return Ok(());
269 }
270 let movement = self.db.get_movement_by_id(id)
271 .map_err(|e| MovementError::LoadError { id, e })?;
272 movements.insert(id, Arc::new(RwLock::new(movement)));
273 Ok(())
274 }
275
276 async fn unload_movement_from_cache(&self, id: MovementId) -> anyhow::Result<(), MovementError> {
277 let mut lock = self.active_movements.write().await;
278 lock.remove(&id);
279 Ok(())
280 }
281}
282
/// The status a movement is given when its [`MovementGuard`] is dropped without having been
/// finished or stopped.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum OnDropStatus {
    /// Converts to [`MovementStatus::Cancelled`].
    Cancelled,
    /// Converts to [`MovementStatus::Failed`]. This is what the [`MovementGuard`] constructors
    /// use by default.
    Failed,
}
293
294impl From<OnDropStatus> for MovementStatus {
295 fn from(status: OnDropStatus) -> Self {
296 match status {
297 OnDropStatus::Cancelled => MovementStatus::Cancelled,
298 OnDropStatus::Failed => MovementStatus::Failed,
299 }
300 }
301}
302
/// Handle to a single movement that finishes the movement automatically if it is dropped
/// before being finished or stopped explicitly.
pub struct MovementGuard {
    /// ID of the movement this guard is responsible for.
    id: MovementId,
    /// Manager through which updates and the final status are applied.
    manager: Arc<MovementManager>,
    /// Status to give the movement if the guard is dropped while unfinished.
    on_drop: OnDropStatus,
    /// Set once the movement has been finished or the guard has been stopped; when `true`,
    /// dropping the guard does nothing.
    has_finished: bool,
}
316
317impl<'a> MovementGuard {
318 pub fn new(
324 id: MovementId,
325 manager: Arc<MovementManager>,
326 ) -> Self {
327 Self {
328 id,
329 manager,
330 on_drop: OnDropStatus::Failed,
331 has_finished: false,
332 }
333 }
334
335 pub async fn new_movement(
345 manager: Arc<MovementManager>,
346 subsystem_id: SubsystemId,
347 movement_kind: String,
348 ) -> anyhow::Result<Self, MovementError> {
349 let id = manager.new_movement(subsystem_id, movement_kind).await?;
350 Ok(Self {
351 id,
352 manager,
353 on_drop: OnDropStatus::Failed,
354 has_finished: false,
355 })
356 }
357
358 pub async fn new_movement_at(
367 manager: Arc<MovementManager>,
368 subsystem_id: SubsystemId,
369 movement_kind: String,
370 at: DateTime<chrono::Local>,
371 ) -> anyhow::Result<Self, MovementError> {
372 let id = manager.new_movement_at(subsystem_id, movement_kind, at).await?;
373 Ok(Self {
374 id,
375 manager,
376 on_drop: OnDropStatus::Failed,
377 has_finished: false,
378 })
379 }
380
381 pub fn id(&self) -> MovementId {
383 self.id
384 }
385
386 pub fn set_on_drop_status(&mut self, status: OnDropStatus) {
391 self.on_drop = status;
392 }
393
394 pub async fn apply_update(
402 &self,
403 update: MovementUpdate,
404 ) -> anyhow::Result<(), MovementError> {
405 self.manager.update_movement(self.id, update).await
406 }
407
408 pub async fn apply_update_at(
415 &self,
416 update: MovementUpdate,
417 at: DateTime<chrono::Local>,
418 ) -> anyhow::Result<(), MovementError> {
419 self.manager.update_movement_at(self.id, update, at).await
420 }
421
422 pub async fn finish(
430 &mut self,
431 status: MovementStatus,
432 ) -> anyhow::Result<(), MovementError> {
433 self.manager.finish_movement(self.id, status).await?;
434 self.has_finished = true;
435 Ok(())
436 }
437
438 pub async fn finish_at(
447 &mut self,
448 status: MovementStatus,
449 at: DateTime<chrono::Local>,
450 ) -> anyhow::Result<(), MovementError> {
451 self.manager.finish_movement_at(self.id, status, at).await?;
452 self.has_finished = true;
453 Ok(())
454 }
455
456 pub fn stop(&mut self) {
459 self.has_finished = true;
460 }
461}
462
463impl Drop for MovementGuard {
464 fn drop(&mut self) {
465 if !self.has_finished {
466 let manager = self.manager.clone();
468 let id = self.id;
469 let on_drop = self.on_drop;
470 tokio::spawn(async move {
471 if let Err(e) = manager.finish_movement(id, on_drop.into()).await {
472 log::error!("An error occurred in MovementGuard::drop(): {:#}", e);
473 }
474 });
475 }
476 }
477}