bark/
round.rs

1//!
2//! Round State Machine
3//!
4
5use std::iter;
6use std::borrow::Cow;
7use std::convert::Infallible;
8
9use anyhow::Context;
10use ark::vtxo::VtxoValidationError;
11use bdk_esplora::esplora_client::Amount;
12use bip39::rand;
13use bitcoin::{OutPoint, SignedAmount, Transaction, Txid};
14use bitcoin::consensus::encode::{deserialize, serialize_hex};
15use bitcoin::hashes::Hash;
16use bitcoin::hex::DisplayHex;
17use bitcoin::key::Keypair;
18use bitcoin::secp256k1::schnorr;
19use futures::future::try_join_all;
20use futures::{Stream, StreamExt};
21use log::{debug, error, info, trace, warn};
22
23use ark::{ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoRequest};
24use ark::forfeit::{HashLockedForfeitBundle, HashLockedForfeitNonces};
25use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
26use ark::rounds::{
27	RoundAttempt, RoundEvent, RoundFinished, RoundSeq, MIN_ROUND_TX_OUTPUTS,
28	ROUND_TX_VTXO_TREE_VOUT,
29};
30use ark::tree::signed::{LeafVtxoCosignContext, UnlockHash, VtxoTreeSpec};
31use bitcoin_ext::{TxStatus, P2TR_DUST};
32use server_rpc::{protos, ServerConnection, TryFromBytes};
33
34use crate::{SECP, Wallet};
35use crate::movement::{MovementId, MovementStatus};
36use crate::movement::update::MovementUpdate;
37use crate::persist::{RoundStateId, StoredRoundState};
38use crate::subsystem::{RoundMovement, Subsystem};
39
40
/// The type string for the hArk leaf transition
///
/// This is compared against the `transition_kind` reported by a
/// genesis-transition validation error to recognize a failure that happened
/// on the hash-locked hArk leaf.
const HARK_TRANSITION_KIND: &str = "hash-locked-cosigned";
43
44
/// Struct to communicate your specific participation for an Ark round.
///
/// It holds the VTXOs that are consumed by the round and the VTXOs that
/// are requested in return.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoundParticipation {
	/// The input VTXOs that are consumed in the round
	#[serde(with = "ark::encode::serde::vec")]
	pub inputs: Vec<Vtxo>,
	/// The output VTXOs that we request in the round,
	/// including change
	pub outputs: Vec<VtxoRequest>,
}
54
55impl RoundParticipation {
56	/// Check that participation doesn't make some obvious violations
57	pub fn sanity_check(&self) -> anyhow::Result<()> {
58		if let Some(payreq) = self.outputs.iter().find(|p| p.amount < P2TR_DUST) {
59			bail!("VTXO amount must be at least {}, requested {}", P2TR_DUST, payreq.amount);
60		}
61		Ok(())
62	}
63
64	pub fn to_movement_update(&self) -> anyhow::Result<MovementUpdate> {
65		let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
66		let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
67		let fee = input_amount - output_amount;
68		Ok(MovementUpdate::new()
69			.consumed_vtxos(&self.inputs)
70			.intended_balance(SignedAmount::ZERO)
71			.effective_balance( - fee.to_signed()?)
72			.fee(fee)
73		)
74	}
75}
76
/// The status of a round participation, as returned by [RoundState::sync]
#[derive(Debug, Clone)]
pub enum RoundStatus {
	/// The round was successful and is fully confirmed
	Confirmed {
		/// The txid of the round's funding transaction
		funding_txid: Txid,
	},
	/// Round successful but not fully confirmed
	Unconfirmed {
		/// The txid of the round's funding transaction
		funding_txid: Txid,
	},
	/// Round didn't finish yet
	Pending,
	/// The round failed
	Failed {
		/// Formatted description of the failure
		error: String,
	},
	/// User canceled the round
	Canceled,
}
96
97impl RoundStatus {
98	/// Whether this is the final state and it won't change anymore
99	pub fn is_final(&self) -> bool {
100		match self {
101			Self::Confirmed { .. } => true,
102			Self::Unconfirmed { .. } => false,
103			Self::Pending => false,
104			Self::Failed { .. } => true,
105			Self::Canceled => true,
106		}
107	}
108
109	/// Whether it looks like the round succeeded
110	pub fn is_success(&self) -> bool {
111		match self {
112			Self::Confirmed { .. } => true,
113			Self::Unconfirmed { .. } => true,
114			Self::Pending => false,
115			Self::Failed { .. } => false,
116			Self::Canceled => false,
117		}
118	}
119}
120
/// State of the progress of a round participation
///
/// An instance of this struct is kept all the way from the intention of joining
/// the next round, until either the round fully confirms or it fails and we are
/// sure it won't have any effect on our wallet.
///
/// As soon as we have signed forfeit txs for the round, we keep track of this
/// round attempt until we see another attempt we participated in confirm or
/// we gain confidence that the failed attempt will never confirm.
//
//TODO(stevenroose) move the id in here and have the state persist itself with the wallet
// to have better control. this way we can touch db before we sent forfeit sigs
pub struct RoundState {
	/// Round is fully done
	///
	/// This is set after the successful round was persisted.
	pub(crate) done: bool,

	/// Our participation in this round
	pub(crate) participation: RoundParticipation,

	/// The flow of the round in case it is still ongoing with the server
	pub(crate) flow: RoundFlowState,

	/// The new output vtxos of this round participation
	///
	/// After we finish the interactive part, we fill this with the uncompleted
	/// VTXOs which we then try to complete with the unlock preimage.
	pub(crate) new_vtxos: Vec<Vtxo>,

	/// Whether we sent our forfeit signatures to the server
	///
	/// If we did this and the server refused to reveal our new VTXOs,
	/// we will be forced to exit.
	//TODO(stevenroose) implement exit when this is true and we can't make progress
	// probably based on the input vtxos becoming close to expiry
	pub(crate) sent_forfeit_sigs: bool,

