swarm: bootnode-mode, new bootnodes and no p2p package discovery (#18498)
commit bbd120354a (parent ecb781297b)
@@ -66,6 +66,7 @@ type Config struct {
 	DeliverySkipCheck    bool
 	MaxStreamPeerServers int
 	LightNodeEnabled     bool
+	BootnodeMode         bool
 	SyncUpdateDelay      time.Duration
 	SwapAPI              string
 	Cors                 string
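The new BootnodeMode flag is an ordinary config toggle that is plumbed through unchanged into the network layer (see the BzzConfig and swarm.go hunks below). A minimal sketch of that plumbing, using pared-down stand-in structs rather than the real api.Config and network.BzzConfig:

package main

import "fmt"

// Pared-down stand-ins for api.Config and network.BzzConfig;
// only the fields relevant to this diff are kept.
type config struct {
	LightNodeEnabled bool
	BootnodeMode     bool // the field added in this hunk
}

type bzzConfig struct {
	LightNode    bool
	BootnodeMode bool
}

func main() {
	c := config{BootnodeMode: true}
	// NewSwarm copies the flag straight into the bzz config,
	// mirroring the swarm.go hunk further down.
	bc := bzzConfig{
		LightNode:    c.LightNodeEnabled,
		BootnodeMode: c.BootnodeMode,
	}
	fmt.Printf("bootnode mode: %v\n", bc.BootnodeMode)
}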
@@ -279,7 +279,7 @@ func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, c
 	return suggestedPeer, 0, false
 }
 
 // On inserts the peer as a kademlia peer into the live peers
 func (k *Kademlia) On(p *Peer) (uint8, bool) {
 	k.lock.Lock()
 	defer k.lock.Unlock()
@@ -67,6 +67,7 @@ type BzzConfig struct {
 	HiveParams   *HiveParams
 	NetworkID    uint64
 	LightNode    bool
+	BootnodeMode bool
 }
 
 // Bzz is the swarm protocol bundle
@@ -87,7 +88,7 @@ type Bzz struct {
 // * overlay driver
 // * peer store
 func NewBzz(config *BzzConfig, kad *Kademlia, store state.Store, streamerSpec *protocols.Spec, streamerRun func(*BzzPeer) error) *Bzz {
-	return &Bzz{
+	bzz := &Bzz{
 		Hive:      NewHive(config.HiveParams, kad, store),
 		NetworkID: config.NetworkID,
 		LightNode: config.LightNode,
@@ -96,6 +97,13 @@ func NewBzz(config *BzzConfig, kad *Kademlia, store state.Store, streamerSpec *p
 		streamerRun:  streamerRun,
 		streamerSpec: streamerSpec,
 	}
+
+	if config.BootnodeMode {
+		bzz.streamerRun = nil
+		bzz.streamerSpec = nil
+	}
+
+	return bzz
 }
 
 // UpdateLocalAddr updates underlayaddress of the running node
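Nil-ing streamerSpec and streamerRun is what actually strips the `stream` protocol from a bootnode: the bzz bundle only advertises a protocol it has a spec and run function for. A self-contained sketch of that gating pattern with stand-in types; the protocol-list construction here is illustrative, not the real Bzz.Protocols wiring:

package main

import "fmt"

// Illustrative stand-ins for protocols.Spec and the streamer run function.
type spec struct{ Name string }
type runFunc func() error

type bzz struct {
	streamerSpec *spec
	streamerRun  runFunc
}

// protocols mirrors the effect of the diff: with a nil spec/run pair,
// no `stream` protocol is advertised, so the node only speaks hive.
func (b *bzz) protocols() []string {
	ps := []string{"hive"}
	if b.streamerSpec != nil && b.streamerRun != nil {
		ps = append(ps, b.streamerSpec.Name)
	}
	return ps
}

func main() {
	noop := func() error { return nil }
	full := &bzz{streamerSpec: &spec{Name: "stream"}, streamerRun: noop}
	boot := &bzz{streamerSpec: &spec{Name: "stream"}, streamerRun: noop}
	// BootnodeMode: drop the streamer, exactly as NewBzz does above.
	boot.streamerSpec, boot.streamerRun = nil, nil

	fmt.Println("full node:", full.protocols()) // full node: [hive stream]
	fmt.Println("bootnode:", boot.protocols())  // bootnode: [hive]
}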
@@ -255,8 +255,15 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
 			return true
 		}
 		sp = d.getPeer(id)
+		// sp is nil, when we encounter a peer that is not registered for delivery, i.e. doesn't support the `stream` protocol
 		if sp == nil {
-			log.Warn("Delivery.RequestFromPeers: peer not found", "id", id)
+			//log.Warn("Delivery.RequestFromPeers: peer not found", "id", id)
 			return true
 		}
+		// nodes that do not provide stream protocol
+		// should not be requested, e.g. bootnodes
+		if !p.HasCap("stream") {
+			// TODO: if we have no errors, delete this if
+			log.Error("Delivery.RequestFromPeers: peer doesn't have stream cap. we should have returned at sp == nil")
+			return true
+		}
 		spID = &id
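HasCap checks the capability list the remote peer advertised during the devp2p handshake; a bootnode built with the NewBzz change above never advertises stream, so it is skipped here before any delivery request goes out. A minimal sketch, assuming HasCap is a plain scan over advertised name/version pairs (the cap type below is a stand-in for p2p.Cap):

package main

import "fmt"

// cap is a stand-in for p2p.Cap: a protocol name/version pair
// advertised by a peer at handshake time.
type cap struct {
	Name    string
	Version uint
}

// hasCap mirrors the guard in the diff: report whether the peer
// advertised a capability with the given name.
func hasCap(caps []cap, name string) bool {
	for _, c := range caps {
		if c.Name == name {
			return true
		}
	}
	return false
}

func main() {
	bootnode := []cap{{Name: "hive", Version: 1}}
	fullNode := []cap{{Name: "hive", Version: 1}, {Name: "stream", Version: 1}}

	fmt.Println(hasCap(bootnode, "stream")) // false: skipped for delivery requests
	fmt.Println(hasCap(fullNode, "stream")) // true: eligible for delivery requests
}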
@@ -285,7 +285,7 @@ func TestRequestFromPeers(t *testing.T) {
 	addr := network.RandomAddr()
 	to := network.NewKademlia(addr.OAddr, network.NewKadParams())
 	delivery := NewDelivery(to, nil)
-	protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil)
+	protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", []p2p.Cap{{Name: "stream"}}), nil, nil)
 	peer := network.NewPeer(&network.BzzPeer{
 		BzzAddr:   network.RandomAddr(),
 		LightNode: false,
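The test fixture change follows directly from the new guard: the third argument of p2p.NewPeer is the peer's advertised capability list, so giving the dummy peer a stream cap keeps it eligible under the HasCap checks; with nil caps the test peer would now be filtered out and TestRequestFromPeers would fail.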
@@ -516,6 +516,11 @@ func (r *Registry) requestPeerSubscriptions(kad *network.Kademlia, subs map[enod
 	// nil as base takes the node's base; we need to pass 255 as `EachConn` runs
 	// from deepest bins backwards
 	kad.EachConn(nil, 255, func(p *network.Peer, po int) bool {
+		// nodes that do not provide stream protocol
+		// should not be subscribed, e.g. bootnodes
+		if !p.HasCap("stream") {
+			return true
+		}
 		//if the peer's bin is shallower than the kademlia depth,
 		//only the peer's bin should be subscribed
 		if po < kadDepth {
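requestPeerSubscriptions walks connected peers from the deepest bin backwards and now applies the same capability filter before any subscription logic runs. A compact, runnable sketch of that filtering; peerInfo is hypothetical, and the deep-peer branch is a simplification of the real bin handling:

package main

import "fmt"

// peerInfo is a hypothetical record for this sketch: the peer's
// proximity order (bin) and whether it advertises "stream".
type peerInfo struct {
	bin       int
	hasStream bool
}

// subscriptionBins mirrors the filtering in the hunk: peers without the
// stream capability (e.g. bootnodes) are skipped outright; a peer
// shallower than the kademlia depth is subscribed only on its own bin
// (the deeper-peer branch below is a simplification).
func subscriptionBins(peers []peerInfo, kadDepth int) map[int][]int {
	subs := make(map[int][]int)
	for i, p := range peers {
		if !p.hasStream {
			continue // a bootnode: never subscribed
		}
		if p.bin < kadDepth {
			subs[i] = []int{p.bin} // only the peer's own bin
		} else {
			for b := kadDepth; b <= p.bin; b++ {
				subs[i] = append(subs[i], b)
			}
		}
	}
	return subs
}

func main() {
	peers := []peerInfo{
		{bin: 1, hasStream: true},  // shallow full node
		{bin: 3, hasStream: false}, // bootnode: filtered out
		{bin: 5, hasStream: true},  // deep full node
	}
	fmt.Println(subscriptionBins(peers, 4)) // map[0:[1] 2:[4 5]]
}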
@@ -201,8 +201,6 @@ func (pc *PyramidChunker) decrementWorkerCount() {
 }
 
 func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(context.Context) error, err error) {
-	log.Debug("pyramid.chunker: Split()")
-
 	pc.wg.Add(1)
 	pc.prepareChunks(ctx, false)
 
@@ -235,7 +233,6 @@ func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(conte
 }
 
 func (pc *PyramidChunker) Append(ctx context.Context) (k Address, wait func(context.Context) error, err error) {
-	log.Debug("pyramid.chunker: Append()")
 	// Load the right most unfinished tree chunks in every level
 	pc.loadTree(ctx)
 
@@ -283,8 +280,6 @@ func (pc *PyramidChunker) processor(ctx context.Context, id int64) {
 }
 
 func (pc *PyramidChunker) processChunk(ctx context.Context, id int64, job *chunkJob) {
-	log.Debug("pyramid.chunker: processChunk()", "id", id)
-
 	ref, err := pc.putter.Put(ctx, job.chunk)
 	if err != nil {
 		select {
@@ -301,7 +296,6 @@ func (pc *PyramidChunker) processChunk(ctx context.Context, id int64, job *chunk
 }
 
 func (pc *PyramidChunker) loadTree(ctx context.Context) error {
-	log.Debug("pyramid.chunker: loadTree()")
 	// Get the root chunk to get the total size
 	chunkData, err := pc.getter.Get(ctx, Reference(pc.key))
 	if err != nil {
@@ -386,7 +380,6 @@ func (pc *PyramidChunker) loadTree(ctx context.Context) error {
 }
 
 func (pc *PyramidChunker) prepareChunks(ctx context.Context, isAppend bool) {
-	log.Debug("pyramid.chunker: prepareChunks", "isAppend", isAppend)
 	defer pc.wg.Done()
 
 	chunkWG := &sync.WaitGroup{}
@@ -84,12 +84,11 @@ type Swarm struct {
 	tracerClose io.Closer
 }
 
-// creates a new swarm service instance
+// NewSwarm creates a new swarm service instance
 // implements node.Service
 // If mockStore is not nil, it will be used as the storage for chunk data.
 // MockStore should be used only for testing.
 func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err error) {
-
 	if bytes.Equal(common.FromHex(config.PublicKey), storage.ZeroAddr) {
 		return nil, fmt.Errorf("empty public key")
 	}
@@ -116,10 +115,11 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
 	config.HiveParams.Discovery = true
 
 	bzzconfig := &network.BzzConfig{
-		NetworkID:   config.NetworkID,
-		OverlayAddr: common.FromHex(config.BzzKey),
-		HiveParams:  config.HiveParams,
-		LightNode:   config.LightNodeEnabled,
+		NetworkID:    config.NetworkID,
+		OverlayAddr:  common.FromHex(config.BzzKey),
+		HiveParams:   config.HiveParams,
+		LightNode:    config.LightNodeEnabled,
+		BootnodeMode: config.BootnodeMode,
 	}
 
 	self.stateStore, err = state.NewDBStore(filepath.Join(config.Path, "state-store.db"))
@@ -455,12 +455,16 @@ func (self *Swarm) Stop() error {
 	return err
 }
 
-// implements the node.Service interface
-func (self *Swarm) Protocols() (protos []p2p.Protocol) {
-	protos = append(protos, self.bzz.Protocols()...)
+// Protocols implements the node.Service interface
+func (s *Swarm) Protocols() (protos []p2p.Protocol) {
+	if s.config.BootnodeMode {
+		protos = append(protos, s.bzz.Protocols()...)
+	} else {
+		protos = append(protos, s.bzz.Protocols()...)
 
-	if self.ps != nil {
-		protos = append(protos, self.ps.Protocols()...)
+		if s.ps != nil {
+			protos = append(protos, s.ps.Protocols()...)
+		}
 	}
 	return
 }
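Both branches of the new Protocols register the bzz protocols; the only behavioral difference is that pss is never registered in bootnode mode. Read that way, the branch can be collapsed, as in this runnable sketch with stand-in types (swarmSketch and protocol are illustrative, not go-ethereum types):

package main

import "fmt"

// Illustrative stand-ins for p2p.Protocol and the Swarm service.
type protocol struct{ Name string }

type swarmSketch struct {
	bootnodeMode bool
	bzzProtos    []protocol
	pssProtos    []protocol // nil when pss is disabled
}

// protocols collapses the two branches of the diff: bzz protocols are
// always registered; pss is skipped in bootnode mode (or when absent).
func (s *swarmSketch) protocols() (protos []protocol) {
	protos = append(protos, s.bzzProtos...)
	if !s.bootnodeMode && s.pssProtos != nil {
		protos = append(protos, s.pssProtos...)
	}
	return
}

func main() {
	bzz := []protocol{{"bzz"}, {"hive"}}
	pss := []protocol{{"pss"}}

	boot := &swarmSketch{bootnodeMode: true, bzzProtos: bzz, pssProtos: pss}
	full := &swarmSketch{bzzProtos: bzz, pssProtos: pss}

	fmt.Println(len(boot.protocols()), len(full.protocols())) // 2 3
}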