TransactionRecorder uses unique channel so we can use Recv instead of RecvTimeout (#16195) (#16485)

* time

* new channel each call

* new channel every time

(cherry picked from commit 5eff23db0c)

Co-authored-by: Jeff Washington (jwash) <75863576+jeffwashington@users.noreply.github.com>
This commit is contained in:
mergify[bot]
2021-04-12 18:56:22 +00:00
committed by GitHub
parent 6c8bbdca0a
commit d9944c8ae3

View File

@ -77,9 +77,6 @@ impl Record {
pub struct TransactionRecorder { pub struct TransactionRecorder {
// shared by all users of PohRecorder // shared by all users of PohRecorder
pub record_sender: Sender<Record>, pub record_sender: Sender<Record>,
// unique to this caller
pub result_sender: Sender<Result<()>>,
pub result_receiver: Receiver<Result<()>>,
} }
impl Clone for TransactionRecorder { impl Clone for TransactionRecorder {
@ -90,13 +87,9 @@ impl Clone for TransactionRecorder {
impl TransactionRecorder { impl TransactionRecorder {
pub fn new(record_sender: Sender<Record>) -> Self { pub fn new(record_sender: Sender<Record>) -> Self {
let (result_sender, result_receiver) = channel();
Self { Self {
// shared // shared
record_sender, record_sender,
// unique to this caller
result_sender,
result_receiver,
} }
} }
pub fn record( pub fn record(
@ -105,21 +98,18 @@ impl TransactionRecorder {
mixin: Hash, mixin: Hash,
transactions: Vec<Transaction>, transactions: Vec<Transaction>,
) -> Result<()> { ) -> Result<()> {
let res = self.record_sender.send(Record::new( // create a new channel so that there is only 1 sender and when it goes out of scope, the receiver fails
mixin, let (result_sender, result_receiver) = channel();
transactions, let res =
bank_slot, self.record_sender
self.result_sender.clone(), .send(Record::new(mixin, transactions, bank_slot, result_sender));
));
if res.is_err() { if res.is_err() {
// If the channel is dropped, then the validator is shutting down so return that we are hitting // If the channel is dropped, then the validator is shutting down so return that we are hitting
// the max tick height to stop transaction processing and flush any transactions in the pipeline. // the max tick height to stop transaction processing and flush any transactions in the pipeline.
return Err(PohRecorderError::MaxHeightReached); return Err(PohRecorderError::MaxHeightReached);
} }
// Besides validator exit, this timeout should primarily be seen to affect test execution environments where the various pieces can be shutdown abruptly // Besides validator exit, this timeout should primarily be seen to affect test execution environments where the various pieces can be shutdown abruptly
let res = self let res = result_receiver.recv();
.result_receiver
.recv_timeout(std::time::Duration::from_millis(5000));
match res { match res {
Err(_err) => Err(PohRecorderError::MaxHeightReached), Err(_err) => Err(PohRecorderError::MaxHeightReached),
Ok(result) => result, Ok(result) => result,