Commit 364c6c9

Author: Mario Macias
NETOBSERV-578: removing arrays from pipeline interfaces (#319)
* removing arrays from pipeline interfaces
* fixed linting
* reverted batchSize and batchSizeBytes back
* fixed linting
* adding batcher only for stages that really need batching
* make linter happy
* fix race condition
* fix simple pipeline tests
* fix again tests
* increased times in slow test
* synchronized tests to make them pass in CI
* try to fix e2e test
* fix data race in test
1 parent f466f1f commit 364c6c9


47 files changed: +506 -955 lines

go.sum

Lines changed: 0 additions & 2 deletions
@@ -699,8 +699,6 @@ github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxzi
 github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
 github.com/netobserv/gopipes v0.1.1 h1:f8zJsvnMgRFRa2B+1siwRtW0Y4dqeBROmkcI/HgT1gE=
 github.com/netobserv/gopipes v0.1.1/go.mod h1:eGoHZW1ON8Dx/zmDXUhsbVNqatPjtpdO0UZBmGZGmVI=
-github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9 h1:c2swm3EamzgjBq9idNbEs5bNz20FJo/HK6uxyigXekQ=
-github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9/go.mod h1:LHXpc5tjKvsfZn0pwLKrvlgEhZcCaw3Di9mUEZGAI4E=
 github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500 h1:RmnoJe/ci5q+QdM7upFdxiU+D8F3L3qTd5wXCwwHefw=
 github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500/go.mod h1:LHXpc5tjKvsfZn0pwLKrvlgEhZcCaw3Di9mUEZGAI4E=
 github.com/netobserv/netobserv-ebpf-agent v0.1.1-0.20220608092850-3fd4695b7cc2 h1:K7SjoqEfzpMfIjHV85Lg8UDMvZu8rPfrsgKRoo7W30o=

pkg/api/transform_network.go

Lines changed: 9 additions & 0 deletions
@@ -36,6 +36,15 @@ func (tn *TransformNetwork) GetServiceFiles() (string, string) {
 	return p, s
 }

+const (
+	OpAddRegexIf    = "add_regex_if"
+	OpAddIf         = "add_if"
+	OpAddSubnet     = "add_subnet"
+	OpAddLocation   = "add_location"
+	OpAddService    = "add_service"
+	OpAddKubernetes = "add_kubernetes"
+)
+
 type TransformNetworkOperationEnum struct {
 	AddRegExIf string `yaml:"add_regex_if" json:"add_regex_if" doc:"add output field if input field satisfies regex pattern from parameters field"`
 	AddIf      string `yaml:"add_if" json:"add_if" doc:"add output field if input field satisfies criteria from parameters field"`
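
The Op* names now exist as exported constants, so operation dispatch no longer has to rely on scattered string literals. A minimal sketch of how a caller might switch on them; the describe helper is hypothetical and not part of this commit:

package main

import (
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
)

// describe is a hypothetical helper (not in this commit) that dispatches on
// the operation-name constants added to pkg/api/transform_network.go.
func describe(op string) string {
	switch op {
	case api.OpAddSubnet:
		return "mask an IP field into its subnet"
	case api.OpAddKubernetes:
		return "enrich the flow with kubernetes metadata"
	case api.OpAddRegexIf, api.OpAddIf, api.OpAddLocation, api.OpAddService:
		return "other supported transform operation"
	default:
		return "unknown operation"
	}
}

func main() {
	fmt.Println(describe(api.OpAddSubnet))
}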

pkg/config/config.go

Lines changed: 8 additions & 0 deletions
@@ -20,6 +20,7 @@ package config
 import (
 	"bytes"
 	"encoding/json"
+	"time"

 	"github.com/netobserv/flowlogs-pipeline/pkg/api"
 	"github.com/sirupsen/logrus"
@@ -38,6 +39,7 @@ type ConfigFileStruct struct {
 	Pipeline        []Stage         `yaml:"pipeline,omitempty" json:"pipeline,omitempty"`
 	Parameters      []StageParam    `yaml:"parameters,omitempty" json:"parameters,omitempty"`
 	MetricsSettings MetricsSettings `yaml:"metricsSettings,omitempty" json:"metricsSettings,omitempty"`
+	PerfSettings    PerfSettings    `yaml:"perfSettings,omitempty" json:"perfSettings,omitempty"`
 }

 type Health struct {
@@ -60,6 +62,12 @@ type MetricsSettings struct {
 	NoPanic bool `yaml:"noPanic,omitempty" json:"noPanic,omitempty"`
 }

+// PerfSettings allows setting some internal configuration parameters
+type PerfSettings struct {
+	BatcherMaxLen  int           `yaml:"batcherMaxLen,omitempty" json:"batcherMaxLen,omitempty"`
+	BatcherTimeout time.Duration `yaml:"batcherMaxTimeout,omitempty" json:"batcherMaxTimeout,omitempty"`
+}
+
 type Stage struct {
 	Name    string `yaml:"name" json:"name"`
 	Follows string `yaml:"follows,omitempty" json:"follows,omitempty"`
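
PerfSettings hangs off ConfigFileStruct, so the new batching knobs can be set from YAML (perfSettings.batcherMaxLen, perfSettings.batcherMaxTimeout) or programmatically. A short sketch of the programmatic route, reusing the values the conntrack integration test below picks; the flush-on-size-or-timeout semantics are an assumption inferred from the field names:

package main

import (
	"fmt"
	"time"

	"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

func main() {
	var cfg config.ConfigFileStruct
	// Presumably a batch is flushed once it reaches BatcherMaxLen records,
	// or when BatcherTimeout elapses, whichever comes first (assumption).
	cfg.PerfSettings.BatcherMaxLen = 200_000
	cfg.PerfSettings.BatcherTimeout = 2 * time.Second
	fmt.Printf("%+v\n", cfg.PerfSettings)
}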

pkg/pipeline/aggregate_prom_test.go

Lines changed: 3 additions & 1 deletion
@@ -178,7 +178,9 @@ parameters:
 	// we use ElementsMatch() rather than Equals()
 	require.ElementsMatch(t, tt.expectedAggs, actualAggs)

-	promEncode.Encode(actualAggs)
+	for _, aa := range actualAggs {
+		promEncode.Encode(aa)
+	}
 	exposed := test.ReadExposedMetrics(t)

 	for _, expected := range tt.expectedEncode {

pkg/pipeline/conntrack_integ_test.go

Lines changed: 30 additions & 11 deletions
@@ -20,9 +20,11 @@ package pipeline
 import (
 	"bufio"
 	"os"
+	"sync/atomic"
 	"testing"
 	"time"

+	test2 "github.com/mariomac/guara/pkg/test"
 	"github.com/netobserv/flowlogs-pipeline/pkg/config"
 	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode"
 	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest"
@@ -95,24 +97,31 @@ func TestConnTrack(t *testing.T) {
 	var err error
 	v, cfg := test.InitConfig(t, testConfigConntrack)
 	require.NotNil(t, v)
+	cfg.PerfSettings.BatcherMaxLen = 200_000
+	cfg.PerfSettings.BatcherTimeout = 2 * time.Second

 	mainPipeline, err = NewPipeline(cfg)
 	require.NoError(t, err)

 	go mainPipeline.Run()

-	in := mainPipeline.pipelineStages[0].Ingester.(*ingest.IngestFake).In
+	ingest := mainPipeline.pipelineStages[0].Ingester.(*ingest.IngestFake)
+	in := ingest.In
 	writer := mainPipeline.pipelineStages[2].Writer.(*write.WriteFake)

-	ingestFile(t, in, "../../hack/examples/ocp-ipfix-flowlogs.json")
-	writer.Wait()
-	writer.ResetWait()
+	sentLines := ingestFile(t, in, "../../hack/examples/ocp-ipfix-flowlogs.json")
+
+	// wait for all the lines to be ingested
+	test2.Eventually(t, 15*time.Second, func(t require.TestingT) {
+		require.EqualValues(t, atomic.LoadInt64(&ingest.Count), sentLines,
+			"sent: %d. got: %d", sentLines, ingest.Count)
+	}, test2.Interval(10*time.Millisecond))

 	// Wait a moment to make the connections expired
 	time.Sleep(2 * time.Second)
-	// Send an empty list to the pipeline to allow the connection tracking output end connection records
-	in <- []config.GenericMap{}
-	writer.Wait()
+
+	// Send something to the pipeline to allow the connection tracking output end connection records
+	in <- config.GenericMap{"DstAddr": "1.2.3.4"}

 	// Verify that the output records contain an expected end connection record.
 	expected := config.GenericMap{
@@ -131,10 +140,14 @@ func TestConnTrack(t *testing.T) {
 		"_RecordType": "endConnection",
 		"numFlowLogs": 5.0,
 	}
-	require.Containsf(t, writer.AllRecords, expected, "The output records don't include the expected record %v", expected)
+	// Wait for the record to be eventually forwarded to the writer
+	test2.Eventually(t, 15*time.Second, func(t require.TestingT) {
+		require.Containsf(t, writer.AllRecords(), expected,
+			"The output records don't include the expected record %v", expected)
+	})
 }

-func ingestFile(t *testing.T, in chan<- []config.GenericMap, filepath string) {
+func ingestFile(t *testing.T, in chan<- config.GenericMap, filepath string) int {
 	t.Helper()
 	file, err := os.Open(filepath)
 	require.NoError(t, err)
@@ -147,8 +160,14 @@ func ingestFile(t *testing.T, in chan<- []config.GenericMap, filepath string) {
 		text := scanner.Text()
 		lines = append(lines, []byte(text))
 	}
+	submittedLines := 0
 	decoder, err := decode.NewDecodeJson()
 	require.NoError(t, err)
-	decoded := decoder.Decode(lines)
-	in <- decoded
+	for _, line := range lines {
+		line, err := decoder.Decode(line)
+		require.NoError(t, err)
+		in <- line
+		submittedLines++
+	}
+	return submittedLines
 }
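
The removed writer.Wait()/ResetWait() handshake raced with the pipeline goroutines; the test now polls via test2.Eventually from github.com/mariomac/guara until the asserted state is observed. For readers unfamiliar with the pattern, a minimal self-contained polling helper in the same spirit (a sketch, not guara's actual implementation):

package pipeline

import (
	"testing"
	"time"
)

// eventually retries an assertion until it stops failing or the timeout
// expires. Polling shared state like this avoids the explicit
// cross-goroutine signaling that caused the original data race.
func eventually(t *testing.T, timeout, interval time.Duration, assertion func() error) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	var err error
	for time.Now().Before(deadline) {
		if err = assertion(); err == nil {
			return
		}
		time.Sleep(interval)
	}
	t.Fatalf("condition not met after %v: %v", timeout, err)
}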

pkg/pipeline/decode/decode.go

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ import (
 )

 type Decoder interface {
-	Decode(in [][]byte) []config.GenericMap
+	Decode(in []byte) (config.GenericMap, error)
 }

 func GetDecoder(params api.Decoder) (Decoder, error) {
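
This is the central interface change of the commit: a decoder now consumes one raw record and returns one GenericMap plus an error, so batching and error policy move to the callers. A hedged sketch of what a caller-side loop could look like after the change; decodeAll is illustrative only and not part of this commit:

package example

import (
	"log"

	"github.com/netobserv/flowlogs-pipeline/pkg/config"
	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode"
)

// decodeAll is a hypothetical caller: where decoders previously consumed a
// whole [][]byte batch, callers now loop over raw records and decide per
// record what to do with a decoding error.
func decodeAll(d decode.Decoder, raws [][]byte) []config.GenericMap {
	out := make([]config.GenericMap, 0, len(raws))
	for _, raw := range raws {
		flow, err := d.Decode(raw)
		if err != nil {
			log.Printf("skipping undecodable record: %v", err)
			continue
		}
		out = append(out, flow)
	}
	return out
}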

pkg/pipeline/decode/decode_json.go

Lines changed: 17 additions & 23 deletions
@@ -29,32 +29,26 @@ type DecodeJson struct {
 }

 // Decode decodes input strings to a list of flow entries
-func (c *DecodeJson) Decode(in [][]byte) []config.GenericMap {
-	out := make([]config.GenericMap, 0)
-	for _, line := range in {
-		if log.IsLevelEnabled(log.DebugLevel) {
-			log.Debugf("decodeJson: line = %v", string(line))
-		}
-		var decodedLine map[string]interface{}
-		err := json.Unmarshal(line, &decodedLine)
-		if err != nil {
-			log.Errorf("decodeJson Decode: error unmarshalling a line: %v", err)
-			log.Errorf("line = %s", line)
+func (c *DecodeJson) Decode(line []byte) (config.GenericMap, error) {
+
+	if log.IsLevelEnabled(log.DebugLevel) {
+		log.Debugf("decodeJson: line = %v", string(line))
+	}
+	var decodedLine map[string]interface{}
+	if err := json.Unmarshal(line, &decodedLine); err != nil {
+		return nil, err
+	}
+	decodedLine2 := make(config.GenericMap, len(decodedLine))
+	// flows directly ingested by flp-transformer won't have this field, so we need to add it
+	// here. If the received line already contains the field, it will be overridden later
+	decodedLine2["TimeReceived"] = time.Now().Unix()
+	for k, v := range decodedLine {
+		if v == nil {
			continue
		}
-		decodedLine2 := make(config.GenericMap, len(decodedLine))
-		// flows directly ingested by flp-transformer won't have this field, so we need to add it
-		// here. If the received line already contains the field, it will be overridden later
-		decodedLine2["TimeReceived"] = time.Now().Unix()
-		for k, v := range decodedLine {
-			if v == nil {
-				continue
-			}
-			decodedLine2[k] = v
-		}
-		out = append(out, decodedLine2)
+		decodedLine2[k] = v
 	}
-	return out
+	return decodedLine2, nil
 }

 // NewDecodeJson create a new decode

pkg/pipeline/decode/decode_json_test.go

Lines changed: 16 additions & 29 deletions
@@ -20,7 +20,6 @@ package decode
 import (
 	"testing"

-	"github.com/netobserv/flowlogs-pipeline/pkg/config"
 	"github.com/stretchr/testify/require"
 )

@@ -33,41 +32,29 @@ func initNewDecodeJson(t *testing.T) Decoder {
 func TestDecodeJson(t *testing.T) {
 	newDecode := initNewDecodeJson(t)
 	decodeJson := newDecode.(*DecodeJson)
-	inputString1 := "{\"varInt\": 12, \"varString\":\"testString\", \"varBool\":false}"
-	inputString2 := "{\"varInt\": 14, \"varString\":\"testString2\", \"varBool\":true, \"TimeReceived\":12345}"
-	inputString3 := "{}"
-	inputStringErr := "{\"varInt\": 14, \"varString\",\"testString2\", \"varBool\":true}"
-	var in [][]byte
-	var out []config.GenericMap
-	out = decodeJson.Decode(in)
-	require.Equal(t, 0, len(out))
-	in = append(in, []byte(inputString1))
-	in = append(in, []byte(inputString2))
-	in = append(in, []byte(inputString3))
-	in = append(in, []byte(inputStringErr))
-	out = decodeJson.Decode(in)
-	require.Equal(t, len(out), 3)
-	require.Equal(t, float64(12), out[0]["varInt"])
-	require.Equal(t, "testString", out[0]["varString"])
-	require.Equal(t, false, out[0]["varBool"])
+
+	out, err := decodeJson.Decode([]byte(
+		"{\"varInt\": 12, \"varString\":\"testString\", \"varBool\":false}"))
+	require.NoError(t, err)
+	require.Equal(t, float64(12), out["varInt"])
+	require.Equal(t, "testString", out["varString"])
+	require.Equal(t, false, out["varBool"])
 	// TimeReceived is added if it does not exist
-	require.NotZero(t, out[0]["TimeReceived"])
+	require.NotZero(t, out["TimeReceived"])
+
+	out, err = decodeJson.Decode([]byte(
+		"{\"varInt\": 14, \"varString\":\"testString2\", \"varBool\":true, \"TimeReceived\":12345}"))
+	require.NoError(t, err)
 	// TimeReceived is kept if it already existed
-	require.EqualValues(t, 12345, out[1]["TimeReceived"])
+	require.EqualValues(t, 12345, out["TimeReceived"])

 	// TODO: Check for more complicated json structures
 }

 func TestDecodeJsonTimestamps(t *testing.T) {
 	newDecode := initNewDecodeJson(t)
 	decodeJson := newDecode.(*DecodeJson)
-	inputString1 := "{\"unixTime\": 1645104030 }"
-	var in [][]byte
-	var out []config.GenericMap
-	out = decodeJson.Decode(in)
-	require.Equal(t, 0, len(out))
-	in = append(in, []byte(inputString1))
-	out = decodeJson.Decode(in)
-	require.Equal(t, len(out), 1)
-	require.Equal(t, float64(1645104030), out[0]["unixTime"])
+	out, err := decodeJson.Decode([]byte("{\"unixTime\": 1645104030 }"))
+	require.NoError(t, err)
+	require.Equal(t, float64(1645104030), out["unixTime"])
 }

pkg/pipeline/decode/decode_protobuf.go

Lines changed: 6 additions & 24 deletions
@@ -7,12 +7,9 @@ import (

 	"github.com/netobserv/flowlogs-pipeline/pkg/config"
 	"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
-	"github.com/sirupsen/logrus"
 	"google.golang.org/protobuf/proto"
 )

-var pflog = logrus.WithField("component", "Protobuf")
-
 // Protobuf decodes protobuf flow records definitions, as forwarded by
 // ingest.NetObservAgent, into a Generic Map that follows the same naming conventions
 // as the IPFIX flows from ingest.IngestCollector
@@ -25,30 +22,15 @@ func NewProtobuf() (*Protobuf, error) {

 // Decode decodes the protobuf raw flows and returns a list of GenericMaps representing all
 // the flows there
-func (p *Protobuf) Decode(rawFlows [][]byte) []config.GenericMap {
-	flows := make([]config.GenericMap, 0, len(rawFlows))
-	for _, pbRaw := range rawFlows {
-		record := pbflow.Record{}
-		if err := proto.Unmarshal(pbRaw, &record); err != nil {
-			pflog.WithError(err).Debug("can't unmarshall received protobuf flow. Ignoring")
-			continue
-		}
-		flows = append(flows, pbFlowToMap(&record))
+func (p *Protobuf) Decode(rawFlow []byte) (config.GenericMap, error) {
+	record := pbflow.Record{}
+	if err := proto.Unmarshal(rawFlow, &record); err != nil {
+		return nil, fmt.Errorf("unmarshaling ProtoBuf record: %w", err)
 	}
-	return flows
-}
-
-// PBRecordsAsMaps transform all the flows in a pbflow.Records entry into a slice
-// of GenericMaps
-func PBRecordsAsMaps(flow *pbflow.Records) []config.GenericMap {
-	out := make([]config.GenericMap, 0, len(flow.Entries))
-	for _, entry := range flow.Entries {
-		out = append(out, pbFlowToMap(entry))
-	}
-	return out
+	return PBFlowToMap(&record), nil
 }

-func pbFlowToMap(flow *pbflow.Record) config.GenericMap {
+func PBFlowToMap(flow *pbflow.Record) config.GenericMap {
 	if flow == nil {
 		return config.GenericMap{}
 	}
pkg/pipeline/decode/decode_protobuf_test.go

Lines changed: 10 additions & 11 deletions
@@ -45,10 +45,10 @@ func TestDecodeProtobuf(t *testing.T) {
 	rawPB, err := proto.Marshal(&flow)
 	require.NoError(t, err)

-	out := decoder.Decode([][]byte{rawPB})
-	require.Len(t, out, 1)
-	assert.NotZero(t, out[0]["TimeReceived"])
-	delete(out[0], "TimeReceived")
+	out, err := decoder.Decode(rawPB)
+	require.NoError(t, err)
+	assert.NotZero(t, out["TimeReceived"])
+	delete(out, "TimeReceived")
 	assert.Equal(t, config.GenericMap{
 		"FlowDirection": 1,
 		"Bytes":         uint64(456),
@@ -64,10 +64,10 @@ func TestDecodeProtobuf(t *testing.T) {
 		"TimeFlowStartMs": someTime.UnixMilli(),
 		"TimeFlowEndMs":   someTime.UnixMilli(),
 		"Interface":       "eth0",
-	}, out[0])
+	}, out)
 }

-func TestPBRecordsAsMaps(t *testing.T) {
+func TestPBFlowToMap(t *testing.T) {
 	someTime := time.Now()
 	flow := &pbflow.Record{
 		Interface: "eth0",
@@ -96,10 +96,9 @@ func TestPBRecordsAsMaps(t *testing.T) {
 		},
 	}

-	out := PBRecordsAsMaps(&pbflow.Records{Entries: []*pbflow.Record{flow}})
-	require.Len(t, out, 1)
-	assert.NotZero(t, out[0]["TimeReceived"])
-	delete(out[0], "TimeReceived")
+	out := PBFlowToMap(flow)
+	assert.NotZero(t, out["TimeReceived"])
+	delete(out, "TimeReceived")
 	assert.Equal(t, config.GenericMap{
 		"FlowDirection": 1,
 		"Bytes":         uint64(456),
@@ -115,6 +114,6 @@ func TestPBRecordsAsMaps(t *testing.T) {
 		"TimeFlowStartMs": someTime.UnixMilli(),
 		"TimeFlowEndMs":   someTime.UnixMilli(),
 		"Interface":       "eth0",
-	}, out[0])
+	}, out)

 }
