1pub extern crate ark;
287
288pub extern crate bip39;
289pub extern crate lightning_invoice;
290pub extern crate lnurl as lnurllib;
291
292#[macro_use] extern crate anyhow;
293#[macro_use] extern crate async_trait;
294#[macro_use] extern crate serde;
295
296pub mod chain;
297pub mod exit;
298pub mod movement;
299pub mod onchain;
300pub mod persist;
301pub mod round;
302pub mod subsystem;
303pub mod vtxo;
304
305#[cfg(feature = "pid-lock")]
306pub mod pid_lock;
307
308mod arkoor;
309mod board;
310mod config;
311mod daemon;
312mod fees;
313mod lightning;
314mod offboard;
315mod psbtext;
316mod mailbox;
317mod utils;
318
319pub use self::arkoor::ArkoorCreateResult;
320pub use self::config::{BarkNetwork, Config};
321pub use self::daemon::DaemonHandle;
322pub use self::fees::FeeEstimate;
323pub use self::vtxo::WalletVtxo;
324
325use std::collections::HashSet;
326use std::sync::Arc;
327
328use anyhow::{bail, Context};
329use bip39::Mnemonic;
330use bitcoin::{Amount, Network, OutPoint};
331use bitcoin::bip32::{self, ChildNumber, Fingerprint};
332use bitcoin::secp256k1::{self, Keypair, PublicKey};
333use log::{trace, info, warn, error};
334use tokio::sync::{Mutex, RwLock};
335
336use ark::lightning::PaymentHash;
337
338use ark::{ArkInfo, ProtocolEncoding, Vtxo, VtxoId, VtxoPolicy, VtxoRequest};
339use ark::address::VtxoDelivery;
340use ark::fees::{validate_and_subtract_fee_min_dust, VtxoFeeInfo};
341use ark::mailbox::MailboxIdentifier;
342use ark::vtxo::{Full, PubkeyVtxoPolicy, VtxoRef};
343use ark::vtxo::policy::signing::VtxoSigner;
344use bitcoin_ext::{BlockHeight, P2TR_DUST, TxStatus};
345use server_rpc::{protos, ServerConnection};
346
347use crate::chain::{ChainSource, ChainSourceSpec};
348use crate::exit::Exit;
349use crate::movement::{Movement, MovementStatus};
350use crate::movement::manager::MovementManager;
351use crate::movement::update::MovementUpdate;
352use crate::onchain::{ExitUnilaterally, PreparePsbt, SignPsbt, Utxo};
353use crate::onchain::DaemonizableOnchainWallet;
354use crate::persist::BarkPersister;
355use crate::persist::models::{PendingOffboard, RoundStateId, StoredRoundState, Unlocked};
356use crate::round::{RoundParticipation, RoundStateLockIndex, RoundStatus};
357use crate::subsystem::{ArkoorMovement, RoundMovement};
358use crate::vtxo::{FilterVtxos, RefreshStrategy, VtxoFilter, VtxoState, VtxoStateKind};
359
/// Hardened BIP32 child index applied to the seed's master key to obtain the wallet's root key
/// (see `WalletSeed::new`).
const BARK_PURPOSE_INDEX: u32 = 350;
/// Hardened child index, below the wallet's root key, of the branch VTXO keys are derived from.
const VTXO_KEYS_INDEX: u32 = 0;
/// Hardened child index, below the wallet's root key, of the mailbox key.
const MAILBOX_KEY_INDEX: u32 = 1;

