Review comments

This commit is contained in:
Tyera Eulberg
2019-02-13 11:42:56 -07:00
committed by Michael Vines
parent c1447b2695
commit cbaba5cbf3
2 changed files with 42 additions and 30 deletions

View File

@ -57,8 +57,14 @@ impl Output for SocketOutput {
} }
pub trait EntryStreamHandler { pub trait EntryStreamHandler {
fn emit_entry_event(&self, entries: &Entry) -> Result<()>; fn emit_entry_event(&self, slot: u64, leader_id: &str, entries: &Entry) -> Result<()>;
fn emit_block_event(&self, tick_height: u64, last_id: Hash) -> Result<()>; fn emit_block_event(
&self,
slot: u64,
leader_id: &str,
tick_height: u64,
last_id: Hash,
) -> Result<()>;
} }
#[derive(Debug)] #[derive(Debug)]
@ -72,14 +78,7 @@ impl<T> EntryStreamHandler for EntryStream<T>
where where
T: Output, T: Output,
{ {
fn emit_entry_event(&self, entry: &Entry) -> Result<()> { fn emit_entry_event(&self, slot: u64, leader_id: &str, entry: &Entry) -> Result<()> {
let leader_scheduler = self.leader_scheduler.read().unwrap();
let slot = leader_scheduler.tick_height_to_slot(entry.tick_height);
let leader_id = leader_scheduler
.get_leader_for_slot(slot)
.map(|leader| leader.to_string())
.unwrap_or_else(|| "None".to_string());
let json_entry = serde_json::to_string(&entry)?; let json_entry = serde_json::to_string(&entry)?;
let payload = format!( let payload = format!(
r#"{{"dt":"{}","t":"entry","s":{},"l":{:?},"entry":{}}}"#, r#"{{"dt":"{}","t":"entry","s":{},"l":{:?},"entry":{}}}"#,
@ -92,13 +91,13 @@ where
Ok(()) Ok(())
} }
fn emit_block_event(&self, tick_height: u64, last_id: Hash) -> Result<()> { fn emit_block_event(
let leader_scheduler = self.leader_scheduler.read().unwrap(); &self,
let slot = leader_scheduler.tick_height_to_slot(tick_height); slot: u64,
let leader_id = leader_scheduler leader_id: &str,
.get_leader_for_slot(slot) tick_height: u64,
.map(|leader| leader.to_string()) last_id: Hash,
.unwrap_or_else(|| "None".to_string()); ) -> Result<()> {
let payload = format!( let payload = format!(
r#"{{"dt":"{}","t":"block","s":{},"h":{},"l":{:?},"id":"{:?}"}}"#, r#"{{"dt":"{}","t":"block","s":{},"h":{},"l":{:?},"id":"{:?}"}}"#,
Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true), Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true),
@ -180,6 +179,13 @@ mod test {
.read() .read()
.unwrap() .unwrap()
.tick_height_to_slot(tick_height_initial); .tick_height_to_slot(tick_height_initial);
let leader_id = bank
.leader_scheduler
.read()
.unwrap()
.get_leader_for_slot(previous_slot)
.map(|leader| leader.to_string())
.unwrap_or_else(|| "None".to_string());
for tick_height in tick_height_initial..=tick_height_final { for tick_height in tick_height_initial..=tick_height_final {
bank.leader_scheduler bank.leader_scheduler
@ -193,13 +199,15 @@ mod test {
.tick_height_to_slot(tick_height); .tick_height_to_slot(tick_height);
if curr_slot != previous_slot { if curr_slot != previous_slot {
entry_stream entry_stream
.emit_block_event(tick_height - 1, last_id) .emit_block_event(previous_slot, &leader_id, tick_height - 1, last_id)
.unwrap(); .unwrap();
} }
let entry = Entry::new(&mut last_id, tick_height, 1, vec![]); //just ticks let entry = Entry::new(&mut last_id, tick_height, 1, vec![]); // just ticks
last_id = entry.id; last_id = entry.id;
previous_slot = curr_slot; previous_slot = curr_slot;
entry_stream.emit_entry_event(&entry).unwrap(); entry_stream
.emit_entry_event(curr_slot, &leader_id, &entry)
.unwrap();
expected_entries.push(entry.clone()); expected_entries.push(entry.clone());
entries.push(entry); entries.push(entry);
} }

View File

@ -59,28 +59,32 @@ impl EntryStreamStage {
) -> Result<()> { ) -> Result<()> {
let timeout = Duration::new(1, 0); let timeout = Duration::new(1, 0);
let entries = ledger_entry_receiver.recv_timeout(timeout)?; let entries = ledger_entry_receiver.recv_timeout(timeout)?;
let leader_scheduler = entry_stream.leader_scheduler.read().unwrap();
for entry in &entries { for entry in &entries {
let slot = leader_scheduler.tick_height_to_slot(entry.tick_height);
let leader_id = leader_scheduler
.get_leader_for_slot(slot)
.map(|leader| leader.to_string())
.unwrap_or_else(|| "None".to_string());
if entry.is_tick() && entry_stream.queued_block.is_some() { if entry.is_tick() && entry_stream.queued_block.is_some() {
let queued_block = entry_stream.queued_block.as_ref(); let queued_block = entry_stream.queued_block.as_ref();
let block_tick_height = queued_block.unwrap().tick_height; let block_tick_height = queued_block.unwrap().tick_height;
let block_id = queued_block.unwrap().id; let block_id = queued_block.unwrap().id;
entry_stream entry_stream
.emit_block_event(block_tick_height, block_id) .emit_block_event(slot, &leader_id, block_tick_height, block_id)
.unwrap_or_else(|e| { .unwrap_or_else(|e| {
error!("Entry Stream error: {:?}, {:?}", e, entry_stream.output); error!("Entry Stream error: {:?}, {:?}", e, entry_stream.output);
}); });
entry_stream.queued_block = None; entry_stream.queued_block = None;
} }
entry_stream.emit_entry_event(&entry).unwrap_or_else(|e| { entry_stream
error!("Entry Stream error: {:?}, {:?}", e, entry_stream.output); .emit_entry_event(slot, &leader_id, &entry)
}); .unwrap_or_else(|e| {
if 0 == entry_stream error!("Entry Stream error: {:?}, {:?}", e, entry_stream.output);
.leader_scheduler });
.read() if 0 == leader_scheduler.num_ticks_left_in_slot(entry.tick_height) {
.unwrap()
.num_ticks_left_in_slot(entry.tick_height)
{
entry_stream.queued_block = Some(EntryStreamBlock { entry_stream.queued_block = Some(EntryStreamBlock {
tick_height: entry.tick_height, tick_height: entry.tick_height,
id: entry.id, id: entry.id,