forked from brightroll/rq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprotocol.rb
124 lines (100 loc) · 3.1 KB
/
protocol.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
require 'socket'
require 'fcntl'
# Mix the Protocol module into classes that communicate on internal sockets
module RQ
module Protocol
def set_protocol_sock_path(sock_path)
@protocol_sock_path = sock_path
end
def set_protocol_messages(messages)
@protocol_messages = messages
end
def set_nonblocking(sock)
flags = sock.fcntl(Fcntl::F_GETFL) | Fcntl::O_NONBLOCK
sock.fcntl(Fcntl::F_SETFL, flags)
end
def reset_nonblocking(sock)
flags = sock.fcntl(Fcntl::F_GETFL) ^ Fcntl::O_NONBLOCK
sock.fcntl(Fcntl::F_SETFL, flags)
end
def read_packet(sock)
protocol = do_read(sock, 4)
if protocol != "rq1 "
raise "REQ - Invalid protocol - bad ver #{protocol}"
end
size_str = do_read(sock, 9)
if size_str[-1..-1] != " "
raise "REQ - Invalid protocol - bad size #{size_str}"
end
size = size_str.to_i
$log.debug("REQ - size #{size}")
do_read(sock, size)
end
def send_packet(sock, resp)
log_msg = resp.length > 80 ? "#{resp[0...80]}..." : resp
$log.debug("RESP [ #{resp.bytesize} #{log_msg} ]")
sock_msg = sprintf("rq1 %08d %s", resp.bytesize, resp)
do_write(sock, sock_msg)
end
# msg is single word, data is assumbed to be content as json
def send_recv(msg, data="")
client = UNIXSocket.open(@protocol_sock_path)
send_packet(client, "#{msg} #{data}")
reply = read_packet(client) rescue nil
client.close
JSON.parse(reply) if reply
rescue
$log.warn("Error on the socket: #{$!} [ #{$@} ]")
nil
end
private
# Borrowed from UnixRack and updated by Backports
def do_write(client, buff)
lwritten = client.syswrite(buff)
nwritten = lwritten
# Only dup the original buff if we didn't get it all in the first try
while nwritten < buff.bytesize
remaining = buff.bytesize - nwritten
rbuff = (rbuff || buff).unpack("@#{lwritten}a#{remaining}").first
lwritten = client.syswrite(rbuff)
nwritten += lwritten
end
nwritten
rescue
defined?(nwritten) ? nwritten : 0
end
def do_read(client, size)
out = ""
while out.bytesize < size
remain = size - out.bytesize
out << client.readpartial(remain)
end
out
rescue EOFError
out
rescue Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
raise "Got an #{$!} from socket read"
end
def message_send_recv(name, *params)
param = params.first
case param
when nil
send_recv(name)
when String
send_recv(name, param)
else
send_recv(name, param.to_json)
end
end
# Provides magic methods for words in the @protocol_messages array
def method_missing(name, *args, &block)
if @protocol_messages && @protocol_messages.include?(name.to_s)
message_send_recv(name, *args, &block)
else
super # You *must* call super if you don't handle the
# method, otherwise you'll mess up Ruby's method
# lookup.
end
end
end
end