1use std::iter;
6use std::borrow::Cow;
7use std::convert::Infallible;
8
9use anyhow::Context;
10use ark::vtxo::VtxoValidationError;
11use bdk_esplora::esplora_client::Amount;
12use bip39::rand;
13use bitcoin::{OutPoint, SignedAmount, Transaction, Txid};
14use bitcoin::consensus::encode::{deserialize, serialize_hex};
15use bitcoin::hashes::Hash;
16use bitcoin::hex::DisplayHex;
17use bitcoin::key::Keypair;
18use bitcoin::secp256k1::schnorr;
19use futures::future::try_join_all;
20use futures::{Stream, StreamExt};
21use log::{debug, error, info, trace, warn};
22
23use ark::{ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoRequest};
24use ark::forfeit::{HashLockedForfeitBundle, HashLockedForfeitNonces};
25use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
26use ark::rounds::{
27 RoundAttempt, RoundEvent, RoundFinished, RoundSeq, MIN_ROUND_TX_OUTPUTS,
28 ROUND_TX_VTXO_TREE_VOUT,
29};
30use ark::tree::signed::{LeafVtxoCosignContext, UnlockHash, VtxoTreeSpec};
31use bitcoin_ext::{TxStatus, P2TR_DUST};
32use server_rpc::{protos, ServerConnection, TryFromBytes};
33
34use crate::{SECP, Wallet};
35use crate::movement::{MovementId, MovementStatus};
36use crate::movement::update::MovementUpdate;
37use crate::persist::{RoundStateId, StoredRoundState};
38use crate::subsystem::{RoundMovement, Subsystem};
39
40
41const HARK_TRANSITION_KIND: &str = "hash-locked-cosigned";
43
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct RoundParticipation {
48 #[serde(with = "ark::encode::serde::vec")]
49 pub inputs: Vec<Vtxo>,
50 pub outputs: Vec<VtxoRequest>,
53}
54
55impl RoundParticipation {
56 pub fn sanity_check(&self) -> anyhow::Result<()> {
58 if let Some(payreq) = self.outputs.iter().find(|p| p.amount < P2TR_DUST) {
59 bail!("VTXO amount must be at least {}, requested {}", P2TR_DUST, payreq.amount);
60 }
61 Ok(())
62 }
63
64 pub fn to_movement_update(&self) -> anyhow::Result<MovementUpdate> {
65 let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
66 let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
67 let fee = input_amount - output_amount;
68 Ok(MovementUpdate::new()
69 .consumed_vtxos(&self.inputs)
70 .intended_balance(SignedAmount::ZERO)
71 .effective_balance( - fee.to_signed()?)
72 .fee(fee)
73 )
74 }
75}
76
77#[derive(Debug, Clone)]
78pub enum RoundStatus {
79 Confirmed {
81 funding_txid: Txid,
82 },
83 Unconfirmed {
85 funding_txid: Txid,
86 },
87 Pending,
89 Failed {
91 error: String,
92 },
93 Canceled,
95}
96
97impl RoundStatus {
98 pub fn is_final(&self) -> bool {
100 match self {
101 Self::Confirmed { .. } => true,
102 Self::Unconfirmed { .. } => false,
103 Self::Pending => false,
104 Self::Failed { .. } => true,
105 Self::Canceled => true,
106 }
107 }
108
109 pub fn is_success(&self) -> bool {
111 match self {
112 Self::Confirmed { .. } => true,
113 Self::Unconfirmed { .. } => true,
114 Self::Pending => false,
115 Self::Failed { .. } => false,
116 Self::Canceled => false,
117 }
118 }
119}
120
121pub struct RoundState {
134 pub(crate) done: bool,
136
137 pub(crate) participation: RoundParticipation,
139
140 pub(crate) flow: RoundFlowState,
142
143 pub(crate) new_vtxos: Vec<Vtxo>,
148
149 pub(crate) sent_forfeit_sigs: bool,
156
157 pub(crate) movement_id: Option<MovementId>,
159}
160
161impl RoundState {
162 fn new_interactive(
163 participation: RoundParticipation,
164 movement_id: Option<MovementId>,
165 ) -> Self {
166 Self {
167 participation,
168 movement_id,
169 flow: RoundFlowState::InteractivePending,
170 new_vtxos: Vec::new(),
171 sent_forfeit_sigs: false,
172 done: false,
173 }
174 }
175
176 #[allow(unused)]
177 fn new_non_interactive(
178 participation: RoundParticipation,
179 unlock_hash: UnlockHash,
180 movement_id: Option<MovementId>,
181 ) -> Self {
182 Self {
183 participation,
184 movement_id,
185 flow: RoundFlowState::NonInteractivePending { unlock_hash },
186 new_vtxos: Vec::new(),
187 sent_forfeit_sigs: false,
188 done: false,
189 }
190 }
191
192 pub fn participation(&self) -> &RoundParticipation {
194 &self.participation
195 }
196
197 pub fn unlock_hash(&self) -> Option<UnlockHash> {
199 match self.flow {
200 RoundFlowState::NonInteractivePending { unlock_hash } => Some(unlock_hash),
201 RoundFlowState::InteractivePending => None,
202 RoundFlowState::InteractiveOngoing { .. } => None,
203 RoundFlowState::Failed { .. } => None,
204 RoundFlowState::Canceled => None,
205 RoundFlowState::Finished { unlock_hash, .. } => Some(unlock_hash),
206 }
207 }
208
209 pub fn funding_tx(&self) -> Option<&Transaction> {
210 match self.flow {
211 RoundFlowState::NonInteractivePending { .. } => None,
212 RoundFlowState::InteractivePending => None,
213 RoundFlowState::InteractiveOngoing { .. } => None,
214 RoundFlowState::Failed { .. } => None,
215 RoundFlowState::Canceled => None,
216 RoundFlowState::Finished { ref funding_tx, .. } => Some(funding_tx),
217 }
218 }
219
220 pub fn ongoing_participation(&self) -> bool {
222 match self.flow {
223 RoundFlowState::NonInteractivePending { .. } => false,
224 RoundFlowState::InteractivePending => true,
225 RoundFlowState::InteractiveOngoing { .. } => true,
226 RoundFlowState::Failed { .. } => false,
227 RoundFlowState::Canceled => false,
228 RoundFlowState::Finished { .. } => false,
229 }
230 }
231
232 pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
235 let ret = match self.flow {
236 RoundFlowState::NonInteractivePending { .. } => todo!("we have to cancel with server!"),
237 RoundFlowState::Canceled => true,
238 RoundFlowState::Failed { .. } => true,
239 RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
240 self.flow = RoundFlowState::Canceled;
241 true
242 },
243 RoundFlowState::Finished { .. } => false,
244 };
245 if ret {
246 persist_round_failure(wallet, &self.participation, self.movement_id).await
247 .context("failed to persist round failure for cancelation")?;
248 }
249 Ok(ret)
250 }
251
252 async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
253 match start_attempt(wallet, &self.participation, attempt).await {
254 Ok(state) => {
255 self.flow = RoundFlowState::InteractiveOngoing {
256 round_seq: attempt.round_seq,
257 attempt_seq: attempt.attempt_seq,
258 state: state,
259 };
260 },
261 Err(e) => {
262 self.flow = RoundFlowState::Failed {
263 error: format!("{:#}", e),
264 };
265 },
266 }
267 }
268
269 pub async fn process_event(
271 &mut self,
272 wallet: &Wallet,
273 event: &RoundEvent,
274 ) -> bool {
275 let _: Infallible = match self.flow {
276 RoundFlowState::InteractivePending => {
277 if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
278 trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
279 self.try_start_attempt(wallet, e).await;
280 return true;
281 } else {
282 trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
283 event.kind(), event.round_seq(), event.attempt_seq(),
284 );
285 return false;
286 }
287 },
288 RoundFlowState::InteractiveOngoing { round_seq, attempt_seq, ref mut state } => {
289 if event.round_seq() > round_seq {
292 self.flow = RoundFlowState::Failed {
295 error: format!("round {} started while we were on {}",
296 event.round_seq(), round_seq,
297 ),
298 };
299 return true;
300 }
301
302 if event.attempt_seq() < attempt_seq {
303 trace!("ignoring replayed message from old attempt");
304 return false;
305 }
306
307 if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
308 trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
309 self.try_start_attempt(wallet, e).await;
310 return true;
311 }
312 trace!("Processing event {} for round attempt {}:{} in state {}",
313 event.kind(), round_seq, attempt_seq, state.kind(),
314 );
315
316 return match progress_attempt(state, wallet, &self.participation, event).await {
317 AttemptProgressResult::NotUpdated => false,
318 AttemptProgressResult::Updated { new_state } => {
319 *state = new_state;
320 true
321 },
322 AttemptProgressResult::Failed(e) => {
323 debug!("Round failed with error: {:#}", e);
324 self.flow = RoundFlowState::Failed {
325 error: format!("{:#}", e),
326 };
327 true
328 },
329 AttemptProgressResult::Finished { funding_tx, vtxos, unlock_hash } => {
330 self.new_vtxos = vtxos;
331 let funding_txid = funding_tx.compute_txid();
332 self.flow = RoundFlowState::Finished { funding_tx, unlock_hash };
333 if let Some(mid) = self.movement_id {
334 if let Err(e) = update_funding_txid(wallet, mid, funding_txid).await {
335 warn!("Error updating the round funding txid: {:#}", e);
336 }
337 }
338 true
339 },
340 };
341 },
342 RoundFlowState::NonInteractivePending { .. }
343 | RoundFlowState::Finished { .. }
344 | RoundFlowState::Failed { .. }
345 | RoundFlowState::Canceled => return false,
346 };
347 }
348
349 pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
354 match self.flow {
355 RoundFlowState::Finished { ref funding_tx, .. } if self.done => {
356 Ok(RoundStatus::Confirmed {
357 funding_txid: funding_tx.compute_txid(),
358 })
359 },
360
361 RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
362 Ok(RoundStatus::Pending)
363 },
364 RoundFlowState::Failed { ref error } => {
365 persist_round_failure(wallet, &self.participation, self.movement_id).await
366 .context("failed to persist round failure")?;
367 Ok(RoundStatus::Failed { error: error.clone() })
368 },
369 RoundFlowState::Canceled => {
370 persist_round_failure(wallet, &self.participation, self.movement_id).await
371 .context("failed to persist round failure")?;
372 Ok(RoundStatus::Canceled)
373 },
374
375 RoundFlowState::NonInteractivePending { unlock_hash } => {
376 match progress_non_interactive(wallet, &self.participation, unlock_hash).await {
377 Ok(HarkProgressResult::RoundPending) => Ok(RoundStatus::Pending),
378 Ok(HarkProgressResult::Ok { funding_tx, new_vtxos }) => {
379 let funding_txid = funding_tx.compute_txid();
380 self.new_vtxos = new_vtxos;
381 self.flow = RoundFlowState::Finished {
382 funding_tx: funding_tx.clone(),
383 unlock_hash: unlock_hash,
384 };
385
386 persist_round_success(
387 wallet,
388 &self.participation,
389 self.movement_id,
390 &self.new_vtxos,
391 &funding_tx,
392 ).await.context("failed to store successful round in DB!")?;
393
394 self.done = true;
395
396 Ok(RoundStatus::Confirmed { funding_txid })
397 },
398 Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }) => {
399 if let Some(mid) = self.movement_id {
400 update_funding_txid(wallet, mid, funding_txid).await
401 .context("failed to update funding txid in DB")?;
402 }
403 Ok(RoundStatus::Unconfirmed { funding_txid })
404 },
405
406 Err(HarkForfeitError::Err(e)) => {
409 Err(e.context("error progressing non-interactive round"))
413 },
414 Err(HarkForfeitError::SentForfeits(e)) => {
415 self.sent_forfeit_sigs = true;
416 Err(e.context("error progressing non-interactive round \
417 after sending forfeit tx signatures"))
418 },
419 }
420 },
421 RoundFlowState::Finished { ref funding_tx, unlock_hash } => {
423 let funding_txid = funding_tx.compute_txid();
424 let confirmed = check_funding_tx_confirmations(
425 wallet, funding_txid, &funding_tx,
426 ).await.context("error checking funding tx confirmations")?;
427 if !confirmed {
428 trace!("Funding tx {} not yet deeply enough confirmed", funding_txid);
429 return Ok(RoundStatus::Unconfirmed { funding_txid });
430 }
431
432 match hark_vtxo_swap(
433 wallet, &self.participation, &mut self.new_vtxos, &funding_tx, unlock_hash,
434 ).await {
435 Ok(()) => {
436 persist_round_success(
437 wallet,
438 &self.participation,
439 self.movement_id,
440 &self.new_vtxos,
441 &funding_tx,
442 ).await.context("failed to store successful round in DB!")?;
443
444 self.done = true;
445
446 Ok(RoundStatus::Confirmed { funding_txid })
447 },
448 Err(HarkForfeitError::Err(e)) => {
449 Err(e.context("error forfeiting VTXOs after round"))
450 },
451 Err(HarkForfeitError::SentForfeits(e)) => {
452 self.sent_forfeit_sigs = true;
453 Err(e.context("error after having signed and sent \
454 forfeit signatures to server"))
455 },
456 }
457 },
458 }
459 }
460
461 pub fn output_vtxos(&self) -> Option<&[Vtxo]> {
464 if self.new_vtxos.is_empty() {
465 None
466 } else {
467 Some(&self.new_vtxos)
468 }
469 }
470
471 pub fn locked_pending_inputs(&self) -> &[Vtxo] {
474 match self.flow {
476 RoundFlowState::NonInteractivePending { .. }
477 | RoundFlowState::InteractivePending
478 | RoundFlowState::InteractiveOngoing { .. }
479 => {
480 &self.participation.inputs
481 },
482 RoundFlowState::Failed { .. }
483 | RoundFlowState::Finished { .. }
484 | RoundFlowState::Canceled
485 => {
486 &[]
488 },
489 }
490 }
491}
492
493pub enum RoundFlowState {
498 NonInteractivePending {
500 unlock_hash: UnlockHash,
501 },
502
503 InteractivePending,
505 InteractiveOngoing {
507 round_seq: RoundSeq,
508 attempt_seq: usize,
509 state: AttemptState,
510 },
511
512 Finished {
514 funding_tx: Transaction,
515 unlock_hash: UnlockHash,
516 },
517
518 Failed {
520 error: String,
521 },
522
523 Canceled,
525}
526
527pub enum AttemptState {
532 AwaitingAttempt,
533 AwaitingUnsignedVtxoTree {
534 cosign_keys: Vec<Keypair>,
535 secret_nonces: Vec<Vec<DangerousSecretNonce>>,
536 unlock_hash: UnlockHash,
537 },
538 AwaitingFinishedRound {
539 unsigned_round_tx: Transaction,
540 vtxos_spec: VtxoTreeSpec,
541 unlock_hash: UnlockHash,
542 },
543}
544
545impl AttemptState {
546 fn kind(&self) -> &'static str {
548 match self {
549 Self::AwaitingAttempt => "AwaitingAttempt",
550 Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
551 Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
552 }
553 }
554}
555
556enum AttemptProgressResult {
558 Finished {
559 funding_tx: Transaction,
560 vtxos: Vec<Vtxo>,
561 unlock_hash: UnlockHash,
562 },
563 Failed(anyhow::Error),
564 Updated {
570 new_state: AttemptState,
571 },
572 NotUpdated,
573}
574
575async fn start_attempt(
577 wallet: &Wallet,
578 participation: &RoundParticipation,
579 event: &RoundAttempt,
580) -> anyhow::Result<AttemptState> {
581 let mut srv = wallet.require_server().context("server not available")?;
582 let ark_info = srv.ark_info().await?;
583
584 let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
586 .take(participation.outputs.len())
587 .collect::<Vec<_>>();
588
589 let cosign_nonces = cosign_keys.iter()
592 .map(|key| {
593 let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
594 let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
595 for _ in 0..ark_info.nb_round_nonces {
596 let (s, p) = musig::nonce_pair(key);
597 secs.push(s);
598 pubs.push(p);
599 }
600 (secs, pubs)
601 })
602 .take(participation.outputs.len())
603 .collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
604
605
606 debug!("Submitting payment request with {} inputs and {} vtxo outputs",
608 participation.inputs.len(), participation.outputs.len(),
609 );
610
611 let signed_reqs = participation.outputs.iter()
612 .zip(cosign_keys.iter())
613 .zip(cosign_nonces.iter())
614 .map(|((req, cosign_key), (_sec, pub_nonces))| {
615 SignedVtxoRequest {
616 vtxo: req.clone(),
617 cosign_pubkey: cosign_key.public_key(),
618 nonces: pub_nonces.clone(),
619 }
620 })
621 .collect::<Vec<_>>();
622
623 let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
624 for vtxo in participation.inputs.iter() {
625 let keypair = wallet.get_vtxo_key(vtxo).await
626 .map_err(HarkForfeitError::Err)?;
627 input_vtxos.push(protos::InputVtxo {
628 vtxo_id: vtxo.id().to_bytes().to_vec(),
629 ownership_proof: {
630 let sig = event.challenge
631 .sign_with(vtxo.id(), &signed_reqs, &keypair);
632 sig.serialize().to_vec()
633 },
634 });
635 }
636
637 let resp = srv.client.submit_payment(protos::SubmitPaymentRequest {
638 input_vtxos: input_vtxos,
639 vtxo_requests: signed_reqs.into_iter().map(Into::into).collect(),
640 #[allow(deprecated)]
641 offboard_requests: vec![],
642 }).await.context("Ark server refused our payment submission")?;
643
644 Ok(AttemptState::AwaitingUnsignedVtxoTree {
645 unlock_hash: UnlockHash::from_bytes(&resp.into_inner().unlock_hash)?,
646 cosign_keys: cosign_keys,
647 secret_nonces: cosign_nonces.into_iter()
648 .map(|(sec, _pub)| sec.into_iter()
649 .map(DangerousSecretNonce::dangerous_from_secret_nonce)
650 .collect())
651 .collect(),
652 })
653}
654
655#[derive(Debug, thiserror::Error)]
657enum HarkForfeitError {
658 #[error("error after forfeits were sent: {0}")]
660 SentForfeits(anyhow::Error),
661 #[error("error before forfeits were sent: {0}")]
663 Err(anyhow::Error),
664}
665
666async fn hark_cosign_leaf(
667 wallet: &Wallet,
668 srv: &mut ServerConnection,
669 funding_tx: &Transaction,
670 vtxo: &mut Vtxo,
671) -> anyhow::Result<()> {
672 let key = wallet.pubkey_keypair(&vtxo.user_pubkey()).await
673 .context("error fetching keypair").map_err(HarkForfeitError::Err)?
674 .with_context(|| format!(
675 "keypair {} not found for VTXO {}", vtxo.user_pubkey(), vtxo.id(),
676 ))?.1;
677 let (ctx, cosign_req) = LeafVtxoCosignContext::new(vtxo, funding_tx, &key);
678 let cosign_resp = srv.client.request_leaf_vtxo_cosign(
679 protos::LeafVtxoCosignRequest::from(cosign_req),
680 ).await
681 .with_context(|| format!("error requesting leaf cosign for vtxo {}", vtxo.id()))?
682 .into_inner().try_into()
683 .context("bad leaf vtxo cosign response")?;
684 ensure!(ctx.finalize(vtxo, cosign_resp),
685 "failed to finalize VTXO leaf signature for VTXO {}", vtxo.id(),
686 );
687
688 Ok(())
689}
690
691async fn hark_vtxo_swap(
701 wallet: &Wallet,
702 participation: &RoundParticipation,
703 output_vtxos: &mut [Vtxo],
704 funding_tx: &Transaction,
705 unlock_hash: UnlockHash,
706) -> Result<(), HarkForfeitError> {
707 let mut srv = wallet.require_server().map_err(HarkForfeitError::Err)?;
708
709 for vtxo in output_vtxos.iter_mut() {
711 hark_cosign_leaf(wallet, &mut srv, funding_tx, vtxo).await
712 .map_err(HarkForfeitError::Err)?;
713 }
714
715 let server_nonces = srv.client.request_forfeit_nonces(protos::ForfeitNoncesRequest {
718 unlock_hash: unlock_hash.to_byte_array().to_vec(),
719 vtxo_ids: participation.inputs.iter().map(|v| v.id().to_bytes().to_vec()).collect(),
720 }).await
721 .context("request forfeits nonces call failed")
722 .map_err(HarkForfeitError::Err)?
723 .into_inner().public_nonces.into_iter()
724 .map(|b| HashLockedForfeitNonces::from_bytes(b))
725 .collect::<Result<Vec<_>, _>>()
726 .context("invalid forfeit nonces")
727 .map_err(HarkForfeitError::Err)?;
728
729 if server_nonces.len() != participation.inputs.len() {
730 return Err(HarkForfeitError::Err(anyhow!(
731 "server sent {} nonce pairs, expected {}",
732 server_nonces.len(), participation.inputs.len(),
733 )));
734 }
735
736 let mut forfeit_bundles = Vec::with_capacity(participation.inputs.len());
737 for (input, nonces) in participation.inputs.iter().zip(server_nonces.into_iter()) {
738 let user_key = wallet.pubkey_keypair(&input.user_pubkey()).await
739 .ok().flatten().with_context(|| format!(
740 "failed to fetch keypair for vtxo user pubkey {}", input.user_pubkey(),
741 )).map_err(HarkForfeitError::Err)?.1;
742 forfeit_bundles.push(HashLockedForfeitBundle::forfeit_vtxo(
743 &input, unlock_hash, &user_key, &nonces,
744 ))
745 }
746
747 let preimage = srv.client.forfeit_vtxos(protos::ForfeitVtxosRequest {
748 forfeit_bundles: forfeit_bundles.iter().map(|b| b.serialize()).collect(),
749 }).await
750 .context("forfeit vtxos call failed")
751 .map_err(HarkForfeitError::SentForfeits)?
752 .into_inner().unlock_preimage.as_slice().try_into()
753 .context("invalid preimage length")
754 .map_err(HarkForfeitError::SentForfeits)?;
755
756 for vtxo in output_vtxos.iter_mut() {
757 if !vtxo.provide_unlock_preimage(preimage) {
758 return Err(HarkForfeitError::SentForfeits(anyhow!(
759 "invalid preimage {} for vtxo {} with supposed unlock hash {}",
760 preimage.as_hex(), vtxo.id(), unlock_hash,
761 )));
762 }
763
764 vtxo.validate(&funding_tx).with_context(|| format!(
766 "new VTXO {} does not pass validation after hArk forfeit protocol", vtxo.id(),
767 )).map_err(HarkForfeitError::SentForfeits)?;
768 }
769
770 Ok(())
771}
772
773fn check_vtxo_fails_hash_lock(funding_tx: &Transaction, vtxo: &Vtxo) -> anyhow::Result<()> {
774 match vtxo.validate(funding_tx) {
775 Err(VtxoValidationError::GenesisTransition {
776 genesis_idx, genesis_len, transition_kind, ..
777 }) if genesis_idx + 1 == genesis_len && transition_kind == HARK_TRANSITION_KIND => Ok(()),
778 Ok(()) => Err(anyhow!("new un-unlocked VTXO should fail validation but doesn't: {}",
779 vtxo.serialize_hex(),
780 )),
781 Err(e) => Err(anyhow!("new VTXO {} failed validation: {:#}", vtxo.id(), e)),
782 }
783}
784
785fn check_round_matches_participation(
786 part: &RoundParticipation,
787 new_vtxos: &[Vtxo],
788 funding_tx: &Transaction,
789) -> anyhow::Result<()> {
790 ensure!(new_vtxos.len() == part.outputs.len(),
791 "unexpected number of VTXOs: got {}, expected {}", new_vtxos.len(), part.outputs.len(),
792 );
793
794 for (vtxo, req) in new_vtxos.iter().zip(&part.outputs) {
795 ensure!(vtxo.amount() == req.amount,
796 "unexpected VTXO amount: got {}, expected {}", vtxo.amount(), req.amount,
797 );
798 ensure!(*vtxo.policy() == req.policy,
799 "unexpected VTXO policy: got {:?}, expected {:?}", vtxo.policy(), req.policy,
800 );
801
802 check_vtxo_fails_hash_lock(funding_tx, vtxo)?;
804 }
805
806 Ok(())
807}
808
809async fn check_funding_tx_confirmations(
819 wallet: &Wallet,
820 funding_txid: Txid,
821 funding_tx: &Transaction,
822) -> anyhow::Result<bool> {
823 let tip = wallet.chain.tip().await.context("chain source error")?;
824 let conf_height = tip - wallet.config.round_tx_required_confirmations + 1;
825 let tx_status = wallet.chain.tx_status(funding_txid).await.context("chain source error")?;
826 trace!("Round funding tx {} confirmation status: {:?} (tip={})",
827 funding_txid, tx_status, tip,
828 );
829 match tx_status {
830 TxStatus::Confirmed(b) if b.height <= conf_height => Ok(true),
831 TxStatus::Mempool | TxStatus::Confirmed(_) => {
832 if wallet.config.round_tx_required_confirmations == 0 {
833 debug!("Accepting round funding tx without confirmations because of configuration");
834 Ok(true)
835 } else {
836 trace!("Hark round funding tx not confirmed (deep enough) yet: {:?}", tx_status);
837 Ok(false)
838 }
839 },
840 TxStatus::NotFound => {
841 if let Err(e) = wallet.chain.broadcast_tx(&funding_tx).await {
846 Err(anyhow!("hark funding tx {} server sent us is rejected by mempool (hex={}): {:#}",
847 funding_txid, serialize_hex(funding_tx), e,
848 ))
849 } else {
850 trace!("hark funding tx {} was not in mempool but we broadcast it", funding_txid);
851 Ok(false)
852 }
853 },
854 }
855}
856
857enum HarkProgressResult {
858 RoundPending,
859 FundingTxUnconfirmed {
860 funding_txid: Txid,
861 },
862 Ok {
863 funding_tx: Transaction,
864 new_vtxos: Vec<Vtxo>,
865 },
866}
867
868async fn progress_non_interactive(
869 wallet: &Wallet,
870 participation: &RoundParticipation,
871 unlock_hash: UnlockHash,
872) -> Result<HarkProgressResult, HarkForfeitError> {
873 let mut srv = wallet.require_server().map_err(HarkForfeitError::Err)?;
874
875 let resp = srv.client.round_participation_status(protos::RoundParticipationStatusRequest {
876 unlock_hash: unlock_hash.to_byte_array().to_vec(),
877 }).await
878 .context("error checking round participation status")
879 .map_err(HarkForfeitError::Err)?.into_inner();
880 let status = protos::RoundParticipationStatus::try_from(resp.status)
881 .context("unknown status from server")
882 .map_err(HarkForfeitError::Err) ?;
883
884 if status == protos::RoundParticipationStatus::RoundPartPending {
885 trace!("Hark round still pending");
886 return Ok(HarkProgressResult::RoundPending);
887 }
888
889 if status == protos::RoundParticipationStatus::RoundPartReleased {
894 let preimage = resp.unlock_preimage.as_ref().map(|p| p.as_hex());
895 warn!("Server says preimage was already released for hArk participation \
896 with unlock hash {}. Supposed preimage: {:?}", unlock_hash, preimage,
897 );
898 }
899
900 let funding_tx_bytes = resp.round_funding_tx
901 .context("funding txid should be provided when status is not pending")
902 .map_err(HarkForfeitError::Err)?;
903 let funding_tx = deserialize::<Transaction>(&funding_tx_bytes)
904 .context("invalid funding txid")
905 .map_err(HarkForfeitError::Err)?;
906 let funding_txid = funding_tx.compute_txid();
907 trace!("Funding tx for round participation with unlock hash {}: {} ({})",
908 unlock_hash, funding_tx.compute_txid(), funding_tx_bytes.as_hex(),
909 );
910
911 match check_funding_tx_confirmations(wallet, funding_txid, &funding_tx).await {
913 Ok(true) => {},
914 Ok(false) => return Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }),
915 Err(e) => return Err(HarkForfeitError::Err(e.context("checking funding tx confirmations"))),
916 }
917
918 let mut new_vtxos = resp.output_vtxos.into_iter()
919 .map(|v| Vtxo::from_bytes(v))
920 .collect::<Result<Vec<_>, _>>()
921 .context("invalid output VTXOs from server")
922 .map_err(HarkForfeitError::Err)?;
923
924 check_round_matches_participation(participation, &new_vtxos, &funding_tx)
926 .context("new VTXOs received from server don't match our participation")
927 .map_err(HarkForfeitError::Err)?;
928
929 hark_vtxo_swap(wallet, participation, &mut new_vtxos, &funding_tx, unlock_hash).await
930 .context("error forfeiting hArk VTXOs")
931 .map_err(HarkForfeitError::SentForfeits)?;
932
933 Ok(HarkProgressResult::Ok { funding_tx, new_vtxos })
934}
935
936async fn progress_attempt(
937 state: &AttemptState,
938 wallet: &Wallet,
939 part: &RoundParticipation,
940 event: &RoundEvent,
941) -> AttemptProgressResult {
942 match (state, event) {
946
947 (
948 AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces, unlock_hash },
949 RoundEvent::VtxoProposal(e),
950 ) => {
951 trace!("Received VtxoProposal: {:#?}", e);
952 match sign_vtxo_tree(
953 wallet,
954 part,
955 &cosign_keys,
956 &secret_nonces,
957 &e.unsigned_round_tx,
958 &e.vtxos_spec,
959 &e.cosign_agg_nonces,
960 ).await {
961 Ok(()) => {
962 AttemptProgressResult::Updated {
963 new_state: AttemptState::AwaitingFinishedRound {
964 unsigned_round_tx: e.unsigned_round_tx.clone(),
965 vtxos_spec: e.vtxos_spec.clone(),
966 unlock_hash: *unlock_hash,
967 },
968 }
969 },
970 Err(e) => {
971 trace!("Error signing VTXO tree: {:#}", e);
972 AttemptProgressResult::Failed(e)
973 },
974 }
975 },
976
977 (
978 AttemptState::AwaitingFinishedRound { unsigned_round_tx, vtxos_spec, unlock_hash },
979 RoundEvent::Finished(RoundFinished { cosign_sigs, signed_round_tx, .. }),
980 ) => {
981 if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
982 return AttemptProgressResult::Failed(anyhow!(
983 "signed funding tx ({}) doesn't match tx received before ({})",
984 signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
985 ));
986 }
987
988 if let Err(e) = wallet.chain.broadcast_tx(&signed_round_tx).await {
989 warn!("Failed to broadcast signed round tx: {:#}", e);
990 }
991
992 match construct_new_vtxos(
993 part, unsigned_round_tx, vtxos_spec, cosign_sigs,
994 ).await {
995 Ok(v) => AttemptProgressResult::Finished {
996 funding_tx: signed_round_tx.clone(),
997 vtxos: v,
998 unlock_hash: *unlock_hash,
999 },
1000 Err(e) => AttemptProgressResult::Failed(anyhow!(
1001 "failed to construct new VTXOs for round: {:#}", e,
1002 )),
1003 }
1004 },
1005
1006 (state, RoundEvent::Finished(RoundFinished { .. })) => {
1007 AttemptProgressResult::Failed(anyhow!(
1008 "unexpectedly received a finished round while we were in state {}",
1009 state.kind(),
1010 ))
1011 },
1012
1013 (state, _) => {
1014 trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
1015 AttemptProgressResult::NotUpdated
1016 },
1017 }
1018}
1019
1020async fn sign_vtxo_tree(
1021 wallet: &Wallet,
1022 participation: &RoundParticipation,
1023 cosign_keys: &[Keypair],
1024 secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
1025 unsigned_round_tx: &Transaction,
1026 vtxo_tree: &VtxoTreeSpec,
1027 cosign_agg_nonces: &[musig::AggregatedNonce],
1028) -> anyhow::Result<()> {
1029 let srv = wallet.require_server().context("server not available")?;
1030
1031 if unsigned_round_tx.output.len() < MIN_ROUND_TX_OUTPUTS {
1032 bail!("server sent round tx with less than 2 outputs: {}",
1033 serialize_hex(&unsigned_round_tx),
1034 );
1035 }
1036
1037 let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
1038
1039 let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1041 for vtxo_req in vtxo_tree.iter_vtxos() {
1042 if let Some(i) = my_vtxos.iter().position(|v| {
1043 v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
1044 }) {
1045 my_vtxos.swap_remove(i);
1046 }
1047 }
1048 if !my_vtxos.is_empty() {
1049 bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
1050 }
1051
1052 let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1053 let iter = participation.outputs.iter().zip(cosign_keys).zip(secret_nonces);
1054 trace!("Sending vtxo signatures to server...");
1055 let _ = try_join_all(iter.map(|((req, key), sec)| async {
1056 let leaf_idx = unsigned_vtxos.spec.leaf_idx_of_req(req).expect("req included");
1057 let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
1058 let part_sigs = unsigned_vtxos.cosign_branch(
1059 &cosign_agg_nonces, leaf_idx, key, secret_nonces,
1060 ).context("failed to cosign branch: our request not part of tree")?;
1061
1062 info!("Sending {} partial vtxo cosign signatures for pk {}",
1063 part_sigs.len(), key.public_key(),
1064 );
1065
1066 let _ = srv.client.clone().provide_vtxo_signatures(protos::VtxoSignaturesRequest {
1067 pubkey: key.public_key().serialize().to_vec(),
1068 signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
1069 }).await.context("error sending vtxo signatures")?;
1070
1071 Result::<(), anyhow::Error>::Ok(())
1072 })).await.context("error sending VTXO signatures")?;
1073 trace!("Done sending vtxo signatures to server");
1074
1075 Ok(())
1076}
1077
1078async fn construct_new_vtxos(
1079 participation: &RoundParticipation,
1080 unsigned_round_tx: &Transaction,
1081 vtxo_tree: &VtxoTreeSpec,
1082 vtxo_cosign_sigs: &[schnorr::Signature],
1083) -> anyhow::Result<Vec<Vtxo>> {
1084 let round_txid = unsigned_round_tx.compute_txid();
1085 let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
1086 let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1087
1088 if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
1090 bail!("Received incorrect vtxo cosign signatures from server");
1092 }
1093
1094 let signed_vtxos = vtxo_tree
1095 .into_signed_tree(vtxo_cosign_sigs.to_vec())
1096 .into_cached_tree();
1097
1098 let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1099 let total_nb_expected_vtxos = expected_vtxos.len();
1100
1101 let mut new_vtxos = vec![];
1102 for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
1103 if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
1104 let vtxo = signed_vtxos.build_vtxo(idx);
1105
1106 check_vtxo_fails_hash_lock(unsigned_round_tx, &vtxo)
1109 .context("constructed invalid vtxo from tree")?;
1110
1111 info!("New VTXO from round: {} ({}, {})",
1112 vtxo.id(), vtxo.amount(), vtxo.policy_type(),
1113 );
1114
1115 new_vtxos.push(vtxo);
1116 expected_vtxos.swap_remove(expected_idx);
1117 }
1118 }
1119
1120 if !expected_vtxos.is_empty() {
1121 if expected_vtxos.len() == total_nb_expected_vtxos {
1122 bail!("None of our VTXOs were present in round!");
1124 } else {
1125 bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
1126 expected_vtxos.len(), expected_vtxos,
1127 );
1128 }
1129 }
1130 Ok(new_vtxos)
1131}
1132
1133async fn persist_round_success(
1135 wallet: &Wallet,
1136 participation: &RoundParticipation,
1137 movement_id: Option<MovementId>,
1138 new_vtxos: &[Vtxo],
1139 funding_tx: &Transaction,
1140) -> anyhow::Result<()> {
1141 debug!("Persisting newly finished round. {} new vtxos, movement ID {:?}",
1142 new_vtxos.len(), movement_id,
1143 );
1144
1145 let store_result = wallet.store_spendable_vtxos(new_vtxos).await
1149 .context("failed to store new VTXOs");
1150 let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs).await
1151 .context("failed to mark input VTXOs as spent");
1152 let update_result = if let Some(mid) = movement_id {
1153 wallet.movements.finish_movement_with_update(
1154 mid,
1155 MovementStatus::Successful,
1156 MovementUpdate::new()
1157 .produced_vtxos(new_vtxos)
1158 .metadata([("funding_txid".into(), serde_json::to_value(funding_tx.compute_txid())?)]),
1159 ).await.context("failed to mark movement as finished")
1160 } else {
1161 Ok(())
1162 };
1163
1164 store_result?;
1165 spent_result?;
1166 update_result?;
1167
1168 Ok(())
1169}
1170
1171async fn persist_round_failure(
1173 wallet: &Wallet,
1174 participation: &RoundParticipation,
1175 movement_id: Option<MovementId>,
1176) -> anyhow::Result<()> {
1177 debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
1178 let unlock_result = wallet.unlock_vtxos(&participation.inputs).await;
1179 let finish_result = if let Some(movement_id) = movement_id {
1180 wallet.movements.finish_movement(movement_id, MovementStatus::Failed).await
1181 } else {
1182 Ok(())
1183 };
1184 if let Err(e) = &finish_result {
1185 error!("Failed to mark movement as failed: {:#}", e);
1186 }
1187 match (unlock_result, finish_result) {
1188 (Ok(()), Ok(())) => Ok(()),
1189 (Err(e), _) => Err(e),
1190 (_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
1191 }
1192}
1193
1194async fn update_funding_txid(
1195 wallet: &Wallet,
1196 movement_id: MovementId,
1197 funding_txid: Txid,
1198) -> anyhow::Result<()> {
1199 wallet.movements.update_movement(
1200 movement_id,
1201 MovementUpdate::new()
1202 .metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
1203 ).await.context("Unable to update funding txid of round")
1204}
1205
1206impl Wallet {
1207 pub async fn join_next_round(
1211 &self,
1212 participation: RoundParticipation,
1213 movement_kind: Option<RoundMovement>,
1214 ) -> anyhow::Result<StoredRoundState> {
1215 participation.sanity_check().context("bad participations")?;
1216
1217 let movement_id = if let Some(kind) = movement_kind {
1218 Some(self.movements.new_movement_with_update(
1219 Subsystem::ROUND,
1220 kind.to_string(),
1221 participation.to_movement_update()?
1222 ).await?)
1223 } else {
1224 None
1225 };
1226 let state = RoundState::new_interactive(participation, movement_id);
1227
1228 let id = self.db.store_round_state_lock_vtxos(&state).await?;
1229 Ok(StoredRoundState { id, state })
1230 }
1231
1232 pub async fn join_non_interactive_round(
1233 &self,
1234 participation: RoundParticipation,
1235 movement_kind: Option<RoundMovement>,
1236 ) -> anyhow::Result<StoredRoundState> {
1237 participation.sanity_check().context("bad participations")?;
1238
1239 let movement_id = if let Some(kind) = movement_kind {
1240 let movement_id = self.movements.new_movement(
1241 Subsystem::ROUND, kind.to_string(),
1242 ).await?;
1243 let update = participation.to_movement_update()?;
1244 self.movements.update_movement(movement_id, update).await?;
1245 Some(movement_id)
1246 } else {
1247 None
1248 };
1249 let state = RoundState::new_interactive(participation, movement_id);
1250
1251 let id = self.db.store_round_state_lock_vtxos(&state).await?;
1252 Ok(StoredRoundState { id, state })
1253 }
1254
1255 pub async fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState>> {
1257 self.db.load_round_states().await
1258 }
1259
1260 pub async fn sync_pending_rounds(&self) -> anyhow::Result<()> {
1262 let states = self.db.load_round_states().await?;
1263 if !states.is_empty() {
1264 debug!("Syncing {} pending round states...", states.len());
1265
1266 tokio_stream::iter(states).for_each_concurrent(10, |mut state| async move {
1267 if state.state.ongoing_participation() {
1269 return;
1270 }
1271
1272 let status = state.state.sync(self).await;
1273 trace!("Synced round #{}, status: {:?}", state.id, status);
1274 match status {
1275 Ok(RoundStatus::Confirmed { funding_txid }) => {
1276 info!("Round confirmed. Funding tx {}", funding_txid);
1277 if let Err(e) = self.db.remove_round_state(&state).await {
1278 warn!("Error removing confirmed round state from db: {:#}", e);
1279 }
1280 },
1281 Ok(RoundStatus::Unconfirmed { funding_txid }) => {
1282 info!("Waiting for confirmations for round funding tx {}", funding_txid);
1283 if let Err(e) = self.db.update_round_state(&state).await {
1284 warn!("Error updating pending round state in db: {:#}", e);
1285 }
1286 },
1287 Ok(RoundStatus::Pending) => {
1288 if let Err(e) = self.db.update_round_state(&state).await {
1289 warn!("Error updating pending round state in db: {:#}", e);
1290 }
1291 },
1292 Ok(RoundStatus::Failed { error }) => {
1293 error!("Round failed: {}", error);
1294 if let Err(e) = self.db.remove_round_state(&state).await {
1295 warn!("Error removing failed round state from db: {:#}", e);
1296 }
1297 },
1298 Ok(RoundStatus::Canceled) => {
1299 error!("Round canceled");
1300 if let Err(e) = self.db.remove_round_state(&state).await {
1301 warn!("Error removing canceled round state from db: {:#}", e);
1302 }
1303 },
1304 Err(e) => warn!("Error syncing round: {:#}", e),
1305 }
1306 }).await;
1307 }
1308
1309 Ok(())
1310 }
1311
1312 async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1314 let mut srv = self.require_server()?;
1315 let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1316 Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1317 }
1318
1319 async fn inner_process_event(
1320 &self,
1321 states: impl IntoIterator<Item = &mut StoredRoundState>,
1322 event: Option<&RoundEvent>,
1323 ) {
1324 tokio_stream::iter(states).for_each_concurrent(3, |state| async move {
1325 if let Some(event) = event && state.state.ongoing_participation() {
1326 let updated = state.state.process_event(self, &event).await;
1327 if updated {
1328 if let Err(e) = self.db.update_round_state(&state).await {
1329 error!("Error storing round state #{} after progress: {:#}", state.id, e);
1330 }
1331 }
1332 }
1333
1334 match state.state.sync(self).await {
1335 Err(e) => warn!("Error syncing round #{}: {:#}", state.id, e),
1336 Ok(s) if s.is_final() => {
1337 info!("Round #{} finished with result: {:?}", state.id, s);
1338 if let Err(e) = self.db.remove_round_state(&state).await {
1339 warn!("Failed to remove finished round #{} from db: {:#}", state.id, e);
1340 }
1341 },
1342 Ok(s) => {
1343 trace!("Round state #{} is now in state {:?}", state.id, s);
1344 if let Err(e) = self.db.update_round_state(&state).await {
1345 warn!("Error storing round state #{}: {:#}", state.id, e);
1346 }
1347 },
1348 }
1349 }).await;
1350 }
1351
1352 pub async fn progress_pending_rounds(
1357 &self,
1358 last_round_event: Option<&RoundEvent>,
1359 ) -> anyhow::Result<()> {
1360 let mut states = self.db.load_round_states().await?;
1361 info!("Processing {} rounds...", states.len());
1362
1363 let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1364 if states.iter().any(|s| s.state.ongoing_participation()) && last_round_event.is_none() {
1365 match self.get_last_round_event().await {
1366 Ok(e) => last_round_event = Some(Cow::Owned(e)),
1367 Err(e) => {
1368 warn!("Error fetching round event, \
1369 failed to progress ongoing rounds: {:#}", e);
1370 },
1371 }
1372 }
1373
1374 let event = last_round_event.as_ref().map(|c| c.as_ref());
1375 self.inner_process_event(states.iter_mut(), event).await;
1376
1377 Ok(())
1378 }
1379
1380 pub async fn subscribe_round_events(&self)
1381 -> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1382 {
1383 let mut srv = self.require_server()?;
1384 let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1385 .into_inner().map(|m| {
1386 let m = m.context("received error on event stream")?;
1387 let e = RoundEvent::try_from(m.clone())
1388 .with_context(|| format!("error converting rpc round event: {:?}", m))?;
1389 trace!("Received round event: {}", e);
1390 Ok::<_, anyhow::Error>(e)
1391 });
1392 Ok(events)
1393 }
1394
1395 pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1400 let mut states = self.db.load_round_states().await?;
1401 states.retain(|s| s.state.ongoing_participation());
1402
1403 if states.is_empty() {
1404 info!("No pending round states");
1405 return Ok(());
1406 }
1407
1408 let mut events = self.subscribe_round_events().await?;
1409
1410 info!("Participating with {} round states...", states.len());
1411
1412 loop {
1413 let event = events.next().await
1414 .context("events stream broke")?
1415 .context("error on event stream")?;
1416
1417 self.inner_process_event(states.iter_mut(), Some(&event)).await;
1418
1419 states.retain(|s| s.state.ongoing_participation());
1420 if states.is_empty() {
1421 info!("All rounds handled");
1422 return Ok(());
1423 }
1424 }
1425 }
1426
1427 pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1432 let states = self.db.load_round_states().await?;
1433 for mut state in states {
1434 match state.state.try_cancel(self).await {
1435 Ok(true) => {
1436 if let Err(e) = self.db.remove_round_state(&state).await {
1437 warn!("Error removing canceled round state from db: {:#}", e);
1438 }
1439 },
1440 Ok(false) => {},
1441 Err(e) => warn!("Error trying to cancel round #{}: {:#}", state.id, e),
1442 }
1443 }
1444 Ok(())
1445 }
1446
1447 pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1449 let states = self.db.load_round_states().await?;
1450 for mut state in states {
1451 if state.id != id {
1452 continue;
1453 }
1454
1455 if state.state.try_cancel(self).await.context("failed to cancel round")? {
1456 self.db.remove_round_state(&state).await
1457 .context("error removing canceled round state from db")?;
1458 } else {
1459 bail!("failed to cancel round");
1460 }
1461 return Ok(());
1462 }
1463 bail!("round not found")
1464 }
1465
1466 pub(crate) async fn participate_round(
1473 &self,
1474 participation: RoundParticipation,
1475 movement_kind: Option<RoundMovement>,
1476 ) -> anyhow::Result<RoundStatus> {
1477 let mut state = self.join_next_round(participation, movement_kind).await?;
1478
1479 info!("Waiting for a round start...");
1480 let mut events = self.subscribe_round_events().await?;
1481
1482 loop {
1483 if !state.state.ongoing_participation() {
1484 return Ok(state.state.sync(self).await?);
1485 }
1486
1487 let event = events.next().await
1488 .context("events stream broke")?
1489 .context("error on event stream")?;
1490 if state.state.process_event(self, &event).await {
1491 self.db.update_round_state(&state).await?;
1492 }
1493 }
1494 }
1495}