41
42:- module(redis,
43 [ redis_server/3, 44 redis_connect/1, 45 redis_connect/3, 46 redis_disconnect/1, 47 redis_disconnect/2, 48 49 redis/1, 50 redis/2, 51 redis/3, 52 53 redis_get_list/3, 54 redis_get_list/4, 55 redis_set_list/3, 56 redis_get_hash/3, 57 redis_set_hash/3, 58 redis_scan/3, 59 redis_sscan/4, 60 redis_hscan/4, 61 redis_zscan/4, 62 63 redis_subscribe/4, 64 redis_subscribe/2, 65 redis_unsubscribe/2, 66 redis_current_subscription/2, 67 redis_write/2, 68 redis_read/2, 69 70 redis_array_dict/3, 71 72 redis_property/2, 73 redis_current_command/2, 74 redis_current_command/3 75 ]). 76:- autoload(library(socket), [tcp_connect/3]). 77:- autoload(library(apply), [maplist/2, convlist/3, maplist/3, maplist/5]). 78:- autoload(library(broadcast), [broadcast/1]). 79:- autoload(library(error),
80 [ must_be/2,
81 instantiation_error/1,
82 uninstantiation_error/1,
83 existence_error/2
84 ]). 85:- autoload(library(lazy_lists), [lazy_list/2]). 86:- autoload(library(lists), [append/3, member/2]). 87:- autoload(library(option), [merge_options/3, option/2, option/3]). 88:- autoload(library(pairs), [group_pairs_by_key/2]). 89:- use_module(library(debug), [debug/3, assertion/1]). 90:- use_module(library(settings), [setting/4, setting/2]). 91
92:- use_foreign_library(foreign(redis4pl)). 93
% Settings that bound the automatic reconnect logic: how often a
% reconnect is attempted and the longest pause between two attempts.
:- setting(max_retry_count, nonneg, 8640,
           "Max number of retries").
:- setting(max_retry_wait, number, 10,
           "Max time to wait between recovery attempts").

% Option declarations for the exported predicates that take an option
% list.  The scan variants all forward to the redis_scan/3 declaration.
:- predicate_options(redis_server/3, 3,
                     [ pass_to(redis:redis_connect/3, 3) ]).
:- predicate_options(redis_connect/3, 3,
                     [ reconnect(boolean),
                       user(atom),
                       password(atomic),
                       version(between(2,3))
                     ]).
:- predicate_options(redis_disconnect/2, 2,
                     [ force(boolean) ]).
:- predicate_options(redis_scan/3, 3,
                     [ match(atomic),
                       count(nonneg),
                       type(atom)
                     ]).
:- predicate_options(redis_sscan/4, 4,
                     [ pass_to(redis:redis_scan/3, 3) ]).
:- predicate_options(redis_hscan/4, 4,
                     [ pass_to(redis:redis_scan/3, 3) ]).
:- predicate_options(redis_zscan/4, 4,
                     [ pass_to(redis:redis_scan/3, 3) ]).
121
139
140:- dynamic server/3. 141
142:- dynamic ( connection/2 143 ) as volatile. 144
156
%!  redis_server(+Alias, +Address, +Options) is det.
%
%   Register Alias as the name of the Redis server at Address, to be
%   connected using Options.  An earlier registration under the same
%   alias is removed first.  must_be/2 raises an error if Alias is not
%   ground.

redis_server(Alias, Address, Options) :-
    must_be(ground, Alias),
    Registration = server(Alias, Address, Options),
    retractall(server(Alias, _, _)),
    asserta(Registration).

% Initial registration: the alias `default` is localhost:6379 without
% any options.
server(default, localhost:6379, []).
163
199
%!  redis_connect(-Connection) is det.
%
%   Connect to the server registered under the alias `default`, using
%   no additional options.

redis_connect(Connection) :-
    Server = default,
    redis_connect(Server, Connection, []).
202
%!  redis_connect(+Server, -Connection, +Options) is det.
%!  redis_connect(-Connection, +Host, +Port) is det.
%
%   Create a connection term.  Three calling conventions are accepted:
%
%     - `redis_connect(-Conn, +Host, +Port)`: first argument unbound
%       and the other two ground; connects to Host:Port without options.
%     - Server is an atom: it must be a registered server alias.  The
%       options of the registration act as defaults for Options.
%     - Otherwise Server is used as the address itself.
%
%   The address is added to the option list as address(Address) so the
%   connection can be re-established later.
%
%   @error existence_error(redis_server, Server) if Server is an atom
%          that is not registered.

redis_connect(Conn, Host, Port) :-
    var(Conn),
    ground(Host),
    ground(Port),
    !,
    Address = Host:Port,
    redis_connect(Address, Conn, []).
redis_connect(Server, Conn, Options) :-
    atom(Server),
    !,
    (   server(Server, Address, Defaults)
    ->  true
    ;   existence_error(redis_server, Server)
    ),
    merge_options(Options, Defaults, Merged),
    do_connect(Server, Address, Conn, [address(Address)|Merged]).
redis_connect(Address, Conn, Options) :-
    WithAddress = [address(Address)|Options],
    do_connect(Address, Address, Conn, WithAddress).
218
224
%!  do_connect(+Id, +Address, -Conn, +Options) is det.
%
%   Open the TCP connection to Address and wrap it in a
%   `redis_connection(Id, Stream, Failures, Options)` term with the
%   failure counter set to 0, then run the hello/2 handshake.
%
%   If the handshake raises an exception or fails, the connection is
%   force-closed before the exception is re-thrown (or the failure
%   propagated).  Without this the socket would stay open: the
%   connection term never reaches the caller, so nobody could close it.
%   redis_disconnect/2 with force(true) is used rather than closing
%   the stream directly because the handshake itself may already have
%   closed or replaced the stream stored in the term.

