1mod lock;
6
7pub(crate) use lock::{RoundStateGuard, RoundStateLockIndex};
8
9use std::iter;
10use std::borrow::Cow;
11use std::convert::Infallible;
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13
14use anyhow::Context;
15use ark::vtxo::VtxoValidationError;
16use bdk_esplora::esplora_client::Amount;
17use bip39::rand;
18use bitcoin::{OutPoint, SignedAmount, Transaction, Txid};
19use bitcoin::consensus::encode::{deserialize, serialize_hex};
20use bitcoin::hashes::Hash;
21use bitcoin::hex::DisplayHex;
22use bitcoin::key::Keypair;
23use bitcoin::secp256k1::schnorr;
24use futures::future::{join_all, try_join_all};
25use futures::{Stream, StreamExt};
26use log::{debug, error, info, trace, warn};
27
28use ark::{ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoRequest};
29use ark::vtxo::Full;
30use ark::forfeit::HashLockedForfeitBundle;
31use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
32use ark::rounds::{
33 RoundAttempt, RoundEvent, RoundFinished, RoundSeq, MIN_ROUND_TX_OUTPUTS,
34 ROUND_TX_VTXO_TREE_VOUT,
35};
36use ark::tree::signed::{LeafVtxoCosignContext, UnlockHash, VtxoTreeSpec};
37use bitcoin_ext::TxStatus;
38use server_rpc::{protos, ServerConnection, TryFromBytes};
39
40use crate::{SECP, Wallet, WalletVtxo};
41use crate::movement::{MovementId, MovementStatus};
42use crate::movement::update::MovementUpdate;
43use crate::persist::models::{RoundStateId, StoredRoundState, Unlocked};
44use crate::subsystem::{RoundMovement, Subsystem};
45
46
/// Transition kind that `check_vtxo_fails_hash_lock` expects to find in the
/// `VtxoValidationError::GenesisTransition` error of a VTXO whose unlock
/// preimage has not been provided yet.
const HARK_TRANSITION_KIND: &str = "hash-locked-cosigned";
49
/// The inputs and outputs a wallet brings into a round.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoundParticipation {
	/// The VTXOs that are consumed (and forfeited) by this participation.
	#[serde(with = "ark::encode::serde::vec")]
	pub inputs: Vec<Vtxo<Full>>,
	/// The VTXO requests submitted to the server for the round outputs.
	pub outputs: Vec<VtxoRequest>,
}
59
60impl RoundParticipation {
61 pub fn to_movement_update(&self) -> anyhow::Result<MovementUpdate> {
62 let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
63 let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
64 let fee = input_amount - output_amount;
65 Ok(MovementUpdate::new()
66 .consumed_vtxos(&self.inputs)
67 .intended_balance(SignedAmount::ZERO)
68 .effective_balance( - fee.to_signed()?)
69 .fee(fee)
70 )
71 }
72}
73
/// The status of a round participation as reported by `RoundState::sync`.
#[derive(Debug, Clone)]
pub enum RoundStatus {
	/// The funding tx passed the confirmation check and the round result
	/// was persisted.
	Confirmed {
		funding_txid: Txid,
	},
	/// A funding tx is known, but it is not confirmed deeply enough yet.
	Unconfirmed {
		funding_txid: Txid,
	},
	/// The round has not produced a funding tx for us yet.
	Pending,
	/// The round failed; `error` holds the formatted failure reason.
	Failed {
		error: String,
	},
	/// The participation was canceled.
	Canceled,
}
93
94impl RoundStatus {
95 pub fn is_final(&self) -> bool {
97 match self {
98 Self::Confirmed { .. } => true,
99 Self::Unconfirmed { .. } => false,
100 Self::Pending => false,
101 Self::Failed { .. } => true,
102 Self::Canceled => true,
103 }
104 }
105
106 pub fn is_success(&self) -> bool {
108 match self {
109 Self::Confirmed { .. } => true,
110 Self::Unconfirmed { .. } => true,
111 Self::Pending => false,
112 Self::Failed { .. } => false,
113 Self::Canceled => false,
114 }
115 }
116}
117
/// The state of a single round participation, tracked from submission until
/// the round succeeded, failed or was canceled.
pub struct RoundState {
	/// Set to `true` by `sync` once the successful round has been persisted.
	pub(crate) done: bool,

	/// The inputs and outputs we participate with.
	pub(crate) participation: RoundParticipation,

	/// Where we currently are in the round flow.
	pub(crate) flow: RoundFlowState,

	/// The output VTXOs of the round; empty until the round produced them.
	pub(crate) new_vtxos: Vec<Vtxo<Full>>,

	/// Set to `true` when an error occurred after our forfeit signatures
	/// were sent to the server.
	pub(crate) sent_forfeit_sigs: bool,

