bdk_chain/indexed_tx_graph.rs
1//! Contains the [`IndexedTxGraph`] and associated types. Refer to the
2//! [`IndexedTxGraph`] documentation for more.
3use core::{
4 convert::Infallible,
5 fmt::{self, Debug},
6 ops::RangeBounds,
7};
8
9use alloc::{sync::Arc, vec::Vec};
10use bitcoin::{Block, OutPoint, ScriptBuf, Transaction, TxOut, Txid};
11
12use crate::{
13 spk_txout::SpkTxOutIndex,
14 tx_graph::{self, TxGraph},
15 Anchor, BlockId, ChainOracle, Indexer, Merge, TxPosInBlock,
16};
17
/// A [`TxGraph<A>`] paired with an indexer `I`, enforcing that every insertion into the graph is
/// simultaneously fed through the indexer.
///
/// This guarantees that `graph` and `index` remain in sync: any transaction or floating txout
/// added to `graph` through the methods of this type has also been processed by `index`.
///
/// `A` is the anchor type stored by the underlying [`TxGraph`], and `I` is the indexer type.
#[derive(Debug, Clone)]
pub struct IndexedTxGraph<A, I> {
    /// The indexer used for filtering transactions and floating txouts that we are interested in.
    pub index: I,
    /// The underlying transaction graph. The field is private; shared access is available through
    /// [`IndexedTxGraph::graph`] and the `AsRef<TxGraph<A>>` implementation.
    graph: TxGraph<A>,
}
29
30impl<A, I: Default> Default for IndexedTxGraph<A, I> {
31 fn default() -> Self {
32 Self {
33 graph: Default::default(),
34 index: Default::default(),
35 }
36 }
37}
38
39impl<A, I> IndexedTxGraph<A, I> {
40 /// Get a reference of the internal transaction graph.
41 pub fn graph(&self) -> &TxGraph<A> {
42 &self.graph
43 }
44}
45
46impl<A: Anchor, I: Indexer> IndexedTxGraph<A, I> {
47 /// Applies the [`ChangeSet`] to the [`IndexedTxGraph`].
48 pub fn apply_changeset(&mut self, changeset: ChangeSet<A, I::ChangeSet>) {
49 self.index.apply_changeset(changeset.indexer);
50
51 for tx in &changeset.tx_graph.txs {
52 self.index.index_tx(tx);
53 }
54 for (&outpoint, txout) in &changeset.tx_graph.txouts {
55 self.index.index_txout(outpoint, txout);
56 }
57
58 self.graph.apply_changeset(changeset.tx_graph);
59 }
60
61 /// Determines the [`ChangeSet`] between `self` and an empty [`IndexedTxGraph`].
62 pub fn initial_changeset(&self) -> ChangeSet<A, I::ChangeSet> {
63 let graph = self.graph.initial_changeset();
64 let indexer = self.index.initial_changeset();
65 ChangeSet {
66 tx_graph: graph,
67 indexer,
68 }
69 }
70
71 // If `tx` replaces a relevant tx, it should also be considered relevant.
72 fn is_tx_or_conflict_relevant(&self, tx: &Transaction) -> bool {
73 self.index.is_tx_relevant(tx)
74 || self
75 .graph
76 .direct_conflicts(tx)
77 .filter_map(|(_, txid)| self.graph.get_tx(txid))
78 .any(|tx| self.index.is_tx_relevant(&tx))
79 }
80}
81
82impl<A: Anchor, I: Indexer> IndexedTxGraph<A, I>
83where
84 I::ChangeSet: Default + Merge,
85{
86 /// Create a new, empty [`IndexedTxGraph`].
87 ///
88 /// The underlying `TxGraph` is initialized with `TxGraph::default()`, and the provided
89 /// `index`er is used as‐is (since there are no existing transactions to process).
90 pub fn new(index: I) -> Self {
91 Self {
92 index,
93 graph: TxGraph::default(),
94 }
95 }
96
97 /// Reconstruct an [`IndexedTxGraph`] from persisted graph + indexer state.
98 ///
99 /// 1. Rebuilds the `TxGraph` from `changeset.tx_graph`.
100 /// 2. Calls your `indexer_from_changeset` closure on `changeset.indexer` to restore any state
101 /// your indexer needs beyond its raw changeset.
102 /// 3. Runs a full `.reindex()`, returning its `ChangeSet` to describe any additional updates
103 /// applied.
104 ///
105 /// # Errors
106 ///
107 /// Returns `Err(E)` if `indexer_from_changeset` fails.
108 ///
109 /// # Examples
110 ///
111 /// ```rust,no_run
112 /// use bdk_chain::IndexedTxGraph;
113 /// # use bdk_chain::indexed_tx_graph::ChangeSet;
114 /// # use bdk_chain::indexer::keychain_txout::{KeychainTxOutIndex, DEFAULT_LOOKAHEAD};
115 /// # use bdk_core::BlockId;
116 /// # use bdk_testenv::anyhow;
117 /// # use miniscript::{Descriptor, DescriptorPublicKey};
118 /// # use std::str::FromStr;
119 /// # let persisted_changeset = ChangeSet::<BlockId, _>::default();
120 /// # let persisted_desc = Some(Descriptor::<DescriptorPublicKey>::from_str("")?);
121 /// # let persisted_change_desc = Some(Descriptor::<DescriptorPublicKey>::from_str("")?);
122 ///
123 /// let (graph, reindex_cs) =
124 /// IndexedTxGraph::from_changeset(persisted_changeset, move |idx_cs| -> anyhow::Result<_> {
125 /// // e.g. KeychainTxOutIndex needs descriptors that weren’t in its change set.
126 /// let mut idx = KeychainTxOutIndex::from_changeset(DEFAULT_LOOKAHEAD, true, idx_cs);
127 /// if let Some(desc) = persisted_desc {
128 /// idx.insert_descriptor("external", desc)?;
129 /// }
130 /// if let Some(desc) = persisted_change_desc {
131 /// idx.insert_descriptor("internal", desc)?;
132 /// }
133 /// Ok(idx)
134 /// })?;
135 /// # Ok::<(), anyhow::Error>(())
136 /// ```
137 pub fn from_changeset<F, E>(
138 changeset: ChangeSet<A, I::ChangeSet>,
139 indexer_from_changeset: F,
140 ) -> Result<(Self, ChangeSet<A, I::ChangeSet>), E>
141 where
142 F: FnOnce(I::ChangeSet) -> Result<I, E>,
143 {
144 let graph = TxGraph::<A>::from_changeset(changeset.tx_graph);
145 let index = indexer_from_changeset(changeset.indexer)?;
146 let mut out = Self { graph, index };
147 let out_changeset = out.reindex();
148 Ok((out, out_changeset))
149 }
150
151 /// Synchronizes the indexer to reflect every entry in the transaction graph.
152 ///
153 /// Iterates over **all** full transactions and floating outputs in `self.graph`, passing each
154 /// into `self.index`. Any indexer-side changes produced (via `index_tx` or `index_txout`) are
155 /// merged into a fresh `ChangeSet`, which is then returned.
156 pub fn reindex(&mut self) -> ChangeSet<A, I::ChangeSet> {
157 let mut changeset = ChangeSet::<A, I::ChangeSet>::default();
158 for tx in self.graph.full_txs() {
159 changeset.indexer.merge(self.index.index_tx(&tx));
160 }
161 for (op, txout) in self.graph.floating_txouts() {
162 changeset.indexer.merge(self.index.index_txout(op, txout));
163 }
164 changeset
165 }
166
167 fn index_tx_graph_changeset(
168 &mut self,
169 tx_graph_changeset: &tx_graph::ChangeSet<A>,
170 ) -> I::ChangeSet {
171 let mut changeset = I::ChangeSet::default();
172 for added_tx in &tx_graph_changeset.txs {
173 changeset.merge(self.index.index_tx(added_tx));
174 }
175 for (&added_outpoint, added_txout) in &tx_graph_changeset.txouts {
176 changeset.merge(self.index.index_txout(added_outpoint, added_txout));
177 }
178 changeset
179 }
180
181 /// Apply an `update` directly.
182 ///
183 /// `update` is a [`tx_graph::TxUpdate<A>`] and the resultant changes is returned as
184 /// [`ChangeSet`].
185 pub fn apply_update(&mut self, update: tx_graph::TxUpdate<A>) -> ChangeSet<A, I::ChangeSet> {
186 let tx_graph = self.graph.apply_update(update);
187 let indexer = self.index_tx_graph_changeset(&tx_graph);
188 ChangeSet { tx_graph, indexer }
189 }
190
191 /// Insert a floating `txout` of given `outpoint`.
192 pub fn insert_txout(&mut self, outpoint: OutPoint, txout: TxOut) -> ChangeSet<A, I::ChangeSet> {
193 let graph = self.graph.insert_txout(outpoint, txout);
194 let indexer = self.index_tx_graph_changeset(&graph);
195 ChangeSet {
196 tx_graph: graph,
197 indexer,
198 }
199 }
200
201 /// Insert and index a transaction into the graph.
202 pub fn insert_tx<T: Into<Arc<Transaction>>>(&mut self, tx: T) -> ChangeSet<A, I::ChangeSet> {
203 let tx_graph = self.graph.insert_tx(tx);
204 let indexer = self.index_tx_graph_changeset(&tx_graph);
205 ChangeSet { tx_graph, indexer }
206 }
207
208 /// Insert an `anchor` for a given transaction.
209 pub fn insert_anchor(&mut self, txid: Txid, anchor: A) -> ChangeSet<A, I::ChangeSet> {
210 self.graph.insert_anchor(txid, anchor).into()
211 }
212
213 /// Insert a unix timestamp of when a transaction is seen in the mempool.
214 ///
215 /// This is used for transaction conflict resolution in [`TxGraph`] where the transaction with
216 /// the later last-seen is prioritized.
217 pub fn insert_seen_at(&mut self, txid: Txid, seen_at: u64) -> ChangeSet<A, I::ChangeSet> {
218 self.graph.insert_seen_at(txid, seen_at).into()
219 }
220
221 /// Inserts the given `evicted_at` for `txid`.
222 ///
223 /// The `evicted_at` timestamp represents the last known time when the transaction was observed
224 /// to be missing from the mempool. If `txid` was previously recorded with an earlier
225 /// `evicted_at` value, it is updated only if the new value is greater.
226 pub fn insert_evicted_at(&mut self, txid: Txid, evicted_at: u64) -> ChangeSet<A, I::ChangeSet> {
227 let tx_graph = self.graph.insert_evicted_at(txid, evicted_at);
228 ChangeSet {
229 tx_graph,
230 ..Default::default()
231 }
232 }
233
234 /// Batch inserts `(txid, evicted_at)` pairs for `txid`s that the graph is tracking.
235 ///
236 /// The `evicted_at` timestamp represents the last known time when the transaction was observed
237 /// to be missing from the mempool. If `txid` was previously recorded with an earlier
238 /// `evicted_at` value, it is updated only if the new value is greater.
239 pub fn batch_insert_relevant_evicted_at(
240 &mut self,
241 evicted_ats: impl IntoIterator<Item = (Txid, u64)>,
242 ) -> ChangeSet<A, I::ChangeSet> {
243 let tx_graph = self.graph.batch_insert_relevant_evicted_at(evicted_ats);
244 ChangeSet {
245 tx_graph,
246 ..Default::default()
247 }
248 }
249
250 /// Batch insert transactions, filtering out those that are irrelevant.
251 ///
252 /// `txs` do not need to be in topological order.
253 ///
254 /// Relevancy is determined by the internal [`Indexer::is_tx_relevant`] implementation of `I`.
255 /// A transaction that conflicts with a relevant transaction is also considered relevant.
256 /// Irrelevant transactions in `txs` will be ignored.
257 pub fn batch_insert_relevant<T: Into<Arc<Transaction>>>(
258 &mut self,
259 txs: impl IntoIterator<Item = (T, impl IntoIterator<Item = A>)>,
260 ) -> ChangeSet<A, I::ChangeSet> {
261 // The algorithm below allows for non-topologically ordered transactions by using two loops.
262 // This is achieved by:
263 // 1. insert all txs into the index. If they are irrelevant then that's fine it will just
264 // not store anything about them.
265 // 2. decide whether to insert them into the graph depending on whether `is_tx_relevant`
266 // returns true or not. (in a second loop).
267 let txs = txs
268 .into_iter()
269 .map(|(tx, anchors)| (<T as Into<Arc<Transaction>>>::into(tx), anchors))
270 .collect::<Vec<_>>();
271
272 let mut indexer = I::ChangeSet::default();
273 for (tx, _) in &txs {
274 indexer.merge(self.index.index_tx(tx));
275 }
276
277 let mut tx_graph = tx_graph::ChangeSet::default();
278 for (tx, anchors) in txs {
279 if self.is_tx_or_conflict_relevant(&tx) {
280 let txid = tx.compute_txid();
281 tx_graph.merge(self.graph.insert_tx(tx.clone()));
282 for anchor in anchors {
283 tx_graph.merge(self.graph.insert_anchor(txid, anchor));
284 }
285 }
286 }
287
288 ChangeSet { tx_graph, indexer }
289 }
290
291 /// Batch insert unconfirmed transactions, filtering out those that are irrelevant.
292 ///
293 /// Relevancy is determined by the internal [`Indexer::is_tx_relevant`] implementation of `I`.
294 /// A transaction that conflicts with a relevant transaction is also considered relevant.
295 /// Irrelevant transactions in `unconfirmed_txs` will be ignored.
296 ///
297 /// Items of `txs` are tuples containing the transaction and a *last seen* timestamp. The
298 /// *last seen* communicates when the transaction is last seen in the mempool which is used for
299 /// conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details).
300 pub fn batch_insert_relevant_unconfirmed<T: Into<Arc<Transaction>>>(
301 &mut self,
302 unconfirmed_txs: impl IntoIterator<Item = (T, u64)>,
303 ) -> ChangeSet<A, I::ChangeSet> {
304 // The algorithm below allows for non-topologically ordered transactions by using two loops.
305 // This is achieved by:
306 // 1. insert all txs into the index. If they are irrelevant then that's fine it will just
307 // not store anything about them.
308 // 2. decide whether to insert them into the graph depending on whether `is_tx_relevant`
309 // returns true or not. (in a second loop).
310 let txs = unconfirmed_txs
311 .into_iter()
312 .map(|(tx, last_seen)| (<T as Into<Arc<Transaction>>>::into(tx), last_seen))
313 .collect::<Vec<_>>();
314
315 let mut indexer = I::ChangeSet::default();
316 for (tx, _) in &txs {
317 indexer.merge(self.index.index_tx(tx));
318 }
319
320 let graph = self.graph.batch_insert_unconfirmed(
321 txs.into_iter()
322 .filter(|(tx, _)| self.is_tx_or_conflict_relevant(tx))
323 .map(|(tx, seen_at)| (tx.clone(), seen_at))
324 .collect::<Vec<_>>(),
325 );
326
327 ChangeSet {
328 tx_graph: graph,
329 indexer,
330 }
331 }
332
333 /// Batch insert unconfirmed transactions.
334 ///
335 /// Items of `txs` are tuples containing the transaction and a *last seen* timestamp. The
336 /// *last seen* communicates when the transaction is last seen in the mempool which is used for
337 /// conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details).
338 ///
339 /// To filter out irrelevant transactions, use [`batch_insert_relevant_unconfirmed`] instead.
340 ///
341 /// [`batch_insert_relevant_unconfirmed`]: IndexedTxGraph::batch_insert_relevant_unconfirmed
342 pub fn batch_insert_unconfirmed<T: Into<Arc<Transaction>>>(
343 &mut self,
344 txs: impl IntoIterator<Item = (T, u64)>,
345 ) -> ChangeSet<A, I::ChangeSet> {
346 let graph = self.graph.batch_insert_unconfirmed(txs);
347 let indexer = self.index_tx_graph_changeset(&graph);
348 ChangeSet {
349 tx_graph: graph,
350 indexer,
351 }
352 }
353}
354
355/// Methods are available if the anchor (`A`) can be created from [`TxPosInBlock`].
356impl<A, I> IndexedTxGraph<A, I>
357where
358 I::ChangeSet: Default + Merge,
359 for<'b> A: Anchor + From<TxPosInBlock<'b>>,
360 I: Indexer,
361{
362 /// Batch insert all transactions of the given `block` of `height`, filtering out those that are
363 /// irrelevant.
364 ///
365 /// Each inserted transaction's anchor will be constructed using [`TxPosInBlock`].
366 ///
367 /// Relevancy is determined by the internal [`Indexer::is_tx_relevant`] implementation of `I`.
368 /// A transaction that conflicts with a relevant transaction is also considered relevant.
369 /// Irrelevant transactions in `block` will be ignored.
370 pub fn apply_block_relevant(
371 &mut self,
372 block: &Block,
373 height: u32,
374 ) -> ChangeSet<A, I::ChangeSet> {
375 let block_id = BlockId {
376 hash: block.block_hash(),
377 height,
378 };
379 let mut changeset = ChangeSet::<A, I::ChangeSet>::default();
380 for (tx_pos, tx) in block.txdata.iter().enumerate() {
381 changeset.indexer.merge(self.index.index_tx(tx));
382 if self.is_tx_or_conflict_relevant(tx) {
383 let txid = tx.compute_txid();
384 let anchor = TxPosInBlock {
385 block,
386 block_id,
387 tx_pos,
388 }
389 .into();
390 changeset.tx_graph.merge(self.graph.insert_tx(tx.clone()));
391 changeset
392 .tx_graph
393 .merge(self.graph.insert_anchor(txid, anchor));
394 }
395 }
396 changeset
397 }
398
399 /// Batch insert all transactions of the given `block` of `height`.
400 ///
401 /// Each inserted transaction's anchor will be constructed using [`TxPosInBlock`].
402 ///
403 /// To only insert relevant transactions, use [`apply_block_relevant`] instead.
404 ///
405 /// [`apply_block_relevant`]: IndexedTxGraph::apply_block_relevant
406 pub fn apply_block(&mut self, block: Block, height: u32) -> ChangeSet<A, I::ChangeSet> {
407 let block_id = BlockId {
408 hash: block.block_hash(),
409 height,
410 };
411 let mut graph = tx_graph::ChangeSet::default();
412 for (tx_pos, tx) in block.txdata.iter().enumerate() {
413 let anchor = TxPosInBlock {
414 block: &block,
415 block_id,
416 tx_pos,
417 }
418 .into();
419 graph.merge(self.graph.insert_anchor(tx.compute_txid(), anchor));
420 graph.merge(self.graph.insert_tx(tx.clone()));
421 }
422 let indexer = self.index_tx_graph_changeset(&graph);
423 ChangeSet {
424 tx_graph: graph,
425 indexer,
426 }
427 }
428}
429
430impl<A, X> IndexedTxGraph<A, X>
431where
432 A: Anchor,
433{
434 /// List txids that are expected to exist under the given spks.
435 ///
436 /// This is used to fill
437 /// [`SyncRequestBuilder::expected_spk_txids`](bdk_core::spk_client::SyncRequestBuilder::expected_spk_txids).
438 ///
439 ///
440 /// The spk index range can be contrained with `range`.
441 ///
442 /// # Error
443 ///
444 /// If the [`ChainOracle`] implementation (`chain`) fails, an error will be returned with the
445 /// returned item.
446 ///
447 /// If the [`ChainOracle`] is infallible,
448 /// [`list_expected_spk_txids`](Self::list_expected_spk_txids) can be used instead.
449 pub fn try_list_expected_spk_txids<'a, C, I>(
450 &'a self,
451 chain: &'a C,
452 chain_tip: BlockId,
453 spk_index_range: impl RangeBounds<I> + 'a,
454 ) -> impl Iterator<Item = Result<(ScriptBuf, Txid), C::Error>> + 'a
455 where
456 C: ChainOracle,
457 X: AsRef<SpkTxOutIndex<I>> + 'a,
458 I: fmt::Debug + Clone + Ord + 'a,
459 {
460 self.graph
461 .try_list_expected_spk_txids(chain, chain_tip, &self.index, spk_index_range)
462 }
463
464 /// List txids that are expected to exist under the given spks.
465 ///
466 /// This is the infallible version of
467 /// [`try_list_expected_spk_txids`](Self::try_list_expected_spk_txids).
468 pub fn list_expected_spk_txids<'a, C, I>(
469 &'a self,
470 chain: &'a C,
471 chain_tip: BlockId,
472 spk_index_range: impl RangeBounds<I> + 'a,
473 ) -> impl Iterator<Item = (ScriptBuf, Txid)> + 'a
474 where
475 C: ChainOracle<Error = Infallible>,
476 X: AsRef<SpkTxOutIndex<I>> + 'a,
477 I: fmt::Debug + Clone + Ord + 'a,
478 {
479 self.try_list_expected_spk_txids(chain, chain_tip, spk_index_range)
480 .map(|r| r.expect("infallible"))
481 }
482}
483
484impl<A, I> AsRef<TxGraph<A>> for IndexedTxGraph<A, I> {
485 fn as_ref(&self) -> &TxGraph<A> {
486 &self.graph
487 }
488}
489
/// Represents changes to an [`IndexedTxGraph`].
///
/// `A` is the anchor type of the graph changeset and `IA` is the changeset type of the indexer.
/// The type is marked `#[must_use]`, so discarding a returned changeset triggers a compiler
/// warning.
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(
    feature = "serde",
    derive(serde::Deserialize, serde::Serialize),
    serde(bound(
        deserialize = "A: Ord + serde::Deserialize<'de>, IA: serde::Deserialize<'de>",
        serialize = "A: Ord + serde::Serialize, IA: serde::Serialize"
    ))
)]
#[must_use]
pub struct ChangeSet<A, IA> {
    /// [`TxGraph`] changeset.
    pub tx_graph: tx_graph::ChangeSet<A>,
    /// [`Indexer`] changeset.
    pub indexer: IA,
}
507
508impl<A, IA: Default> Default for ChangeSet<A, IA> {
509 fn default() -> Self {
510 Self {
511 tx_graph: Default::default(),
512 indexer: Default::default(),
513 }
514 }
515}
516
517impl<A: Anchor, IA: Merge> Merge for ChangeSet<A, IA> {
518 fn merge(&mut self, other: Self) {
519 self.tx_graph.merge(other.tx_graph);
520 self.indexer.merge(other.indexer);
521 }
522
523 fn is_empty(&self) -> bool {
524 self.tx_graph.is_empty() && self.indexer.is_empty()
525 }
526}
527
528impl<A, IA: Default> From<tx_graph::ChangeSet<A>> for ChangeSet<A, IA> {
529 fn from(graph: tx_graph::ChangeSet<A>) -> Self {
530 Self {
531 tx_graph: graph,
532 ..Default::default()
533 }
534 }
535}
536
537impl<A, IA> From<(tx_graph::ChangeSet<A>, IA)> for ChangeSet<A, IA> {
538 fn from((tx_graph, indexer): (tx_graph::ChangeSet<A>, IA)) -> Self {
539 Self { tx_graph, indexer }
540 }
541}
542
#[cfg(feature = "miniscript")]
impl<A> From<crate::keychain_txout::ChangeSet> for ChangeSet<A, crate::keychain_txout::ChangeSet> {
    /// Wraps an indexer-only changeset, leaving the graph part at its default value.
    fn from(indexer: crate::keychain_txout::ChangeSet) -> Self {
        let tx_graph = tx_graph::ChangeSet::default();
        Self { tx_graph, indexer }
    }
}