eth/downloader: adaptive quality of service tuning
This commit is contained in:
@ -54,14 +54,15 @@ var (
|
||||
blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
|
||||
blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired
|
||||
|
||||
headerTargetRTT = time.Second // [eth/62] Target time for completing a header retrieval request (only for measurements for now)
|
||||
headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out
|
||||
bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request
|
||||
bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired
|
||||
receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request
|
||||
receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired
|
||||
stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request
|
||||
stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired
|
||||
rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests
|
||||
rttMaxEstimate = 20 * time.Second // Maximum round-trip time to target for download requests
|
||||
rttMinConfidence = 0.1 // Worst (lowest) confidence factor in our estimated RTT value
|
||||
ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion
|
||||
ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts
|
||||
|
||||
qosTuningPeers = 5 // Number of peers to tune based on (best peers)
|
||||
qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
|
||||
qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value
|
||||
|
||||
maxQueuedHashes = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
|
||||
maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
|
||||
@ -113,7 +114,8 @@ type Downloader struct {
|
||||
fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
|
||||
fsPivotFails int // Number of fast sync failures in the critical section
|
||||
|
||||
interrupt int32 // Atomic boolean to signal termination
|
||||
rttEstimate uint64 // Round trip time to target for download requests
|
||||
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
|
||||
|
||||
// Statistics
|
||||
syncStatsChainOrigin uint64 // Origin block number where syncing started at
|
||||
@ -159,6 +161,9 @@ type Downloader struct {
|
||||
cancelCh chan struct{} // Channel to cancel mid-flight syncs
|
||||
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
|
||||
|
||||
quitCh chan struct{} // Quit channel to signal termination
|
||||
quitLock sync.RWMutex // Lock to prevent double closes
|
||||
|
||||
// Testing hooks
|
||||
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
|
||||
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
|
||||
@ -172,11 +177,13 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha
|
||||
headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn,
|
||||
insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader {
|
||||
|
||||
return &Downloader{
|
||||
dl := &Downloader{
|
||||
mode: FullSync,
|
||||
mux: mux,
|
||||
queue: newQueue(stateDb),
|
||||
peers: newPeerSet(),
|
||||
rttEstimate: uint64(rttMaxEstimate),
|
||||
rttConfidence: uint64(1000000),
|
||||
hasHeader: hasHeader,
|
||||
hasBlockAndState: hasBlockAndState,
|
||||
getHeader: getHeader,
|
||||
@ -203,7 +210,10 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha
|
||||
receiptWakeCh: make(chan bool, 1),
|
||||
stateWakeCh: make(chan bool, 1),
|
||||
headerProcCh: make(chan []*types.Header, 1),
|
||||
quitCh: make(chan struct{}),
|
||||
}
|
||||
go dl.qosTuner()
|
||||
return dl
|
||||
}
|
||||
|
||||
// Progress retrieves the synchronisation boundaries, specifically the origin
|
||||
@ -250,6 +260,8 @@ func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
|
||||
glog.V(logger.Error).Infoln("Register failed:", err)
|
||||
return err
|
||||
}
|
||||
d.qosReduceConfidence()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -515,7 +527,16 @@ func (d *Downloader) cancel() {
|
||||
// Terminate interrupts the downloader, canceling all pending operations.
|
||||
// The downloader cannot be reused after calling Terminate.
//
// It closes quitCh at most once (the check-then-close is guarded by quitLock)
// and then calls cancel to abort any pending download requests.
|
||||
func (d *Downloader) Terminate() {
|
||||
// Raise the atomic interrupt flag.
// NOTE(review): this view is a rendered diff without +/- markers; this line
// may be the pre-change form of the quit-channel signalling below — confirm
// against the actual file before relying on it.
atomic.StoreInt32(&d.interrupt, 1)
|
||||
// Close the termination channel (make sure repeated Terminate calls do not
// close it twice, which would panic)
|
||||
// Hold quitLock so concurrent callers cannot both pass the check and reach close
d.quitLock.Lock()
|
||||
select {
|
||||
// A receive on a closed channel succeeds immediately: already terminated, nothing to do
case <-d.quitCh:
|
||||
// Channel still open: this is the first termination request
default:
|
||||
close(d.quitCh)
|
||||
}
|
||||
d.quitLock.Unlock()
|
||||
|
||||
// Cancel any pending download requests
|
||||
d.cancel()
|
||||
}
|
||||
|
||||
@ -932,7 +953,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
|
||||
// Reserve a chunk of hashes for a peer. A nil can mean either that
|
||||
// no more hashes are available, or that the peer is known not to
|
||||
// have them.
|
||||
request := d.queue.ReserveBlocks(peer, peer.BlockCapacity())
|
||||
request := d.queue.ReserveBlocks(peer, peer.BlockCapacity(blockTargetRTT))
|
||||
if request == nil {
|
||||
continue
|
||||
}
|
||||
@ -973,7 +994,7 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
|
||||
// Request the advertised remote head block and wait for the response
|
||||
go p.getRelHeaders(p.head, 1, 0, false)
|
||||
|
||||
timeout := time.After(headerTTL)
|
||||
timeout := time.After(d.requestTTL())
|
||||
for {
|
||||
select {
|
||||
case <-d.cancelCh:
|
||||
@ -1041,7 +1062,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
|
||||
|
||||
// Wait for the remote response to the head fetch
|
||||
number, hash := uint64(0), common.Hash{}
|
||||
timeout := time.After(hashTTL)
|
||||
timeout := time.After(d.requestTTL())
|
||||
|
||||
for finished := false; !finished; {
|
||||
select {
|
||||
@ -1118,7 +1139,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
|
||||
// Split our chain interval in two, and request the hash to cross check
|
||||
check := (start + end) / 2
|
||||
|
||||
timeout := time.After(hashTTL)
|
||||
timeout := time.After(d.requestTTL())
|
||||
go p.getAbsHeaders(uint64(check), 1, 0, false)
|
||||
|
||||
// Wait until a reply arrives to this request
|
||||
@ -1199,7 +1220,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
|
||||
|
||||
getHeaders := func(from uint64) {
|
||||
request = time.Now()
|
||||
timeout.Reset(headerTTL)
|
||||
timeout.Reset(d.requestTTL())
|
||||
|
||||
if skeleton {
|
||||
glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from)
|
||||
@ -1311,13 +1332,13 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
|
||||
pack := packet.(*headerPack)
|
||||
return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh)
|
||||
}
|
||||
expire = func() map[string]int { return d.queue.ExpireHeaders(headerTTL) }
|
||||
expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
|
||||
throttle = func() bool { return false }
|
||||
reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
|
||||
return d.queue.ReserveHeaders(p, count), false, nil
|
||||
}
|
||||
fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
|
||||
capacity = func(p *peer) int { return p.HeaderCapacity() }
|
||||
capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) }
|
||||
setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
|
||||
)
|
||||
err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
|
||||
@ -1341,9 +1362,9 @@ func (d *Downloader) fetchBodies(from uint64) error {
|
||||
pack := packet.(*bodyPack)
|
||||
return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
|
||||
}
|
||||
expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) }
|
||||
expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
|
||||
fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
|
||||
capacity = func(p *peer) int { return p.BlockCapacity() }
|
||||
capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) }
|
||||
setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
|
||||
)
|
||||
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
|
||||
@ -1365,9 +1386,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
|
||||
pack := packet.(*receiptPack)
|
||||
return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
|
||||
}
|
||||
expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) }
|
||||
expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
|
||||
fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
|
||||
capacity = func(p *peer) int { return p.ReceiptCapacity() }
|
||||
capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) }
|
||||
setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
|
||||
)
|
||||
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
|
||||
@ -1417,13 +1438,13 @@ func (d *Downloader) fetchNodeData() error {
|
||||
}
|
||||
})
|
||||
}
|
||||
expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }
|
||||
expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) }
|
||||
throttle = func() bool { return false }
|
||||
reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
|
||||
return d.queue.ReserveNodeData(p, count), false, nil
|
||||
}
|
||||
fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
|
||||
capacity = func(p *peer) int { return p.NodeDataCapacity() }
|
||||
capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) }
|
||||
setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
|
||||
)
|
||||
err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
|
||||
@ -1799,8 +1820,10 @@ func (d *Downloader) processContent() error {
|
||||
}
|
||||
for len(results) != 0 {
|
||||
// Check for any termination requests
|
||||
if atomic.LoadInt32(&d.interrupt) == 1 {
|
||||
select {
|
||||
case <-d.quitCh:
|
||||
return errCancelContentProcessing
|
||||
default:
|
||||
}
|
||||
// Retrieve a batch of results to import
|
||||
var (
|
||||
@ -1901,3 +1924,74 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i
|
||||
return errNoSyncActive
|
||||
}
|
||||
}
|
||||
|
||||
// qosTuner is the quality of service tuning loop that occasionally gathers the
|
||||
// peer latency statistics and updates the estimated request round trip time.
|
||||
func (d *Downloader) qosTuner() {
|
||||
for {
|
||||
// Retrieve the current median RTT and integrate into the previoust target RTT
|
||||
rtt := time.Duration(float64(1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
|
||||
atomic.StoreUint64(&d.rttEstimate, uint64(rtt))
|
||||
|
||||
// A new RTT cycle passed, increase our confidence in the estimated RTT
|
||||
conf := atomic.LoadUint64(&d.rttConfidence)
|
||||
conf = conf + (1000000-conf)/2
|
||||
atomic.StoreUint64(&d.rttConfidence, conf)
|
||||
|
||||
// Log the new QoS values and sleep until the next RTT
|
||||
glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
|
||||
select {
|
||||
case <-d.quitCh:
|
||||
return
|
||||
case <-time.After(rtt):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// qosReduceConfidence is meant to be called when a new peer joins the downloader's
|
||||
// peer set, needing to reduce the confidence we have in out QoS estimates.
|
||||
func (d *Downloader) qosReduceConfidence() {
|
||||
// If we have a single peer, confidence is always 1
|
||||
peers := uint64(d.peers.Len())
|
||||
if peers == 1 {
|
||||
atomic.StoreUint64(&d.rttConfidence, 1000000)
|
||||
return
|
||||
}
|
||||
// If we have a ton of peers, don't drop confidence)
|
||||
if peers >= uint64(qosConfidenceCap) {
|
||||
return
|
||||
}
|
||||
// Otherwise drop the confidence factor
|
||||
conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
|
||||
if float64(conf)/1000000 < rttMinConfidence {
|
||||
conf = uint64(rttMinConfidence * 1000000)
|
||||
}
|
||||
atomic.StoreUint64(&d.rttConfidence, conf)
|
||||
|
||||
rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
|
||||
glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
|
||||
}
|
||||
|
||||
// requestRTT returns the current target round trip time for a download request
|
||||
// to complete in.
|
||||
//
|
||||
// Note, the returned RTT is .9 of the actually estimated RTT. The reason is that
|
||||
// the downloader tries to adapt queries to the RTT, so multiple RTT values can
|
||||
// be adapted to, but smaller ones are preffered (stabler download stream).
|
||||
func (d *Downloader) requestRTT() time.Duration {
|
||||
return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10
|
||||
}
|
||||
|
||||
// requestTTL returns the current timeout allowance for a single download request
|
||||
// to finish under.
|
||||
func (d *Downloader) requestTTL() time.Duration {
|
||||
var (
|
||||
rtt = time.Duration(atomic.LoadUint64(&d.rttEstimate))
|
||||
conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0
|
||||
)
|
||||
ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
|
||||
if ttl > ttlLimit {
|
||||
ttl = ttlLimit
|
||||
}
|
||||
return ttl
|
||||
}
|
||||
|
Reference in New Issue
Block a user