35
36:- module(thread_pool,
37 [ thread_pool_create/3, 38 thread_pool_destroy/1, 39 thread_create_in_pool/4, 40
41 current_thread_pool/1, 42 thread_pool_property/2 43 ]). 44:- autoload(library(debug),[debug/3]). 45:- autoload(library(error),[must_be/2,type_error/2]). 46:- autoload(library(lists),[member/2,delete/3]). 47:- autoload(library(option),
48 [meta_options/3,select_option/4,merge_options/3,option/3]). 49:- autoload(library(rbtrees),
50 [ rb_new/1,
51 rb_insert_new/4,
52 rb_delete/3,
53 rb_keys/2,
54 rb_lookup/3,
55 rb_update/4
56 ]). 57
58
92
93:- meta_predicate
94 thread_create_in_pool(+, 0, -, :). 95:- predicate_options(thread_create_in_pool/4, 4,
96 [ wait(boolean),
97 pass_to(system:thread_create/3, 3)
98 ]). 99
100:- multifile
101 create_pool/1. 102
123
124thread_pool_create(Name, Size, Options) :-
125 must_be(list, Options),
126 pool_manager(Manager),
127 thread_self(Me),
128 thread_send_message(Manager, create_pool(Name, Size, Options, Me)),
129 wait_reply.
130
136
137thread_pool_destroy(Name) :-
138 pool_manager(Manager),
139 thread_self(Me),
140 thread_send_message(Manager, destroy_pool(Name, Me)),
141 wait_reply.
142
143
147
148current_thread_pool(Name) :-
149 pool_manager(Manager),
150 thread_self(Me),
151 thread_send_message(Manager, current_pools(Me)),
152 wait_reply(Pools),
153 ( atom(Name)
154 -> memberchk(Name, Pools)
155 ; member(Name, Pools)
156 ).
157
175
176thread_pool_property(Name, Property) :-
177 current_thread_pool(Name),
178 pool_manager(Manager),
179 thread_self(Me),
180 thread_send_message(Manager, pool_properties(Me, Name, Property)),
181 wait_reply(Props),
182 ( nonvar(Property)
183 -> memberchk(Property, Props)
184 ; member(Property, Props)
185 ).
186
187
203
204thread_create_in_pool(Pool, Goal, Id, QOptions) :-
205 meta_options(is_meta, QOptions, Options),
206 catch(thread_create_in_pool_(Pool, Goal, Id, Options),
207 Error, true),
208 ( var(Error)
209 -> true
210 ; Error = error(existence_error(thread_pool, Pool), _),
211 create_pool_lazily(Pool)
212 -> thread_create_in_pool_(Pool, Goal, Id, Options)
213 ; throw(Error)
214 ).
215
216thread_create_in_pool_(Pool, Goal, Id, Options) :-
217 select_option(wait(Wait), Options, ThreadOptions, true),
218 pool_manager(Manager),
219 thread_self(Me),
220 thread_send_message(Manager,
221 create(Pool, Goal, Me, Wait, Id, ThreadOptions)),
222 wait_reply(Id).
223
224is_meta(at_exit).
225
226
230
231create_pool_lazily(Pool) :-
232 with_mutex(Pool,
233 ( mutex_destroy(Pool),
234 create_pool_sync(Pool)
235 )).
236
237create_pool_sync(Pool) :-
238 current_thread_pool(Pool),
239 !.
240create_pool_sync(Pool) :-
241 create_pool(Pool).
242
243
244 247
252
253pool_manager(TID) :-
254 TID = '__thread_pool_manager',
255 ( thread_running(TID)
256 -> true
257 ; with_mutex('__thread_pool', create_pool_manager(TID))
258 ).
259
260thread_running(Thread) :-
261 catch(thread_property(Thread, status(Status)),
262 E, true),
263 ( var(E)
264 -> ( Status == running
265 -> true
266 ; thread_join(Thread, _),
267 print_message(warning, thread_pool(manager_died(Status))),
268 fail
269 )
270 ; E = error(existence_error(thread, Thread), _)
271 -> fail
272 ; throw(E)
273 ).
274
275create_pool_manager(Thread) :-
276 thread_running(Thread),
277 !.
278create_pool_manager(Thread) :-
279 thread_create(pool_manager_main, _,
280 [ alias(Thread),
281 inherit_from(main)
282 ]).
283
284
285pool_manager_main :-
286 rb_new(State0),
287 manage_thread_pool(State0).
288
289
290 293
295
296manage_thread_pool(State0) :-
297 thread_get_message(Message),
298 ( update_thread_pool(Message, State0, State)
299 -> debug(thread_pool(state), 'Message ~p --> ~p', [Message, State]),
300 manage_thread_pool(State)
301 ; format(user_error, 'Update failed: ~p~n', [Message])
302 ).
303
304
305update_thread_pool(create_pool(Name, Size, Options, For), State0, State) :-
306 !,
307 ( rb_insert_new(State0,
308 Name, tpool(Options, Size, Size, WP, WP, []),
309 State)
310 -> thread_send_message(For, thread_pool(true))
311 ; reply_error(For, permission_error(create, thread_pool, Name)),
312 State = State0
313 ).
314update_thread_pool(destroy_pool(Name, For), State0, State) :-
315 !,
316 ( rb_delete(State0, Name, State)
317 -> thread_send_message(For, thread_pool(true))
318 ; reply_error(For, existence_error(thread_pool, Name)),
319 State = State0
320 ).
321update_thread_pool(current_pools(For), State, State) :-
322 !,
323 rb_keys(State, Keys),
324 debug(thread_pool(current), 'Reply to ~w: ~p', [For, Keys]),
325 reply(For, Keys).
326update_thread_pool(pool_properties(For, Name, P), State, State) :-
327 !,
328 ( rb_lookup(Name, Pool, State)
329 -> findall(P, pool_property(P, Pool), List),
330 reply(For, List)
331 ; reply_error(For, existence_error(thread_pool, Name))
332 ).
333update_thread_pool(Message, State0, State) :-
334 arg(1, Message, Name),
335 ( rb_lookup(Name, Pool0, State0)
336 -> update_pool(Message, Pool0, Pool),
337 rb_update(State0, Name, Pool, State)
338 ; State = State0,
339 ( Message = create(Name, _, For, _, _, _)
340 -> reply_error(For, existence_error(thread_pool, Name))
341 ; true
342 )
343 ).
344
345pool_property(options(Options),
346 tpool(Options, _Free, _Size, _WP, _WPT, _Members)).
347pool_property(backlog(Size),
348 tpool(_, _Free, _Size, WP, WPT, _Members)) :-
349 diff_list_length(WP, WPT, Size).
350pool_property(free(Free),
351 tpool(_, Free, _Size, _, _, _)).
352pool_property(size(Size),
353 tpool(_, _Free, Size, _, _, _)).
354pool_property(running(Count),
355 tpool(_, Free, Size, _, _, _)) :-
356 Count is Size - Free.
357pool_property(members(IDList),
358 tpool(_, _, _, _, _, IDList)).
359
360diff_list_length(List, Tail, Size) :-
361 '$skip_list'(Length, List, Rest),
362 ( Rest == Tail
363 -> Size = Length
364 ; type_error(difference_list, List/Tail)
365 ).
366
367
381
382update_pool(create(Name, Goal, For, _, Id, MyOptions),
383 tpool(Options, Free0, Size, WP, WPT, Members0),
384 tpool(Options, Free, Size, WP, WPT, Members)) :-
385 succ(Free, Free0),
386 !,
387 merge_options(MyOptions, Options, ThreadOptions),
388 select_option(at_exit(AtExit), ThreadOptions, ThreadOptions1, true),
389 catch(thread_create(Goal, Id,
390 [ at_exit(worker_exitted(Name, Id, AtExit))
391 | ThreadOptions1
392 ]),
393 E, true),
394 ( var(E)
395 -> Members = [Id|Members0],
396 reply(For, Id)
397 ; reply_error(For, E),
398 Members = Members0
399 ).
400update_pool(Create,
401 tpool(Options, 0, Size, WP, WPT0, Members),
402 tpool(Options, 0, Size, WP, WPT, Members)) :-
403 Create = create(Name, _Goal, For, Wait, _, _Options),
404 !,
405 option(backlog(BackLog), Options, infinite),
406 ( can_delay(Wait, BackLog, WP, WPT0)
407 -> WPT0 = [Create|WPT],
408 debug(thread_pool, 'Delaying ~p', [Create])
409 ; WPT = WPT0,
410 reply_error(For, resource_error(threads_in_pool(Name)))
411 ).
412update_pool(exitted(_Name, Id),
413 tpool(Options, Free0, Size, WP0, WPT, Members0),
414 Pool) :-
415 succ(Free0, Free),
416 delete(Members0, Id, Members1),
417 Pool1 = tpool(Options, Free, Size, WP, WPT, Members1),
418 ( WP0 == WPT
419 -> WP = WP0,
420 Pool = Pool1
421 ; WP0 = [Waiting|WP],
422 debug(thread_pool, 'Start delayed ~p', [Waiting]),
423 update_pool(Waiting, Pool1, Pool)
424 ).
425
426
427can_delay(true, infinite, _, _) :- !.
428can_delay(true, BackLog, WP, WPT) :-
429 diff_list_length(WP, WPT, Size),
430 BackLog > Size.
431
439
440:- public
441 worker_exitted/3. 442
443worker_exitted(Name, Id, AtExit) :-
444 catch(thread_send_message('__thread_pool_manager', exitted(Name, Id)),
445 _, true),
446 call(AtExit).
447
448
449 452
453reply(To, Term) :-
454 thread_send_message(To, thread_pool(true(Term))).
455
456reply_error(To, Error) :-
457 thread_send_message(To, thread_pool(error(Error, _))).
458
459wait_reply :-
460 thread_get_message(thread_pool(Result)),
461 ( Result == true
462 -> true
463 ; Result == fail
464 -> fail
465 ; throw(Result)
466 ).
467
468wait_reply(Value) :-
469 thread_get_message(thread_pool(Reply)),
470 ( Reply = true(Value0)
471 -> Value = Value0
472 ; Reply == fail
473 -> fail
474 ; throw(Reply)
475 ).
476
477
478 481
497
498 501:- multifile
502 prolog:message/3. 503
504prolog:message(thread_pool(Message)) -->
505 message(Message).
506
507message(manager_died(Status)) -->
508 [ 'Thread-pool: manager died on status ~p; restarting'-[Status] ]