Skip to content

Commit 35bb4b8

Browse files
dglazer and claude
authored and committed
Fix #6182: Add Google Batch LogsPolicy PATH option for GCS bucket log storage
- Add logsBucket configuration option to BatchConfig
- Support both CLOUD_LOGGING (default) and PATH log destinations
- Add validation for GCS bucket paths (must start with gs://)
- Include comprehensive tests for new functionality
- Maintain backward compatibility

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
Signed-off-by: Emma Rogge <[email protected]>
1 parent fd71d0e commit 35bb4b8

File tree

4 files changed

+119
-4
lines changed

4 files changed

+119
-4
lines changed

plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -447,14 +447,29 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
447447
return Job.newBuilder()
448448
.addTaskGroups(taskGroup)
449449
.setAllocationPolicy(allocationPolicy)
450-
.setLogsPolicy(
451-
LogsPolicy.newBuilder()
452-
.setDestination(LogsPolicy.Destination.CLOUD_LOGGING)
453-
)
450+
.setLogsPolicy(createLogsPolicy())
454451
.putAllLabels(task.config.getResourceLabels())
455452
.build()
456453
}
457454

455+
/**
456+
* Create the LogsPolicy based on configuration
457+
* @return LogsPolicy configured for either PATH (GCS bucket) or CLOUD_LOGGING
458+
*/
459+
protected LogsPolicy createLogsPolicy() {
460+
final logsBucket = executor.batchConfig.logsBucket
461+
if( logsBucket ) {
462+
return LogsPolicy.newBuilder()
463+
.setDestination(LogsPolicy.Destination.PATH)
464+
.setLogsPath(logsBucket)
465+
.build()
466+
} else {
467+
return LogsPolicy.newBuilder()
468+
.setDestination(LogsPolicy.Destination.CLOUD_LOGGING)
469+
.build()
470+
}
471+
}
472+
458473
/**
459474
* @return Retrieve the submitted task state
460475
*/

plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,12 @@ class BatchConfig implements ConfigScope {
8585
""")
8686
final boolean installGpuDrivers
8787

88+
@ConfigOption
89+
@Description("""
90+
The Google Cloud Storage bucket path where job logs should be stored, e.g. `gs://my-logs-bucket/logs`. When specified, Google Batch will write job logs to this location instead of Cloud Logging. The bucket must be accessible and writable by the service account.
91+
""")
92+
final String logsBucket
93+
8894
@ConfigOption
8995
@Description("""
9096
Max number of execution attempts of a job interrupted by a Compute Engine Spot reclaim event (default: `0`).
@@ -142,6 +148,7 @@ class BatchConfig implements ConfigScope {
142148
cpuPlatform = opts.cpuPlatform
143149
gcsfuseOptions = opts.gcsfuseOptions as List<String> ?: DEFAULT_GCSFUSE_OPTS
144150
installGpuDrivers = opts.installGpuDrivers as boolean
151+
logsBucket = validateLogsBucket(opts.logsBucket)
145152
maxSpotAttempts = opts.maxSpotAttempts != null ? opts.maxSpotAttempts as int : DEFAULT_MAX_SPOT_ATTEMPTS
146153
network = opts.network
147154
networkTags = opts.networkTags as List<String> ?: Collections.emptyList()
@@ -155,4 +162,17 @@ class BatchConfig implements ConfigScope {
155162

156163
BatchRetryConfig getRetryConfig() { retry }
157164

165+
private static String validateLogsBucket(String bucket) {
166+
if( !bucket )
167+
return null
168+
169+
if( !bucket.startsWith('gs://') )
170+
throw new IllegalArgumentException("Logs bucket path must start with 'gs://' - provided: $bucket")
171+
172+
if( bucket.length() <= 5 || bucket == 'gs://' )
173+
throw new IllegalArgumentException("Invalid logs bucket path - provided: $bucket")
174+
175+
return bucket
176+
}
177+
158178
}

plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,52 @@ class GoogleBatchTaskHandlerTest extends Specification {
659659
"SUCCEEDED" | JobStatus.State.FAILED | makeTaskStatus(TaskStatus.State.SUCCEEDED, "") // get from task status
660660
}
661661

662+
def 'should create submit request with logs bucket PATH policy' () {
663+
given:
664+
def WORK_DIR = CloudStorageFileSystem.forBucket('foo').getPath('/scratch')
665+
def CONTAINER_IMAGE = 'ubuntu:22.1'
666+
def LOGS_BUCKET = 'gs://my-logs-bucket/logs'
667+
668+
def session = Mock(Session) {
669+
getBucketDir() >> CloudStorageFileSystem.forBucket('foo').getPath('/')
670+
getConfig() >> [google: [project: 'test-project', batch: [logsBucket: LOGS_BUCKET]]]
671+
}
672+
673+
def exec = Mock(GoogleBatchExecutor) {
674+
getSession() >> session
675+
getBatchConfig() >> new BatchConfig([logsBucket: LOGS_BUCKET])
676+
isFusionEnabled() >> false
677+
}
678+
679+
def bean = new TaskBean(workDir: WORK_DIR, inputFiles: [:])
680+
def task = Mock(TaskRun) {
681+
toTaskBean() >> bean
682+
getHashLog() >> 'abcd1234'
683+
getWorkDir() >> WORK_DIR
684+
getContainer() >> CONTAINER_IMAGE
685+
getConfig() >> Mock(TaskConfig) {
686+
getCpus() >> 2
687+
getResourceLabels() >> [:]
688+
}
689+
}
690+
691+
def mounts = ['/mnt/disks/foo/scratch:/mnt/disks/foo/scratch:rw']
692+
def volumes = [ GCS_VOL ]
693+
def launcher = new GoogleBatchLauncherSpecMock('bash .command.run', mounts, volumes)
694+
695+
def handler = Spy(new GoogleBatchTaskHandler(task, exec))
696+
697+
when:
698+
def req = handler.newSubmitRequest(task, launcher)
699+
then:
700+
handler.fusionEnabled() >> false
701+
handler.findBestMachineType(_, false) >> null
702+
703+
and:
704+
req.getLogsPolicy().getDestination().toString() == 'PATH'
705+
req.getLogsPolicy().getLogsPath() == LOGS_BUCKET
706+
}
707+
662708
def makeTask(String name, TaskStatus.State state){
663709
Task.newBuilder().setName(name)
664710
.setStatus(TaskStatus.newBuilder().setState(state).build())

plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchConfigTest.groovy

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,38 @@ class BatchConfigTest extends Specification {
6767
config.bootDiskSize == MemoryUnit.of('100GB')
6868
}
6969

70+
@Requires({System.getenv('GOOGLE_APPLICATION_CREDENTIALS')})
71+
def 'should validate logs bucket config' () {
72+
when:
73+
def config = new BatchConfig([logsBucket: 'gs://my-logs-bucket/logs'])
74+
then:
75+
config.logsBucket == 'gs://my-logs-bucket/logs'
76+
77+
when:
78+
config = new BatchConfig([:])
79+
then:
80+
config.logsBucket == null
81+
}
82+
83+
@Requires({System.getenv('GOOGLE_APPLICATION_CREDENTIALS')})
84+
def 'should reject invalid logs bucket paths' () {
85+
when:
86+
new BatchConfig([logsBucket: 'invalid-bucket'])
87+
then:
88+
def e = thrown(IllegalArgumentException)
89+
e.message.contains("Logs bucket path must start with 'gs://'")
90+
91+
when:
92+
new BatchConfig([logsBucket: 'gs://'])
93+
then:
94+
e = thrown(IllegalArgumentException)
95+
e.message.contains("Invalid logs bucket path")
96+
97+
when:
98+
new BatchConfig([logsBucket: 's3://bucket'])
99+
then:
100+
e = thrown(IllegalArgumentException)
101+
e.message.contains("Logs bucket path must start with 'gs://'")
102+
}
103+
70104
}

0 commit comments

Comments
 (0)