do_connect(Id, Address0, Conn, Options) :-
    tcp_address(Address0, Address),
    tcp_connect(Address, Stream, Options),
    Conn = redis_connection(Id, Stream, 0, Options),
    (   catch(hello(Conn, Options), Error, true)
    ->  (   var(Error)
        ->  true
        ;   redis_disconnect(Conn, [force(true)]),
            throw(Error)
        )
    ;   redis_disconnect(Conn, [force(true)]),
        fail
    ).
230
%!  tcp_address(+Address0, -Address) is det.
%
%   Translate the address notation used by this library into what is
%   handed to tcp_connect/3: unix(Path) becomes the bare Path, anything
%   else is passed unchanged.

tcp_address(Address0, Address) :-
    (   Address0 = unix(Path),
        Address = Path
    ->  true
    ;   Address = Address0
    ).
234
235
240
%!  hello(+Connection, +Options) is semidet.
%
%   Perform the initial handshake on a fresh connection.
%
%     - With version(V) and V >= 3, send `HELLO 3`, including the
%       `auth` user and password when both user(User) and
%       password(Password) are given.
%     - Otherwise, if password(Password) is given, send `AUTH`.
%     - Otherwise nothing is sent.

hello(Con, Options) :-
    (   option(version(V), Options),
        V >= 3
    ->  (   option(user(User), Options),
            option(password(Password3), Options)
        ->  Request = hello(3, auth, User, Password3)
        ;   Request = hello(3)
        ),
        redis(Con, Request)
    ;   option(password(Password2), Options)
    ->  redis(Con, auth(Password2))
    ;   true
    ).
255
262
%!  redis_stream(+Redis, -Stream, +AutoConnect) is det.
%
%   Get the I/O stream for Redis, which is either a server alias
%   (atom) or a `redis_connection/4` term.
%
%     - For an alias, an existing entry in connection/2 is reused.  If
%       there is none and AutoConnect is `true`, a connection to the
%       registered server is made and remembered in connection/2.
%     - For a connection term holding a stream, that stream is
%       returned.  If the stream slot holds `-`, the connection is
%       re-established from the stored address(Address) option and the
%       new stream is stored destructively in the term.
%
%   @error instantiation_error if Redis is unbound.
%   @error uninstantiation_error(Stream) if Stream is bound.
%   @error existence_error(redis_server, Alias) if no stream can be
%          obtained for an alias.

redis_stream(Redis, _, _) :-
    var(Redis),
    !,
    instantiation_error(Redis).
redis_stream(_, Stream, _) :-
    nonvar(Stream),
    !,
    uninstantiation_error(Stream).
redis_stream(Alias, Stream, AutoConnect) :-
    atom(Alias),
    !,
    (   connection(Alias, Existing)
    ->  Stream = Existing
    ;   AutoConnect == true,
        server(Alias, Address, ServerOptions)
    ->  redis_connect(Address, Connection, ServerOptions),
        redis_stream(Connection, Stream, false),
        asserta(connection(Alias, Stream))
    ;   existence_error(redis_server, Alias)
    ).
redis_stream(Redis, Stream, _) :-
    Redis = redis_connection(Id, Current, _, Options),
    (   Current \== (-)
    ->  Stream = Current
    ;   % Stream slot was cleared by a forced disconnect: reconnect and
        % patch the new stream into the caller's connection term.
        option(address(Address), Options),
        do_connect(Id, Address, Fresh, Options),
        arg(2, Fresh, NewStream),
        nb_setarg(2, Redis, NewStream),
        Stream = NewStream
    ).
292
%!  has_redis_stream(+Redis, -Stream) is semidet.
%
%   True when Redis currently has an open stream: an entry in
%   connection/2 for an alias, or a stream slot different from `-` for
%   a connection term.  Never connects.
%
%   @error instantiation_error if Redis is unbound.

has_redis_stream(Redis, Stream) :-
    (   var(Redis)
    ->  instantiation_error(Redis)
    ;   atom(Redis)
    ->  connection(Redis, Stream)
    ;   Redis = redis_connection(_, Stream, _, _),
        Stream \== (-)
    ).
303
304
317
%!  redis_disconnect(+Connection) is det.
%
%   Same as redis_disconnect/2 with an empty option list.

redis_disconnect(Connection) :-
    NoOptions = [],
    redis_disconnect(Connection, NoOptions).
320
%!  redis_disconnect(+Connection, +Options) is det.
%
%   Close the stream of Connection.  With the option force(true):
%
%     - a connection term has its stream closed with force(true) and
%       its stream slot set to `-`; nothing happens if the slot is
%       already `-`;
%     - an alias with an open stream has that stream closed with
%       force(true) and its connection/2 entry removed;
%     - anything else succeeds silently.
%
%   Without force(true) the stream is obtained through redis_stream/3
%   (without auto-connecting an alias), closed normally and removed
%   from connection/2.

redis_disconnect(Redis, Options) :-
    option(force(true), Options),
    !,
    (   Redis = redis_connection(_Id, Stream, _, _Opts)
    ->  (   Stream \== (-)
        ->  close(Stream, [force(true)]),
            nb_setarg(2, Redis, -)
        ;   true
        )
    ;   has_redis_stream(Redis, Stream)
    ->  close(Stream, [force(true)]),
        retractall(connection(_, Stream))
    ;   true
    ).
redis_disconnect(Redis, _Options) :-
    redis_stream(Redis, Stream, false),
    close(Stream),
    retractall(connection(_, Stream)).
339
381
%!  redis(+Connection, +Request) is semidet.
%
%   Run Request on Connection.  A proper list is executed as a
%   pipeline; any other request is run through redis/3 with the reply
%   discarded.

redis(Redis, Request) :-
    (   is_list(Request)
    ->  redis_pipeline(Redis, Request)
    ;   redis(Redis, Request, _)
    ).
