les: create utilities as common package (#20509)

* les: move execqueue into utilities package

execqueue is a util for executing queued functions
in a serial order which is used by both les server
and les client. Move it to common package.

* les: move randselect to utilities package

weighted_random_selector is a helpful tool for randomly select
items maintained in a set but based on the item weight.

It's used anywhere is LES package, mainly by les client but will
be used in les server with very high chance. So move it into a
common package as the second step for les separation.

* les: rename to utils
This commit is contained in:
gary rong
2020-03-31 23:17:24 +08:00
committed by GitHub
parent 32d31c31af
commit f78ffc0545
7 changed files with 90 additions and 90 deletions

View File

@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
@ -129,7 +130,7 @@ type serverPool struct {
adjustStats chan poolStatAdjust
knownQueue, newQueue poolEntryQueue
knownSelect, newSelect *weightedRandomSelect
knownSelect, newSelect *utils.WeightedRandomSelect
knownSelected, newSelected int
fastDiscover bool
connCh chan *connReq
@ -152,8 +153,8 @@ func newServerPool(db ethdb.Database, ulcServers []string) *serverPool {
disconnCh: make(chan *disconnReq),
registerCh: make(chan *registerReq),
closeCh: make(chan struct{}),
knownSelect: newWeightedRandomSelect(),
newSelect: newWeightedRandomSelect(),
knownSelect: utils.NewWeightedRandomSelect(),
newSelect: utils.NewWeightedRandomSelect(),
fastDiscover: true,
trustedNodes: parseTrustedNodes(ulcServers),
}
@ -402,8 +403,8 @@ func (pool *serverPool) eventLoop() {
entry.lastConnected = addr
entry.addr = make(map[string]*poolEntryAddress)
entry.addr[addr.strKey()] = addr
entry.addrSelect = *newWeightedRandomSelect()
entry.addrSelect.update(addr)
entry.addrSelect = *utils.NewWeightedRandomSelect()
entry.addrSelect.Update(addr)
req.result <- entry
}
@ -459,7 +460,7 @@ func (pool *serverPool) findOrNewNode(node *enode.Node) *poolEntry {
entry = &poolEntry{
node: node,
addr: make(map[string]*poolEntryAddress),
addrSelect: *newWeightedRandomSelect(),
addrSelect: *utils.NewWeightedRandomSelect(),
shortRetry: shortRetryCnt,
}
pool.entries[node.ID()] = entry
@ -477,7 +478,7 @@ func (pool *serverPool) findOrNewNode(node *enode.Node) *poolEntry {
entry.addr[addr.strKey()] = addr
}
addr.lastSeen = now
entry.addrSelect.update(addr)
entry.addrSelect.Update(addr)
if !entry.known {
pool.newQueue.setLatest(entry)
}
@ -505,7 +506,7 @@ func (pool *serverPool) loadNodes() {
pool.entries[e.node.ID()] = e
if pool.trustedNodes[e.node.ID()] == nil {
pool.knownQueue.setLatest(e)
pool.knownSelect.update((*knownEntry)(e))
pool.knownSelect.Update((*knownEntry)(e))
}
}
}
@ -556,8 +557,8 @@ func (pool *serverPool) saveNodes() {
// Note that it is called by the new/known queues from which the entry has already
// been removed so removing it from the queues is not necessary.
func (pool *serverPool) removeEntry(entry *poolEntry) {
pool.newSelect.remove((*discoveredEntry)(entry))
pool.knownSelect.remove((*knownEntry)(entry))
pool.newSelect.Remove((*discoveredEntry)(entry))
pool.knownSelect.Remove((*knownEntry)(entry))
entry.removed = true
delete(pool.entries, entry.node.ID())
}
@ -586,8 +587,8 @@ func (pool *serverPool) setRetryDial(entry *poolEntry) {
// updateCheckDial is called when an entry can potentially be dialed again. It updates
// its selection weights and checks if new dials can/should be made.
func (pool *serverPool) updateCheckDial(entry *poolEntry) {
pool.newSelect.update((*discoveredEntry)(entry))
pool.knownSelect.update((*knownEntry)(entry))
pool.newSelect.Update((*discoveredEntry)(entry))
pool.knownSelect.Update((*knownEntry)(entry))
pool.checkDial()
}
@ -596,7 +597,7 @@ func (pool *serverPool) updateCheckDial(entry *poolEntry) {
func (pool *serverPool) checkDial() {
fillWithKnownSelects := !pool.fastDiscover
for pool.knownSelected < targetKnownSelect {
entry := pool.knownSelect.choose()
entry := pool.knownSelect.Choose()
if entry == nil {
fillWithKnownSelects = false
break
@ -604,7 +605,7 @@ func (pool *serverPool) checkDial() {
pool.dial((*poolEntry)(entry.(*knownEntry)), true)
}
for pool.knownSelected+pool.newSelected < targetServerCount {
entry := pool.newSelect.choose()
entry := pool.newSelect.Choose()
if entry == nil {
break
}
@ -615,7 +616,7 @@ func (pool *serverPool) checkDial() {
// is over, we probably won't find more in the near future so select more
// known entries if possible
for pool.knownSelected < targetServerCount {
entry := pool.knownSelect.choose()
entry := pool.knownSelect.Choose()
if entry == nil {
break
}
@ -636,7 +637,7 @@ func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) {
} else {
pool.newSelected++
}
addr := entry.addrSelect.choose().(*poolEntryAddress)
addr := entry.addrSelect.Choose().(*poolEntryAddress)
log.Debug("Dialing new peer", "lesaddr", entry.node.ID().String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected)
entry.dialed = addr
go func() {
@ -684,7 +685,7 @@ type poolEntry struct {
addr map[string]*poolEntryAddress
node *enode.Node
lastConnected, dialed *poolEntryAddress
addrSelect weightedRandomSelect
addrSelect utils.WeightedRandomSelect
lastDiscovered mclock.AbsTime
known, knownSelected, trusted bool
@ -734,8 +735,8 @@ func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
e.node = enode.NewV4(pubkey, entry.IP, int(entry.Port), int(entry.Port))
e.addr = make(map[string]*poolEntryAddress)
e.addr[addr.strKey()] = addr
e.addrSelect = *newWeightedRandomSelect()
e.addrSelect.update(addr)
e.addrSelect = *utils.NewWeightedRandomSelect()
e.addrSelect.Update(addr)
e.lastConnected = addr
e.connectStats = entry.CStat
e.delayStats = entry.DStat