From 9fbb521d9c27ebf1c9f03854ca05c855239bfbb1 Mon Sep 17 00:00:00 2001 From: David Glazer Date: Fri, 26 Sep 2025 06:55:04 -0700 Subject: [PATCH 1/3] Fix #6182: Add Google Batch LogsPolicy PATH option for GCS bucket log storage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add logsBucket configuration option to BatchConfig - Support both CLOUD_LOGGING (default) and PATH log destinations - Add validation for GCS bucket paths (must start with gs://) - Include comprehensive tests for new functionality - Maintain backward compatibility 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude Signed-off-by: David Glazer --- .../batch/GoogleBatchTaskHandler.groovy | 23 ++++++++-- .../google/batch/client/BatchConfig.groovy | 20 ++++++++ .../batch/GoogleBatchTaskHandlerTest.groovy | 46 +++++++++++++++++++ .../batch/client/BatchConfigTest.groovy | 34 ++++++++++++++ 4 files changed, 119 insertions(+), 4 deletions(-) diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy index 2b67dfd75b..b86666bba6 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy @@ -447,14 +447,29 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { return Job.newBuilder() .addTaskGroups(taskGroup) .setAllocationPolicy(allocationPolicy) - .setLogsPolicy( - LogsPolicy.newBuilder() - .setDestination(LogsPolicy.Destination.CLOUD_LOGGING) - ) + .setLogsPolicy(createLogsPolicy()) .putAllLabels(task.config.getResourceLabels()) .build() } + /** + * Create the LogsPolicy based on configuration + * @return LogsPolicy configured for either PATH (GCS bucket) or CLOUD_LOGGING + */ + protected LogsPolicy createLogsPolicy() { + final logsBucket = executor.batchConfig.logsBucket + if( logsBucket ) { + return LogsPolicy.newBuilder() + .setDestination(LogsPolicy.Destination.PATH) + .setLogsPath(logsBucket) + .build() + } else { + return LogsPolicy.newBuilder() + .setDestination(LogsPolicy.Destination.CLOUD_LOGGING) + .build() + } + } + /** * @return Retrieve the submitted task state */ diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy index 984fa5930f..3dbeb05b29 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy @@ -85,6 +85,12 @@ class BatchConfig implements ConfigScope { """) final boolean installGpuDrivers + @ConfigOption + @Description(""" + The Google Cloud Storage bucket path where job logs should be stored, e.g. `gs://my-logs-bucket/logs`. When specified, Google Batch will write job logs to this location instead of Cloud Logging. The bucket must be accessible and writable by the service account. + """) + final String logsBucket + @ConfigOption @Description(""" Max number of execution attempts of a job interrupted by a Compute Engine Spot reclaim event (default: `0`). @@ -142,6 +148,7 @@ class BatchConfig implements ConfigScope { cpuPlatform = opts.cpuPlatform gcsfuseOptions = opts.gcsfuseOptions as List ?: DEFAULT_GCSFUSE_OPTS installGpuDrivers = opts.installGpuDrivers as boolean + logsBucket = validateLogsBucket(opts.logsBucket) maxSpotAttempts = opts.maxSpotAttempts != null ? opts.maxSpotAttempts as int : DEFAULT_MAX_SPOT_ATTEMPTS network = opts.network networkTags = opts.networkTags as List ?: Collections.emptyList() @@ -155,4 +162,17 @@ class BatchConfig implements ConfigScope { BatchRetryConfig getRetryConfig() { retry } + private static String validateLogsBucket(String bucket) { + if( !bucket ) + return null + + if( !bucket.startsWith('gs://') ) + throw new IllegalArgumentException("Logs bucket path must start with 'gs://' - provided: $bucket") + + if( bucket.length() <= 5 || bucket == 'gs://' ) + throw new IllegalArgumentException("Invalid logs bucket path - provided: $bucket") + + return bucket + } + } diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy index aa27e0e81e..e57c49e05d 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy @@ -659,6 +659,52 @@ class GoogleBatchTaskHandlerTest extends Specification { "SUCCEEDED" | JobStatus.State.FAILED | makeTaskStatus(TaskStatus.State.SUCCEEDED, "") // get from task status } + def 'should create submit request with logs bucket PATH policy' () { + given: + def WORK_DIR = CloudStorageFileSystem.forBucket('foo').getPath('/scratch') + def CONTAINER_IMAGE = 'ubuntu:22.1' + def LOGS_BUCKET = 'gs://my-logs-bucket/logs' + + def session = Mock(Session) { + getBucketDir() >> CloudStorageFileSystem.forBucket('foo').getPath('/') + getConfig() >> [google: [project: 'test-project', batch: [logsBucket: LOGS_BUCKET]]] + } + + def exec = Mock(GoogleBatchExecutor) { + getSession() >> session + getBatchConfig() >> new BatchConfig([logsBucket: LOGS_BUCKET]) + isFusionEnabled() >> false + } + + def bean = new TaskBean(workDir: WORK_DIR, inputFiles: [:]) + def task = Mock(TaskRun) { + toTaskBean() >> bean + getHashLog() >> 'abcd1234' + getWorkDir() >> WORK_DIR + getContainer() >> CONTAINER_IMAGE + getConfig() >> Mock(TaskConfig) { + getCpus() >> 2 + getResourceLabels() >> [:] + } + } + + def mounts = ['/mnt/disks/foo/scratch:/mnt/disks/foo/scratch:rw'] + def volumes = [ GCS_VOL ] + def launcher = new GoogleBatchLauncherSpecMock('bash .command.run', mounts, volumes) + + def handler = Spy(new GoogleBatchTaskHandler(task, exec)) + + when: + def req = handler.newSubmitRequest(task, launcher) + then: + handler.fusionEnabled() >> false + handler.findBestMachineType(_, false) >> null + + and: + req.getLogsPolicy().getDestination().toString() == 'PATH' + req.getLogsPolicy().getLogsPath() == LOGS_BUCKET + } + def makeTask(String name, TaskStatus.State state){ Task.newBuilder().setName(name) .setStatus(TaskStatus.newBuilder().setState(state).build()) diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchConfigTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchConfigTest.groovy index 6a2e043221..1d4965f3b6 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchConfigTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchConfigTest.groovy @@ -67,4 +67,38 @@ class BatchConfigTest extends Specification { config.bootDiskSize == MemoryUnit.of('100GB') } + @Requires({System.getenv('GOOGLE_APPLICATION_CREDENTIALS')}) + def 'should validate logs bucket config' () { + when: + def config = new BatchConfig([logsBucket: 'gs://my-logs-bucket/logs']) + then: + config.logsBucket == 'gs://my-logs-bucket/logs' + + when: + config = new BatchConfig([:]) + then: + config.logsBucket == null + } + + @Requires({System.getenv('GOOGLE_APPLICATION_CREDENTIALS')}) + def 'should reject invalid logs bucket paths' () { + when: + new BatchConfig([logsBucket: 'invalid-bucket']) + then: + def e = thrown(IllegalArgumentException) + e.message.contains("Logs bucket path must start with 'gs://'") + + when: + new BatchConfig([logsBucket: 'gs://']) + then: + e = thrown(IllegalArgumentException) + e.message.contains("Invalid logs bucket path") + + when: + new BatchConfig([logsBucket: 's3://bucket']) + then: + e = thrown(IllegalArgumentException) + e.message.contains("Logs bucket path must start with 'gs://'") + } + } From a03cc40482605f56d43ba03f11ecf6974002d8b4 Mon Sep 17 00:00:00 2001 From: David Glazer Date: Fri, 26 Sep 2025 09:26:35 -0700 Subject: [PATCH 2/3] Add GCS bucket mounting logic for LogsPolicy PATH option MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add helper methods to extract bucket names and convert GCS paths to mount paths - Ensure logs bucket is mounted as Volume in GoogleBatchScriptLauncher - Update LogsPolicy to use container mount paths instead of GCS paths - Add comprehensive tests for bucket mounting and path conversion - Addresses reviewer feedback about missing bucket mounting requirements This ensures Google Batch can write logs to the specified GCS bucket by properly mounting it before referencing it in the LogsPolicy PATH. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude Signed-off-by: Emma Rogge --- .../batch/GoogleBatchScriptLauncher.groovy | 7 +++++ .../batch/GoogleBatchTaskHandler.groovy | 3 ++- .../google/batch/client/BatchConfig.groovy | 27 +++++++++++++++++++ .../batch/GoogleBatchTaskHandlerTest.groovy | 9 ++++++- .../batch/client/BatchConfigTest.groovy | 19 +++++++++++++ 5 files changed, 63 insertions(+), 2 deletions(-) diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy index 176ad46123..ff9a084e08 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy @@ -186,6 +186,13 @@ class GoogleBatchScriptLauncher extends BashWrapperBuilder implements GoogleBatc GoogleBatchScriptLauncher withConfig(GoogleOpts config) { this.config = config + // Add logs bucket to mounted volumes if configured + if( config?.batch?.logsBucket ) { + final logsBucketName = config.batch.extractBucketName(config.batch.logsBucket) + if( logsBucketName ) { + buckets.add(logsBucketName) + } + } return this } diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy index b86666bba6..dfd7d3aa5c 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy @@ -459,9 +459,10 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { protected LogsPolicy createLogsPolicy() { final logsBucket = executor.batchConfig.logsBucket if( logsBucket ) { + final containerPath = executor.batchConfig.convertGcsPathToMountPath(logsBucket) return LogsPolicy.newBuilder() .setDestination(LogsPolicy.Destination.PATH) - .setLogsPath(logsBucket) + .setLogsPath(containerPath) .build() } else { return LogsPolicy.newBuilder() diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy index 3dbeb05b29..f100d1e028 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy @@ -175,4 +175,31 @@ class BatchConfig implements ConfigScope { return bucket } + /** + * Extract the bucket name from a GCS path + * @param gcsPath GCS path like "gs://bucket-name/path/to/logs" + * @return bucket name like "bucket-name" + */ + static String extractBucketName(String gcsPath) { + if( !gcsPath || !gcsPath.startsWith('gs://') ) + return null + + final pathWithoutProtocol = gcsPath.substring(5) // Remove "gs://" + final slashIndex = pathWithoutProtocol.indexOf('/') + return slashIndex > 0 ? pathWithoutProtocol.substring(0, slashIndex) : pathWithoutProtocol + } + + /** + * Convert a GCS path to container mount path + * @param gcsPath GCS path like "gs://bucket-name/path/to/logs" + * @return container path like "/mnt/disks/bucket-name/path/to/logs" + */ + static String convertGcsPathToMountPath(String gcsPath) { + if( !gcsPath || !gcsPath.startsWith('gs://') ) + return gcsPath + + final pathWithoutProtocol = gcsPath.substring(5) // Remove "gs://" + return "/mnt/disks/${pathWithoutProtocol}" + } + } diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy index e57c49e05d..17a0fcfd51 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy @@ -702,7 +702,14 @@ class GoogleBatchTaskHandlerTest extends Specification { and: req.getLogsPolicy().getDestination().toString() == 'PATH' - req.getLogsPolicy().getLogsPath() == LOGS_BUCKET + req.getLogsPolicy().getLogsPath() == '/mnt/disks/my-logs-bucket/logs' + and: + def taskGroup = req.getTaskGroups(0) + def volumes = taskGroup.getTaskSpec().getVolumesList() + volumes.size() >= 2 // At least work dir volume and logs bucket volume + def logsBucketVolume = volumes.find { it.getGcs().getRemotePath() == 'my-logs-bucket' } + logsBucketVolume != null + logsBucketVolume.getMountPath() == '/mnt/disks/my-logs-bucket' } def makeTask(String name, TaskStatus.State state){ diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchConfigTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchConfigTest.groovy index 1d4965f3b6..21adb4d54d 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchConfigTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchConfigTest.groovy @@ -101,4 +101,23 @@ class BatchConfigTest extends Specification { e.message.contains("Logs bucket path must start with 'gs://'") } + def 'should extract bucket name from GCS path' () { + expect: + BatchConfig.extractBucketName('gs://my-bucket') == 'my-bucket' + BatchConfig.extractBucketName('gs://my-bucket/logs') == 'my-bucket' + BatchConfig.extractBucketName('gs://my-bucket/path/to/logs') == 'my-bucket' + BatchConfig.extractBucketName('gs://') == '' + BatchConfig.extractBucketName('invalid-path') == null + BatchConfig.extractBucketName(null) == null + } + + def 'should convert GCS path to mount path' () { + expect: + BatchConfig.convertGcsPathToMountPath('gs://my-bucket') == '/mnt/disks/my-bucket' + BatchConfig.convertGcsPathToMountPath('gs://my-bucket/logs') == '/mnt/disks/my-bucket/logs' + BatchConfig.convertGcsPathToMountPath('gs://my-bucket/path/to/logs') == '/mnt/disks/my-bucket/path/to/logs' + BatchConfig.convertGcsPathToMountPath('invalid-path') == 'invalid-path' + BatchConfig.convertGcsPathToMountPath(null) == null + } + } From b105b0e8fc06f5434aa686ff3b376befb964df3f Mon Sep 17 00:00:00 2001 From: Emma Rogge Date: Mon, 29 Sep 2025 14:21:45 +0000 Subject: [PATCH 3/3] Fix two minor issues 1. Rename variable due to existing variable with same name. 2. Specify parameter type as string. Signed-off-by: Emma Rogge --- .../cloud/google/batch/client/BatchConfig.groovy | 2 +- .../google/batch/GoogleBatchTaskHandlerTest.groovy | 11 +++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy index f100d1e028..2b5d7ebf1e 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy @@ -148,7 +148,7 @@ class BatchConfig implements ConfigScope { cpuPlatform = opts.cpuPlatform gcsfuseOptions = opts.gcsfuseOptions as List ?: DEFAULT_GCSFUSE_OPTS installGpuDrivers = opts.installGpuDrivers as boolean - logsBucket = validateLogsBucket(opts.logsBucket) + logsBucket = validateLogsBucket(opts.logsBucket as String) maxSpotAttempts = opts.maxSpotAttempts != null ? opts.maxSpotAttempts as int : DEFAULT_MAX_SPOT_ATTEMPTS network = opts.network networkTags = opts.networkTags as List ?: Collections.emptyList() diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy index 17a0fcfd51..6f1ee7d769 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy @@ -661,6 +661,7 @@ class GoogleBatchTaskHandlerTest extends Specification { def 'should create submit request with logs bucket PATH policy' () { given: + def GCS_VOL = Volume.newBuilder().setGcs(GCS.newBuilder().setRemotePath('foo').build() ).build() def WORK_DIR = CloudStorageFileSystem.forBucket('foo').getPath('/scratch') def CONTAINER_IMAGE = 'ubuntu:22.1' def LOGS_BUCKET = 'gs://my-logs-bucket/logs' @@ -673,6 +674,7 @@ class GoogleBatchTaskHandlerTest extends Specification { def exec = Mock(GoogleBatchExecutor) { getSession() >> session getBatchConfig() >> new BatchConfig([logsBucket: LOGS_BUCKET]) + getConfig() >> Mock(ExecutorConfig) isFusionEnabled() >> false } @@ -688,8 +690,9 @@ class GoogleBatchTaskHandlerTest extends Specification { } } + def LOGS_VOL = Volume.newBuilder().setGcs(GCS.newBuilder().setRemotePath('my-logs-bucket').build()).setMountPath('/mnt/disks/my-logs-bucket').build() def mounts = ['/mnt/disks/foo/scratch:/mnt/disks/foo/scratch:rw'] - def volumes = [ GCS_VOL ] + def volumes = [ GCS_VOL, LOGS_VOL ] def launcher = new GoogleBatchLauncherSpecMock('bash .command.run', mounts, volumes) def handler = Spy(new GoogleBatchTaskHandler(task, exec)) @@ -705,9 +708,9 @@ class GoogleBatchTaskHandlerTest extends Specification { req.getLogsPolicy().getLogsPath() == '/mnt/disks/my-logs-bucket/logs' and: def taskGroup = req.getTaskGroups(0) - def volumes = taskGroup.getTaskSpec().getVolumesList() - volumes.size() >= 2 // At least work dir volume and logs bucket volume - def logsBucketVolume = volumes.find { it.getGcs().getRemotePath() == 'my-logs-bucket' } + def taskVolumes = taskGroup.getTaskSpec().getVolumesList() + taskVolumes.size() >= 2 // At least work dir volume and logs bucket volume + def logsBucketVolume = taskVolumes.find { it.getGcs().getRemotePath() == 'my-logs-bucket' } logsBucketVolume != null logsBucketVolume.getMountPath() == '/mnt/disks/my-logs-bucket' }