388
506
%!  redis(+Connection, +Command, -Reply) is semidet.
%
%   Run Command and unify the answer with Reply.  Reply may have the
%   form `Value as Type`.  The call fails when the resulting value is
%   the atom `nil`.

redis(Redis, Req, Out) :-
    out_val(Out, Value),              % Value is the part that gets bound
    redis1(Redis, Req, Out),
    \+ Value == nil.
511
%!  out_val(+Out, -Val) is det.
%
%   Val is the variable or value part of the reply specification Out:
%   the left-hand side of `Val as Type` if Out has that form, else Out
%   itself.

out_val(Out, Val) :-
    nonvar(Out),
    Out = (Val as _),
    !.
out_val(Out, Out).
518
%!  redis1(+Redis, +Req, -Out)
%
%   Run redis2/3 and hand any `error(_, _)` exception to recover/3,
%   which either retries this same goal after reconnecting or
%   re-throws.

redis1(Redis, Req, Out) :-
    catch(redis2(Redis, Req, Out), error(Formal, Context), true),
    (   nonvar(Formal)
    ->  recover(error(Formal, Context), Redis, redis1(Redis, Req, Out))
    ;   true
    ).
526
%!  redis2(+Redis, +Req, -Out)
%
%   Write Req and read its reply.  When Redis is an alias (atom), the
%   write/read pair runs inside with_mutex/2 on a mutex named after
%   the alias; a connection term is used without locking.

redis2(Redis, Req, Out) :-
    redis_stream(Redis, Stream, true),
    (   atom(Redis)
    ->  with_mutex(Redis,
                   (   redis_write_msg(Stream, Req),
                       redis_read_stream(Redis, Stream, Out)
                   ))
    ;   redis_write_msg(Stream, Req),
        redis_read_stream(Redis, Stream, Out)
    ).
539
541
%!  redis_pipeline(+Redis, +PipeLine)
%
%   Run a list of commands as one pipeline.  An `error(_, _)`
%   exception is handed to recover/3, which may retry the complete
%   pipeline.

redis_pipeline(Redis, PipeLine) :-
    catch(redis_pipeline2(Redis, PipeLine), error(Formal, Context), true),
    (   nonvar(Formal)
    ->  recover(error(Formal, Context), Redis,
                redis_pipeline(Redis, PipeLine))
    ;   true
    ).

% Get the stream and run the pipeline; for an alias (atom) this is done
% inside with_mutex/2 on a mutex named after the alias.
redis_pipeline2(Redis, PipeLine) :-
    redis_stream(Redis, Stream, true),
    (   atom(Redis)
    ->  with_mutex(Redis,
                   redis_pipeline3(Redis, Stream, PipeLine))
    ;   redis_pipeline3(Redis, Stream, PipeLine)
    ).

% Write all commands unflushed, flush once, then read all replies.
redis_pipeline3(Redis, Stream, PipeLine) :-
    maplist(write_pipeline(Stream), PipeLine),
    flush_output(Stream),
    read_pipeline(Redis, Stream, PipeLine).
564
%!  write_pipeline(+Stream, +Item)
%
%   Write one pipeline element without flushing.  Item is either a
%   plain command or `Command -> Reply`; only the command is written.

write_pipeline(Stream, Item) :-
    (   Item = (Command -> _Reply)
    ->  true
    ;   Command = Item
    ),
    redis_write_msg_no_flush(Stream, Command).
570
%!  read_pipeline(+Redis, +Stream, +PipeLine)
%
%   Read the replies for PipeLine.  If reading raises `error(_, _)`,
%   the connection is cleaned up before the error is re-thrown: a
%   forced disconnect for errors accepted by reconnect_error/1 and a
%   resync/1 for all others.

read_pipeline(Redis, Stream, PipeLine) :-
    Error = error(Formal, _),
    catch(read_pipeline2(Redis, Stream, PipeLine), Error, true),
    (   var(Formal)
    ->  true
    ;   (   reconnect_error(Error)
        ->  redis_disconnect(Redis, [force(true)])
        ;   resync(Redis)
        ),
        throw(Error)
    ).
582
%!  read_pipeline2(+Redis, +Stream, +PipeLine)
%
%   Read one reply per pipeline element first, and only then process
%   the results: dispatch push messages, raise the first error that
%   was reported, and finally bind the reply variables.

read_pipeline2(Redis, Stream, PipeLine) :-
    maplist(redis_read_msg3(Stream), PipeLine, Replies, Errors, Pushed),
    maplist(handle_push(Redis), Pushed),
    maplist(handle_error, Errors),
    maplist(bind_reply, PipeLine, Replies).

% Read the reply for one pipeline element.  For `Command -> Reply` the
% Reply part is passed as reply specification to redis_read_msg/5,
% otherwise the element itself.
redis_read_msg3(Stream, Item, Reply, Error, Push) :-
    (   Item = (_Command -> Spec)
    ->  true
    ;   Spec = Item
    ),
    redis_read_msg(Stream, Spec, Reply, Error, Push).
594
% handle_push(+Redis, +Pushed): argument-swapped wrapper around
% handle_push_messages/2 so it can be used with maplist/2.
handle_push(Redis, Pushed) :-
    handle_push_messages(Pushed, Redis).

% handle_error(+Error): succeed if Error is unbound, else throw it.
handle_error(Error) :-
    var(Error),
    !.
handle_error(Error) :-
    throw(Error).

% bind_reply(+Item, +Reply): for `Command -> Reply0` unify Reply0 with
% the reply that was read; replies to plain commands are dropped.
bind_reply(Item, Reply) :-
    (   Item = (_Command -> Reply0)
    ->  Reply0 = Reply
    ;   true
    ).
606
607
613
:- meta_predicate recover(+, +, 0).

