bark/
daemon.rs

1pub use tokio_util::sync::CancellationToken;
2
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::Duration;
6
7use anyhow::Context;
8use futures::StreamExt;
9use log::{info, warn};
10use tokio::sync::RwLock;
11
12use crate::Wallet;
13use crate::onchain::ExitUnilaterally;
14
// Polling periods used by the daemon's background loops.
//
// `Duration::from_secs` is a `const fn`, so the values are computed at
// compile time and need no lazy initialisation. They are stored as
// `&'static Duration` so that call sites dereferencing them
// (`*FAST_INTERVAL`) keep working unchanged.

/// Period of the server connection check and of the lightning sync.
static FAST_INTERVAL: &Duration = &Duration::from_secs(1);
/// Period of the arkoor and board syncs.
static MEDIUM_INTERVAL: &Duration = &Duration::from_secs(30);
/// Period of the exit progress, the maintenance refresh and the
/// round event resubscription attempts.
static SLOW_INTERVAL: &Duration = &Duration::from_secs(60);
20
21/// The daemon is responsible for running the wallet and performing the
22/// necessary actions to keep the wallet in a healthy state
23pub struct Daemon {
24	shutdown: CancellationToken,
25
26	connected: AtomicBool,
27	wallet: Arc<Wallet>,
28	onchain: Arc<RwLock<dyn ExitUnilaterally>>,
29}
30
31impl Daemon {
32	/// Create a new [Daemon]
33	///
34	/// Arguments:
35	/// - `shutdown`: A signal receive to shutdown the daemon
36	/// - `wallet`: The wallet to run the daemon on
37	/// - `onchain`: The onchain wallet to run the daemon on
38	///
39	/// Returns:
40	/// - The new [Daemon]
41	pub fn new(
42		shutdown: CancellationToken,
43		wallet: Arc<Wallet>,
44		onchain: Arc<RwLock<dyn ExitUnilaterally>>,
45	) -> anyhow::Result<Daemon> {
46		let daemon = Daemon {
47			shutdown,
48
49			connected: AtomicBool::new(false),
50			wallet,
51			onchain,
52		};
53
54		Ok(daemon)
55	}
56
57	/// Run lightning sync process
58	/// - Try to claim all pending lightning receives
59	/// - Sync pending lightning sends
60	async fn run_lightning_sync(&self) {
61		if let Err(e) = self.wallet.try_claim_all_lightning_receives(false).await {
62			warn!("An error occured while checking and claiming pending lightning receives: {e}");
63		}
64
65		if let Err(e) = self.wallet.sync_pending_lightning_send_vtxos().await {
66			warn!("An error occured while syncing pending lightning sends: {e}");
67		}
68	}
69
70	/// Check for incoming arkoors
71	async fn run_arkoors_sync(&self) {
72		if let Err(e) = self.wallet.sync_oors().await {
73			warn!("An error occured while syncing oors: {e}");
74		}
75	}
76
77	/// Sync pending boards, register new ones if needed
78	async fn run_boards_sync(&self) {
79		if let Err(e) = self.wallet.sync_pending_boards().await {
80			warn!("An error occured while syncing pending board: {e}");
81		}
82	}
83
84	/// Perform library built-in maintenance refresh
85	async fn run_maintenance_refresh_process(&self) {
86		loop {
87			if let Err(e) = self.wallet.maintenance_refresh().await {
88				warn!("An error occured while performing maintenance refresh: {e}");
89			}
90
91			tokio::select! {
92				_ = tokio::time::sleep(*SLOW_INTERVAL) => {},
93				_ = self.shutdown.cancelled() => {
94					info!("Shutdown signal received! Shutting maintenance refresh process...");
95					break;
96				},
97			}
98		}
99	}
100
101	/// Progress any ongoing unilateral exits and sync the exit statuses
102	async fn run_exits(&self) {
103		let mut onchain = self.onchain.write().await;
104
105		if let Err(e) = self.wallet.sync_exits(&mut *onchain).await {
106			warn!("An error occured while syncing exits: {e}");
107		}
108
109		let mut exit_lock = self.wallet.exit.write().await;
110		if let Err(e) = exit_lock.progress_exits(&mut *onchain, None).await {
111			warn!("An error occured while progressing exits: {e}");
112		}
113		drop(exit_lock);
114	}
115
116	/// Subscribe to round event stream and process each incoming event
117	async fn inner_process_pending_rounds(&self) -> anyhow::Result<()> {
118		let mut events = self.wallet.subscribe_round_events().await?;
119
120		loop {
121			tokio::select! {
122				res = events.next() => {
123					let event = res.context("events stream broke")?
124						.context("error on event stream")?;
125
126					self.wallet.progress_pending_rounds(Some(&event)).await?;
127				},
128				_ = self.shutdown.cancelled() => {
129					info!("Shutdown signal received! Shutting inner round events process...");
130					return Ok(());
131				},
132			}
133		}
134	}
135
136	/// Recursively resubscribe to round event stream by waiting and
137	/// calling [Self::inner_process_pending_rounds] again until
138	/// the daemon is shutdown.
139	async fn run_round_events_process(&self) {
140		loop {
141			if self.connected.load(Ordering::Relaxed) {
142				if let Err(e) = self.inner_process_pending_rounds().await {
143					warn!("An error occured while processing pending rounds: {e}");
144				}
145			}
146
147			tokio::select! {
148				_ = tokio::time::sleep(*SLOW_INTERVAL) => {},
149				_ = self.shutdown.cancelled() => {
150					info!("Shutdown signal received! Shutting round events process...");
151					break;
152				},
153			}
154		}
155	}
156
157	/// Run a process that will recursively check the server connection
158	async fn run_server_connection_check_process(&self) {
159		loop {
160			tokio::select! {
161				_ = tokio::time::sleep(*FAST_INTERVAL) => {},
162				_ = self.shutdown.cancelled() => {
163					info!("Shutdown signal received! Shutting server connection check process...");
164					break;
165				},
166			}
167
168			let connected = self.wallet.refresh_server().await.is_ok();
169			self.connected.store(connected, Ordering::Relaxed);
170		}
171	}
172
173	async fn run_sync_processes(&self) {
174		let mut fast_interval = tokio::time::interval(*FAST_INTERVAL);
175		fast_interval.reset();
176		let mut medium_interval = tokio::time::interval(*MEDIUM_INTERVAL);
177		medium_interval.reset();
178		let mut slow_interval = tokio::time::interval(*SLOW_INTERVAL);
179		slow_interval.reset();
180
181		loop {
182			tokio::select! {
183				_ = fast_interval.tick() => {
184					if !self.connected.load(Ordering::Relaxed) {
185						continue;
186					}
187
188					self.run_lightning_sync().await;
189					fast_interval.reset();
190				},
191				_ = medium_interval.tick() => {
192					if !self.connected.load(Ordering::Relaxed) {
193						continue;
194					}
195
196					self.run_arkoors_sync().await;
197					self.run_boards_sync().await;
198					medium_interval.reset();
199				},
200				_ = slow_interval.tick() => {
201					if !self.connected.load(Ordering::Relaxed) {
202						continue;
203					}
204
205					self.run_exits().await;
206					slow_interval.reset();
207				},
208				_ = self.shutdown.cancelled() => {
209					info!("Shutdown signal received! Shutting sync processes...");
210					break;
211				},
212			}
213		}
214	}
215
216	pub async fn run(self) {
217		let connected = self.wallet.server.read().is_some();
218		self.connected.store(connected, Ordering::Relaxed);
219
220		let _ = tokio::join!(
221			self.run_server_connection_check_process(),
222			self.run_round_events_process(),
223			self.run_sync_processes(),
224			self.run_maintenance_refresh_process(),
225		);
226
227		info!("Daemon gracefully stopped");
228	}
229}