les: switch to new discv5 (#21940)

This PR enables running the new discv5 protocol in both LES client
and server mode. In client mode it mixes discv5 and dnsdisc iterators
(if both are enabled) and filters incoming ENRs for "les" tag and fork ID.
The old p2p/discv5 package and all references to it are removed.

Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
Felföldi Zsolt
2021-01-26 21:41:35 +01:00
committed by GitHub
parent 9c5729311e
commit a72fa88a0d
31 changed files with 113 additions and 6184 deletions

View File

@ -72,6 +72,7 @@ type LightEthereum struct {
netRPCService *ethapi.PublicNetAPI
p2pServer *p2p.Server
p2pConfig *p2p.Config
}
// New creates an instance of the light client.
@ -109,14 +110,11 @@ func New(stack *node.Node, config *eth.Config) (*LightEthereum, error) {
bloomIndexer: eth.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
valueTracker: lpc.NewValueTracker(lespayDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
p2pServer: stack.Server(),
p2pConfig: &stack.Config().P2P,
}
peers.subscribe((*vtSubscription)(leth.valueTracker))
dnsdisc, err := leth.setupDiscovery()
if err != nil {
return nil, err
}
leth.serverPool = newServerPool(lespayDb, []byte("serverpool:"), leth.valueTracker, dnsdisc, time.Second, nil, &mclock.System{}, config.UltraLightServers)
leth.serverPool = newServerPool(lespayDb, []byte("serverpool:"), leth.valueTracker, time.Second, nil, &mclock.System{}, config.UltraLightServers)
peers.subscribe(leth.serverPool)
leth.dialCandidates = leth.serverPool.dialIterator
@ -299,6 +297,11 @@ func (s *LightEthereum) Protocols() []p2p.Protocol {
func (s *LightEthereum) Start() error {
log.Warn("Light client mode is an experimental feature")
discovery, err := s.setupDiscovery(s.p2pConfig)
if err != nil {
return err
}
s.serverPool.addSource(discovery)
s.serverPool.start()
// Start bloom request workers.
s.wg.Add(bloomServiceThreads)

View File

@ -33,7 +33,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
)
@ -42,17 +41,6 @@ func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
}
func lesTopic(genesisHash common.Hash, protocolVersion uint) discv5.Topic {
var name string
switch protocolVersion {
case lpv2:
name = "LES2"
default:
panic(nil)
}
return discv5.Topic(name + "@" + common.Bytes2Hex(genesisHash.Bytes()[0:8]))
}
type chainReader interface {
CurrentHeader() *types.Header
}

View File

@ -17,6 +17,8 @@
package les
import (
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/dnsdisc"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
@ -25,19 +27,46 @@ import (
// lesEntry is the "les" ENR entry. This is set for LES servers only.
type lesEntry struct {
// Ignore additional fields (for forward compatibility).
Rest []rlp.RawValue `rlp:"tail"`
_ []rlp.RawValue `rlp:"tail"`
}
// ENRKey implements enr.Entry.
func (e lesEntry) ENRKey() string {
return "les"
func (lesEntry) ENRKey() string { return "les" }
// ethEntry is the "eth" ENR entry. This is redeclared here to avoid depending on package eth.
type ethEntry struct {
ForkID forkid.ID
_ []rlp.RawValue `rlp:"tail"`
}
func (ethEntry) ENRKey() string { return "eth" }
// setupDiscovery creates the node discovery source for the eth protocol.
func (eth *LightEthereum) setupDiscovery() (enode.Iterator, error) {
if len(eth.config.EthDiscoveryURLs) == 0 {
return nil, nil
func (eth *LightEthereum) setupDiscovery(cfg *p2p.Config) (enode.Iterator, error) {
it := enode.NewFairMix(0)
// Enable DNS discovery.
if len(eth.config.EthDiscoveryURLs) != 0 {
client := dnsdisc.NewClient(dnsdisc.Config{})
dns, err := client.NewIterator(eth.config.EthDiscoveryURLs...)
if err != nil {
return nil, err
}
it.AddSource(dns)
}
client := dnsdisc.NewClient(dnsdisc.Config{})
return client.NewIterator(eth.config.EthDiscoveryURLs...)
// Enable DHT.
if cfg.DiscoveryV5 && eth.p2pServer.DiscV5 != nil {
it.AddSource(eth.p2pServer.DiscV5.RandomNodes())
}
forkFilter := forkid.NewFilter(eth.blockchain)
iterator := enode.Filter(it, func(n *enode.Node) bool { return nodeIsServer(forkFilter, n) })
return iterator, nil
}
// nodeIsServer checks whether n is an LES server node.
func nodeIsServer(forkFilter forkid.Filter, n *enode.Node) bool {
var les lesEntry
var eth ethEntry
return n.Load(&les) == nil && n.Load(&eth) == nil && forkFilter(eth.ForkID) == nil
}

View File

@ -29,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/nodestate"
@ -58,7 +57,6 @@ type LesServer struct {
archiveMode bool // Flag whether the ethereum node runs in archive mode.
handler *serverHandler
broadcaster *broadcaster
lesTopics []discv5.Topic
privateKey *ecdsa.PrivateKey
// Flow control and capacity management
@ -77,11 +75,6 @@ type LesServer struct {
func NewLesServer(node *node.Node, e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
ns := nodestate.NewNodeStateMachine(nil, nil, mclock.System{}, serverSetup)
// Collect les protocol version information supported by local node.
lesTopics := make([]discv5.Topic, len(AdvertiseProtocolVersions))
for i, pv := range AdvertiseProtocolVersions {
lesTopics[i] = lesTopic(e.BlockChain().Genesis().Hash(), pv)
}
// Calculate the number of threads used to service the light client
// requests based on the user-specified value.
threads := config.LightServ * 4 / 100
@ -103,7 +96,6 @@ func NewLesServer(node *node.Node, e *eth.Ethereum, config *eth.Config) (*LesSer
ns: ns,
archiveMode: e.ArchiveMode(),
broadcaster: newBroadcaster(ns),
lesTopics: lesTopics,
fcManager: flowcontrol.NewClientManager(nil, &mclock.System{}),
servingQueue: newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100),
threadsBusy: config.LightServ/100 + 1,
@ -203,19 +195,6 @@ func (s *LesServer) Start() error {
s.wg.Add(1)
go s.capacityManagement()
if s.p2pSrv.DiscV5 != nil {
for _, topic := range s.lesTopics {
topic := topic
go func() {
logger := log.New("topic", topic)
logger.Info("Starting topic registration")
defer logger.Info("Terminated topic registration")
s.p2pSrv.DiscV5.RegisterTopic(topic, s.closeCh)
}()
}
}
return nil
}

View File

@ -131,7 +131,7 @@ var (
)
// newServerPool creates a new server pool
func newServerPool(db ethdb.KeyValueStore, dbKey []byte, vt *lpc.ValueTracker, discovery enode.Iterator, mixTimeout time.Duration, query queryFunc, clock mclock.Clock, trustedURLs []string) *serverPool {
func newServerPool(db ethdb.KeyValueStore, dbKey []byte, vt *lpc.ValueTracker, mixTimeout time.Duration, query queryFunc, clock mclock.Clock, trustedURLs []string) *serverPool {
s := &serverPool{
db: db,
clock: clock,
@ -147,9 +147,6 @@ func newServerPool(db ethdb.KeyValueStore, dbKey []byte, vt *lpc.ValueTracker, d
alwaysConnect := lpc.NewQueueIterator(s.ns, sfAlwaysConnect, sfDisableSelection, true, nil)
s.mixSources = append(s.mixSources, knownSelector)
s.mixSources = append(s.mixSources, alwaysConnect)
if discovery != nil {
s.mixSources = append(s.mixSources, discovery)
}
iter := enode.Iterator(s.mixer)
if query != nil {
@ -175,6 +172,13 @@ func newServerPool(db ethdb.KeyValueStore, dbKey []byte, vt *lpc.ValueTracker, d
return s
}
// addSource adds a node discovery source to the server pool (should be called before start)
func (s *serverPool) addSource(source enode.Iterator) {
if source != nil {
s.mixSources = append(s.mixSources, source)
}
}
// addPreNegFilter installs a node filter mechanism that performs a pre-negotiation query.
// Nodes that are filtered out and does not appear on the output iterator are put back
// into redialWait state.

View File

@ -145,7 +145,8 @@ func (s *serverPoolTest) start() {
}
s.vt = lpc.NewValueTracker(s.db, s.clock, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000))
s.sp = newServerPool(s.db, []byte("serverpool:"), s.vt, s.input, 0, testQuery, s.clock, s.trusted)
s.sp = newServerPool(s.db, []byte("serverpool:"), s.vt, 0, testQuery, s.clock, s.trusted)
s.sp.addSource(s.input)
s.sp.validSchemes = enode.ValidSchemesForTesting
s.sp.unixTime = func() int64 { return int64(s.clock.Now()) / int64(time.Second) }
s.disconnect = make(map[int][]int)