debugging vector-datadog data submission

the dog ate my metrics

SEAN K.H. LIAO

debugging vector-datadog data submission

the dog ate my metrics

proxying data between vector and datadog

We have an issue. We have a canary app increasing a counter on a regular interval. It sends the data 2 different ways: via OpenTelemetry OTLP to an otel-collector through to vector through to datadog, and via Prometheus exposition to an otel-collector through to vector through to datadog. The problem was that sometimes less data than expected would show up in datadog. Our counter increased once every 5s and we had 2 instances, so we expected a value of 24 per minute in datadog (2 instances × 12 increments per minute). But sometimes it would be less, like only 12 per minute, and it would persist for hours or even days. It was sometimes correlated with a rollout of vector, but that wasn't a stable or necessary predictor of failure.

With a complicated pipeline, we suspected something in there was broken. An easy way to rule out half our pipeline was using the otel-collector's datadogexporter, bypassing vector and submitting the metric to datadog. The result? A rock solid, flat line. So the issue wasn't in the canary app or in the collector.

Next we looked at vector. I went for a much more rudimentary method: vector tap on the last transform before the sink, and grepping the log lines. There we also saw a stable total of 24 per minute, even when our datadog dashboards showed only 12. So this narrows it down to somewhere between the sink and datadog.

proxy the sink

For reasons, I didn't actually trust the metrics generated by vector about itself (previously, it lost data without reporting it), plus I wanted extra metrics anyway. I also wanted a way to read the requests that vector was sending out. Their datadog_metrics sink supported a proxy config, but then I remembered that https_proxy was a bit useless: the proxy would see an HTTP CONNECT request, and tunnel a TLS connection through to the destination without being able to inspect the payload.

Some more thinking later, I determined my best option was setting endpoint to a sidecar, letting vector make the request to the sidecar, and having the sidecar submit to datadog. So now: implement an endpoint capable of proxying requests and decoding the payload. The code was simple enough, except for 2 things that tripped me up: HTTP content-encoding: deflate meant compress/zlib and not just plain compress/flate (zlib is a thin wrapper around raw DEFLATE with a small header and checksum), and Go's net/http.Request.ContentLength was a struct field and not just a header field (I needed to update it to be able to send modified payloads).

Results? vector sends out the request correctly, datadog responds with HTTP 202 Accepted, and our metric data is still missing. :sob: time to go complain.

proxy code

package main

import (
        "bytes"
        "compress/zlib"
        "context"
        "encoding/json"
        "io"
        "log"
        "net/http"
        "net/http/httputil"
        "net/url"
        "os"
        "time"

        "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
        "go.opentelemetry.io/otel"
        "go.opentelemetry.io/otel/attribute"
        "go.opentelemetry.io/otel/metric"
        "go.seankhliao.com/testrepo0133/otelsdk"
        "golang.org/x/time/rate"
)

// main serves a single-host reverse proxy on :8090 that forwards every request
// to https://api.$DD_SITE. Outgoing requests pass through rtripper, which
// decodes and measures vector's datadog_metrics submissions on the way.
func main() {
	site := os.Getenv("DD_SITE")
	if site == "" {
		// Without a site the upstream host would be the unusable "api.".
		log.Fatalln("DD_SITE must be set (e.g. datadoghq.com)")
	}

	ctx := context.Background()
	exp, err := otelsdk.Init(ctx, otelsdk.Config{})
	if err != nil {
		log.Fatalln("setup otel sdk", err)
	}
	defer exp.Shutdown(context.Background())

	mt := otel.GetMeterProvider().Meter("dd-proxy")
	// histogram creates an instrument or aborts startup; a missing instrument
	// would otherwise only surface later as a nil-interface panic in RoundTrip.
	histogram := func(name string) metric.Int64Histogram {
		h, err := mt.Int64Histogram(name)
		if err != nil {
			log.Fatalln("create histogram", name, err)
		}
		return h
	}

	u := &url.URL{
		Scheme: "https",
		Host:   "api." + site,
	}
	rp := httputil.NewSingleHostReverseProxy(u)
	// The limiter allows at most one raw-payload dump every 2s (0.5/s, burst 1)
	// when a series payload fails to decode.
	rp.Transport = otelhttp.NewTransport(&rtripper{
		hReqSize: histogram("ddproxy.vector.request.size"),
		hReqDur:  histogram("ddproxy.client.duration.ms"),
		hOTLP:    histogram("ddproxy.canary.otlp"),
		hProm:    histogram("ddproxy.canary.prom"),
		base:     http.DefaultTransport,
		rate:     rate.NewLimiter(0.5, 1),
	})

	http.Handle("/", otelhttp.NewHandler(rp, "submit to datadog"))
	// http.Handle("/", otelhttp.NewHandler(logreq(rp), "submit to datadog"))
	// http.Handle("/metrics", exp.PromHandler)

	srv := &http.Server{
		Addr: ":8090",
		// Bound how long a client may take to send request headers.
		ReadHeaderTimeout: 10 * time.Second,
	}

	log.Println("single host reverse proxy for", u.String())
	log.Println("serving on", srv.Addr)
	// ListenAndServe only ever returns a non-nil error. Log it and return
	// (instead of log.Fatal) so the deferred exporter shutdown still runs.
	log.Println("serve http", srv.ListenAndServe())
}

// logreq wraps h so that the URL of every incoming request is logged before
// the request is handed on to h unchanged.
func logreq(h http.Handler) http.Handler {
	wrapped := func(w http.ResponseWriter, req *http.Request) {
		log.Println("serving request:", req.URL.String())
		h.ServeHTTP(w, req)
	}
	return http.HandlerFunc(wrapped)
}

