@@ -1,15 +1,17 @@
//! `window_service` handles the data plane incoming blobs, storing them in
//! blocktree and retransmitting where required
//!
use crate::bank_forks::BankForks;
use crate::blocktree::Blocktree;
use crate::cluster_info::ClusterInfo;
use crate::packet::{SharedBlob, BLOB_HEADER_SIZE};
use crate::leader_schedule_utils::slot_leader_at;
use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
use crate::repair_service::{RepairService, RepairSlotRange};
use crate::result::{Error, Result};
use crate::service::Service;
use crate::streamer::{BlobReceiver, BlobSender};
use solana_metrics::counter::Counter;
use solana_metrics::{influxdb, submit};
use solana_runtime::bank::Bank;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::duration_as_ms;
use std::net::UdpSocket;
@@ -64,33 +66,56 @@ fn process_blobs(blobs: &[SharedBlob], blocktree: &Arc<Blocktree>) -> Result<()>
    Ok(())
}

/// drop blobs that are from myself or not from the correct leader for the
/// blob's slot
fn should_retransmit_and_persist(blob: &Blob, bank: Option<&Arc<Bank>>, my_id: &Pubkey) -> bool {
    let slot_leader_id = bank.and_then(|bank| slot_leader_at(blob.slot(), &bank));

    if blob.id() == *my_id {
        inc_new_counter_info!("streamer-recv_window-circular_transmission", 1);
        false
    } else if slot_leader_id == None {
        inc_new_counter_info!("streamer-recv_window-unknown_leader", 1);
        true
    } else if slot_leader_id != Some(blob.id()) {
        inc_new_counter_info!("streamer-recv_window-wrong_leader", 1);
        false
    } else {
        true
    }
}

fn recv_window(
    bank_forks: Option<&Arc<RwLock<BankForks>>>,
    blocktree: &Arc<Blocktree>,
    id: &Pubkey,
    my_id: &Pubkey,
    r: &BlobReceiver,
    retransmit: &BlobSender,
) -> Result<()> {
    let timer = Duration::from_millis(200);
    let mut dq = r.recv_timeout(timer)?;
    let mut blobs = r.recv_timeout(timer)?;

    while let Ok(mut nq) = r.try_recv() {
        dq.append(&mut nq)
    while let Ok(mut blob) = r.try_recv() {
        blobs.append(&mut blob)
    }
    let now = Instant::now();
    inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100);
    inc_new_counter_info!("streamer-recv_window-recv", blobs.len());

    submit(
        influxdb::Point::new("recv-window")
            .add_field("count", influxdb::Value::Integer(dq.len() as i64))
            .to_owned(),
    );
    blobs.retain(|blob| {
        should_retransmit_and_persist(
            &blob.read().unwrap(),
            bank_forks
                .map(|bank_forks| bank_forks.read().unwrap().working_bank())
                .as_ref(),
            my_id,
        )
    });

    retransmit_blobs(&dq, retransmit, id)?;
    retransmit_blobs(&blobs, retransmit, my_id)?;

    //send a contiguous set of blocks
    trace!("{} num blobs received: {}", id, dq.len());
    trace!("{} num blobs received: {}", my_id, blobs.len());

    process_blobs(&dq, blocktree)?;
    process_blobs(&blobs, blocktree)?;

    trace!(
        "Elapsed processing time in recv_window(): {}",
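The receive path above batches its work: it blocks briefly on recv_timeout for the first payload, then drains everything already queued with try_recv so a single pass services the whole backlog before filtering, retransmitting, and persisting. A minimal standalone sketch of that pattern over a plain std::sync::mpsc channel (the names and types below are illustrative, not part of the patch):

use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::time::Duration;

// Sketch of the recv_window batching pattern: wait up to 200ms for the first
// chunk, then append whatever else has already arrived without blocking.
fn recv_batch(r: &Receiver<Vec<u8>>) -> Result<Vec<u8>, RecvTimeoutError> {
    let timer = Duration::from_millis(200);
    let mut batch = r.recv_timeout(timer)?;
    while let Ok(mut more) = r.try_recv() {
        batch.append(&mut more);
    }
    Ok(batch)
}

fn main() {
    let (s, r) = channel();
    s.send(vec![1u8, 2, 3]).unwrap();
    s.send(vec![4u8]).unwrap();
    assert_eq!(recv_batch(&r).unwrap(), vec![1u8, 2, 3, 4]);
}

The real loop does the same thing with BlobReceiver batches, so one timeout governs how long the thread sleeps when no blobs are arriving.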
@@ -125,6 +150,7 @@ pub struct WindowService {
impl WindowService {
    pub fn new(
        bank_forks: Option<Arc<RwLock<BankForks>>>,
        blocktree: Arc<Blocktree>,
        cluster_info: Arc<RwLock<ClusterInfo>>,
        r: BlobReceiver,
@@ -141,6 +167,7 @@ impl WindowService {
            repair_slot_range,
        );
        let exit = exit.clone();
        let bank_forks = bank_forks.clone();
        let t_window = Builder::new()
            .name("solana-window".to_string())
            .spawn(move || {
@@ -151,7 +178,9 @@ impl WindowService {
                    if exit.load(Ordering::Relaxed) {
                        break;
                    }
                    if let Err(e) = recv_window(&blocktree, &id, &r, &retransmit) {
                    if let Err(e) =
                        recv_window(bank_forks.as_ref(), &blocktree, &id, &r, &retransmit)
                    {
                        match e {
                            Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                            Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
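The reworked call site borrows the fork state with Option::as_ref: the spawned thread keeps owning its Option<Arc<RwLock<BankForks>>> across loop iterations while recv_window only sees an Option<&Arc<RwLock<BankForks>>>. A small self-contained sketch of that borrowing pattern, using a simplified stand-in type rather than the real BankForks:

use std::sync::{Arc, RwLock};

// Stand-in for BankForks, purely for illustration.
struct Forks(u64);

// Same parameter shape as recv_window's bank_forks argument: an optional borrow.
fn working_slot(forks: Option<&Arc<RwLock<Forks>>>) -> u64 {
    forks.map(|f| f.read().unwrap().0).unwrap_or(0)
}

fn main() {
    let owned: Option<Arc<RwLock<Forks>>> = Some(Arc::new(RwLock::new(Forks(7))));
    // as_ref lends the contents, so `owned` is still usable on the next iteration.
    assert_eq!(working_slot(owned.as_ref()), 7);
    assert_eq!(working_slot(owned.as_ref()), 7);
    assert_eq!(working_slot(None), 0);
}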
@@ -184,12 +213,15 @@ impl Service for WindowService {
#[cfg(test)]
mod test {
    use super::*;
    use crate::bank_forks::BankForks;
    use crate::blocktree::{get_tmp_ledger_path, Blocktree};
    use crate::cluster_info::{ClusterInfo, Node};
    use crate::entry::{make_consecutive_blobs, make_tiny_test_entries, EntrySlice};
    use crate::packet::index_blobs;
    use crate::packet::{index_blobs, Blob};
    use crate::service::Service;
    use crate::streamer::{blob_receiver, responder};
    use solana_runtime::bank::Bank;
    use solana_sdk::genesis_block::GenesisBlock;
    use solana_sdk::hash::Hash;
    use std::fs::remove_dir_all;
    use std::net::UdpSocket;
@@ -221,6 +253,46 @@ mod test {
        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
    }

    #[test]
    fn test_should_retransmit_and_persist() {
        let me_id = Pubkey::new_rand();
        let leader_id = Pubkey::new_rand();
        let bank = Arc::new(Bank::new(
            &GenesisBlock::new_with_leader(100, &leader_id, 10).0,
        ));

        let mut blob = Blob::default();
        blob.set_id(&leader_id);

        // without a Bank and blobs not from me, blob continues
        assert_eq!(should_retransmit_and_persist(&blob, None, &me_id), true);

        // with a Bank for slot 0, blob continues
        assert_eq!(
            should_retransmit_and_persist(&blob, Some(&bank), &me_id),
            true
        );

        // set the blob to have come from the wrong leader
        blob.set_id(&Pubkey::new_rand());
        assert_eq!(
            should_retransmit_and_persist(&blob, Some(&bank), &me_id),
            false
        );

        // with a Bank and no idea who the leader is, we keep the blobs (for now)
        // TODO: persist in blocktree that we didn't know who the leader was at the time?
        blob.set_slot(100);
        assert_eq!(
            should_retransmit_and_persist(&blob, Some(&bank), &me_id),
            true
        );

        // if the blob came back from me, it doesn't continue, whether or not I have a bank
        blob.set_id(&me_id);
        assert_eq!(should_retransmit_and_persist(&blob, None, &me_id), false);
    }

    #[test]
    pub fn window_send_test() {
        solana_logger::setup();
@@ -241,6 +313,10 @@ mod test {
            Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"),
        );
        let t_window = WindowService::new(
            Some(Arc::new(RwLock::new(BankForks::new(
                0,
                Bank::new(&GenesisBlock::new_with_leader(100, &me_id, 10).0),
            )))),
            blocktree,
            subs,
            r_reader,
@@ -313,6 +389,10 @@ mod test {
            Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"),
        );
        let t_window = WindowService::new(
            Some(Arc::new(RwLock::new(BankForks::new(
                0,
                Bank::new(&GenesisBlock::new_with_leader(100, &me_id, 10).0),
            )))),
            blocktree,
            subs.clone(),
            r_reader,