/*  Part of SWI-Prolog

    Author:        Jan Wielemaker
    E-mail:        J.Wielemaker@vu.nl
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2008-2016, University of Amsterdam
                              VU University Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(thread_pool,
          [ thread_pool_create/3,       % +Pool, +Size, +Options
            thread_pool_destroy/1,      % +Pool
            thread_create_in_pool/4,    % +Pool, :Goal, -Id, +Options

            current_thread_pool/1,      % ?Pool
            thread_pool_property/2      % ?Pool, ?Property
          ]).
:- autoload(library(debug),[debug/3]).
:- autoload(library(error),[must_be/2,type_error/2]).
:- autoload(library(lists),[member/2,delete/3]).
:- autoload(library(option),
            [meta_options/3,select_option/4,merge_options/3,option/3]).
:- autoload(library(rbtrees),
            [ rb_new/1,
              rb_insert_new/4,
              rb_delete/3,
              rb_keys/2,
              rb_lookup/3,
              rb_update/4
            ]).

:- meta_predicate
    thread_create_in_pool(+, 0, -, :).
:- predicate_options(thread_create_in_pool/4, 4,
                     [ wait(boolean),
                       pass_to(system:thread_create/3, 3)
                     ]).

:- multifile
    create_pool/1.

When all threads in the pool are in use, the behaviour depends on the wait option of thread_create_in_pool/4 and the backlog option described below. Options are passed to thread_create/3, except for backlog, which sets the maximum number of delayed requests. Its default is infinite. Otherwise it must be a non-negative integer. Using backlog(0) will never delay thread creation for this pool.

The pooling mechanism does not interact with the detached state of a thread. Threads can be created both detached and normal and must be joined using thread_join/2 if they are not detached.
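The options described above can be sketched as follows. The pool name `compute`, its size, and the backlog limit are hypothetical values chosen for illustration; `detached(true)` is an ordinary thread_create/3 option that the pool passes on to every member.

```prolog
:- use_module(library(thread_pool)).

% Hypothetical pool named compute: at most 4 concurrent threads and
% at most 10 delayed requests.  Members are created detached, so
% they do not need to be joined with thread_join/2.
make_compute_pool :-
    thread_pool_create(compute, 4,
                       [ backlog(10),
                         detached(true)
                       ]).
```

Calling `make_compute_pool` twice raises a permission error, because the pool name is already registered with the manager.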

thread_pool_create(Name, Size, Options) :-
    must_be(list, Options),
    pool_manager(Manager),
    thread_self(Me),
    thread_send_message(Manager, create_pool(Name, Size, Options, Me)),
    wait_reply.

thread_pool_destroy(Name) :-
    pool_manager(Manager),
    thread_self(Me),
    thread_send_message(Manager, destroy_pool(Name, Me)),
    wait_reply.

current_thread_pool(Name) :-
    pool_manager(Manager),
    thread_self(Me),
    thread_send_message(Manager, current_pools(Me)),
    wait_reply(Pools),
    (   atom(Name)
    ->  memberchk(Name, Pools)
    ;   member(Name, Pools)
    ).

thread_pool_property(Name, Property) :-
    current_thread_pool(Name),
    pool_manager(Manager),
    thread_self(Me),
    thread_send_message(Manager, pool_properties(Me, Name, Property)),
    wait_reply(Props),
    (   nonvar(Property)
    ->  memberchk(Property, Props)
    ;   member(Property, Props)
    ).
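
As a small usage sketch, the two enumeration predicates above can be combined to print an overview of all pools. The predicate name `report_pools/0` is hypothetical. Each property is fetched with a separate request to the manager, so the numbers on one line are not guaranteed to come from the same instant.

```prolog
:- use_module(library(thread_pool)).

% Print size, free slots and running members for every known pool.
report_pools :-
    forall(current_thread_pool(Pool),
           (   thread_pool_property(Pool, size(Size)),
               thread_pool_property(Pool, free(Free)),
               thread_pool_property(Pool, running(Running)),
               format("~w: size=~d free=~d running=~d~n",
                      [Pool, Size, Free, Running])
           )).
```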
wait(Boolean): If true (default) and the pool is full, wait until a member of the pool completes. If false, throw a resource_error.

thread_create_in_pool(Pool, Goal, Id, QOptions) :-
    meta_options(is_meta, QOptions, Options),
    catch(thread_create_in_pool_(Pool, Goal, Id, Options),
          Error, true),
    (   var(Error)
    ->  true
    ;   Error = error(existence_error(thread_pool, Pool), _),
        create_pool_lazily(Pool)
    ->  thread_create_in_pool_(Pool, Goal, Id, Options)
    ;   throw(Error)
    ).

thread_create_in_pool_(Pool, Goal, Id, Options) :-
    select_option(wait(Wait), Options, ThreadOptions, true),
    pool_manager(Manager),
    thread_self(Me),
    thread_send_message(Manager,
                        create(Pool, Goal, Me, Wait, Id, ThreadOptions)),
    wait_reply(Id).

is_meta(at_exit).
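
A caller that prefers to shed load rather than block can pass wait(false) and catch the resource error. The sketch below is only an illustration: `try_in_pool/3` and the result terms `started(Id)` and `busy` are hypothetical names, and the error term it matches is the one raised by the manager code in this file.

```prolog
:- use_module(library(thread_pool)).

:- meta_predicate try_in_pool(+, 0, -).

% try_in_pool(+Pool, :Goal, -Result)
% Result is started(Id) if a thread was created, or busy if the
% pool had no free slot.  Any other error is passed to the caller.
try_in_pool(Pool, Goal, Result) :-
    catch(( thread_create_in_pool(Pool, Goal, Id, [wait(false)]),
            Result = started(Id)
          ),
          error(resource_error(threads_in_pool(Pool)), _),
          Result = busy).
```

A server could answer `busy` with an "overloaded" response instead of queueing the request.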

create_pool_lazily(Pool) :-
    with_mutex(Pool,
               (   mutex_destroy(Pool),
                   create_pool_sync(Pool)
               )).

create_pool_sync(Pool) :-
    current_thread_pool(Pool),
    !.
create_pool_sync(Pool) :-
    create_pool(Pool).


                 /*******************************
                 *        START MANAGER         *
                 *******************************/

pool_manager(TID) :-
    TID = '__thread_pool_manager',
    (   thread_running(TID)
    ->  true
    ;   with_mutex('__thread_pool', create_pool_manager(TID))
    ).

thread_running(Thread) :-
    catch(thread_property(Thread, status(Status)),
          E, true),
    (   var(E)
    ->  (   Status == running
        ->  true
        ;   thread_join(Thread, _),
            print_message(warning, thread_pool(manager_died(Status))),
            fail
        )
    ;   E = error(existence_error(thread, Thread), _)
    ->  fail
    ;   throw(E)
    ).

create_pool_manager(Thread) :-
    thread_running(Thread),
    !.
create_pool_manager(Thread) :-
    thread_create(pool_manager_main, _,
                  [ alias(Thread),
                    inherit_from(main)
                  ]).


pool_manager_main :-
    rb_new(State0),
    manage_thread_pool(State0).


                 /*******************************
                 *        MANAGER LOGIC         *
                 *******************************/

manage_thread_pool(State0) :-
    thread_get_message(Message),
    (   update_thread_pool(Message, State0, State)
    ->  debug(thread_pool(state), 'Message ~p --> ~p', [Message, State]),
        manage_thread_pool(State)
    ;   format(user_error, 'Update failed: ~p~n', [Message])
    ).


update_thread_pool(create_pool(Name, Size, Options, For), State0, State) :-
    !,
    (   rb_insert_new(State0,
                      Name, tpool(Options, Size, Size, WP, WP, []),
                      State)
    ->  thread_send_message(For, thread_pool(true))
    ;   reply_error(For, permission_error(create, thread_pool, Name)),
        State = State0
    ).
update_thread_pool(destroy_pool(Name, For), State0, State) :-
    !,
    (   rb_delete(State0, Name, State)
    ->  thread_send_message(For, thread_pool(true))
    ;   reply_error(For, existence_error(thread_pool, Name)),
        State = State0
    ).
update_thread_pool(current_pools(For), State, State) :-
    !,
    rb_keys(State, Keys),
    debug(thread_pool(current), 'Reply to ~w: ~p', [For, Keys]),
    reply(For, Keys).
update_thread_pool(pool_properties(For, Name, P), State, State) :-
    !,
    (   rb_lookup(Name, Pool, State)
    ->  findall(P, pool_property(P, Pool), List),
        reply(For, List)
    ;   reply_error(For, existence_error(thread_pool, Name))
    ).
update_thread_pool(Message, State0, State) :-
    arg(1, Message, Name),
    (   rb_lookup(Name, Pool0, State0)
    ->  update_pool(Message, Pool0, Pool),
        rb_update(State0, Name, Pool, State)
    ;   State = State0,
        (   Message = create(Name, _, For, _, _, _)
        ->  reply_error(For, existence_error(thread_pool, Name))
        ;   true
        )
    ).

pool_property(options(Options),
              tpool(Options, _Free, _Size, _WP, _WPT, _Members)).
pool_property(backlog(Size),
              tpool(_, _Free, _Size, WP, WPT, _Members)) :-
    diff_list_length(WP, WPT, Size).
pool_property(free(Free),
              tpool(_, Free, _Size, _, _, _)).
pool_property(size(Size),
              tpool(_, _Free, Size, _, _, _)).
pool_property(running(Count),
              tpool(_, Free, Size, _, _, _)) :-
    Count is Size - Free.
pool_property(members(IDList),
              tpool(_, _, _, _, _, IDList)).

diff_list_length(List, Tail, Size) :-
    '$skip_list'(Length, List, Rest),
    (   Rest == Tail
    ->  Size = Length
    ;   type_error(difference_list, List/Tail)
    ).

update_pool(create(Name, Goal, For, _, Id, MyOptions),
            tpool(Options, Free0, Size, WP, WPT, Members0),
            tpool(Options, Free, Size, WP, WPT, Members)) :-
    succ(Free, Free0),
    !,
    merge_options(MyOptions, Options, ThreadOptions),
    select_option(at_exit(AtExit), ThreadOptions, ThreadOptions1, true),
    catch(thread_create(Goal, Id,
                        [ at_exit(worker_exitted(Name, Id, AtExit))
                        | ThreadOptions1
                        ]),
          E, true),
    (   var(E)
    ->  Members = [Id|Members0],
        reply(For, Id)
    ;   reply_error(For, E),
        Members = Members0
    ).
update_pool(Create,
            tpool(Options, 0, Size, WP, WPT0, Members),
            tpool(Options, 0, Size, WP, WPT, Members)) :-
    Create = create(Name, _Goal, For, Wait, _, _Options),
    !,
    option(backlog(BackLog), Options, infinite),
    (   can_delay(Wait, BackLog, WP, WPT0)
    ->  WPT0 = [Create|WPT],
        debug(thread_pool, 'Delaying ~p', [Create])
    ;   WPT = WPT0,
        reply_error(For, resource_error(threads_in_pool(Name)))
    ).
update_pool(exitted(_Name, Id),
            tpool(Options, Free0, Size, WP0, WPT, Members0),
            Pool) :-
    succ(Free0, Free),
    delete(Members0, Id, Members1),
    Pool1 = tpool(Options, Free, Size, WP, WPT, Members1),
    (   WP0 == WPT
    ->  WP = WP0,
        Pool = Pool1
    ;   WP0 = [Waiting|WP],
        debug(thread_pool, 'Start delayed ~p', [Waiting]),
        update_pool(Waiting, Pool1, Pool)
    ).


can_delay(true, infinite, _, _) :- !.
can_delay(true, BackLog, WP, WPT) :-
    diff_list_length(WP, WPT, Size),
    BackLog > Size.

:- public
    worker_exitted/3.

worker_exitted(Name, Id, AtExit) :-
    catch(thread_send_message('__thread_pool_manager', exitted(Name, Id)),
          _, true),
    call(AtExit).


                 /*******************************
                 *             UTIL             *
                 *******************************/

reply(To, Term) :-
    thread_send_message(To, thread_pool(true(Term))).

reply_error(To, Error) :-
    thread_send_message(To, thread_pool(error(Error, _))).

wait_reply :-
    thread_get_message(thread_pool(Result)),
    (   Result == true
    ->  true
    ;   Result == fail
    ->  fail
    ;   throw(Result)
    ).

wait_reply(Value) :-
    thread_get_message(thread_pool(Reply)),
    (   Reply = true(Value0)
    ->  Value = Value0
    ;   Reply == fail
    ->  fail
    ;   throw(Reply)
    ).


                 /*******************************
                 *            HOOKS             *
                 *******************************/
For example, the following declaration lazily creates the pool media, which holds a maximum of 20 threads.

:- multifile thread_pool:create_pool/1.

thread_pool:create_pool(media) :-
    thread_pool_create(media, 20, []).

                 /*******************************
                 *           MESSAGES           *
                 *******************************/
:- multifile
    prolog:message/3.

prolog:message(thread_pool(Message)) -->
    message(Message).

message(manager_died(Status)) -->
    [ 'Thread-pool: manager died on status ~p; restarting'-[Status] ].

Resource bounded thread management
The module library(thread_pool) manages threads in pools. A pool defines properties of its member threads and the maximum number of threads that can coexist in the pool. The call thread_create_in_pool/4 allocates a thread in the pool, just like thread_create/3. If the pool is fully allocated it can be asked to wait or raise an error.
The library has been designed to deal with server applications that receive a variety of requests, such as HTTP servers. Simply starting a thread for each request is too simple-minded for such servers.
Using this library, one can define a pool for each set of tasks with comparable characteristics and create threads in this pool. Unlike the worker-pool model, threads are not started immediately. Depending on the design, both approaches can be attractive.
The library is implemented by means of a manager thread with the fixed thread id __thread_pool_manager. All state is maintained in this manager thread, which receives and processes requests to create and destroy pools, create threads in a pool and handle messages from terminated threads. Thread pools are not saved in a saved state and must therefore be recreated using the initialization/1 directive or otherwise during startup of the application.
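
Because pools do not survive in a saved state, an application typically recreates them at startup. The following is a minimal sketch under assumed names: the pool `requests`, its size of 8, and the predicates `handle/1` and `serve/1` are hypothetical and stand in for application code.

```prolog
:- use_module(library(thread_pool)).

% Recreate the (hypothetical) pool each time the program starts,
% including when it is started from a saved state.
:- initialization(thread_pool_create(requests, 8, [])).

% handle(+Request): run the request in a detached pool thread.
% Blocks while the pool is full, which is the default wait(true).
handle(Request) :-
    thread_create_in_pool(requests, serve(Request), _Id,
                          [ detached(true) ]).

% Stand-in for the real request handler.
serve(Request) :-
    format("handling ~w~n", [Request]).
```

An alternative to the initialization/1 directive is the create_pool/1 hook, which creates the pool on first use.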