diff --git a/crates/blockchain/fork_choice/src/lib.rs b/crates/blockchain/fork_choice/src/lib.rs index b35a6d38..27669694 100644 --- a/crates/blockchain/fork_choice/src/lib.rs +++ b/crates/blockchain/fork_choice/src/lib.rs @@ -2,6 +2,9 @@ use std::collections::HashMap; use ethlambda_types::{attestation::AttestationData, primitives::H256}; +mod proto_array; +pub use proto_array::{ProtoArray, VoteTracker}; + /// Compute per-block attestation weights for the fork choice tree. /// /// For each validator attestation, walks backward from the attestation's head diff --git a/crates/blockchain/fork_choice/src/proto_array.rs b/crates/blockchain/fork_choice/src/proto_array.rs new file mode 100644 index 00000000..51694850 --- /dev/null +++ b/crates/blockchain/fork_choice/src/proto_array.rs @@ -0,0 +1,612 @@ +use std::collections::HashMap; + +use ethlambda_types::{attestation::AttestationData, primitives::H256}; + +/// A node in the proto-array fork choice tree. +#[derive(Debug, Clone)] +#[allow(dead_code)] +struct ProtoNode { + root: H256, + parent: Option, + slot: u64, + /// Subtree weight: direct votes on this node + all descendant votes. + weight: i64, + /// Index of the heaviest direct child (used for O(depth) head lookup). + best_child: Option, +} + +/// Incremental fork choice structure that maintains the block tree and +/// propagates vote weight changes via a single backward pass. +/// +/// Nodes are append-only (sorted by insertion order, which respects slot ordering). +/// Iterating backward guarantees children are always processed before parents. +#[derive(Debug, Clone, Default)] +pub struct ProtoArray { + nodes: Vec, + indices: HashMap, +} + +impl ProtoArray { + pub fn new() -> Self { + Self::default() + } + + /// Register a new block in the tree. O(1) append. + /// + /// The parent must already exist in the array (or be absent for the anchor block). + pub fn on_block(&mut self, root: H256, parent_root: H256, slot: u64) { + if self.indices.contains_key(&root) { + return; + } + + let parent_index = self.indices.get(&parent_root).copied(); + let index = self.nodes.len(); + + self.nodes.push(ProtoNode { + root, + parent: parent_index, + slot, + weight: 0, + best_child: None, + }); + self.indices.insert(root, index); + } + + /// Apply score deltas and propagate weights upward in a single backward pass. + /// + /// After this call, each node's `weight` reflects the total votes for its subtree, + /// and `best_child` pointers are updated. + pub fn apply_score_changes(&mut self, deltas: &mut [i64]) { + for i in (0..self.nodes.len()).rev() { + if i < deltas.len() { + self.nodes[i].weight += deltas[i]; + } + + let Some(parent_idx) = self.nodes[i].parent else { + continue; + }; + + // Propagate this node's delta to parent + if i < deltas.len() && parent_idx < deltas.len() { + deltas[parent_idx] += deltas[i]; + } + + // Update best_child: pick the child with highest weight, tiebreak by root hash + self.maybe_update_best_child(parent_idx, i); + } + } + + /// Find the head of the chain starting from the justified root. + /// + /// Follows `best_child` pointers from the justified root down to a leaf. + /// Returns the justified root itself if it has no children. + pub fn find_head(&self, justified_root: H256) -> H256 { + self.find_head_with_threshold(justified_root, 0) + } + + /// Find the head with a minimum weight threshold. + /// + /// Like `find_head`, but stops descending when the best child's subtree + /// weight is below `min_score`. Since `best_child` always points to the + /// heaviest child, if it doesn't meet the threshold, no child can. + pub fn find_head_with_threshold(&self, justified_root: H256, min_score: i64) -> H256 { + let Some(&start_idx) = self.indices.get(&justified_root) else { + return justified_root; + }; + + let mut current_idx = start_idx; + while let Some(best_child_idx) = self.nodes[current_idx].best_child { + if self.nodes[best_child_idx].weight < min_score { + break; + } + current_idx = best_child_idx; + } + + self.nodes[current_idx].root + } + + /// Rebuild the array keeping only descendants of the finalized root. + /// + /// All indices are recomputed. O(nodes). + pub fn prune(&mut self, finalized_root: H256) { + let Some(&finalized_idx) = self.indices.get(&finalized_root) else { + return; + }; + + // Collect indices of nodes to keep: finalized root + all descendants + let mut keep = vec![false; self.nodes.len()]; + keep[finalized_idx] = true; + for i in (finalized_idx + 1)..self.nodes.len() { + if let Some(parent) = self.nodes[i].parent + && keep[parent] + { + keep[i] = true; + } + } + + // Build new array with only kept nodes, mapping old indices to new + let mut old_to_new: HashMap = HashMap::new(); + let mut new_nodes = Vec::new(); + let mut new_indices = HashMap::new(); + + for (old_idx, node) in self.nodes.iter().enumerate() { + if !keep[old_idx] { + continue; + } + let new_idx = new_nodes.len(); + old_to_new.insert(old_idx, new_idx); + new_indices.insert(node.root, new_idx); + new_nodes.push(node.clone()); + } + + // Remap parent and best_child indices + for node in &mut new_nodes { + node.parent = node.parent.and_then(|p| old_to_new.get(&p).copied()); + node.best_child = node.best_child.and_then(|c| old_to_new.get(&c).copied()); + } + + self.nodes = new_nodes; + self.indices = new_indices; + } + + /// Number of nodes currently in the array. + pub fn len(&self) -> usize { + self.nodes.len() + } + + /// Whether the array is empty. + pub fn is_empty(&self) -> bool { + self.nodes.is_empty() + } + + /// Get the index for a block root, if it exists. + pub fn get_index(&self, root: &H256) -> Option { + self.indices.get(root).copied() + } + + fn maybe_update_best_child(&mut self, parent_idx: usize, child_idx: usize) { + let child_weight = self.nodes[child_idx].weight; + let child_root = self.nodes[child_idx].root; + + let dominated = match self.nodes[parent_idx].best_child { + None => true, + Some(current_best) => { + let best_weight = self.nodes[current_best].weight; + let best_root = self.nodes[current_best].root; + (child_weight, child_root) > (best_weight, best_root) + } + }; + + if dominated { + self.nodes[parent_idx].best_child = Some(child_idx); + } + } +} + +/// Tracks each validator's latest head vote and computes deltas between updates. +#[derive(Debug, Clone, Default)] +pub struct VoteTracker { + /// Current head vote per validator. Indexed by validator_id. + votes: Vec>, +} + +impl VoteTracker { + pub fn new() -> Self { + Self::default() + } + + /// Compare current votes against new attestations and produce a delta array. + /// + /// For each validator whose vote changed: + /// - old vote's node gets -1 + /// - new vote's node gets +1 + /// + /// After computing deltas, internal state is updated to reflect new votes. + pub fn compute_deltas( + &mut self, + new_attestations: &HashMap, + proto_array: &ProtoArray, + ) -> Vec { + let mut deltas = vec![0i64; proto_array.len()]; + + for (&validator_id, attestation) in new_attestations { + let new_root = attestation.head.root; + let id = validator_id as usize; + + // Grow votes vec if needed + if id >= self.votes.len() { + self.votes.resize(id + 1, None); + } + + let old_root = self.votes[id]; + + // Skip if vote hasn't changed + if old_root == Some(new_root) { + continue; + } + + // Remove weight from old vote + if let Some(old) = old_root + && let Some(idx) = proto_array.get_index(&old) + { + deltas[idx] -= 1; + } + + // Add weight to new vote + if let Some(idx) = proto_array.get_index(&new_root) { + deltas[idx] += 1; + } + + self.votes[id] = Some(new_root); + } + + deltas + } + + /// Reset vote tracker state. Used after pruning when votes may reference + /// nodes that no longer exist. + pub fn reset(&mut self) { + self.votes.clear(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ethlambda_types::checkpoint::Checkpoint; + + fn h(byte: u8) -> H256 { + H256::from([byte; 32]) + } + + fn make_attestation(head_root: H256, slot: u64) -> AttestationData { + AttestationData { + slot, + head: Checkpoint { + root: head_root, + slot, + }, + target: Checkpoint::default(), + source: Checkpoint::default(), + } + } + + // ==================== ProtoArray tests ==================== + + #[test] + fn linear_chain_head() { + // anchor(0) -> a(1) -> b(2) -> c(3) + let mut pa = ProtoArray::new(); + pa.on_block(h(0), H256::ZERO, 0); + pa.on_block(h(1), h(0), 1); + pa.on_block(h(2), h(1), 2); + pa.on_block(h(3), h(2), 3); + + // One validator votes for c + let mut attestations = HashMap::new(); + attestations.insert(0, make_attestation(h(3), 3)); + + let mut vt = VoteTracker::new(); + let mut deltas = vt.compute_deltas(&attestations, &pa); + pa.apply_score_changes(&mut deltas); + + assert_eq!(pa.find_head(h(0)), h(3)); + } + + #[test] + fn fork_heavier_branch_wins() { + // anchor(0) + // / \ + // a(1) b(1) + // 2 votes for a, 1 vote for b → head = a + let mut pa = ProtoArray::new(); + pa.on_block(h(0), H256::ZERO, 0); + pa.on_block(h(1), h(0), 1); // a + pa.on_block(h(2), h(0), 1); // b + + let mut attestations = HashMap::new(); + attestations.insert(0, make_attestation(h(1), 1)); + attestations.insert(1, make_attestation(h(1), 1)); + attestations.insert(2, make_attestation(h(2), 1)); + + let mut vt = VoteTracker::new(); + let mut deltas = vt.compute_deltas(&attestations, &pa); + pa.apply_score_changes(&mut deltas); + + assert_eq!(pa.find_head(h(0)), h(1)); + } + + #[test] + fn fork_tiebreak_by_root_hash() { + // Equal weight → highest root hash wins + let mut pa = ProtoArray::new(); + pa.on_block(h(0), H256::ZERO, 0); + pa.on_block(h(1), h(0), 1); + pa.on_block(h(2), h(0), 1); + + let mut attestations = HashMap::new(); + attestations.insert(0, make_attestation(h(1), 1)); + attestations.insert(1, make_attestation(h(2), 1)); + + let mut vt = VoteTracker::new(); + let mut deltas = vt.compute_deltas(&attestations, &pa); + pa.apply_score_changes(&mut deltas); + + // h(2) > h(1) lexicographically + assert_eq!(pa.find_head(h(0)), h(2)); + } + + #[test] + fn vote_change_shifts_head() { + // Fork: anchor(0) -> a(1), anchor(0) -> b(1) + // Initially: 2 votes for a, 1 for b → head = a + // Then: move 2 votes to b → head = b + let mut pa = ProtoArray::new(); + pa.on_block(h(0), H256::ZERO, 0); + pa.on_block(h(1), h(0), 1); // a + pa.on_block(h(2), h(0), 1); // b + + let mut vt = VoteTracker::new(); + + // Round 1: 2 votes a, 1 vote b + let mut att1 = HashMap::new(); + att1.insert(0, make_attestation(h(1), 1)); + att1.insert(1, make_attestation(h(1), 1)); + att1.insert(2, make_attestation(h(2), 1)); + let mut deltas = vt.compute_deltas(&att1, &pa); + pa.apply_score_changes(&mut deltas); + assert_eq!(pa.find_head(h(0)), h(1)); + + // Round 2: move validators 0,1 to b + let mut att2 = HashMap::new(); + att2.insert(0, make_attestation(h(2), 1)); + att2.insert(1, make_attestation(h(2), 1)); + att2.insert(2, make_attestation(h(2), 1)); + let mut deltas = vt.compute_deltas(&att2, &pa); + pa.apply_score_changes(&mut deltas); + assert_eq!(pa.find_head(h(0)), h(2)); + } + + #[test] + fn unchanged_votes_produce_zero_deltas() { + let mut pa = ProtoArray::new(); + pa.on_block(h(0), H256::ZERO, 0); + pa.on_block(h(1), h(0), 1); + + let mut vt = VoteTracker::new(); + let mut attestations = HashMap::new(); + attestations.insert(0, make_attestation(h(1), 1)); + + // First call establishes the vote + let mut deltas = vt.compute_deltas(&attestations, &pa); + pa.apply_score_changes(&mut deltas); + + // Second call with same votes → all deltas should be zero + let deltas = vt.compute_deltas(&attestations, &pa); + assert!(deltas.iter().all(|&d| d == 0)); + } + + #[test] + fn prune_removes_pre_finalized_nodes() { + // anchor(0) -> a(1) -> b(2) -> c(3) + // Finalize at b(2) → anchor and a should be pruned + let mut pa = ProtoArray::new(); + pa.on_block(h(0), H256::ZERO, 0); + pa.on_block(h(1), h(0), 1); + pa.on_block(h(2), h(1), 2); + pa.on_block(h(3), h(2), 3); + + pa.prune(h(2)); + + assert_eq!(pa.len(), 2); // b and c remain + assert!(pa.get_index(&h(0)).is_none()); + assert!(pa.get_index(&h(1)).is_none()); + assert!(pa.get_index(&h(2)).is_some()); + assert!(pa.get_index(&h(3)).is_some()); + } + + #[test] + fn prune_preserves_fork_descendants() { + // anchor(0) + // | + // a(1) ← finalize here + // / \ + // b(2) c(2) + let mut pa = ProtoArray::new(); + pa.on_block(h(0), H256::ZERO, 0); + pa.on_block(h(1), h(0), 1); + pa.on_block(h(2), h(1), 2); + pa.on_block(h(3), h(1), 2); + + pa.prune(h(1)); + + assert_eq!(pa.len(), 3); // a, b, c + assert!(pa.get_index(&h(0)).is_none()); + assert!(pa.get_index(&h(1)).is_some()); + assert!(pa.get_index(&h(2)).is_some()); + assert!(pa.get_index(&h(3)).is_some()); + } + + #[test] + fn find_head_with_deep_chain() { + // Build a chain of 50 blocks, all votes on the tip + let mut pa = ProtoArray::new(); + pa.on_block(h(0), H256::ZERO, 0); + for i in 1..50u8 { + pa.on_block(h(i), h(i - 1), i as u64); + } + + let mut attestations = HashMap::new(); + attestations.insert(0, make_attestation(h(49), 49)); + attestations.insert(1, make_attestation(h(49), 49)); + + let mut vt = VoteTracker::new(); + let mut deltas = vt.compute_deltas(&attestations, &pa); + pa.apply_score_changes(&mut deltas); + + assert_eq!(pa.find_head(h(0)), h(49)); + } + + #[test] + fn duplicate_on_block_is_idempotent() { + let mut pa = ProtoArray::new(); + pa.on_block(h(0), H256::ZERO, 0); + pa.on_block(h(1), h(0), 1); + pa.on_block(h(1), h(0), 1); // duplicate + + assert_eq!(pa.len(), 2); + } + + #[test] + fn find_head_no_votes_returns_justified() { + let mut pa = ProtoArray::new(); + pa.on_block(h(0), H256::ZERO, 0); + pa.on_block(h(1), h(0), 1); + pa.on_block(h(2), h(0), 1); + + // No votes → no best_child set → returns justified root + assert_eq!(pa.find_head(h(0)), h(0)); + } + + #[test] + fn find_head_unknown_justified_returns_it() { + let pa = ProtoArray::new(); + assert_eq!(pa.find_head(h(99)), h(99)); + } + + #[test] + fn weight_propagation_through_chain() { + // anchor(0) -> a(1) -> b(2) + // Vote for b should propagate weight to a and anchor + let mut pa = ProtoArray::new(); + pa.on_block(h(0), H256::ZERO, 0); + pa.on_block(h(1), h(0), 1); + pa.on_block(h(2), h(1), 2); + + let mut attestations = HashMap::new(); + attestations.insert(0, make_attestation(h(2), 2)); + + let mut vt = VoteTracker::new(); + let mut deltas = vt.compute_deltas(&attestations, &pa); + pa.apply_score_changes(&mut deltas); + + // best_child chain should lead from anchor through a to b + assert_eq!(pa.find_head(h(0)), h(2)); + assert_eq!(pa.find_head(h(1)), h(2)); + } + + #[test] + fn prune_then_new_blocks_and_votes() { + // anchor(0) -> a(1) -> b(2) -> c(3) + // Finalize b, then add d(4) as child of c, vote for d + let mut pa = ProtoArray::new(); + pa.on_block(h(0), H256::ZERO, 0); + pa.on_block(h(1), h(0), 1); + pa.on_block(h(2), h(1), 2); + pa.on_block(h(3), h(2), 3); + + let mut vt = VoteTracker::new(); + let mut att = HashMap::new(); + att.insert(0, make_attestation(h(3), 3)); + let mut deltas = vt.compute_deltas(&att, &pa); + pa.apply_score_changes(&mut deltas); + + // Prune to b(2), reset votes since indices changed + pa.prune(h(2)); + vt.reset(); + + // Add new block d(4) + pa.on_block(h(4), h(3), 4); + + // Vote for d + let mut att2 = HashMap::new(); + att2.insert(0, make_attestation(h(4), 4)); + let mut deltas = vt.compute_deltas(&att2, &pa); + pa.apply_score_changes(&mut deltas); + + assert_eq!(pa.find_head(h(2)), h(4)); + } + + // ==================== Threshold tests ==================== + + #[test] + fn threshold_stops_at_branch_below_min_score() { + // anchor(0) + // | + // a(1) + // / \ + // b(2) c(2) + // 2 votes for b, 1 vote for c → threshold=2 stops at b, threshold=3 stops at a + let mut pa = ProtoArray::new(); + pa.on_block(h(0), H256::ZERO, 0); + pa.on_block(h(1), h(0), 1); + pa.on_block(h(2), h(1), 2); // b + pa.on_block(h(3), h(1), 2); // c + + let mut vt = VoteTracker::new(); + let mut att = HashMap::new(); + att.insert(0, make_attestation(h(2), 2)); + att.insert(1, make_attestation(h(2), 2)); + att.insert(2, make_attestation(h(3), 2)); + let mut deltas = vt.compute_deltas(&att, &pa); + pa.apply_score_changes(&mut deltas); + + // No threshold → follows best_child to b (weight 2 > c weight 1) + assert_eq!(pa.find_head_with_threshold(h(0), 0), h(2)); + // Threshold=2 → b meets it (weight=2), so head=b + assert_eq!(pa.find_head_with_threshold(h(0), 2), h(2)); + // Threshold=3 → b doesn't meet it (weight=2 < 3), stop at a + assert_eq!(pa.find_head_with_threshold(h(0), 3), h(1)); + } + + #[test] + fn threshold_returns_justified_when_no_child_qualifies() { + // anchor(0) -> a(1) + // 1 vote for a, threshold=2 → stop at anchor + let mut pa = ProtoArray::new(); + pa.on_block(h(0), H256::ZERO, 0); + pa.on_block(h(1), h(0), 1); + + let mut vt = VoteTracker::new(); + let mut att = HashMap::new(); + att.insert(0, make_attestation(h(1), 1)); + let mut deltas = vt.compute_deltas(&att, &pa); + pa.apply_score_changes(&mut deltas); + + assert_eq!(pa.find_head_with_threshold(h(0), 2), h(0)); + } + + #[test] + fn threshold_walks_deep_chain_until_weight_drops() { + // anchor(0) -> a(1) -> b(2) -> c(3) + // 3 votes for c, threshold=3 → walks all the way to c + // Then move 1 vote to b: c has weight=2, b has weight=3 + // threshold=3 → walks to b but stops before c + let mut pa = ProtoArray::new(); + pa.on_block(h(0), H256::ZERO, 0); + pa.on_block(h(1), h(0), 1); + pa.on_block(h(2), h(1), 2); + pa.on_block(h(3), h(2), 3); + + let mut vt = VoteTracker::new(); + let mut att = HashMap::new(); + att.insert(0, make_attestation(h(3), 3)); + att.insert(1, make_attestation(h(3), 3)); + att.insert(2, make_attestation(h(3), 3)); + let mut deltas = vt.compute_deltas(&att, &pa); + pa.apply_score_changes(&mut deltas); + + assert_eq!(pa.find_head_with_threshold(h(0), 3), h(3)); + + // Move validator 2 to vote for b instead of c + let mut att2 = HashMap::new(); + att2.insert(0, make_attestation(h(3), 3)); + att2.insert(1, make_attestation(h(3), 3)); + att2.insert(2, make_attestation(h(2), 2)); + let mut deltas = vt.compute_deltas(&att2, &pa); + pa.apply_score_changes(&mut deltas); + + // c now has weight=2 (below threshold=3), b has weight=3 (meets threshold) + assert_eq!(pa.find_head_with_threshold(h(0), 3), h(2)); + } +} diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 56a10d49..4b6fe231 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -18,7 +18,7 @@ use spawned_concurrency::protocol; use spawned_concurrency::tasks::{Actor, ActorRef, ActorStart, Context, Handler, send_after}; use tracing::{error, info, trace, warn}; -use crate::store::StoreError; +use crate::store::{ForkChoice, StoreError}; pub(crate) mod fork_choice_tree; pub mod key_manager; @@ -44,8 +44,10 @@ impl BlockChain { metrics::set_is_aggregator(is_aggregator); let genesis_time = store.config().genesis_time; let key_manager = key_manager::KeyManager::new(validator_keys); + let fork_choice = ForkChoice::from_store(&store); let handle = BlockChainServer { store, + fork_choice, p2p: None, key_manager, pending_blocks: HashMap::new(), @@ -78,6 +80,9 @@ impl BlockChain { pub struct BlockChainServer { store: Store, + /// Incremental fork choice state (proto-array + vote tracker). + fork_choice: ForkChoice, + // P2P protocol ref (set via InitP2P message) p2p: Option, @@ -123,6 +128,7 @@ impl BlockChainServer { // Tick the store first - this accepts attestations at interval 0 if we have a proposal let new_aggregates = store::on_tick( &mut self.store, + &mut self.fork_choice, timestamp_ms, proposer_validator_id.is_some(), self.is_aggregator, @@ -213,10 +219,13 @@ impl BlockChainServer { info!(%slot, %validator_id, "We are the proposer for this slot"); // Build the block with attestation signatures - let Ok((block, attestation_signatures)) = - store::produce_block_with_signatures(&mut self.store, slot, validator_id) - .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block")) - else { + let Ok((block, attestation_signatures)) = store::produce_block_with_signatures( + &mut self.store, + &mut self.fork_choice, + slot, + validator_id, + ) + .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block")) else { return; }; @@ -280,7 +289,12 @@ impl BlockChainServer { signed_block: SignedBlockWithAttestation, ) -> Result<(), StoreError> { let validator_ids = self.key_manager.validator_ids(); - store::on_block(&mut self.store, signed_block, &validator_ids)?; + store::on_block( + &mut self.store, + &mut self.fork_choice, + signed_block, + &validator_ids, + )?; metrics::update_head_slot(self.store.head_slot()); metrics::update_latest_justified_slot(self.store.latest_justified().slot); metrics::update_latest_finalized_slot(self.store.latest_finalized().slot); diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 03bd4f08..635878a7 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -1,6 +1,7 @@ use std::collections::{HashMap, HashSet}; use ethlambda_crypto::aggregate_signatures; +use ethlambda_fork_choice::{ProtoArray, VoteTracker}; use ethlambda_state_transition::{ is_proposer, process_block, process_slots, slot_is_justifiable_after, }; @@ -24,6 +25,89 @@ use tracing::{info, trace, warn}; use crate::{INTERVALS_PER_SLOT, MILLISECONDS_PER_INTERVAL, MILLISECONDS_PER_SLOT, metrics}; +/// Per-pool fork choice state: proto-array tree + vote tracker. +/// +/// Each instance tracks one attestation pool independently. +struct ForkChoiceState { + proto_array: ProtoArray, + vote_tracker: VoteTracker, +} + +impl ForkChoiceState { + /// Compute vote deltas and apply them to the proto-array in one step. + fn apply_attestations(&mut self, attestations: &HashMap) { + let mut deltas = self + .vote_tracker + .compute_deltas(attestations, &self.proto_array); + self.proto_array.apply_score_changes(&mut deltas); + } +} + +/// In-memory fork choice state managed alongside the Store. +/// +/// Wraps two independent `ForkChoiceState` instances: +/// - `head`: fed with known attestations, used for head selection +/// - `safe_target`: fed with known + new attestations, used for safe target with 2/3 threshold +/// +/// Both share the same block tree topology but track votes independently. +pub struct ForkChoice { + head: ForkChoiceState, + safe_target: ForkChoiceState, +} + +impl ForkChoice { + /// Build fork choice from the current live chain in the store. + pub fn from_store(store: &Store) -> Self { + let proto_array = Self::build_proto_array(store); + + let mut head = ForkChoiceState { + proto_array: proto_array.clone(), + vote_tracker: VoteTracker::new(), + }; + let mut safe_target = ForkChoiceState { + proto_array, + vote_tracker: VoteTracker::new(), + }; + + // Initialize weights with current known attestations + let attestations = store.extract_latest_known_attestations(); + head.apply_attestations(&attestations); + safe_target.apply_attestations(&attestations); + + Self { head, safe_target } + } + + fn build_proto_array(store: &Store) -> ProtoArray { + let mut proto_array = ProtoArray::new(); + + let blocks = store.get_live_chain(); + let mut sorted: Vec<_> = blocks.into_iter().collect(); + sorted.sort_by_key(|(_, (slot, _))| *slot); + + for (root, (slot, parent_root)) in sorted { + proto_array.on_block(root, parent_root, slot); + } + + proto_array + } + + /// Register a new block in both proto-arrays. + fn on_block(&mut self, root: H256, parent_root: H256, slot: u64) { + self.head.proto_array.on_block(root, parent_root, slot); + self.safe_target + .proto_array + .on_block(root, parent_root, slot); + } + + /// Prune both proto-arrays and reset both vote trackers on finalization. + fn prune_and_reset(&mut self, finalized_root: H256) { + self.head.proto_array.prune(finalized_root); + self.head.vote_tracker.reset(); + self.safe_target.proto_array.prune(finalized_root); + self.safe_target.vote_tracker.reset(); + } +} + const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3; /// Number of attestation committees per slot. @@ -37,27 +121,43 @@ fn compute_subnet_id(validator_id: u64) -> u64 { } /// Accept new aggregated payloads, promoting them to known for fork choice. -fn accept_new_attestations(store: &mut Store, log_tree: bool) { +fn accept_new_attestations(store: &mut Store, fc: &mut ForkChoice, log_tree: bool) { store.promote_new_aggregated_payloads(); metrics::update_latest_new_aggregated_payloads(store.new_aggregated_payloads_count()); metrics::update_latest_known_aggregated_payloads(store.known_aggregated_payloads_count()); - update_head(store, log_tree); + update_head(store, fc, log_tree); } /// Update the head based on the fork choice rule. /// -/// When `log_tree` is true, also computes block weights and logs an ASCII -/// fork choice tree to the terminal. -fn update_head(store: &mut Store, log_tree: bool) { - let blocks = store.get_live_chain(); +/// Uses proto-array for incremental head computation. When `log_tree` is true, +/// falls back to the full spec implementation for tree visualization. +fn update_head(store: &mut Store, fc: &mut ForkChoice, log_tree: bool) { let attestations = store.extract_latest_known_attestations(); + let justified_root = store.latest_justified().root; + + // Incremental fork choice via proto-array + fc.head.apply_attestations(&attestations); + let new_head = fc.head.proto_array.find_head(justified_root); + + // Debug oracle: verify proto-array matches spec implementation + #[cfg(debug_assertions)] + { + let blocks = store.get_live_chain(); + let (spec_head, _) = ethlambda_fork_choice::compute_lmd_ghost_head( + justified_root, + &blocks, + &attestations, + 0, + ); + assert_eq!( + new_head, spec_head, + "proto-array diverged from spec: proto={:?} spec={:?}", + new_head, spec_head + ); + } + let old_head = store.head(); - let (new_head, weights) = ethlambda_fork_choice::compute_lmd_ghost_head( - store.latest_justified().root, - &blocks, - &attestations, - 0, - ); if let Some(depth) = reorg_depth(old_head, new_head, store) { metrics::inc_fork_choice_reorgs(); metrics::observe_fork_choice_reorg_depth(depth); @@ -88,6 +188,12 @@ fn update_head(store: &mut Store, log_tree: bool) { } if log_tree { + let blocks = store.get_live_chain(); + let weights = ethlambda_fork_choice::compute_block_weights( + store.latest_justified().slot, + &blocks, + &attestations, + ); let tree = crate::fork_choice_tree::format_fork_choice_tree( &blocks, &weights, @@ -100,13 +206,15 @@ fn update_head(store: &mut Store, log_tree: bool) { } /// Update the safe target for attestation. -fn update_safe_target(store: &mut Store) { +/// +/// Uses the safe-target proto-array with the merged attestation pool (known + new) +/// and a 2/3 majority threshold. +fn update_safe_target(store: &mut Store, fc: &mut ForkChoice) { let head_state = store.get_state(&store.head()).expect("head state exists"); let num_validators = head_state.validators.len() as u64; - let min_target_score = (num_validators * 2).div_ceil(3); + let min_target_score = (num_validators * 2).div_ceil(3) as i64; - let blocks = store.get_live_chain(); // Merge both attestation pools (keys only — skip payload deserialization). // At interval 3 the migration (interval 4) hasn't run yet, so attestations // that entered "known" directly (proposer's own attestation in block body, @@ -116,12 +224,33 @@ fn update_safe_target(store: &mut Store) { .chain(store.iter_new_aggregated_payload_keys()) .collect(); let attestations = store.extract_latest_attestations(all_keys.into_iter()); - let (safe_target, _weights) = ethlambda_fork_choice::compute_lmd_ghost_head( - store.latest_justified().root, - &blocks, - &attestations, - min_target_score, - ); + + let justified_root = store.latest_justified().root; + + // Incremental fork choice via safe-target proto-array + fc.safe_target.apply_attestations(&attestations); + let safe_target = fc + .safe_target + .proto_array + .find_head_with_threshold(justified_root, min_target_score); + + // Debug oracle: verify proto-array matches spec implementation + #[cfg(debug_assertions)] + { + let blocks = store.get_live_chain(); + let (spec_safe_target, _) = ethlambda_fork_choice::compute_lmd_ghost_head( + justified_root, + &blocks, + &attestations, + min_target_score as u64, + ); + assert_eq!( + safe_target, spec_safe_target, + "proto-array safe-target diverged from spec: proto={:?} spec={:?}", + safe_target, spec_safe_target + ); + } + store.set_safe_target(safe_target); } @@ -299,6 +428,7 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<() /// interval = store.time() % INTERVALS_PER_SLOT pub fn on_tick( store: &mut Store, + fc: &mut ForkChoice, timestamp_ms: u64, has_proposal: bool, is_aggregator: bool, @@ -333,7 +463,7 @@ pub fn on_tick( 0 => { // Start of slot - process attestations if proposal exists if should_signal_proposal { - accept_new_attestations(store, false); + accept_new_attestations(store, fc, false); } } 1 => { @@ -347,11 +477,11 @@ pub fn on_tick( } 3 => { // Update safe target for validators - update_safe_target(store); + update_safe_target(store, fc); } 4 => { // End of slot - accept accumulated attestations and log tree - accept_new_attestations(store, true); + accept_new_attestations(store, fc, true); } _ => unreachable!("slots only have 5 intervals"), } @@ -523,10 +653,11 @@ pub fn on_gossip_aggregated_attestation( /// and stores them for future block building. Use this for all production paths. pub fn on_block( store: &mut Store, + fc: &mut ForkChoice, signed_block: SignedBlockWithAttestation, local_validator_ids: &[u64], ) -> Result<(), StoreError> { - on_block_core(store, signed_block, true, local_validator_ids) + on_block_core(store, fc, signed_block, true, local_validator_ids) } /// Process a new block without signature verification. @@ -535,9 +666,10 @@ pub fn on_block( /// where signatures are absent or irrelevant (e.g., fork choice spec tests). pub fn on_block_without_verification( store: &mut Store, + fc: &mut ForkChoice, signed_block: SignedBlockWithAttestation, ) -> Result<(), StoreError> { - on_block_core(store, signed_block, false, &[]) + on_block_core(store, fc, signed_block, false, &[]) } /// Core block processing logic. @@ -546,6 +678,7 @@ pub fn on_block_without_verification( /// for future block building. When false, all signature checks are skipped. fn on_block_core( store: &mut Store, + fc: &mut ForkChoice, signed_block: SignedBlockWithAttestation, verify: bool, local_validator_ids: &[u64], @@ -598,6 +731,15 @@ fn on_block_core( store.update_checkpoints(ForkCheckpoints::new(store.head(), justified, finalized)); } + // Prune both proto-arrays on finalization advance and reset vote trackers + // (must happen after store.update_checkpoints which prunes storage) + if let Some(finalized) = finalized { + fc.prune_and_reset(finalized.root); + } + + // Register block in both proto-arrays for incremental fork choice + fc.on_block(block_root, block.parent_root, slot); + // Store signed block and state store.insert_signed_block(block_root, signed_block.clone()); store.insert_state(block_root, post_state); @@ -642,7 +784,7 @@ fn on_block_core( // Update forkchoice head based on new block and attestations // IMPORTANT: This must happen BEFORE processing proposer attestation // to prevent the proposer from gaining circular weight advantage. - update_head(store, false); + update_head(store, fc, false); if !verify { // Without sig verification, insert directly with a dummy proof @@ -775,15 +917,15 @@ pub fn produce_attestation_data(store: &Store, slot: u64) -> AttestationData { /// /// Ensures store is up-to-date and processes any pending attestations /// before returning the canonical head. -fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { +fn get_proposal_head(store: &mut Store, fc: &mut ForkChoice, slot: u64) -> H256 { // Calculate time corresponding to this slot let slot_time_ms = store.config().genesis_time * 1000 + slot * MILLISECONDS_PER_SLOT; // Advance time to current slot (ticking intervals) - on_tick(store, slot_time_ms, true, false); + on_tick(store, fc, slot_time_ms, true, false); // Process any pending attestations before proposal - accept_new_attestations(store, false); + accept_new_attestations(store, fc, false); store.head() } @@ -794,11 +936,12 @@ fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { /// with `block.body.attestations`. pub fn produce_block_with_signatures( store: &mut Store, + fc: &mut ForkChoice, slot: u64, validator_index: u64, ) -> Result<(Block, Vec), StoreError> { // Get parent block and state to build upon - let head_root = get_proposal_head(store, slot); + let head_root = get_proposal_head(store, fc, slot); let head_state = store .get_state(&head_root) .ok_or(StoreError::MissingParentState { diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index e7222c34..7e7c0484 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -39,6 +39,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { let genesis_time = anchor_state.config.genesis_time; let backend = Arc::new(InMemoryBackend::new()); let mut store = Store::get_forkchoice_store(backend, anchor_state, anchor_block); + let mut fc = store::ForkChoice::from_store(&store); // Block registry: maps block labels to their roots let mut block_registry: HashMap = HashMap::new(); @@ -62,8 +63,9 @@ fn run(path: &Path) -> datatest_stable::Result<()> { + signed_block.message.block.slot * MILLISECONDS_PER_SLOT; // NOTE: the has_proposal argument is set to true, following the spec - store::on_tick(&mut store, block_time_ms, true, false); - let result = store::on_block_without_verification(&mut store, signed_block); + store::on_tick(&mut store, &mut fc, block_time_ms, true, false); + let result = + store::on_block_without_verification(&mut store, &mut fc, signed_block); match (result.is_ok(), step.valid) { (true, false) => { @@ -87,7 +89,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { "tick" => { let timestamp_ms = step.time.expect("tick step missing time") * 1000; // NOTE: the has_proposal argument is set to false, following the spec - store::on_tick(&mut store, timestamp_ms, false, false); + store::on_tick(&mut store, &mut fc, timestamp_ms, false, false); } other => { // Fail for unsupported step types for now diff --git a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs index c35c9ebe..1986164f 100644 --- a/crates/blockchain/tests/signature_spectests.rs +++ b/crates/blockchain/tests/signature_spectests.rs @@ -52,10 +52,11 @@ fn run(path: &Path) -> datatest_stable::Result<()> { // Advance time to the block's slot let block_time_ms = genesis_time * 1000 + signed_block.message.block.slot * MILLISECONDS_PER_SLOT; - store::on_tick(&mut st, block_time_ms, true, false); + let mut fc = store::ForkChoice::from_store(&st); + store::on_tick(&mut st, &mut fc, block_time_ms, true, false); // Process the block (this includes signature verification) - let result = store::on_block(&mut st, signed_block, &[]); + let result = store::on_block(&mut st, &mut fc, signed_block, &[]); // Step 3: Check that it succeeded or failed as expected match (result.is_ok(), test.expect_exception.as_ref()) { diff --git a/docs/proto_array_fork_choice.md b/docs/proto_array_fork_choice.md new file mode 100644 index 00000000..5a4ae85e --- /dev/null +++ b/docs/proto_array_fork_choice.md @@ -0,0 +1,107 @@ +# Proto-array fork choice + +## The problem + +Every time `update_head` ran (up to 2x per slot), it recomputed the LMD-GHOST head from scratch: for every validator attestation, walk backward through the entire chain accumulating weights, then pick the heaviest path. That's **O(validators × chain_depth)** per call. + +## How proto-array works + +Proto-array is a flat array of nodes (one per block) that maintains: +- **Subtree weights**: how many votes each node's subtree has +- **`best_child` pointers**: which child has the heaviest subtree + +Instead of recomputing everything, it works incrementally: + +1. **`compute_deltas`**: Compare each validator's current vote against their previous vote. If validator 5 moved from block A to block B, produce `A: -1, B: +1`. Unchanged votes produce zero deltas. This is **O(changed_votes)**. + +2. **`apply_score_changes`**: Single backward pass over the array — update each node's weight by its delta, propagate to parent, update `best_child`. This is **O(nodes)**. + +3. **`find_head`**: Follow `best_child` pointers from the justified root to a leaf. This is **O(depth)**. + +### Complexity comparison + +| Operation | Spec implementation | Proto-array | +|-----------|-------------------|-------------| +| `update_head` | O(validators × chain_depth) | O(changed_votes + nodes + depth) | +| `update_safe_target` | O(validators × chain_depth) | O(changed_votes + nodes + depth) | + +## Architecture + +### Two independent fork choice pools + +`update_head` and `update_safe_target` use **different attestation pools**: +- **Head**: only "known" attestations (already promoted through the pipeline) +- **Safe target**: "known" + "new" merged (the most complete picture available) + +A `VoteTracker` is stateful — it remembers each validator's last vote to compute deltas. If we fed both pools into the same tracker, the head computation would see attestations it shouldn't, corrupting weights. + +Solution: two independent `ForkChoiceState` instances, each with its own proto-array and vote tracker: + +``` +ForkChoice +├── head: ForkChoiceState ← known attestations only +│ ├── proto_array (same block tree) +│ └── vote_tracker (tracks known-pool votes) +└── safe_target: ForkChoiceState ← known + new attestations merged + ├── proto_array (same block tree) + └── vote_tracker (tracks merged-pool votes) +``` + +The `ForkChoice` wrapper ensures both proto-arrays stay in sync: +- `fc.on_block(root, parent, slot)` → registers the block in **both** trees +- `fc.prune_and_reset(finalized_root)` → prunes **both** trees and resets **both** vote trackers + +### Safe target threshold + +`update_safe_target` picks the deepest block with ≥ 2/3 validator support. This uses `find_head_with_threshold(root, min_score)` — same as `find_head` but stops walking down when the best child's weight drops below the threshold: + +```rust +while let Some(best_child_idx) = self.nodes[current_idx].best_child { + if self.nodes[best_child_idx].weight < min_score { + break; // no child meets the 2/3 threshold, stop here + } + current_idx = best_child_idx; +} +``` + +This works because `best_child` always points to the heaviest child. If even the heaviest doesn't meet the threshold, no child can. + +## Lifecycle + +### Startup + +`ForkChoice::from_store` builds both proto-arrays from the `LiveChain` table (sorted by slot for topological order), then applies the current known attestations to initialize weights. + +### Per-block (`on_block_core`) + +1. Run state transition +2. If finalization advanced: `fc.prune_and_reset(finalized_root)` — prune both trees, reset both vote trackers +3. `fc.on_block(root, parent, slot)` — register in both trees +4. `update_head` — apply known attestations to `fc.head`, find head + +### Per-slot tick + +| Interval | Action | Fork choice involvement | +|----------|--------|------------------------| +| 0 | Accept attestations if proposal exists | `fc.head` updated via `update_head` | +| 1 | Vote propagation | — | +| 2 | Aggregate committee signatures | — | +| 3 | Update safe target | `fc.safe_target` updated via `update_safe_target` | +| 4 | Accept accumulated attestations | `fc.head` updated via `update_head` | + +## Debug oracles + +Both `update_head` and `update_safe_target` have a `#[cfg(debug_assertions)]` block that runs the old spec implementation in parallel and asserts the result matches proto-array: + +- **Debug/test builds**: both implementations run and any divergence panics immediately +- **Release builds**: only proto-array runs (zero overhead from the oracle) + +This means the 27 fork choice spec tests validate proto-array correctness on every `update_head` call, and any `update_safe_target` call in debug builds is also verified. + +## Key files + +| File | What | +|------|------| +| `crates/blockchain/fork_choice/src/proto_array.rs` | `ProtoArray` (tree + weights), `VoteTracker` (delta computation) | +| `crates/blockchain/src/store.rs` | `ForkChoice` wrapper, `update_head`, `update_safe_target`, `on_block_core` | +| `crates/blockchain/src/lib.rs` | `BlockChainServer` owns the `ForkChoice` instance |