View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2014-2020, VU University Amsterdam
    7                              CWI Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(hub,
   37          [ hub_create/3,               % +HubName, -Hub, +Options
   38            hub_add/3,                  % +HubName, +Websocket, ?Id
   39            hub_member/2,               % +HubName, ?Id
   40            hub_send/2,                 % +ClientId, +Message
   41            hub_broadcast/2,            % +HubName, +Message
   42            hub_broadcast/3,            % +HubName, +Message, +Condition
   43            current_hub/2               % ?HubName, ?Hub
   44          ]).   45:- use_module(library(debug)).   46:- use_module(library(error)).   47:- use_module(library(apply)).   48:- use_module(library(gensym)).   49:- if(exists_source(library(uuid))).   50:- use_module(library(uuid)).   51:- endif.   52:- use_module(library(ordsets)).   53:- use_module(library(http/websocket)).   54
   55:- meta_predicate
   56    hub_broadcast(+,+,1).

Manage a hub for websockets

This library manages a hub that consists of clients that are connected using a websocket. Messages arriving at any of the websockets are sent to the event queue of the hub. In addition, the hub provides a broadcast interface. A typical usage scenario for a hub is a chat server A scenario for realizing an chat server is:

  1. Create a new hub using hub_create/3.
  2. Create one or more threads that listen to Hub.queues.event from the created hub. These threads can update the shared view of the world. A message is a dict as returned by ws_receive/2 or a hub control message. Currently, the following control messages are defined:
    hub{error:Error, left:ClientId, reason:Reason}
    A client left us because of an I/O error. Reason is read or write and Error is the Prolog I/O exception.
    hub{joined:ClientId}
    A new client has joined the chatroom.

    The thread(s) can talk to clients using two predicates:

A hub consists of (currenty) four message queues and a simple dynamic fact. Threads that are needed for the communication tasks are created on demand and die if no more work needs to be done.

To be done
- The current design does not use threads to perform tasks for multiple hubs. This implies that the design scales rather poorly for hosting many hubs with few users. */
   95:- dynamic
   96    hub/2,                          % Hub, Queues ...
   97    websocket/5.                    % Hub, Socket, Queue, Lock, Id
   98
   99:- volatile hub/2, websocket/5.
 hub_create(+Name, -Hub, +Options) is det
Create a new hub. Hub is a dict containing the following public information:
Hub.name
The name of the hub (the Name argument)
queues.event
Message queue to which the hub thread(s) can listen.

After creating a hub, the application normally creates a thread that listens to Hub.queues.event and exposes some mechanisms to establish websockets and add them to the hub using hub_add/3.

See also
- http_upgrade_to_websocket/3 establishes a websocket from the SWI-Prolog webserver.
  118hub_create(HubName, Hub, _Options) :-
  119    must_be(atom, HubName),
  120    message_queue_create(WaitQueue),
  121    message_queue_create(ReadyQueue),
  122    message_queue_create(EventQueue),
  123    message_queue_create(BroadcastQueue),
  124    Hub = hub{name:HubName,
  125              queues:_{wait:WaitQueue,
  126                       ready:ReadyQueue,
  127                       event:EventQueue,
  128                       broadcast:BroadcastQueue
  129                      }},
  130    assertz(hub(HubName, Hub)).
 current_hub(?Name, ?Hub) is nondet
True when there exists a hub Hub with Name.
  137current_hub(HubName, Hub) :-
  138    hub(HubName, Hub).
  139
  140
  141                 /*******************************
  142                 *            WAITERS           *
  143                 *******************************/
  144
  145/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  146The task of this layer is to wait   for  (a potentially large number of)
  147websockets. Whenever there is data on one   of these sockets, the socket
  148is handed to Hub.queues.ready. This is realised using wait_for_input/3,
  149which allows a single thread  to  wait   for  many  sockets.  But ... on
  150Windows it allows to wait for at most  64 sockets. In addition, there is
  151no way to add an additional input   for control messages because Windows
  152select() can only wait for sockets. On Unix   we could use pipe/2 to add
  153the control channal. On Windows  we   would  need  an additional network
  154service, giving rise its own  problems   with  allocation, firewalls and
  155security.
  156
  157So, instead we keep a queue of websockets   that  need to be waited for.
  158Whenever we add a  websocket,  we  create   a  waiter  thread  that will
  159typically start waiting for this socket.   In  addition, we schedule any
  160waiting thread that has less  than  the   maximum  number  of sockets to
  161timeout at as good as we can the same   time.  All of them will hunt for
  162the same set of queues,  but  they  have   to  wait  for  each other and
  163therefore most of the time one thread will walk away with all websockets
  164and the others commit suicide because there is nothing to wait for.
  165- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
  166
  167:- meta_predicate
  168    hub_thread(0, +, +).
 hub_add(+Hub, +WebSocket, ?Id) is det
