Adding support for context aware segments #19098

base: main
Conversation
Signed-off-by: RS146BIJAY <[email protected]>
import java.io.IOException;
import java.util.Arrays;

public class CriteriaBasedCompositeDirectory extends FilterDirectory {
nit: rename to BucketedCompositeDirectory
Ack
@Override
public Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
    return factory.newFSDirectory(location, lockFactory, indexSettings);
}
Why does this need to be overridden?
To create the child-level writer, we create a subdirectory within the parent shard directory and associate it with the child writer. For this, we need to directly call the newFSDirectory variant that accepts the subdirectory path on which the directory should be created. So we need to override this function to make that call from the store.
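As a rough sketch of the call path being described (field and helper names here are illustrative assumptions, not this PR's exact code; only the newFSDirectory signature comes from the diff above, and the sketch assumes the PR makes it callable from the store):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NativeFSLockFactory;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.store.FsDirectoryFactory;

// Hypothetical sketch: the store resolves a child path under the shard
// directory and hands it straight to the overridden newFSDirectory, which
// is why the method must be reachable from the store.
final class StoreDirectorySketch {
    private final FsDirectoryFactory factory;   // factory whose newFSDirectory is overridden above
    private final Path shardIndexPath;          // the parent shard's index directory
    private final IndexSettings indexSettings;

    StoreDirectorySketch(FsDirectoryFactory factory, Path shardIndexPath, IndexSettings indexSettings) {
        this.factory = factory;
        this.shardIndexPath = shardIndexPath;
        this.indexSettings = indexSettings;
    }

    // Mirrors the store.newTempDirectory(...) call in the diff below.
    Directory newTempDirectory(String childName) throws IOException {
        Path childPath = shardIndexPath.resolve(childName);   // subdirectory within the parent shard directory
        Files.createDirectories(childPath);
        return factory.newFSDirectory(childPath, NativeFSLockFactory.INSTANCE, indexSettings);
    }
}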
private IndexWriterConfig getIndexWriterConfig(MergeScheduler childMergeScheduler) {
    final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
    iwc.setCommitOnClose(true);
    iwc.setOpenMode(IndexWriterConfig.OpenMode.CREATE);
    iwc.setIndexCreatedVersionMajor(config().getIndexSettings().getIndexVersionCreated().luceneVersion.major);
    //iwc.setIndexDeletionPolicy(childCombinedDeletionPolicy);
    // with tests.verbose, lucene sets this up: plumb to align with filesystem stream
    boolean verbose = false;
    try {
        verbose = Boolean.parseBoolean(System.getProperty("tests.verbose"));
    } catch (Exception ignore) {}
    iwc.setInfoStream(verbose ? InfoStream.getDefault() : new LoggerInfoStream(logger));
    iwc.setMergeScheduler(childMergeScheduler);
    // Give us the opportunity to upgrade old segments while performing
    // background merges
    MergePolicy mergePolicy = config().getMergePolicy();
    // always configure soft-deletes field so an engine with soft-deletes disabled can open a Lucene index with soft-deletes.
    iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
    mergePolicy = new RecoverySourcePruneMergePolicy(
        SourceFieldMapper.RECOVERY_SOURCE_NAME,
        softDeletesPolicy::getRetentionQuery,
        new SoftDeletesRetentionMergePolicy(
            Lucene.SOFT_DELETES_FIELD,
            softDeletesPolicy::getRetentionQuery,
            new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME)
        )
    );
    boolean shuffleForcedMerge = Booleans.parseBoolean(System.getProperty("opensearch.shuffle_forced_merge", Boolean.TRUE.toString()));
    if (shuffleForcedMerge) {
        // We wrap the merge policy for all indices even though it is mostly useful for time-based indices
        // but there should be no overhead for other type of indices so it's simpler than adding a setting
        // to enable it.
        mergePolicy = new ShuffleForcedMergePolicy(mergePolicy);
    }
    if (config().getIndexSettings().isMergeOnFlushEnabled()) {
Can we remove the redundant logic and merge this with the other getIndexWriterConfig?
Ack.
private CompositeIndexWriter.DisposableIndexWriter createChildWriterUtil(String criteria, CompositeIndexWriter.CriteriaBasedIndexWriterLookup lookup) throws IOException {
    return new CompositeIndexWriter.DisposableIndexWriter(new IndexWriter(store.newTempDirectory("temp_" + criteria + "_" + UUID.randomUUID()),
        getIndexWriterConfig(new OpenSearchConcurrentMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings()))), lookup);
}
This logic should sit in CompositeIndexWriter, so as not to overload the engine with the inner workings of the index writer.
Ack.
/**
 * Try acquiring lock, returning null if unable to acquire lock within timeout.
 */
public CriteriaBasedIndexWriterLookup tryAcquire(TimeValue timeout) throws InterruptedException {
    boolean locked = lock.tryLock(timeout.duration(), timeout.timeUnit());
    if (locked) {
        assert addCurrentThread();
        return lookup;
    } else {
        return null;
    }
}
What are the consequences of a timeout while acquiring this lock?
public final synchronized void setLiveCommitData(Iterable<Map.Entry<String, String>> commitUserData) {
    accumulatingIndexWriter.setLiveCommitData(commitUserData);
}

public final long commit() throws IOException {
    ensureOpen();
    return accumulatingIndexWriter.commit();
}

public final synchronized Iterable<Map.Entry<String, String>> getLiveCommitData() {
    return accumulatingIndexWriter.getLiveCommitData();
}

public void rollback() throws IOException {
    if (shouldClose()) {
        Collection<IndexWriter> currentWriterSet = liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values().stream()
            .map(DisposableIndexWriter::getIndexWriter).collect(Collectors.toSet());

        for (IndexWriter indexWriter : currentWriterSet) {
            if (indexWriter.isOpen() == true) {
                indexWriter.rollback();
            }
        }

        Collection<IndexWriter> oldWriterSet = liveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values().stream()
            .map(DisposableIndexWriter::getIndexWriter).collect(Collectors.toSet());
        for (IndexWriter indexWriter : oldWriterSet) {
            if (indexWriter.isOpen() == true) {
                indexWriter.rollback();
            }
        }
Can this class extend IndexWriter? If not, we would need to simplify the interfaces to ensure that changes in the underlying Lucene APIs are reflected in these classes as well going forward.
Description
In order to ensure that only the most relevant data is iterated during query execution, we suggest collocating related data into the same segment or group of segments. The group of a segment is determined by a grouping criteria function. The goal is to align segment boundaries with anticipated query patterns, ensuring that documents frequently queried together reside in the same segments. For example, in log analytics scenarios, users often query for anomalies (4xx and 5xx status code logs) rather than success logs (2xx). By applying a grouping criteria function based on status code, anomaly and success logs are segregated into distinct segments (or groups of segments). This ensures that search queries like “number of faults in the last hour” or “number of errors in the last three hours” become more efficient, as they only need to process segments with 4xx or 5xx status codes, a much smaller dataset, improving query performance.
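As a hedged illustration (the function shape and group names are hypothetical, not this PR's API), a status-code based grouping criteria could look like:

import java.util.function.Function;

// Illustrative grouping criteria function: route anomaly logs (4xx/5xx)
// and success logs (2xx/3xx) to different segment groups, so queries over
// faults only touch the much smaller anomaly group.
public final class StatusCodeGroupingCriteria implements Function<Integer, String> {
    @Override
    public String apply(Integer statusCode) {
        return statusCode >= 400 ? "anomaly" : "success";   // hypothetical group keys
    }
}

Documents mapping to the same key would then land in the same group of segments.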
In this approach, we maintain context aware segments using a pool of disposable IndexWriters, modelled similar to how Lucene models DWPTs inside an IndexWriter. At any point in time, OpenSearch will now maintain a map of IndexWriters, one per group, along with a common accumulating IndexWriter. An indexing request for a document belonging to a group is redirected to the respective group-specific IndexWriter. Operations like opening a reader for search, getting checkpoints during replication or snapshots, etc. are performed on the accumulating IndexWriter. To ensure the accumulating IndexWriter remains in sync with the group-specific child writers, we periodically sync the child-level writers with the accumulating writer during refresh via IndexWriter’s addIndexes API call.
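A minimal sketch of that refresh-time sync, assuming hypothetical names for the child-writer map (IndexWriter.addIndexes is the real Lucene API; everything else is illustrative):

import java.io.IOException;
import java.util.Map;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;

// Sketch: fold each group-specific child writer's segments into the
// accumulating parent writer, then dispose of the child.
final class RefreshSyncSketch {
    static void syncChildWriters(IndexWriter accumulatingWriter,
                                 Map<String, IndexWriter> childWriters) throws IOException {
        for (IndexWriter child : childWriters.values()) {
            child.commit();                          // flush the child's segments
            Directory childDir = child.getDirectory();
            child.close();                           // addIndexes requires the source writer to be closed
            accumulatingWriter.addIndexes(childDir); // copy the child's segments into the parent
        }
        childWriters.clear();
    }
}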
Disposable IndexWriters
All write operations will now be handled by a pool of group-specific disposable IndexWriters, modelled after Lucene’s DWPTs.
States of disposable IndexWriters
Similar to DWPTs, these disposable IndexWriters have three states:
Active
IndexWriters in this state handle all write requests coming to InternalEngine. For each group/tenant, there will be at most a single IndexWriter in the active state. OpenSearch maintains a mapping of active IndexWriters, each associated with a specific group. During indexing, the specific IndexWriter selected for a document depends on the outcome of the grouping criteria function for that document. Should there be no active IndexWriter for a group, a new IndexWriter is instantiated for this criteria and added to the pool.
Mark for Refresh
During refresh, we transition all group-specific active IndexWriters from the active pool to an intermediate refresh-pending state. At this stage, these IndexWriters no longer accept any new writes, but continue to handle any ongoing operations.
Close
At this stage, OpenSearch syncs the content of the group-specific IndexWriters with the accumulating parent IndexWriter via Lucene’s addIndexes API call. After the sync, we remove all group-specific IndexWriters from the Mark for Refresh stage and close them.
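Summarizing the lifecycle as code (a sketch with illustrative names, not this PR's actual types):

// Illustrative lifecycle of a disposable group-specific IndexWriter.
enum ChildWriterState {
    ACTIVE,            // accepting all new writes for its group
    MARK_FOR_REFRESH,  // no new writes accepted; in-flight operations drain
    CLOSED             // segments synced into the accumulating writer via addIndexes, writer closed
}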
CompositeIndexWriter
InternalEngine will now delegate all IndexWriter-specific operations through a new CompositeIndexWriter class rather than directly interacting with IndexWriter. This wrapper serves as a unified interface for coordinating write operations with the group-specific IndexWriters and managing read operations through the accumulating parent IndexWriter. The wrapper class also takes care of syncing group-specific IndexWriters with the accumulating IndexWriter during refresh by implementing the RefreshListener interface.
In addition to managing group-specific IndexWriters, CompositeIndexWriter tracks all updates and deletions applied during each refresh cycle. This state is maintained using a refresh-rotating map structure analogous to LiveVersionMap’s implementation.
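A rough sketch of that refresh-rotating structure, with hypothetical names modelled on LiveVersionMap's current/old generations:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: two generations of per-group writers. On refresh, `current`
// rotates into `old`; the old generation is synced and closed while a
// fresh `current` map accepts new writes. W stands in for the PR's
// DisposableIndexWriter type.
final class LiveIndexWriterMapSketch<W> {
    volatile Map<String, W> current = new ConcurrentHashMap<>();
    volatile Map<String, W> old = new ConcurrentHashMap<>();

    // Called at the start of a refresh cycle, under the map's write lock.
    void rotate() {
        old = current;
        current = new ConcurrentHashMap<>();
    }
}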
Class Diagram
Indexing
During indexing, CompositeIndexWriter first evaluates the group for the document using the grouping criteria function; the specific IndexWriter selected for the document depends on that outcome. If the relevant IndexWriter entry in the map is null, a new IndexWriter is instantiated for this criteria and added to the map.
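A minimal sketch of that routing step, assuming a hypothetical criteria function and a lazily-populated writer map (names are illustrative; the PR builds its writers via CompositeIndexWriter):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;

// Sketch: evaluate the grouping criteria, then pick or lazily create the
// group's active writer and index into it.
abstract class GroupRoutingSketch {
    private final Map<String, IndexWriter> activeWriters = new ConcurrentHashMap<>();
    private final Function<Document, String> groupingCriteria;

    GroupRoutingSketch(Function<Document, String> groupingCriteria) {
        this.groupingCriteria = groupingCriteria;
    }

    void index(Document doc) throws IOException {
        String group = groupingCriteria.apply(doc);
        IndexWriter writer = activeWriters.computeIfAbsent(group, g -> {
            try {
                return createChildWriter(g);   // instantiated on the first document for this group
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        writer.addDocument(doc);
    }

    // Hypothetical factory around new IndexWriter(...); see createChildWriterUtil above.
    abstract IndexWriter createChildWriter(String group) throws IOException;
}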
Resolve version
InternalEngine currently resolves the current version of a document before indexing it, to figure out whether the request is an index or an update request. It does this by first doing a lookup in the version map. If no version of the document is present in the version map, it queries Lucene via a searcher to look up the current version. Since the version map is maintained across an entire refresh cycle, there is no change in how versions are resolved in the above approach: InternalEngine first looks up the document in the version map, then falls back to querying the document through the parent IndexWriter.
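A hedged sketch of that two-step lookup order (method names are illustrative, not the engine's actual API):

import java.io.IOException;

// Sketch: consult the refresh-scoped version map first, then fall back to
// Lucene through a searcher on the accumulating parent writer.
abstract class VersionResolutionSketch {
    Long resolveDocVersion(String id) throws IOException {
        Long fromVersionMap = lookupInVersionMap(id);    // entries survive the whole refresh cycle
        if (fromVersionMap != null) {
            return fromVersionMap;                       // recent write: treat as update
        }
        return lookupViaAccumulatingWriterSearcher(id);  // null means a fresh index request
    }

    abstract Long lookupInVersionMap(String id);
    abstract Long lookupViaAccumulatingWriterSearcher(String id) throws IOException;
}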
Locks
OpenSearch currently utilises a ReentrantReadWriteLock to ensure the underlying IndexWriter is not closed during active indexing. With context aware segments, we will use an extra lock for each IndexWriterMap inside CompositeIndexWriter.
During each write/update/delete operation, we take a read lock on the ReentrantReadWriteLock associated with this map; the lock is released when indexing completes. During refresh, we obtain a write lock on the same lock just before rotating the WriterMap. Since the write lock can be acquired only when there is no active read lock, the writers of a map are closed and synced with the parent writer only when there are no active writes happening on these IndexWriters, as the sketch below illustrates.
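A minimal sketch of this locking protocol around the writer map (names are illustrative; the real code also exposes a tryAcquire variant, shown earlier in the diff):

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch: writes take the read lock (many concurrent indexers allowed);
// refresh takes the write lock before rotating the map, which only
// succeeds once no in-flight write still holds the read lock.
final class WriterMapLockSketch {
    private final ReadWriteLock mapLock = new ReentrantReadWriteLock();

    void indexUnderLock(Runnable indexOp) {
        mapLock.readLock().lock();
        try {
            indexOp.run();            // write/update/delete against a group writer
        } finally {
            mapLock.readLock().unlock();
        }
    }

    void rotateUnderLock(Runnable rotateAndSync) {
        mapLock.writeLock().lock();   // blocks until all active read locks are released
        try {
            rotateAndSync.run();      // rotate the writer map, then sync and close old writers
        } finally {
            mapLock.writeLock().unlock();
        }
    }
}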
Related Issues
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.