bark/persist/sqlite/
mod.rs

1//! SQLite persistence backend for Bark.
2//!
3//! This module provides a concrete implementation of the `BarkPersister` trait
4//! backed by a local SQLite database. It encapsulates schema creation and
5//! migrations, typed query helpers, and conversions between in-memory models
6//! and their stored representations. Operations are performed using explicit
7//! connections and transactions to ensure atomic updates across related tables,
8//! covering wallet properties, movements, vtxos and their states, round
9//! lifecycle data, Lightning receives, exit tracking, and sync metadata.
10
11mod convert;
12mod migrations;
13mod query;
14
15
16use std::path::{Path, PathBuf};
17
18use anyhow::Context;
19use bitcoin::{Amount, Txid};
20use bitcoin::secp256k1::PublicKey;
21use chrono::DateTime;
22use lightning_invoice::Bolt11Invoice;
23use log::debug;
24use rusqlite::Connection;
25
26use ark::lightning::{Invoice, PaymentHash, Preimage};
27use bitcoin_ext::BlockDelta;
28
29use crate::{Vtxo, VtxoId, WalletProperties};
30use crate::exit::ExitTxOrigin;
31use crate::movement::{Movement, MovementId, MovementStatus, MovementSubsystem, PaymentMethod};
32use crate::persist::{BarkPersister, RoundStateId, StoredRoundState};
33use crate::persist::models::{LightningReceive, LightningSend, PendingBoard, StoredExit};
34use crate::round::RoundState;
35use crate::vtxo::{VtxoState, VtxoStateKind, WalletVtxo};
36
37/// An implementation of the BarkPersister using rusqlite. Changes are persisted using the given
38/// [PathBuf].
39#[derive(Clone)]
40pub struct SqliteClient {
41	connection_string: PathBuf,
42}
43
44impl SqliteClient {
45	/// Open a new [SqliteClient] with the given file path
46	pub fn open(db_file: impl AsRef<Path>) -> anyhow::Result<SqliteClient> {
47		let path = db_file.as_ref().to_path_buf();
48
49		debug!("Opening database at {}", path.display());
50		let mut conn = rusqlite::Connection::open(&path)
51			.with_context(|| format!("Error connecting to database {}", path.display()))?;
52
53		let migrations = migrations::MigrationContext::new();
54		migrations.do_all_migrations(&mut conn)?;
55
56		Ok( Self { connection_string: path })
57	}
58
59	fn connect(&self) -> anyhow::Result<Connection> {
60		rusqlite::Connection::open(&self.connection_string)
61			.with_context(|| format!("Error connecting to database {}", self.connection_string.display()))
62	}
63}
64
65#[async_trait]
66impl BarkPersister for SqliteClient {
67	async fn init_wallet(&self, properties: &WalletProperties) -> anyhow::Result<()> {
68		let mut conn = self.connect()?;
69		let tx = conn.transaction()?;
70
71		query::set_properties(&tx, properties)?;
72
73		tx.commit()?;
74		Ok(())
75	}
76
77	#[cfg(feature = "onchain_bdk")]
78	async fn initialize_bdk_wallet(&self) -> anyhow::Result<bdk_wallet::ChangeSet> {
79	    let mut conn = self.connect()?;
80		Ok(bdk_wallet::WalletPersister::initialize(&mut conn)?)
81	}
82
83	#[cfg(feature = "onchain_bdk")]
84	async fn store_bdk_wallet_changeset(&self, changeset: &bdk_wallet::ChangeSet) -> anyhow::Result<()> {
85	    let mut conn = self.connect()?;
86		bdk_wallet::WalletPersister::persist(&mut conn, changeset)?;
87		Ok(())
88	}
89
90	async fn read_properties(&self) -> anyhow::Result<Option<WalletProperties>> {
91		let conn = self.connect()?;
92		Ok(query::fetch_properties(&conn)?)
93	}
94
95	async fn check_recipient_exists(&self, recipient: &PaymentMethod) -> anyhow::Result<bool> {
96		let conn = self.connect()?;
97		query::check_recipient_exists(&conn, recipient)
98	}
99
100	async fn create_new_movement(&self,
101		status: MovementStatus,
102		subsystem: &MovementSubsystem,
103		time: DateTime<chrono::Local>,
104	) -> anyhow::Result<MovementId> {
105		let mut conn = self.connect()?;
106		let tx = conn.transaction()?;
107		let movement_id = query::create_new_movement(&tx, status, subsystem, time)?;
108		tx.commit()?;
109		Ok(movement_id)
110	}
111
112	async fn update_movement(&self, movement: &Movement) -> anyhow::Result<()> {
113		let mut conn = self.connect()?;
114		let tx = conn.transaction()?;
115		query::update_movement(&tx, movement)?;
116		tx.commit()?;
117		Ok(())
118	}
119
120	async fn get_movement_by_id(&self, movement_id: MovementId) -> anyhow::Result<Movement> {
121		let conn = self.connect()?;
122		query::get_movement_by_id(&conn, movement_id)
123	}
124
125	async fn get_all_movements(&self) -> anyhow::Result<Vec<Movement>> {
126		let conn = self.connect()?;
127		query::get_all_movements(&conn)
128	}
129
130	async fn store_pending_board(
131		&self,
132		vtxo: &Vtxo,
133		funding_tx: &bitcoin::Transaction,
134		movement_id: MovementId,
135	) -> anyhow::Result<()> {
136		let mut conn = self.connect()?;
137		let tx = conn.transaction()?;
138		query::store_new_pending_board(&tx, vtxo, funding_tx, movement_id)?;
139		tx.commit()?;
140		Ok(())
141	}
142
143	async fn remove_pending_board(&self, vtxo_id: &VtxoId) -> anyhow::Result<()> {
144		let mut conn = self.connect()?;
145		let tx = conn.transaction()?;
146		query::remove_pending_board(&tx, vtxo_id)?;
147		tx.commit()?;
148		Ok(())
149	}
150
151	async fn get_all_pending_board_ids(&self) -> anyhow::Result<Vec<VtxoId>> {
152		let conn = self.connect()?;
153		query::get_all_pending_boards_ids(&conn)
154	}
155
156	async fn get_pending_board_by_vtxo_id(&self, vtxo_id: VtxoId) -> anyhow::Result<Option<PendingBoard>> {
157		let conn = self.connect()?;
158		query::get_pending_board_by_vtxo_id(&conn, vtxo_id)
159	}
160
161	async fn store_round_state_lock_vtxos(&self, round_state: &RoundState) -> anyhow::Result<RoundStateId> {
162		let mut conn = self.connect()?;
163		let tx = conn.transaction()?;
164		for vtxo in round_state.participation().inputs.iter() {
165			query::update_vtxo_state_checked(
166				&*tx,
167				vtxo.id(),
168				VtxoState::Locked { movement_id: round_state.movement_id },
169				&[VtxoStateKind::Spendable],
170			)?;
171		}
172		let id = query::store_round_state(&tx, round_state)?;
173		tx.commit()?;
174		Ok(id)
175	}
176
177	async fn update_round_state(&self, state: &StoredRoundState) -> anyhow::Result<()> {
178		let conn = self.connect()?;
179		query::update_round_state(&conn, state)?;
180		Ok(())
181	}
182
183	async fn remove_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
184		let conn = self.connect()?;
185		query::remove_round_state(&conn, round_state.id)?;
186		Ok(())
187	}
188
189	async fn load_round_states(&self) -> anyhow::Result<Vec<StoredRoundState>> {
190		let conn = self.connect()?;
191		query::load_round_states(&conn)
192	}
193
194	async fn store_vtxos(
195		&self,
196		vtxos: &[(&Vtxo, &VtxoState)],
197	) -> anyhow::Result<()> {
198		let mut conn = self.connect()?;
199		let tx = conn.transaction()?;
200
201		for (vtxo, state) in vtxos {
202			query::store_vtxo_with_initial_state(&tx, vtxo, state)?;
203		}
204		tx.commit()?;
205		Ok(())
206	}
207
208	async fn get_wallet_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<WalletVtxo>> {
209		let conn = self.connect()?;
210		query::get_wallet_vtxo_by_id(&conn, id)
211	}
212
213	async fn get_all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
214		let conn = self.connect()?;
215		query::get_all_vtxos(&conn)
216	}
217
218	/// Get all VTXOs that are in one of the provided states
219	async fn get_vtxos_by_state(&self, state: &[VtxoStateKind]) -> anyhow::Result<Vec<WalletVtxo>> {
220		let conn = self.connect()?;
221		query::get_vtxos_by_state(&conn, state)
222	}
223
224	async fn has_spent_vtxo(&self, id: VtxoId) -> anyhow::Result<bool> {
225		let conn = self.connect()?;
226		let state : Option<VtxoState> = query::get_vtxo_state(&conn, id)?;
227		let result = state.map(|s| s == VtxoState::Spent).unwrap_or(false);
228		Ok(result)
229	}
230
231	async fn remove_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo>> {
232		let mut conn = self.connect()?;
233		let tx = conn.transaction().context("Failed to start transaction")?;
234		let result = query::delete_vtxo(&tx, id);
235		tx.commit().context("Failed to commit transaction")?;
236		result
237	}
238
239	async fn store_vtxo_key(&self, index: u32, public_key: PublicKey) -> anyhow::Result<()> {
240		let conn = self.connect()?;
241		query::store_vtxo_key(&conn, index, public_key)
242	}
243
244	async fn get_last_vtxo_key_index(&self) -> anyhow::Result<Option<u32>> {
245		let conn = self.connect()?;
246		query::get_last_vtxo_key_index(&conn)
247	}
248
249	async fn get_public_key_idx(&self, public_key: &PublicKey) -> anyhow::Result<Option<u32>> {
250		let conn = self.connect()?;
251		query::get_public_key_idx(&conn, public_key)
252	}
253
254	/// Store a lightning receive
255	async fn store_lightning_receive(
256		&self,
257		payment_hash: PaymentHash,
258		preimage: Preimage,
259		invoice: &Bolt11Invoice,
260		htlc_recv_cltv_delta: BlockDelta,
261	) -> anyhow::Result<()> {
262		let conn = self.connect()?;
263		query::store_lightning_receive(
264			&conn, payment_hash, preimage, invoice, htlc_recv_cltv_delta,
265		)?;
266		Ok(())
267	}
268
269	async fn store_new_pending_lightning_send(
270		&self,
271		invoice: &Invoice,
272		amount: &Amount,
273		vtxos: &[VtxoId],
274		movement_id: MovementId,
275	) -> anyhow::Result<LightningSend> {
276		let conn = self.connect()?;
277		query::store_new_pending_lightning_send(&conn, invoice, amount, vtxos, movement_id)
278	}
279
280	async fn get_all_pending_lightning_send(&self) -> anyhow::Result<Vec<LightningSend>> {
281		let conn = self.connect()?;
282		query::get_all_pending_lightning_send(&conn)
283	}
284
285	async fn finish_lightning_send(
286		&self,
287		payment_hash: PaymentHash,
288		preimage: Option<Preimage>,
289	) -> anyhow::Result<()> {
290		let conn = self.connect()?;
291		query::finish_lightning_send(&conn, payment_hash, preimage)
292	}
293
294	async fn remove_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
295		let conn = self.connect()?;
296		query::remove_lightning_send(&conn, payment_hash)?;
297		Ok(())
298	}
299
300	async fn get_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<Option<LightningSend>> {
301		let conn = self.connect()?;
302		query::get_lightning_send(&conn, payment_hash)
303	}
304
305	async fn get_all_pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
306		let conn = self.connect()?;
307		query::get_all_pending_lightning_receives(&conn)
308	}
309
310	async fn set_preimage_revealed(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
311		let conn = self.connect()?;
312		query::set_preimage_revealed(&conn, payment_hash)?;
313		Ok(())
314	}
315
316	async fn update_lightning_receive(
317		&self,
318		payment_hash: PaymentHash,
319		htlc_vtxo_ids: &[VtxoId],
320		movement_id: MovementId,
321	) -> anyhow::Result<()> {
322		let conn = self.connect()?;
323		query::update_lightning_receive(&conn, payment_hash, htlc_vtxo_ids, movement_id)?;
324		Ok(())
325	}
326
327	/// Fetch a lightning receive by payment hash
328	async fn fetch_lightning_receive_by_payment_hash(
329		&self,
330		payment_hash: PaymentHash,
331	) -> anyhow::Result<Option<LightningReceive>> {
332		let conn = self.connect()?;
333		query::fetch_lightning_receive_by_payment_hash(&conn, payment_hash)
334	}
335
336	async fn finish_pending_lightning_receive(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
337		let conn = self.connect()?;
338		query::finish_pending_lightning_receive(&conn, payment_hash)?;
339		Ok(())
340	}
341
342	async fn store_exit_vtxo_entry(&self, exit: &StoredExit) -> anyhow::Result<()> {
343		let mut conn = self.connect()?;
344		let tx = conn.transaction()?;
345		query::store_exit_vtxo_entry(&tx, exit)?;
346		tx.commit()?;
347		Ok(())
348	}
349
350	async fn remove_exit_vtxo_entry(&self, id: &VtxoId) -> anyhow::Result<()> {
351		let mut conn = self.connect()?;
352		let tx = conn.transaction()?;
353		query::remove_exit_vtxo_entry(&tx, &id)?;
354		tx.commit()?;
355		Ok(())
356	}
357
358	async fn get_exit_vtxo_entries(&self) -> anyhow::Result<Vec<StoredExit>> {
359		let conn = self.connect()?;
360		query::get_exit_vtxo_entries(&conn)
361	}
362
363	async fn store_exit_child_tx(
364		&self,
365		exit_txid: Txid,
366		child_tx: &bitcoin::Transaction,
367		origin: ExitTxOrigin,
368	) -> anyhow::Result<()> {
369		let mut conn = self.connect()?;
370		let tx = conn.transaction()?;
371		query::store_exit_child_tx(&tx, exit_txid, child_tx, origin)?;
372		tx.commit()?;
373		Ok(())
374	}
375
376	async fn get_exit_child_tx(
377		&self,
378		exit_txid: Txid,
379	) -> anyhow::Result<Option<(bitcoin::Transaction, ExitTxOrigin)>> {
380		let conn = self.connect()?;
381		query::get_exit_child_tx(&conn, exit_txid)
382	}
383
384	async fn update_vtxo_state_checked(
385		&self,
386		vtxo_id: VtxoId,
387		new_state: VtxoState,
388		allowed_old_states: &[VtxoStateKind]
389	) -> anyhow::Result<WalletVtxo> {
390		let conn = self.connect()?;
391		query::update_vtxo_state_checked(&conn, vtxo_id, new_state, allowed_old_states)
392	}
393}
394
395#[cfg(any(test, doc))]
396pub mod helpers {
397	use std::path::PathBuf;
398	use std::str::FromStr;
399
400	use rusqlite::Connection;
401
402	/// Creates an in-memory sqlite connection.
403	///
404	/// It returns a [PathBuf] and a [Connection].
405	/// The user should ensure the [Connection] isn't dropped
406	/// until the test completes. If all connections are dropped during
407	/// the test the entire database might be cleared.
408	#[cfg(any(test, feature = "rand"))]
409	pub fn in_memory_db() -> (PathBuf, Connection) {
410		use rand::{distr, Rng};
411
412		// All tests run in the same process and share the same
413		// cache. To ensure that each call to `in_memory` results
414		// in a new database a random file-name is generated.
415		//
416		// This database is deleted once all connections are dropped
417		let mut rng = rand::rng();
418		let filename: String = (&mut rng).sample_iter(distr::Alphanumeric)
419			.take(16).map(char::from).collect();
420
421		let connection_string = format!("file:{}?mode=memory&cache=shared", filename);
422		let pathbuf = PathBuf::from_str(&connection_string).unwrap();
423
424		let conn = Connection::open(pathbuf.clone()).unwrap();
425		(pathbuf.clone(), conn)
426	}
427}
428
429#[cfg(test)]
430mod test {
431	use bitcoin::bip32;
432
433	use ark::vtxo::test::VTXO_VECTORS;
434
435	use crate::{persist::sqlite::helpers::in_memory_db, vtxo::VtxoState};
436
437	use super::*;
438
439	#[tokio::test]
440	async fn test_add_and_retrieve_vtxos() {
441		let vtxo_1 = &VTXO_VECTORS.board_vtxo;
442		let vtxo_2 = &VTXO_VECTORS.arkoor_htlc_out_vtxo;
443		let vtxo_3 = &VTXO_VECTORS.round2_vtxo;
444
445		let (cs, conn) = in_memory_db();
446		let db = SqliteClient::open(cs).unwrap();
447
448		db.store_vtxos(&[
449			(vtxo_1, &VtxoState::Spendable), (vtxo_2, &VtxoState::Spendable)
450		]).await.unwrap();
451
452		// Check that vtxo-1 can be retrieved from the database
453		let vtxo_1_db = db.get_wallet_vtxo(vtxo_1.id()).await.expect("No error").expect("A vtxo was found");
454		assert_eq!(vtxo_1_db.vtxo, *vtxo_1);
455
456		// Verify that vtxo 3 is not in the database
457		assert!(db.get_wallet_vtxo(vtxo_3.id()).await.expect("No error").is_none());
458
459		// Verify that we have two entries in the database
460		let vtxos = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await.unwrap();
461		assert_eq!(vtxos.len(), 2);
462		assert!(vtxos.iter().any(|v| v.vtxo == *vtxo_1));
463		assert!(vtxos.iter().any(|v| v.vtxo == *vtxo_2));
464		assert!(!vtxos.iter().any(|v| v.vtxo == *vtxo_3));
465
466		// Verify that we can mark a vtxo as spent
467		db.update_vtxo_state_checked(
468			vtxo_1.id(), VtxoState::Spent, &VtxoStateKind::UNSPENT_STATES,
469		).await.unwrap();
470
471		let vtxos = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await.unwrap();
472		assert_eq!(vtxos.len(), 1);
473
474		// Add the third entry to the database
475		db.store_vtxos(&[(vtxo_3, &VtxoState::Spendable)]).await.unwrap();
476
477		let vtxos = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await.unwrap();
478		assert_eq!(vtxos.len(), 2);
479		assert!(vtxos.iter().any(|v| v.vtxo == *vtxo_2));
480		assert!(vtxos.iter().any(|v| v.vtxo == *vtxo_3));
481
482		conn.close().unwrap();
483	}
484
485	#[tokio::test]
486	#[cfg(feature = "onchain_bdk")]
487	async fn test_create_wallet_then_load() {
488		use bdk_wallet::chain::DescriptorExt;
489
490		let (connection_string, conn) = in_memory_db();
491
492		let db = SqliteClient::open(connection_string).unwrap();
493		let network = bitcoin::Network::Testnet;
494
495		let seed = bip39::Mnemonic::generate(12).unwrap().to_seed("");
496		let xpriv = bip32::Xpriv::new_master(network, &seed).unwrap();
497
498		let desc = format!("tr({}/84'/0'/0'/*)", xpriv);
499
500		// need to call init before we call store
501		let _ = db.initialize_bdk_wallet().await.unwrap();
502		let mut created = bdk_wallet::Wallet::create_single(desc.clone())
503			.network(network)
504			.create_wallet_no_persist()
505			.unwrap();
506		db.store_bdk_wallet_changeset(&created.take_staged().unwrap()).await.unwrap();
507
508		let loaded = {
509			let changeset = db.initialize_bdk_wallet().await.unwrap();
510			bdk_wallet::Wallet::load()
511				.descriptor(bdk_wallet::KeychainKind::External, Some(desc.clone()))
512				.extract_keys()
513				.check_network(network)
514				.load_wallet_no_persist(changeset)
515				.unwrap()
516		};
517
518		assert!(loaded.is_some());
519		assert_eq!(
520			created.public_descriptor(bdk_wallet::KeychainKind::External).descriptor_id(),
521			loaded.unwrap().public_descriptor(bdk_wallet::KeychainKind::External).descriptor_id()
522		);
523
524		// Explicitly close the connection here
525		// This ensures the database isn't dropped during the test
526		conn.close().unwrap();
527	}
528}