Broadcast for slots (#2081)

* Insert blobs into db_ledger in broadcast stage to support leader to validator transitions

* Add transmitting real slots to broadcast stage

* Handle real slots instead of default slots in window

* Switch to dummy repair on slots and modify erasure to support leader rotation

* Shorten length of holding locks (see the lock-scoping sketch after this list)

* Remove logger from replicator test
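The change that most of these diffs share is moving DbLedger (and LeaderScheduler) behind Arc<RwLock<...>> and scoping each guard to the smallest block that needs it, so the lock is released as soon as the database call returns. Below is a minimal, self-contained Rust sketch of that pattern; Ledger, insert_blob, and get_blob are hypothetical stand-ins rather than the actual DbLedger API touched by this commit.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

// Hypothetical stand-in for DbLedger, keyed by (slot, blob index).
struct Ledger {
    blobs: HashMap<(u64, u64), Vec<u8>>,
}

impl Ledger {
    fn insert_blob(&mut self, slot: u64, index: u64, data: Vec<u8>) {
        self.blobs.insert((slot, index), data);
    }

    fn get_blob(&self, slot: u64, index: u64) -> Option<Vec<u8>> {
        self.blobs.get(&(slot, index)).cloned()
    }
}

fn main() {
    let ledger = Arc::new(RwLock::new(Ledger {
        blobs: HashMap::new(),
    }));

    // Writer (think broadcast/window stage): the write guard lives in an
    // inner block, so the lock is dropped before any follow-up work.
    let w_ledger = ledger.clone();
    let writer = thread::spawn(move || {
        for index in 0..10u64 {
            {
                let mut w_db = w_ledger.write().unwrap();
                w_db.insert_blob(0, index, vec![index as u8]);
            } // write guard released here, e.g. before retransmitting the blob
        }
    });
    writer.join().unwrap();

    // Reader (think repair): only a read lock is needed, again tightly scoped.
    let found = {
        let r_db = ledger.read().unwrap();
        r_db.get_blob(0, 5)
    };
    assert_eq!(found, Some(vec![5]));
}

The inner braces around w_db mirror the scoped write lock introduced in process_blob below, where the guard is dropped before erasure recovery runs.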
Author: carllin
Date: 2018-12-12 15:58:29 -08:00
Committed by: GitHub
Parent: bf33d9d703
Commit: ae903f190e
11 changed files with 474 additions and 150 deletions


@@ -22,7 +22,6 @@ use std::sync::{Arc, RwLock};
 pub const MAX_REPAIR_LENGTH: usize = 128;
 pub fn repair(
-    slot: u64,
     db_ledger: &DbLedger,
     cluster_info: &Arc<RwLock<ClusterInfo>>,
     id: &Pubkey,
@@ -33,7 +32,9 @@ pub fn repair(
 ) -> Result<Vec<(SocketAddr, Vec<u8>)>> {
     let rcluster_info = cluster_info.read().unwrap();
     let mut is_next_leader = false;
-    let meta = db_ledger.meta_cf.get(&db_ledger.db, &MetaCf::key(slot))?;
+    let meta = db_ledger
+        .meta_cf
+        .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))?;
     if meta.is_none() {
         return Ok(vec![]);
     }
@@ -84,7 +85,7 @@ pub fn repair(
     };
     let idxs = find_missing_data_indexes(
-        slot,
+        DEFAULT_SLOT_HEIGHT,
         db_ledger,
         consumed,
         max_repair_entry_height - 1,
@@ -219,7 +220,7 @@ pub fn find_missing_coding_indexes(
 pub fn retransmit_all_leader_blocks(
     dq: &[SharedBlob],
-    leader_scheduler: &LeaderScheduler,
+    leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
     retransmit: &BlobSender,
 ) -> Result<()> {
     let mut retransmit_queue: Vec<SharedBlob> = Vec::new();
@@ -227,7 +228,7 @@ pub fn retransmit_all_leader_blocks(
         // Check if the blob is from the scheduled leader for its slot. If so,
         // add to the retransmit_queue
         if let Ok(slot) = b.read().unwrap().slot() {
-            if let Some(leader_id) = leader_scheduler.get_leader_for_slot(slot) {
+            if let Some(leader_id) = leader_scheduler.read().unwrap().get_leader_for_slot(slot) {
                 add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
             }
         }
@@ -271,8 +272,8 @@ pub fn add_blob_to_retransmit_queue(
 /// starting from consumed is thereby formed, add that continuous
 /// range of blobs to a queue to be sent on to the next stage.
 pub fn process_blob(
-    leader_scheduler: &LeaderScheduler,
-    db_ledger: &mut DbLedger,
+    leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
+    db_ledger: &Arc<RwLock<DbLedger>>,
     blob: &SharedBlob,
     max_ix: u64,
     pix: u64,
@@ -287,11 +288,10 @@ pub fn process_blob(
     // leader rotation enabled
     // Github issue: https://github.com/solana-labs/solana/issues/1899.
     let slot = blob.read().unwrap().slot()?;
-    let leader = leader_scheduler.get_leader_for_slot(slot);
+    let leader = leader_scheduler.read().unwrap().get_leader_for_slot(slot);
     // TODO: Once the original leader signature is added to the blob, make sure that
     // the blob was originally generated by the expected leader for this slot
     if leader.is_none() {
         return Ok(());
     }
@@ -301,15 +301,21 @@ pub fn process_blob(
         let erasure_key = ErasureCf::key(slot, pix);
         let rblob = &blob.read().unwrap();
         let size = rblob.size()?;
-        db_ledger.erasure_cf.put(
-            &db_ledger.db,
-            &erasure_key,
-            &rblob.data[..BLOB_HEADER_SIZE + size],
-        )?;
+        {
+            let w_db = db_ledger.write().unwrap();
+            w_db.erasure_cf.put(
+                &w_db.db,
+                &erasure_key,
+                &rblob.data[..BLOB_HEADER_SIZE + size],
+            )?;
+        }
         vec![]
     } else {
         let data_key = DataCf::key(slot, pix);
-        db_ledger.insert_data_blob(&data_key, &blob.read().unwrap())?
+        db_ledger
+            .write()
+            .unwrap()
+            .insert_data_blob(&data_key, &blob.read().unwrap())?
     };
     #[cfg(feature = "erasure")]
@@ -317,7 +323,7 @@ pub fn process_blob(
         // If write_shared_blobs() of these recovered blobs fails fails, don't return
         // because consumed_entries might be nonempty from earlier, and tick height needs to
         // be updated. Hopefully we can recover these blobs next time successfully.
-        if let Err(e) = try_erasure(db_ledger, slot, consume_queue) {
+        if let Err(e) = try_erasure(db_ledger, consume_queue) {
             trace!(
                 "erasure::recover failed to write recovered coding blobs. Err: {:?}",
                 e
@@ -333,10 +339,12 @@ pub fn process_blob(
     // we only want up to a certain index
     // then stop
     if max_ix != 0 && !consumed_entries.is_empty() {
-        let meta = db_ledger
-            .meta_cf
-            .get(&db_ledger.db, &MetaCf::key(slot))?
-            .expect("Expect metadata to exist if consumed entries is nonzero");
+        let meta = {
+            let r_db = db_ledger.read().unwrap();
+            r_db.meta_cf
+                .get(&r_db.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))?
+                .expect("Expect metadata to exist if consumed entries is nonzero")
+        };
         let consumed = meta.consumed;
@@ -374,23 +382,31 @@ pub fn calculate_max_repair_entry_height(
 }
 #[cfg(feature = "erasure")]
-fn try_erasure(db_ledger: &mut DbLedger, slot: u64, consume_queue: &mut Vec<Entry>) -> Result<()> {
-    let meta = db_ledger.meta_cf.get(&db_ledger.db, &MetaCf::key(slot))?;
+fn try_erasure(db_ledger: &Arc<RwLock<DbLedger>>, consume_queue: &mut Vec<Entry>) -> Result<()> {
+    let meta = {
+        let r_db = db_ledger.read().unwrap();
+        r_db.meta_cf
+            .get(&r_db.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))?
+    };
     if let Some(meta) = meta {
-        let (data, coding) = erasure::recover(db_ledger, slot, meta.consumed)?;
+        let (data, coding) = erasure::recover(db_ledger, meta.consumed_slot, meta.consumed)?;
         for c in coding {
             let cl = c.read().unwrap();
-            let erasure_key =
-                ErasureCf::key(slot, cl.index().expect("Recovered blob must set index"));
+            let erasure_key = ErasureCf::key(
+                meta.consumed_slot,
+                cl.index().expect("Recovered blob must set index"),
+            );
             let size = cl.size().expect("Recovered blob must set size");
-            db_ledger.erasure_cf.put(
-                &db_ledger.db,
-                &erasure_key,
-                &cl.data[..BLOB_HEADER_SIZE + size],
-            )?;
+            let r_db = db_ledger.read().unwrap();
+            r_db.erasure_cf
+                .put(&r_db.db, &erasure_key, &cl.data[..BLOB_HEADER_SIZE + size])?;
         }
-        let entries = db_ledger.write_shared_blobs(slot, data)?;
+        let entries = db_ledger
+            .write()
+            .unwrap()
+            .write_shared_blobs(meta.consumed_slot, data)?;
         consume_queue.extend(entries);
     }
@@ -416,7 +432,7 @@ mod test {
     use std::net::UdpSocket;
     use std::sync::atomic::{AtomicBool, Ordering};
     use std::sync::mpsc::channel;
-    use std::sync::Arc;
+    use std::sync::{Arc, RwLock};
     use std::time::Duration;
     fn get_msgs(r: PacketReceiver, num: &mut usize) {
@@ -500,7 +516,8 @@ mod test {
     pub fn test_retransmit() {
         let leader = Keypair::new().pubkey();
         let nonleader = Keypair::new().pubkey();
-        let leader_scheduler = LeaderScheduler::from_bootstrap_leader(leader);
+        let leader_scheduler =
+            Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(leader)));
         let blob = SharedBlob::default();
         let (blob_sender, blob_receiver) = channel();
@@ -714,12 +731,15 @@ mod test {
         // Generate the db_ledger from the window
         let ledger_path = get_tmp_ledger_path("test_try_erasure");
-        let mut db_ledger =
-            generate_db_ledger_from_window(&ledger_path, &window, slot_height, false);
+        let db_ledger = Arc::new(RwLock::new(generate_db_ledger_from_window(
+            &ledger_path,
+            &window,
+            slot_height,
+            false,
+        )));
         let mut consume_queue = vec![];
-        try_erasure(&mut db_ledger, slot_height, &mut consume_queue)
-            .expect("Expected successful erasure attempt");
+        try_erasure(&db_ledger, &mut consume_queue).expect("Expected successful erasure attempt");
         window[erase_offset].data = erased_data;
         let data_blobs: Vec<_> = window[erase_offset..end_index]
@@ -730,10 +750,11 @@ mod test {
         assert_eq!(consume_queue, expected);
         let erased_coding_l = erased_coding.read().unwrap();
+        let r_db = db_ledger.read().unwrap();
         assert_eq!(
-            &db_ledger
+            &r_db
                 .erasure_cf
-                .get_by_slot_index(&db_ledger.db, slot_height, erase_offset as u64)
+                .get_by_slot_index(&r_db.db, slot_height, erase_offset as u64)
                 .unwrap()
                 .unwrap()[BLOB_HEADER_SIZE..],
             &erased_coding_l.data()[..erased_coding_l.size().unwrap() as usize],