Skip to content

Commit 11c93f3

Browse files
committed
Adding support for context aware segments
Signed-off-by: RS146BIJAY <[email protected]>
1 parent 8001e2d commit 11c93f3

34 files changed

+2336
-131
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1111
- Add a toBuilder method in EngineConfig to support easy modification of configs([#19054](https://github.com/opensearch-project/OpenSearch/pull/19054))
1212
- Add StoreFactory plugin interface for custom Store implementations([#19091](https://github.com/opensearch-project/OpenSearch/pull/19091))
1313
- Add a dynamic setting to change skip_cache_factor and min_frequency for querycache ([#18351](https://github.com/opensearch-project/OpenSearch/issues/18351))
14+
- Add support for context aware segments ([#19098](https://github.com/opensearch-project/OpenSearch/pull/19098))
1415

1516
### Changed
1617
- Add CompletionStage variants to methods in the Client Interface and default to ActionListener impl ([#18998](https://github.com/opensearch-project/OpenSearch/pull/18998))

plugins/store-smb/src/main/java/org/opensearch/index/store/smbmmapfs/SmbMmapFsDirectoryFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
public final class SmbMmapFsDirectoryFactory extends FsDirectoryFactory {
4848

4949
@Override
50-
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
50+
public Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
5151
return new SmbDirectoryWrapper(
5252
setPreload(
5353
new MMapDirectory(location, lockFactory),

plugins/store-smb/src/main/java/org/opensearch/index/store/smbniofs/SmbNIOFsDirectoryFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
public final class SmbNIOFsDirectoryFactory extends FsDirectoryFactory {
2525

2626
@Override
27-
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
27+
public Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
2828
return new SmbDirectoryWrapper(new NIOFSDirectory(location, lockFactory));
2929
}
3030
}

release-notes/opensearch.release-notes-3.0.0-alpha1.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@
5555
- Fix Bug - Handle unsigned long in sorting order assertion of LongHashSet ([#17207](https://github.com/opensearch-project/OpenSearch/pull/17207))
5656
- Implemented computation of segment replication stats at shard level ([#17055](https://github.com/opensearch-project/OpenSearch/pull/17055))
5757
- [Rule Based Auto-tagging] Add in-memory attribute value store ([#17342](https://github.com/opensearch-project/OpenSearch/pull/17342))
58+
- Add support for context aware segments ([#19098](https://github.com/opensearch-project/OpenSearch/pull/19098)
59+
5860

5961
### Dependencies
6062
- Bump Apache HttpCore5/HttpClient5 dependencies from 5.2.5/5.3.1 to 5.3.1/5.4.1 to support ExtendedSocketOption in HttpAsyncClient ([#16757](https://github.com/opensearch-project/OpenSearch/pull/16757))
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index;
10+
11+
import org.apache.lucene.store.Directory;
12+
import org.apache.lucene.store.FilterDirectory;
13+
14+
import java.io.IOException;
15+
import java.util.Arrays;
16+
17+
public class CriteriaBasedCompositeDirectory extends FilterDirectory {
18+
/**
19+
* Sole constructor, typically called from sub-classes.
20+
*
21+
* @param in
22+
*/
23+
protected CriteriaBasedCompositeDirectory(Directory in) {
24+
super(in);
25+
}
26+
27+
@Override
28+
public String[] listAll() throws IOException {
29+
return Arrays.stream(super.listAll()).filter(fileName -> !fileName.startsWith("temp_")).distinct().toArray(String[]::new);
30+
}
31+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index;
10+
11+
import org.apache.lucene.index.MergeTrigger;
12+
import org.apache.lucene.index.SegmentCommitInfo;
13+
import org.apache.lucene.index.SegmentInfos;
14+
import org.apache.lucene.index.TieredMergePolicy;
15+
16+
import java.io.IOException;
17+
import java.util.ArrayList;
18+
import java.util.HashMap;
19+
import java.util.List;
20+
import java.util.Map;
21+
import java.util.Set;
22+
23+
public class CriteriaBasedMergePolicy extends TieredMergePolicy {
24+
@Override
25+
public MergeSpecification findMerges(
26+
MergeTrigger mergeTrigger, SegmentInfos infos, MergeContext mergeContext) throws IOException {
27+
final Set<SegmentCommitInfo> merging = mergeContext.getMergingSegments();
28+
MergeSpecification spec = null;
29+
final Map<String, List<SegmentCommitInfo>> commitInfos = new HashMap<>();
30+
for (SegmentCommitInfo si : infos) {
31+
if (merging.contains(si)) {
32+
continue;
33+
}
34+
35+
final String dwptGroupNumber = si.info.getAttribute("criteria");
36+
commitInfos.computeIfAbsent(dwptGroupNumber, k -> new ArrayList<>()).add(si);
37+
}
38+
39+
for (String dwptGroupNumber : commitInfos.keySet()) {
40+
if (commitInfos.get(dwptGroupNumber).size() > 1) {
41+
final SegmentInfos newSIS = new SegmentInfos(infos.getIndexCreatedVersionMajor());
42+
for (SegmentCommitInfo info : commitInfos.get(dwptGroupNumber)) {
43+
newSIS.add(info);
44+
}
45+
46+
final MergeSpecification tieredMergePolicySpec =
47+
super.findMerges(mergeTrigger, newSIS, mergeContext);
48+
if (tieredMergePolicySpec != null) {
49+
if (spec == null) {
50+
spec = new MergeSpecification();
51+
}
52+
53+
spec.merges.addAll(tieredMergePolicySpec.merges);
54+
}
55+
}
56+
}
57+
58+
return spec;
59+
}
60+
}

server/src/main/java/org/opensearch/index/IndexModule.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.apache.lucene.index.LeafReader;
3939
import org.apache.lucene.search.similarities.BM25Similarity;
4040
import org.apache.lucene.search.similarities.Similarity;
41+
import org.apache.lucene.store.Directory;
42+
import org.apache.lucene.store.LockFactory;
4143
import org.apache.lucene.util.Constants;
4244
import org.opensearch.Version;
4345
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
@@ -76,6 +78,7 @@
7678
import org.opensearch.index.shard.IndexShard;
7779
import org.opensearch.index.shard.IndexingOperationListener;
7880
import org.opensearch.index.shard.SearchOperationListener;
81+
import org.opensearch.index.shard.ShardPath;
7982
import org.opensearch.index.similarity.SimilarityService;
8083
import org.opensearch.index.store.DefaultCompositeDirectoryFactory;
8184
import org.opensearch.index.store.FsDirectoryFactory;
@@ -97,6 +100,7 @@
97100
import org.opensearch.transport.client.Client;
98101

99102
import java.io.IOException;
103+
import java.nio.file.Path;
100104
import java.util.ArrayList;
101105
import java.util.Collections;
102106
import java.util.HashMap;
@@ -837,7 +841,18 @@ private static IndexStorePlugin.DirectoryFactory getDirectoryFactory(
837841
throw new IllegalArgumentException("Unknown store type [" + storeType + "]");
838842
}
839843
}
840-
return factory;
844+
845+
return new IndexStorePlugin.DirectoryFactory() {
846+
@Override
847+
public Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
848+
return new CriteriaBasedCompositeDirectory(factory.newDirectory(indexSettings, shardPath));
849+
}
850+
851+
@Override
852+
public Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
853+
return factory.newFSDirectory(location, lockFactory, indexSettings);
854+
}
855+
};
841856
}
842857

843858
private static IndexStorePlugin.CompositeDirectoryFactory getCompositeDirectoryFactory(

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,7 @@ protected void closeInternal() {
718718
// Do nothing for shard lock on remote store
719719
}
720720
};
721-
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, remoteStoreLock, Store.OnClose.EMPTY, path);
721+
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, remoteStoreLock, Store.OnClose.EMPTY, path, directoryFactory);
722722
} else {
723723
// Disallow shards with remote store based settings to be created on non-remote store enabled nodes
724724
// Even though we have `RemoteStoreMigrationAllocationDecider` in place to prevent something like this from happening at the
@@ -753,7 +753,8 @@ protected void closeInternal() {
753753
directory,
754754
lock,
755755
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)),
756-
path
756+
path,
757+
directoryFactory
757758
);
758759
eventListener.onStoreCreated(shardId);
759760
indexShard = new IndexShard(
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*Add commentMore actions
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.codec;
10+
11+
import org.apache.lucene.codecs.Codec;
12+
import org.apache.lucene.codecs.FilterCodec;
13+
import org.apache.lucene.codecs.SegmentInfoFormat;
14+
import org.apache.lucene.codecs.lucene101.Lucene101Codec;
15+
import org.apache.lucene.index.SegmentInfo;
16+
import org.apache.lucene.store.Directory;
17+
import org.apache.lucene.store.IOContext;
18+
19+
import java.io.IOException;
20+
21+
public class CriteriaBasedCodec extends FilterCodec {
22+
23+
private final String criteria;
24+
public CriteriaBasedCodec() {
25+
super("CriteriaBasedCodec", new Lucene101Codec());
26+
criteria = null;
27+
}
28+
29+
public CriteriaBasedCodec(Codec delegate, String criteria) {
30+
super("CriteriaBasedCodec", delegate);
31+
this.criteria = criteria;
32+
}
33+
34+
@Override
35+
public SegmentInfoFormat segmentInfoFormat() {
36+
return new SegmentInfoFormat() {
37+
@Override
38+
public SegmentInfo read(Directory directory, String segmentName, byte[] segmentID, IOContext context) throws IOException {
39+
return delegate.segmentInfoFormat().read(directory, segmentName, segmentID, context);
40+
}
41+
42+
@Override
43+
public void write(Directory directory, SegmentInfo info, IOContext ioContext) throws IOException {
44+
info.putAttribute("criteria", criteria);
45+
delegate.segmentInfoFormat().write(directory, info, ioContext);
46+
}
47+
};
48+
}
49+
}

0 commit comments

Comments
 (0)