From b328e5f6ece1dc8b0f6a9add75e0aee0c07c27d3 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Tue, 23 Apr 2019 17:41:21 -0700 Subject: [PATCH] Add genesis blockhashes to blobs (#3955) * Add genesis blockhashes to blobs * Update golden --- core/src/broadcast_stage.rs | 13 +++++++++++-- core/src/chacha.rs | 2 +- core/src/fullnode.rs | 3 +++ core/src/packet.rs | 36 ++++++++++++++++++++++++++++++++++-- core/src/replicator.rs | 1 + core/src/retransmit_stage.rs | 3 +++ core/src/tpu.rs | 3 +++ core/src/tvu.rs | 4 ++++ core/src/window_service.rs | 11 +++++++++-- core/tests/tvu.rs | 1 + 10 files changed, 70 insertions(+), 7 deletions(-) diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index c23227dc16..1e582f0b84 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -5,7 +5,7 @@ use crate::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT}; use crate::entry::{EntrySender, EntrySlice}; #[cfg(feature = "erasure")] use crate::erasure::CodingGenerator; -use crate::packet::index_blobs; +use crate::packet::index_blobs_with_genesis; use crate::poh_recorder::WorkingBankEntries; use crate::result::{Error, Result}; use crate::service::Service; @@ -13,6 +13,7 @@ use crate::staking_utils; use rayon::prelude::*; use solana_metrics::counter::Counter; use solana_metrics::{influxdb, submit}; +use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::duration_as_ms; use std::net::UdpSocket; @@ -42,6 +43,7 @@ impl Broadcast { sock: &UdpSocket, blocktree: &Arc, storage_entry_sender: &EntrySender, + genesis_blockhash: &Hash, ) -> Result<()> { let timer = Duration::new(1, 0); let (mut bank, entries) = receiver.recv_timeout(timer)?; @@ -103,9 +105,10 @@ impl Broadcast { .map(|meta| meta.consumed) .unwrap_or(0); - index_blobs( + index_blobs_with_genesis( &blobs, &self.id, + genesis_blockhash, blob_index, bank.slot(), bank.parent().map_or(0, |parent| parent.slot()), @@ -192,6 +195,7 @@ impl BroadcastStage { receiver: &Receiver, blocktree: &Arc, storage_entry_sender: EntrySender, + genesis_blockhash: &Hash, ) -> BroadcastStageReturnType { let me = cluster_info.read().unwrap().my_data().clone(); @@ -208,6 +212,7 @@ impl BroadcastStage { sock, blocktree, &storage_entry_sender, + genesis_blockhash, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => { @@ -247,9 +252,11 @@ impl BroadcastStage { exit_sender: &Arc, blocktree: &Arc, storage_entry_sender: EntrySender, + genesis_blockhash: &Hash, ) -> Self { let blocktree = blocktree.clone(); let exit_sender = exit_sender.clone(); + let genesis_blockhash = *genesis_blockhash; let thread_hdl = Builder::new() .name("solana-broadcaster".to_string()) .spawn(move || { @@ -260,6 +267,7 @@ impl BroadcastStage { &receiver, &blocktree, storage_entry_sender, + &genesis_blockhash, ) }) .unwrap(); @@ -331,6 +339,7 @@ mod test { &exit_sender, &blocktree, storage_sender, + &Hash::default(), ); MockBroadcastStage { diff --git a/core/src/chacha.rs b/core/src/chacha.rs index 09eb03d78c..87614717c1 100644 --- a/core/src/chacha.rs +++ b/core/src/chacha.rs @@ -164,7 +164,7 @@ mod tests { use bs58; // golden needs to be updated if blob stuff changes.... let golden = Hash::new( - &bs58::decode("5NBn4cBZmNZRftkjxj3um8W1eyYPzn2RgUJSA3SVbHaJ") + &bs58::decode("F5zEvoozmV87cgS1sfG226nYSJoupRjKA93QUQiWd31E") .into_vec() .unwrap(), ); diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index db4a76789f..d80343cafa 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -100,6 +100,7 @@ impl Fullnode { let exit = Arc::new(AtomicBool::new(false)); let bank_info = &bank_forks_info[0]; let bank = bank_forks[bank_info.bank_slot].clone(); + let genesis_blockhash = bank.last_blockhash(); info!( "starting PoH... {} {}", @@ -232,6 +233,7 @@ impl Fullnode { sender.clone(), receiver, &exit, + &genesis_blockhash, ); let tpu = Tpu::new( &id, @@ -245,6 +247,7 @@ impl Fullnode { &blocktree, sender, &exit, + &genesis_blockhash, ); inc_new_counter_info!("fullnode-new", 1); diff --git a/core/src/packet.rs b/core/src/packet.rs index 1bf6f810eb..c6a7885d0a 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -5,6 +5,7 @@ use bincode; use byteorder::{ByteOrder, LittleEndian}; use serde::Serialize; use solana_metrics::counter::Counter; +use solana_sdk::hash::Hash; pub use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::pubkey::Pubkey; use std::borrow::Borrow; @@ -363,7 +364,8 @@ const SLOT_RANGE: std::ops::Range = range!(PARENT_RANGE.end, u64); const INDEX_RANGE: std::ops::Range = range!(SLOT_RANGE.end, u64); const ID_RANGE: std::ops::Range = range!(INDEX_RANGE.end, Pubkey); const FORWARD_RANGE: std::ops::Range = range!(ID_RANGE.end, bool); -const FLAGS_RANGE: std::ops::Range = range!(FORWARD_RANGE.end, u32); +const GENESIS_RANGE: std::ops::Range = range!(FORWARD_RANGE.end, Hash); +const FLAGS_RANGE: std::ops::Range = range!(GENESIS_RANGE.end, u32); const SIZE_RANGE: std::ops::Range = range!(FLAGS_RANGE.end, u64); macro_rules! align { @@ -439,6 +441,14 @@ impl Blob { self.data[FORWARD_RANGE][0] = u8::from(forward) } + pub fn set_genesis_blockhash(&mut self, blockhash: &Hash) { + self.data[GENESIS_RANGE].copy_from_slice(blockhash.as_ref()) + } + + pub fn genesis_blockhash(&self) -> Hash { + Hash::new(&self.data[GENESIS_RANGE]) + } + pub fn flags(&self) -> u32 { LittleEndian::read_u32(&self.data[FLAGS_RANGE]) } @@ -577,12 +587,24 @@ impl Blob { } } -pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut blob_index: u64, slot: u64, parent: u64) { +pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, blob_index: u64, slot: u64, parent: u64) { + index_blobs_with_genesis(blobs, id, &Hash::default(), blob_index, slot, parent) +} + +pub fn index_blobs_with_genesis( + blobs: &[SharedBlob], + id: &Pubkey, + genesis: &Hash, + mut blob_index: u64, + slot: u64, + parent: u64, +) { // enumerate all the blobs, those are the indices for blob in blobs.iter() { let mut blob = blob.write().unwrap(); blob.set_index(blob_index); + blob.set_genesis_blockhash(genesis); blob.set_slot(slot); blob.set_parent(parent); blob.set_id(id); @@ -835,4 +857,14 @@ mod tests { assert!(p1 != p2); } + #[test] + fn test_blob_genesis_blockhash() { + let mut blob = Blob::default(); + assert_eq!(blob.genesis_blockhash(), Hash::default()); + + let hash = Hash::new(&Pubkey::new_rand().as_ref()); + blob.set_genesis_blockhash(&hash); + assert_eq!(blob.genesis_blockhash(), hash); + } + } diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 3e5702f2de..f4d6f49a31 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -246,6 +246,7 @@ impl Replicator { repair_socket, &exit, Some(repair_slot_range), + &Hash::default(), ); let client = create_client(cluster_entrypoint.client_facing_addr(), FULLNODE_PORT_RANGE); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 5822a43b37..b1ea4369ed 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -14,6 +14,7 @@ use crate::streamer::BlobReceiver; use crate::window_service::WindowService; use solana_metrics::counter::Counter; use solana_metrics::{influxdb, submit}; +use solana_sdk::hash::Hash; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; @@ -115,6 +116,7 @@ impl RetransmitStage { repair_socket: Arc, fetch_stage_receiver: BlobReceiver, exit: &Arc, + genesis_blockhash: &Hash, ) -> Self { let (retransmit_sender, retransmit_receiver) = channel(); @@ -133,6 +135,7 @@ impl RetransmitStage { repair_socket, exit, None, + genesis_blockhash, ); let thread_hdls = vec![t_retransmit]; diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 3962f83475..23840ee1c4 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -11,6 +11,7 @@ use crate::fetch_stage::FetchStage; use crate::poh_recorder::{PohRecorder, WorkingBankEntries}; use crate::service::Service; use crate::sigverify_stage::SigVerifyStage; +use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; @@ -40,6 +41,7 @@ impl Tpu { blocktree: &Arc, storage_entry_sender: EntrySender, exit: &Arc, + genesis_blockhash: &Hash, ) -> Self { cluster_info.write().unwrap().set_leader(id); @@ -72,6 +74,7 @@ impl Tpu { &exit, blocktree, storage_entry_sender, + genesis_blockhash, ); Self { diff --git a/core/src/tvu.rs b/core/src/tvu.rs index c8ea5a7ce6..f0e2865773 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -25,6 +25,7 @@ use crate::retransmit_stage::RetransmitStage; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; use crate::storage_stage::{StorageStage, StorageState}; +use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::net::UdpSocket; @@ -72,6 +73,7 @@ impl Tvu { storage_entry_sender: EntrySender, storage_entry_receiver: EntryReceiver, exit: &Arc, + genesis_blockhash: &Hash, ) -> Self where T: 'static + KeypairUtil + Sync + Send, @@ -107,6 +109,7 @@ impl Tvu { repair_socket, blob_fetch_receiver, &exit, + genesis_blockhash, ); let (replay_stage, slot_full_receiver) = ReplayStage::new( @@ -237,6 +240,7 @@ pub mod tests { storage_entry_sender, storage_entry_receiver, &exit, + &Hash::default(), ); exit.store(true, Ordering::Relaxed); tvu.join().unwrap(); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index d62f289b0e..1c7fe6827c 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -12,6 +12,7 @@ use crate::service::Service; use crate::streamer::{BlobReceiver, BlobSender}; use solana_metrics::counter::Counter; use solana_runtime::bank::Bank; +use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::duration_as_ms; use std::net::UdpSocket; @@ -91,6 +92,7 @@ fn recv_window( my_id: &Pubkey, r: &BlobReceiver, retransmit: &BlobSender, + genesis_blockhash: &Hash, ) -> Result<()> { let timer = Duration::from_millis(200); let mut blobs = r.recv_timeout(timer)?; @@ -108,7 +110,7 @@ fn recv_window( .map(|bank_forks| bank_forks.read().unwrap().working_bank()) .as_ref(), my_id, - ) + ) && blob.read().unwrap().genesis_blockhash() == *genesis_blockhash }); retransmit_blobs(&blobs, retransmit, my_id)?; @@ -149,6 +151,7 @@ pub struct WindowService { } impl WindowService { + #[allow(clippy::too_many_arguments)] pub fn new( bank_forks: Option>>, blocktree: Arc, @@ -158,6 +161,7 @@ impl WindowService { repair_socket: Arc, exit: &Arc, repair_slot_range: Option, + genesis_blockhash: &Hash, ) -> WindowService { let repair_service = RepairService::new( blocktree.clone(), @@ -168,6 +172,7 @@ impl WindowService { ); let exit = exit.clone(); let bank_forks = bank_forks.clone(); + let hash = *genesis_blockhash; let t_window = Builder::new() .name("solana-window".to_string()) .spawn(move || { @@ -179,7 +184,7 @@ impl WindowService { break; } if let Err(e) = - recv_window(bank_forks.as_ref(), &blocktree, &id, &r, &retransmit) + recv_window(bank_forks.as_ref(), &blocktree, &id, &r, &retransmit, &hash) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, @@ -324,6 +329,7 @@ mod test { Arc::new(leader_node.sockets.repair), &exit, None, + &Hash::default(), ); let t_responder = { let (s_responder, r_responder) = channel(); @@ -400,6 +406,7 @@ mod test { Arc::new(leader_node.sockets.repair), &exit, None, + &Hash::default(), ); let t_responder = { let (s_responder, r_responder) = channel(); diff --git a/core/tests/tvu.rs b/core/tests/tvu.rs index f72f4f4c1b..da28b50dcd 100644 --- a/core/tests/tvu.rs +++ b/core/tests/tvu.rs @@ -127,6 +127,7 @@ fn test_replay() { storage_sender, storage_receiver, &exit, + &solana_sdk::hash::Hash::default(), ); let mut mint_ref_balance = starting_mint_balance;