Merge pull request #1934 from karalabe/polish-protocol-infos

eth, p2p, rpc/api: polish protocol info gathering
This commit is contained in:
Jeffrey Wilcke
2015-11-04 11:59:31 +01:00
11 changed files with 209 additions and 86 deletions

View File

@ -467,62 +467,10 @@ func New(config *Config) (*Ethereum, error) {
return eth, nil
}
type NodeInfo struct {
Name string
NodeUrl string
NodeID string
IP string
DiscPort int // UDP listening port for discovery protocol
TCPPort int // TCP listening port for RLPx
Td string
ListenAddr string
}
func (s *Ethereum) NodeInfo() *NodeInfo {
node := s.net.Self()
return &NodeInfo{
Name: s.Name(),
NodeUrl: node.String(),
NodeID: node.ID.String(),
IP: node.IP.String(),
DiscPort: int(node.UDP),
TCPPort: int(node.TCP),
ListenAddr: s.net.ListenAddr,
Td: s.BlockChain().GetTd(s.BlockChain().CurrentBlock().Hash()).String(),
}
}
type PeerInfo struct {
ID string
Name string
Caps string
RemoteAddress string
LocalAddress string
}
func newPeerInfo(peer *p2p.Peer) *PeerInfo {
var caps []string
for _, cap := range peer.Caps() {
caps = append(caps, cap.String())
}
return &PeerInfo{
ID: peer.ID().String(),
Name: peer.Name(),
Caps: strings.Join(caps, ", "),
RemoteAddress: peer.RemoteAddr().String(),
LocalAddress: peer.LocalAddr().String(),
}
}
// PeersInfo returns an array of PeerInfo objects describing connected peers
func (s *Ethereum) PeersInfo() (peersinfo []*PeerInfo) {
for _, peer := range s.net.Peers() {
if peer != nil {
peersinfo = append(peersinfo, newPeerInfo(peer))
}
}
return
// Network retrieves the underlying P2P network server. This should eventually
// be moved out into a protocol independent package, but for now use an accessor.
func (s *Ethereum) Network() *p2p.Server {
return s.net
}
func (s *Ethereum) ResetWithGenesisBlock(gb *types.Block) {

View File

@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/rlp"
)
@ -55,6 +56,8 @@ type hashFetcherFn func(common.Hash) error
type blockFetcherFn func([]common.Hash) error
type ProtocolManager struct {
networkId int
fastSync bool
txpool txPool
blockchain *core.BlockChain
@ -91,6 +94,7 @@ func NewProtocolManager(fastSync bool, networkId int, mux *event.TypeMux, txpool
}
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkId: networkId,
fastSync: fastSync,
eventMux: mux,
txpool: txpool,
@ -111,14 +115,23 @@ func NewProtocolManager(fastSync bool, networkId int, mux *event.TypeMux, txpool
// Compatible; initialise the sub-protocol
version := version // Closure for the run
manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
Name: "eth",
Name: ProtocolName,
Version: version,
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(int(version), networkId, p, rw)
peer := manager.newPeer(int(version), p, rw)
manager.newPeerCh <- peer
return manager.handle(peer)
},
NodeInfo: func() interface{} {
return manager.NodeInfo()
},
PeerInfo: func(id discover.NodeID) interface{} {
if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
return p.Info()
}
return nil
},
})
}
if len(manager.SubProtocols) == 0 {
@ -188,8 +201,8 @@ func (pm *ProtocolManager) Stop() {
glog.V(logger.Info).Infoln("Ethereum protocol handler stopped")
}
func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
return newPeer(pv, p, newMeteredMsgWriter(rw))
}
// handle is the callback invoked to manage the life cycle of an eth peer. When
@ -199,7 +212,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
// Execute the Ethereum handshake
td, head, genesis := pm.blockchain.Status()
if err := p.Handshake(td, head, genesis); err != nil {
if err := p.Handshake(pm.networkId, td, head, genesis); err != nil {
glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err)
return err
}
@ -730,3 +743,22 @@ func (self *ProtocolManager) txBroadcastLoop() {
self.BroadcastTx(event.Tx.Hash(), event.Tx)
}
}
// EthNodeInfo represents a short summary of the Ethereum sub-protocol metadata known
// about the host peer.
type EthNodeInfo struct {
Network int `json:"network"` // Ethereum network ID (0=Olympic, 1=Frontier, 2=Morden)
Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain
Genesis string `json:"genesis"` // SHA3 hash of the host's genesis block
Head string `json:"head"` // SHA3 hash of the host's best owned block
}
// NodeInfo retrieves some protocol metadata about the running host node.
func (self *ProtocolManager) NodeInfo() *EthNodeInfo {
return &EthNodeInfo{
Network: self.networkId,
Difficulty: self.blockchain.GetTd(self.blockchain.CurrentBlock().Hash()),
Genesis: fmt.Sprintf("%x", self.blockchain.Genesis().Hash()),
Head: fmt.Sprintf("%x", self.blockchain.CurrentBlock().Hash()),
}
}