Add a WebSocket to the hub. Id is used to identify this user. It may be provided (as a ground term) or is generated as a UUID.
  175hub_add(HubName, WebSocket, Id) :-
  176    must_be(atom, HubName),
  177    hub(HubName, Hub),
  178    (   var(Id)
  179    ->  uuid(Id)
  180    ;   true
  181    ),
  182    message_queue_create(OutputQueue),
  183    mutex_create(Lock),
  184                                         % asserta/1 allows for reuse of Id
  185    asserta(websocket(HubName, WebSocket, OutputQueue, Lock, Id)),
  186    thread_send_message(Hub.queues.wait, WebSocket),
  187    thread_send_message(Hub.queues.event,
  188                        hub{joined:Id}),
  189    debug(hub(gate), 'Joined ~w: ~w', [HubName, Id]),
  190    create_wait_thread(Hub).
 hub_member(?HubName, ?Id) is nondet
True when Id is a member of the hub HubName.
  196hub_member(HubName, Id) :-
  197    websocket(HubName, _WebSocket, _OutputQueue, _Lock, Id).
  198
  199:- if(\+current_predicate(uuid/1)).  200% FIXME: Proper pure Prolog random UUID implementation
  201uuid(UUID) :-
  202    A is random(1<<63),
  203    format(atom(UUID), '~d', [A]).
  204:- endif.  205
  206create_wait_thread(Hub) :-
  207    hub_thread(wait_for_sockets(Hub), Hub, hub_wait_).
  208
  209wait_for_sockets(Hub) :-
  210    wait_for_sockets(Hub, 64).
  211
  212wait_for_sockets(Hub, Max) :-
  213    Queues = Hub.queues,
  214    repeat,
  215      get_messages(Queues.wait, Max, List),
  216      (   List \== []
  217      ->  create_new_waiter_if_needed(Hub),
  218          sort(List, Set),
  219          (   debugging(hub(wait))
  220          ->  length(Set, Len),
  221              debug(hub(wait), 'Waiting for ~d queues', [Len])
  222          ;   true
  223          ),
  224          wait_for_set(Set, Left, ReadySet, Max),
  225          (   ReadySet \== []
  226          ->  debug(hub(ready), 'Data on ~p', [ReadySet]),
  227              Ready = Queues.ready,
  228              maplist(thread_send_message(Ready), ReadySet),
  229              create_reader_threads(Hub),
  230              ord_subtract(Set, ReadySet, NotReadySet)
  231          ;   NotReadySet = Left             % timeout
  232          ),
  233          (   NotReadySet \== []
  234          ->  debug(hub(wait), 'Re-scheduling: ~p', [NotReadySet]),
  235              Wait = Queues.wait,
  236              maplist(thread_send_message(Wait), NotReadySet),
  237              fail
  238          ;   true
  239          )
  240      ;   !
  241      ).
  242
  243create_new_waiter_if_needed(Hub) :-
  244    message_queue_property(Hub.queues.wait, size(0)),
  245    !.
  246create_new_waiter_if_needed(Hub) :-
  247    create_wait_thread(Hub).
 wait_for_set(+Set0, -Left, -Ready, +Max) is det
Wait for input from Set0. Note that Set0 may contain closed websockets.
  254wait_for_set([], [], [], _) :-
  255    !.
  256wait_for_set(Set0, Set, ReadySet, Max) :-
  257    wait_timeout(Set0, Max, Timeout),
  258    catch(wait_for_input(Set0, ReadySet, Timeout),
  259          error(existence_error(stream, S), _), true),
  260    (   var(S)
  261    ->  Set = Set0
  262    ;   delete(Set0, S, Set1),
  263        wait_for_set(Set1, Set, ReadySet, Max)
  264    ).
 wait_timeout(+WaitForList, +Max, -TimeOut) is det
