bark/persist/sqlite/
mod.rs

1//! SQLite persistence backend for Bark.
2//!
3//! This module provides a concrete implementation of the `BarkPersister` trait
4//! backed by a local SQLite database. It encapsulates schema creation and
5//! migrations, typed query helpers, and conversions between in-memory models
6//! and their stored representations. Operations are performed using explicit
7//! connections and transactions to ensure atomic updates across related tables,
8//! covering wallet properties, movements, vtxos and their states, round
9//! lifecycle data, Lightning receives, exit tracking, and sync metadata.
10
11mod convert;
12mod migrations;
13mod query;
14
15
16use std::path::{Path, PathBuf};
17
18use anyhow::Context;
19use bitcoin::{Amount, Txid};
20use bitcoin::secp256k1::PublicKey;
21use chrono::DateTime;
22use lightning_invoice::Bolt11Invoice;
23use log::debug;
24use rusqlite::Connection;
25
26use ark::{Vtxo, VtxoId};
27use ark::lightning::{Invoice, PaymentHash, Preimage};
28use ark::vtxo::Full;
29use bitcoin_ext::BlockDelta;
30
31use crate::WalletProperties;
32use crate::exit::ExitTxOrigin;
33use crate::movement::{Movement, MovementId, MovementStatus, MovementSubsystem};
34use crate::persist::{BarkPersister, RoundStateId, StoredRoundState, Unlocked};
35use crate::persist::models::{LightningReceive, LightningSend, PendingBoard, PendingOffboard, StoredExit};
36use crate::round::RoundState;
37use crate::vtxo::{VtxoState, VtxoStateKind, WalletVtxo};
38
/// A [BarkPersister] implementation backed by SQLite through rusqlite.
///
/// The client only stores the location of the database (a [PathBuf]); it does not hold an
/// open connection itself. Cloning is therefore cheap and yields a client that points at the
/// same database.
#[derive(Clone, Debug)]
pub struct SqliteClient {
	/// Path (or SQLite connection string) handed to rusqlite whenever a connection is opened.
	connection_string: PathBuf,
}
45
46impl SqliteClient {
47	/// Open a new [SqliteClient] with the given file path
48	pub fn open(db_file: impl AsRef<Path>) -> anyhow::Result<SqliteClient> {
49		let path = db_file.as_ref().to_path_buf();
50
51		debug!("Opening database at {}", path.display());
52		let mut conn = rusqlite::Connection::open(&path)
53			.with_context(|| format!("Error connecting to database {}", path.display()))?;
54
55		let migrations = migrations::MigrationContext::new();
56		migrations.do_all_migrations(&mut conn)?;
57
58		Ok( Self { connection_string: path })
59	}
60
61	fn connect(&self) -> anyhow::Result<Connection> {
62		rusqlite::Connection::open(&self.connection_string)
63			.with_context(|| format!("Error connecting to database {}", self.connection_string.display()))
64	}
65}
66
67#[async_trait]
68impl BarkPersister for SqliteClient {
69	async fn init_wallet(&self, properties: &WalletProperties) -> anyhow::Result<()> {
70		let mut conn = self.connect()?;
71		let tx = conn.transaction()?;
72
73		query::set_properties(&tx, properties)?;
74
75		tx.commit()?;
76		Ok(())
77	}
78
79	#[cfg(feature = "onchain-bdk")]
80	async fn initialize_bdk_wallet(&self) -> anyhow::Result<bdk_wallet::ChangeSet> {
81	    let mut conn = self.connect()?;
82		Ok(bdk_wallet::WalletPersister::initialize(&mut conn)?)
83	}
84
85	#[cfg(feature = "onchain-bdk")]
86	async fn store_bdk_wallet_changeset(&self, changeset: &bdk_wallet::ChangeSet) -> anyhow::Result<()> {
87	    let mut conn = self.connect()?;
88		bdk_wallet::WalletPersister::persist(&mut conn, changeset)?;
89		Ok(())
90	}
91
92	async fn read_properties(&self) -> anyhow::Result<Option<WalletProperties>> {
93		let conn = self.connect()?;
94		Ok(query::fetch_properties(&conn)?)
95	}
96
97	async fn set_server_pubkey(&self, server_pubkey: PublicKey) -> anyhow::Result<()> {
98		let conn = self.connect()?;
99		query::set_server_pubkey(&conn, &server_pubkey)?;
100		Ok(())
101	}
102
103	async fn create_new_movement(&self,
104		status: MovementStatus,
105		subsystem: &MovementSubsystem,
106		time: DateTime<chrono::Local>,
107	) -> anyhow::Result<MovementId> {
108		let mut conn = self.connect()?;
109		let tx = conn.transaction()?;
110		let movement_id = query::create_new_movement(&tx, status, subsystem, time)?;
111		tx.commit()?;
112		Ok(movement_id)
113	}
114
115	async fn update_movement(&self, movement: &Movement) -> anyhow::Result<()> {
116		let mut conn = self.connect()?;
117		let tx = conn.transaction()?;
118		query::update_movement(&tx, movement)?;
119		tx.commit()?;
120		Ok(())
121	}
122
123	async fn get_movement_by_id(&self, movement_id: MovementId) -> anyhow::Result<Movement> {
124		let conn = self.connect()?;
125		query::get_movement_by_id(&conn, movement_id)
126	}
127
128	async fn get_all_movements(&self) -> anyhow::Result<Vec<Movement>> {
129		let conn = self.connect()?;
130		query::get_all_movements(&conn)
131	}
132
133	async fn store_pending_board(
134		&self,
135		vtxo: &Vtxo<Full>,
136		funding_tx: &bitcoin::Transaction,
137		movement_id: MovementId,
138	) -> anyhow::Result<()> {
139		let mut conn = self.connect()?;
140		let tx = conn.transaction()?;
141		query::store_new_pending_board(&tx, vtxo, funding_tx, movement_id)?;
142		tx.commit()?;
143		Ok(())
144	}
145
146	async fn remove_pending_board(&self, vtxo_id: &VtxoId) -> anyhow::Result<()> {
147		let mut conn = self.connect()?;
148		let tx = conn.transaction()?;
149		query::remove_pending_board(&tx, vtxo_id)?;
150		tx.commit()?;
151		Ok(())
152	}
153
154	async fn get_all_pending_board_ids(&self) -> anyhow::Result<Vec<VtxoId>> {
155		let conn = self.connect()?;
156		query::get_all_pending_boards_ids(&conn)
157	}
158
159	async fn get_pending_board_by_vtxo_id(&self, vtxo_id: VtxoId) -> anyhow::Result<Option<PendingBoard>> {
160		let conn = self.connect()?;
161		query::get_pending_board_by_vtxo_id(&conn, vtxo_id)
162	}
163
164	async fn store_round_state_lock_vtxos(&self, round_state: &RoundState) -> anyhow::Result<RoundStateId> {
165		let mut conn = self.connect()?;
166		let tx = conn.transaction()?;
167		for vtxo in round_state.participation().inputs.iter() {
168			query::update_vtxo_state_checked(
169				&*tx,
170				vtxo.id(),
171				VtxoState::Locked { movement_id: round_state.movement_id },
172				&[VtxoStateKind::Spendable],
173			)?;
174		}
175		let id = query::store_round_state(&tx, round_state)?;
176		tx.commit()?;
177		Ok(id)
178	}
179
180	async fn update_round_state(&self, state: &StoredRoundState) -> anyhow::Result<()> {
181		let conn = self.connect()?;
182		query::update_round_state(&conn, state)?;
183		Ok(())
184	}
185
186	async fn remove_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
187		let conn = self.connect()?;
188		query::remove_round_state(&conn, round_state.id())?;
189		Ok(())
190	}
191
192	async fn get_round_state_by_id(&self, id: RoundStateId) -> anyhow::Result<Option<StoredRoundState<Unlocked>>> {
193		let conn = self.connect()?;
194		query::get_round_state_by_id(&conn, id)
195	}
196
197	async fn get_pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
198		let conn = self.connect()?;
199		query::get_pending_round_state_ids(&conn)
200	}
201
202	async fn store_vtxos(
203		&self,
204		vtxos: &[(&Vtxo<Full>, &VtxoState)],
205	) -> anyhow::Result<()> {
206		let mut conn = self.connect()?;
207		let tx = conn.transaction()?;
208
209		for (vtxo, state) in vtxos {
210			query::store_vtxo_with_initial_state(&tx, vtxo, state)?;
211		}
212		tx.commit()?;
213		Ok(())
214	}
215
216	async fn get_wallet_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<WalletVtxo>> {
217		let conn = self.connect()?;
218		query::get_wallet_vtxo_by_id(&conn, id)
219	}
220
221	async fn get_all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
222		let conn = self.connect()?;
223		query::get_all_vtxos(&conn)
224	}
225
226	/// Get all VTXOs that are in one of the provided states
227	async fn get_vtxos_by_state(&self, state: &[VtxoStateKind]) -> anyhow::Result<Vec<WalletVtxo>> {
228		let conn = self.connect()?;
229		query::get_vtxos_by_state(&conn, state)
230	}
231
232	async fn has_spent_vtxo(&self, id: VtxoId) -> anyhow::Result<bool> {
233		let conn = self.connect()?;
234		let state : Option<VtxoState> = query::get_vtxo_state(&conn, id)?;
235		let result = state.map(|s| s == VtxoState::Spent).unwrap_or(false);
236		Ok(result)
237	}
238
239	async fn remove_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
240		let mut conn = self.connect()?;
241		let tx = conn.transaction().context("Failed to start transaction")?;
242		let result = query::delete_vtxo(&tx, id);
243		tx.commit().context("Failed to commit transaction")?;
244		result
245	}
246
247	async fn store_vtxo_key(&self, index: u32, public_key: PublicKey) -> anyhow::Result<()> {
248		let conn = self.connect()?;
249		query::store_vtxo_key(&conn, index, public_key)
250	}
251
252	async fn get_last_vtxo_key_index(&self) -> anyhow::Result<Option<u32>> {
253		let conn = self.connect()?;
254		query::get_last_vtxo_key_index(&conn)
255	}
256
257	async fn get_public_key_idx(&self, public_key: &PublicKey) -> anyhow::Result<Option<u32>> {
258		let conn = self.connect()?;
259		query::get_public_key_idx(&conn, public_key)
260	}
261
262	async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
263		let conn = self.connect()?;
264		query::get_mailbox_checkpoint(&conn)
265	}
266
267	async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
268		let conn = self.connect()?;
269		query::store_mailbox_checkpoint(&conn, checkpoint)?;
270		Ok(())
271	}
272
273	/// Store a lightning receive
274	async fn store_lightning_receive(
275		&self,
276		payment_hash: PaymentHash,
277		preimage: Preimage,
278		invoice: &Bolt11Invoice,
279		htlc_recv_cltv_delta: BlockDelta,
280	) -> anyhow::Result<()> {
281		let conn = self.connect()?;
282		query::store_lightning_receive(
283			&conn, payment_hash, preimage, invoice, htlc_recv_cltv_delta,
284		)?;
285		Ok(())
286	}
287
288	async fn store_new_pending_lightning_send(
289		&self,
290		invoice: &Invoice,
291		amount: Amount,
292		fee: Amount,
293		vtxos: &[VtxoId],
294		movement_id: MovementId,
295	) -> anyhow::Result<LightningSend> {
296		let conn = self.connect()?;
297		query::store_new_pending_lightning_send(&conn, invoice, amount, fee, vtxos, movement_id)
298	}
299
300	async fn get_all_pending_lightning_send(&self) -> anyhow::Result<Vec<LightningSend>> {
301		let conn = self.connect()?;
302		query::get_all_pending_lightning_send(&conn)
303	}
304
305	async fn finish_lightning_send(
306		&self,
307		payment_hash: PaymentHash,
308		preimage: Option<Preimage>,
309	) -> anyhow::Result<()> {
310		let conn = self.connect()?;
311		query::finish_lightning_send(&conn, payment_hash, preimage)
312	}
313
314	async fn remove_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
315		let conn = self.connect()?;
316		query::remove_lightning_send(&conn, payment_hash)?;
317		Ok(())
318	}
319
320	async fn get_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<Option<LightningSend>> {
321		let conn = self.connect()?;
322		query::get_lightning_send(&conn, payment_hash)
323	}
324
325	async fn get_all_pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
326		let conn = self.connect()?;
327		query::get_all_pending_lightning_receives(&conn)
328	}
329
330	async fn set_preimage_revealed(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
331		let conn = self.connect()?;
332		query::set_preimage_revealed(&conn, payment_hash)?;
333		Ok(())
334	}
335
336	async fn update_lightning_receive(
337		&self,
338		payment_hash: PaymentHash,
339		htlc_vtxo_ids: &[VtxoId],
340		movement_id: MovementId,
341	) -> anyhow::Result<()> {
342		let conn = self.connect()?;
343		query::update_lightning_receive(&conn, payment_hash, htlc_vtxo_ids, movement_id)?;
344		Ok(())
345	}
346
347	/// Fetch a lightning receive by payment hash
348	async fn fetch_lightning_receive_by_payment_hash(
349		&self,
350		payment_hash: PaymentHash,
351	) -> anyhow::Result<Option<LightningReceive>> {
352		let conn = self.connect()?;
353		query::fetch_lightning_receive_by_payment_hash(&conn, payment_hash)
354	}
355
356	async fn finish_pending_lightning_receive(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
357		let conn = self.connect()?;
358		query::finish_pending_lightning_receive(&conn, payment_hash)?;
359		Ok(())
360	}
361
362	async fn store_exit_vtxo_entry(&self, exit: &StoredExit) -> anyhow::Result<()> {
363		let mut conn = self.connect()?;
364		let tx = conn.transaction()?;
365		query::store_exit_vtxo_entry(&tx, exit)?;
366		tx.commit()?;
367		Ok(())
368	}
369
370	async fn remove_exit_vtxo_entry(&self, id: &VtxoId) -> anyhow::Result<()> {
371		let mut conn = self.connect()?;
372		let tx = conn.transaction()?;
373		query::remove_exit_vtxo_entry(&tx, &id)?;
374		tx.commit()?;
375		Ok(())
376	}
377
378	async fn get_exit_vtxo_entries(&self) -> anyhow::Result<Vec<StoredExit>> {
379		let conn = self.connect()?;
380		query::get_exit_vtxo_entries(&conn)
381	}
382
383	async fn store_exit_child_tx(
384		&self,
385		exit_txid: Txid,
386		child_tx: &bitcoin::Transaction,
387		origin: ExitTxOrigin,
388	) -> anyhow::Result<()> {
389		let mut conn = self.connect()?;
390		let tx = conn.transaction()?;
391		query::store_exit_child_tx(&tx, exit_txid, child_tx, origin)?;
392		tx.commit()?;
393		Ok(())
394	}
395
396	async fn get_exit_child_tx(
397		&self,
398		exit_txid: Txid,
399	) -> anyhow::Result<Option<(bitcoin::Transaction, ExitTxOrigin)>> {
400		let conn = self.connect()?;
401		query::get_exit_child_tx(&conn, exit_txid)
402	}
403
404	async fn update_vtxo_state_checked(
405		&self,
406		vtxo_id: VtxoId,
407		new_state: VtxoState,
408		allowed_old_states: &[VtxoStateKind]
409	) -> anyhow::Result<WalletVtxo> {
410		let conn = self.connect()?;
411		query::update_vtxo_state_checked(&conn, vtxo_id, new_state, allowed_old_states)
412	}
413
414	async fn store_pending_offboard(
415		&self,
416		pending: &PendingOffboard,
417	) -> anyhow::Result<()> {
418		let mut conn = self.connect()?;
419		let tx = conn.transaction()?;
420		query::store_pending_offboard(&tx, pending)?;
421		tx.commit()?;
422		Ok(())
423	}
424
425	async fn get_pending_offboards(&self) -> anyhow::Result<Vec<PendingOffboard>> {
426		let conn = self.connect()?;
427		query::get_all_pending_offboards(&conn)
428	}
429
430	async fn remove_pending_offboard(&self, movement_id: MovementId) -> anyhow::Result<()> {
431		let mut conn = self.connect()?;
432		let tx = conn.transaction()?;
433		query::remove_pending_offboard(&tx, movement_id)?;
434		tx.commit()?;
435		Ok(())
436	}
437}
438
#[cfg(any(test, doc))]
pub mod helpers {
	use std::path::PathBuf;

