1
2pub extern crate ark;
3
4pub extern crate bip39;
5pub extern crate lightning_invoice;
6pub extern crate lnurl as lnurllib;
7
8use std::collections::HashMap;
9
10use anyhow::Context;
11use bitcoin::Amount;
12use bitcoin::hex::DisplayHex;
13use bitcoin::secp256k1::Keypair;
14use log::{debug, error, info, trace};
15
16use ark::{ProtocolEncoding, Vtxo};
17use ark::mailbox::MailboxAuthorization;
18use ark::vtxo::Full;
19use server_rpc::protos;
20use server_rpc::protos::mailbox_server::{mailbox_message, ArkoorMessage};
21use crate::movement::{MovementDestination, MovementStatus};
22use crate::movement::update::MovementUpdate;
23use crate::Wallet;
24use crate::subsystem::{ArkoorMovement, Subsystem};
25
26
/// Upper bound on the number of mailbox read requests a single
/// `Wallet::sync_mailbox` call will issue before returning.
const MAX_MAILBOX_REQUEST_BURST: usize = 10;
37
38impl Wallet {
39 pub fn mailbox_keypair(&self) -> anyhow::Result<Keypair> {
41 Ok(self.seed.to_mailbox_keypair())
42 }
43
44 pub fn mailbox_authorization(
49 &self,
50 authorization_expiry: chrono::DateTime<chrono::Local>,
51 ) -> anyhow::Result<MailboxAuthorization> {
52 Ok(MailboxAuthorization::new(&self.mailbox_keypair()?, authorization_expiry))
53 }
54
55 pub async fn sync_mailbox(&self) -> anyhow::Result<()> {
57 let (mut srv, _) = self.require_server().await?;
58
59 let expiry = chrono::Local::now() + std::time::Duration::from_secs(10 * 60);
61 let auth = self.mailbox_authorization(expiry)?;
62 let mailbox_id = auth.mailbox();
63
64 for _ in 0..MAX_MAILBOX_REQUEST_BURST {
65 let checkpoint = self.get_mailbox_checkpoint().await?;
66 let mailbox_req = protos::mailbox_server::MailboxRequest {
67 unblinded_id: mailbox_id.to_vec(),
68 authorization: Some(auth.serialize()),
69 checkpoint,
70 };
71
72 let mailbox_resp = srv.mailbox_client.read_mailbox(mailbox_req).await
73 .context("error fetching mailbox")?.into_inner();
74 debug!("Ark server has {} mailbox messages for us", mailbox_resp.messages.len());
75
76 for mailbox_msg in mailbox_resp.messages {
77 match mailbox_msg.message {
78 Some(mailbox_message::Message::Arkoor(ArkoorMessage { vtxos })) => {
79 let result = self
80 .process_received_arkoor_package(vtxos, Some(mailbox_msg.checkpoint)).await;
81 if let Err(e) = result {
82 error!("Error processing received arkoor package: {:#}", e);
83 }
84 },
85 None => debug!("Unknown mailbox message: {:?}", mailbox_msg),
86 }
87 }
88
89 if !mailbox_resp.have_more {
90 break;
91 }
92 }
93
94 Ok(())
95 }
96
97 async fn process_raw_vtxos(
104 &self,
105 raw_vtxos: Vec<Vec<u8>>,
106 ) -> Vec<Vtxo<Full>> {
107 let mut invalid_vtxos = Vec::with_capacity(raw_vtxos.len());
108 let mut valid_vtxos = Vec::with_capacity(raw_vtxos.len());
109
110 for bytes in &raw_vtxos {
111 let vtxo = match Vtxo::<Full>::deserialize(&bytes) {
112 Ok(vtxo) => vtxo,
113 Err(e) => {
114 error!("Failed to deserialize arkoor VTXO: {}: {}", bytes.as_hex(), e);
115 invalid_vtxos.push(bytes);
116 continue;
117 }
118 };
119
120 if let Err(e) = self.validate_vtxo(&vtxo).await {
121 error!("Received invalid arkoor VTXO {} from server: {}", vtxo.id(), e);
122 invalid_vtxos.push(bytes);
123 continue;
124 }
125
126 valid_vtxos.push(vtxo);
127 }
128
129 if !invalid_vtxos.is_empty() {
131 error!("Received {} invalid arkoor VTXOs out of {} from server", invalid_vtxos.len(), raw_vtxos.len());
132 }
133
134 valid_vtxos
135 }
136
137 async fn process_received_arkoor_package(
138 &self,
139 raw_vtxos: Vec<Vec<u8>>,
140 checkpoint: Option<u64>,
141 ) -> anyhow::Result<()> {
142 let vtxos = self.process_raw_vtxos(raw_vtxos).await;
143
144 let mut new_vtxos = Vec::with_capacity(vtxos.len());
145 for vtxo in &vtxos {
146 if self.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
148 debug!("Ignoring duplicate arkoor VTXO {}", vtxo.id());
149 continue;
150 }
151
152 trace!("Received arkoor VTXO {} for {} (checkpoint {:?})", vtxo.id(), vtxo.amount(), checkpoint);
153 new_vtxos.push(vtxo);
154 }
155
156 if new_vtxos.is_empty() {
157 return Ok(());
158 }
159
160 let balance = vtxos
161 .iter()
162 .map(|vtxo| vtxo.amount()).sum::<Amount>()
163 .to_signed()?;
164 self.store_spendable_vtxos(&vtxos).await?;
165
166 let mut received_by_address = HashMap::<ark::Address, Amount>::new();
168 for vtxo in &vtxos {
169 if let Ok(Some((index, _))) = self.pubkey_keypair(&vtxo.user_pubkey()).await {
170 if let Ok(address) = self.peak_address(index).await {
171 *received_by_address.entry(address).or_default() += vtxo.amount();
172 }
173 }
174 }
175 let received_on: Vec<_> = received_by_address
176 .into_iter()
177 .map(|(addr, amount)| MovementDestination::ark(addr, amount))
178 .collect();
179
180 let movement_id = self.movements.new_finished_movement(
181 Subsystem::ARKOOR,
182 ArkoorMovement::Receive.to_string(),
183 MovementStatus::Successful,
184 MovementUpdate::new()
185 .produced_vtxos(&vtxos)
186 .intended_and_effective_balance(balance)
187 .received_on(received_on),
188 ).await?;
189
190 info!("Received arkoor (movement {}) for {}", movement_id, balance);
191
192 if let Some(checkpoint) = checkpoint {
193 self.store_mailbox_checkpoint(checkpoint).await?;
194 }
195
196 Ok(())
197 }
198
199 async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
200 Ok(self.db.get_mailbox_checkpoint().await?)
201 }
202
203 async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
204 Ok(self.db.store_mailbox_checkpoint(checkpoint).await?)
205 }
206}