View File

@ -117,7 +117,7 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te
var id discover.NodeID
rand.Read(id[:])
peer := pm.newPeer(version, NetworkId, p2p.NewPeer(id, name, nil), net)
peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net)
// Start the peer on a new thread
errc := make(chan error, 1)

View File

@ -44,38 +44,51 @@ const (
handshakeTimeout = 5 * time.Second
)
type peer struct {
*p2p.Peer
// PeerInfo represents a short summary of the Ethereum sub-protocol metadata known
// about a connected peer.
type PeerInfo struct {
Version int `json:"version"` // Ethereum protocol version negotiated
Difficulty *big.Int `json:"difficulty"` // Total difficulty of the peer's blockchain
Head string `json:"head"` // SHA3 hash of the peer's best owned block
}
type peer struct {
id string
*p2p.Peer
rw p2p.MsgReadWriter
version int // Protocol version negotiated
network int // Network ID being on
id string
head common.Hash
td *big.Int
lock sync.RWMutex
head common.Hash
td *big.Int
lock sync.RWMutex
knownTxs *set.Set // Set of transaction hashes known to be known by this peer
knownBlocks *set.Set // Set of block hashes known to be known by this peer
}
func newPeer(version, network int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
id := p.ID()
return &peer{
Peer: p,
rw: rw,
version: version,
network: network,
id: fmt.Sprintf("%x", id[:8]),
knownTxs: set.New(),
knownBlocks: set.New(),
}
}
// Info gathers and returns a collection of metadata known about a peer.
func (p *peer) Info() *PeerInfo {
return &PeerInfo{
Version: p.version,
Difficulty: p.Td(),
Head: fmt.Sprintf("%x", p.Head()),
}
}
// Head retrieves a copy of the current head (most recent) hash of the peer.
func (p *peer) Head() (hash common.Hash) {
p.lock.RLock()
@ -268,20 +281,22 @@ func (p *peer) RequestReceipts(hashes []common.Hash) error {
// Handshake executes the eth protocol handshake, negotiating version number,
// network IDs, difficulties, head and genesis blocks.
func (p *peer) Handshake(td *big.Int, head common.Hash, genesis common.Hash) error {
func (p *peer) Handshake(network int, td *big.Int, head common.Hash, genesis common.Hash) error {
// Send out own handshake in a new thread
errc := make(chan error, 2)
var status statusData // safe to read after two values have been received from errc
go func() {
errc <- p2p.Send(p.rw, StatusMsg, &statusData{
ProtocolVersion: uint32(p.version),
NetworkId: uint32(p.network),
NetworkId: uint32(network),
TD: td,
CurrentBlock: head,
GenesisBlock: genesis,
})
}()
go func() {
errc <- p.readStatus(&status, genesis)
errc <- p.readStatus(network, &status, genesis)
}()
timeout := time.NewTimer(handshakeTimeout)
defer timeout.Stop()
@ -299,7 +314,7 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, genesis common.Hash) err
return nil
}
func (p *peer) readStatus(status *statusData, genesis common.Hash) (err error) {
func (p *peer) readStatus(network int, status *statusData, genesis common.Hash) (err error) {
msg, err := p.rw.ReadMsg()
if err != nil {
return err
@ -317,8 +332,8 @@ func (p *peer) readStatus(status *statusData, genesis common.Hash) (err error) {
if status.GenesisBlock != genesis {
return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, genesis)
}
if int(status.NetworkId) != p.network {
return errResp(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, p.network)
if int(status.NetworkId) != network {
return errResp(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, network)
}
if int(status.ProtocolVersion) != p.version {
return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, p.version)

View File

@ -33,6 +33,9 @@ const (
eth63 = 63
)
// Official short name of the protocol used during capability negotiation.
var ProtocolName = "eth"
// Supported versions of the eth protocol (first is primary).
var ProtocolVersions = []uint{eth63, eth62, eth61}

View File

@ -40,8 +40,8 @@ func TestFastSyncDisabling(t *testing.T) {
// Sync up the two peers
io1, io2 := p2p.MsgPipe()
go pmFull.handle(pmFull.newPeer(63, NetworkId, p2p.NewPeer(discover.NodeID{}, "empty", nil), io2))
go pmEmpty.handle(pmEmpty.newPeer(63, NetworkId, p2p.NewPeer(discover.NodeID{}, "full", nil), io1))
go pmFull.handle(pmFull.newPeer(63, p2p.NewPeer(discover.NodeID{}, "empty", nil), io2))
go pmEmpty.handle(pmEmpty.newPeer(63, p2p.NewPeer(discover.NodeID{}, "full", nil), io1))
time.Sleep(250 * time.Millisecond)
pmEmpty.synchronise(pmEmpty.peers.BestPeer())