	use rusqlite::Connection;

	/// Creates a fresh in-memory sqlite database.
	///
	/// Returns the connection string as a [PathBuf] together with an open [Connection]
	/// to that database. The caller should keep the [Connection] alive until the test
	/// completes: if all connections are dropped during the test the entire database
	/// might be cleared.
	#[cfg(any(test, feature = "rand"))]
	pub fn in_memory_db() -> (PathBuf, Connection) {
		use rand::{distr, Rng};

		// All tests run in the same process and share the same cache. A random
		// 16-character name ensures that every call to `in_memory_db` gets its own
		// database instead of sharing one with other tests.
		//
		// This database is deleted once all connections are dropped.
		let db_name: String = rand::rng()
			.sample_iter(distr::Alphanumeric)
			.take(16)
			.map(char::from)
			.collect();

		let location = PathBuf::from(format!("file:{}?mode=memory&cache=shared", db_name));
		let keep_alive = Connection::open(&location).unwrap();
		(location, keep_alive)
	}
}
472
#[cfg(test)]
mod test {
	use ark::test_util::VTXO_VECTORS;

	use crate::persist::sqlite::helpers::in_memory_db;
	use crate::vtxo::VtxoState;

	use super::*;

	/// Stores VTXOs, reads them back by id and by state, and checks that marking one as
	/// spent removes it from the spendable set.
	#[tokio::test]
	async fn test_add_and_retrieve_vtxos() {
		let board = &VTXO_VECTORS.board_vtxo;
		let htlc_out = &VTXO_VECTORS.arkoor_htlc_out_vtxo;
		let round = &VTXO_VECTORS.round2_vtxo;

		// `keep_alive` keeps the shared in-memory database around until the end of the test.
		let (location, keep_alive) = in_memory_db();
		let db = SqliteClient::open(location).unwrap();

		db.store_vtxos(&[
			(board, &VtxoState::Spendable),
			(htlc_out, &VtxoState::Spendable),
		]).await.unwrap();

		// The board vtxo can be read back by id
		let stored_board = db.get_wallet_vtxo(board.id()).await
			.expect("No error")
			.expect("A vtxo was found");
		assert_eq!(stored_board.vtxo, *board);

		// The round vtxo was never stored
		let missing = db.get_wallet_vtxo(round.id()).await.expect("No error");
		assert!(missing.is_none());

		// Exactly the two stored vtxos are spendable
		let spendable = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await.unwrap();
		assert_eq!(spendable.len(), 2);
		assert!(spendable.iter().any(|v| v.vtxo == *board));
		assert!(spendable.iter().any(|v| v.vtxo == *htlc_out));
		assert!(spendable.iter().all(|v| v.vtxo != *round));

		// Marking the board vtxo as spent removes it from the spendable set
		db.update_vtxo_state_checked(
			board.id(), VtxoState::Spent, &VtxoStateKind::UNSPENT_STATES,
		).await.unwrap();

		let spendable = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await.unwrap();
		assert_eq!(spendable.len(), 1);

		// Storing the round vtxo brings the spendable count back to two
		db.store_vtxos(&[(round, &VtxoState::Spendable)]).await.unwrap();

		let spendable = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await.unwrap();
		assert_eq!(spendable.len(), 2);
		assert!(spendable.iter().any(|v| v.vtxo == *htlc_out));
		assert!(spendable.iter().any(|v| v.vtxo == *round));

		keep_alive.close().unwrap();
	}

