From bc162637a638f80d89da7b46b05aad5f167d6a6a Mon Sep 17 00:00:00 2001 From: Carl Date: Mon, 11 Feb 2019 17:30:46 -0800 Subject: [PATCH] Add is_last_blob flag to blob to signal the end of a slot --- src/broadcast_service.rs | 8 ++++++-- src/packet.rs | 11 +++++++++++ tests/multinode.rs | 7 +++++-- 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs index 2c85c05638..ef7e8c13c1 100644 --- a/src/broadcast_service.rs +++ b/src/broadcast_service.rs @@ -60,7 +60,7 @@ impl Broadcast { num_entries += entries.len(); ventries.push(entries); } - let last_tick = { + let contains_last_tick = { if let Some(Some(last)) = ventries.last().map(|entries| entries.last()) { last.tick_height == self.max_tick_height } else { @@ -90,10 +90,14 @@ impl Broadcast { inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); + if contains_last_tick { + blobs.last().unwrap().write().unwrap().set_is_last_blob(); + } + blocktree.write_shared_blobs(&blobs)?; // Send out data - ClusterInfo::broadcast(&self.id, last_tick, &broadcast_table, sock, &blobs)?; + ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?; // Fill in the coding blob data from the window data blobs #[cfg(feature = "erasure")] diff --git a/src/packet.rs b/src/packet.rs index 10790dddf6..80196634b9 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -292,6 +292,8 @@ macro_rules! align { pub const BLOB_HEADER_SIZE: usize = align!(SIZE_RANGE.end, 8); +pub const BLOB_FLAG_IS_LAST: u32 = 0x2; + pub const BLOB_FLAG_IS_CODING: u32 = 0x1; impl Blob { @@ -350,6 +352,15 @@ impl Blob { self.set_flags(flags | BLOB_FLAG_IS_CODING); } + pub fn set_is_last_blob(&mut self) { + let flags = self.flags(); + self.set_flags(flags | BLOB_FLAG_IS_LAST); + } + + pub fn is_last_blob(&self) -> bool { + (self.flags() & BLOB_FLAG_IS_LAST) != 0 + } + pub fn data_size(&self) -> u64 { LittleEndian::read_u64(&self.data[SIZE_RANGE]) } diff --git a/tests/multinode.rs b/tests/multinode.rs index b27cea59dc..02528cc55a 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -1751,13 +1751,14 @@ fn test_fullnode_rotate(ticks_per_slot: u64, slots_per_epoch: u64) { let leader_pubkey = leader_keypair.pubkey().clone(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_info = leader.info.clone(); - let (_mint, leader_ledger_path, _last_entry_height, _last_id, last_entry_id) = + let (_mint, leader_ledger_path, _tick_height, _last_entry_height, _last_id, last_entry_id) = create_tmp_sample_ledger( "fullnode_transact_while_rotating_fast", 1_000_000_000_000_000_000, 0, leader_pubkey, 123, + ticks_per_slot, ); info!("ledger is {}", leader_ledger_path); @@ -1779,7 +1780,9 @@ fn test_fullnode_rotate(ticks_per_slot: u64, slots_per_epoch: u64) { fullnode_config.ledger_config(), ) .unwrap(); - blocktree.write_entries(1, 0, &entries).unwrap(); + blocktree + .write_entries(1, 0, ticks_per_slot, 0, &entries) + .unwrap(); tick_height_of_next_rotation += 1; }