	/// The movement associated with this participation, if any.
	pub(crate) movement_id: Option<MovementId>,
}
157
158impl RoundState {
159 fn new_interactive(
160 participation: RoundParticipation,
161 movement_id: Option<MovementId>,
162 ) -> Self {
163 Self {
164 participation,
165 movement_id,
166 flow: RoundFlowState::InteractivePending,
167 new_vtxos: Vec::new(),
168 sent_forfeit_sigs: false,
169 done: false,
170 }
171 }
172
173 fn new_non_interactive(
174 participation: RoundParticipation,
175 unlock_hash: UnlockHash,
176 movement_id: Option<MovementId>,
177 ) -> Self {
178 Self {
179 participation,
180 movement_id,
181 flow: RoundFlowState::NonInteractivePending { unlock_hash },
182 new_vtxos: Vec::new(),
183 sent_forfeit_sigs: false,
184 done: false,
185 }
186 }
187
	/// The inputs and outputs of this round participation.
	pub fn participation(&self) -> &RoundParticipation {
		&self.participation
	}
192
193 pub fn unlock_hash(&self) -> Option<UnlockHash> {
195 match self.flow {
196 RoundFlowState::NonInteractivePending { unlock_hash } => Some(unlock_hash),
197 RoundFlowState::InteractivePending => None,
198 RoundFlowState::InteractiveOngoing { .. } => None,
199 RoundFlowState::Failed { .. } => None,
200 RoundFlowState::Canceled => None,
201 RoundFlowState::Finished { unlock_hash, .. } => Some(unlock_hash),
202 }
203 }
204
205 pub fn funding_tx(&self) -> Option<&Transaction> {
206 match self.flow {
207 RoundFlowState::NonInteractivePending { .. } => None,
208 RoundFlowState::InteractivePending => None,
209 RoundFlowState::InteractiveOngoing { .. } => None,
210 RoundFlowState::Failed { .. } => None,
211 RoundFlowState::Canceled => None,
212 RoundFlowState::Finished { ref funding_tx, .. } => Some(funding_tx),
213 }
214 }
215
216 pub fn ongoing_participation(&self) -> bool {
218 match self.flow {
219 RoundFlowState::NonInteractivePending { .. } => false,
220 RoundFlowState::InteractivePending => true,
221 RoundFlowState::InteractiveOngoing { .. } => true,
222 RoundFlowState::Failed { .. } => false,
223 RoundFlowState::Canceled => false,
224 RoundFlowState::Finished { .. } => false,
225 }
226 }
227
228 pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
231 let ret = match self.flow {
232 RoundFlowState::NonInteractivePending { .. } => todo!("we have to cancel with server!"),
233 RoundFlowState::Canceled => true,
234 RoundFlowState::Failed { .. } => true,
235 RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
236 self.flow = RoundFlowState::Canceled;
237 true
238 },
239 RoundFlowState::Finished { .. } => false,
240 };
241 if ret {
242 persist_round_failure(wallet, &self.participation, self.movement_id).await
243 .context("failed to persist round failure for cancelation")?;
244 }
245 Ok(ret)
246 }
247
248 async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
249 match start_attempt(wallet, &self.participation, attempt).await {
250 Ok(state) => {
251 self.flow = RoundFlowState::InteractiveOngoing {
252 round_seq: attempt.round_seq,
253 attempt_seq: attempt.attempt_seq,
254 state: state,
255 };
256 },
257 Err(e) => {
258 self.flow = RoundFlowState::Failed {
259 error: format!("{:#}", e),
260 };
261 },
262 }
263 }
264
	/// Processes a round event for an interactive participation.
	///
	/// Returns `true` when the event changed this state (a new attempt was
	/// joined, the attempt progressed, or the round finished or failed) and
	/// `false` when the event was ignored. Only the `InteractivePending` and
	/// `InteractiveOngoing` flow states react to events.
	pub async fn process_event(
		&mut self,
		wallet: &Wallet,
		event: &RoundEvent,
	) -> bool {
		// Every arm returns, so the match itself never produces a value.
		let _: Infallible = match self.flow {
			RoundFlowState::InteractivePending => {
				// We only join a round at its very first attempt.
				if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
					trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
					self.try_start_attempt(wallet, e).await;
					return true;
				} else {
					trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
						event.kind(), event.round_seq(), event.attempt_seq(),
					);
					return false;
				}
			},
			RoundFlowState::InteractiveOngoing { round_seq, attempt_seq, ref mut state } => {
				// The server explicitly failed the round we are in.
				if let RoundEvent::Failed(e) = event && e.round_seq == round_seq {
					warn!("Round {} failed by server", round_seq);
					self.flow = RoundFlowState::Failed {
						error: format!("round {} failed by server", round_seq),
					};
					return true;
				}

				// An event for a later round means our round is over.
				if event.round_seq() > round_seq {
					self.flow = RoundFlowState::Failed {
						error: format!("round {} started while we were on {}",
							event.round_seq(), round_seq,
						),
					};
					return true;
				}

				if event.attempt_seq() < attempt_seq {
					trace!("ignoring replayed message from old attempt");
					return false;
				}

				// A newer attempt started: restart our participation in it.
				if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
					trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
					self.try_start_attempt(wallet, e).await;
					return true;
				}
				trace!("Processing event {} for round attempt {}:{} in state {}",
					event.kind(), round_seq, attempt_seq, state.kind(),
				);

				return match progress_attempt(state, wallet, &self.participation, event).await {
					AttemptProgressResult::NotUpdated => false,
					AttemptProgressResult::Updated { new_state } => {
						*state = new_state;
						true
					},
					AttemptProgressResult::Failed(e) => {
						warn!("Round failed with error: {:#}", e);
						self.flow = RoundFlowState::Failed {
							error: format!("{:#}", e),
						};
						true
					},
					AttemptProgressResult::Finished { funding_tx, vtxos, unlock_hash } => {
						self.new_vtxos = vtxos;
						let funding_txid = funding_tx.compute_txid();
						self.flow = RoundFlowState::Finished { funding_tx, unlock_hash };
						// Failing to record the funding txid on the movement is
						// only logged; the round itself did finish.
						if let Some(mid) = self.movement_id {
							if let Err(e) = update_funding_txid(wallet, mid, funding_txid).await {
								warn!("Error updating the round funding txid: {:#}", e);
							}
						}
						true
					},
				};
			},
			// These states do not react to round events.
			RoundFlowState::NonInteractivePending { .. }
				| RoundFlowState::Finished { .. }
				| RoundFlowState::Failed { .. }
				| RoundFlowState::Canceled => return false,
		};
	}
352
	/// Syncs this round state and reports its current [`RoundStatus`].
	///
	/// Depending on the flow state this persists a failure or cancelation,
	/// progresses a non-interactive participation with the server, or checks
	/// the funding tx confirmations and runs the forfeit swap for a finished
	/// round.
	///
	/// # Errors
	///
	/// Returns an error when persisting fails, when the confirmation check
	/// fails, or when progressing or forfeiting fails. If the failure happened
	/// after forfeit signatures were sent, `sent_forfeit_sigs` is set first.
	pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
		match self.flow {
			// Already fully handled by an earlier sync: nothing left to do.
			RoundFlowState::Finished { ref funding_tx, .. } if self.done => {
				Ok(RoundStatus::Confirmed {
					funding_txid: funding_tx.compute_txid(),
				})
			},

			// Interactive participations only progress through round events.
			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
				Ok(RoundStatus::Pending)
			},
			RoundFlowState::Failed { ref error } => {
				persist_round_failure(wallet, &self.participation, self.movement_id).await
					.context("failed to persist round failure")?;
				Ok(RoundStatus::Failed { error: error.clone() })
			},
			RoundFlowState::Canceled => {
				persist_round_failure(wallet, &self.participation, self.movement_id).await
					.context("failed to persist round failure")?;
				Ok(RoundStatus::Canceled)
			},

			RoundFlowState::NonInteractivePending { unlock_hash } => {
				match progress_non_interactive(wallet, &self.participation, unlock_hash).await {
					Ok(HarkProgressResult::RoundPending) => Ok(RoundStatus::Pending),
					Ok(HarkProgressResult::Ok { funding_tx, new_vtxos }) => {
						let funding_txid = funding_tx.compute_txid();
						self.new_vtxos = new_vtxos;
						self.flow = RoundFlowState::Finished {
							funding_tx: funding_tx.clone(),
							unlock_hash: unlock_hash,
						};

						persist_round_success(
							wallet,
							&self.participation,
							self.movement_id,
							&self.new_vtxos,
							&funding_tx,
						).await.context("failed to store successful round in DB!")?;

						// Only mark as done once the result is persisted.
						self.done = true;

						Ok(RoundStatus::Confirmed { funding_txid })
					},
					Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }) => {
						if let Some(mid) = self.movement_id {
							update_funding_txid(wallet, mid, funding_txid).await
								.context("failed to update funding txid in DB")?;
						}
						Ok(RoundStatus::Unconfirmed { funding_txid })
					},

					Err(HarkForfeitError::Err(e)) => {
						Err(e.context("error progressing non-interactive round"))
					},
					Err(HarkForfeitError::SentForfeits(e)) => {
						// Remember that the server may hold our forfeit signatures.
						self.sent_forfeit_sigs = true;
						Err(e.context("error progressing non-interactive round \
							after sending forfeit tx signatures"))
					},
				}
			},
			// Finished but not done yet: the forfeit swap still has to happen.
			RoundFlowState::Finished { ref funding_tx, unlock_hash } => {
				let funding_txid = funding_tx.compute_txid();
				let confirmed = check_funding_tx_confirmations(
					wallet, funding_txid, &funding_tx,
				).await.context("error checking funding tx confirmations")?;
				if !confirmed {
					trace!("Funding tx {} not yet deeply enough confirmed", funding_txid);
					return Ok(RoundStatus::Unconfirmed { funding_txid });
				}

				match hark_vtxo_swap(
					wallet, &self.participation, &mut self.new_vtxos, &funding_tx, unlock_hash,
				).await {
					Ok(()) => {
						persist_round_success(
							wallet,
							&self.participation,
							self.movement_id,
							&self.new_vtxos,
							&funding_tx,
						).await.context("failed to store successful round in DB!")?;

						// Only mark as done once the result is persisted.
						self.done = true;

						Ok(RoundStatus::Confirmed { funding_txid })
					},
					Err(HarkForfeitError::Err(e)) => {
						Err(e.context("error forfeiting VTXOs after round"))
					},
					Err(HarkForfeitError::SentForfeits(e)) => {
						// Remember that the server may hold our forfeit signatures.
						self.sent_forfeit_sigs = true;
						Err(e.context("error after having signed and sent \
							forfeit signatures to server"))
					},
				}
			},
		}
	}
