diff --git a/Cargo.toml b/Cargo.toml index 12fe01d84f..f0c26403e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,6 +68,7 @@ atty = "0.2" bincode = "1.0.0" bs58 = "0.2.0" byteorder = "1.2.1" +bytes = "0.4" chrono = { version = "0.4.0", features = ["serde"] } clap = "2.31" dirs = "1.0.2" diff --git a/src/bin/drone.rs b/src/bin/drone.rs index 1194e93a5c..8cd1a8ca0c 100644 --- a/src/bin/drone.rs +++ b/src/bin/drone.rs @@ -1,4 +1,5 @@ extern crate bincode; +extern crate bytes; #[macro_use] extern crate clap; extern crate serde_json; @@ -6,7 +7,8 @@ extern crate solana; extern crate tokio; extern crate tokio_codec; -use bincode::deserialize; +use bincode::{deserialize, serialize}; +use bytes::Bytes; use clap::{App, Arg}; use solana::crdt::NodeInfo; use solana::drone::{Drone, DroneRequest, DRONE_PORT}; @@ -18,6 +20,7 @@ use solana::signature::read_keypair; use solana::thin_client::poll_gossip_for_leader; use std::error; use std::fs::File; +use std::io; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::process::exit; use std::sync::{Arc, Mutex}; @@ -150,32 +153,43 @@ fn main() -> Result<(), Box> { let drone2 = drone.clone(); // let client_ip = socket.peer_addr().expect("drone peer_addr").ip(); let framed = BytesCodec::new().framed(socket); - let (_writer, reader) = framed.split(); + let (writer, reader) = framed.split(); - let processor = reader - .for_each(move |bytes| { - let req: DroneRequest = deserialize(&bytes).or_else(|err| { - use std::io; - Err(io::Error::new( - io::ErrorKind::Other, - format!("deserialize packet in drone: {:?}", err), - )) - })?; + let processor = reader.and_then(move |bytes| { + let req: DroneRequest = deserialize(&bytes).or_else(|err| { + Err(io::Error::new( + io::ErrorKind::Other, + format!("deserialize packet in drone: {:?}", err), + )) + })?; - println!("Airdrop requested..."); - // let res = drone2.lock().unwrap().check_rate_limit(client_ip); - let res1 = drone2.lock().unwrap().send_airdrop(req); - match res1 { - Ok(_) => println!("Airdrop sent!"), - Err(_) => println!("Request limit reached for this time slice"), - } - Ok(()) - }) - .then(|result| { - println!("Socket closed with result: {:?}", result); - Ok(()) - }); - tokio::spawn(processor) + println!("Airdrop requested..."); + // let res = drone2.lock().unwrap().check_rate_limit(client_ip); + let res1 = drone2.lock().unwrap().send_airdrop(req); + match res1 { + Ok(_) => println!("Airdrop sent!"), + Err(_) => println!("Request limit reached for this time slice"), + } + let response = res1?; + println!("Airdrop tx signature: {:?}", response); + let response_vec = serialize(&response).or_else(|err| { + Err(io::Error::new( + io::ErrorKind::Other, + format!("serialize signature in drone: {:?}", err), + )) + })?; + let response_bytes = Bytes::from(response_vec.clone()); + Ok(response_bytes) + }); + let server = writer + .send_all(processor.or_else(|err| { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Drone response: {:?}", err), + )) + })) + .then(|_| Ok(())); + tokio::spawn(server) }); tokio::run(done); Ok(())