Skip to content

Commit 328d91d

Browse files
iliaxiliax
and
iliax
authored
Smart filters test execution endpoint added (#3656)
Co-authored-by: iliax <[email protected]>
1 parent c743067 commit 328d91d

File tree

4 files changed

+144
-0
lines changed

4 files changed

+144
-0
lines changed

kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import com.provectus.kafka.ui.model.SeekDirectionDTO;
1616
import com.provectus.kafka.ui.model.SeekTypeDTO;
1717
import com.provectus.kafka.ui.model.SerdeUsageDTO;
18+
import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
19+
import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
1820
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
1921
import com.provectus.kafka.ui.model.TopicSerdeSuggestionDTO;
2022
import com.provectus.kafka.ui.model.rbac.AccessContext;
@@ -70,6 +72,14 @@ public Mono<ResponseEntity<Void>> deleteTopicMessages(
7072
).doOnEach(sig -> auditService.audit(context, sig));
7173
}
7274

75+
@Override
76+
public Mono<ResponseEntity<SmartFilterTestExecutionResultDTO>> executeSmartFilterTest(
77+
Mono<SmartFilterTestExecutionDTO> smartFilterTestExecutionDto, ServerWebExchange exchange) {
78+
return smartFilterTestExecutionDto
79+
.map(MessagesService::execSmartFilterTest)
80+
.map(ResponseEntity::ok);
81+
}
82+
7383
@Override
7484
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
7585
String topicName,
@@ -188,4 +198,8 @@ public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterNam
188198
.map(ResponseEntity::ok)
189199
);
190200
}
201+
202+
203+
204+
191205
}

kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,16 @@
1414
import com.provectus.kafka.ui.model.KafkaCluster;
1515
import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
1616
import com.provectus.kafka.ui.model.SeekDirectionDTO;
17+
import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
18+
import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
1719
import com.provectus.kafka.ui.model.TopicMessageDTO;
1820
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
1921
import com.provectus.kafka.ui.serde.api.Serde;
2022
import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
2123
import com.provectus.kafka.ui.util.SslPropertiesUtil;
24+
import java.time.Instant;
25+
import java.time.OffsetDateTime;
26+
import java.time.ZoneOffset;
2227
import java.util.List;
2328
import java.util.Map;
2429
import java.util.Optional;
@@ -81,6 +86,40 @@ private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String to
8186
.switchIfEmpty(Mono.error(new TopicNotFoundException()));
8287
}
8388

89+
public static SmartFilterTestExecutionResultDTO execSmartFilterTest(SmartFilterTestExecutionDTO execData) {
90+
Predicate<TopicMessageDTO> predicate;
91+
try {
92+
predicate = MessageFilters.createMsgFilter(
93+
execData.getFilterCode(),
94+
MessageFilterTypeDTO.GROOVY_SCRIPT
95+
);
96+
} catch (Exception e) {
97+
log.info("Smart filter '{}' compilation error", execData.getFilterCode(), e);
98+
return new SmartFilterTestExecutionResultDTO()
99+
.error("Compilation error : " + e.getMessage());
100+
}
101+
try {
102+
var result = predicate.test(
103+
new TopicMessageDTO()
104+
.key(execData.getKey())
105+
.content(execData.getValue())
106+
.headers(execData.getHeaders())
107+
.offset(execData.getOffset())
108+
.partition(execData.getPartition())
109+
.timestamp(
110+
Optional.ofNullable(execData.getTimestampMs())
111+
.map(ts -> OffsetDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC))
112+
.orElse(null))
113+
);
114+
return new SmartFilterTestExecutionResultDTO()
115+
.result(result);
116+
} catch (Exception e) {
117+
log.info("Smart filter {} execution error", execData, e);
118+
return new SmartFilterTestExecutionResultDTO()
119+
.error("Execution error : " + e.getMessage());
120+
}
121+
}
122+
84123
public Mono<Void> deleteTopicMessages(KafkaCluster cluster, String topicName,
85124
List<Integer> partitionsToInclude) {
86125
return withExistingTopic(cluster, topicName)

kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
package com.provectus.kafka.ui.service;
22

3+
import static com.provectus.kafka.ui.service.MessagesService.execSmartFilterTest;
4+
import static org.assertj.core.api.Assertions.assertThat;
5+
36
import com.provectus.kafka.ui.AbstractIntegrationTest;
47
import com.provectus.kafka.ui.exception.TopicNotFoundException;
58
import com.provectus.kafka.ui.model.ConsumerPosition;
69
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
710
import com.provectus.kafka.ui.model.KafkaCluster;
811
import com.provectus.kafka.ui.model.SeekDirectionDTO;
912
import com.provectus.kafka.ui.model.SeekTypeDTO;
13+
import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
1014
import com.provectus.kafka.ui.model.TopicMessageDTO;
1115
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
1216
import com.provectus.kafka.ui.producer.KafkaTestProducer;
1317
import com.provectus.kafka.ui.serdes.builtin.StringSerde;
1418
import java.util.List;
19+
import java.util.Map;
1520
import java.util.UUID;
1621
import org.apache.kafka.clients.admin.NewTopic;
1722
import org.junit.jupiter.api.BeforeEach;
@@ -91,4 +96,40 @@ void maskingAppliedOnConfiguredClusters() throws Exception {
9196
}
9297
}
9398

