Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion stash/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type (
ElasticSearchConf struct {
Hosts []string
Index string
DocType string `json:",default=doc"`
DocType string `json:",default=_doc"`
TimeZone string `json:",optional"`
MaxChunkBytes int `json:",default=15728640"` // default 15M
Compress bool `json:",default=false"`
Expand Down
9 changes: 5 additions & 4 deletions stash/es/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package es

import (
"context"
"fmt"

"github.com/kevwan/go-stash/stash/config"
"github.com/olivere/elastic/v7"
Expand All @@ -10,7 +11,7 @@ import (
"github.com/zeromicro/go-zero/core/logx"
)

const es8Version = "8.0.0"
const es8Version = "v8.0.0"

type (
Writer struct {
Expand All @@ -22,7 +23,7 @@ type (

valueWithIndex struct {
index string
val string
val map[string]interface{}
}
)

Expand All @@ -45,13 +46,13 @@ func NewWriter(c config.ElasticSearchConf) (*Writer, error) {
writer := Writer{
docType: c.DocType,
client: client,
esVersion: version,
esVersion: fmt.Sprintf("v%s", version),
}
writer.inserter = executors.NewChunkExecutor(writer.execute, executors.WithChunkBytes(c.MaxChunkBytes))
return &writer, nil
}

func (w *Writer) Write(index, val string) error {
func (w *Writer) Write(index string, val map[string]interface{}) error {
return w.inserter.Add(valueWithIndex{
index: index,
val: val,
Expand Down
9 changes: 3 additions & 6 deletions stash/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
jsoniter "github.com/json-iterator/go"
"github.com/kevwan/go-stash/stash/es"
"github.com/kevwan/go-stash/stash/filter"
"time"
)

type MessageHandler struct {
Expand Down Expand Up @@ -37,11 +38,7 @@ func (mh *MessageHandler) Consume(_, val string) error {
return nil
}
}
m["timestamp"] = time.Now()

bs, err := jsoniter.Marshal(m)
if err != nil {
return err
}

return mh.writer.Write(index, string(bs))
return mh.writer.Write(index, m)
}