bark/round/
mod.rs

1//!
2//! Round State Machine
3//!
4
5mod lock;
6
7pub(crate) use lock::{RoundStateGuard, RoundStateLockIndex};
8
9use std::iter;
10use std::borrow::Cow;
11use std::convert::Infallible;
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13
14use anyhow::Context;
15use ark::vtxo::VtxoValidationError;
16use bdk_esplora::esplora_client::Amount;
17use bip39::rand;
18use bitcoin::{OutPoint, SignedAmount, Transaction, Txid};
19use bitcoin::consensus::encode::{deserialize, serialize_hex};
20use bitcoin::hashes::Hash;
21use bitcoin::hex::DisplayHex;
22use bitcoin::key::Keypair;
23use bitcoin::secp256k1::schnorr;
24use futures::future::{join_all, try_join_all};
25use futures::{Stream, StreamExt};
26use log::{debug, error, info, trace, warn};
27
28use ark::{ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoRequest};
29use ark::vtxo::Full;
30use ark::forfeit::HashLockedForfeitBundle;
31use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
32use ark::rounds::{
33	RoundAttempt, RoundEvent, RoundFinished, RoundSeq, MIN_ROUND_TX_OUTPUTS,
34	ROUND_TX_VTXO_TREE_VOUT,
35};
36use ark::tree::signed::{LeafVtxoCosignContext, UnlockHash, VtxoTreeSpec};
37use bitcoin_ext::TxStatus;
38use server_rpc::{protos, ServerConnection, TryFromBytes};
39
40use crate::{SECP, Wallet, WalletVtxo};
41use crate::movement::{MovementId, MovementStatus};
42use crate::movement::update::MovementUpdate;
43use crate::persist::models::{RoundStateId, StoredRoundState, Unlocked};
44use crate::subsystem::{RoundMovement, Subsystem};
45
46
47/// The type string for the hArk leaf transition
48const HARK_TRANSITION_KIND: &str = "hash-locked-cosigned";
49
50/// Struct to communicate your specific participation for an Ark round.
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct RoundParticipation {
53	#[serde(with = "ark::encode::serde::vec")]
54	pub inputs: Vec<Vtxo<Full>>,
55	/// The output VTXOs that we request in the round,
56	/// including change
57	pub outputs: Vec<VtxoRequest>,
58}
59
60impl RoundParticipation {
61	pub fn to_movement_update(&self) -> anyhow::Result<MovementUpdate> {
62		let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
63		let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
64		let fee = input_amount - output_amount;
65		Ok(MovementUpdate::new()
66			.consumed_vtxos(&self.inputs)
67			.intended_balance(SignedAmount::ZERO)
68			.effective_balance( - fee.to_signed()?)
69			.fee(fee)
70		)
71	}
72}
73
74#[derive(Debug, Clone)]
75pub enum RoundStatus {
76	/// The round was successful and is fully confirmed
77	Confirmed {
78		funding_txid: Txid,
79	},
80	/// Round successful but not fully confirmed
81	Unconfirmed {
82		funding_txid: Txid,
83	},
84	/// Round didn't finish yet
85	Pending,
86	/// The round failed
87	Failed {
88		error: String,
89	},
90	/// User canceled the round
91	Canceled,
92}
93
94impl RoundStatus {
95	/// Whether this is the final state and it won't change anymore
96	pub fn is_final(&self) -> bool {
97		match self {
98			Self::Confirmed { .. } => true,
99			Self::Unconfirmed { .. } => false,
100			Self::Pending => false,
101			Self::Failed { .. } => true,
102			Self::Canceled => true,
103		}
104	}
105
106	/// Whether it looks like the round succeeded
107	pub fn is_success(&self) -> bool {
108		match self {
109			Self::Confirmed { .. } => true,
110			Self::Unconfirmed { .. } => true,
111			Self::Pending => false,
112			Self::Failed { .. } => false,
113			Self::Canceled => false,
114		}
115	}
116}
117
118/// State of the progress of a round participation
119///
120/// An instance of this struct is kept all the way from the intention of joining
121/// the next round, until either the round fully confirms or it fails and we are
122/// sure it won't have any effect on our wallet.
123///
124/// As soon as we have signed forfeit txs for the round, we keep track of this
125/// round attempt until we see another attempt we participated in confirm or
126/// we gain confidence that the failed attempt will never confirm.
127//
128//TODO(stevenroose) move the id in here and have the state persist itself with the wallet
129// to have better control. this way we can touch db before we sent forfeit sigs
130pub struct RoundState {
131	/// Round is fully done
132	pub(crate) done: bool,
133
134	/// Our participation in this round
135	pub(crate) participation: RoundParticipation,
136
137	/// The flow of the round in case it is still ongoing with the server
138	pub(crate) flow: RoundFlowState,
139
140	/// The new output vtxos of this round participation
141	///
142	/// After we finish the interactive part, we fill this with the uncompleted
143	/// VTXOs which we then try to complete with the unlock preimage.
144	pub(crate) new_vtxos: Vec<Vtxo<Full>>,
145
146	/// Whether we sent our forfeit signatures to the server
147	///
148	/// If we did this and the server refused to reveal our new VTXOs,
149	/// we will be forced to exit.
150	//TODO(stevenroose) implement exit when this is true and we can't make progress
151	// probably based on the input vtxos becoming close to expiry
152	pub(crate) sent_forfeit_sigs: bool,
153
154	/// The ID of the [Movement] associated with this round
155	pub(crate) movement_id: Option<MovementId>,
156}
157
158impl RoundState {
159	fn new_interactive(
160		participation: RoundParticipation,
161		movement_id: Option<MovementId>,
162	) -> Self {
163		Self {
164			participation,
165			movement_id,
166			flow: RoundFlowState::InteractivePending,
167			new_vtxos: Vec::new(),
168			sent_forfeit_sigs: false,
169			done: false,
170		}
171	}
172
173	fn new_non_interactive(
174		participation: RoundParticipation,
175		unlock_hash: UnlockHash,
176		movement_id: Option<MovementId>,
177	) -> Self {
178		Self {
179			participation,
180			movement_id,
181			flow: RoundFlowState::NonInteractivePending { unlock_hash },
182			new_vtxos: Vec::new(),
183			sent_forfeit_sigs: false,
184			done: false,
185		}
186	}
187
188	/// Our participation in this round
189	pub fn participation(&self) -> &RoundParticipation {
190		&self.participation
191	}
192
193	/// the unlock hash if already known
194	pub fn unlock_hash(&self) -> Option<UnlockHash> {
195		match self.flow {
196			RoundFlowState::NonInteractivePending { unlock_hash } => Some(unlock_hash),
197			RoundFlowState::InteractivePending => None,
198			RoundFlowState::InteractiveOngoing { .. } => None,
199			RoundFlowState::Failed { .. } => None,
200			RoundFlowState::Canceled => None,
201			RoundFlowState::Finished { unlock_hash, .. } => Some(unlock_hash),
202		}
203	}
204
205	pub fn funding_tx(&self) -> Option<&Transaction> {
206		match self.flow {
207			RoundFlowState::NonInteractivePending { .. } => None,
208			RoundFlowState::InteractivePending => None,
209			RoundFlowState::InteractiveOngoing { .. } => None,
210			RoundFlowState::Failed { .. } => None,
211			RoundFlowState::Canceled => None,
212			RoundFlowState::Finished { ref funding_tx, .. } => Some(funding_tx),
213		}
214	}
215
216	/// Whether the interactive part of the round is still ongoing
217	pub fn ongoing_participation(&self) -> bool {
218		match self.flow {
219			RoundFlowState::NonInteractivePending { .. } => false,
220			RoundFlowState::InteractivePending => true,
221			RoundFlowState::InteractiveOngoing { .. } => true,
222			RoundFlowState::Failed { .. } => false,
223			RoundFlowState::Canceled => false,
224			RoundFlowState::Finished { .. } => false,
225		}
226	}
227
228	/// Tries to cancel the round and returns whether it was succesfully canceled
229	/// or if it was already canceled or failed
230	pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
231		let ret = match self.flow {
232			RoundFlowState::NonInteractivePending { .. } => todo!("we have to cancel with server!"),
233			RoundFlowState::Canceled => true,
234			RoundFlowState::Failed { .. } => true,
235			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
236				self.flow = RoundFlowState::Canceled;
237				true
238			},
239			RoundFlowState::Finished { .. } => false,
240		};
241		if ret {
242			persist_round_failure(wallet, &self.participation, self.movement_id).await
243				.context("failed to persist round failure for cancelation")?;
244		}
245		Ok(ret)
246	}
247
248	async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
249		match start_attempt(wallet, &self.participation, attempt).await {
250			Ok(state) => {
251				self.flow = RoundFlowState::InteractiveOngoing {
252					round_seq: attempt.round_seq,
253					attempt_seq: attempt.attempt_seq,
254					state: state,
255				};
256			},
257			Err(e) => {
258				self.flow = RoundFlowState::Failed {
259					error: format!("{:#}", e),
260				};
261			},
262		}
263	}
264
265	/// Processes the given event and returns true if some update was made to the state
266	pub async fn process_event(
267		&mut self,
268		wallet: &Wallet,
269		event: &RoundEvent,
270	) -> bool {
271		let _: Infallible = match self.flow {
272			RoundFlowState::InteractivePending => {
273				if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
274					trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
275					self.try_start_attempt(wallet, e).await;
276					return true;
277				} else {
278					trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
279						event.kind(), event.round_seq(), event.attempt_seq(),
280					);
281					return false;
282				}
283			},
284			RoundFlowState::InteractiveOngoing { round_seq, attempt_seq, ref mut state } => {
285				// here we catch the cases where we're in a wrong flow
286
287				if let RoundEvent::Failed(e) = event && e.round_seq == round_seq {
288					warn!("Round {} failed by server", round_seq);
289					self.flow = RoundFlowState::Failed {
290						error: format!("round {} failed by server", round_seq),
291					};
292					return true;
293				}
294
295				if event.round_seq() > round_seq {
296					// new round started, we don't support multiple parallel rounds,
297					// this means we failed
298					self.flow = RoundFlowState::Failed {
299						error: format!("round {} started while we were on {}",
300							event.round_seq(), round_seq,
301						),
302					};
303					return true;
304				}
305
306				if event.attempt_seq() < attempt_seq {
307					trace!("ignoring replayed message from old attempt");
308					return false;
309				}
310
311				if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
312					trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
313					self.try_start_attempt(wallet, e).await;
314					return true;
315				}
316				trace!("Processing event {} for round attempt {}:{} in state {}",
317					event.kind(), round_seq, attempt_seq, state.kind(),
318				);
319
320				return match progress_attempt(state, wallet, &self.participation, event).await {
321					AttemptProgressResult::NotUpdated => false,
322					AttemptProgressResult::Updated { new_state } => {
323						*state = new_state;
324						true
325					},
326					AttemptProgressResult::Failed(e) => {
327						warn!("Round failed with error: {:#}", e);
328						self.flow = RoundFlowState::Failed {
329							error: format!("{:#}", e),
330						};
331						true
332					},
333					AttemptProgressResult::Finished { funding_tx, vtxos, unlock_hash } => {
334						self.new_vtxos = vtxos;
335						let funding_txid = funding_tx.compute_txid();
336						self.flow = RoundFlowState::Finished { funding_tx, unlock_hash };
337						if let Some(mid) = self.movement_id {
338							if let Err(e) = update_funding_txid(wallet, mid, funding_txid).await {
339								warn!("Error updating the round funding txid: {:#}", e);
340							}
341						}
342						true
343					},
344				};
345			},
346			RoundFlowState::NonInteractivePending { .. }
347				| RoundFlowState::Finished { .. }
348				| RoundFlowState::Failed { .. }
349				| RoundFlowState::Canceled => return false,
350		};
351	}
352
353	/// Sync the round's status and return it
354	///
355	/// When success or failure is returned, the round state can be eliminated
356	//TODO(stevenroose) make RoundState manage its own db record
357	pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
358		match self.flow {
359			RoundFlowState::Finished { ref funding_tx, .. } if self.done => {
360				Ok(RoundStatus::Confirmed {
361					funding_txid: funding_tx.compute_txid(),
362				})
363			},
364
365			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
366				Ok(RoundStatus::Pending)
367			},
368			RoundFlowState::Failed { ref error } => {
369				persist_round_failure(wallet, &self.participation, self.movement_id).await
370					.context("failed to persist round failure")?;
371				Ok(RoundStatus::Failed { error: error.clone() })
372			},
373			RoundFlowState::Canceled => {
374				persist_round_failure(wallet, &self.participation, self.movement_id).await
375					.context("failed to persist round failure")?;
376				Ok(RoundStatus::Canceled)
377			},
378
379			RoundFlowState::NonInteractivePending { unlock_hash } => {
380				match progress_non_interactive(wallet, &self.participation, unlock_hash).await {
381					Ok(HarkProgressResult::RoundPending) => Ok(RoundStatus::Pending),
382					Ok(HarkProgressResult::Ok { funding_tx, new_vtxos }) => {
383						let funding_txid = funding_tx.compute_txid();
384						self.new_vtxos = new_vtxos;
385						self.flow = RoundFlowState::Finished {
386							funding_tx: funding_tx.clone(),
387							unlock_hash: unlock_hash,
388						};
389
390						persist_round_success(
391							wallet,
392							&self.participation,
393							self.movement_id,
394							&self.new_vtxos,
395							&funding_tx,
396						).await.context("failed to store successful round in DB!")?;
397
398						self.done = true;
399
400						Ok(RoundStatus::Confirmed { funding_txid })
401					},
402					Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }) => {
403						if let Some(mid) = self.movement_id {
404							update_funding_txid(wallet, mid, funding_txid).await
405								.context("failed to update funding txid in DB")?;
406						}
407						Ok(RoundStatus::Unconfirmed { funding_txid })
408					},
409
410					//TODO(stevenroose) should we mark as failed for these cases?
411
412					Err(HarkForfeitError::Err(e)) => {
413						//TODO(stevenroose) we failed here but we might actualy be able to
414						// succeed if we retry. should we implement some kind of limited
415						// retry after which we mark as failed?
416						Err(e.context("error progressing non-interactive round"))
417					},
418					Err(HarkForfeitError::SentForfeits(e)) => {
419						self.sent_forfeit_sigs = true;
420						Err(e.context("error progressing non-interactive round \
421							after sending forfeit tx signatures"))
422					},
423				}
424			},
425			// interactive part finished, but didn't forfeit yet
426			RoundFlowState::Finished { ref funding_tx, unlock_hash } => {
427				let funding_txid = funding_tx.compute_txid();
428				let confirmed = check_funding_tx_confirmations(
429					wallet, funding_txid, &funding_tx,
430				).await.context("error checking funding tx confirmations")?;
431				if !confirmed {
432					trace!("Funding tx {} not yet deeply enough confirmed", funding_txid);
433					return Ok(RoundStatus::Unconfirmed { funding_txid });
434				}
435
436				match hark_vtxo_swap(
437					wallet, &self.participation, &mut self.new_vtxos, &funding_tx, unlock_hash,
438				).await {
439					Ok(()) => {
440						persist_round_success(
441							wallet,
442							&self.participation,
443							self.movement_id,
444							&self.new_vtxos,
445							&funding_tx,
446						).await.context("failed to store successful round in DB!")?;
447
448						self.done = true;
449
450						Ok(RoundStatus::Confirmed { funding_txid })
451					},
452					Err(HarkForfeitError::Err(e)) => {
453						Err(e.context("error forfeiting VTXOs after round"))
454					},
455					Err(HarkForfeitError::SentForfeits(e)) => {
456						self.sent_forfeit_sigs = true;
457						Err(e.context("error after having signed and sent \
458							forfeit signatures to server"))
459					},
460				}
461			},
462		}
463	}
464
465	/// Once we know the signed round funding tx, this returns the output VTXOs
466	/// for this round.
467	pub fn output_vtxos(&self) -> Option<&[Vtxo<Full>]> {
468		if self.new_vtxos.is_empty() {
469			None
470		} else {
471			Some(&self.new_vtxos)
472		}
473	}
474
475	/// Returns the input VTXOs that are locked in this round, but only
476	/// if no output VTXOs were issued yet.
477	pub fn locked_pending_inputs(&self) -> &[Vtxo<Full>] {
478		//TODO(stevenroose) consider if we can't just drop the state after forfeit exchange
479		match self.flow {
480			RoundFlowState::NonInteractivePending { .. }
481				| RoundFlowState::InteractivePending
482				| RoundFlowState::InteractiveOngoing { .. }
483			=> {
484				&self.participation.inputs
485			},
486			RoundFlowState::Finished { .. } => if self.done {
487				// inputs already unlocked
488				&[]
489			} else {
490				&self.participation.inputs
491			},
492			RoundFlowState::Failed { .. }
493				| RoundFlowState::Canceled
494			=> {
495				// inputs already unlocked
496				&[]
497			},
498		}
499	}
500
501	/// The balance pending in this round
502	///
503	/// This becomes zero once the new round VTXOs are unlocked.
504	pub fn pending_balance(&self) -> Amount {
505		if self.done {
506			return Amount::ZERO;
507		}
508
509		match self.flow {
510			RoundFlowState::NonInteractivePending { .. }
511				| RoundFlowState::InteractivePending
512				| RoundFlowState::InteractiveOngoing { .. }
513				| RoundFlowState::Finished { .. }
514			=> {
515				self.participation.outputs.iter().map(|o| o.amount).sum()
516			},
517			RoundFlowState::Failed { .. } | RoundFlowState::Canceled => {
518				Amount::ZERO
519			},
520		}
521	}
522}
523
524/// The state of the process flow of a round
525///
526/// This tracks the progress of the interactive part of the round, from
527/// waiting to start until finishing either succesfully or with a failure.
528pub enum RoundFlowState {
529	/// We don't do flow and we just wait for the round to finish
530	NonInteractivePending {
531		unlock_hash: UnlockHash,
532	},
533
534	/// Waiting for round to happen
535	InteractivePending,
536	/// Interactive part ongoing
537	InteractiveOngoing {
538		round_seq: RoundSeq,
539		attempt_seq: usize,
540		state: AttemptState,
541	},
542
543	/// Interactive part finished, waiting for confirmation
544	Finished {
545		funding_tx: Transaction,
546		unlock_hash: UnlockHash,
547	},
548
549	/// Failed during round
550	Failed {
551		error: String,
552	},
553
554	/// User canceled round
555	Canceled,
556}
557
558/// The state of a single round attempt
559///
560/// For each attempt that we participate in, we keep the state of our concrete
561/// participation.
562pub enum AttemptState {
563	AwaitingAttempt,
564	AwaitingUnsignedVtxoTree {
565		cosign_keys: Vec<Keypair>,
566		secret_nonces: Vec<Vec<DangerousSecretNonce>>,
567		unlock_hash: UnlockHash,
568	},
569	AwaitingFinishedRound {
570		unsigned_round_tx: Transaction,
571		vtxos_spec: VtxoTreeSpec,
572		unlock_hash: UnlockHash,
573	},
574}
575
576impl AttemptState {
577	/// The state kind represented as a string
578	fn kind(&self) -> &'static str {
579		match self {
580			Self::AwaitingAttempt => "AwaitingAttempt",
581			Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
582			Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
583		}
584	}
585}
586
587/// Result from trying to progress an ongoing round attempt
588enum AttemptProgressResult {
589	Finished {
590		funding_tx: Transaction,
591		vtxos: Vec<Vtxo<Full>>,
592		unlock_hash: UnlockHash,
593	},
594	Failed(anyhow::Error),
595	/// When the state changes, this variant is returned
596	///
597	/// If during the processing, we have signed any forfeit txs and tried
598	/// sending them to the server, the [UnconfirmedRound] instance is returned
599	/// so that it can be stored in the state.
600	Updated {
601		new_state: AttemptState,
602	},
603	NotUpdated,
604}
605
606/// Participate in the new round attempt by submitting our round participation
607async fn start_attempt(
608	wallet: &Wallet,
609	participation: &RoundParticipation,
610	event: &RoundAttempt,
611) -> anyhow::Result<AttemptState> {
612	let (mut srv, ark_info) = wallet.require_server().await.context("server not available")?;
613
614	// Assign cosign pubkeys to the payment requests.
615	let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
616		.take(participation.outputs.len())
617		.collect::<Vec<_>>();
618
619	// Prepare round participation info.
620	// For each of our requested vtxo output, we need a set of public and secret nonces.
621	let cosign_nonces = cosign_keys.iter()
622		.map(|key| {
623			let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
624			let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
625			for _ in 0..ark_info.nb_round_nonces {
626				let (s, p) = musig::nonce_pair(key);
627				secs.push(s);
628				pubs.push(p);
629			}
630			(secs, pubs)
631		})
632		.take(participation.outputs.len())
633		.collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
634
635
636	// The round has now started. We can submit our payment.
637	debug!("Submitting payment request with {} inputs and {} vtxo outputs",
638		participation.inputs.len(), participation.outputs.len(),
639	);
640
641	let signed_reqs = participation.outputs.iter()
642		.zip(cosign_keys.iter())
643		.zip(cosign_nonces.iter())
644		.map(|((req, cosign_key), (_sec, pub_nonces))| {
645			SignedVtxoRequest {
646				vtxo: req.clone(),
647				cosign_pubkey: cosign_key.public_key(),
648				nonces: pub_nonces.clone(),
649			}
650		})
651		.collect::<Vec<_>>();
652
653	let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
654	for vtxo in participation.inputs.iter() {
655		let keypair = wallet.get_vtxo_key(vtxo).await
656			.map_err(HarkForfeitError::Err)?;
657		input_vtxos.push(protos::InputVtxo {
658			vtxo_id: vtxo.id().to_bytes().to_vec(),
659			ownership_proof: {
660				let sig = event.challenge
661					.sign_with(vtxo.id(), &signed_reqs, &keypair);
662				sig.serialize().to_vec()
663			},
664		});
665	}
666
667	// Register VTXOs with server before round participation
668	wallet.register_vtxos_with_server(&participation.inputs).await
669		.map_err(HarkForfeitError::Err)?;
670
671	let resp = srv.client.submit_payment(protos::SubmitPaymentRequest {
672		input_vtxos: input_vtxos,
673		vtxo_requests: signed_reqs.into_iter().map(Into::into).collect(),
674		#[allow(deprecated)]
675		offboard_requests: vec![],
676	}).await.context("Ark server refused our payment submission")?;
677
678	Ok(AttemptState::AwaitingUnsignedVtxoTree {
679		unlock_hash: UnlockHash::from_bytes(&resp.into_inner().unlock_hash)?,
680		cosign_keys: cosign_keys,
681		secret_nonces: cosign_nonces.into_iter()
682			.map(|(sec, _pub)| sec.into_iter()
683				.map(DangerousSecretNonce::dangerous_from_secret_nonce)
684				.collect())
685			.collect(),
686	})
687}
688
689/// just an internal type; need Error trait to work with anyhow
690#[derive(Debug, thiserror::Error)]
691enum HarkForfeitError {
692	/// An error happened after we sent forfeit signatures to the server
693	#[error("error after forfeits were sent")]
694	SentForfeits(#[source] anyhow::Error),
695	/// An error happened before we sent forfeit signatures to the server
696	#[error("error before forfeits were sent")]
697	Err(#[source] anyhow::Error),
698}
699
700async fn hark_cosign_leaf(
701	wallet: &Wallet,
702	srv: &mut ServerConnection,
703	funding_tx: &Transaction,
704	vtxo: &mut Vtxo<Full>,
705) -> anyhow::Result<()> {
706	let key = wallet.pubkey_keypair(&vtxo.user_pubkey()).await
707		.context("error fetching keypair").map_err(HarkForfeitError::Err)?
708		.with_context(|| format!(
709			"keypair {} not found for VTXO {}", vtxo.user_pubkey(), vtxo.id(),
710		))?.1;
711	let (ctx, cosign_req) = LeafVtxoCosignContext::new(vtxo, funding_tx, &key);
712	let cosign_resp = srv.client.request_leaf_vtxo_cosign(
713		protos::LeafVtxoCosignRequest::from(cosign_req),
714	).await
715		.with_context(|| format!("error requesting leaf cosign for vtxo {}", vtxo.id()))?
716		.into_inner().try_into()
717		.context("bad leaf vtxo cosign response")?;
718	ensure!(ctx.finalize(vtxo, cosign_resp),
719		"failed to finalize VTXO leaf signature for VTXO {}", vtxo.id(),
720	);
721
722	Ok(())
723}
724
725/// Finish the hArk VTXO swap protocol
726///
727/// This includes:
728/// - requesting cosignature of the locked hArk leaves
729/// - sending forfeit txs to the server in return for the unlock preimage
730///
731/// NB all the actions in this function are idempotent, meaning that the server
732/// allows them to be done multiple times. this means that if this function calls
733/// fails, it's safe to just call it again at a later time
734async fn hark_vtxo_swap(
735	wallet: &Wallet,
736	participation: &RoundParticipation,
737	output_vtxos: &mut [Vtxo<Full>],
738	funding_tx: &Transaction,
739	unlock_hash: UnlockHash,
740) -> Result<(), HarkForfeitError> {
741	let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
742
743	// before we start make sure the server has our input vtxo signatures
744	wallet.register_vtxos_with_server(&participation.inputs).await
745		.context("couldn't send our input vtxos to server")
746		.map_err(HarkForfeitError::Err)?;
747
748	// first get the leaves signed
749	for vtxo in output_vtxos.iter_mut() {
750		hark_cosign_leaf(wallet, &mut srv, funding_tx, vtxo).await
751			.map_err(HarkForfeitError::Err)?;
752	}
753
754	// then do the forfeit dance
755
756	let server_nonces = srv.client.request_forfeit_nonces(protos::ForfeitNoncesRequest {
757		unlock_hash: unlock_hash.to_byte_array().to_vec(),
758		vtxo_ids: participation.inputs.iter().map(|v| v.id().to_bytes().to_vec()).collect(),
759	}).await
760		.context("request forfeits nonces call failed")
761		.map_err(HarkForfeitError::Err)?
762		.into_inner().public_nonces.into_iter()
763		.map(|b| musig::PublicNonce::from_bytes(b))
764		.collect::<Result<Vec<_>, _>>()
765		.context("invalid forfeit nonces")
766		.map_err(HarkForfeitError::Err)?;
767
768	if server_nonces.len() != participation.inputs.len() {
769		return Err(HarkForfeitError::Err(anyhow!(
770			"server sent {} nonce pairs, expected {}",
771			server_nonces.len(), participation.inputs.len(),
772		)));
773	}
774
775	let mut forfeit_bundles = Vec::with_capacity(participation.inputs.len());
776	for (input, nonces) in participation.inputs.iter().zip(server_nonces.into_iter()) {
777		let user_key = wallet.pubkey_keypair(&input.user_pubkey()).await
778			.ok().flatten().with_context(|| format!(
779				"failed to fetch keypair for vtxo user pubkey {}", input.user_pubkey(),
780			)).map_err(HarkForfeitError::Err)?.1;
781		forfeit_bundles.push(HashLockedForfeitBundle::new(
782			input, unlock_hash, &user_key, &nonces,
783		))
784	}
785
786	let preimage = srv.client.forfeit_vtxos(protos::ForfeitVtxosRequest {
787		forfeit_bundles: forfeit_bundles.iter().map(|b| b.serialize()).collect(),
788	}).await
789		.context("forfeit vtxos call failed")
790		.map_err(HarkForfeitError::SentForfeits)?
791		.into_inner().unlock_preimage.as_slice().try_into()
792		.context("invalid preimage length")
793		.map_err(HarkForfeitError::SentForfeits)?;
794
795	for vtxo in output_vtxos.iter_mut() {
796		if !vtxo.provide_unlock_preimage(preimage) {
797			return Err(HarkForfeitError::SentForfeits(anyhow!(
798				"invalid preimage {} for vtxo {} with supposed unlock hash {}",
799				preimage.as_hex(), vtxo.id(), unlock_hash,
800			)));
801		}
802
803		// then validate the vtxo works
804		vtxo.validate(&funding_tx).with_context(|| format!(
805			"new VTXO {} does not pass validation after hArk forfeit protocol", vtxo.id(),
806		)).map_err(HarkForfeitError::SentForfeits)?;
807	}
808
809	Ok(())
810}
811
812fn check_vtxo_fails_hash_lock(funding_tx: &Transaction, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
813	match vtxo.validate(funding_tx) {
814		Err(VtxoValidationError::GenesisTransition {
815			genesis_idx, genesis_len, transition_kind, ..
816		}) if genesis_idx + 1 == genesis_len && transition_kind == HARK_TRANSITION_KIND => Ok(()),
817		Ok(()) => Err(anyhow!("new un-unlocked VTXO should fail validation but doesn't: {}",
818			vtxo.serialize_hex(),
819		)),
820		Err(e) => Err(anyhow!("new VTXO {} failed validation: {:#}", vtxo.id(), e)),
821	}
822}
823
824fn check_round_matches_participation(
825	part: &RoundParticipation,
826	new_vtxos: &[Vtxo<Full>],
827	funding_tx: &Transaction,
828) -> anyhow::Result<()> {
829	ensure!(new_vtxos.len() == part.outputs.len(),
830		"unexpected number of VTXOs: got {}, expected {}", new_vtxos.len(), part.outputs.len(),
831	);
832
833	for (vtxo, req) in new_vtxos.iter().zip(&part.outputs) {
834		ensure!(vtxo.amount() == req.amount,
835			"unexpected VTXO amount: got {}, expected {}", vtxo.amount(), req.amount,
836		);
837		ensure!(*vtxo.policy() == req.policy,
838			"unexpected VTXO policy: got {:?}, expected {:?}", vtxo.policy(), req.policy,
839		);
840
841		// We accept the VTXO if only the hArk transition (last) failure happens
842		check_vtxo_fails_hash_lock(funding_tx, vtxo)?;
843	}
844
845	Ok(())
846}
847
848/// Check the confirmation status of a funding tx
849///
850/// Returns true if the funding tx is confirmed deeply enough for us to accept it.
851/// The required number of confirmations depends on the wallet's configuration.
852///
853/// Returns false if the funding tx seems valid but not confirmed yet.
854///
855/// Returns an error if the chain source fails or if we can't submit the tx to the
856/// mempool, suggesting it might be double spent.
857async fn check_funding_tx_confirmations(
858	wallet: &Wallet,
859	funding_txid: Txid,
860	funding_tx: &Transaction,
861) -> anyhow::Result<bool> {
862	let tip = wallet.chain.tip().await.context("chain source error")?;
863	let conf_height = tip - wallet.config.round_tx_required_confirmations + 1;
864	let tx_status = wallet.chain.tx_status(funding_txid).await.context("chain source error")?;
865	trace!("Round funding tx {} confirmation status: {:?} (tip={})",
866		funding_txid, tx_status, tip,
867	);
868	match tx_status {
869		TxStatus::Confirmed(b) if b.height <= conf_height => Ok(true),
870		TxStatus::Mempool | TxStatus::Confirmed(_) => {
871			if wallet.config.round_tx_required_confirmations == 0 {
872				debug!("Accepting round funding tx without confirmations because of configuration");
873				Ok(true)
874			} else {
875				trace!("Hark round funding tx not confirmed (deep enough) yet: {:?}", tx_status);
876				Ok(false)
877			}
878		},
879		TxStatus::NotFound => {
880			// let's try to submit it to our mempool
881			//TODO(stevenroose) change this to an explicit "testmempoolaccept" so that we can
882			// reliably distinguish the cases of our chain source having issues and the tx
883			// actually being rejected which suggests the round was double-spent
884			if let Err(e) = wallet.chain.broadcast_tx(&funding_tx).await {
885				Err(anyhow!("hark funding tx {} server sent us is rejected by mempool (hex={}): {:#}",
886					funding_txid, serialize_hex(funding_tx), e,
887				))
888			} else {
889				trace!("hark funding tx {} was not in mempool but we broadcast it", funding_txid);
890				Ok(false)
891			}
892		},
893	}
894}
895
896enum HarkProgressResult {
897	RoundPending,
898	FundingTxUnconfirmed {
899		funding_txid: Txid,
900	},
901	Ok {
902		funding_tx: Transaction,
903		new_vtxos: Vec<Vtxo<Full>>,
904	},
905}
906
907async fn progress_non_interactive(
908	wallet: &Wallet,
909	participation: &RoundParticipation,
910	unlock_hash: UnlockHash,
911) -> Result<HarkProgressResult, HarkForfeitError> {
912	let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
913
914	let resp = srv.client.round_participation_status(protos::RoundParticipationStatusRequest {
915		unlock_hash: unlock_hash.to_byte_array().to_vec(),
916	}).await
917		.context("error checking round participation status")
918		.map_err(HarkForfeitError::Err)?.into_inner();
919	let status = protos::RoundParticipationStatus::try_from(resp.status)
920		.context("unknown status from server")
921		.map_err(HarkForfeitError::Err)	?;
922
923	if status == protos::RoundParticipationStatus::RoundPartPending {
924		trace!("Hark round still pending");
925		return Ok(HarkProgressResult::RoundPending);
926	}
927
928	// Since we got here, we clearly don't think we're finished.
929	// So even if the server thinks we did the dance before, we need the
930	// cosignature on the leaf tx so we need to do the dance again.
931	// "Guilty feet have got no rhythm."
932	if status == protos::RoundParticipationStatus::RoundPartReleased {
933		let preimage = resp.unlock_preimage.as_ref().map(|p| p.as_hex());
934		warn!("Server says preimage was already released for hArk participation \
935			with unlock hash {}. Supposed preimage: {:?}", unlock_hash, preimage,
936		);
937	}
938
939	let funding_tx_bytes = resp.round_funding_tx
940		.context("funding txid should be provided when status is not pending")
941		.map_err(HarkForfeitError::Err)?;
942	let funding_tx = deserialize::<Transaction>(&funding_tx_bytes)
943		.context("invalid funding txid")
944		.map_err(HarkForfeitError::Err)?;
945	let funding_txid = funding_tx.compute_txid();
946	trace!("Funding tx for round participation with unlock hash {}: {} ({})",
947		unlock_hash, funding_tx.compute_txid(), funding_tx_bytes.as_hex(),
948	);
949
950	// Check the confirmation status of the funding tx
951	match check_funding_tx_confirmations(wallet, funding_txid, &funding_tx).await {
952		Ok(true) => {},
953		Ok(false) => return Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }),
954		Err(e) => return Err(HarkForfeitError::Err(e.context("checking funding tx confirmations"))),
955	}
956
957	let mut new_vtxos = resp.output_vtxos.into_iter()
958		.map(|v| <Vtxo<Full>>::deserialize(&v))
959		.collect::<Result<Vec<_>, _>>()
960		.context("invalid output VTXOs from server")
961		.map_err(HarkForfeitError::Err)?;
962
963	// Check that the vtxos match our participation in the exact order
964	check_round_matches_participation(participation, &new_vtxos, &funding_tx)
965		.context("new VTXOs received from server don't match our participation")
966		.map_err(HarkForfeitError::Err)?;
967
968	hark_vtxo_swap(wallet, participation, &mut new_vtxos, &funding_tx, unlock_hash).await
969		.context("error forfeiting hArk VTXOs")
970		.map_err(HarkForfeitError::SentForfeits)?;
971
972	Ok(HarkProgressResult::Ok { funding_tx, new_vtxos })
973}
974
975async fn progress_attempt(
976	state: &AttemptState,
977	wallet: &Wallet,
978	part: &RoundParticipation,
979	event: &RoundEvent,
980) -> AttemptProgressResult {
981	// we will match only the states and messages required to make progress,
982	// all else we ignore, except an unexpected finish
983
984	match (state, event) {
985
986		(
987			AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces, unlock_hash },
988			RoundEvent::VtxoProposal(e),
989		) => {
990			trace!("Received VtxoProposal: {:#?}", e);
991			match sign_vtxo_tree(
992				wallet,
993				part,
994				&cosign_keys,
995				&secret_nonces,
996				&e.unsigned_round_tx,
997				&e.vtxos_spec,
998				&e.cosign_agg_nonces,
999			).await {
1000				Ok(()) => {
1001					AttemptProgressResult::Updated {
1002						new_state: AttemptState::AwaitingFinishedRound {
1003							unsigned_round_tx: e.unsigned_round_tx.clone(),
1004							vtxos_spec: e.vtxos_spec.clone(),
1005							unlock_hash: *unlock_hash,
1006						},
1007					}
1008				},
1009				Err(e) => {
1010					trace!("Error signing VTXO tree: {:#}", e);
1011					AttemptProgressResult::Failed(e)
1012				},
1013			}
1014		},
1015
1016		(
1017			AttemptState::AwaitingFinishedRound { unsigned_round_tx, vtxos_spec, unlock_hash },
1018			RoundEvent::Finished(RoundFinished { cosign_sigs, signed_round_tx, .. }),
1019		) => {
1020			if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
1021				return AttemptProgressResult::Failed(anyhow!(
1022					"signed funding tx ({}) doesn't match tx received before ({})",
1023					signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
1024				));
1025			}
1026
1027			if let Err(e) = wallet.chain.broadcast_tx(&signed_round_tx).await {
1028				warn!("Failed to broadcast signed round tx: {:#}", e);
1029			}
1030
1031			match construct_new_vtxos(
1032				part, unsigned_round_tx, vtxos_spec, cosign_sigs,
1033			).await {
1034				Ok(v) => AttemptProgressResult::Finished {
1035					funding_tx: signed_round_tx.clone(),
1036					vtxos: v,
1037					unlock_hash: *unlock_hash,
1038				},
1039				Err(e) => AttemptProgressResult::Failed(anyhow!(
1040					"failed to construct new VTXOs for round: {:#}", e,
1041				)),
1042			}
1043		},
1044
1045		(state, RoundEvent::Finished(RoundFinished { .. })) => {
1046			AttemptProgressResult::Failed(anyhow!(
1047				"unexpectedly received a finished round while we were in state {}",
1048				state.kind(),
1049			))
1050		},
1051
1052		(state, _) => {
1053			trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
1054			AttemptProgressResult::NotUpdated
1055		},
1056	}
1057}
1058
1059async fn sign_vtxo_tree(
1060	wallet: &Wallet,
1061	participation: &RoundParticipation,
1062	cosign_keys: &[Keypair],
1063	secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
1064	unsigned_round_tx: &Transaction,
1065	vtxo_tree: &VtxoTreeSpec,
1066	cosign_agg_nonces: &[musig::AggregatedNonce],
1067) -> anyhow::Result<()> {
1068	let (srv, _) = wallet.require_server().await.context("server not available")?;
1069
1070	if unsigned_round_tx.output.len() < MIN_ROUND_TX_OUTPUTS {
1071		bail!("server sent round tx with less than 2 outputs: {}",
1072			serialize_hex(&unsigned_round_tx),
1073		);
1074	}
1075
1076	let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
1077
1078	// Check that the proposal contains our inputs.
1079	let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1080	for vtxo_req in vtxo_tree.iter_vtxos() {
1081		if let Some(i) = my_vtxos.iter().position(|v| {
1082			v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
1083		}) {
1084			my_vtxos.swap_remove(i);
1085		}
1086	}
1087	if !my_vtxos.is_empty() {
1088		bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
1089	}
1090
1091	let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1092	let iter = participation.outputs.iter().zip(cosign_keys).zip(secret_nonces);
1093	trace!("Sending vtxo signatures to server...");
1094	let _ = try_join_all(iter.map(|((req, key), sec)| async {
1095		let leaf_idx = unsigned_vtxos.spec.leaf_idx_of_req(req).expect("req included");
1096		let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
1097		let part_sigs = unsigned_vtxos.cosign_branch(
1098			&cosign_agg_nonces, leaf_idx, key, secret_nonces,
1099		).context("failed to cosign branch: our request not part of tree")?;
1100
1101		info!("Sending {} partial vtxo cosign signatures for pk {}",
1102			part_sigs.len(), key.public_key(),
1103		);
1104
1105		let _ = srv.client.clone().provide_vtxo_signatures(protos::VtxoSignaturesRequest {
1106			pubkey: key.public_key().serialize().to_vec(),
1107			signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
1108		}).await.context("error sending vtxo signatures")?;
1109
1110		Result::<(), anyhow::Error>::Ok(())
1111	})).await.context("error sending VTXO signatures")?;
1112	trace!("Done sending vtxo signatures to server");
1113
1114	Ok(())
1115}
1116
1117async fn construct_new_vtxos(
1118	participation: &RoundParticipation,
1119	unsigned_round_tx: &Transaction,
1120	vtxo_tree: &VtxoTreeSpec,
1121	vtxo_cosign_sigs: &[schnorr::Signature],
1122) -> anyhow::Result<Vec<Vtxo<Full>>> {
1123	let round_txid = unsigned_round_tx.compute_txid();
1124	let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
1125	let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1126
1127	// Validate the vtxo tree and cosign signatures.
1128	if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
1129		// bad server!
1130		bail!("Received incorrect vtxo cosign signatures from server");
1131	}
1132
1133	let signed_vtxos = vtxo_tree
1134		.into_signed_tree(vtxo_cosign_sigs.to_vec())
1135		.into_cached_tree();
1136
1137	let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1138	let total_nb_expected_vtxos = expected_vtxos.len();
1139
1140	let mut new_vtxos = vec![];
1141	for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
1142		if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
1143			let vtxo = signed_vtxos.build_vtxo(idx);
1144
1145			// validate the received vtxos
1146			// This is more like a sanity check since we crafted them ourselves.
1147			check_vtxo_fails_hash_lock(unsigned_round_tx, &vtxo)
1148				.context("constructed invalid vtxo from tree")?;
1149
1150			info!("New VTXO from round: {} ({}, {})",
1151				vtxo.id(), vtxo.amount(), vtxo.policy_type(),
1152			);
1153
1154			new_vtxos.push(vtxo);
1155			expected_vtxos.swap_remove(expected_idx);
1156		}
1157	}
1158
1159	if !expected_vtxos.is_empty() {
1160		if expected_vtxos.len() == total_nb_expected_vtxos {
1161			// we must have done something wrong
1162			bail!("None of our VTXOs were present in round!");
1163		} else {
1164			bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
1165				expected_vtxos.len(), expected_vtxos,
1166			);
1167		}
1168	}
1169	Ok(new_vtxos)
1170}
1171
1172//TODO(stevenroose) should be made idempotent
1173async fn persist_round_success(
1174	wallet: &Wallet,
1175	participation: &RoundParticipation,
1176	movement_id: Option<MovementId>,
1177	new_vtxos: &[Vtxo<Full>],
1178	funding_tx: &Transaction,
1179) -> anyhow::Result<()> {
1180	debug!("Persisting newly finished round. {} new vtxos, movement ID {:?}",
1181		new_vtxos.len(), movement_id,
1182	);
1183
1184	// we first try all actions that need to happen and only afterwards return errors
1185	// so that we achieve maximum success
1186
1187	let store_result = wallet.store_spendable_vtxos(new_vtxos).await
1188		.context("failed to store new VTXOs");
1189	let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs).await
1190		.context("failed to mark input VTXOs as spent");
1191	let update_result = if let Some(mid) = movement_id {
1192		wallet.movements.finish_movement_with_update(
1193			mid,
1194			MovementStatus::Successful,
1195			MovementUpdate::new()
1196				.produced_vtxos(new_vtxos)
1197				.metadata([("funding_txid".into(), serde_json::to_value(funding_tx.compute_txid())?)]),
1198		).await.context("failed to mark movement as finished")
1199	} else {
1200		Ok(())
1201	};
1202
1203	store_result?;
1204	spent_result?;
1205	update_result?;
1206
1207	Ok(())
1208}
1209
1210async fn persist_round_failure(
1211	wallet: &Wallet,
1212	participation: &RoundParticipation,
1213	movement_id: Option<MovementId>,
1214) -> anyhow::Result<()> {
1215	debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
1216	let unlock_result = wallet.unlock_vtxos(&participation.inputs).await;
1217	let finish_result = if let Some(movement_id) = movement_id {
1218		wallet.movements.finish_movement(movement_id, MovementStatus::Failed).await
1219	} else {
1220		Ok(())
1221	};
1222	if let Err(e) = &finish_result {
1223		error!("Failed to mark movement as failed: {:#}", e);
1224	}
1225	match (unlock_result, finish_result) {
1226		(Ok(()), Ok(())) => Ok(()),
1227		(Err(e), _) => Err(e),
1228		(_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
1229	}
1230}
1231
1232async fn update_funding_txid(
1233	wallet: &Wallet,
1234	movement_id: MovementId,
1235	funding_txid: Txid,
1236) -> anyhow::Result<()> {
1237	wallet.movements.update_movement(
1238		movement_id,
1239		MovementUpdate::new()
1240			.metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
1241	).await.context("Unable to update funding txid of round")
1242}
1243
1244impl Wallet {
1245	/// Ask the server when the next round is scheduled to start
1246	pub async fn next_round_start_time(&self) -> anyhow::Result<SystemTime> {
1247		let (mut srv, _) = self.require_server().await?;
1248		let ts = srv.client.next_round_time(protos::Empty {}).await?.into_inner().timestamp;
1249		Ok(UNIX_EPOCH.checked_add(Duration::from_secs(ts)).context("invalid timestamp")?)
1250	}
1251
1252	/// Start a new round participation
1253	///
1254	/// This function will store the state in the db and mark the VTXOs as locked.
1255	///
1256	/// ### Return
1257	///
1258	/// - By default, the returned state will be locked to prevent race conditions.
1259	/// To unlock the state, [StoredRoundState::unlock()] can be called.
1260	pub async fn join_next_round(
1261		&self,
1262		participation: RoundParticipation,
1263		movement_kind: Option<RoundMovement>,
1264	) -> anyhow::Result<StoredRoundState> {
1265		let movement_id = if let Some(kind) = movement_kind {
1266			Some(self.movements.new_movement_with_update(
1267				Subsystem::ROUND,
1268				kind.to_string(),
1269				participation.to_movement_update()?
1270			).await?)
1271		} else {
1272			None
1273		};
1274		let state = RoundState::new_interactive(participation, movement_id);
1275
1276		let id = self.db.store_round_state_lock_vtxos(&state).await?;
1277		let state = self.lock_wait_round_state(id).await?
1278			.context("failed to lock fresh round state")?;
1279
1280		Ok(state)
1281	}
1282
1283	/// Join next round in non-interactive or delegated mode
1284	pub async fn join_next_round_delegated(
1285		&self,
1286		participation: RoundParticipation,
1287		movement_kind: Option<RoundMovement>,
1288	) -> anyhow::Result<StoredRoundState<Unlocked>> {
1289		let (mut srv, _) = self.require_server().await?;
1290
1291		let movement_id = if let Some(kind) = movement_kind {
1292			Some(self.movements.new_movement_with_update(
1293				Subsystem::ROUND, kind.to_string(), participation.to_movement_update()?,
1294			).await?)
1295		} else {
1296			None
1297		};
1298
1299		// Generate ownership proofs for input vtxos
1300		let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
1301		for vtxo in participation.inputs.iter() {
1302			let keypair = self.get_vtxo_key(vtxo).await
1303				.context("failed to get vtxo keypair")?;
1304			input_vtxos.push(protos::InputVtxo {
1305				vtxo_id: vtxo.id().to_bytes().to_vec(),
1306				ownership_proof: {
1307					let sig = ark::challenges::NonInteractiveRoundParticipationChallenge::sign_with(
1308						vtxo.id(), &participation.outputs, &keypair,
1309					);
1310					sig.serialize().to_vec()
1311				},
1312			});
1313		}
1314
1315		// Submit participation to server and get unlock_hash
1316		let resp = srv.client.submit_round_participation(protos::RoundParticipationRequest {
1317			input_vtxos,
1318			vtxo_requests: participation.outputs.iter().map(|r| protos::VtxoRequest {
1319				policy: r.policy.serialize(),
1320				amount: r.amount.to_sat(),
1321			}).collect(),
1322		}).await.context("error submitting round participation to server")?.into_inner();
1323
1324		let unlock_hash = UnlockHash::from_bytes(resp.unlock_hash)
1325			.context("invalid unlock hash from server")?;
1326
1327		let state = RoundState::new_non_interactive(participation, unlock_hash, movement_id);
1328
1329		info!("Non-interactive round participation submitted, it will automatically execute \
1330			when you next sync your wallet after the round happened \
1331			(and has sufficient confirmations).",
1332		);
1333
1334		let id = self.db.store_round_state_lock_vtxos(&state).await?;
1335		Ok(StoredRoundState::new(id, state))
1336	}
1337
1338	/// Get all pending round states
1339	pub async fn pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
1340		self.db.get_pending_round_state_ids().await
1341	}
1342
1343	/// Get all pending round states
1344	pub async fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState<Unlocked>>> {
1345		let ids = self.db.get_pending_round_state_ids().await?;
1346		let mut states = Vec::with_capacity(ids.len());
1347		for id in ids {
1348			if let Some(state) = self.db.get_round_state_by_id(id).await? {
1349				states.push(state);
1350			}
1351		}
1352		Ok(states)
1353	}
1354
1355	/// Balance locked in pending rounds
1356	pub async fn pending_round_balance(&self) -> anyhow::Result<Amount> {
1357		let mut ret = Amount::ZERO;
1358		for round in self.pending_round_states().await? {
1359			ret += round.state().pending_balance();
1360		}
1361		Ok(ret)
1362	}
1363
1364	/// Returns all VTXOs that are locked in a pending round
1365	///
1366	/// This excludes all input VTXOs for which the output VTXOs have already
1367	/// been created.
1368	pub async fn pending_round_input_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1369		let mut ret = Vec::new();
1370		for round in self.pending_round_states().await? {
1371			let inputs = round.state().locked_pending_inputs();
1372			ret.reserve(inputs.len());
1373			for input in inputs {
1374				let v = self.get_vtxo_by_id(input.id()).await
1375					.context("unknown round input VTXO")?;
1376				ret.push(v);
1377			}
1378		}
1379		Ok(ret)
1380	}
1381
1382	/// Sync pending rounds that have finished but are waiting for confirmations
1383	pub async fn sync_pending_rounds(&self) -> anyhow::Result<()> {
1384		let states = self.pending_round_states().await?;
1385		if states.is_empty() {
1386			return Ok(());
1387		}
1388
1389		debug!("Syncing {} pending round states...", states.len());
1390
1391		tokio_stream::iter(states).for_each_concurrent(10, |state| async move {
1392			// not processing events here
1393			if state.state().ongoing_participation() {
1394				return;
1395			}
1396
1397			let mut state = match self.lock_wait_round_state(state.id()).await {
1398				Ok(Some(state)) => state,
1399				Ok(None) => return,
1400				Err(e) => {
1401					warn!("Error locking round state: {:#}", e);
1402					return;
1403				},
1404			};
1405
1406			let status = state.state_mut().sync(self).await;
1407			trace!("Synced round #{}, status: {:?}", state.id(), status);
1408			match status {
1409				Ok(RoundStatus::Confirmed { funding_txid }) => {
1410					info!("Round confirmed. Funding tx {}", funding_txid);
1411					if let Err(e) = self.db.remove_round_state(&state).await {
1412						warn!("Error removing confirmed round state from db: {:#}", e);
1413					}
1414				},
1415				Ok(RoundStatus::Unconfirmed { funding_txid }) => {
1416					info!("Waiting for confirmations for round funding tx {}", funding_txid);
1417					if let Err(e) = self.db.update_round_state(&state).await {
1418						warn!("Error updating pending round state in db: {:#}", e);
1419					}
1420				},
1421				Ok(RoundStatus::Pending) => {
1422					if let Err(e) = self.db.update_round_state(&state).await {
1423						warn!("Error updating pending round state in db: {:#}", e);
1424					}
1425				},
1426				Ok(RoundStatus::Failed { error }) => {
1427					error!("Round failed: {}", error);
1428					if let Err(e) = self.db.remove_round_state(&state).await {
1429						warn!("Error removing failed round state from db: {:#}", e);
1430					}
1431				},
1432				Ok(RoundStatus::Canceled) => {
1433					error!("Round canceled");
1434					if let Err(e) = self.db.remove_round_state(&state).await {
1435						warn!("Error removing canceled round state from db: {:#}", e);
1436					}
1437				},
1438				Err(e) => warn!("Error syncing round: {:#}", e),
1439			}
1440		}).await;
1441
1442		Ok(())
1443	}
1444
1445	/// Fetch last round event from server
1446	async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1447		let (mut srv, _) = self.require_server().await?;
1448		let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1449		Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1450	}
1451
1452	async fn inner_process_event(
1453		&self,
1454		state: &mut StoredRoundState,
1455		event: Option<&RoundEvent>,
1456	) {
1457		if let Some(event) = event && state.state().ongoing_participation() {
1458			let updated = state.state_mut().process_event(self, &event).await;
1459			if updated {
1460				if let Err(e) = self.db.update_round_state(&state).await {
1461					error!("Error storing round state #{} after progress: {:#}", state.id(), e);
1462				}
1463			}
1464		}
1465
1466		match state.state_mut().sync(self).await {
1467			Err(e) => warn!("Error syncing round #{}: {:#}", state.id(), e),
1468			Ok(s) if s.is_final() => {
1469				info!("Round #{} finished with result: {:?}", state.id(), s);
1470				if let Err(e) = self.db.remove_round_state(&state).await {
1471					warn!("Failed to remove finished round #{} from db: {:#}", state.id(), e);
1472				}
1473			},
1474			Ok(s) => {
1475				trace!("Round state #{} is now in state {:?}", state.id(), s);
1476				if let Err(e) = self.db.update_round_state(&state).await {
1477					warn!("Error storing round state #{}: {:#}", state.id(), e);
1478				}
1479			},
1480		}
1481	}
1482
1483	/// Try to make incremental progress on all pending round states
1484	///
1485	/// If the `last_round_event` argument is not provided, it will be fetched
1486	/// from the server.
1487	pub async fn progress_pending_rounds(
1488		&self,
1489		last_round_event: Option<&RoundEvent>,
1490	) -> anyhow::Result<()> {
1491		let states = self.pending_round_states().await?;
1492		info!("Processing {} rounds...", states.len());
1493
1494		let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1495
1496		let has_ongoing_participation = states.iter()
1497			.any(|s| s.state().ongoing_participation());
1498		if has_ongoing_participation && last_round_event.is_none() {
1499			match self.get_last_round_event().await {
1500				Ok(e) => last_round_event = Some(Cow::Owned(e)),
1501				Err(e) => {
1502					warn!("Error fetching round event, \
1503						failed to progress ongoing rounds: {:#}", e);
1504				},
1505			}
1506		}
1507
1508		let event = last_round_event.as_ref().map(|c| c.as_ref());
1509
1510		let futs = states.into_iter().map(async |state| {
1511			let locked = self.lock_wait_round_state(state.id()).await?;
1512			if let Some(mut locked) = locked {
1513				self.inner_process_event(&mut locked, event).await;
1514			}
1515			Ok::<_, anyhow::Error>(())
1516		});
1517
1518		futures::future::join_all(futs).await;
1519
1520		Ok(())
1521	}
1522
1523	pub async fn subscribe_round_events(&self)
1524		-> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1525	{
1526		let (mut srv, _) = self.require_server().await?;
1527		let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1528			.into_inner().map(|m| {
1529				let m = m.context("received error on event stream")?;
1530				let e = RoundEvent::try_from(m.clone())
1531					.with_context(|| format!("error converting rpc round event: {:?}", m))?;
1532				trace!("Received round event: {}", e);
1533				Ok::<_, anyhow::Error>(e)
1534			});
1535		Ok(events)
1536	}
1537
1538	/// A blocking call that will try to perform a full round participation
1539	/// for all ongoing rounds
1540	///
1541	/// Returns only once there is no ongoing rounds anymore.
1542	pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1543		let mut events = self.subscribe_round_events().await?;
1544
1545		loop {
1546			// NB: we need to load all ongoing rounds on every iteration here
1547			// because some might be finished by another call
1548			let state_ids = self.pending_round_states().await?.iter()
1549				.filter(|s| s.state().ongoing_participation())
1550				.map(|s| s.id())
1551				.collect::<Vec<_>>();
1552
1553			if state_ids.is_empty() {
1554				info!("All rounds handled");
1555				return Ok(());
1556			}
1557
1558			let event = events.next().await
1559				.context("events stream broke")?
1560				.context("error on event stream")?;
1561
1562			let futs = state_ids.into_iter().map(async |state| {
1563				let locked = self.lock_wait_round_state(state).await?;
1564				if let Some(mut locked) = locked {
1565					self.inner_process_event(&mut locked, Some(&event)).await;
1566				}
1567				Ok::<_, anyhow::Error>(())
1568			});
1569
1570			futures::future::join_all(futs).await;
1571		}
1572	}
1573
1574	/// Will cancel all pending rounds that can safely be canceled
1575	///
1576	/// All rounds that have not started yet can safely be canceled,
1577	/// as well as rounds where we have not yet signed any forfeit txs.
1578	pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1579		// initial load to get all pending round states ids
1580		let state_ids = self.db.get_pending_round_state_ids().await?;
1581
1582		let futures = state_ids.into_iter().map(|state_id| {
1583			async move {
1584				// wait for lock and load again to ensure most recent state
1585				let mut state = match self.lock_wait_round_state(state_id).await {
1586					Ok(Some(s)) => s,
1587					Ok(None) => return,
1588					Err(e) => return warn!("Error loading round state #{}: {:#}", state_id, e),
1589				};
1590
1591				match state.state_mut().try_cancel(self).await {
1592					Ok(true) => {
1593						if let Err(e) = self.db.remove_round_state(&state).await {
1594							warn!("Error removing canceled round state from db: {:#}", e);
1595						}
1596					},
1597					Ok(false) => {},
1598					Err(e) => warn!("Error trying to cancel round #{}: {:#}", state_id, e),
1599				}
1600			}
1601		});
1602
1603		join_all(futures).await;
1604
1605		Ok(())
1606	}
1607
1608	/// Try to cancel the given round
1609	pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1610		let mut state = self.lock_wait_round_state(id).await?
1611			.context("round state not found")?;
1612
1613		if state.state_mut().try_cancel(self).await.context("failed to cancel round")? {
1614			self.db.remove_round_state(&state).await
1615				.context("error removing canceled round state from db")?;
1616		} else {
1617			bail!("failed to cancel round");
1618		}
1619
1620		Ok(())
1621	}
1622
1623	/// Participate in a round
1624	///
1625	/// This function will start a new round participation and block until
1626	/// the round is finished.
1627	/// After this method returns the round state will be kept active until
1628	/// the round tx fully confirms.
1629	pub(crate) async fn participate_round(
1630		&self,
1631		participation: RoundParticipation,
1632		movement_kind: Option<RoundMovement>,
1633	) -> anyhow::Result<RoundStatus> {
1634		let mut state = self.join_next_round(participation, movement_kind).await?;
1635
1636		info!("Waiting for a round start...");
1637		let mut events = self.subscribe_round_events().await?;
1638
1639		loop {
1640			if !state.state().ongoing_participation() {
1641				let status = state.state_mut().sync(self).await?;
1642				match status {
1643					RoundStatus::Failed { error } => bail!("round failed: {}", error),
1644					RoundStatus::Canceled => bail!("round canceled"),
1645					status => return Ok(status),
1646				}
1647			}
1648
1649			let event = events.next().await
1650				.context("events stream broke")?
1651				.context("error on event stream")?;
1652			if state.state_mut().process_event(self, &event).await {
1653				self.db.update_round_state(&state).await?;
1654			}
1655		}
1656	}
1657}