Implement sdktrace.SpanExporter interface

In order to allow using this exporter as an embedded exporter in an
application.

Adds a new internal `transform` package to transform spans from the
`[]sdktrace.ReadOnlySpan` structure to `ptrace.Traces`.
main v0.1.0-rc2
William Perron 12 months ago
parent 95dea9dfde
commit 4f95e09116
Signed by: wperron
GPG Key ID: BFDB4EF72D73C5F2

@ -58,6 +58,10 @@ func newSqliteExporter(cfg *Config) (*sqliteExporter, error) {
return nil, fmt.Errorf("couldn't open sqlite3 database: %w", err)
}
// IMPORTANT: database/sql opens a connection pool by default, but sqlite
// only allows a single connection to be open at the same time.
db.SetMaxOpenConns(1)
tx, _ := db.Begin()
defer tx.Commit()

@ -10,6 +10,8 @@ require (
require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
@ -38,10 +40,10 @@ require (
go.opentelemetry.io/collector/exporter v0.92.0
go.opentelemetry.io/collector/featuregate v1.0.1 // indirect
go.opentelemetry.io/collector/pdata v1.0.1
go.opentelemetry.io/otel v1.22.0 // indirect
go.opentelemetry.io/otel v1.22.0
go.opentelemetry.io/otel/metric v1.22.0 // indirect
go.opentelemetry.io/otel/sdk v1.22.0 // indirect
go.opentelemetry.io/otel/trace v1.22.0 // indirect
go.opentelemetry.io/otel/sdk v1.22.0
go.opentelemetry.io/otel/trace v1.22.0
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/net v0.19.0 // indirect

@ -17,8 +17,11 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=

@ -0,0 +1,166 @@
package transform
import (
"hash/fnv"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
func Spans(sdl []sdktrace.ReadOnlySpan) ptrace.Traces {
traces := ptrace.NewTraces()
rss := traces.ResourceSpans()
resMap := make(map[uint64]ptrace.ResourceSpans)
scopeMap := make(map[uint64]ptrace.ScopeSpans)
for _, s := range sdl {
var rs ptrace.ResourceSpans
if r, ok := resMap[hashResource(s.Resource())]; ok {
rs = r
} else {
// create a new resource
// append it to the traces
// add it to the map
rs = rss.AppendEmpty()
resMap[hashResource(s.Resource())] = rs
}
res := rs.Resource()
res.SetDroppedAttributesCount(0) // TODO(wperron) how can we get this number?
ra := transformAttributes(s.Resource().Attributes())
ra.CopyTo(res.Attributes())
var ss ptrace.ScopeSpans
if scope, ok := scopeMap[hashScope(s.InstrumentationScope())]; ok {
ss = scope
} else {
// create a new scope
// append it to the resource
// add it to the map
ss = rs.ScopeSpans().AppendEmpty()
scopeMap[hashScope(s.InstrumentationScope())] = ss
}
// create a new span and fill it with the info from the readonly span
span := ss.Spans().AppendEmpty()
span.SetTraceID(pcommon.TraceID(s.SpanContext().TraceID()))
span.SetSpanID(pcommon.SpanID(s.SpanContext().SpanID()))
span.SetParentSpanID(pcommon.SpanID(s.Parent().SpanID()))
span.TraceState().FromRaw(s.SpanContext().TraceState().String())
span.SetName(s.Name())
span.SetKind(ptrace.SpanKind(s.SpanKind()))
span.SetStartTimestamp(pcommon.NewTimestampFromTime(s.StartTime()))
span.SetEndTimestamp(pcommon.NewTimestampFromTime(s.EndTime()))
var code ptrace.StatusCode
switch s.Status().Code {
case codes.Unset:
code = ptrace.StatusCodeUnset
case codes.Ok:
code = ptrace.StatusCodeOk
case codes.Error:
code = ptrace.StatusCodeError
default:
panic("unreachable")
}
span.Status().SetCode(code)
span.Status().SetMessage(s.Status().Description)
span.SetDroppedAttributesCount(uint32(s.DroppedAttributes()))
span.SetDroppedEventsCount(uint32(s.DroppedEvents()))
span.SetDroppedLinksCount(uint32(s.DroppedLinks()))
a := transformAttributes(s.Attributes())
a.CopyTo(span.Attributes())
for _, e := range s.Events() {
ev := span.Events().AppendEmpty()
ev.SetTimestamp(pcommon.NewTimestampFromTime(e.Time))
ev.SetName(e.Name)
ev.SetDroppedAttributesCount(uint32(e.DroppedAttributeCount))
ea := transformAttributes(e.Attributes)
ea.CopyTo(ev.Attributes())
}
for _, l := range s.Links() {
ln := span.Links().AppendEmpty()
ln.SetTraceID(pcommon.TraceID(l.SpanContext.TraceID()))
ln.SetSpanID(pcommon.SpanID(l.SpanContext.SpanID()))
ln.TraceState().FromRaw(l.SpanContext.TraceState().String())
ln.SetDroppedAttributesCount(uint32(l.DroppedAttributeCount))
la := transformAttributes(l.Attributes)
la.CopyTo(ln.Attributes())
}
}
return traces
}
func transformAttributes(from []attribute.KeyValue) pcommon.Map {
to := pcommon.NewMap()
to.EnsureCapacity(len(from))
for _, a := range from {
switch a.Value.Type() {
case attribute.BOOL:
to.PutBool(string(a.Key), a.Value.AsBool())
case attribute.INT64:
to.PutInt(string(a.Key), a.Value.AsInt64())
case attribute.FLOAT64:
to.PutDouble(string(a.Key), a.Value.AsFloat64())
case attribute.STRING:
to.PutStr(string(a.Key), a.Value.AsString())
case attribute.BOOLSLICE:
s := to.PutEmptySlice(string(a.Key))
raw := a.Value.AsBoolSlice()
s.EnsureCapacity(len(raw))
for _, r := range raw {
v := s.AppendEmpty()
v.SetBool(r)
}
case attribute.INT64SLICE:
s := to.PutEmptySlice(string(a.Key))
raw := a.Value.AsInt64Slice()
s.EnsureCapacity(len(raw))
for _, r := range raw {
v := s.AppendEmpty()
v.SetInt(r)
}
case attribute.STRINGSLICE:
s := to.PutEmptySlice(string(a.Key))
raw := a.Value.AsStringSlice()
s.EnsureCapacity(len(raw))
for _, r := range raw {
v := s.AppendEmpty()
v.SetStr(r)
}
case attribute.FLOAT64SLICE:
s := to.PutEmptySlice(string(a.Key))
raw := a.Value.AsFloat64Slice()
s.EnsureCapacity(len(raw))
for _, r := range raw {
v := s.AppendEmpty()
v.SetDouble(r)
}
}
}
return to
}
func hashResource(res *resource.Resource) uint64 {
h := fnv.New64a()
h.Write([]byte(res.Encoded(attribute.DefaultEncoder())))
return h.Sum64()
}
func hashScope(s instrumentation.Scope) uint64 {
h := fnv.New64a()
h.Write([]byte(s.Name))
h.Write([]byte(s.SchemaURL))
h.Write([]byte(s.Version))
return h.Sum64()
}

@ -0,0 +1,234 @@
package transform
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
)
func TestTransformSpan(t *testing.T) {
sdkspanslice := make([]sdktrace.ReadOnlySpan, 1)
start := time.Unix(1000, 0)
end := start.Add(5 * time.Second)
evtts := time.Unix(1500, 0)
stringVals := []string{"first", "second"}
intVals := []int{1, 2, 3}
floatVals := []float64{1.1, 2.2, 3.3}
boolVals := []bool{true, false}
sdkspanslice[0] = tracetest.SpanStub{
Name: "span-stub",
SpanContext: trace.SpanContext{}.
WithSpanID(trace.SpanID{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}).
WithTraceID(trace.TraceID{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}).
WithTraceState(must(trace.ParseTraceState("foo=bar"))),
Parent: trace.SpanContext{}.
WithSpanID(trace.SpanID{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x11}).
WithTraceID(trace.TraceID{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}).
WithTraceState(must(trace.ParseTraceState("foo=bar"))),
SpanKind: trace.SpanKindServer,
StartTime: start,
EndTime: end,
Attributes: []attribute.KeyValue{
{
Key: "stringkey",
Value: attribute.StringValue("stringval"),
},
{
Key: "intkey",
Value: attribute.IntValue(123),
},
{
Key: "floatkey",
Value: attribute.Float64Value(111.2),
},
{
Key: "boolkey",
Value: attribute.BoolValue(true),
},
{
Key: "stringslicekey",
Value: attribute.StringSliceValue(stringVals),
},
{
Key: "intslicekey",
Value: attribute.IntSliceValue(intVals),
},
{
Key: "floatslicekey",
Value: attribute.Float64SliceValue(floatVals),
},
{
Key: "boolslicekey",
Value: attribute.BoolSliceValue(boolVals),
},
},
Events: []sdktrace.Event{
{
Name: "spanevent",
Attributes: []attribute.KeyValue{
{
Key: "eventstringkey",
Value: attribute.StringValue("eventstringval"),
},
},
DroppedAttributeCount: 4,
Time: evtts,
},
},
Links: []sdktrace.Link{
{
SpanContext: trace.NewSpanContext(trace.SpanContextConfig{
TraceID: [16]byte{0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef},
SpanID: [8]byte{0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef},
TraceFlags: 1,
TraceState: must(trace.ParseTraceState("foo=bar")),
Remote: false,
}),
Attributes: []attribute.KeyValue{
{
Key: "linkstringkey",
Value: attribute.StringValue("linkstringval"),
},
},
DroppedAttributeCount: 5,
},
},
Status: sdktrace.Status{
Code: codes.Ok,
Description: "OK",
},
DroppedAttributes: 1,
DroppedEvents: 2,
DroppedLinks: 3,
ChildSpanCount: 0,
Resource: resource.NewWithAttributes("https://opentelemetry.io/schemas/1.24.0",
attribute.KeyValue{Key: "service.name", Value: attribute.StringValue("test-service")},
),
InstrumentationLibrary: instrumentation.Scope{
Name: "test-tracer",
Version: "0.0.1",
SchemaURL: "https://opentelemetry.io/schemas/1.24.0",
},
}.Snapshot()
spans := Spans(sdkspanslice)
assert.Equal(t, 1, spans.ResourceSpans().Len())
for i := 0; i < spans.ResourceSpans().Len(); i++ {
require.Less(t, i, 1)
rs := spans.ResourceSpans().At(i)
res := rs.Resource()
exp := pcommon.NewMap()
exp.PutStr("service.name", "test-service")
assert.Equal(t, exp, res.Attributes())
assert.Equal(t, uint32(0), res.DroppedAttributesCount())
for j := 0; j < rs.ScopeSpans().Len(); j++ {
require.Less(t, j, 1)
ss := rs.ScopeSpans().At(j)
for k := 0; k < ss.Spans().Len(); k++ {
require.Less(t, k, 1)
span := ss.Spans().At(k)
ts := pcommon.NewTraceState()
ts.FromRaw("foo=bar")
assert.Equal(t, pcommon.SpanID{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, span.SpanID())
assert.Equal(t, pcommon.TraceID{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, span.TraceID())
assert.Equal(t, pcommon.SpanID{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x11}, span.ParentSpanID())
assert.Equal(t, ts, span.TraceState())
assert.Equal(t, "span-stub", span.Name())
assert.Equal(t, ptrace.SpanKindServer, span.Kind())
assert.Equal(t, ptrace.StatusCodeOk, span.Status().Code())
assert.Equal(t, "OK", span.Status().Message())
assert.Equal(t, pcommon.NewTimestampFromTime(start), span.StartTimestamp())
assert.Equal(t, pcommon.NewTimestampFromTime(end), span.EndTimestamp())
assert.Equal(t, uint32(1), span.DroppedAttributesCount())
assert.Equal(t, uint32(2), span.DroppedEventsCount())
assert.Equal(t, uint32(3), span.DroppedLinksCount())
a, ok := span.Attributes().Get("stringkey")
assert.True(t, ok)
assert.Equal(t, "stringval", a.Str())
a, ok = span.Attributes().Get("intkey")
assert.True(t, ok)
assert.Equal(t, int64(123), a.Int())
a, ok = span.Attributes().Get("floatkey")
assert.True(t, ok)
assert.Equal(t, float64(111.2), a.Double())
a, ok = span.Attributes().Get("boolkey")
assert.True(t, ok)
assert.Equal(t, true, a.Bool())
a, ok = span.Attributes().Get("stringslicekey")
assert.True(t, ok)
for i := 0; i < a.Slice().Len(); i++ {
v := a.Slice().At(i)
assert.Equal(t, stringVals[i], v.Str())
}
a, ok = span.Attributes().Get("intslicekey")
assert.True(t, ok)
for i := 0; i < a.Slice().Len(); i++ {
v := a.Slice().At(i)
assert.Equal(t, int64(intVals[i]), v.Int())
}
a, ok = span.Attributes().Get("floatslicekey")
assert.True(t, ok)
for i := 0; i < a.Slice().Len(); i++ {
v := a.Slice().At(i)
assert.Equal(t, floatVals[i], v.Double())
}
a, ok = span.Attributes().Get("boolslicekey")
assert.True(t, ok)
for i := 0; i < a.Slice().Len(); i++ {
v := a.Slice().At(i)
assert.Equal(t, boolVals[i], v.Bool())
}
assert.Equal(t, 1, span.Events().Len())
ev := span.Events().At(0)
assert.Equal(t, evtts.UTC(), ev.Timestamp().AsTime())
assert.Equal(t, "spanevent", ev.Name())
assert.Equal(t, uint32(4), ev.DroppedAttributesCount())
a, ok = ev.Attributes().Get("eventstringkey")
assert.True(t, ok)
assert.Equal(t, "eventstringval", a.Str())
assert.Equal(t, 1, span.Links().Len())
ln := span.Links().At(0)
assert.Equal(t, pcommon.TraceID([16]byte{0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef}), ln.TraceID())
assert.Equal(t, pcommon.SpanID([8]byte{0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef}), ln.SpanID())
assert.Equal(t, ts, ln.TraceState())
assert.Equal(t, uint32(5), ln.DroppedAttributesCount())
a, ok = ln.Attributes().Get("linkstringkey")
assert.True(t, ok)
assert.Equal(t, "linkstringval", a.Str())
}
}
}
}
func must[T any](v T, err error) T {
if err != nil {
panic(err)
}
return v
}

@ -14,6 +14,8 @@ CREATE TABLE IF NOT EXISTS spans(
"status_description" TEXT,
"attributes" TEXT,
"dropped_attributes_count" INTEGER,
"dropped_events_count" INTEGER,
"dropped_links_count" INTEGER,
"resource_attributes" TEXT,
"resource_dropped_attributes_count" INTEGER,
PRIMARY KEY ("span_id", "trace_id")

@ -11,12 +11,40 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.wperron.io/sqliteexporter/internal/transform"
)
// The goal is to have the sqliteexporter usable both in a collector deployment
// but also embedded in an app. As such, it must satisfy the component.Component
// interface and the sdktrace.SpanExporter interface.
var _ sdktrace.SpanExporter = &sqliteExporter{}
var _ component.Component = &sqliteExporter{}
type sqliteExporter struct {
db *sql.DB
}
// DO NOT CHANGE: any modification will not be backwards compatible and
// must never be done outside of a new major release.
// ExportSpans exports a batch of spans.
//
// This function is called synchronously, so there is no concurrency
// safety requirement. However, due to the synchronous calling pattern,
// it is critical that all timeouts and cancellations contained in the
// passed context must be honored.
//
// Any retry logic must be contained in this function. The SDK that
// calls this function will not implement any retry logic. All errors
// returned by this function are considered unrecoverable and will be
// reported to a configured error Handler.
// DO NOT CHANGE: any modification will not be backwards compatible and
// must never be done outside of a new major release.
func (e *sqliteExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
ss := transform.Spans(spans)
return e.ConsumeTraces(ctx, ss)
}
// Start tells the component to start. Host parameter can be used for communicating
// with the host after Start() has already returned. If an error is returned by
// Start() then the collector startup will be aborted.
@ -68,11 +96,13 @@ const insertSpanQ string = `INSERT INTO spans
status_description,
attributes,
dropped_attributes_count,
dropped_events_count,
dropped_links_count,
resource_attributes,
resource_dropped_attributes_count
)
VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, json(?), ?, json(?), ?
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, json(?), ?, ?, ?, json(?), ?
);
`
@ -173,6 +203,8 @@ func (e *sqliteExporter) ConsumeTraces(ctx context.Context, traces ptrace.Traces
span.Status().Message(),
attrs,
span.DroppedAttributesCount(),
span.DroppedEventsCount(),
span.DroppedLinksCount(),
rattrs,
resource.Resource().DroppedAttributesCount(),
)

Binary file not shown.
Loading…
Cancel
Save