1
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::Duration;
5
6use anyhow::Context;
7use futures::{FutureExt, StreamExt};
8use log::{info, warn};
9use tokio::sync::RwLock;
10use tokio_util::sync::CancellationToken;
11
12use crate::Wallet;
13use crate::onchain::DaemonizableOnchainWallet;
14
/// Shortest period: paces the server connection check and the lightning sync tick.
const FAST_INTERVAL: Duration = Duration::from_secs(1);
/// Middle period: paces the mailbox, boards and offboards syncs.
const MEDIUM_INTERVAL: Duration = Duration::from_secs(30);
/// Longest period: paces the onchain sync, exits, maintenance refresh and round-event retries.
const SLOW_INTERVAL: Duration = Duration::from_secs(60);
18
19#[cfg(not(feature = "wasm-web"))]
21pub struct DaemonHandle {
22 shutdown: CancellationToken,
23 jh: tokio::task::JoinHandle<()>,
24}
25
26#[cfg(feature = "wasm-web")]
28pub struct DaemonHandle {
29 shutdown: CancellationToken,
30}
31
32impl DaemonHandle {
33 pub fn stop(&self) {
35 self.shutdown.cancel();
36 }
37
38 pub async fn stop_wait(self) -> anyhow::Result<()> {
40 self.stop();
41 #[cfg(not(feature = "wasm-web"))]
42 self.jh.await?;
43 Ok(())
44 }
45}
46
47pub(crate) fn start_daemon(
48 wallet: Arc<Wallet>,
49 onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
50) -> DaemonHandle {
51 let shutdown = CancellationToken::new();
52 let proc = DaemonProcess::new(shutdown.clone(), wallet, onchain);
53
54 #[cfg(not(feature = "wasm-web"))]
55 {
56 let jh = crate::utils::spawn(proc.run());
57 DaemonHandle { shutdown, jh }
58 }
59 #[cfg(feature = "wasm-web")]
60 {
61 crate::utils::spawn(proc.run());
62 DaemonHandle { shutdown }
63 }
64}
65
66struct DaemonProcess {
69 shutdown: CancellationToken,
70
71 connected: AtomicBool,
72 wallet: Arc<Wallet>,
73 onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
74}
75
76impl DaemonProcess {
77 fn new(
78 shutdown: CancellationToken,
79 wallet: Arc<Wallet>,
80 onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
81 ) -> DaemonProcess {
82 DaemonProcess {
83 connected: AtomicBool::new(false),
84 shutdown,
85 wallet,
86 onchain,
87 }
88 }
89
90 async fn run_lightning_sync(&self) {
94 if let Err(e) = self.wallet.try_claim_all_lightning_receives(false).await {
95 warn!("An error occured while checking and claiming pending lightning receives: {e:#}");
96 }
97
98 if let Err(e) = self.wallet.sync_pending_lightning_send_vtxos().await {
99 warn!("An error occured while syncing pending lightning sends: {e:#}");
100 }
101 }
102
103 async fn sync_mailbox(&self) {
105 if let Err(e) = self.wallet.sync_mailbox().await {
106 warn!("An error occurred while syncing mailbox: {e:#}");
107 }
108 }
109
110 async fn run_boards_sync(&self) {
112 if let Err(e) = self.wallet.sync_pending_boards().await {
113 warn!("An error occured while syncing pending board: {e:#}");
114 }
115 }
116
117 async fn run_offboards_sync(&self) {
119 if let Err(e) = self.wallet.sync_pending_offboards().await {
120 warn!("An error occured while syncing pending offboards: {e:#}");
121 }
122 }
123
124 async fn run_onchain_sync(&self) {
126 let mut onchain = self.onchain.write().await;
127 if let Err(e) = onchain.sync(&self.wallet.chain).await {
128 warn!("An error occured while syncing onchain: {e:#}");
129 }
130 }
131
132 async fn run_maintenance_refresh_process(&self) {
134 loop {
135 if let Err(e) = self.wallet.maintenance_refresh().await {
136 warn!("An error occured while performing maintenance refresh: {e:#}");
137 }
138
139 futures::select! {
140 _ = tokio::time::sleep(SLOW_INTERVAL).fuse() => {},
141
142 _ = self.shutdown.cancelled().fuse() => {
143 info!("Shutdown signal received! Shutting maintenance refresh process...");
144 break;
145 },
146 }
147 }
148 }
149
150 async fn run_exits(&self) {
152 let mut onchain = self.onchain.write().await;
153
154 let mut exit_lock = self.wallet.exit.write().await;
155 if let Err(e) = exit_lock.sync_no_progress(&*onchain).await {
156 warn!("An error occurred while syncing exits: {e:#}");
157 }
158
159 if let Err(e) = exit_lock.progress_exits(&self.wallet, &mut *onchain, None).await {
160 warn!("An error occurred while progressing exits: {e:#}");
161 }
162 }
163
164 async fn inner_process_pending_rounds(&self) -> anyhow::Result<()> {
166 let mut events = self.wallet.subscribe_round_events().await?;
167
168 loop {
169 futures::select! {
170 res = events.next().fuse() => {
171 let event = res.context("events stream broke")?
172 .context("error on event stream")?;
173
174 self.wallet.progress_pending_rounds(Some(&event)).await?;
175 },
176 _ = self.shutdown.cancelled().fuse() => {
177 info!("Shutdown signal received! Shutting inner round events process...");
178 return Ok(());
179 },
180 }
181 }
182 }
183
184 async fn run_round_events_process(&self) {
188 loop {
189 if self.connected.load(Ordering::Relaxed) {
190 if let Err(e) = self.inner_process_pending_rounds().await {
191 warn!("An error occured while processing pending rounds: {e:#}");
192 }
193 }
194
195 futures::select! {
196 _ = tokio::time::sleep(SLOW_INTERVAL).fuse() => {},
197 _ = self.shutdown.cancelled().fuse() => {
198 info!("Shutdown signal received! Shutting round events process...");
199 break;
200 },
201 }
202 }
203 }
204
205 async fn run_server_connection_check_process(&self) {
207 loop {
208 futures::select! {
209 _ = tokio::time::sleep(FAST_INTERVAL).fuse() => {},
210 _ = self.shutdown.cancelled().fuse() => {
211 info!("Shutdown signal received! Shutting server connection check process...");
212 break;
213 },
214 }
215
216 let connected = self.wallet.refresh_server().await.is_ok();
217 self.connected.store(connected, Ordering::Relaxed);
218 }
219 }
220
221 async fn run_sync_processes(&self) {
222 let mut fast_interval = tokio::time::interval(FAST_INTERVAL);
223 fast_interval.reset();
224 let mut medium_interval = tokio::time::interval(MEDIUM_INTERVAL);
225 medium_interval.reset();
226 let mut slow_interval = tokio::time::interval(SLOW_INTERVAL);
227 slow_interval.reset();
228
229 loop {
230 futures::select! {
231 _ = fast_interval.tick().fuse() => {
232 if !self.connected.load(Ordering::Relaxed) {
233 continue;
234 }
235
236 self.run_lightning_sync().await;
237 fast_interval.reset();
238 },
239 _ = medium_interval.tick().fuse() => {
240 if !self.connected.load(Ordering::Relaxed) {
241 continue;
242 }
243
244 self.sync_mailbox().await;
245 self.run_boards_sync().await;
246 self.run_offboards_sync().await;
247 medium_interval.reset();
248 },
249 _ = slow_interval.tick().fuse() => {
250 if !self.connected.load(Ordering::Relaxed) {
251 continue;
252 }
253
254 self.run_onchain_sync().await;
255 self.run_exits().await;
256 slow_interval.reset();
257 },
258 _ = self.shutdown.cancelled().fuse() => {
259 info!("Shutdown signal received! Shutting sync processes...");
260 break;
261 },
262 }
263 }
264 }
265
266 pub async fn run(self) {
267 let connected = self.wallet.server.read().is_some();
268 self.connected.store(connected, Ordering::Relaxed);
269
270 let _ = futures::join!(
271 self.run_server_connection_check_process(),
272 self.run_round_events_process(),
273 self.run_sync_processes(),
274 self.run_maintenance_refresh_process(),
275 );
276
277 info!("Daemon gracefully stopped");
278 }
279}