From e8836116877fbffed38e79596414a0787a2ba15c Mon Sep 17 00:00:00 2001 From: vine Date: Fri, 12 Jul 2024 16:50:49 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8D=E7=89=88=E6=9C=AC?= =?UTF-8?q?=E6=AF=94=E5=AF=B9=E5=BC=82=E5=B8=B8=20feat:=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=97=B6=E9=97=B4=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- stash/config/config.go | 2 +- stash/es/writer.go | 9 +++++---- stash/handler/handler.go | 9 +++------ 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/stash/config/config.go b/stash/config/config.go index e03ce46..09dede8 100644 --- a/stash/config/config.go +++ b/stash/config/config.go @@ -17,7 +17,7 @@ type ( ElasticSearchConf struct { Hosts []string Index string - DocType string `json:",default=doc"` + DocType string `json:",default=_doc"` TimeZone string `json:",optional"` MaxChunkBytes int `json:",default=15728640"` // default 15M Compress bool `json:",default=false"` diff --git a/stash/es/writer.go b/stash/es/writer.go index cbe8eeb..0521fc6 100644 --- a/stash/es/writer.go +++ b/stash/es/writer.go @@ -2,6 +2,7 @@ package es import ( "context" + "fmt" "github.com/kevwan/go-stash/stash/config" "github.com/olivere/elastic/v7" @@ -10,7 +11,7 @@ import ( "github.com/zeromicro/go-zero/core/logx" ) -const es8Version = "8.0.0" +const es8Version = "v8.0.0" type ( Writer struct { @@ -22,7 +23,7 @@ type ( valueWithIndex struct { index string - val string + val map[string]interface{} } ) @@ -45,13 +46,13 @@ func NewWriter(c config.ElasticSearchConf) (*Writer, error) { writer := Writer{ docType: c.DocType, client: client, - esVersion: version, + esVersion: fmt.Sprintf("v%s", version), } writer.inserter = executors.NewChunkExecutor(writer.execute, executors.WithChunkBytes(c.MaxChunkBytes)) return &writer, nil } -func (w *Writer) Write(index, val string) error { +func (w *Writer) Write(index string, val map[string]interface{}) error { return w.inserter.Add(valueWithIndex{ index: index, val: val, diff --git a/stash/handler/handler.go b/stash/handler/handler.go index 7da902a..4c23b2f 100644 --- a/stash/handler/handler.go +++ b/stash/handler/handler.go @@ -4,6 +4,7 @@ import ( jsoniter "github.com/json-iterator/go" "github.com/kevwan/go-stash/stash/es" "github.com/kevwan/go-stash/stash/filter" + "time" ) type MessageHandler struct { @@ -37,11 +38,7 @@ func (mh *MessageHandler) Consume(_, val string) error { return nil } } + m["timestamp"] = time.Now() - bs, err := jsoniter.Marshal(m) - if err != nil { - return err - } - - return mh.writer.Write(index, string(bs)) + return mh.writer.Write(index, m) }