convert to opentelemetry part 1

main
William Perron 10 months ago
parent 1646762081
commit 27567d16a2
No known key found for this signature in database
GPG Key ID: 80535D1C3032BD6D

@ -22,6 +22,7 @@ import (
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
@ -29,7 +30,6 @@ import (
"go.wperron.io/themis"
"go.wperron.io/themis/correlation"
zerologcompat "go.wperron.io/themis/correlation/compat/zerolog"
)
const (
@ -40,10 +40,9 @@ var (
dbFile = flag.String("db", "", "SQlite database file path.")
debug = flag.Bool("debug", false, "Set log level to DEBUG.")
store *themis.Store
tracer trace.Tracer
seq = &correlation.CryptoRandSequencer{}
gen = correlation.NewGenerator(seq)
store *themis.Store
tracer trace.Tracer
propagator = propagation.TraceContext{}
)
type Handler func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error
@ -62,7 +61,7 @@ func main() {
zerolog.SetGlobalLevel(zerolog.DebugLevel)
}
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stdout})
log.Logger = log.Logger.Hook(zerologcompat.CorrelationHook{})
log.Logger = log.Logger.Hook(correlation.TraceContextHook{})
zerolog.DurationFieldUnit = time.Millisecond
go func() {
@ -254,10 +253,9 @@ func main() {
},
})
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
log.Error().Ctx(ctx).Err(err).Msg("failed to count claims")
return
return fmt.Errorf("failed to count claims: %w", err)
}
ev, err := store.LastOf(ctx, themis.EventFlush)
@ -271,10 +269,9 @@ func main() {
},
})
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
log.Error().Ctx(ctx).Err(err).Msg("failed get last flush event")
return
return fmt.Errorf("failed get last flush event: %w", err)
}
lastFlush = "never"
} else {
@ -288,10 +285,11 @@ func main() {
},
})
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
return nil
},
"list-claims": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) {
"list-claims": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error {
claims, err := store.ListClaims(ctx)
if err != nil {
err := s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{
@ -301,10 +299,9 @@ func main() {
},
})
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
log.Error().Ctx(ctx).Err(err).Msg("failed to list claims")
return
return fmt.Errorf("failed to list claims: %w", err)
}
sb := strings.Builder{}
@ -320,14 +317,15 @@ func main() {
},
})
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
return nil
},
"claim": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) {
"claim": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error {
if i.Type == discordgo.InteractionApplicationCommandAutocomplete {
log.Debug().Ctx(ctx).Msg("command type interaction autocomplete")
handleClaimAutocomplete(ctx, store, s, i)
return
return nil
}
opts := i.ApplicationCommandData().Options
@ -339,9 +337,9 @@ func main() {
},
})
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
return
return nil
}
claimType, err := themis.ClaimTypeFromString(opts[0].StringValue())
@ -353,10 +351,9 @@ func main() {
},
})
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
log.Error().Ctx(ctx).Err(err).Str("claim_type", opts[0].StringValue()).Msg("failed to parse claim")
return
return fmt.Errorf("failed to parse claim: %w", err)
}
name := opts[1].StringValue()
@ -385,9 +382,9 @@ func main() {
},
})
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction")
}
return
return nil
}
err = s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{
@ -397,10 +394,9 @@ func main() {
},
})
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
log.Error().Ctx(ctx).Err(err).Msg("failed to acquire claim")
return
return fmt.Errorf("failed to acquire claim: %w", err)
}
err = s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{
@ -410,10 +406,11 @@ func main() {
},
})
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
return nil
},
"describe-claim": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) {
"describe-claim": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error {
id := i.ApplicationCommandData().Options[0]
detail, err := store.DescribeClaim(ctx, int(id.IntValue()))
if err != nil {
@ -424,10 +421,9 @@ func main() {
},
})
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
log.Error().Ctx(ctx).Err(err).Msg("failed to describe claim")
return
return fmt.Errorf("failed to describe claim: %w", err)
}
sb := strings.Builder{}
@ -443,10 +439,11 @@ func main() {
},
})
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
return nil
},
"delete-claim": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) {
"delete-claim": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error {
id := i.ApplicationCommandData().Options[0]
userId := i.Member.User.ID
err := store.DeleteClaim(ctx, int(id.IntValue()), userId)
@ -462,11 +459,10 @@ func main() {
},
})
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
log.Error().Ctx(ctx).Err(err).Msg("failed to delete claim")
return
return fmt.Errorf("failed to delete claim: %w", err)
}
err = s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{
@ -476,20 +472,19 @@ func main() {
},
})
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
return nil
},
"flush": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) {
cid := correlation.FromContext(ctx)
"flush": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error {
baggage := make(url.Values)
baggage.Set("correlation_id", cid.String())
state := baggage.Encode()
propagator.Inject(ctx, correlation.UrlValuesCarrier(baggage))
sb := strings.Builder{}
sb.WriteString("modal_flush")
if state != "" {
if len(baggage) != 0 {
sb.WriteRune(':')
sb.WriteString(state)
sb.WriteString(baggage.Encode())
}
if err := s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{
@ -520,14 +515,14 @@ func main() {
},
},
}); err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
return nil
},
"query": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) {
"query": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error {
roDB, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?cache=private&mode=ro", *dbFile))
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to open read-only copy of database")
return
return fmt.Errorf("failed to open read-only copy of database: %w", err)
}
q := i.ApplicationCommandData().Options[0].StringValue()
@ -535,14 +530,12 @@ func main() {
defer cancelDeadline()
rows, err := roDB.QueryContext(deadlined, q)
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to exec user-provided query")
return
return fmt.Errorf("faied to exec user-provided query: %w", err)
}
fmtd, err := themis.FormatRows(ctx, rows)
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to format rows")
return
return fmt.Errorf("failed to format rows: %w", err)
}
// 2000 is a magic number here, it's the character limit for a discord
@ -557,9 +550,11 @@ func main() {
},
}); err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
return nil
},
"schedule": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) {
"schedule": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error {
// get schedule from now to 4 mondays into the future
sched, err := store.GetSchedule(ctx, themis.NextMonday(), themis.NextMonday().Add(4*7*24*time.Hour))
if err != nil {
@ -569,10 +564,9 @@ func main() {
Content: "failed to get schedule, check logs for more info.",
},
}); err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
log.Error().Ctx(ctx).Err(err).Msg("failed to get schedule")
return
return fmt.Errorf("failed to get schedule: %w", err)
}
sb := strings.Builder{}
@ -602,9 +596,11 @@ func main() {
},
}); err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
return nil
},
"send-schedule": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) {
"send-schedule": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error {
notifier.Send()
if err := s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{
@ -614,9 +610,11 @@ func main() {
},
}); err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
return nil
},
"absent": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) {
"absent": func(ctx context.Context, s *discordgo.Session, i *discordgo.InteractionCreate) error {
var rawDate string
if len(i.ApplicationCommandData().Options) == 0 {
rawDate = themis.NextMonday().Format(time.DateOnly)
@ -632,9 +630,9 @@ func main() {
Content: "failed to parse provided date, make sure to use the YYYY-MM-DD format.",
},
}); err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
return
return nil
}
if date.Before(time.Now()) {
@ -644,9 +642,9 @@ func main() {
Content: "The date must be some time in the future.",
},
}); err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
return
return nil
}
if date.Weekday() != time.Monday {
@ -656,10 +654,10 @@ func main() {
Content: "The date you provided is not a Monday.",
},
}); err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
// TODO(wperron) suggest Mondays before and after?
return
return nil
}
userId := i.Member.User.ID
@ -670,9 +668,9 @@ func main() {
Content: "something went wrong recording your absence, check logs for more info.",
},
}); err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
return
return fmt.Errorf("failed to record absence: %w", err)
}
err = s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{
@ -682,8 +680,9 @@ func main() {
},
})
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("failed to respond to interaction")
return fmt.Errorf("failed to respond to interaction: %w", err)
}
return nil
},
}
@ -780,7 +779,6 @@ func registerHandlers(sess *discordgo.Session, handlers map[string]Handler) {
case discordgo.InteractionApplicationCommand:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ctx = context.WithValue(ctx, "correlation_id", gen.Next())
if h, ok := handlers[i.ApplicationCommandData().Name]; ok {
withLogging(i.ApplicationCommandData().Name, h)(ctx, s, i)
@ -791,16 +789,11 @@ func registerHandlers(sess *discordgo.Session, handlers map[string]Handler) {
state, err := parseCustomIDState(i.ModalSubmitData().CustomID)
if err != nil {
log.Error().Ctx(ctx).Err(err).Msg("unexpected error occured while parsing state from custom id, returning early.")
log.Error().Ctx(ctx).Err(err).Msg("unexpected error occurred while parsing state from custom id, returning early.")
return
}
cid := state.Get("correlation_id")
if cid != "" {
ctx = context.WithValue(ctx, "correlation_id", cid)
} else {
ctx = context.WithValue(ctx, "correlation_id", gen.Next())
}
ctx = propagator.Extract(ctx, correlation.UrlValuesCarrier(state))
if strings.HasPrefix(i.ModalSubmitData().CustomID, "modal_flush") {
sub := i.ModalSubmitData().Components[0].(*discordgo.ActionsRow).Components[0].(*discordgo.TextInput).Value
@ -848,12 +841,7 @@ func registerHandlers(sess *discordgo.Session, handlers map[string]Handler) {
return
}
cid := state.Get("correlation_id")
if cid != "" {
ctx = context.WithValue(ctx, "correlation_id", cid)
} else {
ctx = context.WithValue(ctx, "correlation_id", gen.Next())
}
ctx = propagator.Extract(ctx, correlation.UrlValuesCarrier(state))
switch i.MessageComponentData().CustomID {
case "schedule-response":

@ -1,16 +0,0 @@
package zerolog
import (
zl "github.com/rs/zerolog"
"go.wperron.io/themis/correlation"
)
type CorrelationHook struct{}
func (h CorrelationHook) Run(e *zl.Event, level zl.Level, msg string) {
ctx := e.GetCtx()
c := correlation.FromContext(ctx)
if c != nil {
e.Stringer("correlation_id", c)
}
}

@ -1,47 +0,0 @@
package correlation
import (
"context"
"encoding/hex"
)
const Key string = "correlation_id"
var Empty CorrelationID = CorrelationID{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
// Correlation ID is a byte array of length 16
type CorrelationID []byte
func (ci CorrelationID) String() string {
if ci == nil {
return hex.EncodeToString(Empty)
}
return hex.EncodeToString(ci)
}
func FromContext(ctx context.Context) CorrelationID {
if v := ctx.Value("correlation_id"); v != nil {
if c, ok := v.(CorrelationID); ok {
return c
}
}
return nil
}
type Sequencer interface {
Next() []byte
}
type Generator struct {
seq Sequencer
}
func NewGenerator(seq Sequencer) *Generator {
return &Generator{
seq: seq,
}
}
func (g *Generator) Next() CorrelationID {
return CorrelationID(g.seq.Next())
}

@ -1,18 +0,0 @@
package correlation
import (
"math/rand"
"testing"
"github.com/stretchr/testify/assert"
)
func TestGeneratorNext(t *testing.T) {
rand.Seed(0)
seq := &MathRandSequencer{}
gen := NewGenerator(seq)
assert.Equal(t, "0194fdc2fa2ffcc041d3ff12045b73c8", gen.Next().String())
assert.Equal(t, "6e4ff95ff662a5eee82abdf44a2d0b75", gen.Next().String())
assert.Equal(t, "fb180daf48a79ee0b10d394651850fd4", gen.Next().String())
}

@ -1,19 +0,0 @@
package correlation
import "crypto/rand"
type CryptoRandSequencer struct{}
func (crs *CryptoRandSequencer) Next() []byte {
buf := make([]byte, 16)
read, err := rand.Read(buf)
if err != nil {
panic("not implemented")
}
if read != 16 {
panic("todo")
}
return buf
}

@ -1,18 +0,0 @@
package correlation
import "math/rand"
type MathRandSequencer struct{}
func (mrs *MathRandSequencer) Next() []byte {
buf := make([]byte, 16)
read, err := rand.Read(buf)
if err != nil {
panic("not implemented")
}
if read != 16 {
panic("todo")
}
return buf
}

@ -0,0 +1,44 @@
package correlation
import (
"net/url"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
type TraceContextHook struct{}
func (h TraceContextHook) Run(e *zerolog.Event, level zerolog.Level, msg string) {
ctx := e.GetCtx()
spanContext := trace.SpanContextFromContext(ctx)
trace_id := spanContext.TraceID()
if trace_id.IsValid() {
e.Stringer("trace_id", trace_id)
}
}
var _ propagation.TextMapCarrier = UrlValuesCarrier{}
type UrlValuesCarrier url.Values
// Get implements propagation.TextMapCarrier.
func (u UrlValuesCarrier) Get(key string) string {
return url.Values(u).Get(key)
}
// Keys implements propagation.TextMapCarrier.
func (u UrlValuesCarrier) Keys() []string {
raw := map[string][]string(u)
ks := make([]string, 0, len(raw))
for k, _ := range raw {
ks = append(ks, k)
}
return ks
}
// Set implements propagation.TextMapCarrier.
func (u UrlValuesCarrier) Set(key string, value string) {
url.Values(u).Set(key, value)
}
Loading…
Cancel
Save