@@ -3,6 +3,7 @@ package scraper
3
3
import (
4
4
"bytes"
5
5
"context"
6
+ "encoding/json"
6
7
"errors"
7
8
"fmt"
8
9
"math"
@@ -757,59 +758,69 @@ func getDerivedMetrics(mfs []*dto.MetricFamily, summaries map[uint64]prometheus.
757
758
}
758
759
759
760
func (s Scraper ) extractLogs (t time.Time , logs []byte , sharedLabels []labelPair ) Streams {
760
- var line strings.Builder
761
-
762
- dec := logfmt .NewDecoder (bytes .NewReader (logs ))
763
-
764
761
labels := make ([]labelPair , 0 , len (sharedLabels ))
765
762
var entries []logproto.Entry
766
- RECORD:
767
- for dec .ScanRecord () {
768
- var t time.Time
769
763
770
- line .Reset ()
764
+ // Split logs into lines and process each JSON line
765
+ lines := strings .Split (string (logs ), "\n " )
766
+ for _ , line := range lines {
767
+ line = strings .TrimSpace (line )
768
+ if line == "" {
769
+ continue
770
+ }
771
771
772
- enc := logfmt .NewEncoder (& line )
772
+ // Parse JSON log entry
773
+ var logEntry map [string ]interface {}
774
+ if err := json .Unmarshal ([]byte (line ), & logEntry ); err != nil {
775
+ s .logger .Warn ().Err (err ).Str ("line" , line ).Msg ("invalid JSON log entry" )
776
+ continue
777
+ }
773
778
774
- labels = labels [: 0 ]
775
- labels = append ( labels , sharedLabels ... )
779
+ // Extract timestamp
780
+ logTime := s . extractLogTimestamp ( logEntry , t )
776
781
777
- for dec .ScanKeyval () {
778
- value := dec .Value ()
782
+ // Convert JSON back to logfmt format for consistency
783
+ var logfmtLine strings.Builder
784
+ enc := logfmt .NewEncoder (& logfmtLine )
779
785
780
- switch key := dec .Key (); string (key ) {
781
- case "ts" :
782
- var err error
783
- t , err = time .Parse (time .RFC3339Nano , string (value ))
784
- if err != nil {
785
- // We should never hit this as the timestamp string in the log should be valid.
786
- // Without a timestamp we cannot do anything. And we cannot use something like
787
- // time.Now() because that would mess up other entries
788
- s .logger .Warn ().Err (err ).Bytes ("value" , value ).Msg ("invalid timestamp scanning logs" )
789
- continue RECORD
790
- }
786
+ for key , value := range logEntry {
787
+ // Skip the time field as it's handled separately
788
+ if key == "time" {
789
+ continue
790
+ }
791
791
792
+ var valueStr string
793
+ switch v := value .(type ) {
794
+ case string :
795
+ valueStr = v
796
+ case float64 :
797
+ valueStr = strconv .FormatFloat (v , 'f' , - 1 , 64 )
798
+ case bool :
799
+ valueStr = strconv .FormatBool (v )
800
+ case nil :
801
+ valueStr = ""
792
802
default :
793
- if err := enc .EncodeKeyval (key , value ); err != nil {
794
- // We should never hit this because all the entries are valid.
795
- s .logger .Warn ().Err (err ).Bytes ("key" , key ).Bytes ("value" , value ).Msg ("invalid entry scanning logs" )
796
- continue RECORD
797
- }
803
+ valueStr = fmt .Sprintf ("%v" , v )
804
+ }
805
+
806
+ if err := enc .EncodeKeyval ([]byte (key ), []byte (valueStr )); err != nil {
807
+ s .logger .Warn ().Err (err ).Str ("key" , key ).Str ("value" , valueStr ).Msg ("invalid logfmt encoding" )
808
+ continue
798
809
}
799
810
}
800
811
801
812
if err := enc .EndRecord (); err != nil {
802
813
s .logger .Warn ().Err (err ).Msg ("encoding logs" )
803
814
}
815
+
804
816
entries = append (entries , logproto.Entry {
805
- Timestamp : t ,
806
- Line : line .String (),
817
+ Timestamp : logTime ,
818
+ Line : logfmtLine .String (),
807
819
})
808
820
}
809
821
810
- if err := dec .Err (); err != nil {
811
- s .logger .Error ().Err (err ).Msg ("decoding logs" )
812
- }
822
+ // Copy shared labels
823
+ labels = append (labels , sharedLabels ... )
813
824
814
825
return Streams {
815
826
logproto.Stream {
@@ -819,6 +830,28 @@ RECORD:
819
830
}
820
831
}
821
832
833
+ // extractLogTimestamp extracts and parses the timestamp from a log entry
834
+ func (s Scraper ) extractLogTimestamp (logEntry map [string ]interface {}, fallbackTime time.Time ) time.Time {
835
+ // Try to get timestamp as string first
836
+ if tsStr , ok := logEntry ["time" ].(string ); ok {
837
+ if ts , err := time .Parse (time .RFC3339Nano , tsStr ); err == nil {
838
+ return ts
839
+ }
840
+ // If string parsing fails, try as float64 (Unix timestamp)
841
+ if tsFloat , ok := logEntry ["time" ].(float64 ); ok {
842
+ return time .Unix (int64 (tsFloat ), 0 )
843
+ }
844
+ }
845
+
846
+ // Try to get timestamp as float64 directly
847
+ if tsFloat , ok := logEntry ["time" ].(float64 ); ok {
848
+ return time .Unix (int64 (tsFloat ), 0 )
849
+ }
850
+
851
+ // Fallback to provided time
852
+ return fallbackTime
853
+ }
854
+
822
855
func (s Scraper ) extractTimeseries (t time.Time , metrics []* dto.MetricFamily , sharedLabels []labelPair ) TimeSeries {
823
856
return extractTimeseries (t , metrics , sharedLabels , s .summaries , s .histograms , s .logger )
824
857
}
0 commit comments