Drop support for EntryInfo subscriptions
This commit is contained in:
parent
0ae69bdcd9
commit
cc447c0fda
@ -4,7 +4,6 @@ use accountant::Accountant;
|
||||
use entry::Entry;
|
||||
use ledger;
|
||||
use packet;
|
||||
use request_processor::RequestProcessor;
|
||||
use result::Result;
|
||||
use serde_json;
|
||||
use std::collections::VecDeque;
|
||||
@ -17,16 +16,12 @@ use streamer;
|
||||
|
||||
pub struct EntryWriter<'a> {
|
||||
accountant: &'a Accountant,
|
||||
request_processor: &'a RequestProcessor,
|
||||
}
|
||||
|
||||
impl<'a> EntryWriter<'a> {
|
||||
/// Create a new Tpu that wraps the given Accountant.
|
||||
pub fn new(accountant: &'a Accountant, request_processor: &'a RequestProcessor) -> Self {
|
||||
EntryWriter {
|
||||
accountant,
|
||||
request_processor,
|
||||
}
|
||||
pub fn new(accountant: &'a Accountant) -> Self {
|
||||
EntryWriter { accountant }
|
||||
}
|
||||
|
||||
fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
|
||||
@ -37,7 +32,6 @@ impl<'a> EntryWriter<'a> {
|
||||
"{}",
|
||||
serde_json::to_string(&entry).expect("'entry' to_strong in fn write_entry")
|
||||
).expect("writeln! in fn write_entry");
|
||||
self.request_processor.notify_entry_info_subscribers(&entry);
|
||||
}
|
||||
|
||||
fn write_entries<W: Write>(
|
||||
|
@ -14,19 +14,6 @@ pub enum Request {
|
||||
GetBalance { key: PublicKey },
|
||||
GetLastId,
|
||||
GetTransactionCount,
|
||||
Subscribe { subscriptions: Vec<Subscription> },
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub enum Subscription {
|
||||
EntryInfo,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct EntryInfo {
|
||||
pub id: Hash,
|
||||
pub num_hashes: u64,
|
||||
pub num_events: u64,
|
||||
}
|
||||
|
||||
impl Request {
|
||||
@ -44,7 +31,6 @@ pub enum Response {
|
||||
Balance { key: PublicKey, val: Option<i64> },
|
||||
LastId { id: Hash },
|
||||
TransactionCount { transaction_count: u64 },
|
||||
EntryInfo(EntryInfo),
|
||||
}
|
||||
|
||||
pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec<SharedPackets> {
|
||||
|
@ -8,12 +8,12 @@ use event_processor::EventProcessor;
|
||||
use packet;
|
||||
use packet::SharedPackets;
|
||||
use rayon::prelude::*;
|
||||
use request::{EntryInfo, Request, Response, Subscription};
|
||||
use request::{Request, Response};
|
||||
use result::Result;
|
||||
use std::collections::VecDeque;
|
||||
use std::net::{SocketAddr, UdpSocket};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::mpsc::{Receiver, Sender};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use streamer;
|
||||
@ -21,16 +21,12 @@ use timing;
|
||||
|
||||
pub struct RequestProcessor {
|
||||
accountant: Arc<Accountant>,
|
||||
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
|
||||
}
|
||||
|
||||
impl RequestProcessor {
|
||||
/// Create a new Tpu that wraps the given Accountant.
|
||||
pub fn new(accountant: Arc<Accountant>) -> Self {
|
||||
RequestProcessor {
|
||||
accountant,
|
||||
entry_info_subscribers: Mutex::new(vec![]),
|
||||
}
|
||||
RequestProcessor { accountant }
|
||||
}
|
||||
|
||||
/// Process Request items sent by clients.
|
||||
@ -59,16 +55,6 @@ impl RequestProcessor {
|
||||
Some(rsp)
|
||||
}
|
||||
Request::Transaction(_) => unreachable!(),
|
||||
Request::Subscribe { subscriptions } => {
|
||||
for subscription in subscriptions {
|
||||
match subscription {
|
||||
Subscription::EntryInfo => {
|
||||
self.entry_info_subscribers.lock().unwrap().push(rsp_addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -81,29 +67,6 @@ impl RequestProcessor {
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn notify_entry_info_subscribers(&self, entry: &Entry) {
|
||||
// TODO: No need to bind().
|
||||
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");
|
||||
|
||||
// copy subscribers to avoid taking lock while doing io
|
||||
let addrs = self.entry_info_subscribers.lock().unwrap().clone();
|
||||
trace!("Sending to {} addrs", addrs.len());
|
||||
for addr in addrs {
|
||||
let entry_info = EntryInfo {
|
||||
id: entry.id,
|
||||
num_hashes: entry.num_hashes,
|
||||
num_events: entry.events.len() as u64,
|
||||
};
|
||||
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
|
||||
trace!("sending {} to {}", data.len(), addr);
|
||||
//TODO dont do IO here, this needs to be on a separate channel
|
||||
let res = socket.send_to(&data, addr);
|
||||
if res.is_err() {
|
||||
eprintln!("couldn't send response: {:?}", res);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn deserialize_requests(p: &packet::Packets) -> Vec<Option<(Request, SocketAddr)>> {
|
||||
p.packets
|
||||
.par_iter()
|
||||
|
@ -33,7 +33,6 @@ impl Rpu {
|
||||
|
||||
fn write_service<W: Write + Send + 'static>(
|
||||
accountant: Arc<Accountant>,
|
||||
request_processor: Arc<RequestProcessor>,
|
||||
exit: Arc<AtomicBool>,
|
||||
broadcast: streamer::BlobSender,
|
||||
blob_recycler: packet::BlobRecycler,
|
||||
@ -41,7 +40,7 @@ impl Rpu {
|
||||
entry_receiver: Receiver<Entry>,
|
||||
) -> JoinHandle<()> {
|
||||
spawn(move || loop {
|
||||
let entry_writer = EntryWriter::new(&accountant, &request_processor);
|
||||
let entry_writer = EntryWriter::new(&accountant);
|
||||
let _ = entry_writer.write_and_send_entries(
|
||||
&broadcast,
|
||||
&blob_recycler,
|
||||
@ -99,7 +98,6 @@ impl Rpu {
|
||||
let (broadcast_sender, broadcast_receiver) = channel();
|
||||
let t_write = Self::write_service(
|
||||
self.event_processor.accountant.clone(),
|
||||
request_stage.request_processor.clone(),
|
||||
exit.clone(),
|
||||
broadcast_sender,
|
||||
blob_recycler.clone(),
|
||||
|
@ -6,7 +6,7 @@
|
||||
use bincode::{deserialize, serialize};
|
||||
use futures::future::{ok, FutureResult};
|
||||
use hash::Hash;
|
||||
use request::{Request, Response, Subscription};
|
||||
use request::{Request, Response};
|
||||
use signature::{KeyPair, PublicKey, Signature};
|
||||
use std::collections::HashMap;
|
||||
use std::io;
|
||||
@ -18,7 +18,6 @@ pub struct ThinClient {
|
||||
pub requests_socket: UdpSocket,
|
||||
pub events_socket: UdpSocket,
|
||||
last_id: Option<Hash>,
|
||||
num_events: u64,
|
||||
transaction_count: u64,
|
||||
balances: HashMap<PublicKey, Option<i64>>,
|
||||
}
|
||||
@ -33,22 +32,12 @@ impl ThinClient {
|
||||
requests_socket,
|
||||
events_socket,
|
||||
last_id: None,
|
||||
num_events: 0,
|
||||
transaction_count: 0,
|
||||
balances: HashMap::new(),
|
||||
};
|
||||
client.init();
|
||||
client
|
||||
}
|
||||
|
||||
pub fn init(&self) {
|
||||
let subscriptions = vec![Subscription::EntryInfo];
|
||||
let req = Request::Subscribe { subscriptions };
|
||||
let data = serialize(&req).expect("serialize Subscribe in thin_client");
|
||||
trace!("subscribing to {}", self.addr);
|
||||
let _res = self.requests_socket.send_to(&data, &self.addr);
|
||||
}
|
||||
|
||||
pub fn recv_response(&self) -> io::Result<Response> {
|
||||
let mut buf = vec![0u8; 1024];
|
||||
info!("start recv_from");
|
||||
@ -72,11 +61,6 @@ impl ThinClient {
|
||||
info!("Response transaction count {:?}", transaction_count);
|
||||
self.transaction_count = transaction_count;
|
||||
}
|
||||
Response::EntryInfo(entry_info) => {
|
||||
trace!("Response entry_info {:?}", entry_info.id);
|
||||
self.last_id = Some(entry_info.id);
|
||||
self.num_events += entry_info.num_events;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -34,12 +34,11 @@ impl Tvu {
|
||||
|
||||
fn drain_service(
|
||||
accountant: Arc<Accountant>,
|
||||
request_processor: Arc<RequestProcessor>,
|
||||
exit: Arc<AtomicBool>,
|
||||
entry_receiver: Receiver<Entry>,
|
||||
) -> JoinHandle<()> {
|
||||
spawn(move || {
|
||||
let entry_writer = EntryWriter::new(&accountant, &request_processor);
|
||||
let entry_writer = EntryWriter::new(&accountant);
|
||||
loop {
|
||||
let _ = entry_writer.drain_entries(&entry_receiver);
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
@ -183,7 +182,6 @@ impl Tvu {
|
||||
|
||||
let t_write = Self::drain_service(
|
||||
obj.event_processor.accountant.clone(),
|
||||
request_stage.request_processor.clone(),
|
||||
exit.clone(),
|
||||
request_stage.entry_receiver,
|
||||
);
|
||||
|
Loading…
x
Reference in New Issue
Block a user