diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 3dd2d6ede5..9a0f5bbfaa 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -134,15 +134,13 @@ impl BankingStage { fn forward_buffered_packets( socket: &std::net::UdpSocket, - tpu_via_blobs: &std::net::SocketAddr, + tpu_forwards: &std::net::SocketAddr, unprocessed_packets: &[PacketsAndOffsets], ) -> std::io::Result<()> { let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets); inc_new_counter_info!("banking_stage-forwarded_packets", packets.len()); - let blobs = packet::packets_to_blobs(&packets); - - for blob in blobs { - socket.send_to(&blob.data[..blob.meta.size], tpu_via_blobs)?; + for p in packets { + socket.send_to(&p.data[..p.meta.size], &tpu_forwards)?; } Ok(()) @@ -316,7 +314,7 @@ impl BankingStage { .read() .unwrap() .lookup(&leader_pubkey) - .map(|leader| leader.tpu_via_blobs) + .map(|leader| leader.tpu_forwards) }; leader_addr.map_or(Ok(()), |leader_addr| { diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index add160c10c..7632dff056 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1463,7 +1463,7 @@ pub struct Sockets { pub gossip: UdpSocket, pub tvu: Vec, pub tpu: Vec, - pub tpu_via_blobs: Vec, + pub tpu_forwards: Vec, pub broadcast: UdpSocket, pub repair: UdpSocket, pub retransmit: UdpSocket, @@ -1508,7 +1508,7 @@ impl Node { gossip, tvu: vec![tvu], tpu: vec![], - tpu_via_blobs: vec![], + tpu_forwards: vec![], broadcast, repair, retransmit, @@ -1520,7 +1520,7 @@ impl Node { let tpu = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let tvu = UdpSocket::bind("127.0.0.1:0").unwrap(); - let tpu_via_blobs = UdpSocket::bind("127.0.0.1:0").unwrap(); + let tpu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap(); let repair = UdpSocket::bind("127.0.0.1:0").unwrap(); let rpc_port = find_available_port_in_range((1024, 65535)).unwrap(); let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_port); @@ -1536,7 +1536,7 @@ impl Node { gossip.local_addr().unwrap(), tvu.local_addr().unwrap(), tpu.local_addr().unwrap(), - tpu_via_blobs.local_addr().unwrap(), + tpu_forwards.local_addr().unwrap(), storage.local_addr().unwrap(), rpc_addr, rpc_pubsub_addr, @@ -1548,7 +1548,7 @@ impl Node { gossip, tvu: vec![tvu], tpu: vec![tpu], - tpu_via_blobs: vec![tpu_via_blobs], + tpu_forwards: vec![tpu_forwards], broadcast, repair, retransmit, @@ -1582,7 +1582,7 @@ impl Node { let (tpu_port, tpu_sockets) = multi_bind_in_range(port_range, 32).expect("tpu multi_bind"); - let (tpu_via_blobs_port, tpu_via_blobs_sockets) = + let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range(port_range, 8).expect("tpu multi_bind"); let (_, repair) = Self::bind(port_range); @@ -1594,7 +1594,7 @@ impl Node { SocketAddr::new(gossip_addr.ip(), gossip_port), SocketAddr::new(gossip_addr.ip(), tvu_port), SocketAddr::new(gossip_addr.ip(), tpu_port), - SocketAddr::new(gossip_addr.ip(), tpu_via_blobs_port), + SocketAddr::new(gossip_addr.ip(), tpu_forwards_port), socketaddr_any!(), socketaddr_any!(), socketaddr_any!(), @@ -1608,7 +1608,7 @@ impl Node { gossip, tvu: tvu_sockets, tpu: tpu_sockets, - tpu_via_blobs: tpu_via_blobs_sockets, + tpu_forwards: tpu_forwards_sockets, broadcast, repair, retransmit, @@ -1629,9 +1629,9 @@ impl Node { let empty = socketaddr_any!(); new.info.tpu = empty; - new.info.tpu_via_blobs = empty; + new.info.tpu_forwards = empty; new.sockets.tpu = vec![]; - new.sockets.tpu_via_blobs = vec![]; + new.sockets.tpu_forwards = vec![]; new } diff --git a/core/src/contact_info.rs b/core/src/contact_info.rs index 9d77a20bff..3cf1262b5e 100644 --- a/core/src/contact_info.rs +++ b/core/src/contact_info.rs @@ -23,7 +23,7 @@ pub struct ContactInfo { /// transactions address pub tpu: SocketAddr, /// address to forward unprocessed transactions to - pub tpu_via_blobs: SocketAddr, + pub tpu_forwards: SocketAddr, /// storage data address pub storage_addr: SocketAddr, /// address to which to send JSON-RPC requests @@ -78,7 +78,7 @@ impl Default for ContactInfo { gossip: socketaddr_any!(), tvu: socketaddr_any!(), tpu: socketaddr_any!(), - tpu_via_blobs: socketaddr_any!(), + tpu_forwards: socketaddr_any!(), storage_addr: socketaddr_any!(), rpc: socketaddr_any!(), rpc_pubsub: socketaddr_any!(), @@ -94,7 +94,7 @@ impl ContactInfo { gossip: SocketAddr, tvu: SocketAddr, tpu: SocketAddr, - tpu_via_blobs: SocketAddr, + tpu_forwards: SocketAddr, storage_addr: SocketAddr, rpc: SocketAddr, rpc_pubsub: SocketAddr, @@ -106,7 +106,7 @@ impl ContactInfo { gossip, tvu, tpu, - tpu_via_blobs, + tpu_forwards, storage_addr, rpc, rpc_pubsub, @@ -157,7 +157,7 @@ impl ContactInfo { let tpu_addr = *bind_addr; let gossip_addr = next_port(&bind_addr, 1); let tvu_addr = next_port(&bind_addr, 2); - let tpu_via_blobs_addr = next_port(&bind_addr, 3); + let tpu_forwards_addr = next_port(&bind_addr, 3); let rpc_addr = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT); let rpc_pubsub_addr = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); Self::new( @@ -165,7 +165,7 @@ impl ContactInfo { gossip_addr, tvu_addr, tpu_addr, - tpu_via_blobs_addr, + tpu_forwards_addr, "0.0.0.0:0".parse().unwrap(), rpc_addr, rpc_pubsub_addr, @@ -233,7 +233,7 @@ impl Signable for ContactInfo { gossip: SocketAddr, tvu: SocketAddr, tpu: SocketAddr, - tpu_via_blobs: SocketAddr, + tpu_forwards: SocketAddr, storage_addr: SocketAddr, rpc: SocketAddr, rpc_pubsub: SocketAddr, @@ -247,7 +247,7 @@ impl Signable for ContactInfo { tvu: me.tvu, tpu: me.tpu, storage_addr: me.storage_addr, - tpu_via_blobs: me.tpu_via_blobs, + tpu_forwards: me.tpu_forwards, rpc: me.rpc, rpc_pubsub: me.rpc_pubsub, wallclock: me.wallclock, @@ -287,7 +287,7 @@ mod tests { let ci = ContactInfo::default(); assert!(ci.gossip.ip().is_unspecified()); assert!(ci.tvu.ip().is_unspecified()); - assert!(ci.tpu_via_blobs.ip().is_unspecified()); + assert!(ci.tpu_forwards.ip().is_unspecified()); assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified()); @@ -298,7 +298,7 @@ mod tests { let ci = ContactInfo::new_multicast(); assert!(ci.gossip.ip().is_multicast()); assert!(ci.tvu.ip().is_multicast()); - assert!(ci.tpu_via_blobs.ip().is_multicast()); + assert!(ci.tpu_forwards.ip().is_multicast()); assert!(ci.rpc.ip().is_multicast()); assert!(ci.rpc_pubsub.ip().is_multicast()); assert!(ci.tpu.ip().is_multicast()); @@ -310,7 +310,7 @@ mod tests { let ci = ContactInfo::new_gossip_entry_point(&addr); assert_eq!(ci.gossip, addr); assert!(ci.tvu.ip().is_unspecified()); - assert!(ci.tpu_via_blobs.ip().is_unspecified()); + assert!(ci.tpu_forwards.ip().is_unspecified()); assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified()); @@ -323,7 +323,7 @@ mod tests { assert_eq!(ci.tpu, addr); assert_eq!(ci.gossip.port(), 11); assert_eq!(ci.tvu.port(), 12); - assert_eq!(ci.tpu_via_blobs.port(), 13); + assert_eq!(ci.tpu_forwards.port(), 13); assert_eq!(ci.rpc.port(), 8899); assert_eq!(ci.rpc_pubsub.port(), 8900); assert!(ci.storage_addr.ip().is_unspecified()); @@ -338,7 +338,7 @@ mod tests { assert_eq!(d1.id, keypair.pubkey()); assert_eq!(d1.gossip, socketaddr!("127.0.0.1:1235")); assert_eq!(d1.tvu, socketaddr!("127.0.0.1:1236")); - assert_eq!(d1.tpu_via_blobs, socketaddr!("127.0.0.1:1237")); + assert_eq!(d1.tpu_forwards, socketaddr!("127.0.0.1:1237")); assert_eq!(d1.tpu, socketaddr!("127.0.0.1:1234")); assert_eq!(d1.rpc, socketaddr!("127.0.0.1:8899")); assert_eq!(d1.rpc_pubsub, socketaddr!("127.0.0.1:8900")); diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 675e5906bd..5c89d9340f 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -22,28 +22,28 @@ impl FetchStage { #[allow(clippy::new_ret_no_self)] pub fn new( sockets: Vec, - tpu_via_blobs_sockets: Vec, + tpu_forwards_sockets: Vec, exit: &Arc, poh_recorder: &Arc>, ) -> (Self, PacketReceiver) { let (sender, receiver) = channel(); ( - Self::new_with_sender(sockets, tpu_via_blobs_sockets, exit, &sender, &poh_recorder), + Self::new_with_sender(sockets, tpu_forwards_sockets, exit, &sender, &poh_recorder), receiver, ) } pub fn new_with_sender( sockets: Vec, - tpu_via_blobs_sockets: Vec, + tpu_forwards_sockets: Vec, exit: &Arc, sender: &PacketSender, poh_recorder: &Arc>, ) -> Self { let tx_sockets = sockets.into_iter().map(Arc::new).collect(); - let tpu_via_blobs_sockets = tpu_via_blobs_sockets.into_iter().map(Arc::new).collect(); + let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect(); Self::new_multi_socket( tx_sockets, - tpu_via_blobs_sockets, + tpu_forwards_sockets, exit, &sender, &poh_recorder, @@ -83,7 +83,7 @@ impl FetchStage { fn new_multi_socket( sockets: Vec>, - tpu_via_blobs_sockets: Vec>, + tpu_forwards_sockets: Vec>, exit: &Arc, sender: &PacketSender, poh_recorder: &Arc>, @@ -100,9 +100,15 @@ impl FetchStage { }); let (forward_sender, forward_receiver) = channel(); - let tpu_via_blobs_threads = tpu_via_blobs_sockets - .into_iter() - .map(|socket| streamer::blob_packet_receiver(socket, &exit, forward_sender.clone())); + let tpu_forwards_threads = tpu_forwards_sockets.into_iter().map(|socket| { + streamer::receiver( + socket, + &exit, + forward_sender.clone(), + recycler.clone(), + "fetch_forward_stage", + ) + }); let sender = sender.clone(); let poh_recorder = poh_recorder.clone(); @@ -124,7 +130,7 @@ impl FetchStage { }) .unwrap(); - let mut thread_hdls: Vec<_> = tpu_threads.chain(tpu_via_blobs_threads).collect(); + let mut thread_hdls: Vec<_> = tpu_threads.chain(tpu_forwards_threads).collect(); thread_hdls.push(fwd_thread_hdl); Self { thread_hdls } } diff --git a/core/src/packet.rs b/core/src/packet.rs index d6caa4b481..ad978ada2d 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -12,13 +12,11 @@ pub use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signable; use solana_sdk::signature::Signature; -use std::borrow::Borrow; use std::borrow::Cow; use std::cmp; use std::fmt; use std::io; use std::io::Cursor; -use std::io::Write; use std::mem; use std::mem::size_of; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; @@ -365,18 +363,6 @@ pub fn to_shared_blobs(rsps: Vec<(T, SocketAddr)>) -> Result>(packets: &[T]) -> Vec { - let mut current_index = 0; - let mut blobs = vec![]; - while current_index < packets.len() { - let mut blob = Blob::default(); - current_index += blob.store_packets(&packets[current_index..]) as usize; - blobs.push(blob); - } - - blobs -} - macro_rules! range { ($prev:expr, $type:ident) => { $prev..$prev + size_of::<$type>() @@ -558,52 +544,6 @@ impl Blob { &self.data[SIGNATURE_RANGE] } - pub fn store_packets>(&mut self, packets: &[T]) -> u64 { - let size = self.size(); - let mut cursor = Cursor::new(&mut self.data_mut()[size..]); - let mut written = 0; - let mut last_index = 0; - for packet in packets { - if bincode::serialize_into(&mut cursor, &packet.borrow().meta.size).is_err() { - break; - } - let packet = packet.borrow(); - if cursor.write_all(&packet.data[..packet.meta.size]).is_err() { - break; - } - - written = cursor.position() as usize; - last_index += 1; - } - - self.set_size(size + written); - last_index - } - - // other side of store_packets - pub fn load_packets(&self, packets: &mut PinnedVec) { - // rough estimate - let mut pos = 0; - let size_len = bincode::serialized_size(&0usize).unwrap() as usize; - - while pos + size_len < self.size() { - let size: usize = bincode::deserialize_from(&self.data()[pos..]).unwrap(); - - pos += size_len; - - if size > PACKET_DATA_SIZE || pos + size > self.size() { - break; - } - - let mut packet = Packet::default(); - packet.meta.size = size; - packet.data[..size].copy_from_slice(&self.data()[pos..pos + size]); - - pos += size; - packets.push(packet); - } - } - pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> { let mut p = r.write().unwrap(); trace!("receiving on {}", socket.local_addr().unwrap()); @@ -701,8 +641,6 @@ pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut blob_index: u64, slot: #[cfg(test)] mod tests { use super::*; - use bincode; - use rand::Rng; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction; @@ -834,62 +772,6 @@ mod tests { assert_eq!(config, b.erasure_config()); } - #[test] - fn test_store_blobs_max() { - let serialized_size_size = bincode::serialized_size(&0usize).unwrap() as usize; - let serialized_packet_size = serialized_size_size + PACKET_DATA_SIZE; - let num_packets = (BLOB_SIZE - BLOB_HEADER_SIZE) / serialized_packet_size + 1; - let mut blob = Blob::default(); - let packets: Vec<_> = (0..num_packets) - .map(|_| { - let mut packet = Packet::default(); - packet.meta.size = PACKET_DATA_SIZE; - packet - }) - .collect(); - - // Everything except the last packet should have been written - assert_eq!(blob.store_packets(&packets[..]), (num_packets - 1) as u64); - - blob = Blob::default(); - // Store packets such that blob only has room for one more - assert_eq!( - blob.store_packets(&packets[..num_packets - 2]), - (num_packets - 2) as u64 - ); - - // Fill the last packet in the blob - assert_eq!(blob.store_packets(&packets[..num_packets - 2]), 1); - - // Blob is now full - assert_eq!(blob.store_packets(&packets), 0); - } - - #[test] - fn test_packets_to_blobs() { - let mut rng = rand::thread_rng(); - - let packets: Vec<_> = (0..2) - .map(|_| { - let mut packet = Packet::default(); - packet.meta.size = rng.gen_range(1, PACKET_DATA_SIZE); - for i in 0..packet.meta.size { - packet.data[i] = rng.gen_range(1, std::u8::MAX); - } - packet - }) - .collect(); - - let blobs = packets_to_blobs(&packets[..]); - - let mut reconstructed_packets = PinnedVec::default(); - blobs - .iter() - .for_each(|b| b.load_packets(&mut reconstructed_packets)); - - assert_eq!(reconstructed_packets[..], packets[..]); - } - #[test] fn test_blob_data_align() { assert_eq!(std::mem::align_of::(), BLOB_DATA_ALIGN); diff --git a/core/src/streamer.rs b/core/src/streamer.rs index d7b7557f4f..9674671d5b 100644 --- a/core/src/streamer.rs +++ b/core/src/streamer.rs @@ -134,46 +134,6 @@ pub fn blob_receiver( .unwrap() } -fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender, recycler: &PacketsRecycler) -> Result<()> { - trace!( - "recv_blob_packets: receiving on {}", - sock.local_addr().unwrap() - ); - - let blobs = Blob::recv_from(sock)?; - for blob in blobs { - let mut packets = - Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BLOB, "recv_blob_packets"); - blob.read().unwrap().load_packets(&mut packets.packets); - s.send(packets)?; - } - - Ok(()) -} - -pub fn blob_packet_receiver( - sock: Arc, - exit: &Arc, - s: PacketSender, -) -> JoinHandle<()> { - //DOCUMENTED SIDE-EFFECT - //1 second timeout on socket read - let timer = Duration::new(1, 0); - sock.set_read_timeout(Some(timer)) - .expect("set socket timeout"); - let exit = exit.clone(); - let recycler = PacketsRecycler::default(); - Builder::new() - .name("solana-blob_packet_receiver".to_string()) - .spawn(move || loop { - if exit.load(Ordering::Relaxed) { - break; - } - let _ = recv_blob_packets(&sock, &s, &recycler); - }) - .unwrap() -} - #[cfg(test)] mod test { use super::*; diff --git a/core/src/tpu.rs b/core/src/tpu.rs index cb064b33dc..0596367cc9 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -33,7 +33,7 @@ impl Tpu { poh_recorder: &Arc>, entry_receiver: Receiver, transactions_sockets: Vec, - tpu_via_blobs_sockets: Vec, + tpu_forwards_sockets: Vec, broadcast_socket: UdpSocket, sigverify_disabled: bool, blocktree: &Arc, @@ -44,7 +44,7 @@ impl Tpu { let (packet_sender, packet_receiver) = channel(); let fetch_stage = FetchStage::new_with_sender( transactions_sockets, - tpu_via_blobs_sockets, + tpu_forwards_sockets, &exit, &packet_sender, &poh_recorder, diff --git a/core/src/validator.rs b/core/src/validator.rs index 0a4eb24784..e067f112ff 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -267,7 +267,7 @@ impl Validator { &poh_recorder, entry_receiver, node.sockets.tpu, - node.sockets.tpu_via_blobs, + node.sockets.tpu_forwards, node.sockets.broadcast, config.sigverify_disabled, &blocktree,