ReactOS 0.4.16-dev-853-g88d9285
threadpool.c
Go to the documentation of this file.
1/*
2 * Thread pooling
3 *
4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
20 */
21
22
23#ifdef __REACTOS__
24#include <rtl_vista.h>
25#define NDEBUG
26#include "wine/list.h"
27#include <debug.h>
28
29#define ERR(fmt, ...) DPRINT1(fmt, ##__VA_ARGS__)
30#define FIXME(fmt, ...) DPRINT(fmt, ##__VA_ARGS__)
31#define WARN(fmt, ...) DPRINT(fmt, ##__VA_ARGS__)
32#define TRACE(fmt, ...) DPRINT(fmt, ##__VA_ARGS__)
33#ifndef ARRAY_SIZE
34#define ARRAY_SIZE(_x) (sizeof((_x))/sizeof((_x)[0]))
35#endif
36
37typedef struct _THREAD_NAME_INFORMATION
38{
39 UNICODE_STRING ThreadName;
40} THREAD_NAME_INFORMATION, *PTHREAD_NAME_INFORMATION;
41
47
50#define PRTL_WORK_ITEM_ROUTINE WORKERCALLBACKFUNC
51
52#define CRITICAL_SECTION RTL_CRITICAL_SECTION
53#define GetProcessHeap() RtlGetProcessHeap()
54#define GetCurrentProcess() NtCurrentProcess()
55#define GetCurrentThread() NtCurrentThread()
56#define GetCurrentThreadId() HandleToULong(NtCurrentTeb()->ClientId.UniqueThread)
57#else
58#include <assert.h>
59#include <stdarg.h>
60#include <limits.h>
61
62#include "ntstatus.h"
63#define WIN32_NO_STATUS
64#include "winternl.h"
65
66#include "wine/debug.h"
67#include "wine/list.h"
68
69#include "ntdll_misc.h"
70
72#endif
73
74/*
75 * Old thread pooling API
76 */
77
79{
82};
83
84#define EXPIRE_NEVER (~(ULONGLONG)0)
85#define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
86
87#ifndef __REACTOS__
89#endif
90
91static struct
92{
95}
97{
98 NULL, /* compl_port */
99#ifdef __REACTOS__
100 {0}, /* threadpool_compl_cs */
101#else
102 { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
103#endif
105
106#ifndef __REACTOS__
108{
109 0, 0, &old_threadpool.threadpool_compl_cs,
111 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
112};
113#endif
114
115struct timer_queue;
117{
118 struct timer_queue *q;
119 struct list entry;
120 ULONG runcount; /* number of callbacks pending execution */
126 BOOL destroy; /* timer should be deleted; once set, never unset */
127 HANDLE event; /* removal event */
128};
129
131{
134 struct list timers; /* sorted by expiration time */
135 BOOL quit; /* queue should be deleted; once set, never unset */
138};
139
140/*
141 * Object-oriented thread pooling API
142 */
143
144#define THREADPOOL_WORKER_TIMEOUT 5000
145#define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
146
147/* internal threadpool representation */
149{
154 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
155 struct list pools[3];
157 /* information about worker threads, locked via .cs */
164};
165
167{
173};
174
176{
179};
180
181/* internal threadpool object representation */
183{
184 void *win32_callback; /* leave space for kernelbase to store win32 callback */
187 /* read-only information */
197 /* information about the group, locked via .group->cs */
200 /* information about the pool, locked via .pool->cs */
208 /* arguments for callback */
209 union
210 {
211 struct
212 {
215 struct
216 {
219 struct
220 {
222 /* information about the timer, locked via timerqueue.cs */
231 struct
232 {
235 /* information about the wait object, locked via waitqueue.cs */
244 struct
245 {
247 /* locked via .pool->cs */
251 } io;
252 } u;
253};
254
255/* internal threadpool instance representation */
257{
262 struct
263 {
271};
272
273/* internal threadpool group representation */
275{
279 /* list of group members, locked via .cs */
280 struct list members;
281};
282
283#ifndef __REACTOS__
284/* global timerqueue object */
286#endif
287
288static struct
289{
295}
297{
298#ifdef __REACTOS__
299 {0}, /* cs */
300#else
301 { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
302#endif
303 0, /* objcount */
304 FALSE, /* thread_running */
305 LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
306#if __REACTOS__
307 0,
308#else
309 RTL_CONDITION_VARIABLE_INIT /* update_event */
310#endif
312
313#ifndef __REACTOS__
315{
316 0, 0, &timerqueue.cs,
318 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
319};
320
321/* global waitqueue object */
323#endif
324
325static struct
326{
329 struct list buckets;
330}
331waitqueue =
332{
333#ifdef __REACTOS__
334 {0}, /* cs */
335#else
336 { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
337#endif
338 0, /* num_buckets */
339 LIST_INIT( waitqueue.buckets ) /* buckets */
341
342#ifndef __REACTOS__
344{
345 0, 0, &waitqueue.cs,
347 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
348};
349#endif
350
352{
356 struct list waiting;
359};
360
361#ifndef __REACTOS__
362/* global I/O completion queue object */
364#endif
365
366static struct
367{
373}
374ioqueue =
375{
376#ifdef __REACTOS__
377 .cs = {0},
378#else
379 .cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
380#endif
382
383#ifndef __REACTOS__
385{
386 0, 0, &ioqueue.cs,
388 0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
389};
390#endif
391
392static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
393{
394 return (struct threadpool *)pool;
395}
396
398{
399 struct threadpool_object *object = (struct threadpool_object *)work;
401 return object;
402}
403
405{
406 struct threadpool_object *object = (struct threadpool_object *)timer;
408 return object;
409}
410
412{
413 struct threadpool_object *object = (struct threadpool_object *)wait;
415 return object;
416}
417
418static inline struct threadpool_object *impl_from_TP_IO( TP_IO *io )
419{
420 struct threadpool_object *object = (struct threadpool_object *)io;
421 assert( object->type == TP_OBJECT_TYPE_IO );
422 return object;
423}
424
426{
427 return (struct threadpool_group *)group;
428}
429
431{
432 return (struct threadpool_instance *)instance;
433}
434
435#ifdef __REACTOS__
437#else
438static void CALLBACK threadpool_worker_proc( void *param );
439#endif
440static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
441static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread );
442static void tp_object_prepare_shutdown( struct threadpool_object *object );
443static BOOL tp_object_release( struct threadpool_object *object );
445
446static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
447{
448 unsigned int new_capacity, max_capacity;
449 void *new_elements;
450
451 if (count <= *capacity)
452 return TRUE;
453
454 max_capacity = ~(SIZE_T)0 / size;
455 if (count > max_capacity)
456 return FALSE;
457
458 new_capacity = max(4, *capacity);
459 while (new_capacity < count && new_capacity <= max_capacity / 2)
460 new_capacity *= 2;
461 if (new_capacity < count)
462 new_capacity = max_capacity;
463
464 if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size )))
465 return FALSE;
466
467 *elements = new_elements;
468 *capacity = new_capacity;
469
470 return TRUE;
471}
472
473static void set_thread_name(const WCHAR *name)
474{
475#ifndef __REACTOS__ // This is impossible on non vista+
476 THREAD_NAME_INFORMATION info;
477
478 RtlInitUnicodeString(&info.ThreadName, name);
479 NtSetInformationThread(GetCurrentThread(), ThreadNameInformation, &info, sizeof(info));
480#endif
481}
482
483#ifndef __REACTOS__
485{
486 struct rtl_work_item *item = userdata;
487
488 TRACE("executing %p(%p)\n", item->function, item->context);
489 item->function( item->context );
490
492}
493
494/***********************************************************************
495 * RtlQueueWorkItem (NTDLL.@)
496 *
497 * Queues a work item into a thread in the thread pool.
498 *
499 * PARAMS
500 * function [I] Work function to execute.
501 * context [I] Context to pass to the work function when it is executed.
502 * flags [I] Flags. See notes.
503 *
504 * RETURNS
505 * Success: STATUS_SUCCESS.
506 * Failure: Any NTSTATUS code.
507 *
508 * NOTES
509 * Flags can be one or more of the following:
510 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
511 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
512 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
513 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
514 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
515 */
517{
518 TP_CALLBACK_ENVIRON environment;
519 struct rtl_work_item *item;
521
522 TRACE( "%p %p %lu\n", function, context, flags );
523
524 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
525 if (!item)
526 return STATUS_NO_MEMORY;
527
528 memset( &environment, 0, sizeof(environment) );
529 environment.Version = 1;
530 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
531 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
532
533 item->function = function;
534 item->context = context;
535
538 return status;
539}
540
541/***********************************************************************
542 * iocp_poller - get completion events and run callbacks
543 */
545{
546 HANDLE cport = Arg;
547
548 while( TRUE )
549 {
553#ifdef __REACTOS__
555#else
557#endif
558 if (res)
559 {
560 ERR("NtRemoveIoCompletion failed: 0x%lx\n", res);
561 }
562 else
563 {
564 DWORD transferred = 0;
565 DWORD err = 0;
566
568 transferred = iosb.Information;
569 else
571
572 callback( err, transferred, overlapped );
573 }
574 }
575 return 0;
576}
577
578/***********************************************************************
579 * RtlSetIoCompletionCallback (NTDLL.@)
580 *
581 * Binds a handle to a thread pool's completion port, and possibly
582 * starts a non-I/O thread to monitor this port and call functions back.
583 *
584 * PARAMS
585 * FileHandle [I] Handle to bind to a completion port.
586 * Function [I] Callback function to call on I/O completions.
587 * Flags [I] Not used.
588 *
589 * RETURNS
590 * Success: STATUS_SUCCESS.
591 * Failure: Any NTSTATUS code.
592 *
593 */
595{
598
599 if (Flags) FIXME("Unknown value Flags=0x%lx\n", Flags);
600
601 if (!old_threadpool.compl_port)
602 {
604
605 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
606 if (!old_threadpool.compl_port)
607 {
608 HANDLE cport;
609
611 if (!res)
612 {
613 /* FIXME native can start additional threads in case of e.g. hung callback function. */
615 if (!res)
616 old_threadpool.compl_port = cport;
617 else
618 NtClose( cport );
619 }
620 }
621 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
622 if (res) return res;
623 }
624
625 info.CompletionPort = old_threadpool.compl_port;
626 info.CompletionKey = (ULONG_PTR)Function;
627
629}
630#endif
631
633{
634 if (timeout == INFINITE) return NULL;
635 pTime->QuadPart = (ULONGLONG)timeout * -10000;
636 return pTime;
637}
638
639/************************** Timer Queue Impl **************************/
640
641static void queue_remove_timer(struct queue_timer *t)
642{
643 /* We MUST hold the queue cs while calling this function. This ensures
644 that we cannot queue another callback for this timer. The runcount
645 being zero makes sure we don't have any already queued. */
646 struct timer_queue *q = t->q;
647
648 assert(t->runcount == 0);
649 assert(t->destroy);
650
651 list_remove(&t->entry);
652 if (t->event)
653 NtSetEvent(t->event, NULL);
655
656 if (q->quit && list_empty(&q->timers))
657 NtSetEvent(q->event, NULL);
658}
659
661{
662 struct timer_queue *q = t->q;
664
665 assert(0 < t->runcount);
666 --t->runcount;
667
668 if (t->destroy && t->runcount == 0)
670
672}
673
675{
676 struct queue_timer *t = p;
677 t->callback(t->param, TRUE);
679 return 0;
680}
681
682static inline ULONGLONG queue_current_time(void)
683{
684 LARGE_INTEGER now, freq;
686 return now.QuadPart * 1000 / freq.QuadPart;
687}
688
690 BOOL set_event)
691{
692 /* We MUST hold the queue cs while calling this function. */
693 struct timer_queue *q = t->q;
694 struct list *ptr = &q->timers;
695
696 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
697
698 if (time != EXPIRE_NEVER)
699 LIST_FOR_EACH(ptr, &q->timers)
700 {
701 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
702 if (time < cur->expire)
703 break;
704 }
705 list_add_before(ptr, &t->entry);
706
707 t->expire = time;
708
709 /* If we insert at the head of the list, we need to expire sooner
710 than expected. */
711 if (set_event && &t->entry == list_head(&q->timers))
712 NtSetEvent(q->event, NULL);
713}
714
715static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
716 BOOL set_event)
717{
718 /* We MUST hold the queue cs while calling this function. */
719 list_remove(&t->entry);
720 queue_add_timer(t, time, set_event);
721}
722
723static void queue_timer_expire(struct timer_queue *q)
724{
725 struct queue_timer *t = NULL;
726
728 if (list_head(&q->timers))
729 {
731 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
732 if (!t->destroy && t->expire <= ((now = queue_current_time())))
733 {
734 ++t->runcount;
735 if (t->period)
736 {
737 next = t->expire + t->period;
738 /* avoid trigger cascade if overloaded / hibernated */
739 if (next < now)
740 next = now + t->period;
741 }
742 else
745 }
746 else
747 t = NULL;
748 }
750
751 if (t)
752 {
753 if (t->flags & WT_EXECUTEINTIMERTHREAD)
755 else
756 {
758 = (t->flags
762 if (status != STATUS_SUCCESS)
764 }
765 }
766}
767
769{
770 struct queue_timer *t;
772
774 if (list_head(&q->timers))
775 {
776 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
777 assert(!t->destroy || t->expire == EXPIRE_NEVER);
778
779 if (t->expire != EXPIRE_NEVER)
780 {
782 timeout = t->expire < time ? 0 : t->expire - time;
783 }
784 }
786
787 return timeout;
788}
789
790#ifdef __REACTOS__
792#else
794#endif
795{
796 struct timer_queue *q = p;
797 ULONG timeout_ms;
798
799 set_thread_name(L"wine_threadpool_timer_queue");
800 timeout_ms = INFINITE;
801 for (;;)
802 {
805 BOOL done = FALSE;
806
808 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
809
810 if (status == STATUS_WAIT_0)
811 {
812 /* There are two possible ways to trigger the event. Either
813 we are quitting and the last timer got removed, or a new
814 timer got put at the head of the list so we need to adjust
815 our timeout. */
817 if (q->quit && list_empty(&q->timers))
818 done = TRUE;
820 }
821 else if (status == STATUS_TIMEOUT)
823
824 if (done)
825 break;
826
827 timeout_ms = queue_get_timeout(q);
828 }
829
830 NtClose(q->event);
832 q->magic = 0;
835#ifdef __REACTOS__
836 return STATUS_SUCCESS;
837#endif
838}
839
841{
842 /* We MUST hold the queue cs while calling this function. */
843 t->destroy = TRUE;
844 if (t->runcount == 0)
845 /* Ensure a timer is promptly removed. If callbacks are pending,
846 it will be removed after the last one finishes by the callback
847 cleanup wrapper. */
849 else
850 /* Make sure no destroyed timer masks an active timer at the head
851 of the sorted list. */
853}
854
855/***********************************************************************
856 * RtlCreateTimerQueue (NTDLL.@)
857 *
858 * Creates a timer queue object and returns a handle to it.
859 *
860 * PARAMS
861 * NewTimerQueue [O] The newly created queue.
862 *
863 * RETURNS
864 * Success: STATUS_SUCCESS.
865 * Failure: Any NTSTATUS code.
866 */
868{
870 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
871 if (!q)
872 return STATUS_NO_MEMORY;
873
875 list_init(&q->timers);
876 q->quit = FALSE;
877 q->magic = TIMER_QUEUE_MAGIC;
879 if (status != STATUS_SUCCESS)
880 {
882 return status;
883 }
885 timer_queue_thread_proc, q, &q->thread, NULL);
886 if (status != STATUS_SUCCESS)
887 {
888 NtClose(q->event);
890 return status;
891 }
892
893 *NewTimerQueue = q;
894 return STATUS_SUCCESS;
895}
896
897/***********************************************************************
898 * RtlDeleteTimerQueueEx (NTDLL.@)
899 *
900 * Deletes a timer queue object.
901 *
902 * PARAMS
903 * TimerQueue [I] The timer queue to destroy.
904 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
905 * wait until all timers are finished firing before
906 * returning. Otherwise, return immediately and set the
907 * event when all timers are done.
908 *
909 * RETURNS
910 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
911 * Failure: Any NTSTATUS code.
912 */
914{
915 struct timer_queue *q = TimerQueue;
916 struct queue_timer *t, *temp;
919
920 if (!q || q->magic != TIMER_QUEUE_MAGIC)
922
923 thread = q->thread;
924
926 q->quit = TRUE;
927 if (list_head(&q->timers))
928 /* When the last timer is removed, it will signal the timer thread to
929 exit... */
930 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
932 else
933 /* However if we have none, we must do it ourselves. */
934 NtSetEvent(q->event, NULL);
936
937 if (CompletionEvent == INVALID_HANDLE_VALUE)
938 {
941 }
942 else
943 {
944 if (CompletionEvent)
945 {
946 FIXME("asynchronous return on completion event unimplemented\n");
948 NtSetEvent(CompletionEvent, NULL);
949 }
951 }
952
954 return status;
955}
956
957static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
958{
959 static struct timer_queue *default_timer_queue;
960
961 if (TimerQueue)
962 return TimerQueue;
963 else
964 {
965 if (!default_timer_queue)
966 {
967 HANDLE q;
969 if (status == STATUS_SUCCESS)
970 {
971 PVOID p = InterlockedCompareExchangePointer( (void **) &default_timer_queue, q, NULL );
972 if (p)
973 /* Got beat to the punch. */
975 }
976 }
977 return default_timer_queue;
978 }
979}
980
981/***********************************************************************
982 * RtlCreateTimer (NTDLL.@)
983 *
984 * Creates a new timer associated with the given queue.
985 *
986 * PARAMS
987 * TimerQueue [I] The queue to hold the timer.
988 * NewTimer [O] The newly created timer.
989 * Callback [I] The callback to fire.
990 * Parameter [I] The argument for the callback.
991 * DueTime [I] The delay, in milliseconds, before first firing the
992 * timer.
993 * Period [I] The period, in milliseconds, at which to fire the timer
994 * after the first callback. If zero, the timer will only
995 * fire once. It still needs to be deleted with
996 * RtlDeleteTimer.
997 * Flags [I] Flags controlling the execution of the callback. In
998 * addition to the WT_* thread pool flags (see
999 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
1000 * WT_EXECUTEONLYONCE are supported.
1001 *
1002 * RETURNS
1003 * Success: STATUS_SUCCESS.
1004 * Failure: Any NTSTATUS code.
1005 */
1009 ULONG Flags)
1010{
1012 struct queue_timer *t;
1013 struct timer_queue *q = get_timer_queue(TimerQueue);
1014
1015 if (!q) return STATUS_NO_MEMORY;
1016 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
1017
1018 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
1019 if (!t)
1020 return STATUS_NO_MEMORY;
1021
1022 t->q = q;
1023 t->runcount = 0;
1024 t->callback = Callback;
1025 t->param = Parameter;
1026 t->period = Period;
1027 t->flags = Flags;
1028 t->destroy = FALSE;
1029 t->event = NULL;
1030
1033 if (q->quit)
1035 else
1038
1039 if (status == STATUS_SUCCESS)
1040 *NewTimer = t;
1041 else
1043
1044 return status;
1045}
1046
1047/***********************************************************************
1048 * RtlUpdateTimer (NTDLL.@)
1049 *
1050 * Changes the time at which a timer expires.
1051 *
1052 * PARAMS
1053 * TimerQueue [I] The queue that holds the timer.
1054 * Timer [I] The timer to update.
1055 * DueTime [I] The delay, in milliseconds, before next firing the timer.
1056 * Period [I] The period, in milliseconds, at which to fire the timer
1057 * after the first callback. If zero, the timer will fire
1058 * only once. It still needs to be deleted with
1059 * RtlDeleteTimer.
1060 *
1061 * RETURNS
1062 * Success: STATUS_SUCCESS.
1063 * Failure: Any NTSTATUS code.
1064 */
1067{
1068 struct queue_timer *t = Timer;
1069 struct timer_queue *q = t->q;
1070
1072 /* Can't change a timer if it was once-only or destroyed. */
1073 if (t->expire != EXPIRE_NEVER)
1074 {
1075 t->period = Period;
1077 }
1079
1080 return STATUS_SUCCESS;
1081}
1082
1083/***********************************************************************
1084 * RtlDeleteTimer (NTDLL.@)
1085 *
1086 * Cancels a timer-queue timer.
1087 *
1088 * PARAMS
1089 * TimerQueue [I] The queue that holds the timer.
1090 * Timer [I] The timer to delete.
1091 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1092 * wait until the timer is finished firing all pending
1093 * callbacks before returning. Otherwise, return
1094 * immediately and set the event when the timer is done.
1095 *
1096 * RETURNS
1097 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1098 * or if the completion event is NULL.
1099 * Failure: Any NTSTATUS code.
1100 */
1102 HANDLE CompletionEvent)
1103{
1104 struct queue_timer *t = Timer;
1105 struct timer_queue *q;
1107 HANDLE event = NULL;
1108
1109 if (!Timer)
1111 q = t->q;
1112 if (CompletionEvent == INVALID_HANDLE_VALUE)
1113 {
1115 if (status == STATUS_SUCCESS)
1117 }
1118 else if (CompletionEvent)
1119 event = CompletionEvent;
1120
1122 t->event = event;
1123 if (t->runcount == 0 && event)
1127
1128 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1129 {
1130 if (status == STATUS_PENDING)
1131 {
1134 }
1135 NtClose(event);
1136 }
1137
1138 return status;
1139}
1140
1141/***********************************************************************
1142 * timerqueue_thread_proc (internal)
1143 */
1144#ifdef __REACTOS__
1146#else
1148#endif
1149{
1150 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1151 struct threadpool_object *other_timer;
1153 struct list *ptr;
1154
1155 TRACE( "starting timer queue thread\n" );
1156 set_thread_name(L"wine_threadpool_timerqueue");
1157
1159 for (;;)
1160 {
1162
1163 /* Check for expired timers. */
1164 while ((ptr = list_head( &timerqueue.pending_timers )))
1165 {
1166 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1167 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1168 assert( timer->u.timer.timer_pending );
1169 if (timer->u.timer.timeout > now.QuadPart)
1170 break;
1171
1172 /* Queue a new callback in one of the worker threads. */
1173 list_remove( &timer->u.timer.timer_entry );
1174 timer->u.timer.timer_pending = FALSE;
1176
1177 /* Insert the timer back into the queue, unless it is marked for shutdown. */
1178 if (timer->u.timer.period && !timer->shutdown)
1179 {
1180 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1181 if (timer->u.timer.timeout <= now.QuadPart)
1182 timer->u.timer.timeout = now.QuadPart + 1;
1183
1184 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1185 struct threadpool_object, u.timer.timer_entry )
1186 {
1187 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1188 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1189 break;
1190 }
1191 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1192 timer->u.timer.timer_pending = TRUE;
1193 }
1194 }
1195
1196 timeout_lower = timeout_upper = MAXLONGLONG;
1197
1198 /* Determine next timeout and use the window length to optimize wakeup times. */
1199 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1200 struct threadpool_object, u.timer.timer_entry )
1201 {
1202 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1203 if (other_timer->u.timer.timeout >= timeout_upper)
1204 break;
1205
1206 timeout_lower = other_timer->u.timer.timeout;
1207 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1208 if (new_timeout < timeout_upper)
1209 timeout_upper = new_timeout;
1210 }
1211
1212 /* Wait for timer update events or until the next timer expires. */
1213 if (timerqueue.objcount)
1214 {
1215 timeout.QuadPart = timeout_lower;
1217 continue;
1218 }
1219
1220 /* All timers have been destroyed, if no new timers are created
1221 * within some amount of time, then we can shutdown this thread. */
1222 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1223 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1224 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1225 {
1226 break;
1227 }
1228 }
1229
1230 timerqueue.thread_running = FALSE;
1232
1233 TRACE( "terminating timer queue thread\n" );
1234 RtlExitUserThread( 0 );
1235#ifdef __REACTOS__
1236 return STATUS_SUCCESS;
1237#endif
1238}
1239
1240/***********************************************************************
1241 * tp_new_worker_thread (internal)
1242 *
1243 * Create and account a new worker thread for the desired pool.
1244 */
1246{
1247 HANDLE thread;
1249
1251 pool->stack_info.StackReserve, pool->stack_info.StackCommit,
1253 if (status == STATUS_SUCCESS)
1254 {
1255 InterlockedIncrement( &pool->refcount );
1256 pool->num_workers++;
1257 NtClose( thread );
1258 }
1259 return status;
1260}
1261
1262/***********************************************************************
1263 * tp_timerqueue_lock (internal)
1264 *
1265 * Acquires a lock on the global timerqueue. When the lock is acquired
1266 * successfully, it is guaranteed that the timer thread is running.
1267 */
1269{
1271 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1272
1273 timer->u.timer.timer_initialized = FALSE;
1274 timer->u.timer.timer_pending = FALSE;
1275 timer->u.timer.timer_set = FALSE;
1276 timer->u.timer.timeout = 0;
1277 timer->u.timer.period = 0;
1278 timer->u.timer.window_length = 0;
1279
1281
1282 /* Make sure that the timerqueue thread is running. */
1283 if (!timerqueue.thread_running)
1284 {
1285 HANDLE thread;
1288 if (status == STATUS_SUCCESS)
1289 {
1290 timerqueue.thread_running = TRUE;
1291 NtClose( thread );
1292 }
1293 }
1294
1295 if (status == STATUS_SUCCESS)
1296 {
1297 timer->u.timer.timer_initialized = TRUE;
1298 timerqueue.objcount++;
1299 }
1300
1302 return status;
1303}
1304
1305/***********************************************************************
1306 * tp_timerqueue_unlock (internal)
1307 *
1308 * Releases a lock on the global timerqueue.
1309 */
1311{
1312 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1313
1315 if (timer->u.timer.timer_initialized)
1316 {
1317 /* If timer was pending, remove it. */
1318 if (timer->u.timer.timer_pending)
1319 {
1320 list_remove( &timer->u.timer.timer_entry );
1321 timer->u.timer.timer_pending = FALSE;
1322 }
1323
1324 /* If the last timer object was destroyed, then wake up the thread. */
1325 if (!--timerqueue.objcount)
1326 {
1327 assert( list_empty( &timerqueue.pending_timers ) );
1328 RtlWakeAllConditionVariable( &timerqueue.update_event );
1329 }
1330
1331 timer->u.timer.timer_initialized = FALSE;
1332 }
1334}
1335
1336/***********************************************************************
1337 * waitqueue_thread_proc (internal)
1338 */
1339#ifdef __REACTOS__
1341#else
1343#endif
1344{
1347 struct waitqueue_bucket *bucket = param;
1348 struct threadpool_object *wait, *next;
1350 DWORD num_handles;
1352
1353 TRACE( "starting wait queue thread\n" );
1354 set_thread_name(L"wine_threadpool_waitqueue");
1355
1357
1358 for (;;)
1359 {
1361 timeout.QuadPart = MAXLONGLONG;
1362 num_handles = 0;
1363
1365 u.wait.wait_entry )
1366 {
1367 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1368 if (wait->u.wait.timeout <= now.QuadPart)
1369 {
1370 /* Wait object timed out. */
1371 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1372 {
1373 list_remove( &wait->u.wait.wait_entry );
1374 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1375 }
1376 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1377 {
1378 InterlockedIncrement( &wait->refcount );
1379 wait->num_pending_callbacks++;
1380 RtlEnterCriticalSection( &wait->pool->cs );
1382 RtlLeaveCriticalSection( &wait->pool->cs );
1384 }
1385 else tp_object_submit( wait, FALSE );
1386 }
1387 else
1388 {
1389 if (wait->u.wait.timeout < timeout.QuadPart)
1390 timeout.QuadPart = wait->u.wait.timeout;
1391
1392 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1393 InterlockedIncrement( &wait->refcount );
1394 objects[num_handles] = wait;
1395 handles[num_handles] = wait->u.wait.handle;
1396 num_handles++;
1397 }
1398 }
1399
1400 if (!bucket->objcount)
1401 {
1402 /* All wait objects have been destroyed, if no new wait objects are created
1403 * within some amount of time, then we can shutdown this thread. */
1404 assert( num_handles == 0 );
1406 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1407 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout );
1409
1410 if (status == STATUS_TIMEOUT && !bucket->objcount)
1411 break;
1412 }
1413 else
1414 {
1415 handles[num_handles] = bucket->update_event;
1417 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout );
1419
1420 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1421 {
1423 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1424 if (wait->u.wait.bucket)
1425 {
1426 /* Wait object signaled. */
1427 assert( wait->u.wait.bucket == bucket );
1428 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1429 {
1430 list_remove( &wait->u.wait.wait_entry );
1431 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1432 }
1433 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1434 {
1435 wait->u.wait.signaled++;
1436 wait->num_pending_callbacks++;
1437 RtlEnterCriticalSection( &wait->pool->cs );
1439 RtlLeaveCriticalSection( &wait->pool->cs );
1440 }
1441 else tp_object_submit( wait, TRUE );
1442 }
1443 else
1444 WARN("wait object %p triggered while object was destroyed\n", wait);
1445 }
1446
1447 /* Release temporary references to wait objects. */
1448 while (num_handles)
1449 {
1450 wait = objects[--num_handles];
1451 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1453 }
1454 }
1455
1456 /* Try to merge bucket with other threads. */
1457 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1458 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1459 {
1460 struct waitqueue_bucket *other_bucket;
1461 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1462 {
1463 if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable &&
1464 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1465 {
1466 other_bucket->objcount += bucket->objcount;
1467 bucket->objcount = 0;
1468
1469 /* Update reserved list. */
1470 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1471 {
1472 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1473 wait->u.wait.bucket = other_bucket;
1474 }
1475 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1476
1477 /* Update waiting list. */
1478 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1479 {
1480 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1481 wait->u.wait.bucket = other_bucket;
1482 }
1483 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1484
1485 /* Move bucket to the end, to keep the probability of
1486 * newly added wait objects as small as possible. */
1487 list_remove( &bucket->bucket_entry );
1488 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1489
1490 NtSetEvent( other_bucket->update_event, NULL );
1491 break;
1492 }
1493 }
1494 }
1495 }
1496
1497 /* Remove this bucket from the list. */
1498 list_remove( &bucket->bucket_entry );
1499 if (!--waitqueue.num_buckets)
1500 assert( list_empty( &waitqueue.buckets ) );
1501
1503
1504 TRACE( "terminating wait queue thread\n" );
1505
1506 assert( bucket->objcount == 0 );
1507 assert( list_empty( &bucket->reserved ) );
1508 assert( list_empty( &bucket->waiting ) );
1509 NtClose( bucket->update_event );
1510
1511 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1512 RtlExitUserThread( 0 );
1513}
1514
1515/***********************************************************************
1516 * tp_waitqueue_lock (internal)
1517 */
1519{
1520 struct waitqueue_bucket *bucket;
1522 HANDLE thread;
1523 BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0;
1524 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1525
1526 wait->u.wait.signaled = 0;
1527 wait->u.wait.bucket = NULL;
1528 wait->u.wait.wait_pending = FALSE;
1529 wait->u.wait.timeout = 0;
1530 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1531
1533
1534 /* Try to assign to existing bucket if possible. */
1535 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1536 {
1537 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable)
1538 {
1539 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1540 wait->u.wait.bucket = bucket;
1541 bucket->objcount++;
1542
1544 goto out;
1545 }
1546 }
1547
1548 /* Create a new bucket and corresponding worker thread. */
1549 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1550 if (!bucket)
1551 {
1553 goto out;
1554 }
1555
1556 bucket->objcount = 0;
1557 bucket->alertable = alertable;
1558 list_init( &bucket->reserved );
1559 list_init( &bucket->waiting );
1560
1563 if (status)
1564 {
1565 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1566 goto out;
1567 }
1568
1571 if (status == STATUS_SUCCESS)
1572 {
1573 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1574 waitqueue.num_buckets++;
1575
1576 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1577 wait->u.wait.bucket = bucket;
1578 bucket->objcount++;
1579
1580 NtClose( thread );
1581 }
1582 else
1583 {
1584 NtClose( bucket->update_event );
1585 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1586 }
1587
1588out:
1590 return status;
1591}
1592
1593/***********************************************************************
1594 * tp_waitqueue_unlock (internal)
1595 */
1596static void tp_waitqueue_unlock( struct threadpool_object *wait )
1597{
1598 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1599
1601 if (wait->u.wait.bucket)
1602 {
1603 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1604 assert( bucket->objcount > 0 );
1605
1606 list_remove( &wait->u.wait.wait_entry );
1607 wait->u.wait.bucket = NULL;
1608 bucket->objcount--;
1609
1610 NtSetEvent( bucket->update_event, NULL );
1611 }
1613}
1614
1615#ifdef __REACTOS__
1617#else
1619#endif
1620{
1621 struct io_completion *completion;
1622 struct threadpool_object *io;
1624#ifdef __REACTOS__
1625 PVOID key, value;
1626#else
1628#endif
1629 BOOL destroy, skip;
1631
1632 TRACE( "starting I/O completion thread\n" );
1633 set_thread_name(L"wine_threadpool_ioqueue");
1634
1636
1637 for (;;)
1638 {
1640 if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
1641 ERR("NtRemoveIoCompletion failed, status %#lx.\n", status);
1643
1644 destroy = skip = FALSE;
1645 io = (struct threadpool_object *)key;
1646
1647 TRACE( "io %p, iosb.Status %#lx.\n", io, iosb.Status );
1648
1649 if (io && (io->shutdown || io->u.io.shutting_down))
1650 {
1651 RtlEnterCriticalSection( &io->pool->cs );
1652 if (!io->u.io.pending_count)
1653 {
1654 if (io->u.io.skipped_count)
1655 --io->u.io.skipped_count;
1656
1657 if (io->u.io.skipped_count)
1658 skip = TRUE;
1659 else
1660 destroy = TRUE;
1661 }
1662 RtlLeaveCriticalSection( &io->pool->cs );
1663 if (skip) continue;
1664 }
1665
1666 if (destroy)
1667 {
1668 --ioqueue.objcount;
1669 TRACE( "Releasing io %p.\n", io );
1670 io->shutdown = TRUE;
1672 }
1673 else if (io)
1674 {
1675 RtlEnterCriticalSection( &io->pool->cs );
1676
1677 TRACE( "pending_count %u.\n", io->u.io.pending_count );
1678
1679 if (io->u.io.pending_count)
1680 {
1681 --io->u.io.pending_count;
1682 if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
1683 io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
1684 {
1685 ERR( "Failed to allocate memory.\n" );
1686 RtlLeaveCriticalSection( &io->pool->cs );
1687 continue;
1688 }
1689
1690 completion = &io->u.io.completions[io->u.io.completion_count++];
1691 completion->iosb = iosb;
1692#ifdef __REACTOS__
1693 completion->cvalue = (ULONG_PTR)value;
1694#else
1695 completion->cvalue = value;
1696#endif
1697
1699 }
1700 RtlLeaveCriticalSection( &io->pool->cs );
1701 }
1702
1703 if (!ioqueue.objcount)
1704 {
1705 /* All I/O objects have been destroyed; if no new objects are
1706 * created within some amount of time, then we can shutdown this
1707 * thread. */
1708 LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
1709 if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
1710 &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
1711 break;
1712 }
1713 }
1714
1715 ioqueue.thread_running = FALSE;
1717
1718 TRACE( "terminating I/O completion thread\n" );
1719
1720 RtlExitUserThread( 0 );
1721
1722#ifdef __REACTOS__
1723 return STATUS_SUCCESS;
1724#endif
1725}
1726
1728{
1730
1731 assert( io->type == TP_OBJECT_TYPE_IO );
1732
1734
1735 if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
1737 {
1739 return status;
1740 }
1741
1742 if (!ioqueue.thread_running)
1743 {
1744 HANDLE thread;
1745
1747 0, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL )))
1748 {
1749 ioqueue.thread_running = TRUE;
1750 NtClose( thread );
1751 }
1752 }
1753
1754 if (status == STATUS_SUCCESS)
1755 {
1758
1759#ifdef __REACTOS__
1760 info.Port = ioqueue.port;
1761 info.Key = io;
1762#else
1763 info.CompletionPort = ioqueue.port;
1764 info.CompletionKey = (ULONG_PTR)io;
1765#endif
1766
1768 }
1769
1770 if (status == STATUS_SUCCESS)
1771 {
1772 if (!ioqueue.objcount++)
1773 RtlWakeConditionVariable( &ioqueue.update_event );
1774 }
1775
1777 return status;
1778}
1779
1780/***********************************************************************
1781 * tp_threadpool_alloc (internal)
1782 *
1783 * Allocates a new threadpool object.
1784 */
1786{
1787#ifdef __REACTOS__
1788 IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->ProcessEnvironmentBlock->ImageBaseAddress );
1789#else
1791#endif
1792 struct threadpool *pool;
1793 unsigned int i;
1794
1795 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1796 if (!pool)
1797 return STATUS_NO_MEMORY;
1798
1799 pool->refcount = 1;
1800 pool->objcount = 0;
1801 pool->shutdown = FALSE;
1802
1803#ifdef __REACTOS__
1805#else
1807
1808 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1809#endif
1810
1811 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1812 list_init( &pool->pools[i] );
1813 RtlInitializeConditionVariable( &pool->update_event );
1814
1815 pool->max_workers = 500;
1816 pool->min_workers = 0;
1817 pool->num_workers = 0;
1818 pool->num_busy_workers = 0;
1819 pool->stack_info.StackReserve = nt->OptionalHeader.SizeOfStackReserve;
1820 pool->stack_info.StackCommit = nt->OptionalHeader.SizeOfStackCommit;
1821
1822 TRACE( "allocated threadpool %p\n", pool );
1823
1824 *out = pool;
1825 return STATUS_SUCCESS;
1826}
1827
1828/***********************************************************************
1829 * tp_threadpool_shutdown (internal)
1830 *
1831 * Prepares the shutdown of a threadpool object and notifies all worker
1832 * threads to terminate (after all remaining work items have been
1833 * processed).
1834 */
1836{
1838
1839 pool->shutdown = TRUE;
1840 RtlWakeAllConditionVariable( &pool->update_event );
1841}
1842
1843/***********************************************************************
1844 * tp_threadpool_release (internal)
1845 *
1846 * Releases a reference to a threadpool object.
1847 */
1849{
1850 unsigned int i;
1851
1852 if (InterlockedDecrement( &pool->refcount ))
1853 return FALSE;
1854
1855 TRACE( "destroying threadpool %p\n", pool );
1856
1857 assert( pool->shutdown );
1858 assert( !pool->objcount );
1859 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1860 assert( list_empty( &pool->pools[i] ) );
1861#ifndef __REACTOS__
1862 pool->cs.DebugInfo->Spare[0] = 0;
1863#endif
1865
1867 return TRUE;
1868}
1869
1870/***********************************************************************
1871 * tp_threadpool_lock (internal)
1872 *
1873 * Acquires a lock on a threadpool, specified with a TP_CALLBACK_ENVIRON
1874 * block. When the lock is acquired successfully, it is guaranteed that
1875 * there is at least one worker thread to process tasks.
1876 */
1878{
1879 struct threadpool *pool = NULL;
1881
1882 if (environment)
1883 {
1884#ifndef __REACTOS__ //Windows 7 stuff
1885 /* Validate environment parameters. */
1886 if (environment->Version == 3)
1887 {
1888 TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1889
1890 switch (environment3->CallbackPriority)
1891 {
1895 break;
1896 default:
1898 }
1899 }
1900#endif
1901 pool = (struct threadpool *)environment->Pool;
1902 }
1903
1904 if (!pool)
1905 {
1906 if (!default_threadpool)
1907 {
1909 if (status != STATUS_SUCCESS)
1910 return status;
1911
1913 {
1916 }
1917 }
1918
1920 }
1921
1923
1924 /* Make sure that the threadpool has at least one thread. */
1925 if (!pool->num_workers)
1927
1928 /* Keep a reference, and increment objcount to ensure that the
1929 * last thread doesn't terminate. */
1930 if (status == STATUS_SUCCESS)
1931 {
1932 InterlockedIncrement( &pool->refcount );
1933 pool->objcount++;
1934 }
1935
1937
1938 if (status != STATUS_SUCCESS)
1939 return status;
1940
1941 *out = pool;
1942 return STATUS_SUCCESS;
1943}
1944
1945/***********************************************************************
1946 * tp_threadpool_unlock (internal)
1947 *
1948 * Releases a lock on a threadpool.
1949 */
1951{
1953 pool->objcount--;
1956}
1957
1958/***********************************************************************
1959 * tp_group_alloc (internal)
1960 *
1961 * Allocates a new threadpool group object.
1962 */
1964{
1965 struct threadpool_group *group;
1966
1967 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1968 if (!group)
1969 return STATUS_NO_MEMORY;
1970
1971 group->refcount = 1;
1972 group->shutdown = FALSE;
1973
1974#ifdef __REACTOS__
1976#else
1978
1979 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1980#endif
1981
1982 list_init( &group->members );
1983
1984 TRACE( "allocated group %p\n", group );
1985
1986 *out = group;
1987 return STATUS_SUCCESS;
1988}
1989
1990/***********************************************************************
1991 * tp_group_shutdown (internal)
1992 *
1993 * Marks the group object for shutdown.
1994 */
1996{
1997 group->shutdown = TRUE;
1998}
1999
2000/***********************************************************************
2001 * tp_group_release (internal)
2002 *
2003 * Releases a reference to a group object.
2004 */
2006{
2007 if (InterlockedDecrement( &group->refcount ))
2008 return FALSE;
2009
2010 TRACE( "destroying group %p\n", group );
2011
2012 assert( group->shutdown );
2013 assert( list_empty( &group->members ) );
2014
2015#ifndef __REACTOS__
2016 group->cs.DebugInfo->Spare[0] = 0;
2017#endif
2019
2021 return TRUE;
2022}
2023
2024/***********************************************************************
2025 * tp_object_initialize (internal)
2026 *
2027 * Initializes members of a threadpool object.
2028 */
2029static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
2030 PVOID userdata, TP_CALLBACK_ENVIRON *environment )
2031{
2032 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
2033
2034 object->refcount = 1;
2035 object->shutdown = FALSE;
2036
2037 object->pool = pool;
2038 object->group = NULL;
2039 object->userdata = userdata;
2040 object->group_cancel_callback = NULL;
2041 object->finalization_callback = NULL;
2042 object->may_run_long = 0;
2043 object->race_dll = NULL;
2044 object->priority = TP_CALLBACK_PRIORITY_NORMAL;
2045
2046 memset( &object->group_entry, 0, sizeof(object->group_entry) );
2047 object->is_group_member = FALSE;
2048
2049 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
2050 RtlInitializeConditionVariable( &object->finished_event );
2051 RtlInitializeConditionVariable( &object->group_finished_event );
2052 object->completed_event = NULL;
2053 object->num_pending_callbacks = 0;
2054 object->num_running_callbacks = 0;
2055 object->num_associated_callbacks = 0;
2056
2057 if (environment)
2058 {
2059 if (environment->Version != 1 && environment->Version != 3)
2060 FIXME( "unsupported environment version %lu\n", environment->Version );
2061
2062 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
2063 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
2064 object->finalization_callback = environment->FinalizationCallback;
2065 object->may_run_long = environment->u.s.LongFunction != 0;
2066 object->race_dll = environment->RaceDll;
2067#ifndef __REACTOS__ //Windows 7 stuff
2068 if (environment->Version == 3)
2069 {
2070 TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
2071
2072 object->priority = environment_v3->CallbackPriority;
2073 assert( object->priority < ARRAY_SIZE(pool->pools) );
2074 }
2075#endif
2076 if (environment->ActivationContext)
2077 FIXME( "activation context not supported yet\n" );
2078
2079 if (environment->u.s.Persistent)
2080 FIXME( "persistent threads not supported yet\n" );
2081 }
2082
2083 if (object->race_dll)
2084 LdrAddRefDll( 0, object->race_dll );
2085
2086 TRACE( "allocated object %p of type %u\n", object, object->type );
2087
2088 /* For simple callbacks we have to run tp_object_submit before adding this object
2089 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
2090 * will be set, and tp_object_submit would fail with an assertion. */
2091
2092 if (is_simple_callback)
2093 tp_object_submit( object, FALSE );
2094
2095 if (object->group)
2096 {
2097 struct threadpool_group *group = object->group;
2098 InterlockedIncrement( &group->refcount );
2099
2101 list_add_tail( &group->members, &object->group_entry );
2102 object->is_group_member = TRUE;
2104 }
2105
2106 if (is_simple_callback)
2107 tp_object_release( object );
2108}
2109
2110static void tp_object_prio_queue( struct threadpool_object *object )
2111{
2112 ++object->pool->num_busy_workers;
2113 list_add_tail( &object->pool->pools[object->priority], &object->pool_entry );
2114}
2115
2116/***********************************************************************
2117 * tp_object_submit (internal)
2118 *
2119 * Submits a threadpool object to the associated threadpool. This
2120 * function has to be VOID because TpPostWork can never fail on Windows.
 *
 * object    Object to queue; neither it nor its pool may be shut down
 *           (both conditions are asserted).
 * signaled  For wait objects, TRUE makes the call count as one more
 *           signal in u.wait.signaled.
 *
 * Each call takes one additional reference on the object and increments
 * num_pending_callbacks; the object is linked into the pool queue only
 * when the pending count goes from zero to one.
 *
 * NOTE(review): this listing is incomplete. The embedded line numbers
 * skip 2125, 2130, 2135 and 2153, `status` is used without a visible
 * declaration, and the `if` under "Start new worker threads" has no
 * visible statement of its own. Restore the missing lines from the real
 * source file before relying on this text.
2121 */
2122static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
2123{
2124 struct threadpool *pool = object->pool;
2126
2127 assert( !object->shutdown );
2128 assert( !pool->shutdown );
2129
2131
2132 /* Start new worker threads if required. */
 /* NOTE(review): the statement controlled by this condition is not
  * visible here (line 2135 is absent); as printed, the condition would
  * bind to the InterlockedIncrement below. */
2133 if (pool->num_busy_workers >= pool->num_workers &&
2134 pool->num_workers < pool->max_workers)
2136
2137 /* Queue work item and increment refcount. */
2138 InterlockedIncrement( &object->refcount );
 /* Only the first pending callback links the object into the queue. */
2139 if (!object->num_pending_callbacks++)
2140 tp_object_prio_queue( object );
2141
2142 /* Count how often the object was signaled. */
2143 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
2144 object->u.wait.signaled++;
2145
2146 /* No new thread started - wake up one existing thread. */
2147 if (status != STATUS_SUCCESS)
2148 {
2149 assert( pool->num_workers > 0 );
2150 RtlWakeConditionVariable( &pool->update_event );
2151 }
2152
2154}
2155
2156/***********************************************************************
2157 * tp_object_cancel (internal)
2158 *
2159 * Cancels all currently pending callbacks for a specific object.
2160 */
2161static void tp_object_cancel( struct threadpool_object *object )
2162{
2163 struct threadpool *pool = object->pool;
2164 LONG pending_callbacks = 0;
2165
2167 if (object->num_pending_callbacks)
2168 {
2169 pending_callbacks = object->num_pending_callbacks;
2170 object->num_pending_callbacks = 0;
2171 list_remove( &object->pool_entry );
2172
2173 if (object->type == TP_OBJECT_TYPE_WAIT)
2174 object->u.wait.signaled = 0;
2175 }
2176 if (object->type == TP_OBJECT_TYPE_IO)
2177 {
2178 object->u.io.skipped_count += object->u.io.pending_count;
2179 object->u.io.pending_count = 0;
2180 }
2182
2183 while (pending_callbacks--)
2184 tp_object_release( object );
2185}
2186
2188{
2189 if (object->num_pending_callbacks)
2190 return FALSE;
2191 if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
2192 return FALSE;
2193
2194 if (group)
2195 return !object->num_running_callbacks;
2196 else
2197 return !object->num_associated_callbacks;
2198}
2199
2200/***********************************************************************
2201 * tp_object_wait (internal)
2202 *
2203 * Waits until all pending and running callbacks of a specific object
2204 * have been processed.
2205 */
2206static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
2207{
2208 struct threadpool *pool = object->pool;
2209
2211 while (!object_is_finished( object, group_wait ))
2212 {
2213 if (group_wait)
2214 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
2215 else
2216 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2217 }
2219}
2220
2222{
2223 assert( io->type == TP_OBJECT_TYPE_IO );
2224
2226
2227 assert(ioqueue.objcount);
2228
2229 if (!io->shutdown && !--ioqueue.objcount)
2230 NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 );
2231
2233}
2234
2235/***********************************************************************
2236 * tp_object_prepare_shutdown (internal)
2237 *
2238 * Prepares a threadpool object for shutdown.
2239 */
2241{
2242 if (object->type == TP_OBJECT_TYPE_TIMER)
2243 tp_timerqueue_unlock( object );
2244 else if (object->type == TP_OBJECT_TYPE_WAIT)
2245 tp_waitqueue_unlock( object );
2246 else if (object->type == TP_OBJECT_TYPE_IO)
2247 tp_ioqueue_unlock( object );
2248}
2249
2250/***********************************************************************
2251 * tp_object_release (internal)
2252 *
2253 * Releases a reference to a threadpool object.
2254 */
2256{
2257 if (InterlockedDecrement( &object->refcount ))
2258 return FALSE;
2259
2260 TRACE( "destroying object %p of type %u\n", object, object->type );
2261
2262 assert( object->shutdown );
2263 assert( !object->num_pending_callbacks );
2264 assert( !object->num_running_callbacks );
2265 assert( !object->num_associated_callbacks );
2266
2267 /* release reference to the group */
2268 if (object->group)
2269 {
2270 struct threadpool_group *group = object->group;
2271
2273 if (object->is_group_member)
2274 {
2275 list_remove( &object->group_entry );
2276 object->is_group_member = FALSE;
2277 }
2279
2281 }
2282
2284
2285 if (object->race_dll)
2286 LdrUnloadDll( object->race_dll );
2287
2288 if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE)
2289 NtSetEvent( object->completed_event, NULL );
2290
2291 RtlFreeHeap( GetProcessHeap(), 0, object );
2292 return TRUE;
2293}
2294
2295static struct list *threadpool_get_next_item( const struct threadpool *pool )
2296{
2297 struct list *ptr;
2298 unsigned int i;
2299
2300 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
2301 {
2302 if ((ptr = list_head( &pool->pools[i] )))
2303 break;
2304 }
2305
2306 return ptr;
2307}
2308
2309/***********************************************************************
2310 * tp_object_execute (internal)
2311 *
2312 * Executes a threadpool object callback, object->pool->cs has to be
2313 * held.
 *
 * object       Object whose callback is run; one pending callback is
 *              consumed.
 * wait_thread  When TRUE, waitqueue.cs is additionally released before
 *              the callback and re-entered afterwards.
 *
 * After the type-specific callback, the optional finalization callback
 * runs, followed by the cleanup actions recorded in the callback
 * instance (critical section, mutex, semaphore, event, library).
 *
 * NOTE(review): this listing is incomplete. The embedded line numbers
 * skip 2318, 2319, 2322, 2341, 2359, 2368, 2377, 2386, 2447 and 2452.
 * `instance`, `completion` and `status` are used without a visible
 * declaration, and four of the five switch arms have no visible `case`
 * label. Restore the missing lines from the real source file before
 * relying on this text.
2314 */
2315static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread )
2316{
2317 TP_CALLBACK_INSTANCE *callback_instance;
2320 struct threadpool *pool = object->pool;
2321 TP_WAIT_RESULT wait_result = 0;
2323
2324 object->num_pending_callbacks--;
2325
2326 /* For wait objects check if they were signaled or have timed out. */
2327 if (object->type == TP_OBJECT_TYPE_WAIT)
2328 {
2329 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2330 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2331 }
2332 else if (object->type == TP_OBJECT_TYPE_IO)
2333 {
 /* Take the most recently stored completion record. */
2334 assert( object->u.io.completion_count );
2335 completion = object->u.io.completions[--object->u.io.completion_count];
2336 }
2337
2338 /* Leave critical section and do the actual callback. */
 /* NOTE(review): only the waitqueue.cs release is visible below; the
  * release of the pool lock announced by the comment above is presumably
  * on the absent line 2341 - confirm. */
2339 object->num_associated_callbacks++;
2340 object->num_running_callbacks++;
2342 if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
2343
2344 /* Initialize threadpool instance struct. */
2345 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2346 instance.object = object;
2347 instance.threadid = GetCurrentThreadId();
2348 instance.associated = TRUE;
2349 instance.may_run_long = object->may_run_long;
2350 instance.cleanup.critical_section = NULL;
2351 instance.cleanup.mutex = NULL;
2352 instance.cleanup.semaphore = NULL;
2353 instance.cleanup.semaphore_count = 0;
2354 instance.cleanup.event = NULL;
2355 instance.cleanup.library = NULL;
2356
 /* Dispatch on the object type. NOTE(review): the `case` labels of the
  * simple, work, timer and wait arms are not visible in this listing. */
2357 switch (object->type)
2358 {
2360 {
2361 TRACE( "executing simple callback %p(%p, %p)\n",
2362 object->u.simple.callback, callback_instance, object->userdata );
2363 object->u.simple.callback( callback_instance, object->userdata );
2364 TRACE( "callback %p returned\n", object->u.simple.callback );
2365 break;
2366 }
2367
2369 {
2370 TRACE( "executing work callback %p(%p, %p, %p)\n",
2371 object->u.work.callback, callback_instance, object->userdata, object );
2372 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2373 TRACE( "callback %p returned\n", object->u.work.callback );
2374 break;
2375 }
2376
2378 {
2379 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2380 object->u.timer.callback, callback_instance, object->userdata, object );
2381 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2382 TRACE( "callback %p returned\n", object->u.timer.callback );
2383 break;
2384 }
2385
2387 {
2388 TRACE( "executing wait callback %p(%p, %p, %p, %lu)\n",
2389 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2390 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2391 TRACE( "callback %p returned\n", object->u.wait.callback );
2392 break;
2393 }
2394
2395 case TP_OBJECT_TYPE_IO:
2396 {
2397 TRACE( "executing I/O callback %p(%p, %p, %#Ix, %p, %p)\n",
2398 object->u.io.callback, callback_instance, object->userdata,
2399 completion.cvalue, &completion.iosb, (TP_IO *)object );
2400 object->u.io.callback( callback_instance, object->userdata,
2401 (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
2402 TRACE( "callback %p returned\n", object->u.io.callback );
2403 break;
2404 }
2405
2406 default:
2407 assert(0);
2408 break;
2409 }
2410
2411 /* Execute finalization callback. */
2412 if (object->finalization_callback)
2413 {
2414 TRACE( "executing finalization callback %p(%p, %p)\n",
2415 object->finalization_callback, callback_instance, object->userdata );
2416 object->finalization_callback( callback_instance, object->userdata );
2417 TRACE( "callback %p returned\n", object->finalization_callback );
2418 }
2419
2420 /* Execute cleanup tasks. */
 /* The first failing release of a mutex, semaphore or event aborts the
  * remaining cleanup steps via skip_cleanup. */
2421 if (instance.cleanup.critical_section)
2422 {
2423 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2424 }
2425 if (instance.cleanup.mutex)
2426 {
2427 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2428 if (status != STATUS_SUCCESS) goto skip_cleanup;
2429 }
2430 if (instance.cleanup.semaphore)
2431 {
2432 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2433 if (status != STATUS_SUCCESS) goto skip_cleanup;
2434 }
2435 if (instance.cleanup.event)
2436 {
2437 status = NtSetEvent( instance.cleanup.event, NULL );
2438 if (status != STATUS_SUCCESS) goto skip_cleanup;
2439 }
2440 if (instance.cleanup.library)
2441 {
2442 LdrUnloadDll( instance.cleanup.library );
2443 }
2444
2445skip_cleanup:
 /* NOTE(review): re-acquisition of the pool lock is presumably on the
  * absent line 2447 - confirm. */
2446 if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs );
2448
2449 /* Simple callbacks are automatically shutdown after execution. */
2450 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2451 {
2453 object->shutdown = TRUE;
2454 }
2455
 /* Update the running/associated counters and wake any waiters once
  * object_is_finished() reports completion for the respective mode. */
2456 object->num_running_callbacks--;
2457 if (object_is_finished( object, TRUE ))
2458 RtlWakeAllConditionVariable( &object->group_finished_event );
2459
 /* instance.associated is set to TRUE above; the callback receives a
  * pointer to `instance`, so it may have been changed meanwhile. */
2460 if (instance.associated)
2461 {
2462 object->num_associated_callbacks--;
2463 if (object_is_finished( object, FALSE ))
2464 RtlWakeAllConditionVariable( &object->finished_event );
2465 }
2466}
2467
2468/***********************************************************************
2469 * threadpool_worker_proc (internal)
2470 */
2471#ifdef __REACTOS__
2473#else
2475#endif
2476{
2477 struct threadpool *pool = param;
2479 struct list *ptr;
2480
2481 TRACE( "starting worker thread for pool %p\n", pool );
2482 set_thread_name(L"wine_threadpool_worker");
2483
2485 for (;;)
2486 {
2487 while ((ptr = threadpool_get_next_item( pool )))
2488 {
2489 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2490 assert( object->num_pending_callbacks > 0 );
2491
2492 /* If further pending callbacks are queued, move the work item to
2493 * the end of the pool list. Otherwise remove it from the pool. */
2494 list_remove( &object->pool_entry );
2495 if (object->num_pending_callbacks > 1)
2496 tp_object_prio_queue( object );
2497
2498 tp_object_execute( object, FALSE );
2499
2500 assert(pool->num_busy_workers);
2501 pool->num_busy_workers--;
2502
2503 tp_object_release( object );
2504 }
2505
2506 /* Shutdown worker thread if requested. */
2507 if (pool->shutdown)
2508 break;
2509
2510 /* Wait for new tasks or until the timeout expires. A thread only terminates
2511 * when no new tasks are available, and the number of threads can be
2512 * decreased without violating the min_workers limit. An exception is when
2513 * min_workers == 0, then objcount is used to detect if the last thread
2514 * can be terminated. */
2515 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2516 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2517 !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2518 (!pool->min_workers && !pool->objcount)))
2519 {
2520 break;
2521 }
2522 }
2523 pool->num_workers--;
2525
2526 TRACE( "terminating worker thread for pool %p\n", pool );
2528 RtlExitUserThread( 0 );
2529#ifdef __REACTOS__
2530 return STATUS_SUCCESS;
2531#endif
2532}
2533
2534/***********************************************************************
2535 * TpAllocCleanupGroup (NTDLL.@)
2536 */
2538{
2539 TRACE( "%p\n", out );
2540
2541 return tp_group_alloc( (struct threadpool_group **)out );
2542}
2543
2544/***********************************************************************
2545 * TpAllocIoCompletion (NTDLL.@)
2546 */
2548 void *userdata, TP_CALLBACK_ENVIRON *environment )
2549{
2550 struct threadpool_object *object;
2551 struct threadpool *pool;
2553
2554 TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
2555
2556 if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) )))
2557 return STATUS_NO_MEMORY;
2558
2559 if ((status = tp_threadpool_lock( &pool, environment )))
2560 {
2561 RtlFreeHeap( GetProcessHeap(), 0, object );
2562 return status;
2563 }
2564
2565 object->type = TP_OBJECT_TYPE_IO;
2566 object->u.io.callback = callback;
2567 if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) )))
2568 {
2570 RtlFreeHeap( GetProcessHeap(), 0, object );
2571 return status;
2572 }
2573
2574 if ((status = tp_ioqueue_lock( object, file )))
2575 {
2577 RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
2578 RtlFreeHeap( GetProcessHeap(), 0, object );
2579 return status;
2580 }
2581
2582 tp_object_initialize( object, pool, userdata, environment );
2583
2584 *out = (TP_IO *)object;
2585 return STATUS_SUCCESS;
2586}
2587
2588/***********************************************************************
2589 * TpAllocPool (NTDLL.@)
2590 */
2592{
2593 TRACE( "%p %p\n", out, reserved );
2594
2595 if (reserved)
2596 FIXME( "reserved argument is nonzero (%p)\n", reserved );
2597
2598 return tp_threadpool_alloc( (struct threadpool **)out );
2599}
2600
2601/***********************************************************************
2602 * TpAllocTimer (NTDLL.@)
2603 */
2605 TP_CALLBACK_ENVIRON *environment )
2606{
2607 struct threadpool_object *object;
2608 struct threadpool *pool;
2610
2611 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2612
2613 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2614 if (!object)
2615 return STATUS_NO_MEMORY;
2616
2617 status = tp_threadpool_lock( &pool, environment );
2618 if (status)
2619 {
2620 RtlFreeHeap( GetProcessHeap(), 0, object );
2621 return status;
2622 }
2623
2624 object->type = TP_OBJECT_TYPE_TIMER;
2625 object->u.timer.callback = callback;
2626
2627 status = tp_timerqueue_lock( object );
2628 if (status)
2629 {
2631 RtlFreeHeap( GetProcessHeap(), 0, object );
2632 return status;
2633 }
2634
2635 tp_object_initialize( object, pool, userdata, environment );
2636
2637 *out = (TP_TIMER *)object;
2638 return STATUS_SUCCESS;
2639}
2640
2642 TP_CALLBACK_ENVIRON *environment, DWORD flags )
2643{
2644 struct threadpool_object *object;
2645 struct threadpool *pool;
2647
2648 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2649 if (!object)
2650 return STATUS_NO_MEMORY;
2651
2652 status = tp_threadpool_lock( &pool, environment );
2653 if (status)
2654 {
2655 RtlFreeHeap( GetProcessHeap(), 0, object );
2656 return status;
2657 }
2658
2659 object->type = TP_OBJECT_TYPE_WAIT;
2660 object->u.wait.callback = callback;
2661 object->u.wait.flags = flags;
2662
2663 status = tp_waitqueue_lock( object );
2664 if (status)
2665 {
2667 RtlFreeHeap( GetProcessHeap(), 0, object );
2668 return status;
2669 }
2670
2671 tp_object_initialize( object, pool, userdata, environment );
2672
2673 *out = (TP_WAIT *)object;
2674 return STATUS_SUCCESS;
2675}
2676
2677/***********************************************************************
2678 * TpAllocWait (NTDLL.@)
2679 */
2681 TP_CALLBACK_ENVIRON *environment )
2682{
2683 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2684 return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE );
2685}
2686
2687/***********************************************************************
2688 * TpAllocWork (NTDLL.@)
 *
 * Allocates a threadpool_object from the process heap, tags it
 * TP_OBJECT_TYPE_WORK, stores `callback`, initializes it with
 * tp_object_initialize() and returns it through *out.
 *
 * RETURNS
 *  STATUS_NO_MEMORY if the allocation fails, the failing status of
 *  tp_threadpool_lock(), or STATUS_SUCCESS.
 *
 * NOTE(review): listing lines 2690 (start of the signature) and 2695
 * (presumably the declaration of `status`) are missing from this extract.
2689 */
2691 TP_CALLBACK_ENVIRON *environment )
2692{
2693 struct threadpool_object *object;
2694 struct threadpool *pool;
2696
2697 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2698
2699 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2700 if (!object)
2701 return STATUS_NO_MEMORY;
2702
2703 status = tp_threadpool_lock( &pool, environment );
2704 if (status)
2705 {
 /* *out has not been written yet, so the allocation is simply freed. */
2706 RtlFreeHeap( GetProcessHeap(), 0, object );
2707 return status;
2708 }
2709
2710 object->type = TP_OBJECT_TYPE_WORK;
2711 object->u.work.callback = callback;
2712 tp_object_initialize( object, pool, userdata, environment );
2713
2714 *out = (TP_WORK *)object;
2715 return STATUS_SUCCESS;
2716}
2717
2718/***********************************************************************
2719 * TpCancelAsyncIoOperation (NTDLL.@)
 *
 * While holding this->pool->cs, decrements the I/O object's pending_count
 * and then checks object_is_finished() for both the TRUE and FALSE variants.
 *
 * NOTE(review): listing lines 2721 (signature), 2733 and 2735 (the
 * statements controlled by the two `if`s) are missing from this extract.
 * As printed, the second `if` and the RtlLeaveCriticalSection() call would
 * nest under the first `if`; do not build from this text without restoring
 * the missing lines.
2720 */
2722{
2723 struct threadpool_object *this = impl_from_TP_IO( io );
2724
2725 TRACE( "%p\n", io );
2726
2727 RtlEnterCriticalSection( &this->pool->cs );
2728
2729 TRACE("pending_count %u.\n", this->u.io.pending_count);
2730
2731 this->u.io.pending_count--;
2732 if (object_is_finished( this, TRUE ))
2734 if (object_is_finished( this, FALSE ))
2736
2737 RtlLeaveCriticalSection( &this->pool->cs );
2738}
2739
2740/***********************************************************************
2741 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
 *
 * Records `crit` in this->cleanup.critical_section, but only if no
 * critical section has been recorded yet; a later call does not replace
 * an earlier one.
 *
 * NOTE(review): listing lines 2743 (signature) and 2745 (presumably the
 * declaration of `this`) are missing from this extract.
2742 */
2744{
2746
2747 TRACE( "%p %p\n", instance, crit );
2748
2749 if (!this->cleanup.critical_section)
2750 this->cleanup.critical_section = crit;
2751}
2752
2753/***********************************************************************
2754 * TpCallbackMayRunLong (NTDLL.@)
 *
 * Marks the callback instance as possibly long-running.
 *
 * Returns STATUS_UNSUCCESSFUL when called from a thread other than the one
 * recorded in this->threadid, and STATUS_SUCCESS immediately when the
 * instance is already marked. Otherwise it compares the pool's busy and
 * total worker counts, sets may_run_long and returns `status`.
 *
 * NOTE(review): listing lines 2756 (signature), 2758 (declaration of
 * `this`), 2761 (declaration of `status`), 2775, 2782, 2786 and 2790 are
 * missing from this extract. The two inner branches are therefore empty
 * as printed, and no locking of the pool is visible.
2755 */
2757{
2759 struct threadpool_object *object = this->object;
2760 struct threadpool *pool;
2762
2763 TRACE( "%p\n", instance );
2764
2765 if (this->threadid != GetCurrentThreadId())
2766 {
2767 ERR("called from wrong thread, ignoring\n");
2768 return STATUS_UNSUCCESSFUL; /* FIXME */
2769 }
2770
 /* Already flagged by an earlier call: nothing more to do. */
2771 if (this->may_run_long)
2772 return STATUS_SUCCESS;
2773
2774 pool = object->pool;
2776
2777 /* Start new worker threads if required. */
2778 if (pool->num_busy_workers >= pool->num_workers)
2779 {
2780 if (pool->num_workers < pool->max_workers)
2781 {
 /* NOTE(review): body (listing line 2782) missing from this extract. */
2783 }
2784 else
2785 {
 /* NOTE(review): body (listing line 2786) missing from this extract. */
2787 }
2788 }
2789
 /* The flag is set regardless of the value left in `status`. */
2791 this->may_run_long = TRUE;
2792 return status;
2793}
2794
2795/***********************************************************************
2796 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
 *
 * Records `mutex` in this->cleanup.mutex, but only if no mutex has been
 * recorded yet; a later call does not replace an earlier one.
 *
 * NOTE(review): listing lines 2798 (signature) and 2800 (presumably the
 * declaration of `this`) are missing from this extract.
2797 */
2799{
2801
2802 TRACE( "%p %p\n", instance, mutex );
2803
2804 if (!this->cleanup.mutex)
2805 this->cleanup.mutex = mutex;
2806}
2807
2808/***********************************************************************
2809 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
 *
 * Records `semaphore` together with `count` in this->cleanup, but only if
 * no semaphore has been recorded yet; the pair is always stored together.
 *
 * NOTE(review): listing lines 2811 (signature) and 2813 (presumably the
 * declaration of `this`) are missing from this extract.
2810 */
2812{
2814
2815 TRACE( "%p %p %lu\n", instance, semaphore, count );
2816
2817 if (!this->cleanup.semaphore)
2818 {
2819 this->cleanup.semaphore = semaphore;
2820 this->cleanup.semaphore_count = count;
2821 }
2822}
2823
2824/***********************************************************************
2825 * TpCallbackSetEventOnCompletion (NTDLL.@)
 *
 * Records `event` in this->cleanup.event, but only if no event has been
 * recorded yet; a later call does not replace an earlier one.
 *
 * NOTE(review): listing lines 2827 (signature) and 2829 (presumably the
 * declaration of `this`) are missing from this extract.
2826 */
2828{
2830
2831 TRACE( "%p %p\n", instance, event );
2832
2833 if (!this->cleanup.event)
2834 this->cleanup.event = event;
2835}
2836
2837/***********************************************************************
2838 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
 *
 * Records `module` in this->cleanup.library, but only if no library has
 * been recorded yet; a later call does not replace an earlier one.
 *
 * NOTE(review): listing lines 2840 (signature) and 2842 (presumably the
 * declaration of `this`) are missing from this extract.
2839 */
2841{
2843
2844 TRACE( "%p %p\n", instance, module );
2845
2846 if (!this->cleanup.library)
2847 this->cleanup.library = module;
2848}
2849
2850/***********************************************************************
2851 * TpDisassociateCallback (NTDLL.@)
 *
 * Drops the association between the running callback instance and its
 * object: decrements object->num_associated_callbacks, wakes waiters on
 * object->finished_event when object_is_finished( object, FALSE ) holds,
 * and clears this->associated.
 *
 * Does nothing when called from a thread other than this->threadid (an
 * error is logged) or when the instance is no longer associated.
 *
 * NOTE(review): listing lines 2853 (signature), 2855 (declaration of
 * `this`), 2871 and 2877 are missing from this extract. The counter update
 * appears without any visible lock; presumably the two missing lines
 * enter and leave the pool lock (verify against the complete file).
2852 */
2854{
2856 struct threadpool_object *object = this->object;
2857 struct threadpool *pool;
2858
2859 TRACE( "%p\n", instance );
2860
2861 if (this->threadid != GetCurrentThreadId())
2862 {
2863 ERR("called from wrong thread, ignoring\n");
2864 return;
2865 }
2866
2867 if (!this->associated)
2868 return;
2869
2870 pool = object->pool;
2872
2873 object->num_associated_callbacks--;
2874 if (object_is_finished( object, FALSE ))
2875 RtlWakeAllConditionVariable( &object->finished_event );
2876
2878 this->associated = FALSE;
2879}
2880
2881/***********************************************************************
2882 * TpIsTimerSet (NTDLL.@)
 *
 * Returns the timer object's u.timer.timer_set flag. The value is read
 * without taking any lock in the code shown.
 *
 * NOTE(review): listing line 2884 (signature) is missing from this extract.
2883 */
2885{
2886 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2887
2888 TRACE( "%p\n", timer );
2889
2890 return this->u.timer.timer_set;
2891}
2892
2893/***********************************************************************
2894 * TpPostWork (NTDLL.@)
 *
 * Submits the work object by calling tp_object_submit( this, FALSE ).
 *
 * NOTE(review): listing line 2896 (signature) is missing from this extract.
2895 */
2897{
2898 struct threadpool_object *this = impl_from_TP_WORK( work );
2899
2900 TRACE( "%p\n", work );
2901
2902 tp_object_submit( this, FALSE );
2903}
2904
2905/***********************************************************************
2906 * TpReleaseCleanupGroup (NTDLL.@)
 *
 * Calls tp_group_shutdown() and then tp_group_release() on the group.
 *
 * NOTE(review): listing lines 2908 (signature) and 2910 (presumably the
 * declaration of `this`) are missing from this extract.
2907 */
2909{
2911
2912 TRACE( "%p\n", group );
2913
2914 tp_group_shutdown( this );
2915 tp_group_release( this );
2916}
2917
2918/***********************************************************************
2919 * TpReleaseCleanupGroupMembers (NTDLL.@)
 *
 * Detaches every member object from the cleanup group, optionally cancels
 * their pending callbacks, waits for each object, runs the group cancel
 * callback where applicable, and releases the objects.
 *
 * The work is done in three passes:
 *  1. Under this->cs: take a reference on each member and clear its
 *     is_group_member flag; members whose refcount was already zero are
 *     unlinked and skipped. The remaining list is moved to a local list.
 *  2. If cancel_pending is set: tp_object_cancel() on each object.
 *  3. tp_object_wait( object, TRUE ) on each object, then the release
 *     logic below.
 *
 * NOTE(review): listing lines 2921 (signature), 2923 (declaration of
 * `this`) and 2948 are missing from this extract.
2920 */
2922{
2924 struct threadpool_object *object, *next;
2925 struct list members;
2926
2927 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2928
2929 RtlEnterCriticalSection( &this->cs );
2930
2931 /* Unset group, increase references, and mark objects for shutdown */
2932 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2933 {
2934 assert( object->group == this );
2935 assert( object->is_group_member );
2936
 /* A result of 1 means the refcount was 0 before this increment. */
2937 if (InterlockedIncrement( &object->refcount ) == 1)
2938 {
2939 /* Object is basically already destroyed, but group reference
2940 * was not deleted yet. We can safely ignore this object. */
2941 InterlockedDecrement( &object->refcount );
2942 list_remove( &object->group_entry );
2943 object->is_group_member = FALSE;
2944 continue;
2945 }
2946
2947 object->is_group_member = FALSE;
 /* NOTE(review): listing line 2948 is missing here. The comment above the
  * loop mentions marking objects for shutdown, which no visible statement
  * does; presumably that is the missing line (verify). */
2949 }
2950
2951 /* Move members to a new temporary list */
2952 list_init( &members );
2953 list_move_tail( &members, &this->members );
2954
2955 RtlLeaveCriticalSection( &this->cs );
2956
2957 /* Cancel pending callbacks if requested */
2958 if (cancel_pending)
2959 {
2960 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2961 {
2962 tp_object_cancel( object );
2963 }
2964 }
2965
2966 /* Wait for remaining callbacks to finish */
2967 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2968 {
2969 tp_object_wait( object, TRUE );
2970
2971 if (!object->shutdown)
2972 {
2973 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2974 if (cancel_pending && object->group_cancel_callback)
2975 {
2976 TRACE( "executing group cancel callback %p(%p, %p)\n",
2977 object->group_cancel_callback, object->userdata, userdata );
2978 object->group_cancel_callback( object->userdata, userdata );
2979 TRACE( "callback %p returned\n", object->group_cancel_callback );
2980 }
2981
 /* Objects not yet shut down, other than the SIMPLE type, get one
  * release here in addition to the unconditional release below. */
2982 if (object->type != TP_OBJECT_TYPE_SIMPLE)
2983 tp_object_release( object );
2984 }
2985
 /* Drops the reference taken in the first pass. */
2986 object->shutdown = TRUE;
2987 tp_object_release( object );
2988 }
2989}
2990
2991/***********************************************************************
2992 * TpReleaseIoCompletion (NTDLL.@)
 *
 * Marks the I/O object as shutting down. If, at that moment, it has no
 * pending and no skipped operations, the object is flagged as shut down
 * and released immediately; otherwise nothing more happens here.
 *
 * NOTE(review): listing lines 2994 (signature) and 3008 (first statement
 * inside the `can_destroy` branch) are missing from this extract.
2993 */
2995{
2996 struct threadpool_object *this = impl_from_TP_IO( io );
2997 BOOL can_destroy;
2998
2999 TRACE( "%p\n", io );
3000
 /* The flag and both counters are sampled together under the pool lock. */
3001 RtlEnterCriticalSection( &this->pool->cs );
3002 this->u.io.shutting_down = TRUE;
3003 can_destroy = !this->u.io.pending_count && !this->u.io.skipped_count;
3004 RtlLeaveCriticalSection( &this->pool->cs );
3005
3006 if (can_destroy)
3007 {
3009 this->shutdown = TRUE;
3010 tp_object_release( this );
3011 }
3012}
3013
3014/***********************************************************************
3015 * TpReleasePool (NTDLL.@)
 *
 * Calls tp_threadpool_shutdown() and then tp_threadpool_release() on the
 * pool.
 *
 * NOTE(review): listing line 3017 (signature) is missing from this extract.
3016 */
3018{
3019 struct threadpool *this = impl_from_TP_POOL( pool );
3020
3021 TRACE( "%p\n", pool );
3022
3023 tp_threadpool_shutdown( this );
3024 tp_threadpool_release( this );
3025}
3026
3027/***********************************************************************
3028 * TpReleaseTimer (NTDLL.@)
 *
 * Flags the timer object as shut down and drops one reference with
 * tp_object_release().
 *
 * NOTE(review): listing lines 3030 (signature) and 3036 (a statement that
 * runs before the shutdown flag is set) are missing from this extract.
3029 */
3031{
3032 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3033
3034 TRACE( "%p\n", timer );
3035
3037 this->shutdown = TRUE;
3038 tp_object_release( this );
3039}
3040
3041/***********************************************************************
3042 * TpReleaseWait (NTDLL.@)
 *
 * Flags the wait object as shut down and drops one reference with
 * tp_object_release().
 *
 * NOTE(review): listing lines 3044 (signature) and 3050 (a statement that
 * runs before the shutdown flag is set) are missing from this extract.
3043 */
3045{
3046 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3047
3048 TRACE( "%p\n", wait );
3049
3051 this->shutdown = TRUE;
3052 tp_object_release( this );
3053}
3054
3055/***********************************************************************
3056 * TpReleaseWork (NTDLL.@)
 *
 * Flags the work object as shut down and drops one reference with
 * tp_object_release().
 *
 * NOTE(review): listing lines 3058 (signature) and 3064 (a statement that
 * runs before the shutdown flag is set) are missing from this extract.
3057 */
3059{
3060 struct threadpool_object *this = impl_from_TP_WORK( work );
3061
3062 TRACE( "%p\n", work );
3063
3065 this->shutdown = TRUE;
3066 tp_object_release( this );
3067}
3068
3069/***********************************************************************
3070 * TpSetPoolMaxThreads (NTDLL.@)
 *
 * Sets the pool's max_workers under this->cs. A requested maximum of 0 is
 * raised to 1, and min_workers is lowered if it would exceed the new
 * maximum.
 *
 * NOTE(review): listing line 3072 (signature) is missing from this extract.
3071 */
3073{
3074 struct threadpool *this = impl_from_TP_POOL( pool );
3075
3076 TRACE( "%p %lu\n", pool, maximum );
3077
3078 RtlEnterCriticalSection( &this->cs );
3079 this->max_workers = max( maximum, 1 );
3080 this->min_workers = min( this->min_workers, this->max_workers );
3081 RtlLeaveCriticalSection( &this->cs );
3082}
3083
3084/***********************************************************************
3085 * TpSetPoolMinThreads (NTDLL.@)
 *
 * Under this->cs, starts worker threads with tp_new_worker_thread() until
 * num_workers reaches `minimum` or a start fails. On success min_workers
 * is set to `minimum` and max_workers is raised if it is below it.
 *
 * RETURNS
 *  `!status`: nonzero when the final status is STATUS_SUCCESS, zero
 *  otherwise.
 *
 * NOTE(review): listing lines 3087 (signature) and 3090 (declaration of
 * `status`) are missing from this extract. `status` is read after the loop
 * even when the loop body never runs, so the missing declaration must
 * initialize it (presumably to STATUS_SUCCESS; verify).
3086 */
3088{
3089 struct threadpool *this = impl_from_TP_POOL( pool );
3091
3092 TRACE( "%p %lu\n", pool, minimum );
3093
3094 RtlEnterCriticalSection( &this->cs );
3095
3096 while (this->num_workers < minimum)
3097 {
3098 status = tp_new_worker_thread( this );
3099 if (status != STATUS_SUCCESS)
3100 break;
3101 }
3102
 /* The limits are only updated when every requested thread was started. */
3103 if (status == STATUS_SUCCESS)
3104 {
3105 this->min_workers = minimum;
3106 this->max_workers = max( this->min_workers, this->max_workers );
3107 }
3108
3109 RtlLeaveCriticalSection( &this->cs );
3110 return !status;
3111}
3112
3113/***********************************************************************
3114 * TpSetTimer (NTDLL.@)
 *
 * Arms, re-arms or disarms a timer object.
 *
 * PARAMS
 *  timer          Timer object to update.
 *  timeout        NULL disarms the timer. A negative QuadPart is treated
 *                 as relative to the current time, zero requests immediate
 *                 submission, and a positive value is used as the
 *                 timestamp as-is.
 *  period         Stored in u.timer.period; with a zero timeout, a nonzero
 *                 period also schedules the next expiry.
 *  window_length  Stored in u.timer.window_length.
 *
 * Any existing queue entry is removed first; an armed timer is then
 * inserted into timerqueue.pending_timers ordered by timeout.
 *
 * NOTE(review): listing lines 3121 (declaration of `timestamp`), 3125,
 * 3136-3138, 3147-3148 (declaration and initialization of `now`) and 3185
 * are missing from this extract, so no locking of the timer queue is
 * visible here.
3115 */
3116VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
3117{
3118 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3119 struct threadpool_object *other_timer;
3120 BOOL submit_timer = FALSE;
3122
3123 TRACE( "%p %p %lu %lu\n", timer, timeout, period, window_length );
3124
3126
3127 assert( this->u.timer.timer_initialized );
3128 this->u.timer.timer_set = timeout != NULL;
3129
3130 /* Convert relative timeout to absolute timestamp and handle a timeout
3131 * of zero, which means that the timer is submitted immediately. */
3132 if (timeout)
3133 {
3134 timestamp = timeout->QuadPart;
3135 if ((LONGLONG)timestamp < 0)
3136 {
 /* Subtracting the negative relative value adds its magnitude to `now`. */
3139 timestamp = now.QuadPart - timestamp;
3140 }
3141 else if (!timestamp)
3142 {
 /* Zero timeout without a period: fire once and leave nothing queued. */
3143 if (!period)
3144 timeout = NULL;
3145 else
3146 {
 /* The factor 10000 presumably converts a millisecond period to the
  * 100ns units of `now` (TODO confirm). */
3149 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
3150 }
3151 submit_timer = TRUE;
3152 }
3153 }
3154
3155 /* First remove existing timeout. */
3156 if (this->u.timer.timer_pending)
3157 {
3158 list_remove( &this->u.timer.timer_entry );
3159 this->u.timer.timer_pending = FALSE;
3160 }
3161
3162 /* If the timer was enabled, then add it back to the queue. */
3163 if (timeout)
3164 {
3165 this->u.timer.timeout = timestamp;
3166 this->u.timer.period = period;
3167 this->u.timer.window_length = window_length;
3168
 /* Stop at the first queued timer that expires later than this one. */
3169 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
3170 struct threadpool_object, u.timer.timer_entry )
3171 {
3172 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
3173 if (this->u.timer.timeout < other_timer->u.timer.timeout)
3174 break;
3175 }
 /* Inserts before that timer; when the loop ran to completion this
  * presumably appends at the tail (depends on LIST_FOR_EACH_ENTRY
  * leaving the cursor at the list head; confirm in wine/list.h). */
3176 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
3177
3178 /* Wake up the timer thread when the timeout has to be updated. */
3179 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
3180 RtlWakeAllConditionVariable( &timerqueue.update_event );
3181
3182 this->u.timer.timer_pending = TRUE;
3183 }
3184
3186
 /* Zero-timeout case: the callback is submitted after the queue update. */
