
Commit afae667

Merge pull request #371 from deep-splunk/record-key-extraction
Record key extraction
2 parents 3f4db26 + b58585d commit afae667

File tree: 8 files changed, +35 −9 lines changed

src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java (−2)

@@ -216,6 +216,4 @@ private void executeHttpRequest(final HttpUriRequest req, CloseableHttpClient ht
             }
         }
     }
-
-
 }

src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java (+1)

@@ -429,6 +429,7 @@ private Event createHecEventFrom(final SinkRecord record) {
         trackMetas.put("kafka_timestamp", String.valueOf(record.timestamp()));
         trackMetas.put("kafka_topic", record.topic());
         trackMetas.put("kafka_partition", String.valueOf(record.kafkaPartition()));
+        trackMetas.put("kafka_record_key", String.valueOf(record.key()));
         if (HOSTNAME != null)
             trackMetas.put("kafka_connect_host", HOSTNAME);
         event.addFields(trackMetas);
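
Because the new field is built with String.valueOf(record.key()), records produced without a key still get the field: String.valueOf(Object) maps null to the string "null" rather than throwing. A minimal standalone sketch of that behavior (illustration only, not connector code):

    public class KeyValueOfDemo {
        public static void main(String[] args) {
            Object key = null;                        // a record produced with no key
            String field = String.valueOf(key);       // -> "null", no NullPointerException
            System.out.println(field);                // prints: null
        }
    }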

src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java (+2 −1)

@@ -415,6 +415,7 @@ private void putWithSuccess(boolean raw, boolean withMeta) {
         Assert.assertEquals(String.valueOf(1), event.getFields().get("kafka_partition"));
         Assert.assertEquals(new UnitUtil(0).configProfile.getTopics(), event.getFields().get("kafka_topic"));
         Assert.assertEquals(String.valueOf(0), event.getFields().get("kafka_timestamp"));
+        Assert.assertEquals("test", event.getFields().get("kafka_record_key"));
         j++;
     }

@@ -441,7 +442,7 @@ private Collection<SinkRecord> createSinkRecords(int numOfRecords, String value)
     private Collection<SinkRecord> createSinkRecords(int numOfRecords, int start, String value) {
         List<SinkRecord> records = new ArrayList<>();
         for (int i = start; i < start + numOfRecords; i++) {
-            SinkRecord rec = new SinkRecord(new UnitUtil(0).configProfile.getTopics(), 1, null, null, null, value, i, 0L, TimestampType.NO_TIMESTAMP_TYPE);
+            SinkRecord rec = new SinkRecord(new UnitUtil(0).configProfile.getTopics(), 1, null, "test", null, value, i, 0L, TimestampType.NO_TIMESTAMP_TYPE);
             records.add(rec);
         }
         return records;
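
With this change the helper stamps every generated record with the literal key "test", which is exactly what the new kafka_record_key assertion in the first hunk expects. Annotated, the SinkRecord constructor arguments line up roughly like this (hypothetical topic name; the real test pulls it from the config profile):

    SinkRecord rec = new SinkRecord(
            "mytopic",                  // topic (hypothetical)
            1,                          // kafkaPartition
            null, "test",               // keySchema, key -> becomes kafka_record_key
            null, value,                // valueSchema, value
            i,                          // kafkaOffset
            0L,                         // timestamp
            TimestampType.NO_TIMESTAMP_TYPE);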

test/conftest.py (+2 −2)

@@ -39,7 +39,7 @@ def setup(request):
 def pytest_configure():
     # Generate message data
     topics = [config["kafka_topic"], config["kafka_topic_2"], config["kafka_header_topic"],"prototopic",
-              "test_splunk_hec_malformed_events","epoch_format","date_format"]
+              "test_splunk_hec_malformed_events","epoch_format","date_format","record_key"]

     create_kafka_topics(config, topics)
     producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"],

@@ -67,9 +67,9 @@ def pytest_configure():
                        ('splunk.header.source', b'kafka_custom_header_source'),
                        ('splunk.header.sourcetype', b'kafka_custom_header_sourcetype')]
     producer.send(config["kafka_header_topic"], msg, headers=headers_to_send)
-
     producer.send("test_splunk_hec_malformed_events", {})
     producer.send("test_splunk_hec_malformed_events", {"&&": "null", "message": ["$$$$****////", 123, None]})
+    producer.send("record_key",{"timestamp": config['timestamp']},b"{}")
     protobuf_producer.send("prototopic",value=b'\x00\x00\x00\x00\x01\x00\n\x011\x12\r10-01-04-3:45\x18\x15%\x00\x00*C*\x02No:\x12\n\x011\x12\x04this\x1a\x07New oneB\x0c\n\x011\x12\x07shampooJ\x04Many')
     timestamp_producer.send("date_format",b"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname\",\"CLASS\":\"class\",\"cust_id\":\"000013934\",\"time\": \"Jun 13 2010 23:11:52.454 UTC\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}")
     timestamp_producer.send("epoch_format",b"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname\",\"CLASS\":\"class\",\"cust_id\":\"000013934\",\"time\": \"1555209605000\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}")

test/lib/connect_params.py (+7 −2)

@@ -204,5 +204,10 @@
      "splunk_hec_raw": False,
      "enable_timestamp_extraction" : "true",
      "timestamp_regex": r"\\\"time\\\":\\s*\\\"(?<time>.*?)\"",
-     "timestamp_format": "epoch"}
-]
+     "timestamp_format": "epoch"},
+    {"name": "test_extracted_record_key",
+     "splunk_sourcetypes": "track_record_key",
+     "topics": "record_key",
+     "splunk_hec_raw": False,
+     "splunk_hec_track_data": "true"}
+]

test/lib/connector.template (+2 −1)

@@ -50,6 +50,7 @@
         "value.converter.schemas.enable": "{{value_converter_schemas_enable}}",
         "enable.timestamp.extraction": "{{enable_timestamp_extraction}}",
         "timestamp.regex": "{{timestamp_regex}}",
-        "timestamp.format": "{{timestamp_format}}"
+        "timestamp.format": "{{timestamp_format}}",
+        "splunk.hec.track.data": "{{splunk_hec_track_data}}"
     }
 }
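
splunk.hec.track.data is the connector property that, as this commit's tests and the SplunkSinkTask.java hunk suggest, gates the kafka_* enrichment fields, including the new kafka_record_key. A hedged sketch of a rendered connector config carrying it, with key names from this diff and all values assumed for illustration:

    import java.util.HashMap;
    import java.util.Map;

    public class TrackDataConfigSketch {
        public static void main(String[] args) {
            // Hypothetical worker-side view of the rendered connector config.
            Map<String, String> config = new HashMap<>();
            config.put("connector.class", "com.splunk.kafka.connect.SplunkSinkConnector");
            config.put("topics", "record_key");
            config.put("splunk.hec.track.data", "true");  // data_gen.py default is "false"
            System.out.println(config);
        }
    }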

test/lib/data_gen.py (+2 −1)

@@ -38,7 +38,8 @@ def generate_connector_content(input_disc=None):
         "value_converter_schemas_enable": "false",
         "enable_timestamp_extraction": "false",
         "regex": "",
-        "timestamp_format": ""
+        "timestamp_format": "",
+        "splunk_hec_track_data": "false"
     }

     if input_disc:

test/testcases/test_data_enrichment.py (+19)

@@ -59,3 +59,22 @@ def test_line_breaking_configuration(self, setup, test_case, test_input, expecte
                                           setup["timestamp"], setup["timestamp"], setup["timestamp"])
         assert actual_raw_data == expected_data, \
             f'\nActual value: \n{actual_raw_data} \ndoes not match expected value: \n{expected_data}'
+
+    @pytest.mark.parametrize("test_scenario, test_input, expected", [
+        ("record_key_extraction", "sourcetype::track_record_key", "{}"),
+    ])
+    def test_record_key_data_enrichment(self, setup, test_scenario, test_input, expected):
+        logger.info(f"testing {test_scenario} input={test_input} expected={expected} event(s)")
+        search_query = f"index={setup['splunk_index']} | search {test_input} | fields *"
+        logger.info(search_query)
+        events = check_events_from_splunk(start_time="-15m@m",
+                                          url=setup["splunkd_url"],
+                                          user=setup["splunk_user"],
+                                          query=[f"search {search_query}"],
+                                          password=setup["splunk_password"])
+        logger.info("Splunk received %s events in the last hour", len(events))
+
+        if(len(events)==1):
+            assert events[0]["kafka_record_key"] == expected
+        else:
+            assert False,"No event found or duplicate events found"
