ReactOS 0.4.16-dev-1946-g52006dd
threadpool.c
Go to the documentation of this file.
1/*
2 * Thread pooling
3 *
4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
20 */
21
22
23#ifdef __REACTOS__
24#include <rtl_vista.h>
25#define NDEBUG
26#include "wine/list.h"
27#include <debug.h>
28
29#define ERR(fmt, ...) DPRINT1(fmt, ##__VA_ARGS__)
30#define FIXME(fmt, ...) DPRINT(fmt, ##__VA_ARGS__)
31#define WARN(fmt, ...) DPRINT(fmt, ##__VA_ARGS__)
32#define TRACE(fmt, ...) DPRINT(fmt, ##__VA_ARGS__)
33#ifndef ARRAY_SIZE
34#define ARRAY_SIZE(_x) (sizeof((_x))/sizeof((_x)[0]))
35#endif
36
42
45#define PRTL_WORK_ITEM_ROUTINE WORKERCALLBACKFUNC
46
47#define CRITICAL_SECTION RTL_CRITICAL_SECTION
48#define GetProcessHeap() RtlGetProcessHeap()
49#define GetCurrentProcess() NtCurrentProcess()
50#define GetCurrentThread() NtCurrentThread()
51#define GetCurrentThreadId() HandleToULong(NtCurrentTeb()->ClientId.UniqueThread)
52#else
53#include <assert.h>
54#include <stdarg.h>
55#include <limits.h>
56
57#include "ntstatus.h"
58#define WIN32_NO_STATUS
59#include "winternl.h"
60
61#include "wine/debug.h"
62#include "wine/list.h"
63
64#include "ntdll_misc.h"
65
67#endif
68
69/*
70 * Old thread pooling API
71 */
72
74{
77};
78
79#define EXPIRE_NEVER (~(ULONGLONG)0)
80#define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
81
82#ifndef __REACTOS__
84#endif
85
86static struct
87{
90}
92{
93 NULL, /* compl_port */
94#ifdef __REACTOS__
95 {0}, /* threadpool_compl_cs */
96#else
97 { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
98#endif
99};
100
101#ifndef __REACTOS__
103{
104 0, 0, &old_threadpool.threadpool_compl_cs,
106 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
107};
108#endif
109
110struct timer_queue;
112{
113 struct timer_queue *q;
114 struct list entry;
115 ULONG runcount; /* number of callbacks pending execution */
121 BOOL destroy; /* timer should be deleted; once set, never unset */
122 HANDLE event; /* removal event */
123};
124
126{
129 struct list timers; /* sorted by expiration time */
130 BOOL quit; /* queue should be deleted; once set, never unset */
133};
134
135/*
136 * Object-oriented thread pooling API
137 */
138
139#define THREADPOOL_WORKER_TIMEOUT 5000
140#define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
141
142/* internal threadpool representation */
144{
149 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
150 struct list pools[3];
152 /* information about worker threads, locked via .cs */
159};
160
162{
168};
169
171{
174};
175
176/* internal threadpool object representation */
178{
179 void *win32_callback; /* leave space for kernelbase to store win32 callback */
182 /* read-only information */
192 /* information about the group, locked via .group->cs */
195 /* information about the pool, locked via .pool->cs */
203 /* arguments for callback */
204 union
205 {
206 struct
207 {
210 struct
211 {
214 struct
215 {
217 /* information about the timer, locked via timerqueue.cs */
226 struct
227 {
230 /* information about the wait object, locked via waitqueue.cs */
239 struct
240 {
242 /* locked via .pool->cs */
246 } io;
247 } u;
248};
249
250/* internal threadpool instance representation */
252{
257 struct
258 {
266};
267
268/* internal threadpool group representation */
270{
274 /* list of group members, locked via .cs */
275 struct list members;
276};
277
278#ifndef __REACTOS__
279/* global timerqueue object */
281#endif
282
283static struct
284{
290}
292{
293#ifdef __REACTOS__
294 {0}, /* cs */
295#else
296 { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
297#endif
298 0, /* objcount */
299 FALSE, /* thread_running */
300 LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
301#if __REACTOS__
302 0,
303#else
304 RTL_CONDITION_VARIABLE_INIT /* update_event */
305#endif
307
308#ifndef __REACTOS__
310{
311 0, 0, &timerqueue.cs,
313 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
314};
315
316/* global waitqueue object */
318#endif
319
320static struct
321{
324 struct list buckets;
325}
326waitqueue =
327{
328#ifdef __REACTOS__
329 {0}, /* cs */
330#else
331 { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
332#endif
333 0, /* num_buckets */
334 LIST_INIT( waitqueue.buckets ) /* buckets */
336
337#ifndef __REACTOS__
339{
340 0, 0, &waitqueue.cs,
342 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
343};
344#endif
345
347{
351 struct list waiting;
354};
355
356#ifndef __REACTOS__
357/* global I/O completion queue object */
359#endif
360
361static struct
362{
368}
369ioqueue =
370{
371#ifdef __REACTOS__
372 .cs = {0},
373#else
374 .cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
375#endif
377
378#ifndef __REACTOS__
380{
381 0, 0, &ioqueue.cs,
383 0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
384};
385#endif
386
387static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
388{
389 return (struct threadpool *)pool;
390}
391
393{
394 struct threadpool_object *object = (struct threadpool_object *)work;
396 return object;
397}
398
400{
401 struct threadpool_object *object = (struct threadpool_object *)timer;
403 return object;
404}
405
407{
408 struct threadpool_object *object = (struct threadpool_object *)wait;
410 return object;
411}
412
413static inline struct threadpool_object *impl_from_TP_IO( TP_IO *io )
414{
415 struct threadpool_object *object = (struct threadpool_object *)io;
416 assert( object->type == TP_OBJECT_TYPE_IO );
417 return object;
418}
419
421{
422 return (struct threadpool_group *)group;
423}
424
426{
427 return (struct threadpool_instance *)instance;
428}
429
430#ifdef __REACTOS__
432#else
433static void CALLBACK threadpool_worker_proc( void *param );
434#endif
435static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
436static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread );
437static void tp_object_prepare_shutdown( struct threadpool_object *object );
438static BOOL tp_object_release( struct threadpool_object *object );
440
441static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
442{
443 unsigned int new_capacity, max_capacity;
444 void *new_elements;
445
446 if (count <= *capacity)
447 return TRUE;
448
449 max_capacity = ~(SIZE_T)0 / size;
450 if (count > max_capacity)
451 return FALSE;
452
453 new_capacity = max(4, *capacity);
454 while (new_capacity < count && new_capacity <= max_capacity / 2)
455 new_capacity *= 2;
456 if (new_capacity < count)
457 new_capacity = max_capacity;
458
459 if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size )))
460 return FALSE;
461
462 *elements = new_elements;
463 *capacity = new_capacity;
464
465 return TRUE;
466}
467
468static void set_thread_name(const WCHAR *name)
469{
470#ifndef __REACTOS__ // This is impossible on non vista+
472
473 RtlInitUnicodeString(&info.ThreadName, name);
475#endif
476}
477
478#ifndef __REACTOS__
480{
481 struct rtl_work_item *item = userdata;
482
483 TRACE("executing %p(%p)\n", item->function, item->context);
484 item->function( item->context );
485
487}
488
489/***********************************************************************
490 * RtlQueueWorkItem (NTDLL.@)
491 *
492 * Queues a work item into a thread in the thread pool.
493 *
494 * PARAMS
495 * function [I] Work function to execute.
496 * context [I] Context to pass to the work function when it is executed.
497 * flags [I] Flags. See notes.
498 *
499 * RETURNS
500 * Success: STATUS_SUCCESS.
501 * Failure: Any NTSTATUS code.
502 *
503 * NOTES
504 * Flags can be one or more of the following:
505 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
506 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
507 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
508 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
509 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
510 */
512{
513 TP_CALLBACK_ENVIRON environment;
514 struct rtl_work_item *item;
516
517 TRACE( "%p %p %lu\n", function, context, flags );
518
519 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
520 if (!item)
521 return STATUS_NO_MEMORY;
522
523 memset( &environment, 0, sizeof(environment) );
524 environment.Version = 1;
525 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
526 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
527
528 item->function = function;
529 item->context = context;
530
533 return status;
534}
535
536/***********************************************************************
537 * iocp_poller - get completion events and run callbacks
538 */
540{
541 HANDLE cport = Arg;
542
543 while( TRUE )
544 {
548#ifdef __REACTOS__
550#else
552#endif
553 if (res)
554 {
555 ERR("NtRemoveIoCompletion failed: 0x%lx\n", res);
556 }
557 else
558 {
559 DWORD transferred = 0;
560 DWORD err = 0;
561
563 transferred = iosb.Information;
564 else
566
567 callback( err, transferred, overlapped );
568 }
569 }
570 return 0;
571}
572
573/***********************************************************************
574 * RtlSetIoCompletionCallback (NTDLL.@)
575 *
576 * Binds a handle to a thread pool's completion port, and possibly
577 * starts a non-I/O thread to monitor this port and call functions back.
578 *
579 * PARAMS
580 * FileHandle [I] Handle to bind to a completion port.
581 * Function [I] Callback function to call on I/O completions.
582 * Flags [I] Not used.
583 *
584 * RETURNS
585 * Success: STATUS_SUCCESS.
586 * Failure: Any NTSTATUS code.
587 *
588 */
590{
593
594 if (Flags) FIXME("Unknown value Flags=0x%lx\n", Flags);
595
596 if (!old_threadpool.compl_port)
597 {
599
600 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
601 if (!old_threadpool.compl_port)
602 {
603 HANDLE cport;
604
606 if (!res)
607 {
608 /* FIXME native can start additional threads in case of e.g. hung callback function. */
610 if (!res)
611 old_threadpool.compl_port = cport;
612 else
613 NtClose( cport );
614 }
615 }
616 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
617 if (res) return res;
618 }
619
620 info.CompletionPort = old_threadpool.compl_port;
621 info.CompletionKey = (ULONG_PTR)Function;
622
624}
625#endif
626
628{
629 if (timeout == INFINITE) return NULL;
630 pTime->QuadPart = (ULONGLONG)timeout * -10000;
631 return pTime;
632}
633
634/************************** Timer Queue Impl **************************/
635
636static void queue_remove_timer(struct queue_timer *t)
637{
638 /* We MUST hold the queue cs while calling this function. This ensures
639 that we cannot queue another callback for this timer. The runcount
640 being zero makes sure we don't have any already queued. */
641 struct timer_queue *q = t->q;
642
643 assert(t->runcount == 0);
644 assert(t->destroy);
645
646 list_remove(&t->entry);
647 if (t->event)
648 NtSetEvent(t->event, NULL);
650
651 if (q->quit && list_empty(&q->timers))
652 NtSetEvent(q->event, NULL);
653}
654
656{
657 struct timer_queue *q = t->q;
659
660 assert(0 < t->runcount);
661 --t->runcount;
662
663 if (t->destroy && t->runcount == 0)
665
667}
668
670{
671 struct queue_timer *t = p;
672 t->callback(t->param, TRUE);
674 return 0;
675}
676
677static inline ULONGLONG queue_current_time(void)
678{
679 LARGE_INTEGER now, freq;
681 return now.QuadPart * 1000 / freq.QuadPart;
682}
683
686{
687 /* We MUST hold the queue cs while calling this function. */
688 struct timer_queue *q = t->q;
689 struct list *ptr = &q->timers;
690
691 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
692
693 if (time != EXPIRE_NEVER)
694 LIST_FOR_EACH(ptr, &q->timers)
695 {
696 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
697 if (time < cur->expire)
698 break;
699 }
700 list_add_before(ptr, &t->entry);
701
702 t->expire = time;
703
704 /* If we insert at the head of the list, we need to expire sooner
705 than expected. */
706 if (set_event && &t->entry == list_head(&q->timers))
707 NtSetEvent(q->event, NULL);
708}
709
710static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
712{
713 /* We MUST hold the queue cs while calling this function. */
714 list_remove(&t->entry);
716}
717
718static void queue_timer_expire(struct timer_queue *q)
719{
720 struct queue_timer *t = NULL;
721
723 if (list_head(&q->timers))
724 {
726 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
727 if (!t->destroy && t->expire <= ((now = queue_current_time())))
728 {
729 ++t->runcount;
730 if (t->period)
731 {
732 next = t->expire + t->period;
733 /* avoid trigger cascade if overloaded / hibernated */
734 if (next < now)
735 next = now + t->period;
736 }
737 else
740 }
741 else
742 t = NULL;
743 }
745
746 if (t)
747 {
748 if (t->flags & WT_EXECUTEINTIMERTHREAD)
750 else
751 {
753 = (t->flags
757 if (status != STATUS_SUCCESS)
759 }
760 }
761}
762
764{
765 struct queue_timer *t;
767
769 if (list_head(&q->timers))
770 {
771 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
772 assert(!t->destroy || t->expire == EXPIRE_NEVER);
773
774 if (t->expire != EXPIRE_NEVER)
775 {
777 timeout = t->expire < time ? 0 : t->expire - time;
778 }
779 }
781
782 return timeout;
783}
784
785#ifdef __REACTOS__
787#else
789#endif
790{
791 struct timer_queue *q = p;
792 ULONG timeout_ms;
793
794 set_thread_name(L"wine_threadpool_timer_queue");
795 timeout_ms = INFINITE;
796 for (;;)
797 {
800 BOOL done = FALSE;
801
803 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
804
805 if (status == STATUS_WAIT_0)
806 {
807 /* There are two possible ways to trigger the event. Either
808 we are quitting and the last timer got removed, or a new
809 timer got put at the head of the list so we need to adjust
810 our timeout. */
812 if (q->quit && list_empty(&q->timers))
813 done = TRUE;
815 }
816 else if (status == STATUS_TIMEOUT)
818
819 if (done)
820 break;
821
822 timeout_ms = queue_get_timeout(q);
823 }
824
825 NtClose(q->event);
827 q->magic = 0;
830#ifdef __REACTOS__
831 return STATUS_SUCCESS;
832#endif
833}
834
835#ifndef __REACTOS__
836
838{
839 /* We MUST hold the queue cs while calling this function. */
840 t->destroy = TRUE;
841 if (t->runcount == 0)
842 /* Ensure a timer is promptly removed. If callbacks are pending,
843 it will be removed after the last one finishes by the callback
844 cleanup wrapper. */
846 else
847 /* Make sure no destroyed timer masks an active timer at the head
848 of the sorted list. */
850}
851
852/***********************************************************************
853 * RtlCreateTimerQueue (NTDLL.@)
854 *
855 * Creates a timer queue object and returns a handle to it.
856 *
857 * PARAMS
858 * NewTimerQueue [O] The newly created queue.
859 *
860 * RETURNS
861 * Success: STATUS_SUCCESS.
862 * Failure: Any NTSTATUS code.
863 */
865{
867 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
868 if (!q)
869 return STATUS_NO_MEMORY;
870
872 list_init(&q->timers);
873 q->quit = FALSE;
874 q->magic = TIMER_QUEUE_MAGIC;
876 if (status != STATUS_SUCCESS)
877 {
879 return status;
880 }
882 timer_queue_thread_proc, q, &q->thread, NULL);
883 if (status != STATUS_SUCCESS)
884 {
885 NtClose(q->event);
887 return status;
888 }
889
890 *NewTimerQueue = q;
891 return STATUS_SUCCESS;
892}
893
894/***********************************************************************
895 * RtlDeleteTimerQueueEx (NTDLL.@)
896 *
897 * Deletes a timer queue object.
898 *
899 * PARAMS
900 * TimerQueue [I] The timer queue to destroy.
901 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
902 * wait until all timers are finished firing before
903 * returning. Otherwise, return immediately and set the
904 * event when all timers are done.
905 *
906 * RETURNS
907 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
908 * Failure: Any NTSTATUS code.
909 */
911{
912 struct timer_queue *q = TimerQueue;
913 struct queue_timer *t, *temp;
916
917 if (!q || q->magic != TIMER_QUEUE_MAGIC)
919
920 thread = q->thread;
921
923 q->quit = TRUE;
924 if (list_head(&q->timers))
925 /* When the last timer is removed, it will signal the timer thread to
926 exit... */
927 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
929 else
930 /* However if we have none, we must do it ourselves. */
931 NtSetEvent(q->event, NULL);
933
934 if (CompletionEvent == INVALID_HANDLE_VALUE)
935 {
938 }
939 else
940 {
941 if (CompletionEvent)
942 {
943 FIXME("asynchronous return on completion event unimplemented\n");
945 NtSetEvent(CompletionEvent, NULL);
946 }
948 }
949
951 return status;
952}
953
954static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
955{
956 static struct timer_queue *default_timer_queue;
957
958 if (TimerQueue)
959 return TimerQueue;
960 else
961 {
962 if (!default_timer_queue)
963 {
964 HANDLE q;
966 if (status == STATUS_SUCCESS)
967 {
968 PVOID p = InterlockedCompareExchangePointer( (void **) &default_timer_queue, q, NULL );
969 if (p)
970 /* Got beat to the punch. */
972 }
973 }
974 return default_timer_queue;
975 }
976}
977
978/***********************************************************************
979 * RtlCreateTimer (NTDLL.@)
980 *
981 * Creates a new timer associated with the given queue.
982 *
983 * PARAMS
984 * TimerQueue [I] The queue to hold the timer.
985 * NewTimer [O] The newly created timer.
986 * Callback [I] The callback to fire.
987 * Parameter [I] The argument for the callback.
988 * DueTime [I] The delay, in milliseconds, before first firing the
989 * timer.
990 * Period [I] The period, in milliseconds, at which to fire the timer
991 * after the first callback. If zero, the timer will only
992 * fire once. It still needs to be deleted with
993 * RtlDeleteTimer.
994 * Flags [I] Flags controlling the execution of the callback. In
995 * addition to the WT_* thread pool flags (see
996 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
997 * WT_EXECUTEONLYONCE are supported.
998 *
999 * RETURNS
1000 * Success: STATUS_SUCCESS.
1001 * Failure: Any NTSTATUS code.
1002 */
1006 ULONG Flags)
1007{
1009 struct queue_timer *t;
1010 struct timer_queue *q = get_timer_queue(TimerQueue);
1011
1012 if (!q) return STATUS_NO_MEMORY;
1013 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
1014
1015 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
1016 if (!t)
1017 return STATUS_NO_MEMORY;
1018
1019 t->q = q;
1020 t->runcount = 0;
1021 t->callback = Callback;
1022 t->param = Parameter;
1023 t->period = Period;
1024 t->flags = Flags;
1025 t->destroy = FALSE;
1026 t->event = NULL;
1027
1030 if (q->quit)
1032 else
1035
1036 if (status == STATUS_SUCCESS)
1037 *NewTimer = t;
1038 else
1040
1041 return status;
1042}
1043
1044/***********************************************************************
1045 * RtlUpdateTimer (NTDLL.@)
1046 *
1047 * Changes the time at which a timer expires.
1048 *
1049 * PARAMS
1050 * TimerQueue [I] The queue that holds the timer.
1051 * Timer [I] The timer to update.
1052 * DueTime [I] The delay, in milliseconds, before next firing the timer.
1053 * Period [I] The period, in milliseconds, at which to fire the timer
1054 * after the first callback. If zero, the timer will not
1055 * refire once. It still needs to be deleted with
1056 * RtlDeleteTimer.
1057 *
1058 * RETURNS
1059 * Success: STATUS_SUCCESS.
1060 * Failure: Any NTSTATUS code.
1061 */
1064{
1065 struct queue_timer *t = Timer;
1066 struct timer_queue *q = t->q;
1067
1069 /* Can't change a timer if it was once-only or destroyed. */
1070 if (t->expire != EXPIRE_NEVER)
1071 {
1072 t->period = Period;
1074 }
1076
1077 return STATUS_SUCCESS;
1078}
1079
1080/***********************************************************************
1081 * RtlDeleteTimer (NTDLL.@)
1082 *
1083 * Cancels a timer-queue timer.
1084 *
1085 * PARAMS
1086 * TimerQueue [I] The queue that holds the timer.
1087 * Timer [I] The timer to update.
1088 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1089 * wait until the timer is finished firing all pending
1090 * callbacks before returning. Otherwise, return
1091 * immediately and set the timer is done.
1092 *
1093 * RETURNS
1094 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1095 or if the completion event is NULL.
1096 * Failure: Any NTSTATUS code.
1097 */
1099 HANDLE CompletionEvent)
1100{
1101 struct queue_timer *t = Timer;
1102 struct timer_queue *q;
1104 HANDLE event = NULL;
1105
1106 if (!Timer)
1108 q = t->q;
1109 if (CompletionEvent == INVALID_HANDLE_VALUE)
1110 {
1112 if (status == STATUS_SUCCESS)
1114 }
1115 else if (CompletionEvent)
1116 event = CompletionEvent;
1117
1119 t->event = event;
1120 if (t->runcount == 0 && event)
1124
1125 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1126 {
1127 if (status == STATUS_PENDING)
1128 {
1131 }
1132 NtClose(event);
1133 }
1134
1135 return status;
1136}
1137#endif
1138/***********************************************************************
1139 * timerqueue_thread_proc (internal)
1140 */
1141#ifdef __REACTOS__
1143#else
1145#endif
1146{
1147 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1148 struct threadpool_object *other_timer;
1150 struct list *ptr;
1151
1152 TRACE( "starting timer queue thread\n" );
1153 set_thread_name(L"wine_threadpool_timerqueue");
1154
1156 for (;;)
1157 {
1159
1160 /* Check for expired timers. */
1161 while ((ptr = list_head( &timerqueue.pending_timers )))
1162 {
1163 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1164 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1165 assert( timer->u.timer.timer_pending );
1166 if (timer->u.timer.timeout > now.QuadPart)
1167 break;
1168
1169 /* Queue a new callback in one of the worker threads. */
1170 list_remove( &timer->u.timer.timer_entry );
1171 timer->u.timer.timer_pending = FALSE;
1173
1174 /* Insert the timer back into the queue, except it's marked for shutdown. */
1175 if (timer->u.timer.period && !timer->shutdown)
1176 {
1177 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1178 if (timer->u.timer.timeout <= now.QuadPart)
1179 timer->u.timer.timeout = now.QuadPart + 1;
1180
1181 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1182 struct threadpool_object, u.timer.timer_entry )
1183 {
1184 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1185 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1186 break;
1187 }
1188 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1189 timer->u.timer.timer_pending = TRUE;
1190 }
1191 }
1192
1193 timeout_lower = timeout_upper = MAXLONGLONG;
1194
1195 /* Determine next timeout and use the window length to optimize wakeup times. */
1196 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1197 struct threadpool_object, u.timer.timer_entry )
1198 {
1199 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1200 if (other_timer->u.timer.timeout >= timeout_upper)
1201 break;
1202
1203 timeout_lower = other_timer->u.timer.timeout;
1204 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1205 if (new_timeout < timeout_upper)
1206 timeout_upper = new_timeout;
1207 }
1208
1209 /* Wait for timer update events or until the next timer expires. */
1210 if (timerqueue.objcount)
1211 {
1212 timeout.QuadPart = timeout_lower;
1214 continue;
1215 }
1216
1217 /* All timers have been destroyed, if no new timers are created
1218 * within some amount of time, then we can shutdown this thread. */
1219 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1220 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1221 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1222 {
1223 break;
1224 }
1225 }
1226
1227 timerqueue.thread_running = FALSE;
1229
1230 TRACE( "terminating timer queue thread\n" );
1231 RtlExitUserThread( 0 );
1232#ifdef __REACTOS__
1233 return STATUS_SUCCESS;
1234#endif
1235}
1236
1237/***********************************************************************
1238 * tp_new_worker_thread (internal)
1239 *
1240 * Create and account a new worker thread for the desired pool.
1241 */
1243{
1244 HANDLE thread;
1246
1248 pool->stack_info.StackReserve, pool->stack_info.StackCommit,
1250 if (status == STATUS_SUCCESS)
1251 {
1252 InterlockedIncrement( &pool->refcount );
1253 pool->num_workers++;
1254 NtClose( thread );
1255 }
1256 return status;
1257}
1258
1259/***********************************************************************
1260 * tp_timerqueue_lock (internal)
1261 *
1262 * Acquires a lock on the global timerqueue. When the lock is acquired
1263 * successfully, it is guaranteed that the timer thread is running.
1264 */
1266{
1268 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1269
1270 timer->u.timer.timer_initialized = FALSE;
1271 timer->u.timer.timer_pending = FALSE;
1272 timer->u.timer.timer_set = FALSE;
1273 timer->u.timer.timeout = 0;
1274 timer->u.timer.period = 0;
1275 timer->u.timer.window_length = 0;
1276
1278
1279 /* Make sure that the timerqueue thread is running. */
1280 if (!timerqueue.thread_running)
1281 {
1282 HANDLE thread;
1285 if (status == STATUS_SUCCESS)
1286 {
1287 timerqueue.thread_running = TRUE;
1288 NtClose( thread );
1289 }
1290 }
1291
1292 if (status == STATUS_SUCCESS)
1293 {
1294 timer->u.timer.timer_initialized = TRUE;
1295 timerqueue.objcount++;
1296 }
1297
1299 return status;
1300}
1301
1302/***********************************************************************
1303 * tp_timerqueue_unlock (internal)
1304 *
1305 * Releases a lock on the global timerqueue.
1306 */
1308{
1309 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1310
1312 if (timer->u.timer.timer_initialized)
1313 {
1314 /* If timer was pending, remove it. */
1315 if (timer->u.timer.timer_pending)
1316 {
1317 list_remove( &timer->u.timer.timer_entry );
1318 timer->u.timer.timer_pending = FALSE;
1319 }
1320
1321 /* If the last timer object was destroyed, then wake up the thread. */
1322 if (!--timerqueue.objcount)
1323 {
1324 assert( list_empty( &timerqueue.pending_timers ) );
1325 RtlWakeAllConditionVariable( &timerqueue.update_event );
1326 }
1327
1328 timer->u.timer.timer_initialized = FALSE;
1329 }
1331}
1332
1333/***********************************************************************
1334 * waitqueue_thread_proc (internal)
1335 */
1336#ifdef __REACTOS__
1338#else
1340#endif
1341{
1344 struct waitqueue_bucket *bucket = param;
1345 struct threadpool_object *wait, *next;
1347 DWORD num_handles;
1349
1350 TRACE( "starting wait queue thread\n" );
1351 set_thread_name(L"wine_threadpool_waitqueue");
1352
1354
1355 for (;;)
1356 {
1358 timeout.QuadPart = MAXLONGLONG;
1359 num_handles = 0;
1360
1362 u.wait.wait_entry )
1363 {
1364 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1365 if (wait->u.wait.timeout <= now.QuadPart)
1366 {
1367 /* Wait object timed out. */
1368 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1369 {
1370 list_remove( &wait->u.wait.wait_entry );
1371 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1372 }
1373 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1374 {
1375 InterlockedIncrement( &wait->refcount );
1376 wait->num_pending_callbacks++;
1377 RtlEnterCriticalSection( &wait->pool->cs );
1379 RtlLeaveCriticalSection( &wait->pool->cs );
1381 }
1382 else tp_object_submit( wait, FALSE );
1383 }
1384 else
1385 {
1386 if (wait->u.wait.timeout < timeout.QuadPart)
1387 timeout.QuadPart = wait->u.wait.timeout;
1388
1389 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1390 InterlockedIncrement( &wait->refcount );
1391 objects[num_handles] = wait;
1392 handles[num_handles] = wait->u.wait.handle;
1393 num_handles++;
1394 }
1395 }
1396
1397 if (!bucket->objcount)
1398 {
1399 /* All wait objects have been destroyed, if no new wait objects are created
1400 * within some amount of time, then we can shutdown this thread. */
1401 assert( num_handles == 0 );
1403 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1404 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout );
1406
1407 if (status == STATUS_TIMEOUT && !bucket->objcount)
1408 break;
1409 }
1410 else
1411 {
1412 handles[num_handles] = bucket->update_event;
1414 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout );
1416
1417 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1418 {
1420 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1421 if (wait->u.wait.bucket)
1422 {
1423 /* Wait object signaled. */
1424 assert( wait->u.wait.bucket == bucket );
1425 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1426 {
1427 list_remove( &wait->u.wait.wait_entry );
1428 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1429 }
1430 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1431 {
1432 wait->u.wait.signaled++;
1433 wait->num_pending_callbacks++;
1434 RtlEnterCriticalSection( &wait->pool->cs );
1436 RtlLeaveCriticalSection( &wait->pool->cs );
1437 }
1438 else tp_object_submit( wait, TRUE );
1439 }
1440 else
1441 WARN("wait object %p triggered while object was destroyed\n", wait);
1442 }
1443
1444 /* Release temporary references to wait objects. */
1445 while (num_handles)
1446 {
1447 wait = objects[--num_handles];
1448 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1450 }
1451 }
1452
1453 /* Try to merge bucket with other threads. */
1454 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1455 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1456 {
1457 struct waitqueue_bucket *other_bucket;
1458 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1459 {
1460 if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable &&
1461 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1462 {
1463 other_bucket->objcount += bucket->objcount;
1464 bucket->objcount = 0;
1465
1466 /* Update reserved list. */
1467 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1468 {
1469 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1470 wait->u.wait.bucket = other_bucket;
1471 }
1472 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1473
1474 /* Update waiting list. */
1475 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1476 {
1477 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1478 wait->u.wait.bucket = other_bucket;
1479 }
1480 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1481
1482 /* Move bucket to the end, to keep the probability of
1483 * newly added wait objects as small as possible. */
1484 list_remove( &bucket->bucket_entry );
1485 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1486
1487 NtSetEvent( other_bucket->update_event, NULL );
1488 break;
1489 }
1490 }
1491 }
1492 }
1493
1494 /* Remove this bucket from the list. */
1495 list_remove( &bucket->bucket_entry );
1496 if (!--waitqueue.num_buckets)
1497 assert( list_empty( &waitqueue.buckets ) );
1498
1500
1501 TRACE( "terminating wait queue thread\n" );
1502
1503 assert( bucket->objcount == 0 );
1504 assert( list_empty( &bucket->reserved ) );
1505 assert( list_empty( &bucket->waiting ) );
1506 NtClose( bucket->update_event );
1507
1508 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1509 RtlExitUserThread( 0 );
1510}
1511
1512/***********************************************************************
1513 * tp_waitqueue_lock (internal)
1514 */
1516{
1517 struct waitqueue_bucket *bucket;
1519 HANDLE thread;
1520 BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0;
1521 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1522
1523 wait->u.wait.signaled = 0;
1524 wait->u.wait.bucket = NULL;
1525 wait->u.wait.wait_pending = FALSE;
1526 wait->u.wait.timeout = 0;
1527 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1528
1530
1531 /* Try to assign to existing bucket if possible. */
1532 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1533 {
1534 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable)
1535 {
1536 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1537 wait->u.wait.bucket = bucket;
1538 bucket->objcount++;
1539
1541 goto out;
1542 }
1543 }
1544
1545 /* Create a new bucket and corresponding worker thread. */
1546 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1547 if (!bucket)
1548 {
1550 goto out;
1551 }
1552
1553 bucket->objcount = 0;
1554 bucket->alertable = alertable;
1555 list_init( &bucket->reserved );
1556 list_init( &bucket->waiting );
1557
1560 if (status)
1561 {
1562 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1563 goto out;
1564 }
1565
1568 if (status == STATUS_SUCCESS)
1569 {
1570 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1571 waitqueue.num_buckets++;
1572
1573 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1574 wait->u.wait.bucket = bucket;
1575 bucket->objcount++;
1576
1577 NtClose( thread );
1578 }
1579 else
1580 {
1581 NtClose( bucket->update_event );
1582 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1583 }
1584
1585out:
1587 return status;
1588}
1589
1590/***********************************************************************
1591 * tp_waitqueue_unlock (internal)
1592 */
1593static void tp_waitqueue_unlock( struct threadpool_object *wait )
1594{
1595 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1596
1598 if (wait->u.wait.bucket)
1599 {
1600 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1601 assert( bucket->objcount > 0 );
1602
1603 list_remove( &wait->u.wait.wait_entry );
1604 wait->u.wait.bucket = NULL;
1605 bucket->objcount--;
1606
1607 NtSetEvent( bucket->update_event, NULL );
1608 }
1610}
1611
1612#ifdef __REACTOS__
1614#else
1616#endif
1617{
1618 struct io_completion *completion;
1619 struct threadpool_object *io;
1621#ifdef __REACTOS__
1622 PVOID key, value;
1623#else
1625#endif
1626 BOOL destroy, skip;
1628
1629 TRACE( "starting I/O completion thread\n" );
1630 set_thread_name(L"wine_threadpool_ioqueue");
1631
1633
1634 for (;;)
1635 {
1637 if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
1638 ERR("NtRemoveIoCompletion failed, status %#lx.\n", status);
1640
1641 destroy = skip = FALSE;
1642 io = (struct threadpool_object *)key;
1643
1644 TRACE( "io %p, iosb.Status %#lx.\n", io, iosb.Status );
1645
1646 if (io && (io->shutdown || io->u.io.shutting_down))
1647 {
1648 RtlEnterCriticalSection( &io->pool->cs );
1649 if (!io->u.io.pending_count)
1650 {
1651 if (io->u.io.skipped_count)
1652 --io->u.io.skipped_count;
1653
1654 if (io->u.io.skipped_count)
1655 skip = TRUE;
1656 else
1657 destroy = TRUE;
1658 }
1659 RtlLeaveCriticalSection( &io->pool->cs );
1660 if (skip) continue;
1661 }
1662
1663 if (destroy)
1664 {
1665 --ioqueue.objcount;
1666 TRACE( "Releasing io %p.\n", io );
1667 io->shutdown = TRUE;
1669 }
1670 else if (io)
1671 {
1672 RtlEnterCriticalSection( &io->pool->cs );
1673
1674 TRACE( "pending_count %u.\n", io->u.io.pending_count );
1675
1676 if (io->u.io.pending_count)
1677 {
1678 --io->u.io.pending_count;
1679 if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
1680 io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
1681 {
1682 ERR( "Failed to allocate memory.\n" );
1683 RtlLeaveCriticalSection( &io->pool->cs );
1684 continue;
1685 }
1686
1687 completion = &io->u.io.completions[io->u.io.completion_count++];
1688 completion->iosb = iosb;
1689#ifdef __REACTOS__
1690 completion->cvalue = (ULONG_PTR)value;
1691#else
1692 completion->cvalue = value;
1693#endif
1694
1696 }
1697 RtlLeaveCriticalSection( &io->pool->cs );
1698 }
1699
1700 if (!ioqueue.objcount)
1701 {
1702 /* All I/O objects have been destroyed; if no new objects are
1703 * created within some amount of time, then we can shutdown this
1704 * thread. */
1705 LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
1706 if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
1707 &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
1708 break;
1709 }
1710 }
1711
1712 ioqueue.thread_running = FALSE;
1714
1715 TRACE( "terminating I/O completion thread\n" );
1716
1717 RtlExitUserThread( 0 );
1718
1719#ifdef __REACTOS__
1720 return STATUS_SUCCESS;
1721#endif
1722}
1723
1725{
1727
1728 assert( io->type == TP_OBJECT_TYPE_IO );
1729
1731
1732 if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
1734 {
1736 return status;
1737 }
1738
1739 if (!ioqueue.thread_running)
1740 {
1741 HANDLE thread;
1742
1744 0, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL )))
1745 {
1746 ioqueue.thread_running = TRUE;
1747 NtClose( thread );
1748 }
1749 }
1750
1751 if (status == STATUS_SUCCESS)
1752 {
1755
1756#ifdef __REACTOS__
1757 info.Port = ioqueue.port;
1758 info.Key = io;
1759#else
1760 info.CompletionPort = ioqueue.port;
1761 info.CompletionKey = (ULONG_PTR)io;
1762#endif
1763
1765 }
1766
1767 if (status == STATUS_SUCCESS)
1768 {
1769 if (!ioqueue.objcount++)
1770 RtlWakeConditionVariable( &ioqueue.update_event );
1771 }
1772
1774 return status;
1775}
1776
1777/***********************************************************************
1778 * tp_threadpool_alloc (internal)
1779 *
1780 * Allocates a new threadpool object.
1781 */
1783{
1784#ifdef __REACTOS__
1785 IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->ProcessEnvironmentBlock->ImageBaseAddress );
1786#else
1788#endif
1789 struct threadpool *pool;
1790 unsigned int i;
1791
1792 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1793 if (!pool)
1794 return STATUS_NO_MEMORY;
1795
1796 pool->refcount = 1;
1797 pool->objcount = 0;
1798 pool->shutdown = FALSE;
1799
1800#ifdef __REACTOS__
1802#else
1804
1805 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1806#endif
1807
1808 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1809 list_init( &pool->pools[i] );
1810 RtlInitializeConditionVariable( &pool->update_event );
1811
1812 pool->max_workers = 500;
1813 pool->min_workers = 0;
1814 pool->num_workers = 0;
1815 pool->num_busy_workers = 0;
1816 pool->stack_info.StackReserve = nt->OptionalHeader.SizeOfStackReserve;
1817 pool->stack_info.StackCommit = nt->OptionalHeader.SizeOfStackCommit;
1818
1819 TRACE( "allocated threadpool %p\n", pool );
1820
1821 *out = pool;
1822 return STATUS_SUCCESS;
1823}
1824
1825/***********************************************************************
1826 * tp_threadpool_shutdown (internal)
1827 *
1828 * Prepares the shutdown of a threadpool object and notifies all worker
1829 * threads to terminate (after all remaining work items have been
1830 * processed).
1831 */
1833{
1835
1836 pool->shutdown = TRUE;
1837 RtlWakeAllConditionVariable( &pool->update_event );
1838}
1839
1840/***********************************************************************
1841 * tp_threadpool_release (internal)
1842 *
1843 * Releases a reference to a threadpool object.
1844 */
1846{
1847 unsigned int i;
1848
1849 if (InterlockedDecrement( &pool->refcount ))
1850 return FALSE;
1851
1852 TRACE( "destroying threadpool %p\n", pool );
1853
1854 assert( pool->shutdown );
1855 assert( !pool->objcount );
1856 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1857 assert( list_empty( &pool->pools[i] ) );
1858#ifndef __REACTOS__
1859 pool->cs.DebugInfo->Spare[0] = 0;
1860#endif
1862
1864 return TRUE;
1865}
1866
1867/***********************************************************************
1868 * tp_threadpool_lock (internal)
1869 *
1870 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1871 * block. When the lock is acquired successfully, it is guaranteed that
1872 * there is at least one worker thread to process tasks.
1873 */
1875{
1876 struct threadpool *pool = NULL;
1878
1879 if (environment)
1880 {
1881#ifndef __REACTOS__ //Windows 7 stuff
1882 /* Validate environment parameters. */
1883 if (environment->Version == 3)
1884 {
1885 TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1886
1887 switch (environment3->CallbackPriority)
1888 {
1892 break;
1893 default:
1895 }
1896 }
1897#endif
1898 pool = (struct threadpool *)environment->Pool;
1899 }
1900
1901 if (!pool)
1902 {
1903 if (!default_threadpool)
1904 {
1906 if (status != STATUS_SUCCESS)
1907 return status;
1908
1910 {
1913 }
1914 }
1915
1917 }
1918
1920
1921 /* Make sure that the threadpool has at least one thread. */
1922 if (!pool->num_workers)
1924
1925 /* Keep a reference, and increment objcount to ensure that the
1926 * last thread doesn't terminate. */
1927 if (status == STATUS_SUCCESS)
1928 {
1929 InterlockedIncrement( &pool->refcount );
1930 pool->objcount++;
1931 }
1932
1934
1935 if (status != STATUS_SUCCESS)
1936 return status;
1937
1938 *out = pool;
1939 return STATUS_SUCCESS;
1940}
1941
1942/***********************************************************************
1943 * tp_threadpool_unlock (internal)
1944 *
1945 * Releases a lock on a threadpool.
1946 */
1948{
1950 pool->objcount--;
1953}
1954
1955/***********************************************************************
1956 * tp_group_alloc (internal)
1957 *
1958 * Allocates a new threadpool group object.
1959 */
1961{
1962 struct threadpool_group *group;
1963
1964 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1965 if (!group)
1966 return STATUS_NO_MEMORY;
1967
1968 group->refcount = 1;
1969 group->shutdown = FALSE;
1970
1971#ifdef __REACTOS__
1973#else
1975
1976 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1977#endif
1978
1979 list_init( &group->members );
1980
1981 TRACE( "allocated group %p\n", group );
1982
1983 *out = group;
1984 return STATUS_SUCCESS;
1985}
1986
1987/***********************************************************************
1988 * tp_group_shutdown (internal)
1989 *
1990 * Marks the group object for shutdown.
1991 */
1993{
1994 group->shutdown = TRUE;
1995}
1996
1997/***********************************************************************
1998 * tp_group_release (internal)
1999 *
2000 * Releases a reference to a group object.
2001 */
2003{
2004 if (InterlockedDecrement( &group->refcount ))
2005 return FALSE;
2006
2007 TRACE( "destroying group %p\n", group );
2008
2009 assert( group->shutdown );
2010 assert( list_empty( &group->members ) );
2011
2012#ifndef __REACTOS__
2013 group->cs.DebugInfo->Spare[0] = 0;
2014#endif
2016
2018 return TRUE;
2019}
2020
2021/***********************************************************************
2022 * tp_object_initialize (internal)
2023 *
2024 * Initializes members of a threadpool object.
2025 */
2026static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
2027 PVOID userdata, TP_CALLBACK_ENVIRON *environment )
2028{
2029 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
2030
2031 object->refcount = 1;
2032 object->shutdown = FALSE;
2033
2034 object->pool = pool;
2035 object->group = NULL;
2036 object->userdata = userdata;
2037 object->group_cancel_callback = NULL;
2038 object->finalization_callback = NULL;
2039 object->may_run_long = 0;
2040 object->race_dll = NULL;
2041 object->priority = TP_CALLBACK_PRIORITY_NORMAL;
2042
2043 memset( &object->group_entry, 0, sizeof(object->group_entry) );
2044 object->is_group_member = FALSE;
2045
2046 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
2047 RtlInitializeConditionVariable( &object->finished_event );
2048 RtlInitializeConditionVariable( &object->group_finished_event );
2049 object->completed_event = NULL;
2050 object->num_pending_callbacks = 0;
2051 object->num_running_callbacks = 0;
2052 object->num_associated_callbacks = 0;
2053
2054 if (environment)
2055 {
2056 if (environment->Version != 1 && environment->Version != 3)
2057 FIXME( "unsupported environment version %lu\n", environment->Version );
2058
2059 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
2060 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
2061 object->finalization_callback = environment->FinalizationCallback;
2062 object->may_run_long = environment->u.s.LongFunction != 0;
2063 object->race_dll = environment->RaceDll;
2064#ifndef __REACTOS__ //Windows 7 stuff
2065 if (environment->Version == 3)
2066 {
2067 TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
2068
2069 object->priority = environment_v3->CallbackPriority;
2070 assert( object->priority < ARRAY_SIZE(pool->pools) );
2071 }
2072#endif
2073 if (environment->ActivationContext)
2074 FIXME( "activation context not supported yet\n" );
2075
2076 if (environment->u.s.Persistent)
2077 FIXME( "persistent threads not supported yet\n" );
2078 }
2079
2080 if (object->race_dll)
2081 LdrAddRefDll( 0, object->race_dll );
2082
2083 TRACE( "allocated object %p of type %u\n", object, object->type );
2084
2085 /* For simple callbacks we have to run tp_object_submit before adding this object
2086 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
2087 * will be set, and tp_object_submit would fail with an assertion. */
2088
2089 if (is_simple_callback)
2090 tp_object_submit( object, FALSE );
2091
2092 if (object->group)
2093 {
2094 struct threadpool_group *group = object->group;
2095 InterlockedIncrement( &group->refcount );
2096
2098 list_add_tail( &group->members, &object->group_entry );
2099 object->is_group_member = TRUE;
2101 }
2102
2103 if (is_simple_callback)
2104 tp_object_release( object );
2105}
2106
2107static void tp_object_prio_queue( struct threadpool_object *object )
2108{
2109 ++object->pool->num_busy_workers;
2110 list_add_tail( &object->pool->pools[object->priority], &object->pool_entry );
2111}
2112
2113/***********************************************************************
2114 * tp_object_submit (internal)
2115 *
2116 * Submits a threadpool object to the associated threadpool. This
2117 * function has to be VOID because TpPostWork can never fail on Windows.
2118 */
2119static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
2120{
2121 struct threadpool *pool = object->pool;
2123
2124 assert( !object->shutdown );
2125 assert( !pool->shutdown );
2126
2128
2129 /* Start new worker threads if required. */
2130 if (pool->num_busy_workers >= pool->num_workers &&
2131 pool->num_workers < pool->max_workers)
2133
2134 /* Queue work item and increment refcount. */
2135 InterlockedIncrement( &object->refcount );
2136 if (!object->num_pending_callbacks++)
2137 tp_object_prio_queue( object );
2138
2139 /* Count how often the object was signaled. */
2140 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
2141 object->u.wait.signaled++;
2142
2143 /* No new thread started - wake up one existing thread. */
2144 if (status != STATUS_SUCCESS)
2145 {
2146 assert( pool->num_workers > 0 );
2147 RtlWakeConditionVariable( &pool->update_event );
2148 }
2149
2151}
2152
2153/***********************************************************************
2154 * tp_object_cancel (internal)
2155 *
2156 * Cancels all currently pending callbacks for a specific object.
2157 */
2158static void tp_object_cancel( struct threadpool_object *object )
2159{
2160 struct threadpool *pool = object->pool;
2161 LONG pending_callbacks = 0;
2162
2164 if (object->num_pending_callbacks)
2165 {
2166 pending_callbacks = object->num_pending_callbacks;
2167 object->num_pending_callbacks = 0;
2168 list_remove( &object->pool_entry );
2169
2170 if (object->type == TP_OBJECT_TYPE_WAIT)
2171 object->u.wait.signaled = 0;
2172 }
2173 if (object->type == TP_OBJECT_TYPE_IO)
2174 {
2175 object->u.io.skipped_count += object->u.io.pending_count;
2176 object->u.io.pending_count = 0;
2177 }
2179
2180 while (pending_callbacks--)
2181 tp_object_release( object );
2182}
2183
2185{
2186 if (object->num_pending_callbacks)
2187 return FALSE;
2188 if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
2189 return FALSE;
2190
2191 if (group)
2192 return !object->num_running_callbacks;
2193 else
2194 return !object->num_associated_callbacks;
2195}
2196
2197/***********************************************************************
2198 * tp_object_wait (internal)
2199 *
2200 * Waits until all pending and running callbacks of a specific object
2201 * have been processed.
2202 */
2203static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
2204{
2205 struct threadpool *pool = object->pool;
2206
2208 while (!object_is_finished( object, group_wait ))
2209 {
2210 if (group_wait)
2211 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
2212 else
2213 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2214 }
2216}
2217
2219{
2220 assert( io->type == TP_OBJECT_TYPE_IO );
2221
2223
2224 assert(ioqueue.objcount);
2225
2226 if (!io->shutdown && !--ioqueue.objcount)
2227 NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 );
2228
2230}
2231
2232/***********************************************************************
2233 * tp_object_prepare_shutdown (internal)
2234 *
2235 * Prepares a threadpool object for shutdown.
2236 */
2238{
2239 if (object->type == TP_OBJECT_TYPE_TIMER)
2240 tp_timerqueue_unlock( object );
2241 else if (object->type == TP_OBJECT_TYPE_WAIT)
2242 tp_waitqueue_unlock( object );
2243 else if (object->type == TP_OBJECT_TYPE_IO)
2244 tp_ioqueue_unlock( object );
2245}
2246
2247/***********************************************************************
2248 * tp_object_release (internal)
2249 *
2250 * Releases a reference to a threadpool object.
2251 */
2253{
2254 if (InterlockedDecrement( &object->refcount ))
2255 return FALSE;
2256
2257 TRACE( "destroying object %p of type %u\n", object, object->type );
2258
2259 assert( object->shutdown );
2260 assert( !object->num_pending_callbacks );
2261 assert( !object->num_running_callbacks );
2262 assert( !object->num_associated_callbacks );
2263
2264 /* release reference to the group */
2265 if (object->group)
2266 {
2267 struct threadpool_group *group = object->group;
2268
2270 if (object->is_group_member)
2271 {
2272 list_remove( &object->group_entry );
2273 object->is_group_member = FALSE;
2274 }
2276
2278 }
2279
2281
2282 if (object->race_dll)
2283 LdrUnloadDll( object->race_dll );
2284
2285 if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE)
2286 NtSetEvent( object->completed_event, NULL );
2287
2288 RtlFreeHeap( GetProcessHeap(), 0, object );
2289 return TRUE;
2290}
2291
2292static struct list *threadpool_get_next_item( const struct threadpool *pool )
2293{
2294 struct list *ptr;
2295 unsigned int i;
2296
2297 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
2298 {
2299 if ((ptr = list_head( &pool->pools[i] )))
2300 break;
2301 }
2302
2303 return ptr;
2304}
2305
2306/***********************************************************************
2307 * tp_object_execute (internal)
2308 *
2309 * Executes a threadpool object callback, object->pool->cs has to be
2310 * held.
2311 */
2312static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread )
2313{
2314 TP_CALLBACK_INSTANCE *callback_instance;
2317 struct threadpool *pool = object->pool;
2318 TP_WAIT_RESULT wait_result = 0;
2320
2321 object->num_pending_callbacks--;
2322
2323 /* For wait objects check if they were signaled or have timed out. */
2324 if (object->type == TP_OBJECT_TYPE_WAIT)
2325 {
2326 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2327 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2328 }
2329 else if (object->type == TP_OBJECT_TYPE_IO)
2330 {
2331 assert( object->u.io.completion_count );
2332 completion = object->u.io.completions[--object->u.io.completion_count];
2333 }
2334
2335 /* Leave critical section and do the actual callback. */
2336 object->num_associated_callbacks++;
2337 object->num_running_callbacks++;
2339 if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
2340
2341 /* Initialize threadpool instance struct. */
2342 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2343 instance.object = object;
2344 instance.threadid = GetCurrentThreadId();
2345 instance.associated = TRUE;
2346 instance.may_run_long = object->may_run_long;
2347 instance.cleanup.critical_section = NULL;
2348 instance.cleanup.mutex = NULL;
2349 instance.cleanup.semaphore = NULL;
2350 instance.cleanup.semaphore_count = 0;
2351 instance.cleanup.event = NULL;
2352 instance.cleanup.library = NULL;
2353
2354 switch (object->type)
2355 {
2357 {
2358 TRACE( "executing simple callback %p(%p, %p)\n",
2359 object->u.simple.callback, callback_instance, object->userdata );
2360 object->u.simple.callback( callback_instance, object->userdata );
2361 TRACE( "callback %p returned\n", object->u.simple.callback );
2362 break;
2363 }
2364
2366 {
2367 TRACE( "executing work callback %p(%p, %p, %p)\n",
2368 object->u.work.callback, callback_instance, object->userdata, object );
2369 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2370 TRACE( "callback %p returned\n", object->u.work.callback );
2371 break;
2372 }
2373
2375 {
2376 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2377 object->u.timer.callback, callback_instance, object->userdata, object );
2378 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2379 TRACE( "callback %p returned\n", object->u.timer.callback );
2380 break;
2381 }
2382
2384 {
2385 TRACE( "executing wait callback %p(%p, %p, %p, %lu)\n",
2386 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2387 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2388 TRACE( "callback %p returned\n", object->u.wait.callback );
2389 break;
2390 }
2391
2392 case TP_OBJECT_TYPE_IO:
2393 {
2394 TRACE( "executing I/O callback %p(%p, %p, %#Ix, %p, %p)\n",
2395 object->u.io.callback, callback_instance, object->userdata,
2396 completion.cvalue, &completion.iosb, (TP_IO *)object );
2397 object->u.io.callback( callback_instance, object->userdata,
2398 (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
2399 TRACE( "callback %p returned\n", object->u.io.callback );
2400 break;
2401 }
2402
2403 default:
2404 assert(0);
2405 break;
2406 }
2407
2408 /* Execute finalization callback. */
2409 if (object->finalization_callback)
2410 {
2411 TRACE( "executing finalization callback %p(%p, %p)\n",
2412 object->finalization_callback, callback_instance, object->userdata );
2413 object->finalization_callback( callback_instance, object->userdata );
2414 TRACE( "callback %p returned\n", object->finalization_callback );
2415 }
2416
2417 /* Execute cleanup tasks. */
2418 if (instance.cleanup.critical_section)
2419 {
2420 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2421 }
2422 if (instance.cleanup.mutex)
2423 {
2424 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2425 if (status != STATUS_SUCCESS) goto skip_cleanup;
2426 }
2427 if (instance.cleanup.semaphore)
2428 {
2429 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2430 if (status != STATUS_SUCCESS) goto skip_cleanup;
2431 }
2432 if (instance.cleanup.event)
2433 {
2434 status = NtSetEvent( instance.cleanup.event, NULL );
2435 if (status != STATUS_SUCCESS) goto skip_cleanup;
2436 }
2437 if (instance.cleanup.library)
2438 {
2439 LdrUnloadDll( instance.cleanup.library );
2440 }
2441
2442skip_cleanup:
2443 if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs );
2445
2446 /* Simple callbacks are automatically shutdown after execution. */
2447 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2448 {
2450 object->shutdown = TRUE;
2451 }
2452
2453 object->num_running_callbacks--;
2454 if (object_is_finished( object, TRUE ))
2455 RtlWakeAllConditionVariable( &object->group_finished_event );
2456
2457 if (instance.associated)
2458 {
2459 object->num_associated_callbacks--;
2460 if (object_is_finished( object, FALSE ))
2461 RtlWakeAllConditionVariable( &object->finished_event );
2462 }
2463}
2464
2465/***********************************************************************
2466 * threadpool_worker_proc (internal)
2467 */
2468#ifdef __REACTOS__
2470#else
2472#endif
2473{
2474 struct threadpool *pool = param;
2476 struct list *ptr;
2477
2478 TRACE( "starting worker thread for pool %p\n", pool );
2479 set_thread_name(L"wine_threadpool_worker");
2480
2482 for (;;)
2483 {
2484 while ((ptr = threadpool_get_next_item( pool )))
2485 {
2486 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2487 assert( object->num_pending_callbacks > 0 );
2488
2489 /* If further pending callbacks are queued, move the work item to
2490 * the end of the pool list. Otherwise remove it from the pool. */
2491 list_remove( &object->pool_entry );
2492 if (object->num_pending_callbacks > 1)
2493 tp_object_prio_queue( object );
2494
2495 tp_object_execute( object, FALSE );
2496
2497 assert(pool->num_busy_workers);
2498 pool->num_busy_workers--;
2499
2500 tp_object_release( object );
2501 }
2502
2503 /* Shutdown worker thread if requested. */
2504 if (pool->shutdown)
2505 break;
2506
2507 /* Wait for new tasks or until the timeout expires. A thread only terminates
2508 * when no new tasks are available, and the number of threads can be
2509 * decreased without violating the min_workers limit. An exception is when
2510 * min_workers == 0, then objcount is used to detect if the last thread
2511 * can be terminated. */
2512 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2513 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2514 !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2515 (!pool->min_workers && !pool->objcount)))
2516 {
2517 break;
2518 }
2519 }
2520 pool->num_workers--;
2522
2523 TRACE( "terminating worker thread for pool %p\n", pool );
2525 RtlExitUserThread( 0 );
2526#ifdef __REACTOS__
2527 return STATUS_SUCCESS;
2528#endif
2529}
2530
2531/***********************************************************************
2532 * TpAllocCleanupGroup (NTDLL.@)
2533 */
2535{
2536 TRACE( "%p\n", out );
2537
2538 return tp_group_alloc( (struct threadpool_group **)out );
2539}
2540
2541/***********************************************************************
2542 * TpAllocIoCompletion (NTDLL.@)
2543 */
2545 void *userdata, TP_CALLBACK_ENVIRON *environment )
2546{
2547 struct threadpool_object *object;
2548 struct threadpool *pool;
2550
2551 TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
2552
2553 if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) )))
2554 return STATUS_NO_MEMORY;
2555
2556 if ((status = tp_threadpool_lock( &pool, environment )))
2557 {
2558 RtlFreeHeap( GetProcessHeap(), 0, object );
2559 return status;
2560 }
2561
2562 object->type = TP_OBJECT_TYPE_IO;
2563 object->u.io.callback = callback;
2564 if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) )))
2565 {
2567 RtlFreeHeap( GetProcessHeap(), 0, object );
2568 return status;
2569 }
2570
2571 if ((status = tp_ioqueue_lock( object, file )))
2572 {
2574 RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
2575 RtlFreeHeap( GetProcessHeap(), 0, object );
2576 return status;
2577 }
2578
2579 tp_object_initialize( object, pool, userdata, environment );
2580
2581 *out = (TP_IO *)object;
2582 return STATUS_SUCCESS;
2583}
2584
2585/***********************************************************************
2586 * TpAllocPool (NTDLL.@)
2587 */
2589{
2590 TRACE( "%p %p\n", out, reserved );
2591
2592 if (reserved)
2593 FIXME( "reserved argument is nonzero (%p)\n", reserved );
2594
2595 return tp_threadpool_alloc( (struct threadpool **)out );
2596}
2597
2598/***********************************************************************
2599 * TpAllocTimer (NTDLL.@)
2600 */
2602 TP_CALLBACK_ENVIRON *environment )
2603{
2604 struct threadpool_object *object;
2605 struct threadpool *pool;
2607
2608 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2609
2610 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2611 if (!object)
2612 return STATUS_NO_MEMORY;
2613
2614 status = tp_threadpool_lock( &pool, environment );
2615 if (status)
2616 {
2617 RtlFreeHeap( GetProcessHeap(), 0, object );
2618 return status;
2619 }
2620
2621 object->type = TP_OBJECT_TYPE_TIMER;
2622 object->u.timer.callback = callback;
2623
2624 status = tp_timerqueue_lock( object );
2625 if (status)
2626 {
2628 RtlFreeHeap( GetProcessHeap(), 0, object );
2629 return status;
2630 }
2631
2632 tp_object_initialize( object, pool, userdata, environment );
2633
2634 *out = (TP_TIMER *)object;
2635 return STATUS_SUCCESS;
2636}
2637
2639 TP_CALLBACK_ENVIRON *environment, DWORD flags )
2640{
2641 struct threadpool_object *object;
2642 struct threadpool *pool;
2644
2645 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2646 if (!object)
2647 return STATUS_NO_MEMORY;
2648
2649 status = tp_threadpool_lock( &pool, environment );
2650 if (status)
2651 {
2652 RtlFreeHeap( GetProcessHeap(), 0, object );
2653 return status;
2654 }
2655
2656 object->type = TP_OBJECT_TYPE_WAIT;
2657 object->u.wait.callback = callback;
2658 object->u.wait.flags = flags;
2659
2660 status = tp_waitqueue_lock( object );
2661 if (status)
2662 {
2664 RtlFreeHeap( GetProcessHeap(), 0, object );
2665 return status;
2666 }
2667
2668 tp_object_initialize( object, pool, userdata, environment );
2669
2670 *out = (TP_WAIT *)object;
2671 return STATUS_SUCCESS;
2672}
2673
2674/***********************************************************************
2675 * TpAllocWait (NTDLL.@)
2676 */
2678 TP_CALLBACK_ENVIRON *environment )
2679{
2680 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2681 return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE );
2682}
2683
2684/***********************************************************************
2685 * TpAllocWork (NTDLL.@)
2686 */
2688 TP_CALLBACK_ENVIRON *environment )
2689{
2690 struct threadpool_object *object;
2691 struct threadpool *pool;
2693
2694 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2695
2696 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2697 if (!object)
2698 return STATUS_NO_MEMORY;
2699
2700 status = tp_threadpool_lock( &pool, environment );
2701 if (status)
2702 {
2703 RtlFreeHeap( GetProcessHeap(), 0, object );
2704 return status;
2705 }
2706
2707 object->type = TP_OBJECT_TYPE_WORK;
2708 object->u.work.callback = callback;
2709 tp_object_initialize( object, pool, userdata, environment );
2710
2711 *out = (TP_WORK *)object;
2712 return STATUS_SUCCESS;
2713}
2714
2715/***********************************************************************
2716 * TpCancelAsyncIoOperation (NTDLL.@)
2717 */
2719{
2720 struct threadpool_object *this = impl_from_TP_IO( io );
2721
2722 TRACE( "%p\n", io );
2723
2724 RtlEnterCriticalSection( &this->pool->cs );
2725
2726 TRACE("pending_count %u.\n", this->u.io.pending_count);
2727
2728 this->u.io.pending_count--;
2729 if (object_is_finished( this, TRUE ))
2731 if (object_is_finished( this, FALSE ))
2733
2734 RtlLeaveCriticalSection( &this->pool->cs );
2735}
2736
2737/***********************************************************************
2738 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2739 */
2741{
2743
2744 TRACE( "%p %p\n", instance, crit );
2745
2746 if (!this->cleanup.critical_section)
2747 this->cleanup.critical_section = crit;
2748}
2749
2750/***********************************************************************
2751 * TpCallbackMayRunLong (NTDLL.@)
2752 */
2754{
2756 struct threadpool_object *object = this->object;
2757 struct threadpool *pool;
2759
2760 TRACE( "%p\n", instance );
2761
2762 if (this->threadid != GetCurrentThreadId())
2763 {
2764 ERR("called from wrong thread, ignoring\n");
2765 return STATUS_UNSUCCESSFUL; /* FIXME */
2766 }
2767
2768 if (this->may_run_long)
2769 return STATUS_SUCCESS;
2770
2771 pool = object->pool;
2773
2774 /* Start new worker threads if required. */
2775 if (pool->num_busy_workers >= pool->num_workers)
2776 {
2777 if (pool->num_workers < pool->max_workers)
2778 {
2780 }
2781 else
2782 {
2784 }
2785 }
2786
2788 this->may_run_long = TRUE;
2789 return status;
2790}
2791
2792/***********************************************************************
2793 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2794 */
2796{
2798
2799 TRACE( "%p %p\n", instance, mutex );
2800
2801 if (!this->cleanup.mutex)
2802 this->cleanup.mutex = mutex;
2803}
2804
2805/***********************************************************************
2806 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2807 */
2809{
2811
2812 TRACE( "%p %p %lu\n", instance, semaphore, count );
2813
2814 if (!this->cleanup.semaphore)
2815 {
2816 this->cleanup.semaphore = semaphore;
2817 this->cleanup.semaphore_count = count;
2818 }
2819}
2820
2821/***********************************************************************
2822 * TpCallbackSetEventOnCompletion (NTDLL.@)
2823 */
2825{
2827
2828 TRACE( "%p %p\n", instance, event );
2829
2830 if (!this->cleanup.event)
2831 this->cleanup.event = event;
2832}
2833
2834/***********************************************************************
2835 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2836 */
2838{
2840
2841 TRACE( "%p %p\n", instance, module );
2842
2843 if (!this->cleanup.library)
2844 this->cleanup.library = module;
2845}
2846
2847/***********************************************************************
2848 * TpDisassociateCallback (NTDLL.@)
2849 */
2851{
2853 struct threadpool_object *object = this->object;
2854 struct threadpool *pool;
2855
2856 TRACE( "%p\n", instance );
2857
2858 if (this->threadid != GetCurrentThreadId())
2859 {
2860 ERR("called from wrong thread, ignoring\n");
2861 return;
2862 }
2863
2864 if (!this->associated)
2865 return;
2866
2867 pool = object->pool;
2869
2870 object->num_associated_callbacks--;
2871 if (object_is_finished( object, FALSE ))
2872 RtlWakeAllConditionVariable( &object->finished_event );
2873
2875 this->associated = FALSE;
2876}
2877
2878/***********************************************************************
2879 * TpIsTimerSet (NTDLL.@)
2880 */
2882{
2883 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2884
2885 TRACE( "%p\n", timer );
2886
2887 return this->u.timer.timer_set;
2888}
2889
2890/***********************************************************************
2891 * TpPostWork (NTDLL.@)
2892 */
2894{
2895 struct threadpool_object *this = impl_from_TP_WORK( work );
2896
2897 TRACE( "%p\n", work );
2898
2899 tp_object_submit( this, FALSE );
2900}
2901
2902/***********************************************************************
2903 * TpReleaseCleanupGroup (NTDLL.@)
2904 */
2906{
2908
2909 TRACE( "%p\n", group );
2910
2911 tp_group_shutdown( this );
2912 tp_group_release( this );
2913}
2914
2915/***********************************************************************
2916 * TpReleaseCleanupGroupMembers (NTDLL.@)
2917 */
2919{
2921 struct threadpool_object *object, *next;
2922 struct list members;
2923
2924 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2925
2926 RtlEnterCriticalSection( &this->cs );
2927
2928 /* Unset group, increase references, and mark objects for shutdown */
2929 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2930 {
2931 assert( object->group == this );
2932 assert( object->is_group_member );
2933
2934 if (InterlockedIncrement( &object->refcount ) == 1)
2935 {
2936 /* Object is basically already destroyed, but group reference
2937 * was not deleted yet. We can safely ignore this object. */
2938 InterlockedDecrement( &object->refcount );
2939 list_remove( &object->group_entry );
2940 object->is_group_member = FALSE;
2941 continue;
2942 }
2943
2944 object->is_group_member = FALSE;
2946 }
2947
2948 /* Move members to a new temporary list */
2949 list_init( &members );
2950 list_move_tail( &members, &this->members );
2951
2952 RtlLeaveCriticalSection( &this->cs );
2953
2954 /* Cancel pending callbacks if requested */
2955 if (cancel_pending)
2956 {
2957 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2958 {
2959 tp_object_cancel( object );
2960 }
2961 }
2962
2963 /* Wait for remaining callbacks to finish */
2964 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2965 {
2966 tp_object_wait( object, TRUE );
2967
2968 if (!object->shutdown)
2969 {
2970 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2971 if (cancel_pending && object->group_cancel_callback)
2972 {
2973 TRACE( "executing group cancel callback %p(%p, %p)\n",
2974 object->group_cancel_callback, object->userdata, userdata );
2975 object->group_cancel_callback( object->userdata, userdata );
2976 TRACE( "callback %p returned\n", object->group_cancel_callback );
2977 }
2978
2979 if (object->type != TP_OBJECT_TYPE_SIMPLE)
2980 tp_object_release( object );
2981 }
2982
2983 object->shutdown = TRUE;
2984 tp_object_release( object );
2985 }
2986}
2987
2988/***********************************************************************
2989 * TpReleaseIoCompletion (NTDLL.@)
2990 */
2992{
2993 struct threadpool_object *this = impl_from_TP_IO( io );
2994 BOOL can_destroy;
2995
2996 TRACE( "%p\n", io );
2997
2998 RtlEnterCriticalSection( &this->pool->cs );
2999 this->u.io.shutting_down = TRUE;
3000 can_destroy = !this->u.io.pending_count && !this->u.io.skipped_count;
3001 RtlLeaveCriticalSection( &this->pool->cs );
3002
3003 if (can_destroy)
3004 {
3006 this->shutdown = TRUE;
3007 tp_object_release( this );
3008 }
3009}
3010
3011/***********************************************************************
3012 * TpReleasePool (NTDLL.@)
3013 */
3015{
3016 struct threadpool *this = impl_from_TP_POOL( pool );
3017
3018 TRACE( "%p\n", pool );
3019
3020 tp_threadpool_shutdown( this );
3021 tp_threadpool_release( this );
3022}
3023
3024/***********************************************************************
3025 * TpReleaseTimer (NTDLL.@)
3026 */
3028{
3029 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3030
3031 TRACE( "%p\n", timer );
3032
3034 this->shutdown = TRUE;
3035 tp_object_release( this );
3036}
3037
3038/***********************************************************************
3039 * TpReleaseWait (NTDLL.@)
3040 */
3042{
3043 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3044
3045 TRACE( "%p\n", wait );
3046
3048 this->shutdown = TRUE;
3049 tp_object_release( this );
3050}
3051
3052/***********************************************************************
3053 * TpReleaseWork (NTDLL.@)
3054 */
3056{
3057 struct threadpool_object *this = impl_from_TP_WORK( work );
3058
3059 TRACE( "%p\n", work );
3060
3062 this->shutdown = TRUE;
3063 tp_object_release( this );
3064}
3065
3066/***********************************************************************
3067 * TpSetPoolMaxThreads (NTDLL.@)
3068 */
3070{
3071 struct threadpool *this = impl_from_TP_POOL( pool );
3072
3073 TRACE( "%p %lu\n", pool, maximum );
3074
3075 RtlEnterCriticalSection( &this->cs );
3076 this->max_workers = max( maximum, 1 );
3077 this->min_workers = min( this->min_workers, this->max_workers );
3078 RtlLeaveCriticalSection( &this->cs );
3079}
3080
3081/***********************************************************************
3082 * TpSetPoolMinThreads (NTDLL.@)
3083 */
3085{
3086 struct threadpool *this = impl_from_TP_POOL( pool );
3088
3089 TRACE( "%p %lu\n", pool, minimum );
3090
3091 RtlEnterCriticalSection( &this->cs );
3092
3093 while (this->num_workers < minimum)
3094 {
3095 status = tp_new_worker_thread( this );
3096 if (status != STATUS_SUCCESS)
3097 break;
3098 }
3099
3100 if (status == STATUS_SUCCESS)
3101 {
3102 this->min_workers = minimum;
3103 this->max_workers = max( this->min_workers, this->max_workers );
3104 }
3105
3106 RtlLeaveCriticalSection( &this->cs );
3107 return !status;
3108}
3109
3110/***********************************************************************
3111 * TpSetTimer (NTDLL.@)
3112 */
3113VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
3114{
3115 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3116 struct threadpool_object *other_timer;
3117 BOOL submit_timer = FALSE;
3119
3120 TRACE( "%p %p %lu %lu\n", timer, timeout, period, window_length );
3121
3123
3124 assert( this->u.timer.timer_initialized );
3125 this->u.timer.timer_set = timeout != NULL;
3126
3127 /* Convert relative timeout to absolute timestamp and handle a timeout
3128 * of zero, which means that the timer is submitted immediately. */
3129 if (timeout)
3130 {
3131 timestamp = timeout->QuadPart;
3132 if ((LONGLONG)timestamp < 0)
3133 {
3136 timestamp = now.QuadPart - timestamp;
3137 }
3138 else if (!timestamp)
3139 {
3140 if (!period)
3141 timeout = NULL;
3142 else
3143 {
3146 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
3147 }
3148 submit_timer = TRUE;
3149 }
3150 }
3151
3152 /* First remove existing timeout. */
3153 if (this->u.timer.timer_pending)
3154 {
3155 list_remove( &this->u.timer.timer_entry );
3156 this->u.timer.timer_pending = FALSE;
3157 }
3158
3159 /* If the timer was enabled, then add it back to the queue. */
3160 if (timeout)
3161 {
3162 this->u.timer.timeout = timestamp;
3163 this->u.timer.period = period;
3164 this->u.timer.window_length = window_length;
3165
3166 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
3167 struct threadpool_object, u.timer.timer_entry )
3168 {
3169 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
3170 if (this->u.timer.timeout < other_timer->u.timer.timeout)
3171 break;
3172 }
3173 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
3174
3175 /* Wake up the timer thread when the timeout has to be updated. */
3176 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
3177 RtlWakeAllConditionVariable( &timerqueue.update_event );
3178
3179 this->u.timer.timer_pending = TRUE;
3180 }
3181
3183
3184 if (submit_timer)
3185 tp_object_submit( this, FALSE );
3186}
3187
3188/***********************************************************************
3189 * TpSetWait (NTDLL.@)
3190 */
3192{
3193 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3195
3196 TRACE( "%p %p %p\n", wait, handle, timeout );
3197
3199
3200 assert( this->u.wait.bucket );
3201 this->u.wait.handle = handle;
3202
3203 if (handle || this->u.wait.wait_pending)
3204 {
3205 struct waitqueue_bucket *bucket = this->u.wait.bucket;
3206 list_remove( &this->u.wait.wait_entry );
3207
3208 /* Convert relative timeout to absolute timestamp. */
3209 if (handle && timeout)
3210 {
3211 timestamp = timeout->QuadPart;
3212 if ((LONGLONG)timestamp < 0)
3213 {
3216 timestamp = now.QuadPart - timestamp;
3217 }
3218 }
3219
3220 /* Add wait object back into one of the queues. */
3221 if (handle)
3222 {
3223 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
3224 this->u.wait.wait_pending = TRUE;
3225 this->u.wait.timeout = timestamp;
3226 }
3227 else
3228 {
3229 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
3230 this->u.wait.wait_pending = FALSE;
3231 }
3232
3233 /* Wake up the wait queue thread. */
3234 NtSetEvent( bucket->update_event, NULL );
3235 }
3236
3238}
3239
3240/***********************************************************************
3241 * TpSimpleTryPost (NTDLL.@)
3242 */
3244 TP_CALLBACK_ENVIRON *environment )
3245{
3246 struct threadpool_object *object;
3247 struct threadpool *pool;
3249
3250 TRACE( "%p %p %p\n", callback, userdata, environment );
3251
3252 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
3253 if (!object)
3254 return STATUS_NO_MEMORY;
3255
3256 status = tp_threadpool_lock( &pool, environment );
3257 if (status)
3258 {
3259 RtlFreeHeap( GetProcessHeap(), 0, object );
3260 return status;
3261 }
3262
3263 object->type = TP_OBJECT_TYPE_SIMPLE;
3264 object->u.simple.callback = callback;
3265 tp_object_initialize( object, pool, userdata, environment );
3266
3267 return STATUS_SUCCESS;
3268}
3269
3270/***********************************************************************
3271 * TpStartAsyncIoOperation (NTDLL.@)
3272 */
3274{
3275 struct threadpool_object *this = impl_from_TP_IO( io );
3276
3277 TRACE( "%p\n", io );
3278
3279 RtlEnterCriticalSection( &this->pool->cs );
3280
3281 this->u.io.pending_count++;
3282
3283 RtlLeaveCriticalSection( &this->pool->cs );
3284}
3285
3286/***********************************************************************
3287 * TpWaitForIoCompletion (NTDLL.@)
3288 */
3289void WINAPI TpWaitForIoCompletion( TP_IO *io, BOOL cancel_pending )
3290{
3291 struct threadpool_object *this = impl_from_TP_IO( io );
3292
3293 TRACE( "%p %d\n", io, cancel_pending );
3294
3295 if (cancel_pending)
3296 tp_object_cancel( this );
3297 tp_object_wait( this, FALSE );
3298}
3299
3300/***********************************************************************
3301 * TpWaitForTimer (NTDLL.@)
3302 */
3304{
3305 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3306
3307 TRACE( "%p %d\n", timer, cancel_pending );
3308
3309 if (cancel_pending)
3310 tp_object_cancel( this );
3311 tp_object_wait( this, FALSE );
3312}
3313
3314/***********************************************************************
3315 * TpWaitForWait (NTDLL.@)
3316 */
3318{
3319 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3320
3321 TRACE( "%p %d\n", wait, cancel_pending );
3322
3323 if (cancel_pending)
3324 tp_object_cancel( this );
3325 tp_object_wait( this, FALSE );
3326}
3327
3328/***********************************************************************
3329 * TpWaitForWork (NTDLL.@)
3330 */
3332{
3333 struct threadpool_object *this = impl_from_TP_WORK( work );
3334
3335 TRACE( "%p %u\n", work, cancel_pending );
3336
3337 if (cancel_pending)
3338 tp_object_cancel( this );
3339 tp_object_wait( this, FALSE );
3340}
3341
3342/***********************************************************************
3343 * TpSetPoolStackInformation (NTDLL.@)
3344 */
3346{
3347 struct threadpool *this = impl_from_TP_POOL( pool );
3348
3349 TRACE( "%p %p\n", pool, stack_info );
3350
3351 if (!stack_info)
3353
3354 RtlEnterCriticalSection( &this->cs );
3355 this->stack_info = *stack_info;
3356 RtlLeaveCriticalSection( &this->cs );
3357
3358 return STATUS_SUCCESS;
3359}
3360
3361/***********************************************************************
3362 * TpQueryPoolStackInformation (NTDLL.@)
3363 */
3365{
3366 struct threadpool *this = impl_from_TP_POOL( pool );
3367
3368 TRACE( "%p %p\n", pool, stack_info );
3369
3370 if (!stack_info)
3372
3373 RtlEnterCriticalSection( &this->cs );
3374 *stack_info = this->stack_info;
3375 RtlLeaveCriticalSection( &this->cs );
3376
3377 return STATUS_SUCCESS;
3378}
3379
3380#ifndef __REACTOS__
3382{
3383 struct threadpool_object *object = impl_from_TP_WAIT(wait);
3384 object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 );
3385}
3386
3387/***********************************************************************
3388 * RtlRegisterWait (NTDLL.@)
3389 *
3390 * Registers a wait for a handle to become signaled.
3391 *
3392 * PARAMS
3393 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
3394 * Object [I] Object to wait to become signaled.
3395 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
3396 * Context [I] Context to pass to the callback function when it is executed.
3397 * Milliseconds [I] Number of milliseconds to wait before timing out.
3398 * Flags [I] Flags. See notes.
3399 *
3400 * RETURNS
3401 * Success: STATUS_SUCCESS.
3402 * Failure: Any NTSTATUS code.
3403 *
3404 * NOTES
3405 * Flags can be one or more of the following:
3406 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
3407 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
3408 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
3409 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
3410 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
3411 */
3413 void *context, ULONG milliseconds, ULONG flags )
3414{
3415 struct threadpool_object *object;
3416 TP_CALLBACK_ENVIRON environment;
3419 TP_WAIT *wait;
3420
3421 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %lu, flags %lx\n",
3422 out, handle, callback, context, milliseconds, flags );
3423
3424 memset( &environment, 0, sizeof(environment) );
3425 environment.Version = 1;
3426 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
3427 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
3428
3430 if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags )))
3431 return status;
3432
3433 object = impl_from_TP_WAIT(wait);
3434 object->u.wait.rtl_callback = callback;
3435
3437 TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) );
3438
3439 *out = object;
3441
3442 return STATUS_SUCCESS;
3443}
3444
3445/***********************************************************************
3446 * RtlDeregisterWaitEx (NTDLL.@)
3447 *
3448 * Cancels a wait operation and frees the resources associated with calling
3449 * RtlRegisterWait().
3450 *
3451 * PARAMS
3452 * WaitObject [I] Handle to the wait object to free.
3453 *
3454 * RETURNS
3455 * Success: STATUS_SUCCESS.
3456 * Failure: Any NTSTATUS code.
3457 */
3459{
3460 struct threadpool_object *object = handle;
3462
3463 TRACE( "handle %p, event %p\n", handle, event );
3464
3465 if (!object) return STATUS_INVALID_HANDLE;
3466
3467 TpSetWait( (TP_WAIT *)object, NULL, NULL );
3468
3469 if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE );
3470 else
3471 {
3472 assert( object->completed_event == NULL );
3473 object->completed_event = event;
3474 }
3475
3476 RtlEnterCriticalSection( &object->pool->cs );
3477 if (object->num_pending_callbacks + object->num_running_callbacks
3478 + object->num_associated_callbacks) status = STATUS_PENDING;
3479 else status = STATUS_SUCCESS;
3480 RtlLeaveCriticalSection( &object->pool->cs );
3481
3482 TpReleaseWait( (TP_WAIT *)object );
3483 return status;
3484}
3485
3486/***********************************************************************
3487 * RtlDeregisterWait (NTDLL.@)
3488 *
3489 * Cancels a wait operation and frees the resources associated with calling
3490 * RtlRegisterWait().
3491 *
3492 * PARAMS
3493 * WaitObject [I] Handle to the wait object to free.
3494 *
3495 * RETURNS
3496 * Success: STATUS_SUCCESS.
3497 * Failure: Any NTSTATUS code.
3498 */
3500{
3501 return RtlDeregisterWaitEx(WaitHandle, NULL);
3502}
3503#endif
3504
3505#ifdef __REACTOS__
3506VOID
3507NTAPI
3509 VOID)
3510{
3511 RtlInitializeCriticalSection(&old_threadpool.threadpool_compl_cs);
3515}
3516#endif
void CALLBACK completion(DWORD dwError, DWORD cbTransferred, LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags)
Definition: WSARecv.c:16
void destroy(_Tp *__pointer)
Definition: _construct.h:278
#define VOID
Definition: acefi.h:82
#define InterlockedIncrement
Definition: armddk.h:53
#define InterlockedDecrement
Definition: armddk.h:52
#define skip(...)
Definition: atltest.h:64
#define WINE_DEFAULT_DEBUG_CHANNEL(t)
Definition: precomp.h:23
LONG NTSTATUS
Definition: precomp.h:26
#define ARRAY_SIZE(A)
Definition: main.h:20
static void list_remove(struct list_entry *entry)
Definition: list.h:90
static int list_empty(struct list_entry *head)
Definition: list.h:58
static void list_add_tail(struct list_entry *head, struct list_entry *entry)
Definition: list.h:83
static void list_init(struct list_entry *head)
Definition: list.h:51
#define FIXME(fmt,...)
Definition: precomp.h:53
#define WARN(fmt,...)
Definition: precomp.h:61
#define ERR(fmt,...)
Definition: precomp.h:57
static HANDLE thread
Definition: service.c:33
#define NTSYSAPI
Definition: ntoskrnl.h:12
PVOID NTAPI RtlAllocateHeap(IN PVOID HeapHandle, IN ULONG Flags, IN SIZE_T Size)
Definition: heap.c:616
BOOLEAN NTAPI RtlFreeHeap(IN PVOID HeapHandle, IN ULONG Flags, IN PVOID HeapBase)
Definition: heap.c:634
_In_ CDROM_SCAN_FOR_SPECIAL_INFO _In_ PCDROM_SCAN_FOR_SPECIAL_HANDLER Function
Definition: cdrom.h:1156
Definition: list.h:37
#define STATUS_TIMEOUT
Definition: d3dkmdt.h:49
#define STATUS_INVALID_HANDLE
Definition: d3dkmdt.h:40
#define STATUS_NO_MEMORY
Definition: d3dkmdt.h:51
#define WAIT_TIMEOUT
Definition: dderror.h:14
#define NULL
Definition: types.h:112
#define TRUE
Definition: types.h:120
#define FALSE
Definition: types.h:117
static HINSTANCE instance
Definition: main.c:40
#define GetProcessHeap()
Definition: compat.h:736
#define INVALID_HANDLE_VALUE
Definition: compat.h:731
#define GetCurrentProcess()
Definition: compat.h:759
#define RtlImageNtHeader
Definition: compat.h:806
#define CALLBACK
Definition: compat.h:35
#define HEAP_ZERO_MEMORY
Definition: compat.h:134
static void cleanup(void)
Definition: main.c:1335
PPEB Peb
Definition: dllmain.c:27
#define assert(x)
Definition: debug.h:53
static void list_move_tail(struct list_head *list, struct list_head *head)
Definition: list.h:122
static uacpi_status set_event(uacpi_u8 event, uacpi_u8 value)
Definition: event.c:84
#define L(x)
Definition: resources.c:13
r reserved
Definition: btrfs.c:3006
#define INFINITE
Definition: serial.h:102
#define ULONG_PTR
Definition: config.h:101
unsigned int BOOL
Definition: ntddk_ex.h:94
unsigned long DWORD
Definition: ntddk_ex.h:95
time_t now
Definition: finger.c:65
_Must_inspect_result_ _In_opt_ PFLT_INSTANCE _Out_ PHANDLE FileHandle
Definition: fltkernel.h:1231
@ FileCompletionInformation
Definition: from_kernel.h:91
FxCollectionEntry * cur
GLuint GLuint GLsizei count
Definition: gl.h:1545
GLdouble GLdouble t
Definition: gl.h:2047
GLdouble GLdouble GLdouble GLdouble q
Definition: gl.h:2063
struct _cl_event * event
Definition: glext.h:7739
GLuint res
Definition: glext.h:9613
GLsizeiptr size
Definition: glext.h:5919
GLbitfield flags
Definition: glext.h:7161
GLboolean GLuint group
Definition: glext.h:11120
GLuint64EXT * result
Definition: glext.h:11304
GLfloat GLfloat p
Definition: glext.h:8902
GLfloat param
Definition: glext.h:5796
GLsizei GLenum const GLvoid GLsizei GLenum GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLint GLint GLint GLshort GLshort GLshort GLubyte GLubyte GLubyte GLuint GLuint GLuint GLushort GLushort GLushort GLbyte GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLfloat GLint GLint GLint GLint GLshort GLshort GLshort GLshort GLubyte GLubyte GLubyte GLubyte GLuint GLuint GLuint GLuint GLushort GLushort GLushort GLushort GLboolean const GLdouble const GLfloat const GLint const GLshort const GLbyte const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLdouble const GLfloat const GLfloat const GLint const GLint const GLshort const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort GLenum GLenum GLenum GLfloat GLenum GLint GLenum GLenum GLenum GLfloat GLenum GLenum GLint GLenum GLfloat GLenum GLint GLint GLushort GLenum GLenum GLfloat GLenum GLenum GLint GLfloat const GLubyte GLenum GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLint GLint GLsizei GLsizei GLint GLenum GLenum const GLvoid GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLenum const GLdouble GLenum GLenum const GLfloat GLenum GLenum const GLint GLsizei GLuint GLfloat GLuint GLbitfield GLfloat GLint GLuint GLboolean GLenum GLfloat GLenum GLbitfield GLenum GLfloat GLfloat GLint GLint const GLfloat GLenum GLfloat GLfloat GLint GLint GLfloat GLfloat GLint GLint const GLfloat GLint GLfloat GLfloat GLint GLfloat GLfloat GLint GLfloat GLfloat const GLdouble const GLfloat const GLdouble const GLfloat GLint i
Definition: glfuncs.h:248
GLsizei GLenum const GLvoid GLsizei GLenum GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLint GLint GLint GLshort GLshort GLshort GLubyte GLubyte GLubyte GLuint GLuint GLuint GLushort GLushort GLushort GLbyte GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLfloat GLint GLint GLint GLint GLshort GLshort GLshort GLshort GLubyte GLubyte GLubyte GLubyte GLuint GLuint GLuint GLuint GLushort GLushort GLushort GLushort GLboolean const GLdouble const GLfloat const GLint const GLshort const GLbyte const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLdouble const GLfloat const GLfloat const GLint const GLint const GLshort const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort GLenum GLenum GLenum GLfloat GLenum GLint GLenum GLenum GLenum GLfloat GLenum GLenum GLint GLenum GLfloat GLenum GLint GLint GLushort GLenum GLenum GLfloat GLenum GLenum GLint GLfloat const GLubyte GLenum GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLint GLint GLsizei GLsizei GLint GLenum GLenum const GLvoid GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLenum const GLdouble GLenum GLenum const GLfloat GLenum GLenum const GLint GLsizei GLuint GLfloat GLuint GLbitfield GLfloat GLint GLuint GLboolean GLenum GLfloat GLenum GLbitfield GLenum GLfloat GLfloat GLint GLint const GLfloat GLenum GLfloat GLfloat GLint GLint GLfloat GLfloat GLint GLint const GLfloat GLint GLfloat GLfloat GLint GLfloat GLfloat GLint GLfloat GLfloat const GLdouble * u
Definition: glfuncs.h:240
DWORD(CALLBACK * PRTL_WORK_ITEM_ROUTINE)(LPVOID)
Definition: winternl.h:3493
NTSYSAPI void WINAPI TpWaitForWork(TP_WORK *, BOOL)
Definition: threadpool.c:3331
NTSYSAPI void WINAPI TpReleaseWork(TP_WORK *)
Definition: threadpool.c:3055
NTSYSAPI void WINAPI TpDisassociateCallback(TP_CALLBACK_INSTANCE *)
Definition: threadpool.c:2850
NTSYSAPI NTSTATUS WINAPI TpQueryPoolStackInformation(TP_POOL *, TP_POOL_STACK_INFORMATION *stack_info)
Definition: threadpool.c:3364
NTSYSAPI void WINAPI TpStartAsyncIoOperation(TP_IO *)
Definition: threadpool.c:3273
NTSYSAPI void WINAPI TpReleasePool(TP_POOL *)
Definition: threadpool.c:3014
NTSYSAPI NTSTATUS WINAPI TpAllocTimer(TP_TIMER **, PTP_TIMER_CALLBACK, PVOID, TP_CALLBACK_ENVIRON *)
Definition: threadpool.c:2601
NTSYSAPI PVOID WINAPI RtlReAllocateHeap(HANDLE, ULONG, PVOID, SIZE_T) __WINE_ALLOC_SIZE(4) __WINE_DEALLOC(RtlFreeHeap
VOID(CALLBACK * PRTL_OVERLAPPED_COMPLETION_ROUTINE)(DWORD, DWORD, LPVOID)
Definition: winternl.h:3619
NTSYSAPI NTSTATUS WINAPI TpAllocCleanupGroup(TP_CLEANUP_GROUP **)
Definition: threadpool.c:2534
void(NTAPI * RTL_WAITORTIMERCALLBACKFUNC)(PVOID, BOOLEAN)
Definition: winternl.h:3494
NTSYSAPI void WINAPI TpWaitForTimer(TP_TIMER *, BOOL)
Definition: threadpool.c:3303
NTSYSAPI void WINAPI TpWaitForIoCompletion(TP_IO *, BOOL)
Definition: threadpool.c:3289
NTSYSAPI void WINAPI TpCancelAsyncIoOperation(TP_IO *)
Definition: threadpool.c:2718
NTSYSAPI BOOL WINAPI TpIsTimerSet(TP_TIMER *)
Definition: threadpool.c:2881
NTSYSAPI void WINAPI TpReleaseTimer(TP_TIMER *)
Definition: threadpool.c:3027
NTSYSAPI void WINAPI TpPostWork(TP_WORK *)
Definition: threadpool.c:2893
NTSYSAPI void WINAPI TpWaitForWait(TP_WAIT *, BOOL)
Definition: threadpool.c:3317
NTSYSAPI NTSTATUS WINAPI TpAllocIoCompletion(TP_IO **, HANDLE, PTP_IO_CALLBACK, void *, TP_CALLBACK_ENVIRON *)
Definition: threadpool.c:2544
@ ThreadNameInformation
Definition: winternl.h:2319
NTSYSAPI void WINAPI TpSetWait(TP_WAIT *, HANDLE, LARGE_INTEGER *)
Definition: threadpool.c:3191
NTSYSAPI NTSTATUS WINAPI TpAllocPool(TP_POOL **, PVOID)
Definition: threadpool.c:2588
NTSYSAPI NTSTATUS WINAPI RtlInitializeCriticalSectionEx(RTL_CRITICAL_SECTION *, ULONG, ULONG)
NTSYSAPI void WINAPI TpReleaseIoCompletion(TP_IO *)
Definition: threadpool.c:2991
void(CALLBACK * PNTAPCFUNC)(ULONG_PTR, ULONG_PTR, ULONG_PTR)
Definition: winternl.h:3491
void(CALLBACK * PRTL_THREAD_START_ROUTINE)(LPVOID)
Definition: winternl.h:3492
NTSYSAPI NTSTATUS WINAPI TpAllocWait(TP_WAIT **, PTP_WAIT_CALLBACK, PVOID, TP_CALLBACK_ENVIRON *)
Definition: threadpool.c:2677
NTSYSAPI void WINAPI TpSetPoolMaxThreads(TP_POOL *, DWORD)
Definition: threadpool.c:3069
NTSYSAPI void WINAPI TpCallbackReleaseSemaphoreOnCompletion(TP_CALLBACK_INSTANCE *, HANDLE, DWORD)
Definition: threadpool.c:2808
NTSYSAPI NTSTATUS WINAPI TpAllocWork(TP_WORK **, PTP_WORK_CALLBACK, PVOID, TP_CALLBACK_ENVIRON *)
Definition: threadpool.c:2687
NTSYSAPI ULONG WINAPI RtlNtStatusToDosError(NTSTATUS)
NTSYSAPI void WINAPI TpReleaseCleanupGroupMembers(TP_CLEANUP_GROUP *, BOOL, PVOID)
Definition: threadpool.c:2918
NTSYSAPI void WINAPI TpReleaseCleanupGroup(TP_CLEANUP_GROUP *)
Definition: threadpool.c:2905
NTSYSAPI void WINAPI TpSetTimer(TP_TIMER *, LARGE_INTEGER *, LONG, LONG)
Definition: threadpool.c:3113
NTSYSAPI NTSTATUS WINAPI TpSimpleTryPost(PTP_SIMPLE_CALLBACK, PVOID, TP_CALLBACK_ENVIRON *)
Definition: threadpool.c:3243
NTSYSAPI NTSTATUS WINAPI TpCallbackMayRunLong(TP_CALLBACK_INSTANCE *)
Definition: threadpool.c:2753
void(CALLBACK * PTP_IO_CALLBACK)(PTP_CALLBACK_INSTANCE, void *, void *, IO_STATUS_BLOCK *, PTP_IO)
Definition: winternl.h:4110
NTSYSAPI NTSTATUS WINAPI TpSetPoolStackInformation(TP_POOL *, TP_POOL_STACK_INFORMATION *stack_info)
Definition: threadpool.c:3345
NTSYSAPI void WINAPI TpReleaseWait(TP_WAIT *)
Definition: threadpool.c:3041
#define InterlockedCompareExchangePointer
Definition: interlocked.h:144
NTSTATUS NTAPI NtSetIoCompletion(IN HANDLE IoCompletionPortHandle, IN PVOID CompletionKey, IN PVOID CompletionContext, IN NTSTATUS CompletionStatus, IN ULONG CompletionInformation)
Definition: iocomp.c:569
NTSTATUS NTAPI NtRemoveIoCompletion(IN HANDLE IoCompletionHandle, OUT PVOID *KeyContext, OUT PVOID *ApcContext, OUT PIO_STATUS_BLOCK IoStatusBlock, IN PLARGE_INTEGER Timeout OPTIONAL)
Definition: iocomp.c:445
NTSTATUS NTAPI NtCreateIoCompletion(OUT PHANDLE IoCompletionHandle, IN ACCESS_MASK DesiredAccess, IN POBJECT_ATTRIBUTES ObjectAttributes, IN ULONG NumberOfConcurrentThreads)
Definition: iocomp.c:253
#define NtCurrentTeb
uint32_t entry
Definition: isohybrid.c:63
#define EVENT_ALL_ACCESS
Definition: isotest.c:82
NTSTATUS NTAPI LdrUnloadDll(_In_ PVOID BaseAddress)
Definition: ldrapi.c:1291
NTSTATUS NTAPI LdrAddRefDll(_In_ ULONG Flags, _In_ PVOID BaseAddress)
Definition: ldrapi.c:1205
if(dx< 0)
Definition: linetemp.h:194
__u16 time
Definition: mkdosfs.c:8
static const CLSID * objects[]
Definition: apphelp.c:112
static PVOID ptr
Definition: dispmode.c:27
static IPrintDialogCallback callback
Definition: printdlg.c:326
IMAGE_NT_HEADERS nt
Definition: module.c:50
#define IO_COMPLETION_ALL_ACCESS
Definition: file.c:72
static HANDLE PIO_APC_ROUTINE PVOID PIO_STATUS_BLOCK io
Definition: file.c:100
static PIO_STATUS_BLOCK iosb
Definition: file.c:98
HANDLE semaphore
Definition: threadpool.c:1889
static TP_CALLBACK_ENVIRON *static TP_CALLBACK_ENVIRON *static PTP_WORK_CALLBACK
Definition: threadpool.c:32
static PTP_TIMER_CALLBACK
Definition: threadpool.c:30
static PTP_IO_CALLBACK
Definition: threadpool.c:28
static DWORD
Definition: threadpool.c:34
static TP_CALLBACK_ENVIRON *static PTP_WAIT_CALLBACK
Definition: threadpool.c:31
static void TP_CALLBACK_ENVIRON *static PVOID
Definition: threadpool.c:29
#define min(a, b)
Definition: monoChain.cc:55
NTSTATUS NTAPI NtReleaseMutant(IN HANDLE MutantHandle, IN PLONG PreviousCount OPTIONAL)
Definition: mutant.c:296
NTSYSAPI VOID NTAPI RtlInitializeConditionVariable(_Out_ PRTL_CONDITION_VARIABLE ConditionVariable)
NTSYSAPI NTSTATUS NTAPI RtlDeleteCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSYSAPI NTSTATUS NTAPI RtlQueueWorkItem(_In_ WORKERCALLBACKFUNC Function, _In_opt_ PVOID Context, _In_ ULONG Flags)
NTSYSAPI VOID NTAPI RtlWakeAllConditionVariable(_Inout_ PRTL_CONDITION_VARIABLE ConditionVariable)
NTSYSAPI NTSTATUS NTAPI RtlRegisterWait(_In_ PHANDLE phNewWaitObject, _In_ HANDLE hObject, _In_ WAITORTIMERCALLBACKFUNC Callback, _In_ PVOID pvContext, _In_ ULONG ulMilliseconds, _In_ ULONG ulFlags)
NTSYSAPI NTSTATUS NTAPI RtlSleepConditionVariableCS(_Inout_ PRTL_CONDITION_VARIABLE ConditionVariable, _Inout_ PRTL_CRITICAL_SECTION CriticalSection, _In_opt_ PLARGE_INTEGER TimeOut)
NTSYSAPI NTSTATUS NTAPI RtlEnterCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSYSAPI VOID NTAPI RtlExitUserThread(_In_ NTSTATUS Status)
NTSYSAPI VOID NTAPI RtlWakeConditionVariable(_Inout_ PRTL_CONDITION_VARIABLE ConditionVariable)
NTSYSAPI NTSTATUS NTAPI RtlLeaveCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSYSAPI NTSTATUS NTAPI RtlCreateUserThread(_In_ PVOID ThreadContext, _Out_ HANDLE *OutThreadHandle, _Reserved_ PVOID Reserved1, _Reserved_ PVOID Reserved2, _Reserved_ PVOID Reserved3, _Reserved_ PVOID Reserved4, _Reserved_ PVOID Reserved5, _Reserved_ PVOID Reserved6, _Reserved_ PVOID Reserved7, _Reserved_ PVOID Reserved8)
NTSYSAPI NTSTATUS NTAPI RtlInitializeCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSYSAPI NTSTATUS NTAPI RtlDeregisterWaitEx(_In_ HANDLE hWaitHandle, _In_opt_ HANDLE hCompletionEvent)
NTSYSAPI NTSTATUS NTAPI RtlDeregisterWait(_In_ HANDLE hWaitHandle)
ULONG(NTAPI * PTHREAD_START_ROUTINE)(PVOID Parameter)
Definition: rtltypes.h:560
NTSYSAPI VOID NTAPI RtlInitUnicodeString(PUNICODE_STRING DestinationString, PCWSTR SourceString)
NTSYSAPI NTSTATUS NTAPI NtSetInformationFile(IN HANDLE hFile, OUT PIO_STATUS_BLOCK pIoStatusBlock, IN PVOID FileInformationBuffer, IN ULONG FileInformationBufferLength, IN FILE_INFORMATION_CLASS FileInfoClass)
Definition: iofunc.c:3096
#define LPVOID
Definition: nt_native.h:45
NTSTATUS NTAPI NtClose(IN HANDLE Handle)
Definition: obhandle.c:3402
NTSYSAPI NTSTATUS NTAPI NtWaitForSingleObject(IN HANDLE hObject, IN BOOLEAN bAlertable, IN PLARGE_INTEGER Timeout)
#define MAXLONGLONG
@ SynchronizationEvent
VOID NTAPI RtlpInitializeThreadPooling(VOID)
NTSTATUS NTAPI NtSetEvent(IN HANDLE EventHandle, OUT PLONG PreviousState OPTIONAL)
Definition: event.c:463
NTSTATUS NTAPI NtCreateEvent(OUT PHANDLE EventHandle, IN ACCESS_MASK DesiredAccess, IN POBJECT_ATTRIBUTES ObjectAttributes OPTIONAL, IN EVENT_TYPE EventType, IN BOOLEAN InitialState)
Definition: event.c:96
NTSTATUS NTAPI NtQueryPerformanceCounter(OUT PLARGE_INTEGER PerformanceCounter, OUT PLARGE_INTEGER PerformanceFrequency OPTIONAL)
Definition: profile.c:278
NTSTATUS NTAPI NtQuerySystemTime(OUT PLARGE_INTEGER SystemTime)
Definition: time.c:569
NTSTATUS NTAPI NtSetInformationThread(_In_ HANDLE ThreadHandle, _In_ THREADINFOCLASS ThreadInformationClass, _In_reads_bytes_(ThreadInformationLength) PVOID ThreadInformation, _In_ ULONG ThreadInformationLength)
Definition: query.c:2268
PVOID *typedef PHANDLE
Definition: ntsecpkg.h:455
#define STATUS_WAIT_0
Definition: ntstatus.h:330
#define STATUS_TOO_MANY_THREADS
Definition: ntstatus.h:627
#define STATUS_INVALID_PARAMETER_1
Definition: ntstatus.h:569
NTSTATUS NTAPI NtWaitForMultipleObjects(IN ULONG ObjectCount, IN PHANDLE HandleArray, IN WAIT_TYPE WaitType, IN BOOLEAN Alertable, IN PLARGE_INTEGER TimeOut OPTIONAL)
Definition: obwait.c:46
#define BOOLEAN
Definition: pedump.c:73
long LONG
Definition: pedump.c:60
static unsigned __int64 next
Definition: rand_nt.c:6
#define err(...)
static calc_node_t temp
Definition: rpn_ieee.c:38
#define LIST_FOR_EACH_ENTRY(elem, list, type, field)
Definition: list.h:198
#define LIST_FOR_EACH_ENTRY_SAFE(cursor, cursor2, list, type, field)
Definition: list.h:204
#define LIST_FOR_EACH(cursor, list)
Definition: list.h:188
__WINE_SERVER_LIST_INLINE void list_add_before(struct list *elem, struct list *to_add)
Definition: list.h:87
#define memset(x, y, z)
Definition: compat.h:39
static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
Definition: threadpool.c:317
static void tp_object_wait(struct threadpool_object *object, BOOL group_wait)
Definition: threadpool.c:2203
RTL_CRITICAL_SECTION threadpool_compl_cs
Definition: threadpool.c:89
static BOOL tp_object_release(struct threadpool_object *object)
Definition: threadpool.c:2252
NTSTATUS WINAPI RtlCreateTimer(HANDLE TimerQueue, HANDLE *NewTimer, RTL_WAITORTIMERCALLBACKFUNC Callback, PVOID Parameter, DWORD DueTime, DWORD Period, ULONG Flags)
Definition: threadpool.c:1003
static PLARGE_INTEGER get_nt_timeout(PLARGE_INTEGER pTime, ULONG timeout)
Definition: threadpool.c:627
NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
Definition: threadpool.c:589
static void set_thread_name(const WCHAR *name)
Definition: threadpool.c:468
static void CALLBACK ioqueue_thread_proc(void *param)
Definition: threadpool.c:1615
static struct threadpool_group * impl_from_TP_CLEANUP_GROUP(TP_CLEANUP_GROUP *group)
Definition: threadpool.c:420
VOID WINAPI TpCallbackUnloadDllOnCompletion(TP_CALLBACK_INSTANCE *instance, HMODULE module)
Definition: threadpool.c:2837
static void queue_move_timer(struct queue_timer *t, ULONGLONG time, BOOL set_event)
Definition: threadpool.c:710
static struct @5307 waitqueue
HANDLE compl_port
Definition: threadpool.c:88
static void CALLBACK process_rtl_work_item(TP_CALLBACK_INSTANCE *instance, void *userdata)
Definition: threadpool.c:479
static void tp_group_shutdown(struct threadpool_group *group)
Definition: threadpool.c:1992
NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
Definition: threadpool.c:864
#define MAXIMUM_WAITQUEUE_OBJECTS
Definition: threadpool.c:140
HANDLE port
Definition: threadpool.c:366
LONG objcount
Definition: threadpool.c:286
static void queue_add_timer(struct queue_timer *t, ULONGLONG time, BOOL set_event)
Definition: threadpool.c:684
BOOL WINAPI TpSetPoolMinThreads(TP_POOL *pool, DWORD minimum)
Definition: threadpool.c:3084
static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
Definition: threadpool.c:441
static struct @5306 timerqueue
static struct list * threadpool_get_next_item(const struct threadpool *pool)
Definition: threadpool.c:2292
static void tp_object_initialize(struct threadpool_object *object, struct threadpool *pool, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
Definition: threadpool.c:2026
static void tp_waitqueue_unlock(struct threadpool_object *wait)
Definition: threadpool.c:1593
static ULONG queue_get_timeout(struct timer_queue *q)
Definition: threadpool.c:763
static NTSTATUS tp_waitqueue_lock(struct threadpool_object *wait)
Definition: threadpool.c:1515
static BOOL tp_group_release(struct threadpool_group *group)
Definition: threadpool.c:2002
threadpool_objtype
Definition: threadpool.c:162
@ TP_OBJECT_TYPE_SIMPLE
Definition: threadpool.c:163
@ TP_OBJECT_TYPE_WORK
Definition: threadpool.c:164
@ TP_OBJECT_TYPE_TIMER
Definition: threadpool.c:165
@ TP_OBJECT_TYPE_IO
Definition: threadpool.c:167
@ TP_OBJECT_TYPE_WAIT
Definition: threadpool.c:166
static struct threadpool * default_threadpool
Definition: threadpool.c:439
static ULONGLONG queue_current_time(void)
Definition: threadpool.c:677
static DWORD WINAPI timer_callback_wrapper(LPVOID p)
Definition: threadpool.c:669
static NTSTATUS tp_threadpool_alloc(struct threadpool **out)
Definition: threadpool.c:1782
NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer, HANDLE CompletionEvent)
Definition: threadpool.c:1098
static void tp_object_submit(struct threadpool_object *object, BOOL signaled)
Definition: threadpool.c:2119
static NTSTATUS tp_new_worker_thread(struct threadpool *pool)
Definition: threadpool.c:1242
static BOOL tp_threadpool_release(struct threadpool *pool)
Definition: threadpool.c:1845
struct list pending_timers
Definition: threadpool.c:288
static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
Definition: threadpool.c:280
static struct threadpool_object * impl_from_TP_WORK(TP_WORK *work)
Definition: threadpool.c:392
CRITICAL_SECTION cs
Definition: threadpool.c:285
#define TIMER_QUEUE_MAGIC
Definition: threadpool.c:80
static struct threadpool_object * impl_from_TP_WAIT(TP_WAIT *wait)
Definition: threadpool.c:406
static BOOL object_is_finished(struct threadpool_object *object, BOOL group)
Definition: threadpool.c:2184
#define THREADPOOL_WORKER_TIMEOUT
Definition: threadpool.c:139
VOID WINAPI TpCallbackSetEventOnCompletion(TP_CALLBACK_INSTANCE *instance, HANDLE event)
Definition: threadpool.c:2824
struct list buckets
Definition: threadpool.c:324
#define EXPIRE_NEVER
Definition: threadpool.c:79
static void tp_ioqueue_unlock(struct threadpool_object *io)
Definition: threadpool.c:2218
static void CALLBACK threadpool_worker_proc(void *param)
Definition: threadpool.c:2471
static void tp_threadpool_unlock(struct threadpool *pool)
Definition: threadpool.c:1947
static void WINAPI timer_queue_thread_proc(LPVOID p)
Definition: threadpool.c:788
static NTSTATUS tp_group_alloc(struct threadpool_group **out)
Definition: threadpool.c:1960
VOID WINAPI TpCallbackReleaseMutexOnCompletion(TP_CALLBACK_INSTANCE *instance, HANDLE mutex)
Definition: threadpool.c:2795
static struct timer_queue * get_timer_queue(HANDLE TimerQueue)
Definition: threadpool.c:954
static struct @5305 old_threadpool
LONG num_buckets
Definition: threadpool.c:323
static void queue_destroy_timer(struct queue_timer *t)
Definition: threadpool.c:837
static struct threadpool_instance * impl_from_TP_CALLBACK_INSTANCE(TP_CALLBACK_INSTANCE *instance)
Definition: threadpool.c:425
static DWORD CALLBACK iocp_poller(LPVOID Arg)
Definition: threadpool.c:539
static struct threadpool_object * impl_from_TP_TIMER(TP_TIMER *timer)
Definition: threadpool.c:399
static NTSTATUS tp_alloc_wait(TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment, DWORD flags)
Definition: threadpool.c:2638
RTL_CONDITION_VARIABLE update_event
Definition: threadpool.c:289
static struct threadpool_object * impl_from_TP_IO(TP_IO *io)
Definition: threadpool.c:413
NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
Definition: threadpool.c:910
static NTSTATUS tp_ioqueue_lock(struct threadpool_object *io, HANDLE file)
Definition: threadpool.c:1724
static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
Definition: threadpool.c:83
static void CALLBACK timerqueue_thread_proc(void *param)
Definition: threadpool.c:1144
static void tp_object_execute(struct threadpool_object *object, BOOL wait_thread)
Definition: threadpool.c:2312
static void tp_timerqueue_unlock(struct threadpool_object *timer)
Definition: threadpool.c:1307
static struct @5308 ioqueue
static void CALLBACK waitqueue_thread_proc(void *param)
Definition: threadpool.c:1339
static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
Definition: threadpool.c:358
static void tp_object_cancel(struct threadpool_object *object)
Definition: threadpool.c:2158
NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer, DWORD DueTime, DWORD Period)
Definition: threadpool.c:1062
BOOL thread_running
Definition: threadpool.c:287
static void timer_cleanup_callback(struct queue_timer *t)
Definition: threadpool.c:655
VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion(TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit)
Definition: threadpool.c:2740
static void queue_remove_timer(struct queue_timer *t)
Definition: threadpool.c:636
static struct threadpool * impl_from_TP_POOL(TP_POOL *pool)
Definition: threadpool.c:387
static void tp_threadpool_shutdown(struct threadpool *pool)
Definition: threadpool.c:1832
static void queue_timer_expire(struct timer_queue *q)
Definition: threadpool.c:718
static void tp_object_prepare_shutdown(struct threadpool_object *object)
Definition: threadpool.c:2237
static NTSTATUS tp_timerqueue_lock(struct threadpool_object *timer)
Definition: threadpool.c:1265
static NTSTATUS tp_threadpool_lock(struct threadpool **out, TP_CALLBACK_ENVIRON *environment)
Definition: threadpool.c:1874
static void CALLBACK rtl_wait_callback(TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result)
Definition: threadpool.c:3381
static void tp_object_prio_queue(struct threadpool_object *object)
Definition: threadpool.c:2107
NTSTATUS NTAPI NtReleaseSemaphore(IN HANDLE SemaphoreHandle, IN LONG ReleaseCount, OUT PLONG PreviousCount OPTIONAL)
Definition: sem.c:295
#define STATUS_SUCCESS
Definition: shellext.h:65
namespace GUID const ADDRINFOEXW ADDRINFOEXW struct timeval OVERLAPPED * overlapped
Definition: sock.c:81
INT WSAAPI shutdown(IN SOCKET s, IN INT how)
Definition: sockctrl.c:506
#define TRACE(s)
Definition: solgame.cpp:4
IMAGE_OPTIONAL_HEADER32 OptionalHeader
Definition: ntddk_ex.h:184
PVOID ImageBaseAddress
Definition: ntddk_ex.h:245
PTP_CLEANUP_GROUP CleanupGroup
Definition: winnt_old.h:4690
union _TP_CALLBACK_ENVIRON_V3::@4519 u
PTP_SIMPLE_CALLBACK FinalizationCallback
Definition: winnt_old.h:4694
TP_CALLBACK_PRIORITY CallbackPriority
Definition: winnt_old.h:4703
PTP_CLEANUP_GROUP_CANCEL_CALLBACK CleanupGroupCancelCallback
Definition: winnt_old.h:4691
struct _ACTIVATION_CONTEXT * ActivationContext
Definition: winnt_old.h:4693
struct _TP_CALLBACK_ENVIRON_V3::@4519::@4520 s
Definition: http.c:7252
Definition: fci.c:127
IO_STATUS_BLOCK iosb
Definition: threadpool.c:172
ULONG_PTR cvalue
Definition: threadpool.c:173
Definition: copy.c:22
Definition: list.h:15
Definition: module.h:456
Definition: name.c:39
BOOL destroy
Definition: threadpool.c:121
RTL_WAITORTIMERCALLBACKFUNC callback
Definition: threadpool.c:116
DWORD period
Definition: threadpool.c:118
ULONG runcount
Definition: threadpool.c:115
struct timer_queue * q
Definition: threadpool.c:113
PVOID param
Definition: threadpool.c:117
HANDLE event
Definition: threadpool.c:122
ULONG flags
Definition: threadpool.c:119
ULONGLONG expire
Definition: threadpool.c:120
struct list entry
Definition: threadpool.c:114
PVOID context
Definition: threadpool.c:76
PRTL_WORK_ITEM_ROUTINE function
Definition: threadpool.c:75
Definition: ps.c:97
struct list members
Definition: threadpool.c:275
CRITICAL_SECTION cs
Definition: threadpool.c:273
CRITICAL_SECTION * critical_section
Definition: threadpool.c:259
struct threadpool_instance::@5315 cleanup
struct threadpool_object * object
Definition: threadpool.c:253
struct list wait_entry
Definition: threadpool.c:233
PTP_WAIT_CALLBACK callback
Definition: threadpool.c:228
PTP_SIMPLE_CALLBACK callback
Definition: threadpool.c:208
PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback
Definition: threadpool.c:187
RTL_CONDITION_VARIABLE finished_event
Definition: threadpool.c:197
TP_CALLBACK_PRIORITY priority
Definition: threadpool.c:191
ULONGLONG timeout
Definition: threadpool.c:222
struct io_completion * completions
Definition: threadpool.c:245
PTP_IO_CALLBACK callback
Definition: threadpool.c:241
struct threadpool * pool
Definition: threadpool.c:184
unsigned int skipped_count
Definition: threadpool.c:243
union threadpool_object::@5309 u
struct list group_entry
Definition: threadpool.c:193
enum threadpool_objtype type
Definition: threadpool.c:183
struct threadpool_object::@5309::@5313 wait
struct threadpool_object::@5309::@5314 io
void * win32_callback
Definition: threadpool.c:179
unsigned int completion_count
Definition: threadpool.c:243
unsigned int completion_max
Definition: threadpool.c:243
unsigned int pending_count
Definition: threadpool.c:243
struct list pool_entry
Definition: threadpool.c:196
LONG num_associated_callbacks
Definition: threadpool.c:202
RTL_WAITORTIMERCALLBACKFUNC rtl_callback
Definition: threadpool.c:237
LONG num_running_callbacks
Definition: threadpool.c:201
struct threadpool_object::@5309::@5310 simple
LONG num_pending_callbacks
Definition: threadpool.c:200
PTP_WORK_CALLBACK callback
Definition: threadpool.c:212
RTL_CONDITION_VARIABLE group_finished_event
Definition: threadpool.c:198
struct threadpool_object::@5309::@5311 work
PTP_SIMPLE_CALLBACK finalization_callback
Definition: threadpool.c:188
struct list timer_entry
Definition: threadpool.c:220
struct waitqueue_bucket * bucket
Definition: threadpool.c:231
HANDLE completed_event
Definition: threadpool.c:199
PTP_TIMER_CALLBACK callback
Definition: threadpool.c:216
struct threadpool_group * group
Definition: threadpool.c:185
struct threadpool_object::@5309::@5312 timer
int max_workers
Definition: threadpool.c:153
TP_POOL_STACK_INFORMATION stack_info
Definition: threadpool.c:158
CRITICAL_SECTION cs
Definition: threadpool.c:148
RTL_CONDITION_VARIABLE update_event
Definition: threadpool.c:151
int num_workers
Definition: threadpool.c:155
LONG objcount
Definition: threadpool.c:146
int min_workers
Definition: threadpool.c:154
struct list pools[3]
Definition: threadpool.c:150
LONG refcount
Definition: threadpool.c:145
BOOL shutdown
Definition: threadpool.c:147
int num_busy_workers
Definition: threadpool.c:156
HANDLE compl_port
Definition: threadpool.c:157
Definition: dhcpd.h:248
HANDLE event
Definition: threadpool.c:131
RTL_CRITICAL_SECTION cs
Definition: threadpool.c:128
HANDLE thread
Definition: threadpool.c:132
DWORD magic
Definition: threadpool.c:127
struct list timers
Definition: threadpool.c:129
struct list waiting
Definition: threadpool.c:351
HANDLE update_event
Definition: threadpool.c:352
struct list bucket_entry
Definition: threadpool.c:348
struct list reserved
Definition: threadpool.c:350
#define max(a, b)
Definition: svc.c:63
#define LIST_INIT(head)
Definition: queue.h:197
#define LIST_ENTRY(type)
Definition: queue.h:175
#define STATUS_PENDING
Definition: telnetd.h:14
#define DWORD_PTR
Definition: treelist.c:76
uint32_t * PULONG_PTR
Definition: typedefs.h:65
int64_t LONGLONG
Definition: typedefs.h:68
#define NTAPI
Definition: typedefs.h:36
ULONG_PTR SIZE_T
Definition: typedefs.h:80
uint32_t ULONG_PTR
Definition: typedefs.h:65
uint32_t ULONG
Definition: typedefs.h:59
uint64_t ULONGLONG
Definition: typedefs.h:67
#define STATUS_INVALID_PARAMETER
Definition: udferr_usr.h:135
#define STATUS_UNSUCCESSFUL
Definition: udferr_usr.h:132
static EFI_HANDLE * handles
Definition: uefidisk.c:62
LONGLONG QuadPart
Definition: typedefs.h:114
Definition: pdh_main.c:96
wchar_t tm const _CrtWcstime_Writes_and_advances_ptr_ count wchar_t ** out
Definition: wcsftime.cpp:383
_In_ WDFINTERRUPT _In_ PFN_WDF_INTERRUPT_SYNCHRONIZE Callback
Definition: wdfinterrupt.h:458
_In_ WDFTIMER _In_ LONGLONG DueTime
Definition: wdftimer.h:190
HANDLE WINAPI GetCurrentThread(void)
Definition: proc.c:1148
DWORD WINAPI GetCurrentThreadId(void)
Definition: thread.c:459
#define WAIT_OBJECT_0
Definition: winbase.h:383
#define WINAPI
Definition: msvc.h:6
#define WT_TRANSFER_IMPERSONATION
Definition: winnt_old.h:1093
@ TP_CALLBACK_PRIORITY_NORMAL
Definition: winnt_old.h:4653
@ TP_CALLBACK_PRIORITY_HIGH
Definition: winnt_old.h:4652
@ TP_CALLBACK_PRIORITY_LOW
Definition: winnt_old.h:4654
struct _TP_CLEANUP_GROUP TP_CLEANUP_GROUP
Definition: winnt_old.h:4671
VOID(NTAPI * PTP_WAIT_CALLBACK)(PTP_CALLBACK_INSTANCE, PVOID, PTP_WAIT, TP_WAIT_RESULT)
Definition: winnt_old.h:4684
#define WT_EXECUTEINPERSISTENTTHREAD
Definition: winnt_old.h:1092
struct _TP_CALLBACK_INSTANCE * PTP_CALLBACK_INSTANCE
Definition: winnt_old.h:4643
#define WT_EXECUTEONLYONCE
Definition: winnt_old.h:1088
struct _TP_POOL TP_POOL
Definition: winnt_old.h:4641
#define WT_EXECUTEINIOTHREAD
Definition: winnt_old.h:1085
DWORD TP_WAIT_RESULT
Definition: winnt_old.h:4648
#define RTL_CRITICAL_SECTION_FLAG_FORCE_DEBUG_INFO
Definition: winnt_old.h:1138
#define WT_EXECUTELONGFUNCTION
Definition: winnt_old.h:1089
#define WT_EXECUTEINWAITTHREAD
Definition: winnt_old.h:1087
VOID(NTAPI * PTP_CLEANUP_GROUP_CANCEL_CALLBACK)(_Inout_opt_ PVOID ObjectContext, _Inout_opt_ PVOID CleanupContext)
Definition: winnt_old.h:4679
VOID(NTAPI * PTP_WORK_CALLBACK)(_Inout_ PTP_CALLBACK_INSTANCE Instance, _Inout_opt_ PVOID Context, _Inout_ PTP_WORK Work)
Definition: winnt_old.h:4666
#define WT_EXECUTEINTIMERTHREAD
Definition: winnt_old.h:1090
struct _TP_WAIT TP_WAIT
Definition: winnt_old.h:4645
struct _TP_IO * PTP_IO
Definition: winnt_old.h:4646
enum _TP_CALLBACK_PRIORITY TP_CALLBACK_PRIORITY
struct _TP_WORK TP_WORK
Definition: winnt_old.h:4642
struct _TP_IO TP_IO
Definition: winnt_old.h:4646
struct _TP_TIMER TP_TIMER
Definition: winnt_old.h:4644
VOID(NTAPI * PTP_SIMPLE_CALLBACK)(_Inout_ PTP_CALLBACK_INSTANCE Instance, _Inout_opt_ PVOID Context)
Definition: winnt_old.h:4674
VOID(NTAPI * PTP_TIMER_CALLBACK)(PTP_CALLBACK_INSTANCE, PVOID, PTP_TIMER)
Definition: winnt_old.h:4683
struct _TP_CALLBACK_INSTANCE TP_CALLBACK_INSTANCE
Definition: winnt_old.h:4643
#define WT_EXECUTEDEFAULT
Definition: winnt_old.h:1084
_Must_inspect_result_ _In_ ULONG Flags
Definition: wsk.h:170
_In_ LARGE_INTEGER _In_ ULONG Period
Definition: kefuncs.h:1313
#define RTL_CONDITION_VARIABLE_INIT
Definition: rtltypes.h:282
_Inout_opt_ PVOID Parameter
Definition: rtltypes.h:336
__wchar_t WCHAR
Definition: xmlstorage.h:180