diff --git a/packages/cdk/lib/env/context-app-parameters.ts b/packages/cdk/lib/env/context-app-parameters.ts index 0ce1447d..fd7e1820 100644 --- a/packages/cdk/lib/env/context-app-parameters.ts +++ b/packages/cdk/lib/env/context-app-parameters.ts @@ -105,6 +105,10 @@ export class ContextAppParameters { * Map of custom tags to be applied to all the infrastructure in the context. */ public readonly customTags: { [key: string]: string }; + /** + * Map of custom environment variables to be passed to the WES Adapter. + */ + public readonly customWesEnvVars?: { [key: string]: string }; constructor(node: Node) { const instanceTypeStrings = getEnvStringListOrDefault(node, "BATCH_COMPUTE_INSTANCE_TYPES"); @@ -142,6 +146,16 @@ export class ContextAppParameters { } else { this.customTags = {}; } + + const customWesEnvVarsString = getEnvStringOrDefault(node, "CUSTOM_WES_ENV_VARS"); + + if (customWesEnvVarsString) { + const customWesEnvVars: Array<{ Key: string; Value: string }> = JSON.parse(customWesEnvVarsString); + if (customWesEnvVars && customWesEnvVars.length) { + this.customWesEnvVars = {}; + customWesEnvVars.forEach((customWesEnvVar) => (this.customWesEnvVars![customWesEnvVar["Key"]] = customWesEnvVar["Value"])); + } + } } public getContextBucketPath(): string { diff --git a/packages/cdk/lib/stacks/engines/cromwell-engine-construct.ts b/packages/cdk/lib/stacks/engines/cromwell-engine-construct.ts index db11f555..e605636e 100644 --- a/packages/cdk/lib/stacks/engines/cromwell-engine-construct.ts +++ b/packages/cdk/lib/stacks/engines/cromwell-engine-construct.ts @@ -69,6 +69,7 @@ export class CromwellEngineConstruct extends EngineConstruct { contextName: params.contextName, userId: params.userId, engineEndpoint: this.engine.loadBalancer.loadBalancerDnsName, + customEnvs: params.customWesEnvVars, }); this.adapterLogGroup = lambda.logGroup; @@ -130,21 +131,18 @@ export class CromwellEngineConstruct extends EngineConstruct { return engine; } - private renderAdapterLambda({ role, jobQueueArn, engineLogGroupName, projectName, contextName, userId, engineEndpoint, vpc }) { - return super.renderPythonLambda( - this, - "CromwellWesAdapterLambda", - role, - { - ENGINE_NAME: "cromwell", - ENGINE_ENDPOINT: engineEndpoint, - ENGINE_LOG_GROUP: engineLogGroupName, - JOB_QUEUE: jobQueueArn, - PROJECT_NAME: projectName, - CONTEXT_NAME: contextName, - USER_ID: userId, - }, - vpc - ); + private renderAdapterLambda({ role, jobQueueArn, engineLogGroupName, projectName, contextName, userId, engineEndpoint, vpc, customEnvs }) { + const environment = { + ...customEnvs, + ENGINE_NAME: "cromwell", + ENGINE_ENDPOINT: engineEndpoint, + ENGINE_LOG_GROUP: engineLogGroupName, + JOB_QUEUE: jobQueueArn, + PROJECT_NAME: projectName, + CONTEXT_NAME: contextName, + USER_ID: userId, + }; + + return super.renderPythonLambda(this, "CromwellWesAdapterLambda", role, environment, vpc); } } diff --git a/packages/cdk/lib/stacks/engines/miniwdl-engine-construct.ts b/packages/cdk/lib/stacks/engines/miniwdl-engine-construct.ts index 935f5b71..8573ad51 100644 --- a/packages/cdk/lib/stacks/engines/miniwdl-engine-construct.ts +++ b/packages/cdk/lib/stacks/engines/miniwdl-engine-construct.ts @@ -93,6 +93,7 @@ export class MiniwdlEngineConstruct extends EngineConstruct { jobDefinitionArn: this.miniwdlEngine.headJobDefinition.jobDefinitionArn, rootDirS3Uri: rootDirS3Uri, vpc: props.contextParameters.usePublicSubnets ? undefined : props.vpc, + customEnvs: props.contextParameters.customWesEnvVars, }); this.adapterLogGroup = lambda.logGroup; @@ -144,18 +145,15 @@ export class MiniwdlEngineConstruct extends EngineConstruct { return [this.batchHead.role, this.batchWorkers.role]; } - private renderAdapterLambda({ role, jobQueueArn, jobDefinitionArn, rootDirS3Uri, vpc }) { - return super.renderPythonLambda( - this, - "MiniWDLWesAdapterLambda", - role, - { - ENGINE_NAME: ENGINE_MINIWDL, - JOB_QUEUE: jobQueueArn, - JOB_DEFINITION: jobDefinitionArn, - OUTPUT_DIR_S3_URI: rootDirS3Uri, - }, - vpc - ); + private renderAdapterLambda({ role, jobQueueArn, jobDefinitionArn, rootDirS3Uri, vpc, customEnvs }) { + const environment = { + ...customEnvs, + ENGINE_NAME: ENGINE_MINIWDL, + JOB_QUEUE: jobQueueArn, + JOB_DEFINITION: jobDefinitionArn, + OUTPUT_DIR_S3_URI: rootDirS3Uri, + }; + + return super.renderPythonLambda(this, "MiniWDLWesAdapterLambda", role, environment, vpc); } } diff --git a/packages/cdk/lib/stacks/engines/nextflow-engine-construct.ts b/packages/cdk/lib/stacks/engines/nextflow-engine-construct.ts index b4227d73..ae012255 100644 --- a/packages/cdk/lib/stacks/engines/nextflow-engine-construct.ts +++ b/packages/cdk/lib/stacks/engines/nextflow-engine-construct.ts @@ -63,6 +63,7 @@ export class NextflowEngineConstruct extends EngineConstruct { jobDefinitionArn: this.nextflowEngine.headJobDefinition.jobDefinitionArn, engineLogGroupName: engineLogGroup.logGroupName, vpc: props.contextParameters.usePublicSubnets ? undefined : props.vpc, + customEnvs: props.contextParameters.customWesEnvVars, }); this.adapterLogGroup = lambda.logGroup; @@ -82,18 +83,15 @@ export class NextflowEngineConstruct extends EngineConstruct { }; } - private renderAdapterLambda({ role, jobQueueArn, jobDefinitionArn, engineLogGroupName, vpc }) { - return super.renderPythonLambda( - this, - "NextflowWesAdapterLambda", - role, - { - ENGINE_NAME: "nextflow", - JOB_QUEUE: jobQueueArn, - JOB_DEFINITION: jobDefinitionArn, - ENGINE_LOG_GROUP: engineLogGroupName, - }, - vpc - ); + private renderAdapterLambda({ role, jobQueueArn, jobDefinitionArn, engineLogGroupName, vpc, customEnvs }) { + const environment = { + ...customEnvs, + ENGINE_NAME: "nextflow", + JOB_QUEUE: jobQueueArn, + JOB_DEFINITION: jobDefinitionArn, + ENGINE_LOG_GROUP: engineLogGroupName, + }; + + return super.renderPythonLambda(this, "NextflowWesAdapterLambda", role, environment, vpc); } } diff --git a/packages/cdk/lib/stacks/engines/snakemake-engine-construct.ts b/packages/cdk/lib/stacks/engines/snakemake-engine-construct.ts index b7a73b51..a8e458e9 100644 --- a/packages/cdk/lib/stacks/engines/snakemake-engine-construct.ts +++ b/packages/cdk/lib/stacks/engines/snakemake-engine-construct.ts @@ -59,6 +59,7 @@ export class SnakemakeEngineConstruct extends EngineConstruct { taskQueueArn: this.batchWorkers.jobQueue.jobQueueArn, fsapId: this.snakemakeEngine.fsap.accessPointId, outputBucket: params.getEngineBucketPath(), + customEnvs: contextParameters.customWesEnvVars, }); this.adapterLogGroup = lambda.logGroup; @@ -173,22 +174,19 @@ export class SnakemakeEngineConstruct extends EngineConstruct { }); } - private renderAdapterLambda({ vpc, role, jobQueueArn, jobDefinitionArn, taskQueueArn, workflowRoleArn, fsapId, outputBucket }) { - return super.renderPythonLambda( - this, - "SnakemakeWesAdapterLambda", - role, - { - ENGINE_NAME: "snakemake", - JOB_QUEUE: jobQueueArn, - JOB_DEFINITION: jobDefinitionArn, - TASK_QUEUE: taskQueueArn, - WORKFLOW_ROLE: workflowRoleArn, - FSAP_ID: fsapId, - OUTPUT_DIR_S3_URI: outputBucket, - TIME: Date.now().toString(), - }, - vpc - ); + private renderAdapterLambda({ vpc, role, jobQueueArn, jobDefinitionArn, taskQueueArn, workflowRoleArn, fsapId, outputBucket, customEnvs }) { + const environment = { + ...customEnvs, + ENGINE_NAME: "snakemake", + JOB_QUEUE: jobQueueArn, + JOB_DEFINITION: jobDefinitionArn, + TASK_QUEUE: taskQueueArn, + WORKFLOW_ROLE: workflowRoleArn, + FSAP_ID: fsapId, + OUTPUT_DIR_S3_URI: outputBucket, + TIME: Date.now().toString(), + }; + + return super.renderPythonLambda(this, "SnakemakeWesAdapterLambda", role, environment, vpc); } } diff --git a/packages/cli/internal/pkg/cli/context/context_environment.go b/packages/cli/internal/pkg/cli/context/context_environment.go index 59a23cf3..74002cb6 100644 --- a/packages/cli/internal/pkg/cli/context/context_environment.go +++ b/packages/cli/internal/pkg/cli/context/context_environment.go @@ -29,12 +29,13 @@ type contextEnvironment struct { AdapterDesignation string AdapterRepository string - ArtifactBucketName string - ReadBucketArns string - ReadWriteBucketArns string - InstanceTypes string - ResourceType string - MaxVCpus int + ArtifactBucketName string + ReadBucketArns string + ReadWriteBucketArns string + InstanceTypes string + ResourceType string + MaxVCpus int + CustomWesEnvVarsJson string RequestSpotInstances bool UsePublicSubnets bool @@ -42,13 +43,14 @@ type contextEnvironment struct { func (input contextEnvironment) ToEnvironmentList() []string { return environmentMapToList(map[string]string{ - "PROJECT": input.ProjectName, - "CONTEXT": input.ContextName, - "USER_ID": input.UserId, - "USER_EMAIL": input.UserEmail, - "OUTPUT_BUCKET": input.OutputBucketName, - "AGC_VERSION": version.Version, - "CUSTOM_TAGS": input.CustomTagsJson, + "PROJECT": input.ProjectName, + "CONTEXT": input.ContextName, + "USER_ID": input.UserId, + "USER_EMAIL": input.UserEmail, + "OUTPUT_BUCKET": input.OutputBucketName, + "AGC_VERSION": version.Version, + "CUSTOM_TAGS": input.CustomTagsJson, + "CUSTOM_WES_ENV_VARS": input.CustomWesEnvVarsJson, "ENGINE_NAME": input.EngineName, "FILESYSTEM_TYPE": input.FilesystemType, diff --git a/packages/cli/internal/pkg/cli/context/manager.go b/packages/cli/internal/pkg/cli/context/manager.go index b99486ae..a7dc9ea9 100644 --- a/packages/cli/internal/pkg/cli/context/manager.go +++ b/packages/cli/internal/pkg/cli/context/manager.go @@ -1,6 +1,7 @@ package context import ( + "encoding/json" "fmt" "net/url" "strings" @@ -225,6 +226,12 @@ func (m *Manager) setContextEnv(contextName string) { return } + customWesEnvVarsJsonBytes, err := json.Marshal(m.contextSpec.CustomWesEnvVars) + if err != nil { + m.err = err + return + } + m.contextEnv = contextEnvironment{ ProjectName: m.projectSpec.Name, ContextName: contextName, @@ -236,6 +243,7 @@ func (m *Manager) setContextEnv(contextName string) { ReadBucketArns: strings.Join(m.readBuckets, listDelimiter), ReadWriteBucketArns: strings.Join(m.readWriteBuckets, listDelimiter), InstanceTypes: strings.Join(m.contextSpec.InstanceTypes, listDelimiter), + CustomWesEnvVarsJson: string(customWesEnvVarsJsonBytes), MaxVCpus: m.contextSpec.MaxVCpus, RequestSpotInstances: m.contextSpec.RequestSpotInstances, UsePublicSubnets: m.contextSpec.UsePublicSubnets, diff --git a/packages/cli/internal/pkg/cli/context/manager_deploy_test.go b/packages/cli/internal/pkg/cli/context/manager_deploy_test.go index 5d598e20..8cda8c9d 100644 --- a/packages/cli/internal/pkg/cli/context/manager_deploy_test.go +++ b/packages/cli/internal/pkg/cli/context/manager_deploy_test.go @@ -19,7 +19,7 @@ const ( // We check a lot of generated CDK commands to make sure they have the // right number of command line arguments. How many should there be to // start? - testCdkBaseArgumentCount = 27 + testCdkBaseArgumentCount = 28 // And how many do we expect if the WES adapter images are also to be // passed? testCdkAdaptedArgumentCount = testCdkBaseArgumentCount + 4 diff --git a/packages/cli/internal/pkg/cli/spec/context.go b/packages/cli/internal/pkg/cli/spec/context.go index 38fcbb02..cdd55241 100644 --- a/packages/cli/internal/pkg/cli/spec/context.go +++ b/packages/cli/internal/pkg/cli/spec/context.go @@ -14,12 +14,17 @@ type Engine struct { Engine string `yaml:"engine"` Filesystem Filesystem `yaml:"filesystem,omitempty"` } +type CustomWesEnvVar struct { + Key string `yaml:"key"` + Value string `yaml:"value"` +} type Context struct { - InstanceTypes []string `yaml:"instanceTypes,omitempty"` - RequestSpotInstances bool `yaml:"requestSpotInstances,omitempty"` - MaxVCpus int `yaml:"maxVCpus,omitempty"` - UsePublicSubnets bool `yaml:"usePublicSubnets,omitempty"` - Engines []Engine `yaml:"engines"` + InstanceTypes []string `yaml:"instanceTypes,omitempty"` + RequestSpotInstances bool `yaml:"requestSpotInstances,omitempty"` + MaxVCpus int `yaml:"maxVCpus,omitempty"` + UsePublicSubnets bool `yaml:"usePublicSubnets,omitempty"` + Engines []Engine `yaml:"engines"` + CustomWesEnvVars []CustomWesEnvVar `yaml:"customWesEnvVars"` } func (context *Context) UnmarshalYAML(unmarshal func(interface{}) error) error { diff --git a/packages/cli/internal/pkg/cli/spec/project_schema.json b/packages/cli/internal/pkg/cli/spec/project_schema.json index 995f8b1c..9da92e4b 100644 --- a/packages/cli/internal/pkg/cli/spec/project_schema.json +++ b/packages/cli/internal/pkg/cli/spec/project_schema.json @@ -151,6 +151,27 @@ }, "required": ["type", "engine"] } + }, + "customWesEnvVars": { + "type":[ + "array", + "null" + ], + "items": { + "type": "object", + "additionalProperties": false, + "properties": { + "key": { + "type": "string", + "minLength": 1 + }, + "value": { + "type": "string", + "minLength": 1 + } + }, + "required": ["key", "value"] + } } }, "required": [ diff --git a/packages/cli/internal/pkg/cli/spec/project_test.go b/packages/cli/internal/pkg/cli/spec/project_test.go index ada31392..6725a01b 100644 --- a/packages/cli/internal/pkg/cli/spec/project_test.go +++ b/packages/cli/internal/pkg/cli/spec/project_test.go @@ -37,6 +37,7 @@ func TestProjectYaml(t *testing.T) { }, }, }, + CustomWesEnvVars: []CustomWesEnvVar{}, }, }, }, @@ -69,6 +70,7 @@ contexts: engine: cromwell filesystem: fsType: S3 + customWesEnvVars: [] `, }, "empty": { @@ -103,6 +105,7 @@ schemaVersion: 0 }, }, }, + CustomWesEnvVars: []CustomWesEnvVar{}, }, "ctx2": { MaxVCpus: 256, @@ -115,6 +118,7 @@ schemaVersion: 0 }, }, }, + CustomWesEnvVars: []CustomWesEnvVar{}, }, "ctx3": { MaxVCpus: 256, @@ -124,6 +128,12 @@ schemaVersion: 0 Engine: "nextflow", }, }, + CustomWesEnvVars: []CustomWesEnvVar{ + { + Key: "k1", + Value: "v1", + }, + }, }, }, }, @@ -156,6 +166,7 @@ contexts: fsType: EFS configuration: provisionedThroughput: 50 + customWesEnvVars: [] ctx2: maxVCpus: 256 engines: @@ -163,11 +174,15 @@ contexts: engine: nextflow filesystem: fsType: S3 + customWesEnvVars: [] ctx3: maxVCpus: 256 engines: - type: nextflow engine: nextflow + customWesEnvVars: + - key: k1 + value: v1 `, }, } @@ -237,6 +252,7 @@ func TestGetContext(t *testing.T) { }, }, }, + CustomWesEnvVars: []CustomWesEnvVar{}, }, "ctx2": { Engines: []Engine{ @@ -248,6 +264,7 @@ func TestGetContext(t *testing.T) { }, }, }, + CustomWesEnvVars: []CustomWesEnvVar(nil), }, }, }, diff --git a/packages/cli/internal/pkg/constants/product.go b/packages/cli/internal/pkg/constants/product.go index 9f03f62a..fc454b1c 100644 --- a/packages/cli/internal/pkg/constants/product.go +++ b/packages/cli/internal/pkg/constants/product.go @@ -6,10 +6,11 @@ const ( AppTagValue = "agc" AgcVersionKey = "agc-version" - CustomTagEnvKey = "CUSTOM_TAGS" - AgcBucketNameEnvKey = "AGC_BUCKET_NAME" - CreateBucketEnvKey = "CREATE_AGC_BUCKET" - AgcVersionEnvKey = "AGC_VERSION" - VpcIdEnvKey = "VPC_ID" - PublicSubnetsEnvKey = "AGC_USE_PUBLIC_SUBNETS" + CustomTagEnvKey = "CUSTOM_TAGS" + AgcBucketNameEnvKey = "AGC_BUCKET_NAME" + CreateBucketEnvKey = "CREATE_AGC_BUCKET" + AgcVersionEnvKey = "AGC_VERSION" + VpcIdEnvKey = "VPC_ID" + PublicSubnetsEnvKey = "AGC_USE_PUBLIC_SUBNETS" + CustomWesEnvVarEnvKey = "CUSTOM_WES_ENV_VARS" ) diff --git a/packages/engines/miniwdl/buildspec.yml b/packages/engines/miniwdl/buildspec.yml index b6e88112..f2dfde91 100644 --- a/packages/engines/miniwdl/buildspec.yml +++ b/packages/engines/miniwdl/buildspec.yml @@ -5,7 +5,7 @@ env: variables: # These variables may be over-ridden as appropriate by the CI/CD pipeline MINIWDL_IMAGE_NAME: "miniwdl" - MINIWDL_VERSION: "v0.1.11" + MINIWDL_VERSION: "v0.2.0" phases: pre_build: diff --git a/packages/wes_adapter/amazon_genomics/wes/adapters/BatchAdapter.py b/packages/wes_adapter/amazon_genomics/wes/adapters/BatchAdapter.py index d5ac17fe..0abac6ea 100644 --- a/packages/wes_adapter/amazon_genomics/wes/adapters/BatchAdapter.py +++ b/packages/wes_adapter/amazon_genomics/wes/adapters/BatchAdapter.py @@ -2,6 +2,7 @@ import os import typing import time +import json from abc import abstractmethod from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime @@ -163,14 +164,38 @@ def run_workflow( workflow_attachment=workflow_attachment, ) - submit_job_response = self.aws_batch.submit_job( - jobName="agc-run-workflow", - jobQueue=self.job_queue, - jobDefinition=self.job_definition, - containerOverrides={ - "command": command, - }, - ) + """ + E.g. [{ "key": "k1", "value": { "k2": 15 } }, { "key": "k3", "value": 10 }] + """ + if os.environ.get("CUSTOM_WORKFLOW_JOB_ENV_VARS") is not None: + custom_environments = os.environ["CUSTOM_WORKFLOW_JOB_ENV_VARS"] + + environment = list( + map( + lambda key_value: { + "name": key_value["key"], + # Convert to string if a value is JSON + "value": json.dumps(key_value["value"]), + }, + json.loads(custom_environments), + ) + ) + + submit_job_response = self.aws_batch.submit_job( + jobName="agc-run-workflow", + jobQueue=self.job_queue, + jobDefinition=self.job_definition, + containerOverrides={"command": command, "environment": environment}, + ) + else: + submit_job_response = self.aws_batch.submit_job( + jobName="agc-run-workflow", + jobQueue=self.job_queue, + jobDefinition=self.job_definition, + containerOverrides={ + "command": command, + }, + ) return RunId(submit_job_response["jobId"]) def describe_job(self, job_id: str) -> Optional[JobDetailTypeDef]: diff --git a/site/content/en/docs/Concepts/contexts.md b/site/content/en/docs/Concepts/contexts.md index 27aee2d0..fe60c94c 100644 --- a/site/content/en/docs/Concepts/contexts.md +++ b/site/content/en/docs/Concepts/contexts.md @@ -106,6 +106,36 @@ contexts: engine: nextflow ``` +### Custom WES Adapter environment variables + +You may optionally specify custom environment variables to pass into WES Adapter +```yaml +contexts: + miniContext: + customWesEnvVars: + - key: k1 + value: v1 + - key: k2 + value: "[{ \"key\": \"k3\", \"value\": 10 }, { \"key\": \"k4\", \"value\": \"v4\" }]" + engines: + - type: wdl + engine: miniwdl +``` + +### Custom Workflow job environment variables + +You may optionally specify `CUSTOM_WORKFLOW_JOB_ENV_VARS` WES Adapter environment variable to pass custom environment variables into Workflow Job +```yaml +contexts: + miniContext: + customWesEnvVars: + - key: CUSTOM_WORKFLOW_JOB_ENV_VARS + value: "[{ \"key\": \"MINIWDL__AWS__BOTO3_RETRIES\", \"value\": { \"max_attempts\": 15, \"mode\": \"standard\" } }, { \"key\": \"MINIWDL__AWS__DESCRIBE_PERIOD\", \"value\": 10 }, { \"key\": \"MINIWDL__AWS__SUBMIT_PERIOD\", \"value\": 10 }]" + engines: + - type: wdl + engine: miniwdl +``` + ### Public Subnets In the interest of saving money, in particular if you intend to have the AGC stack deployed for a long period, you may choose to deploy in "public subnet" mode.