34
35:- module(redis_streams,
36 [ xstream_set/3, 37 xadd/4, 38 xlisten/3, 39 xlisten_group/5, 40 41 xconsumer_stop/1 42 ]). 43:- use_module(library(redis)). 44:- use_module(library(error)). 45:- use_module(library(apply)). 46:- use_module(library(broadcast)). 47:- use_module(library(lists)). 48:- use_module(library(option)). 49:- use_module(library(debug)). 50
51:- meta_predicate
52 xlisten(+, +, 5, 5, +). 53
54:- multifile
55 xhook/2. 56
57:- predicate_options(xlisten/3, 3,
58 [ count(nonneg),
59 start(one_of([$,0])),
60 starts(list)
61 ]). 62:- predicate_options(xlisten_group/5, 5,
63 [ block(number),
64 max_deliveries(nonneg),
65 max_claim(nonneg)
66 ]). 67
68
95
96:- dynamic
97 xstream_option/3. 98
107
108xstream_set(Redis, KeyS, Option) :-
109 must_be(atom, Redis),
110 atom_string(Key, KeyS),
111 valid_option(Option),
112 functor(Option, Name, Arity),
113 functor(Gen, Name, Arity),
114 retractall(xstream_option(Redis, Key, Gen)),
115 asserta(xstream_option(Redis, Key, Option)).
116
117valid_option(Option) :-
118 stream_option(Option),
119 !.
120valid_option(Option) :-
121 domain_error(redis_stream_option, Option).
122
123stream_option(maxlen(X)) :- must_be(integer, X).
124
125
134
135xadd(DB, Key, Id, Dict) :-
136 redis_array_dict(Array, _, Dict),
137 ( var(Id)
138 -> IdIn = '*'
139 ; IdIn = Id
140 ),
141 ( xstream_option(DB, Key, maxlen(MaxLen))
142 -> Command =.. [xadd, Key, maxlen, ~, MaxLen, IdIn|Array]
143 ; Command =.. [xadd, Key, IdIn|Array]
144 ),
145 redis(DB, Command, Reply),
146 return_id(Id, Reply).
147
148return_id(Id, Reply) :-
149 var(Id),
150 !,
151 Id = Time-Seq,
152 split_string(Reply, "-", "", [TimeS,SeqS]),
153 number_string(Time, TimeS),
154 number_string(Seq, SeqS).
155return_id(_, _).
156
157
158 161
200
201xlisten(Redis, Streams, Options) :-
202 xlisten(Redis, Streams, xbroadcast, xidle, Options).
203
204xbroadcast(Redis, Stream, Id, Dict, _Options) :-
205 redis_id(Redis, RedisId),
206 catch(broadcast(redis(RedisId, Stream, Id, Dict)), Error,
207 print_message(error, Error)).
208
209redis_id(redis(Id, _, _), Id) :-
210 !.
211redis_id(Id, Id).
212
213xidle(_Redis, _Streams, _Starts, _NewStarts, _Options).
214
227
228xlisten(Redis, Streams, OnBroadcast, OnIdle, Options) :-
229 atom(Redis),
230 !,
231 setup_call_cleanup(
232 redis_connect(Redis, Conn, [reconnect(true)|Options]),
233 xlisten_(Conn, Streams, OnBroadcast, OnIdle, Options),
234 redis_disconnect(Conn)).
235xlisten(Redis, Streams, OnBroadcast, OnIdle, Options) :-
236 xlisten_(Redis, Streams, OnBroadcast, OnIdle, Options).
237
238xlisten_(Redis, Streams, OnBroadcast, OnIdle, Options) :-
239 xread_command(Streams, Starts0, CommandTempl, Options),
240 listen_loop(Redis, Starts0, CommandTempl,
241 OnBroadcast, OnIdle, Streams, Options).
242
243xread_command(Streams, Starts0, Starts-Command, Options) :-
244 option(group(Group-Consumer), Options),
245 !,
246 xread_command_args(Streams, Starts0, Starts, CmdArgs, Options),
247 Command =.. [xreadgroup, group, Group, Consumer | CmdArgs].
248xread_command(Streams, Starts0, Starts-Command, Options) :-
249 xread_command_args(Streams, Starts0, Starts, CmdArgs, Options),
250 Command =.. [xread|CmdArgs].
251
252xread_command_args(Streams, Starts0, Starts, CmdArgs, Options) :-
253 option(count(Count), Options),
254 !,
255 option(block(Block), Options, 0),
256 BlockMS is integer(Block*1000),
257 start_templ(Streams, Starts0, Starts, StreamArgs, Options),
258 CmdArgs = [count, Count, block, BlockMS, streams | StreamArgs].
259xread_command_args(Streams, Starts0, Starts, CmdArgs, Options) :-
260 option(block(Block), Options, 0),
261 BlockMS is integer(Block*1000),
262 start_templ(Streams, Starts0, Starts, StreamArgs, Options),
263 CmdArgs = [block, BlockMS, streams | StreamArgs].
264
265start_templ(Streams, Starts0, Starts, StreamArgs, Options) :-
266 option(starts(Starts0), Options),
267 !,
268 length(Streams, Len),
269 length(Starts, Len),
270 length(Starts0, LenS),
271 ( LenS =:= Len
272 -> true
273 ; domain_error(xread_starts, Starts0)
274 ),
275 append(Streams, Starts, StreamArgs).
276start_templ(Streams, Starts0, Starts, StreamArgs, Options) :-
277 option(start(Start), Options, $),
278 !,
279 length(Streams, Len),
280 length(Starts, Len),
281 length(Starts0, Len),
282 maplist(=(Start), Starts0),
283 append(Streams, Starts, StreamArgs).
284
285listen_loop(Redis, Starts, CommandTempl, OnBroadcast, OnIdle, Streams, Options) :-
286 copy_term(CommandTempl, Starts-Command),
287 ( redis(Redis, Command, Reply),
288 Reply \== nil
289 -> dispatch_streams(Reply, Redis, Streams, Starts, NewStarts,
290 OnBroadcast, OnIdle, Options)
291 ; NewStarts = Starts
292 ),
293 call(OnIdle, Redis, Streams, Starts, NewStarts, Options),
294 listen_loop(Redis, NewStarts, CommandTempl,
295 OnBroadcast, OnIdle, Streams, Options).
296
297dispatch_streams([], _, _, Starts, NewStarts, _, _, _) :-
298 maplist(copy_start, Starts, NewStarts).
299dispatch_streams([Tuple|T], Redis, Streams, Starts, NewStarts,
300 OnBroadcast, OnIdle, Options) :-
301 stream_tuple(Tuple, StreamS, []),
302 atom_string(Stream, StreamS),
303 !, 304 set_start(Stream, _Start, >, Streams, Starts, NewStarts),
305 dispatch_streams(T, Redis, Streams, Starts, NewStarts,
306 OnBroadcast, OnIdle, Options).
307dispatch_streams([Tuple|T], Redis, Streams, Starts, NewStarts,
308 OnBroadcast, OnIdle, Options) :-
309 stream_tuple(Tuple, StreamS, Messages),
310 atom_string(Stream, StreamS),
311 set_start(Stream, Start, NewStart, Streams, Starts, NewStarts),
312 dispatch_messages(Messages, Stream, Redis, Start, NewStart,
313 OnBroadcast, Options),
314 dispatch_streams(T, Redis, Streams, Starts, NewStarts,
315 OnBroadcast, OnIdle, Options).
316
317stream_tuple(Stream-Messages, Stream, Messages) :- !.
318stream_tuple([Stream,Messages], Stream, Messages).
319
320set_start(Stream, Old, New, [Stream|_], [Old|_], [New|_]) :-
321 !.
322set_start(Stream, Old, New, [_|Streams], [_|OldStarts], [_|NewStarts]) :-
323 set_start(Stream, Old, New, Streams, OldStarts, NewStarts).
324
325copy_start(Old, New) :-
326 ( var(New)
327 -> Old = New
328 ; true
329 ).
330
332
333dispatch_messages([], _, _, Start, Start, _, _).
334dispatch_messages([[Start,Data]|T], Stream, Redis, Start0, NewStart,
335 OnBroadcast, Options) :-
336 dispatch_message(Data, Stream, Redis, Start, OnBroadcast, Options),
337 join_starts(Start0, Start, Start1),
338 dispatch_messages(T, Stream, Redis, Start1, NewStart, OnBroadcast, Options).
339
340dispatch_message(Data, Stream, Redis, Id, OnBroadcast, Options) :-
341 ( Data == nil 342 -> Dict = redis{}
343 ; redis_array_dict(Data, redis, Dict)
344 ),
345 call(OnBroadcast, Redis, Stream, Id, Dict, Options).
346
347join_starts(>, _Start, >) :-
348 !.
349join_starts(_Start0, Start, Start).
350
351 354
386
387xlisten_group(Redis, Group, Consumer, Streams, Options) :-
388 catch(xlisten(Redis, Streams, xbroadcast_group, xidle_group,
389 [ group(Group-Consumer),
390 start(0)
391 | Options
392 ]),
393 redis(stop(Leave)),
394 true),
395 ( Leave == true
396 -> xleave_group(Redis, Group, Consumer, Streams)
397 ; true
398 ).
399
400xbroadcast_group(Connection, Stream, Id, Dict, Options) :-
401 option(group(Group-Consumer), Options),
402 redis_id(Connection, RedisId),
403 ( catch(broadcast_request(redis_consume(Stream, Dict,
404 _{redis:RedisId,
405 message:Id,
406 group:Group,
407 consumer:Consumer})),
408 Error, xbroadcast_error(Error, Connection, Stream, Group, Id))
409 -> redis(Connection, xack(Stream, Group, Id))
410 ; true
411 ).
412
413xbroadcast_error(redis(stop(Unregister)), Connection, Stream, Group, Id) :-
414 !,
415 redis(Connection, xack(Stream, Group, Id), _),
416 throw(redis(stop(Unregister))).
417xbroadcast_error(Error, _Connection, _Stream, _Group, _Id) :-
418 print_message(error, Error),
419 fail.
420
431
432xidle_group(Redis, Streams, Starts, Starts, Options) :- 433 !,
434 option(group(Group-_Consumer), Options),
435 claim(Streams, Redis, Group, 0, _Claimed, Options).
436xidle_group(_Redis, _Streams, _Starts, _NewStarts, _Options).
437
438claim([], _, _, Claimed, Claimed, _).
439claim([Stream|ST], Redis, Group, Claimed0, Claimed, Options) :-
440 claim_for_stream(Redis, Stream, Group, Claimed0, Claimed1, Options),
441 claim(ST, Redis, Group, Claimed1, Claimed, Options).
442
443claim_for_stream(Redis, Stream, Group, Claimed0, Claimed, Options) :-
444 check_limit_claimed(Claimed0, Options),
445 redis(Redis, xpending(Stream, Group), [Count,_Oldest,_Newest, Cons]),
446 Count > 0,
447 !,
448 debug(redis(claim), '~D pending messages on ~p for ~p (Consumers = ~p)',
449 [Count, Stream, Group, Cons]),
450 claim_for_stream_for_consumers(Cons, Redis, Stream, Group,
451 Claimed0, Claimed, Options).
452claim_for_stream(_Redis, _Stream, _Group, Claimed, Claimed, _Options).
453
454claim_for_stream_for_consumers([], _Redis, _Stream, _Group,
455 Claimed, Claimed, _Options).
456claim_for_stream_for_consumers([C|T], Redis, Stream, Group,
457 Claimed0, Claimed, Options) :-
458 claim_for_stream_for_consumer(Redis, Stream, Group, C,
459 Claimed0, Claimed1, Options),
460 claim_for_stream_for_consumers(T, Redis, Stream, Group,
461 Claimed1, Claimed, Options).
462
463claim_for_stream_for_consumer(Redis, Stream, Group, [Consumer,_Pending],
464 Claimed0, Claimed, Options) :-
465 redis(Redis, xpending(Stream, Group, -, +, 10, Consumer), Reply),
466 claim_messages(Reply, Redis, Stream, Group, Claimed0, Claimed, Options).
467
468claim_messages([], _Redis, _Stream, _Group, Claimed, Claimed, _Options).
469claim_messages([H|T], Redis, Stream, Group, Claimed0, Claimed, Options) :-
470 debug(redis(claim), 'Found pending ~p', [H]),
471 claim_message(H, Redis, Stream, Group, Claimed0, Claimed1, Options),
472 claim_messages(T, Redis, Stream, Group, Claimed1, Claimed, Options).
473
474claim_message([Id,Consumer,Idle,Delivered], Redis, Stream, Group,
475 Claimed0, Claimed, Options) :-
476 option(block(Block), Options),
477 BlockMS is integer(Block*1000),
478 Idle > BlockMS,
479 check_limit_deliveries(Redis, Stream, Delivered, Id, Options),
480 check_limit_claimed(Claimed0, Options),
481 option(group(Group-Me), Options),
482 debug(redis(claim), '~p: trying to claim ~p from ~p (idle ~D)',
483 [Me, Id, Consumer, Idle]),
484 redis(Redis, xclaim(Stream, Group, Me, BlockMS, Id), ClaimedMsgs),
485 !,
486 Claimed is Claimed0 + 1,
487 debug(redis(claimed), '~p: claimed ~p', [Me, ClaimedMsgs]),
488 dispatch_claimed(ClaimedMsgs, Redis, Stream, Options).
489claim_message(_Msg, _Redis, _Stream, _Group, Claimed, Claimed, _Options).
490
491dispatch_claimed([], _Redis, _Stream, _Options).
492dispatch_claimed([[MsgId,MsgArray]|Msgs], Redis, Stream, Options) :-
493 redis_array_dict(MsgArray, _, Dict),
494 xbroadcast_group(Redis, Stream, MsgId, Dict, Options),
495 dispatch_claimed(Msgs, Redis, Stream, Options).
496
497
504
505check_limit_deliveries(_Redis, _Stream, Delivered, _Id, Options) :-
506 option(max_deliveries(Max), Options, 3),
507 Delivered =< Max,
508 !.
509check_limit_deliveries(Redis, Stream, Delivered, Id, Options) :-
510 option(group(Group-_Me), Options),
511 ( xhook(Stream, delivery_failed(Id,Group,Delivered))
512 -> true
513 ; print_message(warning, redis(delivery_failed(Id,Group,Delivered))),
514 redis(Redis, xack(Stream, Group, Id))
515 ),
516 fail.
517
518check_limit_claimed(Claimed0, Options) :-
519 option(max_claim(Max), Options, 10),
520 Claimed0 < Max.
521
522
528
529xleave_group(Redis, Group, Consumer, [Stream|_]) :-
530 redis(Redis, xgroup(delconsumer, Stream, Group, Consumer), _).
531
537
538xconsumer_stop(Leave) :-
539 throw(redis(stop(Leave))).
540
541
542 545
563
564
565 568
569:- multifile prolog:message//1. 570
571prolog:message(redis(Message)) -->
572 [ 'REDIS: '-[] ],
573 redis_message(Message).
574
575redis_message(delivery_failed(Id,Group,Delivered)) -->
576 [ 'Failed to deliver ~p to group ~p (tried ~D times)'-
577 [Id, Group, Delivered]
578 ]