// rtripper is an http.RoundTripper that inspects requests on their way to
// base, recording their sizes, upstream latency, and canary metric values.
//
// Field order is significant: main may build this with a positional literal.
type rtripper struct {
	// Histograms, in order: request body size, upstream round-trip latency
	// (ms), OTLP canary values, Prometheus canary values.
	hReqSize, hReqDur, hOTLP, hProm metric.Int64Histogram

	// base performs the actual upstream request.
	base http.RoundTripper
	// rate throttles dumping of payloads that fail to decode.
	rate *rate.Limiter
}

// RoundTrip implements http.RoundTripper.
//
// For POST requests that carry a body it:
//   - buffers the body and, when Content-Encoding is "deflate", inflates it
//     (HTTP "deflate" is zlib-framed, hence compress/zlib) and forwards the
//     inflated bytes with Content-Encoding set to "identity";
//   - records body sizes in hReqSize, labelled compressed="true" for the
//     encoded bytes and compressed="false" for the decoded bytes;
//   - for /api/v1/series, records canary metric values found in the payload.
//
// Every request is then sent via r.base and its latency recorded in hReqDur.
// Decoding problems are logged and never block forwarding: a body that cannot
// be inflated is forwarded exactly as received. Only a failure to read the
// body fails the request, since the payload can no longer be forwarded intact.
func (r *rtripper) RoundTrip(req *http.Request) (*http.Response, error) {
	ctx := req.Context()

	// ReverseProxy sends a nil Body for empty requests; nothing to inspect.
	if req.Method == http.MethodPost && req.Body != nil {
		// A RoundTripper must not modify the caller's request (only consume
		// and close its body), so work on a copy. Clone deep-copies Header;
		// Body is shared and is replaced below.
		req = req.Clone(ctx)

		body, err := io.ReadAll(req.Body)
		req.Body.Close()
		if err != nil {
			log.Println("roundtripper read body", err)
			return nil, err
		}

		payload, decoded := body, true
		if req.Header.Get("content-encoding") == "deflate" {
			r.recordSize(ctx, req.URL.Path, len(body), "true")
			inflated, err := zlibInflate(body)
			if err != nil {
				// Forward vector's bytes and headers untouched rather than a
				// truncated body mislabelled as identity.
				log.Println("roundtripper decompress body", err)
				decoded = false
			} else {
				payload = inflated
				req.Header.Set("content-encoding", "identity")
			}
		}
		if decoded {
			r.recordSize(ctx, req.URL.Path, len(payload), "false")
		}

		// The length may have changed, and any retry must resend this
		// payload rather than the original body.
		req.Body = io.NopCloser(bytes.NewReader(payload))
		req.ContentLength = int64(len(payload))
		req.GetBody = func() (io.ReadCloser, error) {
			return io.NopCloser(bytes.NewReader(payload)), nil
		}

		if decoded && req.URL.Path == "/api/v1/series" {
			r.recordCanaries(ctx, payload)
		}
	}

	t0 := time.Now()
	res, err := r.base.RoundTrip(req)
	r.hReqDur.Record(ctx, time.Since(t0).Milliseconds())
	if err != nil {
		return nil, err
	}
	log.Println("roundtripper", req.URL.String(), res.Status, req.Header.Get("content-type"), req.Header.Get("content-encoding"))
	return res, nil
}

// recordSize records one body-size sample for path; compressed is "true" when
// the measured bytes were still content-encoded, "false" otherwise.
func (r *rtripper) recordSize(ctx context.Context, path string, n int, compressed string) {
	r.hReqSize.Record(ctx, int64(n),
		metric.WithAttributes(
			attribute.String("path", path),
			attribute.String("compressed", compressed),
		),
	)
}

// recordCanaries decodes a JSON series payload and records the value of every
// point of the known canary metrics. Payloads without canary data are skipped
// cheaply; undecodable ones are logged (raw dump rate-limited by r.rate).
func (r *rtripper) recordCanaries(ctx context.Context, payload []byte) {
	if !bytes.Contains(payload, []byte(`polaris.o11y.canary.`)) {
		return
	}

	var ddseries DDSeries
	if err := json.Unmarshal(payload, &ddseries); err != nil {
		if r.rate.Allow() {
			log.Println(string(payload))
		}
		log.Println("unmarshal request json", err)
		return
	}

	for _, s := range ddseries.Series {
		var h metric.Int64Histogram
		switch s.Metric {
		case "polaris.o11y.canary.prom.total":
			h = r.hProm
		case "polaris.o11y.canary.otlp":
			h = r.hOTLP
		default:
			continue
		}
		for _, p := range s.Points {
			// Points are expected as [timestamp, value] pairs; skip malformed
			// entries instead of panicking on p[1].
			if len(p) < 2 {
				continue
			}
			h.Record(ctx, int64(p[1]))
		}
	}
}

// zlibInflate decodes a zlib-framed body, which is what HTTP
// "Content-Encoding: deflate" denotes (as opposed to raw DEFLATE).
// Reading to EOF also verifies the zlib checksum.
func zlibInflate(b []byte) ([]byte, error) {
	zr, err := zlib.NewReader(bytes.NewReader(b))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

// ddSeriesEntry is a single timeseries inside a DDSeries payload. It is a type
// alias, so DDSeries.Series keeps exactly the same anonymous element type.
type ddSeriesEntry = struct {
	Metric string      `json:"metric"`
	Points [][]float64 `json:"points"`
}

// DDSeries is the subset of a series submission payload that the proxy
// decodes: each series' metric name and its points. Other JSON fields are
// ignored.
type DDSeries struct {
	Series []ddSeriesEntry `json:"series"`
}