les: separate peer into clientPeer and serverPeer (#19991)

* les: separate peer into clientPeer and serverPeer

* les: address comments
gary rong
2020-02-26 17:41:24 +08:00
committed by GitHub
parent fadf84a752
commit 4fabd9cbd2
25 changed files with 1296 additions and 1174 deletions
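(Sketch for context: the refactor splits the old bidirectional peer type into a client-facing and a server-facing half. Below is a minimal, hedged illustration of the shape this can take, assuming a shared peerCommons base; the field sets are invented for exposition and are not the actual les types.)

	package main

	import "fmt"

	// peerCommons stands in for whatever state the two halves share; the
	// field set here is invented for exposition, not the actual les types.
	type peerCommons struct {
		id      string
		version int
	}

	// clientPeer models a light client as seen by the server: the server
	// answers its requests and pushes head announcements to it.
	type clientPeer struct {
		peerCommons
		announceType uint64 // how this client wants head announcements
	}

	// serverPeer models an LES server as seen by the client: the client
	// sends retrieval requests to it.
	type serverPeer struct {
		peerCommons
	}

	func main() {
		c := clientPeer{peerCommons: peerCommons{id: "client-1", version: 3}}
		s := serverPeer{peerCommons: peerCommons{id: "server-1", version: 3}}
		fmt.Println(c.id, c.announceType, s.id)
	}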


@@ -101,13 +101,14 @@ func (h *serverHandler) stop() {
// runPeer is the p2p protocol run function for the given version.
func (h *serverHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) error {
-peer := newPeer(int(version), h.server.config.NetworkId, false, p, newMeteredMsgWriter(rw, int(version)))
+peer := newClientPeer(int(version), h.server.config.NetworkId, p, newMeteredMsgWriter(rw, int(version)))
defer peer.close()
h.wg.Add(1)
defer h.wg.Done()
return h.handle(peer)
}
-func (h *serverHandler) handle(p *peer) error {
+func (h *serverHandler) handle(p *clientPeer) error {
p.Log().Debug("Light Ethereum peer connected", "name", p.Name())
// Execute the LES handshake
@@ -139,21 +140,21 @@ func (h *serverHandler) handle(p *clientPeer) error {
return errFullClientPool
}
// Register the peer locally
-if err := h.server.peers.Register(p); err != nil {
+if err := h.server.peers.register(p); err != nil {
h.server.clientPool.disconnect(p)
p.Log().Error("Light Ethereum peer registration failed", "err", err)
return err
}
-clientConnectionGauge.Update(int64(h.server.peers.Len()))
+clientConnectionGauge.Update(int64(h.server.peers.len()))
var wg sync.WaitGroup // Wait group used to track all in-flight task routines.
connectedAt := mclock.Now()
defer func() {
wg.Wait() // Ensure all background task routines have exited.
-h.server.peers.Unregister(p.id)
+h.server.peers.unregister(p.id)
h.server.clientPool.disconnect(p)
-clientConnectionGauge.Update(int64(h.server.peers.Len()))
+clientConnectionGauge.Update(int64(h.server.peers.len()))
connectionTimer.Update(time.Duration(mclock.Now() - connectedAt))
}()
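(For context on the lowercased peer-set accessors used above, a minimal sketch of the register/defer-unregister lifecycle, assuming a hypothetical peerSet rather than the real les implementation:)

	package main

	import (
		"errors"
		"fmt"
		"sync"
	)

	type clientPeer struct{ id string }

	// peerSet is a hypothetical stand-in for the server's peer set; only
	// the register/unregister/len surface from the diff is modelled.
	type peerSet struct {
		mu    sync.Mutex
		peers map[string]*clientPeer
	}

	func (ps *peerSet) register(p *clientPeer) error {
		ps.mu.Lock()
		defer ps.mu.Unlock()
		if _, ok := ps.peers[p.id]; ok {
			return errors.New("peer already registered")
		}
		ps.peers[p.id] = p
		return nil
	}

	func (ps *peerSet) unregister(id string) {
		ps.mu.Lock()
		defer ps.mu.Unlock()
		delete(ps.peers, id)
	}

	func (ps *peerSet) len() int {
		ps.mu.Lock()
		defer ps.mu.Unlock()
		return len(ps.peers)
	}

	func main() {
		ps := &peerSet{peers: make(map[string]*clientPeer)}
		p := &clientPeer{id: "peer-1"}
		if err := ps.register(p); err != nil {
			fmt.Println("registration failed:", err)
			return
		}
		defer ps.unregister(p.id)
		fmt.Println("connected peers:", ps.len())
	}

The deferred wg.Wait() before unregister in the handler above follows the same pattern: in-flight request routines finish before the peer disappears from the set.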
@@ -174,7 +175,7 @@ func (h *serverHandler) handle(p *clientPeer) error {
// handleMsg is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
-func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
+func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
// Read the next message from the remote peer, and ensure it's fully consumed
msg, err := p.rw.ReadMsg()
if err != nil {
@@ -208,7 +209,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
maxCost = p.fcCosts.getMaxCost(msg.Code, reqCnt)
accepted, bufShort, priority := p.fcClient.AcceptRequest(reqID, responseCount, maxCost)
if !accepted {
-p.freezeClient()
+p.freeze()
p.Log().Error("Request came too early", "remaining", common.PrettyDuration(time.Duration(bufShort*1000000/p.fcParams.MinRecharge)))
p.fcClient.OneTimeCost(inSizeCost)
return false
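(The rename freezeClient -> freeze keeps the accept-or-freeze flow-control pattern: a request is served only if the client's buffer covers its worst-case cost, otherwise the peer is frozen. A toy sketch of that decision, assuming a plain counter in place of the real recharging buffer:)

	package main

	import "fmt"

	// fcClient is a toy stand-in for the flow-control client node; a
	// plain counter replaces the real recharging token bucket.
	type fcClient struct {
		bufValue uint64 // remaining request buffer
	}

	// acceptRequest mirrors the shape of the check in handleMsg: accept
	// when the buffer covers the worst-case cost, otherwise report the
	// shortage so the caller can freeze the peer.
	func (c *fcClient) acceptRequest(maxCost uint64) (accepted bool, bufShort uint64) {
		if c.bufValue >= maxCost {
			c.bufValue -= maxCost
			return true, 0
		}
		return false, maxCost - c.bufValue
	}

	func main() {
		c := &fcClient{bufValue: 100}
		if ok, short := c.acceptRequest(150); !ok {
			fmt.Println("request came too early, buffer short by", short)
			// a real handler would call p.freeze() here
		}
	}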
@@ -258,7 +259,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
h.server.clientPool.requestCost(p, realCost)
}
if reply != nil {
-p.queueSend(func() {
+p.mustQueueSend(func() {
if err := reply.send(bv); err != nil {
select {
case p.errCh <- err:
@@ -372,8 +373,8 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
}
first = false
}
-reply := p.ReplyBlockHeaders(req.ReqID, headers)
-sendResponse(req.ReqID, query.Amount, p.ReplyBlockHeaders(req.ReqID, headers), task.done())
+reply := p.replyBlockHeaders(req.ReqID, headers)
+sendResponse(req.ReqID, query.Amount, p.replyBlockHeaders(req.ReqID, headers), task.done())
if metrics.EnabledExpensive {
miscOutHeaderPacketsMeter.Mark(1)
miscOutHeaderTrafficMeter.Mark(int64(reply.size()))
@@ -421,7 +422,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
bodies = append(bodies, body)
bytes += len(body)
}
-reply := p.ReplyBlockBodiesRLP(req.ReqID, bodies)
+reply := p.replyBlockBodiesRLP(req.ReqID, bodies)
sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
if metrics.EnabledExpensive {
miscOutBodyPacketsMeter.Mark(1)
@@ -493,7 +494,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
break
}
}
-reply := p.ReplyCode(req.ReqID, data)
+reply := p.replyCode(req.ReqID, data)
sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
if metrics.EnabledExpensive {
miscOutCodePacketsMeter.Mark(1)
@@ -550,7 +551,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
bytes += len(encoded)
}
}
-reply := p.ReplyReceiptsRLP(req.ReqID, receipts)
+reply := p.replyReceiptsRLP(req.ReqID, receipts)
sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
if metrics.EnabledExpensive {
miscOutReceiptPacketsMeter.Mark(1)
@@ -653,7 +654,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
break
}
}
-reply := p.ReplyProofsV2(req.ReqID, nodes.NodeList())
+reply := p.replyProofsV2(req.ReqID, nodes.NodeList())
sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
if metrics.EnabledExpensive {
miscOutTrieProofPacketsMeter.Mark(1)
@@ -728,7 +729,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
break
}
}
-reply := p.ReplyHelperTrieProofs(req.ReqID, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData})
+reply := p.replyHelperTrieProofs(req.ReqID, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData})
sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
if metrics.EnabledExpensive {
miscOutHelperTriePacketsMeter.Mark(1)
@@ -777,7 +778,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
stats[i] = h.txStatus(hash)
}
}
-reply := p.ReplyTxStatus(req.ReqID, stats)
+reply := p.replyTxStatus(req.ReqID, stats)
sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
if metrics.EnabledExpensive {
miscOutTxsPacketsMeter.Mark(1)
@@ -814,7 +815,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
}
stats[i] = h.txStatus(hash)
}
-reply := p.ReplyTxStatus(req.ReqID, stats)
+reply := p.replyTxStatus(req.ReqID, stats)
sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
if metrics.EnabledExpensive {
miscOutTxStatusPacketsMeter.Mark(1)
@@ -913,7 +914,7 @@ func (h *serverHandler) broadcastHeaders() {
for {
select {
case ev := <-headCh:
-peers := h.server.peers.AllPeers()
+peers := h.server.peers.allPeers()
if len(peers) == 0 {
continue
}
@@ -939,14 +940,18 @@ func (h *serverHandler) broadcastHeaders() {
p := p
switch p.announceType {
case announceTypeSimple:
-p.queueSend(func() { p.SendAnnounce(announce) })
+if !p.queueSend(func() { p.sendAnnounce(announce) }) {
+	log.Debug("Drop announcement because queue is full", "number", number, "hash", hash)
+}
case announceTypeSigned:
if !signed {
signedAnnounce = announce
signedAnnounce.sign(h.server.privateKey)
signed = true
}
-p.queueSend(func() { p.SendAnnounce(signedAnnounce) })
+if !p.queueSend(func() { p.sendAnnounce(signedAnnounce) }) {
+	log.Debug("Drop announcement because queue is full", "number", number, "hash", hash)
+}
}
}
case <-h.closeCh:
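(The broadcast path above now checks queueSend's boolean result and drops an announcement when a peer's queue is full, while reply delivery switched to mustQueueSend. A minimal sketch of a non-blocking queue with that contract, assuming a channel-backed design that is not the actual les send queue:)

	package main

	import (
		"fmt"
		"time"
	)

	// sendQueue is a hypothetical channel-backed queue; the point is the
	// non-blocking contract, not the data structure.
	type sendQueue struct {
		tasks chan func()
	}

	func newSendQueue(depth int) *sendQueue {
		q := &sendQueue{tasks: make(chan func(), depth)}
		go func() {
			for f := range q.tasks {
				f() // run queued callbacks in order
			}
		}()
		return q
	}

	// queueSend enqueues f without blocking and reports success, so a
	// caller like broadcastHeaders can log and drop an announcement for
	// a slow peer instead of stalling the whole broadcast loop.
	func (q *sendQueue) queueSend(f func()) bool {
		select {
		case q.tasks <- f:
			return true
		default:
			return false
		}
	}

	func main() {
		q := newSendQueue(64)
		if !q.queueSend(func() { fmt.Println("announcement sent") }) {
			fmt.Println("drop announcement because queue is full")
		}
		time.Sleep(10 * time.Millisecond) // give the worker time to drain
	}

This matches the split in the diff: announcements are best-effort and may be dropped, whereas replies presumably must be enqueued, hence the separate mustQueueSend name.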