1pub use tokio_util::sync::CancellationToken;
2
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::Duration;
6
7use anyhow::Context;
8use futures::StreamExt;
9use log::{info, warn};
10use tokio::sync::RwLock;
11
12use crate::Wallet;
13use crate::onchain::ExitUnilaterally;
14
15lazy_static::lazy_static! {
16 static ref FAST_INTERVAL: Duration = Duration::from_secs(1);
17 static ref MEDIUM_INTERVAL: Duration = Duration::from_secs(30);
18 static ref SLOW_INTERVAL: Duration = Duration::from_secs(60);
19}
20
21pub struct Daemon {
24 shutdown: CancellationToken,
25
26 connected: AtomicBool,
27 wallet: Arc<Wallet>,
28 onchain: Arc<RwLock<dyn ExitUnilaterally>>,
29}
30
31impl Daemon {
32 pub fn new(
42 shutdown: CancellationToken,
43 wallet: Arc<Wallet>,
44 onchain: Arc<RwLock<dyn ExitUnilaterally>>,
45 ) -> anyhow::Result<Daemon> {
46 let daemon = Daemon {
47 shutdown,
48
49 connected: AtomicBool::new(false),
50 wallet,
51 onchain,
52 };
53
54 Ok(daemon)
55 }
56
57 async fn run_lightning_sync(&self) {
61 if let Err(e) = self.wallet.try_claim_all_lightning_receives(false).await {
62 warn!("An error occured while checking and claiming pending lightning receives: {e}");
63 }
64
65 if let Err(e) = self.wallet.sync_pending_lightning_send_vtxos().await {
66 warn!("An error occured while syncing pending lightning sends: {e}");
67 }
68 }
69
70 async fn run_arkoors_sync(&self) {
72 if let Err(e) = self.wallet.sync_oors().await {
73 warn!("An error occured while syncing oors: {e}");
74 }
75 }
76
77 async fn run_boards_sync(&self) {
79 if let Err(e) = self.wallet.sync_pending_boards().await {
80 warn!("An error occured while syncing pending board: {e}");
81 }
82 }
83
84 async fn run_maintenance_refresh_process(&self) {
86 loop {
87 if let Err(e) = self.wallet.maintenance_refresh().await {
88 warn!("An error occured while performing maintenance refresh: {e}");
89 }
90
91 tokio::select! {
92 _ = tokio::time::sleep(*SLOW_INTERVAL) => {},
93 _ = self.shutdown.cancelled() => {
94 info!("Shutdown signal received! Shutting maintenance refresh process...");
95 break;
96 },
97 }
98 }
99 }
100
101 async fn run_exits(&self) {
103 let mut onchain = self.onchain.write().await;
104
105 if let Err(e) = self.wallet.sync_exits(&mut *onchain).await {
106 warn!("An error occured while syncing exits: {e}");
107 }
108
109 let mut exit_lock = self.wallet.exit.write().await;
110 if let Err(e) = exit_lock.progress_exits(&mut *onchain, None).await {
111 warn!("An error occured while progressing exits: {e}");
112 }
113 drop(exit_lock);
114 }
115
116 async fn inner_process_pending_rounds(&self) -> anyhow::Result<()> {
118 let mut events = self.wallet.subscribe_round_events().await?;
119
120 loop {
121 tokio::select! {
122 res = events.next() => {
123 let event = res.context("events stream broke")?
124 .context("error on event stream")?;
125
126 self.wallet.progress_pending_rounds(Some(&event)).await?;
127 },
128 _ = self.shutdown.cancelled() => {
129 info!("Shutdown signal received! Shutting inner round events process...");
130 return Ok(());
131 },
132 }
133 }
134 }
135
136 async fn run_round_events_process(&self) {
140 loop {
141 if self.connected.load(Ordering::Relaxed) {
142 if let Err(e) = self.inner_process_pending_rounds().await {
143 warn!("An error occured while processing pending rounds: {e}");
144 }
145 }
146
147 tokio::select! {
148 _ = tokio::time::sleep(*SLOW_INTERVAL) => {},
149 _ = self.shutdown.cancelled() => {
150 info!("Shutdown signal received! Shutting round events process...");
151 break;
152 },
153 }
154 }
155 }
156
157 async fn run_server_connection_check_process(&self) {
159 loop {
160 tokio::select! {
161 _ = tokio::time::sleep(*FAST_INTERVAL) => {},
162 _ = self.shutdown.cancelled() => {
163 info!("Shutdown signal received! Shutting server connection check process...");
164 break;
165 },
166 }
167
168 let connected = self.wallet.refresh_server().await.is_ok();
169 self.connected.store(connected, Ordering::Relaxed);
170 }
171 }
172
173 async fn run_sync_processes(&self) {
174 let mut fast_interval = tokio::time::interval(*FAST_INTERVAL);
175 fast_interval.reset();
176 let mut medium_interval = tokio::time::interval(*MEDIUM_INTERVAL);
177 medium_interval.reset();
178 let mut slow_interval = tokio::time::interval(*SLOW_INTERVAL);
179 slow_interval.reset();
180
181 loop {
182 tokio::select! {
183 _ = fast_interval.tick() => {
184 if !self.connected.load(Ordering::Relaxed) {
185 continue;
186 }
187
188 self.run_lightning_sync().await;
189 fast_interval.reset();
190 },
191 _ = medium_interval.tick() => {
192 if !self.connected.load(Ordering::Relaxed) {
193 continue;
194 }
195
196 self.run_arkoors_sync().await;
197 self.run_boards_sync().await;
198 medium_interval.reset();
199 },
200 _ = slow_interval.tick() => {
201 if !self.connected.load(Ordering::Relaxed) {
202 continue;
203 }
204
205 self.run_exits().await;
206 slow_interval.reset();
207 },
208 _ = self.shutdown.cancelled() => {
209 info!("Shutdown signal received! Shutting sync processes...");
210 break;
211 },
212 }
213 }
214 }
215
216 pub async fn run(self) {
217 let connected = self.wallet.server.read().is_some();
218 self.connected.store(connected, Ordering::Relaxed);
219
220 let _ = tokio::join!(
221 self.run_server_connection_check_process(),
222 self.run_round_events_process(),
223 self.run_sync_processes(),
224 self.run_maintenance_refresh_process(),
225 );
226
227 info!("Daemon gracefully stopped");
228 }
229}