Skip to content

Commit a51163c

Browse files
author
huanggze
committed
support parser plugin
Signed-off-by: huanggze <[email protected]>
1 parent 3cbf894 commit a51163c

28 files changed

+947
-6
lines changed

PROJECT

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,7 @@ resources:
1616
- group: logging
1717
kind: Output
1818
version: v1alpha2
19+
- group: logging
20+
kind: Parser
21+
version: v1alpha2
1922
version: "2"

api/v1alpha2/fluentbitconfig_types.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ type FluentBitConfigSpec struct {
3535
FilterSelector metav1.LabelSelector `json:"filterSelector,omitempty"`
3636
// Select output plugins
3737
OutputSelector metav1.LabelSelector `json:"outputSelector,omitempty"`
38+
// Select parser plugins
39+
ParserSelector metav1.LabelSelector `json:"parserSelector,omitempty"`
3840
}
3941

4042
type Service struct {
@@ -117,8 +119,7 @@ func (s *Service) Params() *plugins.KVs {
117119
return m
118120
}
119121

120-
func (cfg FluentBitConfig) Render(sl plugins.SecretLoader, inputs InputList, filters FilterList,
121-
outputs OutputList) (string, error) {
122+
func (cfg FluentBitConfig) RenderMainConfig(sl plugins.SecretLoader, inputs InputList, filters FilterList, outputs OutputList) (string, error) {
122123
var buf bytes.Buffer
123124

124125
// The Service defines the global behaviour of the Fluent Bit engine.
@@ -153,3 +154,16 @@ func (cfg FluentBitConfig) Render(sl plugins.SecretLoader, inputs InputList, fil
153154

154155
return buf.String(), nil
155156
}
157+
158+
func (cfg FluentBitConfig) RenderParserConfig(sl plugins.SecretLoader, parsers ParserList) (string, error) {
159+
var buf bytes.Buffer
160+
161+
parserSections, err := parsers.Load(sl)
162+
if err != nil {
163+
return "", err
164+
}
165+
166+
buf.WriteString(parserSections)
167+
168+
return buf.String(), nil
169+
}

api/v1alpha2/parser_types.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
*/
15+
16+
package v1alpha2
17+
18+
import (
19+
"bytes"
20+
"fmt"
21+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
22+
"kubesphere.io/fluentbit-operator/api/v1alpha2/plugins"
23+
"kubesphere.io/fluentbit-operator/api/v1alpha2/plugins/parser"
24+
"reflect"
25+
)
26+
27+
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
28+
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
29+
30+
// ParserSpec defines the desired state of Parser
31+
type ParserSpec struct {
32+
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
33+
// Important: Run "make" to regenerate code after modifying this file
34+
35+
JSON *parser.JSON `json:"json,omitempty"`
36+
Regex *parser.Regex `json:"regex,omitempty"`
37+
LTSV *parser.LSTV `json:"ltsv,omitempty"`
38+
Logfmt *parser.Logfmt `json:"logfmt,omitempty"`
39+
40+
Decoders []Decorder `json:"decoders,omitempty"`
41+
}
42+
43+
type Decorder struct {
44+
DecodeField string `json:"decodeField,omitempty"`
45+
DecodeFieldAs string `json:"decodeFieldAs,omitempty"`
46+
}
47+
48+
// ParserStatus defines the observed state of Parser
49+
type ParserStatus struct {
50+
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
51+
// Important: Run "make" to regenerate code after modifying this file
52+
}
53+
54+
// +kubebuilder:object:root=true
55+
56+
// Parser is the Schema for the parsers API
57+
type Parser struct {
58+
metav1.TypeMeta `json:",inline"`
59+
metav1.ObjectMeta `json:"metadata,omitempty"`
60+
61+
Spec ParserSpec `json:"spec,omitempty"`
62+
Status ParserStatus `json:"status,omitempty"`
63+
}
64+
65+
// +kubebuilder:object:root=true
66+
67+
// ParserList contains a list of Parser
68+
type ParserList struct {
69+
metav1.TypeMeta `json:",inline"`
70+
metav1.ListMeta `json:"metadata,omitempty"`
71+
Items []Parser `json:"items"`
72+
}
73+
74+
func (list ParserList) Load(sl plugins.SecretLoader) (string, error) {
75+
var buf bytes.Buffer
76+
77+
for _, item := range list.Items {
78+
merge := func(p plugins.Plugin) error {
79+
if reflect.ValueOf(p).IsNil() {
80+
return nil
81+
}
82+
83+
buf.WriteString("[PARSER]\n")
84+
buf.WriteString(fmt.Sprintf(" Name %s\n", item.Name))
85+
buf.WriteString(fmt.Sprintf(" Format %s\n", p.Name()))
86+
87+
kvs, err := p.Params(sl)
88+
if err != nil {
89+
return err
90+
}
91+
buf.WriteString(kvs.String())
92+
93+
for _, decorder := range item.Spec.Decoders {
94+
if decorder.DecodeField != "" {
95+
buf.WriteString(fmt.Sprintf(" Decode_Field %s\n", decorder.DecodeField))
96+
}
97+
if decorder.DecodeFieldAs != "" {
98+
buf.WriteString(fmt.Sprintf(" Decode_Field_As %s\n", decorder.DecodeFieldAs))
99+
}
100+
}
101+
return nil
102+
}
103+
104+
for i := 0; i < reflect.ValueOf(item.Spec).NumField()-1; i++ {
105+
p, _ := reflect.ValueOf(item.Spec).Field(i).Interface().(plugins.Plugin)
106+
if err := merge(p); err != nil {
107+
return "", err
108+
}
109+
}
110+
}
111+
112+
return buf.String(), nil
113+
}
114+
115+
func init() {
116+
SchemeBuilder.Register(&Parser{}, &ParserList{})
117+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package parser
2+
3+
import (
4+
"fmt"
5+
"kubesphere.io/fluentbit-operator/api/v1alpha2/plugins"
6+
)
7+
8+
// +kubebuilder:object:generate:=true
9+
10+
// The JSON parser plugin
11+
type JSON struct {
12+
TimeKey string `json:"timeKey,omitempty"`
13+
TimeFormat string `json:"timeFormat,omitempty"`
14+
TimeKeep *bool `json:"timeKeep,omitempty"`
15+
}
16+
17+
func (_ *JSON) Name() string {
18+
return "json"
19+
}
20+
21+
func (j *JSON) Params(_ plugins.SecretLoader) (*plugins.KVs, error) {
22+
kvs := plugins.NewKVs()
23+
if j.TimeKey != "" {
24+
kvs.Insert("Time_Key", j.TimeKey)
25+
}
26+
if j.TimeFormat != "" {
27+
kvs.Insert("Time_Format", j.TimeFormat)
28+
}
29+
if j.TimeKeep != nil {
30+
kvs.Insert("Time_Keep", fmt.Sprint(*j.TimeKeep))
31+
}
32+
return kvs, nil
33+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package parser
2+
3+
import (
4+
"kubesphere.io/fluentbit-operator/api/v1alpha2/plugins"
5+
)
6+
7+
// +kubebuilder:object:generate:=true
8+
9+
// The logfmt parser plugin
10+
type Logfmt struct{}
11+
12+
func (_ *Logfmt) Name() string {
13+
return "logfmt"
14+
}
15+
16+
func (_ *Logfmt) Params(_ plugins.SecretLoader) (*plugins.KVs, error) {
17+
return nil, nil
18+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package parser
2+
3+
import (
4+
"fmt"
5+
"kubesphere.io/fluentbit-operator/api/v1alpha2/plugins"
6+
)
7+
8+
// +kubebuilder:object:generate:=true
9+
10+
// The LSTV parser plugin
11+
type LSTV struct {
12+
TimeKey string `json:"timeKey,omitempty"`
13+
TimeFormat string `json:"timeFormat,omitempty"`
14+
TimeKeep *bool `json:"timeKeep,omitempty"`
15+
Types string `json:"types,omitempty"`
16+
}
17+
18+
func (_ *LSTV) Name() string {
19+
return "ltsv"
20+
}
21+
22+
func (l *LSTV) Params(_ plugins.SecretLoader) (*plugins.KVs, error) {
23+
kvs := plugins.NewKVs()
24+
if l.TimeKey != "" {
25+
kvs.Insert("Time_Key", l.TimeKey)
26+
}
27+
if l.TimeFormat != "" {
28+
kvs.Insert("Time_Format", l.TimeFormat)
29+
}
30+
if l.TimeKeep != nil {
31+
kvs.Insert("Time_Format", fmt.Sprint(*l.TimeKeep))
32+
}
33+
if l.Types != "" {
34+
kvs.Insert("Types", l.Types)
35+
}
36+
return kvs, nil
37+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package parser
2+
3+
import (
4+
"fmt"
5+
"kubesphere.io/fluentbit-operator/api/v1alpha2/plugins"
6+
)
7+
8+
// +kubebuilder:object:generate:=true
9+
10+
// The regex parser plugin
11+
type Regex struct {
12+
Regex string `json:"regex,omitempty"`
13+
TimeKey string `json:"timeKey,omitempty"`
14+
TimeFormat string `json:"timeFormat,omitempty"`
15+
TimeKeep *bool `json:"timeKeep,omitempty"`
16+
Types string `json:"types,omitempty"`
17+
}
18+
19+
func (_ *Regex) Name() string {
20+
return "regex"
21+
}
22+
23+
func (re *Regex) Params(_ plugins.SecretLoader) (*plugins.KVs, error) {
24+
kvs := plugins.NewKVs()
25+
if re.Regex != "" {
26+
kvs.Insert("Regex", re.Regex)
27+
}
28+
if re.TimeKey != "" {
29+
kvs.Insert("Time_Key", re.TimeKey)
30+
}
31+
if re.TimeFormat != "" {
32+
kvs.Insert("Time_Format", re.TimeFormat)
33+
}
34+
if re.TimeKeep != nil {
35+
kvs.Insert("Time_Format", fmt.Sprint(*re.TimeKeep))
36+
}
37+
if re.Types != "" {
38+
kvs.Insert("Types", re.Types)
39+
}
40+
return kvs, nil
41+
}

api/v1alpha2/plugins/parser/zz_generated.deepcopy.go

Lines changed: 97 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)