data_replicator -> ncp

Fixes #327
This commit is contained in:
Greg Fitzgerald
2018-06-07 16:06:32 -06:00
parent cdfbbe5e60
commit 7aa05618a3
7 changed files with 29 additions and 29 deletions

View File

@ -11,8 +11,8 @@ use isatty::stdin_isatty;
use pnet::datalink; use pnet::datalink;
use rayon::prelude::*; use rayon::prelude::*;
use solana::crdt::{Crdt, ReplicatedData}; use solana::crdt::{Crdt, ReplicatedData};
use solana::data_replicator::DataReplicator;
use solana::mint::MintDemo; use solana::mint::MintDemo;
use solana::ncp::Ncp;
use solana::signature::{GenKeys, KeyPair, KeyPairUtil}; use solana::signature::{GenKeys, KeyPair, KeyPairUtil};
use solana::streamer::default_window; use solana::streamer::default_window;
use solana::thin_client::ThinClient; use solana::thin_client::ThinClient;
@ -290,7 +290,7 @@ fn converge(
let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_ref = Arc::new(RwLock::new(spy_crdt));
let window = default_window(); let window = default_window();
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let data_replicator = DataReplicator::new( let ncp = Ncp::new(
spy_ref.clone(), spy_ref.clone(),
window.clone(), window.clone(),
spy_gossip, spy_gossip,
@ -316,7 +316,7 @@ fn converge(
} }
sleep(Duration::new(1, 0)); sleep(Duration::new(1, 0));
} }
threads.extend(data_replicator.thread_hdls.into_iter()); threads.extend(ncp.thread_hdls.into_iter());
rv rv
} }

View File

@ -13,7 +13,6 @@ pub mod bank;
pub mod banking_stage; pub mod banking_stage;
pub mod budget; pub mod budget;
pub mod crdt; pub mod crdt;
pub mod data_replicator;
pub mod entry; pub mod entry;
pub mod entry_writer; pub mod entry_writer;
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
@ -23,6 +22,7 @@ pub mod hash;
pub mod ledger; pub mod ledger;
pub mod logger; pub mod logger;
pub mod mint; pub mod mint;
pub mod ncp;
pub mod packet; pub mod packet;
pub mod payment_plan; pub mod payment_plan;
pub mod record_stage; pub mod record_stage;

View File

