add a persistent ledger of index and data files
This commit is contained in:
		
							
								
								
									
										215
									
								
								src/ledger.rs
									
									
									
									
									
								
							
							
						
						
									
										215
									
								
								src/ledger.rs
									
									
									
									
									
								
							@@ -1,64 +1,154 @@
 | 
			
		||||
//! The `ledger` module provides functions for parallel verification of the
 | 
			
		||||
//! Proof of History ledger.
 | 
			
		||||
 | 
			
		||||
use bincode::{deserialize, serialize_into};
 | 
			
		||||
use bincode::{deserialize, deserialize_from, serialize_into};
 | 
			
		||||
use entry::Entry;
 | 
			
		||||
use hash::Hash;
 | 
			
		||||
use packet::{self, SharedBlob, BLOB_DATA_SIZE};
 | 
			
		||||
use rayon::prelude::*;
 | 
			
		||||
use result::{Error, Result};
 | 
			
		||||
use std::collections::VecDeque;
 | 
			
		||||
use std::io::Cursor;
 | 
			
		||||
use std::fs::{create_dir_all, File, OpenOptions};
 | 
			
		||||
use std::io;
 | 
			
		||||
use std::io::prelude::*;
 | 
			
		||||
use std::io::{Cursor, ErrorKind, Seek, SeekFrom};
 | 
			
		||||
use std::mem::size_of;
 | 
			
		||||
use std::path::Path;
 | 
			
		||||
use transaction::Transaction;
 | 
			
		||||
use std::fs::{File, OpenOptions};
 | 
			
		||||
 | 
			
		||||
// ledger 
 | 
			
		||||
pub struct Ledger{
 | 
			
		||||
    entry_len_len: usize;
 | 
			
		||||
    
 | 
			
		||||
    entry_height: u64; // current index
 | 
			
		||||
 | 
			
		||||
    pub struct Files {
 | 
			
		||||
        index: File; // an array of usize elements
 | 
			
		||||
        data:  File; // concatenated entries
 | 
			
		||||
// ledger window
 | 
			
		||||
pub struct LedgerWindow {
 | 
			
		||||
    index: File,
 | 
			
		||||
    data: File,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
    reader: Files;
 | 
			
		||||
    writer: Files;
 | 
			
		||||
impl LedgerWindow {
 | 
			
		||||
    // opens a Ledger in directory, provides "infinite" window
 | 
			
