468 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			468 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package p2p
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"net"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/ethereum/go-ethereum/logger"
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
	outboundAddressPoolSize   = 500
 | 
						|
	defaultDialTimeout        = 10 * time.Second
 | 
						|
	portMappingUpdateInterval = 15 * time.Minute
 | 
						|
	portMappingTimeout        = 20 * time.Minute
 | 
						|
)
 | 
						|
 | 
						|
var srvlog = logger.NewLogger("P2P Server")
 | 
						|
 | 
						|
// Server manages all peer connections.
 | 
						|
//
 | 
						|
// The fields of Server are used as configuration parameters.
 | 
						|
// You should set them before starting the Server. Fields may not be
 | 
						|
// modified while the server is running.
 | 
						|
type Server struct {
 | 
						|
	// This field must be set to a valid client identity.
 | 
						|
	Identity ClientIdentity
 | 
						|
 | 
						|
	// MaxPeers is the maximum number of peers that can be
 | 
						|
	// connected. It must be greater than zero.
 | 
						|
	MaxPeers int
 | 
						|
 | 
						|
	// Protocols should contain the protocols supported
 | 
						|
	// by the server. Matching protocols are launched for
 | 
						|
	// each peer.
 | 
						|
	Protocols []Protocol
 | 
						|
 | 
						|
	// If Blacklist is set to a non-nil value, the given Blacklist
 | 
						|
	// is used to verify peer connections.
 | 
						|
	Blacklist Blacklist
 | 
						|
 | 
						|
	// If ListenAddr is set to a non-nil address, the server
 | 
						|
	// will listen for incoming connections.
 | 
						|
	//
 | 
						|
	// If the port is zero, the operating system will pick a port. The
 | 
						|
	// ListenAddr field will be updated with the actual address when
 | 
						|
	// the server is started.
 | 
						|
	ListenAddr string
 | 
						|
 | 
						|
	// If set to a non-nil value, the given NAT port mapper
 | 
						|
	// is used to make the listening port available to the
 | 
						|
	// Internet.
 | 
						|
	NAT NAT
 | 
						|
 | 
						|
	// If Dialer is set to a non-nil value, the given Dialer
 | 
						|
	// is used to dial outbound peer connections.
 | 
						|
	Dialer *net.Dialer
 | 
						|
 | 
						|
	// If NoDial is true, the server will not dial any peers.
 | 
						|
	NoDial bool
 | 
						|
 | 
						|
	// Hook for testing. This is useful because we can inhibit
 | 
						|
	// the whole protocol stack.
 | 
						|
	newPeerFunc peerFunc
 | 
						|
 | 
						|
	lock      sync.RWMutex
 | 
						|
	running   bool
 | 
						|
	listener  net.Listener
 | 
						|
	laddr     *net.TCPAddr // real listen addr
 | 
						|
	peers     []*Peer
 | 
						|
	peerSlots chan int
 | 
						|
	peerCount int
 | 
						|
 | 
						|
	quit           chan struct{}
 | 
						|
	wg             sync.WaitGroup
 | 
						|
	peerConnect    chan *peerAddr
 | 
						|
	peerDisconnect chan *Peer
 | 
						|
}
 | 
						|
 | 
						|
// NAT is implemented by NAT traversal methods.
 | 
						|
type NAT interface {
 | 
						|
	GetExternalAddress() (net.IP, error)
 | 
						|
	AddPortMapping(protocol string, extport, intport int, name string, lifetime time.Duration) error
 | 
						|
	DeletePortMapping(protocol string, extport, intport int) error
 | 
						|
 | 
						|
	// Should return name of the method.
 | 
						|
	String() string
 | 
						|
}
 | 
						|
 | 
						|
type peerFunc func(srv *Server, c net.Conn, dialAddr *peerAddr) *Peer
 | 
						|
 | 
						|
// Peers returns all connected peers.
 | 
						|
