les: implement ultralight client (#16904)

For more information about this light client mode, read
https://hackmd.io/s/HJy7jjZpm
This commit is contained in:
b00ris
2019-01-24 14:18:26 +03:00
committed by Felix Lange
parent b8f9b3779f
commit 769657060e
23 changed files with 1288 additions and 179 deletions

View File

@ -126,22 +126,22 @@ type serverPool struct {
discNodes chan *enode.Node
discLookups chan bool
trustedNodes map[enode.ID]*enode.Node
entries map[enode.ID]*poolEntry
timeout, enableRetry chan *poolEntry
adjustStats chan poolStatAdjust
connCh chan *connReq
disconnCh chan *disconnReq
registerCh chan *registerReq
knownQueue, newQueue poolEntryQueue
knownSelect, newSelect *weightedRandomSelect
knownSelected, newSelected int
fastDiscover bool
connCh chan *connReq
disconnCh chan *disconnReq
registerCh chan *registerReq
}
// newServerPool creates a new serverPool instance
func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *serverPool {
func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup, trustedNodes []string) *serverPool {
pool := &serverPool{
db: db,
quit: quit,
@ -156,7 +156,9 @@ func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *s
knownSelect: newWeightedRandomSelect(),
newSelect: newWeightedRandomSelect(),
fastDiscover: true,
trustedNodes: parseTrustedNodes(trustedNodes),
}
pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry)
pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry)
return pool
@ -168,6 +170,7 @@ func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) {
pool.dbKey = append([]byte("serverPool/"), []byte(topic)...)
pool.wg.Add(1)
pool.loadNodes()
pool.connectToTrustedNodes()
if pool.server.DiscV5 != nil {
pool.discSetPeriod = make(chan time.Duration, 1)
@ -337,8 +340,10 @@ func (pool *serverPool) eventLoop() {
}
case node := <-pool.discNodes:
entry := pool.findOrNewNode(node)
pool.updateCheckDial(entry)
if pool.trustedNodes[node.ID()] == nil {
entry := pool.findOrNewNode(node)
pool.updateCheckDial(entry)
}
case conv := <-pool.discLookups:
if conv {
@ -355,29 +360,34 @@ func (pool *serverPool) eventLoop() {
}
case req := <-pool.connCh:
// Handle peer connection requests.
entry := pool.entries[req.p.ID()]
if entry == nil {
entry = pool.findOrNewNode(req.node)
}
if entry.state == psConnected || entry.state == psRegistered {
if pool.trustedNodes[req.p.ID()] != nil {
// ignore trusted nodes
req.result <- nil
continue
} else {
// Handle peer connection requests.
entry := pool.entries[req.p.ID()]
if entry == nil {
entry = pool.findOrNewNode(req.node)
}
if entry.state == psConnected || entry.state == psRegistered {
req.result <- nil
continue
}
pool.connWg.Add(1)
entry.peer = req.p
entry.state = psConnected
addr := &poolEntryAddress{
ip: req.node.IP(),
port: uint16(req.node.TCP()),
lastSeen: mclock.Now(),
}
entry.lastConnected = addr
entry.addr = make(map[string]*poolEntryAddress)
entry.addr[addr.strKey()] = addr
entry.addrSelect = *newWeightedRandomSelect()
entry.addrSelect.update(addr)
req.result <- entry
}
pool.connWg.Add(1)
entry.peer = req.p
entry.state = psConnected
addr := &poolEntryAddress{
ip: req.node.IP(),
port: uint16(req.node.TCP()),
lastSeen: mclock.Now(),
}
entry.lastConnected = addr
entry.addr = make(map[string]*poolEntryAddress)
entry.addr[addr.strKey()] = addr
entry.addrSelect = *newWeightedRandomSelect()
entry.addrSelect.update(addr)
req.result <- entry
case req := <-pool.registerCh:
// Handle peer registration requests.
@ -470,11 +480,42 @@ func (pool *serverPool) loadNodes() {
"response", fmt.Sprintf("%v/%v", time.Duration(e.responseStats.avg), e.responseStats.weight),
"timeout", fmt.Sprintf("%v/%v", e.timeoutStats.avg, e.timeoutStats.weight))
pool.entries[e.node.ID()] = e
pool.knownQueue.setLatest(e)
pool.knownSelect.update((*knownEntry)(e))
if pool.trustedNodes[e.node.ID()] == nil {
pool.knownQueue.setLatest(e)
pool.knownSelect.update((*knownEntry)(e))
}
}
}
// connectToTrustedNodes adds trusted server nodes as static trusted peers.
//
// Note: trusted nodes are not handled by the server pool logic, they are not
// added to either the known or new selection pools. They are connected/reconnected
// by p2p.Server whenever possible.
func (pool *serverPool) connectToTrustedNodes() {
//connect to trusted nodes
for _, node := range pool.trustedNodes {
pool.server.AddTrustedPeer(node)
pool.server.AddPeer(node)
log.Debug("Added trusted node", "id", node.ID().String())
}
}
// parseTrustedNodes returns valid and parsed enodes
func parseTrustedNodes(trustedNodes []string) map[enode.ID]*enode.Node {
nodes := make(map[enode.ID]*enode.Node)
for _, node := range trustedNodes {
node, err := enode.ParseV4(node)
if err != nil {
log.Warn("Trusted node URL invalid", "enode", node, "err", err)
continue
}
nodes[node.ID()] = node
}
return nodes
}
// saveNodes saves known nodes and their statistics into the database. Nodes are
// ordered from least to most recently connected.
func (pool *serverPool) saveNodes() {