lazy_static::lazy_static! {
    // Process-wide secp256k1 context shared by all key derivations in this file.
    static ref SECP: secp256k1::Secp256k1<secp256k1::All> = secp256k1::Secp256k1::new();
}
371
/// Logs, at error level, a multi-line explanation that the Ark server's public key no longer
/// matches the one stored for this wallet.
///
/// `expected` is the stored key and `got` is the key the server currently reports. This function
/// only logs; callers decide how to fail afterwards.
fn log_server_pubkey_changed_error(expected: PublicKey, got: PublicKey) {
    // The message is a single literal so that it is emitted as one log record.
    error!(
        "
Server public key has changed!

The Ark server's public key is different from the one stored when this
wallet was created. This typically happens when:

 - The server operator has rotated their keys
 - You are connecting to a different server
 - The server has been replaced

For safety, this wallet will not connect to the server until you
resolve this. You can recover your funds on-chain by doing an emergency exit.

This will exit your VTXOs to on-chain Bitcoin without needing the server's cooperation.

Expected: {expected}
Got: {got}")
}
396
/// Amounts related to incoming lightning payments.
///
/// NOTE(review): the code that fills this struct is outside this part of the file; the field
/// descriptions below are inferred from the names and should be confirmed.
#[derive(Debug, Clone)]
pub struct LightningReceiveBalance {
    /// Presumably the total amount of pending lightning receives -- TODO confirm.
    pub total: Amount,
    /// Presumably the part of `total` that can already be claimed -- TODO confirm.
    pub claimable: Amount,
}
405
/// Breakdown of the wallet's off-chain funds as computed by `Wallet::balance`.
#[derive(Debug, Clone)]
pub struct Balance {
    /// Sum of the amounts of all VTXOs that pass the `VtxoStateKind::Spendable` filter.
    pub spendable: Amount,
    /// Sum of the amounts of the VTXOs returned by `pending_lightning_send_vtxos`.
    pub pending_lightning_send: Amount,
    /// Value returned by `claimable_lightning_receive_balance`.
    pub claimable_lightning_receive: Amount,
    /// Value returned by `pending_round_balance`.
    pub pending_in_round: Amount,
    /// Pending total reported by the exit subsystem, or `None` when the exit lock could not be
    /// read-acquired without waiting while the balance was computed.
    pub pending_exit: Option<Amount>,
    /// Sum of the amounts of the VTXOs returned by `pending_board_vtxos`.
    pub pending_board: Amount,
}
423
/// Flattened view of a [`Utxo`], built through `From<Utxo>`.
pub struct UtxoInfo {
    /// Outpoint of the UTXO.
    pub outpoint: OutPoint,
    /// Amount held by the UTXO.
    pub amount: Amount,
    /// Confirmation height if known; always `Some` for UTXOs converted from `Utxo::Exit`.
    pub confirmation_height: Option<u32>,
}
429
430impl From<Utxo> for UtxoInfo {
431 fn from(value: Utxo) -> Self {
432 match value {
433 Utxo::Local(o) => UtxoInfo {
434 outpoint: o.outpoint,
435 amount: o.amount,
436 confirmation_height: o.confirmation_height,
437 },
438 Utxo::Exit(e) => UtxoInfo {
439 outpoint: e.vtxo.point(),
440 amount: e.vtxo.amount(),
441 confirmation_height: Some(e.height),
442 },
443 }
444 }
445}
446
/// Off-chain balance split into three amounts.
///
/// NOTE(review): nothing in this part of the file constructs this type; the field descriptions
/// are inferred from the names and should be confirmed.
pub struct OffchainBalance {
    /// Presumably the amount that can be spent right away -- TODO confirm.
    pub available: Amount,
    /// Presumably the amount locked in a round that has not completed yet -- TODO confirm.
    pub pending_in_round: Amount,
    /// Presumably the amount currently being exited on-chain -- TODO confirm.
    pub pending_exit: Amount,
}
458
/// Properties persisted when a wallet is created and read back through
/// `BarkPersister::read_properties`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalletProperties {
    /// Bitcoin network the wallet was created for; used to derive the seed and to connect to
    /// the chain source and the server.
    pub network: Network,

    /// Fingerprint of the wallet's root key; `Wallet::open` compares it against the fingerprint
    /// derived from the supplied mnemonic and rejects a mismatch.
    pub fingerprint: Fingerprint,

    /// Public key of the Ark server this wallet is pinned to.
    ///
    /// `None` when the wallet was created without contacting the server; in that case the key is
    /// stored the first time the server's info is successfully checked.
    pub server_pubkey: Option<PublicKey>,
}
480
/// Extended private keys derived from the wallet's BIP39 seed.
pub struct WalletSeed {
    /// Key derived from the seed's BIP32 master key at the hardened `BARK_PURPOSE_INDEX` child;
    /// the wallet fingerprint and the mailbox key derive from it.
    master: bip32::Xpriv,
    /// Key derived from `master` at the hardened `VTXO_KEYS_INDEX` child; parent of all VTXO
    /// keys.
    vtxo: bip32::Xpriv,
}
490
491impl WalletSeed {
492 fn new(network: Network, seed: &[u8; 64]) -> Self {
493 let bark_path = [ChildNumber::from_hardened_idx(BARK_PURPOSE_INDEX).unwrap()];
494 let master = bip32::Xpriv::new_master(network, seed)
495 .expect("invalid seed")
496 .derive_priv(&SECP, &bark_path)
497 .expect("purpose is valid");
498
499 let vtxo_path = [ChildNumber::from_hardened_idx(VTXO_KEYS_INDEX).unwrap()];
500 let vtxo = master.derive_priv(&SECP, &vtxo_path)
501 .expect("vtxo path is valid");
502
503 Self { master, vtxo }
504 }
505
506 fn fingerprint(&self) -> Fingerprint {
507 self.master.fingerprint(&SECP)
508 }
509
510 fn derive_vtxo_keypair(&self, idx: u32) -> Keypair {
511 self.vtxo.derive_priv(&SECP, &[idx.into()]).unwrap().to_keypair(&SECP)
512 }
513
514 fn to_mailbox_keypair(&self) -> Keypair {
515 let mailbox_path = [ChildNumber::from_hardened_idx(MAILBOX_KEY_INDEX).unwrap()];
516 self.master.derive_priv(&SECP, &mailbox_path).unwrap().to_keypair(&SECP)
517 }
518}
519
/// The Ark wallet: ties together the chain source, the persistence layer, the key material and
/// the optional connection to the Ark server.
pub struct Wallet {
    /// Chain source client created in `Wallet::open` from the configuration.
    pub chain: Arc<ChainSource>,

    /// Unilateral exit subsystem, guarded by an async read/write lock.
    pub exit: RwLock<Exit>,

    /// Manager used to record and finish wallet movements.
    pub movements: Arc<MovementManager>,

    /// Configuration this wallet was opened with.
    config: Config,

    /// Persistence backend.
    db: Arc<dyn BarkPersister>,

    /// Keys derived from the mnemonic.
    seed: WalletSeed,

    /// Connection to the Ark server; `None` when connecting failed while opening the wallet and
    /// no later refresh has succeeded.
    server: parking_lot::RwLock<Option<ServerConnection>>,

    /// Payment hashes of lightning payments that are currently in flight.
    /// NOTE(review): its users are outside this part of the file -- confirm exact semantics.
    inflight_lightning_payments: Mutex<HashSet<PaymentHash>>,

