Fix custodial peer assumption on lookup custody requests
dapplion committed Jan 17, 2025
1 parent 06329ec commit 4278402
Showing 5 changed files with 78 additions and 52 deletions.
17 changes: 9 additions & 8 deletions beacon_node/network/src/sync/block_lookups/common.rs

@@ -9,6 +9,8 @@ use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
 use beacon_chain::block_verification_types::RpcBlock;
 use beacon_chain::BeaconChainTypes;
 use lighthouse_network::service::api_types::Id;
+use parking_lot::RwLock;
+use std::collections::HashSet;
 use std::sync::Arc;
 use types::blob_sidecar::FixedBlobSidecarList;
 use types::{DataColumnSidecarList, SignedBeaconBlock};
@@ -41,7 +43,7 @@ pub trait RequestState<T: BeaconChainTypes> {
     fn make_request(
         &self,
         id: Id,
-        peer_id: PeerId,
+        lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
         expected_blobs: usize,
         cx: &mut SyncNetworkContext<T>,
     ) -> Result<LookupRequestResult, LookupRequestError>;
@@ -76,11 +78,11 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
     fn make_request(
         &self,
         id: SingleLookupId,
-        peer_id: PeerId,
+        lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
         _: usize,
         cx: &mut SyncNetworkContext<T>,
     ) -> Result<LookupRequestResult, LookupRequestError> {
-        cx.block_lookup_request(id, peer_id, self.requested_block_root)
+        cx.block_lookup_request(id, lookup_peers, self.requested_block_root)
             .map_err(LookupRequestError::SendFailedNetwork)
     }

@@ -124,11 +126,11 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
     fn make_request(
         &self,
         id: Id,
-        peer_id: PeerId,
+        lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
         expected_blobs: usize,
         cx: &mut SyncNetworkContext<T>,
     ) -> Result<LookupRequestResult, LookupRequestError> {
-        cx.blob_lookup_request(id, peer_id, self.block_root, expected_blobs)
+        cx.blob_lookup_request(id, lookup_peers, self.block_root, expected_blobs)
             .map_err(LookupRequestError::SendFailedNetwork)
     }

@@ -172,12 +174,11 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
     fn make_request(
         &self,
         id: Id,
-        // TODO(das): consider selecting peers that have custody but are in this set
-        _peer_id: PeerId,
+        lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
         _: usize,
         cx: &mut SyncNetworkContext<T>,
     ) -> Result<LookupRequestResult, LookupRequestError> {
-        cx.custody_lookup_request(id, self.block_root)
+        cx.custody_lookup_request(id, self.block_root, lookup_peers)
            .map_err(LookupRequestError::SendFailedNetwork)
     }
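
The trait change above moves peer selection out of the caller: `make_request` implementations now receive a live handle to the lookup's peer set and defer the choice of peer to the network context. A minimal sketch of the shared-handle pattern, with `u64` standing in for `PeerId` and a trivial selection policy in place of the real one:

use parking_lot::RwLock;
use std::collections::HashSet;
use std::sync::Arc;

type PeerId = u64; // stand-in for lighthouse_network::PeerId

// Selection happens at send time, against the current contents of the set,
// so peers added after the lookup was created are still eligible.
fn make_request(lookup_peers: Arc<RwLock<HashSet<PeerId>>>) -> Option<PeerId> {
    lookup_peers.read().iter().next().copied()
}

fn main() {
    let peers = Arc::new(RwLock::new(HashSet::from([7u64])));
    assert_eq!(make_request(peers.clone()), Some(7));
    peers.write().clear(); // the same handle observes later mutations
    assert_eq!(make_request(peers), None);
}
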
4 changes: 2 additions & 2 deletions beacon_node/network/src/sync/block_lookups/mod.rs

@@ -283,7 +283,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
             .find(|(_, l)| l.block_root() == parent_chain_tip)
         {
             cx.send_sync_message(SyncMessage::AddPeersForceRangeSync {
-                peers: lookup.all_peers().copied().collect(),
+                peers: lookup.all_peers(),
                 head_slot: tip_lookup.peek_downloaded_block_slot(),
                 head_root: parent_chain_tip,
             });
@@ -682,7 +682,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
                 lookup.continue_requests(cx)
             }
             Action::ParentUnknown { parent_root } => {
-                let peers = lookup.all_peers().copied().collect::<Vec<_>>();
+                let peers = lookup.all_peers();
                 lookup.set_awaiting_parent(parent_root);
                 debug!(self.log, "Marking lookup as awaiting parent"; "id" => lookup.id, "block_root" => ?block_root, "parent_root" => ?parent_root);
                 self.search_parent_of_child(parent_root, block_root, &peers, cx);
49 changes: 16 additions & 33 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs

@@ -7,7 +7,7 @@ use crate::sync::network_context::{
 use beacon_chain::{BeaconChainTypes, BlockProcessStatus};
 use derivative::Derivative;
 use lighthouse_network::service::api_types::Id;
-use rand::seq::IteratorRandom;
+use parking_lot::RwLock;
 use std::collections::HashSet;
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -33,8 +33,6 @@ pub enum LookupRequestError {
         /// The failed attempts were primarily due to processing failures.
         cannot_process: bool,
     },
-    /// No peers left to serve this lookup
-    NoPeers,
     /// Error sending event to network
     SendFailedNetwork(RpcRequestSendError),
     /// Error sending event to processor
@@ -63,9 +61,12 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
     pub id: Id,
     pub block_request_state: BlockRequestState<T::EthSpec>,
     pub component_requests: ComponentRequests<T::EthSpec>,
-    /// Peers that claim to have imported this set of block components
+    /// Peers that claim to have imported this set of block components. This state is shared with
+    /// the custody request to have an updated view of the peers that claim to have imported the
+    /// block associated with this lookup. The peer set of a lookup can change rapidly, and faster
+    /// than the lifetime of a custody request.
     #[derivative(Debug(format_with = "fmt_peer_set_as_len"))]
-    peers: HashSet<PeerId>,
+    peers: Arc<RwLock<HashSet<PeerId>>>,
     block_root: Hash256,
     awaiting_parent: Option<Hash256>,
     created: Instant,
@@ -92,7 +93,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
             id,
             block_request_state: BlockRequestState::new(requested_block_root),
             component_requests: ComponentRequests::WaitingForBlock,
-            peers: HashSet::from_iter(peers.iter().copied()),
+            peers: Arc::new(RwLock::new(HashSet::from_iter(peers.iter().copied()))),
             block_root: requested_block_root,
             awaiting_parent,
             created: Instant::now(),
@@ -283,24 +284,11 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
             return Err(LookupRequestError::TooManyAttempts { cannot_process });
         }

-        let Some(peer_id) = self.use_rand_available_peer() else {
-            // Allow lookup to not have any peers and do nothing. This is an optimization to not
-            // lose progress of lookups created from a block with unknown parent before we receive
-            // attestations for said block.
-            // Lookup sync event safety: If a lookup requires peers to make progress, and does
-            // not receive any new peers for some time it will be dropped. If it receives a new
-            // peer it must attempt to make progress.
-            R::request_state_mut(self)
-                .map_err(|e| LookupRequestError::BadState(e.to_owned()))?
-                .get_state_mut()
-                .update_awaiting_download_status("no peers");
-            return Ok(());
-        };
-
+        let peers = self.peers.clone();
         let request = R::request_state_mut(self)
             .map_err(|e| LookupRequestError::BadState(e.to_owned()))?;

-        match request.make_request(id, peer_id, expected_blobs, cx)? {
+        match request.make_request(id, peers, expected_blobs, cx)? {
             LookupRequestResult::RequestSent(req_id) => {
                 // Lookup sync event safety: If make_request returns `RequestSent`, we are
                 // guaranteed that `BlockLookups::on_download_response` will be called exactly
@@ -348,29 +336,24 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
     }

     /// Get all unique peers that claim to have imported this set of block components
-    pub fn all_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
-        self.peers.iter()
+    pub fn all_peers(&self) -> Vec<PeerId> {
+        self.peers.read().iter().copied().collect()
     }

     /// Add peer to all request states. The peer must be able to serve this request.
     /// Returns true if the peer was newly inserted into some request state.
     pub fn add_peer(&mut self, peer_id: PeerId) -> bool {
-        self.peers.insert(peer_id)
+        self.peers.write().insert(peer_id)
     }

     /// Remove peer from available peers.
     pub fn remove_peer(&mut self, peer_id: &PeerId) {
-        self.peers.remove(peer_id);
+        self.peers.write().remove(peer_id);
     }

     /// Returns true if this lookup has zero peers
     pub fn has_no_peers(&self) -> bool {
-        self.peers.is_empty()
-    }
-
-    /// Selects a random peer from available peers if any
-    fn use_rand_available_peer(&mut self) -> Option<PeerId> {
-        self.peers.iter().choose(&mut rand::thread_rng()).copied()
+        self.peers.read().is_empty()
     }
 }

@@ -689,8 +672,8 @@ impl<T: Clone> std::fmt::Debug for State<T> {
 }

 fn fmt_peer_set_as_len(
-    peer_set: &HashSet<PeerId>,
+    peer_set: &Arc<RwLock<HashSet<PeerId>>>,
     f: &mut std::fmt::Formatter,
 ) -> Result<(), std::fmt::Error> {
-    write!(f, "{}", peer_set.len())
+    write!(f, "{}", peer_set.read().len())
 }
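
With the peer set now behind `Arc<RwLock<…>>`, `all_peers` can no longer return `impl Iterator<Item = &PeerId>`: the iterator would borrow from the `RwLock` read guard, a temporary dropped when the function returns. Collecting into an owned `Vec` sidesteps the lifetime issue, which is why the two call sites in mod.rs above lost their `.copied().collect()` adapters. A small sketch of the constraint, with `u64` standing in for `PeerId`:

use parking_lot::RwLock;
use std::collections::HashSet;
use std::sync::Arc;

struct Lookup {
    peers: Arc<RwLock<HashSet<u64>>>, // u64 stands in for PeerId
}

impl Lookup {
    // Returning a borrowing iterator here would not compile: it would hold
    // the read guard across the function boundary. An owned Vec is the
    // simplest escape hatch and keeps the guard's critical section short.
    fn all_peers(&self) -> Vec<u64> {
        self.peers.read().iter().copied().collect()
    }
}

fn main() {
    let lookup = Lookup {
        peers: Arc::new(RwLock::new(HashSet::from([1, 2, 3]))),
    };
    assert_eq!(lookup.all_peers().len(), 3);
}
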
41 changes: 37 additions & 4 deletions beacon_node/network/src/sync/network_context.rs

@@ -27,7 +27,8 @@ use lighthouse_network::service::api_types::{
     DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId,
 };
 use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource};
-use rand::seq::SliceRandom;
+use parking_lot::RwLock;
+use rand::prelude::IteratorRandom;
 use rand::thread_rng;
 pub use requests::LookupVerifyError;
 use requests::{
@@ -308,8 +309,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {

     pub fn get_random_custodial_peer(&self, column_index: ColumnIndex) -> Option<PeerId> {
         self.get_custodial_peers(column_index)
+            .into_iter()
             .choose(&mut thread_rng())
-            .cloned()
     }

     pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
@@ -562,9 +563,24 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
     pub fn block_lookup_request(
         &mut self,
         lookup_id: SingleLookupId,
-        peer_id: PeerId,
+        lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
         block_root: Hash256,
     ) -> Result<LookupRequestResult, RpcRequestSendError> {
+        let Some(peer_id) = lookup_peers
+            .read()
+            .iter()
+            .choose(&mut rand::thread_rng())
+            .copied()
+        else {
+            // Allow lookup to not have any peers and do nothing. This is an optimization to not
+            // lose progress of lookups created from a block with unknown parent before we receive
+            // attestations for said block.
+            // Lookup sync event safety: If a lookup requires peers to make progress, and does
+            // not receive any new peers for some time it will be dropped. If it receives a new
+            // peer it must attempt to make progress.
+            return Ok(LookupRequestResult::Pending("no peers"));
+        };
+
         match self.chain.get_block_process_status(&block_root) {
             // Unknown block, continue request to download
             BlockProcessStatus::Unknown => {}
@@ -634,10 +650,25 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
     pub fn blob_lookup_request(
         &mut self,
         lookup_id: SingleLookupId,
-        peer_id: PeerId,
+        lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
         block_root: Hash256,
         expected_blobs: usize,
     ) -> Result<LookupRequestResult, RpcRequestSendError> {
+        let Some(peer_id) = lookup_peers
+            .read()
+            .iter()
+            .choose(&mut rand::thread_rng())
+            .copied()
+        else {
+            // Allow lookup to not have any peers and do nothing. This is an optimization to not
+            // lose progress of lookups created from a block with unknown parent before we receive
+            // attestations for said block.
+            // Lookup sync event safety: If a lookup requires peers to make progress, and does
+            // not receive any new peers for some time it will be dropped. If it receives a new
+            // peer it must attempt to make progress.
+            return Ok(LookupRequestResult::Pending("no peers"));
+        };
+
         let imported_blob_indexes = self
             .chain
             .data_availability_checker
@@ -740,6 +771,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         &mut self,
         lookup_id: SingleLookupId,
         block_root: Hash256,
+        lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
     ) -> Result<LookupRequestResult, RpcRequestSendError> {
         let custody_indexes_imported = self
             .chain
@@ -777,6 +809,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             block_root,
             CustodyId { requester },
             &custody_indexes_to_fetch,
+            lookup_peers,
             self.log.clone(),
         );
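
Both lookup entry points now share the same preamble: choose a random peer from the shared set, or park the request instead of failing it. A simplified sketch of that guard, assuming a pared-down `LookupRequestResult` (the real enum carries request ids and further variants) and `u64` standing in for `PeerId`:

use parking_lot::RwLock;
use rand::prelude::IteratorRandom;
use std::collections::HashSet;
use std::sync::Arc;

type PeerId = u64; // stand-in for lighthouse_network::PeerId

enum LookupRequestResult {
    RequestSent(PeerId),
    Pending(&'static str),
}

fn lookup_request(lookup_peers: &Arc<RwLock<HashSet<PeerId>>>) -> LookupRequestResult {
    // Hold the read guard only while choosing. An empty set is not an error:
    // the lookup stays pending until a peer is added or the lookup is dropped.
    let Some(peer_id) = lookup_peers
        .read()
        .iter()
        .choose(&mut rand::thread_rng())
        .copied()
    else {
        return LookupRequestResult::Pending("no peers");
    };
    LookupRequestResult::RequestSent(peer_id)
}

fn main() {
    let peers = Arc::new(RwLock::new(HashSet::new()));
    assert!(matches!(lookup_request(&peers), LookupRequestResult::Pending(_)));
    peers.write().insert(42);
    assert!(matches!(lookup_request(&peers), LookupRequestResult::RequestSent(42)));
}
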
19 changes: 14 additions & 5 deletions beacon_node/network/src/sync/network_context/custody.rs

@@ -7,8 +7,10 @@ use fnv::FnvHashMap;
 use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester};
 use lighthouse_network::PeerId;
 use lru_cache::LRUTimeCache;
+use parking_lot::RwLock;
 use rand::Rng;
 use slog::{debug, warn};
+use std::collections::HashSet;
 use std::time::{Duration, Instant};
 use std::{collections::HashMap, marker::PhantomData, sync::Arc};
 use types::EthSpec;
@@ -32,6 +34,7 @@ pub struct ActiveCustodyRequest<T: BeaconChainTypes> {
     /// Peers that have recently failed to successfully respond to a columns by root request.
     /// Having a LRUTimeCache allows this request to not have to track disconnecting peers.
     failed_peers: LRUTimeCache<PeerId>,
+    lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
     /// Logger for the `SyncNetworkContext`.
     pub log: slog::Logger,
     _phantom: PhantomData<T>,
@@ -64,6 +67,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
         block_root: Hash256,
         custody_id: CustodyId,
         column_indices: &[ColumnIndex],
+        lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
         log: slog::Logger,
     ) -> Self {
         Self {
@@ -76,6 +80,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
             ),
             active_batch_columns_requests: <_>::default(),
             failed_peers: LRUTimeCache::new(Duration::from_secs(FAILED_PEERS_CACHE_EXPIRY_SECONDS)),
+            lookup_peers,
             log,
             _phantom: PhantomData,
         }
@@ -215,6 +220,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
         }

         let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();
+        let lookup_peers = self.lookup_peers.read();

         // Need to:
         // - track how many active requests a peer has for load balancing
@@ -244,6 +250,8 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
                 .iter()
                 .map(|peer| {
                     (
+                        // Prioritize peers that claim to have imported this block
+                        if lookup_peers.contains(peer) { 0 } else { 1 },
                         // De-prioritize peers that have failed to successfully respond to
                         // requests recently
                         self.failed_peers.contains(peer),
@@ -257,7 +265,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
                 .collect::<Vec<_>>();
             priorized_peers.sort_unstable();

-            if let Some((_, _, _, peer_id)) = priorized_peers.first() {
+            if let Some((_, _, _, _, peer_id)) = priorized_peers.first() {
                 columns_to_request_by_peer
                     .entry(*peer_id)
                     .or_default()
@@ -283,10 +291,11 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
                     block_root: self.block_root,
                     indices: indices.clone(),
                 },
-                // true = enforce max_requests are returned data_columns_by_root. We only issue requests
-                // for blocks after we know the block has data, and only request peers after they claim to
-                // have imported the block+columns and claim to be custodians
-                true,
+                // If the peer is in the lookup peer set, it claims to have imported the block and
+                // must have its columns in custody. In that case, set `true = enforce max_requests`
+                // and downscore if data_columns_by_root does not return the expected custody
+                // columns. For the rest of the peers, don't downscore if columns are missing.
+                lookup_peers.contains(&peer_id),
             )
             .map_err(Error::SendFailed)?;
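
The custody path folds the new signal into its existing ranking tuple: because Rust tuples order lexicographically, a single `sort_unstable` applies "lookup peer first, failed peer last" (plus, in the real code, an active-request count and a random tiebreaker). A reduced sketch of the idea, with `u64` standing in for `PeerId`:

use std::collections::HashSet;

type PeerId = u64; // stand-in for lighthouse_network::PeerId

fn pick_peer(
    candidates: &[PeerId],
    lookup_peers: &HashSet<PeerId>,
    failed_peers: &HashSet<PeerId>,
) -> Option<PeerId> {
    let mut ranked = candidates
        .iter()
        .map(|peer| {
            (
                // 0 sorts before 1: peers that claim to have imported the block win.
                if lookup_peers.contains(peer) { 0 } else { 1 },
                // false sorts before true: recently failed peers lose.
                failed_peers.contains(peer),
                *peer,
            )
        })
        .collect::<Vec<_>>();
    // Lexicographic tuple ordering implements the whole policy in one sort.
    ranked.sort_unstable();
    ranked.first().map(|(_, _, peer)| *peer)
}

fn main() {
    let lookup_peers = HashSet::from([3]);
    let failed_peers = HashSet::from([1]);
    // Peer 3 wins despite the higher id: lookup membership is the first key.
    assert_eq!(pick_peer(&[1, 2, 3], &lookup_peers, &failed_peers), Some(3));
}

The same membership test drives the `enforce max_requests` flag above: only peers that claimed to have imported the block are downscored for returning fewer columns than requested.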
