Remove SubProcess and references in EventProcessor related code #47659
base: master
Conversation
cms-bot internal usage
+code-checks Logs: https://cmssdt.cern.ch/SDT/code-checks/cms-sw-PR-47659/44194
A new Pull Request was created by @wddgit for master. It involves the following packages:
@Dr15Jones, @cmsbuild, @makortel, @smuzaffar can you please review it and eventually sign? Thanks. cms-bot commands are listed here
enable threading
please test
+1 Size: This PR adds an extra 168KB to repository
Comparison Summary:
Comparison differences are related to #47071
@@ -661,22 +624,6 @@ namespace edm {
    pathsAndConsumesOfModules.initialize(schedule_.get(), preg());

    std::vector<ModuleProcessName> consumedBySubProcesses;
Is the consumedBySubProcesses something to be cleaned up in a subsequent PR?
Yes that is part of the module deletion code. I was going to tackle that next.
}) | chain::runLast(std::move(task));
ServiceRegistry::Operate op(serviceToken_);
schedule_->writeProcessBlockAsync(
    task, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
The task could be moved.
Suggested change:
-    task, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
+    std::move(task), principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
I think we need to not move so that the ServiceRegistry::Operate goes out of scope before the task goes out of scope. I believe there is the extremely unlikely possibility the Services stay on a very long time and cause problems during shutdown, because once we call doneWaiting everything else continues to completion and we are off to the races. IIRC, we actually saw that problem in the past in some rare cases.
Ok. Could you then add a comment here that the lifetime of the task must be longer than the lifetime of op for that reason?
There are 10 or 20 places like this in EventProcessor. Should I mark them all? Maybe a good idea. This is something easy to forget about.
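As an illustration of the destruction-order point discussed here, a minimal standalone sketch (not CMSSW code); TaskHolder and ServiceGuard are hypothetical stand-ins for WaitingTaskHolder and ServiceRegistry::Operate, and the sketch only demonstrates that the object declared first is destroyed last, which is the property such a comment would document.

// Standalone sketch (not CMSSW code) of the destruction-order reasoning above.
// TaskHolder and ServiceGuard are hypothetical stand-ins for WaitingTaskHolder
// and ServiceRegistry::Operate.
#include <iostream>

struct TaskHolder {
  ~TaskHolder() { std::cout << "task holder released (held task may now complete)\n"; }
};

struct ServiceGuard {
  ServiceGuard() { std::cout << "services made active\n"; }
  ~ServiceGuard() { std::cout << "services released\n"; }
};

int main() {
  TaskHolder task;  // declared first, so it is destroyed last
  ServiceGuard op;  // stand-in for ServiceRegistry::Operate op(serviceToken_)
  // ... async work would be launched here with a copy of `task`, not std::move(task) ...
}  // destructors run in reverse order: op first, then task, so the services are
   // released before the last local reference to the held task goes away

Declaring the task before op is what enforces the requirement that the task's lifetime be longer than op's.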
  }
}) | chain::runLast(std::move(task));
ServiceRegistry::Operate op(serviceToken_);
schedule_->writeRunAsync(task, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
The task could be moved.
Suggested change:
-  schedule_->writeRunAsync(task, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
+  schedule_->writeRunAsync(std::move(task), runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
Same as previous comment.
for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
  schedule_->processOneStreamAsync<Traits>(
      WaitingTaskHolder(taskGroup_, &streamLoopWaitTask), i, transitionInfo, serviceToken_);
}
I find the replacement of beginGlobalTransitionAsync() with schedule_->processOneGlobalAsync() fine as such, although one could make an argument against it along the lines of "if we ever want to have a customization point there, we need to add the beginGlobalTransitionAsync() back".
I'm less certain about the replacement of beginStreamsTransitionAsync() with the loop over streams calling schedule_->processOneStreamAsync(). In other words, I'd find some value in keeping the loop over streams abstracted in beginStreamsTransitionAsync().
Further thoughts? @wddgit @Dr15Jones
Whatever we do, we should keep EventProcessor and TestProcessor using the same code, preferably by sharing the same routines.
The loop over streams is already handled very differently in EventProcessor and TestProcessor. The one in EventProcessor involves complex concurrency code. In the loop over streams in EventProcessor, tasks are inserted into the streamQueues_. For example, this is the loop over streams in EventProcessor::beginRunAsync. beginStreamsTransitionAsync is not called at all. The call to beginStreamTransitionAsync (almost the same but no "s" in the function name) is inside streamBeginRunAsync.
streamQueuesInserter_.push(*holder.group(), [this, status, holder]() mutable {
  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
    CMS_SA_ALLOW try {
      streamQueues_[i].push(*holder.group(), [this, i, status, holder]() mutable {
        streamBeginRunAsync(i, std::move(status), std::move(holder));
      });
    } catch (...) {
      if (status->streamFinishedBeginRun()) {
        WaitingTaskHolder copyHolder(holder);
        copyHolder.doneWaiting(std::current_exception());
        status->resetBeginResources();
        queueWhichWaitsForIOVsToFinish_.resume();
        exceptionRunStatus_ = status;
      }
    }
  }
});
TestProcessor does not have support for this level of concurrency. Even before this PR, TestProcessor is the only thing in all of CMSSW using beginStreamsTransitionAsync.
I could add the beginStreamTransitionAsync back, but it would simply call directly through to the other function and add no value at all. It seems to me it is just as likely to get in the way as to help if a new version of SubProcess is implemented, and in the meantime removing it simplifies the code.
    emptyList,
    false);
for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
  schedule_->processOneStreamAsync<Traits>(
I think this loop has a problem. Each iteration creates a new WaitingTaskHolder, while the old code made one which lived for the lifetime of the loop. If the thread running the loop was stalled, then the already started streams could finish their jobs and cause all the previously created WaitingTaskHolders to go away, thereby starting the held task BEFORE this loop finishes.
In general, I would agree that in our code this pattern would cause problems. But in this case, it is in a FinalWaitingTask section of code. It will finish the loop before it gets to the wait. I think it will be OK in this case.
FinalWaitingTask streamLoopWaitTask{taskGroup_};
using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
  schedule_->processOneStreamAsync<Traits>(
      WaitingTaskHolder(taskGroup_, &streamLoopWaitTask), i, transitionInfo, serviceToken_, false);
}
streamLoopWaitTask.wait();
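For reference, a minimal standalone sketch (not CMSSW code) of the holder-lifetime pattern under discussion: the held task runs only when the last holder referencing it is released, so an extra holder kept alive for the whole loop guarantees the task cannot start before every iteration has been launched. Holder is a hypothetical stand-in for WaitingTaskHolder, emulated here with std::shared_ptr.

// Standalone sketch (not CMSSW code): the "final task" is the shared_ptr deleter
// and it runs when the last Holder referencing it is destroyed.
#include <cstdio>
#include <memory>

using Holder = std::shared_ptr<void>;  // hypothetical stand-in for WaitingTaskHolder

int main() {
  // One holder created before the loop and alive for its whole lifetime.
  Holder loopGuard(nullptr, [](void*) { std::puts("final task runs"); });

  for (int i = 0; i < 3; ++i) {
    Holder perStream = loopGuard;  // would be handed to stream i in the real code
    std::printf("launched stream %d\n", i);
    // perStream may be released as soon as stream i finishes, possibly before
    // the loop ends; loopGuard keeps the final task held until after the loop.
  }
}  // loopGuard released here: only now can the final task run

In this sketch the loopGuard copy plays the role of the single holder that the old code kept alive across the loop, which is the pattern the review comment refers to.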
PR description:
Remove SubProcess from EventProcessor related code.
I've intentionally left uses of SubProcess related to the EventSetup, Services, and module deletion for separate later PRs, but this one gets everything else.
This is the second PR in a series. The first was PR #47565. That PR removed support for SubProcess from the CMS configuration system and made it impossible to run a job with a SubProcess configured. It also removed all SubProcess tests.
Resolves cms-sw/framework-team#1282
PR validation:
Relies on existing tests. There is no new functionality. This is cleaning up code that served no purpose anymore.