Skip to content

Commit df43cfb

Browse files
author
Daniel Mapleson
committed
Fixed bugs which prevented RAMPART working in unscheduled mode.
Fixed a bug which prevented RAMPART working without a mecq element in the configuration file. Using Tgacconanprocs 0.12.7.
1 parent 15ebd4d commit df43cfb

File tree

11 files changed

+80
-33
lines changed

11 files changed

+80
-33
lines changed

pom.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
<groupId>uk.ac.tgac.rampart</groupId>
2525
<artifactId>rampart</artifactId>
26-
<version>0.12.0</version>
26+
<version>0.12.1</version>
2727
<packaging>jar</packaging>
2828

2929
<name>RAMPART</name>
@@ -477,7 +477,7 @@
477477
<dependency>
478478
<groupId>uk.ac.tgac.conan</groupId>
479479
<artifactId>tgac-conan-process-wrappers</artifactId>
480-
<version>0.12.5</version>
480+
<version>0.12.7</version>
481481
<exclusions>
482482
<exclusion>
483483
<groupId>commons-cli</groupId>

src/main/java/uk/ac/tgac/rampart/RampartPipeline.java

+29-23
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import uk.ac.ebi.fgpt.conan.model.param.ConanParameter;
3535
import uk.ac.ebi.fgpt.conan.model.param.ParamMap;
3636
import uk.ac.ebi.fgpt.conan.service.ConanExecutorService;
37-
import uk.ac.tgac.conan.core.data.Library;
3837
import uk.ac.tgac.conan.core.data.Organism;
3938
import uk.ac.tgac.conan.core.util.XmlHelper;
4039
import uk.ac.tgac.rampart.stage.*;
@@ -98,13 +97,13 @@ public void init() throws IOException {
9897
public static class Args extends AbstractProcessArgs {
9998

10099
public static final String KEY_ELEM_MECQ = "mecq";
101-
public static final String KEY_ELEM_ANALYSE_READS = "mecq_analysis";
100+
public static final String KEY_ELEM_MECQ_ANALYSIS = "mecq_analysis";
102101
public static final String KEY_ELEM_KMER_CALC = "kmer_calc";
103102
public static final String KEY_ELEM_MASS = "mass";
104-
public static final String KEY_ELEM_ANALYSE_MASS = "mass_analysis";
105-
public static final String KEY_ELEM_SELECT_MASS = "mass_select";
103+
public static final String KEY_ELEM_MASS_ANALYSIS = "mass_analysis";
104+
public static final String KEY_ELEM_MASS_SELECT = "mass_select";
106105
public static final String KEY_ELEM_AMP = "amp";
107-
public static final String KEY_ELEM_ANALYSE_AMP = "amp_analysis";
106+
public static final String KEY_ELEM_AMP_ANALYSIS = "amp_analysis";
108107
public static final String KEY_ELEM_FINALISE = "finalise";
109108
public static final String KEY_ELEM_COLLECT = "collect";
110109

@@ -176,13 +175,13 @@ public Args(Element element, List<Mecq.Sample> samples, Organism organism, File
176175
new String[0],
177176
new String[]{
178177
KEY_ELEM_MECQ,
179-
KEY_ELEM_ANALYSE_READS,
178+
KEY_ELEM_MECQ_ANALYSIS,
180179
KEY_ELEM_KMER_CALC,
181180
KEY_ELEM_MASS,
182-
KEY_ELEM_ANALYSE_MASS,
183-
KEY_ELEM_SELECT_MASS,
181+
KEY_ELEM_MASS_ANALYSIS,
182+
KEY_ELEM_MASS_SELECT,
184183
KEY_ELEM_AMP,
185-
KEY_ELEM_ANALYSE_AMP,
184+
KEY_ELEM_AMP_ANALYSIS,
186185
KEY_ELEM_FINALISE,
187186
KEY_ELEM_COLLECT
188187
}
@@ -219,15 +218,22 @@ public Args(Element element, List<Mecq.Sample> samples, Organism organism, File
219218

220219
this.stages.setArgsIfPresent(RampartStage.MECQ, this.mecqArgs);
221220

221+
if (this.mecqArgs == null) {
222+
// Add ECQ information into samples list
223+
for (int i = 0; i < samples.size(); i++) {
224+
samples.get(i).ecqArgList = new ArrayList<>();
225+
}
226+
}
227+
222228

223229
// Analyse reads
224-
Element mecqAnalysisElement = XmlHelper.getDistinctElementByName(element, KEY_ELEM_ANALYSE_READS);
230+
Element mecqAnalysisElement = XmlHelper.getDistinctElementByName(element, KEY_ELEM_MECQ_ANALYSIS);
225231
this.mecqAnalysisArgs = mecqAnalysisElement == null ? null :
226232
new MecqAnalysis.Args(
227233
mecqAnalysisElement,
228234
this.outputDir,
229235
this.jobPrefix,
230-
this.mecqArgs.getSamples(),
236+
samples,
231237
this.organism,
232238
this.runParallel);
233239

@@ -240,7 +246,7 @@ public Args(Element element, List<Mecq.Sample> samples, Organism organism, File
240246
kmerCalcElement,
241247
this.outputDir,
242248
this.jobPrefix,
243-
this.mecqArgs.getSamples(),
249+
samples,
244250
this.organism,
245251
this.runParallel);
246252

@@ -253,7 +259,7 @@ public Args(Element element, List<Mecq.Sample> samples, Organism organism, File
253259
massElement,
254260
this.outputDir,
255261
this.jobPrefix,
256-
this.mecqArgs.getSamples(),
262+
samples,
257263
this.organism,
258264
this.kmerCalcArgs,
259265
this.runParallel);
@@ -276,7 +282,7 @@ public Args(Element element, List<Mecq.Sample> samples, Organism organism, File
276282
}
277283

278284
// Analyse MASS assemblies
279-
Element analyseMassElement = XmlHelper.getDistinctElementByName(element, KEY_ELEM_ANALYSE_MASS);
285+
Element analyseMassElement = XmlHelper.getDistinctElementByName(element, KEY_ELEM_MASS_ANALYSIS);
280286
this.analyseMassArgs = analyseMassElement == null ? null :
281287
new AnalyseMassAssemblies.Args(
282288
analyseMassElement,
@@ -290,7 +296,7 @@ public Args(Element element, List<Mecq.Sample> samples, Organism organism, File
290296
this.stages.setArgsIfPresent(RampartStage.MASS_ANALYSIS, this.analyseMassArgs);
291297

292298
// Select MASS assembly
293-
Element selectMassElement = XmlHelper.getDistinctElementByName(element, KEY_ELEM_SELECT_MASS);
299+
Element selectMassElement = XmlHelper.getDistinctElementByName(element, KEY_ELEM_MASS_SELECT);
294300
this.selectMassArgs = selectMassElement == null ? null :
295301
new Select.Args(
296302
selectMassElement,
@@ -312,13 +318,13 @@ public Args(Element element, List<Mecq.Sample> samples, Organism organism, File
312318
ampElement,
313319
this.outputDir,
314320
this.jobPrefix,
315-
this.mecqArgs.getSamples(),
321+
samples,
316322
this.organism,
317-
this.ampInput != null && this.mecqArgs.getSamples().size() == 1 ?
323+
this.ampInput != null && samples.size() == 1 ?
318324
this.ampInput :
319325
null,
320326
this.organism.getPloidy() > 1 ?
321-
this.ampBubble != null && this.mecqArgs.getSamples().size() == 1 ?
327+
this.ampBubble != null && samples.size() == 1 ?
322328
this.ampBubble :
323329
null :
324330
null,
@@ -328,13 +334,13 @@ public Args(Element element, List<Mecq.Sample> samples, Organism organism, File
328334
this.stages.setArgsIfPresent(RampartStage.AMP, this.ampArgs);
329335

330336
// Analyse AMP assemblies
331-
Element analyseAmpElement = XmlHelper.getDistinctElementByName(element, KEY_ELEM_ANALYSE_AMP);
337+
Element analyseAmpElement = XmlHelper.getDistinctElementByName(element, KEY_ELEM_AMP_ANALYSIS);
332338
this.analyseAmpArgs = analyseAmpElement == null ? null :
333339
new AnalyseAmpAssemblies.Args(
334340
analyseAmpElement,
335341
this.outputDir,
336342
this.jobPrefix,
337-
this.mecqArgs.getSamples(),
343+
samples,
338344
this.organism,
339345
this.ampArgs == null ? null : this.ampArgs.getStageArgsList(),
340346
this.runParallel
@@ -351,7 +357,7 @@ public Args(Element element, List<Mecq.Sample> samples, Organism organism, File
351357
finaliseElement,
352358
this.outputDir,
353359
this.jobPrefix,
354-
this.mecqArgs.getSamples(),
360+
samples,
355361
this.organism,
356362
this.institution,
357363
inputFromMass,
@@ -363,13 +369,13 @@ public Args(Element element, List<Mecq.Sample> samples, Organism organism, File
363369

364370
this.stages.setArgsIfPresent(RampartStage.FINALISE, this.finaliseArgs);
365371

366-
if (this.mecqArgs.getSamples().size() > 1) {
372+
if (samples.size() > 1) {
367373
Element collectElement = XmlHelper.getDistinctElementByName(element, KEY_ELEM_COLLECT);
368374
this.collectArgs = new Collect.Args(
369375
collectElement,
370376
this.outputDir,
371377
this.jobPrefix,
372-
this.mecqArgs.getSamples(),
378+
samples,
373379
this.organism,
374380
this.finaliseArgs,
375381
this.runParallel);

src/main/java/uk/ac/tgac/rampart/stage/Amp.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -266,8 +266,14 @@ public Args(Element ele, File outputDir, String jobPrefix, List<Mecq.Sample> sam
266266
this.bubbleFile = bubbleFile;
267267
this.stageArgsList = new HashMap<>();
268268

269+
File lastAssembly = null;
270+
269271
for(Mecq.Sample sample : samples) {
270272

273+
if (lastAssembly == null) {
274+
lastAssembly = new File(new File(this.getSampleDir(sample), RampartStage.MASS_SELECT.getOutputDirName()), "best.fa");
275+
}
276+
271277
// Parse Xml for AMP stages
272278
// All single mass args
273279
List<AmpStage.Args> stageList = new ArrayList<>();
@@ -284,13 +290,13 @@ public Args(Element ele, File outputDir, String jobPrefix, List<Mecq.Sample> sam
284290
jobPrefix + "-" + stageName,
285291
sample,
286292
this.organism,
287-
this.getInputAssembly(sample),
293+
lastAssembly,
288294
this.bubbleFile,
289295
i);
290296

291297
stageList.add(stage);
292298

293-
this.inputAssembly = stage.getOutputFile();
299+
lastAssembly = stage.getOutputFile();
294300
}
295301

296302
this.stageArgsList.put(sample, stageList);

src/main/java/uk/ac/tgac/rampart/stage/MassJob.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ public ExecutionResult execute(ExecutionContext executionContext) throws Process
150150
ExecutionResult result = this.executeAssembler(
151151
assembler,
152152
args.getJobPrefix() + "-assembly-" + outputDir.getName(),
153-
ssResults.get(asmArgs.getDesiredCoverage()));
153+
ssResults.get(asmArgs.getDesiredCoverage()),
154+
executionContext.usingScheduler() && (args.isMassParallel() || args.isRunParallel()));
154155

155156
// Add assembler id to list
156157
result.setName(title);
@@ -445,7 +446,7 @@ protected long getCount(File seqFile, File outputDir, String jobName, boolean li
445446
return Long.parseLong(lines.get(0).trim());
446447
}
447448

448-
public ExecutionResult executeAssembler(Assembler assembler, String jobName, List<Integer> jobIds)
449+
public ExecutionResult executeAssembler(Assembler assembler, String jobName, List<Integer> jobIds, boolean runParallel)
449450
throws ProcessExecutionException, InterruptedException, IOException, ConanParameterException {
450451

451452
// Important that this happens after directory cleaning.
@@ -460,7 +461,7 @@ public ExecutionResult executeAssembler(Assembler assembler, String jobName, Lis
460461
args.getThreads(),
461462
args.getMemory(),
462463
args.getExpWallTimeMins(),
463-
args.isMassParallel() || args.isRunParallel(),
464+
runParallel,
464465
jobIds,
465466
assembler.usesOpenMpi());
466467
}

src/main/java/uk/ac/tgac/rampart/stage/analyse/asm/AnalyseAmpAssemblies.java

+4
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ public TaskResult executeSample(Mecq.Sample sample, ExecutionContext executionCo
133133
Set<AssemblyAnalyser> requestedServices = new HashSet<>();
134134
for(AnalyseAssembliesArgs.ToolArgs requestedService : this.getArgs().getTools()) {
135135
AssemblyAnalyser aa = this.assemblyAnalyserFactory.create(requestedService.getName(), this.conanExecutorService);
136+
if (!executionContext.usingScheduler() && args.isRunParallel()) {
137+
log.warn("Forcing linear execution due to lack of job scheduler");
138+
requestedService.setRunParallel(false);
139+
}
136140
aa.setArgs(requestedService);
137141
requestedServices.add(aa);
138142
}

src/main/java/uk/ac/tgac/rampart/stage/analyse/asm/AnalyseMassAssemblies.java

+5
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,11 @@ public TaskResult executeSample(Mecq.Sample sample, ExecutionContext executionCo
117117
Set<AssemblyAnalyser> requestedServices = args.getAssemblyAnalysers();
118118
for (AssemblyAnalyser requestedService : requestedServices) {
119119
requestedService.setConanExecutorService(this.conanExecutorService);
120+
// Force run parallel to false if not using a scheduler
121+
if (!executionContext.usingScheduler() && args.isRunParallel()) {
122+
log.warn("Forcing linear execution due to lack of job scheduler");
123+
requestedService.setRunParallel(false);
124+
}
120125
}
121126

122127
List<File> unitigAssemblies = new ArrayList<>();

src/main/java/uk/ac/tgac/rampart/stage/analyse/asm/analysers/AssemblyAnalyser.java

+6
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ List<ExecutionResult> execute(List<File> assemblies, File outputDir, String jobP
6868
*/
6969
void setArgs(AnalyseAssembliesArgs.ToolArgs args);
7070

71+
/**
72+
* Used to override whether or not this process should run parallel
73+
* @param runParallel
74+
*/
75+
void setRunParallel(boolean runParallel);
76+
7177
/**
7278
* Updates the provided table with information from this analysis
7379
* @param table The table to update with new information

src/main/java/uk/ac/tgac/rampart/stage/analyse/asm/analysers/CegmaAsmAnalyser.java

+8
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,14 @@ public void setArgs(AnalyseAssembliesArgs.ToolArgs args) {
111111
this.args = args;
112112
}
113113

114+
/**
115+
* Always run linearly for CEGMA
116+
*/
117+
@Override
118+
public void setRunParallel(boolean runParallel) {
119+
args.setRunParallel(false);
120+
}
121+
114122
@Override
115123
public void updateTable(AssemblyStatsTable table, File reportDir) throws IOException {
116124

src/main/java/uk/ac/tgac/rampart/stage/analyse/asm/analysers/KatAsmAnalyser.java

+5
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ public void setArgs(AnalyseAssembliesArgs.ToolArgs args) {
6565
this.args = args;
6666
}
6767

68+
@Override
69+
public void setRunParallel(boolean runParallel) {
70+
args.setRunParallel(runParallel);
71+
}
72+
6873
@Override
6974
public List<ExecutionResult> execute(List<File> assemblies, File outputDir, String jobPrefix, ConanExecutorService ces)
7075
throws InterruptedException, ProcessExecutionException, ConanParameterException, IOException {

src/main/java/uk/ac/tgac/rampart/stage/analyse/asm/analysers/QuastAsmAnalyser.java

+5
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ public void setArgs(AnalyseAssembliesArgs.ToolArgs args) {
6969
this.args = args;
7070
}
7171

72+
@Override
73+
public void setRunParallel(boolean runParallel) {
74+
args.setRunParallel(runParallel);
75+
}
76+
7277
@Override
7378
public List<ExecutionResult> execute(List<File> assemblies, File outputDir, String jobPrefix, ConanExecutorService ces)
7479
throws InterruptedException, ProcessExecutionException, ConanParameterException, IOException {

src/test/java/uk/ac/tgac/rampart/stage/MassJobTest.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import static org.junit.Assert.assertTrue;
4242
import static org.mockito.Matchers.any;
43+
import static org.mockito.Matchers.anyBoolean;
4344
import static org.mockito.Matchers.anyString;
4445
import static org.mockito.Mockito.doReturn;
4546
import static org.mockito.Mockito.when;
@@ -76,7 +77,7 @@ public void testExecuteAbyss() throws ProcessExecutionException, InterruptedExce
7677
when(conanProcessService.execute(massJob, ec)).thenReturn(new DefaultExecutionResult("test", 0, null, null, -1));
7778
doReturn(new DefaultExecutionResult("test", 0, null, null, -1))
7879
.when(spy)
79-
.executeAssembler((Assembler) any(), anyString(), (List<Integer>) any());
80+
.executeAssembler((Assembler) any(), anyString(), (List<Integer>) any(), anyBoolean());
8081

8182

8283
ReflectionTestUtils.setField(smParent, "conanExecutorService", conanExecutorService);
@@ -109,7 +110,7 @@ public void testExecuteSoap() throws ProcessExecutionException, InterruptedExcep
109110
when(conanProcessService.execute(massJob, ec)).thenReturn(new DefaultExecutionResult("test", 0, null, null, -1));
110111
doReturn(new DefaultExecutionResult("test", 0, null, null, -1))
111112
.when(spy)
112-
.executeAssembler((Assembler) any(), anyString(), (List<Integer>) any());
113+
.executeAssembler((Assembler) any(), anyString(), (List<Integer>) any(), anyBoolean());
113114

114115

115116
ReflectionTestUtils.setField(smParent, "conanExecutorService", conanExecutorService);
@@ -142,7 +143,7 @@ public void testExecuteVelvet() throws ProcessExecutionException, InterruptedExc
142143
when(conanProcessService.execute(massJob, ec)).thenReturn(new DefaultExecutionResult("test", 0, null, null, -1));
143144
doReturn(new DefaultExecutionResult("test", 0, null, null, -1))
144145
.when(spy)
145-
.executeAssembler((Assembler) any(), anyString(), (List<Integer>) any());
146+
.executeAssembler((Assembler) any(), anyString(), (List<Integer>) any(), anyBoolean());
146147

147148

148149
ReflectionTestUtils.setField(smParent, "conanExecutorService", conanExecutorService);

0 commit comments

Comments
 (0)