464
465 pub fn output_vtxos(&self) -> Option<&[Vtxo<Full>]> {
468 if self.new_vtxos.is_empty() {
469 None
470 } else {
471 Some(&self.new_vtxos)
472 }
473 }
474
475 pub fn locked_pending_inputs(&self) -> &[Vtxo<Full>] {
478 match self.flow {
480 RoundFlowState::NonInteractivePending { .. }
481 | RoundFlowState::InteractivePending
482 | RoundFlowState::InteractiveOngoing { .. }
483 => {
484 &self.participation.inputs
485 },
486 RoundFlowState::Finished { .. } => if self.done {
487 &[]
489 } else {
490 &self.participation.inputs
491 },
492 RoundFlowState::Failed { .. }
493 | RoundFlowState::Canceled
494 => {
495 &[]
497 },
498 }
499 }
500
501 pub fn pending_balance(&self) -> Amount {
505 if self.done {
506 return Amount::ZERO;
507 }
508
509 match self.flow {
510 RoundFlowState::NonInteractivePending { .. }
511 | RoundFlowState::InteractivePending
512 | RoundFlowState::InteractiveOngoing { .. }
513 | RoundFlowState::Finished { .. }
514 => {
515 self.participation.outputs.iter().map(|o| o.amount).sum()
516 },
517 RoundFlowState::Failed { .. } | RoundFlowState::Canceled => {
518 Amount::ZERO
519 },
520 }
521 }
522}
523
/// The step of the round flow a [`RoundState`] is currently in.
pub enum RoundFlowState {
	/// A non-interactive participation identified by its unlock hash,
	/// progressed by polling the server during sync.
	NonInteractivePending {
		unlock_hash: UnlockHash,
	},

	/// An interactive participation waiting for the first attempt of a round.
	InteractivePending,
	/// An interactive participation that joined the given round attempt.
	InteractiveOngoing {
		round_seq: RoundSeq,
		attempt_seq: usize,
		state: AttemptState,
	},

	/// The round produced a funding tx for us.
	Finished {
		funding_tx: Transaction,
		unlock_hash: UnlockHash,
	},

	/// The round failed; `error` holds the formatted failure reason.
	Failed {
		error: String,
	},

