355 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			355 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2018 The go-ethereum Authors
 | |
| // This file is part of the go-ethereum library.
 | |
| //
 | |
| // The go-ethereum library is free software: you can redistribute it and/or modify
 | |
| // it under the terms of the GNU Lesser General Public License as published by
 | |
| // the Free Software Foundation, either version 3 of the License, or
 | |
| // (at your option) any later version.
 | |
| //
 | |
| // The go-ethereum library is distributed in the hope that it will be useful,
 | |
| // but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 | |
| // GNU Lesser General Public License for more details.
 | |
| //
 | |
| // You should have received a copy of the GNU Lesser General Public License
 | |
| // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
| // +build !noclient,!noprotocol
 | |
| 
 | |
| package client
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/ethereum/go-ethereum/common/hexutil"
 | |
| 	"github.com/ethereum/go-ethereum/p2p"
 | |
| 	"github.com/ethereum/go-ethereum/p2p/discover"
 | |
| 	"github.com/ethereum/go-ethereum/p2p/protocols"
 | |
| 	"github.com/ethereum/go-ethereum/rlp"
 | |
| 	"github.com/ethereum/go-ethereum/rpc"
 | |
| 	"github.com/ethereum/go-ethereum/swarm/log"
 | |
| 	"github.com/ethereum/go-ethereum/swarm/pss"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	handshakeRetryTimeout = 1000
 | |
| 	handshakeRetryCount   = 3
 | |
| )
 | |
| 
 | |
| // The pss client provides devp2p emulation over pss RPC API,
 | |
| // giving access to pss methods from a different process
 | |
| type Client struct {
 | |
| 	BaseAddrHex string
 | |
| 
 | |
| 	// peers
 | |
| 	peerPool map[pss.Topic]map[string]*pssRPCRW
 | |
| 	protos   map[pss.Topic]*p2p.Protocol
 | |
| 
 | |
| 	// rpc connections
 | |
| 	rpc  *rpc.Client
 | |
| 	subs []*rpc.ClientSubscription
 | |
| 
 | |
| 	// channels
 | |
| 	topicsC chan []byte
 | |
| 	quitC   chan struct{}
 | |
| 
 | |
| 	poolMu sync.Mutex
 | |
| }
 | |
| 
 | |
| // implements p2p.MsgReadWriter
 | |
| type pssRPCRW struct {
 | |
| 	*Client
 | |
| 	topic    string
 | |
| 	msgC     chan []byte
 | |
| 	addr     pss.PssAddress
 | |
| 	pubKeyId string
 | |
| 	lastSeen time.Time
 | |
| 	closed   bool
 | |
| }
 | |
| 
 | |
| func (c *Client) newpssRPCRW(pubkeyid string, addr pss.PssAddress, topicobj pss.Topic) (*pssRPCRW, error) {
 | |
| 	topic := topicobj.String()
 | |
| 	err := c.rpc.Call(nil, "pss_setPeerPublicKey", pubkeyid, topic, hexutil.Encode(addr[:]))
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("setpeer %s %s: %v", topic, pubkeyid, err)
 | |
| 	}
 | |
