Fixes for serializing entries over blobs and reorg into ledger
@@ -6,6 +6,7 @@
 extern crate libc;
 
 use chrono::prelude::*;
+use entry::Entry;
 use event::Event;
 use hash::Hash;
 use mint::Mint;
@@ -15,8 +16,8 @@ use signature::{KeyPair, PublicKey, Signature};
 use std::collections::hash_map::Entry::Occupied;
 use std::collections::{HashMap, HashSet, VecDeque};
 use std::result;
-use std::sync::atomic::{AtomicIsize, Ordering};
 use std::sync::RwLock;
+use std::sync::atomic::{AtomicIsize, Ordering};
 use transaction::Transaction;
 
 pub const MAX_ENTRY_IDS: usize = 1024 * 4;
@@ -232,6 +233,16 @@ impl Accountant {
         results
     }
 
+    pub fn process_verified_entries(&self, entries: Vec<Entry>) -> Result<()> {
+        for entry in entries {
+            self.register_entry_id(&entry.id);
+            for result in self.process_verified_events(entry.events) {
+                result?;
+            }
+        }
+        Ok(())
+    }
+
     /// Process a Witness Signature that has already been verified.
     fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> {
         if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) {
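Note on the new Accountant API above: process_verified_entries gives callers (the TPU's replicate path, changed below) a single entry point for applying a batch of entries: each entry's id is registered before its events run, and the first failing event aborts the whole batch via `?`. A minimal, self-contained sketch of the same register-then-apply, fail-fast pattern; the Entry, apply_event, and balance types here are simplified stand-ins, not the crate's own:

    // Simplified stand-ins for illustration; the real Entry/Event types and
    // the Result alias live in the crate itself.
    #[derive(Debug)]
    struct Entry {
        id: u64,
        events: Vec<i64>,
    }

    fn apply_event(balance: &mut i64, ev: i64) -> Result<(), String> {
        let next = *balance + ev;
        if next < 0 {
            return Err(format!("event {} would overdraw balance {}", ev, balance));
        }
        *balance = next;
        Ok(())
    }

    // Same shape as process_verified_entries: register the id, apply every
    // event, and propagate the first failure with `?`.
    fn process_entries(
        balance: &mut i64,
        last_id: &mut u64,
        entries: Vec<Entry>,
    ) -> Result<(), String> {
        for entry in entries {
            *last_id = entry.id; // stands in for register_entry_id
            let results: Vec<Result<(), String>> =
                entry.events.iter().map(|&ev| apply_event(balance, ev)).collect();
            for result in results {
                result?;
            }
        }
        Ok(())
    }

    fn main() {
        let (mut balance, mut last_id) = (1, 0);
        let entries = vec![Entry { id: 7, events: vec![2, -3] }];
        assert!(process_entries(&mut balance, &mut last_id, entries).is_ok());
        assert_eq!((balance, last_id), (0, 7));
    }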
@@ -17,8 +17,8 @@ use std::env;
 use std::io::{stdin, stdout, Read};
 use std::net::UdpSocket;
 use std::process::exit;
-use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
 
 fn print_usage(program: &str, opts: Options) {
     let mut brief = format!("Usage: cat <transaction.log> | {} [options]\n\n", program);
@@ -135,8 +135,8 @@ mod tests {
     use packet::{Packet, Packets, SharedPackets};
     use std::sync::RwLock;
     use thin_client_service::Request;
-    use transaction::test_tx;
     use transaction::Transaction;
+    use transaction::test_tx;
 
     fn make_packet_from_transaction(tr: Transaction) -> Packet {
         let tx = serialize(&Request::Transaction(tr)).unwrap();
@@ -1,7 +1,7 @@
 //! The `hash` module provides functions for creating SHA-256 hashes.
 
-use generic_array::typenum::U32;
 use generic_array::GenericArray;
+use generic_array::typenum::U32;
 use sha2::{Digest, Sha256};
 
 pub type Hash = GenericArray<u8, U32>;
@@ -4,8 +4,8 @@
 use entry::Entry;
 use hash::Hash;
 use recorder::{ExitReason, Recorder, Signal};
-use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
 use std::sync::Mutex;
+use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
 use std::thread::{spawn, JoinHandle};
 use std::time::Instant;
 
src/ledger.rs (111 changed lines)
@@ -1,9 +1,17 @@
 //! The `ledger` module provides functions for parallel verification of the
 //! Proof of History ledger.
 
+use bincode::{deserialize, serialize_into};
 use entry::{next_tick, Entry};
+use event::Event;
 use hash::Hash;
+use packet;
+use packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_SIZE};
 use rayon::prelude::*;
+use std::cmp::min;
+use std::collections::VecDeque;
+use std::io::Cursor;
+use std::mem::size_of;
 
 pub trait Block {
     /// Verifies the hashes and counts of a slice of events are all consistent.
@@ -30,10 +38,95 @@ pub fn next_ticks(start_hash: &Hash, num_hashes: u64, len: usize) -> Vec<Entry>
     ticks
 }
 
+pub fn process_entry_list_into_blobs(
+    list: &Vec<Entry>,
+    blob_recycler: &packet::BlobRecycler,
+    q: &mut VecDeque<SharedBlob>,
+) {
+    let mut start = 0;
+    let mut end = 0;
+    while start < list.len() {
+        let mut entries: Vec<Vec<Entry>> = Vec::new();
+        let mut total = 0;
+        for i in &list[start..] {
+            total += size_of::<Event>() * i.events.len();
+            total += size_of::<Entry>();
+            if total >= BLOB_DATA_SIZE {
+                break;
+            }
+            end += 1;
+        }
+        // See if we need to split the events
+        if end <= start {
+            let mut event_start = 0;
+            let num_events_per_blob = BLOB_DATA_SIZE / size_of::<Event>();
+            let total_entry_chunks =
+                (list[end].events.len() + num_events_per_blob - 1) / num_events_per_blob;
+            trace!(
+                "splitting events end: {} total_chunks: {}",
+                end,
+                total_entry_chunks
+            );
+            for _ in 0..total_entry_chunks {
+                let event_end = min(event_start + num_events_per_blob, list[end].events.len());
+                let mut entry = Entry {
+                    num_hashes: list[end].num_hashes,
+                    id: list[end].id,
+                    events: list[end].events[event_start..event_end].to_vec(),
+                };
+                entries.push(vec![entry]);
+                event_start = event_end;
+            }
+            end += 1;
+        } else {
+            entries.push(list[start..end].to_vec());
+        }
+
+        for entry in entries {
+            let b = blob_recycler.allocate();
+            let pos = {
+                let mut bd = b.write().unwrap();
+                let mut out = Cursor::new(bd.data_mut());
+                serialize_into(&mut out, &entry).expect("failed to serialize output");
+                out.position() as usize
+            };
+            assert!(pos < BLOB_SIZE);
+            b.write().unwrap().set_size(pos);
+            q.push_back(b);
+        }
+        start = end;
+    }
+}
+
+pub fn reconstruct_entries_from_blobs(blobs: &VecDeque<SharedBlob>) -> Vec<Entry> {
+    let mut entries_to_apply: Vec<Entry> = Vec::new();
+    let mut last_id = Hash::default();
+    for msgs in blobs {
+        let blob = msgs.read().unwrap();
+        let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
+        for entry in entries {
+            if entry.id == last_id {
+                if let Some(last_entry) = entries_to_apply.last_mut() {
+                    last_entry.events.extend(entry.events);
+                }
+            } else {
+                last_id = entry.id;
+                entries_to_apply.push(entry);
+            }
+        }
+        //TODO respond back to leader with hash of the state
+    }
+    entries_to_apply
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
+    use entry;
     use hash::hash;
+    use packet::BlobRecycler;
+    use signature::{KeyPair, KeyPairUtil};
+    use transaction::Transaction;
 
     #[test]
     fn test_verify_slice() {
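Relative to the version deleted from src/tpu.rs further down, the split branch above fixes two boundary bugs: total_entry_chunks now rounds up with ceiling division, so a trailing partial chunk of events is no longer silently dropped, and event_end is clamped with min so the final slice cannot index past the end of the events vector. A self-contained sketch of just that arithmetic (the numbers are illustrative, not the real BLOB_DATA_SIZE):

    use std::cmp::min;

    // Ceiling division: blobs needed when at most `per_blob` events fit in one.
    fn num_chunks(num_events: usize, per_blob: usize) -> usize {
        (num_events + per_blob - 1) / per_blob
    }

    fn main() {
        let per_blob = 4;
        let events: Vec<u32> = (0..10).collect(); // 10 events -> chunks of 4, 4, 2

        // The old flooring computation yields 10 / 4 = 2 chunks, losing events 8 and 9.
        assert_eq!(events.len() / per_blob, 2);
        assert_eq!(num_chunks(events.len(), per_blob), 3);

        let mut start = 0;
        let mut seen = 0;
        for _ in 0..num_chunks(events.len(), per_blob) {
            // Without min, the last chunk would slice 8..12 on a 10-element
            // vector and panic; clamping gives 8..10 instead.
            let end = min(start + per_blob, events.len());
            seen += end - start;
            start = end;
        }
        assert_eq!(seen, events.len()); // every event lands in exactly one chunk
    }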
@@ -48,6 +141,24 @@ mod tests {
         bad_ticks[1].id = one;
         assert!(!bad_ticks.verify(&zero)); // inductive step, bad
     }
 
+    #[test]
+    fn test_entry_to_blobs() {
+        let zero = Hash::default();
+        let one = hash(&zero);
+        let keypair = KeyPair::new();
+        let tr0 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, one));
+        let events = vec![tr0.clone(); 10000];
+        let e0 = entry::create_entry(&zero, 0, events);
+
+        let entry_list = vec![e0.clone(); 1];
+        let blob_recycler = BlobRecycler::default();
+        let mut blob_q = VecDeque::new();
+        process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
+        let entries = reconstruct_entries_from_blobs(&blob_q);
+
+        assert_eq!(entry_list, entries);
+    }
 }
 
 #[cfg(all(feature = "unstable", test))]
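The round-trip assertion in test_entry_to_blobs depends on the merge rule in reconstruct_entries_from_blobs: every chunk split off one oversized entry carries the same id, so consecutive deserialized entries with equal ids have their events concatenated back into a single entry. A minimal sketch of that rule; Entry here is a simplified stand-in with a u64 id instead of a Hash:

    // Simplified stand-in for the crate's Entry; `id` would be a Hash upstream.
    #[derive(Clone, Debug, PartialEq)]
    struct Entry {
        id: u64,
        events: Vec<u32>,
    }

    // Consecutive entries with the same id are fragments of one split entry:
    // fold their events back into the previous entry instead of pushing anew.
    fn merge_fragments(fragments: Vec<Entry>) -> Vec<Entry> {
        let mut merged: Vec<Entry> = Vec::new();
        let mut last_id = u64::MAX; // sentinel; the real code seeds Hash::default()
        for entry in fragments {
            if entry.id == last_id {
                if let Some(last_entry) = merged.last_mut() {
                    last_entry.events.extend(entry.events);
                }
            } else {
                last_id = entry.id;
                merged.push(entry);
            }
        }
        merged
    }

    fn main() {
        // One entry split into three blob-sized fragments with a shared id.
        let fragments = vec![
            Entry { id: 9, events: vec![0, 1] },
            Entry { id: 9, events: vec![2, 3] },
            Entry { id: 9, events: vec![4] },
        ];
        let merged = merge_fragments(fragments);
        assert_eq!(merged, vec![Entry { id: 9, events: vec![0, 1, 2, 3, 4] }]);
    }

One caveat with the upstream seed: because last_id starts at Hash::default(), a first entry whose id happened to equal the default hash would find no previous entry to merge into; the sketch sidesteps that with a sentinel.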
@@ -1,7 +1,7 @@
 //! The `mint` module is a library for generating the chain's genesis block.
 
-use entry::create_entry;
 use entry::Entry;
+use entry::create_entry;
 use event::Event;
 use hash::{hash, Hash};
 use ring::rand::SystemRandom;
@@ -78,9 +78,9 @@ mod tests {
     use std::io;
     use std::io::Write;
     use std::net::SocketAddr;
-    use std::sync::mpsc::channel;
     use std::sync::mpsc::RecvError;
     use std::sync::mpsc::RecvTimeoutError;
+    use std::sync::mpsc::channel;
     use std::thread;
 
     fn addr_parse_error() -> Result<SocketAddr> {
@@ -1,7 +1,7 @@
 //! The `signature` module provides functionality for public, and private keys.
 
-use generic_array::typenum::{U32, U64};
 use generic_array::GenericArray;
+use generic_array::typenum::{U32, U64};
 use ring::signature::Ed25519KeyPair;
 use ring::{rand, signature};
 use untrusted;
@@ -438,8 +438,8 @@ mod test {
     use std::sync::{Arc, RwLock};
     use std::thread::sleep;
     use std::time::Duration;
-    use streamer::{blob_receiver, receiver, responder, retransmitter, window};
     use streamer::{BlobReceiver, PacketReceiver};
+    use streamer::{blob_receiver, receiver, responder, retransmitter, window};
 
     fn get_msgs(r: PacketReceiver, num: &mut usize) {
         for _t in 0..5 {
src/tpu.rs (119 changed lines)
@@ -3,21 +3,21 @@
 
 use accountant::Accountant;
 use accounting_stage::AccountingStage;
-use bincode::{deserialize, serialize, serialize_into};
+use bincode::{deserialize, serialize};
 use crdt::{Crdt, ReplicatedData};
 use ecdsa;
 use entry::Entry;
 use event::Event;
+use ledger;
 use packet;
-use packet::{SharedBlob, SharedPackets, BLOB_DATA_SIZE, BLOB_SIZE};
+use packet::SharedPackets;
 use rand::{thread_rng, Rng};
 use rayon::prelude::*;
 use result::Result;
 use serde_json;
 use std::collections::VecDeque;
+use std::io::Write;
 use std::io::sink;
-use std::io::{Cursor, Write};
-use std::mem::size_of;
 use std::net::{SocketAddr, UdpSocket};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::{channel, Receiver, Sender};
@@ -75,61 +75,6 @@ impl Tpu {
         Ok(l)
     }
 
-    fn process_entry_list_into_blobs(
-        list: &Vec<Entry>,
-        blob_recycler: &packet::BlobRecycler,
-        q: &mut VecDeque<SharedBlob>,
-    ) {
-        let mut start = 0;
-        let mut end = 0;
-        while start < list.len() {
-            let mut entries: Vec<Vec<Entry>> = Vec::new();
-            let mut total = 0;
-            for i in &list[start..] {
-                total += size_of::<Event>() * i.events.len();
-                total += size_of::<Entry>();
-                if total >= BLOB_DATA_SIZE {
-                    break;
-                }
-                end += 1;
-            }
-            // See if we need to split the events
-            if end <= start {
-                trace!("splitting events");
-                let mut event_start = 0;
-                let num_events_per_blob = BLOB_DATA_SIZE / size_of::<Event>();
-                let total_entry_chunks = list[end].events.len() / num_events_per_blob;
-                for _ in 0..total_entry_chunks {
-                    let event_end = event_start + num_events_per_blob;
-                    let mut entry = Entry {
-                        num_hashes: list[end].num_hashes,
-                        id: list[end].id,
-                        events: list[end].events[event_start..event_end].to_vec(),
-                    };
-                    entries.push(vec![entry]);
-                    event_start = event_end;
-                }
-                end += 1;
-            } else {
-                entries.push(list[start..end].to_vec());
-            }
-
-            for entry in entries {
-                let b = blob_recycler.allocate();
-                let pos = {
-                    let mut bd = b.write().unwrap();
-                    let mut out = Cursor::new(bd.data_mut());
-                    serialize_into(&mut out, &entry).expect("failed to serialize output");
-                    out.position() as usize
-                };
-                assert!(pos < BLOB_SIZE);
-                b.write().unwrap().set_size(pos);
-                q.push_back(b);
-            }
-            start = end;
-        }
-    }
-
     /// Process any Entry items that have been published by the Historian.
     /// continuosly broadcast blobs of entries out
     fn run_sync<W: Write>(
@@ -141,7 +86,7 @@ impl Tpu {
         let mut q = VecDeque::new();
         let list = Self::receive_all(&obj, writer)?;
         trace!("New blobs? {}", list.len());
-        Self::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
+        ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
         if !q.is_empty() {
             broadcast.send(q)?;
         }
@@ -381,6 +326,7 @@ impl Tpu {
         );
         Ok(())
     }
 
     /// Process verified blobs, already in order
     /// Respond with a signed hash of the state
     fn replicate_state(
@@ -391,18 +337,10 @@ impl Tpu {
         let timer = Duration::new(1, 0);
         let blobs = verified_receiver.recv_timeout(timer)?;
         trace!("replicating blobs {}", blobs.len());
-        for msgs in &blobs {
-            let blob = msgs.read().unwrap();
-            let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
-            let accountant = &obj.accounting_stage.accountant;
-            for entry in entries {
-                accountant.register_entry_id(&entry.id);
-                for result in accountant.process_verified_events(entry.events) {
-                    result?;
-                }
-            }
-            //TODO respond back to leader with hash of the state
-        }
+        let entries = ledger::reconstruct_entries_from_blobs(&blobs);
+        obj.accounting_stage
+            .accountant
+            .process_verified_entries(entries)?;
         for blob in blobs {
             blob_recycler.recycle(blob);
         }
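With reconstruction and application both factored out, replicate_state shrinks to a two-call pipeline: rebuild entries from blobs, then hand them to the accountant. A toy, self-contained version of that composition, where byte buffers stand in for blobs and a u64 sum stands in for account state:

    // Stand-ins: a "blob" is just a byte buffer of little-endian u32 events.
    fn reconstruct(blobs: &[Vec<u8>]) -> Vec<u32> {
        blobs
            .iter()
            .flat_map(|b| {
                b.chunks(4)
                    .map(|c| u32::from_le_bytes([c[0], c[1], c[2], c[3]]))
            })
            .collect()
    }

    fn process_verified(events: Vec<u32>, state: &mut u64) -> Result<(), String> {
        for ev in events {
            *state = state.checked_add(ev as u64).ok_or("overflow")?;
        }
        Ok(())
    }

    // The replicate path is now just: reconstruct, then apply.
    fn replicate(blobs: &[Vec<u8>], state: &mut u64) -> Result<(), String> {
        let events = reconstruct(blobs);
        process_verified(events, state)
    }

    fn main() {
        let blobs = vec![1u32.to_le_bytes().to_vec(), 2u32.to_le_bytes().to_vec()];
        let mut state = 0;
        replicate(&blobs, &mut state).unwrap();
        assert_eq!(state, 3);
    }

The design gain is that blob framing lives entirely in the ledger module while state application lives entirely in the accountant, so neither the TPU nor its tests need to know how entries are packed.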
@@ -709,7 +647,7 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke
 mod tests {
     use accountant::Accountant;
     use accounting_stage::AccountingStage;
-    use bincode::{deserialize, serialize};
+    use bincode::serialize;
     use chrono::prelude::*;
     use crdt::Crdt;
     use ecdsa;
@@ -718,7 +656,7 @@ mod tests {
     use hash::{hash, Hash};
     use logger;
     use mint::Mint;
-    use packet::{BlobRecycler, PacketRecycler, BLOB_SIZE, NUM_PACKETS};
+    use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS};
     use signature::{KeyPair, KeyPairUtil};
     use std::collections::VecDeque;
     use std::sync::atomic::{AtomicBool, Ordering};
@@ -894,37 +832,4 @@ mod tests {
         t_l_listen.join().expect("join");
     }
 
-    #[test]
-    fn test_entry_to_blobs() {
-        let zero = Hash::default();
-        let keypair = KeyPair::new();
-        let tr0 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 0, zero));
-        let events = vec![tr0.clone(); 10000];
-        //let tr1 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, zero));
-        let e0 = entry::create_entry(&zero, 0, events);
-
-        let entry_list = vec![e0.clone(); 1];
-        let blob_recycler = BlobRecycler::default();
-        let mut blob_q = VecDeque::new();
-        Tpu::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
-        let serialized_entry_list = serialize(&entry_list).unwrap();
-        let mut num_blobs_ref = serialized_entry_list.len() / BLOB_SIZE;
-        if serialized_entry_list.len() % BLOB_SIZE != 0 {
-            num_blobs_ref += 1
-        }
-        let mut new_events = Vec::new();
-        for b in &blob_q {
-            let blob = b.read().unwrap();
-            let entries: Vec<entry::Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
-            assert_eq!(entries.len(), 1);
-            assert_eq!(entries[0].num_hashes, e0.num_hashes);
-            assert_eq!(entries[0].id, e0.id);
-            new_events.extend(entries[0].events.clone());
-        }
-        for (i, e) in new_events.iter().enumerate() {
-            assert_eq!(*e, e0.events[i]);
-        }
-        trace!("len: {} ref_len: {}", blob_q.len(), num_blobs_ref);
-        assert!(blob_q.len() > num_blobs_ref);
-    }
 }