%!  recover(+Error, +Redis, :Goal)
%
%   Error handling hook for a failed request.  If Error is accepted by
%   reconnect_error/1 and Redis allows automatic reconnecting, the
%   connection is force-closed and, after the delay imposed by
%   wait_to_retry/2, Goal is called again.  Error is re-thrown when it
%   is not recoverable or when wait_to_retry/2 fails.
%
%   After Goal succeeded the failure count of Redis is reset.  This
%   covers both representations: the failure/2 fact of a server alias
%   and the counter kept in a connection term.  Resetting only the
%   fact would let the counter of a connection term grow across
%   unrelated outages, keeping the retry delay at its maximum and
%   eventually exceeding the `max_retry_count` setting for good.

recover(Error, Redis, Goal) :-
    reconnect_error(Error),
    auto_reconnect(Redis),
    !,
    debug(redis(recover), '~p: got error ~p; trying to reconnect',
          [Redis, Error]),
    redis_disconnect(Redis, [force(true)]),
    (   wait_to_retry(Redis, Error)
    ->  call(Goal),
        redis_reset_failures(Redis)
    ;   throw(Error)
    ).
recover(Error, _, _) :-
    throw(Error).

%   redis_reset_failures(+Redis): forget the failures recorded for
%   Redis, i.e. zero the counter slot of a connection term or remove
%   the failure/2 fact of an alias.

redis_reset_failures(Connection) :-
    Connection = redis_connection(_, _, _, _),
    !,
    nb_setarg(3, Connection, 0).
redis_reset_failures(Server) :-
    retractall(failure(Server, _)).
630
%!  auto_reconnect(+Redis) is semidet.
%
%   True if Redis should be reconnected automatically.  A connection
%   term must carry the option reconnect(true) explicitly.  A
%   registered server reconnects unless its options contain a
%   different value for reconnect/1 (the default is `true`).

auto_reconnect(Redis) :-
    (   Redis = redis_connection(_, _, _, ConnOptions)
    ->  option(reconnect(true), ConnOptions)
    ;   ground(Redis),
        server(Redis, _, ServerOptions),
        option(reconnect(true), ServerOptions, true)
    ).
638
%!  reconnect_error(+Error) is semidet.
%
%   True if Error is an exception term for which reconnecting is
%   attempted: a socket error or an unexpected end of file while
%   reading.

reconnect_error(error(Formal, _)) :-
    (   Formal = socket_error(_Code, _)
    ;   Formal = syntax_error(unexpected_eof)
    ).
641
648
:- dynamic failure/2 as volatile.

%!  wait_to_retry(+Redis, +Error) is semidet.
%
%   Record another failure for Redis and sleep before the next
%   attempt.  Fails, without sleeping, if the number of failures
%   recorded so far is not below the setting `max_retry_count`.
%
%   The delay is `2^Failures` hundredths of a second, capped at the
%   setting `max_retry_wait` (seconds).  A message
%   `redis(retry(Redis, Failures, Wait, Error))` is printed at the
%   level given by retry_message_level/2.

wait_to_retry(Redis, Error) :-
    redis_failures(Redis, Failures),
    setting(max_retry_count, MaxCount),
    Failures < MaxCount,
    NewFailures is Failures+1,
    redis_set_failures(Redis, NewFailures),
    setting(max_retry_wait, MaxWait),
    % Work in hundredths of a second, then convert to seconds.
    Centis is min(MaxWait*100, 1<<Failures),
    Wait is Centis/100.0,
    debug(redis(recover), ' Sleeping ~p seconds', [Wait]),
    retry_message_level(Failures, Level),
    print_message(Level, redis(retry(Redis, Failures, Wait, Error))),
    sleep(Wait).
663
%!  redis_failures(+Redis, -Failures) is semidet.
%
%   Failures is the number of failures recorded for Redis: the third
%   argument of a connection term, or the failure/2 fact for an alias
%   (0 if there is no such fact).

redis_failures(Redis, Failures) :-
    (   Redis = redis_connection(_, _, Count, _)
    ->  Failures = Count
    ;   atom(Redis),
        (   failure(Redis, Failures)
        ->  true
        ;   Failures = 0
        )
    ).

%!  redis_set_failures(+Redis, +Count) is semidet.
%
%   Store Count as the number of failures of Redis: destructively in
%   the third argument of a compound connection term, or as the single
%   failure/2 fact for an alias.

redis_set_failures(Redis, Count) :-
    (   compound(Redis)
    ->  nb_setarg(3, Redis, Count)
    ;   atom(Redis),
        retractall(failure(Redis, _)),
        asserta(failure(Redis, Count))
    ).
682
%!  retry_message_level(+Failures, -Level) is det.
%
%   Message level for announcing a retry: `warning` for the first
%   failure (count 0) and `silent` for all subsequent ones.

retry_message_level(Failures, Level) :-
    (   Failures = 0,
        Level = warning
    ->  true
    ;   Level = silent
    ).
685
686
692
%!  redis(+Request)
%
%   Run Request on a temporary connection to the `default` server and
%   print the reply using print/1.  The connection is closed again
%   when the request is done.

redis(Request) :-
    setup_call_cleanup(
        redis_connect(default, Conn, []),
        redis1(Conn, Request, Reply),
        redis_disconnect(Conn, [])),
    print(Reply).
699
705
%!  redis_write(+Redis, +Command) is det.
%
%   Send Command on the stream of Redis (connecting if needed) without
%   reading a reply.

redis_write(Redis, Command) :-
    redis_stream(Redis, Out, true),
    redis_write_msg(Out, Command).

%!  redis_read(+Redis, -Reply) is det.
%
%   Read the next reply from the stream of Redis (connecting if
%   needed).

redis_read(Redis, Reply) :-
    redis_stream(Redis, In, true),
    redis_read_stream(Redis, In, Reply).
713
714
715 718
733
%!  redis_get_list(+Redis, +Key, -List) is det.
%
%   As redis_get_list/4 with chunk size -1, i.e. the whole list is
%   fetched eagerly.

redis_get_list(Redis, Key, List) :-
    FetchAll = -1,
    redis_get_list(Redis, Key, FetchAll, List).
