ReactOS 0.4.16-dev-1493-ge7358c5
threadpool.c
Go to the documentation of this file.
1/*
2 * Thread pooling
3 *
4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
20 */
21
22
23#ifdef __REACTOS__
24#include <rtl_vista.h>
25#define NDEBUG
26#include "wine/list.h"
27#include <debug.h>
28
29#define ERR(fmt, ...) DPRINT1(fmt, ##__VA_ARGS__)
30#define FIXME(fmt, ...) DPRINT(fmt, ##__VA_ARGS__)
31#define WARN(fmt, ...) DPRINT(fmt, ##__VA_ARGS__)
32#define TRACE(fmt, ...) DPRINT(fmt, ##__VA_ARGS__)
33#ifndef ARRAY_SIZE
34#define ARRAY_SIZE(_x) (sizeof((_x))/sizeof((_x)[0]))
35#endif
36
37typedef struct _THREAD_NAME_INFORMATION
38{
41
47
50#define PRTL_WORK_ITEM_ROUTINE WORKERCALLBACKFUNC
51
52#define CRITICAL_SECTION RTL_CRITICAL_SECTION
53#define GetProcessHeap() RtlGetProcessHeap()
54#define GetCurrentProcess() NtCurrentProcess()
55#define GetCurrentThread() NtCurrentThread()
56#define GetCurrentThreadId() HandleToULong(NtCurrentTeb()->ClientId.UniqueThread)
57#else
58#include <assert.h>
59#include <stdarg.h>
60#include <limits.h>
61
62#include "ntstatus.h"
63#define WIN32_NO_STATUS
64#include "winternl.h"
65
66#include "wine/debug.h"
67#include "wine/list.h"
68
69#include "ntdll_misc.h"
70
72#endif
73
74/*
75 * Old thread pooling API
76 */
77
79{
82};
83
84#define EXPIRE_NEVER (~(ULONGLONG)0)
85#define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
86
87#ifndef __REACTOS__
89#endif
90
91static struct
92{
95}
97{
98 NULL, /* compl_port */
99#ifdef __REACTOS__
100 {0}, /* threadpool_compl_cs */
101#else
102 { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
103#endif
105
106#ifndef __REACTOS__
108{
109 0, 0, &old_threadpool.threadpool_compl_cs,
111 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
112};
113#endif
114
115struct timer_queue;
117{
118 struct timer_queue *q;
119 struct list entry;
120 ULONG runcount; /* number of callbacks pending execution */
126 BOOL destroy; /* timer should be deleted; once set, never unset */
127 HANDLE event; /* removal event */
128};
129
131{
134 struct list timers; /* sorted by expiration time */
135 BOOL quit; /* queue should be deleted; once set, never unset */
138};
139
140/*
141 * Object-oriented thread pooling API
142 */
143
144#define THREADPOOL_WORKER_TIMEOUT 5000
145#define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
146
147/* internal threadpool representation */
149{
154 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
155 struct list pools[3];
157 /* information about worker threads, locked via .cs */
164};
165
167{
173};
174
176{
179};
180
181/* internal threadpool object representation */
183{
184 void *win32_callback; /* leave space for kernelbase to store win32 callback */
187 /* read-only information */
197 /* information about the group, locked via .group->cs */
200 /* information about the pool, locked via .pool->cs */
208 /* arguments for callback */
209 union
210 {
211 struct
212 {
215 struct
216 {
219 struct
220 {
222 /* information about the timer, locked via timerqueue.cs */
231 struct
232 {
235 /* information about the wait object, locked via waitqueue.cs */
244 struct
245 {
247 /* locked via .pool->cs */
251 } io;
252 } u;
253};
254
255/* internal threadpool instance representation */
257{
262 struct
263 {
271};
272
273/* internal threadpool group representation */
275{
279 /* list of group members, locked via .cs */
280 struct list members;
281};
282
283#ifndef __REACTOS__
284/* global timerqueue object */
286#endif
287
288static struct
289{
295}
297{
298#ifdef __REACTOS__
299 {0}, /* cs */
300#else
301 { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
302#endif
303 0, /* objcount */
304 FALSE, /* thread_running */
305 LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
306#if __REACTOS__
307 0,
308#else
309 RTL_CONDITION_VARIABLE_INIT /* update_event */
310#endif
312
313#ifndef __REACTOS__
315{
316 0, 0, &timerqueue.cs,
318 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
319};
320
321/* global waitqueue object */
323#endif
324
325static struct
326{
329 struct list buckets;
330}
331waitqueue =
332{
333#ifdef __REACTOS__
334 {0}, /* cs */
335#else
336 { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
337#endif
338 0, /* num_buckets */
339 LIST_INIT( waitqueue.buckets ) /* buckets */
341
342#ifndef __REACTOS__
344{
345 0, 0, &waitqueue.cs,
347 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
348};
349#endif
350
352{
356 struct list waiting;
359};
360
361#ifndef __REACTOS__
362/* global I/O completion queue object */
364#endif
365
366static struct
367{
373}
374ioqueue =
375{
376#ifdef __REACTOS__
377 .cs = {0},
378#else
379 .cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
380#endif
382
383#ifndef __REACTOS__
385{
386 0, 0, &ioqueue.cs,
388 0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
389};
390#endif
391
392static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
393{
394 return (struct threadpool *)pool;
395}
396
398{
399 struct threadpool_object *object = (struct threadpool_object *)work;
401 return object;
402}
403
405{
406 struct threadpool_object *object = (struct threadpool_object *)timer;
408 return object;
409}
410
412{
413 struct threadpool_object *object = (struct threadpool_object *)wait;
415 return object;
416}
417
418static inline struct threadpool_object *impl_from_TP_IO( TP_IO *io )
419{
420 struct threadpool_object *object = (struct threadpool_object *)io;
421 assert( object->type == TP_OBJECT_TYPE_IO );
422 return object;
423}
424
426{
427 return (struct threadpool_group *)group;
428}
429
431{
432 return (struct threadpool_instance *)instance;
433}
434
435#ifdef __REACTOS__
437#else
438static void CALLBACK threadpool_worker_proc( void *param );
439#endif
440static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
441static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread );
442static void tp_object_prepare_shutdown( struct threadpool_object *object );
443static BOOL tp_object_release( struct threadpool_object *object );
445
446static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
447{
448 unsigned int new_capacity, max_capacity;
449 void *new_elements;
450
451 if (count <= *capacity)
452 return TRUE;
453
454 max_capacity = ~(SIZE_T)0 / size;
455 if (count > max_capacity)
456 return FALSE;
457
458 new_capacity = max(4, *capacity);
459 while (new_capacity < count && new_capacity <= max_capacity / 2)
460 new_capacity *= 2;
461 if (new_capacity < count)
462 new_capacity = max_capacity;
463
464 if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size )))
465 return FALSE;
466
467 *elements = new_elements;
468 *capacity = new_capacity;
469
470 return TRUE;
471}
472
473static void set_thread_name(const WCHAR *name)
474{
475#ifndef __REACTOS__ // This is impossible on non vista+
477
478 RtlInitUnicodeString(&info.ThreadName, name);
480#endif
481}
482
483#ifndef __REACTOS__
485{
486 struct rtl_work_item *item = userdata;
487
488 TRACE("executing %p(%p)\n", item->function, item->context);
489 item->function( item->context );
490
492}
493
494/***********************************************************************
495 * RtlQueueWorkItem (NTDLL.@)
496 *
497 * Queues a work item into a thread in the thread pool.
498 *
499 * PARAMS
500 * function [I] Work function to execute.
501 * context [I] Context to pass to the work function when it is executed.
502 * flags [I] Flags. See notes.
503 *
504 * RETURNS
505 * Success: STATUS_SUCCESS.
506 * Failure: Any NTSTATUS code.
507 *
508 * NOTES
509 * Flags can be one or more of the following:
510 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
511 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
512 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
513 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
514 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
515 */
517{
518 TP_CALLBACK_ENVIRON environment;
519 struct rtl_work_item *item;
521
522 TRACE( "%p %p %lu\n", function, context, flags );
523
524 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
525 if (!item)
526 return STATUS_NO_MEMORY;
527
528 memset( &environment, 0, sizeof(environment) );
529 environment.Version = 1;
530 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
531 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
532
533 item->function = function;
534 item->context = context;
535
538 return status;
539}
540
541/***********************************************************************
542 * iocp_poller - get completion events and run callbacks
543 */
545{
546 HANDLE cport = Arg;
547
548 while( TRUE )
549 {
553#ifdef __REACTOS__
555#else
557#endif
558 if (res)
559 {
560 ERR("NtRemoveIoCompletion failed: 0x%lx\n", res);
561 }
562 else
563 {
564 DWORD transferred = 0;
565 DWORD err = 0;
566
568 transferred = iosb.Information;
569 else
571
572 callback( err, transferred, overlapped );
573 }
574 }
575 return 0;
576}
577
578/***********************************************************************
579 * RtlSetIoCompletionCallback (NTDLL.@)
580 *
581 * Binds a handle to a thread pool's completion port, and possibly
582 * starts a non-I/O thread to monitor this port and call functions back.
583 *
584 * PARAMS
585 * FileHandle [I] Handle to bind to a completion port.
586 * Function [I] Callback function to call on I/O completions.
587 * Flags [I] Not used.
588 *
589 * RETURNS
590 * Success: STATUS_SUCCESS.
591 * Failure: Any NTSTATUS code.
592 *
593 */
595{
598
599 if (Flags) FIXME("Unknown value Flags=0x%lx\n", Flags);
600
601 if (!old_threadpool.compl_port)
602 {
604
605 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
606 if (!old_threadpool.compl_port)
607 {
608 HANDLE cport;
609
611 if (!res)
612 {
613 /* FIXME native can start additional threads in case of e.g. hung callback function. */
615 if (!res)
616 old_threadpool.compl_port = cport;
617 else
618 NtClose( cport );
619 }
620 }
621 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
622 if (res) return res;
623 }
624
625 info.CompletionPort = old_threadpool.compl_port;
626 info.CompletionKey = (ULONG_PTR)Function;
627
629}
630#endif
631
633{
634 if (timeout == INFINITE) return NULL;
635 pTime->QuadPart = (ULONGLONG)timeout * -10000;
636 return pTime;
637}
638
639/************************** Timer Queue Impl **************************/
640
641static void queue_remove_timer(struct queue_timer *t)
642{
643 /* We MUST hold the queue cs while calling this function. This ensures
644 that we cannot queue another callback for this timer. The runcount
645 being zero makes sure we don't have any already queued. */
646 struct timer_queue *q = t->q;
647
648 assert(t->runcount == 0);
649 assert(t->destroy);
650
651 list_remove(&t->entry);
652 if (t->event)
653 NtSetEvent(t->event, NULL);
655
656 if (q->quit && list_empty(&q->timers))
657 NtSetEvent(q->event, NULL);
658}
659
661{
662 struct timer_queue *q = t->q;
664
665 assert(0 < t->runcount);
666 --t->runcount;
667
668 if (t->destroy && t->runcount == 0)
670
672}
673
675{
676 struct queue_timer *t = p;
677 t->callback(t->param, TRUE);
679 return 0;
680}
681
682static inline ULONGLONG queue_current_time(void)
683{
684 LARGE_INTEGER now, freq;
686 return now.QuadPart * 1000 / freq.QuadPart;
687}
688
691{
692 /* We MUST hold the queue cs while calling this function. */
693 struct timer_queue *q = t->q;
694 struct list *ptr = &q->timers;
695
696 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
697
698 if (time != EXPIRE_NEVER)
699 LIST_FOR_EACH(ptr, &q->timers)
700 {
701 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
702 if (time < cur->expire)
703 break;
704 }
705 list_add_before(ptr, &t->entry);
706
707 t->expire = time;
708
709 /* If we insert at the head of the list, we need to expire sooner
710 than expected. */
711 if (set_event && &t->entry == list_head(&q->timers))
712 NtSetEvent(q->event, NULL);
713}
714
715static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
717{
718 /* We MUST hold the queue cs while calling this function. */
719 list_remove(&t->entry);
721}
722
723static void queue_timer_expire(struct timer_queue *q)
724{
725 struct queue_timer *t = NULL;
726
728 if (list_head(&q->timers))
729 {
731 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
732 if (!t->destroy && t->expire <= ((now = queue_current_time())))
733 {
734 ++t->runcount;
735 if (t->period)
736 {
737 next = t->expire + t->period;
738 /* avoid trigger cascade if overloaded / hibernated */
739 if (next < now)
740 next = now + t->period;
741 }
742 else
745 }
746 else
747 t = NULL;
748 }
750
751 if (t)
752 {
753 if (t->flags & WT_EXECUTEINTIMERTHREAD)
755 else
756 {
758 = (t->flags
762 if (status != STATUS_SUCCESS)
764 }
765 }
766}
767
769{
770 struct queue_timer *t;
772
774 if (list_head(&q->timers))
775 {
776 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
777 assert(!t->destroy || t->expire == EXPIRE_NEVER);
778
779 if (t->expire != EXPIRE_NEVER)
780 {
782 timeout = t->expire < time ? 0 : t->expire - time;
783 }
784 }
786
787 return timeout;
788}
789
790#ifdef __REACTOS__
792#else
794#endif
795{
796 struct timer_queue *q = p;
797 ULONG timeout_ms;
798
799 set_thread_name(L"wine_threadpool_timer_queue");
800 timeout_ms = INFINITE;
801 for (;;)
802 {
805 BOOL done = FALSE;
806
808 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
809
810 if (status == STATUS_WAIT_0)
811 {
812 /* There are two possible ways to trigger the event. Either
813 we are quitting and the last timer got removed, or a new
814 timer got put at the head of the list so we need to adjust
815 our timeout. */
817 if (q->quit && list_empty(&q->timers))
818 done = TRUE;
820 }
821 else if (status == STATUS_TIMEOUT)
823
824 if (done)
825 break;
826
827 timeout_ms = queue_get_timeout(q);
828 }
829
830 NtClose(q->event);
832 q->magic = 0;
835#ifdef __REACTOS__
836 return STATUS_SUCCESS;
837#endif
838}
839
840#ifndef __REACTOS__
841
843{
844 /* We MUST hold the queue cs while calling this function. */
845 t->destroy = TRUE;
846 if (t->runcount == 0)
847 /* Ensure a timer is promptly removed. If callbacks are pending,
848 it will be removed after the last one finishes by the callback
849 cleanup wrapper. */
851 else
852 /* Make sure no destroyed timer masks an active timer at the head
853 of the sorted list. */
855}
856
857/***********************************************************************
858 * RtlCreateTimerQueue (NTDLL.@)
859 *
860 * Creates a timer queue object and returns a handle to it.
861 *
862 * PARAMS
863 * NewTimerQueue [O] The newly created queue.
864 *
865 * RETURNS
866 * Success: STATUS_SUCCESS.
867 * Failure: Any NTSTATUS code.
868 */
870{
872 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
873 if (!q)
874 return STATUS_NO_MEMORY;
875
877 list_init(&q->timers);
878 q->quit = FALSE;
879 q->magic = TIMER_QUEUE_MAGIC;
881 if (status != STATUS_SUCCESS)
882 {
884 return status;
885 }
887 timer_queue_thread_proc, q, &q->thread, NULL);
888 if (status != STATUS_SUCCESS)
889 {
890 NtClose(q->event);
892 return status;
893 }
894
895 *NewTimerQueue = q;
896 return STATUS_SUCCESS;
897}
898
899/***********************************************************************
900 * RtlDeleteTimerQueueEx (NTDLL.@)
901 *
902 * Deletes a timer queue object.
903 *
904 * PARAMS
905 * TimerQueue [I] The timer queue to destroy.
906 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
907 * wait until all timers are finished firing before
908 * returning. Otherwise, return immediately and set the
909 * event when all timers are done.
910 *
911 * RETURNS
912 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
913 * Failure: Any NTSTATUS code.
914 */
916{
917 struct timer_queue *q = TimerQueue;
918 struct queue_timer *t, *temp;
921
922 if (!q || q->magic != TIMER_QUEUE_MAGIC)
924
925 thread = q->thread;
926
928 q->quit = TRUE;
929 if (list_head(&q->timers))
930 /* When the last timer is removed, it will signal the timer thread to
931 exit... */
932 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
934 else
935 /* However if we have none, we must do it ourselves. */
936 NtSetEvent(q->event, NULL);
938
939 if (CompletionEvent == INVALID_HANDLE_VALUE)
940 {
943 }
944 else
945 {
946 if (CompletionEvent)
947 {
948 FIXME("asynchronous return on completion event unimplemented\n");
950 NtSetEvent(CompletionEvent, NULL);
951 }
953 }
954
956 return status;
957}
958
959static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
960{
961 static struct timer_queue *default_timer_queue;
962
963 if (TimerQueue)
964 return TimerQueue;
965 else
966 {
967 if (!default_timer_queue)
968 {
969 HANDLE q;
971 if (status == STATUS_SUCCESS)
972 {
973 PVOID p = InterlockedCompareExchangePointer( (void **) &default_timer_queue, q, NULL );
974 if (p)
975 /* Got beat to the punch. */
977 }
978 }
979 return default_timer_queue;
980 }
981}
982
983/***********************************************************************
984 * RtlCreateTimer (NTDLL.@)
985 *
986 * Creates a new timer associated with the given queue.
987 *
988 * PARAMS
989 * TimerQueue [I] The queue to hold the timer.
990 * NewTimer [O] The newly created timer.
991 * Callback [I] The callback to fire.
992 * Parameter [I] The argument for the callback.
993 * DueTime [I] The delay, in milliseconds, before first firing the
994 * timer.
995 * Period [I] The period, in milliseconds, at which to fire the timer
996 * after the first callback. If zero, the timer will only
997 * fire once. It still needs to be deleted with
998 * RtlDeleteTimer.
999 * Flags [I] Flags controlling the execution of the callback. In
1000 * addition to the WT_* thread pool flags (see
1001 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
1002 * WT_EXECUTEONLYONCE are supported.
1003 *
1004 * RETURNS
1005 * Success: STATUS_SUCCESS.
1006 * Failure: Any NTSTATUS code.
1007 */
1011 ULONG Flags)
1012{
1014 struct queue_timer *t;
1015 struct timer_queue *q = get_timer_queue(TimerQueue);
1016
1017 if (!q) return STATUS_NO_MEMORY;
1018 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
1019
1020 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
1021 if (!t)
1022 return STATUS_NO_MEMORY;
1023
1024 t->q = q;
1025 t->runcount = 0;
1026 t->callback = Callback;
1027 t->param = Parameter;
1028 t->period = Period;
1029 t->flags = Flags;
1030 t->destroy = FALSE;
1031 t->event = NULL;
1032
1035 if (q->quit)
1037 else
1040
1041 if (status == STATUS_SUCCESS)
1042 *NewTimer = t;
1043 else
1045
1046 return status;
1047}
1048
1049/***********************************************************************
1050 * RtlUpdateTimer (NTDLL.@)
1051 *
1052 * Changes the time at which a timer expires.
1053 *
1054 * PARAMS
1055 * TimerQueue [I] The queue that holds the timer.
1056 * Timer [I] The timer to update.
1057 * DueTime [I] The delay, in milliseconds, before next firing the timer.
1058 * Period [I] The period, in milliseconds, at which to fire the timer
1059 * after the first callback. If zero, the timer will not
1060 * fire again. It still needs to be deleted with
1061 * RtlDeleteTimer.
1062 *
1063 * RETURNS
1064 * Success: STATUS_SUCCESS.
1065 * Failure: Any NTSTATUS code.
1066 */
1069{
1070 struct queue_timer *t = Timer;
1071 struct timer_queue *q = t->q;
1072
1074 /* Can't change a timer if it was once-only or destroyed. */
1075 if (t->expire != EXPIRE_NEVER)
1076 {
1077 t->period = Period;
1079 }
1081
1082 return STATUS_SUCCESS;
1083}
1084
1085/***********************************************************************
1086 * RtlDeleteTimer (NTDLL.@)
1087 *
1088 * Cancels a timer-queue timer.
1089 *
1090 * PARAMS
1091 * TimerQueue [I] The queue that holds the timer.
1092 * Timer [I] The timer to delete.
1093 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1094 * wait until the timer is finished firing all pending
1095 * callbacks before returning. Otherwise, return
1096 * immediately and set the event when the timer is done.
1097 *
1098 * RETURNS
1099 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1100 or if the completion event is NULL.
1101 * Failure: Any NTSTATUS code.
1102 */
1104 HANDLE CompletionEvent)
1105{
1106 struct queue_timer *t = Timer;
1107 struct timer_queue *q;
1109 HANDLE event = NULL;
1110
1111 if (!Timer)
1113 q = t->q;
1114 if (CompletionEvent == INVALID_HANDLE_VALUE)
1115 {
1117 if (status == STATUS_SUCCESS)
1119 }
1120 else if (CompletionEvent)
1121 event = CompletionEvent;
1122
1124 t->event = event;
1125 if (t->runcount == 0 && event)
1129
1130 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1131 {
1132 if (status == STATUS_PENDING)
1133 {
1136 }
1137 NtClose(event);
1138 }
1139
1140 return status;
1141}
1142#endif
1143/***********************************************************************
1144 * timerqueue_thread_proc (internal)
1145 */
1146#ifdef __REACTOS__
1148#else
1150#endif
1151{
1152 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1153 struct threadpool_object *other_timer;
1155 struct list *ptr;
1156
1157 TRACE( "starting timer queue thread\n" );
1158 set_thread_name(L"wine_threadpool_timerqueue");
1159
1161 for (;;)
1162 {
1164
1165 /* Check for expired timers. */
1166 while ((ptr = list_head( &timerqueue.pending_timers )))
1167 {
1168 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1169 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1170 assert( timer->u.timer.timer_pending );
1171 if (timer->u.timer.timeout > now.QuadPart)
1172 break;
1173
1174 /* Queue a new callback in one of the worker threads. */
1175 list_remove( &timer->u.timer.timer_entry );
1176 timer->u.timer.timer_pending = FALSE;
1178
1179 /* Insert the timer back into the queue, except it's marked for shutdown. */
1180 if (timer->u.timer.period && !timer->shutdown)
1181 {
1182 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1183 if (timer->u.timer.timeout <= now.QuadPart)
1184 timer->u.timer.timeout = now.QuadPart + 1;
1185
1186 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1187 struct threadpool_object, u.timer.timer_entry )
1188 {
1189 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1190 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1191 break;
1192 }
1193 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1194 timer->u.timer.timer_pending = TRUE;
1195 }
1196 }
1197
1198 timeout_lower = timeout_upper = MAXLONGLONG;
1199
1200 /* Determine next timeout and use the window length to optimize wakeup times. */
1201 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1202 struct threadpool_object, u.timer.timer_entry )
1203 {
1204 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1205 if (other_timer->u.timer.timeout >= timeout_upper)
1206 break;
1207
1208 timeout_lower = other_timer->u.timer.timeout;
1209 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1210 if (new_timeout < timeout_upper)
1211 timeout_upper = new_timeout;
1212 }
1213
1214 /* Wait for timer update events or until the next timer expires. */
1215 if (timerqueue.objcount)
1216 {
1217 timeout.QuadPart = timeout_lower;
1219 continue;
1220 }
1221
1222 /* All timers have been destroyed, if no new timers are created
1223 * within some amount of time, then we can shutdown this thread. */
1224 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1225 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1226 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1227 {
1228 break;
1229 }
1230 }
1231
1232 timerqueue.thread_running = FALSE;
1234
1235 TRACE( "terminating timer queue thread\n" );
1236 RtlExitUserThread( 0 );
1237#ifdef __REACTOS__
1238 return STATUS_SUCCESS;
1239#endif
1240}
1241
1242/***********************************************************************
1243 * tp_new_worker_thread (internal)
1244 *
1245 * Create and account a new worker thread for the desired pool.
1246 */
1248{
1249 HANDLE thread;
1251
1253 pool->stack_info.StackReserve, pool->stack_info.StackCommit,
1255 if (status == STATUS_SUCCESS)
1256 {
1257 InterlockedIncrement( &pool->refcount );
1258 pool->num_workers++;
1259 NtClose( thread );
1260 }
1261 return status;
1262}
1263
1264/***********************************************************************
1265 * tp_timerqueue_lock (internal)
1266 *
1267 * Acquires a lock on the global timerqueue. When the lock is acquired
1268 * successfully, it is guaranteed that the timer thread is running.
1269 */
1271{
1273 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1274
1275 timer->u.timer.timer_initialized = FALSE;
1276 timer->u.timer.timer_pending = FALSE;
1277 timer->u.timer.timer_set = FALSE;
1278 timer->u.timer.timeout = 0;
1279 timer->u.timer.period = 0;
1280 timer->u.timer.window_length = 0;
1281
1283
1284 /* Make sure that the timerqueue thread is running. */
1285 if (!timerqueue.thread_running)
1286 {
1287 HANDLE thread;
1290 if (status == STATUS_SUCCESS)
1291 {
1292 timerqueue.thread_running = TRUE;
1293 NtClose( thread );
1294 }
1295 }
1296
1297 if (status == STATUS_SUCCESS)
1298 {
1299 timer->u.timer.timer_initialized = TRUE;
1300 timerqueue.objcount++;
1301 }
1302
1304 return status;
1305}
1306
1307/***********************************************************************
1308 * tp_timerqueue_unlock (internal)
1309 *
1310 * Releases a lock on the global timerqueue.
1311 */
1313{
1314 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1315
1317 if (timer->u.timer.timer_initialized)
1318 {
1319 /* If timer was pending, remove it. */
1320 if (timer->u.timer.timer_pending)
1321 {
1322 list_remove( &timer->u.timer.timer_entry );
1323 timer->u.timer.timer_pending = FALSE;
1324 }
1325
1326 /* If the last timer object was destroyed, then wake up the thread. */
1327 if (!--timerqueue.objcount)
1328 {
1329 assert( list_empty( &timerqueue.pending_timers ) );
1330 RtlWakeAllConditionVariable( &timerqueue.update_event );
1331 }
1332
1333 timer->u.timer.timer_initialized = FALSE;
1334 }
1336}
1337
1338/***********************************************************************
1339 * waitqueue_thread_proc (internal)
1340 */
1341#ifdef __REACTOS__
1343#else
1345#endif
1346{
1349 struct waitqueue_bucket *bucket = param;
1350 struct threadpool_object *wait, *next;
1352 DWORD num_handles;
1354
1355 TRACE( "starting wait queue thread\n" );
1356 set_thread_name(L"wine_threadpool_waitqueue");
1357
1359
1360 for (;;)
1361 {
1363 timeout.QuadPart = MAXLONGLONG;
1364 num_handles = 0;
1365
1367 u.wait.wait_entry )
1368 {
1369 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1370 if (wait->u.wait.timeout <= now.QuadPart)
1371 {
1372 /* Wait object timed out. */
1373 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1374 {
1375 list_remove( &wait->u.wait.wait_entry );
1376 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1377 }
1378 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1379 {
1380 InterlockedIncrement( &wait->refcount );
1381 wait->num_pending_callbacks++;
1382 RtlEnterCriticalSection( &wait->pool->cs );
1384 RtlLeaveCriticalSection( &wait->pool->cs );
1386 }
1387 else tp_object_submit( wait, FALSE );
1388 }
1389 else
1390 {
1391 if (wait->u.wait.timeout < timeout.QuadPart)
1392 timeout.QuadPart = wait->u.wait.timeout;
1393
1394 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1395 InterlockedIncrement( &wait->refcount );
1396 objects[num_handles] = wait;
1397 handles[num_handles] = wait->u.wait.handle;
1398 num_handles++;
1399 }
1400 }
1401
1402 if (!bucket->objcount)
1403 {
1404 /* All wait objects have been destroyed, if no new wait objects are created
1405 * within some amount of time, then we can shutdown this thread. */
1406 assert( num_handles == 0 );
1408 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1409 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout );
1411
1412 if (status == STATUS_TIMEOUT && !bucket->objcount)
1413 break;
1414 }
1415 else
1416 {
1417 handles[num_handles] = bucket->update_event;
1419 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout );
1421
1422 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1423 {
1425 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1426 if (wait->u.wait.bucket)
1427 {
1428 /* Wait object signaled. */
1429 assert( wait->u.wait.bucket == bucket );
1430 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1431 {
1432 list_remove( &wait->u.wait.wait_entry );
1433 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1434 }
1435 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1436 {
1437 wait->u.wait.signaled++;
1438 wait->num_pending_callbacks++;
1439 RtlEnterCriticalSection( &wait->pool->cs );
1441 RtlLeaveCriticalSection( &wait->pool->cs );
1442 }
1443 else tp_object_submit( wait, TRUE );
1444 }
1445 else
1446 WARN("wait object %p triggered while object was destroyed\n", wait);
1447 }
1448
1449 /* Release temporary references to wait objects. */
1450 while (num_handles)
1451 {
1452 wait = objects[--num_handles];
1453 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1455 }
1456 }
1457
1458 /* Try to merge bucket with other threads. */
1459 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1460 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1461 {
1462 struct waitqueue_bucket *other_bucket;
1463 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1464 {
1465 if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable &&
1466 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1467 {
1468 other_bucket->objcount += bucket->objcount;
1469 bucket->objcount = 0;
1470
1471 /* Update reserved list. */
1472 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1473 {
1474 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1475 wait->u.wait.bucket = other_bucket;
1476 }
1477 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1478
1479 /* Update waiting list. */
1480 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1481 {
1482 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1483 wait->u.wait.bucket = other_bucket;
1484 }
1485 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1486
1487 /* Move bucket to the end, to keep the probability of
1488 * newly added wait objects as small as possible. */
1489 list_remove( &bucket->bucket_entry );
1490 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1491
1492 NtSetEvent( other_bucket->update_event, NULL );
1493 break;
1494 }
1495 }
1496 }
1497 }
1498
1499 /* Remove this bucket from the list. */
1500 list_remove( &bucket->bucket_entry );
1501 if (!--waitqueue.num_buckets)
1502 assert( list_empty( &waitqueue.buckets ) );
1503
1505
1506 TRACE( "terminating wait queue thread\n" );
1507
1508 assert( bucket->objcount == 0 );
1509 assert( list_empty( &bucket->reserved ) );
1510 assert( list_empty( &bucket->waiting ) );
1511 NtClose( bucket->update_event );
1512
1513 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1514 RtlExitUserThread( 0 );
1515}
1516
1517/***********************************************************************
1518 * tp_waitqueue_lock (internal)
 *
 * Resets the wait state stored in wait->u.wait and reserves a slot for the
 * object in a wait queue bucket. An existing bucket is reused when it holds
 * fewer than MAXIMUM_WAITQUEUE_OBJECTS entries and has the same alertable
 * mode; otherwise a new bucket is allocated and, when 'status' is
 * STATUS_SUCCESS, appended to waitqueue.buckets. Returns 'status'.
 *
 * NOTE(review): the embedded line numbering has gaps in this function
 * (1520, 1523, 1534, 1545, 1554, 1563-1564, 1571-1572, 1591), so the
 * signature, the declaration of 'status' and several statements are
 * missing from this listing. Presumably they take/release the wait queue
 * lock and create the bucket's event and thread -- confirm against the
 * real file.
1519 */
1521{
1522 struct waitqueue_bucket *bucket;
1524 HANDLE thread;
1525 BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0;
1526 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1527
 /* Start from a clean wait state: nothing signaled, no bucket, no handle. */
1528 wait->u.wait.signaled = 0;
1529 wait->u.wait.bucket = NULL;
1530 wait->u.wait.wait_pending = FALSE;
1531 wait->u.wait.timeout = 0;
1532 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1533
1535
1536 /* Try to assign to existing bucket if possible. */
1537 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1538 {
1539 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable)
1540 {
1541 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );  /* new waits go on the 'reserved' list */
1542 wait->u.wait.bucket = bucket;
1543 bucket->objcount++;
1544
1546 goto out;
1547 }
1548 }
1549
1550 /* Create a new bucket and corresponding worker thread. */
1551 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1552 if (!bucket)
1553 {
1555 goto out;
1556 }
1557
1558 bucket->objcount = 0;
1559 bucket->alertable = alertable;
1560 list_init( &bucket->reserved );
1561 list_init( &bucket->waiting );
1562
 /* NOTE(review): the call that sets 'status' here is not in the listing. */
1565 if (status)
1566 {
1567 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1568 goto out;
1569 }
1570
 /* NOTE(review): the call that sets 'status' and 'thread' is not in the listing. */
1573 if (status == STATUS_SUCCESS)
1574 {
 /* Publish the bucket, then reserve the first slot in it for this wait. */
1575 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1576 waitqueue.num_buckets++;
1577
1578 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1579 wait->u.wait.bucket = bucket;
1580 bucket->objcount++;
1581
1582 NtClose( thread );
1583 }
1584 else
1585 {
 /* 'status' is a failure code: close the event and free the bucket. */
1586 NtClose( bucket->update_event );
1587 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1588 }
1589
1590out:
1592 return status;
1593}
1594
1595/***********************************************************************
1596 * tp_waitqueue_unlock (internal)
 *
 * Detaches a wait object from its bucket, if it has one: the object is
 * unlinked from the list it is on, its bucket pointer is cleared, the
 * bucket's objcount is decremented and the bucket's update_event is set.
 *
 * NOTE(review): source lines 1602 and 1614 are absent from this listing;
 * presumably they enter and leave the wait queue lock -- confirm.
1597 */
1598static void tp_waitqueue_unlock( struct threadpool_object *wait )
1599{
1600 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1601
1603 if (wait->u.wait.bucket)
1604 {
1605 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1606 assert( bucket->objcount > 0 );
1607
1608 list_remove( &wait->u.wait.wait_entry );
1609 wait->u.wait.bucket = NULL;
1610 bucket->objcount--;
1611
1612 NtSetEvent( bucket->update_event, NULL );  /* signal the bucket that its object set changed */
1613 }
1615}
1616
/*
 * I/O completion thread routine: removes completion packets from
 * ioqueue.port in a loop and records them on the threadpool_object found
 * in the completion key.
 *
 * NOTE(review): the signature lines (1618, 1620) are missing from this
 * listing; presumably this is ioqueue_thread_proc, the routine passed to
 * the thread creation call at source line 1749. Other gaps (e.g. 1625,
 * 1629, 1632, 1637, 1641, 1644, 1673, 1700, 1718) hide the declarations
 * of 'status' and 'iosb' and, presumably, the ioqueue lock handling and
 * the submit/release calls -- confirm against the real file.
 */
1617#ifdef __REACTOS__
1619#else
1621#endif
1622{
1623 struct io_completion *completion;
1624 struct threadpool_object *io;
1626#ifdef __REACTOS__
1627 PVOID key, value;
1628#else
1630#endif
1631 BOOL destroy, skip;
1633
1634 TRACE( "starting I/O completion thread\n" );
1635 set_thread_name(L"wine_threadpool_ioqueue");
1636
1638
1639 for (;;)
1640 {
1642 if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
1643 ERR("NtRemoveIoCompletion failed, status %#lx.\n", status);
1645
1646 destroy = skip = FALSE;
1647 io = (struct threadpool_object *)key;  /* the completion key is the I/O object (may be NULL) */
1648
1649 TRACE( "io %p, iosb.Status %#lx.\n", io, iosb.Status );
1650
 /* Object is being shut down: once no operations are pending, consume
  * one skipped completion; the object is destroyed when none remain. */
1651 if (io && (io->shutdown || io->u.io.shutting_down))
1652 {
1653 RtlEnterCriticalSection( &io->pool->cs );
1654 if (!io->u.io.pending_count)
1655 {
1656 if (io->u.io.skipped_count)
1657 --io->u.io.skipped_count;
1658
1659 if (io->u.io.skipped_count)
1660 skip = TRUE;
1661 else
1662 destroy = TRUE;
1663 }
1664 RtlLeaveCriticalSection( &io->pool->cs );
1665 if (skip) continue;
1666 }
1667
1668 if (destroy)
1669 {
1670 --ioqueue.objcount;
1671 TRACE( "Releasing io %p.\n", io );
1672 io->shutdown = TRUE;
1674 }
1675 else if (io)
1676 {
1677 RtlEnterCriticalSection( &io->pool->cs );
1678
1679 TRACE( "pending_count %u.\n", io->u.io.pending_count );
1680
1681 if (io->u.io.pending_count)
1682 {
1683 --io->u.io.pending_count;
 /* Grow the completions array so one more entry fits. */
1684 if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
1685 io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
1686 {
1687 ERR( "Failed to allocate memory.\n" );
1688 RtlLeaveCriticalSection( &io->pool->cs );
1689 continue;
1690 }
1691
 /* Append the packet; tp_object_execute later pops it from the end. */
1692 completion = &io->u.io.completions[io->u.io.completion_count++];
1693 completion->iosb = iosb;
1694#ifdef __REACTOS__
1695 completion->cvalue = (ULONG_PTR)value;
1696#else
1697 completion->cvalue = value;
1698#endif
1699
1701 }
1702 RtlLeaveCriticalSection( &io->pool->cs );
1703 }
1704
1705 if (!ioqueue.objcount)
1706 {
1707 /* All I/O objects have been destroyed; if no new objects are
1708 * created within some amount of time, then we can shutdown this
1709 * thread. */
1710 LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
1711 if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
1712 &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
1713 break;
1714 }
1715 }
1716
1717 ioqueue.thread_running = FALSE;  /* tp_ioqueue_lock checks this flag before starting a thread */
1719
1720 TRACE( "terminating I/O completion thread\n" );
1721
1722 RtlExitUserThread( 0 );
1723
1724#ifdef __REACTOS__
1725 return STATUS_SUCCESS;
1726#endif
1727}
1728
/*
 * Registers an I/O object with the global I/O queue: creates ioqueue.port
 * if it does not exist yet, starts the completion thread when
 * ioqueue.thread_running is not set, fills a completion-information
 * structure with the port and the object as key, and on success
 * increments ioqueue.objcount (waking ioqueue.update_event when the count
 * was zero). Returns 'status'.
 *
 * NOTE(review): the signature (source line 1729) is missing from this
 * listing; presumably this is tp_ioqueue_lock, which TpAllocIoCompletion
 * calls with (object, file). Further gaps (1731, 1735, 1738, 1740, 1748,
 * 1758-1759, 1769, 1778) hide the 'status' and 'info' declarations,
 * presumably the ioqueue lock handling, and the call that consumes
 * 'info' -- confirm against the real file.
 */
1730{
1732
1733 assert( io->type == TP_OBJECT_TYPE_IO );
1734
1736
 /* The completion port is created on first use. */
1737 if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
1739 {
1741 return status;
1742 }
1743
1744 if (!ioqueue.thread_running)
1745 {
1746 HANDLE thread;
1747
1749 0, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL )))
1750 {
1751 ioqueue.thread_running = TRUE;
1752 NtClose( thread );  /* the handle is not needed once the thread runs */
1753 }
1754 }
1755
1756 if (status == STATUS_SUCCESS)
1757 {
1760
1761#ifdef __REACTOS__
1762 info.Port = ioqueue.port;
1763 info.Key = io;
1764#else
1765 info.CompletionPort = ioqueue.port;
1766 info.CompletionKey = (ULONG_PTR)io;
1767#endif
1768
1770 }
1771
1772 if (status == STATUS_SUCCESS)
1773 {
 /* First object: wake the completion thread's idle wait on update_event. */
1774 if (!ioqueue.objcount++)
1775 RtlWakeConditionVariable( &ioqueue.update_event );
1776 }
1777
1779 return status;
1780}
1781
1782/***********************************************************************
1783 * tp_threadpool_alloc (internal)
1784 *
1785 * Allocates a new threadpool object.
 *
 * The pool starts with refcount 1, no objects, no workers, a worker limit
 * of 0..500 and stack sizes copied from the NT headers referenced by 'nt'.
 * Stores the pool in *out and returns STATUS_SUCCESS, or returns
 * STATUS_NO_MEMORY when the heap allocation fails.
 *
 * NOTE(review): source lines 1787, 1792, 1806 and 1808 are absent from
 * this listing (the signature, the non-ReactOS definition of 'nt' and,
 * presumably, the critical section initialization) -- confirm.
1786 */
1788{
1789#ifdef __REACTOS__
1790 IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->ProcessEnvironmentBlock->ImageBaseAddress );
1791#else
1793#endif
1794 struct threadpool *pool;
1795 unsigned int i;
1796
1797 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1798 if (!pool)
1799 return STATUS_NO_MEMORY;
1800
1801 pool->refcount = 1;
1802 pool->objcount = 0;
1803 pool->shutdown = FALSE;
1804
1805#ifdef __REACTOS__
1807#else
1809
1810 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1811#endif
1812
 /* One work list per entry of pool->pools (indexed by object priority). */
1813 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1814 list_init( &pool->pools[i] );
1815 RtlInitializeConditionVariable( &pool->update_event );
1816
1817 pool->max_workers = 500;
1818 pool->min_workers = 0;
1819 pool->num_workers = 0;
1820 pool->num_busy_workers = 0;
1821 pool->stack_info.StackReserve = nt->OptionalHeader.SizeOfStackReserve;
1822 pool->stack_info.StackCommit = nt->OptionalHeader.SizeOfStackCommit;
1823
1824 TRACE( "allocated threadpool %p\n", pool );
1825
1826 *out = pool;
1827 return STATUS_SUCCESS;
1828}
1829
1830/***********************************************************************
1831 * tp_threadpool_shutdown (internal)
1832 *
1833 * Prepares the shutdown of a threadpool object and notifies all worker
1834 * threads to terminate (after all remaining work items have been
1835 * processed).
 *
 * NOTE(review): source lines 1837 (signature) and 1839 are absent from
 * this listing -- confirm against the real file.
1836 */
1838{
1840
1841 pool->shutdown = TRUE;
1842 RtlWakeAllConditionVariable( &pool->update_event );  /* wake every thread sleeping on update_event */
1843}
1844
1845/***********************************************************************
1846 * tp_threadpool_release (internal)
1847 *
1848 * Releases a reference to a threadpool object.
 *
 * Returns FALSE while other references remain. When the last reference is
 * dropped the pool must already be shut down, hold no objects and have
 * empty work lists (asserted); the function then returns TRUE.
 *
 * NOTE(review): source lines 1850 (signature), 1866 and 1868 are absent
 * from this listing; presumably the latter two delete the critical
 * section and free the pool -- confirm.
1849 */
1851{
1852 unsigned int i;
1853
1854 if (InterlockedDecrement( &pool->refcount ))
1855 return FALSE;
1856
1857 TRACE( "destroying threadpool %p\n", pool );
1858
1859 assert( pool->shutdown );
1860 assert( !pool->objcount );
1861 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1862 assert( list_empty( &pool->pools[i] ) );
1863#ifndef __REACTOS__
1864 pool->cs.DebugInfo->Spare[0] = 0;  /* clears the debug name set in tp_threadpool_alloc */
1865#endif
1867
1869 return TRUE;
1870}
1871
1872/***********************************************************************
1873 * tp_threadpool_lock (internal)
1874 *
1875 * Acquires a lock on a threadpool, specified with a TP_CALLBACK_ENVIRON
1876 * block. When the lock is acquired successfully, it is guaranteed that
1877 * there is at least one worker thread to process tasks.
 *
 * The pool is taken from environment->Pool when an environment is given;
 * when that yields no pool, the default_threadpool path is used. On
 * success the pool's refcount and objcount are incremented and the pool
 * is stored in *out; otherwise the failing status is returned.
 *
 * NOTE(review): many source lines are absent from this listing (1879,
 * 1882, 1894-1896, 1899, 1910, 1914, 1916-1917, 1921, 1924, 1928, 1938):
 * the signature, the 'status' declaration, the priority case labels, the
 * default pool creation/assignment, the worker thread start and,
 * presumably, the pool lock handling -- confirm against the real file.
1878 */
1880{
1881 struct threadpool *pool = NULL;
1883
1884 if (environment)
1885 {
1886#ifndef __REACTOS__ //Windows 7 stuff
1887 /* Validate environment parameters. */
1888 if (environment->Version == 3)
1889 {
1890 TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1891
1892 switch (environment3->CallbackPriority)
1893 {
1897 break;
1898 default:
1900 }
1901 }
1902#endif
1903 pool = (struct threadpool *)environment->Pool;
1904 }
1905
 /* No explicit pool: fall back to the default threadpool. */
1906 if (!pool)
1907 {
1908 if (!default_threadpool)
1909 {
1911 if (status != STATUS_SUCCESS)
1912 return status;
1913
1915 {
1918 }
1919 }
1920
1922 }
1923
1925
1926 /* Make sure that the threadpool has at least one thread. */
1927 if (!pool->num_workers)
1929
1930 /* Keep a reference, and increment objcount to ensure that the
1931 * last thread doesn't terminate. */
1932 if (status == STATUS_SUCCESS)
1933 {
1934 InterlockedIncrement( &pool->refcount );
1935 pool->objcount++;
1936 }
1937
1939
1940 if (status != STATUS_SUCCESS)
1941 return status;
1942
1943 *out = pool;
1944 return STATUS_SUCCESS;
1945}
1946
1947/***********************************************************************
1948 * tp_threadpool_unlock (internal)
1949 *
1950 * Releases a lock on a threadpool.
 *
 * The visible part undoes the objcount increment done by
 * tp_threadpool_lock.
 *
 * NOTE(review): source lines 1952 (signature), 1954 and 1956-1957 are
 * absent from this listing; presumably they handle the pool lock and drop
 * the reference taken by tp_threadpool_lock -- confirm.
1951 */
1953{
1955 pool->objcount--;
1958}
1959
1960/***********************************************************************
1961 * tp_group_alloc (internal)
1962 *
1963 * Allocates a new threadpool group object.
 *
 * The group starts with refcount 1, not shut down and with an empty
 * member list. Stores the group in *out and returns STATUS_SUCCESS, or
 * returns STATUS_NO_MEMORY when the heap allocation fails.
 *
 * NOTE(review): source lines 1965 (signature), 1977 and 1979 are absent
 * from this listing; presumably the latter two initialize group->cs --
 * confirm.
1964 */
1966{
1967 struct threadpool_group *group;
1968
1969 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1970 if (!group)
1971 return STATUS_NO_MEMORY;
1972
1973 group->refcount = 1;
1974 group->shutdown = FALSE;
1975
1976#ifdef __REACTOS__
1978#else
1980
1981 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1982#endif
1983
1984 list_init( &group->members );
1985
1986 TRACE( "allocated group %p\n", group );
1987
1988 *out = group;
1989 return STATUS_SUCCESS;
1990}
1991
1992/***********************************************************************
1993 * tp_group_shutdown (internal)
1994 *
1995 * Marks the group object for shutdown.
 *
 * Only sets the flag; tp_group_release asserts that it is set before the
 * group is destroyed.
 *
 * NOTE(review): the signature (source line 1997) is absent from this
 * listing.
1996 */
1998{
1999 group->shutdown = TRUE;
2000}
2001
2002/***********************************************************************
2003 * tp_group_release (internal)
2004 *
2005 * Releases a reference to a group object.
 *
 * Returns FALSE while other references remain. When the last reference is
 * dropped the group must already be shut down and have no members
 * (asserted); the function then returns TRUE.
 *
 * NOTE(review): source lines 2007 (signature), 2020 and 2022 are absent
 * from this listing; presumably the latter two delete the critical
 * section and free the group -- confirm.
2006 */
2008{
2009 if (InterlockedDecrement( &group->refcount ))
2010 return FALSE;
2011
2012 TRACE( "destroying group %p\n", group );
2013
2014 assert( group->shutdown );
2015 assert( list_empty( &group->members ) );
2016
2017#ifndef __REACTOS__
2018 group->cs.DebugInfo->Spare[0] = 0;  /* clears the debug name set in tp_group_alloc */
2019#endif
2021
2023 return TRUE;
2024}
2025
2026/***********************************************************************
2027 * tp_object_initialize (internal)
2028 *
2029 * Initializes members of a threadpool object.
 *
 * object->type must already be set by the caller, since it is read here.
 * Fields are reset to defaults first and then overridden from
 * 'environment' when one is given. Simple callbacks are submitted
 * immediately and the initial reference is dropped at the end.
 *
 * NOTE(review): source lines 2102 and 2105 are absent from this listing;
 * presumably they enter and leave the group's lock around the member list
 * update -- confirm.
2030 */
2031static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
2032 PVOID userdata, TP_CALLBACK_ENVIRON *environment )
2033{
2034 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
2035
2036 object->refcount = 1;
2037 object->shutdown = FALSE;
2038
2039 object->pool = pool;
2040 object->group = NULL;
2041 object->userdata = userdata;
2042 object->group_cancel_callback = NULL;
2043 object->finalization_callback = NULL;
2044 object->may_run_long = 0;
2045 object->race_dll = NULL;
2046 object->priority = TP_CALLBACK_PRIORITY_NORMAL;
2047
2048 memset( &object->group_entry, 0, sizeof(object->group_entry) );
2049 object->is_group_member = FALSE;
2050
2051 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
2052 RtlInitializeConditionVariable( &object->finished_event );
2053 RtlInitializeConditionVariable( &object->group_finished_event );
2054 object->completed_event = NULL;
2055 object->num_pending_callbacks = 0;
2056 object->num_running_callbacks = 0;
2057 object->num_associated_callbacks = 0;
2058
2059 if (environment)
2060 {
 /* Unknown versions are only reported; the fields are still read. */
2061 if (environment->Version != 1 && environment->Version != 3)
2062 FIXME( "unsupported environment version %lu\n", environment->Version );
2063
2064 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
2065 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
2066 object->finalization_callback = environment->FinalizationCallback;
2067 object->may_run_long = environment->u.s.LongFunction != 0;
2068 object->race_dll = environment->RaceDll;
2069#ifndef __REACTOS__ //Windows 7 stuff
2070 if (environment->Version == 3)
2071 {
2072 TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
2073
2074 object->priority = environment_v3->CallbackPriority;
 /* priority is later used as an index into pool->pools */
2075 assert( object->priority < ARRAY_SIZE(pool->pools) );
2076 }
2077#endif
2078 if (environment->ActivationContext)
2079 FIXME( "activation context not supported yet\n" );
2080
2081 if (environment->u.s.Persistent)
2082 FIXME( "persistent threads not supported yet\n" );
2083 }
2084
 /* Take a reference on the DLL; tp_object_release unloads it again. */
2085 if (object->race_dll)
2086 LdrAddRefDll( 0, object->race_dll );
2087
2088 TRACE( "allocated object %p of type %u\n", object, object->type );
2089
2090 /* For simple callbacks we have to run tp_object_submit before adding this object
2091 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
2092 * will be set, and tp_object_submit would fail with an assertion. */
2093
2094 if (is_simple_callback)
2095 tp_object_submit( object, FALSE );
2096
2097 if (object->group)
2098 {
2099 struct threadpool_group *group = object->group;
2100 InterlockedIncrement( &group->refcount );  /* the object holds a group reference until it is released */
2101
2103 list_add_tail( &group->members, &object->group_entry );
2104 object->is_group_member = TRUE;
2106 }
2107
 /* Drop the initial reference; tp_object_submit took its own for the
  * queued callback. */
2108 if (is_simple_callback)
2109 tp_object_release( object );
2110}
2111
/* Queues an object on the pool's work list that matches its priority and
 * counts one more busy worker. The callers visible in this file run it
 * while manipulating pool state; NOTE(review): presumably pool->cs must be
 * held -- confirm. */
2112static void tp_object_prio_queue( struct threadpool_object *object )
2113{
2114 ++object->pool->num_busy_workers;
2115 list_add_tail( &object->pool->pools[object->priority], &object->pool_entry );  /* priority selects the list */
2116}
2117
2118/***********************************************************************
2119 * tp_object_submit (internal)
2120 *
2121 * Submits a threadpool object to the associated threadpool. This
2122 * function has to be VOID because TpPostWork can never fail on Windows.
 *
 * Each submission takes one object reference and increments
 * num_pending_callbacks; the object is linked into the pool's work list
 * only when it had no pending callbacks before. For wait objects,
 * 'signaled' additionally increments u.wait.signaled.
 *
 * NOTE(review): source lines 2127, 2132, 2137 and 2155 are absent from
 * this listing: the 'status' declaration, presumably the pool lock
 * enter/leave, and the call that starts a new worker thread and sets
 * 'status' -- confirm against the real file.
2123 */
2124static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
2125{
2126 struct threadpool *pool = object->pool;
2128
2129 assert( !object->shutdown );
2130 assert( !pool->shutdown );
2131
2133
2134 /* Start new worker threads if required. */
2135 if (pool->num_busy_workers >= pool->num_workers &&
2136 pool->num_workers < pool->max_workers)
2138
2139 /* Queue work item and increment refcount. */
2140 InterlockedIncrement( &object->refcount );
2141 if (!object->num_pending_callbacks++)
2142 tp_object_prio_queue( object );
2143
2144 /* Count how often the object was signaled. */
2145 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
2146 object->u.wait.signaled++;
2147
2148 /* No new thread started - wake up one existing thread. */
2149 if (status != STATUS_SUCCESS)
2150 {
2151 assert( pool->num_workers > 0 );
2152 RtlWakeConditionVariable( &pool->update_event );
2153 }
2154
2156}
2157
2158/***********************************************************************
2159 * tp_object_cancel (internal)
2160 *
2161 * Cancels all currently pending callbacks for a specific object.
 *
 * The object is unlinked from the pool's work list and its pending count
 * is reset; one reference is released per cancelled callback, matching
 * the reference tp_object_submit takes per submission. For I/O objects
 * the pending operations are moved to skipped_count.
 *
 * NOTE(review): source lines 2168 and 2183 are absent from this listing;
 * presumably they enter and leave pool->cs -- confirm.
2162 */
2163static void tp_object_cancel( struct threadpool_object *object )
2164{
2165 struct threadpool *pool = object->pool;
2166 LONG pending_callbacks = 0;
2167
2169 if (object->num_pending_callbacks)
2170 {
2171 pending_callbacks = object->num_pending_callbacks;
2172 object->num_pending_callbacks = 0;
2173 list_remove( &object->pool_entry );
2174
2175 if (object->type == TP_OBJECT_TYPE_WAIT)
2176 object->u.wait.signaled = 0;
2177 }
2178 if (object->type == TP_OBJECT_TYPE_IO)
2179 {
 /* The I/O thread consumes skipped_count while the object is shutting down. */
2180 object->u.io.skipped_count += object->u.io.pending_count;
2181 object->u.io.pending_count = 0;
2182 }
2184
 /* Releasing last: this may drop the final reference to the object. */
2185 while (pending_callbacks--)
2186 tp_object_release( object );
2187}
2188
/*
 * Tells whether an object has no outstanding work. Returns FALSE while
 * callbacks are pending or, for I/O objects, while operations are
 * pending. Otherwise the result depends on 'group': when set, the object
 * is finished once no callbacks are running; when clear, once no
 * callbacks are associated with it.
 *
 * NOTE(review): the signature (source line 2189) is missing from this
 * listing; presumably this is object_is_finished( object, group ), as
 * called from tp_object_wait and tp_object_execute -- confirm.
 */
2190{
2191 if (object->num_pending_callbacks)
2192 return FALSE;
2193 if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
2194 return FALSE;
2195
2196 if (group)
2197 return !object->num_running_callbacks;
2198 else
2199 return !object->num_associated_callbacks;
2200}
2201
2202/***********************************************************************
2203 * tp_object_wait (internal)
2204 *
2205 * Waits until all pending and running callbacks of a specific object
2206 * have been processed.
 *
 * 'group_wait' selects both the completion test passed to
 * object_is_finished and the condition variable to sleep on
 * (group_finished_event or finished_event). The sleep has no timeout.
 *
 * NOTE(review): source lines 2212 and 2220 are absent from this listing;
 * presumably they enter and leave pool->cs, which the sleep calls pass as
 * their critical section -- confirm.
2207 */
2208static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
2209{
2210 struct threadpool *pool = object->pool;
2211
 /* Re-check after every wakeup; a wakeup alone does not imply completion. */
2213 while (!object_is_finished( object, group_wait ))
2214 {
2215 if (group_wait)
2216 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
2217 else
2218 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2219 }
2221}
2222
/*
 * Detaches an I/O object from the global I/O queue. When the object is
 * not already shut down and it was the last one counted in
 * ioqueue.objcount, an empty completion packet (key 0) is posted to
 * ioqueue.port.
 *
 * NOTE(review): the signature (source line 2223) is missing from this
 * listing; presumably this is tp_ioqueue_unlock( io ), as called from
 * tp_object_prepare_shutdown. Lines 2227 and 2234 are also absent;
 * presumably they enter and leave the ioqueue lock -- confirm.
 */
2224{
2225 assert( io->type == TP_OBJECT_TYPE_IO );
2226
2228
2229 assert(ioqueue.objcount);
2230
 /* The decrement only happens when io->shutdown is not set (short-circuit). */
2231 if (!io->shutdown && !--ioqueue.objcount)
2232 NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 );
2233
2235}
2236
2237/***********************************************************************
2238 * tp_object_prepare_shutdown (internal)
2239 *
2240 * Prepares a threadpool object for shutdown.
 *
 * Timer, wait and I/O objects are detached from their queue through the
 * matching *_unlock helper; other object types need no preparation here.
 *
 * NOTE(review): the signature (source line 2242) is absent from this
 * listing.
2241 */
2243{
2244 if (object->type == TP_OBJECT_TYPE_TIMER)
2245 tp_timerqueue_unlock( object );
2246 else if (object->type == TP_OBJECT_TYPE_WAIT)
2247 tp_waitqueue_unlock( object );
2248 else if (object->type == TP_OBJECT_TYPE_IO)
2249 tp_ioqueue_unlock( object );
2250}
2251
2252/***********************************************************************
2253 * tp_object_release (internal)
2254 *
2255 * Releases a reference to a threadpool object.
 *
 * Returns FALSE while other references remain. When the last reference is
 * dropped the object must be shut down with no pending, running or
 * associated callbacks (asserted). It is then removed from its group's
 * member list, the race DLL is unloaded, completed_event is set when it
 * is a real handle, the object is freed and TRUE is returned.
 *
 * NOTE(review): source lines 2257 (signature), 2274, 2280, 2282 and 2285
 * are absent from this listing; presumably they handle the group lock and
 * release the group and pool references -- confirm.
2256 */
2258{
2259 if (InterlockedDecrement( &object->refcount ))
2260 return FALSE;
2261
2262 TRACE( "destroying object %p of type %u\n", object, object->type );
2263
2264 assert( object->shutdown );
2265 assert( !object->num_pending_callbacks );
2266 assert( !object->num_running_callbacks );
2267 assert( !object->num_associated_callbacks );
2268
2269 /* release reference to the group */
2270 if (object->group)
2271 {
2272 struct threadpool_group *group = object->group;
2273
2275 if (object->is_group_member)
2276 {
2277 list_remove( &object->group_entry );
2278 object->is_group_member = FALSE;
2279 }
2281
2283 }
2284
2286
 /* Balances the LdrAddRefDll done in tp_object_initialize. */
2287 if (object->race_dll)
2288 LdrUnloadDll( object->race_dll );
2289
 /* NULL and INVALID_HANDLE_VALUE both mean there is no event to signal. */
2290 if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE)
2291 NtSetEvent( object->completed_event, NULL );
2292
2293 RtlFreeHeap( GetProcessHeap(), 0, object );
2294 return TRUE;
2295}
2296
/* Returns the head entry of the first non-empty work list in pool->pools,
 * scanning from index 0 upwards, or the result of list_head on the last
 * list when all of them are empty. The pool is not modified. */
2297static struct list *threadpool_get_next_item( const struct threadpool *pool )
2298{
2299 struct list *ptr;
2300 unsigned int i;
2301
 /* Lower indices are checked first, so they are served first. */
2302 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
2303 {
2304 if ((ptr = list_head( &pool->pools[i] )))
2305 break;
2306 }
2307
2308 return ptr;
2309}
2310
2311/***********************************************************************
2312 * tp_object_execute (internal)
2313 *
2314 * Executes a threadpool object callback, object->pool->cs has to be
2315 * held.
 *
 * One pending callback is consumed per call. The type-specific callback
 * runs first, then the optional finalization callback, then the cleanup
 * actions recorded in the callback instance. 'wait_thread' makes the
 * function also leave waitqueue.cs around the callback and re-enter it
 * afterwards.
 *
 * NOTE(review): source lines 2320-2321, 2324, 2343, 2361, 2370, 2379,
 * 2388, 2449 and 2454 are absent from this listing: the declarations of
 * 'instance', 'completion' and 'status', four of the case labels and,
 * presumably, the leave/enter of pool->cs around the callback -- confirm
 * against the real file.
2316 */
2317static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread )
2318{
2319 TP_CALLBACK_INSTANCE *callback_instance;
2322 struct threadpool *pool = object->pool;
2323 TP_WAIT_RESULT wait_result = 0;
2325
2326 object->num_pending_callbacks--;
2327
2328 /* For wait objects check if they were signaled or have timed out. */
2329 if (object->type == TP_OBJECT_TYPE_WAIT)
2330 {
2331 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2332 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2333 }
2334 else if (object->type == TP_OBJECT_TYPE_IO)
2335 {
 /* Take a copy of the most recently stored completion. */
2336 assert( object->u.io.completion_count );
2337 completion = object->u.io.completions[--object->u.io.completion_count];
2338 }
2339
2340 /* Leave critical section and do the actual callback. */
2341 object->num_associated_callbacks++;
2342 object->num_running_callbacks++;
2344 if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
2345
2346 /* Initialize threadpool instance struct. */
2347 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2348 instance.object = object;
2349 instance.threadid = GetCurrentThreadId();
2350 instance.associated = TRUE;
2351 instance.may_run_long = object->may_run_long;
2352 instance.cleanup.critical_section = NULL;
2353 instance.cleanup.mutex = NULL;
2354 instance.cleanup.semaphore = NULL;
2355 instance.cleanup.semaphore_count = 0;
2356 instance.cleanup.event = NULL;
2357 instance.cleanup.library = NULL;
2358
2359 switch (object->type)
2360 {
2362 {
2363 TRACE( "executing simple callback %p(%p, %p)\n",
2364 object->u.simple.callback, callback_instance, object->userdata );
2365 object->u.simple.callback( callback_instance, object->userdata );
2366 TRACE( "callback %p returned\n", object->u.simple.callback );
2367 break;
2368 }
2369
2371 {
2372 TRACE( "executing work callback %p(%p, %p, %p)\n",
2373 object->u.work.callback, callback_instance, object->userdata, object );
2374 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2375 TRACE( "callback %p returned\n", object->u.work.callback );
2376 break;
2377 }
2378
2380 {
2381 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2382 object->u.timer.callback, callback_instance, object->userdata, object );
2383 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2384 TRACE( "callback %p returned\n", object->u.timer.callback );
2385 break;
2386 }
2387
2389 {
2390 TRACE( "executing wait callback %p(%p, %p, %p, %lu)\n",
2391 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2392 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2393 TRACE( "callback %p returned\n", object->u.wait.callback );
2394 break;
2395 }
2396
2397 case TP_OBJECT_TYPE_IO:
2398 {
2399 TRACE( "executing I/O callback %p(%p, %p, %#Ix, %p, %p)\n",
2400 object->u.io.callback, callback_instance, object->userdata,
2401 completion.cvalue, &completion.iosb, (TP_IO *)object );
2402 object->u.io.callback( callback_instance, object->userdata,
2403 (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
2404 TRACE( "callback %p returned\n", object->u.io.callback );
2405 break;
2406 }
2407
2408 default:
2409 assert(0);
2410 break;
2411 }
2412
2413 /* Execute finalization callback. */
2414 if (object->finalization_callback)
2415 {
2416 TRACE( "executing finalization callback %p(%p, %p)\n",
2417 object->finalization_callback, callback_instance, object->userdata );
2418 object->finalization_callback( callback_instance, object->userdata );
2419 TRACE( "callback %p returned\n", object->finalization_callback );
2420 }
2421
2422 /* Execute cleanup tasks. */
 /* The first failing step jumps to skip_cleanup, so the remaining
  * cleanup actions are not attempted. */
2423 if (instance.cleanup.critical_section)
2424 {
2425 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2426 }
2427 if (instance.cleanup.mutex)
2428 {
2429 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2430 if (status != STATUS_SUCCESS) goto skip_cleanup;
2431 }
2432 if (instance.cleanup.semaphore)
2433 {
2434 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2435 if (status != STATUS_SUCCESS) goto skip_cleanup;
2436 }
2437 if (instance.cleanup.event)
2438 {
2439 status = NtSetEvent( instance.cleanup.event, NULL );
2440 if (status != STATUS_SUCCESS) goto skip_cleanup;
2441 }
2442 if (instance.cleanup.library)
2443 {
2444 LdrUnloadDll( instance.cleanup.library );
2445 }
2446
2447skip_cleanup:
2448 if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs );
2450
2451 /* Simple callbacks are automatically shutdown after execution. */
2452 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2453 {
2455 object->shutdown = TRUE;
2456 }
2457
 /* Wake group waiters, which only require that no callback is running. */
2458 object->num_running_callbacks--;
2459 if (object_is_finished( object, TRUE ))
2460 RtlWakeAllConditionVariable( &object->group_finished_event );
2461
 /* 'associated' was set to TRUE above; it is only re-read here, so the
  * callback may have cleared it through its instance pointer. */
2462 if (instance.associated)
2463 {
2464 object->num_associated_callbacks--;
2465 if (object_is_finished( object, FALSE ))
2466 RtlWakeAllConditionVariable( &object->finished_event );
2467 }
2468}
2469
2470/***********************************************************************
2471 * threadpool_worker_proc (internal)
 *
 * Worker thread routine of a pool ('param' is the struct threadpool).
 * Repeatedly takes the next queued object, executes one of its callbacks
 * and releases the reference held for it; when the lists are empty it
 * sleeps on pool->update_event and exits on shutdown or after an idle
 * timeout if the worker count allows it.
 *
 * NOTE(review): source lines 2474, 2476, 2480, 2486, 2526 and 2529 are
 * absent from this listing: the signature variants, the 'timeout'
 * declaration and, presumably, the enter/leave of pool->cs plus the
 * release of the pool reference -- confirm against the real file.
2472 */
2473#ifdef __REACTOS__
2475#else
2477#endif
2478{
2479 struct threadpool *pool = param;
2481 struct list *ptr;
2482
2483 TRACE( "starting worker thread for pool %p\n", pool );
2484 set_thread_name(L"wine_threadpool_worker");
2485
2487 for (;;)
2488 {
2489 while ((ptr = threadpool_get_next_item( pool )))
2490 {
2491 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2492 assert( object->num_pending_callbacks > 0 );
2493
2494 /* If further pending callbacks are queued, move the work item to
2495 * the end of the pool list. Otherwise remove it from the pool. */
2496 list_remove( &object->pool_entry );
2497 if (object->num_pending_callbacks > 1)
2498 tp_object_prio_queue( object );
2499
2500 tp_object_execute( object, FALSE );
2501
 /* Balances the increment done in tp_object_prio_queue. */
2502 assert(pool->num_busy_workers);
2503 pool->num_busy_workers--;
2504
 /* Drops the reference tp_object_submit took for this callback. */
2505 tp_object_release( object );
2506 }
2507
2508 /* Shutdown worker thread if requested. */
2509 if (pool->shutdown)
2510 break;
2511
2512 /* Wait for new tasks or until the timeout expires. A thread only terminates
2513 * when no new tasks are available, and the number of threads can be
2514 * decreased without violating the min_workers limit. An exception is when
2515 * min_workers == 0, then objcount is used to detect if the last thread
2516 * can be terminated. */
2517 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2518 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2519 !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2520 (!pool->min_workers && !pool->objcount)))
2521 {
2522 break;
2523 }
2524 }
2525 pool->num_workers--;
2527
2528 TRACE( "terminating worker thread for pool %p\n", pool );
2530 RtlExitUserThread( 0 );
2531#ifdef __REACTOS__
2532 return STATUS_SUCCESS;
2533#endif
2534}
2535
2536/***********************************************************************
2537 * TpAllocCleanupGroup (NTDLL.@)
 *
 * Allocates a cleanup group by forwarding to tp_group_alloc; 'out'
 * receives the new group and the helper's status is returned unchanged.
 *
 * NOTE(review): the signature (source line 2539) is absent from this
 * listing.
2538 */
2540{
2541 TRACE( "%p\n", out );
2542
2543 return tp_group_alloc( (struct threadpool_group **)out );
2544}
2545
2546/***********************************************************************
2547 * TpAllocIoCompletion (NTDLL.@)
 *
 * Allocates a zero-initialized I/O threadpool object, locks the pool
 * selected by 'environment', allocates room for 8 completion entries and
 * registers the object with the I/O queue for 'file'. On success the
 * object is initialized and returned through 'out'. Every failure path
 * frees what was allocated so far and returns the failing status.
 *
 * NOTE(review): source lines 2549, 2554, 2571 and 2578 are absent from
 * this listing: the first signature line, the 'status' declaration and,
 * presumably, the tp_threadpool_unlock calls on the failure paths.
 * In the visible code 'status' is not assigned on the completions
 * allocation failure path (it still holds the success value from
 * tp_threadpool_lock) -- confirm against the real file.
2548 */
2550 void *userdata, TP_CALLBACK_ENVIRON *environment )
2551{
2552 struct threadpool_object *object;
2553 struct threadpool *pool;
2555
2556 TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
2557
2558 if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) )))
2559 return STATUS_NO_MEMORY;
2560
2561 if ((status = tp_threadpool_lock( &pool, environment )))
2562 {
2563 RtlFreeHeap( GetProcessHeap(), 0, object );
2564 return status;
2565 }
2566
 /* The type must be set before tp_ioqueue_lock, which asserts on it. */
2567 object->type = TP_OBJECT_TYPE_IO;
2568 object->u.io.callback = callback;
2569 if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) )))
2570 {
2572 RtlFreeHeap( GetProcessHeap(), 0, object );
2573 return status;
2574 }
2575
2576 if ((status = tp_ioqueue_lock( object, file )))
2577 {
2579 RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
2580 RtlFreeHeap( GetProcessHeap(), 0, object );
2581 return status;
2582 }
2583
2584 tp_object_initialize( object, pool, userdata, environment );
2585
2586 *out = (TP_IO *)object;
2587 return STATUS_SUCCESS;
2588}
2589
2590/***********************************************************************
2591 * TpAllocPool (NTDLL.@)
 *
 * Allocates a threadpool by forwarding to tp_threadpool_alloc. A non-NULL
 * 'reserved' argument is only reported and otherwise ignored.
 *
 * NOTE(review): the signature (source line 2593) is absent from this
 * listing.
2592 */
2594{
2595 TRACE( "%p %p\n", out, reserved );
2596
2597 if (reserved)
2598 FIXME( "reserved argument is nonzero (%p)\n", reserved );
2599
2600 return tp_threadpool_alloc( (struct threadpool **)out );
2601}
2602
2603/***********************************************************************
2604 * TpAllocTimer (NTDLL.@)
 *
 * Allocates a timer threadpool object, locks the pool selected by
 * 'environment' and registers the object with the timer queue. On success
 * the object is initialized and returned through 'out'; on failure the
 * object is freed and the failing status is returned.
 *
 * NOTE(review): source lines 2606, 2611 and 2632 are absent from this
 * listing: the first signature line, the 'status' declaration and,
 * presumably, a tp_threadpool_unlock on the timer queue failure path --
 * confirm against the real file.
2605 */
2607 TP_CALLBACK_ENVIRON *environment )
2608{
2609 struct threadpool_object *object;
2610 struct threadpool *pool;
2612
2613 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2614
2615 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2616 if (!object)
2617 return STATUS_NO_MEMORY;
2618
2619 status = tp_threadpool_lock( &pool, environment );
2620 if (status)
2621 {
2622 RtlFreeHeap( GetProcessHeap(), 0, object );
2623 return status;
2624 }
2625
 /* The type is set before the queue helper and tp_object_initialize run. */
2626 object->type = TP_OBJECT_TYPE_TIMER;
2627 object->u.timer.callback = callback;
2628
2629 status = tp_timerqueue_lock( object );
2630 if (status)
2631 {
2633 RtlFreeHeap( GetProcessHeap(), 0, object );
2634 return status;
2635 }
2636
2637 tp_object_initialize( object, pool, userdata, environment );
2638
2639 *out = (TP_TIMER *)object;
2640 return STATUS_SUCCESS;
2641}
2642
/*
 * Allocates a wait threadpool object with the given 'flags', locks the
 * pool selected by 'environment' and reserves a wait queue slot through
 * tp_waitqueue_lock. On success the object is initialized and returned
 * through 'out'; on failure the object is freed and the failing status is
 * returned.
 *
 * NOTE(review): the first signature line (source line 2643) is missing
 * from this listing; presumably this is tp_alloc_wait, which TpAllocWait
 * calls with (out, callback, userdata, environment, flags). Lines 2648
 * and 2668 are also absent: the 'status' declaration and, presumably, a
 * tp_threadpool_unlock on the wait queue failure path -- confirm.
 */
2644 TP_CALLBACK_ENVIRON *environment, DWORD flags )
2645{
2646 struct threadpool_object *object;
2647 struct threadpool *pool;
2649
2650 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2651 if (!object)
2652 return STATUS_NO_MEMORY;
2653
2654 status = tp_threadpool_lock( &pool, environment );
2655 if (status)
2656 {
2657 RtlFreeHeap( GetProcessHeap(), 0, object );
2658 return status;
2659 }
2660
 /* type and flags must be set first: tp_waitqueue_lock reads both. */
2661 object->type = TP_OBJECT_TYPE_WAIT;
2662 object->u.wait.callback = callback;
2663 object->u.wait.flags = flags;
2664
2665 status = tp_waitqueue_lock( object );
2666 if (status)
2667 {
2669 RtlFreeHeap( GetProcessHeap(), 0, object );
2670 return status;
2671 }
2672
2673 tp_object_initialize( object, pool, userdata, environment );
2674
2675 *out = (TP_WAIT *)object;
2676 return STATUS_SUCCESS;
2677}
2678
2679/***********************************************************************
2680 * TpAllocWait (NTDLL.@)
 *
 * Allocates a wait object by forwarding to tp_alloc_wait with the
 * WT_EXECUTEONLYONCE flag.
 *
 * NOTE(review): the first signature line (source line 2682) is absent
 * from this listing.
2681 */
2683 TP_CALLBACK_ENVIRON *environment )
2684{
2685 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2686 return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE );
2687}
2688
2689/***********************************************************************
2690 * TpAllocWork (NTDLL.@)
 *
 * Allocates a work threadpool object and locks the pool selected by
 * 'environment'. On success the object is initialized and returned
 * through 'out'; on failure the object is freed and the failing status is
 * returned.
 *
 * NOTE(review): source lines 2692 and 2697 are absent from this listing
 * (the first signature line and the 'status' declaration).
2691 */
2693 TP_CALLBACK_ENVIRON *environment )
2694{
2695 struct threadpool_object *object;
2696 struct threadpool *pool;
2698
2699 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2700
2701 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2702 if (!object)
2703 return STATUS_NO_MEMORY;
2704
2705 status = tp_threadpool_lock( &pool, environment );
2706 if (status)
2707 {
2708 RtlFreeHeap( GetProcessHeap(), 0, object );
2709 return status;
2710 }
2711
 /* The type is set before tp_object_initialize, which reads it. */
2712 object->type = TP_OBJECT_TYPE_WORK;
2713 object->u.work.callback = callback;
2714 tp_object_initialize( object, pool, userdata, environment );
2715
2716 *out = (TP_WORK *)object;
2717 return STATUS_SUCCESS;
2718}
2719
2720/***********************************************************************
2721 * TpCancelAsyncIoOperation (NTDLL.@)
2722 */
2724{
     /* Undoes one pending I/O registration on the given TP_IO object: while
      * holding the owning pool's critical section, pending_count is decremented
      * and object_is_finished() is re-evaluated with both TRUE and FALSE.
      * NOTE(review): the statements executed when either check succeeds
      * (embedded lines 2735 and 2737) are not visible in this listing, so the
      * two 'if' lines below must not be read as nested. */
2725 struct threadpool_object *this = impl_from_TP_IO( io );
2726
2727 TRACE( "%p\n", io );
2728
2729 RtlEnterCriticalSection( &this->pool->cs );
2730
2731 TRACE("pending_count %u.\n", this->u.io.pending_count);
2732
2733 this->u.io.pending_count--;
2734 if (object_is_finished( this, TRUE ))
2736 if (object_is_finished( this, FALSE ))
2738
2739 RtlLeaveCriticalSection( &this->pool->cs );
2740}
2741
2742/***********************************************************************
2743 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2744 */
2746{
     /* Records 'crit' in the callback instance's cleanup.critical_section slot.
      * Only the first registration takes effect: if a critical section is
      * already recorded, later calls leave it unchanged.
      * NOTE(review): the declaration of 'this' (embedded line 2747) is not
      * visible in this listing. */
2748
2749 TRACE( "%p %p\n", instance, crit );
2750
2751 if (!this->cleanup.critical_section)
2752 this->cleanup.critical_section = crit;
2753}
2754
2755/***********************************************************************
2756 * TpCallbackMayRunLong (NTDLL.@)
2757 */
2759{
     /* Marks the running callback instance as long-running (may_run_long).
      *
      * Returns STATUS_UNSUCCESSFUL when called from a thread other than the one
      * recorded in the instance, STATUS_SUCCESS when the instance is already
      * marked, and otherwise the value of 'status' computed below.
      * NOTE(review): several lines are absent from this listing - the
      * declarations of 'this' and 'status' (2760, 2763), the lines at 2777 and
      * 2792, and the bodies of both worker-count branches (2784, 2788) - so how
      * 'status' is produced cannot be determined from here. */
2761 struct threadpool_object *object = this->object;
2762 struct threadpool *pool;
2764
2765 TRACE( "%p\n", instance );
2766
     /* Only the thread that owns this instance may change its state. */
2767 if (this->threadid != GetCurrentThreadId())
2768 {
2769 ERR("called from wrong thread, ignoring\n");
2770 return STATUS_UNSUCCESSFUL; /* FIXME */
2771 }
2772
     /* Already flagged by an earlier call: nothing more to do. */
2773 if (this->may_run_long)
2774 return STATUS_SUCCESS;
2775
2776 pool = object->pool;
2778
2779 /* Start new worker threads if required. */
2780 if (pool->num_busy_workers >= pool->num_workers)
2781 {
     /* All existing workers are busy; the two branches distinguish a pool that
      * is still below max_workers from one that has reached its limit. */
2782 if (pool->num_workers < pool->max_workers)
2783 {
2785 }
2786 else
2787 {
2789 }
2790 }
2791
2793 this->may_run_long = TRUE;
2794 return status;
2795}
2796
2797/***********************************************************************
2798 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2799 */
2801{
     /* Records 'mutex' in the callback instance's cleanup.mutex slot. The first
      * registered handle wins; subsequent calls are ignored while a handle is
      * already stored.
      * NOTE(review): the declaration of 'this' (embedded line 2802) is not
      * visible in this listing. */
2803
2804 TRACE( "%p %p\n", instance, mutex );
2805
2806 if (!this->cleanup.mutex)
2807 this->cleanup.mutex = mutex;
2808}
2809
2810/***********************************************************************
2811 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2812 */
2814{
     /* Records 'semaphore' together with 'count' in the callback instance's
      * cleanup data. Both fields are written only when no semaphore has been
      * recorded yet, so the handle and its count always stay paired.
      * NOTE(review): the declaration of 'this' (embedded line 2815) is not
      * visible in this listing. */
2816
2817 TRACE( "%p %p %lu\n", instance, semaphore, count );
2818
2819 if (!this->cleanup.semaphore)
2820 {
2821 this->cleanup.semaphore = semaphore;
2822 this->cleanup.semaphore_count = count;
2823 }
2824}
2825
2826/***********************************************************************
2827 * TpCallbackSetEventOnCompletion (NTDLL.@)
2828 */
2830{
     /* Records 'event' in the callback instance's cleanup.event slot. The first
      * registered handle wins; subsequent calls are ignored while a handle is
      * already stored.
      * NOTE(review): the declaration of 'this' (embedded line 2831) is not
      * visible in this listing. */
2832
2833 TRACE( "%p %p\n", instance, event );
2834
2835 if (!this->cleanup.event)
2836 this->cleanup.event = event;
2837}
2838
2839/***********************************************************************
2840 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2841 */
2843{
     /* Records 'module' in the callback instance's cleanup.library slot. The
      * first registered module wins; subsequent calls are ignored while one is
      * already stored.
      * NOTE(review): the declaration of 'this' (embedded line 2844) is not
      * visible in this listing. */
2845
2846 TRACE( "%p %p\n", instance, module );
2847
2848 if (!this->cleanup.library)
2849 this->cleanup.library = module;
2850}
2851
2852/***********************************************************************
2853 * TpDisassociateCallback (NTDLL.@)
2854 */
2856{
     /* Detaches the running callback instance from its threadpool object: the
      * object's num_associated_callbacks is decremented, waiters on
      * finished_event are woken if the object is now finished, and the
      * instance's 'associated' flag is cleared. Calls from a thread other than
      * the instance's own, or on an instance that is no longer associated, are
      * ignored.
      * NOTE(review): the declaration of 'this' (2857) and the lines at 2873 and
      * 2879 are absent from this listing; 'pool' is assigned below but its use
      * is not visible here. */
2858 struct threadpool_object *object = this->object;
2859 struct threadpool *pool;
2860
2861 TRACE( "%p\n", instance );
2862
2863 if (this->threadid != GetCurrentThreadId())
2864 {
2865 ERR("called from wrong thread, ignoring\n");
2866 return;
2867 }
2868
     /* Already disassociated: do not decrement the counter a second time. */
2869 if (!this->associated)
2870 return;
2871
2872 pool = object->pool;
2874
2875 object->num_associated_callbacks--;
2876 if (object_is_finished( object, FALSE ))
2877 RtlWakeAllConditionVariable( &object->finished_event );
2878
2880 this->associated = FALSE;
2881}
2882
2883/***********************************************************************
2884 * TpIsTimerSet (NTDLL.@)
2885 */
2887{
     /* Returns the timer object's timer_set flag. The flag is read directly,
      * without taking any lock in this function. */
2888 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2889
2890 TRACE( "%p\n", timer );
2891
2892 return this->u.timer.timer_set;
2893}
2894
2895/***********************************************************************
2896 * TpPostWork (NTDLL.@)
2897 */
2899{
     /* Resolves the TP_WORK handle to its threadpool object and submits it with
      * tp_object_submit( this, FALSE ). */
2900 struct threadpool_object *this = impl_from_TP_WORK( work );
2901
2902 TRACE( "%p\n", work );
2903
2904 tp_object_submit( this, FALSE );
2905}
2906
2907/***********************************************************************
2908 * TpReleaseCleanupGroup (NTDLL.@)
2909 */
2911{
     /* Shuts the cleanup group down with tp_group_shutdown() and then drops a
      * reference to it with tp_group_release().
      * NOTE(review): the declaration of 'this' (embedded line 2912) is not
      * visible in this listing. */
2913
2914 TRACE( "%p\n", group );
2915
2916 tp_group_shutdown( this );
2917 tp_group_release( this );
2918}
2919
2920/***********************************************************************
2921 * TpReleaseCleanupGroupMembers (NTDLL.@)
2922 */
2924{
     /* Releases every object that is currently a member of the cleanup group.
      *
      * Works in three phases:
      *  1. Under the group's critical section, each member gets an extra
      *     reference and loses its is_group_member flag; the surviving members
      *     are then moved onto a private list so the lock can be dropped.
      *  2. If cancel_pending is set, tp_object_cancel() is called on each one.
      *  3. Each object is waited on, the optional group cancel callback is run
      *     (only when cancel_pending is set), and the references are dropped.
      *
      * 'userdata' is passed through as the second argument of the group cancel
      * callback.
      * NOTE(review): the declaration of 'this' (2925) and the last statement of
      * the first loop (2950) are not visible in this listing. */
2926 struct threadpool_object *object, *next;
2927 struct list members;
2928
2929 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2930
2931 RtlEnterCriticalSection( &this->cs );
2932
2933 /* Unset group, increase references, and mark objects for shutdown */
2934 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2935 {
2936 assert( object->group == this );
2937 assert( object->is_group_member );
2938
     /* A result of 1 means the reference count was 0 before the increment. */
2939 if (InterlockedIncrement( &object->refcount ) == 1)
2940 {
2941 /* Object is basically already destroyed, but group reference
2942 * was not deleted yet. We can safely ignore this object. */
2943 InterlockedDecrement( &object->refcount );
2944 list_remove( &object->group_entry );
2945 object->is_group_member = FALSE;
2946 continue;
2947 }
2948
2949 object->is_group_member = FALSE;
2951 }
2952
2953 /* Move members to a new temporary list */
2954 list_init( &members );
2955 list_move_tail( &members, &this->members );
2956
2957 RtlLeaveCriticalSection( &this->cs );
2958
2959 /* Cancel pending callbacks if requested */
2960 if (cancel_pending)
2961 {
2962 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2963 {
2964 tp_object_cancel( object );
2965 }
2966 }
2967
2968 /* Wait for remaining callbacks to finish */
2969 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2970 {
2971 tp_object_wait( object, TRUE );
2972
2973 if (!object->shutdown)
2974 {
2975 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2976 if (cancel_pending && object->group_cancel_callback)
2977 {
2978 TRACE( "executing group cancel callback %p(%p, %p)\n",
2979 object->group_cancel_callback, object->userdata, userdata );
2980 object->group_cancel_callback( object->userdata, userdata );
2981 TRACE( "callback %p returned\n", object->group_cancel_callback );
2982 }
2983
     /* Extra release for every object type except TP_OBJECT_TYPE_SIMPLE -
      * presumably the reference the owner would otherwise drop through the
      * matching TpRelease* call; TODO confirm. */
2984 if (object->type != TP_OBJECT_TYPE_SIMPLE)
2985 tp_object_release( object );
2986 }
2987
     /* Mark the object as shut down and drop a reference, which balances the
      * InterlockedIncrement performed in the first loop. */
2988 object->shutdown = TRUE;
2989 tp_object_release( object );
2990 }
2991}
2992
2993/***********************************************************************
2994 * TpReleaseIoCompletion (NTDLL.@)
2995 */
2997{
     /* Flags the I/O object as shutting down and, if it has neither pending nor
      * skipped operations at that moment, marks it shut down and releases a
      * reference. When operations are still outstanding this function does not
      * release the object.
      * NOTE(review): the first statement of the can_destroy branch (embedded
      * line 3010) is not visible in this listing. */
2998 struct threadpool_object *this = impl_from_TP_IO( io );
2999 BOOL can_destroy;
3000
3001 TRACE( "%p\n", io );
3002
     /* Set the flag and sample both counters under the pool lock so the
      * decision is taken on a consistent snapshot. */
3003 RtlEnterCriticalSection( &this->pool->cs );
3004 this->u.io.shutting_down = TRUE;
3005 can_destroy = !this->u.io.pending_count && !this->u.io.skipped_count;
3006 RtlLeaveCriticalSection( &this->pool->cs );
3007
3008 if (can_destroy)
3009 {
3011 this->shutdown = TRUE;
3012 tp_object_release( this );
3013 }
3014}
3015
3016/***********************************************************************
3017 * TpReleasePool (NTDLL.@)
3018 */
3020{
     /* Shuts the pool down with tp_threadpool_shutdown() and then drops a
      * reference to it with tp_threadpool_release(). */
3021 struct threadpool *this = impl_from_TP_POOL( pool );
3022
3023 TRACE( "%p\n", pool );
3024
3025 tp_threadpool_shutdown( this );
3026 tp_threadpool_release( this );
3027}
3028
3029/***********************************************************************
3030 * TpReleaseTimer (NTDLL.@)
3031 */
3033{
     /* Marks the timer object as shut down and releases a reference to it.
      * NOTE(review): one statement before the 'shutdown' assignment (embedded
      * line 3038) is not visible in this listing. */
3034 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3035
3036 TRACE( "%p\n", timer );
3037
3039 this->shutdown = TRUE;
3040 tp_object_release( this );
3041}
3042
3043/***********************************************************************
3044 * TpReleaseWait (NTDLL.@)
3045 */
3047{
     /* Marks the wait object as shut down and releases a reference to it.
      * NOTE(review): one statement before the 'shutdown' assignment (embedded
      * line 3052) is not visible in this listing. */
3048 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3049
3050 TRACE( "%p\n", wait );
3051
3053 this->shutdown = TRUE;
3054 tp_object_release( this );
3055}
3056
3057/***********************************************************************
3058 * TpReleaseWork (NTDLL.@)
3059 */
3061{
     /* Marks the work object as shut down and releases a reference to it.
      * NOTE(review): one statement before the 'shutdown' assignment (embedded
      * line 3066) is not visible in this listing. */
3062 struct threadpool_object *this = impl_from_TP_WORK( work );
3063
3064 TRACE( "%p\n", work );
3065
3067 this->shutdown = TRUE;
3068 tp_object_release( this );
3069}
3070
3071/***********************************************************************
3072 * TpSetPoolMaxThreads (NTDLL.@)
3073 */
3075{
     /* Sets the pool's upper worker limit under the pool's critical section.
      * A requested maximum of 0 is raised to 1, and min_workers is lowered if
      * necessary so that it never exceeds the new max_workers. */
3076 struct threadpool *this = impl_from_TP_POOL( pool );
3077
3078 TRACE( "%p %lu\n", pool, maximum );
3079
3080 RtlEnterCriticalSection( &this->cs );
3081 this->max_workers = max( maximum, 1 );
3082 this->min_workers = min( this->min_workers, this->max_workers );
3083 RtlLeaveCriticalSection( &this->cs );
3084}
3085
3086/***********************************************************************
3087 * TpSetPoolMinThreads (NTDLL.@)
3088 */
3090{
     /* Sets the pool's lower worker limit. Under the pool's critical section,
      * worker threads are started with tp_new_worker_thread() until num_workers
      * reaches 'minimum' or a start fails. Only when 'status' is STATUS_SUCCESS
      * is min_workers updated, with max_workers raised to match if it was lower.
      *
      * Returns !status, i.e. non-zero exactly when 'status' is zero.
      * NOTE(review): the declaration and initial value of 'status' (embedded
      * line 3092) are not visible in this listing; the check after the loop
      * relies on that initial value when the loop body never runs. */
3091 struct threadpool *this = impl_from_TP_POOL( pool );
3093
3094 TRACE( "%p %lu\n", pool, minimum );
3095
3096 RtlEnterCriticalSection( &this->cs );
3097
3098 while (this->num_workers < minimum)
3099 {
3100 status = tp_new_worker_thread( this );
3101 if (status != STATUS_SUCCESS)
3102 break;
3103 }
3104
3105 if (status == STATUS_SUCCESS)
3106 {
3107 this->min_workers = minimum;
3108 this->max_workers = max( this->min_workers, this->max_workers );
3109 }
3110
3111 RtlLeaveCriticalSection( &this->cs );
3112 return !status;
3113}
3114
3115/***********************************************************************
3116 * TpSetTimer (NTDLL.@)
3117 */
3118VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
3119{
     /* Arms, re-arms or disarms a timer object.
      *
      * timeout == NULL  : the timer is disarmed (timer_set becomes FALSE and any
      *                    queued entry is removed).
      * timeout <  0     : treated as relative to 'now'.
      * timeout == 0     : the object is submitted immediately; it is queued
      *                    again only when 'period' is non-zero.
      * timeout >  0     : used as the timestamp as given.
      *
      * NOTE(review): the declarations of 'timestamp' and 'now', the code that
      * fills 'now', and the statements at embedded lines 3127 and 3187 are not
      * visible in this listing. */
3120 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3121 struct threadpool_object *other_timer;
3122 BOOL submit_timer = FALSE;
3124
3125 TRACE( "%p %p %lu %lu\n", timer, timeout, period, window_length );
3126
3128
3129 assert( this->u.timer.timer_initialized );
3130 this->u.timer.timer_set = timeout != NULL;
3131
3132 /* Convert relative timeout to absolute timestamp and handle a timeout
3133 * of zero, which means that the timer is submitted immediately. */
3134 if (timeout)
3135 {
3136 timestamp = timeout->QuadPart;
3137 if ((LONGLONG)timestamp < 0)
3138 {
     /* timestamp is negative here, so subtracting it adds its magnitude to the
      * current time. */
3141 timestamp = now.QuadPart - timestamp;
3142 }
3143 else if (!timestamp)
3144 {
     /* Zero timeout: without a period there is nothing left to queue, so
      * 'timeout' is cleared to skip the queueing step further down. */
3145 if (!period)
3146 timeout = NULL;
3147 else
3148 {
     /* The period is scaled by 10000 before being added to 'now' - presumably
      * milliseconds converted to 100 ns units; TODO confirm. */
3151 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
3152 }
3153 submit_timer = TRUE;
3154 }
3155 }
3156
3157 /* First remove existing timeout. */
3158 if (this->u.timer.timer_pending)
3159 {
3160 list_remove( &this->u.timer.timer_entry );
3161 this->u.timer.timer_pending = FALSE;
3162 }
3163
3164 /* If the timer was enabled, then add it back to the queue. */
3165 if (timeout)
3166 {
3167 this->u.timer.timeout = timestamp;
3168 this->u.timer.period = period;
3169 this->u.timer.window_length = window_length;
3170
     /* Find the first queued timer that expires later than this one and insert
      * in front of it, keeping pending_timers ordered by ascending timeout. */
3171 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
3172 struct threadpool_object, u.timer.timer_entry )
3173 {
3174 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
3175 if (this->u.timer.timeout < other_timer->u.timer.timeout)
3176 break;
3177 }
3178 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
3179
3180 /* Wake up the timer thread when the timeout has to be updated. */
3181 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
3182 RtlWakeAllConditionVariable( &timerqueue.update_event );
3183
3184 this->u.timer.timer_pending = TRUE;
3185 }
3186
3188
     /* Deferred from the zero-timeout case above. */
3189 if (submit_timer)
3190 tp_object_submit( this, FALSE );
3191}
3192
3193/***********************************************************************
3194 * TpSetWait (NTDLL.@)
3195 */
3197{
     /* Changes the handle a wait object is waiting on.
      *
      * With a non-NULL handle the object is moved to the tail of its bucket's
      * 'waiting' list and marked wait_pending; with a NULL handle an object
      * that was pending is moved to the bucket's 'reserved' list instead. In
      * both cases the bucket's update_event is set afterwards. Nothing is done
      * when the handle is NULL and no wait was pending.
      *
      * NOTE(review): the declarations of 'timestamp' and 'now', the code that
      * fills 'now', and the statements at embedded lines 3203 and 3242 are not
      * visible in this listing. When 'handle' is set but 'timeout' is NULL, the
      * stored timeout is whatever initial value 'timestamp' was declared with. */
3198 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3200
3201 TRACE( "%p %p %p\n", wait, handle, timeout );
3202
3204
3205 assert( this->u.wait.bucket );
3206 this->u.wait.handle = handle;
3207
3208 if (handle || this->u.wait.wait_pending)
3209 {
3210 struct waitqueue_bucket *bucket = this->u.wait.bucket;
     /* Unlink from whichever bucket list currently holds the object; it is
      * re-added to the appropriate list below. */
3211 list_remove( &this->u.wait.wait_entry );
3212
3213 /* Convert relative timeout to absolute timestamp. */
3214 if (handle && timeout)
3215 {
3216 timestamp = timeout->QuadPart;
3217 if ((LONGLONG)timestamp < 0)
3218 {
     /* timestamp is negative here, so subtracting it adds its magnitude to the
      * current time. */
3221 timestamp = now.QuadPart - timestamp;
3222 }
3223 }
3224
3225 /* Add wait object back into one of the queues. */
3226 if (handle)
3227 {
3228 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
3229 this->u.wait.wait_pending = TRUE;
3230 this->u.wait.timeout = timestamp;
3231 }
3232 else
3233 {
3234 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
3235 this->u.wait.wait_pending = FALSE;
3236 }
3237
3238 /* Wake up the wait queue thread. */
3239 NtSetEvent( bucket->update_event, NULL );
3240 }
3241
3243}
3244
3245/***********************************************************************
3246 * TpSimpleTryPost (NTDLL.@)
3247 */
3249 TP_CALLBACK_ENVIRON *environment )
3250{
     /* Allocates a threadpool_object on the process heap, tags it as
      * TP_OBJECT_TYPE_SIMPLE with the supplied callback and initializes it with
      * tp_object_initialize(). No handle is returned to the caller.
      *
      * Returns STATUS_NO_MEMORY when the heap allocation fails, the status of
      * tp_threadpool_lock() when that call fails, and STATUS_SUCCESS otherwise.
      * NOTE(review): the declaration of 'status' (embedded line 3253) is not
      * visible in this listing. */
3251 struct threadpool_object *object;
3252 struct threadpool *pool;
3254
3255 TRACE( "%p %p %p\n", callback, userdata, environment );
3256
3257 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
3258 if (!object)
3259 return STATUS_NO_MEMORY;
3260
3261 status = tp_threadpool_lock( &pool, environment );
3262 if (status)
3263 {
     /* No pool could be obtained: release the object allocated above so the
      * error path does not leak it. */
3264 RtlFreeHeap( GetProcessHeap(), 0, object );
3265 return status;
3266 }
3267
3268 object->type = TP_OBJECT_TYPE_SIMPLE;
3269 object->u.simple.callback = callback;
3270 tp_object_initialize( object, pool, userdata, environment );
3271
3272 return STATUS_SUCCESS;
3273}
3274
3275/***********************************************************************
3276 * TpStartAsyncIoOperation (NTDLL.@)
3277 */
3279{
     /* Registers one more outstanding I/O operation on the TP_IO object by
      * incrementing its pending_count while holding the owning pool's critical
      * section. */
3280 struct threadpool_object *this = impl_from_TP_IO( io );
3281
3282 TRACE( "%p\n", io );
3283
3284 RtlEnterCriticalSection( &this->pool->cs );
3285
3286 this->u.io.pending_count++;
3287
3288 RtlLeaveCriticalSection( &this->pool->cs );
3289}
3290
3291/***********************************************************************
3292 * TpWaitForIoCompletion (NTDLL.@)
3293 */
3294void WINAPI TpWaitForIoCompletion( TP_IO *io, BOOL cancel_pending )
3295{
     /* Waits on the I/O object with tp_object_wait( this, FALSE ). When
      * cancel_pending is non-zero, tp_object_cancel() is called first. */
3296 struct threadpool_object *this = impl_from_TP_IO( io );
3297
3298 TRACE( "%p %d\n", io, cancel_pending );
3299
3300 if (cancel_pending)
3301 tp_object_cancel( this );
3302 tp_object_wait( this, FALSE );
3303}
3304
3305/***********************************************************************
3306 * TpWaitForTimer (NTDLL.@)
3307 */
3309{
     /* Waits on the timer object with tp_object_wait( this, FALSE ). When
      * cancel_pending is non-zero, tp_object_cancel() is called first. */
3310 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3311
3312 TRACE( "%p %d\n", timer, cancel_pending );
3313
3314 if (cancel_pending)
3315 tp_object_cancel( this );
3316 tp_object_wait( this, FALSE );
3317}
3318
3319/***********************************************************************
3320 * TpWaitForWait (NTDLL.@)
3321 */
3323{
     /* Waits on the wait object with tp_object_wait( this, FALSE ). When
      * cancel_pending is non-zero, tp_object_cancel() is called first. */
3324 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3325
3326 TRACE( "%p %d\n", wait, cancel_pending );
3327
3328 if (cancel_pending)
3329 tp_object_cancel( this );
3330 tp_object_wait( this, FALSE );
3331}
3332
3333/***********************************************************************
3334 * TpWaitForWork (NTDLL.@)
3335 */
3337{
     /* Waits on the work object with tp_object_wait( this, FALSE ). When
      * cancel_pending is non-zero, tp_object_cancel() is called first. */
3338 struct threadpool_object *this = impl_from_TP_WORK( work );
3339
3340 TRACE( "%p %u\n", work, cancel_pending );
3341
3342 if (cancel_pending)
3343 tp_object_cancel( this );
3344 tp_object_wait( this, FALSE );
3345}
3346
3347/***********************************************************************
3348 * TpSetPoolStackInformation (NTDLL.@)
3349 */
3351{
     /* Copies *stack_info into the pool's stack_info field under the pool's
      * critical section and returns STATUS_SUCCESS.
      * NOTE(review): the statement guarded by the NULL check below (embedded
      * line 3357) is not visible in this listing; the check does not guard the
      * RtlEnterCriticalSection() call that follows it here. */
3352 struct threadpool *this = impl_from_TP_POOL( pool );
3353
3354 TRACE( "%p %p\n", pool, stack_info );
3355
3356 if (!stack_info)
3358
3359 RtlEnterCriticalSection( &this->cs );
3360 this->stack_info = *stack_info;
3361 RtlLeaveCriticalSection( &this->cs );
3362
3363 return STATUS_SUCCESS;
3364}
3365
3366/***********************************************************************
3367 * TpQueryPoolStackInformation (NTDLL.@)
3368 */
3370{
     /* Copies the pool's stack_info field into *stack_info under the pool's
      * critical section and returns STATUS_SUCCESS.
      * NOTE(review): the statement guarded by the NULL check below (embedded
      * line 3376) is not visible in this listing; the check does not guard the
      * RtlEnterCriticalSection() call that follows it here. */
3371 struct threadpool *this = impl_from_TP_POOL( pool );
3372
3373 TRACE( "%p %p\n", pool, stack_info );
3374
3375 if (!stack_info)
3377
3378 RtlEnterCriticalSection( &this->cs );
3379 *stack_info = this->stack_info;
3380 RtlLeaveCriticalSection( &this->cs );
3381
3382 return STATUS_SUCCESS;
3383}
3384
3385#ifndef __REACTOS__
3387{
     /* Adapter between the threadpool wait callback and the callback stored in
      * u.wait.rtl_callback: forwards 'userdata' and a flag that is non-zero
      * whenever 'result' is anything other than STATUS_WAIT_0. */
3388 struct threadpool_object *object = impl_from_TP_WAIT(wait);
3389 object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 );
3390}
3391
3392/***********************************************************************
3393 * RtlRegisterWait (NTDLL.@)
3394 *
3395 * Registers a wait for a handle to become signaled.
3396 *
3397 * PARAMS
3398 * out [O] Receives the handle to the new wait object. Use RtlDeregisterWait() to free it.
3399 * handle [I] Object to wait to become signaled.
3400 * callback [I] Callback function to execute when the wait times out or the handle is signaled.
3401 * context [I] Context to pass to the callback function when it is executed.
3402 * milliseconds [I] Number of milliseconds to wait before timing out.
3403 * flags [I] Flags. See notes.
3404 *
3405 * RETURNS
3406 * Success: STATUS_SUCCESS.
3407 * Failure: Any NTSTATUS code.
3408 *
3409 * NOTES
3410 * Flags can be one or more of the following:
3411 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
3412 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
3413 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
3414 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
3415 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
3416 */
3418 void *context, ULONG milliseconds, ULONG flags )
3419{
     /* Builds a version-1 callback environment from 'flags', allocates a wait
      * object through tp_alloc_wait() with rtl_wait_callback as its callback,
      * stores the caller's callback in u.wait.rtl_callback, arms the wait with
      * TpSetWait() and returns the object through *out.
      *
      * Returns the status of tp_alloc_wait() when it fails, STATUS_SUCCESS
      * otherwise.
      * NOTE(review): the declarations of 'status' and 'timeout' (3422-3423) and
      * the statements at embedded lines 3434, 3441 and 3445 are not visible in
      * this listing. */
3420 struct threadpool_object *object;
3421 TP_CALLBACK_ENVIRON environment;
3424 TP_WAIT *wait;
3425
3426 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %lu, flags %lx\n",
3427 out, handle, callback, context, milliseconds, flags );
3428
     /* Only the long-function and persistent-thread flags are translated into
      * the environment; the full 'flags' value is also passed to
      * tp_alloc_wait() below. */
3429 memset( &environment, 0, sizeof(environment) );
3430 environment.Version = 1;
3431 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
3432 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
3433
3435 if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags )))
3436 return status;
3437
3438 object = impl_from_TP_WAIT(wait);
3439 object->u.wait.rtl_callback = callback;
3440
3442 TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) );
3443
     /* The returned handle is the object pointer itself. */
3444 *out = object;
3446
3447 return STATUS_SUCCESS;
3448}
3449
3450/***********************************************************************
3451 * RtlDeregisterWaitEx (NTDLL.@)
3452 *
3453 * Cancels a wait operation and frees the resources associated with calling
3454 * RtlRegisterWait().
3455 *
3456 * PARAMS
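3457 * handle [I] Handle to the wait object to free.
     * event [I] INVALID_HANDLE_VALUE to block until callbacks have finished; any
     *           other value is stored as the object's completion event.
3458 *
3459 * RETURNS
3460 * Success: STATUS_SUCCESS, or STATUS_PENDING if callbacks for the wait object are still outstanding.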
3461 * Failure: Any NTSTATUS code.
3462 */
3464{
     /* Cancels a registered wait and releases the wait object.
      *
      * The wait is first disarmed with TpSetWait( ..., NULL, NULL ). If 'event'
      * is INVALID_HANDLE_VALUE the call then blocks in TpWaitForWait() with
      * cancellation enabled; otherwise 'event' is stored in the object's
      * completed_event field, which must still be NULL.
      *
      * Returns STATUS_INVALID_HANDLE for a NULL handle, STATUS_PENDING when the
      * object still has pending, running or associated callbacks, and
      * STATUS_SUCCESS otherwise.
      * NOTE(review): the declaration of 'status' (embedded line 3466) is not
      * visible in this listing. */
3465 struct threadpool_object *object = handle;
3467
3468 TRACE( "handle %p, event %p\n", handle, event );
3469
3470 if (!object) return STATUS_INVALID_HANDLE;
3471
3472 TpSetWait( (TP_WAIT *)object, NULL, NULL );
3473
3474 if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE );
3475 else
3476 {
3477 assert( object->completed_event == NULL );
3478 object->completed_event = event;
3479 }
3480
     /* Sample the callback counters under the pool lock, before the reference
      * is dropped by TpReleaseWait() below. */
3481 RtlEnterCriticalSection( &object->pool->cs );
3482 if (object->num_pending_callbacks + object->num_running_callbacks
3483 + object->num_associated_callbacks) status = STATUS_PENDING;
3484 else status = STATUS_SUCCESS;
3485 RtlLeaveCriticalSection( &object->pool->cs );
3486
3487 TpReleaseWait( (TP_WAIT *)object );
3488 return status;
3489}
3490
3491/***********************************************************************
3492 * RtlDeregisterWait (NTDLL.@)
3493 *
3494 * Cancels a wait operation and frees the resources associated with calling
3495 * RtlRegisterWait().
3496 *
3497 * PARAMS
3498 * WaitHandle [I] Handle to the wait object to free.
3499 *
3500 * RETURNS
3501 * Success: STATUS_SUCCESS.
3502 * Failure: Any NTSTATUS code.
3503 */
3505{
     /* Equivalent to RtlDeregisterWaitEx() with a NULL completion event: the
      * wait is cancelled without blocking for callbacks to finish. */
3506 return RtlDeregisterWaitEx(WaitHandle, NULL);
3507}
3508#endif
3509
3510#ifdef __REACTOS__
3511VOID
3512NTAPI
3514 VOID)
3515{
     /* ReactOS-only initialization routine: initializes the critical section
      * old_threadpool.threadpool_compl_cs.
      * NOTE(review): the line carrying the function name (3513) and the
      * statements at embedded lines 3517-3519 are not visible in this listing. */
3516 RtlInitializeCriticalSection(&old_threadpool.threadpool_compl_cs);
3520}
3521#endif
void CALLBACK completion(DWORD dwError, DWORD cbTransferred, LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags)
Definition: WSARecv.c:16
void destroy(_Tp *__pointer)
Definition: _construct.h:278
#define VOID
Definition: acefi.h:82
#define InterlockedIncrement
Definition: armddk.h:53
#define InterlockedDecrement
Definition: armddk.h:52
#define skip(...)
Definition: atltest.h:64
#define WINE_DEFAULT_DEBUG_CHANNEL(t)
Definition: precomp.h:23
LONG NTSTATUS
Definition: precomp.h:26
#define ARRAY_SIZE(A)
Definition: main.h:20
static void list_remove(struct list_entry *entry)
Definition: list.h:90
static int list_empty(struct list_entry *head)
Definition: list.h:58
static void list_add_tail(struct list_entry *head, struct list_entry *entry)
Definition: list.h:83
static void list_init(struct list_entry *head)
Definition: list.h:51
#define FIXME(fmt,...)
Definition: precomp.h:53
#define WARN(fmt,...)
Definition: precomp.h:61
#define ERR(fmt,...)
Definition: precomp.h:57
static HANDLE thread
Definition: service.c:33
#define NTSYSAPI
Definition: ntoskrnl.h:12
PVOID NTAPI RtlAllocateHeap(IN PVOID HeapHandle, IN ULONG Flags, IN SIZE_T Size)
Definition: heap.c:616
BOOLEAN NTAPI RtlFreeHeap(IN PVOID HeapHandle, IN ULONG Flags, IN PVOID HeapBase)
Definition: heap.c:634
_In_ CDROM_SCAN_FOR_SPECIAL_INFO _In_ PCDROM_SCAN_FOR_SPECIAL_HANDLER Function
Definition: cdrom.h:1156
Definition: list.h:37
#define STATUS_TIMEOUT
Definition: d3dkmdt.h:49
#define STATUS_INVALID_HANDLE
Definition: d3dkmdt.h:40
#define STATUS_PENDING
Definition: d3dkmdt.h:43
#define STATUS_NO_MEMORY
Definition: d3dkmdt.h:51
#define WAIT_TIMEOUT
Definition: dderror.h:14
#define NULL
Definition: types.h:112
#define TRUE
Definition: types.h:120
#define FALSE
Definition: types.h:117
static HINSTANCE instance
Definition: main.c:40
#define GetProcessHeap()
Definition: compat.h:736
#define INVALID_HANDLE_VALUE
Definition: compat.h:731
#define GetCurrentProcess()
Definition: compat.h:759
#define RtlImageNtHeader
Definition: compat.h:806
#define CALLBACK
Definition: compat.h:35
#define HEAP_ZERO_MEMORY
Definition: compat.h:134
static void cleanup(void)
Definition: main.c:1335
PPEB Peb
Definition: dllmain.c:27
#define assert(x)
Definition: debug.h:53
static void list_move_tail(struct list_head *list, struct list_head *head)
Definition: list.h:122
static uacpi_status set_event(uacpi_u8 event, uacpi_u8 value)
Definition: event.c:84
#define L(x)
Definition: resources.c:13
r reserved
Definition: btrfs.c:3006
#define INFINITE
Definition: serial.h:102
#define ULONG_PTR
Definition: config.h:101
unsigned int BOOL
Definition: ntddk_ex.h:94
unsigned long DWORD
Definition: ntddk_ex.h:95
time_t now
Definition: finger.c:65
_Must_inspect_result_ _In_opt_ PFLT_INSTANCE _Out_ PHANDLE FileHandle
Definition: fltkernel.h:1231
@ FileCompletionInformation
Definition: from_kernel.h:91
FxCollectionEntry * cur
GLuint GLuint GLsizei count
Definition: gl.h:1545
GLdouble GLdouble t
Definition: gl.h:2047
GLdouble GLdouble GLdouble GLdouble q
Definition: gl.h:2063
struct _cl_event * event
Definition: glext.h:7739
GLuint res
Definition: glext.h:9613
GLsizeiptr size
Definition: glext.h:5919
GLbitfield flags
Definition: glext.h:7161
GLboolean GLuint group
Definition: glext.h:11120
GLuint64EXT * result
Definition: glext.h:11304
GLfloat GLfloat p
Definition: glext.h:8902
GLfloat param
Definition: glext.h:5796
GLsizei GLenum const GLvoid GLsizei GLenum GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLint GLint GLint GLshort GLshort GLshort GLubyte GLubyte GLubyte GLuint GLuint GLuint GLushort GLushort GLushort GLbyte GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLfloat GLint GLint GLint GLint GLshort GLshort GLshort GLshort GLubyte GLubyte GLubyte GLubyte GLuint GLuint GLuint GLuint GLushort GLushort GLushort GLushort GLboolean const GLdouble const GLfloat const GLint const GLshort const GLbyte const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLdouble const GLfloat const GLfloat const GLint const GLint const GLshort const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort GLenum GLenum GLenum GLfloat GLenum GLint GLenum GLenum GLenum GLfloat GLenum GLenum GLint GLenum GLfloat GLenum GLint GLint GLushort GLenum GLenum GLfloat GLenum GLenum GLint GLfloat const GLubyte GLenum GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLint GLint GLsizei GLsizei GLint GLenum GLenum const GLvoid GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLenum const GLdouble GLenum GLenum const GLfloat GLenum GLenum const GLint GLsizei GLuint GLfloat GLuint GLbitfield GLfloat GLint GLuint GLboolean GLenum GLfloat GLenum GLbitfield GLenum GLfloat GLfloat GLint GLint const GLfloat GLenum GLfloat GLfloat GLint GLint GLfloat GLfloat GLint GLint const GLfloat GLint GLfloat GLfloat GLint 
GLfloat GLfloat GLint GLfloat GLfloat const GLdouble const GLfloat const GLdouble const GLfloat GLint i
Definition: glfuncs.h:248
GLsizei GLenum const GLvoid GLsizei GLenum GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLint GLint GLint GLshort GLshort GLshort GLubyte GLubyte GLubyte GLuint GLuint GLuint GLushort GLushort GLushort GLbyte GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLfloat GLint GLint GLint GLint GLshort GLshort GLshort GLshort GLubyte GLubyte GLubyte GLubyte GLuint GLuint GLuint GLuint GLushort GLushort GLushort GLushort GLboolean const GLdouble const GLfloat const GLint const GLshort const GLbyte const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLdouble const GLfloat const GLfloat const GLint const GLint const GLshort const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort GLenum GLenum GLenum GLfloat GLenum GLint GLenum GLenum GLenum GLfloat GLenum GLenum GLint GLenum GLfloat GLenum GLint GLint GLushort GLenum GLenum GLfloat GLenum GLenum GLint GLfloat const GLubyte GLenum GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLint GLint GLsizei GLsizei GLint GLenum GLenum const GLvoid GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLenum const GLdouble GLenum GLenum const GLfloat GLenum GLenum const GLint GLsizei GLuint GLfloat GLuint GLbitfield GLfloat GLint GLuint GLboolean GLenum GLfloat GLenum GLbitfield GLenum GLfloat GLfloat GLint GLint const GLfloat GLenum GLfloat GLfloat GLint GLint GLfloat GLfloat GLint GLint const GLfloat GLint GLfloat GLfloat GLint 
GLfloat GLfloat GLint GLfloat GLfloat const GLdouble * u
Definition: glfuncs.h:240
DWORD(CALLBACK * PRTL_WORK_ITEM_ROUTINE)(LPVOID)
Definition: winternl.h:3493
NTSYSAPI void WINAPI TpWaitForWork(TP_WORK *, BOOL)
Definition: threadpool.c:3336
NTSYSAPI void WINAPI TpReleaseWork(TP_WORK *)
Definition: threadpool.c:3060
NTSYSAPI void WINAPI TpDisassociateCallback(TP_CALLBACK_INSTANCE *)
Definition: threadpool.c:2855
NTSYSAPI NTSTATUS WINAPI TpQueryPoolStackInformation(TP_POOL *, TP_POOL_STACK_INFORMATION *stack_info)
Definition: threadpool.c:3369
NTSYSAPI void WINAPI TpStartAsyncIoOperation(TP_IO *)
Definition: threadpool.c:3278
NTSYSAPI void WINAPI TpReleasePool(TP_POOL *)
Definition: threadpool.c:3019
NTSYSAPI NTSTATUS WINAPI TpAllocTimer(TP_TIMER **, PTP_TIMER_CALLBACK, PVOID, TP_CALLBACK_ENVIRON *)
Definition: threadpool.c:2606
NTSYSAPI PVOID WINAPI RtlReAllocateHeap(HANDLE, ULONG, PVOID, SIZE_T) __WINE_ALLOC_SIZE(4) __WINE_DEALLOC(RtlFreeHeap
VOID(CALLBACK * PRTL_OVERLAPPED_COMPLETION_ROUTINE)(DWORD, DWORD, LPVOID)
Definition: winternl.h:3619
NTSYSAPI NTSTATUS WINAPI TpAllocCleanupGroup(TP_CLEANUP_GROUP **)
Definition: threadpool.c:2539
void(NTAPI * RTL_WAITORTIMERCALLBACKFUNC)(PVOID, BOOLEAN)
Definition: winternl.h:3494
NTSYSAPI void WINAPI TpWaitForTimer(TP_TIMER *, BOOL)
Definition: threadpool.c:3308
NTSYSAPI void WINAPI TpWaitForIoCompletion(TP_IO *, BOOL)
Definition: threadpool.c:3294
NTSYSAPI void WINAPI TpCancelAsyncIoOperation(TP_IO *)
Definition: threadpool.c:2723
NTSYSAPI BOOL WINAPI TpIsTimerSet(TP_TIMER *)
Definition: threadpool.c:2886
NTSYSAPI void WINAPI TpReleaseTimer(TP_TIMER *)
Definition: threadpool.c:3032
NTSYSAPI void WINAPI TpPostWork(TP_WORK *)
Definition: threadpool.c:2898
NTSYSAPI void WINAPI TpWaitForWait(TP_WAIT *, BOOL)
Definition: threadpool.c:3322
NTSYSAPI NTSTATUS WINAPI TpAllocIoCompletion(TP_IO **, HANDLE, PTP_IO_CALLBACK, void *, TP_CALLBACK_ENVIRON *)
Definition: threadpool.c:2549
@ ThreadNameInformation
Definition: winternl.h:2319
NTSYSAPI void WINAPI TpSetWait(TP_WAIT *, HANDLE, LARGE_INTEGER *)
Definition: threadpool.c:3196
NTSYSAPI NTSTATUS WINAPI TpAllocPool(TP_POOL **, PVOID)
Definition: threadpool.c:2593
NTSYSAPI NTSTATUS WINAPI RtlInitializeCriticalSectionEx(RTL_CRITICAL_SECTION *, ULONG, ULONG)
struct _THREAD_NAME_INFORMATION THREAD_NAME_INFORMATION
NTSYSAPI void WINAPI TpReleaseIoCompletion(TP_IO *)
Definition: threadpool.c:2996
void(CALLBACK * PNTAPCFUNC)(ULONG_PTR, ULONG_PTR, ULONG_PTR)
Definition: winternl.h:3491
struct _THREAD_NAME_INFORMATION * PTHREAD_NAME_INFORMATION
void(CALLBACK * PRTL_THREAD_START_ROUTINE)(LPVOID)
Definition: winternl.h:3492
NTSYSAPI NTSTATUS WINAPI TpAllocWait(TP_WAIT **, PTP_WAIT_CALLBACK, PVOID, TP_CALLBACK_ENVIRON *)
Definition: threadpool.c:2682
NTSYSAPI void WINAPI TpSetPoolMaxThreads(TP_POOL *, DWORD)
Definition: threadpool.c:3074
NTSYSAPI void WINAPI TpCallbackReleaseSemaphoreOnCompletion(TP_CALLBACK_INSTANCE *, HANDLE, DWORD)
Definition: threadpool.c:2813
NTSYSAPI NTSTATUS WINAPI TpAllocWork(TP_WORK **, PTP_WORK_CALLBACK, PVOID, TP_CALLBACK_ENVIRON *)
Definition: threadpool.c:2692
NTSYSAPI ULONG WINAPI RtlNtStatusToDosError(NTSTATUS)
NTSYSAPI void WINAPI TpReleaseCleanupGroupMembers(TP_CLEANUP_GROUP *, BOOL, PVOID)
Definition: threadpool.c:2923
NTSYSAPI void WINAPI TpReleaseCleanupGroup(TP_CLEANUP_GROUP *)
Definition: threadpool.c:2910
NTSYSAPI void WINAPI TpSetTimer(TP_TIMER *, LARGE_INTEGER *, LONG, LONG)
Definition: threadpool.c:3118
NTSYSAPI NTSTATUS WINAPI TpSimpleTryPost(PTP_SIMPLE_CALLBACK, PVOID, TP_CALLBACK_ENVIRON *)
Definition: threadpool.c:3248
NTSYSAPI NTSTATUS WINAPI TpCallbackMayRunLong(TP_CALLBACK_INSTANCE *)
Definition: threadpool.c:2758
void(CALLBACK * PTP_IO_CALLBACK)(PTP_CALLBACK_INSTANCE, void *, void *, IO_STATUS_BLOCK *, PTP_IO)
Definition: winternl.h:4110
NTSYSAPI NTSTATUS WINAPI TpSetPoolStackInformation(TP_POOL *, TP_POOL_STACK_INFORMATION *stack_info)
Definition: threadpool.c:3350
NTSYSAPI void WINAPI TpReleaseWait(TP_WAIT *)
Definition: threadpool.c:3046
#define InterlockedCompareExchangePointer
Definition: interlocked.h:144
NTSTATUS NTAPI NtSetIoCompletion(IN HANDLE IoCompletionPortHandle, IN PVOID CompletionKey, IN PVOID CompletionContext, IN NTSTATUS CompletionStatus, IN ULONG CompletionInformation)
Definition: iocomp.c:569
NTSTATUS NTAPI NtRemoveIoCompletion(IN HANDLE IoCompletionHandle, OUT PVOID *KeyContext, OUT PVOID *ApcContext, OUT PIO_STATUS_BLOCK IoStatusBlock, IN PLARGE_INTEGER Timeout OPTIONAL)
Definition: iocomp.c:445
NTSTATUS NTAPI NtCreateIoCompletion(OUT PHANDLE IoCompletionHandle, IN ACCESS_MASK DesiredAccess, IN POBJECT_ATTRIBUTES ObjectAttributes, IN ULONG NumberOfConcurrentThreads)
Definition: iocomp.c:253
#define NtCurrentTeb
uint32_t entry
Definition: isohybrid.c:63
#define EVENT_ALL_ACCESS
Definition: isotest.c:82
NTSTATUS NTAPI LdrUnloadDll(_In_ PVOID BaseAddress)
Definition: ldrapi.c:1291
NTSTATUS NTAPI LdrAddRefDll(_In_ ULONG Flags, _In_ PVOID BaseAddress)
Definition: ldrapi.c:1205
if(dx< 0)
Definition: linetemp.h:194
__u16 time
Definition: mkdosfs.c:8
static const CLSID * objects[]
Definition: apphelp.c:112
static PVOID ptr
Definition: dispmode.c:27
static IPrintDialogCallback callback
Definition: printdlg.c:326
IMAGE_NT_HEADERS nt
Definition: module.c:50
#define IO_COMPLETION_ALL_ACCESS
Definition: file.c:72
static HANDLE PIO_APC_ROUTINE PVOID PIO_STATUS_BLOCK io
Definition: file.c:100
static PIO_STATUS_BLOCK iosb
Definition: file.c:98
HANDLE semaphore
Definition: threadpool.c:1889
static TP_CALLBACK_ENVIRON *static TP_CALLBACK_ENVIRON *static PTP_WORK_CALLBACK
Definition: threadpool.c:32
static PTP_TIMER_CALLBACK
Definition: threadpool.c:30
static PTP_IO_CALLBACK
Definition: threadpool.c:28
static DWORD
Definition: threadpool.c:34
static TP_CALLBACK_ENVIRON *static PTP_WAIT_CALLBACK
Definition: threadpool.c:31
static void TP_CALLBACK_ENVIRON *static PVOID
Definition: threadpool.c:29
#define min(a, b)
Definition: monoChain.cc:55
NTSTATUS NTAPI NtReleaseMutant(IN HANDLE MutantHandle, IN PLONG PreviousCount OPTIONAL)
Definition: mutant.c:296
NTSYSAPI VOID NTAPI RtlInitializeConditionVariable(_Out_ PRTL_CONDITION_VARIABLE ConditionVariable)
NTSYSAPI NTSTATUS NTAPI RtlDeleteCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSYSAPI NTSTATUS NTAPI RtlQueueWorkItem(_In_ WORKERCALLBACKFUNC Function, _In_opt_ PVOID Context, _In_ ULONG Flags)
NTSYSAPI VOID NTAPI RtlWakeAllConditionVariable(_Inout_ PRTL_CONDITION_VARIABLE ConditionVariable)
NTSYSAPI NTSTATUS NTAPI RtlRegisterWait(_In_ PHANDLE phNewWaitObject, _In_ HANDLE hObject, _In_ WAITORTIMERCALLBACKFUNC Callback, _In_ PVOID pvContext, _In_ ULONG ulMilliseconds, _In_ ULONG ulFlags)
NTSYSAPI NTSTATUS NTAPI RtlSleepConditionVariableCS(_Inout_ PRTL_CONDITION_VARIABLE ConditionVariable, _Inout_ PRTL_CRITICAL_SECTION CriticalSection, _In_opt_ PLARGE_INTEGER TimeOut)
NTSYSAPI NTSTATUS NTAPI RtlEnterCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSYSAPI VOID NTAPI RtlExitUserThread(_In_ NTSTATUS Status)
NTSYSAPI VOID NTAPI RtlWakeConditionVariable(_Inout_ PRTL_CONDITION_VARIABLE ConditionVariable)
NTSYSAPI NTSTATUS NTAPI RtlLeaveCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSYSAPI NTSTATUS NTAPI RtlCreateUserThread(_In_ PVOID ThreadContext, _Out_ HANDLE *OutThreadHandle, _Reserved_ PVOID Reserved1, _Reserved_ PVOID Reserved2, _Reserved_ PVOID Reserved3, _Reserved_ PVOID Reserved4, _Reserved_ PVOID Reserved5, _Reserved_ PVOID Reserved6, _Reserved_ PVOID Reserved7, _Reserved_ PVOID Reserved8)
NTSYSAPI NTSTATUS NTAPI RtlInitializeCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSYSAPI NTSTATUS NTAPI RtlDeregisterWaitEx(_In_ HANDLE hWaitHandle, _In_opt_ HANDLE hCompletionEvent)
NTSYSAPI NTSTATUS NTAPI RtlDeregisterWait(_In_ HANDLE hWaitHandle)
ULONG(NTAPI * PTHREAD_START_ROUTINE)(PVOID Parameter)
Definition: rtltypes.h:560
NTSYSAPI VOID NTAPI RtlInitUnicodeString(PUNICODE_STRING DestinationString, PCWSTR SourceString)
NTSYSAPI NTSTATUS NTAPI NtSetInformationFile(IN HANDLE hFile, OUT PIO_STATUS_BLOCK pIoStatusBlock, IN PVOID FileInformationBuffer, IN ULONG FileInformationBufferLength, IN FILE_INFORMATION_CLASS FileInfoClass)
Definition: iofunc.c:3096
NTSTATUS NTAPI NtClose(IN HANDLE Handle)
Definition: obhandle.c:3402
NTSYSAPI NTSTATUS NTAPI NtWaitForSingleObject(IN HANDLE hObject, IN BOOLEAN bAlertable, IN PLARGE_INTEGER Timeout)
#define LPVOID
Definition: nt_native.h:45
#define MAXLONGLONG
@ SynchronizationEvent
VOID NTAPI RtlpInitializeThreadPooling(VOID)
NTSTATUS NTAPI NtSetEvent(IN HANDLE EventHandle, OUT PLONG PreviousState OPTIONAL)
Definition: event.c:455
NTSTATUS NTAPI NtCreateEvent(OUT PHANDLE EventHandle, IN ACCESS_MASK DesiredAccess, IN POBJECT_ATTRIBUTES ObjectAttributes OPTIONAL, IN EVENT_TYPE EventType, IN BOOLEAN InitialState)
Definition: event.c:96
NTSTATUS NTAPI NtQueryPerformanceCounter(OUT PLARGE_INTEGER PerformanceCounter, OUT PLARGE_INTEGER PerformanceFrequency OPTIONAL)
Definition: profile.c:278
NTSTATUS NTAPI NtQuerySystemTime(OUT PLARGE_INTEGER SystemTime)
Definition: time.c:569
NTSTATUS NTAPI NtSetInformationThread(IN HANDLE ThreadHandle, IN THREADINFOCLASS ThreadInformationClass, IN PVOID ThreadInformation, IN ULONG ThreadInformationLength)
Definition: query.c:2074
PVOID *typedef PHANDLE
Definition: ntsecpkg.h:455
#define STATUS_WAIT_0
Definition: ntstatus.h:237
#define STATUS_TOO_MANY_THREADS
Definition: ntstatus.h:533
#define STATUS_INVALID_PARAMETER_1
Definition: ntstatus.h:475
NTSTATUS NTAPI NtWaitForMultipleObjects(IN ULONG ObjectCount, IN PHANDLE HandleArray, IN WAIT_TYPE WaitType, IN BOOLEAN Alertable, IN PLARGE_INTEGER TimeOut OPTIONAL)
Definition: obwait.c:46
#define BOOLEAN
Definition: pedump.c:73
long LONG
Definition: pedump.c:60
static unsigned __int64 next
Definition: rand_nt.c:6
#define err(...)
static calc_node_t temp
Definition: rpn_ieee.c:38
#define LIST_FOR_EACH_ENTRY(elem, list, type, field)
Definition: list.h:198
#define LIST_FOR_EACH_ENTRY_SAFE(cursor, cursor2, list, type, field)
Definition: list.h:204
#define LIST_FOR_EACH(cursor, list)
Definition: list.h:188
__WINE_SERVER_LIST_INLINE void list_add_before(struct list *elem, struct list *to_add)
Definition: list.h:87
#define memset(x, y, z)
Definition: compat.h:39
static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
Definition: threadpool.c:322
static void tp_object_wait(struct threadpool_object *object, BOOL group_wait)
Definition: threadpool.c:2208
static struct @5260 waitqueue
RTL_CRITICAL_SECTION threadpool_compl_cs
Definition: threadpool.c:94
static BOOL tp_object_release(struct threadpool_object *object)
Definition: threadpool.c:2257
NTSTATUS WINAPI RtlCreateTimer(HANDLE TimerQueue, HANDLE *NewTimer, RTL_WAITORTIMERCALLBACKFUNC Callback, PVOID Parameter, DWORD DueTime, DWORD Period, ULONG Flags)
Definition: threadpool.c:1008
static PLARGE_INTEGER get_nt_timeout(PLARGE_INTEGER pTime, ULONG timeout)
Definition: threadpool.c:632
NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
Definition: threadpool.c:594
static void set_thread_name(const WCHAR *name)
Definition: threadpool.c:473
static void CALLBACK ioqueue_thread_proc(void *param)
Definition: threadpool.c:1620
static struct threadpool_group * impl_from_TP_CLEANUP_GROUP(TP_CLEANUP_GROUP *group)
Definition: threadpool.c:425
VOID WINAPI TpCallbackUnloadDllOnCompletion(TP_CALLBACK_INSTANCE *instance, HMODULE module)
Definition: threadpool.c:2842
static void queue_move_timer(struct queue_timer *t, ULONGLONG time, BOOL set_event)
Definition: threadpool.c:715
HANDLE compl_port
Definition: threadpool.c:93
static void CALLBACK process_rtl_work_item(TP_CALLBACK_INSTANCE *instance, void *userdata)
Definition: threadpool.c:484
static void tp_group_shutdown(struct threadpool_group *group)
Definition: threadpool.c:1997
NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
Definition: threadpool.c:869
#define MAXIMUM_WAITQUEUE_OBJECTS
Definition: threadpool.c:145
HANDLE port
Definition: threadpool.c:371
LONG objcount
Definition: threadpool.c:291
static void queue_add_timer(struct queue_timer *t, ULONGLONG time, BOOL set_event)
Definition: threadpool.c:689
BOOL WINAPI TpSetPoolMinThreads(TP_POOL *pool, DWORD minimum)
Definition: threadpool.c:3089
static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
Definition: threadpool.c:446
static struct list * threadpool_get_next_item(const struct threadpool *pool)
Definition: threadpool.c:2297
static void tp_object_initialize(struct threadpool_object *object, struct threadpool *pool, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
Definition: threadpool.c:2031
static void tp_waitqueue_unlock(struct threadpool_object *wait)
Definition: threadpool.c:1598
static ULONG queue_get_timeout(struct timer_queue *q)
Definition: threadpool.c:768
static NTSTATUS tp_waitqueue_lock(struct threadpool_object *wait)
Definition: threadpool.c:1520
static BOOL tp_group_release(struct threadpool_group *group)
Definition: threadpool.c:2007
threadpool_objtype
Definition: threadpool.c:167
@ TP_OBJECT_TYPE_SIMPLE
Definition: threadpool.c:168
@ TP_OBJECT_TYPE_WORK
Definition: threadpool.c:169
@ TP_OBJECT_TYPE_TIMER
Definition: threadpool.c:170
@ TP_OBJECT_TYPE_IO
Definition: threadpool.c:172
@ TP_OBJECT_TYPE_WAIT
Definition: threadpool.c:171
static struct threadpool * default_threadpool
Definition: threadpool.c:444
static ULONGLONG queue_current_time(void)
Definition: threadpool.c:682
static DWORD WINAPI timer_callback_wrapper(LPVOID p)
Definition: threadpool.c:674
static NTSTATUS tp_threadpool_alloc(struct threadpool **out)
Definition: threadpool.c:1787
NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer, HANDLE CompletionEvent)
Definition: threadpool.c:1103
static void tp_object_submit(struct threadpool_object *object, BOOL signaled)
Definition: threadpool.c:2124
static NTSTATUS tp_new_worker_thread(struct threadpool *pool)
Definition: threadpool.c:1247
static BOOL tp_threadpool_release(struct threadpool *pool)
Definition: threadpool.c:1850
struct list pending_timers
Definition: threadpool.c:293
static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
Definition: threadpool.c:285
static struct threadpool_object * impl_from_TP_WORK(TP_WORK *work)
Definition: threadpool.c:397
CRITICAL_SECTION cs
Definition: threadpool.c:290
static struct @5259 timerqueue
#define TIMER_QUEUE_MAGIC
Definition: threadpool.c:85
static struct threadpool_object * impl_from_TP_WAIT(TP_WAIT *wait)
Definition: threadpool.c:411
static BOOL object_is_finished(struct threadpool_object *object, BOOL group)
Definition: threadpool.c:2189
#define THREADPOOL_WORKER_TIMEOUT
Definition: threadpool.c:144
VOID WINAPI TpCallbackSetEventOnCompletion(TP_CALLBACK_INSTANCE *instance, HANDLE event)
Definition: threadpool.c:2829
struct list buckets
Definition: threadpool.c:329
#define EXPIRE_NEVER
Definition: threadpool.c:84
static void tp_ioqueue_unlock(struct threadpool_object *io)
Definition: threadpool.c:2223
static void CALLBACK threadpool_worker_proc(void *param)
Definition: threadpool.c:2476
static void tp_threadpool_unlock(struct threadpool *pool)
Definition: threadpool.c:1952
static void WINAPI timer_queue_thread_proc(LPVOID p)
Definition: threadpool.c:793
static NTSTATUS tp_group_alloc(struct threadpool_group **out)
Definition: threadpool.c:1965
VOID WINAPI TpCallbackReleaseMutexOnCompletion(TP_CALLBACK_INSTANCE *instance, HANDLE mutex)
Definition: threadpool.c:2800
static struct timer_queue * get_timer_queue(HANDLE TimerQueue)
Definition: threadpool.c:959
LONG num_buckets
Definition: threadpool.c:328
static void queue_destroy_timer(struct queue_timer *t)
Definition: threadpool.c:842
static struct threadpool_instance * impl_from_TP_CALLBACK_INSTANCE(TP_CALLBACK_INSTANCE *instance)
Definition: threadpool.c:430
static struct @5258 old_threadpool
static DWORD CALLBACK iocp_poller(LPVOID Arg)
Definition: threadpool.c:544
static struct threadpool_object * impl_from_TP_TIMER(TP_TIMER *timer)
Definition: threadpool.c:404
static NTSTATUS tp_alloc_wait(TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment, DWORD flags)
Definition: threadpool.c:2643
RTL_CONDITION_VARIABLE update_event
Definition: threadpool.c:294
static struct threadpool_object * impl_from_TP_IO(TP_IO *io)
Definition: threadpool.c:418
NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
Definition: threadpool.c:915
static NTSTATUS tp_ioqueue_lock(struct threadpool_object *io, HANDLE file)
Definition: threadpool.c:1729
static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
Definition: threadpool.c:88
static void CALLBACK timerqueue_thread_proc(void *param)
Definition: threadpool.c:1149
static void tp_object_execute(struct threadpool_object *object, BOOL wait_thread)
Definition: threadpool.c:2317
static void tp_timerqueue_unlock(struct threadpool_object *timer)
Definition: threadpool.c:1312
static void CALLBACK waitqueue_thread_proc(void *param)
Definition: threadpool.c:1344
static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
Definition: threadpool.c:363
static void tp_object_cancel(struct threadpool_object *object)
Definition: threadpool.c:2163
NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer, DWORD DueTime, DWORD Period)
Definition: threadpool.c:1067
BOOL thread_running
Definition: threadpool.c:292
static void timer_cleanup_callback(struct queue_timer *t)
Definition: threadpool.c:660
VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion(TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit)
Definition: threadpool.c:2745
static void queue_remove_timer(struct queue_timer *t)
Definition: threadpool.c:641
static struct threadpool * impl_from_TP_POOL(TP_POOL *pool)
Definition: threadpool.c:392
static void tp_threadpool_shutdown(struct threadpool *pool)
Definition: threadpool.c:1837
static void queue_timer_expire(struct timer_queue *q)
Definition: threadpool.c:723
static struct @5261 ioqueue
static void tp_object_prepare_shutdown(struct threadpool_object *object)
Definition: threadpool.c:2242
static NTSTATUS tp_timerqueue_lock(struct threadpool_object *timer)
Definition: threadpool.c:1270
static NTSTATUS tp_threadpool_lock(struct threadpool **out, TP_CALLBACK_ENVIRON *environment)
Definition: threadpool.c:1879
static void CALLBACK rtl_wait_callback(TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result)
Definition: threadpool.c:3386
static void tp_object_prio_queue(struct threadpool_object *object)
Definition: threadpool.c:2112
NTSTATUS NTAPI NtReleaseSemaphore(IN HANDLE SemaphoreHandle, IN LONG ReleaseCount, OUT PLONG PreviousCount OPTIONAL)
Definition: sem.c:295
#define STATUS_SUCCESS
Definition: shellext.h:65
namespace GUID const ADDRINFOEXW ADDRINFOEXW struct timeval OVERLAPPED * overlapped
Definition: sock.c:81
INT WSAAPI shutdown(IN SOCKET s, IN INT how)
Definition: sockctrl.c:506
#define TRACE(s)
Definition: solgame.cpp:4
IMAGE_OPTIONAL_HEADER32 OptionalHeader
Definition: ntddk_ex.h:184
PVOID ImageBaseAddress
Definition: ntddk_ex.h:245
UNICODE_STRING ThreadName
Definition: winternl.h:2361
PTP_CLEANUP_GROUP CleanupGroup
Definition: winnt_old.h:4695
PTP_SIMPLE_CALLBACK FinalizationCallback
Definition: winnt_old.h:4699
struct _TP_CALLBACK_ENVIRON_V3::@4475::@4476 s
TP_CALLBACK_PRIORITY CallbackPriority
Definition: winnt_old.h:4708
PTP_CLEANUP_GROUP_CANCEL_CALLBACK CleanupGroupCancelCallback
Definition: winnt_old.h:4696
struct _ACTIVATION_CONTEXT * ActivationContext
Definition: winnt_old.h:4698
union _TP_CALLBACK_ENVIRON_V3::@4475 u
Definition: http.c:7252
Definition: fci.c:127
IO_STATUS_BLOCK iosb
Definition: threadpool.c:177
ULONG_PTR cvalue
Definition: threadpool.c:178
Definition: copy.c:22
Definition: list.h:15
Definition: module.h:456
Definition: name.c:39
BOOL destroy
Definition: threadpool.c:126
RTL_WAITORTIMERCALLBACKFUNC callback
Definition: threadpool.c:121
DWORD period
Definition: threadpool.c:123
ULONG runcount
Definition: threadpool.c:120
struct timer_queue * q
Definition: threadpool.c:118
PVOID param
Definition: threadpool.c:122
HANDLE event
Definition: threadpool.c:127
ULONG flags
Definition: threadpool.c:124
ULONGLONG expire
Definition: threadpool.c:125
struct list entry
Definition: threadpool.c:119
PVOID context
Definition: threadpool.c:81
PRTL_WORK_ITEM_ROUTINE function
Definition: threadpool.c:80
Definition: ps.c:97
struct list members
Definition: threadpool.c:280
CRITICAL_SECTION cs
Definition: threadpool.c:278
CRITICAL_SECTION * critical_section
Definition: threadpool.c:264
struct threadpool_instance::@5268 cleanup
struct threadpool_object * object
Definition: threadpool.c:258
struct list wait_entry
Definition: threadpool.c:238
PTP_WAIT_CALLBACK callback
Definition: threadpool.c:233
PTP_SIMPLE_CALLBACK callback
Definition: threadpool.c:213
PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback
Definition: threadpool.c:192
RTL_CONDITION_VARIABLE finished_event
Definition: threadpool.c:202
TP_CALLBACK_PRIORITY priority
Definition: threadpool.c:196
struct threadpool_object::@5262::@5266 wait
ULONGLONG timeout
Definition: threadpool.c:227
struct io_completion * completions
Definition: threadpool.c:250
struct threadpool_object::@5262::@5264 work
PTP_IO_CALLBACK callback
Definition: threadpool.c:246
struct threadpool * pool
Definition: threadpool.c:189
unsigned int skipped_count
Definition: threadpool.c:248
struct list group_entry
Definition: threadpool.c:198
enum threadpool_objtype type
Definition: threadpool.c:188
void * win32_callback
Definition: threadpool.c:184
struct threadpool_object::@5262::@5263 simple
struct threadpool_object::@5262::@5265 timer
unsigned int completion_count
Definition: threadpool.c:248
unsigned int completion_max
Definition: threadpool.c:248
union threadpool_object::@5262 u
unsigned int pending_count
Definition: threadpool.c:248
struct list pool_entry
Definition: threadpool.c:201
LONG num_associated_callbacks
Definition: threadpool.c:207
RTL_WAITORTIMERCALLBACKFUNC rtl_callback
Definition: threadpool.c:242
LONG num_running_callbacks
Definition: threadpool.c:206
LONG num_pending_callbacks
Definition: threadpool.c:205
PTP_WORK_CALLBACK callback
Definition: threadpool.c:217
RTL_CONDITION_VARIABLE group_finished_event
Definition: threadpool.c:203
PTP_SIMPLE_CALLBACK finalization_callback
Definition: threadpool.c:193
struct list timer_entry
Definition: threadpool.c:225
struct waitqueue_bucket * bucket
Definition: threadpool.c:236
HANDLE completed_event
Definition: threadpool.c:204
PTP_TIMER_CALLBACK callback
Definition: threadpool.c:221
struct threadpool_group * group
Definition: threadpool.c:190
struct threadpool_object::@5262::@5267 io
int max_workers
Definition: threadpool.c:158
TP_POOL_STACK_INFORMATION stack_info
Definition: threadpool.c:163
CRITICAL_SECTION cs
Definition: threadpool.c:153
RTL_CONDITION_VARIABLE update_event
Definition: threadpool.c:156
int num_workers
Definition: threadpool.c:160
LONG objcount
Definition: threadpool.c:151
int min_workers
Definition: threadpool.c:159
struct list pools[3]
Definition: threadpool.c:155
LONG refcount
Definition: threadpool.c:150
BOOL shutdown
Definition: threadpool.c:152
int num_busy_workers
Definition: threadpool.c:161
HANDLE compl_port
Definition: threadpool.c:162
Definition: dhcpd.h:245
HANDLE event
Definition: threadpool.c:136
RTL_CRITICAL_SECTION cs
Definition: threadpool.c:133
HANDLE thread
Definition: threadpool.c:137
DWORD magic
Definition: threadpool.c:132
struct list timers
Definition: threadpool.c:134
struct list waiting
Definition: threadpool.c:356
HANDLE update_event
Definition: threadpool.c:357
struct list bucket_entry
Definition: threadpool.c:353
struct list reserved
Definition: threadpool.c:355
#define max(a, b)
Definition: svc.c:63
#define LIST_INIT(head)
Definition: queue.h:197
#define LIST_ENTRY(type)
Definition: queue.h:175
#define DWORD_PTR
Definition: treelist.c:76
uint32_t * PULONG_PTR
Definition: typedefs.h:65
int64_t LONGLONG
Definition: typedefs.h:68
#define NTAPI
Definition: typedefs.h:36
ULONG_PTR SIZE_T
Definition: typedefs.h:80
uint32_t ULONG_PTR
Definition: typedefs.h:65
uint32_t ULONG
Definition: typedefs.h:59
uint64_t ULONGLONG
Definition: typedefs.h:67
#define STATUS_INVALID_PARAMETER
Definition: udferr_usr.h:135
#define STATUS_UNSUCCESSFUL
Definition: udferr_usr.h:132
static EFI_HANDLE * handles
Definition: uefidisk.c:62
LONGLONG QuadPart
Definition: typedefs.h:114
Definition: pdh_main.c:96
wchar_t tm const _CrtWcstime_Writes_and_advances_ptr_ count wchar_t ** out
Definition: wcsftime.cpp:383
_In_ WDFINTERRUPT _In_ PFN_WDF_INTERRUPT_SYNCHRONIZE Callback
Definition: wdfinterrupt.h:458
_In_ WDFTIMER _In_ LONGLONG DueTime
Definition: wdftimer.h:190
HANDLE WINAPI GetCurrentThread(void)
Definition: proc.c:1148
DWORD WINAPI GetCurrentThreadId(void)
Definition: thread.c:459
#define WAIT_OBJECT_0
Definition: winbase.h:439
#define WINAPI
Definition: msvc.h:6
#define WT_TRANSFER_IMPERSONATION
Definition: winnt_old.h:1089
@ TP_CALLBACK_PRIORITY_NORMAL
Definition: winnt_old.h:4658
@ TP_CALLBACK_PRIORITY_HIGH
Definition: winnt_old.h:4657
@ TP_CALLBACK_PRIORITY_LOW
Definition: winnt_old.h:4659
struct _TP_CLEANUP_GROUP TP_CLEANUP_GROUP
Definition: winnt_old.h:4676
VOID(NTAPI * PTP_WAIT_CALLBACK)(PTP_CALLBACK_INSTANCE, PVOID, PTP_WAIT, TP_WAIT_RESULT)
Definition: winnt_old.h:4689
#define WT_EXECUTEINPERSISTENTTHREAD
Definition: winnt_old.h:1088
struct _TP_CALLBACK_INSTANCE * PTP_CALLBACK_INSTANCE
Definition: winnt_old.h:4648
#define WT_EXECUTEONLYONCE
Definition: winnt_old.h:1084
struct _TP_POOL TP_POOL
Definition: winnt_old.h:4646
#define WT_EXECUTEINIOTHREAD
Definition: winnt_old.h:1081
DWORD TP_WAIT_RESULT
Definition: winnt_old.h:4653
#define RTL_CRITICAL_SECTION_FLAG_FORCE_DEBUG_INFO
Definition: winnt_old.h:1134
#define WT_EXECUTELONGFUNCTION
Definition: winnt_old.h:1085
#define WT_EXECUTEINWAITTHREAD
Definition: winnt_old.h:1083
VOID(NTAPI * PTP_CLEANUP_GROUP_CANCEL_CALLBACK)(_Inout_opt_ PVOID ObjectContext, _Inout_opt_ PVOID CleanupContext)
Definition: winnt_old.h:4684
VOID(NTAPI * PTP_WORK_CALLBACK)(_Inout_ PTP_CALLBACK_INSTANCE Instance, _Inout_opt_ PVOID Context, _Inout_ PTP_WORK Work)
Definition: winnt_old.h:4671
#define WT_EXECUTEINTIMERTHREAD
Definition: winnt_old.h:1086
struct _TP_WAIT TP_WAIT
Definition: winnt_old.h:4650
struct _TP_IO * PTP_IO
Definition: winnt_old.h:4651
enum _TP_CALLBACK_PRIORITY TP_CALLBACK_PRIORITY
struct _TP_WORK TP_WORK
Definition: winnt_old.h:4647
struct _TP_IO TP_IO
Definition: winnt_old.h:4651
struct _TP_TIMER TP_TIMER
Definition: winnt_old.h:4649
VOID(NTAPI * PTP_SIMPLE_CALLBACK)(_Inout_ PTP_CALLBACK_INSTANCE Instance, _Inout_opt_ PVOID Context)
Definition: winnt_old.h:4679
VOID(NTAPI * PTP_TIMER_CALLBACK)(PTP_CALLBACK_INSTANCE, PVOID, PTP_TIMER)
Definition: winnt_old.h:4688
struct _TP_CALLBACK_INSTANCE TP_CALLBACK_INSTANCE
Definition: winnt_old.h:4648
#define WT_EXECUTEDEFAULT
Definition: winnt_old.h:1080
_Must_inspect_result_ _In_ ULONG Flags
Definition: wsk.h:170
_In_ LARGE_INTEGER _In_ ULONG Period
Definition: kefuncs.h:1313
#define RTL_CONDITION_VARIABLE_INIT
Definition: rtltypes.h:282
_Inout_opt_ PVOID Parameter
Definition: rtltypes.h:336
__wchar_t WCHAR
Definition: xmlstorage.h:180