bark/
mailbox.rs

1
2pub extern crate ark;
3
4pub extern crate bip39;
5pub extern crate lightning_invoice;
6pub extern crate lnurl as lnurllib;
7
8use std::collections::HashMap;
9
10use anyhow::Context;
11use bitcoin::Amount;
12use bitcoin::hex::DisplayHex;
13use bitcoin::secp256k1::Keypair;
14use log::{debug, error, info, trace};
15
16use ark::{ProtocolEncoding, Vtxo};
17use ark::mailbox::MailboxAuthorization;
18use ark::vtxo::Full;
19use server_rpc::protos;
20use server_rpc::protos::mailbox_server::{mailbox_message, ArkoorMessage};
21use crate::movement::{MovementDestination, MovementStatus};
22use crate::movement::update::MovementUpdate;
23use crate::Wallet;
24use crate::subsystem::{ArkoorMovement, Subsystem};
25
26
27/// The maximum number of times we will call the fetch mailbox endpoint in one go
28///
29/// We can't trust the server to honestly tell us to keep trying more forever.
30/// A malicious server could send us empty messages or invalid messages and
31/// lock up our resources forever. So we limit the number of times we will fetch.
32/// If a user actually has more messages left, he will have to call sync again.
33///
34/// (Note that currently the server sends 100 messages per fetch, so this would
35/// only happen for users with more than 1000 pending items.)
36const MAX_MAILBOX_REQUEST_BURST: usize = 10;
37
38impl Wallet {
39	/// Fetch the mailbox keypair.
40	pub fn mailbox_keypair(&self) -> anyhow::Result<Keypair> {
41		Ok(self.seed.to_mailbox_keypair())
42	}
43
44	/// Create a mailbox authorization that is valid until the given expiry time
45	///
46	/// This authorization can be used by third parties to lookup your mailbox
47	/// with the Ark server.
48	pub fn mailbox_authorization(
49		&self,
50		authorization_expiry: chrono::DateTime<chrono::Local>,
51	) -> anyhow::Result<MailboxAuthorization> {
52		Ok(MailboxAuthorization::new(&self.mailbox_keypair()?, authorization_expiry))
53	}
54
55	/// Sync with the mailbox on the Ark server and look for out-of-round received VTXOs.
56	pub async fn sync_mailbox(&self) -> anyhow::Result<()> {
57		let (mut srv, _) = self.require_server().await?;
58
59		// we should be able to do all our syncing in 10 minutes
60		let expiry = chrono::Local::now() + std::time::Duration::from_secs(10 * 60);
61		let auth = self.mailbox_authorization(expiry)?;
62		let mailbox_id = auth.mailbox();
63
64		for _ in 0..MAX_MAILBOX_REQUEST_BURST {
65			let checkpoint = self.get_mailbox_checkpoint().await?;
66			let mailbox_req = protos::mailbox_server::MailboxRequest {
67				unblinded_id: mailbox_id.to_vec(),
68				authorization: Some(auth.serialize()),
69				checkpoint,
70			};
71
72			let mailbox_resp = srv.mailbox_client.read_mailbox(mailbox_req).await
73				.context("error fetching mailbox")?.into_inner();
74			debug!("Ark server has {} mailbox messages for us", mailbox_resp.messages.len());
75
76			for mailbox_msg in mailbox_resp.messages {
77				match mailbox_msg.message {
78					Some(mailbox_message::Message::Arkoor(ArkoorMessage { vtxos })) => {
79						let result = self
80							.process_received_arkoor_package(vtxos, Some(mailbox_msg.checkpoint)).await;
81						if let Err(e) = result {
82							error!("Error processing received arkoor package: {:#}", e);
83						}
84					},
85					None => debug!("Unknown mailbox message: {:?}", mailbox_msg),
86				}
87			}
88
89			if !mailbox_resp.have_more {
90				break;
91			}
92		}
93
94		Ok(())
95	}
96
97	/// Turn raw byte arrays into VTXOs, then validate them.
98	///
99	/// This function doesn't return a result on purpose,
100	/// because we want to make sure we don't early return on
101	/// the first error. This ensure we process all VTXOs, even
102	/// if some are invalid, and print everything we received.
103	async fn process_raw_vtxos(
104		&self,
105		raw_vtxos: Vec<Vec<u8>>,
106	) -> Vec<Vtxo<Full>> {
107		let mut invalid_vtxos = Vec::with_capacity(raw_vtxos.len());
108		let mut valid_vtxos = Vec::with_capacity(raw_vtxos.len());
109
110		for bytes in &raw_vtxos {
111			let vtxo = match Vtxo::<Full>::deserialize(&bytes) {
112				Ok(vtxo) => vtxo,
113				Err(e) => {
114					error!("Failed to deserialize arkoor VTXO: {}: {}", bytes.as_hex(), e);
115					invalid_vtxos.push(bytes);
116					continue;
117				}
118			};
119
120			if let Err(e) = self.validate_vtxo(&vtxo).await {
121				error!("Received invalid arkoor VTXO {} from server: {}", vtxo.id(), e);
122				invalid_vtxos.push(bytes);
123				continue;
124			}
125
126			valid_vtxos.push(vtxo);
127		}
128
129		// We log all invalid VTXOs to keep track
130		if !invalid_vtxos.is_empty() {
131			error!("Received {} invalid arkoor VTXOs out of {} from server", invalid_vtxos.len(), raw_vtxos.len());
132		}
133
134		valid_vtxos
135	}
136
137	async fn process_received_arkoor_package(
138		&self,
139		raw_vtxos: Vec<Vec<u8>>,
140		checkpoint: Option<u64>,
141	) -> anyhow::Result<()> {
142		let vtxos = self.process_raw_vtxos(raw_vtxos).await;
143
144		let mut new_vtxos = Vec::with_capacity(vtxos.len());
145		for vtxo in &vtxos {
146			// Skip if already in wallet
147			if self.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
148				debug!("Ignoring duplicate arkoor VTXO {}", vtxo.id());
149				continue;
150			}
151
152			trace!("Received arkoor VTXO {} for {} (checkpoint {:?})", vtxo.id(), vtxo.amount(), checkpoint);
153			new_vtxos.push(vtxo);
154		}
155
156		if new_vtxos.is_empty() {
157			return Ok(());
158		}
159
160		let balance = vtxos
161			.iter()
162			.map(|vtxo| vtxo.amount()).sum::<Amount>()
163			.to_signed()?;
164		self.store_spendable_vtxos(&vtxos).await?;
165
166		// Build received_on destinations from received VTXOs, aggregated by address
167		let mut received_by_address = HashMap::<ark::Address, Amount>::new();
168		for vtxo in &vtxos {
169			if let Ok(Some((index, _))) = self.pubkey_keypair(&vtxo.user_pubkey()).await {
170				if let Ok(address) = self.peak_address(index).await {
171					*received_by_address.entry(address).or_default() += vtxo.amount();
172				}
173			}
174		}
175		let received_on: Vec<_> = received_by_address
176			.into_iter()
177			.map(|(addr, amount)| MovementDestination::ark(addr, amount))
178			.collect();
179
180		let movement_id = self.movements.new_finished_movement(
181			Subsystem::ARKOOR,
182			ArkoorMovement::Receive.to_string(),
183			MovementStatus::Successful,
184			MovementUpdate::new()
185				.produced_vtxos(&vtxos)
186				.intended_and_effective_balance(balance)
187				.received_on(received_on),
188		).await?;
189
190		info!("Received arkoor (movement {}) for {}", movement_id, balance);
191
192		if let Some(checkpoint) = checkpoint {
193			self.store_mailbox_checkpoint(checkpoint).await?;
194		}
195
196		Ok(())
197	}
198
199	async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
200		Ok(self.db.get_mailbox_checkpoint().await?)
201	}
202
203	async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
204		Ok(self.db.store_mailbox_checkpoint(checkpoint).await?)
205	}
206}