StorageStage now sends transactions at the local TPU

This commit is contained in:
Michael Vines
2019-03-08 10:36:56 -08:00
parent 7bd0929157
commit 12f3fd75e8

View File

@ -195,8 +195,10 @@ impl StorageStage {
.spawn(move || loop { .spawn(move || loop {
match tx_receiver.recv_timeout(Duration::from_secs(1)) { match tx_receiver.recv_timeout(Duration::from_secs(1)) {
Ok(mut tx) => { Ok(mut tx) => {
if Self::send_tx(&cluster_info0, &mut tx, &exit1, &keypair1, None).is_ok() { if Self::send_transaction(&cluster_info0, &mut tx, &exit1, &keypair1, None)
debug!("sent tx: {:?}", tx); .is_ok()
{
debug!("sent transaction: {:?}", tx);
} }
} }
Err(e) => match e { Err(e) => match e {
@ -218,58 +220,57 @@ impl StorageStage {
} }
} }
fn send_tx( fn send_transaction(
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
tx: &mut Transaction, transaction: &mut Transaction,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
keypair: &Arc<Keypair>, keypair: &Arc<Keypair>,
account_to_create: Option<Pubkey>, account_to_create: Option<Pubkey>,
) -> io::Result<()> { ) -> io::Result<()> {
if let Some(leader_info) = cluster_info.read().unwrap().leader_data() { let node_info = cluster_info.read().unwrap().my_data();
let mut client = mk_client_with_timeout(leader_info, Duration::from_secs(5)); let mut client = mk_client_with_timeout(&node_info, Duration::from_secs(5));
if let Some(account) = account_to_create { if let Some(account) = account_to_create {
if client.get_account_userdata(&account).is_ok() { if client.get_account_userdata(&account).is_ok() {
return Ok(()); return Ok(());
} }
}
let mut blockhash = None;
for _ in 0..10 {
if let Some(new_blockhash) = client.try_get_recent_blockhash(1) {
blockhash = Some(new_blockhash);
break;
} }
let mut blockhash = None; if exit.load(Ordering::Relaxed) {
for _ in 0..10 { Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?;
if let Some(new_blockhash) = client.try_get_recent_blockhash(1) { }
blockhash = Some(new_blockhash); }
break;
}
if exit.load(Ordering::Relaxed) { if let Some(blockhash) = blockhash {
Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?; transaction.sign(&[keypair.as_ref()], blockhash);
}
if exit.load(Ordering::Relaxed) {
Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?;
} }
if let Some(blockhash) = blockhash { if let Ok(signature) = client.transfer_signed(&transaction) {
tx.sign(&[keypair.as_ref()], blockhash); for _ in 0..10 {
if client.check_signature(&signature) {
if exit.load(Ordering::Relaxed) { return Ok(());
Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?;
}
if let Ok(signature) = client.transfer_signed(&tx) {
for _ in 0..10 {
if client.check_signature(&signature) {
return Ok(());
}
if exit.load(Ordering::Relaxed) {
Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?;
}
sleep(Duration::from_millis(200));
} }
if exit.load(Ordering::Relaxed) {
Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?;
}
sleep(Duration::from_millis(200));
} }
} }
} }
Err(io::Error::new(io::ErrorKind::Other, "leader not found")) Err(io::Error::new(io::ErrorKind::Other, "other failure"))
} }
pub fn process_entry_crossing( pub fn process_entry_crossing(