Files
robot-shop/dispatch/src/main.go

154 lines
3.9 KiB
Go
Raw Normal View History

2018-01-29 17:40:12 +00:00
package main
import (
"fmt"
"log"
"time"
"os"
2018-01-29 17:40:12 +00:00
"github.com/streadway/amqp"
2018-01-31 10:11:58 +00:00
"github.com/instana/golang-sensor"
ot "github.com/opentracing/opentracing-go"
2018-01-31 17:27:17 +00:00
ext "github.com/opentracing/opentracing-go/ext"
2018-01-31 10:11:58 +00:00
)
const (
Service = "Dispatch"
2018-01-29 17:40:12 +00:00
)
var (
amqpUri string
2018-01-29 17:40:12 +00:00
rabbitChan *amqp.Channel
rabbitCloseError chan *amqp.Error
rabbitReady chan bool
)
func connectToRabbitMQ(uri string) *amqp.Connection {
for {
conn, err := amqp.Dial(uri)
if err == nil {
return conn
}
log.Println(err)
log.Printf("Reconnecting to %s\n", uri)
time.Sleep(1 * time.Second)
}
}
func rabbitConnector(uri string) {
var rabbitErr *amqp.Error
for {
rabbitErr = <-rabbitCloseError
if rabbitErr != nil {
log.Printf("Connecting to %s\n", amqpUri)
2018-01-30 09:16:51 +00:00
rabbitConn := connectToRabbitMQ(uri)
2018-01-29 17:40:12 +00:00
rabbitConn.NotifyClose(rabbitCloseError)
var err error
// create mappings here
rabbitChan, err = rabbitConn.Channel()
failOnError(err, "Failed to create channel")
// create exchange
err = rabbitChan.ExchangeDeclare("robot-shop", "direct", true, false, false, false, nil)
failOnError(err, "Failed to create exchange")
// create queue
queue, err := rabbitChan.QueueDeclare("orders", true, false, false, false, nil)
failOnError(err, "Failed to create queue")
// bind queue to exchange
err = rabbitChan.QueueBind(queue.Name, "orders", "robot-shop", false, nil)
failOnError(err, "Failed to bind queue")
// signal ready
rabbitReady <- true
}
}
}
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("$s : %s", msg, err)
panic(fmt.Sprintf("%s : %s", msg, err))
}
}
2018-01-31 17:27:17 +00:00
func createSpan(headers map[string]interface{}) {
// headers is map[string]interface{}
// carrier is map[string]string
carrier := make(ot.TextMapCarrier)
// convert by copying k, v
for k, v := range headers {
carrier[k] = v.(string)
}
// opentracing
var span ot.Span
tracer := ot.GlobalTracer()
spanContext, err := tracer.Extract(ot.HTTPHeaders, carrier)
if err == nil {
log.Println("Creating span")
// create span
span = tracer.StartSpan("dispatch", ot.ChildOf(spanContext), ext.SpanKindConsumer)
ext.MessageBusDestination.Set(span, "orders")
ext.Component.Set(span, "dispatch")
defer span.Finish()
time.Sleep(42 * time.Millisecond)
} else {
log.Println("Failed to get span context")
log.Println(err)
}
}
2018-01-29 17:40:12 +00:00
func main() {
2018-01-31 10:11:58 +00:00
// Instana tracing
ot.InitGlobalTracer(instana.NewTracerWithOptions(&instana.Options{
Service: Service,
LogLevel: instana.Info}))
// Init amqpUri
2018-04-16 15:19:38 +01:00
// get host from environment
amqpHost, ok := os.LookupEnv("AMQP_HOST")
if !ok {
amqpHost = "rabbitmq"
}
amqpUri = fmt.Sprintf("amqp://guest:guest@%s:5672/", amqpHost)
2018-01-29 17:40:12 +00:00
// MQ error channel
rabbitCloseError = make(chan *amqp.Error)
// MQ ready channel
rabbitReady = make(chan bool)
go rabbitConnector(amqpUri)
rabbitCloseError <- amqp.ErrClosed
go func() {
for {
// wait for rabbit to be ready
ready := <-rabbitReady
log.Printf("Rabbit MQ ready %v\n", ready)
// subscribe to bound queue
2018-01-31 10:11:58 +00:00
msgs, err := rabbitChan.Consume("orders", "", true, false, false, false, nil)
2018-01-29 17:40:12 +00:00
failOnError(err, "Failed to consume")
for d := range msgs {
log.Printf("Order %s\n", d.Body)
2018-01-31 17:27:17 +00:00
log.Printf("Headers %v\n", d.Headers)
go createSpan(d.Headers)
2018-01-29 17:40:12 +00:00
}
}
}()
log.Println("Waiting for messages")
forever := make(chan bool)
<-forever
}