diff --git a/stash/config/config.go b/stash/config/config.go index e03ce46..09dede8 100644 --- a/stash/config/config.go +++ b/stash/config/config.go @@ -17,7 +17,7 @@ type ( ElasticSearchConf struct { Hosts []string Index string - DocType string `json:",default=doc"` + DocType string `json:",default=_doc"` TimeZone string `json:",optional"` MaxChunkBytes int `json:",default=15728640"` // default 15M Compress bool `json:",default=false"` diff --git a/stash/es/writer.go b/stash/es/writer.go index cbe8eeb..0521fc6 100644 --- a/stash/es/writer.go +++ b/stash/es/writer.go @@ -2,6 +2,7 @@ package es import ( "context" + "fmt" "github.com/kevwan/go-stash/stash/config" "github.com/olivere/elastic/v7" @@ -10,7 +11,7 @@ import ( "github.com/zeromicro/go-zero/core/logx" ) -const es8Version = "8.0.0" +const es8Version = "v8.0.0" type ( Writer struct { @@ -22,7 +23,7 @@ type ( valueWithIndex struct { index string - val string + val map[string]interface{} } ) @@ -45,13 +46,13 @@ func NewWriter(c config.ElasticSearchConf) (*Writer, error) { writer := Writer{ docType: c.DocType, client: client, - esVersion: version, + esVersion: fmt.Sprintf("v%s", version), } writer.inserter = executors.NewChunkExecutor(writer.execute, executors.WithChunkBytes(c.MaxChunkBytes)) return &writer, nil } -func (w *Writer) Write(index, val string) error { +func (w *Writer) Write(index string, val map[string]interface{}) error { return w.inserter.Add(valueWithIndex{ index: index, val: val, diff --git a/stash/handler/handler.go b/stash/handler/handler.go index 7da902a..4c23b2f 100644 --- a/stash/handler/handler.go +++ b/stash/handler/handler.go @@ -4,6 +4,7 @@ import ( jsoniter "github.com/json-iterator/go" "github.com/kevwan/go-stash/stash/es" "github.com/kevwan/go-stash/stash/filter" + "time" ) type MessageHandler struct { @@ -37,11 +38,7 @@ func (mh *MessageHandler) Consume(_, val string) error { return nil } } + m["timestamp"] = time.Now() - bs, err := jsoniter.Marshal(m) - if err != nil { - return err - } - - return mh.writer.Write(index, string(bs)) + return mh.writer.Write(index, m) }