We have an issue. We have a canary app increasing a counter on a regular interval. It sends the data 2 different ways: via OpenTelemetry OTLP to an otel-collector through to vector through to datadog, and via Prometheus exposition to an otel-collector through to vector through to datadog. The problem is that sometimes less data than expected would show up in datadog. Our counter increased once every 5s and we had 2 instances, so we expected value=24 in datadog every minute. But sometimes it would be less, like only value=12 in datadog, and it would persist for hours or even days. It was sometimes correlated with a rollout of vector, but that wasn't a stable or necessary predictor of failure.
With a complicated pipeline, we suspected something in there was broken. An easy way to rule out half our pipeline was using the otel-collector's datadogexporter, bypassing vector and submitting the metric to datadog. The result? A rock solid, flat line. So the issue wasn't in the canary app or in the collector.
Next we looked at vector.
I went for a much more rudimentary method of vector tap
on the last transform before the sink,
and grepped the log lines.
There we also saw a stable total of value=24 / min, even while our datadog dashboards showed value=12.
So this narrows it down to somewhere between the sink and datadog.
For reasons, I didn't actually trust the metrics generated by vector about itself
(previously, it lost data without reporting it),
plus I wanted extra metrics anyway.
I also wanted a way to read the requests that vector was sending out.
Their datadog_metrics sink supported a proxy
config,
but then I remembered that https_proxy
was a bit useless:
the proxy would see an HTTP CONNECT
request,
and tunnel through a TLS connection to the destination without being able to inspect the payload.
Some more thinking later,
I determined my best option was setting endpoint
to a sidecar,
letting vector make the request to the sidecar, and have the sidecar submit to datadog.
So now: implement an endpoint capable of proxying requests and decoding the payload.
The code was simple enough,
except for 2 things that tripped me up:
HTTP content-encoding: deflate
meant compress/zlib
and not just plain compress/deflate
(zlib is a thin wrapper with some extra metadata),
and Go's net/http.Request.ContentLength
was a struct attribute and not just a header field
(I needed to clear it to be able to send modified payloads).
Results? vector sends out the request correctly, datadog responds with HTTP 202 Accepted, and our metric data is still missing. :sob: time to go complain.
package main
import (
"bytes"
"compress/zlib"
"context"
"encoding/json"
"io"
"log"
"net/http"
"net/http/httputil"
"net/url"
"os"
"time"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.seankhliao.com/testrepo0133/otelsdk"
"golang.org/x/time/rate"
)
// main initialises the OpenTelemetry SDK, builds a single-host reverse proxy
// pointing at https://api.$DD_SITE whose transport is the metric-recording
// rtripper, and serves that proxy on :8090.
func main() {
	site := os.Getenv("DD_SITE")
	if site == "" {
		// Without a site the upstream host would be the unusable "api.".
		log.Fatalln("DD_SITE is not set")
	}

	ctx := context.Background()
	exp, err := otelsdk.Init(ctx, otelsdk.Config{})
	if err != nil {
		log.Fatalln("setup otel sdk", err)
	}
	defer exp.Shutdown(context.Background())

	mt := otel.GetMeterProvider().Meter("dd-proxy")
	hReqSize, err := mt.Int64Histogram("ddproxy.vector.request.size")
	if err != nil {
		log.Fatalln("create histogram ddproxy.vector.request.size", err)
	}
	hReqDur, err := mt.Int64Histogram("ddproxy.client.duration.ms")
	if err != nil {
		log.Fatalln("create histogram ddproxy.client.duration.ms", err)
	}
	hOTLP, err := mt.Int64Histogram("ddproxy.canary.otlp")
	if err != nil {
		log.Fatalln("create histogram ddproxy.canary.otlp", err)
	}
	hProm, err := mt.Int64Histogram("ddproxy.canary.prom")
	if err != nil {
		log.Fatalln("create histogram ddproxy.canary.prom", err)
	}

	u := &url.URL{
		Scheme: "https",
		Host:   "api." + site,
	}

	rp := httputil.NewSingleHostReverseProxy(u)
	// NewSingleHostReverseProxy leaves the Host header as received from the
	// client; set it to the upstream host so the forwarded request looks like
	// one sent directly to the target.
	director := rp.Director
	rp.Director = func(req *http.Request) {
		director(req)
		req.Host = u.Host
	}

	// Keyed fields so the literal does not depend on the struct's field order.
	rt := &rtripper{
		hReqSize: hReqSize,
		hReqDur:  hReqDur,
		hOTLP:    hOTLP,
		hProm:    hProm,
		base:     http.DefaultTransport,
		rate:     rate.NewLimiter(0.5, 1),
	}
	rp.Transport = otelhttp.NewTransport(rt)

	http.Handle("/", otelhttp.NewHandler(rp, "submit to datadog"))
	// http.Handle("/", otelhttp.NewHandler(logreq(rp), "submit to datadog"))
	// http.Handle("/metrics", exp.PromHandler)

	log.Println("single host reverse proxy for", u.String())
	log.Println("serving on :8090")
	// ListenAndServe always returns a non-nil error. log.Fatalln skips
	// deferred calls, so shut the exporter down explicitly before exiting.
	if err := http.ListenAndServe(":8090", nil); err != nil {
		exp.Shutdown(context.Background())
		log.Fatalln("listen and serve :8090", err)
	}
}
// logreq returns a handler that logs the URL of every incoming request and
// then hands the request to h unchanged.
func logreq(h http.Handler) http.Handler {
	logged := func(w http.ResponseWriter, req *http.Request) {
		target := req.URL.String()
		log.Println("serving request:", target)
		h.ServeHTTP(w, req)
	}
	return http.HandlerFunc(logged)
}
// rtripper is an http.RoundTripper that inspects outgoing POST payloads,
// records metrics about them, and delegates the actual request to base.
//
// Field order matters: main constructs this type with a positional literal.
type rtripper struct {
	// hReqSize records POST body sizes (before and after decompression),
	// hReqDur records the upstream round-trip time in milliseconds, and
	// hOTLP / hProm record the point values of the two canary metrics.
	hReqSize, hReqDur, hOTLP, hProm metric.Int64Histogram

	// base performs the real HTTP round trip.
	base http.RoundTripper
	// rate throttles how often an undecodable payload is dumped to the log.
	rate *rate.Limiter
}
// RoundTrip implements http.RoundTripper.
//
// For POST requests that carry a body it buffers the body, inflates it when
// the request has "content-encoding: deflate", records the payload sizes in
// r.hReqSize and, for /api/v1/series payloads mentioning the canary metrics,
// records the canary point values. The uncompressed payload is then sent
// through r.base. If the body cannot be inflated it is forwarded unchanged.
//
// The upstream latency of every request is recorded in r.hReqDur and each
// successful round trip is logged. The caller's request is not modified apart
// from its body being consumed and closed; rewrites happen on a clone.
//
// It returns the response and error from r.base, or a nil response and the
// read error when the request body cannot be read.
func (r *rtripper) RoundTrip(req *http.Request) (*http.Response, error) {
	// Metrics deliberately use a background context so recordings do not
	// depend on the lifetime of the request context.
	ctx := context.Background()

	// A nil Body is possible for zero-length requests; reading it would panic.
	if req.Method == http.MethodPost && req.Body != nil {
		body, err := io.ReadAll(req.Body)
		// RoundTrip is responsible for closing the request body.
		req.Body.Close()
		if err != nil {
			// Forwarding a truncated payload would silently lose data.
			log.Println("roundtripper read body", err)
			return nil, err
		}

		out := req.Clone(req.Context())
		payload := body
		if out.Header.Get("content-encoding") == "deflate" {
			inflated, err := inflateZlib(body)
			if err != nil {
				// Keep the original bytes and headers so the upstream still
				// receives exactly what the client sent.
				log.Println("roundtripper decompress body", err)
			} else {
				payload = inflated
				out.Header.Set("content-encoding", "identity")
				// ContentLength is a struct field, not just a header; it has
				// to match the rewritten body.
				out.ContentLength = int64(len(payload))
			}
		}
		out.Body = io.NopCloser(bytes.NewReader(payload))
		// Keep GetBody consistent with the rewritten body in case the
		// transport needs to replay the request.
		out.GetBody = func() (io.ReadCloser, error) {
			return io.NopCloser(bytes.NewReader(payload)), nil
		}
		req = out

		// metrics
		path := attribute.String("path", req.URL.Path)
		r.hReqSize.Record(ctx, int64(len(body)),
			metric.WithAttributes(path, attribute.String("compressed", "true")),
		)
		r.hReqSize.Record(ctx, int64(len(payload)),
			metric.WithAttributes(path, attribute.String("compressed", "false")),
		)

		// Cheap substring check first so only canary payloads are decoded.
		if req.URL.Path == "/api/v1/series" && bytes.Contains(payload, []byte(`polaris.o11y.canary.`)) {
			r.recordCanary(ctx, payload)
		}
	}

	t0 := time.Now()
	res, err := r.base.RoundTrip(req)
	r.hReqDur.Record(ctx, time.Since(t0).Milliseconds())
	if err == nil {
		log.Println("roundtripper", req.URL.String(), res.Status, req.Header.Get("content-type"), req.Header.Get("content-encoding"))
	}
	return res, err
}

