120
src/crdt.rs
120
src/crdt.rs
@@ -18,6 +18,7 @@ use byteorder::{LittleEndian, ReadBytesExt};
|
||||
use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy};
|
||||
use counter::Counter;
|
||||
use hash::Hash;
|
||||
use ledger::LedgerWindow;
|
||||
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
|
||||
use pnet_datalink as datalink;
|
||||
use rand::{thread_rng, RngCore};
|
||||
@@ -960,6 +961,7 @@ impl Crdt {
|
||||
}
|
||||
fn run_window_request(
|
||||
window: &SharedWindow,
|
||||
ledger_window: &mut Option<&mut LedgerWindow>,
|
||||
me: &NodeInfo,
|
||||
from: &NodeInfo,
|
||||
ix: u64,
|
||||
@@ -1006,19 +1008,29 @@ impl Crdt {
|
||||
"requested ix {} != blob_ix {}, outside window!",
|
||||
ix, blob_ix
|
||||
);
|
||||
// falls through to checking window_ledger
|
||||
}
|
||||
} else {
|
||||
inc_new_counter!("crdt-window-request-fail", 1);
|
||||
assert!(window.read().unwrap()[pos].data.is_none());
|
||||
info!(
|
||||
"{:x}: failed RequestWindowIndex {:x} {} {}",
|
||||
me.debug_id(),
|
||||
from.debug_id(),
|
||||
ix,
|
||||
pos,
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(ledger_window) = ledger_window {
|
||||
if let Ok(entry) = ledger_window.get_entry(ix) {
|
||||
inc_new_counter!("crdt-window-request-ledger", 1);
|
||||
|
||||
let out = entry.to_blob(blob_recycler, Some(ix), Some(from.id));
|
||||
|
||||
return Some(out);
|
||||
}
|
||||
}
|
||||
|
||||
inc_new_counter!("crdt-window-request-fail", 1);
|
||||
info!(
|
||||
"{:x}: failed RequestWindowIndex {:x} {} {}",
|
||||
me.debug_id(),
|
||||
from.debug_id(),
|
||||
ix,
|
||||
pos,
|
||||
);
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
@@ -1026,11 +1038,14 @@ impl Crdt {
|
||||
fn handle_blob(
|
||||
obj: &Arc<RwLock<Self>>,
|
||||
window: &SharedWindow,
|
||||
ledger_window: &mut Option<&mut LedgerWindow>,
|
||||
blob_recycler: &BlobRecycler,
|
||||
blob: &Blob,
|
||||
) -> Option<SharedBlob> {
|
||||
match deserialize(&blob.data[..blob.meta.size]) {
|
||||
Ok(request) => Crdt::handle_protocol(request, obj, window, blob_recycler),
|
||||
Ok(request) => {
|
||||
Crdt::handle_protocol(request, obj, window, ledger_window, blob_recycler)
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("deserialize crdt packet failed");
|
||||
None
|
||||
@@ -1042,6 +1057,7 @@ impl Crdt {
|
||||
request: Protocol,
|
||||
obj: &Arc<RwLock<Self>>,
|
||||
window: &SharedWindow,
|
||||
ledger_window: &mut Option<&mut LedgerWindow>,
|
||||
blob_recycler: &BlobRecycler,
|
||||
) -> Option<SharedBlob> {
|
||||
match request {
|
||||
@@ -1129,7 +1145,7 @@ impl Crdt {
|
||||
inc_new_counter!("crdt-window-request-address-eq", 1);
|
||||
return None;
|
||||
}
|
||||
Self::run_window_request(&window, &me, &from, ix, blob_recycler)
|
||||
Self::run_window_request(&window, ledger_window, &me, &from, ix, blob_recycler)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1138,6 +1154,7 @@ impl Crdt {
|
||||
fn run_listen(
|
||||
obj: &Arc<RwLock<Self>>,
|
||||
window: &SharedWindow,
|
||||
ledger_window: &mut Option<&mut LedgerWindow>,
|
||||
blob_recycler: &BlobRecycler,
|
||||
requests_receiver: &BlobReceiver,
|
||||
response_sender: &BlobSender,
|
||||
@@ -1148,31 +1165,42 @@ impl Crdt {
|
||||
while let Ok(mut more) = requests_receiver.try_recv() {
|
||||
reqs.append(&mut more);
|
||||
}
|
||||
let resp: VecDeque<_> = reqs
|
||||
.iter()
|
||||
.filter_map(|b| Self::handle_blob(obj, window, blob_recycler, &b.read().unwrap()))
|
||||
.collect();
|
||||
response_sender.send(resp)?;
|
||||
while let Some(r) = reqs.pop_front() {
|
||||
blob_recycler.recycle(r);
|
||||
let mut resps = VecDeque::new();
|
||||
while let Some(req) = reqs.pop_front() {
|
||||
if let Some(resp) = Self::handle_blob(
|
||||
obj,
|
||||
window,
|
||||
ledger_window,
|
||||
blob_recycler,
|
||||
&req.read().unwrap(),
|
||||
) {
|
||||
resps.push_back(resp);
|
||||
}
|
||||
blob_recycler.recycle(req);
|
||||
}
|
||||
response_sender.send(resps)?;
|
||||
Ok(())
|
||||
}
|
||||
pub fn listen(
|
||||
obj: Arc<RwLock<Self>>,
|
||||
window: SharedWindow,
|
||||
ledger_path: Option<&str>,
|
||||
blob_recycler: BlobRecycler,
|
||||
requests_receiver: BlobReceiver,
|
||||
response_sender: BlobSender,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> JoinHandle<()> {
|
||||
let debug_id = obj.read().unwrap().debug_id();
|
||||
|
||||
let mut ledger_window = ledger_path.map(|p| LedgerWindow::new(p).unwrap());
|
||||
|
||||
Builder::new()
|
||||
.name("solana-listen".to_string())
|
||||
.spawn(move || loop {
|
||||
let e = Self::run_listen(
|
||||
&obj,
|
||||
&window,
|
||||
&mut ledger_window.as_mut(),
|
||||
&blob_recycler,
|
||||
&requests_receiver,
|
||||
&response_sender,
|
||||
@@ -1314,11 +1342,14 @@ mod tests {
|
||||
parse_port_or_addr, Crdt, CrdtError, NodeInfo, Protocol, GOSSIP_PURGE_MILLIS,
|
||||
GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE,
|
||||
};
|
||||
use hash::Hash;
|
||||
use entry::Entry;
|
||||
use hash::{hash, Hash};
|
||||
use ledger::{LedgerWindow, LedgerWriter};
|
||||
use logger;
|
||||
use packet::BlobRecycler;
|
||||
use result::Error;
|
||||
use signature::{KeyPair, KeyPairUtil, PublicKey};
|
||||
use std::fs::remove_dir_all;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::{Arc, RwLock};
|
||||
@@ -1827,6 +1858,7 @@ mod tests {
|
||||
/// test window requests respond with the right blob, and do not overrun
|
||||
#[test]
|
||||
fn run_window_request() {
|
||||
logger::setup();
|
||||
let window = default_window();
|
||||
let me = NodeInfo::new(
|
||||
KeyPair::new().pubkey(),
|
||||
@@ -1837,19 +1869,48 @@ mod tests {
|
||||
"127.0.0.1:1238".parse().unwrap(),
|
||||
);
|
||||
let recycler = BlobRecycler::default();
|
||||
let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler);
|
||||
let rv = Crdt::run_window_request(&window, &mut None, &me, &me, 0, &recycler);
|
||||
assert!(rv.is_none());
|
||||
let out = recycler.allocate();
|
||||
out.write().unwrap().meta.size = 200;
|
||||
window.write().unwrap()[0].data = Some(out);
|
||||
let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler);
|
||||
let rv = Crdt::run_window_request(&window, &mut None, &me, &me, 0, &recycler);
|
||||
assert!(rv.is_some());
|
||||
let v = rv.unwrap();
|
||||
//test we copied the blob
|
||||
assert_eq!(v.read().unwrap().meta.size, 200);
|
||||
let len = window.read().unwrap().len() as u64;
|
||||
let rv = Crdt::run_window_request(&window, &me, &me, len, &recycler);
|
||||
let rv = Crdt::run_window_request(&window, &mut None, &me, &me, len, &recycler);
|
||||
assert!(rv.is_none());
|
||||
|
||||
fn tmp_ledger(name: &str) -> String {
|
||||
let keypair = KeyPair::new();
|
||||
|
||||
let path = format!("/tmp/farf/{}-{}", name, keypair.pubkey());
|
||||
|
||||
let mut writer = LedgerWriter::new(&path, true).unwrap();
|
||||
let zero = Hash::default();
|
||||
let one = hash(&zero.as_ref());
|
||||
writer
|
||||
.write_entries(vec![Entry::new_tick(0, &zero), Entry::new_tick(0, &one)].to_vec())
|
||||
.unwrap();
|
||||
path
|
||||
}
|
||||
|
||||
let ledger_path = tmp_ledger("run_window_request");
|
||||
let mut ledger_window = LedgerWindow::new(&ledger_path).unwrap();
|
||||
|
||||
let rv = Crdt::run_window_request(
|
||||
&window,
|
||||
&mut Some(&mut ledger_window),
|
||||
&me,
|
||||
&me,
|
||||
1,
|
||||
&recycler,
|
||||
);
|
||||
assert!(rv.is_some());
|
||||
|
||||
remove_dir_all(ledger_path).unwrap();
|
||||
}
|
||||
|
||||
/// test window requests respond with the right blob, and do not overrun
|
||||
@@ -1865,7 +1926,7 @@ mod tests {
|
||||
let recycler = BlobRecycler::default();
|
||||
|
||||
// Simulate handling a repair request from mock_peer
|
||||
let rv = Crdt::run_window_request(&window, &me, &mock_peer, 0, &recycler);
|
||||
let rv = Crdt::run_window_request(&window, &mut None, &me, &mock_peer, 0, &recycler);
|
||||
assert!(rv.is_none());
|
||||
let blob = recycler.allocate();
|
||||
let blob_size = 200;
|
||||
@@ -1874,8 +1935,9 @@ mod tests {
|
||||
|
||||
let num_requests: u32 = 64;
|
||||
for i in 0..num_requests {
|
||||
let shared_blob =
|
||||
Crdt::run_window_request(&window, &me, &mock_peer, 0, &recycler).unwrap();
|
||||
let shared_blob = Crdt::run_window_request(
|
||||
&window, &mut None, &me, &mock_peer, 0, &recycler,
|
||||
).unwrap();
|
||||
let blob = shared_blob.read().unwrap();
|
||||
// Test we copied the blob
|
||||
assert_eq!(blob.meta.size, blob_size);
|
||||
@@ -1935,13 +1997,13 @@ mod tests {
|
||||
let obj = Arc::new(RwLock::new(crdt));
|
||||
|
||||
let request = Protocol::RequestUpdates(1, node.clone());
|
||||
assert!(Crdt::handle_protocol(request, &obj, &window, &recycler).is_none());
|
||||
assert!(Crdt::handle_protocol(request, &obj, &window, &mut None, &recycler).is_none());
|
||||
|
||||
let request = Protocol::RequestUpdates(1, node_with_same_addr.clone());
|
||||
assert!(Crdt::handle_protocol(request, &obj, &window, &recycler).is_none());
|
||||
assert!(Crdt::handle_protocol(request, &obj, &window, &mut None, &recycler).is_none());
|
||||
|
||||
let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone());
|
||||
Crdt::handle_protocol(request, &obj, &window, &recycler);
|
||||
Crdt::handle_protocol(request, &obj, &window, &mut None, &recycler);
|
||||
|
||||
let me = obj.write().unwrap();
|
||||
|
||||
|
Reference in New Issue
Block a user