3187 if (submit_timer)
3188 tp_object_submit( this, FALSE );
3189}
3190
3191/***********************************************************************
3192 * TpSetWait (NTDLL.@)
 *
 * Updates the handle a wait object waits on.
 *
 * A non-NULL `handle` moves the object to its bucket's `waiting` list with
 * the computed timeout. A NULL `handle` moves a currently pending object
 * to the bucket's `reserved` list. In both cases the bucket's update_event
 * is set afterwards. Nothing is moved when `handle` is NULL and the object
 * is not pending.
 *
 * A negative timeout->QuadPart is treated as relative to the current time.
 *
 * NOTE(review): listing lines 3194 (signature), 3197 (declaration of
 * `timestamp`), 3201, 3217-3218 (declaration and initialization of `now`)
 * and 3240 are missing from this extract, so no locking is visible here.
 * When `timeout` is NULL the stored timeout is whatever the missing
 * declaration initializes `timestamp` to (verify).
3193 */
3195{
3196 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3198
3199 TRACE( "%p %p %p\n", wait, handle, timeout );
3200
3202
3203 assert( this->u.wait.bucket );
3204 this->u.wait.handle = handle;
3205
3206 if (handle || this->u.wait.wait_pending)
3207 {
3208 struct waitqueue_bucket *bucket = this->u.wait.bucket;
 /* Unlinked from whichever list it is on; re-added below. */
3209 list_remove( &this->u.wait.wait_entry );
3210
3211 /* Convert relative timeout to absolute timestamp. */
3212 if (handle && timeout)
3213 {
3214 timestamp = timeout->QuadPart;
3215 if ((LONGLONG)timestamp < 0)
3216 {
 /* Subtracting the negative relative value adds its magnitude to `now`. */
3219 timestamp = now.QuadPart - timestamp;
3220 }
3221 }
3222
3223 /* Add wait object back into one of the queues. */
3224 if (handle)
3225 {
3226 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
3227 this->u.wait.wait_pending = TRUE;
3228 this->u.wait.timeout = timestamp;
3229 }
3230 else
3231 {
3232 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
3233 this->u.wait.wait_pending = FALSE;
3234 }
3235
3236 /* Wake up the wait queue thread. */
3237 NtSetEvent( bucket->update_event, NULL );
3238 }
3239
3241}
3242
3243/***********************************************************************
3244 * TpSimpleTryPost (NTDLL.@)
 *
 * Allocates a threadpool_object from the process heap, tags it
 * TP_OBJECT_TYPE_SIMPLE, stores `callback` and initializes it with
 * tp_object_initialize(). No object pointer is returned to the caller.
 *
 * RETURNS
 *  STATUS_NO_MEMORY if the allocation fails, the failing status of
 *  tp_threadpool_lock(), or STATUS_SUCCESS.
 *
 * NOTE(review): listing lines 3246 (start of the signature) and 3251
 * (presumably the declaration of `status`) are missing from this extract.
3245 */
3247 TP_CALLBACK_ENVIRON *environment )
3248{
3249 struct threadpool_object *object;
3250 struct threadpool *pool;
3252
3253 TRACE( "%p %p %p\n", callback, userdata, environment );
3254
3255 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
3256 if (!object)
3257 return STATUS_NO_MEMORY;
3258
3259 status = tp_threadpool_lock( &pool, environment );
3260 if (status)
3261 {
3262 RtlFreeHeap( GetProcessHeap(), 0, object );
3263 return status;
3264 }
3265
3266 object->type = TP_OBJECT_TYPE_SIMPLE;
3267 object->u.simple.callback = callback;
3268 tp_object_initialize( object, pool, userdata, environment );
3269
3270 return STATUS_SUCCESS;
3271}
3272
3273/***********************************************************************
3274 * TpStartAsyncIoOperation (NTDLL.@)
 *
 * Increments the I/O object's pending_count while holding this->pool->cs.
 *
 * NOTE(review): listing line 3276 (signature) is missing from this extract.
3275 */
3277{
3278 struct threadpool_object *this = impl_from_TP_IO( io );
3279
3280 TRACE( "%p\n", io );
3281
3282 RtlEnterCriticalSection( &this->pool->cs );
3283
3284 this->u.io.pending_count++;
3285
3286 RtlLeaveCriticalSection( &this->pool->cs );
3287}
3288
3289/***********************************************************************
3290 * TpWaitForIoCompletion (NTDLL.@)
3291 */
3292void WINAPI TpWaitForIoCompletion( TP_IO *io, BOOL cancel_pending )
3293{
3294 struct threadpool_object *this = impl_from_TP_IO( io );
3295
3296 TRACE( "%p %d\n", io, cancel_pending );
3297
3298 if (cancel_pending)
3299 tp_object_cancel( this );
3300 tp_object_wait( this, FALSE );
3301}
3302
3303/***********************************************************************
3304 * TpWaitForTimer (NTDLL.@)
 *
 * Optionally calls tp_object_cancel() on the timer object (when
 * cancel_pending is nonzero), then calls tp_object_wait( this, FALSE ).
 *
 * NOTE(review): listing line 3306 (signature) is missing from this extract.
3305 */
3307{
3308 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3309
3310 TRACE( "%p %d\n", timer, cancel_pending );
3311
3312 if (cancel_pending)
3313 tp_object_cancel( this );
3314 tp_object_wait( this, FALSE );
3315}
3316
3317/***********************************************************************
3318 * TpWaitForWait (NTDLL.@)
 *
 * Optionally calls tp_object_cancel() on the wait object (when
 * cancel_pending is nonzero), then calls tp_object_wait( this, FALSE ).
 *
 * NOTE(review): listing line 3320 (signature) is missing from this extract.
3319 */
3321{
3322 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3323
3324 TRACE( "%p %d\n", wait, cancel_pending );
3325
3326 if (cancel_pending)
3327 tp_object_cancel( this );
3328 tp_object_wait( this, FALSE );
3329}
3330
3331/***********************************************************************
3332 * TpWaitForWork (NTDLL.@)
 *
 * Optionally calls tp_object_cancel() on the work object (when
 * cancel_pending is nonzero), then calls tp_object_wait( this, FALSE ).
 *
 * NOTE(review): listing line 3334 (signature) is missing from this extract.
3333 */
3335{
3336 struct threadpool_object *this = impl_from_TP_WORK( work );
3337
3338 TRACE( "%p %u\n", work, cancel_pending );
3339
3340 if (cancel_pending)
3341 tp_object_cancel( this );
3342 tp_object_wait( this, FALSE );
3343}
3344
3345/***********************************************************************
3346 * TpSetPoolStackInformation (NTDLL.@)
 *
 * Copies *stack_info into the pool's stack_info field under this->cs and
 * returns STATUS_SUCCESS.
 *
 * NOTE(review): listing lines 3348 (signature) and 3355 (the statement
 * executed when stack_info is NULL) are missing from this extract. As
 * printed, the `if` would instead govern the RtlEnterCriticalSection()
 * call; do not build from this text without restoring the missing line.
3347 */
3349{
3350 struct threadpool *this = impl_from_TP_POOL( pool );
3351
3352 TRACE( "%p %p\n", pool, stack_info );
3353
3354 if (!stack_info)
3356
3357 RtlEnterCriticalSection( &this->cs );
3358 this->stack_info = *stack_info;
3359 RtlLeaveCriticalSection( &this->cs );
3360
3361 return STATUS_SUCCESS;
3362}
3363
3364/***********************************************************************
3365 * TpQueryPoolStackInformation (NTDLL.@)
 *
 * Copies the pool's stack_info field into *stack_info under this->cs and
 * returns STATUS_SUCCESS.
 *
 * NOTE(review): listing lines 3367 (signature) and 3374 (the statement
 * executed when stack_info is NULL) are missing from this extract. As
 * printed, the `if` would instead govern the RtlEnterCriticalSection()
 * call; do not build from this text without restoring the missing line.
3366 */
3368{
3369 struct threadpool *this = impl_from_TP_POOL( pool );
3370
3371 TRACE( "%p %p\n", pool, stack_info );
3372
3373 if (!stack_info)
3375
3376 RtlEnterCriticalSection( &this->cs );
3377 *stack_info = this->stack_info;
3378 RtlLeaveCriticalSection( &this->cs );
3379
3380 return STATUS_SUCCESS;
3381}
3382
3384{
 /* rtl_wait_callback (name taken from its use in RtlRegisterWait): adapter
  * that forwards a wait callback to the callback stored in
  * u.wait.rtl_callback. The second argument passed on is nonzero for any
  * `result` other than STATUS_WAIT_0.
  * NOTE(review): listing line 3383 (signature) is missing from this
  * extract. */
3385 struct threadpool_object *object = impl_from_TP_WAIT(wait);
3386 object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 );
3387}
3388
3389/***********************************************************************
3390 * RtlRegisterWait (NTDLL.@)
3391 *
3392 * Registers a wait for a handle to become signaled.
3393 *
3394 * PARAMS
3395 * NewWaitObject [O] Receives the handle to the new wait object (`out` in the code). Use RtlDeregisterWait() to free it.
3396 * Object [I] Object to wait to become signaled (`handle` in the code).
3397 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
3398 * Context [I] Context to pass to the callback function when it is executed.
3399 * Milliseconds [I] Number of milliseconds to wait before timing out.
3400 * Flags [I] Flags. See notes.
3401 *
3402 * RETURNS
3403 * Success: STATUS_SUCCESS.
3404 * Failure: Any NTSTATUS code.
3405 *
3406 * NOTES
3407 * Flags can be one or more of the following:
3408 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
3409 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
3410 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
3411 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
3412 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
 *
 * In the code shown, only WT_EXECUTELONGFUNCTION and
 * WT_EXECUTEINPERSISTENTTHREAD are translated into the callback
 * environment; `flags` is also passed to tp_alloc_wait().
 *
 * NOTE(review): listing lines 3414 (start of the signature), 3419-3420
 * (declarations, presumably of `timeout` and `status`), 3431, 3438 and
 * 3442 are missing from this extract.
3413 */
3415 void *context, ULONG milliseconds, ULONG flags )
3416{
3417 struct threadpool_object *object;
3418 TP_CALLBACK_ENVIRON environment;
3421 TP_WAIT *wait;
3422
3423 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %lu, flags %lx\n",
3424 out, handle, callback, context, milliseconds, flags );
3425
3426 memset( &environment, 0, sizeof(environment) );
3427 environment.Version = 1;
3428 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
3429 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
3430
 /* The wait object is created with rtl_wait_callback as its callback; the
  * caller's callback is stored separately below. */