		||||
    pub fn new(directory: String) -> io::Result<Self> {
 | 
			
		||||
        let directory = Path::new(&directory);
 | 
			
		||||
 | 
			
		||||
    pub fn new(directory: String) -> Self {
 | 
			
		||||
        let index = File::open(directory.join("index"))?;
 | 
			
		||||
        let data = File::open(directory.join("data"))?;
 | 
			
		||||
 | 
			
		||||
        Ok(LedgerWindow { index, data })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn get_entry_height(&self) -> u64 {
 | 
			
		||||
        entry_height
 | 
			
		||||
    pub fn get_entry(&mut self, index: u64) -> io::Result<Entry> {
 | 
			
		||||
        fn u64_at(file: &mut File, at: u64) -> io::Result<u64> {
 | 
			
		||||
            file.seek(SeekFrom::Start(at))?;
 | 
			
		||||
            deserialize_from(file.take(size_of::<u64>() as u64))
 | 
			
		||||
                .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    pub fn entry_at(&self, index: u64) -> Result<Entry> {
 | 
			
		||||
        let end_offset = u64_at(&mut self.index, index * size_of::<u64>() as u64)?;
 | 
			
		||||
 | 
			
		||||
        let start_offset = if index != 0 {
 | 
			
		||||
            u64_at(&mut self.index, (index - 1) * size_of::<u64>() as u64)?
 | 
			
		||||
        } else {
 | 
			
		||||
            0u64
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        fn entry_at(file: &mut File, at: u64, len: u64) -> io::Result<Entry> {
 | 
			
		||||
            file.seek(SeekFrom::Start(at))?;
 | 
			
		||||
 | 
			
		||||
            deserialize_from(file.take(len))
 | 
			
		||||
                .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))
 | 
			
		||||
        }
 | 
			
		||||
    pub fn append_entry(&self, entry: &Entry) -> io::Result<u64> {
 | 
			
		||||
        Ok(0)
 | 
			
		||||
    }
 | 
			
		||||
    pub fn append_entries(&self, entries: &[Entry]) -> io::Result<u64> {
 | 
			
		||||
       Ok(0)
 | 
			
		||||
 | 
			
		||||
        entry_at(&mut self.data, start_offset, end_offset - start_offset)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<R: BufRead> Iterator for Ledger<R> {
 | 
			
		||||
pub struct LedgerWriter {
 | 
			
		||||
    index: File,
 | 
			
		||||
    data: File,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl LedgerWriter {
 | 
			
		||||
    // opens or creates a LedgerWriter in directory
 | 
			
		||||
    pub fn new(directory: String) -> io::Result<Self> {
 | 
			
		||||
        let directory = Path::new(&directory);
 | 
			
		||||
 | 
			
		||||
        create_dir_all(directory)?;
 | 
			
		||||
 | 
			
		||||
        let index = OpenOptions::new()
 | 
			
		||||
            .create(true)
 | 
			
		||||
            .append(true)
 | 
			
		||||
            .open(directory.join("index"))?;
 | 
			
		||||
 | 
			
		||||
        let data = OpenOptions::new()
 | 
			
		||||
            .create(true)
 | 
			
		||||
            .append(true)
 | 
			
		||||
            .open(directory.join("data"))?;
 | 
			
		||||
 | 
			
		||||
        Ok(LedgerWriter { index, data })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn write_entry(&mut self, entry: &Entry) -> io::Result<()> {
 | 
			
		||||
        serialize_into(&mut self.data, &entry)
 | 
			
		||||
            .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?;
 | 
			
		||||
        self.data.flush()?;
 | 
			
		||||
 | 
			
		||||
        let offset = self.data.seek(SeekFrom::Current(0))?;
 | 
			
		||||
        serialize_into(&mut self.index, &offset)
 | 
			
		||||
            .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?;
 | 
			
		||||
        self.index.flush()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn write_entries<I>(&mut self, entries: I) -> io::Result<()>
 | 
			
		||||
    where
 | 
			
		||||
        I: IntoIterator<Item = Entry>,
 | 
			
		||||
    {
 | 
			
		||||
        for entry in entries {
 | 
			
		||||
            self.write_entry(&entry)?;
 | 
			
		||||
        }
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub struct LedgerReader {
 | 
			
		||||
    offset: u64, // next start_offset
 | 
			
		||||
    index: File,
 | 
			
		||||
    data: File,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Iterator for LedgerReader {
 | 
			
		||||
    type Item = io::Result<Entry>;
 | 
			
		||||
 | 
			
		||||
    fn next(&mut self) -> Option<io::Result<Entry>> {
 | 
			
		||||
        let mut entry_len_bytes = [0u8; sizeof(::<usize>()]; // TODO: sizeof()?
 | 
			
		||||
        
 | 
			
		||||
        let mut entry_len =
 | 
			
		||||
            if self.reader.index
 | 
			
		||||
            .read_exact(&mut entry_len_bytes[..self.entry_len_len])
 | 
			
		||||
            .is_ok()
 | 
			
		||||
        fn next_offset(file: &mut File) -> io::Result<u64> {
 | 
			
		||||
            deserialize_from(file.take(size_of::<u64>() as u64))
 | 
			
		||||
                .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        fn next_entry(file: &mut File, len: u64) -> io::Result<Entry> {
 | 
			
		||||
            deserialize_from(file.take(len))
 | 
			
		||||
                .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))
 | 
			
		||||
        }
 | 
			
		||||
        match next_offset(&mut self.index) {
 | 
			
		||||
            Ok(end_offset) => {
 | 
			
		||||
                let len = end_offset - self.offset;
 | 
			
		||||
                self.offset = end_offset;
 | 
			
		||||
                Some(next_entry(&mut self.data, len))
 | 
			
		||||
            }
 | 
			
		||||
            Err(_) => None,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Return an iterator for all the entries in the given file.
 | 
			
		||||
pub fn read_ledger(directory: String) -> io::Result<impl Iterator<Item = io::Result<Entry>>> {
 | 
			
		||||
    let directory = Path::new(&directory);
 | 
			
		||||
 | 
			
		||||
    let index = File::open(directory.join("index"))?;
 | 
			
		||||
    let data = File::open(directory.join("data"))?;
 | 
			
		||||
 | 
			
		||||
    Ok(LedgerReader {
 | 
			
		||||
        offset: 0,
 | 
			
		||||
        index,
 | 
			
		||||
        data,
 | 
			
		||||
    })
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// a Block is a slice of Entries
 | 
			
		||||
pub trait Block {
 | 
			
		||||
@@ -204,6 +294,7 @@ mod tests {
 | 
			
		||||
    use hash::hash;
 | 
			
		||||
    use packet::{BlobRecycler, BLOB_DATA_SIZE, PACKET_DATA_SIZE};
 | 
			
		||||
    use signature::{KeyPair, KeyPairUtil};
 | 
			
		||||
    use std;
 | 
			
		||||
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 | 
			
		||||
    use transaction::{Transaction, Vote};
 | 
			
		||||
 | 
			
		||||
@@ -221,8 +312,7 @@ mod tests {
 | 
			
		||||
        assert!(!bad_ticks.verify(&zero)); // inductive step, bad
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_entries_to_blobs() {
 | 
			
		||||
    fn make_test_entries() -> Vec<Entry> {
 | 
			
		||||
        let zero = Hash::default();
 | 
			
		||||
        let one = hash(&zero);
 | 
			
		||||
        let keypair = KeyPair::new();
 | 
			
		||||
@@ -248,7 +338,13 @@ mod tests {
 | 
			
		||||
        //                                V
 | 
			
		||||
        let mut transactions = vec![tx0; 362];
 | 
			
		||||
        transactions.extend(vec![tx1; 100]);
 | 
			
		||||
        let entries = next_entries(&zero, 0, transactions);
 | 
			
		||||
        next_entries(&zero, 0, transactions);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_entries_to_blobs() {
 | 
			
		||||
        let entries = make_test_entries();
 | 
			
		||||
 | 
			
		||||
        let blob_recycler = BlobRecycler::default();
 | 
			
		||||
        let mut blob_q = VecDeque::new();
 | 
			
		||||
        entries.to_blobs(&blob_recycler, &mut blob_q);
 | 
			
		||||
@@ -316,17 +412,46 @@ mod tests {
 | 
			
		||||
        assert!(entries0[0].has_more);
 | 
			
		||||
        assert!(!entries0[entries0.len() - 1].has_more);
 | 
			
		||||
        assert!(entries0.verify(&id));
 | 
			
		||||
        // test hand-construction... brittle, changes if split method changes... ?
 | 
			
		||||
        //        let mut entries1 = vec![];
 | 
			
		||||
        //        entries1.push(Entry::new(&id, 1, transactions[..threshold].to_vec(), true));
 | 
			
		||||
        //        id = entries1[0].id;
 | 
			
		||||
        //        entries1.push(Entry::new(
 | 
			
		||||
        //            &id,
 | 
			
		||||
        //            1,
 | 
			
		||||
        //            transactions[threshold..].to_vec(),
 | 
			
		||||
        //            false,
 | 
			
		||||
        //        ));
 | 
			
		||||
        //
 | 
			
		||||
        //        assert_eq!(entries0, entries1);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_ledger_reader_writer() {
 | 
			
		||||
        let keypair = KeyPair::new();
 | 
			
		||||
 | 
			
		||||
        let ledger_path = {
 | 
			
		||||
            let id = {
 | 
			
		||||
                let ids: Vec<_> = keypair
 | 
			
		||||
                    .pubkey()
 | 
			
		||||
                    .iter()
 | 
			
		||||
                    .map(|id| format!("{}", id))
 | 
			
		||||
                    .collect();
 | 
			
		||||
                ids.join("")
 | 
			
		||||
            };
 | 
			
		||||
 | 
			
		||||
            format!("target/test_ledger_reader_writer_window-{}", id)
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        let entries = make_test_entries();
 | 
			
		||||
 | 
			
		||||
        let mut writer = LedgerWriter::new(ledger_path.clone()).unwrap();
 | 
			
		||||
        writer.write_entries(entries.clone()).unwrap();
 | 
			
		||||
 | 
			
		||||
        let mut read_entries = vec![];
 | 
			
		||||
        for x in read_ledger(ledger_path.clone()).unwrap() {
 | 
			
		||||
            let entry = x.unwrap();
 | 
			
		||||
            trace!("entry... {:?}", entry);
 | 
			
		||||
            read_entries.push(entry);
 | 
			
		||||
        }
 | 
			
		||||
        assert_eq!(read_entries, entries);
 | 
			
		||||
 | 
			
		||||
        let mut window = LedgerWindow::new(ledger_path.clone()).unwrap();
 | 
			
		||||
 | 
			
		||||
        for (i, entry) in entries.iter().enumerate() {
 | 
			
		||||
            let read_entry = window.get_entry(i as u64).unwrap();
 | 
			
		||||
            assert_eq!(*entry, read_entry);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        std::fs::remove_dir_all(ledger_path).unwrap();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user