Skip to content

Commit 1199790

Browse files
committed
fix: address review comments
1 parent 1ef98f7 commit 1199790

File tree

3 files changed

+51
-40
lines changed

3 files changed

+51
-40
lines changed

internal/collector/syslogprocessor/processor.go

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,9 @@ func (p *syslogProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
6565
var errs error
6666

6767
rl := ld.ResourceLogs()
68-
for i := range rl.Len() {
69-
sl := rl.At(i).ScopeLogs()
70-
for j := range sl.Len() {
71-
if err := p.processLogRecords(sl.At(j).LogRecords()); err != nil {
68+
for _, sl := range rl.All() {
69+
for _, lr := range sl.ScopeLogs().All() {
70+
if err := p.processLogRecords(lr.LogRecords()); err != nil {
7271
errs = multierr.Append(errs, err)
7372
}
7473
}
@@ -83,23 +82,32 @@ func (p *syslogProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
8382

8483
func (p *syslogProcessor) processLogRecords(lrs plog.LogRecordSlice) error {
8584
// Drop anything that isn't a string-bodied log before processing.
85+
var skipped, errCount int
86+
var t pcommon.ValueType
87+
var errs error
8688
lrs.RemoveIf(func(lr plog.LogRecord) bool {
87-
t := lr.Body().Type()
89+
t = lr.Body().Type()
8890
if t == pcommon.ValueTypeStr {
8991
return false
9092
}
91-
p.settings.Logger.Debug("Skipping log record with unsupported body type", zap.Any("type", t))
9293

94+
skipped++
9395
return true
9496
})
95-
96-
for k := range lrs.Len() {
97-
lr := lrs.At(k)
97+
if skipped > 0 {
98+
p.settings.Logger.Debug("Skipping log record with unsupported body type", zap.Any("type", t))
99+
}
100+
errCount = 0
101+
for _, lr := range lrs.All() {
98102
if err := p.processLogRecord(lr); err != nil {
99-
p.settings.Logger.Debug("failed to process log record", zap.Error(err))
100-
return err
103+
errs = multierr.Append(errs, err)
104+
errCount++
101105
}
102106
}
107+
if errCount > 0 {
108+
p.settings.Logger.Debug("Some log records failed to process", zap.Int("count", errCount))
109+
return errs
110+
}
103111

104112
return nil
105113
}
@@ -138,9 +146,6 @@ func (p *syslogProcessor) setSyslogAttributes(lr plog.LogRecord, m *rfc3164.Sysl
138146
if m.Timestamp != nil {
139147
attrs.PutStr("syslog.timestamp", m.Timestamp.Format(time.RFC3339))
140148
}
141-
if m.Hostname != nil {
142-
attrs.PutStr("syslog.hostname", *m.Hostname)
143-
}
144149
if m.Appname != nil {
145150
attrs.PutStr("syslog.appname", *m.Appname)
146151
}
@@ -417,17 +422,22 @@ func (p *syslogProcessor) extractSignatureData(kvMap map[string]string) []Signat
417422
}
418423

419424
func splitAndTrim(value string) []string {
420-
if value == "" || value == notAvailable {
425+
if strings.TrimSpace(value) == "" || value == notAvailable {
421426
return nil
422427
}
428+
423429
parts := strings.Split(value, ",")
424-
for i := range parts {
425-
parts[i] = strings.TrimSpace(parts[i])
430+
431+
var trimmedParts []string
432+
for _, part := range parts {
433+
trimmed := strings.TrimSpace(part)
434+
if trimmed != "" {
435+
trimmedParts = append(trimmedParts, trimmed)
436+
}
426437
}
427438

428-
return parts
439+
return trimmedParts
429440
}
430-
431441
func buildSignatures(ids, names []string, mask, offset, length string) []SignatureData {
432442
signatures := make([]SignatureData, 0, len(ids))
433443
for i, id := range ids {

internal/collector/syslogprocessor/processor_test.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package syslogprocessor
88
import (
99
"context"
1010
"encoding/json"
11+
"fmt"
1112
"testing"
1213

1314
"github.com/stretchr/testify/assert"
@@ -31,7 +32,6 @@ func TestSyslogProcessor(t *testing.T) {
3132
name: "csv nginx app protect syslog message",
3233
body: `<130>Aug 22 03:28:35 ip-172-16-0-213 ASM:N/A,80,127.0.0.1,false,GET,nms_app_protect_default_policy,HTTP,blocked,0,N/A,N/A::N/A,{High Accuracy Signatures;Cross Site Scripting Signatures}::{High Accuracy Signatures; Cross Site Scripting Signatures},56064,N/A,5377540117854870581,N/A,5,1-localhost:1-/,N/A,REJECTED,SECURITY_WAF_VIOLATION,Illegal meta character in URL::Attack signature detected::Violation Rating Threat detected::Bot Client Detected,<?xml version='1.0' encoding='UTF-8'?><BAD_MSG><violation_masks><block>414000000200c00-3a03030c30000072-8000000000000000-0</block><alarm>475f0ffcbbd0fea-befbf35cb000007e-f400000000000000-0</alarm><learn>0-0-0-0</learn><staging>0-0-0-0</staging></violation_masks><request-violations><violation><viol_index>42</viol_index><viol_name>VIOL_ATTACK_SIGNATURE</viol_name><context>url</context><sig_data><sig_id>200000099</sig_id><blocking_mask>3</blocking_mask><kw_data><buffer>Lzw+PHNjcmlwdD4=</buffer><offset>3</offset><length>7</length></kw_data></sig_data><sig_data><sig_id>200000093</sig_id><blocking_mask>3</blocking_mask><kw_data><buffer>Lzw+PHNjcmlwdD4=</buffer><offset>4</offset><length>7</length></kw_data></sig_data></violation><violation><viol_index>26</viol_index><viol_name>VIOL_URL_METACHAR</viol_name><uri>Lzw+PHNjcmlwdD4=</uri><metachar_index>60</metachar_index><wildcard_entity>*</wildcard_entity><staging>0</staging></violation><violation><viol_index>26</viol_index><viol_name>VIOL_URL_METACHAR</viol_name><uri>Lzw+PHNjcmlwdD4=</uri><metachar_index>62</metachar_index><wildcard_entity>*</wildcard_entity><staging>0</staging></violation><violation><viol_index>122</viol_index><viol_name>VIOL_BOT_CLIENT</viol_name></violation><violation><viol_index>93</viol_index><viol_name>VIOL_RATING_THREAT</viol_name></violation></request-violations></BAD_MSG>,curl,HTTP Library,N/A,N/A,Untrusted Bot,N/A,N/A,HTTP/1.1,/<><script>,GET /<><script> HTTP/1.1\\r\\nHost: localhost\\r\\nUser-Agent: curl/7.81.0\\r\\nAccept: */*\\r\\n\\r\\n`,
3334
expectAttrs: map[string]string{
34-
"syslog.hostname": "ip-172-16-0-213",
3535
"syslog.appname": "ASM",
3636
"app_protect.policy_name": "nms_app_protect_default_policy",
3737
"app_protect.support_id": "5377540117854870581",
@@ -44,8 +44,7 @@ func TestSyslogProcessor(t *testing.T) {
4444
name: "simple valid syslog message",
4545
body: "<34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8",
4646
expectAttrs: map[string]string{
47-
"syslog.hostname": "mymachine",
48-
"syslog.appname": "su",
47+
"syslog.appname": "su",
4948
},
5049
expectRecords: 1,
5150
},
@@ -159,16 +158,14 @@ func TestSyslogProcessor(t *testing.T) {
159158

160159
func TestSyslogProcessorFailure(t *testing.T) {
161160
testCases := []struct {
162-
expectAttrs map[string]string
163-
body any
164161
name string
165-
expectJSON string
162+
body any
166163
expectRecords int
167164
}{
168165
{
169166
name: "invalid syslog message",
170167
body: "not a syslog line",
171-
expectRecords: 1,
168+
expectRecords: 0,
172169
},
173170
}
174171

@@ -177,31 +174,35 @@ func TestSyslogProcessorFailure(t *testing.T) {
177174
ctx := context.Background()
178175
settings := processortest.NewNopSettings(processortest.NopType)
179176
settings.Logger = zap.NewNop()
180-
181177
logs := plog.NewLogs()
182-
lr := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
178+
logRecord := logs.ResourceLogs().
179+
AppendEmpty().
180+
ScopeLogs().
181+
AppendEmpty().
182+
LogRecords().
183+
AppendEmpty()
184+
183185
switch v := tc.body.(type) {
184186
case string:
185-
lr.Body().SetStr(v)
187+
logRecord.Body().SetStr(v)
186188
case int:
187-
lr.Body().SetInt(int64(v))
189+
logRecord.Body().SetInt(int64(v))
188190
case []byte:
189-
lr.Body().SetEmptyBytes().FromRaw(v)
191+
logRecord.Body().SetEmptyBytes().FromRaw(v)
190192
}
191193

194+
// Create sink and processor.
192195
sink := &consumertest.LogsSink{}
193-
p := newSyslogProcessor(sink, settings)
194-
require.NoError(t, p.Start(ctx, nil))
196+
processor := newSyslogProcessor(sink, settings)
195197

196-
err := p.ConsumeLogs(ctx, logs)
198+
require.NoError(t, processor.Start(ctx, nil))
199+
err := processor.ConsumeLogs(ctx, logs)
200+
fmt.Println(err)
197201
require.Error(t, err)
198202

199-
if tc.expectRecords == 0 {
200-
assert.Equal(t, 0, sink.LogRecordCount(), "no logs should be produced")
201-
require.NoError(t, p.Shutdown(ctx))
203+
assert.Equal(t, tc.expectRecords, sink.LogRecordCount(), "unexpected number of logs produced")
202204

203-
return
204-
}
205+
require.NoError(t, processor.Shutdown(ctx))
205206
})
206207
}
207208
}

internal/config/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ func addDefaultPipelines(collector *Collector) {
186186
if _, ok := collector.Pipelines.Logs[DefaultPipeline]; !ok {
187187
collector.Pipelines.Logs[DefaultPipeline] = &Pipeline{
188188
Receivers: []string{"tcplog/nginx_app_protect"},
189-
Processors: []string{"syslog/default", "logsgzip/default", "batch/default_logs"},
189+
Processors: []string{"logsgzip/default", "batch/default_logs"},
190190
Exporters: []string{"otlp/default"},
191191
}
192192
}

0 commit comments

Comments
 (0)