Skip to content

Commit 2ba13fc

Browse files
feat: schema detection for k8s events
Framework setup for schema detection at the time of ingestion. A sample JSON is added for k8s events. A formats file is added that holds the list of sample JSONs and the schema type of all known log sources. The server loads the known schemas at initialization. When events arrive, it checks whether the schema of the incoming events matches any of the known schemas; if it does, it adds `schema_type` to the stream info. Custom flattening is required before storing the schema and ingesting to Parseable for those events which have a hierarchical structure.
1 parent 28b984a commit 2ba13fc

17 files changed

+910
-35
lines changed

src/event/detect_schema.rs

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use arrow_json::reader::infer_json_schema_from_iterator;
20+
use arrow_schema::Schema;
21+
use once_cell::sync::OnceCell;
22+
use std::collections::HashMap;
23+
24+
use crate::{event::format::update_data_type_to_datetime, utils::json::flatten_json_body};
25+
26+
// Expose some static variables for internal usage
27+
/// Process-wide table of known schemas, keyed by schema-type name.
///
/// The cell starts empty; `prepare_known_schema_list` fills it and
/// `get_known_schema_list` reads it back.
pub static KNOWN_SCHEMA_LIST: OnceCell<HashMap<String, Schema>> = OnceCell::new();
28+
29+
pub fn detect_schema() -> HashMap<String, Schema> {
30+
let mut known_schema_list: HashMap<String, Schema> = HashMap::new();
31+
//read file formats.json
32+
let formats_file = std::fs::File::open("src/event/known-formats/formats.json").unwrap();
33+
let formats_reader = std::io::BufReader::new(formats_file);
34+
let formats: serde_json::Value = serde_json::from_reader(formats_reader).unwrap();
35+
//iterate over the formats
36+
for format in formats.as_array().unwrap() {
37+
let schema_type = format["schema_type"].as_str().unwrap();
38+
let sample_json_path = format["sample_json_path"].as_str().unwrap();
39+
let sample_file = std::fs::File::open(sample_json_path).unwrap();
40+
let sample_reader = std::io::BufReader::new(sample_file);
41+
let sample_json: serde_json::Value = serde_json::from_reader(sample_reader).unwrap();
42+
let flattened_json = flatten_json_body(sample_json, None, None, None, false).unwrap();
43+
let sample_json_records = [flattened_json.clone()];
44+
let mut schema =
45+
infer_json_schema_from_iterator(sample_json_records.iter().map(Ok)).unwrap();
46+
schema = update_data_type_to_datetime(schema, flattened_json, Vec::new());
47+
known_schema_list.insert(schema_type.to_string(), schema);
48+
}
49+
prepare_known_schema_list(known_schema_list.clone());
50+
known_schema_list
51+
}
52+
53+
pub fn prepare_known_schema_list(known_schema_list: HashMap<String, Schema>) {
54+
KNOWN_SCHEMA_LIST
55+
.set(known_schema_list)
56+
.expect("only set once")
57+
}
58+
59+
pub fn get_known_schema_list() -> &'static HashMap<String, Schema> {
60+
KNOWN_SCHEMA_LIST
61+
.get()
62+
.expect("fetch schema list from static variable")
63+
}
64+
65+
/// Looks up the schema-type name whose known schema equals `schema`.
///
/// Compares `schema` against each entry returned by `get_known_schema_list`
/// and returns the key of the first entry that is equal. Returns an empty
/// `String` when nothing matches.
pub fn validate_schema_type(schema: &Schema) -> String {
    get_known_schema_list()
        .iter()
        .find(|(_, candidate)| *candidate == schema)
        .map(|(type_name, _)| type_name.to_string())
        .unwrap_or_default()
}

