Skip to content

Commit 9e1f9d6

Browse files
germanosin and Copilot authored
BE: Issue#1332 Sort based on prefix offsets (#1421)
Co-authored-by: Copilot <[email protected]>
1 parent aa4a489 commit 9e1f9d6

25 files changed

+598
-46
lines changed

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -236,6 +236,7 @@ public static class CacheProperties {
236236
// N-gram search settings; NgramFilter reads all three values in its constructor.
// NOTE(review): call sites build this positionally as new NgramProperties(1, 4, true),
// so the field order (ngramMin, ngramMax, distanceScore) is presumably significant
// (likely a generated all-args constructor declared above this hunk — confirm).
public static class NgramProperties {
237237
// Passed as the first argument to ShortWordNGramAnalyzer; presumably the shortest n-gram length.
int ngramMin = 1;
238238
// Passed as the second argument to ShortWordNGramAnalyzer; presumably the longest n-gram length.
int ngramMax = 4;
239+
// When true, NgramFilter.find scores matches with distanceSimilarity (position of the
// earliest matching token) instead of cosineSimilarity. Defaults to true.
boolean distanceScore = true;
239240
}
240241

241242
@Data
@@ -244,10 +245,10 @@ public static class NgramProperties {
244245
public static class ClusterFtsProperties {
245246
boolean enabled = true;
246247
boolean defaultEnabled = false;
247-
NgramProperties schemas = new NgramProperties(1, 4);
248-
NgramProperties consumers = new NgramProperties(1, 4);
249-
NgramProperties connect = new NgramProperties(1, 4);
250-
NgramProperties acl = new NgramProperties(1, 4);
248+
NgramProperties schemas = new NgramProperties(1, 4, true);
249+
NgramProperties consumers = new NgramProperties(1, 4, true);
250+
NgramProperties connect = new NgramProperties(1, 4, true);
251+
NgramProperties acl = new NgramProperties(1, 4, true);
251252

252253
public boolean use(Boolean request) {
253254
if (enabled) {

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java

Lines changed: 11 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@
2323
import io.kafbat.ui.service.mcp.McpTool;
2424
import java.util.Comparator;
2525
import java.util.Map;
26+
import java.util.Optional;
2627
import java.util.Set;
2728
import javax.validation.Valid;
2829
import lombok.RequiredArgsConstructor;
@@ -141,15 +142,18 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
141142
.operationName("getAllConnectors")
142143
.build();
143144

145+
var maybeComparator = Optional.ofNullable(orderBy).map(this::getConnectorsComparator);
146+
144147
var comparator = sortOrder == null || sortOrder.equals(SortOrderDTO.ASC)
145-
? getConnectorsComparator(orderBy)
146-
: getConnectorsComparator(orderBy).reversed();
148+
? maybeComparator
149+
: maybeComparator.map(Comparator::reversed);
150+
151+
Flux<FullConnectorInfoDTO> connectors = kafkaConnectService.getAllConnectors(getCluster(clusterName), search, fts)
152+
.filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName));
147153

148-
Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search, fts)
149-
.filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
150-
.sort(comparator);
154+
Flux<FullConnectorInfoDTO> sorted = comparator.map(connectors::sort).orElse(connectors);
151155

152-
return Mono.just(ResponseEntity.ok(job))
156+
return Mono.just(ResponseEntity.ok(sorted))
153157
.doOnEach(sig -> audit(context, sig));
154158
}
155159

@@ -284,9 +288,7 @@ private Comparator<FullConnectorInfoDTO> getConnectorsComparator(ConnectorColumn
284288
FullConnectorInfoDTO::getName,
285289
Comparator.nullsFirst(Comparator.naturalOrder())
286290
);
287-
if (orderBy == null) {
288-
return defaultComparator;
289-
}
291+
290292
return switch (orderBy) {
291293
case CONNECT -> Comparator.comparing(
292294
FullConnectorInfoDTO::getConnect,

api/src/main/java/io/kafbat/ui/controller/SchemasController.java

Lines changed: 13 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@
2525
import java.util.Comparator;
2626
import java.util.List;
2727
import java.util.Map;
28+
import java.util.Optional;
2829
import javax.validation.Valid;
2930
import lombok.RequiredArgsConstructor;
3031
import lombok.extern.slf4j.Slf4j;
@@ -244,11 +245,15 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
244245

245246
List<String> subjectsToRetrieve;
246247
boolean paginate = true;
247-
var schemaComparator = getComparatorForSchema(orderBy);
248-
final Comparator<SubjectWithCompatibilityLevel> comparator =
249-
sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
250-
? schemaComparator : schemaComparator.reversed();
248+
249+
var schemaComparator = Optional.ofNullable(orderBy).map(this::getComparatorForSchema);
250+
var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
251+
? schemaComparator : schemaComparator.map(Comparator::reversed);
252+
251253
if (orderBy == null || SchemaColumnsToSortDTO.SUBJECT.equals(orderBy)) {
254+
if (orderBy != null) {
255+
filteredSubjects.sort(Comparator.nullsFirst(Comparator.naturalOrder()));
256+
}
252257
if (SortOrderDTO.DESC.equals(sortOrder)) {
253258
filteredSubjects.sort(Comparator.nullsFirst(Comparator.reverseOrder()));
254259
}
@@ -274,11 +279,13 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
274279

275280
private List<SubjectWithCompatibilityLevel> paginateSchemas(
276281
List<SubjectWithCompatibilityLevel> subjects,
277-
Comparator<SubjectWithCompatibilityLevel> comparator,
282+
Optional<Comparator<SubjectWithCompatibilityLevel>> comparator,
278283
boolean paginate,
279284
int pageSize,
280285
int subjectToSkip) {
281-
subjects.sort(comparator);
286+
287+
comparator.ifPresent(subjects::sort);
288+
282289
if (paginate) {
283290
return subjects.subList(subjectToSkip, Math.min(subjectToSkip + pageSize, subjects.size()));
284291
} else {

api/src/main/java/io/kafbat/ui/service/acl/AclsService.java

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -75,10 +75,7 @@ public Flux<AclBinding> listAcls(KafkaCluster cluster, ResourcePatternFilter fil
7575
Boolean fts) {
7676
return adminClientService.get(cluster)
7777
.flatMap(c -> c.listAcls(filter))
78-
.flatMapIterable(acls -> acls)
79-
.filter(acl -> principalSearch == null || acl.entry().principal().contains(principalSearch))
80-
.collectList()
81-
.map(lst -> filter(lst, principalSearch, fts))
78+
.map(lst -> filter(new ArrayList<>(lst), principalSearch, fts))
8279
.flatMapMany(Flux::fromIterable)
8380
.sort(Comparator.comparing(AclBinding::toString)); //sorting to keep stable order on different calls
8481
}

api/src/main/java/io/kafbat/ui/service/index/AclBindingNgramFilter.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@ public class AclBindingNgramFilter extends NgramFilter<AclBinding> {
1111
private final List<Tuple2<List<String>, AclBinding>> bindings;
1212

1313
public AclBindingNgramFilter(Collection<AclBinding> bindings) {
14-
this(bindings, true, new ClustersProperties.NgramProperties(1, 4));
14+
this(bindings, true, new ClustersProperties.NgramProperties(1, 4, true));
1515
}
1616

1717
public AclBindingNgramFilter(

api/src/main/java/io/kafbat/ui/service/index/ConsumerGroupFilter.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@ public class ConsumerGroupFilter extends NgramFilter<ConsumerGroupListing> {
1111
private final List<Tuple2<List<String>, ConsumerGroupListing>> groups;
1212

1313
public ConsumerGroupFilter(Collection<ConsumerGroupListing> groups) {
14-
this(groups, true, new ClustersProperties.NgramProperties(1, 4));
14+
this(groups, true, new ClustersProperties.NgramProperties(1, 4, true));
1515
}
1616

1717
public ConsumerGroupFilter(

api/src/main/java/io/kafbat/ui/service/index/KafkaConnectNgramFilter.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@ public class KafkaConnectNgramFilter extends NgramFilter<FullConnectorInfoDTO> {
1111
private final List<Tuple2<List<String>, FullConnectorInfoDTO>> connectors;
1212

1313
public KafkaConnectNgramFilter(Collection<FullConnectorInfoDTO> connectors) {
14-
this(connectors, true, new ClustersProperties.NgramProperties(1, 4));
14+
this(connectors, true, new ClustersProperties.NgramProperties(1, 4, true));
1515
}
1616

1717
public KafkaConnectNgramFilter(

api/src/main/java/io/kafbat/ui/service/index/LuceneTopicsIndex.java

Lines changed: 16 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,9 @@
22

33
import io.kafbat.ui.model.InternalTopic;
44
import io.kafbat.ui.model.InternalTopicConfig;
5+
import io.kafbat.ui.service.index.lucene.IndexedTextField;
6+
import io.kafbat.ui.service.index.lucene.NameDistanceScoringFunction;
7+
import io.kafbat.ui.service.index.lucene.ShortWordAnalyzer;
58
import java.io.IOException;
69
import java.io.UncheckedIOException;
710
import java.util.ArrayList;
@@ -18,11 +21,11 @@
1821
import org.apache.lucene.document.IntPoint;
1922
import org.apache.lucene.document.LongPoint;
2023
import org.apache.lucene.document.StringField;
21-
import org.apache.lucene.document.TextField;
2224
import org.apache.lucene.index.DirectoryReader;
2325
import org.apache.lucene.index.IndexWriter;
2426
import org.apache.lucene.index.IndexWriterConfig;
2527
import org.apache.lucene.index.Term;
28+
import org.apache.lucene.queries.function.FunctionScoreQuery;
2629
import org.apache.lucene.queryparser.classic.ParseException;
2730
import org.apache.lucene.queryparser.classic.QueryParser;
2831
import org.apache.lucene.search.BooleanClause;
@@ -59,11 +62,13 @@ public LuceneTopicsIndex(List<InternalTopic> topics) throws IOException {
5962

6063
private Directory build(List<InternalTopic> topics) {
6164
Directory directory = new ByteBuffersDirectory();
65+
6266
try (IndexWriter directoryWriter = new IndexWriter(directory, new IndexWriterConfig(this.analyzer))) {
6367
for (InternalTopic topic : topics) {
6468
Document doc = new Document();
69+
6570
doc.add(new StringField(FIELD_NAME_RAW, topic.getName(), Field.Store.YES));
66-
doc.add(new TextField(FIELD_NAME, topic.getName(), Field.Store.NO));
71+
doc.add(new IndexedTextField(FIELD_NAME, topic.getName(), Field.Store.YES));
6772
doc.add(new IntPoint(FIELD_PARTITIONS, topic.getPartitionCount()));
6873
doc.add(new IntPoint(FIELD_REPLICATION, topic.getReplicationFactor()));
6974
doc.add(new LongPoint(FIELD_SIZE, topic.getSegmentSize()));
@@ -117,9 +122,9 @@ public List<InternalTopic> find(String search, Boolean showInternal,
117122
closeLock.readLock().lock();
118123
try {
119124

120-
QueryParser queryParser = new PrefixQueryParser(FIELD_NAME, this.analyzer);
125+
PrefixQueryParser queryParser = new PrefixQueryParser(FIELD_NAME, this.analyzer);
121126
queryParser.setDefaultOperator(QueryParser.Operator.AND);
122-
Query nameQuery = queryParser.parse(search);;
127+
Query nameQuery = queryParser.parse(search);
123128

124129
Query internalFilter = new TermQuery(new Term(FIELD_INTERNAL, "true"));
125130

@@ -129,6 +134,12 @@ public List<InternalTopic> find(String search, Boolean showInternal,
129134
queryBuilder.add(internalFilter, BooleanClause.Occur.MUST_NOT);
130135
}
131136

137+
BooleanQuery combined = queryBuilder.build();
138+
Query wrapped = new FunctionScoreQuery(
139+
combined,
140+
new NameDistanceScoringFunction(FIELD_NAME, queryParser.getPrefixes())
141+
);
142+
132143
List<SortField> sortFields = new ArrayList<>();
133144
sortFields.add(SortField.FIELD_SCORE);
134145
if (!sortField.equals(FIELD_NAME)) {
@@ -137,7 +148,7 @@ public List<InternalTopic> find(String search, Boolean showInternal,
137148

138149
Sort sort = new Sort(sortFields.toArray(new SortField[0]));
139150

140-
TopDocs result = this.indexSearcher.search(queryBuilder.build(), count != null ? count : this.maxSize, sort);
151+
TopDocs result = this.indexSearcher.search(wrapped, count != null ? count : this.maxSize, sort);
141152

142153
List<String> topics = new ArrayList<>();
143154
for (ScoreDoc scoreDoc : result.scoreDocs) {

api/src/main/java/io/kafbat/ui/service/index/NgramFilter.java

Lines changed: 31 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -22,9 +22,11 @@
2222
public abstract class NgramFilter<T> {
2323
private final Analyzer analyzer;
2424
private final boolean enabled;
25+
private final boolean distanceScore;
2526

2627
public NgramFilter(ClustersProperties.NgramProperties properties, boolean enabled) {
2728
this.enabled = enabled;
29+
this.distanceScore = properties.isDistanceScore();
2830
this.analyzer = new ShortWordNGramAnalyzer(properties.getNgramMin(), properties.getNgramMax(), false);
2931
}
3032

@@ -52,15 +54,25 @@ public List<T> find(String search, Comparator<T> comparator) {
5254
try {
5355
List<SearchResult<T>> result = new ArrayList<>();
5456
List<String> queryTokens = tokenizeString(analyzer, search);
55-
Map<String, Integer> queryFreq = termFreq(queryTokens);
57+
Map<String, Integer> queryFreq = Map.of();
58+
59+
if (!distanceScore) {
60+
queryFreq = termFreq(queryTokens);
61+
}
5662

5763
for (Tuple2<List<String>, T> item : getItems()) {
5864
for (String field : item.getT1()) {
5965
List<String> itemTokens = tokenizeString(analyzer, field);
6066
HashSet<String> itemTokensSet = new HashSet<>(itemTokens);
6167
if (itemTokensSet.containsAll(queryTokens)) {
62-
double score = cosineSimilarity(queryFreq, itemTokens);
68+
double score;
69+
if (distanceScore) {
70+
score = distanceSimilarity(queryTokens, itemTokens);
71+
} else {
72+
score = cosineSimilarity(queryFreq, itemTokens);
73+
}
6374
result.add(new SearchResult<>(item.getT2(), score));
75+
break;
6476
}
6577
}
6678
}
@@ -77,6 +89,22 @@ public List<T> find(String search, Comparator<T> comparator) {
7789
}
7890
}
7991

92+
/**
 * Scores an item by how early any query token first appears in the item's token list.
 *
 * <p>For each query token the index of its first occurrence in {@code itemTokens} is looked up;
 * the smallest such index {@code s} across all query tokens yields the score {@code 1 / (1 + s)}.
 * A match at index 0 therefore scores 1.0 and later matches score progressively lower.
 *
 * @param queryTokens tokens produced from the search string
 * @param itemTokens tokens produced from the item field, in analyzer output order
 * @return a value in (0, 1]; 1.0 when a query token is at index 0, and also when no query
 *     token occurs in {@code itemTokens} at all (which includes an empty {@code queryTokens})
 */
private double distanceSimilarity(List<String> queryTokens, List<String> itemTokens) {
93+
// Sentinel meaning "no query token found yet".
int smallest = Integer.MAX_VALUE;
94+
for (String queryToken : queryTokens) {
95+
// indexOf returns the first occurrence, or -1 when the token is absent.
int i = itemTokens.indexOf(queryToken);
96+
if (i >= 0) {
97+
smallest = Math.min(smallest, i);
98+
}
99+
}
100+
101+
if (smallest == Integer.MAX_VALUE) {
102+
// Nothing matched: fall back to the maximum score rather than 0.
return 1.0;
103+
} else {
104+
return 1.0 / (1.0 + smallest);
105+
}
106+
}
107+
80108
private List<T> list(Stream<T> stream, Comparator<T> comparator) {
81109
if (comparator != null) {
82110
return stream.sorted(comparator).toList();
@@ -94,7 +122,7 @@ static List<String> tokenizeString(Analyzer analyzer, String text) {
94122
}
95123

96124
@SneakyThrows
97-
static List<String> tokenizeStringSimple(Analyzer analyzer, String text) {
125+
public static List<String> tokenizeStringSimple(Analyzer analyzer, String text) {
98126
List<String> tokens = new ArrayList<>();
99127
try (TokenStream tokenStream = analyzer.tokenStream(null, text)) {
100128
CharTermAttribute attr = tokenStream.addAttribute(CharTermAttribute.class);

api/src/main/java/io/kafbat/ui/service/index/PrefixQueryParser.java

Lines changed: 14 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,9 @@
33
import static org.apache.lucene.search.BoostAttribute.DEFAULT_BOOST;
44

55
import io.kafbat.ui.service.index.TopicsIndex.FieldType;
6+
import java.util.ArrayList;
67
import java.util.List;
8+
import java.util.Objects;
79
import java.util.Optional;
810
import org.apache.lucene.analysis.Analyzer;
911
import org.apache.lucene.document.IntPoint;
@@ -14,10 +16,11 @@
1416
import org.apache.lucene.search.PrefixQuery;
1517
import org.apache.lucene.search.Query;
1618
import org.apache.lucene.search.TermQuery;
17-
import org.apache.lucene.search.TermRangeQuery;
1819

1920
public class PrefixQueryParser extends QueryParser {
2021

22+
private final List<String> prefixes = new ArrayList<>();
23+
2124
public PrefixQueryParser(String field, Analyzer analyzer) {
2225
super(field, analyzer);
2326
}
@@ -60,7 +63,13 @@ protected Query newTermQuery(Term term, float boost) {
6063
.orElse(FieldType.STRING);
6164

6265
Query query = switch (fieldType) {
63-
case STRING -> new PrefixQuery(term);
66+
case STRING -> {
67+
if (Objects.equals(term.field(), field)) {
68+
prefixes.add(term.text());
69+
}
70+
71+
yield new PrefixQuery(term);
72+
}
6473
case INT -> IntPoint.newExactQuery(term.field(), Integer.parseInt(term.text()));
6574
case LONG -> LongPoint.newExactQuery(term.field(), Long.parseLong(term.text()));
6675
case BOOLEAN -> new TermQuery(term);
@@ -72,4 +81,7 @@ protected Query newTermQuery(Term term, float boost) {
7281
return new BoostQuery(query, boost);
7382
}
7483

84+
/**
 * Returns the prefix texts recorded while this parser built term queries.
 *
 * <p>{@code newTermQuery} appends {@code term.text()} for every STRING-typed term whose field
 * equals this parser's {@code field} (presumably the default field given to the constructor).
 *
 * <p>NOTE(review): this hands out the parser's own mutable {@code ArrayList}, not a copy, and
 * nothing in the visible code clears it. Reusing one parser for several {@code parse} calls
 * would presumably accumulate prefixes across queries — confirm that callers create a fresh
 * parser per search.
 *
 * @return the live list of collected prefix strings, in the order the terms were created
 */
public List<String> getPrefixes() {
85+
return prefixes;
86+
}
7587
}

0 commit comments

Comments
 (0)