779 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			779 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| 
								 | 
							
								// Copyright 2016 The go-ethereum Authors
							 | 
						||
| 
								 | 
							
								// This file is part of the go-ethereum library.
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// The go-ethereum library is free software: you can redistribute it and/or modify
							 | 
						||
| 
								 | 
							
								// it under the terms of the GNU Lesser General Public License as published by
							 | 
						||
| 
								 | 
							
								// the Free Software Foundation, either version 3 of the License, or
							 | 
						||
| 
								 | 
							
								// (at your option) any later version.
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// The go-ethereum library is distributed in the hope that it will be useful,
							 | 
						||
| 
								 | 
							
								// but WITHOUT ANY WARRANTY; without even the implied warranty of
							 | 
						||
| 
								 | 
							
								// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
							 | 
						||
| 
								 | 
							
								// GNU Lesser General Public License for more details.
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// You should have received a copy of the GNU Lesser General Public License
							 | 
						||
| 
								 | 
							
								// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								package network
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import (
							 | 
						||
| 
								 | 
							
									"encoding/binary"
							 | 
						||
| 
								 | 
							
									"encoding/json"
							 | 
						||
| 
								 | 
							
									"fmt"
							 | 
						||
| 
								 | 
							
									"path/filepath"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									"github.com/ethereum/go-ethereum/logger"
							 | 
						||
| 
								 | 
							
									"github.com/ethereum/go-ethereum/logger/glog"
							 | 
						||
| 
								 | 
							
									"github.com/ethereum/go-ethereum/swarm/storage"
							 | 
						||
| 
								 | 
							
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// syncer parameters (global, not peer specific) default values
							 | 
						||
| 
								 | 
							
								const (
							 | 
						||
| 
								 | 
							
									requestDbBatchSize = 512  // size of batch before written to request db
							 | 
						||
| 
								 | 
							
									keyBufferSize      = 1024 // size of buffer  for unsynced keys
							 | 
						||
| 
								 | 
							
									syncBatchSize      = 128  // maximum batchsize for outgoing requests
							 | 
						||
| 
								 | 
							
									syncBufferSize     = 128  // size of buffer  for delivery requests
							 | 
						||
| 
								 | 
							
									syncCacheSize      = 1024 // cache capacity to store request queue in memory
							 | 
						||
| 
								 | 
							
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// priorities
							 | 
						||
| 
								 | 
							
								const (
							 | 
						||
| 
								 | 
							
									Low        = iota // 0
							 | 
						||
| 
								 | 
							
									Medium            // 1
							 | 
						||
| 
								 | 
							
									High              // 2
							 | 
						||
| 
								 | 
							
									priorities        // 3 number of priority levels
							 | 
						||
| 
								 | 
							
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// request types
							 | 
						||
| 
								 | 
							
								const (
							 | 
						||
| 
								 | 
							
									DeliverReq   = iota // 0
							 | 
						||
| 
								 | 
							
									PushReq             // 1
							 | 
						||
| 
								 | 
							
									PropagateReq        // 2
							 | 
						||
| 
								 | 
							
									HistoryReq          // 3
							 | 
						||
| 
								 | 
							
									BacklogReq          // 4
							 | 
						||
| 
								 | 
							
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// json serialisable struct to record the syncronisation state between 2 peers
							 | 
						||
| 
								 | 
							
								type syncState struct {
							 | 
						||
| 
								 | 
							
									*storage.DbSyncState // embeds the following 4 fields:
							 | 
						||
| 
								 | 
							
									// Start      Key    // lower limit of address space
							 | 
						||
| 
								 | 
							
									// Stop       Key    // upper limit of address space
							 | 
						||
| 
								 | 
							
									// First      uint64 // counter taken from last sync state
							 | 
						||
| 
								 | 
							
									// Last       uint64 // counter of remote peer dbStore at the time of last connection
							 | 
						||
| 
								 | 
							
									SessionAt  uint64      // set at the time of connection
							 | 
						||
| 
								 | 
							
									LastSeenAt uint64      // set at the time of connection
							 | 
						||
| 
								 | 
							
									Latest     storage.Key // cursor of dbstore when last (continuously set by syncer)
							 | 
						||
| 
								 | 
							
									Synced     bool        // true iff Sync is done up to the last disconnect
							 | 
						||
| 
								 | 
							
									synced     chan bool   // signal that sync stage finished
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// wrapper of db-s to provide mockable custom local chunk store access to syncer
							 | 
						||
| 
								 | 
							
								type DbAccess struct {
							 | 
						||
| 
								 | 
							
									db  *storage.DbStore
							 | 
						||
| 
								 | 
							
									loc *storage.LocalStore
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func NewDbAccess(loc *storage.LocalStore) *DbAccess {
							 | 
						||
| 
								 | 
							
									return &DbAccess{loc.DbStore.(*storage.DbStore), loc}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// to obtain the chunks from key or request db entry only
							 | 
						||
| 
								 | 
							
								func (self *DbAccess) get(key storage.Key) (*storage.Chunk, error) {
							 | 
						||
| 
								 | 
							
									return self.loc.Get(key)
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// current storage counter of chunk db
							 | 
						||
| 
								 | 
							
								func (self *DbAccess) counter() uint64 {
							 | 
						||
| 
								 | 
							
									return self.db.Counter()
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// implemented by dbStoreSyncIterator
							 | 
						||
| 
								 | 
							
								type keyIterator interface {
							 | 
						||
| 
								 | 
							
									Next() storage.Key
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// generator function for iteration by address range and storage counter
							 | 
						||
| 
								 | 
							
								func (self *DbAccess) iterator(s *syncState) keyIterator {
							 | 
						||
| 
								 | 
							
									it, err := self.db.NewSyncIterator(*(s.DbSyncState))
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return nil
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									return keyIterator(it)
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (self syncState) String() string {
							 | 
						||
| 
								 | 
							
									if self.Synced {
							 | 
						||
| 
								 | 
							
										return fmt.Sprintf(
							 | 
						||
| 
								 | 
							
											"session started at: %v, last seen at: %v, latest key: %v",
							 | 
						||
| 
								 | 
							
											self.SessionAt, self.LastSeenAt,
							 | 
						||
| 
								 | 
							
											self.Latest.Log(),
							 | 
						||
| 
								 | 
							
										)
							 | 
						||
| 
								 | 
							
									} else {
							 | 
						||
| 
								 | 
							
										return fmt.Sprintf(
							 | 
						||
| 
								 | 
							
											"address: %v-%v, index: %v-%v, session started at: %v, last seen at: %v, latest key: %v",
							 | 
						||
| 
								 | 
							
											self.Start.Log(), self.Stop.Log(),
							 | 
						||
| 
								 | 
							
											self.First, self.Last,
							 | 
						||
| 
								 | 
							
											self.SessionAt, self.LastSeenAt,
							 | 
						||
| 
								 | 
							
											self.Latest.Log(),
							 | 
						||
| 
								 | 
							
										)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// syncer parameters (global, not peer specific)
							 | 
						||
| 
								 | 
							
								type SyncParams struct {
							 | 
						||
| 
								 | 
							
									RequestDbPath      string // path for request db (leveldb)
							 | 
						||
| 
								 | 
							
									RequestDbBatchSize uint   // nuber of items before batch is saved to requestdb
							 | 
						||
| 
								 | 
							
									KeyBufferSize      uint   // size of key buffer
							 | 
						||
| 
								 | 
							
									SyncBatchSize      uint   // maximum batchsize for outgoing requests
							 | 
						||
| 
								 | 
							
									SyncBufferSize     uint   // size of buffer for
							 | 
						||
| 
								 | 
							
									SyncCacheSize      uint   // cache capacity to store request queue in memory
							 | 
						||
| 
								 | 
							
									SyncPriorities     []uint // list of priority levels for req types 0-3
							 | 
						||
| 
								 | 
							
									SyncModes          []bool // list of sync modes for  for req types 0-3
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// constructor with default values
							 | 
						||
| 
								 | 
							
								func NewSyncParams(bzzdir string) *SyncParams {
							 | 
						||
| 
								 | 
							
									return &SyncParams{
							 | 
						||
| 
								 | 
							
										RequestDbPath:      filepath.Join(bzzdir, "requests"),
							 | 
						||
| 
								 | 
							
										RequestDbBatchSize: requestDbBatchSize,
							 | 
						||
| 
								 | 
							
										KeyBufferSize:      keyBufferSize,
							 | 
						||
| 
								 | 
							
										SyncBufferSize:     syncBufferSize,
							 | 
						||
| 
								 | 
							
										SyncBatchSize:      syncBatchSize,
							 | 
						||
| 
								 | 
							
										SyncCacheSize:      syncCacheSize,
							 | 
						||
| 
								 | 
							
										SyncPriorities:     []uint{High, Medium, Medium, Low, Low},
							 | 
						||
| 
								 | 
							
										SyncModes:          []bool{true, true, true, true, false},
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// syncer is the agent that manages content distribution/storage replication/chunk storeRequest forwarding
							 | 
						||
| 
								 | 
							
								type syncer struct {
							 | 
						||
| 
								 | 
							
									*SyncParams                     // sync parameters
							 | 
						||
| 
								 | 
							
									syncF           func() bool     // if syncing is needed
							 | 
						||
| 
								 | 
							
									key             storage.Key     // remote peers address key
							 | 
						||
| 
								 | 
							
									state           *syncState      // sync state for our dbStore
							 | 
						||
| 
								 | 
							
									syncStates      chan *syncState // different stages of sync
							 | 
						||
| 
								 | 
							
									deliveryRequest chan bool       // one of two triggers needed to send unsyncedKeys
							 | 
						||
| 
								 | 
							
									newUnsyncedKeys chan bool       // one of two triggers needed to send unsynced keys
							 | 
						||
| 
								 | 
							
									quit            chan bool       // signal to quit loops
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// DB related fields
							 | 
						||
| 
								 | 
							
									dbAccess *DbAccess            // access to dbStore
							 | 
						||
| 
								 | 
							
									db       *storage.LDBDatabase // delivery msg db
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// native fields
							 | 
						||
| 
								 | 
							
									queues     [priorities]*syncDb                   // in-memory cache / queues for sync reqs
							 | 
						||
| 
								 | 
							
									keys       [priorities]chan interface{}          // buffer for unsynced keys
							 | 
						||
| 
								 | 
							
									deliveries [priorities]chan *storeRequestMsgData // delivery
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// bzz protocol instance outgoing message callbacks (mockable for testing)
							 | 
						||
| 
								 | 
							
									unsyncedKeys func([]*syncRequest, *syncState) error // send unsyncedKeysMsg
							 | 
						||
| 
								 | 
							
									store        func(*storeRequestMsgData) error       // send storeRequestMsg
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// a syncer instance is linked to each peer connection
							 | 
						||
| 
								 | 
							
								// constructor is called from protocol after successful handshake
							 | 
						||
| 
								 | 
							
								// the returned instance is attached to the peer and can be called
							 | 
						||
| 
								 | 
							
								// by the forwarder
							 | 
						||
| 
								 | 
							
								func newSyncer(
							 | 
						||
| 
								 | 
							
									db *storage.LDBDatabase, remotekey storage.Key,
							 | 
						||
| 
								 | 
							
									dbAccess *DbAccess,
							 | 
						||
| 
								 | 
							
									unsyncedKeys func([]*syncRequest, *syncState) error,
							 | 
						||
| 
								 | 
							
									store func(*storeRequestMsgData) error,
							 | 
						||
| 
								 | 
							
									params *SyncParams,
							 | 
						||
| 
								 | 
							
									state *syncState,
							 | 
						||
| 
								 | 
							
									syncF func() bool,
							 | 
						||
| 
								 | 
							
								) (*syncer, error) {
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									syncBufferSize := params.SyncBufferSize
							 | 
						||
| 
								 | 
							
									keyBufferSize := params.KeyBufferSize
							 | 
						||
| 
								 | 
							
									dbBatchSize := params.RequestDbBatchSize
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									self := &syncer{
							 | 
						||
| 
								 | 
							
										syncF:           syncF,
							 | 
						||
| 
								 | 
							
										key:             remotekey,
							 | 
						||
| 
								 | 
							
										dbAccess:        dbAccess,
							 | 
						||
| 
								 | 
							
										syncStates:      make(chan *syncState, 20),
							 | 
						||
| 
								 | 
							
										deliveryRequest: make(chan bool, 1),
							 | 
						||
| 
								 | 
							
										newUnsyncedKeys: make(chan bool, 1),
							 | 
						||
| 
								 | 
							
										SyncParams:      params,
							 | 
						||
| 
								 | 
							
										state:           state,
							 | 
						||
| 
								 | 
							
										quit:            make(chan bool),
							 | 
						||
| 
								 | 
							
										unsyncedKeys:    unsyncedKeys,
							 | 
						||
| 
								 | 
							
										store:           store,
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// initialising
							 | 
						||
| 
								 | 
							
									for i := 0; i < priorities; i++ {
							 | 
						||
| 
								 | 
							
										self.keys[i] = make(chan interface{}, keyBufferSize)
							 | 
						||
| 
								 | 
							
										self.deliveries[i] = make(chan *storeRequestMsgData)
							 | 
						||
| 
								 | 
							
										// initialise a syncdb instance for each priority queue
							 | 
						||
| 
								 | 
							
										self.queues[i] = newSyncDb(db, remotekey, uint(i), syncBufferSize, dbBatchSize, self.deliver(uint(i)))
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									glog.V(logger.Info).Infof("syncer started: %v", state)
							 | 
						||
| 
								 | 
							
									// launch chunk delivery service
							 | 
						||
| 
								 | 
							
									go self.syncDeliveries()
							 | 
						||
| 
								 | 
							
									// launch sync task manager
							 | 
						||
| 
								 | 
							
									if self.syncF() {
							 | 
						||
| 
								 | 
							
										go self.sync()
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									// process unsynced keys to broadcast
							 | 
						||
| 
								 | 
							
									go self.syncUnsyncedKeys()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									return self, nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// metadata serialisation
							 | 
						||
| 
								 | 
							
								func encodeSync(state *syncState) (*json.RawMessage, error) {
							 | 
						||
| 
								 | 
							
									data, err := json.MarshalIndent(state, "", " ")
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return nil, err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									meta := json.RawMessage(data)
							 | 
						||
| 
								 | 
							
									return &meta, nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func decodeSync(meta *json.RawMessage) (*syncState, error) {
							 | 
						||
| 
								 | 
							
									if meta == nil {
							 | 
						||
| 
								 | 
							
										return nil, fmt.Errorf("unable to deserialise sync state from <nil>")
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									data := []byte(*(meta))
							 | 
						||
| 
								 | 
							
									if len(data) == 0 {
							 | 
						||
| 
								 | 
							
										return nil, fmt.Errorf("unable to deserialise sync state from <nil>")
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									state := &syncState{DbSyncState: &storage.DbSyncState{}}
							 | 
						||
| 
								 | 
							
									err := json.Unmarshal(data, state)
							 | 
						||
| 
								 | 
							
									return state, err
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								/*
							 | 
						||
| 
								 | 
							
								 sync implements the syncing script
							 | 
						||
| 
								 | 
							
								 * first all items left in the request Db are replayed
							 | 
						||
| 
								 | 
							
								   * type = StaleSync
							 | 
						||
| 
								 | 
							
								   * Mode: by default once again via confirmation roundtrip
							 | 
						||
| 
								 | 
							
								   * Priority: the items are replayed as the proirity specified for StaleSync
							 | 
						||
| 
								 | 
							
								   * but within the order respects earlier priority level of request
							 | 
						||
| 
								 | 
							
								 * after all items are consumed for a priority level, the the respective
							 | 
						||
| 
								 | 
							
								  queue for delivery requests is open (this way new reqs not written to db)
							 | 
						||
| 
								 | 
							
								  (TODO: this should be checked)
							 | 
						||
| 
								 | 
							
								 * the sync state provided by the remote peer is used to sync history
							 | 
						||
| 
								 | 
							
								   * all the backlog from earlier (aborted) syncing is completed starting from latest
							 | 
						||
| 
								 | 
							
								   * if Last  < LastSeenAt then all items in between then process all
							 | 
						||
| 
								 | 
							
								     backlog from upto last disconnect
							 | 
						||
| 
								 | 
							
								   * if Last > 0 &&
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								 sync is called from the syncer constructor and is not supposed to be used externally
							 | 
						||
| 
								 | 
							
								*/
							 | 
						||
| 
								 | 
							
								func (self *syncer) sync() {
							 | 
						||
| 
								 | 
							
									state := self.state
							 | 
						||
| 
								 | 
							
									// sync finished
							 | 
						||
| 
								 | 
							
									defer close(self.syncStates)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// 0. first replay stale requests from request db
							 | 
						||
| 
								 | 
							
									if state.SessionAt == 0 {
							 | 
						||
| 
								 | 
							
										glog.V(logger.Debug).Infof("syncer[%v]: nothing to sync", self.key.Log())
							 | 
						||
| 
								 | 
							
										return
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									glog.V(logger.Debug).Infof("syncer[%v]: start replaying stale requests from request db", self.key.Log())
							 | 
						||
| 
								 | 
							
									for p := priorities - 1; p >= 0; p-- {
							 | 
						||
| 
								 | 
							
										self.queues[p].dbRead(false, 0, self.replay())
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									glog.V(logger.Debug).Infof("syncer[%v]: done replaying stale requests from request db", self.key.Log())
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// unless peer is synced sync unfinished history beginning on
							 | 
						||
| 
								 | 
							
									if !state.Synced {
							 | 
						||
| 
								 | 
							
										start := state.Start
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										if !storage.IsZeroKey(state.Latest) {
							 | 
						||
| 
								 | 
							
											// 1. there is unfinished earlier sync
							 | 
						||
| 
								 | 
							
											state.Start = state.Latest
							 | 
						||
| 
								 | 
							
											glog.V(logger.Debug).Infof("syncer[%v]: start syncronising backlog (unfinished sync: %v)", self.key.Log(), state)
							 | 
						||
| 
								 | 
							
											// blocks while the entire history upto state is synced
							 | 
						||
| 
								 | 
							
											self.syncState(state)
							 | 
						||
| 
								 | 
							
											if state.Last < state.SessionAt {
							 | 
						||
| 
								 | 
							
												state.First = state.Last + 1
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										state.Latest = storage.ZeroKey
							 | 
						||
| 
								 | 
							
										state.Start = start
							 | 
						||
| 
								 | 
							
										// 2. sync up to last disconnect1
							 | 
						||
| 
								 | 
							
										if state.First < state.LastSeenAt {
							 | 
						||
| 
								 | 
							
											state.Last = state.LastSeenAt
							 | 
						||
| 
								 | 
							
											glog.V(logger.Debug).Infof("syncer[%v]: start syncronising history upto last disconnect at %v: %v", self.key.Log(), state.LastSeenAt, state)
							 | 
						||
| 
								 | 
							
											self.syncState(state)
							 | 
						||
| 
								 | 
							
											state.First = state.LastSeenAt
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										state.Latest = storage.ZeroKey
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									} else {
							 | 
						||
| 
								 | 
							
										// synchronisation starts at end of last session
							 | 
						||
| 
								 | 
							
										state.First = state.LastSeenAt
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// 3. sync up to current session start
							 | 
						||
| 
								 | 
							
									// if there have been new chunks since last session
							 | 
						||
| 
								 | 
							
									if state.LastSeenAt < state.SessionAt {
							 | 
						||
| 
								 | 
							
										state.Last = state.SessionAt
							 | 
						||
| 
								 | 
							
										glog.V(logger.Debug).Infof("syncer[%v]: start syncronising history since last disconnect at %v up until session start at %v: %v", self.key.Log(), state.LastSeenAt, state.SessionAt, state)
							 | 
						||
| 
								 | 
							
										// blocks until state syncing is finished
							 | 
						||
| 
								 | 
							
										self.syncState(state)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									glog.V(logger.Info).Infof("syncer[%v]: syncing all history complete", self.key.Log())
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// wait till syncronised block uptil state is synced
							 | 
						||
| 
								 | 
							
								func (self *syncer) syncState(state *syncState) {
							 | 
						||
| 
								 | 
							
									self.syncStates <- state
							 | 
						||
| 
								 | 
							
									select {
							 | 
						||
| 
								 | 
							
									case <-state.synced:
							 | 
						||
| 
								 | 
							
									case <-self.quit:
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// stop quits both request processor and saves the request cache to disk
							 | 
						||
| 
								 | 
							
								func (self *syncer) stop() {
							 | 
						||
| 
								 | 
							
									close(self.quit)
							 | 
						||
| 
								 | 
							
									glog.V(logger.Detail).Infof("syncer[%v]: stop and save sync request db backlog", self.key.Log())
							 | 
						||
| 
								 | 
							
									for _, db := range self.queues {
							 | 
						||
| 
								 | 
							
										db.stop()
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// rlp serialisable sync request
							 | 
						||
| 
								 | 
							
								type syncRequest struct {
							 | 
						||
| 
								 | 
							
									Key      storage.Key
							 | 
						||
| 
								 | 
							
									Priority uint
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (self *syncRequest) String() string {
							 | 
						||
| 
								 | 
							
									return fmt.Sprintf("<Key: %v, Priority: %v>", self.Key.Log(), self.Priority)
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (self *syncer) newSyncRequest(req interface{}, p int) (*syncRequest, error) {
							 | 
						||
| 
								 | 
							
									key, _, _, _, err := parseRequest(req)
							 | 
						||
| 
								 | 
							
									// TODO: if req has chunk, it should be put in a cache
							 | 
						||
| 
								 | 
							
									// create
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return nil, err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									return &syncRequest{key, uint(p)}, nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// serves historical items from the DB
							 | 
						||
| 
								 | 
							
								// * read is on demand, blocking unless history channel is read
							 | 
						||
| 
								 | 
							
								// * accepts sync requests (syncStates) to create new db iterator
							 | 
						||
| 
								 | 
							
								// * closes the channel one iteration finishes
							 | 
						||
| 
								 | 
							
								func (self *syncer) syncHistory(state *syncState) chan interface{} {
							 | 
						||
| 
								 | 
							
									var n uint
							 | 
						||
| 
								 | 
							
									history := make(chan interface{})
							 | 
						||
| 
								 | 
							
									glog.V(logger.Debug).Infof("syncer[%v]: syncing history between %v - %v for chunk addresses %v - %v", self.key.Log(), state.First, state.Last, state.Start, state.Stop)
							 | 
						||
| 
								 | 
							
									it := self.dbAccess.iterator(state)
							 | 
						||
| 
								 | 
							
									if it != nil {
							 | 
						||
| 
								 | 
							
										go func() {
							 | 
						||
| 
								 | 
							
											// signal end of the iteration ended
							 | 
						||
| 
								 | 
							
											defer close(history)
							 | 
						||
| 
								 | 
							
										IT:
							 | 
						||
| 
								 | 
							
											for {
							 | 
						||
| 
								 | 
							
												key := it.Next()
							 | 
						||
| 
								 | 
							
												if key == nil {
							 | 
						||
| 
								 | 
							
													break IT
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
												select {
							 | 
						||
| 
								 | 
							
												// blocking until history channel is read from
							 | 
						||
| 
								 | 
							
												case history <- storage.Key(key):
							 | 
						||
| 
								 | 
							
													n++
							 | 
						||
| 
								 | 
							
													glog.V(logger.Detail).Infof("syncer[%v]: history: %v (%v keys)", self.key.Log(), key.Log(), n)
							 | 
						||
| 
								 | 
							
													state.Latest = key
							 | 
						||
| 
								 | 
							
												case <-self.quit:
							 | 
						||
| 
								 | 
							
													return
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
											glog.V(logger.Debug).Infof("syncer[%v]: finished syncing history between %v - %v for chunk addresses %v - %v (at %v) (chunks = %v)", self.key.Log(), state.First, state.Last, state.Start, state.Stop, state.Latest, n)
							 | 
						||
| 
								 | 
							
										}()
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									return history
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// triggers key syncronisation
							 | 
						||
| 
								 | 
							
								func (self *syncer) sendUnsyncedKeys() {
							 | 
						||
| 
								 | 
							
									select {
							 | 
						||
| 
								 | 
							
									case self.deliveryRequest <- true:
							 | 
						||
| 
								 | 
							
									default:
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// assembles a new batch of unsynced keys
							 | 
						||
| 
								 | 
							
								// * keys are drawn from the key buffers in order of priority queue
							 | 
						||
| 
								 | 
							
								// * if the queues of priority for History (HistoryReq) or higher are depleted,
							 | 
						||
| 
								 | 
							
								//   historical data is used so historical items are lower priority within
							 | 
						||
| 
								 | 
							
								//   their priority group.
							 | 
						||
| 
								 | 
							
								// * Order of historical data is unspecified
							 | 
						||
| 
								 | 
							
								func (self *syncer) syncUnsyncedKeys() {
							 | 
						||
| 
								 | 
							
									// send out new
							 | 
						||
| 
								 | 
							
									var unsynced []*syncRequest
							 | 
						||
| 
								 | 
							
									var more, justSynced bool
							 | 
						||
| 
								 | 
							
									var keyCount, historyCnt int
							 | 
						||
| 
								 | 
							
									var history chan interface{}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									priority := High
							 | 
						||
| 
								 | 
							
									keys := self.keys[priority]
							 | 
						||
| 
								 | 
							
									var newUnsyncedKeys, deliveryRequest chan bool
							 | 
						||
| 
								 | 
							
									keyCounts := make([]int, priorities)
							 | 
						||
| 
								 | 
							
									histPrior := self.SyncPriorities[HistoryReq]
							 | 
						||
| 
								 | 
							
									syncStates := self.syncStates
							 | 
						||
| 
								 | 
							
									state := self.state
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								LOOP:
							 | 
						||
| 
								 | 
							
									for {
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										var req interface{}
							 | 
						||
| 
								 | 
							
										// select the highest priority channel to read from
							 | 
						||
| 
								 | 
							
										// keys channels are buffered so the highest priority ones
							 | 
						||
| 
								 | 
							
										// are checked first - integrity can only be guaranteed if writing
							 | 
						||
| 
								 | 
							
										// is locked while selecting
							 | 
						||
| 
								 | 
							
										if priority != High || len(keys) == 0 {
							 | 
						||
| 
								 | 
							
											// selection is not needed if the High priority queue has items
							 | 
						||
| 
								 | 
							
											keys = nil
							 | 
						||
| 
								 | 
							
										PRIORITIES:
							 | 
						||
| 
								 | 
							
											for priority = High; priority >= 0; priority-- {
							 | 
						||
| 
								 | 
							
												// the first priority channel that is non-empty will be assigned to keys
							 | 
						||
| 
								 | 
							
												if len(self.keys[priority]) > 0 {
							 | 
						||
| 
								 | 
							
													glog.V(logger.Detail).Infof("syncer[%v]: reading request with	 priority %v", self.key.Log(), priority)
							 | 
						||
| 
								 | 
							
													keys = self.keys[priority]
							 | 
						||
| 
								 | 
							
													break PRIORITIES
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
												glog.V(logger.Detail).Infof("syncer[%v/%v]: queue: [%v, %v, %v]", self.key.Log(), priority, len(self.keys[High]), len(self.keys[Medium]), len(self.keys[Low]))
							 | 
						||
| 
								 | 
							
												// if the input queue is empty on this level, resort to history if there is any
							 | 
						||
| 
								 | 
							
												if uint(priority) == histPrior && history != nil {
							 | 
						||
| 
								 | 
							
													glog.V(logger.Detail).Infof("syncer[%v]: reading history for %v", self.key.Log(), self.key)
							 | 
						||
| 
								 | 
							
													keys = history
							 | 
						||
| 
								 | 
							
													break PRIORITIES
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										// if peer ready to receive but nothing to send
							 | 
						||
| 
								 | 
							
										if keys == nil && deliveryRequest == nil {
							 | 
						||
| 
								 | 
							
											// if no items left and switch to waiting mode
							 | 
						||
| 
								 | 
							
											glog.V(logger.Detail).Infof("syncer[%v]: buffers consumed. Waiting", self.key.Log())
							 | 
						||
| 
								 | 
							
											newUnsyncedKeys = self.newUnsyncedKeys
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										// send msg iff
							 | 
						||
| 
								 | 
							
										// * peer is ready to receive keys AND (
							 | 
						||
| 
								 | 
							
										// * all queues and history are depleted OR
							 | 
						||
| 
								 | 
							
										// * batch full OR
							 | 
						||
| 
								 | 
							
										// * all history have been consumed, synced)
							 | 
						||
| 
								 | 
							
										if deliveryRequest == nil &&
							 | 
						||
| 
								 | 
							
											(justSynced ||
							 | 
						||
| 
								 | 
							
												len(unsynced) > 0 && keys == nil ||
							 | 
						||
| 
								 | 
							
												len(unsynced) == int(self.SyncBatchSize)) {
							 | 
						||
| 
								 | 
							
											justSynced = false
							 | 
						||
| 
								 | 
							
											// listen to requests
							 | 
						||
| 
								 | 
							
											deliveryRequest = self.deliveryRequest
							 | 
						||
| 
								 | 
							
											newUnsyncedKeys = nil // not care about data until next req comes in
							 | 
						||
| 
								 | 
							
											// set sync to current counter
							 | 
						||
| 
								 | 
							
											// (all nonhistorical outgoing traffic sheduled and persisted
							 | 
						||
| 
								 | 
							
											state.LastSeenAt = self.dbAccess.counter()
							 | 
						||
| 
								 | 
							
											state.Latest = storage.ZeroKey
							 | 
						||
| 
								 | 
							
											glog.V(logger.Detail).Infof("syncer[%v]: sending %v", self.key.Log(), unsynced)
							 | 
						||
| 
								 | 
							
											//  send the unsynced keys
							 | 
						||
| 
								 | 
							
											stateCopy := *state
							 | 
						||
| 
								 | 
							
											err := self.unsyncedKeys(unsynced, &stateCopy)
							 | 
						||
| 
								 | 
							
											if err != nil {
							 | 
						||
| 
								 | 
							
												glog.V(logger.Warn).Infof("syncer[%v]: unable to send unsynced keys: %v", err)
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
											self.state = state
							 | 
						||
| 
								 | 
							
											glog.V(logger.Debug).Infof("syncer[%v]: --> %v keys sent: (total: %v (%v), history: %v), sent sync state: %v", self.key.Log(), len(unsynced), keyCounts, keyCount, historyCnt, stateCopy)
							 | 
						||
| 
								 | 
							
											unsynced = nil
							 | 
						||
| 
								 | 
							
											keys = nil
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										// process item and add it to the batch
							 | 
						||
| 
								 | 
							
										select {
							 | 
						||
| 
								 | 
							
										case <-self.quit:
							 | 
						||
| 
								 | 
							
											break LOOP
							 | 
						||
| 
								 | 
							
										case req, more = <-keys:
							 | 
						||
| 
								 | 
							
											if keys == history && !more {
							 | 
						||
| 
								 | 
							
												glog.V(logger.Detail).Infof("syncer[%v]: syncing history segment complete", self.key.Log())
							 | 
						||
| 
								 | 
							
												// history channel is closed, waiting for new state (called from sync())
							 | 
						||
| 
								 | 
							
												syncStates = self.syncStates
							 | 
						||
| 
								 | 
							
												state.Synced = true // this signals that the  current segment is complete
							 | 
						||
| 
								 | 
							
												select {
							 | 
						||
| 
								 | 
							
												case state.synced <- false:
							 | 
						||
| 
								 | 
							
												case <-self.quit:
							 | 
						||
| 
								 | 
							
													break LOOP
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
												justSynced = true
							 | 
						||
| 
								 | 
							
												history = nil
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
										case <-deliveryRequest:
							 | 
						||
| 
								 | 
							
											glog.V(logger.Detail).Infof("syncer[%v]: peer ready to receive", self.key.Log())
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
											// this 1 cap channel can wake up the loop
							 | 
						||
| 
								 | 
							
											// signaling that peer is ready to receive unsynced Keys
							 | 
						||
| 
								 | 
							
											// the channel is set to nil any further writes will be ignored
							 | 
						||
| 
								 | 
							
											deliveryRequest = nil
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										case <-newUnsyncedKeys:
							 | 
						||
| 
								 | 
							
											glog.V(logger.Detail).Infof("syncer[%v]: new unsynced keys available", self.key.Log())
							 | 
						||
| 
								 | 
							
											// this 1 cap channel can wake up the loop
							 | 
						||
| 
								 | 
							
											// signals that data is available to send if peer is ready to receive
							 | 
						||
| 
								 | 
							
											newUnsyncedKeys = nil
							 | 
						||
| 
								 | 
							
											keys = self.keys[High]
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										case state, more = <-syncStates:
							 | 
						||
| 
								 | 
							
											// this resets the state
							 | 
						||
| 
								 | 
							
											if !more {
							 | 
						||
| 
								 | 
							
												state = self.state
							 | 
						||
| 
								 | 
							
												glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) syncing complete upto %v)", self.key.Log(), priority, state)
							 | 
						||
| 
								 | 
							
												state.Synced = true
							 | 
						||
| 
								 | 
							
												syncStates = nil
							 | 
						||
| 
								 | 
							
											} else {
							 | 
						||
| 
								 | 
							
												glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) syncing history upto %v priority %v)", self.key.Log(), priority, state, histPrior)
							 | 
						||
| 
								 | 
							
												state.Synced = false
							 | 
						||
| 
								 | 
							
												history = self.syncHistory(state)
							 | 
						||
| 
								 | 
							
												// only one history at a time, only allow another one once the
							 | 
						||
| 
								 | 
							
												// history channel is closed
							 | 
						||
| 
								 | 
							
												syncStates = nil
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										if req == nil {
							 | 
						||
| 
								 | 
							
											continue LOOP
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) added to unsynced keys: %v", self.key.Log(), priority, req)
							 | 
						||
| 
								 | 
							
										keyCounts[priority]++
							 | 
						||
| 
								 | 
							
										keyCount++
							 | 
						||
| 
								 | 
							
										if keys == history {
							 | 
						||
| 
								 | 
							
											glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) history item %v (synced = %v)", self.key.Log(), priority, req, state.Synced)
							 | 
						||
| 
								 | 
							
											historyCnt++
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										if sreq, err := self.newSyncRequest(req, priority); err == nil {
							 | 
						||
| 
								 | 
							
											// extract key from req
							 | 
						||
| 
								 | 
							
											glog.V(logger.Detail).Infof("syncer(priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced)
							 | 
						||
| 
								 | 
							
											unsynced = append(unsynced, sreq)
							 | 
						||
| 
								 | 
							
										} else {
							 | 
						||
| 
								 | 
							
											glog.V(logger.Warn).Infof("syncer(priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, state.Synced, err)
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// delivery loop
							 | 
						||
| 
								 | 
							
								// takes into account priority, send store Requests with chunk (delivery)
							 | 
						||
| 
								 | 
							
								// idle blocking if no new deliveries in any of the queues
							 | 
						||
| 
								 | 
							
								func (self *syncer) syncDeliveries() {
							 | 
						||
| 
								 | 
							
									var req *storeRequestMsgData
							 | 
						||
| 
								 | 
							
									p := High
							 | 
						||
| 
								 | 
							
									var deliveries chan *storeRequestMsgData
							 | 
						||
| 
								 | 
							
									var msg *storeRequestMsgData
							 | 
						||
| 
								 | 
							
									var err error
							 | 
						||
| 
								 | 
							
									var c = [priorities]int{}
							 | 
						||
| 
								 | 
							
									var n = [priorities]int{}
							 | 
						||
| 
								 | 
							
									var total, success uint
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									for {
							 | 
						||
| 
								 | 
							
										deliveries = self.deliveries[p]
							 | 
						||
| 
								 | 
							
										select {
							 | 
						||
| 
								 | 
							
										case req = <-deliveries:
							 | 
						||
| 
								 | 
							
											n[p]++
							 | 
						||
| 
								 | 
							
											c[p]++
							 | 
						||
| 
								 | 
							
										default:
							 | 
						||
| 
								 | 
							
											if p == Low {
							 | 
						||
| 
								 | 
							
												// blocking, depletion on all channels, no preference for priority
							 | 
						||
| 
								 | 
							
												select {
							 | 
						||
| 
								 | 
							
												case req = <-self.deliveries[High]:
							 | 
						||
| 
								 | 
							
													n[High]++
							 | 
						||
| 
								 | 
							
												case req = <-self.deliveries[Medium]:
							 | 
						||
| 
								 | 
							
													n[Medium]++
							 | 
						||
| 
								 | 
							
												case req = <-self.deliveries[Low]:
							 | 
						||
| 
								 | 
							
													n[Low]++
							 | 
						||
| 
								 | 
							
												case <-self.quit:
							 | 
						||
| 
								 | 
							
													return
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
												p = High
							 | 
						||
| 
								 | 
							
											} else {
							 | 
						||
| 
								 | 
							
												p--
							 | 
						||
| 
								 | 
							
												continue
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										total++
							 | 
						||
| 
								 | 
							
										msg, err = self.newStoreRequestMsgData(req)
							 | 
						||
| 
								 | 
							
										if err != nil {
							 | 
						||
| 
								 | 
							
											glog.V(logger.Warn).Infof("syncer[%v]: failed to create store request for %v: %v", self.key.Log(), req, err)
							 | 
						||
| 
								 | 
							
										} else {
							 | 
						||
| 
								 | 
							
											err = self.store(msg)
							 | 
						||
| 
								 | 
							
											if err != nil {
							 | 
						||
| 
								 | 
							
												glog.V(logger.Warn).Infof("syncer[%v]: failed to deliver %v: %v", self.key.Log(), req, err)
							 | 
						||
| 
								 | 
							
											} else {
							 | 
						||
| 
								 | 
							
												success++
							 | 
						||
| 
								 | 
							
												glog.V(logger.Detail).Infof("syncer[%v]: %v successfully delivered", self.key.Log(), req)
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										if total%self.SyncBatchSize == 0 {
							 | 
						||
| 
								 | 
							
											glog.V(logger.Debug).Infof("syncer[%v]: deliver Total: %v, Success: %v, High: %v/%v, Medium: %v/%v, Low %v/%v", self.key.Log(), total, success, c[High], n[High], c[Medium], n[Medium], c[Low], n[Low])
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								/*
							 | 
						||
| 
								 | 
							
								 addRequest handles requests for delivery
							 | 
						||
| 
								 | 
							
								 it accepts 4 types:
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								 * storeRequestMsgData: coming from netstore propagate response
							 | 
						||
| 
								 | 
							
								 * chunk: coming from forwarding (questionable: id?)
							 | 
						||
| 
								 | 
							
								 * key: from incoming syncRequest
							 | 
						||
| 
								 | 
							
								 * syncDbEntry: key,id encoded in db
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								 If sync mode is on for the type of request, then
							 | 
						||
| 
								 | 
							
								 it sends the request to the keys queue of the correct priority
							 | 
						||
| 
								 | 
							
								 channel buffered with capacity (SyncBufferSize)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								 If sync mode is off then, requests are directly sent to deliveries
							 | 
						||
| 
								 | 
							
								*/
							 | 
						||
| 
								 | 
							
								func (self *syncer) addRequest(req interface{}, ty int) {
							 | 
						||
| 
								 | 
							
									// retrieve priority for request type name int8
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									priority := self.SyncPriorities[ty]
							 | 
						||
| 
								 | 
							
									// sync mode for this type ON
							 | 
						||
| 
								 | 
							
									if self.syncF() || ty == DeliverReq {
							 | 
						||
| 
								 | 
							
										if self.SyncModes[ty] {
							 | 
						||
| 
								 | 
							
											self.addKey(req, priority, self.quit)
							 | 
						||
| 
								 | 
							
										} else {
							 | 
						||
| 
								 | 
							
											self.addDelivery(req, priority, self.quit)
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// addKey queues sync request for sync confirmation with given priority
							 | 
						||
| 
								 | 
							
								// ie the key will go out in an unsyncedKeys message
							 | 
						||
| 
								 | 
							
								func (self *syncer) addKey(req interface{}, priority uint, quit chan bool) bool {
							 | 
						||
| 
								 | 
							
									select {
							 | 
						||
| 
								 | 
							
									case self.keys[priority] <- req:
							 | 
						||
| 
								 | 
							
										// this wakes up the unsynced keys loop if idle
							 | 
						||
| 
								 | 
							
										select {
							 | 
						||
| 
								 | 
							
										case self.newUnsyncedKeys <- true:
							 | 
						||
| 
								 | 
							
										default:
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										return true
							 | 
						||
| 
								 | 
							
									case <-quit:
							 | 
						||
| 
								 | 
							
										return false
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// addDelivery queues delivery request for with given priority
							 | 
						||
| 
								 | 
							
								// ie the chunk will be delivered ASAP mod priority queueing handled by syncdb
							 | 
						||
| 
								 | 
							
								// requests are persisted across sessions for correct sync
							 | 
						||
| 
								 | 
							
								func (self *syncer) addDelivery(req interface{}, priority uint, quit chan bool) bool {
							 | 
						||
| 
								 | 
							
									select {
							 | 
						||
| 
								 | 
							
									case self.queues[priority].buffer <- req:
							 | 
						||
| 
								 | 
							
										return true
							 | 
						||
| 
								 | 
							
									case <-quit:
							 | 
						||
| 
								 | 
							
										return false
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// doDelivery delivers the chunk for the request with given priority
							 | 
						||
| 
								 | 
							
								// without queuing
							 | 
						||
| 
								 | 
							
								func (self *syncer) doDelivery(req interface{}, priority uint, quit chan bool) bool {
							 | 
						||
| 
								 | 
							
									msgdata, err := self.newStoreRequestMsgData(req)
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										glog.V(logger.Warn).Infof("unable to deliver request %v: %v", msgdata, err)
							 | 
						||
| 
								 | 
							
										return false
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									select {
							 | 
						||
| 
								 | 
							
									case self.deliveries[priority] <- msgdata:
							 | 
						||
| 
								 | 
							
										return true
							 | 
						||
| 
								 | 
							
									case <-quit:
							 | 
						||
| 
								 | 
							
										return false
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// returns the delivery function for given priority
							 | 
						||
| 
								 | 
							
								// passed on to syncDb
							 | 
						||
| 
								 | 
							
								func (self *syncer) deliver(priority uint) func(req interface{}, quit chan bool) bool {
							 | 
						||
| 
								 | 
							
									return func(req interface{}, quit chan bool) bool {
							 | 
						||
| 
								 | 
							
										return self.doDelivery(req, priority, quit)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// returns the replay function passed on to syncDb
							 | 
						||
| 
								 | 
							
								// depending on sync mode settings for BacklogReq,
							 | 
						||
| 
								 | 
							
								// re	play of request db backlog sends items via confirmation
							 | 
						||
| 
								 | 
							
								// or directly delivers
							 | 
						||
| 
								 | 
							
								func (self *syncer) replay() func(req interface{}, quit chan bool) bool {
							 | 
						||
| 
								 | 
							
									sync := self.SyncModes[BacklogReq]
							 | 
						||
| 
								 | 
							
									priority := self.SyncPriorities[BacklogReq]
							 | 
						||
| 
								 | 
							
									// sync mode for this type ON
							 | 
						||
| 
								 | 
							
									if sync {
							 | 
						||
| 
								 | 
							
										return func(req interface{}, quit chan bool) bool {
							 | 
						||
| 
								 | 
							
											return self.addKey(req, priority, quit)
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									} else {
							 | 
						||
| 
								 | 
							
										return func(req interface{}, quit chan bool) bool {
							 | 
						||
| 
								 | 
							
											return self.doDelivery(req, priority, quit)
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// given a request, extends it to a full storeRequestMsgData
							 | 
						||
| 
								 | 
							
								// polimorphic: see addRequest for the types accepted
							 | 
						||
| 
								 | 
							
								func (self *syncer) newStoreRequestMsgData(req interface{}) (*storeRequestMsgData, error) {
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									key, id, chunk, sreq, err := parseRequest(req)
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return nil, err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									if sreq == nil {
							 | 
						||
| 
								 | 
							
										if chunk == nil {
							 | 
						||
| 
								 | 
							
											var err error
							 | 
						||
| 
								 | 
							
											chunk, err = self.dbAccess.get(key)
							 | 
						||
| 
								 | 
							
											if err != nil {
							 | 
						||
| 
								 | 
							
												return nil, err
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										sreq = &storeRequestMsgData{
							 | 
						||
| 
								 | 
							
											Id:    id,
							 | 
						||
| 
								 | 
							
											Key:   chunk.Key,
							 | 
						||
| 
								 | 
							
											SData: chunk.SData,
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									return sreq, nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// parse request types and extracts, key, id, chunk, request if available
							 | 
						||
| 
								 | 
							
								// does not do chunk lookup !
							 | 
						||
| 
								 | 
							
								func parseRequest(req interface{}) (storage.Key, uint64, *storage.Chunk, *storeRequestMsgData, error) {
							 | 
						||
| 
								 | 
							
									var key storage.Key
							 | 
						||
| 
								 | 
							
									var entry *syncDbEntry
							 | 
						||
| 
								 | 
							
									var chunk *storage.Chunk
							 | 
						||
| 
								 | 
							
									var id uint64
							 | 
						||
| 
								 | 
							
									var ok bool
							 | 
						||
| 
								 | 
							
									var sreq *storeRequestMsgData
							 | 
						||
| 
								 | 
							
									var err error
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									if key, ok = req.(storage.Key); ok {
							 | 
						||
| 
								 | 
							
										id = generateId()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									} else if entry, ok = req.(*syncDbEntry); ok {
							 | 
						||
| 
								 | 
							
										id = binary.BigEndian.Uint64(entry.val[32:])
							 | 
						||
| 
								 | 
							
										key = storage.Key(entry.val[:32])
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									} else if chunk, ok = req.(*storage.Chunk); ok {
							 | 
						||
| 
								 | 
							
										key = chunk.Key
							 | 
						||
| 
								 | 
							
										id = generateId()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									} else if sreq, ok = req.(*storeRequestMsgData); ok {
							 | 
						||
| 
								 | 
							
										key = sreq.Key
							 | 
						||
| 
								 | 
							
									} else {
							 | 
						||
| 
								 | 
							
										err = fmt.Errorf("type not allowed: %v (%T)", req, req)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									return key, id, chunk, sreq, err
							 | 
						||
| 
								 | 
							
								}
							 |