import os
import sys
import types
from metaflow.exception import MetaflowException
from metaflow.metaflow_config_funcs import from_conf, get_validate_choice_fn
# Disable the Objective-C runtime's fork-safety check on macOS so that forked
# (multiprocessing) subprocesses do not abort
if sys.platform == "darwin":
os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
## NOTE: Just like Click's `auto_envvar_prefix` `METAFLOW` (see cli.py), all environment
## variables here are also named METAFLOW_XXX. So, for example, in the statement
## `DEFAULT_DATASTORE = from_conf("DEFAULT_DATASTORE", "local")`, to override the default
## value, either set `METAFLOW_DEFAULT_DATASTORE` in your configuration file or set
## an environment variable called `METAFLOW_DEFAULT_DATASTORE`.
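## As a concrete illustration (the flow file name below is hypothetical), the same
## override can also be supplied as an environment variable when invoking a flow:
##   METAFLOW_DEFAULT_DATASTORE=s3 python myflow.py run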
##
# Constants (NOTE: these need to live before any from_conf)
##
# Path to the local directory to store artifacts for 'local' datastore.
DATASTORE_LOCAL_DIR = ".metaflow"
# Local configuration file (in .metaflow) containing overrides per-project
LOCAL_CONFIG_FILE = "config.json"
###
# Default configuration
###
DEFAULT_DATASTORE = from_conf("DEFAULT_DATASTORE", "local")
DEFAULT_ENVIRONMENT = from_conf("DEFAULT_ENVIRONMENT", "local")
DEFAULT_EVENT_LOGGER = from_conf("DEFAULT_EVENT_LOGGER", "nullSidecarLogger")
DEFAULT_METADATA = from_conf("DEFAULT_METADATA", "local")
DEFAULT_MONITOR = from_conf("DEFAULT_MONITOR", "nullSidecarMonitor")
DEFAULT_PACKAGE_SUFFIXES = from_conf("DEFAULT_PACKAGE_SUFFIXES", ".py,.R,.RDS")
DEFAULT_AWS_CLIENT_PROVIDER = from_conf("DEFAULT_AWS_CLIENT_PROVIDER", "boto3")
DEFAULT_AZURE_CLIENT_PROVIDER = from_conf(
"DEFAULT_AZURE_CLIENT_PROVIDER", "azure-default"
)
DEFAULT_GCP_CLIENT_PROVIDER = from_conf("DEFAULT_GCP_CLIENT_PROVIDER", "gcp-default")
DEFAULT_SECRETS_BACKEND_TYPE = from_conf("DEFAULT_SECRETS_BACKEND_TYPE")
DEFAULT_SECRETS_ROLE = from_conf("DEFAULT_SECRETS_ROLE")
DEFAULT_FROM_DEPLOYMENT_IMPL = from_conf(
"DEFAULT_FROM_DEPLOYMENT_IMPL", "argo-workflows"
)
###
# User configuration
###
USER = from_conf("USER")
###
# Datastore configuration
###
DATASTORE_SYSROOT_LOCAL = from_conf("DATASTORE_SYSROOT_LOCAL")
# S3 bucket and prefix to store artifacts for 's3' datastore.
DATASTORE_SYSROOT_S3 = from_conf("DATASTORE_SYSROOT_S3")
# Azure Blob Storage container and blob prefix
DATASTORE_SYSROOT_AZURE = from_conf("DATASTORE_SYSROOT_AZURE")
# GS bucket and prefix to store artifacts for 'gs' datastore
DATASTORE_SYSROOT_GS = from_conf("DATASTORE_SYSROOT_GS")
###
# Datastore local cache
###
# Path to the client cache
CLIENT_CACHE_PATH = from_conf("CLIENT_CACHE_PATH", "/tmp/metaflow_client")
# Maximum size (in bytes) of the cache
CLIENT_CACHE_MAX_SIZE = from_conf("CLIENT_CACHE_MAX_SIZE", 10000)
# Maximum number of cached Flow and TaskDatastores in the cache
CLIENT_CACHE_MAX_FLOWDATASTORE_COUNT = from_conf(
"CLIENT_CACHE_MAX_FLOWDATASTORE_COUNT", 50
)
CLIENT_CACHE_MAX_TASKDATASTORE_COUNT = from_conf(
"CLIENT_CACHE_MAX_TASKDATASTORE_COUNT", CLIENT_CACHE_MAX_FLOWDATASTORE_COUNT * 100
)
###
# Datatools (S3) configuration
###
S3_ENDPOINT_URL = from_conf("S3_ENDPOINT_URL")
S3_VERIFY_CERTIFICATE = from_conf("S3_VERIFY_CERTIFICATE")
# Set ServerSideEncryption for S3 uploads
S3_SERVER_SIDE_ENCRYPTION = from_conf("S3_SERVER_SIDE_ENCRYPTION")
# S3 retry configuration
# Lowering this is useful if you want to "fail fast" on S3 operations; use with
# caution though, as it may increase failures. Note that this is the number of *retries*,
# so setting it to 0 means each operation will be tried once.
S3_RETRY_COUNT = from_conf("S3_RETRY_COUNT", 7)
# Number of concurrent S3 processes for parallel operations.
S3_WORKER_COUNT = from_conf("S3_WORKER_COUNT", 64)
# Number of retries on *transient* failures (such as SlowDown errors). Note
# that if, after S3_TRANSIENT_RETRY_COUNT tries, some operations still have not
# succeeded, the top-level retry kicks in again (up to S3_RETRY_COUNT times), so the
# total number of tries can be up to
# (S3_RETRY_COUNT + 1) * (S3_TRANSIENT_RETRY_COUNT + 1); a worked example follows
# the assignment below.
# You typically want this number fairly high as transient retries are "cheap" (only
# operations that have not succeeded are retried, as opposed to all operations for the
# top-level retries)
S3_TRANSIENT_RETRY_COUNT = from_conf("S3_TRANSIENT_RETRY_COUNT", 20)
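# For example, with the defaults above (S3_RETRY_COUNT = 7, S3_TRANSIENT_RETRY_COUNT = 20),
# a single operation may be tried up to (7 + 1) * (20 + 1) = 168 times before it is
# finally reported as failed.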
# Threshold to start printing warnings for an AWS retry
RETRY_WARNING_THRESHOLD = 3
# S3 datatools root location
DATATOOLS_SUFFIX = from_conf("DATATOOLS_SUFFIX", "data")
DATATOOLS_S3ROOT = from_conf(
"DATATOOLS_S3ROOT",
(
os.path.join(DATASTORE_SYSROOT_S3, DATATOOLS_SUFFIX)
if DATASTORE_SYSROOT_S3
else None
),
)
TEMPDIR = from_conf("TEMPDIR", ".")
DATATOOLS_CLIENT_PARAMS = from_conf("DATATOOLS_CLIENT_PARAMS", {})
if S3_ENDPOINT_URL:
DATATOOLS_CLIENT_PARAMS["endpoint_url"] = S3_ENDPOINT_URL
if S3_VERIFY_CERTIFICATE:
DATATOOLS_CLIENT_PARAMS["verify"] = S3_VERIFY_CERTIFICATE
DATATOOLS_SESSION_VARS = from_conf("DATATOOLS_SESSION_VARS", {})
# Azure datatools root location
# Note: we do not expose an actual datatools library for Azure (like we do for S3)
# Similar to DATATOOLS_LOCALROOT, this is used ONLY by the IncludeFile's internal implementation.
DATATOOLS_AZUREROOT = from_conf(
"DATATOOLS_AZUREROOT",
(
os.path.join(DATASTORE_SYSROOT_AZURE, DATATOOLS_SUFFIX)
if DATASTORE_SYSROOT_AZURE
else None
),
)
# GS datatools root location
# Note: we do not expose an actual datatools library for GS (like we do for S3)
# Similar to DATATOOLS_LOCALROOT, this is used ONLY by the IncludeFile's internal implementation.
DATATOOLS_GSROOT = from_conf(
"DATATOOLS_GSROOT",
(
os.path.join(DATASTORE_SYSROOT_GS, DATATOOLS_SUFFIX)
if DATASTORE_SYSROOT_GS
else None
),
)
# Local datatools root location
DATATOOLS_LOCALROOT = from_conf(
"DATATOOLS_LOCALROOT",
(
os.path.join(DATASTORE_SYSROOT_LOCAL, DATATOOLS_SUFFIX)
if DATASTORE_SYSROOT_LOCAL
else None
),
)
# Secrets Backend - AWS Secrets Manager configuration
AWS_SECRETS_MANAGER_DEFAULT_REGION = from_conf("AWS_SECRETS_MANAGER_DEFAULT_REGION")
# Secrets Backend - GCP Secrets name prefix. With this, users don't have
# to specify the full secret name in the @secret decorator.
#
# Note that it makes a difference whether the prefix ends with a slash or not.
# E.g. if the secret name passed to the @secret decorator is mysecret:
# - "projects/1234567890/secrets/" -> "projects/1234567890/secrets/mysecret"
# - "projects/1234567890/secrets/foo-" -> "projects/1234567890/secrets/foo-mysecret"
GCP_SECRET_MANAGER_PREFIX = from_conf("GCP_SECRET_MANAGER_PREFIX")
# Secrets Backend - Azure Key Vault prefix. With this, users don't have to
# specify the full https:// vault url in the @secret decorator.
#
# It does not make a difference if the prefix ends in a / or not. We will handle either
# case correctly.
AZURE_KEY_VAULT_PREFIX = from_conf("AZURE_KEY_VAULT_PREFIX")
# The root directory to save artifact pulls in, when using S3 or Azure
ARTIFACT_LOCALROOT = from_conf("ARTIFACT_LOCALROOT", os.getcwd())
# Cards related config variables
CARD_SUFFIX = "mf.cards"
CARD_LOCALROOT = from_conf("CARD_LOCALROOT")
CARD_S3ROOT = from_conf(
"CARD_S3ROOT",
os.path.join(DATASTORE_SYSROOT_S3, CARD_SUFFIX) if DATASTORE_SYSROOT_S3 else None,
)
CARD_AZUREROOT = from_conf(
"CARD_AZUREROOT",
(
os.path.join(DATASTORE_SYSROOT_AZURE, CARD_SUFFIX)
if DATASTORE_SYSROOT_AZURE
else None
),
)
CARD_GSROOT = from_conf(
"CARD_GSROOT",
os.path.join(DATASTORE_SYSROOT_GS, CARD_SUFFIX) if DATASTORE_SYSROOT_GS else None,
)
CARD_NO_WARNING = from_conf("CARD_NO_WARNING", False)
SKIP_CARD_DUALWRITE = from_conf("SKIP_CARD_DUALWRITE", False)
RUNTIME_CARD_RENDER_INTERVAL = from_conf("RUNTIME_CARD_RENDER_INTERVAL", 60)
# Azure storage account URL
AZURE_STORAGE_BLOB_SERVICE_ENDPOINT = from_conf("AZURE_STORAGE_BLOB_SERVICE_ENDPOINT")
# Azure storage can use process-based parallelism instead of threads.
# Processes perform better for high throughput workloads (e.g. many huge artifacts)
AZURE_STORAGE_WORKLOAD_TYPE = from_conf(
"AZURE_STORAGE_WORKLOAD_TYPE",
default="general",
validate_fn=get_validate_choice_fn(["general", "high_throughput"]),
)
# GS storage can use process-based parallelism instead of threads.
# Processes perform better for high throughput workloads (e.g. many huge artifacts)
GS_STORAGE_WORKLOAD_TYPE = from_conf(
"GS_STORAGE_WORKLOAD_TYPE",
"general",
validate_fn=get_validate_choice_fn(["general", "high_throughput"]),
)
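# For instance, to opt into the process-based code path for both providers, one could
# set (following the METAFLOW_ prefix convention described at the top of this file):
#   METAFLOW_AZURE_STORAGE_WORKLOAD_TYPE=high_throughput
#   METAFLOW_GS_STORAGE_WORKLOAD_TYPE=high_throughput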
###
# Metadata configuration
###
SERVICE_URL = from_conf("SERVICE_URL")
SERVICE_RETRY_COUNT = from_conf("SERVICE_RETRY_COUNT", 5)
SERVICE_AUTH_KEY = from_conf("SERVICE_AUTH_KEY")
SERVICE_HEADERS = from_conf("SERVICE_HEADERS", {})
if SERVICE_AUTH_KEY is not None:
SERVICE_HEADERS["x-api-key"] = SERVICE_AUTH_KEY
# Checks version compatibility with Metadata service
SERVICE_VERSION_CHECK = from_conf("SERVICE_VERSION_CHECK", True)
# Default container image
DEFAULT_CONTAINER_IMAGE = from_conf("DEFAULT_CONTAINER_IMAGE")
# Default container registry
DEFAULT_CONTAINER_REGISTRY = from_conf("DEFAULT_CONTAINER_REGISTRY")
# Controls whether to include foreach stack information in metadata.
INCLUDE_FOREACH_STACK = from_conf("INCLUDE_FOREACH_STACK", True)
# Maximum length of the foreach value string to be stored in each ForeachFrame.
MAXIMUM_FOREACH_VALUE_CHARS = from_conf("MAXIMUM_FOREACH_VALUE_CHARS", 30)
# The default runtime limit (in seconds) of jobs launched by any compute provider. Defaults to 5 days.
DEFAULT_RUNTIME_LIMIT = from_conf("DEFAULT_RUNTIME_LIMIT", 5 * 24 * 60 * 60)
###
# Organization customizations
###
UI_URL = from_conf("UI_URL")
###
# Capture error logs from argo
###
ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT = from_conf("ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT")
# Contact information displayed when running the `metaflow` command.
# Value should be a dictionary where:
# - key is a string describing contact method
# - value is a string describing contact itself (email, web address, etc.)
# The default value shows an example of this
CONTACT_INFO = from_conf(
"CONTACT_INFO",
{
"Read the documentation": "http://docs.metaflow.org",
"Chat with us": "http://chat.metaflow.org",
"Get help by email": "[email protected]",
},
)
###
# Decorators
###
# Format is a space separated string of decospecs (what is passed
# using --with)
DECOSPECS = from_conf("DECOSPECS", "")
###
# AWS Batch configuration
###
# IAM role for AWS Batch container with Amazon S3 access
# (and AWS DynamoDb access for AWS StepFunctions, if enabled)
ECS_S3_ACCESS_IAM_ROLE = from_conf("ECS_S3_ACCESS_IAM_ROLE")
# IAM role for AWS Batch container for AWS Fargate
ECS_FARGATE_EXECUTION_ROLE = from_conf("ECS_FARGATE_EXECUTION_ROLE")
# Job queue for AWS Batch
BATCH_JOB_QUEUE = from_conf("BATCH_JOB_QUEUE")
# Default container image for AWS Batch
BATCH_CONTAINER_IMAGE = from_conf("BATCH_CONTAINER_IMAGE", DEFAULT_CONTAINER_IMAGE)
# Default container registry for AWS Batch
BATCH_CONTAINER_REGISTRY = from_conf(
"BATCH_CONTAINER_REGISTRY", DEFAULT_CONTAINER_REGISTRY
)
# Metadata service URL for AWS Batch
SERVICE_INTERNAL_URL = from_conf("SERVICE_INTERNAL_URL", SERVICE_URL)
# Assign resource tags to AWS Batch jobs. Set to False by default since
# it requires `Batch:TagResource` permissions which may not be available
# in all Metaflow deployments. Hopefully, some day we can flip the
# default to True.
BATCH_EMIT_TAGS = from_conf("BATCH_EMIT_TAGS", False)
###
# AWS Step Functions configuration
###
# IAM role for AWS Step Functions with AWS Batch and AWS DynamoDb access
# https://docs.aws.amazon.com/step-functions/latest/dg/batch-iam.html
SFN_IAM_ROLE = from_conf("SFN_IAM_ROLE")
# AWS DynamoDb Table name (with partition key - `pathspec` of type string)
SFN_DYNAMO_DB_TABLE = from_conf("SFN_DYNAMO_DB_TABLE")
# IAM role for AWS Events with AWS Step Functions access
# https://docs.aws.amazon.com/eventbridge/latest/userguide/auth-and-access-control-eventbridge.html
EVENTS_SFN_ACCESS_IAM_ROLE = from_conf("EVENTS_SFN_ACCESS_IAM_ROLE")
# Prefix for AWS Step Functions state machines. Set to stack name for Metaflow
# sandbox.
SFN_STATE_MACHINE_PREFIX = from_conf("SFN_STATE_MACHINE_PREFIX")
# Optional AWS CloudWatch Log Group ARN for emitting AWS Step Functions state
# machine execution logs. This needs to be available when using the
# `step-functions create --log-execution-history` command.
SFN_EXECUTION_LOG_GROUP_ARN = from_conf("SFN_EXECUTION_LOG_GROUP_ARN")
# Amazon S3 path for storing the results of AWS Step Functions Distributed Map
SFN_S3_DISTRIBUTED_MAP_OUTPUT_PATH = from_conf(
"SFN_S3_DISTRIBUTED_MAP_OUTPUT_PATH",
(
os.path.join(DATASTORE_SYSROOT_S3, "sfn_distributed_map_output")
if DATASTORE_SYSROOT_S3
else None
),
)
###
# Kubernetes configuration
###
# Kubernetes namespace to use for all objects created by Metaflow
KUBERNETES_NAMESPACE = from_conf("KUBERNETES_NAMESPACE", "default")
# Default service account to use by K8S jobs created by Metaflow
KUBERNETES_SERVICE_ACCOUNT = from_conf("KUBERNETES_SERVICE_ACCOUNT")
# Default node selectors to use by K8S jobs created by Metaflow - foo=bar,baz=bab
KUBERNETES_NODE_SELECTOR = from_conf("KUBERNETES_NODE_SELECTOR", "")
KUBERNETES_TOLERATIONS = from_conf("KUBERNETES_TOLERATIONS", "")
KUBERNETES_PERSISTENT_VOLUME_CLAIMS = from_conf(
"KUBERNETES_PERSISTENT_VOLUME_CLAIMS", ""
)
KUBERNETES_SECRETS = from_conf("KUBERNETES_SECRETS", "")
# Default labels for kubernetes pods
KUBERNETES_LABELS = from_conf("KUBERNETES_LABELS", "")
# Default annotations for kubernetes pods
KUBERNETES_ANNOTATIONS = from_conf("KUBERNETES_ANNOTATIONS", "")
# Default GPU vendor to use by K8S jobs created by Metaflow (supports nvidia, amd)
KUBERNETES_GPU_VENDOR = from_conf("KUBERNETES_GPU_VENDOR", "nvidia")
# Default container image for K8S
KUBERNETES_CONTAINER_IMAGE = from_conf(
"KUBERNETES_CONTAINER_IMAGE", DEFAULT_CONTAINER_IMAGE
)
# Image pull policy for container images
KUBERNETES_IMAGE_PULL_POLICY = from_conf("KUBERNETES_IMAGE_PULL_POLICY", None)
# Default container registry for K8S
KUBERNETES_CONTAINER_REGISTRY = from_conf(
"KUBERNETES_CONTAINER_REGISTRY", DEFAULT_CONTAINER_REGISTRY
)
# Toggle for trying to fetch EC2 instance metadata
KUBERNETES_FETCH_EC2_METADATA = from_conf("KUBERNETES_FETCH_EC2_METADATA", False)
# Shared memory in MB to use for this step
KUBERNETES_SHARED_MEMORY = from_conf("KUBERNETES_SHARED_MEMORY", None)
# Default port number to open on the pods
KUBERNETES_PORT = from_conf("KUBERNETES_PORT", None)
# Default kubernetes resource requests for CPU, memory and disk
KUBERNETES_CPU = from_conf("KUBERNETES_CPU", None)
KUBERNETES_MEMORY = from_conf("KUBERNETES_MEMORY", None)
KUBERNETES_DISK = from_conf("KUBERNETES_DISK", None)
# Default kubernetes QoS class
KUBERNETES_QOS = from_conf("KUBERNETES_QOS", "burstable")
ARGO_WORKFLOWS_KUBERNETES_SECRETS = from_conf("ARGO_WORKFLOWS_KUBERNETES_SECRETS", "")
ARGO_WORKFLOWS_ENV_VARS_TO_SKIP = from_conf("ARGO_WORKFLOWS_ENV_VARS_TO_SKIP", "")
KUBERNETES_JOBSET_GROUP = from_conf("KUBERNETES_JOBSET_GROUP", "jobset.x-k8s.io")
KUBERNETES_JOBSET_VERSION = from_conf("KUBERNETES_JOBSET_VERSION", "v1alpha2")
##
# Argo Events Configuration
##
ARGO_EVENTS_SERVICE_ACCOUNT = from_conf("ARGO_EVENTS_SERVICE_ACCOUNT")
ARGO_EVENTS_EVENT_BUS = from_conf("ARGO_EVENTS_EVENT_BUS", "default")
ARGO_EVENTS_EVENT_SOURCE = from_conf("ARGO_EVENTS_EVENT_SOURCE")
ARGO_EVENTS_EVENT = from_conf("ARGO_EVENTS_EVENT")
ARGO_EVENTS_WEBHOOK_URL = from_conf("ARGO_EVENTS_WEBHOOK_URL")
ARGO_EVENTS_INTERNAL_WEBHOOK_URL = from_conf(
"ARGO_EVENTS_INTERNAL_WEBHOOK_URL", ARGO_EVENTS_WEBHOOK_URL
)
ARGO_EVENTS_WEBHOOK_AUTH = from_conf("ARGO_EVENTS_WEBHOOK_AUTH", "none")
ARGO_WORKFLOWS_UI_URL = from_conf("ARGO_WORKFLOWS_UI_URL")
##
# Airflow Configuration
##
# This configuration sets `startup_timeout_seconds` in airflow's KubernetesPodOperator.
AIRFLOW_KUBERNETES_STARTUP_TIMEOUT_SECONDS = from_conf(
"AIRFLOW_KUBERNETES_STARTUP_TIMEOUT_SECONDS", 60 * 60
)
# This configuration sets `kubernetes_conn_id` in airflow's KubernetesPodOperator.
AIRFLOW_KUBERNETES_CONN_ID = from_conf("AIRFLOW_KUBERNETES_CONN_ID")
AIRFLOW_KUBERNETES_KUBECONFIG_FILE = from_conf("AIRFLOW_KUBERNETES_KUBECONFIG_FILE")
AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT = from_conf(
"AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT"
)
###
# Conda configuration
###
# Conda package root location on S3
CONDA_PACKAGE_S3ROOT = from_conf("CONDA_PACKAGE_S3ROOT")
# Conda package root location on Azure
CONDA_PACKAGE_AZUREROOT = from_conf("CONDA_PACKAGE_AZUREROOT")
# Conda package root location on GS
CONDA_PACKAGE_GSROOT = from_conf("CONDA_PACKAGE_GSROOT")
# Use an alternate dependency resolver for conda packages instead of conda
# Mamba promises faster package dependency resolution times, which
# should result in an appreciable speedup in flow environment initialization.
CONDA_DEPENDENCY_RESOLVER = from_conf("CONDA_DEPENDENCY_RESOLVER", "conda")
###
# Escape hatch configuration
###
# Print out warning if escape hatch is not used for the target packages
ESCAPE_HATCH_WARNING = from_conf("ESCAPE_HATCH_WARNING", True)
###
# Debug configuration
###
DEBUG_OPTIONS = ["subcommand", "sidecar", "s3client", "tracing", "stubgen", "userconf"]
for typ in DEBUG_OPTIONS:
vars()["DEBUG_%s" % typ.upper()] = from_conf("DEBUG_%s" % typ.upper(), False)
###
# Plugin configuration
###
# Plugin configuration variables exist in plugins/__init__.py.
# Specifically, there is an ENABLED_<category> configuration value to determine
# the set of plugins to enable. The categories are: step_decorator, flow_decorator,
# environment, metadata_provider, datastore, sidecar, logging_sidecar, monitor_sidecar,
# aws_client_provider, and cli. If not set (the default), all plugins are enabled.
# You can restrict which plugins are enabled by listing them explicitly; for example,
# ENABLED_STEP_DECORATOR = ["batch", "resources"] will enable only those two step
# decorators and no others.
###
# Command configuration
###
# Command (i.e., metaflow <cmd>) configuration variable ENABLED_CMD
# exists in cmd/main_cli.py. It behaves just like any of the other ENABLED_<category>
# configuration variables.
###
# AWS Sandbox configuration
###
# Boolean flag for metaflow AWS sandbox access
AWS_SANDBOX_ENABLED = from_conf("AWS_SANDBOX_ENABLED", False)
# Metaflow AWS sandbox auth endpoint
AWS_SANDBOX_STS_ENDPOINT_URL = SERVICE_URL
# Metaflow AWS sandbox API auth key
AWS_SANDBOX_API_KEY = from_conf("AWS_SANDBOX_API_KEY")
# Internal Metadata URL
AWS_SANDBOX_INTERNAL_SERVICE_URL = from_conf("AWS_SANDBOX_INTERNAL_SERVICE_URL")
# AWS region
AWS_SANDBOX_REGION = from_conf("AWS_SANDBOX_REGION")
# Finalize configuration
if AWS_SANDBOX_ENABLED:
os.environ["AWS_DEFAULT_REGION"] = AWS_SANDBOX_REGION
SERVICE_INTERNAL_URL = AWS_SANDBOX_INTERNAL_SERVICE_URL
SERVICE_HEADERS["x-api-key"] = AWS_SANDBOX_API_KEY
SFN_STATE_MACHINE_PREFIX = from_conf("AWS_SANDBOX_STACK_NAME")
KUBERNETES_SANDBOX_INIT_SCRIPT = from_conf("KUBERNETES_SANDBOX_INIT_SCRIPT")
OTEL_ENDPOINT = from_conf("OTEL_ENDPOINT")
ZIPKIN_ENDPOINT = from_conf("ZIPKIN_ENDPOINT")
CONSOLE_TRACE_ENABLED = from_conf("CONSOLE_TRACE_ENABLED", False)
# Internal environment variable used to prevent the tracing module from loading during Conda bootstrapping.
DISABLE_TRACING = bool(os.environ.get("DISABLE_TRACING", False))
# MAX_ATTEMPTS is the maximum number of attempts, including the first
# task, retries, and the final fallback task and its retries.
#
# Datastore needs to check all attempt files to find the latest one, so
# increasing this limit has real performance implications for all tasks.
# Decreasing this limit is very unsafe, as it can lead to wrong results
# being read from old tasks.
#
# Note also that DataStoreSet resolves the latest attempt_id using
# lexicographic ordering of attempts. This won't work if MAX_ATTEMPTS > 99.
MAX_ATTEMPTS = from_conf("MAX_ATTEMPTS", 6)
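# To illustrate the lexicographic caveat above: compared as strings, "100" sorts
# before "99", so with more than 99 attempts the "latest attempt" lookup could
# resolve to an older attempt than intended.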
# Feature flag (experimental features that are *explicitly* unsupported)
# Process configs even when using the click_api for Runner/Deployer
CLICK_API_PROCESS_CONFIG = from_conf("CLICK_API_PROCESS_CONFIG", False)
# PINNED_CONDA_LIBS are the libraries that metaflow depends on for execution
# and are needed within a conda environment
def get_pinned_conda_libs(python_version, datastore_type):
pins = {
"requests": ">=2.21.0",
}
if datastore_type == "s3":
pins["boto3"] = ">=1.14.0"
elif datastore_type == "azure":
pins["azure-identity"] = ">=1.10.0"
pins["azure-storage-blob"] = ">=12.12.0"
pins["azure-keyvault-secrets"] = ">=4.7.0"
elif datastore_type == "gs":
pins["google-cloud-storage"] = ">=2.5.0"
pins["google-auth"] = ">=2.11.0"
pins["google-cloud-secret-manager"] = ">=2.10.0"
elif datastore_type == "local":
pass
else:
raise MetaflowException(
msg="conda lib pins for datastore %s are undefined" % (datastore_type,)
)
return pins
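# A minimal usage sketch (the Python version below is just an example value): for an
# "s3" datastore,
#   get_pinned_conda_libs("3.10", "s3")
# returns {"requests": ">=2.21.0", "boto3": ">=1.14.0"}. Note that python_version is
# accepted but unused by this base implementation; extensions loaded below may merge
# in additional pins.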
# Check if there are extensions to Metaflow to load and override everything
try:
from metaflow.extension_support import get_modules
_TOGGLE_DECOSPECS = []
ext_modules = get_modules("config")
for m in ext_modules:
# We load into globals whatever we have in extension_module
# We specifically exclude any modules that may be included (like sys, os, etc)
for n, o in m.module.__dict__.items():
if n == "DEBUG_OPTIONS":
DEBUG_OPTIONS.extend(o)
for typ in o:
vars()["DEBUG_%s" % typ.upper()] = from_conf(
"DEBUG_%s" % typ.upper(), False
)
elif n == "get_pinned_conda_libs":
def _new_get_pinned_conda_libs(
python_version, datastore_type, f1=globals()[n], f2=o
):
d1 = f1(python_version, datastore_type)
d2 = f2(python_version, datastore_type)
for k, v in d2.items():
d1[k] = v if k not in d1 else ",".join([d1[k], v])
return d1
globals()[n] = _new_get_pinned_conda_libs
elif n == "TOGGLE_DECOSPECS":
if any([x.startswith("-") for x in o]):
raise ValueError("Removing decospecs is not currently supported")
if any(" " in x for x in o):
raise ValueError("Decospecs cannot contain spaces")
_TOGGLE_DECOSPECS.extend(o)
elif not n.startswith("__") and not isinstance(o, types.ModuleType):
globals()[n] = o
# If DECOSPECS is set, use that, else extrapolate from extensions
if not DECOSPECS:
DECOSPECS = " ".join(_TOGGLE_DECOSPECS)
finally:
# Erase all temporary names to avoid leaking things
for _n in [
"m",
"n",
"o",
"typ",
"ext_modules",
"get_modules",
"_new_get_pinned_conda_libs",
"d1",
"d2",
"k",
"v",
"f1",
"f2",
"_TOGGLE_DECOSPECS",
]:
try:
del globals()[_n]
except KeyError:
pass
del globals()["_n"]