SEANK.H.LIAO

debugging vector-datadog data submission

the dog ate my metrics

proxying data between vector and datadog

We have an issue. We have a canary app increasing a counter at a regular interval. It sends the data 2 different ways: via OpenTelemetry OTLP to an otel-collector through to vector through to datadog, and via Prometheus exposition to an otel-collector through to vector through to datadog. The problem is that sometimes less data than expected would show up in datadog. Our counter increased once every 5s and we had 2 instances, so we expected value=24 / min in datadog. But sometimes it would be less, like only value=12 in datadog, and it would persist for hours or even days. It was sometimes correlated with a rollout of vector, but that wasn't a stable or necessary predictor of failure.

With a complicated pipeline, we suspected something in there was broken. An easy way to rule out half our pipeline was using the otel-collector's datadogexporter, bypassing vector and submitting the metric to datadog. The result? A rock solid, flat line. So the issue wasn't in the canary app or in the collector.

Next we looked at vector. I went for a much more rudimentary method of vector tap on the last transform before the sink, and grepped the log lines. We also had a stable total value=24 / min, even when in our datadog dashboards we could see value=12. So this narrows it down to somewhere between the sink and datadog.

proxy the sink

For reasons, I didn't actually trust the metrics generated by vector about itself (previously, it lost data without reporting it), plus I wanted extra metrics anyway. I also wanted a way to read the requests that vector was sending out. Their datadog_metrics sink supported a proxy config, but then I remembered that https_proxy was a bit useless: the proxy would see an HTTP CONNECT request, and tunnel through a TLS connection to the destination without being able to inspect the payload.

Some more thinking later, I determined my best option was setting endpoint to a sidecar, letting vector make the request to the sidecar, and have the sidecar submit to datadog. So now: implement an endpoint capable of proxying requests and decoding the payload. The code was simple enough, except for 2 things that tripped me up: HTTP content-encoding: deflate meant compress/zlib and not just plain compress/flate (zlib is a thin wrapper with some extra metadata), and Go's net/http.Request.ContentLength was a struct field and not just a header field (I needed to update it to be able to send modified payloads).

Results? vector sends out the request correctly, datadog responds with HTTP 202 Accepted, and our metric data is still missing. :sob: time to go complain.

