// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package network

import (
	"encoding/binary"
	"fmt"

	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/swarm/storage"
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/iterator"
)

// counterKeyPrefix marks the db key under which the delivery counter is
// persisted; regular entry keys are prefixed with 0x00 (see the dbRead doc
// comment for the key structure).
const counterKeyPrefix = 0x01

/*
syncDb is a queueing service for outgoing deliveries: one instance per
priority queue for each peer.

A syncDb instance maintains an in-memory buffer (of capacity bufferSize).
Once the buffer is full, it switches to persisting items in the db, and a
dbRead iterator iterates through the items, preserving their order. Once the
db read catches up (there are no more items in the db), it switches back to
the in-memory buffer.

When syncDb is stopped, all items in the buffer are saved to the db.
*/
type syncDb struct {
	start          []byte               // this syncDb's starting index in the request db
	key            storage.Key          // remote peer's address key
	counterKey     []byte               // db key to persist the counter
	priority       uint                 // priority: High|Medium|Low
	buffer         chan interface{}     // incoming request channel
	db             *storage.LDBDatabase // underlying db (TODO: should be an interface)
	done           chan bool            // closed to signal that goroutines have finished quitting
	quit           chan bool            // closed to signal quitting to the goroutines
	total, dbTotal int                  // counts for one session
	batch          chan chan int        // channel for batch requests
	dbBatchSize    uint                 // number of items before a batch is saved
}

// newSyncDb is the constructor; it needs a shared request db (leveldb).
// The priority is used in the index key. It uses an in-memory buffer and the
// leveldb for persistent storage; bufferSize and dbBatchSize are config
// parameters.
func newSyncDb(db *storage.LDBDatabase, key storage.Key, priority uint, bufferSize, dbBatchSize uint, deliver func(interface{}, chan bool) bool) *syncDb {
	start := make([]byte, 42)
	start[1] = byte(priorities - priority)
	copy(start[2:34], key)

	counterKey := make([]byte, 34)
	counterKey[0] = counterKeyPrefix
	copy(counterKey[1:], start[1:34])

	syncdb := &syncDb{
		start:       start,
		key:         key,
		counterKey:  counterKey,
		priority:    priority,
		buffer:      make(chan interface{}, bufferSize),
		db:          db,
		done:        make(chan bool),
		quit:        make(chan bool),
		batch:       make(chan chan int),
		dbBatchSize: dbBatchSize,
	}
	log.Trace(fmt.Sprintf("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority))

	// start the main forever loop reading from the buffer
	go syncdb.bufferRead(deliver)
	return syncdb
}
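
// Usage sketch (illustrative only; the real wiring lives in the syncer, and
// requestDb, peerKey, chunk and the deliver callback below are assumed to be
// supplied by the caller; High is assumed to be one of the package's
// priority constants):
//
//	deliver := func(req interface{}, quit chan bool) bool {
//		// hand req over to the peer's priority queue;
//		// return false if quit closes while waiting
//		return true
//	}
//	sdb := newSyncDb(requestDb, peerKey, High, 1000, 64, deliver)
//	sdb.buffer <- chunk // enqueue an outgoing delivery (e.g. a *storage.Chunk)
//	sdb.stop()          // flush the buffer to the db and wait for shutdown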

/*
bufferRead is a forever loop that takes care of delivering outgoing store
requests, read from the incoming buffer.

Its argument is the deliver function, taking the item as first argument and a
quit channel as second. Closing this channel is supposed to abort all waiting
for delivery (typically a network write).

The iteration switches between two modes:
* buffer mode reads the in-memory buffer and delivers the items directly
* db mode reads from the buffer and writes to the db; in parallel, another
routine is started that reads from the db and delivers items

If there is buffer contention in buffer mode (slow network, high upload
volume), syncDb switches to db mode and starts dbRead. Once the db backlog is
delivered, it reverts back to the in-memory buffer.

It is automatically started when syncDb is initialised.

It saves the buffer to the db upon receiving the quit signal (syncDb#stop()).
*/
func (self *syncDb) bufferRead(deliver func(interface{}, chan bool) bool) {
	var buffer, db chan interface{} // channels representing the two read modes
	var more bool
	var req interface{}
	var entry *syncDbEntry
	var inBatch, inDb int
	batch := new(leveldb.Batch)
	var dbSize chan int
	quit := self.quit
	counterValue := make([]byte, 8)

	// the counter is used for keeping the items in order, persisted to the db;
	// start the counter where the db was at, 0 if not found
	data, err := self.db.Get(self.counterKey)
	var counter uint64
	if err == nil {
		counter = binary.BigEndian.Uint64(data)
		log.Trace(fmt.Sprintf("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter))
	} else {
		log.Trace(fmt.Sprintf("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter))
	}

LOOP:
	for {
		// wait for the next item in the buffer, a quit signal or a batch request
		select {
		// the buffer channel only closes when switching to db mode
		case req = <-buffer:
			// deliver the request: this blocks on the network write, so it is
			// passed the quit channel as argument, making it return if syncDb
			// is stopped; in that case we need to save the item to the db
			more = deliver(req, self.quit)
			if !more {
				log.Debug(fmt.Sprintf("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total))
				// received the quit signal: save the request currently awaiting
				// delivery by switching to db mode and closing the buffer
				buffer = nil
				db = self.buffer
				close(db)
				quit = nil // needs to block the quit case in select
				break      // break from select; this item will be written to the db
			}
			self.total++
			log.Trace(fmt.Sprintf("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total))
			// by the time deliver returns, there may have been new writes to
			// the buffer; if buffer contention is detected, switch to db mode,
			// which drains the buffer so no process blocks on pushing store
			// requests
			if len(buffer) == cap(buffer) {
				log.Debug(fmt.Sprintf("syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total))
				buffer = nil
				db = self.buffer
			}
			continue LOOP

			// incoming entry to put into the db
		case req, more = <-db:
			if !more {
				// the db channel only closes on quit, once the whole buffer is saved
				binary.BigEndian.PutUint64(counterValue, counter)
				batch.Put(self.counterKey, counterValue) // persist the counter in the batch
				self.writeSyncBatch(batch)               // save the batch
				log.Trace(fmt.Sprintf("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority))
				break LOOP
			}
			self.dbTotal++
			self.total++
			// otherwise fall through after the select: req is written to the db below
		case dbSize = <-self.batch:
			// explicit request for a batch
			if inBatch == 0 && quit != nil {
				// there have been no writes since the last batch, so the db is
				// depleted: switch to buffer mode
				log.Debug(fmt.Sprintf("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority))
				db = nil
				buffer = self.buffer
				dbSize <- 0 // indicates to the 'caller' that the batch has been written
				inDb = 0
				continue LOOP
			}
			binary.BigEndian.PutUint64(counterValue, counter)
			batch.Put(self.counterKey, counterValue)
			log.Debug(fmt.Sprintf("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue))
			batch = self.writeSyncBatch(batch)
			dbSize <- inBatch // indicates to the 'caller' that the batch has been written
			inBatch = 0
			continue LOOP

			// closing the syncDb#quit channel signals all goroutines to quit
		case <-quit:
			// need to save the backlog, so switch to db mode
			db = self.buffer
			buffer = nil
			quit = nil
			log.Trace(fmt.Sprintf("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority))
			close(db)
			continue LOOP
		}

		// we only get here if req is to be put into the db
		entry, err = self.newSyncDbEntry(req, counter)
		if err != nil {
			log.Warn(fmt.Sprintf("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err))
			continue LOOP
		}
		batch.Put(entry.key, entry.val)
		log.Trace(fmt.Sprintf("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter))
		// if we just switched to db mode and are not quitting, launch dbRead
		// in a parallel goroutine to send deliveries from the db
		if inDb == 0 && quit != nil {
			log.Trace(fmt.Sprintf("syncDb[%v/%v] start dbRead", self.key.Log(), self.priority))
			go self.dbRead(true, counter, deliver)
		}
		inDb++
		inBatch++
		counter++
		// save the batch once it has accumulated dbBatchSize items
		if inBatch%int(self.dbBatchSize) == 0 {
			batch = self.writeSyncBatch(batch)
		}
	}
	log.Info(fmt.Sprintf("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter))
	close(self.done)
}
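
// A condensed sketch of the mode switching implemented above (buffer and db
// are aliases of self.buffer; at most one of them is non-nil at a time):
//
//	buffer full, or quit                         --> db mode
//	db depleted (batch request with no new writes) --> buffer mode
//
// In db mode the buffer is drained into the db while dbRead delivers from
// the db in a parallel routine, preserving order via the persisted counter.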

// writeSyncBatch writes the batch to the db and returns a fresh batch
// object; if the write fails, it returns the unwritten batch, whose contents
// are retained for the next write attempt.
func (self *syncDb) writeSyncBatch(batch *leveldb.Batch) *leveldb.Batch {
	err := self.db.Write(batch)
	if err != nil {
		log.Warn(fmt.Sprintf("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err))
		return batch
	}
	return new(leveldb.Batch)
}

// syncDbEntry is an abstract type for db entries (TODO: could be a feature of Receipts)
type syncDbEntry struct {
	key, val []byte
}

func (self syncDbEntry) String() string {
	return fmt.Sprintf("key: %x, value: %x", self.key, self.val)
}

/*
dbRead iterates over store requests to be sent over to the peer. This is
mainly to prevent crashes due to network output buffer contention, as well as
to make synchronisation resilient to disconnects.
The messages are supposed to be sent in the p2p priority queue.

The request db is shared between peers, but the domains of the individual
syncDbs are disjoint. dbkeys (42 bytes) are structured as:
* 0: 0x00 (0x01 is reserved for the counter key)
* 1: priorities - priority (so that high priority can be replayed first)
* 2-33: peer's address
* 34-41: syncDb counter to preserve order (this field is missing for the counter key)

values (40 bytes) are:
* 0-31: key
* 32-39: request id

dbRead needs a boolean to indicate whether, on the first round, all the
historical records have been synced. The second argument indicates the
current db counter; the third is the delivery function to apply.
*/
func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{}, chan bool) bool) {
	key := make([]byte, 42)
	copy(key, self.start)
	binary.BigEndian.PutUint64(key[34:], counter)
	var batches, n, cnt, total int
	var more bool
	var entry *syncDbEntry
	var it iterator.Iterator
	var del *leveldb.Batch
	batchSizes := make(chan int)

	for {
		// if useBatches is false, cnt is not set
		if useBatches {
			// this can run before all cnt items have been sent out, so that
			// the write loop is not blocked while delivering; only relevant
			// if cnt is large
			select {
			case self.batch <- batchSizes:
			case <-self.quit:
				return
			}
			// wait for the write to finish and get the item count in the next batch
			cnt = <-batchSizes
			batches++
			if cnt == 0 {
				// empty db, nothing left to deliver
				return
			}
		}
		it = self.db.NewIterator()
		it.Seek(key)
		if !it.Valid() {
			copy(key, self.start)
			useBatches = true
			continue
		}
		del = new(leveldb.Batch)
		log.Trace(fmt.Sprintf("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt))

		for n = 0; !useBatches || n < cnt; it.Next() {
			// guard against running off the end of the db: it.Key() would
			// return nil and the stale key would be redelivered
			if !it.Valid() {
				copy(key, self.start)
				useBatches = true
				break
			}
			copy(key, it.Key())
			if len(key) == 0 || key[0] != 0 {
				copy(key, self.start)
				useBatches = true
				break
			}
			// copy key and value so the entry does not alias the key buffer,
			// which is overwritten on the next iteration
			dbkey := make([]byte, 42)
			copy(dbkey, key)
			val := make([]byte, 40)
			copy(val, it.Value())
			entry = &syncDbEntry{dbkey, val}
			// log.Trace(fmt.Sprintf("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total))
			more = fun(entry, self.quit)
			if !more {
				// quit received while waiting to deliver the entry; the entry
				// will not be deleted
				log.Trace(fmt.Sprintf("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt))
				break
			}
			// since subsequent batches of the same db session are indexed
			// incrementally, deleting earlier batches can be delayed and
			// parallelised; this could be a batch delete when the db is idle
			// (but that adds complexity, especially when quitting)
			del.Delete(dbkey)
			n++
			total++
		}
		log.Debug(fmt.Sprintf("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total))
		self.db.Write(del) // this could be called asynchronously, only when the db is idle
		it.Release()
	}
}
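
// A note on the batch handshake coordinating dbRead with bufferRead: dbRead
// offers its batchSizes channel on self.batch; bufferRead flushes the current
// write batch (persisting the counter with it) and replies with the number of
// items that batch contained. A reply of 0 means the db is depleted and
// dbRead returns.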

// stop closes the quit channel, prompting bufferRead to save the buffer to
// the db, and waits until the done channel is closed.
func (self *syncDb) stop() {
	close(self.quit)
	<-self.done
}

// newSyncDbEntry calculates a dbkey for the request, for the db to work;
// see the dbRead doc comment for the db key structure.
// It is polymorphic in req; for the accepted types, see syncer#addRequest.
func (self *syncDb) newSyncDbEntry(req interface{}, counter uint64) (entry *syncDbEntry, err error) {
	var key storage.Key
	var chunk *storage.Chunk
	var id uint64
	var ok bool
	var sreq *storeRequestMsgData

	if key, ok = req.(storage.Key); ok {
		id = generateId()
	} else if chunk, ok = req.(*storage.Chunk); ok {
		key = chunk.Key
		id = generateId()
	} else if sreq, ok = req.(*storeRequestMsgData); ok {
		key = sreq.Key
		id = sreq.Id
	} else if entry, ok = req.(*syncDbEntry); !ok {
		return nil, fmt.Errorf("type not allowed: %v (%T)", req, req)
	}

	// order by priority > peer > seqid
	// the value carries the request id, if one exists
	if entry == nil {
		dbkey := make([]byte, 42)
		dbval := make([]byte, 40)

		// encode the key
		copy(dbkey[:], self.start[:34]) // 0x00 marker, priority byte and peer address
		binary.BigEndian.PutUint64(dbkey[34:], counter)
		// encode the value
		copy(dbval, key[:])
		binary.BigEndian.PutUint64(dbval[32:], id)

		entry = &syncDbEntry{dbkey, dbval}
	}
	return
}
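
// For illustration, the layout produced above (field widths per the dbRead
// doc comment; concrete values are hypothetical):
//
//	dbkey (42 bytes): 0x00 | priorities-priority (1) | peer address (32) | counter (8, big-endian)
//	dbval (40 bytes): chunk key (32) | request id (8, big-endian)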