les: renamed lespay to vflux (#22347)
This commit is contained in:
107
les/vflux/client/api.go
Normal file
107
les/vflux/client/api.go
Normal file
@@ -0,0 +1,107 @@
|
||||
// Copyright 2020 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/mclock"
|
||||
"github.com/ethereum/go-ethereum/les/utils"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
)
|
||||
|
||||
// PrivateClientAPI implements the vflux client side API
|
||||
type PrivateClientAPI struct {
|
||||
vt *ValueTracker
|
||||
}
|
||||
|
||||
// NewPrivateClientAPI creates a PrivateClientAPI
|
||||
func NewPrivateClientAPI(vt *ValueTracker) *PrivateClientAPI {
|
||||
return &PrivateClientAPI{vt}
|
||||
}
|
||||
|
||||
// parseNodeStr converts either an enode address or a plain hex node id to enode.ID
|
||||
func parseNodeStr(nodeStr string) (enode.ID, error) {
|
||||
if id, err := enode.ParseID(nodeStr); err == nil {
|
||||
return id, nil
|
||||
}
|
||||
if node, err := enode.Parse(enode.ValidSchemes, nodeStr); err == nil {
|
||||
return node.ID(), nil
|
||||
} else {
|
||||
return enode.ID{}, err
|
||||
}
|
||||
}
|
||||
|
||||
// RequestStats returns the current contents of the reference request basket, with
|
||||
// request values meaning average per request rather than total.
|
||||
func (api *PrivateClientAPI) RequestStats() []RequestStatsItem {
|
||||
return api.vt.RequestStats()
|
||||
}
|
||||
|
||||
// Distribution returns a distribution as a series of (X, Y) chart coordinates,
|
||||
// where the X axis is the response time in seconds while the Y axis is the amount of
|
||||
// service value received with a response time close to the X coordinate.
|
||||
// The distribution is optionally normalized to a sum of 1.
|
||||
// If nodeStr == "" then the global distribution is returned, otherwise the individual
|
||||
// distribution of the specified server node.
|
||||
func (api *PrivateClientAPI) Distribution(nodeStr string, normalized bool) (RtDistribution, error) {
|
||||
var expFactor utils.ExpirationFactor
|
||||
if !normalized {
|
||||
expFactor = utils.ExpFactor(api.vt.StatsExpirer().LogOffset(mclock.Now()))
|
||||
}
|
||||
if nodeStr == "" {
|
||||
return api.vt.RtStats().Distribution(normalized, expFactor), nil
|
||||
}
|
||||
if id, err := parseNodeStr(nodeStr); err == nil {
|
||||
return api.vt.GetNode(id).RtStats().Distribution(normalized, expFactor), nil
|
||||
} else {
|
||||
return RtDistribution{}, err
|
||||
}
|
||||
}
|
||||
|
||||
// Timeout suggests a timeout value based on either the global distribution or the
|
||||
// distribution of the specified node. The parameter is the desired rate of timeouts
|
||||
// assuming a similar distribution in the future.
|
||||
// Note that the actual timeout should have a sensible minimum bound so that operating
|
||||
// under ideal working conditions for a long time (for example, using a local server
|
||||
// with very low response times) will not make it very hard for the system to accommodate
|
||||
// longer response times in the future.
|
||||
func (api *PrivateClientAPI) Timeout(nodeStr string, failRate float64) (float64, error) {
|
||||
if nodeStr == "" {
|
||||
return float64(api.vt.RtStats().Timeout(failRate)) / float64(time.Second), nil
|
||||
}
|
||||
if id, err := parseNodeStr(nodeStr); err == nil {
|
||||
return float64(api.vt.GetNode(id).RtStats().Timeout(failRate)) / float64(time.Second), nil
|
||||
} else {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
// Value calculates the total service value provided either globally or by the specified
|
||||
// server node, using a weight function based on the given timeout.
|
||||
func (api *PrivateClientAPI) Value(nodeStr string, timeout float64) (float64, error) {
|
||||
wt := TimeoutWeights(time.Duration(timeout * float64(time.Second)))
|
||||
expFactor := utils.ExpFactor(api.vt.StatsExpirer().LogOffset(mclock.Now()))
|
||||
if nodeStr == "" {
|
||||
return api.vt.RtStats().Value(wt, expFactor), nil
|
||||
}
|
||||
if id, err := parseNodeStr(nodeStr); err == nil {
|
||||
return api.vt.GetNode(id).RtStats().Value(wt, expFactor), nil
|
||||
} else {
|
||||
return 0, err
|
||||
}
|
||||
}
|
107
les/vflux/client/fillset.go
Normal file
107
les/vflux/client/fillset.go
Normal file
@@ -0,0 +1,107 @@
|
||||
// Copyright 2020 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/nodestate"
|
||||
)
|
||||
|
||||
// FillSet tries to read nodes from an input iterator and add them to a node set by
|
||||
// setting the specified node state flag(s) until the size of the set reaches the target.
|
||||
// Note that other mechanisms (like other FillSet instances reading from different inputs)
|
||||
// can also set the same flag(s) and FillSet will always care about the total number of
|
||||
// nodes having those flags.
|
||||
type FillSet struct {
|
||||
lock sync.Mutex
|
||||
cond *sync.Cond
|
||||
ns *nodestate.NodeStateMachine
|
||||
input enode.Iterator
|
||||
closed bool
|
||||
flags nodestate.Flags
|
||||
count, target int
|
||||
}
|
||||
|
||||
// NewFillSet creates a new FillSet
|
||||
func NewFillSet(ns *nodestate.NodeStateMachine, input enode.Iterator, flags nodestate.Flags) *FillSet {
|
||||
fs := &FillSet{
|
||||
ns: ns,
|
||||
input: input,
|
||||
flags: flags,
|
||||
}
|
||||
fs.cond = sync.NewCond(&fs.lock)
|
||||
|
||||
ns.SubscribeState(flags, func(n *enode.Node, oldState, newState nodestate.Flags) {
|
||||
fs.lock.Lock()
|
||||
if oldState.Equals(flags) {
|
||||
fs.count--
|
||||
}
|
||||
if newState.Equals(flags) {
|
||||
fs.count++
|
||||
}
|
||||
if fs.target > fs.count {
|
||||
fs.cond.Signal()
|
||||
}
|
||||
fs.lock.Unlock()
|
||||
})
|
||||
|
||||
go fs.readLoop()
|
||||
return fs
|
||||
}
|
||||
|
||||
// readLoop keeps reading nodes from the input and setting the specified flags for them
|
||||
// whenever the node set size is under the current target
|
||||
func (fs *FillSet) readLoop() {
|
||||
for {
|
||||
fs.lock.Lock()
|
||||
for fs.target <= fs.count && !fs.closed {
|
||||
fs.cond.Wait()
|
||||
}
|
||||
|
||||
fs.lock.Unlock()
|
||||
if !fs.input.Next() {
|
||||
return
|
||||
}
|
||||
fs.ns.SetState(fs.input.Node(), fs.flags, nodestate.Flags{}, 0)
|
||||
}
|
||||
}
|
||||
|
||||
// SetTarget sets the current target for node set size. If the previous target was not
|
||||
// reached and FillSet was still waiting for the next node from the input then the next
|
||||
// incoming node will be added to the set regardless of the target. This ensures that
|
||||
// all nodes coming from the input are eventually added to the set.
|
||||
func (fs *FillSet) SetTarget(target int) {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
|
||||
fs.target = target
|
||||
if fs.target > fs.count {
|
||||
fs.cond.Signal()
|
||||
}
|
||||
}
|
||||
|
||||
// Close shuts FillSet down and closes the input iterator
|
||||
func (fs *FillSet) Close() {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
|
||||
fs.closed = true
|
||||
fs.input.Close()
|
||||
fs.cond.Signal()
|
||||
}
|
113
les/vflux/client/fillset_test.go
Normal file
113
les/vflux/client/fillset_test.go
Normal file
@@ -0,0 +1,113 @@
|
||||
// Copyright 2020 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/mclock"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/ethereum/go-ethereum/p2p/nodestate"
|
||||
)
|
||||
|
||||
type testIter struct {
|
||||
waitCh chan struct{}
|
||||
nodeCh chan *enode.Node
|
||||
node *enode.Node
|
||||
}
|
||||
|
||||
func (i *testIter) Next() bool {
|
||||
i.waitCh <- struct{}{}
|
||||
i.node = <-i.nodeCh
|
||||
return i.node != nil
|
||||
}
|
||||
|
||||
func (i *testIter) Node() *enode.Node {
|
||||
return i.node
|
||||
}
|
||||
|
||||
func (i *testIter) Close() {}
|
||||
|
||||
func (i *testIter) push() {
|
||||
var id enode.ID
|
||||
rand.Read(id[:])
|
||||
i.nodeCh <- enode.SignNull(new(enr.Record), id)
|
||||
}
|
||||
|
||||
func (i *testIter) waiting(timeout time.Duration) bool {
|
||||
select {
|
||||
case <-i.waitCh:
|
||||
return true
|
||||
case <-time.After(timeout):
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func TestFillSet(t *testing.T) {
|
||||
ns := nodestate.NewNodeStateMachine(nil, nil, &mclock.Simulated{}, testSetup)
|
||||
iter := &testIter{
|
||||
waitCh: make(chan struct{}),
|
||||
nodeCh: make(chan *enode.Node),
|
||||
}
|
||||
fs := NewFillSet(ns, iter, sfTest1)
|
||||
ns.Start()
|
||||
|
||||
expWaiting := func(i int, push bool) {
|
||||
for ; i > 0; i-- {
|
||||
if !iter.waiting(time.Second * 10) {
|
||||
t.Fatalf("FillSet not waiting for new nodes")
|
||||
}
|
||||
if push {
|
||||
iter.push()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
expNotWaiting := func() {
|
||||
if iter.waiting(time.Millisecond * 100) {
|
||||
t.Fatalf("FillSet unexpectedly waiting for new nodes")
|
||||
}
|
||||
}
|
||||
|
||||
expNotWaiting()
|
||||
fs.SetTarget(3)
|
||||
expWaiting(3, true)
|
||||
expNotWaiting()
|
||||
fs.SetTarget(100)
|
||||
expWaiting(2, true)
|
||||
expWaiting(1, false)
|
||||
// lower the target before the previous one has been filled up
|
||||
fs.SetTarget(0)
|
||||
iter.push()
|
||||
expNotWaiting()
|
||||
fs.SetTarget(10)
|
||||
expWaiting(4, true)
|
||||
expNotWaiting()
|
||||
// remove all previosly set flags
|
||||
ns.ForEach(sfTest1, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
|
||||
ns.SetState(node, nodestate.Flags{}, sfTest1, 0)
|
||||
})
|
||||
// now expect FillSet to fill the set up again with 10 new nodes
|
||||
expWaiting(10, true)
|
||||
expNotWaiting()
|
||||
|
||||
fs.Close()
|
||||
ns.Stop()
|
||||
}
|
123
les/vflux/client/queueiterator.go
Normal file
123
les/vflux/client/queueiterator.go
Normal file
@@ -0,0 +1,123 @@
|
||||
// Copyright 2020 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/nodestate"
|
||||
)
|
||||
|
||||
// QueueIterator returns nodes from the specified selectable set in the same order as
|
||||
// they entered the set.
|
||||
type QueueIterator struct {
|
||||
lock sync.Mutex
|
||||
cond *sync.Cond
|
||||
|
||||
ns *nodestate.NodeStateMachine
|
||||
queue []*enode.Node
|
||||
nextNode *enode.Node
|
||||
waitCallback func(bool)
|
||||
fifo, closed bool
|
||||
}
|
||||
|
||||
// NewQueueIterator creates a new QueueIterator. Nodes are selectable if they have all the required
|
||||
// and none of the disabled flags set. When a node is selected the selectedFlag is set which also
|
||||
// disables further selectability until it is removed or times out.
|
||||
func NewQueueIterator(ns *nodestate.NodeStateMachine, requireFlags, disableFlags nodestate.Flags, fifo bool, waitCallback func(bool)) *QueueIterator {
|
||||
qi := &QueueIterator{
|
||||
ns: ns,
|
||||
fifo: fifo,
|
||||
waitCallback: waitCallback,
|
||||
}
|
||||
qi.cond = sync.NewCond(&qi.lock)
|
||||
|
||||
ns.SubscribeState(requireFlags.Or(disableFlags), func(n *enode.Node, oldState, newState nodestate.Flags) {
|
||||
oldMatch := oldState.HasAll(requireFlags) && oldState.HasNone(disableFlags)
|
||||
newMatch := newState.HasAll(requireFlags) && newState.HasNone(disableFlags)
|
||||
if newMatch == oldMatch {
|
||||
return
|
||||
}
|
||||
|
||||
qi.lock.Lock()
|
||||
defer qi.lock.Unlock()
|
||||
|
||||
if newMatch {
|
||||
qi.queue = append(qi.queue, n)
|
||||
} else {
|
||||
id := n.ID()
|
||||
for i, qn := range qi.queue {
|
||||
if qn.ID() == id {
|
||||
copy(qi.queue[i:len(qi.queue)-1], qi.queue[i+1:])
|
||||
qi.queue = qi.queue[:len(qi.queue)-1]
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
qi.cond.Signal()
|
||||
})
|
||||
return qi
|
||||
}
|
||||
|
||||
// Next moves to the next selectable node.
|
||||
func (qi *QueueIterator) Next() bool {
|
||||
qi.lock.Lock()
|
||||
if !qi.closed && len(qi.queue) == 0 {
|
||||
if qi.waitCallback != nil {
|
||||
qi.waitCallback(true)
|
||||
}
|
||||
for !qi.closed && len(qi.queue) == 0 {
|
||||
qi.cond.Wait()
|
||||
}
|
||||
if qi.waitCallback != nil {
|
||||
qi.waitCallback(false)
|
||||
}
|
||||
}
|
||||
if qi.closed {
|
||||
qi.nextNode = nil
|
||||
qi.lock.Unlock()
|
||||
return false
|
||||
}
|
||||
// Move to the next node in queue.
|
||||
if qi.fifo {
|
||||
qi.nextNode = qi.queue[0]
|
||||
copy(qi.queue[:len(qi.queue)-1], qi.queue[1:])
|
||||
qi.queue = qi.queue[:len(qi.queue)-1]
|
||||
} else {
|
||||
qi.nextNode = qi.queue[len(qi.queue)-1]
|
||||
qi.queue = qi.queue[:len(qi.queue)-1]
|
||||
}
|
||||
qi.lock.Unlock()
|
||||
return true
|
||||
}
|
||||
|
||||
// Close ends the iterator.
|
||||
func (qi *QueueIterator) Close() {
|
||||
qi.lock.Lock()
|
||||
qi.closed = true
|
||||
qi.lock.Unlock()
|
||||
qi.cond.Signal()
|
||||
}
|
||||
|
||||
// Node returns the current node.
|
||||
func (qi *QueueIterator) Node() *enode.Node {
|
||||
qi.lock.Lock()
|
||||
defer qi.lock.Unlock()
|
||||
|
||||
return qi.nextNode
|
||||
}
|
106
les/vflux/client/queueiterator_test.go
Normal file
106
les/vflux/client/queueiterator_test.go
Normal file
@@ -0,0 +1,106 @@
|
||||
// Copyright 2020 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/mclock"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/ethereum/go-ethereum/p2p/nodestate"
|
||||
)
|
||||
|
||||
func testNodeID(i int) enode.ID {
|
||||
return enode.ID{42, byte(i % 256), byte(i / 256)}
|
||||
}
|
||||
|
||||
func testNodeIndex(id enode.ID) int {
|
||||
if id[0] != 42 {
|
||||
return -1
|
||||
}
|
||||
return int(id[1]) + int(id[2])*256
|
||||
}
|
||||
|
||||
func testNode(i int) *enode.Node {
|
||||
return enode.SignNull(new(enr.Record), testNodeID(i))
|
||||
}
|
||||
|
||||
func TestQueueIteratorFIFO(t *testing.T) {
|
||||
testQueueIterator(t, true)
|
||||
}
|
||||
|
||||
func TestQueueIteratorLIFO(t *testing.T) {
|
||||
testQueueIterator(t, false)
|
||||
}
|
||||
|
||||
func testQueueIterator(t *testing.T, fifo bool) {
|
||||
ns := nodestate.NewNodeStateMachine(nil, nil, &mclock.Simulated{}, testSetup)
|
||||
qi := NewQueueIterator(ns, sfTest2, sfTest3.Or(sfTest4), fifo, nil)
|
||||
ns.Start()
|
||||
for i := 1; i <= iterTestNodeCount; i++ {
|
||||
ns.SetState(testNode(i), sfTest1, nodestate.Flags{}, 0)
|
||||
}
|
||||
next := func() int {
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
qi.Next()
|
||||
close(ch)
|
||||
}()
|
||||
select {
|
||||
case <-ch:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("Iterator.Next() timeout")
|
||||
}
|
||||
node := qi.Node()
|
||||
ns.SetState(node, sfTest4, nodestate.Flags{}, 0)
|
||||
return testNodeIndex(node.ID())
|
||||
}
|
||||
exp := func(i int) {
|
||||
n := next()
|
||||
if n != i {
|
||||
t.Errorf("Wrong item returned by iterator (expected %d, got %d)", i, n)
|
||||
}
|
||||
}
|
||||
explist := func(list []int) {
|
||||
for i := range list {
|
||||
if fifo {
|
||||
exp(list[i])
|
||||
} else {
|
||||
exp(list[len(list)-1-i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ns.SetState(testNode(1), sfTest2, nodestate.Flags{}, 0)
|
||||
ns.SetState(testNode(2), sfTest2, nodestate.Flags{}, 0)
|
||||
ns.SetState(testNode(3), sfTest2, nodestate.Flags{}, 0)
|
||||
explist([]int{1, 2, 3})
|
||||
ns.SetState(testNode(4), sfTest2, nodestate.Flags{}, 0)
|
||||
ns.SetState(testNode(5), sfTest2, nodestate.Flags{}, 0)
|
||||
ns.SetState(testNode(6), sfTest2, nodestate.Flags{}, 0)
|
||||
ns.SetState(testNode(5), sfTest3, nodestate.Flags{}, 0)
|
||||
explist([]int{4, 6})
|
||||
ns.SetState(testNode(1), nodestate.Flags{}, sfTest4, 0)
|
||||
ns.SetState(testNode(2), nodestate.Flags{}, sfTest4, 0)
|
||||
ns.SetState(testNode(3), nodestate.Flags{}, sfTest4, 0)
|
||||
ns.SetState(testNode(2), sfTest3, nodestate.Flags{}, 0)
|
||||
ns.SetState(testNode(2), nodestate.Flags{}, sfTest3, 0)
|
||||
explist([]int{1, 3, 2})
|
||||
ns.Stop()
|
||||
}
|
285
les/vflux/client/requestbasket.go
Normal file
285
les/vflux/client/requestbasket.go
Normal file
@@ -0,0 +1,285 @@
|
||||
// Copyright 2020 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/ethereum/go-ethereum/les/utils"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
)
|
||||
|
||||
const basketFactor = 1000000 // reference basket amount and value scale factor
|
||||
|
||||
// referenceBasket keeps track of global request usage statistics and the usual prices
|
||||
// of each used request type relative to each other. The amounts in the basket are scaled
|
||||
// up by basketFactor because of the exponential expiration of long-term statistical data.
|
||||
// Values are scaled so that the sum of all amounts and the sum of all values are equal.
|
||||
//
|
||||
// reqValues represent the internal relative value estimates for each request type and are
|
||||
// calculated as value / amount. The average reqValue of all used requests is 1.
|
||||
// In other words: SUM(refBasket[type].amount * reqValue[type]) = SUM(refBasket[type].amount)
|
||||
type referenceBasket struct {
|
||||
basket requestBasket
|
||||
reqValues []float64 // contents are read only, new slice is created for each update
|
||||
}
|
||||
|
||||
// serverBasket collects served request amount and value statistics for a single server.
|
||||
//
|
||||
// Values are gradually transferred to the global reference basket with a long time
|
||||
// constant so that each server basket represents long term usage and price statistics.
|
||||
// When the transferred part is added to the reference basket the values are scaled so
|
||||
// that their sum equals the total value calculated according to the previous reqValues.
|
||||
// The ratio of request values coming from the server basket represent the pricing of
|
||||
// the specific server and modify the global estimates with a weight proportional to
|
||||
// the amount of service provided by the server.
|
||||
type serverBasket struct {
|
||||
basket requestBasket
|
||||
rvFactor float64
|
||||
}
|
||||
|
||||
type (
|
||||
// requestBasket holds amounts and values for each request type.
|
||||
// These values are exponentially expired (see utils.ExpiredValue). The power of 2
|
||||
// exponent is applicable to all values within.
|
||||
requestBasket struct {
|
||||
items []basketItem
|
||||
exp uint64
|
||||
}
|
||||
// basketItem holds amount and value for a single request type. Value is the total
|
||||
// relative request value accumulated for served requests while amount is the counter
|
||||
// for each request type.
|
||||
// Note that these values are both scaled up by basketFactor because of the exponential
|
||||
// expiration.
|
||||
basketItem struct {
|
||||
amount, value uint64
|
||||
}
|
||||
)
|
||||
|
||||
// setExp sets the power of 2 exponent of the structure, scaling base values (the amounts
|
||||
// and request values) up or down if necessary.
|
||||
func (b *requestBasket) setExp(exp uint64) {
|
||||
if exp > b.exp {
|
||||
shift := exp - b.exp
|
||||
for i, item := range b.items {
|
||||
item.amount >>= shift
|
||||
item.value >>= shift
|
||||
b.items[i] = item
|
||||
}
|
||||
b.exp = exp
|
||||
}
|
||||
if exp < b.exp {
|
||||
shift := b.exp - exp
|
||||
for i, item := range b.items {
|
||||
item.amount <<= shift
|
||||
item.value <<= shift
|
||||
b.items[i] = item
|
||||
}
|
||||
b.exp = exp
|
||||
}
|
||||
}
|
||||
|
||||
// init initializes a new server basket with the given service vector size (number of
|
||||
// different request types)
|
||||
func (s *serverBasket) init(size int) {
|
||||
if s.basket.items == nil {
|
||||
s.basket.items = make([]basketItem, size)
|
||||
}
|
||||
}
|
||||
|
||||
// add adds the give type and amount of requests to the basket. Cost is calculated
|
||||
// according to the server's own cost table.
|
||||
func (s *serverBasket) add(reqType, reqAmount uint32, reqCost uint64, expFactor utils.ExpirationFactor) {
|
||||
s.basket.setExp(expFactor.Exp)
|
||||
i := &s.basket.items[reqType]
|
||||
i.amount += uint64(float64(uint64(reqAmount)*basketFactor) * expFactor.Factor)
|
||||
i.value += uint64(float64(reqCost) * s.rvFactor * expFactor.Factor)
|
||||
}
|
||||
|
||||
// updateRvFactor updates the request value factor that scales server costs into the
|
||||
// local value dimensions.
|
||||
func (s *serverBasket) updateRvFactor(rvFactor float64) {
|
||||
s.rvFactor = rvFactor
|
||||
}
|
||||
|
||||
// transfer decreases amounts and values in the basket with the given ratio and
|
||||
// moves the removed amounts into a new basket which is returned and can be added
|
||||
// to the global reference basket.
|
||||
func (s *serverBasket) transfer(ratio float64) requestBasket {
|
||||
res := requestBasket{
|
||||
items: make([]basketItem, len(s.basket.items)),
|
||||
exp: s.basket.exp,
|
||||
}
|
||||
for i, v := range s.basket.items {
|
||||
ta := uint64(float64(v.amount) * ratio)
|
||||
tv := uint64(float64(v.value) * ratio)
|
||||
if ta > v.amount {
|
||||
ta = v.amount
|
||||
}
|
||||
if tv > v.value {
|
||||
tv = v.value
|
||||
}
|
||||
s.basket.items[i] = basketItem{v.amount - ta, v.value - tv}
|
||||
res.items[i] = basketItem{ta, tv}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// init initializes the reference basket with the given service vector size (number of
|
||||
// different request types)
|
||||
func (r *referenceBasket) init(size int) {
|
||||
r.reqValues = make([]float64, size)
|
||||
r.normalize()
|
||||
r.updateReqValues()
|
||||
}
|
||||
|
||||
// add adds the transferred part of a server basket to the reference basket while scaling
|
||||
// value amounts so that their sum equals the total value calculated according to the
|
||||
// previous reqValues.
|
||||
func (r *referenceBasket) add(newBasket requestBasket) {
|
||||
r.basket.setExp(newBasket.exp)
|
||||
// scale newBasket to match service unit value
|
||||
var (
|
||||
totalCost uint64
|
||||
totalValue float64
|
||||
)
|
||||
for i, v := range newBasket.items {
|
||||
totalCost += v.value
|
||||
totalValue += float64(v.amount) * r.reqValues[i]
|
||||
}
|
||||
if totalCost > 0 {
|
||||
// add to reference with scaled values
|
||||
scaleValues := totalValue / float64(totalCost)
|
||||
for i, v := range newBasket.items {
|
||||
r.basket.items[i].amount += v.amount
|
||||
r.basket.items[i].value += uint64(float64(v.value) * scaleValues)
|
||||
}
|
||||
}
|
||||
r.updateReqValues()
|
||||
}
|
||||
|
||||
// updateReqValues recalculates reqValues after adding transferred baskets. Note that
|
||||
// values should be normalized first.
|
||||
func (r *referenceBasket) updateReqValues() {
|
||||
r.reqValues = make([]float64, len(r.reqValues))
|
||||
for i, b := range r.basket.items {
|
||||
if b.amount > 0 {
|
||||
r.reqValues[i] = float64(b.value) / float64(b.amount)
|
||||
} else {
|
||||
r.reqValues[i] = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// normalize ensures that the sum of values equal the sum of amounts in the basket.
|
||||
func (r *referenceBasket) normalize() {
|
||||
var sumAmount, sumValue uint64
|
||||
for _, b := range r.basket.items {
|
||||
sumAmount += b.amount
|
||||
sumValue += b.value
|
||||
}
|
||||
add := float64(int64(sumAmount-sumValue)) / float64(sumValue)
|
||||
for i, b := range r.basket.items {
|
||||
b.value += uint64(int64(float64(b.value) * add))
|
||||
r.basket.items[i] = b
|
||||
}
|
||||
}
|
||||
|
||||
// reqValueFactor calculates the request value factor applicable to the server with
|
||||
// the given announced request cost list
|
||||
func (r *referenceBasket) reqValueFactor(costList []uint64) float64 {
|
||||
var (
|
||||
totalCost float64
|
||||
totalValue uint64
|
||||
)
|
||||
for i, b := range r.basket.items {
|
||||
totalCost += float64(costList[i]) * float64(b.amount) // use floats to avoid overflow
|
||||
totalValue += b.value
|
||||
}
|
||||
if totalCost < 1 {
|
||||
return 0
|
||||
}
|
||||
return float64(totalValue) * basketFactor / totalCost
|
||||
}
|
||||
|
||||
// EncodeRLP implements rlp.Encoder
|
||||
func (b *basketItem) EncodeRLP(w io.Writer) error {
|
||||
return rlp.Encode(w, []interface{}{b.amount, b.value})
|
||||
}
|
||||
|
||||
// DecodeRLP implements rlp.Decoder
|
||||
func (b *basketItem) DecodeRLP(s *rlp.Stream) error {
|
||||
var item struct {
|
||||
Amount, Value uint64
|
||||
}
|
||||
if err := s.Decode(&item); err != nil {
|
||||
return err
|
||||
}
|
||||
b.amount, b.value = item.Amount, item.Value
|
||||
return nil
|
||||
}
|
||||
|
||||
// EncodeRLP implements rlp.Encoder
|
||||
func (r *requestBasket) EncodeRLP(w io.Writer) error {
|
||||
return rlp.Encode(w, []interface{}{r.items, r.exp})
|
||||
}
|
||||
|
||||
// DecodeRLP implements rlp.Decoder
|
||||
func (r *requestBasket) DecodeRLP(s *rlp.Stream) error {
|
||||
var enc struct {
|
||||
Items []basketItem
|
||||
Exp uint64
|
||||
}
|
||||
if err := s.Decode(&enc); err != nil {
|
||||
return err
|
||||
}
|
||||
r.items, r.exp = enc.Items, enc.Exp
|
||||
return nil
|
||||
}
|
||||
|
||||
// convertMapping converts a basket loaded from the database into the current format.
|
||||
// If the available request types and their mapping into the service vector differ from
|
||||
// the one used when saving the basket then this function reorders old fields and fills
|
||||
// in previously unknown fields by scaling up amounts and values taken from the
|
||||
// initialization basket.
|
||||
func (r requestBasket) convertMapping(oldMapping, newMapping []string, initBasket requestBasket) requestBasket {
|
||||
nameMap := make(map[string]int)
|
||||
for i, name := range oldMapping {
|
||||
nameMap[name] = i
|
||||
}
|
||||
rc := requestBasket{items: make([]basketItem, len(newMapping))}
|
||||
var scale, oldScale, newScale float64
|
||||
for i, name := range newMapping {
|
||||
if ii, ok := nameMap[name]; ok {
|
||||
rc.items[i] = r.items[ii]
|
||||
oldScale += float64(initBasket.items[i].amount) * float64(initBasket.items[i].amount)
|
||||
newScale += float64(rc.items[i].amount) * float64(initBasket.items[i].amount)
|
||||
}
|
||||
}
|
||||
if oldScale > 1e-10 {
|
||||
scale = newScale / oldScale
|
||||
} else {
|
||||
scale = 1
|
||||
}
|
||||
for i, name := range newMapping {
|
||||
if _, ok := nameMap[name]; !ok {
|
||||
rc.items[i].amount = uint64(float64(initBasket.items[i].amount) * scale)
|
||||
rc.items[i].value = uint64(float64(initBasket.items[i].value) * scale)
|
||||
}
|
||||
}
|
||||
return rc
|
||||
}
|
161
les/vflux/client/requestbasket_test.go
Normal file
161
les/vflux/client/requestbasket_test.go
Normal file
@@ -0,0 +1,161 @@
|
||||
// Copyright 2020 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/les/utils"
|
||||
)
|
||||
|
||||
func checkU64(t *testing.T, name string, value, exp uint64) {
|
||||
if value != exp {
|
||||
t.Errorf("Incorrect value for %s: got %d, expected %d", name, value, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func checkF64(t *testing.T, name string, value, exp, tol float64) {
|
||||
if value < exp-tol || value > exp+tol {
|
||||
t.Errorf("Incorrect value for %s: got %f, expected %f", name, value, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestServerBasket(t *testing.T) {
|
||||
var s serverBasket
|
||||
s.init(2)
|
||||
// add some requests with different request value factors
|
||||
s.updateRvFactor(1)
|
||||
noexp := utils.ExpirationFactor{Factor: 1}
|
||||
s.add(0, 1000, 10000, noexp)
|
||||
s.add(1, 3000, 60000, noexp)
|
||||
s.updateRvFactor(10)
|
||||
s.add(0, 4000, 4000, noexp)
|
||||
s.add(1, 2000, 4000, noexp)
|
||||
s.updateRvFactor(10)
|
||||
// check basket contents directly
|
||||
checkU64(t, "s.basket[0].amount", s.basket.items[0].amount, 5000*basketFactor)
|
||||
checkU64(t, "s.basket[0].value", s.basket.items[0].value, 50000)
|
||||
checkU64(t, "s.basket[1].amount", s.basket.items[1].amount, 5000*basketFactor)
|
||||
checkU64(t, "s.basket[1].value", s.basket.items[1].value, 100000)
|
||||
// transfer 50% of the contents of the basket
|
||||
transfer1 := s.transfer(0.5)
|
||||
checkU64(t, "transfer1[0].amount", transfer1.items[0].amount, 2500*basketFactor)
|
||||
checkU64(t, "transfer1[0].value", transfer1.items[0].value, 25000)
|
||||
checkU64(t, "transfer1[1].amount", transfer1.items[1].amount, 2500*basketFactor)
|
||||
checkU64(t, "transfer1[1].value", transfer1.items[1].value, 50000)
|
||||
// add more requests
|
||||
s.updateRvFactor(100)
|
||||
s.add(0, 1000, 100, noexp)
|
||||
// transfer 25% of the contents of the basket
|
||||
transfer2 := s.transfer(0.25)
|
||||
checkU64(t, "transfer2[0].amount", transfer2.items[0].amount, (2500+1000)/4*basketFactor)
|
||||
checkU64(t, "transfer2[0].value", transfer2.items[0].value, (25000+10000)/4)
|
||||
checkU64(t, "transfer2[1].amount", transfer2.items[1].amount, 2500/4*basketFactor)
|
||||
checkU64(t, "transfer2[1].value", transfer2.items[1].value, 50000/4)
|
||||
}
|
||||
|
||||
func TestConvertMapping(t *testing.T) {
|
||||
b := requestBasket{items: []basketItem{{3, 3}, {1, 1}, {2, 2}}}
|
||||
oldMap := []string{"req3", "req1", "req2"}
|
||||
newMap := []string{"req1", "req2", "req3", "req4"}
|
||||
init := requestBasket{items: []basketItem{{2, 2}, {4, 4}, {6, 6}, {8, 8}}}
|
||||
bc := b.convertMapping(oldMap, newMap, init)
|
||||
checkU64(t, "bc[0].amount", bc.items[0].amount, 1)
|
||||
checkU64(t, "bc[1].amount", bc.items[1].amount, 2)
|
||||
checkU64(t, "bc[2].amount", bc.items[2].amount, 3)
|
||||
checkU64(t, "bc[3].amount", bc.items[3].amount, 4) // 8 should be scaled down to 4
|
||||
}
|
||||
|
||||
func TestReqValueFactor(t *testing.T) {
|
||||
var ref referenceBasket
|
||||
ref.basket = requestBasket{items: make([]basketItem, 4)}
|
||||
for i := range ref.basket.items {
|
||||
ref.basket.items[i].amount = uint64(i+1) * basketFactor
|
||||
ref.basket.items[i].value = uint64(i+1) * basketFactor
|
||||
}
|
||||
ref.init(4)
|
||||
rvf := ref.reqValueFactor([]uint64{1000, 2000, 3000, 4000})
|
||||
// expected value is (1000000+2000000+3000000+4000000) / (1*1000+2*2000+3*3000+4*4000) = 10000000/30000 = 333.333
|
||||
checkF64(t, "reqValueFactor", rvf, 333.333, 1)
|
||||
}
|
||||
|
||||
func TestNormalize(t *testing.T) {
|
||||
for cycle := 0; cycle < 100; cycle += 1 {
|
||||
// Initialize data for testing
|
||||
valueRange, lower := 1000000, 1000000
|
||||
ref := referenceBasket{basket: requestBasket{items: make([]basketItem, 10)}}
|
||||
for i := 0; i < 10; i++ {
|
||||
ref.basket.items[i].amount = uint64(rand.Intn(valueRange) + lower)
|
||||
ref.basket.items[i].value = uint64(rand.Intn(valueRange) + lower)
|
||||
}
|
||||
ref.normalize()
|
||||
|
||||
// Check whether SUM(amount) ~= SUM(value)
|
||||
var sumAmount, sumValue uint64
|
||||
for i := 0; i < 10; i++ {
|
||||
sumAmount += ref.basket.items[i].amount
|
||||
sumValue += ref.basket.items[i].value
|
||||
}
|
||||
var epsilon = 0.01
|
||||
if float64(sumAmount)*(1+epsilon) < float64(sumValue) || float64(sumAmount)*(1-epsilon) > float64(sumValue) {
|
||||
t.Fatalf("Failed to normalize sumAmount: %d sumValue: %d", sumAmount, sumValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestReqValueAdjustment(t *testing.T) {
|
||||
var s1, s2 serverBasket
|
||||
s1.init(3)
|
||||
s2.init(3)
|
||||
cost1 := []uint64{30000, 60000, 90000}
|
||||
cost2 := []uint64{100000, 200000, 300000}
|
||||
var ref referenceBasket
|
||||
ref.basket = requestBasket{items: make([]basketItem, 3)}
|
||||
for i := range ref.basket.items {
|
||||
ref.basket.items[i].amount = 123 * basketFactor
|
||||
ref.basket.items[i].value = 123 * basketFactor
|
||||
}
|
||||
ref.init(3)
|
||||
// initial reqValues are expected to be {1, 1, 1}
|
||||
checkF64(t, "reqValues[0]", ref.reqValues[0], 1, 0.01)
|
||||
checkF64(t, "reqValues[1]", ref.reqValues[1], 1, 0.01)
|
||||
checkF64(t, "reqValues[2]", ref.reqValues[2], 1, 0.01)
|
||||
var logOffset utils.Fixed64
|
||||
for period := 0; period < 1000; period++ {
|
||||
exp := utils.ExpFactor(logOffset)
|
||||
s1.updateRvFactor(ref.reqValueFactor(cost1))
|
||||
s2.updateRvFactor(ref.reqValueFactor(cost2))
|
||||
// throw in random requests into each basket using their internal pricing
|
||||
for i := 0; i < 1000; i++ {
|
||||
reqType, reqAmount := uint32(rand.Intn(3)), uint32(rand.Intn(10)+1)
|
||||
reqCost := uint64(reqAmount) * cost1[reqType]
|
||||
s1.add(reqType, reqAmount, reqCost, exp)
|
||||
reqType, reqAmount = uint32(rand.Intn(3)), uint32(rand.Intn(10)+1)
|
||||
reqCost = uint64(reqAmount) * cost2[reqType]
|
||||
s2.add(reqType, reqAmount, reqCost, exp)
|
||||
}
|
||||
ref.add(s1.transfer(0.1))
|
||||
ref.add(s2.transfer(0.1))
|
||||
ref.normalize()
|
||||
ref.updateReqValues()
|
||||
logOffset += utils.Float64ToFixed64(0.1)
|
||||
}
|
||||
checkF64(t, "reqValues[0]", ref.reqValues[0], 0.5, 0.01)
|
||||
checkF64(t, "reqValues[1]", ref.reqValues[1], 1, 0.01)
|
||||
checkF64(t, "reqValues[2]", ref.reqValues[2], 1.5, 0.01)
|
||||
}
|
237
les/vflux/client/timestats.go
Normal file
237
les/vflux/client/timestats.go
Normal file
@@ -0,0 +1,237 @@
|
||||
// Copyright 2020 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"io"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/les/utils"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
)
|
||||
|
||||
const (
|
||||
minResponseTime = time.Millisecond * 50
|
||||
maxResponseTime = time.Second * 10
|
||||
timeStatLength = 32
|
||||
weightScaleFactor = 1000000
|
||||
)
|
||||
|
||||
// ResponseTimeStats is the response time distribution of a set of answered requests,
|
||||
// weighted with request value, either served by a single server or aggregated for
|
||||
// multiple servers.
|
||||
// It it a fixed length (timeStatLength) distribution vector with linear interpolation.
|
||||
// The X axis (the time values) are not linear, they should be transformed with
|
||||
// TimeToStatScale and StatScaleToTime.
|
||||
type (
|
||||
ResponseTimeStats struct {
|
||||
stats [timeStatLength]uint64
|
||||
exp uint64
|
||||
}
|
||||
ResponseTimeWeights [timeStatLength]float64
|
||||
)
|
||||
|
||||
var timeStatsLogFactor = (timeStatLength - 1) / (math.Log(float64(maxResponseTime)/float64(minResponseTime)) + 1)
|
||||
|
||||
// TimeToStatScale converts a response time to a distribution vector index. The index
|
||||
// is represented by a float64 so that linear interpolation can be applied.
|
||||
func TimeToStatScale(d time.Duration) float64 {
|
||||
if d < 0 {
|
||||
return 0
|
||||
}
|
||||
r := float64(d) / float64(minResponseTime)
|
||||
if r > 1 {
|
||||
r = math.Log(r) + 1
|
||||
}
|
||||
r *= timeStatsLogFactor
|
||||
if r > timeStatLength-1 {
|
||||
return timeStatLength - 1
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
// StatScaleToTime converts a distribution vector index to a response time. The index
|
||||
// is represented by a float64 so that linear interpolation can be applied.
|
||||
func StatScaleToTime(r float64) time.Duration {
|
||||
r /= timeStatsLogFactor
|
||||
if r > 1 {
|
||||
r = math.Exp(r - 1)
|
||||
}
|
||||
return time.Duration(r * float64(minResponseTime))
|
||||
}
|
||||
|
||||
// TimeoutWeights calculates the weight function used for calculating service value
|
||||
// based on the response time distribution of the received service.
|
||||
// It is based on the request timeout value of the system. It consists of a half cosine
|
||||
// function starting with 1, crossing zero at timeout and reaching -1 at 2*timeout.
|
||||
// After 2*timeout the weight is constant -1.
|
||||
func TimeoutWeights(timeout time.Duration) (res ResponseTimeWeights) {
|
||||
for i := range res {
|
||||
t := StatScaleToTime(float64(i))
|
||||
if t < 2*timeout {
|
||||
res[i] = math.Cos(math.Pi / 2 * float64(t) / float64(timeout))
|
||||
} else {
|
||||
res[i] = -1
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// EncodeRLP implements rlp.Encoder
|
||||
func (rt *ResponseTimeStats) EncodeRLP(w io.Writer) error {
|
||||
enc := struct {
|
||||
Stats [timeStatLength]uint64
|
||||
Exp uint64
|
||||
}{rt.stats, rt.exp}
|
||||
return rlp.Encode(w, &enc)
|
||||
}
|
||||
|
||||
// DecodeRLP implements rlp.Decoder
|
||||
func (rt *ResponseTimeStats) DecodeRLP(s *rlp.Stream) error {
|
||||
var enc struct {
|
||||
Stats [timeStatLength]uint64
|
||||
Exp uint64
|
||||
}
|
||||
if err := s.Decode(&enc); err != nil {
|
||||
return err
|
||||
}
|
||||
rt.stats, rt.exp = enc.Stats, enc.Exp
|
||||
return nil
|
||||
}
|
||||
|
||||
// Add adds a new response time with the given weight to the distribution.
|
||||
func (rt *ResponseTimeStats) Add(respTime time.Duration, weight float64, expFactor utils.ExpirationFactor) {
|
||||
rt.setExp(expFactor.Exp)
|
||||
weight *= expFactor.Factor * weightScaleFactor
|
||||
r := TimeToStatScale(respTime)
|
||||
i := int(r)
|
||||
r -= float64(i)
|
||||
rt.stats[i] += uint64(weight * (1 - r))
|
||||
if i < timeStatLength-1 {
|
||||
rt.stats[i+1] += uint64(weight * r)
|
||||
}
|
||||
}
|
||||
|
||||
// setExp sets the power of 2 exponent of the structure, scaling base values (the vector
|
||||
// itself) up or down if necessary.
|
||||
func (rt *ResponseTimeStats) setExp(exp uint64) {
|
||||
if exp > rt.exp {
|
||||
shift := exp - rt.exp
|
||||
for i, v := range rt.stats {
|
||||
rt.stats[i] = v >> shift
|
||||
}
|
||||
rt.exp = exp
|
||||
}
|
||||
if exp < rt.exp {
|
||||
shift := rt.exp - exp
|
||||
for i, v := range rt.stats {
|
||||
rt.stats[i] = v << shift
|
||||
}
|
||||
rt.exp = exp
|
||||
}
|
||||
}
|
||||
|
||||
// Value calculates the total service value based on the given distribution, using the
|
||||
// specified weight function.
|
||||
func (rt ResponseTimeStats) Value(weights ResponseTimeWeights, expFactor utils.ExpirationFactor) float64 {
|
||||
var v float64
|
||||
for i, s := range rt.stats {
|
||||
v += float64(s) * weights[i]
|
||||
}
|
||||
if v < 0 {
|
||||
return 0
|
||||
}
|
||||
return expFactor.Value(v, rt.exp) / weightScaleFactor
|
||||
}
|
||||
|
||||
// AddStats adds the given ResponseTimeStats to the current one.
|
||||
func (rt *ResponseTimeStats) AddStats(s *ResponseTimeStats) {
|
||||
rt.setExp(s.exp)
|
||||
for i, v := range s.stats {
|
||||
rt.stats[i] += v
|
||||
}
|
||||
}
|
||||
|
||||
// SubStats subtracts the given ResponseTimeStats from the current one.
|
||||
func (rt *ResponseTimeStats) SubStats(s *ResponseTimeStats) {
|
||||
rt.setExp(s.exp)
|
||||
for i, v := range s.stats {
|
||||
if v < rt.stats[i] {
|
||||
rt.stats[i] -= v
|
||||
} else {
|
||||
rt.stats[i] = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Timeout suggests a timeout value based on the previous distribution. The parameter
|
||||
// is the desired rate of timeouts assuming a similar distribution in the future.
|
||||
// Note that the actual timeout should have a sensible minimum bound so that operating
|
||||
// under ideal working conditions for a long time (for example, using a local server
|
||||
// with very low response times) will not make it very hard for the system to accommodate
|
||||
// longer response times in the future.
|
||||
func (rt ResponseTimeStats) Timeout(failRatio float64) time.Duration {
|
||||
var sum uint64
|
||||
for _, v := range rt.stats {
|
||||
sum += v
|
||||
}
|
||||
s := uint64(float64(sum) * failRatio)
|
||||
i := timeStatLength - 1
|
||||
for i > 0 && s >= rt.stats[i] {
|
||||
s -= rt.stats[i]
|
||||
i--
|
||||
}
|
||||
r := float64(i) + 0.5
|
||||
if rt.stats[i] > 0 {
|
||||
r -= float64(s) / float64(rt.stats[i])
|
||||
}
|
||||
if r < 0 {
|
||||
r = 0
|
||||
}
|
||||
th := StatScaleToTime(r)
|
||||
if th > maxResponseTime {
|
||||
th = maxResponseTime
|
||||
}
|
||||
return th
|
||||
}
|
||||
|
||||
// RtDistribution represents a distribution as a series of (X, Y) chart coordinates,
|
||||
// where the X axis is the response time in seconds while the Y axis is the amount of
|
||||
// service value received with a response time close to the X coordinate.
|
||||
type RtDistribution [timeStatLength][2]float64
|
||||
|
||||
// Distribution returns a RtDistribution, optionally normalized to a sum of 1.
|
||||
func (rt ResponseTimeStats) Distribution(normalized bool, expFactor utils.ExpirationFactor) (res RtDistribution) {
|
||||
var mul float64
|
||||
if normalized {
|
||||
var sum uint64
|
||||
for _, v := range rt.stats {
|
||||
sum += v
|
||||
}
|
||||
if sum > 0 {
|
||||
mul = 1 / float64(sum)
|
||||
}
|
||||
} else {
|
||||
mul = expFactor.Value(float64(1)/weightScaleFactor, rt.exp)
|
||||
}
|
||||
for i, v := range rt.stats {
|
||||
res[i][0] = float64(StatScaleToTime(float64(i))) / float64(time.Second)
|
||||
res[i][1] = float64(v) * mul
|
||||
}
|
||||
return
|
||||
}
|
137
les/vflux/client/timestats_test.go
Normal file
137
les/vflux/client/timestats_test.go
Normal file
@@ -0,0 +1,137 @@
|
||||
// Copyright 2020 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"math"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/les/utils"
|
||||
)
|
||||
|
||||
func TestTransition(t *testing.T) {
|
||||
var epsilon = 0.01
|
||||
var cases = []time.Duration{
|
||||
time.Millisecond, minResponseTime,
|
||||
time.Second, time.Second * 5, maxResponseTime,
|
||||
}
|
||||
for _, c := range cases {
|
||||
got := StatScaleToTime(TimeToStatScale(c))
|
||||
if float64(got)*(1+epsilon) < float64(c) || float64(got)*(1-epsilon) > float64(c) {
|
||||
t.Fatalf("Failed to transition back")
|
||||
}
|
||||
}
|
||||
// If the time is too large(exceeds the max response time.
|
||||
got := StatScaleToTime(TimeToStatScale(2 * maxResponseTime))
|
||||
if float64(got)*(1+epsilon) < float64(maxResponseTime) || float64(got)*(1-epsilon) > float64(maxResponseTime) {
|
||||
t.Fatalf("Failed to transition back")
|
||||
}
|
||||
}
|
||||
|
||||
var maxResponseWeights = TimeoutWeights(maxResponseTime)
|
||||
|
||||
func TestValue(t *testing.T) {
|
||||
noexp := utils.ExpirationFactor{Factor: 1}
|
||||
for i := 0; i < 1000; i++ {
|
||||
max := minResponseTime + time.Duration(rand.Int63n(int64(maxResponseTime-minResponseTime)))
|
||||
min := minResponseTime + time.Duration(rand.Int63n(int64(max-minResponseTime)))
|
||||
timeout := max/2 + time.Duration(rand.Int63n(int64(maxResponseTime-max/2)))
|
||||
s := makeRangeStats(min, max, 1000, noexp)
|
||||
value := s.Value(TimeoutWeights(timeout), noexp)
|
||||
// calculate the average weight (the average of the given range of the half cosine
|
||||
// weight function).
|
||||
minx := math.Pi / 2 * float64(min) / float64(timeout)
|
||||
maxx := math.Pi / 2 * float64(max) / float64(timeout)
|
||||
avgWeight := (math.Sin(maxx) - math.Sin(minx)) / (maxx - minx)
|
||||
expv := 1000 * avgWeight
|
||||
if expv < 0 {
|
||||
expv = 0
|
||||
}
|
||||
if value < expv-10 || value > expv+10 {
|
||||
t.Errorf("Value failed (expected %v, got %v)", expv, value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddSubExpire(t *testing.T) {
|
||||
var (
|
||||
sum1, sum2 ResponseTimeStats
|
||||
sum1ValueExp, sum2ValueExp float64
|
||||
logOffset utils.Fixed64
|
||||
)
|
||||
for i := 0; i < 1000; i++ {
|
||||
exp := utils.ExpFactor(logOffset)
|
||||
max := minResponseTime + time.Duration(rand.Int63n(int64(maxResponseTime-minResponseTime)))
|
||||
min := minResponseTime + time.Duration(rand.Int63n(int64(max-minResponseTime)))
|
||||
s := makeRangeStats(min, max, 1000, exp)
|
||||
value := s.Value(maxResponseWeights, exp)
|
||||
sum1.AddStats(&s)
|
||||
sum1ValueExp += value
|
||||
if rand.Intn(2) == 1 {
|
||||
sum2.AddStats(&s)
|
||||
sum2ValueExp += value
|
||||
}
|
||||
logOffset += utils.Float64ToFixed64(0.001 / math.Log(2))
|
||||
sum1ValueExp -= sum1ValueExp * 0.001
|
||||
sum2ValueExp -= sum2ValueExp * 0.001
|
||||
}
|
||||
exp := utils.ExpFactor(logOffset)
|
||||
sum1Value := sum1.Value(maxResponseWeights, exp)
|
||||
if sum1Value < sum1ValueExp*0.99 || sum1Value > sum1ValueExp*1.01 {
|
||||
t.Errorf("sum1Value failed (expected %v, got %v)", sum1ValueExp, sum1Value)
|
||||
}
|
||||
sum2Value := sum2.Value(maxResponseWeights, exp)
|
||||
if sum2Value < sum2ValueExp*0.99 || sum2Value > sum2ValueExp*1.01 {
|
||||
t.Errorf("sum2Value failed (expected %v, got %v)", sum2ValueExp, sum2Value)
|
||||
}
|
||||
diff := sum1
|
||||
diff.SubStats(&sum2)
|
||||
diffValue := diff.Value(maxResponseWeights, exp)
|
||||
diffValueExp := sum1ValueExp - sum2ValueExp
|
||||
if diffValue < diffValueExp*0.99 || diffValue > diffValueExp*1.01 {
|
||||
t.Errorf("diffValue failed (expected %v, got %v)", diffValueExp, diffValue)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTimeout(t *testing.T) {
|
||||
testTimeoutRange(t, 0, time.Second)
|
||||
testTimeoutRange(t, time.Second, time.Second*2)
|
||||
testTimeoutRange(t, time.Second, maxResponseTime)
|
||||
}
|
||||
|
||||
func testTimeoutRange(t *testing.T, min, max time.Duration) {
|
||||
s := makeRangeStats(min, max, 1000, utils.ExpirationFactor{Factor: 1})
|
||||
for i := 2; i < 9; i++ {
|
||||
to := s.Timeout(float64(i) / 10)
|
||||
exp := max - (max-min)*time.Duration(i)/10
|
||||
tol := (max - min) / 50
|
||||
if to < exp-tol || to > exp+tol {
|
||||
t.Errorf("Timeout failed (expected %v, got %v)", exp, to)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func makeRangeStats(min, max time.Duration, amount float64, exp utils.ExpirationFactor) ResponseTimeStats {
|
||||
var s ResponseTimeStats
|
||||
amount /= 1000
|
||||
for i := 0; i < 1000; i++ {
|
||||
s.Add(min+(max-min)*time.Duration(i)/999, amount, exp)
|
||||
}
|
||||
return s
|
||||
}
|
511
les/vflux/client/valuetracker.go
Normal file
511
les/vflux/client/valuetracker.go
Normal file
@@ -0,0 +1,511 @@
|
||||
// Copyright 2020 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/mclock"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/les/utils"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
)
|
||||
|
||||
const (
|
||||
vtVersion = 1 // database encoding format for ValueTracker
|
||||
nvtVersion = 1 // database encoding format for NodeValueTracker
|
||||
)
|
||||
|
||||
var (
|
||||
vtKey = []byte("vt:")
|
||||
vtNodeKey = []byte("vtNode:")
|
||||
)
|
||||
|
||||
// NodeValueTracker collects service value statistics for a specific server node
|
||||
type NodeValueTracker struct {
|
||||
lock sync.Mutex
|
||||
|
||||
rtStats, lastRtStats ResponseTimeStats
|
||||
lastTransfer mclock.AbsTime
|
||||
basket serverBasket
|
||||
reqCosts []uint64
|
||||
reqValues *[]float64
|
||||
}
|
||||
|
||||
// init initializes a NodeValueTracker.
|
||||
// Note that the contents of the referenced reqValues slice will not change; a new
|
||||
// reference is passed if the values are updated by ValueTracker.
|
||||
func (nv *NodeValueTracker) init(now mclock.AbsTime, reqValues *[]float64) {
|
||||
reqTypeCount := len(*reqValues)
|
||||
nv.reqCosts = make([]uint64, reqTypeCount)
|
||||
nv.lastTransfer = now
|
||||
nv.reqValues = reqValues
|
||||
nv.basket.init(reqTypeCount)
|
||||
}
|
||||
|
||||
// updateCosts updates the request cost table of the server. The request value factor
|
||||
// is also updated based on the given cost table and the current reference basket.
|
||||
// Note that the contents of the referenced reqValues slice will not change; a new
|
||||
// reference is passed if the values are updated by ValueTracker.
|
||||
func (nv *NodeValueTracker) updateCosts(reqCosts []uint64, reqValues *[]float64, rvFactor float64) {
|
||||
nv.lock.Lock()
|
||||
defer nv.lock.Unlock()
|
||||
|
||||
nv.reqCosts = reqCosts
|
||||
nv.reqValues = reqValues
|
||||
nv.basket.updateRvFactor(rvFactor)
|
||||
}
|
||||
|
||||
// transferStats returns request basket and response time statistics that should be
|
||||
// added to the global statistics. The contents of the server's own request basket are
|
||||
// gradually transferred to the main reference basket and removed from the server basket
|
||||
// with the specified transfer rate.
|
||||
// The response time statistics are retained at both places and therefore the global
|
||||
// distribution is always the sum of the individual server distributions.
|
||||
func (nv *NodeValueTracker) transferStats(now mclock.AbsTime, transferRate float64) (requestBasket, ResponseTimeStats) {
|
||||
nv.lock.Lock()
|
||||
defer nv.lock.Unlock()
|
||||
|
||||
dt := now - nv.lastTransfer
|
||||
nv.lastTransfer = now
|
||||
if dt < 0 {
|
||||
dt = 0
|
||||
}
|
||||
recentRtStats := nv.rtStats
|
||||
recentRtStats.SubStats(&nv.lastRtStats)
|
||||
nv.lastRtStats = nv.rtStats
|
||||
return nv.basket.transfer(-math.Expm1(-transferRate * float64(dt))), recentRtStats
|
||||
}
|
||||
|
||||
// RtStats returns the node's own response time distribution statistics
|
||||
func (nv *NodeValueTracker) RtStats() ResponseTimeStats {
|
||||
nv.lock.Lock()
|
||||
defer nv.lock.Unlock()
|
||||
|
||||
return nv.rtStats
|
||||
}
|
||||
|
||||
// ValueTracker coordinates service value calculation for individual servers and updates
|
||||
// global statistics
|
||||
type ValueTracker struct {
|
||||
clock mclock.Clock
|
||||
lock sync.Mutex
|
||||
quit chan chan struct{}
|
||||
db ethdb.KeyValueStore
|
||||
connected map[enode.ID]*NodeValueTracker
|
||||
reqTypeCount int
|
||||
|
||||
refBasket referenceBasket
|
||||
mappings [][]string
|
||||
currentMapping int
|
||||
initRefBasket requestBasket
|
||||
rtStats ResponseTimeStats
|
||||
|
||||
transferRate float64
|
||||
statsExpLock sync.RWMutex
|
||||
statsExpRate, offlineExpRate float64
|
||||
statsExpirer utils.Expirer
|
||||
statsExpFactor utils.ExpirationFactor
|
||||
}
|
||||
|
||||
type valueTrackerEncV1 struct {
|
||||
Mappings [][]string
|
||||
RefBasketMapping uint
|
||||
RefBasket requestBasket
|
||||
RtStats ResponseTimeStats
|
||||
ExpOffset, SavedAt uint64
|
||||
}
|
||||
|
||||
type nodeValueTrackerEncV1 struct {
|
||||
RtStats ResponseTimeStats
|
||||
ServerBasketMapping uint
|
||||
ServerBasket requestBasket
|
||||
}
|
||||
|
||||
// RequestInfo is an initializer structure for the service vector.
|
||||
type RequestInfo struct {
|
||||
// Name identifies the request type and is used for re-mapping the service vector if necessary
|
||||
Name string
|
||||
// InitAmount and InitValue are used to initialize the reference basket
|
||||
InitAmount, InitValue float64
|
||||
}
|
||||
|
||||
// NewValueTracker creates a new ValueTracker and loads its previously saved state from
|
||||
// the database if possible.
|
||||
func NewValueTracker(db ethdb.KeyValueStore, clock mclock.Clock, reqInfo []RequestInfo, updatePeriod time.Duration, transferRate, statsExpRate, offlineExpRate float64) *ValueTracker {
|
||||
now := clock.Now()
|
||||
|
||||
initRefBasket := requestBasket{items: make([]basketItem, len(reqInfo))}
|
||||
mapping := make([]string, len(reqInfo))
|
||||
|
||||
var sumAmount, sumValue float64
|
||||
for _, req := range reqInfo {
|
||||
sumAmount += req.InitAmount
|
||||
sumValue += req.InitAmount * req.InitValue
|
||||
}
|
||||
scaleValues := sumAmount * basketFactor / sumValue
|
||||
for i, req := range reqInfo {
|
||||
mapping[i] = req.Name
|
||||
initRefBasket.items[i].amount = uint64(req.InitAmount * basketFactor)
|
||||
initRefBasket.items[i].value = uint64(req.InitAmount * req.InitValue * scaleValues)
|
||||
}
|
||||
|
||||
vt := &ValueTracker{
|
||||
clock: clock,
|
||||
connected: make(map[enode.ID]*NodeValueTracker),
|
||||
quit: make(chan chan struct{}),
|
||||
db: db,
|
||||
reqTypeCount: len(initRefBasket.items),
|
||||
initRefBasket: initRefBasket,
|
||||
transferRate: transferRate,
|
||||
statsExpRate: statsExpRate,
|
||||
offlineExpRate: offlineExpRate,
|
||||
}
|
||||
if vt.loadFromDb(mapping) != nil {
|
||||
// previous state not saved or invalid, init with default values
|
||||
vt.refBasket.basket = initRefBasket
|
||||
vt.mappings = [][]string{mapping}
|
||||
vt.currentMapping = 0
|
||||
}
|
||||
vt.statsExpirer.SetRate(now, statsExpRate)
|
||||
vt.refBasket.init(vt.reqTypeCount)
|
||||
vt.periodicUpdate()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-clock.After(updatePeriod):
|
||||
vt.lock.Lock()
|
||||
vt.periodicUpdate()
|
||||
vt.lock.Unlock()
|
||||
case quit := <-vt.quit:
|
||||
close(quit)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
return vt
|
||||
}
|
||||
|
||||
// StatsExpirer returns the statistics expirer so that other values can be expired
|
||||
// with the same rate as the service value statistics.
|
||||
func (vt *ValueTracker) StatsExpirer() *utils.Expirer {
|
||||
return &vt.statsExpirer
|
||||
}
|
||||
|
||||
// StatsExpirer returns the current expiration factor so that other values can be expired
|
||||
// with the same rate as the service value statistics.
|
||||
func (vt *ValueTracker) StatsExpFactor() utils.ExpirationFactor {
|
||||
vt.statsExpLock.RLock()
|
||||
defer vt.statsExpLock.RUnlock()
|
||||
|
||||
return vt.statsExpFactor
|
||||
}
|
||||
|
||||
// loadFromDb loads the value tracker's state from the database and converts saved
|
||||
// request basket index mapping if it does not match the specified index to name mapping.
|
||||
func (vt *ValueTracker) loadFromDb(mapping []string) error {
|
||||
enc, err := vt.db.Get(vtKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r := bytes.NewReader(enc)
|
||||
var version uint
|
||||
if err := rlp.Decode(r, &version); err != nil {
|
||||
log.Error("Decoding value tracker state failed", "err", err)
|
||||
return err
|
||||
}
|
||||
if version != vtVersion {
|
||||
log.Error("Unknown ValueTracker version", "stored", version, "current", nvtVersion)
|
||||
return fmt.Errorf("Unknown ValueTracker version %d (current version is %d)", version, vtVersion)
|
||||
}
|
||||
var vte valueTrackerEncV1
|
||||
if err := rlp.Decode(r, &vte); err != nil {
|
||||
log.Error("Decoding value tracker state failed", "err", err)
|
||||
return err
|
||||
}
|
||||
logOffset := utils.Fixed64(vte.ExpOffset)
|
||||
dt := time.Now().UnixNano() - int64(vte.SavedAt)
|
||||
if dt > 0 {
|
||||
logOffset += utils.Float64ToFixed64(float64(dt) * vt.offlineExpRate / math.Log(2))
|
||||
}
|
||||
vt.statsExpirer.SetLogOffset(vt.clock.Now(), logOffset)
|
||||
vt.rtStats = vte.RtStats
|
||||
vt.mappings = vte.Mappings
|
||||
vt.currentMapping = -1
|
||||
loop:
|
||||
for i, m := range vt.mappings {
|
||||
if len(m) != len(mapping) {
|
||||
continue loop
|
||||
}
|
||||
for j, s := range mapping {
|
||||
if m[j] != s {
|
||||
continue loop
|
||||
}
|
||||
}
|
||||
vt.currentMapping = i
|
||||
break
|
||||
}
|
||||
if vt.currentMapping == -1 {
|
||||
vt.currentMapping = len(vt.mappings)
|
||||
vt.mappings = append(vt.mappings, mapping)
|
||||
}
|
||||
if int(vte.RefBasketMapping) == vt.currentMapping {
|
||||
vt.refBasket.basket = vte.RefBasket
|
||||
} else {
|
||||
if vte.RefBasketMapping >= uint(len(vt.mappings)) {
|
||||
log.Error("Unknown request basket mapping", "stored", vte.RefBasketMapping, "current", vt.currentMapping)
|
||||
return fmt.Errorf("Unknown request basket mapping %d (current version is %d)", vte.RefBasketMapping, vt.currentMapping)
|
||||
}
|
||||
vt.refBasket.basket = vte.RefBasket.convertMapping(vt.mappings[vte.RefBasketMapping], mapping, vt.initRefBasket)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// saveToDb saves the value tracker's state to the database
|
||||
func (vt *ValueTracker) saveToDb() {
|
||||
vte := valueTrackerEncV1{
|
||||
Mappings: vt.mappings,
|
||||
RefBasketMapping: uint(vt.currentMapping),
|
||||
RefBasket: vt.refBasket.basket,
|
||||
RtStats: vt.rtStats,
|
||||
ExpOffset: uint64(vt.statsExpirer.LogOffset(vt.clock.Now())),
|
||||
SavedAt: uint64(time.Now().UnixNano()),
|
||||
}
|
||||
enc1, err := rlp.EncodeToBytes(uint(vtVersion))
|
||||
if err != nil {
|
||||
log.Error("Encoding value tracker state failed", "err", err)
|
||||
return
|
||||
}
|
||||
enc2, err := rlp.EncodeToBytes(&vte)
|
||||
if err != nil {
|
||||
log.Error("Encoding value tracker state failed", "err", err)
|
||||
return
|
||||
}
|
||||
if err := vt.db.Put(vtKey, append(enc1, enc2...)); err != nil {
|
||||
log.Error("Saving value tracker state failed", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Stop saves the value tracker's state and each loaded node's individual state and
|
||||
// returns after shutting the internal goroutines down.
|
||||
func (vt *ValueTracker) Stop() {
|
||||
quit := make(chan struct{})
|
||||
vt.quit <- quit
|
||||
<-quit
|
||||
vt.lock.Lock()
|
||||
vt.periodicUpdate()
|
||||
for id, nv := range vt.connected {
|
||||
vt.saveNode(id, nv)
|
||||
}
|
||||
vt.connected = nil
|
||||
vt.saveToDb()
|
||||
vt.lock.Unlock()
|
||||
}
|
||||
|
||||
// Register adds a server node to the value tracker
|
||||
func (vt *ValueTracker) Register(id enode.ID) *NodeValueTracker {
|
||||
vt.lock.Lock()
|
||||
defer vt.lock.Unlock()
|
||||
|
||||
if vt.connected == nil {
|
||||
// ValueTracker has already been stopped
|
||||
return nil
|
||||
}
|
||||
nv := vt.loadOrNewNode(id)
|
||||
nv.init(vt.clock.Now(), &vt.refBasket.reqValues)
|
||||
vt.connected[id] = nv
|
||||
return nv
|
||||
}
|
||||
|
||||
// Unregister removes a server node from the value tracker
|
||||
func (vt *ValueTracker) Unregister(id enode.ID) {
|
||||
vt.lock.Lock()
|
||||
defer vt.lock.Unlock()
|
||||
|
||||
if nv := vt.connected[id]; nv != nil {
|
||||
vt.saveNode(id, nv)
|
||||
delete(vt.connected, id)
|
||||
}
|
||||
}
|
||||
|
||||
// GetNode returns an individual server node's value tracker. If it did not exist before
|
||||
// then a new node is created.
|
||||
func (vt *ValueTracker) GetNode(id enode.ID) *NodeValueTracker {
|
||||
vt.lock.Lock()
|
||||
defer vt.lock.Unlock()
|
||||
|
||||
return vt.loadOrNewNode(id)
|
||||
}
|
||||
|
||||
// loadOrNewNode returns an individual server node's value tracker. If it did not exist before
|
||||
// then a new node is created.
|
||||
func (vt *ValueTracker) loadOrNewNode(id enode.ID) *NodeValueTracker {
|
||||
if nv, ok := vt.connected[id]; ok {
|
||||
return nv
|
||||
}
|
||||
nv := &NodeValueTracker{lastTransfer: vt.clock.Now()}
|
||||
enc, err := vt.db.Get(append(vtNodeKey, id[:]...))
|
||||
if err != nil {
|
||||
return nv
|
||||
}
|
||||
r := bytes.NewReader(enc)
|
||||
var version uint
|
||||
if err := rlp.Decode(r, &version); err != nil {
|
||||
log.Error("Failed to decode node value tracker", "id", id, "err", err)
|
||||
return nv
|
||||
}
|
||||
if version != nvtVersion {
|
||||
log.Error("Unknown NodeValueTracker version", "stored", version, "current", nvtVersion)
|
||||
return nv
|
||||
}
|
||||
var nve nodeValueTrackerEncV1
|
||||
if err := rlp.Decode(r, &nve); err != nil {
|
||||
log.Error("Failed to decode node value tracker", "id", id, "err", err)
|
||||
return nv
|
||||
}
|
||||
nv.rtStats = nve.RtStats
|
||||
nv.lastRtStats = nve.RtStats
|
||||
if int(nve.ServerBasketMapping) == vt.currentMapping {
|
||||
nv.basket.basket = nve.ServerBasket
|
||||
} else {
|
||||
if nve.ServerBasketMapping >= uint(len(vt.mappings)) {
|
||||
log.Error("Unknown request basket mapping", "stored", nve.ServerBasketMapping, "current", vt.currentMapping)
|
||||
return nv
|
||||
}
|
||||
nv.basket.basket = nve.ServerBasket.convertMapping(vt.mappings[nve.ServerBasketMapping], vt.mappings[vt.currentMapping], vt.initRefBasket)
|
||||
}
|
||||
return nv
|
||||
}
|
||||
|
||||
// saveNode saves a server node's value tracker to the database
|
||||
func (vt *ValueTracker) saveNode(id enode.ID, nv *NodeValueTracker) {
|
||||
recentRtStats := nv.rtStats
|
||||
recentRtStats.SubStats(&nv.lastRtStats)
|
||||
vt.rtStats.AddStats(&recentRtStats)
|
||||
nv.lastRtStats = nv.rtStats
|
||||
|
||||
nve := nodeValueTrackerEncV1{
|
||||
RtStats: nv.rtStats,
|
||||
ServerBasketMapping: uint(vt.currentMapping),
|
||||
ServerBasket: nv.basket.basket,
|
||||
}
|
||||
enc1, err := rlp.EncodeToBytes(uint(nvtVersion))
|
||||
if err != nil {
|
||||
log.Error("Failed to encode service value information", "id", id, "err", err)
|
||||
return
|
||||
}
|
||||
enc2, err := rlp.EncodeToBytes(&nve)
|
||||
if err != nil {
|
||||
log.Error("Failed to encode service value information", "id", id, "err", err)
|
||||
return
|
||||
}
|
||||
if err := vt.db.Put(append(vtNodeKey, id[:]...), append(enc1, enc2...)); err != nil {
|
||||
log.Error("Failed to save service value information", "id", id, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateCosts updates the node value tracker's request cost table
|
||||
func (vt *ValueTracker) UpdateCosts(nv *NodeValueTracker, reqCosts []uint64) {
|
||||
vt.lock.Lock()
|
||||
defer vt.lock.Unlock()
|
||||
|
||||
nv.updateCosts(reqCosts, &vt.refBasket.reqValues, vt.refBasket.reqValueFactor(reqCosts))
|
||||
}
|
||||
|
||||
// RtStats returns the global response time distribution statistics
|
||||
func (vt *ValueTracker) RtStats() ResponseTimeStats {
|
||||
vt.lock.Lock()
|
||||
defer vt.lock.Unlock()
|
||||
|
||||
vt.periodicUpdate()
|
||||
return vt.rtStats
|
||||
}
|
||||
|
||||
// periodicUpdate transfers individual node data to the global statistics, normalizes
|
||||
// the reference basket and updates request values. The global state is also saved to
|
||||
// the database with each update.
|
||||
func (vt *ValueTracker) periodicUpdate() {
|
||||
now := vt.clock.Now()
|
||||
vt.statsExpLock.Lock()
|
||||
vt.statsExpFactor = utils.ExpFactor(vt.statsExpirer.LogOffset(now))
|
||||
vt.statsExpLock.Unlock()
|
||||
|
||||
for _, nv := range vt.connected {
|
||||
basket, rtStats := nv.transferStats(now, vt.transferRate)
|
||||
vt.refBasket.add(basket)
|
||||
vt.rtStats.AddStats(&rtStats)
|
||||
}
|
||||
vt.refBasket.normalize()
|
||||
vt.refBasket.updateReqValues()
|
||||
for _, nv := range vt.connected {
|
||||
nv.updateCosts(nv.reqCosts, &vt.refBasket.reqValues, vt.refBasket.reqValueFactor(nv.reqCosts))
|
||||
}
|
||||
vt.saveToDb()
|
||||
}
|
||||
|
||||
type ServedRequest struct {
|
||||
ReqType, Amount uint32
|
||||
}
|
||||
|
||||
// Served adds a served request to the node's statistics. An actual request may be composed
|
||||
// of one or more request types (service vector indices).
|
||||
func (vt *ValueTracker) Served(nv *NodeValueTracker, reqs []ServedRequest, respTime time.Duration) {
|
||||
vt.statsExpLock.RLock()
|
||||
expFactor := vt.statsExpFactor
|
||||
vt.statsExpLock.RUnlock()
|
||||
|
||||
nv.lock.Lock()
|
||||
defer nv.lock.Unlock()
|
||||
|
||||
var value float64
|
||||
for _, r := range reqs {
|
||||
nv.basket.add(r.ReqType, r.Amount, nv.reqCosts[r.ReqType]*uint64(r.Amount), expFactor)
|
||||
value += (*nv.reqValues)[r.ReqType] * float64(r.Amount)
|
||||
}
|
||||
nv.rtStats.Add(respTime, value, vt.statsExpFactor)
|
||||
}
|
||||
|
||||
type RequestStatsItem struct {
|
||||
Name string
|
||||
ReqAmount, ReqValue float64
|
||||
}
|
||||
|
||||
// RequestStats returns the current contents of the reference request basket, with
|
||||
// request values meaning average per request rather than total.
|
||||
func (vt *ValueTracker) RequestStats() []RequestStatsItem {
|
||||
vt.statsExpLock.RLock()
|
||||
expFactor := vt.statsExpFactor
|
||||
vt.statsExpLock.RUnlock()
|
||||
vt.lock.Lock()
|
||||
defer vt.lock.Unlock()
|
||||
|
||||
vt.periodicUpdate()
|
||||
res := make([]RequestStatsItem, len(vt.refBasket.basket.items))
|
||||
for i, item := range vt.refBasket.basket.items {
|
||||
res[i].Name = vt.mappings[vt.currentMapping][i]
|
||||
res[i].ReqAmount = expFactor.Value(float64(item.amount)/basketFactor, vt.refBasket.basket.exp)
|
||||
res[i].ReqValue = vt.refBasket.reqValues[i]
|
||||
}
|
||||
return res
|
||||
}
|
135
les/vflux/client/valuetracker_test.go
Normal file
135
les/vflux/client/valuetracker_test.go
Normal file
@@ -0,0 +1,135 @@
|
||||
// Copyright 2020 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"math"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/mclock"
|
||||
"github.com/ethereum/go-ethereum/ethdb/memorydb"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
|
||||
"github.com/ethereum/go-ethereum/les/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
testReqTypes = 3
|
||||
testNodeCount = 5
|
||||
testReqCount = 10000
|
||||
testRounds = 10
|
||||
)
|
||||
|
||||
func TestValueTracker(t *testing.T) {
|
||||
db := memorydb.New()
|
||||
clock := &mclock.Simulated{}
|
||||
requestList := make([]RequestInfo, testReqTypes)
|
||||
relPrices := make([]float64, testReqTypes)
|
||||
totalAmount := make([]uint64, testReqTypes)
|
||||
for i := range requestList {
|
||||
requestList[i] = RequestInfo{Name: "testreq" + strconv.Itoa(i), InitAmount: 1, InitValue: 1}
|
||||
totalAmount[i] = 1
|
||||
relPrices[i] = rand.Float64() + 0.1
|
||||
}
|
||||
nodes := make([]*NodeValueTracker, testNodeCount)
|
||||
for round := 0; round < testRounds; round++ {
|
||||
makeRequests := round < testRounds-2
|
||||
useExpiration := round == testRounds-1
|
||||
var expRate float64
|
||||
if useExpiration {
|
||||
expRate = math.Log(2) / float64(time.Hour*100)
|
||||
}
|
||||
|
||||
vt := NewValueTracker(db, clock, requestList, time.Minute, 1/float64(time.Hour), expRate, expRate)
|
||||
updateCosts := func(i int) {
|
||||
costList := make([]uint64, testReqTypes)
|
||||
baseCost := rand.Float64()*10000000 + 100000
|
||||
for j := range costList {
|
||||
costList[j] = uint64(baseCost * relPrices[j])
|
||||
}
|
||||
vt.UpdateCosts(nodes[i], costList)
|
||||
}
|
||||
for i := range nodes {
|
||||
nodes[i] = vt.Register(enode.ID{byte(i)})
|
||||
updateCosts(i)
|
||||
}
|
||||
if makeRequests {
|
||||
for i := 0; i < testReqCount; i++ {
|
||||
reqType := rand.Intn(testReqTypes)
|
||||
reqAmount := rand.Intn(10) + 1
|
||||
node := rand.Intn(testNodeCount)
|
||||
respTime := time.Duration((rand.Float64() + 1) * float64(time.Second) * float64(node+1) / testNodeCount)
|
||||
totalAmount[reqType] += uint64(reqAmount)
|
||||
vt.Served(nodes[node], []ServedRequest{{uint32(reqType), uint32(reqAmount)}}, respTime)
|
||||
clock.Run(time.Second)
|
||||
}
|
||||
} else {
|
||||
clock.Run(time.Hour * 100)
|
||||
if useExpiration {
|
||||
for i, a := range totalAmount {
|
||||
totalAmount[i] = a / 2
|
||||
}
|
||||
}
|
||||
}
|
||||
vt.Stop()
|
||||
var sumrp, sumrv float64
|
||||
for i, rp := range relPrices {
|
||||
sumrp += rp
|
||||
sumrv += vt.refBasket.reqValues[i]
|
||||
}
|
||||
for i, rp := range relPrices {
|
||||
ratio := vt.refBasket.reqValues[i] * sumrp / (rp * sumrv)
|
||||
if ratio < 0.99 || ratio > 1.01 {
|
||||
t.Errorf("reqValues (%v) does not match relPrices (%v)", vt.refBasket.reqValues, relPrices)
|
||||
break
|
||||
}
|
||||
}
|
||||
exp := utils.ExpFactor(vt.StatsExpirer().LogOffset(clock.Now()))
|
||||
basketAmount := make([]uint64, testReqTypes)
|
||||
for i, bi := range vt.refBasket.basket.items {
|
||||
basketAmount[i] += uint64(exp.Value(float64(bi.amount), vt.refBasket.basket.exp))
|
||||
}
|
||||
if makeRequests {
|
||||
// if we did not make requests in this round then we expect all amounts to be
|
||||
// in the reference basket
|
||||
for _, node := range nodes {
|
||||
for i, bi := range node.basket.basket.items {
|
||||
basketAmount[i] += uint64(exp.Value(float64(bi.amount), node.basket.basket.exp))
|
||||
}
|
||||
}
|
||||
}
|
||||
for i, a := range basketAmount {
|
||||
amount := a / basketFactor
|
||||
if amount+10 < totalAmount[i] || amount > totalAmount[i]+10 {
|
||||
t.Errorf("totalAmount[%d] mismatch in round %d (expected %d, got %d)", i, round, totalAmount[i], amount)
|
||||
}
|
||||
}
|
||||
var sumValue float64
|
||||
for _, node := range nodes {
|
||||
s := node.RtStats()
|
||||
sumValue += s.Value(maxResponseWeights, exp)
|
||||
}
|
||||
s := vt.RtStats()
|
||||
mainValue := s.Value(maxResponseWeights, exp)
|
||||
if sumValue < mainValue-10 || sumValue > mainValue+10 {
|
||||
t.Errorf("Main rtStats value does not match sum of node rtStats values in round %d (main %v, sum %v)", round, mainValue, sumValue)
|
||||
}
|
||||
}
|
||||
}
|
128
les/vflux/client/wrsiterator.go
Normal file
128
les/vflux/client/wrsiterator.go
Normal file
@@ -0,0 +1,128 @@
|
||||
// Copyright 2020 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/les/utils"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/nodestate"
|
||||
)
|
||||
|
||||
// WrsIterator returns nodes from the specified selectable set with a weighted random
|
||||
// selection. Selection weights are provided by a callback function.
|
||||
type WrsIterator struct {
|
||||
lock sync.Mutex
|
||||
cond *sync.Cond
|
||||
|
||||
ns *nodestate.NodeStateMachine
|
||||
wrs *utils.WeightedRandomSelect
|
||||
nextNode *enode.Node
|
||||
closed bool
|
||||
}
|
||||
|
||||
// NewWrsIterator creates a new WrsIterator. Nodes are selectable if they have all the required
|
||||
// and none of the disabled flags set. When a node is selected the selectedFlag is set which also
|
||||
// disables further selectability until it is removed or times out.
|
||||
func NewWrsIterator(ns *nodestate.NodeStateMachine, requireFlags, disableFlags nodestate.Flags, weightField nodestate.Field) *WrsIterator {
|
||||
wfn := func(i interface{}) uint64 {
|
||||
n := ns.GetNode(i.(enode.ID))
|
||||
if n == nil {
|
||||
return 0
|
||||
}
|
||||
wt, _ := ns.GetField(n, weightField).(uint64)
|
||||
return wt
|
||||
}
|
||||
|
||||
w := &WrsIterator{
|
||||
ns: ns,
|
||||
wrs: utils.NewWeightedRandomSelect(wfn),
|
||||
}
|
||||
w.cond = sync.NewCond(&w.lock)
|
||||
|
||||
ns.SubscribeField(weightField, func(n *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
|
||||
if state.HasAll(requireFlags) && state.HasNone(disableFlags) {
|
||||
w.lock.Lock()
|
||||
w.wrs.Update(n.ID())
|
||||
w.lock.Unlock()
|
||||
w.cond.Signal()
|
||||
}
|
||||
})
|
||||
|
||||
ns.SubscribeState(requireFlags.Or(disableFlags), func(n *enode.Node, oldState, newState nodestate.Flags) {
|
||||
oldMatch := oldState.HasAll(requireFlags) && oldState.HasNone(disableFlags)
|
||||
newMatch := newState.HasAll(requireFlags) && newState.HasNone(disableFlags)
|
||||
if newMatch == oldMatch {
|
||||
return
|
||||
}
|
||||
|
||||
w.lock.Lock()
|
||||
if newMatch {
|
||||
w.wrs.Update(n.ID())
|
||||
} else {
|
||||
w.wrs.Remove(n.ID())
|
||||
}
|
||||
w.lock.Unlock()
|
||||
w.cond.Signal()
|
||||
})
|
||||
return w
|
||||
}
|
||||
|
||||
// Next selects the next node.
|
||||
func (w *WrsIterator) Next() bool {
|
||||
w.nextNode = w.chooseNode()
|
||||
return w.nextNode != nil
|
||||
}
|
||||
|
||||
func (w *WrsIterator) chooseNode() *enode.Node {
|
||||
w.lock.Lock()
|
||||
defer w.lock.Unlock()
|
||||
|
||||
for {
|
||||
for !w.closed && w.wrs.IsEmpty() {
|
||||
w.cond.Wait()
|
||||
}
|
||||
if w.closed {
|
||||
return nil
|
||||
}
|
||||
// Choose the next node at random. Even though w.wrs is guaranteed
|
||||
// non-empty here, Choose might return nil if all items have weight
|
||||
// zero.
|
||||
if c := w.wrs.Choose(); c != nil {
|
||||
id := c.(enode.ID)
|
||||
w.wrs.Remove(id)
|
||||
return w.ns.GetNode(id)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Close ends the iterator.
|
||||
func (w *WrsIterator) Close() {
|
||||
w.lock.Lock()
|
||||
w.closed = true
|
||||
w.lock.Unlock()
|
||||
w.cond.Signal()
|
||||
}
|
||||
|
||||
// Node returns the current node.
|
||||
func (w *WrsIterator) Node() *enode.Node {
|
||||
w.lock.Lock()
|
||||
defer w.lock.Unlock()
|
||||
return w.nextNode
|
||||
}
|
103
les/vflux/client/wrsiterator_test.go
Normal file
103
les/vflux/client/wrsiterator_test.go
Normal file
@@ -0,0 +1,103 @@
|
||||
// Copyright 2020 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/mclock"
|
||||
"github.com/ethereum/go-ethereum/p2p/nodestate"
|
||||
)
|
||||
|
||||
var (
|
||||
testSetup = &nodestate.Setup{}
|
||||
sfTest1 = testSetup.NewFlag("test1")
|
||||
sfTest2 = testSetup.NewFlag("test2")
|
||||
sfTest3 = testSetup.NewFlag("test3")
|
||||
sfTest4 = testSetup.NewFlag("test4")
|
||||
sfiTestWeight = testSetup.NewField("nodeWeight", reflect.TypeOf(uint64(0)))
|
||||
)
|
||||
|
||||
const iterTestNodeCount = 6
|
||||
|
||||
func TestWrsIterator(t *testing.T) {
|
||||
ns := nodestate.NewNodeStateMachine(nil, nil, &mclock.Simulated{}, testSetup)
|
||||
w := NewWrsIterator(ns, sfTest2, sfTest3.Or(sfTest4), sfiTestWeight)
|
||||
ns.Start()
|
||||
for i := 1; i <= iterTestNodeCount; i++ {
|
||||
ns.SetState(testNode(i), sfTest1, nodestate.Flags{}, 0)
|
||||
ns.SetField(testNode(i), sfiTestWeight, uint64(1))
|
||||
}
|
||||
next := func() int {
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
w.Next()
|
||||
close(ch)
|
||||
}()
|
||||
select {
|
||||
case <-ch:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("Iterator.Next() timeout")
|
||||
}
|
||||
node := w.Node()
|
||||
ns.SetState(node, sfTest4, nodestate.Flags{}, 0)
|
||||
return testNodeIndex(node.ID())
|
||||
}
|
||||
set := make(map[int]bool)
|
||||
expset := func() {
|
||||
for len(set) > 0 {
|
||||
n := next()
|
||||
if !set[n] {
|
||||
t.Errorf("Item returned by iterator not in the expected set (got %d)", n)
|
||||
}
|
||||
delete(set, n)
|
||||
}
|
||||
}
|
||||
|
||||
ns.SetState(testNode(1), sfTest2, nodestate.Flags{}, 0)
|
||||
ns.SetState(testNode(2), sfTest2, nodestate.Flags{}, 0)
|
||||
ns.SetState(testNode(3), sfTest2, nodestate.Flags{}, 0)
|
||||
set[1] = true
|
||||
set[2] = true
|
||||
set[3] = true
|
||||
expset()
|
||||
ns.SetState(testNode(4), sfTest2, nodestate.Flags{}, 0)
|
||||
ns.SetState(testNode(5), sfTest2.Or(sfTest3), nodestate.Flags{}, 0)
|
||||
ns.SetState(testNode(6), sfTest2, nodestate.Flags{}, 0)
|
||||
set[4] = true
|
||||
set[6] = true
|
||||
expset()
|
||||
ns.SetField(testNode(2), sfiTestWeight, uint64(0))
|
||||
ns.SetState(testNode(1), nodestate.Flags{}, sfTest4, 0)
|
||||
ns.SetState(testNode(2), nodestate.Flags{}, sfTest4, 0)
|
||||
ns.SetState(testNode(3), nodestate.Flags{}, sfTest4, 0)
|
||||
set[1] = true
|
||||
set[3] = true
|
||||
expset()
|
||||
ns.SetField(testNode(2), sfiTestWeight, uint64(1))
|
||||
ns.SetState(testNode(2), nodestate.Flags{}, sfTest2, 0)
|
||||
ns.SetState(testNode(1), nodestate.Flags{}, sfTest4, 0)
|
||||
ns.SetState(testNode(2), sfTest2, sfTest4, 0)
|
||||
ns.SetState(testNode(3), nodestate.Flags{}, sfTest4, 0)
|
||||
set[1] = true
|
||||
set[2] = true
|
||||
set[3] = true
|
||||
expset()
|
||||
ns.Stop()
|
||||
}
|
Reference in New Issue
Block a user