diff --git a/src/bin/genesis.rs b/src/bin/genesis.rs index 17683c0494..74e49bca9b 100644 --- a/src/bin/genesis.rs +++ b/src/bin/genesis.rs @@ -5,6 +5,7 @@ extern crate clap; use serde_json; use clap::{App, Arg}; +use solana::db_ledger::genesis; use solana::ledger::LedgerWriter; use solana::mint::Mint; use solana_sdk::signature::{read_keypair, KeypairUtil}; @@ -79,9 +80,13 @@ fn main() -> Result<(), Box> { ); // Write the ledger entries + let entries = mint.create_entries(); + let ledger_path = matches.value_of("ledger").unwrap(); let mut ledger_writer = LedgerWriter::open(&ledger_path, true)?; - ledger_writer.write_entries(&mint.create_entries())?; + ledger_writer.write_entries(&entries)?; + + genesis(&ledger_path, &leader_keypair, &entries)?; Ok(()) } diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs index c0033abeca..859b25c931 100644 --- a/src/broadcast_service.rs +++ b/src/broadcast_service.rs @@ -70,13 +70,12 @@ fn broadcast( // Generate the slot heights for all the entries inside ventries let slot_heights = generate_slots(&ventries, leader_scheduler); - let blobs_vec: Vec<_> = ventries + let blobs: Vec<_> = ventries .into_par_iter() .flat_map(|p| p.to_blobs()) .collect(); - let blobs_slot_heights: Vec<(SharedBlob, u64)> = - blobs_vec.into_iter().zip(slot_heights).collect(); + let blobs_slot_heights: Vec<(SharedBlob, u64)> = blobs.into_iter().zip(slot_heights).collect(); let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed()); diff --git a/src/db_ledger.rs b/src/db_ledger.rs index 6b5f42ddfb..0a841ba180 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -320,10 +320,12 @@ impl DbLedger { I: IntoIterator, I::Item: Borrow, { - let shared_blobs = entries - .into_iter() - .enumerate() - .map(|(idx, entry)| entry.borrow().to_blob(Some(idx as u64), None, None)); + let shared_blobs = entries.into_iter().enumerate().map(|(idx, entry)| { + let b = entry.borrow().to_blob(); + b.write().unwrap().set_index(idx as u64).unwrap(); + b + }); + self.write_shared_blobs(slot, shared_blobs) } @@ -534,19 +536,19 @@ where } } -pub fn genesis<'a, I>(ledger_path: &str, keypair: Option<&Keypair>, entries: I) -> Result<()> +pub fn genesis<'a, I>(ledger_path: &str, keypair: &Keypair, entries: I) -> Result<()> where I: IntoIterator, { let mut db_ledger = DbLedger::open(ledger_path)?; - let pubkey = keypair.map(|k| k.pubkey()); - // TODO sign these blobs with keypair - let blobs = entries - .into_iter() - .enumerate() - .map(|(idx, entry)| entry.borrow().to_blob(Some(idx as u64), pubkey, None)); + let blobs = entries.into_iter().enumerate().map(|(idx, entry)| { + let b = entry.borrow().to_blob(); + b.write().unwrap().set_index(idx as u64).unwrap(); + b.write().unwrap().set_id(&keypair.pubkey()).unwrap(); + b + }); db_ledger.write_shared_blobs(DEFAULT_SLOT_HEIGHT, blobs)?; Ok(()) @@ -556,6 +558,7 @@ where mod tests { use super::*; use crate::ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block}; + use crate::packet::index_blobs; #[test] fn test_put_get_simple() { @@ -611,6 +614,12 @@ mod tests { #[test] fn test_get_blobs_bytes() { let shared_blobs = make_tiny_test_entries(10).to_blobs(); + index_blobs( + shared_blobs.iter().zip(vec![0u64; 10].into_iter()), + &Keypair::new().pubkey(), + 0, + ); + let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); let slot = DEFAULT_SLOT_HEIGHT; @@ -836,9 +845,9 @@ mod tests { pub fn test_genesis_and_entry_iterator() { // Create RocksDb ledger let entries = make_tiny_test_entries(100); - let ledger_path = get_tmp_ledger_path("test_entry_iterator"); + let ledger_path = get_tmp_ledger_path("test_genesis_and_entry_iterator"); { - assert!(genesis(&ledger_path, None, &entries).is_ok()); + assert!(genesis(&ledger_path, &Keypair::new(), &entries).is_ok()); let ledger = DbLedger::open(&ledger_path).expect("open failed"); diff --git a/src/db_window.rs b/src/db_window.rs index 3514c1217a..19f6629065 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -423,7 +423,7 @@ mod test { #[cfg(all(feature = "erasure", test))] use crate::erasure::{NUM_CODING, NUM_DATA}; use crate::ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block}; - use crate::packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE}; + use crate::packet::{index_blobs, Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE}; use crate::streamer::{receiver, responder, PacketReceiver}; use rocksdb::{Options, DB}; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -689,6 +689,13 @@ mod test { // Write entries let num_entries = 10; let shared_blobs = make_tiny_test_entries(num_entries).to_blobs(); + + index_blobs( + shared_blobs.iter().zip(vec![0u64; num_entries].into_iter()), + &Keypair::new().pubkey(), + 0, + ); + let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); db_ledger.write_blobs(slot, &blobs).unwrap(); diff --git a/src/entry.rs b/src/entry.rs index acb7585b7c..127326260f 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -7,11 +7,9 @@ use crate::poh::Poh; use crate::result::Result; use bincode::{deserialize, serialize_into, serialized_size}; use solana_sdk::hash::Hash; -use solana_sdk::pubkey::Pubkey; use solana_sdk::transaction::Transaction; use std::io::Cursor; use std::mem::size_of; -use std::net::SocketAddr; use std::sync::mpsc::{Receiver, Sender}; pub type EntrySender = Sender>; @@ -106,12 +104,7 @@ impl Entry { entry } - pub fn to_blob( - &self, - idx: Option, - id: Option, - addr: Option<&SocketAddr>, - ) -> SharedBlob { + pub fn to_blob(&self) -> SharedBlob { let blob = SharedBlob::default(); { let mut blob_w = blob.write().unwrap(); @@ -121,17 +114,6 @@ impl Entry { out.position() as usize }; blob_w.set_size(pos); - - if let Some(idx) = idx { - blob_w.set_index(idx).expect("set_index()"); - } - if let Some(id) = id { - blob_w.set_id(&id).expect("set_id()"); - } - if let Some(addr) = addr { - blob_w.meta.set_addr(addr); - } - blob_w.set_flags(0).unwrap(); } blob } diff --git a/src/fullnode.rs b/src/fullnode.rs index 872949cc1e..b237e9c398 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -674,9 +674,9 @@ mod tests { make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig, }; use crate::ledger::{ - create_tmp_genesis, create_tmp_sample_ledger, tmp_copy_ledger, LedgerWriter, + create_tmp_genesis, create_tmp_sample_ledger, make_consecutive_blobs, tmp_copy_ledger, + LedgerWriter, }; - use crate::packet::make_consecutive_blobs; use crate::service::Service; use crate::streamer::responder; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -1050,7 +1050,7 @@ mod tests { let total_blobs_to_send = bootstrap_height + extra_blobs; let tvu_address = &validator_info.tvu; let msgs = make_consecutive_blobs( - leader_id, + &leader_id, total_blobs_to_send, ledger_initial_len, last_id, diff --git a/src/ledger.rs b/src/ledger.rs index 558c53d836..2d20a27eaa 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -20,7 +20,6 @@ use std::fs::{copy, create_dir_all, remove_dir_all, File, OpenOptions}; use std::io::prelude::*; use std::io::{self, BufReader, BufWriter, Seek, SeekFrom}; use std::mem::size_of; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::Path; // @@ -407,7 +406,7 @@ impl LedgerWriter { I: IntoIterator, { for entry in entries { - self.write_entry_noflush(&entry)?; + self.write_entry_noflush(entry)?; } self.index.flush()?; self.data.flush()?; @@ -452,7 +451,6 @@ pub trait Block { /// Verifies the hashes and counts of a slice of transactions are all consistent. fn verify(&self, start_hash: &Hash) -> bool; fn to_blobs(&self) -> Vec; - fn to_blobs_with_id(&self, id: Pubkey, start_id: u64, addr: &SocketAddr) -> Vec; fn votes(&self) -> Vec<(Pubkey, Vote, Hash)>; } @@ -480,16 +478,8 @@ impl Block for [Entry] { }) } - fn to_blobs_with_id(&self, id: Pubkey, start_idx: u64, addr: &SocketAddr) -> Vec { - self.iter() - .enumerate() - .map(|(i, entry)| entry.to_blob(Some(start_idx + i as u64), Some(id), Some(&addr))) - .collect() - } - fn to_blobs(&self) -> Vec { - let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); - self.to_blobs_with_id(Pubkey::default(), 0, &default_addr) + self.iter().map(|entry| entry.to_blob()).collect() } fn votes(&self) -> Vec<(Pubkey, Vote, Hash)> { @@ -705,6 +695,28 @@ pub fn make_large_test_entries(num_entries: usize) -> Vec { vec![entry; num_entries] } +#[cfg(test)] +pub fn make_consecutive_blobs( + id: &Pubkey, + num_blobs_to_make: u64, + start_height: u64, + start_hash: Hash, + addr: &std::net::SocketAddr, +) -> Vec { + let entries = create_ticks(num_blobs_to_make as usize, start_hash); + + let blobs = entries.to_blobs(); + let mut index = start_height; + for blob in &blobs { + let mut blob = blob.write().unwrap(); + blob.set_index(index).unwrap(); + blob.set_id(id).unwrap(); + blob.meta.set_addr(addr); + index += 1; + } + blobs +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/packet.rs b/src/packet.rs index ea8ac79f51..abfcfed65b 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -1,17 +1,11 @@ //! The `packet` module defines data structures and methods to pull data from the network. use crate::counter::Counter; -#[cfg(test)] -use crate::entry::Entry; -#[cfg(test)] -use crate::ledger::Block; use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS}; use crate::result::{Error, Result}; use bincode::{deserialize, serialize}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use log::Level; use serde::Serialize; -#[cfg(test)] -use solana_sdk::hash::Hash; pub use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::pubkey::Pubkey; use std::borrow::Borrow; @@ -459,27 +453,6 @@ where } } -#[cfg(test)] -pub fn make_consecutive_blobs( - me_id: Pubkey, - num_blobs_to_make: u64, - start_height: u64, - start_hash: Hash, - addr: &SocketAddr, -) -> SharedBlobs { - let mut last_hash = start_hash; - let num_hashes = 1; - let mut all_entries = Vec::with_capacity(num_blobs_to_make as usize); - for _ in 0..num_blobs_to_make { - let entry = Entry::new(&last_hash, 0, num_hashes, vec![]); - last_hash = entry.id; - all_entries.push(entry); - } - let mut new_blobs = all_entries.to_blobs_with_id(me_id, start_height, addr); - new_blobs.truncate(num_blobs_to_make as usize); - new_blobs -} - #[cfg(test)] mod tests { use crate::packet::{ diff --git a/src/window_service.rs b/src/window_service.rs index d8f2994849..f404a41399 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -236,9 +236,9 @@ mod test { use crate::db_ledger::DbLedger; use crate::entry::Entry; use crate::leader_scheduler::LeaderScheduler; - use crate::ledger::get_tmp_ledger_path; + use crate::ledger::{get_tmp_ledger_path, make_consecutive_blobs}; use crate::logger; - use crate::packet::{make_consecutive_blobs, SharedBlob, PACKET_DATA_SIZE}; + use crate::packet::{SharedBlob, PACKET_DATA_SIZE}; use crate::streamer::{blob_receiver, responder}; use crate::window_service::{repair_backoff, window_service}; use rocksdb::{Options, DB}; @@ -306,7 +306,7 @@ mod test { let num_blobs_to_make = 10; let gossip_address = &tn.info.gossip; let msgs = make_consecutive_blobs( - me_id, + &me_id, num_blobs_to_make, 0, Hash::default(),