diff --git a/factory.go b/factory.go index 46679ea..b22150c 100644 --- a/factory.go +++ b/factory.go @@ -58,6 +58,10 @@ func newSqliteExporter(cfg *Config) (*sqliteExporter, error) { return nil, fmt.Errorf("couldn't open sqlite3 database: %w", err) } + // IMPORTANT: database/sql opens a connection pool by default, but sqlite + // only allows a single connection to be open at the same time. + db.SetMaxOpenConns(1) + tx, _ := db.Begin() defer tx.Commit() diff --git a/go.mod b/go.mod index 120933a..f331239 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,8 @@ require ( require ( github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/hashicorp/go-version v1.6.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -38,10 +40,10 @@ require ( go.opentelemetry.io/collector/exporter v0.92.0 go.opentelemetry.io/collector/featuregate v1.0.1 // indirect go.opentelemetry.io/collector/pdata v1.0.1 - go.opentelemetry.io/otel v1.22.0 // indirect + go.opentelemetry.io/otel v1.22.0 go.opentelemetry.io/otel/metric v1.22.0 // indirect - go.opentelemetry.io/otel/sdk v1.22.0 // indirect - go.opentelemetry.io/otel/trace v1.22.0 // indirect + go.opentelemetry.io/otel/sdk v1.22.0 + go.opentelemetry.io/otel/trace v1.22.0 go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/net v0.19.0 // indirect diff --git a/go.sum b/go.sum index f9391b6..5aeacaf 100644 --- a/go.sum +++ b/go.sum @@ -17,8 +17,11 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= diff --git a/internal/transform/span.go b/internal/transform/span.go new file mode 100644 index 0000000..29fbe14 --- /dev/null +++ b/internal/transform/span.go @@ -0,0 +1,166 @@ +package transform + +import ( + "hash/fnv" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +func Spans(sdl []sdktrace.ReadOnlySpan) ptrace.Traces { + traces := ptrace.NewTraces() + rss := traces.ResourceSpans() + resMap := make(map[uint64]ptrace.ResourceSpans) + scopeMap := make(map[uint64]ptrace.ScopeSpans) + + for _, s := range sdl { + var rs ptrace.ResourceSpans + if r, ok := resMap[hashResource(s.Resource())]; ok { + rs = r + } else { + // create a new resource + // append it to the traces + // add it to the map + rs = rss.AppendEmpty() + resMap[hashResource(s.Resource())] = rs + } + + res := rs.Resource() + res.SetDroppedAttributesCount(0) // TODO(wperron) how can we get this number? + ra := transformAttributes(s.Resource().Attributes()) + ra.CopyTo(res.Attributes()) + + var ss ptrace.ScopeSpans + if scope, ok := scopeMap[hashScope(s.InstrumentationScope())]; ok { + ss = scope + } else { + // create a new scope + // append it to the resource + // add it to the map + ss = rs.ScopeSpans().AppendEmpty() + scopeMap[hashScope(s.InstrumentationScope())] = ss + } + + // create a new span and fill it with the info from the readonly span + span := ss.Spans().AppendEmpty() + span.SetTraceID(pcommon.TraceID(s.SpanContext().TraceID())) + span.SetSpanID(pcommon.SpanID(s.SpanContext().SpanID())) + span.SetParentSpanID(pcommon.SpanID(s.Parent().SpanID())) + span.TraceState().FromRaw(s.SpanContext().TraceState().String()) + span.SetName(s.Name()) + span.SetKind(ptrace.SpanKind(s.SpanKind())) + span.SetStartTimestamp(pcommon.NewTimestampFromTime(s.StartTime())) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(s.EndTime())) + var code ptrace.StatusCode + switch s.Status().Code { + case codes.Unset: + code = ptrace.StatusCodeUnset + case codes.Ok: + code = ptrace.StatusCodeOk + case codes.Error: + code = ptrace.StatusCodeError + default: + panic("unreachable") + } + span.Status().SetCode(code) + span.Status().SetMessage(s.Status().Description) + span.SetDroppedAttributesCount(uint32(s.DroppedAttributes())) + span.SetDroppedEventsCount(uint32(s.DroppedEvents())) + span.SetDroppedLinksCount(uint32(s.DroppedLinks())) + + a := transformAttributes(s.Attributes()) + a.CopyTo(span.Attributes()) + + for _, e := range s.Events() { + ev := span.Events().AppendEmpty() + ev.SetTimestamp(pcommon.NewTimestampFromTime(e.Time)) + ev.SetName(e.Name) + ev.SetDroppedAttributesCount(uint32(e.DroppedAttributeCount)) + ea := transformAttributes(e.Attributes) + ea.CopyTo(ev.Attributes()) + } + + for _, l := range s.Links() { + ln := span.Links().AppendEmpty() + ln.SetTraceID(pcommon.TraceID(l.SpanContext.TraceID())) + ln.SetSpanID(pcommon.SpanID(l.SpanContext.SpanID())) + ln.TraceState().FromRaw(l.SpanContext.TraceState().String()) + ln.SetDroppedAttributesCount(uint32(l.DroppedAttributeCount)) + la := transformAttributes(l.Attributes) + la.CopyTo(ln.Attributes()) + } + } + + return traces +} + +func transformAttributes(from []attribute.KeyValue) pcommon.Map { + to := pcommon.NewMap() + to.EnsureCapacity(len(from)) + + for _, a := range from { + switch a.Value.Type() { + case attribute.BOOL: + to.PutBool(string(a.Key), a.Value.AsBool()) + case attribute.INT64: + to.PutInt(string(a.Key), a.Value.AsInt64()) + case attribute.FLOAT64: + to.PutDouble(string(a.Key), a.Value.AsFloat64()) + case attribute.STRING: + to.PutStr(string(a.Key), a.Value.AsString()) + case attribute.BOOLSLICE: + s := to.PutEmptySlice(string(a.Key)) + raw := a.Value.AsBoolSlice() + s.EnsureCapacity(len(raw)) + for _, r := range raw { + v := s.AppendEmpty() + v.SetBool(r) + } + case attribute.INT64SLICE: + s := to.PutEmptySlice(string(a.Key)) + raw := a.Value.AsInt64Slice() + s.EnsureCapacity(len(raw)) + for _, r := range raw { + v := s.AppendEmpty() + v.SetInt(r) + } + case attribute.STRINGSLICE: + s := to.PutEmptySlice(string(a.Key)) + raw := a.Value.AsStringSlice() + s.EnsureCapacity(len(raw)) + for _, r := range raw { + v := s.AppendEmpty() + v.SetStr(r) + } + case attribute.FLOAT64SLICE: + s := to.PutEmptySlice(string(a.Key)) + raw := a.Value.AsFloat64Slice() + s.EnsureCapacity(len(raw)) + for _, r := range raw { + v := s.AppendEmpty() + v.SetDouble(r) + } + } + } + + return to +} + +func hashResource(res *resource.Resource) uint64 { + h := fnv.New64a() + h.Write([]byte(res.Encoded(attribute.DefaultEncoder()))) + return h.Sum64() +} + +func hashScope(s instrumentation.Scope) uint64 { + h := fnv.New64a() + h.Write([]byte(s.Name)) + h.Write([]byte(s.SchemaURL)) + h.Write([]byte(s.Version)) + return h.Sum64() +} diff --git a/internal/transform/span_test.go b/internal/transform/span_test.go new file mode 100644 index 0000000..af6b083 --- /dev/null +++ b/internal/transform/span_test.go @@ -0,0 +1,234 @@ +package transform + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" +) + +func TestTransformSpan(t *testing.T) { + sdkspanslice := make([]sdktrace.ReadOnlySpan, 1) + start := time.Unix(1000, 0) + end := start.Add(5 * time.Second) + evtts := time.Unix(1500, 0) + + stringVals := []string{"first", "second"} + intVals := []int{1, 2, 3} + floatVals := []float64{1.1, 2.2, 3.3} + boolVals := []bool{true, false} + + sdkspanslice[0] = tracetest.SpanStub{ + Name: "span-stub", + SpanContext: trace.SpanContext{}. + WithSpanID(trace.SpanID{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}). + WithTraceID(trace.TraceID{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}). + WithTraceState(must(trace.ParseTraceState("foo=bar"))), + Parent: trace.SpanContext{}. + WithSpanID(trace.SpanID{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x11}). + WithTraceID(trace.TraceID{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}). + WithTraceState(must(trace.ParseTraceState("foo=bar"))), + SpanKind: trace.SpanKindServer, + StartTime: start, + EndTime: end, + Attributes: []attribute.KeyValue{ + { + Key: "stringkey", + Value: attribute.StringValue("stringval"), + }, + { + Key: "intkey", + Value: attribute.IntValue(123), + }, + { + Key: "floatkey", + Value: attribute.Float64Value(111.2), + }, + { + Key: "boolkey", + Value: attribute.BoolValue(true), + }, + { + Key: "stringslicekey", + Value: attribute.StringSliceValue(stringVals), + }, + { + Key: "intslicekey", + Value: attribute.IntSliceValue(intVals), + }, + { + Key: "floatslicekey", + Value: attribute.Float64SliceValue(floatVals), + }, + { + Key: "boolslicekey", + Value: attribute.BoolSliceValue(boolVals), + }, + }, + Events: []sdktrace.Event{ + { + Name: "spanevent", + Attributes: []attribute.KeyValue{ + { + Key: "eventstringkey", + Value: attribute.StringValue("eventstringval"), + }, + }, + DroppedAttributeCount: 4, + Time: evtts, + }, + }, + Links: []sdktrace.Link{ + { + SpanContext: trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: [16]byte{0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef}, + SpanID: [8]byte{0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef}, + TraceFlags: 1, + TraceState: must(trace.ParseTraceState("foo=bar")), + Remote: false, + }), + Attributes: []attribute.KeyValue{ + { + Key: "linkstringkey", + Value: attribute.StringValue("linkstringval"), + }, + }, + DroppedAttributeCount: 5, + }, + }, + Status: sdktrace.Status{ + Code: codes.Ok, + Description: "OK", + }, + DroppedAttributes: 1, + DroppedEvents: 2, + DroppedLinks: 3, + ChildSpanCount: 0, + Resource: resource.NewWithAttributes("https://opentelemetry.io/schemas/1.24.0", + attribute.KeyValue{Key: "service.name", Value: attribute.StringValue("test-service")}, + ), + InstrumentationLibrary: instrumentation.Scope{ + Name: "test-tracer", + Version: "0.0.1", + SchemaURL: "https://opentelemetry.io/schemas/1.24.0", + }, + }.Snapshot() + + spans := Spans(sdkspanslice) + assert.Equal(t, 1, spans.ResourceSpans().Len()) + + for i := 0; i < spans.ResourceSpans().Len(); i++ { + require.Less(t, i, 1) + rs := spans.ResourceSpans().At(i) + res := rs.Resource() + exp := pcommon.NewMap() + exp.PutStr("service.name", "test-service") + assert.Equal(t, exp, res.Attributes()) + assert.Equal(t, uint32(0), res.DroppedAttributesCount()) + + for j := 0; j < rs.ScopeSpans().Len(); j++ { + require.Less(t, j, 1) + ss := rs.ScopeSpans().At(j) + for k := 0; k < ss.Spans().Len(); k++ { + require.Less(t, k, 1) + span := ss.Spans().At(k) + + ts := pcommon.NewTraceState() + ts.FromRaw("foo=bar") + assert.Equal(t, pcommon.SpanID{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, span.SpanID()) + assert.Equal(t, pcommon.TraceID{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, span.TraceID()) + assert.Equal(t, pcommon.SpanID{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x11}, span.ParentSpanID()) + assert.Equal(t, ts, span.TraceState()) + assert.Equal(t, "span-stub", span.Name()) + assert.Equal(t, ptrace.SpanKindServer, span.Kind()) + assert.Equal(t, ptrace.StatusCodeOk, span.Status().Code()) + assert.Equal(t, "OK", span.Status().Message()) + assert.Equal(t, pcommon.NewTimestampFromTime(start), span.StartTimestamp()) + assert.Equal(t, pcommon.NewTimestampFromTime(end), span.EndTimestamp()) + assert.Equal(t, uint32(1), span.DroppedAttributesCount()) + assert.Equal(t, uint32(2), span.DroppedEventsCount()) + assert.Equal(t, uint32(3), span.DroppedLinksCount()) + + a, ok := span.Attributes().Get("stringkey") + assert.True(t, ok) + assert.Equal(t, "stringval", a.Str()) + + a, ok = span.Attributes().Get("intkey") + assert.True(t, ok) + assert.Equal(t, int64(123), a.Int()) + + a, ok = span.Attributes().Get("floatkey") + assert.True(t, ok) + assert.Equal(t, float64(111.2), a.Double()) + + a, ok = span.Attributes().Get("boolkey") + assert.True(t, ok) + assert.Equal(t, true, a.Bool()) + + a, ok = span.Attributes().Get("stringslicekey") + assert.True(t, ok) + for i := 0; i < a.Slice().Len(); i++ { + v := a.Slice().At(i) + assert.Equal(t, stringVals[i], v.Str()) + } + + a, ok = span.Attributes().Get("intslicekey") + assert.True(t, ok) + for i := 0; i < a.Slice().Len(); i++ { + v := a.Slice().At(i) + assert.Equal(t, int64(intVals[i]), v.Int()) + } + + a, ok = span.Attributes().Get("floatslicekey") + assert.True(t, ok) + for i := 0; i < a.Slice().Len(); i++ { + v := a.Slice().At(i) + assert.Equal(t, floatVals[i], v.Double()) + } + + a, ok = span.Attributes().Get("boolslicekey") + assert.True(t, ok) + for i := 0; i < a.Slice().Len(); i++ { + v := a.Slice().At(i) + assert.Equal(t, boolVals[i], v.Bool()) + } + + assert.Equal(t, 1, span.Events().Len()) + ev := span.Events().At(0) + assert.Equal(t, evtts.UTC(), ev.Timestamp().AsTime()) + assert.Equal(t, "spanevent", ev.Name()) + assert.Equal(t, uint32(4), ev.DroppedAttributesCount()) + a, ok = ev.Attributes().Get("eventstringkey") + assert.True(t, ok) + assert.Equal(t, "eventstringval", a.Str()) + + assert.Equal(t, 1, span.Links().Len()) + ln := span.Links().At(0) + assert.Equal(t, pcommon.TraceID([16]byte{0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef}), ln.TraceID()) + assert.Equal(t, pcommon.SpanID([8]byte{0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef}), ln.SpanID()) + assert.Equal(t, ts, ln.TraceState()) + assert.Equal(t, uint32(5), ln.DroppedAttributesCount()) + a, ok = ln.Attributes().Get("linkstringkey") + assert.True(t, ok) + assert.Equal(t, "linkstringval", a.Str()) + } + } + } +} + +func must[T any](v T, err error) T { + if err != nil { + panic(err) + } + return v +} diff --git a/migrations/20240120195122_init.up.sql b/migrations/20240120195122_init.up.sql index 11568da..9389092 100644 --- a/migrations/20240120195122_init.up.sql +++ b/migrations/20240120195122_init.up.sql @@ -14,6 +14,8 @@ CREATE TABLE IF NOT EXISTS spans( "status_description" TEXT, "attributes" TEXT, "dropped_attributes_count" INTEGER, + "dropped_events_count" INTEGER, + "dropped_links_count" INTEGER, "resource_attributes" TEXT, "resource_dropped_attributes_count" INTEGER, PRIMARY KEY ("span_id", "trace_id") diff --git a/sqlite_exporter.go b/sqlite_exporter.go index b2c1cea..95f88cb 100644 --- a/sqlite_exporter.go +++ b/sqlite_exporter.go @@ -11,12 +11,40 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.wperron.io/sqliteexporter/internal/transform" ) +// The goal is to have the sqliteexporter usable both in a collector deployment +// but also embedded in an app. As such, it must satisfy the component.Component +// interface and the sdktrace.SpanExporter interface. +var _ sdktrace.SpanExporter = &sqliteExporter{} +var _ component.Component = &sqliteExporter{} + type sqliteExporter struct { db *sql.DB } +// DO NOT CHANGE: any modification will not be backwards compatible and +// must never be done outside of a new major release. +// ExportSpans exports a batch of spans. +// +// This function is called synchronously, so there is no concurrency +// safety requirement. However, due to the synchronous calling pattern, +// it is critical that all timeouts and cancellations contained in the +// passed context must be honored. +// +// Any retry logic must be contained in this function. The SDK that +// calls this function will not implement any retry logic. All errors +// returned by this function are considered unrecoverable and will be +// reported to a configured error Handler. +// DO NOT CHANGE: any modification will not be backwards compatible and +// must never be done outside of a new major release. +func (e *sqliteExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { + ss := transform.Spans(spans) + return e.ConsumeTraces(ctx, ss) +} + // Start tells the component to start. Host parameter can be used for communicating // with the host after Start() has already returned. If an error is returned by // Start() then the collector startup will be aborted. @@ -68,11 +96,13 @@ const insertSpanQ string = `INSERT INTO spans status_description, attributes, dropped_attributes_count, + dropped_events_count, + dropped_links_count, resource_attributes, resource_dropped_attributes_count ) VALUES ( - ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, json(?), ?, json(?), ? + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, json(?), ?, ?, ?, json(?), ? ); ` @@ -173,6 +203,8 @@ func (e *sqliteExporter) ConsumeTraces(ctx context.Context, traces ptrace.Traces span.Status().Message(), attrs, span.DroppedAttributesCount(), + span.DroppedEventsCount(), + span.DroppedLinksCount(), rattrs, resource.Resource().DroppedAttributesCount(), ) diff --git a/traces.db b/traces.db new file mode 100644 index 0000000..034e667 Binary files /dev/null and b/traces.db differ