//! The `window` module defines data structures for storing the tail of the ledger.
//!
use crate::packet::SharedBlob;
use log::trace;
use solana_sdk::pubkey::Pubkey;
use std::cmp;
use std::sync::{Arc, RwLock};

#[derive(Default, Clone)]
pub struct WindowSlot {
    pub data: Option<SharedBlob>,
    pub coding: Option<SharedBlob>,
    pub leader_unknown: bool,
}

impl WindowSlot {
    fn blob_index(&self) -> Option<u64> {
        match self.data {
            Some(ref blob) => Some(blob.read().unwrap().index()),
            None => None,
        }
    }

    fn clear_data(&mut self) {
        self.data.take();
    }
}

type Window = Vec<WindowSlot>;
pub type SharedWindow = Arc<RwLock<Window>>;

#[derive(Debug)]
pub struct WindowIndex {
    pub data: u64,
    pub coding: u64,
}

pub trait WindowUtil {
    /// Finds slots in `consumed..received` that do not already hold the expected blob,
    /// clears them, and returns their indices.
    fn clear_slots(&mut self, consumed: u64, received: u64) -> Vec<u64>;

    fn window_size(&self) -> u64;

    fn print(&self, id: &Pubkey, consumed: u64) -> String;

    fn blob_idx_in_window(&self, id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool;
}

impl WindowUtil for Window {
    fn clear_slots(&mut self, consumed: u64, received: u64) -> Vec<u64> {
        (consumed..received)
            .filter_map(|pix| {
                let i = (pix % self.window_size()) as usize;
                if let Some(blob_idx) = self[i].blob_index() {
                    if blob_idx == pix {
                        return None;
                    }
                }
                self[i].clear_data();
                Some(pix)
            })
            .collect()
    }

    fn blob_idx_in_window(&self, id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool {
        // Prevent the receive window from running over.
        // A blob that has already been consumed can be skipped; it probably
        // arrived in response to a repair window request.
        if pix < consumed {
            trace!(
                "{}: received: {} but older than consumed: {} skipping..",
                id,
                pix,
                consumed
            );
            false
        } else {
            // `received` always has to be updated even if we don't accept the packet into
            // the window. The worst case here is that the server *starts* outside
            // the window, none of the packets it receives fits in the window,
            // and repair requests (which are based on `received`) are never generated.
            *received = cmp::max(pix, *received);

            if pix >= consumed + self.window_size() {
                trace!(
                    "{}: received: {} will overrun window: {} skipping..",
                    id,
                    pix,
                    consumed + self.window_size()
                );
                false
            } else {
                true
            }
        }
    }

    fn window_size(&self) -> u64 {
        self.len() as u64
    }

    fn print(&self, id: &Pubkey, consumed: u64) -> String {
        let pointer: Vec<_> = self
            .iter()
            .enumerate()
            .map(|(i, _v)| {
                if i == (consumed % self.window_size()) as usize {
                    "V"
                } else {
                    " "
                }
            })
            .collect();

        let buf: Vec<_> = self
            .iter()
            .map(|v| {
                if v.data.is_none() && v.coding.is_none() {
                    "O"
                } else if v.data.is_some() && v.coding.is_some() {
                    "D"
                } else if v.data.is_some() {
                    // coding.is_none()
                    "d"
                } else {
                    // data.is_none()
                    "c"
                }
            })
            .collect();
        format!(
            "\n{}: WINDOW ({}): {}\n{}: WINDOW ({}): {}",
            id,
            consumed,
            pointer.join(""),
            id,
            consumed,
            buf.join("")
        )
    }
}
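
// Worked example for `calculate_max_repair` below (values mirror
// `test_calculate_max_repair`): with consumed = 10, received = 90, num_peers = 15,
// times = 0, and this node not the next leader, the node holds back `num_peers`
// indices for avalanche retransmission, so the highest index worth repairing is
// max(10, 90 - 15) = 75. Once `times >= 8` or the node is the next leader, the
// backoff is skipped and the result becomes max(consumed, received), clamped to
// `consumed + window_size - 1` so a repair never asks for an index past the end
// of the current window.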
fn calculate_max_repair(
    num_peers: u64,
    consumed: u64,
    received: u64,
    times: usize,
    is_next_leader: bool,
    window_size: u64,
) -> u64 {
    // Calculate the highest blob index that this node should already have received
    // via avalanche. Avalanche splits the data stream across nodes, and each node
    // retransmits the data to its peer nodes, so a blob (with an index lower than
    // the current received index) may still be in flight from a peer node.
    let max_repair = if times >= 8 || is_next_leader {
        // If repair backoff is getting high, or if we are the next leader,
        // don't wait for avalanche.
        cmp::max(consumed, received)
    } else {
        cmp::max(consumed, received.saturating_sub(num_peers))
    };

    // This check prevents repairing a blob that would cause the window to roll over. Even if
    // the highest_lost blob is actually missing, asking to repair it might cause our
    // current window to move past other missing blobs.
    cmp::min(consumed + window_size - 1, max_repair)
}

pub fn new_window(window_size: usize) -> Window {
    (0..window_size).map(|_| WindowSlot::default()).collect()
}

pub fn default_window() -> Window {
    (0..2048).map(|_| WindowSlot::default()).collect()
}

#[cfg(test)]
mod test {
    use crate::packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
    use crate::streamer::{receiver, responder, PacketReceiver};
    use crate::window::{calculate_max_repair, new_window, Window, WindowUtil};
    use log::info;
    use solana_sdk::pubkey::Pubkey;
    use std::io;
    use std::io::Write;
    use std::net::UdpSocket;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::mpsc::channel;
    use std::sync::Arc;
    use std::time::Duration;

    fn get_msgs(r: PacketReceiver, num: &mut usize) {
        for _t in 0..5 {
            let timer = Duration::new(1, 0);
            match r.recv_timeout(timer) {
                Ok(m) => *num += m.read().unwrap().packets.len(),
                e => info!("error {:?}", e),
            }
            if *num == 10 {
                break;
            }
        }
    }

    #[test]
    pub fn streamer_debug() {
        write!(io::sink(), "{:?}", Packet::default()).unwrap();
        write!(io::sink(), "{:?}", Packets::default()).unwrap();
        write!(io::sink(), "{:?}", Blob::default()).unwrap();
    }

    #[test]
    pub fn streamer_send_test() {
        let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
        read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();

        let addr = read.local_addr().unwrap();
        let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let exit = Arc::new(AtomicBool::new(false));
        let (s_reader, r_reader) = channel();
        let t_receiver = receiver(Arc::new(read), &exit, s_reader, "window-streamer-test");
        let t_responder = {
            let (s_responder, r_responder) = channel();
            let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);
            let mut msgs = Vec::new();
            for i in 0..10 {
                let b = SharedBlob::default();
                {
                    let mut w = b.write().unwrap();
                    w.data[0] = i as u8;
                    w.meta.size = PACKET_DATA_SIZE;
                    w.meta.set_addr(&addr);
                }
                msgs.push(b);
            }
            s_responder.send(msgs).expect("send");
            t_responder
        };

        let mut num = 0;
        get_msgs(r_reader, &mut num);
        assert_eq!(num, 10);
        exit.store(true, Ordering::Relaxed);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
    }

    #[test]
    pub fn test_calculate_max_repair() {
        const WINDOW_SIZE: u64 = 200;
        assert_eq!(calculate_max_repair(0, 10, 90, 0, false, WINDOW_SIZE), 90);
        assert_eq!(calculate_max_repair(15, 10, 90, 32, false, WINDOW_SIZE), 90);
        assert_eq!(calculate_max_repair(15, 10, 90, 0, false, WINDOW_SIZE), 75);
        assert_eq!(calculate_max_repair(90, 10, 90, 0, false, WINDOW_SIZE), 10);
        assert_eq!(calculate_max_repair(90, 10, 50, 0, false, WINDOW_SIZE), 10);
        assert_eq!(calculate_max_repair(90, 10, 99, 0, false, WINDOW_SIZE), 10);
        assert_eq!(calculate_max_repair(90, 10, 101, 0, false, WINDOW_SIZE), 11);
        assert_eq!(
            calculate_max_repair(90, 10, 95 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
            WINDOW_SIZE + 5
        );
        assert_eq!(
            calculate_max_repair(90, 10, 99 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
            WINDOW_SIZE + 9
        );
        assert_eq!(
            calculate_max_repair(90, 10, 100 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
            WINDOW_SIZE + 9
        );
        assert_eq!(
            calculate_max_repair(90, 10, 120 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
            WINDOW_SIZE + 9
        );
        assert_eq!(
            calculate_max_repair(50, 100, 50 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
            WINDOW_SIZE
        );
        assert_eq!(
            calculate_max_repair(50, 100, 50 + WINDOW_SIZE, 0, true, WINDOW_SIZE),
            50 + WINDOW_SIZE
        );
    }
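
    // Illustrative note on the helper below: it copies `received` into a local
    // so the test can observe both the accept/reject decision and how
    // `blob_idx_in_window` advances `received`. With consumed = 90 and a
    // 200-slot window, indices 90..290 are accepted; anything below 90 is
    // skipped as already consumed, and anything at or above 290 is rejected
    // because it would overrun the window, though it still advances `received`.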
    fn wrap_blob_idx_in_window(
        window: &Window,
        id: &Pubkey,
        pix: u64,
        consumed: u64,
        received: u64,
    ) -> (bool, u64) {
        let mut received = received;
        let is_in_window = window.blob_idx_in_window(&id, pix, consumed, &mut received);
        (is_in_window, received)
    }

    #[test]
    pub fn test_blob_idx_in_window() {
        let id = Pubkey::default();
        const WINDOW_SIZE: u64 = 200;
        let window = new_window(WINDOW_SIZE as usize);

        assert_eq!(
            wrap_blob_idx_in_window(&window, &id, 90 + WINDOW_SIZE, 90, 100),
            (false, 90 + WINDOW_SIZE)
        );
        assert_eq!(
            wrap_blob_idx_in_window(&window, &id, 91 + WINDOW_SIZE, 90, 100),
            (false, 91 + WINDOW_SIZE)
        );
        assert_eq!(
            wrap_blob_idx_in_window(&window, &id, 89, 90, 100),
            (false, 100)
        );
        assert_eq!(
            wrap_blob_idx_in_window(&window, &id, 91, 90, 100),
            (true, 100)
        );
        assert_eq!(
            wrap_blob_idx_in_window(&window, &id, 101, 90, 100),
            (true, 101)
        );
    }
}
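
// Illustrative usage sketch, not part of the original test suite: shows how a
// `SharedWindow` might be constructed and how `clear_slots` reports the blob
// indices that still need to be repaired. The module name `usage_example` and
// the chosen sizes are assumptions made purely for illustration.
#[cfg(test)]
mod usage_example {
    use crate::window::{new_window, SharedWindow, WindowUtil};
    use std::sync::{Arc, RwLock};

    #[test]
    fn shared_window_clear_slots_example() {
        // Wrap a fresh 16-slot window so it could be shared across threads.
        let window: SharedWindow = Arc::new(RwLock::new(new_window(16)));

        // With no blobs stored yet, every index in `consumed..received` maps to
        // an empty slot, so all of them come back as repair candidates.
        let missing = window.write().unwrap().clear_slots(0, 8);
        assert_eq!(missing, (0..8).collect::<Vec<u64>>());
    }
}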