99+
@Test
100+
void execSmartFilterTestReturnsExecutionResult() {
101+
var params = new SmartFilterTestExecutionDTO()
102+
.filterCode("key != null && value != null && headers != null && timestampMs != null && offset != null")
103+
.key("1234")
104+
.value("{ \"some\" : \"value\" } ")
105+
.headers(Map.of("h1", "hv1"))
106+
.offset(12345L)
107+
.timestampMs(System.currentTimeMillis())
108+
.partition(1);
109+
assertThat(execSmartFilterTest(params).getResult()).isTrue();
110+
111+
params.setFilterCode("return false");
112+
assertThat(execSmartFilterTest(params).getResult()).isFalse();
113+
}
114+
115+
@Test
116+
void execSmartFilterTestReturnsErrorOnFilterApplyError() {
117+
var result = execSmartFilterTest(
118+
new SmartFilterTestExecutionDTO()
119+
.filterCode("return 1/0")
120+
);
121+
assertThat(result.getResult()).isNull();
122+
assertThat(result.getError()).containsIgnoringCase("execution error");
123+
}
124+
125+
@Test
126+
void execSmartFilterTestReturnsErrorOnFilterCompilationError() {
127+
var result = execSmartFilterTest(
128+
new SmartFilterTestExecutionDTO()
129+
.filterCode("this is invalid groovy syntax = 1")
130+
);
131+
assertThat(result.getResult()).isNull();
132+
assertThat(result.getError()).containsIgnoringCase("Compilation error");
133+
}
134+
94135
}

kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,25 @@ paths:
625625
schema:
626626
$ref: '#/components/schemas/TopicSerdeSuggestion'
627627

628+
/api/smartfilters/testexecutions:
629+
put:
630+
tags:
631+
- Messages
632+
summary: executeSmartFilterTest
633+
operationId: executeSmartFilterTest
634+
requestBody:
635+
content:
636+
application/json:
637+
schema:
638+
$ref: '#/components/schemas/SmartFilterTestExecution'
639+
responses:
640+
200:
641+
description: OK
642+
content:
643+
application/json:
644+
schema:
645+
$ref: '#/components/schemas/SmartFilterTestExecutionResult'
646+
628647

629648
/api/clusters/{clusterName}/topics/{topicName}/messages:
630649
get:
@@ -2584,6 +2603,37 @@ components:
25842603
items:
25852604
$ref: '#/components/schemas/ConsumerGroup'
25862605

2606+
SmartFilterTestExecution:
2607+
type: object
2608+
required: [filterCode]
2609+
properties:
2610+
filterCode:
2611+
type: string
2612+
key:
2613+
type: string
2614+
value:
2615+
type: string
2616+
headers:
2617+
type: object
2618+
additionalProperties:
2619+
type: string
2620+
partition:
2621+
type: integer
2622+
offset:
2623+
type: integer
2624+
format: int64
2625+
timestampMs:
2626+
type: integer
2627+
format: int64
2628+
2629+
SmartFilterTestExecutionResult:
2630+
type: object
2631+
properties:
2632+
result:
2633+
type: boolean
2634+
error:
2635+
type: string
2636+
25872637
CreateTopicMessage:
25882638
type: object
25892639
properties:

0 commit comments

Comments
 (0)