proxy code

  1package main
  2
  3import (
  4        "bytes"
  5        "compress/zlib"
  6        "context"
  7        "encoding/json"
  8        "io"
  9        "log"
 10        "net/http"
 11        "net/http/httputil"
 12        "net/url"
 13        "os"
 14        "time"
 15
 16        "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
 17        "go.opentelemetry.io/otel"
 18        "go.opentelemetry.io/otel/attribute"
 19        "go.opentelemetry.io/otel/metric"
 20        "go.seankhliao.com/testrepo0133/otelsdk"
 21        "golang.org/x/time/rate"
 22)
 23
 24func main() {
 25        ctx := context.Background()
 26        exp, err := otelsdk.Init(ctx, otelsdk.Config{})
 27        if err != nil {
 28                log.Fatalln("setup otel sdk", err)
 29        }
 30        defer exp.Shutdown(context.Background())
 31
 32        mt := otel.GetMeterProvider().Meter("dd-proxy")
 33        hReqSize, _ := mt.Int64Histogram("ddproxy.vector.request.size")
 34        hReqDur, _ := mt.Int64Histogram("ddproxy.client.duration.ms")
 35        hOTLP, _ := mt.Int64Histogram("ddproxy.canary.otlp")
 36        hProm, _ := mt.Int64Histogram("ddproxy.canary.prom")
 37
 38        u := &url.URL{
 39                Scheme: "https",
 40                Host:   "api." + os.Getenv("DD_SITE"),
 41        }
 42        rp := httputil.NewSingleHostReverseProxy(u)
 43        rt := &rtripper{
 44                hReqSize,
 45                hReqDur,
 46                hOTLP,
 47                hProm,
 48                http.DefaultTransport,
 49                rate.NewLimiter(0.5, 1),
 50        }
 51        rp.Transport = otelhttp.NewTransport(rt)
 52
 53        http.Handle("/", otelhttp.NewHandler(rp, "submit to datadog"))
 54        // http.Handle("/", otelhttp.NewHandler(logreq(rp), "submit to datadog"))
 55        // http.Handle("/metrics", exp.PromHandler)
 56
 57        log.Println("single host reverse proxy for", u.String())
 58        log.Println("serving on :8090")
 59        http.ListenAndServe(":8090", nil)
 60}
 61
 62func logreq(h http.Handler) http.Handler {
 63        return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
 64                log.Println("serving request:", r.URL.String())
 65                h.ServeHTTP(rw, r)
 66        })
 67}
 68
 69type rtripper struct {
 70        hReqSize metric.Int64Histogram
 71        hReqDur  metric.Int64Histogram
 72        hOTLP    metric.Int64Histogram
 73        hProm    metric.Int64Histogram
 74        base     http.RoundTripper
 75        rate     *rate.Limiter
 76}
 77
 78func (r *rtripper) RoundTrip(req *http.Request) (*http.Response, error) {
 79        ctx := context.Background()
 80
 81        if req.Method == http.MethodPost {
 82                body, err := io.ReadAll(req.Body)
 83                if err != nil {
 84                        log.Println("roundtripper read body", err)
 85                }
 86
 87                var uncompressed []byte
 88                if req.Header.Get("content-encoding") == "deflate" {
 89                        cr, err := zlib.NewReader(bytes.NewReader(body))
 90                        if err != nil {
 91                                log.Println("roundtripper create zlib reader", err)
 92                        }
 93                        uncompressed, err = io.ReadAll(cr)
 94                        if err != nil {
 95                                log.Println("roundtripper decompress body", err)
 96                        }
 97                        req.Header.Set("content-encoding", "identity")
 98                        req.ContentLength = int64(len(uncompressed))
 99                } else {
100                        uncompressed = body
101                }
102                req.Body = io.NopCloser(bytes.NewReader(uncompressed))
103
104                // metrics
105                r.hReqSize.Record(ctx, int64(len(body)),
106                        metric.WithAttributes(
107                                attribute.String("path", req.URL.Path),
108                                attribute.String("compressed", "true"),
109                        ),
110                )
111                r.hReqSize.Record(ctx, int64(len(uncompressed)),
112                        metric.WithAttributes(
113                                attribute.String("path", req.URL.Path),
114                                attribute.String("compressed", "false"),
115                        ),
116                )
117
118                switch req.URL.Path {
119                case "/api/v1/series":
120                        if bytes.Contains(uncompressed, []byte(`polaris.o11y.canary.`)) {
121                                var ddseries DDSeries
122                                err := json.Unmarshal(uncompressed, &ddseries)
123                                if err == nil {
124                                        for _, s := range ddseries.Series {
125                                                switch s.Metric {
126                                                case "polaris.o11y.canary.prom.total":
127                                                        // log.Println("found prom canary", s)
128                                                        for _, p := range s.Points {
129                                                                r.hProm.Record(ctx, int64(p[1]))
130                                                        }
131                                                case "polaris.o11y.canary.otlp":
132                                                        // log.Println("found otlp canary", s)
133                                                        for _, p := range s.Points {
134                                                                r.hOTLP.Record(ctx, int64(p[1]))
135                                                        }
136
137                                                }
138                                        }
139                                } else {
140                                        if r.rate.Allow() {
141                                                log.Println(string(uncompressed))
142                                        }
143                                        log.Println("unmarshal request json", err)
144                                }
145                        }
146                }
147        }
148
149        t0 := time.Now()
150        res, err := r.base.RoundTrip(req)
151        r.hReqDur.Record(ctx, int64(time.Since(t0).Milliseconds()))
152        if err == nil {
153                log.Println("roundtripper", req.URL.String(), res.Status, req.Header.Get("content-type"), req.Header.Get("content-encoding"))
154        }
155        return res, err
156}
157
// DDSeries holds the parts of a JSON series submission that the proxy
// inspects: the metric name and the points of each entry under "series".
// All other JSON fields are ignored when decoding.
type DDSeries struct {
	// Series is the list of submitted series.
	Series []struct {
		// Metric is the metric name.
		Metric string `json:"metric"`
		// Points are the submitted data points as arrays of numbers.
		Points [][]float64 `json:"points"`
	} `json:"series"`
}