You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
toolkit/cmd/tempo-to-otlp/main.go

182 lines
5.8 KiB

package main
import (
"bytes"
"flag"
"fmt"
"io"
"log"
"os"
"github.com/golang/protobuf/proto" // NOTE: keep this, it's required to unmarshall the tempopb
"github.com/grafana/tempo/pkg/tempopb"
tempocommonv1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
tempotracev1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
commonv1 "go.opentelemetry.io/proto/otlp/common/v1"
resourcev1 "go.opentelemetry.io/proto/otlp/resource/v1"
tracev1 "go.opentelemetry.io/proto/otlp/trace/v1"
newproto "google.golang.org/protobuf/proto"
)
// Command-line flags. They are registered at package initialisation and
// hold their parsed values once main has called flag.Parse.
var (
// file is the path of the input file holding the protobuf-encoded Tempo trace.
file = flag.String("file", "", "Tempo proto file. if set, will not use stdin.")
// out is the path of the output file; when left empty the result goes to stdout.
out = flag.String("out", "", "Output file destination. Defaults to stdout.")
// asJSON = flag.Bool("json", false, "Read input as jsonpb.") // TODO(wperron) implement this
)
// TODO(wperron) implement reading from stdin
func main() {
flag.Parse()
var output io.Writer
if *out == "" {
output = os.Stdout
} else {
output = must(os.OpenFile(*out, os.O_CREATE+os.O_RDWR, 0o664))
}
h := must(os.Open(*file))
bs := must(io.ReadAll(h))
var trace tempopb.Trace
if err := proto.Unmarshal(bs, &trace); err != nil {
log.Fatalln(err)
}
td := must(convert(trace))
bs = must(newproto.Marshal(&td))
written := must(io.Copy(output, bytes.NewReader(bs)))
fmt.Fprintf(os.Stderr, "completed successfully, %d bytes written", written)
}
// convert translates a Tempo trace into OTLP TracesData.
//
// Each batch of the input becomes one ResourceSpans, each
// InstrumentationLibrarySpans becomes one ScopeSpans, and each span is copied
// field by field using the convert* helpers for nested messages.
//
// A resource or instrumentation library that is absent (nil) in the input is
// left unset in the output rather than being dereferenced. The returned error
// is always nil.
func convert(trace tempopb.Trace) (tracev1.TracesData, error) {
	td := make([]*tracev1.ResourceSpans, 0, len(trace.Batches))
	for _, resourceSpans := range trace.Batches {
		rs := &tracev1.ResourceSpans{
			ScopeSpans: make([]*tracev1.ScopeSpans, 0, len(resourceSpans.InstrumentationLibrarySpans)),
		}
		// The resource is an optional message: only copy it when present.
		if res := resourceSpans.Resource; res != nil {
			rs.Resource = &resourcev1.Resource{
				Attributes:             convertAttributes(res.Attributes),
				DroppedAttributesCount: res.DroppedAttributesCount,
			}
		}

		for _, ils := range resourceSpans.InstrumentationLibrarySpans {
			scope := &tracev1.ScopeSpans{
				Spans: make([]*tracev1.Span, 0, len(ils.Spans)),
			}
			// Only the name and version are carried over to the scope. The
			// resource attributes describe the resource, not the
			// instrumentation scope, so they are not duplicated here.
			if lib := ils.InstrumentationLibrary; lib != nil {
				scope.Scope = &commonv1.InstrumentationScope{
					Name:    lib.Name,
					Version: lib.Version,
				}
			}

			for _, span := range ils.Spans {
				scope.Spans = append(scope.Spans, &tracev1.Span{
					TraceId:                span.TraceId,
					SpanId:                 span.SpanId,
					TraceState:             span.TraceState,
					ParentSpanId:           span.ParentSpanId,
					Name:                   span.Name,
					Kind:                   tracev1.Span_SpanKind(span.Kind),
					StartTimeUnixNano:      span.StartTimeUnixNano,
					EndTimeUnixNano:        span.EndTimeUnixNano,
					Attributes:             convertAttributes(span.Attributes),
					DroppedAttributesCount: span.DroppedAttributesCount,
					Events:                 convertEvents(span.Events),
					DroppedEventsCount:     span.DroppedEventsCount,
					Links:                  convertLinks(span.Links),
					DroppedLinksCount:      span.DroppedLinksCount,
					Status:                 convertStatus(span.Status),
				})
			}
			rs.ScopeSpans = append(rs.ScopeSpans, scope)
		}
		td = append(td, rs)
	}

	return tracev1.TracesData{ResourceSpans: td}, nil
}
// convertAttributes converts a list of Tempo key/value pairs into OTLP
// key/value pairs, preserving order. The result is always a non-nil slice
// with one entry per input attribute.
func convertAttributes(attrs []*tempocommonv1.KeyValue) []*commonv1.KeyValue {
	converted := make([]*commonv1.KeyValue, len(attrs))
	for idx, attr := range attrs {
		kv := &commonv1.KeyValue{Key: attr.Key}
		kv.Value = convertAnyValue(attr.Value)
		converted[idx] = kv
	}
	return converted
}
// convertAnyValue converts a Tempo AnyValue into the OTLP AnyValue holding the
// same payload. String, int, double and bool values are copied directly;
// array and key/value-list values are converted recursively. When the input
// holds none of these kinds, an empty (non-nil) AnyValue is returned.
func convertAnyValue(av *tempocommonv1.AnyValue) *commonv1.AnyValue {
	result := &commonv1.AnyValue{}

	switch src := av.GetValue().(type) {
	case *tempocommonv1.AnyValue_StringValue:
		result.Value = &commonv1.AnyValue_StringValue{StringValue: src.StringValue}

	case *tempocommonv1.AnyValue_IntValue:
		result.Value = &commonv1.AnyValue_IntValue{IntValue: src.IntValue}

	case *tempocommonv1.AnyValue_DoubleValue:
		result.Value = &commonv1.AnyValue_DoubleValue{DoubleValue: src.DoubleValue}

	case *tempocommonv1.AnyValue_BoolValue:
		result.Value = &commonv1.AnyValue_BoolValue{BoolValue: src.BoolValue}

	case *tempocommonv1.AnyValue_ArrayValue:
		// Convert each element in turn; elements may themselves be nested.
		elems := src.ArrayValue.Values
		array := &commonv1.ArrayValue{Values: make([]*commonv1.AnyValue, len(elems))}
		for idx, elem := range elems {
			array.Values[idx] = convertAnyValue(elem)
		}
		result.Value = &commonv1.AnyValue_ArrayValue{ArrayValue: array}

	case *tempocommonv1.AnyValue_KvlistValue:
		// A key/value list has the same shape as an attribute list.
		list := &commonv1.KeyValueList{Values: convertAttributes(src.KvlistValue.Values)}
		result.Value = &commonv1.AnyValue_KvlistValue{KvlistValue: list}
	}

	return result
}
// convertEvents converts Tempo span events into OTLP span events, preserving
// order. The result is always a non-nil slice with one entry per input event.
func convertEvents(events []*tempotracev1.Span_Event) []*tracev1.Span_Event {
	converted := make([]*tracev1.Span_Event, len(events))
	for idx, src := range events {
		dst := &tracev1.Span_Event{
			Name:         src.Name,
			TimeUnixNano: src.TimeUnixNano,
		}
		dst.Attributes = convertAttributes(src.Attributes)
		dst.DroppedAttributesCount = src.DroppedAttributesCount
		converted[idx] = dst
	}
	return converted
}
// convertLinks converts Tempo span links into OTLP span links, preserving
// order. The result is always a non-nil slice with one entry per input link.
func convertLinks(links []*tempotracev1.Span_Link) []*tracev1.Span_Link {
	converted := make([]*tracev1.Span_Link, len(links))
	for idx, src := range links {
		dst := &tracev1.Span_Link{
			TraceId:    src.TraceId,
			SpanId:     src.SpanId,
			TraceState: src.TraceState,
		}
		dst.Attributes = convertAttributes(src.Attributes)
		dst.DroppedAttributesCount = src.DroppedAttributesCount
		converted[idx] = dst
	}
	return converted
}
// convertStatus converts a Tempo span status into an OTLP span status by
// copying its message and casting its code.
//
// It returns nil when status is nil, so a span without a status converts to a
// span without a status instead of causing a nil pointer dereference.
func convertStatus(status *tempotracev1.Status) *tracev1.Status {
	if status == nil {
		return nil
	}
	return &tracev1.Status{
		Message: status.Message,
		Code:    tracev1.Status_StatusCode(status.Code),
	}
}
// must unwraps a (value, error) pair: it returns v when err is nil and panics
// with err otherwise. It is meant for call sites where an error is fatal.
func must[T any](v T, err error) T {
	if err == nil {
		return v
	}
	panic(err)
}