diff --git a/docs/google.md b/docs/google.md index d2188a4e8f..c3dfdafbe9 100644 --- a/docs/google.md +++ b/docs/google.md @@ -88,6 +88,22 @@ Notes: - A container image must be specified to execute processes. You can use a different Docker image for each process using one or more {ref}`config-process-selectors`. - Make sure to specify the project ID, not the project name. - Make sure to specify a location where Google Batch is available. Refer to the [Google Batch documentation](https://cloud.google.com/batch/docs/get-started#locations) for region availability. +- By default, Google Batch output unstaging uses `copy` when `stageOutMode` is not set, because unstaging goes from local scratch to a gcsfuse-mounted work directory and `move` can fail with overlapping outputs or symlinked paths. + +Optional transfer settings: + +```groovy +google { + batch { + // Default is 'posix' when unset + stageInCopyTransport = 'gcloud' // or 'gsutil' / 'posix' + stageOutCopyTransport = 'gcloud' // or 'gsutil' / 'posix' + } +} +``` + +When `gcloud` or `gsutil` is selected, CLI staging is used for `copy` mode with fallback to POSIX mount copy. +Users are responsible for ensuring the selected CLI transport is available in the task container/runtime environment. Read the {ref}`Google configuration` section to learn more about advanced configuration options. diff --git a/docs/reference/config.md b/docs/reference/config.md index 7519a4fe9f..a9f51191fc 100644 --- a/docs/reference/config.md +++ b/docs/reference/config.md @@ -960,11 +960,20 @@ The following settings are available: `google.batch.cpuPlatform` : The [minimum CPU Platform](https://cloud.google.com/compute/docs/instances/specify-min-cpu-platform#specifications), e.g. `'Intel Skylake'` (default: none). +`google.batch.delayBetweenAttempts` +: Delay between transfer retry attempts (default: `10 sec`). + +`google.batch.gcloudCli` +: The `gcloud` executable path used for CLI staging (default: `gcloud` from `PATH`). 
+ `google.batch.gcsfuseOptions` : :::{versionadded} 25.03.0-edge ::: : List of custom mount options for `gcsfuse` (default: `['-o rw', '-implicit-dirs']`). +`google.batch.gsutilCli` +: The `gsutil` executable path used for CLI staging (default: `gsutil` from `PATH`). + `google.batch.installOpsAgent` : Enables Ops Agent installation on Google Batch instances for enhanced monitoring and logging (default: `false`). See the [Google Batch documentation](https://docs.cloud.google.com/batch/docs/create-run-job-ops-agent) for details. @@ -987,6 +996,12 @@ The following settings are available: : Max number of execution attempts of a job interrupted by a Compute Engine Spot reclaim event (default: `0`). : See also: `google.batch.autoRetryExitCodes` +`google.batch.maxParallelTransfers` +: Max parallel upload/download transfer operations per task script (default: `4`). + +`google.batch.maxTransferAttempts` +: Max transfer retry attempts used by built-in transfer retry wrappers (default: `1`). + `google.batch.network` : The URL of an existing network resource to which the VM will be attached. @@ -1007,6 +1022,15 @@ The following settings are available: `google.batch.spot` : Enable the use of spot virtual machines (default: `false`). +`google.batch.stageInCopyTransport` +: Stage-in transport for `copy` mode. Can be `posix`, `gcloud`, or `gsutil` (default: `posix`). +: When set to `gcloud` or `gsutil`, CLI staging is used with fallback to POSIX mount copy. + +`google.batch.stageOutCopyTransport` +: Stage-out transport for `copy` mode. Can be `posix`, `gcloud`, or `gsutil` (default: `posix`). +: When set to `gcloud` or `gsutil`, CLI staging is used with fallback to POSIX mount copy. +: Users are responsible for ensuring the selected CLI transport is available in the task runtime/container environment. + `google.batch.subnetwork` : The URL of an existing subnetwork resource in the network to which the VM will be attached. 
diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchBashLib.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchBashLib.groovy new file mode 100644 index 0000000000..01bf04943f --- /dev/null +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchBashLib.groovy @@ -0,0 +1,237 @@ +/* + * Copyright 2013-2026, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.cloud.google.batch + +import groovy.transform.CompileStatic +import groovy.transform.Memoized +import nextflow.cloud.google.batch.client.BatchConfig +import nextflow.executor.BashFunLib +import nextflow.util.Escape + +/** + * Bash helpers for Google Cloud Storage transfers in Google Batch tasks. + * Order within each transport is controlled by {@code NXF_STAGE_IN_COPY_TRANSPORT} / {@code NXF_STAGE_OUT_COPY_TRANSPORT}. 
+ * + * @author Paolo Di Tommaso + */ +@CompileStatic +class GoogleBatchBashLib extends BashFunLib { + + private String gcloudCli = 'gcloud' + + private String gsutilCli = 'gsutil' + + private BatchConfig batchConfig + + GoogleBatchBashLib withGcloudCli(String value) { + if( value ) + this.gcloudCli = value + return this + } + + GoogleBatchBashLib withGsutilCli(String value) { + if( value ) + this.gsutilCli = value + return this + } + + GoogleBatchBashLib withBatchConfig(BatchConfig config) { + this.batchConfig = config + return this + } + + protected String gsLib() { + final gcloud = Escape.path(gcloudCli ?: 'gcloud') + final gsutil = Escape.path(gsutilCli ?: 'gsutil') + final mountRoot = GoogleBatchScriptLauncher.MOUNT_ROOT + """ + export NXF_GCLOUD=$gcloud + export NXF_GSUTIL=$gsutil + export NXF_GS_MOUNT_ROOT="$mountRoot" + + nxf_gs_uri_to_mount() { + case "\$1" in + gs://*) + local p=\${1#gs://} + local bucket=\${p%%/*} + if [[ "\$bucket" == "\$p" ]]; then + echo "\$NXF_GS_MOUNT_ROOT/\$bucket" + else + local rest=\${p#*/} + echo "\$NXF_GS_MOUNT_ROOT/\$bucket/\$rest" + fi + ;; + *) + echo "" + ;; + esac + } + + nxf_gs_download_gcloud() { + command -v "\$NXF_GCLOUD" >/dev/null 2>&1 || return 1 + local source=\$1 + local target=\$2 + local basedir=\$(dirname "\$2") + mkdir -p "\$basedir" + if \$NXF_GCLOUD storage cp "\$source" "\$target" 2>/dev/null; then + return 0 + fi + mkdir -p "\$target" + if \$NXF_GCLOUD storage cp --recursive "\$source" "\$target" >/dev/null 2>&1; then + return 0 + fi + rm -rf "\$target" + return 1 + } + + nxf_gs_download_gsutil() { + command -v "\$NXF_GSUTIL" >/dev/null 2>&1 || return 1 + local source=\$1 + local target=\$2 + local basedir=\$(dirname "\$2") + mkdir -p "\$basedir" + local ret + ret=\$(\$NXF_GSUTIL cp "\$source" "\$target" 2>&1) || { + mkdir -p "\$target" + \$NXF_GSUTIL -m cp -r "\$source" "\$target" >/dev/null || { + rm -rf "\$target" + return 1 + } + } + return 0 + } + + nxf_gs_download_mount() { + local source=\$1 + 
local target=\$2 + local ms + ms=\$(nxf_gs_uri_to_mount "\$source") + [[ -n "\$ms" ]] || return 1 + [[ -e "\$ms" || -d "\$ms" ]] || return 1 + mkdir -p "\$(dirname "\$target")" + cp -fRL "\$ms" "\$target" || return 1 + return 0 + } + + nxf_gs_download() { + local source=\$1 + local target=\$2 + case "\${NXF_STAGE_IN_COPY_TRANSPORT:-posix}" in + gsutil) + if nxf_gs_download_gsutil "\$source" "\$target"; then return 0; fi + if nxf_gs_download_gcloud "\$source" "\$target"; then return 0; fi + if nxf_gs_download_mount "\$source" "\$target"; then return 0; fi + ;; + gcloud) + if nxf_gs_download_gcloud "\$source" "\$target"; then return 0; fi + if nxf_gs_download_gsutil "\$source" "\$target"; then return 0; fi + if nxf_gs_download_mount "\$source" "\$target"; then return 0; fi + ;; + posix|*) + if nxf_gs_download_mount "\$source" "\$target"; then return 0; fi + ;; + esac + >&2 echo "Unable to download path: \$source" + return 1 + } + + nxf_gs_upload_try_gcloud() { + command -v "\$NXF_GCLOUD" >/dev/null 2>&1 || return 1 + local name=\$1 + local gspath=\$2 + if [[ -d "\$name" ]]; then + \$NXF_GCLOUD storage cp --recursive "\$name" "\$gspath/\$name" || return 1 + else + \$NXF_GCLOUD storage cp "\$name" "\$gspath/\$name" || return 1 + fi + } + + nxf_gs_upload_try_gsutil() { + command -v "\$NXF_GSUTIL" >/dev/null 2>&1 || return 1 + local name=\$1 + local gspath=\$2 + if [[ -d "\$name" ]]; then + \$NXF_GSUTIL -m cp -r "\$name" "\$gspath/\$name" || return 1 + else + \$NXF_GSUTIL cp "\$name" "\$gspath/\$name" || return 1 + fi + } + + nxf_gs_upload_try_mount() { + local name=\$1 + local gspath=\$2 + local dest + dest=\$(nxf_gs_uri_to_mount "\$gspath/\$name") + [[ -n "\$dest" ]] || return 1 + mkdir -p "\$(dirname "\$dest")" + cp -fRL "\$name" "\$dest" || return 1 + } + + nxf_gs_upload() { + local name=\$1 + local gspath=\${2%/} + case "\${NXF_STAGE_OUT_COPY_TRANSPORT:-posix}" in + gsutil) + if nxf_gs_upload_try_gsutil "\$name" "\$gspath"; then return 0; fi + if 
nxf_gs_upload_try_gcloud "\$name" "\$gspath"; then return 0; fi + if nxf_gs_upload_try_mount "\$name" "\$gspath"; then return 0; fi + ;; + gcloud) + if nxf_gs_upload_try_gcloud "\$name" "\$gspath"; then return 0; fi + if nxf_gs_upload_try_gsutil "\$name" "\$gspath"; then return 0; fi + if nxf_gs_upload_try_mount "\$name" "\$gspath"; then return 0; fi + ;; + posix|*) + if nxf_gs_upload_try_mount "\$name" "\$gspath"; then return 0; fi + ;; + esac + >&2 echo "Unable to upload path: \$name" + return 1 + } + """.stripIndent(true) + } + + protected String transportExports() { + final cfg = batchConfig + if( !cfg ) + return '' + final inTr = cfg.stageInCopyTransport ?: BatchConfig.COPY_TRANSPORT_POSIX + final outTr = cfg.stageOutCopyTransport ?: BatchConfig.COPY_TRANSPORT_POSIX + """ + export NXF_STAGE_IN_COPY_TRANSPORT=${Escape.path(inTr)} + export NXF_STAGE_OUT_COPY_TRANSPORT=${Escape.path(outTr)} + """.stripIndent(true) + } + + @Override + String render() { + super.render() + transportExports() + gsLib() + } + + @Memoized + static String script(BatchConfig config) { + new GoogleBatchBashLib() + .includeCoreFun(true) + .withMaxParallelTransfers(config.maxParallelTransfers) + .withMaxTransferAttempts(config.maxTransferAttempts) + .withDelayBetweenAttempts(config.delayBetweenAttempts) + .withGcloudCli(config.gcloudCli) + .withGsutilCli(config.gsutilCli) + .withBatchConfig(config) + .render() + } +} diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategy.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategy.groovy new file mode 100644 index 0000000000..c4471d8af6 --- /dev/null +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategy.groovy @@ -0,0 +1,169 @@ +/* + * Copyright 2013-2026, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.cloud.google.batch + +import java.nio.file.Path + +import com.google.cloud.storage.contrib.nio.CloudStoragePath +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import nextflow.cloud.google.batch.client.BatchConfig +import nextflow.executor.SimpleFileCopyStrategy +import nextflow.extension.FilesEx +import nextflow.processor.TaskBean +import nextflow.util.Escape + +/** + * Optional Google Batch staging when {@link BatchConfig#usesGoogleBatchStaging()} is true, honouring + * {@link BatchConfig#stageInCopyTransport} / {@link BatchConfig#stageOutCopyTransport} for {@code copy} modes. + * + * @author Paolo Di Tommaso + */ +@Slf4j +@CompileStatic +class GoogleBatchFileCopyStrategy extends SimpleFileCopyStrategy { + + private final BatchConfig batchConfig + + GoogleBatchFileCopyStrategy(TaskBean bean, BatchConfig batchConfig) { + super(bean) + this.batchConfig = batchConfig + } + + /** + * CLI stage-in when a CLI transport is selected and {@code stageInMode=copy}. + */ + private boolean useCliStageInCopy() { + return BatchConfig.isCliCopyTransport(batchConfig.stageInCopyTransport) && 'copy' == stageinMode + } + + private String effectiveStageOutMode() { + stageoutMode ?: ( workDir==targetDir ? 
'copy' : 'move' ) + } + + private boolean shouldUseCliStageOutCopy() { + if( effectiveStageOutMode() != 'copy' ) + return false + return BatchConfig.isCliCopyTransport(batchConfig.stageOutCopyTransport) + } + + private boolean needsGoogleBatchBashLib() { + useCliStageInCopy() || shouldUseCliStageOutCopy() + } + + @Override + String getBeforeStartScript() { + final base = super.getBeforeStartScript() + if( !needsGoogleBatchBashLib() ) + return base + final gs = GoogleBatchBashLib.script(batchConfig) + return gs + (base ? '\n' + base : '') + } + + @Override + String getStageInputFilesScript(Map inputFiles) { + if( !useCliStageInCopy() ) + return super.getStageInputFilesScript(inputFiles) + def result = 'downloads=(true)\n' + result += super.getStageInputFilesScript(inputFiles) + '\n' + result += 'nxf_parallel "${downloads[@]}"\n' + return result + } + + @Override + String stageInputFile(Path path, String targetName) { + if( !useCliStageInCopy() ) + return super.stageInputFile(path, targetName) + final gsUri = gsUriForCliStageIn(path) + if( gsUri != null ) { + return batchConfig.maxTransferAttempts > 1 + ? "downloads+=(\"nxf_cp_retry nxf_gs_download '${gsUri}' ${Escape.path(targetName)}\")" + : "downloads+=(\"nxf_gs_download '${gsUri}' ${Escape.path(targetName)}\")" + } + return super.stageInputFile(path, targetName) + } + + /** + * Resolve a {@code gs://} URI for CLI stage-in: from {@link CloudStoragePath}, or from a container fuse path under {@link GoogleBatchScriptLauncher#MOUNT_ROOT}. 
+ */ + private static String gsUriForCliStageIn(Path path) { + if( path instanceof CloudStoragePath ) + return FilesEx.toUriString((CloudStoragePath)path) + final s = path.toString() + final prefix = GoogleBatchScriptLauncher.MOUNT_ROOT + '/' + if( s.startsWith(prefix) ) + return toGsUriFromContainerMount(path) + return null + } + + @Override + String getUnstageOutputFilesScript(List outputFiles, Path targetDir) { + final mode = effectiveStageOutMode() + if( mode == 'move' ) + return super.getUnstageOutputFilesScript(outputFiles, targetDir) + if( mode == 'copy' && shouldUseCliStageOutCopy() ) + return getUnstageOutputFilesScriptCli(outputFiles, targetDir) + return super.getUnstageOutputFilesScript(outputFiles, targetDir) + } + + private String getUnstageOutputFilesScriptCli(List outputFiles, Path targetDir) { + final patterns = normalizeGlobStarPaths(outputFiles) + log.trace "[GOOGLE BATCH] Unstaging file path (CLI transport): $patterns" + + if( !patterns ) + return null + + final gsTarget = toGsUriFromContainerMount(targetDir) + final escape = new ArrayList(outputFiles.size()) + for( String it : patterns ) + escape.add( Escape.path(it) ) + + return """\ + uploads=() + IFS=\$'\\n' + for name in \$(eval "ls -1d ${escape.join(' ')}" | sort | uniq); do + uploads+=("nxf_gs_upload '\$name' '${gsTarget}'") + done + unset IFS + nxf_parallel "\${uploads[@]}" + """.stripIndent(true) + } + + @Override + String fileStr(Path path) { + !useCliStageInCopy() ? super.fileStr(path) : Escape.path(path.getFileName()) + } + + @Override + String pipeInputFile(Path file) { + !useCliStageInCopy() ? 
super.pipeInputFile(file) : " < ${Escape.path(file.getFileName())}" + } + + static String toGsUriFromContainerMount(Path containerPath) { + final s = containerPath.toString() + final prefix = GoogleBatchScriptLauncher.MOUNT_ROOT + '/' + if( !s.startsWith(prefix) ) + throw new IllegalArgumentException("Expected path under ${GoogleBatchScriptLauncher.MOUNT_ROOT}, got: $s") + final rest = s.substring(prefix.length()) + final slash = rest.indexOf('/') + if( slash < 0 ) + return "gs://$rest/" + final bucket = rest.substring(0, slash) + final obj = rest.substring(slash) + return "gs://$bucket$obj" + } +} diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy index 9db0bf9fb2..feda0aedd9 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy @@ -25,7 +25,10 @@ import com.google.cloud.storage.contrib.nio.CloudStoragePath import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.cloud.google.GoogleOpts +import nextflow.cloud.google.batch.client.BatchConfig import nextflow.executor.BashWrapperBuilder +import nextflow.executor.ScriptFileCopyStrategy +import nextflow.executor.SimpleFileCopyStrategy import nextflow.extension.FilesEx import nextflow.processor.TaskBean import nextflow.processor.TaskRun @@ -42,7 +45,7 @@ import nextflow.util.TestOnly @CompileStatic class GoogleBatchScriptLauncher extends BashWrapperBuilder implements GoogleBatchLauncherSpec { - private static final String MOUNT_ROOT = '/mnt/disks' + public static final String MOUNT_ROOT = '/mnt/disks' private GoogleOpts config private CloudStoragePath remoteWorkDir @@ -54,8 +57,8 @@ class GoogleBatchScriptLauncher extends BashWrapperBuilder implements GoogleBatc @TestOnly protected GoogleBatchScriptLauncher() {} - 
GoogleBatchScriptLauncher(TaskBean bean, Path remoteBinDir) { - super(bean) + GoogleBatchScriptLauncher(TaskBean bean, Path remoteBinDir, BatchConfig batchConfig) { + super(defaultStageOutMode(bean), copyStrategyFor(bean, batchConfig)) // keep track the google storage work dir this.remoteWorkDir = (CloudStoragePath) bean.workDir this.remoteBinDir = toContainerMount(remoteBinDir) @@ -101,6 +104,20 @@ class GoogleBatchScriptLauncher extends BashWrapperBuilder implements GoogleBatc scratch = true } + private static ScriptFileCopyStrategy copyStrategyFor(TaskBean bean, BatchConfig batchConfig) { + batchConfig.usesGoogleBatchStaging() + ? new GoogleBatchFileCopyStrategy(bean, batchConfig) + : new SimpleFileCopyStrategy(bean) + } + + private static TaskBean defaultStageOutMode(TaskBean bean) { + // Unstaging is cross-device on Google Batch (gcsfuse-mounted work dir). + // `move` can fail with overlapping outputs or symlinked paths. + if( bean.stageOutMode == null ) + bean.stageOutMode = 'copy' + return bean + } + protected String headerScript(TaskBean bean) { def result = "NXF_CHDIR=${Escape.path(bean.workDir)}\n" if( remoteBinDir ) { diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy index 792ec949d5..656f875955 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy @@ -169,7 +169,7 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { } else { final taskBean = task.toTaskBean() - return new GoogleBatchScriptLauncher(taskBean, executor.remoteBinDir) + return new GoogleBatchScriptLauncher(taskBean, executor.remoteBinDir, batchConfig) .withConfig(executor.googleOpts) .withIsArray(task.isArray()) } diff --git 
a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy index b3d28343fa..8fc07bab80 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy @@ -20,12 +20,25 @@ import java.nio.file.Path import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import nextflow.cloud.CloudTransferOptions import nextflow.config.spec.ConfigOption import nextflow.config.spec.ConfigScope import nextflow.script.dsl.Description +import nextflow.util.Duration import nextflow.util.MemoryUnit /** - * Model Google Batch config settings + * Model Google Batch config settings. + *

+ * Copy transports ({@link #stageInCopyTransport}, {@link #stageOutCopyTransport}): optional values + * {@code posix}, {@code gcloud}, {@code gsutil}. When unset or set to {@code posix}, staging uses + * {@link nextflow.executor.SimpleFileCopyStrategy} with gcsfuse paths under {@code /mnt/disks}. When {@code gcloud} + * or {@code gsutil} is set for a direction, {@link nextflow.cloud.google.batch.GoogleBatchFileCopyStrategy} is loaded and + * the generated bash uses {@code gcloud storage} / {@code gsutil} with fallbacks to POSIX copy from the mount; order is + * chosen per transport (see {@link nextflow.cloud.google.batch.GoogleBatchBashLib}). CLI stage-in applies when + * {@code process stageInMode} is {@code copy}; CLI stage-out when effective {@code stageOutMode} is {@code copy}. + * {@code move} / {@code rsync} / etc. keep existing POSIX behaviour. Parallelism and retries use + * {@link #maxParallelTransfers}, {@link #maxTransferAttempts}, {@link #delayBetweenAttempts} with {@code nxf_parallel} + * / {@code nxf_cp_retry} in the task script. * * @author Paolo Di Tommaso */ @@ -33,6 +46,24 @@ import nextflow.util.MemoryUnit @CompileStatic class BatchConfig implements ConfigScope { + /** + * Copy via POSIX {@code cp}/links from gcsfuse paths under {@code /mnt/disks/...}. + */ + static final String COPY_TRANSPORT_POSIX = 'posix' + + /** + * Copy using {@code gcloud storage} first, then {@code gsutil}, then POSIX from the mount (see {@link nextflow.cloud.google.batch.GoogleBatchBashLib}). + */ + static final String COPY_TRANSPORT_GCLOUD = 'gcloud' + + /** + * Copy using {@code gsutil} first, then {@code gcloud storage}, then POSIX from the mount.
+ */ + static final String COPY_TRANSPORT_GSUTIL = 'gsutil' + + static final private List VALID_COPY_TRANSPORTS = List.of(COPY_TRANSPORT_POSIX, COPY_TRANSPORT_GCLOUD, COPY_TRANSPORT_GSUTIL) + static final private List CLI_COPY_TRANSPORTS = List.of(COPY_TRANSPORT_GCLOUD, COPY_TRANSPORT_GSUTIL) + static final private int DEFAULT_MAX_SPOT_ATTEMPTS = 0 static final private List DEFAULT_RETRY_LIST = List.of(50001) @@ -79,6 +110,48 @@ class BatchConfig implements ConfigScope { """) final List gcsfuseOptions + @ConfigOption + @Description(""" + `gcloud` executable path (default: `gcloud` on `PATH`). + """) + final String gcloudCli + + @ConfigOption + @Description(""" + `gsutil` executable path (default: `gsutil` on `PATH`). + """) + final String gsutilCli + + @ConfigOption + @Description(""" + Max parallel CLI object copies (default: same as other cloud executors). + """) + final int maxParallelTransfers + + @ConfigOption + @Description(""" + Max retries per CLI copy (default: same as other cloud executors). + """) + final int maxTransferAttempts + + @ConfigOption + @Description(""" + Delay between CLI copy retries (default: same as other cloud executors). + """) + final Duration delayBetweenAttempts + + @ConfigOption + @Description(""" + Stage-in copy transport: `posix`, `gcloud`, or `gsutil`. + """) + final String stageInCopyTransport + + @ConfigOption + @Description(""" + Stage-out copy transport: `posix`, `gcloud`, or `gsutil`. + """) + final String stageOutCopyTransport + @ConfigOption @Description(""" """) @@ -152,6 +225,17 @@ class BatchConfig implements ConfigScope { bootDiskSize = opts.bootDiskSize as MemoryUnit cpuPlatform = opts.cpuPlatform gcsfuseOptions = opts.gcsfuseOptions as List ?: DEFAULT_GCSFUSE_OPTS + gcloudCli = opts.gcloudCli as String + gsutilCli = opts.gsutilCli as String + maxParallelTransfers = opts.maxParallelTransfers != null ? 
opts.maxParallelTransfers as int : CloudTransferOptions.MAX_TRANSFER + maxTransferAttempts = opts.maxTransferAttempts != null ? opts.maxTransferAttempts as int : CloudTransferOptions.MAX_TRANSFER_ATTEMPTS + delayBetweenAttempts = opts.delayBetweenAttempts ? opts.delayBetweenAttempts as Duration : CloudTransferOptions.DEFAULT_DELAY_BETWEEN_ATTEMPTS + stageInCopyTransport = opts.stageInCopyTransport as String + stageOutCopyTransport = opts.stageOutCopyTransport as String + for (String t in [stageInCopyTransport, stageOutCopyTransport]) { + if (t && t !in VALID_COPY_TRANSPORTS) + throw new IllegalArgumentException("Invalid google.batch copy transport: '$t' — valid values are: ${VALID_COPY_TRANSPORTS.join(', ')}") + } installGpuDrivers = opts.installGpuDrivers as boolean installOpsAgent = opts.installOpsAgent as boolean logsPath = opts.logsPath @@ -166,6 +250,17 @@ class BatchConfig implements ConfigScope { usePrivateAddress = opts.usePrivateAddress as boolean } + /** + * Load {@link nextflow.cloud.google.batch.GoogleBatchFileCopyStrategy} when any transport requests CLI object-storage copy. + */ + boolean usesGoogleBatchStaging() { + return [stageInCopyTransport, stageOutCopyTransport].any { isCliCopyTransport(it) } + } + + static boolean isCliCopyTransport(String transport) { + return transport != null && transport in CLI_COPY_TRANSPORTS + } + BatchRetryConfig getRetryConfig() { retry } Path logsPath() { diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategyTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategyTest.groovy new file mode 100644 index 0000000000..a5642e791e --- /dev/null +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchFileCopyStrategyTest.groovy @@ -0,0 +1,145 @@ +/* + * Copyright 2013-2026, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.cloud.google.batch + +import java.nio.file.Paths + +import nextflow.cloud.google.batch.client.BatchConfig +import nextflow.processor.TaskBean +import spock.lang.Specification + +class GoogleBatchFileCopyStrategyTest extends Specification { + + def 'should map container mount path to gs uri' () { + expect: + GoogleBatchFileCopyStrategy.toGsUriFromContainerMount(Paths.get('/mnt/disks/mybucket/work/dir')) == 'gs://mybucket/work/dir' + GoogleBatchFileCopyStrategy.toGsUriFromContainerMount(Paths.get('/mnt/disks/onlybucket')) == 'gs://onlybucket/' + } + + def 'should build gs download staging line when gcloud copy transport' () { + given: + def batch = new BatchConfig([ + stageInCopyTransport: 'gcloud' + ]) + def bean = Mock(TaskBean) { + getWorkDir() >> Paths.get('/mnt/disks/w/x') + getTargetDir() >> Paths.get('/mnt/disks/w/x') + getStageInMode() >> 'copy' + } + def path = Paths.get('/mnt/disks/b/data/in.txt') + def copy = new GoogleBatchFileCopyStrategy(bean, batch) + + expect: + copy.stageInputFile(path, 'in.txt') == 'downloads+=("nxf_gs_download \'gs://b/data/in.txt\' in.txt")' + } + + def 'should build gs download staging line with retry when maxTransferAttempts > 1' () { + given: + def batch = new BatchConfig([ + stageInCopyTransport: 'gcloud', + maxTransferAttempts: 3 + ]) + def bean = Mock(TaskBean) { + getWorkDir() >> Paths.get('/mnt/disks/w/x') + getTargetDir() >> Paths.get('/mnt/disks/w/x') + getStageInMode() >> 'copy' + } + def path = Paths.get('/mnt/disks/b/data/in.txt') + def copy = new 
GoogleBatchFileCopyStrategy(bean, batch) + + expect: + copy.stageInputFile(path, 'in.txt') == 'downloads+=("nxf_cp_retry nxf_gs_download \'gs://b/data/in.txt\' in.txt")' + } + + def 'should build gs upload unstage script' () { + given: + def batch = new BatchConfig([stageOutCopyTransport: 'gcloud']) + def wd = Paths.get('/mnt/disks/foo/wd') + def bean = Mock(TaskBean) { + getWorkDir() >> wd + getTargetDir() >> wd + getStageOutMode() >> null + } + def copy = new GoogleBatchFileCopyStrategy(bean, batch) + + when: + def script = copy.getUnstageOutputFilesScript(['out.txt'], wd) + then: + script.trim() == ''' + uploads=() + IFS=$'\\n' + for name in $(eval "ls -1d out.txt" | sort | uniq); do + uploads+=("nxf_gs_upload '$name' 'gs://foo/wd'") + done + unset IFS + nxf_parallel "${uploads[@]}" + ''' + .stripIndent().trim() + } + + def 'should use posix symlink when stageIn is not cli copy' () { + given: + def batch = new BatchConfig([stageOutCopyTransport: 'gcloud']) + def wd = Paths.get('/mnt/disks/foo/wd') + def bean = Mock(TaskBean) { + getWorkDir() >> wd + getTargetDir() >> wd + getStageInMode() >> 'symlink' + } + def path = Paths.get('/mnt/disks/foo/bucket/data.txt') + def copy = new GoogleBatchFileCopyStrategy(bean, batch) + + expect: + copy.stageInputFile(path, 'data.txt') == 'ln -s /mnt/disks/foo/bucket/data.txt data.txt' + } + + def 'should use posix move when stageOutMode is move' () { + given: + def batch = new BatchConfig([stageOutCopyTransport: 'gcloud']) + def wd = Paths.get('/mnt/disks/b/work') + def td = Paths.get('/mnt/disks/b/out') + def bean = Mock(TaskBean) { + getWorkDir() >> wd + getTargetDir() >> td + getStageOutMode() >> 'move' + } + def copy = new GoogleBatchFileCopyStrategy(bean, batch) + + when: + def script = copy.getUnstageOutputFilesScript(['x.txt'], td) + then: + script.contains('nxf_fs_move') + script.contains('/mnt/disks/b/out') + } + + def 'should use posix copy when stageOutCopyTransport is posix' () { + given: + def batch = new 
BatchConfig([stageOutCopyTransport: 'posix']) + def wd = Paths.get('/mnt/disks/b/work') + def bean = Mock(TaskBean) { + getWorkDir() >> wd + getTargetDir() >> wd + getStageOutMode() >> 'copy' + } + def copy = new GoogleBatchFileCopyStrategy(bean, batch) + + when: + def script = copy.getUnstageOutputFilesScript(['out.txt'], wd) + then: + script.contains('nxf_fs_copy') + } +} diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchScriptLauncherTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchScriptLauncherTest.groovy index 1982722518..d833de90c5 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchScriptLauncherTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchScriptLauncherTest.groovy @@ -17,10 +17,10 @@ package nextflow.cloud.google.batch import java.nio.file.Paths - import com.google.cloud.storage.contrib.nio.CloudStorageFileSystem import nextflow.cloud.google.GoogleOpts import nextflow.cloud.google.batch.client.BatchConfig +import nextflow.processor.TaskBean import nextflow.processor.TaskRun import spock.lang.Specification import spock.lang.Unroll @@ -87,6 +87,24 @@ class GoogleBatchScriptLauncherTest extends Specification{ volumes[1].getMountOptionsList() == ['-o rw', '-implicit-dirs', '-o allow_other', '--uid=1000', '--billing-project my-project'] } + def 'should default stageOutMode to copy when not set' () { + given: + def workDir = CloudStorageFileSystem.forBucket('foo').getPath('/scratch') + def targetDir = CloudStorageFileSystem.forBucket('foo').getPath('/scratch') + def bean = new TaskBean( + workDir: workDir, + targetDir: targetDir, + stageOutMode: null, + inputFiles: [:] + ) + + when: + new GoogleBatchScriptLauncher(bean, null, Mock(BatchConfig)) + + then: + bean.stageOutMode == 'copy' + } + def 'should return target files in remote work dir' () { given: def launcher = new GoogleBatchScriptLauncher() diff --git 
a/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchConfigTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchConfigTest.groovy index 13e2f7e06f..6810d11c12 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchConfigTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchConfigTest.groovy @@ -16,6 +16,8 @@ package nextflow.cloud.google.batch.client +import nextflow.cloud.CloudTransferOptions +import nextflow.util.Duration import nextflow.util.MemoryUnit import spock.lang.Specification /** @@ -37,6 +39,30 @@ class BatchConfigTest extends Specification { !config.bootDiskImage !config.bootDiskSize !config.logsPath + and: + config.maxParallelTransfers == CloudTransferOptions.MAX_TRANSFER + config.maxTransferAttempts == CloudTransferOptions.MAX_TRANSFER_ATTEMPTS + config.delayBetweenAttempts == CloudTransferOptions.DEFAULT_DELAY_BETWEEN_ATTEMPTS + !config.gcloudCli + !config.gsutilCli + and: + !config.stageInCopyTransport + !config.stageOutCopyTransport + !config.usesGoogleBatchStaging() + } + + def 'should reject invalid copy transport' () { + when: + new BatchConfig([stageInCopyTransport: 'ftp']) + then: + thrown(IllegalArgumentException) + } + + def 'should detect google batch staging when cli transport set' () { + expect: + new BatchConfig([stageOutCopyTransport: 'gcloud']).usesGoogleBatchStaging() + new BatchConfig([stageInCopyTransport: 'gsutil']).usesGoogleBatchStaging() + !new BatchConfig([stageInCopyTransport: 'posix']).usesGoogleBatchStaging() } def 'should create batch config with custom settings' () { @@ -49,7 +75,13 @@ class BatchConfigTest extends Specification { bootDiskImage: 'batch-foo', bootDiskSize: '100GB', logsPath: 'gs://my-logs-bucket/logs', - installOpsAgent: true + installOpsAgent: true, + stageInCopyTransport: 'gcloud', + stageOutCopyTransport: 'posix', + gcloudCli: '/opt/google/gcloud', + maxParallelTransfers: 8, + maxTransferAttempts: 3, + 
delayBetweenAttempts: '5s' ] when: @@ -67,6 +99,13 @@ class BatchConfigTest extends Specification { config.logsPath == 'gs://my-logs-bucket/logs' and: config.installOpsAgent == true + and: + config.stageInCopyTransport == 'gcloud' + config.stageOutCopyTransport == 'posix' + config.gcloudCli == '/opt/google/gcloud' + config.maxParallelTransfers == 8 + config.maxTransferAttempts == 3 + config.delayBetweenAttempts == Duration.of('5s') } }