    /// Lock index for round states.
    /// NOTE(review): its users are outside this part of the file -- confirm exact semantics.
    round_state_lock_index: RoundStateLockIndex,
}
681
682impl Wallet {
683 pub fn chain_source(
686 config: &Config,
687 ) -> anyhow::Result<ChainSourceSpec> {
688 if let Some(ref url) = config.esplora_address {
689 Ok(ChainSourceSpec::Esplora {
690 url: url.clone(),
691 })
692 } else if let Some(ref url) = config.bitcoind_address {
693 let auth = if let Some(ref c) = config.bitcoind_cookiefile {
694 bitcoin_ext::rpc::Auth::CookieFile(c.clone())
695 } else {
696 bitcoin_ext::rpc::Auth::UserPass(
697 config.bitcoind_user.clone().context("need bitcoind auth config")?,
698 config.bitcoind_pass.clone().context("need bitcoind auth config")?,
699 )
700 };
701 Ok(ChainSourceSpec::Bitcoind {
702 url: url.clone(),
703 auth,
704 })
705 } else {
706 bail!("Need to either provide esplora or bitcoind info");
707 }
708 }
709
710 pub fn require_chainsource_version(&self) -> anyhow::Result<()> {
714 self.chain.require_version()
715 }
716
717 pub async fn network(&self) -> anyhow::Result<Network> {
718 Ok(self.properties().await?.network)
719 }
720
721 pub async fn derive_store_next_keypair(&self) -> anyhow::Result<(Keypair, u32)> {
724 let last_revealed = self.db.get_last_vtxo_key_index().await?;
725
726 let index = last_revealed.map(|i| i + 1).unwrap_or(u32::MIN);
727 let keypair = self.seed.derive_vtxo_keypair(index);
728
729 self.db.store_vtxo_key(index, keypair.public_key()).await?;
730 Ok((keypair, index))
731 }
732
733 pub async fn peak_keypair(&self, index: u32) -> anyhow::Result<Keypair> {
747 let keypair = self.seed.derive_vtxo_keypair(index);
748 if self.db.get_public_key_idx(&keypair.public_key()).await?.is_some() {
749 Ok(keypair)
750 } else {
751 bail!("VTXO key {} does not exist, please derive it first", index)
752 }
753 }
754
755
756 pub async fn pubkey_keypair(&self, public_key: &PublicKey) -> anyhow::Result<Option<(u32, Keypair)>> {
768 if let Some(index) = self.db.get_public_key_idx(&public_key).await? {
769 Ok(Some((index, self.seed.derive_vtxo_keypair(index))))
770 } else {
771 Ok(None)
772 }
773 }
774
775 pub async fn get_vtxo_key(&self, vtxo: impl VtxoRef) -> anyhow::Result<Keypair> {
786 let wallet_vtxo = self.get_vtxo_by_id(vtxo.vtxo_id()).await?;
787 let pubkey = self.find_signable_clause(&wallet_vtxo.vtxo).await
788 .context("VTXO is not signable by wallet")?
789 .pubkey();
790 let idx = self.db.get_public_key_idx(&pubkey).await?
791 .context("VTXO key not found")?;
792 Ok(self.seed.derive_vtxo_keypair(idx))
793 }
794
795 pub async fn peak_address(&self, index: u32) -> anyhow::Result<ark::Address> {
799 let (_, ark_info) = &self.require_server().await?;
800 let network = self.properties().await?.network;
801 let keypair = self.peak_keypair(index).await?;
802
803 let mailbox_kp = self.mailbox_keypair()?;
804 let mailbox = MailboxIdentifier::from_pubkey(mailbox_kp.public_key());
805
806 Ok(ark::Address::builder()
807 .testnet(network != bitcoin::Network::Bitcoin)
808 .server_pubkey(ark_info.server_pubkey)
809 .pubkey_policy(keypair.public_key())
810 .mailbox(ark_info.mailbox_pubkey, mailbox, &keypair)
811 .expect("Failed to assign mailbox")
812 .into_address().unwrap())
813 }
814
815 pub async fn new_address_with_index(&self) -> anyhow::Result<(ark::Address, u32)> {
819 let (_, index) = self.derive_store_next_keypair().await?;
820 let addr = self.peak_address(index).await?;
821 Ok((addr, index))
822 }
823
824 pub async fn new_address(&self) -> anyhow::Result<ark::Address> {
826 let (addr, _) = self.new_address_with_index().await?;
827 Ok(addr)
828 }
829
830 pub async fn create(
836 mnemonic: &Mnemonic,
837 network: Network,
838 config: Config,
839 db: Arc<dyn BarkPersister>,
840 force: bool,
841 ) -> anyhow::Result<Wallet> {
842 trace!("Config: {:?}", config);
843 if let Some(existing) = db.read_properties().await? {
844 trace!("Existing config: {:?}", existing);
845 bail!("cannot overwrite already existing config")
846 }
847
848 let server_pubkey = if !force {
850 match ServerConnection::connect(&config.server_address, network).await {
851 Ok(conn) => {
852 let ark_info = conn.ark_info().await?;
853 Some(ark_info.server_pubkey)
854 }
855 Err(err) => {
856 bail!("Failed to connect to provided server (if you are sure use the --force flag): {}", err);
857 }
858 }
859 } else {
860 None
861 };
862
863 let wallet_fingerprint = WalletSeed::new(network, &mnemonic.to_seed("")).fingerprint();
864 let properties = WalletProperties {
865 network,
866 fingerprint: wallet_fingerprint,
867 server_pubkey,
868 };
869
870 db.init_wallet(&properties).await.context("cannot init wallet in the database")?;
872 info!("Created wallet with fingerprint: {}", wallet_fingerprint);
873 if let Some(pk) = server_pubkey {
874 info!("Stored server pubkey: {}", pk);
875 }
876
877 let wallet = Wallet::open(&mnemonic, db, config).await.context("failed to open wallet")?;
879 wallet.require_chainsource_version()?;
880
881 Ok(wallet)
882 }
883
884 pub async fn create_with_onchain(
892 mnemonic: &Mnemonic,
893 network: Network,
894 config: Config,
895 db: Arc<dyn BarkPersister>,
896 onchain: &dyn ExitUnilaterally,
897 force: bool,
898 ) -> anyhow::Result<Wallet> {
899 let mut wallet = Wallet::create(mnemonic, network, config, db, force).await?;
900 wallet.exit.get_mut().load(onchain).await?;
901 Ok(wallet)
902 }
903
904 pub async fn open(
906 mnemonic: &Mnemonic,
907 db: Arc<dyn BarkPersister>,
908 config: Config,
909 ) -> anyhow::Result<Wallet> {
910 let properties = db.read_properties().await?.context("Wallet is not initialised")?;
911
912 let seed = {
913 let seed = mnemonic.to_seed("");
914 WalletSeed::new(properties.network, &seed)
915 };
916
917 if properties.fingerprint != seed.fingerprint() {
918 bail!("incorrect mnemonic")
919 }
920
921 let chain_source = if let Some(ref url) = config.esplora_address {
922 ChainSourceSpec::Esplora {
923 url: url.clone(),
924 }
925 } else if let Some(ref url) = config.bitcoind_address {
926 let auth = if let Some(ref c) = config.bitcoind_cookiefile {
927 bitcoin_ext::rpc::Auth::CookieFile(c.clone())
928 } else {
929 bitcoin_ext::rpc::Auth::UserPass(
930 config.bitcoind_user.clone().context("need bitcoind auth config")?,
931 config.bitcoind_pass.clone().context("need bitcoind auth config")?,
932 )
933 };
934 ChainSourceSpec::Bitcoind { url: url.clone(), auth }
935 } else {
936 bail!("Need to either provide esplora or bitcoind info");
937 };
938
939 let chain_source_client = ChainSource::new(
940 chain_source, properties.network, config.fallback_fee_rate,
941 ).await?;
942 let chain = Arc::new(chain_source_client);
943
944 let server = match ServerConnection::connect(
945 &config.server_address, properties.network,
946 ).await {
947 Ok(s) => Some(s),
948 Err(e) => {
949 warn!("Ark server handshake failed: {}", e);
950 None
951 }
952 };
953 let server = parking_lot::RwLock::new(server);
954
955 let movements = Arc::new(MovementManager::new(db.clone()));
956 let exit = RwLock::new(Exit::new(db.clone(), chain.clone(), movements.clone()).await?);
957
958 Ok(Wallet {
959 config,
960 db,
961 seed,
962 exit,
963 movements,
964 server,
965 chain,
966 inflight_lightning_payments: Mutex::new(HashSet::new()),
967 round_state_lock_index: RoundStateLockIndex::new(),
968 })
969 }
970
971 pub async fn open_with_onchain(
974 mnemonic: &Mnemonic,
975 db: Arc<dyn BarkPersister>,
976 onchain: &dyn ExitUnilaterally,
977 cfg: Config,
978 ) -> anyhow::Result<Wallet> {
979 let mut wallet = Wallet::open(mnemonic, db, cfg).await?;
980 wallet.exit.get_mut().load(onchain).await?;
981 Ok(wallet)
982 }
983
984 pub fn config(&self) -> &Config {
986 &self.config
987 }
988
989 pub async fn properties(&self) -> anyhow::Result<WalletProperties> {
991 let properties = self.db.read_properties().await?.context("Wallet is not initialised")?;
992 Ok(properties)
993 }
994
995 pub fn fingerprint(&self) -> Fingerprint {
997 self.seed.fingerprint()
998 }
999
1000 async fn require_server(&self) -> anyhow::Result<(ServerConnection, ArkInfo)> {
1001 let conn = self.server.read().clone()
1002 .context("You should be connected to Ark server to perform this action")?;
1003 let ark_info = conn.ark_info().await?;
1004
1005 if let Some(stored_pubkey) = self.properties().await?.server_pubkey {
1007 if stored_pubkey != ark_info.server_pubkey {
1008 log_server_pubkey_changed_error(stored_pubkey, ark_info.server_pubkey);
1009 bail!("Server public key has changed. You should exit all your VTXOs!");
1010 }
1011 } else {
1012 self.db.set_server_pubkey(ark_info.server_pubkey).await?;
1014 info!("Stored server pubkey for existing wallet: {}", ark_info.server_pubkey);
1015 }
1016
1017 Ok((conn, ark_info))
1018 }
1019
1020 pub async fn refresh_server(&self) -> anyhow::Result<()> {
1021 let server = self.server.read().clone();
1022 let properties = self.properties().await?;
1023
1024 let srv = if let Some(srv) = server {
1025 srv.check_connection().await?;
1026 let ark_info = srv.ark_info().await?;
1027 ark_info.fees.validate().context("invalid fee schedule")?;
1028
1029 if let Some(stored_pubkey) = properties.server_pubkey {
1031 if stored_pubkey != ark_info.server_pubkey {
1032 log_server_pubkey_changed_error(stored_pubkey, ark_info.server_pubkey);
1033 bail!("Server public key has changed. You should exit all your VTXOs!");
1034 }
1035 } else {
1036 self.db.set_server_pubkey(ark_info.server_pubkey).await?;
1038 info!("Stored server pubkey for existing wallet: {}", ark_info.server_pubkey);
1039 }
1040
1041 srv
1042 } else {
1043 let srv_address = &self.config.server_address;
1044 let network = properties.network;
1045
1046 let conn = ServerConnection::connect(srv_address, network).await?;
1047 let ark_info = conn.ark_info().await?;
1048 ark_info.fees.validate().context("invalid fee schedule")?;
1049
1050 if let Some(stored_pubkey) = properties.server_pubkey {
1052 if stored_pubkey != ark_info.server_pubkey {
1053 log_server_pubkey_changed_error(stored_pubkey, ark_info.server_pubkey);
1054 bail!("Server public key has changed. You should exit all your VTXOs!");
1055 }
1056 } else {
1057 self.db.set_server_pubkey(ark_info.server_pubkey).await?;
1059 info!("Stored server pubkey for existing wallet: {}", ark_info.server_pubkey);
1060 }
1061
1062 conn
1063 };
1064
1065 let _ = self.server.write().insert(srv);
1066
1067 Ok(())
1068 }
1069
1070 pub async fn ark_info(&self) -> anyhow::Result<Option<ArkInfo>> {
1072 let server = self.server.read().clone();
1073 match server.as_ref() {
1074 Some(srv) => Ok(Some(srv.ark_info().await?)),
1075 _ => Ok(None),
1076 }
1077 }
1078
1079 pub async fn balance(&self) -> anyhow::Result<Balance> {
1083 let vtxos = self.vtxos().await?;
1084
1085 let spendable = {
1086 let mut v = vtxos.iter().collect();
1087 VtxoStateKind::Spendable.filter_vtxos(&mut v).await?;
1088 v.into_iter().map(|v| v.amount()).sum::<Amount>()
1089 };
1090
1091 let pending_lightning_send = self.pending_lightning_send_vtxos().await?.iter()
1092 .map(|v| v.amount())
1093 .sum::<Amount>();
1094
1095 let claimable_lightning_receive = self.claimable_lightning_receive_balance().await?;
1096
1097 let pending_board = self.pending_board_vtxos().await?.iter()
1098 .map(|v| v.amount())
1099 .sum::<Amount>();
1100
1101 let pending_in_round = self.pending_round_balance().await?;
1102
1103 let pending_exit = self.exit.try_read().ok().map(|e| e.pending_total());
1104
1105 Ok(Balance {
1106 spendable,
1107 pending_in_round,
1108 pending_lightning_send,
1109 claimable_lightning_receive,
1110 pending_exit,
1111 pending_board,
1112 })
1113 }
1114
1115 pub async fn validate_vtxo(&self, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
1117 let tx = self.chain.get_tx(&vtxo.chain_anchor().txid).await
1118 .context("could not fetch chain tx")?;
1119
1120 let tx = tx.with_context(|| {
1121 format!("vtxo chain anchor not found for vtxo: {}", vtxo.chain_anchor().txid)
1122 })?;
1123
1124 vtxo.validate(&tx)?;
1125
1126 Ok(())
1127 }
1128
1129 pub async fn import_vtxo(&self, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
1139 if self.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
1140 info!("VTXO {} already exists in wallet, skipping import", vtxo.id());
1141 return Ok(());
1142 }
1143
1144 self.validate_vtxo(vtxo).await.context("VTXO validation failed")?;
1145
1146 if self.find_signable_clause(vtxo).await.is_none() {
1147 bail!("VTXO {} is not owned by this wallet (no signable clause found)", vtxo.id());
1148 }
1149
1150 let current_height = self.chain.tip().await?;
1151 if vtxo.expiry_height() <= current_height {
1152 bail!("Vtxo {} has expired", vtxo.id());
1153 }
1154
1155 self.store_spendable_vtxos([vtxo]).await.context("failed to store imported VTXO")?;
1156
1157 info!("Successfully imported VTXO {}", vtxo.id());
1158 Ok(())
1159 }
1160
1161 pub async fn get_vtxo_by_id(&self, vtxo_id: VtxoId) -> anyhow::Result<WalletVtxo> {
1163 let vtxo = self.db.get_wallet_vtxo(vtxo_id).await
1164 .with_context(|| format!("Error when querying vtxo {} in database", vtxo_id))?
1165 .with_context(|| format!("The VTXO with id {} cannot be found", vtxo_id))?;
1166 Ok(vtxo)
1167 }
1168
1169 #[deprecated(since="0.1.0-beta.5", note = "Use Wallet::history instead")]
1171 pub async fn movements(&self) -> anyhow::Result<Vec<Movement>> {
1172 self.history().await
1173 }
1174
1175 pub async fn history(&self) -> anyhow::Result<Vec<Movement>> {
1177 Ok(self.db.get_all_movements().await?)
1178 }
1179
1180 pub async fn all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1182 Ok(self.db.get_all_vtxos().await?)
1183 }
1184
1185 pub async fn vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1187 Ok(self.db.get_vtxos_by_state(&VtxoStateKind::UNSPENT_STATES).await?)
1188 }
1189
1190 pub async fn vtxos_with(&self, filter: &impl FilterVtxos) -> anyhow::Result<Vec<WalletVtxo>> {
1192 let mut vtxos = self.vtxos().await?;
1193 filter.filter_vtxos(&mut vtxos).await.context("error filtering vtxos")?;
1194 Ok(vtxos)
1195 }
1196
1197 pub async fn spendable_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1199 Ok(self.vtxos_with(&VtxoStateKind::Spendable).await?)
1200 }
1201
1202 pub async fn spendable_vtxos_with(
1204 &self,
1205 filter: &impl FilterVtxos,
1206 ) -> anyhow::Result<Vec<WalletVtxo>> {
1207 let mut vtxos = self.spendable_vtxos().await?;
1208 filter.filter_vtxos(&mut vtxos).await.context("error filtering vtxos")?;
1209 Ok(vtxos)
1210 }
1211
1212 pub async fn get_expiring_vtxos(
1214 &self,
1215 threshold: BlockHeight,
1216 ) -> anyhow::Result<Vec<WalletVtxo>> {
1217 let expiry = self.chain.tip().await? + threshold;
1218 let filter = VtxoFilter::new(&self).expires_before(expiry);
1219 Ok(self.spendable_vtxos_with(&filter).await?)
1220 }
1221
1222 pub async fn sync_pending_offboards(&self) -> anyhow::Result<()> {
1228 let pending_offboards: Vec<PendingOffboard> = self.db.get_pending_offboards().await?;
1229
1230 if pending_offboards.is_empty() {
1231 return Ok(());
1232 }
1233
1234 let current_height = self.chain.tip().await?;
1235 let required_confs = self.config.offboard_required_confirmations;
1236
1237 trace!("Checking {} pending offboard transaction(s)", pending_offboards.len());
1238
1239 for pending in pending_offboards {
1240 let status = self.chain.tx_status(pending.offboard_txid).await;
1241
1242 match status {
1243 Ok(TxStatus::Confirmed(block_ref)) => {
1244 let confs = current_height - (block_ref.height - 1);
1245 if confs < required_confs as BlockHeight {
1246 trace!(
1247 "Offboard tx {} has {}/{} confirmations, waiting...",
1248 pending.offboard_txid, confs, required_confs,
1249 );
1250 continue;
1251 }
1252
1253 info!(
1254 "Offboard tx {} confirmed, finalizing movement {}",
1255 pending.offboard_txid, pending.movement_id,
1256 );
1257
1258 for vtxo_id in &pending.vtxo_ids {
1260 if let Err(e) = self.db.update_vtxo_state_checked(
1261 *vtxo_id,
1262 VtxoState::Spent,
1263 &[VtxoStateKind::Locked],
1264 ).await {
1265 warn!("Failed to mark vtxo {} as spent: {:#}", vtxo_id, e);
1266 }
1267 }
1268
1269 if let Err(e) = self.movements.finish_movement(
1271 pending.movement_id,
1272 MovementStatus::Successful,
1273 ).await {
1274 warn!("Failed to finish movement {}: {:#}", pending.movement_id, e);
1275 }
1276
1277 self.db.remove_pending_offboard(pending.movement_id).await?;
1278 }
1279 Ok(TxStatus::Mempool) => {
1280 if required_confs == 0 {
1281 info!(
1282 "Offboard tx {} in mempool with 0 required confirmations, \
1283 finalizing movement {}",
1284 pending.offboard_txid, pending.movement_id,
1285 );
1286
1287 for vtxo_id in &pending.vtxo_ids {
1289 if let Err(e) = self.db.update_vtxo_state_checked(
1290 *vtxo_id,
1291 VtxoState::Spent,
1292 &[VtxoStateKind::Locked],
1293 ).await {
1294 warn!("Failed to mark vtxo {} as spent: {:#}", vtxo_id, e);
1295 }
1296 }
1297
1298 if let Err(e) = self.movements.finish_movement(
1300 pending.movement_id,
1301 MovementStatus::Successful,
1302 ).await {
1303 warn!("Failed to finish movement {}: {:#}", pending.movement_id, e);
1304 }
1305
1306 self.db.remove_pending_offboard(pending.movement_id).await?;
1307 } else {
1308 trace!(
1309 "Offboard tx {} still in mempool, waiting...",
1310 pending.offboard_txid,
1311 );
1312 }
1313 }
1314 Ok(TxStatus::NotFound) => {
1315 let age = chrono::Local::now() - pending.created_at;
1319 if age < chrono::Duration::hours(1) {
1320 trace!(
1321 "Offboard tx {} not found, but only {} minutes old — waiting...",
1322 pending.offboard_txid, age.num_minutes(),
1323 );
1324 continue;
1325 }
1326
1327 warn!(
1328 "Offboard tx {} not found after {} minutes, canceling movement {}",
1329 pending.offboard_txid, age.num_minutes(), pending.movement_id,
1330 );
1331
1332 for vtxo_id in &pending.vtxo_ids {
1334 if let Err(e) = self.db.update_vtxo_state_checked(
1335 *vtxo_id,
1336 VtxoState::Spendable,
1337 &[VtxoStateKind::Locked],
1338 ).await {
1339 warn!("Failed to restore vtxo {} to spendable: {:#}", vtxo_id, e);
1340 }
1341 }
1342
1343 if let Err(e) = self.movements.finish_movement(
1345 pending.movement_id,
1346 MovementStatus::Failed,
1347 ).await {
1348 warn!("Failed to fail movement {}: {:#}", pending.movement_id, e);
1349 }
1350
1351 self.db.remove_pending_offboard(pending.movement_id).await?;
1352 }
1353 Err(e) => {
1354 warn!(
1355 "Failed to check status of offboard tx {}: {:#}",
1356 pending.offboard_txid, e,
1357 );
1358 }
1359 }
1360 }
1361
1362 Ok(())
1363 }
1364
1365 pub async fn maintenance(&self) -> anyhow::Result<()> {
1371 info!("Starting wallet maintenance in interactive mode");
1372 self.sync().await;
1373
1374 let rounds = self.progress_pending_rounds(None).await;
1375 if let Err(e) = rounds.as_ref() {
1376 warn!("Error progressing pending rounds: {:#}", e);
1377 }
1378 let refresh = self.maintenance_refresh().await;
1379 if let Err(e) = refresh.as_ref() {
1380 warn!("Error refreshing VTXOs: {:#}", e);
1381 }
1382 if rounds.is_err() || refresh.is_err() {
1383 bail!("Maintenance encountered errors.\nprogress_rounds: {:#?}\nrefresh: {:#?}", rounds, refresh);
1384 }
1385 Ok(())
1386 }
1387
1388 pub async fn maintenance_delegated(&self) -> anyhow::Result<()> {
1394 info!("Starting wallet maintenance in delegated mode");
1395 self.sync().await;
1396 let rounds = self.progress_pending_rounds(None).await;
1397 if let Err(e) = rounds.as_ref() {
1398 warn!("Error progressing pending rounds: {:#}", e);
1399 }
1400 let refresh = self.maybe_schedule_maintenance_refresh_delegated().await;
1401 if let Err(e) = refresh.as_ref() {
1402 warn!("Error refreshing VTXOs: {:#}", e);
1403 }
1404 if rounds.is_err() || refresh.is_err() {
1405 bail!("Delegated maintenance encountered errors.\nprogress_rounds: {:#?}\nrefresh: {:#?}", rounds, refresh);
1406 }
1407 Ok(())
1408 }
1409
1410 pub async fn maintenance_with_onchain<W: PreparePsbt + SignPsbt + ExitUnilaterally>(
1418 &self,
1419 onchain: &mut W,
1420 ) -> anyhow::Result<()> {
1421 info!("Starting wallet maintenance in interactive mode with onchain wallet");
1422
1423 let maintenance = self.maintenance().await;
1425
1426 let exit_sync = self.sync_exits(onchain).await;
1428 if let Err(e) = exit_sync.as_ref() {
1429 warn!("Error syncing exits: {:#}", e);
1430 }
1431 let exit_progress = self.exit.write().await.progress_exits(&self, onchain, None).await;
1432 if let Err(e) = exit_progress.as_ref() {
1433 warn!("Error progressing exits: {:#}", e);
1434 }
1435 if maintenance.is_err() || exit_sync.is_err() || exit_progress.is_err() {
1436 bail!("Maintenance encountered errors.\nmaintenance: {:#?}\nexit_sync: {:#?}\nexit_progress: {:#?}", maintenance, exit_sync, exit_progress);
1437 }
1438 Ok(())
1439 }
1440
1441 pub async fn maintenance_with_onchain_delegated<W: PreparePsbt + SignPsbt + ExitUnilaterally>(
1448 &self,
1449 onchain: &mut W,
1450 ) -> anyhow::Result<()> {
1451 info!("Starting wallet maintenance in delegated mode with onchain wallet");
1452
1453 let maintenance = self.maintenance_delegated().await;
1455
1456 let exit_sync = self.sync_exits(onchain).await;
1458 if let Err(e) = exit_sync.as_ref() {
1459 warn!("Error syncing exits: {:#}", e);
1460 }
1461 let exit_progress = self.exit.write().await.progress_exits(&self, onchain, None).await;
1462 if let Err(e) = exit_progress.as_ref() {
1463 warn!("Error progressing exits: {:#}", e);
1464 }
1465 if maintenance.is_err() || exit_sync.is_err() || exit_progress.is_err() {
1466 bail!("Delegated maintenance encountered errors.\nmaintenance: {:#?}\nexit_sync: {:#?}\nexit_progress: {:#?}", maintenance, exit_sync, exit_progress);
1467 }
1468 Ok(())
1469 }
1470
1471 pub async fn maybe_schedule_maintenance_refresh(&self) -> anyhow::Result<Option<RoundStateId>> {
1479 let vtxos = self.get_vtxos_to_refresh().await?;
1480 if vtxos.len() == 0 {
1481 return Ok(None);
1482 }
1483
1484 let participation = match self.build_refresh_participation(vtxos).await? {
1485 Some(participation) => participation,
1486 None => return Ok(None),
1487 };
1488
1489 info!("Scheduling maintenance refresh ({} vtxos)", participation.inputs.len());
1490 let state = self.join_next_round(participation, Some(RoundMovement::Refresh)).await?;
1491 Ok(Some(state.id()))
1492 }
1493
1494 pub async fn maybe_schedule_maintenance_refresh_delegated(
1502 &self,
1503 ) -> anyhow::Result<Option<RoundStateId>> {
1504 let vtxos = self.get_vtxos_to_refresh().await?;
1505 if vtxos.len() == 0 {
1506 return Ok(None);
1507 }
1508
1509 let participation = match self.build_refresh_participation(vtxos).await? {
1510 Some(participation) => participation,
1511 None => return Ok(None),
1512 };
1513
1514 info!("Scheduling delegated maintenance refresh ({} vtxos)", participation.inputs.len());
1515 let state = self.join_next_round_delegated(participation, Some(RoundMovement::Refresh)).await?;
1516 Ok(Some(state.id()))
1517 }
1518
1519 pub async fn maintenance_refresh(&self) -> anyhow::Result<Option<RoundStatus>> {
1527 let vtxos = self.get_vtxos_to_refresh().await?;
1528 if vtxos.len() == 0 {
1529 return Ok(None);
1530 }
1531
1532 info!("Performing maintenance refresh");
1533 self.refresh_vtxos(vtxos).await
1534 }
1535
1536 pub async fn sync(&self) {
1542 futures::join!(
1543 async {
1544 if let Err(e) = self.chain.update_fee_rates(self.config.fallback_fee_rate).await {
1547 warn!("Error updating fee rates: {:#}", e);
1548 }
1549 },
1550 async {
1551 if let Err(e) = self.sync_mailbox().await {
1552 warn!("Error in mailbox sync: {:#}", e);
1553 }
1554 },
1555 async {
1556 if let Err(e) = self.sync_pending_rounds().await {
1557 warn!("Error while trying to progress rounds awaiting confirmations: {:#}", e);
1558 }
1559 },
1560 async {
1561 if let Err(e) = self.sync_pending_lightning_send_vtxos().await {
1562 warn!("Error syncing pending lightning payments: {:#}", e);
1563 }
1564 },
1565 async {
1566 if let Err(e) = self.try_claim_all_lightning_receives(false).await {
1567 warn!("Error claiming pending lightning receives: {:#}", e);
1568 }
1569 },
1570 async {
1571 if let Err(e) = self.sync_pending_boards().await {
1572 warn!("Error syncing pending boards: {:#}", e);
1573 }
1574 },
1575 async {
1576 if let Err(e) = self.sync_pending_offboards().await {
1577 warn!("Error syncing pending offboards: {:#}", e);
1578 }
1579 }
1580 );
1581 }
1582
1583 pub async fn sync_exits(
1589 &self,
1590 onchain: &mut dyn ExitUnilaterally,
1591 ) -> anyhow::Result<()> {
1592 self.exit.write().await.sync(&self, onchain).await?;
1593 Ok(())
1594 }
1595
1596 pub async fn dangerous_drop_vtxo(&self, vtxo_id: VtxoId) -> anyhow::Result<()> {
1599 warn!("Drop vtxo {} from the database", vtxo_id);
1600 self.db.remove_vtxo(vtxo_id).await?;
1601 Ok(())
1602 }
1603
1604 pub async fn dangerous_drop_all_vtxos(&self) -> anyhow::Result<()> {
1607 warn!("Dropping all vtxos from the db...");
1608 for vtxo in self.vtxos().await? {
1609 self.db.remove_vtxo(vtxo.id()).await?;
1610 }
1611
1612 self.exit.write().await.dangerous_clear_exit().await?;
1613 Ok(())
1614 }
1615
1616 async fn has_counterparty_risk(&self, vtxo: &Vtxo<Full>) -> anyhow::Result<bool> {
1621 for past_pks in vtxo.past_arkoor_pubkeys() {
1622 let mut owns_any = false;
1623 for past_pk in past_pks {
1624 if self.db.get_public_key_idx(&past_pk).await?.is_some() {
1625 owns_any = true;
1626 break;
1627 }
1628 }
1629 if !owns_any {
1630 return Ok(true);
1631 }
1632 }
1633
1634 let my_clause = self.find_signable_clause(vtxo).await;
1635 Ok(!my_clause.is_some())
1636 }
1637
1638 async fn add_should_refresh_vtxos(
1644 &self,
1645 participation: &mut RoundParticipation,
1646 ) -> anyhow::Result<()> {
1647 let tip = self.chain.tip().await?;
1650 let mut vtxos_to_refresh = self.spendable_vtxos_with(
1651 &RefreshStrategy::should_refresh(self, tip, self.chain.fee_rates().await.fast),
1652 ).await?;
1653 if vtxos_to_refresh.is_empty() {
1654 return Ok(());
1655 }
1656
1657 let excluded_ids = participation.inputs.iter().map(|v| v.vtxo_id())
1658 .collect::<HashSet<_>>();
1659 let mut total_amount = Amount::ZERO;
1660 for i in (0..vtxos_to_refresh.len()).rev() {
1661 let vtxo = &vtxos_to_refresh[i];
1662 if excluded_ids.contains(&vtxo.id()) {
1663 vtxos_to_refresh.swap_remove(i);
1664 continue;
1665 }
1666 total_amount += vtxo.amount();
1667 }
1668 if vtxos_to_refresh.is_empty() {
1669 return Ok(());
1671 }
1672
1673 let (_, ark_info) = self.require_server().await?;
1676 let fee = ark_info.fees.refresh.calculate_no_base_fee(
1677 vtxos_to_refresh.iter().map(|wv| VtxoFeeInfo::from_vtxo_and_tip(&wv.vtxo, tip)),
1678 ).context("fee overflowed")?;
1679
1680 let output_amount = match validate_and_subtract_fee_min_dust(total_amount, fee) {
1682 Ok(amount) => amount,
1683 Err(e) => {
1684 trace!("Cannot add should-refresh VTXOs: {}", e);
1685 return Ok(());
1686 },
1687 };
1688 info!(
1689 "Adding {} extra VTXOs to round participation total = {}, fee = {}, output = {}",
1690 vtxos_to_refresh.len(), total_amount, fee, output_amount,
1691 );
1692 let (user_keypair, _) = self.derive_store_next_keypair().await?;
1693 let req = VtxoRequest {
1694 policy: VtxoPolicy::new_pubkey(user_keypair.public_key()),
1695 amount: output_amount,
1696 };
1697 participation.inputs.reserve(vtxos_to_refresh.len());
1698 participation.inputs.extend(vtxos_to_refresh.into_iter().map(|wv| wv.vtxo));
1699 participation.outputs.push(req);
1700
1701 Ok(())
1702 }
1703
1704 pub async fn build_refresh_participation<V: VtxoRef>(
1705 &self,
1706 vtxos: impl IntoIterator<Item = V>,
1707 ) -> anyhow::Result<Option<RoundParticipation>> {
1708 let (vtxos, total_amount) = {
1709 let iter = vtxos.into_iter();
1710 let size_hint = iter.size_hint();
1711 let mut vtxos = Vec::<Vtxo<Full>>::with_capacity(size_hint.1.unwrap_or(size_hint.0));
1712 let mut amount = Amount::ZERO;
1713 for vref in iter {
1714 let id = vref.vtxo_id();
1719 if vtxos.iter().any(|v| v.id() == id) {
1720 bail!("duplicate VTXO id: {}", id);
1721 }
1722 let vtxo = if let Some(vtxo) = vref.into_full_vtxo() {
1723 vtxo
1724 } else {
1725 self.get_vtxo_by_id(id).await
1726 .with_context(|| format!("vtxo with id {} not found", id))?.vtxo
1727 };
1728 amount += vtxo.amount();
1729 vtxos.push(vtxo);
1730 }
1731 (vtxos, amount)
1732 };
1733
1734 if vtxos.is_empty() {
1735 info!("Skipping refresh since no VTXOs are provided.");
1736 return Ok(None);
1737 }
1738 ensure!(total_amount >= P2TR_DUST,
1739 "vtxo amount must be at least {} to participate in a round",
1740 P2TR_DUST,
1741 );
1742
1743 let (_, ark_info) = self.require_server().await?;
1745 let current_height = self.chain.tip().await?;
1746 let vtxo_fee_infos = vtxos.iter()
1747 .map(|v| VtxoFeeInfo::from_vtxo_and_tip(v, current_height));
1748 let fee = ark_info.fees.refresh.calculate(vtxo_fee_infos).context("fee overflowed")?;
1749 let output_amount = validate_and_subtract_fee_min_dust(total_amount, fee)?;
1750
1751 info!("Refreshing {} VTXOs (total amount = {}, fee = {}, output = {}).",
1752 vtxos.len(), total_amount, fee, output_amount,
1753 );
1754 let (user_keypair, _) = self.derive_store_next_keypair().await?;
1755 let req = VtxoRequest {
1756 policy: VtxoPolicy::Pubkey(PubkeyVtxoPolicy { user_pubkey: user_keypair.public_key() }),
1757 amount: output_amount,
1758 };
1759
1760 Ok(Some(RoundParticipation {
1761 inputs: vtxos,
1762 outputs: vec![req],
1763 }))
1764 }
1765
1766 pub async fn refresh_vtxos<V: VtxoRef>(
1771 &self,
1772 vtxos: impl IntoIterator<Item = V>,
1773 ) -> anyhow::Result<Option<RoundStatus>> {
1774 let mut participation = match self.build_refresh_participation(vtxos).await? {
1775 Some(participation) => participation,
1776 None => return Ok(None),
1777 };
1778
1779 if let Err(e) = self.add_should_refresh_vtxos(&mut participation).await {
1780 warn!("Error trying to add additional VTXOs that should be refreshed: {:#}", e);
1781 }
1782
1783 Ok(Some(self.participate_round(participation, Some(RoundMovement::Refresh)).await?))
1784 }
1785
1786 pub async fn refresh_vtxos_delegated<V: VtxoRef>(
1792 &self,
1793 vtxos: impl IntoIterator<Item = V>,
1794 ) -> anyhow::Result<Option<StoredRoundState<Unlocked>>> {
1795 let mut part = match self.build_refresh_participation(vtxos).await? {
1796 Some(participation) => participation,
1797 None => return Ok(None),
1798 };
1799
1800 if let Err(e) = self.add_should_refresh_vtxos(&mut part).await {
1801 warn!("Error trying to add additional VTXOs that should be refreshed: {:#}", e);
1802 }
1803
1804 Ok(Some(self.join_next_round_delegated(part, Some(RoundMovement::Refresh)).await?))
1805 }
1806
1807 pub async fn get_vtxos_to_refresh(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1810 let vtxos = self.spendable_vtxos_with(&RefreshStrategy::should_refresh_if_must(
1811 self,
1812 self.chain.tip().await?,
1813 self.chain.fee_rates().await.fast,
1814 )).await?;
1815 Ok(vtxos)
1816 }
1817
1818 pub async fn get_first_expiring_vtxo_blockheight(
1820 &self,
1821 ) -> anyhow::Result<Option<BlockHeight>> {
1822 Ok(self.spendable_vtxos().await?.iter().map(|v| v.expiry_height()).min())
1823 }
1824
1825 pub async fn get_next_required_refresh_blockheight(
1828 &self,
1829 ) -> anyhow::Result<Option<BlockHeight>> {
1830 let first_expiry = self.get_first_expiring_vtxo_blockheight().await?;
1831 Ok(first_expiry.map(|h| h.saturating_sub(self.config.vtxo_refresh_expiry_threshold)))
1832 }
1833
1834 async fn select_vtxos_to_cover(
1840 &self,
1841 amount: Amount,
1842 ) -> anyhow::Result<Vec<WalletVtxo>> {
1843 let mut vtxos = self.spendable_vtxos().await?;
1844 vtxos.sort_by_key(|v| v.expiry_height());
1845
1846 let mut result = Vec::new();
1848 let mut total_amount = Amount::ZERO;
1849 for input in vtxos {
1850 total_amount += input.amount();
1851 result.push(input);
1852
1853 if total_amount >= amount {
1854 return Ok(result)
1855 }
1856 }
1857
1858 bail!("Insufficient money available. Needed {} but {} is available",
1859 amount, total_amount,
1860 );
1861 }
1862
1863 async fn select_vtxos_to_cover_with_fee<F>(
1869 &self,
1870 amount: Amount,
1871 calc_fee: F,
1872 ) -> anyhow::Result<(Vec<WalletVtxo>, Amount)>
1873 where
1874 F: for<'a> Fn(
1875 Amount, std::iter::Copied<std::slice::Iter<'a, VtxoFeeInfo>>,
1876 ) -> anyhow::Result<Amount>,
1877 {
1878 let tip = self.chain.tip().await?;
1879
1880 const MAX_ITERATIONS: usize = 100;
1883 let mut fee = Amount::ZERO;
1884 let mut fee_info = Vec::new();
1885 for _ in 0..MAX_ITERATIONS {
1886 let required = amount.checked_add(fee)
1887 .context("Amount + fee overflow")?;
1888
1889 let vtxos = self.select_vtxos_to_cover(required).await
1890 .context("Could not find enough suitable VTXOs to cover payment + fees")?;
1891
1892 fee_info.reserve(vtxos.len());
1893 let mut vtxo_amount = Amount::ZERO;
1894 for vtxo in &vtxos {
1895 vtxo_amount += vtxo.amount();
1896 fee_info.push(VtxoFeeInfo::from_vtxo_and_tip(vtxo, tip));
1897 }
1898
1899 fee = calc_fee(amount, fee_info.iter().copied())?;
1900 if amount + fee <= vtxo_amount {
1901 trace!("Selected vtxos to cover amount + fee: amount = {}, fee = {}, total inputs = {}",
1902 amount, fee, vtxo_amount,
1903 );
1904 return Ok((vtxos, fee));
1905 }
1906 trace!("VTXO sum of {} did not exceed amount {} and fee {}, iterating again",
1907 vtxo_amount, amount, fee,
1908 );
1909 fee_info.clear();
1910 }
1911 bail!("Fee calculation did not converge after maximum iterations")
1912 }
1913
1914 pub async fn run_daemon(
1920 self: &Arc<Self>,
1921 onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
1922 ) -> anyhow::Result<DaemonHandle> {
1923 Ok(crate::daemon::start_daemon(self.clone(), onchain))
1926 }
1927
1928 pub async fn register_vtxos_with_server(
1932 &self,
1933 vtxos: &[impl AsRef<Vtxo<Full>>],
1934 ) -> anyhow::Result<()> {
1935 if vtxos.is_empty() {
1936 return Ok(());
1937 }
1938
1939 let (mut srv, _) = self.require_server().await?;
1940 srv.client.register_vtxos(protos::RegisterVtxosRequest {
1941 vtxos: vtxos.iter().map(|v| v.as_ref().serialize()).collect(),
1942 }).await.context("failed to register vtxos")?;
1943
1944 Ok(())
1945 }
1946}