| 	return &pssRPCRW{
 | |
| 		Client:   c,
 | |
| 		topic:    topic,
 | |
| 		msgC:     make(chan []byte),
 | |
| 		addr:     addr,
 | |
| 		pubKeyId: pubkeyid,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (rw *pssRPCRW) ReadMsg() (p2p.Msg, error) {
 | |
| 	msg := <-rw.msgC
 | |
| 	log.Trace("pssrpcrw read", "msg", msg)
 | |
| 	pmsg, err := pss.ToP2pMsg(msg)
 | |
| 	if err != nil {
 | |
| 		return p2p.Msg{}, err
 | |
| 	}
 | |
| 
 | |
| 	return pmsg, nil
 | |
| }
 | |
| 
 | |
| // If only one message slot left
 | |
| // then new is requested through handshake
 | |
| // if buffer is empty, handshake request blocks until return
 | |
| // after which pointer is changed to first new key in buffer
 | |
| // will fail if:
 | |
| // - any api calls fail
 | |
| // - handshake retries are exhausted without reply,
 | |
| // - send fails
 | |
| func (rw *pssRPCRW) WriteMsg(msg p2p.Msg) error {
 | |
| 	log.Trace("got writemsg pssclient", "msg", msg)
 | |
| 	if rw.closed {
 | |
| 		return fmt.Errorf("connection closed")
 | |
| 	}
 | |
| 	rlpdata := make([]byte, msg.Size)
 | |
| 	msg.Payload.Read(rlpdata)
 | |
| 	pmsg, err := rlp.EncodeToBytes(pss.ProtocolMsg{
 | |
| 		Code:    msg.Code,
 | |
| 		Size:    msg.Size,
 | |
| 		Payload: rlpdata,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Get the keys we have
 | |
| 	var symkeyids []string
 | |
| 	err = rw.Client.rpc.Call(&symkeyids, "pss_getHandshakeKeys", rw.pubKeyId, rw.topic, false, true)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Check the capacity of the first key
 | |
| 	var symkeycap uint16
 | |
| 	if len(symkeyids) > 0 {
 | |
| 		err = rw.Client.rpc.Call(&symkeycap, "pss_getHandshakeKeyCapacity", symkeyids[0])
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	err = rw.Client.rpc.Call(nil, "pss_sendSym", symkeyids[0], rw.topic, hexutil.Encode(pmsg))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// If this is the last message it is valid for, initiate new handshake
 | |
| 	if symkeycap == 1 {
 | |
| 		var retries int
 | |
| 		var sync bool
 | |
| 		// if it's the only remaining key, make sure we don't continue until we have new ones for further writes
 | |
| 		if len(symkeyids) == 1 {
 | |
| 			sync = true
 | |
| 		}
 | |
| 		// initiate handshake
 | |
| 		_, err := rw.handshake(retries, sync, false)
 | |
| 		if err != nil {
 | |
| 			log.Warn("failing", "err", err)
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // retry and synchronicity wrapper for handshake api call
 | |
| // returns first new symkeyid upon successful execution
 | |
| func (rw *pssRPCRW) handshake(retries int, sync bool, flush bool) (string, error) {
 | |
| 
 | |
| 	var symkeyids []string
 | |
| 	var i int
 | |
| 	// request new keys
 | |
| 	// if the key buffer was depleted, make this as a blocking call and try several times before giving up
 | |
| 	for i = 0; i < 1+retries; i++ {
 | |
| 		log.Debug("handshake attempt pssrpcrw", "pubkeyid", rw.pubKeyId, "topic", rw.topic, "sync", sync)
 | |
| 		err := rw.Client.rpc.Call(&symkeyids, "pss_handshake", rw.pubKeyId, rw.topic, sync, flush)
 | |
| 		if err == nil {
 | |
| 			var keyid string
 | |
| 			if sync {
 | |
| 				keyid = symkeyids[0]
 | |
| 			}
 | |
| 			return keyid, nil
 | |
| 		}
 | |
| 		if i-1+retries > 1 {
 | |
| 			time.Sleep(time.Millisecond * handshakeRetryTimeout)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return "", fmt.Errorf("handshake failed after %d attempts", i)
 | |
| }
 | |
| 
 | |
| // Custom constructor
 | |
| //
 | |
| // Provides direct access to the rpc object
 | |
| func NewClient(rpcurl string) (*Client, error) {
 | |
| 	rpcclient, err := rpc.Dial(rpcurl)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	client, err := NewClientWithRPC(rpcclient)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return client, nil
 | |
| }
 | |
| 
 | |
| // Main constructor
 | |
| //
 | |
| // The 'rpcclient' parameter allows passing a in-memory rpc client to act as the remote websocket RPC.
 | |
| func NewClientWithRPC(rpcclient *rpc.Client) (*Client, error) {
 | |
| 	client := newClient()
 | |
| 	client.rpc = rpcclient
 | |
| 	err := client.rpc.Call(&client.BaseAddrHex, "pss_baseAddr")
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("cannot get pss node baseaddress: %v", err)
 | |
| 	}
 | |
| 	return client, nil
 | |
| }
 | |
| 
 | |
| func newClient() (client *Client) {
 | |
| 	client = &Client{
 | |
| 		quitC:    make(chan struct{}),
 | |
| 		peerPool: make(map[pss.Topic]map[string]*pssRPCRW),
 | |
| 		protos:   make(map[pss.Topic]*p2p.Protocol),
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // Mounts a new devp2p protcool on the pss connection
 | |
| //
 | |
| // the protocol is aliased as a "pss topic"
 | |
| // uses normal devp2p send and incoming message handler routines from the p2p/protocols package
 | |
| //
 | |
| // when an incoming message is received from a peer that is not yet known to the client,
 | |
| // this peer object is instantiated, and the protocol is run on it.
 | |
| func (c *Client) RunProtocol(ctx context.Context, proto *p2p.Protocol) error {
 | |
| 	topicobj := pss.BytesToTopic([]byte(fmt.Sprintf("%s:%d", proto.Name, proto.Version)))
 | |
| 	topichex := topicobj.String()
 | |
| 	msgC := make(chan pss.APIMsg)
 | |
| 	c.peerPool[topicobj] = make(map[string]*pssRPCRW)
 | |
| 	sub, err := c.rpc.Subscribe(ctx, "pss", msgC, "receive", topichex)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("pss event subscription failed: %v", err)
 | |
| 	}
 | |
| 	c.subs = append(c.subs, sub)
 | |
| 	err = c.rpc.Call(nil, "pss_addHandshake", topichex)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("pss handshake activation failed: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	// dispatch incoming messages
 | |
| 	go func() {
 | |
| 		for {
 | |
| 			select {
 | |
| 			case msg := <-msgC:
 | |
| 				// we only allow sym msgs here
 | |
| 				if msg.Asymmetric {
 | |
| 					continue
 | |
| 				}
 | |
| 				// we get passed the symkeyid
 | |
| 				// need the symkey itself to resolve to peer's pubkey
 | |
| 				var pubkeyid string
 | |
| 				err = c.rpc.Call(&pubkeyid, "pss_getHandshakePublicKey", msg.Key)
 | |
| 				if err != nil || pubkeyid == "" {
 | |
| 					log.Trace("proto err or no pubkey", "err", err, "symkeyid", msg.Key)
 | |
| 					continue
 | |
| 				}
 | |
| 				// if we don't have the peer on this protocol already, create it
 | |
| 				// this is more or less the same as AddPssPeer, less the handshake initiation
 | |
| 				if c.peerPool[topicobj][pubkeyid] == nil {
 | |
| 					var addrhex string
 | |
| 					err := c.rpc.Call(&addrhex, "pss_getAddress", topichex, false, msg.Key)
 | |
| 					if err != nil {
 | |
| 						log.Trace(err.Error())
 | |
| 						continue
 | |
| 					}
 | |
| 					addrbytes, err := hexutil.Decode(addrhex)
 | |
| 					if err != nil {
 | |
| 						log.Trace(err.Error())
 | |
| 						break
 | |
| 					}
 | |
| 					addr := pss.PssAddress(addrbytes)
 | |
| 					rw, err := c.newpssRPCRW(pubkeyid, addr, topicobj)
 | |
| 					if err != nil {
 | |
| 						break
 | |
| 					}
 | |
| 					c.peerPool[topicobj][pubkeyid] = rw
 | |
| 					nid, _ := discover.HexID("0x00")
 | |
| 					p := p2p.NewPeer(nid, fmt.Sprintf("%v", addr), []p2p.Cap{})
 | |
| 					go proto.Run(p, c.peerPool[topicobj][pubkeyid])
 | |
| 				}
 | |
| 				go func() {
 | |
| 					c.peerPool[topicobj][pubkeyid].msgC <- msg.Msg
 | |
| 				}()
 | |
| 			case <-c.quitC:
 | |
| 				return
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	c.protos[topicobj] = proto
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Always call this to ensure that we exit cleanly
 | |
| func (c *Client) Close() error {
 | |
| 	for _, s := range c.subs {
 | |
| 		s.Unsubscribe()
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Add a pss peer (public key) and run the protocol on it
 | |
| //
 | |
| // client.RunProtocol with matching topic must have been
 | |
| // run prior to adding the peer, or this method will
 | |
| // return an error.
 | |
| //
 | |
| // The key must exist in the key store of the pss node
 | |
| // before the peer is added. The method will return an error
 | |
| // if it is not.
 | |
| func (c *Client) AddPssPeer(pubkeyid string, addr []byte, spec *protocols.Spec) error {
 | |
| 	topic := pss.ProtocolTopic(spec)
 | |
| 	if c.peerPool[topic] == nil {
 | |
| 		return errors.New("addpeer on unset topic")
 | |
| 	}
 | |
| 	if c.peerPool[topic][pubkeyid] == nil {
 | |
| 		rw, err := c.newpssRPCRW(pubkeyid, addr, topic)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		_, err = rw.handshake(handshakeRetryCount, true, true)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		c.poolMu.Lock()
 | |
| 		c.peerPool[topic][pubkeyid] = rw
 | |
| 		c.poolMu.Unlock()
 | |
| 		nid, _ := discover.HexID("0x00")
 | |
| 		p := p2p.NewPeer(nid, fmt.Sprintf("%v", addr), []p2p.Cap{})
 | |
| 		go c.protos[topic].Run(p, c.peerPool[topic][pubkeyid])
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Remove a pss peer
 | |
| //
 | |
| // TODO: underlying cleanup
 | |
| func (c *Client) RemovePssPeer(pubkeyid string, spec *protocols.Spec) {
 | |
| 	log.Debug("closing pss client peer", "pubkey", pubkeyid, "protoname", spec.Name, "protoversion", spec.Version)
 | |
| 	c.poolMu.Lock()
 | |
| 	defer c.poolMu.Unlock()
 | |
| 	topic := pss.ProtocolTopic(spec)
 | |
| 	c.peerPool[topic][pubkeyid].closed = true
 | |
| 	delete(c.peerPool[topic], pubkeyid)
 | |
| }
 |