* removes seed and slot fields from Packet.Meta507367e6ac
updated window-service to send shreds (as opposed to packets) to retransmit-stage and so seed and slot fields in Packet.Meta are unused: https://github.com/solana-labs/solana/blob/d6ec103be/sdk/src/packet.rs#L27-L28 (cherry picked from commitaa9f7ed7e8
) * uses std::net::IpAddr type for Packet.Meta.addr (cherry picked from commit73a7741c49
) # Conflicts: # streamer/src/streamer.rs * adds bitflags to Packet.Meta Instead of a separate bool type for each flag, all the flags can be encoded in a type-safe bitflags encoded in a single u8: https://github.com/solana-labs/solana/blob/d6ec103be/sdk/src/packet.rs#L19-L31 (cherry picked from commit01a096adc8
) # Conflicts: # sdk/Cargo.toml * removes backport merge conflicts Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
@ -412,7 +412,7 @@ impl BankingStage {
|
||||
let packet_vec: Vec<_> = packets
|
||||
.iter()
|
||||
.filter_map(|p| {
|
||||
if !p.meta.forwarded && data_budget.take(p.meta.size) {
|
||||
if !p.meta.forwarded() && data_budget.take(p.meta.size) {
|
||||
Some((&p.data[..p.meta.size], tpu_forwards))
|
||||
} else {
|
||||
None
|
||||
@ -1132,7 +1132,7 @@ impl BankingStage {
|
||||
.iter()
|
||||
.filter_map(|tx_index| {
|
||||
let p = &packet_batch.packets[*tx_index];
|
||||
if votes_only && !p.meta.is_simple_vote_tx {
|
||||
if votes_only && !p.meta.is_simple_vote_tx() {
|
||||
return None;
|
||||
}
|
||||
|
||||
@ -1142,7 +1142,7 @@ impl BankingStage {
|
||||
let tx = SanitizedTransaction::try_create(
|
||||
tx,
|
||||
message_hash,
|
||||
Some(p.meta.is_simple_vote_tx),
|
||||
Some(p.meta.is_simple_vote_tx()),
|
||||
|_| Err(TransactionError::UnsupportedVersion),
|
||||
)
|
||||
.ok()?;
|
||||
@ -1313,15 +1313,8 @@ impl BankingStage {
|
||||
fn generate_packet_indexes(vers: &PinnedVec<Packet>) -> Vec<usize> {
|
||||
vers.iter()
|
||||
.enumerate()
|
||||
.filter_map(
|
||||
|(index, ver)| {
|
||||
if !ver.meta.discard {
|
||||
Some(index)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
},
|
||||
)
|
||||
.filter(|(_, pkt)| !pkt.meta.discard())
|
||||
.map(|(index, _)| index)
|
||||
.collect()
|
||||
}
|
||||
|
||||
@ -1610,7 +1603,7 @@ mod tests {
|
||||
get_tmp_ledger_path,
|
||||
leader_schedule_cache::LeaderScheduleCache,
|
||||
},
|
||||
solana_perf::packet::to_packet_batches,
|
||||
solana_perf::packet::{to_packet_batches, PacketFlags},
|
||||
solana_poh::{
|
||||
poh_recorder::{create_test_recorder, Record, WorkingBankEntry},
|
||||
poh_service::PohService,
|
||||
@ -1754,7 +1747,7 @@ mod tests {
|
||||
b.packets
|
||||
.iter_mut()
|
||||
.zip(v)
|
||||
.for_each(|(p, f)| p.meta.discard = *f == 0)
|
||||
.for_each(|(p, f)| p.meta.set_discard(*f == 0))
|
||||
});
|
||||
with_vers.into_iter().map(|(b, _)| b).collect()
|
||||
}
|
||||
@ -2957,7 +2950,7 @@ mod tests {
|
||||
const FWD_PACKET: u8 = 1;
|
||||
let forwarded_packet = {
|
||||
let mut packet = Packet::from_data(None, &[FWD_PACKET]).unwrap();
|
||||
packet.meta.forwarded = true;
|
||||
packet.meta.flags |= PacketFlags::FORWARDED;
|
||||
packet
|
||||
};
|
||||
|
||||
@ -3201,7 +3194,7 @@ mod tests {
|
||||
packet_indexes.push(index);
|
||||
}
|
||||
for index in vote_indexes.iter() {
|
||||
packet_batch.packets[*index].meta.is_simple_vote_tx = true;
|
||||
packet_batch.packets[*index].meta.flags |= PacketFlags::SIMPLE_VOTE_TX;
|
||||
}
|
||||
(packet_batch, packet_indexes)
|
||||
}
|
||||
|
@ -308,7 +308,7 @@ impl ClusterInfoVoteListener {
|
||||
.filter(|(_, packet_batch)| {
|
||||
// to_packet_batches() above splits into 1 packet long batches
|
||||
assert_eq!(packet_batch.packets.len(), 1);
|
||||
!packet_batch.packets[0].meta.discard
|
||||
!packet_batch.packets[0].meta.discard()
|
||||
})
|
||||
.filter_map(|(tx, packet_batch)| {
|
||||
let (vote_account_key, vote, _) = vote_transaction::parse_vote_transaction(&tx)?;
|
||||
|
@ -8,7 +8,10 @@ use {
|
||||
solana_metrics::{inc_new_counter_debug, inc_new_counter_info},
|
||||
solana_perf::{packet::PacketBatchRecycler, recycler::Recycler},
|
||||
solana_poh::poh_recorder::PohRecorder,
|
||||
solana_sdk::{clock::DEFAULT_TICKS_PER_SLOT, packet::Packet},
|
||||
solana_sdk::{
|
||||
clock::DEFAULT_TICKS_PER_SLOT,
|
||||
packet::{Packet, PacketFlags},
|
||||
},
|
||||
solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender},
|
||||
std::{
|
||||
net::UdpSocket,
|
||||
@ -84,7 +87,7 @@ impl FetchStage {
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
) -> Result<()> {
|
||||
let mark_forwarded = |packet: &mut Packet| {
|
||||
packet.meta.forwarded = true;
|
||||
packet.meta.flags |= PacketFlags::FORWARDED;
|
||||
};
|
||||
|
||||
let mut packet_batch = recvr.recv()?;
|
||||
|
@ -56,7 +56,10 @@ mod test {
|
||||
shred::{Shred, Shredder},
|
||||
sigverify_shreds::verify_shred_cpu,
|
||||
},
|
||||
solana_sdk::signature::{Keypair, Signer},
|
||||
solana_sdk::{
|
||||
packet::PacketFlags,
|
||||
signature::{Keypair, Signer},
|
||||
},
|
||||
std::{
|
||||
collections::HashMap,
|
||||
net::{IpAddr, Ipv4Addr},
|
||||
@ -87,7 +90,7 @@ mod test {
|
||||
nonce,
|
||||
)
|
||||
.unwrap();
|
||||
packet.meta.repair = true;
|
||||
packet.meta.flags |= PacketFlags::REPAIR;
|
||||
|
||||
let leader_slots = [(slot, keypair.pubkey().to_bytes())]
|
||||
.iter()
|
||||
|
@ -615,7 +615,7 @@ mod tests {
|
||||
let mut packet_batch = PacketBatch::new(vec![]);
|
||||
solana_streamer::packet::recv_from(&mut packet_batch, &me_retransmit, 1).unwrap();
|
||||
assert_eq!(packet_batch.packets.len(), 1);
|
||||
assert!(!packet_batch.packets[0].meta.repair);
|
||||
assert!(!packet_batch.packets[0].meta.repair());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -6,7 +6,7 @@ use {
|
||||
solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats},
|
||||
solana_perf::{
|
||||
cuda_runtime::PinnedVec,
|
||||
packet::{Packet, PacketBatchRecycler},
|
||||
packet::{Packet, PacketBatchRecycler, PacketFlags},
|
||||
recycler::Recycler,
|
||||
},
|
||||
solana_runtime::bank_forks::BankForks,
|
||||
@ -40,7 +40,7 @@ impl ShredFetchStage {
|
||||
) where
|
||||
F: Fn(&mut Packet),
|
||||
{
|
||||
p.meta.discard = true;
|
||||
p.meta.set_discard(true);
|
||||
if let Some((slot, _index, _shred_type)) = get_shred_slot_index_type(p, stats) {
|
||||
// Seems reasonable to limit shreds to 2 epochs away
|
||||
if slot > last_root && slot < (last_slot + 2 * slots_per_epoch) {
|
||||
@ -50,7 +50,7 @@ impl ShredFetchStage {
|
||||
|
||||
if shreds_received.get(&hash).is_none() {
|
||||
shreds_received.put(hash, ());
|
||||
p.meta.discard = false;
|
||||
p.meta.set_discard(false);
|
||||
modify(p);
|
||||
} else {
|
||||
stats.duplicate_shred += 1;
|
||||
@ -192,7 +192,7 @@ impl ShredFetchStage {
|
||||
recycler.clone(),
|
||||
bank_forks.clone(),
|
||||
"shred_fetch_tvu_forwards",
|
||||
|p| p.meta.forwarded = true,
|
||||
|p| p.meta.flags.insert(PacketFlags::FORWARDED),
|
||||
);
|
||||
|
||||
let (repair_receiver, repair_handler) = Self::packet_modifier(
|
||||
@ -202,7 +202,7 @@ impl ShredFetchStage {
|
||||
recycler,
|
||||
bank_forks,
|
||||
"shred_fetch_repair",
|
||||
|p| p.meta.repair = true,
|
||||
|p| p.meta.flags.insert(PacketFlags::REPAIR),
|
||||
);
|
||||
|
||||
tvu_threads.extend(tvu_forwards_threads.into_iter());
|
||||
@ -266,7 +266,7 @@ mod tests {
|
||||
&|_p| {},
|
||||
&hasher,
|
||||
);
|
||||
assert!(!packet.meta.discard);
|
||||
assert!(!packet.meta.discard());
|
||||
let coding = solana_ledger::shred::Shredder::generate_coding_shreds(
|
||||
&[shred],
|
||||
false, // is_last_in_slot
|
||||
@ -283,7 +283,7 @@ mod tests {
|
||||
&|_p| {},
|
||||
&hasher,
|
||||
);
|
||||
assert!(!packet.meta.discard);
|
||||
assert!(!packet.meta.discard());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -310,7 +310,7 @@ mod tests {
|
||||
&hasher,
|
||||
);
|
||||
assert_eq!(stats.index_overrun, 1);
|
||||
assert!(packet.meta.discard);
|
||||
assert!(packet.meta.discard());
|
||||
let shred = Shred::new_from_data(1, 3, 0, None, true, true, 0, 0, 0);
|
||||
shred.copy_to_packet(&mut packet);
|
||||
|
||||
@ -325,7 +325,7 @@ mod tests {
|
||||
&|_p| {},
|
||||
&hasher,
|
||||
);
|
||||
assert!(packet.meta.discard);
|
||||
assert!(packet.meta.discard());
|
||||
|
||||
// Accepted for 1,3
|
||||
ShredFetchStage::process_packet(
|
||||
@ -338,7 +338,7 @@ mod tests {
|
||||
&|_p| {},
|
||||
&hasher,
|
||||
);
|
||||
assert!(!packet.meta.discard);
|
||||
assert!(!packet.meta.discard());
|
||||
|
||||
// shreds_received should filter duplicate
|
||||
ShredFetchStage::process_packet(
|
||||
@ -351,7 +351,7 @@ mod tests {
|
||||
&|_p| {},
|
||||
&hasher,
|
||||
);
|
||||
assert!(packet.meta.discard);
|
||||
assert!(packet.meta.discard());
|
||||
|
||||
let shred = Shred::new_from_data(1_000_000, 3, 0, None, true, true, 0, 0, 0);
|
||||
shred.copy_to_packet(&mut packet);
|
||||
@ -367,7 +367,7 @@ mod tests {
|
||||
&|_p| {},
|
||||
&hasher,
|
||||
);
|
||||
assert!(packet.meta.discard);
|
||||
assert!(packet.meta.discard());
|
||||
|
||||
let index = MAX_DATA_SHREDS_PER_SLOT as u32;
|
||||
let shred = Shred::new_from_data(5, index, 0, None, true, true, 0, 0, 0);
|
||||
@ -382,6 +382,6 @@ mod tests {
|
||||
&|_p| {},
|
||||
&hasher,
|
||||
);
|
||||
assert!(packet.meta.discard);
|
||||
assert!(packet.meta.discard());
|
||||
}
|
||||
}
|
||||
|
@ -162,7 +162,7 @@ pub mod tests {
|
||||
batches[0].packets[1].meta.size = shred.payload.len();
|
||||
|
||||
let rv = verifier.verify_batches(batches);
|
||||
assert!(!rv[0].packets[0].meta.discard);
|
||||
assert!(rv[0].packets[1].meta.discard);
|
||||
assert!(!rv[0].packets[0].meta.discard());
|
||||
assert!(rv[0].packets[1].meta.discard());
|
||||
}
|
||||
}
|
||||
|
@ -163,7 +163,9 @@ impl SigVerifyStage {
|
||||
}
|
||||
for (_addr, indexes) in received_ips {
|
||||
for (batch_index, packet_index) in indexes {
|
||||
batches[batch_index].packets[packet_index].meta.discard = true;
|
||||
batches[batch_index].packets[packet_index]
|
||||
.meta
|
||||
.set_discard(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -275,7 +277,7 @@ mod tests {
|
||||
batch
|
||||
.packets
|
||||
.iter()
|
||||
.map(|p| if p.meta.discard { 0 } else { 1 })
|
||||
.map(|p| if p.meta.discard() { 0 } else { 1 })
|
||||
.sum::<usize>()
|
||||
})
|
||||
.sum::<usize>()
|
||||
@ -286,12 +288,12 @@ mod tests {
|
||||
solana_logger::setup();
|
||||
let mut batch = PacketBatch::default();
|
||||
batch.packets.resize(10, Packet::default());
|
||||
batch.packets[3].meta.addr = [1u16; 8];
|
||||
batch.packets[3].meta.addr = std::net::IpAddr::from([1u16; 8]);
|
||||
let mut batches = vec![batch];
|
||||
let max = 3;
|
||||
SigVerifyStage::discard_excess_packets(&mut batches, max);
|
||||
assert_eq!(count_non_discard(&batches), max);
|
||||
assert!(!batches[0].packets[0].meta.discard);
|
||||
assert!(!batches[0].packets[3].meta.discard);
|
||||
assert!(!batches[0].packets[0].meta.discard());
|
||||
assert!(!batches[0].packets[3].meta.discard());
|
||||
}
|
||||
}
|
||||
|
@ -362,7 +362,7 @@ where
|
||||
let last_root = blockstore.last_root();
|
||||
let working_bank = bank_forks.read().unwrap().working_bank();
|
||||
let handle_packet = |packet: &Packet| {
|
||||
if packet.meta.discard {
|
||||
if packet.meta.discard() {
|
||||
inc_new_counter_debug!("streamer-recv_window-invalid_or_unnecessary_packet", 1);
|
||||
return None;
|
||||
}
|
||||
@ -375,7 +375,7 @@ where
|
||||
if !shred_filter(&shred, working_bank.clone(), last_root) {
|
||||
return None;
|
||||
}
|
||||
if packet.meta.repair {
|
||||
if packet.meta.repair() {
|
||||
let repair_info = RepairMeta {
|
||||
_from_addr: packet.meta.addr(),
|
||||
// If can't parse the nonce, dump the packet.
|
||||
|
Reference in New Issue
Block a user