11#!/usr/bin/env python3
22
3- import click
43import datetime
54import json
65import sys
76import time
87import traceback
8+ from typing import Dict , List , Mapping , Tuple
9+
10+ import click
911import requests
10- from typing import List , Mapping , Tuple , Dict
11- from flytekit .remote import FlyteRemote
12+ from flytekit .configuration import Config
1213from flytekit .models .core .execution import WorkflowExecutionPhase
13- from flytekit .configuration import Config , ImageConfig , SerializationSettings
14+ from flytekit .remote import FlyteRemote
1415from flytekit .remote .executions import FlyteWorkflowExecution
1516
16-
1717WAIT_TIME = 10
1818MAX_ATTEMPTS = 200
1919
2222# starting with "core".
2323FLYTESNACKS_WORKFLOW_GROUPS : Mapping [str , List [Tuple [str , dict ]]] = {
2424 "lite" : [
25- ("basics.hello_world.my_wf" , {}),
26- ("basics.lp.go_greet" , {"day_of_week" : "5" , "number" : 3 , "am" : True }),
25+ ("basics.hello_world.hello_world_wf" , {}),
2726 ],
2827 "core" : [
29- ( "basics.deck.wf " , {}),
28+ # ("development_lifecycle.decks.image_renderer_wf ", {}),
3029 # The chain_workflows example in flytesnacks expects to be running in a sandbox.
31- # ("control_flow .chain_entities.chain_workflows_wf", {}),
32- ("control_flow .dynamics.wf" , {"s1" : "Pear" , "s2" : "Earth" }),
33- ("control_flow .map_task.my_map_workflow" , {"a" : [1 , 2 , 3 , 4 , 5 ]}),
30+ ( "advanced_composition .chain_entities.chain_workflows_wf" , {}),
31+ ("advanced_composition .dynamics.wf" , {"s1" : "Pear" , "s2" : "Earth" }),
32+ ("advanced_composition .map_task.my_map_workflow" , {"a" : [1 , 2 , 3 , 4 , 5 ]}),
3433 # Workflows that use nested executions cannot be launched via flyteremote.
3534 # This issue is being tracked in https://github.com/flyteorg/flyte/issues/1482.
3635 # ("control_flow.run_conditions.multiplier", {"my_input": 0.5}),
4140 # ("control_flow.run_conditions.nested_conditions", {"my_input": 0.4}),
4241 # ("control_flow.run_conditions.consume_outputs", {"my_input": 0.4, "seed": 7}),
4342 # ("control_flow.run_merge_sort.merge_sort", {"numbers": [5, 4, 3, 2, 1], "count": 5}),
44- ("control_flow .subworkflows.parent_wf " , {"a " : 3 }),
45- ("control_flow .subworkflows.nested_parent_wf" , {"a" : 3 }),
46- ("basics.basic_workflow.my_wf " , {"a " : 50 , "b " : "hello" }),
43+ ("advanced_composition .subworkflows.parent_workflow " , {"my_input1 " : "hello" }),
44+ ("advanced_composition .subworkflows.nested_parent_wf" , {"a" : 3 }),
45+ ("basics.workflow.simple_wf " , {"x " : [ 1 , 2 , 3 ], "y " : [ 1 , 2 , 3 ] }),
4746 # TODO: enable new files and folders workflows
4847 # ("basics.files.rotate_one_workflow", {"in_image": "https://upload.wikimedia.org/wikipedia/commons/d/d2/Julia_set_%28C_%3D_0.285%2C_0.01%29.jpg"}),
4948 # ("basics.folders.download_and_rotate", {}),
50- ("basics.hello_world.my_wf" , {}),
51- ("basics.lp.my_wf" , {"val" : 4 }),
52- ("basics.lp.go_greet" , {"day_of_week" : "5" , "number" : 3 , "am" : True }),
53- ("basics.named_outputs.my_wf" , {}),
49+ ("basics.hello_world.hello_world_wf" , {}),
50+ ("basics.named_outputs.simple_wf_with_named_outputs" , {}),
5451 # # Getting a 403 for the wikipedia image
5552 # # ("basics.reference_task.wf", {}),
56- ("type_system .custom_objects.wf" , {"x" : 10 , "y" : 20 }),
53+ ("data_types_and_io .custom_objects.wf" , {"x" : 10 , "y" : 20 }),
5754 # Enums are not supported in flyteremote
5855 # ("type_system.enums.enum_wf", {"c": "red"}),
59- ("type_system .schema.df_wf" , {"a" : 42 }),
60- ("type_system .typed_schema.wf" , {}),
61- #("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}),
56+ ("data_types_and_io .schema.df_wf" , {"a" : 42 }),
57+ ("data_types_and_io .typed_schema.wf" , {}),
58+ # ("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}),
6259 ],
6360 "integrations-k8s-spark" : [
6461 ("k8s_spark_plugin.pyspark_pi.my_spark" , {"triggered_date" : datetime .datetime .now ()}),
@@ -97,19 +94,22 @@ def execute_workflow(remote, version, workflow_name, inputs):
9794 wf = remote .fetch_workflow (name = workflow_name , version = version )
9895 return remote .execute (wf , inputs = inputs , wait = False )
9996
97+
10098def executions_finished (executions_by_wfgroup : Dict [str , List [FlyteWorkflowExecution ]]) -> bool :
10199 for executions in executions_by_wfgroup .values ():
102100 if not all ([execution .is_done for execution in executions ]):
103101 return False
104102 return True
105103
104+
106105def sync_executions (remote : FlyteRemote , executions_by_wfgroup : Dict [str , List [FlyteWorkflowExecution ]]):
107106 try :
108107 for executions in executions_by_wfgroup .values ():
109108 for execution in executions :
110109 print (f"About to sync execution_id={ execution .id .name } " )
111110 remote .sync (execution )
112- except :
111+ except Exception :
112+ print (traceback .format_exc ())
113113 print ("GOT TO THE EXCEPT" )
114114 print ("COUNT THIS!" )
115115
@@ -119,6 +119,7 @@ def report_executions(executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecuti
119119 for execution in executions :
120120 print (execution )
121121
122+
122123def schedule_workflow_groups (
123124 tag : str ,
124125 workflow_groups : List [str ],
@@ -139,17 +140,12 @@ def schedule_workflow_groups(
139140
140141 # Wait for all executions to finish
141142 attempt = 0
142- while attempt == 0 or (
143- not executions_finished (executions_by_wfgroup ) and attempt < MAX_ATTEMPTS
144- ):
143+ while attempt == 0 or (not executions_finished (executions_by_wfgroup ) and attempt < MAX_ATTEMPTS ):
145144 attempt += 1
146- print (
147- f"Not all executions finished yet. Sleeping for some time, will check again in { WAIT_TIME } s"
148- )
145+ print (f"Not all executions finished yet. Sleeping for some time, will check again in { WAIT_TIME } s" )
149146 time .sleep (WAIT_TIME )
150147 sync_executions (remote , executions_by_wfgroup )
151148
152-
153149 report_executions (executions_by_wfgroup )
154150
155151 results = {}
@@ -192,14 +188,17 @@ def run(
192188
193189 # For a given release tag and priority, this function filters the workflow groups from the flytesnacks
194190 # manifest file. For example, for the release tag "v0.2.224" and the priority "P0" it returns [ "core" ].
195- manifest_url = "https://raw.githubusercontent.com/flyteorg/flytesnacks/" \
196- f"{ flytesnacks_release_tag } /flyte_tests_manifest.json"
191+ manifest_url = (
192+ "https://raw.githubusercontent.com/flyteorg/flytesnacks/" f"{ flytesnacks_release_tag } /flyte_tests_manifest.json"
193+ )
197194 r = requests .get (manifest_url )
198195 parsed_manifest = r .json ()
199196 workflow_groups = []
200- workflow_groups = ["lite" ] if "lite" in priorities else [
201- group ["name" ] for group in parsed_manifest if group ["priority" ] in priorities
202- ]
197+ workflow_groups = (
198+ ["lite" ]
199+ if "lite" in priorities
200+ else [group ["name" ] for group in parsed_manifest if group ["priority" ] in priorities ]
201+ )
203202
204203 results = []
205204 valid_workgroups = []
@@ -216,10 +215,7 @@ def run(
216215 valid_workgroups .append (workflow_group )
217216
218217 results_by_wfgroup = schedule_workflow_groups (
219- flytesnacks_release_tag ,
220- valid_workgroups ,
221- remote ,
222- terminate_workflow_on_failure
218+ flytesnacks_release_tag , valid_workgroups , remote , terminate_workflow_on_failure
223219 )
224220
225221 for workflow_group , succeeded in results_by_wfgroup .items ():
@@ -273,9 +269,7 @@ def cli(
273269 terminate_workflow_on_failure ,
274270):
275271 print (f"return_non_zero_on_failure={ return_non_zero_on_failure } " )
276- results = run (
277- flytesnacks_release_tag , priorities , config_file , terminate_workflow_on_failure
278- )
272+ results = run (flytesnacks_release_tag , priorities , config_file , terminate_workflow_on_failure )
279273
280274 # Write a json object in its own line describing the result of this run to stdout
281275 print (f"Result of run:\n { json .dumps (results )} " )
0 commit comments