From 03e6a56b3c2f3acf05afef3790b7497394b2a370 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Wed, 30 Jan 2019 11:12:31 -0700 Subject: [PATCH] Add datetime to EntryStream message --- src/entry_stream.rs | 11 +++++++---- src/replay_stage.rs | 17 +++++++++++++++-- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/src/entry_stream.rs b/src/entry_stream.rs index 73150f66c1..57500dcccc 100644 --- a/src/entry_stream.rs +++ b/src/entry_stream.rs @@ -4,6 +4,7 @@ use crate::entry::Entry; use crate::result::Result; +use chrono::Utc; use std::io::prelude::*; use std::net::Shutdown; use std::os::unix::net::UnixStream; @@ -27,8 +28,9 @@ impl EntryStreamHandler for EntryStream { fn stream_entries(&mut self, entries: &[Entry]) -> Result<()> { let mut socket = UnixStream::connect(Path::new(&self.socket))?; for entry in entries { - let result = serde_json::to_string(&entry)?; - socket.write_all(result.as_bytes())?; + let json = serde_json::to_string(&entry)?; + let payload = format!(r#"{{"dt":"{}","entry":{}}}"#, Utc::now().to_rfc3339(), json); + socket.write_all(payload.as_bytes())?; } socket.shutdown(Shutdown::Write)?; Ok(()) @@ -49,8 +51,9 @@ impl MockEntryStream { impl EntryStreamHandler for MockEntryStream { fn stream_entries(&mut self, entries: &[Entry]) -> Result<()> { for entry in entries { - let result = serde_json::to_string(&entry)?; - self.socket.push(result); + let json = serde_json::to_string(&entry)?; + let payload = format!(r#"{{"dt":"{}","entry":{}}}"#, Utc::now().to_rfc3339(), json); + self.socket.push(payload); } Ok(()) } diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 0beb90af27..51532c95ea 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -294,6 +294,8 @@ mod test { use crate::service::Service; use crate::tvu::TvuReturnType; use crate::voting_keypair::VotingKeypair; + use chrono::{DateTime, FixedOffset}; + use serde_json::Value; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::fs::remove_dir_all; @@ -740,7 +742,7 @@ mod test { for _ in 0..5 { let entry = Entry::new(&mut last_id, 0, 1, vec![]); //just ticks last_id = entry.id; - expected_entries.push(serde_json::to_string(&entry).unwrap()); + expected_entries.push(entry.clone()); entries.push(entry); } entry_sender @@ -763,6 +765,17 @@ mod test { .unwrap(); assert_eq!(entry_stream.socket.len(), 5); - assert_eq!(entry_stream.socket, expected_entries); + + for (i, item) in entry_stream.socket.iter().enumerate() { + let json: Value = serde_json::from_str(&item).unwrap(); + let dt_str = json["dt"].as_str().unwrap(); + + // Ensure `ts` field parses as valid DateTime + let _dt: DateTime = DateTime::parse_from_rfc3339(dt_str).unwrap(); + + let entry_obj = json["entry"].clone(); + let entry: Entry = serde_json::from_value(entry_obj).unwrap(); + assert_eq!(entry, expected_entries[i]); + } } }