| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | // Copyright 2016 The go-ethereum Authors | 
					
						
							|  |  |  | // This file is part of the go-ethereum library. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is free software: you can redistribute it and/or modify | 
					
						
							|  |  |  | // it under the terms of the GNU Lesser General Public License as published by | 
					
						
							|  |  |  | // the Free Software Foundation, either version 3 of the License, or | 
					
						
							|  |  |  | // (at your option) any later version. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is distributed in the hope that it will be useful, | 
					
						
							|  |  |  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | 
					
						
							|  |  |  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | 
					
						
							|  |  |  | // GNU Lesser General Public License for more details. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // You should have received a copy of the GNU Lesser General Public License | 
					
						
							|  |  |  | // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package network | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | /* | 
					
						
							|  |  |  | bzz implements the swarm wire protocol [bzz] (sister of eth and shh) | 
					
						
							|  |  |  | the protocol instance is launched on each peer by the network layer if the | 
					
						
							|  |  |  | bzz protocol handler is registered on the p2p server. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | The bzz protocol component speaks the bzz protocol | 
					
						
							|  |  |  | * handle the protocol handshake | 
					
						
							|  |  |  | * register peers in the KΛÐΞMLIΛ table via the hive logistic manager | 
					
						
							|  |  |  | * dispatch to hive for handling the DHT logic | 
					
						
							|  |  |  | * encode and decode requests for storage and retrieval | 
					
						
							|  |  |  | * handle sync protocol messages via the syncer | 
					
						
							|  |  |  | * talks the SWAP payment protocol (swap accounting is done within NetStore) | 
					
						
							|  |  |  | */ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"fmt" | 
					
						
							|  |  |  | 	"net" | 
					
						
							|  |  |  | 	"strconv" | 
					
						
							|  |  |  | 	"time" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/contracts/chequebook" | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/errs" | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/logger" | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/logger/glog" | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/p2p" | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/p2p/discover" | 
					
						
							|  |  |  | 	bzzswap "github.com/ethereum/go-ethereum/swarm/services/swap" | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/swarm/services/swap/swap" | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/swarm/storage" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | const ( | 
					
						
							|  |  |  | 	Version            = 0 | 
					
						
							|  |  |  | 	ProtocolLength     = uint64(8) | 
					
						
							|  |  |  | 	ProtocolMaxMsgSize = 10 * 1024 * 1024 | 
					
						
							|  |  |  | 	NetworkId          = 322 | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | const ( | 
					
						
							|  |  |  | 	ErrMsgTooLarge = iota | 
					
						
							|  |  |  | 	ErrDecode | 
					
						
							|  |  |  | 	ErrInvalidMsgCode | 
					
						
							|  |  |  | 	ErrVersionMismatch | 
					
						
							|  |  |  | 	ErrNetworkIdMismatch | 
					
						
							|  |  |  | 	ErrNoStatusMsg | 
					
						
							|  |  |  | 	ErrExtraStatusMsg | 
					
						
							|  |  |  | 	ErrSwap | 
					
						
							|  |  |  | 	ErrSync | 
					
						
							|  |  |  | 	ErrUnwanted | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | var errorToString = map[int]string{ | 
					
						
							|  |  |  | 	ErrMsgTooLarge:       "Message too long", | 
					
						
							|  |  |  | 	ErrDecode:            "Invalid message", | 
					
						
							|  |  |  | 	ErrInvalidMsgCode:    "Invalid message code", | 
					
						
							|  |  |  | 	ErrVersionMismatch:   "Protocol version mismatch", | 
					
						
							|  |  |  | 	ErrNetworkIdMismatch: "NetworkId mismatch", | 
					
						
							|  |  |  | 	ErrNoStatusMsg:       "No status message", | 
					
						
							|  |  |  | 	ErrExtraStatusMsg:    "Extra status message", | 
					
						
							|  |  |  | 	ErrSwap:              "SWAP error", | 
					
						
							|  |  |  | 	ErrSync:              "Sync error", | 
					
						
							|  |  |  | 	ErrUnwanted:          "Unwanted peer", | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // bzz represents the swarm wire protocol | 
					
						
							|  |  |  | // an instance is running on each peer | 
					
						
							|  |  |  | type bzz struct { | 
					
						
							|  |  |  | 	selfID     discover.NodeID      // peer's node id used in peer advertising in handshake | 
					
						
							|  |  |  | 	key        storage.Key          // baseaddress as storage.Key | 
					
						
							|  |  |  | 	storage    StorageHandler       // handler storage/retrieval related requests coming via the bzz wire protocol | 
					
						
							|  |  |  | 	hive       *Hive                // the logistic manager, peerPool, routing service and peer handler | 
					
						
							|  |  |  | 	dbAccess   *DbAccess            // access to db storage counter and iterator for syncing | 
					
						
							|  |  |  | 	requestDb  *storage.LDBDatabase // db to persist backlog of deliveries to aid syncing | 
					
						
							|  |  |  | 	remoteAddr *peerAddr            // remote peers address | 
					
						
							|  |  |  | 	peer       *p2p.Peer            // the p2p peer object | 
					
						
							|  |  |  | 	rw         p2p.MsgReadWriter    // messageReadWriter to send messages to | 
					
						
							|  |  |  | 	errors     *errs.Errors         // errors table | 
					
						
							|  |  |  | 	backend    chequebook.Backend | 
					
						
							|  |  |  | 	lastActive time.Time | 
					
						
							| 
									
										
										
										
											2016-11-28 13:29:33 +01:00
										 |  |  | 	NetworkId  uint64 | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	swap        *swap.Swap          // swap instance for the peer connection | 
					
						
							|  |  |  | 	swapParams  *bzzswap.SwapParams // swap settings both local and remote | 
					
						
							|  |  |  | 	swapEnabled bool                // flag to enable SWAP (will be set via Caps in handshake) | 
					
						
							|  |  |  | 	syncEnabled bool                // flag to enable SYNC (will be set via Caps in handshake) | 
					
						
							|  |  |  | 	syncer      *syncer             // syncer instance for the peer connection | 
					
						
							|  |  |  | 	syncParams  *SyncParams         // syncer params | 
					
						
							|  |  |  | 	syncState   *syncState          // outgoing syncronisation state (contains reference to remote peers db counter) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // interface type for handler of storage/retrieval related requests coming | 
					
						
							|  |  |  | // via the bzz wire protocol | 
					
						
							|  |  |  | // messages: UnsyncedKeys, DeliveryRequest, StoreRequest, RetrieveRequest | 
					
						
							|  |  |  | type StorageHandler interface { | 
					
						
							|  |  |  | 	HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error | 
					
						
							|  |  |  | 	HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error | 
					
						
							|  |  |  | 	HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) | 
					
						
							|  |  |  | 	HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | /* | 
					
						
							|  |  |  | main entrypoint, wrappers starting a server that will run the bzz protocol | 
					
						
							|  |  |  | use this constructor to attach the protocol ("class") to server caps | 
					
						
							|  |  |  | This is done by node.Node#Register(func(node.ServiceContext) (Service, error)) | 
					
						
							|  |  |  | Service implements Protocols() which is an array of protocol constructors | 
					
						
							|  |  |  | at node startup the protocols are initialised | 
					
						
							|  |  |  | the Dev p2p layer then calls Run(p *p2p.Peer, rw p2p.MsgReadWriter) error | 
					
						
							|  |  |  | on each peer connection | 
					
						
							|  |  |  | The Run function of the Bzz protocol class creates a bzz instance | 
					
						
							|  |  |  | which will represent the peer for the swarm hive and all peer-aware components | 
					
						
							|  |  |  | */ | 
					
						
							| 
									
										
										
										
											2016-11-28 13:29:33 +01:00
										 |  |  | func Bzz(cloud StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams, networkId uint64) (p2p.Protocol, error) { | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// a single global request db is created for all peer connections | 
					
						
							|  |  |  | 	// this is to persist delivery backlog and aid syncronisation | 
					
						
							|  |  |  | 	requestDb, err := storage.NewLDBDatabase(sy.RequestDbPath) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return p2p.Protocol{}, fmt.Errorf("error setting up request db: %v", err) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2016-11-28 13:29:33 +01:00
										 |  |  | 	if networkId == 0 { | 
					
						
							|  |  |  | 		networkId = NetworkId | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 	return p2p.Protocol{ | 
					
						
							|  |  |  | 		Name:    "bzz", | 
					
						
							|  |  |  | 		Version: Version, | 
					
						
							|  |  |  | 		Length:  ProtocolLength, | 
					
						
							|  |  |  | 		Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { | 
					
						
							| 
									
										
										
										
											2016-11-28 13:29:33 +01:00
										 |  |  | 			return run(requestDb, cloud, backend, hive, dbaccess, sp, sy, networkId, p, rw) | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 		}, | 
					
						
							|  |  |  | 	}, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | /* | 
					
						
							|  |  |  | the main protocol loop that | 
					
						
							|  |  |  |  * does the handshake by exchanging statusMsg | 
					
						
							|  |  |  |  * if peer is valid and accepted, registers with the hive | 
					
						
							|  |  |  |  * then enters into a forever loop handling incoming messages | 
					
						
							|  |  |  |  * storage and retrieval related queries coming via bzz are dispatched to StorageHandler | 
					
						
							|  |  |  |  * peer-related messages are dispatched to the hive | 
					
						
							|  |  |  |  * payment related messages are relayed to SWAP service | 
					
						
							|  |  |  |  * on disconnect, unregister the peer in the hive (note RemovePeer in the post-disconnect hook) | 
					
						
							|  |  |  |  * whenever the loop terminates, the peer will disconnect with Subprotocol error | 
					
						
							|  |  |  |  * whenever handlers return an error the loop terminates | 
					
						
							|  |  |  | */ | 
					
						
							| 
									
										
										
										
											2016-11-28 13:29:33 +01:00
										 |  |  | func run(requestDb *storage.LDBDatabase, depo StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams, networkId uint64, p *p2p.Peer, rw p2p.MsgReadWriter) (err error) { | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	self := &bzz{ | 
					
						
							|  |  |  | 		storage:   depo, | 
					
						
							|  |  |  | 		backend:   backend, | 
					
						
							|  |  |  | 		hive:      hive, | 
					
						
							|  |  |  | 		dbAccess:  dbaccess, | 
					
						
							|  |  |  | 		requestDb: requestDb, | 
					
						
							|  |  |  | 		peer:      p, | 
					
						
							|  |  |  | 		rw:        rw, | 
					
						
							|  |  |  | 		errors: &errs.Errors{ | 
					
						
							|  |  |  | 			Package: "BZZ", | 
					
						
							|  |  |  | 			Errors:  errorToString, | 
					
						
							|  |  |  | 		}, | 
					
						
							|  |  |  | 		swapParams:  sp, | 
					
						
							|  |  |  | 		syncParams:  sy, | 
					
						
							|  |  |  | 		swapEnabled: hive.swapEnabled, | 
					
						
							|  |  |  | 		syncEnabled: true, | 
					
						
							| 
									
										
										
										
											2016-11-28 13:29:33 +01:00
										 |  |  | 		NetworkId:   networkId, | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// handle handshake | 
					
						
							|  |  |  | 	err = self.handleStatus() | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	defer func() { | 
					
						
							|  |  |  | 		// if the handler loop exits, the peer is disconnecting | 
					
						
							|  |  |  | 		// deregister the peer in the hive | 
					
						
							|  |  |  | 		self.hive.removePeer(&peer{bzz: self}) | 
					
						
							|  |  |  | 		if self.syncer != nil { | 
					
						
							|  |  |  | 			self.syncer.stop() // quits request db and delivery loops, save requests | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		if self.swap != nil { | 
					
						
							|  |  |  | 			self.swap.Stop() // quits chequebox autocash etc | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// the main forever loop that handles incoming requests | 
					
						
							|  |  |  | 	for { | 
					
						
							|  |  |  | 		if self.hive.blockRead { | 
					
						
							|  |  |  | 			glog.V(logger.Warn).Infof("Cannot read network") | 
					
						
							|  |  |  | 			time.Sleep(100 * time.Millisecond) | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		err = self.handle() | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // TODO: may need to implement protocol drop only? don't want to kick off the peer | 
					
						
							|  |  |  | // if they are useful for other protocols | 
					
						
							|  |  |  | func (self *bzz) Drop() { | 
					
						
							|  |  |  | 	self.peer.Disconnect(p2p.DiscSubprotocolError) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // one cycle of the main forever loop that handles and dispatches incoming messages | 
					
						
							|  |  |  | func (self *bzz) handle() error { | 
					
						
							|  |  |  | 	msg, err := self.rw.ReadMsg() | 
					
						
							|  |  |  | 	glog.V(logger.Debug).Infof("<- %v", msg) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if msg.Size > ProtocolMaxMsgSize { | 
					
						
							|  |  |  | 		return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	// make sure that the payload has been fully consumed | 
					
						
							|  |  |  | 	defer msg.Discard() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	switch msg.Code { | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	case statusMsg: | 
					
						
							|  |  |  | 		// no extra status message allowed. The one needed already handled by | 
					
						
							|  |  |  | 		// handleStatus | 
					
						
							|  |  |  | 		glog.V(logger.Debug).Infof("Status message: %v", msg) | 
					
						
							|  |  |  | 		return self.protoError(ErrExtraStatusMsg, "") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	case storeRequestMsg: | 
					
						
							|  |  |  | 		// store requests are dispatched to netStore | 
					
						
							|  |  |  | 		var req storeRequestMsgData | 
					
						
							|  |  |  | 		if err := msg.Decode(&req); err != nil { | 
					
						
							|  |  |  | 			return self.protoError(ErrDecode, "<- %v: %v", msg, err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		if len(req.SData) < 9 { | 
					
						
							|  |  |  | 			return self.protoError(ErrDecode, "<- %v: Data too short (%v)", msg) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		// last Active time is set only when receiving chunks | 
					
						
							|  |  |  | 		self.lastActive = time.Now() | 
					
						
							|  |  |  | 		glog.V(logger.Detail).Infof("incoming store request: %s", req.String()) | 
					
						
							|  |  |  | 		// swap accounting is done within forwarding | 
					
						
							|  |  |  | 		self.storage.HandleStoreRequestMsg(&req, &peer{bzz: self}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	case retrieveRequestMsg: | 
					
						
							|  |  |  | 		// retrieve Requests are dispatched to netStore | 
					
						
							|  |  |  | 		var req retrieveRequestMsgData | 
					
						
							|  |  |  | 		if err := msg.Decode(&req); err != nil { | 
					
						
							|  |  |  | 			return self.protoError(ErrDecode, "<- %v: %v", msg, err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		req.from = &peer{bzz: self} | 
					
						
							|  |  |  | 		// if request is lookup and not to be delivered | 
					
						
							|  |  |  | 		if req.isLookup() { | 
					
						
							|  |  |  | 			glog.V(logger.Detail).Infof("self lookup for %v: responding with peers only...", req.from) | 
					
						
							|  |  |  | 		} else if req.Key == nil { | 
					
						
							|  |  |  | 			return self.protoError(ErrDecode, "protocol handler: req.Key == nil || req.Timeout == nil") | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			// swap accounting is done within netStore | 
					
						
							|  |  |  | 			self.storage.HandleRetrieveRequestMsg(&req, &peer{bzz: self}) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		// direct response with peers, TODO: sort this out | 
					
						
							|  |  |  | 		self.hive.peers(&req) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	case peersMsg: | 
					
						
							|  |  |  | 		// response to lookups and immediate response to retrieve requests | 
					
						
							|  |  |  | 		// dispatches new peer data to the hive that adds them to KADDB | 
					
						
							|  |  |  | 		var req peersMsgData | 
					
						
							|  |  |  | 		if err := msg.Decode(&req); err != nil { | 
					
						
							|  |  |  | 			return self.protoError(ErrDecode, "<- %v: %v", msg, err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		req.from = &peer{bzz: self} | 
					
						
							|  |  |  | 		glog.V(logger.Detail).Infof("<- peer addresses: %v", req) | 
					
						
							|  |  |  | 		self.hive.HandlePeersMsg(&req, &peer{bzz: self}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	case syncRequestMsg: | 
					
						
							|  |  |  | 		var req syncRequestMsgData | 
					
						
							|  |  |  | 		if err := msg.Decode(&req); err != nil { | 
					
						
							|  |  |  | 			return self.protoError(ErrDecode, "<- %v: %v", msg, err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		glog.V(logger.Debug).Infof("<- sync request: %v", req) | 
					
						
							|  |  |  | 		self.lastActive = time.Now() | 
					
						
							|  |  |  | 		self.sync(req.SyncState) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	case unsyncedKeysMsg: | 
					
						
							|  |  |  | 		// coming from parent node offering | 
					
						
							|  |  |  | 		var req unsyncedKeysMsgData | 
					
						
							|  |  |  | 		if err := msg.Decode(&req); err != nil { | 
					
						
							|  |  |  | 			return self.protoError(ErrDecode, "<- %v: %v", msg, err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		glog.V(logger.Debug).Infof("<- unsynced keys : %s", req.String()) | 
					
						
							|  |  |  | 		err := self.storage.HandleUnsyncedKeysMsg(&req, &peer{bzz: self}) | 
					
						
							|  |  |  | 		self.lastActive = time.Now() | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return self.protoError(ErrDecode, "<- %v: %v", msg, err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	case deliveryRequestMsg: | 
					
						
							|  |  |  | 		// response to syncKeysMsg hashes filtered not existing in db | 
					
						
							|  |  |  | 		// also relays the last synced state to the source | 
					
						
							|  |  |  | 		var req deliveryRequestMsgData | 
					
						
							|  |  |  | 		if err := msg.Decode(&req); err != nil { | 
					
						
							|  |  |  | 			return self.protoError(ErrDecode, "<-msg %v: %v", msg, err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		glog.V(logger.Debug).Infof("<- delivery request: %s", req.String()) | 
					
						
							|  |  |  | 		err := self.storage.HandleDeliveryRequestMsg(&req, &peer{bzz: self}) | 
					
						
							|  |  |  | 		self.lastActive = time.Now() | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return self.protoError(ErrDecode, "<- %v: %v", msg, err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	case paymentMsg: | 
					
						
							|  |  |  | 		// swap protocol message for payment, Units paid for, Cheque paid with | 
					
						
							|  |  |  | 		if self.swapEnabled { | 
					
						
							|  |  |  | 			var req paymentMsgData | 
					
						
							|  |  |  | 			if err := msg.Decode(&req); err != nil { | 
					
						
							|  |  |  | 				return self.protoError(ErrDecode, "<- %v: %v", msg, err) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			glog.V(logger.Debug).Infof("<- payment: %s", req.String()) | 
					
						
							|  |  |  | 			self.swap.Receive(int(req.Units), req.Promise) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	default: | 
					
						
							|  |  |  | 		// no other message is allowed | 
					
						
							|  |  |  | 		return self.protoError(ErrInvalidMsgCode, "%v", msg.Code) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (self *bzz) handleStatus() (err error) { | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	handshake := &statusMsgData{ | 
					
						
							|  |  |  | 		Version:   uint64(Version), | 
					
						
							|  |  |  | 		ID:        "honey", | 
					
						
							|  |  |  | 		Addr:      self.selfAddr(), | 
					
						
							| 
									
										
										
										
											2016-11-28 13:29:33 +01:00
										 |  |  | 		NetworkId: uint64(self.NetworkId), | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 		Swap: &bzzswap.SwapProfile{ | 
					
						
							|  |  |  | 			Profile:    self.swapParams.Profile, | 
					
						
							|  |  |  | 			PayProfile: self.swapParams.PayProfile, | 
					
						
							|  |  |  | 		}, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	err = p2p.Send(self.rw, statusMsg, handshake) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		self.protoError(ErrNoStatusMsg, err.Error()) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// read and handle remote status | 
					
						
							|  |  |  | 	var msg p2p.Msg | 
					
						
							|  |  |  | 	msg, err = self.rw.ReadMsg() | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if msg.Code != statusMsg { | 
					
						
							|  |  |  | 		self.protoError(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, statusMsg) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if msg.Size > ProtocolMaxMsgSize { | 
					
						
							|  |  |  | 		return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	var status statusMsgData | 
					
						
							|  |  |  | 	if err := msg.Decode(&status); err != nil { | 
					
						
							|  |  |  | 		return self.protoError(ErrDecode, " %v: %v", msg, err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-11-28 13:29:33 +01:00
										 |  |  | 	if status.NetworkId != self.NetworkId { | 
					
						
							|  |  |  | 		return self.protoError(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, self.NetworkId) | 
					
						
							| 
									
										
										
										
											2016-08-29 21:18:00 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if Version != status.Version { | 
					
						
							|  |  |  | 		return self.protoError(ErrVersionMismatch, "%d (!= %d)", status.Version, Version) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	self.remoteAddr = self.peerAddr(status.Addr) | 
					
						
							|  |  |  | 	glog.V(logger.Detail).Infof("self: advertised IP: %v, peer advertised: %v, local address: %v\npeer: advertised IP: %v, remote address: %v\n", self.selfAddr(), self.remoteAddr, self.peer.LocalAddr(), status.Addr.IP, self.peer.RemoteAddr()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if self.swapEnabled { | 
					
						
							|  |  |  | 		// set remote profile for accounting | 
					
						
							|  |  |  | 		self.swap, err = bzzswap.NewSwap(self.swapParams, status.Swap, self.backend, self) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return self.protoError(ErrSwap, "%v", err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	glog.V(logger.Info).Infof("Peer %08x is capable (%d/%d)", self.remoteAddr.Addr[:4], status.Version, status.NetworkId) | 
					
						
							|  |  |  | 	err = self.hive.addPeer(&peer{bzz: self}) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return self.protoError(ErrUnwanted, "%v", err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// hive sets syncstate so sync should start after node added | 
					
						
							|  |  |  | 	glog.V(logger.Info).Infof("syncronisation request sent with %v", self.syncState) | 
					
						
							|  |  |  | 	self.syncRequest() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (self *bzz) sync(state *syncState) error { | 
					
						
							|  |  |  | 	// syncer setup | 
					
						
							|  |  |  | 	if self.syncer != nil { | 
					
						
							|  |  |  | 		return self.protoError(ErrSync, "sync request can only be sent once") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	cnt := self.dbAccess.counter() | 
					
						
							|  |  |  | 	remoteaddr := self.remoteAddr.Addr | 
					
						
							|  |  |  | 	start, stop := self.hive.kad.KeyRange(remoteaddr) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// an explicitly received nil syncstate disables syncronisation | 
					
						
							|  |  |  | 	if state == nil { | 
					
						
							|  |  |  | 		self.syncEnabled = false | 
					
						
							|  |  |  | 		glog.V(logger.Warn).Infof("syncronisation disabled for peer %v", self) | 
					
						
							|  |  |  | 		state = &syncState{DbSyncState: &storage.DbSyncState{}, Synced: true} | 
					
						
							|  |  |  | 	} else { | 
					
						
							|  |  |  | 		state.synced = make(chan bool) | 
					
						
							|  |  |  | 		state.SessionAt = cnt | 
					
						
							|  |  |  | 		if storage.IsZeroKey(state.Stop) && state.Synced { | 
					
						
							|  |  |  | 			state.Start = storage.Key(start[:]) | 
					
						
							|  |  |  | 			state.Stop = storage.Key(stop[:]) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		glog.V(logger.Debug).Infof("syncronisation requested by peer %v at state %v", self, state) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	var err error | 
					
						
							|  |  |  | 	self.syncer, err = newSyncer( | 
					
						
							|  |  |  | 		self.requestDb, | 
					
						
							|  |  |  | 		storage.Key(remoteaddr[:]), | 
					
						
							|  |  |  | 		self.dbAccess, | 
					
						
							|  |  |  | 		self.unsyncedKeys, self.store, | 
					
						
							|  |  |  | 		self.syncParams, state, func() bool { return self.syncEnabled }, | 
					
						
							|  |  |  | 	) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return self.protoError(ErrSync, "%v", err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	glog.V(logger.Detail).Infof("syncer set for peer %v", self) | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (self *bzz) String() string { | 
					
						
							|  |  |  | 	return self.remoteAddr.String() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // repair reported address if IP missing | 
					
						
							|  |  |  | func (self *bzz) peerAddr(base *peerAddr) *peerAddr { | 
					
						
							|  |  |  | 	if base.IP.IsUnspecified() { | 
					
						
							|  |  |  | 		host, _, _ := net.SplitHostPort(self.peer.RemoteAddr().String()) | 
					
						
							|  |  |  | 		base.IP = net.ParseIP(host) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return base | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // returns self advertised node connection info (listening address w enodes) | 
					
						
							|  |  |  | // IP will get repaired on the other end if missing | 
					
						
							|  |  |  | // or resolved via ID by discovery at dialout | 
					
						
							|  |  |  | func (self *bzz) selfAddr() *peerAddr { | 
					
						
							|  |  |  | 	id := self.hive.id | 
					
						
							|  |  |  | 	host, port, _ := net.SplitHostPort(self.hive.listenAddr()) | 
					
						
							|  |  |  | 	intport, _ := strconv.Atoi(port) | 
					
						
							|  |  |  | 	addr := &peerAddr{ | 
					
						
							|  |  |  | 		Addr: self.hive.addr, | 
					
						
							|  |  |  | 		ID:   id[:], | 
					
						
							|  |  |  | 		IP:   net.ParseIP(host), | 
					
						
							|  |  |  | 		Port: uint16(intport), | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return addr | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // outgoing messages | 
					
						
							|  |  |  | // send retrieveRequestMsg | 
					
						
							|  |  |  | func (self *bzz) retrieve(req *retrieveRequestMsgData) error { | 
					
						
							|  |  |  | 	return self.send(retrieveRequestMsg, req) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // send storeRequestMsg | 
					
						
							|  |  |  | func (self *bzz) store(req *storeRequestMsgData) error { | 
					
						
							|  |  |  | 	return self.send(storeRequestMsg, req) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (self *bzz) syncRequest() error { | 
					
						
							|  |  |  | 	req := &syncRequestMsgData{} | 
					
						
							|  |  |  | 	if self.hive.syncEnabled { | 
					
						
							|  |  |  | 		glog.V(logger.Debug).Infof("syncronisation request to peer %v at state %v", self, self.syncState) | 
					
						
							|  |  |  | 		req.SyncState = self.syncState | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if self.syncState == nil { | 
					
						
							|  |  |  | 		glog.V(logger.Warn).Infof("syncronisation disabled for peer %v at state %v", self, self.syncState) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return self.send(syncRequestMsg, req) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // queue storeRequestMsg in request db | 
					
						
							|  |  |  | func (self *bzz) deliveryRequest(reqs []*syncRequest) error { | 
					
						
							|  |  |  | 	req := &deliveryRequestMsgData{ | 
					
						
							|  |  |  | 		Deliver: reqs, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return self.send(deliveryRequestMsg, req) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // batch of syncRequests to send off | 
					
						
							|  |  |  | func (self *bzz) unsyncedKeys(reqs []*syncRequest, state *syncState) error { | 
					
						
							|  |  |  | 	req := &unsyncedKeysMsgData{ | 
					
						
							|  |  |  | 		Unsynced: reqs, | 
					
						
							|  |  |  | 		State:    state, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return self.send(unsyncedKeysMsg, req) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // send paymentMsg | 
					
						
							|  |  |  | func (self *bzz) Pay(units int, promise swap.Promise) { | 
					
						
							|  |  |  | 	req := &paymentMsgData{uint(units), promise.(*chequebook.Cheque)} | 
					
						
							|  |  |  | 	self.payment(req) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // send paymentMsg | 
					
						
							|  |  |  | func (self *bzz) payment(req *paymentMsgData) error { | 
					
						
							|  |  |  | 	return self.send(paymentMsg, req) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // sends peersMsg | 
					
						
							|  |  |  | func (self *bzz) peers(req *peersMsgData) error { | 
					
						
							|  |  |  | 	return self.send(peersMsg, req) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (self *bzz) protoError(code int, format string, params ...interface{}) (err *errs.Error) { | 
					
						
							|  |  |  | 	err = self.errors.New(code, format, params...) | 
					
						
							|  |  |  | 	err.Log(glog.V(logger.Info)) | 
					
						
							|  |  |  | 	return | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (self *bzz) protoErrorDisconnect(err *errs.Error) { | 
					
						
							|  |  |  | 	err.Log(glog.V(logger.Info)) | 
					
						
							|  |  |  | 	if err.Fatal() { | 
					
						
							|  |  |  | 		self.peer.Disconnect(p2p.DiscSubprotocolError) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (self *bzz) send(msg uint64, data interface{}) error { | 
					
						
							|  |  |  | 	if self.hive.blockWrite { | 
					
						
							|  |  |  | 		return fmt.Errorf("network write blocked") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	glog.V(logger.Detail).Infof("-> %v: %v (%T) to %v", msg, data, data, self) | 
					
						
							|  |  |  | 	err := p2p.Send(self.rw, msg, data) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		self.Drop() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return err | 
					
						
							|  |  |  | } |