Swarm accounting (#18050)
* swarm: completed 1st phase of swap accounting * swarm: swap accounting for swarm with p2p accounting * swarm/swap: addressed PR comments * swarm/swap: ignore ErrNotFound on stateStore.Get() * swarm/swap: GetPeerBalance test; add TODO for chequebook API check * swarm/network/stream: fix NewRegistry calls with new arguments * swarm/swap: address @justelad's PR comments
This commit is contained in:
@@ -114,7 +114,7 @@ func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest
|
||||
|
||||
delivery := NewDelivery(to, netStore)
|
||||
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
|
||||
streamer := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), registryOptions)
|
||||
streamer := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), registryOptions, nil)
|
||||
teardown := func() {
|
||||
streamer.Close()
|
||||
removeDataDir()
|
||||
|
@@ -290,7 +290,7 @@ func TestRequestFromPeers(t *testing.T) {
|
||||
Peer: protocolsPeer,
|
||||
}, to)
|
||||
to.On(peer)
|
||||
r := NewRegistry(addr.ID(), delivery, nil, nil, nil)
|
||||
r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil)
|
||||
|
||||
// an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished
|
||||
sp := &Peer{
|
||||
@@ -331,7 +331,7 @@ func TestRequestFromPeersWithLightNode(t *testing.T) {
|
||||
Peer: protocolsPeer,
|
||||
}, to)
|
||||
to.On(peer)
|
||||
r := NewRegistry(addr.ID(), delivery, nil, nil, nil)
|
||||
r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil)
|
||||
// an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished
|
||||
sp := &Peer{
|
||||
Peer: protocolsPeer,
|
||||
@@ -480,7 +480,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
|
||||
SkipCheck: skipCheck,
|
||||
Syncing: SyncingDisabled,
|
||||
Retrieval: RetrievalEnabled,
|
||||
})
|
||||
}, nil)
|
||||
bucket.Store(bucketKeyRegistry, r)
|
||||
|
||||
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
|
||||
@@ -655,7 +655,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip
|
||||
Syncing: SyncingDisabled,
|
||||
Retrieval: RetrievalDisabled,
|
||||
SyncUpdateDelay: 0,
|
||||
})
|
||||
}, nil)
|
||||
|
||||
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
|
||||
bucket.Store(bucketKeyFileStore, fileStore)
|
||||
|
@@ -84,7 +84,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
|
||||
Retrieval: RetrievalDisabled,
|
||||
Syncing: SyncingRegisterOnly,
|
||||
SkipCheck: skipCheck,
|
||||
})
|
||||
}, nil)
|
||||
bucket.Store(bucketKeyRegistry, r)
|
||||
|
||||
r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) {
|
||||
|
@@ -130,7 +130,7 @@ func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s no
|
||||
Retrieval: RetrievalEnabled,
|
||||
Syncing: SyncingAutoSubscribe,
|
||||
SyncUpdateDelay: 3 * time.Second,
|
||||
})
|
||||
}, nil)
|
||||
|
||||
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
|
||||
bucket.Store(bucketKeyFileStore, fileStore)
|
||||
|
@@ -166,7 +166,7 @@ func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Servic
|
||||
Retrieval: RetrievalDisabled,
|
||||
Syncing: SyncingAutoSubscribe,
|
||||
SyncUpdateDelay: 3 * time.Second,
|
||||
})
|
||||
}, nil)
|
||||
|
||||
bucket.Store(bucketKeyRegistry, r)
|
||||
|
||||
@@ -360,7 +360,7 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int)
|
||||
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
|
||||
Retrieval: RetrievalDisabled,
|
||||
Syncing: SyncingRegisterOnly,
|
||||
})
|
||||
}, nil)
|
||||
bucket.Store(bucketKeyRegistry, r)
|
||||
|
||||
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
|
||||
|
@@ -21,6 +21,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -87,6 +88,9 @@ type Registry struct {
|
||||
intervalsStore state.Store
|
||||
autoRetrieval bool //automatically subscribe to retrieve request stream
|
||||
maxPeerServers int
|
||||
spec *protocols.Spec //this protocol's spec
|
||||
balance protocols.Balance //implements protocols.Balance, for accounting
|
||||
prices protocols.Prices //implements protocols.Prices, provides prices to accounting
|
||||
}
|
||||
|
||||
// RegistryOptions holds optional values for NewRegistry constructor.
|
||||
@@ -99,7 +103,7 @@ type RegistryOptions struct {
|
||||
}
|
||||
|
||||
// NewRegistry is Streamer constructor
|
||||
func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry {
|
||||
func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry {
|
||||
if options == nil {
|
||||
options = &RegistryOptions{}
|
||||
}
|
||||
@@ -119,7 +123,10 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
|
||||
intervalsStore: intervalsStore,
|
||||
autoRetrieval: retrieval,
|
||||
maxPeerServers: options.MaxPeerServers,
|
||||
balance: balance,
|
||||
}
|
||||
streamer.setupSpec()
|
||||
|
||||
streamer.api = NewAPI(streamer)
|
||||
delivery.getPeer = streamer.getPeer
|
||||
|
||||
@@ -228,6 +235,17 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
|
||||
return streamer
|
||||
}
|
||||
|
||||
//we need to construct a spec instance per node instance
|
||||
func (r *Registry) setupSpec() {
|
||||
//first create the "bare" spec
|
||||
r.createSpec()
|
||||
//if balance is nil, this node has been started without swap support (swapEnabled flag is false)
|
||||
if r.balance != nil && !reflect.ValueOf(r.balance).IsNil() {
|
||||
//swap is enabled, so setup the hook
|
||||
r.spec.Hook = protocols.NewAccounting(r.balance, r.prices)
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterClient registers an incoming streamer constructor
|
||||
func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error)) {
|
||||
r.clientMu.Lock()
|
||||
@@ -492,7 +510,7 @@ func (r *Registry) updateSyncing() {
|
||||
}
|
||||
|
||||
func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
|
||||
peer := protocols.NewPeer(p, rw, Spec)
|
||||
peer := protocols.NewPeer(p, rw, r.spec)
|
||||
bp := network.NewBzzPeer(peer)
|
||||
np := network.NewPeer(bp, r.delivery.kad)
|
||||
r.delivery.kad.On(np)
|
||||
@@ -716,35 +734,43 @@ func (c *clientParams) clientCreated() {
|
||||
close(c.clientCreatedC)
|
||||
}
|
||||
|
||||
// Spec is the spec of the streamer protocol
|
||||
var Spec = &protocols.Spec{
|
||||
Name: "stream",
|
||||
Version: 8,
|
||||
MaxMsgSize: 10 * 1024 * 1024,
|
||||
Messages: []interface{}{
|
||||
UnsubscribeMsg{},
|
||||
OfferedHashesMsg{},
|
||||
WantedHashesMsg{},
|
||||
TakeoverProofMsg{},
|
||||
SubscribeMsg{},
|
||||
RetrieveRequestMsg{},
|
||||
ChunkDeliveryMsgRetrieval{},
|
||||
SubscribeErrorMsg{},
|
||||
RequestSubscriptionMsg{},
|
||||
QuitMsg{},
|
||||
ChunkDeliveryMsgSyncing{},
|
||||
},
|
||||
//GetSpec returns the streamer spec to callers
|
||||
//This used to be a global variable but for simulations with
|
||||
//multiple nodes its fields (notably the Hook) would be overwritten
|
||||
func (r *Registry) GetSpec() *protocols.Spec {
|
||||
return r.spec
|
||||
}
|
||||
|
||||
func (r *Registry) createSpec() {
|
||||
// Spec is the spec of the streamer protocol
|
||||
var spec = &protocols.Spec{
|
||||
Name: "stream",
|
||||
Version: 8,
|
||||
MaxMsgSize: 10 * 1024 * 1024,
|
||||
Messages: []interface{}{
|
||||
UnsubscribeMsg{},
|
||||
OfferedHashesMsg{},
|
||||
WantedHashesMsg{},
|
||||
TakeoverProofMsg{},
|
||||
SubscribeMsg{},
|
||||
RetrieveRequestMsg{},
|
||||
ChunkDeliveryMsgRetrieval{},
|
||||
SubscribeErrorMsg{},
|
||||
RequestSubscriptionMsg{},
|
||||
QuitMsg{},
|
||||
ChunkDeliveryMsgSyncing{},
|
||||
},
|
||||
}
|
||||
r.spec = spec
|
||||
}
|
||||
|
||||
func (r *Registry) Protocols() []p2p.Protocol {
|
||||
return []p2p.Protocol{
|
||||
{
|
||||
Name: Spec.Name,
|
||||
Version: Spec.Version,
|
||||
Length: Spec.Length(),
|
||||
Name: r.spec.Name,
|
||||
Version: r.spec.Version,
|
||||
Length: r.spec.Length(),
|
||||
Run: r.runProtocol,
|
||||
// NodeInfo: ,
|
||||
// PeerInfo: ,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@@ -118,7 +118,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
|
||||
Retrieval: RetrievalDisabled,
|
||||
Syncing: SyncingAutoSubscribe,
|
||||
SkipCheck: skipCheck,
|
||||
})
|
||||
}, nil)
|
||||
|
||||
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
|
||||
bucket.Store(bucketKeyFileStore, fileStore)
|
||||
|
Reference in New Issue
Block a user