src/event/known-formats/formats.json

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
[
2+
{
3+
"name": "kubernetes",
4+
"schema_type": "kubernetes-events",
5+
"sample_json_path": "src/event/known-formats/kubernetes-events-sample/kubernetes-events-sample.json"
6+
}
7+
]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
{
2+
"apiVersion": "v1",
3+
"items": [
4+
{
5+
"apiVersion": "v1",
6+
"count": 1,
7+
"eventTime": null,
8+
"firstTimestamp": "2024-11-08T10:17:18Z",
9+
"involvedObject": {
10+
"apiVersion": "v1",
11+
"fieldPath": "spec.containers{vantage-kubernetes-agent}",
12+
"kind": "Pod",
13+
"name": "vka-vantage-kubernetes-agent-0",
14+
"namespace": "vantage",
15+
"resourceVersion": "15629581",
16+
"uid": "3fa579b0-0c6f-4f44-a320-69389c8f607a"
17+
},
18+
"kind": "Event",
19+
"lastTimestamp": "2024-11-08T10:17:18Z",
20+
"message": "Stopping container vantage-kubernetes-agent",
21+
"metadata": {
22+
"creationTimestamp": "2024-11-08T10:17:18Z",
23+
"name": "vka-vantage-kubernetes-agent-0.1805f6d7de4bc710",
24+
"namespace": "vantage",
25+
"resourceVersion": "25741805",
26+
"uid": "629a5864-06de-414d-8ad7-b7637b8cbfa0"
27+
},
28+
"reason": "Killing",
29+
"reportingComponent": "kubelet",
30+
"reportingInstance": "ip-10-0-2-170.ec2.internal",
31+
"source": {
32+
"component": "kubelet",
33+
"host": "ip-10-0-2-170.ec2.internal"
34+
},
35+
"type": "Normal"
36+
},
37+
{
38+
"apiVersion": "v1",
39+
"count": 1,
40+
"eventTime": null,
41+
"firstTimestamp": "2024-11-08T10:17:19Z",
42+
"involvedObject": {
43+
"apiVersion": "v1",
44+
"kind": "Pod",
45+
"name": "vka-vantage-kubernetes-agent-0",
46+
"namespace": "vantage",
47+
"resourceVersion": "25741822",
48+
"uid": "0118c8be-55df-40bf-96ed-41bb11b5a771"
49+
},
50+
"kind": "Event",
51+
"lastTimestamp": "2024-11-08T10:17:19Z",
52+
"message": "Successfully assigned vantage/vka-vantage-kubernetes-agent-0 to ip-10-0-2-170.ec2.internal",
53+
"metadata": {
54+
"creationTimestamp": "2024-11-08T10:17:19Z",
55+
"name": "vka-vantage-kubernetes-agent-0.1805f6d80c652af1",
56+
"namespace": "vantage",
57+
"resourceVersion": "25741826",
58+
"uid": "e1dab7eb-ab65-44be-9b75-2f400cd70275"
59+
},
60+
"reason": "Scheduled",
61+
"reportingComponent": "default-scheduler",
62+
"reportingInstance": "",
63+
"source": {
64+
"component": "default-scheduler"
65+
},
66+
"type": "Normal"
67+
},
68+
{
69+
"apiVersion": "v1",
70+
"count": 1,
71+
"eventTime": null,
72+
"firstTimestamp": "2024-11-08T10:17:22Z",
73+
"involvedObject": {
74+
"apiVersion": "v1",
75+
"fieldPath": "spec.containers{vantage-kubernetes-agent}",
76+
"kind": "Pod",
77+
"name": "vka-vantage-kubernetes-agent-0",
78+
"namespace": "vantage",
79+
"resourceVersion": "25741823",
80+
"uid": "0118c8be-55df-40bf-96ed-41bb11b5a771"
81+
},
82+
"kind": "Event",
83+
"lastTimestamp": "2024-11-08T10:17:22Z",
84+
"message": "Container image \"quay.io/vantage-sh/kubernetes-agent:1.0.26\" already present on machine",
85+
"metadata": {
86+
"creationTimestamp": "2024-11-08T10:17:22Z",
87+
"name": "vka-vantage-kubernetes-agent-0.1805f6d8d0c1d741",
88+
"namespace": "vantage",
89+
"resourceVersion": "25741846",
90+
"uid": "6c9c24bb-4ff3-486f-8151-91d1dad159ee"
91+
},
92+
"reason": "Pulled",
93+
"reportingComponent": "kubelet",
94+
"reportingInstance": "ip-10-0-2-170.ec2.internal",
95+
"source": {
96+
"component": "kubelet",
97+
"host": "ip-10-0-2-170.ec2.internal"
98+
},
99+
"type": "Normal"
100+
},
101+
{
102+
"apiVersion": "v1",
103+
"count": 1,
104+
"eventTime": null,
105+
"firstTimestamp": "2024-11-08T10:17:22Z",
106+
"involvedObject": {
107+
"apiVersion": "v1",
108+
"fieldPath": "spec.containers{vantage-kubernetes-agent}",
109+
"kind": "Pod",
110+
"name": "vka-vantage-kubernetes-agent-0",
111+
"namespace": "vantage",
112+
"resourceVersion": "25741823",
113+
"uid": "0118c8be-55df-40bf-96ed-41bb11b5a771"
114+
},
115+
"kind": "Event",
116+
"lastTimestamp": "2024-11-08T10:17:22Z",
117+
"message": "Created container vantage-kubernetes-agent",
118+
"metadata": {
119+
"creationTimestamp": "2024-11-08T10:17:22Z",
120+
"name": "vka-vantage-kubernetes-agent-0.1805f6d8d271c600",
121+
"namespace": "vantage",
122+
"resourceVersion": "25741847",
123+
"uid": "d23e308a-b17e-42ba-a5ed-3a55c3d9e0d2"
124+
},
125+
"reason": "Created",
126+
"reportingComponent": "kubelet",
127+
"reportingInstance": "ip-10-0-2-170.ec2.internal",
128+
"source": {
129+
"component": "kubelet",
130+
"host": "ip-10-0-2-170.ec2.internal"
131+
},
132+
"type": "Normal"
133+
},
134+
{
135+
"apiVersion": "v1",
136+
"count": 1,
137+
"eventTime": null,
138+
"firstTimestamp": "2024-11-08T10:17:22Z",
139+
"involvedObject": {
140+
"apiVersion": "v1",
141+
"fieldPath": "spec.containers{vantage-kubernetes-agent}",
142+
"kind": "Pod",
143+
"name": "vka-vantage-kubernetes-agent-0",
144+
"namespace": "vantage",
145+
"resourceVersion": "25741823",
146+
"uid": "0118c8be-55df-40bf-96ed-41bb11b5a771"
147+
},
148+
"kind": "Event",
149+
"lastTimestamp": "2024-11-08T10:17:22Z",
150+
"message": "Started container vantage-kubernetes-agent",
151+
"metadata": {
152+
"creationTimestamp": "2024-11-08T10:17:23Z",
153+
"name": "vka-vantage-kubernetes-agent-0.1805f6d8d87a3795",
154+
"namespace": "vantage",
155+
"resourceVersion": "25741848",
156+
"uid": "e48c06da-3fbf-41a1-8685-6224854f0391"
157+
},
158+
"reason": "Started",
159+
"reportingComponent": "kubelet",
160+
"reportingInstance": "ip-10-0-2-170.ec2.internal",
161+
"source": {
162+
"component": "kubelet",
163+
"host": "ip-10-0-2-170.ec2.internal"
164+
},
165+
"type": "Normal"
166+
},
167+
{
168+
"apiVersion": "v1",
169+
"count": 1,
170+
"eventTime": null,
171+
"firstTimestamp": "2024-11-08T10:17:23Z",
172+
"involvedObject": {
173+
"apiVersion": "v1",
174+
"fieldPath": "spec.containers{vantage-kubernetes-agent}",
175+
"kind": "Pod",
176+
"name": "vka-vantage-kubernetes-agent-0",
177+
"namespace": "vantage",
178+
"resourceVersion": "25741823",
179+
"uid": "0118c8be-55df-40bf-96ed-41bb11b5a771"
180+
},
181+
"kind": "Event",
182+
"lastTimestamp": "2024-11-08T10:17:23Z",
183+
"message": "Readiness probe failed: Get \"http://10.0.2.143:9010/healthz\": dial tcp 10.0.2.143:9010: connect: connection refused",
184+
"metadata": {
185+
"creationTimestamp": "2024-11-08T10:17:23Z",
186+
"name": "vka-vantage-kubernetes-agent-0.1805f6d8f61959d7",
187+
"namespace": "vantage",
188+
"resourceVersion": "25741851",
189+
"uid": "6199c62b-9ca5-4c46-abcb-53137ed24c47"
190+
},
191+
"reason": "Unhealthy",
192+
"reportingComponent": "kubelet",
193+
"reportingInstance": "ip-10-0-2-170.ec2.internal",
194+
"source": {
195+
"component": "kubelet",
196+
"host": "ip-10-0-2-170.ec2.internal"
197+
},
198+
"type": "Warning"
199+
},
200+
{
201+
"apiVersion": "v1",
202+
"count": 1,
203+
"eventTime": null,
204+
"firstTimestamp": "2024-11-08T10:17:19Z",
205+
"involvedObject": {
206+
"apiVersion": "apps/v1",
207+
"kind": "StatefulSet",
208+
"name": "vka-vantage-kubernetes-agent",
209+
"namespace": "vantage",
210+
"resourceVersion": "25741814",
211+
"uid": "3f91d728-f31f-4582-8639-df259d97ac55"
212+
},
213+
"kind": "Event",
214+
"lastTimestamp": "2024-11-08T10:17:19Z",
215+
"message": "create Pod vka-vantage-kubernetes-agent-0 in StatefulSet vka-vantage-kubernetes-agent successful",
216+
"metadata": {
217+
"creationTimestamp": "2024-11-08T10:17:19Z",
218+
"name": "vka-vantage-kubernetes-agent.1805f6d80bd97994",
219+
"namespace": "vantage",
220+
"resourceVersion": "25741827",
221+
"uid": "c5bf4dee-649f-48ba-b6da-c6ccf4e9262c"
222+
},
223+
"reason": "SuccessfulCreate",
224+
"reportingComponent": "statefulset-controller",
225+
"reportingInstance": "",
226+
"source": {
227+
"component": "statefulset-controller"
228+
},
229+
"type": "Normal"
230+
}
231+
],
232+
"kind": "List",
233+
"metadata": {
234+
"resourceVersion": ""
235+
}
236+
}

0 commit comments

Comments
 (0)