	/// The ID of the [Movement] associated with this round
	///
	/// When absent, no movement is updated with the round's funding txid.
	pub(crate) movement_id: Option<MovementId>,
}
160
161impl RoundState {
162	fn new_interactive(
163		participation: RoundParticipation,
164		movement_id: Option<MovementId>,
165	) -> Self {
166		Self {
167			participation,
168			movement_id,
169			flow: RoundFlowState::InteractivePending,
170			new_vtxos: Vec::new(),
171			sent_forfeit_sigs: false,
172			done: false,
173		}
174	}
175
176	#[allow(unused)]
177	fn new_non_interactive(
178		participation: RoundParticipation,
179		unlock_hash: UnlockHash,
180		movement_id: Option<MovementId>,
181	) -> Self {
182		Self {
183			participation,
184			movement_id,
185			flow: RoundFlowState::NonInteractivePending { unlock_hash },
186			new_vtxos: Vec::new(),
187			sent_forfeit_sigs: false,
188			done: false,
189		}
190	}
191
	/// Our participation in this round
	///
	/// Returns a reference to the inputs and requested outputs this state
	/// was created with.
	pub fn participation(&self) -> &RoundParticipation {
		&self.participation
	}
196
197	/// the unlock hash if already known
198	pub fn unlock_hash(&self) -> Option<UnlockHash> {
199		match self.flow {
200			RoundFlowState::NonInteractivePending { unlock_hash } => Some(unlock_hash),
201			RoundFlowState::InteractivePending => None,
202			RoundFlowState::InteractiveOngoing { .. } => None,
203			RoundFlowState::Failed { .. } => None,
204			RoundFlowState::Canceled => None,
205			RoundFlowState::Finished { unlock_hash, .. } => Some(unlock_hash),
206		}
207	}
208
209	pub fn funding_tx(&self) -> Option<&Transaction> {
210		match self.flow {
211			RoundFlowState::NonInteractivePending { .. } => None,
212			RoundFlowState::InteractivePending => None,
213			RoundFlowState::InteractiveOngoing { .. } => None,
214			RoundFlowState::Failed { .. } => None,
215			RoundFlowState::Canceled => None,
216			RoundFlowState::Finished { ref funding_tx, .. } => Some(funding_tx),
217		}
218	}
219
220	/// Whether the interactive part of the round is still ongoing
221	pub fn ongoing_participation(&self) -> bool {
222		match self.flow {
223			RoundFlowState::NonInteractivePending { .. } => false,
224			RoundFlowState::InteractivePending => true,
225			RoundFlowState::InteractiveOngoing { .. } => true,
226			RoundFlowState::Failed { .. } => false,
227			RoundFlowState::Canceled => false,
228			RoundFlowState::Finished { .. } => false,
229		}
230	}
231
232	/// Tries to cancel the round and returns whether it was succesfully canceled
233	/// or if it was already canceled or failed
234	pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
235		let ret = match self.flow {
236			RoundFlowState::NonInteractivePending { .. } => todo!("we have to cancel with server!"),
237			RoundFlowState::Canceled => true,
238			RoundFlowState::Failed { .. } => true,
239			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
240				self.flow = RoundFlowState::Canceled;
241				true
242			},
243			RoundFlowState::Finished { .. } => false,
244		};
245		if ret {
246			persist_round_failure(wallet, &self.participation, self.movement_id).await
247				.context("failed to persist round failure for cancelation")?;
248		}
249		Ok(ret)
250	}
251
252	async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
253		match start_attempt(wallet, &self.participation, attempt).await {
254			Ok(state) => {
255				self.flow = RoundFlowState::InteractiveOngoing {
256					round_seq: attempt.round_seq,
257					attempt_seq: attempt.attempt_seq,
258					state: state,
259				};
260			},
261			Err(e) => {
262				self.flow = RoundFlowState::Failed {
263					error: format!("{:#}", e),
264				};
265			},
266		}
267	}
268
	/// Processes the given event and returns true if some update was made to the state
	///
	/// Only the interactive flow states react to events:
	/// - while pending, we join the first attempt (attempt seq 0) of a round
	///   and ignore everything else;
	/// - while an attempt is ongoing, we fail if a newer round started, ignore
	///   events of older attempts, re-join on a newer attempt and otherwise
	///   try to progress the current attempt with the event.
	///
	/// In all other flow states the event is ignored and false is returned.
	pub async fn process_event(
		&mut self,
		wallet: &Wallet,
		event: &RoundEvent,
	) -> bool {
		// Every arm below returns, so the match itself never yields a value.
		let _: Infallible = match self.flow {
			RoundFlowState::InteractivePending => {
				// we only join a round at its first attempt
				if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
					trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
					self.try_start_attempt(wallet, e).await;
					return true;
				} else {
					trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
						event.kind(), event.round_seq(), event.attempt_seq(),
					);
					return false;
				}
			},
			RoundFlowState::InteractiveOngoing { round_seq, attempt_seq, ref mut state } => {
				// here we catch the cases where we're in a wrong flow

				if event.round_seq() > round_seq {
					// new round started, we don't support multiple parallel rounds,
					// this means we failed
					self.flow = RoundFlowState::Failed {
						error: format!("round {} started while we were on {}",
							event.round_seq(), round_seq,
						),
					};
					return true;
				}

				if event.attempt_seq() < attempt_seq {
					trace!("ignoring replayed message from old attempt");
					return false;
				}

				// a newer attempt replaces the state of the attempt we were in
				if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
					trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
					self.try_start_attempt(wallet, e).await;
					return true;
				}
				trace!("Processing event {} for round attempt {}:{} in state {}",
					event.kind(), round_seq, attempt_seq, state.kind(),
				);

				return match progress_attempt(state, wallet, &self.participation, event).await {
					AttemptProgressResult::NotUpdated => false,
					AttemptProgressResult::Updated { new_state } => {
						*state = new_state;
						true
					},
					AttemptProgressResult::Failed(e) => {
						debug!("Round failed with error: {:#}", e);
						self.flow = RoundFlowState::Failed {
							error: format!("{:#}", e),
						};
						true
					},
					AttemptProgressResult::Finished { funding_tx, vtxos, unlock_hash } => {
						self.new_vtxos = vtxos;
						let funding_txid = funding_tx.compute_txid();
						self.flow = RoundFlowState::Finished { funding_tx, unlock_hash };
						// failing to record the funding txid is only logged,
						// the round itself is still considered finished
						if let Some(mid) = self.movement_id {
							if let Err(e) = update_funding_txid(wallet, mid, funding_txid).await {
								warn!("Error updating the round funding txid: {:#}", e);
							}
						}
						true
					},
				};
			},
			RoundFlowState::NonInteractivePending { .. }
				| RoundFlowState::Finished { .. }
				| RoundFlowState::Failed { .. }
				| RoundFlowState::Canceled => return false,
		};
	}
