swarm: bootnode-mode, new bootnodes and no p2p package discovery (#18498)
(cherry picked from commit bbd120354a)
			
			
This commit is contained in:
		
				
					committed by
					
						
						Rafael Matias
					
				
			
			
				
	
			
			
			
						parent
						
							878aa58ec6
						
					
				
				
					commit
					4976fcc91a
				
			@@ -66,6 +66,7 @@ type Config struct {
 | 
			
		||||
	DeliverySkipCheck    bool
 | 
			
		||||
	MaxStreamPeerServers int
 | 
			
		||||
	LightNodeEnabled     bool
 | 
			
		||||
	BootnodeMode         bool
 | 
			
		||||
	SyncUpdateDelay      time.Duration
 | 
			
		||||
	SwapAPI              string
 | 
			
		||||
	Cors                 string
 | 
			
		||||
 
 | 
			
		||||
@@ -279,7 +279,7 @@ func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, c
 | 
			
		||||
	return suggestedPeer, 0, false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// On inserts  the peer as a kademlia peer into the live peers
 | 
			
		||||
// On inserts the peer as a kademlia peer into the live peers
 | 
			
		||||
func (k *Kademlia) On(p *Peer) (uint8, bool) {
 | 
			
		||||
	k.lock.Lock()
 | 
			
		||||
	defer k.lock.Unlock()
 | 
			
		||||
 
 | 
			
		||||
@@ -67,6 +67,7 @@ type BzzConfig struct {
 | 
			
		||||
	HiveParams   *HiveParams
 | 
			
		||||
	NetworkID    uint64
 | 
			
		||||
	LightNode    bool
 | 
			
		||||
	BootnodeMode bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Bzz is the swarm protocol bundle
 | 
			
		||||
@@ -87,7 +88,7 @@ type Bzz struct {
 | 
			
		||||
// * overlay driver
 | 
			
		||||
// * peer store
 | 
			
		||||
func NewBzz(config *BzzConfig, kad *Kademlia, store state.Store, streamerSpec *protocols.Spec, streamerRun func(*BzzPeer) error) *Bzz {
 | 
			
		||||
	return &Bzz{
 | 
			
		||||
	bzz := &Bzz{
 | 
			
		||||
		Hive:         NewHive(config.HiveParams, kad, store),
 | 
			
		||||
		NetworkID:    config.NetworkID,
 | 
			
		||||
		LightNode:    config.LightNode,
 | 
			
		||||
@@ -96,6 +97,13 @@ func NewBzz(config *BzzConfig, kad *Kademlia, store state.Store, streamerSpec *p
 | 
			
		||||
		streamerRun:  streamerRun,
 | 
			
		||||
		streamerSpec: streamerSpec,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if config.BootnodeMode {
 | 
			
		||||
		bzz.streamerRun = nil
 | 
			
		||||
		bzz.streamerSpec = nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return bzz
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UpdateLocalAddr updates underlayaddress of the running node
 | 
			
		||||
 
 | 
			
		||||
@@ -255,8 +255,15 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
 | 
			
		||||
				return true
 | 
			
		||||
			}
 | 
			
		||||
			sp = d.getPeer(id)
 | 
			
		||||
			// sp is nil, when we encounter a peer that is not registered for delivery, i.e. doesn't support the `stream` protocol
 | 
			
		||||
			if sp == nil {
 | 
			
		||||
				//log.Warn("Delivery.RequestFromPeers: peer not found", "id", id)
 | 
			
		||||
				return true
 | 
			
		||||
			}
 | 
			
		||||
			// nodes that do not provide stream protocol
 | 
			
		||||
			// should not be requested, e.g. bootnodes
 | 
			
		||||
			if !p.HasCap("stream") {
 | 
			
		||||
				// TODO: if we have no errors, delete this if
 | 
			
		||||
				log.Error("Delivery.RequestFromPeers: peer doesn't have stream cap. we should have returned at sp == nil")
 | 
			
		||||
				return true
 | 
			
		||||
			}
 | 
			
		||||
			spID = &id
 | 
			
		||||
 
 | 
			
		||||
@@ -285,7 +285,7 @@ func TestRequestFromPeers(t *testing.T) {
 | 
			
		||||
	addr := network.RandomAddr()
 | 
			
		||||
	to := network.NewKademlia(addr.OAddr, network.NewKadParams())
 | 
			
		||||
	delivery := NewDelivery(to, nil)
 | 
			
		||||
	protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil)
 | 
			
		||||
	protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", []p2p.Cap{{Name: "stream"}}), nil, nil)
 | 
			
		||||
	peer := network.NewPeer(&network.BzzPeer{
 | 
			
		||||
		BzzAddr:   network.RandomAddr(),
 | 
			
		||||
		LightNode: false,
 | 
			
		||||
 
 | 
			
		||||
@@ -516,6 +516,11 @@ func (r *Registry) requestPeerSubscriptions(kad *network.Kademlia, subs map[enod
 | 
			
		||||
	// nil as base takes the node's base; we need to pass 255 as `EachConn` runs
 | 
			
		||||
	// from deepest bins backwards
 | 
			
		||||
	kad.EachConn(nil, 255, func(p *network.Peer, po int) bool {
 | 
			
		||||
		// nodes that do not provide stream protocol
 | 
			
		||||
		// should not be subscribed, e.g. bootnodes
 | 
			
		||||
		if !p.HasCap("stream") {
 | 
			
		||||
			return true
 | 
			
		||||
		}
 | 
			
		||||
		//if the peer's bin is shallower than the kademlia depth,
 | 
			
		||||
		//only the peer's bin should be subscribed
 | 
			
		||||
		if po < kadDepth {
 | 
			
		||||
 
 | 
			
		||||
@@ -201,8 +201,6 @@ func (pc *PyramidChunker) decrementWorkerCount() {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(context.Context) error, err error) {
 | 
			
		||||
	log.Debug("pyramid.chunker: Split()")
 | 
			
		||||
 | 
			
		||||
	pc.wg.Add(1)
 | 
			
		||||
	pc.prepareChunks(ctx, false)
 | 
			
		||||
 | 
			
		||||
@@ -235,7 +233,6 @@ func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(conte
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pc *PyramidChunker) Append(ctx context.Context) (k Address, wait func(context.Context) error, err error) {
 | 
			
		||||
	log.Debug("pyramid.chunker: Append()")
 | 
			
		||||
	// Load the right most unfinished tree chunks in every level
 | 
			
		||||
	pc.loadTree(ctx)
 | 
			
		||||
 | 
			
		||||
@@ -283,8 +280,6 @@ func (pc *PyramidChunker) processor(ctx context.Context, id int64) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pc *PyramidChunker) processChunk(ctx context.Context, id int64, job *chunkJob) {
 | 
			
		||||
	log.Debug("pyramid.chunker: processChunk()", "id", id)
 | 
			
		||||
 | 
			
		||||
	ref, err := pc.putter.Put(ctx, job.chunk)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		select {
 | 
			
		||||
@@ -301,7 +296,6 @@ func (pc *PyramidChunker) processChunk(ctx context.Context, id int64, job *chunk
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pc *PyramidChunker) loadTree(ctx context.Context) error {
 | 
			
		||||
	log.Debug("pyramid.chunker: loadTree()")
 | 
			
		||||
	// Get the root chunk to get the total size
 | 
			
		||||
	chunkData, err := pc.getter.Get(ctx, Reference(pc.key))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -386,7 +380,6 @@ func (pc *PyramidChunker) loadTree(ctx context.Context) error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pc *PyramidChunker) prepareChunks(ctx context.Context, isAppend bool) {
 | 
			
		||||
	log.Debug("pyramid.chunker: prepareChunks", "isAppend", isAppend)
 | 
			
		||||
	defer pc.wg.Done()
 | 
			
		||||
 | 
			
		||||
	chunkWG := &sync.WaitGroup{}
 | 
			
		||||
 
 | 
			
		||||
@@ -84,12 +84,11 @@ type Swarm struct {
 | 
			
		||||
	tracerClose io.Closer
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// creates a new swarm service instance
 | 
			
		||||
// NewSwarm creates a new swarm service instance
 | 
			
		||||
// implements node.Service
 | 
			
		||||
// If mockStore is not nil, it will be used as the storage for chunk data.
 | 
			
		||||
// MockStore should be used only for testing.
 | 
			
		||||
func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err error) {
 | 
			
		||||
 | 
			
		||||
	if bytes.Equal(common.FromHex(config.PublicKey), storage.ZeroAddr) {
 | 
			
		||||
		return nil, fmt.Errorf("empty public key")
 | 
			
		||||
	}
 | 
			
		||||
@@ -116,10 +115,11 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
 | 
			
		||||
	config.HiveParams.Discovery = true
 | 
			
		||||
 | 
			
		||||
	bzzconfig := &network.BzzConfig{
 | 
			
		||||
		NetworkID:   config.NetworkID,
 | 
			
		||||
		OverlayAddr: common.FromHex(config.BzzKey),
 | 
			
		||||
		HiveParams:  config.HiveParams,
 | 
			
		||||
		LightNode:   config.LightNodeEnabled,
 | 
			
		||||
		NetworkID:    config.NetworkID,
 | 
			
		||||
		OverlayAddr:  common.FromHex(config.BzzKey),
 | 
			
		||||
		HiveParams:   config.HiveParams,
 | 
			
		||||
		LightNode:    config.LightNodeEnabled,
 | 
			
		||||
		BootnodeMode: config.BootnodeMode,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	self.stateStore, err = state.NewDBStore(filepath.Join(config.Path, "state-store.db"))
 | 
			
		||||
@@ -455,12 +455,16 @@ func (self *Swarm) Stop() error {
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// implements the node.Service interface
 | 
			
		||||
func (self *Swarm) Protocols() (protos []p2p.Protocol) {
 | 
			
		||||
	protos = append(protos, self.bzz.Protocols()...)
 | 
			
		||||
// Protocols implements the node.Service interface
 | 
			
		||||
func (s *Swarm) Protocols() (protos []p2p.Protocol) {
 | 
			
		||||
	if s.config.BootnodeMode {
 | 
			
		||||
		protos = append(protos, s.bzz.Protocols()...)
 | 
			
		||||
	} else {
 | 
			
		||||
		protos = append(protos, s.bzz.Protocols()...)
 | 
			
		||||
 | 
			
		||||
	if self.ps != nil {
 | 
			
		||||
		protos = append(protos, self.ps.Protocols()...)
 | 
			
		||||
		if s.ps != nil {
 | 
			
		||||
			protos = append(protos, s.ps.Protocols()...)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user