bark/persist/adaptor/
mod.rs

1//! Storage adaptor module providing the [StorageAdaptor] trait and blanket
2//! implementation of [BarkPersister] for any type implementing [StorageAdaptor].
3//!
4//! This module provides an optimized single-table storage abstraction that can be
5//! efficiently implemented on various backends (SQLite, Postgres, MongoDB, Firebase,
6//! in-memory, etc.).
7//!
8//! The design uses structured keys:
9//! - **Primary key (`pk`)**: Unique identifier for each record
10//! - **Partition key**: Groups related records for efficient querying
11//! - **Sort key**: Enables ordered iteration and range queries
12//!
13//! # Example
14//!
15//! ```rust
16//! # use bark::persist::adaptor::memory::MemoryStorageAdaptor;
17//! # use bark::persist::adaptor::{Query, Record, StorageAdaptor, SortKey};
18//!
19//! # async fn example() -> anyhow::Result<()> {
20//! // Create an in-memory storage adaptor
21//! let mut storage = MemoryStorageAdaptor::new();
22//!
23//! // Store a record sorted by a numeric field (ascending)
24//! let record = Record {
25//!     partition: 0,
26//!     pk: "item:1".into(),
27//!     sort_key: Some(SortKey::u32_asc(42)),
28//!     data: b"hello world".to_vec(),
29//! };
30//! storage.put(record).await?;
31//!
32//! // Query with efficient index scan
33//! let query = Query::new(0).limit(10);
34//! let records = storage.query(query).await?;
35//! # Ok(())
36//! # }
37//! ```
38
39mod sort;
40
41#[cfg(feature = "filestore")]
42pub mod filestore;
43pub mod memory;
44
45pub use sort::SortKey;
46
47use anyhow::Context;
48use bitcoin::{Amount, Transaction, Txid};
49use bitcoin::secp256k1::PublicKey;
50use bitcoin::hashes::Hash;
51#[cfg(feature = "onchain-bdk")]
52use bdk_core::Merge;
53#[cfg(feature = "onchain-bdk")]
54use bdk_wallet::ChangeSet;
55use chrono::{DateTime, Local};
56use lightning_invoice::Bolt11Invoice;
57use serde::{de::DeserializeOwned, Serialize};
58
59use ark::lightning::{Invoice, PaymentHash, Preimage};
60use ark::{Vtxo, VtxoId};
61use ark::vtxo::Full;
62use bitcoin_ext::BlockDelta;
63
64use crate::exit::ExitTxOrigin;
65use crate::movement::{
66	Movement, MovementId, MovementStatus, MovementSubsystem,
67};
68use crate::persist::BarkPersister;
69use crate::persist::models::{
70	LightningReceive, LightningSend, PendingBoard, PendingOffboard, RoundStateId, SerdeExitChildTx, SerdeRoundState, SerdeVtxo, SerdeVtxoKey, StoredExit, StoredRoundState, Unlocked,
71};
72use crate::round::RoundState;
73use crate::vtxo::{VtxoState, VtxoStateKind};
74use crate::{WalletProperties, WalletVtxo};
75
76pub mod partition {
77	pub const PROPERTIES: u8 = 0;
78	#[allow(unused)]
79	pub const BDK_CHANGESET: u8 = 1;
80	pub const VTXO: u8 = 2;
81	pub const PUBLIC_KEY: u8 = 3;
82	pub const PENDING_BOARD: u8 = 4;
83	pub const ROUND_STATE: u8 = 5;
84	pub const MOVEMENT: u8 = 6;
85	pub const LIGHTNING_SEND: u8 = 7;
86	pub const LIGHTNING_RECEIVE: u8 = 8;
87	pub const EXIT_VTXO: u8 = 9;
88	pub const EXIT_CHILD_TX: u8 = 10;
89	pub const MAILBOX_CHECKPOINT: u8 = 11;
90	pub const PENDING_OFFBOARD: u8 = 12;
91
92	pub const LAST_IDS: u8 = u8::MAX;
93}
94
95/// A storage record with structured keys.
96#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
97pub struct Record {
98	/// Partition key for grouping related records (e.g., "vtxo", "movement").
99	///
100	/// Queries always filter by partition.
101	pub partition: u8,
102
103	/// Unique primary key
104	pub pk: Vec<u8>,
105
106	/// Optional sort key for ordered iteration within a partition.
107	///
108	/// Use [`SortKey::builder()`] to construct composite keys with
109	/// mixed sort directions.
110	///
111	/// This field may be set or changed after record insertion.
112	/// Implementation should support updating the sort key of a
113	/// record post-insert if needed.
114	pub sort_key: Option<SortKey>,
115
116	/// The record data encoded as JSON.
117	pub data: Vec<u8>,
118}
119
120impl Record {
121	/// Converts the record data to a typed value.
122	fn to_data<T: DeserializeOwned>(&self) -> anyhow::Result<T> {
123		serde_json::from_slice(&self.data).map_err(Into::into)
124	}
125
126	/// Creates a new record from a typed value.
127	fn from_data<T: Serialize>(
128		partition: u8,
129		pk: &[u8],
130		sort_key: Option<SortKey>,
131		data: &T,
132	) -> anyhow::Result<Record> {
133		Ok(Record {
134			partition,
135			pk: pk.to_vec(),
136			sort_key,
137			data: serde_json::to_vec(data)?,
138		})
139	}
140}
141
142/// Query specification for retrieving records from a partition.
143#[derive(Debug, Clone, Default)]
144pub struct Query {
145	/// Partition to query (required).
146	pub partition: u8,
147
148	/// Maximum number of records to return.
149	pub limit: Option<usize>,
150
151	/// Inclusive start key for the query.
152	pub start: Option<SortKey>,
153
154	/// Exclusive end key for the query.
155	pub end: Option<SortKey>,
156}
157
158impl Query {
159	/// Creates a new query for the given partition.
160	pub fn new(partition: u8) -> Self {
161		Self {
162			partition,
163			..Default::default()
164		}
165	}
166
167	/// Limits the number of results.
168	pub fn limit(mut self, n: usize) -> Self {
169		self.limit = Some(n);
170		self
171	}
172
173	/// Sets the start key for the query (inclusive).
174	pub fn start(mut self, start: SortKey) -> Self {
175		self.start = Some(start);
176		self
177	}
178
179	/// Sets the end key for the query (exclusive).
180	pub fn end(mut self, end: SortKey) -> Self {
181		self.end = Some(end);
182		self
183	}
184}
185
186/// Storage adaptor trait for persistence backends.
187///
188/// This trait provides a minimal interface (4 methods) that can be efficiently
189/// implemented on various storage backends while enabling query optimization.
190///
191/// # Implementor's Guide
192///
193/// ## Simple backends (memory, file-based)
194///
195/// Store records in a map/list and implement `query` by filtering in memory.
196///
197/// ## Database backends (Postgres, MongoDB, Firebase, IndexedDB, etc.)
198///
199/// Create a single table with indexes:
200///
201/// ```sql
202/// CREATE TABLE storage (
203///     pk TEXT PRIMARY KEY,
204///     partition TEXT NOT NULL,
205///     sort_key BLOB,
206///     data BLOB NOT NULL
207/// );
208/// CREATE INDEX idx_partition_sort ON storage(partition, sort_key);
209/// ```
210///
211/// Translate [`Query`] to SQL:
212///
213/// ```sql
214/// SELECT * FROM storage
215/// WHERE partition = :partition
216/// ORDER BY :sort_key DESC
217/// ```
218#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
219#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
220pub trait StorageAdaptor: Send + Sync + 'static {
221	/// Stores a record, inserting or updating by primary key.
222	async fn put(&mut self, record: Record) -> anyhow::Result<()>;
223
224	/// Retrieves a record by primary key.
225	///
226	/// Returns `None` if the record doesn't exist.
227	async fn get(&self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
228
229	/// Deletes a record by primary key.
230	///
231	/// Returns the deleted record if it existed, `None` otherwise.
232	async fn delete(&mut self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
233
234	/// Queries records in a partition
235	///
236	/// Results are ordered by sort key. Records without a sort key appear last.
237	async fn query(&self, query: Query) -> anyhow::Result<Vec<Record>>;
238
239	/// Increments the last partition id, then stores and returns the new id.
240	async fn incremental_id(&mut self, partition: u8) -> anyhow::Result<u32> {
241		let last_partition_id = self.get(partition::LAST_IDS, &[partition]).await?
242			.map(|r| r.to_data::<u32>()).unwrap_or(Ok(0))?;
243		let next_partition_id = last_partition_id + 1;
244
245		let record = Record::from_data(
246			partition::LAST_IDS,
247			&[partition],
248			None,
249			&next_partition_id,
250		)?;
251
252		self.put(record).await?;
253		Ok(next_partition_id)
254	}
255}
256
257async fn get_vtxo(adaptor: &dyn StorageAdaptor, id: VtxoId) -> anyhow::Result<Option<SerdeVtxo>> {
258	match adaptor.get(partition::VTXO, &id.to_bytes()).await? {
259		Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?)),
260		None => Ok(None),
261	}
262}
263
264async fn get_check_vtxo_state(
265	adaptor: &dyn StorageAdaptor,
266	vtxo_id: VtxoId,
267	allowed_states: &[VtxoStateKind],
268) -> anyhow::Result<SerdeVtxo> {
269	let vtxo = get_vtxo(adaptor, vtxo_id).await?
270		.context("vtxo not found")?;
271
272	let current_state = vtxo.current_state().context("vtxo has no state")?;
273	if !allowed_states.contains(&current_state.kind()) {
274		bail!("current state {:?} not in allowed states {:?}",
275			current_state.kind(), allowed_states
276		);
277	}
278
279	Ok(vtxo)
280}
281
282async fn update_vtxo_state_checked(
283	adaptor: &mut dyn StorageAdaptor,
284	vtxo_id: VtxoId,
285	new_state: VtxoState,
286	allowed_old_states: &[VtxoStateKind],
287) -> anyhow::Result<WalletVtxo> {
288	let mut serde_vtxo = get_check_vtxo_state(adaptor, vtxo_id, allowed_old_states).await?;
289
290	let sk = sort::vtxo_sort_key(
291		new_state.kind(), serde_vtxo.vtxo.expiry_height(), serde_vtxo.vtxo.amount()
292	);
293
294	serde_vtxo.states.push(new_state.clone());
295	let updated_record = Record::from_data(
296		partition::VTXO,
297		&vtxo_id.to_bytes(),
298		Some(sk),
299		&serde_vtxo,
300	)?;
301
302	adaptor.put(updated_record).await?;
303
304	Ok(WalletVtxo {
305		vtxo: serde_vtxo.vtxo,
306		state: new_state,
307	})
308}
309
310pub struct StorageAdaptorWrapper<S: StorageAdaptor> {
311	inner: tokio::sync::RwLock<S>,
312}
313
314impl<S: StorageAdaptor> StorageAdaptorWrapper<S> {
315	pub fn new(inner: S) -> Self {
316		Self {
317			inner: tokio::sync::RwLock::new(inner),
318		}
319	}
320}
321
322/// Blanket implementation of `BarkPersister` for any type implementing `StorageAdaptor`.
323#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
324#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
325impl <S: StorageAdaptor> BarkPersister for StorageAdaptorWrapper<S> {
326	async fn init_wallet(&self, properties: &WalletProperties) -> anyhow::Result<()> {
327		let record = Record::from_data(
328			partition::PROPERTIES,
329			// NB: a single set of properties is stored, so no need for primary key
330			&[],
331			None,
332			properties,
333		)?;
334		self.inner.write().await.put(record).await
335	}
336
337	async fn read_properties(&self) -> anyhow::Result<Option<WalletProperties>> {
338		match self.inner.read().await.get(partition::PROPERTIES, &[]).await? {
339			Some(record) => Ok(Some(record.to_data()?)),
340			None => Ok(None),
341		}
342	}
343
344	async fn set_server_pubkey(&self, server_pubkey: PublicKey) -> anyhow::Result<()> {
345		let mut properties = match self.read_properties().await? {
346			Some(properties) => properties,
347			None => bail!("wallet not initialized"),
348		};
349
350		properties.server_pubkey = Some(server_pubkey);
351
352		let record = Record::from_data(partition::PROPERTIES, &[], None, &properties)?;
353		self.inner.write().await.put(record).await
354	}
355
356	#[cfg(feature = "onchain-bdk")]
357	async fn initialize_bdk_wallet(&self) -> anyhow::Result<ChangeSet> {
358		match self.inner.read().await.get(partition::BDK_CHANGESET, &[]).await? {
359			Some(record) => record.to_data(),
360			None => Ok(ChangeSet::default()),
361		}
362	}
363
364	#[cfg(feature = "onchain-bdk")]
365	async fn store_bdk_wallet_changeset(&self, changeset: &ChangeSet) -> anyhow::Result<()> {
366		let mut current = self.initialize_bdk_wallet().await?;
367		current.merge(changeset.clone());
368
369		let record = Record::from_data(
370			partition::BDK_CHANGESET,
371			// NB: a single changeset is stored, so no need for primary key
372			&[],
373			None,
374			&current,
375		)?;
376		self.inner.write().await.put(record).await
377	}
378
379	async fn create_new_movement(
380		&self,
381		status: MovementStatus,
382		subsystem: &MovementSubsystem,
383		time: DateTime<Local>,
384	) -> anyhow::Result<MovementId> {
385		let mut lock = self.inner.write().await;
386
387		let id = MovementId(lock.incremental_id(partition::MOVEMENT).await?);
388		let movement = Movement::new(id, status, subsystem, time);
389
390		let record = Record::from_data(
391			partition::MOVEMENT,
392			&id.to_bytes(),
393			Some(sort::movement_sort_key(&time)),
394			&movement,
395		)?;
396		lock.put(record).await?;
397
398		Ok(id)
399	}
400
401	async fn update_movement(&self, movement: &Movement) -> anyhow::Result<()> {
402		let record = Record::from_data(
403			partition::MOVEMENT,
404			&movement.id.to_bytes(),
405			Some(sort::movement_sort_key(&movement.time.created_at)),
406			movement,
407		)?;
408		self.inner.write().await.put(record).await
409	}
410
411	async fn get_movement_by_id(&self, movement_id: MovementId) -> anyhow::Result<Movement> {
412		self.inner.read().await.get(partition::MOVEMENT, &movement_id.to_bytes())
413			.await?
414			.context("movement not found")?
415			.to_data()
416	}
417
418	async fn get_all_movements(&self) -> anyhow::Result<Vec<Movement>> {
419		let records = self.inner.read().await.query(Query::new(partition::MOVEMENT)).await?;
420		records.into_iter().map(|r| r.to_data()).collect()
421	}
422
423	async fn store_pending_board(
424		&self,
425		vtxo: &Vtxo<Full>,
426		funding_tx: &Transaction,
427		movement_id: MovementId,
428	) -> anyhow::Result<()> {
429		let pending_board = PendingBoard {
430			vtxos: vec![vtxo.id()],
431			amount: vtxo.amount(),
432			funding_tx: funding_tx.clone(),
433			movement_id,
434		};
435
436		let record = Record::from_data(
437			partition::PENDING_BOARD,
438			&vtxo.id().to_bytes(),
439			None,
440			&pending_board,
441		)?;
442
443		self.inner.write().await.put(record).await
444	}
445
446	async fn remove_pending_board(&self, vtxo_id: &VtxoId) -> anyhow::Result<()> {
447		self.inner.write().await.delete(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await?;
448		Ok(())
449	}
450
451	async fn get_all_pending_board_ids(&self) -> anyhow::Result<Vec<VtxoId>> {
452		let records = self
453			.inner.read().await.query(Query::new(partition::PENDING_BOARD))
454			.await?;
455		records
456			.into_iter()
457			.map(|r| {
458				let board: PendingBoard = r.to_data()?;
459				Ok(board.vtxos.into_iter().next().context("empty vtxos")?)
460			})
461			.collect()
462	}
463
464	async fn get_pending_board_by_vtxo_id(
465		&self,
466		vtxo_id: VtxoId,
467	) -> anyhow::Result<Option<PendingBoard>> {
468		match self.inner.read().await.get(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await? {
469			Some(record) => Ok(Some(record.to_data()?)),
470			None => Ok(None),
471		}
472	}
473
474	async fn store_round_state_lock_vtxos(
475		&self,
476		round_state: &RoundState,
477	) -> anyhow::Result<RoundStateId> {
478		let mut lock = self.inner.write().await;
479
480		let id = RoundStateId(lock.incremental_id(partition::ROUND_STATE).await?);
481
482		let allowed_states = &[VtxoStateKind::Spendable];
483
484		// First check that the inputs are spendable
485		for vtxo in round_state.participation().inputs.iter() {
486			get_check_vtxo_state(&mut *lock, vtxo.id(), allowed_states).await?;
487		}
488
489		for vtxo in round_state.participation().inputs.iter() {
490			update_vtxo_state_checked(
491				&mut *lock,
492				vtxo.id(),
493				VtxoState::Locked { movement_id: round_state.movement_id },
494				allowed_states,
495			).await?;
496		}
497
498		let serde_state = SerdeRoundState::from(round_state);
499		let record = Record::from_data(
500			partition::ROUND_STATE,
501			&id.to_bytes(),
502			Some(sort::SortKey::u32_asc(id.0)),
503			&serde_state,
504		)?;
505		lock.put(record).await?;
506
507		Ok(id)
508	}
509
510	async fn update_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
511		let serde_state = SerdeRoundState::from(round_state.state());
512		let record = Record::from_data(
513			partition::ROUND_STATE,
514			&round_state.id().to_bytes(),
515			Some(sort::SortKey::u32_asc(round_state.id().0)),
516			&serde_state,
517		)?;
518		self.inner.write().await.put(record).await
519	}
520
521	async fn remove_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
522		self.inner.write().await
523			.delete(partition::ROUND_STATE, &round_state.id().to_bytes()).await?;
524		Ok(())
525	}
526
527	async fn get_round_state_by_id(&self, _id: RoundStateId) -> anyhow::Result<Option<StoredRoundState<Unlocked>>> {
528		let record = self.inner.read().await
529			.get(partition::ROUND_STATE, &_id.to_bytes()).await?;
530		match record {
531			Some(r) => {
532				let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
533				let id = RoundStateId(u32::from_be_bytes(pk_slice));
534				let state = r.to_data::<SerdeRoundState>()?.into();
535				Ok(Some(StoredRoundState::new(id, state)))
536			},
537			None => Ok(None),
538		}
539	}
540
541	async fn get_pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
542		let records = self.inner.read().await
543			.query(Query::new(partition::ROUND_STATE)).await?;
544		records.into_iter()
545			.map(|r| {
546				let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
547				Ok(RoundStateId(u32::from_be_bytes(pk_slice)))
548			})
549			.collect()
550	}
551
552	async fn store_vtxos(&self, vtxos: &[(&Vtxo<Full>, &VtxoState)]) -> anyhow::Result<()> {
553		let mut lock = self.inner.write().await;
554
555		for (vtxo, state) in vtxos {
556			let serde_vtxo = SerdeVtxo {
557				vtxo: (*vtxo).clone(),
558				states: vec![(*state).clone()],
559			};
560
561			let sk = sort::vtxo_sort_key(
562				state.kind(), vtxo.expiry_height(), vtxo.amount(),
563			);
564			let record = Record::from_data(
565				partition::VTXO,
566				&vtxo.id().to_bytes(),
567				Some(sk),
568				&serde_vtxo,
569			)?;
570			lock.put(record).await?;
571		}
572		Ok(())
573	}
574
575	async fn get_wallet_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<WalletVtxo>> {
576		let lock = self.inner.read().await;
577		match get_vtxo(&*lock, id).await? {
578			Some(vtxo) => Ok(Some(WalletVtxo {
579				state: vtxo.current_state().context("vtxo has no state")?.clone(),
580				vtxo: vtxo.vtxo,
581			})),
582			None => Ok(None),
583		}
584	}
585
586	async fn get_all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
587		let records = self.inner.read().await
588			.query(Query::new(partition::VTXO)).await?;
589
590		records
591			.into_iter()
592			.map(|r| {
593				let serde_vtxo = r.to_data::<SerdeVtxo>()?;
594				let state = serde_vtxo
595					.current_state()
596					.cloned()
597					.context("vtxo has no state")?;
598				Ok(WalletVtxo {
599					vtxo: serde_vtxo.vtxo,
600					state,
601				})
602			})
603			.collect()
604	}
605
606	async fn get_vtxos_by_state(
607		&self,
608		states: &[VtxoStateKind],
609	) -> anyhow::Result<Vec<WalletVtxo>> {
610		let lock = self.inner.read().await;
611
612		let range = |state: VtxoStateKind| {
613			let start = sort::vtxo_sort_key(state, u32::MIN, Amount::ZERO);
614			let end = sort::vtxo_sort_key(state, u32::MAX, Amount::MAX);
615			(start, end)
616		};
617
618		let mut records = Vec::new();
619		for state in states {
620			let (start, end) = range(*state);
621			let query = Query::new(partition::VTXO).start(start).end(end);
622
623			for record in lock.query(query).await? {
624				let serde_vtxo = record.to_data::<SerdeVtxo>()?;
625				let current_state = serde_vtxo.current_state()
626					.context("vtxo has no current state")?.clone();
627				debug_assert_eq!(current_state.kind(), *state);
628				records.push(WalletVtxo {
629					vtxo: serde_vtxo.vtxo,
630					state: current_state,
631				});
632			}
633		}
634
635		Ok(records)
636	}
637
638	async fn remove_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
639		match self.inner.write().await.delete(partition::VTXO, &id.to_bytes()).await? {
640			Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?.vtxo)),
641			None => Ok(None),
642		}
643	}
644
645	async fn has_spent_vtxo(&self, id: VtxoId) -> anyhow::Result<bool> {
646		match self.get_wallet_vtxo(id).await? {
647			Some(vtxo) => Ok(vtxo.state.kind() == VtxoStateKind::Spent),
648			None => Ok(false),
649		}
650	}
651
652	async fn update_vtxo_state_checked(
653		&self,
654		vtxo_id: VtxoId,
655		new_state: VtxoState,
656		allowed_old_states: &[VtxoStateKind],
657	) -> anyhow::Result<WalletVtxo> {
658		let mut lock = self.inner.write().await;
659		update_vtxo_state_checked(&mut *lock, vtxo_id, new_state, allowed_old_states).await
660	}
661
662	async fn store_vtxo_key(&self, index: u32, public_key: PublicKey) -> anyhow::Result<()> {
663		let vtxo_key = SerdeVtxoKey { index, public_key };
664		let record = Record::from_data(
665			partition::PUBLIC_KEY,
666			&public_key.serialize()[..],
667			Some(sort::SortKey::u64_desc(index as u64)),
668			&vtxo_key,
669		)?;
670		self.inner.write().await.put(record).await
671	}
672
673	async fn get_last_vtxo_key_index(&self) -> anyhow::Result<Option<u32>> {
674		// Query with reverse order and limit 1 to get the highest index
675		let query = Query::new(partition::PUBLIC_KEY).limit(1);
676		let records = self.inner.read().await.query(query).await?;
677
678		match records.into_iter().next() {
679			Some(record) => {
680				let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
681				Ok(Some(vtxo_key.index))
682			}
683			None => Ok(None),
684		}
685	}
686
687	async fn get_public_key_idx(&self, public_key: &PublicKey) -> anyhow::Result<Option<u32>> {
688		match self.inner.read().await
689			.get(partition::PUBLIC_KEY, &public_key.serialize()[..]).await?
690		{
691			Some(record) => {
692				let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
693				Ok(Some(vtxo_key.index))
694			}
695			None => Ok(None),
696		}
697	}
698
699	async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
700		match self.inner.read().await
701			.get(partition::MAILBOX_CHECKPOINT, &[]).await?
702		{
703			Some(record) => Ok(record.to_data::<u64>()?),
704			None => Ok(0),
705		}
706	}
707
708	async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
709		let mut lock = self.inner.write().await;
710		let record = Record::from_data(
711			partition::MAILBOX_CHECKPOINT,
712			&[],
713			None,
714			&checkpoint,
715		)?;
716		lock.put(record).await?;
717		Ok(())
718	}
719
720	async fn store_new_pending_lightning_send(
721		&self,
722		invoice: &Invoice,
723		amount: Amount,
724		fee: Amount,
725		vtxo_ids: &[VtxoId],
726		movement_id: MovementId,
727	) -> anyhow::Result<LightningSend> {
728		let mut lock = self.inner.write().await;
729		let mut htlc_vtxos = Vec::with_capacity(vtxo_ids.len());
730		for vtxo_id in vtxo_ids {
731			let vtxo = get_vtxo(&*lock, *vtxo_id).await?
732				.context("vtxo not found")?;
733			htlc_vtxos.push(vtxo.to_wallet_vtxo()?);
734		}
735
736		let lightning_send = LightningSend {
737			invoice: invoice.clone(),
738			amount,
739			fee,
740			htlc_vtxos,
741			preimage: None,
742			movement_id,
743			finished_at: None,
744		};
745
746		let record = Record::from_data(
747			partition::LIGHTNING_SEND,
748			&invoice.payment_hash().to_byte_array(),
749			None,
750			&lightning_send,
751		)?;
752
753		lock.put(record).await?;
754
755		Ok(lightning_send)
756	}
757
758	async fn get_all_pending_lightning_send(&self) -> anyhow::Result<Vec<LightningSend>> {
759		let records = self.inner.read().await
760			.query(Query::new(partition::LIGHTNING_SEND)).await?;
761		records
762			.into_iter()
763			.filter_map(|r| {
764				let send: LightningSend = r.to_data().ok()?;
765				if send.finished_at.is_none() {
766					Some(Ok(send))
767				} else {
768					None
769				}
770			})
771			.collect()
772	}
773
774	async fn finish_lightning_send(
775		&self,
776		payment_hash: PaymentHash,
777		preimage: Option<Preimage>,
778	) -> anyhow::Result<()> {
779		let mut lock = self.inner.write().await;
780
781		let pk = payment_hash.to_byte_array();
782		let record = lock
783			.get(partition::LIGHTNING_SEND, &pk).await?.context("lightning send not found")?;
784		let mut lightning_send: LightningSend = record.to_data()?;
785
786		lightning_send.preimage = preimage;
787		lightning_send.finished_at = Some(Local::now());
788
789		let updated_record = Record::from_data(
790			partition::LIGHTNING_SEND,
791			&pk,
792			None,
793			&lightning_send,
794		)?;
795		lock.put(updated_record).await?;
796
797		Ok(())
798	}
799
800	async fn remove_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
801		self.inner.write().await.delete(partition::LIGHTNING_SEND, &payment_hash.to_byte_array()).await?;
802		Ok(())
803	}
804
805	async fn get_lightning_send(
806		&self,
807		payment_hash: PaymentHash,
808	) -> anyhow::Result<Option<LightningSend>> {
809		match self.inner.read().await
810			.get(partition::LIGHTNING_SEND, &payment_hash.to_byte_array()).await?
811		{
812			Some(record) => Ok(Some(record.to_data()?)),
813			None => Ok(None),
814		}
815	}
816
817	async fn store_lightning_receive(
818		&self,
819		payment_hash: PaymentHash,
820		preimage: Preimage,
821		invoice: &Bolt11Invoice,
822		htlc_recv_cltv_delta: BlockDelta,
823	) -> anyhow::Result<()> {
824		let lightning_receive = LightningReceive {
825			payment_hash,
826			payment_preimage: preimage,
827			invoice: invoice.clone(),
828			htlc_recv_cltv_delta,
829			htlc_vtxos: vec![],
830			movement_id: None,
831			finished_at: None,
832			preimage_revealed_at: None,
833		};
834
835		let record = Record::from_data(
836			partition::LIGHTNING_RECEIVE,
837			&payment_hash.to_byte_array(),
838			None,
839			&lightning_receive,
840		)?;
841		self.inner.write().await.put(record).await
842	}
843
844	async fn get_all_pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
845		let records = self.inner.read().await
846			.query(Query::new(partition::LIGHTNING_RECEIVE))
847			.await?;
848		records
849			.into_iter()
850			.filter_map(|r| {
851				let receive: LightningReceive = r.to_data().ok()?;
852				if receive.finished_at.is_none() {
853					Some(Ok(receive))
854				} else {
855					None
856				}
857			})
858			.collect()
859	}
860
861	async fn set_preimage_revealed(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
862		let mut lock = self.inner.write().await;
863
864		let pk = payment_hash.to_byte_array();
865		let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
866			.context("lightning receive not found")?;
867		let mut lightning_receive: LightningReceive = record.to_data()?;
868
869		lightning_receive.preimage_revealed_at = Some(Local::now());
870
871		let updated_record = Record::from_data(
872			partition::LIGHTNING_RECEIVE,
873			&pk,
874			None,
875			&lightning_receive,
876		)?;
877		lock.put(updated_record).await
878	}
879
880	async fn update_lightning_receive(
881		&self,
882		payment_hash: PaymentHash,
883		vtxo_ids: &[VtxoId],
884		movement_id: MovementId,
885	) -> anyhow::Result<()> {
886		let mut lock = self.inner.write().await;
887		let pk = payment_hash.to_byte_array();
888		let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
889			.context("lightning receive not found")?;
890		let mut lightning_receive: LightningReceive = record.to_data()?;
891
892		let mut htlc_vtxos = Vec::with_capacity(vtxo_ids.len());
893		for vtxo_id in vtxo_ids {
894			let vtxo = get_vtxo(&*lock, *vtxo_id).await?
895				.context("vtxo not found")?;
896			htlc_vtxos.push(vtxo.to_wallet_vtxo()?);
897		}
898
899		lightning_receive.htlc_vtxos = htlc_vtxos;
900		lightning_receive.movement_id = Some(movement_id);
901
902		let updated_record = Record::from_data(
903			partition::LIGHTNING_RECEIVE,
904			&pk,
905			None,
906			&lightning_receive,
907		)?;
908		lock.put(updated_record).await
909	}
910
911	async fn fetch_lightning_receive_by_payment_hash(
912		&self,
913		payment_hash: PaymentHash,
914	) -> anyhow::Result<Option<LightningReceive>> {
915		match self.inner.read().await
916			.get(partition::LIGHTNING_RECEIVE, &payment_hash.to_byte_array()).await?
917		{
918			Some(record) => Ok(Some(record.to_data()?)),
919			None => Ok(None),
920		}
921	}
922
923	async fn finish_pending_lightning_receive(
924		&self,
925		payment_hash: PaymentHash,
926	) -> anyhow::Result<()> {
927		let mut lock = self.inner.write().await;
928		let pk = payment_hash.to_byte_array();
929		let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
930			.context("lightning receive not found")?;
931		let mut lightning_receive: LightningReceive = record.to_data()?;
932
933		lightning_receive.finished_at = Some(Local::now());
934
935		let updated_record = Record::from_data(
936			partition::LIGHTNING_RECEIVE,
937			&pk,
938			None,
939			&lightning_receive,
940		)?;
941		lock.put(updated_record).await
942	}
943
944	async fn store_pending_offboard(&self, pending: &PendingOffboard) -> anyhow::Result<()> {
945		let record = Record::from_data(
946			partition::PENDING_OFFBOARD,
947			&pending.movement_id.to_bytes(),
948			None,
949			pending,
950		)?;
951		self.inner.write().await.put(record).await
952	}
953
954	async fn get_pending_offboards(&self) -> anyhow::Result<Vec<PendingOffboard>> {
955		let records = self.inner.read().await
956			.query(Query::new(partition::PENDING_OFFBOARD)).await?;
957		records.into_iter().map(|r| r.to_data()).collect()
958	}
959
960	async fn remove_pending_offboard(&self, movement_id: MovementId) -> anyhow::Result<()> {
961		self.inner.write().await
962			.delete(partition::PENDING_OFFBOARD, &movement_id.to_bytes()).await?;
963		Ok(())
964	}
965
966	async fn store_exit_vtxo_entry(&self, exit: &StoredExit) -> anyhow::Result<()> {
967		let record = Record::from_data(
968			partition::EXIT_VTXO,
969			&exit.vtxo_id.to_bytes(),
970			None,
971			exit,
972		)?;
973		self.inner.write().await.put(record).await
974	}
975
976	async fn remove_exit_vtxo_entry(&self, id: &VtxoId) -> anyhow::Result<()> {
977		self.inner.write().await.delete(partition::EXIT_VTXO, &id.to_bytes()).await?;
978		Ok(())
979	}
980
981	async fn get_exit_vtxo_entries(&self) -> anyhow::Result<Vec<StoredExit>> {
982		let records = self.inner.read().await
983			.query(Query::new(partition::EXIT_VTXO)).await?;
984		records.into_iter().map(|r| r.to_data()).collect()
985	}
986
987	async fn store_exit_child_tx(
988		&self,
989		exit_txid: Txid,
990		child_tx: &Transaction,
991		origin: ExitTxOrigin,
992	) -> anyhow::Result<()> {
993		let exit_child = SerdeExitChildTx {
994			child_tx: child_tx.clone(),
995			origin,
996		};
997		let record = Record::from_data(
998			partition::EXIT_CHILD_TX,
999			&exit_txid.to_byte_array(),
1000			None,
1001			&exit_child,
1002		)?;
1003		self.inner.write().await.put(record).await
1004	}
1005
1006	async fn get_exit_child_tx(
1007		&self,
1008		exit_txid: Txid,
1009	) -> anyhow::Result<Option<(Transaction, ExitTxOrigin)>> {
1010		match self.inner.read().await
1011			.get(partition::EXIT_CHILD_TX, &exit_txid.to_byte_array()).await?
1012		{
1013			Some(record) => {
1014				let exit_child = record.to_data::<SerdeExitChildTx>()?;
1015				Ok(Some((exit_child.child_tx, exit_child.origin)))
1016			}
1017			None => Ok(None),
1018		}
1019	}
1020}
1021
1022#[cfg(test)]
1023mod tests {
1024	use super::*;
1025
1026	#[test]
1027	fn storage_query_builder() {
1028		let query = Query::new(0)
1029			.limit(10)
1030			.start(SortKey::u32_asc(100))
1031			.end(SortKey::u32_asc(200));
1032
1033		assert_eq!(query.partition, 0);
1034		assert_eq!(query.limit, Some(10));
1035		assert_eq!(query.start, Some(SortKey::u32_asc(100)));
1036		assert_eq!(query.end, Some(SortKey::u32_asc(200)));
1037	}
1038}
1039
1040/// This module provides comprehensive tests for all four methods of the
1041/// `StorageAdaptor` trait. Use these functions to validate custom implementations.
1042///
1043/// # Example
1044///
1045/// ```rust
1046/// use bark::persist::adaptor::memory::test_suite;
1047///
1048/// #[tokio::test]
1049/// async fn test_my_custom_adaptor() {
1050///     let storage = MyCustomStorageAdaptor::new();
1051///     test_suite::run_all(&storage).await;
1052/// }
1053/// ```
1054#[cfg(test)]
1055pub mod test_suite {
1056	use super::*;
1057	use super::partition::LAST_IDS;
1058	use super::sort::SortKey;
1059
1060	async fn clear_partitions<S: StorageAdaptor>(storage: &mut S, partitions: &[u8]) -> anyhow::Result<()> {
1061		for partition in partitions {
1062			let records = storage.query(Query::new(*partition)).await?;
1063			for record in records {
1064				storage.delete(record.partition, &record.pk).await?;
1065			}
1066		}
1067		Ok(())
1068	}
1069
1070	/// Runs all test suites against the given storage adaptor.
1071	pub async fn run_all<S: StorageAdaptor>(storage: &mut S) {
1072		// put tests
1073		test_put_insert(storage).await;
1074		test_put_upsert(storage).await;
1075		test_put_with_sort_key(storage).await;
1076		test_put_without_sort_key(storage).await;
1077		test_put_multiple_partitions(storage).await;
1078
1079		// get tests
1080		test_get_existing(storage).await;
1081		test_get_after_update(storage).await;
1082
1083		// delete tests
1084		test_delete_existing(storage).await;
1085		test_delete_nonexistent(storage).await;
1086		test_delete_idempotent(storage).await;
1087
1088		// query tests
1089		test_query_empty_partition(storage).await;
1090		test_query_returns_partition_records(storage).await;
1091		test_query_ordering(storage).await;
1092		test_query_with_limit(storage).await;
1093		test_query_null_sort_key_ordering(storage).await;
1094		test_query_partition_isolation(storage).await;
1095		test_query_range(storage).await;
1096
1097		// incremental_id tests
1098		test_incremental_id_starts_at_one(storage).await;
1099		test_incremental_id_increments(storage).await;
1100		test_incremental_id_partition_isolation(storage).await;
1101		test_incremental_id_persists_across_operations(storage).await;
1102	}
1103
1104	/// Tests that put inserts a new record.
1105	pub async fn test_put_insert<S: StorageAdaptor>(storage: &mut S) {
1106		let record = Record {
1107			pk: "put_insert_1".into(),
1108			partition: 0,
1109			sort_key: None,
1110			data: b"test data".to_vec(),
1111		};
1112
1113		storage.put(record).await.expect("put should succeed");
1114
1115		let retrieved = storage
1116			.get(0, b"put_insert_1")
1117			.await
1118			.expect("get should succeed")
1119			.expect("record should exist");
1120
1121		assert_eq!(retrieved.pk, b"put_insert_1");
1122		assert_eq!(retrieved.partition, 0);
1123		assert_eq!(retrieved.data, b"test data".to_vec());
1124	}
1125
1126	/// Tests that put updates an existing record (upsert behavior).
1127	pub async fn test_put_upsert<S: StorageAdaptor>(storage: &mut S) {
1128		let record1 = Record {
1129			pk: b"put_upsert_1".into(),
1130			partition: 0,
1131			sort_key: None,
1132			data: b"original".to_vec(),
1133		};
1134		storage.put(record1).await.expect("first put should succeed");
1135
1136		let record2 = Record {
1137			pk: "put_upsert_1".into(),
1138			partition: 0,
1139			sort_key: None,
1140			data: b"updated".to_vec(),
1141		};
1142		storage
1143			.put(record2)
1144			.await
1145			.expect("second put should succeed");
1146
1147		let retrieved = storage
1148			.get(0, b"put_upsert_1")
1149			.await
1150			.expect("get should succeed")
1151			.expect("record should exist");
1152
1153		assert_eq!(retrieved.data, b"updated".to_vec(), "data should be updated");
1154	}
1155
1156	/// Tests that put correctly stores the sort key.
1157	pub async fn test_put_with_sort_key<S: StorageAdaptor>(storage: &mut S) {
1158		let sort_key = SortKey::u32_asc(42);
1159		let record = Record {
1160			pk: b"put_sort_key_1".into(),
1161			partition: 0,
1162			sort_key: Some(sort_key.clone()),
1163			data: b"with sort key".to_vec(),
1164		};
1165
1166		storage.put(record).await.expect("put should succeed");
1167
1168		let retrieved = storage
1169			.get(0, b"put_sort_key_1")
1170			.await
1171			.expect("get should succeed")
1172			.expect("record should exist");
1173
1174		assert_eq!(retrieved.sort_key, Some(sort_key));
1175	}
1176
1177	/// Tests that put correctly handles records without sort keys.
1178	pub async fn test_put_without_sort_key<S: StorageAdaptor>(storage: &mut S) {
1179		let record = Record {
1180			pk: b"put_no_sort_key_1".into(),
1181			partition: 0,
1182			sort_key: None,
1183			data: b"no sort key".to_vec(),
1184		};
1185
1186		storage.put(record).await.expect("put should succeed");
1187
1188		let retrieved = storage
1189			.get(0, b"put_no_sort_key_1")
1190			.await
1191			.expect("get should succeed")
1192			.expect("record should exist");
1193
1194		assert!(retrieved.sort_key.is_none());
1195	}
1196
1197	/// Tests that put correctly handles multiple partitions.
1198	pub async fn test_put_multiple_partitions<S: StorageAdaptor>(storage: &mut S) {
1199		let record_a = Record {
1200			pk: "put_multi_a".into(),
1201			partition: 0,
1202			sort_key: None,
1203			data: b"in partition a".to_vec(),
1204		};
1205		let record_b = Record {
1206			pk: "put_multi_b".into(),
1207			partition: 1,
1208			sort_key: None,
1209			data: b"in partition b".to_vec(),
1210		};
1211
1212		storage.put(record_a).await.expect("put a should succeed");
1213		storage.put(record_b).await.expect("put b should succeed");
1214
1215		let retrieved_a = storage
1216			.get(0, b"put_multi_a")
1217			.await
1218			.expect("get should succeed")
1219			.expect("record a should exist");
1220		let retrieved_b = storage
1221			.get(1, b"put_multi_b")
1222			.await
1223			.expect("get should succeed")
1224			.expect("record b should exist");
1225
1226		assert_eq!(retrieved_a.partition, 0);
1227		assert_eq!(retrieved_b.partition, 1);
1228	}
1229
1230	/// Tests that get returns an existing record.
1231	pub async fn test_get_existing<S: StorageAdaptor>(storage: &mut S) {
1232		let record = Record {
1233			pk: b"get_existing_1".into(),
1234			partition: 0,
1235			sort_key: Some(SortKey::u32_asc(100)),
1236			data: b"test".to_vec(),
1237		};
1238		storage.put(record).await.expect("put should succeed");
1239
1240		let retrieved = storage
1241			.get(0, b"get_existing_1")
1242			.await
1243			.expect("get should succeed");
1244
1245		assert!(retrieved.is_some());
1246		let retrieved = retrieved.unwrap();
1247		assert_eq!(retrieved.pk, b"get_existing_1");
1248		assert_eq!(retrieved.partition, 0);
1249		assert_eq!(retrieved.data, b"test".to_vec());
1250
1251		// superset of the key
1252		assert!(storage.get(0, b"get_existing_1_").await.unwrap().is_none());
1253		// subset of the key
1254		assert!(storage.get(0, b"get_existing_").await.unwrap().is_none());
1255
1256		// non-existent key
1257		assert!(storage.get(0, b"get_nonexistent_does_not_exist").await.unwrap().is_none());
1258	}
1259
1260	/// Tests that get returns updated data after put.
1261	pub async fn test_get_after_update<S: StorageAdaptor>(storage: &mut S) {
1262		let record1 = Record {
1263			pk: b"get_after_update_1".into(),
1264			partition: 0,
1265			sort_key: None,
1266			data: b"version1".to_vec(),
1267		};
1268		storage.put(record1).await.expect("put should succeed");
1269
1270		let record2 = Record {
1271			pk: b"get_after_update_1".into(),
1272			partition: 0,
1273			sort_key: None,
1274			data: b"version2".to_vec(),
1275		};
1276		storage.put(record2).await.expect("put should succeed");
1277
1278		let retrieved = storage
1279			.get(0, b"get_after_update_1")
1280			.await
1281			.expect("get should succeed")
1282			.expect("record should exist");
1283
1284		assert_eq!(retrieved.data, b"version2".to_vec());
1285	}
1286
1287	/// Tests that delete removes an existing record and returns it.
1288	pub async fn test_delete_existing<S: StorageAdaptor>(storage: &mut S) {
1289		let record = Record {
1290			pk: b"delete_existing_1".into(),
1291			partition: 0,
1292			sort_key: None,
1293			data: b"to delete".to_vec(),
1294		};
1295		storage.put(record.clone()).await.expect("put should succeed");
1296
1297		let deleted_record = storage
1298			.delete(0, b"delete_existing_1")
1299			.await
1300			.expect("delete should succeed");
1301
1302		assert_eq!(deleted_record, Some(record));
1303
1304		let retrieved = storage
1305			.get(0, b"delete_existing_1")
1306			.await
1307			.expect("get should succeed");
1308		assert!(retrieved.is_none(), "record should no longer exist");
1309	}
1310
1311	/// Tests that delete returns None for non-existent records.
1312	pub async fn test_delete_nonexistent<S: StorageAdaptor>(storage: &mut S) {
1313		let deleted_record = storage
1314			.delete(0, b"delete_nonexistent_does_not_exist")
1315			.await
1316			.expect("delete should succeed");
1317
1318		assert!(
1319			deleted_record.is_none(),
1320			"delete should return None for non-existent record"
1321		);
1322	}
1323
1324	/// Tests that delete is idempotent (second delete returns None).
1325	pub async fn test_delete_idempotent<S: StorageAdaptor>(storage: &mut S) {
1326		let record = Record {
1327			pk: b"delete_idempotent_1".into(),
1328			partition: 0,
1329			sort_key: None,
1330			data: b"delete twice".to_vec(),
1331		};
1332		storage.put(record.clone()).await.expect("put should succeed");
1333
1334		let first_delete = storage
1335			.delete(0, b"delete_idempotent_1")
1336			.await
1337			.expect("first delete should succeed");
1338		let second_delete = storage
1339			.delete(0, b"delete_idempotent_1")
1340			.await
1341			.expect("second delete should succeed");
1342
1343		assert_eq!(first_delete, Some(record), "first delete should return the record");
1344		assert_eq!(second_delete, None, "second delete should return None");
1345	}
1346
1347	/// Tests that query returns empty results for empty partition.
1348	pub async fn test_query_empty_partition<S: StorageAdaptor>(storage: &mut S) {
1349		clear_partitions(storage, &[0]).await.unwrap();
1350		let results = storage
1351			.query(Query::new(0))
1352			.await
1353			.expect("query should succeed");
1354
1355		assert!(results.is_empty());
1356	}
1357
1358	/// Tests that query returns all records in a partition.
1359	pub async fn test_query_returns_partition_records<S: StorageAdaptor>(storage: &mut S) {
1360		clear_partitions(storage, &[0]).await.unwrap();
1361		for i in 0..3 {
1362			let record = Record {
1363				pk: format!("query_partition_{}", i).into(),
1364				partition: 0,
1365				sort_key: Some(SortKey::u32_asc(i)),
1366				data: format!("record_{}", i).as_bytes().to_vec(),
1367			};
1368			storage.put(record).await.expect("put should succeed");
1369		}
1370
1371		let results = storage
1372			.query(Query::new(0))
1373			.await
1374			.expect("query should succeed");
1375
1376		assert_eq!(results.len(), 3);
1377	}
1378
1379	/// Tests that query returns records in ascending sort key order by default.
1380	pub async fn test_query_ordering<S: StorageAdaptor>(storage: &mut S) {
1381		clear_partitions(storage, &[0]).await.unwrap();
1382		// Insert in non-sequential order
1383		for i in [5, 2, 8, 1, 9] {
1384			let record = Record {
1385				pk: format!("query_asc_{}", i).into(),
1386				partition: 0,
1387				sort_key: Some(SortKey::u32_asc(i)),
1388				data: format!("record_{}", i).as_bytes().to_vec(),
1389			};
1390			storage.put(record).await.expect("put should succeed");
1391		}
1392
1393		let results = storage
1394			.query(Query::new(0))
1395			.await
1396			.expect("query should succeed");
1397
1398		let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1399		assert_eq!(
1400			values,
1401			vec![b"record_1".to_vec(), b"record_2".to_vec(), b"record_5".to_vec(), b"record_8".to_vec(), b"record_9".to_vec()],
1402			"should be in ascending order"
1403		);
1404	}
1405
1406	/// Tests that query with limit returns at most N records.
1407	pub async fn test_query_with_limit<S: StorageAdaptor>(storage: &mut S) {
1408		clear_partitions(storage, &[0]).await.unwrap();
1409		for i in 0..10 {
1410			let record = Record {
1411				pk: format!("query_limit_{}", i).into(),
1412				partition: 0,
1413				sort_key: Some(SortKey::u32_asc(i)),
1414				data: format!("record_{}", i).as_bytes().to_vec(),
1415			};
1416			storage.put(record).await.expect("put should succeed");
1417		}
1418
1419		let results = storage
1420			.query(Query::new(0).limit(3))
1421			.await
1422			.expect("query should succeed");
1423
1424		assert_eq!(results.len(), 3);
1425		let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1426		assert_eq!(
1427			values,
1428			vec![b"record_0".to_vec(), b"record_1".to_vec(), b"record_2".to_vec()],
1429			"should return first 3 records"
1430		);
1431	}
1432
1433	/// Tests that records without sort keys are ordered last (or first when reversed).
1434	pub async fn test_query_null_sort_key_ordering<S: StorageAdaptor>(storage: &mut S) {
1435		clear_partitions(storage, &[0]).await.unwrap();
1436		// Records with sort keys
1437		let with_key_1 = Record {
1438			pk: "query_null_with_1".into(),
1439			partition: 0,
1440			sort_key: Some(SortKey::u32_asc(1)),
1441			data: b"with_key_1".to_vec(),
1442		};
1443		let with_key_2 = Record {
1444			pk: "query_null_with_2".into(),
1445			partition: 0,
1446			sort_key: Some(SortKey::u32_asc(2)),
1447			data: b"with_key_2".to_vec(),
1448		};
1449
1450		// Record without sort key
1451		let without_key = Record {
1452			pk: "query_null_without".into(),
1453			partition: 0,
1454			sort_key: None,
1455			data: b"no_key".to_vec(),
1456		};
1457
1458		storage.put(with_key_1).await.expect("put should succeed");
1459		storage.put(without_key).await.expect("put should succeed");
1460		storage.put(with_key_2).await.expect("put should succeed");
1461
1462		// Ascending: nulls last
1463		let results_asc = storage
1464			.query(Query::new(0))
1465			.await
1466			.expect("query should succeed");
1467
1468		assert_eq!(results_asc.len(), 3);
1469		assert_eq!(results_asc[0].data, b"with_key_1".to_vec());
1470		assert_eq!(results_asc[1].data, b"with_key_2".to_vec());
1471		assert_eq!(results_asc[2].data, b"no_key".to_vec(), "null sort key should be last");
1472	}
1473
1474	/// Tests that query only returns records from the specified partition.
1475	pub async fn test_query_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1476		clear_partitions(storage, &[0, 1]).await.unwrap();
1477		// Records in partition A
1478		for i in 0..3 {
1479			let record = Record {
1480				pk: format!("query_iso_a_{}", i).into(),
1481				partition: 0,
1482				sort_key: Some(SortKey::u32_asc(i)),
1483				data: format!("record_{}", i).as_bytes().to_vec(),
1484			};
1485			storage.put(record).await.expect("put should succeed");
1486		}
1487
1488		// Records in partition B
1489		for i in 0..5 {
1490			let record = Record {
1491				pk: format!("query_iso_b_{}", i).into(),
1492				partition: 1,
1493				sort_key: Some(SortKey::u32_asc(i)),
1494				data: format!("record_{}", i + 100).as_bytes().to_vec(),
1495			};
1496			storage.put(record).await.expect("put should succeed");
1497		}
1498
1499		let results_a = storage
1500			.query(Query::new(0))
1501			.await
1502			.expect("query should succeed");
1503
1504		let results_b = storage
1505			.query(Query::new(1))
1506			.await
1507			.expect("query should succeed");
1508
1509		assert_eq!(results_a.len(), 3, "partition A should have 3 records");
1510		assert_eq!(results_b.len(), 5, "partition B should have 5 records");
1511
1512		// Verify all results_a are from partition A
1513		assert!(results_a
1514			.iter()
1515			.all(|r| r.partition == 0));
1516
1517		// Verify all results_b are from partition B
1518		assert!(results_b
1519			.iter()
1520			.all(|r| r.partition == 1));
1521	}
1522
1523	/// Tests that query with start and end keys returns only records within the range.
1524	pub async fn test_query_range<S: StorageAdaptor>(storage: &mut S) {
1525		clear_partitions(storage, &[0]).await.unwrap();
1526
1527		// Insert records with sort keys 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
1528		for i in 1..=10u32 {
1529			let record = Record {
1530				pk: format!("query_range_{}", i).into(),
1531				partition: 0,
1532				sort_key: Some(SortKey::u32_asc(i)),
1533				data: format!("record_{}", i).as_bytes().to_vec(),
1534			};
1535			storage.put(record).await.expect("put should succeed");
1536		}
1537
1538		// Query with start key only (>= 5)
1539		let results_start = storage
1540			.query(Query::new(0).start(SortKey::u32_asc(5)))
1541			.await
1542			.expect("query should succeed");
1543
1544		assert_eq!(results_start.len(), 6, "should return records 5-10");
1545		let values: Vec<_> = results_start.iter().map(|r| r.data.clone()).collect();
1546		assert_eq!(
1547			values,
1548			vec![
1549				b"record_5".to_vec(),
1550				b"record_6".to_vec(),
1551				b"record_7".to_vec(),
1552				b"record_8".to_vec(),
1553				b"record_9".to_vec(),
1554				b"record_10".to_vec(),
1555			],
1556			"should return records from 5 onwards"
1557		);
1558
1559		// Query with end key only (<= 3)
1560		let results_end = storage
1561			.query(Query::new(0).end(SortKey::u32_asc(3)))
1562			.await
1563			.expect("query should succeed");
1564
1565		assert_eq!(results_end.len(), 3, "should return records 1-3");
1566		let values: Vec<_> = results_end.iter().map(|r| r.data.clone()).collect();
1567		assert_eq!(
1568			values,
1569			vec![
1570				b"record_1".to_vec(),
1571				b"record_2".to_vec(),
1572				b"record_3".to_vec(),
1573			],
1574			"should return records up to 3"
1575		);
1576
1577		// Query with both start and end keys (3 <= x <= 7)
1578		let results_range = storage
1579			.query(Query::new(0).start(SortKey::u32_asc(3)).end(SortKey::u32_asc(7)))
1580			.await
1581			.expect("query should succeed");
1582
1583		assert_eq!(results_range.len(), 5, "should return records 3-7");
1584		let values: Vec<_> = results_range.iter().map(|r| r.data.clone()).collect();
1585		assert_eq!(
1586			values,
1587			vec![
1588				b"record_3".to_vec(),
1589				b"record_4".to_vec(),
1590				b"record_5".to_vec(),
1591				b"record_6".to_vec(),
1592				b"record_7".to_vec(),
1593			],
1594			"should return records in range 3-7"
1595		);
1596
1597		// Query range with limit
1598		let results_range_limit = storage
1599			.query(Query::new(0).start(SortKey::u32_asc(2)).end(SortKey::u32_asc(8)).limit(3))
1600			.await
1601			.expect("query should succeed");
1602
1603		assert_eq!(results_range_limit.len(), 3, "should return only 3 records due to limit");
1604		let values: Vec<_> = results_range_limit.iter().map(|r| r.data.clone()).collect();
1605		assert_eq!(
1606			values,
1607			vec![
1608				b"record_2".to_vec(),
1609				b"record_3".to_vec(),
1610				b"record_4".to_vec(),
1611			],
1612			"should return first 3 records in range"
1613		);
1614
1615		// Query range that matches no records
1616		let results_empty = storage
1617			.query(Query::new(0).start(SortKey::u32_asc(100)).end(SortKey::u32_asc(200)))
1618			.await
1619			.expect("query should succeed");
1620
1621		assert!(results_empty.is_empty(), "should return no records for out-of-range query");
1622	}
1623
1624	/// Tests that incremental_id returns 1 for the first call on a partition.
1625	pub async fn test_incremental_id_starts_at_one<S: StorageAdaptor>(storage: &mut S) {
1626		// Clear the LAST_IDS entry for partition 0
1627		storage.delete(LAST_IDS, b"0").await.unwrap();
1628		let id = storage.incremental_id(0).await
1629			.expect("incremental_id should succeed");
1630
1631		assert_eq!(id, 1, "first id should be 1");
1632	}
1633
1634	/// Tests that incremental_id increments on subsequent calls.
1635	pub async fn test_incremental_id_increments<S: StorageAdaptor>(storage: &mut S) {
1636		clear_partitions(storage, &[0, LAST_IDS]).await.unwrap();
1637
1638		let id1 = storage.incremental_id(0).await
1639			.expect("incremental_id should succeed");
1640		let id2 = storage.incremental_id(0).await
1641			.expect("incremental_id should succeed");
1642		let id3 = storage.incremental_id(0).await
1643			.expect("incremental_id should succeed");
1644
1645		assert_eq!(id1, 1, "first id should be 1");
1646		assert_eq!(id2, 2, "second id should be 2");
1647		assert_eq!(id3, 3, "third id should be 3");
1648	}
1649
1650	/// Tests that incremental_id maintains separate sequences for different partitions.
1651	pub async fn test_incremental_id_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1652		clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1653
1654		// Generate IDs for partition A
1655		let a1 = storage.incremental_id(0).await
1656			.expect("incremental_id should succeed");
1657		let a2 = storage.incremental_id(0).await
1658			.expect("incremental_id should succeed");
1659		let a3 = storage.incremental_id(0).await
1660			.expect("incremental_id should succeed");
1661
1662		// Generate IDs for partition B
1663		let b1 = storage.incremental_id(1).await
1664			.expect("incremental_id should succeed");
1665		let b2 = storage.incremental_id(1).await
1666			.expect("incremental_id should succeed");
1667
1668		// Partition A should have 1, 2, 3
1669		assert_eq!(a1, 1);
1670		assert_eq!(a2, 2);
1671		assert_eq!(a3, 3);
1672
1673		// Partition B should have its own sequence starting at 1
1674		assert_eq!(b1, 1);
1675		assert_eq!(b2, 2);
1676
1677		// Generate more for A - should continue from 3
1678		let a4 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1679		assert_eq!(a4, 4);
1680	}
1681
1682	/// Tests that incremental_id persists its state correctly.
1683	pub async fn test_incremental_id_persists_across_operations<S: StorageAdaptor>(storage: &mut S) {
1684		clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1685
1686		// Generate some IDs
1687		let id1 = storage.incremental_id(0).await
1688			.expect("incremental_id should succeed");
1689		let id2 = storage.incremental_id(0).await
1690			.expect("incremental_id should succeed");
1691		assert_eq!(id1, 1);
1692		assert_eq!(id2, 2);
1693
1694		// Verify the stored value can be retrieved directly
1695		let stored = storage
1696			.get(LAST_IDS, &[0])
1697			.await
1698			.expect("get should succeed")
1699			.expect("id record should exist");
1700		let stored_id: u32 = serde_json::from_slice(&stored.data).expect("should deserialize");
1701		assert_eq!(stored_id, 2, "stored id should be 2");
1702
1703		// Continue generating - should pick up where we left off
1704		let id3 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1705		assert_eq!(id3, 3);
1706	}
1707}