From 3a20a20807335d43ee02c89402ef691449bfb4b1 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Wed, 27 Feb 2019 13:37:08 -0800 Subject: [PATCH] Reintroduce leader_id to blobs (#2986) --- src/blocktree.rs | 9 ++++--- src/broadcast_service.rs | 2 +- src/chacha.rs | 2 +- src/db_window.rs | 55 +++++++--------------------------------- src/entry.rs | 2 ++ src/erasure.rs | 9 +++++-- src/fullnode.rs | 15 +++++++---- src/packet.rs | 17 +++++++++++-- src/replicator.rs | 3 +-- src/retransmit_stage.rs | 1 - src/window_service.rs | 40 +++++++++++------------------ tests/tvu.rs | 2 +- 12 files changed, 67 insertions(+), 90 deletions(-) diff --git a/src/blocktree.rs b/src/blocktree.rs index 73f9590506..11a63c163e 100644 --- a/src/blocktree.rs +++ b/src/blocktree.rs @@ -1253,7 +1253,7 @@ pub fn create_new_ledger(ledger_path: &str, genesis_block: &GenesisBlock) -> Res Ok(entries.last().unwrap().id) } -pub fn genesis<'a, I>(ledger_path: &str, entries: I) -> Result<()> +pub fn genesis<'a, I>(ledger_path: &str, keypair: &Keypair, entries: I) -> Result<()> where I: IntoIterator, { @@ -1267,6 +1267,7 @@ where let mut b = entry.borrow().to_blob(); b.set_index(idx as u64); b.forward(true); + b.set_id(&keypair.pubkey()); b.set_slot(0); b }) @@ -1486,7 +1487,7 @@ pub mod tests { fn test_read_blobs_bytes() { let shared_blobs = make_tiny_test_entries(10).to_shared_blobs(); let slot = 0; - index_blobs(&shared_blobs, &mut 0, slot); + index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, slot); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); @@ -1850,7 +1851,7 @@ pub mod tests { let ledger_path = get_tmp_ledger_path("test_genesis_and_entry_iterator"); { - genesis(&ledger_path, &entries).unwrap(); + genesis(&ledger_path, &Keypair::new(), &entries).unwrap(); let ledger = Blocktree::open(&ledger_path).expect("open failed"); @@ -1868,7 +1869,7 @@ pub mod tests { let ledger_path = get_tmp_ledger_path("test_genesis_and_entry_iterator"); { // put entries except last 2 into ledger - genesis(&ledger_path, &entries[..entries.len() - 2]).unwrap(); + genesis(&ledger_path, &Keypair::new(), &entries[..entries.len() - 2]).unwrap(); let ledger = Blocktree::open(&ledger_path).expect("open failed"); diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs index 1f77196511..f3647d2171 100644 --- a/src/broadcast_service.rs +++ b/src/broadcast_service.rs @@ -74,7 +74,7 @@ impl Broadcast { .collect(); // TODO: blob_index should be slot-relative... - index_blobs(&blobs, &mut self.blob_index, slot_height); + index_blobs(&blobs, &self.id, &mut self.blob_index, slot_height); let parent = { if slot_height == 0 { 0 diff --git a/src/chacha.rs b/src/chacha.rs index 6234c1c7d1..4c557c759f 100644 --- a/src/chacha.rs +++ b/src/chacha.rs @@ -161,7 +161,7 @@ mod tests { use bs58; // golden needs to be updated if blob stuff changes.... let golden = Hash::new( - &bs58::decode("3hY6c5V5R6ho3k5KBYkDg2nZTtkuBpvu9n421nGntHrg") + &bs58::decode("BCNVsE19CCpsvGseZTCEEM1qSiX1ridku2w155VveqEu") .into_vec() .unwrap(), ); diff --git a/src/db_window.rs b/src/db_window.rs index 5b2515ee44..fa55372d35 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -1,47 +1,26 @@ //! Set of functions for emulating windowing functions from a database ledger implementation -use crate::bank_forks::BankForks; use crate::blocktree::*; #[cfg(feature = "erasure")] use crate::erasure; -use crate::leader_scheduler::LeaderScheduler; use crate::packet::{SharedBlob, BLOB_HEADER_SIZE}; use crate::result::Result; use crate::streamer::BlobSender; use solana_metrics::counter::Counter; -use solana_metrics::{influxdb, submit}; use solana_sdk::pubkey::Pubkey; use std::borrow::Borrow; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; pub const MAX_REPAIR_LENGTH: usize = 128; -pub fn retransmit_blobs( - dq: &[SharedBlob], - bank_forks: &Arc>, - retransmit: &BlobSender, - id: &Pubkey, -) -> Result<()> { +pub fn retransmit_blobs(dq: &[SharedBlob], retransmit: &BlobSender, id: &Pubkey) -> Result<()> { let mut retransmit_queue: Vec = Vec::new(); - let leader_scheduler = LeaderScheduler::default(); for b in dq { - let bank = bank_forks.read().unwrap().working_bank(); // Don't add blobs generated by this node to the retransmit queue - let slot = b.read().unwrap().slot(); - if leader_scheduler.slot_leader_at(slot, &bank) != *id { + if b.read().unwrap().id() != *id { retransmit_queue.push(b.clone()); } } - //todo maybe move this to where retransmit is actually happening - submit( - influxdb::Point::new("retransmit-queue") - .add_field( - "count", - influxdb::Value::Integer(retransmit_queue.len() as i64), - ) - .to_owned(), - ); - if !retransmit_queue.is_empty() { inc_new_counter_info!("streamer-recv_window-retransmit", retransmit_queue.len()); retransmit.send(retransmit_queue)?; @@ -50,11 +29,7 @@ pub fn retransmit_blobs( } /// Process a blob: Add blob to the ledger window. -pub fn process_blob( - bank_forks: &Arc>, - blocktree: &Arc, - blob: &SharedBlob, -) -> Result<()> { +pub fn process_blob(blocktree: &Arc, blob: &SharedBlob) -> Result<()> { let is_coding = blob.read().unwrap().is_coding(); // Check if the blob is in the range of our known leaders. If not, we return. @@ -62,8 +37,6 @@ pub fn process_blob( let r_blob = blob.read().unwrap(); (r_blob.slot(), r_blob.index()) }; - let bank = bank_forks.read().unwrap().working_bank(); - let _leader = LeaderScheduler::default().slot_leader_at(slot, &bank); // TODO: Once the original leader signature is added to the blob, make sure that // the blob was originally generated by the expected leader for this slot @@ -124,14 +97,13 @@ mod test { use crate::erasure::{NUM_CODING, NUM_DATA}; use crate::packet::{index_blobs, Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE}; use crate::streamer::{receiver, responder, PacketReceiver}; - use solana_runtime::bank::Bank; - use solana_sdk::genesis_block::GenesisBlock; + use solana_sdk::signature::{Keypair, KeypairUtil}; use std::io; use std::io::Write; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; - use std::sync::{Arc, RwLock}; + use std::sync::Arc; use std::time::Duration; fn get_msgs(r: PacketReceiver, num: &mut usize) { @@ -448,7 +420,7 @@ mod test { let num_entries = 10; let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs(); - index_blobs(&shared_blobs, &mut 0, slot); + index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, slot); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); @@ -534,23 +506,14 @@ mod test { fn test_process_blob() { let blocktree_path = get_tmp_ledger_path!(); let blocktree = Arc::new(Blocktree::open(&blocktree_path).unwrap()); - - let (genesis_block, _) = GenesisBlock::new(100); - let bank0 = Bank::new(&genesis_block); - let bank_id = 0; - let bank_forks = BankForks::new(bank_id, bank0); - let num_entries = 10; let original_entries = make_tiny_test_entries(num_entries); let shared_blobs = original_entries.clone().to_shared_blobs(); - index_blobs(&shared_blobs, &mut 0, 0); - - let bank_forks = Arc::new(RwLock::new(bank_forks)); + index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, 0); for blob in shared_blobs.iter().rev() { - process_blob(&bank_forks, &blocktree, blob) - .expect("Expect successful processing of blob"); + process_blob(&blocktree, blob).expect("Expect successful processing of blob"); } assert_eq!( diff --git a/src/entry.rs b/src/entry.rs index ed38d3ad9d..1af4d74925 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -461,6 +461,7 @@ pub fn make_large_test_entries(num_entries: usize) -> Vec { #[cfg(test)] pub fn make_consecutive_blobs( + id: &Pubkey, num_blobs_to_make: u64, start_height: u64, start_hash: Hash, @@ -473,6 +474,7 @@ pub fn make_consecutive_blobs( for blob in &blobs { let mut blob = blob.write().unwrap(); blob.set_index(index); + blob.set_id(id); blob.forward(true); blob.meta.set_addr(addr); index += 1; diff --git a/src/erasure.rs b/src/erasure.rs index 9307896cb0..7e1e3cc0e9 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -329,6 +329,7 @@ impl CodingGenerator { for data_blob in &data_locks[NUM_DATA - NUM_CODING..NUM_DATA] { let index = data_blob.index(); let slot = data_blob.slot(); + let id = data_blob.id(); let should_forward = data_blob.should_forward(); let coding_blob = SharedBlob::default(); @@ -336,6 +337,7 @@ impl CodingGenerator { let mut coding_blob = coding_blob.write().unwrap(); coding_blob.set_index(index); coding_blob.set_slot(slot); + coding_blob.set_id(&id); coding_blob.forward(should_forward); coding_blob.set_size(max_data_size); coding_blob.set_coding(); @@ -509,6 +511,7 @@ pub mod test { use crate::window::WindowSlot; use rand::{thread_rng, Rng}; use solana_sdk::pubkey::Pubkey; + use solana_sdk::signature::{Keypair, KeypairUtil}; use std::sync::Arc; #[test] @@ -761,6 +764,7 @@ pub mod test { let index = data_rl.index(); let slot = data_rl.slot(); + let id = data_rl.id(); let should_forward = data_rl.should_forward(); trace!( @@ -771,6 +775,7 @@ pub mod test { ); coding_wl.set_index(index); coding_wl.set_slot(slot); + coding_wl.set_id(&id); coding_wl.forward(should_forward); } coding_wl.set_size(max_data_size); @@ -889,7 +894,7 @@ pub mod test { } // Make some dummy slots - index_blobs(&blobs, &mut (offset as u64), slot); + index_blobs(&blobs, &Keypair::new().pubkey(), &mut (offset as u64), slot); for b in blobs { let idx = b.read().unwrap().index() as usize % WINDOW_SIZE; @@ -902,7 +907,7 @@ pub mod test { fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec { let blobs = make_tiny_test_entries(num_blobs).to_shared_blobs(); - index_blobs(&blobs, &mut (offset as u64), 0); + index_blobs(&blobs, &Keypair::new().pubkey(), &mut (offset as u64), 0); blobs } diff --git a/src/fullnode.rs b/src/fullnode.rs index c6ebbed9de..bc382140d0 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -722,11 +722,16 @@ mod tests { let tvu_address = &validator_info.tvu; - let msgs = - make_consecutive_blobs(blobs_to_send, ledger_initial_len, last_id, &tvu_address) - .into_iter() - .rev() - .collect(); + let msgs = make_consecutive_blobs( + &leader_id, + blobs_to_send, + ledger_initial_len, + last_id, + &tvu_address, + ) + .into_iter() + .rev() + .collect(); s_responder.send(msgs).expect("send"); t_responder }; diff --git a/src/packet.rs b/src/packet.rs index abee26f5f5..b8807be4f9 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -6,6 +6,7 @@ use byteorder::{ByteOrder, LittleEndian}; use serde::Serialize; use solana_metrics::counter::Counter; pub use solana_sdk::packet::PACKET_DATA_SIZE; +use solana_sdk::pubkey::Pubkey; use std::cmp; use std::fmt; use std::io; @@ -278,7 +279,8 @@ macro_rules! range { const PARENT_RANGE: std::ops::Range = range!(0, u64); const SLOT_RANGE: std::ops::Range = range!(PARENT_RANGE.end, u64); const INDEX_RANGE: std::ops::Range = range!(SLOT_RANGE.end, u64); -const FORWARD_RANGE: std::ops::Range = range!(INDEX_RANGE.end, bool); +const ID_RANGE: std::ops::Range = range!(INDEX_RANGE.end, Pubkey); +const FORWARD_RANGE: std::ops::Range = range!(ID_RANGE.end, bool); const FLAGS_RANGE: std::ops::Range = range!(FORWARD_RANGE.end, u32); const SIZE_RANGE: std::ops::Range = range!(FLAGS_RANGE.end, u64); @@ -323,6 +325,16 @@ impl Blob { LittleEndian::write_u64(&mut self.data[INDEX_RANGE], ix); } + /// sender id, we use this for identifying if its a blob from the leader that we should + /// retransmit. eventually blobs should have a signature that we can use for spam filtering + pub fn id(&self) -> Pubkey { + Pubkey::new(&self.data[ID_RANGE]) + } + + pub fn set_id(&mut self, id: &Pubkey) { + self.data[ID_RANGE].copy_from_slice(id.as_ref()) + } + /// Used to determine whether or not this blob should be forwarded in retransmit /// A bool is used here instead of a flag because this item is not intended to be signed when /// blob signatures are introduced @@ -451,13 +463,14 @@ impl Blob { } } -pub fn index_blobs(blobs: &[SharedBlob], blob_index: &mut u64, slot: u64) { +pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, blob_index: &mut u64, slot: u64) { // enumerate all the blobs, those are the indices for blob in blobs.iter() { let mut blob = blob.write().unwrap(); blob.set_index(*blob_index); blob.set_slot(slot); + blob.set_id(id); blob.forward(true); *blob_index += 1; } diff --git a/src/replicator.rs b/src/replicator.rs index 5c09c518d2..82926569a2 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -138,7 +138,7 @@ impl Replicator { let genesis_block = GenesisBlock::load(ledger_path).expect("Expected to successfully open genesis block"); - let (bank_forks, _bank_forks_info) = + let (_bank_forks, _bank_forks_info) = blocktree_processor::process_blocktree(&genesis_block, &blocktree, None) .expect("process_blocktree failed"); @@ -183,7 +183,6 @@ impl Replicator { blob_fetch_receiver, retransmit_sender, repair_socket, - Arc::new(RwLock::new(bank_forks)), exit.clone(), ); diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index 9de6671f74..2b4f020638 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -128,7 +128,6 @@ impl RetransmitStage { fetch_stage_receiver, retransmit_sender, repair_socket, - bank_forks.clone(), exit, ); diff --git a/src/window_service.rs b/src/window_service.rs index 627f4dad3c..e38825ec18 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -1,6 +1,5 @@ //! The `window_service` provides a thread for maintaining a window (tail of the ledger). //! -use crate::bank_forks::BankForks; use crate::blocktree::Blocktree; use crate::cluster_info::ClusterInfo; use crate::db_window::*; @@ -30,7 +29,6 @@ pub enum WindowServiceReturnType { fn recv_window( blocktree: &Arc, id: &Pubkey, - bank_forks: &Arc>, r: &BlobReceiver, retransmit: &BlobSender, ) -> Result<()> { @@ -49,7 +47,7 @@ fn recv_window( .to_owned(), ); - retransmit_blobs(&dq, bank_forks, retransmit, id)?; + retransmit_blobs(&dq, retransmit, id)?; //send a contiguous set of blocks trace!("{} num blobs received: {}", id, dq.len()); @@ -62,7 +60,7 @@ fn recv_window( trace!("{} window pix: {} size: {}", id, pix, meta_size); - let _ = process_blob(bank_forks, blocktree, &b); + let _ = process_blob(blocktree, &b); } trace!( @@ -104,7 +102,6 @@ impl WindowService { r: BlobReceiver, retransmit: BlobSender, repair_socket: Arc, - bank_forks: Arc>, exit: Arc, ) -> WindowService { let exit_ = exit.clone(); @@ -124,7 +121,7 @@ impl WindowService { if exit.load(Ordering::Relaxed) { break; } - if let Err(e) = recv_window(&blocktree, &id, &bank_forks, &r, &retransmit) { + if let Err(e) = recv_window(&blocktree, &id, &r, &retransmit) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), @@ -156,7 +153,6 @@ impl Service for WindowService { #[cfg(test)] mod test { - use crate::bank_forks::BankForks; use crate::blocktree::get_tmp_ledger_path; use crate::blocktree::Blocktree; use crate::cluster_info::{ClusterInfo, Node}; @@ -164,8 +160,6 @@ mod test { use crate::service::Service; use crate::streamer::{blob_receiver, responder}; use crate::window_service::WindowService; - use solana_runtime::bank::Bank; - use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; use std::fs::remove_dir_all; use std::net::UdpSocket; @@ -195,17 +189,12 @@ mod test { let blocktree = Arc::new( Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"), ); - let (genesis_block, _) = GenesisBlock::new(100); - let bank0 = Bank::new(&genesis_block); - let bank_id = 0; - let bank_forks = BankForks::new(bank_id, bank0); let t_window = WindowService::new( blocktree, subs, r_reader, s_retransmit, Arc::new(leader_node.sockets.repair), - Arc::new(RwLock::new(bank_forks)), exit.clone(), ); let t_responder = { @@ -216,11 +205,16 @@ mod test { let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); let num_blobs_to_make = 10; let gossip_address = &leader_node.info.gossip; - let msgs = - make_consecutive_blobs(num_blobs_to_make, 0, Hash::default(), &gossip_address) - .into_iter() - .rev() - .collect();; + let msgs = make_consecutive_blobs( + &me_id, + num_blobs_to_make, + 0, + Hash::default(), + &gossip_address, + ) + .into_iter() + .rev() + .collect();; s_responder.send(msgs).expect("send"); t_responder }; @@ -267,17 +261,12 @@ mod test { let blocktree = Arc::new( Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"), ); - let (genesis_block, _) = GenesisBlock::new(100); - let bank0 = Bank::new(&genesis_block); - let bank_id = 0; - let bank_forks = BankForks::new(bank_id, bank0); let t_window = WindowService::new( blocktree, subs.clone(), r_reader, s_retransmit, Arc::new(leader_node.sockets.repair), - Arc::new(RwLock::new(bank_forks)), exit.clone(), ); let t_responder = { @@ -286,7 +275,8 @@ mod test { leader_node.sockets.tvu.into_iter().map(Arc::new).collect(); let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); let mut msgs = Vec::new(); - let blobs = make_consecutive_blobs(14u64, 0, Hash::default(), &leader_node.info.gossip); + let blobs = + make_consecutive_blobs(&me_id, 14u64, 0, Hash::default(), &leader_node.info.gossip); for v in 0..10 { let i = 9 - v; diff --git a/tests/tvu.rs b/tests/tvu.rs index 0ec3723f27..76e8638957 100644 --- a/tests/tvu.rs +++ b/tests/tvu.rs @@ -159,7 +159,7 @@ fn test_replay() { let entries = vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2]; let blobs = entries.to_shared_blobs(); - index_blobs(&blobs, &mut blob_idx, 0); + index_blobs(&blobs, &leader.info.id, &mut blob_idx, 0); blobs .iter() .for_each(|b| b.write().unwrap().meta.set_addr(&tvu_addr));