Skip to content

Commit f86965e

Browse files
committed
all: zmq -> zmq4
1 parent 428fab0 commit f86965e

File tree

3 files changed

+24
-24
lines changed

3 files changed

+24
-24
lines changed

kernel.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
"sync"
1717
"time"
1818

19-
zmq "github.com/go-zeromq/zmq4"
19+
"github.com/go-zeromq/zmq4"
2020
"golang.org/x/xerrors"
2121

2222
"github.com/cosmos72/gomacro/ast2"
@@ -48,7 +48,7 @@ type ConnectionInfo struct {
4848

4949
// Socket wraps a zmq socket with a lock which should be used to control write access.
5050
type Socket struct {
51-
Socket zmq.Socket
51+
Socket zmq4.Socket
5252
Lock *sync.Mutex
5353
}
5454

@@ -102,7 +102,7 @@ const (
102102
)
103103

104104
// RunWithSocket invokes the `run` function after acquiring the `Socket.Lock` and releases the lock when done.
105-
func (s *Socket) RunWithSocket(run func(socket zmq.Socket) error) error {
105+
func (s *Socket) RunWithSocket(run func(socket zmq4.Socket) error) error {
106106
s.Lock.Lock()
107107
defer s.Lock.Unlock()
108108
return run(s.Socket)
@@ -164,7 +164,7 @@ func runKernel(connectionFile string) {
164164
// TODO gracefully shutdown the heartbeat handler on kernel shutdown by closing the chan returned by startHeartbeat.
165165

166166
type msgType struct {
167-
Msg zmq.Msg
167+
Msg zmq4.Msg
168168
Err error
169169
}
170170

@@ -176,7 +176,7 @@ func runKernel(connectionFile string) {
176176
)
177177

178178
defer close(quit)
179-
poll := func(msgs chan msgType, sck zmq.Socket) {
179+
poll := func(msgs chan msgType, sck zmq4.Socket) {
180180
defer close(msgs)
181181
for {
182182
msg, err := sck.Recv()
@@ -250,27 +250,27 @@ func prepareSockets(connInfo ConnectionInfo) (SocketGroup, error) {
250250

251251
// Create the shell socket, a request-reply socket that may receive messages from multiple frontend for
252252
// code execution, introspection, auto-completion, etc.
253-
sg.ShellSocket.Socket = zmq.NewRouter(ctx)
253+
sg.ShellSocket.Socket = zmq4.NewRouter(ctx)
254254
sg.ShellSocket.Lock = &sync.Mutex{}
255255

256256
// Create the control socket. This socket is a duplicate of the shell socket where messages on this channel
257257
// should jump ahead of queued messages on the shell socket.
258-
sg.ControlSocket.Socket = zmq.NewRouter(ctx)
258+
sg.ControlSocket.Socket = zmq4.NewRouter(ctx)
259259
sg.ControlSocket.Lock = &sync.Mutex{}
260260

261261
// Create the stdin socket, a request-reply socket used to request user input from a front-end. This is analogous
262262
// to a standard input stream.
263-
sg.StdinSocket.Socket = zmq.NewRouter(ctx)
263+
sg.StdinSocket.Socket = zmq4.NewRouter(ctx)
264264
sg.StdinSocket.Lock = &sync.Mutex{}
265265

266266
// Create the iopub socket, a publisher for broadcasting data like stdout/stderr output, displaying execution
267267
// results or errors, kernel status, etc. to connected subscribers.
268-
sg.IOPubSocket.Socket = zmq.NewPub(ctx)
268+
sg.IOPubSocket.Socket = zmq4.NewPub(ctx)
269269
sg.IOPubSocket.Lock = &sync.Mutex{}
270270

271271
// Create the heartbeat socket, a request-reply socket that only allows alternating recv-send (request-reply)
272272
// calls. It should echo the byte strings it receives to let the requester know the kernel is still alive.
273-
sg.HBSocket.Socket = zmq.NewRep(ctx)
273+
sg.HBSocket.Socket = zmq4.NewRep(ctx)
274274
sg.HBSocket.Lock = &sync.Mutex{}
275275

276276
// Bind the sockets.
@@ -575,7 +575,7 @@ func startHeartbeat(hbSocket Socket, wg *sync.WaitGroup) (shutdown chan struct{}
575575
defer wg.Done()
576576

577577
type msgType struct {
578-
Msg zmq.Msg
578+
Msg zmq4.Msg
579579
Err error
580580
}
581581

@@ -604,7 +604,7 @@ func startHeartbeat(hbSocket Socket, wg *sync.WaitGroup) (shutdown chan struct{}
604604
case <-timeout.C:
605605
continue
606606
case v := <-msgs:
607-
hbSocket.RunWithSocket(func(echo zmq.Socket) error {
607+
hbSocket.RunWithSocket(func(echo zmq4.Socket) error {
608608
if v.Err != nil {
609609
log.Fatalf("Error reading heartbeat ping bytes: %v\n", v.Err)
610610
return v.Err

kernel_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"testing"
1212
"time"
1313

14-
zmq "github.com/go-zeromq/zmq4"
14+
"github.com/go-zeromq/zmq4"
1515
)
1616

1717
const (
@@ -327,8 +327,8 @@ cases:
327327

328328
// testJupyterClient holds references to the 2 sockets it uses to communicate with the kernel.
329329
type testJupyterClient struct {
330-
shellSocket zmq.Socket
331-
ioSocket zmq.Socket
330+
shellSocket zmq4.Socket
331+
ioSocket zmq4.Socket
332332
}
333333

334334
// newTestJupyterClient creates and connects a fresh client to the kernel. Upon error, newTestJupyterClient
@@ -344,18 +344,18 @@ func newTestJupyterClient(t *testing.T) (testJupyterClient, func()) {
344344
)
345345

346346
// Prepare the shell socket.
347-
shell := zmq.NewReq(ctx)
347+
shell := zmq4.NewReq(ctx)
348348
if err = shell.Dial(addrShell); err != nil {
349349
t.Fatalf("\t%s shell.Connect: %s", failure, err)
350350
}
351351

352352
// Prepare the IOPub socket.
353-
iopub := zmq.NewSub(ctx)
353+
iopub := zmq4.NewSub(ctx)
354354
if err = iopub.Dial(addrIO); err != nil {
355355
t.Fatalf("\t%s iopub.Connect: %s", failure, err)
356356
}
357357

358-
if err = iopub.SetOption(zmq.OptionSubscribe, ""); err != nil {
358+
if err = iopub.SetOption(zmq4.OptionSubscribe, ""); err != nil {
359359
t.Fatalf("\t%s iopub.SetSubscribe: %s", failure, err)
360360
}
361361

@@ -390,7 +390,7 @@ func (client *testJupyterClient) sendShellRequest(t *testing.T, request Composed
390390
}
391391
frames = append(frames, reqMsgParts...)
392392

393-
if err = client.shellSocket.SendMulti(zmq.NewMsgFrom(frames...)); err != nil {
393+
if err = client.shellSocket.SendMulti(zmq4.NewMsgFrom(frames...)); err != nil {
394394
t.Fatalf("\t%s shellSocket.SendMessage: %s", failure, err)
395395
}
396396
}

messages.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"encoding/json"
88
"time"
99

10-
zmq "github.com/go-zeromq/zmq4"
10+
"github.com/go-zeromq/zmq4"
1111
"github.com/gofrs/uuid"
1212
)
1313

@@ -139,7 +139,7 @@ func (msg ComposedMsg) ToWireMsg(signkey []byte) ([][]byte, error) {
139139
}
140140

141141
// SendResponse sends a message back to return identities of the received message.
142-
func (receipt *msgReceipt) SendResponse(socket zmq.Socket, msg ComposedMsg) error {
142+
func (receipt *msgReceipt) SendResponse(socket zmq4.Socket, msg ComposedMsg) error {
143143

144144
msgParts, err := msg.ToWireMsg(receipt.Sockets.Key)
145145
if err != nil {
@@ -151,7 +151,7 @@ func (receipt *msgReceipt) SendResponse(socket zmq.Socket, msg ComposedMsg) erro
151151
frames = append(frames, []byte("<IDS|MSG>"))
152152
frames = append(frames, msgParts...)
153153

154-
err = socket.SendMulti(zmq.NewMsgFrom(frames...))
154+
err = socket.SendMulti(zmq4.NewMsgFrom(frames...))
155155
if err != nil {
156156
return err
157157
}
@@ -190,7 +190,7 @@ func (receipt *msgReceipt) Publish(msgType string, content interface{}) error {
190190
}
191191

192192
msg.Content = content
193-
return receipt.Sockets.IOPubSocket.RunWithSocket(func(iopub zmq.Socket) error {
193+
return receipt.Sockets.IOPubSocket.RunWithSocket(func(iopub zmq4.Socket) error {
194194
return receipt.SendResponse(iopub, msg)
195195
})
196196
}
@@ -205,7 +205,7 @@ func (receipt *msgReceipt) Reply(msgType string, content interface{}) error {
205205
}
206206

207207
msg.Content = content
208-
return receipt.Sockets.ShellSocket.RunWithSocket(func(shell zmq.Socket) error {
208+
return receipt.Sockets.ShellSocket.RunWithSocket(func(shell zmq4.Socket) error {
209209
return receipt.SendResponse(shell, msg)
210210
})
211211
}

0 commit comments

Comments
 (0)