736
%!  redis_get_list(+Redis, +Key, +ChunkSize, -List) is det.
%
%   Get the list stored at Key.  If ChunkSize is -1 or not smaller
%   than the length of the list, the list is fetched eagerly (`[]` for
%   an empty list).  Otherwise List is a lazy list that fetches
%   ChunkSize elements at a time.
%
%   ChunkSize must be -1 or a positive integer.  This is verified
%   before contacting the server: a chunk size of 0 or below -1 would
%   otherwise create a lazy list that requests negative end indexes
%   and never advances its offset.
%
%   @error type_error(positive_integer, ChunkSize) if ChunkSize is
%          neither -1 nor a positive integer.

redis_get_list(Redis, Key, Chunk, List) :-
    (   Chunk == -1
    ->  true
    ;   must_be(positive_integer, Chunk)
    ),
    redis(Redis, llen(Key), Len),
    (   (   Chunk == -1
        ;   Chunk >= Len
        )
    ->  (   Len == 0
        ->  List = []
        ;   End is Len-1,
            list_range(Redis, Key, 0, End, List)
        )
    ;   lazy_list(rlist_next(s(Redis,Key,0,Chunk,Len)), List)
    ).
749
%!  rlist_next(+State, -List, -Tail)
%
%   Lazy list iterator for redis_get_list/4.  State is
%   `s(Redis, Key, Offset, Slice, Len)`.  Fetches the next slice; when
%   it reaches the last element the list is closed, otherwise the
%   offset in State is advanced destructively and Tail is left open.

rlist_next(State, List, Tail) :-
    State = s(Redis, Key, Offset, Slice, Len),
    Last is Len-1,
    End is min(Last, Offset+Slice-1),
    list_range(Redis, Key, Offset, End, Elems),
    (   End =:= Last
    ->  List = Elems,
        Tail = []
    ;   NextOffset is Offset+Slice,
        nb_setarg(3, State, NextOffset),
        append(Elems, Tail, List)
    ).
761
763
%!  list_range(+DB, +Key, +Start, +End, -List)
%
%   Fetch the elements Start..End of the list at Key.  A range of a
%   single element is fetched with `LINDEX`, any other range with
%   `LRANGE`.

list_range(DB, Key, Start, End, List) :-
    (   Start = End,
        List = [Elem]
    ->  redis(DB, lindex(Key, Start), Elem)
    ;   redis(DB, lrange(Key, Start, End), List)
    ).
770
771
772
779
%!  redis_set_list(+Redis, +Key, +List) is det.
%
%   Replace the value of Key with List: the key is deleted and, unless
%   List is empty, all elements are appended with a single `RPUSH`.
%   The two steps are separate requests.

redis_set_list(Redis, Key, List) :-
    redis(Redis, del(Key), _),
    (   List \== []
    ->  Push =.. [rpush, Key|List],
        redis(Redis, Push, _Count)
    ;   true
    ).
787
788
798
%!  redis_get_hash(+Redis, +Key, -Data:dict) is det.
%
%   Fetch the complete hash at Key using `HGETALL`, asking for the
%   reply as `dict(auto)`.

redis_get_hash(Redis, Key, Dict) :-
    Reply = (Dict as dict(auto)),
    redis(Redis, hgetall(Key), Reply).
801
%!  redis_set_hash(+Redis, +Key, +Data:dict) is det.
%
%   Replace the hash at Key by the content of the dict Data: the key
%   is deleted and the fields are stored with a single `HSET`.  The
%   two steps are separate requests.
%
%   An empty dict only deletes the key, in line with redis_set_list/3
%   for the empty list.  Sending `HSET key` without any field/value
%   pair would be rejected by the server after the key was deleted.

redis_set_hash(Redis, Key, Dict) :-
    redis_array_dict(Array, _, Dict),
    (   Array == []
    ->  redis(Redis, del(Key), _)
    ;   Term =.. [hset,Key|Array],
        redis(Redis, del(Key), _),
        redis(Redis, Term, _Count)
    ).
807
816
%!  redis_array_dict(?Array, ?Tag, ?Dict) is det.
%
%   Translate between a Redis reply and a dict.  If Array is bound it
%   is either a flat list `[Name1, Value1, ...]` or a list of
%   `Name-Value` pairs and is converted into Dict; otherwise Dict is
%   turned into the flat list form.  Dict keys are atoms, names in the
%   flat list are strings.

redis_array_dict(Array, Tag, Dict) :-
    (   nonvar(Array)
    ->  array_to_pairs(Array, Pairs),
        dict_pairs(Dict, Tag, Pairs)
    ;   dict_pairs(Dict, Tag, Pairs),
        pairs_to_array(Pairs, Array)
    ).

% array_to_pairs(+Array, -Pairs): elements are consumed either as one
% `Name-Value` pair or as two consecutive elements; names become atoms.
array_to_pairs([], []) :-
    !.
array_to_pairs([Text-Value|Rest0], [Key-Value|Rest]) :-
    !,
    atom_string(Key, Text),
    array_to_pairs(Rest0, Rest).
array_to_pairs([Text,Value|Rest0], [Key-Value|Rest]) :-
    atom_string(Key, Text),
    array_to_pairs(Rest0, Rest).

% pairs_to_array(+Pairs, -Array): flatten pairs into name, value, ...
% with the names converted to strings.
pairs_to_array([], []) :-
    !.
pairs_to_array([Key-Value|Rest0], [Text,Value|Rest]) :-
    atom_string(Key, Text),
    pairs_to_array(Rest0, Rest).
