Stream RPC snapshot downloads
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -3991,6 +3991,7 @@ dependencies = [
|
|||||||
"bs58",
|
"bs58",
|
||||||
"bv",
|
"bv",
|
||||||
"byteorder",
|
"byteorder",
|
||||||
|
"bytes 0.4.12",
|
||||||
"chrono",
|
"chrono",
|
||||||
"core_affinity",
|
"core_affinity",
|
||||||
"crossbeam-channel",
|
"crossbeam-channel",
|
||||||
@ -4056,6 +4057,7 @@ dependencies = [
|
|||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio 0.1.22",
|
"tokio 0.1.22",
|
||||||
"tokio 0.2.22",
|
"tokio 0.2.22",
|
||||||
|
"tokio-codec",
|
||||||
"tokio-fs",
|
"tokio-fs",
|
||||||
"tokio-io",
|
"tokio-io",
|
||||||
"trees",
|
"trees",
|
||||||
|
@ -78,8 +78,10 @@ tempfile = "3.1.0"
|
|||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
tokio = { version = "0.2", features = ["full"] }
|
tokio = { version = "0.2", features = ["full"] }
|
||||||
tokio_01 = { version = "0.1", package = "tokio" }
|
tokio_01 = { version = "0.1", package = "tokio" }
|
||||||
|
tokio_01_bytes = { version = "0.4.7", package = "bytes" }
|
||||||
tokio_fs_01 = { version = "0.1", package = "tokio-fs" }
|
tokio_fs_01 = { version = "0.1", package = "tokio-fs" }
|
||||||
tokio_io_01 = { version = "0.1", package = "tokio-io" }
|
tokio_io_01 = { version = "0.1", package = "tokio-io" }
|
||||||
|
tokio_codec_01 = { version = "0.1", package = "tokio-codec" }
|
||||||
solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "1.6.0" }
|
solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "1.6.0" }
|
||||||
trees = "0.2.1"
|
trees = "0.2.1"
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@ use jsonrpc_http_server::{
|
|||||||
};
|
};
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use solana_ledger::blockstore::Blockstore;
|
use solana_ledger::blockstore::Blockstore;
|
||||||
|
use solana_metrics::inc_new_counter_info;
|
||||||
use solana_runtime::{
|
use solana_runtime::{
|
||||||
bank_forks::{BankForks, SnapshotConfig},
|
bank_forks::{BankForks, SnapshotConfig},
|
||||||
commitment::BlockCommitmentCache,
|
commitment::BlockCommitmentCache,
|
||||||
@ -61,7 +62,7 @@ impl RpcRequestMiddleware {
|
|||||||
Self {
|
Self {
|
||||||
ledger_path,
|
ledger_path,
|
||||||
snapshot_archive_path_regex: Regex::new(
|
snapshot_archive_path_regex: Regex::new(
|
||||||
r"/snapshot-\d+-[[:alnum:]]+\.tar\.(bz2|zst|gz)$",
|
r"/snapshot-\d+-[[:alnum:]]+\.(tar|tar\.bz2|tar\.zst|tar\.gz)$",
|
||||||
)
|
)
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
snapshot_config,
|
snapshot_config,
|
||||||
@ -85,6 +86,7 @@ impl RpcRequestMiddleware {
|
|||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
fn internal_server_error() -> hyper::Response<hyper::Body> {
|
fn internal_server_error() -> hyper::Response<hyper::Body> {
|
||||||
hyper::Response::builder()
|
hyper::Response::builder()
|
||||||
.status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
|
.status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
|
||||||
@ -112,13 +114,18 @@ impl RpcRequestMiddleware {
|
|||||||
let stem = path.split_at(1).1; // Drop leading '/' from path
|
let stem = path.split_at(1).1; // Drop leading '/' from path
|
||||||
let filename = {
|
let filename = {
|
||||||
match path {
|
match path {
|
||||||
"/genesis.tar.bz2" => self.ledger_path.join(stem),
|
"/genesis.tar.bz2" => {
|
||||||
_ => self
|
inc_new_counter_info!("rpc-get_genesis", 1);
|
||||||
.snapshot_config
|
self.ledger_path.join(stem)
|
||||||
.as_ref()
|
}
|
||||||
.unwrap()
|
_ => {
|
||||||
.snapshot_package_output_path
|
inc_new_counter_info!("rpc-get_snapshot", 1);
|
||||||
.join(stem),
|
self.snapshot_config
|
||||||
|
.as_ref()
|
||||||
|
.unwrap()
|
||||||
|
.snapshot_package_output_path
|
||||||
|
.join(stem)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -129,10 +136,13 @@ impl RpcRequestMiddleware {
|
|||||||
response: Box::new(
|
response: Box::new(
|
||||||
tokio_fs_01::file::File::open(filename)
|
tokio_fs_01::file::File::open(filename)
|
||||||
.and_then(|file| {
|
.and_then(|file| {
|
||||||
let buf: Vec<u8> = Vec::new();
|
use tokio_codec_01::{BytesCodec, FramedRead};
|
||||||
tokio_io_01::io::read_to_end(file, buf)
|
|
||||||
.and_then(|item| Ok(hyper::Response::new(item.1.into())))
|
let stream = FramedRead::new(file, BytesCodec::new())
|
||||||
.or_else(|_| Ok(RpcRequestMiddleware::internal_server_error()))
|
.map(tokio_01_bytes::BytesMut::freeze);
|
||||||
|
let body = hyper::Body::wrap_stream(stream);
|
||||||
|
|
||||||
|
Ok(hyper::Response::new(body))
|
||||||
})
|
})
|
||||||
.or_else(|_| Ok(RpcRequestMiddleware::not_found())),
|
.or_else(|_| Ok(RpcRequestMiddleware::not_found())),
|
||||||
),
|
),
|
||||||
@ -530,6 +540,13 @@ mod tests {
|
|||||||
assert!(rrm_with_snapshot_config.is_file_get_path(
|
assert!(rrm_with_snapshot_config.is_file_get_path(
|
||||||
"/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.bz2"
|
"/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.bz2"
|
||||||
));
|
));
|
||||||
|
assert!(rrm_with_snapshot_config.is_file_get_path(
|
||||||
|
"/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
|
||||||
|
));
|
||||||
|
assert!(rrm_with_snapshot_config
|
||||||
|
.is_file_get_path("/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.gz"));
|
||||||
|
assert!(rrm_with_snapshot_config
|
||||||
|
.is_file_get_path("/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar"));
|
||||||
|
|
||||||
assert!(!rrm.is_file_get_path(
|
assert!(!rrm.is_file_get_path(
|
||||||
"/snapshot-notaslotnumber-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.bz2"
|
"/snapshot-notaslotnumber-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.bz2"
|
||||||
|
Reference in New Issue
Block a user