core, eth, trie: bloom filter for trie node dedup during fast sync (#19489)
* core, eth, trie: bloom filter for trie node dedup during fast sync * eth/downloader, trie: address review comments * core, ethdb, trie: restart fast-sync bloom construction now and again * eth/downloader: initialize fast sync bloom on startup * eth: reenable eth/62 until we properly remove it
This commit is contained in:
37
trie/sync.go
37
trie/sync.go
@@ -76,15 +76,17 @@ type Sync struct {
|
||||
membatch *syncMemBatch // Memory buffer to avoid frequent database writes
|
||||
requests map[common.Hash]*request // Pending requests pertaining to a key hash
|
||||
queue *prque.Prque // Priority queue with the pending requests
|
||||
bloom *SyncBloom // Bloom filter for fast node existence checks
|
||||
}
|
||||
|
||||
// NewSync creates a new trie data download scheduler.
|
||||
func NewSync(root common.Hash, database ethdb.Reader, callback LeafCallback) *Sync {
|
||||
func NewSync(root common.Hash, database ethdb.Reader, callback LeafCallback, bloom *SyncBloom) *Sync {
|
||||
ts := &Sync{
|
||||
database: database,
|
||||
membatch: newSyncMemBatch(),
|
||||
requests: make(map[common.Hash]*request),
|
||||
queue: prque.New(nil),
|
||||
bloom: bloom,
|
||||
}
|
||||
ts.AddSubTrie(root, 0, common.Hash{}, callback)
|
||||
return ts
|
||||
@@ -99,10 +101,14 @@ func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callb
|
||||
if _, ok := s.membatch.batch[root]; ok {
|
||||
return
|
||||
}
|
||||
key := root.Bytes()
|
||||
blob, _ := s.database.Get(key)
|
||||
if local, err := decodeNode(key, blob); local != nil && err == nil {
|
||||
return
|
||||
if s.bloom.Contains(root[:]) {
|
||||
// Bloom filter says this might be a duplicate, double check
|
||||
blob, _ := s.database.Get(root[:])
|
||||
if local, err := decodeNode(root[:], blob); local != nil && err == nil {
|
||||
return
|
||||
}
|
||||
// False positive, bump fault meter
|
||||
bloomFaultMeter.Mark(1)
|
||||
}
|
||||
// Assemble the new sub-trie sync request
|
||||
req := &request{
|
||||
@@ -134,8 +140,13 @@ func (s *Sync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) {
|
||||
if _, ok := s.membatch.batch[hash]; ok {
|
||||
return
|
||||
}
|
||||
if ok, _ := s.database.Has(hash.Bytes()); ok {
|
||||
return
|
||||
if s.bloom.Contains(hash[:]) {
|
||||
// Bloom filter says this might be a duplicate, double check
|
||||
if ok, _ := s.database.Has(hash[:]); ok {
|
||||
return
|
||||
}
|
||||
// False positive, bump fault meter
|
||||
bloomFaultMeter.Mark(1)
|
||||
}
|
||||
// Assemble the new sub-trie sync request
|
||||
req := &request{
|
||||
@@ -219,8 +230,9 @@ func (s *Sync) Commit(dbw ethdb.Writer) (int, error) {
|
||||
if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil {
|
||||
return i, err
|
||||
}
|
||||
s.bloom.Add(key[:])
|
||||
}
|
||||
written := len(s.membatch.order)
|
||||
written := len(s.membatch.order) // TODO(karalabe): could an order change improve write performance?
|
||||
|
||||
// Drop the membatch data and return
|
||||
s.membatch = newSyncMemBatch()
|
||||
@@ -292,8 +304,13 @@ func (s *Sync) children(req *request, object node) ([]*request, error) {
|
||||
if _, ok := s.membatch.batch[hash]; ok {
|
||||
continue
|
||||
}
|
||||
if ok, _ := s.database.Has(node); ok {
|
||||
continue
|
||||
if s.bloom.Contains(node) {
|
||||
// Bloom filter says this might be a duplicate, double check
|
||||
if ok, _ := s.database.Has(node); ok {
|
||||
continue
|
||||
}
|
||||
// False positive, bump fault meter
|
||||
bloomFaultMeter.Mark(1)
|
||||
}
|
||||
// Locally unknown node, schedule for retrieval
|
||||
requests = append(requests, &request{
|
||||
|
Reference in New Issue
Block a user