348
	/// Sync the round's status and return it
	///
	/// When success or failure is returned, the round state can be eliminated
	///
	/// Depending on the flow state this
	/// - reports a finished and persisted round as confirmed;
	/// - reports an interactive round that didn't finish yet as pending;
	/// - persists the failure of a failed or canceled round;
	/// - tries to progress a non-interactive participation with the server;
	/// - for a round of which the interactive part finished, checks the
	///   funding tx confirmations and, once sufficient, performs the hArk
	///   VTXO swap and persists the successful round.
	///
	/// When an error occurs after forfeit signatures were sent to the server,
	/// `sent_forfeit_sigs` is set before the error is returned.
	//TODO(stevenroose) make RoundState manage its own db record
	pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
		match self.flow {
			// already fully handled before, nothing left to do
			RoundFlowState::Finished { ref funding_tx, .. } if self.done => {
				Ok(RoundStatus::Confirmed {
					funding_txid: funding_tx.compute_txid(),
				})
			},

			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
				Ok(RoundStatus::Pending)
			},
			RoundFlowState::Failed { ref error } => {
				persist_round_failure(wallet, &self.participation, self.movement_id).await
					.context("failed to persist round failure")?;
				Ok(RoundStatus::Failed { error: error.clone() })
			},
			RoundFlowState::Canceled => {
				persist_round_failure(wallet, &self.participation, self.movement_id).await
					.context("failed to persist round failure")?;
				Ok(RoundStatus::Canceled)
			},

			RoundFlowState::NonInteractivePending { unlock_hash } => {
				match progress_non_interactive(wallet, &self.participation, unlock_hash).await {
					Ok(HarkProgressResult::RoundPending) => Ok(RoundStatus::Pending),
					Ok(HarkProgressResult::Ok { funding_tx, new_vtxos }) => {
						let funding_txid = funding_tx.compute_txid();
						self.new_vtxos = new_vtxos;
						self.flow = RoundFlowState::Finished {
							funding_tx: funding_tx.clone(),
							unlock_hash: unlock_hash,
						};

						persist_round_success(
							wallet,
							&self.participation,
							self.movement_id,
							&self.new_vtxos,
							&funding_tx,
						).await.context("failed to store successful round in DB!")?;

						// only mark as done once the success was persisted
						self.done = true;

						Ok(RoundStatus::Confirmed { funding_txid })
					},
					Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }) => {
						if let Some(mid) = self.movement_id {
							update_funding_txid(wallet, mid, funding_txid).await
								.context("failed to update funding txid in DB")?;
						}
						Ok(RoundStatus::Unconfirmed { funding_txid })
					},

					//TODO(stevenroose) should we mark as failed for these cases?

					Err(HarkForfeitError::Err(e)) => {
						//TODO(stevenroose) we failed here but we might actually be able to
						// succeed if we retry. should we implement some kind of limited
						// retry after which we mark as failed?
						Err(e.context("error progressing non-interactive round"))
					},
					Err(HarkForfeitError::SentForfeits(e)) => {
						self.sent_forfeit_sigs = true;
						Err(e.context("error progressing non-interactive round \
							after sending forfeit tx signatures"))
					},
				}
			},
			// interactive part finished, but didn't forfeit yet
			RoundFlowState::Finished { ref funding_tx, unlock_hash } => {
				let funding_txid = funding_tx.compute_txid();
				let confirmed = check_funding_tx_confirmations(
					wallet, funding_txid, &funding_tx,
				).await.context("error checking funding tx confirmations")?;
				if !confirmed {
					trace!("Funding tx {} not yet deeply enough confirmed", funding_txid);
					return Ok(RoundStatus::Unconfirmed { funding_txid });
				}

				match hark_vtxo_swap(
					wallet, &self.participation, &mut self.new_vtxos, &funding_tx, unlock_hash,
				).await {
					Ok(()) => {
						persist_round_success(
							wallet,
							&self.participation,
							self.movement_id,
							&self.new_vtxos,
							&funding_tx,
						).await.context("failed to store successful round in DB!")?;

						// only mark as done once the success was persisted
						self.done = true;

						Ok(RoundStatus::Confirmed { funding_txid })
					},
					Err(HarkForfeitError::Err(e)) => {
						Err(e.context("error forfeiting VTXOs after round"))
					},
					Err(HarkForfeitError::SentForfeits(e)) => {
						self.sent_forfeit_sigs = true;
						Err(e.context("error after having signed and sent \
							forfeit signatures to server"))
					},
				}
			},
		}
	}
460
461	/// Once we know the signed round funding tx, this returns the output VTXOs
462	/// for this round.
463	pub fn output_vtxos(&self) -> Option<&[Vtxo]> {
464		if self.new_vtxos.is_empty() {
465			None
466		} else {
467			Some(&self.new_vtxos)
468		}
469	}
470
471	/// Returns the input VTXOs that are locked in this round, but only
472	/// if no output VTXOs were issued yet.
473	pub fn locked_pending_inputs(&self) -> &[Vtxo] {
474		//TODO(stevenroose) consider if we can't just drop the state after forfeit exchange
475		match self.flow {
476			RoundFlowState::NonInteractivePending { .. }
477				| RoundFlowState::InteractivePending
478				| RoundFlowState::InteractiveOngoing { .. }
479			=> {
480				&self.participation.inputs
481			},
482			RoundFlowState::Failed { .. }
483				| RoundFlowState::Finished { .. }
484				| RoundFlowState::Canceled
485			=> {
486				// inputs already unlocked
487				&[]
488			},
489		}
490	}
491}
492
/// The state of the process flow of a round
///
/// This tracks the progress of the interactive part of the round, from
/// waiting to start until finishing either successfully or with a failure.
pub enum RoundFlowState {
	/// We don't do flow and we just wait for the round to finish
	NonInteractivePending {
		/// The unlock hash used to query the server for this participation
		unlock_hash: UnlockHash,
	},

	/// Waiting for round to happen
	InteractivePending,
	/// Interactive part ongoing
	InteractiveOngoing {
		/// The sequence number of the round we are participating in
		round_seq: RoundSeq,
		/// The sequence number of the attempt we are participating in
		attempt_seq: usize,
		/// Our state within that attempt
		state: AttemptState,
	},

	/// Interactive part finished, waiting for confirmation
	Finished {
		/// The funding transaction of the round
		funding_tx: Transaction,
		/// The unlock hash of our participation in the round
		unlock_hash: UnlockHash,
	},

	/// Failed during round
	Failed {
		/// Formatted description of the failure
		error: String,
	},