3432 if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags )))
3433 return status;
3434
3435 object = impl_from_TP_WAIT(wait);
3436 object->u.wait.rtl_callback = callback;
3437
3439 TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) );
3440
 /* The handle returned to the caller is the object pointer itself. */
3441 *out = object;
3443
3444 return STATUS_SUCCESS;
3445}
3446
3447/***********************************************************************
3448 * RtlDeregisterWaitEx (NTDLL.@)
3449 *
3450 * Cancels a wait operation and frees the resources associated with calling
3451 * RtlRegisterWait().
3452 *
3453 * PARAMS
3454 * WaitObject [I] Handle to the wait object to free (`handle` in the code).
 * event [I] INVALID_HANDLE_VALUE makes the call wait via TpWaitForWait()
 *           with pending callbacks cancelled; any other value (including
 *           NULL) is stored in the object's completed_event field.
3455 *
3456 * RETURNS
3457 * Success: STATUS_SUCCESS.
3458 * Failure: Any NTSTATUS code.
 *
 * As implemented below: STATUS_INVALID_HANDLE when `handle` is NULL,
 * STATUS_PENDING when the object still has pending, running or associated
 * callbacks at the time of the check, STATUS_SUCCESS otherwise.
 *
 * NOTE(review): listing lines 3460 (signature) and 3463 (presumably the
 * declaration of `status`) are missing from this extract.
3459 */
3461{
3462 struct threadpool_object *object = handle;
3464
3465 TRACE( "handle %p, event %p\n", handle, event );
3466
3467 if (!object) return STATUS_INVALID_HANDLE;
3468
 /* Clearing the handle stops the object from being waited on. */
3469 TpSetWait( (TP_WAIT *)object, NULL, NULL );
3470
3471 if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE );
3472 else
3473 {
3474 assert( object->completed_event == NULL );
3475 object->completed_event = event;
3476 }
3477
 /* The three counters are sampled together under the pool lock. */
