Skip to content

Commit

Permalink
HDiff in the hot DB - squashed
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Jan 2, 2025
1 parent 29ad4fc commit d16f311
Show file tree
Hide file tree
Showing 16 changed files with 929 additions and 411 deletions.
4 changes: 2 additions & 2 deletions account_manager/src/validator/slashing_protection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub fn cli_run<E: EthSpec>(
let slashing_protection_database =
SlashingDatabase::open_or_create(&slashing_protection_db_path).map_err(|e| {
format!(
"Unable to open database at {}: {:?}",
"Unable to open slashing protection database at {}: {:?}",
slashing_protection_db_path.display(),
e
)
Expand Down Expand Up @@ -198,7 +198,7 @@ pub fn cli_run<E: EthSpec>(
let slashing_protection_database = SlashingDatabase::open(&slashing_protection_db_path)
.map_err(|e| {
format!(
"Unable to open database at {}: {:?}",
"Unable to open slashing protection database at {}: {:?}",
slashing_protection_db_path.display(),
e
)
Expand Down
85 changes: 41 additions & 44 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ use std::fmt::Debug;
use std::fs;
use std::io::Write;
use std::sync::Arc;
use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp};
use store::{Error as DBError, KeyValueStore, StoreOp};
use strum::AsRefStr;
use task_executor::JoinHandle;
use types::{
Expand Down Expand Up @@ -1455,52 +1455,49 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {

let distance = block.slot().as_u64().saturating_sub(state.slot().as_u64());
for _ in 0..distance {
let state_root = if parent.beacon_block.slot() == state.slot() {
// If it happens that `pre_state` has *not* already been advanced forward a single
// slot, then there is no need to compute the state root for this
// `per_slot_processing` call since that state root is already stored in the parent
// block.
parent.beacon_block.state_root()
} else {
// This is a new state we've reached, so stage it for storage in the DB.
// Computing the state root here is time-equivalent to computing it during slot
// processing, but we get early access to it.
let state_root = state.update_tree_hash_cache()?;

// Store the state immediately, marking it as temporary, and staging the deletion
// of its temporary status as part of the larger atomic operation.
let txn_lock = chain.store.hot_db.begin_rw_transaction();
let state_already_exists =
chain.store.load_hot_state_summary(&state_root)?.is_some();

let state_batch = if state_already_exists {
// If the state exists, it could be temporary or permanent, but in neither case
// should we rewrite it or store a new temporary flag for it. We *will* stage
// the temporary flag for deletion because it's OK to double-delete the flag,
// and we don't mind if another thread gets there first.
vec![]
let state_root =
if parent.beacon_block.slot() == state.slot() {
// If it happens that `pre_state` has *not* already been advanced forward a single
// slot, then there is no need to compute the state root for this
// `per_slot_processing` call since that state root is already stored in the parent
// block.
parent.beacon_block.state_root()
} else {
vec![
if state.slot() % T::EthSpec::slots_per_epoch() == 0 {
StoreOp::PutState(state_root, &state)
} else {
StoreOp::PutStateSummary(
state_root,
HotStateSummary::new(&state_root, &state)?,
)
},
StoreOp::PutStateTemporaryFlag(state_root),
]
};
chain
.store
.do_atomically_with_block_and_blobs_cache(state_batch)?;
drop(txn_lock);
// This is a new state we've reached, so stage it for storage in the DB.
// Computing the state root here is time-equivalent to computing it during slot
// processing, but we get early access to it.
let state_root = state.update_tree_hash_cache()?;

// Store the state immediately, marking it as temporary, and staging the deletion
// of its temporary status as part of the larger atomic operation.
// TODO(hdiff): Is it necessary to do this read tx now? Also why is it necessary to
// check that the summary exists at all? Are double writes common? Can this txn
// lock deadlock with the `do_atomically` call?
let txn_lock = chain.store.hot_db.begin_rw_transaction();
let state_already_exists =
chain.store.load_hot_state_summary(&state_root)?.is_some();

if state_already_exists {
// If the state exists, it could be temporary or permanent, but in neither case
// should we rewrite it or store a new temporary flag for it. We *will* stage
// the temporary flag for deletion because it's OK to double-delete the flag,
// and we don't mind if another thread gets there first.
} else {
let mut ops = vec![];
// Recycle store codepath to create a state summary and store the state / diff
chain.store.store_hot_state(&state_root, &state, &mut ops)?;
// Additionally write a temporary flag as part of the atomic write
ops.extend(chain.store.convert_to_kv_batch(vec![
StoreOp::PutStateTemporaryFlag(state_root),
])?);
chain.store.hot_db.do_atomically(ops)?;
}
drop(txn_lock);

confirmed_state_roots.push(state_root);
confirmed_state_roots.push(state_root);

state_root
};
state_root
};

if let Some(summary) = per_slot_processing(&mut state, Some(state_root), &chain.spec)? {
// Expose Prometheus metrics.
Expand Down
46 changes: 30 additions & 16 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ use std::time::Duration;
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
use task_executor::{ShutdownReason, TaskExecutor};
use types::{
BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, Checkpoint, Epoch, EthSpec,
FixedBytesExtended, Hash256, Signature, SignedBeaconBlock, Slot,
BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, Epoch, EthSpec, FixedBytesExtended,
Hash256, Signature, SignedBeaconBlock, Slot,
};

/// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing
Expand Down Expand Up @@ -398,7 +398,11 @@ where
let retain_historic_states = self.chain_config.reconstruct_historic_states;
self.pending_io_batch.push(
store
.init_anchor_info(genesis.beacon_block.message(), retain_historic_states)
.init_anchor_info(
genesis.beacon_block.message(),
Slot::new(0),
retain_historic_states,
)
.map_err(|e| format!("Failed to initialize genesis anchor: {:?}", e))?,
);
self.pending_io_batch.push(
Expand Down Expand Up @@ -518,6 +522,14 @@ where
}
}

debug!(
log,
"Storing split from weak subjectivity state";
"slot" => weak_subj_slot,
"state_root" => ?weak_subj_state_root,
"block_root" => ?weak_subj_block_root,
);

// Set the store's split point *before* storing genesis so that genesis is stored
// immediately in the freezer DB.
store.set_split(weak_subj_slot, weak_subj_state_root, weak_subj_block_root);
Expand All @@ -539,6 +551,19 @@ where
.do_atomically(block_root_batch)
.map_err(|e| format!("Error writing frozen block roots: {e:?}"))?;

// Write the anchor to memory before calling `put_state` otherwise hot hdiff can't store
// states that do not align with the start_slot grid
let retain_historic_states = self.chain_config.reconstruct_historic_states;
self.pending_io_batch.push(
store
.init_anchor_info(
weak_subj_block.message(),
weak_subj_slot,
retain_historic_states,
)
.map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
);

// Write the state, block and blobs non-atomically, it doesn't matter if they're forgotten
// about on a crash restart.
store
Expand All @@ -548,6 +573,8 @@ where
weak_subj_state.clone(),
)
.map_err(|e| format!("Failed to set checkpoint state as finalized state: {:?}", e))?;
// Note: post hot hdiff must update the anchor info before attempting to put_state otherwise
// the write will fail if the weak_subj_slot is not aligned with the snapshot moduli.
store
.put_state(&weak_subj_state_root, &weak_subj_state)
.map_err(|e| format!("Failed to store weak subjectivity state: {e:?}"))?;
Expand All @@ -563,13 +590,7 @@ where
// Stage the database's metadata fields for atomic storage when `build` is called.
// This prevents the database from restarting in an inconsistent state if the anchor
// info or split point is written before the `PersistedBeaconChain`.
let retain_historic_states = self.chain_config.reconstruct_historic_states;
self.pending_io_batch.push(store.store_split_in_batch());
self.pending_io_batch.push(
store
.init_anchor_info(weak_subj_block.message(), retain_historic_states)
.map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
);
self.pending_io_batch.push(
store
.init_blob_info(weak_subj_block.slot())
Expand All @@ -581,13 +602,6 @@ where
.map_err(|e| format!("Failed to initialize data column info: {:?}", e))?,
);

// Store pruning checkpoint to prevent attempting to prune before the anchor state.
self.pending_io_batch
.push(store.pruning_checkpoint_store_op(Checkpoint {
root: weak_subj_block_root,
epoch: weak_subj_state.slot().epoch(E::slots_per_epoch()),
}));

let snapshot = BeaconSnapshot {
beacon_block_root: weak_subj_block_root,
beacon_block: Arc::new(weak_subj_block),
Expand Down
51 changes: 37 additions & 14 deletions beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::errors::BeaconChainError;
use crate::summaries_dag::{
BlockSummariesDAG, DAGBlockSummary, DAGStateSummaryV22, Error as SummariesDagError,
BlockSummariesDAG, DAGBlockSummary, DAGStateSummary, Error as SummariesDagError,
StateSummariesDAG,
};
use parking_lot::Mutex;
use slog::{debug, error, info, warn, Logger};
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::mem;
use std::sync::{mpsc, Arc};
use std::thread;
Expand Down Expand Up @@ -494,7 +494,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
.load_hot_state_summaries()?
.into_iter()
.map(|(state_root, summary)| (state_root, summary.into()))
.collect::<Vec<(Hash256, DAGStateSummaryV22)>>();
.collect::<Vec<(Hash256, DAGStateSummary)>>();

// De-duplicate block roots to reduce block reads below
let summary_block_roots = HashSet::<Hash256>::from_iter(
Expand Down Expand Up @@ -528,18 +528,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
})
.collect::<Result<Vec<_>, BeaconChainError>>()?;

let parent_block_roots = blocks
.iter()
.map(|(block_root, block)| (*block_root, block.parent_root))
.collect::<HashMap<Hash256, Hash256>>();

(
StateSummariesDAG::new_from_v22(
state_summaries,
parent_block_roots,
split_state_root,
)
.map_err(PruningError::SummariesDagError)?,
StateSummariesDAG::new(state_summaries),
BlockSummariesDAG::new(&blocks),
)
};
Expand Down Expand Up @@ -585,10 +575,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
.min()
.ok_or(PruningError::EmptyFinalizedBlocks)?;

// Compute the set of finalized state roots that we must keep to make the dynamic HDiff system
// work.
let required_finalized_diff_state_slots = store
.hierarchy_hot
.closest_layer_points(new_finalized_slot, store.hot_hdiff_start_slot());

// We don't know which blocks are shared among abandoned chains, so we buffer and delete
// everything in one fell swoop.
let mut blocks_to_prune: HashSet<Hash256> = HashSet::new();
let mut states_to_prune: HashSet<(Slot, Hash256)> = HashSet::new();
let mut kept_summaries_for_hdiff = vec![];

for (slot, summaries) in state_summaries_dag.summaries_by_slot_ascending() {
for (state_root, summary) in summaries {
Expand All @@ -597,6 +594,30 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
// Keep this state is the post state of a viable head, or a state advance from a
// viable head.
false
} else if required_finalized_diff_state_slots.contains(&slot) {
// Keep this state and diff as it's necessary for the finalized portion of the
// HDiff links. `required_finalized_diff_state_slots` tracks the set of slots on
// each diff layer, and by checking `newly_finalized_state_roots` which only
// keep those on the finalized canonical chain. Checking the state root ensures
// we avoid lingering forks.

// In the diagram below, `o` are diffs by slot that we must keep. In the prior
// finalized section there's only one chain so we preserve them unconditionally.
// For the newly finalized chain, we check which of is canonical and only keep
// those. Slots below `min_finalized_state_slot` we don't have canonical
// information so we assume they are part of the finalized pruned chain.
//
// /-----o----
// o-------o------/-------o----
if slot < newly_finalized_states_min_slot
|| newly_finalized_state_roots.contains(&state_root)
{
// Track kept summaries to debug hdiff inconsistencies with "Extra pruning information"
kept_summaries_for_hdiff.push((state_root, slot));
false
} else {
true
}
} else {
// Everything else, prune
true
Expand Down Expand Up @@ -650,6 +671,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
"newly_finalized_blocks_min_slot" => newly_finalized_blocks_min_slot,
"newly_finalized_state_roots" => newly_finalized_state_roots.len(),
"newly_finalized_states_min_slot" => newly_finalized_states_min_slot,
"required_finalized_diff_state_slots" => ?required_finalized_diff_state_slots,
"kept_summaries_for_hdiff" => ?kept_summaries_for_hdiff,
"state_summaries_count" => state_summaries_dag.summaries_count(),
"finalized_and_descendant_block_roots" => finalized_and_descendant_block_roots.len(),
"blocks_to_prune_count" => blocks_to_prune.len(),
Expand Down
9 changes: 9 additions & 0 deletions beacon_node/beacon_chain/src/schema_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
mod migration_schema_v20;
mod migration_schema_v21;
mod migration_schema_v22;
mod migration_schema_v23;

use crate::beacon_chain::BeaconChainTypes;
use slog::Logger;
Expand Down Expand Up @@ -59,6 +60,14 @@ pub fn migrate_schema<T: BeaconChainTypes>(
// bumped inside the upgrade_to_v22 fn
migration_schema_v22::upgrade_to_v22::<T>(db.clone(), genesis_state_root, log)
}
(SchemaVersion(22), SchemaVersion(23)) => {
let ops = migration_schema_v23::upgrade_to_v23::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(23), SchemaVersion(22)) => {
let ops = migration_schema_v23::downgrade_to_v22::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,
Expand Down
Loading

0 comments on commit d16f311

Please sign in to comment.