355 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			355 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| 
								 | 
							
								// Copyright 2018 The go-ethereum Authors
							 | 
						||
| 
								 | 
							
								// This file is part of the go-ethereum library.
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// The go-ethereum library is free software: you can redistribute it and/or modify
							 | 
						||
| 
								 | 
							
								// it under the terms of the GNU Lesser General Public License as published by
							 | 
						||
| 
								 | 
							
								// the Free Software Foundation, either version 3 of the License, or
							 | 
						||
| 
								 | 
							
								// (at your option) any later version.
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// The go-ethereum library is distributed in the hope that it will be useful,
							 | 
						||
| 
								 | 
							
								// but WITHOUT ANY WARRANTY; without even the implied warranty of
							 | 
						||
| 
								 | 
							
								// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
							 | 
						||
| 
								 | 
							
								// GNU Lesser General Public License for more details.
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// You should have received a copy of the GNU Lesser General Public License
							 | 
						||
| 
								 | 
							
								// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// +build !noclient,!noprotocol
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								package client
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import (
							 | 
						||
| 
								 | 
							
									"context"
							 | 
						||
| 
								 | 
							
									"errors"
							 | 
						||
| 
								 | 
							
									"fmt"
							 | 
						||
| 
								 | 
							
									"sync"
							 | 
						||
| 
								 | 
							
									"time"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									"github.com/ethereum/go-ethereum/common/hexutil"
							 | 
						||
| 
								 | 
							
									"github.com/ethereum/go-ethereum/p2p"
							 | 
						||
| 
								 | 
							
									"github.com/ethereum/go-ethereum/p2p/discover"
							 | 
						||
| 
								 | 
							
									"github.com/ethereum/go-ethereum/p2p/protocols"
							 | 
						||
| 
								 | 
							
									"github.com/ethereum/go-ethereum/rlp"
							 | 
						||
| 
								 | 
							
									"github.com/ethereum/go-ethereum/rpc"
							 | 
						||
| 
								 | 
							
									"github.com/ethereum/go-ethereum/swarm/log"
							 | 
						||
| 
								 | 
							
									"github.com/ethereum/go-ethereum/swarm/pss"
							 | 
						||
| 
								 | 
							
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								const (
							 | 
						||
| 
								 | 
							
									handshakeRetryTimeout = 1000
							 | 
						||
| 
								 | 
							
									handshakeRetryCount   = 3
							 | 
						||
| 
								 | 
							
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// The pss client provides devp2p emulation over pss RPC API,
							 | 
						||
| 
								 | 
							
								// giving access to pss methods from a different process
							 | 
						||
| 
								 | 
							
								type Client struct {
							 | 
						||
| 
								 | 
							
									BaseAddrHex string
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// peers
							 | 
						||
| 
								 | 
							
									peerPool map[pss.Topic]map[string]*pssRPCRW
							 | 
						||
| 
								 | 
							
									protos   map[pss.Topic]*p2p.Protocol
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// rpc connections
							 | 
						||
| 
								 | 
							
									rpc  *rpc.Client
							 | 
						||
| 
								 | 
							
									subs []*rpc.ClientSubscription
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// channels
							 | 
						||
| 
								 | 
							
									topicsC chan []byte
							 | 
						||
| 
								 | 
							
									quitC   chan struct{}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									poolMu sync.Mutex
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// implements p2p.MsgReadWriter
							 | 
						||
| 
								 | 
							
								type pssRPCRW struct {
							 | 
						||
| 
								 | 
							
									*Client
							 | 
						||
| 
								 | 
							
									topic    string
							 | 
						||
| 
								 | 
							
									msgC     chan []byte
							 | 
						||
| 
								 | 
							
									addr     pss.PssAddress
							 | 
						||
| 
								 | 
							
									pubKeyId string
							 | 
						||
| 
								 | 
							
									lastSeen time.Time
							 | 
						||
| 
								 | 
							
									closed   bool
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (c *Client) newpssRPCRW(pubkeyid string, addr pss.PssAddress, topicobj pss.Topic) (*pssRPCRW, error) {
							 | 
						||
| 
								 | 
							
									topic := topicobj.String()
							 | 
						||
| 
								 | 
							
									err := c.rpc.Call(nil, "pss_setPeerPublicKey", pubkeyid, topic, hexutil.Encode(addr[:]))
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return nil, fmt.Errorf("setpeer %s %s: %v", topic, pubkeyid, err)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									return &pssRPCRW{
							 | 
						||
| 
								 | 
							
										Client:   c,
							 | 
						||
| 
								 | 
							
										topic:    topic,
							 | 
						||
| 
								 | 
							
										msgC:     make(chan []byte),
							 | 
						||
| 
								 | 
							
										addr:     addr,
							 | 
						||
| 
								 | 
							
										pubKeyId: pubkeyid,
							 | 
						||
| 
								 | 
							
									}, nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (rw *pssRPCRW) ReadMsg() (p2p.Msg, error) {
							 | 
						||
| 
								 | 
							
									msg := <-rw.msgC
							 | 
						||
| 
								 | 
							
									log.Trace("pssrpcrw read", "msg", msg)
							 | 
						||
| 
								 | 
							
									pmsg, err := pss.ToP2pMsg(msg)
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return p2p.Msg{}, err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									return pmsg, nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// If only one message slot left
							 | 
						||
| 
								 | 
							
								// then new is requested through handshake
							 | 
						||
| 
								 | 
							
								// if buffer is empty, handshake request blocks until return
							 | 
						||
| 
								 | 
							
								// after which pointer is changed to first new key in buffer
							 | 
						||
| 
								 | 
							
								// will fail if:
							 | 
						||
| 
								 | 
							
								// - any api calls fail
							 | 
						||
| 
								 | 
							
								// - handshake retries are exhausted without reply,
							 | 
						||
| 
								 | 
							
								// - send fails
							 | 
						||
| 
								 | 
							
								func (rw *pssRPCRW) WriteMsg(msg p2p.Msg) error {
							 | 
						||
| 
								 | 
							
									log.Trace("got writemsg pssclient", "msg", msg)
							 | 
						||
| 
								 | 
							
									if rw.closed {
							 | 
						||
| 
								 | 
							
										return fmt.Errorf("connection closed")
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									rlpdata := make([]byte, msg.Size)
							 | 
						||
| 
								 | 
							
									msg.Payload.Read(rlpdata)
							 | 
						||
| 
								 | 
							
									pmsg, err := rlp.EncodeToBytes(pss.ProtocolMsg{
							 | 
						||
| 
								 | 
							
										Code:    msg.Code,
							 | 
						||
| 
								 | 
							
										Size:    msg.Size,
							 | 
						||
| 
								 | 
							
										Payload: rlpdata,
							 | 
						||
| 
								 | 
							
									})
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// Get the keys we have
							 | 
						||
| 
								 | 
							
									var symkeyids []string
							 | 
						||
| 
								 | 
							
									err = rw.Client.rpc.Call(&symkeyids, "pss_getHandshakeKeys", rw.pubKeyId, rw.topic, false, true)
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// Check the capacity of the first key
							 | 
						||
| 
								 | 
							
									var symkeycap uint16
							 | 
						||
| 
								 | 
							
									if len(symkeyids) > 0 {
							 | 
						||
| 
								 | 
							
										err = rw.Client.rpc.Call(&symkeycap, "pss_getHandshakeKeyCapacity", symkeyids[0])
							 | 
						||
| 
								 | 
							
										if err != nil {
							 | 
						||
| 
								 | 
							
											return err
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									err = rw.Client.rpc.Call(nil, "pss_sendSym", symkeyids[0], rw.topic, hexutil.Encode(pmsg))
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// If this is the last message it is valid for, initiate new handshake
							 | 
						||
| 
								 | 
							
									if symkeycap == 1 {
							 | 
						||
| 
								 | 
							
										var retries int
							 | 
						||
| 
								 | 
							
										var sync bool
							 | 
						||
| 
								 | 
							
										// if it's the only remaining key, make sure we don't continue until we have new ones for further writes
							 | 
						||
| 
								 | 
							
										if len(symkeyids) == 1 {
							 | 
						||
| 
								 | 
							
											sync = true
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										// initiate handshake
							 | 
						||
| 
								 | 
							
										_, err := rw.handshake(retries, sync, false)
							 | 
						||
| 
								 | 
							
										if err != nil {
							 | 
						||
| 
								 | 
							
											log.Warn("failing", "err", err)
							 | 
						||
| 
								 | 
							
											return err
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									return nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// retry and synchronicity wrapper for handshake api call
							 | 
						||
| 
								 | 
							
								// returns first new symkeyid upon successful execution
							 | 
						||
| 
								 | 
							
								func (rw *pssRPCRW) handshake(retries int, sync bool, flush bool) (string, error) {
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									var symkeyids []string
							 | 
						||
| 
								 | 
							
									var i int
							 | 
						||
| 
								 | 
							
									// request new keys
							 | 
						||
| 
								 | 
							
									// if the key buffer was depleted, make this as a blocking call and try several times before giving up
							 | 
						||
| 
								 | 
							
									for i = 0; i < 1+retries; i++ {
							 | 
						||
| 
								 | 
							
										log.Debug("handshake attempt pssrpcrw", "pubkeyid", rw.pubKeyId, "topic", rw.topic, "sync", sync)
							 | 
						||
| 
								 | 
							
										err := rw.Client.rpc.Call(&symkeyids, "pss_handshake", rw.pubKeyId, rw.topic, sync, flush)
							 | 
						||
| 
								 | 
							
										if err == nil {
							 | 
						||
| 
								 | 
							
											var keyid string
							 | 
						||
| 
								 | 
							
											if sync {
							 | 
						||
| 
								 | 
							
												keyid = symkeyids[0]
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
											return keyid, nil
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										if i-1+retries > 1 {
							 | 
						||
| 
								 | 
							
											time.Sleep(time.Millisecond * handshakeRetryTimeout)
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									return "", fmt.Errorf("handshake failed after %d attempts", i)
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Custom constructor
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// Provides direct access to the rpc object
							 | 
						||
| 
								 | 
							
								func NewClient(rpcurl string) (*Client, error) {
							 | 
						||
| 
								 | 
							
									rpcclient, err := rpc.Dial(rpcurl)
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return nil, err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									client, err := NewClientWithRPC(rpcclient)
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return nil, err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									return client, nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Main constructor
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// The 'rpcclient' parameter allows passing a in-memory rpc client to act as the remote websocket RPC.
							 | 
						||
| 
								 | 
							
								func NewClientWithRPC(rpcclient *rpc.Client) (*Client, error) {
							 | 
						||
| 
								 | 
							
									client := newClient()
							 | 
						||
| 
								 | 
							
									client.rpc = rpcclient
							 | 
						||
| 
								 | 
							
									err := client.rpc.Call(&client.BaseAddrHex, "pss_baseAddr")
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return nil, fmt.Errorf("cannot get pss node baseaddress: %v", err)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									return client, nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func newClient() (client *Client) {
							 | 
						||
| 
								 | 
							
									client = &Client{
							 | 
						||
| 
								 | 
							
										quitC:    make(chan struct{}),
							 | 
						||
| 
								 | 
							
										peerPool: make(map[pss.Topic]map[string]*pssRPCRW),
							 | 
						||
| 
								 | 
							
										protos:   make(map[pss.Topic]*p2p.Protocol),
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									return
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Mounts a new devp2p protcool on the pss connection
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// the protocol is aliased as a "pss topic"
							 | 
						||
| 
								 | 
							
								// uses normal devp2p send and incoming message handler routines from the p2p/protocols package
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// when an incoming message is received from a peer that is not yet known to the client,
							 | 
						||
| 
								 | 
							
								// this peer object is instantiated, and the protocol is run on it.
							 | 
						||
| 
								 | 
							
								func (c *Client) RunProtocol(ctx context.Context, proto *p2p.Protocol) error {
							 | 
						||
| 
								 | 
							
									topicobj := pss.BytesToTopic([]byte(fmt.Sprintf("%s:%d", proto.Name, proto.Version)))
							 | 
						||
| 
								 | 
							
									topichex := topicobj.String()
							 | 
						||
| 
								 | 
							
									msgC := make(chan pss.APIMsg)
							 | 
						||
| 
								 | 
							
									c.peerPool[topicobj] = make(map[string]*pssRPCRW)
							 | 
						||
| 
								 | 
							
									sub, err := c.rpc.Subscribe(ctx, "pss", msgC, "receive", topichex)
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return fmt.Errorf("pss event subscription failed: %v", err)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									c.subs = append(c.subs, sub)
							 | 
						||
| 
								 | 
							
									err = c.rpc.Call(nil, "pss_addHandshake", topichex)
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return fmt.Errorf("pss handshake activation failed: %v", err)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// dispatch incoming messages
							 | 
						||
| 
								 | 
							
									go func() {
							 | 
						||
| 
								 | 
							
										for {
							 | 
						||
| 
								 | 
							
											select {
							 | 
						||
| 
								 | 
							
											case msg := <-msgC:
							 | 
						||
| 
								 | 
							
												// we only allow sym msgs here
							 | 
						||
| 
								 | 
							
												if msg.Asymmetric {
							 | 
						||
| 
								 | 
							
													continue
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
												// we get passed the symkeyid
							 | 
						||
| 
								 | 
							
												// need the symkey itself to resolve to peer's pubkey
							 | 
						||
| 
								 | 
							
												var pubkeyid string
							 | 
						||
| 
								 | 
							
												err = c.rpc.Call(&pubkeyid, "pss_getHandshakePublicKey", msg.Key)
							 | 
						||
| 
								 | 
							
												if err != nil || pubkeyid == "" {
							 | 
						||
| 
								 | 
							
													log.Trace("proto err or no pubkey", "err", err, "symkeyid", msg.Key)
							 | 
						||
| 
								 | 
							
													continue
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
												// if we don't have the peer on this protocol already, create it
							 | 
						||
| 
								 | 
							
												// this is more or less the same as AddPssPeer, less the handshake initiation
							 | 
						||
| 
								 | 
							
												if c.peerPool[topicobj][pubkeyid] == nil {
							 | 
						||
| 
								 | 
							
													var addrhex string
							 | 
						||
| 
								 | 
							
													err := c.rpc.Call(&addrhex, "pss_getAddress", topichex, false, msg.Key)
							 | 
						||
| 
								 | 
							
													if err != nil {
							 | 
						||
| 
								 | 
							
														log.Trace(err.Error())
							 | 
						||
| 
								 | 
							
														continue
							 | 
						||
| 
								 | 
							
													}
							 | 
						||
| 
								 | 
							
													addrbytes, err := hexutil.Decode(addrhex)
							 | 
						||
| 
								 | 
							
													if err != nil {
							 | 
						||
| 
								 | 
							
														log.Trace(err.Error())
							 | 
						||
| 
								 | 
							
														break
							 | 
						||
| 
								 | 
							
													}
							 | 
						||
| 
								 | 
							
													addr := pss.PssAddress(addrbytes)
							 | 
						||
| 
								 | 
							
													rw, err := c.newpssRPCRW(pubkeyid, addr, topicobj)
							 | 
						||
| 
								 | 
							
													if err != nil {
							 | 
						||
| 
								 | 
							
														break
							 | 
						||
| 
								 | 
							
													}
							 | 
						||
| 
								 | 
							
													c.peerPool[topicobj][pubkeyid] = rw
							 | 
						||
| 
								 | 
							
													nid, _ := discover.HexID("0x00")
							 | 
						||
| 
								 | 
							
													p := p2p.NewPeer(nid, fmt.Sprintf("%v", addr), []p2p.Cap{})
							 | 
						||
| 
								 | 
							
													go proto.Run(p, c.peerPool[topicobj][pubkeyid])
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
												go func() {
							 | 
						||
| 
								 | 
							
													c.peerPool[topicobj][pubkeyid].msgC <- msg.Msg
							 | 
						||
| 
								 | 
							
												}()
							 | 
						||
| 
								 | 
							
											case <-c.quitC:
							 | 
						||
| 
								 | 
							
												return
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									c.protos[topicobj] = proto
							 | 
						||
| 
								 | 
							
									return nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Always call this to ensure that we exit cleanly
							 | 
						||
| 
								 | 
							
								func (c *Client) Close() error {
							 | 
						||
| 
								 | 
							
									for _, s := range c.subs {
							 | 
						||
| 
								 | 
							
										s.Unsubscribe()
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									return nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Add a pss peer (public key) and run the protocol on it
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// client.RunProtocol with matching topic must have been
							 | 
						||
| 
								 | 
							
								// run prior to adding the peer, or this method will
							 | 
						||
| 
								 | 
							
								// return an error.
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// The key must exist in the key store of the pss node
							 | 
						||
| 
								 | 
							
								// before the peer is added. The method will return an error
							 | 
						||
| 
								 | 
							
								// if it is not.
							 | 
						||
| 
								 | 
							
								func (c *Client) AddPssPeer(pubkeyid string, addr []byte, spec *protocols.Spec) error {
							 | 
						||
| 
								 | 
							
									topic := pss.ProtocolTopic(spec)
							 | 
						||
| 
								 | 
							
									if c.peerPool[topic] == nil {
							 | 
						||
| 
								 | 
							
										return errors.New("addpeer on unset topic")
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									if c.peerPool[topic][pubkeyid] == nil {
							 | 
						||
| 
								 | 
							
										rw, err := c.newpssRPCRW(pubkeyid, addr, topic)
							 | 
						||
| 
								 | 
							
										if err != nil {
							 | 
						||
| 
								 | 
							
											return err
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										_, err = rw.handshake(handshakeRetryCount, true, true)
							 | 
						||
| 
								 | 
							
										if err != nil {
							 | 
						||
| 
								 | 
							
											return err
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										c.poolMu.Lock()
							 | 
						||
| 
								 | 
							
										c.peerPool[topic][pubkeyid] = rw
							 | 
						||
| 
								 | 
							
										c.poolMu.Unlock()
							 | 
						||
| 
								 | 
							
										nid, _ := discover.HexID("0x00")
							 | 
						||
| 
								 | 
							
										p := p2p.NewPeer(nid, fmt.Sprintf("%v", addr), []p2p.Cap{})
							 | 
						||
| 
								 | 
							
										go c.protos[topic].Run(p, c.peerPool[topic][pubkeyid])
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									return nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Remove a pss peer
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// TODO: underlying cleanup
							 | 
						||
| 
								 | 
							
								func (c *Client) RemovePssPeer(pubkeyid string, spec *protocols.Spec) {
							 | 
						||
| 
								 | 
							
									log.Debug("closing pss client peer", "pubkey", pubkeyid, "protoname", spec.Name, "protoversion", spec.Version)
							 | 
						||
| 
								 | 
							
									c.poolMu.Lock()
							 | 
						||
| 
								 | 
							
									defer c.poolMu.Unlock()
							 | 
						||
| 
								 | 
							
									topic := pss.ProtocolTopic(spec)
							 | 
						||
| 
								 | 
							
									c.peerPool[topic][pubkeyid].closed = true
							 | 
						||
| 
								 | 
							
									delete(c.peerPool[topic], pubkeyid)
							 | 
						||
| 
								 | 
							
								}
							 |