diff --git a/core/src/replicator.rs b/core/src/replicator.rs
index 0adc3f3d42..55e0eba17d 100644
--- a/core/src/replicator.rs
+++ b/core/src/replicator.rs
@@ -2,8 +2,9 @@ use crate::blob_fetch_stage::BlobFetchStage;
 use crate::blocktree::Blocktree;
 #[cfg(feature = "chacha")]
 use crate::chacha::{chacha_cbc_encrypt_ledger, CHACHA_BLOCK_SIZE};
-use crate::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE};
+use crate::cluster_info::{ClusterInfo, Node};
 use crate::contact_info::ContactInfo;
+use crate::fullnode::new_banks_from_blocktree;
 use crate::gossip_service::GossipService;
 use crate::packet::to_shared_blob;
 use crate::repair_service::{RepairSlotRange, RepairStrategy};
@@ -18,7 +19,7 @@ use rand::thread_rng;
 use rand::Rng;
 use solana_client::rpc_client::RpcClient;
 use solana_client::rpc_request::RpcRequest;
-use solana_client::thin_client::{create_client, ThinClient};
+use solana_client::thin_client::ThinClient;
 use solana_ed25519_dalek as ed25519_dalek;
 use solana_sdk::client::{AsyncClient, SyncClient};
 use solana_sdk::hash::{Hash, Hasher};
@@ -64,7 +65,7 @@ pub struct Replicator {
     keypair: Arc<Keypair>,
     storage_keypair: Arc<Keypair>,
     signature: ed25519_dalek::Signature,
-    cluster_entrypoint: ContactInfo,
+    client: ThinClient,
     ledger_data_file_encrypted: PathBuf,
     sampling_offsets: Vec<u64>,
     hash: Hash,
@@ -188,15 +189,15 @@ impl Replicator {
         cluster_info.set_entrypoint(cluster_entrypoint.clone());
         let cluster_info = Arc::new(RwLock::new(cluster_info));

-        // Create Blocktree, eventually will simply repurpose the input
-        // ledger path as the Blocktree path once we replace the ledger with
-        // Blocktree. Note for now, this ledger will not contain any of the existing entries
+        // Note for now, this ledger will not contain any of the existing entries
         // in the ledger located at ledger_path, and will only append on newly received
         // entries after being passed to window_service
-        let blocktree =
-            Blocktree::open(ledger_path).expect("Expected to be able to open database ledger");
-
+        let (bank_forks, bank_forks_info, blocktree, _, _, _) =
+            new_banks_from_blocktree(ledger_path, None);
         let blocktree = Arc::new(blocktree);
+        let bank_info = &bank_forks_info[0];
+        let bank = bank_forks[bank_info.bank_slot].clone();
+        let genesis_blockhash = bank.last_blockhash();

         let gossip_service = GossipService::new(
             &cluster_info,
@@ -231,7 +232,6 @@ impl Replicator {
         let (retransmit_sender, retransmit_receiver) = channel();

         let window_service = WindowService::new(
-            None, //TODO: need a way to validate blobs... https://github.com/solana-labs/solana/issues/3924
             blocktree.clone(),
             cluster_info.clone(),
             blob_fetch_receiver,
@@ -239,7 +239,8 @@ impl Replicator {
             repair_socket,
             &exit,
             RepairStrategy::RepairRange(repair_slot_range),
-            &Hash::default(),
+            &genesis_blockhash,
+            |_, _, _| true,
         );

         Self::setup_mining_account(&client, &keypair, &storage_keypair)?;
@@ -261,11 +262,8 @@ impl Replicator {
         let t_replicate = {
             let exit = exit.clone();
             let blocktree = blocktree.clone();
-            spawn(move || loop {
-                Self::wait_for_ledger_download(slot, &blocktree, &exit, &node_info, &cluster_info);
-                if exit.load(Ordering::Relaxed) {
-                    break;
-                }
+            spawn(move || {
+                Self::wait_for_ledger_download(slot, &blocktree, &exit, &node_info, &cluster_info)
             })
         };
         //always push this last
@@ -282,7 +280,7 @@ impl Replicator {
             keypair,
             storage_keypair,
             signature,
-            cluster_entrypoint,
+            client,
             ledger_data_file_encrypted: PathBuf::default(),
             sampling_offsets: vec![],
             hash: Hash::default(),
@@ -305,6 +303,8 @@ impl Replicator {
                 break;
             }
             self.submit_mining_proof();
+            // TODO: Replicators should be submitting proofs as fast as possible
+            sleep(Duration::from_secs(2));
         }
     }

@@ -315,16 +315,17 @@ impl Replicator {
         node_info: &ContactInfo,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
     ) {
-        info!("window created, waiting for ledger download");
-        let mut _received_so_far = 0;
-
+        info!(
+            "window created, waiting for ledger download starting at slot {:?}",
+            start_slot
+        );
         let mut current_slot = start_slot;
         'outer: loop {
             while let Ok(meta) = blocktree.meta(current_slot) {
                 if let Some(meta) = meta {
-                    if meta.is_connected {
+                    if meta.is_full() {
                         current_slot += 1;
-                        warn!("current slot: {}", current_slot);
+                        info!("current slot: {}", current_slot);
                         if current_slot >= start_slot + SLOTS_PER_SEGMENT {
                             break 'outer;
                         }
@@ -444,20 +445,25 @@ impl Replicator {
     }

     fn submit_mining_proof(&self) {
-        let client = create_client(
-            self.cluster_entrypoint.client_facing_addr(),
-            FULLNODE_PORT_RANGE,
-        );
         // No point if we've got no storage account...
         assert!(
-            client
+            self.client
                 .poll_get_balance(&self.storage_keypair.pubkey())
                 .unwrap()
                 > 0
         );
         // ...or no lamports for fees
-        assert!(client.poll_get_balance(&self.keypair.pubkey()).unwrap() > 0);
+        assert!(
+            self.client
+                .poll_get_balance(&self.keypair.pubkey())
+                .unwrap()
+                > 0
+        );
+        let blockhash = self
+            .client
+            .get_recent_blockhash()
+            .expect("No recent blockhash");

         let instruction = storage_instruction::mining_proof(
             &self.storage_keypair.pubkey(),
             self.hash,
@@ -465,8 +471,12 @@ impl Replicator {
             Signature::new(&self.signature.to_bytes()),
         );
         let message = Message::new_with_payer(vec![instruction], Some(&self.keypair.pubkey()));
-        let mut transaction = Transaction::new_unsigned(message);
-        client
+        let mut transaction = Transaction::new(
+            &[self.keypair.as_ref(), self.storage_keypair.as_ref()],
+            message,
+            blockhash,
+        );
+        self.client
             .send_and_confirm_transaction(
                 &[&self.keypair, &self.storage_keypair],
                 &mut transaction,
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 5e26c5c94d..53f8083176 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -9,7 +9,7 @@ use crate::result::{Error, Result};
 use crate::service::Service;
 use crate::staking_utils;
 use crate::streamer::BlobReceiver;
-use crate::window_service::WindowService;
+use crate::window_service::{should_retransmit_and_persist, WindowService};
 use solana_metrics::{datapoint, inc_new_counter_info};
 use solana_runtime::epoch_schedule::EpochSchedule;
 use solana_sdk::hash::Hash;
@@ -135,8 +135,8 @@ impl RetransmitStage {
             completed_slots_receiver,
             epoch_schedule,
         };
+        let leader_schedule_cache = leader_schedule_cache.clone();
         let window_service = WindowService::new(
-            Some(leader_schedule_cache.clone()),
             blocktree,
             cluster_info.clone(),
             fetch_stage_receiver,
@@ -145,6 +145,9 @@ impl RetransmitStage {
             exit,
             repair_strategy,
             genesis_blockhash,
+            move |id, blob, working_bank| {
+                should_retransmit_and_persist(blob, working_bank, &leader_schedule_cache, id)
+            },
         );

         let thread_hdls = vec![t_retransmit];
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index ce85aab64d..d2acd04607 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -1,11 +1,9 @@
 //! `window_service` handles the data plane incoming blobs, storing them in
 //!   blocktree and retransmitting where required
 //!
-use crate::bank_forks::BankForks;
 use crate::blocktree::Blocktree;
 use crate::cluster_info::ClusterInfo;
 use crate::leader_schedule_cache::LeaderScheduleCache;
-use crate::leader_schedule_utils::slot_leader_at;
 use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
 use crate::repair_service::{RepairService, RepairStrategy};
 use crate::result::{Error, Result};
@@ -77,19 +75,16 @@ fn process_blobs(blobs: &[SharedBlob], blocktree: &Arc<Blocktree>) -> Result<()>
 }

 /// drop blobs that are from myself or not from the correct leader for the
-/// blob's slot
-fn should_retransmit_and_persist(
+/// blob's slot
+pub fn should_retransmit_and_persist(
     blob: &Blob,
-    bank: Option<&Arc<Bank>>,
-    leader_schedule_cache: Option<&Arc<LeaderScheduleCache>>,
+    bank: Option<Arc<Bank>>,
+    leader_schedule_cache: &Arc<LeaderScheduleCache>,
     my_id: &Pubkey,
 ) -> bool {
     let slot_leader_id = match bank {
-        None => leader_schedule_cache.and_then(|cache| cache.slot_leader_at(blob.slot(), None)),
-        Some(bank) => match leader_schedule_cache {
-            None => slot_leader_at(blob.slot(), &bank),
-            Some(cache) => cache.slot_leader_at(blob.slot(), Some(bank)),
-        },
+        None => leader_schedule_cache.slot_leader_at(blob.slot(), None),
+        Some(bank) => leader_schedule_cache.slot_leader_at(blob.slot(), Some(&bank)),
     };

     if blob.id() == *my_id {
@@ -106,15 +101,17 @@ fn should_retransmit_and_persist(
     }
 }

-fn recv_window(
-    bank_forks: Option<&Arc<RwLock<BankForks>>>,
-    leader_schedule_cache: Option<&Arc<LeaderScheduleCache>>,
+fn recv_window<F>(
     blocktree: &Arc<Blocktree>,
     my_id: &Pubkey,
     r: &BlobReceiver,
     retransmit: &BlobSender,
     genesis_blockhash: &Hash,
-) -> Result<()> {
+    blob_filter: F,
+) -> Result<()>
+where
+    F: Fn(&Blob) -> bool,
+{
     let timer = Duration::from_millis(200);

     let mut blobs = r.recv_timeout(timer)?;
@@ -125,14 +122,8 @@ fn recv_window(
     inc_new_counter_info!("streamer-recv_window-recv", blobs.len(), 0, 1000);

     blobs.retain(|blob| {
-        should_retransmit_and_persist(
-            &blob.read().unwrap(),
-            bank_forks
-                .map(|bank_forks| bank_forks.read().unwrap().working_bank())
-                .as_ref(),
-            leader_schedule_cache,
-            my_id,
-        ) && blob.read().unwrap().genesis_blockhash() == *genesis_blockhash
+        blob_filter(&blob.read().unwrap())
+            && blob.read().unwrap().genesis_blockhash() == *genesis_blockhash
     });

     retransmit_blobs(&blobs, retransmit, my_id)?;
@@ -174,8 +165,7 @@ pub struct WindowService {

 impl WindowService {
     #[allow(clippy::too_many_arguments)]
-    pub fn new(
-        leader_schedule_cache: Option<Arc<LeaderScheduleCache>>,
+    pub fn new<F>(
         blocktree: Arc<Blocktree>,
         cluster_info: Arc<RwLock<ClusterInfo>>,
         r: BlobReceiver,
@@ -184,7 +174,14 @@ impl WindowService {
         exit: &Arc<AtomicBool>,
         repair_strategy: RepairStrategy,
         genesis_blockhash: &Hash,
-    ) -> WindowService {
+        blob_filter: F,
+    ) -> WindowService
+    where
+        F: 'static
+            + Fn(&Pubkey, &Blob, Option<Arc<Bank>>) -> bool
+            + std::marker::Send
+            + std::marker::Sync,
+    {
         let bank_forks = match repair_strategy {
             RepairStrategy::RepairRange(_) => None,

@@ -199,8 +196,9 @@ impl WindowService {
             repair_strategy,
         );
         let exit = exit.clone();
-        let leader_schedule_cache = leader_schedule_cache.clone();
         let hash = *genesis_blockhash;
+        let blob_filter = Arc::new(blob_filter);
+        let bank_forks = bank_forks.clone();
         let t_window = Builder::new()
             .name("solana-window".to_string())
             .spawn(move || {
@@ -212,15 +210,15 @@ impl WindowService {
                     break;
                 }

-                if let Err(e) = recv_window(
-                    bank_forks.as_ref(),
-                    leader_schedule_cache.as_ref(),
-                    &blocktree,
-                    &id,
-                    &r,
-                    &retransmit,
-                    &hash,
-                ) {
+                if let Err(e) = recv_window(&blocktree, &id, &r, &retransmit, &hash, |blob| {
+                    blob_filter(
+                        &id,
+                        blob,
+                        bank_forks
+                            .as_ref()
+                            .map(|bank_forks| bank_forks.read().unwrap().working_bank()),
+                    )
+                }) {
                     match e {
                         Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                         Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
@@ -307,20 +305,20 @@ mod test {

         // without a Bank and blobs not from me, blob continues
         assert_eq!(
-            should_retransmit_and_persist(&blob, None, None, &me_id),
+            should_retransmit_and_persist(&blob, None, &cache, &me_id),
             true
         );

         // with a Bank for slot 0, blob continues
         assert_eq!(
-            should_retransmit_and_persist(&blob, Some(&bank), Some(&cache), &me_id),
+            should_retransmit_and_persist(&blob, Some(bank.clone()), &cache, &me_id),
             true
         );

         // set the blob to have come from the wrong leader
         blob.set_id(&Pubkey::new_rand());
         assert_eq!(
-            should_retransmit_and_persist(&blob, Some(&bank), Some(&cache), &me_id),
+            should_retransmit_and_persist(&blob, Some(bank.clone()), &cache, &me_id),
             false
         );

@@ -328,14 +326,14 @@ mod test {
         // TODO: persist in blocktree that we didn't know who the leader was at the time?
         blob.set_slot(MINIMUM_SLOT_LENGTH as u64 * 3);
         assert_eq!(
-            should_retransmit_and_persist(&blob, Some(&bank), Some(&cache), &me_id),
+            should_retransmit_and_persist(&blob, Some(bank), &cache, &me_id),
             true
         );

         // if the blob came back from me, it doesn't continue, whether or not I have a bank
         blob.set_id(&me_id);
         assert_eq!(
-            should_retransmit_and_persist(&blob, None, None, &me_id),
+            should_retransmit_and_persist(&blob, None, &cache, &me_id),
             false
         );
     }
@@ -361,7 +359,6 @@ mod test {
         let blocktree = Arc::new(blocktree);

         let bank = Bank::new(&create_genesis_block_with_leader(100, &me_id, 10).0);
-        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
         let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
         let repair_strategy = RepairStrategy::RepairAll {
             bank_forks: bank_forks.clone(),
@@ -374,7 +371,6 @@ mod test {
                 .clone(),
         };
         let t_window = WindowService::new(
-            Some(leader_schedule_cache),
             blocktree,
             subs,
             r_reader,
@@ -383,6 +379,7 @@ mod test {
             &exit,
             repair_strategy,
             &Hash::default(),
+            |_, _, _| true,
         );
         let t_responder = {
             let (s_responder, r_responder) = channel();
@@ -449,7 +446,6 @@ mod test {
         let blocktree = Arc::new(blocktree);
         let bank = Bank::new(&create_genesis_block_with_leader(100, &me_id, 10).0);
-        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
         let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
         let epoch_schedule = *bank_forks.read().unwrap().working_bank().epoch_schedule();
         let repair_strategy = RepairStrategy::RepairAll {
             bank_forks: bank_forks.clone(),
@@ -458,7 +454,6 @@ mod test {
             epoch_schedule,
         };
         let t_window = WindowService::new(
-            Some(leader_schedule_cache),
             blocktree,
             subs.clone(),
             r_reader,
@@ -467,6 +462,7 @@ mod test {
             &exit,
             repair_strategy,
             &Hash::default(),
+            |_, _, _| true,
         );
         let t_responder = {
             let (s_responder, r_responder) = channel();
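
Usage note (sketch only, not part of the patch): after this change every caller of WindowService::new supplies its own blob filter closure instead of an Option<LeaderScheduleCache>. Assuming the types shown in the diff (Pubkey, Blob, Arc<Bank>, LeaderScheduleCache), the two closure shapes used above look roughly like:

    // Validator / retransmit path: keep only blobs from the expected slot leader.
    let cache = leader_schedule_cache.clone();
    let filter = move |my_id: &Pubkey, blob: &Blob, bank: Option<Arc<Bank>>| {
        should_retransmit_and_persist(blob, bank, &cache, my_id)
    };

    // Replicator and window_service tests: accept every blob.
    let accept_all = |_: &Pubkey, _: &Blob, _: Option<Arc<Bank>>| true;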