355 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			355 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
|   | // Copyright 2018 The go-ethereum Authors | ||
|  | // This file is part of the go-ethereum library. | ||
|  | // | ||
|  | // The go-ethereum library is free software: you can redistribute it and/or modify | ||
|  | // it under the terms of the GNU Lesser General Public License as published by | ||
|  | // the Free Software Foundation, either version 3 of the License, or | ||
|  | // (at your option) any later version. | ||
|  | // | ||
|  | // The go-ethereum library is distributed in the hope that it will be useful, | ||
|  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
|  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
|  | // GNU Lesser General Public License for more details. | ||
|  | // | ||
|  | // You should have received a copy of the GNU Lesser General Public License | ||
|  | // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. | ||
|  | 
 | ||
|  | // +build !noclient,!noprotocol | ||
|  | 
 | ||
|  | package client | ||
|  | 
 | ||
|  | import ( | ||
|  | 	"context" | ||
|  | 	"errors" | ||
|  | 	"fmt" | ||
|  | 	"sync" | ||
|  | 	"time" | ||
|  | 
 | ||
|  | 	"github.com/ethereum/go-ethereum/common/hexutil" | ||
|  | 	"github.com/ethereum/go-ethereum/p2p" | ||
|  | 	"github.com/ethereum/go-ethereum/p2p/discover" | ||
|  | 	"github.com/ethereum/go-ethereum/p2p/protocols" | ||
|  | 	"github.com/ethereum/go-ethereum/rlp" | ||
|  | 	"github.com/ethereum/go-ethereum/rpc" | ||
|  | 	"github.com/ethereum/go-ethereum/swarm/log" | ||
|  | 	"github.com/ethereum/go-ethereum/swarm/pss" | ||
|  | ) | ||
|  | 
 | ||
|  | const ( | ||
|  | 	handshakeRetryTimeout = 1000 | ||
|  | 	handshakeRetryCount   = 3 | ||
|  | ) | ||
|  | 
 | ||
|  | // The pss client provides devp2p emulation over pss RPC API, | ||
|  | // giving access to pss methods from a different process | ||
|  | type Client struct { | ||
|  | 	BaseAddrHex string | ||
|  | 
 | ||
|  | 	// peers | ||
|  | 	peerPool map[pss.Topic]map[string]*pssRPCRW | ||
|  | 	protos   map[pss.Topic]*p2p.Protocol | ||
|  | 
 | ||
|  | 	// rpc connections | ||
|  | 	rpc  *rpc.Client | ||
|  | 	subs []*rpc.ClientSubscription | ||
|  | 
 | ||
|  | 	// channels | ||
|  | 	topicsC chan []byte | ||
|  | 	quitC   chan struct{} | ||
|  | 
 | ||
|  | 	poolMu sync.Mutex | ||
|  | } | ||
|  | 
 | ||
|  | // implements p2p.MsgReadWriter | ||
|  | type pssRPCRW struct { | ||
|  | 	*Client | ||
|  | 	topic    string | ||
|  | 	msgC     chan []byte | ||
|  | 	addr     pss.PssAddress | ||
|  | 	pubKeyId string | ||
|  | 	lastSeen time.Time | ||
|  | 	closed   bool | ||
|  | } | ||
|  | 
 | ||
|  | func (c *Client) newpssRPCRW(pubkeyid string, addr pss.PssAddress, topicobj pss.Topic) (*pssRPCRW, error) { | ||
|  | 	topic := topicobj.String() | ||
|  | 	err := c.rpc.Call(nil, "pss_setPeerPublicKey", pubkeyid, topic, hexutil.Encode(addr[:])) | ||
|  | 	if err != nil { | ||
|  | 		return nil, fmt.Errorf("setpeer %s %s: %v", topic, pubkeyid, err) | ||
|  | 	} | ||
|  | 	return &pssRPCRW{ | ||
|  | 		Client:   c, | ||
|  | 		topic:    topic, | ||
|  | 		msgC:     make(chan []byte), | ||
|  | 		addr:     addr, | ||
|  | 		pubKeyId: pubkeyid, | ||
|  | 	}, nil | ||
|  | } | ||
|  | 
 | ||
