eth: update metrics collection to handle eth/62 algos

This commit is contained in:
Péter Szilágyi
2015-08-25 13:57:49 +03:00
parent 47a7fe5d22
commit 17f65cd1e5
7 changed files with 160 additions and 36 deletions

View File

@ -347,18 +347,19 @@ func (f *Fetcher) loop() {
case notification := <-f.notify:
// A block was announced, make sure the peer isn't DOSing us
announceMeter.Mark(1)
propAnnounceInMeter.Mark(1)
count := f.announces[notification.origin] + 1
if count > hashLimit {
glog.V(logger.Debug).Infof("Peer %s: exceeded outstanding announces (%d)", notification.origin, hashLimit)
propAnnounceDOSMeter.Mark(1)
break
}
// If we have a valid block number, check that it's potentially useful
if notification.number > 0 {
if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
glog.V(logger.Debug).Infof("[eth/62] Peer %s: discarded announcement #%d [%x…], distance %d", notification.origin, notification.number, notification.hash[:4], dist)
discardMeter.Mark(1)
propAnnounceDropMeter.Mark(1)
break
}
}
@ -377,7 +378,7 @@ func (f *Fetcher) loop() {
case op := <-f.inject:
// A direct block insertion was requested, try and fill any pending gaps
broadcastMeter.Mark(1)
propBroadcastInMeter.Mark(1)
f.enqueue(op.origin, op.block)
case hash := <-f.done:
@ -425,10 +426,12 @@ func (f *Fetcher) loop() {
}
if fetchBlocks != nil {
// Use old eth/61 protocol to retrieve whole blocks
blockFetchMeter.Mark(int64(len(hashes)))
fetchBlocks(hashes)
} else {
// Use new eth/62 protocol to retrieve headers first
for _, hash := range hashes {
headerFetchMeter.Mark(1)
fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
}
}
@ -467,6 +470,7 @@ func (f *Fetcher) loop() {
if f.completingHook != nil {
f.completingHook(hashes)
}
bodyFetchMeter.Mark(int64(len(hashes)))
go f.completing[hashes[0]].fetchBodies(hashes)
}
// Schedule the next fetch if blocks are still pending
@ -480,6 +484,7 @@ func (f *Fetcher) loop() {
case <-f.quit:
return
}
blockFilterInMeter.Mark(int64(len(blocks)))
explicit, download := []*types.Block{}, []*types.Block{}
for _, block := range blocks {
@ -498,6 +503,7 @@ func (f *Fetcher) loop() {
}
}
blockFilterOutMeter.Mark(int64(len(download)))
select {
case filter <- download:
case <-f.quit:
@ -520,6 +526,8 @@ func (f *Fetcher) loop() {
case <-f.quit:
return
}
headerFilterInMeter.Mark(int64(len(task.headers)))
// Split the batch of headers into unknown ones (to return to the caller),
// known incomplete ones (requiring body retrievals) and completed blocks.
unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
@ -544,7 +552,10 @@ func (f *Fetcher) loop() {
if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
glog.V(logger.Detail).Infof("[eth/62] Peer %s: block #%d [%x…] empty, skipping body retrieval", announce.origin, header.Number.Uint64(), header.Hash().Bytes()[:4])
complete = append(complete, types.NewBlockWithHeader(header))
block := types.NewBlockWithHeader(header)
block.ReceivedAt = task.time
complete = append(complete, block)
f.completing[hash] = announce
continue
}
@ -559,6 +570,7 @@ func (f *Fetcher) loop() {
unknown = append(unknown, header)
}
}
headerFilterOutMeter.Mark(int64(len(unknown)))
select {
case filter <- &headerFilterTask{headers: unknown, time: task.time}:
case <-f.quit:
@ -590,6 +602,7 @@ func (f *Fetcher) loop() {
case <-f.quit:
return
}
bodyFilterInMeter.Mark(int64(len(task.transactions)))
blocks := []*types.Block{}
for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
@ -606,7 +619,10 @@ func (f *Fetcher) loop() {
matched = true
if f.getBlock(hash) == nil {
blocks = append(blocks, types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i]))
block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
block.ReceivedAt = task.time
blocks = append(blocks, block)
} else {
f.forgetHash(hash)
}
@ -621,6 +637,7 @@ func (f *Fetcher) loop() {
}
}
bodyFilterOutMeter.Mark(int64(len(task.transactions)))
select {
case filter <- task:
case <-f.quit:
@ -677,13 +694,14 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
count := f.queues[peer] + 1
if count > blockLimit {
glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], exceeded allowance (%d)", peer, block.NumberU64(), hash.Bytes()[:4], blockLimit)
propBroadcastDOSMeter.Mark(1)
f.forgetHash(hash)
return
}
// Discard any past or too distant blocks
if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], distance %d", peer, block.NumberU64(), hash.Bytes()[:4], dist)
discardMeter.Mark(1)
propBroadcastDropMeter.Mark(1)
f.forgetHash(hash)
return
}
@ -724,11 +742,10 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
switch err := f.validateBlock(block, parent); err {
case nil:
// All ok, quickly propagate to our peers
broadcastTimer.UpdateSince(block.ReceivedAt)
propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
go f.broadcastBlock(block, true)
case core.BlockFutureErr:
futureMeter.Mark(1)
// Weird future block, don't fail, but neither propagate
default:
@ -743,7 +760,7 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
return
}
// If import succeeded, broadcast the block
announceTimer.UpdateSince(block.ReceivedAt)
propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
go f.broadcastBlock(block, false)
// Invoke the testing hook if needed