diff --git a/src/entry_stream.rs b/src/entry_stream.rs index 887d7f3394..32edb79329 100644 --- a/src/entry_stream.rs +++ b/src/entry_stream.rs @@ -57,8 +57,14 @@ impl Output for SocketOutput { } pub trait EntryStreamHandler { - fn emit_entry_event(&self, entries: &Entry) -> Result<()>; - fn emit_block_event(&self, tick_height: u64, last_id: Hash) -> Result<()>; + fn emit_entry_event(&self, slot: u64, leader_id: &str, entries: &Entry) -> Result<()>; + fn emit_block_event( + &self, + slot: u64, + leader_id: &str, + tick_height: u64, + last_id: Hash, + ) -> Result<()>; } #[derive(Debug)] @@ -72,14 +78,7 @@ impl EntryStreamHandler for EntryStream where T: Output, { - fn emit_entry_event(&self, entry: &Entry) -> Result<()> { - let leader_scheduler = self.leader_scheduler.read().unwrap(); - let slot = leader_scheduler.tick_height_to_slot(entry.tick_height); - let leader_id = leader_scheduler - .get_leader_for_slot(slot) - .map(|leader| leader.to_string()) - .unwrap_or_else(|| "None".to_string()); - + fn emit_entry_event(&self, slot: u64, leader_id: &str, entry: &Entry) -> Result<()> { let json_entry = serde_json::to_string(&entry)?; let payload = format!( r#"{{"dt":"{}","t":"entry","s":{},"l":{:?},"entry":{}}}"#, @@ -92,13 +91,13 @@ where Ok(()) } - fn emit_block_event(&self, tick_height: u64, last_id: Hash) -> Result<()> { - let leader_scheduler = self.leader_scheduler.read().unwrap(); - let slot = leader_scheduler.tick_height_to_slot(tick_height); - let leader_id = leader_scheduler - .get_leader_for_slot(slot) - .map(|leader| leader.to_string()) - .unwrap_or_else(|| "None".to_string()); + fn emit_block_event( + &self, + slot: u64, + leader_id: &str, + tick_height: u64, + last_id: Hash, + ) -> Result<()> { let payload = format!( r#"{{"dt":"{}","t":"block","s":{},"h":{},"l":{:?},"id":"{:?}"}}"#, Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true), @@ -180,6 +179,13 @@ mod test { .read() .unwrap() .tick_height_to_slot(tick_height_initial); + let leader_id = bank + .leader_scheduler + .read() + .unwrap() + .get_leader_for_slot(previous_slot) + .map(|leader| leader.to_string()) + .unwrap_or_else(|| "None".to_string()); for tick_height in tick_height_initial..=tick_height_final { bank.leader_scheduler @@ -193,13 +199,15 @@ mod test { .tick_height_to_slot(tick_height); if curr_slot != previous_slot { entry_stream - .emit_block_event(tick_height - 1, last_id) + .emit_block_event(previous_slot, &leader_id, tick_height - 1, last_id) .unwrap(); } - let entry = Entry::new(&mut last_id, tick_height, 1, vec![]); //just ticks + let entry = Entry::new(&mut last_id, tick_height, 1, vec![]); // just ticks last_id = entry.id; previous_slot = curr_slot; - entry_stream.emit_entry_event(&entry).unwrap(); + entry_stream + .emit_entry_event(curr_slot, &leader_id, &entry) + .unwrap(); expected_entries.push(entry.clone()); entries.push(entry); } diff --git a/src/entry_stream_stage.rs b/src/entry_stream_stage.rs index 27e554a5aa..569052ecdd 100644 --- a/src/entry_stream_stage.rs +++ b/src/entry_stream_stage.rs @@ -59,28 +59,32 @@ impl EntryStreamStage { ) -> Result<()> { let timeout = Duration::new(1, 0); let entries = ledger_entry_receiver.recv_timeout(timeout)?; + let leader_scheduler = entry_stream.leader_scheduler.read().unwrap(); for entry in &entries { + let slot = leader_scheduler.tick_height_to_slot(entry.tick_height); + let leader_id = leader_scheduler + .get_leader_for_slot(slot) + .map(|leader| leader.to_string()) + .unwrap_or_else(|| "None".to_string()); + if entry.is_tick() && entry_stream.queued_block.is_some() { let queued_block = entry_stream.queued_block.as_ref(); let block_tick_height = queued_block.unwrap().tick_height; let block_id = queued_block.unwrap().id; entry_stream - .emit_block_event(block_tick_height, block_id) + .emit_block_event(slot, &leader_id, block_tick_height, block_id) .unwrap_or_else(|e| { error!("Entry Stream error: {:?}, {:?}", e, entry_stream.output); }); entry_stream.queued_block = None; } - entry_stream.emit_entry_event(&entry).unwrap_or_else(|e| { - error!("Entry Stream error: {:?}, {:?}", e, entry_stream.output); - }); - if 0 == entry_stream - .leader_scheduler - .read() - .unwrap() - .num_ticks_left_in_slot(entry.tick_height) - { + entry_stream + .emit_entry_event(slot, &leader_id, &entry) + .unwrap_or_else(|e| { + error!("Entry Stream error: {:?}, {:?}", e, entry_stream.output); + }); + if 0 == leader_scheduler.num_ticks_left_in_slot(entry.tick_height) { entry_stream.queued_block = Some(EntryStreamBlock { tick_height: entry.tick_height, id: entry.id,