/*  Part of SWI-Prolog

    Author:        Jeffrey Rosenwald, Jan Wielemaker
    E-mail:        jeffrose@acm.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2009-2019, Jeffrey Rosenwald
                              CWI, Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(paxos,
          [ paxos_get/1,                        % ?Term
            paxos_get/2,                        % +Key, -Value
            paxos_get/3,                        % +Key, -Value, +Options
            paxos_set/1,                        % ?Term
            paxos_set/2,                        % +Key, +Value
            paxos_set/3,                        % +Key, +Value, +Options
            paxos_on_change/2,                  % ?Term, +Goal
            paxos_on_change/3,                  % ?Key, ?Value, +Goal

            paxos_initialize/1,                 % +Options

            paxos_admin_key/2,                  % ?Name, ?Key
            paxos_property/1,                   % ?Property
            paxos_quorum_ask/4,                 % ?Templ, +Msg, -Result, +Options
                                                % Hook support
            paxos_replicate_key/3               % +Nodes, ?Key, +Options
          ]).
:- autoload(library(apply),[partition/4,maplist/3]).
:- autoload(library(broadcast),
            [ listen/3,
              broadcast_request/1,
              broadcast/1,
              unlisten/1,
              listen/2,
              unlisten/2
            ]).
:- autoload(library(debug),[debug/3]).
:- autoload(library(error),
            [permission_error/3,resource_error/1,must_be/2]).
:- autoload(library(lists),[select/3,nth1/3,max_list/2,member/2]).
:- autoload(library(option),[option/2,option/3]).
:- autoload(library(solution_sequences),[call_nth/2]).
:- use_module(library(settings),[setting/4,setting/2]).

/** <module> A Replicated Data Store

This module provides a replicated data store that is coordinated using a
variation on Lamport's Paxos consensus protocol. The original method is
described in his paper entitled, "The Part-time Parliament", which was
published in 1998. The algorithm is tolerant of non-Byzantine failure.
That is, it tolerates late or lost delivery or reply, but not senseless
delivery or reply. The present algorithm takes advantage of the
convenience offered by multicast to the quorum's membership, who can
remain anonymous and who can come and go as they please without
affecting the Liveness or Safety properties.

Paxos' quorum is a set of one or more attentive members, whose processes
respond to queries within some known time limit (< 20ms), which includes
roundtrip delivery delay. This property is easy to satisfy given that
every coordinator is necessarily a member of the quorum as well, and a
quorum of one is permitted. An inattentive member (e.g. one whose
actions are late or lost) is deemed to be "not-present" for the purposes
of the present transaction and consistency cannot be assured for that
member. As long as there is at least one attentive member of the quorum,
then persistence of the database is assured.

Each member maintains a ledger of terms along with information about
when they were originally recorded. The member's ledger is
deterministic. That is to say that there can only be one entry per
functor/arity combination.
No member will accept a new term proposal that has a line number that is
equal-to or lower-than the one that is already recorded in the ledger.

Paxos is a three-phase protocol:

   1: A coordinator first prepares the quorum for a new proposal by
   broadcasting a proposed term. The quorum responds by returning the
   last known line number for that functor/arity combination that is
   recorded in their respective ledgers.

   2: The coordinator selects the highest line number it receives,
   increments it by one, and then asks the quorum to finally accept the
   new term with the new line number. The quorum checks their respective
   ledgers once again and if there is still no other ledger entry for
   that functor/arity combination that is equal-to or higher than the
   specified line, then each member records the term in the ledger at
   the specified line. The member indicates consent by returning the
   specified line number back to the coordinator. If consent is withheld
   by a member, then the member returns a =nack= instead. The
   coordinator requires unanimous consent. If it isn't achieved then the
   proposal fails and the coordinator must start over from the
   beginning.

   3: Finally, the coordinator concludes the successful negotiation by
   broadcasting the agreement to the quorum in the form of a
   paxos_changed(Key,Value) event. This is the only event that should be
   of interest to user programs.

For practical reasons, we rely on the partially synchronous behavior
(e.g. limited upper time bound for replies) of broadcast_request/1 over
TIPC to ensure Progress. Perhaps more importantly, we rely on the fact
that the TIPC broadcast listener state machine guarantees the atomicity
of broadcast_request/1 at the process level, thus obviating the need for
external mutual exclusion mechanisms.

_|Note that this algorithm does not guarantee the rightness of the value
proposed.
It only guarantees that if successful, the value proposed is identical
for all attentive members of the quorum.|_

@author    Jeffrey Rosenwald (JeffRose@acm.org)
@license   BSD-2
@see       tipc_broadcast.pl, udp_broadcast.pl
*/

:- meta_predicate
    paxos_on_change(?, 0),
    paxos_on_change(?, ?, 0).

:- multifile
    paxos_message_hook/3,               % +PaxOS, +TimeOut, -Message
    paxos_ledger_hook/5.                % +Op, ?Key, ?Gen, ?Value, ?Status

:- setting(max_sets, nonneg, 20,
           "Max Retries to get to an agreement").
:- setting(max_gets, nonneg, 5,
           "Max Retries to get a value from the forum").
:- setting(response_timeout, float, 0.020,
           "Max time to wait for a response").
:- setting(replication_rate, number, 1000,
           "Number of keys replicated per second").
:- setting(death_half_life, number, 10,
           "Half-time for failure score").
:- setting(death_score, number, 100,
           "Consider a node dead if cumulative failure \c
            score exceeds this number").


%!  paxos_initialize(+Options) is det.
%
%   Initialize this Prolog process as a paxos node. The initialization
%   requires an initialized and configured TIPC, UDP or other broadcast
%   protocol. Calling this initialization may be omitted, in which case
%   the equivalent of paxos_initialize([]) is executed lazily as part of
%   the first paxos operation.  Defined options:
%
%     - node(?NodeID)
%     When instantiated, this node rejoins the network with the given
%     node id. A fixed node id should be used if the node is
%     configured for persistency and causes the new node to receive
%     updates for keys that have been created or modified since the
%     node left the network.  If NodeID is a variable it is unified
%     with the discovered NodeID.
%
%     NodeID must be a small non-negative integer as these identifiers
%     are used in bitmaps.

:- dynamic  paxos_initialized/0.
:- volatile paxos_initialized/0.

paxos_initialize(_Options) :-
    paxos_initialized,
    !.
paxos_initialize(Options) :-
    with_mutex(paxos, paxos_initialize_sync(Options)).

paxos_initialize_sync(_Options) :-
    paxos_initialized,
    !.
paxos_initialize_sync(Options) :-
    at_halt(paxos_leave),
    listen(paxos, paxos(X), paxos_message(X)),
    paxos_assign_node(Options),
    start_replicator,
    asserta(paxos_initialized).

paxos_initialize :-
    paxos_initialize([]).


                 /*******************************
                 *            ADMIN             *
                 *******************************/

%!  paxos_get_admin(+Name, -Value) is semidet.
%!  paxos_set_admin(+Name, +Value) is semidet.
%
%   Get or set administrative keys. We use a wrapper such that we can
%   hide the key identity.

paxos_admin_key(quorum, '$paxos_quorum').
paxos_admin_key(dead,   '$paxos_dead_nodes').

paxos_get_admin(Name, Value) :-
    paxos_admin_key(Name, Key),
    paxos_get(Key, Value).

paxos_set_admin(Name, Value) :-
    paxos_admin_key(Name, Key),
    paxos_set(Key, Value).

paxos_set_admin_bg(Name, Value) :-
    thread_create(ignore(paxos_set_admin(Name, Value)), _,
                  [ detached(true)
                  ]).


                 /*******************************
                 *           NODE DATA          *
                 *******************************/

%!  node(?NodeId).
%!  quorum(?Bitmap).
%!  dead(?Bitmap).
%!  failed(?Bitmap).
%!  failed(?NodeId, ?LastTried, ?Score).
%
%   Track our identity as well as the status of our peers in the
%   network. NodeId is a small integer. Multiple NodeIds are combined in
%   a Bitmap.
%
%     - node/1 is our identity.
%     - quorum/1 is the set of members of the quorum
%     - failed/1 is the set of members for which the last message was
%       not confirmed.
%     - failed/3 tracks individual failed nodes. It accumulates failures
%       until the node is marked _dead_.
%     - dead/1 is the set of members that is considered dead.

:- dynamic
    node/1,                             % NodeID
    quorum/1,                           % Bitmap
    failed/1,                           % Bitmap
    failed/3,                           % NodeID, LastTried, Score
    leaving/0,                          % Node is leaving
    dead/1,                             % Bitmap
    salt/1.                             % Unique key
:- volatile
    node/1,
    quorum/1,
    failed/1,
    failed/3,
    leaving/0,
    dead/1,
    salt/1.

%!  paxos_assign_node(+Options) is det.
%
%   Assign a node for this paxos instance. If node is given as an
%   option, this is the node id that is used. Otherwise the network is
%   analysed and the system selects a new node.
paxos_assign_node(Options) :- ( option(node(Node), Options) -> node(Node) ; node(_) ), % already done !. paxos_assign_node(Options) :- between(1, 20, Retry), option(node(Node), Options, Node), ( node(_) -> permission_error(set, paxos_node, Node) ; true ), retractall(dead(_)), retractall(quorum(_)), retractall(failed(_)), retractall(failed(_,_,_)), retractall(leaving), Salt is random(1<<63), asserta(salt(Salt)), paxos_message(node(N,Q,D):From, 0.25, NodeQuery), findall(t(N,Q,D,From), broadcast_request(NodeQuery), Network), select(t(self,0,Salt,Me), Network, AllNodeStatus), partition(starting, AllNodeStatus, Starting, Running), nth_starting(Starting, Salt, Offset), retractall(salt(_)), debug(paxos(node), 'Me@~p; starting: ~p; running: ~p', [Me, Starting, Running]), arg_union(2, Running, Quorum), arg_union(3, Running, Dead), ( var(Node) -> ( call_nth(( between(0, 1000, Node), \+ memberchk(t(Node,_,_,_), Running), Dead /\ (1< debug(paxos(node), 'Assigning myself node ~d', [Node]) ; resource_error(paxos_nodes) ) ; memberchk(t(Node,_,_,_), Running) -> permission_error(set, paxos_node, Node) ; Rejoin = true ), asserta(node(Node)), ( claim_node(Node, Me) -> !, asserta(dead(Dead)), set_quorum(Node, Quorum), ( Rejoin == true -> paxos_rejoin ; true ) ; debug(paxos(node), 'Node ~p already claimed; retrying (~p)', [Node, Retry]), retractall(node(Node)), fail ). starting(t(self,_Quorum,_Salt,_Address)). nth_starting(Starting, Salt, N) :- maplist(arg(3), Starting, Salts), sort([Salt|Salts], Sorted), nth1(N, Sorted, Salt), !. claim_node(Node, Me) :- paxos_message(claim_node(Node, Ok):From, 0.25, NodeQuery), forall(( broadcast_request(NodeQuery), From \== Me, debug(paxos(node), 'Claim ~p ~p: ~p', [Node, From, Ok]) ), Ok == true). set_quorum(Node, Quorum0) :- Quorum is Quorum0 \/ (1< Dead is Dead0 /\ \(1< true ; paxos_set_admin(dead, Dead) ) ; true ), !. %! paxos_leave is det. %! paxos_leave(+Node) is det. % % Leave the network. 
The predicate paxos_leave/0 is called from % at_halt/1 to ensure the node is deleted as the process dies. The % paxos_leave/1 version is called to discard other nodes if they % repeatedly did not respond to queries. paxos_leave :- node(Node), !, asserta(leaving), paxos_leave(Node), Set is 1< NewValue is Value \/ (1< NewValue is Value /\ \(1< true ; paxos_set_admin(Set, NewValue) -> true ; leaving ), !. /******************************* * NODE STATUS * *******************************/ %! update_failed(+Action, +Quorum, +Alive) is det. % % We just sent the Quorum a message and got a reply from the set % Alive. % % @arg is one of `set`, `get` or `replicate` and indicates the % intended action. update_failed(Action, Quorum, Alive) :- Failed is Quorum /\ \Alive, alive(Alive), consider_dead(Failed), ( failed(Failed) -> true ; ( clause(failed(_Old), true, Ref) -> asserta(failed(Failed)), erase(Ref), debug(paxos(node), 'Updated failed quorum to 0x~16r', [Failed]) ; asserta(failed(Failed)) ), ( Action == set -> start_replicator ; true ) ). consider_dead(0) :- !. consider_dead(Failed) :- Node is lsb(Failed), consider_dead1(Node), Rest is Failed /\ \(1< debug(paxos(node), 'Consider node ~d dead', [Node]), paxos_leave(Node) ; true ). consider_dead1(Node) :- get_time(Now), asserta(failed(Node, Now, 10)). alive(Bitmap) :- ( clause(failed(Node, _Last, _Score), true, Ref), Bitmap /\ (1< true ; LifeQuorum = Quorum ). /******************************* * NETWORK STATUS * *******************************/ :- paxos_admin_key(quorum, Key), listen(paxos_changed(Key, Quorum), update_quorum(Quorum)). :- paxos_admin_key(dead, Key), listen(paxos_changed(Key, Death), update_dead(Death)). update_quorum(Proposed) :- debug(paxos(node), 'Received quorum proposal 0x~16r', [Proposed]), quorum(Proposed), !. update_quorum(Proposed) :- leaving, !, update(quorum(Proposed)). update_quorum(Proposed) :- node(Node), Proposed /\ (1< asserta(Clause), erase(Ref) ; asserta(Clause) ). %! 
%!  paxos_property(?Property)
%
%   True if Property is a current property for the paxos network.
%   Defined properties are:
%
%     - node(?NodeID)
%     - quorum(?NodeBitmap)
%     - failed(?NodeBitmap)

paxos_property(node(NodeID)) :-
    node(NodeID).
paxos_property(quorum(Quorum)) :-
    quorum(Quorum).
paxos_property(failed(Nodes)) :-
    failed(Nodes).


                 /*******************************
                 *        INBOUND EVENTS        *
                 *******************************/

%!  paxos_message(?Message)
%
%   Handle inbound actions from our peers. Defined values for Message
%   are:
%
%     - prepare(+Key,-Node,-Gen,+Value)
%     A request message to set Key to Value. Returns the current
%     generation at which we have a value or `0` for Gen and
%     our node id for Node.
%     - accept(+Key,-Node,+Gen,-GenA,+Value)
%     A request message to set Key to Value if Gen is newer than
%     the generation we have for Key.  In that case GenA is Gen.
%     Otherwise we reject using GenA = `nack`.
%     - changed(+Key,+Gen,+Value,+Acceptors)
%     The leader got enough accepts for setting Key to Value at Gen.
%     Acceptors is the set of nodes that accepted this value.
%     - learn(+Key,-Node,+Gen,-GenA,+Value)
%     Request message performing phase one for replication to learner
%     nodes.
%     - learned(+Key,+Gen,+Value,+Acceptors)
%     Phase two of the replication. Confirm the newly learned knowledge.
%     - retrieve(+Key,-Node,-Gen,-Value)
%     A request message to retrieve our value for Key.  Also provides
%     our node id and the generation.
%     - forget(+Nodes)
%     Forget the existence of Nodes.
%     - node(-Node,-Quorum,-Dead)
%     Get my view about the network.  Node is the (integer) node id of
%     this node, Quorum is this node's view of the quorum and Dead is
%     its view of the non-responsive nodes.
%
%   @tbd: originally the changed was handled by a get and when not
%   successful with a new set, named _paxos_audit_. I don't really see
%   why we need this.
paxos_message(prepare(Key,Node,Gen,Value)) :-
    node(Node),
    (   ledger(Key, Gen, _)
    ->  true
    ;   Gen = 0,
        ledger_create(Key, Gen, Value)
    ),
    debug(paxos, 'Prepared ~p-~p@~d', [Key,Value,Gen]).
paxos_message(accept(Key,Node,Gen,GenA,Value)) :-
    node(Node),
    (   ledger_update(Key, Gen, Value)
    ->  debug(paxos, 'Accepted ~p-~p@~d', [Key,Value,Gen]),
        GenA = Gen
    ;   debug(paxos, 'Rejected ~p-~p@~d', [Key,Value,Gen]),
        GenA = nack
    ).
paxos_message(changed(Key,Gen,Value,Acceptors)) :-
    debug(paxos, 'Changed ~p-~p@~d for ~p', [Key, Value, Gen, Acceptors]),
    ledger_update_holders(Key,Gen,Acceptors),
    broadcast(paxos_changed(Key,Value)).
paxos_message(learn(Key,Node,Gen,GenA,Value)) :-
    node(Node),
    debug(paxos, 'Learn ~p-~p@~p?', [Key, Value, Gen]),
    (   ledger_learn(Key,Gen,Value)
    ->  debug(paxos, 'Learned ~p-~p@~d', [Key,Value,Gen]),
        GenA = Gen
    ;   debug(paxos, 'Rejected ~p@~d', [Key, Gen]),
        GenA = nack
    ).
paxos_message(learned(Key,Gen,_Value,Acceptors)) :-
    ledger_update_holders(Key,Gen,Acceptors).
paxos_message(retrieve(Key,Node,K,Value)) :-
    node(Node),
    debug(paxos, 'Retrieving ~p', [Key]),
    ledger(Key,K,Value),
    debug(paxos, 'Retrieved ~p-~p@~d', [Key,Value,K]),
    !.
paxos_message(forget(Nodes)) :-
    ledger_forget(Nodes).
paxos_message(node(Node,Quorum,Dead)) :-
    (   node(Node),
        quorum(Quorum),
        dead(Dead)
    ->  true
    ;   salt(Salt),
        Node = self,
        Quorum = 0,
        Dead = Salt
    ).
paxos_message(claim_node(Node, Ok)) :-
    (   node(Node)
    ->  Ok = false
    ;   Ok = true
    ).
paxos_message(ask(Node, Message)) :-
    node(Node),
    broadcast_request(Message).


                 /*******************************
                 *     KEY-VALUE OPERATIONS     *
                 *******************************/

%%  paxos_set(+Term) is semidet.
%
%   Equivalent to paxos_key(Term,Key), paxos_set(Key,Term). I.e., Term
%   is a ground compound term and its key is the name/arity pair. This
%   version provides compatibility with older versions of this library.

%%  paxos_set(+Key, +Value) is semidet.
%%  paxos_set(+Key, +Value, +Options) is semidet.
%
%   Negotiates to have Key-Value recorded in the ledger for each of the
%   quorum's members. This predicate succeeds if the quorum unanimously
%   accepts the proposed term. If no such entry exists in the Paxon's
%   ledger, then one is silently created. paxos_set/1 will retry the
%   transaction several times (default: 20) before failing. Failure is
%   rare and is usually the result of a collision of two or more writers
%   writing to the same term at precisely the same time. On failure, it
%   may be useful to wait some random period of time, and then retry the
%   transaction. By specifying a retry count of zero, paxos_set/2 will
%   succeed iff the first ballot succeeds.
%
%   On   success,   paxos_set/1   will   also     broadcast   the   term
%   paxos_changed(Key,Value), to the quorum.
%
%   Options processed:
%
%     - retry(Retries)
%     is a non-negative integer specifying the number of retries that
%     will be performed before a set is abandoned.  Defaults to the
%     _setting_ `max_sets` (20).
%     - timeout(+Seconds)
%     Max time to wait for the forum to reply.  Defaults to the
%     _setting_ `response_timeout` (0.020, 20ms).
%
%   @arg Term is a ground compound term.
%   @tbd If the Value is already current, should we simply do nothing?

paxos_set(Term) :-
    paxos_key(Term, Key),
    paxos_set(Key, Term, []).

paxos_set(Key, Value) :-
    paxos_set(Key, Value, []).
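
%   Usage sketch for the options documented above (illustrative only,
%   not part of the library). It assumes that a broadcast backend (see
%   tipc_broadcast.pl or udp_broadcast.pl) has already been initialised
%   so that paxos_message_hook/3 is defined. The key `config/port` and
%   the values 8080 and 0.1 are hypothetical.
%
%       % Try a single ballot only and wait up to 100ms for replies.
%       ?- paxos_set(config/port, 8080, [retry(0), timeout(0.1)]).
%
%       % Same update using the `max_sets` and `response_timeout`
%       % settings as defaults.
%       ?- paxos_set(config/port, 8080).
%
%   Both Key and Value must be ground, as paxos_set/3 checks this
%   before starting the negotiation.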
paxos_set(Key, Value, Options) :-
    must_be(ground, Key-Value),
    paxos_initialize,
    option(retry(Retries), Options, Retries),
    option(timeout(TMO), Options, TMO),
    apply_default(Retries, max_sets),
    apply_default(TMO, response_timeout),
    paxos_message(prepare(Key,Np,Rp,Value), TMO, Prepare),
    between(0, Retries, _),
      life_quorum(Quorum, Alive),
      Alive \== 0,
      debug(paxos, 'Set: ~p -> ~p', [Key, Value]),
      collect(Quorum, false, Np, Rp, Prepare, Rps, PrepNodes),
      debug(paxos, 'Set: quorum: 0x~16r, prepared by 0x~16r, gens ~p',
            [Quorum, PrepNodes, Rps]),
      majority(PrepNodes, Quorum),
      max_list(Rps, K),
      succ(K, K1),
      paxos_message(accept(Key,Na,K1,Ra,Value), TMO, Accept),
      collect(Alive, Ra == nack, Na, Ra, Accept, Ras, AcceptNodes),
      majority(AcceptNodes, Quorum),
      intersecting(PrepNodes, AcceptNodes),
      c_element(Ras, K, K1),
      broadcast(paxos(log(Key,Value,AcceptNodes,K1))),
      paxos_message(changed(Key,K1,Value,AcceptNodes), -, Changed),
      broadcast(Changed),
      update_failed(set, Quorum, AcceptNodes),
    !.

apply_default(Var, Setting) :-
    var(Var),
    !,
    setting(Setting, Var).
apply_default(_, _).

% True if SubSet holds a strict majority of the members of Set. Both
% are bitmaps, so membership is counted using popcount.
majority(SubSet, Set) :-
    popcount(SubSet) >= (popcount(Set)+2)//2.

intersecting(Set1, Set2) :-
    Set1 /\ Set2 =\= 0.


%!  collect(+Quorum, :Stop, ?Node, ?Template, ?Message,
%!          -Result, -NodeSet) is semidet.
%
%   Perform a broadcast request using Message.   Node and Template share
%   with Message and extract the replying node and the result value from
%   Message. Result is the list of instantiations for Template received
%   and NodeSet is the set (bitmask) of  Node values that replied, i.e.
%   |NodeSet| is length(Result). The transfer stops if all members of
%   the set Quorum responded or the configured timeout passed.
collect(Quorum, Stop, Node, Template, Message, Result, NodeSet) :- State = state(0), L0 = [dummy|_], Answers = list(L0), ( broadcast_request(Message), debug(paxos(request), 'broadcast_request: ~p', [Message]), ( Stop -> !, fail ; true ), duplicate_term(Template, Copy), NewLastCell = [Copy|_], arg(1, Answers, LastCell), nb_linkarg(2, LastCell, NewLastCell), nb_linkarg(1, Answers, NewLastCell), arg(1, State, Replied0), Replied is Replied0 \/ (1< true ; true ), arg(1, State, NodeSet), arg(1, Answers, [_]), % close the answer list L0 = [_|Result]. %! paxos_quorum_ask(?Template, +Message, -Result, +Options) % % Ask the paxos forum for their opinion. This predicate is not really % part of the paxos protocol, but reuses notably the quorum % maintenance mechanism of this library for asking questions to the % quorum (cluster). Message is the message to be asked. Result is a % list of copies of Template from the quorum. Options: % % - timeout(+Seconds) % Max time to wait for a reply. Default is the setting % `response_timeout`. % - node(?Node) % Can be used to include the replying node into Template. % - quorum(?Quorum) % Set/query the interviewed quorum as a bitmask paxos_quorum_ask(Template, Message, Result, Options) :- option(timeout(TMO), Options, TMO), option(node(Node), Options, _), option(quorum(Quorum), Options, Quorum), apply_default(TMO, response_timeout), ( var(Quorum) -> life_quorum(Quorum, _Alive) ; true ), paxos_message(ask(Node, Message), TMO, BroadcastMessage), collect(Quorum, false, Node, Template, BroadcastMessage, Result, _PrepNodes). %! paxos_get(?Term) is semidet. % % Equivalent to paxos_key(Term,Key), pasox_get(Key,Term). I.e., Term % is a compound term and its key is the name/arity pair. This version % provides compatibility with older versions of this library. %! paxos_get(+Key, +Value) is semidet. %! paxos_get(+Key, +Value, +Options) is semidet. % % unifies Term with the entry retrieved from the Paxon's ledger. 
If no % such entry exists in the member's local cache, then the quorum is % asked to provide a value, which is verified for consistency. An % implied paxos_set/1 follows. This predicate succeeds if a term % with the same functor and arity exists in the Paxon's ledger, and % fails otherwise. % % Options processed: % % - retry(Retries) % is a non-negative integer specifying the number of retries that % will be performed before a set is abandoned. Defaults to the % _setting_ `max_gets` (5). % - timeout(+Seconds) % Max time to wait for the forum to reply. Defaults to the % _setting_ `response_timeout` (0.020, 20ms). % % @arg Term is a compound. Any unbound variables are unified with % those provided in the ledger entry. paxos_get(Term) :- paxos_key(Term, Key), paxos_get(Key, Term, []). paxos_get(Key, Value) :- paxos_get(Key, Value, []). paxos_get(Key, Value, _) :- ledger(Key, _Line, Value), !. paxos_get(Key, Value, Options) :- paxos_initialize, option(retry(Retries), Options, Retries), option(timeout(TMO), Options, TMO), apply_default(Retries, max_gets), apply_default(TMO, response_timeout), Msg = Line-Value, paxos_message(retrieve(Key,Nr,Line,Value), TMO, Retrieve), node(Node), between(0, Retries, _), life_quorum(Quorum, Alive), QuorumA is Alive /\ \(1<= (popcount(QuorumA)+2)//2, debug(paxos, 'Retrieve: accept ~p', [MajorityValue]), update_failed(get, Quorum, RetrievedNodes), paxos_set(Key, MajorityValue), % Is this needed? !. highest_vote(Terms, Term, Count) :- msort(Terms, Sorted), count_votes(Sorted, Counted), sort(1, >, Counted, [Count-Term|_]). count_votes([], []). count_votes([H|T0], [N-H|T]) :- count_same(H, T0, 1, N, R), count_votes(R, T). count_same(H, [Hc|T0], C0, C, R) :- H == Hc, !, C1 is C0+1, count_same(H, T0, C1, C, R). count_same(_, R, C, C, R). %! paxos_key(+Term, -Key) is det. % % Compatibility to allow for paxos_get/1, paxos_set/1, etc. The key of % a compound term is a term `'$c'(Name,Arity)`. 
Note that we do not % use `Name/Arity` and `X/Y` is naturally used to organize keys as % hierachical _paths_. paxos_key(Compound, '$c'(Name,Arity)) :- compound(Compound), !, compound_name_arity(Compound, Name, Arity). paxos_key(Compound, _) :- must_be(compound, Compound). /******************************* * REPLICATION * *******************************/ %! start_replicator % % Start or signal the replicator thread that there may be outstanding % replication work. This is the case if % % - The union of _quorum_ and _learners_ was extended, and thus % all data may need to be replicated to the new members. % - A paxos_set/3 was not fully acknowledged. start_replicator :- catch(thread_send_message(paxos_replicator, run), error(existence_error(_,_),_), fail), !. start_replicator :- catch(thread_create(replicator, _, [ alias(paxos_replicator), detached(true) ]), error(permission_error(_,_,_),_), true). replicator :- setting(replication_rate, ReplRate), ReplSleep is 1/ReplRate, node(Node), debug(paxos(replicate), 'Starting replicator', []), State = state(idle), repeat, quorum(Quorum), dead(Dead), LifeQuorum is Quorum /\ \Dead, ( LifeQuorum /\ \(1< debug(paxos(replicate), 'Me: ~d, Quorum: 0x~16r, Dead: 0x~16r: I''m alone, waiting ...', [Node, Quorum, Dead]), thread_get_message(_) ; ( paxos_replicate_key(LifeQuorum, Key, []) -> replicated(State, key(Key)), thread_self(Me), thread_get_message(Me, _, [timeout(ReplSleep)]) ; replicated(State, idle), thread_get_message(_) ) ), fail. replicated(State, key(_Key)) :- arg(1, State, idle), !, debug(paxos(replicate), 'Start replicating ...', []), nb_setarg(1, State, 1). replicated(State, key(_Key)) :- !, arg(1, State, C0), C is C0+1, nb_setarg(1, State, C). replicated(State, idle) :- arg(1, State, idle), !. replicated(State, idle) :- arg(1, State, Count), debug(paxos(replicate), 'Replicated ~D keys', [Count]), nb_setarg(1, State, idle). %! paxos_replicate_key(+Nodes:bitmap, ?Key, +Options) is det. % % Replicate a Key to Nodes. 
%   If Key is unbound, a random key is selected.
%
%     - timeout(+Seconds)
%     Max time to wait for the forum to reply.  Defaults to the
%     _setting_ `response_timeout` (0.020, 20ms).

paxos_replicate_key(Nodes, Key, Options) :-
    replication_key(Nodes, Key),
    option(timeout(TMO), Options, TMO),
    apply_default(TMO, response_timeout),
    ledger_current(Key, Gen, Value, Holders),
    paxos_message(learn(Key,Na,Gen,Ga,Value), TMO, Learn),
    collect(Nodes, Ga == nack, Na, Ga, Learn, _Gas, LearnedNodes),
    NewHolders is Holders \/ LearnedNodes,
    paxos_message(learned(Key,Gen,Value,NewHolders), -, Learned),
    broadcast(Learned),
    update_failed(replicate, Nodes, LearnedNodes).

replication_key(_Nodes, Key) :-
    ground(Key),
    !.
replication_key(Nodes, Key) :-
    (   Nth is 1+random(popcount(Nodes))
    ;   Nth = 1
    ),
    call_nth(needs_replicate(Nodes, Key), Nth),
    !.

needs_replicate(Nodes, Key) :-
    ledger_current(Key, _Gen, _Value, Holders),
    Nodes /\ \Holders =\= 0,
    \+ paxos_admin_key(_, Key).


                 /*******************************
                 *      KEY CHANGE EVENTS       *
                 *******************************/

%!  paxos_on_change(?Term, :Goal) is det.
%!  paxos_on_change(?Key, ?Value, :Goal) is det.
%
%   Executes the specified Goal  when   Key  changes.  paxos_on_change/2
%   listens for paxos_changed(Key,Value) notifications   for  Key, which
%   are emitted as the result   of  successful paxos_set/3 transactions.
%   When one is received for Key, then   Goal  is executed in a separate
%   thread of execution.
%
%   @arg Term is a compound, identical to that used for
%   paxos_get/1.
%   @arg Goal is one of:
%     - a callable atom or term, or
%     - the atom =ignore=, which causes monitoring for Term to be
%       discontinued.

paxos_on_change(Term, Goal) :-
    paxos_key(Term, Key),
    paxos_on_change(Key, Term, Goal).

paxos_on_change(Key, Value, Goal) :-
    Goal = _:Plain,
    must_be(callable, Plain),
    (   Plain == ignore
    ->  unlisten(paxos_user, paxos_changed(Key,Value))
    ;   listen(paxos_user,
               paxos_changed(Key,Value),
               key_changed(Key, Value, Goal)),
        paxos_initialize
    ).
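
%   Usage sketch for paxos_on_change/3 (illustrative only, not part of
%   the library). It assumes an initialised broadcast backend, as for
%   the other paxos operations. The key `config/port` and the message
%   text are hypothetical.
%
%       % Print every new value that the quorum agrees on for the key.
%       ?- paxos_on_change(config/port, Port,
%                          format("port is now ~p~n", [Port])).
%
%       % Stop monitoring the key again.
%       ?- paxos_on_change(config/port, _, ignore).
%
%   The goal shares the variable Port with the Value argument, so it
%   sees the value carried by the paxos_changed(Key,Value) event. It
%   runs in its own detached thread, so it should not rely on
%   thread-local state of the caller.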
key_changed(_Key, _Value, Goal) :-
    E = error(_,_),
    catch(thread_create(Goal, _, [detached(true)]),
          E, key_error(E)).

key_error(error(permission_error(create, thread, _), _)) :-
    !.
key_error(E) :-
    print_message(error, E).


                 /*******************************
                 *            HOOKS             *
                 *******************************/

%!  node(-Node) is det.
%
%   Get the node ID for this paxos node.

%!  quorum(-Quorum) is det.
%
%   Get the current quorum as a bitmask

%!  paxos_message(+PaxOS, +TimeOut, -BroadcastMessage) is det.
%
%   Transform a basic PaxOS message into a message for the broadcasting
%   service. This predicate is hooked by paxos_message_hook/3 with the
%   same signature.
%
%   @arg TimeOut is one of `-` or a time in seconds.

paxos_message(Paxos:From, TMO, Message) :-
    paxos_message_raw(paxos(Paxos):From, TMO, Message).
paxos_message(Paxos, TMO, Message) :-
    paxos_message_raw(paxos(Paxos), TMO, Message).

paxos_message_raw(Message, TMO, WireMessage) :-
    paxos_message_hook(Message, TMO, WireMessage),
    !.
paxos_message_raw(Message, TMO, WireMessage) :-
    throw(error(mode_error(det, fail,
                           paxos:paxos_message_hook(Message, TMO, WireMessage)),
                _)).


                 /*******************************
                 *           STORAGE            *
                 *******************************/

%!  paxos_ledger_hook(+Action, ?Key, ?Gen, ?Value, ?Holders)
%
%   Hook called for all operations on the ledger. Defined actions are:
%
%     - current
%     Enumerate our ledger content.
%     - get
%     Get a single value from our ledger.
%     - create
%     Create a new key in our ledger.
%     - accept
%     Accept a newly proposed value for a key.  Failure causes
%     the library to send a _NACK_ message.
%     - set
%     Final acceptance of Key@Gen, providing the holders that accepted
%     the new value.
%     - learn
%     Accept new keys in a new node or node that has been offline for
%     some time.

:- dynamic
    paxons_ledger/4.                    % Key, Gen, Value, Holders

%!  ledger_current(?Key, ?Gen, ?Value, ?Holders) is nondet.
%
%   True when Key is a known key in my ledger.
ledger_current(Key, Gen, Value, Holders) :-
    paxos_ledger_hook(current, Key, Gen, Value, Holders).
ledger_current(Key, Gen, Value, Holders) :-
    paxons_ledger(Key, Gen, Value, Holders),
    valid(Holders).


%!  ledger(+Key, -Gen, -Value) is semidet.
%
%   True if the ledger has Value associated  with Key at generation Gen.
%   Note that if the value is  not   yet  acknowledged  by the leader we
%   should not use it.

ledger(Key, Gen, Value) :-
    paxos_ledger_hook(get, Key, Gen, Value0, Holders),
    !,
    valid(Holders),
    Value = Value0.
ledger(Key, Gen, Value) :-
    paxons_ledger(Key, Gen, Value0, Holders),
    valid(Holders),
    !,
    Value = Value0.

%!  ledger_create(+Key, +Gen, +Value) is det.
%
%   Create a new Key-Value pair  at   generation  Gen.  This is executed
%   during the preparation phase.

ledger_create(Key, Gen, Value) :-
    paxos_ledger_hook(create, Key, Gen, Value, -),
    !.
ledger_create(Key, Gen, Value) :-
    get_time(Now),
    asserta(paxons_ledger(Key, Gen, Value, created(Now))).

%!  ledger_update(+Key, +Gen, +Value) is semidet.
%
%   Update Key to Value if the  current   generation  is older than Gen.
%   This reflects the accept phase of the protocol.

ledger_update(Key, Gen, Value) :-
    paxos_ledger_hook(accept, Key, Gen, Value, -),
    !.
ledger_update(Key, Gen, Value) :-
    paxons_ledger(Key, Gen0, _Value, _Holders),
    !,
    Gen > Gen0,
    get_time(Now),
    asserta(paxons_ledger(Key, Gen, Value, accepted(Now))),
    (   Gen0 == 0
    ->  retractall(paxons_ledger(Key, Gen0, _, _))
    ;   true
    ).

%!  ledger_update_holders(+Key, +Gen, +Holders) is det.
%
%   The leader acknowledged that Key@Gen represents a valid new

ledger_update_holders(Key, Gen, Holders) :-
    paxos_ledger_hook(set, Key, Gen, _, Holders),
    !.
ledger_update_holders(Key, Gen, Holders) :-
    clause(paxons_ledger(Key, Gen, Value, Holders0), true, Ref),
    !,
    (   Holders0 == Holders
    ->  true
    ;   asserta(paxons_ledger(Key, Gen, Value, Holders)),
        erase(Ref)
    ),
    clean_key(Holders0, Key, Gen).

clean_key(Holders, _Key, _Gen) :-
    valid(Holders),
    !.
clean_key(_, Key, Gen) :-
    (   clause(paxons_ledger(Key, Gen0, _Value, _Holders0), true, Ref),
        Gen0 < Gen,
        erase(Ref),
        fail
    ;   true
    ).


%!  ledger_learn(+Key,+Gen,+Value) is semidet.
%
%   We received a learn event.

ledger_learn(Key,Gen,Value) :-
    paxos_ledger_hook(learn, Key, Gen, Value, -),
    !.
ledger_learn(Key,Gen,Value) :-
    paxons_ledger(Key, Gen0, Value0, _Holders),
    !,
    (   Gen == Gen0,
        Value == Value0
    ->  true
    ;   Gen > Gen0
    ->  get_time(Now),
        asserta(paxons_ledger(Key, Gen, Value, learned(Now)))
    ).
ledger_learn(Key,Gen,Value) :-
    get_time(Now),
    asserta(paxons_ledger(Key, Gen, Value, learned(Now))).

%!  ledger_forget(+Nodes) is det.
%
%   Remove Nodes from all ledgers.  This is executed in a background
%   thread.

ledger_forget(Nodes) :-
    catch(thread_create(ledger_forget_threaded(Nodes), _,
                        [ detached(true)
                        ]),
          error(permission_error(create, thread, _), _),
          true).

ledger_forget_threaded(Nodes) :-
    debug(paxos(node), 'Forgetting 0x~16r', [Nodes]),
    forall(ledger_current(Key, Gen, _Value, Holders),
           ledger_forget(Nodes, Key, Gen, Holders)),
    debug(paxos(node), 'Forgotten 0x~16r', [Nodes]).

ledger_forget(Nodes, Key, Gen, Holders) :-
    NewHolders is Holders /\ \Nodes,
    (   NewHolders \== Holders,
        ledger_update_holders(Key, Gen, NewHolders)
    ->  true
    ;   true
    ).

valid(Holders) :-
    integer(Holders).


                 /*******************************
                 *             UTIL             *
                 *******************************/

%!  c_element(+NewList, +Old, -Value)
%
%   A Muller c-element is a logic block  used in asynchronous logic. Its
%   output assumes the value of its  input   iff  all  of its inputs are
%   identical. Otherwise, the output retains its original value.

c_element([New | More], _Old, New) :-
    forall(member(N, More), N == New),
    !.
c_element(_List, Old, Old).

%!  arg_union(+Arg, +ListOfTerms, -Set) is det.
%
%   Get all the nth args from ListOfTerms  and   do  a  set union on the
%   result.

arg_union(Arg, NodeStatusList, Set) :-
    maplist(arg(Arg), NodeStatusList, Sets),
    list_union(Sets, Set).

list_union(Sets, Set) :-
    list_union(Sets, 0, Set).
list_union([], Set, Set).
list_union([H|T], Set0, Set) :-
    Set1 is Set0 \/ H,
    list_union(T, Set1, Set).
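

% Worked examples for the network-free helpers defined in this file
% (majority/2, c_element/3, arg_union/3 and paxos_key/2), written as an
% embedded plunit test unit. This is an illustrative sketch and not part
% of the original library: the unit name `paxos_util_sketch` and all
% sample values are hypothetical, and the expected results were derived
% by hand from the clauses in this file. Run it with
% `?- run_tests(paxos_util_sketch).`

:- use_module(library(plunit)).

:- begin_tests(paxos_util_sketch).

% A quorum of four nodes (0b1111) needs (4+2)//2 = 3 replies.
test(majority_three_of_four) :-
    majority(0b0111, 0b1111).
test(majority_two_of_four, [fail]) :-
    majority(0b0011, 0b1111).
% A quorum of one is permitted: (1+2)//2 = 1.
test(majority_single_node) :-
    majority(0b0001, 0b0001).

% All acceptors returned generation 3: the output follows the inputs.
test(c_element_agree, V == 3) :-
    c_element([3,3,3], 2, V).
% One acceptor returned nack: the output retains the old value.
test(c_element_disagree, V == 2) :-
    c_element([3,nack,3], 2, V).

% Union of the quorum bitmaps (argument 2) of two status terms.
test(arg_union_quorum, Set == 0b0111) :-
    arg_union(2, [t(1,0b0011,0,a), t(2,0b0110,0,b)], Set).

% Compound terms are keyed on their name and arity.
test(paxos_key_compound, Key == '$c'(foo,2)) :-
    paxos_key(foo(a,b), Key).
test(paxos_key_atom, [throws(error(type_error(compound, foo), _))]) :-
    paxos_key(foo, _).

:- end_tests(paxos_util_sketch).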