Switch to await-aware tokio::sync::Mutex

This commit is contained in:
Michael Vines
2022-04-11 08:21:21 -07:00
parent a2be810dbc
commit c1687b0604

View File

@ -11,7 +11,6 @@ use {
fs::{self, File}, fs::{self, File},
io::{self, BufReader}, io::{self, BufReader},
path::PathBuf, path::PathBuf,
sync::RwLock,
}, },
}; };
@ -214,7 +213,7 @@ impl TowerStorage for FileTowerStorage {
} }
pub struct EtcdTowerStorage { pub struct EtcdTowerStorage {
client: RwLock<etcd_client::Client>, client: tokio::sync::Mutex<etcd_client::Client>,
instance_id: [u8; 8], instance_id: [u8; 8],
runtime: tokio::runtime::Runtime, runtime: tokio::runtime::Runtime,
} }
@ -260,7 +259,7 @@ impl EtcdTowerStorage {
.map_err(Self::etdc_to_tower_error)?; .map_err(Self::etdc_to_tower_error)?;
Ok(Self { Ok(Self {
client: RwLock::new(client), client: tokio::sync::Mutex::new(client),
instance_id: solana_sdk::timing::timestamp().to_le_bytes(), instance_id: solana_sdk::timing::timestamp().to_le_bytes(),
runtime, runtime,
}) })
@ -280,7 +279,6 @@ impl EtcdTowerStorage {
impl TowerStorage for EtcdTowerStorage { impl TowerStorage for EtcdTowerStorage {
fn load(&self, node_pubkey: &Pubkey) -> Result<Tower> { fn load(&self, node_pubkey: &Pubkey) -> Result<Tower> {
let (instance_key, tower_key) = Self::get_keys(node_pubkey); let (instance_key, tower_key) = Self::get_keys(node_pubkey);
let mut client = self.client.write().unwrap();
let txn = etcd_client::Txn::new().and_then(vec![etcd_client::TxnOp::put( let txn = etcd_client::Txn::new().and_then(vec![etcd_client::TxnOp::put(
instance_key.clone(), instance_key.clone(),
@ -288,7 +286,7 @@ impl TowerStorage for EtcdTowerStorage {
None, None,
)]); )]);
self.runtime self.runtime
.block_on(async { client.txn(txn).await }) .block_on(async { self.client.lock().await.txn(txn).await })
.map_err(|err| { .map_err(|err| {
error!("Failed to acquire etcd instance lock: {}", err); error!("Failed to acquire etcd instance lock: {}", err);
Self::etdc_to_tower_error(err) Self::etdc_to_tower_error(err)
@ -304,7 +302,7 @@ impl TowerStorage for EtcdTowerStorage {
let response = self let response = self
.runtime .runtime
.block_on(async { client.txn(txn).await }) .block_on(async { self.client.lock().await.txn(txn).await })
.map_err(|err| { .map_err(|err| {
error!("Failed to read etcd saved tower: {}", err); error!("Failed to read etcd saved tower: {}", err);
Self::etdc_to_tower_error(err) Self::etdc_to_tower_error(err)
@ -336,7 +334,6 @@ impl TowerStorage for EtcdTowerStorage {
fn store(&self, saved_tower: &SavedTowerVersions) -> Result<()> { fn store(&self, saved_tower: &SavedTowerVersions) -> Result<()> {
let (instance_key, tower_key) = Self::get_keys(&saved_tower.pubkey()); let (instance_key, tower_key) = Self::get_keys(&saved_tower.pubkey());
let mut client = self.client.write().unwrap();
let txn = etcd_client::Txn::new() let txn = etcd_client::Txn::new()
.when(vec![etcd_client::Compare::value( .when(vec![etcd_client::Compare::value(
@ -352,7 +349,7 @@ impl TowerStorage for EtcdTowerStorage {
let response = self let response = self
.runtime .runtime
.block_on(async { client.txn(txn).await }) .block_on(async { self.client.lock().await.txn(txn).await })
.map_err(|err| { .map_err(|err| {
error!("Failed to write etcd saved tower: {}", err); error!("Failed to write etcd saved tower: {}", err);
err err