// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package network

import (
	"encoding/binary"
	"fmt"

	"github.com/ethereum/go-ethereum/logger"
	"github.com/ethereum/go-ethereum/logger/glog"
	"github.com/ethereum/go-ethereum/swarm/storage"
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/iterator"
)

const counterKeyPrefix = 0x01

/*
syncDb is a queueing service for outgoing deliveries.
One instance exists per priority queue for each peer.

A syncDb instance maintains an in-memory buffer (of capacity bufferSize).
Once the buffer is full, it switches to persisting items in the db,
and the dbRead iterator iterates through the items, preserving their order.
Once the db read catches up (there are no more items in the db),
it switches back to the in-memory buffer.

When syncDb is stopped, all items in the buffer are saved to the db.
*/
type syncDb struct {
	start          []byte               // this syncDb's starting index in the request db
	key            storage.Key          // remote peer's address key
	counterKey     []byte               // db key to persist the counter
	priority       uint                 // priority High|Medium|Low
	buffer         chan interface{}     // incoming request channel
	db             *storage.LDBDatabase // underlying db (TODO should be interface)
	done           chan bool            // chan to signal that goroutines finished quitting
	quit           chan bool            // chan to signal quitting to goroutines
	total, dbTotal int                  // counts for one session
	batch          chan chan int        // channel for batch requests
	dbBatchSize    uint                 // number of items before a batch is saved
}

// the constructor needs a shared request db (leveldb);
// priority is used in the index key;
// a buffer and the leveldb are used for persistent storage;
// bufferSize and dbBatchSize are config parameters
func newSyncDb(db *storage.LDBDatabase, key storage.Key, priority uint, bufferSize, dbBatchSize uint, deliver func(interface{}, chan bool) bool) *syncDb {
	start := make([]byte, 42)
	start[1] = byte(priorities - priority)
	copy(start[2:34], key)

	counterKey := make([]byte, 34)
	counterKey[0] = counterKeyPrefix
	copy(counterKey[1:], start[1:34])

	syncdb := &syncDb{
		start:       start,
		key:         key,
		counterKey:  counterKey,
		priority:    priority,
		buffer:      make(chan interface{}, bufferSize),
		db:          db,
		done:        make(chan bool),
		quit:        make(chan bool),
		batch:       make(chan chan int),
		dbBatchSize: dbBatchSize,
	}
	glog.V(logger.Detail).Infof("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority)

	// start the main forever loop reading from the buffer
	go syncdb.bufferRead(deliver)
	return syncdb
}
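
// An illustrative sketch (not part of this file's API): how a caller might
// wire up a syncDb. The netQueue channel, peerKey and requestDb values are
// hypothetical placeholders, and High stands for this package's high
// priority level. The deliver callback blocks on the network write and must
// return false once the quit channel closes, so pending items get persisted:
//
//	deliver := func(req interface{}, quit chan bool) bool {
//		select {
//		case netQueue <- req: // hand over to the network writer
//			return true
//		case <-quit:
//			return false // aborted; syncDb persists the item instead
//		}
//	}
//	sdb := newSyncDb(requestDb, peerKey, High, 1000, 100, deliver)
//	defer sdb.stop()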

/*
bufferRead is a forever loop that reads from the incoming buffer and
takes care of delivering outgoing store requests.

Its argument is the deliver function, taking the item as first argument
and a quit channel as second.
Closing this channel is supposed to abort all waiting for delivery
(typically a network write).

The iteration switches between 2 modes:
* buffer mode reads the in-memory buffer and delivers the items directly
* db mode reads from the buffer and writes to the db; in parallel another
routine is started that reads from the db and delivers the items

If there is buffer contention in buffer mode (slow network, high upload volume),
syncDb switches to db mode and starts dbRead.
Once the db backlog is delivered, it reverts to the in-memory buffer.

It is automatically started when syncDb is initialised.

It saves the buffer to the db upon receiving the quit signal (syncDb#stop()).
*/
func (self *syncDb) bufferRead(deliver func(interface{}, chan bool) bool) {
	var buffer, db chan interface{} // channels representing the two read modes
	var more bool
	var req interface{}
	var entry *syncDbEntry
	var inBatch, inDb int
	batch := new(leveldb.Batch)
	var dbSize chan int
	quit := self.quit
	counterValue := make([]byte, 8)

	// the counter is used for keeping the items in order, persisted to db;
	// start the counter where the db was at, 0 if not found
	data, err := self.db.Get(self.counterKey)
	var counter uint64
	if err == nil {
		counter = binary.BigEndian.Uint64(data)
		glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter)
	} else {
		glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter)
	}

LOOP:
	for {
		// wait for the next item in the buffer, a quit signal or a batch request
		select {
		// buffer only closes when switching to writing to the db
		case req = <-buffer:
			// deliver the request: this blocks on the network write, so
			// it is passed the quit channel as argument and returns early
			// if syncDb is stopped. In that case the item is saved to the db
			more = deliver(req, self.quit)
			if !more {
				glog.V(logger.Debug).Infof("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total)
				// received quit signal, save the request currently awaiting delivery
				// by switching to db mode and closing the buffer
				buffer = nil
				db = self.buffer
				close(db)
				quit = nil // needs to block the quit case in select
				break      // break from select, this item will be written to the db
			}
			self.total++
			glog.V(logger.Detail).Infof("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total)
			// by the time deliver returns, new writes may have hit the buffer;
			// if buffer contention is detected, switch to db mode, which drains
			// the buffer so no process will block on pushing store requests
			if len(buffer) == cap(buffer) {
				glog.V(logger.Debug).Infof("syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total)
				buffer = nil
				db = self.buffer
			}
			continue LOOP

			// incoming entry to put into the db
		case req, more = <-db:
			if !more {
				// only happens if quit was called and the buffer is drained:
				// save the final batch
				binary.BigEndian.PutUint64(counterValue, counter)
				batch.Put(self.counterKey, counterValue) // persist counter in batch
				self.writeSyncBatch(batch)               // save batch
				glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority)
				break LOOP
			}
			self.dbTotal++
			self.total++
			// otherwise break after select
		case dbSize = <-self.batch:
			// explicit request for a batch
			if inBatch == 0 && quit != nil {
				// there were no writes since the last batch, so the db is depleted:
				// switch to buffer mode
				glog.V(logger.Debug).Infof("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority)
				db = nil
				buffer = self.buffer
				dbSize <- 0 // indicates to 'caller' that the batch has been written
				inDb = 0
				continue LOOP
			}
			binary.BigEndian.PutUint64(counterValue, counter)
			batch.Put(self.counterKey, counterValue)
			glog.V(logger.Debug).Infof("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue)
			batch = self.writeSyncBatch(batch)
			dbSize <- inBatch // indicates to 'caller' that the batch has been written
			inBatch = 0
			continue LOOP

			// closing the syncDb#quit channel signals all goroutines to quit
		case <-quit:
			// need to save the backlog, so switch to db mode
			db = self.buffer
			buffer = nil
			quit = nil
			glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority)
			close(db)
			continue LOOP
		}

		// only get here if we put req into the db
		entry, err = self.newSyncDbEntry(req, counter)
		if err != nil {
			glog.V(logger.Warn).Infof("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err)
			continue LOOP
		}
		batch.Put(entry.key, entry.val)
		glog.V(logger.Detail).Infof("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter)
		// if we just switched to db mode and are not quitting, launch dbRead
		// in a parallel goroutine to send deliveries from the db
		if inDb == 0 && quit != nil {
			glog.V(logger.Detail).Infof("syncDb[%v/%v] start dbRead", self.key.Log(), self.priority)
			go self.dbRead(true, counter, deliver)
		}
		inDb++
		inBatch++
		counter++
		// the batch needs saving if it grows too large (== dbBatchSize)
		if inBatch%int(self.dbBatchSize) == 0 {
			batch = self.writeSyncBatch(batch)
		}
	}
	glog.V(logger.Info).Infof("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter)
	close(self.done)
}
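
// A note on the mode switching above: bufferRead relies on the Go idiom that
// a receive from a nil channel blocks forever, so setting buffer or db to nil
// disables the corresponding select case. A minimal standalone sketch of the
// idiom (the names a and b are illustrative, not from this file):
//
//	a := make(chan int, 1)
//	var b chan int // nil: its case can never fire
//	a <- 1
//	select {
//	case v := <-a:
//		fmt.Println(v) // always taken here
//	case v := <-b:
//		fmt.Println(v) // unreachable while b == nil
//	}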

// writes the batch to the db and returns a fresh batch object
func (self *syncDb) writeSyncBatch(batch *leveldb.Batch) *leveldb.Batch {
	err := self.db.Write(batch)
	if err != nil {
		glog.V(logger.Warn).Infof("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err)
		return batch
	}
	return new(leveldb.Batch)
}

// abstract type for db entries (TODO could be a feature of Receipts)
type syncDbEntry struct {
	key, val []byte
}

func (self syncDbEntry) String() string {
	return fmt.Sprintf("key: %x, value: %x", self.key, self.val)
}

/*
	dbRead iterates over the store requests to be sent over to the peer;
	this is mainly to prevent crashes due to network output buffer contention
	as well as to make synchronisation resilient to disconnects.
	The messages are supposed to be sent in the p2p priority queue.

	The request db is shared between peers, but the domains for each syncDb
	are disjoint. dbkeys (42 bytes) are structured as:
	* 0: 0x00 (0x01 is reserved for the counter key)
	* 1: priorities - priority (so that high priority can be replayed first)
	* 2-33: peer's address
	* 34-41: syncDb counter to preserve order (this field is missing for the counter key)

	values (40 bytes) are:
	* 0-31: key
	* 32-39: request id

dbRead needs a boolean to indicate whether, on the first round, all the
historical records are synced. The second argument indicates the current db
counter. The third is the function to apply to each entry.
*/
func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{}, chan bool) bool) {
	key := make([]byte, 42)
	copy(key, self.start)
	binary.BigEndian.PutUint64(key[34:], counter)
	var batches, n, cnt, total int
	var more bool
	var entry *syncDbEntry
	var it iterator.Iterator
	var del *leveldb.Batch
	batchSizes := make(chan int)

	for {
		// if useBatches is false, cnt is not set
		if useBatches {
			// this could be called before all cnt items have been sent out,
			// so that the loop is not blocked while delivering;
			// only relevant if cnt is large
			select {
			case self.batch <- batchSizes:
			case <-self.quit:
				return
			}
			// wait for the write to finish and get the item count in the next batch
			cnt = <-batchSizes
			batches++
			if cnt == 0 {
				// empty
				return
			}
		}
		it = self.db.NewIterator()
		it.Seek(key)
		if !it.Valid() {
			copy(key, self.start)
			useBatches = true
			continue
		}
		del = new(leveldb.Batch)
		glog.V(logger.Detail).Infof("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt)

		for n = 0; !useBatches || n < cnt; it.Next() {
			copy(key, it.Key())
			if len(key) == 0 || key[0] != 0 {
				copy(key, self.start)
				useBatches = true
				break
			}
			val := make([]byte, 40)
			copy(val, it.Value())
			entry = &syncDbEntry{key, val}
			// glog.V(logger.Detail).Infof("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total)
			more = fun(entry, self.quit)
			if !more {
				// quit was received while waiting to deliver the entry; the entry is not deleted
				glog.V(logger.Detail).Infof("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt)
				break
			}
			// since subsequent batches of the same db session are indexed incrementally,
			// deleting earlier batches can be delayed and parallelised;
			// this could be a batch delete when the db is idle (but adds complexity, esp. when quitting)
			del.Delete(key)
			n++
			total++
		}
		glog.V(logger.Debug).Infof("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total)
		self.db.Write(del) // this could be called async, only when the db is idle
		it.Release()
	}
}
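
// For reference, a hedged sketch of how the 42-byte keys and 40-byte values
// documented above decode back into their fields. parseEntry is hypothetical
// and only illustrates the layout; syncDb itself never needs it:
//
//	func parseEntry(e *syncDbEntry) (priority byte, peer []byte, counter uint64, chunkKey []byte, requestId uint64) {
//		priority = e.key[1]                               // stored as priorities - priority
//		peer = e.key[2:34]                                // remote peer address
//		counter = binary.BigEndian.Uint64(e.key[34:42])   // per-syncDb order counter
//		chunkKey = e.val[0:32]                            // chunk key
//		requestId = binary.BigEndian.Uint64(e.val[32:40]) // request id
//		return
//	}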

// stop closes the quit channel, signalling all goroutines to quit,
// and waits for bufferRead to finish saving the buffer to the db
func (self *syncDb) stop() {
	close(self.quit)
	<-self.done
}

// calculate a db key for the request, so the db can store it;
// see the dbRead comment for the db key structure.
// polymorphic: for accepted types, see syncer#addRequest
func (self *syncDb) newSyncDbEntry(req interface{}, counter uint64) (entry *syncDbEntry, err error) {
	var key storage.Key
	var chunk *storage.Chunk
	var id uint64
	var ok bool
	var sreq *storeRequestMsgData

	if key, ok = req.(storage.Key); ok {
		id = generateId()
	} else if chunk, ok = req.(*storage.Chunk); ok {
		key = chunk.Key
		id = generateId()
	} else if sreq, ok = req.(*storeRequestMsgData); ok {
		key = sreq.Key
		id = sreq.Id
	} else if entry, ok = req.(*syncDbEntry); !ok {
		return nil, fmt.Errorf("type not allowed: %v (%T)", req, req)
	}

	// order by priority > peer > seqid
	// the value is the request id if it exists
	if entry == nil {
		dbkey := make([]byte, 42)
		dbval := make([]byte, 40)

		// encode the key
		copy(dbkey[:], self.start[:34]) // prefix, priority and peer address
		binary.BigEndian.PutUint64(dbkey[34:], counter)
		// encode the value
		copy(dbval, key[:])
		binary.BigEndian.PutUint64(dbval[32:], id)

		entry = &syncDbEntry{dbkey, dbval}
	}
	return
}
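
// Why this layout replays in the right order: leveldb iterates keys in
// lexicographic byte order, so the priorities-priority byte makes higher
// priorities sort first, and the big-endian counter keeps items of one
// priority in insertion order. A sanity-check sketch with illustrative
// priority bytes and counters:
//
//	a := make([]byte, 42)
//	b := make([]byte, 42)
//	a[1], b[1] = 0x01, 0x02 // a encodes the higher priority (smaller byte)
//	binary.BigEndian.PutUint64(a[34:], 7) // later counter
//	binary.BigEndian.PutUint64(b[34:], 1) // earlier counter
//	fmt.Println(bytes.Compare(a, b) < 0)  // true: priority outranks counter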