Determine the timeout, such that multiple threads waiting for less than the maximum number of sockets time out at the same moment and we can combine them on a single thread.
  273:- dynamic
  274    scheduled_timeout/1.  275
  276wait_timeout(List, Max, Timeout) :-
  277    length(List, Max),
  278    !,
  279    Timeout = infinite.
  280wait_timeout(_, _, Timeout) :-
  281    get_time(Now),
  282    (   scheduled_timeout(SchedAt)
  283    ->  (   SchedAt > Now
  284        ->  At = SchedAt
  285        ;   retractall(scheduled_timeout(_)),
  286            At is ceiling(Now) + 1,
  287            asserta(scheduled_timeout(At))
  288        )
  289    ;   At is ceiling(Now) + 1,
  290        asserta(scheduled_timeout(At))
  291    ),
  292    Timeout is At - Now.
 get_messages(+Queue, +Max, -List) is det
Get the next Max messages from Queue or as many as there are available without blocking very long. This routine is designed such that if multiple threads are running for messages, one gets all of them and the others nothing.
  302get_messages(Q, N, List) :-
  303    with_mutex(hub_wait,
  304               get_messages_sync(Q, N, List)).
  305
  306get_messages_sync(Q, N, [H|T]) :-
  307    succ(N2, N),
  308    thread_get_message(Q, H, [timeout(0.01)]),
  309    !,
  310    get_messages_sync(Q, N2, T).
  311get_messages_sync(_, _, []).
  312
  313
  314                 /*******************************
  315                 *            READERS           *
  316                 *******************************/
  317
  318/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  319The next layer consists of `readers'.   Whenever  one or more websockets
  320have   data,   the   socket   is    added   to   Hub.queues.ready   and
  321create_reader_threads/1 is called. This  examines   the  number of ready
  322sockets and fires a number  of  threads   to  handle  the read requests.
  323Multiple threads are mainly needed for the case that a client signals to
  324be  ready,  but  only  provides  an   incomplete  message,  causing  the
  325ws_receive/2 to block.
  326
  327Each  of  the  threads  reads  the  next   message  and  sends  this  to
  328Hub.queues.event. The websocket is then rescheduled   to listen for new
  329events. This read either fires a thread   to  listen for the new waiting
  330socket using create_wait_thread/1 or, if there   are no more websockets,
  331does this job itself. This  deals  with   the  common  scenario that one
  332client wakes up, starts a thread to  read   its  event and waits for new
  333messages on the same websockets.
  334- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
  335
  336create_reader_threads(Hub) :-
  337    message_queue_property(Hub.queues.ready, size(Ready)),
  338    Threads is ceiling(sqrt(Ready)),
  339    forall(between(1, Threads, _),
  340           create_reader_thread(Hub)).
  341
  342create_reader_thread(Hub) :-
  343    hub_thread(read_message(Hub), Hub, hub_read_ws_).
  344
  345read_message(Hub) :-
  346    Queues = Hub.queues,
  347    thread_get_message(Queues.ready, WS, [timeout(0)]),
  348    !,
  349    catch(ws_receive(WS, Message), Error, true),
  350    (   var(Error),
  351        websocket(HubName, WS, _, _, Id)
  352    ->  (   Message.get(opcode) == close
  353        ->  close_client(WS, Message)
  354        ;   Event = Message.put(_{client:Id, hub:HubName}),
  355            debug(hub(event), 'Event: ~p', [Event]),
  356            thread_send_message(Queues.event, Event),
  357            (   Message.get(opcode) == close
  358            ->  CloseError = error(_,_),
  359                catch(ws_close(WS, 1000, ""), CloseError,
  360                      ws_warning(CloseError))
  361            ;   thread_send_message(Queues.wait, WS)
  362            ),
  363            (   message_queue_property(Queues.ready, size(0))
  364            ->  !,
  365                wait_for_sockets(Hub)
  366            ;   create_wait_thread(Hub),
  367                read_message(Hub)
  368            )
  369        )
  370    ;   websocket(_, WS, _, _, _)
  371    ->  io_read_error(WS, Error),
  372        read_message(Hub)
  373    ;   read_message(Hub)                   % already destroyed
  374    ).
  375read_message(_).
  376
  377ws_warning(error(Formal, _)) :-
  378    silent(Formal),
  379    !.
  380ws_warning(Error) :-
  381    print_message(warning, Error).
  382
  383silent(socket_error(epipe, _)).
 io_read_error(+WebSocket, +Error)
Called on a read error from WebSocket. We close the websocket and send the hub an event that we lost the connection to the specified client. Note that we leave destruction of the anonymous message queue and mutex to the Prolog garbage collector.
  392io_read_error(WebSocket, Error) :-
  393    debug(hub(gate), 'Got read error on ~w: ~p',
  394          [WebSocket, Error]),
  395    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
  396    !,
  397    E = error(_,_),
  398    catch(ws_close(WebSocket, 1011, Error), E,
  399          ws_warning(E)),
  400    hub(HubName, Hub),
  401    thread_send_message(Hub.queues.event,
  402                        hub{left:Id,
  403                            hub:HubName,
  404                            reason:read,
  405                            error:Error}).
  406io_read_error(_, _).                      % already considered gone
  407
  408close_client(WebSocket, Message) :-
  409    Message.get(data) == end_of_file,
  410    !,
  411    io_read_error(WebSocket, end_of_file).
  412close_client(WebSocket, Message) :-
  413    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
  414    !,
  415    E = error(_,_),
  416    catch(ws_close(WebSocket, 1000, "Bye"), E,
  417          ws_warning(E)),
  418    hub(HubName, Hub),
  419    thread_send_message(Hub.queues.event,
  420                        hub{left:Id,
  421                            hub:HubName,
  422                            reason:close,
  423                            data:Message.data
  424                           }).
 io_write_error(+WebSocket, +Message, +Error)
Failed to write Message to WebSocket due to Error. Note that this may be a pending but closed WebSocket. We first check whether there is a new one and if not send a left message and pass the error such that the client can re-send it when appropriate.
  433io_write_error(WebSocket, Message, Error) :-
  434    debug(hub(gate), 'Got write error on ~w: ~p',
  435          [WebSocket, Error]),
  436    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
  437    !,
  438    catch(ws_close(WebSocket, 1011, Error), _, true),
  439    (   websocket(_, _, _, _, Id)
  440    ->  true
  441    ;   hub(HubName, Hub),
  442        thread_send_message(Hub.queues.event,
  443                            hub{left:Id,
  444                                hub:HubName,
  445                                reason:write(Message),
  446                                error:Error})
  447    ).
  448io_write_error(_, _, _).                      % already considered gone
  449
  450
  451                 /*******************************
  452                 *        SENDING MESSAGES      *
  453                 *******************************/
  454
  455/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  456My  initial  thought  about  sending  messages    was  to  add  a  tuple
  457WebSocket-Message to an output  queue  and   have  a  dynamic  number of
  458threads sending these messages to the   websockets. But, it is desirable
  459that, if multiple messages are sent to  a particular client, they arrive
  460in this order. As multiple threads are performing this task, this is not
  461easy to guarantee. Therefore, we create an  output queue and a mutex for
  462each client. An output thread will   walk  along the websockets, looking
  463for one that has pending messages.  It   then  grabs the lock associated
  464with the client and sends all waiting output messages.
  465
  466The price is that we might peek   a significant number of message queues
  467before we find one that  contains  messages.   If  this  proves  to be a
  468significant problem, we  could  maintain  a   queue  of  queues  holding
  469messages.
  470- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
 hub_send(+ClientId, +Message) is semidet
Send message to the indicated ClientId. Fails silently if ClientId does not exist.
Arguments:
Message- is either a single message (as accepted by ws_send/2) or a list of such messages.
  480hub_send(ClientId, Message) :-
  481    websocket(HubName, _WS, ClientQueue, _Lock, ClientId),
  482    hub(HubName, Hub),
  483    (   is_list(Message)
  484    ->  maplist(queue_output(ClientQueue), Message)
  485    ;   queue_output(ClientQueue, Message)
  486    ),
  487    create_output_thread(Hub, ClientQueue).
  488
  489create_output_thread(Hub, Queue) :-
  490    hub_thread(broadcast_from_queue(Queue, [timeout(0)]),
  491               Hub, hub_out_q_).
 hub_broadcast(+Hub, +Message) is det
 hub_broadcast(+Hub, +Message, :Condition) is det
Send Message to all websockets associated with Hub for which call(Condition, Id) succeeds. Note that this process is asynchronous: this predicate returns immediately after putting all requests in a broadcast queue. If a message cannot be delivered due to a network error, the hub is informed through io_error/3.
  503hub_broadcast(HubName, Message) :-
  504    hub_broadcast(HubName, Message, all).
  505
  506all(_).
  507
  508hub_broadcast(HubName, Message, Condition) :-
  509    must_be(atom, HubName),
  510    hub(HubName, Hub),
  511    State = count(0),
  512    forall(( websocket(HubName, _WS, ClientQueue, _Lock, Id),
  513             call(Condition, Id)
  514           ),
  515           ( queue_output(ClientQueue, Message),
  516             inc_count(State)
  517           )),
  518    State = count(Count),
  519    create_broadcast_threads(Hub, Count).
  520
  521queue_output(Queue, Message) :-
  522    thread_send_message(Queue, Message).
  523
  524inc_count(State) :-
  525    arg(1, State, C0),
  526    C1 is C0+1,
  527    nb_setarg(1, State, C1).
  528
  529create_broadcast_threads(Hub, Count) :-
  530    Threads is ceiling(sqrt(Count)),
  531    forall(between(1, Threads, _),
  532           create_broadcast_thread(Hub)).
  533
  534create_broadcast_thread(Hub) :-
  535    hub_thread(broadcast_from_queues(Hub, [timeout(0)]),
  536                    Hub, hub_out_all_).
 broadcast_from_queues(+Hub, +Options) is det
Broadcast from over all known queues.
  543broadcast_from_queues(Hub, Options) :-
  544    forall(websocket(Hub.name, _WebSocket, Queue, _Lock, _Id),
  545           broadcast_from_queue(Queue, Options)).
 broadcast_from_queue(+Queue, +Options) is det
Send all messages pending for Queue. Note that this predicate locks the mutex associated with the Queue, such that other workers cannot start sending messages to this client. Concurrent sending would lead to out-of-order arrival of broadcast messages. If the mutex is already held, someone else is processing this message queue, so we don't have to worry.
  557broadcast_from_queue(Queue, _Options) :-
  558    message_queue_property(Queue, size(0)),
  559    !.
  560broadcast_from_queue(Queue, Options) :-
  561    websocket(_Hub, _WebSocket, Queue, Lock, _Id),
  562    !,
  563    (   setup_call_cleanup(
  564            mutex_trylock(Lock),
  565            broadcast_from_queue_sync(Queue, Options),
  566            mutex_unlock(Lock))
  567    ->  true
  568    ;   true
  569    ).
  570broadcast_from_queue(_, _).
  571
  572% Note that we re-fetch websocket/5, such that we terminate if something
  573% closed the websocket.
  574
  575broadcast_from_queue_sync(Queue, Options) :-
  576    repeat,
  577      (   websocket(_Hub, WebSocket, Queue, _Lock, _Id),
  578          thread_get_message(Queue, Message, Options)
  579      ->  debug(hub(broadcast),
  580                'To: ~p messages: ~p', [WebSocket, Message]),
  581          catch(ws_send(WebSocket, Message), E,
  582                io_write_error(WebSocket, Message, E)),
  583          fail
  584      ;   !
  585      ).
 hub_thread(:Goal, +Hub, +Task) is det
Create a (temporary) thread for the hub to perform Task. We created named threads if debugging hub(thread) is enabled.
  592hub_thread(Goal, _, Task) :-
  593    debugging(hub(thread)),
  594    !,
  595    gensym(Task, Alias),
  596    thread_create(Goal, _, [detached(true), alias(Alias)]).
  597hub_thread(Goal, _, _) :-
  598    thread_create(Goal, _, [detached(true)])