otelcol cueprocessor

cue as a data mangler?

SEAN K.H. LIAO

otelcol

The OpenTelemetry Collector is built in a modular way, which means you can relatively easily write your own receiver, processor, or exporter and build a version of the collector that includes it.

cue

cue is a language more suited for config and validation, but that doesn't mean you can't use it for data transformations too...

writing a processor

So, what do we need to write a processor?

Using collector v0.56.0

config

This is the config that our processor takes, which will be nested in the collector config yaml.

package cueprocessor

import (
	"go.opentelemetry.io/collector/config"
)

type Config struct {
	// standard field that you need to embed
	// squash ensures fields are correctly decoded in embedded struct
	config.ProcessorSettings `mapstructure:",squash"`

	// input that our processor needs: the cue program source
	Program string `mapstructure:"program"`
}

// We're not going to do any validation now,
// cue makes it a bit annoying to handle undeclared references
func (c *Config) Validate() error {
	return nil
}
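If you did want a little validation without fighting cue over undeclared references, one option is to only check that a program was supplied at all. This is a minimal sketch, not what the processor above does: the `Config` here is a trimmed-down stand-in without the embedded `config.ProcessorSettings` so it compiles on its own, and `errEmptyProgram` is a made-up name.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Config is a simplified stand-in for the processor config;
// the embedded config.ProcessorSettings is left out on purpose.
type Config struct {
	Program string
}

// errEmptyProgram is a hypothetical error value used only in this sketch.
var errEmptyProgram = errors.New("cue processor: program must not be empty")

// Validate rejects a missing or whitespace-only program.
// It does not try to compile the cue source.
func (c *Config) Validate() error {
	if strings.TrimSpace(c.Program) == "" {
		return errEmptyProgram
	}
	return nil
}

func main() {
	fmt.Println((&Config{}).Validate())                   // cue processor: program must not be empty
	fmt.Println((&Config{Program: "out: in"}).Validate()) // <nil>
}
```

This catches the most likely mistake (forgetting the `program` key) and leaves real cue errors to surface when data is processed.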

factory

The collector builder hardcodes the name NewFactory. With that, we register functions that will create our processor instances.

package cueprocessor

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/config"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

const (
	// The value of "type" key in configuration.
	typeStr = "cue"
)

// This registers the type name, the config, and functions to create processor instances.
func NewFactory() component.ProcessorFactory {
	return component.NewProcessorFactory(
		typeStr,
		createDefaultConfig,
		component.WithTracesProcessorAndStabilityLevel(createTracesProcessor, component.StabilityLevelInDevelopment),
		component.WithMetricsProcessorAndStabilityLevel(createMetricsProcessor, component.StabilityLevelInDevelopment),
		component.WithLogsProcessorAndStabilityLevel(createLogsProcessor, component.StabilityLevelInDevelopment),
	)
}

func createDefaultConfig() config.Processor {
	return &Config{
		ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
	}
}

func createTracesProcessor(ctx context.Context, s component.ProcessorCreateSettings, p config.Processor, next consumer.Traces) (component.TracesProcessor, error) {
	proc, err := newProcessor(p.(*Config))
	if err != nil {
		return nil, err
	}
	return &tracesProcessor{
		processor: proc,
		next:      next,
		m:         ptrace.NewJSONMarshaler(),
		u:         ptrace.NewJSONUnmarshaler(),
	}, nil
}

func createMetricsProcessor(ctx context.Context, s component.ProcessorCreateSettings, p config.Processor, next consumer.Metrics) (component.MetricsProcessor, error) {
	proc, err := newProcessor(p.(*Config))
	if err != nil {
		return nil, err
	}
	return &metricsProcessor{
		processor: proc,
		next:      next,
		m:         pmetric.NewJSONMarshaler(),
		u:         pmetric.NewJSONUnmarshaler(),
	}, nil
}

func createLogsProcessor(ctx context.Context, s component.ProcessorCreateSettings, p config.Processor, next consumer.Logs) (component.LogsProcessor, error) {
	proc, err := newProcessor(p.(*Config))
	if err != nil {
		return nil, err
	}
	return &logsProcessor{
		processor: proc,
		next:      next,
		m:         plog.NewJSONMarshaler(),
		u:         plog.NewJSONUnmarshaler(),
	}, nil
}
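To make the registration step a bit more concrete, here is a toy model of how a type name like `cue` ends up selecting a factory. It is a simplified illustration and not collector code: `factory`, `makeFactoryMap`, and `lookup` are hypothetical names, and the real `component.ProcessorFactory` carries much more than a type string. The one thing it tries to show is that the key under `processors:` in the collector config (optionally written as `type/name`) is matched against the factory's type.

```go
package main

import (
	"fmt"
	"strings"
)

// factory is a hypothetical, stripped-down stand-in for a processor factory.
type factory struct {
	typ string
}

// NewFactory mirrors the exported constructor name used above.
func NewFactory() factory { return factory{typ: "cue"} }

// makeFactoryMap indexes factories by type name and rejects duplicates.
func makeFactoryMap(fs ...factory) (map[string]factory, error) {
	m := make(map[string]factory, len(fs))
	for _, f := range fs {
		if _, dup := m[f.typ]; dup {
			return nil, fmt.Errorf("duplicate processor factory %q", f.typ)
		}
		m[f.typ] = f
	}
	return m, nil
}

// lookup resolves a component ID such as "cue" or "cue/second" to its
// factory by taking the part before the first slash as the type.
func lookup(m map[string]factory, id string) (factory, bool) {
	typ, _, _ := strings.Cut(id, "/")
	f, ok := m[typ]
	return f, ok
}

func main() {
	m, err := makeFactoryMap(NewFactory())
	if err != nil {
		panic(err)
	}
	_, ok := lookup(m, "cue/second")
	fmt.Println(ok) // true
}
```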

processor

The actual processor. It's a bit repetitive to work over the 3 signal types (traces, metrics, logs).

The collector only exposes the data through a lot of accessor functions and not the underlying data structure. This makes it sufficiently annoying to work with that I opted to round-trip through json serialization. Additionally, I made the choice here to call the processor once per resource, but after thinking a bit more about it, I think it could have been called over the entire batch, leaving the user to iterate over the input in a list comprehension.

package cueprocessor

import (
	"context"

	"cuelang.org/go/cue"
	"cuelang.org/go/cue/cuecontext"
	cuejson "cuelang.org/go/encoding/json"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

type tracesProcessor struct {
	processor
	next consumer.Traces
	m    ptrace.Marshaler
	u    ptrace.Unmarshaler
}

type metricsProcessor struct {
	processor
	next consumer.Metrics
	m    pmetric.Marshaler
	u    pmetric.Unmarshaler
}

type logsProcessor struct {
	processor
	next consumer.Logs
	m    plog.Marshaler
	u    plog.Unmarshaler
}

// processor holds the parts shared by all three signal types.
type processor struct {
	prog string
}

func newProcessor(c *Config) (processor, error) {
	return processor{
		prog: c.Program,
	}, nil
}

func (p processor) Capabilities() consumer.Capabilities {
	return consumer.Capabilities{MutatesData: true}
}

func (p processor) Start(ctx context.Context, host component.Host) error {
	return nil
}

func (p processor) Shutdown(ctx context.Context) error {
	return nil
}

func (p tracesProcessor) ConsumeTraces(ctx context.Context, data ptrace.Traces) error {
	final := ptrace.NewTraces()
	final.ResourceSpans().EnsureCapacity(data.ResourceSpans().Len())

	single := ptrace.NewTraces()
	single.ResourceSpans().EnsureCapacity(1)

	for i := 0; i < data.ResourceSpans().Len(); i++ {
		// move one resource into its own payload and serialize it to json
		single.ResourceSpans().AppendEmpty()
		data.ResourceSpans().At(i).MoveTo(single.ResourceSpans().At(0))
		jsonRaw, err := p.m.MarshalTraces(single)
		if err != nil {
			return err
		}
		expr, err := cuejson.Extract("", jsonRaw)
		if err != nil {
			return err
		}

		// expose the data as "in", evaluate the program, read back "out"
		c := cuecontext.New()
		val := c.CompileString("in: {}")
		val = val.Fill(expr, "in")
		val = c.CompileString(p.prog, cue.Scope(val))
		val = val.Lookup("out")

		jsonRaw, err = val.MarshalJSON()
		if err != nil {
			return err
		}
		single, err = p.u.UnmarshalTraces(jsonRaw)
		if err != nil {
			return err
		}

		// this also empties single, ready for the next iteration
		single.ResourceSpans().MoveAndAppendTo(final.ResourceSpans())
	}

	return p.next.ConsumeTraces(ctx, final)
}

func (p metricsProcessor) ConsumeMetrics(ctx context.Context, data pmetric.Metrics) error {
	final := pmetric.NewMetrics()
	final.ResourceMetrics().EnsureCapacity(data.ResourceMetrics().Len())

	single := pmetric.NewMetrics()
	single.ResourceMetrics().EnsureCapacity(1)

	for i := 0; i < data.ResourceMetrics().Len(); i++ {
		single.ResourceMetrics().AppendEmpty()
		data.ResourceMetrics().At(i).MoveTo(single.ResourceMetrics().At(0))
		jsonRaw, err := p.m.MarshalMetrics(single)
		if err != nil {
			return err
		}
		expr, err := cuejson.Extract("", jsonRaw)
		if err != nil {
			return err
		}

		c := cuecontext.New()
		val := c.CompileString("in: {}")
		val = val.Fill(expr, "in")
		val = c.CompileString(p.prog, cue.Scope(val))
		val = val.Lookup("out")

		jsonRaw, err = val.MarshalJSON()
		if err != nil {
			return err
		}
		single, err = p.u.UnmarshalMetrics(jsonRaw)
		if err != nil {
			return err
		}

		single.ResourceMetrics().MoveAndAppendTo(final.ResourceMetrics())
	}

	return p.next.ConsumeMetrics(ctx, final)
}

func (p logsProcessor) ConsumeLogs(ctx context.Context, data plog.Logs) error {
	final := plog.NewLogs()
	final.ResourceLogs().EnsureCapacity(data.ResourceLogs().Len())

	single := plog.NewLogs()
	single.ResourceLogs().EnsureCapacity(1)

	for i := 0; i < data.ResourceLogs().Len(); i++ {
		single.ResourceLogs().AppendEmpty()
		data.ResourceLogs().At(i).MoveTo(single.ResourceLogs().At(0))
		jsonRaw, err := p.m.MarshalLogs(single)
		if err != nil {
			return err
		}
		expr, err := cuejson.Extract("", jsonRaw)
		if err != nil {
			return err
		}

		c := cuecontext.New()
		val := c.CompileString("in: {}")
		val = val.Fill(expr, "in")
		val = c.CompileString(p.prog, cue.Scope(val))
		val = val.Lookup("out")

		jsonRaw, err = val.MarshalJSON()
		if err != nil {
			return err
		}
		single, err = p.u.UnmarshalLogs(jsonRaw)
		if err != nil {
			return err
		}

		single.ResourceLogs().MoveAndAppendTo(final.ResourceLogs())
	}

	return p.next.ConsumeLogs(ctx, final)
}
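The three Consume functions share the same marshal, transform, unmarshal shape, so one way to cut the repetition might be a small generic helper (Go 1.18+). The sketch below is an assumption about how that could look, not code from the processor: `roundTrip`, `payload`, and `rename` are hypothetical names, and `payload` stands in for the pdata types so the example runs with only the standard library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// roundTrip marshals in, hands the bytes to transform, and unmarshals the
// result back into a T. All three functions are supplied by the caller.
func roundTrip[T any](
	in T,
	marshal func(T) ([]byte, error),
	transform func([]byte) ([]byte, error),
	unmarshal func([]byte) (T, error),
) (T, error) {
	var zero T
	raw, err := marshal(in)
	if err != nil {
		return zero, fmt.Errorf("marshal: %w", err)
	}
	raw, err = transform(raw)
	if err != nil {
		return zero, fmt.Errorf("transform: %w", err)
	}
	out, err := unmarshal(raw)
	if err != nil {
		return zero, fmt.Errorf("unmarshal: %w", err)
	}
	return out, nil
}

// payload is a hypothetical stand-in for ptrace.Traces and friends.
type payload struct {
	Name string `json:"name"`
}

func marshalPayload(p payload) ([]byte, error) { return json.Marshal(p) }

func unmarshalPayload(b []byte) (payload, error) {
	var p payload
	err := json.Unmarshal(b, &p)
	return p, err
}

// rename stands in for the cue evaluation step: it rewrites one field.
func rename(b []byte) ([]byte, error) {
	var m map[string]any
	if err := json.Unmarshal(b, &m); err != nil {
		return nil, err
	}
	m["name"] = "renamed"
	return json.Marshal(m)
}

func main() {
	out, err := roundTrip(payload{Name: "original"}, marshalPayload, rename, unmarshalPayload)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.Name) // renamed
}
```

In the processor, the method values `p.m.MarshalTraces` and `p.u.UnmarshalTraces` look like they would fit the `marshal` and `unmarshal` parameters, with the cue evaluation moved into a shared `transform` method on `processor`.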

building the collector

With the above, we can build a collector using the builder.

We need a config file with some basic receivers/exporters for debugging:

receivers:
  - import: go.opentelemetry.io/collector/receiver/otlpreceiver
    gomod: go.opentelemetry.io/collector v0.56.0
processors:
  - gomod: go.seankhliao.com/otelcol-exp v0.0.0
    import: go.seankhliao.com/otelcol-exp/cueprocessor
    path: /home/arccy/code/soft-serve/otelcol-exp
exporters:
  - import: go.opentelemetry.io/collector/exporter/loggingexporter
    gomod: go.opentelemetry.io/collector v0.56.0

and we can build it with:

$ builder --config builder.yaml

running the collector

And we can run it, with an example config like:

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: localhost:4317

processors:
  cue:
    program: |
      out: resourceSpans: [{
        resource: attributes: [
            for x in in.resourceSpans[0].resource.attributes {x},
            {key: "foo", value: stringValue: "bar"},
        ]
        scopeSpans: in.resourceSpans[0].scopeSpans
        schemaUrl: in.resourceSpans[0].schemaUrl
      }]

exporters:
  logging:
    loglevel: debug

service:
  pipelines:
    traces:
      receivers:
        - otlp
      processors:
        - cue
      exporters:
        - logging

running it with:

$ /tmp/otelcol-distribution1976041369/otelcol-custom --config config.yaml

and we can test it by sending some traces using tracegen:

$ tracegen -otlp-insecure -traces 1
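For comparison, here is roughly what the cue program in the example config does to the json, written out by hand in Go. It is only a sketch to show the data shape being manipulated: `addAttribute` is a hypothetical helper, the sample input (including the `service.name` value) is made up, and unlike the cue program it quietly creates a missing `resource` object instead of failing.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// addAttribute appends a string attribute to the first resource in a
// json traces payload and returns the re-encoded document.
func addAttribute(raw []byte, key, value string) ([]byte, error) {
	var doc map[string]any
	if err := json.Unmarshal(raw, &doc); err != nil {
		return nil, err
	}
	spans, ok := doc["resourceSpans"].([]any)
	if !ok || len(spans) == 0 {
		return nil, fmt.Errorf("no resourceSpans in input")
	}
	first, ok := spans[0].(map[string]any)
	if !ok {
		return nil, fmt.Errorf("resourceSpans[0] is not an object")
	}
	resource, _ := first["resource"].(map[string]any)
	if resource == nil {
		// differs from the cue program, which would fail on a missing resource
		resource = map[string]any{}
		first["resource"] = resource
	}
	attrs, _ := resource["attributes"].([]any)
	resource["attributes"] = append(attrs, map[string]any{
		"key":   key,
		"value": map[string]any{"stringValue": value},
	})
	return json.Marshal(doc)
}

func main() {
	// made-up sample input following the field names used in the cue program
	in := []byte(`{"resourceSpans":[{"resource":{"attributes":[` +
		`{"key":"service.name","value":{"stringValue":"tracegen"}}]},"scopeSpans":[]}]}`)
	out, err := addAttribute(in, "foo", "bar")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

The cue version is a handful of lines in the config file and can be changed without rebuilding the collector, which is the appeal of using it as a data mangler.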