1mod convert;
12mod migrations;
13mod query;
14
15
16use std::path::{Path, PathBuf};
17
18use anyhow::Context;
19use bitcoin::{Amount, Txid};
20use bitcoin::secp256k1::PublicKey;
21use chrono::DateTime;
22use lightning_invoice::Bolt11Invoice;
23use log::debug;
24use rusqlite::Connection;
25
26use ark::{Vtxo, VtxoId};
27use ark::lightning::{Invoice, PaymentHash, Preimage};
28use ark::vtxo::Full;
29use bitcoin_ext::BlockDelta;
30
31use crate::WalletProperties;
32use crate::exit::ExitTxOrigin;
33use crate::movement::{Movement, MovementId, MovementStatus, MovementSubsystem};
34use crate::persist::{BarkPersister, RoundStateId, StoredRoundState, Unlocked};
35use crate::persist::models::{LightningReceive, LightningSend, PendingBoard, PendingOffboard, StoredExit};
36use crate::round::RoundState;
37use crate::vtxo::{VtxoState, VtxoStateKind, WalletVtxo};
38
/// SQLite-backed implementation of [`BarkPersister`].
///
/// The client only remembers where the database lives; it does not hold an open
/// connection itself.
#[derive(Clone)]
pub struct SqliteClient {
    /// Location passed to `rusqlite::Connection::open` whenever a connection is needed.
    connection_string: PathBuf,
}
45
46impl SqliteClient {
47 pub fn open(db_file: impl AsRef<Path>) -> anyhow::Result<SqliteClient> {
49 let path = db_file.as_ref().to_path_buf();
50
51 debug!("Opening database at {}", path.display());
52 let mut conn = rusqlite::Connection::open(&path)
53 .with_context(|| format!("Error connecting to database {}", path.display()))?;
54
55 let migrations = migrations::MigrationContext::new();
56 migrations.do_all_migrations(&mut conn)?;
57
58 Ok( Self { connection_string: path })
59 }
60
61 fn connect(&self) -> anyhow::Result<Connection> {
62 rusqlite::Connection::open(&self.connection_string)
63 .with_context(|| format!("Error connecting to database {}", self.connection_string.display()))
64 }
65}
66
67#[async_trait]
68impl BarkPersister for SqliteClient {
69 async fn init_wallet(&self, properties: &WalletProperties) -> anyhow::Result<()> {
70 let mut conn = self.connect()?;
71 let tx = conn.transaction()?;
72
73 query::set_properties(&tx, properties)?;
74
75 tx.commit()?;
76 Ok(())
77 }
78
79 #[cfg(feature = "onchain-bdk")]
80 async fn initialize_bdk_wallet(&self) -> anyhow::Result<bdk_wallet::ChangeSet> {
81 let mut conn = self.connect()?;
82 Ok(bdk_wallet::WalletPersister::initialize(&mut conn)?)
83 }
84
85 #[cfg(feature = "onchain-bdk")]
86 async fn store_bdk_wallet_changeset(&self, changeset: &bdk_wallet::ChangeSet) -> anyhow::Result<()> {
87 let mut conn = self.connect()?;
88 bdk_wallet::WalletPersister::persist(&mut conn, changeset)?;
89 Ok(())
90 }
91
92 async fn read_properties(&self) -> anyhow::Result<Option<WalletProperties>> {
93 let conn = self.connect()?;
94 Ok(query::fetch_properties(&conn)?)
95 }
96
97 async fn set_server_pubkey(&self, server_pubkey: PublicKey) -> anyhow::Result<()> {
98 let conn = self.connect()?;
99 query::set_server_pubkey(&conn, &server_pubkey)?;
100 Ok(())
101 }
102
103 async fn create_new_movement(&self,
104 status: MovementStatus,
105 subsystem: &MovementSubsystem,
106 time: DateTime<chrono::Local>,
107 ) -> anyhow::Result<MovementId> {
108 let mut conn = self.connect()?;
109 let tx = conn.transaction()?;
110 let movement_id = query::create_new_movement(&tx, status, subsystem, time)?;
111 tx.commit()?;
112 Ok(movement_id)
113 }
114
115 async fn update_movement(&self, movement: &Movement) -> anyhow::Result<()> {
116 let mut conn = self.connect()?;
117 let tx = conn.transaction()?;
118 query::update_movement(&tx, movement)?;
119 tx.commit()?;
120 Ok(())
121 }
122
123 async fn get_movement_by_id(&self, movement_id: MovementId) -> anyhow::Result<Movement> {
124 let conn = self.connect()?;
125 query::get_movement_by_id(&conn, movement_id)
126 }
127
128 async fn get_all_movements(&self) -> anyhow::Result<Vec<Movement>> {
129 let conn = self.connect()?;
130 query::get_all_movements(&conn)
131 }
132
133 async fn store_pending_board(
134 &self,
135 vtxo: &Vtxo<Full>,
136 funding_tx: &bitcoin::Transaction,
137 movement_id: MovementId,
138 ) -> anyhow::Result<()> {
139 let mut conn = self.connect()?;
140 let tx = conn.transaction()?;
141 query::store_new_pending_board(&tx, vtxo, funding_tx, movement_id)?;
142 tx.commit()?;
143 Ok(())
144 }
145
146 async fn remove_pending_board(&self, vtxo_id: &VtxoId) -> anyhow::Result<()> {
147 let mut conn = self.connect()?;
148 let tx = conn.transaction()?;
149 query::remove_pending_board(&tx, vtxo_id)?;
150 tx.commit()?;
151 Ok(())
152 }
153
154 async fn get_all_pending_board_ids(&self) -> anyhow::Result<Vec<VtxoId>> {
155 let conn = self.connect()?;
156 query::get_all_pending_boards_ids(&conn)
157 }
158
159 async fn get_pending_board_by_vtxo_id(&self, vtxo_id: VtxoId) -> anyhow::Result<Option<PendingBoard>> {
160 let conn = self.connect()?;
161 query::get_pending_board_by_vtxo_id(&conn, vtxo_id)
162 }
163
164 async fn store_round_state_lock_vtxos(&self, round_state: &RoundState) -> anyhow::Result<RoundStateId> {
165 let mut conn = self.connect()?;
166 let tx = conn.transaction()?;
167 for vtxo in round_state.participation().inputs.iter() {
168 query::update_vtxo_state_checked(
169 &*tx,
170 vtxo.id(),
171 VtxoState::Locked { movement_id: round_state.movement_id },
172 &[VtxoStateKind::Spendable],
173 )?;
174 }
175 let id = query::store_round_state(&tx, round_state)?;
176 tx.commit()?;
177 Ok(id)
178 }
179
180 async fn update_round_state(&self, state: &StoredRoundState) -> anyhow::Result<()> {
181 let conn = self.connect()?;
182 query::update_round_state(&conn, state)?;
183 Ok(())
184 }
185
186 async fn remove_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
187 let conn = self.connect()?;
188 query::remove_round_state(&conn, round_state.id())?;
189 Ok(())
190 }
191
192 async fn get_round_state_by_id(&self, id: RoundStateId) -> anyhow::Result<Option<StoredRoundState<Unlocked>>> {
193 let conn = self.connect()?;
194 query::get_round_state_by_id(&conn, id)
195 }
196
197 async fn get_pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
198 let conn = self.connect()?;
199 query::get_pending_round_state_ids(&conn)
200 }
201
202 async fn store_vtxos(
203 &self,
204 vtxos: &[(&Vtxo<Full>, &VtxoState)],
205 ) -> anyhow::Result<()> {
206 let mut conn = self.connect()?;
207 let tx = conn.transaction()?;
208
209 for (vtxo, state) in vtxos {
210 query::store_vtxo_with_initial_state(&tx, vtxo, state)?;
211 }
212 tx.commit()?;
213 Ok(())
214 }
215
216 async fn get_wallet_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<WalletVtxo>> {
217 let conn = self.connect()?;
218 query::get_wallet_vtxo_by_id(&conn, id)
219 }
220
221 async fn get_all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
222 let conn = self.connect()?;
223 query::get_all_vtxos(&conn)
224 }
225
226 async fn get_vtxos_by_state(&self, state: &[VtxoStateKind]) -> anyhow::Result<Vec<WalletVtxo>> {
228 let conn = self.connect()?;
229 query::get_vtxos_by_state(&conn, state)
230 }
231
232 async fn has_spent_vtxo(&self, id: VtxoId) -> anyhow::Result<bool> {
233 let conn = self.connect()?;
234 let state : Option<VtxoState> = query::get_vtxo_state(&conn, id)?;
235 let result = state.map(|s| s == VtxoState::Spent).unwrap_or(false);
236 Ok(result)
237 }
238
239 async fn remove_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
240 let mut conn = self.connect()?;
241 let tx = conn.transaction().context("Failed to start transaction")?;
242 let result = query::delete_vtxo(&tx, id);
243 tx.commit().context("Failed to commit transaction")?;
244 result
245 }
246
247 async fn store_vtxo_key(&self, index: u32, public_key: PublicKey) -> anyhow::Result<()> {
248 let conn = self.connect()?;
249 query::store_vtxo_key(&conn, index, public_key)
250 }
251
252 async fn get_last_vtxo_key_index(&self) -> anyhow::Result<Option<u32>> {
253 let conn = self.connect()?;
254 query::get_last_vtxo_key_index(&conn)
255 }
256
257 async fn get_public_key_idx(&self, public_key: &PublicKey) -> anyhow::Result<Option<u32>> {
258 let conn = self.connect()?;
259 query::get_public_key_idx(&conn, public_key)
260 }
261
262 async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
263 let conn = self.connect()?;
264 query::get_mailbox_checkpoint(&conn)
265 }
266
267 async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
268 let conn = self.connect()?;
269 query::store_mailbox_checkpoint(&conn, checkpoint)?;
270 Ok(())
271 }
272
273 async fn store_lightning_receive(
275 &self,
276 payment_hash: PaymentHash,
277 preimage: Preimage,
278 invoice: &Bolt11Invoice,
279 htlc_recv_cltv_delta: BlockDelta,
280 ) -> anyhow::Result<()> {
281 let conn = self.connect()?;
282 query::store_lightning_receive(
283 &conn, payment_hash, preimage, invoice, htlc_recv_cltv_delta,
284 )?;
285 Ok(())
286 }
287
288 async fn store_new_pending_lightning_send(
289 &self,
290 invoice: &Invoice,
291 amount: Amount,
292 fee: Amount,
293 vtxos: &[VtxoId],
294 movement_id: MovementId,
295 ) -> anyhow::Result<LightningSend> {
296 let conn = self.connect()?;
297 query::store_new_pending_lightning_send(&conn, invoice, amount, fee, vtxos, movement_id)
298 }
299
300 async fn get_all_pending_lightning_send(&self) -> anyhow::Result<Vec<LightningSend>> {
301 let conn = self.connect()?;
302 query::get_all_pending_lightning_send(&conn)
303 }
304
305 async fn finish_lightning_send(
306 &self,
307 payment_hash: PaymentHash,
308 preimage: Option<Preimage>,
309 ) -> anyhow::Result<()> {
310 let conn = self.connect()?;
311 query::finish_lightning_send(&conn, payment_hash, preimage)
312 }
313
314 async fn remove_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
315 let conn = self.connect()?;
316 query::remove_lightning_send(&conn, payment_hash)?;
317 Ok(())
318 }
319
320 async fn get_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<Option<LightningSend>> {
321 let conn = self.connect()?;
322 query::get_lightning_send(&conn, payment_hash)
323 }
324
325 async fn get_all_pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
326 let conn = self.connect()?;
327 query::get_all_pending_lightning_receives(&conn)
328 }
329
330 async fn set_preimage_revealed(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
331 let conn = self.connect()?;
332 query::set_preimage_revealed(&conn, payment_hash)?;
333 Ok(())
334 }
335
336 async fn update_lightning_receive(
337 &self,
338 payment_hash: PaymentHash,
339 htlc_vtxo_ids: &[VtxoId],
340 movement_id: MovementId,
341 ) -> anyhow::Result<()> {
342 let conn = self.connect()?;
343 query::update_lightning_receive(&conn, payment_hash, htlc_vtxo_ids, movement_id)?;
344 Ok(())
345 }
346
347 async fn fetch_lightning_receive_by_payment_hash(
349 &self,
350 payment_hash: PaymentHash,
351 ) -> anyhow::Result<Option<LightningReceive>> {
352 let conn = self.connect()?;
353 query::fetch_lightning_receive_by_payment_hash(&conn, payment_hash)
354 }
355
356 async fn finish_pending_lightning_receive(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
357 let conn = self.connect()?;
358 query::finish_pending_lightning_receive(&conn, payment_hash)?;
359 Ok(())
360 }
361
362 async fn store_exit_vtxo_entry(&self, exit: &StoredExit) -> anyhow::Result<()> {
363 let mut conn = self.connect()?;
364 let tx = conn.transaction()?;
365 query::store_exit_vtxo_entry(&tx, exit)?;
366 tx.commit()?;
367 Ok(())
368 }
369
370 async fn remove_exit_vtxo_entry(&self, id: &VtxoId) -> anyhow::Result<()> {
371 let mut conn = self.connect()?;
372 let tx = conn.transaction()?;
373 query::remove_exit_vtxo_entry(&tx, &id)?;
374 tx.commit()?;
375 Ok(())
376 }
377
378 async fn get_exit_vtxo_entries(&self) -> anyhow::Result<Vec<StoredExit>> {
379 let conn = self.connect()?;
380 query::get_exit_vtxo_entries(&conn)
381 }
382
383 async fn store_exit_child_tx(
384 &self,
385 exit_txid: Txid,
386 child_tx: &bitcoin::Transaction,
387 origin: ExitTxOrigin,
388 ) -> anyhow::Result<()> {
389 let mut conn = self.connect()?;
390 let tx = conn.transaction()?;
391 query::store_exit_child_tx(&tx, exit_txid, child_tx, origin)?;
392 tx.commit()?;
393 Ok(())
394 }
395
396 async fn get_exit_child_tx(
397 &self,
398 exit_txid: Txid,
399 ) -> anyhow::Result<Option<(bitcoin::Transaction, ExitTxOrigin)>> {
400 let conn = self.connect()?;
401 query::get_exit_child_tx(&conn, exit_txid)
402 }
403
404 async fn update_vtxo_state_checked(
405 &self,
406 vtxo_id: VtxoId,
407 new_state: VtxoState,
408 allowed_old_states: &[VtxoStateKind]
409 ) -> anyhow::Result<WalletVtxo> {
410 let conn = self.connect()?;
411 query::update_vtxo_state_checked(&conn, vtxo_id, new_state, allowed_old_states)
412 }
413
414 async fn store_pending_offboard(
415 &self,
416 pending: &PendingOffboard,
417 ) -> anyhow::Result<()> {
418 let mut conn = self.connect()?;
419 let tx = conn.transaction()?;
420 query::store_pending_offboard(&tx, pending)?;
421 tx.commit()?;
422 Ok(())
423 }
424
425 async fn get_pending_offboards(&self) -> anyhow::Result<Vec<PendingOffboard>> {
426 let conn = self.connect()?;
427 query::get_all_pending_offboards(&conn)
428 }
429
430 async fn remove_pending_offboard(&self, movement_id: MovementId) -> anyhow::Result<()> {
431 let mut conn = self.connect()?;
432 let tx = conn.transaction()?;
433 query::remove_pending_offboard(&tx, movement_id)?;
434 tx.commit()?;
435 Ok(())
436 }
437}
438
/// Helpers for tests and documentation examples.
#[cfg(any(test, doc))]
pub mod helpers {
    use std::path::PathBuf;

