bark/
daemon.rs

1
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::Duration;
5
6use anyhow::Context;
7use futures::{FutureExt, StreamExt};
8use log::{info, warn};
9use tokio::sync::RwLock;
10use tokio_util::sync::CancellationToken;
11
12use crate::Wallet;
13use crate::onchain::DaemonizableOnchainWallet;
14
15const FAST_INTERVAL: Duration = Duration::from_secs(1);
16const MEDIUM_INTERVAL: Duration = Duration::from_secs(30);
17const SLOW_INTERVAL: Duration = Duration::from_secs(60);
18
19/// A handle to a running background daemon
20#[cfg(not(feature = "wasm-web"))]
21pub struct DaemonHandle {
22	shutdown: CancellationToken,
23	jh: tokio::task::JoinHandle<()>,
24}
25
26/// A handle to a running background daemon for WASM
27#[cfg(feature = "wasm-web")]
28pub struct DaemonHandle {
29	shutdown: CancellationToken,
30}
31
32impl DaemonHandle {
33	/// Trigger the daemon process to stop
34	pub fn stop(&self) {
35		self.shutdown.cancel();
36	}
37
38	/// Stop the daemon process and wait for it to finish
39	pub async fn stop_wait(self) -> anyhow::Result<()> {
40		self.stop();
41		#[cfg(not(feature = "wasm-web"))]
42		self.jh.await?;
43		Ok(())
44	}
45}
46
47pub(crate) fn start_daemon(
48	wallet: Arc<Wallet>,
49	onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
50) -> DaemonHandle {
51	let shutdown = CancellationToken::new();
52	let proc = DaemonProcess::new(shutdown.clone(), wallet, onchain);
53
54	#[cfg(not(feature = "wasm-web"))]
55	{
56		let jh = crate::utils::spawn(proc.run());
57		DaemonHandle { shutdown, jh }
58	}
59	#[cfg(feature = "wasm-web")]
60	{
61		crate::utils::spawn(proc.run());
62		DaemonHandle { shutdown }
63	}
64}
65
66/// The daemon is responsible for running the wallet and performing the
67/// necessary actions to keep the wallet in a healthy state
68struct DaemonProcess {
69	shutdown: CancellationToken,
70
71	connected: AtomicBool,
72	wallet: Arc<Wallet>,
73	onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
74}
75
76impl DaemonProcess {
77	fn new(
78		shutdown: CancellationToken,
79		wallet: Arc<Wallet>,
80		onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
81	) -> DaemonProcess {
82		DaemonProcess {
83			connected: AtomicBool::new(false),
84			shutdown,
85			wallet,
86			onchain,
87		}
88	}
89
90	/// Run lightning sync process
91	/// - Try to claim all pending lightning receives
92	/// - Sync pending lightning sends
93	async fn run_lightning_sync(&self) {
94		if let Err(e) = self.wallet.try_claim_all_lightning_receives(false).await {
95			warn!("An error occured while checking and claiming pending lightning receives: {e:#}");
96		}
97
98		if let Err(e) = self.wallet.sync_pending_lightning_send_vtxos().await {
99			warn!("An error occured while syncing pending lightning sends: {e:#}");
100		}
101	}
102
103	/// Check for incoming arkoors
104	async fn sync_mailbox(&self) {
105		if let Err(e) = self.wallet.sync_mailbox().await {
106			warn!("An error occurred while syncing mailbox: {e:#}");
107		}
108	}
109
110	/// Sync pending boards, register new ones if needed
111	async fn run_boards_sync(&self) {
112		if let Err(e) = self.wallet.sync_pending_boards().await {
113			warn!("An error occured while syncing pending board: {e:#}");
114		}
115	}
116
117	/// Sync pending offboards, check for confirmations
118	async fn run_offboards_sync(&self) {
119		if let Err(e) = self.wallet.sync_pending_offboards().await {
120			warn!("An error occured while syncing pending offboards: {e:#}");
121		}
122	}
123
124	/// Sync onchain wallet
125	async fn run_onchain_sync(&self) {
126		let mut onchain = self.onchain.write().await;
127		if let Err(e) = onchain.sync(&self.wallet.chain).await {
128			warn!("An error occured while syncing onchain: {e:#}");
129		}
130	}
131
132	/// Perform library built-in maintenance refresh
133	async fn run_maintenance_refresh_process(&self) {
134		loop {
135			if let Err(e) = self.wallet.maintenance_refresh().await {
136				warn!("An error occured while performing maintenance refresh: {e:#}");
137			}
138
139			futures::select! {
140				_ = tokio::time::sleep(SLOW_INTERVAL).fuse() => {},
141
142				_ = self.shutdown.cancelled().fuse() => {
143					info!("Shutdown signal received! Shutting maintenance refresh process...");
144					break;
145				},
146			}
147		}
148	}
149
150	/// Progress any ongoing unilateral exits and sync the exit statuses
151	async fn run_exits(&self) {
152		let mut onchain = self.onchain.write().await;
153
154		let mut exit_lock = self.wallet.exit.write().await;
155		if let Err(e) = exit_lock.sync_no_progress(&*onchain).await {
156			warn!("An error occurred while syncing exits: {e:#}");
157		}
158
159		if let Err(e) = exit_lock.progress_exits(&self.wallet, &mut *onchain, None).await {
160			warn!("An error occurred while progressing exits: {e:#}");
161		}
162	}
163
164	/// Subscribe to round event stream and process each incoming event
165	async fn inner_process_pending_rounds(&self) -> anyhow::Result<()> {
166		let mut events = self.wallet.subscribe_round_events().await?;
167
168		loop {
169			futures::select! {
170				res = events.next().fuse() => {
171					let event = res.context("events stream broke")?
172						.context("error on event stream")?;
173
174					self.wallet.progress_pending_rounds(Some(&event)).await?;
175				},
176				_ = self.shutdown.cancelled().fuse() => {
177					info!("Shutdown signal received! Shutting inner round events process...");
178					return Ok(());
179				},
180			}
181		}
182	}
183
184	/// Recursively resubscribe to round event stream by waiting and
185	/// calling [Self::inner_process_pending_rounds] again until
186	/// the daemon is shutdown.
187	async fn run_round_events_process(&self) {
188		loop {
189			if self.connected.load(Ordering::Relaxed) {
190				if let Err(e) = self.inner_process_pending_rounds().await {
191					warn!("An error occured while processing pending rounds: {e:#}");
192				}
193			}
194
195			futures::select! {
196				_ = tokio::time::sleep(SLOW_INTERVAL).fuse() => {},
197				_ = self.shutdown.cancelled().fuse() => {
198					info!("Shutdown signal received! Shutting round events process...");
199					break;
200				},
201			}
202		}
203	}
204
205	/// Run a process that will recursively check the server connection
206	async fn run_server_connection_check_process(&self) {
207		loop {
208			futures::select! {
209				_ = tokio::time::sleep(FAST_INTERVAL).fuse() => {},
210				_ = self.shutdown.cancelled().fuse() => {
211					info!("Shutdown signal received! Shutting server connection check process...");
212					break;
213				},
214			}
215
216			let connected = self.wallet.refresh_server().await.is_ok();
217			self.connected.store(connected, Ordering::Relaxed);
218		}
219	}
220
221	async fn run_sync_processes(&self) {
222		let mut fast_interval = tokio::time::interval(FAST_INTERVAL);
223		fast_interval.reset();
224		let mut medium_interval = tokio::time::interval(MEDIUM_INTERVAL);
225		medium_interval.reset();
226		let mut slow_interval = tokio::time::interval(SLOW_INTERVAL);
227		slow_interval.reset();
228
229		loop {
230			futures::select! {
231				_ = fast_interval.tick().fuse() => {
232					if !self.connected.load(Ordering::Relaxed) {
233						continue;
234					}
235
236					self.run_lightning_sync().await;
237					fast_interval.reset();
238				},
239				_ = medium_interval.tick().fuse() => {
240					if !self.connected.load(Ordering::Relaxed) {
241						continue;
242					}
243
244					self.sync_mailbox().await;
245					self.run_boards_sync().await;
246					self.run_offboards_sync().await;
247					medium_interval.reset();
248				},
249				_ = slow_interval.tick().fuse() => {
250					if !self.connected.load(Ordering::Relaxed) {
251						continue;
252					}
253
254					self.run_onchain_sync().await;
255					self.run_exits().await;
256					slow_interval.reset();
257				},
258				_ = self.shutdown.cancelled().fuse() => {
259					info!("Shutdown signal received! Shutting sync processes...");
260					break;
261				},
262			}
263		}
264	}
265
266	pub async fn run(self) {
267		let connected = self.wallet.server.read().is_some();
268		self.connected.store(connected, Ordering::Relaxed);
269
270		let _ = futures::join!(
271			self.run_server_connection_check_process(),
272			self.run_round_events_process(),
273			self.run_sync_processes(),
274			self.run_maintenance_refresh_process(),
275		);
276
277		info!("Daemon gracefully stopped");
278	}
279}