bark/
round.rs

1//! Round State Machine
2//!
3//!
4
5use std::{cmp, iter};
6use std::borrow::Cow;
7use std::convert::Infallible;
8use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10use std::collections::{HashMap, HashSet};
11
12use anyhow::Context;
13use bdk_esplora::esplora_client::Amount;
14use bip39::rand;
15use bitcoin::consensus::encode::{serialize_hex, deserialize};
16use bitcoin::key::Keypair;
17use bitcoin::secp256k1::{schnorr, PublicKey};
18use bitcoin::{Address, Network, OutPoint, Transaction, Txid};
19use bitcoin::consensus::Params;
20use bitcoin::hashes::Hash;
21use futures::future::try_join_all;
22use futures::{Stream, StreamExt};
23use log::{debug, error, info, trace, warn};
24
25use ark::{OffboardRequest, ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoId, VtxoRequest};
26use ark::connectors::ConnectorChain;
27use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
28use ark::rounds::{
29	RoundAttempt, RoundEvent, RoundFinished, RoundId, RoundSeq, MIN_ROUND_TX_OUTPUTS, ROUND_TX_CONNECTOR_VOUT, ROUND_TX_VTXO_TREE_VOUT
30};
31use ark::tree::signed::{SignedVtxoTreeSpec, VtxoTreeSpec};
32use bitcoin_ext::{TxStatus, P2TR_DUST};
33use bitcoin_ext::rpc::RpcApi;
34use server_rpc::protos;
35
36use crate::{SECP, Wallet};
37use crate::movement::{MovementDestination, MovementId, MovementStatus};
38use crate::movement::update::MovementUpdate;
39use crate::onchain::{ChainSource, ChainSourceClient};
40use crate::persist::{RoundStateId, StoredRoundState};
41use crate::subsystem::{BarkSubsystem, RoundMovement};
42
43/// Bitcoin's block time of 10 minutes.
44const BLOCK_TIME: Duration = Duration::from_secs(10 * 60);
45
46
47/// Struct to communicate your specific participation for an Ark round.
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct RoundParticipation {
50	#[serde(with = "ark::encode::serde::vec")]
51	pub inputs: Vec<Vtxo>,
52	/// The output VTXOs that we request in the round,
53	/// including change
54	pub outputs: Vec<VtxoRequest>,
55	pub offboards: Vec<OffboardRequest>,
56}
57
58impl RoundParticipation {
59	pub fn to_movement_update(&self, network: Network) -> anyhow::Result<MovementUpdate> {
60		let params = Params::from(network);
61		let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
62		let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
63		let offboard_amount = self.offboards.iter().map(|r| r.amount).sum::<Amount>();
64		let fee = input_amount - output_amount - offboard_amount;
65		let intended = -offboard_amount.to_signed()?;
66		let mut sent_to = Vec::with_capacity(self.offboards.len());
67		for o in &self.offboards {
68			let address = Address::from_script(&o.script_pubkey, &params)?;
69			sent_to.push(MovementDestination::new(address.to_string(), o.amount));
70		}
71		Ok(MovementUpdate::new()
72			.consumed_vtxos(&self.inputs)
73			.intended_balance(intended)
74			.effective_balance(intended - fee.to_signed()?)
75			.fee(fee)
76			.sent_to(sent_to)
77		)
78	}
79}
80
81#[derive(Debug, Clone)]
82pub enum RoundStatus {
83	/// The round was successful and is fully confirmed
84	Confirmed {
85		funding_txid: Txid,
86	},
87	/// Round successful but not fully confirmed
88	Unconfirmed {
89		funding_txid: Txid,
90	},
91	/// We have unsigned funding transactions that might confirm
92	Pending {
93		unsigned_funding_txids: Vec<Txid>,
94	},
95	/// The round failed
96	Failed {
97		error: String,
98	},
99	Canceled,
100}
101
102impl RoundStatus {
103	/// Whether this is the final state and it won't change anymore
104	pub fn is_final(&self) -> bool {
105		match self {
106			Self::Confirmed { .. } => true,
107			Self::Unconfirmed { .. } => false,
108			Self::Pending { .. } => false,
109			Self::Failed { .. } => true,
110			Self::Canceled => true,
111		}
112	}
113
114	/// Whether it looks like the round succeeded
115	pub fn is_success(&self) -> bool {
116		match self {
117			Self::Confirmed { .. } => true,
118			Self::Unconfirmed { .. } => true,
119			Self::Pending { .. } => false,
120			Self::Failed { .. } => false,
121			Self::Canceled => false,
122		}
123	}
124}
125
126/// State of the progress of a round participation
127///
128/// An instance of this struct is kept all the way from the intention of joining
129/// the next round, until either the round fully confirms or it fails and we are
130/// sure it won't have any effect on our wallet.
131///
132/// As soon as we have signed forfeit txs for the round, we keep track of this
133/// round attempt until we see another attempt we participated in confirm or
134/// we gain confidence that the failed attempt will never confirm.
135pub struct RoundState {
136	/// Our participation in this round
137	pub(crate) participation: RoundParticipation,
138
139	/// The flow of the round in case it is still ongoing with the server
140	pub(crate) flow: RoundFlowState,
141
142	/// A potential final state for each round-attempt
143	pub(crate) unconfirmed_rounds: Vec<UnconfirmedRound>,
144
145	/// The ID of the [Movement] associated with this round
146	pub(crate) movement_id: Option<MovementId>,
147}
148
149impl RoundState {
150	fn new(participation: RoundParticipation, movement_id: Option<MovementId>) -> Self {
151		Self {
152			participation,
153			movement_id,
154			flow: RoundFlowState::WaitingToStart,
155			unconfirmed_rounds: Vec::new(),
156		}
157	}
158
159	/// Our participation in this round
160	pub fn participation(&self) -> &RoundParticipation {
161		&self.participation
162	}
163
164	pub fn flow(&self) -> &RoundFlowState {
165		&self.flow
166	}
167
168	pub fn unconfirmed_rounds(&self) -> &[UnconfirmedRound] {
169		&self.unconfirmed_rounds
170	}
171
172	/// Whether the interactive part of the round is still ongoing
173	pub fn ongoing_participation(&self) -> bool {
174		match self.flow {
175			RoundFlowState::WaitingToStart => true,
176			RoundFlowState::Ongoing { .. } => true,
177			RoundFlowState::Success => false,
178			RoundFlowState::Failed { .. } => false,
179			RoundFlowState::Canceled => false,
180		}
181	}
182
183	/// Tries to cancel the round and returns whether it was succesfully canceled
184	/// or if it was already canceled or failed
185	pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
186		let ret = match self.flow {
187			RoundFlowState::Canceled => true,
188			RoundFlowState::Success => false,
189			RoundFlowState::WaitingToStart => {
190				self.flow = RoundFlowState::Canceled;
191				true
192			},
193			RoundFlowState::Failed { .. } => self.unconfirmed_rounds().is_empty(),
194			RoundFlowState::Ongoing { .. } => {
195				if self.unconfirmed_rounds().is_empty() {
196					self.flow = RoundFlowState::Canceled;
197					true
198				} else {
199					false
200				}
201			},
202		};
203		if ret {
204			persist_round_failure(wallet, &self.participation, self.movement_id).await
205				.context("failed to persist round failure for cancelation")?;
206		}
207		Ok(ret)
208	}
209
210	async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
211		match start_attempt(wallet, &self.participation, attempt).await {
212			Ok(state) => {
213				self.flow = RoundFlowState::Ongoing {
214					round_seq: attempt.round_seq,
215					attempt_seq: attempt.attempt_seq,
216					state: state,
217				};
218			},
219			Err(e) => {
220				self.flow = RoundFlowState::Failed {
221					error: format!("{:#}", e),
222				};
223			},
224		}
225	}
226
227	/// Processes the given event and returns true if some update was made to the state
228	pub async fn process_event(
229		&mut self,
230		wallet: &Wallet,
231		event: &RoundEvent,
232	) -> bool {
233		let _: Infallible = match self.flow {
234			RoundFlowState::WaitingToStart => {
235				if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
236					trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
237					self.try_start_attempt(wallet, e).await;
238					return true;
239				} else {
240					trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
241						event.kind(), event.round_seq(), event.attempt_seq(),
242					);
243					return false;
244				}
245			},
246			RoundFlowState::Ongoing { round_seq, attempt_seq, ref mut state } => {
247				// here we catch the cases where we're in a wrong flow
248
249				if event.round_seq() > round_seq {
250					// new round started, we don't support multiple parallel rounds,
251					// this means we failed
252					self.flow = RoundFlowState::Failed {
253						error: format!("round {} started while we were on {}",
254							event.round_seq(), round_seq,
255						),
256					};
257					return true;
258				}
259
260				if event.attempt_seq() < attempt_seq {
261					trace!("ignoring replayed message from old attempt");
262					return false;
263				}
264
265				if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
266					trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
267					self.try_start_attempt(wallet, e).await;
268					return true;
269				}
270				trace!("Processing event {} for round attempt {}:{} in state {}",
271					event.kind(), round_seq, attempt_seq, state.kind(),
272				);
273
274				let mut updated = false;
275				match progress_attempt(state, wallet, &self.participation, event).await {
276					AttemptProgressResult::NotUpdated => {},
277					AttemptProgressResult::Updated { new_state, new_unconfirmed_round } => {
278						if let Some(r) = new_unconfirmed_round {
279							self.unconfirmed_rounds.push(r);
280						}
281						*state = new_state;
282						updated = true;
283					},
284					AttemptProgressResult::Failed(e) => {
285						self.flow = RoundFlowState::Failed { error: format!("{:#}", e) };
286						updated = true;
287					},
288					AttemptProgressResult::Finished { signed_round_tx, vtxos } => {
289						assert!(!self.unconfirmed_rounds.is_empty());
290
291						// we need to update our UnconfirmedRound with the signed tx
292						let txid = signed_round_tx.compute_txid();
293						if let Some(round) = self.unconfirmed_rounds.iter_mut()
294							.find(|r| r.funding_txid() == txid)
295						{
296							round.funding_tx = signed_round_tx;
297
298							if let Err(e) = persist_round_success(
299								wallet,
300								&self.participation,
301								self.movement_id,
302								&vtxos,
303								&round.funding_tx,
304							).await {
305								error!("Error while storing succesful round: {:#}", e);
306								//TODO(stevenroose) make sure we call this again timely!
307							}
308
309							self.flow = RoundFlowState::Success;
310						} else {
311							self.flow = RoundFlowState::Failed {
312								error: format!("server sent signed round tx {}, \
313									but we don't have a state for that", txid,),
314							};
315						};
316						updated = true;
317					},
318				}
319				return updated;
320			},
321			RoundFlowState::Success { .. }
322				| RoundFlowState::Failed { .. }
323				| RoundFlowState::Canceled => return false,
324		};
325	}
326
327	/// Sync the round's status and return it
328	///
329	/// When success or failure is returned, the round state can be eliminated
330	pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
331		let mut confirmed_funding_txid = None;
332		let mut idx = 0;
333		while idx < self.unconfirmed_rounds.len() {
334			let round = self.unconfirmed_rounds.get_mut(idx).unwrap();
335
336			let was_signed = round.is_tx_signed();
337			let res = round.sync(wallet).await;
338
339			// if we just saw the signed tx, issue the new VTXOs and mark movement as OK
340			//TODO(stevenroose) after we make `persist_round_success` idempotent,
341			// just always go into this branch when we have a signed tx to make
342			// sure a db failure get fixed on the next call of `sync`.
343			if !was_signed && round.is_tx_signed() {
344				if let Err(e) = persist_round_success(
345					wallet,
346					&self.participation,
347					self.movement_id,
348					&round.new_vtxos,
349					&round.funding_tx,
350				).await {
351					error!("Error storing state after seeing signed funding tx: {:#?}", e);
352					idx += 1;
353					continue;
354				}
355			}
356
357			//TODO(stevenroose) after the persist methods are idempotent, also
358			// persist the round as failed here if a signed round tx is no longer in
359			// the mempool
360
361			let _: Infallible = match res {
362				Ok(UnconfirmedRoundStatus::Confirmed) => {
363					confirmed_funding_txid = Some(round.funding_txid());
364					// let's not remove this one just to be sure we can't
365					// accidentally lose track of it
366					// we should remove the entire state after this anyway
367					idx += 1;
368					continue;
369				},
370				Ok(UnconfirmedRoundStatus::DoubleSpent { double_spender }) => {
371					debug!("Round with round txid {} got double spent by tx {:?}",
372						round.funding_tx.compute_txid(), double_spender,
373					);
374					self.unconfirmed_rounds.swap_remove(idx);
375					continue; // skip idx increment
376				},
377				Ok(UnconfirmedRoundStatus::Unconfirmed) => {
378					idx += 1;
379					continue;
380				},
381				Err(e) => {
382					warn!("Error syncing status of unconfirmed round: {:#}", e);
383					trace!("Error syncing status of unconfirmed round: err={:#}; state={:?}",
384						e, round,
385					);
386					idx += 1;
387					continue;
388				}
389			};
390		}
391
392		let status = if let Some(funding_txid) = confirmed_funding_txid {
393			if let Some(movement_id) = self.movement_id {
394				update_funding_txid(funding_txid, movement_id, wallet).await?;
395				wallet.movements.finish_movement(movement_id, MovementStatus::Finished).await?;
396			}
397
398			RoundStatus::Confirmed { funding_txid }
399		} else if self.unconfirmed_rounds.is_empty() {
400			match self.flow {
401				RoundFlowState::WaitingToStart | RoundFlowState::Ongoing { .. } => {
402					RoundStatus::Pending { unsigned_funding_txids: vec![] }
403				}
404				RoundFlowState::Success => {
405					persist_round_failure(wallet, &self.participation, self.movement_id).await
406						.context("failed to persist round failure")?;
407					RoundStatus::Failed {
408						error: "all pending round funding transactions have been double spent".into(),
409					}
410				},
411				RoundFlowState::Failed { ref error } => {
412					persist_round_failure(wallet, &self.participation, self.movement_id).await
413						.context("failed to persist round failure")?;
414					RoundStatus::Failed { error: error.clone() }
415				},
416				RoundFlowState::Canceled => {
417					persist_round_failure(wallet, &self.participation, self.movement_id).await
418						.context("failed to persist round failure")?;
419					RoundStatus::Canceled
420				},
421			}
422		} else if let Some(signed) = self.unconfirmed_rounds.iter().find(|r| r.is_tx_signed()) {
423			let funding_txid = signed.funding_txid();
424			if let Some(movement_id) = self.movement_id {
425				update_funding_txid(funding_txid, movement_id, wallet).await?;
426			}
427
428			RoundStatus::Unconfirmed { funding_txid }
429		} else {
430			RoundStatus::Pending {
431				unsigned_funding_txids: self.unconfirmed_rounds.iter()
432					.map(|r| r.funding_txid())
433					.collect(),
434			}
435		};
436		Ok(status)
437	}
438
439	/// Once we know the signed round funding tx, this returns the output VTXOs
440	/// for this round.
441	pub fn output_vtxos(&self) -> Option<&[Vtxo]> {
442		for round in self.unconfirmed_rounds.iter() {
443			if round.is_tx_signed() {
444				return Some(&round.new_vtxos);
445			}
446		}
447		None
448	}
449
450	/// Returns the input VTXOs that are locked in this round, but only
451	/// if no output VTXOs were issued yet.
452	pub fn locked_pending_inputs(&self) -> &[Vtxo] {
453		if self.unconfirmed_rounds.iter().any(|r| r.is_tx_signed()) {
454			// new vtxos aready issued
455			return &[];
456		}
457
458		match self.flow {
459			RoundFlowState::WaitingToStart
460				| RoundFlowState::Ongoing { .. }
461				| RoundFlowState::Success =>
462			{
463				&self.participation.inputs
464			},
465			RoundFlowState::Failed { .. } | RoundFlowState::Canceled => {
466				// inputs already unlocked
467				&[]
468			},
469		}
470	}
471}
472
473/// The state of the process flow of a round
474///
475/// This tracks the progress of the interactive part of the round, from
476/// waiting to start until finishing either succesfully or with a failure.
477pub enum RoundFlowState {
478	WaitingToStart,
479	Ongoing {
480		round_seq: RoundSeq,
481		attempt_seq: usize,
482		state: AttemptState,
483	},
484	Success,
485	Failed {
486		error: String,
487	},
488	Canceled,
489}
490
491/// The state of a single round attempt
492///
493/// For each attempt that we participate in, we keep the state of our concrete
494/// participation.
495pub enum AttemptState {
496	AwaitingAttempt,
497	AwaitingUnsignedVtxoTree {
498		cosign_keys: Vec<Keypair>,
499		secret_nonces: Vec<Vec<DangerousSecretNonce>>,
500	},
501	AwaitingRoundProposal {
502		unsigned_round_tx: Transaction,
503		vtxos_spec: VtxoTreeSpec,
504	},
505	AwaitingFinishedRound {
506		unsigned_round_tx: Transaction,
507		new_vtxos: Vec<Vtxo>,
508	},
509}
510
511impl AttemptState {
512	/// The state kind represented as a string
513	fn kind(&self) -> &'static str {
514		match self {
515			Self::AwaitingAttempt => "AwaitingAttempt",
516			Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
517			Self::AwaitingRoundProposal { .. } => "AwaitingRoundProposal",
518			Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
519		}
520	}
521}
522
523/// Result from trying to progress an ongoing round attempt
524enum AttemptProgressResult {
525	Finished {
526		signed_round_tx: Transaction,
527		vtxos: Vec<Vtxo>,
528	},
529	Failed(anyhow::Error),
530	/// When the state changes, this variant is returned
531	///
532	/// If during the processing, we have signed any forfeit txs and tried
533	/// sending them to the server, the [UnconfirmedRound] instance is returned
534	/// so that it can be stored in the state.
535	Updated {
536		new_state: AttemptState,
537		new_unconfirmed_round: Option<UnconfirmedRound>,
538	},
539	NotUpdated,
540}
541
542/// Participate in the new round attempt by submitting our round participation
543async fn start_attempt(
544	wallet: &Wallet,
545	participation: &RoundParticipation,
546	event: &RoundAttempt,
547) -> anyhow::Result<AttemptState> {
548	let mut srv = wallet.require_server().context("server not available")?;
549	let ark_info = srv.ark_info().await?;
550
551	// Assign cosign pubkeys to the payment requests.
552	let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
553		.take(participation.outputs.len())
554		.collect::<Vec<_>>();
555	let vtxo_reqs = participation.outputs.iter().zip(cosign_keys.iter()).map(|(p, ck)| {
556		SignedVtxoRequest { vtxo: p.clone(), cosign_pubkey: Some(ck.public_key()) }
557	}).collect::<Vec<_>>();
558
559	// Prepare round participation info.
560	// For each of our requested vtxo output, we need a set of public and secret nonces.
561	let cosign_nonces = cosign_keys.iter()
562		.map(|key| {
563			let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
564			let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
565			for _ in 0..ark_info.nb_round_nonces {
566				let (s, p) = musig::nonce_pair(key);
567				secs.push(s);
568				pubs.push(p);
569			}
570			(secs, pubs)
571		})
572		.take(vtxo_reqs.len())
573		.collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
574
575	// The round has now started. We can submit our payment.
576	debug!("Submitting payment request with {} inputs, {} vtxo outputs and {} offboard outputs",
577		participation.inputs.len(), vtxo_reqs.len(), participation.offboards.len(),
578	);
579
580	srv.client.submit_payment(protos::SubmitPaymentRequest {
581		input_vtxos: participation.inputs.iter().map(|vtxo| {
582			let keypair = wallet.get_vtxo_key(&vtxo)
583				.expect("owned vtxo key should be in database");
584
585			protos::InputVtxo {
586				vtxo_id: vtxo.id().to_bytes().to_vec(),
587				ownership_proof: {
588					let sig = event.challenge.sign_with(
589						vtxo.id(), &vtxo_reqs, &participation.offboards, keypair,
590					);
591					sig.serialize().to_vec()
592				},
593			}
594		}).collect(),
595		vtxo_requests: vtxo_reqs.iter().zip(cosign_nonces.iter()).map(|(r, n)| {
596			protos::SignedVtxoRequest {
597				vtxo: Some(protos::VtxoRequest {
598					amount: r.vtxo.amount.to_sat(),
599					policy: r.vtxo.policy.serialize(),
600				}),
601				cosign_pubkey: r.cosign_pubkey.expect("just set").serialize().to_vec(),
602				public_nonces: n.1.iter().map(|n| n.serialize().to_vec()).collect(),
603			}
604		}).collect(),
605		offboard_requests: participation.offboards.iter().map(|r| {
606			protos::OffboardRequest {
607				amount: r.amount.to_sat(),
608				offboard_spk: r.script_pubkey.to_bytes(),
609			}
610		}).collect(),
611	}).await.context("Ark server refused our payment submission")?;
612
613	Ok(AttemptState::AwaitingUnsignedVtxoTree {
614		cosign_keys: cosign_keys,
615		secret_nonces: cosign_nonces.into_iter()
616			.map(|(sec, _pub)| sec.into_iter()
617				.map(DangerousSecretNonce::dangerous_from_secret_nonce)
618				.collect())
619			.collect(),
620	})
621}
622
623async fn progress_attempt(
624	state: &AttemptState,
625	wallet: &Wallet,
626	part: &RoundParticipation,
627	event: &RoundEvent,
628) -> AttemptProgressResult {
629	// we will match only the states and messages required to make progress,
630	// all else we ignore, except an unexpected finish
631
632	match (state, event) {
633
634		(
635			AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces },
636			RoundEvent::VtxoProposal(e),
637		) => {
638			match sign_vtxo_tree(
639				wallet, part, &cosign_keys, &secret_nonces, &e.unsigned_round_tx, &e.vtxos_spec, &e.cosign_agg_nonces,
640			).await {
641				Ok(()) => {
642					AttemptProgressResult::Updated {
643						new_state: AttemptState::AwaitingRoundProposal {
644							unsigned_round_tx: e.unsigned_round_tx.clone(),
645							vtxos_spec: e.vtxos_spec.clone(),
646						},
647						new_unconfirmed_round: None,
648					}
649				},
650				Err(e) => AttemptProgressResult::Failed(e),
651			}
652		},
653
654		(
655			AttemptState::AwaitingRoundProposal { unsigned_round_tx, vtxos_spec },
656			RoundEvent::RoundProposal(e),
657		) => {
658			match sign_forfeits(
659				wallet, part, unsigned_round_tx, vtxos_spec, &e.cosign_sigs, &e.forfeit_nonces, e.connector_pubkey,
660			).await {
661				Ok((new_vtxos, forfeit_sigs)) => {
662					let round = UnconfirmedRound::new(unsigned_round_tx.clone(), new_vtxos.clone());
663					match submit_forfeit_sigs(wallet, forfeit_sigs).await {
664						Ok(()) => AttemptProgressResult::Updated {
665							new_state: AttemptState::AwaitingFinishedRound {
666								unsigned_round_tx: unsigned_round_tx.clone(),
667								new_vtxos: new_vtxos,
668							},
669							new_unconfirmed_round: Some(round),
670						},
671						Err(e) => {
672							warn!("Error sending forfeit sigs to server: {:#}", e);
673							AttemptProgressResult::Updated {
674								new_state: AttemptState::AwaitingAttempt,
675								new_unconfirmed_round: Some(round),
676							}
677						},
678					}
679				},
680				Err(e) => AttemptProgressResult::Failed(e),
681			}
682		},
683
684		(
685			AttemptState::AwaitingFinishedRound { unsigned_round_tx, new_vtxos },
686			RoundEvent::Finished(RoundFinished { signed_round_tx, .. }),
687		) => {
688			if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
689				return AttemptProgressResult::Failed(anyhow!(
690					"signed funding tx ({}) doesn't match tx received before ({})",
691					signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
692				));
693			}
694
695			AttemptProgressResult::Finished {
696				signed_round_tx: signed_round_tx.clone(),
697				vtxos: new_vtxos.clone(),
698			}
699		},
700
701		// unexpected finish
702		(state, RoundEvent::Finished(RoundFinished { .. })) => {
703			AttemptProgressResult::Failed(anyhow!(
704				"unexpectedly received a finished round while we were in state {}",
705				state.kind(),
706			))
707		},
708
709		(state, _) => {
710			trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
711			AttemptProgressResult::NotUpdated
712		},
713	}
714}
715
716async fn sign_vtxo_tree(
717	wallet: &Wallet,
718	participation: &RoundParticipation,
719	cosign_keys: &[Keypair],
720	secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
721	unsigned_round_tx: &Transaction,
722	vtxo_tree: &VtxoTreeSpec,
723	cosign_agg_nonces: &[musig::AggregatedNonce],
724) -> anyhow::Result<()> {
725	let srv = wallet.require_server().context("server not available")?;
726
727	if unsigned_round_tx.output.len() < MIN_ROUND_TX_OUTPUTS {
728		bail!("server sent round tx with less than 2 outputs: {}",
729			serialize_hex(&unsigned_round_tx),
730		);
731	}
732
733	let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
734
735	let my_vtxos = participation.outputs.iter().zip(cosign_keys.iter())
736		.map(|(r, k)| SignedVtxoRequest {
737			vtxo: r.clone(),
738			cosign_pubkey: Some(k.public_key()),
739		})
740		.collect::<Vec<_>>();
741
742	// Check that the proposal contains our inputs.
743	{
744		let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
745		for vtxo_req in vtxo_tree.iter_vtxos() {
746			if let Some(i) = my_vtxos.iter().position(|v| {
747				v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
748			}) {
749				my_vtxos.swap_remove(i);
750			}
751		}
752		if !my_vtxos.is_empty() {
753			bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
754		}
755
756		let mut my_offbs = participation.offboards.to_vec();
757		for offb in unsigned_round_tx.output.iter().skip(2) {
758			if let Some(i) = my_offbs.iter().position(|o| o.to_txout() == *offb) {
759				my_offbs.swap_remove(i);
760			}
761		}
762		if !my_offbs.is_empty() {
763			bail!("server didn't include all of our offboards, missing: {:?}", my_offbs);
764		}
765	}
766
767	// Make vtxo signatures from top to bottom, just like sighashes are returned.
768	let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
769	let iter = my_vtxos.iter().zip(cosign_keys).zip(secret_nonces);
770	let _ = try_join_all(iter.map(|((req, key), sec)| async {
771		let leaf_idx = unsigned_vtxos.spec.leaf_idx_of(req).expect("req included");
772		let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
773		let part_sigs = unsigned_vtxos.cosign_branch(
774			&cosign_agg_nonces, leaf_idx, key, secret_nonces,
775		).context("failed to cosign branch: our request not part of tree")?;
776
777		info!("Sending {} partial vtxo cosign signatures for pk {}",
778			part_sigs.len(), key.public_key(),
779		);
780
781		let _ = srv.clone().client.provide_vtxo_signatures(protos::VtxoSignaturesRequest {
782			pubkey: key.public_key().serialize().to_vec(),
783			signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
784		}).await.context("error sending vtxo signatures")?;
785		Result::<(), anyhow::Error>::Ok(())
786	})).await.context("error sending VTXO signatures")?;
787
788	Ok(())
789}
790
791/// Sign the forfeit signatures but doesn't submit them yet
792async fn sign_forfeits(
793	wallet: &Wallet,
794	participation: &RoundParticipation,
795	unsigned_round_tx: &Transaction,
796	vtxo_tree: &VtxoTreeSpec,
797	vtxo_cosign_sigs: &[schnorr::Signature],
798	forfeit_nonces: &HashMap<VtxoId, Vec<musig::PublicNonce>>,
799	connector_pubkey: PublicKey,
800) -> anyhow::Result<(Vec<Vtxo>, HashMap<VtxoId, Vec<(musig::PublicNonce, musig::PartialSignature)>>)> {
801	let srv = wallet.require_server().context("server not available")?;
802	let ark_info = srv.ark_info().await?;
803
804	let round_txid = unsigned_round_tx.compute_txid();
805	let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
806	let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
807
808	// Validate the vtxo tree and cosign signatures.
809	if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
810		// bad server!
811		bail!("Received incorrect vtxo cosign signatures from server");
812	}
813
814	let signed_vtxos = vtxo_tree.into_signed_tree(vtxo_cosign_sigs.to_vec());
815
816	// Check that the connector key is correct.
817	let conn_txout = unsigned_round_tx.output.get(ROUND_TX_CONNECTOR_VOUT as usize)
818		.expect("checked before");
819	let expected_conn_txout = ConnectorChain::output(forfeit_nonces.len(), connector_pubkey);
820	if *conn_txout != expected_conn_txout {
821		bail!("round tx from server has unexpected connector output: {:?} (expected {:?})",
822			conn_txout, expected_conn_txout,
823		);
824	}
825
826	let conns_utxo = OutPoint::new(round_txid, ROUND_TX_CONNECTOR_VOUT);
827
828	// Make forfeit signatures.
829	let connectors = ConnectorChain::new(
830		forfeit_nonces.values().next().unwrap().len(),
831		conns_utxo,
832		connector_pubkey,
833	);
834
835	let forfeit_sigs = participation.inputs.iter().map(|vtxo| {
836		let keypair = wallet.get_vtxo_key(&vtxo)?;
837
838		let sigs = connectors.connectors().enumerate().map(|(i, (conn, _))| {
839			let (sighash, _tx) = ark::forfeit::connector_forfeit_sighash_exit(
840				vtxo, conn, connector_pubkey,
841			);
842			let srv_nonce = forfeit_nonces.get(&vtxo.id())
843				.with_context(|| format!("missing srv forfeit nonce for {}", vtxo.id()))?
844				.get(i)
845				.context("srv didn't provide enough forfeit nonces")?;
846
847			let (nonce, sig) = musig::deterministic_partial_sign(
848				&keypair,
849				[ark_info.server_pubkey],
850				&[srv_nonce],
851				sighash.to_byte_array(),
852				Some(vtxo.output_taproot().tap_tweak().to_byte_array()),
853			);
854			Ok((nonce, sig))
855		}).collect::<anyhow::Result<Vec<_>>>()?;
856
857		Ok((vtxo.id(), sigs))
858	})
859		.collect::<anyhow::Result<HashMap<_, _>>>()
860		.context("error signing forfeits")?;
861
862	let signed_vtxos = signed_vtxos.into_cached_tree();
863
864	let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
865	let total_nb_expected_vtxos = expected_vtxos.len();
866
867	let mut new_vtxos = vec![];
868	for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
869		if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
870			let vtxo = signed_vtxos.build_vtxo(idx).expect("correct leaf idx");
871
872			// validate the received vtxos
873			// This is more like a sanity check since we crafted them ourselves.
874			vtxo.validate(&unsigned_round_tx)
875				.context("constructed invalid vtxo from tree")?;
876
877			info!("New VTXO from round: {} ({}, {})",
878				vtxo.id(), vtxo.amount(), vtxo.policy_type(),
879			);
880
881			new_vtxos.push(vtxo);
882			expected_vtxos.swap_remove(expected_idx);
883		}
884	}
885
886	if !expected_vtxos.is_empty() {
887		if expected_vtxos.len() == total_nb_expected_vtxos {
888			// we must have done something wrong
889			bail!("None of our VTXOs were present in round!");
890		} else {
891			bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
892				expected_vtxos.len(), expected_vtxos,
893			);
894		}
895	}
896	Ok((new_vtxos, forfeit_sigs))
897}
898
899async fn submit_forfeit_sigs(
900	wallet: &Wallet,
901	forfeit_sigs: HashMap<VtxoId, Vec<(musig::PublicNonce, musig::PartialSignature)>>,
902) -> anyhow::Result<()> {
903	let mut srv = wallet.require_server().context("server not available")?;
904
905	debug!("Sending {} sets of forfeit signatures for our inputs", forfeit_sigs.len());
906	srv.client.provide_forfeit_signatures(protos::ForfeitSignaturesRequest {
907		signatures: forfeit_sigs.into_iter().map(|(id, sigs)| {
908			protos::ForfeitSignatures {
909				input_vtxo_id: id.to_bytes().to_vec(),
910				pub_nonces: sigs.iter().map(|s| s.0.serialize().to_vec()).collect(),
911				signatures: sigs.iter().map(|s| s.1.serialize().to_vec()).collect(),
912			}
913		}).collect(),
914	}).await.context("failed to submit forfeit signatures")?;
915
916	Ok(())
917}
918
919//TODO(stevenroose) should be made idempotent
920async fn persist_round_success(
921	wallet: &Wallet,
922	participation: &RoundParticipation,
923	movement_id: Option<MovementId>,
924	new_vtxos: &[Vtxo],
925	signed_round_tx: &Transaction,
926) -> anyhow::Result<()> {
927	debug!("Persisting newly finished round. {} new vtxos, {} offboards, movement ID {:?}",
928		new_vtxos.len(), participation.offboards.len(), movement_id,
929	);
930
931	let store_result = wallet.store_spendable_vtxos(new_vtxos);
932	let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs);
933	let update_result = if let Some(movement_id) = movement_id {
934		wallet.movements.update_movement(movement_id, MovementUpdate::new()
935			.produced_vtxos(new_vtxos)
936			.metadata([("funding_txid".into(), serde_json::to_value(signed_round_tx.compute_txid())?)])
937		).await
938	} else {
939		Ok(())
940	};
941	match (store_result, spent_result, update_result) {
942		(Ok(()), Ok(()), Ok(())) => Ok(()),
943		(Err(e), _, _) => Err(e),
944		(_, Err(e), _) => Err(e),
945		(_, _, Err(e)) => Err(anyhow!(
946			"Failed to update movement after round success: {:#}", e
947		)),
948	}
949}
950
951//TODO(stevenroose) should be made idempotent
952async fn persist_round_failure(
953	wallet: &Wallet,
954	participation: &RoundParticipation,
955	movement_id: Option<MovementId>,
956) -> anyhow::Result<()> {
957	debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
958	let unlock_result = wallet.unlock_vtxos(&participation.inputs);
959	let finish_result = if let Some(movement_id) = movement_id {
960		wallet.movements.finish_movement(movement_id, MovementStatus::Failed).await
961	} else {
962		Ok(())
963	};
964	if let Err(e) = &finish_result {
965		error!("Failed to mark movement as failed: {:#}", e);
966	}
967	match (unlock_result, finish_result) {
968		(Ok(()), Ok(())) => Ok(()),
969		(Err(e), _) => Err(e),
970		(_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
971	}
972}
973
974async fn update_funding_txid(
975	funding_txid: Txid,
976	movement_id: MovementId,
977	wallet: &Wallet,
978) -> anyhow::Result<()> {
979	wallet.movements.update_movement(
980		movement_id,
981		MovementUpdate::new()
982			.metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
983	).await.context("Unable to update funding txid of round")
984}
985
986/// Track any round for which we signed forfeit txs
987///
988/// Any round for which we signed forfeit txs will be tracked in an object like this.
989/// Both when the round finished successfully or not. The funding tx in this object
990/// can thus be unsigned.
991#[derive(Debug)]
992pub struct UnconfirmedRound {
993	/// This round tx is not necessarily signed
994	pub(crate) funding_tx: Transaction,
995	pub(crate) new_vtxos: Vec<Vtxo>,
996
997	// Some information for double spend detection
998
999	/// A txid that double spends each input of the round tx
1000	pub(crate) double_spenders: Vec<Option<Txid>>,
1001
1002	/// The time at which we first noticed we got double spent
1003	///
1004	/// We use this when the user is using bitcoind because in bitcoind it's
1005	/// impossible to detect which txs spend a UTXO. So in order to detect
1006	/// whether our tx is double spend, we will just abort the round
1007	/// if we are out of the mempool for double the expected time to be
1008	/// deeply double spent.
1009	pub(crate) first_double_spent_at: Option<SystemTime>,
1010}
1011
1012#[derive(Debug, Clone, PartialEq, Eq)]
1013pub(crate) enum UnconfirmedRoundStatus {
1014	/// The round's funding tx confirmed deeply
1015	Confirmed,
1016	/// The round's funding tx was double spent deeply
1017	DoubleSpent {
1018		/// We can't always know the double spender
1019		double_spender: Option<Txid>,
1020	},
1021	Unconfirmed,
1022}
1023
1024impl UnconfirmedRound {
1025	/// Create a new instance of the [AwaitingConfirmation] state for
1026	/// a round that was synced from the server, but we have lost context for.
1027	pub fn new(
1028		funding_tx: Transaction,
1029		new_vtxos: Vec<Vtxo>,
1030	) -> Self {
1031		UnconfirmedRound {
1032			new_vtxos: new_vtxos,
1033			double_spenders: vec![None; funding_tx.input.len()],
1034			funding_tx: funding_tx,
1035			first_double_spent_at: None,
1036		}
1037	}
1038
1039	pub fn funding_txid(&self) -> Txid {
1040		self.funding_tx.compute_txid()
1041	}
1042
1043	/// Whether we have a signed tx
1044	fn is_tx_signed(&self) -> bool {
1045		!self.funding_tx.input.iter().any(|i| i.witness.is_empty())
1046	}
1047
1048	/// Check if our version of the round tx is signed and if not try replace it
1049	async fn maybe_update_tx(&mut self, txid: Txid, chain: &ChainSource) {
1050		if !self.is_tx_signed() {
1051			if let Ok(Some(tx)) = chain.get_tx(&txid).await {
1052				assert_eq!(txid, tx.compute_txid());
1053				debug!("Retrieved signed version of round tx {}", txid);
1054				self.funding_tx = tx;
1055			}
1056		}
1057	}
1058
1059	/// Check if the round tx was double spent and if so
1060	/// returns a UnconfirmedRoundStatus::DoubleSpent.
1061	async fn check_if_double_spent(
1062		&mut self,
1063		wallet: &Wallet,
1064	) -> anyhow::Result<Option<UnconfirmedRoundStatus>> {
1065
1066		let round_txid = self.funding_txid();
1067		match wallet.chain.inner() {
1068			ChainSourceClient::Esplora(c) => {
1069				let mut confirmed = None;
1070				for (idx, input) in self.funding_tx.input.iter().enumerate() {
1071					if let Some(txid) = self.double_spenders[idx] {
1072						match wallet.chain.tx_status(txid).await? {
1073							TxStatus::Confirmed(b) => {
1074								confirmed = cmp::max(confirmed, Some((b.height, txid)));
1075								continue;
1076							},
1077							TxStatus::Mempool => continue,
1078							TxStatus::NotFound => self.double_spenders[idx] = None,
1079						}
1080					}
1081
1082					let info = c.get_output_status(
1083						&input.previous_output.txid, input.previous_output.vout as u64,
1084					).await?;
1085					match info {
1086						None => warn!("Input {} of round tx {} not found by chain source",
1087							input.previous_output, round_txid,
1088						),
1089						Some(info) => {
1090							if !info.spent || info.txid == Some(round_txid) {
1091								continue;
1092							}
1093
1094							let txid = info.txid.context("expected txid")?;
1095							self.double_spenders[idx] = Some(txid);
1096							let status = info.status.context("expected status")?;
1097							if let Some(height) = status.block_height {
1098								// NB we rely on Ord impl that sorts tuples first by first item
1099								confirmed = cmp::max(confirmed, Some((height, txid)));
1100							}
1101						},
1102					}
1103				}
1104
1105				if let Some((height, txid)) = confirmed {
1106					let confirmations = wallet.chain.tip().await? - (height - 1);
1107					if confirmations >= wallet.config.round_tx_required_confirmations {
1108						return Ok(Some(UnconfirmedRoundStatus::DoubleSpent {
1109							double_spender: Some(txid),
1110						}));
1111					}
1112					debug!("Round tx {} double spent by tx {} with {} confirmations",
1113						round_txid, txid, confirmations,
1114					);
1115				}
1116
1117				Ok(None)
1118			},
1119			ChainSourceClient::Bitcoind(b) => {
1120				// check whether our round tx is double spent
1121				let mut doublespent = false;
1122				for inp in &self.funding_tx.input {
1123					let OutPoint { txid, vout } = inp.previous_output;
1124					if b.get_tx_out(&txid, vout, Some(false))?.is_none() {
1125						doublespent = true;
1126						break;
1127					}
1128				}
1129
1130				if doublespent {
1131					let now = SystemTime::now();
1132					let since = self.first_double_spent_at.get_or_insert(now);
1133					let req_confs = wallet.config.round_tx_required_confirmations;
1134					//TODO(stevenroose) maybe also do 5 days?
1135					let req_time = 2 * req_confs * BLOCK_TIME;
1136					if let Ok(time) = now.duration_since(*since) && time > req_time {
1137						return Ok(Some(UnconfirmedRoundStatus::DoubleSpent {
1138							double_spender: None,
1139						}));
1140					}
1141				} else {
1142					self.first_double_spent_at.take();
1143				}
1144
1145				Ok(None)
1146			},
1147		}
1148	}
1149
1150	// NB we must never restart again from here
1151	pub(crate) async fn sync(
1152		&mut self,
1153		wallet: &Wallet,
1154	) -> anyhow::Result<UnconfirmedRoundStatus> {
1155		let txid = self.funding_txid();
1156		match wallet.chain.tx_status(txid).await? {
1157			TxStatus::NotFound => {
1158				debug!("Round funding tx {} no longer found in mempool", txid);
1159				if let Some(res) = self.check_if_double_spent(wallet).await? {
1160					return Ok(res);
1161				}
1162				if self.is_tx_signed() {
1163					// try to broadcast
1164					let _ = wallet.chain.broadcast_tx(&self.funding_tx).await;
1165				}
1166				Ok(UnconfirmedRoundStatus::Unconfirmed)
1167			},
1168			TxStatus::Mempool => {
1169				debug!("Round funding tx {} still in mempool, waiting for confirmations", txid);
1170				self.first_double_spent_at = None;
1171				self.maybe_update_tx(txid, &wallet.chain).await;
1172				Ok(UnconfirmedRoundStatus::Unconfirmed)
1173			},
1174			TxStatus::Confirmed(block) => {
1175				self.first_double_spent_at = None;
1176				self.maybe_update_tx(txid, &wallet.chain).await;
1177				let confirmations = {
1178					let tip = wallet.chain.tip().await?;
1179					tip - block.height + 1
1180				};
1181				debug!("Round funding tx {} has {} confirmations", txid, confirmations);
1182
1183				if confirmations >= wallet.config.round_tx_required_confirmations {
1184					//TODO(stevenroose) ensure vtxos are created
1185					// we currently make the movement when the round finishes, and
1186					// we currently don't have a way to not accidentally do this twice.
1187					// Should probably have some idempotent VTXO state/movement API
1188					// so that here we can call this again in the case of:
1189					// - initial call after finished round failed
1190					// - we recovered from a round we didn't get finish message from
1191					// - we synced a round from when we were offline
1192					Ok(UnconfirmedRoundStatus::Confirmed)
1193				} else {
1194					Ok(UnconfirmedRoundStatus::Unconfirmed)
1195				}
1196			},
1197		}
1198	}
1199}
1200
1201
1202impl Wallet {
1203	/// Start a new round participation
1204	///
1205	/// This function will store the state in the db and mark the VTXOs as locked.
1206	pub async fn join_next_round(
1207		&self,
1208		participation: RoundParticipation,
1209		movement_kind: Option<RoundMovement>,
1210	) -> anyhow::Result<StoredRoundState> {
1211		// verify if our participation makes sense
1212		if let Some(payreq) = participation.outputs.iter().find(|p| p.amount < P2TR_DUST) {
1213			bail!("VTXO amount must be at least {}, requested {}", P2TR_DUST, payreq.amount);
1214		}
1215		if let Some(offb) = participation.offboards.iter().find(|o| o.amount < P2TR_DUST) {
1216			bail!("Offboard amount must be at least {}, requested {}", P2TR_DUST, offb.amount);
1217		}
1218
1219		let movement_id = if let Some(kind) = movement_kind {
1220			let movement_id = self.movements.new_movement(
1221				self.subsystem_ids[&BarkSubsystem::Round], kind.to_string(),
1222			).await?;
1223			let update = participation.to_movement_update(self.chain.network())?;
1224			self.movements.update_movement(movement_id, update).await?;
1225			Some(movement_id)
1226		} else {
1227			None
1228		};
1229		let state = RoundState::new(participation, movement_id);
1230
1231		let id = self.db.store_round_state_lock_vtxos(&state)?;
1232		Ok(StoredRoundState { id, state })
1233	}
1234
1235	/// Get all pending round states
1236	pub fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState>> {
1237		self.db.load_round_states()
1238	}
1239
1240	/// Sync pending rounds that have finished but are waiting for confirmations
1241	pub async fn sync_pending_rounds(&self) -> anyhow::Result<()> {
1242		let states = self.db.load_round_states()?;
1243		if !states.is_empty() {
1244			debug!("Syncing {} pending round states...", states.len());
1245
1246			tokio_stream::iter(states).for_each_concurrent(10, |mut state| async move {
1247				// not processing events here
1248				if state.state.ongoing_participation() {
1249					return;
1250				}
1251
1252				match state.state.sync(self).await {
1253					Ok(RoundStatus::Confirmed { funding_txid }) => {
1254						info!("Round confirmed. Funding tx {}", funding_txid);
1255						if let Err(e) = self.db.remove_round_state(&state) {
1256							warn!("Error removing confirmed round state from db: {:#}", e);
1257						}
1258					},
1259					Ok(RoundStatus::Unconfirmed { funding_txid }) => {
1260						info!("Waiting for confirmations for round funding tx {}", funding_txid);
1261						if let Err(e) = self.db.update_round_state(&state) {
1262							warn!("Error updating pending round state in db: {:#}", e);
1263						}
1264					},
1265					Ok(RoundStatus::Pending { unsigned_funding_txids: txs }) => {
1266						info!("Round still pending, potential funding txs: {:?}", txs);
1267						if let Err(e) = self.db.update_round_state(&state) {
1268							warn!("Error updating pending round state in db: {:#}", e);
1269						}
1270					},
1271					Ok(RoundStatus::Failed { error }) => {
1272						error!("Round failed: {}", error);
1273						if let Err(e) = self.db.remove_round_state(&state) {
1274							warn!("Error removing failed round state from db: {:#}", e);
1275						}
1276					},
1277					Ok(RoundStatus::Canceled) => {
1278						error!("Round canceled");
1279						if let Err(e) = self.db.remove_round_state(&state) {
1280							warn!("Error removing canceled round state from db: {:#}", e);
1281						}
1282					},
1283					Err(e) => warn!("Error syncing round: {:#}", e),
1284				}
1285			}).await;
1286		}
1287
1288		// also sync recovered states if we have any
1289		let recovered = self.db.load_recovered_rounds()?;
1290		if !recovered.is_empty() {
1291			debug!("Syncing {} recovered past rounds...", recovered.len());
1292
1293			tokio_stream::iter(recovered).for_each_concurrent(10, |mut state| async move {
1294				match state.sync(self).await {
1295					Ok(UnconfirmedRoundStatus::Confirmed) => {
1296						info!("Recovered old round with funding txid {} confirmed",
1297							state.funding_txid(),
1298						);
1299						if let Err(e) = self.db.remove_recovered_round(state.funding_txid()) {
1300							warn!("Error removing finished recovered round from db: {:#}", e);
1301						}
1302					},
1303					Ok(UnconfirmedRoundStatus::DoubleSpent { double_spender }) => {
1304						debug!("Old recovered round {} invalidated because double spent by {:?}",
1305							state.funding_txid(), double_spender,
1306						);
1307						if let Err(e) = self.db.remove_recovered_round(state.funding_txid()) {
1308							warn!("Error invalidated recovered round from db: {:#}", e);
1309						}
1310					},
1311					Ok(UnconfirmedRoundStatus::Unconfirmed) => {},
1312					Err(e) => debug!("Error trying to progress recovered past round: {:#}", e),
1313				}
1314			}).await;
1315		}
1316
1317		Ok(())
1318	}
1319
1320	/// Fetch last round event from server
1321	async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1322		let mut srv = self.require_server()?;
1323		let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1324		Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1325	}
1326
1327	async fn inner_process_event(
1328		&self,
1329		states: impl IntoIterator<Item = &mut StoredRoundState>,
1330		event: Option<&RoundEvent>,
1331	) {
1332		tokio_stream::iter(states).for_each_concurrent(3, |state| async move {
1333			if let Some(event) = event && state.state.ongoing_participation() {
1334				let updated = state.state.process_event(self, &event).await;
1335				if updated {
1336					if let Err(e) = self.db.update_round_state(&state) {
1337						error!("Error storing round state #{} after progress: {:#}", state.id, e);
1338					}
1339				}
1340			}
1341
1342			match state.state.sync(self).await {
1343				Err(e) => warn!("Error syncing round #{}: {:#}", state.id, e),
1344				Ok(s) if s.is_final() => {
1345					info!("Round #{} finished with result: {:?}", state.id, s);
1346					if let Err(e) = self.db.remove_round_state(&state) {
1347						warn!("Failed to remove finished round #{} from db: {:#}", state.id, e);
1348					}
1349				},
1350				Ok(s) => {
1351					trace!("Round state #{} is now in state {:?}", state.id, s);
1352					if let Err(e) = self.db.update_round_state(&state) {
1353						warn!("Error storing round state #{}: {:#}", state.id, e);
1354					}
1355				},
1356			}
1357		}).await;
1358	}
1359
1360	/// Try to make incremental progress on all pending round states
1361	///
1362	/// If the `last_round_event` argument is not provided, it will be fetched
1363	/// from the server.
1364	pub async fn progress_pending_rounds(
1365		&self,
1366		last_round_event: Option<&RoundEvent>,
1367	) -> anyhow::Result<()> {
1368		let mut states = self.db.load_round_states()?;
1369		info!("Processing {} rounds...", states.len());
1370
1371		let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1372		if states.iter().any(|s| s.state.ongoing_participation()) && last_round_event.is_none() {
1373			match self.get_last_round_event().await {
1374				Ok(e) => last_round_event = Some(Cow::Owned(e)),
1375				Err(e) => {
1376					warn!("Error fetching round event, \
1377						failed to progress ongoing rounds: {:#}", e);
1378				},
1379			}
1380		}
1381
1382		let event = last_round_event.as_ref().map(|c| c.as_ref());
1383		self.inner_process_event(states.iter_mut(), event).await;
1384
1385		Ok(())
1386	}
1387
1388	pub async fn subscribe_round_events(&self)
1389		-> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1390	{
1391		let mut srv = self.require_server()?;
1392		let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1393			.into_inner().map(|m| {
1394				let m = m.context("received error on event stream")?;
1395				let e = RoundEvent::try_from(m.clone())
1396					.with_context(|| format!("error converting rpc round event: {:?}", m))?;
1397				trace!("Received round event: {}", e);
1398				Ok::<_, anyhow::Error>(e)
1399			});
1400		Ok(events)
1401	}
1402
1403	/// A blocking call that will try to perform a full round participation
1404	/// for all ongoing rounds
1405	///
1406	/// Returns only once a round has happened on the server.
1407	pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1408		let mut states = self.db.load_round_states()?;
1409		states.retain(|s| s.state.ongoing_participation());
1410
1411		if states.is_empty() {
1412			info!("No pending round states");
1413			return Ok(());
1414		}
1415
1416		let mut events = self.subscribe_round_events().await?;
1417
1418		info!("Participating with {} round states...", states.len());
1419
1420		loop {
1421			let event = events.next().await
1422				.context("events stream broke")?
1423				.context("error on event stream")?;
1424
1425			self.inner_process_event(states.iter_mut(), Some(&event)).await;
1426
1427			states.retain(|s| s.state.ongoing_participation());
1428			if states.is_empty() {
1429				info!("All rounds handled");
1430				return Ok(());
1431			}
1432		}
1433	}
1434
1435	/// Will cancel all pending rounds that can safely be canceled
1436	///
1437	/// All rounds that have not started yet can safely be canceled,
1438	/// as well as rounds where we have not yet signed any forfeit txs.
1439	pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1440		let states = self.db.load_round_states()?;
1441		for mut state in states {
1442			match state.state.try_cancel(self).await {
1443				Ok(true) => {
1444					if let Err(e) = self.db.remove_round_state(&state) {
1445						warn!("Error removing canceled round state from db: {:#}", e);
1446					}
1447				},
1448				Ok(false) => {},
1449				Err(e) => warn!("Error trying to cancel round #{}: {:#}", state.id, e),
1450			}
1451		}
1452		Ok(())
1453	}
1454
1455	/// Try to cancel the given round
1456	pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1457		let states = self.db.load_round_states()?;
1458		for mut state in states {
1459			if state.id != id {
1460				continue;
1461			}
1462
1463			if state.state.try_cancel(self).await.context("failed to cancel round")? {
1464				self.db.remove_round_state(&state)
1465					.context("error removing canceled round state from db")?;
1466			} else {
1467				bail!("failed to cancel round");
1468			}
1469			return Ok(());
1470		}
1471		bail!("round not found")
1472	}
1473
1474	/// Participate in a round
1475	///
1476	/// This function will start a new round participation and block until
1477	/// the round is finished.
1478	/// After this method returns the round state will be kept active until
1479	/// the round tx fully confirms.
1480	pub(crate) async fn participate_round(
1481		&self,
1482		participation: RoundParticipation,
1483		movement_kind: Option<RoundMovement>,
1484	) -> anyhow::Result<RoundStatus> {
1485		let mut state = self.join_next_round(participation, movement_kind).await?;
1486
1487		info!("Waiting for a round start...");
1488		let mut events = self.subscribe_round_events().await?;
1489
1490		loop {
1491			if !state.state.ongoing_participation() {
1492				return Ok(state.state.sync(self).await?);
1493			}
1494
1495			let event = events.next().await
1496				.context("events stream broke")?
1497				.context("error on event stream")?;
1498			if state.state.process_event(self, &event).await {
1499				self.db.update_round_state(&state)?;
1500			}
1501		}
1502	}
1503
1504	/// Look for past rounds that might contain some of our VTXOs
1505	///
1506	/// Afterwards, call [Wallet::sync_pending_rounds] to make progress on these.
1507	pub async fn start_sync_past_rounds(&self) -> anyhow::Result<()> {
1508		let mut srv = self.require_server()?;
1509
1510		let fresh_rounds = srv.client.get_fresh_rounds(protos::FreshRoundsRequest {
1511			last_round_txid: None,
1512		}).await?.into_inner().txids.into_iter()
1513			.map(|txid| RoundId::from_slice(&txid))
1514			.collect::<Result<Vec<_>, _>>()?;
1515
1516		if fresh_rounds.is_empty() {
1517			debug!("No new rounds to sync");
1518			return Ok(());
1519		}
1520
1521		debug!("Received {} new rounds from ark", fresh_rounds.len());
1522
1523		let last_pk_index = self.db.get_last_vtxo_key_index()?.unwrap_or_default();
1524		let pubkeys = (0..=last_pk_index).map(|idx| {
1525			self.vtxo_seed.derive_keypair(idx).public_key()
1526		}).collect::<HashSet<_>>();
1527
1528		let pending_states = Arc::new(self.db.load_recovered_rounds()?.into_iter()
1529			.map(|s| (s.funding_txid(), s))
1530			.collect::<HashMap<_, _>>());
1531
1532		let results = tokio_stream::iter(fresh_rounds).map(|round_id| {
1533			let pubkeys = pubkeys.clone();
1534			let mut srv = srv.clone();
1535			let pending_states = pending_states.clone();
1536
1537			async move {
1538				//TODO(stevenroose) detect if we already are aware
1539				if pending_states.contains_key(&round_id.as_round_txid()) {
1540					debug!("Skipping round {} because it already exists", round_id);
1541					return Ok::<_, anyhow::Error>(());
1542				}
1543
1544				let req = protos::RoundId {
1545					txid: round_id.as_round_txid().to_byte_array().to_vec(),
1546				};
1547				let round = srv.client.get_round(req).await?.into_inner();
1548
1549				let tree = SignedVtxoTreeSpec::deserialize(&round.signed_vtxos)
1550					.context("invalid signed vtxo tree from srv")?
1551					.into_cached_tree();
1552
1553				let mut reqs = Vec::new();
1554				let mut vtxos = vec![];
1555				for (idx, dest) in tree.spec.spec.vtxos.iter().enumerate() {
1556					if pubkeys.contains(&dest.vtxo.policy.user_pubkey()) {
1557						let vtxo = tree.build_vtxo(idx).expect("correct leaf idx");
1558
1559						if self.db.get_wallet_vtxo(vtxo.id())?.is_none() {
1560							debug!("Built new vtxo {} with value {}", vtxo.id(), vtxo.amount());
1561							reqs.push(dest.vtxo.clone());
1562							vtxos.push(vtxo);
1563						} else {
1564							debug!("Not adding vtxo {} because it already exists", vtxo.id());
1565						}
1566					}
1567				}
1568
1569				let round_tx = deserialize::<Transaction>(&round.funding_tx)?;
1570
1571				let state = UnconfirmedRound::new(round_tx, vtxos);
1572				self.db.store_recovered_round(&state)?;
1573
1574				Ok(())
1575			}
1576		})
1577			.buffer_unordered(10)
1578			.collect::<Vec<_>>()
1579			.await;
1580
1581		for result in results {
1582			if let Err(e) = result {
1583				return Err(e).context("failed to sync round");
1584			}
1585		}
1586
1587		Ok(())
1588	}
1589}