Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix custodial peer assumption on lookup custody requests #6815

Merged
merged 2 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use lighthouse_network::service::api_types::Id;
use parking_lot::RwLock;
use std::collections::HashSet;
use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{DataColumnSidecarList, SignedBeaconBlock};
Expand Down Expand Up @@ -41,7 +43,7 @@ pub trait RequestState<T: BeaconChainTypes> {
fn make_request(
&self,
id: Id,
peer_id: PeerId,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
expected_blobs: usize,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError>;
Expand Down Expand Up @@ -76,11 +78,11 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
fn make_request(
&self,
id: SingleLookupId,
peer_id: PeerId,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
_: usize,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.block_lookup_request(id, peer_id, self.requested_block_root)
cx.block_lookup_request(id, lookup_peers, self.requested_block_root)
.map_err(LookupRequestError::SendFailedNetwork)
}

Expand Down Expand Up @@ -124,11 +126,11 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
fn make_request(
&self,
id: Id,
peer_id: PeerId,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
expected_blobs: usize,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.blob_lookup_request(id, peer_id, self.block_root, expected_blobs)
cx.blob_lookup_request(id, lookup_peers, self.block_root, expected_blobs)
.map_err(LookupRequestError::SendFailedNetwork)
}

Expand Down Expand Up @@ -172,12 +174,11 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
fn make_request(
&self,
id: Id,
// TODO(das): consider selecting peers that have custody but are in this set
_peer_id: PeerId,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
_: usize,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.custody_lookup_request(id, self.block_root)
cx.custody_lookup_request(id, self.block_root, lookup_peers)
.map_err(LookupRequestError::SendFailedNetwork)
}

Expand Down
13 changes: 3 additions & 10 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub(crate) fn active_single_lookups(&self) -> Vec<BlockLookupSummary> {
self.single_block_lookups
.iter()
.map(|(id, l)| {
(
*id,
l.block_root(),
l.awaiting_parent(),
l.all_peers().copied().collect(),
)
})
.map(|(id, l)| (*id, l.block_root(), l.awaiting_parent(), l.all_peers()))
.collect()
}

Expand Down Expand Up @@ -283,7 +276,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
.find(|(_, l)| l.block_root() == parent_chain_tip)
{
cx.send_sync_message(SyncMessage::AddPeersForceRangeSync {
peers: lookup.all_peers().copied().collect(),
peers: lookup.all_peers(),
head_slot: tip_lookup.peek_downloaded_block_slot(),
head_root: parent_chain_tip,
});
Expand Down Expand Up @@ -682,7 +675,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
lookup.continue_requests(cx)
}
Action::ParentUnknown { parent_root } => {
let peers = lookup.all_peers().copied().collect::<Vec<_>>();
let peers = lookup.all_peers();
lookup.set_awaiting_parent(parent_root);
debug!(self.log, "Marking lookup as awaiting parent"; "id" => lookup.id, "block_root" => ?block_root, "parent_root" => ?parent_root);
self.search_parent_of_child(parent_root, block_root, &peers, cx);
Expand Down
49 changes: 16 additions & 33 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::sync::network_context::{
use beacon_chain::{BeaconChainTypes, BlockProcessStatus};
use derivative::Derivative;
use lighthouse_network::service::api_types::Id;
use rand::seq::IteratorRandom;
use parking_lot::RwLock;
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;
Expand All @@ -33,8 +33,6 @@ pub enum LookupRequestError {
/// The failed attempts were primarily due to processing failures.
cannot_process: bool,
},
/// No peers left to serve this lookup
NoPeers,
/// Error sending event to network
SendFailedNetwork(RpcRequestSendError),
/// Error sending event to processor
Expand Down Expand Up @@ -63,9 +61,12 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
pub id: Id,
pub block_request_state: BlockRequestState<T::EthSpec>,
pub component_requests: ComponentRequests<T::EthSpec>,
/// Peers that claim to have imported this set of block components
/// Peers that claim to have imported this set of block components. This state is shared with
/// the custody request to have an updated view of the peers that claim to have imported the
/// block associated with this lookup. The peer set of a lookup can change rapidly, and faster
/// than the lifetime of a custody request.
#[derivative(Debug(format_with = "fmt_peer_set_as_len"))]
peers: HashSet<PeerId>,
peers: Arc<RwLock<HashSet<PeerId>>>,
block_root: Hash256,
awaiting_parent: Option<Hash256>,
created: Instant,
Expand All @@ -92,7 +93,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
id,
block_request_state: BlockRequestState::new(requested_block_root),
component_requests: ComponentRequests::WaitingForBlock,
peers: HashSet::from_iter(peers.iter().copied()),
peers: Arc::new(RwLock::new(HashSet::from_iter(peers.iter().copied()))),
block_root: requested_block_root,
awaiting_parent,
created: Instant::now(),
Expand Down Expand Up @@ -283,24 +284,11 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
return Err(LookupRequestError::TooManyAttempts { cannot_process });
}

let Some(peer_id) = self.use_rand_available_peer() else {
// Allow lookup to not have any peers and do nothing. This is an optimization to not
// lose progress of lookups created from a block with unknown parent before we receive
// attestations for said block.
// Lookup sync event safety: If a lookup requires peers to make progress, and does
// not receive any new peers for some time it will be dropped. If it receives a new
// peer it must attempt to make progress.
R::request_state_mut(self)
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?
.get_state_mut()
.update_awaiting_download_status("no peers");
return Ok(());
};

let peers = self.peers.clone();
let request = R::request_state_mut(self)
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?;

match request.make_request(id, peer_id, expected_blobs, cx)? {
match request.make_request(id, peers, expected_blobs, cx)? {
LookupRequestResult::RequestSent(req_id) => {
// Lookup sync event safety: If make_request returns `RequestSent`, we are
// guaranteed that `BlockLookups::on_download_response` will be called exactly
Expand Down Expand Up @@ -348,29 +336,24 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}

/// Get all unique peers that claim to have imported this set of block components
pub fn all_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.peers.iter()
pub fn all_peers(&self) -> Vec<PeerId> {
self.peers.read().iter().copied().collect()
}

/// Add peer to all request states. The peer must be able to serve this request.
/// Returns true if the peer was newly inserted into some request state.
pub fn add_peer(&mut self, peer_id: PeerId) -> bool {
self.peers.insert(peer_id)
self.peers.write().insert(peer_id)
}

/// Remove peer from available peers.
pub fn remove_peer(&mut self, peer_id: &PeerId) {
self.peers.remove(peer_id);
self.peers.write().remove(peer_id);
}

/// Returns true if this lookup has zero peers
pub fn has_no_peers(&self) -> bool {
self.peers.is_empty()
}

/// Selects a random peer from available peers if any
fn use_rand_available_peer(&mut self) -> Option<PeerId> {
self.peers.iter().choose(&mut rand::thread_rng()).copied()
self.peers.read().is_empty()
}
}

Expand Down Expand Up @@ -689,8 +672,8 @@ impl<T: Clone> std::fmt::Debug for State<T> {
}

fn fmt_peer_set_as_len(
peer_set: &HashSet<PeerId>,
peer_set: &Arc<RwLock<HashSet<PeerId>>>,
f: &mut std::fmt::Formatter,
) -> Result<(), std::fmt::Error> {
write!(f, "{}", peer_set.len())
write!(f, "{}", peer_set.read().len())
}
41 changes: 37 additions & 4 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use lighthouse_network::service::api_types::{
DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId,
};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource};
use rand::seq::SliceRandom;
use parking_lot::RwLock;
use rand::prelude::IteratorRandom;
use rand::thread_rng;
pub use requests::LookupVerifyError;
use requests::{
Expand Down Expand Up @@ -308,8 +309,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {

pub fn get_random_custodial_peer(&self, column_index: ColumnIndex) -> Option<PeerId> {
self.get_custodial_peers(column_index)
.into_iter()
.choose(&mut thread_rng())
.cloned()
}

pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
Expand Down Expand Up @@ -562,9 +563,24 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn block_lookup_request(
&mut self,
lookup_id: SingleLookupId,
peer_id: PeerId,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
block_root: Hash256,
) -> Result<LookupRequestResult, RpcRequestSendError> {
let Some(peer_id) = lookup_peers
.read()
.iter()
.choose(&mut rand::thread_rng())
.copied()
else {
// Allow lookup to not have any peers and do nothing. This is an optimization to not
// lose progress of lookups created from a block with unknown parent before we receive
// attestations for said block.
// Lookup sync event safety: If a lookup requires peers to make progress, and does
// not receive any new peers for some time it will be dropped. If it receives a new
// peer it must attempt to make progress.
return Ok(LookupRequestResult::Pending("no peers"));
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
};

match self.chain.get_block_process_status(&block_root) {
// Unknown block, continue request to download
BlockProcessStatus::Unknown => {}
Expand Down Expand Up @@ -634,10 +650,25 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn blob_lookup_request(
&mut self,
lookup_id: SingleLookupId,
peer_id: PeerId,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
block_root: Hash256,
expected_blobs: usize,
) -> Result<LookupRequestResult, RpcRequestSendError> {
let Some(peer_id) = lookup_peers
.read()
.iter()
.choose(&mut rand::thread_rng())
.copied()
else {
// Allow lookup to not have any peers and do nothing. This is an optimization to not
// lose progress of lookups created from a block with unknown parent before we receive
// attestations for said block.
// Lookup sync event safety: If a lookup requires peers to make progress, and does
// not receive any new peers for some time it will be dropped. If it receives a new
// peer it must attempt to make progress.
return Ok(LookupRequestResult::Pending("no peers"));
};

let imported_blob_indexes = self
.chain
.data_availability_checker
Expand Down Expand Up @@ -740,6 +771,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&mut self,
lookup_id: SingleLookupId,
block_root: Hash256,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
) -> Result<LookupRequestResult, RpcRequestSendError> {
let custody_indexes_imported = self
.chain
Expand Down Expand Up @@ -777,6 +809,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
block_root,
CustodyId { requester },
&custody_indexes_to_fetch,
lookup_peers,
self.log.clone(),
);

Expand Down
20 changes: 15 additions & 5 deletions beacon_node/network/src/sync/network_context/custody.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use fnv::FnvHashMap;
use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester};
use lighthouse_network::PeerId;
use lru_cache::LRUTimeCache;
use parking_lot::RwLock;
use rand::Rng;
use slog::{debug, warn};
use std::collections::HashSet;
use std::time::{Duration, Instant};
use std::{collections::HashMap, marker::PhantomData, sync::Arc};
use types::EthSpec;
Expand All @@ -32,6 +34,8 @@ pub struct ActiveCustodyRequest<T: BeaconChainTypes> {
/// Peers that have recently failed to successfully respond to a columns by root request.
/// Having a LRUTimeCache allows this request to not have to track disconnecting peers.
failed_peers: LRUTimeCache<PeerId>,
/// Set of peers that claim to have imported this block and their custody columns
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
/// Logger for the `SyncNetworkContext`.
pub log: slog::Logger,
_phantom: PhantomData<T>,
Expand Down Expand Up @@ -64,6 +68,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
block_root: Hash256,
custody_id: CustodyId,
column_indices: &[ColumnIndex],
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
log: slog::Logger,
) -> Self {
Self {
Expand All @@ -76,6 +81,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
),
active_batch_columns_requests: <_>::default(),
failed_peers: LRUTimeCache::new(Duration::from_secs(FAILED_PEERS_CACHE_EXPIRY_SECONDS)),
lookup_peers,
log,
_phantom: PhantomData,
}
Expand Down Expand Up @@ -215,6 +221,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
}

let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();
let lookup_peers = self.lookup_peers.read();

// Need to:
// - track how many active requests a peer has for load balancing
Expand Down Expand Up @@ -244,6 +251,8 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
.iter()
.map(|peer| {
(
// Prioritize peers that claim to know have imported this block
if lookup_peers.contains(peer) { 0 } else { 1 },
// De-prioritize peers that have failed to successfully respond to
// requests recently
self.failed_peers.contains(peer),
Expand All @@ -257,7 +266,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
.collect::<Vec<_>>();
priorized_peers.sort_unstable();

if let Some((_, _, _, peer_id)) = priorized_peers.first() {
if let Some((_, _, _, _, peer_id)) = priorized_peers.first() {
columns_to_request_by_peer
.entry(*peer_id)
.or_default()
Expand All @@ -283,10 +292,11 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
block_root: self.block_root,
indices: indices.clone(),
},
// true = enforce max_requests are returned data_columns_by_root. We only issue requests
// for blocks after we know the block has data, and only request peers after they claim to
// have imported the block+columns and claim to be custodians
true,
// If peer is in the lookup peer set, it claims to have imported the block and
// must have its columns in custody. In that case, set `true = enforce max_requests`
// and downscore if data_columns_by_root does not returned the expected custody
// columns. For the rest of peers, don't downscore if columns are missing.
lookup_peers.contains(&peer_id),
)
.map_err(Error::SendFailed)?;

Expand Down
Loading