35
36:- module(paxos,
37 [ paxos_get/1, 38 paxos_get/2, 39 paxos_get/3, 40 paxos_set/1, 41 paxos_set/2, 42 paxos_set/3, 43 paxos_on_change/2, 44 paxos_on_change/3, 45
46 paxos_initialize/1, 47
48 paxos_admin_key/2, 49 paxos_property/1, 50 paxos_quorum_ask/4, 51 52 paxos_replicate_key/3 53 ]). 54:- autoload(library(apply),[partition/4,maplist/3]). 55:- autoload(library(broadcast),
56 [ listen/3,
57 broadcast_request/1,
58 broadcast/1,
59 unlisten/1,
60 listen/2,
61 unlisten/2
62 ]). 63:- autoload(library(debug),[debug/3]). 64:- autoload(library(error),
65 [permission_error/3,resource_error/1,must_be/2]). 66:- autoload(library(lists),[select/3,nth1/3,max_list/2,member/2]). 67:- autoload(library(option),[option/2,option/3]). 68:- autoload(library(solution_sequences),[call_nth/2]). 69:- use_module(library(settings),[setting/4,setting/2]). 70
140
% The last argument of the change-notification predicates is a goal and
% is therefore module-sensitive.
:- meta_predicate paxos_on_change(?, 0).
:- meta_predicate paxos_on_change(?, ?, 0).

% Hooks for which clauses may be supplied from other files.  In this file
% they are called as paxos_message_hook(Message, TimeOut, WireMessage) and
% paxos_ledger_hook(Action, Key, Gen, Value, Holders).
:- multifile paxos_message_hook/3.
:- multifile paxos_ledger_hook/5.

% Tunable parameters of this module, declared through library(settings).

% Default retry count of paxos_set/3.
:- setting(max_sets,         nonneg, 20,
           "Max Retries to get to an agreement").
% Default retry count of paxos_get/3.
:- setting(max_gets,         nonneg, 5,
           "Max Retries to get a value from the forum").
% Default timeout handed to the message hook.
:- setting(response_timeout, float,  0.020,
           "Max time to wait for a response").
% The replicator sleeps 1/replication_rate seconds between keys.
:- setting(replication_rate, number, 1000,
           "Number of keys replicated per second").
% Parameters of the failure score kept in failed/3.
:- setting(death_half_life,  number, 10,
           "Half-time for failure score").
