-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
156 lines (130 loc) · 4.34 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package main
import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"github.com/pebbe/zmq4"
)
// ConnectionInfo mirrors the JSON connection file whose path is passed as the
// first command-line argument. It tells the program which transport, address
// and ports to bind its sockets on and which key to use for message signing.
type ConnectionInfo struct {
	// Endpoint: combined with a port as "<transport>://<ip>:<port>".
	Transport string `json:"transport"`
	IP        string `json:"ip"`

	// One port per channel.
	ShellPort   int `json:"shell_port"`
	ControlPort int `json:"control_port"`
	StdinPort   int `json:"stdin_port"`
	IOPubPort   int `json:"iopub_port"`
	HBPort      int `json:"hb_port"`

	// Message signing. Key is copied into Sockets.Key; SignatureScheme is
	// decoded but not consulted anywhere in this file.
	Key             string `json:"key"`
	SignatureScheme string `json:"signature_scheme"`
}
// Sockets bundles the five ZeroMQ sockets created by prepareSockets together
// with the key used for signing messages.
type Sockets struct {
	// Created as ROUTER sockets.
	ShellSocket, ControlSocket, StdinSocket *zmq4.Socket

	// Created as a PUB socket.
	IOPubSocket *zmq4.Socket

	// Created as a REP socket.
	HBSocket *zmq4.Socket

	// Key holds the raw bytes of the connection file's "key" entry.
	Key []byte
}
// prepareSockets creates a ZeroMQ context, opens the shell, control, stdin,
// iopub and heartbeat sockets, and binds each one to
// "<Transport>://<IP>:<port>" using the ports from connInfo.
//
// On success the returned Sockets holds all five bound sockets plus the
// signing key taken from connInfo.Key. On failure every socket opened so far
// is closed, the context is terminated, and a zero Sockets is returned with
// an error naming the socket that could not be created or bound.
func prepareSockets(connInfo ConnectionInfo) (Sockets, error) {
	ctx, err := zmq4.NewContext()
	if err != nil {
		return Sockets{}, fmt.Errorf("could not create zmq context: %w", err)
	}

	var sockets Sockets

	// One entry per channel, in the order they are created and bound.
	channels := []struct {
		name string
		kind zmq4.Type
		port int
		slot **zmq4.Socket
	}{
		// Shell: a request-reply socket that may receive messages from
		// multiple frontends for code execution, introspection,
		// auto-completion, etc.
		{"shell", zmq4.ROUTER, connInfo.ShellPort, &sockets.ShellSocket},
		// Control: a duplicate of the shell socket whose messages should jump
		// ahead of messages queued on the shell socket.
		{"control", zmq4.ROUTER, connInfo.ControlPort, &sockets.ControlSocket},
		// Stdin: a request-reply socket used to request user input from a
		// frontend, analogous to a standard input stream.
		{"stdin", zmq4.ROUTER, connInfo.StdinPort, &sockets.StdinSocket},
		// IOPub: a publisher for broadcasting stdout/stderr output, execution
		// results or errors, kernel status, etc. to connected subscribers.
		{"iopub", zmq4.PUB, connInfo.IOPubPort, &sockets.IOPubSocket},
		// Heartbeat: a request-reply socket that only allows alternating
		// recv-send calls; it should echo the bytes it receives so the
		// requester knows the kernel is still alive.
		{"hbeat", zmq4.REP, connInfo.HBPort, &sockets.HBSocket},
	}

	// fail releases everything acquired so far and reports cause.
	fail := func(cause error) (Sockets, error) {
		for _, ch := range channels {
			if sock := *ch.slot; sock != nil {
				// Best-effort cleanup: the original failure is the error
				// worth reporting, so a secondary close error is dropped.
				_ = sock.Close()
			}
		}
		// All sockets are closed and nothing was ever sent, so there is no
		// pending traffic for termination to wait on.
		_ = ctx.Term()
		return Sockets{}, cause
	}

	// endpoint formats the bind address in a single pass so that a '%' inside
	// Transport or IP (e.g. an IPv6 zone) is never interpreted as a format
	// verb.
	// NOTE(review): non-TCP transports such as ipc may need a separator other
	// than ':' between address and port - confirm against the connection-file
	// specification.
	endpoint := func(port int) string {
		return fmt.Sprintf("%v://%v:%v", connInfo.Transport, connInfo.IP, port)
	}

	// Create every socket first ...
	for _, ch := range channels {
		sock, err := ctx.NewSocket(ch.kind)
		if err != nil {
			return fail(fmt.Errorf("could not create %s-socket: %w", ch.name, err))
		}
		*ch.slot = sock
	}

	// ... then bind them.
	for _, ch := range channels {
		if err := (*ch.slot).Bind(endpoint(ch.port)); err != nil {
			return fail(fmt.Errorf("could not listen on %s-socket: %w", ch.name, err))
		}
	}

	// Set the message signing key.
	sockets.Key = []byte(connInfo.Key)
	return sockets, nil
}
// main reads the connection file named by the first positional command-line
// argument, decodes it into a ConnectionInfo, binds the sockets it describes
// and then runs a zmq4 reactor over them. Any failure along the way is fatal
// and is logged with the step that failed.
func main() {
	flag.Parse()
	if flag.NArg() < 1 {
		log.Fatalln("Need a command line argument specifying the connection file.")
	}
	connectionFile := flag.Arg(0)

	connectionData, err := ioutil.ReadFile(connectionFile)
	if err != nil {
		log.Fatalf("reading connection file: %v", err)
	}

	var connectionInfo ConnectionInfo
	if err := json.Unmarshal(connectionData, &connectionInfo); err != nil {
		log.Fatalf("parsing connection file %q: %v", connectionFile, err)
	}

	socketGroup, err := prepareSockets(connectionInfo)
	if err != nil {
		log.Fatalf("preparing sockets: %v", err)
	}

	// Placeholder handler shared by every socket: it neither receives nor
	// sends and always reports success.
	// NOTE(review): registering POLLOUT with a handler that does nothing may
	// make the reactor wake continuously - confirm, and replace with real
	// per-channel handlers.
	noop := func(zmq4.State) error { return nil }

	reactor := zmq4.NewReactor()
	reactor.AddSocket(socketGroup.HBSocket, zmq4.POLLIN|zmq4.POLLOUT, noop)
	reactor.AddSocket(socketGroup.ControlSocket, zmq4.POLLIN|zmq4.POLLOUT, noop)
	reactor.AddSocket(socketGroup.ShellSocket, zmq4.POLLIN|zmq4.POLLOUT, noop)
	reactor.AddSocket(socketGroup.StdinSocket, zmq4.POLLIN|zmq4.POLLOUT, noop)
	reactor.AddSocket(socketGroup.IOPubSocket, zmq4.POLLOUT, noop)

	// Run only returns when polling or a handler fails; surface that instead
	// of exiting silently with status 0.
	if err := reactor.Run(-1); err != nil {
		log.Fatalf("reactor stopped: %v", err)
	}
}