2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								// Copyright 2016 The go-ethereum Authors  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// This file is part of the go-ethereum library.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// The go-ethereum library is free software: you can redistribute it and/or modify  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// it under the terms of the GNU Lesser General Public License as published by  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// the Free Software Foundation, either version 3 of the License, or  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// (at your option) any later version.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// The go-ethereum library is distributed in the hope that it will be useful,  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// but WITHOUT ANY WARRANTY; without even the implied warranty of  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// GNU Lesser General Public License for more details.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// You should have received a copy of the GNU Lesser General Public License  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								package  network  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  (  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"encoding/binary" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"encoding/json" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"fmt" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"path/filepath" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									"github.com/ethereum/go-ethereum/log" 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
									"github.com/ethereum/go-ethereum/swarm/storage" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								)  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// Default values for the syncer parameters (global, not peer specific).
								const (
									// requestDbBatchSize is the size of a batch before it is written to the request db.
									requestDbBatchSize = 512
									// keyBufferSize is the size of the buffer for unsynced keys.
									keyBufferSize = 1024
									// syncBatchSize is the maximum batch size for outgoing requests.
									syncBatchSize = 128
									// syncBufferSize is the size of the buffer for delivery requests.
									syncBufferSize = 128
									// syncCacheSize is the cache capacity used to store the request queue in memory.
									syncCacheSize = 1024
								)
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// Priority levels, declared in ascending order starting from zero.
								const (
									// Low is priority level 0.
									Low = iota
									// Medium is priority level 1.
									Medium
									// High is priority level 2.
									High
									// priorities (3) is the number of priority levels.
									priorities
								)
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// Request types, numbered consecutively from zero.
								const (
									// DeliverReq is request type 0.
									DeliverReq = iota
									// PushReq is request type 1.
									PushReq
									// PropagateReq is request type 2.
									PropagateReq
									// HistoryReq is request type 3.
									HistoryReq
									// BacklogReq is request type 4.
									BacklogReq
								)
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// syncState is a JSON serialisable record of the synchronisation state
								// between two peers.
								type syncState struct {
									// The embedded DbSyncState contributes the following 4 fields:
									//   Start Key    - lower limit of address space
									//   Stop  Key    - upper limit of address space
									//   First uint64 - counter taken from last sync state
									//   Last  uint64 - counter of remote peer dbStore at the time of last connection
									*storage.DbSyncState

									// SessionAt is set at the time of connection.
									SessionAt uint64
									// LastSeenAt is set at the time of connection.
									LastSeenAt uint64
									// Latest is the dbstore cursor; it is continuously set by the syncer.
									Latest storage.Key
									// Synced is true iff sync is done up to the last disconnect.
									Synced bool
									// synced signals that a sync stage has finished.
									synced chan bool
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// DbAccess wraps the databases so that the syncer gets mockable, custom
								// access to the local chunk store.
								//
								// The field order must not change: NewDbAccess fills the struct with a
								// positional composite literal.
								type DbAccess struct {
									// db is the underlying DbStore.
									db *storage.DbStore
									// loc is the LocalStore the DbAccess was created from.
									loc *storage.LocalStore
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// NewDbAccess builds a DbAccess around the given LocalStore. The DbStore is
								// taken from loc.DbStore via a type assertion, which panics if the value
								// held there is not a *storage.DbStore.
								func NewDbAccess(loc *storage.LocalStore) *DbAccess {
									access := new(DbAccess)
									access.db = loc.DbStore.(*storage.DbStore)
									access.loc = loc
									return access
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// get looks the chunk for key up in the wrapped LocalStore and hands back
								// whatever the store returns; it is used to obtain chunks from a key or a
								// request db entry only.
								func (self *DbAccess) get(key storage.Key) (*storage.Chunk, error) {
									chunk, err := self.loc.Get(key)
									return chunk, err
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// counter reports the current storage counter of the chunk db, as returned
								// by the wrapped DbStore.
								func (self *DbAccess) counter() uint64 {
									current := self.db.Counter()
									return current
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// keyIterator yields storage keys one call at a time. It is implemented by
								// dbStoreSyncIterator.
								type keyIterator interface {
									// Next returns the next key of the iteration.
									Next() storage.Key
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// iterator is the generator for iterating by address range and storage
								// counter: it asks the DbStore for a sync iterator over the DbSyncState
								// embedded in s.
								//
								// It returns nil when NewSyncIterator reports an error; the error itself is
								// discarded, so callers have to check the result for nil.
								func (self *DbAccess) iterator(s *syncState) keyIterator {
									syncIt, err := self.db.NewSyncIterator(*s.DbSyncState)
									if err == nil {
										return keyIterator(syncIt)
									}
									return nil
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// String renders the sync state as a one-line summary. Once Synced is set
								// only SessionAt, LastSeenAt and the latest key are printed; before that the
								// address range (Start-Stop) and the index range (First-Last) of the
								// embedded DbSyncState are printed as well.
								func (self syncState) String() string {
									// not synced yet: include the address and index ranges
									if !self.Synced {
										return fmt.Sprintf(
											"address: %v-%v, index: %v-%v, session started at: %v, last seen at: %v, latest key: %v",
											self.Start.Log(), self.Stop.Log(),
											self.First, self.Last,
											self.SessionAt, self.LastSeenAt,
											self.Latest.Log(),
										)
									}
									return fmt.Sprintf(
										"session started at: %v, last seen at: %v, latest key: %v",
										self.SessionAt, self.LastSeenAt,
										self.Latest.Log(),
									)
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// SyncParams holds the syncer parameters (global, not peer specific).
								type SyncParams struct {
									// RequestDbPath is the path of the request db (leveldb).
									RequestDbPath string
									// RequestDbBatchSize is the number of items collected before a batch is
									// saved to the request db.
									RequestDbBatchSize uint
									// KeyBufferSize is the size of the key buffer.
									KeyBufferSize uint
									// SyncBatchSize is the maximum batch size for outgoing requests.
									SyncBatchSize uint
									// SyncBufferSize is a buffer size; the default constant it is filled from
									// is described as the buffer for delivery requests.
									SyncBufferSize uint
									// SyncCacheSize is the cache capacity used to store the request queue in
									// memory.
									SyncCacheSize uint
									// SyncPriorities lists the priority level per request type; the defaults
									// carry one entry for each of the five request types (0-4).
									SyncPriorities []uint
									// SyncModes lists the sync mode per request type; the defaults carry one
									// entry for each of the five request types (0-4).
									SyncModes []bool
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// NewSyncParams is the constructor with default values: every size is taken
								// from the package level defaults and the request db is placed in the
								// "requests" entry under bzzdir.
								func NewSyncParams(bzzdir string) *SyncParams {
									params := new(SyncParams)

									params.RequestDbPath = filepath.Join(bzzdir, "requests")

									// sizes come straight from the package defaults
									params.RequestDbBatchSize = requestDbBatchSize
									params.KeyBufferSize = keyBufferSize
									params.SyncBufferSize = syncBufferSize
									params.SyncBatchSize = syncBatchSize
									params.SyncCacheSize = syncCacheSize

									// one entry per request type, in the order the request types are declared
									params.SyncPriorities = []uint{High, Medium, Medium, Low, Low}
									params.SyncModes = []bool{true, true, true, true, false}

									return params
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// syncer is the agent that manages content distribution/storage replication/chunk storeRequest forwarding  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								type  syncer  struct  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									* SyncParams                      // sync parameters 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									syncF            func ( )  bool      // if syncing is needed 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									key              storage . Key      // remote peers address key 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									state            * syncState       // sync state for our dbStore 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									syncStates       chan  * syncState  // different stages of sync 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									deliveryRequest  chan  bool        // one of two triggers needed to send unsyncedKeys 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									newUnsyncedKeys  chan  bool        // one of two triggers needed to send unsynced keys 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									quit             chan  bool        // signal to quit loops 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// DB related fields 
							 
						 
					
						
							
								
									
										
										
										
											2017-08-08 20:34:35 +03:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									dbAccess  * DbAccess  // access to dbStore 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// native fields 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									queues      [ priorities ] * syncDb                    // in-memory cache / queues for sync reqs 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									keys        [ priorities ] chan  interface { }           // buffer for unsynced keys 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									deliveries  [ priorities ] chan  * storeRequestMsgData  // delivery 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// bzz protocol instance outgoing message callbacks (mockable for testing) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									unsyncedKeys  func ( [ ] * syncRequest ,  * syncState )  error  // send unsyncedKeysMsg 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									store         func ( * storeRequestMsgData )  error        // send storeRequestMsg 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// a syncer instance is linked to each peer connection  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// constructor is called from protocol after successful handshake  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// the returned instance is attached to the peer and can be called  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// by the forwarder  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								func  newSyncer (  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									db  * storage . LDBDatabase ,  remotekey  storage . Key , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									dbAccess  * DbAccess , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									unsyncedKeys  func ( [ ] * syncRequest ,  * syncState )  error , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									store  func ( * storeRequestMsgData )  error , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									params  * SyncParams , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									state  * syncState , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									syncF  func ( )  bool , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								)  ( * syncer ,  error )  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									syncBufferSize  :=  params . SyncBufferSize 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									keyBufferSize  :=  params . KeyBufferSize 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									dbBatchSize  :=  params . RequestDbBatchSize 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									self  :=  & syncer { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										syncF :            syncF , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										key :              remotekey , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										dbAccess :         dbAccess , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										syncStates :       make ( chan  * syncState ,  20 ) , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										deliveryRequest :  make ( chan  bool ,  1 ) , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										newUnsyncedKeys :  make ( chan  bool ,  1 ) , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										SyncParams :       params , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										state :            state , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										quit :             make ( chan  bool ) , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										unsyncedKeys :     unsyncedKeys , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										store :            store , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// initialising 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									for  i  :=  0 ;  i  <  priorities ;  i ++  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										self . keys [ i ]  =  make ( chan  interface { } ,  keyBufferSize ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										self . deliveries [ i ]  =  make ( chan  * storeRequestMsgData ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// initialise a syncdb instance for each priority queue 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										self . queues [ i ]  =  newSyncDb ( db ,  remotekey ,  uint ( i ) ,  syncBufferSize ,  dbBatchSize ,  self . deliver ( uint ( i ) ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									log . Info ( fmt . Sprintf ( "syncer started: %v" ,  state ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
									// launch chunk delivery service 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									go  self . syncDeliveries ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// launch sync task manager 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									if  self . syncF ( )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										go  self . sync ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// process unsynced keys to broadcast 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									go  self . syncUnsyncedKeys ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									return  self ,  nil 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// metadata serialisation

								// encodeSync serialises the given sync state as indented JSON (one space per
								// level) and returns a pointer to the resulting raw message. If marshalling
								// fails, the error is returned unchanged together with a nil message.
								func encodeSync(state *syncState) (*json.RawMessage, error) {
									serialised, marshalErr := json.MarshalIndent(state, "", " ")
									if marshalErr != nil {
										return nil, marshalErr
									}
									// wrap the bytes so callers can embed them verbatim in other JSON documents
									raw := new(json.RawMessage)
									*raw = json.RawMessage(serialised)
									return raw, nil
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// decodeSync deserialises a sync state from its JSON representation.
								//
								// It returns an error (and a nil state) when meta is nil, when meta holds no
								// bytes, or when the payload cannot be unmarshalled. On success the returned
								// state always has a non-nil DbSyncState, since the target is pre-populated
								// with an empty storage.DbSyncState before unmarshalling.
								func decodeSync(meta *json.RawMessage) (*syncState, error) {
									if meta == nil {
										return nil, fmt.Errorf("unable to deserialise sync state from <nil>")
									}
									data := []byte(*meta)
									if len(data) == 0 {
										// meta itself is non-nil here, so report the empty payload distinctly
										return nil, fmt.Errorf("unable to deserialise sync state from empty data")
									}
									state := &syncState{DbSyncState: &storage.DbSyncState{}}
									if err := json.Unmarshal(data, state); err != nil {
										// do not hand back a partially populated state alongside an error
										return nil, err
									}
									return state, nil
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								/ *  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								 sync  implements  the  syncing  script 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								 *  first  all  items  left  in  the  request  Db  are  replayed 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								   *  type  =  StaleSync 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								   *  Mode :  by  default  once  again  via  confirmation  roundtrip 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								   *  Priority :  the  items  are  replayed  as  the  proirity  specified  for  StaleSync 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								   *  but  within  the  order  respects  earlier  priority  level  of  request 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								 *  after  all  items  are  consumed  for  a  priority  level ,  the  the  respective 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  queue  for  delivery  requests  is  open  ( this  way  new  reqs  not  written  to  db ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( TODO :  this  should  be  checked ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								 *  the  sync  state  provided  by  the  remote  peer  is  used  to  sync  history 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								   *  all  the  backlog  from  earlier  ( aborted )  syncing  is  completed  starting  from  latest 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								   *  if  Last   <  LastSeenAt  then  all  items  in  between  then  process  all 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								     backlog  from  upto  last  disconnect 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								   *  if  Last  >  0  && 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								 sync  is  called  from  the  syncer  constructor  and  is  not  supposed  to  be  used  externally 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								* /  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								func  ( self  * syncer )  sync ( )  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									state  :=  self . state 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// sync finished 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									defer  close ( self . syncStates ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// 0. first replay stale requests from request db 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									if  state . SessionAt  ==  0  { 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										log . Debug ( fmt . Sprintf ( "syncer[%v]: nothing to sync" ,  self . key . Log ( ) ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
										return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									log . Debug ( fmt . Sprintf ( "syncer[%v]: start replaying stale requests from request db" ,  self . key . Log ( ) ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
									for  p  :=  priorities  -  1 ;  p  >=  0 ;  p --  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										self . queues [ p ] . dbRead ( false ,  0 ,  self . replay ( ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									log . Debug ( fmt . Sprintf ( "syncer[%v]: done replaying stale requests from request db" ,  self . key . Log ( ) ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// unless peer is synced sync unfinished history beginning on 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									if  ! state . Synced  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										start  :=  state . Start 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  ! storage . IsZeroKey ( state . Latest )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// 1. there is unfinished earlier sync 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											state . Start  =  state . Latest 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
											log . Debug ( fmt . Sprintf ( "syncer[%v]: start syncronising backlog (unfinished sync: %v)" ,  self . key . Log ( ) ,  state ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
											// blocks while the entire history upto state is synced 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											self . syncState ( state ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											if  state . Last  <  state . SessionAt  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												state . First  =  state . Last  +  1 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										state . Latest  =  storage . ZeroKey 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										state . Start  =  start 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// 2. sync up to last disconnect1 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  state . First  <  state . LastSeenAt  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											state . Last  =  state . LastSeenAt 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
											log . Debug ( fmt . Sprintf ( "syncer[%v]: start syncronising history upto last disconnect at %v: %v" ,  self . key . Log ( ) ,  state . LastSeenAt ,  state ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
											self . syncState ( state ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											state . First  =  state . LastSeenAt 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										state . Latest  =  storage . ZeroKey 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									}  else  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// synchronisation starts at end of last session 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										state . First  =  state . LastSeenAt 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// 3. sync up to current session start 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// if there have been new chunks since last session 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									if  state . LastSeenAt  <  state . SessionAt  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										state . Last  =  state . SessionAt 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										log . Debug ( fmt . Sprintf ( "syncer[%v]: start syncronising history since last disconnect at %v up until session start at %v: %v" ,  self . key . Log ( ) ,  state . LastSeenAt ,  state . SessionAt ,  state ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
										// blocks until state syncing is finished 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										self . syncState ( state ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									log . Info ( fmt . Sprintf ( "syncer[%v]: syncing all history complete" ,  self . key . Log ( ) ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// syncState submits the given state on the syncStates channel and then blocks
								// until either the state's synced channel yields (or is closed) or the syncer's
								// quit channel is closed.
								func (self *syncer) syncState(state *syncState) {
									// hand the state over for processing
									self.syncStates <- state

									// park until the state reports completion or the syncer is shut down
									select {
									case <-self.quit:
									case <-state.synced:
									}
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// stop quits both request processor and saves the request cache to disk  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								func  ( self  * syncer )  stop ( )  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									close ( self . quit ) 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									log . Trace ( fmt . Sprintf ( "syncer[%v]: stop and save sync request db backlog" ,  self . key . Log ( ) ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
									for  _ ,  db  :=  range  self . queues  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										db . stop ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// syncRequest is the rlp serialisable form of a sync request.
								// The field order is part of the serialised layout and must not be changed.
								type syncRequest struct {
									// Key is the storage key the request refers to.
									Key storage.Key
									// Priority is the priority level assigned to the request.
									Priority uint
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// String returns a printable form of the request, made of the Log()
								// rendering of its key and its priority.
								func (self *syncRequest) String() string {
									keyRepr := self.Key.Log()
									return fmt.Sprintf("<Key: %v, Priority: %v>", keyRepr, self.Priority)
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// newSyncRequest builds a syncRequest for req at priority p.
								// The key is taken from the first value returned by parseRequest; the other
								// parsed values are discarded. A parse error is returned unchanged together
								// with a nil request. p is converted to uint without range checking.
								func (self *syncer) newSyncRequest(req interface{}, p int) (*syncRequest, error) {
									addr, _, _, _, parseErr := parseRequest(req)
									// TODO: if req has chunk, it should be put in a cache
									if parseErr == nil {
										return &syncRequest{Key: addr, Priority: uint(p)}, nil
									}
									return nil, parseErr
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// serves historical items from the DB  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// * read is on demand, blocking unless history channel is read  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// * accepts sync requests (syncStates) to create new db iterator  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// * closes the channel once the iteration finishes
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								func  ( self  * syncer )  syncHistory ( state  * syncState )  chan  interface { }  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									var  n  uint 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									history  :=  make ( chan  interface { } ) 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									log . Debug ( fmt . Sprintf ( "syncer[%v]: syncing history between %v - %v for chunk addresses %v - %v" ,  self . key . Log ( ) ,  state . First ,  state . Last ,  state . Start ,  state . Stop ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
									it  :=  self . dbAccess . iterator ( state ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									if  it  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										go  func ( )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// signal end of the iteration ended 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											defer  close ( history ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										IT : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											for  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												key  :=  it . Next ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												if  key  ==  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													break  IT 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												select  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												// blocking until history channel is read from 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												case  history  <-  storage . Key ( key ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													n ++ 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
													log . Trace ( fmt . Sprintf ( "syncer[%v]: history: %v (%v keys)" ,  self . key . Log ( ) ,  key . Log ( ) ,  n ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
													state . Latest  =  key 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												case  <- self . quit : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
											log . Debug ( fmt . Sprintf ( "syncer[%v]: finished syncing history between %v - %v for chunk addresses %v - %v (at %v) (chunks = %v)" ,  self . key . Log ( ) ,  state . First ,  state . Last ,  state . Start ,  state . Stop ,  state . Latest ,  n ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
										} ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									return  history 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// triggers key synchronisation
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								func (self *syncer) sendUnsyncedKeys() {
									// Best-effort, non-blocking signal on the deliveryRequest channel:
									// the value true is sent only if the send can proceed immediately.
									// Otherwise the signal is dropped and the caller never blocks.
									// The select in syncUnsyncedKeys receives from this channel and
									// logs "peer ready to receive" when it does.
									request := self.deliveryRequest
									select {
									default:
										// the send cannot proceed right now; nothing to do
									case request <- true:
										// signal handed over
									}
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// assembles a new batch of unsynced keys  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// * keys are drawn from the key buffers in order of priority queue  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// * if the queues of priority for History (HistoryReq) or higher are depleted,  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//   historical data is used so historical items are lower priority within  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//   their priority group.  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// * Order of historical data is unspecified  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								func  ( self  * syncer )  syncUnsyncedKeys ( )  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// send out new 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									var  unsynced  [ ] * syncRequest 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									var  more ,  justSynced  bool 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									var  keyCount ,  historyCnt  int 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									var  history  chan  interface { } 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									priority  :=  High 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									keys  :=  self . keys [ priority ] 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									var  newUnsyncedKeys ,  deliveryRequest  chan  bool 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									keyCounts  :=  make ( [ ] int ,  priorities ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									histPrior  :=  self . SyncPriorities [ HistoryReq ] 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									syncStates  :=  self . syncStates 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									state  :=  self . state 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								LOOP :  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									for  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										var  req  interface { } 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// select the highest priority channel to read from 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// keys channels are buffered so the highest priority ones 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// are checked first - integrity can only be guaranteed if writing 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// is locked while selecting 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  priority  !=  High  ||  len ( keys )  ==  0  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// selection is not needed if the High priority queue has items 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											keys  =  nil 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										PRIORITIES : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											for  priority  =  High ;  priority  >=  0 ;  priority --  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												// the first priority channel that is non-empty will be assigned to keys 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												if  len ( self . keys [ priority ] )  >  0  { 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
													log . Trace ( fmt . Sprintf ( "syncer[%v]: reading request with	priority %v" ,  self . key . Log ( ) ,  priority ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
													keys  =  self . keys [ priority ] 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													break  PRIORITIES 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												} 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
												log . Trace ( fmt . Sprintf ( "syncer[%v/%v]: queue: [%v, %v, %v]" ,  self . key . Log ( ) ,  priority ,  len ( self . keys [ High ] ) ,  len ( self . keys [ Medium ] ) ,  len ( self . keys [ Low ] ) ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
												// if the input queue is empty on this level, resort to history if there is any 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												if  uint ( priority )  ==  histPrior  &&  history  !=  nil  { 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
													log . Trace ( fmt . Sprintf ( "syncer[%v]: reading history for %v" ,  self . key . Log ( ) ,  self . key ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
													keys  =  history 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													break  PRIORITIES 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// if peer ready to receive but nothing to send 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  keys  ==  nil  &&  deliveryRequest  ==  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// if no items left and switch to waiting mode 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
											log . Trace ( fmt . Sprintf ( "syncer[%v]: buffers consumed. Waiting" ,  self . key . Log ( ) ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
											newUnsyncedKeys  =  self . newUnsyncedKeys 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// send msg iff 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// * peer is ready to receive keys AND ( 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// * all queues and history are depleted OR 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// * batch full OR 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// * all history have been consumed, synced) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  deliveryRequest  ==  nil  && 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											( justSynced  || 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												len ( unsynced )  >  0  &&  keys  ==  nil  || 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												len ( unsynced )  ==  int ( self . SyncBatchSize ) )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											justSynced  =  false 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// listen to requests 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											deliveryRequest  =  self . deliveryRequest 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											newUnsyncedKeys  =  nil  // not care about data until next req comes in 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// set sync to current counter 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// (all nonhistorical outgoing traffic sheduled and persisted 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											state . LastSeenAt  =  self . dbAccess . counter ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											state . Latest  =  storage . ZeroKey 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
											log . Trace ( fmt . Sprintf ( "syncer[%v]: sending %v" ,  self . key . Log ( ) ,  unsynced ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
											//  send the unsynced keys 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											stateCopy  :=  * state 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											err  :=  self . unsyncedKeys ( unsynced ,  & stateCopy ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											if  err  !=  nil  { 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:56:09 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
												log . Warn ( fmt . Sprintf ( "syncer[%v]: unable to send unsynced keys: %v" ,  self . key . Log ( ) ,  err ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											self . state  =  state 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
											log . Debug ( fmt . Sprintf ( "syncer[%v]: --> %v keys sent: (total: %v (%v), history: %v), sent sync state: %v" ,  self . key . Log ( ) ,  len ( unsynced ) ,  keyCounts ,  keyCount ,  historyCnt ,  stateCopy ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
											unsynced  =  nil 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											keys  =  nil 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// process item and add it to the batch 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										select  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										case  <- self . quit : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											break  LOOP 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										case  req ,  more  =  <- keys : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											if  keys  ==  history  &&  ! more  { 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
												log . Trace ( fmt . Sprintf ( "syncer[%v]: syncing history segment complete" ,  self . key . Log ( ) ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
												// history channel is closed, waiting for new state (called from sync()) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												syncStates  =  self . syncStates 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												state . Synced  =  true  // this signals that the  current segment is complete 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												select  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												case  state . synced  <-  false : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												case  <- self . quit : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													break  LOOP 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												justSynced  =  true 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												history  =  nil 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										case  <- deliveryRequest : 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
											log . Trace ( fmt . Sprintf ( "syncer[%v]: peer ready to receive" ,  self . key . Log ( ) ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// this 1 cap channel can wake up the loop 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// signaling that peer is ready to receive unsynced Keys 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// the channel is set to nil any further writes will be ignored 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											deliveryRequest  =  nil 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										case  <- newUnsyncedKeys : 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
											log . Trace ( fmt . Sprintf ( "syncer[%v]: new unsynced keys available" ,  self . key . Log ( ) ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
											// this 1 cap channel can wake up the loop 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// signals that data is available to send if peer is ready to receive 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											newUnsyncedKeys  =  nil 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											keys  =  self . keys [ High ] 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										case  state ,  more  =  <- syncStates : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// this resets the state 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											if  ! more  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												state  =  self . state 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
												log . Trace ( fmt . Sprintf ( "syncer[%v]: (priority %v) syncing complete upto %v)" ,  self . key . Log ( ) ,  priority ,  state ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
												state . Synced  =  true 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												syncStates  =  nil 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											}  else  { 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
												log . Trace ( fmt . Sprintf ( "syncer[%v]: (priority %v) syncing history upto %v priority %v)" ,  self . key . Log ( ) ,  priority ,  state ,  histPrior ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
												state . Synced  =  false 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												history  =  self . syncHistory ( state ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												// only one history at a time, only allow another one once the 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												// history channel is closed 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												syncStates  =  nil 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  req  ==  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											continue  LOOP 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										log . Trace ( fmt . Sprintf ( "syncer[%v]: (priority %v) added to unsynced keys: %v" ,  self . key . Log ( ) ,  priority ,  req ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
										keyCounts [ priority ] ++ 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										keyCount ++ 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  keys  ==  history  { 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
											log . Trace ( fmt . Sprintf ( "syncer[%v]: (priority %v) history item %v (synced = %v)" ,  self . key . Log ( ) ,  priority ,  req ,  state . Synced ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
											historyCnt ++ 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  sreq ,  err  :=  self . newSyncRequest ( req ,  priority ) ;  err  ==  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// extract key from req 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
											log . Trace ( fmt . Sprintf ( "syncer[%v]: (priority %v): request %v (synced = %v)" ,  self . key . Log ( ) ,  priority ,  req ,  state . Synced ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
											unsynced  =  append ( unsynced ,  sreq ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										}  else  { 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:56:09 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
											log . Warn ( fmt . Sprintf ( "syncer[%v]: (priority %v): error creating request for %v: %v)" ,  self . key . Log ( ) ,  priority ,  req ,  err ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// delivery loop  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// takes into account priority, send store Requests with chunk (delivery)  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// idle blocking if no new deliveries in any of the queues  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								func  ( self  * syncer )  syncDeliveries ( )  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									var  req  * storeRequestMsgData 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									p  :=  High 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									var  deliveries  chan  * storeRequestMsgData 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									var  msg  * storeRequestMsgData 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									var  err  error 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									var  c  =  [ priorities ] int { } 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									var  n  =  [ priorities ] int { } 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									var  total ,  success  uint 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									for  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										deliveries  =  self . deliveries [ p ] 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										select  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										case  req  =  <- deliveries : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											n [ p ] ++ 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											c [ p ] ++ 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										default : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											if  p  ==  Low  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												// blocking, depletion on all channels, no preference for priority 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												select  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												case  req  =  <- self . deliveries [ High ] : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													n [ High ] ++ 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												case  req  =  <- self . deliveries [ Medium ] : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													n [ Medium ] ++ 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												case  req  =  <- self . deliveries [ Low ] : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													n [ Low ] ++ 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												case  <- self . quit : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												p  =  High 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											}  else  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												p -- 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												continue 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										total ++ 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										msg ,  err  =  self . newStoreRequestMsgData ( req ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  err  !=  nil  { 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
											log . Warn ( fmt . Sprintf ( "syncer[%v]: failed to create store request for %v: %v" ,  self . key . Log ( ) ,  req ,  err ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
										}  else  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											err  =  self . store ( msg ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											if  err  !=  nil  { 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
												log . Warn ( fmt . Sprintf ( "syncer[%v]: failed to deliver %v: %v" ,  self . key . Log ( ) ,  req ,  err ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
											}  else  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												success ++ 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
												log . Trace ( fmt . Sprintf ( "syncer[%v]: %v successfully delivered" ,  self . key . Log ( ) ,  req ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  total % self . SyncBatchSize  ==  0  { 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
											log . Debug ( fmt . Sprintf ( "syncer[%v]: deliver Total: %v, Success: %v, High: %v/%v, Medium: %v/%v, Low %v/%v" ,  self . key . Log ( ) ,  total ,  success ,  c [ High ] ,  n [ High ] ,  c [ Medium ] ,  n [ Medium ] ,  c [ Low ] ,  n [ Low ] ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								/ *  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								 addRequest  handles  requests  for  delivery 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								 it  accepts  4  types : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								 *  storeRequestMsgData :  coming  from  netstore  propagate  response 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								 *  chunk :  coming  from  forwarding  ( questionable :  id ? ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								 *  key :  from  incoming  syncRequest 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								 *  syncDbEntry :  key , id  encoded  in  db 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								 If  sync  mode  is  on  for  the  type  of  request ,  then 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								 it  sends  the  request  to  the  keys  queue  of  the  correct  priority 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								 channel  buffered  with  capacity  ( SyncBufferSize ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								 If  sync  mode  is  off  then ,  requests  are  directly  sent  to  deliveries 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								* /  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								func  ( self  * syncer )  addRequest ( req  interface { } ,  ty  int )  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// retrieve priority for request type name int8 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									priority  :=  self . SyncPriorities [ ty ] 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// sync mode for this type ON 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									if  self . syncF ( )  ||  ty  ==  DeliverReq  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  self . SyncModes [ ty ]  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											self . addKey ( req ,  priority ,  self . quit ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										}  else  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											self . addDelivery ( req ,  priority ,  self . quit ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// addKey queues a sync request for sync confirmation with the given
								// priority, i.e. the key will go out in an unsyncedKeys message.
								//
								// It blocks until the request has been accepted by the keys channel of
								// that priority (returns true) or a receive on quit succeeds (returns false).
								func (self *syncer) addKey(req interface{}, priority uint, quit chan bool) bool {
									select {
									case self.keys[priority] <- req:
										// queued, fall through to the wake-up below
									case <-quit:
										return false
									}

									// wake up the unsynced keys loop if it is idle; the send is
									// non-blocking, so it is skipped when nobody can take it right now
									select {
									case self.newUnsyncedKeys <- true:
									default:
									}
									return true
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// addDelivery queues a delivery request with the given priority,
								// i.e. the chunk will be delivered ASAP, modulo the priority queueing
								// handled by syncdb. Requests are persisted across sessions for correct sync.
								//
								// It blocks until the request has been accepted by the buffer of the
								// queue for that priority (returns true) or a receive on quit succeeds
								// (returns false).
								func (self *syncer) addDelivery(req interface{}, priority uint, quit chan bool) bool {
									buffer := self.queues[priority].buffer
									select {
									case buffer <- req:
										return true
									case <-quit:
										return false
									}
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// doDelivery delivers the chunk for the request with given priority  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// without queuing  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								func  ( self  * syncer )  doDelivery ( req  interface { } ,  priority  uint ,  quit  chan  bool )  bool  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									msgdata ,  err  :=  self . newStoreRequestMsgData ( req ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									if  err  !=  nil  { 
							 
						 
					
						
							
								
									
										
										
										
											2017-02-22 14:10:07 +02:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										log . Warn ( fmt . Sprintf ( "unable to deliver request %v: %v" ,  msgdata ,  err ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2016-08-29 21:18:00 +02:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
										return  false 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									select  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									case  self . deliveries [ priority ]  <-  msgdata : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										return  true 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									case  <- quit : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										return  false 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// deliver returns the delivery function for the given priority;
								// the returned function is passed on to syncDb.
								// It simply forwards to doDelivery with the priority fixed.
								func (self *syncer) deliver(priority uint) func(req interface{}, quit chan bool) bool {
									deliverAtPriority := func(r interface{}, q chan bool) bool {
										return self.doDelivery(r, priority, q)
									}
									return deliverAtPriority
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// replay returns the replay function passed on to syncDb.
								// Depending on the sync mode setting for BacklogReq, replay of the
								// request db backlog either sends items via confirmation (addKey)
								// or delivers them directly (doDelivery). Mode and priority are read
								// once, when replay is called, not on every invocation of the result.
								func (self *syncer) replay() func(req interface{}, quit chan bool) bool {
									confirm := self.SyncModes[BacklogReq]
									prio := self.SyncPriorities[BacklogReq]

									// sync mode for this type OFF: deliver directly
									if !confirm {
										return func(req interface{}, quit chan bool) bool {
											return self.doDelivery(req, prio, quit)
										}
									}
									// sync mode for this type ON: go through key confirmation
									return func(req interface{}, quit chan bool) bool {
										return self.addKey(req, prio, quit)
									}
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// newStoreRequestMsgData extends a request to a full storeRequestMsgData.
								// It is polymorphic: see addRequest for the types accepted.
								//
								// If parsing the request already yields a store request, that one is
								// returned unchanged. Otherwise the message is assembled from the parsed
								// id and the chunk; when the request carried no chunk, the chunk is
								// fetched via self.dbAccess.get using the parsed key.
								// Any parse or lookup error is returned together with a nil message.
								func (self *syncer) newStoreRequestMsgData(req interface{}) (*storeRequestMsgData, error) {
									key, id, chunk, sreq, err := parseRequest(req)
									if err != nil {
										return nil, err
									}

									// the request already is a complete store request
									if sreq != nil {
										return sreq, nil
									}

									// no chunk came with the request: look it up by key
									if chunk == nil {
										if chunk, err = self.dbAccess.get(key); err != nil {
											return nil, err
										}
									}

									return &storeRequestMsgData{
										Id:    id,
										Key:   chunk.Key,
										SData: chunk.SData,
									}, nil
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// parseRequest inspects the dynamic type of req and extracts whichever of
								// key, id, chunk and store request it carries:
								//
								//   - storage.Key: the key itself, with an id obtained from generateId()
								//   - *syncDbEntry: key taken from val[:32], id decoded as the big-endian
								//     uint64 stored in val[32:40]
								//   - *storage.Chunk: the chunk and its Key, with an id from generateId()
								//   - *storeRequestMsgData: the request and its Key (id is left at zero)
								//
								// Values that are not extracted are returned as their zero value.
								// An error is returned (instead of panicking) when req has any other type,
								// when it is a nil pointer of one of the accepted pointer types, or when a
								// syncDbEntry value is too short to contain both key and id.
								//
								// It does not do chunk lookup!
								func parseRequest(req interface{}) (storage.Key, uint64, *storage.Chunk, *storeRequestMsgData, error) {
									// Layout of syncDbEntry.val as consumed below: keyLen bytes of key
									// immediately followed by idLen bytes of big-endian id.
									const (
										keyLen = 32
										idLen  = 8
									)

									var (
										key   storage.Key
										id    uint64
										chunk *storage.Chunk
										sreq  *storeRequestMsgData
										err   error
									)

									switch r := req.(type) {
									case storage.Key:
										key, id = r, generateId()

									case *syncDbEntry:
										switch {
										case r == nil:
											err = fmt.Errorf("nil request not allowed (%T)", req)
										case len(r.val) < keyLen+idLen:
											// Slicing or decoding a short value would panic; report it instead.
											err = fmt.Errorf("invalid sync db entry: value is %d bytes, need at least %d", len(r.val), keyLen+idLen)
										default:
											id = binary.BigEndian.Uint64(r.val[keyLen:])
											key = storage.Key(r.val[:keyLen])
										}

									case *storage.Chunk:
										if r == nil {
											err = fmt.Errorf("nil request not allowed (%T)", req)
										} else {
											chunk, key, id = r, r.Key, generateId()
										}

									case *storeRequestMsgData:
										if r == nil {
											err = fmt.Errorf("nil request not allowed (%T)", req)
										} else {
											// The id is deliberately not extracted here; callers get the
											// full request back and id stays zero, as before.
											sreq, key = r, r.Key
										}

									default:
										err = fmt.Errorf("type not allowed: %v (%T)", req, req)
									}

									return key, id, chunk, sreq, err
								}