Skip to content
This repository was archived by the owner on May 31, 2024. It is now read-only.

Commit 0b5ef91

Browse files
Create executions in flytectl (#39)
1 parent 0a2f561 commit 0b5ef91

40 files changed

Lines changed: 3000 additions & 130 deletions

boilerplate/lyft/golang_support_tools/tools.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@ package tools
55
import (
66
_ "github.com/alvaroloes/enumer"
77
_ "github.com/golangci/golangci-lint/cmd/golangci-lint"
8-
_ "github.com/lyft/flytestdlib/cli/pflags"
8+
_ "github.com/flyteorg/flytestdlib/cli/pflags"
99
_ "github.com/vektra/mockery/cmd/mockery"
1010
)

boilerplate/lyft/golang_test_targets/download_tooling.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ set -e
1717
# In the format of "<cli>:<package>" or ":<package>" if no cli
1818
tools=(
1919
"github.com/vektra/mockery/cmd/mockery"
20-
"github.com/lyft/flytestdlib/cli/pflags"
20+
"github.com/flyteorg/flytestdlib/cli/pflags"
2121
"github.com/golangci/golangci-lint/cmd/golangci-lint"
2222
"github.com/alvaroloes/enumer"
2323
)

cmd/create/create.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ func RemoteCreateCommand() *cobra.Command {
2626
createResourcesFuncs := map[string]cmdcore.CommandEntry{
2727
"project": {CmdFunc: createProjectsCommand, Aliases: []string{"projects"}, ProjectDomainNotRequired: true, PFlagProvider: projectConfig, Short: projectShort,
2828
Long: projectLong},
29+
"execution": {CmdFunc: createExecutionCommand, Aliases: []string{"executions"}, ProjectDomainNotRequired: false, PFlagProvider: executionConfig, Short: executionShort,
30+
Long: executionLong},
2931
}
3032
cmdcore.AddCommands(createCmd, createResourcesFuncs)
3133
return createCmd

cmd/create/create_test.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,43 @@
11
package create
22

33
import (
4+
"context"
45
"sort"
56
"testing"
67

8+
cmdCore "github.com/flyteorg/flytectl/cmd/core"
9+
"github.com/flyteorg/flytectl/cmd/testutils"
10+
"github.com/flyteorg/flyteidl/clients/go/admin/mocks"
11+
712
"github.com/stretchr/testify/assert"
813
)
914

15+
const testDataFolder = "../testdata/"
16+
17+
var (
18+
err error
19+
ctx context.Context
20+
mockClient *mocks.AdminServiceClient
21+
args []string
22+
cmdCtx cmdCore.CommandContext
23+
)
24+
var setup = testutils.Setup
25+
var tearDownAndVerify = testutils.TearDownAndVerify
26+
1027
func TestCreateCommand(t *testing.T) {
1128
createCommand := RemoteCreateCommand()
1229
assert.Equal(t, createCommand.Use, "create")
1330
assert.Equal(t, createCommand.Short, "Used for creating various flyte resources including tasks/workflows/launchplans/executions/project.")
14-
assert.Equal(t, len(createCommand.Commands()), 1)
31+
assert.Equal(t, len(createCommand.Commands()), 2)
1532
cmdNouns := createCommand.Commands()
1633
// Sort by Use value.
1734
sort.Slice(cmdNouns, func(i, j int) bool {
1835
return cmdNouns[i].Use < cmdNouns[j].Use
1936
})
20-
assert.Equal(t, cmdNouns[0].Use, "project")
21-
assert.Equal(t, cmdNouns[0].Aliases, []string{"projects"})
22-
assert.Equal(t, cmdNouns[0].Short, "Create project resources")
37+
assert.Equal(t, cmdNouns[0].Use, "execution")
38+
assert.Equal(t, cmdNouns[0].Aliases, []string{"executions"})
39+
assert.Equal(t, cmdNouns[0].Short, executionShort)
40+
assert.Equal(t, cmdNouns[1].Use, "project")
41+
assert.Equal(t, cmdNouns[1].Aliases, []string{"projects"})
42+
assert.Equal(t, cmdNouns[1].Short, "Create project resources")
2343
}

cmd/create/execution.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package create
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/flyteorg/flytectl/cmd/config"
8+
cmdCore "github.com/flyteorg/flytectl/cmd/core"
9+
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
10+
)
11+
12+
const (
13+
executionShort = "Create execution resources"
14+
executionLong = `
15+
Create the executions for given workflow/task in a project and domain.
16+
17+
There are three steps in generating an execution.
18+
19+
- Generate the execution spec file using the get command.
20+
- Update the inputs for the execution if needed.
21+
- Run the execution by passing in the generated yaml file.
22+
23+
The spec file should be generated first and then run the execution using the spec file.
24+
You can reference the flytectl get task for more details
25+
26+
::
27+
28+
flytectl get tasks -d development -p flytectldemo core.advanced.run_merge_sort.merge --version v2 --execFile execution_spec.yaml
29+
30+
The generated file would look similar to this
31+
32+
.. code-block:: yaml
33+
34+
iamRoleARN: ""
35+
inputs:
36+
sorted_list1:
37+
- 0
38+
sorted_list2:
39+
- 0
40+
kubeServiceAcct: ""
41+
targetDomain: ""
42+
targetProject: ""
43+
task: core.advanced.run_merge_sort.merge
44+
version: "v2"
45+
46+
47+
The generated file can be modified to change the input values.
48+
49+
.. code-block:: yaml
50+
51+
iamRoleARN: 'arn:aws:iam::12345678:role/defaultrole'
52+
inputs:
53+
sorted_list1:
54+
- 2
55+
- 4
56+
- 6
57+
sorted_list2:
58+
- 1
59+
- 3
60+
- 5
61+
kubeServiceAcct: ""
62+
targetDomain: ""
63+
targetProject: ""
64+
task: core.advanced.run_merge_sort.merge
65+
version: "v2"
66+
67+
And then can be passed through the command line.
68+
Notice the source and target domain/projects can be different.
69+
The root project and domain flags of -p and -d should point to task/launch plans project/domain.
70+
71+
::
72+
73+
flytectl create execution --execFile execution_spec.yaml -p flytectldemo -d development --targetProject flytesnacks
74+
75+
Usage
76+
`
77+
)
78+
79+
//go:generate pflags ExecutionConfig --default-var executionConfig
80+
81+
// ExecutionConfig hold configuration for create execution flags and configuration of the actual task or workflow to be launched.
82+
type ExecutionConfig struct {
83+
// pflag section
84+
ExecFile string `json:"execFile,omitempty" pflag:",file for the execution params.If not specified defaults to <<workflow/task>_name>.execution_spec.yaml"`
85+
TargetDomain string `json:"targetDomain" pflag:",project where execution needs to be created.If not specified configured domain would be used."`
86+
TargetProject string `json:"targetProject" pflag:",project where execution needs to be created.If not specified configured project would be used."`
87+
KubeServiceAcct string `json:"kubeServiceAcct" pflag:",kubernetes service account AuthRole for launching execution."`
88+
IamRoleARN string `json:"iamRoleARN" pflag:",iam role ARN AuthRole for launching execution."`
89+
// Non plfag section is read from the execution config generated by get task/launchplan
90+
Workflow string `json:"workflow,omitempty"`
91+
Task string `json:"task,omitempty"`
92+
Version string `json:"version"`
93+
Inputs map[string]interface{} `json:"inputs"`
94+
}
95+
96+
type ExecutionParams struct {
97+
name string
98+
isTask bool
99+
}
100+
101+
var (
102+
executionConfig = &ExecutionConfig{}
103+
)
104+
105+
func createExecutionCommand(ctx context.Context, args []string, cmdCtx cmdCore.CommandContext) error {
106+
var execParams ExecutionParams
107+
var err error
108+
sourceProject := config.GetConfig().Project
109+
sourceDomain := config.GetConfig().Domain
110+
if execParams, err = readConfigAndValidate(config.GetConfig().Project, config.GetConfig().Domain); err != nil {
111+
return err
112+
}
113+
var executionRequest *admin.ExecutionCreateRequest
114+
if execParams.isTask {
115+
if executionRequest, err = createExecutionRequestForTask(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx); err != nil {
116+
return err
117+
}
118+
} else {
119+
if executionRequest, err = createExecutionRequestForWorkflow(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx); err != nil {
120+
return err
121+
}
122+
}
123+
exec, _err := cmdCtx.AdminClient().CreateExecution(ctx, executionRequest)
124+
if _err != nil {
125+
return _err
126+
}
127+
fmt.Printf("execution identifier %v\n", exec.Id)
128+
return nil
129+
}

cmd/create/execution_test.go

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package create
2+
3+
import (
4+
"testing"
5+
6+
"github.com/flyteorg/flytectl/cmd/config"
7+
"github.com/flyteorg/flytectl/cmd/testutils"
8+
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
9+
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
10+
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/mock"
13+
"google.golang.org/protobuf/types/known/timestamppb"
14+
)
15+
16+
// This function needs to be called after testutils.Steup()
17+
func createExecutionSetup() {
18+
ctx = testutils.Ctx
19+
cmdCtx = testutils.CmdCtx
20+
mockClient = testutils.MockClient
21+
sortedListLiteralType := core.Variable{
22+
Type: &core.LiteralType{
23+
Type: &core.LiteralType_CollectionType{
24+
CollectionType: &core.LiteralType{
25+
Type: &core.LiteralType_Simple{
26+
Simple: core.SimpleType_INTEGER,
27+
},
28+
},
29+
},
30+
},
31+
}
32+
variableMap := map[string]*core.Variable{
33+
"sorted_list1": &sortedListLiteralType,
34+
"sorted_list2": &sortedListLiteralType,
35+
}
36+
37+
task1 := &admin.Task{
38+
Id: &core.Identifier{
39+
Name: "task1",
40+
Version: "v2",
41+
},
42+
Closure: &admin.TaskClosure{
43+
CreatedAt: &timestamppb.Timestamp{Seconds: 1, Nanos: 0},
44+
CompiledTask: &core.CompiledTask{
45+
Template: &core.TaskTemplate{
46+
Interface: &core.TypedInterface{
47+
Inputs: &core.VariableMap{
48+
Variables: variableMap,
49+
},
50+
},
51+
},
52+
},
53+
},
54+
}
55+
mockClient.OnGetTaskMatch(ctx, mock.Anything).Return(task1, nil)
56+
parameterMap := map[string]*core.Parameter{
57+
"numbers": {
58+
Var: &core.Variable{
59+
Type: &core.LiteralType{
60+
Type: &core.LiteralType_CollectionType{
61+
CollectionType: &core.LiteralType{
62+
Type: &core.LiteralType_Simple{
63+
Simple: core.SimpleType_INTEGER,
64+
},
65+
},
66+
},
67+
},
68+
},
69+
},
70+
"numbers_count": {
71+
Var: &core.Variable{
72+
Type: &core.LiteralType{
73+
Type: &core.LiteralType_Simple{
74+
Simple: core.SimpleType_INTEGER,
75+
},
76+
},
77+
},
78+
},
79+
"run_local_at_count": {
80+
Var: &core.Variable{
81+
Type: &core.LiteralType{
82+
Type: &core.LiteralType_Simple{
83+
Simple: core.SimpleType_INTEGER,
84+
},
85+
},
86+
},
87+
Behavior: &core.Parameter_Default{
88+
Default: &core.Literal{
89+
Value: &core.Literal_Scalar{
90+
Scalar: &core.Scalar{
91+
Value: &core.Scalar_Primitive{
92+
Primitive: &core.Primitive{
93+
Value: &core.Primitive_Integer{
94+
Integer: 10,
95+
},
96+
},
97+
},
98+
},
99+
},
100+
},
101+
},
102+
},
103+
}
104+
launchPlan1 := &admin.LaunchPlan{
105+
Id: &core.Identifier{
106+
Name: "core.advanced.run_merge_sort.merge_sort",
107+
Version: "v3",
108+
},
109+
Spec: &admin.LaunchPlanSpec{
110+
DefaultInputs: &core.ParameterMap{
111+
Parameters: parameterMap,
112+
},
113+
},
114+
Closure: &admin.LaunchPlanClosure{
115+
CreatedAt: &timestamppb.Timestamp{Seconds: 0, Nanos: 0},
116+
ExpectedInputs: &core.ParameterMap{
117+
Parameters: parameterMap,
118+
},
119+
},
120+
}
121+
objectGetRequest := &admin.ObjectGetRequest{
122+
Id: &core.Identifier{
123+
ResourceType: core.ResourceType_LAUNCH_PLAN,
124+
Project: config.GetConfig().Project,
125+
Domain: config.GetConfig().Domain,
126+
Name: "core.advanced.run_merge_sort.merge_sort",
127+
Version: "v3",
128+
},
129+
}
130+
mockClient.OnGetLaunchPlanMatch(ctx, objectGetRequest).Return(launchPlan1, nil)
131+
}
132+
func TestCreateLaunchPlanExecutionFunc(t *testing.T) {
133+
setup()
134+
createExecutionSetup()
135+
executionCreateResponseLP := &admin.ExecutionCreateResponse{
136+
Id: &core.WorkflowExecutionIdentifier{
137+
Project: "flytesnacks",
138+
Domain: "development",
139+
Name: "f652ea3596e7f4d80a0e",
140+
},
141+
}
142+
mockClient.OnCreateExecutionMatch(ctx, mock.Anything).Return(executionCreateResponseLP, nil)
143+
executionConfig.ExecFile = testDataFolder + "launchplan_execution_spec.yaml"
144+
err = createExecutionCommand(ctx, args, cmdCtx)
145+
assert.Nil(t, err)
146+
mockClient.AssertCalled(t, "CreateExecution", ctx, mock.Anything)
147+
tearDownAndVerify(t, `execution identifier project:"flytesnacks" domain:"development" name:"f652ea3596e7f4d80a0e"`)
148+
}
149+
150+
func TestCreateTaskExecutionFunc(t *testing.T) {
151+
setup()
152+
createExecutionSetup()
153+
executionCreateResponseTask := &admin.ExecutionCreateResponse{
154+
Id: &core.WorkflowExecutionIdentifier{
155+
Project: "flytesnacks",
156+
Domain: "development",
157+
Name: "ff513c0e44b5b4a35aa5",
158+
},
159+
}
160+
mockClient.OnCreateExecutionMatch(ctx, mock.Anything).Return(executionCreateResponseTask, nil)
161+
executionConfig.ExecFile = testDataFolder + "task_execution_spec.yaml"
162+
err = createExecutionCommand(ctx, args, cmdCtx)
163+
assert.Nil(t, err)
164+
mockClient.AssertCalled(t, "CreateExecution", ctx, mock.Anything)
165+
tearDownAndVerify(t, `execution identifier project:"flytesnacks" domain:"development" name:"ff513c0e44b5b4a35aa5" `)
166+
}

0 commit comments

Comments
 (0)