Add quic port for accepting transactions (#22753) (#22937)

using quinn library

streamer: Sign TLS cert with validator identity key

Handle multiple incoming chunks

(cherry picked from commit 5a230f418d)

Co-authored-by: sakridge <sakridge@gmail.com>
This commit is contained in:
mergify[bot]
2022-02-04 20:53:27 +00:00
committed by GitHub
parent 2605724aa3
commit c43cef79b5
9 changed files with 680 additions and 45 deletions

240
Cargo.lock generated
View File

@ -251,6 +251,12 @@ version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "base64ct"
version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "874f8444adcb4952a8bc51305c8be95c8ec8237bb0d2e78d2e039f771f8828a0"
[[package]] [[package]]
name = "bincode" name = "bincode"
version = "1.3.3" version = "1.3.3"
@ -772,6 +778,12 @@ dependencies = [
"web-sys", "web-sys",
] ]
[[package]]
name = "const-oid"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3"
[[package]] [[package]]
name = "const_fn" name = "const_fn"
version = "0.4.8" version = "0.4.8"
@ -1029,6 +1041,15 @@ dependencies = [
"rayon", "rayon",
] ]
[[package]]
name = "der"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6919815d73839e7ad218de758883aae3a257ba6759ce7a9992501efbb53d705c"
dependencies = [
"const-oid",
]
[[package]] [[package]]
name = "derivation-path" name = "derivation-path"
version = "0.1.3" version = "0.1.3"
@ -1522,9 +1543,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.18" version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fc8cd39e3dbf865f7340dce6a2d401d24fd37c6fe6c4f0ee0de8bfca2252d27" checksum = "ba3dda0b6588335f360afc675d0564c17a77a2bda81ca178a4b6081bd86c7f0b"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink", "futures-sink",
@ -1532,9 +1553,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-core" name = "futures-core"
version = "0.3.18" version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "629316e42fe7c2a0b9a65b47d159ceaa5453ab14e8f0a3c5eedbb8cd55b4a445" checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7"
[[package]] [[package]]
name = "futures-executor" name = "futures-executor"
@ -1550,15 +1571,15 @@ dependencies = [
[[package]] [[package]]
name = "futures-io" name = "futures-io"
version = "0.3.18" version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e481354db6b5c353246ccf6a728b0c5511d752c08da7260546fc0933869daa11" checksum = "b1f9d34af5a1aac6fb380f735fe510746c38067c5bf16c7fd250280503c971b2"
[[package]] [[package]]
name = "futures-macro" name = "futures-macro"
version = "0.3.18" version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a89f17b21645bc4ed773c69af9c9a0effd4a3f1a3876eadd453469f8854e7fdd" checksum = "6dbd947adfffb0efc70599b3ddcf7b5597bb5fa9e245eb99f62b3a5f7bb8bd3c"
dependencies = [ dependencies = [
"proc-macro2 1.0.32", "proc-macro2 1.0.32",
"quote 1.0.10", "quote 1.0.10",
@ -1567,21 +1588,21 @@ dependencies = [
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.18" version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "996c6442437b62d21a32cd9906f9c41e7dc1e19a9579843fad948696769305af" checksum = "e3055baccb68d74ff6480350f8d6eb8fcfa3aa11bdc1a1ae3afdd0514617d508"
[[package]] [[package]]
name = "futures-task" name = "futures-task"
version = "0.3.18" version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dabf1872aaab32c886832f2276d2f5399887e2bd613698a02359e4ea83f8de12" checksum = "6ee7c6485c30167ce4dfb83ac568a849fe53274c831081476ee13e0dce1aad72"
[[package]] [[package]]
name = "futures-util" name = "futures-util"
version = "0.3.18" version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d22213122356472061ac0f1ab2cee28d2bac8491410fd68c2af53d1cedb83e" checksum = "d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164"
dependencies = [ dependencies = [
"futures 0.1.31", "futures 0.1.31",
"futures-channel", "futures-channel",
@ -1596,6 +1617,15 @@ dependencies = [
"slab", "slab",
] ]
[[package]]
name = "fxhash"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
dependencies = [
"byteorder",
]
[[package]] [[package]]
name = "gag" name = "gag"
version = "1.0.0" version = "1.0.0"
@ -2071,9 +2101,9 @@ dependencies = [
[[package]] [[package]]
name = "itertools" name = "itertools"
version = "0.10.1" version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69ddb889f9d0d08a67338271fa9b62996bc788c7796a5c18cf057420aaed5eaf" checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3"
dependencies = [ dependencies = [
"either", "either",
] ]
@ -2274,9 +2304,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.108" version = "0.2.117"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8521a1b57e76b1ec69af7599e75e38e7b7fad6610f037db8c79b127201b5d119" checksum = "e74d72e0f9b65b5b4ca49a346af3976df0f9c61d550727f349ecd559f251a26c"
[[package]] [[package]]
name = "libloading" name = "libloading"
@ -2613,9 +2643,9 @@ dependencies = [
[[package]] [[package]]
name = "nix" name = "nix"
version = "0.23.0" version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f305c2c2e4c39a82f7bf0bf65fb557f9070ce06781d4f2454295cc34b1c43188" checksum = "9f866317acbd3a240710c63f065ffb1e4fd466259045ccb504130b7f668f35c6"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"cc", "cc",
@ -2962,6 +2992,15 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
[[package]]
name = "pem"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9a3b09a20e374558580a4914d3b7d89bd61b954a5a5e1dcbea98753addb1947"
dependencies = [
"base64 0.13.0",
]
[[package]] [[package]]
name = "percent-encoding" name = "percent-encoding"
version = "1.0.1" version = "1.0.1"
@ -3056,6 +3095,17 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkcs8"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cabda3fb821068a9a4fab19a683eac3af12edf0f34b94a8be53c4972b8149d0"
dependencies = [
"der",
"spki",
"zeroize",
]
[[package]] [[package]]
name = "pkg-config" name = "pkg-config"
version = "0.3.22" version = "0.3.22"
@ -3137,7 +3187,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c6ce811d0b2e103743eec01db1c50612221f173084ce2f7941053e94b6bb474" checksum = "5c6ce811d0b2e103743eec01db1c50612221f173084ce2f7941053e94b6bb474"
dependencies = [ dependencies = [
"difflib", "difflib",
"itertools 0.10.1", "itertools 0.10.3",
"predicates-core", "predicates-core",
] ]
@ -3268,7 +3318,7 @@ checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.1.0",
"heck", "heck",
"itertools 0.10.1", "itertools 0.10.3",
"lazy_static", "lazy_static",
"log 0.4.14", "log 0.4.14",
"multimap", "multimap",
@ -3287,7 +3337,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9cc1a3263e07e0bf68e96268f37665207b49560d98739662cdfaae215c720fe" checksum = "f9cc1a3263e07e0bf68e96268f37665207b49560d98739662cdfaae215c720fe"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"itertools 0.10.1", "itertools 0.10.3",
"proc-macro2 1.0.32", "proc-macro2 1.0.32",
"quote 1.0.10", "quote 1.0.10",
"syn 1.0.81", "syn 1.0.81",
@ -3324,6 +3374,60 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3"
[[package]]
name = "quinn"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61a84d97630b137463c8e6802adc1dfe9de81457b41bb1ac59189e6761ab9255"
dependencies = [
"bytes 1.1.0",
"futures-channel",
"futures-util",
"fxhash",
"quinn-proto",
"quinn-udp",
"rustls 0.20.2",
"thiserror",
"tokio",
"tracing",
"webpki 0.22.0",
]
[[package]]
name = "quinn-proto"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "063dedf7983c8d57db474218f258daa85b627de6f2dbc458b690a93b1de790e8"
dependencies = [
"bytes 1.1.0",
"fxhash",
"rand 0.8.4",
"ring",
"rustls 0.20.2",
"rustls-native-certs",
"rustls-pemfile",
"slab",
"thiserror",
"tinyvec",
"tracing",
"webpki 0.22.0",
]
[[package]]
name = "quinn-udp"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f7996776e9ee3fc0e5c14476c1a640a17e993c847ae9c81191c2c102fbef903"
dependencies = [
"futures-util",
"libc",
"mio 0.7.14",
"quinn-proto",
"socket2",
"tokio",
"tracing",
]
[[package]] [[package]]
name = "quote" name = "quote"
version = "0.6.13" version = "0.6.13"
@ -3613,6 +3717,18 @@ dependencies = [
"time 0.3.5", "time 0.3.5",
] ]
[[package]]
name = "rcgen"
version = "0.8.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5911d1403f4143c9d56a702069d593e8d0f3fab880a85e103604d0893ea31ba7"
dependencies = [
"chrono",
"pem",
"ring",
"yasna",
]
[[package]] [[package]]
name = "rdrand" name = "rdrand"
version = "0.4.0" version = "0.4.0"
@ -3827,9 +3943,9 @@ dependencies = [
[[package]] [[package]]
name = "rustls" name = "rustls"
version = "0.20.0" version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b5ac6078ca424dc1d3ae2328526a76787fecc7f8011f520e3276730e711fc95" checksum = "d37e5e2290f3e040b594b1a9e04377c2c671f1a1cfd9bfdef82106ac1c113f84"
dependencies = [ dependencies = [
"log 0.4.14", "log 0.4.14",
"ring", "ring",
@ -3837,6 +3953,27 @@ dependencies = [
"webpki 0.22.0", "webpki 0.22.0",
] ]
[[package]]
name = "rustls-native-certs"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ca9ebdfa27d3fc180e42879037b5338ab1c040c06affd00d8338598e7800943"
dependencies = [
"openssl-probe",
"rustls-pemfile",
"schannel",
"security-framework",
]
[[package]]
name = "rustls-pemfile"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9"
dependencies = [
"base64 0.13.0",
]
[[package]] [[package]]
name = "rustversion" name = "rustversion"
version = "1.0.5" version = "1.0.5"
@ -4775,7 +4912,7 @@ dependencies = [
"etcd-client", "etcd-client",
"fs_extra", "fs_extra",
"histogram", "histogram",
"itertools 0.10.1", "itertools 0.10.3",
"jsonrpc-core", "jsonrpc-core",
"jsonrpc-core-client", "jsonrpc-core-client",
"jsonrpc-derive", "jsonrpc-derive",
@ -5002,7 +5139,7 @@ dependencies = [
"clap 2.33.3", "clap 2.33.3",
"flate2", "flate2",
"indexmap", "indexmap",
"itertools 0.10.1", "itertools 0.10.3",
"log 0.4.14", "log 0.4.14",
"lru", "lru",
"matches", "matches",
@ -5097,7 +5234,7 @@ dependencies = [
"crossbeam-channel", "crossbeam-channel",
"fs_extra", "fs_extra",
"futures 0.3.18", "futures 0.3.18",
"itertools 0.10.1", "itertools 0.10.3",
"lazy_static", "lazy_static",
"libc", "libc",
"log 0.4.14", "log 0.4.14",
@ -5150,7 +5287,7 @@ dependencies = [
"csv", "csv",
"dashmap", "dashmap",
"histogram", "histogram",
"itertools 0.10.1", "itertools 0.10.3",
"log 0.4.14", "log 0.4.14",
"regex", "regex",
"serde", "serde",
@ -5183,7 +5320,7 @@ dependencies = [
"crossbeam-channel", "crossbeam-channel",
"fs_extra", "fs_extra",
"gag", "gag",
"itertools 0.10.1", "itertools 0.10.3",
"log 0.4.14", "log 0.4.14",
"rand 0.7.3", "rand 0.7.3",
"rayon", "rayon",
@ -5443,7 +5580,7 @@ dependencies = [
"console_log", "console_log",
"curve25519-dalek 3.2.0", "curve25519-dalek 3.2.0",
"getrandom 0.1.16", "getrandom 0.1.16",
"itertools 0.10.1", "itertools 0.10.3",
"js-sys", "js-sys",
"lazy_static", "lazy_static",
"libsecp256k1 0.6.0", "libsecp256k1 0.6.0",
@ -5475,7 +5612,7 @@ version = "1.9.6"
dependencies = [ dependencies = [
"base64 0.13.0", "base64 0.13.0",
"bincode", "bincode",
"itertools 0.10.1", "itertools 0.10.3",
"libc", "libc",
"libloading", "libloading",
"log 0.4.14", "log 0.4.14",
@ -5596,7 +5733,7 @@ dependencies = [
"bs58 0.4.0", "bs58 0.4.0",
"crossbeam-channel", "crossbeam-channel",
"dashmap", "dashmap",
"itertools 0.10.1", "itertools 0.10.3",
"jsonrpc-core", "jsonrpc-core",
"jsonrpc-core-client", "jsonrpc-core-client",
"jsonrpc-derive", "jsonrpc-derive",
@ -5683,7 +5820,7 @@ dependencies = [
"flate2", "flate2",
"fnv", "fnv",
"index_list", "index_list",
"itertools 0.10.1", "itertools 0.10.3",
"lazy_static", "lazy_static",
"libsecp256k1 0.6.0", "libsecp256k1 0.6.0",
"log 0.4.14", "log 0.4.14",
@ -5742,7 +5879,7 @@ dependencies = [
"ed25519-dalek-bip32", "ed25519-dalek-bip32",
"generic-array 0.14.4", "generic-array 0.14.4",
"hmac 0.11.0", "hmac 0.11.0",
"itertools 0.10.1", "itertools 0.10.3",
"js-sys", "js-sys",
"lazy_static", "lazy_static",
"libsecp256k1 0.6.0", "libsecp256k1 0.6.0",
@ -5901,15 +6038,25 @@ dependencies = [
name = "solana-streamer" name = "solana-streamer"
version = "1.9.6" version = "1.9.6"
dependencies = [ dependencies = [
"itertools 0.10.1", "crossbeam-channel",
"futures-util",
"histogram",
"itertools 0.10.3",
"libc", "libc",
"log 0.4.14", "log 0.4.14",
"nix", "nix",
"pem",
"pkcs8",
"quinn",
"rand 0.7.3",
"rcgen",
"rustls 0.20.2",
"solana-logger 1.9.6", "solana-logger 1.9.6",
"solana-metrics", "solana-metrics",
"solana-perf", "solana-perf",
"solana-sdk", "solana-sdk",
"thiserror", "thiserror",
"tokio",
] ]
[[package]] [[package]]
@ -6170,6 +6317,16 @@ version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5" checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5"
[[package]]
name = "spki"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d01ac02a6ccf3e07db148d2be087da624fea0221a16152ed01f0496a6b0a27"
dependencies = [
"base64ct",
"der",
]
[[package]] [[package]]
name = "spl-associated-token-account" name = "spl-associated-token-account"
version = "1.0.3" version = "1.0.3"
@ -7042,7 +7199,7 @@ dependencies = [
"httparse", "httparse",
"log 0.4.14", "log 0.4.14",
"rand 0.8.4", "rand 0.8.4",
"rustls 0.20.0", "rustls 0.20.2",
"sha-1 0.9.8", "sha-1 0.9.8",
"thiserror", "thiserror",
"url 2.2.2", "url 2.2.2",
@ -7581,6 +7738,15 @@ dependencies = [
"linked-hash-map", "linked-hash-map",
] ]
[[package]]
name = "yasna"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e262a29d0e61ccf2b6190d7050d4b237535fc76ce4c1210d9caa316f71dffa75"
dependencies = [
"chrono",
]
[[package]] [[package]]
name = "zeroize" name = "zeroize"
version = "1.4.2" version = "1.4.2"

View File

@ -26,6 +26,7 @@ use {
cost_model::CostModel, cost_model::CostModel,
vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender}, vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
}, },
solana_sdk::signature::Keypair,
std::{ std::{
net::UdpSocket, net::UdpSocket,
sync::{ sync::{
@ -46,6 +47,7 @@ pub struct Tpu {
banking_stage: BankingStage, banking_stage: BankingStage,
cluster_info_vote_listener: ClusterInfoVoteListener, cluster_info_vote_listener: ClusterInfoVoteListener,
broadcast_stage: BroadcastStage, broadcast_stage: BroadcastStage,
tpu_quic_t: thread::JoinHandle<()>,
} }
impl Tpu { impl Tpu {
@ -59,6 +61,7 @@ impl Tpu {
tpu_forwards_sockets: Vec<UdpSocket>, tpu_forwards_sockets: Vec<UdpSocket>,
tpu_vote_sockets: Vec<UdpSocket>, tpu_vote_sockets: Vec<UdpSocket>,
broadcast_sockets: Vec<UdpSocket>, broadcast_sockets: Vec<UdpSocket>,
transactions_quic_socket: UdpSocket,
subscriptions: &Arc<RpcSubscriptions>, subscriptions: &Arc<RpcSubscriptions>,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
blockstore: &Arc<Blockstore>, blockstore: &Arc<Blockstore>,
@ -75,6 +78,7 @@ impl Tpu {
tpu_coalesce_ms: u64, tpu_coalesce_ms: u64,
cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender, cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender,
cost_model: &Arc<RwLock<CostModel>>, cost_model: &Arc<RwLock<CostModel>>,
keypair: &Keypair,
) -> Self { ) -> Self {
let (packet_sender, packet_receiver) = channel(); let (packet_sender, packet_receiver) = channel();
let (vote_packet_sender, vote_packet_receiver) = channel(); let (vote_packet_sender, vote_packet_receiver) = channel();
@ -90,6 +94,15 @@ impl Tpu {
); );
let (verified_sender, verified_receiver) = unbounded(); let (verified_sender, verified_receiver) = unbounded();
let tpu_quic_t = solana_streamer::quic::spawn_server(
transactions_quic_socket,
keypair,
cluster_info.my_contact_info().tpu.ip(),
packet_sender,
exit.clone(),
)
.unwrap();
let sigverify_stage = { let sigverify_stage = {
let verifier = TransactionSigVerifier::default(); let verifier = TransactionSigVerifier::default();
SigVerifyStage::new(packet_receiver, verified_sender, verifier) SigVerifyStage::new(packet_receiver, verified_sender, verifier)
@ -153,6 +166,7 @@ impl Tpu {
banking_stage, banking_stage,
cluster_info_vote_listener, cluster_info_vote_listener,
broadcast_stage, broadcast_stage,
tpu_quic_t,
} }
} }
@ -164,6 +178,7 @@ impl Tpu {
self.cluster_info_vote_listener.join(), self.cluster_info_vote_listener.join(),
self.banking_stage.join(), self.banking_stage.join(),
]; ];
self.tpu_quic_t.join()?;
let broadcast_result = self.broadcast_stage.join(); let broadcast_result = self.broadcast_stage.join();
for result in results { for result in results {
result?; result?;

View File

@ -539,8 +539,11 @@ impl Validator {
} }
} }
let mut cluster_info = let mut cluster_info = ClusterInfo::new(
ClusterInfo::new(node.info.clone(), identity_keypair, socket_addr_space); node.info.clone(),
identity_keypair.clone(),
socket_addr_space,
);
cluster_info.set_contact_debug_interval(config.contact_debug_interval); cluster_info.set_contact_debug_interval(config.contact_debug_interval);
cluster_info.set_entrypoints(cluster_entrypoints); cluster_info.set_entrypoints(cluster_entrypoints);
cluster_info.restore_contact_info(ledger_path, config.contact_save_interval); cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
@ -904,6 +907,7 @@ impl Validator {
node.sockets.tpu_forwards, node.sockets.tpu_forwards,
node.sockets.tpu_vote, node.sockets.tpu_vote,
node.sockets.broadcast, node.sockets.broadcast,
node.sockets.tpu_quic,
&rpc_subscriptions, &rpc_subscriptions,
transaction_status_sender, transaction_status_sender,
&blockstore, &blockstore,
@ -920,6 +924,7 @@ impl Validator {
config.tpu_coalesce_ms, config.tpu_coalesce_ms,
cluster_confirmed_slot_sender, cluster_confirmed_slot_sender,
&cost_model, &cost_model,
&identity_keypair,
); );
datapoint_info!("validator-new", ("id", id.to_string(), String)); datapoint_info!("validator-new", ("id", id.to_string(), String));

View File

@ -58,6 +58,7 @@ use {
feature_set::FeatureSet, feature_set::FeatureSet,
hash::Hash, hash::Hash,
pubkey::Pubkey, pubkey::Pubkey,
quic::QUIC_PORT_OFFSET,
sanitize::{Sanitize, SanitizeError}, sanitize::{Sanitize, SanitizeError},
signature::{Keypair, Signable, Signature, Signer}, signature::{Keypair, Signable, Signature, Signer},
timing::timestamp, timing::timestamp,
@ -92,7 +93,7 @@ use {
}; };
pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000); pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000);
pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 11; // VALIDATOR_PORT_RANGE must be at least this wide pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 12; // VALIDATOR_PORT_RANGE must be at least this wide
/// The Data plane fanout size, also used as the neighborhood size /// The Data plane fanout size, also used as the neighborhood size
pub const DATA_PLANE_FANOUT: usize = 200; pub const DATA_PLANE_FANOUT: usize = 200;
@ -2741,6 +2742,7 @@ pub struct Sockets {
pub retransmit_sockets: Vec<UdpSocket>, pub retransmit_sockets: Vec<UdpSocket>,
pub serve_repair: UdpSocket, pub serve_repair: UdpSocket,
pub ancestor_hashes_requests: UdpSocket, pub ancestor_hashes_requests: UdpSocket,
pub tpu_quic: UdpSocket,
} }
#[derive(Debug)] #[derive(Debug)]
@ -2757,6 +2759,8 @@ impl Node {
pub fn new_localhost_with_pubkey(pubkey: &Pubkey) -> Self { pub fn new_localhost_with_pubkey(pubkey: &Pubkey) -> Self {
let bind_ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); let bind_ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
let tpu = UdpSocket::bind("127.0.0.1:0").unwrap(); let tpu = UdpSocket::bind("127.0.0.1:0").unwrap();
let tpu_quic_port = tpu.local_addr().unwrap().port() + QUIC_PORT_OFFSET;
let tpu_quic = UdpSocket::bind(format!("127.0.0.1:{}", tpu_quic_port)).unwrap();
let (gossip_port, (gossip, ip_echo)) = let (gossip_port, (gossip, ip_echo)) =
bind_common_in_range(bind_ip_addr, (1024, 65535)).unwrap(); bind_common_in_range(bind_ip_addr, (1024, 65535)).unwrap();
let gossip_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), gossip_port); let gossip_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), gossip_port);
@ -2806,6 +2810,7 @@ impl Node {
retransmit_sockets: vec![retransmit_socket], retransmit_sockets: vec![retransmit_socket],
serve_repair, serve_repair,
ancestor_hashes_requests, ancestor_hashes_requests,
tpu_quic,
}, },
} }
} }
@ -2841,6 +2846,10 @@ impl Node {
let (tvu_port, tvu) = Self::bind(bind_ip_addr, port_range); let (tvu_port, tvu) = Self::bind(bind_ip_addr, port_range);
let (tvu_forwards_port, tvu_forwards) = Self::bind(bind_ip_addr, port_range); let (tvu_forwards_port, tvu_forwards) = Self::bind(bind_ip_addr, port_range);
let (tpu_port, tpu) = Self::bind(bind_ip_addr, port_range); let (tpu_port, tpu) = Self::bind(bind_ip_addr, port_range);
let (_tpu_port_quic, tpu_quic) = Self::bind(
bind_ip_addr,
(tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1),
);
let (tpu_forwards_port, tpu_forwards) = Self::bind(bind_ip_addr, port_range); let (tpu_forwards_port, tpu_forwards) = Self::bind(bind_ip_addr, port_range);
let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range); let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range);
let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range); let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range);
@ -2884,6 +2893,7 @@ impl Node {
retransmit_sockets: vec![retransmit_socket], retransmit_sockets: vec![retransmit_socket],
serve_repair, serve_repair,
ancestor_hashes_requests, ancestor_hashes_requests,
tpu_quic,
}, },
} }
} }
@ -2906,6 +2916,11 @@ impl Node {
let (tpu_port, tpu_sockets) = let (tpu_port, tpu_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind"); multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind");
let (_tpu_port_quic, tpu_quic) = Self::bind(
bind_ip_addr,
(tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1),
);
let (tpu_forwards_port, tpu_forwards_sockets) = let (tpu_forwards_port, tpu_forwards_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind"); multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind");
@ -2955,6 +2970,7 @@ impl Node {
serve_repair, serve_repair,
ip_echo: Some(ip_echo), ip_echo: Some(ip_echo),
ancestor_hashes_requests, ancestor_hashes_requests,
tpu_quic,
}, },
} }
} }