3478 RtlEnterCriticalSection( &object->pool->cs );
3479 if (object->num_pending_callbacks + object->num_running_callbacks
3480 + object->num_associated_callbacks) status = STATUS_PENDING;
3481 else status = STATUS_SUCCESS;
3482 RtlLeaveCriticalSection( &object->pool->cs );
3483
3484 TpReleaseWait( (TP_WAIT *)object );
3485 return status;
3486}
3487
3488/***********************************************************************
3489 * RtlDeregisterWait (NTDLL.@)
3490 *
3491 * Cancels a wait operation and frees the resources associated with calling
3492 * RtlRegisterWait().
 *
 * Equivalent to RtlDeregisterWaitEx() with a NULL event.
3493 *
3494 * PARAMS
3495 * WaitObject [I] Handle to the wait object to free (`WaitHandle` in the code).
3496 *
3497 * RETURNS
3498 * Success: STATUS_SUCCESS.
3499 * Failure: Any NTSTATUS code.
 *
 * NOTE(review): listing line 3501 (signature) is missing from this extract.
3500 */
3502{
3503 return RtlDeregisterWaitEx(WaitHandle, NULL);
3504}
3505
3506#ifdef __REACTOS__
3507VOID
3508NTAPI
3510 VOID)
3511{
 /* ReactOS-only initialization (inside #ifdef __REACTOS__): initializes
  * the critical section old_threadpool.threadpool_compl_cs.
  * NOTE(review): listing line 3509 (function name; the cross-reference
  * index of this page lists it as RtlpInitializeThreadPooling) and lines
  * 3513-3515 (further statements) are missing from this extract. */
3512 RtlInitializeCriticalSection(&old_threadpool.threadpool_compl_cs);
3516}
3517#endif
void CALLBACK completion(DWORD dwError, DWORD cbTransferred, LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags)
Definition: WSARecv.c:16
void destroy(_Tp *__pointer)
Definition: _construct.h:278
#define VOID
Definition: acefi.h:82
#define InterlockedIncrement
Definition: armddk.h:53
#define InterlockedDecrement
Definition: armddk.h:52
#define skip(...)
Definition: atltest.h:64
#define WINE_DEFAULT_DEBUG_CHANNEL(t)
Definition: precomp.h:23
LONG NTSTATUS
Definition: precomp.h:26
#define ARRAY_SIZE(A)
Definition: main.h:20
static void list_remove(struct list_entry *entry)
Definition: list.h:90
static int list_empty(struct list_entry *head)
Definition: list.h:58
static void list_add_tail(struct list_entry *head, struct list_entry *entry)
Definition: list.h:83
static void list_init(struct list_entry *head)
Definition: list.h:51
#define FIXME(fmt,...)
Definition: precomp.h:53
#define WARN(fmt,...)
Definition: precomp.h:61
#define ERR(fmt,...)
Definition: precomp.h:57
static HANDLE thread
Definition: service.c:33
#define NTSYSAPI
Definition: ntoskrnl.h:12
PVOID NTAPI RtlAllocateHeap(IN PVOID HeapHandle, IN ULONG Flags, IN SIZE_T Size)
Definition: heap.c:616
BOOLEAN NTAPI RtlFreeHeap(IN PVOID HeapHandle, IN ULONG Flags, IN PVOID HeapBase)
Definition: heap.c:634
_In_ CDROM_SCAN_FOR_SPECIAL_INFO _In_ PCDROM_SCAN_FOR_SPECIAL_HANDLER Function
Definition: cdrom.h:1156
Definition: list.h:37
#define STATUS_TIMEOUT
Definition: d3dkmdt.h:49
#define STATUS_INVALID_HANDLE
Definition: d3dkmdt.h:40
#define STATUS_PENDING
Definition: d3dkmdt.h:43
#define STATUS_NO_MEMORY
Definition: d3dkmdt.h:51
#define WAIT_TIMEOUT
Definition: dderror.h:14
#define NULL
Definition: types.h:112
#define TRUE
Definition: types.h:120
#define FALSE
Definition: types.h:117
static HINSTANCE instance
Definition: main.c:40
#define GetProcessHeap()
Definition: compat.h:736
#define INVALID_HANDLE_VALUE
Definition: compat.h:731
#define GetCurrentProcess()
Definition: compat.h:759
#define RtlImageNtHeader
Definition: compat.h:806
#define CALLBACK
Definition: compat.h:35
#define HEAP_ZERO_MEMORY
Definition: compat.h:134
static void cleanup(void)
Definition: main.c:1335
PPEB Peb
Definition: dllmain.c:27
#define assert(x)
Definition: debug.h:53
static void list_move_tail(struct list_head *list, struct list_head *head)
Definition: list.h:122
r reserved
Definition: btrfs.c:3006
#define INFINITE
Definition: serial.h:102
#define ULONG_PTR
Definition: config.h:101
unsigned int BOOL
Definition: ntddk_ex.h:94
unsigned long DWORD
Definition: ntddk_ex.h:95
time_t now
Definition: finger.c:65
_Must_inspect_result_ _In_opt_ PFLT_INSTANCE _Out_ PHANDLE FileHandle
Definition: fltkernel.h:1231
@ FileCompletionInformation
Definition: from_kernel.h:91
FxCollectionEntry * cur
GLuint GLuint GLsizei count
Definition: gl.h:1545
GLdouble GLdouble t
Definition: gl.h:2047
GLdouble GLdouble GLdouble GLdouble q
Definition: gl.h:2063
struct _cl_event * event
Definition: glext.h:7739
GLuint res
Definition: glext.h:9613
GLsizeiptr size
Definition: glext.h:5919
GLbitfield flags
Definition: glext.h:7161
GLboolean GLuint group
Definition: glext.h:11120
GLuint64EXT * result
Definition: glext.h:11304
GLfloat GLfloat p
Definition: glext.h:8902
GLfloat param
Definition: glext.h:5796
GLsizei GLenum const GLvoid GLsizei GLenum GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLint GLint GLint GLshort GLshort GLshort GLubyte GLubyte GLubyte GLuint GLuint GLuint GLushort GLushort GLushort GLbyte GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLfloat GLint GLint GLint GLint GLshort GLshort GLshort GLshort GLubyte GLubyte GLubyte GLubyte GLuint GLuint GLuint GLuint GLushort GLushort GLushort GLushort GLboolean const GLdouble const GLfloat const GLint const GLshort const GLbyte const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLdouble const GLfloat const GLfloat const GLint const GLint const GLshort const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort GLenum GLenum GLenum GLfloat GLenum GLint GLenum GLenum GLenum GLfloat GLenum GLenum GLint GLenum GLfloat GLenum GLint GLint GLushort GLenum GLenum GLfloat GLenum GLenum GLint GLfloat const GLubyte GLenum GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLint GLint GLsizei GLsizei GLint GLenum GLenum const GLvoid GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLenum const GLdouble GLenum GLenum const GLfloat GLenum GLenum const GLint GLsizei GLuint GLfloat GLuint GLbitfield GLfloat GLint GLuint GLboolean GLenum GLfloat GLenum GLbitfield GLenum GLfloat GLfloat GLint GLint const GLfloat GLenum GLfloat GLfloat GLint GLint GLfloat GLfloat GLint GLint const GLfloat GLint GLfloat GLfloat GLint 
GLfloat GLfloat GLint GLfloat GLfloat const GLdouble const GLfloat const GLdouble const GLfloat GLint i
Definition: glfuncs.h:248
GLsizei GLenum const GLvoid GLsizei GLenum GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLint GLint GLint GLshort GLshort GLshort GLubyte GLubyte GLubyte GLuint GLuint GLuint GLushort GLushort GLushort GLbyte GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLfloat GLint GLint GLint GLint GLshort GLshort GLshort GLshort GLubyte GLubyte GLubyte GLubyte GLuint GLuint GLuint GLuint GLushort GLushort GLushort GLushort GLboolean const GLdouble const GLfloat const GLint const GLshort const GLbyte const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLdouble const GLfloat const GLfloat const GLint const GLint const GLshort const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort GLenum GLenum GLenum GLfloat GLenum GLint GLenum GLenum GLenum GLfloat GLenum GLenum GLint GLenum GLfloat GLenum GLint GLint GLushort GLenum GLenum GLfloat GLenum GLenum GLint GLfloat const GLubyte GLenum GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLint GLint GLsizei GLsizei GLint GLenum GLenum const GLvoid GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLenum const GLdouble GLenum GLenum const GLfloat GLenum GLenum const GLint GLsizei GLuint GLfloat GLuint GLbitfield GLfloat GLint GLuint GLboolean GLenum GLfloat GLenum GLbitfield GLenum GLfloat GLfloat GLint GLint const GLfloat GLenum GLfloat GLfloat GLint GLint GLfloat GLfloat GLint GLint const GLfloat GLint GLfloat GLfloat GLint 
GLfloat GLfloat GLint GLfloat GLfloat const GLdouble * u
Definition: glfuncs.h:240
DWORD(CALLBACK * PRTL_WORK_ITEM_ROUTINE)(LPVOID)
Definition: winternl.h:1893
NTSYSAPI PVOID WINAPI RtlReAllocateHeap(HANDLE, ULONG, PVOID, SIZE_T)
Definition: heap.c:2686
VOID(CALLBACK * PRTL_OVERLAPPED_COMPLETION_ROUTINE)(DWORD, DWORD, LPVOID)
Definition: winternl.h:2016
void(NTAPI * RTL_WAITORTIMERCALLBACKFUNC)(PVOID, BOOLEAN)
Definition: winternl.h:1894
NTSYSAPI NTSTATUS WINAPI RtlInitializeCriticalSectionEx(RTL_CRITICAL_SECTION *, ULONG, ULONG)
void(CALLBACK * PNTAPCFUNC)(ULONG_PTR, ULONG_PTR, ULONG_PTR)
Definition: winternl.h:1891
void(CALLBACK * PRTL_THREAD_START_ROUTINE)(LPVOID)
Definition: winternl.h:1892
NTSYSAPI ULONG WINAPI RtlNtStatusToDosError(NTSTATUS)
#define InterlockedCompareExchangePointer
Definition: interlocked.h:129
NTSTATUS NTAPI NtSetIoCompletion(IN HANDLE IoCompletionPortHandle, IN PVOID CompletionKey, IN PVOID CompletionContext, IN NTSTATUS CompletionStatus, IN ULONG CompletionInformation)
Definition: iocomp.c:569
NTSTATUS NTAPI NtRemoveIoCompletion(IN HANDLE IoCompletionHandle, OUT PVOID *KeyContext, OUT PVOID *ApcContext, OUT PIO_STATUS_BLOCK IoStatusBlock, IN PLARGE_INTEGER Timeout OPTIONAL)
Definition: iocomp.c:445
NTSTATUS NTAPI NtCreateIoCompletion(OUT PHANDLE IoCompletionHandle, IN ACCESS_MASK DesiredAccess, IN POBJECT_ATTRIBUTES ObjectAttributes, IN ULONG NumberOfConcurrentThreads)
Definition: iocomp.c:253
#define NtCurrentTeb
uint32_t entry
Definition: isohybrid.c:63
#define EVENT_ALL_ACCESS
Definition: isotest.c:82
NTSTATUS NTAPI LdrUnloadDll(_In_ PVOID BaseAddress)
Definition: ldrapi.c:1291
NTSTATUS NTAPI LdrAddRefDll(_In_ ULONG Flags, _In_ PVOID BaseAddress)
Definition: ldrapi.c:1205
if(dx< 0)
Definition: linetemp.h:194
__u16 time
Definition: mkdosfs.c:8
static const CLSID * objects[]
Definition: apphelp.c:112
static PVOID ptr
Definition: dispmode.c:27
static IPrintDialogCallback callback
Definition: printdlg.c:326
IMAGE_NT_HEADERS nt
Definition: module.c:50
#define IO_COMPLETION_ALL_ACCESS
Definition: file.c:72
static HANDLE PIO_APC_ROUTINE PVOID PIO_STATUS_BLOCK io
Definition: file.c:100
static PIO_STATUS_BLOCK iosb
Definition: file.c:98
HANDLE semaphore
Definition: threadpool.c:1889
static TP_CALLBACK_ENVIRON *static TP_CALLBACK_ENVIRON *static PTP_WORK_CALLBACK
Definition: threadpool.c:32
static PTP_TIMER_CALLBACK
Definition: threadpool.c:30
static PTP_IO_CALLBACK
Definition: threadpool.c:28
static DWORD
Definition: threadpool.c:34
static TP_CALLBACK_ENVIRON *static PTP_WAIT_CALLBACK
Definition: threadpool.c:31
static void TP_CALLBACK_ENVIRON *static PVOID
Definition: threadpool.c:29
static ATOM item
Definition: dde.c:856
#define min(a, b)
Definition: monoChain.cc:55
NTSTATUS NTAPI NtReleaseMutant(IN HANDLE MutantHandle, IN PLONG PreviousCount OPTIONAL)
Definition: mutant.c:296
NTSYSAPI VOID NTAPI RtlInitializeConditionVariable(_Out_ PRTL_CONDITION_VARIABLE ConditionVariable)
NTSYSAPI NTSTATUS NTAPI RtlDeleteCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSYSAPI NTSTATUS NTAPI RtlQueueWorkItem(_In_ WORKERCALLBACKFUNC Function, _In_opt_ PVOID Context, _In_ ULONG Flags)
NTSYSAPI VOID NTAPI RtlWakeAllConditionVariable(_Inout_ PRTL_CONDITION_VARIABLE ConditionVariable)
NTSYSAPI NTSTATUS NTAPI RtlRegisterWait(_In_ PHANDLE phNewWaitObject, _In_ HANDLE hObject, _In_ WAITORTIMERCALLBACKFUNC Callback, _In_ PVOID pvContext, _In_ ULONG ulMilliseconds, _In_ ULONG ulFlags)
NTSYSAPI NTSTATUS NTAPI RtlSleepConditionVariableCS(_Inout_ PRTL_CONDITION_VARIABLE ConditionVariable, _Inout_ PRTL_CRITICAL_SECTION CriticalSection, _In_opt_ PLARGE_INTEGER TimeOut)
NTSYSAPI NTSTATUS NTAPI RtlEnterCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSYSAPI VOID NTAPI RtlExitUserThread(_In_ NTSTATUS Status)
NTSYSAPI VOID NTAPI RtlWakeConditionVariable(_Inout_ PRTL_CONDITION_VARIABLE ConditionVariable)
NTSYSAPI NTSTATUS NTAPI RtlLeaveCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSYSAPI NTSTATUS NTAPI RtlCreateUserThread(_In_ PVOID ThreadContext, _Out_ HANDLE *OutThreadHandle, _Reserved_ PVOID Reserved1, _Reserved_ PVOID Reserved2, _Reserved_ PVOID Reserved3, _Reserved_ PVOID Reserved4, _Reserved_ PVOID Reserved5, _Reserved_ PVOID Reserved6, _Reserved_ PVOID Reserved7, _Reserved_ PVOID Reserved8)
NTSYSAPI NTSTATUS NTAPI RtlInitializeCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSYSAPI NTSTATUS NTAPI RtlDeregisterWaitEx(_In_ HANDLE hWaitHandle, _In_opt_ HANDLE hCompletionEvent)
NTSYSAPI NTSTATUS NTAPI RtlDeregisterWait(_In_ HANDLE hWaitHandle)
ULONG(NTAPI * PTHREAD_START_ROUTINE)(PVOID Parameter)
Definition: rtltypes.h:566
NTSYSAPI VOID NTAPI RtlInitUnicodeString(PUNICODE_STRING DestinationString, PCWSTR SourceString)
NTSYSAPI NTSTATUS NTAPI NtSetInformationFile(IN HANDLE hFile, OUT PIO_STATUS_BLOCK pIoStatusBlock, IN PVOID FileInformationBuffer, IN ULONG FileInformationBufferLength, IN FILE_INFORMATION_CLASS FileInfoClass)
Definition: iofunc.c:3096
NTSTATUS NTAPI NtClose(IN HANDLE Handle)
Definition: obhandle.c:3402
NTSYSAPI NTSTATUS NTAPI NtWaitForSingleObject(IN HANDLE hObject, IN BOOLEAN bAlertable, IN PLARGE_INTEGER Timeout)
#define LPVOID
Definition: nt_native.h:45
#define MAXLONGLONG
@ SynchronizationEvent
VOID NTAPI RtlpInitializeThreadPooling(VOID)
NTSTATUS NTAPI NtSetEvent(IN HANDLE EventHandle, OUT PLONG PreviousState OPTIONAL)
Definition: event.c:455
NTSTATUS NTAPI NtCreateEvent(OUT PHANDLE EventHandle, IN ACCESS_MASK DesiredAccess, IN POBJECT_ATTRIBUTES ObjectAttributes OPTIONAL, IN EVENT_TYPE EventType, IN BOOLEAN InitialState)
Definition: event.c:96
NTSTATUS NTAPI NtQueryPerformanceCounter(OUT PLARGE_INTEGER PerformanceCounter, OUT PLARGE_INTEGER PerformanceFrequency OPTIONAL)
Definition: profile.c:272
NTSTATUS NTAPI NtQuerySystemTime(OUT PLARGE_INTEGER SystemTime)
Definition: time.c:569
NTSTATUS NTAPI NtSetInformationThread(IN HANDLE ThreadHandle, IN THREADINFOCLASS ThreadInformationClass, IN PVOID ThreadInformation, IN ULONG ThreadInformationLength)
Definition: query.c:2067
PVOID *typedef PHANDLE
Definition: ntsecpkg.h:455
#define STATUS_WAIT_0
Definition: ntstatus.h:237
#define STATUS_TOO_MANY_THREADS
Definition: ntstatus.h:533
#define STATUS_INVALID_PARAMETER_1
Definition: ntstatus.h:475
#define L(x)
Definition: ntvdm.h:50
NTSTATUS NTAPI NtWaitForMultipleObjects(IN ULONG ObjectCount, IN PHANDLE HandleArray, IN WAIT_TYPE WaitType, IN BOOLEAN Alertable, IN PLARGE_INTEGER TimeOut OPTIONAL)
Definition: obwait.c:46
#define BOOLEAN
Definition: pedump.c:73
long LONG
Definition: pedump.c:60
static unsigned __int64 next
Definition: rand_nt.c:6
#define err(...)
static calc_node_t temp
Definition: rpn_ieee.c:38
#define LIST_FOR_EACH_ENTRY(elem, list, type, field)
Definition: list.h:198
#define LIST_FOR_EACH_ENTRY_SAFE(cursor, cursor2, list, type, field)
Definition: list.h:204
#define LIST_FOR_EACH(cursor, list)
Definition: list.h:188
__WINE_SERVER_LIST_INLINE void list_add_before(struct list *elem, struct list *to_add)
Definition: list.h:87
#define memset(x, y, z)
Definition: compat.h:39
VOID WINAPI TpReleaseCleanupGroupMembers(TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata)
Definition: threadpool.c:2921
static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
Definition: threadpool.c:322
static void tp_object_wait(struct threadpool_object *object, BOOL group_wait)
Definition: threadpool.c:2206
VOID WINAPI TpReleaseTimer(TP_TIMER *timer)
Definition: threadpool.c:3030
RTL_CRITICAL_SECTION threadpool_compl_cs
Definition: threadpool.c:94
VOID WINAPI TpReleasePool(TP_POOL *pool)
Definition: threadpool.c:3017
static struct @5099 ioqueue
static BOOL tp_object_release(struct threadpool_object *object)
Definition: threadpool.c:2255
void WINAPI TpStartAsyncIoOperation(TP_IO *io)
Definition: threadpool.c:3276
VOID WINAPI TpDisassociateCallback(TP_CALLBACK_INSTANCE *instance)
Definition: threadpool.c:2853
NTSTATUS WINAPI RtlCreateTimer(HANDLE TimerQueue, HANDLE *NewTimer, RTL_WAITORTIMERCALLBACKFUNC Callback, PVOID Parameter, DWORD DueTime, DWORD Period, ULONG Flags)
Definition: threadpool.c:1006
static PLARGE_INTEGER get_nt_timeout(PLARGE_INTEGER pTime, ULONG timeout)
Definition: threadpool.c:632
NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
Definition: threadpool.c:594
NTSTATUS WINAPI TpAllocTimer(TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
Definition: threadpool.c:2604
static void set_thread_name(const WCHAR *name)
Definition: threadpool.c:473
static void CALLBACK ioqueue_thread_proc(void *param)
Definition: threadpool.c:1618
static struct threadpool_group * impl_from_TP_CLEANUP_GROUP(TP_CLEANUP_GROUP *group)
Definition: threadpool.c:425
VOID WINAPI TpCallbackUnloadDllOnCompletion(TP_CALLBACK_INSTANCE *instance, HMODULE module)
Definition: threadpool.c:2840
static void queue_move_timer(struct queue_timer *t, ULONGLONG time, BOOL set_event)
Definition: threadpool.c:715
static struct @5097 timerqueue
VOID WINAPI TpSetTimer(TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length)
Definition: threadpool.c:3116
HANDLE compl_port
Definition: threadpool.c:93
VOID WINAPI TpReleaseCleanupGroup(TP_CLEANUP_GROUP *group)
Definition: threadpool.c:2908
NTSTATUS WINAPI TpCallbackMayRunLong(TP_CALLBACK_INSTANCE *instance)
Definition: threadpool.c:2756
static void CALLBACK process_rtl_work_item(TP_CALLBACK_INSTANCE *instance, void *userdata)
Definition: threadpool.c:484
static void tp_group_shutdown(struct threadpool_group *group)
Definition: threadpool.c:1995
VOID WINAPI TpWaitForWork(TP_WORK *work, BOOL cancel_pending)
Definition: threadpool.c:3334
NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
Definition: threadpool.c:867
#define MAXIMUM_WAITQUEUE_OBJECTS
Definition: threadpool.c:145
HANDLE port
Definition: threadpool.c:371
LONG objcount
Definition: threadpool.c:291
static void queue_add_timer(struct queue_timer *t, ULONGLONG time, BOOL set_event)
Definition: threadpool.c:689
BOOL WINAPI TpSetPoolMinThreads(TP_POOL *pool, DWORD minimum)
Definition: threadpool.c:3087
static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
Definition: threadpool.c:446
static struct @5098 waitqueue
static struct list * threadpool_get_next_item(const struct threadpool *pool)
Definition: threadpool.c:2295
static void tp_object_initialize(struct threadpool_object *object, struct threadpool *pool, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
Definition: threadpool.c:2029
static void tp_waitqueue_unlock(struct threadpool_object *wait)
Definition: threadpool.c:1596
static ULONG queue_get_timeout(struct timer_queue *q)
Definition: threadpool.c:768
static NTSTATUS tp_waitqueue_lock(struct threadpool_object *wait)
Definition: threadpool.c:1518
static BOOL tp_group_release(struct threadpool_group *group)
Definition: threadpool.c:2005
threadpool_objtype
Definition: threadpool.c:167
@ TP_OBJECT_TYPE_SIMPLE
Definition: threadpool.c:168
@ TP_OBJECT_TYPE_WORK
Definition: threadpool.c:169
@ TP_OBJECT_TYPE_TIMER
Definition: threadpool.c:170
@ TP_OBJECT_TYPE_IO
Definition: threadpool.c:172
@ TP_OBJECT_TYPE_WAIT
Definition: threadpool.c:171
NTSTATUS WINAPI TpSimpleTryPost(PTP_SIMPLE_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
Definition: threadpool.c:3246
static struct threadpool * default_threadpool
Definition: threadpool.c:444
static ULONGLONG queue_current_time(void)
Definition: threadpool.c:682
void WINAPI TpReleaseIoCompletion(TP_IO *io)
Definition: threadpool.c:2994
static DWORD WINAPI timer_callback_wrapper(LPVOID p)
Definition: threadpool.c:674
static NTSTATUS tp_threadpool_alloc(struct threadpool **out)
Definition: threadpool.c:1785
VOID WINAPI TpPostWork(TP_WORK *work)
Definition: threadpool.c:2896
NTSTATUS WINAPI TpAllocWork(TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
Definition: threadpool.c:2690
NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer, HANDLE CompletionEvent)
Definition: threadpool.c:1101
static void tp_object_submit(struct threadpool_object *object, BOOL signaled)
Definition: threadpool.c:2122
static NTSTATUS tp_new_worker_thread(struct threadpool *pool)
Definition: threadpool.c:1245
NTSTATUS WINAPI TpAllocCleanupGroup(TP_CLEANUP_GROUP **out)
Definition: threadpool.c:2537
static BOOL tp_threadpool_release(struct threadpool *pool)
Definition: threadpool.c:1848
NTSTATUS WINAPI TpAllocWait(TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
Definition: threadpool.c:2680
struct list pending_timers
Definition: threadpool.c:293
static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
Definition: threadpool.c:285
static struct threadpool_object * impl_from_TP_WORK(TP_WORK *work)
Definition: threadpool.c:397
CRITICAL_SECTION cs
Definition: threadpool.c:290
NTSTATUS WINAPI TpQueryPoolStackInformation(TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info)
Definition: threadpool.c:3367
#define TIMER_QUEUE_MAGIC
Definition: threadpool.c:85
VOID WINAPI TpWaitForWait(TP_WAIT *wait, BOOL cancel_pending)
Definition: threadpool.c:3320
static struct threadpool_object * impl_from_TP_WAIT(TP_WAIT *wait)
Definition: threadpool.c:411
VOID WINAPI TpReleaseWork(TP_WORK *work)
Definition: threadpool.c:3058
VOID WINAPI TpWaitForTimer(TP_TIMER *timer, BOOL cancel_pending)
Definition: threadpool.c:3306
void WINAPI TpCancelAsyncIoOperation(TP_IO *io)
Definition: threadpool.c:2721
static BOOL object_is_finished(struct threadpool_object *object, BOOL group)
Definition: threadpool.c:2187
#define THREADPOOL_WORKER_TIMEOUT
Definition: threadpool.c:144
VOID WINAPI TpCallbackSetEventOnCompletion(TP_CALLBACK_INSTANCE *instance, HANDLE event)
Definition: threadpool.c:2827
VOID WINAPI TpReleaseWait(TP_WAIT *wait)
Definition: threadpool.c:3044
struct list buckets
Definition: threadpool.c:329
#define EXPIRE_NEVER
Definition: threadpool.c:84
static void tp_ioqueue_unlock(struct threadpool_object *io)
Definition: threadpool.c:2221
static void CALLBACK threadpool_worker_proc(void *param)
Definition: threadpool.c:2474
static void tp_threadpool_unlock(struct threadpool *pool)
Definition: threadpool.c:1950
static void WINAPI timer_queue_thread_proc(LPVOID p)
Definition: threadpool.c:793
static NTSTATUS tp_group_alloc(struct threadpool_group **out)
Definition: threadpool.c:1963
VOID WINAPI TpSetWait(TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout)
Definition: threadpool.c:3194
VOID WINAPI TpCallbackReleaseMutexOnCompletion(TP_CALLBACK_INSTANCE *instance, HANDLE mutex)
Definition: threadpool.c:2798
static struct timer_queue * get_timer_queue(HANDLE TimerQueue)
Definition: threadpool.c:957
LONG num_buckets
Definition: threadpool.c:328
static void queue_destroy_timer(struct queue_timer *t)
Definition: threadpool.c:840
static struct threadpool_instance * impl_from_TP_CALLBACK_INSTANCE(TP_CALLBACK_INSTANCE *instance)
Definition: threadpool.c:430
static DWORD CALLBACK iocp_poller(LPVOID Arg)
Definition: threadpool.c:544
static struct threadpool_object * impl_from_TP_TIMER(TP_TIMER *timer)
Definition: threadpool.c:404
static NTSTATUS tp_alloc_wait(TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment, DWORD flags)
Definition: threadpool.c:2641
BOOL WINAPI TpIsTimerSet(TP_TIMER *timer)
Definition: threadpool.c:2884
RTL_CONDITION_VARIABLE update_event
Definition: threadpool.c:294
static struct threadpool_object * impl_from_TP_IO(TP_IO *io)
Definition: threadpool.c:418
NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
Definition: threadpool.c:913
static NTSTATUS tp_ioqueue_lock(struct threadpool_object *io, HANDLE file)
Definition: threadpool.c:1727
static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
Definition: threadpool.c:88
static void CALLBACK timerqueue_thread_proc(void *param)
Definition: threadpool.c:1147
static void tp_object_execute(struct threadpool_object *object, BOOL wait_thread)
Definition: threadpool.c:2315
static void tp_timerqueue_unlock(struct threadpool_object *timer)
Definition: threadpool.c:1310
VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion(TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count)
Definition: threadpool.c:2811
VOID WINAPI TpSetPoolMaxThreads(TP_POOL *pool, DWORD maximum)
Definition: threadpool.c:3072
NTSTATUS WINAPI TpAllocIoCompletion(TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback, void *userdata, TP_CALLBACK_ENVIRON *environment)
Definition: threadpool.c:2547
static void CALLBACK waitqueue_thread_proc(void *param)
Definition: threadpool.c:1342
static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
Definition: threadpool.c:363
static void tp_object_cancel(struct threadpool_object *object)
Definition: threadpool.c:2161
NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer, DWORD DueTime, DWORD Period)
Definition: threadpool.c:1065
BOOL thread_running
Definition: threadpool.c:292
static void timer_cleanup_callback(struct queue_timer *t)
Definition: threadpool.c:660
NTSTATUS WINAPI TpSetPoolStackInformation(TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info)
Definition: threadpool.c:3348
void WINAPI TpWaitForIoCompletion(TP_IO *io, BOOL cancel_pending)
Definition: threadpool.c:3292
NTSTATUS WINAPI TpAllocPool(TP_POOL **out, PVOID reserved)
Definition: threadpool.c:2591
VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion(TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit)
Definition: threadpool.c:2743
static void queue_remove_timer(struct queue_timer *t)
Definition: threadpool.c:641
static struct threadpool * impl_from_TP_POOL(TP_POOL *pool)
Definition: threadpool.c:392
static struct @5096 old_threadpool
static void tp_threadpool_shutdown(struct threadpool *pool)
Definition: threadpool.c:1835
static void queue_timer_expire(struct timer_queue *q)
Definition: threadpool.c:723
static void tp_object_prepare_shutdown(struct threadpool_object *object)
Definition: threadpool.c:2240
static NTSTATUS tp_timerqueue_lock(struct threadpool_object *timer)
Definition: threadpool.c:1268
static NTSTATUS tp_threadpool_lock(struct threadpool **out, TP_CALLBACK_ENVIRON *environment)
Definition: threadpool.c:1877
static void CALLBACK rtl_wait_callback(TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result)
Definition: threadpool.c:3383
static void tp_object_prio_queue(struct threadpool_object *object)
Definition: threadpool.c:2110
NTSTATUS NTAPI NtReleaseSemaphore(IN HANDLE SemaphoreHandle, IN LONG ReleaseCount, OUT PLONG PreviousCount OPTIONAL)
Definition: sem.c:295
#define STATUS_SUCCESS
Definition: shellext.h:65
namespace GUID const ADDRINFOEXW ADDRINFOEXW struct timeval OVERLAPPED * overlapped
Definition: sock.c:81
INT WSAAPI shutdown(IN SOCKET s, IN INT how)
Definition: sockctrl.c:506
#define TRACE(s)
Definition: solgame.cpp:4
IMAGE_OPTIONAL_HEADER32 OptionalHeader
Definition: ntddk_ex.h:184
PVOID ImageBaseAddress
Definition: ntddk_ex.h:245
PTP_CLEANUP_GROUP CleanupGroup
Definition: winnt_old.h:4527
union _TP_CALLBACK_ENVIRON_V3::@4304 u
PTP_SIMPLE_CALLBACK FinalizationCallback
Definition: winnt_old.h:4531
TP_CALLBACK_PRIORITY CallbackPriority
Definition: winnt_old.h:4540
PTP_CLEANUP_GROUP_CANCEL_CALLBACK CleanupGroupCancelCallback
Definition: winnt_old.h:4528
struct _ACTIVATION_CONTEXT * ActivationContext
Definition: winnt_old.h:4530
struct _TP_CALLBACK_ENVIRON_V3::@4304::@4305 s
Definition: http.c:7252
Definition: fci.c:127
IO_STATUS_BLOCK iosb
Definition: threadpool.c:177
ULONG_PTR cvalue
Definition: threadpool.c:178
Definition: copy.c:22
Definition: list.h:15
Definition: module.h:456
Definition: name.c:39
BOOL destroy
Definition: threadpool.c:126
RTL_WAITORTIMERCALLBACKFUNC callback
Definition: threadpool.c:121
DWORD period
Definition: threadpool.c:123
ULONG runcount
Definition: threadpool.c:120
struct timer_queue * q
Definition: threadpool.c:118
PVOID param
Definition: threadpool.c:122
HANDLE event
Definition: threadpool.c:127
ULONG flags
Definition: threadpool.c:124
ULONGLONG expire
Definition: threadpool.c:125
struct list entry
Definition: threadpool.c:119
PVOID context
Definition: threadpool.c:81
PRTL_WORK_ITEM_ROUTINE function
Definition: threadpool.c:80
Definition: ps.c:97
struct list members
Definition: threadpool.c:280
CRITICAL_SECTION cs
Definition: threadpool.c:278
CRITICAL_SECTION * critical_section
Definition: threadpool.c:264
struct threadpool_instance::@5106 cleanup
struct threadpool_object * object
Definition: threadpool.c:258
struct list wait_entry
Definition: threadpool.c:238
PTP_WAIT_CALLBACK callback
Definition: threadpool.c:233
PTP_SIMPLE_CALLBACK callback
Definition: threadpool.c:213
PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback
Definition: threadpool.c:192
RTL_CONDITION_VARIABLE finished_event
Definition: threadpool.c:202
TP_CALLBACK_PRIORITY priority
Definition: threadpool.c:196
ULONGLONG timeout
Definition: threadpool.c:227
struct io_completion * completions
Definition: threadpool.c:250
PTP_IO_CALLBACK callback
Definition: threadpool.c:246
struct threadpool_object::@5100::@5102 work
struct threadpool * pool
Definition: threadpool.c:189
unsigned int skipped_count
Definition: threadpool.c:248
struct list group_entry
Definition: threadpool.c:198
enum threadpool_objtype type
Definition: threadpool.c:188
union threadpool_object::@5100 u
void * win32_callback
Definition: threadpool.c:184
unsigned int completion_count
Definition: threadpool.c:248
unsigned int completion_max
Definition: threadpool.c:248
unsigned int pending_count
Definition: threadpool.c:248
struct list pool_entry
Definition: threadpool.c:201
LONG num_associated_callbacks
Definition: threadpool.c:207
RTL_WAITORTIMERCALLBACKFUNC rtl_callback
Definition: threadpool.c:242
struct threadpool_object::@5100::@5101 simple
LONG num_running_callbacks
Definition: threadpool.c:206
LONG num_pending_callbacks
Definition: threadpool.c:205
PTP_WORK_CALLBACK callback
Definition: threadpool.c:217
struct threadpool_object::@5100::@5105 io
RTL_CONDITION_VARIABLE group_finished_event
Definition: threadpool.c:203
struct threadpool_object::@5100::@5103 timer
struct threadpool_object::@5100::@5104 wait
PTP_SIMPLE_CALLBACK finalization_callback
Definition: threadpool.c:193
struct list timer_entry
Definition: threadpool.c:225
struct waitqueue_bucket * bucket
Definition: threadpool.c:236
HANDLE completed_event
Definition: threadpool.c:204
PTP_TIMER_CALLBACK callback
Definition: threadpool.c:221
struct threadpool_group * group
Definition: threadpool.c:190
int max_workers
Definition: threadpool.c:158
TP_POOL_STACK_INFORMATION stack_info
Definition: threadpool.c:163
CRITICAL_SECTION cs
Definition: threadpool.c:153
RTL_CONDITION_VARIABLE update_event
Definition: threadpool.c:156
int num_workers
Definition: threadpool.c:160
LONG objcount
Definition: threadpool.c:151
int min_workers
Definition: threadpool.c:159
struct list pools[3]
Definition: threadpool.c:155
LONG refcount
Definition: threadpool.c:150
BOOL shutdown
Definition: threadpool.c:152
int num_busy_workers
Definition: threadpool.c:161
HANDLE compl_port
Definition: threadpool.c:162
Definition: dhcpd.h:245
HANDLE event
Definition: threadpool.c:136
RTL_CRITICAL_SECTION cs
Definition: threadpool.c:133
HANDLE thread
Definition: threadpool.c:137
DWORD magic
Definition: threadpool.c:132
struct list timers
Definition: threadpool.c:134
struct list waiting
Definition: threadpool.c:356
HANDLE update_event
Definition: threadpool.c:357
struct list bucket_entry
Definition: threadpool.c:353
struct list reserved
Definition: threadpool.c:355
#define max(a, b)
Definition: svc.c:63
#define LIST_INIT(head)
Definition: queue.h:197
#define LIST_ENTRY(type)
Definition: queue.h:175
#define DWORD_PTR
Definition: treelist.c:76
uint32_t * PULONG_PTR
Definition: typedefs.h:65
int64_t LONGLONG
Definition: typedefs.h:68
#define NTAPI
Definition: typedefs.h:36
ULONG_PTR SIZE_T
Definition: typedefs.h:80
uint32_t ULONG_PTR
Definition: typedefs.h:65
uint32_t ULONG
Definition: typedefs.h:59
uint64_t ULONGLONG
Definition: typedefs.h:67
#define STATUS_INVALID_PARAMETER
Definition: udferr_usr.h:135
#define STATUS_UNSUCCESSFUL
Definition: udferr_usr.h:132
static EFI_HANDLE * handles
Definition: uefidisk.c:62
LONGLONG QuadPart
Definition: typedefs.h:114
Definition: pdh_main.c:96
wchar_t tm const _CrtWcstime_Writes_and_advances_ptr_ count wchar_t ** out
Definition: wcsftime.cpp:383
_In_ WDFINTERRUPT _In_ PFN_WDF_INTERRUPT_SYNCHRONIZE Callback
Definition: wdfinterrupt.h:458
_In_ WDFTIMER _In_ LONGLONG DueTime
Definition: wdftimer.h:190
HANDLE WINAPI GetCurrentThread(void)
Definition: proc.c:1148
DWORD WINAPI GetCurrentThreadId(void)
Definition: thread.c:459
#define WAIT_OBJECT_0
Definition: winbase.h:432
_In_ PCCERT_CONTEXT _In_opt_ LPFILETIME pTime
Definition: wincrypt.h:4837
#define WINAPI
Definition: msvc.h:6
#define WT_TRANSFER_IMPERSONATION
Definition: winnt_old.h:1074
@ TP_CALLBACK_PRIORITY_NORMAL
Definition: winnt_old.h:4490
@ TP_CALLBACK_PRIORITY_HIGH
Definition: winnt_old.h:4489
@ TP_CALLBACK_PRIORITY_LOW
Definition: winnt_old.h:4491
struct _TP_CLEANUP_GROUP TP_CLEANUP_GROUP
Definition: winnt_old.h:4508
VOID(NTAPI * PTP_WAIT_CALLBACK)(PTP_CALLBACK_INSTANCE, PVOID, PTP_WAIT, TP_WAIT_RESULT)
Definition: winnt_old.h:4521
#define WT_EXECUTEINPERSISTENTTHREAD
Definition: winnt_old.h:1073
struct _TP_CALLBACK_INSTANCE * PTP_CALLBACK_INSTANCE
Definition: winnt_old.h:4480
#define WT_EXECUTEONLYONCE
Definition: winnt_old.h:1069
struct _TP_POOL TP_POOL
Definition: winnt_old.h:4478
#define WT_EXECUTEINIOTHREAD
Definition: winnt_old.h:1066
DWORD TP_WAIT_RESULT
Definition: winnt_old.h:4485
#define RTL_CRITICAL_SECTION_FLAG_FORCE_DEBUG_INFO
Definition: winnt_old.h:1116
#define WT_EXECUTELONGFUNCTION
Definition: winnt_old.h:1070
#define WT_EXECUTEINWAITTHREAD
Definition: winnt_old.h:1068
VOID(NTAPI * PTP_CLEANUP_GROUP_CANCEL_CALLBACK)(_Inout_opt_ PVOID ObjectContext, _Inout_opt_ PVOID CleanupContext)
Definition: winnt_old.h:4516
VOID(NTAPI * PTP_WORK_CALLBACK)(_Inout_ PTP_CALLBACK_INSTANCE Instance, _Inout_opt_ PVOID Context, _Inout_ PTP_WORK Work)
Definition: winnt_old.h:4503
#define WT_EXECUTEINTIMERTHREAD
Definition: winnt_old.h:1071
struct _TP_WAIT TP_WAIT
Definition: winnt_old.h:4482
struct _TP_IO * PTP_IO
Definition: winnt_old.h:4483
enum _TP_CALLBACK_PRIORITY TP_CALLBACK_PRIORITY
struct _TP_WORK TP_WORK
Definition: winnt_old.h:4479
struct _TP_IO TP_IO
Definition: winnt_old.h:4483
struct _TP_TIMER TP_TIMER
Definition: winnt_old.h:4481
VOID(NTAPI * PTP_SIMPLE_CALLBACK)(_Inout_ PTP_CALLBACK_INSTANCE Instance, _Inout_opt_ PVOID Context)
Definition: winnt_old.h:4511
VOID(NTAPI * PTP_TIMER_CALLBACK)(PTP_CALLBACK_INSTANCE, PVOID, PTP_TIMER)
Definition: winnt_old.h:4520
struct _TP_CALLBACK_INSTANCE TP_CALLBACK_INSTANCE
Definition: winnt_old.h:4480
#define WT_EXECUTEDEFAULT
Definition: winnt_old.h:1065
_Must_inspect_result_ _In_ ULONG Flags
Definition: wsk.h:170
_In_ LARGE_INTEGER _In_ ULONG Period
Definition: kefuncs.h:1313
#define RTL_CONDITION_VARIABLE_INIT
Definition: rtltypes.h:282
_Inout_opt_ PVOID Parameter
Definition: rtltypes.h:336
__wchar_t WCHAR
Definition: xmlstorage.h:180