|  | func (rw *pssRPCRW) ReadMsg() (p2p.Msg, error) { | ||
|  | 	msg := <-rw.msgC | ||
|  | 	log.Trace("pssrpcrw read", "msg", msg) | ||
|  | 	pmsg, err := pss.ToP2pMsg(msg) | ||
|  | 	if err != nil { | ||
|  | 		return p2p.Msg{}, err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	return pmsg, nil | ||
|  | } | ||
|  | 
 | ||
|  | // If only one message slot left | ||
|  | // then new is requested through handshake | ||
|  | // if buffer is empty, handshake request blocks until return | ||
|  | // after which pointer is changed to first new key in buffer | ||
|  | // will fail if: | ||
|  | // - any api calls fail | ||
|  | // - handshake retries are exhausted without reply, | ||
|  | // - send fails | ||
|  | func (rw *pssRPCRW) WriteMsg(msg p2p.Msg) error { | ||
|  | 	log.Trace("got writemsg pssclient", "msg", msg) | ||
|  | 	if rw.closed { | ||
|  | 		return fmt.Errorf("connection closed") | ||
|  | 	} | ||
|  | 	rlpdata := make([]byte, msg.Size) | ||
|  | 	msg.Payload.Read(rlpdata) | ||
|  | 	pmsg, err := rlp.EncodeToBytes(pss.ProtocolMsg{ | ||
|  | 		Code:    msg.Code, | ||
|  | 		Size:    msg.Size, | ||
|  | 		Payload: rlpdata, | ||
|  | 	}) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// Get the keys we have | ||
|  | 	var symkeyids []string | ||
|  | 	err = rw.Client.rpc.Call(&symkeyids, "pss_getHandshakeKeys", rw.pubKeyId, rw.topic, false, true) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// Check the capacity of the first key | ||
|  | 	var symkeycap uint16 | ||
|  | 	if len(symkeyids) > 0 { | ||
|  | 		err = rw.Client.rpc.Call(&symkeycap, "pss_getHandshakeKeyCapacity", symkeyids[0]) | ||
|  | 		if err != nil { | ||
|  | 			return err | ||
|  | 		} | ||
|  | 	} | ||
|  | 
 | ||
|  | 	err = rw.Client.rpc.Call(nil, "pss_sendSym", symkeyids[0], rw.topic, hexutil.Encode(pmsg)) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// If this is the last message it is valid for, initiate new handshake | ||
|  | 	if symkeycap == 1 { | ||
|  | 		var retries int | ||
|  | 		var sync bool | ||
|  | 		// if it's the only remaining key, make sure we don't continue until we have new ones for further writes | ||
|  | 		if len(symkeyids) == 1 { | ||
|  | 			sync = true | ||
|  | 		} | ||
|  | 		// initiate handshake | ||
|  | 		_, err := rw.handshake(retries, sync, false) | ||
|  | 		if err != nil { | ||
|  | 			log.Warn("failing", "err", err) | ||
|  | 			return err | ||
|  | 		} | ||
|  | 	} | ||
|  | 	return nil | ||
|  | } | ||
|  | 
 | ||
|  | // retry and synchronicity wrapper for handshake api call | ||
|  | // returns first new symkeyid upon successful execution | ||
|  | func (rw *pssRPCRW) handshake(retries int, sync bool, flush bool) (string, error) { | ||
|  | 
 | ||
|  | 	var symkeyids []string | ||
|  | 	var i int | ||
|  | 	// request new keys | ||
|  | 	// if the key buffer was depleted, make this as a blocking call and try several times before giving up | ||
|  | 	for i = 0; i < 1+retries; i++ { | ||
|  | 		log.Debug("handshake attempt pssrpcrw", "pubkeyid", rw.pubKeyId, "topic", rw.topic, "sync", sync) | ||
|  | 		err := rw.Client.rpc.Call(&symkeyids, "pss_handshake", rw.pubKeyId, rw.topic, sync, flush) | ||
|  | 		if err == nil { | ||
|  | 			var keyid string | ||
|  | 			if sync { | ||
|  | 				keyid = symkeyids[0] | ||
|  | 			} | ||
|  | 			return keyid, nil | ||
|  | 		} | ||
|  | 		if i-1+retries > 1 { | ||
|  | 			time.Sleep(time.Millisecond * handshakeRetryTimeout) | ||
|  | 		} | ||
|  | 	} | ||
|  | 
 | ||
|  | 	return "", fmt.Errorf("handshake failed after %d attempts", i) | ||
|  | } | ||
|  | 
 | ||
|  | // Custom constructor | ||
|  | // | ||
|  | // Provides direct access to the rpc object | ||
|  | func NewClient(rpcurl string) (*Client, error) { | ||
|  | 	rpcclient, err := rpc.Dial(rpcurl) | ||
|  | 	if err != nil { | ||
|  | 		return nil, err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	client, err := NewClientWithRPC(rpcclient) | ||
|  | 	if err != nil { | ||
|  | 		return nil, err | ||
|  | 	} | ||
|  | 	return client, nil | ||
|  | } | ||
|  | 
 | ||
|  | // Main constructor | ||
|  | // | ||
|  | // The 'rpcclient' parameter allows passing a in-memory rpc client to act as the remote websocket RPC. | ||
|  | func NewClientWithRPC(rpcclient *rpc.Client) (*Client, error) { | ||
|  | 	client := newClient() | ||
|  | 	client.rpc = rpcclient | ||
|  | 	err := client.rpc.Call(&client.BaseAddrHex, "pss_baseAddr") | ||
|  | 	if err != nil { | ||
|  | 		return nil, fmt.Errorf("cannot get pss node baseaddress: %v", err) | ||
|  | 	} | ||
|  | 	return client, nil | ||
|  | } | ||
|  | 
 | ||
|  | func newClient() (client *Client) { | ||
|  | 	client = &Client{ | ||
|  | 		quitC:    make(chan struct{}), | ||
|  | 		peerPool: make(map[pss.Topic]map[string]*pssRPCRW), | ||
|  | 		protos:   make(map[pss.Topic]*p2p.Protocol), | ||
|  | 	} | ||
|  | 	return | ||
|  | } | ||
|  | 
 | ||
|  | // Mounts a new devp2p protcool on the pss connection | ||
|  | // | ||
|  | // the protocol is aliased as a "pss topic" | ||
|  | // uses normal devp2p send and incoming message handler routines from the p2p/protocols package | ||
|  | // | ||
|  | // when an incoming message is received from a peer that is not yet known to the client, | ||
|  | // this peer object is instantiated, and the protocol is run on it. | ||
|  | func (c *Client) RunProtocol(ctx context.Context, proto *p2p.Protocol) error { | ||
|  | 	topicobj := pss.BytesToTopic([]byte(fmt.Sprintf("%s:%d", proto.Name, proto.Version))) | ||
|  | 	topichex := topicobj.String() | ||
|  | 	msgC := make(chan pss.APIMsg) | ||
|  | 	c.peerPool[topicobj] = make(map[string]*pssRPCRW) | ||
|  | 	sub, err := c.rpc.Subscribe(ctx, "pss", msgC, "receive", topichex) | ||
|  | 	if err != nil { | ||
|  | 		return fmt.Errorf("pss event subscription failed: %v", err) | ||
|  | 	} | ||
|  | 	c.subs = append(c.subs, sub) | ||
|  | 	err = c.rpc.Call(nil, "pss_addHandshake", topichex) | ||
|  | 	if err != nil { | ||
|  | 		return fmt.Errorf("pss handshake activation failed: %v", err) | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// dispatch incoming messages | ||
|  | 	go func() { | ||
|  | 		for { | ||
|  | 			select { | ||
|  | 			case msg := <-msgC: | ||
|  | 				// we only allow sym msgs here | ||
|  | 				if msg.Asymmetric { | ||
|  | 					continue | ||
|  | 				} | ||
|  | 				// we get passed the symkeyid | ||
|  | 				// need the symkey itself to resolve to peer's pubkey | ||
|  | 				var pubkeyid string | ||
|  | 				err = c.rpc.Call(&pubkeyid, "pss_getHandshakePublicKey", msg.Key) | ||
|  | 				if err != nil || pubkeyid == "" { | ||
|  | 					log.Trace("proto err or no pubkey", "err", err, "symkeyid", msg.Key) | ||
|  | 					continue | ||
|  | 				} | ||
|  | 				// if we don't have the peer on this protocol already, create it | ||
|  | 				// this is more or less the same as AddPssPeer, less the handshake initiation | ||
|  | 				if c.peerPool[topicobj][pubkeyid] == nil { | ||
|  | 					var addrhex string | ||
|  | 					err := c.rpc.Call(&addrhex, "pss_getAddress", topichex, false, msg.Key) | ||
|  | 					if err != nil { | ||
|  | 						log.Trace(err.Error()) | ||
|  | 						continue | ||
|  | 					} | ||
|  | 					addrbytes, err := hexutil.Decode(addrhex) | ||
|  | 					if err != nil { | ||
|  | 						log.Trace(err.Error()) | ||
|  | 						break | ||
|  | 					} | ||
|  | 					addr := pss.PssAddress(addrbytes) | ||
|  | 					rw, err := c.newpssRPCRW(pubkeyid, addr, topicobj) | ||
|  | 					if err != nil { | ||
|  | 						break | ||
|  | 					} | ||
|  | 					c.peerPool[topicobj][pubkeyid] = rw | ||
|  | 					nid, _ := discover.HexID("0x00") | ||
|  | 					p := p2p.NewPeer(nid, fmt.Sprintf("%v", addr), []p2p.Cap{}) | ||
|  | 					go proto.Run(p, c.peerPool[topicobj][pubkeyid]) | ||
|  | 				} | ||
|  | 				go func() { | ||
|  | 					c.peerPool[topicobj][pubkeyid].msgC <- msg.Msg | ||
|  | 				}() | ||
|  | 			case <-c.quitC: | ||
|  | 				return | ||
|  | 			} | ||
|  | 		} | ||
|  | 	}() | ||
|  | 
 | ||
|  | 	c.protos[topicobj] = proto | ||
|  | 	return nil | ||
|  | } | ||
|  | 
 | ||
|  | // Always call this to ensure that we exit cleanly | ||
|  | func (c *Client) Close() error { | ||
|  | 	for _, s := range c.subs { | ||
|  | 		s.Unsubscribe() | ||
|  | 	} | ||
|  | 	return nil | ||
|  | } | ||
|  | 
 | ||
|  | // Add a pss peer (public key) and run the protocol on it | ||
|  | // | ||
|  | // client.RunProtocol with matching topic must have been | ||
|  | // run prior to adding the peer, or this method will | ||
|  | // return an error. | ||
|  | // | ||
|  | // The key must exist in the key store of the pss node | ||
|  | // before the peer is added. The method will return an error | ||
|  | // if it is not. | ||
|  | func (c *Client) AddPssPeer(pubkeyid string, addr []byte, spec *protocols.Spec) error { | ||
|  | 	topic := pss.ProtocolTopic(spec) | ||
|  | 	if c.peerPool[topic] == nil { | ||
|  | 		return errors.New("addpeer on unset topic") | ||
|  | 	} | ||
|  | 	if c.peerPool[topic][pubkeyid] == nil { | ||
|  | 		rw, err := c.newpssRPCRW(pubkeyid, addr, topic) | ||
|  | 		if err != nil { | ||
|  | 			return err | ||
|  | 		} | ||
|  | 		_, err = rw.handshake(handshakeRetryCount, true, true) | ||
|  | 		if err != nil { | ||
|  | 			return err | ||
|  | 		} | ||
|  | 		c.poolMu.Lock() | ||
|  | 		c.peerPool[topic][pubkeyid] = rw | ||
|  | 		c.poolMu.Unlock() | ||
|  | 		nid, _ := discover.HexID("0x00") | ||
|  | 		p := p2p.NewPeer(nid, fmt.Sprintf("%v", addr), []p2p.Cap{}) | ||
|  | 		go c.protos[topic].Run(p, c.peerPool[topic][pubkeyid]) | ||
|  | 	} | ||
|  | 	return nil | ||
|  | } | ||
|  | 
 | ||
|  | // Remove a pss peer | ||
|  | // | ||
|  | // TODO: underlying cleanup | ||
|  | func (c *Client) RemovePssPeer(pubkeyid string, spec *protocols.Spec) { | ||
|  | 	log.Debug("closing pss client peer", "pubkey", pubkeyid, "protoname", spec.Name, "protoversion", spec.Version) | ||
|  | 	c.poolMu.Lock() | ||
|  | 	defer c.poolMu.Unlock() | ||
|  | 	topic := pss.ProtocolTopic(spec) | ||
|  | 	c.peerPool[topic][pubkeyid].closed = true | ||
|  | 	delete(c.peerPool[topic], pubkeyid) | ||
|  | } |