	/// Creates a BDK wallet, persists its staged change set, and checks that loading from the
	/// database yields a wallet with the same external descriptor.
	#[tokio::test]
	#[cfg(feature = "onchain-bdk")]
	async fn test_create_wallet_then_load() {
		use bdk_wallet::chain::DescriptorExt;

		let (location, keep_alive) = in_memory_db();
		let db = SqliteClient::open(location).unwrap();

		let network = bitcoin::Network::Testnet;
		let seed = bip39::Mnemonic::generate(12).unwrap().to_seed("");
		let xpriv = bitcoin::bip32::Xpriv::new_master(network, &seed).unwrap();
		let desc = format!("tr({}/84'/0'/0'/*)", xpriv);

		// need to call init before we call store
		let _ = db.initialize_bdk_wallet().await.unwrap();
		let mut created = bdk_wallet::Wallet::create_single(desc.clone())
			.network(network)
			.create_wallet_no_persist()
			.unwrap();
		let staged = created.take_staged().unwrap();
		db.store_bdk_wallet_changeset(&staged).await.unwrap();

		let changeset = db.initialize_bdk_wallet().await.unwrap();
		let loaded = bdk_wallet::Wallet::load()
			.descriptor(bdk_wallet::KeychainKind::External, Some(desc.clone()))
			.extract_keys()
			.check_network(network)
			.load_wallet_no_persist(changeset)
			.unwrap();

		assert!(loaded.is_some());
		let external = bdk_wallet::KeychainKind::External;
		assert_eq!(
			created.public_descriptor(external).descriptor_id(),
			loaded.unwrap().public_descriptor(external).descriptor_id()
		);

		// Explicitly close the connection here
		// This ensures the database isn't dropped during the test
		keep_alive.close().unwrap();
	}
}