func (srv *Server) Peers() (peers []*Peer) {
 | 
						|
	srv.lock.RLock()
 | 
						|
	defer srv.lock.RUnlock()
 | 
						|
	for _, peer := range srv.peers {
 | 
						|
		if peer != nil {
 | 
						|
			peers = append(peers, peer)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// PeerCount returns the number of connected peers.
 | 
						|
func (srv *Server) PeerCount() int {
 | 
						|
	srv.lock.RLock()
 | 
						|
	defer srv.lock.RUnlock()
 | 
						|
	return srv.peerCount
 | 
						|
}
 | 
						|
 | 
						|
// SuggestPeer injects an address into the outbound address pool.
 | 
						|
func (srv *Server) SuggestPeer(ip net.IP, port int, nodeID []byte) {
 | 
						|
	addr := &peerAddr{ip, uint64(port), nodeID}
 | 
						|
	select {
 | 
						|
	case srv.peerConnect <- addr:
 | 
						|
	default: // don't block
 | 
						|
		srvlog.Warnf("peer suggestion %v ignored", addr)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Broadcast sends an RLP-encoded message to all connected peers.
 | 
						|
// This method is deprecated and will be removed later.
 | 
						|
func (srv *Server) Broadcast(protocol string, code uint64, data ...interface{}) {
 | 
						|
	var payload []byte
 | 
						|
	if data != nil {
 | 
						|
		payload = encodePayload(data...)
 | 
						|
	}
 | 
						|
	srv.lock.RLock()
 | 
						|
	defer srv.lock.RUnlock()
 | 
						|
	for _, peer := range srv.peers {
 | 
						|
		if peer != nil {
 | 
						|
			var msg = Msg{Code: code}
 | 
						|
			if data != nil {
 | 
						|
				msg.Payload = bytes.NewReader(payload)
 | 
						|
				msg.Size = uint32(len(payload))
 | 
						|
			}
 | 
						|
			peer.writeProtoMsg(protocol, msg)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Start starts running the server.
 | 
						|
// Servers can be re-used and started again after stopping.
 | 
						|
func (srv *Server) Start() (err error) {
 | 
						|
	srv.lock.Lock()
 | 
						|
	defer srv.lock.Unlock()
 | 
						|
	if srv.running {
 | 
						|
		return errors.New("server already running")
 | 
						|
	}
 | 
						|
	srvlog.Infoln("Starting Server")
 | 
						|
 | 
						|
	// initialize fields
 | 
						|
	if srv.Identity == nil {
 | 
						|
		return fmt.Errorf("Server.Identity must be set to a non-nil identity")
 | 
						|
	}
 | 
						|
	if srv.MaxPeers <= 0 {
 | 
						|
		return fmt.Errorf("Server.MaxPeers must be > 0")
 | 
						|
	}
 | 
						|
	srv.quit = make(chan struct{})
 | 
						|
	srv.peers = make([]*Peer, srv.MaxPeers)
 | 
						|
	srv.peerSlots = make(chan int, srv.MaxPeers)
 | 
						|
	srv.peerConnect = make(chan *peerAddr, outboundAddressPoolSize)
 | 
						|
	srv.peerDisconnect = make(chan *Peer)
 | 
						|
	if srv.newPeerFunc == nil {
 | 
						|
		srv.newPeerFunc = newServerPeer
 | 
						|
	}
 | 
						|
	if srv.Blacklist == nil {
 | 
						|
		srv.Blacklist = NewBlacklist()
 | 
						|
	}
 | 
						|
	if srv.Dialer == nil {
 | 
						|
		srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
 | 
						|
	}
 | 
						|
 | 
						|
	if srv.ListenAddr != "" {
 | 
						|
		if err := srv.startListening(); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	if !srv.NoDial {
 | 
						|
		srv.wg.Add(1)
 | 
						|
		go srv.dialLoop()
 | 
						|
	}
 | 
						|
	if srv.NoDial && srv.ListenAddr == "" {
 | 
						|
		srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.")
 | 
						|
	}
 | 
						|
 | 
						|
	// make all slots available
 | 
						|
	for i := range srv.peers {
 | 
						|
		srv.peerSlots <- i
 | 
						|
	}
 | 
						|
	// note: discLoop is not part of WaitGroup
 | 
						|
	go srv.discLoop()
 | 
						|
	srv.running = true
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (srv *Server) startListening() error {
 | 
						|
	listener, err := net.Listen("tcp", srv.ListenAddr)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	srv.ListenAddr = listener.Addr().String()
 | 
						|
	srv.laddr = listener.Addr().(*net.TCPAddr)
 | 
						|
	srv.listener = listener
 | 
						|
	srv.wg.Add(1)
 | 
						|
	go srv.listenLoop()
 | 
						|
	if !srv.laddr.IP.IsLoopback() && srv.NAT != nil {
 | 
						|
		srv.wg.Add(1)
 | 
						|
		go srv.natLoop(srv.laddr.Port)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Stop terminates the server and all active peer connections.
 | 
						|
// It blocks until all active connections have been closed.
 | 
						|
func (srv *Server) Stop() {
 | 
						|
	srv.lock.Lock()
 | 
						|
	if !srv.running {
 | 
						|
		srv.lock.Unlock()
 | 
						|
		return
 | 
						|
	}
 | 
						|
	srv.running = false
 | 
						|
	srv.lock.Unlock()
 | 
						|
 | 
						|
	srvlog.Infoln("Stopping server")
 | 
						|
	if srv.listener != nil {
 | 
						|
		// this unblocks listener Accept
 | 
						|
		srv.listener.Close()
 | 
						|
	}
 | 
						|
	close(srv.quit)
 | 
						|
	for _, peer := range srv.Peers() {
 | 
						|
		peer.Disconnect(DiscQuitting)
 | 
						|
	}
 | 
						|
	srv.wg.Wait()
 | 
						|
 | 
						|
	// wait till they actually disconnect
 | 
						|
	// this is checked by claiming all peerSlots.
 | 
						|
	// slots become available as the peers disconnect.
 | 
						|
	for i := 0; i < cap(srv.peerSlots); i++ {
 | 
						|
		<-srv.peerSlots
 | 
						|
	}
 | 
						|
	// terminate discLoop
 | 
						|
	close(srv.peerDisconnect)
 | 
						|
}
 | 
						|
 | 
						|
func (srv *Server) discLoop() {
 | 
						|
	for peer := range srv.peerDisconnect {
 | 
						|
		srv.removePeer(peer)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// main loop for adding connections via listening
 | 
						|
func (srv *Server) listenLoop() {
 | 
						|
	defer srv.wg.Done()
 | 
						|
 | 
						|
	srvlog.Infoln("Listening on", srv.listener.Addr())
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case slot := <-srv.peerSlots:
 | 
						|
			srvlog.Debugf("grabbed slot %v for listening", slot)
 | 
						|
			conn, err := srv.listener.Accept()
 | 
						|
			if err != nil {
 | 
						|
				srv.peerSlots <- slot
 | 
						|
				return
 | 
						|
			}
 | 
						|
			srvlog.Debugf("Accepted conn %v (slot %d)\n", conn.RemoteAddr(), slot)
 | 
						|
			srv.addPeer(conn, nil, slot)
 | 
						|
		case <-srv.quit:
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (srv *Server) natLoop(port int) {
 | 
						|
	defer srv.wg.Done()
 | 
						|
	for {
 | 
						|
		srv.updatePortMapping(port)
 | 
						|
		select {
 | 
						|
		case <-time.After(portMappingUpdateInterval):
 | 
						|
			// one more round
 | 
						|
		case <-srv.quit:
 | 
						|
			srv.removePortMapping(port)
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (srv *Server) updatePortMapping(port int) {
 | 
						|
	srvlog.Infoln("Attempting to map port", port, "with", srv.NAT)
 | 
						|
	err := srv.NAT.AddPortMapping("tcp", port, port, "ethereum p2p", portMappingTimeout)
 | 
						|
	if err != nil {
 | 
						|
		srvlog.Errorln("Port mapping error:", err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	extip, err := srv.NAT.GetExternalAddress()
 | 
						|
	if err != nil {
 | 
						|
		srvlog.Errorln("Error getting external IP:", err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	srv.lock.Lock()
 | 
						|
	extaddr := *(srv.listener.Addr().(*net.TCPAddr))
 | 
						|
	extaddr.IP = extip
 | 
						|
	srvlog.Infoln("Mapped port, external addr is", &extaddr)
 | 
						|
	srv.laddr = &extaddr
 | 
						|
	srv.lock.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
func (srv *Server) removePortMapping(port int) {
 | 
						|
	srvlog.Infoln("Removing port mapping for", port, "with", srv.NAT)
 | 
						|
	srv.NAT.DeletePortMapping("tcp", port, port)
 | 
						|
}
 | 
						|
 | 
						|
func (srv *Server) dialLoop() {
 | 
						|
	defer srv.wg.Done()
 | 
						|
	var (
 | 
						|
		suggest chan *peerAddr
 | 
						|
		slot    *int
 | 
						|
		slots   = srv.peerSlots
 | 
						|
	)
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case i := <-slots:
 | 
						|
			// we need a peer in slot i, slot reserved
 | 
						|
			slot = &i
 | 
						|
			// now we can watch for candidate peers in the next loop
 | 
						|
			suggest = srv.peerConnect
 | 
						|
			// do not consume more until candidate peer is found
 | 
						|
			slots = nil
 | 
						|
 | 
						|
		case desc := <-suggest:
 | 
						|
			// candidate peer found, will dial out asyncronously
 | 
						|
			// if connection fails slot will be released
 | 
						|
			srvlog.DebugDetailf("dial %v (%v)", desc, *slot)
 | 
						|
			go srv.dialPeer(desc, *slot)
 | 
						|
			// we can watch if more peers needed in the next loop
 | 
						|
			slots = srv.peerSlots
 | 
						|
			// until then we dont care about candidate peers
 | 
						|
			suggest = nil
 | 
						|
 | 
						|
		case <-srv.quit:
 | 
						|
			// give back the currently reserved slot
 | 
						|
			if slot != nil {
 | 
						|
				srv.peerSlots <- *slot
 | 
						|
			}
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// connect to peer via dial out
 | 
						|
func (srv *Server) dialPeer(desc *peerAddr, slot int) {
 | 
						|
	srvlog.Debugf("Dialing %v (slot %d)\n", desc, slot)
 | 
						|
	conn, err := srv.Dialer.Dial(desc.Network(), desc.String())
 | 
						|
	if err != nil {
 | 
						|
		srvlog.DebugDetailf("dial error: %v", err)
 | 
						|
		srv.peerSlots <- slot
 | 
						|
		return
 | 
						|
	}
 | 
						|
	go srv.addPeer(conn, desc, slot)
 | 
						|
}
 | 
						|
 | 
						|
// creates the new peer object and inserts it into its slot
 | 
						|
func (srv *Server) addPeer(conn net.Conn, desc *peerAddr, slot int) *Peer {
 | 
						|
	srv.lock.Lock()
 | 
						|
	defer srv.lock.Unlock()
 | 
						|
	if !srv.running {
 | 
						|
		conn.Close()
 | 
						|
		srv.peerSlots <- slot // release slot
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	peer := srv.newPeerFunc(srv, conn, desc)
 | 
						|
	peer.slot = slot
 | 
						|
	srv.peers[slot] = peer
 | 
						|
	srv.peerCount++
 | 
						|
	go func() { peer.loop(); srv.peerDisconnect <- peer }()
 | 
						|
	return peer
 | 
						|
}
 | 
						|
 | 
						|
// removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot
 | 
						|
func (srv *Server) removePeer(peer *Peer) {
 | 
						|
	srv.lock.Lock()
 | 
						|
	defer srv.lock.Unlock()
 | 
						|
	srvlog.Debugf("Removing %v (slot %v)\n", peer, peer.slot)
 | 
						|
	if srv.peers[peer.slot] != peer {
 | 
						|
		srvlog.Warnln("Invalid peer to remove:", peer)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	// remove from list and index
 | 
						|
	srv.peerCount--
 | 
						|
	srv.peers[peer.slot] = nil
 | 
						|
	// release slot to signal need for a new peer, last!
 | 
						|
	srv.peerSlots <- peer.slot
 | 
						|
}
 | 
						|
 | 
						|
func (srv *Server) verifyPeer(addr *peerAddr) error {
 | 
						|
	if srv.Blacklist.Exists(addr.Pubkey) {
 | 
						|
		return errors.New("blacklisted")
 | 
						|
	}
 | 
						|
	if bytes.Equal(srv.Identity.Pubkey()[1:], addr.Pubkey) {
 | 
						|
		return newPeerError(errPubkeyForbidden, "not allowed to connect to srv")
 | 
						|
	}
 | 
						|
	srv.lock.RLock()
 | 
						|
	defer srv.lock.RUnlock()
 | 
						|
	for _, peer := range srv.peers {
 | 
						|
		if peer != nil {
 | 
						|
			id := peer.Identity()
 | 
						|
			if id != nil && bytes.Equal(id.Pubkey(), addr.Pubkey) {
 | 
						|
				return errors.New("already connected")
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// TODO replace with "Set"
 | 
						|
type Blacklist interface {
 | 
						|
	Get([]byte) (bool, error)
 | 
						|
	Put([]byte) error
 | 
						|
	Delete([]byte) error
 | 
						|
	Exists(pubkey []byte) (ok bool)
 | 
						|
}
 | 
						|
 | 
						|
type BlacklistMap struct {
 | 
						|
	blacklist map[string]bool
 | 
						|
	lock      sync.RWMutex
 | 
						|
}
 | 
						|
 | 
						|
func NewBlacklist() *BlacklistMap {
 | 
						|
	return &BlacklistMap{
 | 
						|
		blacklist: make(map[string]bool),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (self *BlacklistMap) Get(pubkey []byte) (bool, error) {
 | 
						|
	self.lock.RLock()
 | 
						|
	defer self.lock.RUnlock()
 | 
						|
	v, ok := self.blacklist[string(pubkey)]
 | 
						|
	var err error
 | 
						|
	if !ok {
 | 
						|
		err = fmt.Errorf("not found")
 | 
						|
	}
 | 
						|
	return v, err
 | 
						|
}
 | 
						|
 | 
						|
func (self *BlacklistMap) Exists(pubkey []byte) (ok bool) {
 | 
						|
	self.lock.RLock()
 | 
						|
	defer self.lock.RUnlock()
 | 
						|
	_, ok = self.blacklist[string(pubkey)]
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func (self *BlacklistMap) Put(pubkey []byte) error {
 | 
						|
	self.lock.RLock()
 | 
						|
	defer self.lock.RUnlock()
 | 
						|
	self.blacklist[string(pubkey)] = true
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (self *BlacklistMap) Delete(pubkey []byte) error {
 | 
						|
	self.lock.RLock()
 | 
						|
	defer self.lock.RUnlock()
 | 
						|
	delete(self.blacklist, string(pubkey))
 | 
						|
	return nil
 | 
						|
}
 |