Cleanup replicator sockets

Add optional UdpSocket for storage interface.
Add new_localhost_replicator to create a new replicator local node.
This commit is contained in:
Stephen Akridge
2019-03-18 11:02:32 -07:00
committed by sakridge
parent 1be7ee51be
commit 07f4dd385d
3 changed files with 31 additions and 3 deletions

View File

@ -1347,6 +1347,7 @@ pub struct Sockets {
pub broadcast: UdpSocket, pub broadcast: UdpSocket,
pub repair: UdpSocket, pub repair: UdpSocket,
pub retransmit: UdpSocket, pub retransmit: UdpSocket,
pub storage: Option<UdpSocket>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -1360,6 +1361,13 @@ impl Node {
let pubkey = Keypair::new().pubkey(); let pubkey = Keypair::new().pubkey();
Self::new_localhost_with_pubkey(&pubkey) Self::new_localhost_with_pubkey(&pubkey)
} }
pub fn new_localhost_replicator(pubkey: &Pubkey) -> Self {
let mut new = Self::new_localhost_with_pubkey(pubkey);
let storage = UdpSocket::bind("127.0.0.1:0").unwrap();
new.info.storage_addr = storage.local_addr().unwrap();
new.sockets.storage = Some(storage);
new
}
pub fn new_localhost_with_pubkey(pubkey: &Pubkey) -> Self { pub fn new_localhost_with_pubkey(pubkey: &Pubkey) -> Self {
let tpu = UdpSocket::bind("127.0.0.1:0").unwrap(); let tpu = UdpSocket::bind("127.0.0.1:0").unwrap();
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
@ -1396,6 +1404,7 @@ impl Node {
broadcast, broadcast,
repair, repair,
retransmit, retransmit,
storage: None,
}, },
} }
} }
@ -1427,7 +1436,6 @@ impl Node {
let (_, repair) = bind(); let (_, repair) = bind();
let (_, broadcast) = bind(); let (_, broadcast) = bind();
let (_, retransmit) = bind(); let (_, retransmit) = bind();
let (storage_port, _) = bind();
let info = ContactInfo::new( let info = ContactInfo::new(
pubkey, pubkey,
@ -1435,7 +1443,7 @@ impl Node {
SocketAddr::new(gossip_addr.ip(), tvu_port), SocketAddr::new(gossip_addr.ip(), tvu_port),
SocketAddr::new(gossip_addr.ip(), tpu_port), SocketAddr::new(gossip_addr.ip(), tpu_port),
SocketAddr::new(gossip_addr.ip(), tpu_via_blobs_port), SocketAddr::new(gossip_addr.ip(), tpu_via_blobs_port),
SocketAddr::new(gossip_addr.ip(), storage_port), "0.0.0.0:0".parse().unwrap(),
SocketAddr::new(gossip_addr.ip(), rpc_port::DEFAULT_RPC_PORT), SocketAddr::new(gossip_addr.ip(), rpc_port::DEFAULT_RPC_PORT),
SocketAddr::new(gossip_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT), SocketAddr::new(gossip_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT),
0, 0,
@ -1452,6 +1460,7 @@ impl Node {
broadcast, broadcast,
repair, repair,
retransmit, retransmit,
storage: None,
}, },
} }
} }

View File

@ -122,13 +122,20 @@ impl Replicator {
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
pub fn new( pub fn new(
ledger_path: &str, ledger_path: &str,
node: Node, mut node: Node,
cluster_entrypoint: ContactInfo, cluster_entrypoint: ContactInfo,
keypair: Arc<Keypair>, keypair: Arc<Keypair>,
_timeout: Option<Duration>, _timeout: Option<Duration>,
) -> Result<Self> { ) -> Result<Self> {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
// replicator cannot give information on rpc and
// cannot be leader so tpu/rpc ports are cleared
node.info.rpc = "0.0.0.0:0".parse().unwrap();
node.info.rpc_pubsub = "0.0.0.0:0".parse().unwrap();
node.info.tpu = "0.0.0.0:0".parse().unwrap();
node.info.tpu_via_blobs = "0.0.0.0:0".parse().unwrap();
info!("Replicator: id: {}", keypair.pubkey()); info!("Replicator: id: {}", keypair.pubkey());
info!("Creating cluster info...."); info!("Creating cluster info....");
let mut cluster_info = ClusterInfo::new(node.info.clone(), keypair.clone()); let mut cluster_info = ClusterInfo::new(node.info.clone(), keypair.clone());

View File

@ -8,6 +8,7 @@ use solana::blocktree::{create_new_tmp_ledger, tmp_copy_blocktree, Blocktree};
use solana::cluster_info::Node; use solana::cluster_info::Node;
use solana::contact_info::ContactInfo; use solana::contact_info::ContactInfo;
use solana::fullnode::{Fullnode, FullnodeConfig}; use solana::fullnode::{Fullnode, FullnodeConfig};
use solana::gossip_service::discover;
use solana::local_cluster::LocalCluster; use solana::local_cluster::LocalCluster;
use solana::replicator::Replicator; use solana::replicator::Replicator;
use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT; use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT;
@ -35,6 +36,17 @@ fn test_replicator_startup_basic() {
DEFAULT_TICKS_PER_SLOT, DEFAULT_TICKS_PER_SLOT,
DEFAULT_SLOTS_PER_EPOCH, DEFAULT_SLOTS_PER_EPOCH,
); );
let cluster_nodes = discover(&cluster.entry_point_info.gossip, 3).unwrap();
assert_eq!(cluster_nodes.len(), 3);
let mut replicator_count = 0;
for node in &cluster_nodes {
info!("storage: {:?} rpc: {:?}", node.storage_addr, node.rpc);
if ContactInfo::is_valid_address(&node.storage_addr) {
replicator_count += 1;
}
}
assert_eq!(replicator_count, 1);
} }
#[test] #[test]