1use std::{cmp, iter};
6use std::borrow::Cow;
7use std::convert::Infallible;
8use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10use std::collections::{HashMap, HashSet};
11
12use anyhow::Context;
13use bdk_esplora::esplora_client::Amount;
14use bip39::rand;
15use bitcoin::consensus::encode::{serialize_hex, deserialize};
16use bitcoin::key::Keypair;
17use bitcoin::secp256k1::{schnorr, PublicKey};
18use bitcoin::{Address, Network, OutPoint, Transaction, Txid};
19use bitcoin::consensus::Params;
20use bitcoin::hashes::Hash;
21use futures::future::try_join_all;
22use futures::{Stream, StreamExt};
23use log::{debug, error, info, trace, warn};
24
25use ark::{OffboardRequest, ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoId, VtxoRequest};
26use ark::connectors::ConnectorChain;
27use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
28use ark::rounds::{
29 RoundAttempt, RoundEvent, RoundFinished, RoundId, RoundSeq, MIN_ROUND_TX_OUTPUTS, ROUND_TX_CONNECTOR_VOUT, ROUND_TX_VTXO_TREE_VOUT
30};
31use ark::tree::signed::{SignedVtxoTreeSpec, VtxoTreeSpec};
32use bitcoin_ext::{TxStatus, P2TR_DUST};
33use bitcoin_ext::rpc::RpcApi;
34use server_rpc::protos;
35
36use crate::{SECP, Wallet};
37use crate::movement::{MovementDestination, MovementId, MovementStatus};
38use crate::movement::update::MovementUpdate;
39use crate::onchain::{ChainSource, ChainSourceClient};
40use crate::persist::{RoundStateId, StoredRoundState};
41use crate::subsystem::{BarkSubsystem, RoundMovement};
42
43const BLOCK_TIME: Duration = Duration::from_secs(10 * 60);
45
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct RoundParticipation {
50 #[serde(with = "ark::encode::serde::vec")]
51 pub inputs: Vec<Vtxo>,
52 pub outputs: Vec<VtxoRequest>,
55 pub offboards: Vec<OffboardRequest>,
56}
57
58impl RoundParticipation {
59 pub fn to_movement_update(&self, network: Network) -> anyhow::Result<MovementUpdate> {
60 let params = Params::from(network);
61 let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
62 let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
63 let offboard_amount = self.offboards.iter().map(|r| r.amount).sum::<Amount>();
64 let fee = input_amount - output_amount - offboard_amount;
65 let intended = -offboard_amount.to_signed()?;
66 let mut sent_to = Vec::with_capacity(self.offboards.len());
67 for o in &self.offboards {
68 let address = Address::from_script(&o.script_pubkey, ¶ms)?;
69 sent_to.push(MovementDestination::new(address.to_string(), o.amount));
70 }
71 Ok(MovementUpdate::new()
72 .consumed_vtxos(&self.inputs)
73 .intended_balance(intended)
74 .effective_balance(intended - fee.to_signed()?)
75 .fee(fee)
76 .sent_to(sent_to)
77 )
78 }
79}
80
81#[derive(Debug, Clone)]
82pub enum RoundStatus {
83 Confirmed {
85 funding_txid: Txid,
86 },
87 Unconfirmed {
89 funding_txid: Txid,
90 },
91 Pending {
93 unsigned_funding_txids: Vec<Txid>,
94 },
95 Failed {
97 error: String,
98 },
99 Canceled,
100}
101
102impl RoundStatus {
103 pub fn is_final(&self) -> bool {
105 match self {
106 Self::Confirmed { .. } => true,
107 Self::Unconfirmed { .. } => false,
108 Self::Pending { .. } => false,
109 Self::Failed { .. } => true,
110 Self::Canceled => true,
111 }
112 }
113
114 pub fn is_success(&self) -> bool {
116 match self {
117 Self::Confirmed { .. } => true,
118 Self::Unconfirmed { .. } => true,
119 Self::Pending { .. } => false,
120 Self::Failed { .. } => false,
121 Self::Canceled => false,
122 }
123 }
124}
125
126pub struct RoundState {
136 pub(crate) participation: RoundParticipation,
138
139 pub(crate) flow: RoundFlowState,
141
142 pub(crate) unconfirmed_rounds: Vec<UnconfirmedRound>,
144
145 pub(crate) movement_id: Option<MovementId>,
147}
148
149impl RoundState {
150 fn new(participation: RoundParticipation, movement_id: Option<MovementId>) -> Self {
151 Self {
152 participation,
153 movement_id,
154 flow: RoundFlowState::WaitingToStart,
155 unconfirmed_rounds: Vec::new(),
156 }
157 }
158
159 pub fn participation(&self) -> &RoundParticipation {
161 &self.participation
162 }
163
164 pub fn flow(&self) -> &RoundFlowState {
165 &self.flow
166 }
167
168 pub fn unconfirmed_rounds(&self) -> &[UnconfirmedRound] {
169 &self.unconfirmed_rounds
170 }
171
172 pub fn ongoing_participation(&self) -> bool {
174 match self.flow {
175 RoundFlowState::WaitingToStart => true,
176 RoundFlowState::Ongoing { .. } => true,
177 RoundFlowState::Success => false,
178 RoundFlowState::Failed { .. } => false,
179 RoundFlowState::Canceled => false,
180 }
181 }
182
183 pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
186 let ret = match self.flow {
187 RoundFlowState::Canceled => true,
188 RoundFlowState::Success => false,
189 RoundFlowState::WaitingToStart => {
190 self.flow = RoundFlowState::Canceled;
191 true
192 },
193 RoundFlowState::Failed { .. } => self.unconfirmed_rounds().is_empty(),
194 RoundFlowState::Ongoing { .. } => {
195 if self.unconfirmed_rounds().is_empty() {
196 self.flow = RoundFlowState::Canceled;
197 true
198 } else {
199 false
200 }
201 },
202 };
203 if ret {
204 persist_round_failure(wallet, &self.participation, self.movement_id).await
205 .context("failed to persist round failure for cancelation")?;
206 }
207 Ok(ret)
208 }
209
210 async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
211 match start_attempt(wallet, &self.participation, attempt).await {
212 Ok(state) => {
213 self.flow = RoundFlowState::Ongoing {
214 round_seq: attempt.round_seq,
215 attempt_seq: attempt.attempt_seq,
216 state: state,
217 };
218 },
219 Err(e) => {
220 self.flow = RoundFlowState::Failed {
221 error: format!("{:#}", e),
222 };
223 },
224 }
225 }
226
227 pub async fn process_event(
229 &mut self,
230 wallet: &Wallet,
231 event: &RoundEvent,
232 ) -> bool {
233 let _: Infallible = match self.flow {
234 RoundFlowState::WaitingToStart => {
235 if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
236 trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
237 self.try_start_attempt(wallet, e).await;
238 return true;
239 } else {
240 trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
241 event.kind(), event.round_seq(), event.attempt_seq(),
242 );
243 return false;
244 }
245 },
246 RoundFlowState::Ongoing { round_seq, attempt_seq, ref mut state } => {
247 if event.round_seq() > round_seq {
250 self.flow = RoundFlowState::Failed {
253 error: format!("round {} started while we were on {}",
254 event.round_seq(), round_seq,
255 ),
256 };
257 return true;
258 }
259
260 if event.attempt_seq() < attempt_seq {
261 trace!("ignoring replayed message from old attempt");
262 return false;
263 }
264
265 if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
266 trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
267 self.try_start_attempt(wallet, e).await;
268 return true;
269 }
270 trace!("Processing event {} for round attempt {}:{} in state {}",
271 event.kind(), round_seq, attempt_seq, state.kind(),
272 );
273
274 let mut updated = false;
275 match progress_attempt(state, wallet, &self.participation, event).await {
276 AttemptProgressResult::NotUpdated => {},
277 AttemptProgressResult::Updated { new_state, new_unconfirmed_round } => {
278 if let Some(r) = new_unconfirmed_round {
279 self.unconfirmed_rounds.push(r);
280 }
281 *state = new_state;
282 updated = true;
283 },
284 AttemptProgressResult::Failed(e) => {
285 self.flow = RoundFlowState::Failed { error: format!("{:#}", e) };
286 updated = true;
287 },
288 AttemptProgressResult::Finished { signed_round_tx, vtxos } => {
289 assert!(!self.unconfirmed_rounds.is_empty());
290
291 let txid = signed_round_tx.compute_txid();
293 if let Some(round) = self.unconfirmed_rounds.iter_mut()
294 .find(|r| r.funding_txid() == txid)
295 {
296 round.funding_tx = signed_round_tx;
297
298 if let Err(e) = persist_round_success(
299 wallet,
300 &self.participation,
301 self.movement_id,
302 &vtxos,
303 &round.funding_tx,
304 ).await {
305 error!("Error while storing succesful round: {:#}", e);
306 }
308
309 self.flow = RoundFlowState::Success;
310 } else {
311 self.flow = RoundFlowState::Failed {
312 error: format!("server sent signed round tx {}, \
313 but we don't have a state for that", txid,),
314 };
315 };
316 updated = true;
317 },
318 }
319 return updated;
320 },
321 RoundFlowState::Success { .. }
322 | RoundFlowState::Failed { .. }
323 | RoundFlowState::Canceled => return false,
324 };
325 }
326
327 pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
331 let mut confirmed_funding_txid = None;
332 let mut idx = 0;
333 while idx < self.unconfirmed_rounds.len() {
334 let round = self.unconfirmed_rounds.get_mut(idx).unwrap();
335
336 let was_signed = round.is_tx_signed();
337 let res = round.sync(wallet).await;
338
339 if !was_signed && round.is_tx_signed() {
344 if let Err(e) = persist_round_success(
345 wallet,
346 &self.participation,
347 self.movement_id,
348 &round.new_vtxos,
349 &round.funding_tx,
350 ).await {
351 error!("Error storing state after seeing signed funding tx: {:#?}", e);
352 idx += 1;
353 continue;
354 }
355 }
356
357 let _: Infallible = match res {
362 Ok(UnconfirmedRoundStatus::Confirmed) => {
363 confirmed_funding_txid = Some(round.funding_txid());
364 idx += 1;
368 continue;
369 },
370 Ok(UnconfirmedRoundStatus::DoubleSpent { double_spender }) => {
371 debug!("Round with round txid {} got double spent by tx {:?}",
372 round.funding_tx.compute_txid(), double_spender,
373 );
374 self.unconfirmed_rounds.swap_remove(idx);
375 continue; },
377 Ok(UnconfirmedRoundStatus::Unconfirmed) => {
378 idx += 1;
379 continue;
380 },
381 Err(e) => {
382 warn!("Error syncing status of unconfirmed round: {:#}", e);
383 trace!("Error syncing status of unconfirmed round: err={:#}; state={:?}",
384 e, round,
385 );
386 idx += 1;
387 continue;
388 }
389 };
390 }
391
392 let status = if let Some(funding_txid) = confirmed_funding_txid {
393 if let Some(movement_id) = self.movement_id {
394 update_funding_txid(funding_txid, movement_id, wallet).await?;
395 wallet.movements.finish_movement(movement_id, MovementStatus::Finished).await?;
396 }
397
398 RoundStatus::Confirmed { funding_txid }
399 } else if self.unconfirmed_rounds.is_empty() {
400 match self.flow {
401 RoundFlowState::WaitingToStart | RoundFlowState::Ongoing { .. } => {
402 RoundStatus::Pending { unsigned_funding_txids: vec![] }
403 }
404 RoundFlowState::Success => {
405 persist_round_failure(wallet, &self.participation, self.movement_id).await
406 .context("failed to persist round failure")?;
407 RoundStatus::Failed {
408 error: "all pending round funding transactions have been double spent".into(),
409 }
410 },
411 RoundFlowState::Failed { ref error } => {
412 persist_round_failure(wallet, &self.participation, self.movement_id).await
413 .context("failed to persist round failure")?;
414 RoundStatus::Failed { error: error.clone() }
415 },
416 RoundFlowState::Canceled => {
417 persist_round_failure(wallet, &self.participation, self.movement_id).await
418 .context("failed to persist round failure")?;
419 RoundStatus::Canceled
420 },
421 }
422 } else if let Some(signed) = self.unconfirmed_rounds.iter().find(|r| r.is_tx_signed()) {
423 let funding_txid = signed.funding_txid();
424 if let Some(movement_id) = self.movement_id {
425 update_funding_txid(funding_txid, movement_id, wallet).await?;
426 }
427
428 RoundStatus::Unconfirmed { funding_txid }
429 } else {
430 RoundStatus::Pending {
431 unsigned_funding_txids: self.unconfirmed_rounds.iter()
432 .map(|r| r.funding_txid())
433 .collect(),
434 }
435 };
436 Ok(status)
437 }
438
439 pub fn output_vtxos(&self) -> Option<&[Vtxo]> {
442 for round in self.unconfirmed_rounds.iter() {
443 if round.is_tx_signed() {
444 return Some(&round.new_vtxos);
445 }
446 }
447 None
448 }
449
450 pub fn locked_pending_inputs(&self) -> &[Vtxo] {
453 if self.unconfirmed_rounds.iter().any(|r| r.is_tx_signed()) {
454 return &[];
456 }
457
458 match self.flow {
459 RoundFlowState::WaitingToStart
460 | RoundFlowState::Ongoing { .. }
461 | RoundFlowState::Success =>
462 {
463 &self.participation.inputs
464 },
465 RoundFlowState::Failed { .. } | RoundFlowState::Canceled => {
466 &[]
468 },
469 }
470 }
471}
472
473pub enum RoundFlowState {
478 WaitingToStart,
479 Ongoing {
480 round_seq: RoundSeq,
481 attempt_seq: usize,
482 state: AttemptState,
483 },
484 Success,
485 Failed {
486 error: String,
487 },
488 Canceled,
489}
490
491pub enum AttemptState {
496 AwaitingAttempt,
497 AwaitingUnsignedVtxoTree {
498 cosign_keys: Vec<Keypair>,
499 secret_nonces: Vec<Vec<DangerousSecretNonce>>,
500 },
501 AwaitingRoundProposal {
502 unsigned_round_tx: Transaction,
503 vtxos_spec: VtxoTreeSpec,
504 },
505 AwaitingFinishedRound {
506 unsigned_round_tx: Transaction,
507 new_vtxos: Vec<Vtxo>,
508 },
509}
510
511impl AttemptState {
512 fn kind(&self) -> &'static str {
514 match self {
515 Self::AwaitingAttempt => "AwaitingAttempt",
516 Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
517 Self::AwaitingRoundProposal { .. } => "AwaitingRoundProposal",
518 Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
519 }
520 }
521}
522
523enum AttemptProgressResult {
525 Finished {
526 signed_round_tx: Transaction,
527 vtxos: Vec<Vtxo>,
528 },
529 Failed(anyhow::Error),
530 Updated {
536 new_state: AttemptState,
537 new_unconfirmed_round: Option<UnconfirmedRound>,
538 },
539 NotUpdated,
540}
541
542async fn start_attempt(
544 wallet: &Wallet,
545 participation: &RoundParticipation,
546 event: &RoundAttempt,
547) -> anyhow::Result<AttemptState> {
548 let mut srv = wallet.require_server().context("server not available")?;
549 let ark_info = srv.ark_info().await?;
550
551 let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
553 .take(participation.outputs.len())
554 .collect::<Vec<_>>();
555 let vtxo_reqs = participation.outputs.iter().zip(cosign_keys.iter()).map(|(p, ck)| {
556 SignedVtxoRequest { vtxo: p.clone(), cosign_pubkey: Some(ck.public_key()) }
557 }).collect::<Vec<_>>();
558
559 let cosign_nonces = cosign_keys.iter()
562 .map(|key| {
563 let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
564 let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
565 for _ in 0..ark_info.nb_round_nonces {
566 let (s, p) = musig::nonce_pair(key);
567 secs.push(s);
568 pubs.push(p);
569 }
570 (secs, pubs)
571 })
572 .take(vtxo_reqs.len())
573 .collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
574
575 debug!("Submitting payment request with {} inputs, {} vtxo outputs and {} offboard outputs",
577 participation.inputs.len(), vtxo_reqs.len(), participation.offboards.len(),
578 );
579
580 srv.client.submit_payment(protos::SubmitPaymentRequest {
581 input_vtxos: participation.inputs.iter().map(|vtxo| {
582 let keypair = wallet.get_vtxo_key(&vtxo)
583 .expect("owned vtxo key should be in database");
584
585 protos::InputVtxo {
586 vtxo_id: vtxo.id().to_bytes().to_vec(),
587 ownership_proof: {
588 let sig = event.challenge.sign_with(
589 vtxo.id(), &vtxo_reqs, &participation.offboards, keypair,
590 );
591 sig.serialize().to_vec()
592 },
593 }
594 }).collect(),
595 vtxo_requests: vtxo_reqs.iter().zip(cosign_nonces.iter()).map(|(r, n)| {
596 protos::SignedVtxoRequest {
597 vtxo: Some(protos::VtxoRequest {
598 amount: r.vtxo.amount.to_sat(),
599 policy: r.vtxo.policy.serialize(),
600 }),
601 cosign_pubkey: r.cosign_pubkey.expect("just set").serialize().to_vec(),
602 public_nonces: n.1.iter().map(|n| n.serialize().to_vec()).collect(),
603 }
604 }).collect(),
605 offboard_requests: participation.offboards.iter().map(|r| {
606 protos::OffboardRequest {
607 amount: r.amount.to_sat(),
608 offboard_spk: r.script_pubkey.to_bytes(),
609 }
610 }).collect(),
611 }).await.context("Ark server refused our payment submission")?;
612
613 Ok(AttemptState::AwaitingUnsignedVtxoTree {
614 cosign_keys: cosign_keys,
615 secret_nonces: cosign_nonces.into_iter()
616 .map(|(sec, _pub)| sec.into_iter()
617 .map(DangerousSecretNonce::dangerous_from_secret_nonce)
618 .collect())
619 .collect(),
620 })
621}
622
623async fn progress_attempt(
624 state: &AttemptState,
625 wallet: &Wallet,
626 part: &RoundParticipation,
627 event: &RoundEvent,
628) -> AttemptProgressResult {
629 match (state, event) {
633
634 (
635 AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces },
636 RoundEvent::VtxoProposal(e),
637 ) => {
638 match sign_vtxo_tree(
639 wallet, part, &cosign_keys, &secret_nonces, &e.unsigned_round_tx, &e.vtxos_spec, &e.cosign_agg_nonces,
640 ).await {
641 Ok(()) => {
642 AttemptProgressResult::Updated {
643 new_state: AttemptState::AwaitingRoundProposal {
644 unsigned_round_tx: e.unsigned_round_tx.clone(),
645 vtxos_spec: e.vtxos_spec.clone(),
646 },
647 new_unconfirmed_round: None,
648 }
649 },
650 Err(e) => AttemptProgressResult::Failed(e),
651 }
652 },
653
654 (
655 AttemptState::AwaitingRoundProposal { unsigned_round_tx, vtxos_spec },
656 RoundEvent::RoundProposal(e),
657 ) => {
658 match sign_forfeits(
659 wallet, part, unsigned_round_tx, vtxos_spec, &e.cosign_sigs, &e.forfeit_nonces, e.connector_pubkey,
660 ).await {
661 Ok((new_vtxos, forfeit_sigs)) => {
662 let round = UnconfirmedRound::new(unsigned_round_tx.clone(), new_vtxos.clone());
663 match submit_forfeit_sigs(wallet, forfeit_sigs).await {
664 Ok(()) => AttemptProgressResult::Updated {
665 new_state: AttemptState::AwaitingFinishedRound {
666 unsigned_round_tx: unsigned_round_tx.clone(),
667 new_vtxos: new_vtxos,
668 },
669 new_unconfirmed_round: Some(round),
670 },
671 Err(e) => {
672 warn!("Error sending forfeit sigs to server: {:#}", e);
673 AttemptProgressResult::Updated {
674 new_state: AttemptState::AwaitingAttempt,
675 new_unconfirmed_round: Some(round),
676 }
677 },
678 }
679 },
680 Err(e) => AttemptProgressResult::Failed(e),
681 }
682 },
683
684 (
685 AttemptState::AwaitingFinishedRound { unsigned_round_tx, new_vtxos },
686 RoundEvent::Finished(RoundFinished { signed_round_tx, .. }),
687 ) => {
688 if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
689 return AttemptProgressResult::Failed(anyhow!(
690 "signed funding tx ({}) doesn't match tx received before ({})",
691 signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
692 ));
693 }
694
695 AttemptProgressResult::Finished {
696 signed_round_tx: signed_round_tx.clone(),
697 vtxos: new_vtxos.clone(),
698 }
699 },
700
701 (state, RoundEvent::Finished(RoundFinished { .. })) => {
703 AttemptProgressResult::Failed(anyhow!(
704 "unexpectedly received a finished round while we were in state {}",
705 state.kind(),
706 ))
707 },
708
709 (state, _) => {
710 trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
711 AttemptProgressResult::NotUpdated
712 },
713 }
714}
715
716async fn sign_vtxo_tree(
717 wallet: &Wallet,
718 participation: &RoundParticipation,
719 cosign_keys: &[Keypair],
720 secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
721 unsigned_round_tx: &Transaction,
722 vtxo_tree: &VtxoTreeSpec,
723 cosign_agg_nonces: &[musig::AggregatedNonce],
724) -> anyhow::Result<()> {
725 let srv = wallet.require_server().context("server not available")?;
726
727 if unsigned_round_tx.output.len() < MIN_ROUND_TX_OUTPUTS {
728 bail!("server sent round tx with less than 2 outputs: {}",
729 serialize_hex(&unsigned_round_tx),
730 );
731 }
732
733 let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
734
735 let my_vtxos = participation.outputs.iter().zip(cosign_keys.iter())
736 .map(|(r, k)| SignedVtxoRequest {
737 vtxo: r.clone(),
738 cosign_pubkey: Some(k.public_key()),
739 })
740 .collect::<Vec<_>>();
741
742 {
744 let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
745 for vtxo_req in vtxo_tree.iter_vtxos() {
746 if let Some(i) = my_vtxos.iter().position(|v| {
747 v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
748 }) {
749 my_vtxos.swap_remove(i);
750 }
751 }
752 if !my_vtxos.is_empty() {
753 bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
754 }
755
756 let mut my_offbs = participation.offboards.to_vec();
757 for offb in unsigned_round_tx.output.iter().skip(2) {
758 if let Some(i) = my_offbs.iter().position(|o| o.to_txout() == *offb) {
759 my_offbs.swap_remove(i);
760 }
761 }
762 if !my_offbs.is_empty() {
763 bail!("server didn't include all of our offboards, missing: {:?}", my_offbs);
764 }
765 }
766
767 let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
769 let iter = my_vtxos.iter().zip(cosign_keys).zip(secret_nonces);
770 let _ = try_join_all(iter.map(|((req, key), sec)| async {
771 let leaf_idx = unsigned_vtxos.spec.leaf_idx_of(req).expect("req included");
772 let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
773 let part_sigs = unsigned_vtxos.cosign_branch(
774 &cosign_agg_nonces, leaf_idx, key, secret_nonces,
775 ).context("failed to cosign branch: our request not part of tree")?;
776
777 info!("Sending {} partial vtxo cosign signatures for pk {}",
778 part_sigs.len(), key.public_key(),
779 );
780
781 let _ = srv.clone().client.provide_vtxo_signatures(protos::VtxoSignaturesRequest {
782 pubkey: key.public_key().serialize().to_vec(),
783 signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
784 }).await.context("error sending vtxo signatures")?;
785 Result::<(), anyhow::Error>::Ok(())
786 })).await.context("error sending VTXO signatures")?;
787
788 Ok(())
789}
790
791async fn sign_forfeits(
793 wallet: &Wallet,
794 participation: &RoundParticipation,
795 unsigned_round_tx: &Transaction,
796 vtxo_tree: &VtxoTreeSpec,
797 vtxo_cosign_sigs: &[schnorr::Signature],
798 forfeit_nonces: &HashMap<VtxoId, Vec<musig::PublicNonce>>,
799 connector_pubkey: PublicKey,
800) -> anyhow::Result<(Vec<Vtxo>, HashMap<VtxoId, Vec<(musig::PublicNonce, musig::PartialSignature)>>)> {
801 let srv = wallet.require_server().context("server not available")?;
802 let ark_info = srv.ark_info().await?;
803
804 let round_txid = unsigned_round_tx.compute_txid();
805 let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
806 let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
807
808 if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
810 bail!("Received incorrect vtxo cosign signatures from server");
812 }
813
814 let signed_vtxos = vtxo_tree.into_signed_tree(vtxo_cosign_sigs.to_vec());
815
816 let conn_txout = unsigned_round_tx.output.get(ROUND_TX_CONNECTOR_VOUT as usize)
818 .expect("checked before");
819 let expected_conn_txout = ConnectorChain::output(forfeit_nonces.len(), connector_pubkey);
820 if *conn_txout != expected_conn_txout {
821 bail!("round tx from server has unexpected connector output: {:?} (expected {:?})",
822 conn_txout, expected_conn_txout,
823 );
824 }
825
826 let conns_utxo = OutPoint::new(round_txid, ROUND_TX_CONNECTOR_VOUT);
827
828 let connectors = ConnectorChain::new(
830 forfeit_nonces.values().next().unwrap().len(),
831 conns_utxo,
832 connector_pubkey,
833 );
834
835 let forfeit_sigs = participation.inputs.iter().map(|vtxo| {
836 let keypair = wallet.get_vtxo_key(&vtxo)?;
837
838 let sigs = connectors.connectors().enumerate().map(|(i, (conn, _))| {
839 let (sighash, _tx) = ark::forfeit::connector_forfeit_sighash_exit(
840 vtxo, conn, connector_pubkey,
841 );
842 let srv_nonce = forfeit_nonces.get(&vtxo.id())
843 .with_context(|| format!("missing srv forfeit nonce for {}", vtxo.id()))?
844 .get(i)
845 .context("srv didn't provide enough forfeit nonces")?;
846
847 let (nonce, sig) = musig::deterministic_partial_sign(
848 &keypair,
849 [ark_info.server_pubkey],
850 &[srv_nonce],
851 sighash.to_byte_array(),
852 Some(vtxo.output_taproot().tap_tweak().to_byte_array()),
853 );
854 Ok((nonce, sig))
855 }).collect::<anyhow::Result<Vec<_>>>()?;
856
857 Ok((vtxo.id(), sigs))
858 })
859 .collect::<anyhow::Result<HashMap<_, _>>>()
860 .context("error signing forfeits")?;
861
862 let signed_vtxos = signed_vtxos.into_cached_tree();
863
864 let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
865 let total_nb_expected_vtxos = expected_vtxos.len();
866
867 let mut new_vtxos = vec![];
868 for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
869 if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
870 let vtxo = signed_vtxos.build_vtxo(idx).expect("correct leaf idx");
871
872 vtxo.validate(&unsigned_round_tx)
875 .context("constructed invalid vtxo from tree")?;
876
877 info!("New VTXO from round: {} ({}, {})",
878 vtxo.id(), vtxo.amount(), vtxo.policy_type(),
879 );
880
881 new_vtxos.push(vtxo);
882 expected_vtxos.swap_remove(expected_idx);
883 }
884 }
885
886 if !expected_vtxos.is_empty() {
887 if expected_vtxos.len() == total_nb_expected_vtxos {
888 bail!("None of our VTXOs were present in round!");
890 } else {
891 bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
892 expected_vtxos.len(), expected_vtxos,
893 );
894 }
895 }
896 Ok((new_vtxos, forfeit_sigs))
897}
898
899async fn submit_forfeit_sigs(
900 wallet: &Wallet,
901 forfeit_sigs: HashMap<VtxoId, Vec<(musig::PublicNonce, musig::PartialSignature)>>,
902) -> anyhow::Result<()> {
903 let mut srv = wallet.require_server().context("server not available")?;
904
905 debug!("Sending {} sets of forfeit signatures for our inputs", forfeit_sigs.len());
906 srv.client.provide_forfeit_signatures(protos::ForfeitSignaturesRequest {
907 signatures: forfeit_sigs.into_iter().map(|(id, sigs)| {
908 protos::ForfeitSignatures {
909 input_vtxo_id: id.to_bytes().to_vec(),
910 pub_nonces: sigs.iter().map(|s| s.0.serialize().to_vec()).collect(),
911 signatures: sigs.iter().map(|s| s.1.serialize().to_vec()).collect(),
912 }
913 }).collect(),
914 }).await.context("failed to submit forfeit signatures")?;
915
916 Ok(())
917}
918
919async fn persist_round_success(
921 wallet: &Wallet,
922 participation: &RoundParticipation,
923 movement_id: Option<MovementId>,
924 new_vtxos: &[Vtxo],
925 signed_round_tx: &Transaction,
926) -> anyhow::Result<()> {
927 debug!("Persisting newly finished round. {} new vtxos, {} offboards, movement ID {:?}",
928 new_vtxos.len(), participation.offboards.len(), movement_id,
929 );
930
931 let store_result = wallet.store_spendable_vtxos(new_vtxos);
932 let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs);
933 let update_result = if let Some(movement_id) = movement_id {
934 wallet.movements.update_movement(movement_id, MovementUpdate::new()
935 .produced_vtxos(new_vtxos)
936 .metadata([("funding_txid".into(), serde_json::to_value(signed_round_tx.compute_txid())?)])
937 ).await
938 } else {
939 Ok(())
940 };
941 match (store_result, spent_result, update_result) {
942 (Ok(()), Ok(()), Ok(())) => Ok(()),
943 (Err(e), _, _) => Err(e),
944 (_, Err(e), _) => Err(e),
945 (_, _, Err(e)) => Err(anyhow!(
946 "Failed to update movement after round success: {:#}", e
947 )),
948 }
949}
950
951async fn persist_round_failure(
953 wallet: &Wallet,
954 participation: &RoundParticipation,
955 movement_id: Option<MovementId>,
956) -> anyhow::Result<()> {
957 debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
958 let unlock_result = wallet.unlock_vtxos(&participation.inputs);
959 let finish_result = if let Some(movement_id) = movement_id {
960 wallet.movements.finish_movement(movement_id, MovementStatus::Failed).await
961 } else {
962 Ok(())
963 };
964 if let Err(e) = &finish_result {
965 error!("Failed to mark movement as failed: {:#}", e);
966 }
967 match (unlock_result, finish_result) {
968 (Ok(()), Ok(())) => Ok(()),
969 (Err(e), _) => Err(e),
970 (_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
971 }
972}
973
974async fn update_funding_txid(
975 funding_txid: Txid,
976 movement_id: MovementId,
977 wallet: &Wallet,
978) -> anyhow::Result<()> {
979 wallet.movements.update_movement(
980 movement_id,
981 MovementUpdate::new()
982 .metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
983 ).await.context("Unable to update funding txid of round")
984}
985
986#[derive(Debug)]
992pub struct UnconfirmedRound {
993 pub(crate) funding_tx: Transaction,
995 pub(crate) new_vtxos: Vec<Vtxo>,
996
997 pub(crate) double_spenders: Vec<Option<Txid>>,
1001
1002 pub(crate) first_double_spent_at: Option<SystemTime>,
1010}
1011
1012#[derive(Debug, Clone, PartialEq, Eq)]
1013pub(crate) enum UnconfirmedRoundStatus {
1014 Confirmed,
1016 DoubleSpent {
1018 double_spender: Option<Txid>,
1020 },
1021 Unconfirmed,
1022}
1023
1024impl UnconfirmedRound {
1025 pub fn new(
1028 funding_tx: Transaction,
1029 new_vtxos: Vec<Vtxo>,
1030 ) -> Self {
1031 UnconfirmedRound {
1032 new_vtxos: new_vtxos,
1033 double_spenders: vec![None; funding_tx.input.len()],
1034 funding_tx: funding_tx,
1035 first_double_spent_at: None,
1036 }
1037 }
1038
1039 pub fn funding_txid(&self) -> Txid {
1040 self.funding_tx.compute_txid()
1041 }
1042
1043 fn is_tx_signed(&self) -> bool {
1045 !self.funding_tx.input.iter().any(|i| i.witness.is_empty())
1046 }
1047
1048 async fn maybe_update_tx(&mut self, txid: Txid, chain: &ChainSource) {
1050 if !self.is_tx_signed() {
1051 if let Ok(Some(tx)) = chain.get_tx(&txid).await {
1052 assert_eq!(txid, tx.compute_txid());
1053 debug!("Retrieved signed version of round tx {}", txid);
1054 self.funding_tx = tx;
1055 }
1056 }
1057 }
1058
1059 async fn check_if_double_spent(
1062 &mut self,
1063 wallet: &Wallet,
1064 ) -> anyhow::Result<Option<UnconfirmedRoundStatus>> {
1065
1066 let round_txid = self.funding_txid();
1067 match wallet.chain.inner() {
1068 ChainSourceClient::Esplora(c) => {
1069 let mut confirmed = None;
1070 for (idx, input) in self.funding_tx.input.iter().enumerate() {
1071 if let Some(txid) = self.double_spenders[idx] {
1072 match wallet.chain.tx_status(txid).await? {
1073 TxStatus::Confirmed(b) => {
1074 confirmed = cmp::max(confirmed, Some((b.height, txid)));
1075 continue;
1076 },
1077 TxStatus::Mempool => continue,
1078 TxStatus::NotFound => self.double_spenders[idx] = None,
1079 }
1080 }
1081
1082 let info = c.get_output_status(
1083 &input.previous_output.txid, input.previous_output.vout as u64,
1084 ).await?;
1085 match info {
1086 None => warn!("Input {} of round tx {} not found by chain source",
1087 input.previous_output, round_txid,
1088 ),
1089 Some(info) => {
1090 if !info.spent || info.txid == Some(round_txid) {
1091 continue;
1092 }
1093
1094 let txid = info.txid.context("expected txid")?;
1095 self.double_spenders[idx] = Some(txid);
1096 let status = info.status.context("expected status")?;
1097 if let Some(height) = status.block_height {
1098 confirmed = cmp::max(confirmed, Some((height, txid)));
1100 }
1101 },
1102 }
1103 }
1104
1105 if let Some((height, txid)) = confirmed {
1106 let confirmations = wallet.chain.tip().await? - (height - 1);
1107 if confirmations >= wallet.config.round_tx_required_confirmations {
1108 return Ok(Some(UnconfirmedRoundStatus::DoubleSpent {
1109 double_spender: Some(txid),
1110 }));
1111 }
1112 debug!("Round tx {} double spent by tx {} with {} confirmations",
1113 round_txid, txid, confirmations,
1114 );
1115 }
1116
1117 Ok(None)
1118 },
1119 ChainSourceClient::Bitcoind(b) => {
1120 let mut doublespent = false;
1122 for inp in &self.funding_tx.input {
1123 let OutPoint { txid, vout } = inp.previous_output;
1124 if b.get_tx_out(&txid, vout, Some(false))?.is_none() {
1125 doublespent = true;
1126 break;
1127 }
1128 }
1129
1130 if doublespent {
1131 let now = SystemTime::now();
1132 let since = self.first_double_spent_at.get_or_insert(now);
1133 let req_confs = wallet.config.round_tx_required_confirmations;
1134 let req_time = 2 * req_confs * BLOCK_TIME;
1136 if let Ok(time) = now.duration_since(*since) && time > req_time {
1137 return Ok(Some(UnconfirmedRoundStatus::DoubleSpent {
1138 double_spender: None,
1139 }));
1140 }
1141 } else {
1142 self.first_double_spent_at.take();
1143 }
1144
1145 Ok(None)
1146 },
1147 }
1148 }
1149
1150 pub(crate) async fn sync(
1152 &mut self,
1153 wallet: &Wallet,
1154 ) -> anyhow::Result<UnconfirmedRoundStatus> {
1155 let txid = self.funding_txid();
1156 match wallet.chain.tx_status(txid).await? {
1157 TxStatus::NotFound => {
1158 debug!("Round funding tx {} no longer found in mempool", txid);
1159 if let Some(res) = self.check_if_double_spent(wallet).await? {
1160 return Ok(res);
1161 }
1162 if self.is_tx_signed() {
1163 let _ = wallet.chain.broadcast_tx(&self.funding_tx).await;
1165 }
1166 Ok(UnconfirmedRoundStatus::Unconfirmed)
1167 },
1168 TxStatus::Mempool => {
1169 debug!("Round funding tx {} still in mempool, waiting for confirmations", txid);
1170 self.first_double_spent_at = None;
1171 self.maybe_update_tx(txid, &wallet.chain).await;
1172 Ok(UnconfirmedRoundStatus::Unconfirmed)
1173 },
1174 TxStatus::Confirmed(block) => {
1175 self.first_double_spent_at = None;
1176 self.maybe_update_tx(txid, &wallet.chain).await;
1177 let confirmations = {
1178 let tip = wallet.chain.tip().await?;
1179 tip - block.height + 1
1180 };
1181 debug!("Round funding tx {} has {} confirmations", txid, confirmations);
1182
1183 if confirmations >= wallet.config.round_tx_required_confirmations {
1184 Ok(UnconfirmedRoundStatus::Confirmed)
1193 } else {
1194 Ok(UnconfirmedRoundStatus::Unconfirmed)
1195 }
1196 },
1197 }
1198 }
1199}
1200
1201
1202impl Wallet {
1203 pub async fn join_next_round(
1207 &self,
1208 participation: RoundParticipation,
1209 movement_kind: Option<RoundMovement>,
1210 ) -> anyhow::Result<StoredRoundState> {
1211 if let Some(payreq) = participation.outputs.iter().find(|p| p.amount < P2TR_DUST) {
1213 bail!("VTXO amount must be at least {}, requested {}", P2TR_DUST, payreq.amount);
1214 }
1215 if let Some(offb) = participation.offboards.iter().find(|o| o.amount < P2TR_DUST) {
1216 bail!("Offboard amount must be at least {}, requested {}", P2TR_DUST, offb.amount);
1217 }
1218
1219 let movement_id = if let Some(kind) = movement_kind {
1220 let movement_id = self.movements.new_movement(
1221 self.subsystem_ids[&BarkSubsystem::Round], kind.to_string(),
1222 ).await?;
1223 let update = participation.to_movement_update(self.chain.network())?;
1224 self.movements.update_movement(movement_id, update).await?;
1225 Some(movement_id)
1226 } else {
1227 None
1228 };
1229 let state = RoundState::new(participation, movement_id);
1230
1231 let id = self.db.store_round_state_lock_vtxos(&state)?;
1232 Ok(StoredRoundState { id, state })
1233 }
1234
1235 pub fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState>> {
1237 self.db.load_round_states()
1238 }
1239
1240 pub async fn sync_pending_rounds(&self) -> anyhow::Result<()> {
1242 let states = self.db.load_round_states()?;
1243 if !states.is_empty() {
1244 debug!("Syncing {} pending round states...", states.len());
1245
1246 tokio_stream::iter(states).for_each_concurrent(10, |mut state| async move {
1247 if state.state.ongoing_participation() {
1249 return;
1250 }
1251
1252 match state.state.sync(self).await {
1253 Ok(RoundStatus::Confirmed { funding_txid }) => {
1254 info!("Round confirmed. Funding tx {}", funding_txid);
1255 if let Err(e) = self.db.remove_round_state(&state) {
1256 warn!("Error removing confirmed round state from db: {:#}", e);
1257 }
1258 },
1259 Ok(RoundStatus::Unconfirmed { funding_txid }) => {
1260 info!("Waiting for confirmations for round funding tx {}", funding_txid);
1261 if let Err(e) = self.db.update_round_state(&state) {
1262 warn!("Error updating pending round state in db: {:#}", e);
1263 }
1264 },
1265 Ok(RoundStatus::Pending { unsigned_funding_txids: txs }) => {
1266 info!("Round still pending, potential funding txs: {:?}", txs);
1267 if let Err(e) = self.db.update_round_state(&state) {
1268 warn!("Error updating pending round state in db: {:#}", e);
1269 }
1270 },
1271 Ok(RoundStatus::Failed { error }) => {
1272 error!("Round failed: {}", error);
1273 if let Err(e) = self.db.remove_round_state(&state) {
1274 warn!("Error removing failed round state from db: {:#}", e);
1275 }
1276 },
1277 Ok(RoundStatus::Canceled) => {
1278 error!("Round canceled");
1279 if let Err(e) = self.db.remove_round_state(&state) {
1280 warn!("Error removing canceled round state from db: {:#}", e);
1281 }
1282 },
1283 Err(e) => warn!("Error syncing round: {:#}", e),
1284 }
1285 }).await;
1286 }
1287
1288 let recovered = self.db.load_recovered_rounds()?;
1290 if !recovered.is_empty() {
1291 debug!("Syncing {} recovered past rounds...", recovered.len());
1292
1293 tokio_stream::iter(recovered).for_each_concurrent(10, |mut state| async move {
1294 match state.sync(self).await {
1295 Ok(UnconfirmedRoundStatus::Confirmed) => {
1296 info!("Recovered old round with funding txid {} confirmed",
1297 state.funding_txid(),
1298 );
1299 if let Err(e) = self.db.remove_recovered_round(state.funding_txid()) {
1300 warn!("Error removing finished recovered round from db: {:#}", e);
1301 }
1302 },
1303 Ok(UnconfirmedRoundStatus::DoubleSpent { double_spender }) => {
1304 debug!("Old recovered round {} invalidated because double spent by {:?}",
1305 state.funding_txid(), double_spender,
1306 );
1307 if let Err(e) = self.db.remove_recovered_round(state.funding_txid()) {
1308 warn!("Error invalidated recovered round from db: {:#}", e);
1309 }
1310 },
1311 Ok(UnconfirmedRoundStatus::Unconfirmed) => {},
1312 Err(e) => debug!("Error trying to progress recovered past round: {:#}", e),
1313 }
1314 }).await;
1315 }
1316
1317 Ok(())
1318 }
1319
1320 async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1322 let mut srv = self.require_server()?;
1323 let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1324 Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1325 }
1326
1327 async fn inner_process_event(
1328 &self,
1329 states: impl IntoIterator<Item = &mut StoredRoundState>,
1330 event: Option<&RoundEvent>,
1331 ) {
1332 tokio_stream::iter(states).for_each_concurrent(3, |state| async move {
1333 if let Some(event) = event && state.state.ongoing_participation() {
1334 let updated = state.state.process_event(self, &event).await;
1335 if updated {
1336 if let Err(e) = self.db.update_round_state(&state) {
1337 error!("Error storing round state #{} after progress: {:#}", state.id, e);
1338 }
1339 }
1340 }
1341
1342 match state.state.sync(self).await {
1343 Err(e) => warn!("Error syncing round #{}: {:#}", state.id, e),
1344 Ok(s) if s.is_final() => {
1345 info!("Round #{} finished with result: {:?}", state.id, s);
1346 if let Err(e) = self.db.remove_round_state(&state) {
1347 warn!("Failed to remove finished round #{} from db: {:#}", state.id, e);
1348 }
1349 },
1350 Ok(s) => {
1351 trace!("Round state #{} is now in state {:?}", state.id, s);
1352 if let Err(e) = self.db.update_round_state(&state) {
1353 warn!("Error storing round state #{}: {:#}", state.id, e);
1354 }
1355 },
1356 }
1357 }).await;
1358 }
1359
1360 pub async fn progress_pending_rounds(
1365 &self,
1366 last_round_event: Option<&RoundEvent>,
1367 ) -> anyhow::Result<()> {
1368 let mut states = self.db.load_round_states()?;
1369 info!("Processing {} rounds...", states.len());
1370
1371 let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1372 if states.iter().any(|s| s.state.ongoing_participation()) && last_round_event.is_none() {
1373 match self.get_last_round_event().await {
1374 Ok(e) => last_round_event = Some(Cow::Owned(e)),
1375 Err(e) => {
1376 warn!("Error fetching round event, \
1377 failed to progress ongoing rounds: {:#}", e);
1378 },
1379 }
1380 }
1381
1382 let event = last_round_event.as_ref().map(|c| c.as_ref());
1383 self.inner_process_event(states.iter_mut(), event).await;
1384
1385 Ok(())
1386 }
1387
1388 pub async fn subscribe_round_events(&self)
1389 -> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1390 {
1391 let mut srv = self.require_server()?;
1392 let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1393 .into_inner().map(|m| {
1394 let m = m.context("received error on event stream")?;
1395 let e = RoundEvent::try_from(m.clone())
1396 .with_context(|| format!("error converting rpc round event: {:?}", m))?;
1397 trace!("Received round event: {}", e);
1398 Ok::<_, anyhow::Error>(e)
1399 });
1400 Ok(events)
1401 }
1402
1403 pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1408 let mut states = self.db.load_round_states()?;
1409 states.retain(|s| s.state.ongoing_participation());
1410
1411 if states.is_empty() {
1412 info!("No pending round states");
1413 return Ok(());
1414 }
1415
1416 let mut events = self.subscribe_round_events().await?;
1417
1418 info!("Participating with {} round states...", states.len());
1419
1420 loop {
1421 let event = events.next().await
1422 .context("events stream broke")?
1423 .context("error on event stream")?;
1424
1425 self.inner_process_event(states.iter_mut(), Some(&event)).await;
1426
1427 states.retain(|s| s.state.ongoing_participation());
1428 if states.is_empty() {
1429 info!("All rounds handled");
1430 return Ok(());
1431 }
1432 }
1433 }
1434
1435 pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1440 let states = self.db.load_round_states()?;
1441 for mut state in states {
1442 match state.state.try_cancel(self).await {
1443 Ok(true) => {
1444 if let Err(e) = self.db.remove_round_state(&state) {
1445 warn!("Error removing canceled round state from db: {:#}", e);
1446 }
1447 },
1448 Ok(false) => {},
1449 Err(e) => warn!("Error trying to cancel round #{}: {:#}", state.id, e),
1450 }
1451 }
1452 Ok(())
1453 }
1454
1455 pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1457 let states = self.db.load_round_states()?;
1458 for mut state in states {
1459 if state.id != id {
1460 continue;
1461 }
1462
1463 if state.state.try_cancel(self).await.context("failed to cancel round")? {
1464 self.db.remove_round_state(&state)
1465 .context("error removing canceled round state from db")?;
1466 } else {
1467 bail!("failed to cancel round");
1468 }
1469 return Ok(());
1470 }
1471 bail!("round not found")
1472 }
1473
1474 pub(crate) async fn participate_round(
1481 &self,
1482 participation: RoundParticipation,
1483 movement_kind: Option<RoundMovement>,
1484 ) -> anyhow::Result<RoundStatus> {
1485 let mut state = self.join_next_round(participation, movement_kind).await?;
1486
1487 info!("Waiting for a round start...");
1488 let mut events = self.subscribe_round_events().await?;
1489
1490 loop {
1491 if !state.state.ongoing_participation() {
1492 return Ok(state.state.sync(self).await?);
1493 }
1494
1495 let event = events.next().await
1496 .context("events stream broke")?
1497 .context("error on event stream")?;
1498 if state.state.process_event(self, &event).await {
1499 self.db.update_round_state(&state)?;
1500 }
1501 }
1502 }
1503
1504 pub async fn start_sync_past_rounds(&self) -> anyhow::Result<()> {
1508 let mut srv = self.require_server()?;
1509
1510 let fresh_rounds = srv.client.get_fresh_rounds(protos::FreshRoundsRequest {
1511 last_round_txid: None,
1512 }).await?.into_inner().txids.into_iter()
1513 .map(|txid| RoundId::from_slice(&txid))
1514 .collect::<Result<Vec<_>, _>>()?;
1515
1516 if fresh_rounds.is_empty() {
1517 debug!("No new rounds to sync");
1518 return Ok(());
1519 }
1520
1521 debug!("Received {} new rounds from ark", fresh_rounds.len());
1522
1523 let last_pk_index = self.db.get_last_vtxo_key_index()?.unwrap_or_default();
1524 let pubkeys = (0..=last_pk_index).map(|idx| {
1525 self.vtxo_seed.derive_keypair(idx).public_key()
1526 }).collect::<HashSet<_>>();
1527
1528 let pending_states = Arc::new(self.db.load_recovered_rounds()?.into_iter()
1529 .map(|s| (s.funding_txid(), s))
1530 .collect::<HashMap<_, _>>());
1531
1532 let results = tokio_stream::iter(fresh_rounds).map(|round_id| {
1533 let pubkeys = pubkeys.clone();
1534 let mut srv = srv.clone();
1535 let pending_states = pending_states.clone();
1536
1537 async move {
1538 if pending_states.contains_key(&round_id.as_round_txid()) {
1540 debug!("Skipping round {} because it already exists", round_id);
1541 return Ok::<_, anyhow::Error>(());
1542 }
1543
1544 let req = protos::RoundId {
1545 txid: round_id.as_round_txid().to_byte_array().to_vec(),
1546 };
1547 let round = srv.client.get_round(req).await?.into_inner();
1548
1549 let tree = SignedVtxoTreeSpec::deserialize(&round.signed_vtxos)
1550 .context("invalid signed vtxo tree from srv")?
1551 .into_cached_tree();
1552
1553 let mut reqs = Vec::new();
1554 let mut vtxos = vec![];
1555 for (idx, dest) in tree.spec.spec.vtxos.iter().enumerate() {
1556 if pubkeys.contains(&dest.vtxo.policy.user_pubkey()) {
1557 let vtxo = tree.build_vtxo(idx).expect("correct leaf idx");
1558
1559 if self.db.get_wallet_vtxo(vtxo.id())?.is_none() {
1560 debug!("Built new vtxo {} with value {}", vtxo.id(), vtxo.amount());
1561 reqs.push(dest.vtxo.clone());
1562 vtxos.push(vtxo);
1563 } else {
1564 debug!("Not adding vtxo {} because it already exists", vtxo.id());
1565 }
1566 }
1567 }
1568
1569 let round_tx = deserialize::<Transaction>(&round.funding_tx)?;
1570
1571 let state = UnconfirmedRound::new(round_tx, vtxos);
1572 self.db.store_recovered_round(&state)?;
1573
1574 Ok(())
1575 }
1576 })
1577 .buffer_unordered(10)
1578 .collect::<Vec<_>>()
1579 .await;
1580
1581 for result in results {
1582 if let Err(e) = result {
1583 return Err(e).context("failed to sync round");
1584 }
1585 }
1586
1587 Ok(())
1588 }
1589}