View source with raw comments or as raw
    1:- encoding(utf8).
    2/*  Part of SWI-Prolog
    3
    4    Author:        Torbjörn Lager and Jan Wielemaker
    5    E-mail:        J.Wielemaker@vu.nl
    6    WWW:           http://www.swi-prolog.org
    7    Copyright (C): 2014-2020, Torbjörn Lager,
    8                              VU University Amsterdam
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(pengines,
   38          [ pengine_create/1,                   % +Options
   39            pengine_ask/3,                      % +Pengine, :Query, +Options
   40            pengine_next/2,                     % +Pengine, +Options
   41            pengine_stop/2,                     % +Pengine, +Options
   42            pengine_event/2,                    % -Event, +Options
   43            pengine_input/2,                    % +Prompt, -Term
   44            pengine_output/1,                   % +Term
   45            pengine_respond/3,                  % +Pengine, +Input, +Options
   46            pengine_debug/2,                    % +Format, +Args
   47            pengine_self/1,                     % -Pengine
   48            pengine_pull_response/2,            % +Pengine, +Options
   49            pengine_destroy/1,                  % +Pengine
   50            pengine_destroy/2,                  % +Pengine, +Options
   51            pengine_abort/1,                    % +Pengine
   52            pengine_application/1,              % +Application
   53            current_pengine_application/1,      % ?Application
   54            pengine_property/2,                 % ?Pengine, ?Property
   55            pengine_user/1,                     % -User
   56            pengine_event_loop/2,               % :Closure, +Options
   57            pengine_rpc/2,                      % +Server, :Goal
   58            pengine_rpc/3                       % +Server, :Goal, +Options
   59          ]).

Pengines: Web Logic Programming Made Easy

The library(pengines) provides an infrastructure for creating Prolog engines in a (remote) pengine server and accessing these engines either from Prolog or JavaScript.

author
- Torbjörn Lager and Jan Wielemaker */
   70:- autoload(library(aggregate),[aggregate_all/3]).   71:- autoload(library(apply),[maplist/2,partition/4,exclude/3,maplist/3]).   72:- autoload(library(broadcast),[broadcast/1]).   73:- autoload(library(charsio),[open_chars_stream/2]).   74:- autoload(library(debug),[debug/1,debugging/1,debug/3,assertion/1]).   75:- autoload(library(error),
   76	    [ must_be/2,
   77	      existence_error/2,
   78	      permission_error/3,
   79	      domain_error/2
   80	    ]).   81:- autoload(library(filesex),[directory_file_path/3]).   82:- autoload(library(listing),[listing/1]).   83:- autoload(library(lists),[member/2,flatten/2,select/3,append/3]).   84:- autoload(library(modules),[in_temporary_module/3]).   85:- autoload(library(occurs),[sub_term/2]).   86:- autoload(library(option),
   87	    [select_option/3,option/2,option/3,select_option/4]).   88:- autoload(library(prolog_stack),[print_prolog_backtrace/2]).   89:- autoload(library(sandbox),[safe_goal/1]).   90:- autoload(library(statistics),[thread_statistics/2]).   91:- autoload(library(term_to_json),[term_to_json/2]).   92:- autoload(library(thread_pool),
   93	    [thread_pool_create/3,thread_create_in_pool/4]).   94:- autoload(library(time),[alarm/4,call_with_time_limit/2]).   95:- autoload(library(uri),
   96	    [ uri_components/2,
   97	      uri_query_components/2,
   98	      uri_data/3,
   99	      uri_data/4,
  100	      uri_encoded/3
  101	    ]).  102:- autoload(library(http/http_client),[http_read_data/3]).  103:- autoload(library(http/http_cors),[cors_enable/0,cors_enable/2]).  104:- autoload(library(http/http_dispatch),
  105	    [http_handler/3,http_404/2,http_reply_file/3]).  106:- autoload(library(http/http_open),[http_open/3]).  107:- autoload(library(http/http_parameters),[http_parameters/2]).  108:- autoload(library(http/http_stream),[is_cgi_stream/1]).  109:- autoload(library(http/http_wrapper),[http_peer/2]).  110
  111:- use_module(library(settings),[setting/2,setting/4]).  112:- use_module(library(http/http_json),
  113              [http_read_json_dict/2,reply_json/1]).  114
  115:- if(exists_source(library(uuid))).  116:- autoload(library(uuid), [uuid/2]).  117:- endif.  118
  119
  % Public meta-predicates.  `:` marks a module-sensitive argument and
  % `1` a closure that is called with one additional argument.
  :- meta_predicate pengine_create(:).
  :- meta_predicate pengine_rpc(+, +, :).
  :- meta_predicate pengine_event_loop(1, +).

  % Hook predicates for which other files may contribute clauses.
  :- multifile write_result/3.            % +Format, +Event, +Dict
  :- multifile event_to_json/3.           % +Event, -JSON, +Format
  :- multifile prepare_module/3.          % +Module, +Application, +Options
  :- multifile prepare_goal/3.            % +GoalIn, -GoalOut, +Options
  :- multifile authentication_hook/3.     % +Request, +Application, -User
  :- multifile not_sandboxed/2.           % +User, +App
  132
  % Option declarations for the public API.  The second argument of
  % predicate_options/3 is the (1-based) position of the option list in
  % the predicate's argument vector; pass_to(PI, Arg) states that the
  % remaining options are handed to argument Arg of PI.

  :- predicate_options(pengine_create/1, 1,
                       [ id(-atom),
                         alias(atom),
                         application(atom),
                         destroy(boolean),
                         server(atom),
                         ask(compound),
                         template(compound),
                         chunk(integer),
                         bindings(list),
                         src_list(list),
                         src_text(any),           % text
                         src_url(atom),
                         src_predicates(list)
                       ]).
  :- predicate_options(pengine_ask/3, 3,
                       [ template(any),
                         chunk(integer),
                         bindings(list)
                       ]).
  :- predicate_options(pengine_next/2, 2,
                       [ chunk(integer),
                         pass_to(pengine_send/3, 3)
                       ]).
  :- predicate_options(pengine_stop/2, 2,
                       [ pass_to(pengine_send/3, 3)
                       ]).
  % pengine_respond(+Pengine, +Input, +Options): the option list is the
  % THIRD argument (it was previously declared as argument 2, which is
  % the Input term).
  :- predicate_options(pengine_respond/3, 3,
                       [ pass_to(pengine_send/3, 3)
                       ]).
  :- predicate_options(pengine_rpc/3, 3,
                       [ chunk(integer),
                         pass_to(pengine_create/1, 1)
                       ]).
  :- predicate_options(pengine_send/3, 3,
                       [ delay(number)
                       ]).
  :- predicate_options(pengine_event/2, 2,
                       [ pass_to(thread_get_message/3, 3)
                       ]).
  :- predicate_options(pengine_pull_response/2, 2,
                       [ pass_to(http_open/3, 3)
                       ]).
  :- predicate_options(pengine_event_loop/2, 2,
                       []).                       % not yet implemented
  178
  % :- debug(pengine(transition)).
  :- debug(pengine(debug)).               % handle pengine_debug in pengine_rpc/3.

  % Compile-time rewrite of the goal `random_delay`.  When the debug
  % topic pengine(delay) is active while the code is being compiled,
  % the goal becomes a call to do_random_delay/0; otherwise it is
  % replaced by `true`.
  goal_expansion(random_delay, Expanded) :-
      debugging(pengine(delay)),
      !,
      Expanded = do_random_delay.
  goal_expansion(random_delay, true).
  187
  % Sleep for a random period of less than 20 milliseconds.
  do_random_delay :-
      Millis is random(20),
      Seconds is Millis/1000,
      sleep(Seconds).
  191
  % Internal meta predicates: `0` marks a goal argument, `1` a closure
  % called with one additional argument.
  :- meta_predicate solve(+, ?, 0, +).
  :- meta_predicate findnsols_no_empty(+, ?, 0, -).
  :- meta_predicate pengine_event_loop(+, 1, +).
 pengine_create(:Options) is det
Creates a new pengine. Valid options are:
id(-ID)
ID gets instantiated to the id of the created pengine. ID is atomic.
alias(+Name)
The pengine is named Name (an atom). A slave pengine (child) can subsequently be referred to by this name.
application(+Application)
Application in which the pengine runs. See pengine_application/1.
server(+URL)
The pengine will run in (and in the Prolog context of) the pengine server located at URL.
src_list(+List_of_clauses)
Inject a list of Prolog clauses into the pengine.
src_text(+Atom_or_string)
Inject the clauses specified by a source text into the pengine.
src_url(+URL)
Inject the clauses specified in the file located at URL into the pengine.
src_predicates(+List)
Send the local predicates denoted by List to the remote pengine. List is a list of predicate indicators.

Remaining options are passed to http_open/3 (meaningful only for non-local pengines) and thread_create/3. Note that for thread_create/3 only options changing the stack sizes can be used. In particular, do not pass the detached or alias options.

Successful creation of a pengine will return an event term of the following form:

create(ID, Term)
ID is the id of the pengine that was created. Term is not used at the moment.

An error will be returned if the pengine could not be created:

error(ID, Term)
ID is invalid, since no pengine was created. Term is the exception's error term. */
  % Create a pengine.  Local source options are first rewritten into a
  % single src_text option; the presence of a server(URL) option then
  % decides between creating a remote and a local pengine.
  pengine_create(Module:RawOptions) :-
      translate_local_sources(RawOptions, Options, Module),
      (   select_option(server(ServerURL), Options, OtherOptions)
      ->  remote_pengine_create(ServerURL, OtherOptions)
      ;   local_pengine_create(Options)
      ).
 translate_local_sources(+OptionsIn, -Options, +Module) is det
Translate the src_predicates and src_list options into src_text. We need to do that anyway for remote pengines. For local pengines, we could avoid this step, but there is very little point in transferring source to a local pengine anyway as local pengines can access any Prolog predicate that you make visible to the application.

Multiple sources are concatenated to end up with a single src_text option.

  % translate_local_sources(+OptionsIn, -Options, +Module)
  %
  % Remove the src_predicates/1, src_list/1 and src_text/1 options from
  % OptionsIn and, if there were any, put one src_text/1 option holding
  % their concatenated text in front of the remaining options.
  translate_local_sources(OptionsIn, Options, Module) :-
      translate_local_sources(OptionsIn, Texts, OtherOptions, Module),
      sources_to_src_text(Texts, OtherOptions, Options).

  % No source: options unchanged.  One source: use it as is.  Several
  % sources: concatenate them into a single string.
  sources_to_src_text(Texts, Options0, Options) :-
      Texts == [],
      !,
      Options = Options0.
  sources_to_src_text(Texts, Options0, Options) :-
      Texts = [Text],
      !,
      Options = [src_text(Text)|Options0].
  sources_to_src_text(Texts, Options0, Options) :-
      atomics_to_string(Texts, Text),
      Options = [src_text(Text)|Options0].

  % translate_local_sources(+OptionsIn, -Texts, -OtherOptions, +Module)
  %
  % Split OptionsIn into the text of the source options (in their
  % original order) and all other options.  Unbound list elements are
  % kept as ordinary options.
  translate_local_sources([], [], [], _).
  translate_local_sources([Option|Rest], Texts, Options, Module) :-
      (   nonvar(Option),
          translate_local_source(Option, Text, Module)
      ->  Texts = [Text|Texts1],
          Options = Options1
      ;   Texts = Texts1,
          Options = [Option|Options1]
      ),
      translate_local_sources(Rest, Texts1, Options1, Module).

  % translate_local_source(+Option, -Text, +Module)
  %
  % Text is the source text denoted by a single source option.  Fails
  % for any other option.
  translate_local_source(src_predicates(PIs), Text, Module) :-
      must_be(list, PIs),
      with_output_to(string(Text),
                     maplist(list_in_module(Module), PIs)).
  translate_local_source(src_list(Clauses), Text, _) :-
      must_be(list, Clauses),
      with_output_to(string(Text),
                     write_source_clauses(Clauses)).
  translate_local_source(src_text(Text), Text, _).

  % Write every clause in canonical form, followed by " ." and a newline.
  write_source_clauses(Clauses) :-
      forall(member(Clause, Clauses),
             format('~k .~n', [Clause])).

  % List the predicate PI as it is defined in module M.
  list_in_module(M, PI) :-
      listing(M:PI).
 pengine_send(+NameOrID, +Term) is det
Same as pengine_send(NameOrID, Term, []). */
  % Send Event to the pengine NameOrID without any options.
  pengine_send(NameOrID, Event) :-
      pengine_send(NameOrID, Event, []).
 pengine_send(+NameOrID, +Term, +Options) is det
Succeeds immediately and places Term in the queue of the pengine NameOrID. Options is a list of options:
delay(+Time)
The actual sending is delayed by Time seconds. Time is an integer or a float.

Any remaining options are passed to http_open/3. */

  % Queue Event for the pengine Target.  Target must be an atom; the
  % atom `self` addresses the calling thread's own queue.
  pengine_send(Target, Event, Options) :-
      must_be(atom, Target),
      pengine_send2(Target, Event, Options).

  % `self`: deliver to the message queue of the current thread.
  pengine_send2(self, Event, Options) :-
      !,
      thread_self(Me),
      delay_message(queue(Me), Event, Options).
  % Otherwise: translate a child name into the child's id if child/2
  % knows the name, and use the given atom as the id if it does not.
  pengine_send2(NameOrID, Event, Options) :-
      (   child(NameOrID, Child)
      ->  ID = Child
      ;   ID = NameOrID
      ),
      delay_message(pengine(ID), Event, Options).
  337
  % Deliver Event to Target.  With option delay(Seconds) the delivery
  % is scheduled through alarm/4 and this predicate returns at once;
  % without it the message is sent immediately (after the optional
  % debugging delay).
  delay_message(Target, Event, Options) :-
      (   option(delay(Delay), Options)
      ->  alarm(Delay,
                send_message(Target, Event, Options),
                _AlarmID,
                [remove(true)])
      ;   random_delay,
          send_message(Target, Event, Options)
      ).

  % Perform the actual delivery: to a thread queue, to a remote
  % pengine, or to the thread of a local pengine.  Raises an
  % existence_error if the pengine is neither remote nor local.
  send_message(queue(Queue), Event, _) :-
      thread_send_message(Queue, pengine_request(Event)).
  send_message(pengine(Pengine), Event, Options) :-
      pengine_remote(Pengine, Server),
      !,
      remote_pengine_send(Server, Pengine, Event, Options).
  send_message(pengine(Pengine), Event, _) :-
      pengine_thread(Pengine, Thread),
      !,
      thread_send_message(Thread, pengine_request(Event)).
  send_message(pengine(Pengine), _, _) :-
      existence_error(pengine, Pengine).
 pengine_request(-Request) is det
To be used by a pengine to wait for the next request. Such messages are placed in the queue by pengine_send/2.
  % Wait for the next pengine_request/1 message in the queue of the
  % current thread.  The wait is bounded by the idle_limit setting of
  % the pengine's application; if nothing arrives in time, Request is
  % `destroy`.
  pengine_request(Request) :-
      thread_self(Me),
      pengine_self(Pengine),
      get_pengine_application(Pengine, App),
      setting(App:idle_limit, MaxIdle),
      (   thread_get_message(Me, pengine_request(Request), [timeout(MaxIdle)])
      ->  true
      ;   Request = destroy
      ).
 pengine_reply(+Event) is det
 pengine_reply(+Queue, +Event) is det
Reply Event to the parent of the current Pengine or the given Queue. Such events are read by the other side with pengine_event/2.

If the message cannot be sent within the idle_limit setting of the pengine, abort the pengine.

  % pengine_reply(+Event): send Event to the queue that pengine_parent/1
  % reports for the current pengine.
  385pengine_reply(Event) :-
  386    pengine_parent(Queue),
  387    pengine_reply(Queue, Event).
  388
  % pengine_reply(+Queue, +Event): send Event to Queue as a
  % pengine_event(ID, Event) message, where ID is the first argument of
  % the event term.
  %
  % Once the global variable pengine_idle_limit_exceeded has been set to
  % true (see below), further replies are silently dropped.
  389pengine_reply(_Queue, _Event0) :-
  390    nb_current(pengine_idle_limit_exceeded, true),
  391    !.
  392pengine_reply(Queue, Event0) :-
  393    arg(1, Event0, ID),
  % A pending create event for ID (if any) replaces the event, see
  % wrap_first_answer/3.
  394    wrap_first_answer(ID, Event0, Event),
  395    random_delay,
  396    debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
  % If this thread is the pengine ID itself and the pengine is not
  % detached, the send is limited by the application's idle_limit.
  397    (   pengine_self(ID),
  398        \+ pengine_detached(ID, _)
  399    ->  get_pengine_application(ID, Application),
  400        setting(Application:idle_limit, IdleLimit),
  401        debug(pengine(reply), 'Sending ~p, timout: ~q', [Event, IdleLimit]),
  402        (   thread_send_message(Queue, pengine_event(ID, Event),
  403                                [ timeout(IdleLimit)
  404                                ])
  405        ->  true
  % Timeout: remember that the limit was exceeded (so that later
  % replies are dropped), detach this thread and abort it.
  406        ;   thread_self(Me),
  407            debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
  408                  [ID, Me]),
  409            nb_setval(pengine_idle_limit_exceeded, true),
  410            thread_detach(Me),
  411            abort
  412        )
  % Otherwise send without a timeout.
  413    ;   thread_send_message(Queue, pengine_event(ID, Event))
  414    ).
  415
  % wrap_first_answer(+ID, +Event0, -Event)
  %
  % If a wrap_first_answer_in_create_event/2 fact is pending whose
  % event has ID as its first argument and whose answer slot unifies
  % with Event0, consume that fact and return its (now filled) event.
  % Otherwise Event is Event0.
  wrap_first_answer(ID, Event0, Event) :-
      (   wrap_first_answer_in_create_event(Event, [answer(Event0)]),
          arg(1, Event, ID)
      ->  retract(wrap_first_answer_in_create_event(Event, [answer(Event0)]))
      ;   Event = Event0
      ).
  422
  423
  % Discard all messages that are waiting in the parent's queue and
  % report the number of discarded messages on debug topic
  % pengine(abort).
  empty_queue :-
      pengine_parent(Queue),
      empty_queue(Queue, 0, Discarded),
      debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]).

  % empty_queue(+Queue, +Count0, -Count)
  %
  % Remove messages from Queue without waiting until it is empty.
  % Count is Count0 plus the number of messages removed.
  empty_queue(Queue, Count0, Count) :-
      (   thread_get_message(Queue, _Message, [timeout(0)])
      ->  Count1 is Count0+1,
          empty_queue(Queue, Count1, Count)
      ;   Count = Count0
      ).
 pengine_ask(+NameOrID, @Query, +Options) is det
Asks pengine NameOrID a query Query.

Options is a list of options:

template(+Template)
Template is a variable (or a term containing variables) shared with the query. By default, the template is identical to the query.
chunk(+Integer)
Retrieve solutions in chunks of Integer rather than one by one. 1 means no chunking (default). Other integers indicate the maximum number of solutions to retrieve in one chunk.
bindings(+Bindings)
Sets the global variable '$variable_names' to a list of Name = Var terms, providing access to the actual variable names.

Any remaining options are passed to pengine_send/3.

Note that the predicate pengine_ask/3 is deterministic, even for queries that have more than one solution. Also, the variables in Query will not be bound. Instead, results will be returned in the form of event terms.

success(ID, Terms, Projection, Time, More)
ID is the id of the pengine that succeeded in solving the query. Terms is a list holding instantiations of Template. Projection is a list of variable names that should be displayed. Time is the CPU time used to produce the results and finally, More is either true or false, indicating whether we can expect the pengine to be able to return more solutions or not, would we call pengine_next/2.
failure(ID)
ID is the id of the pengine that failed for lack of solutions.
error(ID, Term)
ID is the id of the pengine throwing the exception. Term is the exception's error term.
output(ID, Term)
ID is the id of a pengine running the query that called pengine_output/1. Term is the term that was passed in the first argument of pengine_output/1 when it was called.
prompt(ID, Term)
ID is the id of the pengine that called pengine_input/2 and Term is the prompt.

Defined in terms of pengine_send/3, like so:

pengine_ask(ID, Query, Options) :-
    partition(pengine_ask_option, Options, AskOptions, SendOptions),
    pengine_send(ID, ask(Query, AskOptions), SendOptions).

*/

  % Ask pengine ID to run Query.  Options that belong to the query
  % travel inside the ask/2 event; all other options are passed on to
  % pengine_send/3.
  pengine_ask(ID, Query, Options) :-
      partition(pengine_ask_option, Options, QueryOptions, TransportOptions),
      pengine_send(ID, ask(Query, QueryOptions), TransportOptions).


  % True when Option is one of the options that is carried in the
  % ask/2 event.
  pengine_ask_option(Option) :-
      member(Option,
             [ template(_),
               chunk(_),
               bindings(_),
               breakpoints(_)
             ]).
 pengine_next(+NameOrID, +Options) is det
Asks pengine NameOrID for the next solution to a query started by pengine_ask/3. Defined options are:
chunk(+Count)
Modify the chunk-size to Count before asking the next set of solutions.

Remaining options are passed to pengine_send/3. The result of re-executing the current goal is returned to the caller's message queue in the form of event terms.

success(ID, Terms, Projection, Time, More)
See pengine_ask/3.
failure(ID)
ID is the id of the pengine that failed for lack of more solutions.
error(ID, Term)
ID is the id of the pengine throwing the exception. Term is the exception's error term.
output(ID, Term)
ID is the id of a pengine running the query that called pengine_output/1. Term is the term that was passed in the first argument of pengine_output/1 when it was called.
prompt(ID, Term)
ID is the id of the pengine that called pengine_input/2 and Term is the prompt.

Defined in terms of pengine_send/3, as follows:

pengine_next(ID, Options) :-
    pengine_send(ID, next, Options).

*/

  % Ask pengine ID for more solutions.  A chunk(Count) option is taken
  % out of Options and sent as next(Count); without it the plain
  % `next` event is sent.
  pengine_next(ID, Options) :-
      (   select_option(chunk(Count), Options, SendOptions)
      ->  pengine_send(ID, next(Count), SendOptions)
      ;   pengine_send(ID, next, Options)
      ).
 pengine_stop(+NameOrID, +Options) is det
Tells pengine NameOrID to stop looking for more solutions to a query started by pengine_ask/3. Options are passed to pengine_send/3.

Defined in terms of pengine_send/3, like so:

pengine_stop(ID, Options) :-
    pengine_send(ID, stop, Options).

*/

  % Send the `stop` event to pengine ID.
  pengine_stop(ID, Options) :-
      pengine_send(ID, stop, Options).
 pengine_abort(+NameOrID) is det
Aborts the running query. The pengine goes back to state `2', waiting for new queries.
See also
- pengine_destroy/1. */
  % Abort the query that pengine Name is running.  Name is either the
  % name of a child (see child/2) or a pengine id.
  pengine_abort(Name) :-
      (   child(Name, Child)
      ->  Pengine = Child
      ;   Pengine = Name
      ),
      abort_pengine_query(Pengine).

  % A remote pengine is aborted through its server.  A local pengine
  % is interrupted by making its thread throw abort_query; errors from
  % signalling the thread are ignored.
  abort_pengine_query(Pengine) :-
      pengine_remote(Pengine, Server),
      !,
      remote_pengine_abort(Server, Pengine, []).
  abort_pengine_query(Pengine) :-
      pengine_thread(Pengine, Thread),
      debug(pengine(abort), 'Signalling thread ~p', [Thread]),
      catch(thread_signal(Thread, throw(abort_query)), _, true).
 pengine_destroy(+NameOrID) is det
 pengine_destroy(+NameOrID, +Options) is det
Destroys the pengine NameOrID. With the option force(true), the pengine is killed using abort/0 and pengine_destroy/2 succeeds. */
  % Destroy pengine NameOrID using default options.
  pengine_destroy(NameOrID) :-
      pengine_destroy(NameOrID, []).

  % pengine_destroy(+NameOrID, +Options)
  %
  % With option force(true), signal `abort` to the thread of the
  % pengine if it has one; a thread that no longer exists is not an
  % error.  Without it, send the `destroy` event; if the pengine does
  % not exist, the matching child/2 facts are removed instead.
  pengine_destroy(Name, Options) :-
      (   child(Name, Child)
      ->  ID = Child
      ;   ID = Name
      ),
      (   option(force(true), Options)
      ->  (   pengine_thread(ID, Thread)
          ->  catch(thread_signal(Thread, abort),
                    error(existence_error(thread, _), _), true)
          ;   true
          )
      ;   catch(pengine_send(Name, destroy),
                error(existence_error(pengine, Name), _),
                retractall(child(_,Name)))
      ).
  622
  623
  624/*================= pengines administration =======================
  625*/
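 current_pengine(?Id, ?Parent, ?Thread, ?URL, ?Application, ?Destroy)
Dynamic predicate that registers our known pengines. Id is an atomic unique datatype. Parent identifies the parent: the message queue to which the pengine reports. Thread is the thread that runs a local pengine, or 0 for a remote pengine. URL is the location of the pengine, Application is the application it runs in and Destroy is the value of its destroy option.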
  % Administration of the known pengines.  All of it is dynamic and
  % volatile.
  :- dynamic  current_pengine/6.          % Id, ParentId, Thread, URL, App, Destroy
  :- volatile current_pengine/6.
  :- dynamic  pengine_queue/4.            % Id, Queue, TimeOut, Time
  :- volatile pengine_queue/4.
  :- dynamic  output_queue/3.             % Id, Queue, Time
  :- volatile output_queue/3.
  :- dynamic  pengine_user/2.             % Id, User
  :- volatile pengine_user/2.
  :- dynamic  pengine_data/2.             % Id, Data
  :- volatile pengine_data/2.
  :- dynamic  pengine_detached/2.         % Id, Data
  :- volatile pengine_detached/2.

  % The children of a pengine are private to its thread.
  :- thread_local child/2.                % ?Name, ?Child
 pengine_register_local(+Id, +Thread, +Queue, +URL, +App, +Destroy) is det
 pengine_register_remote(+Id, +URL, +App, +Destroy) is det
 pengine_unregister(+Id) is det
  % Register a pengine that runs in Thread of this process and reports
  % to Queue.
  pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
      Record = current_pengine(Id, Queue, Thread, URL, Application, Destroy),
      asserta(Record).

  % Register a pengine that runs on the server at URL.  The calling
  % thread is recorded as its parent and 0 takes the place of the
  % thread.
  pengine_register_remote(Id, URL, Application, Destroy) :-
      thread_self(Me),
      Record = current_pengine(Id, Me, 0, URL, Application, Destroy),
      asserta(Record).
 pengine_unregister(+Id)
Called by the pengine thread destruction. If we are a remote pengine thread, our URL equals http and the queue is the message queue used to send events to the HTTP workers.
  % Remove the registration of pengine Id, which must be running in
  % the calling thread.  If the pengine is registered with URL `http`,
  % its queue is first destroyed under the mutex `pengine`.
  pengine_unregister(Id) :-
      thread_self(Me),
      (   current_pengine(Id, Queue, Me, http, _, _)
      ->  with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue))
      ;   true
      ),
      maplist(retractall,
              [ current_pengine(Id, _, Me, _, _, _),
                pengine_user(Id, _),
                pengine_data(Id, _)
              ]).

  % Remove the registration of the remote pengine Id (thread field 0).
  pengine_unregister_remote(Id) :-
      retractall(current_pengine(Id, _, 0, _, _, _)).
 pengine_self(-Id) is det
True if the current thread is a pengine with Id.
  % True when Id is registered as a pengine running in the calling
  % thread.
  pengine_self(Id) :-
      thread_self(Me),
      current_pengine(Id, _, Me, _, _, _).

  % Parent is the value of the global variable pengine_parent.
  pengine_parent(Parent) :-
      nb_getval(pengine_parent, Parent).

  % Thread is the thread of the local pengine Pengine.  Registrations
  % with thread field 0 (remote pengines) are skipped.
  pengine_thread(Pengine, Thread) :-
      current_pengine(Pengine, _, Thread, _, _, _),
      Thread \== 0,
      !.

  % URL is the server of the remote pengine Pengine.
  pengine_remote(Pengine, URL) :-
      current_pengine(Pengine, _, 0, URL, _, _).

  % Application is the application of the first registration found
  % for Pengine.
  get_pengine_application(Pengine, Application) :-
      current_pengine(Pengine, _, _, _, Application, _),
      !.

  % The module of a pengine has the same name as the pengine.
  get_pengine_module(Pengine, Pengine).
  708
  % pengine_uuid(-Id)
  %
  % Id is a fresh identifier for a pengine.  If uuid/2 is available
  % when this file is compiled, Id is a version 4 UUID.  Otherwise Id
  % is the text of a random non-negative integer.
  :- if(current_predicate(uuid/2)).
  pengine_uuid(Id) :-
      uuid(Id, [version(4)]).             % Version 4 is random.
  :- else.
  pengine_uuid(Id) :-
      (   current_prolog_flag(max_integer, MaxInt)
      ->  Upper is MaxInt-1
      ;   Upper is 1<<128
      ),
      random_between(0, Upper, Number),
      atom_number(Id, Number).
  :- endif.
 protect_pengine(+Id, :Goal) is semidet
Run Goal while protecting the Pengine Id from being destroyed. Used by the HTTP I/O routines to avoid that the Pengine's module disappears while I/O is in progress. We use a pool of locks because the lock may be held relatively long by output routines.

This also runs Goal if the Pengine no longer exists. This deals with Pengines terminated through destroy_or_continue/1.

bug
- After destroy_or_continue/1 takes the destroy route, the module may drop-out at any point in time, resulting in a possible crash. Seems the only safe way out is to do (de)serialization inside the Pengine.
  :- meta_predicate protect_pengine(+, 0).

  % protect_pengine(+Id, :Goal)
  %
  % Run Goal while holding one of 64 mutexes named pengine_done_<N>,
  % where N is derived from the hash of Id.  Goal is run regardless of
  % whether the pengine still has a thread.
  protect_pengine(Id, Goal) :-
      term_hash(Id, Hash),
      Slot is Hash mod 64,
      format(atom(Mutex), 'pengine_done_~d', [Slot]),
      with_mutex(Mutex,
                 (   pengine_thread(Id, _)
                 ->  Goal
                 ;   Goal
                 )).
 pengine_application(+Application) is det
Directive that must be used to declare a pengine application module. The module must not be associated to any file. The default application is pengine_sandbox. The example below creates a new application address_book and imports the API defined in the module file address_book_api.pl into the application.
:- pengine_application(address_book).
:- use_module(address_book:address_book_api).

*/

  % pengine_application/1 only has a meaning as a directive, where it
  % is handled by term expansion.  Calling it as a goal raises a
  % context_error.
  pengine_application(Application) :-
      Culprit = pengine_application(Application),
      throw(error(context_error(nodirective, Culprit), _)).

  :- multifile system:term_expansion/2.
  :- multifile current_application/1.
 current_pengine_application(?Application) is nondet
True when Application is a currently defined application.
See also
- pengine_application/1
  % Enumerate the applications recorded in current_application/1.
  current_pengine_application(App) :-
      current_application(App).
  780
  781
  % Default settings for all applications

  :- setting(thread_pool_size,
             integer,
             100,
             'Maximum number of pengines this application can run.').
  :- setting(thread_pool_stacks,
             list(compound),
             [],
             'Maximum stack sizes for pengines this application can run.').
  :- setting(slave_limit,
             integer,
             3,
             'Maximum number of slave pengines a master pengine can create.').
  :- setting(time_limit,
             number,
             300,
             'Maximum time to wait for output').
  :- setting(idle_limit,
             number,
             300,
             'Pengine auto-destroys when idle for this time').
  :- setting(safe_goal_limit,
             number,
             10,
             'Maximum time to try proving safety of the goal').
  :- setting(program_space,
             integer,
             100_000_000,
             'Maximum memory used by predicates').
  :- setting(allow_from,
             list(atom),
             [*],
             'IP addresses from which remotes are allowed to connect').
  :- setting(deny_from,
             list(atom),
             [],
             'IP addresses from which remotes are NOT allowed to connect').
  :- setting(debug_info,
             boolean,
             false,
             'Keep information to support source-level debugging').
  805
  806system:term_expansion((:- pengine_application(Application)), Expanded) :-
  807    must_be(atom, Application),
  808    (   module_property(Application, file(_))
  809    ->  permission_error(create, pengine_application, Application)
  810    ;   true
  811    ),
  812    expand_term((:- setting(Application:thread_pool_size, integer,
  813                            setting(pengines:thread_pool_size),
  814                            'Maximum number of pengines this \c
  815                            application can run.')),
  816                ThreadPoolSizeSetting),
  817    expand_term((:- setting(Application:thread_pool_stacks, list(compound),
  818                            setting(pengines:thread_pool_stacks),
  819                            'Maximum stack sizes for pengines \c
  820                            this application can run.')),
  821                ThreadPoolStacksSetting),
  822    expand_term((:- setting(Application:slave_limit, integer,
  823                            setting(pengines:slave_limit),
  824                            'Maximum number of local slave pengines \c
  825                            a master pengine can create.')),
  826                SlaveLimitSetting),
  827    expand_term((:- setting(Application:time_limit, number,
  828                            setting(pengines:time_limit),
  829                            'Maximum time to wait for output')),
  830                TimeLimitSetting),
  831    expand_term((:- setting(Application:idle_limit, number,
  832                            setting(pengines:idle_limit),
  833                            'Pengine auto-destroys when idle for this time')),
  834                IdleLimitSetting),
  835    expand_term((:- setting(Application:safe_goal_limit, number,
  836                            setting(pengines:safe_goal_limit),
  837                            'Maximum time to try proving safety of the goal')),
  838                SafeGoalLimitSetting),
  839    expand_term((:- setting(Application:program_space, integer,
  840                            setting(pengines:program_space),
  841                            'Maximum memory used by predicates')),
  842                ProgramSpaceSetting),
  843    expand_term((:- setting(Application:allow_from, list(atom),
  844                            setting(pengines:allow_from),
  845                            'IP addresses from which remotes are allowed \c
  846                            to connect')),
  847                AllowFromSetting),
  848    expand_term((:- setting(Application:deny_from, list(atom),
  849                            setting(pengines:deny_from),
  850                            'IP addresses from which remotes are NOT \c
  851                            allowed to connect')),
  852                DenyFromSetting),
  853    expand_term((:- setting(Application:debug_info, boolean,
  854                            setting(pengines:debug_info),
  855                            'Keep information to support source-level \c
  856                            debugging')),
  857                DebugInfoSetting),
  858    flatten([ pengines:current_application(Application),
  859              ThreadPoolSizeSetting,
  860              ThreadPoolStacksSetting,
  861              SlaveLimitSetting,
  862              TimeLimitSetting,
  863              IdleLimitSetting,
  864              SafeGoalLimitSetting,
  865              ProgramSpaceSetting,
  866              AllowFromSetting,
  867              DenyFromSetting,
  868              DebugInfoSetting
  869            ], Expanded).
  870
  871% Register default application
  %    `pengine_sandbox` is the application that local_pengine_create/1
  %    and remote_pengine_create/2 fall back to when the caller passes
  %    no application(_) option.
  872
  873:- pengine_application(pengine_sandbox).
 pengine_property(?Pengine, ?Property) is nondet
True when Property is a property of the given Pengine. Enumerates all pengines that are known to the calling Prolog process. Defined properties are:
self(ID)
Identifier of the pengine. This is the same as the first argument, and can be used to enumerate all known pengines.
alias(Name)
Name is the alias name of the pengine, as provided through the alias option when creating the pengine.
thread(Thread)
If the pengine is a local pengine, Thread is the Prolog thread identifier of the pengine.
remote(Server)
If the pengine is remote, the URL of the server.
application(Application)
Pengine runs the given application
module(Module)
Temporary module used for running the Pengine.
destroy(Destroy)
 Destroy is true if the pengine is destroyed automatically after completing the query.
parent(Queue)
Message queue to which the (local) pengine reports.
source(?SourceID, ?Source)
Source is the source code with the given SourceID. May be present if the setting debug_info is present.
detached(?Time)
Pengine was detached at Time. */
  % pengine_property(?Pengine, ?Property) is nondet.
  %
  % Front end for pengine_property2/2, whose argument order is
  % (Property, Pengine).  When both arguments are instantiated the
  % first clause commits to the first matching property, so a plain
  % check leaves no choice point behind.  In every other mode the
  % second clause enumerates solutions on backtracking.
  910pengine_property(Id, Prop) :-
  911    nonvar(Id), nonvar(Prop),
  912    pengine_property2(Prop, Id),      % property first, pengine second
  913    !.
  914pengine_property(Id, Prop) :-
  915    pengine_property2(Prop, Id).
  916
  % pengine_property2(?Property, ?Pengine) is nondet.
  %
  % Property table behind pengine_property/2.  Most properties are
  % projections of the current_pengine/6 record; alias/1 comes from
  % child/2, source/2 from pengine_data/2 and detached/1 from
  % pengine_detached/2.  Clause order fixes the enumeration order.
  917pengine_property2(self(Pengine), Pengine) :-
  918    current_pengine(Pengine, _, _, _, _, _).
  919pengine_property2(module(Pengine), Pengine) :-
  920    current_pengine(Pengine, _, _, _, _, _).
  921pengine_property2(alias(Name), Pengine) :-
  922    child(Name, Pengine),
  923    Name \== Pengine.
  924pengine_property2(thread(TID), Pengine) :-
  925    current_pengine(Pengine, _, TID, _, _, _),
  926    TID \== 0.
  927pengine_property2(remote(URL), Pengine) :-
  928    current_pengine(Pengine, _, 0, URL, _, _).
  929pengine_property2(application(App), Pengine) :-
  930    current_pengine(Pengine, _, _, _, App, _).
  931pengine_property2(destroy(Flag), Pengine) :-
  932    current_pengine(Pengine, _, _, _, _, Flag).
  933pengine_property2(parent(Queue), Pengine) :-
  934    current_pengine(Pengine, Queue, _, _, _, _).
  935pengine_property2(source(SrcId, Text), Pengine) :-
  936    pengine_data(Pengine, source(SrcId, Text)).
  937pengine_property2(detached(Time), Pengine) :-
  938    pengine_detached(Pengine, Time).
 pengine_output(+Term) is det
Sends Term to the parent pengine or thread. */
  % Wrap Term as output(Self, Term), Self being the calling pengine,
  % and hand it to pengine_reply/1.
  945pengine_output(Term) :-
  946    pengine_self(Self),
  947    pengine_reply(output(Self, Term)).
 pengine_debug(+Format, +Args) is det
Create a message using format/3 from Format and Args and send this to the client. The default JavaScript client will call console.log(Message) if there is a console. The predicate pengine_rpc/3 calls debug(pengine(debug), '~w', [Message]). The debug topic pengine(debug) is enabled by default.
See also
- debug/1 and nodebug/1 for controlling the pengine(debug) topic
- format/2 for format specifications */
  % Format a debug message and send debug(Self, Message) to the parent.
  % The format/3 call is first vetted with safe_goal/1; if that raises,
  % the exception text is sent instead of the formatted message.
  962pengine_debug(Format, Args) :-
  963    pengine_parent(Parent),
  964    pengine_self(Me),
  965    catch(safe_goal(format(atom(_), Format, Args)), Unsafe, true),
  966    (   nonvar(Unsafe)
  967    ->  message_to_string(Unsafe, Text)
  968    ;   format(atom(Text), Format, Args)
  969    ),
  970    pengine_reply(Parent, debug(Me, Text)).
  971
  972
  973/*================= Local pengine =======================
  974*/
 local_pengine_create(+Options)
 Creates a local Pengine, which is a thread running pengine_main/3. It maintains two predicates:
  % Create a pengine thread that reports to the calling thread and
  % record it in child/2 under its alias (the pengine id when no
  % alias(_) option is given).
  985local_pengine_create(Options) :-
  986    thread_self(Parent),
  987    option(application(App), Options, pengine_sandbox),
  988    create(Parent, Id, Options, local, App),
  989    option(alias(Alias), Options, Id),
  990    assert(child(Alias, Id)).
 thread_pool:create_pool(+Application) is det
On demand creation of a thread pool for a pengine application.
  % Clause for the multifile predicate thread_pool:create_pool/1.
  % Only applies to names registered through current_application/1;
  % pool size and stack options are read from the application's
  % thread_pool_size and thread_pool_stacks settings.
  997:- multifile thread_pool:create_pool/1.  998
  999thread_pool:create_pool(Application) :-
 1000    current_application(Application),
 1001    setting(Application:thread_pool_size, Size),
 1002    setting(Application:thread_pool_stacks, Stacks),
 1003    thread_pool_create(Application, Size, Stacks).
 create(+Queue, -Child, +Options, +URL, +Application) is det
Create a new pengine thread.
Arguments:
Queue- is the queue (or thread handle) to report to
Child- is the identifier of the created pengine.
URL- is one of local or http
 % create(+Queue, -Child, +Options, +URL, +Application)
 %
 % Both clauses make sure Child is bound (pengine_child_id/1) and then
 % delegate to create0/5.  For `local` pengines an exception raised by
 % create0/5 propagates to the caller.  For any other URL the
 % exception is caught and reported to Queue as error(Child, Error)
 % through create_error/3, so the call itself succeeds.
 1013create(Queue, Child, Options, local, Application) :-
 1014    !,
 1015    pengine_child_id(Child),
 1016    create0(Queue, Child, Options, local, Application).
 1017create(Queue, Child, Options, URL, Application) :-
 1018    pengine_child_id(Child),
 1019    catch(create0(Queue, Child, Options, URL, Application),
 1020          Error,
 1021          create_error(Queue, Child, Error)).
 1022
 % Keep a caller-supplied pengine id; otherwise generate a fresh one
 % with pengine_uuid/1.
 1023pengine_child_id(Child) :-
 1024    nonvar(Child),
 1025    !.
 1026pengine_child_id(Child) :-
 1027    pengine_uuid(Child).
 1028
 % Report a failed creation to Queue as the event error(Child, Error).
 1029create_error(Parent, Id, Err) :-
 1030    pengine_reply(Parent, error(Id, Err)).
 1031
 % create0(+Queue, +Child, +Options, +URL, +Application)
 %
 % Does the actual work for create/5:
 %   1. Raise an existence error if Application is not registered.
 %   2. Unless URL is `http`, count the child/2 facts and throw
 %      resource_error(max_pengines) when the application's
 %      slave_limit setting is reached.
 %   3. Split Options into pengine options (pengine_create_option/1)
 %      and the rest, which is passed on as thread options.
 %   4. Start pengine_main/3 in the thread pool named Application,
 %      register the pengine and only then send it
 %      pengine_registered(Child), the message pengine_main/3 blocks
 %      on before doing anything else.
 %   5. Unify an id(Id) option, if given, with Child.
 1032create0(Queue, Child, Options, URL, Application) :-
 1033    (  current_application(Application)
 1034    -> true
 1035    ;  existence_error(pengine_application, Application)
 1036    ),
 1037    (   URL \== http                    % pengine is _not_ a child of the
 1038                                        % HTTP server thread
 1039    ->  aggregate_all(count, child(_,_), Count),
 1040        setting(Application:slave_limit, Max),
 1041        (   Count >= Max
 1042        ->  throw(error(resource_error(max_pengines), _))
 1043        ;   true
 1044        )
 1045    ;   true
 1046    ),
 1047    partition(pengine_create_option, Options, PengineOptions, RestOptions),
 1048    thread_create_in_pool(
 1049        Application,
 1050        pengine_main(Queue, PengineOptions, Application), ChildThread,
 1051        [ at_exit(pengine_done)
 1052        | RestOptions
 1053        ]),
 1054    option(destroy(Destroy), PengineOptions, true),
 1055    pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy),
 1056    thread_send_message(ChildThread, pengine_registered(Child)),
 1057    (   option(id(Id), Options)
 1058    ->  Id = Child
 1059    ;   true
 1060    ).
 1061
 % pengine_create_option(?Option)
 %
 % Options that are meant for the pengine itself.  create0/5 and
 % remote_pengine_create/2 use this as the partition/4 predicate;
 % options not listed here are passed on as thread options (local)
 % or HTTP options (remote).
 1062pengine_create_option(src_text(_)).
 1063pengine_create_option(src_url(_)).
 1064pengine_create_option(application(_)).
 1065pengine_create_option(destroy(_)).
 1066pengine_create_option(ask(_)).
 1067pengine_create_option(template(_)).
 1068pengine_create_option(bindings(_)).
 1069pengine_create_option(chunk(_)).
 1070pengine_create_option(alias(_)).
 1071pengine_create_option(user(_)).
 pengine_done is det
Called from the pengine thread at_exit option. Destroys child pengines using pengine_destroy/1. Cleaning up the Pengine is synchronised by the pengine_done mutex. See read_event/6.
 % pengine_done is det.
 %
 % Installed by create0/5 as the at_exit/1 hook of the pengine thread
 % (presumably the reason for the `public` declaration).  Steps:
 %   - If the thread ended with exception '$aborted', detach it and
 %     report destroy(Pengine, abort(Pengine)) to the parent.  Any
 %     error(_,_) exception raised by that reply is ignored.
 %   - Destroy every pengine recorded in child/2.
 %   - Unregister this pengine inside protect_pengine/2.
 1080:- public
 1081    pengine_done/0. 1082
 1083pengine_done :-
 1084    thread_self(Me),
 1085    (   thread_property(Me, status(exception('$aborted'))),
 1086        thread_detach(Me),
 1087        pengine_self(Pengine)
 1088    ->  catch(pengine_reply(destroy(Pengine, abort(Pengine))),
 1089              error(_,_), true)
 1090    ;   true
 1091    ),
 1092    forall(child(_Name, Child),
 1093           pengine_destroy(Child)),
 1094    pengine_self(Id),
 1095    protect_pengine(Id, pengine_unregister(Id)).
 pengine_main(+Parent, +Options, +Application)
 Run a pengine main loop. First acknowledges its creation and then runs pengine_main_loop/1.
 % pengine_main(+Parent, +Options, +Application)
 %
 % Entry point of the pengine thread started by create0/5.
 %   - Blocks until create0/5 sends pengine_registered(Self); Self is
 %     the pengine id and becomes the name of the temporary module.
 %   - Stores Parent in the global variable `pengine_parent`.
 %   - Runs in_temporary_module/3 with pengine_prepare_source/2 and
 %     pengine_create_and_loop/3 as its two goals.
 %   - If either throws prepare_source_failed, the pengine is
 %     terminated through pengine_terminate/1.
 1103:- thread_local wrap_first_answer_in_create_event/2. 1104
 1105:- meta_predicate
 1106    pengine_prepare_source(:, +). 1107
 1108pengine_main(Parent, Options, Application) :-
 1109    fix_streams,
 1110    thread_get_message(pengine_registered(Self)),
 1111    nb_setval(pengine_parent, Parent),
 1112    pengine_register_user(Options),
 1113    set_prolog_flag(mitigate_spectre, true),
 1114    catch(in_temporary_module(
 1115              Self,
 1116              pengine_prepare_source(Application, Options),
 1117              pengine_create_and_loop(Self, Application, Options)),
 1118          prepare_source_failed,
 1119          pengine_terminate(Self)).
 1120
 % pengine_create_and_loop(+Self, +Application, +Options)
 %
 % Announce the creation of the pengine and enter the main loop.
 % The create event carries slave_limit(Limit) followed by the open
 % tail Extra.
 %   - With an ask(Query) option the event is not sent here.  It is
 %     stored in wrap_first_answer_in_create_event/2 (presumably so
 %     the first answer can be delivered inside it) and the query is
 %     started through pengine_ask/3.  A query given as a string is
 %     parsed first by ask_to_term/5; a parse error is reported with
 %     send_error/1 and turned into prepare_source_failed, which
 %     pengine_main/3 catches.
 %   - Without ask(_), Extra is closed with [] and the create event
 %     is sent immediately.
 1121pengine_create_and_loop(Self, Application, Options) :-
 1122    setting(Application:slave_limit, SlaveLimit),
 1123    CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]),
 1124    (   option(ask(Query0), Options)
 1125    ->  asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)),
 1126        (   string(Query0)                      % string is not callable
 1127        ->  (   option(template(TemplateS), Options)
 1128            ->  Ask2 = Query0-TemplateS
 1129            ;   Ask2 = Query0
 1130            ),
 1131            catch(ask_to_term(Ask2, Self, Query, Template, Bindings),
 1132                  Error, true),
 1133            (   var(Error)
 1134            ->  true
 1135            ;   send_error(Error),
 1136                throw(prepare_source_failed)
 1137            )
 1138        ;   Query = Query0,
 1139            option(template(Template), Options, Query),
 1140            option(bindings(Bindings), Options, [])
 1141        ),
 1142        option(chunk(Chunk), Options, 1),
 1143        pengine_ask(Self, Query,
 1144                    [ template(Template),
 1145                      chunk(Chunk),
 1146                      bindings(Bindings)
 1147                    ])
 1148    ;   Extra = [],
 1149        pengine_reply(CreateEvent)
 1150    ),
 1151    pengine_main_loop(Self).
 ask_to_term(+AskSpec, +Module, -Query, -Template, -Bindings) is det
 Translate the AskSpec into a query, template and bindings. The trick is that we must parse using the operator declarations of the source and we must make sure variable sharing between query and answer template is known.
 % ask_to_term(+AskSpec, +Module, -Query, -Template, -Bindings)
 %
 % AskSpec is either AskText-TemplateText or just AskText.
 %   - Pair: both texts are embedded in one string `t((T),(A))` and
 %     parsed with a single term_string/3 call, so variables shared
 %     by name between template and query become the same variable.
 %     Bindings keeps only the Name=Var pairs whose variable occurs
 %     in the template (template_bindings//2).
 %   - Text only: the bindings accepted by anon/1 are dropped
 %     (presumably the `_`-prefixed names -- confirm against anon/1)
 %     and the template is a dict tagged swish_default_template
 %     built from the remaining bindings.
 % Parsing uses Module for operator declarations (module/1 option).
 1161ask_to_term(Ask-Template, Module, Ask1, Template1, Bindings) :-
 1162    !,
 1163    format(string(AskTemplate), 't((~s),(~s))', [Template, Ask]),
 1164    term_string(t(Template1,Ask1), AskTemplate,
 1165                [ variable_names(Bindings0),
 1166                  module(Module)
 1167                ]),
 1168    phrase(template_bindings(Template1, Bindings0), Bindings).
 1169ask_to_term(Ask, Module, Ask1, Template, Bindings1) :-
 1170    term_string(Ask1, Ask,
 1171                [ variable_names(Bindings),
 1172                  module(Module)
 1173                ]),
 1174    exclude(anon, Bindings, Bindings1),
 1175    dict_create(Template, swish_default_template, Bindings1).
 1176
 % template_bindings(+Term, +Names)// is det.
 %
 % Walk Term and emit, for each variable occurrence that has a
 % Name=Var entry in Names, that entry.  Lists are traversed element
 % by element, compound terms through their argument list, and
 % anything else contributes nothing.
 1177template_bindings(V, Names) -->
 1178    { var(V) },
 1179    !,
 1180    (   { var_binding(Names, V, Pair) }
 1181    ->  [Pair]
 1182    ;   []
 1183    ).
 1184template_bindings([Head|Tail], Names) -->
 1185    !,
 1186    template_bindings(Head, Names),
 1187    template_bindings(Tail, Names).
 1188template_bindings(Term, Names) -->
 1189    { compound(Term) },
 1190    !,
 1191    { compound_name_arguments(Term, _, Args) },
 1192    template_bindings(Args, Names).
 1193template_bindings(_, _) --> [].
 1194
 % Pair is the first element of Names whose second argument is
 % identical (==) to Var.
 1195var_binding(Names, Var, Pair) :-
 1196    member(Pair, Names),
 1197    arg(2, Pair, Value),
 1198    Value == Var, !.
 fix_streams is det
If we are a pengine that is created from a web server thread, the current output points to a CGI stream.
 % If current_output is a CGI stream, make that alias refer to
 % user_output instead; otherwise leave the streams untouched.
 1205fix_streams :-
 1206    fix_stream(current_output).
 1207
 1208fix_stream(Alias) :-
 1209    is_cgi_stream(Alias),
 1210    !,
 1211    debug(pengine(stream), '~w is a CGI stream!', [Alias]),
 1212    set_stream(user_output, alias(Alias)).
 1213fix_stream(_).
 pengine_prepare_source(:Application, +Options) is det
Load the source into the pengine's module.
throws
- prepare_source_failed if it failed to prepare the sources.
 % pengine_prepare_source(:Application, +Options)
 %
 % Module is the pengine's temporary module.  Limit its program
 % space from the application setting, swap the `user` import for
 % Application and load the sources through prep_module/3.  Any
 % exception from loading is reported with send_error/1 and
 % replaced by prepare_source_failed.
 1222pengine_prepare_source(Module:Application, Options) :-
 1223    setting(Application:program_space, Limit),
 1224    set_module(Module:program_space(Limit)),
 1225    delete_import_module(Module, user),
 1226    add_import_module(Module, Application, start),
 1227    catch(prep_module(Module, Application, Options), Err, true),
 1228    (   nonvar(Err)
 1229    ->  send_error(Err),
 1230        throw(prepare_source_failed)
 1231    ;   true
 1232    ).
 1233
 % prep_module(+Module, +Application, +Options)
 %
 % Copy the var_prefix flag from Application to Module, run every
 % solution of the prepare_module/3 hook, then process the create
 % options (src_text/src_url) with Module as the source module.
 % The previous source module is restored afterwards, also when
 % loading raises an exception.
 1234prep_module(Module, Application, Options) :-
 1235    maplist(copy_flag(Module, Application), [var_prefix]),
 1236    forall(prepare_module(Module, Application, Options), true),
 1237    setup_call_cleanup(
 1238        '$set_source_module'(OldModule, Module),
 1239        maplist(process_create_option(Module), Options),
 1240        '$set_source_module'(OldModule)).
 1241
 % Copy the module-qualified Prolog flag Flag from Application to
 % Module; do nothing if Application does not define it.
 1242copy_flag(Module, Application, Flag) :-
 1243    (   current_prolog_flag(Application:Flag, Value)
 1244    ->  set_prolog_flag(Module:Flag, Value)
 1245    ;   true
 1246    ).
 1247
 % process_create_option(+Module, +Option)
 %
 % Load src_text(Text) or src_url(URL) into Module (prep_module/3
 % passes the pengine's module here); ignore every other option.
 1248process_create_option(Module, src_text(Text)) :-
 1249    !,
 1250    pengine_src_text(Text, Module).
 1251process_create_option(Module, src_url(URL)) :-
 1252    !,
 1253    pengine_src_url(URL, Module).
 1254process_create_option(_, _).
 prepare_module(+Module, +Application, +Options) is semidet
Hook, called to initialize the temporary private module that provides the working context of a pengine. This hook is executed by the pengine's thread. Preparing the source consists of three steps:
  1. Add Application as (first) default import module for Module
  2. Call this hook
  3. Compile the source provided by the src_text and src_url options
Arguments:
Module- is a new temporary module (see in_temporary_module/3) that may be (further) prepared by this hook.
Application- (also a module) associated to the pengine.
Options- is passed from the environment and should (currently) be ignored.
 % Run guarded_main_loop/1; when the exception abort_query escapes
 % from it, discard pending messages (empty_queue/0) and report
 % abort(Pengine) through destroy_or_continue/1.
 1277pengine_main_loop(Pengine) :-
 1278    catch(guarded_main_loop(Pengine), abort_query, pengine_aborted(Pengine)).
 1279
 1280pengine_aborted(Pengine) :-
 1281    thread_self(Thread),
 1282    debug(pengine(abort), 'Aborting ~p (thread ~p)', [Pengine, Thread]),
 1283    empty_queue,
 1284    destroy_or_continue(abort(Pengine)).
 guarded_main_loop(+Pengine) is det
Executes state `2' of the pengine, where it waits for two events:
destroy
Terminate the pengine
ask(:Goal, +Options)
Solve Goal.
 % guarded_main_loop(+ID)
 %
 % State 2: wait for the next request.
 %   destroy            -> terminate the pengine (state 1)
 %   ask(Goal, Options) -> start solving via ask/3 (state 3)
 %   anything else      -> reply error(ID, error(protocol_error, _))
 %                         and keep waiting (state 2)
 1297guarded_main_loop(ID) :-
 1298    pengine_request(Request),
 1299    (   Request = destroy
 1300    ->  debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]),
 1301        pengine_terminate(ID)
 1302    ;   Request = ask(Goal, Options)
 1303    ->  debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]),
 1304        ask(ID, Goal, Options)
 1305    ;   debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]),
 1306        pengine_reply(error(ID, error(protocol_error, _))),
 1307        guarded_main_loop(ID)
 1308    ).
 1309
 1310
 % Tell the parent the pengine is gone (destroy(ID)) and detach the
 % current thread.
 1311pengine_terminate(Pengine) :-
 1312    pengine_reply(destroy(Pengine)),
 1313    thread_self(Self),          % Make the thread silently disappear
 1314    thread_detach(Self).
 solve(+Chunk, +Template, :Goal, +ID) is det
 Solve Goal. Note that because we can ask for a new goal in state `6', we must provide for an ancestral cut (prolog_cut_to/1). We need to be sure to have a choice point before we can call prolog_current_choice/1. This is the reason why this predicate has two clauses.
 % solve(+Chunk, +Template, :Goal, +ID)
 %
 % Run Goal and report results in chunks of at most Chunk answers.
 %   - Choice is the choice point of this call; more_solutions/5 cuts
 %     back to it (prolog_cut_to/1) when a new ask arrives.
 %   - State = count(Chunk) and Time = time(Epoch) are updated
 %     destructively by more_solutions/5 (nb_setarg/3), so the new
 %     chunk size and start time survive backtracking into
 %     findnsols/4.
 %   - query_done/2 is the call_cleanup/2 handler.  It binds Det to
 %     `true`; Det still being unbound after the call is taken to
 %     mean that Goal may have more solutions.
 %   - CPUTime is the CPU time used since the last (re)start.
 % The second clause exists only to leave a choice point.
 1325solve(Chunk, Template, Goal, ID) :-
 1326    prolog_current_choice(Choice),
 1327    State = count(Chunk),
 1328    statistics(cputime, Epoch),
 1329    Time = time(Epoch),
 1330    nb_current('$variable_names', Bindings),
 1331    filter_template(Template, Bindings, Template2),
 1332    '$current_typein_module'(CurrTypeIn),
 1333    (   '$set_typein_module'(ID),
 1334        call_cleanup(catch(findnsols_no_empty(State, Template2,
 1335                                              set_projection(Goal, Bindings),
 1336                                              Result),
 1337                           Error, true),
 1338                     query_done(Det, CurrTypeIn)),
 1339        arg(1, Time, T0),
 1340        statistics(cputime, T1),
 1341        CPUTime is T1-T0,
 1342        (   var(Error)
 1343        ->  projection(Projection),
 1344            (   var(Det)
 1345            ->  pengine_reply(success(ID, Result, Projection,
 1346                                      CPUTime, true)),
 1347                more_solutions(ID, Choice, State, Time)
 1348            ;   !,                      % commit
 1349                destroy_or_continue(success(ID, Result, Projection,
 1350                                            CPUTime, false))
 1351            )
 1352        ;   !,                          % commit
 1353            (   Error == abort_query
 1354            ->  throw(Error)
 1355            ;   destroy_or_continue(error(ID, Error))
 1356            )
 1357        )
 1358    ;   !,                              % commit
 1359        arg(1, Time, T0),
 1360        statistics(cputime, T1),
 1361        CPUTime is T1-T0,
 1362        destroy_or_continue(failure(ID, CPUTime))
 1363    ).
 1364solve(_, _, _, _).                      % leave a choice point
 1365
 % Cleanup handler of solve/4: bind the first argument to `true` and
 % restore the type-in module saved before the query.
 1366query_done(true, TypeIn) :-
 1367    '$set_typein_module'(TypeIn).
 set_projection(:Goal, +Bindings)
 findnsols/4 copies its goal and template to avoid instantiation thereof when it stops after finding N solutions. Using this helper we obtain a renamed version of Bindings that we can set.
 % Store Bindings in the backtrackable global variable
 % '$variable_names', then call Goal.
 1376set_projection(Goal, Names) :-
 1377    b_setval('$variable_names', Names),
 1378    call(Goal).
 1379
 % Projection is the result of mapping var_name/2 over the current
 % value of '$variable_names', or [] when that variable is not set.
 1380projection(Projection) :-
 1381    nb_current('$variable_names', Names),
 1382    !,
 1383    maplist(var_name, Names, Projection).
 1384projection([]).
 filter_template(+Template0, +Bindings, -Template) is det
Establish the final template. This is there because hooks such as goal_expansion/2 and the SWISH query hooks can modify the set of bindings.
bug
- Projection and template handling is pretty messy.
 % A template that is a swish_default_template dict is rebuilt from
 % the current Bindings; any other template is returned unchanged.
 1394filter_template(Template0, Bindings, Template) :-
 1395    (   is_dict(Template0, swish_default_template)
 1396    ->  dict_create(Template, swish_default_template, Bindings)
 1397    ;   Template = Template0
 1398    ).
 1399
 % As findnsols/4, but an empty result list counts as failure.
 1400findnsols_no_empty(Count, Template, Goal, Answers) :-
 1401    findnsols(Count, Template, Goal, Answers),
 1402    Answers \== [].
 1403
 % destroy_or_continue(+Event)
 %
 % Deliver the final Event of a query.  The pengine id is the first
 % argument of Event.  If the pengine has property destroy(true),
 % the thread is detached and Event is sent wrapped as
 % destroy(ID, Event), after which this predicate returns.
 % Otherwise Event is sent as is, memory is reclaimed and the
 % pengine goes back to waiting in guarded_main_loop/1.
 1404destroy_or_continue(Event) :-
 1405    arg(1, Event, ID),
 1406    (   pengine_property(ID, destroy(true))
 1407    ->  thread_self(Me),
 1408        thread_detach(Me),
 1409        pengine_reply(destroy(ID, Event))
 1410    ;   pengine_reply(Event),
 1411        garbage_collect,                % minimise our footprint
 1412        trim_stacks,
 1413        guarded_main_loop(ID)
 1414    ).
 more_solutions(+Pengine, +Choice, +State, +Time)
Called after a solution was found while there can be more. This is state `6' of the state machine. It processes these events:
stop
Go back via state `7' to state `2' (guarded_main_loop/1)
next
 Fail. This causes solve/4 to backtrack on the goal asked, providing at most the current chunk solutions.
next(Count)
As next, but sets the new chunk-size to Count.
ask(Goal, Options)
Ask another goal. Note that we must commit the choice point of the previous goal asked for.
 % more_solutions(+ID, +Choice, +State, +Time)
 % more_solutions(+Event, +ID, +Choice, +State, +Time)
 %
 % State 6: an answer chunk was sent and more may follow.  The /4
 % version waits for the next request and dispatches on it:
 %   stop        -> report stop(ID) via destroy_or_continue/1
 %   next        -> reset the start time and *fail*, which makes
 %                  solve/4 backtrack into the running goal
 %   next(Count) -> as next, after storing Count (must be > 0) as
 %                  the new chunk size in State
 %   ask(G, Os)  -> cut back to Choice, discarding the running goal,
 %                  and start the new one
 %   destroy     -> terminate the pengine
 %   other       -> reply a protocol error and wait again; this also
 %                  covers next(Count) with Count =< 0
 % State and Time are updated with nb_setarg/3 so the change is
 % kept when the clause fails.
 1432more_solutions(ID, Choice, State, Time) :-
 1433    pengine_request(Event),
 1434    more_solutions(Event, ID, Choice, State, Time).
 1435
 1436more_solutions(stop, ID, _Choice, _State, _Time) :-
 1437    !,
 1438    debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]),
 1439    destroy_or_continue(stop(ID)).
 1440more_solutions(next, ID, _Choice, _State, Time) :-
 1441    !,
 1442    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]),
 1443    statistics(cputime, T0),
 1444    nb_setarg(1, Time, T0),
 1445    fail.
 1446more_solutions(next(Count), ID, _Choice, State, Time) :-
 1447    Count > 0,
 1448    !,
 1449    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]),
 1450    nb_setarg(1, State, Count),
 1451    statistics(cputime, T0),
 1452    nb_setarg(1, Time, T0),
 1453    fail.
 1454more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :-
 1455    !,
 1456    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]),
 1457    prolog_cut_to(Choice),
 1458    ask(ID, Goal, Options).
 1459more_solutions(destroy, ID, _Choice, _State, _Time) :-
 1460    !,
 1461    debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]),
 1462    pengine_terminate(ID).
 1463more_solutions(Event, ID, Choice, State, Time) :-
 1464    debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Event)]),
 1465    pengine_reply(error(ID, error(protocol_error, _))),
 1466    more_solutions(ID, Choice, State, Time).
 ask(+Pengine, :Goal, +Options)
 Migrate from state `2' to `3'. This predicate validates that it is safe to call Goal using safe_goal/1 and then calls solve/4 to prove the goal. It takes care of the chunk(N) option.
 % ask(+ID, :Goal, +Options)
 %
 % Prepare Goal with prepare_goal/4 and solve it.  An exception from
 % preparation is sent to the parent as error(ID, Error) and the
 % pengine returns to guarded_main_loop/1.  The template defaults to
 % the unexpanded Goal and the chunk size to 1.
 1474ask(ID, Goal, Options) :-
 1475    catch(prepare_goal(ID, Goal, Goal1, Options), Error, true),
 1476    !,
 1477    (   var(Error)
 1478    ->  option(template(Template), Options, Goal),
 1479        option(chunk(N), Options, 1),
 1480        solve(N, Template, Goal1, ID)
 1481    ;   pengine_reply(error(ID, Error)),
 1482        guarded_main_loop(ID)
 1483    ).
 prepare_goal(+Pengine, +GoalIn, -GoalOut, +Options) is det
Prepare GoalIn for execution in Pengine. This implies we must perform goal expansion and, if the system is sandboxed, check the sandbox.

Note that expand_goal(Module:GoalIn, GoalOut) is what we'd like to write, but this does not work correctly if the user wishes to expand X:Y while interpreting X not as the module in which to run Y. This happens in the CQL package. Possibly we should disallow this reinterpretation?

 % prepare_goal(+ID, +Goal0, -ModuleGoal, +Options)
 %
 %   1. Publish the bindings(_) option in '$variable_names'.
 %   2. Give the prepare_goal/3 hook a chance to rewrite Goal0; if it
 %      fails Goal0 is used unchanged.
 %   3. Expand the goal with the pengine's module as source module
 %      (restored afterwards).
 %   4. Unless pengine_not_sandboxed/1 holds, validate Module:Goal
 %      with safe_goal/1 under the application's safe_goal_limit.
 %      time_limit_exceeded is rethrown as
 %      error(sandbox(time_limit_exceeded, Limit), _); other
 %      exceptions are rethrown as they are.
 % Note the shape of the final condition: the second `->` belongs to
 % the conjunction ending in catch/3, so if safe_goal/1 fails rather
 % than throws, prepare_goal/4 fails as well.
 1497prepare_goal(ID, Goal0, Module:Goal, Options) :-
 1498    option(bindings(Bindings), Options, []),
 1499    b_setval('$variable_names', Bindings),
 1500    (   prepare_goal(Goal0, Goal1, Options)
 1501    ->  true
 1502    ;   Goal1 = Goal0
 1503    ),
 1504    get_pengine_module(ID, Module),
 1505    setup_call_cleanup(
 1506        '$set_source_module'(Old, Module),
 1507        expand_goal(Goal1, Goal),
 1508        '$set_source_module'(_, Old)),
 1509    (   pengine_not_sandboxed(ID)
 1510    ->  true
 1511    ;   get_pengine_application(ID, App),
 1512        setting(App:safe_goal_limit, Limit),
 1513        catch(call_with_time_limit(
 1514                  Limit,
 1515                  safe_goal(Module:Goal)), E, true)
 1516    ->  (   var(E)
 1517        ->  true
 1518        ;   E = time_limit_exceeded
 1519        ->  throw(error(sandbox(time_limit_exceeded, Limit),_))
 1520        ;   throw(E)
 1521        )
 1522    ).
 prepare_goal(+Goal0, -Goal1, +Options) is semidet
Pre-preparation hook for running Goal0. The hook runs in the context of the pengine. Goal is the raw goal given to ask. The returned Goal1 is subject to goal expansion (expand_goal/2) and sandbox validation (safe_goal/1) prior to execution. If this goal fails, Goal0 is used for further processing.
Arguments:
Options- provides the options as given to ask
 pengine_not_sandboxed(+Pengine) is semidet
True when pengine does not operate in sandboxed mode. This implies a user must be registered by authentication_hook/3 and the hook pengines:not_sandboxed(User, Application) must succeed.
 % Succeeds (once) if the pengine has a registered user for which
 % the not_sandboxed/2 hook accepts the pengine's application.
 1542pengine_not_sandboxed(Pengine) :-
 1543    pengine_user(Pengine, User),
 1544    pengine_property(Pengine, application(Application)),
 1545    not_sandboxed(User, Application),
 1546    !.
 not_sandboxed(+User, +Application) is semidet
 This hook is called to see whether the Pengine must be executed in a protected environment. It is only called after authentication_hook/3 has confirmed the authenticity of the current user. If this hook succeeds, both loading the code and executing the query are done without enforcing sandbox security. Typically, one should:
  1. Provide a safe user authentication hook.
  2. Enable HTTPS in the server or put it behind an HTTPS proxy and ensure that the network between the proxy and the pengine server can be trusted.
 pengine_pull_response(+Pengine, +Options) is det
Pulls a response (an event term) from the slave Pengine if Pengine is a remote process, else does nothing at all. */
 % For a remote pengine, pull the next response from its server;
 % for anything else this is a no-op.
 1568pengine_pull_response(Pengine, Options) :-
 1569    (   pengine_remote(Pengine, URL)
 1570    ->  remote_pengine_pull_response(URL, Pengine, Options)
 1571    ;   true
 1572    ).
 pengine_input(+Prompt, -Term) is det
Sends Prompt to the master (parent) pengine and waits for input. Note that Prompt may be any term, compound as well as atomic. */
 % Send prompt(Self, Prompt) to the parent and wait for the next
 % request.  input(Input) yields Term; `destroy` aborts the running
 % query; any other request raises error(protocol_error, _).
 1581pengine_input(Prompt, Term) :-
 1582    pengine_self(Me),
 1583    pengine_parent(Master),
 1584    pengine_reply(Master, prompt(Me, Prompt)),
 1585    pengine_request(Answer),
 1586    (   Answer = input(Value)
 1587    ->  Term = Value
 1588    ;   Answer == destroy
 1589    ->  abort
 1590    ;   throw(error(protocol_error,_))
 1591    ).
 pengine_respond(+Pengine, +Input, +Options) is det
Sends a response in the form of the term Input to a slave (child) pengine that has prompted its master (parent) for input.

Defined in terms of pengine_send/3, as follows:

pengine_respond(Pengine, Input, Options) :-
    pengine_send(Pengine, input(Input), Options).

*/

 % Answer a prompt: forward Input to Pengine as the event
 % input(Input) using pengine_send/3.
 1608pengine_respond(Pengine, Answer, Options) :-
 1609    pengine_send(Pengine, input(Answer), Options).
 send_error(+Error) is det
Send an error to my parent. Remove non-readable blobs from the error term first using replace_blobs/2. If the error contains a stack-trace, this is resolved to a string before sending.
 % send_error(+Error)
 %
 % Reply error(Self, Error) to the parent after replacing blobs
 % (replace_blobs/2).  If the error context holds a list of stack
 % frames, these are first printed to a string that takes their
 % place in the context term.
 1618send_error(error(Formal0, context(prolog_stack(Frames), Msg0))) :-
 1619    is_list(Frames),
 1620    !,
 1621    with_output_to(string(Trace),
 1622                   print_prolog_backtrace(current_output, Frames)),
 1623    pengine_self(Me),
 1624    replace_blobs(Formal0, Formal),
 1625    replace_blobs(Msg0, Msg),
 1626    Clean = error(Formal, context(prolog_stack(Trace), Msg)),
 1627    pengine_reply(error(Me, Clean)).
 1628send_error(Error0) :-
 1629    pengine_self(Me),
 1630    replace_blobs(Error0, Error),
 1631    pengine_reply(error(Me, Error)).
 replace_blobs(Term0, Term) is det
Copy Term0 to Term, replacing non-text blobs. This is required for error messages that may hold streams and other handles to non-readable objects.
 % replace_blobs(+Term0, -Term)
 %
 % Copy Term0, turning every blob whose type is not `text` into the
 % atom produced by printing it with ~p.  Compound terms are
 % rebuilt argument by argument; all other terms are shared as is.
 1639replace_blobs(Handle, Printed) :-
 1640    blob(Handle, Kind), Kind \== text,
 1641    !,
 1642    format(atom(Printed), '~p', [Handle]).
 1643replace_blobs(In, Out) :-
 1644    compound(In),
 1645    !,
 1646    compound_name_arguments(In, Functor, InArgs),
 1647    maplist(replace_blobs, InArgs, OutArgs),
 1648    compound_name_arguments(Out, Functor, OutArgs).
 1649replace_blobs(Same, Same).
 1650
 1651
 1652/*================= Remote pengines =======================
 1653*/
 1654
 1655
 % remote_pengine_create(+BaseURL, +Options)
 %
 % Create a pengine on the server at BaseURL.
 %   - Pengine options (pengine_create_option/1) are posted to the
 %     server; the remaining options go to the HTTP request.
 %   - If ask(Query) is given without a template, the query itself
 %     is used as template.
 %   - The pengine id is the first argument of the server's Reply
 %     and is recorded in child/2 under its alias.
 %   - The pengine is registered as remote only when Reply is a
 %     `create` or `output` event.
 %   - Reply is finally queued as an event for the calling thread.
 1656remote_pengine_create(BaseURL, Options) :-
 1657    partition(pengine_create_option, Options, PengineOptions0, RestOptions),
 1658        (       option(ask(Query), PengineOptions0),
 1659                \+ option(template(_Template), PengineOptions0)
 1660        ->      PengineOptions = [template(Query)|PengineOptions0]
 1661        ;       PengineOptions = PengineOptions0
 1662        ),
 1663    options_to_dict(PengineOptions, PostData),
 1664    remote_post_rec(BaseURL, create, PostData, Reply, RestOptions),
 1665    arg(1, Reply, ID),
 1666    (   option(id(ID2), Options)
 1667    ->  ID = ID2
 1668    ;   true
 1669    ),
 1670    option(alias(Name), Options, ID),
 1671    assert(child(Name, ID)),
 1672    (   (   functor(Reply, create, _)   % actually created
 1673        ;   functor(Reply, output, _)   % compiler messages
 1674        )
 1675    ->  option(application(Application), PengineOptions, pengine_sandbox),
 1676        option(destroy(Destroy), PengineOptions, true),
 1677        pengine_register_remote(ID, BaseURL, Application, Destroy)
 1678    ;   true
 1679    ),
 1680    thread_self(Queue),
 1681    pengine_reply(Queue, Reply).
 1682
 % options_to_dict(+Options, -Dict)
 %
 % Turn the pengine create options into the dict that is posted to
 % the server.  When both ask(_) and template(_) are present they
 % are serialised together by ask_template_to_strings/4 so that
 % shared variables get the same name in both strings.  The call is
 % wrapped in findall/3 because that helper binds the variables
 % (numbervars/3); findall/3 returns the strings while leaving Ask
 % and Template untouched.  A term that already contains '$VAR'(_)
 % is rejected beforehand (no_numbered_var_in/1).  The remaining
 % options are converted by the second clause.
 1683options_to_dict(Options, Dict) :-
 1684    select_option(ask(Ask), Options, Options1),
 1685    select_option(template(Template), Options1, Options2),
 1686    !,
 1687    no_numbered_var_in(Ask+Template),
 1688    findall(AskString-TemplateString,
 1689            ask_template_to_strings(Ask, Template, AskString, TemplateString),
 1690            [ AskString-TemplateString ]),
 1691    options_to_dict(Options2, Dict0),
 1692    Dict = Dict0.put(_{ask:AskString,template:TemplateString}).
 1693options_to_dict(Options, Dict) :-
 1694    maplist(prolog_option, Options, Options1),
 1695    dict_create(Dict, _, Options1).
 1696
 % Raise a domain error if Term has a subterm of the form '$VAR'(_);
 % succeed otherwise.
 1697no_numbered_var_in(Term) :-
 1698    (   sub_term(Part, Term),
 1699        subsumes_term('$VAR'(_), Part)
 1700    ->  domain_error(numbered_vars_free_term, Term)
 1701    ;   true
 1702    ).
 1703
 % ask_template_to_strings(+Ask, +Template, -AskString, -TemplateString)
 %
 % Write Ask and Template as quoted, operator-free text.  Both terms
 % are numbered together first, so a variable shared between them is
 % printed under the same name in both strings.  NOTE: this binds
 % the variables of Ask and Template; options_to_dict/2 therefore
 % calls it inside findall/3.  The two terms are written on separate
 % lines and split on the newline again.
 1704ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
 1705    numbervars(Ask+Template, 0, _),
 1706    WOpts = [ numbervars(true), ignore_ops(true), quoted(true) ],
 1707    format(string(AskTemplate), '~W\n~W', [ Ask, WOpts,
 1708                                            Template, WOpts
 1709                                          ]),
 1710    split_string(AskTemplate, "\n", "", [AskString, TemplateString]).
 1711
 % prolog_option(+Option0, -Option)
 %
 % Options that create_option_type/2 classifies as `term` have their
 % value replaced by its ~k (write_canonical) text, so it can be
 % posted as a string.  All other options, including those typed
 % `atom`, are passed through unchanged.
 1712prolog_option(Option0, Option) :-
 1713    create_option_type(Option0, term),
 1714    !,
 1715    Option0 =.. [Name,Value],
 1716    format(string(String), '~k', [Value]),
 1717    Option =.. [Name,String].
 1718prolog_option(Option, Option).
 1719
 1720create_option_type(ask(_),         term).
 1721create_option_type(template(_),    term).
 1722create_option_type(application(_), atom).
 1723
 % Post Event to pengine ID on the server and queue the server's
 % reply as an event for the calling thread.
 1724remote_pengine_send(BaseURL, ID, Event, Options) :-
 1725    remote_send_rec(BaseURL, send, ID, [event=Event], Answer, Options),
 1726    thread_self(Me),
 1727    pengine_reply(Me, Answer).
 1728
 % Ask the server for the next response of pengine ID and queue it
 % as an event for the calling thread.
 1729remote_pengine_pull_response(BaseURL, ID, Options) :-
 1730    remote_send_rec(BaseURL, pull_response, ID, [], Answer, Options),
 1731    thread_self(Me),
 1732    pengine_reply(Me, Answer).
 1733
 % Send an abort request for pengine ID to the server and queue the
 % server's reply as an event for the calling thread.
 1734remote_pengine_abort(BaseURL, ID, Options) :-
 1735    remote_send_rec(BaseURL, abort, ID, [], Answer, Options),
 1736    thread_self(Me),
 1737    pengine_reply(Me, Answer).
 remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
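 Issue an HTTP request on Server and unify Reply with the replied term. If Params is [event=Event], the event is sent as a Prolog term in a POST request; otherwise Params are added to the query string of a plain http_open/3 request.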
 % remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
 %
 % Contact <Server>/pengine/<Action> for pengine ID and read the
 % reply term.  With Params = [event=Event] the event is posted as
 % a Prolog term and only id=ID goes into the query string.  In all
 % other cases Params are appended to the query string.  The stream
 % is closed once the reply has been read, also on error.
 1744remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :-
 1745    !,
 1746    server_url(Server, Action, [id=ID], URL),
 1747    http_open(URL, Stream,              % putting this in setup_call_cleanup/3
 1748              [ post(prolog(Event))     % makes it impossible to interrupt.
 1749              | Options
 1750              ]),
 1751    call_cleanup(
 1752        read_prolog_reply(Stream, Reply),
 1753        close(Stream)).
 1754remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
 1755    server_url(Server, Action, [id=ID|Params], URL),
 1756    http_open(URL, Stream, Options),
 1757    call_cleanup(
 1758        read_prolog_reply(Stream, Reply),
 1759        close(Stream)).
 1760
 % remote_post_rec(+Server, +Action, +Data, -Reply, +Options)
 %
 % Post Data as JSON to <Server>/pengine/<Action> and read the reply
 % term.  The target is probed first (probe/2, which only acts for
 % the `create` action).  The stream is closed once the reply has
 % been read, also on error.
 1761remote_post_rec(Server, Action, Data, Reply, Options) :-
 1762    server_url(Server, Action, [], URL),
 1763    probe(Action, URL),
 1764    http_open(URL, Stream,
 1765              [ post(json(Data))
 1766              | Options
 1767              ]),
 1768    call_cleanup(
 1769        read_prolog_reply(Stream, Reply),
 1770        close(Stream)).
 probe(+Action, +URL) is det
Probe the target. This is a good idea before posting a large document and be faced with an authentication challenge. Possibly we should make this an option for simpler scenarios.
 1778probe(create, URL) :-
 1779    !,
 1780    http_open(URL, Stream, [method(options)]),
 1781    close(Stream).
 1782probe(_, _).
 1783
 %!  read_prolog_reply(+In, -Reply)
 %
 %   Read one Prolog term from the stream In, after switching the
 %   stream to UTF-8, and restore cyclic structure in it using
 %   rebind_cycles/2.

 read_prolog_reply(In, Reply) :-
     set_stream(In, encoding(utf8)),
     read_term(In, Raw, []),
     rebind_cycles(Raw, Reply).
 1788
 %!  rebind_cycles(+Reply0, -Reply)
 %
 %   If Reply0 has the shape `@(Term, Bindings)` with Bindings a
 %   proper list, Reply is Term after executing each `Var = Value`
 %   in Bindings.  Any other term is passed through unchanged.

 rebind_cycles(Reply0, Reply) :-
     (   Reply0 = @(Reply, Bindings),
         is_list(Bindings)
     ->  maplist(bind, Bindings)
     ;   Reply = Reply0
     ).
 1794
 %!  bind(+Binding)
 %
 %   Binding is a term `Var = Value`; unify its two sides.

 bind(Value = Value).
 1797
 %!  server_url(+Server, +Action, +Params, -URL)
 %
 %   URL is the location for the pengine Action on Server: the path
 %   of Server is extended with `pengine/<Action>` and Params are
 %   encoded as the query string.  Note that the query string is
 %   _unified_ with the search component of Server.

 server_url(Server, Action, Params, URL) :-
     uri_components(Server, Parts0),
     uri_data(path, Parts0, BasePath),
     atom_concat('pengine/', Action, EndPoint),
     directory_file_path(BasePath, EndPoint, Path),
     uri_data(path, Parts0, Path, Parts),
     uri_query_components(Search, Params),
     uri_data(search, Parts, Search),
     uri_components(URL, Parts).
 pengine_event(?EventTerm) is det
 pengine_event(?EventTerm, +Options) is det
Examines the pengine's event queue and if necessary blocks execution until a term that unifies to Term arrives in the queue. After a term from the queue has been unified to Term, the term is deleted from the queue.

Valid options are:

timeout(+Time)
Time is a float or integer and specifies the maximum time to wait in seconds. If no event has arrived before the time is up EventTerm is bound to the atom timeout.
listen(+Id)
Only listen to events from the pengine identified by Id. */
 1827pengine_event(Event) :-
 1828    pengine_event(Event, []).
 1829
 1830pengine_event(Event, Options) :-
 1831    thread_self(Self),
 1832    option(listen(Id), Options, _),
 1833    (   thread_get_message(Self, pengine_event(Id, Event), Options)
 1834    ->  true
 1835    ;   Event = timeout
 1836    ),
 1837    update_remote_destroy(Event).
 1838
 %!  update_remote_destroy(+Event) is det.
 %
 %   If Event signals the destruction of a pengine that is known as
 %   a remote pengine, unregister it.  Does nothing for all other
 %   events.

 update_remote_destroy(Event) :-
     (   destroy_event(Event),
         arg(1, Event, Id),
         pengine_remote(Id, _Server)
     ->  pengine_unregister_remote(Id)
     ;   true
     ).
 1846
 %!  destroy_event(+Event) is semidet.
 %
 %   True when Event implies that the pengine has been destroyed:
 %   a destroy/1 or destroy/2 term, or a create/2 event whose feature
 %   list carries a bound answer(Answer) that is itself such an
 %   event.

 destroy_event(destroy(_)).
 destroy_event(destroy(_, _)).
 destroy_event(create(_, Features)) :-
     memberchk(answer(Embedded), Features),
     !,
     nonvar(Embedded),
     destroy_event(Embedded).
 pengine_event_loop(:Closure, +Options) is det
Starts an event loop accepting event terms sent to the current pengine or thread. For each such event E, calls ignore(call(Closure, E)). A closure thus acts as a handler for the event. Some events are also treated specially:
create(ID, Term)
The ID is placed in a list of active pengines.
destroy(ID)
The ID is removed from the list of active pengines. When the last pengine ID is removed, the loop terminates.
output(ID, Term)
The predicate pengine_pull_response/2 is called.

Valid options are:

autoforward(+To)
Forwards received event terms to slaves. To is either all, all_but_sender or a Prolog list of NameOrIDs. [not yet implemented]

*/

 1882pengine_event_loop(Closure, Options) :-
 1883    child(_,_),
 1884    !,
 1885    pengine_event(Event),
 1886    (   option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs
 1887    ->  forall(child(_,ID), pengine_send(ID, Event))
 1888    ;   true
 1889    ),
 1890    pengine_event_loop(Event, Closure, Options).
 1891pengine_event_loop(_, _).
 1892
 1893:- meta_predicate
 1894    pengine_process_event(+, 1, -, +). 1895
 %!  pengine_event_loop(+Event, :Closure, +Options)
 %
 %   Process Event using pengine_process_event/4 and re-enter
 %   pengine_event_loop/2 if processing reports that the loop must
 %   continue.

 pengine_event_loop(Event, Closure, Options) :-
     pengine_process_event(Event, Closure, Continue, Options),
     (   Continue \== true
     ->  true
     ;   pengine_event_loop(Closure, Options)
     ).
 1902
 %!  pengine_process_event(+Event, :Closure, -Continue, +Options)
 %
 %   Handle a single Event for pengine_event_loop/2.  The handler
 %   Closure is called on (a normalised version of) the event.  Its
 %   failure is ignored, except for error events: if Closure does not
 %   succeed on an error(ID, Error) event, all children are destroyed
 %   and Error is re-thrown.  Events carrying output or debug
 %   messages trigger pengine_pull_response/2.  Continue is unified
 %   with `true` by every clause that completes normally.

 pengine_process_event(create(ID, Features), Closure, Continue, Options) :-
     debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(Features)]),
     (   select(answer(First), Features, Rest)
     ->  % Hand the creation to the handler, then process the
         % answer that came with it.
         pengine_run_event_handler(Closure, create(ID, Rest)),
         pengine_process_event(First, Closure, Continue, Options)
     ;   pengine_run_event_handler(Closure, create(ID, Features)),
         Continue = true
     ).
 pengine_process_event(output(ID, Msg), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Msg)]),
     pengine_run_event_handler(Closure, output(ID, Msg)),
     pengine_pull_response(ID, []).
 pengine_process_event(debug(ID, Msg), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Msg)]),
     pengine_run_event_handler(Closure, debug(ID, Msg)),
     pengine_pull_response(ID, []).
 pengine_process_event(prompt(ID, Term), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Term)]),
     pengine_run_event_handler(Closure, prompt(ID, Term)).
 pengine_process_event(success(ID, Sol, _Proj, _Time, More), Closure, true, _) :-
     debug(pengine(transition), '~q: 3 = /~q => 6/2', [ID, success(Sol, More)]),
     pengine_run_event_handler(Closure, success(ID, Sol, More)).
 pengine_process_event(failure(ID, _Time), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]),
     pengine_run_event_handler(Closure, failure(ID)).
 pengine_process_event(error(ID, Error), Closure, Continue, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]),
     (   call(Closure, error(ID, Error))
     ->  Continue = true
     ;   forall(child(_, Child), pengine_destroy(Child)),
         throw(Error)
     ).
 pengine_process_event(stop(ID), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]),
     pengine_run_event_handler(Closure, stop(ID)).
 pengine_process_event(destroy(ID, Event), Closure, Continue, Options) :-
     pengine_process_event(Event, Closure, _, Options),
     pengine_process_event(destroy(ID), Closure, Continue, Options).
 pengine_process_event(destroy(ID), Closure, true, _Options) :-
     retractall(child(_, ID)),
     debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]),
     pengine_run_event_handler(Closure, destroy(ID)).

 %   Call the event handler Closure on Event, ignoring failure.

 pengine_run_event_handler(Closure, Event) :-
     ignore(call(Closure, Event)).
 pengine_rpc(+URL, +Query) is nondet
 pengine_rpc(+URL, +Query, +Options) is nondet
Semantically equivalent to the sequence below, except that the query is executed in (and in the Prolog context of) the pengine server referred to by URL, rather than locally.
  copy_term_nat(Query, Copy),  % attributes are not copied to the server
  call(Copy),			 % executed on server at URL
  Query = Copy.

Valid options are:

chunk(+Integer)
Can be used to reduce the number of network roundtrips being made. See pengine_ask/3.
timeout(+Time)
Wait at most Time seconds for the next event from the server. The default is defined by the setting pengines:time_limit.

Remaining options (except the server option) are passed to pengine_create/1. */

 1973pengine_rpc(URL, Query) :-
 1974    pengine_rpc(URL, Query, []).
 1975
 1976pengine_rpc(URL, Query, M:Options0) :-
 1977    translate_local_sources(Options0, Options1, M),
 1978    (  option(timeout(_), Options1)
 1979    -> Options = Options1
 1980    ;  setting(time_limit, Limit),
 1981       Options = [timeout(Limit)|Options1]
 1982    ),
 1983    term_variables(Query, Vars),
 1984    Template =.. [v|Vars],
 1985    State = destroy(true),              % modified by process_event/4
 1986    setup_call_catcher_cleanup(
 1987        pengine_create([ ask(Query),
 1988                         template(Template),
 1989                         server(URL),
 1990                         id(Id)
 1991                       | Options
 1992                       ]),
 1993        wait_event(Template, State, [listen(Id)|Options]),
 1994        Why,
 1995        pengine_destroy_and_wait(State, Id, Why)).
 1996
 %!  pengine_destroy_and_wait(+State, +Id, +Why)
 %
 %   Cleanup handler of pengine_rpc/3.  If State is still
 %   `destroy(true)`, destroy pengine Id and wait for that to be
 %   confirmed using wait_destroy/2.  Otherwise only emit a debug
 %   message.  Why is the catcher term and is used for debug messages
 %   only.

 pengine_destroy_and_wait(State, Id, Why) :-
     (   State = destroy(true)
     ->  debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
         pengine_destroy(Id),
         wait_destroy(Id, 10)
     ;   debug(pengine(rpc), 'Not destroying RPC (~p)', [Why])
     ).
 2004
 %!  wait_destroy(+Id, +Retries)
 %
 %   Wait until pengine Id is no longer registered as a child.
 %   Events from Id are read with a 10 second timeout.  A destroy
 %   event removes the child registration.  Any other event,
 %   including `timeout`, uses up one of Retries.  When Retries
 %   reaches 0 the pengine is unregistered anyway.

 wait_destroy(Id, Retries) :-
     (   \+ child(_, Id)
     ->  true
     ;   once(pengine_event(Event, [listen(Id),timeout(10)])),
         (   destroy_event(Event)
         ->  retractall(child(_,Id))
         ;   succ(Left, Retries)         % fails if Retries is 0
         ->  wait_destroy(Id, Left)
         ;   debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
             pengine_unregister_remote(Id),
             retractall(child(_,Id))
         )
     ).
 2019
 %!  wait_event(?Template, +State, +Options)
 %
 %   Wait for the next pengine event selected by Options and hand it
 %   to process_event/4.

 wait_event(Template, State, Options) :-
     pengine_event(Received, Options),
     debug(pengine(event), 'Received ~p', [Received]),
     process_event(Received, Template, State, Options).
 2024
 %   process_event(+Event, ?Template, +State, +Options)
 %
 %   Turn a pengine Event into solutions for Template.  This is the
 %   work horse behind wait_event/3.  State is a term whose first
 %   argument is destructively set to `false` once a destroy/2 event
 %   has been seen.  Clause order and the cuts matter here.

 %   A create event: continue with the answer that is embedded in its
 %   feature list.  Fails if there is no answer(_) feature.
 2025process_event(create(_ID, Features), Template, State, Options) :-
 2026    memberchk(answer(First), Features),
 2027    process_event(First, Template, State, Options).
 %   An error is re-thrown in the calling thread.
 2028process_event(error(_ID, Error), _Template, _, _Options) :-
 2029    throw(Error).
 %   Failure of the query simply fails.
 2030process_event(failure(_ID, _Time), _Template, _, _Options) :-
 2031    fail.
 %   The pengine asks for input: obtain a reply, send it back and
 %   wait for the next event.
 2032process_event(prompt(ID, Prompt), Template, State, Options) :-
 2033    pengine_rpc_prompt(ID, Prompt, Reply),
 2034    pengine_send(ID, input(Reply)),
 2035    wait_event(Template, State, Options).
 %   Output from the pengine: handle it, pull the next response and
 %   wait for it.
 2036process_event(output(ID, Term), Template, State, Options) :-
 2037    pengine_rpc_output(ID, Term),
 2038    pengine_pull_response(ID, Options),
 2039    wait_event(Template, State, Options).
 %   A debug message is passed to debug/3 on topic pengine(debug).
 2040process_event(debug(ID, Message), Template, State, Options) :-
 2041    debug(pengine(debug), '~w', [Message]),
 2042    pengine_pull_response(ID, Options),
 2043    wait_event(Template, State, Options).
 %   Last chunk of solutions: enumerate them.  The cut keeps us from
 %   trying the clauses below.
 2044process_event(success(_ID, Solutions, _Proj, _Time, false),
 2045              Template, _, _Options) :-
 2046    !,
 2047    member(Template, Solutions).
 %   More solutions may follow: first enumerate this chunk and, on
 %   backtracking, ask the pengine for the next chunk.
 2048process_event(success(ID, Solutions, _Proj, _Time, true),
 2049              Template, State, Options) :-
 2050    (   member(Template, Solutions)
 2051    ;   pengine_next(ID, Options),
 2052        wait_event(Template, State, Options)
 2053    ).
 %   The pengine destroyed itself while sending Event.  Forget the
 %   child and set the first argument of State to `false` using
 %   nb_setarg/3, then process the wrapped Event.
 2054process_event(destroy(ID, Event), Template, State, Options) :-
 2055    !,
 2056    retractall(child(_,ID)),
 2057    nb_setarg(1, State, false),
 2058    debug(pengine(destroy), 'State: ~p~n', [State]),
 2059    process_event(Event, Template, State, Options).
 2060% compatibility with older versions of the protocol.
 %   A success/4 event is mapped onto success/5, leaving the
 %   projection unbound.
 2061process_event(success(ID, Solutions, Time, More),
 2062              Template, State, Options) :-
 2063    process_event(success(ID, Solutions, _Proj, Time, More),
 2064                  Template, State, Options).
 2065
 2066
 %!  pengine_rpc_prompt(+ID, +Prompt, -Term)
 %
 %   Obtain the input Term that pengine ID requested using Prompt.
 %   The hook prompt/3 is tried first.  If it fails, a term is read
 %   using read/1 with Prompt temporarily installed as the prompt.

 pengine_rpc_prompt(ID, Prompt, Term) :-
     (   prompt(ID, Prompt, FromHook)
     ->  Term = FromHook
     ;   setup_call_cleanup(
             prompt(Saved, Prompt),
             read(Term),
             prompt(_, Saved))
     ).
 2076
 %!  pengine_rpc_output(+ID, +Term)
 %
 %   Handle output Term from pengine ID.  The hook output/2 is tried
 %   first.  If it fails, Term is printed using print/1.

 pengine_rpc_output(ID, Term) :-
     (   output(ID, Term)
     ->  true
     ;   print(Term)
     ).
 prompt(+ID, +Prompt, -Term) is semidet
Hook to handle pengine_input/2 from the remote pengine. If the hook fails, pengine_rpc/3 calls read/1 using the current prompt.
 2088:- multifile prompt/3.
 output(+ID, +Term) is semidet
Hook to handle pengine_output/1 from the remote pengine. If the hook fails, it calls print/1 on Term.
 2095:- multifile output/2. 2096
 2097
 2098/*================= HTTP handlers =======================
 2099*/
 2100
 2101%   Declare  HTTP  locations  we  serve  and   how.  Note  that  we  use
 2102%   time_limit(infinite) because pengines have their  own timeout. Also
 2103%   note that we use spawn. This  is   needed  because we can easily get
 2104%   many clients waiting for  some  action   on  a  pengine to complete.
 2105%   Without spawning, we would quickly exhaust   the  worker pool of the
 2106%   HTTP server.
 2107%
 2108%   FIXME: probably we should wait for a   short time for the pengine on
 2109%   the default worker thread. Only if  that   time  has expired, we can
 2110%   call http_spawn/2 to continue waiting on   a  new thread. That would
 2111%   improve the performance and reduce the usage of threads.
 2112
 2113:- http_handler(root(pengine),               http_404([]),
 2114                [ id(pengines) ]). 2115:- http_handler(root(pengine/create),        http_pengine_create,
 2116                [ time_limit(infinite), spawn([]) ]). 2117:- http_handler(root(pengine/send),          http_pengine_send,
 2118                [ time_limit(infinite), spawn([]) ]). 2119:- http_handler(root(pengine/pull_response), http_pengine_pull_response,
 2120                [ time_limit(infinite), spawn([]) ]). 2121:- http_handler(root(pengine/abort),         http_pengine_abort,         []). 2122:- http_handler(root(pengine/detach),        http_pengine_detach,        []). 2123:- http_handler(root(pengine/list),          http_pengine_list,          []). 2124:- http_handler(root(pengine/ping),          http_pengine_ping,          []). 2125:- http_handler(root(pengine/destroy_all),   http_pengine_destroy_all,   []). 2126
 2127:- http_handler(root(pengine/'pengines.js'),
 2128                http_reply_file(library('http/web/js/pengines.js'), []), []). 2129:- http_handler(root(pengine/'plterm.css'),
 2130                http_reply_file(library('http/web/css/plterm.css'), []), []).
 http_pengine_create(+Request)
HTTP POST handler for =/pengine/create=. This API accepts the pengine creation parameters both as application/json and as www-form-encoded. Accepted parameters:
ParameterDefaultComment
formatprologOutput format
applicationpengine_sandboxPengine application
chunk1Chunk-size for results
solutionschunkedIf all, emit all results
ask-The query
template-Output template
src_text""Program
src_url-Program to download
disposition-Download location

Note that solutions=all internally uses chunking to obtain the results from the pengine, but the results are combined in a single HTTP reply. This is currently only implemented by the CSV backend that is part of SWISH for downloading unbounded result sets with limited memory resources.

 2157http_pengine_create(Request) :-
 2158    reply_options(Request, [post]),
 2159    !.
 2160http_pengine_create(Request) :-
 2161    memberchk(content_type(CT), Request),
 2162    sub_atom(CT, 0, _, _, 'application/json'),
 2163    !,
 2164    http_read_json_dict(Request, Dict),
 2165    dict_atom_option(format, Dict, Format, prolog),
 2166    dict_atom_option(application, Dict, Application, pengine_sandbox),
 2167    http_pengine_create(Request, Application, Format, Dict).
 2168http_pengine_create(Request) :-
 2169    Optional = [optional(true)],
 2170    OptString = [string|Optional],
 2171    Form = [ format(Format, [default(prolog)]),
 2172             application(Application, [default(pengine_sandbox)]),
 2173             chunk(_, [integer, default(1)]),
 2174             solutions(_, [oneof([all,chunked]), default(chunked)]),
 2175             ask(_, OptString),
 2176             template(_, OptString),
 2177             src_text(_, OptString),
 2178             disposition(_, OptString),
 2179             src_url(_, Optional)
 2180           ],
 2181    http_parameters(Request, Form),
 2182    form_dict(Form, Dict),
 2183    http_pengine_create(Request, Application, Format, Dict).
 2184
 %!  dict_atom_option(+Key, +Dict, -Atom, +Default) is det.
 %
 %   Atom is the value of Key in Dict, converted to an atom using
 %   atom_string/2.  If Dict has no Key, Atom is Default.

 dict_atom_option(Key, Dict, Atom, _Default) :-
     get_dict(Key, Dict, Text),
     !,
     atom_string(Atom, Text).
 dict_atom_option(_Key, _Dict, Default, Default).
 2190
 %!  form_dict(+Form, -Dict)
 %
 %   Dict is a dict holding the parameters of the (processed)
 %   http_parameters/2 specification Form that obtained a value.

 form_dict(Form, Dict) :-
     form_values(Form, KeyValues),
     dict_pairs(Dict, _Tag, KeyValues).
 2194
 %!  form_values(+Form, -Pairs)
 %
 %   Pairs is a list Name-Value for each element Name(Value, ...) of
 %   Form whose first argument is bound.  Elements with an unbound
 %   first argument are skipped.

 form_values([], []).
 form_values([Field|Fields], Pairs) :-
     (   arg(1, Field, Value),
         nonvar(Value)
     ->  functor(Field, Name, _),
         Pairs = [Name-Value|Rest]
     ;   Pairs = Rest
     ),
     form_values(Fields, Rest).
 http_pengine_create(+Request, +Application, +Format, +OptionsDict)
 2209http_pengine_create(Request, Application, Format, Dict) :-
 2210    current_application(Application),
 2211    !,
 2212    allowed(Request, Application),
 2213    authenticate(Request, Application, UserOptions),
 2214    dict_to_options(Dict, Application, CreateOptions0),
 2215    append(UserOptions, CreateOptions0, CreateOptions),
 2216    pengine_uuid(Pengine),
 2217    message_queue_create(Queue, [max_size(25)]),
 2218    setting(Application:time_limit, TimeLimit),
 2219    get_time(Now),
 2220    asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)),
 2221    broadcast(pengine(create(Pengine, Application, CreateOptions))),
 2222    create(Queue, Pengine, CreateOptions, http, Application),
 2223    create_wait_and_output_result(Pengine, Queue, Format,
 2224                                  TimeLimit, Dict),
 2225    gc_abandoned_queues.
 2226http_pengine_create(_Request, Application, Format, _Dict) :-
 2227    Error = existence_error(pengine_application, Application),
 2228    pengine_uuid(ID),
 2229    output_result(Format, error(ID, error(Error, _))).
 2230
 2231
 %!  dict_to_options(+Dict, +Application, -CreateOptions)
 %
 %   CreateOptions is the list of pengine creation options found in
 %   Dict.  See pairs_create_options/3 for the selection.

 dict_to_options(Dict, Application, CreateOptions) :-
     dict_pairs(Dict, _Tag, KeyValues),
     pairs_create_options(KeyValues, Application, CreateOptions).
 2235
 %!  pairs_create_options(+Pairs, +Application, -Options)
 %
 %   Translate Name-Value pairs into a list of Name(Value) options.
 %   A pair is kept only if Name(_) is accepted by
 %   pengine_create_option/1 and Name is not `user`.  Everything else
 %   is dropped silently.  Values of options with type `atom` are
 %   converted using atom_string/2.  Other values are passed
 %   unchanged: term creation must be done if we created the source
 %   and know the operators.

 pairs_create_options([], _, []) :- !.
 pairs_create_options([Pair|Pairs], App, Options) :-
     (   Pair = Name-Raw,
         Options = [Option|Rest],
         Option =.. [Name,Value],
         pengine_create_option(Option),
         Name \== user
     ->  (   create_option_type(Option, atom)
         ->  atom_string(Value, Raw)
         ;   Value = Raw
         ),
         pairs_create_options(Pairs, App, Rest)
     ;   pairs_create_options(Pairs, App, Options)
     ).
 wait_and_output_result(+Pengine, +Queue, +Format, +TimeLimit) is det
Wait for the Pengine's Queue and if there is a message, send it to the requester using output_result/1. If Pengine does not answer within the time specified by the setting time_limit, Pengine is aborted and the result is error(time_limit_exceeded, _).
 2258wait_and_output_result(Pengine, Queue, Format, TimeLimit) :-
 2259    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2260                                 [ timeout(TimeLimit)
 2261                                 ]),
 2262              Error, true)
 2263    ->  (   var(Error)
 2264        ->  debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
 2265            ignore(destroy_queue_from_http(Pengine, Event, Queue)),
 2266            protect_pengine(Pengine, output_result(Format, Event))
 2267        ;   output_result(Format, died(Pengine))
 2268        )
 2269    ;   time_limit_exceeded(Pengine, Format)
 2270    ).
 create_wait_and_output_result(+Pengine, +Queue, +Format, +TimeLimit, +Dict) is det
Intercepts the `solutions=all' case used for downloading results. Dict may contain a disposition key to denote the download location.
 %   create_wait_and_output_result(+Pengine, +Queue, +Format,
 %                                 +TimeLimit, +Dict)
 %
 %   First clause: Dict holds `solutions` with value `all`.  This is
 %   a failure driven loop over Page = 1, 2, ...  Each iteration
 %   waits at most TimeLimit for an event on Queue and emits it as
 %   page(Page, Event).  The loop goes on (through `fail`) only as
 %   long as the event tells that more answers are available.  All
 %   other outcomes cut the between/3 choice point and thus end the
 %   loop.
 2279create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
 2280    get_dict(solutions, Dict, all),
 2281    !,
 2282    between(1, infinite, Page),
 2283    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2284                                 [ timeout(TimeLimit)
 2285                                 ]),
 2286              Error, true)
 2287    ->  (   var(Error)
 2288        ->  debug(pengine(wait), 'Page ~D: got ~q from ~q', [Page, Event, Queue]),
 %           The queue could be dealt with: this is the last page.
 2289            (   destroy_queue_from_http(Pengine, Event, Queue)
 2290            ->  !,
 2291                protect_pengine(Pengine,
 2292                                output_result(Format, page(Page, Event), Dict))
 %           More answers: ask the pengine for the next chunk, emit
 %           this page and fail to re-enter the loop for the next page.
 2293            ;   is_more_event(Event)
 2294            ->  pengine_thread(Pengine, Thread),
 2295                thread_send_message(Thread, pengine_request(next)),
 2296                protect_pengine(Pengine,
 2297                                output_result(Format, page(Page, Event), Dict)),
 2298                fail
 %           Any other event ends the loop after emitting it.
 2299            ;   !,
 2300                protect_pengine(Pengine,
 2301                                output_result(Format, page(Page, Event), Dict))
 2302            )
 %       Waiting for the event raised an exception.
 2303        ;   !, output_result(Format, died(Pengine))
 2304        )
 %   No event arrived within TimeLimit.
 2305    ;   !, time_limit_exceeded(Pengine, Format)
 2306    ),
 2307    !.
 %   Second clause: the default is to wait for and emit a single
 %   event.
 2308create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, _Dict) :-
 2309    wait_and_output_result(Pengine, Queue, Format, TimeLimit).
 2310
 %!  is_more_event(+Event) is semidet.
 %
 %   True when Event is a success event announcing that more answers
 %   may follow, or a create event that carries such an event as its
 %   answer.

 is_more_event(success(_Pengine, _Solutions, _Proj, _CPU, true)).
 is_more_event(create(_Pengine, Features)) :-
     memberchk(answer(Embedded), Features),
     is_more_event(Embedded).
 time_limit_exceeded(+Pengine, +Format)
The Pengine did not reply within its time limit. Send a reply to the client in the requested format and interrupt the Pengine.
bug
- Ideally, if the Pengine has destroy set to false, we should get the Pengine back to its main loop. Unfortunately we only have normal exceptions that may be caught by the Pengine and abort which cannot be caught and thus destroys the Pengine.
 2328time_limit_exceeded(Pengine, Format) :-
 2329    call_cleanup(
 2330        pengine_destroy(Pengine, [force(true)]),
 2331        output_result(Format,
 2332                      destroy(Pengine,
 2333                              error(Pengine, time_limit_exceeded)))).
 destroy_queue_from_http(+Pengine, +Event, +Queue) is semidet
Consider destroying the output queue for Pengine after sending Event back to the HTTP client. We can destroy the queue if
To be done
- If the client did not request all output, the queue will not be destroyed. We need some timeout and GC for that.
 2348destroy_queue_from_http(ID, _, Queue) :-
 2349    output_queue(ID, Queue, _),
 2350    !,
 2351    destroy_queue_if_empty(Queue).
 2352destroy_queue_from_http(ID, Event, Queue) :-
 2353    debug(pengine(destroy), 'DESTROY? ~p', [Event]),
 2354    is_destroy_event(Event),
 2355    !,
 2356    message_queue_property(Queue, size(Waiting)),
 2357    debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Waiting]),
 2358    with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue)).
 2359
 %!  is_destroy_event(+Event) is semidet.
 %
 %   True when Event is a destroy/1 or destroy/2 term, or a create
 %   event that carries such an event as its answer.

 is_destroy_event(destroy(_Pengine)).
 is_destroy_event(destroy(_Pengine, _Last)).
 is_destroy_event(create(_Pengine, Features)) :-
     memberchk(answer(Embedded), Features),
     is_destroy_event(Embedded).
 2365
 %!  destroy_queue_if_empty(+Queue) is det.
 %
 %   Destroy Queue and remove its output_queue/3 registration, unless
 %   there is still a message waiting in Queue.  In that case nothing
 %   happens.

 destroy_queue_if_empty(Queue) :-
     (   thread_peek_message(Queue, _)
     ->  true                            % still holds messages
     ;   retractall(output_queue(_, Queue, _)),
         message_queue_destroy(Queue)
     ).
 gc_abandoned_queues
Check whether there are queues that have been abandoned. This happens if the stream contains output events and not all of them are read by the client.
 2379:- dynamic
 2380    last_gc/1. 2381
 %!  gc_abandoned_queues is det.
 %
 %   If consider_queue_gc/0 says it is time to do so, destroy every
 %   queue registered in output_queue/3 for more than 15 minutes and
 %   record the time of this run in last_gc/1.

 gc_abandoned_queues :-
     (   consider_queue_gc
     ->  get_time(Now),
         forall(( output_queue(_, Queue, Stamp),
                  Now-Stamp > 15*60,
                  retract(output_queue(_, Queue, Stamp))
                ),
                message_queue_destroy(Queue)),
         retractall(last_gc(_)),
         asserta(last_gc(Now))
     ;   true
     ).
 2395
 %!  consider_queue_gc is semidet.
 %
 %   True when output_queue/3 has more than 100 clauses and either
 %   no GC run was recorded in last_gc/1 or a recorded run is more
 %   than 5 minutes old.

 consider_queue_gc :-
     predicate_property(output_queue(_,_,_), number_of_clauses(Count)),
     Count > 100,
     (   \+ last_gc(_)
     ->  true
     ;   once(( last_gc(Last),
                get_time(Now),
                Now-Last > 5*60
              ))
     ).
 sync_destroy_queue_from_http(+Pengine, +Queue) is det
 sync_destroy_queue_from_pengine(+Pengine, +Queue) is det
Handle destruction of the message queue connecting the HTTP side to the pengine. We cannot delete the queue when the pengine dies because the queue may contain output events. Termination of the pengine and finishing the HTTP exchange may happen in both orders. This means we need to handle this using synchronization.
sync_destroy_queue_from_pengine(+Pengine, +Queue)
Called (indirectly) from pengine_done/1 if the pengine's thread dies.
sync_destroy_queue_from_http(+Pengine, +Queue)
Called from destroy_queue_from_http/3, which is called from wait_and_output_result/4, i.e., from the HTTP side.
 2422:- dynamic output_queue_destroyed/1. 2423
 %!  sync_destroy_queue_from_http(+ID, +Queue)
 %
 %   HTTP side of the queue destruction protocol.  Three cases, in
 %   this order:
 %
 %     1. Queue is already registered in output_queue/3: destroy it
 %        if it is empty.
 %     2. An output event is waiting in Queue: register Queue in
 %        output_queue/3 with the current time, delaying destruction.
 %     3. Otherwise destroy Queue and record that fact in
 %        output_queue_destroyed/1.

 sync_destroy_queue_from_http(ID, Queue) :-
     output_queue(ID, Queue, _),
     !,
     destroy_queue_if_empty(Queue).
 sync_destroy_queue_from_http(ID, Queue) :-
     thread_peek_message(Queue, pengine_event(_, output(_,_))),
     !,
     debug(pengine(destroy), 'Delay destruction of ~p because of output',
           [Queue]),
     get_time(Stamp),
     asserta(output_queue(ID, Queue, Stamp)).
 sync_destroy_queue_from_http(_ID, Queue) :-
     message_queue_destroy(Queue),
     asserta(output_queue_destroyed(Queue)).
 sync_destroy_queue_from_pengine(+Pengine, +Queue)
Called from pengine_unregister/1 when the pengine thread terminates. It is called while the mutex pengine held.
 2441sync_destroy_queue_from_pengine(ID, Queue) :-
 2442    (   retract(output_queue_destroyed(Queue))
 2443    ->  true
 2444    ;   get_time(Now),
 2445        asserta(output_queue(ID, Queue, Now))
 2446    ),
 2447    retractall(pengine_queue(ID, Queue, _, _)).
 2448
 2449
 %!  http_pengine_send(+Request)
 %
 %   HTTP handler for =/pengine/send=.  Reads the event from the
 %   `event` parameter or the posted document, forwards it to the
 %   pengine thread and replies with the next event from the
 %   pengine's queue.  If the pengine no longer exists, the client
 %   gets an existence error in the requested format.

 http_pengine_send(Request) :-
     reply_options(Request, [get,post]),
     !.
 http_pengine_send(Request) :-
     http_parameters(Request,
                     [ id(ID, [ type(atom) ]),
                       event(EventString, [optional(true)]),
                       format(Format, [default(prolog)])
                     ]),
     catch(read_event(ID, Request, Format, EventString, Event),
           Error,
           true),
     (   nonvar(Error)                   % reading the event failed
     ->  (   Error = error(existence_error(pengine, ID), _)
         ->  pengine_died(Format, ID)
         ;   output_result(Format, error(ID, Error))
         )
     ;   debug(pengine(event), 'HTTP send: ~p', [Event]),
         (   pengine_thread(ID, Thread)
         ->  pengine_queue(ID, Queue, TimeLimit, _),
             random_delay,
             broadcast(pengine(send(ID, Event))),
             thread_send_message(Thread, pengine_request(Event)),
             wait_and_output_result(ID, Queue, Format, TimeLimit)
         ;   atom(ID)
         ->  pengine_died(Format, ID)
         ;   http_404([], Request)
         )
     ).
 2478
 %!  pengine_died(+Format, +Pengine)
 %
 %   Tell the client, in Format, that Pengine does not exist
 %   (anymore) by emitting an existence error event.

 pengine_died(Format, Pengine) :-
     Gone = error(existence_error(pengine, Pengine),_),
     output_result(Format, error(Pengine, Gone)).
 read_event(+Pengine, +Request, +Format, +EventString, -Event) is det
Read an event on behalve of Pengine. Note that the pengine's module should not be deleted while we are reading using its syntax (module). This is ensured using the pengine_done mutex.
See also
- pengine_done/0.
 2492read_event(Pengine, Request, Format, EventString, Event) :-
 2493    protect_pengine(
 2494        Pengine,
 2495        ( get_pengine_module(Pengine, Module),
 2496          read_event_2(Request, EventString, Module, Event0, Bindings)
 2497        )),
 2498    !,
 2499    fix_bindings(Format, Event0, Bindings, Event).
 2500read_event(Pengine, Request, _Format, _EventString, _Event) :-
 2501    debug(pengine(event), 'Pengine ~q vanished', [Pengine]),
 2502    discard_post_data(Request),
 2503    existence_error(pengine, Pengine).
 read_event_(+Request, +EventString, +Module, -Event, -Bindings)
Read the sent event. The event is a Prolog term that is either in the event parameter or as a posted document.
 2511read_event_2(_Request, EventString, Module, Event, Bindings) :-
 2512    nonvar(EventString),
 2513    !,
 2514    term_string(Event, EventString,
 2515                [ variable_names(Bindings),
 2516                  module(Module)
 2517                ]).
 2518read_event_2(Request, _EventString, Module, Event, Bindings) :-
 2519    option(method(post), Request),
 2520    http_read_data(Request,     Event,
 2521                   [ content_type('application/x-prolog'),
 2522                     module(Module),
 2523                     variable_names(Bindings)
 2524                   ]).
 discard_post_data(+Request) is det
If this is a POST request, discard the posted data.
 2530discard_post_data(Request) :-
 2531    option(method(post), Request),
 2532    !,
 2533    setup_call_cleanup(
 2534        open_null_stream(NULL),
 2535        http_read_data(Request, _, [to(stream(NULL))]),
 2536        close(NULL)).
 2537discard_post_data(_).
 fix_bindings(+Format, +EventIn, +Bindings, -Event) is det
Generate the template for json(-s) Format from the variables in the asked Goal. Variables starting with an underscore, followed by an capital letter are ignored from the template.
 2545fix_bindings(Format,
 2546             ask(Goal, Options0), Bindings,
 2547             ask(Goal, NewOptions)) :-
 2548    json_lang(Format),
 2549    !,
 2550    exclude(anon, Bindings, NamedBindings),
 2551    template(NamedBindings, Template, Options0, Options1),
 2552    select_option(chunk(Paging), Options1, Options2, 1),
 2553    NewOptions = [ template(Template),
 2554                   chunk(Paging),
 2555                   bindings(NamedBindings)
 2556                 | Options2
 2557                 ].
 2558fix_bindings(_, Command, _, Command).
 2559
 %!  template(+Bindings, -Template, +Options0, -Options)
 %
 %   Template is the template(_) option from Options0, with Options
 %   the remaining options.  If there is no such option, Template is
 %   a dict with tag `swish_default_template` created from Bindings
 %   and Options is Options0.

 template(Bindings, Template, Options0, Options) :-
     (   select_option(template(Template), Options0, Options)
     ->  true
     ;   Options = Options0,
         dict_create(Template, swish_default_template, Bindings)
     ).
 2565
 %!  anon(+Binding) is semidet.
 %
 %   True when Binding is `Name = _` and Name starts with an
 %   underscore followed by a character that can start a Prolog
 %   variable.

 anon(Name=_) :-
     atom_chars(Name, ['_',Second|_]),
     char_type(Second, prolog_var_start).
 2570
 %!  var_name(+Binding, -Name) is det.
 %
 %   Name is the variable name of the `Name = Value` Binding.

 var_name(Binding, Name) :-
     Binding = (Name=_).
 json_lang(+Format) is semidet
True if Format is a JSON variation.
 2578json_lang(json) :- !.
 2579json_lang(Format) :-
 2580    sub_atom(Format, 0, _, _, 'json-').
 http_pengine_pull_response(+Request)
HTTP handler for /pengine/pull_response. Pulls possible pending messages from the pengine.
 2587http_pengine_pull_response(Request) :-
 2588    reply_options(Request, [get]),
 2589    !.
 2590http_pengine_pull_response(Request) :-
 2591    http_parameters(Request,
 2592            [   id(ID, []),
 2593                format(Format, [default(prolog)])
 2594            ]),
 2595    reattach(ID),
 2596    (   (   pengine_queue(ID, Queue, TimeLimit, _)
 2597        ->  true
 2598        ;   output_queue(ID, Queue, _),
 2599            TimeLimit = 0
 2600        )
 2601    ->  wait_and_output_result(ID, Queue, Format, TimeLimit)
 2602    ;   http_404([], Request)
 2603    ).
 http_pengine_abort(+Request)
HTTP handler for /pengine/abort. Note that abort may be sent at any time and the reply may be handled by a pull_response. In that case, our pengine has already died before we get to wait_and_output_result/4.
 2612http_pengine_abort(Request) :-
 2613    reply_options(Request, [get,post]),
 2614    !.
 2615http_pengine_abort(Request) :-
 2616    http_parameters(Request,
 2617            [   id(ID, [])
 2618            ]),
 2619    (   pengine_thread(ID, _Thread)
 2620    ->  broadcast(pengine(abort(ID))),
 2621        abort_pending_output(ID),
 2622        pengine_abort(ID),
 2623        reply_json(true)
 2624    ;   http_404([], Request)
 2625    ).
 http_pengine_detach(+Request)
Detach a Pengine while keeping it running. This has the following consequences:
 2637http_pengine_detach(Request) :-
 2638    reply_options(Request, [post]),
 2639    !.
 2640http_pengine_detach(Request) :-
 2641    http_parameters(Request,
 2642                    [ id(ID, [])
 2643                    ]),
 2644    http_read_json_dict(Request, ClientData),
 2645    (   pengine_property(ID, application(Application)),
 2646        allowed(Request, Application),
 2647        authenticate(Request, Application, _UserOptions)
 2648    ->  broadcast(pengine(detach(ID))),
 2649        get_time(Now),
 2650        assertz(pengine_detached(ID, ClientData.put(time, Now))),
 2651        pengine_queue(ID, Queue, _TimeLimit, _Now),
 2652        message_queue_set(Queue, max_size(1000)),
 2653        pengine_reply(Queue, detached(ID)),
 2654        reply_json(true)
 2655    ;   http_404([], Request)
 2656    ).
 2657
 2658:- if(\+current_predicate(message_queue_set/2)). 2659message_queue_set(_,_).
 2660:- endif. 2661
 2662reattach(ID) :-
 2663    (   retract(pengine_detached(ID, _Data)),
 2664        pengine_queue(ID, Queue, _TimeLimit, _Now)
 2665    ->  message_queue_set(Queue, max_size(25))
 2666    ;   true
 2667    ).
 http_pengine_destroy_all(+Request)
Destroy a list of pengines. Normally called by pengines.js if the browser window is closed.
 2675http_pengine_destroy_all(Request) :-
 2676    reply_options(Request, [get,post]),
 2677    !.
 2678http_pengine_destroy_all(Request) :-
 2679    http_parameters(Request,
 2680                    [ ids(IDsAtom, [])
 2681                    ]),
 2682    atomic_list_concat(IDs, ',', IDsAtom),
 2683    forall(( member(ID, IDs),
 2684             \+ pengine_detached(ID, _)
 2685           ),
 2686           pengine_destroy(ID, [force(true)])),
 2687    reply_json("ok").
 http_pengine_ping(+Request)
HTTP handler for /pengine/ping. If the requested Pengine is alive, the event ping(Pengine, Stats) is returned, where Stats is the result of thread_statistics/2. Otherwise the event died(Pengine) is returned.
 2695http_pengine_ping(Request) :-
 2696    reply_options(Request, [get]),
 2697    !.
 2698http_pengine_ping(Request) :-
 2699    http_parameters(Request,
 2700                    [ id(Pengine, []),
 2701                      format(Format, [default(prolog)])
 2702                    ]),
 2703    (   pengine_thread(Pengine, Thread),
 2704        Error = error(_,_),
 2705        catch(thread_statistics(Thread, Stats), Error, fail)
 2706    ->  output_result(Format, ping(Pengine, Stats))
 2707    ;   output_result(Format, died(Pengine))
 2708    ).
 http_pengine_list(+Request)
HTTP handler for `/pengine/list`, providing information about running Pengines.
To be done
- Only list detached Pengines associated to the logged in user.
 2717http_pengine_list(Request) :-
 2718    reply_options(Request, [get]),
 2719    !.
 2720http_pengine_list(Request) :-
 2721    http_parameters(Request,
 2722                    [ status(Status, [default(detached), oneof([detached])]),
 2723                      application(Application, [default(pengine_sandbox)])
 2724                    ]),
 2725    allowed(Request, Application),
 2726    authenticate(Request, Application, _UserOptions),
 2727    findall(Term, listed_pengine(Application, Status, Term), Terms),
 2728    reply_json(json{pengines: Terms}).
 2729
 2730listed_pengine(Application, detached, State) :-
 2731    State = pengine{id:Id,
 2732                    detached:Time,
 2733                    queued:Queued,
 2734                    stats:Stats},
 2735
 2736    pengine_property(Id, application(Application)),
 2737    pengine_property(Id, detached(Time)),
 2738    pengine_queue(Id, Queue, _TimeLimit, _Now),
 2739    message_queue_property(Queue, size(Queued)),
 2740    (   pengine_thread(Id, Thread),
 2741        catch(thread_statistics(Thread, Stats), _, fail)
 2742    ->  true
 2743    ;   Stats = thread{status:died}
 2744    ).
 output_result(+Format, +EventTerm) is det
 output_result(+Format, +EventTerm, +OptionsDict) is det
Formulate an HTTP response from a pengine event term. Format is one of prolog, json or json-s.
 2753:- dynamic
 2754    pengine_replying/2.             % +Pengine, +Thread
 2755
 2756output_result(Format, Event) :-
 2757    arg(1, Event, Pengine),
 2758    thread_self(Thread),
 2759    cors_enable,            % contingent on http:cors setting
 2760    disable_client_cache,
 2761    setup_call_cleanup(
 2762        asserta(pengine_replying(Pengine, Thread), Ref),
 2763        catch(output_result(Format, Event, _{}),
 2764              pengine_abort_output,
 2765              true),
 2766        erase(Ref)).
 2767
 2768output_result(Lang, Event, Dict) :-
 2769    write_result(Lang, Event, Dict),
 2770    !.
 2771output_result(prolog, Event, _) :-
 2772    !,
 2773    format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
 2774    write_term(Event,
 2775               [ quoted(true),
 2776                 ignore_ops(true),
 2777                 fullstop(true),
 2778                 blobs(portray),
 2779                 portray_goal(portray_blob),
 2780                 nl(true)
 2781               ]).
 2782output_result(Lang, Event, _) :-
 2783    json_lang(Lang),
 2784    !,
 2785    (   event_term_to_json_data(Event, JSON, Lang)
 2786    ->  reply_json(JSON)
 2787    ;   assertion(event_term_to_json_data(Event, _, Lang))
 2788    ).
 2789output_result(Lang, _Event, _) :-    % FIXME: allow for non-JSON format
 2790    domain_error(pengine_format, Lang).
 portray_blob(+Blob, +Options) is det
Portray non-text blobs that may appear in output terms. Not really sure about that. Basically such terms need to be avoided as they are meaningless outside the process. The generated error is hard to debug though, so now we send them as '$BLOB'(Type). Future versions may include more info, depending on Type.
 2800:- public portray_blob/2.               % called from write-term
 2801portray_blob(Blob, _Options) :-
 2802    blob(Blob, Type),
 2803    writeq('$BLOB'(Type)).
 abort_pending_output(+Pengine) is det
If we get an abort, it is possible that output is being produced for the client. This predicate aborts these threads.
 2810abort_pending_output(Pengine) :-
 2811    forall(pengine_replying(Pengine, Thread),
 2812           abort_output_thread(Thread)).
 2813
 2814abort_output_thread(Thread) :-
 2815    catch(thread_signal(Thread, throw(pengine_abort_output)),
 2816          error(existence_error(thread, _), _),
 2817          true).
 write_result(+Lang, +Event, +Dict) is semidet
Hook that allows for different output formats. The core Pengines library supports prolog and various JSON dialects. The hook event_to_json/3 can be used to refine the JSON dialects. This hook must be used if a completely different output format is desired.
 disable_client_cache
Make sure the client will not cache our page.
See also
- http://stackoverflow.com/questions/49547/making-sure-a-web-page-is-not-cached-across-all-browsers
 2833disable_client_cache :-
 2834    format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c
 2835            Pragma: no-cache\r\n\c
 2836            Expires: 0\r\n').
 2837
 2838event_term_to_json_data(Event, JSON, Lang) :-
 2839    event_to_json(Event, JSON, Lang),
 2840    !.
 2841event_term_to_json_data(success(ID, Bindings0, Projection, Time, More),
 2842                        json{event:success, id:ID, time:Time,
 2843                             data:Bindings, more:More, projection:Projection},
 2844                        json) :-
 2845    !,
 2846    term_to_json(Bindings0, Bindings).
 2847event_term_to_json_data(destroy(ID, Event),
 2848                        json{event:destroy, id:ID, data:JSON},
 2849                        Style) :-
 2850    !,
 2851    event_term_to_json_data(Event, JSON, Style).
 2852event_term_to_json_data(create(ID, Features0), JSON, Style) :-
 2853    !,
 2854    (   select(answer(First0), Features0, Features1)
 2855    ->  event_term_to_json_data(First0, First, Style),
 2856        Features = [answer(First)|Features1]
 2857    ;   Features = Features0
 2858    ),
 2859    dict_create(JSON, json, [event(create), id(ID)|Features]).
 2860event_term_to_json_data(destroy(ID, Event),
 2861                        json{event:destroy, id:ID, data:JSON}, Style) :-
 2862    !,
 2863    event_term_to_json_data(Event, JSON, Style).
 2864event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
 2865    !,
 2866    Error0 = json{event:error, id:ID, data:Message},
 2867    add_error_details(ErrorTerm, Error0, Error),
 2868    message_to_string(ErrorTerm, Message).
 2869event_term_to_json_data(failure(ID, Time),
 2870                        json{event:failure, id:ID, time:Time}, _) :-
 2871    !.
 2872event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
 2873    functor(EventTerm, F, 1),
 2874    !,
 2875    arg(1, EventTerm, ID).
 2876event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
 2877    functor(EventTerm, F, 2),
 2878    arg(1, EventTerm, ID),
 2879    arg(2, EventTerm, Data),
 2880    term_to_json(Data, JSON).
 2881
 2882:- public add_error_details/3.
 add_error_details(+Error, +JSON0, -JSON)
Add the formal error code and location information to an error. Also used by pengines_io.pl.
 2889add_error_details(Error, JSON0, JSON) :-
 2890    add_error_code(Error, JSON0, JSON1),
 2891    add_error_location(Error, JSON1, JSON).
 add_error_code(+Error, +JSON0, -JSON) is det
Add a code field to JSON0 if Error is an ISO error term. The error code is the functor name of the formal part of the error, e.g., syntax_error, type_error, etc. Some errors carry more information:
existence_error(Type, Obj)
{arg1:Type, arg2:Obj}, where Obj is stringified if it is not atomic.
 2904add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :-
 2905    atom(Type),
 2906    !,
 2907    to_atomic(Obj, Value),
 2908    Error = Error0.put(_{code:existence_error, arg1:Type, arg2:Value}).
 2909add_error_code(error(Formal, _), Error0, Error) :-
 2910    callable(Formal),
 2911    !,
 2912    functor(Formal, Code, _),
 2913    Error = Error0.put(code, Code).
 2914add_error_code(_, Error, Error).
 2915
 2916% What to do with large integers?
 2917to_atomic(Obj, Atomic) :- atom(Obj),   !, Atomic = Obj.
 2918to_atomic(Obj, Atomic) :- number(Obj), !, Atomic = Obj.
 2919to_atomic(Obj, Atomic) :- string(Obj), !, Atomic = Obj.
 2920to_atomic(Obj, Atomic) :- term_string(Obj, Atomic).
 add_error_location(+Error, +JSON0, -JSON) is det
Add a location property if the error can be associated with a source location. The location is an object with properties file and line and, if available, the character location in the line.
 2929add_error_location(error(_, file(Path, Line, -1, _CharNo)), Term0, Term) :-
 2930    atom(Path), integer(Line),
 2931    !,
 2932    Term = Term0.put(_{location:_{file:Path, line:Line}}).
 2933add_error_location(error(_, file(Path, Line, Ch, _CharNo)), Term0, Term) :-
 2934    atom(Path), integer(Line), integer(Ch),
 2935    !,
 2936    Term = Term0.put(_{location:_{file:Path, line:Line, ch:Ch}}).
 2937add_error_location(_, Term, Term).
 event_to_json(+Event, -JSONTerm, +Lang) is semidet
Hook that translates a Pengine event structure into a term suitable for reply_json/1, according to the language specification Lang. This can be used to massage general Prolog terms, notably associated with success(ID, Bindings, Projection, Time, More) and output(ID, Term) into a format suitable for processing at the client side.
 2948%:- multifile pengines:event_to_json/3.
 2949
 2950
 2951                 /*******************************
 2952                 *        ACCESS CONTROL        *
 2953                 *******************************/
 allowed(+Request, +Application) is det
Check whether the peer is allowed to connect. Returns a forbidden header if contact is not allowed.
 2960allowed(Request, Application) :-
 2961    setting(Application:allow_from, Allow),
 2962    match_peer(Request, Allow),
 2963    setting(Application:deny_from, Deny),
 2964    \+ match_peer(Request, Deny),
 2965    !.
 2966allowed(Request, _Application) :-
 2967    memberchk(request_uri(Here), Request),
 2968    throw(http_reply(forbidden(Here))).
 2969
 2970match_peer(_, Allowed) :-
 2971    memberchk(*, Allowed),
 2972    !.
 2973match_peer(_, []) :- !, fail.
 2974match_peer(Request, Allowed) :-
 2975    http_peer(Request, Peer),
 2976    debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
 2977    (   memberchk(Peer, Allowed)
 2978    ->  true
 2979    ;   member(Pattern, Allowed),
 2980        match_peer_pattern(Pattern, Peer)
 2981    ).
 2982
 2983match_peer_pattern(Pattern, Peer) :-
 2984    ip_term(Pattern, IP),
 2985    ip_term(Peer, IP),
 2986    !.
 2987
 2988ip_term(Peer, Pattern) :-
 2989    split_string(Peer, ".", "", PartStrings),
 2990    ip_pattern(PartStrings, Pattern).
 2991
 2992ip_pattern([], []).
 2993ip_pattern([*], _) :- !.
 2994ip_pattern([S|T0], [N|T]) :-
 2995    number_string(N, S),
 2996    ip_pattern(T0, T).
 authenticate(+Request, +Application, -UserOptions:list) is det
Call authentication_hook/3, returning either [user(User)], [] or an exception.
 3004authenticate(Request, Application, UserOptions) :-
 3005    authentication_hook(Request, Application, User),
 3006    !,
 3007    must_be(ground, User),
 3008    UserOptions = [user(User)].
 3009authenticate(_, _, []).
 authentication_hook(+Request, +Application, -User) is semidet
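This hook is called from the =/pengine/create= HTTP handler to discover whether the server is accessed by an authorized user. It can react in three ways: (1) succeed, binding User to a ground term, in which case the user is passed on as the option user(User); (2) fail, in which case the request proceeds without an associated user; (3) throw an exception, which aborts the request.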
See also
- http_authenticate/3 can be used to implement this hook using default HTTP authentication data.
 3031pengine_register_user(Options) :-
 3032    option(user(User), Options),
 3033    !,
 3034    pengine_self(Me),
 3035    asserta(pengine_user(Me, User)).
 3036pengine_register_user(_).
 pengine_user(-User) is semidet
True when the pengine was created by an HTTP request that authorized User.
See also
- authentication_hook/3 can be used to extract authorization from the HTTP header.
 3047pengine_user(User) :-
 3048    pengine_self(Me),
 3049    pengine_user(Me, User).
 reply_options(+Request, +Methods) is semidet
Reply to an HTTP OPTIONS request. Fails if the request method is not `options`.
 3055reply_options(Request, Allowed) :-
 3056    option(method(options), Request),
 3057    !,
 3058    cors_enable(Request,
 3059                [ methods(Allowed)
 3060                ]),
 3061    format('Content-type: text/plain\r\n'),
 3062    format('~n').                   % empty body
 3063
 3064
 3065                 /*******************************
 3066                 *        COMPILE SOURCE        *
 3067                 *******************************/
 pengine_src_text(+SrcText, +Module) is det
Asserts the clauses defined in SrcText in the private database of the current Pengine. This predicate processes the `src_text` option of pengine_create/1.
 3076pengine_src_text(Src, Module) :-
 3077    pengine_self(Self),
 3078    format(atom(ID), 'pengine://~w/src', [Self]),
 3079    extra_load_options(Self, Options),
 3080    setup_call_cleanup(
 3081        open_chars_stream(Src, Stream),
 3082        load_files(Module:ID,
 3083                   [ stream(Stream),
 3084                     module(Module),
 3085                     silent(true)
 3086                   | Options
 3087                   ]),
 3088        close(Stream)),
 3089    keep_source(Self, ID, Src).
 3090
 3091system:'#file'(File, _Line) :-
 3092    prolog_load_context(stream, Stream),
 3093    set_stream(Stream, file_name(File)),
 3094    set_stream(Stream, record_position(false)),
 3095    set_stream(Stream, record_position(true)).
 pengine_src_url(+URL, +Module) is det
Asserts the clauses defined in URL in the private database of the current Pengine. This predicate processes the `src_url` option of pengine_create/1.
To be done
- Make a sensible guess at the encoding (the content is currently always read as UTF-8).
 3105pengine_src_url(URL, Module) :-
 3106    pengine_self(Self),
 3107    uri_encoded(path, URL, Path),
 3108    format(atom(ID), 'pengine://~w/url/~w', [Self, Path]),
 3109    extra_load_options(Self, Options),
 3110    (   get_pengine_application(Self, Application),
 3111        setting(Application:debug_info, false)
 3112    ->  setup_call_cleanup(
 3113            http_open(URL, Stream, []),
 3114            ( set_stream(Stream, encoding(utf8)),
 3115              load_files(Module:ID,
 3116                         [ stream(Stream),
 3117                           module(Module)
 3118                         | Options
 3119                         ])
 3120            ),
 3121            close(Stream))
 3122    ;   setup_call_cleanup(
 3123            http_open(URL, TempStream, []),
 3124            ( set_stream(TempStream, encoding(utf8)),
 3125              read_string(TempStream, _, Src)
 3126            ),
 3127            close(TempStream)),
 3128        setup_call_cleanup(
 3129            open_chars_stream(Src, Stream),
 3130            load_files(Module:ID,
 3131                       [ stream(Stream),
 3132                         module(Module)
 3133                       | Options
 3134                       ]),
 3135            close(Stream)),
 3136        keep_source(Self, ID, Src)
 3137    ).
 3138
 3139
 3140extra_load_options(Pengine, Options) :-
 3141    pengine_not_sandboxed(Pengine),
 3142    !,
 3143    Options = [].
 3144extra_load_options(_, [sandboxed(true)]).
 3145
 3146
 3147keep_source(Pengine, ID, SrcText) :-
 3148    get_pengine_application(Pengine, Application),
 3149    setting(Application:debug_info, true),
 3150    !,
 3151    to_string(SrcText, SrcString),
 3152    assertz(pengine_data(Pengine, source(ID, SrcString))).
 3153keep_source(_, _, _).
 3154
 3155to_string(String, String) :-
 3156    string(String),
 3157    !.
 3158to_string(Atom, String) :-
 3159    atom_string(Atom, String),
 3160    !.
 3161
 3162		 /*******************************
 3163		 *            SANDBOX		*
 3164		 *******************************/
 3165
 3166:- multifile
 3167    sandbox:safe_primitive/1. 3168
 3169sandbox:safe_primitive(pengines:pengine_input(_, _)).
 3170sandbox:safe_primitive(pengines:pengine_output(_)).
 3171sandbox:safe_primitive(pengines:pengine_debug(_,_)).
 3172
 3173
 3174                 /*******************************
 3175                 *            MESSAGES          *
 3176                 *******************************/
 3177
 3178prolog:error_message(sandbox(time_limit_exceeded, Limit)) -->
 3179    [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl,
 3180      'This is normally caused by an insufficiently instantiated'-[], nl,
 3181      'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl,
 3182      'find all possible instantations of Var.'-[]
 3183    ]