	/// User canceled round
	Canceled,
}
526
/// The state of a single round attempt
///
/// For each attempt that we participate in, we keep the state of our concrete
/// participation.
pub enum AttemptState {
	// NOTE(review): presumably the state before we joined an attempt;
	// it is not constructed in the visible part of this file -- confirm
	AwaitingAttempt,
	/// We submitted our participation and wait for the server's VTXO proposal
	AwaitingUnsignedVtxoTree {
		/// One cosign keypair per requested output
		cosign_keys: Vec<Keypair>,
		/// The secret nonces per requested output, matching the public
		/// nonces we submitted to the server
		secret_nonces: Vec<Vec<DangerousSecretNonce>>,
		/// The unlock hash the server returned for our submission
		unlock_hash: UnlockHash,
	},
	// NOTE(review): presumably entered once the VTXO tree was handled and we
	// wait for the round to finish -- confirm against `progress_attempt`
	AwaitingFinishedRound {
		unsigned_round_tx: Transaction,
		vtxos_spec: VtxoTreeSpec,
		unlock_hash: UnlockHash,
	},
}
544
545impl AttemptState {
546	/// The state kind represented as a string
547	fn kind(&self) -> &'static str {
548		match self {
549			Self::AwaitingAttempt => "AwaitingAttempt",
550			Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
551			Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
552		}
553	}
554}
555
/// Result from trying to progress an ongoing round attempt
enum AttemptProgressResult {
	/// The interactive part of the round finished
	Finished {
		/// The funding transaction of the round
		funding_tx: Transaction,
		/// Our new output VTXOs of the round
		vtxos: Vec<Vtxo>,
		/// The unlock hash of our participation
		unlock_hash: UnlockHash,
	},
	/// The attempt failed with the given error
	Failed(anyhow::Error),
	/// When the state changes, this variant is returned
	///
	/// It carries the new attempt state, which replaces the current one.
	// NOTE(review): an earlier version of this comment referred to an
	// `UnconfirmedRound` instance being returned here; this variant only
	// carries the new state.
	Updated {
		new_state: AttemptState,
	},
	/// The event did not change the attempt state
	NotUpdated,
}
574
575/// Participate in the new round attempt by submitting our round participation
576async fn start_attempt(
577	wallet: &Wallet,
578	participation: &RoundParticipation,
579	event: &RoundAttempt,
580) -> anyhow::Result<AttemptState> {
581	let mut srv = wallet.require_server().context("server not available")?;
582	let ark_info = srv.ark_info().await?;
583
584	// Assign cosign pubkeys to the payment requests.
585	let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
586		.take(participation.outputs.len())
587		.collect::<Vec<_>>();
588
589	// Prepare round participation info.
590	// For each of our requested vtxo output, we need a set of public and secret nonces.
591	let cosign_nonces = cosign_keys.iter()
592		.map(|key| {
593			let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
594			let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
595			for _ in 0..ark_info.nb_round_nonces {
596				let (s, p) = musig::nonce_pair(key);
597				secs.push(s);
598				pubs.push(p);
599			}
600			(secs, pubs)
601		})
602		.take(participation.outputs.len())
603		.collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
604
605
606	// The round has now started. We can submit our payment.
607	debug!("Submitting payment request with {} inputs and {} vtxo outputs",
608		participation.inputs.len(), participation.outputs.len(),
609	);
610
611	let signed_reqs = participation.outputs.iter()
612		.zip(cosign_keys.iter())
613		.zip(cosign_nonces.iter())
614		.map(|((req, cosign_key), (_sec, pub_nonces))| {
615			SignedVtxoRequest {
616				vtxo: req.clone(),
617				cosign_pubkey: cosign_key.public_key(),
618				nonces: pub_nonces.clone(),
619			}
620		})
621		.collect::<Vec<_>>();
622
623	let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
624	for vtxo in participation.inputs.iter() {
625		let keypair = wallet.get_vtxo_key(vtxo).await
626			.map_err(HarkForfeitError::Err)?;
627		input_vtxos.push(protos::InputVtxo {
628			vtxo_id: vtxo.id().to_bytes().to_vec(),
629			ownership_proof: {
630				let sig = event.challenge
631					.sign_with(vtxo.id(), &signed_reqs, &keypair);
632				sig.serialize().to_vec()
633			},
634		});
635	}
636
637	let resp = srv.client.submit_payment(protos::SubmitPaymentRequest {
638		input_vtxos: input_vtxos,
639		vtxo_requests: signed_reqs.into_iter().map(Into::into).collect(),
640		#[allow(deprecated)]
641		offboard_requests: vec![],
642	}).await.context("Ark server refused our payment submission")?;
643
644	Ok(AttemptState::AwaitingUnsignedVtxoTree {
645		unlock_hash: UnlockHash::from_bytes(&resp.into_inner().unlock_hash)?,
646		cosign_keys: cosign_keys,
647		secret_nonces: cosign_nonces.into_iter()
648			.map(|(sec, _pub)| sec.into_iter()
649				.map(DangerousSecretNonce::dangerous_from_secret_nonce)
650				.collect())
651			.collect(),
652	})
653}
654
/// just an internal type; need Error trait to work with anyhow
///
/// It distinguishes errors by whether they happened before or after our
/// forfeit signatures were sent to the server, so that callers can record
/// that the signatures were sent.
#[derive(Debug, thiserror::Error)]
enum HarkForfeitError {
	/// An error happened after we sent forfeit signatures to the server
	#[error("error after forfeits were sent: {0}")]
	SentForfeits(anyhow::Error),
	/// An error happened before we sent forfeit signatures to the server
	#[error("error before forfeits were sent: {0}")]
	Err(anyhow::Error),
}
665
666async fn hark_cosign_leaf(
667	wallet: &Wallet,
668	srv: &mut ServerConnection,
669	funding_tx: &Transaction,
670	vtxo: &mut Vtxo,
671) -> anyhow::Result<()> {
672	let key = wallet.pubkey_keypair(&vtxo.user_pubkey()).await
673		.context("error fetching keypair").map_err(HarkForfeitError::Err)?
674		.with_context(|| format!(
675			"keypair {} not found for VTXO {}", vtxo.user_pubkey(), vtxo.id(),
676		))?.1;
677	let (ctx, cosign_req) = LeafVtxoCosignContext::new(vtxo, funding_tx, &key);
678	let cosign_resp = srv.client.request_leaf_vtxo_cosign(
679		protos::LeafVtxoCosignRequest::from(cosign_req),
680	).await
681		.with_context(|| format!("error requesting leaf cosign for vtxo {}", vtxo.id()))?
682		.into_inner().try_into()
683		.context("bad leaf vtxo cosign response")?;
684	ensure!(ctx.finalize(vtxo, cosign_resp),
685		"failed to finalize VTXO leaf signature for VTXO {}", vtxo.id(),
686	);
687
688	Ok(())
689}
690
691/// Finish the hArk VTXO swap protocol
692///
693/// This includes:
694/// - requesting cosignature of the locked hArk leaves
695/// - sending forfeit txs to the server in return for the unlock preimage
696///
697/// NB all the actions in this function are idempotent, meaning that the server
698/// allows them to be done multiple times. this means that if this function calls
699/// fails, it's safe to just call it again at a later time
700async fn hark_vtxo_swap(
701	wallet: &Wallet,
702	participation: &RoundParticipation,
703	output_vtxos: &mut [Vtxo],
704	funding_tx: &Transaction,
705	unlock_hash: UnlockHash,
706) -> Result<(), HarkForfeitError> {
707	let mut srv = wallet.require_server().map_err(HarkForfeitError::Err)?;
708
709	// first get the leaves signed
710	for vtxo in output_vtxos.iter_mut() {
711		hark_cosign_leaf(wallet, &mut srv, funding_tx, vtxo).await
712			.map_err(HarkForfeitError::Err)?;
713	}
714
715	// then do the forfeit dance
716
717	let server_nonces = srv.client.request_forfeit_nonces(protos::ForfeitNoncesRequest {
718		unlock_hash: unlock_hash.to_byte_array().to_vec(),
719		vtxo_ids: participation.inputs.iter().map(|v| v.id().to_bytes().to_vec()).collect(),
720	}).await
721		.context("request forfeits nonces call failed")
722		.map_err(HarkForfeitError::Err)?
723		.into_inner().public_nonces.into_iter()
724		.map(|b| HashLockedForfeitNonces::from_bytes(b))
725		.collect::<Result<Vec<_>, _>>()
726		.context("invalid forfeit nonces")
727		.map_err(HarkForfeitError::Err)?;
728
729	if server_nonces.len() != participation.inputs.len() {
730		return Err(HarkForfeitError::Err(anyhow!(
731			"server sent {} nonce pairs, expected {}",
732			server_nonces.len(), participation.inputs.len(),
733		)));
734	}
735
736	let mut forfeit_bundles = Vec::with_capacity(participation.inputs.len());
737	for (input, nonces) in participation.inputs.iter().zip(server_nonces.into_iter()) {
738		let user_key = wallet.pubkey_keypair(&input.user_pubkey()).await
739			.ok().flatten().with_context(|| format!(
740				"failed to fetch keypair for vtxo user pubkey {}", input.user_pubkey(),
741			)).map_err(HarkForfeitError::Err)?.1;
742		forfeit_bundles.push(HashLockedForfeitBundle::forfeit_vtxo(
743			&input, unlock_hash, &user_key, &nonces,
744		))
745	}
746
747	let preimage = srv.client.forfeit_vtxos(protos::ForfeitVtxosRequest {
748		forfeit_bundles: forfeit_bundles.iter().map(|b| b.serialize()).collect(),
749	}).await
750		.context("forfeit vtxos call failed")
751		.map_err(HarkForfeitError::SentForfeits)?
752		.into_inner().unlock_preimage.as_slice().try_into()
753		.context("invalid preimage length")
754		.map_err(HarkForfeitError::SentForfeits)?;
755
756	for vtxo in output_vtxos.iter_mut() {
757		if !vtxo.provide_unlock_preimage(preimage) {
758			return Err(HarkForfeitError::SentForfeits(anyhow!(
759				"invalid preimage {} for vtxo {} with supposed unlock hash {}",
760				preimage.as_hex(), vtxo.id(), unlock_hash,
761			)));
762		}
763
764		// then validate the vtxo works
765		vtxo.validate(&funding_tx).with_context(|| format!(
766			"new VTXO {} does not pass validation after hArk forfeit protocol", vtxo.id(),
767		)).map_err(HarkForfeitError::SentForfeits)?;
768	}
769
770	Ok(())
771}
772
773fn check_vtxo_fails_hash_lock(funding_tx: &Transaction, vtxo: &Vtxo) -> anyhow::Result<()> {
774	match vtxo.validate(funding_tx) {
775		Err(VtxoValidationError::GenesisTransition {
776			genesis_idx, genesis_len, transition_kind, ..
777		}) if genesis_idx + 1 == genesis_len && transition_kind == HARK_TRANSITION_KIND => Ok(()),
778		Ok(()) => Err(anyhow!("new un-unlocked VTXO should fail validation but doesn't: {}",
779			vtxo.serialize_hex(),
780		)),
781		Err(e) => Err(anyhow!("new VTXO {} failed validation: {:#}", vtxo.id(), e)),
782	}
783}
784
785fn check_round_matches_participation(
786	part: &RoundParticipation,
787	new_vtxos: &[Vtxo],
788	funding_tx: &Transaction,
789) -> anyhow::Result<()> {
790	ensure!(new_vtxos.len() == part.outputs.len(),
791		"unexpected number of VTXOs: got {}, expected {}", new_vtxos.len(), part.outputs.len(),
792	);
793
794	for (vtxo, req) in new_vtxos.iter().zip(&part.outputs) {
795		ensure!(vtxo.amount() == req.amount,
796			"unexpected VTXO amount: got {}, expected {}", vtxo.amount(), req.amount,
797		);
798		ensure!(*vtxo.policy() == req.policy,
799			"unexpected VTXO policy: got {:?}, expected {:?}", vtxo.policy(), req.policy,
800		);
801
802		// We accept the VTXO if only the hArk transition (last) failure happens
803		check_vtxo_fails_hash_lock(funding_tx, vtxo)?;
804	}
805
806	Ok(())
807}
808
809/// Check the confirmation status of a funding tx
810///
811/// Returns true if the funding tx is confirmed deeply enough for us to accept it.
812/// The required number of confirmations depends on the wallet's configuration.
813///
814/// Returns false if the funding tx seems valid but not confirmed yet.
815///
816/// Returns an error if the chain source fails or if we can't submit the tx to the
817/// mempool, suggesting it might be double spent.
818async fn check_funding_tx_confirmations(
819	wallet: &Wallet,
820	funding_txid: Txid,
821	funding_tx: &Transaction,
822) -> anyhow::Result<bool> {
823	let tip = wallet.chain.tip().await.context("chain source error")?;
824	let conf_height = tip - wallet.config.round_tx_required_confirmations + 1;
825	let tx_status = wallet.chain.tx_status(funding_txid).await.context("chain source error")?;
826	trace!("Round funding tx {} confirmation status: {:?} (tip={})",
827		funding_txid, tx_status, tip,
828	);
829	match tx_status {
830		TxStatus::Confirmed(b) if b.height <= conf_height => Ok(true),
831		TxStatus::Mempool | TxStatus::Confirmed(_) => {
832			if wallet.config.round_tx_required_confirmations == 0 {
833				debug!("Accepting round funding tx without confirmations because of configuration");
834				Ok(true)
835			} else {
836				trace!("Hark round funding tx not confirmed (deep enough) yet: {:?}", tx_status);
837				Ok(false)
838			}
839		},
840		TxStatus::NotFound => {
841			// let's try to submit it to our mempool
842			//TODO(stevenroose) change this to an explicit "testmempoolaccept" so that we can
843			// reliably distinguish the cases of our chain source having issues and the tx
844			// actually being rejected which suggests the round was double-spent
845			if let Err(e) = wallet.chain.broadcast_tx(&funding_tx).await {
846				Err(anyhow!("hark funding tx {} server sent us is rejected by mempool (hex={}): {:#}",
847					funding_txid, serialize_hex(funding_tx), e,
848				))
849			} else {
850				trace!("hark funding tx {} was not in mempool but we broadcast it", funding_txid);
851				Ok(false)
852			}
853		},
854	}
855}
856
/// Result from trying to progress a non-interactive round participation
enum HarkProgressResult {
	/// The server reports the round participation as still pending
	RoundPending,
	/// The round has a funding tx, but it is not confirmed deeply enough yet
	FundingTxUnconfirmed {
		/// The txid of the round's funding transaction
		funding_txid: Txid,
	},
	/// The funding tx is sufficiently confirmed and the VTXO swap succeeded
	Ok {
		/// The funding transaction of the round
		funding_tx: Transaction,
		/// Our new, unlocked output VTXOs
		new_vtxos: Vec<Vtxo>,
	},
}
867
868async fn progress_non_interactive(
869	wallet: &Wallet,
870	participation: &RoundParticipation,
871	unlock_hash: UnlockHash,
872) -> Result<HarkProgressResult, HarkForfeitError> {
873	let mut srv = wallet.require_server().map_err(HarkForfeitError::Err)?;
874
875	let resp = srv.client.round_participation_status(protos::RoundParticipationStatusRequest {
876		unlock_hash: unlock_hash.to_byte_array().to_vec(),
877	}).await
878		.context("error checking round participation status")
879		.map_err(HarkForfeitError::Err)?.into_inner();
880	let status = protos::RoundParticipationStatus::try_from(resp.status)
881		.context("unknown status from server")
882		.map_err(HarkForfeitError::Err)	?;
883
884	if status == protos::RoundParticipationStatus::RoundPartPending {
885		trace!("Hark round still pending");
886		return Ok(HarkProgressResult::RoundPending);
887	}
888
889	// Since we got here, we clearly don't think we're finished.
890	// So even if the server thinks we did the dance before, we need the
891	// cosignature on the leaf tx so we need to do the dance again.
892	// "Guilty feet have got no rhythm."
893	if status == protos::RoundParticipationStatus::RoundPartReleased {
894		let preimage = resp.unlock_preimage.as_ref().map(|p| p.as_hex());
895		warn!("Server says preimage was already released for hArk participation \
896			with unlock hash {}. Supposed preimage: {:?}", unlock_hash, preimage,
897		);
898	}
899
900	let funding_tx_bytes = resp.round_funding_tx
901		.context("funding txid should be provided when status is not pending")
902		.map_err(HarkForfeitError::Err)?;
903	let funding_tx = deserialize::<Transaction>(&funding_tx_bytes)
904		.context("invalid funding txid")
905		.map_err(HarkForfeitError::Err)?;
906	let funding_txid = funding_tx.compute_txid();
907	trace!("Funding tx for round participation with unlock hash {}: {} ({})",
908		unlock_hash, funding_tx.compute_txid(), funding_tx_bytes.as_hex(),
909	);
910
911	// Check the confirmation status of the funding tx
912	match check_funding_tx_confirmations(wallet, funding_txid, &funding_tx).await {
913		Ok(true) => {},
914		Ok(false) => return Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }),
915		Err(e) => return Err(HarkForfeitError::Err(e.context("checking funding tx confirmations"))),
916	}
917
918	let mut new_vtxos = resp.output_vtxos.into_iter()
919		.map(|v| Vtxo::from_bytes(v))
920		.collect::<Result<Vec<_>, _>>()
921		.context("invalid output VTXOs from server")
922		.map_err(HarkForfeitError::Err)?;
923
924	// Check that the vtxos match our participation in the exact order
925	check_round_matches_participation(participation, &new_vtxos, &funding_tx)
926		.context("new VTXOs received from server don't match our participation")
927		.map_err(HarkForfeitError::Err)?;
928
929	hark_vtxo_swap(wallet, participation, &mut new_vtxos, &funding_tx, unlock_hash).await
930		.context("error forfeiting hArk VTXOs")
931		.map_err(HarkForfeitError::SentForfeits)?;
932
933	Ok(HarkProgressResult::Ok { funding_tx, new_vtxos })
934}
935
936async fn progress_attempt(
937	state: &AttemptState,
938	wallet: &Wallet,
939	part: &RoundParticipation,
940	event: &RoundEvent,
941) -> AttemptProgressResult {
942	// we will match only the states and messages required to make progress,
943	// all else we ignore, except an unexpected finish
944
945	match (state, event) {
946
947		(
948			AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces, unlock_hash },
949			RoundEvent::VtxoProposal(e),
950		) => {
951			trace!("Received VtxoProposal: {:#?}", e);
952			match sign_vtxo_tree(
953				wallet,
954				part,
955				&cosign_keys,
956				&secret_nonces,
957				&e.unsigned_round_tx,
958				&e.vtxos_spec,
959				&e.cosign_agg_nonces,
960			).await {
961				Ok(()) => {
962					AttemptProgressResult::Updated {
963						new_state: AttemptState::AwaitingFinishedRound {
964							unsigned_round_tx: e.unsigned_round_tx.clone(),
965							vtxos_spec: e.vtxos_spec.clone(),
966							unlock_hash: *unlock_hash,
967						},
968					}
969				},
970				Err(e) => {
971					trace!("Error signing VTXO tree: {:#}", e);
972					AttemptProgressResult::Failed(e)
973				},
974			}
975		},
976
977		(
978			AttemptState::AwaitingFinishedRound { unsigned_round_tx, vtxos_spec, unlock_hash },
979			RoundEvent::Finished(RoundFinished { cosign_sigs, signed_round_tx, .. }),
980		) => {
981			if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
982				return AttemptProgressResult::Failed(anyhow!(
983					"signed funding tx ({}) doesn't match tx received before ({})",
984					signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
985				));
986			}
987
988			if let Err(e) = wallet.chain.broadcast_tx(&signed_round_tx).await {
989				warn!("Failed to broadcast signed round tx: {:#}", e);
990			}
991
992			match construct_new_vtxos(
993				part, unsigned_round_tx, vtxos_spec, cosign_sigs,
994			).await {
995				Ok(v) => AttemptProgressResult::Finished {
996					funding_tx: signed_round_tx.clone(),
997					vtxos: v,
998					unlock_hash: *unlock_hash,
999				},
1000				Err(e) => AttemptProgressResult::Failed(anyhow!(
1001					"failed to construct new VTXOs for round: {:#}", e,
1002				)),
1003			}
1004		},
1005
1006		(state, RoundEvent::Finished(RoundFinished { .. })) => {
1007			AttemptProgressResult::Failed(anyhow!(
1008				"unexpectedly received a finished round while we were in state {}",
1009				state.kind(),
1010			))
1011		},
1012
1013		(state, _) => {
1014			trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
1015			AttemptProgressResult::NotUpdated
1016		},
1017	}
1018}
1019
1020async fn sign_vtxo_tree(
1021	wallet: &Wallet,
1022	participation: &RoundParticipation,
1023	cosign_keys: &[Keypair],
1024	secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
1025	unsigned_round_tx: &Transaction,
1026	vtxo_tree: &VtxoTreeSpec,
1027	cosign_agg_nonces: &[musig::AggregatedNonce],
1028) -> anyhow::Result<()> {
1029	let srv = wallet.require_server().context("server not available")?;
1030
1031	if unsigned_round_tx.output.len() < MIN_ROUND_TX_OUTPUTS {
1032		bail!("server sent round tx with less than 2 outputs: {}",
1033			serialize_hex(&unsigned_round_tx),
1034		);
1035	}
1036
1037	let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
1038
1039	// Check that the proposal contains our inputs.
1040	let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1041	for vtxo_req in vtxo_tree.iter_vtxos() {
1042		if let Some(i) = my_vtxos.iter().position(|v| {
1043			v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
1044		}) {
1045			my_vtxos.swap_remove(i);
1046		}
1047	}
1048	if !my_vtxos.is_empty() {
1049		bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
1050	}
1051
1052	let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1053	let iter = participation.outputs.iter().zip(cosign_keys).zip(secret_nonces);
1054	trace!("Sending vtxo signatures to server...");
1055	let _ = try_join_all(iter.map(|((req, key), sec)| async {
1056		let leaf_idx = unsigned_vtxos.spec.leaf_idx_of_req(req).expect("req included");
1057		let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
1058		let part_sigs = unsigned_vtxos.cosign_branch(
1059			&cosign_agg_nonces, leaf_idx, key, secret_nonces,
1060		).context("failed to cosign branch: our request not part of tree")?;
1061
1062		info!("Sending {} partial vtxo cosign signatures for pk {}",
1063			part_sigs.len(), key.public_key(),
1064		);
1065
1066		let _ = srv.client.clone().provide_vtxo_signatures(protos::VtxoSignaturesRequest {
1067			pubkey: key.public_key().serialize().to_vec(),
1068			signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
1069		}).await.context("error sending vtxo signatures")?;
1070
1071		Result::<(), anyhow::Error>::Ok(())
1072	})).await.context("error sending VTXO signatures")?;
1073	trace!("Done sending vtxo signatures to server");
1074
1075	Ok(())
1076}
1077
1078async fn construct_new_vtxos(
1079	participation: &RoundParticipation,
1080	unsigned_round_tx: &Transaction,
1081	vtxo_tree: &VtxoTreeSpec,
1082	vtxo_cosign_sigs: &[schnorr::Signature],
1083) -> anyhow::Result<Vec<Vtxo>> {
1084	let round_txid = unsigned_round_tx.compute_txid();
1085	let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
1086	let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1087
1088	// Validate the vtxo tree and cosign signatures.
1089	if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
1090		// bad server!
1091		bail!("Received incorrect vtxo cosign signatures from server");
1092	}
1093
1094	let signed_vtxos = vtxo_tree
1095		.into_signed_tree(vtxo_cosign_sigs.to_vec())
1096		.into_cached_tree();
1097
1098	let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1099	let total_nb_expected_vtxos = expected_vtxos.len();
1100
1101	let mut new_vtxos = vec![];
1102	for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
1103		if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
1104			let vtxo = signed_vtxos.build_vtxo(idx);
1105
1106			// validate the received vtxos
1107			// This is more like a sanity check since we crafted them ourselves.
1108			check_vtxo_fails_hash_lock(unsigned_round_tx, &vtxo)
1109				.context("constructed invalid vtxo from tree")?;
1110
1111			info!("New VTXO from round: {} ({}, {})",
1112				vtxo.id(), vtxo.amount(), vtxo.policy_type(),
1113			);
1114
1115			new_vtxos.push(vtxo);
1116			expected_vtxos.swap_remove(expected_idx);
1117		}
1118	}
1119
1120	if !expected_vtxos.is_empty() {
1121		if expected_vtxos.len() == total_nb_expected_vtxos {
1122			// we must have done something wrong
1123			bail!("None of our VTXOs were present in round!");
1124		} else {
1125			bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
1126				expected_vtxos.len(), expected_vtxos,
1127			);
1128		}
1129	}
1130	Ok(new_vtxos)
1131}
1132
1133//TODO(stevenroose) should be made idempotent
1134async fn persist_round_success(
1135	wallet: &Wallet,
1136	participation: &RoundParticipation,
1137	movement_id: Option<MovementId>,
1138	new_vtxos: &[Vtxo],
1139	funding_tx: &Transaction,
1140) -> anyhow::Result<()> {
1141	debug!("Persisting newly finished round. {} new vtxos, movement ID {:?}",
1142		new_vtxos.len(), movement_id,
1143	);
1144
1145	// we first try all actions that need to happen and only afterwards return errors
1146	// so that we achieve maximum success
1147
1148	let store_result = wallet.store_spendable_vtxos(new_vtxos).await
1149		.context("failed to store new VTXOs");
1150	let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs).await
1151		.context("failed to mark input VTXOs as spent");
1152	let update_result = if let Some(mid) = movement_id {
1153		wallet.movements.finish_movement_with_update(
1154			mid,
1155			MovementStatus::Successful,
1156			MovementUpdate::new()
1157				.produced_vtxos(new_vtxos)
1158				.metadata([("funding_txid".into(), serde_json::to_value(funding_tx.compute_txid())?)]),
1159		).await.context("failed to mark movement as finished")
1160	} else {
1161		Ok(())
1162	};
1163
1164	store_result?;
1165	spent_result?;
1166	update_result?;
1167
1168	Ok(())
1169}
1170
1171//TODO(stevenroose) should be made idempotent
1172async fn persist_round_failure(
1173	wallet: &Wallet,
1174	participation: &RoundParticipation,
1175	movement_id: Option<MovementId>,
1176) -> anyhow::Result<()> {
1177	debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
1178	let unlock_result = wallet.unlock_vtxos(&participation.inputs).await;
1179	let finish_result = if let Some(movement_id) = movement_id {
1180		wallet.movements.finish_movement(movement_id, MovementStatus::Failed).await
1181	} else {
1182		Ok(())
1183	};
1184	if let Err(e) = &finish_result {
1185		error!("Failed to mark movement as failed: {:#}", e);
1186	}
1187	match (unlock_result, finish_result) {
1188		(Ok(()), Ok(())) => Ok(()),
1189		(Err(e), _) => Err(e),
1190		(_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
1191	}
1192}
1193
1194async fn update_funding_txid(
1195	wallet: &Wallet,
1196	movement_id: MovementId,
1197	funding_txid: Txid,
1198) -> anyhow::Result<()> {
1199	wallet.movements.update_movement(
1200		movement_id,
1201		MovementUpdate::new()
1202			.metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
1203	).await.context("Unable to update funding txid of round")
1204}
1205
1206impl Wallet {
1207	/// Start a new round participation
1208	///
1209	/// This function will store the state in the db and mark the VTXOs as locked.
1210	pub async fn join_next_round(
1211		&self,
1212		participation: RoundParticipation,
1213		movement_kind: Option<RoundMovement>,
1214	) -> anyhow::Result<StoredRoundState> {
1215		participation.sanity_check().context("bad participations")?;
1216
1217		let movement_id = if let Some(kind) = movement_kind {
1218			Some(self.movements.new_movement_with_update(
1219				Subsystem::ROUND,
1220				kind.to_string(),
1221				participation.to_movement_update()?
1222			).await?)
1223		} else {
1224			None
1225		};
1226		let state = RoundState::new_interactive(participation, movement_id);
1227
1228		let id = self.db.store_round_state_lock_vtxos(&state).await?;
1229		Ok(StoredRoundState { id, state })
1230	}
1231
1232	pub async fn join_non_interactive_round(
1233		&self,
1234		participation: RoundParticipation,
1235		movement_kind: Option<RoundMovement>,
1236	) -> anyhow::Result<StoredRoundState> {
1237		participation.sanity_check().context("bad participations")?;
1238
1239		let movement_id = if let Some(kind) = movement_kind {
1240			let movement_id = self.movements.new_movement(
1241				Subsystem::ROUND, kind.to_string(),
1242			).await?;
1243			let update = participation.to_movement_update()?;
1244			self.movements.update_movement(movement_id, update).await?;
1245			Some(movement_id)
1246		} else {
1247			None
1248		};
1249		let state = RoundState::new_interactive(participation, movement_id);
1250
1251		let id = self.db.store_round_state_lock_vtxos(&state).await?;
1252		Ok(StoredRoundState { id, state })
1253	}
1254
	/// Get all pending round states
	///
	/// Returns the round states exactly as loaded from the wallet database;
	/// no filtering or syncing is performed here.
	pub async fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState>> {
		self.db.load_round_states().await
	}
1259
1260	/// Sync pending rounds that have finished but are waiting for confirmations
1261	pub async fn sync_pending_rounds(&self) -> anyhow::Result<()> {
1262		let states = self.db.load_round_states().await?;
1263		if !states.is_empty() {
1264			debug!("Syncing {} pending round states...", states.len());
1265
1266			tokio_stream::iter(states).for_each_concurrent(10, |mut state| async move {
1267				// not processing events here
1268				if state.state.ongoing_participation() {
1269					return;
1270				}
1271
1272				let status = state.state.sync(self).await;
1273				trace!("Synced round #{}, status: {:?}", state.id, status);
1274				match status {
1275					Ok(RoundStatus::Confirmed { funding_txid }) => {
1276						info!("Round confirmed. Funding tx {}", funding_txid);
1277						if let Err(e) = self.db.remove_round_state(&state).await {
1278							warn!("Error removing confirmed round state from db: {:#}", e);
1279						}
1280					},
1281					Ok(RoundStatus::Unconfirmed { funding_txid }) => {
1282						info!("Waiting for confirmations for round funding tx {}", funding_txid);
1283						if let Err(e) = self.db.update_round_state(&state).await {
1284							warn!("Error updating pending round state in db: {:#}", e);
1285						}
1286					},
1287					Ok(RoundStatus::Pending) => {
1288						if let Err(e) = self.db.update_round_state(&state).await {
1289							warn!("Error updating pending round state in db: {:#}", e);
1290						}
1291					},
1292					Ok(RoundStatus::Failed { error }) => {
1293						error!("Round failed: {}", error);
1294						if let Err(e) = self.db.remove_round_state(&state).await {
1295							warn!("Error removing failed round state from db: {:#}", e);
1296						}
1297					},
1298					Ok(RoundStatus::Canceled) => {
1299						error!("Round canceled");
1300						if let Err(e) = self.db.remove_round_state(&state).await {
1301							warn!("Error removing canceled round state from db: {:#}", e);
1302						}
1303					},
1304					Err(e) => warn!("Error syncing round: {:#}", e),
1305				}
1306			}).await;
1307		}
1308
1309		Ok(())
1310	}
1311
1312	/// Fetch last round event from server
1313	async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1314		let mut srv = self.require_server()?;
1315		let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1316		Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1317	}
1318
1319	async fn inner_process_event(
1320		&self,
1321		states: impl IntoIterator<Item = &mut StoredRoundState>,
1322		event: Option<&RoundEvent>,
1323	) {
1324		tokio_stream::iter(states).for_each_concurrent(3, |state| async move {
1325			if let Some(event) = event && state.state.ongoing_participation() {
1326				let updated = state.state.process_event(self, &event).await;
1327				if updated {
1328					if let Err(e) = self.db.update_round_state(&state).await {
1329						error!("Error storing round state #{} after progress: {:#}", state.id, e);
1330					}
1331				}
1332			}
1333
1334			match state.state.sync(self).await {
1335				Err(e) => warn!("Error syncing round #{}: {:#}", state.id, e),
1336				Ok(s) if s.is_final() => {
1337					info!("Round #{} finished with result: {:?}", state.id, s);
1338					if let Err(e) = self.db.remove_round_state(&state).await {
1339						warn!("Failed to remove finished round #{} from db: {:#}", state.id, e);
1340					}
1341				},
1342				Ok(s) => {
1343					trace!("Round state #{} is now in state {:?}", state.id, s);
1344					if let Err(e) = self.db.update_round_state(&state).await {
1345						warn!("Error storing round state #{}: {:#}", state.id, e);
1346					}
1347				},
1348			}
1349		}).await;
1350	}
1351
1352	/// Try to make incremental progress on all pending round states
1353	///
1354	/// If the `last_round_event` argument is not provided, it will be fetched
1355	/// from the server.
1356	pub async fn progress_pending_rounds(
1357		&self,
1358		last_round_event: Option<&RoundEvent>,
1359	) -> anyhow::Result<()> {
1360		let mut states = self.db.load_round_states().await?;
1361		info!("Processing {} rounds...", states.len());
1362
1363		let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1364		if states.iter().any(|s| s.state.ongoing_participation()) && last_round_event.is_none() {
1365			match self.get_last_round_event().await {
1366				Ok(e) => last_round_event = Some(Cow::Owned(e)),
1367				Err(e) => {
1368					warn!("Error fetching round event, \
1369						failed to progress ongoing rounds: {:#}", e);
1370				},
1371			}
1372		}
1373
1374		let event = last_round_event.as_ref().map(|c| c.as_ref());
1375		self.inner_process_event(states.iter_mut(), event).await;
1376
1377		Ok(())
1378	}
1379
1380	pub async fn subscribe_round_events(&self)
1381		-> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1382	{
1383		let mut srv = self.require_server()?;
1384		let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1385			.into_inner().map(|m| {
1386				let m = m.context("received error on event stream")?;
1387				let e = RoundEvent::try_from(m.clone())
1388					.with_context(|| format!("error converting rpc round event: {:?}", m))?;
1389				trace!("Received round event: {}", e);
1390				Ok::<_, anyhow::Error>(e)
1391			});
1392		Ok(events)
1393	}
1394
1395	/// A blocking call that will try to perform a full round participation
1396	/// for all ongoing rounds
1397	///
1398	/// Returns only once a round has happened on the server.
1399	pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1400		let mut states = self.db.load_round_states().await?;
1401		states.retain(|s| s.state.ongoing_participation());
1402
1403		if states.is_empty() {
1404			info!("No pending round states");
1405			return Ok(());
1406		}
1407
1408		let mut events = self.subscribe_round_events().await?;
1409
1410		info!("Participating with {} round states...", states.len());
1411
1412		loop {
1413			let event = events.next().await
1414				.context("events stream broke")?
1415				.context("error on event stream")?;
1416
1417			self.inner_process_event(states.iter_mut(), Some(&event)).await;
1418
1419			states.retain(|s| s.state.ongoing_participation());
1420			if states.is_empty() {
1421				info!("All rounds handled");
1422				return Ok(());
1423			}
1424		}
1425	}
1426
1427	/// Will cancel all pending rounds that can safely be canceled
1428	///
1429	/// All rounds that have not started yet can safely be canceled,
1430	/// as well as rounds where we have not yet signed any forfeit txs.
1431	pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1432		let states = self.db.load_round_states().await?;
1433		for mut state in states {
1434			match state.state.try_cancel(self).await {
1435				Ok(true) => {
1436					if let Err(e) = self.db.remove_round_state(&state).await {
1437						warn!("Error removing canceled round state from db: {:#}", e);
1438					}
1439				},
1440				Ok(false) => {},
1441				Err(e) => warn!("Error trying to cancel round #{}: {:#}", state.id, e),
1442			}
1443		}
1444		Ok(())
1445	}
1446
1447	/// Try to cancel the given round
1448	pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1449		let states = self.db.load_round_states().await?;
1450		for mut state in states {
1451			if state.id != id {
1452				continue;
1453			}
1454
1455			if state.state.try_cancel(self).await.context("failed to cancel round")? {
1456				self.db.remove_round_state(&state).await
1457					.context("error removing canceled round state from db")?;
1458			} else {
1459				bail!("failed to cancel round");
1460			}
1461			return Ok(());
1462		}
1463		bail!("round not found")
1464	}
1465
1466	/// Participate in a round
1467	///
1468	/// This function will start a new round participation and block until
1469	/// the round is finished.
1470	/// After this method returns the round state will be kept active until
1471	/// the round tx fully confirms.
1472	pub(crate) async fn participate_round(
1473		&self,
1474		participation: RoundParticipation,
1475		movement_kind: Option<RoundMovement>,
1476	) -> anyhow::Result<RoundStatus> {
1477		let mut state = self.join_next_round(participation, movement_kind).await?;
1478
1479		info!("Waiting for a round start...");
1480		let mut events = self.subscribe_round_events().await?;
1481
1482		loop {
1483			if !state.state.ongoing_participation() {
1484				return Ok(state.state.sync(self).await?);
1485			}
1486
1487			let event = events.next().await
1488				.context("events stream broke")?
1489				.context("error on event stream")?;
1490			if state.state.process_event(self, &event).await {
1491				self.db.update_round_state(&state).await?;
1492			}
1493		}
1494	}
1495}