Add missing files
This commit is contained in:
		
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							@@ -1,3 +1,4 @@
 | 
				
			|||||||
Cargo.lock
 | 
					Cargo.lock
 | 
				
			||||||
/target/
 | 
					/target/
 | 
				
			||||||
**/*.rs.bk
 | 
					**/*.rs.bk
 | 
				
			||||||
 | 
					.cargo
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										99
									
								
								src/request.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										99
									
								
								src/request.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,99 @@
 | 
				
			|||||||
 | 
					//! The `request` module defines the messages for the thin client.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use bincode::serialize;
 | 
				
			||||||
 | 
					use hash::Hash;
 | 
				
			||||||
 | 
					use packet;
 | 
				
			||||||
 | 
					use packet::SharedPackets;
 | 
				
			||||||
 | 
					use signature::PublicKey;
 | 
				
			||||||
 | 
					use transaction::Transaction;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
 | 
				
			||||||
 | 
					#[derive(Serialize, Deserialize, Debug, Clone)]
 | 
				
			||||||
 | 
					pub enum Request {
 | 
				
			||||||
 | 
					    Transaction(Transaction),
 | 
				
			||||||
 | 
					    GetBalance { key: PublicKey },
 | 
				
			||||||
 | 
					    Subscribe { subscriptions: Vec<Subscription> },
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[derive(Serialize, Deserialize, Debug, Clone)]
 | 
				
			||||||
 | 
					pub enum Subscription {
 | 
				
			||||||
 | 
					    EntryInfo,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[derive(Serialize, Deserialize, Debug, Clone)]
 | 
				
			||||||
 | 
					pub struct EntryInfo {
 | 
				
			||||||
 | 
					    pub id: Hash,
 | 
				
			||||||
 | 
					    pub num_hashes: u64,
 | 
				
			||||||
 | 
					    pub num_events: u64,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl Request {
 | 
				
			||||||
 | 
					    /// Verify the request is valid.
 | 
				
			||||||
 | 
					    pub fn verify(&self) -> bool {
 | 
				
			||||||
 | 
					        match *self {
 | 
				
			||||||
 | 
					            Request::Transaction(ref tr) => tr.verify_plan(),
 | 
				
			||||||
 | 
					            _ => true,
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[derive(Serialize, Deserialize, Debug)]
 | 
				
			||||||
 | 
					pub enum Response {
 | 
				
			||||||
 | 
					    Balance { key: PublicKey, val: Option<i64> },
 | 
				
			||||||
 | 
					    EntryInfo(EntryInfo),
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec<SharedPackets> {
 | 
				
			||||||
 | 
					    let mut out = vec![];
 | 
				
			||||||
 | 
					    for rrs in reqs.chunks(packet::NUM_PACKETS) {
 | 
				
			||||||
 | 
					        let p = r.allocate();
 | 
				
			||||||
 | 
					        p.write()
 | 
				
			||||||
 | 
					            .unwrap()
 | 
				
			||||||
 | 
					            .packets
 | 
				
			||||||
 | 
					            .resize(rrs.len(), Default::default());
 | 
				
			||||||
 | 
					        for (i, o) in rrs.iter().zip(p.write().unwrap().packets.iter_mut()) {
 | 
				
			||||||
 | 
					            let v = serialize(&i).expect("serialize request");
 | 
				
			||||||
 | 
					            let len = v.len();
 | 
				
			||||||
 | 
					            o.data[..len].copy_from_slice(&v);
 | 
				
			||||||
 | 
					            o.meta.size = len;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        out.push(p);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    return out;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[cfg(test)]
 | 
				
			||||||
 | 
					mod tests {
 | 
				
			||||||
 | 
					    use bincode::serialize;
 | 
				
			||||||
 | 
					    use ecdsa;
 | 
				
			||||||
 | 
					    use packet::{PacketRecycler, NUM_PACKETS};
 | 
				
			||||||
 | 
					    use request::{to_request_packets, Request};
 | 
				
			||||||
 | 
					    use transaction::{memfind, test_tx};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    #[test]
 | 
				
			||||||
 | 
					    fn test_layout() {
 | 
				
			||||||
 | 
					        let tr = test_tx();
 | 
				
			||||||
 | 
					        let tx = serialize(&tr).unwrap();
 | 
				
			||||||
 | 
					        let packet = serialize(&Request::Transaction(tr)).unwrap();
 | 
				
			||||||
 | 
					        assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET));
 | 
				
			||||||
 | 
					        assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    #[test]
 | 
				
			||||||
 | 
					    fn test_to_packets() {
 | 
				
			||||||
 | 
					        let tr = Request::Transaction(test_tx());
 | 
				
			||||||
 | 
					        let re = PacketRecycler::default();
 | 
				
			||||||
 | 
					        let rv = to_request_packets(&re, vec![tr.clone(); 1]);
 | 
				
			||||||
 | 
					        assert_eq!(rv.len(), 1);
 | 
				
			||||||
 | 
					        assert_eq!(rv[0].read().unwrap().packets.len(), 1);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS]);
 | 
				
			||||||
 | 
					        assert_eq!(rv.len(), 1);
 | 
				
			||||||
 | 
					        assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS + 1]);
 | 
				
			||||||
 | 
					        assert_eq!(rv.len(), 2);
 | 
				
			||||||
 | 
					        assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
 | 
				
			||||||
 | 
					        assert_eq!(rv[1].read().unwrap().packets.len(), 1);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										232
									
								
								src/request_processor.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										232
									
								
								src/request_processor.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,232 @@
 | 
				
			|||||||
 | 
					//! The `request_stage` processes thin client Request messages.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use accountant::Accountant;
 | 
				
			||||||
 | 
					use bincode::{deserialize, serialize};
 | 
				
			||||||
 | 
					use entry::Entry;
 | 
				
			||||||
 | 
					use event::Event;
 | 
				
			||||||
 | 
					use event_processor::EventProcessor;
 | 
				
			||||||
 | 
					use packet;
 | 
				
			||||||
 | 
					use packet::SharedPackets;
 | 
				
			||||||
 | 
					use rayon::prelude::*;
 | 
				
			||||||
 | 
					use request::{EntryInfo, Request, Response, Subscription};
 | 
				
			||||||
 | 
					use result::Result;
 | 
				
			||||||
 | 
					use std::collections::VecDeque;
 | 
				
			||||||
 | 
					use std::net::{SocketAddr, UdpSocket};
 | 
				
			||||||
 | 
					use std::sync::mpsc::{Receiver, Sender};
 | 
				
			||||||
 | 
					use std::sync::{Arc, Mutex};
 | 
				
			||||||
 | 
					use std::time::Duration;
 | 
				
			||||||
 | 
					use std::time::Instant;
 | 
				
			||||||
 | 
					use streamer;
 | 
				
			||||||
 | 
					use timing;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pub struct RequestProcessor {
 | 
				
			||||||
 | 
					    accountant: Arc<Accountant>,
 | 
				
			||||||
 | 
					    entry_info_subscribers: Mutex<Vec<SocketAddr>>,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl RequestProcessor {
 | 
				
			||||||
 | 
					    /// Create a new Tpu that wraps the given Accountant.
 | 
				
			||||||
 | 
					    pub fn new(accountant: Arc<Accountant>) -> Self {
 | 
				
			||||||
 | 
					        RequestProcessor {
 | 
				
			||||||
 | 
					            accountant,
 | 
				
			||||||
 | 
					            entry_info_subscribers: Mutex::new(vec![]),
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /// Process Request items sent by clients.
 | 
				
			||||||
 | 
					    fn process_request(
 | 
				
			||||||
 | 
					        &self,
 | 
				
			||||||
 | 
					        msg: Request,
 | 
				
			||||||
 | 
					        rsp_addr: SocketAddr,
 | 
				
			||||||
 | 
					    ) -> Option<(Response, SocketAddr)> {
 | 
				
			||||||
 | 
					        match msg {
 | 
				
			||||||
 | 
					            Request::GetBalance { key } => {
 | 
				
			||||||
 | 
					                let val = self.accountant.get_balance(&key);
 | 
				
			||||||
 | 
					                let rsp = (Response::Balance { key, val }, rsp_addr);
 | 
				
			||||||
 | 
					                info!("Response::Balance {:?}", rsp);
 | 
				
			||||||
 | 
					                Some(rsp)
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            Request::Transaction(_) => unreachable!(),
 | 
				
			||||||
 | 
					            Request::Subscribe { subscriptions } => {
 | 
				
			||||||
 | 
					                for subscription in subscriptions {
 | 
				
			||||||
 | 
					                    match subscription {
 | 
				
			||||||
 | 
					                        Subscription::EntryInfo => {
 | 
				
			||||||
 | 
					                            self.entry_info_subscribers.lock().unwrap().push(rsp_addr)
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					                None
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    pub fn process_requests(
 | 
				
			||||||
 | 
					        &self,
 | 
				
			||||||
 | 
					        reqs: Vec<(Request, SocketAddr)>,
 | 
				
			||||||
 | 
					    ) -> Vec<(Response, SocketAddr)> {
 | 
				
			||||||
 | 
					        reqs.into_iter()
 | 
				
			||||||
 | 
					            .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
 | 
				
			||||||
 | 
					            .collect()
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    pub fn notify_entry_info_subscribers(&self, entry: &Entry) {
 | 
				
			||||||
 | 
					        // TODO: No need to bind().
 | 
				
			||||||
 | 
					        let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // copy subscribers to avoid taking lock while doing io
 | 
				
			||||||
 | 
					        let addrs = self.entry_info_subscribers.lock().unwrap().clone();
 | 
				
			||||||
 | 
					        trace!("Sending to {} addrs", addrs.len());
 | 
				
			||||||
 | 
					        for addr in addrs {
 | 
				
			||||||
 | 
					            let entry_info = EntryInfo {
 | 
				
			||||||
 | 
					                id: entry.id,
 | 
				
			||||||
 | 
					                num_hashes: entry.num_hashes,
 | 
				
			||||||
 | 
					                num_events: entry.events.len() as u64,
 | 
				
			||||||
 | 
					            };
 | 
				
			||||||
 | 
					            let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
 | 
				
			||||||
 | 
					            trace!("sending {} to {}", data.len(), addr);
 | 
				
			||||||
 | 
					            //TODO dont do IO here, this needs to be on a separate channel
 | 
				
			||||||
 | 
					            let res = socket.send_to(&data, addr);
 | 
				
			||||||
 | 
					            if res.is_err() {
 | 
				
			||||||
 | 
					                eprintln!("couldn't send response: {:?}", res);
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn deserialize_requests(p: &packet::Packets) -> Vec<Option<(Request, SocketAddr)>> {
 | 
				
			||||||
 | 
					        p.packets
 | 
				
			||||||
 | 
					            .par_iter()
 | 
				
			||||||
 | 
					            .map(|x| {
 | 
				
			||||||
 | 
					                deserialize(&x.data[0..x.meta.size])
 | 
				
			||||||
 | 
					                    .map(|req| (req, x.meta.addr()))
 | 
				
			||||||
 | 
					                    .ok()
 | 
				
			||||||
 | 
					            })
 | 
				
			||||||
 | 
					            .collect()
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // Copy-paste of deserialize_requests() because I can't figure out how to
 | 
				
			||||||
 | 
					    // route the lifetimes in a generic version.
 | 
				
			||||||
 | 
					    pub fn deserialize_events(p: &packet::Packets) -> Vec<Option<(Event, SocketAddr)>> {
 | 
				
			||||||
 | 
					        p.packets
 | 
				
			||||||
 | 
					            .par_iter()
 | 
				
			||||||
 | 
					            .map(|x| {
 | 
				
			||||||
 | 
					                deserialize(&x.data[0..x.meta.size])
 | 
				
			||||||
 | 
					                    .map(|req| (req, x.meta.addr()))
 | 
				
			||||||
 | 
					                    .ok()
 | 
				
			||||||
 | 
					            })
 | 
				
			||||||
 | 
					            .collect()
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /// Split Request list into verified transactions and the rest
 | 
				
			||||||
 | 
					    fn partition_requests(
 | 
				
			||||||
 | 
					        req_vers: Vec<(Request, SocketAddr, u8)>,
 | 
				
			||||||
 | 
					    ) -> (Vec<Event>, Vec<(Request, SocketAddr)>) {
 | 
				
			||||||
 | 
					        let mut events = vec![];
 | 
				
			||||||
 | 
					        let mut reqs = vec![];
 | 
				
			||||||
 | 
					        for (msg, rsp_addr, verify) in req_vers {
 | 
				
			||||||
 | 
					            match msg {
 | 
				
			||||||
 | 
					                Request::Transaction(tr) => {
 | 
				
			||||||
 | 
					                    if verify != 0 {
 | 
				
			||||||
 | 
					                        events.push(Event::Transaction(tr));
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					                _ => reqs.push((msg, rsp_addr)),
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        (events, reqs)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn serialize_response(
 | 
				
			||||||
 | 
					        resp: Response,
 | 
				
			||||||
 | 
					        rsp_addr: SocketAddr,
 | 
				
			||||||
 | 
					        blob_recycler: &packet::BlobRecycler,
 | 
				
			||||||
 | 
					    ) -> Result<packet::SharedBlob> {
 | 
				
			||||||
 | 
					        let blob = blob_recycler.allocate();
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            let mut b = blob.write().unwrap();
 | 
				
			||||||
 | 
					            let v = serialize(&resp)?;
 | 
				
			||||||
 | 
					            let len = v.len();
 | 
				
			||||||
 | 
					            b.data[..len].copy_from_slice(&v);
 | 
				
			||||||
 | 
					            b.meta.size = len;
 | 
				
			||||||
 | 
					            b.meta.set_addr(&rsp_addr);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        Ok(blob)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn serialize_responses(
 | 
				
			||||||
 | 
					        rsps: Vec<(Response, SocketAddr)>,
 | 
				
			||||||
 | 
					        blob_recycler: &packet::BlobRecycler,
 | 
				
			||||||
 | 
					    ) -> Result<VecDeque<packet::SharedBlob>> {
 | 
				
			||||||
 | 
					        let mut blobs = VecDeque::new();
 | 
				
			||||||
 | 
					        for (resp, rsp_addr) in rsps {
 | 
				
			||||||
 | 
					            blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        Ok(blobs)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    pub fn process_request_packets(
 | 
				
			||||||
 | 
					        &self,
 | 
				
			||||||
 | 
					        event_processor: &EventProcessor,
 | 
				
			||||||
 | 
					        verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
 | 
				
			||||||
 | 
					        entry_sender: &Sender<Entry>,
 | 
				
			||||||
 | 
					        blob_sender: &streamer::BlobSender,
 | 
				
			||||||
 | 
					        packet_recycler: &packet::PacketRecycler,
 | 
				
			||||||
 | 
					        blob_recycler: &packet::BlobRecycler,
 | 
				
			||||||
 | 
					    ) -> Result<()> {
 | 
				
			||||||
 | 
					        let timer = Duration::new(1, 0);
 | 
				
			||||||
 | 
					        let recv_start = Instant::now();
 | 
				
			||||||
 | 
					        let mms = verified_receiver.recv_timeout(timer)?;
 | 
				
			||||||
 | 
					        let mut reqs_len = 0;
 | 
				
			||||||
 | 
					        let mms_len = mms.len();
 | 
				
			||||||
 | 
					        info!(
 | 
				
			||||||
 | 
					            "@{:?} process start stalled for: {:?}ms batches: {}",
 | 
				
			||||||
 | 
					            timing::timestamp(),
 | 
				
			||||||
 | 
					            timing::duration_as_ms(&recv_start.elapsed()),
 | 
				
			||||||
 | 
					            mms.len(),
 | 
				
			||||||
 | 
					        );
 | 
				
			||||||
 | 
					        let proc_start = Instant::now();
 | 
				
			||||||
 | 
					        for (msgs, vers) in mms {
 | 
				
			||||||
 | 
					            let reqs = Self::deserialize_requests(&msgs.read().unwrap());
 | 
				
			||||||
 | 
					            reqs_len += reqs.len();
 | 
				
			||||||
 | 
					            let req_vers = reqs.into_iter()
 | 
				
			||||||
 | 
					                .zip(vers)
 | 
				
			||||||
 | 
					                .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
 | 
				
			||||||
 | 
					                .filter(|x| {
 | 
				
			||||||
 | 
					                    let v = x.0.verify();
 | 
				
			||||||
 | 
					                    v
 | 
				
			||||||
 | 
					                })
 | 
				
			||||||
 | 
					                .collect();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            debug!("partitioning");
 | 
				
			||||||
 | 
					            let (events, reqs) = Self::partition_requests(req_vers);
 | 
				
			||||||
 | 
					            debug!("events: {} reqs: {}", events.len(), reqs.len());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            debug!("process_events");
 | 
				
			||||||
 | 
					            let entry = event_processor.process_events(events)?;
 | 
				
			||||||
 | 
					            entry_sender.send(entry)?;
 | 
				
			||||||
 | 
					            debug!("done process_events");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            debug!("process_requests");
 | 
				
			||||||
 | 
					            let rsps = self.process_requests(reqs);
 | 
				
			||||||
 | 
					            debug!("done process_requests");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            let blobs = Self::serialize_responses(rsps, blob_recycler)?;
 | 
				
			||||||
 | 
					            if !blobs.is_empty() {
 | 
				
			||||||
 | 
					                info!("process: sending blobs: {}", blobs.len());
 | 
				
			||||||
 | 
					                //don't wake up the other side if there is nothing
 | 
				
			||||||
 | 
					                blob_sender.send(blobs)?;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            packet_recycler.recycle(msgs);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        let total_time_s = timing::duration_as_s(&proc_start.elapsed());
 | 
				
			||||||
 | 
					        let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
 | 
				
			||||||
 | 
					        info!(
 | 
				
			||||||
 | 
					            "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
 | 
				
			||||||
 | 
					            timing::timestamp(),
 | 
				
			||||||
 | 
					            mms_len,
 | 
				
			||||||
 | 
					            total_time_ms,
 | 
				
			||||||
 | 
					            reqs_len,
 | 
				
			||||||
 | 
					            (reqs_len as f32) / (total_time_s)
 | 
				
			||||||
 | 
					        );
 | 
				
			||||||
 | 
					        Ok(())
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Reference in New Issue
	
	Block a user