Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions app/vlinsert/jsonline/jsonline.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,21 @@ import (

// RequestHandler processes jsonline insert requests
func RequestHandler(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
w.Header().Add("Content-Type", "application/json")

if r.Method != "POST" {
w.Header().Add("Access-Control-Allow-Origin", "*")
w.Header().Add("Access-Control-Allow-Methods", "POST, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")
switch r.Method {
case http.MethodOptions:
w.WriteHeader(http.StatusOK)
return
case http.MethodPost:
w.Header().Add("Content-Type", "application/json")
default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

startTime := time.Now()
requestsTotal.Inc()

cp, err := insertutil.GetCommonParams(r)
Expand Down Expand Up @@ -103,15 +110,21 @@ func readLine(lr *insertutil.LineReader, timeFields, msgFields []string, lmp ins
p := logstorage.GetJSONParser()
defer logstorage.PutJSONParser(p)

if err := p.ParseLogMessage(line); err != nil {
return true, fmt.Errorf("%s; line contents: %q", err, line)
p.Init(line)
for p.NextMessage() {
if err := p.Error(); err != nil {
return true, err
}
ts, err := insertutil.ExtractTimestampFromFields(timeFields, p.Fields)
if err != nil {
return true, err
}
logstorage.RenameField(p.Fields, msgFields, "_msg")
lmp.AddRow(ts, p.Fields, nil)
}
ts, err := insertutil.ExtractTimestampFromFields(timeFields, p.Fields)
if err != nil {
return true, fmt.Errorf("%s; line contents: %q", err, line)
if err := p.Error(); err != nil {
return true, err
}
logstorage.RenameField(p.Fields, msgFields, "_msg")
lmp.AddRow(ts, p.Fields, nil)

return true, nil
}
Expand Down
5 changes: 5 additions & 0 deletions app/vlinsert/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ func insertHandler(w http.ResponseWriter, r *http.Request, path string) bool {
case "/insert/jsonline":
jsonline.RequestHandler(w, r)
return true
case "/insert/services/collector/event":
r.Header.Add("VL-Msg-Field", "event")
r.Header.Add("VL-Time-Field", "time")
jsonline.RequestHandler(w, r)
return true
case "/insert/ready":
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200)
Expand Down
1 change: 1 addition & 0 deletions deployment/docker/victorialogs/vector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The folder contains examples of [Vector](https://vector.dev/docs/) integration w
* [jsonline single node](./jsonline)
* [jsonline HA setup](./jsonline-ha)
* [datadog](./datadog)
* [splunk](./splunk)

## Quick start

Expand Down
3 changes: 3 additions & 0 deletions deployment/docker/victorialogs/vector/splunk/compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
include:
- ../compose-base.yml
name: vector-splunk
36 changes: 36 additions & 0 deletions deployment/docker/victorialogs/vector/splunk/vector.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
api:
enabled: true
address: 0.0.0.0:8686
sources:
docker:
type: docker_logs
metrics:
type: internal_metrics
transforms:
msg_parser:
type: remap
inputs:
- docker
source: |
.message = parse_json(.message) ?? .message
sinks:
splunk:
type: splunk_hec_logs
inputs:
- msg_parser
endpoint: http://victorialogs:9428/insert
indexed_fields:
- image
encoding:
codec: json
compression: gzip
default_token: test
healthcheck:
enabled: false
victoriametrics:
type: prometheus_remote_write
endpoint: http://victoriametrics:8428/api/v1/write
inputs:
- metrics
healthcheck:
enabled: false
57 changes: 42 additions & 15 deletions lib/logstorage/json_parser.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package logstorage

import (
"fmt"
"sync"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
Expand All @@ -20,8 +21,11 @@ type JSONParser struct {
// or until the parser is returned to the pool with PutParser() call.
Fields []Field

// p is used for fast JSON parsing
p fastjson.Parser
// s is used for fast JSON parsing
s fastjson.Scanner

// err contains parsing error
err error

// buf is used for holding the backing data for Fields
buf []byte
Expand All @@ -32,6 +36,7 @@ type JSONParser struct {
}

func (p *JSONParser) reset() {
p.err = nil
clear(p.Fields)
p.Fields = p.Fields[:0]

Expand Down Expand Up @@ -72,19 +77,9 @@ func (p *JSONParser) ParseLogMessage(msg []byte) error {
//
// The p.Fields remains valid until the next call to ParseLogMessage() or PutJSONParser().
func (p *JSONParser) parseLogMessage(msg []byte, maxFieldNameLen int) error {
p.reset()

msgStr := bytesutil.ToUnsafeString(msg)
v, err := p.p.Parse(msgStr)
if err != nil {
return err
}
o, err := v.Object()
if err != nil {
return err
}
p.Fields, p.buf, p.prefixBuf = appendLogFields(p.Fields, p.buf, p.prefixBuf, o, maxFieldNameLen)
return nil
p.Init(msg)
p.nextMessage(maxFieldNameLen)
return p.Error()
}

func appendLogFields(dst []Field, dstBuf, prefixBuf []byte, o *fastjson.Object, maxFieldNameLen int) ([]Field, []byte, []byte) {
Expand Down Expand Up @@ -166,3 +161,35 @@ func appendLogField(dst []Field, dstBuf, prefixBuf, k, value []byte) ([]Field, [
})
return dst, dstBuf
}

func (p *JSONParser) Init(msg []byte) {
p.s.InitBytes(msg)
}

func (p *JSONParser) NextMessage() bool {
return p.nextMessage(maxFieldNameSize)
}

func (p *JSONParser) nextMessage(maxFieldNameLen int) bool {
p.reset()
if !p.s.Next() {
p.err = p.s.Error()
return false
}
v := p.s.Value()
if v == nil {
p.err = fmt.Errorf("no value found")
return true
}
o, err := v.Object()
if err != nil {
p.err = err
return true
}
p.Fields, p.buf, p.prefixBuf = appendLogFields(p.Fields, p.buf, p.prefixBuf, o, maxFieldNameLen)
return true
}

func (p *JSONParser) Error() error {
return p.err
}
1 change: 0 additions & 1 deletion lib/logstorage/json_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ func TestJSONParserFailure(t *testing.T) {
}
PutJSONParser(p)
}
f("")
f("{foo")
f("[1,2,3]")
f(`{"foo",}`)
Expand Down