bark/persist/sqlite/
mod.rs

1//! SQLite persistence backend for Bark.
2//!
3//! This module provides a concrete implementation of the `BarkPersister` trait
4//! backed by a local SQLite database. It encapsulates schema creation and
5//! migrations, typed query helpers, and conversions between in-memory models
6//! and their stored representations. Operations are performed using explicit
7//! connections and transactions to ensure atomic updates across related tables,
8//! covering wallet properties, movements, vtxos and their states, round
9//! lifecycle data, Lightning receives, exit tracking, and sync metadata.
10
11mod convert;
12mod migrations;
13mod query;
14
15
16use std::path::{Path, PathBuf};
17
18use anyhow::Context;
19use bitcoin::{Amount, Txid};
20use bitcoin::secp256k1::PublicKey;
21use chrono::DateTime;
22use lightning_invoice::Bolt11Invoice;
23use log::debug;
24use rusqlite::Connection;
25
26use ark::lightning::{Invoice, PaymentHash, Preimage};
27use bitcoin_ext::BlockDelta;
28
29use crate::{Vtxo, VtxoId, VtxoState, WalletProperties};
30use crate::exit::models::ExitTxOrigin;
31use crate::movement::{Movement, MovementId, MovementStatus, MovementSubsystem};
32use crate::persist::{BarkPersister, RoundStateId, StoredRoundState};
33use crate::persist::models::{LightningReceive, LightningSend, PendingBoard, StoredExit};
34use crate::round::{RoundState, UnconfirmedRound};
35use crate::vtxo::state::{VtxoStateKind, WalletVtxo};
36
37/// An implementation of the BarkPersister using rusqlite. Changes are persisted using the given
38/// [PathBuf].
39#[derive(Clone)]
40pub struct SqliteClient {
41	connection_string: PathBuf,
42}
43
44impl SqliteClient {
45	/// Open a new [SqliteClient] with the given file path
46	pub fn open(db_file: impl AsRef<Path>) -> anyhow::Result<SqliteClient> {
47		let path = db_file.as_ref().to_path_buf();
48
49		debug!("Opening database at {}", path.display());
50		let mut conn = rusqlite::Connection::open(&path)
51			.with_context(|| format!("Error connecting to database {}", path.display()))?;
52
53		let migrations = migrations::MigrationContext::new();
54		migrations.do_all_migrations(&mut conn)?;
55
56		Ok( Self { connection_string: path })
57	}
58
59	fn connect(&self) -> anyhow::Result<Connection> {
60		rusqlite::Connection::open(&self.connection_string)
61			.with_context(|| format!("Error connecting to database {}", self.connection_string.display()))
62	}
63}
64
65impl BarkPersister for SqliteClient {
66	fn init_wallet(&self, properties: &WalletProperties) -> anyhow::Result<()> {
67		let mut conn = self.connect()?;
68		let tx = conn.transaction()?;
69
70		query::set_properties(&tx, properties)?;
71
72		tx.commit()?;
73		Ok(())
74	}
75
76	#[cfg(feature = "onchain_bdk")]
77	fn initialize_bdk_wallet(&self) -> anyhow::Result<bdk_wallet::ChangeSet> {
78	    let mut conn = self.connect()?;
79		Ok(bdk_wallet::WalletPersister::initialize(&mut conn)?)
80	}
81
82	#[cfg(feature = "onchain_bdk")]
83	fn store_bdk_wallet_changeset(&self, changeset: &bdk_wallet::ChangeSet) -> anyhow::Result<()> {
84	    let mut conn = self.connect()?;
85		bdk_wallet::WalletPersister::persist(&mut conn, changeset)?;
86		Ok(())
87	}
88
89	fn read_properties(&self) -> anyhow::Result<Option<WalletProperties>> {
90		let conn = self.connect()?;
91		Ok(query::fetch_properties(&conn)?)
92	}
93
94	fn check_recipient_exists(&self, recipient: &str) -> anyhow::Result<bool> {
95		let conn = self.connect()?;
96		query::check_recipient_exists(&conn, recipient)
97	}
98
99	fn create_new_movement(&self,
100		status: MovementStatus,
101		subsystem: &MovementSubsystem,
102		time: DateTime<chrono::Local>,
103	) -> anyhow::Result<MovementId> {
104		let mut conn = self.connect()?;
105		let tx = conn.transaction()?;
106		let movement_id = query::create_new_movement(&tx, status, subsystem, time)?;
107		tx.commit()?;
108		Ok(movement_id)
109	}
110
111	fn update_movement(&self, movement: &Movement) -> anyhow::Result<()> {
112		let mut conn = self.connect()?;
113		let tx = conn.transaction()?;
114		query::update_movement(&tx, movement)?;
115		tx.commit()?;
116		Ok(())
117	}
118
119	fn get_movement_by_id(&self, movement_id: MovementId) -> anyhow::Result<Movement> {
120		let conn = self.connect()?;
121		query::get_movement_by_id(&conn, movement_id)
122	}
123
124	fn get_all_movements(&self) -> anyhow::Result<Vec<Movement>> {
125		let conn = self.connect()?;
126		query::get_all_movements(&conn)
127	}
128
129	fn store_pending_board(
130		&self,
131		vtxo: &Vtxo,
132		funding_tx: &bitcoin::Transaction,
133		movement_id: MovementId,
134	) -> anyhow::Result<()> {
135		let mut conn = self.connect()?;
136		let tx = conn.transaction()?;
137		query::store_new_pending_board(&tx, vtxo, funding_tx, movement_id)?;
138		tx.commit()?;
139		Ok(())
140	}
141
142	fn remove_pending_board(&self, vtxo_id: &VtxoId) -> anyhow::Result<()> {
143		let mut conn = self.connect()?;
144		let tx = conn.transaction()?;
145		query::remove_pending_board(&tx, vtxo_id)?;
146		tx.commit()?;
147		Ok(())
148	}
149
150	fn get_all_pending_board_ids(&self) -> anyhow::Result<Vec<VtxoId>> {
151		let conn = self.connect()?;
152		query::get_all_pending_boards_ids(&conn)
153	}
154
155	fn get_pending_board_by_vtxo_id(&self, vtxo_id: VtxoId) -> anyhow::Result<Option<PendingBoard>> {
156		let conn = self.connect()?;
157		query::get_pending_board_by_vtxo_id(&conn, vtxo_id)
158	}
159
160	fn store_round_state_lock_vtxos(&self, round_state: &RoundState) -> anyhow::Result<RoundStateId> {
161		let mut conn = self.connect()?;
162		let tx = conn.transaction()?;
163		for vtxo in round_state.participation().inputs.iter() {
164			query::update_vtxo_state_checked(
165				&*tx,
166				vtxo.id(),
167				VtxoState::Locked { movement_id: round_state.movement_id },
168				&[VtxoStateKind::Spendable],
169			)?;
170		}
171		let id = query::store_round_state(&tx, round_state)?;
172		tx.commit()?;
173		Ok(id)
174	}
175
176	fn update_round_state(&self, state: &StoredRoundState) -> anyhow::Result<()> {
177		let conn = self.connect()?;
178		query::update_round_state(&conn, state)?;
179		Ok(())
180	}
181
182	fn remove_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
183		let conn = self.connect()?;
184		query::remove_round_state(&conn, round_state.id)?;
185		Ok(())
186	}
187
188	fn load_round_states(&self) -> anyhow::Result<Vec<StoredRoundState>> {
189		let conn = self.connect()?;
190		query::load_round_states(&conn)
191	}
192
193	fn store_recovered_round(&self, round: &UnconfirmedRound) -> anyhow::Result<()> {
194		let conn = self.connect()?;
195		query::store_recovered_past_round(&conn, round)
196	}
197
198	fn remove_recovered_round(&self, funding_txid: Txid) -> anyhow::Result<()> {
199		let conn = self.connect()?;
200		query::remove_recovered_past_round(&conn, funding_txid)
201	}
202
203	fn load_recovered_rounds(&self) -> anyhow::Result<Vec<UnconfirmedRound>> {
204		let conn = self.connect()?;
205		query::load_recovered_past_rounds(&conn)
206	}
207
208	fn store_vtxos(
209		&self,
210		vtxos: &[(&Vtxo, &VtxoState)],
211	) -> anyhow::Result<()> {
212		let mut conn = self.connect()?;
213		let tx = conn.transaction()?;
214
215		for (vtxo, state) in vtxos {
216			query::store_vtxo_with_initial_state(&tx, vtxo, state)?;
217		}
218		tx.commit()?;
219		Ok(())
220	}
221
222	fn get_wallet_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<WalletVtxo>> {
223		let conn = self.connect()?;
224		query::get_wallet_vtxo_by_id(&conn, id)
225	}
226
227	fn get_all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
228		let conn = self.connect()?;
229		query::get_all_vtxos(&conn)
230	}
231
232	/// Get all VTXOs that are in one of the provided states
233	fn get_vtxos_by_state(&self, state: &[VtxoStateKind]) -> anyhow::Result<Vec<WalletVtxo>> {
234		let conn = self.connect()?;
235		query::get_vtxos_by_state(&conn, state)
236	}
237
238	fn has_spent_vtxo(&self, id: VtxoId) -> anyhow::Result<bool> {
239		let conn = self.connect()?;
240		let state : Option<VtxoState> = query::get_vtxo_state(&conn, id)?;
241		let result = state.map(|s| s == VtxoState::Spent).unwrap_or(false);
242		Ok(result)
243	}
244
245	fn remove_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo>> {
246		let mut conn = self.connect()?;
247		let tx = conn.transaction().context("Failed to start transaction")?;
248		let result = query::delete_vtxo(&tx, id);
249		tx.commit().context("Failed to commit transaction")?;
250		result
251	}
252
253	fn store_vtxo_key(&self, index: u32, public_key: PublicKey) -> anyhow::Result<()> {
254		let conn = self.connect()?;
255		query::store_vtxo_key(&conn, index, public_key)
256	}
257
258	fn get_last_vtxo_key_index(&self) -> anyhow::Result<Option<u32>> {
259		let conn = self.connect()?;
260		query::get_last_vtxo_key_index(&conn)
261	}
262
263	fn get_public_key_idx(&self, public_key: &PublicKey) -> anyhow::Result<Option<u32>> {
264		let conn = self.connect()?;
265		query::get_public_key_idx(&conn, public_key)
266	}
267
268	/// Store a lightning receive
269	fn store_lightning_receive(
270		&self,
271		payment_hash: PaymentHash,
272		preimage: Preimage,
273		invoice: &Bolt11Invoice,
274		htlc_recv_cltv_delta: BlockDelta,
275	) -> anyhow::Result<()> {
276		let conn = self.connect()?;
277		query::store_lightning_receive(
278			&conn, payment_hash, preimage, invoice, htlc_recv_cltv_delta,
279		)?;
280		Ok(())
281	}
282
283	fn store_new_pending_lightning_send(
284		&self,
285		invoice: &Invoice,
286		amount: &Amount,
287		vtxos: &[VtxoId],
288		movement_id: MovementId,
289	) -> anyhow::Result<LightningSend> {
290		let conn = self.connect()?;
291		query::store_new_pending_lightning_send(&conn, invoice, amount, vtxos, movement_id)
292	}
293
294	fn get_all_pending_lightning_send(&self) -> anyhow::Result<Vec<LightningSend>> {
295		let conn = self.connect()?;
296		query::get_all_pending_lightning_send(&conn)
297	}
298
299	fn finish_lightning_send(
300		&self,
301		payment_hash: PaymentHash,
302		preimage: Option<Preimage>,
303	) -> anyhow::Result<()> {
304		let conn = self.connect()?;
305		query::finish_lightning_send(&conn, payment_hash, preimage)
306	}
307
308	fn remove_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
309		let conn = self.connect()?;
310		query::remove_lightning_send(&conn, payment_hash)?;
311		Ok(())
312	}
313
314	fn get_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<Option<LightningSend>> {
315		let conn = self.connect()?;
316		query::get_lightning_send(&conn, payment_hash)
317	}
318
319	fn get_all_pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
320		let conn = self.connect()?;
321		query::get_all_pending_lightning_receives(&conn)
322	}
323
324	fn set_preimage_revealed(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
325		let conn = self.connect()?;
326		query::set_preimage_revealed(&conn, payment_hash)?;
327		Ok(())
328	}
329
330	fn update_lightning_receive(
331		&self,
332		payment_hash: PaymentHash,
333		htlc_vtxo_ids: &[VtxoId],
334		movement_id: MovementId,
335	) -> anyhow::Result<()> {
336		let conn = self.connect()?;
337		query::update_lightning_receive(&conn, payment_hash, htlc_vtxo_ids, movement_id)?;
338		Ok(())
339	}
340
341	/// Fetch a lightning receive by payment hash
342	fn fetch_lightning_receive_by_payment_hash(
343		&self,
344		payment_hash: PaymentHash,
345	) -> anyhow::Result<Option<LightningReceive>> {
346		let conn = self.connect()?;
347		query::fetch_lightning_receive_by_payment_hash(&conn, payment_hash)
348	}
349
350	fn finish_pending_lightning_receive(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
351		let conn = self.connect()?;
352		query::finish_pending_lightning_receive(&conn, payment_hash)?;
353		Ok(())
354	}
355
356	fn store_exit_vtxo_entry(&self, exit: &StoredExit) -> anyhow::Result<()> {
357		let mut conn = self.connect()?;
358		let tx = conn.transaction()?;
359		query::store_exit_vtxo_entry(&tx, exit)?;
360		tx.commit()?;
361		Ok(())
362	}
363
364	fn remove_exit_vtxo_entry(&self, id: &VtxoId) -> anyhow::Result<()> {
365		let mut conn = self.connect()?;
366		let tx = conn.transaction()?;
367		query::remove_exit_vtxo_entry(&tx, &id)?;
368		tx.commit()?;
369		Ok(())
370	}
371
372	fn get_exit_vtxo_entries(&self) -> anyhow::Result<Vec<StoredExit>> {
373		let conn = self.connect()?;
374		query::get_exit_vtxo_entries(&conn)
375	}
376
377	fn store_exit_child_tx(
378		&self,
379		exit_txid: Txid,
380		child_tx: &bitcoin::Transaction,
381		origin: ExitTxOrigin,
382	) -> anyhow::Result<()> {
383		let mut conn = self.connect()?;
384		let tx = conn.transaction()?;
385		query::store_exit_child_tx(&tx, exit_txid, child_tx, origin)?;
386		tx.commit()?;
387		Ok(())
388	}
389
390	fn get_exit_child_tx(
391		&self,
392		exit_txid: Txid,
393	) -> anyhow::Result<Option<(bitcoin::Transaction, ExitTxOrigin)>> {
394		let conn = self.connect()?;
395		query::get_exit_child_tx(&conn, exit_txid)
396	}
397
398	fn update_vtxo_state_checked(
399		&self,
400		vtxo_id: VtxoId,
401		new_state: VtxoState,
402		allowed_old_states: &[VtxoStateKind]
403	) -> anyhow::Result<WalletVtxo> {
404		let conn = self.connect()?;
405		query::update_vtxo_state_checked(&conn, vtxo_id, new_state, allowed_old_states)
406	}
407}
408
409#[cfg(any(test, doc))]
410pub mod helpers {
411	use std::path::PathBuf;
412	use std::str::FromStr;
413
414	use rusqlite::Connection;
415
416	/// Creates an in-memory sqlite connection.
417	///
418	/// It returns a [PathBuf] and a [Connection].
419	/// The user should ensure the [Connection] isn't dropped
420	/// until the test completes. If all connections are dropped during
421	/// the test the entire database might be cleared.
422	#[cfg(any(test, feature = "rand"))]
423	pub fn in_memory_db() -> (PathBuf, Connection) {
424		use rand::{distr, Rng};
425
426		// All tests run in the same process and share the same
427		// cache. To ensure that each call to `in_memory` results
428		// in a new database a random file-name is generated.
429		//
430		// This database is deleted once all connections are dropped
431		let mut rng = rand::rng();
432		let filename: String = (&mut rng).sample_iter(distr::Alphanumeric)
433			.take(16).map(char::from).collect();
434
435		let connection_string = format!("file:{}?mode=memory&cache=shared", filename);
436		let pathbuf = PathBuf::from_str(&connection_string).unwrap();
437
438		let conn = Connection::open(pathbuf.clone()).unwrap();
439		(pathbuf.clone(), conn)
440	}
441}
442
443#[cfg(test)]
444mod test {
445	use bdk_wallet::chain::DescriptorExt;
446	use bitcoin::bip32;
447
448	use ark::vtxo::test::VTXO_VECTORS;
449
450	use crate::persist::sqlite::helpers::in_memory_db;
451	use crate::vtxo::state::UNSPENT_STATES;
452
453	use super::*;
454
455	#[test]
456	fn test_add_and_retrieve_vtxos() {
457		let vtxo_1 = &VTXO_VECTORS.board_vtxo;
458		let vtxo_2 = &VTXO_VECTORS.arkoor_htlc_out_vtxo;
459		let vtxo_3 = &VTXO_VECTORS.round2_vtxo;
460
461		let (cs, conn) = in_memory_db();
462		let db = SqliteClient::open(cs).unwrap();
463
464		db.store_vtxos(&[
465			(vtxo_1, &VtxoState::Spendable), (vtxo_2, &VtxoState::Spendable)
466		]).unwrap();
467
468		// Check that vtxo-1 can be retrieved from the database
469		let vtxo_1_db = db.get_wallet_vtxo(vtxo_1.id()).expect("No error").expect("A vtxo was found");
470		assert_eq!(vtxo_1_db.vtxo, *vtxo_1);
471
472		// Verify that vtxo 3 is not in the database
473		assert!(db.get_wallet_vtxo(vtxo_3.id()).expect("No error").is_none());
474
475		// Verify that we have two entries in the database
476		let vtxos = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).unwrap();
477		assert_eq!(vtxos.len(), 2);
478		assert!(vtxos.iter().any(|v| v.vtxo == *vtxo_1));
479		assert!(vtxos.iter().any(|v| v.vtxo == *vtxo_2));
480		assert!(!vtxos.iter().any(|v| v.vtxo == *vtxo_3));
481
482		// Verify that we can mark a vtxo as spent
483		db.update_vtxo_state_checked(
484			vtxo_1.id(), VtxoState::Spent, &UNSPENT_STATES,
485		).unwrap();
486
487		let vtxos = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).unwrap();
488		assert_eq!(vtxos.len(), 1);
489
490		// Add the third entry to the database
491		db.store_vtxos(&[(vtxo_3, &VtxoState::Spendable)]).unwrap();
492
493		let vtxos = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).unwrap();
494		assert_eq!(vtxos.len(), 2);
495		assert!(vtxos.iter().any(|v| v.vtxo == *vtxo_2));
496		assert!(vtxos.iter().any(|v| v.vtxo == *vtxo_3));
497
498		conn.close().unwrap();
499	}
500
501	#[test]
502	fn test_create_wallet_then_load() {
503		let (connection_string, conn) = in_memory_db();
504
505		let db = SqliteClient::open(connection_string).unwrap();
506		let network = bitcoin::Network::Testnet;
507
508		let seed = bip39::Mnemonic::generate(12).unwrap().to_seed("");
509		let xpriv = bip32::Xpriv::new_master(network, &seed).unwrap();
510
511		let desc = format!("tr({}/84'/0'/0'/*)", xpriv);
512
513		// need to call init before we call store
514		let _ = db.initialize_bdk_wallet().unwrap();
515		let mut created = bdk_wallet::Wallet::create_single(desc.clone())
516			.network(network)
517			.create_wallet_no_persist()
518			.unwrap();
519		db.store_bdk_wallet_changeset(&created.take_staged().unwrap()).unwrap();
520
521		let loaded = {
522			let changeset = db.initialize_bdk_wallet().unwrap();
523			bdk_wallet::Wallet::load()
524				.descriptor(bdk_wallet::KeychainKind::External, Some(desc.clone()))
525				.extract_keys()
526				.check_network(network)
527				.load_wallet_no_persist(changeset)
528				.unwrap()
529		};
530
531		assert!(loaded.is_some());
532		assert_eq!(
533			created.public_descriptor(bdk_wallet::KeychainKind::External).descriptor_id(),
534			loaded.unwrap().public_descriptor(bdk_wallet::KeychainKind::External).descriptor_id()
535		);
536
537		// Explicitly close the connection here
538		// This ensures the database isn't dropped during the test
539		conn.close().unwrap();
540	}
541}