View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2002-2020, University of Amsterdam
    7                              VU University Amsterdam
    8                              CWI, Amsterdam
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(thread_httpd,
   38          [ http_current_server/2,      % ?:Goal, ?Port
   39            http_server_property/2,     % ?Port, ?Property
   40            http_server/2,              % :Goal, +Options
   41            http_workers/2,             % +Port, ?WorkerCount
   42            http_add_worker/2,          % +Port, +Options
   43            http_current_worker/2,      % ?Port, ?ThreadID
   44            http_stop_server/2,         % +Port, +Options
   45            http_spawn/2,               % :Goal, +Options
   46
   47            http_requeue/1,             % +Request
   48            http_close_connection/1,    % +Request
   49            http_enough_workers/3       % +Queue, +Why, +Peer
   50          ]).   51:- use_module(library(debug)).   52:- use_module(library(error)).   53:- use_module(library(option)).   54:- use_module(library(socket)).   55:- use_module(library(thread_pool)).   56:- use_module(library(gensym)).   57:- use_module(http_wrapper).   58:- use_module(http_path).   59
   60:- autoload(library(uri), [uri_resolve/3]).   61
   62:- predicate_options(http_server/2, 2,
   63                     [ port(any),
   64                       unix_socket(atom),
   65                       entry_page(atom),
   66                       tcp_socket(any),
   67                       workers(positive_integer),
   68                       timeout(number),
   69                       keep_alive_timeout(number),
   70                       silent(boolean),
   71                       ssl(list(any)),  % if http/http_ssl_plugin is loaded
   72                       pass_to(system:thread_create/3, 3)
   73                     ]).   74:- predicate_options(http_spawn/2, 2,
   75                     [ pool(atom),
   76                       pass_to(system:thread_create/3, 3),
   77                       pass_to(thread_pool:thread_create_in_pool/4, 4)
   78                     ]).   79:- predicate_options(http_add_worker/2, 2,
   80                     [ timeout(number),
   81                       keep_alive_timeout(number),
   82                       max_idle_time(number),
   83                       pass_to(system:thread_create/3, 3)
   84                     ]).

Threaded HTTP server

Most code doesn't need to use this directly; instead use library(http/http_server), which combines this library with the typical HTTP libraries that most servers need.

This library defines the HTTP server frontend of choice for SWI-Prolog. It is based on the multi-threading capabilities of SWI-Prolog and thus exploits multiple cores to serve requests concurrently. The server scales well and can cooperate with library(thread_pool) to control the number of concurrent requests of a given type. For example, it can be configured to handle 200 file download requests concurrently, 2 requests that potentially uses a lot of memory and 8 requests that use a lot of CPU resources.

On Unix systems, this library can be combined with library(http/http_unix_daemon) to realise a proper Unix service process that creates a web server at port 80, runs under a specific account, optionally detaches from the controlling terminal, etc.

Combined with library(http/http_ssl_plugin) from the SSL package, this library can be used to create an HTTPS server. See <plbase>/doc/packages/examples/ssl/https for an example server using a self-signed SSL certificate. */

  112:- meta_predicate
  113    http_server(1, :),
  114    http_current_server(1, ?),
  115    http_spawn(0, +).  116
  117:- dynamic
  118    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
  119    queue_worker/2,         % Queue, ThreadID
  120    queue_options/2.        % Queue, Options
  121
  122:- multifile
  123    make_socket_hook/3,
  124    accept_hook/2,
  125    close_hook/1,
  126    open_client_hook/6,
  127    http:create_pool/1,
  128    http:schedule_workers/1.  129
  130:- meta_predicate
  131    thread_repeat_wait(0).
 http_server(:Goal, :Options) is det
