* count entries processed by Bank * initialize windows with initial height of Entries
This commit is contained in:
committed by
anatoly yakovenko
parent
3966eb5374
commit
1919ec247b
15
src/bank.rs
15
src/bank.rs
@ -87,6 +87,11 @@ pub struct Bank {
|
|||||||
/// The number of transactions the bank has processed without error since the
|
/// The number of transactions the bank has processed without error since the
|
||||||
/// start of the ledger.
|
/// start of the ledger.
|
||||||
transaction_count: AtomicUsize,
|
transaction_count: AtomicUsize,
|
||||||
|
|
||||||
|
/// The number of Entries the bank has processed without error since start
|
||||||
|
/// of the ledger, i.e. poor-man's network synchronization
|
||||||
|
/// TODO: upgrade to U64 when stable?
|
||||||
|
entry_count: AtomicUsize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Bank {
|
impl Bank {
|
||||||
@ -100,6 +105,7 @@ impl Bank {
|
|||||||
time_sources: RwLock::new(HashSet::new()),
|
time_sources: RwLock::new(HashSet::new()),
|
||||||
last_time: RwLock::new(Utc.timestamp(0, 0)),
|
last_time: RwLock::new(Utc.timestamp(0, 0)),
|
||||||
transaction_count: AtomicUsize::new(0),
|
transaction_count: AtomicUsize::new(0),
|
||||||
|
entry_count: AtomicUsize::new(0),
|
||||||
};
|
};
|
||||||
bank.apply_payment(deposit, &mut bank.balances.write().unwrap());
|
bank.apply_payment(deposit, &mut bank.balances.write().unwrap());
|
||||||
bank
|
bank
|
||||||
@ -296,11 +302,13 @@ impl Bank {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Process an ordered list of entries.
|
/// Process an ordered list of entries.
|
||||||
pub fn process_entries<I>(&self, entries: I) -> Result<()>
|
pub fn process_entries<I>(&self, entries: I) -> Result<usize>
|
||||||
where
|
where
|
||||||
I: IntoIterator<Item = Entry>,
|
I: IntoIterator<Item = Entry>,
|
||||||
{
|
{
|
||||||
for entry in entries {
|
for entry in entries {
|
||||||
|
self.entry_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
if !entry.transactions.is_empty() {
|
if !entry.transactions.is_empty() {
|
||||||
for result in self.process_transactions(entry.transactions) {
|
for result in self.process_transactions(entry.transactions) {
|
||||||
result?;
|
result?;
|
||||||
@ -308,7 +316,7 @@ impl Bank {
|
|||||||
}
|
}
|
||||||
self.register_entry_id(&entry.id);
|
self.register_entry_id(&entry.id);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(self.entry_count())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process a Witness Signature. Any payment plans waiting on this signature
|
/// Process a Witness Signature. Any payment plans waiting on this signature
|
||||||
@ -422,6 +430,9 @@ impl Bank {
|
|||||||
pub fn transaction_count(&self) -> usize {
|
pub fn transaction_count(&self) -> usize {
|
||||||
self.transaction_count.load(Ordering::Relaxed)
|
self.transaction_count.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
pub fn entry_count(&self) -> usize {
|
||||||
|
self.entry_count.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -102,7 +102,8 @@ fn main() {
|
|||||||
bank.register_entry_id(&entry1.id);
|
bank.register_entry_id(&entry1.id);
|
||||||
|
|
||||||
eprintln!("processing entries...");
|
eprintln!("processing entries...");
|
||||||
bank.process_entries(entries).expect("process_entries");
|
let num_entries = bank.process_entries(entries).expect("process_entries");
|
||||||
|
eprintln!("processed {} entries...", num_entries);
|
||||||
|
|
||||||
eprintln!("creating networking stack...");
|
eprintln!("creating networking stack...");
|
||||||
|
|
||||||
|
@ -395,13 +395,14 @@ pub fn window(
|
|||||||
r: BlobReceiver,
|
r: BlobReceiver,
|
||||||
s: BlobSender,
|
s: BlobSender,
|
||||||
retransmit: BlobSender,
|
retransmit: BlobSender,
|
||||||
|
entry_count: usize,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name("solana-window".to_string())
|
.name("solana-window".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
let mut consumed = 0;
|
let mut consumed = entry_count;
|
||||||
let mut received = 0;
|
let mut received = entry_count;
|
||||||
let mut last = 0;
|
let mut last = entry_count;
|
||||||
let mut times = 0;
|
let mut times = 0;
|
||||||
loop {
|
loop {
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
@ -816,6 +817,7 @@ mod test {
|
|||||||
r_reader,
|
r_reader,
|
||||||
s_window,
|
s_window,
|
||||||
s_retransmit,
|
s_retransmit,
|
||||||
|
0,
|
||||||
);
|
);
|
||||||
let (s_responder, r_responder) = channel();
|
let (s_responder, r_responder) = channel();
|
||||||
let t_responder = responder(
|
let t_responder = responder(
|
||||||
|
@ -86,6 +86,7 @@ impl Tvu {
|
|||||||
exit.clone(),
|
exit.clone(),
|
||||||
blob_recycler.clone(),
|
blob_recycler.clone(),
|
||||||
fetch_stage.blob_receiver,
|
fetch_stage.blob_receiver,
|
||||||
|
bank.entry_count(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let replicate_stage =
|
let replicate_stage =
|
||||||
|
@ -22,6 +22,7 @@ impl WindowStage {
|
|||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
blob_recycler: packet::BlobRecycler,
|
blob_recycler: packet::BlobRecycler,
|
||||||
fetch_stage_receiver: streamer::BlobReceiver,
|
fetch_stage_receiver: streamer::BlobReceiver,
|
||||||
|
entry_count: usize,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let (retransmit_sender, retransmit_receiver) = channel();
|
let (retransmit_sender, retransmit_receiver) = channel();
|
||||||
|
|
||||||
@ -41,6 +42,7 @@ impl WindowStage {
|
|||||||
fetch_stage_receiver,
|
fetch_stage_receiver,
|
||||||
blob_sender,
|
blob_sender,
|
||||||
retransmit_sender,
|
retransmit_sender,
|
||||||
|
entry_count,
|
||||||
);
|
);
|
||||||
let thread_hdls = vec![t_retransmit, t_window];
|
let thread_hdls = vec![t_retransmit, t_window];
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user