    use rusqlite::Connection;

    /// Creates a uniquely named, shared-cache, in-memory SQLite database.
    ///
    /// Returns the URI of the database (as a [`PathBuf`]) together with an open connection
    /// to it. Per the SQLite documentation, a shared in-memory database is discarded once
    /// its last connection closes, so callers should keep the returned connection alive
    /// for as long as they need the data.
    ///
    /// # Panics
    ///
    /// Panics if the database cannot be opened.
    #[cfg(any(test, feature = "rand"))]
    pub fn in_memory_db() -> (PathBuf, Connection) {
        use rand::{distr, Rng};

        // A random name keeps concurrently running tests from sharing one database.
        let filename: String = rand::rng()
            .sample_iter(distr::Alphanumeric)
            .take(16)
            .map(char::from)
            .collect();

        // Building a `PathBuf` from a `String` cannot fail, so no `unwrap` is needed here.
        let pathbuf = PathBuf::from(format!("file:{}?mode=memory&cache=shared", filename));

        let conn = Connection::open(&pathbuf)
            .expect("failed to open in-memory sqlite database");
        (pathbuf, conn)
    }
}
472
473#[cfg(test)]
474mod test {
475 use ark::test_util::VTXO_VECTORS;
476
477 use crate::{persist::sqlite::helpers::in_memory_db, vtxo::VtxoState};
478
479 use super::*;
480
481 #[tokio::test]
482 async fn test_add_and_retrieve_vtxos() {
483 let vtxo_1 = &VTXO_VECTORS.board_vtxo;
484 let vtxo_2 = &VTXO_VECTORS.arkoor_htlc_out_vtxo;
485 let vtxo_3 = &VTXO_VECTORS.round2_vtxo;
486
487 let (cs, conn) = in_memory_db();
488 let db = SqliteClient::open(cs).unwrap();
489
490 db.store_vtxos(&[
491 (vtxo_1, &VtxoState::Spendable), (vtxo_2, &VtxoState::Spendable)
492 ]).await.unwrap();
493
494 let vtxo_1_db = db.get_wallet_vtxo(vtxo_1.id()).await.expect("No error").expect("A vtxo was found");
496 assert_eq!(vtxo_1_db.vtxo, *vtxo_1);
497
498 assert!(db.get_wallet_vtxo(vtxo_3.id()).await.expect("No error").is_none());
500
501 let vtxos = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await.unwrap();
503 assert_eq!(vtxos.len(), 2);
504 assert!(vtxos.iter().any(|v| v.vtxo == *vtxo_1));
505 assert!(vtxos.iter().any(|v| v.vtxo == *vtxo_2));
506 assert!(!vtxos.iter().any(|v| v.vtxo == *vtxo_3));
507
508 db.update_vtxo_state_checked(
510 vtxo_1.id(), VtxoState::Spent, &VtxoStateKind::UNSPENT_STATES,
511 ).await.unwrap();
512
513 let vtxos = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await.unwrap();
514 assert_eq!(vtxos.len(), 1);
515
516 db.store_vtxos(&[(vtxo_3, &VtxoState::Spendable)]).await.unwrap();
518
519 let vtxos = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await.unwrap();
520 assert_eq!(vtxos.len(), 2);
521 assert!(vtxos.iter().any(|v| v.vtxo == *vtxo_2));
522 assert!(vtxos.iter().any(|v| v.vtxo == *vtxo_3));
523
524 conn.close().unwrap();
525 }
526
527 #[tokio::test]
528 #[cfg(feature = "onchain-bdk")]
529 async fn test_create_wallet_then_load() {
530 use bdk_wallet::chain::DescriptorExt;
531
532 let (connection_string, conn) = in_memory_db();
533
534 let db = SqliteClient::open(connection_string).unwrap();
535 let network = bitcoin::Network::Testnet;
536
537 let seed = bip39::Mnemonic::generate(12).unwrap().to_seed("");
538 let xpriv = bitcoin::bip32::Xpriv::new_master(network, &seed).unwrap();
539
540 let desc = format!("tr({}/84'/0'/0'/*)", xpriv);
541
542 let _ = db.initialize_bdk_wallet().await.unwrap();
544 let mut created = bdk_wallet::Wallet::create_single(desc.clone())
545 .network(network)
546 .create_wallet_no_persist()
547 .unwrap();
548 db.store_bdk_wallet_changeset(&created.take_staged().unwrap()).await.unwrap();
549
550 let loaded = {
551 let changeset = db.initialize_bdk_wallet().await.unwrap();
552 bdk_wallet::Wallet::load()
553 .descriptor(bdk_wallet::KeychainKind::External, Some(desc.clone()))
554 .extract_keys()
555 .check_network(network)
556 .load_wallet_no_persist(changeset)
557 .unwrap()
558 };
559
560 assert!(loaded.is_some());
561 assert_eq!(
562 created.public_descriptor(bdk_wallet::KeychainKind::External).descriptor_id(),
563 loaded.unwrap().public_descriptor(bdk_wallet::KeychainKind::External).descriptor_id()
564 );
565
566 conn.close().unwrap();
569 }
570}