les: fix balance expiration (#22343)
* les/lespay/server: fix balance expiration and add test * les: move client balances to a new db * les: rename lespayDb to lesDb
This commit is contained in:
		@@ -81,7 +81,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
 | 
				
			|||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	lespayDb, err := stack.OpenDatabase("lespay", 0, 0, "eth/db/lespay")
 | 
						lesDb, err := stack.OpenDatabase("les.client", 0, 0, "eth/db/les.client")
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -99,6 +99,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
 | 
				
			|||||||
			chainConfig: chainConfig,
 | 
								chainConfig: chainConfig,
 | 
				
			||||||
			iConfig:     light.DefaultClientIndexerConfig,
 | 
								iConfig:     light.DefaultClientIndexerConfig,
 | 
				
			||||||
			chainDb:     chainDb,
 | 
								chainDb:     chainDb,
 | 
				
			||||||
 | 
								lesDb:       lesDb,
 | 
				
			||||||
			closeCh:     make(chan struct{}),
 | 
								closeCh:     make(chan struct{}),
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		peers:          peers,
 | 
							peers:          peers,
 | 
				
			||||||
@@ -108,13 +109,13 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
 | 
				
			|||||||
		engine:         ethconfig.CreateConsensusEngine(stack, chainConfig, &config.Ethash, nil, false, chainDb),
 | 
							engine:         ethconfig.CreateConsensusEngine(stack, chainConfig, &config.Ethash, nil, false, chainDb),
 | 
				
			||||||
		bloomRequests:  make(chan chan *bloombits.Retrieval),
 | 
							bloomRequests:  make(chan chan *bloombits.Retrieval),
 | 
				
			||||||
		bloomIndexer:   core.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
 | 
							bloomIndexer:   core.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
 | 
				
			||||||
		valueTracker:   vfc.NewValueTracker(lespayDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
 | 
							valueTracker:   vfc.NewValueTracker(lesDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
 | 
				
			||||||
		p2pServer:      stack.Server(),
 | 
							p2pServer:      stack.Server(),
 | 
				
			||||||
		p2pConfig:      &stack.Config().P2P,
 | 
							p2pConfig:      &stack.Config().P2P,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	peers.subscribe((*vtSubscription)(leth.valueTracker))
 | 
						peers.subscribe((*vtSubscription)(leth.valueTracker))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	leth.serverPool = newServerPool(lespayDb, []byte("serverpool:"), leth.valueTracker, time.Second, nil, &mclock.System{}, config.UltraLightServers)
 | 
						leth.serverPool = newServerPool(lesDb, []byte("serverpool:"), leth.valueTracker, time.Second, nil, &mclock.System{}, config.UltraLightServers)
 | 
				
			||||||
	peers.subscribe(leth.serverPool)
 | 
						peers.subscribe(leth.serverPool)
 | 
				
			||||||
	leth.dialCandidates = leth.serverPool.dialIterator
 | 
						leth.dialCandidates = leth.serverPool.dialIterator
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -331,6 +332,7 @@ func (s *LightEthereum) Stop() error {
 | 
				
			|||||||
	s.eventMux.Stop()
 | 
						s.eventMux.Stop()
 | 
				
			||||||
	rawdb.PopUncleanShutdownMarker(s.chainDb)
 | 
						rawdb.PopUncleanShutdownMarker(s.chainDb)
 | 
				
			||||||
	s.chainDb.Close()
 | 
						s.chainDb.Close()
 | 
				
			||||||
 | 
						s.lesDb.Close()
 | 
				
			||||||
	s.wg.Wait()
 | 
						s.wg.Wait()
 | 
				
			||||||
	log.Info("Light ethereum stopped")
 | 
						log.Info("Light ethereum stopped")
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -105,7 +105,7 @@ type clientInfo struct {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// newClientPool creates a new client pool
 | 
					// newClientPool creates a new client pool
 | 
				
			||||||
func newClientPool(ns *nodestate.NodeStateMachine, lespayDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID)) *clientPool {
 | 
					func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID)) *clientPool {
 | 
				
			||||||
	pool := &clientPool{
 | 
						pool := &clientPool{
 | 
				
			||||||
		ns:                  ns,
 | 
							ns:                  ns,
 | 
				
			||||||
		BalanceTrackerSetup: balanceTrackerSetup,
 | 
							BalanceTrackerSetup: balanceTrackerSetup,
 | 
				
			||||||
@@ -115,7 +115,7 @@ func newClientPool(ns *nodestate.NodeStateMachine, lespayDb ethdb.Database, minC
 | 
				
			|||||||
		connectedBias:       connectedBias,
 | 
							connectedBias:       connectedBias,
 | 
				
			||||||
		removePeer:          removePeer,
 | 
							removePeer:          removePeer,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	pool.bt = vfs.NewBalanceTracker(ns, balanceTrackerSetup, lespayDb, clock, &utils.Expirer{}, &utils.Expirer{})
 | 
						pool.bt = vfs.NewBalanceTracker(ns, balanceTrackerSetup, lesDb, clock, &utils.Expirer{}, &utils.Expirer{})
 | 
				
			||||||
	pool.pp = vfs.NewPriorityPool(ns, priorityPoolSetup, clock, minCap, connectedBias, 4)
 | 
						pool.pp = vfs.NewPriorityPool(ns, priorityPoolSetup, clock, minCap, connectedBias, 4)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// set default expiration constants used by tests
 | 
						// set default expiration constants used by tests
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -51,7 +51,7 @@ type lesCommons struct {
 | 
				
			|||||||
	config                       *ethconfig.Config
 | 
						config                       *ethconfig.Config
 | 
				
			||||||
	chainConfig                  *params.ChainConfig
 | 
						chainConfig                  *params.ChainConfig
 | 
				
			||||||
	iConfig                      *light.IndexerConfig
 | 
						iConfig                      *light.IndexerConfig
 | 
				
			||||||
	chainDb                      ethdb.Database
 | 
						chainDb, lesDb               ethdb.Database
 | 
				
			||||||
	chainReader                  chainReader
 | 
						chainReader                  chainReader
 | 
				
			||||||
	chtIndexer, bloomTrieIndexer *core.ChainIndexer
 | 
						chtIndexer, bloomTrieIndexer *core.ChainIndexer
 | 
				
			||||||
	oracle                       *checkpointoracle.CheckpointOracle
 | 
						oracle                       *checkpointoracle.CheckpointOracle
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -85,6 +85,10 @@ type LesServer struct {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*LesServer, error) {
 | 
					func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*LesServer, error) {
 | 
				
			||||||
 | 
						lesDb, err := node.OpenDatabase("les.server", 0, 0, "eth/db/les.server")
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	ns := nodestate.NewNodeStateMachine(nil, nil, mclock.System{}, serverSetup)
 | 
						ns := nodestate.NewNodeStateMachine(nil, nil, mclock.System{}, serverSetup)
 | 
				
			||||||
	// Calculate the number of threads used to service the light client
 | 
						// Calculate the number of threads used to service the light client
 | 
				
			||||||
	// requests based on the user-specified value.
 | 
						// requests based on the user-specified value.
 | 
				
			||||||
@@ -99,6 +103,7 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les
 | 
				
			|||||||
			chainConfig:      e.BlockChain().Config(),
 | 
								chainConfig:      e.BlockChain().Config(),
 | 
				
			||||||
			iConfig:          light.DefaultServerIndexerConfig,
 | 
								iConfig:          light.DefaultServerIndexerConfig,
 | 
				
			||||||
			chainDb:          e.ChainDb(),
 | 
								chainDb:          e.ChainDb(),
 | 
				
			||||||
 | 
								lesDb:            lesDb,
 | 
				
			||||||
			chainReader:      e.BlockChain(),
 | 
								chainReader:      e.BlockChain(),
 | 
				
			||||||
			chtIndexer:       light.NewChtIndexer(e.ChainDb(), nil, params.CHTFrequency, params.HelperTrieProcessConfirmations, true),
 | 
								chtIndexer:       light.NewChtIndexer(e.ChainDb(), nil, params.CHTFrequency, params.HelperTrieProcessConfirmations, true),
 | 
				
			||||||
			bloomTrieIndexer: light.NewBloomTrieIndexer(e.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency, true),
 | 
								bloomTrieIndexer: light.NewBloomTrieIndexer(e.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency, true),
 | 
				
			||||||
@@ -136,7 +141,7 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les
 | 
				
			|||||||
		srv.maxCapacity = totalRecharge
 | 
							srv.maxCapacity = totalRecharge
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	srv.fcManager.SetCapacityLimits(srv.minCapacity, srv.maxCapacity, srv.minCapacity*2)
 | 
						srv.fcManager.SetCapacityLimits(srv.minCapacity, srv.maxCapacity, srv.minCapacity*2)
 | 
				
			||||||
	srv.clientPool = newClientPool(ns, srv.chainDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, srv.dropClient)
 | 
						srv.clientPool = newClientPool(ns, lesDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, srv.dropClient)
 | 
				
			||||||
	srv.clientPool.setDefaultFactors(vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1}, vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1})
 | 
						srv.clientPool.setDefaultFactors(vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1}, vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	checkpoint := srv.latestLocalCheckpoint()
 | 
						checkpoint := srv.latestLocalCheckpoint()
 | 
				
			||||||
@@ -222,6 +227,7 @@ func (s *LesServer) Stop() error {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// Note, bloom trie indexer is closed by parent bloombits indexer.
 | 
						// Note, bloom trie indexer is closed by parent bloombits indexer.
 | 
				
			||||||
	s.chtIndexer.Close()
 | 
						s.chtIndexer.Close()
 | 
				
			||||||
 | 
						s.lesDb.Close()
 | 
				
			||||||
	s.wg.Wait()
 | 
						s.wg.Wait()
 | 
				
			||||||
	log.Info("Les server stopped")
 | 
						log.Info("Les server stopped")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -17,6 +17,7 @@
 | 
				
			|||||||
package server
 | 
					package server
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"math"
 | 
				
			||||||
	"math/rand"
 | 
						"math/rand"
 | 
				
			||||||
	"reflect"
 | 
						"reflect"
 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
@@ -69,7 +70,9 @@ func (b *balanceTestSetup) newNode(capacity uint64) *NodeBalance {
 | 
				
			|||||||
	node := enode.SignNull(&enr.Record{}, enode.ID{})
 | 
						node := enode.SignNull(&enr.Record{}, enode.ID{})
 | 
				
			||||||
	b.ns.SetState(node, testFlag, nodestate.Flags{}, 0)
 | 
						b.ns.SetState(node, testFlag, nodestate.Flags{}, 0)
 | 
				
			||||||
	b.ns.SetField(node, btTestSetup.connAddressField, "")
 | 
						b.ns.SetField(node, btTestSetup.connAddressField, "")
 | 
				
			||||||
	b.ns.SetField(node, ppTestSetup.CapacityField, capacity)
 | 
						if capacity != 0 {
 | 
				
			||||||
 | 
							b.ns.SetField(node, ppTestSetup.CapacityField, capacity)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	n, _ := b.ns.GetField(node, btTestSetup.BalanceField).(*NodeBalance)
 | 
						n, _ := b.ns.GetField(node, btTestSetup.BalanceField).(*NodeBalance)
 | 
				
			||||||
	return n
 | 
						return n
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -398,3 +401,71 @@ func TestCallback(t *testing.T) {
 | 
				
			|||||||
	case <-time.NewTimer(time.Millisecond * 100).C:
 | 
						case <-time.NewTimer(time.Millisecond * 100).C:
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestBalancePersistence(t *testing.T) {
 | 
				
			||||||
 | 
						clock := &mclock.Simulated{}
 | 
				
			||||||
 | 
						ns := nodestate.NewNodeStateMachine(nil, nil, clock, testSetup)
 | 
				
			||||||
 | 
						db := memorydb.New()
 | 
				
			||||||
 | 
						posExp := &utils.Expirer{}
 | 
				
			||||||
 | 
						negExp := &utils.Expirer{}
 | 
				
			||||||
 | 
						posExp.SetRate(clock.Now(), math.Log(2)/float64(time.Hour*2)) // halves every two hours
 | 
				
			||||||
 | 
						negExp.SetRate(clock.Now(), math.Log(2)/float64(time.Hour))   // halves every hour
 | 
				
			||||||
 | 
						bt := NewBalanceTracker(ns, btTestSetup, db, clock, posExp, negExp)
 | 
				
			||||||
 | 
						ns.Start()
 | 
				
			||||||
 | 
						bts := &balanceTestSetup{
 | 
				
			||||||
 | 
							clock: clock,
 | 
				
			||||||
 | 
							ns:    ns,
 | 
				
			||||||
 | 
							bt:    bt,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						var nb *NodeBalance
 | 
				
			||||||
 | 
						exp := func(expPos, expNeg uint64) {
 | 
				
			||||||
 | 
							pos, neg := nb.GetBalance()
 | 
				
			||||||
 | 
							if pos != expPos {
 | 
				
			||||||
 | 
								t.Fatalf("Positive balance incorrect, want %v, got %v", expPos, pos)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if neg != expNeg {
 | 
				
			||||||
 | 
								t.Fatalf("Positive balance incorrect, want %v, got %v", expPos, pos)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						expTotal := func(expTotal uint64) {
 | 
				
			||||||
 | 
							total := bt.TotalTokenAmount()
 | 
				
			||||||
 | 
							if total != expTotal {
 | 
				
			||||||
 | 
								t.Fatalf("Total token amount incorrect, want %v, got %v", expTotal, total)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						expTotal(0)
 | 
				
			||||||
 | 
						nb = bts.newNode(0)
 | 
				
			||||||
 | 
						expTotal(0)
 | 
				
			||||||
 | 
						nb.SetBalance(16000000000, 16000000000)
 | 
				
			||||||
 | 
						exp(16000000000, 16000000000)
 | 
				
			||||||
 | 
						expTotal(16000000000)
 | 
				
			||||||
 | 
						clock.Run(time.Hour * 2)
 | 
				
			||||||
 | 
						exp(8000000000, 4000000000)
 | 
				
			||||||
 | 
						expTotal(8000000000)
 | 
				
			||||||
 | 
						bt.Stop()
 | 
				
			||||||
 | 
						ns.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						clock = &mclock.Simulated{}
 | 
				
			||||||
 | 
						ns = nodestate.NewNodeStateMachine(nil, nil, clock, testSetup)
 | 
				
			||||||
 | 
						posExp = &utils.Expirer{}
 | 
				
			||||||
 | 
						negExp = &utils.Expirer{}
 | 
				
			||||||
 | 
						posExp.SetRate(clock.Now(), math.Log(2)/float64(time.Hour*2)) // halves every two hours
 | 
				
			||||||
 | 
						negExp.SetRate(clock.Now(), math.Log(2)/float64(time.Hour))   // halves every hour
 | 
				
			||||||
 | 
						bt = NewBalanceTracker(ns, btTestSetup, db, clock, posExp, negExp)
 | 
				
			||||||
 | 
						ns.Start()
 | 
				
			||||||
 | 
						bts = &balanceTestSetup{
 | 
				
			||||||
 | 
							clock: clock,
 | 
				
			||||||
 | 
							ns:    ns,
 | 
				
			||||||
 | 
							bt:    bt,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						expTotal(8000000000)
 | 
				
			||||||
 | 
						nb = bts.newNode(0)
 | 
				
			||||||
 | 
						exp(8000000000, 4000000000)
 | 
				
			||||||
 | 
						expTotal(8000000000)
 | 
				
			||||||
 | 
						clock.Run(time.Hour * 2)
 | 
				
			||||||
 | 
						exp(4000000000, 1000000000)
 | 
				
			||||||
 | 
						expTotal(4000000000)
 | 
				
			||||||
 | 
						bt.Stop()
 | 
				
			||||||
 | 
						ns.Stop()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -99,6 +99,10 @@ func NewBalanceTracker(ns *nodestate.NodeStateMachine, setup BalanceTrackerSetup
 | 
				
			|||||||
		balanceTimer:        utils.NewUpdateTimer(clock, time.Second*10),
 | 
							balanceTimer:        utils.NewUpdateTimer(clock, time.Second*10),
 | 
				
			||||||
		quit:                make(chan struct{}),
 | 
							quit:                make(chan struct{}),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						posOffset, negOffset := bt.ndb.getExpiration()
 | 
				
			||||||
 | 
						posExp.SetLogOffset(clock.Now(), posOffset)
 | 
				
			||||||
 | 
						negExp.SetLogOffset(clock.Now(), negOffset)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	bt.ndb.forEachBalance(false, func(id enode.ID, balance utils.ExpiredValue) bool {
 | 
						bt.ndb.forEachBalance(false, func(id enode.ID, balance utils.ExpiredValue) bool {
 | 
				
			||||||
		bt.inactive.AddExp(balance)
 | 
							bt.inactive.AddExp(balance)
 | 
				
			||||||
		return true
 | 
							return true
 | 
				
			||||||
@@ -177,7 +181,7 @@ func (bt *BalanceTracker) TotalTokenAmount() uint64 {
 | 
				
			|||||||
	bt.balanceTimer.Update(func(_ time.Duration) bool {
 | 
						bt.balanceTimer.Update(func(_ time.Duration) bool {
 | 
				
			||||||
		bt.active = utils.ExpiredValue{}
 | 
							bt.active = utils.ExpiredValue{}
 | 
				
			||||||
		bt.ns.ForEach(nodestate.Flags{}, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
 | 
							bt.ns.ForEach(nodestate.Flags{}, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
 | 
				
			||||||
			if n, ok := bt.ns.GetField(node, bt.BalanceField).(*NodeBalance); ok {
 | 
								if n, ok := bt.ns.GetField(node, bt.BalanceField).(*NodeBalance); ok && n.active {
 | 
				
			||||||
				pos, _ := n.GetRawBalance()
 | 
									pos, _ := n.GetRawBalance()
 | 
				
			||||||
				bt.active.AddExp(pos)
 | 
									bt.active.AddExp(pos)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user