View File

@ -38,6 +38,7 @@ pub mod poh_config;
pub mod precompiles; pub mod precompiles;
pub mod program_utils; pub mod program_utils;
pub mod pubkey; pub mod pubkey;
pub mod quic;
pub mod recent_blockhashes_account; pub mod recent_blockhashes_account;
pub mod rpc_port; pub mod rpc_port;
pub mod secp256k1_instruction; pub mod secp256k1_instruction;

1
sdk/src/quic.rs Normal file
View File

@ -0,0 +1 @@
pub const QUIC_PORT_OFFSET: u16 = 1;

View File

@ -10,15 +10,25 @@ documentation = "https://docs.rs/solana-streamer"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
itertools = "0.10.1" crossbeam-channel = "0.5"
futures-util = "0.3.19"
histogram = "0.6.9"
itertools = "0.10.3"
libc = "0.2.115"
log = "0.4.14" log = "0.4.14"
nix = "0.23.1"
quinn = "0.8.0"
rand = "0.7.0"
rcgen = "0.8.14"
rustls = { version = "0.20.2", features = ["dangerous_configuration"] }
pem = "1.0.2"
pkcs8 = { version = "0.8.0", features = ["alloc"] }
solana-logger = { path = "../logger", version = "=1.9.6" }
solana-metrics = { path = "../metrics", version = "=1.9.6" } solana-metrics = { path = "../metrics", version = "=1.9.6" }
solana-sdk = { path = "../sdk", version = "=1.9.6" } solana-sdk = { path = "../sdk", version = "=1.9.6" }
thiserror = "1.0"
solana-logger = { path = "../logger", version = "=1.9.6" }
libc = "0.2.108"
nix = "0.23.0"
solana-perf = { path = "../perf", version = "=1.9.6" } solana-perf = { path = "../perf", version = "=1.9.6" }
thiserror = "1.0"
tokio = { version = "1", features = ["full"] }
[dev-dependencies] [dev-dependencies]

