The opentelemetry collector is built in a modular way, which means you can relatively easily write your own receiver/processor/exporter and build a version of the collector with it.
cue is a language more suited for config and validation, but that doesn't mean you can't use it for data transformations too...
So, what do we need to write a processor?
Using collector v0.56.0
This is the config that our processor takes, which will be nested in the collector config yaml.
1package cueprocessor
2
3import (
4 "cuelang.org/go/cue/cuecontext"
5 "go.opentelemetry.io/collector/config"
6)
7
8type Config struct {
9 // standard field that you need to embed
10 // squash ensures fields are correctly decoded in embedded struct
11 config.ProcessorSettings `mapstructure:",squash"`
12
13 // input that our processor needs
14 Program string `mapstructure:"program"`
15}
16
17// We're not going to do any validation now,
18// cue makes it a bit annoying to handle undeclared references
19func (c *Config) Validate() error {
20 return nil
21}
The collector builder hardcodes the name NewFactory. With that, we register functions that will create our processor instances.
1package cueprocessor
2
3import (
4 "context"
5
6 "go.opentelemetry.io/collector/component"
7 "go.opentelemetry.io/collector/config"
8 "go.opentelemetry.io/collector/consumer"
9 "go.opentelemetry.io/collector/pdata/plog"
10 "go.opentelemetry.io/collector/pdata/pmetric"
11 "go.opentelemetry.io/collector/pdata/ptrace"
12)
13
// typeStr is the value of the "type" key used to reference this
// processor in the collector configuration.
const typeStr = "cue"
18
19// This registers the type name, the config, and functions to create processor instances.
20func NewFactory() component.ProcessorFactory {
21 return component.NewProcessorFactory(
22 typeStr,
23 createDefaultConfig,
24 component.WithTracesProcessorAndStabilityLevel(createTracesProcessor, component.StabilityLevelInDevelopment),
25 component.WithMetricsProcessorAndStabilityLevel(createMetricsProcessor, component.StabilityLevelInDevelopment),
26 component.WithLogsProcessorAndStabilityLevel(createLogsProcessor, component.StabilityLevelInDevelopment),
27 )
28}
29
30func createDefaultConfig() config.Processor {
31 return &Config{
32 ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
33 }
34}
35
36func createTracesProcessor(ctx context.Context, s component.ProcessorCreateSettings, p config.Processor, next consumer.Traces) (component.TracesProcessor, error) {
37 proc, err := newProcessor(p.(*Config))
38 if err != nil {
39 return nil, err
40 }
41 return &tracesProcessor{
42 processor: proc,
43 next: next,
44 m: ptrace.NewJSONMarshaler(),
45 u: ptrace.NewJSONUnmarshaler(),
46 }, nil
47}
48
49func createMetricsProcessor(ctx context.Context, s component.ProcessorCreateSettings, p config.Processor, next consumer.Metrics) (component.MetricsProcessor, error) {
50 proc, err := newProcessor(p.(*Config))
51 if err != nil {
52 return nil, err
53 }
54 return &metricsProcessor{
55 processor: proc,
56 next: next,
57 m: pmetric.NewJSONMarshaler(),
58 u: pmetric.NewJSONUnmarshaler(),
59 }, nil
60}
61
62func createLogsProcessor(ctx context.Context, s component.ProcessorCreateSettings, p config.Processor, next consumer.Logs) (component.LogsProcessor, error) {
63 proc, err := newProcessor(p.(*Config))
64 if err != nil {
65 return nil, err
66 }
67 return &logsProcessor{
68 processor: proc,
69 next: next,
70 m: plog.NewJSONMarshaler(),
71 u: plog.NewJSONUnmarshaler(),
72 }, nil
73}
The actual processor. It's a bit repetitive to work over the 3 metric types.
The collector only exposes the data via a lot of functions and not the underlying data structure. This makes it sufficiently annoying to work with and I opted to roundtrip through json serialization. Additionally, I made the choice here to call the processor once per resource, but after thinking a bit more about it, I think it could have been called over the entire batch, and left to the user to iterate over the input in a list comprehension.
1package cueprocessor
2
3import (
4 "context"
5
6 "cuelang.org/go/cue"
7 "cuelang.org/go/cue/cuecontext"
8 cuejson "cuelang.org/go/encoding/json"
9 "go.opentelemetry.io/collector/component"
10 "go.opentelemetry.io/collector/consumer"
11 "go.opentelemetry.io/collector/pdata/plog"
12 "go.opentelemetry.io/collector/pdata/pmetric"
13 "go.opentelemetry.io/collector/pdata/ptrace"
14)
15
16type tracesProcessor struct {
17 processor
18 next consumer.Traces
19 m ptrace.Marshaler
20 u ptrace.Unmarshaler
21}
22
23type metricsProcessor struct {
24 processor
25 next consumer.Metrics
26 m pmetric.Marshaler
27 u pmetric.Unmarshaler
28}
29
30type logsProcessor struct {
31 processor
32 next consumer.Logs
33 m plog.Marshaler
34 u plog.Unmarshaler
35}
36
37type processor struct {
38 prog string
39}
40
41func newProcessor(c *Config) (processor, error) {
42 return processor{
43 prog: c.Program,
44 }, nil
45}
46
47func (p processor) Capabilities() consumer.Capabilities {
48 return consumer.Capabilities{MutatesData: true}
49}
50
51func (p processor) Start(ctx context.Context, host component.Host) error {
52 return nil
53}
54
55func (p processor) Shutdown(ctx context.Context) error {
56 return nil
57}
58
59func (p tracesProcessor) ConsumeTraces(ctx context.Context, data ptrace.Traces) error {
60 final := ptrace.NewTraces()
61 final.ResourceSpans().EnsureCapacity(data.ResourceSpans().Len())
62
63 single := ptrace.NewTraces()
64 single.ResourceSpans().EnsureCapacity(1)
65
66 for i := 0; i < data.ResourceSpans().Len(); i++ {
67 single.ResourceSpans().AppendEmpty()
68 data.ResourceSpans().At(i).MoveTo(single.ResourceSpans().At(0))
69 jsonRaw, err := p.m.MarshalTraces(single)
70 if err != nil {
71 return err
72 }
73 expr, err := cuejson.Extract("", jsonRaw)
74 if err != nil {
75 return err
76 }
77
78 c := cuecontext.New()
79 val := c.CompileString("in: {}")
80 val = val.Fill(expr, "in")
81 val = c.CompileString(p.prog, cue.Scope(val))
82 val = val.Lookup("out")
83
84 jsonRaw, err = val.MarshalJSON()
85 if err != nil {
86 return err
87 }
88 single, err = p.u.UnmarshalTraces(jsonRaw)
89 if err != nil {
90 return err
91 }
92
93 single.ResourceSpans().MoveAndAppendTo(final.ResourceSpans())
94 }
95
96 return p.next.ConsumeTraces(ctx, final)
97}
98
99func (p metricsProcessor) ConsumeMetrics(ctx context.Context, data pmetric.Metrics) error {
100 final := pmetric.NewMetrics()
101 final.ResourceMetrics().EnsureCapacity(data.ResourceMetrics().Len())
102
103 single := pmetric.NewMetrics()
104 single.ResourceMetrics().EnsureCapacity(1)
105
106 for i := 0; i < data.ResourceMetrics().Len(); i++ {
107 single.ResourceMetrics().AppendEmpty()
108 data.ResourceMetrics().At(i).MoveTo(single.ResourceMetrics().At(0))
109 jsonRaw, err := p.m.MarshalMetrics(single)
110 if err != nil {
111 return err
112 }
113 expr, err := cuejson.Extract("", jsonRaw)
114 if err != nil {
115 return err
116 }
117
118 c := cuecontext.New()
119 val := c.CompileString("in: {}")
120 val = val.Fill(expr, "in")
121 val = c.CompileString(p.prog, cue.Scope(val))
122 val = val.Lookup("out")
123
124 jsonRaw, err = val.MarshalJSON()
125 if err != nil {
126 return err
127 }
128 single, err = p.u.UnmarshalMetrics(jsonRaw)
129 if err != nil {
130 return err
131 }
132
133 single.ResourceMetrics().MoveAndAppendTo(final.ResourceMetrics())
134 }
135
136 return p.next.ConsumeMetrics(ctx, final)
137}
138
139func (p logsProcessor) ConsumeLogs(ctx context.Context, data plog.Logs) error {
140 final := plog.NewLogs()
141 final.ResourceLogs().EnsureCapacity(data.ResourceLogs().Len())
142
143 single := plog.NewLogs()
144 single.ResourceLogs().EnsureCapacity(1)
145
146 for i := 0; i < data.ResourceLogs().Len(); i++ {
147 single.ResourceLogs().AppendEmpty()
148 data.ResourceLogs().At(i).MoveTo(single.ResourceLogs().At(0))
149 jsonRaw, err := p.m.MarshalLogs(single)
150 if err != nil {
151 return err
152 }
153 expr, err := cuejson.Extract("", jsonRaw)
154 if err != nil {
155 return err
156 }
157
158 c := cuecontext.New()
159 val := c.CompileString("in: {}")
160 val = val.Fill(expr, "in")
161 val = c.CompileString(p.prog, cue.Scope(val))
162 val = val.Lookup("out")
163
164 jsonRaw, err = val.MarshalJSON()
165 if err != nil {
166 return err
167 }
168 single, err = p.u.UnmarshalLogs(jsonRaw)
169 if err != nil {
170 return err
171 }
172
173 single.ResourceLogs().MoveAndAppendTo(final.ResourceLogs())
174 }
175
176 return p.next.ConsumeLogs(ctx, final)
177}
178
With the above, we can build a collector using the builder
We need a config file with some basic receivers/exporters for debugging:
1receivers:
2 - import: go.opentelemetry.io/collector/receiver/otlpreceiver
3 gomod: go.opentelemetry.io/collector v0.56.0
4processors:
5 - gomod: go.seankhliao.com/otelcol-exp v0.0.0
6 import: go.seankhliao.com/otelcol-exp/cueprocessor
7 path: /home/arccy/code/soft-serve/otelcol-exp
8exporters:
9 - import: go.opentelemetry.io/collector/exporter/loggingexporter
10 gomod: go.opentelemetry.io/collector v0.56.0
and we can build it with
1$ builder --config builder.yaml
And we can run it, with an example config like:
1receivers:
2 otlp:
3 protocols:
4 grpc:
5 endpoint: localhost:4317
6
7processors:
8 cue:
9 program: |
10 out: resourceSpans: [{
11 resource: attributes: [
12 for x in in.resourceSpans[0].resource.attributes {x},
13 {key: "foo", value: stringValue: "bar"},
14 ]
15 scopeSpans: in.resourceSpans[0].scopeSpans
16 schemaUrl: in.resourceSpans[0].schemaUrl
17 }]
18
19exporters:
20 logging:
21 loglevel: debug
22
23service:
24 pipelines:
25 traces:
26 receivers:
27 - otlp
28 processors:
29 - cue
30 exporters:
31 - logging
running it with:
1$ /tmp/otelcol-distribution1976041369/otelcol-custom --config config.yaml
and we can test it by sending some traces using tracegen
1$ tracegen -otlp-insecure -traces 1