:- setting(death_score,      number, 100,
           "Consider a node dead if cummulative failure \c
            score exceeds this number").

163
182
% Flag that is asserted once initialization has completed.
:- dynamic  paxos_initialized/0.
:- volatile paxos_initialized/0.

%!  paxos_initialize(+Options)
%
%   Initialize this node unless that has been done already.  The flag
%   is tested first without locking; the actual work is delegated to
%   paxos_initialize_sync/1, which runs while holding the mutex `paxos`
%   and tests the flag again.

paxos_initialize(Options) :-
    (   paxos_initialized
    ->  true
    ;   with_mutex(paxos, paxos_initialize_sync(Options))
    ).
191
%!  paxos_initialize_sync(+Options)
%
%   Body of paxos_initialize/1, called while the mutex `paxos` is held.
%   If the flag is not yet set it registers paxos_leave/0 as exit hook,
%   starts listening for paxos(_) messages, obtains a node id, starts
%   the replicator and finally sets the flag.

paxos_initialize_sync(Options) :-
    (   paxos_initialized
    ->  true
    ;   at_halt(paxos_leave),
        listen(paxos, paxos(Msg), paxos_message(Msg)),
        paxos_assign_node(Options),
        start_replicator,
        asserta(paxos_initialized)
    ).
201
%!  paxos_initialize
%
%   Initialize using an empty option list.

paxos_initialize :-
    NoOptions = [],
    paxos_initialize(NoOptions).
204
205
206 209
215
%!  paxos_admin_key(?Name, ?Key)
%
%   Table relating the symbolic name of an administrative value to the
%   ledger key under which it is stored.

paxos_admin_key(quorum, '$paxos_quorum').       % see update_quorum/1
paxos_admin_key(dead,   '$paxos_dead_nodes').   % see update_dead/1
218
%!  paxos_get_admin(+Name, -Value)
%
%   Fetch the administrative value called Name using paxos_get/2 on
%   the key listed in paxos_admin_key/2.

paxos_get_admin(Name, Value) :-
    paxos_admin_key(Name, AdminKey),
    paxos_get(AdminKey, Value).
222
%!  paxos_set_admin(+Name, +Value)
%
%   Store the administrative value called Name using paxos_set/2 on
%   the key listed in paxos_admin_key/2.

paxos_set_admin(Name, Value) :-
    paxos_admin_key(Name, AdminKey),
    paxos_set(AdminKey, Value).
226
%!  paxos_set_admin_bg(+Name, +Value)
%
%   Run paxos_set_admin/2 in a freshly created detached thread.  The
%   goal is wrapped in ignore/1, so its failure is not reported.

paxos_set_admin_bg(Name, Value) :-
    Job = ignore(paxos_set_admin(Name, Value)),
    thread_create(Job, _ThreadId, [detached(true)]).
231
232
233 236
254
% Process-local state of this node.  All facts are volatile.
:- dynamic
    node/1,                     % Id of this node
    quorum/1,                   % Bitmap of the quorum members
    failed/1,                   % Bitmap of quorum members that did not reply
    failed/3,                   % Node, Time of last failure, Score
    leaving/0,                  % Present while paxos_leave/0 runs
    dead/1,                     % Bitmap of dead nodes
    salt/1.                     % Random number used while joining
:- volatile
    node/1,
    quorum/1,
    failed/1,
    failed/3,
    leaving/0,
    dead/1,
    salt/1.

277
%!  paxos_assign_node(+Options)
%
%   Give this process a node id.  Succeeds immediately if a node id is
%   already set (and equals the node(Id) option if that is given).
%   Otherwise up to 20 attempts are made.  Each attempt
%
%     1. clears the local quorum/dead/failure state and publishes a
%        random salt;
%     2. asks the network for node(Id,Quorum,Dead) and splits the
%        replies into nodes that are still starting (Id == self) and
%        nodes that are running;
%     3. picks a free id (or validates the requested one) and tries to
%        claim it using claim_node/2.
%
%   Throws permission_error(set, paxos_node, Id) if the node id is
%   already set or the requested id is in use by a running node and
%   resource_error(paxos_nodes) if no id in 0..1000 is available.

paxos_assign_node(Options) :-
    (   option(node(Wanted), Options)
    ->  node(Wanted)
    ;   node(_)
    ),                                  % nothing to do
    !.
paxos_assign_node(Options) :-
    between(1, 20, Attempt),
    option(node(Id), Options, Id),      % Id stays unbound if not given
    (   node(_)
    ->  permission_error(set, paxos_node, Id)
    ;   true
    ),
    retractall(dead(_)),
    retractall(quorum(_)),
    retractall(failed(_)),
    retractall(failed(_,_,_)),
    retractall(leaving),
    Salt is random(1<<63),
    asserta(salt(Salt)),                % answered by paxos_message(node(..))
    paxos_message(node(PeerId,PeerQuorum,PeerDead):From, 0.25, NodeQuery),
    findall(t(PeerId,PeerQuorum,PeerDead,From),
            broadcast_request(NodeQuery),
            Replies),
    % Our own reply tells us our address (Me).
    select(t(self,0,Salt,Me), Replies, Peers),
    partition(starting, Peers, Starting, Running),
    nth_starting(Starting, Salt, Rank),
    retractall(salt(_)),
    debug(paxos(node), 'Me@~p; starting: ~p; running: ~p',
          [Me, Starting, Running]),
    arg_union(2, Running, Quorum),
    arg_union(3, Running, Dead),
    (   var(Id)
    ->  % Take the Rank-th id that is neither running nor dead.
        (   call_nth(( between(0, 1000, Id),
                       \+ memberchk(t(Id,_,_,_), Running),
                       Dead /\ (1<<Id) =:= 0),
                     Rank)
        ->  debug(paxos(node), 'Assigning myself node ~d', [Id])
        ;   resource_error(paxos_nodes)
        )
    ;   memberchk(t(Id,_,_,_), Running)
    ->  permission_error(set, paxos_node, Id)
    ;   Rejoin = true
    ),
    asserta(node(Id)),
    (   claim_node(Id, Me)
    ->  !,
        asserta(dead(Dead)),
        set_quorum(Id, Quorum),
        (   Rejoin == true
        ->  paxos_rejoin
        ;   true
        )
    ;   debug(paxos(node), 'Node ~p already claimed; retrying (~p)',
              [Id, Attempt]),
        retractall(node(Id)),
        fail                            % next attempt
    ).
336
%!  starting(+NodeStatus)
%
%   True if NodeStatus is a t/4 reply whose node id is the atom `self`.

starting(t(self, _, _, _)).
338
%!  nth_starting(+Starting, +Salt, -N)
%
%   N is the 1-based position of Salt in the sorted list that holds
%   Salt and the third argument of every term in Starting.

nth_starting(Starting, Salt, N) :-
    maplist(arg(3), Starting, PeerSalts),
    sort([Salt|PeerSalts], Ordered),
    once(nth1(N, Ordered, Salt)).
344
%!  claim_node(+Node, +Me)
%
%   Broadcast a claim_node(Node, Ok) request (timeout 0.25) and succeed
%   unless some reply that does not come from Me carries an Ok other
%   than `true`.

claim_node(Node, Me) :-
    paxos_message(claim_node(Node, Ok):From, 0.25, NodeQuery),
    \+ ( broadcast_request(NodeQuery),
         From \== Me,
         debug(paxos(node), 'Claim ~p ~p: ~p', [Node, From, Ok]),
         Ok \== true
       ).
352
%!  set_quorum(+Node, +Quorum0)
%
%   Add the bit for Node to the bitmap Quorum0, remember the result in
%   quorum/1 and store it as the administrative `quorum` value.

set_quorum(Node, Quorum0) :-
    Extended is Quorum0 \/ (1<<Node),
    debug(paxos(node), 'Adding ~d to quorum (now 0x~16r)', [Node, Extended]),
    asserta(quorum(Extended)),
    paxos_set_admin(quorum, Extended).
358
359
366
%!  paxos_rejoin
%
%   Clear the bit of this node in the administrative `dead` bitmap.
%   Nothing is written if the bitmap cannot be retrieved or the bit is
%   already clear.  A failing paxos_set_admin/2 is retried through
%   repeat/0.

paxos_rejoin :-
    node(Node),
    repeat,
      (   paxos_get_admin(dead, Dead0)
      ->  Dead is Dead0 /\ \(1<<Node),
          (   Dead \== Dead0
          ->  paxos_set_admin(dead, Dead)
          ;   true
          )
      ;   true
      ),
    !.
379
387
%!  paxos_leave
%
%   Take this node out of the network.  Does nothing if no node id is
%   set.  Otherwise the node is removed with paxos_leave/1, a forget/1
%   message for its bit is broadcast and the `paxos` listener is
%   removed.  The `leaving` flag is set while this runs.

paxos_leave :-
    (   node(Node)
    ->  asserta(leaving),
        paxos_leave(Node),
        Set is 1<<Node,
        paxos_message(forget(Set), -, Forget),
        broadcast(Forget),
        unlisten(paxos),
        retractall(leaving)
    ;   true
    ).
399
%!  paxos_leave(+Node)
%
%   Remove Node from the quorum bitmap and add it to the dead bitmap
%   using paxos_update_set/2.  Fails if either update fails.
%
%   The original had a second clause `paxos_leave(_).` that could never
%   run because the first clause started with a cut.  That dead clause
%   and the cut are dropped; the behaviour is unchanged.

paxos_leave(Node) :-
    paxos_update_set(quorum, del(Node)),
    paxos_update_set(dead,   add(Node)).
405
%!  paxos_update_set(+Set, +How)
%
%   Update the bitmap stored in the local fact Set(Value), where Set is
%   `quorum` or `dead` and How is add(Node) or del(Node).  The new value
%   is written with paxos_set_admin/2 unless it equals the old one.
%   If writing fails the loop restarts, except when the `leaving` flag
%   is set, in which case the failure is accepted.

paxos_update_set(Set, How) :-
    repeat,
      call(Set, Current),               % quorum(Current) or dead(Current)
      (   How = add(Node)
      ->  Updated is Current \/ (1<<Node)
      ;   How = del(Node)
      ->  Updated is Current /\ \(1<<Node)
      ),
      (   Current == Updated
      ;   paxos_set_admin(Set, Updated)
      ;   leaving
      ),
    !.
422
423 426
434
%!  update_failed(+Action, +Quorum, +Alive)
%
%   Process the outcome of a round in which the nodes in the bitmap
%   Alive replied.  Failure records of repliers are dropped, the
%   quorum members that did not reply are scored by consider_dead/1
%   and failed/1 is replaced if the set of failed nodes changed.  When
%   it changed and Action is `set`, the replicator is (re)started.

update_failed(Action, Quorum, Alive) :-
    Silent is Quorum /\ \Alive,
    alive(Alive),
    consider_dead(Silent),
    (   failed(Silent)
    ->  true                            % no change
    ;   (   clause(failed(_), true, OldRef)
        ->  asserta(failed(Silent)),
            erase(OldRef),
            debug(paxos(node), 'Updated failed quorum to 0x~16r', [Silent])
        ;   asserta(failed(Silent))
        ),
        (   Action == set
        ->  start_replicator
        ;   true
        )
    ).
452
%!  consider_dead(+Failed)
%
%   Call consider_dead1/1 for every bit that is set in the bitmap
%   Failed, starting at the least significant one.

consider_dead(0) :-
    !.
consider_dead(Bitmap) :-
    Lowest is lsb(Bitmap),
    consider_dead1(Lowest),
    Remaining is Bitmap /\ \(1<<Lowest),
    consider_dead(Remaining).
460
%!  consider_dead1(+Node)
%
%   Register a failure of Node.  The first failure creates a record
%   failed(Node, Time, 10).  Each later failure replaces the record:
%   the old score is halved for every `death_half_life` seconds that
%   passed since the previous failure and 10 is added.  If the new
%   score exceeds the `death_score` setting the node is removed using
%   paxos_leave/1.
%
%   The original compared with `<`, which removed a node as long as
%   its score was *below* the threshold, i.e. on its second failure.
%   That contradicts the text of the `death_score` setting ("dead if
%   cummulative failure score exceeds this number").

consider_dead1(Node) :-
    clause(failed(Node, Last, Score), true, Ref),
    !,
    setting(death_half_life, HalfLife),
    setting(death_score, DeathScore),
    get_time(Now),
    Passed is Now-Last,
    NewScore is Score*(2**(-Passed/HalfLife)) + 10,
    asserta(failed(Node, Now, NewScore)),
    erase(Ref),
    (   NewScore > DeathScore
    ->  debug(paxos(node), 'Consider node ~d dead', [Node]),
        paxos_leave(Node)
    ;   true
    ).
consider_dead1(Node) :-
    get_time(Now),
    asserta(failed(Node, Now, 10)).
479
%!  alive(+Bitmap)
%
%   Erase the failed/3 record of every node whose bit is set in Bitmap.
%   Always succeeds.

alive(Bitmap) :-
    forall(( clause(failed(Node, _Last, _Score), true, Ref),
             Bitmap /\ (1<<Node) =\= 0
           ),
           erase(Ref)).
487
488
496
%!  life_quorum(-Quorum, -LifeQuorum)
%
%   Quorum is the current quorum bitmap.  LifeQuorum is Quorum without
%   the failed nodes, provided some nodes have failed and the remaining
%   ones still form a majority of Quorum.  Otherwise LifeQuorum is
%   Quorum itself.

life_quorum(Quorum, LifeQuorum) :-
    quorum(Quorum),
    (   failed(Down),
        Down \== 0,
        LifeQuorum is Quorum /\ \Down,
        majority(LifeQuorum, Quorum)
    ->  true
    ;   LifeQuorum = Quorum
    ).
506
507
508 511
% Keep the local quorum/1 and dead/1 facts in sync: whenever a
% paxos_changed/2 event for one of the administrative keys is
% broadcast, pass the new value to the matching update predicate.
:- paxos_admin_key(quorum, QuorumKey),
   listen(paxos_changed(QuorumKey, NewQuorum),
          update_quorum(NewQuorum)).
:- paxos_admin_key(dead, DeadKey),
   listen(paxos_changed(DeadKey, NewDead),
          update_dead(NewDead)).

%!  update_quorum(+Proposed)
%
%   Handle a newly agreed quorum bitmap.
%
%     - If it equals the local quorum nothing happens.
%     - If we are leaving, or our own bit is set in Proposed, it
%       becomes the local quorum.
%     - Otherwise our bit is added, the result is stored locally and
%       proposed to the network from a background thread.

update_quorum(Proposed) :-
    debug(paxos(node), 'Received quorum proposal 0x~16r', [Proposed]),
    (   quorum(Proposed)
    ->  true
    ;   leaving
    ->  update(quorum(Proposed))
    ;   node(Self),
        Proposed /\ (1<<Self) =\= 0
    ->  update(quorum(Proposed))
    ;   node(Self),
        WithMe is Proposed \/ (1<<Self),
        update(quorum(WithMe)),
        debug(paxos(node), 'I''m not in the quorum! Proposing 0x~16r', [WithMe]),
        paxos_set_admin_bg(quorum, WithMe)
    ).
538
%!  update_dead(+Proposed)
%
%   Handle a newly agreed bitmap of dead nodes.
%
%     - If it equals the local dead/1 value nothing happens.
%     - If we are leaving, or our own bit is clear in Proposed, it
%       becomes the local value.
%     - Otherwise our bit is cleared, the result is stored locally and
%       proposed to the network from a background thread.

update_dead(Proposed) :-
    debug(paxos(node), 'Received dead proposal 0x~16r', [Proposed]),
    (   dead(Proposed)
    ->  true
    ;   leaving
    ->  update(dead(Proposed))
    ;   node(Self),
        Proposed /\ (1<<Self) =:= 0
    ->  update(dead(Proposed))
    ;   node(Self),
        WithoutMe is Proposed /\ \(1<<Self),
        update(dead(WithoutMe)),
        paxos_set_admin_bg(dead, WithoutMe)
    ).
557
%!  update(+Clause)
%
%   Make Clause the current fact for its predicate.  The new fact is
%   asserted before the first existing fact of the same name and arity
%   (if any) is erased, so there is no moment at which no fact exists.

update(Clause) :-
    functor(Clause, Name, Arity),
    functor(Generic, Name, Arity),
    (   clause(Generic, true, OldRef)
    ->  true
    ;   OldRef = none
    ),
    asserta(Clause),
    (   OldRef == none
    ->  true
    ;   erase(OldRef)
    ).
566
575
%!  paxos_property(?Property)
%
%   Inspect the local state of this node.  Property is one of
%
%     - node(Id):       the node id of this process
%     - quorum(Bitmap): the current quorum
%     - failed(Bitmap): the current value of failed/1

paxos_property(node(Id)) :-
    node(Id).
paxos_property(quorum(Bitmap)) :-
    quorum(Bitmap).
paxos_property(failed(Bitmap)) :-
    failed(Bitmap).
582
583
584 587
622
%!  paxos_message(?Message)
%
%   Handle an incoming message.  This is the goal registered with
%   listen/3 for paxos(Message) by paxos_initialize_sync/1.  Most
%   clauses bind variables in Message; these bindings are the reply.
%   Clauses that call node(Node) thereby report the id of this node.

% prepare: report the generation we hold for Key, creating the key
% at generation 0 if we do not know it.
paxos_message(prepare(Key,Self,Gen,Value)) :-
    node(Self),
    (   ledger(Key, Gen, _)
    ->  true
    ;   Gen = 0,
        ledger_create(Key, Gen, Value)
    ),
    debug(paxos, 'Prepared ~p-~p@~d', [Key,Value,Gen]).
% accept: reply with Gen if the ledger took the value, `nack` otherwise.
paxos_message(accept(Key,Self,Gen,Reply,Value)) :-
    node(Self),
    (   ledger_update(Key, Gen, Value)
    ->  debug(paxos, 'Accepted ~p-~p@~d', [Key,Value,Gen]),
        Reply = Gen
    ;   debug(paxos, 'Rejected ~p-~p@~d', [Key,Value,Gen]),
        Reply = nack
    ).
% changed: record who holds the value and notify local listeners.
paxos_message(changed(Key,Gen,Value,Acceptors)) :-
    debug(paxos, 'Changed ~p-~p@~d for ~p', [Key, Value, Gen, Acceptors]),
    ledger_update_holders(Key,Gen,Acceptors),
    broadcast(paxos_changed(Key,Value)).
% learn: replication request; reply with Gen or `nack`.
paxos_message(learn(Key,Self,Gen,Reply,Value)) :-
    node(Self),
    debug(paxos, 'Learn ~p-~p@~p?', [Key, Value, Gen]),
    (   ledger_learn(Key,Gen,Value)
    ->  debug(paxos, 'Learned ~p-~p@~d', [Key,Value,Gen]),
        Reply = Gen
    ;   debug(paxos, 'Rejected ~p@~d', [Key, Gen]),
        Reply = nack
    ).
% learned: record the new set of holders.
paxos_message(learned(Key,Gen,_Value,Acceptors)) :-
    ledger_update_holders(Key,Gen,Acceptors).
% retrieve: reply with our generation and value for Key, if any.
paxos_message(retrieve(Key,Self,Gen,Value)) :-
    node(Self),
    debug(paxos, 'Retrieving ~p', [Key]),
    ledger(Key,Gen,Value),
    debug(paxos, 'Retrieved ~p-~p@~d', [Key,Value,Gen]),
    !.
% forget: remove the given nodes from the holders of all keys.
paxos_message(forget(Nodes)) :-
    ledger_forget(Nodes).
% node: report our id, quorum and dead bitmap.  If these are not all
% known yet, reply as node `self` with quorum 0 and our salt in the
% third position.
paxos_message(node(Id,Quorum,Dead)) :-
    (   node(Id),
        quorum(Quorum),
        dead(Dead)
    ->  true
    ;   salt(Salt),
        Id = self,
        Quorum = 0,
        Dead = Salt
    ).
% claim_node: Ok is `false` if Node is our id, `true` otherwise.
paxos_message(claim_node(Node, Ok)) :-
    (   node(Node)
    ->  Ok = false
    ;   Ok = true
    ).
% ask: run Request as a local broadcast request.
paxos_message(ask(Self, Request)) :-
    node(Self),
    broadcast_request(Request).
680
681
682 685
691
721
%!  paxos_set(+Term)
%
%   Store the compound Term under the key derived from its name and
%   arity by paxos_key/2, using default options.

paxos_set(Term) :-
    paxos_key(Term, DerivedKey),
    paxos_set(DerivedKey, Term, []).
725
%!  paxos_set(+Key, +Value)
%
%   Same as paxos_set/3 with an empty option list.

paxos_set(Key, Value) :-
    NoOptions = [],
    paxos_set(Key, Value, NoOptions).
728
%!  paxos_set(+Key, +Value, +Options)
%
%   Try to get the network to agree on Value for Key.  Key and Value
%   must be ground.  Options processed:
%
%     - retry(Retries):  number of extra attempts; defaults to the
%       setting `max_sets`.
%     - timeout(Seconds): passed to the message hook; defaults to the
%       setting `response_timeout`.
%
%   Each attempt collects the generations held by the quorum members,
%   requires a majority of the quorum to have replied, proposes
%   max+1 as new generation, requires a majority of acceptors that
%   overlaps with the preparers and then announces the change.  Fails
%   if no attempt succeeds.

paxos_set(Key, Value, Options) :-
    must_be(ground, Key-Value),
    paxos_initialize,
    option(retry(Retries), Options, Retries),
    option(timeout(TMO), Options, TMO),
    apply_default(Retries, max_sets),
    apply_default(TMO, response_timeout),
    paxos_message(prepare(Key,PrepNode,PrepGen,Value), TMO, Prepare),
    between(0, Retries, _),
      life_quorum(Quorum, Alive),
      Alive \== 0,
      debug(paxos, 'Set: ~p -> ~p', [Key, Value]),
      % Phase 1: ask the whole quorum for its current generation.
      collect(Quorum, false, PrepNode, PrepGen, Prepare, Gens, PrepNodes),
      debug(paxos, 'Set: quorum: 0x~16r, prepared by 0x~16r, gens ~p',
            [Quorum, PrepNodes, Gens]),
      majority(PrepNodes, Quorum),
      max_list(Gens, MaxGen),
      succ(MaxGen, NextGen),
      % Phase 2: ask the live nodes to accept; stop on the first nack.
      paxos_message(accept(Key,AccNode,NextGen,AccReply,Value), TMO, Accept),
      collect(Alive, AccReply == nack, AccNode, AccReply, Accept,
              AccReplies, AcceptNodes),
      majority(AcceptNodes, Quorum),
      intersecting(PrepNodes, AcceptNodes),
      c_element(AccReplies, MaxGen, NextGen),
      % Announce the result.
      broadcast(paxos(log(Key,Value,AcceptNodes,NextGen))),
      paxos_message(changed(Key,NextGen,Value,AcceptNodes), -, Changed),
      broadcast(Changed),
      update_failed(set, Quorum, AcceptNodes),
    !.
757
%!  apply_default(?Var, +Setting)
%
%   If Var is unbound, bind it to the value of Setting.  A bound Var
%   is left alone.

apply_default(Var, Setting) :-
    (   var(Var)
    ->  setting(Setting, Var)
    ;   true
    ).
763
%!  majority(+SubSet, +Set)
%
%   True if the bitmap SubSet has at least (N+2)//2 bits set, where N
%   is the number of bits set in the bitmap Set.

majority(SubSet, Set) :-
    Needed is (popcount(Set)+2)//2,
    popcount(SubSet) >= Needed.
766
%!  intersecting(+Set1, +Set2)
%
%   True if the bitmaps Set1 and Set2 have at least one bit in common.

intersecting(Set1, Set2) :-
    Common is Set1 /\ Set2,
    Common =\= 0.
769
770
780
%!  collect(+Quorum, :Stop, ?Node, ?Template, +Message,
%!          -Result, -NodeSet)
%
%   Broadcast Message as a request and gather the replies.  For each
%   reply, Node is expected to be bound to the id of the replying node
%   and a copy of Template is appended to Result.  NodeSet is the
%   bitmap of the nodes that replied.  Collection ends when
%
%     - all nodes in the bitmap Quorum have replied,
%     - Stop, which is called after every reply, succeeds (that reply
%       is not recorded), or
%     - broadcast_request/1 has no more answers.
%
%   Replies arrive on backtracking, so the result list and the bitmap
%   are maintained with non-backtrackable assignment.  The list is
%   built open-ended behind a dummy first cell and closed at the end.

collect(Quorum, Stop, Node, Template, Message, Result, NodeSet) :-
    Seen = seen(0),                     % bitmap of repliers
    Head = [dummy|_],                   % open list of answers
    Last = last(Head),                  % pointer to its last cell
    (   broadcast_request(Message),
        debug(paxos(request), 'broadcast_request: ~p', [Message]),
        (   Stop
        ->  !,
            fail
        ;   true
        ),
        duplicate_term(Template, Answer),
        NewCell = [Answer|_],
        arg(1, Last, LastCell),
        nb_linkarg(2, LastCell, NewCell),       % append the new cell
        nb_linkarg(1, Last, NewCell),           % and advance the pointer
        arg(1, Seen, Replied0),
        Replied is Replied0 \/ (1<<Node),
        nb_setarg(1, Seen, Replied),
        Quorum /\ Replied =:= Quorum            % everybody answered?
    ->  true
    ;   true
    ),
    arg(1, Seen, NodeSet),
    arg(1, Last, [_]),                          % close the answer list
    Head = [_|Result].
807
823
%!  paxos_quorum_ask(?Template, +Message, -Result, +Options)
%
%   Have the nodes of a quorum run Message as a broadcast request and
%   collect a copy of Template for each reply in Result.  Options:
%
%     - timeout(Seconds): defaults to the setting `response_timeout`.
%     - node(Node): variable that is bound to the replying node.
%     - quorum(Bitmap): nodes to wait for; defaults to the quorum
%       reported by life_quorum/2.

paxos_quorum_ask(Template, Message, Result, Options) :-
    option(timeout(TMO), Options, TMO),
    option(node(Replier), Options, _),
    option(quorum(Quorum), Options, Quorum),
    apply_default(TMO, response_timeout),
    (   var(Quorum)
    ->  life_quorum(Quorum, _Alive)
    ;   true
    ),
    paxos_message(ask(Replier, Message), TMO, Request),
    collect(Quorum, false, Replier, Template, Request, Result, _Repliers).
835
841
864
%!  paxos_get(?Term)
%
%   Retrieve the value stored under the key that paxos_key/2 derives
%   from the name and arity of the compound Term and unify it with
%   Term.

paxos_get(Term) :-
    paxos_key(Term, DerivedKey),
    paxos_get(DerivedKey, Term, []).
%!  paxos_get(+Key, -Value)
%
%   Same as paxos_get/3 with an empty option list.

paxos_get(Key, Value) :-
    NoOptions = [],
    paxos_get(Key, Value, NoOptions).
870
%!  paxos_get(+Key, -Value, +Options)
%
%   Get the value for Key.  If the local ledger has a value it is
%   returned without consulting the network.  Otherwise the other live
%   nodes are asked; the most frequent reply must have the support of
%   a majority of them, after which it is stored with paxos_set/2.
%   Options:
%
%     - retry(Retries):  number of extra attempts; defaults to the
%       setting `max_gets`.
%     - timeout(Seconds): defaults to the setting `response_timeout`.

paxos_get(Key, Value, _) :-
    ledger(Key, _Gen, Value),
    !.
paxos_get(Key, Value, Options) :-
    paxos_initialize,
    option(retry(Retries), Options, Retries),
    option(timeout(TMO), Options, TMO),
    apply_default(Retries, max_gets),
    apply_default(TMO, response_timeout),
    Vote = Gen-Value,
    paxos_message(retrieve(Key,Replier,Gen,Value), TMO, Retrieve),
    node(Self),
    between(0, Retries, _),
      life_quorum(Quorum, Alive),
      Others is Alive /\ \(1<<Self),    % live nodes, except for me
      collect(Others, false, Replier, Vote, Retrieve, Votes, Repliers),
      debug(paxos, 'Retrieved: ~p from 0x~16r', [Votes, Repliers]),
      highest_vote(Votes, _BestGen-BestValue, Count),
      debug(paxos, 'Best: ~p with ~d votes', [BestValue, Count]),
      Count >= (popcount(Others)+2)//2,
      debug(paxos, 'Retrieve: accept ~p', [BestValue]),
      update_failed(get, Quorum, Repliers),
      paxos_set(Key, BestValue),
    !.
895
%!  highest_vote(+Terms, -Term, -Count)
%
%   Term is a term that occurs most often in Terms and Count is its
%   number of occurrences.  Fails if Terms is empty.

highest_vote(Terms, Term, Count) :-
    msort(Terms, Sorted),               % group identical terms
    count_votes(Sorted, Tally),
    sort(1, >, Tally, Ranked),          % highest count first
    Ranked = [Count-Term|_].
900
%!  count_votes(+Sorted, -Counted)
%
%   Counted is a list of Count-Term pairs, one for each run of
%   identical (==) consecutive terms in Sorted.

count_votes([], []).
count_votes([Term|Terms], [Count-Term|Pairs]) :-
    count_same(Term, Terms, 1, Count, Rest),
    count_votes(Rest, Pairs).

%!  count_same(+Term, +List, +Count0, -Count, -Rest)
%
%   Count is Count0 plus the number of leading elements of List that
%   are identical to Term.  Rest is the remainder of List.

count_same(Term, List, Count0, Count, Rest) :-
    (   List = [Next|Tail],
        Term == Next
    ->  Count1 is Count0+1,
        count_same(Term, Tail, Count1, Count, Rest)
    ;   Count = Count0,
        Rest = List
    ).
912
919
%!  paxos_key(+Term, -Key)
%
%   Key is '$c'(Name,Arity), where Name and Arity are the name and
%   arity of the compound Term.  If Term is not compound, must_be/2
%   raises the corresponding error.

paxos_key(Term, '$c'(Name,Arity)) :-
    compound(Term),
    !,
    compound_name_arity(Term, Name, Arity).
paxos_key(Term, _) :-
    must_be(compound, Term).
925
926
927 930
939
%!  start_replicator
%
%   Wake up the replicator thread by sending it the message `run`.
%   If the thread does not exist it is created as a detached thread
%   with the alias `paxos_replicator`.  A permission error raised
%   while creating the thread is ignored.

start_replicator :-
    (   catch(thread_send_message(paxos_replicator, run),
              error(existence_error(_,_),_),
              fail)
    ->  true
    ;   catch(thread_create(replicator, _,
                            [ alias(paxos_replicator),
                              detached(true)
                            ]),
              error(permission_error(_,_,_),_),
              true)
    ).
952
%!  replicator
%
%   Main loop of the replicator thread.  It never terminates normally.
%   In each iteration it
%
%     - waits for a message if there is no other live node,
%     - else replicates one key and then waits at most
%       1/replication_rate seconds for a message,
%     - else (nothing to replicate) waits for a message.

replicator :-
    setting(replication_rate, Rate),
    Pause is 1/Rate,
    node(Self),
    debug(paxos(replicate), 'Starting replicator', []),
    Progress = state(idle),
    repeat,
      quorum(Quorum),
      dead(Dead),
      Live is Quorum /\ \Dead,
      (   Live /\ \(1<<Self) =:= 0
      ->  debug(paxos(replicate),
                'Me: ~d, Quorum: 0x~16r, Dead: 0x~16r: I''m alone, waiting ...',
                [Self, Quorum, Dead]),
          thread_get_message(_)
      ;   paxos_replicate_key(Live, Key, [])
      ->  replicated(Progress, key(Key)),
          thread_self(Me),
          thread_get_message(Me, _, [timeout(Pause)])
      ;   replicated(Progress, idle),
          thread_get_message(_)
      ),
      fail.
977
%!  replicated(+State, +Event)
%
%   Keep track of replication progress for debug messages.  The first
%   argument of State is `idle` or the number of keys replicated in
%   the current burst; it is updated destructively.  Event is key(Key)
%   after replicating a key and `idle` when nothing was left to do.

replicated(State, key(_Key)) :-
    !,
    (   arg(1, State, idle)
    ->  debug(paxos(replicate), 'Start replicating ...', []),
        nb_setarg(1, State, 1)
    ;   arg(1, State, Count0),
        Count is Count0+1,
        nb_setarg(1, State, Count)
    ).
replicated(State, idle) :-
    (   arg(1, State, idle)
    ->  true
    ;   arg(1, State, Count),
        debug(paxos(replicate), 'Replicated ~D keys', [Count]),
        nb_setarg(1, State, idle)
    ).
995
996
1005
%!  paxos_replicate_key(+Nodes, ?Key, +Options)
%
%   Replicate a key to the nodes in the bitmap Nodes.  If Key is not
%   ground, a key that needs replication is selected; the predicate
%   fails if there is none.  The nodes are sent a learn/5 message,
%   collection stops at the first `nack`, and the resulting set of
%   holders is announced with a learned/4 message.  Options:
%
%     - timeout(Seconds): defaults to the setting `response_timeout`.

paxos_replicate_key(Nodes, Key, Options) :-
    replication_key(Nodes, Key),
    option(timeout(TMO), Options, TMO),
    apply_default(TMO, response_timeout),
    ledger_current(Key, Gen, Value, Holders),
    paxos_message(learn(Key,Learner,Gen,Reply,Value), TMO, Learn),
    collect(Nodes, Reply == nack, Learner, Reply, Learn, _Replies, Learners),
    AllHolders is Holders \/ Learners,
    paxos_message(learned(Key,Gen,Value,AllHolders), -, Learned),
    broadcast(Learned),
    update_failed(replicate, Nodes, Learners).
1017
%!  replication_key(+Nodes, ?Key)
%
%   Decide on the key to replicate.  A ground Key is used as is.
%   Otherwise the Nth key for which needs_replicate/2 holds is
%   selected, where Nth is a random number between 1 and the number
%   of nodes in Nodes.  If there are fewer than Nth such keys, the
%   first one is used.

replication_key(_Nodes, Key) :-
    ground(Key),
    !.
replication_key(Nodes, Key) :-
    (   Nth is 1+random(popcount(Nodes))
    ;   Nth = 1                         % fall back to the first candidate
    ),
    call_nth(needs_replicate(Nodes, Key), Nth),
    !.
1027
%!  needs_replicate(+Nodes, -Key)
%
%   True if Key is in the ledger, is not held by all nodes in the
%   bitmap Nodes and is not one of the administrative keys.

needs_replicate(Nodes, Key) :-
    ledger_current(Key, _Gen, _Value, Holders),
    Missing is Nodes /\ \Holders,
    Missing =\= 0,
    \+ paxos_admin_key(_, Key).
1032
1033
1034 1037
1053
%!  paxos_on_change(?Term, :Goal)
%
%   Same as paxos_on_change/3, using the key that paxos_key/2 derives
%   from the compound Term and Term itself as the value.

paxos_on_change(Term, Goal) :-
    paxos_key(Term, DerivedKey),
    paxos_on_change(DerivedKey, Term, Goal).
1057
%!  paxos_on_change(?Key, ?Value, :Goal)
%
%   Arrange for Goal to be started whenever a paxos_changed(Key,Value)
%   event is broadcast.  The listener is registered under the id
%   `paxos_user` and the node is initialized.  If Goal is the atom
%   `ignore`, the matching listener is removed instead.

paxos_on_change(Key, Value, Goal) :-
    Goal = _Module:Plain,
    must_be(callable, Plain),
    Event = paxos_changed(Key,Value),
    (   Plain == ignore
    ->  unlisten(paxos_user, Event)
    ;   listen(paxos_user, Event, key_changed(Key, Value, Goal)),
        paxos_initialize
    ).
1067
%!  key_changed(+Key, +Value, :Goal)
%
%   Run Goal in a new detached thread.  An error(_,_) exception raised
%   while creating the thread is passed to key_error/1.

key_changed(_Key, _Value, Goal) :-
    Error = error(_Formal, _Context),
    catch(thread_create(Goal, _ThreadId, [detached(true)]),
          Error,
          key_error(Error)).
1072
%!  key_error(+Error)
%
%   Report Error as an error message, unless it is a permission error
%   on creating a thread, which is silently ignored.

key_error(Error) :-
    (   Error = error(permission_error(create, thread, _), _)
    ->  true
    ;   print_message(error, Error)
    ).
1077
1078
1079 1082
1086
1090
1098
%!  paxos_message(?PaxOS, +TimeOut, -Message)
%
%   Translate a Paxos message into the Message to hand to the
%   broadcast library.  The Paxos term is wrapped in paxos/1 and
%   passed, together with TimeOut, through paxos_message_raw/3.  For
%   the form PaxOS:From only the part before the colon is wrapped.
%
%   The cut makes the two clauses mutually exclusive.  Without it, a
%   Paxos:From argument also matched the second clause on
%   backtracking, producing a second message paxos(Paxos:From).  The
%   retry loop of paxos_assign_node/1 backtracks into this predicate.

paxos_message(Paxos:From, TMO, Message) :-
    !,
    paxos_message_raw(paxos(Paxos):From, TMO, Message).
paxos_message(Paxos, TMO, Message) :-
    paxos_message_raw(paxos(Paxos), TMO, Message).
1103
%!  paxos_message_raw(+Message, +TimeOut, -WireMessage)
%
%   Ask the hook paxos_message_hook/3 to translate Message.  Only the
%   first solution of the hook is used.
%
%   @error mode_error(det, fail, Goal) if the hook fails.

paxos_message_raw(Message, TMO, WireMessage) :-
    (   paxos_message_hook(Message, TMO, WireMessage)
    ->  true
    ;   Hook = paxos:paxos_message_hook(Message, TMO, WireMessage),
        throw(error(mode_error(det, fail, Hook), _))
    ).
1110
1111
1112 1115
1135
% Default ledger store, used when paxos_ledger_hook/5 does not handle a
% request.  Holders is a bitmap of nodes, or a term created(Time),
% accepted(Time) or learned(Time) while the holders are not yet known.
:- dynamic paxons_ledger/4.             % Key, Gen, Value, Holders

1142
%!  ledger_current(?Key, ?Gen, ?Value, ?Holders)
%
%   Enumerate ledger entries on backtracking: first those provided by
%   the hook (action `current`), then the entries of the default store
%   whose Holders is valid.

ledger_current(Key, Gen, Value, Holders) :-
    (   paxos_ledger_hook(current, Key, Gen, Value, Holders)
    ;   paxons_ledger(Key, Gen, Value, Holders),
        valid(Holders)
    ).
1148
1149
1155
%!  ledger(+Key, -Gen, -Value)
%
%   Look up Key.  If the hook (action `get`) answers, its answer is
%   decisive and the predicate fails if the Holders it reports are not
%   valid.  Otherwise the first entry of the default store with valid
%   Holders is used.  Value is unified only after the entry has been
%   selected.

ledger(Key, Gen, Value) :-
    (   paxos_ledger_hook(get, Key, Gen, Found, Holders)
    ->  valid(Holders)
    ;   paxons_ledger(Key, Gen, Found, Holders),
        valid(Holders)
    ->  true
    ),
    Value = Found.
1166
1171
%!  ledger_create(+Key, +Gen, +Value)
%
%   Add a new entry.  The hook (action `create`) is tried first; if it
%   fails the entry is added to the default store with created(Now) in
%   place of the holders.

ledger_create(Key, Gen, Value) :-
    (   paxos_ledger_hook(create, Key, Gen, Value, -)
    ->  true
    ;   get_time(Now),
        asserta(paxons_ledger(Key, Gen, Value, created(Now)))
    ).
1178
1183
%!  ledger_update(+Key, +Gen, +Value)
%
%   Accept Value at generation Gen for Key.  The hook (action `accept`)
%   is tried first.  Otherwise Key must exist in the default store and
%   Gen must be larger than the generation of its first entry; the new
%   entry is added with accepted(Now) in place of the holders and
%   entries at generation 0 are removed.  Fails otherwise.

ledger_update(Key, Gen, Value) :-
    (   paxos_ledger_hook(accept, Key, Gen, Value, -)
    ->  true
    ;   paxons_ledger(Key, Gen0, _Value, _Holders)
    ->  Gen > Gen0,
        get_time(Now),
        asserta(paxons_ledger(Key, Gen, Value, accepted(Now))),
        (   Gen0 == 0
        ->  retractall(paxons_ledger(Key, Gen0, _, _))
        ;   true
        )
    ).
1197
1201
%!  ledger_update_holders(+Key, +Gen, +Holders)
%
%   Set the holders of the entry Key@Gen.  The hook (action `set`) is
%   tried first.  Otherwise the entry must exist in the default store;
%   it is replaced if Holders differs, and clean_key/3 is called with
%   the previous holders.  Fails if there is no such entry.

ledger_update_holders(Key, Gen, Holders) :-
    (   paxos_ledger_hook(set, Key, Gen, _, Holders)
    ->  true
    ;   clause(paxons_ledger(Key, Gen, Value, Previous), true, Ref)
    ->  (   Previous == Holders
        ->  true
        ;   asserta(paxons_ledger(Key, Gen, Value, Holders)),
            erase(Ref)
        ),
        clean_key(Previous, Key, Gen)
    ).
1214
%!  clean_key(+Holders, +Key, +Gen)
%
%   Called with the holders an entry had before it was updated.  If
%   these were not valid, all entries for Key in the default store
%   with a generation below Gen are erased.  Always succeeds.

clean_key(Holders, Key, Gen) :-
    (   valid(Holders)
    ->  true
    ;   forall(( clause(paxons_ledger(Key, Gen0, _Value, _Holders0),
                        true, Ref),
                 Gen0 < Gen
               ),
               erase(Ref))
    ).
1225
1226
1230
%!  ledger_learn(+Key, +Gen, +Value)
%
%   Learn Value at generation Gen for Key from a replication request.
%   The hook (action `learn`) is tried first.  Otherwise, using the
%   default store:
%
%     - Key unknown: the entry is added.
%     - Same generation and value as the first entry: nothing to do.
%     - Gen above the generation of the first entry: entry is added.
%     - Anything else: fail.
%
%   Added entries get learned(Now) in place of the holders.

ledger_learn(Key, Gen, Value) :-
    (   paxos_ledger_hook(learn, Key, Gen, Value, -)
    ->  true
    ;   paxons_ledger(Key, Gen0, Value0, _Holders)
    ->  (   Gen == Gen0,
            Value == Value0
        ->  true
        ;   Gen > Gen0
        ->  get_time(Now),
            asserta(paxons_ledger(Key, Gen, Value, learned(Now)))
        )
    ;   get_time(Now),
        asserta(paxons_ledger(Key, Gen, Value, learned(Now)))
    ).
1247
1252
%!  ledger_forget(+Nodes)
%
%   Remove the nodes in the bitmap Nodes from the holders of all keys.
%   The work is done by ledger_forget_threaded/1 in a detached thread.
%   A permission error on creating the thread is ignored.

ledger_forget(Nodes) :-
    Job = ledger_forget_threaded(Nodes),
    catch(thread_create(Job, _ThreadId, [detached(true)]),
          error(permission_error(create, thread, _), _),
          true).
1259
%!  ledger_forget_threaded(+Nodes)
%
%   Call ledger_forget/4 for every entry that ledger_current/4
%   enumerates.  Fails as soon as that call fails for some entry.

ledger_forget_threaded(Nodes) :-
    debug(paxos(node), 'Forgetting 0x~16r', [Nodes]),
    \+ ( ledger_current(Key, Gen, _Value, Holders),
         \+ ledger_forget(Nodes, Key, Gen, Holders)
       ),
    debug(paxos(node), 'Forgotten 0x~16r', [Nodes]).
1265
%!  ledger_forget(+Nodes, +Key, +Gen, +Holders)
%
%   Remove the nodes in the bitmap Nodes from Holders and, if that
%   makes a difference, store the result as the holders of Key@Gen.
%   A failing store operation is ignored.

ledger_forget(Nodes, Key, Gen, Holders) :-
    Remaining is Holders /\ \Nodes,
    ignore(( Remaining \== Holders,
             ledger_update_holders(Key, Gen, Remaining)
           )).
1273
%!  valid(+Holders)
%
%   True if Holders is an integer, i.e. a bitmap of nodes rather than
%   one of the created/1, accepted/1 or learned/1 placeholders that
%   the default store puts in its place.

valid(Holders) :-
    integer(Holders).
1276
1277
1278 1281
1287
%!  c_element(+Inputs, +Old, -Value)
%
%   Value is the first element of Inputs if Inputs is non-empty and
%   all its elements are identical (==).  Otherwise Value is Old.

c_element([First|Others], _Old, First) :-
    \+ ( member(Other, Others),
         Other \== First
       ),
    !.
c_element(_Inputs, Old, Old).
1292
1297
%!  arg_union(+Arg, +NodeStatusList, -Set)
%
%   Set is the bitwise or of argument number Arg of all terms in
%   NodeStatusList.

arg_union(Arg, NodeStatusList, Set) :-
    maplist(arg(Arg), NodeStatusList, Bitmaps),
    list_union(Bitmaps, Set).
1301
%!  list_union(+Sets, -Set)
%
%   Set is the bitwise or of all bitmaps in the list Sets, computed by
%   list_union/3 starting from 0.

list_union(Sets, Set) :-
    Empty = 0,
    list_union(Sets, Empty, Set).
1304
1305list_union([], Set, Set).
1306list_union([H|T], Set0, Set) :-
1307 Set1 is Set0 \/ H,
1308 list_union(T, Set1, Set)