diff --git a/dispatch/Dockerfile b/dispatch/Dockerfile index 3d84a4d..f134dbf 100644 --- a/dispatch/Dockerfile +++ b/dispatch/Dockerfile @@ -1,16 +1,10 @@ -FROM golang:1.12.7 +FROM golang:1.14 -ENV GOPATH=/go +WORKDIR /go/src/app -RUN apt-get update && apt-get install -y go-dep +COPY . . -WORKDIR /go/src/github.com/instana/dispatch +RUN go get -d -v ./... +RUN go install -v ./... -COPY src/ /go/src/github.com/instana/dispatch - -RUN dep init && dep ensure -RUN go build -o bin/gorcv - -# TODO stage this build - -CMD bin/gorcv +CMD dispatch diff --git a/dispatch/go.mod b/dispatch/go.mod new file mode 100644 index 0000000..817f67c --- /dev/null +++ b/dispatch/go.mod @@ -0,0 +1,8 @@ +module github.com/instana/robot-shop/dispatch + +go 1.14 + +require ( + github.com/instana/go-sensor v1.13.1 // indirect + github.com/streadway/amqp v1.0.0 // indirect +) diff --git a/dispatch/go.sum b/dispatch/go.sum new file mode 100644 index 0000000..5d935ea --- /dev/null +++ b/dispatch/go.sum @@ -0,0 +1,25 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= +github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/instana/go-sensor v1.13.1 h1:9txDn4B9s2sSRDe30WZs5jxK/Isdk56w/AOxIIa/vRo= +github.com/instana/go-sensor v1.13.1/go.mod h1:lDfZvfAyo5DWJ2AvOHINRTUTG5TMdZNnwXXcFRtfZBE= +github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/looplab/fsm v0.1.0 h1:Qte7Zdn/5hBNbXzP7yxVU4OIFHWXBovyTT2LaBTyC20= +github.com/looplab/fsm v0.1.0/go.mod h1:m2VaOfDHxqXBBMgc26m6yUOwkFn8H2AlJDE+jd/uafI= +github.com/opentracing/basictracer-go v1.0.0 h1:YyUAhaEfjoWXclZVJ9sGoNct7j4TVk7lZWlQw5UXuoo= +github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= +github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= +github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/dispatch/main.go b/dispatch/main.go new file mode 100644 index 0000000..1073412 --- /dev/null +++ b/dispatch/main.go @@ -0,0 +1,220 @@ +package main + +import ( + "encoding/json" + "fmt" + "log" + "math/rand" + "os" + "strconv" + "time" + + "github.com/instana/go-sensor" + ot "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + otlog "github.com/opentracing/opentracing-go/log" + "github.com/streadway/amqp" +) + +const ( + Service = "dispatch" +) + +var ( + amqpUri string + rabbitChan *amqp.Channel + rabbitCloseError chan *amqp.Error + rabbitReady chan bool + errorPercent int +) + +func connectToRabbitMQ(uri string) *amqp.Connection { + for { + conn, err := amqp.Dial(uri) + if err == nil { + return conn + } + + log.Println(err) + log.Printf("Reconnecting to %s\n", uri) + time.Sleep(1 * time.Second) + } +} + +func rabbitConnector(uri string) { + var rabbitErr *amqp.Error + + for { + rabbitErr = <-rabbitCloseError + if rabbitErr == nil { + return + } + + log.Printf("Connecting to %s\n", amqpUri) + rabbitConn := connectToRabbitMQ(uri) + rabbitConn.NotifyClose(rabbitCloseError) + + var err error + + // create mappings here + rabbitChan, err = rabbitConn.Channel() + failOnError(err, "Failed to create channel") + + // create exchange + err = rabbitChan.ExchangeDeclare("robot-shop", "direct", true, false, false, false, nil) + failOnError(err, "Failed to create exchange") + + // create queue + queue, err := rabbitChan.QueueDeclare("orders", true, false, false, false, nil) + failOnError(err, "Failed to create queue") + + // bind queue to exchange + err = rabbitChan.QueueBind(queue.Name, "orders", "robot-shop", false, nil) + failOnError(err, "Failed to bind queue") + + // signal ready + rabbitReady <- true + } +} + +func failOnError(err error, msg string) { + if err != nil { + log.Fatalf("%s : %s", msg, err) + } +} + +func getOrderId(order []byte) string { + id := "unknown" + var f interface{} + err := json.Unmarshal(order, &f) + if err == nil { + m := f.(map[string]interface{}) + id = m["orderid"].(string) + } + + return id +} + +func createSpan(headers map[string]interface{}, order string) { + // headers is map[string]interface{} + // carrier is map[string]string + carrier := make(ot.TextMapCarrier) + // convert by copying k, v + for k, v := range headers { + carrier[k] = v.(string) + } + + // get the order id + log.Printf("order %s\n", order) + + // opentracing + var span ot.Span + tracer := ot.GlobalTracer() + spanContext, err := tracer.Extract(ot.HTTPHeaders, carrier) + if err == nil { + log.Println("Creating child span") + // create child span + span = tracer.StartSpan("getOrder", ot.ChildOf(spanContext)) + } else { + log.Println(err) + log.Println("Failed to get context from headers") + log.Println("Creating root span") + // create root span + span = tracer.StartSpan("getOrder") + } + + span.SetTag(string(ext.SpanKind), ext.SpanKindConsumerEnum) + span.SetTag(string(ext.MessageBusDestination), "robot-shop") + span.SetTag("exchange", "robot-shop") + span.SetTag("sort", "consume") + span.SetTag("address", "rabbitmq") + span.SetTag("key", "orders") + span.LogFields(otlog.String("orderid", order)) + defer span.Finish() + + time.Sleep(time.Duration(42+rand.Int63n(42)) * time.Millisecond) + if rand.Intn(100) < errorPercent { + span.SetTag("error", true) + span.LogFields( + otlog.String("error.kind", "Exception"), + otlog.String("message", "Failed to dispatch to SOP")) + log.Println("Span tagged with error") + } + + processSale(span) +} + +func processSale(parentSpan ot.Span) { + tracer := ot.GlobalTracer() + span := tracer.StartSpan("processSale", ot.ChildOf(parentSpan.Context())) + defer span.Finish() + span.SetTag(string(ext.SpanKind), "intermediate") + span.LogFields(otlog.String("info", "Order sent for processing")) + time.Sleep(time.Duration(42+rand.Int63n(42)) * time.Millisecond) +} + +func main() { + // Instana tracing + ot.InitGlobalTracer(instana.NewTracerWithOptions(&instana.Options{ + Service: Service, + LogLevel: instana.Info, + EnableAutoProfile: true, + })) + + // Init amqpUri + // get host from environment + amqpHost, ok := os.LookupEnv("AMQP_HOST") + if !ok { + amqpHost = "rabbitmq" + } + amqpUri = fmt.Sprintf("amqp://guest:guest@%s:5672/", amqpHost) + + // get error threshold from environment + errorPercent = 0 + epct, ok := os.LookupEnv("DISPATCH_ERROR_PERCENT") + if ok { + epcti, err := strconv.Atoi(epct) + if err == nil { + if epcti > 100 { + epcti = 100 + } + if epcti < 0 { + epcti = 0 + } + errorPercent = epcti + } + } + log.Printf("Error Percent is %d\n", errorPercent) + + // MQ error channel + rabbitCloseError = make(chan *amqp.Error) + + // MQ ready channel + rabbitReady = make(chan bool) + + go rabbitConnector(amqpUri) + + rabbitCloseError <- amqp.ErrClosed + + go func() { + for { + // wait for rabbit to be ready + ready := <-rabbitReady + log.Printf("Rabbit MQ ready %v\n", ready) + + // subscribe to bound queue + msgs, err := rabbitChan.Consume("orders", "", true, false, false, false, nil) + failOnError(err, "Failed to consume") + + for d := range msgs { + log.Printf("Order %s\n", d.Body) + log.Printf("Headers %v\n", d.Headers) + id := getOrderId(d.Body) + go createSpan(d.Headers, id) + } + } + }() + + log.Println("Waiting for messages") + select {} +} diff --git a/dispatch/src/main.go b/dispatch/src/main.go deleted file mode 100644 index a746745..0000000 --- a/dispatch/src/main.go +++ /dev/null @@ -1,219 +0,0 @@ -package main - -import ( - "fmt" - "log" - "time" - "os" - "math/rand" - "strconv" - "encoding/json" - - "github.com/streadway/amqp" - "github.com/instana/go-sensor" - ot "github.com/opentracing/opentracing-go" - ext "github.com/opentracing/opentracing-go/ext" - otlog "github.com/opentracing/opentracing-go/log" -) - -const ( - Service = "dispatch" -) - -var ( - amqpUri string - rabbitChan *amqp.Channel - rabbitCloseError chan *amqp.Error - rabbitReady chan bool - errorPercent int -) - -func connectToRabbitMQ(uri string) *amqp.Connection { - for { - conn, err := amqp.Dial(uri) - if err == nil { - return conn - } - - log.Println(err) - log.Printf("Reconnecting to %s\n", uri) - time.Sleep(1 * time.Second) - } -} - -func rabbitConnector(uri string) { - var rabbitErr *amqp.Error - - for { - rabbitErr = <-rabbitCloseError - if rabbitErr != nil { - log.Printf("Connecting to %s\n", amqpUri) - rabbitConn := connectToRabbitMQ(uri) - rabbitConn.NotifyClose(rabbitCloseError) - - var err error - - // create mappings here - rabbitChan, err = rabbitConn.Channel() - failOnError(err, "Failed to create channel") - - // create exchange - err = rabbitChan.ExchangeDeclare("robot-shop", "direct", true, false, false, false, nil) - failOnError(err, "Failed to create exchange") - - // create queue - queue, err := rabbitChan.QueueDeclare("orders", true, false, false, false, nil) - failOnError(err, "Failed to create queue") - - // bind queue to exchange - err = rabbitChan.QueueBind(queue.Name, "orders", "robot-shop", false, nil) - failOnError(err, "Failed to bind queue") - - // signal ready - rabbitReady <- true - } - } -} - -func failOnError(err error, msg string) { - if err != nil { - log.Fatalf("$s : %s", msg, err) - panic(fmt.Sprintf("%s : %s", msg, err)) - } -} - -func getOrderId(order []byte) string { - id := "unknown" - var f interface{} - err := json.Unmarshal(order, &f) - if err == nil { - m := f.(map[string]interface{}) - id = m["orderid"].(string) - } - - return id -} - -func createSpan(headers map[string]interface{}, order string) { - // headers is map[string]interface{} - // carrier is map[string]string - carrier := make(ot.TextMapCarrier) - // convert by copying k, v - for k, v := range headers { - carrier[k] = v.(string) - } - - // get the order id - log.Printf("order %s\n", order) - - // opentracing - var span ot.Span - tracer := ot.GlobalTracer() - spanContext, err := tracer.Extract(ot.HTTPHeaders, carrier) - if err == nil { - log.Println("Creating child span") - // create child span - span = tracer.StartSpan("getOrder", ot.ChildOf(spanContext)) - } else { - log.Println(err) - log.Println("Failed to get context from headers") - log.Println("Creating root span") - // create root span - span = tracer.StartSpan("getOrder") - } - - span.SetTag(string(ext.SpanKind), ext.SpanKindConsumerEnum) - span.SetTag(string(ext.MessageBusDestination), "robot-shop") - span.SetTag("exchange", "robot-shop") - span.SetTag("sort", "consume") - span.SetTag("address", "rabbitmq") - span.SetTag("key", "orders") - span.LogFields(otlog.String("orderid", order)) - defer span.Finish() - - time.Sleep(time.Duration(42 + rand.Int63n(42)) * time.Millisecond) - if rand.Intn(100) < errorPercent { - span.SetTag("error", true) - span.LogFields( - otlog.String("error.kind", "Exception"), - otlog.String("message", "Failed to dispatch to SOP")) - log.Println("Span tagged with error") - } - - processSale(span) -} - -func processSale(parentSpan ot.Span) { - tracer := ot.GlobalTracer() - span := tracer.StartSpan("processSale", ot.ChildOf(parentSpan.Context())) - defer span.Finish() - span.SetTag(string(ext.SpanKind), "intermediate") - span.LogFields(otlog.String("info", "Order sent for processing")) - time.Sleep(time.Duration(42 + rand.Int63n(42)) * time.Millisecond) -} - - -func main() { - // Instana tracing - ot.InitGlobalTracer(instana.NewTracerWithOptions(&instana.Options{ - Service: Service, - LogLevel: instana.Info})) - - // Init amqpUri - // get host from environment - amqpHost, ok := os.LookupEnv("AMQP_HOST") - if !ok { - amqpHost = "rabbitmq" - } - amqpUri = fmt.Sprintf("amqp://guest:guest@%s:5672/", amqpHost) - - // get error threshold from environment - errorPercent = 0 - epct, ok := os.LookupEnv("DISPATCH_ERROR_PERCENT") - if ok { - epcti, err := strconv.Atoi(epct) - if err == nil { - if epcti > 100 { - epcti = 100 - } - if epcti < 0 { - epcti = 0 - } - errorPercent = epcti - } - } - log.Printf("Error Percent is %d\n", errorPercent) - - // MQ error channel - rabbitCloseError = make(chan *amqp.Error) - - // MQ ready channel - rabbitReady = make(chan bool) - - go rabbitConnector(amqpUri) - - rabbitCloseError <- amqp.ErrClosed - - go func() { - for { - // wait for rabbit to be ready - ready := <-rabbitReady - log.Printf("Rabbit MQ ready %v\n", ready) - - // subscribe to bound queue - msgs, err := rabbitChan.Consume("orders", "", true, false, false, false, nil) - failOnError(err, "Failed to consume") - - for d := range msgs { - log.Printf("Order %s\n", d.Body) - log.Printf("Headers %v\n", d.Headers) - id := getOrderId(d.Body) - go createSpan(d.Headers, id) - } - } - }() - - log.Println("Waiting for messages") - forever := make(chan bool) - <-forever -}