| 
									
										
										
										
											2019-02-17 10:09:46 -07:00
										 |  |  | //! The `pubsub` module implements a threaded subscription service on client RPC request
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-03-30 17:53:25 -06:00
										 |  |  | use crate::{
 | 
					
						
							| 
									
										
										
										
											2020-09-16 21:26:31 +00:00
										 |  |  |     rpc::MAX_REQUEST_PAYLOAD_SIZE,
 | 
					
						
							| 
									
										
										
										
											2020-03-30 17:53:25 -06:00
										 |  |  |     rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl},
 | 
					
						
							|  |  |  |     rpc_subscriptions::RpcSubscriptions,
 | 
					
						
							|  |  |  | };
 | 
					
						
							| 
									
										
										
										
											2019-02-17 10:09:46 -07:00
										 |  |  | use jsonrpc_pubsub::{PubSubHandler, Session};
 | 
					
						
							|  |  |  | use jsonrpc_ws_server::{RequestContext, ServerBuilder};
 | 
					
						
							| 
									
										
										
										
											2020-03-30 17:53:25 -06:00
										 |  |  | use std::{
 | 
					
						
							|  |  |  |     net::SocketAddr,
 | 
					
						
							|  |  |  |     sync::{
 | 
					
						
							|  |  |  |         atomic::{AtomicBool, Ordering},
 | 
					
						
							|  |  |  |         Arc,
 | 
					
						
							|  |  |  |     },
 | 
					
						
							|  |  |  |     thread::{self, sleep, Builder, JoinHandle},
 | 
					
						
							|  |  |  |     time::Duration,
 | 
					
						
							|  |  |  | };
 | 
					
						
							| 
									
										
										
										
											2019-02-17 10:09:46 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | pub struct PubSubService {
 | 
					
						
							|  |  |  |     thread_hdl: JoinHandle<()>,
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | impl PubSubService {
 | 
					
						
							| 
									
										
										
										
											2019-03-04 15:44:31 -08:00
										 |  |  |     pub fn new(
 | 
					
						
							|  |  |  |         subscriptions: &Arc<RpcSubscriptions>,
 | 
					
						
							|  |  |  |         pubsub_addr: SocketAddr,
 | 
					
						
							|  |  |  |         exit: &Arc<AtomicBool>,
 | 
					
						
							|  |  |  |     ) -> Self {
 | 
					
						
							| 
									
										
										
										
											2019-02-17 10:09:46 -07:00
										 |  |  |         info!("rpc_pubsub bound to {:?}", pubsub_addr);
 | 
					
						
							| 
									
										
										
										
											2019-02-18 17:25:17 -07:00
										 |  |  |         let rpc = RpcSolPubSubImpl::new(subscriptions.clone());
 | 
					
						
							| 
									
										
										
										
											2019-02-17 10:09:46 -07:00
										 |  |  |         let exit_ = exit.clone();
 | 
					
						
							|  |  |  |         let thread_hdl = Builder::new()
 | 
					
						
							|  |  |  |             .name("solana-pubsub".to_string())
 | 
					
						
							|  |  |  |             .spawn(move || {
 | 
					
						
							|  |  |  |                 let mut io = PubSubHandler::default();
 | 
					
						
							|  |  |  |                 io.extend_with(rpc.to_delegate());
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| {
 | 
					
						
							|  |  |  |                         info!("New pubsub connection");
 | 
					
						
							| 
									
										
										
										
											2019-12-19 23:27:54 -08:00
										 |  |  |                         let session = Arc::new(Session::new(context.sender()));
 | 
					
						
							| 
									
										
										
										
											2019-02-17 10:09:46 -07:00
										 |  |  |                         session.on_drop(|| {
 | 
					
						
							|  |  |  |                             info!("Pubsub connection dropped");
 | 
					
						
							|  |  |  |                         });
 | 
					
						
							|  |  |  |                         session
 | 
					
						
							|  |  |  |                 })
 | 
					
						
							| 
									
										
										
										
											2020-02-27 08:54:53 +08:00
										 |  |  |                 .max_connections(1000) // Arbitrary, default of 100 is too low
 | 
					
						
							| 
									
										
										
										
											2020-09-16 21:26:31 +00:00
										 |  |  |                 .max_payload(MAX_REQUEST_PAYLOAD_SIZE)
 | 
					
						
							| 
									
										
										
										
											2019-02-17 10:09:46 -07:00
										 |  |  |                 .start(&pubsub_addr);
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 if let Err(e) = server {
 | 
					
						
							|  |  |  |                     warn!("Pubsub service unavailable error: {:?}. \nAlso, check that port {} is not already in use by another application", e, pubsub_addr.port());
 | 
					
						
							|  |  |  |                     return;
 | 
					
						
							|  |  |  |                 }
 | 
					
						
							|  |  |  |                 while !exit_.load(Ordering::Relaxed) {
 | 
					
						
							|  |  |  |                     sleep(Duration::from_millis(100));
 | 
					
						
							|  |  |  |                 }
 | 
					
						
							|  |  |  |                 server.unwrap().close();
 | 
					
						
							|  |  |  |             })
 | 
					
						
							|  |  |  |             .unwrap();
 | 
					
						
							| 
									
										
										
										
											2019-03-04 15:44:31 -08:00
										 |  |  |         Self { thread_hdl }
 | 
					
						
							| 
									
										
										
										
											2019-02-17 10:09:46 -07:00
										 |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub fn close(self) -> thread::Result<()> {
 | 
					
						
							|  |  |  |         self.join()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							| 
									
										
										
										
											2019-11-13 11:12:09 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     pub fn join(self) -> thread::Result<()> {
 | 
					
						
							|  |  |  |         self.thread_hdl.join()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							| 
									
										
										
										
											2019-02-17 10:09:46 -07:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | #[cfg(test)]
 | 
					
						
							|  |  |  | mod tests {
 | 
					
						
							|  |  |  |     use super::*;
 | 
					
						
							| 
									
										
										
										
											2020-06-25 22:06:58 -06:00
										 |  |  |     use solana_runtime::{
 | 
					
						
							|  |  |  |         bank::Bank,
 | 
					
						
							|  |  |  |         bank_forks::BankForks,
 | 
					
						
							|  |  |  |         commitment::BlockCommitmentCache,
 | 
					
						
							| 
									
										
										
										
											2020-05-07 00:23:06 -06:00
										 |  |  |         genesis_utils::{create_genesis_config, GenesisConfigInfo},
 | 
					
						
							|  |  |  |     };
 | 
					
						
							| 
									
										
										
										
											2020-03-30 17:53:25 -06:00
										 |  |  |     use std::{
 | 
					
						
							|  |  |  |         net::{IpAddr, Ipv4Addr},
 | 
					
						
							|  |  |  |         sync::RwLock,
 | 
					
						
							|  |  |  |     };
 | 
					
						
							| 
									
										
										
										
											2019-02-17 10:09:46 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     #[test]
 | 
					
						
							|  |  |  |     fn test_pubsub_new() {
 | 
					
						
							|  |  |  |         let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
 | 
					
						
							| 
									
										
										
										
											2019-03-04 15:44:31 -08:00
										 |  |  |         let exit = Arc::new(AtomicBool::new(false));
 | 
					
						
							| 
									
										
										
										
											2020-05-07 00:23:06 -06:00
										 |  |  |         let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
 | 
					
						
							|  |  |  |         let bank = Bank::new(&genesis_config);
 | 
					
						
							| 
									
										
										
										
											2020-06-12 11:04:17 -06:00
										 |  |  |         let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
 | 
					
						
							| 
									
										
										
										
											2020-03-30 17:53:25 -06:00
										 |  |  |         let subscriptions = Arc::new(RpcSubscriptions::new(
 | 
					
						
							|  |  |  |             &exit,
 | 
					
						
							| 
									
										
										
										
											2020-05-07 00:23:06 -06:00
										 |  |  |             bank_forks,
 | 
					
						
							| 
									
										
										
										
											2020-06-25 22:06:58 -06:00
										 |  |  |             Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
 | 
					
						
							| 
									
										
										
										
											2020-03-30 17:53:25 -06:00
										 |  |  |         ));
 | 
					
						
							| 
									
										
										
										
											2019-03-04 15:44:31 -08:00
										 |  |  |         let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit);
 | 
					
						
							| 
									
										
										
										
											2019-02-17 10:09:46 -07:00
										 |  |  |         let thread = pubsub_service.thread_hdl.thread();
 | 
					
						
							|  |  |  |         assert_eq!(thread.name().unwrap(), "solana-pubsub");
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | }
 |