841
863
%!  redis_scan(+Redis, -LazyList, +Options) is det.
%!  redis_sscan(+Redis, +Set, -LazyList, +Options) is det.
%!  redis_hscan(+Redis, +Hash, -LazyList, +Options) is det.
%!  redis_zscan(+Redis, +Set, -LazyList, +Options) is det.
%
%   Map the Redis `SCAN`, `SSCAN`, `HSCAN` and `ZSCAN` commands onto a
%   lazy list.  The options match(Pattern), count(Count) and
%   type(Type) are added to the command when present.  The iteration
%   starts at cursor 0.

redis_scan(Redis, LazyList, Options) :-
    scan_lazy_list(scan, Redis, LazyList, Options).

redis_sscan(Redis, Set, LazyList, Options) :-
    scan_lazy_list(sscan(Set), Redis, LazyList, Options).

redis_hscan(Redis, Hash, LazyList, Options) :-
    scan_lazy_list(hscan(Hash), Redis, LazyList, Options).

redis_zscan(Redis, Set, LazyList, Options) :-
    scan_lazy_list(zscan(Set), Redis, LazyList, Options).

% Shared implementation: collect the command parameters from Options
% and create the lazy list driven by scan_next/3.
scan_lazy_list(Command, Redis, LazyList, Options) :-
    scan_options([match,count,type], Options, Parms),
    lazy_list(scan_next(s(Command,Redis,0,Parms)), LazyList).
879
%!  scan_options(+Names, +Options, -Parms) is det.
%
%   Parms is a flat list `[Name, Value, ...]` holding, in the order of
%   Names, every option Name(Value) that appears in Options.  Names
%   without a matching option are skipped.

scan_options([], _, []).
scan_options([Name|Names], Options, Parms) :-
    (   Parms = [Name,Value|Rest],
        Opt =.. [Name,Value],
        option(Opt, Options)
    ->  true
    ;   Parms = Rest
    ),
    scan_options(Names, Options, Rest).
888
889
%!  scan_next(+State, -List, -Tail)
%
%   Lazy list iterator for the scan predicates.  State is
%   `s(Command, Redis, Cursor, Params)`.  The request is Command
%   extended with the cursor and the parameters.  A returned cursor of
%   0 closes the list; otherwise the new cursor is stored
%   destructively in State and Tail is left open.

scan_next(State, List, Tail) :-
    State = s(Command, Redis, Cursor, Params),
    Command =.. Parts0,
    append(Parts0, [Cursor|Params], Parts),
    Request =.. Parts,
    redis(Redis, Request, [NewCursor,Elems0]),
    scan_pairs(Command, Elems0, Elems),
    (   NewCursor \== 0
    ->  nb_setarg(3, State, NewCursor),
        append(Elems, Tail, List)
    ;   List = Elems,
        Tail = []
    ).
903
%!  scan_pairs(+Command, +Elems0, -Elems) is semidet.
%
%   Post-process the elements of one scan reply.  For `hscan(_)` and
%   `zscan(_)` the elements are turned into `Key-Value` pairs; for all
%   other commands they are returned unchanged.

scan_pairs(hscan(_), List, Pairs) :-
    !,
    scan_pairs(List, Pairs).
scan_pairs(zscan(_), List, Pairs) :-
    !,
    scan_pairs(List, Pairs).
scan_pairs(_, List, List).

%!  scan_pairs(+List, -Pairs) is semidet.
%
%   Pairs holds the content of List as `Key-Value` pairs.  List
%   elements are either pairs already or come as two consecutive
%   elements `Key, Value`.  The pair form is tested first, as in
%   array_to_pairs/2: testing the flat form first would combine two
%   adjacent pairs into the single pair `(K1-V1)-(K2-V2)`.

scan_pairs([], []).
scan_pairs([Key-Value|T0], [Key-Value|T]) :-
    !,
    scan_pairs(T0, T).
scan_pairs([Key,Value|T0], [Key-Value|T]) :-
    scan_pairs(T0, T).
918
919
920 923
930
%!  redis_current_command(+Redis, ?Command) is nondet.
%!  redis_current_command(+Redis, ?Command, -Properties) is nondet.
%
%   True when Command is a command known to the server and Properties
%   are the properties reported for it.  A bound Command is looked up
%   with `COMMAND INFO`; otherwise all commands from `COMMAND` are
%   enumerated, with the command name returned as an atom.

redis_current_command(Redis, Command) :-
    redis_current_command(Redis, Command, _Properties).

redis_current_command(Redis, Command, Properties) :-
    (   nonvar(Command)
    ->  redis(Redis, command(info, Command), [[_Name|Properties]])
    ;   redis(Redis, command, Commands),
        member([NameS|Properties], Commands),
        atom_string(Command, NameS)
    ).
942
948
%!  redis_property(+Redis, ?Property) is nondet.
%
%   True when Property is a term Name(Value) derived from the output
%   of the `INFO` command.

redis_property(Redis, Property) :-
    redis(Redis, info, Info),
    info_terms(Info, Properties),
    member(Property, Properties).

% info_terms(+Info, -Terms): split the INFO text into lines and convert
% every line info_line_term/2 accepts.
info_terms(Info, Terms) :-
    split_string(Info, "\n", "\r\n ", Lines),
    convlist(info_line_term, Lines, Terms).

% info_line_term(+Line, -Term): Line is split at its first colon into
% Name and Value.  Lines without a colon and names ending in `_human`
% are rejected.  The value is a number if it parses as one, else a
% string.
info_line_term(Line, Term) :-
    once(sub_string(Line, Before, _, After, :)),
    sub_atom(Line, 0, Before, _, Name),
    \+ sub_atom(Name, _, _, 0, '_human'),
    sub_string(Line, _, After, 0, Text),
    (   number_string(Number, Text)
    ->  Value = Number
    ;   Value = Text
    ),
    Term =.. [Name,Value].