@ -1,4 +1,4 @@
//! The `data_replicator` module implements the replication threads. //! The `ncp` module implements the network control plane.
use crdt; use crdt;
use packet; use packet;
@ -10,22 +10,22 @@ use std::sync::{Arc, RwLock};
use std::thread::JoinHandle; use std::thread::JoinHandle;
use streamer; use streamer;
pub struct DataReplicator { pub struct Ncp {
pub thread_hdls: Vec<JoinHandle<()>>, pub thread_hdls: Vec<JoinHandle<()>>,
} }
impl DataReplicator { impl Ncp {
pub fn new( pub fn new(
crdt: Arc<RwLock<crdt::Crdt>>, crdt: Arc<RwLock<crdt::Crdt>>,
window: Arc<RwLock<Vec<Option<packet::SharedBlob>>>>, window: Arc<RwLock<Vec<Option<packet::SharedBlob>>>>,
gossip_listen_socket: UdpSocket, gossip_listen_socket: UdpSocket,
gossip_send_socket: UdpSocket, gossip_send_socket: UdpSocket,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Result<DataReplicator> { ) -> Result<Ncp> {
let blob_recycler = packet::BlobRecycler::default(); let blob_recycler = packet::BlobRecycler::default();
let (request_sender, request_receiver) = channel(); let (request_sender, request_receiver) = channel();
trace!( trace!(
"DataReplicator: id: {:?}, listening on: {:?}", "Ncp: id: {:?}, listening on: {:?}",
&crdt.read().unwrap().me[..4], &crdt.read().unwrap().me[..4],
gossip_listen_socket.local_addr().unwrap() gossip_listen_socket.local_addr().unwrap()
); );
@ -52,14 +52,14 @@ impl DataReplicator {
); );
let t_gossip = crdt::Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit); let t_gossip = crdt::Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit);
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
Ok(DataReplicator { thread_hdls }) Ok(Ncp { thread_hdls })
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crdt::{Crdt, TestNode}; use crdt::{Crdt, TestNode};
use data_replicator::DataReplicator; use ncp::Ncp;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
@ -71,7 +71,7 @@ mod tests {
let crdt = Crdt::new(tn.data.clone()); let crdt = Crdt::new(tn.data.clone());
let c = Arc::new(RwLock::new(crdt)); let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![])); let w = Arc::new(RwLock::new(vec![]));
let d = DataReplicator::new( let d = Ncp::new(
c.clone(), c.clone(),
w, w,
tn.sockets.gossip, tn.sockets.gossip,

View File

@ -2,7 +2,7 @@
use bank::Bank; use bank::Bank;
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, ReplicatedData};
use data_replicator::DataReplicator; use ncp::Ncp;
use packet; use packet;
use rpu::Rpu; use rpu::Rpu;
use std::io::Write; use std::io::Write;
@ -75,14 +75,14 @@ impl Server {
let crdt = Arc::new(RwLock::new(Crdt::new(me))); let crdt = Arc::new(RwLock::new(Crdt::new(me)));
let window = streamer::default_window(); let window = streamer::default_window();
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let data_replicator = DataReplicator::new( let ncp = Ncp::new(
crdt.clone(), crdt.clone(),
window.clone(), window.clone(),
gossip_socket, gossip_socket,
gossip_send_socket, gossip_send_socket,
exit.clone(), exit.clone(),
).expect("DataReplicator::new"); ).expect("Ncp::new");
thread_hdls.extend(data_replicator.thread_hdls); thread_hdls.extend(ncp.thread_hdls);
let t_broadcast = streamer::broadcaster( let t_broadcast = streamer::broadcaster(
broadcast_socket, broadcast_socket,

View File

@ -22,7 +22,7 @@
use bank::Bank; use bank::Bank;
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, ReplicatedData};
use data_replicator::DataReplicator; use ncp::Ncp;
use packet; use packet;
use replicate_stage::ReplicateStage; use replicate_stage::ReplicateStage;
use std::net::UdpSocket; use std::net::UdpSocket;
@ -65,13 +65,13 @@ impl Tvu {
.insert(&leader); .insert(&leader);
let window = streamer::default_window(); let window = streamer::default_window();
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let data_replicator = DataReplicator::new( let ncp = Ncp::new(
crdt.clone(), crdt.clone(),
window.clone(), window.clone(),
gossip_listen_socket, gossip_listen_socket,
gossip_send_socket, gossip_send_socket,
exit.clone(), exit.clone(),
).expect("DataReplicator::new"); ).expect("Ncp::new");
// TODO pull this socket out through the public interface // TODO pull this socket out through the public interface
// make sure we are on the same interface // make sure we are on the same interface
@ -132,7 +132,7 @@ impl Tvu {
t_repair_receiver, t_repair_receiver,
replicate_stage.thread_hdl, replicate_stage.thread_hdl,
]; ];
threads.extend(data_replicator.thread_hdls.into_iter()); threads.extend(ncp.thread_hdls.into_iter());
Tvu { Tvu {
thread_hdls: threads, thread_hdls: threads,
} }
@ -144,11 +144,11 @@ pub mod tests {
use bank::Bank; use bank::Bank;
use bincode::serialize; use bincode::serialize;
use crdt::{Crdt, TestNode}; use crdt::{Crdt, TestNode};
use data_replicator::DataReplicator;
use entry::Entry; use entry::Entry;
use hash::{hash, Hash}; use hash::{hash, Hash};
use logger; use logger;
use mint::Mint; use mint::Mint;
use ncp::Ncp;
use packet::BlobRecycler; use packet::BlobRecycler;
use result::Result; use result::Result;
use signature::{KeyPair, KeyPairUtil}; use signature::{KeyPair, KeyPairUtil};
@ -166,10 +166,10 @@ pub mod tests {
crdt: Arc<RwLock<Crdt>>, crdt: Arc<RwLock<Crdt>>,
listen: UdpSocket, listen: UdpSocket,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Result<DataReplicator> { ) -> Result<Ncp> {
let window = streamer::default_window(); let window = streamer::default_window();
let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
DataReplicator::new(crdt, window, listen, send_sock, exit) Ncp::new(crdt, window, listen, send_sock, exit)
} }
/// Test that message sent from leader to target1 and replicated to target2 /// Test that message sent from leader to target1 and replicated to target2
#[test] #[test]

View File

@ -5,8 +5,8 @@ extern crate solana;
use rayon::iter::*; use rayon::iter::*;
use solana::crdt::{Crdt, TestNode}; use solana::crdt::{Crdt, TestNode};
use solana::data_replicator::DataReplicator;
use solana::logger; use solana::logger;
use solana::ncp::Ncp;
use solana::packet::Blob; use solana::packet::Blob;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
@ -14,12 +14,12 @@ use std::sync::{Arc, RwLock};
use std::thread::sleep; use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, DataReplicator, UdpSocket) { fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, Ncp, UdpSocket) {
let tn = TestNode::new(); let tn = TestNode::new();
let crdt = Crdt::new(tn.data.clone()); let crdt = Crdt::new(tn.data.clone());
let c = Arc::new(RwLock::new(crdt)); let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![])); let w = Arc::new(RwLock::new(vec![]));
let d = DataReplicator::new( let d = Ncp::new(
c.clone(), c.clone(),
w, w,
tn.sockets.gossip, tn.sockets.gossip,
@ -35,7 +35,7 @@ fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, DataReplicator, UdpSo
/// tests that actually use this function are below /// tests that actually use this function are below
fn run_gossip_topo<F>(topo: F) fn run_gossip_topo<F>(topo: F)
where where
F: Fn(&Vec<(Arc<RwLock<Crdt>>, DataReplicator, UdpSocket)>) -> (), F: Fn(&Vec<(Arc<RwLock<Crdt>>, Ncp, UdpSocket)>) -> (),
{ {
let num: usize = 5; let num: usize = 5;
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));

View File

@ -6,9 +6,9 @@ extern crate solana;
use solana::bank::Bank; use solana::bank::Bank;
use solana::crdt::TestNode; use solana::crdt::TestNode;
use solana::crdt::{Crdt, ReplicatedData}; use solana::crdt::{Crdt, ReplicatedData};
use solana::data_replicator::DataReplicator;
use solana::logger; use solana::logger;
use solana::mint::Mint; use solana::mint::Mint;
use solana::ncp::Ncp;
use solana::server::Server; use solana::server::Server;
use solana::signature::{KeyPair, KeyPairUtil, PublicKey}; use solana::signature::{KeyPair, KeyPairUtil, PublicKey};
use solana::streamer::default_window; use solana::streamer::default_window;
@ -61,7 +61,7 @@ fn converge(
spy_crdt.set_leader(leader.id); spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_ref = Arc::new(RwLock::new(spy_crdt));
let spy_window = default_window(); let spy_window = default_window();
let dr = DataReplicator::new( let dr = Ncp::new(
spy_ref.clone(), spy_ref.clone(),
spy_window, spy_window,
spy.sockets.gossip, spy.sockets.gossip,