Skip to content

Commit 36c1445

Browse files
committed
Adding support for context aware segments
Signed-off-by: RS146BIJAY <[email protected]>
1 parent e80b907 commit 36c1445

26 files changed

+1312
-302
lines changed

CHANGELOG-3.0.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2525
- Add filter function for AbstractQueryBuilder, BoolQueryBuilder, ConstantScoreQueryBuilder([#17409](https://github.com/opensearch-project/OpenSearch/pull/17409))
2626
- [Star Tree] [Search] Resolving keyword & numeric bucket aggregation with metric aggregation using star-tree ([#17165](https://github.com/opensearch-project/OpenSearch/pull/17165))
2727
- Added error handling support for the pull-based ingestion ([#17427](https://github.com/opensearch-project/OpenSearch/pull/17427))
28+
- Add support for context aware segments ([#19098](https://github.com/opensearch-project/OpenSearch/pull/19098)
2829

2930

3031
### Dependencies

plugins/store-smb/src/main/java/org/opensearch/index/store/smbmmapfs/SmbMmapFsDirectoryFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
public final class SmbMmapFsDirectoryFactory extends FsDirectoryFactory {
4848

4949
@Override
50-
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
50+
public Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
5151
return new SmbDirectoryWrapper(
5252
setPreload(
5353
new MMapDirectory(location, lockFactory),

plugins/store-smb/src/main/java/org/opensearch/index/store/smbniofs/SmbNIOFsDirectoryFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
public final class SmbNIOFsDirectoryFactory extends FsDirectoryFactory {
2525

2626
@Override
27-
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
27+
public Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
2828
return new SmbDirectoryWrapper(new NIOFSDirectory(location, lockFactory));
2929
}
3030
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index;
10+
11+
import org.apache.lucene.store.Directory;
12+
import org.apache.lucene.store.FilterDirectory;
13+
14+
import java.io.IOException;
15+
import java.util.Arrays;
16+
17+
public class CriteriaBasedCompositeDirectory extends FilterDirectory {
18+
/**
19+
* Sole constructor, typically called from sub-classes.
20+
*
21+
* @param in
22+
*/
23+
protected CriteriaBasedCompositeDirectory(Directory in) {
24+
super(in);
25+
}
26+
27+
@Override
28+
public String[] listAll() throws IOException {
29+
return Arrays.stream(super.listAll()).filter(fileName -> !fileName.startsWith("temp_")).distinct().toArray(String[]::new);
30+
}
31+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index;
10+
11+
import org.apache.lucene.index.MergeTrigger;
12+
import org.apache.lucene.index.SegmentCommitInfo;
13+
import org.apache.lucene.index.SegmentInfos;
14+
import org.apache.lucene.index.TieredMergePolicy;
15+
16+
import java.io.IOException;
17+
import java.util.ArrayList;
18+
import java.util.HashMap;
19+
import java.util.List;
20+
import java.util.Map;
21+
import java.util.Set;
22+
23+
public class CriteriaBasedMergePolicy extends TieredMergePolicy {
24+
@Override
25+
public MergeSpecification findMerges(
26+
MergeTrigger mergeTrigger, SegmentInfos infos, MergeContext mergeContext) throws IOException {
27+
final Set<SegmentCommitInfo> merging = mergeContext.getMergingSegments();
28+
MergeSpecification spec = null;
29+
final Map<String, List<SegmentCommitInfo>> commitInfos = new HashMap<>();
30+
for (SegmentCommitInfo si : infos) {
31+
if (merging.contains(si)) {
32+
continue;
33+
}
34+
35+
final String dwptGroupNumber = si.info.getAttribute("criteria");
36+
commitInfos.computeIfAbsent(dwptGroupNumber, k -> new ArrayList<>()).add(si);
37+
}
38+
39+
for (String dwptGroupNumber : commitInfos.keySet()) {
40+
if (commitInfos.get(dwptGroupNumber).size() > 1) {
41+
final SegmentInfos newSIS = new SegmentInfos(infos.getIndexCreatedVersionMajor());
42+
for (SegmentCommitInfo info : commitInfos.get(dwptGroupNumber)) {
43+
newSIS.add(info);
44+
}
45+
46+
final MergeSpecification tieredMergePolicySpec =
47+
super.findMerges(mergeTrigger, newSIS, mergeContext);
48+
if (tieredMergePolicySpec != null) {
49+
if (spec == null) {
50+
spec = new MergeSpecification();
51+
}
52+
53+
spec.merges.addAll(tieredMergePolicySpec.merges);
54+
}
55+
}
56+
}
57+
58+
return spec;
59+
}
60+
}

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,7 @@ protected void closeInternal() {
637637
// Do nothing for shard lock on remote store
638638
}
639639
};
640-
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, remoteStoreLock, Store.OnClose.EMPTY, path);
640+
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, remoteStoreLock, Store.OnClose.EMPTY, path, directoryFactory);
641641
} else {
642642
// Disallow shards with remote store based settings to be created on non-remote store enabled nodes
643643
// Even though we have `RemoteStoreMigrationAllocationDecider` in place to prevent something like this from happening at the
@@ -658,15 +658,17 @@ protected void closeInternal() {
658658
Directory localDirectory = directoryFactory.newDirectory(this.indexSettings, path);
659659
directory = new CompositeDirectory(localDirectory, remoteDirectory, fileCache);
660660
} else {
661-
directory = directoryFactory.newDirectory(this.indexSettings, path);
661+
Directory localDirectory = directoryFactory.newDirectory(this.indexSettings, path);
662+
directory = new CriteriaBasedCompositeDirectory(localDirectory);
662663
}
663664
store = new Store(
664665
shardId,
665666
this.indexSettings,
666667
directory,
667668
lock,
668669
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)),
669-
path
670+
path,
671+
directoryFactory
670672
);
671673
eventListener.onStoreCreated(shardId);
672674
indexShard = new IndexShard(
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*Add commentMore actions
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.codec;
10+
11+
import org.apache.lucene.codecs.Codec;
12+
import org.apache.lucene.codecs.FilterCodec;
13+
import org.apache.lucene.codecs.SegmentInfoFormat;
14+
import org.apache.lucene.codecs.lucene101.Lucene101Codec;
15+
import org.apache.lucene.index.SegmentInfo;
16+
import org.apache.lucene.store.Directory;
17+
import org.apache.lucene.store.IOContext;
18+
19+
import java.io.IOException;
20+
21+
public class CriteriaBasedCodec extends FilterCodec {
22+
23+
private final String criteria;
24+
public CriteriaBasedCodec() {
25+
super("CriteriaBasedCodec", new Lucene101Codec());
26+
criteria = null;
27+
}
28+
29+
public CriteriaBasedCodec(Codec delegate, String criteria) {
30+
super("CriteriaBasedCodec", delegate);
31+
this.criteria = criteria;
32+
}
33+
34+
@Override
35+
public SegmentInfoFormat segmentInfoFormat() {
36+
return new SegmentInfoFormat() {
37+
@Override
38+
public SegmentInfo read(Directory directory, String segmentName, byte[] segmentID, IOContext context) throws IOException {
39+
return delegate.segmentInfoFormat().read(directory, segmentName, segmentID, context);
40+
}
41+
42+
@Override
43+
public void write(Directory directory, SegmentInfo info, IOContext ioContext) throws IOException {
44+
info.putAttribute("criteria", criteria);
45+
delegate.segmentInfoFormat().write(directory, info, ioContext);
46+
}
47+
};
48+
}
49+
}

0 commit comments

Comments
 (0)