Give Fullnode ownership of the state needed to dynamically start up a TPU or TVU during a role transition

This commit is contained in:
Carl
2018-09-14 01:53:18 -07:00
committed by Greg Fitzgerald
parent 1fb1c0a681
commit 6d27751365
10 changed files with 199 additions and 71 deletions

View File

@ -11,12 +11,11 @@ use clap::{App, Arg};
use solana::client::mk_client; use solana::client::mk_client;
use solana::crdt::Node; use solana::crdt::Node;
use solana::drone::DRONE_PORT; use solana::drone::DRONE_PORT;
use solana::fullnode::{Config, Fullnode, NodeRole}; use solana::fullnode::{Config, Fullnode, FullnodeReturnType};
use solana::logger; use solana::logger;
use solana::metrics::set_panic_hook; use solana::metrics::set_panic_hook;
use solana::signature::{Keypair, KeypairUtil}; use solana::signature::{Keypair, KeypairUtil};
use solana::thin_client::poll_gossip_for_leader; use solana::thin_client::poll_gossip_for_leader;
use solana::tpu::TpuReturnType;
use solana::wallet::request_airdrop; use solana::wallet::request_airdrop;
use std::fs::File; use std::fs::File;
use std::net::{Ipv4Addr, SocketAddr}; use std::net::{Ipv4Addr, SocketAddr};
@ -125,23 +124,14 @@ fn main() -> () {
} }
loop { loop {
let node_role = fullnode.node_role.take(); let status = fullnode.handle_role_transition();
match node_role { match status {
Some(NodeRole::Leader(leader_services)) => { Ok(Some(FullnodeReturnType::LeaderRotation)) => (),
match leader_services.join() { _ => {
Ok(Some(TpuReturnType::LeaderRotation)) => (), // Fullnode tpu/tvu exited for some unexpected
//fullnode.start_tvu(); // reason, so exit
Err(e) => { exit(1);
eprintln!("Leader returned error: {:?}", e);
exit(1);
}
_ => (),
}
} }
Some(NodeRole::Validator(validator_services)) => {
let _ = validator_services.join();
}
_ => (),
} }
} }
} }

View File

