* Stream additional block metadata through plugin
blockhash, block_height, block_time, rewards are streamed
(cherry picked from commit f14928a970
)
Co-authored-by: Lijun Wang <83639177+lijunwangs@users.noreply.github.com>
This commit is contained in:
@ -2,6 +2,8 @@ use {
|
||||
crate::{
|
||||
accounts_update_notifier::AccountsUpdateNotifierImpl,
|
||||
accountsdb_plugin_manager::AccountsDbPluginManager,
|
||||
block_metadata_notifier::BlockMetadataNotifierImpl,
|
||||
block_metadata_notifier_interface::BlockMetadataNotifierLock,
|
||||
slot_status_notifier::SlotStatusNotifierImpl, slot_status_observer::SlotStatusObserver,
|
||||
transaction_notifier::TransactionNotifierImpl,
|
||||
},
|
||||
@ -50,6 +52,7 @@ pub struct AccountsDbPluginService {
|
||||
plugin_manager: Arc<RwLock<AccountsDbPluginManager>>,
|
||||
accounts_update_notifier: Option<AccountsUpdateNotifier>,
|
||||
transaction_notifier: Option<TransactionNotifierLock>,
|
||||
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
|
||||
}
|
||||
|
||||
impl AccountsDbPluginService {
|
||||
@ -102,17 +105,24 @@ impl AccountsDbPluginService {
|
||||
None
|
||||
};
|
||||
|
||||
let slot_status_observer =
|
||||
if account_data_notifications_enabled || transaction_notifications_enabled {
|
||||
let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone());
|
||||
let slot_status_notifier = Arc::new(RwLock::new(slot_status_notifier));
|
||||
let (slot_status_observer, block_metadata_notifier): (
|
||||
Option<SlotStatusObserver>,
|
||||
Option<BlockMetadataNotifierLock>,
|
||||
) = if account_data_notifications_enabled || transaction_notifications_enabled {
|
||||
let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone());
|
||||
let slot_status_notifier = Arc::new(RwLock::new(slot_status_notifier));
|
||||
(
|
||||
Some(SlotStatusObserver::new(
|
||||
confirmed_bank_receiver,
|
||||
slot_status_notifier,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
)),
|
||||
Some(Arc::new(RwLock::new(BlockMetadataNotifierImpl::new(
|
||||
plugin_manager.clone(),
|
||||
)))),
|
||||
)
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
info!("Started AccountsDbPluginService");
|
||||
Ok(AccountsDbPluginService {
|
||||
@ -120,6 +130,7 @@ impl AccountsDbPluginService {
|
||||
plugin_manager,
|
||||
accounts_update_notifier,
|
||||
transaction_notifier,
|
||||
block_metadata_notifier,
|
||||
})
|
||||
}
|
||||
|
||||
@ -186,6 +197,10 @@ impl AccountsDbPluginService {
|
||||
self.transaction_notifier.clone()
|
||||
}
|
||||
|
||||
pub fn get_block_metadata_notifier(&self) -> Option<BlockMetadataNotifierLock> {
|
||||
self.block_metadata_notifier.clone()
|
||||
}
|
||||
|
||||
pub fn join(self) -> thread::Result<()> {
|
||||
if let Some(mut slot_status_observer) = self.slot_status_observer {
|
||||
slot_status_observer.join()?;
|
||||
|
105
accountsdb-plugin-manager/src/block_metadata_notifier.rs
Normal file
105
accountsdb-plugin-manager/src/block_metadata_notifier.rs
Normal file
@ -0,0 +1,105 @@
|
||||
use {
|
||||
crate::{
|
||||
accountsdb_plugin_manager::AccountsDbPluginManager,
|
||||
block_metadata_notifier_interface::BlockMetadataNotifier,
|
||||
},
|
||||
log::*,
|
||||
solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
|
||||
ReplicaBlockInfo, ReplicaBlockInfoVersions,
|
||||
},
|
||||
solana_measure::measure::Measure,
|
||||
solana_metrics::*,
|
||||
solana_runtime::bank::RewardInfo,
|
||||
solana_sdk::{clock::UnixTimestamp, pubkey::Pubkey},
|
||||
solana_transaction_status::{Reward, Rewards},
|
||||
std::sync::{Arc, RwLock},
|
||||
};
|
||||
|
||||
pub(crate) struct BlockMetadataNotifierImpl {
|
||||
plugin_manager: Arc<RwLock<AccountsDbPluginManager>>,
|
||||
}
|
||||
|
||||
impl BlockMetadataNotifier for BlockMetadataNotifierImpl {
|
||||
/// Notify the block metadata
|
||||
fn notify_block_metadata(
|
||||
&self,
|
||||
slot: u64,
|
||||
blockhash: &str,
|
||||
rewards: &RwLock<Vec<(Pubkey, RewardInfo)>>,
|
||||
block_time: Option<UnixTimestamp>,
|
||||
block_height: Option<u64>,
|
||||
) {
|
||||
let mut plugin_manager = self.plugin_manager.write().unwrap();
|
||||
if plugin_manager.plugins.is_empty() {
|
||||
return;
|
||||
}
|
||||
let rewards = Self::build_rewards(rewards);
|
||||
|
||||
for plugin in plugin_manager.plugins.iter_mut() {
|
||||
let mut measure = Measure::start("accountsdb-plugin-update-slot");
|
||||
let block_info =
|
||||
Self::build_replica_block_info(slot, blockhash, &rewards, block_time, block_height);
|
||||
let block_info = ReplicaBlockInfoVersions::V0_0_1(&block_info);
|
||||
match plugin.notify_block_metadata(block_info) {
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Failed to update block metadata at slot {}, error: {} to plugin {}",
|
||||
slot,
|
||||
err,
|
||||
plugin.name()
|
||||
)
|
||||
}
|
||||
Ok(_) => {
|
||||
trace!(
|
||||
"Successfully updated block metadata at slot {} to plugin {}",
|
||||
slot,
|
||||
plugin.name()
|
||||
);
|
||||
}
|
||||
}
|
||||
measure.stop();
|
||||
inc_new_counter_debug!(
|
||||
"accountsdb-plugin-update-block-metadata-us",
|
||||
measure.as_us() as usize,
|
||||
1000,
|
||||
1000
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl BlockMetadataNotifierImpl {
|
||||
fn build_rewards(rewards: &RwLock<Vec<(Pubkey, RewardInfo)>>) -> Rewards {
|
||||
let rewards = rewards.read().unwrap();
|
||||
rewards
|
||||
.iter()
|
||||
.map(|(pubkey, reward)| Reward {
|
||||
pubkey: pubkey.to_string(),
|
||||
lamports: reward.lamports,
|
||||
post_balance: reward.post_balance,
|
||||
reward_type: Some(reward.reward_type),
|
||||
commission: reward.commission,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn build_replica_block_info<'a>(
|
||||
slot: u64,
|
||||
blockhash: &'a str,
|
||||
rewards: &'a [Reward],
|
||||
block_time: Option<UnixTimestamp>,
|
||||
block_height: Option<u64>,
|
||||
) -> ReplicaBlockInfo<'a> {
|
||||
ReplicaBlockInfo {
|
||||
slot,
|
||||
blockhash,
|
||||
rewards,
|
||||
block_time,
|
||||
block_height,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(plugin_manager: Arc<RwLock<AccountsDbPluginManager>>) -> Self {
|
||||
Self { plugin_manager }
|
||||
}
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
use {
|
||||
solana_runtime::bank::RewardInfo,
|
||||
solana_sdk::{clock::UnixTimestamp, pubkey::Pubkey},
|
||||
std::sync::{Arc, RwLock},
|
||||
};
|
||||
|
||||
/// Interface for notifying block metadata changes
|
||||
pub trait BlockMetadataNotifier {
|
||||
/// Notify the block metadata
|
||||
fn notify_block_metadata(
|
||||
&self,
|
||||
slot: u64,
|
||||
blockhash: &str,
|
||||
rewards: &RwLock<Vec<(Pubkey, RewardInfo)>>,
|
||||
block_time: Option<UnixTimestamp>,
|
||||
block_height: Option<u64>,
|
||||
);
|
||||
}
|
||||
|
||||
pub type BlockMetadataNotifierLock = Arc<RwLock<dyn BlockMetadataNotifier + Sync + Send>>;
|
@ -1,6 +1,8 @@
|
||||
pub mod accounts_update_notifier;
|
||||
pub mod accountsdb_plugin_manager;
|
||||
pub mod accountsdb_plugin_service;
|
||||
pub mod block_metadata_notifier;
|
||||
pub mod block_metadata_notifier_interface;
|
||||
pub mod slot_status_notifier;
|
||||
pub mod slot_status_observer;
|
||||
pub mod transaction_notifier;
|
||||
|
Reference in New Issue
Block a user