We have an issue. We have a canary app that increments a counter on a regular interval. It sends the data 2 different ways: via OpenTelemetry OTLP to an otel-collector, through vector, and on to datadog; and via Prometheus exposition scraped by an otel-collector, through vector, and on to datadog. The problem is that sometimes less data than expected would show up in datadog. Our counter incremented once every 5s and we had 2 instances, so we expected value=24 / min in datadog (12 increments per instance per minute × 2 instances). But sometimes it would be less, like only value=12, and that would persist for hours or even days. It was sometimes correlated with a rollout of vector, but a rollout wasn't a stable or necessary predictor of failure.
With a complicated pipeline, we suspected something in there was broken. An easy way to rule out half of it was to use the otel-collector's datadogexporter, bypassing vector and submitting the metric directly to datadog. The result? A rock-solid, flat line. So the issue wasn't in the canary app or in the collector.
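A rough sketch of that bypass, assuming the stock datadogexporter and a metrics pipeline along these lines (receiver and pipeline names are illustrative):

```yaml
# collector config sketch: export metrics straight to datadog, skipping the vector hop
exporters:
  datadog:
    api:
      site: ${env:DD_SITE}
      key: ${env:DD_API_KEY}

service:
  pipelines:
    metrics:
      receivers: [otlp, prometheus]
      exporters: [datadog]
```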
Next we looked at vector.
I went for a more rudimentary method: vector tap
on the last transform before the sink,
and grepped the log lines, roughly as sketched below.
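Something like this (the component name is made up, and vector tap needs vector's API enabled):

```sh
# tap the events flowing out of the last transform before the datadog_metrics sink
vector tap 'final_transform' | grep 'o11y.canary'
```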
That, too, showed a stable total of value=24 / min,
even while our datadog dashboards showed value=12.
So this narrowed it down to somewhere between the sink and datadog.
For reasons, I didn't fully trust the metrics vector generates about itself
(it had previously lost data without reporting it),
plus I wanted extra metrics anyway.
I also wanted a way to read the requests that vector was sending out.
vector's datadog_metrics sink supported a proxy
config,
but then I remembered that an https_proxy
is a bit useless here:
the proxy only sees an HTTP CONNECT
request,
and tunnels a TLS connection through to the destination without being able to inspect the payload.
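In other words, all a proxy configured that way would see is something like:

```
CONNECT api.datadoghq.com:443 HTTP/1.1
Host: api.datadoghq.com:443

<opaque TLS bytes between vector and datadog>
```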
Some more thinking later,
I determined my best option was pointing the sink's endpoint
at a sidecar,
letting vector make its requests to the sidecar, and having the sidecar submit to datadog.
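In vector config terms, something along these lines (sink name and inputs are illustrative; the port matches the sidecar below):

```toml
[sinks.datadog]
type = "datadog_metrics"
inputs = ["final_transform"]
default_api_key = "${DD_API_KEY}"
# send to the sidecar instead of directly to datadog
endpoint = "http://localhost:8090"
```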
So now: implement an endpoint capable of proxying requests and decoding the payload.
The code was simple enough,
except for 2 things that tripped me up:
HTTP content-encoding: deflate
actually meant compress/zlib
and not raw DEFLATE via compress/flate
(zlib is a thin wrapper around DEFLATE with some extra metadata),
and Go's net/http.Request.ContentLength
is a struct field and not just a header
(I needed to update it to be able to send the modified payload; the outgoing client uses the field, not the Content-Length header).
The results? vector sends out the request correctly, datadog responds with HTTP 202 Accepted, and our metric data is still missing. :sob: Time to go complain.
```go
package main

import (
	"bytes"
	"compress/zlib"
	"context"
	"encoding/json"
	"io"
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
	"os"
	"time"

	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	"go.seankhliao.com/testrepo0133/otelsdk"
	"golang.org/x/time/rate"
)

func main() {
	ctx := context.Background()
	exp, err := otelsdk.Init(ctx, otelsdk.Config{})
	if err != nil {
		log.Fatalln("setup otel sdk", err)
	}
	defer exp.Shutdown(context.Background())

	mt := otel.GetMeterProvider().Meter("dd-proxy")
	hReqSize, _ := mt.Int64Histogram("ddproxy.vector.request.size")
	hReqDur, _ := mt.Int64Histogram("ddproxy.client.duration.ms")
	hOTLP, _ := mt.Int64Histogram("ddproxy.canary.otlp")
	hProm, _ := mt.Int64Histogram("ddproxy.canary.prom")

	u := &url.URL{
		Scheme: "https",
		Host:   "api." + os.Getenv("DD_SITE"),
	}
	rp := httputil.NewSingleHostReverseProxy(u)
	rt := &rtripper{
		hReqSize,
		hReqDur,
		hOTLP,
		hProm,
		http.DefaultTransport,
		rate.NewLimiter(0.5, 1),
	}
	rp.Transport = otelhttp.NewTransport(rt)

	http.Handle("/", otelhttp.NewHandler(rp, "submit to datadog"))
	// http.Handle("/", otelhttp.NewHandler(logreq(rp), "submit to datadog"))
	// http.Handle("/metrics", exp.PromHandler)

	log.Println("single host reverse proxy for", u.String())
	log.Println("serving on :8090")
	http.ListenAndServe(":8090", nil)
}

func logreq(h http.Handler) http.Handler {
	return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
		log.Println("serving request:", r.URL.String())
		h.ServeHTTP(rw, r)
	})
}

type rtripper struct {
	hReqSize metric.Int64Histogram
	hReqDur  metric.Int64Histogram
	hOTLP    metric.Int64Histogram
	hProm    metric.Int64Histogram
	base     http.RoundTripper
	rate     *rate.Limiter
}

func (r *rtripper) RoundTrip(req *http.Request) (*http.Response, error) {
	ctx := context.Background()

	if req.Method == http.MethodPost {
		body, err := io.ReadAll(req.Body)
		if err != nil {
			log.Println("roundtripper read body", err)
		}

		var uncompressed []byte
		if req.Header.Get("content-encoding") == "deflate" {
			cr, err := zlib.NewReader(bytes.NewReader(body))
			if err != nil {
				log.Println("roundtripper create zlib reader", err)
			}
			uncompressed, err = io.ReadAll(cr)
			if err != nil {
				log.Println("roundtripper decompress body", err)
			}
			req.Header.Set("content-encoding", "identity")
			req.ContentLength = int64(len(uncompressed))
		} else {
			uncompressed = body
		}
		req.Body = io.NopCloser(bytes.NewReader(uncompressed))

		// metrics
		r.hReqSize.Record(ctx, int64(len(body)),
			metric.WithAttributes(
				attribute.String("path", req.URL.Path),
				attribute.String("compressed", "true"),
			),
		)
		r.hReqSize.Record(ctx, int64(len(uncompressed)),
			metric.WithAttributes(
				attribute.String("path", req.URL.Path),
				attribute.String("compressed", "false"),
			),
		)

		switch req.URL.Path {
		case "/api/v1/series":
			if bytes.Contains(uncompressed, []byte(`polaris.o11y.canary.`)) {
				var ddseries DDSeries
				err := json.Unmarshal(uncompressed, &ddseries)
				if err == nil {
					for _, s := range ddseries.Series {
						switch s.Metric {
						case "polaris.o11y.canary.prom.total":
							// log.Println("found prom canary", s)
							for _, p := range s.Points {
								r.hProm.Record(ctx, int64(p[1]))
							}
						case "polaris.o11y.canary.otlp":
							// log.Println("found otlp canary", s)
							for _, p := range s.Points {
								r.hOTLP.Record(ctx, int64(p[1]))
							}
						}
					}
				} else {
					if r.rate.Allow() {
						log.Println(string(uncompressed))
					}
					log.Println("unmarshal request json", err)
				}
			}
		}
	}

	t0 := time.Now()
	res, err := r.base.RoundTrip(req)
	r.hReqDur.Record(ctx, int64(time.Since(t0).Milliseconds()))
	if err == nil {
		log.Println("roundtripper", req.URL.String(), res.Status, req.Header.Get("content-type"), req.Header.Get("content-encoding"))
	}
	return res, err
}

type DDSeries struct {
	Series []struct {
		Metric string      `json:"metric"`
		Points [][]float64 `json:"points"`
	} `json:"series"`
}
```
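For reference, the decoded payloads that the DDSeries type above picks apart (datadog's v1 series submission format) look roughly like this; the timestamp, value, and tags are illustrative:

```json
{
  "series": [
    {
      "metric": "polaris.o11y.canary.prom.total",
      "points": [[1700000000, 12]],
      "tags": ["service:canary"],
      "type": "count"
    }
  ]
}
```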