// inflateZlib decompresses a zlib-wrapped deflate stream, which is what HTTP
// "content-encoding: deflate" denotes.
func inflateZlib(compressed []byte) ([]byte, error) {
	zr, err := zlib.NewReader(bytes.NewReader(compressed))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

// recordCanary decodes payload as a DDSeries and records the value of every
// canary point into the matching histogram. Undecodable payloads are logged,
// with the raw payload dump throttled by r.rate.
func (r *rtripper) recordCanary(ctx context.Context, payload []byte) {
	var ddseries DDSeries
	if err := json.Unmarshal(payload, &ddseries); err != nil {
		if r.rate.Allow() {
			log.Println(string(payload))
		}
		log.Println("unmarshal request json", err)
		return
	}

	for _, s := range ddseries.Series {
		var h metric.Int64Histogram
		switch s.Metric {
		case "polaris.o11y.canary.prom.total":
			h = r.hProm
		case "polaris.o11y.canary.otlp":
			h = r.hOTLP
		default:
			continue
		}
		for _, p := range s.Points {
			// Only index 1 is recorded (presumably points are
			// [timestamp, value] pairs — confirm against the Datadog API);
			// skip malformed points instead of panicking.
			if len(p) < 2 {
				continue
			}
			h.Record(ctx, int64(p[1]))
		}
	}
}
// DDSeries is the subset of the JSON body posted to /api/v1/series that this
// proxy decodes: a list of series, each with a metric name and its points.
// Any other fields in the payload are ignored when decoding.
type DDSeries struct {
	Series []ddSeriesEntry `json:"series"`
}

// ddSeriesEntry is one element of DDSeries.Series.
type ddSeriesEntry struct {
	// Metric is the metric name.
	Metric string `json:"metric"`
	// Points holds the series' data points as arrays of numbers.
	Points [][]float64 `json:"points"`
}