swarm: integrate OpenTracing; propagate ctx to internal APIs (#17169)

* swarm: propagate ctx, enable opentracing
* swarm/tracing: log error when tracing is misconfigured
Committed by: Balint Gabor
Parent: f7d3678c28
Commit: 7c9314f231
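The diff below covers the stream package's protocol message handlers. The change is uniform: each handler gains a leading context.Context parameter, that context is threaded into the calls the handler makes (NeedData, GetData, SendPriority, Deliver), and in handleOfferedHashesMsg a tracing span is opened from the incoming context via the swarm/spancontext helper.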
@@ -17,6 +17,7 @@
 package stream
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"sync"
@@ -25,7 +26,9 @@ import (
 	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/swarm/log"
 	bv "github.com/ethereum/go-ethereum/swarm/network/bitvector"
+	"github.com/ethereum/go-ethereum/swarm/spancontext"
 	"github.com/ethereum/go-ethereum/swarm/storage"
+	opentracing "github.com/opentracing/opentracing-go"
 )
 
 // Stream defines a unique stream identifier.
@@ -71,17 +74,17 @@ type RequestSubscriptionMsg struct {
 	Priority uint8 // delivered on priority channel
 }
 
-func (p *Peer) handleRequestSubscription(req *RequestSubscriptionMsg) (err error) {
+func (p *Peer) handleRequestSubscription(ctx context.Context, req *RequestSubscriptionMsg) (err error) {
 	log.Debug(fmt.Sprintf("handleRequestSubscription: streamer %s to subscribe to %s with stream %s", p.streamer.addr.ID(), p.ID(), req.Stream))
 	return p.streamer.Subscribe(p.ID(), req.Stream, req.History, req.Priority)
 }
 
-func (p *Peer) handleSubscribeMsg(req *SubscribeMsg) (err error) {
+func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err error) {
 	metrics.GetOrRegisterCounter("peer.handlesubscribemsg", nil).Inc(1)
 
 	defer func() {
 		if err != nil {
-			if e := p.Send(SubscribeErrorMsg{
+			if e := p.Send(context.TODO(), SubscribeErrorMsg{
 				Error: err.Error(),
 			}); e != nil {
 				log.Error("send stream subscribe error message", "err", err)
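One call site in this hunk deliberately passes a placeholder rather than a real request context: p.Send now takes a context, but is handed context.TODO(). In the standard library, TODO() is an empty, non-nil Context with no deadline, no values, and no cancelation, so the signature change can land first and the handler's real ctx can be threaded in later. A minimal stdlib-only illustration (send is a stand-in name, not go-ethereum code):

    package main

    import (
    	"context"
    	"fmt"
    )

    // send stands in for an API that now requires a Context.
    func send(ctx context.Context, payload string) {
    	_, hasDeadline := ctx.Deadline()
    	// An empty Context: no deadline, never canceled, carries no values.
    	fmt.Printf("payload=%q hasDeadline=%v err=%v\n", payload, hasDeadline, ctx.Err())
    }

    func main() {
    	send(context.TODO(), "SubscribeErrorMsg")
    }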
@@ -181,9 +184,15 @@ func (m OfferedHashesMsg) String() string {
 
 // handleOfferedHashesMsg protocol msg handler calls the incoming streamer interface
 // Filter method
-func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error {
+func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg) error {
 	metrics.GetOrRegisterCounter("peer.handleofferedhashes", nil).Inc(1)
 
+	var sp opentracing.Span
+	ctx, sp = spancontext.StartSpan(
+		ctx,
+		"handle.offered.hashes")
+	defer sp.Finish()
+
 	c, _, err := p.getOrSetClient(req.Stream, req.From, req.To)
 	if err != nil {
 		return err
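The span above is opened through the swarm/spancontext helper rather than through opentracing directly. Its body is not part of this diff; a minimal sketch consistent with the call shape ctx, sp = spancontext.StartSpan(ctx, name) and the opentracing-go API could look like this (an assumption, not the package's actual code):

    package spancontext

    import (
    	"context"

    	opentracing "github.com/opentracing/opentracing-go"
    )

    // StartSpan opens a span named name as a child of any span already
    // carried in ctx, and returns the updated context together with the
    // span so the caller can defer sp.Finish().
    func StartSpan(ctx context.Context, name string) (context.Context, opentracing.Span) {
    	var opts []opentracing.StartSpanOption
    	if parent := opentracing.SpanFromContext(ctx); parent != nil {
    		opts = append(opts, opentracing.ChildOf(parent.Context()))
    	}
    	sp := opentracing.StartSpan(name, opts...)
    	return opentracing.ContextWithSpan(ctx, sp), sp
    }

Returning the updated context is what lets the nested calls in the hunks below (NeedData, SendPriority) participate in the same trace.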
@@ -197,7 +206,7 @@ func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error {
 	for i := 0; i < len(hashes); i += HashSize {
 		hash := hashes[i : i+HashSize]
 
-		if wait := c.NeedData(hash); wait != nil {
+		if wait := c.NeedData(ctx, hash); wait != nil {
 			want.Set(i/HashSize, true)
 			wg.Add(1)
 			// create request and wait until the chunk data arrives and is stored
@@ -260,7 +269,7 @@ func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error {
 			return
 		}
 		log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
-		err := p.SendPriority(msg, c.priority)
+		err := p.SendPriority(ctx, msg, c.priority)
 		if err != nil {
 			log.Warn("SendPriority err, so dropping peer", "err", err)
 			p.Drop(err)
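Passing ctx into SendPriority is what makes sends visible in a trace at all: anything below the handler can hang a child span off the handle.offered.hashes span opened above. A hypothetical wrapper illustrating the idea (tracedSend is illustrative, not a go-ethereum API):

    package main

    import (
    	"context"
    	"errors"
    	"fmt"

    	opentracing "github.com/opentracing/opentracing-go"
    	"github.com/opentracing/opentracing-go/ext"
    )

    // tracedSend runs send under a child span of whatever span ctx carries.
    func tracedSend(ctx context.Context, send func() error) error {
    	sp, _ := opentracing.StartSpanFromContext(ctx, "send.priority")
    	defer sp.Finish()
    	if err := send(); err != nil {
    		ext.Error.Set(sp, true) // tag the span so the trace shows the failure
    		return err
    	}
    	return nil
    }

    func main() {
    	err := tracedSend(context.Background(), func() error { return errors.New("peer gone") })
    	fmt.Println(err)
    }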
@@ -285,7 +294,7 @@ func (m WantedHashesMsg) String() string {
 // handleWantedHashesMsg protocol msg handler
 // * sends the next batch of unsynced keys
 // * sends the actual data chunks as per WantedHashesMsg
-func (p *Peer) handleWantedHashesMsg(req *WantedHashesMsg) error {
+func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) error {
 	metrics.GetOrRegisterCounter("peer.handlewantedhashesmsg", nil).Inc(1)
 
 	log.Trace("received wanted batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To)
@@ -314,7 +323,7 @@
 		metrics.GetOrRegisterCounter("peer.handlewantedhashesmsg.actualget", nil).Inc(1)
 
 		hash := hashes[i*HashSize : (i+1)*HashSize]
-		data, err := s.GetData(hash)
+		data, err := s.GetData(ctx, hash)
 		if err != nil {
 			return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err)
 		}
@@ -323,7 +332,7 @@
 		if length := len(chunk.SData); length < 9 {
 			log.Error("Chunk.SData to sync is too short", "len(chunk.SData)", length, "address", chunk.Addr)
 		}
-		if err := p.Deliver(chunk, s.priority); err != nil {
+		if err := p.Deliver(ctx, chunk, s.priority); err != nil {
 			return err
 		}
 	}
@@ -363,7 +372,7 @@ func (m TakeoverProofMsg) String() string {
 	return fmt.Sprintf("Stream: '%v' [%v-%v], Root: %x, Sig: %x", m.Stream, m.Start, m.End, m.Root, m.Sig)
 }
 
-func (p *Peer) handleTakeoverProofMsg(req *TakeoverProofMsg) error {
+func (p *Peer) handleTakeoverProofMsg(ctx context.Context, req *TakeoverProofMsg) error {
 	_, err := p.getServer(req.Stream)
 	// store the strongest takeoverproof for the stream in streamer
 	return err
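The second bullet of the commit message, "swarm/tracing: log error when tracing is misconfigured", refers to the tracer setup rather than this file. The pattern it describes, sketched below under the assumption of a Jaeger-style opentracing configuration (the actual swarm/tracing wiring may differ), is to report a bad tracer config and carry on instead of aborting:

    package tracing

    import (
    	"io"

    	"github.com/ethereum/go-ethereum/log"
    	jaegercfg "github.com/uber/jaeger-client-go/config"
    )

    // initTracer installs a global tracer from cfg. On misconfiguration it
    // logs the error and returns nil so the node keeps running untraced.
    func initTracer(cfg *jaegercfg.Configuration) io.Closer {
    	closer, err := cfg.InitGlobalTracer("swarm")
    	if err != nil {
    		log.Error("Could not initialize tracer", "err", err)
    		return nil
    	}
    	return closer
    }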