969
970
971 974
1002
1003:- dynamic ( subscription/2, 1004 listening/3 1005 ) as volatile. 1006
%!  redis_subscribe(+Redis, +Channels, -Id, +Options) is det.
%
%   Create a thread that subscribes to Channels and listens for
%   messages.  Id is unified with the thread identifier.  Options are
%   passed to thread_create/3, with detached(true) as default.
%
%   If Redis is a server alias (atom), the thread creates its own
%   connection using the option reconnect(true) and disconnects when
%   done.  Otherwise Redis is used as the connection itself.

redis_subscribe(Redis, Spec, Id, Options) :-
    (   atom(Redis)
    ->  channels(Spec, Channels),
        Goal = setup_call_cleanup(
                   redis_connect(Redis, Conn, [reconnect(true)]),
                   redis_subscribe1(Redis, Conn, Channels),
                   redis_disconnect(Conn))
    ;   channels(Spec, Channels),
        Goal = redis_subscribe1(Redis, Redis, Channels)
    ),
    pubsub_thread_options(ThreadOptions, Options),
    thread_create(Goal, Thread, ThreadOptions),
    pubsub_id(Thread, Id).
1026
% pubsub_thread_options(-ThreadOptions, +Options): the options for the
% listener thread are the user options with detached(true) as default.
pubsub_thread_options(ThreadOptions, Options) :-
    Defaults = [detached(true)],
    merge_options(Options, Defaults, ThreadOptions).

% pubsub_id(?Thread, ?Id): the subscription id is the thread itself.
pubsub_id(Thread, Id) :-
    Id = Thread.
1034
%!  redis_subscribe1(+Redis, +Conn, +Channels)
%
%   Body of the listener thread.  Subscribes and listens until there
%   are no subscriptions left.  On an `error(_, _)` exception,
%   recover/3 is used to re-establish the connection (verified with an
%   `ECHO` request) and listening restarts with the channels that are
%   currently registered for this thread.

redis_subscribe1(Redis, Conn, Channels) :-
    catch(redis_subscribe2(Redis, Conn, Channels),
          error(Formal, Context), true),
    (   nonvar(Formal)
    ->  recover(error(Formal, Context), Conn,
                redis1(Conn, echo("reconnect"), _)),
        thread_self(Me),
        pubsub_id(Me, Id),
        findall(Channel, subscription(Id, Channel), CurrentChannels),
        redis_subscribe1(Redis, Conn, CurrentChannels)
    ;   true
    ).
1046
% redis_subscribe2(+Redis, +Conn, +Channels): send the subscription
% request, then process incoming messages.
redis_subscribe2(Redis, Conn, Channels) :-
    redis_subscribe3(Conn, Channels),
    redis_listen(Redis, Conn).

% redis_subscribe3(+Conn, +Channels): register Channels for the
% calling thread, arrange for pubsub_clean/1 to run when the thread
% exits and write the `SUBSCRIBE` request.  The reply is not read here.
redis_subscribe3(Conn, Channels) :-
    thread_self(Me),
    pubsub_id(Me, Id),
    prolog_listen(this_thread_exit, pubsub_clean(Id)),
    maplist(register_subscription(Id), Channels),
    redis_stream(Conn, Stream, true),
    Request =.. [subscribe|Channels],
    redis_write_msg(Stream, Request).

% pubsub_clean(+Id): drop all bookkeeping facts for subscription Id.
pubsub_clean(Id) :-
    retractall(listening(Id, _, _)),
    retractall(subscription(Id, _)).
1063
1073
%!  redis_subscribe(+Id, +Channels) is det.
%!  redis_unsubscribe(+Id, +Channels) is det.
%
%   Add channels to, or remove channels from, the subscription Id.
%   The local registration is updated and the matching `SUBSCRIBE` or
%   `UNSUBSCRIBE` request is written on the connection of the
%   listener.
%
%   @error existence_error(redis_pubsub, Id) if nobody is listening
%          for Id.

redis_subscribe(Id, Spec) :-
    pubsub_change(subscribe, register_subscription, Id, Spec).

redis_unsubscribe(Id, Spec) :-
    pubsub_change(unsubscribe, unregister_subscription, Id, Spec).

% pubsub_change(+Verb, +Update, +Id, +Spec): shared implementation.
% Update is the name of the predicate that maintains subscription/2
% and Verb the Redis command to send.
pubsub_change(Verb, Update, Id, Spec) :-
    channels(Spec, Channels),
    (   listening(Id, Connection, _Thread)
    ->  true
    ;   existence_error(redis_pubsub, Id)
    ),
    UpdateGoal =.. [Update,Id],
    maplist(UpdateGoal, Channels),
    redis_stream(Connection, Stream, true),
    Request =.. [Verb|Channels],
    redis_write_msg(Stream, Request).
1095
1099
%!  redis_current_subscription(?Id, ?Channels) is nondet.
%
%   True when the subscription Id is registered for the list
%   Channels.  Enumerates all subscriptions, one per id.

redis_current_subscription(Id, Channels) :-
    findall(Id-Channel, subscription(Id, Channel), Pairs0),
    keysort(Pairs0, Pairs),
    group_pairs_by_key(Pairs, ById),
    member(Id-Channels, ById).
1105
%!  channels(+Spec, -Channels:list(atom)) is det.
%
%   Channels is the list of channel names described by Spec, which is
%   a single channel or a list of channels.
%
%   @error type_error(redis_key, Key) for an invalid channel.

channels(Spec, Channels) :-
    (   is_list(Spec)
    ->  maplist(channel_name, Spec, Channels)
    ;   channel_name(Spec, Name),
        Channels = [Name]
    ).

% channel_name(+Key, -Name): an atom is a channel name as is.  A term
% built from atoms with `:`, e.g. `a:b:c`, is turned into the atom
% that joins the parts with ":".
channel_name(Key, Name) :-
    (   Key = Name,
        atom(Key)
    ->  true
    ;   phrase(key_parts(Key), Parts)
    ->  atomic_list_concat(Parts, :, Name)
    ;   type_error(redis_key, Key)
    ).