	/// The participation was canceled.
	Canceled,
}
557
/// The state of our participation within a single interactive round attempt.
pub enum AttemptState {
	// NOTE(review): this variant is not constructed anywhere in the visible
	// code -- confirm whether it is still used.
	AwaitingAttempt,
	/// Our payment was submitted; we wait for the server's VTXO tree proposal
	/// and keep the cosign keys and secret nonces needed to sign it.
	AwaitingUnsignedVtxoTree {
		cosign_keys: Vec<Keypair>,
		secret_nonces: Vec<Vec<DangerousSecretNonce>>,
		unlock_hash: UnlockHash,
	},
	/// We sent our tree signatures and wait for the round to be finished.
	AwaitingFinishedRound {
		unsigned_round_tx: Transaction,
		vtxos_spec: VtxoTreeSpec,
		unlock_hash: UnlockHash,
	},
}
575
impl AttemptState {
	/// The name of the current variant, used in log and error messages.
	fn kind(&self) -> &'static str {
		match self {
			Self::AwaitingAttempt => "AwaitingAttempt",
			Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
			Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
		}
	}
}
586
/// The outcome of handing a round event to `progress_attempt`.
enum AttemptProgressResult {
	/// The round finished and our new VTXOs were constructed.
	Finished {
		funding_tx: Transaction,
		vtxos: Vec<Vtxo<Full>>,
		unlock_hash: UnlockHash,
	},
	/// The attempt failed with the given error.
	Failed(anyhow::Error),
	/// The attempt moved on to `new_state`.
	Updated {
		new_state: AttemptState,
	},
	/// The event did not change the attempt state.
	NotUpdated,
}
605
606async fn start_attempt(
608 wallet: &Wallet,
609 participation: &RoundParticipation,
610 event: &RoundAttempt,
611) -> anyhow::Result<AttemptState> {
612 let (mut srv, ark_info) = wallet.require_server().await.context("server not available")?;
613
614 let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
616 .take(participation.outputs.len())
617 .collect::<Vec<_>>();
618
619 let cosign_nonces = cosign_keys.iter()
622 .map(|key| {
623 let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
624 let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
625 for _ in 0..ark_info.nb_round_nonces {
626 let (s, p) = musig::nonce_pair(key);
627 secs.push(s);
628 pubs.push(p);
629 }
630 (secs, pubs)
631 })
632 .take(participation.outputs.len())
633 .collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
634
635
636 debug!("Submitting payment request with {} inputs and {} vtxo outputs",
638 participation.inputs.len(), participation.outputs.len(),
639 );
640
641 let signed_reqs = participation.outputs.iter()
642 .zip(cosign_keys.iter())
643 .zip(cosign_nonces.iter())
644 .map(|((req, cosign_key), (_sec, pub_nonces))| {
645 SignedVtxoRequest {
646 vtxo: req.clone(),
647 cosign_pubkey: cosign_key.public_key(),
648 nonces: pub_nonces.clone(),
649 }
650 })
651 .collect::<Vec<_>>();
652
653 let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
654 for vtxo in participation.inputs.iter() {
655 let keypair = wallet.get_vtxo_key(vtxo).await
656 .map_err(HarkForfeitError::Err)?;
657 input_vtxos.push(protos::InputVtxo {
658 vtxo_id: vtxo.id().to_bytes().to_vec(),
659 ownership_proof: {
660 let sig = event.challenge
661 .sign_with(vtxo.id(), &signed_reqs, &keypair);
662 sig.serialize().to_vec()
663 },
664 });
665 }
666
667 wallet.register_vtxos_with_server(&participation.inputs).await
669 .map_err(HarkForfeitError::Err)?;
670
671 let resp = srv.client.submit_payment(protos::SubmitPaymentRequest {
672 input_vtxos: input_vtxos,
673 vtxo_requests: signed_reqs.into_iter().map(Into::into).collect(),
674 #[allow(deprecated)]
675 offboard_requests: vec![],
676 }).await.context("Ark server refused our payment submission")?;
677
678 Ok(AttemptState::AwaitingUnsignedVtxoTree {
679 unlock_hash: UnlockHash::from_bytes(&resp.into_inner().unlock_hash)?,
680 cosign_keys: cosign_keys,
681 secret_nonces: cosign_nonces.into_iter()
682 .map(|(sec, _pub)| sec.into_iter()
683 .map(DangerousSecretNonce::dangerous_from_secret_nonce)
684 .collect())
685 .collect(),
686 })
687}
688
/// An error in the forfeit protocol, tagged by whether our forfeit signatures
/// had already been sent to the server when it happened.
#[derive(Debug, thiserror::Error)]
enum HarkForfeitError {
	/// The error happened after the forfeit signatures were sent.
	#[error("error after forfeits were sent")]
	SentForfeits(#[source] anyhow::Error),
	/// The error happened before any forfeit signatures were sent.
	#[error("error before forfeits were sent")]
	Err(#[source] anyhow::Error),
}
699
700async fn hark_cosign_leaf(
701 wallet: &Wallet,
702 srv: &mut ServerConnection,
703 funding_tx: &Transaction,
704 vtxo: &mut Vtxo<Full>,
705) -> anyhow::Result<()> {
706 let key = wallet.pubkey_keypair(&vtxo.user_pubkey()).await
707 .context("error fetching keypair").map_err(HarkForfeitError::Err)?
708 .with_context(|| format!(
709 "keypair {} not found for VTXO {}", vtxo.user_pubkey(), vtxo.id(),
710 ))?.1;
711 let (ctx, cosign_req) = LeafVtxoCosignContext::new(vtxo, funding_tx, &key);
712 let cosign_resp = srv.client.request_leaf_vtxo_cosign(
713 protos::LeafVtxoCosignRequest::from(cosign_req),
714 ).await
715 .with_context(|| format!("error requesting leaf cosign for vtxo {}", vtxo.id()))?
716 .into_inner().try_into()
717 .context("bad leaf vtxo cosign response")?;
718 ensure!(ctx.finalize(vtxo, cosign_resp),
719 "failed to finalize VTXO leaf signature for VTXO {}", vtxo.id(),
720 );
721
722 Ok(())
723}
724
725async fn hark_vtxo_swap(
735 wallet: &Wallet,
736 participation: &RoundParticipation,
737 output_vtxos: &mut [Vtxo<Full>],
738 funding_tx: &Transaction,
739 unlock_hash: UnlockHash,
740) -> Result<(), HarkForfeitError> {
741 let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
742
743 wallet.register_vtxos_with_server(&participation.inputs).await
745 .context("couldn't send our input vtxos to server")
746 .map_err(HarkForfeitError::Err)?;
747
748 for vtxo in output_vtxos.iter_mut() {
750 hark_cosign_leaf(wallet, &mut srv, funding_tx, vtxo).await
751 .map_err(HarkForfeitError::Err)?;
752 }
753
754 let server_nonces = srv.client.request_forfeit_nonces(protos::ForfeitNoncesRequest {
757 unlock_hash: unlock_hash.to_byte_array().to_vec(),
758 vtxo_ids: participation.inputs.iter().map(|v| v.id().to_bytes().to_vec()).collect(),
759 }).await
760 .context("request forfeits nonces call failed")
761 .map_err(HarkForfeitError::Err)?
762 .into_inner().public_nonces.into_iter()
763 .map(|b| musig::PublicNonce::from_bytes(b))
764 .collect::<Result<Vec<_>, _>>()
765 .context("invalid forfeit nonces")
766 .map_err(HarkForfeitError::Err)?;
767
768 if server_nonces.len() != participation.inputs.len() {
769 return Err(HarkForfeitError::Err(anyhow!(
770 "server sent {} nonce pairs, expected {}",
771 server_nonces.len(), participation.inputs.len(),
772 )));
773 }
774
775 let mut forfeit_bundles = Vec::with_capacity(participation.inputs.len());
776 for (input, nonces) in participation.inputs.iter().zip(server_nonces.into_iter()) {
777 let user_key = wallet.pubkey_keypair(&input.user_pubkey()).await
778 .ok().flatten().with_context(|| format!(
779 "failed to fetch keypair for vtxo user pubkey {}", input.user_pubkey(),
780 )).map_err(HarkForfeitError::Err)?.1;
781 forfeit_bundles.push(HashLockedForfeitBundle::new(
782 input, unlock_hash, &user_key, &nonces,
783 ))
784 }
785
786 let preimage = srv.client.forfeit_vtxos(protos::ForfeitVtxosRequest {
787 forfeit_bundles: forfeit_bundles.iter().map(|b| b.serialize()).collect(),
788 }).await
789 .context("forfeit vtxos call failed")
790 .map_err(HarkForfeitError::SentForfeits)?
791 .into_inner().unlock_preimage.as_slice().try_into()
792 .context("invalid preimage length")
793 .map_err(HarkForfeitError::SentForfeits)?;
794
795 for vtxo in output_vtxos.iter_mut() {
796 if !vtxo.provide_unlock_preimage(preimage) {
797 return Err(HarkForfeitError::SentForfeits(anyhow!(
798 "invalid preimage {} for vtxo {} with supposed unlock hash {}",
799 preimage.as_hex(), vtxo.id(), unlock_hash,
800 )));
801 }
802
803 vtxo.validate(&funding_tx).with_context(|| format!(
805 "new VTXO {} does not pass validation after hArk forfeit protocol", vtxo.id(),
806 )).map_err(HarkForfeitError::SentForfeits)?;
807 }
808
809 Ok(())
810}
811
812fn check_vtxo_fails_hash_lock(funding_tx: &Transaction, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
813 match vtxo.validate(funding_tx) {
814 Err(VtxoValidationError::GenesisTransition {
815 genesis_idx, genesis_len, transition_kind, ..
816 }) if genesis_idx + 1 == genesis_len && transition_kind == HARK_TRANSITION_KIND => Ok(()),
817 Ok(()) => Err(anyhow!("new un-unlocked VTXO should fail validation but doesn't: {}",
818 vtxo.serialize_hex(),
819 )),
820 Err(e) => Err(anyhow!("new VTXO {} failed validation: {:#}", vtxo.id(), e)),
821 }
822}
823
824fn check_round_matches_participation(
825 part: &RoundParticipation,
826 new_vtxos: &[Vtxo<Full>],
827 funding_tx: &Transaction,
828) -> anyhow::Result<()> {
829 ensure!(new_vtxos.len() == part.outputs.len(),
830 "unexpected number of VTXOs: got {}, expected {}", new_vtxos.len(), part.outputs.len(),
831 );
832
833 for (vtxo, req) in new_vtxos.iter().zip(&part.outputs) {
834 ensure!(vtxo.amount() == req.amount,
835 "unexpected VTXO amount: got {}, expected {}", vtxo.amount(), req.amount,
836 );
837 ensure!(*vtxo.policy() == req.policy,
838 "unexpected VTXO policy: got {:?}, expected {:?}", vtxo.policy(), req.policy,
839 );
840
841 check_vtxo_fails_hash_lock(funding_tx, vtxo)?;
843 }
844
845 Ok(())
846}
847
848async fn check_funding_tx_confirmations(
858 wallet: &Wallet,
859 funding_txid: Txid,
860 funding_tx: &Transaction,
861) -> anyhow::Result<bool> {
862 let tip = wallet.chain.tip().await.context("chain source error")?;
863 let conf_height = tip - wallet.config.round_tx_required_confirmations + 1;
864 let tx_status = wallet.chain.tx_status(funding_txid).await.context("chain source error")?;
865 trace!("Round funding tx {} confirmation status: {:?} (tip={})",
866 funding_txid, tx_status, tip,
867 );
868 match tx_status {
869 TxStatus::Confirmed(b) if b.height <= conf_height => Ok(true),
870 TxStatus::Mempool | TxStatus::Confirmed(_) => {
871 if wallet.config.round_tx_required_confirmations == 0 {
872 debug!("Accepting round funding tx without confirmations because of configuration");
873 Ok(true)
874 } else {
875 trace!("Hark round funding tx not confirmed (deep enough) yet: {:?}", tx_status);
876 Ok(false)
877 }
878 },
879 TxStatus::NotFound => {
880 if let Err(e) = wallet.chain.broadcast_tx(&funding_tx).await {
885 Err(anyhow!("hark funding tx {} server sent us is rejected by mempool (hex={}): {:#}",
886 funding_txid, serialize_hex(funding_tx), e,
887 ))
888 } else {
889 trace!("hark funding tx {} was not in mempool but we broadcast it", funding_txid);
890 Ok(false)
891 }
892 },
893 }
894}
895
/// The outcome of `progress_non_interactive`.
enum HarkProgressResult {
	/// The server reports the round participation as still pending.
	RoundPending,
	/// The funding tx is known but not confirmed deeply enough yet.
	FundingTxUnconfirmed {
		funding_txid: Txid,
	},
	/// The round succeeded and the VTXO swap was completed.
	Ok {
		funding_tx: Transaction,
		new_vtxos: Vec<Vtxo<Full>>,
	},
}
906
907async fn progress_non_interactive(
908 wallet: &Wallet,
909 participation: &RoundParticipation,
910 unlock_hash: UnlockHash,
911) -> Result<HarkProgressResult, HarkForfeitError> {
912 let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
913
914 let resp = srv.client.round_participation_status(protos::RoundParticipationStatusRequest {
915 unlock_hash: unlock_hash.to_byte_array().to_vec(),
916 }).await
917 .context("error checking round participation status")
918 .map_err(HarkForfeitError::Err)?.into_inner();
919 let status = protos::RoundParticipationStatus::try_from(resp.status)
920 .context("unknown status from server")
921 .map_err(HarkForfeitError::Err) ?;
922
923 if status == protos::RoundParticipationStatus::RoundPartPending {
924 trace!("Hark round still pending");
925 return Ok(HarkProgressResult::RoundPending);
926 }
927
928 if status == protos::RoundParticipationStatus::RoundPartReleased {
933 let preimage = resp.unlock_preimage.as_ref().map(|p| p.as_hex());
934 warn!("Server says preimage was already released for hArk participation \
935 with unlock hash {}. Supposed preimage: {:?}", unlock_hash, preimage,
936 );
937 }
938
939 let funding_tx_bytes = resp.round_funding_tx
940 .context("funding txid should be provided when status is not pending")
941 .map_err(HarkForfeitError::Err)?;
942 let funding_tx = deserialize::<Transaction>(&funding_tx_bytes)
943 .context("invalid funding txid")
944 .map_err(HarkForfeitError::Err)?;
945 let funding_txid = funding_tx.compute_txid();
946 trace!("Funding tx for round participation with unlock hash {}: {} ({})",
947 unlock_hash, funding_tx.compute_txid(), funding_tx_bytes.as_hex(),
948 );
949
950 match check_funding_tx_confirmations(wallet, funding_txid, &funding_tx).await {
952 Ok(true) => {},
953 Ok(false) => return Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }),
954 Err(e) => return Err(HarkForfeitError::Err(e.context("checking funding tx confirmations"))),
955 }
956
957 let mut new_vtxos = resp.output_vtxos.into_iter()
958 .map(|v| <Vtxo<Full>>::deserialize(&v))
959 .collect::<Result<Vec<_>, _>>()
960 .context("invalid output VTXOs from server")
961 .map_err(HarkForfeitError::Err)?;
962
963 check_round_matches_participation(participation, &new_vtxos, &funding_tx)
965 .context("new VTXOs received from server don't match our participation")
966 .map_err(HarkForfeitError::Err)?;
967
968 hark_vtxo_swap(wallet, participation, &mut new_vtxos, &funding_tx, unlock_hash).await
969 .context("error forfeiting hArk VTXOs")
970 .map_err(HarkForfeitError::SentForfeits)?;
971
972 Ok(HarkProgressResult::Ok { funding_tx, new_vtxos })
973}
974
975async fn progress_attempt(
976 state: &AttemptState,
977 wallet: &Wallet,
978 part: &RoundParticipation,
979 event: &RoundEvent,
980) -> AttemptProgressResult {
981 match (state, event) {
985
986 (
987 AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces, unlock_hash },
988 RoundEvent::VtxoProposal(e),
989 ) => {
990 trace!("Received VtxoProposal: {:#?}", e);
991 match sign_vtxo_tree(
992 wallet,
993 part,
994 &cosign_keys,
995 &secret_nonces,
996 &e.unsigned_round_tx,
997 &e.vtxos_spec,
998 &e.cosign_agg_nonces,
999 ).await {
1000 Ok(()) => {
1001 AttemptProgressResult::Updated {
1002 new_state: AttemptState::AwaitingFinishedRound {
1003 unsigned_round_tx: e.unsigned_round_tx.clone(),
1004 vtxos_spec: e.vtxos_spec.clone(),
1005 unlock_hash: *unlock_hash,
1006 },
1007 }
1008 },
1009 Err(e) => {
1010 trace!("Error signing VTXO tree: {:#}", e);
1011 AttemptProgressResult::Failed(e)
1012 },
1013 }
1014 },
1015
1016 (
1017 AttemptState::AwaitingFinishedRound { unsigned_round_tx, vtxos_spec, unlock_hash },
1018 RoundEvent::Finished(RoundFinished { cosign_sigs, signed_round_tx, .. }),
1019 ) => {
1020 if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
1021 return AttemptProgressResult::Failed(anyhow!(
1022 "signed funding tx ({}) doesn't match tx received before ({})",
1023 signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
1024 ));
1025 }
1026
1027 if let Err(e) = wallet.chain.broadcast_tx(&signed_round_tx).await {
1028 warn!("Failed to broadcast signed round tx: {:#}", e);
1029 }
1030
1031 match construct_new_vtxos(
1032 part, unsigned_round_tx, vtxos_spec, cosign_sigs,
1033 ).await {
1034 Ok(v) => AttemptProgressResult::Finished {
1035 funding_tx: signed_round_tx.clone(),
1036 vtxos: v,
1037 unlock_hash: *unlock_hash,
1038 },
1039 Err(e) => AttemptProgressResult::Failed(anyhow!(
1040 "failed to construct new VTXOs for round: {:#}", e,
1041 )),
1042 }
1043 },
1044
1045 (state, RoundEvent::Finished(RoundFinished { .. })) => {
1046 AttemptProgressResult::Failed(anyhow!(
1047 "unexpectedly received a finished round while we were in state {}",
1048 state.kind(),
1049 ))
1050 },
1051
1052 (state, _) => {
1053 trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
1054 AttemptProgressResult::NotUpdated
1055 },
1056 }
1057}
1058
/// Signs our branches of the proposed VTXO tree and sends the partial
/// signatures to the server.
///
/// `cosign_keys` and `secret_nonces` are zipped with the participation's
/// outputs, so they are expected to be in the same order as those outputs.
///
/// # Errors
///
/// Returns an error when the server is unavailable, the round tx has fewer
/// than `MIN_ROUND_TX_OUTPUTS` outputs, the tree does not contain all of our
/// requested VTXOs, or signing or sending any of the signatures fails.
async fn sign_vtxo_tree(
	wallet: &Wallet,
	participation: &RoundParticipation,
	cosign_keys: &[Keypair],
	secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
	unsigned_round_tx: &Transaction,
	vtxo_tree: &VtxoTreeSpec,
	cosign_agg_nonces: &[musig::AggregatedNonce],
) -> anyhow::Result<()> {
	let (srv, _) = wallet.require_server().await.context("server not available")?;

	if unsigned_round_tx.output.len() < MIN_ROUND_TX_OUTPUTS {
		bail!("server sent round tx with less than 2 outputs: {}",
			serialize_hex(&unsigned_round_tx),
		);
	}

	let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);

	// Check that every one of our requests is present in the tree: each tree
	// VTXO with matching policy and amount cancels out one of our requests.
	let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
	for vtxo_req in vtxo_tree.iter_vtxos() {
		if let Some(i) = my_vtxos.iter().position(|v| {
			v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
		}) {
			my_vtxos.swap_remove(i);
		}
	}
	if !my_vtxos.is_empty() {
		bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
	}

	let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
	let iter = participation.outputs.iter().zip(cosign_keys).zip(secret_nonces);
	trace!("Sending vtxo signatures to server...");
	// Sign and send the signatures for all our outputs concurrently; the first
	// error aborts the whole batch.
	let _ = try_join_all(iter.map(|((req, key), sec)| async {
		let leaf_idx = unsigned_vtxos.spec.leaf_idx_of_req(req).expect("req included");
		let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
		let part_sigs = unsigned_vtxos.cosign_branch(
			&cosign_agg_nonces, leaf_idx, key, secret_nonces,
		).context("failed to cosign branch: our request not part of tree")?;

		info!("Sending {} partial vtxo cosign signatures for pk {}",
			part_sigs.len(), key.public_key(),
		);

		let _ = srv.client.clone().provide_vtxo_signatures(protos::VtxoSignaturesRequest {
			pubkey: key.public_key().serialize().to_vec(),
			signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
		}).await.context("error sending vtxo signatures")?;

		Result::<(), anyhow::Error>::Ok(())
	})).await.context("error sending VTXO signatures")?;
	trace!("Done sending vtxo signatures to server");

	Ok(())
}
1116
1117async fn construct_new_vtxos(
1118 participation: &RoundParticipation,
1119 unsigned_round_tx: &Transaction,
1120 vtxo_tree: &VtxoTreeSpec,
1121 vtxo_cosign_sigs: &[schnorr::Signature],
1122) -> anyhow::Result<Vec<Vtxo<Full>>> {
1123 let round_txid = unsigned_round_tx.compute_txid();
1124 let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
1125 let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1126
1127 if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
1129 bail!("Received incorrect vtxo cosign signatures from server");
1131 }
1132
1133 let signed_vtxos = vtxo_tree
1134 .into_signed_tree(vtxo_cosign_sigs.to_vec())
1135 .into_cached_tree();
1136
1137 let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1138 let total_nb_expected_vtxos = expected_vtxos.len();
1139
1140 let mut new_vtxos = vec![];
1141 for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
1142 if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
1143 let vtxo = signed_vtxos.build_vtxo(idx);
1144
1145 check_vtxo_fails_hash_lock(unsigned_round_tx, &vtxo)
1148 .context("constructed invalid vtxo from tree")?;
1149
1150 info!("New VTXO from round: {} ({}, {})",
1151 vtxo.id(), vtxo.amount(), vtxo.policy_type(),
1152 );
1153
1154 new_vtxos.push(vtxo);
1155 expected_vtxos.swap_remove(expected_idx);
1156 }
1157 }
1158
1159 if !expected_vtxos.is_empty() {
1160 if expected_vtxos.len() == total_nb_expected_vtxos {
1161 bail!("None of our VTXOs were present in round!");
1163 } else {
1164 bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
1165 expected_vtxos.len(), expected_vtxos,
1166 );
1167 }
1168 }
1169 Ok(new_vtxos)
1170}
1171
1172async fn persist_round_success(
1174 wallet: &Wallet,
1175 participation: &RoundParticipation,
1176 movement_id: Option<MovementId>,
1177 new_vtxos: &[Vtxo<Full>],
1178 funding_tx: &Transaction,
1179) -> anyhow::Result<()> {
1180 debug!("Persisting newly finished round. {} new vtxos, movement ID {:?}",
1181 new_vtxos.len(), movement_id,
1182 );
1183
1184 let store_result = wallet.store_spendable_vtxos(new_vtxos).await
1188 .context("failed to store new VTXOs");
1189 let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs).await
1190 .context("failed to mark input VTXOs as spent");
1191 let update_result = if let Some(mid) = movement_id {
1192 wallet.movements.finish_movement_with_update(
1193 mid,
1194 MovementStatus::Successful,
1195 MovementUpdate::new()
1196 .produced_vtxos(new_vtxos)
1197 .metadata([("funding_txid".into(), serde_json::to_value(funding_tx.compute_txid())?)]),
1198 ).await.context("failed to mark movement as finished")
1199 } else {
1200 Ok(())
1201 };
1202
1203 store_result?;
1204 spent_result?;
1205 update_result?;
1206
1207 Ok(())
1208}
1209
1210async fn persist_round_failure(
1211 wallet: &Wallet,
1212 participation: &RoundParticipation,
1213 movement_id: Option<MovementId>,
1214) -> anyhow::Result<()> {
1215 debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
1216 let unlock_result = wallet.unlock_vtxos(&participation.inputs).await;
1217 let finish_result = if let Some(movement_id) = movement_id {
1218 wallet.movements.finish_movement(movement_id, MovementStatus::Failed).await
1219 } else {
1220 Ok(())
1221 };
1222 if let Err(e) = &finish_result {
1223 error!("Failed to mark movement as failed: {:#}", e);
1224 }
1225 match (unlock_result, finish_result) {
1226 (Ok(()), Ok(())) => Ok(()),
1227 (Err(e), _) => Err(e),
1228 (_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
1229 }
1230}
1231
1232async fn update_funding_txid(
1233 wallet: &Wallet,
1234 movement_id: MovementId,
1235 funding_txid: Txid,
1236) -> anyhow::Result<()> {
1237 wallet.movements.update_movement(
1238 movement_id,
1239 MovementUpdate::new()
1240 .metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
1241 ).await.context("Unable to update funding txid of round")
1242}
1243
1244impl Wallet {
1245 pub async fn next_round_start_time(&self) -> anyhow::Result<SystemTime> {
1247 let (mut srv, _) = self.require_server().await?;
1248 let ts = srv.client.next_round_time(protos::Empty {}).await?.into_inner().timestamp;
1249 Ok(UNIX_EPOCH.checked_add(Duration::from_secs(ts)).context("invalid timestamp")?)
1250 }
1251
1252 pub async fn join_next_round(
1261 &self,
1262 participation: RoundParticipation,
1263 movement_kind: Option<RoundMovement>,
1264 ) -> anyhow::Result<StoredRoundState> {
1265 let movement_id = if let Some(kind) = movement_kind {
1266 Some(self.movements.new_movement_with_update(
1267 Subsystem::ROUND,
1268 kind.to_string(),
1269 participation.to_movement_update()?
1270 ).await?)
1271 } else {
1272 None
1273 };
1274 let state = RoundState::new_interactive(participation, movement_id);
1275
1276 let id = self.db.store_round_state_lock_vtxos(&state).await?;
1277 let state = self.lock_wait_round_state(id).await?
1278 .context("failed to lock fresh round state")?;
1279
1280 Ok(state)
1281 }
1282
1283 pub async fn join_next_round_delegated(
1285 &self,
1286 participation: RoundParticipation,
1287 movement_kind: Option<RoundMovement>,
1288 ) -> anyhow::Result<StoredRoundState<Unlocked>> {
1289 let (mut srv, _) = self.require_server().await?;
1290
1291 let movement_id = if let Some(kind) = movement_kind {
1292 Some(self.movements.new_movement_with_update(
1293 Subsystem::ROUND, kind.to_string(), participation.to_movement_update()?,
1294 ).await?)
1295 } else {
1296 None
1297 };
1298
1299 let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
1301 for vtxo in participation.inputs.iter() {
1302 let keypair = self.get_vtxo_key(vtxo).await
1303 .context("failed to get vtxo keypair")?;
1304 input_vtxos.push(protos::InputVtxo {
1305 vtxo_id: vtxo.id().to_bytes().to_vec(),
1306 ownership_proof: {
1307 let sig = ark::challenges::NonInteractiveRoundParticipationChallenge::sign_with(
1308 vtxo.id(), &participation.outputs, &keypair,
1309 );
1310 sig.serialize().to_vec()
1311 },
1312 });
1313 }
1314
1315 let resp = srv.client.submit_round_participation(protos::RoundParticipationRequest {
1317 input_vtxos,
1318 vtxo_requests: participation.outputs.iter().map(|r| protos::VtxoRequest {
1319 policy: r.policy.serialize(),
1320 amount: r.amount.to_sat(),
1321 }).collect(),
1322 }).await.context("error submitting round participation to server")?.into_inner();
1323
1324 let unlock_hash = UnlockHash::from_bytes(resp.unlock_hash)
1325 .context("invalid unlock hash from server")?;
1326
1327 let state = RoundState::new_non_interactive(participation, unlock_hash, movement_id);
1328
1329 info!("Non-interactive round participation submitted, it will automatically execute \
1330 when you next sync your wallet after the round happened \
1331 (and has sufficient confirmations).",
1332 );
1333
1334 let id = self.db.store_round_state_lock_vtxos(&state).await?;
1335 Ok(StoredRoundState::new(id, state))
1336 }
1337
1338 pub async fn pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
1340 self.db.get_pending_round_state_ids().await
1341 }
1342
1343 pub async fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState<Unlocked>>> {
1345 let ids = self.db.get_pending_round_state_ids().await?;
1346 let mut states = Vec::with_capacity(ids.len());
1347 for id in ids {
1348 if let Some(state) = self.db.get_round_state_by_id(id).await? {
1349 states.push(state);
1350 }
1351 }
1352 Ok(states)
1353 }
1354
1355 pub async fn pending_round_balance(&self) -> anyhow::Result<Amount> {
1357 let mut ret = Amount::ZERO;
1358 for round in self.pending_round_states().await? {
1359 ret += round.state().pending_balance();
1360 }
1361 Ok(ret)
1362 }
1363
1364 pub async fn pending_round_input_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1369 let mut ret = Vec::new();
1370 for round in self.pending_round_states().await? {
1371 let inputs = round.state().locked_pending_inputs();
1372 ret.reserve(inputs.len());
1373 for input in inputs {
1374 let v = self.get_vtxo_by_id(input.id()).await
1375 .context("unknown round input VTXO")?;
1376 ret.push(v);
1377 }
1378 }
1379 Ok(ret)
1380 }
1381
1382 pub async fn sync_pending_rounds(&self) -> anyhow::Result<()> {
1384 let states = self.pending_round_states().await?;
1385 if states.is_empty() {
1386 return Ok(());
1387 }
1388
1389 debug!("Syncing {} pending round states...", states.len());
1390
1391 tokio_stream::iter(states).for_each_concurrent(10, |state| async move {
1392 if state.state().ongoing_participation() {
1394 return;
1395 }
1396
1397 let mut state = match self.lock_wait_round_state(state.id()).await {
1398 Ok(Some(state)) => state,
1399 Ok(None) => return,
1400 Err(e) => {
1401 warn!("Error locking round state: {:#}", e);
1402 return;
1403 },
1404 };
1405
1406 let status = state.state_mut().sync(self).await;
1407 trace!("Synced round #{}, status: {:?}", state.id(), status);
1408 match status {
1409 Ok(RoundStatus::Confirmed { funding_txid }) => {
1410 info!("Round confirmed. Funding tx {}", funding_txid);
1411 if let Err(e) = self.db.remove_round_state(&state).await {
1412 warn!("Error removing confirmed round state from db: {:#}", e);
1413 }
1414 },
1415 Ok(RoundStatus::Unconfirmed { funding_txid }) => {
1416 info!("Waiting for confirmations for round funding tx {}", funding_txid);
1417 if let Err(e) = self.db.update_round_state(&state).await {
1418 warn!("Error updating pending round state in db: {:#}", e);
1419 }
1420 },
1421 Ok(RoundStatus::Pending) => {
1422 if let Err(e) = self.db.update_round_state(&state).await {
1423 warn!("Error updating pending round state in db: {:#}", e);
1424 }
1425 },
1426 Ok(RoundStatus::Failed { error }) => {
1427 error!("Round failed: {}", error);
1428 if let Err(e) = self.db.remove_round_state(&state).await {
1429 warn!("Error removing failed round state from db: {:#}", e);
1430 }
1431 },
1432 Ok(RoundStatus::Canceled) => {
1433 error!("Round canceled");
1434 if let Err(e) = self.db.remove_round_state(&state).await {
1435 warn!("Error removing canceled round state from db: {:#}", e);
1436 }
1437 },
1438 Err(e) => warn!("Error syncing round: {:#}", e),
1439 }
1440 }).await;
1441
1442 Ok(())
1443 }
1444
1445 async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1447 let (mut srv, _) = self.require_server().await?;
1448 let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1449 Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1450 }
1451
1452 async fn inner_process_event(
1453 &self,
1454 state: &mut StoredRoundState,
1455 event: Option<&RoundEvent>,
1456 ) {
1457 if let Some(event) = event && state.state().ongoing_participation() {
1458 let updated = state.state_mut().process_event(self, &event).await;
1459 if updated {
1460 if let Err(e) = self.db.update_round_state(&state).await {
1461 error!("Error storing round state #{} after progress: {:#}", state.id(), e);
1462 }
1463 }
1464 }
1465
1466 match state.state_mut().sync(self).await {
1467 Err(e) => warn!("Error syncing round #{}: {:#}", state.id(), e),
1468 Ok(s) if s.is_final() => {
1469 info!("Round #{} finished with result: {:?}", state.id(), s);
1470 if let Err(e) = self.db.remove_round_state(&state).await {
1471 warn!("Failed to remove finished round #{} from db: {:#}", state.id(), e);
1472 }
1473 },
1474 Ok(s) => {
1475 trace!("Round state #{} is now in state {:?}", state.id(), s);
1476 if let Err(e) = self.db.update_round_state(&state).await {
1477 warn!("Error storing round state #{}: {:#}", state.id(), e);
1478 }
1479 },
1480 }
1481 }
1482
1483 pub async fn progress_pending_rounds(
1488 &self,
1489 last_round_event: Option<&RoundEvent>,
1490 ) -> anyhow::Result<()> {
1491 let states = self.pending_round_states().await?;
1492 info!("Processing {} rounds...", states.len());
1493
1494 let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1495
1496 let has_ongoing_participation = states.iter()
1497 .any(|s| s.state().ongoing_participation());
1498 if has_ongoing_participation && last_round_event.is_none() {
1499 match self.get_last_round_event().await {
1500 Ok(e) => last_round_event = Some(Cow::Owned(e)),
1501 Err(e) => {
1502 warn!("Error fetching round event, \
1503 failed to progress ongoing rounds: {:#}", e);
1504 },
1505 }
1506 }
1507
1508 let event = last_round_event.as_ref().map(|c| c.as_ref());
1509
1510 let futs = states.into_iter().map(async |state| {
1511 let locked = self.lock_wait_round_state(state.id()).await?;
1512 if let Some(mut locked) = locked {
1513 self.inner_process_event(&mut locked, event).await;
1514 }
1515 Ok::<_, anyhow::Error>(())
1516 });
1517
1518 futures::future::join_all(futs).await;
1519
1520 Ok(())
1521 }
1522
1523 pub async fn subscribe_round_events(&self)
1524 -> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1525 {
1526 let (mut srv, _) = self.require_server().await?;
1527 let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1528 .into_inner().map(|m| {
1529 let m = m.context("received error on event stream")?;
1530 let e = RoundEvent::try_from(m.clone())
1531 .with_context(|| format!("error converting rpc round event: {:?}", m))?;
1532 trace!("Received round event: {}", e);
1533 Ok::<_, anyhow::Error>(e)
1534 });
1535 Ok(events)
1536 }
1537
1538 pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1543 let mut events = self.subscribe_round_events().await?;
1544
1545 loop {
1546 let state_ids = self.pending_round_states().await?.iter()
1549 .filter(|s| s.state().ongoing_participation())
1550 .map(|s| s.id())
1551 .collect::<Vec<_>>();
1552
1553 if state_ids.is_empty() {
1554 info!("All rounds handled");
1555 return Ok(());
1556 }
1557
1558 let event = events.next().await
1559 .context("events stream broke")?
1560 .context("error on event stream")?;
1561
1562 let futs = state_ids.into_iter().map(async |state| {
1563 let locked = self.lock_wait_round_state(state).await?;
1564 if let Some(mut locked) = locked {
1565 self.inner_process_event(&mut locked, Some(&event)).await;
1566 }
1567 Ok::<_, anyhow::Error>(())
1568 });
1569
1570 futures::future::join_all(futs).await;
1571 }
1572 }
1573
1574 pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1579 let state_ids = self.db.get_pending_round_state_ids().await?;
1581
1582 let futures = state_ids.into_iter().map(|state_id| {
1583 async move {
1584 let mut state = match self.lock_wait_round_state(state_id).await {
1586 Ok(Some(s)) => s,
1587 Ok(None) => return,
1588 Err(e) => return warn!("Error loading round state #{}: {:#}", state_id, e),
1589 };
1590
1591 match state.state_mut().try_cancel(self).await {
1592 Ok(true) => {
1593 if let Err(e) = self.db.remove_round_state(&state).await {
1594 warn!("Error removing canceled round state from db: {:#}", e);
1595 }
1596 },
1597 Ok(false) => {},
1598 Err(e) => warn!("Error trying to cancel round #{}: {:#}", state_id, e),
1599 }
1600 }
1601 });
1602
1603 join_all(futures).await;
1604
1605 Ok(())
1606 }
1607
1608 pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1610 let mut state = self.lock_wait_round_state(id).await?
1611 .context("round state not found")?;
1612
1613 if state.state_mut().try_cancel(self).await.context("failed to cancel round")? {
1614 self.db.remove_round_state(&state).await
1615 .context("error removing canceled round state from db")?;
1616 } else {
1617 bail!("failed to cancel round");
1618 }
1619
1620 Ok(())
1621 }
1622
1623 pub(crate) async fn participate_round(
1630 &self,
1631 participation: RoundParticipation,
1632 movement_kind: Option<RoundMovement>,
1633 ) -> anyhow::Result<RoundStatus> {
1634 let mut state = self.join_next_round(participation, movement_kind).await?;
1635
1636 info!("Waiting for a round start...");
1637 let mut events = self.subscribe_round_events().await?;
1638
1639 loop {
1640 if !state.state().ongoing_participation() {
1641 let status = state.state_mut().sync(self).await?;
1642 match status {
1643 RoundStatus::Failed { error } => bail!("round failed: {}", error),
1644 RoundStatus::Canceled => bail!("round canceled"),
1645 status => return Ok(status),
1646 }
1647 }
1648
1649 let event = events.next().await
1650 .context("events stream broke")?
1651 .context("error on event stream")?;
1652 if state.state_mut().process_event(self, &event).await {
1653 self.db.update_round_state(&state).await?;
1654 }
1655 }
1656 }
1657}