From 27567d16a2c630d568aff2b440913fd854a00483 Mon Sep 17 00:00:00 2001 From: William Perron Date: Tue, 6 Feb 2024 17:37:50 -0500 Subject: [PATCH] convert to opentelemetry part 1 --- cmd/themis-server/main.go | 156 +++++++++++++-------------- correlation/compat/zerolog/compat.go | 16 --- correlation/correlation.go | 47 -------- correlation/correlation_test.go | 18 ---- correlation/crypto.go | 19 ---- correlation/math.go | 18 ---- correlation/opentelemetry.go | 44 ++++++++ 7 files changed, 116 insertions(+), 202 deletions(-) delete mode 100644 correlation/compat/zerolog/compat.go delete mode 100644 correlation/correlation.go delete mode 100644 correlation/correlation_test.go delete mode 100644 correlation/crypto.go delete mode 100644 correlation/math.go create mode 100644 correlation/opentelemetry.go diff --git a/cmd/themis-server/main.go b/cmd/themis-server/main.go index 8b60574..5dbeee9 100644 --- a/cmd/themis-server/main.go +++ b/cmd/themis-server/main.go @@ -22,6 +22,7 @@ import ( "github.com/rs/zerolog/log" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" @@ -29,7 +30,6 @@ import ( "go.wperron.io/themis" "go.wperron.io/themis/correlation" - zerologcompat "go.wperron.io/themis/correlation/compat/zerolog" ) const ( @@ -40,10 +40,9 @@ var ( dbFile = flag.String("db", "", "SQlite database file path.") debug = flag.Bool("debug", false, "Set log level to DEBUG.") - store *themis.Store - tracer trace.Tracer - seq = &correlation.CryptoRandSequencer{} - gen = correlation.NewGenerator(seq) + store *themis.Store + tracer trace.Tracer + propagator = propagation.TraceContext{} ) type Handler func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error @@ -62,7 +61,7 @@ func main() { zerolog.SetGlobalLevel(zerolog.DebugLevel) } log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stdout}) - log.Logger = log.Logger.Hook(zerologcompat.CorrelationHook{}) + log.Logger = log.Logger.Hook(correlation.TraceContextHook{}) zerolog.DurationFieldUnit = time.Millisecond go func() { @@ -254,10 +253,9 @@ func main() { }, }) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } - log.Error().Ctx(ctx).Err(err).Msg("failed to count claims") - return + return fmt.Errorf("failed to count claims: %w", err) } ev, err := store.LastOf(ctx, themis.EventFlush) @@ -271,10 +269,9 @@ func main() { }, }) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } - log.Error().Ctx(ctx).Err(err).Msg("failed get last flush event") - return + return fmt.Errorf("failed get last flush event: %w", err) } lastFlush = "never" } else { @@ -288,10 +285,11 @@ func main() { }, }) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } + return nil }, - "list-claims": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) { + "list-claims": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error { claims, err := store.ListClaims(ctx) if err != nil { err := s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{ @@ -301,10 +299,9 @@ func main() { }, }) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } - log.Error().Ctx(ctx).Err(err).Msg("failed to list claims") - return + return fmt.Errorf("failed to list claims: %w", err) } sb := strings.Builder{} @@ -320,14 +317,15 @@ func main() { }, }) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } + return nil }, - "claim": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) { + "claim": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error { if i.Type == discordgo.InteractionApplicationCommandAutocomplete { log.Debug().Ctx(ctx).Msg("command type interaction autocomplete") handleClaimAutocomplete(ctx, store, s, i) - return + return nil } opts := i.ApplicationCommandData().Options @@ -339,9 +337,9 @@ func main() { }, }) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } - return + return nil } claimType, err := themis.ClaimTypeFromString(opts[0].StringValue()) @@ -353,10 +351,9 @@ func main() { }, }) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } - log.Error().Ctx(ctx).Err(err).Str("claim_type", opts[0].StringValue()).Msg("failed to parse claim") - return + return fmt.Errorf("failed to parse claim: %w", err) } name := opts[1].StringValue() @@ -385,9 +382,9 @@ func main() { }, }) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction") } - return + return nil } err = s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{ @@ -397,10 +394,9 @@ func main() { }, }) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } - log.Error().Ctx(ctx).Err(err).Msg("failed to acquire claim") - return + return fmt.Errorf("failed to acquire claim: %w", err) } err = s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{ @@ -410,10 +406,11 @@ func main() { }, }) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } + return nil }, - "describe-claim": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) { + "describe-claim": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error { id := i.ApplicationCommandData().Options[0] detail, err := store.DescribeClaim(ctx, int(id.IntValue())) if err != nil { @@ -424,10 +421,9 @@ func main() { }, }) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } - log.Error().Ctx(ctx).Err(err).Msg("failed to describe claim") - return + return fmt.Errorf("failed to describe claim: %w", err) } sb := strings.Builder{} @@ -443,10 +439,11 @@ func main() { }, }) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } + return nil }, - "delete-claim": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) { + "delete-claim": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error { id := i.ApplicationCommandData().Options[0] userId := i.Member.User.ID err := store.DeleteClaim(ctx, int(id.IntValue()), userId) @@ -462,11 +459,10 @@ func main() { }, }) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } - log.Error().Ctx(ctx).Err(err).Msg("failed to delete claim") - return + return fmt.Errorf("failed to delete claim: %w", err) } err = s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{ @@ -476,20 +472,19 @@ func main() { }, }) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } + return nil }, - "flush": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) { - cid := correlation.FromContext(ctx) + "flush": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error { baggage := make(url.Values) - baggage.Set("correlation_id", cid.String()) - state := baggage.Encode() + propagator.Inject(ctx, correlation.UrlValuesCarrier(baggage)) sb := strings.Builder{} sb.WriteString("modal_flush") - if state != "" { + if len(baggage) != 0 { sb.WriteRune(':') - sb.WriteString(state) + sb.WriteString(baggage.Encode()) } if err := s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{ @@ -520,14 +515,14 @@ func main() { }, }, }); err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } + return nil }, - "query": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) { + "query": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error { roDB, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?cache=private&mode=ro", *dbFile)) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to open read-only copy of database") - return + return fmt.Errorf("failed to open read-only copy of database: %w", err) } q := i.ApplicationCommandData().Options[0].StringValue() @@ -535,14 +530,12 @@ func main() { defer cancelDeadline() rows, err := roDB.QueryContext(deadlined, q) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to exec user-provided query") - return + return fmt.Errorf("faied to exec user-provided query: %w", err) } fmtd, err := themis.FormatRows(ctx, rows) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to format rows") - return + return fmt.Errorf("failed to format rows: %w", err) } // 2000 is a magic number here, it's the character limit for a discord @@ -557,9 +550,11 @@ func main() { }, }); err != nil { log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } + return nil }, - "schedule": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) { + "schedule": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error { // get schedule from now to 4 mondays into the future sched, err := store.GetSchedule(ctx, themis.NextMonday(), themis.NextMonday().Add(4*7*24*time.Hour)) if err != nil { @@ -569,10 +564,9 @@ func main() { Content: "failed to get schedule, check logs for more info.", }, }); err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } - log.Error().Ctx(ctx).Err(err).Msg("failed to get schedule") - return + return fmt.Errorf("failed to get schedule: %w", err) } sb := strings.Builder{} @@ -602,9 +596,11 @@ func main() { }, }); err != nil { log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } + return nil }, - "send-schedule": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) { + "send-schedule": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error { notifier.Send() if err := s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{ @@ -614,9 +610,11 @@ func main() { }, }); err != nil { log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } + return nil }, - "absent": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) { + "absent": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error { var rawDate string if len(i.ApplicationCommandData().Options) == 0 { rawDate = themis.NextMonday().Format(time.DateOnly) @@ -632,9 +630,9 @@ func main() { Content: "failed to parse provided date, make sure to use the YYYY-MM-DD format.", }, }); err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } - return + return nil } if date.Before(time.Now()) { @@ -644,9 +642,9 @@ func main() { Content: "The date must be some time in the future.", }, }); err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } - return + return nil } if date.Weekday() != time.Monday { @@ -656,10 +654,10 @@ func main() { Content: "The date you provided is not a Monday.", }, }); err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } // TODO(wperron) suggest Mondays before and after? - return + return nil } userId := i.Member.User.ID @@ -670,9 +668,9 @@ func main() { Content: "something went wrong recording your absence, check logs for more info.", }, }); err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } - return + return fmt.Errorf("failed to record absence: %w", err) } err = s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{ @@ -682,8 +680,9 @@ func main() { }, }) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction") + return fmt.Errorf("failed to respond to interaction: %w", err) } + return nil }, } @@ -780,7 +779,6 @@ func registerHandlers(sess *discordgo.Session, handlers map[string]Handler) { case discordgo.InteractionApplicationCommand: ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - ctx = context.WithValue(ctx, "correlation_id", gen.Next()) if h, ok := handlers[i.ApplicationCommandData().Name]; ok { withLogging(i.ApplicationCommandData().Name, h)(ctx, s, i) @@ -791,16 +789,11 @@ func registerHandlers(sess *discordgo.Session, handlers map[string]Handler) { state, err := parseCustomIDState(i.ModalSubmitData().CustomID) if err != nil { - log.Error().Ctx(ctx).Err(err).Msg("unexpected error occured while parsing state from custom id, returning early.") + log.Error().Ctx(ctx).Err(err).Msg("unexpected error occurred while parsing state from custom id, returning early.") return } - cid := state.Get("correlation_id") - if cid != "" { - ctx = context.WithValue(ctx, "correlation_id", cid) - } else { - ctx = context.WithValue(ctx, "correlation_id", gen.Next()) - } + ctx = propagator.Extract(ctx, correlation.UrlValuesCarrier(state)) if strings.HasPrefix(i.ModalSubmitData().CustomID, "modal_flush") { sub := i.ModalSubmitData().Components[0].(*discordgo.ActionsRow).Components[0].(*discordgo.TextInput).Value @@ -848,12 +841,7 @@ func registerHandlers(sess *discordgo.Session, handlers map[string]Handler) { return } - cid := state.Get("correlation_id") - if cid != "" { - ctx = context.WithValue(ctx, "correlation_id", cid) - } else { - ctx = context.WithValue(ctx, "correlation_id", gen.Next()) - } + ctx = propagator.Extract(ctx, correlation.UrlValuesCarrier(state)) switch i.MessageComponentData().CustomID { case "schedule-response": diff --git a/correlation/compat/zerolog/compat.go b/correlation/compat/zerolog/compat.go deleted file mode 100644 index 985b0ae..0000000 --- a/correlation/compat/zerolog/compat.go +++ /dev/null @@ -1,16 +0,0 @@ -package zerolog - -import ( - zl "github.com/rs/zerolog" - "go.wperron.io/themis/correlation" -) - -type CorrelationHook struct{} - -func (h CorrelationHook) Run(e *zl.Event, level zl.Level, msg string) { - ctx := e.GetCtx() - c := correlation.FromContext(ctx) - if c != nil { - e.Stringer("correlation_id", c) - } -} diff --git a/correlation/correlation.go b/correlation/correlation.go deleted file mode 100644 index 6e2c92b..0000000 --- a/correlation/correlation.go +++ /dev/null @@ -1,47 +0,0 @@ -package correlation - -import ( - "context" - "encoding/hex" -) - -const Key string = "correlation_id" - -var Empty CorrelationID = CorrelationID{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} - -// Correlation ID is a byte array of length 16 -type CorrelationID []byte - -func (ci CorrelationID) String() string { - if ci == nil { - return hex.EncodeToString(Empty) - } - return hex.EncodeToString(ci) -} - -func FromContext(ctx context.Context) CorrelationID { - if v := ctx.Value("correlation_id"); v != nil { - if c, ok := v.(CorrelationID); ok { - return c - } - } - return nil -} - -type Sequencer interface { - Next() []byte -} - -type Generator struct { - seq Sequencer -} - -func NewGenerator(seq Sequencer) *Generator { - return &Generator{ - seq: seq, - } -} - -func (g *Generator) Next() CorrelationID { - return CorrelationID(g.seq.Next()) -} diff --git a/correlation/correlation_test.go b/correlation/correlation_test.go deleted file mode 100644 index 565ec3d..0000000 --- a/correlation/correlation_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package correlation - -import ( - "math/rand" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestGeneratorNext(t *testing.T) { - rand.Seed(0) - seq := &MathRandSequencer{} - gen := NewGenerator(seq) - - assert.Equal(t, "0194fdc2fa2ffcc041d3ff12045b73c8", gen.Next().String()) - assert.Equal(t, "6e4ff95ff662a5eee82abdf44a2d0b75", gen.Next().String()) - assert.Equal(t, "fb180daf48a79ee0b10d394651850fd4", gen.Next().String()) -} diff --git a/correlation/crypto.go b/correlation/crypto.go deleted file mode 100644 index 7656e3c..0000000 --- a/correlation/crypto.go +++ /dev/null @@ -1,19 +0,0 @@ -package correlation - -import "crypto/rand" - -type CryptoRandSequencer struct{} - -func (crs *CryptoRandSequencer) Next() []byte { - buf := make([]byte, 16) - read, err := rand.Read(buf) - if err != nil { - panic("not implemented") - } - - if read != 16 { - panic("todo") - } - - return buf -} diff --git a/correlation/math.go b/correlation/math.go deleted file mode 100644 index 1577416..0000000 --- a/correlation/math.go +++ /dev/null @@ -1,18 +0,0 @@ -package correlation - -import "math/rand" - -type MathRandSequencer struct{} - -func (mrs *MathRandSequencer) Next() []byte { - buf := make([]byte, 16) - read, err := rand.Read(buf) - if err != nil { - panic("not implemented") - } - - if read != 16 { - panic("todo") - } - return buf -} diff --git a/correlation/opentelemetry.go b/correlation/opentelemetry.go new file mode 100644 index 0000000..0829a65 --- /dev/null +++ b/correlation/opentelemetry.go @@ -0,0 +1,44 @@ +package correlation + +import ( + "net/url" + + "github.com/rs/zerolog" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" +) + +type TraceContextHook struct{} + +func (h TraceContextHook) Run(e *zerolog.Event, level zerolog.Level, msg string) { + ctx := e.GetCtx() + spanContext := trace.SpanContextFromContext(ctx) + trace_id := spanContext.TraceID() + if trace_id.IsValid() { + e.Stringer("trace_id", trace_id) + } +} + +var _ propagation.TextMapCarrier = UrlValuesCarrier{} + +type UrlValuesCarrier url.Values + +// Get implements propagation.TextMapCarrier. +func (u UrlValuesCarrier) Get(key string) string { + return url.Values(u).Get(key) +} + +// Keys implements propagation.TextMapCarrier. +func (u UrlValuesCarrier) Keys() []string { + raw := map[string][]string(u) + ks := make([]string, 0, len(raw)) + for k, _ := range raw { + ks = append(ks, k) + } + return ks +} + +// Set implements propagation.TextMapCarrier. +func (u UrlValuesCarrier) Set(key string, value string) { + url.Values(u).Set(key, value) +}