/*  Part of SWI-Prolog

    Author:        Jan Wielemaker
    E-mail:        jan@swi-prolog.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2020, SWI-Prolog Solutions b.v.
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(redis_streams,
          [ xstream_set/3,              % +Redis, +Key, +Option
            xadd/4,                     % +Redis, +Key, ?Id, +Data:dict
            xlisten/3,                  % +Redis, +Streams, +Options
            xlisten_group/5,            % +Redis, +Group, +Consumer,
                                        % +Streams, +Options
            xconsumer_stop/1            % +Leave
          ]).
:- use_module(library(redis)).
:- use_module(library(error)).
:- use_module(library(apply)).
:- use_module(library(broadcast)).
:- use_module(library(lists)).
:- use_module(library(option)).
:- use_module(library(debug)).

:- meta_predicate
    xlisten(+, +, 5, 5, +).

:- multifile
    xhook/2.                            % +Stream, +Event

:- predicate_options(xlisten/3, 3,
                     [ count(nonneg),
                       start(one_of([$,0])),
                       starts(list)
                     ]).
:- predicate_options(xlisten_group/5, 5,
                     [ block(number),
                       max_deliveries(nonneg),
                       max_claim(nonneg)
                     ]).
:- dynamic
    xstream_option/3.
maxlen(Count): make xadd/4 add a MAXLEN ~ Count option to the XADD command, capping the length of the stream. See also Redis as a message brokering system.

xstream_set(Redis, KeyS, Option) :-
    must_be(atom, Redis),
    atom_string(Key, KeyS),
    valid_option(Option),
    functor(Option, Name, Arity),
    functor(Gen, Name, Arity),
    retractall(xstream_option(Redis, Key, Gen)),
    asserta(xstream_option(Redis, Key, Option)).

valid_option(Option) :-
    stream_option(Option),
    !.
valid_option(Option) :-
    domain_error(redis_stream_option, Option).

stream_option(maxlen(X)) :- must_be(integer, X).
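The replace-on-set behaviour of xstream_set/3 can be tried without a Redis server. The following is a minimal sketch that mirrors the functor/3, retractall/1 and asserta/1 steps above. The names demo_option/3, set_demo_option/3 and demo/1 are hypothetical and exist only for this illustration.

```prolog
:- dynamic demo_option/3.          % hypothetical stand-in for xstream_option/3

%   set_demo_option(+Redis, +Key, +Option)
%   Store Option, replacing any earlier option with the same name and arity.
set_demo_option(Redis, Key, Option) :-
    functor(Option, Name, Arity),
    functor(Gen, Name, Arity),     % most general term with that name/arity
    retractall(demo_option(Redis, Key, Gen)),
    asserta(demo_option(Redis, Key, Option)).

%   demo(-Options): set maxlen twice and collect what is stored.
demo(Options) :-
    set_demo_option(default, events, maxlen(1000)),
    set_demo_option(default, events, maxlen(5000)),
    findall(O, demo_option(default, events, O), Options).
```

Calling demo(Options) leaves a single stored option, maxlen(5000), because the second call retracts the first before asserting.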
maxlen(Count). If Id is unbound, generating the id is left to the server. The server returns the id as a string consisting of the time stamp in milliseconds and a sequence number, and Id is unified with the pair Time-Seq of numbers parsed from that string. See Redis docs for details.

xadd(DB, Key, Id, Dict) :-
    redis_array_dict(Array, _, Dict),
    (   var(Id)
    ->  IdIn = '*'
    ;   IdIn = Id
    ),
    (   xstream_option(DB, Key, maxlen(MaxLen))
    ->  Command =.. [xadd, Key, maxlen, ~, MaxLen, IdIn|Array]
    ;   Command =.. [xadd, Key, IdIn|Array]
    ),
    redis(DB, Command, Reply),
    return_id(Id, Reply).

return_id(Id, Reply) :-
    var(Id),
    !,
    Id = Time-Seq,
    split_string(Reply, "-", "", [TimeS,SeqS]),
    number_string(Time, TimeS),
    number_string(Seq, SeqS).
return_id(_, _).


                 /*******************************
                 *           SUBSCRIBE          *
                 *******************************/
Listen using XREAD on one or more Streams on the server Redis. For each message that arrives, call broadcast/1, where Data is a dict representing the message.

    broadcast(redis(Redis, Stream, Id, Data))

Options:

  - start(Start)
    Either 0 to get all messages from the epoch or $ to get messages starting with the last. Default is $.
Note that this predicate does not terminate. It is normally executed in a thread. The following call listens to the streams key1 and key2 on the default Redis server. Using reconnect(true), the client will try to re-establish a connection if the connection is lost.
?- redis_connect(default, C, [reconnect(true)]), thread_create(xlisten(C, [key1, key2], [start($)]), _, [detached(true)]).
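On the receiving side, the broadcast term described above can be picked up with listen/2 from library(broadcast). The sketch below registers a listener and then simulates an arriving message by calling broadcast/1 directly, so it runs without a Redis server. The predicates seen/3, on_stream_message/3, register/0 and simulate/0, as well as the message id and dict contents, are made up for the illustration.

```prolog
:- use_module(library(broadcast)).
:- dynamic seen/3.                 % hypothetical store for received messages

%   Hypothetical handler: remember every message that arrives.
on_stream_message(Stream, Id, Data) :-
    assertz(seen(Stream, Id, Data)).

%   Subscribe to the broadcast term that xlisten/3 sends.
register :-
    listen(redis(_Redis, Stream, Id, Data),
           on_stream_message(Stream, Id, Data)).

%   Stand-in for a real message; id and payload are invented.
simulate :-
    broadcast(redis(default, key1, "1589191733803-0",
                    _{temperature:"21.5"})).
```

After running register and simulate, seen/3 holds one fact for the stream key1.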

xlisten(Redis, Streams, Options) :-
    xlisten(Redis, Streams, xbroadcast, xidle, Options).

xbroadcast(Redis, Stream, Id, Dict, _Options) :-
    redis_id(Redis, RedisId),
    catch(broadcast(redis(RedisId, Stream, Id, Dict)), Error,
          print_message(error, Error)).

redis_id(redis(Id, _, _), Id) :-
    !.
redis_id(Id, Id).

xidle(_Redis, _Streams, _Starts, _NewStarts, _Options).
OnBroadcast is called for each message. OnIdle is called each time the underlying XREAD or XREADGROUP has returned and the messages are processed. These callbacks are called as follows:

    call(OnBroadcast, +Redis, +Stream, +MessageId, +Dict, +Options)
    call(OnIdle, +Redis, +Streams, +Starts, +NewStarts, +Options)

Both callbacks must succeed and not leave any open choice points. Failure or an exception causes xlisten/5 to stop.
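As an illustration of the callback shapes, the sketch below defines one callback of each kind and invokes them the way the listen loop in this file does, with five extra arguments including a trailing Options list. The predicate names print_message_cb/5, report_idle_cb/5 and demo/0 are hypothetical, and the argument values are invented.

```prolog
%   Hypothetical OnBroadcast callback: print the message.
print_message_cb(_Redis, Stream, Id, Dict, _Options) :-
    format("~w ~w: ~p~n", [Stream, Id, Dict]).

%   Hypothetical OnIdle callback: report whether any start id changed.
report_idle_cb(_Redis, _Streams, Starts, NewStarts, _Options) :-
    (   Starts == NewStarts
    ->  format("idle: no new messages~n")
    ;   format("idle: messages were processed~n")
    ).

%   Call both callbacks with made-up arguments, without a Redis server.
demo :-
    call(print_message_cb, default, key1, "1-0", _{a:"1"}, []),
    call(report_idle_cb, default, [key1], ['$'], ['$'], []).
```

Both clauses are deterministic and succeed, which is what the listen loop requires to keep running.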

xlisten(Redis, Streams, OnBroadcast, OnIdle, Options) :-
    atom(Redis),
    !,
    setup_call_cleanup(
        redis_connect(Redis, Conn, [reconnect(true)|Options]),
        xlisten_(Conn, Streams, OnBroadcast, OnIdle, Options),
        redis_disconnect(Conn)).
xlisten(Redis, Streams, OnBroadcast, OnIdle, Options) :-
    xlisten_(Redis, Streams, OnBroadcast, OnIdle, Options).

xlisten_(Redis, Streams, OnBroadcast, OnIdle, Options) :-
    xread_command(Streams, Starts0, CommandTempl, Options),
    listen_loop(Redis, Starts0, CommandTempl,
                OnBroadcast, OnIdle, Streams, Options).

xread_command(Streams, Starts0, Starts-Command, Options) :-
    option(group(Group-Consumer), Options),
    !,
    xread_command_args(Streams, Starts0, Starts, CmdArgs, Options),
    Command =.. [xreadgroup, group, Group, Consumer | CmdArgs].
xread_command(Streams, Starts0, Starts-Command, Options) :-
    xread_command_args(Streams, Starts0, Starts, CmdArgs, Options),
    Command =.. [xread|CmdArgs].

xread_command_args(Streams, Starts0, Starts, CmdArgs, Options) :-
    option(count(Count), Options),
    !,
    option(block(Block), Options, 0),
    BlockMS is integer(Block*1000),
    start_templ(Streams, Starts0, Starts, StreamArgs, Options),
    CmdArgs = [count, Count, block, BlockMS, streams | StreamArgs].
xread_command_args(Streams, Starts0, Starts, CmdArgs, Options) :-
    option(block(Block), Options, 0),
    BlockMS is integer(Block*1000),
    start_templ(Streams, Starts0, Starts, StreamArgs, Options),
    CmdArgs = [block, BlockMS, streams | StreamArgs].

start_templ(Streams, Starts0, Starts, StreamArgs, Options) :-
    option(starts(Starts0), Options),
    !,
    length(Streams, Len),
    length(Starts, Len),
    length(Starts0, LenS),
    (   LenS =:= Len
    ->  true
    ;   domain_error(xread_starts, Starts0)
    ),
    append(Streams, Starts, StreamArgs).
start_templ(Streams, Starts0, Starts, StreamArgs, Options) :-
    option(start(Start), Options, $),
    !,
    length(Streams, Len),
    length(Starts, Len),
    length(Starts0, Len),
    maplist(=(Start), Starts0),
    append(Streams, Starts, StreamArgs).

listen_loop(Redis, Starts, CommandTempl, OnBroadcast, OnIdle, Streams, Options) :-
    copy_term(CommandTempl, Starts-Command),
    (   redis(Redis, Command, Reply),
        Reply \== nil
    ->  dispatch_streams(Reply, Redis, Streams, Starts, NewStarts,
                         OnBroadcast, OnIdle, Options)
    ;   NewStarts = Starts
    ),
    call(OnIdle, Redis, Streams, Starts, NewStarts, Options),
    listen_loop(Redis, NewStarts, CommandTempl,
                OnBroadcast, OnIdle, Streams, Options).

dispatch_streams([], _, _, Starts, NewStarts, _, _, _) :-
    maplist(copy_start, Starts, NewStarts).
dispatch_streams([Tuple|T], Redis, Streams, Starts, NewStarts,
                 OnBroadcast, OnIdle, Options) :-
    stream_tuple(Tuple, StreamS, []),
    atom_string(Stream, StreamS),
    !,                                  % xreadgroup: no more old pending stuff
    set_start(Stream, _Start, >, Streams, Starts, NewStarts),
    dispatch_streams(T, Redis, Streams, Starts, NewStarts,
                     OnBroadcast, OnIdle, Options).
dispatch_streams([Tuple|T], Redis, Streams, Starts, NewStarts,
                 OnBroadcast, OnIdle, Options) :-
    stream_tuple(Tuple, StreamS, Messages),
    atom_string(Stream, StreamS),
    set_start(Stream, Start, NewStart, Streams, Starts, NewStarts),
    dispatch_messages(Messages, Stream, Redis, Start, NewStart,
                      OnBroadcast, Options),
    dispatch_streams(T, Redis, Streams, Starts, NewStarts,
                     OnBroadcast, OnIdle, Options).

stream_tuple(Stream-Messages, Stream, Messages) :- !.
stream_tuple([Stream,Messages], Stream, Messages).

set_start(Stream, Old, New, [Stream|_], [Old|_], [New|_]) :-
    !.
set_start(Stream, Old, New, [_|Streams], [_|OldStarts], [_|NewStarts]) :-
    set_start(Stream, Old, New, Streams, OldStarts, NewStarts).

copy_start(Old, New) :-
    (   var(New)
    ->  Old = New
    ;   true
    ).

dispatch_messages([], _, _, Start, Start, _, _).
dispatch_messages([[Start,Data]|T], Stream, Redis, Start0, NewStart,
                  OnBroadcast, Options) :-
    dispatch_message(Data, Stream, Redis, Start, OnBroadcast, Options),
    join_starts(Start0, Start, Start1),
    dispatch_messages(T, Stream, Redis, Start1, NewStart, OnBroadcast, Options).

dispatch_message(Data, Stream, Redis, Id, OnBroadcast, Options) :-
    (   Data == nil                     % when does this happen?
    ->  Dict = redis{}
    ;   redis_array_dict(Data, redis, Dict)
    ),
    call(OnBroadcast, Redis, Stream, Id, Dict, Options).

join_starts(>, _Start, >) :-
    !.
join_starts(_Start0, Start, Start).

                 /*******************************
                 *           CONSUMERS          *
                 *******************************/
Listen as Consumer of Group on Streams. Messages are delivered using broadcast_request/1. If the request succeeds, XACK is sent to the server.

Options processed:

  - block(Seconds)
    Causes XREADGROUP to return with timeout when no messages arrive within Seconds. On a timeout, xidle_group/5 is called which will try to handle messages to other consumers pending longer than Seconds. Choosing the time depends on the application. Notably, if Seconds is shorter than the time needed to process a message, the job migrates between consumers until max_deliveries(Count) is exceeded. Note that the original receiver does not notice that the job is claimed and thus multiple consumers may ultimately answer the message.
  - max_deliveries(Count)
    Deliver (using XCLAIM) a message at most Count times. Exceeding this calls xhook/2. Default Count is 3.
  - max_claim(Count)
    Claim at most Count messages during a single idle action. Default is 10.

xlisten_group(Redis, Group, Consumer, Streams, Options) :-
    catch(xlisten(Redis, Streams, xbroadcast_group, xidle_group,
                  [ group(Group-Consumer),
                    start(0)
                  | Options
                  ]),
          redis(stop(Leave)),
          true),
    (   Leave == true
    ->  xleave_group(Redis, Group, Consumer, Streams)
    ;   true
    ).

xbroadcast_group(Connection, Stream, Id, Dict, Options) :-
    option(group(Group-Consumer), Options),
    redis_id(Connection, RedisId),
    (   catch(broadcast_request(redis_consume(Stream, Dict,
                                              _{redis:RedisId,
                                                message:Id,
                                                group:Group,
                                                consumer:Consumer})),
              Error, xbroadcast_error(Error, Connection, Stream, Group, Id))
    ->  redis(Connection, xack(Stream, Group, Id))
    ;   true
    ).

xbroadcast_error(redis(stop(Unregister)), Connection, Stream, Group, Id) :-
    !,
    redis(Connection, xack(Stream, Group, Id), _),
    throw(redis(stop(Unregister))).
xbroadcast_error(Error, _Connection, _Stream, _Group, _Id) :-
    print_message(error, Error),
    fail.
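A consumer handles the redis_consume/3 request sent by xbroadcast_group/5 by registering a listener with library(broadcast). The following is a rough sketch, not the library's own example: handle_job/3, register/0 and simulate/1 are hypothetical names, the job key is an assumed message field, and the context dict values are invented. The request is simulated locally, so no Redis server is needed.

```prolog
:- use_module(library(broadcast)).

%   Hypothetical handler: succeeds only for messages that carry a job key.
%   Success is what makes xbroadcast_group/5 acknowledge the message.
handle_job(Stream, Data, Context) :-
    get_dict(job, Data, Job),
    get_dict(consumer, Context, Consumer),
    format("~w: job ~p handled by ~w~n", [Stream, Job, Consumer]).

%   Subscribe to the request term used by xbroadcast_group/5.
register :-
    listen(redis_consume(Stream, Data, Context),
           handle_job(Stream, Data, Context)).

%   Stand-in for a delivered message with an invented context dict.
simulate(Data) :-
    broadcast_request(redis_consume(jobs, Data,
                                    _{redis:default, message:"1-0",
                                      group:workers, consumer:alice})).
```

With the listener registered, simulate(_{job:"resize"}) succeeds, while a dict without a job key makes the request fail. In xbroadcast_group/5 that second case leaves the message unacknowledged.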
Called each time XREADGROUP returns and the returned messages (if any) have been processed. If Starts == NewStarts no messages have been processed, indicating a timeout.

This implementation looks for idle messages on other consumers and will try to claim them.

xidle_group(Redis, Streams, Starts, Starts, Options) :- % Idle time
    !,
    option(group(Group-_Consumer), Options),
    claim(Streams, Redis, Group, 0, _Claimed, Options).
xidle_group(_Redis, _Streams, _Starts, _NewStarts, _Options).

claim([], _, _, Claimed, Claimed, _).
claim([Stream|ST], Redis, Group, Claimed0, Claimed, Options) :-
    claim_for_stream(Redis, Stream, Group, Claimed0, Claimed1, Options),
    claim(ST, Redis, Group, Claimed1, Claimed, Options).

claim_for_stream(Redis, Stream, Group, Claimed0, Claimed, Options) :-
    check_limit_claimed(Claimed0, Options),
    redis(Redis, xpending(Stream, Group), [Count,_Oldest,_Newest, Cons]),
    Count > 0,
    !,
    debug(redis(claim), '~D pending messages on ~p for ~p (Consumers = ~p)',
          [Count, Stream, Group, Cons]),
    claim_for_stream_for_consumers(Cons, Redis, Stream, Group,
                                   Claimed0, Claimed, Options).
claim_for_stream(_Redis, _Stream, _Group, Claimed, Claimed, _Options).

claim_for_stream_for_consumers([], _Redis, _Stream, _Group,
                               Claimed, Claimed, _Options).
claim_for_stream_for_consumers([C|T], Redis, Stream, Group,
                               Claimed0, Claimed, Options) :-
    claim_for_stream_for_consumer(Redis, Stream, Group, C,
                                  Claimed0, Claimed1, Options),
    claim_for_stream_for_consumers(T, Redis, Stream, Group,
                                   Claimed1, Claimed, Options).

claim_for_stream_for_consumer(Redis, Stream, Group, [Consumer,_Pending],
                              Claimed0, Claimed, Options) :-
    redis(Redis, xpending(Stream, Group, -, +, 10, Consumer), Reply),
    claim_messages(Reply, Redis, Stream, Group, Claimed0, Claimed, Options).

claim_messages([], _Redis, _Stream, _Group, Claimed, Claimed, _Options).
claim_messages([H|T], Redis, Stream, Group, Claimed0, Claimed, Options) :-
    debug(redis(claim), 'Found pending ~p', [H]),
    claim_message(H, Redis, Stream, Group, Claimed0, Claimed1, Options),
    claim_messages(T, Redis, Stream, Group, Claimed1, Claimed, Options).

claim_message([Id,Consumer,Idle,Delivered], Redis, Stream, Group,
              Claimed0, Claimed, Options) :-
    option(block(Block), Options),
    BlockMS is integer(Block*1000),
    Idle > BlockMS,
    check_limit_deliveries(Redis, Stream, Delivered, Id, Options),
    check_limit_claimed(Claimed0, Options),
    option(group(Group-Me), Options),
    debug(redis(claim), '~p: trying to claim ~p from ~p (idle ~D)',
          [Me, Id, Consumer, Idle]),
    redis(Redis, xclaim(Stream, Group, Me, BlockMS, Id), ClaimedMsgs),
    !,
    Claimed is Claimed0 + 1,
    debug(redis(claimed), '~p: claimed ~p', [Me, ClaimedMsgs]),
    dispatch_claimed(ClaimedMsgs, Redis, Stream, Options).
claim_message(_Msg, _Redis, _Stream, _Group, Claimed, Claimed, _Options).

dispatch_claimed([], _Redis, _Stream, _Options).
dispatch_claimed([[MsgId,MsgArray]|Msgs], Redis, Stream, Options) :-
    redis_array_dict(MsgArray, _, Dict),
    xbroadcast_group(Redis, Stream, MsgId, Dict, Options),
    dispatch_claimed(Msgs, Redis, Stream, Options).

check_limit_deliveries(_Redis, _Stream, Delivered, _Id, Options) :-
    option(max_deliveries(Max), Options, 3),
    Delivered =< Max,
    !.
check_limit_deliveries(Redis, Stream, Delivered, Id, Options) :-
    option(group(Group-_Me), Options),
    (   xhook(Stream, delivery_failed(Id,Group,Delivered))
    ->  true
    ;   print_message(warning, redis(delivery_failed(Id,Group,Delivered))),
        redis(Redis, xack(Stream, Group, Id))
    ),
    fail.

check_limit_claimed(Claimed0, Options) :-
    option(max_claim(Max), Options, 10),
    Claimed0 < Max.
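check_limit_deliveries/5 calls the multifile hook xhook/2 once a message exceeds max_deliveries. The sketch below shows one way an application might define that hook. It only records the failure in a local dynamic predicate, dead_letter/4, which is a hypothetical name; moving the message to another stream is left out because that needs a live Redis connection.

```prolog
:- multifile redis_streams:xhook/2.
:- dynamic dead_letter/4.          % hypothetical local record of failed messages

%   Record messages that were delivered too often.
%   Note: when this hook succeeds, check_limit_deliveries/5 does not send
%   XACK itself, so a real hook would also need to acknowledge or re-route
%   the message.
redis_streams:xhook(Stream, delivery_failed(Id, Group, Delivered)) :-
    assertz(dead_letter(Stream, Id, Group, Delivered)).
```

Leaving the hook undefined, or letting it fail, falls back to the default shown above: a warning is printed and the message is acknowledged.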

xleave_group(Redis, Group, Consumer, [Stream|_]) :-
    redis(Redis, xgroup(delconsumer, Stream, Group, Consumer), _).
Throws the exception redis(stop(Leave)), which is caught by xlisten_group/5.

xconsumer_stop(Leave) :-
    throw(redis(stop(Leave))).


                 /*******************************
                 *             HOOKS            *
                 *******************************/
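If xhook/2 fails for the event delivery_failed(Id,Group,Delivered), a warning is printed and the message is acknowledged using XACK. From the introduction to streams: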
"So once the deliveries counter reaches a given large number that you chose, it is probably wiser to put such messages in another stream and send a notification to the system administrator. This is basically the way that Redis streams implement the concept of the dead letter."

                 /*******************************
                 *           MESSAGES           *
                 *******************************/

:- multifile prolog:message//1.

prolog:message(redis(Message)) -->
    [ 'REDIS: '-[] ],
    redis_message(Message).

redis_message(delivery_failed(Id,Group,Delivered)) -->
    [ 'Failed to deliver ~p to group ~p (tried ~D times)'-
      [Id, Group, Delivered]
    ].
Using Redis streams
A Redis stream is a set of messages consisting of key-value pairs that are identified by a time and sequence number. Streams are powerful objects that can roughly be used for three purposes:
This library abstracts the latter two scenarios. The main predicates are