From ef205593c567317ae12ff6cdbd9ae9eff1db6c22 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 15 Jun 2021 15:16:20 +0000 Subject: [PATCH] removes port-based forwarding logic from turbine retransmit (#17716) (#17973) Turbine retransmit logic is based on which socket it received the packet from (i.e `packet.meta.forward`): https://github.com/solana-labs/solana/blob/708bbcb00/core/src/retransmit_stage.rs#L467-L470 This can leave the cluster vulnerable to spoofing and selective propagation of packets; see https://github.com/solana-labs/solana/issues/6672 https://github.com/solana-labs/solana/pull/7774 This commit identifies if the node is on the "critical path" based on its index in the shuffled cluster. If so, it forwards the packet to both neighbors and children; otherwise, the packet is only forwarded to the children. The metrics added in https://github.com/solana-labs/solana/pull/17351 shows that the number of times the index does not match the port is very rare, and therefore this change should be safe. (cherry picked from commit 161838655c770712ec5b993b469b128f2f64bf60) Co-authored-by: behzad nouri --- core/benches/retransmit_stage.rs | 7 ++++++- core/src/retransmit_stage.rs | 28 ++++++++++++++++++++++------ 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index 5d22556071..63f762a2fb 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -39,7 +39,12 @@ fn bench_retransmitter(bencher: &mut Bencher) { const NUM_PEERS: usize = 4; let mut peer_sockets = Vec::new(); for _ in 0..NUM_PEERS { - let id = pubkey::new_rand(); + // This ensures that cluster_info.id() is the root of turbine + // retransmit tree and so the shreds are retransmited to all other + // nodes in the cluster. + let id = std::iter::repeat_with(pubkey::new_rand) + .find(|pk| cluster_info.id() < *pk) + .unwrap(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut contact_info = ContactInfo::new_localhost(&id, timestamp()); contact_info.tvu = socket.local_addr().unwrap(); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 137d0873ae..499a8f28ae 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -428,7 +428,9 @@ fn retransmit( // neighborhood), then we expect that the packet arrives at tvu socket // as opposed to tvu-forwards. If this is not the case, then the // turbine broadcast/retransmit tree is mismatched across nodes. - if packet.meta.forward == (my_index % DATA_PLANE_FANOUT == 0) { + let anchor_node = my_index % DATA_PLANE_FANOUT == 0; + if packet.meta.forward == anchor_node { + // TODO: Consider forwarding the packet to the root node here. retransmit_tree_mismatch += 1; } peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len()); @@ -464,10 +466,19 @@ fn retransmit( .or_default() += 1; let mut retransmit_time = Measure::start("retransmit_to"); - if !packet.meta.forward { - ClusterInfo::retransmit_to(&neighbors, packet, sock, true); + // If the node is on the critical path (i.e. the first node in each + // neighborhood), it should send the packet to tvu socket of its + // children and also tvu_forward socket of its neighbors. Otherwise it + // should only forward to tvu_forward socket of its children. + if anchor_node { + ClusterInfo::retransmit_to(&neighbors, packet, sock, /*forward socket=*/ true); } - ClusterInfo::retransmit_to(&children, packet, sock, packet.meta.forward); + ClusterInfo::retransmit_to( + &children, + packet, + sock, + !anchor_node, // send to forward socket! + ); retransmit_time.stop(); retransmit_total += retransmit_time.as_us(); } @@ -726,8 +737,13 @@ mod tests { .unwrap() .local_addr() .unwrap(); - - let other = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); + // This fixes the order of nodes returned by shuffle_peers_and_index, + // and makes turbine retransmit tree deterministic for the purpose of + // the test. + let other = std::iter::repeat_with(solana_sdk::pubkey::new_rand) + .find(|pk| me.id < *pk) + .unwrap(); + let other = ContactInfo::new_localhost(&other, 0); let cluster_info = ClusterInfo::new_with_invalid_keypair(other); cluster_info.insert_info(me);