From dee69a8e44bea1aa466556cc71ab3af0d3b7acea Mon Sep 17 00:00:00 2001 From: Tomiles <116039+tomiles@users.noreply.github.com> Date: Thu, 2 Apr 2026 22:49:57 +0200 Subject: [PATCH 1/6] feat(nf-google): add google.batch stage-in/out copy transports (posix, gcloud, gsutil) Signed-off-by: Tomiles <116039+tomiles@users.noreply.github.com> --- .../google/batch/GoogleBatchBashLib.groovy | 269 ++++++++++++++++++ .../batch/GoogleBatchFileCopyStrategy.groovy | 170 +++++++++++ .../batch/GoogleBatchScriptLauncher.groovy | 15 +- .../batch/GoogleBatchTaskHandler.groovy | 2 +- .../google/batch/client/BatchConfig.groovy | 92 ++++++ .../GoogleBatchFileCopyStrategyTest.groovy | 127 +++++++++ .../batch/client/BatchConfigTest.groovy | 41 ++- 7 files changed, 711 insertions(+), 5 deletions(-) create mode 100644 plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchBashLib.groovy create mode 100644 plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategy.groovy create mode 100644 plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategyTest.groovy diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchBashLib.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchBashLib.groovy new file mode 100644 index 0000000000..619998a082 --- /dev/null +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchBashLib.groovy @@ -0,0 +1,269 @@ +/* + * Copyright 2013-2026, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.cloud.google.batch + +import groovy.transform.CompileStatic +import groovy.transform.Memoized +import nextflow.cloud.google.batch.client.BatchConfig +import nextflow.executor.BashFunLib +import nextflow.util.Escape + +/** + * Bash helpers for Google Cloud Storage transfers in Google Batch tasks. + * Order within each transport is controlled by {@code NXF_STAGE_IN_COPY_TRANSPORT} / {@code NXF_STAGE_OUT_COPY_TRANSPORT}. + * + * @author Paolo Di Tommaso + */ +@CompileStatic +class GoogleBatchBashLib extends BashFunLib { + + private String gcloudCli = 'gcloud' + + private String gsutilCli = 'gsutil' + + private BatchConfig batchConfig + + GoogleBatchBashLib withGcloudCli(String value) { + if( value ) + this.gcloudCli = value + return this + } + + GoogleBatchBashLib withGsutilCli(String value) { + if( value ) + this.gsutilCli = value + return this + } + + GoogleBatchBashLib withBatchConfig(BatchConfig config) { + this.batchConfig = config + return this + } + + protected String gsLib() { + final gcloud = Escape.path(gcloudCli ?: 'gcloud') + final gsutil = Escape.path(gsutilCli ?: 'gsutil') + final mountRoot = GoogleBatchScriptLauncher.MOUNT_ROOT + """ + export NXF_GCLOUD=$gcloud + export NXF_GSUTIL=$gsutil + export NXF_GS_MOUNT_ROOT="$mountRoot" + + nxf_gs_uri_to_mount() { + case "\$1" in + gs://*) + local p=\${1#gs://} + local bucket=\${p%%/*} + if [[ "\$bucket" == "\$p" ]]; then + echo "\$NXF_GS_MOUNT_ROOT/\$bucket" + else + local rest=\${p#*/} + echo "\$NXF_GS_MOUNT_ROOT/\$bucket/\$rest" + fi + ;; + *) + echo "" + ;; + esac + } + + nxf_gs_download_gcloud() { + command -v "\$NXF_GCLOUD" >/dev/null 2>&1 || return 1 + local source=\$1 + local target=\$2 + local basedir=\$(dirname "\$2") + mkdir -p "\$basedir" + if \$NXF_GCLOUD storage cp "\$source" "\$target" 2>/dev/null; then + return 0 + fi + mkdir -p "\$target" + if \$NXF_GCLOUD storage cp --recursive "\$source" "\$target" >/dev/null 2>&1; then + return 0 + fi + rm -rf "\$target" + return 1 + } + + nxf_gs_download_gsutil() { + command -v "\$NXF_GSUTIL" >/dev/null 2>&1 || return 1 + local source=\$1 + local target=\$2 + local basedir=\$(dirname "\$2") + mkdir -p "\$basedir" + local ret + ret=\$(\$NXF_GSUTIL cp "\$source" "\$target" 2>&1) || { + mkdir -p "\$target" + \$NXF_GSUTIL -m cp -r "\$source" "\$target" >/dev/null || { + rm -rf "\$target" + return 1 + } + } + return 0 + } + + nxf_gs_download_mount() { + local source=\$1 + local target=\$2 + local ms + ms=\$(nxf_gs_uri_to_mount "\$source") + [[ -n "\$ms" ]] || return 1 + [[ -e "\$ms" || -d "\$ms" ]] || return 1 + mkdir -p "\$(dirname "\$target")" + cp -fRL "\$ms" "\$target" + } + + nxf_gs_download() { + local source=\$1 + local target=\$2 + case "\${NXF_STAGE_IN_COPY_TRANSPORT:-posix}" in + gsutil) + if nxf_gs_download_gsutil "\$source" "\$target"; then return 0; fi + if nxf_gs_download_gcloud "\$source" "\$target"; then return 0; fi + if nxf_gs_download_mount "\$source" "\$target"; then return 0; fi + ;; + gcloud) + if nxf_gs_download_gcloud "\$source" "\$target"; then return 0; fi + if nxf_gs_download_gsutil "\$source" "\$target"; then return 0; fi + if nxf_gs_download_mount "\$source" "\$target"; then return 0; fi + ;; + posix|*) + if nxf_gs_download_mount "\$source" "\$target"; then return 0; fi + ;; + esac + >&2 echo "Unable to download path: \$source" + exit 1 + } + + nxf_gs_upload_try_gcloud() { + command -v "\$NXF_GCLOUD" >/dev/null 2>&1 || return 1 + local name=\$1 + local gspath=\$2 + if [[ "\$name" == '-' ]]; then + return 1 + fi + if [[ -d "\$name" ]]; then + \$NXF_GCLOUD storage cp --recursive "\$name" "\$gspath/\$name" + else + \$NXF_GCLOUD storage cp "\$name" "\$gspath/\$name" + fi + } + + nxf_gs_upload_try_gsutil() { + command -v "\$NXF_GSUTIL" >/dev/null 2>&1 || return 1 + local name=\$1 + local gspath=\$2 + if [[ "\$name" == '-' ]]; then + return 1 + fi + if [[ -d "\$name" ]]; then + \$NXF_GSUTIL -m cp -r "\$name" "\$gspath/\$name" + else + \$NXF_GSUTIL cp "\$name" "\$gspath/\$name" + fi + } + + nxf_gs_upload_try_mount() { + local name=\$1 + local gspath=\$2 + if [[ "\$name" == '-' ]]; then + return 1 + fi + local dest + dest=\$(nxf_gs_uri_to_mount "\$gspath/\$name") + [[ -n "\$dest" ]] || return 1 + mkdir -p "\$(dirname "\$dest")" + cp -fRL "\$name" "\$dest" + } + + nxf_gs_upload() { + local name=\$1 + local gspath=\${2%/} + local move=\${3:-0} + if [[ "\$name" == '-' ]]; then + if command -v "\$NXF_GCLOUD" >/dev/null 2>&1 && \$NXF_GCLOUD storage cp - "\$gspath"; then + : + elif command -v "\$NXF_GSUTIL" >/dev/null 2>&1 && \$NXF_GSUTIL cp - "\$gspath"; then + : + else + return 1 + fi + else + case "\${NXF_STAGE_OUT_COPY_TRANSPORT:-posix}" in + gsutil) + if nxf_gs_upload_try_gsutil "\$name" "\$gspath"; then :; + elif nxf_gs_upload_try_gcloud "\$name" "\$gspath"; then :; + elif nxf_gs_upload_try_mount "\$name" "\$gspath"; then :; + else + >&2 echo "Unable to upload path: \$name" + return 1 + fi + ;; + gcloud) + if nxf_gs_upload_try_gcloud "\$name" "\$gspath"; then :; + elif nxf_gs_upload_try_gsutil "\$name" "\$gspath"; then :; + elif nxf_gs_upload_try_mount "\$name" "\$gspath"; then :; + else + >&2 echo "Unable to upload path: \$name" + return 1 + fi + ;; + posix|*) + if nxf_gs_upload_try_mount "\$name" "\$gspath"; then :; + else + >&2 echo "Unable to upload path: \$name" + return 1 + fi + ;; + esac + fi + if [[ "\$move" == "1" && "\$name" != '-' ]]; then + rm -rf "\$name" + fi + } + """.stripIndent(true) + } + + protected String transportExports() { + final cfg = batchConfig + if( !cfg ) + return '' + final inTr = cfg.stageInCopyTransport ?: BatchConfig.COPY_TRANSPORT_POSIX + final outTr = cfg.stageOutCopyTransport ?: BatchConfig.COPY_TRANSPORT_POSIX + """ + export NXF_STAGE_IN_COPY_TRANSPORT=${Escape.path(inTr)} + export NXF_STAGE_OUT_COPY_TRANSPORT=${Escape.path(outTr)} + """.stripIndent(true) + } + + @Override + String render() { + super.render() + transportExports() + gsLib() + } + + @Memoized + static String script(BatchConfig config) { + new GoogleBatchBashLib() + .includeCoreFun(true) + .withMaxParallelTransfers(config.maxParallelTransfers) + .withMaxTransferAttempts(config.maxTransferAttempts) + .withDelayBetweenAttempts(config.delayBetweenAttempts) + .withGcloudCli(config.gcloudCli) + .withGsutilCli(config.gsutilCli) + .withBatchConfig(config) + .render() + } +} diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategy.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategy.groovy new file mode 100644 index 0000000000..891355d44a --- /dev/null +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategy.groovy @@ -0,0 +1,170 @@ +/* + * Copyright 2013-2026, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.cloud.google.batch + +import java.nio.file.Path + +import com.google.cloud.storage.contrib.nio.CloudStoragePath +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import nextflow.cloud.google.batch.client.BatchConfig +import nextflow.executor.SimpleFileCopyStrategy +import nextflow.extension.FilesEx +import nextflow.processor.TaskBean +import nextflow.util.Escape + +/** + * Optional Google Batch staging when {@link BatchConfig#usesGoogleBatchStaging()} is true, honouring + * {@link BatchConfig#stageInCopyTransport} / {@link BatchConfig#stageOutCopyTransport} for {@code copy} modes. + * + * @author Paolo Di Tommaso + */ +@Slf4j +@CompileStatic +class GoogleBatchFileCopyStrategy extends SimpleFileCopyStrategy { + + private final BatchConfig batchConfig + + GoogleBatchFileCopyStrategy(TaskBean bean, BatchConfig batchConfig) { + super(bean) + this.batchConfig = batchConfig + } + + /** + * CLI stage-in when a CLI transport is selected and {@code stageInMode=copy}. + */ + private boolean useCliStageInCopy() { + return BatchConfig.isCliCopyTransport(batchConfig.stageInCopyTransport) && 'copy' == stageinMode + } + + private String effectiveStageOutMode() { + stageoutMode ?: ( workDir==targetDir ? 'copy' : 'move' ) + } + + private boolean shouldUseCliStageOutCopy() { + if( effectiveStageOutMode() != 'copy' ) + return false + return BatchConfig.isCliCopyTransport(batchConfig.stageOutCopyTransport) + } + + private boolean needsGoogleBatchBashLib() { + useCliStageInCopy() || shouldUseCliStageOutCopy() + } + + @Override + String getBeforeStartScript() { + final base = super.getBeforeStartScript() + if( !needsGoogleBatchBashLib() ) + return base + final gs = GoogleBatchBashLib.script(batchConfig) + return gs + (base ? '\n' + base : '') + } + + @Override + String getStageInputFilesScript(Map inputFiles) { + if( !useCliStageInCopy() ) + return super.getStageInputFilesScript(inputFiles) + def result = 'downloads=(true)\n' + result += super.getStageInputFilesScript(inputFiles) + '\n' + result += 'nxf_parallel "${downloads[@]}"\n' + return result + } + + @Override + String stageInputFile(Path path, String targetName) { + if( !useCliStageInCopy() ) + return super.stageInputFile(path, targetName) + final gsUri = gsUriForCliStageIn(path) + if( gsUri != null ) { + final cmd = batchConfig.maxTransferAttempts > 1 + ? "downloads+=(\"nxf_cp_retry nxf_gs_download '${gsUri}' ${Escape.path(targetName)}\")" + : "downloads+=(\"nxf_gs_download '${gsUri}' ${Escape.path(targetName)}\")" + return cmd + } + return super.stageInputFile(path, targetName) + } + + /** + * Resolve a {@code gs://} URI for CLI stage-in: from {@link CloudStoragePath}, or from a container fuse path under {@link GoogleBatchScriptLauncher#MOUNT_ROOT}. + */ + private static String gsUriForCliStageIn(Path path) { + if( path instanceof CloudStoragePath ) + return FilesEx.toUriString((CloudStoragePath)path) + final s = path.toString() + final prefix = GoogleBatchScriptLauncher.MOUNT_ROOT + '/' + if( s.startsWith(prefix) ) + return toGsUriFromContainerMount(path) + return null + } + + @Override + String getUnstageOutputFilesScript(List outputFiles, Path targetDir) { + final mode = effectiveStageOutMode() + if( mode == 'move' ) + return super.getUnstageOutputFilesScript(outputFiles, targetDir) + if( mode == 'copy' && shouldUseCliStageOutCopy() ) + return getUnstageOutputFilesScriptGcloud(outputFiles, targetDir) + return super.getUnstageOutputFilesScript(outputFiles, targetDir) + } + + private String getUnstageOutputFilesScriptGcloud(List outputFiles, Path targetDir) { + final patterns = normalizeGlobStarPaths(outputFiles) + log.trace "[GOOGLE BATCH] Unstaging file path (CLI transport): $patterns" + + if( !patterns ) + return null + + final gsTarget = toGsUriFromContainerMount(targetDir) + final escape = new ArrayList(outputFiles.size()) + for( String it : patterns ) + escape.add( Escape.path(it) ) + + return """\ + uploads=() + IFS=\$'\\n' + for name in \$(eval "ls -1d ${escape.join(' ')}" | sort | uniq); do + uploads+=("nxf_gs_upload '\$name' '${gsTarget}' 0") + done + unset IFS + nxf_parallel "\${uploads[@]}" + """.stripIndent(true) + } + + @Override + String fileStr(Path path) { + !useCliStageInCopy() ? super.fileStr(path) : Escape.path(path.getFileName()) + } + + @Override + String pipeInputFile(Path file) { + !useCliStageInCopy() ? super.pipeInputFile(file) : " < ${Escape.path(file.getFileName())}" + } + + static String toGsUriFromContainerMount(Path containerPath) { + final s = containerPath.toString() + final prefix = GoogleBatchScriptLauncher.MOUNT_ROOT + '/' + if( !s.startsWith(prefix) ) + throw new IllegalArgumentException("Expected path under ${GoogleBatchScriptLauncher.MOUNT_ROOT}, got: $s") + final rest = s.substring(prefix.length()) + final slash = rest.indexOf('/') + if( slash < 0 ) + return "gs://$rest/" + final bucket = rest.substring(0, slash) + final obj = rest.substring(slash) + return "gs://$bucket$obj" + } +} diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy index 9db0bf9fb2..ff9b862f41 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy @@ -25,7 +25,10 @@ import com.google.cloud.storage.contrib.nio.CloudStoragePath import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.cloud.google.GoogleOpts +import nextflow.cloud.google.batch.client.BatchConfig import nextflow.executor.BashWrapperBuilder +import nextflow.executor.ScriptFileCopyStrategy +import nextflow.executor.SimpleFileCopyStrategy import nextflow.extension.FilesEx import nextflow.processor.TaskBean import nextflow.processor.TaskRun @@ -42,7 +45,7 @@ import nextflow.util.TestOnly @CompileStatic class GoogleBatchScriptLauncher extends BashWrapperBuilder implements GoogleBatchLauncherSpec { - private static final String MOUNT_ROOT = '/mnt/disks' + public static final String MOUNT_ROOT = '/mnt/disks' private GoogleOpts config private CloudStoragePath remoteWorkDir @@ -54,8 +57,8 @@ class GoogleBatchScriptLauncher extends BashWrapperBuilder implements GoogleBatc @TestOnly protected GoogleBatchScriptLauncher() {} - GoogleBatchScriptLauncher(TaskBean bean, Path remoteBinDir) { - super(bean) + GoogleBatchScriptLauncher(TaskBean bean, Path remoteBinDir, BatchConfig batchConfig) { + super(bean, copyStrategyFor(bean, batchConfig)) // keep track the google storage work dir this.remoteWorkDir = (CloudStoragePath) bean.workDir this.remoteBinDir = toContainerMount(remoteBinDir) @@ -101,6 +104,12 @@ class GoogleBatchScriptLauncher extends BashWrapperBuilder implements GoogleBatc scratch = true } + private static ScriptFileCopyStrategy copyStrategyFor(TaskBean bean, BatchConfig batchConfig) { + batchConfig.usesGoogleBatchStaging() + ? new GoogleBatchFileCopyStrategy(bean, batchConfig) + : new SimpleFileCopyStrategy(bean) + } + protected String headerScript(TaskBean bean) { def result = "NXF_CHDIR=${Escape.path(bean.workDir)}\n" if( remoteBinDir ) { diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy index 792ec949d5..656f875955 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy @@ -169,7 +169,7 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { } else { final taskBean = task.toTaskBean() - return new GoogleBatchScriptLauncher(taskBean, executor.remoteBinDir) + return new GoogleBatchScriptLauncher(taskBean, executor.remoteBinDir, batchConfig) .withConfig(executor.googleOpts) .withIsArray(task.isArray()) } diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy index b3d28343fa..3f729d2b70 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy @@ -20,9 +20,11 @@ import java.nio.file.Path import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import nextflow.cloud.CloudTransferOptions import nextflow.config.spec.ConfigOption import nextflow.config.spec.ConfigScope import nextflow.script.dsl.Description +import nextflow.util.Duration import nextflow.util.MemoryUnit /** * Model Google Batch config settings @@ -33,6 +35,23 @@ import nextflow.util.MemoryUnit @CompileStatic class BatchConfig implements ConfigScope { + /** + * Copy via POSIX {@code cp}/links from gcsfuse paths under {@code /mnt/disks/...}. + */ + static final String COPY_TRANSPORT_POSIX = 'posix' + + /** + * Copy using {@code gcloud storage} first, then {@code gsutil}, then POSIX from the mount (see {@link nextflow.cloud.google.batch.GoogleBatchBashLib}). + */ + static final String COPY_TRANSPORT_GCLOUD = 'gcloud' + + /** + * Copy using {@code gsutil} first, then {@code gcloud storage}, then POSIX from the mount. + */ + static final String COPY_TRANSPORT_GSUTIL = 'gsutil' + + static final private List VALID_COPY_TRANSPORTS = List.of(COPY_TRANSPORT_POSIX, COPY_TRANSPORT_GCLOUD, COPY_TRANSPORT_GSUTIL) + static final private int DEFAULT_MAX_SPOT_ATTEMPTS = 0 static final private List DEFAULT_RETRY_LIST = List.of(50001) @@ -79,6 +98,52 @@ class BatchConfig implements ConfigScope { """) final List gcsfuseOptions + @ConfigOption + @Description(""" + Path to the `gcloud` executable for `gcloud storage` transfers (default: `gcloud` on `PATH`). + """) + final String gcloudCli + + @ConfigOption + @Description(""" + Path to the `gsutil` executable (default: `gsutil` on `PATH`). + """) + final String gsutilCli + + @ConfigOption + @Description(""" + Maximum parallel object-storage transfers when using `gcloud` / `gsutil` staging (default: same as other cloud executors). + """) + final int maxParallelTransfers + + @ConfigOption + @Description(""" + Maximum retry attempts for each `gcloud` / `gsutil` transfer when using CLI-based staging. + """) + final int maxTransferAttempts + + @ConfigOption + @Description(""" + Delay between retry attempts for `gcloud` / `gsutil` transfers when using CLI-based staging. + """) + final Duration delayBetweenAttempts + + @ConfigOption + @Description(""" + When neither this nor `stageOutCopyTransport` is `gcloud` or `gsutil` (including when both are unset or only `posix`), the default is POSIX staging via {@link nextflow.executor.SimpleFileCopyStrategy} with gcsfuse mounts under `/mnt/disks`. + + When set to `posix`, `gcloud`, or `gsutil`, it participates in selecting **input** copy behaviour (with `stageOutCopyTransport`). `posix` uses `cp`/links from the mount. `gcloud` / `gsutil` use the respective CLI (with fallbacks) when `process stageInMode` is `copy` (paths are under the gcsfuse mount; `gs://` URIs are derived for CLI copy). Other `stageInMode` values use the mount. Buckets remain mounted for POSIX fallback. + """) + final String stageInCopyTransport + + @ConfigOption + @Description(""" + When neither this nor `stageInCopyTransport` is `gcloud` or `gsutil` (including when both are unset or only `posix`), the default is POSIX staging via {@link nextflow.executor.SimpleFileCopyStrategy} with gcsfuse mounts only. + + When set to `posix`, `gcloud`, or `gsutil`, it participates in selecting **output** copy behaviour (with `stageInCopyTransport`): `posix` uses POSIX from the gcsfuse mount; `gcloud` / `gsutil` use CLIs when the effective `stageOutMode` is `copy`. `move`, `rsync`, `rclone`, and `fcp` use existing wrapper behaviour (POSIX via mount). CLI-based `move` staging is not implemented. + """) + final String stageOutCopyTransport + @ConfigOption @Description(""" """) @@ -152,6 +217,13 @@ class BatchConfig implements ConfigScope { bootDiskSize = opts.bootDiskSize as MemoryUnit cpuPlatform = opts.cpuPlatform gcsfuseOptions = opts.gcsfuseOptions as List ?: DEFAULT_GCSFUSE_OPTS + gcloudCli = opts.gcloudCli as String + gsutilCli = opts.gsutilCli as String + maxParallelTransfers = opts.maxParallelTransfers != null ? opts.maxParallelTransfers as int : CloudTransferOptions.MAX_TRANSFER + maxTransferAttempts = opts.maxTransferAttempts != null ? opts.maxTransferAttempts as int : CloudTransferOptions.MAX_TRANSFER_ATTEMPTS + delayBetweenAttempts = opts.delayBetweenAttempts ? opts.delayBetweenAttempts as Duration : CloudTransferOptions.DEFAULT_DELAY_BETWEEN_ATTEMPTS + stageInCopyTransport = normaliseOptionalCopyTransport(opts.stageInCopyTransport) + stageOutCopyTransport = normaliseOptionalCopyTransport(opts.stageOutCopyTransport) installGpuDrivers = opts.installGpuDrivers as boolean installOpsAgent = opts.installOpsAgent as boolean logsPath = opts.logsPath @@ -166,6 +238,26 @@ class BatchConfig implements ConfigScope { usePrivateAddress = opts.usePrivateAddress as boolean } + /** + * Load {@link nextflow.cloud.google.batch.GoogleBatchFileCopyStrategy} when any transport requests CLI object-storage copy. + */ + boolean usesGoogleBatchStaging() { + return isCliCopyTransport(stageInCopyTransport) || isCliCopyTransport(stageOutCopyTransport) + } + + static boolean isCliCopyTransport(String transport) { + return COPY_TRANSPORT_GCLOUD == transport || COPY_TRANSPORT_GSUTIL == transport + } + + private static String normaliseOptionalCopyTransport(Object value) { + if( value == null ) + return null + final t = value as String + if( !VALID_COPY_TRANSPORTS.contains(t) ) + throw new IllegalArgumentException("Invalid google.batch copy transport: '$t' — valid values are: ${VALID_COPY_TRANSPORTS.join(', ')}") + return t + } + BatchRetryConfig getRetryConfig() { retry } Path logsPath() { diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategyTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategyTest.groovy new file mode 100644 index 0000000000..361a10a208 --- /dev/null +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategyTest.groovy @@ -0,0 +1,127 @@ +/* + * Copyright 2013-2026, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.cloud.google.batch + +import java.nio.file.Paths + +import nextflow.cloud.google.batch.client.BatchConfig +import nextflow.processor.TaskBean +import spock.lang.Specification + +class GoogleBatchFileCopyStrategyTest extends Specification { + + def 'should map container mount path to gs uri' () { + expect: + GoogleBatchFileCopyStrategy.toGsUriFromContainerMount(Paths.get('/mnt/disks/mybucket/work/dir')) == 'gs://mybucket/work/dir' + GoogleBatchFileCopyStrategy.toGsUriFromContainerMount(Paths.get('/mnt/disks/onlybucket')) == 'gs://onlybucket/' + } + + def 'should build gs download staging line when gcloud copy transport' () { + given: + def batch = new BatchConfig([ + stageInCopyTransport: 'gcloud' + ]) + def bean = Mock(TaskBean) { + getWorkDir() >> Paths.get('/mnt/disks/w/x') + getTargetDir() >> Paths.get('/mnt/disks/w/x') + getStageInMode() >> 'copy' + } + def path = Paths.get('/mnt/disks/b/data/in.txt') + def copy = new GoogleBatchFileCopyStrategy(bean, batch) + + expect: + copy.stageInputFile(path, 'in.txt') == 'downloads+=("nxf_gs_download \'gs://b/data/in.txt\' in.txt")' + } + + def 'should build gs upload unstage script' () { + given: + def batch = new BatchConfig([stageOutCopyTransport: 'gcloud']) + def wd = Paths.get('/mnt/disks/foo/wd') + def bean = Mock(TaskBean) { + getWorkDir() >> wd + getTargetDir() >> wd + getStageOutMode() >> null + } + def copy = new GoogleBatchFileCopyStrategy(bean, batch) + + when: + def script = copy.getUnstageOutputFilesScript(['out.txt'], wd) + then: + script.trim() == ''' + uploads=() + IFS=$'\\n' + for name in $(eval "ls -1d out.txt" | sort | uniq); do + uploads+=("nxf_gs_upload '$name' 'gs://foo/wd' 0") + done + unset IFS + nxf_parallel "${uploads[@]}" + ''' + .stripIndent().trim() + } + + def 'should use posix symlink when stageIn is not cli copy' () { + given: + def batch = new BatchConfig([stageOutCopyTransport: 'gcloud']) + def wd = Paths.get('/mnt/disks/foo/wd') + def bean = Mock(TaskBean) { + getWorkDir() >> wd + getTargetDir() >> wd + getStageInMode() >> 'symlink' + } + def path = Paths.get('/mnt/disks/foo/bucket/data.txt') + def copy = new GoogleBatchFileCopyStrategy(bean, batch) + + expect: + copy.stageInputFile(path, 'data.txt') == 'ln -s /mnt/disks/foo/bucket/data.txt data.txt' + } + + def 'should use posix move when stageOutMode is move' () { + given: + def batch = new BatchConfig([stageOutCopyTransport: 'gcloud']) + def wd = Paths.get('/mnt/disks/b/work') + def td = Paths.get('/mnt/disks/b/out') + def bean = Mock(TaskBean) { + getWorkDir() >> wd + getTargetDir() >> td + getStageOutMode() >> 'move' + } + def copy = new GoogleBatchFileCopyStrategy(bean, batch) + + when: + def script = copy.getUnstageOutputFilesScript(['x.txt'], td) + then: + script.contains('nxf_fs_move') + script.contains('/mnt/disks/b/out') + } + + def 'should use posix copy when stageOutCopyTransport is posix' () { + given: + def batch = new BatchConfig([stageOutCopyTransport: 'posix']) + def wd = Paths.get('/mnt/disks/b/work') + def bean = Mock(TaskBean) { + getWorkDir() >> wd + getTargetDir() >> wd + getStageOutMode() >> 'copy' + } + def copy = new GoogleBatchFileCopyStrategy(bean, batch) + + when: + def script = copy.getUnstageOutputFilesScript(['out.txt'], wd) + then: + script.contains('nxf_fs_copy') + } +} diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchConfigTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchConfigTest.groovy index 13e2f7e06f..6810d11c12 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchConfigTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchConfigTest.groovy @@ -16,6 +16,8 @@ package nextflow.cloud.google.batch.client +import nextflow.cloud.CloudTransferOptions +import nextflow.util.Duration import nextflow.util.MemoryUnit import spock.lang.Specification /** @@ -37,6 +39,30 @@ class BatchConfigTest extends Specification { !config.bootDiskImage !config.bootDiskSize !config.logsPath + and: + config.maxParallelTransfers == CloudTransferOptions.MAX_TRANSFER + config.maxTransferAttempts == CloudTransferOptions.MAX_TRANSFER_ATTEMPTS + config.delayBetweenAttempts == CloudTransferOptions.DEFAULT_DELAY_BETWEEN_ATTEMPTS + !config.gcloudCli + !config.gsutilCli + and: + !config.stageInCopyTransport + !config.stageOutCopyTransport + !config.usesGoogleBatchStaging() + } + + def 'should reject invalid copy transport' () { + when: + new BatchConfig([stageInCopyTransport: 'ftp']) + then: + thrown(IllegalArgumentException) + } + + def 'should detect google batch staging when cli transport set' () { + expect: + new BatchConfig([stageOutCopyTransport: 'gcloud']).usesGoogleBatchStaging() + new BatchConfig([stageInCopyTransport: 'gsutil']).usesGoogleBatchStaging() + !new BatchConfig([stageInCopyTransport: 'posix']).usesGoogleBatchStaging() } def 'should create batch config with custom settings' () { @@ -49,7 +75,13 @@ class BatchConfigTest extends Specification { bootDiskImage: 'batch-foo', bootDiskSize: '100GB', logsPath: 'gs://my-logs-bucket/logs', - installOpsAgent: true + installOpsAgent: true, + stageInCopyTransport: 'gcloud', + stageOutCopyTransport: 'posix', + gcloudCli: '/opt/google/gcloud', + maxParallelTransfers: 8, + maxTransferAttempts: 3, + delayBetweenAttempts: '5s' ] when: @@ -67,6 +99,13 @@ class BatchConfigTest extends Specification { config.logsPath == 'gs://my-logs-bucket/logs' and: config.installOpsAgent == true + and: + config.stageInCopyTransport == 'gcloud' + config.stageOutCopyTransport == 'posix' + config.gcloudCli == '/opt/google/gcloud' + config.maxParallelTransfers == 8 + config.maxTransferAttempts == 3 + config.delayBetweenAttempts == Duration.of('5s') } } From 2dc467241ad34df8933c523652d4945682384272 Mon Sep 17 00:00:00 2001 From: Tomiles <116039+tomiles@users.noreply.github.com> Date: Thu, 2 Apr 2026 23:21:19 +0200 Subject: [PATCH 2/6] feat(nf-google): some optimizations based on review Signed-off-by: Tomiles <116039+tomiles@users.noreply.github.com> --- .../google/batch/GoogleBatchBashLib.groovy | 82 ++++++------------- .../batch/GoogleBatchFileCopyStrategy.groovy | 15 ++-- .../google/batch/client/BatchConfig.groovy | 54 ++++++------ .../GoogleBatchFileCopyStrategyTest.groovy | 4 +- 4 files changed, 59 insertions(+), 96 deletions(-) diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchBashLib.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchBashLib.groovy index 619998a082..01bf04943f 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchBashLib.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchBashLib.groovy @@ -123,7 +123,8 @@ class GoogleBatchBashLib extends BashFunLib { [[ -n "\$ms" ]] || return 1 [[ -e "\$ms" || -d "\$ms" ]] || return 1 mkdir -p "\$(dirname "\$target")" - cp -fRL "\$ms" "\$target" + cp -fRL "\$ms" "\$target" || return 1 + return 0 } nxf_gs_download() { @@ -145,20 +146,17 @@ class GoogleBatchBashLib extends BashFunLib { ;; esac >&2 echo "Unable to download path: \$source" - exit 1 + return 1 } nxf_gs_upload_try_gcloud() { command -v "\$NXF_GCLOUD" >/dev/null 2>&1 || return 1 local name=\$1 local gspath=\$2 - if [[ "\$name" == '-' ]]; then - return 1 - fi if [[ -d "\$name" ]]; then - \$NXF_GCLOUD storage cp --recursive "\$name" "\$gspath/\$name" + \$NXF_GCLOUD storage cp --recursive "\$name" "\$gspath/\$name" || return 1 else - \$NXF_GCLOUD storage cp "\$name" "\$gspath/\$name" + \$NXF_GCLOUD storage cp "\$name" "\$gspath/\$name" || return 1 fi } @@ -166,73 +164,43 @@ class GoogleBatchBashLib extends BashFunLib { command -v "\$NXF_GSUTIL" >/dev/null 2>&1 || return 1 local name=\$1 local gspath=\$2 - if [[ "\$name" == '-' ]]; then - return 1 - fi if [[ -d "\$name" ]]; then - \$NXF_GSUTIL -m cp -r "\$name" "\$gspath/\$name" + \$NXF_GSUTIL -m cp -r "\$name" "\$gspath/\$name" || return 1 else - \$NXF_GSUTIL cp "\$name" "\$gspath/\$name" + \$NXF_GSUTIL cp "\$name" "\$gspath/\$name" || return 1 fi } nxf_gs_upload_try_mount() { local name=\$1 local gspath=\$2 - if [[ "\$name" == '-' ]]; then - return 1 - fi local dest dest=\$(nxf_gs_uri_to_mount "\$gspath/\$name") [[ -n "\$dest" ]] || return 1 mkdir -p "\$(dirname "\$dest")" - cp -fRL "\$name" "\$dest" + cp -fRL "\$name" "\$dest" || return 1 } nxf_gs_upload() { local name=\$1 local gspath=\${2%/} - local move=\${3:-0} - if [[ "\$name" == '-' ]]; then - if command -v "\$NXF_GCLOUD" >/dev/null 2>&1 && \$NXF_GCLOUD storage cp - "\$gspath"; then - : - elif command -v "\$NXF_GSUTIL" >/dev/null 2>&1 && \$NXF_GSUTIL cp - "\$gspath"; then - : - else - return 1 - fi - else - case "\${NXF_STAGE_OUT_COPY_TRANSPORT:-posix}" in - gsutil) - if nxf_gs_upload_try_gsutil "\$name" "\$gspath"; then :; - elif nxf_gs_upload_try_gcloud "\$name" "\$gspath"; then :; - elif nxf_gs_upload_try_mount "\$name" "\$gspath"; then :; - else - >&2 echo "Unable to upload path: \$name" - return 1 - fi - ;; - gcloud) - if nxf_gs_upload_try_gcloud "\$name" "\$gspath"; then :; - elif nxf_gs_upload_try_gsutil "\$name" "\$gspath"; then :; - elif nxf_gs_upload_try_mount "\$name" "\$gspath"; then :; - else - >&2 echo "Unable to upload path: \$name" - return 1 - fi - ;; - posix|*) - if nxf_gs_upload_try_mount "\$name" "\$gspath"; then :; - else - >&2 echo "Unable to upload path: \$name" - return 1 - fi - ;; - esac - fi - if [[ "\$move" == "1" && "\$name" != '-' ]]; then - rm -rf "\$name" - fi + case "\${NXF_STAGE_OUT_COPY_TRANSPORT:-posix}" in + gsutil) + if nxf_gs_upload_try_gsutil "\$name" "\$gspath"; then return 0; fi + if nxf_gs_upload_try_gcloud "\$name" "\$gspath"; then return 0; fi + if nxf_gs_upload_try_mount "\$name" "\$gspath"; then return 0; fi + ;; + gcloud) + if nxf_gs_upload_try_gcloud "\$name" "\$gspath"; then return 0; fi + if nxf_gs_upload_try_gsutil "\$name" "\$gspath"; then return 0; fi + if nxf_gs_upload_try_mount "\$name" "\$gspath"; then return 0; fi + ;; + posix|*) + if nxf_gs_upload_try_mount "\$name" "\$gspath"; then return 0; fi + ;; + esac + >&2 echo "Unable to upload path: \$name" + return 1 } """.stripIndent(true) } diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategy.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategy.groovy index 891355d44a..19c7d5f9d1 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategy.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategy.groovy @@ -48,7 +48,7 @@ class GoogleBatchFileCopyStrategy extends SimpleFileCopyStrategy { * CLI stage-in when a CLI transport is selected and {@code stageInMode=copy}. */ private boolean useCliStageInCopy() { - return BatchConfig.isCliCopyTransport(batchConfig.stageInCopyTransport) && 'copy' == stageinMode + return batchConfig.stageInCopyTransport in [BatchConfig.COPY_TRANSPORT_GCLOUD, BatchConfig.COPY_TRANSPORT_GSUTIL] && 'copy' == stageinMode } private String effectiveStageOutMode() { @@ -58,7 +58,7 @@ class GoogleBatchFileCopyStrategy extends SimpleFileCopyStrategy { private boolean shouldUseCliStageOutCopy() { if( effectiveStageOutMode() != 'copy' ) return false - return BatchConfig.isCliCopyTransport(batchConfig.stageOutCopyTransport) + return batchConfig.stageOutCopyTransport in [BatchConfig.COPY_TRANSPORT_GCLOUD, BatchConfig.COPY_TRANSPORT_GSUTIL] } private boolean needsGoogleBatchBashLib() { @@ -90,10 +90,7 @@ class GoogleBatchFileCopyStrategy extends SimpleFileCopyStrategy { return super.stageInputFile(path, targetName) final gsUri = gsUriForCliStageIn(path) if( gsUri != null ) { - final cmd = batchConfig.maxTransferAttempts > 1 - ? "downloads+=(\"nxf_cp_retry nxf_gs_download '${gsUri}' ${Escape.path(targetName)}\")" - : "downloads+=(\"nxf_gs_download '${gsUri}' ${Escape.path(targetName)}\")" - return cmd + return "downloads+=(\"nxf_cp_retry nxf_gs_download '${gsUri}' ${Escape.path(targetName)}\")" } return super.stageInputFile(path, targetName) } @@ -117,11 +114,11 @@ class GoogleBatchFileCopyStrategy extends SimpleFileCopyStrategy { if( mode == 'move' ) return super.getUnstageOutputFilesScript(outputFiles, targetDir) if( mode == 'copy' && shouldUseCliStageOutCopy() ) - return getUnstageOutputFilesScriptGcloud(outputFiles, targetDir) + return getUnstageOutputFilesScriptCli(outputFiles, targetDir) return super.getUnstageOutputFilesScript(outputFiles, targetDir) } - private String getUnstageOutputFilesScriptGcloud(List outputFiles, Path targetDir) { + private String getUnstageOutputFilesScriptCli(List outputFiles, Path targetDir) { final patterns = normalizeGlobStarPaths(outputFiles) log.trace "[GOOGLE BATCH] Unstaging file path (CLI transport): $patterns" @@ -137,7 +134,7 @@ class GoogleBatchFileCopyStrategy extends SimpleFileCopyStrategy { uploads=() IFS=\$'\\n' for name in \$(eval "ls -1d ${escape.join(' ')}" | sort | uniq); do - uploads+=("nxf_gs_upload '\$name' '${gsTarget}' 0") + uploads+=("nxf_gs_upload '\$name' '${gsTarget}'") done unset IFS nxf_parallel "\${uploads[@]}" diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy index 3f729d2b70..44ef686a22 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy @@ -27,7 +27,18 @@ import nextflow.script.dsl.Description import nextflow.util.Duration import nextflow.util.MemoryUnit /** - * Model Google Batch config settings + * Model Google Batch config settings. + *

+ * Copy transports ({@link #stageInCopyTransport}, {@link #stageOutCopyTransport}): optional values + * {@code posix}, {@code gcloud}, {@code gsutil}. When unset or only {@code posix}, staging uses + * {@link nextflow.executor.SimpleFileCopyStrategy} with gcsfuse paths under {@code /mnt/disks}. When {@code gcloud} + * or {@code gsutil} is set for a direction, {@link nextflow.cloud.google.batch.GoogleBatchFileCopyStrategy} loads and + * generated bash uses {@code gcloud storage} / {@code gsutil} with fallbacks to POSIX copy from the mount; order is + * chosen per transport (see {@link nextflow.cloud.google.batch.GoogleBatchBashLib}). CLI stage-in applies when + * {@code process stageInMode} is {@code copy}; CLI stage-out when effective {@code stageOutMode} is {@code copy}. + * {@code move} / {@code rsync} / etc. keep existing POSIX behaviour. Parallelism and retries use + * {@link #maxParallelTransfers}, {@link #maxTransferAttempts}, {@link #delayBetweenAttempts} with {@code nxf_parallel} + * / {@code nxf_cp_retry} in the task script. * * @author Paolo Di Tommaso */ @@ -100,47 +111,43 @@ class BatchConfig implements ConfigScope { @ConfigOption @Description(""" - Path to the `gcloud` executable for `gcloud storage` transfers (default: `gcloud` on `PATH`). + `gcloud` executable path (default: `gcloud` on `PATH`). """) final String gcloudCli @ConfigOption @Description(""" - Path to the `gsutil` executable (default: `gsutil` on `PATH`). + `gsutil` executable path (default: `gsutil` on `PATH`). """) final String gsutilCli @ConfigOption @Description(""" - Maximum parallel object-storage transfers when using `gcloud` / `gsutil` staging (default: same as other cloud executors). + Max parallel CLI object copies (default: same as other cloud executors). """) final int maxParallelTransfers @ConfigOption @Description(""" - Maximum retry attempts for each `gcloud` / `gsutil` transfer when using CLI-based staging. + Max retries per CLI copy (default: same as other cloud executors). """) final int maxTransferAttempts @ConfigOption @Description(""" - Delay between retry attempts for `gcloud` / `gsutil` transfers when using CLI-based staging. + Delay between CLI copy retries (default: same as other cloud executors). """) final Duration delayBetweenAttempts @ConfigOption @Description(""" - When neither this nor `stageOutCopyTransport` is `gcloud` or `gsutil` (including when both are unset or only `posix`), the default is POSIX staging via {@link nextflow.executor.SimpleFileCopyStrategy} with gcsfuse mounts under `/mnt/disks`. - - When set to `posix`, `gcloud`, or `gsutil`, it participates in selecting **input** copy behaviour (with `stageOutCopyTransport`). `posix` uses `cp`/links from the mount. `gcloud` / `gsutil` use the respective CLI (with fallbacks) when `process stageInMode` is `copy` (paths are under the gcsfuse mount; `gs://` URIs are derived for CLI copy). Other `stageInMode` values use the mount. Buckets remain mounted for POSIX fallback. + Stage-in copy transport: `posix`, `gcloud`, or `gsutil`. """) final String stageInCopyTransport @ConfigOption @Description(""" - When neither this nor `stageInCopyTransport` is `gcloud` or `gsutil` (including when both are unset or only `posix`), the default is POSIX staging via {@link nextflow.executor.SimpleFileCopyStrategy} with gcsfuse mounts only. - - When set to `posix`, `gcloud`, or `gsutil`, it participates in selecting **output** copy behaviour (with `stageInCopyTransport`): `posix` uses POSIX from the gcsfuse mount; `gcloud` / `gsutil` use CLIs when the effective `stageOutMode` is `copy`. `move`, `rsync`, `rclone`, and `fcp` use existing wrapper behaviour (POSIX via mount). CLI-based `move` staging is not implemented. + Stage-out copy transport: `posix`, `gcloud`, or `gsutil`. """) final String stageOutCopyTransport @@ -222,8 +229,12 @@ class BatchConfig implements ConfigScope { maxParallelTransfers = opts.maxParallelTransfers != null ? opts.maxParallelTransfers as int : CloudTransferOptions.MAX_TRANSFER maxTransferAttempts = opts.maxTransferAttempts != null ? opts.maxTransferAttempts as int : CloudTransferOptions.MAX_TRANSFER_ATTEMPTS delayBetweenAttempts = opts.delayBetweenAttempts ? opts.delayBetweenAttempts as Duration : CloudTransferOptions.DEFAULT_DELAY_BETWEEN_ATTEMPTS - stageInCopyTransport = normaliseOptionalCopyTransport(opts.stageInCopyTransport) - stageOutCopyTransport = normaliseOptionalCopyTransport(opts.stageOutCopyTransport) + stageInCopyTransport = opts.stageInCopyTransport as String + stageOutCopyTransport = opts.stageOutCopyTransport as String + for (String t in [stageInCopyTransport, stageOutCopyTransport]) { + if (t && t !in VALID_COPY_TRANSPORTS) + throw new IllegalArgumentException("Invalid google.batch copy transport: '$t' — valid values are: ${VALID_COPY_TRANSPORTS.join(', ')}") + } installGpuDrivers = opts.installGpuDrivers as boolean installOpsAgent = opts.installOpsAgent as boolean logsPath = opts.logsPath @@ -242,20 +253,7 @@ class BatchConfig implements ConfigScope { * Load {@link nextflow.cloud.google.batch.GoogleBatchFileCopyStrategy} when any transport requests CLI object-storage copy. */ boolean usesGoogleBatchStaging() { - return isCliCopyTransport(stageInCopyTransport) || isCliCopyTransport(stageOutCopyTransport) - } - - static boolean isCliCopyTransport(String transport) { - return COPY_TRANSPORT_GCLOUD == transport || COPY_TRANSPORT_GSUTIL == transport - } - - private static String normaliseOptionalCopyTransport(Object value) { - if( value == null ) - return null - final t = value as String - if( !VALID_COPY_TRANSPORTS.contains(t) ) - throw new IllegalArgumentException("Invalid google.batch copy transport: '$t' — valid values are: ${VALID_COPY_TRANSPORTS.join(', ')}") - return t + return [stageInCopyTransport, stageOutCopyTransport].any { it in [COPY_TRANSPORT_GCLOUD, COPY_TRANSPORT_GSUTIL] } } BatchRetryConfig getRetryConfig() { retry } diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategyTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategyTest.groovy index 361a10a208..7e62e88120 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategyTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategyTest.groovy @@ -44,7 +44,7 @@ class GoogleBatchFileCopyStrategyTest extends Specification { def copy = new GoogleBatchFileCopyStrategy(bean, batch) expect: - copy.stageInputFile(path, 'in.txt') == 'downloads+=("nxf_gs_download \'gs://b/data/in.txt\' in.txt")' + copy.stageInputFile(path, 'in.txt') == 'downloads+=("nxf_cp_retry nxf_gs_download \'gs://b/data/in.txt\' in.txt")' } def 'should build gs upload unstage script' () { @@ -65,7 +65,7 @@ class GoogleBatchFileCopyStrategyTest extends Specification { uploads=() IFS=$'\\n' for name in $(eval "ls -1d out.txt" | sort | uniq); do - uploads+=("nxf_gs_upload '$name' 'gs://foo/wd' 0") + uploads+=("nxf_gs_upload '$name' 'gs://foo/wd'") done unset IFS nxf_parallel "${uploads[@]}" From c0e3c0960fc8b965c01f8c02ea89cc3c46babdf9 Mon Sep 17 00:00:00 2001 From: Tomiles <116039+tomiles@users.noreply.github.com> Date: Tue, 14 Apr 2026 10:53:13 +0200 Subject: [PATCH 3/6] feat(nf-google): make the retry behaviour more consistent, and centralized CLI transport detection Signed-off-by: Tomiles <116039+tomiles@users.noreply.github.com> --- .../batch/GoogleBatchFileCopyStrategy.groovy | 8 +++++--- .../google/batch/client/BatchConfig.groovy | 7 ++++++- .../GoogleBatchFileCopyStrategyTest.groovy | 18 ++++++++++++++++++ 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategy.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategy.groovy index 19c7d5f9d1..c4471d8af6 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategy.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategy.groovy @@ -48,7 +48,7 @@ class GoogleBatchFileCopyStrategy extends SimpleFileCopyStrategy { * CLI stage-in when a CLI transport is selected and {@code stageInMode=copy}. */ private boolean useCliStageInCopy() { - return batchConfig.stageInCopyTransport in [BatchConfig.COPY_TRANSPORT_GCLOUD, BatchConfig.COPY_TRANSPORT_GSUTIL] && 'copy' == stageinMode + return BatchConfig.isCliCopyTransport(batchConfig.stageInCopyTransport) && 'copy' == stageinMode } private String effectiveStageOutMode() { @@ -58,7 +58,7 @@ class GoogleBatchFileCopyStrategy extends SimpleFileCopyStrategy { private boolean shouldUseCliStageOutCopy() { if( effectiveStageOutMode() != 'copy' ) return false - return batchConfig.stageOutCopyTransport in [BatchConfig.COPY_TRANSPORT_GCLOUD, BatchConfig.COPY_TRANSPORT_GSUTIL] + return BatchConfig.isCliCopyTransport(batchConfig.stageOutCopyTransport) } private boolean needsGoogleBatchBashLib() { @@ -90,7 +90,9 @@ class GoogleBatchFileCopyStrategy extends SimpleFileCopyStrategy { return super.stageInputFile(path, targetName) final gsUri = gsUriForCliStageIn(path) if( gsUri != null ) { - return "downloads+=(\"nxf_cp_retry nxf_gs_download '${gsUri}' ${Escape.path(targetName)}\")" + return batchConfig.maxTransferAttempts > 1 + ? "downloads+=(\"nxf_cp_retry nxf_gs_download '${gsUri}' ${Escape.path(targetName)}\")" + : "downloads+=(\"nxf_gs_download '${gsUri}' ${Escape.path(targetName)}\")" } return super.stageInputFile(path, targetName) } diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy index 44ef686a22..b2cf7bba04 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy @@ -62,6 +62,7 @@ class BatchConfig implements ConfigScope { static final String COPY_TRANSPORT_GSUTIL = 'gsutil' static final private List VALID_COPY_TRANSPORTS = List.of(COPY_TRANSPORT_POSIX, COPY_TRANSPORT_GCLOUD, COPY_TRANSPORT_GSUTIL) + static final private List CLI_COPY_TRANSPORTS = List.of(COPY_TRANSPORT_GCLOUD, COPY_TRANSPORT_GSUTIL) static final private int DEFAULT_MAX_SPOT_ATTEMPTS = 0 @@ -253,7 +254,11 @@ class BatchConfig implements ConfigScope { * Load {@link nextflow.cloud.google.batch.GoogleBatchFileCopyStrategy} when any transport requests CLI object-storage copy. */ boolean usesGoogleBatchStaging() { - return [stageInCopyTransport, stageOutCopyTransport].any { it in [COPY_TRANSPORT_GCLOUD, COPY_TRANSPORT_GSUTIL] } + return [stageInCopyTransport, stageOutCopyTransport].any { isCliCopyTransport(it) } + } + + static boolean isCliCopyTransport(String transport) { + return transport in CLI_COPY_TRANSPORTS } BatchRetryConfig getRetryConfig() { retry } diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategyTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategyTest.groovy index 7e62e88120..a5642e791e 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategyTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategyTest.groovy @@ -43,6 +43,24 @@ class GoogleBatchFileCopyStrategyTest extends Specification { def path = Paths.get('/mnt/disks/b/data/in.txt') def copy = new GoogleBatchFileCopyStrategy(bean, batch) + expect: + copy.stageInputFile(path, 'in.txt') == 'downloads+=("nxf_gs_download \'gs://b/data/in.txt\' in.txt")' + } + + def 'should build gs download staging line with retry when maxTransferAttempts > 1' () { + given: + def batch = new BatchConfig([ + stageInCopyTransport: 'gcloud', + maxTransferAttempts: 3 + ]) + def bean = Mock(TaskBean) { + getWorkDir() >> Paths.get('/mnt/disks/w/x') + getTargetDir() >> Paths.get('/mnt/disks/w/x') + getStageInMode() >> 'copy' + } + def path = Paths.get('/mnt/disks/b/data/in.txt') + def copy = new GoogleBatchFileCopyStrategy(bean, batch) + expect: copy.stageInputFile(path, 'in.txt') == 'downloads+=("nxf_cp_retry nxf_gs_download \'gs://b/data/in.txt\' in.txt")' } From caf8f541be5e15c81361d06bb84a3c7f5f15e809 Mon Sep 17 00:00:00 2001 From: Tomiles <116039+tomiles@users.noreply.github.com> Date: Tue, 14 Apr 2026 11:23:31 +0200 Subject: [PATCH 4/6] feat(nf-google): fold in #6917 that defaults to copy stageout mode Signed-off-by: Tomiles <116039+tomiles@users.noreply.github.com> --- .../batch/GoogleBatchScriptLauncher.groovy | 4 ++++ .../GoogleBatchScriptLauncherTest.groovy | 20 ++++++++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy index ff9b862f41..6efa60a7fd 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy @@ -58,6 +58,10 @@ class GoogleBatchScriptLauncher extends BashWrapperBuilder implements GoogleBatc protected GoogleBatchScriptLauncher() {} GoogleBatchScriptLauncher(TaskBean bean, Path remoteBinDir, BatchConfig batchConfig) { + // Unstaging is cross-device on Google Batch (gcsfuse-mounted work dir). + // `move` can fail with overlapping outputs or symlinked paths. + if( bean.stageOutMode == null ) + bean.stageOutMode = 'copy' super(bean, copyStrategyFor(bean, batchConfig)) // keep track the google storage work dir this.remoteWorkDir = (CloudStoragePath) bean.workDir diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchScriptLauncherTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchScriptLauncherTest.groovy index 1982722518..d833de90c5 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchScriptLauncherTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchScriptLauncherTest.groovy @@ -17,10 +17,10 @@ package nextflow.cloud.google.batch import java.nio.file.Paths - import com.google.cloud.storage.contrib.nio.CloudStorageFileSystem import nextflow.cloud.google.GoogleOpts import nextflow.cloud.google.batch.client.BatchConfig +import nextflow.processor.TaskBean import nextflow.processor.TaskRun import spock.lang.Specification import spock.lang.Unroll @@ -87,6 +87,24 @@ class GoogleBatchScriptLauncherTest extends Specification{ volumes[1].getMountOptionsList() == ['-o rw', '-implicit-dirs', '-o allow_other', '--uid=1000', '--billing-project my-project'] } + def 'should default stageOutMode to copy when not set' () { + given: + def workDir = CloudStorageFileSystem.forBucket('foo').getPath('/scratch') + def targetDir = CloudStorageFileSystem.forBucket('foo').getPath('/scratch') + def bean = new TaskBean( + workDir: workDir, + targetDir: targetDir, + stageOutMode: null, + inputFiles: [:] + ) + + when: + new GoogleBatchScriptLauncher(bean, null, Mock(BatchConfig)) + + then: + bean.stageOutMode == 'copy' + } + def 'should return target files in remote work dir' () { given: def launcher = new GoogleBatchScriptLauncher() From 03e2f1a79bb38fcd7198880d5c5ba19e3932688e Mon Sep 17 00:00:00 2001 From: Tomiles <116039+tomiles@users.noreply.github.com> Date: Tue, 14 Apr 2026 11:36:35 +0200 Subject: [PATCH 5/6] feat(nf-google): document changes Signed-off-by: Tomiles <116039+tomiles@users.noreply.github.com> --- docs/google.md | 16 ++++++++++++++++ docs/reference/config.md | 24 ++++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/docs/google.md b/docs/google.md index d2188a4e8f..c3dfdafbe9 100644 --- a/docs/google.md +++ b/docs/google.md @@ -88,6 +88,22 @@ Notes: - A container image must be specified to execute processes. You can use a different Docker image for each process using one or more {ref}`config-process-selectors`. - Make sure to specify the project ID, not the project name. - Make sure to specify a location where Google Batch is available. Refer to the [Google Batch documentation](https://cloud.google.com/batch/docs/get-started#locations) for region availability. +- By default, Google Batch output unstaging uses `copy` when `stageOutMode` is not set, because unstaging goes from local scratch to a gcsfuse-mounted work directory and `move` can fail with overlapping outputs or symlinked paths. + +Optional transfer settings: + +```groovy +google { + batch { + // Default is 'posix' when unset + stageInCopyTransport = 'gcloud' // or 'gsutil' / 'posix' + stageOutCopyTransport = 'gcloud' // or 'gsutil' / 'posix' + } +} +``` + +When `gcloud` or `gsutil` is selected, CLI staging is used for `copy` mode with fallback to POSIX mount copy. +Users are responsible for ensuring the selected CLI transport is available in the task container/runtime environment. Read the {ref}`Google configuration` section to learn more about advanced configuration options. diff --git a/docs/reference/config.md b/docs/reference/config.md index 7519a4fe9f..a9f51191fc 100644 --- a/docs/reference/config.md +++ b/docs/reference/config.md @@ -960,11 +960,20 @@ The following settings are available: `google.batch.cpuPlatform` : The [minimum CPU Platform](https://cloud.google.com/compute/docs/instances/specify-min-cpu-platform#specifications), e.g. `'Intel Skylake'` (default: none). +`google.batch.delayBetweenAttempts` +: Delay between transfer retry attempts (default: `10 sec`). + +`google.batch.gcloudCli` +: The `gcloud` executable path used for CLI staging (default: `gcloud` from `PATH`). + `google.batch.gcsfuseOptions` : :::{versionadded} 25.03.0-edge ::: : List of custom mount options for `gcsfuse` (default: `['-o rw', '-implicit-dirs']`). +`google.batch.gsutilCli` +: The `gsutil` executable path used for CLI staging (default: `gsutil` from `PATH`). + `google.batch.installOpsAgent` : Enables Ops Agent installation on Google Batch instances for enhanced monitoring and logging (default: `false`). See the [Google Batch documentation](https://docs.cloud.google.com/batch/docs/create-run-job-ops-agent) for details. @@ -987,6 +996,12 @@ The following settings are available: : Max number of execution attempts of a job interrupted by a Compute Engine Spot reclaim event (default: `0`). : See also: `google.batch.autoRetryExitCodes` +`google.batch.maxParallelTransfers` +: Max parallel upload/download transfer operations per task script (default: `4`). + +`google.batch.maxTransferAttempts` +: Max transfer retry attempts used by built-in transfer retry wrappers (default: `1`). + `google.batch.network` : The URL of an existing network resource to which the VM will be attached. @@ -1007,6 +1022,15 @@ The following settings are available: `google.batch.spot` : Enable the use of spot virtual machines (default: `false`). +`google.batch.stageInCopyTransport` +: Stage-in transport for `copy` mode. Can be `posix`, `gcloud`, or `gsutil` (default: `posix`). +: When set to `gcloud` or `gsutil`, CLI staging is used with fallback to POSIX mount copy. + +`google.batch.stageOutCopyTransport` +: Stage-out transport for `copy` mode. Can be `posix`, `gcloud`, or `gsutil` (default: `posix`). +: When set to `gcloud` or `gsutil`, CLI staging is used with fallback to POSIX mount copy. +: Users are responsible for ensuring the selected CLI transport is available in the task runtime/container environment. + `google.batch.subnetwork` : The URL of an existing subnetwork resource in the network to which the VM will be attached. From b48e88675ed700f51c47dad16063964cd79c2b25 Mon Sep 17 00:00:00 2001 From: Tomiles <116039+tomiles@users.noreply.github.com> Date: Tue, 14 Apr 2026 11:51:56 +0200 Subject: [PATCH 6/6] feat(nf-google): fixes for failing tests Signed-off-by: Tomiles <116039+tomiles@users.noreply.github.com> --- .../google/batch/GoogleBatchScriptLauncher.groovy | 14 +++++++++----- .../cloud/google/batch/client/BatchConfig.groovy | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy index 6efa60a7fd..feda0aedd9 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy @@ -58,11 +58,7 @@ class GoogleBatchScriptLauncher extends BashWrapperBuilder implements GoogleBatc protected GoogleBatchScriptLauncher() {} GoogleBatchScriptLauncher(TaskBean bean, Path remoteBinDir, BatchConfig batchConfig) { - // Unstaging is cross-device on Google Batch (gcsfuse-mounted work dir). - // `move` can fail with overlapping outputs or symlinked paths. - if( bean.stageOutMode == null ) - bean.stageOutMode = 'copy' - super(bean, copyStrategyFor(bean, batchConfig)) + super(defaultStageOutMode(bean), copyStrategyFor(bean, batchConfig)) // keep track the google storage work dir this.remoteWorkDir = (CloudStoragePath) bean.workDir this.remoteBinDir = toContainerMount(remoteBinDir) @@ -114,6 +110,14 @@ class GoogleBatchScriptLauncher extends BashWrapperBuilder implements GoogleBatc : new SimpleFileCopyStrategy(bean) } + private static TaskBean defaultStageOutMode(TaskBean bean) { + // Unstaging is cross-device on Google Batch (gcsfuse-mounted work dir). + // `move` can fail with overlapping outputs or symlinked paths. + if( bean.stageOutMode == null ) + bean.stageOutMode = 'copy' + return bean + } + protected String headerScript(TaskBean bean) { def result = "NXF_CHDIR=${Escape.path(bean.workDir)}\n" if( remoteBinDir ) { diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy index b2cf7bba04..8fc07bab80 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy @@ -258,7 +258,7 @@ class BatchConfig implements ConfigScope { } static boolean isCliCopyTransport(String transport) { - return transport in CLI_COPY_TRANSPORTS + return transport != null && transport in CLI_COPY_TRANSPORTS } BatchRetryConfig getRetryConfig() { retry }