View File

@ -1,5 +1,6 @@
#![allow(clippy::integer_arithmetic)] #![allow(clippy::integer_arithmetic)]
pub mod packet; pub mod packet;
pub mod quic;
pub mod recvmmsg; pub mod recvmmsg;
pub mod sendmmsg; pub mod sendmmsg;
pub mod socket; pub mod socket;

420
streamer/src/quic.rs Normal file
View File

@ -0,0 +1,420 @@
use {
futures_util::stream::StreamExt,
pem::Pem,
pkcs8::{der::Document, AlgorithmIdentifier, ObjectIdentifier},
quinn::{Endpoint, EndpointConfig, ServerConfig},
rcgen::{CertificateParams, DistinguishedName, DnType, SanType},
solana_perf::packet::PacketBatch,
solana_sdk::{
packet::{Packet, PACKET_DATA_SIZE},
signature::Keypair,
},
std::{
error::Error,
net::{IpAddr, SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
mpsc::Sender,
Arc,
},
thread,
time::Duration,
},
tokio::{
runtime::{Builder, Runtime},
time::timeout,
},
};
/// Returns default server configuration along with its PEM certificate chain.
#[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527
fn configure_server(
identity_keypair: &Keypair,
gossip_host: IpAddr,
) -> Result<(ServerConfig, String), QuicServerError> {
let (cert_chain, priv_key) =
new_cert(identity_keypair, gossip_host).map_err(|_e| QuicServerError::ConfigureFailed)?;
let cert_chain_pem_parts: Vec<Pem> = cert_chain
.iter()
.map(|cert| Pem {
tag: "CERTIFICATE".to_string(),
contents: cert.0.clone(),
})
.collect();
let cert_chain_pem = pem::encode_many(&cert_chain_pem_parts);
let mut server_config = ServerConfig::with_single_cert(cert_chain, priv_key)
.map_err(|_e| QuicServerError::ConfigureFailed)?;
let config = Arc::get_mut(&mut server_config.transport).unwrap();
const MAX_CONCURRENT_UNI_STREAMS: u32 = 1;
config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into());
config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into());
// disable bidi & datagrams
const MAX_CONCURRENT_BIDI_STREAMS: u32 = 0;
config.max_concurrent_bidi_streams(MAX_CONCURRENT_BIDI_STREAMS.into());
config.datagram_receive_buffer_size(None);
Ok((server_config, cert_chain_pem))
}
fn new_cert(
identity_keypair: &Keypair,
san: IpAddr,
) -> Result<(Vec<rustls::Certificate>, rustls::PrivateKey), Box<dyn Error>> {
// Generate a self-signed cert from validator identity key
let cert_params = new_cert_params(identity_keypair, san);
let cert = rcgen::Certificate::from_params(cert_params)?;
let cert_der = cert.serialize_der().unwrap();
let priv_key = cert.serialize_private_key_der();
let priv_key = rustls::PrivateKey(priv_key);
let cert_chain = vec![rustls::Certificate(cert_der)];
Ok((cert_chain, priv_key))
}
fn convert_to_rcgen_keypair(identity_keypair: &Keypair) -> rcgen::KeyPair {
// from https://datatracker.ietf.org/doc/html/rfc8410#section-3
const ED25519_IDENTIFIER: [u32; 4] = [1, 3, 101, 112];
let mut private_key = Vec::<u8>::with_capacity(34);
private_key.extend_from_slice(&[0x04, 0x20]); // ASN.1 OCTET STRING
private_key.extend_from_slice(identity_keypair.secret().as_bytes());
let key_pkcs8 = pkcs8::PrivateKeyInfo {
algorithm: AlgorithmIdentifier {
oid: ObjectIdentifier::from_arcs(&ED25519_IDENTIFIER).unwrap(),
parameters: None,
},
private_key: &private_key,
public_key: None,
};
let key_pkcs8_der = key_pkcs8
.to_der()
.expect("Failed to convert keypair to DER")
.to_der();
// Parse private key into rcgen::KeyPair struct.
rcgen::KeyPair::from_der(&key_pkcs8_der).expect("Failed to parse keypair from DER")
}
fn new_cert_params(identity_keypair: &Keypair, san: IpAddr) -> CertificateParams {
// TODO(terorie): Is it safe to sign the TLS cert with the identity private key?
// Unfortunately, rcgen does not accept a "raw" Ed25519 key.
// We have to convert it to DER and pass it to the library.
// Convert private key into PKCS#8 v1 object.
// RFC 8410, Section 7: Private Key Format
// https://datatracker.ietf.org/doc/html/rfc8410#section-
let keypair = convert_to_rcgen_keypair(identity_keypair);
let mut cert_params = CertificateParams::default();
cert_params.subject_alt_names = vec![SanType::IpAddress(san)];
cert_params.alg = &rcgen::PKCS_ED25519;
cert_params.key_pair = Some(keypair);
cert_params.distinguished_name = DistinguishedName::new();
cert_params
.distinguished_name
.push(DnType::CommonName, "Solana node");
cert_params
}
pub fn rt() -> Runtime {
Builder::new_current_thread().enable_all().build().unwrap()
}
#[derive(thiserror::Error, Debug)]
pub enum QuicServerError {
#[error("Server configure failed")]
ConfigureFailed,
#[error("Endpoint creation failed")]
EndpointFailed,
}
// Return true if the server should drop the stream
fn handle_chunk(
chunk: &Result<Option<quinn::Chunk>, quinn::ReadError>,
maybe_batch: &mut Option<PacketBatch>,
remote_addr: &SocketAddr,
packet_sender: &Sender<PacketBatch>,
) -> bool {
match chunk {
Ok(maybe_chunk) => {
if let Some(chunk) = maybe_chunk {
trace!("got chunk: {:?}", chunk);
let chunk_len = chunk.bytes.len() as u64;
// shouldn't happen, but sanity check the size and offsets
if chunk.offset > PACKET_DATA_SIZE as u64 || chunk_len > PACKET_DATA_SIZE as u64 {
return true;
}
if chunk.offset + chunk_len > PACKET_DATA_SIZE as u64 {
return true;
}
// chunk looks valid
if maybe_batch.is_none() {
let mut batch = PacketBatch::with_capacity(1);
let mut packet = Packet::default();
packet.meta.set_addr(remote_addr);
batch.packets.push(packet);
*maybe_batch = Some(batch);
}
if let Some(batch) = maybe_batch.as_mut() {
let end = chunk.offset as usize + chunk.bytes.len();
batch.packets[0].data[chunk.offset as usize..end].copy_from_slice(&chunk.bytes);
batch.packets[0].meta.size = std::cmp::max(batch.packets[0].meta.size, end);
}
} else {
trace!("chunk is none");
// done receiving chunks
if let Some(batch) = maybe_batch.take() {
let len = batch.packets[0].meta.size;
if let Err(e) = packet_sender.send(batch) {
info!("send error: {}", e);
} else {
trace!("sent {} byte packet", len);
}
}
return true;
}
}
Err(e) => {
debug!("Received stream error: {:?}", e);
return true;
}
}
false
}
pub fn spawn_server(
sock: UdpSocket,
keypair: &Keypair,
gossip_host: IpAddr,
packet_sender: Sender<PacketBatch>,
exit: Arc<AtomicBool>,
) -> Result<thread::JoinHandle<()>, QuicServerError> {
let (config, _cert) = configure_server(keypair, gossip_host)?;
let runtime = rt();
let (_, mut incoming) = {
let _guard = runtime.enter();
Endpoint::new(EndpointConfig::default(), Some(config), sock)
.map_err(|_e| QuicServerError::EndpointFailed)?
};
let handle = thread::spawn(move || {
let handle = runtime.spawn(async move {
while !exit.load(Ordering::Relaxed) {
const WAIT_FOR_CONNECTION_TIMEOUT_MS: u64 = 1000;
let timeout_connection = timeout(
Duration::from_millis(WAIT_FOR_CONNECTION_TIMEOUT_MS),
incoming.next(),
)
.await;
if let Ok(Some(connection)) = timeout_connection {
if let Ok(new_connection) = connection.await {
let exit = exit.clone();
let quinn::NewConnection {
connection,
mut uni_streams,
..
} = new_connection;
let remote_addr = connection.remote_address();
let packet_sender = packet_sender.clone();
tokio::spawn(async move {
debug!("new connection {}", remote_addr);
while let Some(Ok(mut stream)) = uni_streams.next().await {
let mut maybe_batch = None;
while !exit.load(Ordering::Relaxed) {
if handle_chunk(
&stream.read_chunk(PACKET_DATA_SIZE, false).await,
&mut maybe_batch,
&remote_addr,
&packet_sender,
) {
break;
}
}
}
});
}
}
}
});
if let Err(e) = runtime.block_on(handle) {
warn!("error from runtime.block_on: {:?}", e);
}
});
Ok(handle)
}
#[cfg(test)]
mod test {
use super::*;
use quinn::{ClientConfig, NewConnection};
use std::{net::SocketAddr, sync::mpsc::channel, time::Instant};
struct SkipServerVerification;
impl SkipServerVerification {
fn new() -> Arc<Self> {
Arc::new(Self)
}
}
impl rustls::client::ServerCertVerifier for SkipServerVerification {
fn verify_server_cert(
&self,
_end_entity: &rustls::Certificate,
_intermediates: &[rustls::Certificate],
_server_name: &rustls::ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: std::time::SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
Ok(rustls::client::ServerCertVerified::assertion())
}
}
pub fn get_client_config() -> quinn::ClientConfig {
let crypto = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_custom_certificate_verifier(SkipServerVerification::new())
.with_no_client_auth();
ClientConfig::new(Arc::new(crypto))
}
#[test]
fn test_quic_server_exit() {
let s = UdpSocket::bind("127.0.0.1:0").unwrap();
let exit = Arc::new(AtomicBool::new(false));
let (sender, _receiver) = channel();
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let t = spawn_server(s, &keypair, ip, sender, exit.clone()).unwrap();
exit.store(true, Ordering::Relaxed);
t.join().unwrap();
}
fn make_client_endpoint(runtime: &Runtime, addr: &SocketAddr) -> NewConnection {
let client_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
let mut endpoint = quinn::Endpoint::new(EndpointConfig::default(), None, client_socket)
.unwrap()
.0;
endpoint.set_default_client_config(get_client_config());
runtime
.block_on(endpoint.connect(*addr, "localhost").unwrap())
.unwrap()
}
#[test]
fn test_quic_server_multiple_streams() {
solana_logger::setup();
let s = UdpSocket::bind("127.0.0.1:0").unwrap();
let exit = Arc::new(AtomicBool::new(false));
let (sender, receiver) = channel();
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let t = spawn_server(s, &keypair, ip, sender, exit.clone()).unwrap();
let runtime = rt();
let _rt_guard = runtime.enter();
let conn1 = Arc::new(make_client_endpoint(&runtime, &server_address));
let conn2 = Arc::new(make_client_endpoint(&runtime, &server_address));
let mut num_expected_packets = 0;
for i in 0..10 {
info!("sending: {}", i);
let c1 = conn1.clone();
let c2 = conn2.clone();
let handle = runtime.spawn(async move {
let mut s1 = c1.connection.open_uni().await.unwrap();
let mut s2 = c2.connection.open_uni().await.unwrap();
s1.write_all(&[0u8]).await.unwrap();
s1.finish().await.unwrap();
s2.write_all(&[0u8]).await.unwrap();
s2.finish().await.unwrap();
});
runtime.block_on(handle).unwrap();
num_expected_packets += 2;
thread::sleep(Duration::from_millis(200));
}
let mut all_packets = vec![];
let now = Instant::now();
let mut total_packets = 0;
while now.elapsed().as_secs() < 10 {
if let Ok(packets) = receiver.recv_timeout(Duration::from_secs(1)) {
total_packets += packets.packets.len();
all_packets.push(packets)
}
if total_packets == num_expected_packets {
break;
}
}
for batch in all_packets {
for p in &batch.packets {
assert_eq!(p.meta.size, 1);
}
}
assert_eq!(total_packets, num_expected_packets);
exit.store(true, Ordering::Relaxed);
t.join().unwrap();
}
#[test]
fn test_quic_server_multiple_writes() {
solana_logger::setup();
let s = UdpSocket::bind("127.0.0.1:0").unwrap();
let exit = Arc::new(AtomicBool::new(false));
let (sender, receiver) = channel();
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let t = spawn_server(s, &keypair, ip, sender, exit.clone()).unwrap();
let runtime = rt();
let _rt_guard = runtime.enter();
let conn1 = Arc::new(make_client_endpoint(&runtime, &server_address));
// Send a full size packet with single byte writes.
let num_bytes = PACKET_DATA_SIZE;
let num_expected_packets = 1;
let handle = runtime.spawn(async move {
let mut s1 = conn1.connection.open_uni().await.unwrap();
for _ in 0..num_bytes {
s1.write_all(&[0u8]).await.unwrap();
}
s1.finish().await.unwrap();
});
runtime.block_on(handle).unwrap();
let mut all_packets = vec![];
let now = Instant::now();
let mut total_packets = 0;
while now.elapsed().as_secs() < 5 {
if let Ok(packets) = receiver.recv_timeout(Duration::from_secs(1)) {
total_packets += packets.packets.len();
all_packets.push(packets)
}
if total_packets > num_expected_packets {
break;
}
}
for batch in all_packets {
for p in &batch.packets {
assert_eq!(p.meta.size, num_bytes);
}
}
assert_eq!(total_packets, num_expected_packets);
exit.store(true, Ordering::Relaxed);
t.join().unwrap();
}
}