/*  Part of SWI-Prolog

    Author:        Jan Wielemaker
    E-mail:        J.Wielemaker@vu.nl
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2014-2020, VU University Amsterdam
                              CWI Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(hub,
          [ hub_create/3,               % +HubName, -Hub, +Options
            hub_add/3,                  % +HubName, +Websocket, ?Id
            hub_member/2,               % +HubName, ?Id
            hub_send/2,                 % +ClientId, +Message
            hub_broadcast/2,            % +HubName, +Message
            hub_broadcast/3,            % +HubName, +Message, +Condition
            current_hub/2               % ?HubName, ?Hub
          ]).
:- use_module(library(debug)).
:- use_module(library(error)).
:- use_module(library(apply)).
:- use_module(library(gensym)).
:- if(exists_source(library(uuid))).
:- use_module(library(uuid)).
:- endif.
:- use_module(library(ordsets)).
:- use_module(library(http/websocket)).

:- meta_predicate
    hub_broadcast(+,+,1).

/** <module> Manage a hub for websockets

This library manages a hub that consists of clients that are connected
using a websocket. Messages arriving at any of the websockets are sent
to the _event_ queue of the hub. In addition, the hub provides a
_broadcast_ interface. A typical usage scenario for a hub is a _chat
server_. A scenario for realizing a chat server is:

  1. Create a new hub using hub_create/3.
  2. Create one or more threads that listen to Hub.queues.event from
     the created hub. These threads can update the shared view of the
     world. A message is a dict as returned by ws_receive/2 or a
     hub control message. Currently, the following control messages
     are defined:

       - hub{left:ClientId, reason:Reason, error:Error}
         A client left us because of an I/O error. Reason is =read=
         or =write= and Error is the Prolog I/O exception.

       - hub{joined:ClientId}
         A new client has joined the chatroom.

     The thread(s) can talk to clients using two predicates:

       - hub_send/2 sends a message to a specific client
       - hub_broadcast/2 sends a message to all clients of the
         hub.

A hub consists of (currently) four message queues and a simple dynamic
fact. Threads that are needed for the communication tasks are created on
demand and die if no more work needs to be done.

@tbd    The current design does not use threads to perform tasks for
        multiple hubs. This implies that the design scales rather
        poorly for hosting many hubs with few users.
*/

:- dynamic
    hub/2,                          % Hub, Queues ...
    websocket/5.                    % Hub, Socket, Queue, Lock, Id

:- volatile hub/2, websocket/5.

%!  hub_create(+Name, -Hub, +Options) is det.
%
%   Create a new hub.
%   Hub is a dict containing the following public
%   information:
%
%     - Hub.name
%       The name of the hub (the Name argument)
%     - Hub.queues.event
%       Message queue to which the hub thread(s) can listen.
%
%   After creating a hub, the application normally creates a thread
%   that listens to Hub.queues.event and exposes some mechanisms to
%   establish websockets and add them to the hub using hub_add/3.
%
%   @see    http_upgrade_to_websocket/3 establishes a websocket from
%           the SWI-Prolog webserver.

hub_create(HubName, Hub, _Options) :-
    must_be(atom, HubName),
    message_queue_create(WaitQueue),
    message_queue_create(ReadyQueue),
    message_queue_create(EventQueue),
    message_queue_create(BroadcastQueue),
    Hub = hub{name:HubName,
              queues:_{wait:WaitQueue,
                       ready:ReadyQueue,
                       event:EventQueue,
                       broadcast:BroadcastQueue
                      }},
    assertz(hub(HubName, Hub)).


%!  current_hub(?Name, ?Hub) is nondet.
%
%   True when there exists a hub Hub with Name.

current_hub(HubName, Hub) :-
    hub(HubName, Hub).


                 /*******************************
                 *            WAITERS           *
                 *******************************/

/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
The task of this layer is to wait for (a potentially large number of)
websockets. Whenever there is data on one of these sockets, the socket
is handed to Hub.queues.ready. This is realised using wait_for_input/3,
which allows a single thread to wait for many sockets. But ... on
Windows it allows to wait for at most 64 sockets. In addition, there is
no way to add an additional input for control messages because Windows
select() can only wait for sockets. On Unix we could use pipe/2 to add
the control channel. On Windows we would need an additional network
service, giving rise to its own problems with allocation, firewalls and
security.

So, instead we keep a queue of websockets that need to be waited for.
Whenever we add a websocket, we create a waiter thread that will
typically start waiting for this socket. In addition, we schedule any
waiting thread that has fewer than the maximum number of sockets to
time out at, as closely as we can, the same time. All of them will hunt
for the same set of queues, but they have to wait for each other and
therefore most of the time one thread will walk away with all websockets
and the others exit because there is nothing to wait for.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */

:- meta_predicate
    hub_thread(0, +, +).

%!  hub_add(+HubName, +WebSocket, ?Id) is det.
%
%   Add a WebSocket to the hub. Id is used to identify this user. It may
%   be provided (as a ground term) or is generated as a UUID.

hub_add(HubName, WebSocket, Id) :-
    must_be(atom, HubName),
    hub(HubName, Hub),
    (   var(Id)
    ->  uuid(Id)
    ;   true
    ),
    message_queue_create(OutputQueue),
    mutex_create(Lock),
                                         % asserta/1 allows for reuse of Id
    asserta(websocket(HubName, WebSocket, OutputQueue, Lock, Id)),
    thread_send_message(Hub.queues.wait, WebSocket),
    thread_send_message(Hub.queues.event,
                        hub{joined:Id}),
    debug(hub(gate), 'Joined ~w: ~w', [HubName, Id]),
    create_wait_thread(Hub).

%!  hub_member(?HubName, ?Id) is nondet.
%
%   True when Id is a member of the hub HubName.

hub_member(HubName, Id) :-
    websocket(HubName, _WebSocket, _OutputQueue, _Lock, Id).

:- if(\+current_predicate(uuid/1)).
% FIXME: Proper pure Prolog random UUID implementation
uuid(UUID) :-
    A is random(1<<63),
    format(atom(UUID), '~d', [A]).
:- endif.

create_wait_thread(Hub) :-
    hub_thread(wait_for_sockets(Hub), Hub, hub_wait_).

wait_for_sockets(Hub) :-
    wait_for_sockets(Hub, 64).

wait_for_sockets(Hub, Max) :-
    Queues = Hub.queues,
    repeat,
      get_messages(Queues.wait, Max, List),
      (   List \== []
      ->  create_new_waiter_if_needed(Hub),
          sort(List, Set),
          (   debugging(hub(wait))
          ->  length(Set, Len),
              debug(hub(wait), 'Waiting for ~d queues', [Len])
          ;   true
          ),
          wait_for_set(Set, Left, ReadySet, Max),
          (   ReadySet \== []
          ->  debug(hub(ready), 'Data on ~p', [ReadySet]),
              Ready = Queues.ready,
              maplist(thread_send_message(Ready), ReadySet),
              create_reader_threads(Hub),
              ord_subtract(Set, ReadySet, NotReadySet)
          ;   NotReadySet = Left             % timeout
          ),
          (   NotReadySet \== []
          ->  debug(hub(wait), 'Re-scheduling: ~p', [NotReadySet]),
              Wait = Queues.wait,
              maplist(thread_send_message(Wait), NotReadySet),
              fail
          ;   true
          )
      ;   !
      ).

create_new_waiter_if_needed(Hub) :-
    message_queue_property(Hub.queues.wait, size(0)),
    !.
create_new_waiter_if_needed(Hub) :-
    create_wait_thread(Hub).

%!  wait_for_set(+Set0, -Left, -Ready, +Max) is det.
%
%   Wait for input from Set0. Note that Set0 may contain closed
%   websockets.

wait_for_set([], [], [], _) :-
    !.
wait_for_set(Set0, Set, ReadySet, Max) :-
    wait_timeout(Set0, Max, Timeout),
    catch(wait_for_input(Set0, ReadySet, Timeout),
          error(existence_error(stream, S), _), true),
    (   var(S)
    ->  Set = Set0
    ;   delete(Set0, S, Set1),
        wait_for_set(Set1, Set, ReadySet, Max)
    ).


%!  wait_timeout(+WaitForList, +Max, -TimeOut) is det.
%
%   Determine the timeout, such that multiple threads waiting for
%   less than the maximum number of sockets time out at the same
%   moment and we can combine them on a single thread.

:- dynamic
    scheduled_timeout/1.

wait_timeout(List, Max, Timeout) :-
    length(List, Max),
    !,
    Timeout = infinite.
wait_timeout(_, _, Timeout) :-
    get_time(Now),
    (   scheduled_timeout(SchedAt)
    ->  (   SchedAt > Now
        ->  At = SchedAt
        ;   retractall(scheduled_timeout(_)),
            At is ceiling(Now) + 1,
            asserta(scheduled_timeout(At))
        )
    ;   At is ceiling(Now) + 1,
        asserta(scheduled_timeout(At))
    ),
    Timeout is At - Now.


%!  get_messages(+Queue, +Max, -List) is det.
%
%   Get the next Max messages from Queue or as many as there are
%   available without blocking very long. This routine is designed
%   such that if multiple threads are running for messages, one gets
%   all of them and the others nothing.

get_messages(Q, N, List) :-
    with_mutex(hub_wait,
               get_messages_sync(Q, N, List)).

get_messages_sync(Q, N, [H|T]) :-
    succ(N2, N),
    thread_get_message(Q, H, [timeout(0.01)]),
    !,
    get_messages_sync(Q, N2, T).
get_messages_sync(_, _, []).


                 /*******************************
                 *            READERS           *
                 *******************************/

/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
The next layer consists of `readers'. Whenever one or more websockets
have data, the socket is added to Hub.queues.ready and
create_reader_threads/1 is called. This examines the number of ready
sockets and fires a number of threads to handle the read requests.
Multiple threads are mainly needed for the case that a client signals to
be ready, but only provides an incomplete message, causing the
ws_receive/2 to block.

Each of the threads reads the next message and sends this to
Hub.queues.event. The websocket is then rescheduled to listen for new
events. The reader either fires a thread to listen for the new waiting
socket using create_wait_thread/1 or, if there are no more websockets,
does this job itself.
This deals with the common scenario that one
client wakes up, starts a thread to read its event and waits for new
messages on the same websockets.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */

create_reader_threads(Hub) :-
    message_queue_property(Hub.queues.ready, size(Ready)),
    Threads is ceiling(sqrt(Ready)),
    forall(between(1, Threads, _),
           create_reader_thread(Hub)).

create_reader_thread(Hub) :-
    hub_thread(read_message(Hub), Hub, hub_read_ws_).

read_message(Hub) :-
    Queues = Hub.queues,
    thread_get_message(Queues.ready, WS, [timeout(0)]),
    !,
    catch(ws_receive(WS, Message), Error, true),
    (   var(Error),
        websocket(HubName, WS, _, _, Id)
    ->  (   Message.get(opcode) == close
        ->  close_client(WS, Message)
        ;   Event = Message.put(_{client:Id, hub:HubName}),
            debug(hub(event), 'Event: ~p', [Event]),
            thread_send_message(Queues.event, Event),
            (   Message.get(opcode) == close
            ->  CloseError = error(_,_),
                catch(ws_close(WS, 1000, ""), CloseError,
                      ws_warning(CloseError))
            ;   thread_send_message(Queues.wait, WS)
            ),
            (   message_queue_property(Queues.ready, size(0))
            ->  !,
                wait_for_sockets(Hub)
            ;   create_wait_thread(Hub),
                read_message(Hub)
            )
        )
    ;   websocket(_, WS, _, _, _)
    ->  io_read_error(WS, Error),
        read_message(Hub)
    ;   read_message(Hub)                   % already destroyed
    ).
read_message(_).

ws_warning(error(Formal, _)) :-
    silent(Formal),
    !.
ws_warning(Error) :-
    print_message(warning, Error).

silent(socket_error(epipe, _)).

%!  io_read_error(+WebSocket, +Error)
%
%   Called on a read error from WebSocket. We close the websocket and
%   send the hub an event that we lost the connection to the specified
%   client. Note that we leave destruction of the anonymous message
%   queue and mutex to the Prolog garbage collector.

io_read_error(WebSocket, Error) :-
    debug(hub(gate), 'Got read error on ~w: ~p',
          [WebSocket, Error]),
    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
    !,
    E = error(_,_),
    catch(ws_close(WebSocket, 1011, Error), E,
          ws_warning(E)),
    hub(HubName, Hub),
    thread_send_message(Hub.queues.event,
                        hub{left:Id,
                            hub:HubName,
                            reason:read,
                            error:Error}).
io_read_error(_, _).                      % already considered gone

close_client(WebSocket, Message) :-
    Message.get(data) == end_of_file,
    !,
    io_read_error(WebSocket, end_of_file).
close_client(WebSocket, Message) :-
    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
    !,
    E = error(_,_),
    catch(ws_close(WebSocket, 1000, "Bye"), E,
          ws_warning(E)),
    hub(HubName, Hub),
    thread_send_message(Hub.queues.event,
                        hub{left:Id,
                            hub:HubName,
                            reason:close,
                            data:Message.data
                           }).

%!  io_write_error(+WebSocket, +Message, +Error)
%
%   Failed to write Message to WebSocket due to Error. Note that this
%   may be a pending but closed WebSocket. We first check whether there
%   is a new one and if not send a `left` message and pass the error
%   such that the client can re-send it when appropriate.

io_write_error(WebSocket, Message, Error) :-
    debug(hub(gate), 'Got write error on ~w: ~p',
          [WebSocket, Error]),
    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
    !,
    catch(ws_close(WebSocket, 1011, Error), _, true),
    (   websocket(_, _, _, _, Id)
    ->  true
    ;   hub(HubName, Hub),
        thread_send_message(Hub.queues.event,
                            hub{left:Id,
                                hub:HubName,
                                reason:write(Message),
                                error:Error})
    ).
io_write_error(_, _, _).
% already considered gone


                 /*******************************
                 *        SENDING MESSAGES      *
                 *******************************/

/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
My initial thought about sending messages was to add a tuple
WebSocket-Message to an output queue and have a dynamic number of
threads sending these messages to the websockets. But, it is desirable
that, if multiple messages are sent to a particular client, they arrive
in this order. As multiple threads are performing this task, this is not
easy to guarantee. Therefore, we create an output queue and a mutex for
each client. An output thread will walk along the websockets, looking
for one that has pending messages. It then grabs the lock associated
with the client and sends all waiting output messages.

The price is that we might peek a significant number of message queues
before we find one that contains messages. If this proves to be a
significant problem, we could maintain a queue of queues holding
messages.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */

%!  hub_send(+ClientId, +Message) is semidet.
%
%   Send message to the indicated ClientId. Fails silently if ClientId
%   does not exist.
%
%   @arg    Message is either a single message (as accepted by
%           ws_send/2) or a list of such messages.

hub_send(ClientId, Message) :-
    websocket(HubName, _WS, ClientQueue, _Lock, ClientId),
    hub(HubName, Hub),
    (   is_list(Message)
    ->  maplist(queue_output(ClientQueue), Message)
    ;   queue_output(ClientQueue, Message)
    ),
    create_output_thread(Hub, ClientQueue).

create_output_thread(Hub, Queue) :-
    hub_thread(broadcast_from_queue(Queue, [timeout(0)]),
               Hub, hub_out_q_).

%!  hub_broadcast(+Hub, +Message) is det.
%!  hub_broadcast(+Hub, +Message, :Condition) is det.
%
%   Send Message to all websockets associated with Hub for which
%   call(Condition, Id) succeeds. Note that this process is
%   _asynchronous_: this predicate returns immediately after putting
%   all requests in a broadcast queue. If a message cannot be
%   delivered due to a network error, the hub is informed through
%   io_write_error/3.

hub_broadcast(HubName, Message) :-
    hub_broadcast(HubName, Message, all).

all(_).

hub_broadcast(HubName, Message, Condition) :-
    must_be(atom, HubName),
    hub(HubName, Hub),
    State = count(0),
    forall(( websocket(HubName, _WS, ClientQueue, _Lock, Id),
             call(Condition, Id)
           ),
           ( queue_output(ClientQueue, Message),
             inc_count(State)
           )),
    State = count(Count),
    create_broadcast_threads(Hub, Count).

queue_output(Queue, Message) :-
    thread_send_message(Queue, Message).

inc_count(State) :-
    arg(1, State, C0),
    C1 is C0+1,
    nb_setarg(1, State, C1).

create_broadcast_threads(Hub, Count) :-
    Threads is ceiling(sqrt(Count)),
    forall(between(1, Threads, _),
           create_broadcast_thread(Hub)).

create_broadcast_thread(Hub) :-
    hub_thread(broadcast_from_queues(Hub, [timeout(0)]),
               Hub, hub_out_all_).


%!  broadcast_from_queues(+Hub, +Options) is det.
%
%   Broadcast from all known queues.

broadcast_from_queues(Hub, Options) :-
    forall(websocket(Hub.name, _WebSocket, Queue, _Lock, _Id),
           broadcast_from_queue(Queue, Options)).


%!  broadcast_from_queue(+Queue, +Options) is det.
%
%   Send all messages pending for Queue. Note that this predicate
%   locks the mutex associated with the Queue, such that other
%   workers cannot start sending messages to this client. Concurrent
%   sending would lead to out-of-order arrival of broadcast
%   messages. If the mutex is already held, someone else is
%   processing this message queue, so we don't have to worry.

broadcast_from_queue(Queue, _Options) :-
    message_queue_property(Queue, size(0)),
    !.
broadcast_from_queue(Queue, Options) :-
    websocket(_Hub, _WebSocket, Queue, Lock, _Id),
    !,
    (   setup_call_cleanup(
            mutex_trylock(Lock),
            broadcast_from_queue_sync(Queue, Options),
            mutex_unlock(Lock))
    ->  true
    ;   true
    ).
broadcast_from_queue(_, _).

% Note that we re-fetch websocket/5, such that we terminate if something
% closed the websocket.

broadcast_from_queue_sync(Queue, Options) :-
    repeat,
      (   websocket(_Hub, WebSocket, Queue, _Lock, _Id),
          thread_get_message(Queue, Message, Options)
      ->  debug(hub(broadcast),
                'To: ~p messages: ~p', [WebSocket, Message]),
          catch(ws_send(WebSocket, Message), E,
                io_write_error(WebSocket, Message, E)),
          fail
      ;   !
      ).

%!  hub_thread(:Goal, +Hub, +Task) is det.
%
%   Create a (temporary) thread for the hub to perform Task. We
%   create named threads if debugging hub(thread) is enabled.

hub_thread(Goal, _, Task) :-
    debugging(hub(thread)),
    !,
    gensym(Task, Alias),
    thread_create(Goal, _, [detached(true), alias(Alias)]).
hub_thread(Goal, _, _) :-
    thread_create(Goal, _, [detached(true)]).
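
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Usage sketch (not part of the library). The chat server scenario from
the module documentation might look roughly as follows when placed in a
separate application file. This is a minimal sketch under assumptions,
not a definitive implementation: the hub name `chat`, the handler
location, port 8080, the load path library(http/hub) and the predicates
accept_chat/1, create_chat_room/0, chatroom/1 and handle_message/2 are
all hypothetical choices made for this illustration. Only hub_create/3,
hub_add/3 and hub_broadcast/2 come from this module.

    :- use_module(library(http/thread_httpd)).
    :- use_module(library(http/http_dispatch)).
    :- use_module(library(http/websocket)).
    :- use_module(library(http/hub)).      % assumed load path

    % Upgrade HTTP requests on the chat location to a websocket and
    % hand the websocket to the hub.
    :- http_handler(root(chat),
                    http_upgrade_to_websocket(accept_chat, []),
                    [spawn([])]).

    accept_chat(WebSocket) :-
        hub_add(chat, WebSocket, _Id).

    % Step 1 and 2 of the scenario: create the hub and one thread
    % that listens on Hub.queues.event.
    create_chat_room :-
        hub_create(chat, Room, _{}),
        thread_create(chatroom(Room), _, [alias(chatroom)]).

    chatroom(Room) :-
        thread_get_message(Room.queues.event, Message),
        handle_message(Message, Room),
        chatroom(Room).

    % Relay text frames to all members; ignore joined/left and
    % other events.
    handle_message(Message, Room) :-
        _{opcode:text, data:Text} :< Message,
        !,
        hub_broadcast(Room.name, text(Text)).
    handle_message(_Message, _Room).

Under these assumptions the application would call create_chat_room
once and then start the server with, for example,
http_server(http_dispatch, [port(8080)]). A single listener thread keeps
the handling of events sequential; more listeners can be added if
handle_message/2 becomes a bottleneck.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */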