Skip to content

Commit bf16528

Browse files
committed
workflow: remove promise in workflow
1 parent 8a19538 commit bf16528

9 files changed

+101
-139
lines changed

alert/server.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -402,14 +402,11 @@ func (a *Server) Stop() {
402402
func NewServer(apiServer *api.Server, pool ws.StructSpeakerPool, graph *graph.Graph, parser *traversal.GremlinTraversalParser, etcdClient *etcd.Client) (*Server, error) {
403403
election := etcdClient.NewElection("alert-server")
404404

405-
runtime, err := js.NewRuntime()
405+
runtime, err := api.NewWorkflowRuntime(graph, parser, apiServer)
406406
if err != nil {
407407
return nil, err
408408
}
409409

410-
runtime.Start()
411-
api.RegisterAPIServer(runtime, graph, parser, apiServer)
412-
413410
as := &Server{
414411
MasterElection: election,
415412
Pool: pool,

api/server/workflow_call.go

+16-14
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import (
2121
"encoding/json"
2222
"fmt"
2323
"net/http"
24-
"strings"
2524

25+
"github.com/gorilla/mux"
2626
"github.com/skydive-project/skydive/js"
2727
"github.com/skydive-project/skydive/rbac"
2828

@@ -38,6 +38,7 @@ type WorkflowCallAPIHandler struct {
3838
apiServer *Server
3939
graph *graph.Graph
4040
parser *traversal.GremlinTraversalParser
41+
runtime *js.Runtime
4142
}
4243

4344
func (wc *WorkflowCallAPIHandler) executeWorkflow(w http.ResponseWriter, r *auth.AuthenticatedRequest) {
@@ -46,29 +47,22 @@ func (wc *WorkflowCallAPIHandler) executeWorkflow(w http.ResponseWriter, r *auth
4647
return
4748
}
4849

49-
uriSegments := strings.Split(r.URL.Path, "/")
5050
decoder := json.NewDecoder(r.Body)
5151
var wfCall types.WorkflowCall
5252
if err := decoder.Decode(&wfCall); err != nil {
5353
writeError(w, http.StatusBadRequest, err)
5454
return
5555
}
5656

57-
workflow, err := wc.getWorkflow(uriSegments[3])
58-
if err != nil {
59-
writeError(w, http.StatusBadRequest, err)
60-
return
61-
}
57+
vars := mux.Vars(&r.Request)
6258

63-
runtime, err := js.NewRuntime()
59+
workflow, err := wc.getWorkflow(vars["ID"])
6460
if err != nil {
65-
writeError(w, http.StatusFailedDependency, err)
61+
writeError(w, http.StatusBadRequest, err)
6662
return
6763
}
6864

69-
runtime.Start()
70-
RegisterAPIServer(runtime, wc.graph, wc.parser, wc.apiServer)
71-
ottoResult, err := runtime.ExecPromise(workflow.Source, wfCall.Params...)
65+
ottoResult, err := wc.runtime.ExecFunction(workflow.Source, wfCall.Params...)
7266
if err != nil {
7367
writeError(w, http.StatusBadRequest, err)
7468
return
@@ -100,7 +94,7 @@ func (wc *WorkflowCallAPIHandler) registerEndPoints(s *shttp.Server, authBackend
10094
{
10195
Name: "WorkflowCall",
10296
Method: "POST",
103-
Path: "/api/workflow/{workflowID}/call",
97+
Path: "/api/workflow/{ID}/call",
10498
HandlerFunc: wc.executeWorkflow,
10599
},
106100
}
@@ -109,11 +103,19 @@ func (wc *WorkflowCallAPIHandler) registerEndPoints(s *shttp.Server, authBackend
109103
}
110104

111105
// RegisterWorkflowCallAPI registers a new workflow call api handler
112-
func RegisterWorkflowCallAPI(s *shttp.Server, authBackend shttp.AuthenticationBackend, apiServer *Server, g *graph.Graph, tr *traversal.GremlinTraversalParser) {
106+
func RegisterWorkflowCallAPI(s *shttp.Server, authBackend shttp.AuthenticationBackend, apiServer *Server, g *graph.Graph, tr *traversal.GremlinTraversalParser) error {
107+
runtime, err := NewWorkflowRuntime(g, tr, apiServer)
108+
if err != nil {
109+
return err
110+
}
111+
113112
workflowCallAPIHandler := &WorkflowCallAPIHandler{
114113
apiServer: apiServer,
115114
graph: g,
116115
parser: tr,
116+
runtime: runtime,
117117
}
118118
workflowCallAPIHandler.registerEndPoints(s, authBackend)
119+
120+
return nil
119121
}

api/server/runtime.go api/server/workflow_runtime.go

+36-19
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"encoding/json"
2222
"fmt"
2323
"strings"
24+
"time"
2425

2526
"github.com/skydive-project/skydive/js"
2627

@@ -30,42 +31,56 @@ import (
3031
"github.com/skydive-project/skydive/graffiti/graph/traversal"
3132
)
3233

33-
// RegisterAPIServer exports Go functions required by the API
34-
// to run inside the JS VM
35-
func RegisterAPIServer(r *js.Runtime, g *graph.Graph, gremlinParser *traversal.GremlinTraversalParser, server *Server) {
34+
// NewWorkflowRuntime returns a new Workflow runtime
35+
func NewWorkflowRuntime(g *graph.Graph, tr *traversal.GremlinTraversalParser, server *Server) (*js.Runtime, error) {
36+
runtime, err := js.NewRuntime()
37+
if err != nil {
38+
return nil, err
39+
}
40+
runtime.Start()
41+
3642
queryGremlin := func(query string) otto.Value {
37-
ts, err := gremlinParser.Parse(strings.NewReader(query))
43+
ts, err := tr.Parse(strings.NewReader(query))
3844
if err != nil {
39-
return r.MakeCustomError("ParseError", err.Error())
45+
return runtime.MakeCustomError("ParseError", err.Error())
4046
}
4147

4248
result, err := ts.Exec(g, false)
4349
if err != nil {
44-
return r.MakeCustomError("ExecuteError", err.Error())
50+
return runtime.MakeCustomError("ExecuteError", err.Error())
4551
}
4652

4753
source, err := result.MarshalJSON()
4854
if err != nil {
49-
return r.MakeCustomError("MarshalError", err.Error())
55+
return runtime.MakeCustomError("MarshalError", err.Error())
5056
}
5157

52-
r, _ := r.ToValue(string(source))
58+
r, _ := runtime.ToValue(string(source))
5359
return r
5460
}
5561

56-
r.Set("Gremlin", func(call otto.FunctionCall) otto.Value {
62+
runtime.Set("sleep", func(call otto.FunctionCall) otto.Value {
63+
if len(call.ArgumentList) != 1 || !call.Argument(0).IsNumber() {
64+
return runtime.MakeCustomError("MissingArgument", "Sleep requires a number parameter")
65+
}
66+
t, _ := call.Argument(0).ToInteger()
67+
time.Sleep(time.Duration(t) * time.Millisecond)
68+
return otto.NullValue()
69+
})
70+
71+
runtime.Set("Gremlin", func(call otto.FunctionCall) otto.Value {
5772
if len(call.ArgumentList) < 1 || !call.Argument(0).IsString() {
58-
return r.MakeCustomError("MissingQueryArgument", "Gremlin requires a string parameter")
73+
return runtime.MakeCustomError("MissingQueryArgument", "Gremlin requires a string parameter")
5974
}
6075

6176
query := call.Argument(0).String()
6277

6378
return queryGremlin(query)
6479
})
6580

66-
r.Set("request", func(call otto.FunctionCall) otto.Value {
81+
runtime.Set("request", func(call otto.FunctionCall) otto.Value {
6782
if len(call.ArgumentList) < 3 || !call.Argument(0).IsString() || !call.Argument(1).IsString() || !call.Argument(2).IsString() {
68-
return r.MakeCustomError("WrongArguments", "Import requires 3 string parameters")
83+
return runtime.MakeCustomError("WrongArguments", "Import requires 3 string parameters")
6984
}
7085

7186
url := call.Argument(0).String()
@@ -74,15 +89,15 @@ func RegisterAPIServer(r *js.Runtime, g *graph.Graph, gremlinParser *traversal.G
7489

7590
subs := strings.Split(url, "/") // filepath.Base(url)
7691
if len(subs) < 3 {
77-
return r.MakeCustomError("WrongArgument", fmt.Sprintf("Malformed URL %s", url))
92+
return runtime.MakeCustomError("WrongArgument", fmt.Sprintf("Malformed URL %s", url))
7893
}
7994
resource := subs[2]
8095

8196
// For topology query, we directly call the Gremlin engine
8297
if resource == "topology" {
8398
query := types.TopologyParam{}
8499
if err := json.Unmarshal(data, &query); err != nil {
85-
return r.MakeCustomError("WrongArgument", fmt.Sprintf("Invalid query %s", string(data)))
100+
return runtime.MakeCustomError("WrongArgument", fmt.Sprintf("Invalid query %s", string(data)))
86101
}
87102

88103
return queryGremlin(query.GremlinQuery)
@@ -98,17 +113,17 @@ func RegisterAPIServer(r *js.Runtime, g *graph.Graph, gremlinParser *traversal.G
98113
case "POST":
99114
res := handler.New()
100115
if err := json.Unmarshal([]byte(data), res); err != nil {
101-
return r.MakeCustomError("UnmarshalError", err.Error())
116+
return runtime.MakeCustomError("UnmarshalError", err.Error())
102117
}
103118
if err := handler.Create(res); err != nil {
104-
return r.MakeCustomError("CreateError", err.Error())
119+
return runtime.MakeCustomError("CreateError", err.Error())
105120
}
106121
b, _ := json.Marshal(res)
107122
content = string(b)
108123

109124
case "DELETE":
110125
if len(subs) < 4 {
111-
return r.MakeCustomError("WrongArgument", "No ID specified")
126+
return runtime.MakeCustomError("WrongArgument", "No ID specified")
112127
}
113128
handler.Delete(subs[3])
114129

@@ -121,7 +136,7 @@ func RegisterAPIServer(r *js.Runtime, g *graph.Graph, gremlinParser *traversal.G
121136
id := subs[3]
122137
obj, found := handler.Get(id)
123138
if !found {
124-
return r.MakeCustomError("NotFound", fmt.Sprintf("%s %s could not be found", resource, id))
139+
return runtime.MakeCustomError("NotFound", fmt.Sprintf("%s %s could not be found", resource, id))
125140
}
126141
b, _ := json.Marshal(obj)
127142
content = string(b)
@@ -130,9 +145,11 @@ func RegisterAPIServer(r *js.Runtime, g *graph.Graph, gremlinParser *traversal.G
130145

131146
value, err := otto.ToValue(content)
132147
if err != nil {
133-
return r.MakeCustomError("WrongValue", err.Error())
148+
return runtime.MakeCustomError("WrongValue", err.Error())
134149
}
135150

136151
return value
137152
})
153+
154+
return runtime, nil
138155
}

js/api.ts

+11-30
Original file line numberDiff line numberDiff line change
@@ -70,22 +70,14 @@ if (jsEnv) {
7070
// exported by Skydive that makes use of the REST client
7171
// so we get support for authentication
7272
makeRequest = function (client: Client, url: string, method: string, body: string, opts: Object) : any {
73-
return new Promise(function (resolve, reject) {
74-
var err: any
75-
var data: any
76-
try {
77-
var output = request(url, method, body)
78-
if (output && output.length > 0) {
79-
data = JSON.parse(output)
80-
} else {
81-
data = null
82-
}
83-
resolve(data)
84-
}
85-
catch(e) {
86-
reject(e)
87-
}
88-
})
73+
var data: any
74+
var output = request(url, method, body)
75+
if (output && output.length > 0) {
76+
data = JSON.parse(output)
77+
} else {
78+
data = null
79+
}
80+
return {'then': function() { return data; } }
8981
}
9082
}
9183

@@ -215,14 +207,10 @@ export class API<T extends APIObject> {
215207
get(id) {
216208
let resource = this.Resource;
217209
let factory = this.Factory;
218-
219210
return this.client.request('/api/' + resource + "/" + id, "GET", "", {})
220211
.then(function (data) {
221212
return SerializationHelper.toInstance(new factory(), data)
222213
})
223-
.catch(function (error) {
224-
return Error("Capture not found")
225-
})
226214
}
227215

228216
delete(id: string) {
@@ -325,12 +313,9 @@ export class Step implements Step {
325313
return this.previous.serialize(data);
326314
}
327315

328-
then(cb) {
329-
let self = this;
330-
return self.api.query(self.toString())
331-
.then(function (data) {
332-
return cb(self.serialize(data))
333-
});
316+
result() {
317+
let data = this.api.query(this.toString())
318+
return this.serialize(data)
334319
}
335320
}
336321

@@ -937,7 +922,3 @@ export class Client {
937922
return makeRequest(this, url, method, data, opts);
938923
}
939924
}
940-
941-
export function sleep(time) {
942-
return new Promise(function(resolve) { return setTimeout(resolve, time); })
943-
}

js/browser.ts

-1
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,3 @@ window.EdgeRule = apiLib.EdgeRule
3737
window.NodeRule = apiLib.NodeRule
3838
window.PacketInjection = apiLib.PacketInjection
3939
window.Workflow = apiLib.Workflow
40-
window.sleep = apiLib.sleep

statics/workflows/check-connectivity.yaml

+14-33
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Parameters:
1111
Type: node
1212
Source: |
1313
function CheckConnectivity(from, to) {
14+
try {
1415
var capture = new Capture();
1516
capture.GremlinQuery = "G.V().Has('TID', '" + from + "').ShortestPathTo(Metadata('TID', '" + to + "'))";
1617
@@ -21,39 +22,19 @@ Source: |
2122
packetInjection.ICMPID = Math.floor(Math.random() * 1000);
2223
packetInjection.Count = 5
2324
24-
return client.captures.create(capture).then(function (c) {
25-
capture = c
26-
return sleep(2000)
27-
}).then(function () {
28-
return client.G.V().Has("TID", from)
29-
}).then(function (nodes) {
30-
if (nodes[0].Metadata.Neutron && nodes[0].Metadata.Neutron.IPV4) {
31-
packetInjection.SrcIP = nodes[0].Metadata.Neutron.IPV4[0]
32-
}
33-
if (nodes[0].Metadata.ExtID && nodes[0].Metadata.ExtID["attached-mac"]) {
34-
packetInjection.SrcMAC = nodes[0].Metadata.ExtID["attached-mac"]
35-
}
36-
return client.G.V().Has("TID", to)
37-
}).then(function (nodes) {
38-
if (nodes[0].Metadata.Neutron && nodes[0].Metadata.Neutron.IPV4) {
39-
packetInjection.DstIP = nodes[0].Metadata.Neutron.IPV4[0]
40-
}
41-
if (nodes[0].Metadata.ExtID && nodes[0].Metadata.ExtID["attached-mac"]) {
42-
packetInjection.DstMAC = nodes[0].Metadata.ExtID["attached-mac"]
43-
}
44-
return client.packetInjections.create(packetInjection)
45-
}).then(function () {
46-
return sleep(2000)
47-
}).then(function () {
48-
return client.G.Flows().Has("ICMP.ID", packetInjection.ICMPID)
49-
}).then(function (flows) {
50-
console.log("Flows requested ! :-)")
51-
console.log(flows)
52-
return {
25+
capture = client.captures.create(capture)
26+
sleep(1000)
27+
client.packetInjections.create(packetInjection)
28+
sleep(1000)
29+
30+
var flows = client.G.Flows().Has("ICMP.ID", packetInjection.ICMPID).result()
31+
return {
5332
"State": flows.length > 0 && flows[0].Metric.ABPackets > 0 && flows[0].Metric.BAPackets > 0,
5433
"Flows": flows
55-
}
56-
}).finally(function () {
57-
client.captures.delete(capture.UUID)
58-
})
34+
};
35+
} catch (e) {
36+
console.log(e)
37+
} finally {
38+
if (capture && capture.UUID) client.captures.delete(capture.UUID)
39+
}
5940
}

0 commit comments

Comments
 (0)