-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcontract.txt
287 lines (245 loc) · 9.05 KB
/
contract.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
Create Feed:
SADD feeds [feed] //if fail, abort
PUBLISH newfeed [feed]\x00[instance uuid]
Delete Feed:
WATCH feeds
SISMEMBER feeds [feed] //if 0 (not a member): UNWATCH, fail
MULTI
SREM feeds [feed]
for key associated with feed:
DEL [key]
PUBLISH delfeed [feed]\x00[instance uuid]
EXEC // if nil: go back to WATCH
Set Config Value:
HSET feed.config:[feed] name value
PUBLISH conffeed [feed]\x00[instance uuid]
Get Config Value:
HGET feed.config:[feed] name
Feed:
Publish:
//id may be provided or generated. An id that has already been published will update that id
max = HGET feed.config:[feed] max_length
WATCH feed.ids:[feed]
delete_ids = ZRANGE feed.ids:[feed] 0 [-max] // eg ZRANGE feed.ids:test 0 -4 if the max is 4 (leaves max-1 ids, so the feed holds max after the ZADD below)
MULTI
for id in delete_ids:
if id != publish_id:
ZREM feed.ids:[feed] [id]
HDEL feed.items:[feed] [id]
PUBLISH feed.retract:[feed] [id]
ZADD feed.ids:[feed] [utc epoch milliseconds] [id]
INCR feed.publishes:[feed]
HSET feed.items:[feed] [id] [item]
EXEC // if nil, go back to WATCH
//if zadd returned 1
PUBLISH feed.publish:[feed] [id]\x00[item]
//else
PUBLISH feed.edit:[feed] [id]\x00[item]
Retract:
WATCH feed.ids:[feed]
if ZRANK feed.ids:[feed] [id]
MULTI
ZREM feed.ids:[feed] [id]
HDEL feed.items:[feed] [id]
PUBLISH feed.retract:[feed] [id]
EXEC // if nil, go back to WATCH
Get IDs:
ZRANGE feed.ids:[feed] 0 -1
Get Items:
HGET feed.items:[feed] [id]
Get All:
HGETALL feed.items:[feed]
Sorted Feed:
Append/Publish:
id = INCR feed.idincr:[feed]
MULTI
RPUSH feed.ids:[feed] [id]
HSET feed.items:[feed] [id] [item]
INCR feed.publishes:[feed]
PUBLISH feed.publish:[feed] [id]\x00[item]
PUBLISH feed.position:[feed] [id]\x00:end
EXEC
Prepend:
id = INCR feed.idincr:[feed]
MULTI
LPUSH feed.ids:[feed] [id]
HSET feed.items:[feed] [id] [item]
INCR feed.publishes:[feed]
PUBLISH feed.publish:[feed] [id]\x00[item]
PUBLISH feed.position:[feed] [id]\x00begin:
EXEC
Edit:
WATCH feed.items:[feed]
HEXISTS feed.items:[feed] [id] //if not: UNWATCH fail
MULTI
HSET feed.items:[feed] [id] [item]
INCR feed.publishes:[feed]
PUBLISH feed.publish:[feed] [id]\x00[item]
EXEC //if nil, retry at WATCH
Publish Before:
WATCH feed.items:[feed]
HEXISTS feed.items:[feed] [before id] //if not: UNWATCH fail
id = INCR feed.idincr:[feed]
MULTI
LINSERT feed.ids:[feed] BEFORE [before id] [id]
HSET feed.items:[feed] [id] [item]
INCR feed.publishes:[feed]
PUBLISH feed.publish:[feed] [id]\x00[item]
PUBLISH feed.position:[feed] [id]\x00:[before id]
EXEC // if nil: retry at WATCH
Publish After:
WATCH feed.items:[feed]
HEXISTS feed.items:[feed] [after id] //if not: UNWATCH, fail
id = INCR feed.idincr:[feed]
MULTI
LINSERT feed.ids:[feed] AFTER [after id] [id]
HSET feed.items:[feed] [id] [item]
INCR feed.publishes:[feed]
PUBLISH feed.publish:[feed] [id]\x00[item]
PUBLISH feed.position:[feed] [id]\x00[after id]:
EXEC // if nil: retry at WATCH
Move:
//relative_position = :3 for before 3, 3: for after 3
// = :end for end, begin: for begin
WATCH feed.items:[feed]
HEXISTS feed.items:[feed] [id] //if not: UNWATCH, fail
//if relative_position is not :end or begin:
HEXISTS feed.items:[feed] [relative id] //if not: UNWATCH, fail
MULTI
LREM feed.ids:[feed] 1 [id]
//if begin:
LPUSH feed.ids:[feed] [id]
//if :end
RPUSH feed.ids:[feed] [id]
//else
LINSERT feed.ids:[feed] [BEFORE/AFTER] [relative id] [id]
PUBLISH feed.position:[feed] [id]\x00[relative position]
EXEC // if nil: retry at WATCH
Retract:
WATCH feed.items:[feed]
if HEXISTS feed.items:[feed] [id]
MULTI
LREM feed.ids:[feed] 1 [id]
HDEL feed.items:[feed] [id]
PUBLISH feed.retract:[feed] [id]
EXEC // if nil, go back to WATCH
else UNWATCH
Get IDs:
LRANGE feed.ids:[feed] 0 -1
Get Items:
HGET feed.items:[feed] [id]
Get All:
HGETALL feed.items:[feed]
Queue:
Normal Priority Put:
// id = generated uuid
MULTI
LPUSH feed.ids:[feed] [id]
HSET feed.items:[feed] [id] [item]
INCR feed.publishes:[feed]
EXEC
High Priority Put:
// id = generated uuid
MULTI
RPUSH feed.ids:[feed] [id]
HSET feed.items:[feed] [id] [item]
INCR feed.publishes:[feed]
EXEC
Get:
id = BRPOP feed.ids:[feed] [timeout]
//if error/timeout, abort
MULTI
item = HGET feed.items:[feed] [id]
HDEL feed.items:[feed] [id]
EXEC
Get IDs:
LRANGE feed.ids:[feed] 0 -1
Job:
Normal Priority Put:
// id = generated uuid
MULTI
LPUSH feed.ids:[feed] [id]
INCR feed.publishes:[feed]
HSET feed.items:[feed] [id] [item]
ZADD feed.published:[feed] [utc epoch milliseconds] [id]
EXEC
High Priority Put:
// id = generated uuid
MULTI
RPUSH feed.ids:[feed] [id]
INCR feed.publishes:[feed]
HSET feed.items:[feed] [id] [item]
ZADD feed.published:[feed] [utc epoch milliseconds] [id]
EXEC
Get:
id = BRPOP feed.ids:[feed] [timeout]
//if error/timeout, abort
MULTI
ZADD feed.claimed:[feed] [utc epoch milliseconds] [id]
item = HGET feed.items:[feed] [id]
EXEC
//if the id fails to get from feed.ids to feed.claimed, the maintenance will notice eventually
Finish: //worker finishes job
WATCH feed.claimed:[feed]
ZRANK feed.claimed:[feed] [id] // if nil: fail, this job isn't claimed and you can't finish it
MULTI
ZREM feed.claimed:[feed] [id]
HDEL feed.cancelled:[feed] [id] //just to make sure
INCR feed.finishes:[feed]
//optionally if publishing a result:
PUBLISH job.finish:[feed] [id]\x00[result]
HDEL feed.items:[feed] [id]
EXEC // if nil: go back to WATCH and try again
Get IDs:
HKEYS feed.items:[feed]
Cancel: //cancel a running job, will get retried
WATCH feed.claimed:[feed]
ZRANK feed.claimed:[feed] [id] // if nil: fail UNWATCH
MULTI
HINCRBY feed.cancelled:[feed] [id] 1
LPUSH feed.ids:[feed] [id]
ZREM feed.claimed:[feed] [id]
EXEC // if nil: go back to watch and try again
Stall: //fail a claimed job semi-permanently and take it out of the running, but don't delete
WATCH feed.claimed:[feed]
ZRANK feed.claimed:[feed] [id] // if nil: fail
MULTI
ZREM feed.claimed:[feed] [id]
HDEL feed.cancelled:[feed] [id]
SADD feed.stalled:[feed] [id]
ZREM feed.published:[feed] [id]
EXEC // if nil, retry
Retry: //retry a stalled job
WATCH feed.stalled:[feed]
SISMEMBER feed.stalled:[feed] [id] // if 0 (not a member): UNWATCH, fail
MULTI
SREM feed.stalled:[feed] [id]
//if error, abort
LPUSH feed.ids:[feed] [id]
ZADD feed.published:[feed] [utc epoch milliseconds] [id]
EXEC // if nil retry
Retract:
WATCH feed.items:[feed]
if not HEXISTS feed.items:[feed] [id] // fail UNWATCH
MULTI
HDEL feed.items:[feed] [id]
HDEL feed.cancelled:[feed] [id]
ZREM feed.published:[feed] [id]
SREM feed.stalled:[feed] [id]
ZREM feed.claimed:[feed] [id]
LREM feed.ids:[feed] 1 [id]
EXEC // if fail, retry
getNumOfFailures:
HGET feed.cancelled:[feed] [id]
Maintenance: //maintain job queue -- only run occasionally, by one process per job queue -- still a bit hand-wavy
MULTI
keys = HKEYS feed.items:[feed]
avail = LRANGE feed.ids:[feed] 0 -1
claim = ZRANGE feed.claimed:[feed] 0 -1
stall = SMEMBERS feed.stalled:[feed]
EXEC
unaccounted = any of keys not in avail, claim, stall
for key in unaccounted_last:
if key in keys and not in avail, claim, stall:
LPUSH feed.ids:[feed] [key]
check claimed jobs to see if any have been claimed too long and "Cancel" or "Stall" them
publish stats to a feed