From 65ccfed868c199e2e7ce17a692dd39865d5a8cf8 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 28 Jul 2021 09:31:43 -0700 Subject: [PATCH] Accounts db replication replica skeleton (#18767) This is the first installment of the AccountsDb replication. Summary of Changes: The rpc-node for AccountsDb replication executable cherry-picked from Steven's branch Auto figuring out the snapshot to download via gossip The replica now can download the snapshot, initialize the bank and AccountsDb, start the JsonRpcService Integration test of a validator paired with a replica. --- Cargo.lock | 436 ++++++++++++++++++++++------ Cargo.toml | 1 + replica-node/.gitignore | 2 + replica-node/Cargo.toml | 54 ++++ replica-node/src/lib.rs | 2 + replica-node/src/main.rs | 381 ++++++++++++++++++++++++ replica-node/src/replica_node.rs | 302 +++++++++++++++++++ replica-node/src/replica_util.rs | 268 +++++++++++++++++ replica-node/tests/local_replica.rs | 278 ++++++++++++++++++ 9 files changed, 1642 insertions(+), 82 deletions(-) create mode 100644 replica-node/.gitignore create mode 100644 replica-node/Cargo.toml create mode 100644 replica-node/src/lib.rs create mode 100644 replica-node/src/main.rs create mode 100644 replica-node/src/replica_node.rs create mode 100644 replica-node/src/replica_util.rs create mode 100644 replica-node/tests/local_replica.rs diff --git a/Cargo.lock b/Cargo.lock index 6a1e4e9062..60588f6df5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -195,7 +195,7 @@ dependencies = [ "instant", "pin-project 1.0.7", "rand 0.8.3", - "tokio", + "tokio 1.9.0", ] [[package]] @@ -729,6 +729,23 @@ dependencies = [ "unreachable", ] +[[package]] +name = "console" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c0994e656bba7b922d8dd1245db90672ffb701e684e45be58f20719d69abc5a" +dependencies = [ + "encode_unicode", + "lazy_static", + "libc", + "regex", + "terminal_size", + "termios", + "unicode-width", + "winapi 0.3.8", + "winapi-util", +] + [[package]] name = "console" version = "0.14.1" @@ -1044,7 +1061,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c9dd058f8b65922819fabb4a41e7d1964e56344042c26efbccd465202c23fa0c" dependencies = [ - "console", + "console 0.14.1", "lazy_static", "tempfile", "zeroize", @@ -1530,7 +1547,7 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project-lite", + "pin-project-lite 0.2.4", "pin-utils", "proc-macro-hack", "proc-macro-nested", @@ -1640,7 +1657,7 @@ dependencies = [ "simpl", "smpl_jwt", "time 0.2.25", - "tokio", + "tokio 1.9.0", ] [[package]] @@ -1668,8 +1685,8 @@ dependencies = [ "http", "indexmap", "slab", - "tokio", - "tokio-util", + "tokio 1.9.0", + "tokio-util 0.6.3", "tracing", "tracing-futures", ] @@ -1815,7 +1832,7 @@ checksum = "60daa14be0e0786db0f03a9e57cb404c9d756eed2b6c62b9ea98ec5743ec75a9" dependencies = [ "bytes 1.0.1", "http", - "pin-project-lite", + "pin-project-lite 0.2.4", ] [[package]] @@ -1871,9 +1888,9 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project-lite", + "pin-project-lite 0.2.4", "socket2", - "tokio", + "tokio 1.9.0", "tower-service", "tracing", "want", @@ -1889,7 +1906,7 @@ dependencies = [ "hyper 0.14.3", "log 0.4.14", "rustls", - "tokio", + "tokio 1.9.0", "tokio-rustls", "webpki", ] @@ -1901,8 +1918,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ "hyper 0.14.3", - "pin-project-lite", - "tokio", + "pin-project-lite 0.2.4", + "tokio 1.9.0", "tokio-io-timeout", ] @@ -1915,7 +1932,7 @@ dependencies = [ "bytes 1.0.1", "hyper 0.14.3", "native-tls", - "tokio", + "tokio 1.9.0", "tokio-native-tls", ] @@ -1964,7 +1981,7 @@ version = "0.16.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d207dc617c7a380ab07ff572a6e52fa202a2a8f355860ac9c38e23f8196be1b" dependencies = [ - "console", + "console 0.14.1", "lazy_static", "number_prefix", "regex", @@ -2033,6 +2050,26 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonrpc-client-transports" +version = "17.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2f81014e2706fde057e9dcb1036cf6bbf9418d972c597be5c7158c984656722" +dependencies = [ + "derive_more", + "futures 0.3.16", + "jsonrpc-core 17.1.0", + "jsonrpc-pubsub 17.1.0", + "jsonrpc-server-utils 17.1.0", + "log 0.4.14", + "parity-tokio-ipc 0.8.0", + "serde", + "serde_json", + "tokio 0.2.24", + "url 1.7.2", + "websocket", +] + [[package]] name = "jsonrpc-client-transports" version = "18.0.0" @@ -2041,18 +2078,33 @@ checksum = "d2b99d4207e2a04fb4581746903c2bb7eb376f88de9c699d0f3e10feeac0cd3a" dependencies = [ "derive_more", "futures 0.3.16", - "jsonrpc-core", - "jsonrpc-pubsub", - "jsonrpc-server-utils", + "jsonrpc-core 18.0.0", + "jsonrpc-pubsub 18.0.0", + "jsonrpc-server-utils 18.0.0", "log 0.4.14", - "parity-tokio-ipc", + "parity-tokio-ipc 0.9.0", "serde", "serde_json", - "tokio", + "tokio 1.9.0", "url 1.7.2", "websocket", ] +[[package]] +name = "jsonrpc-core" +version = "17.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4467ab6dfa369b69e52bd0692e480c4d117410538526a57a304a0f2250fd95e" +dependencies = [ + "futures 0.3.16", + "futures-executor", + "futures-util", + "log 0.4.14", + "serde", + "serde_derive", + "serde_json", +] + [[package]] name = "jsonrpc-core" version = "18.0.0" @@ -2068,6 +2120,16 @@ dependencies = [ "serde_json", ] +[[package]] +name = "jsonrpc-core-client" +version = "17.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c366c092d6bccc6e7ab44dd635a0f22ab2f201215339915fb7ff9508404f431" +dependencies = [ + "futures 0.3.16", + "jsonrpc-client-transports 17.1.0", +] + [[package]] name = "jsonrpc-core-client" version = "18.0.0" @@ -2075,7 +2137,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b51da17abecbdab3e3d4f26b01c5ec075e88d3abe3ab3b05dc9aa69392764ec0" dependencies = [ "futures 0.3.16", - "jsonrpc-client-transports", + "jsonrpc-client-transports 18.0.0", +] + +[[package]] +name = "jsonrpc-derive" +version = "17.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34f6326966ebac440db89eba788f5a0e5ac2614b4b4bfbdc049a971e71040f32" +dependencies = [ + "proc-macro-crate", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", ] [[package]] @@ -2098,14 +2172,29 @@ checksum = "e1dea6e07251d9ce6a552abfb5d7ad6bc290a4596c8dcc3d795fae2bbdc1f3ff" dependencies = [ "futures 0.3.16", "hyper 0.14.3", - "jsonrpc-core", - "jsonrpc-server-utils", + "jsonrpc-core 18.0.0", + "jsonrpc-server-utils 18.0.0", "log 0.4.14", "net2", "parking_lot 0.11.1", "unicase 2.6.0", ] +[[package]] +name = "jsonrpc-ipc-server" +version = "17.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b1d782052ef17051d12681bcc2fa2e9e1aabf3f634588125493d63ddcca6fe1" +dependencies = [ + "futures 0.3.16", + "jsonrpc-core 17.1.0", + "jsonrpc-server-utils 17.1.0", + "log 0.4.14", + "parity-tokio-ipc 0.8.0", + "parking_lot 0.11.1", + "tower-service", +] + [[package]] name = "jsonrpc-ipc-server" version = "18.0.0" @@ -2113,14 +2202,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "382bb0206323ca7cda3dcd7e245cea86d37d02457a02a975e3378fb149a48845" dependencies = [ "futures 0.3.16", - "jsonrpc-core", - "jsonrpc-server-utils", + "jsonrpc-core 18.0.0", + "jsonrpc-server-utils 18.0.0", "log 0.4.14", - "parity-tokio-ipc", + "parity-tokio-ipc 0.9.0", "parking_lot 0.11.1", "tower-service", ] +[[package]] +name = "jsonrpc-pubsub" +version = "17.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14739e5523a40739882cc34a44ab2dd9356bce5ce102513f5984a9efbe342f3d" +dependencies = [ + "futures 0.3.16", + "jsonrpc-core 17.1.0", + "lazy_static", + "log 0.4.14", + "parking_lot 0.11.1", + "rand 0.7.3", + "serde", +] + [[package]] name = "jsonrpc-pubsub" version = "18.0.0" @@ -2128,7 +2232,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240f87695e6c6f62fb37f05c02c04953cf68d6408b8c1c89de85c7a0125b1011" dependencies = [ "futures 0.3.16", - "jsonrpc-core", + "jsonrpc-core 18.0.0", "lazy_static", "log 0.4.14", "parking_lot 0.11.1", @@ -2136,6 +2240,23 @@ dependencies = [ "serde", ] +[[package]] +name = "jsonrpc-server-utils" +version = "17.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bce68fa279a2822b3619369cd024f8a4f8e5ce485468834f8679a3c7919aae2d" +dependencies = [ + "bytes 0.5.4", + "futures 0.3.16", + "globset", + "jsonrpc-core 17.1.0", + "lazy_static", + "log 0.4.14", + "tokio 0.2.24", + "tokio-util 0.3.1", + "unicase 2.6.0", +] + [[package]] name = "jsonrpc-server-utils" version = "18.0.0" @@ -2145,12 +2266,12 @@ dependencies = [ "bytes 1.0.1", "futures 0.3.16", "globset", - "jsonrpc-core", + "jsonrpc-core 18.0.0", "lazy_static", "log 0.4.14", - "tokio", + "tokio 1.9.0", "tokio-stream", - "tokio-util", + "tokio-util 0.6.3", "unicase 2.6.0", ] @@ -2161,8 +2282,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f892c7d766369475ab7b0669f417906302d7c0fb521285c0a0c92e52e7c8e946" dependencies = [ "futures 0.3.16", - "jsonrpc-core", - "jsonrpc-server-utils", + "jsonrpc-core 18.0.0", + "jsonrpc-server-utils 18.0.0", "log 0.4.14", "parity-ws", "parking_lot 0.11.1", @@ -2479,6 +2600,29 @@ dependencies = [ "slab", ] +[[package]] +name = "mio-named-pipes" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656" +dependencies = [ + "log 0.4.14", + "mio 0.6.22", + "miow 0.3.7", + "winapi 0.3.8", +] + +[[package]] +name = "mio-uds" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0" +dependencies = [ + "iovec", + "libc", + "mio 0.6.22", +] + [[package]] name = "miow" version = "0.2.2" @@ -2754,6 +2898,22 @@ dependencies = [ "syn 1.0.67", ] +[[package]] +name = "parity-tokio-ipc" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd7f6c69d7687501b2205fe51ade1d7b8797bb3aa141fe5bf13dd78c0483bc89" +dependencies = [ + "futures 0.3.16", + "libc", + "log 0.4.14", + "mio-named-pipes", + "miow 0.3.7", + "rand 0.7.3", + "tokio 0.2.24", + "winapi 0.3.8", +] + [[package]] name = "parity-tokio-ipc" version = "0.9.0" @@ -2764,7 +2924,7 @@ dependencies = [ "libc", "log 0.4.14", "rand 0.7.3", - "tokio", + "tokio 1.9.0", "winapi 0.3.8", ] @@ -2971,6 +3131,12 @@ dependencies = [ "syn 1.0.67", ] +[[package]] +name = "pin-project-lite" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777" + [[package]] name = "pin-project-lite" version = "0.2.4" @@ -3539,12 +3705,12 @@ dependencies = [ "mime 0.3.16", "native-tls", "percent-encoding 2.1.0", - "pin-project-lite", + "pin-project-lite 0.2.4", "rustls", "serde", "serde_json", "serde_urlencoded", - "tokio", + "tokio 1.9.0", "tokio-native-tls", "tokio-rustls", "url 2.2.2", @@ -4154,7 +4320,7 @@ dependencies = [ "solana-runtime", "solana-sdk", "tarpc", - "tokio", + "tokio 1.9.0", "tokio-serde", ] @@ -4166,7 +4332,7 @@ dependencies = [ "serde", "solana-sdk", "tarpc", - "tokio", + "tokio 1.9.0", ] [[package]] @@ -4182,7 +4348,7 @@ dependencies = [ "solana-runtime", "solana-sdk", "tarpc", - "tokio", + "tokio 1.9.0", "tokio-serde", "tokio-stream", ] @@ -4296,7 +4462,7 @@ dependencies = [ "bs58 0.4.0", "chrono", "clap 2.33.3", - "console", + "console 0.14.1", "const_format", "criterion-stats", "ctrlc", @@ -4355,7 +4521,7 @@ dependencies = [ "base64 0.13.0", "chrono", "clap 2.33.3", - "console", + "console 0.14.1", "humantime", "indicatif", "serde", @@ -4380,7 +4546,7 @@ dependencies = [ "bs58 0.4.0", "clap 2.33.3", "indicatif", - "jsonrpc-core", + "jsonrpc-core 18.0.0", "jsonrpc-http-server", "log 0.4.14", "net2", @@ -4400,7 +4566,7 @@ dependencies = [ "solana-version", "solana-vote-program", "thiserror", - "tokio", + "tokio 1.9.0", "tungstenite", "url 2.2.2", ] @@ -4445,8 +4611,8 @@ dependencies = [ "fs_extra", "indexmap", "itertools 0.10.1", - "jsonrpc-core", - "jsonrpc-core-client", + "jsonrpc-core 18.0.0", + "jsonrpc-core-client 18.0.0", "libc", "log 0.4.14", "lru", @@ -4500,7 +4666,7 @@ dependencies = [ "systemstat", "tempfile", "thiserror", - "tokio", + "tokio 1.9.0", "trees", ] @@ -4553,7 +4719,7 @@ name = "solana-download-utils" version = "1.8.0" dependencies = [ "bzip2", - "console", + "console 0.14.1", "indicatif", "log 0.4.14", "reqwest", @@ -4609,7 +4775,7 @@ dependencies = [ "solana-version", "spl-memo", "thiserror", - "tokio", + "tokio 1.9.0", ] [[package]] @@ -4756,7 +4922,7 @@ dependencies = [ "bzip2", "chrono", "clap 2.33.3", - "console", + "console 0.14.1", "ctrlc", "dirs-next", "indicatif", @@ -4845,7 +5011,7 @@ dependencies = [ "solana-vote-program", "tempfile", "thiserror", - "tokio", + "tokio 1.9.0", "tokio-stream", "trees", ] @@ -4884,7 +5050,7 @@ dependencies = [ "solana-version", "solana-vote-program", "tempfile", - "tokio", + "tokio 1.9.0", ] [[package]] @@ -5025,7 +5191,7 @@ dependencies = [ "solana-logger 1.8.0", "solana-sdk", "solana-version", - "tokio", + "tokio 1.9.0", "url 2.2.2", ] @@ -5211,7 +5377,7 @@ dependencies = [ "solana-sdk", "solana-vote-program", "thiserror", - "tokio", + "tokio 1.9.0", ] [[package]] @@ -5227,7 +5393,7 @@ name = "solana-remote-wallet" version = "1.8.0" dependencies = [ "base32", - "console", + "console 0.14.1", "dialoguer", "hidapi", "log 0.4.14", @@ -5241,6 +5407,48 @@ dependencies = [ "uriparse", ] +[[package]] +name = "solana-replica-node" +version = "1.8.0" +dependencies = [ + "assert_matches", + "bincode", + "chrono", + "clap 2.33.3", + "console 0.11.3", + "crossbeam-channel", + "jsonrpc-core 17.1.0", + "jsonrpc-core-client 17.1.0", + "jsonrpc-derive 17.1.0", + "jsonrpc-ipc-server 17.1.0", + "jsonrpc-server-utils 17.1.0", + "log 0.4.14", + "rand 0.7.3", + "serde", + "serial_test 0.5.1", + "solana-clap-utils", + "solana-cli-config", + "solana-client", + "solana-core", + "solana-download-utils", + "solana-genesis-utils", + "solana-gossip", + "solana-ledger", + "solana-local-cluster", + "solana-logger 1.8.0", + "solana-metrics", + "solana-net-utils", + "solana-rpc", + "solana-runtime", + "solana-sdk", + "solana-streamer", + "solana-validator", + "solana-version", + "solana-vote-program", + "tempdir", + "tempfile", +] + [[package]] name = "solana-rpc" version = "1.8.0" @@ -5250,11 +5458,11 @@ dependencies = [ "bs58 0.4.0", "crossbeam-channel", "itertools 0.10.1", - "jsonrpc-core", - "jsonrpc-core-client", - "jsonrpc-derive", + "jsonrpc-core 18.0.0", + "jsonrpc-core-client 18.0.0", + "jsonrpc-derive 18.0.0", "jsonrpc-http-server", - "jsonrpc-pubsub", + "jsonrpc-pubsub 18.0.0", "jsonrpc-ws-server", "libc", "log 0.4.14", @@ -5285,8 +5493,8 @@ dependencies = [ "solana-vote-program", "spl-token", "symlink", - "tokio", - "tokio-util", + "tokio 1.9.0", + "tokio-util 0.6.3", ] [[package]] @@ -5563,7 +5771,7 @@ dependencies = [ "bincode", "chrono", "clap 2.33.3", - "console", + "console 0.14.1", "csv", "ctrlc", "dirs-next", @@ -5628,15 +5836,15 @@ dependencies = [ "bincode", "chrono", "clap 2.33.3", - "console", + "console 0.14.1", "core_affinity", "fd-lock", "indicatif", - "jsonrpc-core", - "jsonrpc-core-client", - "jsonrpc-derive", - "jsonrpc-ipc-server", - "jsonrpc-server-utils", + "jsonrpc-core 18.0.0", + "jsonrpc-core-client 18.0.0", + "jsonrpc-derive 18.0.0", + "jsonrpc-ipc-server 18.0.0", + "jsonrpc-server-utils 18.0.0", "libc", "log 0.4.14", "num_cpus", @@ -5972,9 +6180,9 @@ dependencies = [ "static_assertions", "tarpc-plugins", "thiserror", - "tokio", + "tokio 1.9.0", "tokio-serde", - "tokio-util", + "tokio-util 0.6.3", "tracing", "tracing-opentelemetry", ] @@ -5990,6 +6198,16 @@ dependencies = [ "syn 1.0.67", ] +[[package]] +name = "tempdir" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" +dependencies = [ + "rand 0.4.6", + "remove_dir_all", +] + [[package]] name = "tempfile" version = "3.2.0" @@ -6023,6 +6241,15 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "termios" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "411c5bf740737c7918b8b1fe232dca4dc9f8e754b8ad5e20966814001ed0ac6b" +dependencies = [ + "libc", +] + [[package]] name = "textwrap" version = "0.11.0" @@ -6173,6 +6400,26 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" +[[package]] +name = "tokio" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "099837d3464c16a808060bb3f02263b412f6fafcb5d01c533d309985fbeebe48" +dependencies = [ + "bytes 0.5.4", + "futures-core", + "iovec", + "lazy_static", + "libc", + "memchr", + "mio 0.6.22", + "mio-uds", + "num_cpus", + "pin-project-lite 0.1.12", + "slab", + "tokio-macros 0.2.6", +] + [[package]] name = "tokio" version = "1.9.0" @@ -6187,9 +6434,9 @@ dependencies = [ "num_cpus", "once_cell", "parking_lot 0.11.1", - "pin-project-lite", + "pin-project-lite 0.2.4", "signal-hook-registry", - "tokio-macros", + "tokio-macros 1.2.0", "winapi 0.3.8", ] @@ -6231,8 +6478,19 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90c49f106be240de154571dd31fbe48acb10ba6c6dd6f6517ad603abffa42de9" dependencies = [ - "pin-project-lite", - "tokio", + "pin-project-lite 0.2.4", + "tokio 1.9.0", +] + +[[package]] +name = "tokio-macros" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e44da00bfc73a25f814cd8d7e57a68a5c31b74b3152a0a1d1f590c97ed06265a" +dependencies = [ + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", ] [[package]] @@ -6253,7 +6511,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" dependencies = [ "native-tls", - "tokio", + "tokio 1.9.0", ] [[package]] @@ -6282,7 +6540,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" dependencies = [ "rustls", - "tokio", + "tokio 1.9.0", "webpki", ] @@ -6308,8 +6566,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b2f3f698253f03119ac0102beaa64f67a67e08074d03a22d18784104543727f" dependencies = [ "futures-core", - "pin-project-lite", - "tokio", + "pin-project-lite 0.2.4", + "tokio 1.9.0", ] [[package]] @@ -6347,6 +6605,20 @@ dependencies = [ "tokio-io", ] +[[package]] +name = "tokio-util" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" +dependencies = [ + "bytes 0.5.4", + "futures-core", + "futures-sink", + "log 0.4.14", + "pin-project-lite 0.1.12", + "tokio 0.2.24", +] + [[package]] name = "tokio-util" version = "0.6.3" @@ -6357,9 +6629,9 @@ dependencies = [ "futures-core", "futures-sink", "log 0.4.14", - "pin-project-lite", + "pin-project-lite 0.2.4", "slab", - "tokio", + "tokio 1.9.0", ] [[package]] @@ -6392,10 +6664,10 @@ dependencies = [ "pin-project 1.0.7", "prost", "prost-derive", - "tokio", + "tokio 1.9.0", "tokio-rustls", "tokio-stream", - "tokio-util", + "tokio-util 0.6.3", "tower", "tower-layer", "tower-service", @@ -6427,9 +6699,9 @@ dependencies = [ "pin-project 1.0.7", "rand 0.8.3", "slab", - "tokio", + "tokio 1.9.0", "tokio-stream", - "tokio-util", + "tokio-util 0.6.3", "tower-layer", "tower-service", "tracing", @@ -6455,7 +6727,7 @@ checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" dependencies = [ "cfg-if 1.0.0", "log 0.4.14", - "pin-project-lite", + "pin-project-lite 0.2.4", "tracing-attributes", "tracing-core", ] diff --git a/Cargo.toml b/Cargo.toml index 289d44f5bc..8abf20d575 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,6 +71,7 @@ members = [ "cli", "rayon-threadlimit", "watchtower", + "replica-node", ] exclude = [ diff --git a/replica-node/.gitignore b/replica-node/.gitignore new file mode 100644 index 0000000000..5404b132db --- /dev/null +++ b/replica-node/.gitignore @@ -0,0 +1,2 @@ +/target/ +/farf/ diff --git a/replica-node/Cargo.toml b/replica-node/Cargo.toml new file mode 100644 index 0000000000..1a0e404d61 --- /dev/null +++ b/replica-node/Cargo.toml @@ -0,0 +1,54 @@ +[package] +authors = ["Solana Maintainers "] +edition = "2018" +name = "solana-replica-node" +description = "Solana replication node" +version = "1.8.0" +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +documentation = "https://docs.rs/solana-validator" + +[dependencies] +bincode = "1.3.1" +chrono = { version = "0.4.11", features = ["serde"] } +clap = "2.33.1" +console = "0.11.3" +crossbeam-channel = "0.5" +jsonrpc-core = "17.0.0" +jsonrpc-core-client = { version = "17.0.0", features = ["ipc", "ws"] } +jsonrpc-derive = "17.0.0" +jsonrpc-ipc-server = "17.0.0" +jsonrpc-server-utils= "17.0.0" +log = "0.4.11" +rand = "0.7.0" +serde = "1.0.112" +solana-clap-utils = { path = "../clap-utils", version = "=1.8.0" } +solana-cli-config = { path = "../cli-config", version = "=1.8.0" } +solana-client = { path = "../client", version = "=1.8.0" } +solana-download-utils = { path = "../download-utils", version = "=1.8.0" } +solana-genesis-utils = { path = "../genesis-utils", version = "=1.8.0" } +solana-gossip = { path = "../gossip", version = "=1.8.0" } +solana-ledger = { path = "../ledger", version = "=1.8.0" } +solana-logger = { path = "../logger", version = "=1.8.0" } +solana-metrics = { path = "../metrics", version = "=1.8.0" } +solana-net-utils = { path = "../net-utils", version = "=1.8.0" } +solana-rpc = { path = "../rpc", version = "=1.8.0" } +solana-runtime = { path = "../runtime", version = "=1.8.0" } +solana-sdk = { path = "../sdk", version = "=1.8.0" } +solana-streamer = { path = "../streamer", version = "=1.8.0" } +solana-version = { path = "../version", version = "=1.8.0" } +solana-validator = { path = "../validator", version = "=1.8.0" } + +[dev-dependencies] +solana-core = { path = "../core", version = "=1.8.0" } +solana-local-cluster = { path = "../local-cluster", version = "=1.8.0" } +solana-vote-program = { path = "../programs/vote", version = "=1.8.0" } +assert_matches = "1.5.0" +serial_test = "0.5.1" +tempdir = "0.3.7" +tempfile = "3.2.0" + + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/replica-node/src/lib.rs b/replica-node/src/lib.rs new file mode 100644 index 0000000000..6fc5d3bf9b --- /dev/null +++ b/replica-node/src/lib.rs @@ -0,0 +1,2 @@ +pub mod replica_node; +pub mod replica_util; diff --git a/replica-node/src/main.rs b/replica-node/src/main.rs new file mode 100644 index 0000000000..964d32dd88 --- /dev/null +++ b/replica-node/src/main.rs @@ -0,0 +1,381 @@ +//! The main AccountsDb replication node responsible for replicating +//! AccountsDb information from peer a validator or another replica-node. + +#![allow(clippy::integer_arithmetic)] + +use { + clap::{crate_description, crate_name, value_t, values_t, App, AppSettings, Arg}, + log::*, + rand::{seq::SliceRandom, thread_rng}, + solana_clap_utils::{ + input_parsers::keypair_of, + input_validators::{is_keypair_or_ask_keyword, is_parsable, is_pubkey}, + keypair::SKIP_SEED_PHRASE_VALIDATION_ARG, + }, + solana_gossip::{ + cluster_info::{Node, VALIDATOR_PORT_RANGE}, + contact_info::ContactInfo, + }, + solana_replica_node::{ + replica_node::{ReplicaNode, ReplicaNodeConfig}, + replica_util, + }, + solana_rpc::{rpc::JsonRpcConfig, rpc_pubsub_service::PubSubConfig}, + solana_runtime::accounts_index::AccountSecondaryIndexes, + solana_sdk::{exit::Exit, pubkey::Pubkey, signature::Signer}, + solana_streamer::socket::SocketAddrSpace, + solana_validator::port_range_validator, + std::{ + collections::HashSet, + env, + net::{IpAddr, SocketAddr}, + path::PathBuf, + process::exit, + sync::{Arc, RwLock}, + }, +}; + +pub fn main() { + let default_dynamic_port_range = + &format!("{}-{}", VALIDATOR_PORT_RANGE.0, VALIDATOR_PORT_RANGE.1); + + let matches = App::new(crate_name!()) + .about(crate_description!()) + .version(solana_version::version!()) + .setting(AppSettings::VersionlessSubcommands) + .setting(AppSettings::InferSubcommands) + .arg( + Arg::with_name(SKIP_SEED_PHRASE_VALIDATION_ARG.name) + .long(SKIP_SEED_PHRASE_VALIDATION_ARG.long) + .help(SKIP_SEED_PHRASE_VALIDATION_ARG.help), + ) + .arg( + Arg::with_name("ledger_path") + .short("l") + .long("ledger") + .value_name("DIR") + .takes_value(true) + .required(true) + .default_value("ledger") + .help("Use DIR as ledger location"), + ) + .arg( + Arg::with_name("snapshots") + .long("snapshots") + .value_name("DIR") + .takes_value(true) + .help("Use DIR as snapshot location [default: --ledger value]"), + ) + .arg( + Arg::with_name("peer") + .long("peer") + .value_name("IP:PORT") + .takes_value(true) + .required(true) + .help("The the IP:PORT for the peer validator/replica to download from"), + ) + .arg( + Arg::with_name("peer_pubkey") + .long("peer-pubkey") + .validator(is_pubkey) + .value_name("The peer validator/replica IDENTITY") + .required(true) + .takes_value(true) + .help("The pubkey for the target validator."), + ) + .arg( + Arg::with_name("account_paths") + .long("accounts") + .value_name("PATHS") + .takes_value(true) + .multiple(true) + .help("Comma separated persistent accounts location"), + ) + .arg( + Arg::with_name("identity") + .short("i") + .long("identity") + .value_name("KEYPAIR") + .takes_value(true) + .validator(is_keypair_or_ask_keyword) + .help("Replica identity keypair"), + ) + .arg( + Arg::with_name("entrypoint") + .short("n") + .long("entrypoint") + .value_name("HOST:PORT") + .takes_value(true) + .multiple(true) + .validator(solana_net_utils::is_host_port) + .help("Rendezvous with the cluster at this gossip entrypoint"), + ) + .arg( + Arg::with_name("bind_address") + .long("bind-address") + .value_name("HOST") + .takes_value(true) + .validator(solana_net_utils::is_host) + .default_value("0.0.0.0") + .help("IP address to bind the replica ports"), + ) + .arg( + Arg::with_name("rpc_bind_address") + .long("rpc-bind-address") + .value_name("HOST") + .takes_value(true) + .validator(solana_net_utils::is_host) + .help("IP address to bind the Json RPC port [default: use --bind-address]"), + ) + .arg( + Arg::with_name("rpc_port") + .long("rpc-port") + .value_name("PORT") + .takes_value(true) + .validator(solana_validator::port_validator) + .help("Enable JSON RPC on this port, and the next port for the RPC websocket"), + ) + .arg( + Arg::with_name("dynamic_port_range") + .long("dynamic-port-range") + .value_name("MIN_PORT-MAX_PORT") + .takes_value(true) + .default_value(default_dynamic_port_range) + .validator(port_range_validator) + .help("Range to use for dynamically assigned ports"), + ) + .arg( + Arg::with_name("expected_shred_version") + .long("expected-shred-version") + .value_name("VERSION") + .takes_value(true) + .validator(is_parsable::) + .help("Require the shred version be this value"), + ) + .arg( + Arg::with_name("logfile") + .short("o") + .long("log") + .value_name("FILE") + .takes_value(true) + .help( + "Redirect logging to the specified file, '-' for standard error. \ + Sending the SIGUSR1 signal to the validator process will cause it \ + to re-open the log file", + ), + ) + .arg( + Arg::with_name("allow_private_addr") + .long("allow-private-addr") + .takes_value(false) + .help("Allow contacting private ip addresses") + .hidden(true), + ) + .get_matches(); + + let bind_address = solana_net_utils::parse_host(matches.value_of("bind_address").unwrap()) + .expect("invalid bind_address"); + + let rpc_bind_address = if let Some(rpc_bind_address) = matches.value_of("rpc_bind_address") { + solana_net_utils::parse_host(rpc_bind_address).expect("invalid rpc_bind_address") + } else { + bind_address + }; + + let identity_keypair = keypair_of(&matches, "identity").unwrap_or_else(|| { + clap::Error::with_description( + "The --identity argument is required", + clap::ErrorKind::ArgumentNotFound, + ) + .exit(); + }); + + let peer_pubkey = value_t!(matches, "peer_pubkey", Pubkey).unwrap(); + + let entrypoint_addrs = values_t!(matches, "entrypoint", String) + .unwrap_or_default() + .into_iter() + .map(|entrypoint| { + solana_net_utils::parse_host_port(&entrypoint).unwrap_or_else(|e| { + eprintln!("failed to parse entrypoint address: {}", e); + exit(1); + }) + }) + .collect::>() + .into_iter() + .collect::>(); + + let expected_shred_version = value_t!(matches, "expected_shred_version", u16) + .ok() + .or_else(|| replica_util::get_cluster_shred_version(&entrypoint_addrs)); + + let gossip_host: IpAddr = matches + .value_of("gossip_host") + .map(|gossip_host| { + solana_net_utils::parse_host(gossip_host).unwrap_or_else(|err| { + eprintln!("Failed to parse --gossip-host: {}", err); + exit(1); + }) + }) + .unwrap_or_else(|| { + if !entrypoint_addrs.is_empty() { + let mut order: Vec<_> = (0..entrypoint_addrs.len()).collect(); + order.shuffle(&mut thread_rng()); + + let gossip_host = order.into_iter().find_map(|i| { + let entrypoint_addr = &entrypoint_addrs[i]; + info!( + "Contacting {} to determine the validator's public IP address", + entrypoint_addr + ); + solana_net_utils::get_public_ip_addr(entrypoint_addr).map_or_else( + |err| { + eprintln!( + "Failed to contact cluster entrypoint {}: {}", + entrypoint_addr, err + ); + None + }, + Some, + ) + }); + + gossip_host.unwrap_or_else(|| { + eprintln!("Unable to determine the validator's public IP address"); + exit(1); + }) + } else { + std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)) + } + }); + + let gossip_addr = SocketAddr::new( + gossip_host, + value_t!(matches, "gossip_port", u16).unwrap_or_else(|_| { + solana_net_utils::find_available_port_in_range(bind_address, (0, 1)).unwrap_or_else( + |err| { + eprintln!("Unable to find an available gossip port: {}", err); + exit(1); + }, + ) + }), + ); + + let dynamic_port_range = + solana_net_utils::parse_port_range(matches.value_of("dynamic_port_range").unwrap()) + .expect("invalid dynamic_port_range"); + + let cluster_entrypoints = entrypoint_addrs + .iter() + .map(ContactInfo::new_gossip_entry_point) + .collect::>(); + + let node = Node::new_with_external_ip( + &identity_keypair.pubkey(), + &gossip_addr, + dynamic_port_range, + bind_address, + ); + + let ledger_path = PathBuf::from(matches.value_of("ledger_path").unwrap()); + let snapshot_output_dir = if let Some(snapshots) = matches.value_of("snapshots") { + PathBuf::from(snapshots) + } else { + ledger_path.clone() + }; + let snapshot_path = snapshot_output_dir.join("snapshot"); + + let account_paths: Vec = + if let Ok(account_paths) = values_t!(matches, "account_paths", String) { + account_paths + .join(",") + .split(',') + .map(PathBuf::from) + .collect() + } else { + vec![ledger_path.join("accounts")] + }; + + let rpc_source_addr = + solana_net_utils::parse_host_port(matches.value_of("peer").unwrap_or_else(|| { + clap::Error::with_description( + "The --peer argument is required", + clap::ErrorKind::ArgumentNotFound, + ) + .exit(); + })) + .unwrap_or_else(|e| { + eprintln!("failed to parse entrypoint address: {}", e); + exit(1); + }); + + let rpc_port = value_t!(matches, "rpc_port", u16).unwrap_or_else(|_| { + clap::Error::with_description( + "The --rpc-port argument is required", + clap::ErrorKind::ArgumentNotFound, + ) + .exit(); + }); + let rpc_addrs = ( + SocketAddr::new(rpc_bind_address, rpc_port), + SocketAddr::new(rpc_bind_address, rpc_port + 1), + // If additional ports are added, +2 needs to be skipped to avoid a conflict with + // the websocket port (which is +2) in web3.js This odd port shifting is tracked at + // https://github.com/solana-labs/solana/issues/12250 + ); + + let logfile = { + let logfile = matches + .value_of("logfile") + .map(|s| s.into()) + .unwrap_or_else(|| format!("solana-replica-node-{}.log", identity_keypair.pubkey())); + + if logfile == "-" { + None + } else { + println!("log file: {}", logfile); + Some(logfile) + } + }; + let socket_addr_space = SocketAddrSpace::new(matches.is_present("allow_private_addr")); + + let _logger_thread = solana_validator::redirect_stderr_to_file(logfile); + + let (cluster_info, rpc_contact_info, snapshot_info) = replica_util::get_rpc_peer_info( + identity_keypair, + &cluster_entrypoints, + &ledger_path, + &node, + expected_shred_version, + &peer_pubkey, + &snapshot_output_dir, + socket_addr_space, + ); + + info!( + "Using RPC service from node {}: {:?}, snapshot_info: {:?}", + rpc_contact_info.id, rpc_contact_info.rpc, snapshot_info + ); + + let config = ReplicaNodeConfig { + rpc_source_addr, + rpc_addr: rpc_addrs.0, + rpc_pubsub_addr: rpc_addrs.1, + ledger_path, + snapshot_output_dir, + snapshot_path, + account_paths, + snapshot_info: snapshot_info.unwrap(), + cluster_info, + rpc_config: JsonRpcConfig::default(), + snapshot_config: None, + pubsub_config: PubSubConfig::default(), + socket_addr_space, + account_indexes: AccountSecondaryIndexes::default(), + accounts_db_caching_enabled: false, + replica_exit: Arc::new(RwLock::new(Exit::default())), + }; + + let validator = ReplicaNode::new(config); + validator.join(); +} diff --git a/replica-node/src/replica_node.rs b/replica-node/src/replica_node.rs new file mode 100644 index 0000000000..c37fa5bc85 --- /dev/null +++ b/replica-node/src/replica_node.rs @@ -0,0 +1,302 @@ +use { + crossbeam_channel::unbounded, + log::*, + solana_download_utils::download_snapshot, + solana_genesis_utils::download_then_check_genesis_hash, + solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, + solana_ledger::{ + blockstore::Blockstore, blockstore_db::AccessType, blockstore_processor, + leader_schedule_cache::LeaderScheduleCache, + }, + solana_rpc::{ + max_slots::MaxSlots, + optimistically_confirmed_bank_tracker::{ + OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker, + }, + rpc::JsonRpcConfig, + rpc_pubsub_service::{PubSubConfig, PubSubService}, + rpc_service::JsonRpcService, + rpc_subscriptions::RpcSubscriptions, + }, + solana_runtime::{ + accounts_index::AccountSecondaryIndexes, + bank_forks::BankForks, + commitment::BlockCommitmentCache, + hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, + snapshot_config::SnapshotConfig, + snapshot_utils::{self, ArchiveFormat}, + }, + solana_sdk::{clock::Slot, exit::Exit, genesis_config::GenesisConfig, hash::Hash}, + solana_streamer::socket::SocketAddrSpace, + std::{ + fs, + net::SocketAddr, + path::PathBuf, + sync::{ + atomic::{AtomicBool, AtomicU64}, + Arc, RwLock, + }, + }, +}; + +pub struct ReplicaNodeConfig { + pub rpc_source_addr: SocketAddr, + pub rpc_addr: SocketAddr, + pub rpc_pubsub_addr: SocketAddr, + pub ledger_path: PathBuf, + pub snapshot_output_dir: PathBuf, + pub snapshot_path: PathBuf, + pub account_paths: Vec, + pub snapshot_info: (Slot, Hash), + pub cluster_info: Arc, + pub rpc_config: JsonRpcConfig, + pub snapshot_config: Option, + pub pubsub_config: PubSubConfig, + pub account_indexes: AccountSecondaryIndexes, + pub accounts_db_caching_enabled: bool, + pub replica_exit: Arc>, + pub socket_addr_space: SocketAddrSpace, +} + +pub struct ReplicaNode { + json_rpc_service: Option, + pubsub_service: Option, + optimistically_confirmed_bank_tracker: Option, +} + +// Struct maintaining information about banks +struct ReplicaBankInfo { + bank_forks: Arc>, + optimistically_confirmed_bank: Arc>, + leader_schedule_cache: Arc, + block_commitment_cache: Arc>, +} + +// Initialize the replica by downloading snapshot from the peer, initialize +// the BankForks, OptimisticallyConfirmedBank, LeaderScheduleCache and +// BlockCommitmentCache and return the info wrapped as ReplicaBankInfo. +fn initialize_from_snapshot( + replica_config: &ReplicaNodeConfig, + snapshot_config: &SnapshotConfig, + genesis_config: &GenesisConfig, +) -> ReplicaBankInfo { + info!( + "Downloading snapshot from the peer into {:?}", + replica_config.snapshot_output_dir + ); + + download_snapshot( + &replica_config.rpc_source_addr, + &replica_config.snapshot_output_dir, + replica_config.snapshot_info, + false, + snapshot_config.maximum_snapshots_to_retain, + &mut None, + ) + .unwrap(); + + fs::create_dir_all(&snapshot_config.snapshot_path).expect("Couldn't create snapshot directory"); + + let archive_info = + snapshot_utils::get_highest_full_snapshot_archive_info(&replica_config.snapshot_output_dir) + .unwrap(); + + let process_options = blockstore_processor::ProcessOptions { + account_indexes: replica_config.account_indexes.clone(), + accounts_db_caching_enabled: replica_config.accounts_db_caching_enabled, + ..blockstore_processor::ProcessOptions::default() + }; + + info!( + "Build bank from snapshot archive: {:?}", + &snapshot_config.snapshot_path + ); + let (bank0, _) = snapshot_utils::bank_from_snapshot_archives( + &replica_config.account_paths, + &[], + &snapshot_config.snapshot_path, + archive_info.path(), + None, + *archive_info.archive_format(), + genesis_config, + process_options.debug_keys.clone(), + None, + process_options.account_indexes.clone(), + process_options.accounts_db_caching_enabled, + process_options.limit_load_slot_count_from_snapshot, + process_options.shrink_ratio, + process_options.accounts_db_test_hash_calculation, + process_options.verify_index, + ) + .unwrap(); + + let bank0_slot = bank0.slot(); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0)); + + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0))); + + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + + let mut block_commitment_cache = BlockCommitmentCache::default(); + block_commitment_cache.initialize_slots(bank0_slot); + let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache)); + + ReplicaBankInfo { + bank_forks, + optimistically_confirmed_bank, + leader_schedule_cache, + block_commitment_cache, + } +} + +fn start_client_rpc_services( + replica_config: &ReplicaNodeConfig, + genesis_config: &GenesisConfig, + cluster_info: Arc, + bank_info: &ReplicaBankInfo, + socket_addr_space: &SocketAddrSpace, +) -> ( + Option, + Option, + Option, +) { + let ReplicaBankInfo { + bank_forks, + optimistically_confirmed_bank, + leader_schedule_cache, + block_commitment_cache, + } = bank_info; + let blockstore = Arc::new( + Blockstore::open_with_access_type( + &replica_config.ledger_path, + AccessType::PrimaryOnly, + None, + false, + ) + .unwrap(), + ); + + let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(0)); + + let max_slots = Arc::new(MaxSlots::default()); + let exit = Arc::new(AtomicBool::new(false)); + + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + bank_forks.clone(), + block_commitment_cache.clone(), + optimistically_confirmed_bank.clone(), + )); + + let rpc_override_health_check = Arc::new(AtomicBool::new(false)); + if ContactInfo::is_valid_address(&replica_config.rpc_addr, socket_addr_space) { + assert!(ContactInfo::is_valid_address( + &replica_config.rpc_pubsub_addr, + socket_addr_space + )); + } else { + assert!(!ContactInfo::is_valid_address( + &replica_config.rpc_pubsub_addr, + socket_addr_space + )); + } + + let (_bank_notification_sender, bank_notification_receiver) = unbounded(); + ( + Some(JsonRpcService::new( + replica_config.rpc_addr, + replica_config.rpc_config.clone(), + replica_config.snapshot_config.clone(), + bank_forks.clone(), + block_commitment_cache.clone(), + blockstore, + cluster_info, + None, + genesis_config.hash(), + &replica_config.ledger_path, + replica_config.replica_exit.clone(), + None, + rpc_override_health_check, + optimistically_confirmed_bank.clone(), + 0, + 0, + max_slots, + leader_schedule_cache.clone(), + max_complete_transaction_status_slot, + )), + Some(PubSubService::new( + replica_config.pubsub_config.clone(), + &subscriptions, + replica_config.rpc_pubsub_addr, + &exit, + )), + Some(OptimisticallyConfirmedBankTracker::new( + bank_notification_receiver, + &exit, + bank_forks.clone(), + optimistically_confirmed_bank.clone(), + subscriptions.clone(), + )), + ) +} + +impl ReplicaNode { + pub fn new(replica_config: ReplicaNodeConfig) -> Self { + let genesis_config = download_then_check_genesis_hash( + &replica_config.rpc_source_addr, + &replica_config.ledger_path, + None, + MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, + false, + true, + ) + .unwrap(); + + let snapshot_config = SnapshotConfig { + snapshot_interval_slots: std::u64::MAX, + snapshot_package_output_path: replica_config.snapshot_output_dir.clone(), + snapshot_path: replica_config.snapshot_path.clone(), + archive_format: ArchiveFormat::TarBzip2, + snapshot_version: snapshot_utils::SnapshotVersion::default(), + maximum_snapshots_to_retain: + snapshot_utils::DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, + }; + + let bank_info = + initialize_from_snapshot(&replica_config, &snapshot_config, &genesis_config); + + let (json_rpc_service, pubsub_service, optimistically_confirmed_bank_tracker) = + start_client_rpc_services( + &replica_config, + &genesis_config, + replica_config.cluster_info.clone(), + &bank_info, + &replica_config.socket_addr_space, + ); + + ReplicaNode { + json_rpc_service, + pubsub_service, + optimistically_confirmed_bank_tracker, + } + } + + pub fn join(self) { + if let Some(json_rpc_service) = self.json_rpc_service { + json_rpc_service.join().expect("rpc_service"); + } + + if let Some(pubsub_service) = self.pubsub_service { + pubsub_service.join().expect("pubsub_service"); + } + + if let Some(optimistically_confirmed_bank_tracker) = + self.optimistically_confirmed_bank_tracker + { + optimistically_confirmed_bank_tracker + .join() + .expect("optimistically_confirmed_bank_tracker"); + } + } +} diff --git a/replica-node/src/replica_util.rs b/replica-node/src/replica_util.rs new file mode 100644 index 0000000000..ba7f8dc5e4 --- /dev/null +++ b/replica-node/src/replica_util.rs @@ -0,0 +1,268 @@ +use { + log::*, + rand::{seq::SliceRandom, thread_rng, Rng}, + solana_gossip::{ + cluster_info::{ClusterInfo, Node}, + contact_info::ContactInfo, + gossip_service::GossipService, + }, + solana_runtime::snapshot_utils, + solana_sdk::{ + clock::Slot, + hash::Hash, + pubkey::Pubkey, + signature::{Keypair, Signer}, + }, + solana_streamer::socket::SocketAddrSpace, + std::{ + collections::HashSet, + net::{SocketAddr, UdpSocket}, + path::Path, + process::exit, + sync::{atomic::AtomicBool, Arc}, + thread::sleep, + time::{Duration, Instant}, + }, +}; + +pub fn get_cluster_shred_version(entrypoints: &[SocketAddr]) -> Option { + let entrypoints = { + let mut index: Vec<_> = (0..entrypoints.len()).collect(); + index.shuffle(&mut rand::thread_rng()); + index.into_iter().map(|i| &entrypoints[i]) + }; + for entrypoint in entrypoints { + match solana_net_utils::get_cluster_shred_version(entrypoint) { + Err(err) => eprintln!("get_cluster_shred_version failed: {}, {}", entrypoint, err), + Ok(0) => eprintln!("zero sherd-version from entrypoint: {}", entrypoint), + Ok(shred_version) => { + info!( + "obtained shred-version {} from {}", + shred_version, entrypoint + ); + return Some(shred_version); + } + } + } + None +} + +// Discover the RPC peer node via Gossip and return's ContactInfo +// And the initial snapshot info: (Slot, Hash) +// Alternatively, this can be solved via a RPC call instead of using gossip. +fn get_rpc_peer_node( + cluster_info: &ClusterInfo, + cluster_entrypoints: &[ContactInfo], + expected_shred_version: Option, + peer_pubkey: &Pubkey, + snapshot_output_dir: &Path, +) -> Option<(ContactInfo, Option<(Slot, Hash)>)> { + let mut newer_cluster_snapshot_timeout = None; + let mut retry_reason = None; + loop { + sleep(Duration::from_secs(1)); + info!("Searching for the rpc peer node and latest snapshot information with shred_version {:?}.", expected_shred_version); + info!("\n{}", cluster_info.rpc_info_trace()); + + let shred_version = + expected_shred_version.unwrap_or_else(|| cluster_info.my_shred_version()); + if shred_version == 0 { + let all_zero_shred_versions = cluster_entrypoints.iter().all(|cluster_entrypoint| { + cluster_info + .lookup_contact_info_by_gossip_addr(&cluster_entrypoint.gossip) + .map_or(false, |entrypoint| entrypoint.shred_version == 0) + }); + + if all_zero_shred_versions { + eprintln!( + "Entrypoint shred version is zero. Restart with --expected-shred-version" + ); + exit(1); + } + info!("Waiting to adopt entrypoint shred version..."); + continue; + } + + info!( + "Searching for an RPC service with shred version {}{}...", + shred_version, + retry_reason + .as_ref() + .map(|s| format!(" (Retrying: {})", s)) + .unwrap_or_default() + ); + + let rpc_peers = cluster_info + .all_rpc_peers() + .into_iter() + .filter(|contact_info| contact_info.shred_version == shred_version) + .collect::>(); + let rpc_peers_total = rpc_peers.len(); + + let rpc_peers_trusted = rpc_peers + .iter() + .filter(|rpc_peer| &rpc_peer.id == peer_pubkey) + .count(); + + info!( + "Total {} RPC nodes found. {} trusted", + rpc_peers_total, rpc_peers_trusted + ); + + let mut highest_snapshot_info: Option<(Slot, Hash)> = + snapshot_utils::get_highest_full_snapshot_archive_info(snapshot_output_dir).map( + |snapshot_archive_info| { + (*snapshot_archive_info.slot(), *snapshot_archive_info.hash()) + }, + ); + let eligible_rpc_peers = { + let mut eligible_rpc_peers = vec![]; + + for rpc_peer in rpc_peers.iter() { + if &rpc_peer.id != peer_pubkey { + continue; + } + cluster_info.get_snapshot_hash_for_node(&rpc_peer.id, |snapshot_hashes| { + for snapshot_hash in snapshot_hashes { + if highest_snapshot_info.is_none() + || snapshot_hash.0 > highest_snapshot_info.unwrap().0 + { + // Found a higher snapshot, remove all nodes with a lower snapshot + eligible_rpc_peers.clear(); + highest_snapshot_info = Some(*snapshot_hash) + } + + if Some(*snapshot_hash) == highest_snapshot_info { + eligible_rpc_peers.push(rpc_peer.clone()); + } + } + }); + } + + match highest_snapshot_info { + None => { + assert!(eligible_rpc_peers.is_empty()); + } + Some(highest_snapshot_info) => { + if eligible_rpc_peers.is_empty() { + match newer_cluster_snapshot_timeout { + None => newer_cluster_snapshot_timeout = Some(Instant::now()), + Some(newer_cluster_snapshot_timeout) => { + if newer_cluster_snapshot_timeout.elapsed().as_secs() > 180 { + warn!("giving up newer snapshot from the cluster"); + return None; + } + } + } + retry_reason = Some(format!( + "Wait for newer snapshot than local: {:?}", + highest_snapshot_info + )); + continue; + } + + info!( + "Highest available snapshot slot is {}, available from {} node{}: {:?}", + highest_snapshot_info.0, + eligible_rpc_peers.len(), + if eligible_rpc_peers.len() > 1 { + "s" + } else { + "" + }, + eligible_rpc_peers + .iter() + .map(|contact_info| contact_info.id) + .collect::>() + ); + } + } + eligible_rpc_peers + }; + + if !eligible_rpc_peers.is_empty() { + let contact_info = + &eligible_rpc_peers[thread_rng().gen_range(0, eligible_rpc_peers.len())]; + return Some((contact_info.clone(), highest_snapshot_info)); + } else { + retry_reason = Some("No snapshots available".to_owned()); + } + } +} + +fn start_gossip_node( + identity_keypair: Arc, + cluster_entrypoints: &[ContactInfo], + ledger_path: &Path, + gossip_addr: &SocketAddr, + gossip_socket: UdpSocket, + expected_shred_version: Option, + gossip_validators: Option>, + should_check_duplicate_instance: bool, + socket_addr_space: SocketAddrSpace, +) -> (Arc, Arc, GossipService) { + let contact_info = ClusterInfo::gossip_contact_info( + identity_keypair.pubkey(), + *gossip_addr, + expected_shred_version.unwrap_or(0), + ); + let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space); + cluster_info.set_entrypoints(cluster_entrypoints.to_vec()); + cluster_info.restore_contact_info(ledger_path, 0); + let cluster_info = Arc::new(cluster_info); + + let gossip_exit_flag = Arc::new(AtomicBool::new(false)); + let gossip_service = GossipService::new( + &cluster_info, + None, + gossip_socket, + gossip_validators, + should_check_duplicate_instance, + &gossip_exit_flag, + ); + info!("Started gossip node"); + info!( + "The cluster contact info:\n{}", + cluster_info.contact_info_trace() + ); + + (cluster_info, gossip_exit_flag, gossip_service) +} + +// Get the RPC peer info given the peer's Pubkey +// Returns the ClusterInfo, the peer's ContactInfo and the initial snapshot info +pub fn get_rpc_peer_info( + identity_keypair: Keypair, + cluster_entrypoints: &[ContactInfo], + ledger_path: &Path, + node: &Node, + expected_shred_version: Option, + peer_pubkey: &Pubkey, + snapshot_output_dir: &Path, + socket_addr_space: SocketAddrSpace, +) -> (Arc, ContactInfo, Option<(Slot, Hash)>) { + let identity_keypair = Arc::new(identity_keypair); + + let gossip = start_gossip_node( + identity_keypair, + cluster_entrypoints, + ledger_path, + &node.info.gossip, + node.sockets.gossip.try_clone().unwrap(), + expected_shred_version, + None, + true, + socket_addr_space, + ); + + let rpc_node_details = get_rpc_peer_node( + &gossip.0, + cluster_entrypoints, + expected_shred_version, + peer_pubkey, + snapshot_output_dir, + ); + let rpc_node_details = rpc_node_details.unwrap(); + + (gossip.0, rpc_node_details.0, rpc_node_details.1) +} diff --git a/replica-node/tests/local_replica.rs b/replica-node/tests/local_replica.rs new file mode 100644 index 0000000000..eaf972ea6b --- /dev/null +++ b/replica-node/tests/local_replica.rs @@ -0,0 +1,278 @@ +#![allow(clippy::integer_arithmetic)] +use { + log::*, + serial_test::serial, + solana_core::validator::ValidatorConfig, + solana_gossip::{cluster_info::Node, contact_info::ContactInfo}, + solana_local_cluster::{ + cluster::Cluster, + local_cluster::{ClusterConfig, LocalCluster}, + validator_configs::*, + }, + solana_replica_node::{ + replica_node::{ReplicaNode, ReplicaNodeConfig}, + replica_util, + }, + solana_rpc::{rpc::JsonRpcConfig, rpc_pubsub_service::PubSubConfig}, + solana_runtime::{ + accounts_index::AccountSecondaryIndexes, + snapshot_config::SnapshotConfig, + snapshot_utils::{self, ArchiveFormat}, + }, + solana_sdk::{ + client::SyncClient, + clock::Slot, + commitment_config::CommitmentConfig, + epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, + exit::Exit, + hash::Hash, + signature::{Keypair, Signer}, + }, + solana_streamer::socket::SocketAddrSpace, + std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::{Path, PathBuf}, + sync::{Arc, RwLock}, + thread::sleep, + time::Duration, + }, + tempfile::TempDir, +}; + +const RUST_LOG_FILTER: &str = + "error,solana_core::replay_stage=warn,solana_local_cluster=info,local_cluster=info"; + +fn wait_for_next_snapshot( + cluster: &LocalCluster, + snapshot_package_output_path: &Path, +) -> (PathBuf, (Slot, Hash)) { + // Get slot after which this was generated + let client = cluster + .get_validator_client(&cluster.entry_point_info.id) + .unwrap(); + let last_slot = client + .get_slot_with_commitment(CommitmentConfig::processed()) + .expect("Couldn't get slot"); + + // Wait for a snapshot for a bank >= last_slot to be made so we know that the snapshot + // must include the transactions just pushed + trace!( + "Waiting for snapshot archive to be generated with slot > {}", + last_slot + ); + loop { + if let Some(full_snapshot_archive_info) = + snapshot_utils::get_highest_full_snapshot_archive_info(snapshot_package_output_path) + { + trace!( + "full snapshot for slot {} exists", + full_snapshot_archive_info.slot() + ); + if *full_snapshot_archive_info.slot() >= last_slot { + return ( + full_snapshot_archive_info.path().clone(), + ( + *full_snapshot_archive_info.slot(), + *full_snapshot_archive_info.hash(), + ), + ); + } + trace!( + "full snapshot slot {} < last_slot {}", + full_snapshot_archive_info.slot(), + last_slot + ); + } + sleep(Duration::from_millis(1000)); + } +} + +fn farf_dir() -> PathBuf { + std::env::var("FARF_DIR") + .unwrap_or_else(|_| "farf".to_string()) + .into() +} + +fn generate_account_paths(num_account_paths: usize) -> (Vec, Vec) { + let account_storage_dirs: Vec = (0..num_account_paths) + .map(|_| tempfile::tempdir_in(farf_dir()).unwrap()) + .collect(); + let account_storage_paths: Vec<_> = account_storage_dirs + .iter() + .map(|a| a.path().to_path_buf()) + .collect(); + (account_storage_dirs, account_storage_paths) +} + +struct SnapshotValidatorConfig { + _snapshot_dir: TempDir, + snapshot_archives_dir: TempDir, + account_storage_dirs: Vec, + validator_config: ValidatorConfig, +} + +fn setup_snapshot_validator_config( + snapshot_interval_slots: u64, + num_account_paths: usize, +) -> SnapshotValidatorConfig { + // Create the snapshot config + let snapshot_dir = tempfile::tempdir_in(farf_dir()).unwrap(); + let snapshot_archives_dir = tempfile::tempdir_in(farf_dir()).unwrap(); + let snapshot_config = SnapshotConfig { + snapshot_interval_slots, + snapshot_package_output_path: snapshot_archives_dir.path().to_path_buf(), + snapshot_path: snapshot_dir.path().to_path_buf(), + archive_format: ArchiveFormat::TarBzip2, + snapshot_version: snapshot_utils::SnapshotVersion::default(), + maximum_snapshots_to_retain: snapshot_utils::DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, + }; + + // Create the account paths + let (account_storage_dirs, account_storage_paths) = generate_account_paths(num_account_paths); + + // Create the validator config + let validator_config = ValidatorConfig { + snapshot_config: Some(snapshot_config), + account_paths: account_storage_paths, + accounts_hash_interval_slots: snapshot_interval_slots, + ..ValidatorConfig::default() + }; + + SnapshotValidatorConfig { + _snapshot_dir: snapshot_dir, + snapshot_archives_dir, + account_storage_dirs, + validator_config, + } +} + +fn test_local_cluster_start_and_exit_with_config(socket_addr_space: SocketAddrSpace) { + solana_logger::setup(); + const NUM_NODES: usize = 1; + let mut config = ClusterConfig { + validator_configs: make_identical_validator_configs(&ValidatorConfig::default(), NUM_NODES), + node_stakes: vec![3; NUM_NODES], + cluster_lamports: 100, + ticks_per_slot: 8, + slots_per_epoch: MINIMUM_SLOTS_PER_EPOCH as u64, + stakers_slot_offset: MINIMUM_SLOTS_PER_EPOCH as u64, + ..ClusterConfig::default() + }; + let cluster = LocalCluster::new(&mut config, socket_addr_space); + assert_eq!(cluster.validators.len(), NUM_NODES); +} + +#[test] +#[serial] +fn test_replica_bootstrap() { + let socket_addr_space = SocketAddrSpace::new(true); + + test_local_cluster_start_and_exit_with_config(socket_addr_space); + + solana_logger::setup_with_default(RUST_LOG_FILTER); + // First set up the cluster with 1 node + let snapshot_interval_slots = 50; + let num_account_paths = 3; + + let leader_snapshot_test_config = + setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths); + + info!( + "Snapshot config for the leader: accounts: {:?}, snapshot: {:?}", + leader_snapshot_test_config.account_storage_dirs, + leader_snapshot_test_config.snapshot_archives_dir + ); + let stake = 10_000; + let mut config = ClusterConfig { + node_stakes: vec![stake], + cluster_lamports: 1_000_000, + validator_configs: make_identical_validator_configs( + &leader_snapshot_test_config.validator_config, + 1, + ), + ..ClusterConfig::default() + }; + + let cluster = LocalCluster::new(&mut config, socket_addr_space); + + assert_eq!(cluster.validators.len(), 1); + let contact_info = &cluster.entry_point_info; + + info!("Contact info: {:?}", contact_info); + + // Get slot after which this was generated + let snapshot_package_output_path = &leader_snapshot_test_config + .validator_config + .snapshot_config + .as_ref() + .unwrap() + .snapshot_package_output_path; + info!("Waiting for snapshot"); + let (archive_filename, archive_snapshot_hash) = + wait_for_next_snapshot(&cluster, snapshot_package_output_path); + info!("found: {:?}", archive_filename); + + let identity_keypair = Keypair::new(); + + // now bring up a replica to talk to it. + let ip_addr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); + let port = solana_net_utils::find_available_port_in_range(ip_addr, (8101, 8200)).unwrap(); + let rpc_addr = SocketAddr::new(ip_addr, port); + + let port = solana_net_utils::find_available_port_in_range(ip_addr, (8201, 8300)).unwrap(); + + let rpc_pubsub_addr = SocketAddr::new(ip_addr, port); + let ledger_dir = tempfile::tempdir_in(farf_dir()).unwrap(); + let ledger_path = ledger_dir.path(); + let snapshot_output_dir = tempfile::tempdir_in(farf_dir()).unwrap(); + let snapshot_output_path = snapshot_output_dir.path(); + let snapshot_path = snapshot_output_path.join("snapshot"); + let account_paths: Vec = vec![ledger_path.join("accounts")]; + + let port = solana_net_utils::find_available_port_in_range(ip_addr, (8301, 8400)).unwrap(); + let gossip_addr = SocketAddr::new(ip_addr, port); + + let dynamic_port_range = solana_net_utils::parse_port_range("8401-8500").unwrap(); + let bind_address = solana_net_utils::parse_host("127.0.0.1").unwrap(); + let node = Node::new_with_external_ip( + &identity_keypair.pubkey(), + &gossip_addr, + dynamic_port_range, + bind_address, + ); + + info!("The peer id: {:?}", &contact_info.id); + let entry_points = vec![ContactInfo::new_gossip_entry_point(&contact_info.gossip)]; + let (cluster_info, _rpc_contact_info, _snapshot_info) = replica_util::get_rpc_peer_info( + identity_keypair, + &entry_points, + ledger_path, + &node, + None, + &contact_info.id, + snapshot_output_path, + socket_addr_space, + ); + + info!("The cluster info:\n{:?}", cluster_info.contact_info_trace()); + + let config = ReplicaNodeConfig { + rpc_source_addr: contact_info.rpc, + rpc_addr, + rpc_pubsub_addr, + ledger_path: ledger_path.to_path_buf(), + snapshot_output_dir: snapshot_output_path.to_path_buf(), + snapshot_path, + account_paths, + snapshot_info: archive_snapshot_hash, + cluster_info, + rpc_config: JsonRpcConfig::default(), + snapshot_config: None, + pubsub_config: PubSubConfig::default(), + socket_addr_space, + account_indexes: AccountSecondaryIndexes::default(), + accounts_db_caching_enabled: false, + replica_exit: Arc::new(RwLock::new(Exit::default())), + }; + let _replica_node = ReplicaNode::new(config); +}