improved correlation via rabbitmq
This commit is contained in:
2
.env
2
.env
@@ -1,3 +1,3 @@
|
||||
# environment file for docker-compose
|
||||
REPO=robotshop
|
||||
TAG=0.4.2
|
||||
TAG=0.4.3
|
||||
|
@@ -8,10 +8,10 @@ COPY src/ /go/src/github.com/instana/dispatch
|
||||
WORKDIR /go/src/github.com/instana/dispatch
|
||||
|
||||
RUN curl -fsSL -o /usr/local/bin/dep ${DEP_URL} && \
|
||||
chmod +x /usr/local/bin/dep && \
|
||||
dep ensure && \
|
||||
go build -o bin/gorcv
|
||||
chmod +x /usr/local/bin/dep
|
||||
|
||||
# @todo stage this build
|
||||
RUN dep ensure && go build -o bin/gorcv
|
||||
|
||||
# TODO stage this build
|
||||
|
||||
CMD bin/gorcv
|
||||
|
@@ -7,11 +7,12 @@ import (
|
||||
"os"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/streadway/amqp"
|
||||
"github.com/instana/golang-sensor"
|
||||
ot "github.com/opentracing/opentracing-go"
|
||||
ext "github.com/opentracing/opentracing-go/ext"
|
||||
// ext "github.com/opentracing/opentracing-go/ext"
|
||||
otlog "github.com/opentracing/opentracing-go/log"
|
||||
)
|
||||
|
||||
@@ -81,7 +82,19 @@ func failOnError(err error, msg string) {
|
||||
}
|
||||
}
|
||||
|
||||
func createSpan(headers map[string]interface{}) {
|
||||
func getOrderId(order []byte) string {
|
||||
id := "unknown"
|
||||
var f interface{}
|
||||
err := json.Unmarshal(order, &f)
|
||||
if err == nil {
|
||||
m := f.(map[string]interface{})
|
||||
id = m["orderid"].(string)
|
||||
}
|
||||
|
||||
return id
|
||||
}
|
||||
|
||||
func createSpan(headers map[string]interface{}, order string) {
|
||||
// headers is map[string]interface{}
|
||||
// carrier is map[string]string
|
||||
carrier := make(ot.TextMapCarrier)
|
||||
@@ -90,6 +103,9 @@ func createSpan(headers map[string]interface{}) {
|
||||
carrier[k] = v.(string)
|
||||
}
|
||||
|
||||
// get the order id
|
||||
log.Printf("order %s\n", order)
|
||||
|
||||
// opentracing
|
||||
var span ot.Span
|
||||
tracer := ot.GlobalTracer()
|
||||
@@ -97,24 +113,37 @@ func createSpan(headers map[string]interface{}) {
|
||||
if err == nil {
|
||||
log.Println("Creating span")
|
||||
// create span
|
||||
span = tracer.StartSpan("dispatch", ot.ChildOf(spanContext), ext.SpanKindConsumer)
|
||||
ext.MessageBusDestination.Set(span, "orders")
|
||||
ext.Component.Set(span, "dispatch")
|
||||
span = tracer.StartSpan("getOrder", ot.ChildOf(spanContext))
|
||||
span.SetTag("exchange", "robot-shop")
|
||||
span.SetTag("sort", "consume")
|
||||
span.SetTag("address", "rabbitmq")
|
||||
span.SetTag("key", "orders")
|
||||
span.LogFields(otlog.String("orderid", order))
|
||||
defer span.Finish()
|
||||
|
||||
time.Sleep(time.Duration(42 + rand.Int63n(42)) * time.Millisecond)
|
||||
if rand.Intn(100) < errorPercent {
|
||||
span.SetTag("error", true)
|
||||
span.LogFields(
|
||||
otlog.String("error.kind", "Exception"),
|
||||
otlog.String("message", "Failed to dispatch to SOP"))
|
||||
log.Println("Span tagged with error")
|
||||
log.Println("Span tagged with error")
|
||||
}
|
||||
processSale(span)
|
||||
} else {
|
||||
log.Println("Failed to get span context")
|
||||
log.Println(err)
|
||||
}
|
||||
}
|
||||
|
||||
func processSale(parentSpan ot.Span) {
|
||||
tracer := ot.GlobalTracer()
|
||||
span := tracer.StartSpan("processSale", ot.ChildOf(parentSpan.Context()))
|
||||
defer span.Finish()
|
||||
span.LogFields(otlog.String("info", "Order sent for processing"))
|
||||
time.Sleep(time.Duration(42 + rand.Int63n(42)) * time.Millisecond)
|
||||
}
|
||||
|
||||
|
||||
func main() {
|
||||
// Instana tracing
|
||||
@@ -170,7 +199,8 @@ func main() {
|
||||
for d := range msgs {
|
||||
log.Printf("Order %s\n", d.Body)
|
||||
log.Printf("Headers %v\n", d.Headers)
|
||||
go createSpan(d.Headers)
|
||||
id := getOrderId(d.Body)
|
||||
go createSpan(d.Headers, id)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
@@ -108,25 +108,30 @@ def queueOrder(order):
|
||||
# opentracing tracer is automatically set to Instana tracer
|
||||
# start a span
|
||||
|
||||
# context = ot.tracer.current_context()
|
||||
parent_span = ot.tracer.active_span
|
||||
with ot.tracer.start_active_span('queue-order', child_of=parent_span,
|
||||
with ot.tracer.start_active_span('queueOrder', child_of=parent_span,
|
||||
tags={
|
||||
tags.SPAN_KIND: 'producer',
|
||||
tags.COMPONENT: 'payment',
|
||||
'message_bus.destination': 'orders'
|
||||
}
|
||||
) as scope:
|
||||
'exchange': Publisher.EXCHANGE,
|
||||
'key': Publisher.ROUTING_KEY
|
||||
}) as tscope:
|
||||
with ot.tracer.start_active_span('rabbitmq', child_of=tscope.span,
|
||||
tags={
|
||||
'exchange': Publisher.EXCHANGE,
|
||||
'sort': 'publish',
|
||||
'address': Publisher.HOST,
|
||||
'key': Publisher.ROUTING_KEY
|
||||
}
|
||||
) as scope:
|
||||
|
||||
# For screenshot demo requirements optionally add in a bit of delay
|
||||
delay = int(os.getenv('PAYMENT_DELAY_MS', 0))
|
||||
time.sleep(delay / 1000)
|
||||
# For screenshot demo requirements optionally add in a bit of delay
|
||||
delay = int(os.getenv('PAYMENT_DELAY_MS', 0))
|
||||
time.sleep(delay / 1000)
|
||||
|
||||
headers = {}
|
||||
ot.tracer.inject(scope.span.context, ot.Format.HTTP_HEADERS, headers)
|
||||
app.logger.info('msg headers {}'.format(headers))
|
||||
headers = {}
|
||||
ot.tracer.inject(scope.span.context, ot.Format.HTTP_HEADERS, headers)
|
||||
app.logger.info('msg headers {}'.format(headers))
|
||||
|
||||
publisher.publish(order, headers)
|
||||
publisher.publish(order, headers)
|
||||
|
||||
|
||||
# RabbitMQ
|
||||
|
Reference in New Issue
Block a user