Create a server at Port that calls Goal for each parsed request. Options provide a list of options. Defined options are
port(?Address)
Port to bind to. Address is either a port or a term Host:Port. The port may be a variable, causing the system to select a free port. See tcp_bind/2.
unix_socket(+Path)
Instead of binding to a TCP port, bind to a Unix Domain Socket at Path.
entry_page(+URI)
Affects the message printed while the server is started. Interpreted as a URI relative to the server root.
tcp_socket(+Socket)
If provided, use this socket instead of the creating one and binding it to an address. The socket must be bound to an address.
workers(+Count)
Determine the number of worker threads. Default is 5. This is fine for small scale usage. Public servers typically need a higher number.
timeout(+Seconds)
Maximum time of inactivity trying to read the request after a connection has been opened. Default is 60 seconds. See set_stream/1 using the timeout option.
keep_alive_timeout(+Seconds)
Time to keep `Keep alive' connections alive. Default is 2 seconds.
stack_limit(+Bytes)
Stack limit to use for the workers. The default is inherited from the main thread. If you need to control resource usage you may consider the spawn option of http_handler/3 and library(thread_pool).
silent(Bool)
If true (default false), do not print an informational message that the server was started.

A typical initialization for an HTTP server that uses http_dispatch/1 to relay requests to predicates is:

:- use_module(library(http/thread_httpd)).
:- use_module(library(http/http_dispatch)).

start_server(Port) :-
    http_server(http_dispatch, [port(Port)]).

Note that multiple servers can coexist in the same Prolog process. A notable application of this is to have both an HTTP and HTTPS server, where the HTTP server redirects to the HTTPS server for handling sensitive requests.

  196http_server(Goal, M:Options0) :-
  197    server_address(Address, Options0),
  198    !,
  199    make_socket(Address, M:Options0, Options),
  200    create_workers(Options),
  201    create_server(Goal, Address, Options),
  202    (   option(silent(true), Options0)
  203    ->  true
  204    ;   print_message(informational,
  205                      httpd_started_server(Address, Options0))
  206    ).
  207http_server(_Goal, _:Options0) :-
  208    existence_error(server_address, Options0).
  209
  210server_address(Address, Options) :-
  211    (   option(port(Port), Options)
  212    ->  Address = Port
  213    ;   option(unix_socket(Path), Options)
  214    ->  Address = unix_socket(Path)
  215    ).
  216
  217address_port(_IFace:Port, Port) :- !.
  218address_port(unix_socket(Path), Path) :- !.
  219address_port(Address, Address) :- !.
  220
  221tcp_address(Port) :-
  222    var(Port),
  223    !.
  224tcp_address(Port) :-
  225    integer(Port),
  226    !.
  227tcp_address(_Iface:_Port).
 make_socket(+Address, :OptionsIn, -OptionsOut) is det
Create the HTTP server socket and worker pool queue. OptionsOut is quaranteed to hold the option queue(QueueId).
Arguments:
OptionsIn- is qualified to allow passing the module-sensitive ssl option argument.
  237make_socket(Address, M:Options0, Options) :-
  238    tcp_address(Address),
  239    make_socket_hook(Address, M:Options0, Options),
  240    !.
  241make_socket(Address, _:Options0, Options) :-
  242    option(tcp_socket(_), Options0),
  243    !,
  244    make_addr_atom('httpd', Address, Queue),
  245    Options = [ queue(Queue)
  246              | Options0
  247              ].
  248make_socket(Address, _:Options0, Options) :-
  249    tcp_address(Address),
  250    !,
  251    tcp_socket(Socket),
  252    tcp_setopt(Socket, reuseaddr),
  253    tcp_bind(Socket, Address),
  254    tcp_listen(Socket, 64),
  255    make_addr_atom('httpd', Address, Queue),
  256    Options = [ queue(Queue),
  257                tcp_socket(Socket)
  258              | Options0
  259              ].
  260:- if(current_predicate(unix_domain_socket/1)).  261make_socket(Address, _:Options0, Options) :-
  262    Address = unix_socket(Path),
  263    !,
  264    unix_domain_socket(Socket),
  265    tcp_bind(Socket, Path),
  266    tcp_listen(Socket, 64),
  267    make_addr_atom('httpd', Address, Queue),
  268    Options = [ queue(Queue),
  269                tcp_socket(Socket)
  270              | Options0
  271              ].
  272:- endif.
 make_addr_atom(+Scheme, +Address, -Atom) is det
Create an atom that identifies the server's queue and thread resources.
  279make_addr_atom(Scheme, Address, Atom) :-
  280    phrase(address_parts(Address), Parts),
  281    atomic_list_concat([Scheme,@|Parts], Atom).
  282
  283address_parts(Atomic) -->
  284    { atomic(Atomic) },
  285    !,
  286    [Atomic].
  287address_parts(Host:Port) -->
  288    !,
  289    address_parts(Host), [:], address_parts(Port).
  290address_parts(ip(A,B,C,D)) -->
  291    !,
  292    [ A, '.', B, '.', C, '.', D ].
  293address_parts(unix_socket(Path)) -->
  294    [Path].
 create_server(:Goal, +Address, +Options) is det
Create the main server thread that runs accept_server/2 to listen to new requests.
  302create_server(Goal, Address, Options) :-
  303    get_time(StartTime),
  304    memberchk(queue(Queue), Options),
  305    scheme(Scheme, Options),
  306    autoload_https(Scheme),
  307    address_port(Address, Port),
  308    make_addr_atom(Scheme, Port, Alias),
  309    thread_self(Initiator),
  310    thread_create(accept_server(Goal, Initiator, Options), _,
  311                  [ alias(Alias)
  312                  ]),
  313    thread_get_message(server_started),
  314    assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)).
  315
  316scheme(Scheme, Options) :-
  317    option(scheme(Scheme), Options),
  318    !.
  319scheme(Scheme, Options) :-
  320    (   option(ssl(_), Options)
  321    ;   option(ssl_instance(_), Options)
  322    ),
  323    !,
  324    Scheme = https.
  325scheme(http, _).
  326
  327autoload_https(https) :-
  328    \+ clause(accept_hook(_Goal, _Options), _),
  329    exists_source(library(http/http_ssl_plugin)),
  330    !,
  331    use_module(library(http/http_ssl_plugin)).
  332autoload_https(_).
 http_current_server(:Goal, ?Port) is nondet
True if Goal is the goal of a server at Port.
deprecated
- Use http_server_property(Port, goal(Goal))
  340http_current_server(Goal, Port) :-
  341    current_server(Port, Goal, _, _, _, _).
 http_server_property(?Port, ?Property) is nondet
True if Property is a property of the HTTP server running at Port. Defined properties are:
goal(:Goal)
Goal used to start the server. This is often http_dispatch/1.
scheme(-Scheme)
Scheme is one of http or https.
start_time(?Time)
Time-stamp when the server was created.
  357http_server_property(_:Port, Property) :-
  358    integer(Port),
  359    !,
  360    server_property(Property, Port).
  361http_server_property(Port, Property) :-
  362    server_property(Property, Port).
  363
  364server_property(goal(Goal), Port) :-
  365    current_server(Port, Goal, _, _, _, _).
  366server_property(scheme(Scheme), Port) :-
  367    current_server(Port, _, _, _, Scheme, _).
  368server_property(start_time(Time), Port) :-
  369    current_server(Port, _, _, _, _, Time).
 http_workers(+Port, -Workers) is det
http_workers(+Port, +Workers:int) is det
Query or set the number of workers for the server at this port. The number of workers is dynamically modified. Setting it to 1 (one) can be used to profile the worker using tprofile/1.
  379http_workers(Port, Workers) :-
  380    must_be(ground, Port),
  381    current_server(Port, _, _, Queue, _, _),
  382    !,
  383    (   integer(Workers)
  384    ->  resize_pool(Queue, Workers)
  385    ;   findall(W, queue_worker(Queue, W), WorkerIDs),
  386        length(WorkerIDs, Workers)
  387    ).
  388http_workers(Port, _) :-
  389    existence_error(http_server, Port).
 http_add_worker(+Port, +Options) is det
Add a new worker to the HTTP server for port Port. Options overrule the default queue options. The following additional options are processed:
max_idle_time(+Seconds)
The created worker will automatically terminate if there is no new work within Seconds.
  402http_add_worker(Port, Options) :-
  403    must_be(ground, Port),
  404    current_server(Port, _, _, Queue, _, _),
  405    !,
  406    queue_options(Queue, QueueOptions),
  407    merge_options(Options, QueueOptions, WorkerOptions),
  408    atom_concat(Queue, '_', AliasBase),
  409    create_workers(1, 1, Queue, AliasBase, WorkerOptions).
  410http_add_worker(Port, _) :-
  411    existence_error(http_server, Port).
 http_current_worker(?Port, ?ThreadID) is nondet
True if ThreadID is the identifier of a Prolog thread serving Port. This predicate is motivated to allow for the use of arbitrary interaction with the worker thread for development and statistics.
  421http_current_worker(Port, ThreadID) :-
  422    current_server(Port, _, _, Queue, _, _),
  423    queue_worker(Queue, ThreadID).
 accept_server(:Goal, +Initiator, +Options)
The goal of a small server-thread accepting new requests and posting them to the queue of workers.
  431accept_server(Goal, Initiator, Options) :-
  432    catch(accept_server2(Goal, Initiator, Options), http_stop, true),
  433    thread_self(Thread),
  434    retract(current_server(_Port, _, Thread, _Queue, _Scheme, _StartTime)),
  435    close_server_socket(Options).
  436
  437accept_server2(Goal, Initiator, Options) :-
  438    thread_send_message(Initiator, server_started),
  439    repeat,
  440      (   catch(accept_server3(Goal, Options), E, true)
  441      ->  (   var(E)
  442          ->  fail
  443          ;   accept_rethrow_error(E)
  444          ->  throw(E)
  445          ;   print_message(error, E),
  446              fail
  447          )
  448      ;   print_message(error,      % internal error
  449                        goal_failed(accept_server3(Goal, Options))),
  450          fail
  451      ).
  452
  453accept_server3(Goal, Options) :-
  454    accept_hook(Goal, Options),
  455    !.
  456accept_server3(Goal, Options) :-
  457    memberchk(tcp_socket(Socket), Options),
  458    memberchk(queue(Queue), Options),
  459    debug(http(connection), 'Waiting for connection', []),
  460    tcp_accept(Socket, Client, Peer),
  461    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
  462    thread_send_message(Queue, tcp_client(Client, Goal, Peer)),
  463    http_enough_workers(Queue, accept, Peer).
  464
  465accept_rethrow_error(http_stop).
  466accept_rethrow_error('$aborted').
 close_server_socket(+Options)
Close the server socket.
  473close_server_socket(Options) :-
  474    close_hook(Options),
  475    !.
  476close_server_socket(Options) :-
  477    memberchk(tcp_socket(Socket), Options),
  478    !,
  479    tcp_close_socket(Socket).
 http_stop_server(+Port, +Options)
Stop the indicated HTTP server gracefully. First stops all workers, then stops the server.
To be done
- Realise non-graceful stop
  489http_stop_server(Host:Port, Options) :-         % e.g., localhost:4000
  490    ground(Host),
  491    !,
  492    http_stop_server(Port, Options).
  493http_stop_server(Port, _Options) :-
  494    http_workers(Port, 0),                  % checks Port is ground
  495    current_server(Port, _, Thread, Queue, _Scheme, _Start),
  496    retractall(queue_options(Queue, _)),
  497    thread_signal(Thread, throw(http_stop)),
  498    catch(connect(localhost:Port), _, true),
  499    thread_join(Thread, _),
  500    message_queue_destroy(Queue).
  501
  502connect(Address) :-
  503    setup_call_cleanup(
  504        tcp_socket(Socket),
  505        tcp_connect(Socket, Address),
  506        tcp_close_socket(Socket)).
 http_enough_workers(+Queue, +Why, +Peer) is det
Check that we have enough workers in our queue. If not, call the hook http:schedule_workers/1 to extend the worker pool. This predicate can be used by accept_hook/2.
  514http_enough_workers(Queue, _Why, _Peer) :-
  515    message_queue_property(Queue, waiting(_0)),
  516    !,
  517    debug(http(scheduler), '~D waiting for work; ok', [_0]).
  518http_enough_workers(Queue, Why, Peer) :-
  519    message_queue_property(Queue, size(Size)),
  520    (   enough(Size, Why)
  521    ->  debug(http(scheduler), '~D in queue; ok', [Size])
  522    ;   current_server(Port, _, _, Queue, _, _),
  523        Data = _{ port:Port,
  524                  reason:Why,
  525                  peer:Peer,
  526                  waiting:Size,
  527                  queue:Queue
  528                },
  529        debug(http(scheduler), 'Asking to reschedule: ~p', [Data]),
  530        catch(http:schedule_workers(Data),
  531              Error,
  532              print_message(error, Error))
  533    ->  true
  534    ;   true
  535    ).
  536
  537enough(0, _).
  538enough(1, keep_alive).                  % I will be ready myself
 http:schedule_workers(+Data:dict) is semidet
Hook called if a new connection or a keep-alive connection cannot be scheduled immediately to a worker. Dict contains the following keys:
port:Port
Port number that identifies the server.
reason:Reason
One of accept for a new connection or keep_alive if a worker tries to reschedule itself.
peer:Peer
Identify the other end of the connection
waiting:Size
Number of messages waiting in the queue.
queue:Queue
Message queue used to dispatch accepted messages.

Note that, when called with reason:accept, we are called in the time critical main accept loop. An implementation of this hook shall typically send the event to thread dedicated to dynamic worker-pool management.

See also
- http_add_worker/2 may be used to create (temporary) extra workers.
  568                 /*******************************
  569                 *    WORKER QUEUE OPERATIONS   *
  570                 *******************************/
 create_workers(+Options)
Create the pool of HTTP worker-threads. Each worker has the alias http_worker_N.
  577create_workers(Options) :-
  578    option(workers(N), Options, 5),
  579    option(queue(Queue), Options),
  580    catch(message_queue_create(Queue), _, true),
  581    atom_concat(Queue, '_', AliasBase),
  582    create_workers(1, N, Queue, AliasBase, Options),
  583    assert(queue_options(Queue, Options)).
  584
  585create_workers(I, N, _, _, _) :-
  586    I > N,
  587    !.
  588create_workers(I, N, Queue, AliasBase, Options) :-
  589    gensym(AliasBase, Alias),
  590    thread_create(http_worker(Options), Id,
  591                  [ alias(Alias)
  592                  | Options
  593                  ]),
  594    assertz(queue_worker(Queue, Id)),
  595    I2 is I + 1,
  596    create_workers(I2, N, Queue, AliasBase, Options).
 resize_pool(+Queue, +Workers) is det
Create or destroy workers. If workers are destroyed, the call waits until the desired number of waiters is reached.
  604resize_pool(Queue, Size) :-
  605    findall(W, queue_worker(Queue, W), Workers),
  606    length(Workers, Now),
  607    (   Now < Size
  608    ->  queue_options(Queue, Options),
  609        atom_concat(Queue, '_', AliasBase),
  610        I0 is Now+1,
  611        create_workers(I0, Size, Queue, AliasBase, Options)
  612    ;   Now == Size
  613    ->  true
  614    ;   Now > Size
  615    ->  Excess is Now - Size,
  616        thread_self(Me),
  617        forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
  618        forall(between(1, Excess, _), thread_get_message(quitted(_)))
  619    ).
 http_worker(+Options)
Run HTTP worker main loop. Workers simply wait until they are passed an accepted socket to process a client.

If the message quit(Sender) is read from the queue, the worker stops.

  630http_worker(Options) :-
  631    debug(http(scheduler), 'New worker', []),
  632    prolog_listen(this_thread_exit, done_worker),
  633    option(queue(Queue), Options),
  634    option(max_idle_time(MaxIdle), Options, infinite),
  635    thread_repeat_wait(get_work(Queue, Message, MaxIdle)),
  636      debug(http(worker), 'Waiting for a job ...', []),
  637      debug(http(worker), 'Got job ~p', [Message]),
  638      (   Message = quit(Sender)
  639      ->  !,
  640          thread_self(Self),
  641          thread_detach(Self),
  642          (   Sender == idle
  643          ->  true
  644          ;   retract(queue_worker(Queue, Self)),
  645              thread_send_message(Sender, quitted(Self))
  646          )
  647      ;   open_client(Message, Queue, Goal, In, Out,
  648                      Options, ClientOptions),
  649          (   catch(http_process(Goal, In, Out, ClientOptions),
  650                    Error, true)
  651          ->  true
  652          ;   Error = goal_failed(http_process/4)
  653          ),
  654          (   var(Error)
  655          ->  fail
  656          ;   current_message_level(Error, Level),
  657              print_message(Level, Error),
  658              memberchk(peer(Peer), ClientOptions),
  659              close_connection(Peer, In, Out),
  660              fail
  661          )
  662      ).
  663
  664get_work(Queue, Message, infinite) :-
  665    !,
  666    thread_get_message(Queue, Message).
  667get_work(Queue, Message, MaxIdle) :-
  668    (   thread_get_message(Queue, Message, [timeout(MaxIdle)])
  669    ->  true
  670    ;   Message = quit(idle)
  671    ).
 open_client(+Message, +Queue, -Goal, -In, -Out, +Options, -ClientOptions) is semidet
Opens the connection to the client in a worker from the message sent to the queue by accept_server/2.
  680open_client(requeue(In, Out, Goal, ClOpts),
  681            _, Goal, In, Out, Opts, ClOpts) :-
  682    !,
  683    memberchk(peer(Peer), ClOpts),
  684    option(keep_alive_timeout(KeepAliveTMO), Opts, 2),
  685    check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out).
  686open_client(Message, Queue, Goal, In, Out, Opts,
  687            [ pool(client(Queue, Goal, In, Out)),
  688              timeout(Timeout)
  689            | Options
  690            ]) :-
  691    catch(open_client(Message, Goal, In, Out, Options, Opts),
  692          E, report_error(E)),
  693    option(timeout(Timeout), Opts, 60),
  694    (   debugging(http(connection))
  695    ->  memberchk(peer(Peer), Options),
  696        debug(http(connection), 'Opened connection from ~p', [Peer])
  697    ;   true
  698    ).
 open_client(+Message, +Goal, -In, -Out, -ClientOptions, +Options) is det
  704open_client(Message, Goal, In, Out, ClientOptions, Options) :-
  705    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
  706    !.
  707open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
  708            [ peer(Peer),
  709              protocol(http)
  710            ], _) :-
  711    tcp_open_socket(Socket, In, Out).
  712
  713report_error(E) :-
  714    print_message(error, E),
  715    fail.
 check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet
Wait for the client for at most TimeOut seconds. Succeed if the client starts a new request within this time. Otherwise close the connection and fail.
  724check_keep_alive_connection(In, TMO, Peer, In, Out) :-
  725    stream_property(In, timeout(Old)),
  726    set_stream(In, timeout(TMO)),
  727    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
  728    catch(peek_code(In, Code), E, true),
  729    (   var(E),                     % no exception
  730        Code \== -1                 % no end-of-file
  731    ->  set_stream(In, timeout(Old)),
  732        debug(http(keep_alive), '\tre-using keep-alive connection', [])
  733    ;   (   Code == -1
  734        ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
  735        ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
  736        ),
  737        close_connection(Peer, In, Out),
  738        fail
  739    ).
 done_worker
Called when worker is terminated due to http_workers/2 or a (debugging) exception. In the latter case, recreate_worker/2 creates a new worker.
  748done_worker :-
  749    thread_self(Self),
  750    thread_detach(Self),
  751    retract(queue_worker(Queue, Self)),
  752    thread_property(Self, status(Status)),
  753    !,
  754    (   catch(recreate_worker(Status, Queue), _, fail)
  755    ->  print_message(informational,
  756                      httpd_restarted_worker(Self))
  757    ;   done_status_message_level(Status, Level),
  758        print_message(Level,
  759                      httpd_stopped_worker(Self, Status))
  760    ).
  761done_worker :-                                  % received quit(Sender)
  762    thread_self(Self),
  763    thread_property(Self, status(Status)),
  764    done_status_message_level(Status, Level),
  765    print_message(Level,
  766                  httpd_stopped_worker(Self, Status)).
  767
  768done_status_message_level(true, silent) :- !.
  769done_status_message_level(exception('$aborted'), silent) :- !.
  770done_status_message_level(_, informational).
 recreate_worker(+Status, +Queue) is semidet
Deal with the possibility that threads are, during development, killed with abort/0. We recreate the worker to avoid that eventually we run out of workers. If we are aborted due to a halt/0 call, thread_create/3 will raise a permission error.

The first clause deals with the possibility that we cannot write to user_error. This is possible when Prolog is started as a service using some service managers. Would be nice if we could write an error, but where?

  785recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
  786    halt(2).
  787recreate_worker(exception(Error), Queue) :-
  788    recreate_on_error(Error),
  789    queue_options(Queue, Options),
  790    atom_concat(Queue, '_', AliasBase),
  791    create_workers(1, 1, Queue, AliasBase, Options).
  792
  793recreate_on_error('$aborted').
  794recreate_on_error(time_limit_exceeded).
 thread_httpd:message_level(+Exception, -Level)
Determine the message stream used for exceptions that may occur during server_loop/5. Being multifile, clauses can be added by the application to refine error handling. See also message_hook/3 for further programming error handling.
  803:- multifile
  804    message_level/2.  805
  806message_level(error(io_error(read, _), _),               silent).
  807message_level(error(socket_error(epipe,_), _),           silent).
  808message_level(error(http_write_short(_Obj,_Written), _), silent).
  809message_level(error(timeout_error(read, _), _),          informational).
  810message_level(keep_alive_timeout,                        silent).
  811
  812current_message_level(Term, Level) :-
  813    (   message_level(Term, Level)
  814    ->  true
  815    ;   Level = error
  816    ).
 http_requeue(+Header)
Re-queue a connection to the worker pool. This deals with processing additional requests on keep-alive connections.
  824http_requeue(Header) :-
  825    requeue_header(Header, ClientOptions),
  826    memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
  827    memberchk(peer(Peer), ClientOptions),
  828    http_enough_workers(Queue, keep_alive, Peer),
  829    thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)),
  830    !.
  831http_requeue(Header) :-
  832    debug(http(error), 'Re-queue failed: ~p', [Header]),
  833    fail.
  834
  835requeue_header([], []).
  836requeue_header([H|T0], [H|T]) :-
  837    requeue_keep(H),
  838    !,
  839    requeue_header(T0, T).
  840requeue_header([_|T0], T) :-
  841    requeue_header(T0, T).
  842
  843requeue_keep(pool(_)).
  844requeue_keep(peer(_)).
  845requeue_keep(protocol(_)).
 http_process(Message, Queue, +Options)
Handle a single client message on the given stream.
  852http_process(Goal, In, Out, Options) :-
  853    debug(http(server), 'Running server goal ~p on ~p -> ~p',
  854          [Goal, In, Out]),
  855    option(timeout(TMO), Options, 60),
  856    set_stream(In, timeout(TMO)),
  857    set_stream(Out, timeout(TMO)),
  858    http_wrapper(Goal, In, Out, Connection,
  859                 [ request(Request)
  860                 | Options
  861                 ]),
  862    next(Connection, Request).
  863
  864next(Connection, Request) :-
  865    next_(Connection, Request), !.
  866next(Connection, Request) :-
  867    print_message(warning, goal_failed(next(Connection,Request))).
  868
  869next_(switch_protocol(SwitchGoal, _SwitchOptions), Request) :-
  870    !,
  871    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
  872    (   catch(call(SwitchGoal, In, Out), E,
  873              (   print_message(error, E),
  874                  fail))
  875    ->  true
  876    ;   http_close_connection(Request)
  877    ).
  878next_(spawned(ThreadId), _) :-
  879    !,
  880    debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
  881next_(Connection, Request) :-
  882    downcase_atom(Connection, 'keep-alive'),
  883    http_requeue(Request),
  884    !.
  885next_(_, Request) :-
  886    http_close_connection(Request).
 http_close_connection(+Request)
Close connection associated to Request. See also http_requeue/1.
  893http_close_connection(Request) :-
  894    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
  895    memberchk(peer(Peer), Request),
  896    close_connection(Peer, In, Out).
 close_connection(+Peer, +In, +Out)
Closes the connection from the server to the client. Errors are currently silently ignored.
  903close_connection(Peer, In, Out) :-
  904    debug(http(connection), 'Closing connection from ~p', [Peer]),
  905    catch(close(In, [force(true)]), _, true),
  906    catch(close(Out, [force(true)]), _, true).
 http_spawn(:Goal, +Options) is det
Continue this connection on a new thread. A handler may call http_spawn/2 to start a new thread that continues processing the current request using Goal. The original thread returns to the worker pool for processing new requests. Options are passed to thread_create/3, except for:
pool(+Pool)
Interfaces to library(thread_pool), starting the thread on the given pool.

If a pool does not exist, this predicate calls the multifile hook http:create_pool/1 to create it. If this predicate succeeds the operation is retried.

  924http_spawn(Goal, Options) :-
  925    select_option(pool(Pool), Options, ThreadOptions),
  926    !,
  927    current_output(CGI),
  928    catch(thread_create_in_pool(Pool,
  929                                wrap_spawned(CGI, Goal), Id,
  930                                [ detached(true)
  931                                | ThreadOptions
  932                                ]),
  933          Error,
  934          true),
  935    (   var(Error)
  936    ->  http_spawned(Id)
  937    ;   Error = error(resource_error(threads_in_pool(_)), _)
  938    ->  throw(http_reply(busy))
  939    ;   Error = error(existence_error(thread_pool, Pool), _),
  940        create_pool(Pool)
  941    ->  http_spawn(Goal, Options)
  942    ;   throw(Error)
  943    ).
  944http_spawn(Goal, Options) :-
  945    current_output(CGI),
  946    thread_create(wrap_spawned(CGI, Goal), Id,
  947                  [ detached(true)
  948                  | Options
  949                  ]),
  950    http_spawned(Id).
  951
  952wrap_spawned(CGI, Goal) :-
  953    set_output(CGI),
  954    http_wrap_spawned(Goal, Request, Connection),
  955    next(Connection, Request).
 create_pool(+Pool)
Lazy creation of worker-pools for the HTTP server. This predicate calls the hook http:create_pool/1. If the hook fails it creates a default pool of size 10. This should suffice most typical usecases. Note that we get a permission error if the pool is already created. We can ignore this.
  965create_pool(Pool) :-
  966    E = error(permission_error(create, thread_pool, Pool), _),
  967    catch(http:create_pool(Pool), E, true).
  968create_pool(Pool) :-
  969    print_message(informational, httpd(created_pool(Pool))),
  970    thread_pool_create(Pool, 10, []).
  971
  972
  973		 /*******************************
  974		 *         WAIT POLICIES	*
  975		 *******************************/
  976
  977:- meta_predicate
  978    thread_repeat_wait(0).
 thread_repeat_wait(:Goal) is multi
Acts as repeat, thread_idle(Goal), choosing whether to use a long or short idle time based on the average firing rate.
  985thread_repeat_wait(Goal) :-
  986    new_rate_mma(5, 1000, State),
  987    repeat,
  988      update_rate_mma(State, MMA),
  989      long(MMA, IsLong),
  990      (   IsLong == brief
  991      ->  call(Goal)
  992      ;   thread_idle(Goal, IsLong)
  993      ).
  994
  995long(MMA, brief) :-
  996    MMA < 0.05,
  997    !.
  998long(MMA, short) :-
  999    MMA < 1,
 1000    !.
 1001long(_, long).
 new_rate_mma(+N, +Resolution, -State) is det
 update_rate_mma(!State, -MMA) is det
Implement Modified Moving Average computing the average time between requests as an exponential moving averate with alpha=1/N.
Arguments:
Resolution- is the time resolution in 1/Resolution seconds. All storage is done in integers to avoid the need for stack freezing in nb_setarg/3.
See also
- https://en.wikipedia.org/wiki/Moving_average
 1015new_rate_mma(N, Resolution, rstate(Base, 0, MaxI, Resolution, N, 0)) :-
 1016    current_prolog_flag(max_tagged_integer, MaxI),
 1017    get_time(Base).
 1018
 1019update_rate_mma(State, MMAr) :-
 1020    State = rstate(Base, Last, MaxI, Resolution, N, MMA0),
 1021    get_time(Now),
 1022    Stamp is round((Now-Base)*Resolution),
 1023    (   Stamp > MaxI
 1024    ->  nb_setarg(1, State, Now),
 1025        nb_setarg(2, State, 0)
 1026    ;   true
 1027    ),
 1028    Diff is Stamp-Last,
 1029    nb_setarg(2, State, Stamp),
 1030    MMA is round(((N-1)*MMA0+Diff)/N),
 1031    nb_setarg(6, State, MMA),
 1032    MMAr is MMA/float(Resolution).
 1033
 1034
 1035                 /*******************************
 1036                 *            MESSAGES          *
 1037                 *******************************/
 1038
 1039:- multifile
 1040    prolog:message/3. 1041
 1042prolog:message(httpd_started_server(Port, Options)) -->
 1043    [ 'Started server at '-[] ],
 1044    http_root(Port, Options).
 1045prolog:message(httpd_stopped_worker(Self, Status)) -->
 1046    [ 'Stopped worker ~p: ~p'-[Self, Status] ].
 1047prolog:message(httpd_restarted_worker(Self)) -->
 1048    [ 'Replaced aborted worker ~p'-[Self] ].
 1049prolog:message(httpd(created_pool(Pool))) -->
 1050    [ 'Created thread-pool ~p of size 10'-[Pool], nl,
 1051      'Create this pool at startup-time or define the hook ', nl,
 1052      'http:create_pool/1 to avoid this message and create a ', nl,
 1053      'pool that fits the usage-profile.'
 1054    ].
 1055
 1056http_root(Address, Options) -->
 1057    { landing_page(Address, URI, Options) },
 1058    [ '~w'-[URI] ].
 1059
 1060landing_page(Host:Port, URI, Options) :-
 1061    !,
 1062    must_be(atom, Host),
 1063    must_be(integer, Port),
 1064    http_server_property(Port, scheme(Scheme)),
 1065    (   default_port(Scheme, Port)
 1066    ->  format(atom(Base), '~w://~w', [Scheme, Host])
 1067    ;   format(atom(Base), '~w://~w:~w', [Scheme, Host, Port])
 1068    ),
 1069    entry_page(Base, URI, Options).
 1070landing_page(unix_socket(Path), URI, _Options) :-
 1071    !,
 1072    format(string(URI), 'Unix domain socket "~w"', [Path]).
 1073landing_page(Port, URI, Options) :-
 1074    landing_page(localhost:Port, URI, Options).
 1075
 1076default_port(http, 80).
 1077default_port(https, 443).
 1078
 1079entry_page(Base, URI, Options) :-
 1080    option(entry_page(Entry), Options),
 1081    !,
 1082    uri_resolve(Entry, Base, URI).
 1083entry_page(Base, URI, _) :-
 1084    http_absolute_location(root(.), Entry, []),
 1085    uri_resolve(Entry, Base, URI)