1/* Part of SWI-Prolog 2 3 Author: Jeffrey Rosenwald and Jan Wielemaker 4 E-mail: jeffrose@acm.org 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2012-2013, Jeffrey Rosenwald 7 2018-2020, CWI Amsterdam 8 All rights reserved. 9 10 Redistribution and use in source and binary forms, with or without 11 modification, are permitted provided that the following conditions 12 are met: 13 14 1. Redistributions of source code must retain the above copyright 15 notice, this list of conditions and the following disclaimer. 16 17 2. Redistributions in binary form must reproduce the above copyright 18 notice, this list of conditions and the following disclaimer in 19 the documentation and/or other materials provided with the 20 distribution. 21 22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 32 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 POSSIBILITY OF SUCH DAMAGE. 34*/ 35 36:- module(udp_broadcast, 37 [ udp_broadcast_initialize/2, % +IPAddress, +Options 38 udp_broadcast_close/1, % +Scope 39 40 udp_peer_add/2, % +Scope, +IP 41 udp_peer_del/2, % +Scope, ?IP 42 udp_peer/2 % +Scope, -IP 43 ]). 44:- autoload(library(apply),[maplist/2,maplist/3]). 45:- autoload(library(backcomp),[thread_at_exit/1]). 46:- autoload(library(broadcast), 47 [broadcast_request/1,broadcast/1,listening/3,listen/3]). 48:- autoload(library(debug),[debug/3]). 49:- autoload(library(error), 50 [must_be/2,syntax_error/1,domain_error/2,existence_error/2]). 51:- autoload(library(option),[option/3]). 52:- autoload(library(socket), 53 [ tcp_close_socket/1, 54 udp_socket/1, 55 tcp_bind/2, 56 tcp_getopt/2, 57 tcp_setopt/2, 58 udp_receive/4, 59 udp_send/4 60 ]). 61 62 63% :- debug(udp(broadcast)).
279:- multifile 280 udp_term_string_hook/3, % +Scope, ?Term, ?String 281 udp_unicast_join_hook/3, % +Scope, +From, +Data 282 black_list/1. % +Term 283 284:- meta_predicate 285 safely( ), 286 safely_det( ). 287 288safely(Predicate) :- 289 Err = error(_,_), 290 catch(Predicate, Err, 291 print_message_fail(Err)). 292 293safely_det(Predicate) :- 294 Err = error(_,_), 295 catch(Predicate, Err, 296 print_message_fail(Err)), 297 !. 298safely_det(_). 299 300print_message_fail(Term) :- 301 print_message(error, Term), 302 fail. 303 304udp_broadcast_address(IPAddress, Subnet, BroadcastAddress) :- 305 IPAddress = ip(A1, A2, A3, A4), 306 Subnet = ip(S1, S2, S3, S4), 307 BroadcastAddress = ip(B1, B2, B3, B4), 308 309 B1 is A1 \/ (S1 xor 255), 310 B2 is A2 \/ (S2 xor 255), 311 B3 is A3 \/ (S3 xor 255), 312 B4 is A4 \/ (S4 xor 255).
udp_subnet
.321:- dynamic 322 udp_scope/2, 323 udp_scope_peer/2. 324:- volatile 325 udp_scope/2, 326 udp_scope_peer/2. 327% 328% Here's a UDP proxy to Prolog's broadcast library 329% 330% A sender may extend a broadcast to a subnet of a UDP network by 331% specifying a =|udp_subnet|= scoping qualifier in his/her broadcast. 332% The qualifier has the effect of selecting the appropriate multi-cast 333% address for the transmission. Thus, the sender of the message has 334% control over the scope of his/her traffic on a per-message basis. 335% 336% All in-scope listeners receive the broadcast and simply rebroadcast 337% the message locally. All broadcast replies, if any, are sent directly 338% to the sender via the port-id that was received with the broadcast. 339% 340% Each listener exposes two UDP ports, a shared public port that is 341% bound to a well-known port number and a private port that uniquely 342% indentifies the listener. Broadcasts are received on the public port 343% and replies are sent on the private port. Directed broadcasts 344% (unicasts) are received on the private port and replies are sent on 345% the private port. 346 347% Thread 1 listens for directed traffic on the private port. 348% 349 350:- dynamic 351 udp_private_socket/3, % Port, Socket, FileNo 352 udp_public_socket/4, % Scope, Port, Socket, FileNo 353 udp_closed/1. % Scope 354 355udp_inbound_proxy(Master) :- 356 thread_at_exit(inbound_proxy_died), 357 make_private_socket, 358 thread_send_message(Master, udp_inbound_ready), 359 udp_inbound_proxy_loop. 360 361udp_inbound_proxy_loop :- 362 forall(udp_scope(Scope, ScopeData), 363 make_public_socket(ScopeData, Scope)), 364 retractall(udp_closed(_)), 365 findall(FileNo, udp_socket_file_no(FileNo), FileNos), 366 catch(dispatch_inbound(FileNos), 367 E, dispatch_exception(E)), 368 udp_inbound_proxy_loop. 369 370dispatch_exception(E) :- 371 E = error(_,_), 372 !, 373 print_message(warning, E). 374dispatch_exception(_).
384make_private_socket :- 385 udp_private_socket(_Port, S, _F), 386 !, 387 ( ( udp_scope(Scope, broadcast(_,_,_)) 388 ; udp_scope(Scope, multicast(_,_)) 389 ), 390 \+ udp_closed(Scope) 391 -> true 392 ; tcp_close_socket(S), 393 retractall(udp_private_socket(_,_,_)) 394 ). 395make_private_socket :- 396 udp_scope(_, broadcast(_,_,_)), 397 !, 398 udp_socket(S), 399 tcp_bind(S, Port), 400 tcp_getopt(S, file_no(F)), 401 tcp_setopt(S, broadcast), 402 assertz(udp_private_socket(Port, S, F)). 403make_private_socket :- 404 udp_scope(_, multicast(_,_)), 405 !, 406 udp_socket(S), 407 tcp_bind(S, Port), 408 tcp_getopt(S, file_no(F)), 409 assertz(udp_private_socket(Port, S, F)). 410make_private_socket.
416make_public_socket(_, Scope) :- 417 udp_public_socket(Scope, _Port, S, _), 418 !, 419 ( udp_closed(Scope) 420 -> tcp_close_socket(S), 421 retractall(udp_public_socket(Scope, _, _, _)) 422 ; true 423 ). 424make_public_socket(broadcast(_SubNet, _Broadcast, Port), Scope) :- 425 udp_socket(S), 426 tcp_setopt(S, reuseaddr), 427 tcp_bind(S, Port), 428 tcp_getopt(S, file_no(F)), 429 assertz(udp_public_socket(Scope, Port, S, F)). 430make_public_socket(multicast(Group, Port), Scope) :- 431 udp_socket(S), 432 tcp_setopt(S, reuseaddr), 433 tcp_bind(S, Port), 434 tcp_setopt(S, ip_add_membership(Group)), 435 tcp_getopt(S, file_no(F)), 436 assertz(udp_public_socket(Scope, Port, S, F)). 437make_public_socket(unicast(Port), Scope) :- 438 udp_socket(S), 439 tcp_bind(S, Port), 440 tcp_getopt(S, file_no(F)), 441 assertz(udp_public_socket(Scope, Port, S, F)). 442 443udp_socket_file_no(FileNo) :- 444 udp_private_socket(_,_,FileNo). 445udp_socket_file_no(FileNo) :- 446 udp_public_socket(_,_,_,FileNo).
456dispatch_inbound(FileNos) :- 457 debug(udp(broadcast), 'Waiting for ~p', [FileNos]), 458 wait_for_input(FileNos, Ready, infinite), 459 debug(udp(broadcast), 'Ready: ~p', [Ready]), 460 maplist(dispatch_ready, Ready), 461 dispatch_inbound(FileNos). 462 463dispatch_ready(FileNo) :- 464 udp_private_socket(_Port, Private, FileNo), 465 !, 466 udp_receive(Private, Data, From, [max_message_size(65535)]), 467 debug(udp(broadcast), 'Inbound on private port', []), 468 ( in_scope(Scope, From), 469 udp_term_string(Scope, Term, Data) % only accept valid data 470 -> ld_dispatch(Private, Term, From, Scope) 471 ; true 472 ). 473dispatch_ready(FileNo) :- 474 udp_public_socket(Scope, _PublicPort, Public, FileNo), 475 !, 476 udp_receive(Public, Data, From, [max_message_size(65535)]), 477 debug(udp(broadcast), 'Inbound on public port from ~p for scope ~p', 478 [From, Scope]), 479 ( in_scope(Scope, From), 480 udp_term_string(Scope, Term, Data) % only accept valid data 481 -> ( udp_scope(Scope, unicast(_)) 482 -> ld_dispatch(Public, Term, From, Scope) 483 ; udp_private_socket(_PrivatePort, Private, _FileNo), 484 ld_dispatch(Private, Term, From, Scope) 485 ) 486 ; udp_scope(Scope, unicast(_)), 487 udp_term_string(Scope, Term, Data), 488 unicast_out_of_scope_request(Scope, From, Term) 489 -> true 490 ; true 491 ). 492 493in_scope(Scope, Address) :- 494 udp_scope(Scope, ScopeData), 495 in_scope(ScopeData, Scope, Address), 496 !. 497in_scope(Scope, From) :- 498 debug(udp(broadcast), 'Out-of-scope ~p datagram from ~p', 499 [Scope, From]), 500 fail. 501 502in_scope(broadcast(Subnet, Broadcast, _PublicPort), _Scope, IP:_FromPort) :- 503 udp_broadcast_address(IP, Subnet, Broadcast). 504in_scope(multicast(_Group, _Port), _Scope, _From). 505in_scope(unicast(_PublicPort), Scope, IP:_) :- 506 udp_peer(Scope, IP:_).
515ld_dispatch(_S, Term, From, _Scope) :- 516 debug(udp(broadcast), 'ld_dispatch(~p) from ~p', [Term, From]), 517 fail. 518ld_dispatch(_S, Term, _From, _Scope) :- 519 blacklisted(Term), !. 520ld_dispatch(S, request(Key, Term), From, Scope) :- 521 !, 522 forall(safely(broadcast_request(Term)), 523 safely((udp_term_string(Scope, reply(Key,Term), Message), 524 udp_send(S, Message, From, [])))). 525ld_dispatch(_S, send(Term), _From, _Scope) :- 526 !, 527 safely_det(broadcast(Term)). 528ld_dispatch(_S, reply(Key, Term), From, _Scope) :- 529 ( reply_queue(Key, Queue) 530 -> safely(thread_send_message(Queue, Term:From)) 531 ; true 532 ). 533 534blacklisted(send(Term)) :- black_list(Term). 535blacklisted(request(_,Term)) :- black_list(Term). 536blacklisted(reply(_,Term)) :- black_list(Term).
552reload_udp_proxy :- 553 reload_outbound_proxy, 554 reload_inbound_proxy. 555 556reload_outbound_proxy :- 557 listening(udp_broadcast, udp(_,_), _), 558 !. 559reload_outbound_proxy :- 560 listen(udp_broadcast, udp(Scope,Message), 561 udp_broadcast(Message, Scope, 0.25)), 562 listen(udp_broadcast, udp(Scope,Message,Timeout), 563 udp_broadcast(Message, Scope, Timeout)), 564 listen(udp_broadcast, udp_subnet(Message), % backward compatibility 565 udp_broadcast(Message, subnet, 0.25)), 566 listen(udp_broadcast, udp_subnet(Message,Timeout), 567 udp_broadcast(Message, subnet, Timeout)). 568 569reload_inbound_proxy :- 570 catch(thread_signal(udp_inbound_proxy, throw(udp_reload)), 571 error(existence_error(thread, _),_), 572 fail), 573 !. 574reload_inbound_proxy :- 575 thread_self(Me), 576 thread_create(udp_inbound_proxy(Me), _, 577 [ alias(udp_inbound_proxy), 578 detached(true) 579 ]), 580 thread_get_message(Me, udp_inbound_ready, [timeout(10)]). 581 582inbound_proxy_died :- 583 thread_self(Self), 584 thread_property(Self, status(Status)), 585 ( catch(recreate_proxy(Status), _, fail) 586 -> print_message(informational, 587 httpd_restarted_worker(Self)) 588 ; done_status_message_level(Status, Level), 589 print_message(Level, 590 httpd_stopped_worker(Self, Status)) 591 ). 592 593recreate_proxy(exception(Error)) :- 594 recreate_on_error(Error), 595 reload_inbound_proxy. 596 597recreate_on_error('$aborted'). 598recreate_on_error(time_limit_exceeded). 599 600done_status_message_level(true, silent) :- !. 601done_status_message_level(exception('$aborted'), silent) :- !. 602done_status_message_level(_, informational).
609udp_broadcast_close(Scope) :- 610 udp_scope(Scope, _ScopeData), 611 !, 612 assert(udp_closed(Scope)), 613 reload_udp_proxy. 614udp_broadcast_close(_).
Term:Address
to send Term to a specific address or query
the address from which term is answered or it is a plain Term.
If Term is nonground, it is considered is a request (see broadcast_request/1) and the predicate succeeds for each answer received within TimeOut seconds. If Term is ground it is considered an asynchronous broadcast and udp_broadcast/3 is deterministic.
628udp_broadcast(Term:To, Scope, _Timeout) :- 629 ground(Term), ground(To), % broadcast to single listener 630 !, 631 udp_basic_broadcast(send(Term), Scope, single(To)). 632udp_broadcast(Term, Scope, _Timeout) :- 633 ground(Term), % broadcast to all listeners 634 !, 635 udp_basic_broadcast(send(Term), Scope, broadcast). 636udp_broadcast(Term:To, Scope, Timeout) :- 637 ground(To), % request to single listener 638 !, 639 setup_call_cleanup( 640 request_queue(Id, Queue), 641 ( udp_basic_broadcast(request(Id, Term), Scope, single(To)), 642 udp_br_collect_replies(Queue, Timeout, Term:To) 643 ), 644 destroy_request_queue(Queue)). 645udp_broadcast(Term:From, Scope, Timeout) :- 646 !, % request to all listeners, collect sender 647 setup_call_cleanup( 648 request_queue(Id, Queue), 649 ( udp_basic_broadcast(request(Id, Term), Scope, broadcast), 650 udp_br_collect_replies(Queue, Timeout, Term:From) 651 ), 652 destroy_request_queue(Queue)). 653udp_broadcast(Term, Scope, Timeout) :- % request to all listeners 654 udp_broadcast(Term:_, Scope, Timeout). 655 656:- dynamic 657 reply_queue/2. 658 659request_queue(Id, Queue) :- 660 Id is random(1<<63), 661 message_queue_create(Queue), 662 asserta(reply_queue(Id, Queue)). 663 664destroy_request_queue(Queue) :- % leave queue to GC 665 retractall(reply_queue(_, Queue)).
This predicate succeeds with a choice point. Committing the choice point closes S.
678udp_basic_broadcast(Term, Scope, Dest) :- 679 debug(udp(broadcast), 'UDP proxy outbound ~p to ~p', [Term, Dest]), 680 udp_term_string(Scope, Term, String), 681 udp_send_message(Dest, String, Scope). 682 683udp_send_message(single(Address), String, Scope) :- 684 ( udp_scope(Scope, unicast(_)) 685 -> udp_public_socket(Scope, _Port, S, _) 686 ; udp_private_socket(_Port, S, _F) 687 ), 688 safely(udp_send(S, String, Address, [])). 689udp_send_message(broadcast, String, Scope) :- 690 ( udp_scope(Scope, unicast(_)) 691 -> udp_public_socket(Scope, _Port, S, _), 692 forall(udp_peer(Scope, Address), 693 ( debug(udp(broadcast), 'Unicast to ~p', [Address]), 694 safely(udp_send(S, String, Address, [])))) 695 ; udp_scope(Scope, broadcast(_SubNet, Broadcast, Port)) 696 -> udp_private_socket(_PrivatePort, S, _F), 697 udp_send(S, String, Broadcast:Port, []) 698 ; udp_scope(Scope, multicast(Group, Port)) 699 -> udp_private_socket(_PrivatePort, S, _F), 700 udp_send(S, String, Group:Port, []) 701 ). 702 703% ! udp_br_collect_replies(+Queue, +TimeOut, -TermAndFrom) is nondet. 704% 705% Collect replies on Socket for TimeOut seconds. Succeed for each 706% received message. 707 708udp_br_collect_replies(Queue, Timeout, Reply) :- 709 get_time(Start), 710 Deadline is Start+Timeout, 711 repeat, 712 ( thread_get_message(Queue, Reply, 713 [ deadline(Deadline) 714 ]) 715 -> true 716 ; !, 717 fail 718 ).
ip(A,B,C,D)
or an atom or string of the format A.B.C.D
. Options processed:
subnet
.For compatibility reasons Options may be the subnet mask.
747udp_broadcast_initialize(IP, Options) :- 748 with_mutex(udp_broadcast, 749 udp_broadcast_initialize_sync(IP, Options)). 750 751udp_broadcast_initialize_sync(IP, Options) :- 752 nonvar(Options), 753 Options = ip(_,_,_,_), 754 !, 755 udp_broadcast_initialize(IP, [subnet_mask(Options)]). 756udp_broadcast_initialize_sync(IP, Options) :- 757 to_ip4(IP, IPAddress), 758 option(method(Method), Options, broadcast), 759 must_be(oneof([broadcast, multicast, unicast]), Method), 760 udp_broadcast_initialize_sync(Method, IPAddress, Options), 761 reload_udp_proxy. 762 763udp_broadcast_initialize_sync(broadcast, IPAddress, Options) :- 764 option(subnet_mask(Subnet), Options, _), 765 mk_subnet(Subnet, IPAddress, Subnet4), 766 option(port(Port), Options, 20005), 767 option(scope(Scope), Options, subnet), 768 769 udp_broadcast_address(IPAddress, Subnet4, Broadcast), 770 udp_broadcast_close(Scope), 771 assertz(udp_scope(Scope, broadcast(Subnet4, Broadcast, Port))). 772udp_broadcast_initialize_sync(unicast, _IPAddress, Options) :- 773 option(port(Port), Options, 20005), 774 option(scope(Scope), Options, subnet), 775 udp_broadcast_close(Scope), 776 assertz(udp_scope(Scope, unicast(Port))). 777udp_broadcast_initialize_sync(multicast, IPAddress, Options) :- 778 option(port(Port), Options, 20005), 779 option(scope(Scope), Options, subnet), 780 udp_broadcast_close(Scope), 781 multicast_address(IPAddress), 782 assertz(udp_scope(Scope, multicast(IPAddress, Port))). 783 784to_ip4(Atomic, ip(A,B,C,D)) :- 785 atomic(Atomic), 786 !, 787 ( split_string(Atomic, ".", "", Strings), 788 maplist(number_string, [A,B,C,D], Strings) 789 -> true 790 ; syntax_error(illegal_ip_address) 791 ). 792to_ip4(IP, IP). 793 794mk_subnet(Var, IP, Subnet) :- 795 var(Var), 796 !, 797 ( default_subnet(IP, Subnet) 798 -> true 799 ; domain_error(ip_with_subnet, IP) 800 ). 801mk_subnet(Subnet, _, Subnet4) :- 802 to_ip4(Subnet, Subnet4).
811default_subnet(ip(A,_,_,_), ip(A,0,0,0)) :- 812 between(0,127, A), !. 813default_subnet(ip(A,B,_,_), ip(A,B,0,0)) :- 814 between(128,191, A), !. 815default_subnet(ip(A,B,C,_), ip(A,B,C,0)) :- 816 between(192,223, A), !. 817 818multicast_address(ip(A,_,_,_)) :- 819 between(224, 239, A), 820 !. 821multicast_address(IP) :- 822 domain_error(multicast_network, IP). 823 824 825 /******************************* 826 * UNICAST PEERS * 827 *******************************/
839udp_peer_add(Scope, Address) :- 840 must_be(ground, Address), 841 peer_address(Address, Scope, Canonical), 842 ( udp_scope_peer(Scope, Canonical) 843 -> true 844 ; assertz(udp_scope_peer(Scope, Canonical)) 845 ). 846 847udp_peer_del(Scope, Address) :- 848 peer_address(Address, Scope, Canonical), 849 retractall(udp_scope_peer(Scope, Canonical)). 850 851udp_peer(Scope, IPAddress) :- 852 udp_scope_peer(Scope, IPAddress). 853 854peer_address(IP:Port, _Scope, IPAddress:Port) :- 855 !, 856 to_ip4(IP, IPAddress). 857peer_address(IP, Scope, IPAddress:Port) :- 858 ( udp_scope(Scope, unicast(Port)) 859 -> true 860 ; existence_error(udp_scope, Scope) 861 ), 862 to_ip4(IP, IPAddress). 863 864 865 866 /******************************* 867 * HOOKS * 868 *******************************/
%prolog\n
, followed by the Prolog term in quoted notation while
ignoring operators. This hook may use alternative serialization such
as fast_term_serialized/2, use library(ssl) to realise encrypted
messages, etc.
In mode (+,-), Term is written with the options ignore_ops(true)
and
quoted(true)
.
This predicate first calls udp_term_string_hook/3.
908udp_term_string(Scope, Term, String) :- 909 catch(udp_term_string_hook(Scope, Term, String), udp(Error), true), 910 !, 911 ( var(Error) 912 -> true 913 ; Error == invalid_message 914 -> fail 915 ; throw(udp(Error)) 916 ). 917udp_term_string(_Scope, Term, String) :- 918 ( var(String) 919 -> format(string(String), '%-prolog-\n~W', 920 [ Term, 921 [ ignore_ops(true), 922 quoted(true) 923 ] 924 ]) 925 ; sub_string(String, 0, _, _, '%-prolog-\n'), 926 term_string(Term, String, 927 [ syntax_errors(quiet) 928 ]) 929 ).
This hook is intended to initiate a new node joining the network of
peers. We could in theory also omit the in-scope test and use a
normal broadcast to join. Using a different channal however provides
a basic level of security. A possibe implementation is below. The
first fragment is a hook added to the server, the second is a
predicate added to a client and the last initiates the request in
the client. The excanged term (join(X)
) can be used to exchange a
welcome handshake.
:- multifile udp_broadcast:udp_unicast_join_hook/3. udp_broadcast:udp_unicast_join_hook(Scope, From, join(welcome)) :- udp_peer_add(Scope, From),
join_request(Scope, Address, Reply) :- udp_peer_add(Scope, Address), broadcast_request(udp(Scope, join(X))).
?- join_request(myscope, "1.2.3.4":10001, Reply). Reply = welcome.
967unicast_out_of_scope_request(Scope, From, send(Term)) :- 968 udp_unicast_join_hook(Scope, From, Term). 969unicast_out_of_scope_request(Scope, From, request(Key, Term)) :- 970 udp_unicast_join_hook(Scope, From, Term), 971 udp_public_socket(Scope, _Port, Socket, _FileNo), 972 safely((udp_term_string(Scope, reply(Key,Term), Message), 973 udp_send(Socket, Message, From, [])))
A UDP broadcast proxy
SWI-Prolog's broadcast library provides a means that may be used to facilitate publish and subscribe communication regimes between anonymous members of a community of interest. The members of the community are however, necessarily limited to a single instance of Prolog. The UDP broadcast library removes that restriction. With this library loaded, any member on your local IP subnetwork that also has this library loaded may hear and respond to your broadcasts.
This library support three styles of networking as described below. Each of these networks have their own advantages and disadvantages. Please study the literature to understand the consequences.
After initialization and, in the case of a unicast network managing the set of peers, communication happens through broadcast/1, broadcast_request/1 and listen/1,2,3.
A broadcast/1 or broadcast_request/1 of the shape
udp(Scope, Term)
orudp(Scope, Term, TimeOut)
is forwarded over the UDP network to all peers that joined the same Scope. To prevent the potential for feedback loops, only the plain Term is broadcasted locally. The timeout is optional. It specifies the amount to time to wait for replies to arrive in response to a broadcast_request/1. The default period is 0.250 seconds. The timeout is ignored for broadcasts.An example of three separate processes cooperating in the same scope called
peers
:It is also possible to carry on a private dialog with a single responder. To do this, you supply a compound of the form, Term:PortId, to a UDP scoped broadcast/1 or broadcast_request/1, where PortId is the ip-address and port-id of the intended listener. If you supply an unbound variable, PortId, to broadcast_request, it will be unified with the address of the listener that responds to Term. You may send a directed broadcast to a specific member by simply providing this address in a similarly structured compound to a UDP scoped broadcast/1. The message is sent via unicast to that member only by way of the member's broadcast listener. It is received by the listener just as any other broadcast would be. The listener does not know the difference.
For example, in order to discover who responded with a particular value:
All incomming trafic is handled by a single thread with the alias
udp_inbound_proxy
. This thread also performs the internal dispatching using broadcast/1 and broadcast_request/1. Future versions may provide for handling these requests in separate threads.Caveats
While the implementation is mostly transparent, there are some important and subtle differences that must be taken into consideration:
udp_subnet
scope is not reentrant. If a listener performs a broadcast_request/1 with UDP scope recursively, then disaster looms certain. This caveat does not apply to a UDP scoped broadcast/1, which can safely be performed from a listener context.tipc.pl
*/