1mod sort;
40
41#[cfg(feature = "filestore")]
42pub mod filestore;
43pub mod memory;
44
45pub use sort::SortKey;
46
47use anyhow::Context;
48use bitcoin::{Amount, Transaction, Txid};
49use bitcoin::secp256k1::PublicKey;
50use bitcoin::hashes::Hash;
51#[cfg(feature = "onchain-bdk")]
52use bdk_core::Merge;
53#[cfg(feature = "onchain-bdk")]
54use bdk_wallet::ChangeSet;
55use chrono::{DateTime, Local};
56use lightning_invoice::Bolt11Invoice;
57use serde::{de::DeserializeOwned, Serialize};
58
59use ark::lightning::{Invoice, PaymentHash, Preimage};
60use ark::{Vtxo, VtxoId};
61use ark::vtxo::Full;
62use bitcoin_ext::BlockDelta;
63
64use crate::exit::ExitTxOrigin;
65use crate::movement::{
66 Movement, MovementId, MovementStatus, MovementSubsystem,
67};
68use crate::persist::BarkPersister;
69use crate::persist::models::{
70 LightningReceive, LightningSend, PendingBoard, PendingOffboard, RoundStateId, SerdeExitChildTx, SerdeRoundState, SerdeVtxo, SerdeVtxoKey, StoredExit, StoredRoundState, Unlocked,
71};
72use crate::round::RoundState;
73use crate::vtxo::{VtxoState, VtxoStateKind};
74use crate::{WalletProperties, WalletVtxo};
75
76pub mod partition {
77 pub const PROPERTIES: u8 = 0;
78 #[allow(unused)]
79 pub const BDK_CHANGESET: u8 = 1;
80 pub const VTXO: u8 = 2;
81 pub const PUBLIC_KEY: u8 = 3;
82 pub const PENDING_BOARD: u8 = 4;
83 pub const ROUND_STATE: u8 = 5;
84 pub const MOVEMENT: u8 = 6;
85 pub const LIGHTNING_SEND: u8 = 7;
86 pub const LIGHTNING_RECEIVE: u8 = 8;
87 pub const EXIT_VTXO: u8 = 9;
88 pub const EXIT_CHILD_TX: u8 = 10;
89 pub const MAILBOX_CHECKPOINT: u8 = 11;
90 pub const PENDING_OFFBOARD: u8 = 12;
91
92 pub const LAST_IDS: u8 = u8::MAX;
93}
94
95#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
97pub struct Record {
98 pub partition: u8,
102
103 pub pk: Vec<u8>,
105
106 pub sort_key: Option<SortKey>,
115
116 pub data: Vec<u8>,
118}
119
120impl Record {
121 fn to_data<T: DeserializeOwned>(&self) -> anyhow::Result<T> {
123 serde_json::from_slice(&self.data).map_err(Into::into)
124 }
125
126 fn from_data<T: Serialize>(
128 partition: u8,
129 pk: &[u8],
130 sort_key: Option<SortKey>,
131 data: &T,
132 ) -> anyhow::Result<Record> {
133 Ok(Record {
134 partition,
135 pk: pk.to_vec(),
136 sort_key,
137 data: serde_json::to_vec(data)?,
138 })
139 }
140}
141
142#[derive(Debug, Clone, Default)]
144pub struct Query {
145 pub partition: u8,
147
148 pub limit: Option<usize>,
150
151 pub start: Option<SortKey>,
153
154 pub end: Option<SortKey>,
156}
157
158impl Query {
159 pub fn new(partition: u8) -> Self {
161 Self {
162 partition,
163 ..Default::default()
164 }
165 }
166
167 pub fn limit(mut self, n: usize) -> Self {
169 self.limit = Some(n);
170 self
171 }
172
173 pub fn start(mut self, start: SortKey) -> Self {
175 self.start = Some(start);
176 self
177 }
178
179 pub fn end(mut self, end: SortKey) -> Self {
181 self.end = Some(end);
182 self
183 }
184}
185
186#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
219#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
220pub trait StorageAdaptor: Send + Sync + 'static {
221 async fn put(&mut self, record: Record) -> anyhow::Result<()>;
223
224 async fn get(&self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
228
229 async fn delete(&mut self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
233
234 async fn query(&self, query: Query) -> anyhow::Result<Vec<Record>>;
238
239 async fn incremental_id(&mut self, partition: u8) -> anyhow::Result<u32> {
241 let last_partition_id = self.get(partition::LAST_IDS, &[partition]).await?
242 .map(|r| r.to_data::<u32>()).unwrap_or(Ok(0))?;
243 let next_partition_id = last_partition_id + 1;
244
245 let record = Record::from_data(
246 partition::LAST_IDS,
247 &[partition],
248 None,
249 &next_partition_id,
250 )?;
251
252 self.put(record).await?;
253 Ok(next_partition_id)
254 }
255}
256
257async fn get_vtxo(adaptor: &dyn StorageAdaptor, id: VtxoId) -> anyhow::Result<Option<SerdeVtxo>> {
258 match adaptor.get(partition::VTXO, &id.to_bytes()).await? {
259 Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?)),
260 None => Ok(None),
261 }
262}
263
264async fn get_check_vtxo_state(
265 adaptor: &dyn StorageAdaptor,
266 vtxo_id: VtxoId,
267 allowed_states: &[VtxoStateKind],
268) -> anyhow::Result<SerdeVtxo> {
269 let vtxo = get_vtxo(adaptor, vtxo_id).await?
270 .context("vtxo not found")?;
271
272 let current_state = vtxo.current_state().context("vtxo has no state")?;
273 if !allowed_states.contains(¤t_state.kind()) {
274 bail!("current state {:?} not in allowed states {:?}",
275 current_state.kind(), allowed_states
276 );
277 }
278
279 Ok(vtxo)
280}
281
282async fn update_vtxo_state_checked(
283 adaptor: &mut dyn StorageAdaptor,
284 vtxo_id: VtxoId,
285 new_state: VtxoState,
286 allowed_old_states: &[VtxoStateKind],
287) -> anyhow::Result<WalletVtxo> {
288 let mut serde_vtxo = get_check_vtxo_state(adaptor, vtxo_id, allowed_old_states).await?;
289
290 let sk = sort::vtxo_sort_key(
291 new_state.kind(), serde_vtxo.vtxo.expiry_height(), serde_vtxo.vtxo.amount()
292 );
293
294 serde_vtxo.states.push(new_state.clone());
295 let updated_record = Record::from_data(
296 partition::VTXO,
297 &vtxo_id.to_bytes(),
298 Some(sk),
299 &serde_vtxo,
300 )?;
301
302 adaptor.put(updated_record).await?;
303
304 Ok(WalletVtxo {
305 vtxo: serde_vtxo.vtxo,
306 state: new_state,
307 })
308}
309
310pub struct StorageAdaptorWrapper<S: StorageAdaptor> {
311 inner: tokio::sync::RwLock<S>,
312}
313
314impl<S: StorageAdaptor> StorageAdaptorWrapper<S> {
315 pub fn new(inner: S) -> Self {
316 Self {
317 inner: tokio::sync::RwLock::new(inner),
318 }
319 }
320}
321
322#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
324#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
325impl <S: StorageAdaptor> BarkPersister for StorageAdaptorWrapper<S> {
326 async fn init_wallet(&self, properties: &WalletProperties) -> anyhow::Result<()> {
327 let record = Record::from_data(
328 partition::PROPERTIES,
329 &[],
331 None,
332 properties,
333 )?;
334 self.inner.write().await.put(record).await
335 }
336
337 async fn read_properties(&self) -> anyhow::Result<Option<WalletProperties>> {
338 match self.inner.read().await.get(partition::PROPERTIES, &[]).await? {
339 Some(record) => Ok(Some(record.to_data()?)),
340 None => Ok(None),
341 }
342 }
343
344 async fn set_server_pubkey(&self, server_pubkey: PublicKey) -> anyhow::Result<()> {
345 let mut properties = match self.read_properties().await? {
346 Some(properties) => properties,
347 None => bail!("wallet not initialized"),
348 };
349
350 properties.server_pubkey = Some(server_pubkey);
351
352 let record = Record::from_data(partition::PROPERTIES, &[], None, &properties)?;
353 self.inner.write().await.put(record).await
354 }
355
356 #[cfg(feature = "onchain-bdk")]
357 async fn initialize_bdk_wallet(&self) -> anyhow::Result<ChangeSet> {
358 match self.inner.read().await.get(partition::BDK_CHANGESET, &[]).await? {
359 Some(record) => record.to_data(),
360 None => Ok(ChangeSet::default()),
361 }
362 }
363
364 #[cfg(feature = "onchain-bdk")]
365 async fn store_bdk_wallet_changeset(&self, changeset: &ChangeSet) -> anyhow::Result<()> {
366 let mut current = self.initialize_bdk_wallet().await?;
367 current.merge(changeset.clone());
368
369 let record = Record::from_data(
370 partition::BDK_CHANGESET,
371 &[],
373 None,
374 ¤t,
375 )?;
376 self.inner.write().await.put(record).await
377 }
378
379 async fn create_new_movement(
380 &self,
381 status: MovementStatus,
382 subsystem: &MovementSubsystem,
383 time: DateTime<Local>,
384 ) -> anyhow::Result<MovementId> {
385 let mut lock = self.inner.write().await;
386
387 let id = MovementId(lock.incremental_id(partition::MOVEMENT).await?);
388 let movement = Movement::new(id, status, subsystem, time);
389
390 let record = Record::from_data(
391 partition::MOVEMENT,
392 &id.to_bytes(),
393 Some(sort::movement_sort_key(&time)),
394 &movement,
395 )?;
396 lock.put(record).await?;
397
398 Ok(id)
399 }
400
401 async fn update_movement(&self, movement: &Movement) -> anyhow::Result<()> {
402 let record = Record::from_data(
403 partition::MOVEMENT,
404 &movement.id.to_bytes(),
405 Some(sort::movement_sort_key(&movement.time.created_at)),
406 movement,
407 )?;
408 self.inner.write().await.put(record).await
409 }
410
411 async fn get_movement_by_id(&self, movement_id: MovementId) -> anyhow::Result<Movement> {
412 self.inner.read().await.get(partition::MOVEMENT, &movement_id.to_bytes())
413 .await?
414 .context("movement not found")?
415 .to_data()
416 }
417
418 async fn get_all_movements(&self) -> anyhow::Result<Vec<Movement>> {
419 let records = self.inner.read().await.query(Query::new(partition::MOVEMENT)).await?;
420 records.into_iter().map(|r| r.to_data()).collect()
421 }
422
423 async fn store_pending_board(
424 &self,
425 vtxo: &Vtxo<Full>,
426 funding_tx: &Transaction,
427 movement_id: MovementId,
428 ) -> anyhow::Result<()> {
429 let pending_board = PendingBoard {
430 vtxos: vec![vtxo.id()],
431 amount: vtxo.amount(),
432 funding_tx: funding_tx.clone(),
433 movement_id,
434 };
435
436 let record = Record::from_data(
437 partition::PENDING_BOARD,
438 &vtxo.id().to_bytes(),
439 None,
440 &pending_board,
441 )?;
442
443 self.inner.write().await.put(record).await
444 }
445
446 async fn remove_pending_board(&self, vtxo_id: &VtxoId) -> anyhow::Result<()> {
447 self.inner.write().await.delete(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await?;
448 Ok(())
449 }
450
451 async fn get_all_pending_board_ids(&self) -> anyhow::Result<Vec<VtxoId>> {
452 let records = self
453 .inner.read().await.query(Query::new(partition::PENDING_BOARD))
454 .await?;
455 records
456 .into_iter()
457 .map(|r| {
458 let board: PendingBoard = r.to_data()?;
459 Ok(board.vtxos.into_iter().next().context("empty vtxos")?)
460 })
461 .collect()
462 }
463
464 async fn get_pending_board_by_vtxo_id(
465 &self,
466 vtxo_id: VtxoId,
467 ) -> anyhow::Result<Option<PendingBoard>> {
468 match self.inner.read().await.get(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await? {
469 Some(record) => Ok(Some(record.to_data()?)),
470 None => Ok(None),
471 }
472 }
473
474 async fn store_round_state_lock_vtxos(
475 &self,
476 round_state: &RoundState,
477 ) -> anyhow::Result<RoundStateId> {
478 let mut lock = self.inner.write().await;
479
480 let id = RoundStateId(lock.incremental_id(partition::ROUND_STATE).await?);
481
482 let allowed_states = &[VtxoStateKind::Spendable];
483
484 for vtxo in round_state.participation().inputs.iter() {
486 get_check_vtxo_state(&mut *lock, vtxo.id(), allowed_states).await?;
487 }
488
489 for vtxo in round_state.participation().inputs.iter() {
490 update_vtxo_state_checked(
491 &mut *lock,
492 vtxo.id(),
493 VtxoState::Locked { movement_id: round_state.movement_id },
494 allowed_states,
495 ).await?;
496 }
497
498 let serde_state = SerdeRoundState::from(round_state);
499 let record = Record::from_data(
500 partition::ROUND_STATE,
501 &id.to_bytes(),
502 Some(sort::SortKey::u32_asc(id.0)),
503 &serde_state,
504 )?;
505 lock.put(record).await?;
506
507 Ok(id)
508 }
509
510 async fn update_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
511 let serde_state = SerdeRoundState::from(round_state.state());
512 let record = Record::from_data(
513 partition::ROUND_STATE,
514 &round_state.id().to_bytes(),
515 Some(sort::SortKey::u32_asc(round_state.id().0)),
516 &serde_state,
517 )?;
518 self.inner.write().await.put(record).await
519 }
520
521 async fn remove_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
522 self.inner.write().await
523 .delete(partition::ROUND_STATE, &round_state.id().to_bytes()).await?;
524 Ok(())
525 }
526
527 async fn get_round_state_by_id(&self, _id: RoundStateId) -> anyhow::Result<Option<StoredRoundState<Unlocked>>> {
528 let record = self.inner.read().await
529 .get(partition::ROUND_STATE, &_id.to_bytes()).await?;
530 match record {
531 Some(r) => {
532 let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
533 let id = RoundStateId(u32::from_be_bytes(pk_slice));
534 let state = r.to_data::<SerdeRoundState>()?.into();
535 Ok(Some(StoredRoundState::new(id, state)))
536 },
537 None => Ok(None),
538 }
539 }
540
541 async fn get_pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
542 let records = self.inner.read().await
543 .query(Query::new(partition::ROUND_STATE)).await?;
544 records.into_iter()
545 .map(|r| {
546 let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
547 Ok(RoundStateId(u32::from_be_bytes(pk_slice)))
548 })
549 .collect()
550 }
551
552 async fn store_vtxos(&self, vtxos: &[(&Vtxo<Full>, &VtxoState)]) -> anyhow::Result<()> {
553 let mut lock = self.inner.write().await;
554
555 for (vtxo, state) in vtxos {
556 let serde_vtxo = SerdeVtxo {
557 vtxo: (*vtxo).clone(),
558 states: vec![(*state).clone()],
559 };
560
561 let sk = sort::vtxo_sort_key(
562 state.kind(), vtxo.expiry_height(), vtxo.amount(),
563 );
564 let record = Record::from_data(
565 partition::VTXO,
566 &vtxo.id().to_bytes(),
567 Some(sk),
568 &serde_vtxo,
569 )?;
570 lock.put(record).await?;
571 }
572 Ok(())
573 }
574
575 async fn get_wallet_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<WalletVtxo>> {
576 let lock = self.inner.read().await;
577 match get_vtxo(&*lock, id).await? {
578 Some(vtxo) => Ok(Some(WalletVtxo {
579 state: vtxo.current_state().context("vtxo has no state")?.clone(),
580 vtxo: vtxo.vtxo,
581 })),
582 None => Ok(None),
583 }
584 }
585
586 async fn get_all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
587 let records = self.inner.read().await
588 .query(Query::new(partition::VTXO)).await?;
589
590 records
591 .into_iter()
592 .map(|r| {
593 let serde_vtxo = r.to_data::<SerdeVtxo>()?;
594 let state = serde_vtxo
595 .current_state()
596 .cloned()
597 .context("vtxo has no state")?;
598 Ok(WalletVtxo {
599 vtxo: serde_vtxo.vtxo,
600 state,
601 })
602 })
603 .collect()
604 }
605
606 async fn get_vtxos_by_state(
607 &self,
608 states: &[VtxoStateKind],
609 ) -> anyhow::Result<Vec<WalletVtxo>> {
610 let lock = self.inner.read().await;
611
612 let range = |state: VtxoStateKind| {
613 let start = sort::vtxo_sort_key(state, u32::MIN, Amount::ZERO);
614 let end = sort::vtxo_sort_key(state, u32::MAX, Amount::MAX);
615 (start, end)
616 };
617
618 let mut records = Vec::new();
619 for state in states {
620 let (start, end) = range(*state);
621 let query = Query::new(partition::VTXO).start(start).end(end);
622
623 for record in lock.query(query).await? {
624 let serde_vtxo = record.to_data::<SerdeVtxo>()?;
625 let current_state = serde_vtxo.current_state()
626 .context("vtxo has no current state")?.clone();
627 debug_assert_eq!(current_state.kind(), *state);
628 records.push(WalletVtxo {
629 vtxo: serde_vtxo.vtxo,
630 state: current_state,
631 });
632 }
633 }
634
635 Ok(records)
636 }
637
638 async fn remove_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
639 match self.inner.write().await.delete(partition::VTXO, &id.to_bytes()).await? {
640 Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?.vtxo)),
641 None => Ok(None),
642 }
643 }
644
645 async fn has_spent_vtxo(&self, id: VtxoId) -> anyhow::Result<bool> {
646 match self.get_wallet_vtxo(id).await? {
647 Some(vtxo) => Ok(vtxo.state.kind() == VtxoStateKind::Spent),
648 None => Ok(false),
649 }
650 }
651
652 async fn update_vtxo_state_checked(
653 &self,
654 vtxo_id: VtxoId,
655 new_state: VtxoState,
656 allowed_old_states: &[VtxoStateKind],
657 ) -> anyhow::Result<WalletVtxo> {
658 let mut lock = self.inner.write().await;
659 update_vtxo_state_checked(&mut *lock, vtxo_id, new_state, allowed_old_states).await
660 }
661
662 async fn store_vtxo_key(&self, index: u32, public_key: PublicKey) -> anyhow::Result<()> {
663 let vtxo_key = SerdeVtxoKey { index, public_key };
664 let record = Record::from_data(
665 partition::PUBLIC_KEY,
666 &public_key.serialize()[..],
667 Some(sort::SortKey::u64_desc(index as u64)),
668 &vtxo_key,
669 )?;
670 self.inner.write().await.put(record).await
671 }
672
673 async fn get_last_vtxo_key_index(&self) -> anyhow::Result<Option<u32>> {
674 let query = Query::new(partition::PUBLIC_KEY).limit(1);
676 let records = self.inner.read().await.query(query).await?;
677
678 match records.into_iter().next() {
679 Some(record) => {
680 let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
681 Ok(Some(vtxo_key.index))
682 }
683 None => Ok(None),
684 }
685 }
686
687 async fn get_public_key_idx(&self, public_key: &PublicKey) -> anyhow::Result<Option<u32>> {
688 match self.inner.read().await
689 .get(partition::PUBLIC_KEY, &public_key.serialize()[..]).await?
690 {
691 Some(record) => {
692 let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
693 Ok(Some(vtxo_key.index))
694 }
695 None => Ok(None),
696 }
697 }
698
699 async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
700 match self.inner.read().await
701 .get(partition::MAILBOX_CHECKPOINT, &[]).await?
702 {
703 Some(record) => Ok(record.to_data::<u64>()?),
704 None => Ok(0),
705 }
706 }
707
708 async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
709 let mut lock = self.inner.write().await;
710 let record = Record::from_data(
711 partition::MAILBOX_CHECKPOINT,
712 &[],
713 None,
714 &checkpoint,
715 )?;
716 lock.put(record).await?;
717 Ok(())
718 }
719
720 async fn store_new_pending_lightning_send(
721 &self,
722 invoice: &Invoice,
723 amount: Amount,
724 fee: Amount,
725 vtxo_ids: &[VtxoId],
726 movement_id: MovementId,
727 ) -> anyhow::Result<LightningSend> {
728 let mut lock = self.inner.write().await;
729 let mut htlc_vtxos = Vec::with_capacity(vtxo_ids.len());
730 for vtxo_id in vtxo_ids {
731 let vtxo = get_vtxo(&*lock, *vtxo_id).await?
732 .context("vtxo not found")?;
733 htlc_vtxos.push(vtxo.to_wallet_vtxo()?);
734 }
735
736 let lightning_send = LightningSend {
737 invoice: invoice.clone(),
738 amount,
739 fee,
740 htlc_vtxos,
741 preimage: None,
742 movement_id,
743 finished_at: None,
744 };
745
746 let record = Record::from_data(
747 partition::LIGHTNING_SEND,
748 &invoice.payment_hash().to_byte_array(),
749 None,
750 &lightning_send,
751 )?;
752
753 lock.put(record).await?;
754
755 Ok(lightning_send)
756 }
757
758 async fn get_all_pending_lightning_send(&self) -> anyhow::Result<Vec<LightningSend>> {
759 let records = self.inner.read().await
760 .query(Query::new(partition::LIGHTNING_SEND)).await?;
761 records
762 .into_iter()
763 .filter_map(|r| {
764 let send: LightningSend = r.to_data().ok()?;
765 if send.finished_at.is_none() {
766 Some(Ok(send))
767 } else {
768 None
769 }
770 })
771 .collect()
772 }
773
774 async fn finish_lightning_send(
775 &self,
776 payment_hash: PaymentHash,
777 preimage: Option<Preimage>,
778 ) -> anyhow::Result<()> {
779 let mut lock = self.inner.write().await;
780
781 let pk = payment_hash.to_byte_array();
782 let record = lock
783 .get(partition::LIGHTNING_SEND, &pk).await?.context("lightning send not found")?;
784 let mut lightning_send: LightningSend = record.to_data()?;
785
786 lightning_send.preimage = preimage;
787 lightning_send.finished_at = Some(Local::now());
788
789 let updated_record = Record::from_data(
790 partition::LIGHTNING_SEND,
791 &pk,
792 None,
793 &lightning_send,
794 )?;
795 lock.put(updated_record).await?;
796
797 Ok(())
798 }
799
800 async fn remove_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
801 self.inner.write().await.delete(partition::LIGHTNING_SEND, &payment_hash.to_byte_array()).await?;
802 Ok(())
803 }
804
805 async fn get_lightning_send(
806 &self,
807 payment_hash: PaymentHash,
808 ) -> anyhow::Result<Option<LightningSend>> {
809 match self.inner.read().await
810 .get(partition::LIGHTNING_SEND, &payment_hash.to_byte_array()).await?
811 {
812 Some(record) => Ok(Some(record.to_data()?)),
813 None => Ok(None),
814 }
815 }
816
817 async fn store_lightning_receive(
818 &self,
819 payment_hash: PaymentHash,
820 preimage: Preimage,
821 invoice: &Bolt11Invoice,
822 htlc_recv_cltv_delta: BlockDelta,
823 ) -> anyhow::Result<()> {
824 let lightning_receive = LightningReceive {
825 payment_hash,
826 payment_preimage: preimage,
827 invoice: invoice.clone(),
828 htlc_recv_cltv_delta,
829 htlc_vtxos: vec![],
830 movement_id: None,
831 finished_at: None,
832 preimage_revealed_at: None,
833 };
834
835 let record = Record::from_data(
836 partition::LIGHTNING_RECEIVE,
837 &payment_hash.to_byte_array(),
838 None,
839 &lightning_receive,
840 )?;
841 self.inner.write().await.put(record).await
842 }
843
844 async fn get_all_pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
845 let records = self.inner.read().await
846 .query(Query::new(partition::LIGHTNING_RECEIVE))
847 .await?;
848 records
849 .into_iter()
850 .filter_map(|r| {
851 let receive: LightningReceive = r.to_data().ok()?;
852 if receive.finished_at.is_none() {
853 Some(Ok(receive))
854 } else {
855 None
856 }
857 })
858 .collect()
859 }
860
861 async fn set_preimage_revealed(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
862 let mut lock = self.inner.write().await;
863
864 let pk = payment_hash.to_byte_array();
865 let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
866 .context("lightning receive not found")?;
867 let mut lightning_receive: LightningReceive = record.to_data()?;
868
869 lightning_receive.preimage_revealed_at = Some(Local::now());
870
871 let updated_record = Record::from_data(
872 partition::LIGHTNING_RECEIVE,
873 &pk,
874 None,
875 &lightning_receive,
876 )?;
877 lock.put(updated_record).await
878 }
879
880 async fn update_lightning_receive(
881 &self,
882 payment_hash: PaymentHash,
883 vtxo_ids: &[VtxoId],
884 movement_id: MovementId,
885 ) -> anyhow::Result<()> {
886 let mut lock = self.inner.write().await;
887 let pk = payment_hash.to_byte_array();
888 let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
889 .context("lightning receive not found")?;
890 let mut lightning_receive: LightningReceive = record.to_data()?;
891
892 let mut htlc_vtxos = Vec::with_capacity(vtxo_ids.len());
893 for vtxo_id in vtxo_ids {
894 let vtxo = get_vtxo(&*lock, *vtxo_id).await?
895 .context("vtxo not found")?;
896 htlc_vtxos.push(vtxo.to_wallet_vtxo()?);
897 }
898
899 lightning_receive.htlc_vtxos = htlc_vtxos;
900 lightning_receive.movement_id = Some(movement_id);
901
902 let updated_record = Record::from_data(
903 partition::LIGHTNING_RECEIVE,
904 &pk,
905 None,
906 &lightning_receive,
907 )?;
908 lock.put(updated_record).await
909 }
910
911 async fn fetch_lightning_receive_by_payment_hash(
912 &self,
913 payment_hash: PaymentHash,
914 ) -> anyhow::Result<Option<LightningReceive>> {
915 match self.inner.read().await
916 .get(partition::LIGHTNING_RECEIVE, &payment_hash.to_byte_array()).await?
917 {
918 Some(record) => Ok(Some(record.to_data()?)),
919 None => Ok(None),
920 }
921 }
922
923 async fn finish_pending_lightning_receive(
924 &self,
925 payment_hash: PaymentHash,
926 ) -> anyhow::Result<()> {
927 let mut lock = self.inner.write().await;
928 let pk = payment_hash.to_byte_array();
929 let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
930 .context("lightning receive not found")?;
931 let mut lightning_receive: LightningReceive = record.to_data()?;
932
933 lightning_receive.finished_at = Some(Local::now());
934
935 let updated_record = Record::from_data(
936 partition::LIGHTNING_RECEIVE,
937 &pk,
938 None,
939 &lightning_receive,
940 )?;
941 lock.put(updated_record).await
942 }
943
944 async fn store_pending_offboard(&self, pending: &PendingOffboard) -> anyhow::Result<()> {
945 let record = Record::from_data(
946 partition::PENDING_OFFBOARD,
947 &pending.movement_id.to_bytes(),
948 None,
949 pending,
950 )?;
951 self.inner.write().await.put(record).await
952 }
953
954 async fn get_pending_offboards(&self) -> anyhow::Result<Vec<PendingOffboard>> {
955 let records = self.inner.read().await
956 .query(Query::new(partition::PENDING_OFFBOARD)).await?;
957 records.into_iter().map(|r| r.to_data()).collect()
958 }
959
960 async fn remove_pending_offboard(&self, movement_id: MovementId) -> anyhow::Result<()> {
961 self.inner.write().await
962 .delete(partition::PENDING_OFFBOARD, &movement_id.to_bytes()).await?;
963 Ok(())
964 }
965
966 async fn store_exit_vtxo_entry(&self, exit: &StoredExit) -> anyhow::Result<()> {
967 let record = Record::from_data(
968 partition::EXIT_VTXO,
969 &exit.vtxo_id.to_bytes(),
970 None,
971 exit,
972 )?;
973 self.inner.write().await.put(record).await
974 }
975
976 async fn remove_exit_vtxo_entry(&self, id: &VtxoId) -> anyhow::Result<()> {
977 self.inner.write().await.delete(partition::EXIT_VTXO, &id.to_bytes()).await?;
978 Ok(())
979 }
980
981 async fn get_exit_vtxo_entries(&self) -> anyhow::Result<Vec<StoredExit>> {
982 let records = self.inner.read().await
983 .query(Query::new(partition::EXIT_VTXO)).await?;
984 records.into_iter().map(|r| r.to_data()).collect()
985 }
986
987 async fn store_exit_child_tx(
988 &self,
989 exit_txid: Txid,
990 child_tx: &Transaction,
991 origin: ExitTxOrigin,
992 ) -> anyhow::Result<()> {
993 let exit_child = SerdeExitChildTx {
994 child_tx: child_tx.clone(),
995 origin,
996 };
997 let record = Record::from_data(
998 partition::EXIT_CHILD_TX,
999 &exit_txid.to_byte_array(),
1000 None,
1001 &exit_child,
1002 )?;
1003 self.inner.write().await.put(record).await
1004 }
1005
1006 async fn get_exit_child_tx(
1007 &self,
1008 exit_txid: Txid,
1009 ) -> anyhow::Result<Option<(Transaction, ExitTxOrigin)>> {
1010 match self.inner.read().await
1011 .get(partition::EXIT_CHILD_TX, &exit_txid.to_byte_array()).await?
1012 {
1013 Some(record) => {
1014 let exit_child = record.to_data::<SerdeExitChildTx>()?;
1015 Ok(Some((exit_child.child_tx, exit_child.origin)))
1016 }
1017 None => Ok(None),
1018 }
1019 }
1020}
1021
1022#[cfg(test)]
1023mod tests {
1024 use super::*;
1025
1026 #[test]
1027 fn storage_query_builder() {
1028 let query = Query::new(0)
1029 .limit(10)
1030 .start(SortKey::u32_asc(100))
1031 .end(SortKey::u32_asc(200));
1032
1033 assert_eq!(query.partition, 0);
1034 assert_eq!(query.limit, Some(10));
1035 assert_eq!(query.start, Some(SortKey::u32_asc(100)));
1036 assert_eq!(query.end, Some(SortKey::u32_asc(200)));
1037 }
1038}
1039
1040#[cfg(test)]
1055pub mod test_suite {
1056 use super::*;
1057 use super::partition::LAST_IDS;
1058 use super::sort::SortKey;
1059
1060 async fn clear_partitions<S: StorageAdaptor>(storage: &mut S, partitions: &[u8]) -> anyhow::Result<()> {
1061 for partition in partitions {
1062 let records = storage.query(Query::new(*partition)).await?;
1063 for record in records {
1064 storage.delete(record.partition, &record.pk).await?;
1065 }
1066 }
1067 Ok(())
1068 }
1069
1070 pub async fn run_all<S: StorageAdaptor>(storage: &mut S) {
1072 test_put_insert(storage).await;
1074 test_put_upsert(storage).await;
1075 test_put_with_sort_key(storage).await;
1076 test_put_without_sort_key(storage).await;
1077 test_put_multiple_partitions(storage).await;
1078
1079 test_get_existing(storage).await;
1081 test_get_after_update(storage).await;
1082
1083 test_delete_existing(storage).await;
1085 test_delete_nonexistent(storage).await;
1086 test_delete_idempotent(storage).await;
1087
1088 test_query_empty_partition(storage).await;
1090 test_query_returns_partition_records(storage).await;
1091 test_query_ordering(storage).await;
1092 test_query_with_limit(storage).await;
1093 test_query_null_sort_key_ordering(storage).await;
1094 test_query_partition_isolation(storage).await;
1095 test_query_range(storage).await;
1096
1097 test_incremental_id_starts_at_one(storage).await;
1099 test_incremental_id_increments(storage).await;
1100 test_incremental_id_partition_isolation(storage).await;
1101 test_incremental_id_persists_across_operations(storage).await;
1102 }
1103
1104 pub async fn test_put_insert<S: StorageAdaptor>(storage: &mut S) {
1106 let record = Record {
1107 pk: "put_insert_1".into(),
1108 partition: 0,
1109 sort_key: None,
1110 data: b"test data".to_vec(),
1111 };
1112
1113 storage.put(record).await.expect("put should succeed");
1114
1115 let retrieved = storage
1116 .get(0, b"put_insert_1")
1117 .await
1118 .expect("get should succeed")
1119 .expect("record should exist");
1120
1121 assert_eq!(retrieved.pk, b"put_insert_1");
1122 assert_eq!(retrieved.partition, 0);
1123 assert_eq!(retrieved.data, b"test data".to_vec());
1124 }
1125
1126 pub async fn test_put_upsert<S: StorageAdaptor>(storage: &mut S) {
1128 let record1 = Record {
1129 pk: b"put_upsert_1".into(),
1130 partition: 0,
1131 sort_key: None,
1132 data: b"original".to_vec(),
1133 };
1134 storage.put(record1).await.expect("first put should succeed");
1135
1136 let record2 = Record {
1137 pk: "put_upsert_1".into(),
1138 partition: 0,
1139 sort_key: None,
1140 data: b"updated".to_vec(),
1141 };
1142 storage
1143 .put(record2)
1144 .await
1145 .expect("second put should succeed");
1146
1147 let retrieved = storage
1148 .get(0, b"put_upsert_1")
1149 .await
1150 .expect("get should succeed")
1151 .expect("record should exist");
1152
1153 assert_eq!(retrieved.data, b"updated".to_vec(), "data should be updated");
1154 }
1155
1156 pub async fn test_put_with_sort_key<S: StorageAdaptor>(storage: &mut S) {
1158 let sort_key = SortKey::u32_asc(42);
1159 let record = Record {
1160 pk: b"put_sort_key_1".into(),
1161 partition: 0,
1162 sort_key: Some(sort_key.clone()),
1163 data: b"with sort key".to_vec(),
1164 };
1165
1166 storage.put(record).await.expect("put should succeed");
1167
1168 let retrieved = storage
1169 .get(0, b"put_sort_key_1")
1170 .await
1171 .expect("get should succeed")
1172 .expect("record should exist");
1173
1174 assert_eq!(retrieved.sort_key, Some(sort_key));
1175 }
1176
1177 pub async fn test_put_without_sort_key<S: StorageAdaptor>(storage: &mut S) {
1179 let record = Record {
1180 pk: b"put_no_sort_key_1".into(),
1181 partition: 0,
1182 sort_key: None,
1183 data: b"no sort key".to_vec(),
1184 };
1185
1186 storage.put(record).await.expect("put should succeed");
1187
1188 let retrieved = storage
1189 .get(0, b"put_no_sort_key_1")
1190 .await
1191 .expect("get should succeed")
1192 .expect("record should exist");
1193
1194 assert!(retrieved.sort_key.is_none());
1195 }
1196
1197 pub async fn test_put_multiple_partitions<S: StorageAdaptor>(storage: &mut S) {
1199 let record_a = Record {
1200 pk: "put_multi_a".into(),
1201 partition: 0,
1202 sort_key: None,
1203 data: b"in partition a".to_vec(),
1204 };
1205 let record_b = Record {
1206 pk: "put_multi_b".into(),
1207 partition: 1,
1208 sort_key: None,
1209 data: b"in partition b".to_vec(),
1210 };
1211
1212 storage.put(record_a).await.expect("put a should succeed");
1213 storage.put(record_b).await.expect("put b should succeed");
1214
1215 let retrieved_a = storage
1216 .get(0, b"put_multi_a")
1217 .await
1218 .expect("get should succeed")
1219 .expect("record a should exist");
1220 let retrieved_b = storage
1221 .get(1, b"put_multi_b")
1222 .await
1223 .expect("get should succeed")
1224 .expect("record b should exist");
1225
1226 assert_eq!(retrieved_a.partition, 0);
1227 assert_eq!(retrieved_b.partition, 1);
1228 }
1229
1230 pub async fn test_get_existing<S: StorageAdaptor>(storage: &mut S) {
1232 let record = Record {
1233 pk: b"get_existing_1".into(),
1234 partition: 0,
1235 sort_key: Some(SortKey::u32_asc(100)),
1236 data: b"test".to_vec(),
1237 };
1238 storage.put(record).await.expect("put should succeed");
1239
1240 let retrieved = storage
1241 .get(0, b"get_existing_1")
1242 .await
1243 .expect("get should succeed");
1244
1245 assert!(retrieved.is_some());
1246 let retrieved = retrieved.unwrap();
1247 assert_eq!(retrieved.pk, b"get_existing_1");
1248 assert_eq!(retrieved.partition, 0);
1249 assert_eq!(retrieved.data, b"test".to_vec());
1250
1251 assert!(storage.get(0, b"get_existing_1_").await.unwrap().is_none());
1253 assert!(storage.get(0, b"get_existing_").await.unwrap().is_none());
1255
1256 assert!(storage.get(0, b"get_nonexistent_does_not_exist").await.unwrap().is_none());
1258 }
1259
1260 pub async fn test_get_after_update<S: StorageAdaptor>(storage: &mut S) {
1262 let record1 = Record {
1263 pk: b"get_after_update_1".into(),
1264 partition: 0,
1265 sort_key: None,
1266 data: b"version1".to_vec(),
1267 };
1268 storage.put(record1).await.expect("put should succeed");
1269
1270 let record2 = Record {
1271 pk: b"get_after_update_1".into(),
1272 partition: 0,
1273 sort_key: None,
1274 data: b"version2".to_vec(),
1275 };
1276 storage.put(record2).await.expect("put should succeed");
1277
1278 let retrieved = storage
1279 .get(0, b"get_after_update_1")
1280 .await
1281 .expect("get should succeed")
1282 .expect("record should exist");
1283
1284 assert_eq!(retrieved.data, b"version2".to_vec());
1285 }
1286
1287 pub async fn test_delete_existing<S: StorageAdaptor>(storage: &mut S) {
1289 let record = Record {
1290 pk: b"delete_existing_1".into(),
1291 partition: 0,
1292 sort_key: None,
1293 data: b"to delete".to_vec(),
1294 };
1295 storage.put(record.clone()).await.expect("put should succeed");
1296
1297 let deleted_record = storage
1298 .delete(0, b"delete_existing_1")
1299 .await
1300 .expect("delete should succeed");
1301
1302 assert_eq!(deleted_record, Some(record));
1303
1304 let retrieved = storage
1305 .get(0, b"delete_existing_1")
1306 .await
1307 .expect("get should succeed");
1308 assert!(retrieved.is_none(), "record should no longer exist");
1309 }
1310
1311 pub async fn test_delete_nonexistent<S: StorageAdaptor>(storage: &mut S) {
1313 let deleted_record = storage
1314 .delete(0, b"delete_nonexistent_does_not_exist")
1315 .await
1316 .expect("delete should succeed");
1317
1318 assert!(
1319 deleted_record.is_none(),
1320 "delete should return None for non-existent record"
1321 );
1322 }
1323
1324 pub async fn test_delete_idempotent<S: StorageAdaptor>(storage: &mut S) {
1326 let record = Record {
1327 pk: b"delete_idempotent_1".into(),
1328 partition: 0,
1329 sort_key: None,
1330 data: b"delete twice".to_vec(),
1331 };
1332 storage.put(record.clone()).await.expect("put should succeed");
1333
1334 let first_delete = storage
1335 .delete(0, b"delete_idempotent_1")
1336 .await
1337 .expect("first delete should succeed");
1338 let second_delete = storage
1339 .delete(0, b"delete_idempotent_1")
1340 .await
1341 .expect("second delete should succeed");
1342
1343 assert_eq!(first_delete, Some(record), "first delete should return the record");
1344 assert_eq!(second_delete, None, "second delete should return None");
1345 }
1346
1347 pub async fn test_query_empty_partition<S: StorageAdaptor>(storage: &mut S) {
1349 clear_partitions(storage, &[0]).await.unwrap();
1350 let results = storage
1351 .query(Query::new(0))
1352 .await
1353 .expect("query should succeed");
1354
1355 assert!(results.is_empty());
1356 }
1357
1358 pub async fn test_query_returns_partition_records<S: StorageAdaptor>(storage: &mut S) {
1360 clear_partitions(storage, &[0]).await.unwrap();
1361 for i in 0..3 {
1362 let record = Record {
1363 pk: format!("query_partition_{}", i).into(),
1364 partition: 0,
1365 sort_key: Some(SortKey::u32_asc(i)),
1366 data: format!("record_{}", i).as_bytes().to_vec(),
1367 };
1368 storage.put(record).await.expect("put should succeed");
1369 }
1370
1371 let results = storage
1372 .query(Query::new(0))
1373 .await
1374 .expect("query should succeed");
1375
1376 assert_eq!(results.len(), 3);
1377 }
1378
1379 pub async fn test_query_ordering<S: StorageAdaptor>(storage: &mut S) {
1381 clear_partitions(storage, &[0]).await.unwrap();
1382 for i in [5, 2, 8, 1, 9] {
1384 let record = Record {
1385 pk: format!("query_asc_{}", i).into(),
1386 partition: 0,
1387 sort_key: Some(SortKey::u32_asc(i)),
1388 data: format!("record_{}", i).as_bytes().to_vec(),
1389 };
1390 storage.put(record).await.expect("put should succeed");
1391 }
1392
1393 let results = storage
1394 .query(Query::new(0))
1395 .await
1396 .expect("query should succeed");
1397
1398 let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1399 assert_eq!(
1400 values,
1401 vec![b"record_1".to_vec(), b"record_2".to_vec(), b"record_5".to_vec(), b"record_8".to_vec(), b"record_9".to_vec()],
1402 "should be in ascending order"
1403 );
1404 }
1405
1406 pub async fn test_query_with_limit<S: StorageAdaptor>(storage: &mut S) {
1408 clear_partitions(storage, &[0]).await.unwrap();
1409 for i in 0..10 {
1410 let record = Record {
1411 pk: format!("query_limit_{}", i).into(),
1412 partition: 0,
1413 sort_key: Some(SortKey::u32_asc(i)),
1414 data: format!("record_{}", i).as_bytes().to_vec(),
1415 };
1416 storage.put(record).await.expect("put should succeed");
1417 }
1418
1419 let results = storage
1420 .query(Query::new(0).limit(3))
1421 .await
1422 .expect("query should succeed");
1423
1424 assert_eq!(results.len(), 3);
1425 let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1426 assert_eq!(
1427 values,
1428 vec![b"record_0".to_vec(), b"record_1".to_vec(), b"record_2".to_vec()],
1429 "should return first 3 records"
1430 );
1431 }
1432
1433 pub async fn test_query_null_sort_key_ordering<S: StorageAdaptor>(storage: &mut S) {
1435 clear_partitions(storage, &[0]).await.unwrap();
1436 let with_key_1 = Record {
1438 pk: "query_null_with_1".into(),
1439 partition: 0,
1440 sort_key: Some(SortKey::u32_asc(1)),
1441 data: b"with_key_1".to_vec(),
1442 };
1443 let with_key_2 = Record {
1444 pk: "query_null_with_2".into(),
1445 partition: 0,
1446 sort_key: Some(SortKey::u32_asc(2)),
1447 data: b"with_key_2".to_vec(),
1448 };
1449
1450 let without_key = Record {
1452 pk: "query_null_without".into(),
1453 partition: 0,
1454 sort_key: None,
1455 data: b"no_key".to_vec(),
1456 };
1457
1458 storage.put(with_key_1).await.expect("put should succeed");
1459 storage.put(without_key).await.expect("put should succeed");
1460 storage.put(with_key_2).await.expect("put should succeed");
1461
1462 let results_asc = storage
1464 .query(Query::new(0))
1465 .await
1466 .expect("query should succeed");
1467
1468 assert_eq!(results_asc.len(), 3);
1469 assert_eq!(results_asc[0].data, b"with_key_1".to_vec());
1470 assert_eq!(results_asc[1].data, b"with_key_2".to_vec());
1471 assert_eq!(results_asc[2].data, b"no_key".to_vec(), "null sort key should be last");
1472 }
1473
1474 pub async fn test_query_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1476 clear_partitions(storage, &[0, 1]).await.unwrap();
1477 for i in 0..3 {
1479 let record = Record {
1480 pk: format!("query_iso_a_{}", i).into(),
1481 partition: 0,
1482 sort_key: Some(SortKey::u32_asc(i)),
1483 data: format!("record_{}", i).as_bytes().to_vec(),
1484 };
1485 storage.put(record).await.expect("put should succeed");
1486 }
1487
1488 for i in 0..5 {
1490 let record = Record {
1491 pk: format!("query_iso_b_{}", i).into(),
1492 partition: 1,
1493 sort_key: Some(SortKey::u32_asc(i)),
1494 data: format!("record_{}", i + 100).as_bytes().to_vec(),
1495 };
1496 storage.put(record).await.expect("put should succeed");
1497 }
1498
1499 let results_a = storage
1500 .query(Query::new(0))
1501 .await
1502 .expect("query should succeed");
1503
1504 let results_b = storage
1505 .query(Query::new(1))
1506 .await
1507 .expect("query should succeed");
1508
1509 assert_eq!(results_a.len(), 3, "partition A should have 3 records");
1510 assert_eq!(results_b.len(), 5, "partition B should have 5 records");
1511
1512 assert!(results_a
1514 .iter()
1515 .all(|r| r.partition == 0));
1516
1517 assert!(results_b
1519 .iter()
1520 .all(|r| r.partition == 1));
1521 }
1522
1523 pub async fn test_query_range<S: StorageAdaptor>(storage: &mut S) {
1525 clear_partitions(storage, &[0]).await.unwrap();
1526
1527 for i in 1..=10u32 {
1529 let record = Record {
1530 pk: format!("query_range_{}", i).into(),
1531 partition: 0,
1532 sort_key: Some(SortKey::u32_asc(i)),
1533 data: format!("record_{}", i).as_bytes().to_vec(),
1534 };
1535 storage.put(record).await.expect("put should succeed");
1536 }
1537
1538 let results_start = storage
1540 .query(Query::new(0).start(SortKey::u32_asc(5)))
1541 .await
1542 .expect("query should succeed");
1543
1544 assert_eq!(results_start.len(), 6, "should return records 5-10");
1545 let values: Vec<_> = results_start.iter().map(|r| r.data.clone()).collect();
1546 assert_eq!(
1547 values,
1548 vec![
1549 b"record_5".to_vec(),
1550 b"record_6".to_vec(),
1551 b"record_7".to_vec(),
1552 b"record_8".to_vec(),
1553 b"record_9".to_vec(),
1554 b"record_10".to_vec(),
1555 ],
1556 "should return records from 5 onwards"
1557 );
1558
1559 let results_end = storage
1561 .query(Query::new(0).end(SortKey::u32_asc(3)))
1562 .await
1563 .expect("query should succeed");
1564
1565 assert_eq!(results_end.len(), 3, "should return records 1-3");
1566 let values: Vec<_> = results_end.iter().map(|r| r.data.clone()).collect();
1567 assert_eq!(
1568 values,
1569 vec![
1570 b"record_1".to_vec(),
1571 b"record_2".to_vec(),
1572 b"record_3".to_vec(),
1573 ],
1574 "should return records up to 3"
1575 );
1576
1577 let results_range = storage
1579 .query(Query::new(0).start(SortKey::u32_asc(3)).end(SortKey::u32_asc(7)))
1580 .await
1581 .expect("query should succeed");
1582
1583 assert_eq!(results_range.len(), 5, "should return records 3-7");
1584 let values: Vec<_> = results_range.iter().map(|r| r.data.clone()).collect();
1585 assert_eq!(
1586 values,
1587 vec![
1588 b"record_3".to_vec(),
1589 b"record_4".to_vec(),
1590 b"record_5".to_vec(),
1591 b"record_6".to_vec(),
1592 b"record_7".to_vec(),
1593 ],
1594 "should return records in range 3-7"
1595 );
1596
1597 let results_range_limit = storage
1599 .query(Query::new(0).start(SortKey::u32_asc(2)).end(SortKey::u32_asc(8)).limit(3))
1600 .await
1601 .expect("query should succeed");
1602
1603 assert_eq!(results_range_limit.len(), 3, "should return only 3 records due to limit");
1604 let values: Vec<_> = results_range_limit.iter().map(|r| r.data.clone()).collect();
1605 assert_eq!(
1606 values,
1607 vec![
1608 b"record_2".to_vec(),
1609 b"record_3".to_vec(),
1610 b"record_4".to_vec(),
1611 ],
1612 "should return first 3 records in range"
1613 );
1614
1615 let results_empty = storage
1617 .query(Query::new(0).start(SortKey::u32_asc(100)).end(SortKey::u32_asc(200)))
1618 .await
1619 .expect("query should succeed");
1620
1621 assert!(results_empty.is_empty(), "should return no records for out-of-range query");
1622 }
1623
1624 pub async fn test_incremental_id_starts_at_one<S: StorageAdaptor>(storage: &mut S) {
1626 storage.delete(LAST_IDS, b"0").await.unwrap();
1628 let id = storage.incremental_id(0).await
1629 .expect("incremental_id should succeed");
1630
1631 assert_eq!(id, 1, "first id should be 1");
1632 }
1633
1634 pub async fn test_incremental_id_increments<S: StorageAdaptor>(storage: &mut S) {
1636 clear_partitions(storage, &[0, LAST_IDS]).await.unwrap();
1637
1638 let id1 = storage.incremental_id(0).await
1639 .expect("incremental_id should succeed");
1640 let id2 = storage.incremental_id(0).await
1641 .expect("incremental_id should succeed");
1642 let id3 = storage.incremental_id(0).await
1643 .expect("incremental_id should succeed");
1644
1645 assert_eq!(id1, 1, "first id should be 1");
1646 assert_eq!(id2, 2, "second id should be 2");
1647 assert_eq!(id3, 3, "third id should be 3");
1648 }
1649
1650 pub async fn test_incremental_id_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1652 clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1653
1654 let a1 = storage.incremental_id(0).await
1656 .expect("incremental_id should succeed");
1657 let a2 = storage.incremental_id(0).await
1658 .expect("incremental_id should succeed");
1659 let a3 = storage.incremental_id(0).await
1660 .expect("incremental_id should succeed");
1661
1662 let b1 = storage.incremental_id(1).await
1664 .expect("incremental_id should succeed");
1665 let b2 = storage.incremental_id(1).await
1666 .expect("incremental_id should succeed");
1667
1668 assert_eq!(a1, 1);
1670 assert_eq!(a2, 2);
1671 assert_eq!(a3, 3);
1672
1673 assert_eq!(b1, 1);
1675 assert_eq!(b2, 2);
1676
1677 let a4 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1679 assert_eq!(a4, 4);
1680 }
1681
1682 pub async fn test_incremental_id_persists_across_operations<S: StorageAdaptor>(storage: &mut S) {
1684 clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1685
1686 let id1 = storage.incremental_id(0).await
1688 .expect("incremental_id should succeed");
1689 let id2 = storage.incremental_id(0).await
1690 .expect("incremental_id should succeed");
1691 assert_eq!(id1, 1);
1692 assert_eq!(id2, 2);
1693
1694 let stored = storage
1696 .get(LAST_IDS, &[0])
1697 .await
1698 .expect("get should succeed")
1699 .expect("id record should exist");
1700 let stored_id: u32 = serde_json::from_slice(&stored.data).expect("should deserialize");
1701 assert_eq!(stored_id, 2, "stored id should be 2");
1702
1703 let id3 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1705 assert_eq!(id3, 3);
1706 }
1707}