Merge pull request #35 from instana/proooooofiles
This commit is contained in:
@@ -1,16 +1,10 @@
|
|||||||
FROM golang:1.12.7
|
FROM golang:1.14
|
||||||
|
|
||||||
ENV GOPATH=/go
|
WORKDIR /go/src/app
|
||||||
|
|
||||||
RUN apt-get update && apt-get install -y go-dep
|
COPY . .
|
||||||
|
|
||||||
WORKDIR /go/src/github.com/instana/dispatch
|
RUN go get -d -v ./...
|
||||||
|
RUN go install -v ./...
|
||||||
|
|
||||||
COPY src/ /go/src/github.com/instana/dispatch
|
CMD dispatch
|
||||||
|
|
||||||
RUN dep init && dep ensure
|
|
||||||
RUN go build -o bin/gorcv
|
|
||||||
|
|
||||||
# TODO stage this build
|
|
||||||
|
|
||||||
CMD bin/gorcv
|
|
||||||
|
8
dispatch/go.mod
Normal file
8
dispatch/go.mod
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
module github.com/instana/robot-shop/dispatch
|
||||||
|
|
||||||
|
go 1.14
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/instana/go-sensor v1.13.1 // indirect
|
||||||
|
github.com/streadway/amqp v1.0.0 // indirect
|
||||||
|
)
|
25
dispatch/go.sum
Normal file
25
dispatch/go.sum
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
|
||||||
|
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
|
||||||
|
github.com/instana/go-sensor v1.13.1 h1:9txDn4B9s2sSRDe30WZs5jxK/Isdk56w/AOxIIa/vRo=
|
||||||
|
github.com/instana/go-sensor v1.13.1/go.mod h1:lDfZvfAyo5DWJ2AvOHINRTUTG5TMdZNnwXXcFRtfZBE=
|
||||||
|
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
|
||||||
|
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||||
|
github.com/looplab/fsm v0.1.0 h1:Qte7Zdn/5hBNbXzP7yxVU4OIFHWXBovyTT2LaBTyC20=
|
||||||
|
github.com/looplab/fsm v0.1.0/go.mod h1:m2VaOfDHxqXBBMgc26m6yUOwkFn8H2AlJDE+jd/uafI=
|
||||||
|
github.com/opentracing/basictracer-go v1.0.0 h1:YyUAhaEfjoWXclZVJ9sGoNct7j4TVk7lZWlQw5UXuoo=
|
||||||
|
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
|
||||||
|
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
|
||||||
|
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
|
||||||
|
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
||||||
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
|
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||||
|
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||||
|
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
|
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||||
|
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
220
dispatch/main.go
Normal file
220
dispatch/main.go
Normal file
@@ -0,0 +1,220 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"math/rand"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/instana/go-sensor"
|
||||||
|
ot "github.com/opentracing/opentracing-go"
|
||||||
|
"github.com/opentracing/opentracing-go/ext"
|
||||||
|
otlog "github.com/opentracing/opentracing-go/log"
|
||||||
|
"github.com/streadway/amqp"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
Service = "dispatch"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
amqpUri string
|
||||||
|
rabbitChan *amqp.Channel
|
||||||
|
rabbitCloseError chan *amqp.Error
|
||||||
|
rabbitReady chan bool
|
||||||
|
errorPercent int
|
||||||
|
)
|
||||||
|
|
||||||
|
func connectToRabbitMQ(uri string) *amqp.Connection {
|
||||||
|
for {
|
||||||
|
conn, err := amqp.Dial(uri)
|
||||||
|
if err == nil {
|
||||||
|
return conn
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Println(err)
|
||||||
|
log.Printf("Reconnecting to %s\n", uri)
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func rabbitConnector(uri string) {
|
||||||
|
var rabbitErr *amqp.Error
|
||||||
|
|
||||||
|
for {
|
||||||
|
rabbitErr = <-rabbitCloseError
|
||||||
|
if rabbitErr == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Connecting to %s\n", amqpUri)
|
||||||
|
rabbitConn := connectToRabbitMQ(uri)
|
||||||
|
rabbitConn.NotifyClose(rabbitCloseError)
|
||||||
|
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// create mappings here
|
||||||
|
rabbitChan, err = rabbitConn.Channel()
|
||||||
|
failOnError(err, "Failed to create channel")
|
||||||
|
|
||||||
|
// create exchange
|
||||||
|
err = rabbitChan.ExchangeDeclare("robot-shop", "direct", true, false, false, false, nil)
|
||||||
|
failOnError(err, "Failed to create exchange")
|
||||||
|
|
||||||
|
// create queue
|
||||||
|
queue, err := rabbitChan.QueueDeclare("orders", true, false, false, false, nil)
|
||||||
|
failOnError(err, "Failed to create queue")
|
||||||
|
|
||||||
|
// bind queue to exchange
|
||||||
|
err = rabbitChan.QueueBind(queue.Name, "orders", "robot-shop", false, nil)
|
||||||
|
failOnError(err, "Failed to bind queue")
|
||||||
|
|
||||||
|
// signal ready
|
||||||
|
rabbitReady <- true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func failOnError(err error, msg string) {
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("%s : %s", msg, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getOrderId(order []byte) string {
|
||||||
|
id := "unknown"
|
||||||
|
var f interface{}
|
||||||
|
err := json.Unmarshal(order, &f)
|
||||||
|
if err == nil {
|
||||||
|
m := f.(map[string]interface{})
|
||||||
|
id = m["orderid"].(string)
|
||||||
|
}
|
||||||
|
|
||||||
|
return id
|
||||||
|
}
|
||||||
|
|
||||||
|
func createSpan(headers map[string]interface{}, order string) {
|
||||||
|
// headers is map[string]interface{}
|
||||||
|
// carrier is map[string]string
|
||||||
|
carrier := make(ot.TextMapCarrier)
|
||||||
|
// convert by copying k, v
|
||||||
|
for k, v := range headers {
|
||||||
|
carrier[k] = v.(string)
|
||||||
|
}
|
||||||
|
|
||||||
|
// get the order id
|
||||||
|
log.Printf("order %s\n", order)
|
||||||
|
|
||||||
|
// opentracing
|
||||||
|
var span ot.Span
|
||||||
|
tracer := ot.GlobalTracer()
|
||||||
|
spanContext, err := tracer.Extract(ot.HTTPHeaders, carrier)
|
||||||
|
if err == nil {
|
||||||
|
log.Println("Creating child span")
|
||||||
|
// create child span
|
||||||
|
span = tracer.StartSpan("getOrder", ot.ChildOf(spanContext))
|
||||||
|
} else {
|
||||||
|
log.Println(err)
|
||||||
|
log.Println("Failed to get context from headers")
|
||||||
|
log.Println("Creating root span")
|
||||||
|
// create root span
|
||||||
|
span = tracer.StartSpan("getOrder")
|
||||||
|
}
|
||||||
|
|
||||||
|
span.SetTag(string(ext.SpanKind), ext.SpanKindConsumerEnum)
|
||||||
|
span.SetTag(string(ext.MessageBusDestination), "robot-shop")
|
||||||
|
span.SetTag("exchange", "robot-shop")
|
||||||
|
span.SetTag("sort", "consume")
|
||||||
|
span.SetTag("address", "rabbitmq")
|
||||||
|
span.SetTag("key", "orders")
|
||||||
|
span.LogFields(otlog.String("orderid", order))
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
time.Sleep(time.Duration(42+rand.Int63n(42)) * time.Millisecond)
|
||||||
|
if rand.Intn(100) < errorPercent {
|
||||||
|
span.SetTag("error", true)
|
||||||
|
span.LogFields(
|
||||||
|
otlog.String("error.kind", "Exception"),
|
||||||
|
otlog.String("message", "Failed to dispatch to SOP"))
|
||||||
|
log.Println("Span tagged with error")
|
||||||
|
}
|
||||||
|
|
||||||
|
processSale(span)
|
||||||
|
}
|
||||||
|
|
||||||
|
func processSale(parentSpan ot.Span) {
|
||||||
|
tracer := ot.GlobalTracer()
|
||||||
|
span := tracer.StartSpan("processSale", ot.ChildOf(parentSpan.Context()))
|
||||||
|
defer span.Finish()
|
||||||
|
span.SetTag(string(ext.SpanKind), "intermediate")
|
||||||
|
span.LogFields(otlog.String("info", "Order sent for processing"))
|
||||||
|
time.Sleep(time.Duration(42+rand.Int63n(42)) * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// Instana tracing
|
||||||
|
ot.InitGlobalTracer(instana.NewTracerWithOptions(&instana.Options{
|
||||||
|
Service: Service,
|
||||||
|
LogLevel: instana.Info,
|
||||||
|
EnableAutoProfile: true,
|
||||||
|
}))
|
||||||
|
|
||||||
|
// Init amqpUri
|
||||||
|
// get host from environment
|
||||||
|
amqpHost, ok := os.LookupEnv("AMQP_HOST")
|
||||||
|
if !ok {
|
||||||
|
amqpHost = "rabbitmq"
|
||||||
|
}
|
||||||
|
amqpUri = fmt.Sprintf("amqp://guest:guest@%s:5672/", amqpHost)
|
||||||
|
|
||||||
|
// get error threshold from environment
|
||||||
|
errorPercent = 0
|
||||||
|
epct, ok := os.LookupEnv("DISPATCH_ERROR_PERCENT")
|
||||||
|
if ok {
|
||||||
|
epcti, err := strconv.Atoi(epct)
|
||||||
|
if err == nil {
|
||||||
|
if epcti > 100 {
|
||||||
|
epcti = 100
|
||||||
|
}
|
||||||
|
if epcti < 0 {
|
||||||
|
epcti = 0
|
||||||
|
}
|
||||||
|
errorPercent = epcti
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Printf("Error Percent is %d\n", errorPercent)
|
||||||
|
|
||||||
|
// MQ error channel
|
||||||
|
rabbitCloseError = make(chan *amqp.Error)
|
||||||
|
|
||||||
|
// MQ ready channel
|
||||||
|
rabbitReady = make(chan bool)
|
||||||
|
|
||||||
|
go rabbitConnector(amqpUri)
|
||||||
|
|
||||||
|
rabbitCloseError <- amqp.ErrClosed
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
// wait for rabbit to be ready
|
||||||
|
ready := <-rabbitReady
|
||||||
|
log.Printf("Rabbit MQ ready %v\n", ready)
|
||||||
|
|
||||||
|
// subscribe to bound queue
|
||||||
|
msgs, err := rabbitChan.Consume("orders", "", true, false, false, false, nil)
|
||||||
|
failOnError(err, "Failed to consume")
|
||||||
|
|
||||||
|
for d := range msgs {
|
||||||
|
log.Printf("Order %s\n", d.Body)
|
||||||
|
log.Printf("Headers %v\n", d.Headers)
|
||||||
|
id := getOrderId(d.Body)
|
||||||
|
go createSpan(d.Headers, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
log.Println("Waiting for messages")
|
||||||
|
select {}
|
||||||
|
}
|
@@ -1,219 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"time"
|
|
||||||
"os"
|
|
||||||
"math/rand"
|
|
||||||
"strconv"
|
|
||||||
"encoding/json"
|
|
||||||
|
|
||||||
"github.com/streadway/amqp"
|
|
||||||
"github.com/instana/go-sensor"
|
|
||||||
ot "github.com/opentracing/opentracing-go"
|
|
||||||
ext "github.com/opentracing/opentracing-go/ext"
|
|
||||||
otlog "github.com/opentracing/opentracing-go/log"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
Service = "dispatch"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
amqpUri string
|
|
||||||
rabbitChan *amqp.Channel
|
|
||||||
rabbitCloseError chan *amqp.Error
|
|
||||||
rabbitReady chan bool
|
|
||||||
errorPercent int
|
|
||||||
)
|
|
||||||
|
|
||||||
func connectToRabbitMQ(uri string) *amqp.Connection {
|
|
||||||
for {
|
|
||||||
conn, err := amqp.Dial(uri)
|
|
||||||
if err == nil {
|
|
||||||
return conn
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Println(err)
|
|
||||||
log.Printf("Reconnecting to %s\n", uri)
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func rabbitConnector(uri string) {
|
|
||||||
var rabbitErr *amqp.Error
|
|
||||||
|
|
||||||
for {
|
|
||||||
rabbitErr = <-rabbitCloseError
|
|
||||||
if rabbitErr != nil {
|
|
||||||
log.Printf("Connecting to %s\n", amqpUri)
|
|
||||||
rabbitConn := connectToRabbitMQ(uri)
|
|
||||||
rabbitConn.NotifyClose(rabbitCloseError)
|
|
||||||
|
|
||||||
var err error
|
|
||||||
|
|
||||||
// create mappings here
|
|
||||||
rabbitChan, err = rabbitConn.Channel()
|
|
||||||
failOnError(err, "Failed to create channel")
|
|
||||||
|
|
||||||
// create exchange
|
|
||||||
err = rabbitChan.ExchangeDeclare("robot-shop", "direct", true, false, false, false, nil)
|
|
||||||
failOnError(err, "Failed to create exchange")
|
|
||||||
|
|
||||||
// create queue
|
|
||||||
queue, err := rabbitChan.QueueDeclare("orders", true, false, false, false, nil)
|
|
||||||
failOnError(err, "Failed to create queue")
|
|
||||||
|
|
||||||
// bind queue to exchange
|
|
||||||
err = rabbitChan.QueueBind(queue.Name, "orders", "robot-shop", false, nil)
|
|
||||||
failOnError(err, "Failed to bind queue")
|
|
||||||
|
|
||||||
// signal ready
|
|
||||||
rabbitReady <- true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func failOnError(err error, msg string) {
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("$s : %s", msg, err)
|
|
||||||
panic(fmt.Sprintf("%s : %s", msg, err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func getOrderId(order []byte) string {
|
|
||||||
id := "unknown"
|
|
||||||
var f interface{}
|
|
||||||
err := json.Unmarshal(order, &f)
|
|
||||||
if err == nil {
|
|
||||||
m := f.(map[string]interface{})
|
|
||||||
id = m["orderid"].(string)
|
|
||||||
}
|
|
||||||
|
|
||||||
return id
|
|
||||||
}
|
|
||||||
|
|
||||||
func createSpan(headers map[string]interface{}, order string) {
|
|
||||||
// headers is map[string]interface{}
|
|
||||||
// carrier is map[string]string
|
|
||||||
carrier := make(ot.TextMapCarrier)
|
|
||||||
// convert by copying k, v
|
|
||||||
for k, v := range headers {
|
|
||||||
carrier[k] = v.(string)
|
|
||||||
}
|
|
||||||
|
|
||||||
// get the order id
|
|
||||||
log.Printf("order %s\n", order)
|
|
||||||
|
|
||||||
// opentracing
|
|
||||||
var span ot.Span
|
|
||||||
tracer := ot.GlobalTracer()
|
|
||||||
spanContext, err := tracer.Extract(ot.HTTPHeaders, carrier)
|
|
||||||
if err == nil {
|
|
||||||
log.Println("Creating child span")
|
|
||||||
// create child span
|
|
||||||
span = tracer.StartSpan("getOrder", ot.ChildOf(spanContext))
|
|
||||||
} else {
|
|
||||||
log.Println(err)
|
|
||||||
log.Println("Failed to get context from headers")
|
|
||||||
log.Println("Creating root span")
|
|
||||||
// create root span
|
|
||||||
span = tracer.StartSpan("getOrder")
|
|
||||||
}
|
|
||||||
|
|
||||||
span.SetTag(string(ext.SpanKind), ext.SpanKindConsumerEnum)
|
|
||||||
span.SetTag(string(ext.MessageBusDestination), "robot-shop")
|
|
||||||
span.SetTag("exchange", "robot-shop")
|
|
||||||
span.SetTag("sort", "consume")
|
|
||||||
span.SetTag("address", "rabbitmq")
|
|
||||||
span.SetTag("key", "orders")
|
|
||||||
span.LogFields(otlog.String("orderid", order))
|
|
||||||
defer span.Finish()
|
|
||||||
|
|
||||||
time.Sleep(time.Duration(42 + rand.Int63n(42)) * time.Millisecond)
|
|
||||||
if rand.Intn(100) < errorPercent {
|
|
||||||
span.SetTag("error", true)
|
|
||||||
span.LogFields(
|
|
||||||
otlog.String("error.kind", "Exception"),
|
|
||||||
otlog.String("message", "Failed to dispatch to SOP"))
|
|
||||||
log.Println("Span tagged with error")
|
|
||||||
}
|
|
||||||
|
|
||||||
processSale(span)
|
|
||||||
}
|
|
||||||
|
|
||||||
func processSale(parentSpan ot.Span) {
|
|
||||||
tracer := ot.GlobalTracer()
|
|
||||||
span := tracer.StartSpan("processSale", ot.ChildOf(parentSpan.Context()))
|
|
||||||
defer span.Finish()
|
|
||||||
span.SetTag(string(ext.SpanKind), "intermediate")
|
|
||||||
span.LogFields(otlog.String("info", "Order sent for processing"))
|
|
||||||
time.Sleep(time.Duration(42 + rand.Int63n(42)) * time.Millisecond)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
// Instana tracing
|
|
||||||
ot.InitGlobalTracer(instana.NewTracerWithOptions(&instana.Options{
|
|
||||||
Service: Service,
|
|
||||||
LogLevel: instana.Info}))
|
|
||||||
|
|
||||||
// Init amqpUri
|
|
||||||
// get host from environment
|
|
||||||
amqpHost, ok := os.LookupEnv("AMQP_HOST")
|
|
||||||
if !ok {
|
|
||||||
amqpHost = "rabbitmq"
|
|
||||||
}
|
|
||||||
amqpUri = fmt.Sprintf("amqp://guest:guest@%s:5672/", amqpHost)
|
|
||||||
|
|
||||||
// get error threshold from environment
|
|
||||||
errorPercent = 0
|
|
||||||
epct, ok := os.LookupEnv("DISPATCH_ERROR_PERCENT")
|
|
||||||
if ok {
|
|
||||||
epcti, err := strconv.Atoi(epct)
|
|
||||||
if err == nil {
|
|
||||||
if epcti > 100 {
|
|
||||||
epcti = 100
|
|
||||||
}
|
|
||||||
if epcti < 0 {
|
|
||||||
epcti = 0
|
|
||||||
}
|
|
||||||
errorPercent = epcti
|
|
||||||
}
|
|
||||||
}
|
|
||||||
log.Printf("Error Percent is %d\n", errorPercent)
|
|
||||||
|
|
||||||
// MQ error channel
|
|
||||||
rabbitCloseError = make(chan *amqp.Error)
|
|
||||||
|
|
||||||
// MQ ready channel
|
|
||||||
rabbitReady = make(chan bool)
|
|
||||||
|
|
||||||
go rabbitConnector(amqpUri)
|
|
||||||
|
|
||||||
rabbitCloseError <- amqp.ErrClosed
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
// wait for rabbit to be ready
|
|
||||||
ready := <-rabbitReady
|
|
||||||
log.Printf("Rabbit MQ ready %v\n", ready)
|
|
||||||
|
|
||||||
// subscribe to bound queue
|
|
||||||
msgs, err := rabbitChan.Consume("orders", "", true, false, false, false, nil)
|
|
||||||
failOnError(err, "Failed to consume")
|
|
||||||
|
|
||||||
for d := range msgs {
|
|
||||||
log.Printf("Order %s\n", d.Body)
|
|
||||||
log.Printf("Headers %v\n", d.Headers)
|
|
||||||
id := getOrderId(d.Body)
|
|
||||||
go createSpan(d.Headers, id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
log.Println("Waiting for messages")
|
|
||||||
forever := make(chan bool)
|
|
||||||
<-forever
|
|
||||||
}
|
|
Reference in New Issue
Block a user