Add SendTransactionService
This commit is contained in:
parent
2da9b12d67
commit
ed351400b2
@ -48,6 +48,7 @@ pub mod rpc_pubsub;
|
|||||||
pub mod rpc_pubsub_service;
|
pub mod rpc_pubsub_service;
|
||||||
pub mod rpc_service;
|
pub mod rpc_service;
|
||||||
pub mod rpc_subscriptions;
|
pub mod rpc_subscriptions;
|
||||||
|
pub mod send_transaction_service;
|
||||||
pub mod serve_repair;
|
pub mod serve_repair;
|
||||||
pub mod serve_repair_service;
|
pub mod serve_repair_service;
|
||||||
pub mod sigverify;
|
pub mod sigverify;
|
||||||
|
172
core/src/rpc.rs
172
core/src/rpc.rs
@ -7,6 +7,7 @@ use crate::{
|
|||||||
non_circulating_supply::calculate_non_circulating_supply,
|
non_circulating_supply::calculate_non_circulating_supply,
|
||||||
rpc_error::RpcCustomError,
|
rpc_error::RpcCustomError,
|
||||||
rpc_health::*,
|
rpc_health::*,
|
||||||
|
send_transaction_service::SendTransactionService,
|
||||||
validator::ValidatorExit,
|
validator::ValidatorExit,
|
||||||
};
|
};
|
||||||
use bincode::serialize;
|
use bincode::serialize;
|
||||||
@ -44,11 +45,9 @@ use solana_vote_program::vote_state::{VoteState, MAX_LOCKOUT_HISTORY};
|
|||||||
use std::{
|
use std::{
|
||||||
cmp::{max, min},
|
cmp::{max, min},
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
net::{SocketAddr, UdpSocket},
|
net::SocketAddr,
|
||||||
str::FromStr,
|
str::FromStr,
|
||||||
sync::{Arc, RwLock},
|
sync::{Arc, RwLock},
|
||||||
thread::sleep,
|
|
||||||
time::{Duration, Instant},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
type RpcResponse<T> = Result<Response<T>>;
|
type RpcResponse<T> = Result<Response<T>>;
|
||||||
@ -78,6 +77,7 @@ pub struct JsonRpcRequestProcessor {
|
|||||||
health: Arc<RpcHealth>,
|
health: Arc<RpcHealth>,
|
||||||
cluster_info: Arc<ClusterInfo>,
|
cluster_info: Arc<ClusterInfo>,
|
||||||
genesis_hash: Hash,
|
genesis_hash: Hash,
|
||||||
|
send_transaction_service: Arc<SendTransactionService>,
|
||||||
}
|
}
|
||||||
impl Metadata for JsonRpcRequestProcessor {}
|
impl Metadata for JsonRpcRequestProcessor {}
|
||||||
|
|
||||||
@ -136,6 +136,7 @@ impl JsonRpcRequestProcessor {
|
|||||||
health: Arc<RpcHealth>,
|
health: Arc<RpcHealth>,
|
||||||
cluster_info: Arc<ClusterInfo>,
|
cluster_info: Arc<ClusterInfo>,
|
||||||
genesis_hash: Hash,
|
genesis_hash: Hash,
|
||||||
|
send_transaction_service: Arc<SendTransactionService>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
config,
|
config,
|
||||||
@ -146,6 +147,7 @@ impl JsonRpcRequestProcessor {
|
|||||||
health,
|
health,
|
||||||
cluster_info,
|
cluster_info,
|
||||||
genesis_hash,
|
genesis_hash,
|
||||||
|
send_transaction_service,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -707,11 +709,6 @@ impl JsonRpcRequestProcessor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_tpu_addr(cluster_info: &ClusterInfo) -> Result<SocketAddr> {
|
|
||||||
let contact_info = cluster_info.my_contact_info();
|
|
||||||
Ok(contact_info.tpu)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn verify_pubkey(input: String) -> Result<Pubkey> {
|
fn verify_pubkey(input: String) -> Result<Pubkey> {
|
||||||
input
|
input
|
||||||
.parse()
|
.parse()
|
||||||
@ -1314,49 +1311,32 @@ impl RpcSol for RpcSolImpl {
|
|||||||
let faucet_addr = meta.config.faucet_addr.ok_or_else(Error::invalid_request)?;
|
let faucet_addr = meta.config.faucet_addr.ok_or_else(Error::invalid_request)?;
|
||||||
let pubkey = verify_pubkey(pubkey_str)?;
|
let pubkey = verify_pubkey(pubkey_str)?;
|
||||||
|
|
||||||
let blockhash = meta.bank(commitment)?.confirmed_last_blockhash().0;
|
let (blockhash, last_valid_slot) = {
|
||||||
|
let bank = meta.bank(commitment)?;
|
||||||
|
|
||||||
|
let blockhash = bank.confirmed_last_blockhash().0;
|
||||||
|
(
|
||||||
|
blockhash,
|
||||||
|
bank.get_blockhash_last_valid_slot(&blockhash).unwrap_or(0),
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
let transaction = request_airdrop_transaction(&faucet_addr, &pubkey, lamports, blockhash)
|
let transaction = request_airdrop_transaction(&faucet_addr, &pubkey, lamports, blockhash)
|
||||||
.map_err(|err| {
|
.map_err(|err| {
|
||||||
info!("request_airdrop_transaction failed: {:?}", err);
|
info!("request_airdrop_transaction failed: {:?}", err);
|
||||||
Error::internal_error()
|
Error::internal_error()
|
||||||
})?;
|
})?;
|
||||||
|
let signature = transaction.signatures[0];
|
||||||
|
|
||||||
let data = serialize(&transaction).map_err(|err| {
|
let wire_transaction = serialize(&transaction).map_err(|err| {
|
||||||
info!("request_airdrop: serialize error: {:?}", err);
|
info!("request_airdrop: serialize error: {:?}", err);
|
||||||
Error::internal_error()
|
Error::internal_error()
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
meta.send_transaction_service
|
||||||
let tpu_addr = get_tpu_addr(&meta.cluster_info)?;
|
.send(signature, wire_transaction, last_valid_slot);
|
||||||
transactions_socket
|
|
||||||
.send_to(&data, tpu_addr)
|
|
||||||
.map_err(|err| {
|
|
||||||
info!("request_airdrop: send_to error: {:?}", err);
|
|
||||||
Error::internal_error()
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let signature = transaction.signatures[0];
|
Ok(signature.to_string())
|
||||||
let now = Instant::now();
|
|
||||||
let mut signature_status;
|
|
||||||
let signature_timeout = match &commitment {
|
|
||||||
Some(config) if config.commitment == CommitmentLevel::Recent => 5,
|
|
||||||
_ => 30,
|
|
||||||
};
|
|
||||||
loop {
|
|
||||||
signature_status = meta.get_signature_statuses(vec![signature], None)?.value[0]
|
|
||||||
.clone()
|
|
||||||
.filter(|result| result.satisfies_commitment(commitment.unwrap_or_default()))
|
|
||||||
.map(|x| x.status);
|
|
||||||
|
|
||||||
if signature_status == Some(Ok(())) {
|
|
||||||
info!("airdrop signature ok");
|
|
||||||
return Ok(signature.to_string());
|
|
||||||
} else if now.elapsed().as_secs() > signature_timeout {
|
|
||||||
info!("airdrop signature timeout");
|
|
||||||
return Err(Error::internal_error());
|
|
||||||
}
|
|
||||||
sleep(Duration::from_millis(100));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_transaction(
|
fn send_transaction(
|
||||||
@ -1367,7 +1347,11 @@ impl RpcSol for RpcSolImpl {
|
|||||||
) -> Result<String> {
|
) -> Result<String> {
|
||||||
let config = config.unwrap_or_default();
|
let config = config.unwrap_or_default();
|
||||||
let (wire_transaction, transaction) = deserialize_bs58_transaction(data)?;
|
let (wire_transaction, transaction) = deserialize_bs58_transaction(data)?;
|
||||||
let signature = transaction.signatures[0].to_string();
|
let signature = transaction.signatures[0];
|
||||||
|
let bank = &*meta.bank(None)?;
|
||||||
|
let last_valid_slot = bank
|
||||||
|
.get_blockhash_last_valid_slot(&transaction.message.recent_blockhash)
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
if !config.skip_preflight {
|
if !config.skip_preflight {
|
||||||
if transaction.verify().is_err() {
|
if transaction.verify().is_err() {
|
||||||
@ -1384,7 +1368,6 @@ impl RpcSol for RpcSolImpl {
|
|||||||
.into());
|
.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
let bank = &*meta.bank(None)?;
|
|
||||||
if let (Err(err), _log_output) = run_transaction_simulation(&bank, transaction) {
|
if let (Err(err), _log_output) = run_transaction_simulation(&bank, transaction) {
|
||||||
// Note: it's possible that the transaction simulation failed but the actual
|
// Note: it's possible that the transaction simulation failed but the actual
|
||||||
// transaction would succeed, such as when a transaction depends on an earlier
|
// transaction would succeed, such as when a transaction depends on an earlier
|
||||||
@ -1398,20 +1381,9 @@ impl RpcSol for RpcSolImpl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
meta.send_transaction_service
|
||||||
let tpu_addr = get_tpu_addr(&meta.cluster_info)?;
|
.send(signature, wire_transaction, last_valid_slot);
|
||||||
transactions_socket
|
Ok(signature.to_string())
|
||||||
.send_to(&wire_transaction, tpu_addr)
|
|
||||||
.map_err(|err| {
|
|
||||||
info!("send_transaction: send_to error: {:?}", err);
|
|
||||||
Error::internal_error()
|
|
||||||
})?;
|
|
||||||
trace!(
|
|
||||||
"send_transaction: sent {} bytes, signature={}",
|
|
||||||
wire_transaction.len(),
|
|
||||||
signature
|
|
||||||
);
|
|
||||||
Ok(signature)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn simulate_transaction(
|
fn simulate_transaction(
|
||||||
@ -1755,10 +1727,20 @@ pub mod tests {
|
|||||||
blockstore,
|
blockstore,
|
||||||
validator_exit,
|
validator_exit,
|
||||||
RpcHealth::stub(),
|
RpcHealth::stub(),
|
||||||
cluster_info,
|
cluster_info.clone(),
|
||||||
Hash::default(),
|
Hash::default(),
|
||||||
|
Arc::new(SendTransactionService::new(
|
||||||
|
&cluster_info,
|
||||||
|
&bank_forks,
|
||||||
|
&exit,
|
||||||
|
)),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
cluster_info.insert_info(ContactInfo::new_with_pubkey_socketaddr(
|
||||||
|
&leader_pubkey,
|
||||||
|
&socketaddr!("127.0.0.1:1234"),
|
||||||
|
));
|
||||||
|
|
||||||
let mut io = MetaIoHandler::default();
|
let mut io = MetaIoHandler::default();
|
||||||
let rpc = RpcSolImpl;
|
let rpc = RpcSolImpl;
|
||||||
io.extend_with(rpc.to_delegate());
|
io.extend_with(rpc.to_delegate());
|
||||||
@ -1788,15 +1770,21 @@ pub mod tests {
|
|||||||
let block_commitment_cache = Arc::new(RwLock::new(
|
let block_commitment_cache = Arc::new(RwLock::new(
|
||||||
BlockCommitmentCache::default_with_blockstore(blockstore.clone()),
|
BlockCommitmentCache::default_with_blockstore(blockstore.clone()),
|
||||||
));
|
));
|
||||||
|
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default()));
|
||||||
let request_processor = JsonRpcRequestProcessor::new(
|
let request_processor = JsonRpcRequestProcessor::new(
|
||||||
JsonRpcConfig::default(),
|
JsonRpcConfig::default(),
|
||||||
bank_forks,
|
bank_forks.clone(),
|
||||||
block_commitment_cache,
|
block_commitment_cache,
|
||||||
blockstore,
|
blockstore,
|
||||||
validator_exit,
|
validator_exit,
|
||||||
RpcHealth::stub(),
|
RpcHealth::stub(),
|
||||||
Arc::new(ClusterInfo::default()),
|
Arc::new(ClusterInfo::default()),
|
||||||
Hash::default(),
|
Hash::default(),
|
||||||
|
Arc::new(SendTransactionService::new(
|
||||||
|
&cluster_info,
|
||||||
|
&bank_forks,
|
||||||
|
&exit,
|
||||||
|
)),
|
||||||
);
|
);
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let blockhash = bank.confirmed_last_blockhash().0;
|
let blockhash = bank.confirmed_last_blockhash().0;
|
||||||
@ -2733,6 +2721,8 @@ pub mod tests {
|
|||||||
let mut io = MetaIoHandler::default();
|
let mut io = MetaIoHandler::default();
|
||||||
let rpc = RpcSolImpl;
|
let rpc = RpcSolImpl;
|
||||||
io.extend_with(rpc.to_delegate());
|
io.extend_with(rpc.to_delegate());
|
||||||
|
let cluster_info = Arc::new(ClusterInfo::default());
|
||||||
|
let bank_forks = new_bank_forks().0;
|
||||||
let meta = JsonRpcRequestProcessor::new(
|
let meta = JsonRpcRequestProcessor::new(
|
||||||
JsonRpcConfig::default(),
|
JsonRpcConfig::default(),
|
||||||
new_bank_forks().0,
|
new_bank_forks().0,
|
||||||
@ -2740,8 +2730,13 @@ pub mod tests {
|
|||||||
blockstore,
|
blockstore,
|
||||||
validator_exit,
|
validator_exit,
|
||||||
RpcHealth::stub(),
|
RpcHealth::stub(),
|
||||||
Arc::new(ClusterInfo::default()),
|
cluster_info.clone(),
|
||||||
Hash::default(),
|
Hash::default(),
|
||||||
|
Arc::new(SendTransactionService::new(
|
||||||
|
&cluster_info,
|
||||||
|
&bank_forks,
|
||||||
|
&exit,
|
||||||
|
)),
|
||||||
);
|
);
|
||||||
|
|
||||||
let req = r#"{"jsonrpc":"2.0","id":1,"method":"sendTransaction","params":["37u9WtQpcm6ULa3Vmu7ySnANv"]}"#;
|
let req = r#"{"jsonrpc":"2.0","id":1,"method":"sendTransaction","params":["37u9WtQpcm6ULa3Vmu7ySnANv"]}"#;
|
||||||
@ -2769,17 +2764,23 @@ pub mod tests {
|
|||||||
let mut io = MetaIoHandler::default();
|
let mut io = MetaIoHandler::default();
|
||||||
let rpc = RpcSolImpl;
|
let rpc = RpcSolImpl;
|
||||||
io.extend_with(rpc.to_delegate());
|
io.extend_with(rpc.to_delegate());
|
||||||
|
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(
|
||||||
|
ContactInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")),
|
||||||
|
));
|
||||||
let meta = JsonRpcRequestProcessor::new(
|
let meta = JsonRpcRequestProcessor::new(
|
||||||
JsonRpcConfig::default(),
|
JsonRpcConfig::default(),
|
||||||
bank_forks,
|
bank_forks.clone(),
|
||||||
block_commitment_cache,
|
block_commitment_cache,
|
||||||
blockstore,
|
blockstore,
|
||||||
validator_exit,
|
validator_exit,
|
||||||
health.clone(),
|
health.clone(),
|
||||||
Arc::new(ClusterInfo::new_with_invalid_keypair(
|
cluster_info.clone(),
|
||||||
ContactInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")),
|
|
||||||
)),
|
|
||||||
Hash::default(),
|
Hash::default(),
|
||||||
|
Arc::new(SendTransactionService::new(
|
||||||
|
&cluster_info,
|
||||||
|
&bank_forks,
|
||||||
|
&exit,
|
||||||
|
)),
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut bad_transaction =
|
let mut bad_transaction =
|
||||||
@ -2843,17 +2844,6 @@ pub mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_rpc_get_tpu_addr() {
|
|
||||||
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(
|
|
||||||
ContactInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")),
|
|
||||||
));
|
|
||||||
assert_eq!(
|
|
||||||
get_tpu_addr(&cluster_info),
|
|
||||||
Ok(socketaddr!("127.0.0.1:1234"))
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_rpc_verify_pubkey() {
|
fn test_rpc_verify_pubkey() {
|
||||||
let pubkey = Pubkey::new_rand();
|
let pubkey = Pubkey::new_rand();
|
||||||
@ -2879,7 +2869,7 @@ pub mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new_bank_forks() -> (Arc<RwLock<BankForks>>, Keypair, Keypair) {
|
pub(crate) fn new_bank_forks() -> (Arc<RwLock<BankForks>>, Keypair, Keypair) {
|
||||||
let GenesisConfigInfo {
|
let GenesisConfigInfo {
|
||||||
mut genesis_config,
|
mut genesis_config,
|
||||||
mint_keypair,
|
mint_keypair,
|
||||||
@ -2917,15 +2907,22 @@ pub mod tests {
|
|||||||
let block_commitment_cache = Arc::new(RwLock::new(
|
let block_commitment_cache = Arc::new(RwLock::new(
|
||||||
BlockCommitmentCache::default_with_blockstore(blockstore.clone()),
|
BlockCommitmentCache::default_with_blockstore(blockstore.clone()),
|
||||||
));
|
));
|
||||||
|
let cluster_info = Arc::new(ClusterInfo::default());
|
||||||
|
let bank_forks = new_bank_forks().0;
|
||||||
let request_processor = JsonRpcRequestProcessor::new(
|
let request_processor = JsonRpcRequestProcessor::new(
|
||||||
JsonRpcConfig::default(),
|
JsonRpcConfig::default(),
|
||||||
new_bank_forks().0,
|
bank_forks.clone(),
|
||||||
block_commitment_cache,
|
block_commitment_cache,
|
||||||
blockstore,
|
blockstore,
|
||||||
validator_exit,
|
validator_exit,
|
||||||
RpcHealth::stub(),
|
RpcHealth::stub(),
|
||||||
Arc::new(ClusterInfo::default()),
|
cluster_info.clone(),
|
||||||
Hash::default(),
|
Hash::default(),
|
||||||
|
Arc::new(SendTransactionService::new(
|
||||||
|
&cluster_info,
|
||||||
|
&bank_forks,
|
||||||
|
&exit,
|
||||||
|
)),
|
||||||
);
|
);
|
||||||
assert_eq!(request_processor.validator_exit(), Ok(false));
|
assert_eq!(request_processor.validator_exit(), Ok(false));
|
||||||
assert_eq!(exit.load(Ordering::Relaxed), false);
|
assert_eq!(exit.load(Ordering::Relaxed), false);
|
||||||
@ -2942,15 +2939,22 @@ pub mod tests {
|
|||||||
));
|
));
|
||||||
let mut config = JsonRpcConfig::default();
|
let mut config = JsonRpcConfig::default();
|
||||||
config.enable_validator_exit = true;
|
config.enable_validator_exit = true;
|
||||||
|
let bank_forks = new_bank_forks().0;
|
||||||
|
let cluster_info = Arc::new(ClusterInfo::default());
|
||||||
let request_processor = JsonRpcRequestProcessor::new(
|
let request_processor = JsonRpcRequestProcessor::new(
|
||||||
config,
|
config,
|
||||||
new_bank_forks().0,
|
bank_forks.clone(),
|
||||||
block_commitment_cache,
|
block_commitment_cache,
|
||||||
blockstore,
|
blockstore,
|
||||||
validator_exit,
|
validator_exit,
|
||||||
RpcHealth::stub(),
|
RpcHealth::stub(),
|
||||||
Arc::new(ClusterInfo::default()),
|
cluster_info.clone(),
|
||||||
Hash::default(),
|
Hash::default(),
|
||||||
|
Arc::new(SendTransactionService::new(
|
||||||
|
&cluster_info,
|
||||||
|
&bank_forks,
|
||||||
|
&exit,
|
||||||
|
)),
|
||||||
);
|
);
|
||||||
assert_eq!(request_processor.validator_exit(), Ok(true));
|
assert_eq!(request_processor.validator_exit(), Ok(true));
|
||||||
assert_eq!(exit.load(Ordering::Relaxed), true);
|
assert_eq!(exit.load(Ordering::Relaxed), true);
|
||||||
@ -3027,15 +3031,21 @@ pub mod tests {
|
|||||||
|
|
||||||
let mut config = JsonRpcConfig::default();
|
let mut config = JsonRpcConfig::default();
|
||||||
config.enable_validator_exit = true;
|
config.enable_validator_exit = true;
|
||||||
|
let cluster_info = Arc::new(ClusterInfo::default());
|
||||||
let request_processor = JsonRpcRequestProcessor::new(
|
let request_processor = JsonRpcRequestProcessor::new(
|
||||||
config,
|
config,
|
||||||
bank_forks,
|
bank_forks.clone(),
|
||||||
block_commitment_cache,
|
block_commitment_cache,
|
||||||
blockstore,
|
blockstore,
|
||||||
validator_exit,
|
validator_exit,
|
||||||
RpcHealth::stub(),
|
RpcHealth::stub(),
|
||||||
Arc::new(ClusterInfo::default()),
|
cluster_info.clone(),
|
||||||
Hash::default(),
|
Hash::default(),
|
||||||
|
Arc::new(SendTransactionService::new(
|
||||||
|
&cluster_info,
|
||||||
|
&bank_forks,
|
||||||
|
&exit,
|
||||||
|
)),
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
request_processor.get_block_commitment(0),
|
request_processor.get_block_commitment(0),
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
cluster_info::ClusterInfo, commitment::BlockCommitmentCache, rpc::*, rpc_health::*,
|
cluster_info::ClusterInfo, commitment::BlockCommitmentCache, rpc::*, rpc_health::*,
|
||||||
validator::ValidatorExit,
|
send_transaction_service::SendTransactionService, validator::ValidatorExit,
|
||||||
};
|
};
|
||||||
use jsonrpc_core::MetaIoHandler;
|
use jsonrpc_core::MetaIoHandler;
|
||||||
use jsonrpc_http_server::{
|
use jsonrpc_http_server::{
|
||||||
@ -20,7 +20,7 @@ use std::{
|
|||||||
collections::HashSet,
|
collections::HashSet,
|
||||||
net::SocketAddr,
|
net::SocketAddr,
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
sync::atomic::AtomicBool,
|
sync::atomic::{AtomicBool, Ordering},
|
||||||
sync::{mpsc::channel, Arc, RwLock},
|
sync::{mpsc::channel, Arc, RwLock},
|
||||||
thread::{self, Builder, JoinHandle},
|
thread::{self, Builder, JoinHandle},
|
||||||
};
|
};
|
||||||
@ -249,6 +249,13 @@ impl JsonRpcService {
|
|||||||
override_health_check,
|
override_health_check,
|
||||||
));
|
));
|
||||||
|
|
||||||
|
let exit_send_transaction_service = Arc::new(AtomicBool::new(false));
|
||||||
|
let send_transaction_service = Arc::new(SendTransactionService::new(
|
||||||
|
&cluster_info,
|
||||||
|
&bank_forks,
|
||||||
|
&exit_send_transaction_service,
|
||||||
|
));
|
||||||
|
|
||||||
let request_processor = JsonRpcRequestProcessor::new(
|
let request_processor = JsonRpcRequestProcessor::new(
|
||||||
config,
|
config,
|
||||||
bank_forks.clone(),
|
bank_forks.clone(),
|
||||||
@ -258,6 +265,7 @@ impl JsonRpcService {
|
|||||||
health.clone(),
|
health.clone(),
|
||||||
cluster_info,
|
cluster_info,
|
||||||
genesis_hash,
|
genesis_hash,
|
||||||
|
send_transaction_service,
|
||||||
);
|
);
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@ -304,6 +312,7 @@ impl JsonRpcService {
|
|||||||
let server = server.unwrap();
|
let server = server.unwrap();
|
||||||
close_handle_sender.send(server.close_handle()).unwrap();
|
close_handle_sender.send(server.close_handle()).unwrap();
|
||||||
server.wait();
|
server.wait();
|
||||||
|
exit_send_transaction_service.store(true, Ordering::Relaxed);
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
@ -347,10 +356,7 @@ mod tests {
|
|||||||
};
|
};
|
||||||
use solana_runtime::bank::Bank;
|
use solana_runtime::bank::Bank;
|
||||||
use solana_sdk::signature::Signer;
|
use solana_sdk::signature::Signer;
|
||||||
use std::{
|
use std::net::{IpAddr, Ipv4Addr};
|
||||||
net::{IpAddr, Ipv4Addr, SocketAddr},
|
|
||||||
sync::atomic::Ordering,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_rpc_new() {
|
fn test_rpc_new() {
|
||||||
|
377
core/src/send_transaction_service.rs
Normal file
377
core/src/send_transaction_service.rs
Normal file
@ -0,0 +1,377 @@
|
|||||||
|
use crate::cluster_info::ClusterInfo;
|
||||||
|
use solana_ledger::bank_forks::BankForks;
|
||||||
|
use solana_metrics::{datapoint_warn, inc_new_counter_info};
|
||||||
|
use solana_runtime::bank::Bank;
|
||||||
|
use solana_sdk::{clock::Slot, signature::Signature};
|
||||||
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
net::{SocketAddr, UdpSocket},
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicBool, Ordering},
|
||||||
|
mpsc::{channel, Receiver, Sender},
|
||||||
|
Arc, Mutex, RwLock,
|
||||||
|
},
|
||||||
|
thread::{self, Builder, JoinHandle},
|
||||||
|
time::{Duration, Instant},
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Maximum size of the transaction queue
|
||||||
|
const MAX_TRANSACTION_QUEUE_SIZE: usize = 10_000; // This seems like a lot but maybe it needs to be bigger one day
|
||||||
|
|
||||||
|
pub struct SendTransactionService {
|
||||||
|
thread: JoinHandle<()>,
|
||||||
|
sender: Mutex<Sender<TransactionInfo>>,
|
||||||
|
send_socket: UdpSocket,
|
||||||
|
tpu_address: SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct TransactionInfo {
|
||||||
|
signature: Signature,
|
||||||
|
wire_transaction: Vec<u8>,
|
||||||
|
last_valid_slot: Slot,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default, Debug, PartialEq)]
|
||||||
|
struct ProcessTransactionsResult {
|
||||||
|
rooted: u64,
|
||||||
|
expired: u64,
|
||||||
|
retried: u64,
|
||||||
|
failed: u64,
|
||||||
|
retained: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SendTransactionService {
|
||||||
|
pub fn new(
|
||||||
|
cluster_info: &Arc<ClusterInfo>,
|
||||||
|
bank_forks: &Arc<RwLock<BankForks>>,
|
||||||
|
exit: &Arc<AtomicBool>,
|
||||||
|
) -> Self {
|
||||||
|
let (sender, receiver) = channel::<TransactionInfo>();
|
||||||
|
let tpu_address = cluster_info.my_contact_info().tpu;
|
||||||
|
|
||||||
|
let thread = Self::retry_thread(receiver, bank_forks.clone(), tpu_address, exit.clone());
|
||||||
|
Self {
|
||||||
|
thread,
|
||||||
|
sender: Mutex::new(sender),
|
||||||
|
send_socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
|
||||||
|
tpu_address,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn retry_thread(
|
||||||
|
receiver: Receiver<TransactionInfo>,
|
||||||
|
bank_forks: Arc<RwLock<BankForks>>,
|
||||||
|
tpu_address: SocketAddr,
|
||||||
|
exit: Arc<AtomicBool>,
|
||||||
|
) -> JoinHandle<()> {
|
||||||
|
let mut last_status_check = Instant::now();
|
||||||
|
let mut transactions = HashMap::new();
|
||||||
|
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
|
||||||
|
Builder::new()
|
||||||
|
.name("send-tx-svc".to_string())
|
||||||
|
.spawn(move || loop {
|
||||||
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Ok(transaction_info) = receiver.recv_timeout(Duration::from_secs(1)) {
|
||||||
|
if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE {
|
||||||
|
transactions.insert(transaction_info.signature, transaction_info);
|
||||||
|
} else {
|
||||||
|
datapoint_warn!("send_transaction_service-queue-overflow");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if Instant::now().duration_since(last_status_check).as_secs() >= 5 {
|
||||||
|
if !transactions.is_empty() {
|
||||||
|
datapoint_info!(
|
||||||
|
"send_transaction_service-queue-size",
|
||||||
|
("len", transactions.len(), i64)
|
||||||
|
);
|
||||||
|
let bank_forks = bank_forks.read().unwrap();
|
||||||
|
let root_bank = bank_forks.root_bank();
|
||||||
|
let working_bank = bank_forks.working_bank();
|
||||||
|
|
||||||
|
let _result = Self::process_transactions(
|
||||||
|
&working_bank,
|
||||||
|
&root_bank,
|
||||||
|
&send_socket,
|
||||||
|
&tpu_address,
|
||||||
|
&mut transactions,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
last_status_check = Instant::now();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn process_transactions(
|
||||||
|
working_bank: &Arc<Bank>,
|
||||||
|
root_bank: &Arc<Bank>,
|
||||||
|
send_socket: &UdpSocket,
|
||||||
|
tpu_address: &SocketAddr,
|
||||||
|
transactions: &mut HashMap<Signature, TransactionInfo>,
|
||||||
|
) -> ProcessTransactionsResult {
|
||||||
|
let mut result = ProcessTransactionsResult::default();
|
||||||
|
|
||||||
|
transactions.retain(|signature, transaction_info| {
|
||||||
|
if root_bank.has_signature(signature) {
|
||||||
|
info!("Transaction is rooted: {}", signature);
|
||||||
|
result.rooted += 1;
|
||||||
|
inc_new_counter_info!("send_transaction_service-rooted", 1);
|
||||||
|
false
|
||||||
|
} else if transaction_info.last_valid_slot < root_bank.slot() {
|
||||||
|
info!("Dropping expired transaction: {}", signature);
|
||||||
|
result.expired += 1;
|
||||||
|
inc_new_counter_info!("send_transaction_service-expired", 1);
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
match working_bank.get_signature_status_slot(signature) {
|
||||||
|
None => {
|
||||||
|
// Transaction is unknown to the working bank, it might have been
|
||||||
|
// dropped or landed in another fork. Re-send it
|
||||||
|
info!("Retrying transaction: {}", signature);
|
||||||
|
result.retried += 1;
|
||||||
|
inc_new_counter_info!("send_transaction_service-retry", 1);
|
||||||
|
Self::send_transaction(
|
||||||
|
&send_socket,
|
||||||
|
&tpu_address,
|
||||||
|
&transaction_info.wire_transaction,
|
||||||
|
);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
Some((_slot, status)) => {
|
||||||
|
if status.is_err() {
|
||||||
|
info!("Dropping failed transaction: {}", signature);
|
||||||
|
result.failed += 1;
|
||||||
|
inc_new_counter_info!("send_transaction_service-failed", 1);
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
result.retained += 1;
|
||||||
|
true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_transaction(
|
||||||
|
send_socket: &UdpSocket,
|
||||||
|
tpu_address: &SocketAddr,
|
||||||
|
wire_transaction: &[u8],
|
||||||
|
) {
|
||||||
|
if let Err(err) = send_socket.send_to(wire_transaction, tpu_address) {
|
||||||
|
warn!("Failed to send transaction to {}: {:?}", tpu_address, err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send(&self, signature: Signature, wire_transaction: Vec<u8>, last_valid_slot: Slot) {
|
||||||
|
inc_new_counter_info!("send_transaction_service-enqueue", 1, 1);
|
||||||
|
Self::send_transaction(&self.send_socket, &self.tpu_address, &wire_transaction);
|
||||||
|
|
||||||
|
self.sender
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.send(TransactionInfo {
|
||||||
|
signature,
|
||||||
|
wire_transaction,
|
||||||
|
last_valid_slot,
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|err| warn!("Failed to enqueue transaction: {}", err));
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn join(self) -> thread::Result<()> {
|
||||||
|
self.thread.join()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use super::*;
|
||||||
|
use crate::rpc::tests::new_bank_forks;
|
||||||
|
use solana_sdk::{pubkey::Pubkey, signature::Signer};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn service_exit() {
|
||||||
|
let cluster_info = Arc::new(ClusterInfo::default());
|
||||||
|
let bank_forks = new_bank_forks().0;
|
||||||
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
|
let send_tranaction_service =
|
||||||
|
SendTransactionService::new(&cluster_info, &bank_forks, &exit);
|
||||||
|
|
||||||
|
exit.store(true, Ordering::Relaxed);
|
||||||
|
send_tranaction_service.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_transactions() {
|
||||||
|
solana_logger::setup();
|
||||||
|
|
||||||
|
let (bank_forks, mint_keypair, _voting_keypair) = new_bank_forks();
|
||||||
|
let cluster_info = ClusterInfo::default();
|
||||||
|
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
let tpu_address = cluster_info.my_contact_info().tpu;
|
||||||
|
|
||||||
|
let root_bank = Arc::new(Bank::new_from_parent(
|
||||||
|
&bank_forks.read().unwrap().working_bank(),
|
||||||
|
&Pubkey::default(),
|
||||||
|
1,
|
||||||
|
));
|
||||||
|
let rooted_signature = root_bank
|
||||||
|
.transfer(1, &mint_keypair, &mint_keypair.pubkey())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let working_bank = Arc::new(Bank::new_from_parent(&root_bank, &Pubkey::default(), 2));
|
||||||
|
|
||||||
|
let non_rooted_signature = working_bank
|
||||||
|
.transfer(2, &mint_keypair, &mint_keypair.pubkey())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let failed_signature = {
|
||||||
|
let blockhash = working_bank.last_blockhash();
|
||||||
|
let transaction = solana_sdk::system_transaction::transfer(
|
||||||
|
&mint_keypair,
|
||||||
|
&Pubkey::default(),
|
||||||
|
1,
|
||||||
|
blockhash,
|
||||||
|
);
|
||||||
|
let signature = transaction.signatures[0];
|
||||||
|
working_bank.process_transaction(&transaction).unwrap_err();
|
||||||
|
signature
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut transactions = HashMap::new();
|
||||||
|
|
||||||
|
info!("Expired transactions are dropped..");
|
||||||
|
transactions.insert(
|
||||||
|
Signature::default(),
|
||||||
|
TransactionInfo {
|
||||||
|
signature: Signature::default(),
|
||||||
|
wire_transaction: vec![],
|
||||||
|
last_valid_slot: root_bank.slot() - 1,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
let result = SendTransactionService::process_transactions(
|
||||||
|
&working_bank,
|
||||||
|
&root_bank,
|
||||||
|
&send_socket,
|
||||||
|
&tpu_address,
|
||||||
|
&mut transactions,
|
||||||
|
);
|
||||||
|
assert!(transactions.is_empty());
|
||||||
|
assert_eq!(
|
||||||
|
result,
|
||||||
|
ProcessTransactionsResult {
|
||||||
|
expired: 1,
|
||||||
|
..ProcessTransactionsResult::default()
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
info!("Rooted transactions are dropped...");
|
||||||
|
transactions.insert(
|
||||||
|
rooted_signature,
|
||||||
|
TransactionInfo {
|
||||||
|
signature: rooted_signature,
|
||||||
|
wire_transaction: vec![],
|
||||||
|
last_valid_slot: working_bank.slot(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
let result = SendTransactionService::process_transactions(
|
||||||
|
&working_bank,
|
||||||
|
&root_bank,
|
||||||
|
&send_socket,
|
||||||
|
&tpu_address,
|
||||||
|
&mut transactions,
|
||||||
|
);
|
||||||
|
assert!(transactions.is_empty());
|
||||||
|
assert_eq!(
|
||||||
|
result,
|
||||||
|
ProcessTransactionsResult {
|
||||||
|
rooted: 1,
|
||||||
|
..ProcessTransactionsResult::default()
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
info!("Failed transactions are dropped...");
|
||||||
|
transactions.insert(
|
||||||
|
failed_signature,
|
||||||
|
TransactionInfo {
|
||||||
|
signature: failed_signature,
|
||||||
|
wire_transaction: vec![],
|
||||||
|
last_valid_slot: working_bank.slot(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
let result = SendTransactionService::process_transactions(
|
||||||
|
&working_bank,
|
||||||
|
&root_bank,
|
||||||
|
&send_socket,
|
||||||
|
&tpu_address,
|
||||||
|
&mut transactions,
|
||||||
|
);
|
||||||
|
assert!(transactions.is_empty());
|
||||||
|
assert_eq!(
|
||||||
|
result,
|
||||||
|
ProcessTransactionsResult {
|
||||||
|
failed: 1,
|
||||||
|
..ProcessTransactionsResult::default()
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
info!("Non-rooted transactions are kept...");
|
||||||
|
transactions.insert(
|
||||||
|
non_rooted_signature,
|
||||||
|
TransactionInfo {
|
||||||
|
signature: non_rooted_signature,
|
||||||
|
wire_transaction: vec![],
|
||||||
|
last_valid_slot: working_bank.slot(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
let result = SendTransactionService::process_transactions(
|
||||||
|
&working_bank,
|
||||||
|
&root_bank,
|
||||||
|
&send_socket,
|
||||||
|
&tpu_address,
|
||||||
|
&mut transactions,
|
||||||
|
);
|
||||||
|
assert_eq!(transactions.len(), 1);
|
||||||
|
assert_eq!(
|
||||||
|
result,
|
||||||
|
ProcessTransactionsResult {
|
||||||
|
retained: 1,
|
||||||
|
..ProcessTransactionsResult::default()
|
||||||
|
}
|
||||||
|
);
|
||||||
|
transactions.clear();
|
||||||
|
|
||||||
|
info!("Unknown transactions are retried...");
|
||||||
|
transactions.insert(
|
||||||
|
Signature::default(),
|
||||||
|
TransactionInfo {
|
||||||
|
signature: Signature::default(),
|
||||||
|
wire_transaction: vec![],
|
||||||
|
last_valid_slot: working_bank.slot(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
let result = SendTransactionService::process_transactions(
|
||||||
|
&working_bank,
|
||||||
|
&root_bank,
|
||||||
|
&send_socket,
|
||||||
|
&tpu_address,
|
||||||
|
&mut transactions,
|
||||||
|
);
|
||||||
|
assert_eq!(transactions.len(), 1);
|
||||||
|
assert_eq!(
|
||||||
|
result,
|
||||||
|
ProcessTransactionsResult {
|
||||||
|
retried: 1,
|
||||||
|
..ProcessTransactionsResult::default()
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user