% key_parts(+Key)//: the atoms of a `:`-term, left to right.  Fails for
% unbound variables and anything else that is not an atom or `A:B`.
key_parts(Key) -->
    { nonvar(Key) },
    (   { atom(Key) }
    ->  [Key]
    ;   { Key = Left:Right },
        key_parts(Left),
        key_parts(Right)
    ).
1132
1133
1134
1135
% register_subscription(+Id, +Channel): record that Id listens to
% Channel, unless that is already recorded.
register_subscription(Id, Channel) :-
    subscription(Id, Channel),
    !.
register_subscription(Id, Channel) :-
    assertz(subscription(Id, Channel)).

% unregister_subscription(+Id, +Channel): forget that Id listens to
% Channel.
unregister_subscription(Id, Channel) :-
    Fact = subscription(Id, Channel),
    retractall(Fact).
1144
% redis_listen(+Redis, +Conn): run the listen loop for the calling
% thread.  While the loop runs, a listening/3 fact links the
% subscription id to the connection and the thread; the fact is erased
% when the loop ends for any reason.
redis_listen(Redis, Conn) :-
    thread_self(Self),
    pubsub_id(Self, Id),
    setup_call_cleanup(
        assertz(listening(Id, Conn, Self), Ref),
        redis_listen_loop(Redis, Id, Conn),
        erase(Ref)).

% redis_listen_loop(+Redis, +Id, +Conn): read and broadcast replies as
% long as Id has at least one registered channel.  The stream is looked
% up in each iteration.
redis_listen_loop(Redis, Id, Conn) :-
    redis_stream(Conn, Stream, true),
    (   subscription(Id, _)
    ->  redis_read_stream(Redis, Stream, Message),
        redis_broadcast(Redis, Message),
        redis_listen_loop(Redis, Id, Conn)
    ;   true
    ).
1161
%!  redis_broadcast(+Redis, +Message)
%
%   Process a message received while listening.
%
%     - `[subscribe, Channel, N]` is ignored.
%     - `[message, Channel, Data]` is forwarded using broadcast/1 as
%       `redis(Redis, Channel, Data)`.  Exceptions raised by listeners
%       are printed as error messages.
%     - Anything else is reported on the debug topic `redis(warning)`
%       after asserting it has the shape `[Type, Channel, Data]` with
%       atomic type and channel.

redis_broadcast(Redis, Message) :-
    (   Message = [subscribe, _Channel, _N]
    ->  true
    ;   Message = [message, Channel, Data]
    ->  catch(broadcast(redis(Redis, Channel, Data)),
              Error,
              print_message(error, Error))
    ;   assertion((Message = [Type, Chan, _Data],
                   atom(Type),
                   atom(Chan))),
        debug(redis(warning), '~p: Unknown message while listening: ~p',
              [Redis,Message])
    ).
1175
1176
1177 1180
1195
%!  redis_read_stream(+Redis, +Stream, -Term) is det.
%
%   Read one reply from Stream.  If reading raises `error(_, _)`, the
%   connection is force-closed and the error re-thrown.  Otherwise
%   push messages that came along are dispatched first.  If an error
%   was reported for the reply, resync/1 is called and the error is
%   thrown; else Term is unified with the reply.

redis_read_stream(Redis, SI, Out) :-
    ReadError = error(Formal, _),
    catch(redis_read_msg(SI, Out, Reply, ReplyError, Push), ReadError, true),
    (   nonvar(Formal)
    ->  redis_disconnect(Redis, [force(true)]),
        throw(ReadError)
    ;   handle_push_messages(Push, Redis),
        (   nonvar(ReplyError)
        ->  resync(Redis),
            throw(ReplyError)
        ;   Out = Reply
        )
    ).
1209
% handle_push_messages(+Messages, +Redis): process push messages in
% order.  A message whose handler fails is skipped; one whose handler
% raises an exception has that exception printed as a warning.
handle_push_messages([], _).
handle_push_messages([Message|Messages], Redis) :-
    ignore(catch(handle_push_message(Message, Redis), Error,
                 print_message(warning, Error))),
    handle_push_messages(Messages, Redis).

% handle_push_message(+Message, +Redis): only "pubsub" messages are
% handled; their payload goes to redis_broadcast/2.
handle_push_message(["pubsub"|Payload], Redis) :-
    redis_broadcast(Redis, Payload).
1221
1222
1229
%!  resync(+Redis) is det.
%
%   Re-synchronise the reply stream after an error reply.  If that
%   raises `error(_, _)`, the connection is force-closed and the error
%   re-thrown.

resync(Redis) :-
    catch(do_resync(Redis), error(Formal, Context), true),
    (   nonvar(Formal)
    ->  redis_disconnect(Redis, [force(true)]),
        throw(error(Formal, Context))
    ;   true
    ).

% do_resync(+Redis): send `ECHO` with a random integer below 10^9 and
% pass stream and number to '$redis_resync'/2.
% NOTE(review): '$redis_resync'/2 is presumably the foreign predicate
% that skips input up to the echoed number -- confirm in redis4pl.
do_resync(Redis) :-
    Marker is random(1_000_000_000),
    redis_stream(Redis, Stream, true),
    redis_write_msg(Stream, echo(Marker)),
    '$redis_resync'(Stream, Marker).
1244
1245
1255
1256
1257
1258 1261
% Hook into the message system to print Redis errors and retry
% notifications.
:- multifile
    prolog:error_message//1,
    prolog:message//1.

% A redis_error(Code, String) is printed as "REDIS: Code: String".
prolog:error_message(redis_error(Code, String)) -->
    { Args = [Code, String] },
    [ 'REDIS: ~w: ~s'-Args ].
1268
1269prolog:message(redis(retry(_Redis, _Failures, Wait, Error))) -->
1270 [ 'REDIS: connection error. Retrying in ~2f seconds'-[Wait], nl ],
1271 [ ' '-[] ], '$messages':translate_message(Error)