@ -328,8 +328,15 @@ mod tests {
#[test] #[test]
fn test_broadcast_stage_leader_rotation_exit() { fn test_broadcast_stage_leader_rotation_exit() {
let (id, buddy_id, broadcast_stage, shared_window, entry_sender, crdt, entries) = let (
setup_dummy_broadcast_stage(); id,
buddy_id,
broadcast_stage,
shared_window,
entry_sender,
crdt,
entries,
) = setup_dummy_broadcast_stage();
{ {
let mut wcrdt = crdt.write().unwrap(); let mut wcrdt = crdt.write().unwrap();
// Set leader to myself // Set leader to myself

View File

@ -335,7 +335,7 @@ mod tests {
&[], &[],
leader, leader,
None, None,
Some(&ledger_path), &ledger_path,
false, false,
); );

View File

@ -12,6 +12,7 @@ use rpc::{JsonRpcService, RPC_PORT};
use rpu::Rpu; use rpu::Rpu;
use service::Service; use service::Service;
use signature::{Keypair, KeypairUtil}; use signature::{Keypair, KeypairUtil};
use std::net::UdpSocket;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
@ -64,11 +65,23 @@ pub enum FullnodeReturnType {
} }
pub struct Fullnode { pub struct Fullnode {
pub node_role: Option<NodeRole>,
keypair: Arc<Keypair>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
rpu: Rpu, rpu: Rpu,
rpc_service: JsonRpcService, rpc_service: JsonRpcService,
ncp: Ncp, ncp: Ncp,
pub node_role: Option<NodeRole>, bank: Arc<Bank>,
crdt: Arc<RwLock<Crdt>>,
ledger_path: String,
sigverify_disabled: bool,
shared_window: window::SharedWindow,
replicate_socket: Vec<UdpSocket>,
repair_socket: UdpSocket,
retransmit_socket: UdpSocket,
transaction_sockets: Vec<UdpSocket>,
broadcast_socket: UdpSocket,
blob_recycler: BlobRecycler,
} }
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
@ -102,22 +115,11 @@ impl Fullnode {
sigverify_disabled: bool, sigverify_disabled: bool,
) -> Self { ) -> Self {
info!("creating bank..."); info!("creating bank...");
let bank = Bank::new_default(leader_addr.is_none()); let (bank, entry_height, ledger_tail) = Self::new_bank_from_ledger(ledger_path);
let entries = read_ledger(ledger_path, true).expect("opening ledger");
let entries = entries
.map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err)));
info!("processing ledger...");
let (entry_height, ledger_tail) = bank.process_ledger(entries).expect("process_ledger");
// entry_height is the network-wide agreed height of the ledger.
// initialize it from the input ledger
info!("processed {} ledger...", entry_height);
info!("creating networking stack..."); info!("creating networking stack...");
let local_gossip_addr = node.sockets.gossip.local_addr().unwrap(); let local_gossip_addr = node.sockets.gossip.local_addr().unwrap();
info!( info!(
"starting... local gossip address: {} (advertising {})", "starting... local gossip address: {} (advertising {})",
local_gossip_addr, node.info.contact_info.ncp local_gossip_addr, node.info.contact_info.ncp
@ -133,7 +135,7 @@ impl Fullnode {
&ledger_tail, &ledger_tail,
node, node,
leader_info.as_ref(), leader_info.as_ref(),
Some(ledger_path), ledger_path,
sigverify_disabled, sigverify_disabled,
); );
@ -212,7 +214,7 @@ impl Fullnode {
ledger_tail: &[Entry], ledger_tail: &[Entry],
mut node: Node, mut node: Node,
leader_info: Option<&NodeInfo>, leader_info: Option<&NodeInfo>,
ledger_path: Option<&str>, ledger_path: &str,
sigverify_disabled: bool, sigverify_disabled: bool,
) -> Self { ) -> Self {
if leader_info.is_none() { if leader_info.is_none() {
@ -253,11 +255,12 @@ impl Fullnode {
&crdt, &crdt,
shared_window.clone(), shared_window.clone(),
blob_recycler.clone(), blob_recycler.clone(),
ledger_path, Some(ledger_path),
node.sockets.gossip, node.sockets.gossip,
exit.clone(), exit.clone(),
); );
let keypair = Arc::new(keypair);
let node_role; let node_role;
match leader_info { match leader_info {
Some(leader_info) => { Some(leader_info) => {
@ -265,16 +268,26 @@ impl Fullnode {
// TODO: let Crdt get that data from the network? // TODO: let Crdt get that data from the network?
crdt.write().unwrap().insert(leader_info); crdt.write().unwrap().insert(leader_info);
let tvu = Tvu::new( let tvu = Tvu::new(
keypair, keypair.clone(),
&bank, &bank,
entry_height, entry_height,
crdt, crdt.clone(),
shared_window, shared_window.clone(),
blob_recycler.clone(), blob_recycler.clone(),
node.sockets.replicate, node.sockets
node.sockets.repair, .replicate
node.sockets.retransmit, .iter()
ledger_path, .map(|s| s.try_clone().expect("Failed to clone replicate sockets"))
.collect(),
node.sockets
.repair
.try_clone()
.expect("Failed to clone repair socket"),
node.sockets
.retransmit
.try_clone()
.expect("Failed to clone retransmit socket"),
Some(ledger_path),
exit.clone(), exit.clone(),
); );
let validator_state = ValidatorServices::new(tvu); let validator_state = ValidatorServices::new(tvu);
@ -282,17 +295,20 @@ impl Fullnode {
} }
None => { None => {
// Start in leader mode. // Start in leader mode.
let ledger_path = ledger_path.expect("ledger path");
let tick_duration = None; let tick_duration = None;
// TODO: To light up PoH, uncomment the following line: // TODO: To light up PoH, uncomment the following line:
//let tick_duration = Some(Duration::from_millis(1000)); //let tick_duration = Some(Duration::from_millis(1000));
let (tpu, entry_receiver) = Tpu::new( let (tpu, entry_receiver) = Tpu::new(
keypair, keypair.clone(),
&bank, &bank,
&crdt, &crdt,
tick_duration, tick_duration,
node.sockets.transaction, node.sockets
.transaction
.iter()
.map(|s| s.try_clone().expect("Failed to clone transaction sockets"))
.collect(),
&blob_recycler, &blob_recycler,
exit.clone(), exit.clone(),
ledger_path, ledger_path,
@ -301,9 +317,12 @@ impl Fullnode {
); );
let broadcast_stage = BroadcastStage::new( let broadcast_stage = BroadcastStage::new(
node.sockets.broadcast, node.sockets
crdt, .broadcast
shared_window, .try_clone()
.expect("Failed to clone broadcast socket"),
crdt.clone(),
shared_window.clone(),
entry_height, entry_height,
blob_recycler.clone(), blob_recycler.clone(),
entry_receiver, entry_receiver,
@ -314,11 +333,81 @@ impl Fullnode {
} }
Fullnode { Fullnode {
keypair,
crdt,
shared_window,
bank,
sigverify_disabled,
rpu, rpu,
ncp, ncp,
rpc_service, rpc_service,
node_role, node_role,
blob_recycler: blob_recycler.clone(),
ledger_path: ledger_path.to_owned(),
exit, exit,
replicate_socket: node.sockets.replicate,
repair_socket: node.sockets.repair,
retransmit_socket: node.sockets.retransmit,
transaction_sockets: node.sockets.transaction,
broadcast_socket: node.sockets.broadcast,
}
}
fn leader_to_validator(&mut self) {
// TODO: We can avoid building the bank again once RecordStage is
// integrated with BankingStage
let (bank, entry_height, _) = Self::new_bank_from_ledger(&self.ledger_path);
self.bank = Arc::new(bank);
{
let mut wcrdt = self.crdt.write().unwrap();
let scheduled_leader = wcrdt.get_scheduled_leader(entry_height);
match scheduled_leader {
//TODO: Handle the case where we don't know who the next
//scheduled leader is
None => (),
Some(leader_id) => wcrdt.set_leader(leader_id),
}
}
let tvu = Tvu::new(
self.keypair.clone(),
&self.bank,
entry_height,
self.crdt.clone(),
self.shared_window.clone(),
self.blob_recycler.clone(),
self.replicate_socket
.iter()
.map(|s| s.try_clone().expect("Failed to clone replicate sockets"))
.collect(),
self.repair_socket
.try_clone()
.expect("Failed to clone repair socket"),
self.retransmit_socket
.try_clone()
.expect("Failed to clone retransmit socket"),
Some(&self.ledger_path),
self.exit.clone(),
);
let validator_state = ValidatorServices::new(tvu);
self.node_role = Some(NodeRole::Validator(validator_state));
}
/// Joins the services of the current role and reacts to how they finished.
///
/// The role is taken out of `self.node_role`, leaving `None` behind, and then:
///
/// * leader services whose `join()` yields `TpuReturnType::LeaderRotation` trigger
///   `leader_to_validator()`, and `Ok(Some(FullnodeReturnType::LeaderRotation))` is
///   returned;
/// * any other leader result, any validator result, and the no-role case all
///   return `Ok(None)`.
///
/// # Errors
///
/// An error returned by either `join()` call is propagated to the caller via `?`.
pub fn handle_role_transition(&mut self) -> Result<Option<FullnodeReturnType>> {
// `take()` empties `self.node_role`; only the leader-rotation arm below stores a new role.
let node_role = self.node_role.take();
match node_role {
Some(NodeRole::Leader(leader_services)) => match leader_services.join()? {
Some(TpuReturnType::LeaderRotation) => {
// Become a validator before reporting the rotation to the caller.
self.leader_to_validator();
Ok(Some(FullnodeReturnType::LeaderRotation))
}
_ => Ok(None),
},
// Whatever the validator services return on success, no transition is reported.
Some(NodeRole::Validator(validator_services)) => match validator_services.join()? {
_ => Ok(None),
},
None => Ok(None),
} }
} }
@ -331,6 +420,19 @@ impl Fullnode {
self.exit(); self.exit();
self.join() self.join()
} }
/// Builds a bank by replaying the ledger stored at `ledger_path`.
///
/// Returns the bank together with the `(entry_height, ledger_tail)` pair produced
/// by `Bank::process_ledger`.
///
/// # Panics
///
/// Panics if the ledger cannot be opened, if an entry fails to parse, or if
/// `process_ledger` returns an error.
fn new_bank_from_ledger(ledger_path: &str) -> (Bank, u64, Vec<Entry>) {
    let bank = Bank::new_default(false);

    // Entries are unwrapped lazily while the bank consumes the iterator below.
    let parsed_entries = read_ledger(ledger_path, true)
        .expect("opening ledger")
        .map(|entry| match entry {
            Ok(entry) => entry,
            Err(err) => panic!("failed to parse entry. error: {}", err),
        });

    info!("processing ledger...");
    // entry_height is the network-wide agreed height of the ledger.
    // initialize it from the input ledger
    let (entry_height, ledger_tail) = bank
        .process_ledger(parsed_entries)
        .expect("process_ledger");
    info!("processed {} ledger...", entry_height);

    (bank, entry_height, ledger_tail)
}
} }
impl Service for Fullnode { impl Service for Fullnode {
@ -365,31 +467,56 @@ mod tests {
use bank::Bank; use bank::Bank;
use crdt::Node; use crdt::Node;
use fullnode::Fullnode; use fullnode::Fullnode;
use mint::Mint; use ledger::genesis;
use service::Service; use service::Service;
use signature::{Keypair, KeypairUtil}; use signature::{Keypair, KeypairUtil};
use std::fs::remove_dir_all;
#[test] #[test]
fn validator_exit() { fn validator_exit() {
let keypair = Keypair::new(); let keypair = Keypair::new();
let tn = Node::new_localhost_with_pubkey(keypair.pubkey()); let tn = Node::new_localhost_with_pubkey(keypair.pubkey());
let alice = Mint::new(10_000); let (alice, validator_ledger_path) = genesis("validator_exit", 10_000);
let bank = Bank::new(&alice); let bank = Bank::new(&alice);
let entry = tn.info.clone(); let entry = tn.info.clone();
let v = Fullnode::new_with_bank(keypair, bank, 0, &[], tn, Some(&entry), None, false); let v = Fullnode::new_with_bank(
keypair,
bank,
0,
&[],
tn,
Some(&entry),
&validator_ledger_path,
false,
);
v.close().unwrap(); v.close().unwrap();
remove_dir_all(validator_ledger_path).unwrap();
} }
#[test] #[test]
fn validator_parallel_exit() { fn validator_parallel_exit() {
let mut ledger_paths = vec![];
let vals: Vec<Fullnode> = (0..2) let vals: Vec<Fullnode> = (0..2)
.map(|_| { .map(|i| {
let keypair = Keypair::new(); let keypair = Keypair::new();
let tn = Node::new_localhost_with_pubkey(keypair.pubkey()); let tn = Node::new_localhost_with_pubkey(keypair.pubkey());
let alice = Mint::new(10_000); let (alice, validator_ledger_path) =
genesis(&format!("validator_parallel_exit_{}", i), 10_000);
ledger_paths.push(validator_ledger_path.clone());
let bank = Bank::new(&alice); let bank = Bank::new(&alice);
let entry = tn.info.clone(); let entry = tn.info.clone();
Fullnode::new_with_bank(keypair, bank, 0, &[], tn, Some(&entry), None, false) Fullnode::new_with_bank(
}).collect(); keypair,
bank,
0,
&[],
tn,
Some(&entry),
&validator_ledger_path,
false,
)
})
.collect();
//each validator can exit in parallel to speed many sequential calls to `join` //each validator can exit in parallel to speed many sequential calls to `join`
vals.iter().for_each(|v| v.exit()); vals.iter().for_each(|v| v.exit());
//while join is called sequentially, the above exit call notified all the //while join is called sequentially, the above exit call notified all the
@ -397,5 +524,9 @@ mod tests {
vals.into_iter().for_each(|v| { vals.into_iter().for_each(|v| {
v.join().unwrap(); v.join().unwrap();
}); });
for path in ledger_paths {
remove_dir_all(path).unwrap();
}
} }
} }

View File

@ -67,7 +67,7 @@ impl ReplicateStage {
Ok(()) Ok(())
} }
pub fn new( pub fn new(
keypair: Keypair, keypair: Arc<Keypair>,
bank: Arc<Bank>, bank: Arc<Bank>,
crdt: Arc<RwLock<Crdt>>, crdt: Arc<RwLock<Crdt>>,
blob_recycler: BlobRecycler, blob_recycler: BlobRecycler,
@ -85,7 +85,7 @@ impl ReplicateStage {
); );
let vote_stage = VoteStage::new( let vote_stage = VoteStage::new(
Arc::new(keypair), keypair,
bank.clone(), bank.clone(),
crdt.clone(), crdt.clone(),
blob_recycler.clone(), blob_recycler.clone(),

View File

@ -470,7 +470,7 @@ mod tests {
&[], &[],
leader, leader,
None, None,
Some(&ledger_path), &ledger_path,
false, false,
); );
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
@ -515,7 +515,7 @@ mod tests {
&[], &[],
leader, leader,
None, None,
Some(&ledger_path), &ledger_path,
false, false,
); );
//TODO: remove this sleep, or add a retry so CI is stable //TODO: remove this sleep, or add a retry so CI is stable
@ -573,7 +573,7 @@ mod tests {
&[], &[],
leader, leader,
None, None,
Some(&ledger_path), &ledger_path,
false, false,
); );
sleep(Duration::from_millis(300)); sleep(Duration::from_millis(300));
@ -632,7 +632,7 @@ mod tests {
&[], &[],
leader, leader,
None, None,
Some(&ledger_path), &ledger_path,
false, false,
); );
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));

View File

@ -57,7 +57,7 @@ pub struct Tpu {
impl Tpu { impl Tpu {
pub fn new( pub fn new(
keypair: Keypair, keypair: Arc<Keypair>,
bank: &Arc<Bank>, bank: &Arc<Bank>,
crdt: &Arc<RwLock<Crdt>>, crdt: &Arc<RwLock<Crdt>>,
tick_duration: Option<Duration>, tick_duration: Option<Duration>,

View File

@ -70,7 +70,7 @@ impl Tvu {
/// * `exit` - The exit signal. /// * `exit` - The exit signal.
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub fn new( pub fn new(
keypair: Keypair, keypair: Arc<Keypair>,
bank: &Arc<Bank>, bank: &Arc<Bank>,
entry_height: u64, entry_height: u64,
crdt: Arc<RwLock<Crdt>>, crdt: Arc<RwLock<Crdt>>,
@ -236,7 +236,7 @@ pub mod tests {
let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()); let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone());
let tvu = Tvu::new( let tvu = Tvu::new(
target1_keypair, Arc::new(target1_keypair),
&bank, &bank,
0, 0,
cref1, cref1,

View File

@ -385,7 +385,7 @@ mod tests {
&[], &[],
leader, leader,
None, None,
Some(&ledger_path), &ledger_path,
false, false,
); );
sleep(Duration::from_millis(200)); sleep(Duration::from_millis(200));
@ -451,7 +451,7 @@ mod tests {
&[], &[],
leader, leader,
None, None,
Some(&ledger_path), &ledger_path,
false, false,
); );
sleep(Duration::from_millis(200)); sleep(Duration::from_millis(200));

View File

@ -170,7 +170,7 @@ impl WriteStage {
/// Create a new WriteStage for writing and broadcasting entries. /// Create a new WriteStage for writing and broadcasting entries.
pub fn new( pub fn new(
keypair: Keypair, keypair: Arc<Keypair>,
bank: Arc<Bank>, bank: Arc<Bank>,
crdt: Arc<RwLock<Crdt>>, crdt: Arc<RwLock<Crdt>>,
blob_recycler: BlobRecycler, blob_recycler: BlobRecycler,
@ -315,7 +315,7 @@ mod tests {
Vec<Entry>, Vec<Entry>,
) { ) {
// Setup leader info // Setup leader info
let leader_keypair = Keypair::new(); let leader_keypair = Arc::new(Keypair::new());
let id = leader_keypair.pubkey(); let id = leader_keypair.pubkey();
let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey());