ReactOS 0.4.16-dev-1946-g52006dd
threadpool.c File Reference
#include <assert.h>
#include <stdarg.h>
#include <limits.h>
#include "ntstatus.h"
#include "winternl.h"
#include "wine/debug.h"
#include "wine/list.h"
#include "ntdll_misc.h"
Include dependency graph for threadpool.c:

Go to the source code of this file.

Classes

struct  rtl_work_item
 
struct  queue_timer
 
struct  timer_queue
 
struct  threadpool
 
struct  io_completion
 
struct  threadpool_object
 
struct  threadpool_instance
 
struct  threadpool_group
 
struct  waitqueue_bucket
 

Macros

#define WIN32_NO_STATUS
 
#define EXPIRE_NEVER   (~(ULONGLONG)0)
 
#define TIMER_QUEUE_MAGIC   0x516d6954 /* TimQ */
 
#define THREADPOOL_WORKER_TIMEOUT   5000
 
#define MAXIMUM_WAITQUEUE_OBJECTS   (MAXIMUM_WAIT_OBJECTS - 1)
 

Enumerations

enum  threadpool_objtype {
  TP_OBJECT_TYPE_SIMPLE , TP_OBJECT_TYPE_WORK , TP_OBJECT_TYPE_TIMER , TP_OBJECT_TYPE_WAIT ,
  TP_OBJECT_TYPE_IO
}
 

Functions

 WINE_DEFAULT_DEBUG_CHANNEL (threadpool)
 
static struct threadpool * impl_from_TP_POOL (TP_POOL *pool)
 
static struct threadpool_object * impl_from_TP_WORK (TP_WORK *work)
 
static struct threadpool_object * impl_from_TP_TIMER (TP_TIMER *timer)
 
static struct threadpool_object * impl_from_TP_WAIT (TP_WAIT *wait)
 
static struct threadpool_object * impl_from_TP_IO (TP_IO *io)
 
static struct threadpool_group * impl_from_TP_CLEANUP_GROUP (TP_CLEANUP_GROUP *group)
 
static struct threadpool_instance * impl_from_TP_CALLBACK_INSTANCE (TP_CALLBACK_INSTANCE *instance)
 
static void CALLBACK threadpool_worker_proc (void *param)
 
static void tp_object_submit (struct threadpool_object *object, BOOL signaled)
 
static void tp_object_execute (struct threadpool_object *object, BOOL wait_thread)
 
static void tp_object_prepare_shutdown (struct threadpool_object *object)
 
static BOOL tp_object_release (struct threadpool_object *object)
 
static BOOL array_reserve (void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
 
static void set_thread_name (const WCHAR *name)
 
static void CALLBACK process_rtl_work_item (TP_CALLBACK_INSTANCE *instance, void *userdata)
 
NTSTATUS WINAPI RtlQueueWorkItem (PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags)
 
static DWORD CALLBACK iocp_poller (LPVOID Arg)
 
NTSTATUS WINAPI RtlSetIoCompletionCallback (HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
 
static PLARGE_INTEGER get_nt_timeout (PLARGE_INTEGER pTime, ULONG timeout)
 
static void queue_remove_timer (struct queue_timer *t)
 
static void timer_cleanup_callback (struct queue_timer *t)
 
static DWORD WINAPI timer_callback_wrapper (LPVOID p)
 
static ULONGLONG queue_current_time (void)
 
static void queue_add_timer (struct queue_timer *t, ULONGLONG time, BOOL set_event)
 
static void queue_move_timer (struct queue_timer *t, ULONGLONG time, BOOL set_event)
 
static void queue_timer_expire (struct timer_queue *q)
 
static ULONG queue_get_timeout (struct timer_queue *q)
 
static void WINAPI timer_queue_thread_proc (LPVOID p)
 
static void queue_destroy_timer (struct queue_timer *t)
 
NTSTATUS WINAPI RtlCreateTimerQueue (PHANDLE NewTimerQueue)
 
NTSTATUS WINAPI RtlDeleteTimerQueueEx (HANDLE TimerQueue, HANDLE CompletionEvent)
 
static struct timer_queue * get_timer_queue (HANDLE TimerQueue)
 
NTSTATUS WINAPI RtlCreateTimer (HANDLE TimerQueue, HANDLE *NewTimer, RTL_WAITORTIMERCALLBACKFUNC Callback, PVOID Parameter, DWORD DueTime, DWORD Period, ULONG Flags)
 
NTSTATUS WINAPI RtlUpdateTimer (HANDLE TimerQueue, HANDLE Timer, DWORD DueTime, DWORD Period)
 
NTSTATUS WINAPI RtlDeleteTimer (HANDLE TimerQueue, HANDLE Timer, HANDLE CompletionEvent)
 
static void CALLBACK timerqueue_thread_proc (void *param)
 
static NTSTATUS tp_new_worker_thread (struct threadpool *pool)
 
static NTSTATUS tp_timerqueue_lock (struct threadpool_object *timer)
 
static void tp_timerqueue_unlock (struct threadpool_object *timer)
 
static void CALLBACK waitqueue_thread_proc (void *param)
 
static NTSTATUS tp_waitqueue_lock (struct threadpool_object *wait)
 
static void tp_waitqueue_unlock (struct threadpool_object *wait)
 
static void CALLBACK ioqueue_thread_proc (void *param)
 
static NTSTATUS tp_ioqueue_lock (struct threadpool_object *io, HANDLE file)
 
static NTSTATUS tp_threadpool_alloc (struct threadpool **out)
 
static void tp_threadpool_shutdown (struct threadpool *pool)
 
static BOOL tp_threadpool_release (struct threadpool *pool)
 
static NTSTATUS tp_threadpool_lock (struct threadpool **out, TP_CALLBACK_ENVIRON *environment)
 
static void tp_threadpool_unlock (struct threadpool *pool)
 
static NTSTATUS tp_group_alloc (struct threadpool_group **out)
 
static void tp_group_shutdown (struct threadpool_group *group)
 
static BOOL tp_group_release (struct threadpool_group *group)
 
static void tp_object_initialize (struct threadpool_object *object, struct threadpool *pool, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
 
static void tp_object_prio_queue (struct threadpool_object *object)
 
static void tp_object_cancel (struct threadpool_object *object)
 
static BOOL object_is_finished (struct threadpool_object *object, BOOL group)
 
static void tp_object_wait (struct threadpool_object *object, BOOL group_wait)
 
static void tp_ioqueue_unlock (struct threadpool_object *io)
 
static struct list * threadpool_get_next_item (const struct threadpool *pool)
 
NTSTATUS WINAPI TpAllocCleanupGroup (TP_CLEANUP_GROUP **out)
 
NTSTATUS WINAPI TpAllocIoCompletion (TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback, void *userdata, TP_CALLBACK_ENVIRON *environment)
 
NTSTATUS WINAPI TpAllocPool (TP_POOL **out, PVOID reserved)
 
NTSTATUS WINAPI TpAllocTimer (TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
 
static NTSTATUS tp_alloc_wait (TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment, DWORD flags)
 
NTSTATUS WINAPI TpAllocWait (TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
 
NTSTATUS WINAPI TpAllocWork (TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
 
void WINAPI TpCancelAsyncIoOperation (TP_IO *io)
 
VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion (TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit)
 
NTSTATUS WINAPI TpCallbackMayRunLong (TP_CALLBACK_INSTANCE *instance)
 
VOID WINAPI TpCallbackReleaseMutexOnCompletion (TP_CALLBACK_INSTANCE *instance, HANDLE mutex)
 
VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion (TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count)
 
VOID WINAPI TpCallbackSetEventOnCompletion (TP_CALLBACK_INSTANCE *instance, HANDLE event)
 
VOID WINAPI TpCallbackUnloadDllOnCompletion (TP_CALLBACK_INSTANCE *instance, HMODULE module)
 
VOID WINAPI TpDisassociateCallback (TP_CALLBACK_INSTANCE *instance)
 
BOOL WINAPI TpIsTimerSet (TP_TIMER *timer)
 
VOID WINAPI TpPostWork (TP_WORK *work)
 
VOID WINAPI TpReleaseCleanupGroup (TP_CLEANUP_GROUP *group)
 
VOID WINAPI TpReleaseCleanupGroupMembers (TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata)
 
void WINAPI TpReleaseIoCompletion (TP_IO *io)
 
VOID WINAPI TpReleasePool (TP_POOL *pool)
 
VOID WINAPI TpReleaseTimer (TP_TIMER *timer)
 
VOID WINAPI TpReleaseWait (TP_WAIT *wait)
 
VOID WINAPI TpReleaseWork (TP_WORK *work)
 
VOID WINAPI TpSetPoolMaxThreads (TP_POOL *pool, DWORD maximum)
 
BOOL WINAPI TpSetPoolMinThreads (TP_POOL *pool, DWORD minimum)
 
VOID WINAPI TpSetTimer (TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length)
 
VOID WINAPI TpSetWait (TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout)
 
NTSTATUS WINAPI TpSimpleTryPost (PTP_SIMPLE_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
 
void WINAPI TpStartAsyncIoOperation (TP_IO *io)
 
void WINAPI TpWaitForIoCompletion (TP_IO *io, BOOL cancel_pending)
 
VOID WINAPI TpWaitForTimer (TP_TIMER *timer, BOOL cancel_pending)
 
VOID WINAPI TpWaitForWait (TP_WAIT *wait, BOOL cancel_pending)
 
VOID WINAPI TpWaitForWork (TP_WORK *work, BOOL cancel_pending)
 
NTSTATUS WINAPI TpSetPoolStackInformation (TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info)
 
NTSTATUS WINAPI TpQueryPoolStackInformation (TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info)
 
static void CALLBACK rtl_wait_callback (TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result)
 
NTSTATUS WINAPI RtlRegisterWait (HANDLE *out, HANDLE handle, RTL_WAITORTIMERCALLBACKFUNC callback, void *context, ULONG milliseconds, ULONG flags)
 
NTSTATUS WINAPI RtlDeregisterWaitEx (HANDLE handle, HANDLE event)
 
NTSTATUS WINAPI RtlDeregisterWait (HANDLE WaitHandle)
 

Variables

static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
 
struct {
   HANDLE   compl_port
 
   RTL_CRITICAL_SECTION   threadpool_compl_cs
 
} old_threadpool
 
static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
 
struct {
   CRITICAL_SECTION   cs
 
   LONG   objcount
 
   BOOL   thread_running
 
   struct list   pending_timers
 
   RTL_CONDITION_VARIABLE   update_event
 
} timerqueue
 
static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
 
struct {
   CRITICAL_SECTION   cs
 
   LONG   num_buckets
 
   struct list   buckets
 
} waitqueue
 
static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
 
struct {
   CRITICAL_SECTION   cs
 
   LONG   objcount
 
   BOOL   thread_running
 
   HANDLE   port
 
   RTL_CONDITION_VARIABLE   update_event
 
} ioqueue
 
static struct threadpool * default_threadpool = NULL
 

Macro Definition Documentation

◆ EXPIRE_NEVER

#define EXPIRE_NEVER   (~(ULONGLONG)0)

Definition at line 79 of file threadpool.c.

◆ MAXIMUM_WAITQUEUE_OBJECTS

#define MAXIMUM_WAITQUEUE_OBJECTS   (MAXIMUM_WAIT_OBJECTS - 1)

Definition at line 140 of file threadpool.c.

◆ THREADPOOL_WORKER_TIMEOUT

#define THREADPOOL_WORKER_TIMEOUT   5000

Definition at line 139 of file threadpool.c.

◆ TIMER_QUEUE_MAGIC

#define TIMER_QUEUE_MAGIC   0x516d6954 /* TimQ */

Definition at line 80 of file threadpool.c.

◆ WIN32_NO_STATUS

#define WIN32_NO_STATUS

Definition at line 58 of file threadpool.c.

Enumeration Type Documentation

◆ threadpool_objtype

Enumerator
TP_OBJECT_TYPE_SIMPLE 
TP_OBJECT_TYPE_WORK 
TP_OBJECT_TYPE_TIMER 
TP_OBJECT_TYPE_WAIT 
TP_OBJECT_TYPE_IO 

Definition at line 161 of file threadpool.c.

162{
168};
@ TP_OBJECT_TYPE_SIMPLE
Definition: threadpool.c:163
@ TP_OBJECT_TYPE_WORK
Definition: threadpool.c:164
@ TP_OBJECT_TYPE_TIMER
Definition: threadpool.c:165
@ TP_OBJECT_TYPE_IO
Definition: threadpool.c:167
@ TP_OBJECT_TYPE_WAIT
Definition: threadpool.c:166

Function Documentation

◆ array_reserve()

static BOOL array_reserve ( void **  elements,
unsigned int * capacity,
unsigned int  count,
unsigned int  size 
)
static

Definition at line 441 of file threadpool.c.

442{
443 unsigned int new_capacity, max_capacity;
444 void *new_elements;
445
446 if (count <= *capacity)
447 return TRUE;
448
449 max_capacity = ~(SIZE_T)0 / size;
450 if (count > max_capacity)
451 return FALSE;
452
453 new_capacity = max(4, *capacity);
454 while (new_capacity < count && new_capacity <= max_capacity / 2)
455 new_capacity *= 2;
456 if (new_capacity < count)
457 new_capacity = max_capacity;
458
459 if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size )))
460 return FALSE;
461
462 *elements = new_elements;
463 *capacity = new_capacity;
464
465 return TRUE;
466}
#define TRUE
Definition: types.h:120
#define FALSE
Definition: types.h:117
#define GetProcessHeap()
Definition: compat.h:736
GLuint GLuint GLsizei count
Definition: gl.h:1545
GLsizeiptr size
Definition: glext.h:5919
NTSYSAPI PVOID WINAPI RtlReAllocateHeap(HANDLE, ULONG, PVOID, SIZE_T) __WINE_ALLOC_SIZE(4) __WINE_DEALLOC(RtlFreeHeap
#define max(a, b)
Definition: svc.c:63
ULONG_PTR SIZE_T
Definition: typedefs.h:80

Referenced by ioqueue_thread_proc().

◆ get_nt_timeout()

static PLARGE_INTEGER get_nt_timeout ( PLARGE_INTEGER  pTime,
ULONG  timeout 
)
inline static

Definition at line 627 of file threadpool.c.

628{
629 if (timeout == INFINITE) return NULL;
630 pTime->QuadPart = (ULONGLONG)timeout * -10000;
631 return pTime;
632}
#define NULL
Definition: types.h:112
#define INFINITE
Definition: serial.h:102
Definition: dhcpd.h:248
uint64_t ULONGLONG
Definition: typedefs.h:67
LONGLONG QuadPart
Definition: typedefs.h:114

Referenced by RtlRegisterWait(), and timer_queue_thread_proc().

◆ get_timer_queue()

static struct timer_queue * get_timer_queue ( HANDLE  TimerQueue)
static

Definition at line 954 of file threadpool.c.

955{
956 static struct timer_queue *default_timer_queue;
957
958 if (TimerQueue)
959 return TimerQueue;
960 else
961 {
962 if (!default_timer_queue)
963 {
964 HANDLE q;
966 if (status == STATUS_SUCCESS)
967 {
968 PVOID p = InterlockedCompareExchangePointer( (void **) &default_timer_queue, q, NULL );
969 if (p)
970 /* Got beat to the punch. */
972 }
973 }
974 return default_timer_queue;
975 }
976}
LONG NTSTATUS
Definition: precomp.h:26
GLdouble GLdouble GLdouble GLdouble q
Definition: gl.h:2063
GLfloat GLfloat p
Definition: glext.h:8902
#define InterlockedCompareExchangePointer
Definition: interlocked.h:144
NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
Definition: threadpool.c:864
NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
Definition: threadpool.c:910
#define STATUS_SUCCESS
Definition: shellext.h:65
Definition: ps.c:97

Referenced by RtlCreateTimer().

◆ impl_from_TP_CALLBACK_INSTANCE()

◆ impl_from_TP_CLEANUP_GROUP()

static struct threadpool_group * impl_from_TP_CLEANUP_GROUP ( TP_CLEANUP_GROUP * group)
inline static

Definition at line 420 of file threadpool.c.

421{
422 return (struct threadpool_group *)group;
423}
GLboolean GLuint group
Definition: glext.h:11120

Referenced by tp_object_initialize(), TpReleaseCleanupGroup(), and TpReleaseCleanupGroupMembers().

◆ impl_from_TP_IO()

static struct threadpool_object * impl_from_TP_IO ( TP_IO * io)
inline static

Definition at line 413 of file threadpool.c.

414{
415 struct threadpool_object *object = (struct threadpool_object *)io;
416 assert( object->type == TP_OBJECT_TYPE_IO );
417 return object;
418}
#define assert(x)
Definition: debug.h:53
static HANDLE PIO_APC_ROUTINE PVOID PIO_STATUS_BLOCK io
Definition: file.c:100

Referenced by TpCancelAsyncIoOperation(), TpReleaseIoCompletion(), TpStartAsyncIoOperation(), and TpWaitForIoCompletion().

◆ impl_from_TP_POOL()

static struct threadpool * impl_from_TP_POOL ( TP_POOL * pool)
inline static

◆ impl_from_TP_TIMER()

static struct threadpool_object * impl_from_TP_TIMER ( TP_TIMER * timer)
inline static

Definition at line 399 of file threadpool.c.

400{
401 struct threadpool_object *object = (struct threadpool_object *)timer;
403 return object;
404}
struct threadpool_object::@5309::@5312 timer

Referenced by TpIsTimerSet(), TpReleaseTimer(), TpSetTimer(), and TpWaitForTimer().

◆ impl_from_TP_WAIT()

static struct threadpool_object * impl_from_TP_WAIT ( TP_WAIT * wait)
inline static

Definition at line 406 of file threadpool.c.

407{
408 struct threadpool_object *object = (struct threadpool_object *)wait;
410 return object;
411}
struct threadpool_object::@5309::@5313 wait

Referenced by rtl_wait_callback(), RtlRegisterWait(), TpReleaseWait(), TpSetWait(), and TpWaitForWait().

◆ impl_from_TP_WORK()

static struct threadpool_object * impl_from_TP_WORK ( TP_WORK * work)
inline static

Definition at line 392 of file threadpool.c.

393{
394 struct threadpool_object *object = (struct threadpool_object *)work;
396 return object;
397}
struct threadpool_object::@5309::@5311 work

Referenced by TpPostWork(), TpReleaseWork(), and TpWaitForWork().

◆ iocp_poller()

static DWORD CALLBACK iocp_poller ( LPVOID  Arg)
static

Definition at line 539 of file threadpool.c.

540{
541 HANDLE cport = Arg;
542
543 while( TRUE )
544 {
548#ifdef __REACTOS__
550#else
552#endif
553 if (res)
554 {
555 ERR("NtRemoveIoCompletion failed: 0x%lx\n", res);
556 }
557 else
558 {
559 DWORD transferred = 0;
560 DWORD err = 0;
561
563 transferred = iosb.Information;
564 else
566
567 callback( err, transferred, overlapped );
568 }
569 }
570 return 0;
571}
#define ERR(fmt,...)
Definition: precomp.h:57
unsigned long DWORD
Definition: ntddk_ex.h:95
GLuint res
Definition: glext.h:9613
VOID(CALLBACK * PRTL_OVERLAPPED_COMPLETION_ROUTINE)(DWORD, DWORD, LPVOID)
Definition: winternl.h:3619
NTSYSAPI ULONG WINAPI RtlNtStatusToDosError(NTSTATUS)
NTSTATUS NTAPI NtRemoveIoCompletion(IN HANDLE IoCompletionHandle, OUT PVOID *KeyContext, OUT PVOID *ApcContext, OUT PIO_STATUS_BLOCK IoStatusBlock, IN PLARGE_INTEGER Timeout OPTIONAL)
Definition: iocomp.c:445
static IPrintDialogCallback callback
Definition: printdlg.c:326
static PIO_STATUS_BLOCK iosb
Definition: file.c:98
#define err(...)
namespace GUID const ADDRINFOEXW ADDRINFOEXW struct timeval OVERLAPPED * overlapped
Definition: sock.c:81
uint32_t * PULONG_PTR
Definition: typedefs.h:65

Referenced by RtlSetIoCompletionCallback().

◆ ioqueue_thread_proc()

static void CALLBACK ioqueue_thread_proc ( void * param)
static

Definition at line 1615 of file threadpool.c.

1617{
1618 struct io_completion *completion;
1619 struct threadpool_object *io;
1621#ifdef __REACTOS__
1622 PVOID key, value;
1623#else
1625#endif
1626 BOOL destroy, skip;
1628
1629 TRACE( "starting I/O completion thread\n" );
1630 set_thread_name(L"wine_threadpool_ioqueue");
1631
1633
1634 for (;;)
1635 {
1637 if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
1638 ERR("NtRemoveIoCompletion failed, status %#lx.\n", status);
1640
1641 destroy = skip = FALSE;
1642 io = (struct threadpool_object *)key;
1643
1644 TRACE( "io %p, iosb.Status %#lx.\n", io, iosb.Status );
1645
1646 if (io && (io->shutdown || io->u.io.shutting_down))
1647 {
1648 RtlEnterCriticalSection( &io->pool->cs );
1649 if (!io->u.io.pending_count)
1650 {
1651 if (io->u.io.skipped_count)
1652 --io->u.io.skipped_count;
1653
1654 if (io->u.io.skipped_count)
1655 skip = TRUE;
1656 else
1657 destroy = TRUE;
1658 }
1659 RtlLeaveCriticalSection( &io->pool->cs );
1660 if (skip) continue;
1661 }
1662
1663 if (destroy)
1664 {
1665 --ioqueue.objcount;
1666 TRACE( "Releasing io %p.\n", io );
1667 io->shutdown = TRUE;
1669 }
1670 else if (io)
1671 {
1672 RtlEnterCriticalSection( &io->pool->cs );
1673
1674 TRACE( "pending_count %u.\n", io->u.io.pending_count );
1675
1676 if (io->u.io.pending_count)
1677 {
1678 --io->u.io.pending_count;
1679 if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
1680 io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
1681 {
1682 ERR( "Failed to allocate memory.\n" );
1683 RtlLeaveCriticalSection( &io->pool->cs );
1684 continue;
1685 }
1686
1687 completion = &io->u.io.completions[io->u.io.completion_count++];
1688 completion->iosb = iosb;
1689#ifdef __REACTOS__
1690 completion->cvalue = (ULONG_PTR)value;
1691#else
1692 completion->cvalue = value;
1693#endif
1694
1696 }
1697 RtlLeaveCriticalSection( &io->pool->cs );
1698 }
1699
1700 if (!ioqueue.objcount)
1701 {
1702 /* All I/O objects have been destroyed; if no new objects are
1703 * created within some amount of time, then we can shutdown this
1704 * thread. */
1705 LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
1706 if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
1707 &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
1708 break;
1709 }
1710 }
1711
1712 ioqueue.thread_running = FALSE;
1714
1715 TRACE( "terminating I/O completion thread\n" );
1716
1717 RtlExitUserThread( 0 );
1718
1719#ifdef __REACTOS__
1720 return STATUS_SUCCESS;
1721#endif
1722}
void CALLBACK completion(DWORD dwError, DWORD cbTransferred, LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags)
Definition: WSARecv.c:16
void destroy(_Tp *__pointer)
Definition: _construct.h:278
#define skip(...)
Definition: atltest.h:64
#define STATUS_TIMEOUT
Definition: d3dkmdt.h:49
#define L(x)
Definition: resources.c:13
#define ULONG_PTR
Definition: config.h:101
unsigned int BOOL
Definition: ntddk_ex.h:94
NTSYSAPI NTSTATUS NTAPI RtlSleepConditionVariableCS(_Inout_ PRTL_CONDITION_VARIABLE ConditionVariable, _Inout_ PRTL_CRITICAL_SECTION CriticalSection, _In_opt_ PLARGE_INTEGER TimeOut)
NTSYSAPI NTSTATUS NTAPI RtlEnterCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSYSAPI VOID NTAPI RtlExitUserThread(_In_ NTSTATUS Status)
NTSYSAPI NTSTATUS NTAPI RtlLeaveCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
static BOOL tp_object_release(struct threadpool_object *object)
Definition: threadpool.c:2252
static void set_thread_name(const WCHAR *name)
Definition: threadpool.c:468
static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
Definition: threadpool.c:441
static void tp_object_submit(struct threadpool_object *object, BOOL signaled)
Definition: threadpool.c:2119
#define THREADPOOL_WORKER_TIMEOUT
Definition: threadpool.c:139
static struct @5308 ioqueue
#define TRACE(s)
Definition: solgame.cpp:4
Definition: copy.c:22
uint32_t ULONG_PTR
Definition: typedefs.h:65
Definition: pdh_main.c:96

Referenced by tp_ioqueue_lock().

◆ object_is_finished()

static BOOL object_is_finished ( struct threadpool_object * object,
BOOL  group 
)
static

Definition at line 2184 of file threadpool.c.

2185{
2186 if (object->num_pending_callbacks)
2187 return FALSE;
2188 if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
2189 return FALSE;
2190
2191 if (group)
2192 return !object->num_running_callbacks;
2193 else
2194 return !object->num_associated_callbacks;
2195}

Referenced by tp_object_execute(), tp_object_wait(), TpCancelAsyncIoOperation(), and TpDisassociateCallback().

◆ process_rtl_work_item()

static void CALLBACK process_rtl_work_item ( TP_CALLBACK_INSTANCE * instance,
void * userdata 
)
static

Definition at line 479 of file threadpool.c.

480{
481 struct rtl_work_item *item = userdata;
482
483 TRACE("executing %p(%p)\n", item->function, item->context);
484 item->function( item->context );
485
487}
BOOLEAN NTAPI RtlFreeHeap(IN PVOID HeapHandle, IN ULONG Flags, IN PVOID HeapBase)
Definition: heap.c:634

Referenced by RtlQueueWorkItem().

◆ queue_add_timer()

static void queue_add_timer ( struct queue_timer * t,
ULONGLONG  time,
BOOL  set_event 
)
static

Definition at line 684 of file threadpool.c.

686{
687 /* We MUST hold the queue cs while calling this function. */
688 struct timer_queue *q = t->q;
689 struct list *ptr = &q->timers;
690
691 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
692
693 if (time != EXPIRE_NEVER)
694 LIST_FOR_EACH(ptr, &q->timers)
695 {
696 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
697 if (time < cur->expire)
698 break;
699 }
700 list_add_before(ptr, &t->entry);
701
702 t->expire = time;
703
704 /* If we insert at the head of the list, we need to expire sooner
705 than expected. */
706 if (set_event && &t->entry == list_head(&q->timers))
707 NtSetEvent(q->event, NULL);
708}
Definition: list.h:37
static uacpi_status set_event(uacpi_u8 event, uacpi_u8 value)
Definition: event.c:84
FxCollectionEntry * cur
GLdouble GLdouble t
Definition: gl.h:2047
uint32_t entry
Definition: isohybrid.c:63
__u16 time
Definition: mkdosfs.c:8
static PVOID ptr
Definition: dispmode.c:27
NTSTATUS NTAPI NtSetEvent(IN HANDLE EventHandle, OUT PLONG PreviousState OPTIONAL)
Definition: event.c:463
#define LIST_FOR_EACH(cursor, list)
Definition: list.h:188
__WINE_SERVER_LIST_INLINE void list_add_before(struct list *elem, struct list *to_add)
Definition: list.h:87
#define EXPIRE_NEVER
Definition: threadpool.c:79
Definition: list.h:15
ULONGLONG expire
Definition: threadpool.c:120
#define LIST_ENTRY(type)
Definition: queue.h:175

Referenced by queue_move_timer(), and RtlCreateTimer().

◆ queue_current_time()

static ULONGLONG queue_current_time ( void  )
inline static

Definition at line 677 of file threadpool.c.

678{
679 LARGE_INTEGER now, freq;
681 return now.QuadPart * 1000 / freq.QuadPart;
682}
time_t now
Definition: finger.c:65
NTSTATUS NTAPI NtQueryPerformanceCounter(OUT PLARGE_INTEGER PerformanceCounter, OUT PLARGE_INTEGER PerformanceFrequency OPTIONAL)
Definition: profile.c:278

Referenced by queue_get_timeout(), queue_timer_expire(), RtlCreateTimer(), and RtlUpdateTimer().

◆ queue_destroy_timer()

static void queue_destroy_timer ( struct queue_timer * t)
static

Definition at line 837 of file threadpool.c.

838{
839 /* We MUST hold the queue cs while calling this function. */
840 t->destroy = TRUE;
841 if (t->runcount == 0)
842 /* Ensure a timer is promptly removed. If callbacks are pending,
843 it will be removed after the last one finishes by the callback
844 cleanup wrapper. */
846 else
847 /* Make sure no destroyed timer masks an active timer at the head
848 of the sorted list. */
850}
static void queue_move_timer(struct queue_timer *t, ULONGLONG time, BOOL set_event)
Definition: threadpool.c:710
static void queue_remove_timer(struct queue_timer *t)
Definition: threadpool.c:636

Referenced by RtlDeleteTimer(), and RtlDeleteTimerQueueEx().

◆ queue_get_timeout()

static ULONG queue_get_timeout ( struct timer_queue * q)
static

Definition at line 763 of file threadpool.c.

764{
765 struct queue_timer *t;
767
769 if (list_head(&q->timers))
770 {
771 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
772 assert(!t->destroy || t->expire == EXPIRE_NEVER);
773
774 if (t->expire != EXPIRE_NEVER)
775 {
777 timeout = t->expire < time ? 0 : t->expire - time;
778 }
779 }
781
782 return timeout;
783}
static ULONGLONG queue_current_time(void)
Definition: threadpool.c:677
uint32_t ULONG
Definition: typedefs.h:59

Referenced by timer_queue_thread_proc().

◆ queue_move_timer()

static void queue_move_timer ( struct queue_timer * t,
ULONGLONG  time,
BOOL  set_event 
)
inline static

Definition at line 710 of file threadpool.c.

712{
713 /* We MUST hold the queue cs while calling this function. */
714 list_remove(&t->entry);
716}
static void list_remove(struct list_entry *entry)
Definition: list.h:90
static void queue_add_timer(struct queue_timer *t, ULONGLONG time, BOOL set_event)
Definition: threadpool.c:684

Referenced by queue_destroy_timer(), queue_timer_expire(), and RtlUpdateTimer().

◆ queue_remove_timer()

static void queue_remove_timer ( struct queue_timer * t)
static

Definition at line 636 of file threadpool.c.

637{
638 /* We MUST hold the queue cs while calling this function. This ensures
639 that we cannot queue another callback for this timer. The runcount
640 being zero makes sure we don't have any already queued. */
641 struct timer_queue *q = t->q;
642
643 assert(t->runcount == 0);
644 assert(t->destroy);
645
646 list_remove(&t->entry);
647 if (t->event)
648 NtSetEvent(t->event, NULL);
650
651 if (q->quit && list_empty(&q->timers))
652 NtSetEvent(q->event, NULL);
653}
static int list_empty(struct list_entry *head)
Definition: list.h:58

Referenced by queue_destroy_timer(), and timer_cleanup_callback().

◆ queue_timer_expire()

static void queue_timer_expire ( struct timer_queue * q)
static

Definition at line 718 of file threadpool.c.

719{
720 struct queue_timer *t = NULL;
721
723 if (list_head(&q->timers))
724 {
726 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
727 if (!t->destroy && t->expire <= ((now = queue_current_time())))
728 {
729 ++t->runcount;
730 if (t->period)
731 {
732 next = t->expire + t->period;
733 /* avoid trigger cascade if overloaded / hibernated */
734 if (next < now)
735 next = now + t->period;
736 }
737 else
740 }
741 else
742 t = NULL;
743 }
745
746 if (t)
747 {
748 if (t->flags & WT_EXECUTEINTIMERTHREAD)
750 else
751 {
753 = (t->flags
757 if (status != STATUS_SUCCESS)
759 }
760 }
761}
GLbitfield flags
Definition: glext.h:7161
NTSYSAPI NTSTATUS NTAPI RtlQueueWorkItem(_In_ WORKERCALLBACKFUNC Function, _In_opt_ PVOID Context, _In_ ULONG Flags)
static unsigned __int64 next
Definition: rand_nt.c:6
static DWORD WINAPI timer_callback_wrapper(LPVOID p)
Definition: threadpool.c:669
static void timer_cleanup_callback(struct queue_timer *t)
Definition: threadpool.c:655
#define WT_TRANSFER_IMPERSONATION
Definition: winnt_old.h:1093
#define WT_EXECUTEINPERSISTENTTHREAD
Definition: winnt_old.h:1092
#define WT_EXECUTEINIOTHREAD
Definition: winnt_old.h:1085
#define WT_EXECUTELONGFUNCTION
Definition: winnt_old.h:1089
#define WT_EXECUTEINTIMERTHREAD
Definition: winnt_old.h:1090

Referenced by timer_queue_thread_proc().

◆ rtl_wait_callback()

static void CALLBACK rtl_wait_callback ( TP_CALLBACK_INSTANCE * instance,
void * userdata,
TP_WAIT * wait,
TP_WAIT_RESULT  result 
)
static

Definition at line 3381 of file threadpool.c.

3382{
3383 struct threadpool_object *object = impl_from_TP_WAIT(wait);
3384 object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 );
3385}
GLuint64EXT * result
Definition: glext.h:11304
#define STATUS_WAIT_0
Definition: ntstatus.h:330
static struct threadpool_object * impl_from_TP_WAIT(TP_WAIT *wait)
Definition: threadpool.c:406

Referenced by RtlRegisterWait().

◆ RtlCreateTimer()

NTSTATUS WINAPI RtlCreateTimer ( HANDLE  TimerQueue,
HANDLE * NewTimer,
RTL_WAITORTIMERCALLBACKFUNC  Callback,
PVOID  Parameter,
DWORD  DueTime,
DWORD  Period,
ULONG  Flags 
)

Definition at line 1003 of file threadpool.c.

1007{
1009 struct queue_timer *t;
1010 struct timer_queue *q = get_timer_queue(TimerQueue);
1011
1012 if (!q) return STATUS_NO_MEMORY;
1013 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
1014
1015 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
1016 if (!t)
1017 return STATUS_NO_MEMORY;
1018
1019 t->q = q;
1020 t->runcount = 0;
1021 t->callback = Callback;
1022 t->param = Parameter;
1023 t->period = Period;
1024 t->flags = Flags;
1025 t->destroy = FALSE;
1026 t->event = NULL;
1027
1030 if (q->quit)
1032 else
1035
1036 if (status == STATUS_SUCCESS)
1037 *NewTimer = t;
1038 else
1040
1041 return status;
1042}
PVOID NTAPI RtlAllocateHeap(IN PVOID HeapHandle, IN ULONG Flags, IN SIZE_T Size)
Definition: heap.c:616
#define STATUS_INVALID_HANDLE
Definition: d3dkmdt.h:40
#define STATUS_NO_MEMORY
Definition: d3dkmdt.h:51
#define TIMER_QUEUE_MAGIC
Definition: threadpool.c:80
static struct timer_queue * get_timer_queue(HANDLE TimerQueue)
Definition: threadpool.c:954
_In_ WDFINTERRUPT _In_ PFN_WDF_INTERRUPT_SYNCHRONIZE Callback
Definition: wdfinterrupt.h:458
_In_ WDFTIMER _In_ LONGLONG DueTime
Definition: wdftimer.h:190
_Must_inspect_result_ _In_ ULONG Flags
Definition: wsk.h:170
_In_ LARGE_INTEGER _In_ ULONG Period
Definition: kefuncs.h:1313
_Inout_opt_ PVOID Parameter
Definition: rtltypes.h:336

◆ RtlCreateTimerQueue()

NTSTATUS WINAPI RtlCreateTimerQueue ( PHANDLE  NewTimerQueue)

Definition at line 864 of file threadpool.c.

865{
867 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
868 if (!q)
869 return STATUS_NO_MEMORY;
870
872 list_init(&q->timers);
873 q->quit = FALSE;
874 q->magic = TIMER_QUEUE_MAGIC;
876 if (status != STATUS_SUCCESS)
877 {
879 return status;
880 }
882 timer_queue_thread_proc, q, &q->thread, NULL);
883 if (status != STATUS_SUCCESS)
884 {
885 NtClose(q->event);
887 return status;
888 }
889
890 *NewTimerQueue = q;
891 return STATUS_SUCCESS;
892}
static void list_init(struct list_entry *head)
Definition: list.h:51
#define GetCurrentProcess()
Definition: compat.h:759
#define EVENT_ALL_ACCESS
Definition: isotest.c:82
NTSYSAPI NTSTATUS NTAPI RtlCreateUserThread(_In_ PVOID ThreadContext, _Out_ HANDLE *OutThreadHandle, _Reserved_ PVOID Reserved1, _Reserved_ PVOID Reserved2, _Reserved_ PVOID Reserved3, _Reserved_ PVOID Reserved4, _Reserved_ PVOID Reserved5, _Reserved_ PVOID Reserved6, _Reserved_ PVOID Reserved7, _Reserved_ PVOID Reserved8)
NTSYSAPI NTSTATUS NTAPI RtlInitializeCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSTATUS NTAPI NtClose(IN HANDLE Handle)
Definition: obhandle.c:3402
@ SynchronizationEvent
NTSTATUS NTAPI NtCreateEvent(OUT PHANDLE EventHandle, IN ACCESS_MASK DesiredAccess, IN POBJECT_ATTRIBUTES ObjectAttributes OPTIONAL, IN EVENT_TYPE EventType, IN BOOLEAN InitialState)
Definition: event.c:96
static void WINAPI timer_queue_thread_proc(LPVOID p)
Definition: threadpool.c:788

Referenced by get_timer_queue().

◆ RtlDeleteTimer()

NTSTATUS WINAPI RtlDeleteTimer ( HANDLE  TimerQueue,
HANDLE  Timer,
HANDLE  CompletionEvent 
)

Definition at line 1098 of file threadpool.c.

1100{
1101 struct queue_timer *t = Timer;
1102 struct timer_queue *q;
1104 HANDLE event = NULL;
1105
1106 if (!Timer)
1108 q = t->q;
1109 if (CompletionEvent == INVALID_HANDLE_VALUE)
1110 {
1112 if (status == STATUS_SUCCESS)
1114 }
1115 else if (CompletionEvent)
1116 event = CompletionEvent;
1117
1119 t->event = event;
1120 if (t->runcount == 0 && event)
1124
1125 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1126 {
1127 if (status == STATUS_PENDING)
1128 {
1131 }
1132 NtClose(event);
1133 }
1134
1135 return status;
1136}
#define INVALID_HANDLE_VALUE
Definition: compat.h:731
struct _cl_event * event
Definition: glext.h:7739
NTSYSAPI NTSTATUS NTAPI NtWaitForSingleObject(IN HANDLE hObject, IN BOOLEAN bAlertable, IN PLARGE_INTEGER Timeout)
#define STATUS_INVALID_PARAMETER_1
Definition: ntstatus.h:569
static void queue_destroy_timer(struct queue_timer *t)
Definition: threadpool.c:837
#define STATUS_PENDING
Definition: telnetd.h:14

◆ RtlDeleteTimerQueueEx()

NTSTATUS WINAPI RtlDeleteTimerQueueEx ( HANDLE  TimerQueue,
HANDLE  CompletionEvent 
)

Definition at line 910 of file threadpool.c.

911{
912 struct timer_queue *q = TimerQueue;
913 struct queue_timer *t, *temp;
916
917 if (!q || q->magic != TIMER_QUEUE_MAGIC)
919
920 thread = q->thread;
921
923 q->quit = TRUE;
924 if (list_head(&q->timers))
925 /* When the last timer is removed, it will signal the timer thread to
926 exit... */
927 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
929 else
930 /* However if we have none, we must do it ourselves. */
931 NtSetEvent(q->event, NULL);
933
934 if (CompletionEvent == INVALID_HANDLE_VALUE)
935 {
938 }
939 else
940 {
941 if (CompletionEvent)
942 {
943 FIXME("asynchronous return on completion event unimplemented\n");
945 NtSetEvent(CompletionEvent, NULL);
946 }
948 }
949
951 return status;
952}
#define FIXME(fmt,...)
Definition: precomp.h:53
static HANDLE thread
Definition: service.c:33
static calc_node_t temp
Definition: rpn_ieee.c:38
#define LIST_FOR_EACH_ENTRY_SAFE(cursor, cursor2, list, type, field)
Definition: list.h:204

Referenced by get_timer_queue().

◆ RtlDeregisterWait()

NTSTATUS WINAPI RtlDeregisterWait ( HANDLE  WaitHandle)

Definition at line 3499 of file threadpool.c.

3500{
3501 return RtlDeregisterWaitEx(WaitHandle, NULL);
3502}
NTSYSAPI NTSTATUS NTAPI RtlDeregisterWaitEx(_In_ HANDLE hWaitHandle, _In_opt_ HANDLE hCompletionEvent)

◆ RtlDeregisterWaitEx()

NTSTATUS WINAPI RtlDeregisterWaitEx ( HANDLE  handle,
HANDLE  event 
)

Definition at line 3458 of file threadpool.c.

3459{
3460 struct threadpool_object *object = handle;
3462
3463 TRACE( "handle %p, event %p\n", handle, event );
3464
3465 if (!object) return STATUS_INVALID_HANDLE;
3466
3467 TpSetWait( (TP_WAIT *)object, NULL, NULL );
3468
3469 if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE );
3470 else
3471 {
3472 assert( object->completed_event == NULL );
3473 object->completed_event = event;
3474 }
3475
3476 RtlEnterCriticalSection( &object->pool->cs );
3477 if (object->num_pending_callbacks + object->num_running_callbacks
3478 + object->num_associated_callbacks) status = STATUS_PENDING;
3479 else status = STATUS_SUCCESS;
3480 RtlLeaveCriticalSection( &object->pool->cs );
3481
3482 TpReleaseWait( (TP_WAIT *)object );
3483 return status;
3484}
NTSYSAPI void WINAPI TpWaitForWait(TP_WAIT *, BOOL)
Definition: threadpool.c:3317
NTSYSAPI void WINAPI TpSetWait(TP_WAIT *, HANDLE, LARGE_INTEGER *)
Definition: threadpool.c:3191
NTSYSAPI void WINAPI TpReleaseWait(TP_WAIT *)
Definition: threadpool.c:3041
struct _TP_WAIT TP_WAIT
Definition: winnt_old.h:4645

◆ RtlQueueWorkItem()

NTSTATUS WINAPI RtlQueueWorkItem ( PRTL_WORK_ITEM_ROUTINE  function,
PVOID  context,
ULONG  flags 
)

Definition at line 511 of file threadpool.c.

512{
513 TP_CALLBACK_ENVIRON environment;
514 struct rtl_work_item *item;
516
517 TRACE( "%p %p %lu\n", function, context, flags );
518
519 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
520 if (!item)
521 return STATUS_NO_MEMORY;
522
523 memset( &environment, 0, sizeof(environment) );
524 environment.Version = 1;
525 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
526 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
527
528 item->function = function;
529 item->context = context;
530
533 return status;
534}
NTSYSAPI NTSTATUS WINAPI TpSimpleTryPost(PTP_SIMPLE_CALLBACK, PVOID, TP_CALLBACK_ENVIRON *)
Definition: threadpool.c:3243
#define memset(x, y, z)
Definition: compat.h:39
static void CALLBACK process_rtl_work_item(TP_CALLBACK_INSTANCE *instance, void *userdata)
Definition: threadpool.c:479
union _TP_CALLBACK_ENVIRON_V3::@4519 u
struct _TP_CALLBACK_ENVIRON_V3::@4519::@4520 s
Definition: http.c:7252
PVOID context
Definition: threadpool.c:76
PRTL_WORK_ITEM_ROUTINE function
Definition: threadpool.c:75

◆ RtlRegisterWait()

NTSTATUS WINAPI RtlRegisterWait ( HANDLE * out,
HANDLE  handle,
RTL_WAITORTIMERCALLBACKFUNC  callback,
void * context,
ULONG  milliseconds,
ULONG  flags 
)

Definition at line 3412 of file threadpool.c.

3414{
3415 struct threadpool_object *object;
3416 TP_CALLBACK_ENVIRON environment;
3419 TP_WAIT *wait;
3420
3421 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %lu, flags %lx\n",
3422 out, handle, callback, context, milliseconds, flags );
3423
3424 memset( &environment, 0, sizeof(environment) );
3425 environment.Version = 1;
3426 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
3427 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
3428
3430 if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags )))
3431 return status;
3432
3433 object = impl_from_TP_WAIT(wait);
3434 object->u.wait.rtl_callback = callback;
3435
3437 TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) );
3438
3439 *out = object;
3441
3442 return STATUS_SUCCESS;
3443}
static PLARGE_INTEGER get_nt_timeout(PLARGE_INTEGER pTime, ULONG timeout)
Definition: threadpool.c:627
static struct @5307 waitqueue
static NTSTATUS tp_alloc_wait(TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment, DWORD flags)
Definition: threadpool.c:2638
static void CALLBACK rtl_wait_callback(TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result)
Definition: threadpool.c:3381
ULONGLONG timeout
Definition: threadpool.c:222
wchar_t tm const _CrtWcstime_Writes_and_advances_ptr_ count wchar_t ** out
Definition: wcsftime.cpp:383
#define WT_EXECUTEONLYONCE
Definition: winnt_old.h:1088
#define WT_EXECUTEINWAITTHREAD
Definition: winnt_old.h:1087

◆ RtlSetIoCompletionCallback()

NTSTATUS WINAPI RtlSetIoCompletionCallback ( HANDLE  FileHandle,
PRTL_OVERLAPPED_COMPLETION_ROUTINE  Function,
ULONG  Flags 
)

Definition at line 589 of file threadpool.c.

590{
593
594 if (Flags) FIXME("Unknown value Flags=0x%lx\n", Flags);
595
596 if (!old_threadpool.compl_port)
597 {
599
600 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
601 if (!old_threadpool.compl_port)
602 {
603 HANDLE cport;
604
606 if (!res)
607 {
608 /* FIXME native can start additional threads in case of e.g. hung callback function. */
610 if (!res)
611 old_threadpool.compl_port = cport;
612 else
613 NtClose( cport );
614 }
615 }
616 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
617 if (res) return res;
618 }
619
620 info.CompletionPort = old_threadpool.compl_port;
621 info.CompletionKey = (ULONG_PTR)Function;
622
624}
_In_ CDROM_SCAN_FOR_SPECIAL_INFO _In_ PCDROM_SCAN_FOR_SPECIAL_HANDLER Function
Definition: cdrom.h:1156
_Must_inspect_result_ _In_opt_ PFLT_INSTANCE _Out_ PHANDLE FileHandle
Definition: fltkernel.h:1231
@ FileCompletionInformation
Definition: from_kernel.h:91
NTSTATUS NTAPI NtCreateIoCompletion(OUT PHANDLE IoCompletionHandle, IN ACCESS_MASK DesiredAccess, IN POBJECT_ATTRIBUTES ObjectAttributes, IN ULONG NumberOfConcurrentThreads)
Definition: iocomp.c:253
#define IO_COMPLETION_ALL_ACCESS
Definition: file.c:72
NTSYSAPI NTSTATUS NTAPI NtSetInformationFile(IN HANDLE hFile, OUT PIO_STATUS_BLOCK pIoStatusBlock, IN PVOID FileInformationBuffer, IN ULONG FileInformationBufferLength, IN FILE_INFORMATION_CLASS FileInfoClass)
Definition: iofunc.c:3096
static struct @5305 old_threadpool
static DWORD CALLBACK iocp_poller(LPVOID Arg)
Definition: threadpool.c:539
#define WT_EXECUTEDEFAULT
Definition: winnt_old.h:1084

◆ RtlUpdateTimer()

NTSTATUS WINAPI RtlUpdateTimer ( HANDLE  TimerQueue,
HANDLE  Timer,
DWORD  DueTime,
DWORD  Period 
)

Definition at line 1062 of file threadpool.c.

1064{
1065 struct queue_timer *t = Timer;
1066 struct timer_queue *q = t->q;
1067
1069 /* Can't change a timer if it was once-only or destroyed. */
1070 if (t->expire != EXPIRE_NEVER)
1071 {
1072 t->period = Period;
1074 }
1076
1077 return STATUS_SUCCESS;
1078}

◆ set_thread_name()

static void set_thread_name ( const WCHAR * name)
static

Definition at line 468 of file threadpool.c.

469{
470#ifndef __REACTOS__ // This is impossible on non vista+
472
473 RtlInitUnicodeString(&info.ThreadName, name);
475#endif
476}
@ ThreadNameInformation
Definition: winternl.h:2319
NTSYSAPI VOID NTAPI RtlInitUnicodeString(PUNICODE_STRING DestinationString, PCWSTR SourceString)
NTSTATUS NTAPI NtSetInformationThread(_In_ HANDLE ThreadHandle, _In_ THREADINFOCLASS ThreadInformationClass, _In_reads_bytes_(ThreadInformationLength) PVOID ThreadInformation, _In_ ULONG ThreadInformationLength)
Definition: query.c:2268
Definition: name.c:39
HANDLE WINAPI GetCurrentThread(void)
Definition: proc.c:1148

Referenced by ioqueue_thread_proc(), threadpool_worker_proc(), timer_queue_thread_proc(), timerqueue_thread_proc(), and waitqueue_thread_proc().

◆ threadpool_get_next_item()

static struct list * threadpool_get_next_item ( const struct threadpool * pool)
static

Definition at line 2292 of file threadpool.c.

2293{
2294 struct list *ptr;
2295 unsigned int i;
2296
2297 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
2298 {
2299 if ((ptr = list_head( &pool->pools[i] )))
2300 break;
2301 }
2302
2303 return ptr;
2304}
#define ARRAY_SIZE(A)
Definition: main.h:20
GLsizei GLenum const GLvoid GLsizei GLenum GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLint GLint GLint GLshort GLshort GLshort GLubyte GLubyte GLubyte GLuint GLuint GLuint GLushort GLushort GLushort GLbyte GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLfloat GLint GLint GLint GLint GLshort GLshort GLshort GLshort GLubyte GLubyte GLubyte GLubyte GLuint GLuint GLuint GLuint GLushort GLushort GLushort GLushort GLboolean const GLdouble const GLfloat const GLint const GLshort const GLbyte const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLdouble const GLfloat const GLfloat const GLint const GLint const GLshort const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort GLenum GLenum GLenum GLfloat GLenum GLint GLenum GLenum GLenum GLfloat GLenum GLenum GLint GLenum GLfloat GLenum GLint GLint GLushort GLenum GLenum GLfloat GLenum GLenum GLint GLfloat const GLubyte GLenum GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLint GLint GLsizei GLsizei GLint GLenum GLenum const GLvoid GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLenum const GLdouble GLenum GLenum const GLfloat GLenum GLenum const GLint GLsizei GLuint GLfloat GLuint GLbitfield GLfloat GLint GLuint GLboolean GLenum GLfloat GLenum GLbitfield GLenum GLfloat GLfloat GLint GLint const GLfloat GLenum GLfloat GLfloat GLint GLint GLfloat GLfloat GLint GLint const GLfloat GLint GLfloat GLfloat GLint 
GLfloat GLfloat GLint GLfloat GLfloat const GLdouble const GLfloat const GLdouble const GLfloat GLint i
Definition: glfuncs.h:248

Referenced by threadpool_worker_proc().

◆ threadpool_worker_proc()

static void CALLBACK threadpool_worker_proc ( void * param)
static

Definition at line 2471 of file threadpool.c.

2473{
2474 struct threadpool *pool = param;
2476 struct list *ptr;
2477
2478 TRACE( "starting worker thread for pool %p\n", pool );
2479 set_thread_name(L"wine_threadpool_worker");
2480
2482 for (;;)
2483 {
2484 while ((ptr = threadpool_get_next_item( pool )))
2485 {
2486 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2487 assert( object->num_pending_callbacks > 0 );
2488
2489 /* If further pending callbacks are queued, move the work item to
2490 * the end of the pool list. Otherwise remove it from the pool. */
2491 list_remove( &object->pool_entry );
2492 if (object->num_pending_callbacks > 1)
2493 tp_object_prio_queue( object );
2494
2495 tp_object_execute( object, FALSE );
2496
2497 assert(pool->num_busy_workers);
2498 pool->num_busy_workers--;
2499
2500 tp_object_release( object );
2501 }
2502
2503 /* Shutdown worker thread if requested. */
2504 if (pool->shutdown)
2505 break;
2506
2507 /* Wait for new tasks or until the timeout expires. A thread only terminates
2508 * when no new tasks are available, and the number of threads can be
2509 * decreased without violating the min_workers limit. An exception is when
2510 * min_workers == 0, then objcount is used to detect if the last thread
2511 * can be terminated. */
2512 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2513 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2514 !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2515 (!pool->min_workers && !pool->objcount)))
2516 {
2517 break;
2518 }
2519 }
2520 pool->num_workers--;
2522
2523 TRACE( "terminating worker thread for pool %p\n", pool );
2525 RtlExitUserThread( 0 );
2526#ifdef __REACTOS__
2527 return STATUS_SUCCESS;
2528#endif
2529}
GLfloat param
Definition: glext.h:5796
static struct list * threadpool_get_next_item(const struct threadpool *pool)
Definition: threadpool.c:2292
static BOOL tp_threadpool_release(struct threadpool *pool)
Definition: threadpool.c:1845
static void tp_object_execute(struct threadpool_object *object, BOOL wait_thread)
Definition: threadpool.c:2312
static void tp_object_prio_queue(struct threadpool_object *object)
Definition: threadpool.c:2107
struct list pool_entry
Definition: threadpool.c:196

Referenced by tp_new_worker_thread().

◆ timer_callback_wrapper()

static DWORD WINAPI timer_callback_wrapper ( LPVOID  p)
static

Definition at line 669 of file threadpool.c.

670{
671 struct queue_timer *t = p;
672 t->callback(t->param, TRUE);
674 return 0;
675}

Referenced by queue_timer_expire().

◆ timer_cleanup_callback()

static void timer_cleanup_callback ( struct queue_timer * t)
static

Definition at line 655 of file threadpool.c.

656{
657 struct timer_queue *q = t->q;
659
660 assert(0 < t->runcount);
661 --t->runcount;
662
663 if (t->destroy && t->runcount == 0)
665
667}

Referenced by queue_timer_expire(), and timer_callback_wrapper().

◆ timer_queue_thread_proc()

static void WINAPI timer_queue_thread_proc ( LPVOID  p)
static

Definition at line 788 of file threadpool.c.

790{
791 struct timer_queue *q = p;
792 ULONG timeout_ms;
793
794 set_thread_name(L"wine_threadpool_timer_queue");
795 timeout_ms = INFINITE;
796 for (;;)
797 {
800 BOOL done = FALSE;
801
803 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
804
805 if (status == STATUS_WAIT_0)
806 {
807 /* There are two possible ways to trigger the event. Either
808 we are quitting and the last timer got removed, or a new
809 timer got put at the head of the list so we need to adjust
810 our timeout. */
812 if (q->quit && list_empty(&q->timers))
813 done = TRUE;
815 }
816 else if (status == STATUS_TIMEOUT)
818
819 if (done)
820 break;
821
822 timeout_ms = queue_get_timeout(q);
823 }
824
825 NtClose(q->event);
827 q->magic = 0;
830#ifdef __REACTOS__
831 return STATUS_SUCCESS;
832#endif
833}
NTSYSAPI NTSTATUS NTAPI RtlDeleteCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
static ULONG queue_get_timeout(struct timer_queue *q)
Definition: threadpool.c:763
static void queue_timer_expire(struct timer_queue *q)
Definition: threadpool.c:718

Referenced by RtlCreateTimerQueue().

◆ timerqueue_thread_proc()

static void CALLBACK timerqueue_thread_proc ( void * param)
static

Definition at line 1144 of file threadpool.c.

1146{
1147 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1148 struct threadpool_object *other_timer;
1150 struct list *ptr;
1151
1152 TRACE( "starting timer queue thread\n" );
1153 set_thread_name(L"wine_threadpool_timerqueue");
1154
1156 for (;;)
1157 {
1159
1160 /* Check for expired timers. */
1161 while ((ptr = list_head( &timerqueue.pending_timers )))
1162 {
1163 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1164 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1165 assert( timer->u.timer.timer_pending );
1166 if (timer->u.timer.timeout > now.QuadPart)
1167 break;
1168
1169 /* Queue a new callback in one of the worker threads. */
1170 list_remove( &timer->u.timer.timer_entry );
1171 timer->u.timer.timer_pending = FALSE;
1173
1174 /* Insert the timer back into the queue, except it's marked for shutdown. */
1175 if (timer->u.timer.period && !timer->shutdown)
1176 {
1177 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1178 if (timer->u.timer.timeout <= now.QuadPart)
1179 timer->u.timer.timeout = now.QuadPart + 1;
1180
1181 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1182 struct threadpool_object, u.timer.timer_entry )
1183 {
1184 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1185 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1186 break;
1187 }
1188 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1189 timer->u.timer.timer_pending = TRUE;
1190 }
1191 }
1192
1193 timeout_lower = timeout_upper = MAXLONGLONG;
1194
1195 /* Determine next timeout and use the window length to optimize wakeup times. */
1196 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1197 struct threadpool_object, u.timer.timer_entry )
1198 {
1199 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1200 if (other_timer->u.timer.timeout >= timeout_upper)
1201 break;
1202
1203 timeout_lower = other_timer->u.timer.timeout;
1204 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1205 if (new_timeout < timeout_upper)
1206 timeout_upper = new_timeout;
1207 }
1208
1209 /* Wait for timer update events or until the next timer expires. */
1210 if (timerqueue.objcount)
1211 {
1212 timeout.QuadPart = timeout_lower;
1214 continue;
1215 }
1216
1217 /* All timers have been destroyed, if no new timers are created
1218 * within some amount of time, then we can shutdown this thread. */
1219 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1220 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1221 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1222 {
1223 break;
1224 }
1225 }
1226
1227 timerqueue.thread_running = FALSE;
1229
1230 TRACE( "terminating timer queue thread\n" );
1231 RtlExitUserThread( 0 );
1232#ifdef __REACTOS__
1233 return STATUS_SUCCESS;
1234#endif
1235}
GLsizei GLenum const GLvoid GLsizei GLenum GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLint GLint GLint GLshort GLshort GLshort GLubyte GLubyte GLubyte GLuint GLuint GLuint GLushort GLushort GLushort GLbyte GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLfloat GLint GLint GLint GLint GLshort GLshort GLshort GLshort GLubyte GLubyte GLubyte GLubyte GLuint GLuint GLuint GLuint GLushort GLushort GLushort GLushort GLboolean const GLdouble const GLfloat const GLint const GLshort const GLbyte const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLdouble const GLfloat const GLfloat const GLint const GLint const GLshort const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort GLenum GLenum GLenum GLfloat GLenum GLint GLenum GLenum GLenum GLfloat GLenum GLenum GLint GLenum GLfloat GLenum GLint GLint GLushort GLenum GLenum GLfloat GLenum GLenum GLint GLfloat const GLubyte GLenum GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLint GLint GLsizei GLsizei GLint GLenum GLenum const GLvoid GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLenum const GLdouble GLenum GLenum const GLfloat GLenum GLenum const GLint GLsizei GLuint GLfloat GLuint GLbitfield GLfloat GLint GLuint GLboolean GLenum GLfloat GLenum GLbitfield GLenum GLfloat GLfloat GLint GLint const GLfloat GLenum GLfloat GLfloat GLint GLint GLfloat GLfloat GLint GLint const GLfloat GLint GLfloat GLfloat GLint 
GLfloat GLfloat GLint GLfloat GLfloat const GLdouble * u
Definition: glfuncs.h:240
if(dx< 0)
Definition: linetemp.h:194
#define MAXLONGLONG
NTSTATUS NTAPI NtQuerySystemTime(OUT PLARGE_INTEGER SystemTime)
Definition: time.c:569
#define LIST_FOR_EACH_ENTRY(elem, list, type, field)
Definition: list.h:198
static struct @5306 timerqueue
union threadpool_object::@5309 u
enum threadpool_objtype type
Definition: threadpool.c:183

Referenced by tp_timerqueue_lock().

◆ tp_alloc_wait()

static NTSTATUS tp_alloc_wait ( TP_WAIT **  out,
PTP_WAIT_CALLBACK  callback,
PVOID  userdata,
TP_CALLBACK_ENVIRON * environment,
DWORD  flags 
)
static

Definition at line 2638 of file threadpool.c.

2640{
2641 struct threadpool_object *object;
2642 struct threadpool *pool;
2644
2645 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2646 if (!object)
2647 return STATUS_NO_MEMORY;
2648
2649 status = tp_threadpool_lock( &pool, environment );
2650 if (status)
2651 {
2652 RtlFreeHeap( GetProcessHeap(), 0, object );
2653 return status;
2654 }
2655
2656 object->type = TP_OBJECT_TYPE_WAIT;
2657 object->u.wait.callback = callback;
2658 object->u.wait.flags = flags;
2659
2660 status = tp_waitqueue_lock( object );
2661 if (status)
2662 {
2664 RtlFreeHeap( GetProcessHeap(), 0, object );
2665 return status;
2666 }
2667
2668 tp_object_initialize( object, pool, userdata, environment );
2669
2670 *out = (TP_WAIT *)object;
2671 return STATUS_SUCCESS;
2672}
static void tp_object_initialize(struct threadpool_object *object, struct threadpool *pool, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
Definition: threadpool.c:2026
static NTSTATUS tp_waitqueue_lock(struct threadpool_object *wait)
Definition: threadpool.c:1515
static void tp_threadpool_unlock(struct threadpool *pool)
Definition: threadpool.c:1947
static NTSTATUS tp_threadpool_lock(struct threadpool **out, TP_CALLBACK_ENVIRON *environment)
Definition: threadpool.c:1874

Referenced by RtlRegisterWait(), and TpAllocWait().

◆ tp_group_alloc()

static NTSTATUS tp_group_alloc ( struct threadpool_group **  out)
static

Definition at line 1960 of file threadpool.c.

1961{
1962 struct threadpool_group *group;
1963
1964 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1965 if (!group)
1966 return STATUS_NO_MEMORY;
1967
1968 group->refcount = 1;
1969 group->shutdown = FALSE;
1970
1971#ifdef __REACTOS__
1973#else
1975
1976 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1977#endif
1978
1979 list_init( &group->members );
1980
1981 TRACE( "allocated group %p\n", group );
1982
1983 *out = group;
1984 return STATUS_SUCCESS;
1985}
NTSYSAPI NTSTATUS WINAPI RtlInitializeCriticalSectionEx(RTL_CRITICAL_SECTION *, ULONG, ULONG)
#define DWORD_PTR
Definition: treelist.c:76
#define RTL_CRITICAL_SECTION_FLAG_FORCE_DEBUG_INFO
Definition: winnt_old.h:1138

Referenced by TpAllocCleanupGroup().

◆ tp_group_release()

static BOOL tp_group_release ( struct threadpool_group * group)
static

Definition at line 2002 of file threadpool.c.

2003{
2004 if (InterlockedDecrement( &group->refcount ))
2005 return FALSE;
2006
2007 TRACE( "destroying group %p\n", group );
2008
2009 assert( group->shutdown );
2010 assert( list_empty( &group->members ) );
2011
2012#ifndef __REACTOS__
2013 group->cs.DebugInfo->Spare[0] = 0;
2014#endif
2016
2018 return TRUE;
2019}
#define InterlockedDecrement
Definition: armddk.h:52

Referenced by tp_object_release(), and TpReleaseCleanupGroup().

◆ tp_group_shutdown()

static void tp_group_shutdown ( struct threadpool_group * group)
static

Definition at line 1992 of file threadpool.c.

1993{
1994 group->shutdown = TRUE;
1995}

Referenced by TpReleaseCleanupGroup().

◆ tp_ioqueue_lock()

static NTSTATUS tp_ioqueue_lock ( struct threadpool_object * io,
HANDLE  file 
)
static

Definition at line 1724 of file threadpool.c.

1725{
1727
1728 assert( io->type == TP_OBJECT_TYPE_IO );
1729
1731
1732 if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
1734 {
1736 return status;
1737 }
1738
1739 if (!ioqueue.thread_running)
1740 {
1741 HANDLE thread;
1742
1744 0, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL )))
1745 {
1746 ioqueue.thread_running = TRUE;
1747 NtClose( thread );
1748 }
1749 }
1750
1751 if (status == STATUS_SUCCESS)
1752 {
1755
1756#ifdef __REACTOS__
1757 info.Port = ioqueue.port;
1758 info.Key = io;
1759#else
1760 info.CompletionPort = ioqueue.port;
1761 info.CompletionKey = (ULONG_PTR)io;
1762#endif
1763
1765 }
1766
1767 if (status == STATUS_SUCCESS)
1768 {
1769 if (!ioqueue.objcount++)
1770 RtlWakeConditionVariable( &ioqueue.update_event );
1771 }
1772
1774 return status;
1775}
NTSYSAPI VOID NTAPI RtlWakeConditionVariable(_Inout_ PRTL_CONDITION_VARIABLE ConditionVariable)
static void CALLBACK ioqueue_thread_proc(void *param)
Definition: threadpool.c:1615
Definition: fci.c:127

Referenced by TpAllocIoCompletion().

◆ tp_ioqueue_unlock()

static void tp_ioqueue_unlock ( struct threadpool_object * io)
static

Definition at line 2218 of file threadpool.c.

2219{
2220 assert( io->type == TP_OBJECT_TYPE_IO );
2221
2223
2224 assert(ioqueue.objcount);
2225
2226 if (!io->shutdown && !--ioqueue.objcount)
2227 NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 );
2228
2230}
NTSTATUS NTAPI NtSetIoCompletion(IN HANDLE IoCompletionPortHandle, IN PVOID CompletionKey, IN PVOID CompletionContext, IN NTSTATUS CompletionStatus, IN ULONG CompletionInformation)
Definition: iocomp.c:569

Referenced by tp_object_prepare_shutdown().

◆ tp_new_worker_thread()

static NTSTATUS tp_new_worker_thread ( struct threadpool * pool)
static

Definition at line 1242 of file threadpool.c.

1243{
1244 HANDLE thread;
1246
1248 pool->stack_info.StackReserve, pool->stack_info.StackCommit,
1250 if (status == STATUS_SUCCESS)
1251 {
1252 InterlockedIncrement( &pool->refcount );
1253 pool->num_workers++;
1254 NtClose( thread );
1255 }
1256 return status;
1257}
#define InterlockedIncrement
Definition: armddk.h:53
static void CALLBACK threadpool_worker_proc(void *param)
Definition: threadpool.c:2471

Referenced by tp_object_submit(), tp_threadpool_lock(), TpCallbackMayRunLong(), and TpSetPoolMinThreads().

◆ tp_object_cancel()

static void tp_object_cancel ( struct threadpool_object * object)
static

Definition at line 2158 of file threadpool.c.

2159{
2160 struct threadpool *pool = object->pool;
2161 LONG pending_callbacks = 0;
2162
2164 if (object->num_pending_callbacks)
2165 {
2166 pending_callbacks = object->num_pending_callbacks;
2167 object->num_pending_callbacks = 0;
2168 list_remove( &object->pool_entry );
2169
2170 if (object->type == TP_OBJECT_TYPE_WAIT)
2171 object->u.wait.signaled = 0;
2172 }
2173 if (object->type == TP_OBJECT_TYPE_IO)
2174 {
2175 object->u.io.skipped_count += object->u.io.pending_count;
2176 object->u.io.pending_count = 0;
2177 }
2179
2180 while (pending_callbacks--)
2181 tp_object_release( object );
2182}
long LONG
Definition: pedump.c:60

Referenced by TpReleaseCleanupGroupMembers(), TpWaitForIoCompletion(), TpWaitForTimer(), TpWaitForWait(), and TpWaitForWork().

◆ tp_object_execute()

static void tp_object_execute ( struct threadpool_object * object,
BOOL  wait_thread 
)
static

Definition at line 2312 of file threadpool.c.

2313{
2314 TP_CALLBACK_INSTANCE *callback_instance;
2317 struct threadpool *pool = object->pool;
2318 TP_WAIT_RESULT wait_result = 0;
2320
2321 object->num_pending_callbacks--;
2322
2323 /* For wait objects check if they were signaled or have timed out. */
2324 if (object->type == TP_OBJECT_TYPE_WAIT)
2325 {
2326 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2327 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2328 }
2329 else if (object->type == TP_OBJECT_TYPE_IO)
2330 {
2331 assert( object->u.io.completion_count );
2332 completion = object->u.io.completions[--object->u.io.completion_count];
2333 }
2334
2335 /* Leave critical section and do the actual callback. */
2336 object->num_associated_callbacks++;
2337 object->num_running_callbacks++;
2339 if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
2340
2341 /* Initialize threadpool instance struct. */
2342 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2343 instance.object = object;
2344 instance.threadid = GetCurrentThreadId();
2345 instance.associated = TRUE;
2346 instance.may_run_long = object->may_run_long;
2347 instance.cleanup.critical_section = NULL;
2348 instance.cleanup.mutex = NULL;
2349 instance.cleanup.semaphore = NULL;
2350 instance.cleanup.semaphore_count = 0;
2351 instance.cleanup.event = NULL;
2352 instance.cleanup.library = NULL;
2353
2354 switch (object->type)
2355 {
2357 {
2358 TRACE( "executing simple callback %p(%p, %p)\n",
2359 object->u.simple.callback, callback_instance, object->userdata );
2360 object->u.simple.callback( callback_instance, object->userdata );
2361 TRACE( "callback %p returned\n", object->u.simple.callback );
2362 break;
2363 }
2364
2366 {
2367 TRACE( "executing work callback %p(%p, %p, %p)\n",
2368 object->u.work.callback, callback_instance, object->userdata, object );
2369 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2370 TRACE( "callback %p returned\n", object->u.work.callback );
2371 break;
2372 }
2373
2375 {
2376 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2377 object->u.timer.callback, callback_instance, object->userdata, object );
2378 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2379 TRACE( "callback %p returned\n", object->u.timer.callback );
2380 break;
2381 }
2382
2384 {
2385 TRACE( "executing wait callback %p(%p, %p, %p, %lu)\n",
2386 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2387 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2388 TRACE( "callback %p returned\n", object->u.wait.callback );
2389 break;
2390 }
2391
2392 case TP_OBJECT_TYPE_IO:
2393 {
2394 TRACE( "executing I/O callback %p(%p, %p, %#Ix, %p, %p)\n",
2395 object->u.io.callback, callback_instance, object->userdata,
2396 completion.cvalue, &completion.iosb, (TP_IO *)object );
2397 object->u.io.callback( callback_instance, object->userdata,
2398 (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
2399 TRACE( "callback %p returned\n", object->u.io.callback );
2400 break;
2401 }
2402
2403 default:
2404 assert(0);
2405 break;
2406 }
2407
2408 /* Execute finalization callback. */
2409 if (object->finalization_callback)
2410 {
2411 TRACE( "executing finalization callback %p(%p, %p)\n",
2412 object->finalization_callback, callback_instance, object->userdata );
2413 object->finalization_callback( callback_instance, object->userdata );
2414 TRACE( "callback %p returned\n", object->finalization_callback );
2415 }
2416
2417 /* Execute cleanup tasks. */
2418 if (instance.cleanup.critical_section)
2419 {
2420 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2421 }
2422 if (instance.cleanup.mutex)
2423 {
2424 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2425 if (status != STATUS_SUCCESS) goto skip_cleanup;
2426 }
2427 if (instance.cleanup.semaphore)
2428 {
2429 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2430 if (status != STATUS_SUCCESS) goto skip_cleanup;
2431 }
2432 if (instance.cleanup.event)
2433 {
2434 status = NtSetEvent( instance.cleanup.event, NULL );
2435 if (status != STATUS_SUCCESS) goto skip_cleanup;
2436 }
2437 if (instance.cleanup.library)
2438 {
2439 LdrUnloadDll( instance.cleanup.library );
2440 }
2441
2442skip_cleanup:
2443 if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs );
2445
2446 /* Simple callbacks are automatically shutdown after execution. */
2447 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2448 {
2450 object->shutdown = TRUE;
2451 }
2452
2453 object->num_running_callbacks--;
2454 if (object_is_finished( object, TRUE ))
2455 RtlWakeAllConditionVariable( &object->group_finished_event );
2456
2457 if (instance.associated)
2458 {
2459 object->num_associated_callbacks--;
2460 if (object_is_finished( object, FALSE ))
2461 RtlWakeAllConditionVariable( &object->finished_event );
2462 }
2463}
#define WAIT_TIMEOUT
Definition: dderror.h:14
NTSTATUS NTAPI LdrUnloadDll(_In_ PVOID BaseAddress)
Definition: ldrapi.c:1291
NTSTATUS NTAPI NtReleaseMutant(IN HANDLE MutantHandle, IN PLONG PreviousCount OPTIONAL)
Definition: mutant.c:296
NTSYSAPI VOID NTAPI RtlWakeAllConditionVariable(_Inout_ PRTL_CONDITION_VARIABLE ConditionVariable)
static BOOL object_is_finished(struct threadpool_object *object, BOOL group)
Definition: threadpool.c:2184
static void tp_object_prepare_shutdown(struct threadpool_object *object)
Definition: threadpool.c:2237
NTSTATUS NTAPI NtReleaseSemaphore(IN HANDLE SemaphoreHandle, IN LONG ReleaseCount, OUT PLONG PreviousCount OPTIONAL)
Definition: sem.c:295
DWORD WINAPI GetCurrentThreadId(void)
Definition: thread.c:459
#define WAIT_OBJECT_0
Definition: winbase.h:383
DWORD TP_WAIT_RESULT
Definition: winnt_old.h:4648
struct _TP_WORK TP_WORK
Definition: winnt_old.h:4642
struct _TP_IO TP_IO
Definition: winnt_old.h:4646
struct _TP_TIMER TP_TIMER
Definition: winnt_old.h:4644
struct _TP_CALLBACK_INSTANCE TP_CALLBACK_INSTANCE
Definition: winnt_old.h:4643

Referenced by threadpool_worker_proc(), and waitqueue_thread_proc().

◆ tp_object_initialize()

static void tp_object_initialize ( struct threadpool_object *object,
struct threadpool *pool,
PVOID  userdata,
TP_CALLBACK_ENVIRON *environment
)
static

Definition at line 2026 of file threadpool.c.

2028{
2029 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
2030
2031 object->refcount = 1;
2032 object->shutdown = FALSE;
2033
2034 object->pool = pool;
2035 object->group = NULL;
2036 object->userdata = userdata;
2037 object->group_cancel_callback = NULL;
2038 object->finalization_callback = NULL;
2039 object->may_run_long = 0;
2040 object->race_dll = NULL;
2041 object->priority = TP_CALLBACK_PRIORITY_NORMAL;
2042
2043 memset( &object->group_entry, 0, sizeof(object->group_entry) );
2044 object->is_group_member = FALSE;
2045
2046 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
2047 RtlInitializeConditionVariable( &object->finished_event );
2048 RtlInitializeConditionVariable( &object->group_finished_event );
2049 object->completed_event = NULL;
2050 object->num_pending_callbacks = 0;
2051 object->num_running_callbacks = 0;
2052 object->num_associated_callbacks = 0;
2053
2054 if (environment)
2055 {
2056 if (environment->Version != 1 && environment->Version != 3)
2057 FIXME( "unsupported environment version %lu\n", environment->Version );
2058
2059 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
2060 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
2061 object->finalization_callback = environment->FinalizationCallback;
2062 object->may_run_long = environment->u.s.LongFunction != 0;
2063 object->race_dll = environment->RaceDll;
2064#ifndef __REACTOS__ //Windows 7 stuff
2065 if (environment->Version == 3)
2066 {
2067 TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
2068
2069 object->priority = environment_v3->CallbackPriority;
2070 assert( object->priority < ARRAY_SIZE(pool->pools) );
2071 }
2072#endif
2073 if (environment->ActivationContext)
2074 FIXME( "activation context not supported yet\n" );
2075
2076 if (environment->u.s.Persistent)
2077 FIXME( "persistent threads not supported yet\n" );
2078 }
2079
2080 if (object->race_dll)
2081 LdrAddRefDll( 0, object->race_dll );
2082
2083 TRACE( "allocated object %p of type %u\n", object, object->type );
2084
2085 /* For simple callbacks we have to run tp_object_submit before adding this object
2086 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
2087 * will be set, and tp_object_submit would fail with an assertion. */
2088
2089 if (is_simple_callback)
2090 tp_object_submit( object, FALSE );
2091
2092 if (object->group)
2093 {
2094 struct threadpool_group *group = object->group;
2095 InterlockedIncrement( &group->refcount );
2096
2098 list_add_tail( &group->members, &object->group_entry );
2099 object->is_group_member = TRUE;
2101 }
2102
2103 if (is_simple_callback)
2104 tp_object_release( object );
2105}
static void list_add_tail(struct list_entry *head, struct list_entry *entry)
Definition: list.h:83
NTSTATUS NTAPI LdrAddRefDll(_In_ ULONG Flags, _In_ PVOID BaseAddress)
Definition: ldrapi.c:1205
NTSYSAPI VOID NTAPI RtlInitializeConditionVariable(_Out_ PRTL_CONDITION_VARIABLE ConditionVariable)
static struct threadpool_group * impl_from_TP_CLEANUP_GROUP(TP_CLEANUP_GROUP *group)
Definition: threadpool.c:420
PTP_CLEANUP_GROUP CleanupGroup
Definition: winnt_old.h:4690
PTP_SIMPLE_CALLBACK FinalizationCallback
Definition: winnt_old.h:4694
TP_CALLBACK_PRIORITY CallbackPriority
Definition: winnt_old.h:4703
PTP_CLEANUP_GROUP_CANCEL_CALLBACK CleanupGroupCancelCallback
Definition: winnt_old.h:4691
struct _ACTIVATION_CONTEXT * ActivationContext
Definition: winnt_old.h:4693
@ TP_CALLBACK_PRIORITY_NORMAL
Definition: winnt_old.h:4653

Referenced by tp_alloc_wait(), TpAllocIoCompletion(), TpAllocTimer(), TpAllocWork(), and TpSimpleTryPost().

◆ tp_object_prepare_shutdown()

static void tp_object_prepare_shutdown ( struct threadpool_object *object)
static

Definition at line 2237 of file threadpool.c.

2238{ /* Dispatch on the object type to the matching queue unlock helper; any other type falls through with no call. */
2239 if (object->type == TP_OBJECT_TYPE_TIMER)
2240 tp_timerqueue_unlock( object );
2241 else if (object->type == TP_OBJECT_TYPE_WAIT)
2242 tp_waitqueue_unlock( object );
2243 else if (object->type == TP_OBJECT_TYPE_IO)
2244 tp_ioqueue_unlock( object );
2245}
static void tp_waitqueue_unlock(struct threadpool_object *wait)
Definition: threadpool.c:1593
static void tp_ioqueue_unlock(struct threadpool_object *io)
Definition: threadpool.c:2218
static void tp_timerqueue_unlock(struct threadpool_object *timer)
Definition: threadpool.c:1307

Referenced by tp_object_execute(), TpReleaseCleanupGroupMembers(), TpReleaseIoCompletion(), TpReleaseTimer(), TpReleaseWait(), and TpReleaseWork().

◆ tp_object_prio_queue()

static void tp_object_prio_queue ( struct threadpool_object *object)
static

Definition at line 2107 of file threadpool.c.

2108{ /* Queue the object on its pool; this function takes no lock itself. */
2109 ++object->pool->num_busy_workers; /* the busy count is raised at queue time, before the entry is linked */
2110 list_add_tail( &object->pool->pools[object->priority], &object->pool_entry ); /* append to the per-priority list selected by object->priority */
2111}

Referenced by threadpool_worker_proc(), and tp_object_submit().

◆ tp_object_release()

static BOOL tp_object_release ( struct threadpool_object *object)
static

Definition at line 2252 of file threadpool.c.

2253{
2254 if (InterlockedDecrement( &object->refcount ))
2255 return FALSE;
2256
2257 TRACE( "destroying object %p of type %u\n", object, object->type );
2258
2259 assert( object->shutdown );
2260 assert( !object->num_pending_callbacks );
2261 assert( !object->num_running_callbacks );
2262 assert( !object->num_associated_callbacks );
2263
2264 /* release reference to the group */
2265 if (object->group)
2266 {
2267 struct threadpool_group *group = object->group;
2268
2270 if (object->is_group_member)
2271 {
2272 list_remove( &object->group_entry );
2273 object->is_group_member = FALSE;
2274 }
2276
2278 }
2279
2281
2282 if (object->race_dll)
2283 LdrUnloadDll( object->race_dll );
2284
2285 if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE)
2286 NtSetEvent( object->completed_event, NULL );
2287
2288 RtlFreeHeap( GetProcessHeap(), 0, object );
2289 return TRUE;
2290}
static BOOL tp_group_release(struct threadpool_group *group)
Definition: threadpool.c:2002

Referenced by ioqueue_thread_proc(), threadpool_worker_proc(), tp_object_cancel(), tp_object_initialize(), TpReleaseCleanupGroupMembers(), TpReleaseIoCompletion(), TpReleaseTimer(), TpReleaseWait(), TpReleaseWork(), and waitqueue_thread_proc().

◆ tp_object_submit()

static void tp_object_submit ( struct threadpool_object *object,
BOOL  signaled 
)
static

Definition at line 2119 of file threadpool.c.

2120{
2121 struct threadpool *pool = object->pool;
2123
2124 assert( !object->shutdown );
2125 assert( !pool->shutdown );
2126
2128
2129 /* Start new worker threads if required. */
2130 if (pool->num_busy_workers >= pool->num_workers &&
2131 pool->num_workers < pool->max_workers)
2133
2134 /* Queue work item and increment refcount. */
2135 InterlockedIncrement( &object->refcount );
2136 if (!object->num_pending_callbacks++)
2137 tp_object_prio_queue( object );
2138
2139 /* Count how often the object was signaled. */
2140 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
2141 object->u.wait.signaled++;
2142
2143 /* No new thread started - wake up one existing thread. */
2144 if (status != STATUS_SUCCESS)
2145 {
2146 assert( pool->num_workers > 0 );
2147 RtlWakeConditionVariable( &pool->update_event );
2148 }
2149
2151}
static NTSTATUS tp_new_worker_thread(struct threadpool *pool)
Definition: threadpool.c:1242
#define STATUS_UNSUCCESSFUL
Definition: udferr_usr.h:132

Referenced by ioqueue_thread_proc(), timerqueue_thread_proc(), tp_object_initialize(), TpPostWork(), TpSetTimer(), and waitqueue_thread_proc().

◆ tp_object_wait()

static void tp_object_wait ( struct threadpool_object *object,
BOOL  group_wait 
)
static

Definition at line 2203 of file threadpool.c.

2204{
2205 struct threadpool *pool = object->pool;
2206
2208 while (!object_is_finished( object, group_wait ))
2209 {
2210 if (group_wait)
2211 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
2212 else
2213 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2214 }
2216}

Referenced by TpReleaseCleanupGroupMembers(), TpWaitForIoCompletion(), TpWaitForTimer(), TpWaitForWait(), and TpWaitForWork().

◆ tp_threadpool_alloc()

static NTSTATUS tp_threadpool_alloc ( struct threadpool **  out)
static

Definition at line 1782 of file threadpool.c.

1783{
1784#ifdef __REACTOS__
1785 IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->ProcessEnvironmentBlock->ImageBaseAddress );
1786#else
1788#endif
1789 struct threadpool *pool;
1790 unsigned int i;
1791
1792 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1793 if (!pool)
1794 return STATUS_NO_MEMORY;
1795
1796 pool->refcount = 1;
1797 pool->objcount = 0;
1798 pool->shutdown = FALSE;
1799
1800#ifdef __REACTOS__
1802#else
1804
1805 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1806#endif
1807
1808 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1809 list_init( &pool->pools[i] );
1810 RtlInitializeConditionVariable( &pool->update_event );
1811
1812 pool->max_workers = 500;
1813 pool->min_workers = 0;
1814 pool->num_workers = 0;
1815 pool->num_busy_workers = 0;
1816 pool->stack_info.StackReserve = nt->OptionalHeader.SizeOfStackReserve;
1817 pool->stack_info.StackCommit = nt->OptionalHeader.SizeOfStackCommit;
1818
1819 TRACE( "allocated threadpool %p\n", pool );
1820
1821 *out = pool;
1822 return STATUS_SUCCESS;
1823}
#define RtlImageNtHeader
Definition: compat.h:806
PPEB Peb
Definition: dllmain.c:27
#define NtCurrentTeb
IMAGE_NT_HEADERS nt
Definition: module.c:50
IMAGE_OPTIONAL_HEADER32 OptionalHeader
Definition: ntddk_ex.h:184
PVOID ImageBaseAddress
Definition: ntddk_ex.h:245

Referenced by tp_threadpool_lock(), and TpAllocPool().

◆ tp_threadpool_lock()

static NTSTATUS tp_threadpool_lock ( struct threadpool **  out,
TP_CALLBACK_ENVIRON *environment
)
static

Definition at line 1874 of file threadpool.c.

1875{
1876 struct threadpool *pool = NULL;
1878
1879 if (environment)
1880 {
1881#ifndef __REACTOS__ //Windows 7 stuff
1882 /* Validate environment parameters. */
1883 if (environment->Version == 3)
1884 {
1885 TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1886
1887 switch (environment3->CallbackPriority)
1888 {
1892 break;
1893 default:
1895 }
1896 }
1897#endif
1898 pool = (struct threadpool *)environment->Pool;
1899 }
1900
1901 if (!pool)
1902 {
1903 if (!default_threadpool)
1904 {
1906 if (status != STATUS_SUCCESS)
1907 return status;
1908
1910 {
1913 }
1914 }
1915
1917 }
1918
1920
1921 /* Make sure that the threadpool has at least one thread. */
1922 if (!pool->num_workers)
1924
1925 /* Keep a reference, and increment objcount to ensure that the
1926 * last thread doesn't terminate. */
1927 if (status == STATUS_SUCCESS)
1928 {
1929 InterlockedIncrement( &pool->refcount );
1930 pool->objcount++;
1931 }
1932
1934
1935 if (status != STATUS_SUCCESS)
1936 return status;
1937
1938 *out = pool;
1939 return STATUS_SUCCESS;
1940}
static struct threadpool * default_threadpool
Definition: threadpool.c:439
static NTSTATUS tp_threadpool_alloc(struct threadpool **out)
Definition: threadpool.c:1782
static void tp_threadpool_shutdown(struct threadpool *pool)
Definition: threadpool.c:1832
#define STATUS_INVALID_PARAMETER
Definition: udferr_usr.h:135
@ TP_CALLBACK_PRIORITY_HIGH
Definition: winnt_old.h:4652
@ TP_CALLBACK_PRIORITY_LOW
Definition: winnt_old.h:4654

Referenced by tp_alloc_wait(), TpAllocIoCompletion(), TpAllocTimer(), TpAllocWork(), and TpSimpleTryPost().

◆ tp_threadpool_release()

static BOOL tp_threadpool_release ( struct threadpool *pool)
static

Definition at line 1845 of file threadpool.c.

1846{
1847 unsigned int i;
1848
1849 if (InterlockedDecrement( &pool->refcount ))
1850 return FALSE;
1851
1852 TRACE( "destroying threadpool %p\n", pool );
1853
1854 assert( pool->shutdown );
1855 assert( !pool->objcount );
1856 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1857 assert( list_empty( &pool->pools[i] ) );
1858#ifndef __REACTOS__
1859 pool->cs.DebugInfo->Spare[0] = 0;
1860#endif
1862
1864 return TRUE;
1865}

Referenced by threadpool_worker_proc(), tp_threadpool_lock(), tp_threadpool_unlock(), and TpReleasePool().

◆ tp_threadpool_shutdown()

static void tp_threadpool_shutdown ( struct threadpool *pool)
static

Definition at line 1832 of file threadpool.c.

1833{
1835
1836 pool->shutdown = TRUE;
1837 RtlWakeAllConditionVariable( &pool->update_event );
1838}

Referenced by tp_threadpool_lock(), and TpReleasePool().

◆ tp_threadpool_unlock()

static void tp_threadpool_unlock ( struct threadpool *pool)
static

Definition at line 1947 of file threadpool.c.

1948{
1950 pool->objcount--;
1953}

Referenced by tp_alloc_wait(), tp_object_release(), TpAllocIoCompletion(), and TpAllocTimer().

◆ tp_timerqueue_lock()

static NTSTATUS tp_timerqueue_lock ( struct threadpool_object *timer)
static

Definition at line 1265 of file threadpool.c.

1266{
1268 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1269
1270 timer->u.timer.timer_initialized = FALSE;
1271 timer->u.timer.timer_pending = FALSE;
1272 timer->u.timer.timer_set = FALSE;
1273 timer->u.timer.timeout = 0;
1274 timer->u.timer.period = 0;
1275 timer->u.timer.window_length = 0;
1276
1278
1279 /* Make sure that the timerqueue thread is running. */
1280 if (!timerqueue.thread_running)
1281 {
1282 HANDLE thread;
1285 if (status == STATUS_SUCCESS)
1286 {
1287 timerqueue.thread_running = TRUE;
1288 NtClose( thread );
1289 }
1290 }
1291
1292 if (status == STATUS_SUCCESS)
1293 {
1294 timer->u.timer.timer_initialized = TRUE;
1295 timerqueue.objcount++;
1296 }
1297
1299 return status;
1300}
static void CALLBACK timerqueue_thread_proc(void *param)
Definition: threadpool.c:1144

Referenced by TpAllocTimer().

◆ tp_timerqueue_unlock()

static void tp_timerqueue_unlock ( struct threadpool_object *timer)
static

Definition at line 1307 of file threadpool.c.

1308{
1309 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1310
1312 if (timer->u.timer.timer_initialized)
1313 {
1314 /* If timer was pending, remove it. */
1315 if (timer->u.timer.timer_pending)
1316 {
1317 list_remove( &timer->u.timer.timer_entry );
1318 timer->u.timer.timer_pending = FALSE;
1319 }
1320
1321 /* If the last timer object was destroyed, then wake up the thread. */
1322 if (!--timerqueue.objcount)
1323 {
1324 assert( list_empty( &timerqueue.pending_timers ) );
1325 RtlWakeAllConditionVariable( &timerqueue.update_event );
1326 }
1327
1328 timer->u.timer.timer_initialized = FALSE;
1329 }
1331}

Referenced by tp_object_prepare_shutdown().

◆ tp_waitqueue_lock()

static NTSTATUS tp_waitqueue_lock ( struct threadpool_object *wait)
static

Definition at line 1515 of file threadpool.c.

1516{
1517 struct waitqueue_bucket *bucket;
1519 HANDLE thread;
1520 BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0;
1521 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1522
1523 wait->u.wait.signaled = 0;
1524 wait->u.wait.bucket = NULL;
1525 wait->u.wait.wait_pending = FALSE;
1526 wait->u.wait.timeout = 0;
1527 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1528
1530
1531 /* Try to assign to existing bucket if possible. */
1532 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1533 {
1534 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable)
1535 {
1536 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1537 wait->u.wait.bucket = bucket;
1538 bucket->objcount++;
1539
1541 goto out;
1542 }
1543 }
1544
1545 /* Create a new bucket and corresponding worker thread. */
1546 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1547 if (!bucket)
1548 {
1550 goto out;
1551 }
1552
1553 bucket->objcount = 0;
1554 bucket->alertable = alertable;
1555 list_init( &bucket->reserved );
1556 list_init( &bucket->waiting );
1557
1560 if (status)
1561 {
1562 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1563 goto out;
1564 }
1565
1568 if (status == STATUS_SUCCESS)
1569 {
1570 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1571 waitqueue.num_buckets++;
1572
1573 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1574 wait->u.wait.bucket = bucket;
1575 bucket->objcount++;
1576
1577 NtClose( thread );
1578 }
1579 else
1580 {
1581 NtClose( bucket->update_event );
1582 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1583 }
1584
1585out:
1587 return status;
1588}
ULONG(NTAPI * PTHREAD_START_ROUTINE)(PVOID Parameter)
Definition: rtltypes.h:560
#define MAXIMUM_WAITQUEUE_OBJECTS
Definition: threadpool.c:140
static void CALLBACK waitqueue_thread_proc(void *param)
Definition: threadpool.c:1339
struct list waiting
Definition: threadpool.c:351
HANDLE update_event
Definition: threadpool.c:352
struct list bucket_entry
Definition: threadpool.c:348
struct list reserved
Definition: threadpool.c:350

Referenced by tp_alloc_wait().

◆ tp_waitqueue_unlock()

static void tp_waitqueue_unlock ( struct threadpool_object *wait)
static

Definition at line 1593 of file threadpool.c.

1594{
1595 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1596
1598 if (wait->u.wait.bucket)
1599 {
1600 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1601 assert( bucket->objcount > 0 );
1602
1603 list_remove( &wait->u.wait.wait_entry );
1604 wait->u.wait.bucket = NULL;
1605 bucket->objcount--;
1606
1607 NtSetEvent( bucket->update_event, NULL );
1608 }
1610}

Referenced by tp_object_prepare_shutdown().

◆ TpAllocCleanupGroup()

NTSTATUS WINAPI TpAllocCleanupGroup ( TP_CLEANUP_GROUP **  out)

Definition at line 2534 of file threadpool.c.

2535{ /* Thin entry point: trace the argument and delegate to tp_group_alloc. */
2536 TRACE( "%p\n", out );
2537
2538 return tp_group_alloc( (struct threadpool_group **)out ); /* out is passed through with a cast, so the TP_CLEANUP_GROUP handle is the struct threadpool_group pointer itself */
2539}
static NTSTATUS tp_group_alloc(struct threadpool_group **out)
Definition: threadpool.c:1960

Referenced by CreateThreadpoolCleanupGroup(), and init_threadpool().

◆ TpAllocIoCompletion()

NTSTATUS WINAPI TpAllocIoCompletion ( TP_IO **  out,
HANDLE  file,
PTP_IO_CALLBACK  callback,
void *userdata,
TP_CALLBACK_ENVIRON *environment
)

Definition at line 2544 of file threadpool.c.

2546{
2547 struct threadpool_object *object;
2548 struct threadpool *pool;
2550
2551 TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
2552
2553 if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) )))
2554 return STATUS_NO_MEMORY;
2555
2556 if ((status = tp_threadpool_lock( &pool, environment )))
2557 {
2558 RtlFreeHeap( GetProcessHeap(), 0, object );
2559 return status;
2560 }
2561
2562 object->type = TP_OBJECT_TYPE_IO;
2563 object->u.io.callback = callback;
2564 if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) )))
2565 {
2567 RtlFreeHeap( GetProcessHeap(), 0, object );
2568 return status;
2569 }
2570
2571 if ((status = tp_ioqueue_lock( object, file )))
2572 {
2574 RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
2575 RtlFreeHeap( GetProcessHeap(), 0, object );
2576 return status;
2577 }
2578
2579 tp_object_initialize( object, pool, userdata, environment );
2580
2581 *out = (TP_IO *)object;
2582 return STATUS_SUCCESS;
2583}
#define HEAP_ZERO_MEMORY
Definition: compat.h:134
static NTSTATUS tp_ioqueue_lock(struct threadpool_object *io, HANDLE file)
Definition: threadpool.c:1724

Referenced by CreateThreadpoolIo(), and init_threadpool().

◆ TpAllocPool()

NTSTATUS WINAPI TpAllocPool ( TP_POOL **  out,
PVOID  reserved 
)

Definition at line 2588 of file threadpool.c.

2589{ /* Thin entry point: trace the arguments and delegate to tp_threadpool_alloc. */
2590 TRACE( "%p %p\n", out, reserved );
2591
2592 if (reserved) /* a non-NULL reserved is only reported; allocation proceeds regardless */
2593 FIXME( "reserved argument is nonzero (%p)\n", reserved );
2594
2595 return tp_threadpool_alloc( (struct threadpool **)out ); /* out is passed through with a cast, so the TP_POOL handle is the struct threadpool pointer itself */
2596}
r reserved
Definition: btrfs.c:3006

Referenced by CreateThreadpool(), and init_threadpool().

◆ TpAllocTimer()

NTSTATUS WINAPI TpAllocTimer ( TP_TIMER **  out,
PTP_TIMER_CALLBACK  callback,
PVOID  userdata,
TP_CALLBACK_ENVIRON *environment
)

Definition at line 2601 of file threadpool.c.

2603{
2604 struct threadpool_object *object;
2605 struct threadpool *pool;
2607
2608 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2609
2610 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2611 if (!object)
2612 return STATUS_NO_MEMORY;
2613
2614 status = tp_threadpool_lock( &pool, environment );
2615 if (status)
2616 {
2617 RtlFreeHeap( GetProcessHeap(), 0, object );
2618 return status;
2619 }
2620
2621 object->type = TP_OBJECT_TYPE_TIMER;
2622 object->u.timer.callback = callback;
2623
2624 status = tp_timerqueue_lock( object );
2625 if (status)
2626 {
2628 RtlFreeHeap( GetProcessHeap(), 0, object );
2629 return status;
2630 }
2631
2632 tp_object_initialize( object, pool, userdata, environment );
2633
2634 *out = (TP_TIMER *)object;
2635 return STATUS_SUCCESS;
2636}
static NTSTATUS tp_timerqueue_lock(struct threadpool_object *timer)
Definition: threadpool.c:1265

Referenced by CreateThreadpoolTimer(), and init_threadpool().

◆ TpAllocWait()

NTSTATUS WINAPI TpAllocWait ( TP_WAIT **  out,
PTP_WAIT_CALLBACK  callback,
PVOID  userdata,
TP_CALLBACK_ENVIRON *environment
)

Definition at line 2677 of file threadpool.c.

2679{ /* Thin wrapper around tp_alloc_wait. */
2680 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2681 return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE ); /* all arguments are forwarded unchanged; the flags argument is fixed to WT_EXECUTEONLYONCE */
2682}

Referenced by CreateThreadpoolWait(), and init_threadpool().

◆ TpAllocWork()

NTSTATUS WINAPI TpAllocWork ( TP_WORK **  out,
PTP_WORK_CALLBACK  callback,
PVOID  userdata,
TP_CALLBACK_ENVIRON *environment
)

Definition at line 2687 of file threadpool.c.

2689{
2690 struct threadpool_object *object;
2691 struct threadpool *pool;
2693
2694 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2695
2696 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2697 if (!object)
2698 return STATUS_NO_MEMORY;
2699
2700 status = tp_threadpool_lock( &pool, environment );
2701 if (status)
2702 {
2703 RtlFreeHeap( GetProcessHeap(), 0, object );
2704 return status;
2705 }
2706
2707 object->type = TP_OBJECT_TYPE_WORK;
2708 object->u.work.callback = callback;
2709 tp_object_initialize( object, pool, userdata, environment );
2710
2711 *out = (TP_WORK *)object;
2712 return STATUS_SUCCESS;
2713}

Referenced by CreateThreadpoolWork(), and init_threadpool().

◆ TpCallbackLeaveCriticalSectionOnCompletion()

VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion ( TP_CALLBACK_INSTANCE *instance,
CRITICAL_SECTION *crit
)

Definition at line 2740 of file threadpool.c.

2741{
2743
2744 TRACE( "%p %p\n", instance, crit );
2745
2746 if (!this->cleanup.critical_section)
2747 this->cleanup.critical_section = crit;
2748}
static void cleanup(void)
Definition: main.c:1335
static struct threadpool_instance * impl_from_TP_CALLBACK_INSTANCE(TP_CALLBACK_INSTANCE *instance)
Definition: threadpool.c:425

◆ TpCallbackMayRunLong()

NTSTATUS WINAPI TpCallbackMayRunLong ( TP_CALLBACK_INSTANCE *instance)

Definition at line 2753 of file threadpool.c.

2754{
2756 struct threadpool_object *object = this->object;
2757 struct threadpool *pool;
2759
2760 TRACE( "%p\n", instance );
2761
2762 if (this->threadid != GetCurrentThreadId())
2763 {
2764 ERR("called from wrong thread, ignoring\n");
2765 return STATUS_UNSUCCESSFUL; /* FIXME */
2766 }
2767
2768 if (this->may_run_long)
2769 return STATUS_SUCCESS;
2770
2771 pool = object->pool;
2773
2774 /* Start new worker threads if required. */
2775 if (pool->num_busy_workers >= pool->num_workers)
2776 {
2777 if (pool->num_workers < pool->max_workers)
2778 {
2780 }
2781 else
2782 {
2784 }
2785 }
2786
2788 this->may_run_long = TRUE;
2789 return status;
2790}
#define STATUS_TOO_MANY_THREADS
Definition: ntstatus.h:627

Referenced by CallbackMayRunLong(), and init_threadpool().

◆ TpCallbackReleaseMutexOnCompletion()

VOID WINAPI TpCallbackReleaseMutexOnCompletion ( TP_CALLBACK_INSTANCE *instance,
HANDLE  mutex 
)

Definition at line 2795 of file threadpool.c.

2796{
2798
2799 TRACE( "%p %p\n", instance, mutex );
2800
2801 if (!this->cleanup.mutex)
2802 this->cleanup.mutex = mutex;
2803}
Definition: module.h:456

◆ TpCallbackReleaseSemaphoreOnCompletion()

VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion ( TP_CALLBACK_INSTANCE *instance,
HANDLE  semaphore,
DWORD  count 
)

Definition at line 2808 of file threadpool.c.

2809{
2811
2812 TRACE( "%p %p %lu\n", instance, semaphore, count );
2813
2814 if (!this->cleanup.semaphore)
2815 {
2816 this->cleanup.semaphore = semaphore;
2817 this->cleanup.semaphore_count = count;
2818 }
2819}
HANDLE semaphore
Definition: threadpool.c:1889

Referenced by init_threadpool().

◆ TpCallbackSetEventOnCompletion()

VOID WINAPI TpCallbackSetEventOnCompletion ( TP_CALLBACK_INSTANCE *instance,
HANDLE  event 
)

Definition at line 2824 of file threadpool.c.

2825{
2827
2828 TRACE( "%p %p\n", instance, event );
2829
2830 if (!this->cleanup.event)
2831 this->cleanup.event = event;
2832}

◆ TpCallbackUnloadDllOnCompletion()

VOID WINAPI TpCallbackUnloadDllOnCompletion ( TP_CALLBACK_INSTANCE *instance,
HMODULE  module 
)

Definition at line 2837 of file threadpool.c.

2838{
2840
2841 TRACE( "%p %p\n", instance, module );
2842
2843 if (!this->cleanup.library)
2844 this->cleanup.library = module;
2845}

◆ TpCancelAsyncIoOperation()

void WINAPI TpCancelAsyncIoOperation ( TP_IO *io)

Definition at line 2718 of file threadpool.c.

2719{
2720 struct threadpool_object *this = impl_from_TP_IO( io );
2721
2722 TRACE( "%p\n", io );
2723
2724 RtlEnterCriticalSection( &this->pool->cs );
2725
2726 TRACE("pending_count %u.\n", this->u.io.pending_count);
2727
2728 this->u.io.pending_count--;
2729 if (object_is_finished( this, TRUE ))
2731 if (object_is_finished( this, FALSE ))
2733
2734 RtlLeaveCriticalSection( &this->pool->cs );
2735}
static struct threadpool_object * impl_from_TP_IO(TP_IO *io)
Definition: threadpool.c:413
RTL_CONDITION_VARIABLE finished_event
Definition: threadpool.c:197
RTL_CONDITION_VARIABLE group_finished_event
Definition: threadpool.c:198

Referenced by init_threadpool().

◆ TpDisassociateCallback()

VOID WINAPI TpDisassociateCallback ( TP_CALLBACK_INSTANCE *instance)

Definition at line 2850 of file threadpool.c.

2851{
2853 struct threadpool_object *object = this->object;
2854 struct threadpool *pool;
2855
2856 TRACE( "%p\n", instance );
2857
2858 if (this->threadid != GetCurrentThreadId())
2859 {
2860 ERR("called from wrong thread, ignoring\n");
2861 return;
2862 }
2863
2864 if (!this->associated)
2865 return;
2866
2867 pool = object->pool;
2869
2870 object->num_associated_callbacks--;
2871 if (object_is_finished( object, FALSE ))
2872 RtlWakeAllConditionVariable( &object->finished_event );
2873
2875 this->associated = FALSE;
2876}

Referenced by init_threadpool().

◆ TpIsTimerSet()

BOOL WINAPI TpIsTimerSet ( TP_TIMER *timer)

Definition at line 2881 of file threadpool.c.

2882{ /* Report the timer_set flag of the timer object behind the TP_TIMER handle. */
2883 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2884
2885 TRACE( "%p\n", timer );
2886
2887 return this->u.timer.timer_set; /* returned as stored; this function takes no lock around the read */
2888}
static struct threadpool_object * impl_from_TP_TIMER(TP_TIMER *timer)
Definition: threadpool.c:399

Referenced by init_threadpool().

◆ TpPostWork()

VOID WINAPI TpPostWork ( TP_WORK *work)

Definition at line 2893 of file threadpool.c.

2894{ /* Submit the work object behind the TP_WORK handle via tp_object_submit. */
2895 struct threadpool_object *this = impl_from_TP_WORK( work );
2896
2897 TRACE( "%p\n", work );
2898
2899 tp_object_submit( this, FALSE ); /* signaled is FALSE; tp_object_submit only counts signals for TP_OBJECT_TYPE_WAIT objects */
2900}
static struct threadpool_object * impl_from_TP_WORK(TP_WORK *work)
Definition: threadpool.c:392

Referenced by init_threadpool().

◆ TpQueryPoolStackInformation()

NTSTATUS WINAPI TpQueryPoolStackInformation ( TP_POOL *pool,
TP_POOL_STACK_INFORMATION *stack_info
)

Definition at line 3364 of file threadpool.c.

3365{
3366 struct threadpool *this = impl_from_TP_POOL( pool );
3367
3368 TRACE( "%p %p\n", pool, stack_info );
3369
3370 if (!stack_info)
3372
3373 RtlEnterCriticalSection( &this->cs );
3374 *stack_info = this->stack_info;
3375 RtlLeaveCriticalSection( &this->cs );
3376
3377 return STATUS_SUCCESS;
3378}
CRITICAL_SECTION cs
Definition: threadpool.c:285
static struct threadpool * impl_from_TP_POOL(TP_POOL *pool)
Definition: threadpool.c:387
TP_POOL_STACK_INFORMATION stack_info
Definition: threadpool.c:158

Referenced by init_threadpool(), and QueryThreadpoolStackInformation().

◆ TpReleaseCleanupGroup()

VOID WINAPI TpReleaseCleanupGroup ( TP_CLEANUP_GROUP *group)

Definition at line 2905 of file threadpool.c.

2906{
2908
2909 TRACE( "%p\n", group );
2910
2911 tp_group_shutdown( this );
2912 tp_group_release( this );
2913}
static void tp_group_shutdown(struct threadpool_group *group)
Definition: threadpool.c:1992

Referenced by init_threadpool().

◆ TpReleaseCleanupGroupMembers()

VOID WINAPI TpReleaseCleanupGroupMembers ( TP_CLEANUP_GROUP *group,
BOOL  cancel_pending,
PVOID  userdata 
)

Definition at line 2918 of file threadpool.c.

2919{
2921 struct threadpool_object *object, *next;
2922 struct list members;
2923
2924 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2925
2926 RtlEnterCriticalSection( &this->cs );
2927
2928 /* Unset group, increase references, and mark objects for shutdown */
2929 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2930 {
2931 assert( object->group == this );
2932 assert( object->is_group_member );
2933
2934 if (InterlockedIncrement( &object->refcount ) == 1)
2935 {
2936 /* Object is basically already destroyed, but group reference
2937 * was not deleted yet. We can safely ignore this object. */
2938 InterlockedDecrement( &object->refcount );
2939 list_remove( &object->group_entry );
2940 object->is_group_member = FALSE;
2941 continue;
2942 }
2943
2944 object->is_group_member = FALSE;
2946 }
2947
2948 /* Move members to a new temporary list */
2949 list_init( &members );
2950 list_move_tail( &members, &this->members );
2951
2952 RtlLeaveCriticalSection( &this->cs );
2953
2954 /* Cancel pending callbacks if requested */
2955 if (cancel_pending)
2956 {
2957 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2958 {
2959 tp_object_cancel( object );
2960 }
2961 }
2962
2963 /* Wait for remaining callbacks to finish */
2964 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2965 {
2966 tp_object_wait( object, TRUE );
2967
2968 if (!object->shutdown)
2969 {
2970 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2971 if (cancel_pending && object->group_cancel_callback)
2972 {
2973 TRACE( "executing group cancel callback %p(%p, %p)\n",
2974 object->group_cancel_callback, object->userdata, userdata );
2975 object->group_cancel_callback( object->userdata, userdata );
2976 TRACE( "callback %p returned\n", object->group_cancel_callback );
2977 }
2978
2979 if (object->type != TP_OBJECT_TYPE_SIMPLE)
2980 tp_object_release( object );
2981 }
2982
2983 object->shutdown = TRUE;
2984 tp_object_release( object );
2985 }
2986}
static void list_move_tail(struct list_head *list, struct list_head *head)
Definition: list.h:122
static void tp_object_wait(struct threadpool_object *object, BOOL group_wait)
Definition: threadpool.c:2203
static void tp_object_cancel(struct threadpool_object *object)
Definition: threadpool.c:2158

Referenced by init_threadpool().

◆ TpReleaseIoCompletion()

void WINAPI TpReleaseIoCompletion ( TP_IO *io )

Definition at line 2991 of file threadpool.c.

/* Flags the I/O object as shutting down and, when no operations are counted
 * as pending or skipped, marks it shut down and calls tp_object_release().
 * The counters are sampled while holding the owning pool's critical section. */
2992{
2993 struct threadpool_object *this = impl_from_TP_IO( io );
2994 BOOL can_destroy;
2995
2996 TRACE( "%p\n", io );
2997
2998 RtlEnterCriticalSection( &this->pool->cs );
2999 this->u.io.shutting_down = TRUE;
3000 can_destroy = !this->u.io.pending_count && !this->u.io.skipped_count;
3001 RtlLeaveCriticalSection( &this->pool->cs );
3002
3003 if (can_destroy)
3004 {
/* NOTE(review): listing line 3005 is absent from this extract - confirm against threadpool.c. */
3006 this->shutdown = TRUE;
3007 tp_object_release( this );
3008 }
3009}
INT WSAAPI shutdown(IN SOCKET s, IN INT how)
Definition: sockctrl.c:506

Referenced by init_threadpool().

◆ TpReleasePool()

VOID WINAPI TpReleasePool ( TP_POOL *pool )

Definition at line 3014 of file threadpool.c.

/* Resolves the pool implementation, then calls tp_threadpool_shutdown()
 * followed by tp_threadpool_release() on it. */
3015{
3016 struct threadpool *this = impl_from_TP_POOL( pool );
3017
3018 TRACE( "%p\n", pool );
3019
3020 tp_threadpool_shutdown( this );
3021 tp_threadpool_release( this );
3022}

Referenced by init_threadpool().

◆ TpReleaseTimer()

VOID WINAPI TpReleaseTimer ( TP_TIMER *timer )

Definition at line 3027 of file threadpool.c.

/* Marks the timer object as shut down and calls tp_object_release() on it. */
3028{
3029 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3030
3031 TRACE( "%p\n", timer );
3032
/* NOTE(review): listing line 3033 is absent from this extract - confirm against threadpool.c. */
3034 this->shutdown = TRUE;
3035 tp_object_release( this );
3036}

Referenced by init_threadpool().

◆ TpReleaseWait()

VOID WINAPI TpReleaseWait ( TP_WAIT *wait )

Definition at line 3041 of file threadpool.c.

/* Marks the wait object as shut down and calls tp_object_release() on it. */
3042{
3043 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3044
3045 TRACE( "%p\n", wait );
3046
/* NOTE(review): listing line 3047 is absent from this extract - confirm against threadpool.c. */
3048 this->shutdown = TRUE;
3049 tp_object_release( this );
3050}

Referenced by init_threadpool(), and RtlDeregisterWaitEx().

◆ TpReleaseWork()

VOID WINAPI TpReleaseWork ( TP_WORK *work )

Definition at line 3055 of file threadpool.c.

/* Marks the work object as shut down and calls tp_object_release() on it. */
3056{
3057 struct threadpool_object *this = impl_from_TP_WORK( work );
3058
3059 TRACE( "%p\n", work );
3060
/* NOTE(review): listing line 3061 is absent from this extract - confirm against threadpool.c. */
3062 this->shutdown = TRUE;
3063 tp_object_release( this );
3064}

Referenced by init_threadpool().

◆ TpSetPoolMaxThreads()

VOID WINAPI TpSetPoolMaxThreads ( TP_POOL *pool,
DWORD  maximum 
)

Definition at line 3069 of file threadpool.c.

/* Updates the pool's worker limits under the pool critical section:
 * max_workers becomes max(maximum, 1), and min_workers is lowered if it
 * would otherwise exceed the new maximum. */
3070{
3071 struct threadpool *this = impl_from_TP_POOL( pool );
3072
3073 TRACE( "%p %lu\n", pool, maximum );
3074
3075 RtlEnterCriticalSection( &this->cs );
3076 this->max_workers = max( maximum, 1 );
3077 this->min_workers = min( this->min_workers, this->max_workers );
3078 RtlLeaveCriticalSection( &this->cs );
3079}
#define min(a, b)
Definition: monoChain.cc:55
int max_workers
Definition: threadpool.c:153
int min_workers
Definition: threadpool.c:154

Referenced by init_threadpool().

◆ TpSetPoolMinThreads()

BOOL WINAPI TpSetPoolMinThreads ( TP_POOL *pool,
DWORD  minimum 
)

Definition at line 3084 of file threadpool.c.

/* Under the pool critical section, calls tp_new_worker_thread() until
 * num_workers reaches `minimum` or a call fails. On success min_workers is
 * set to `minimum` and max_workers is raised to at least that value.
 * Returns !status, i.e. TRUE when status is STATUS_SUCCESS (zero). */
3085{
3086 struct threadpool *this = impl_from_TP_POOL( pool );
/* NOTE(review): listing line 3087 is absent from this extract; `status` has no
 * visible declaration or initial value here - confirm against threadpool.c. */
3088
3089 TRACE( "%p %lu\n", pool, minimum );
3090
3091 RtlEnterCriticalSection( &this->cs );
3092
3093 while (this->num_workers < minimum)
3094 {
3095 status = tp_new_worker_thread( this );
3096 if (status != STATUS_SUCCESS)
3097 break;
3098 }
3099
3100 if (status == STATUS_SUCCESS)
3101 {
3102 this->min_workers = minimum;
3103 this->max_workers = max( this->min_workers, this->max_workers );
3104 }
3105
3106 RtlLeaveCriticalSection( &this->cs );
3107 return !status;
3108}
int num_workers
Definition: threadpool.c:155

◆ TpSetPoolStackInformation()

NTSTATUS WINAPI TpSetPoolStackInformation ( TP_POOL *pool,
TP_POOL_STACK_INFORMATION *stack_info
)

Definition at line 3345 of file threadpool.c.

/* Copies *stack_info into the pool's stack_info field while holding the pool
 * critical section, then returns STATUS_SUCCESS. */
3346{
3347 struct threadpool *this = impl_from_TP_POOL( pool );
3348
3349 TRACE( "%p %p\n", pool, stack_info );
3350
3351 if (!stack_info)
/* NOTE(review): listing line 3352 (the body of the NULL check above) is absent
 * from this extract; presumably an early error return - confirm against
 * threadpool.c. */
3353
3354 RtlEnterCriticalSection( &this->cs );
3355 this->stack_info = *stack_info;
3356 RtlLeaveCriticalSection( &this->cs );
3357
3358 return STATUS_SUCCESS;
3359}

Referenced by init_threadpool(), and SetThreadpoolStackInformation().

◆ TpSetTimer()

VOID WINAPI TpSetTimer ( TP_TIMER *timer,
LARGE_INTEGER *timeout,
LONG  period,
LONG  window_length 
)

Definition at line 3113 of file threadpool.c.

/* (Re)arms or disarms a timer object.
 *
 * timeout == NULL clears timer_set and only removes a pending entry.
 * A negative QuadPart is treated as relative and turned into
 * now.QuadPart - timestamp; a zero QuadPart requests immediate submission
 * (and, with a non-zero period, re-queues the timer one period from now,
 * period * 10000 being added to now.QuadPart).
 * The timer is inserted into timerqueue.pending_timers in ascending timeout
 * order, and the condition variable is signalled when it became the head.
 *
 * NOTE(review): listing lines 3118, 3122, 3134-3135, 3144-3145 and 3182 are
 * absent from this extract, so `timestamp` and `now` have no visible
 * declaration and no locking is visible around the queue manipulation -
 * confirm against threadpool.c. */
3114{
3115 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3116 struct threadpool_object *other_timer;
3117 BOOL submit_timer = FALSE;
3119
3120 TRACE( "%p %p %lu %lu\n", timer, timeout, period, window_length );
3121
3123
3124 assert( this->u.timer.timer_initialized );
3125 this->u.timer.timer_set = timeout != NULL;
3126
3127 /* Convert relative timeout to absolute timestamp and handle a timeout
3128 * of zero, which means that the timer is submitted immediately. */
3129 if (timeout)
3130 {
3131 timestamp = timeout->QuadPart;
3132 if ((LONGLONG)timestamp < 0)
3133 {
3136 timestamp = now.QuadPart - timestamp;
3137 }
3138 else if (!timestamp)
3139 {
/* Zero timeout with no period: nothing to queue, only submit once below. */
3140 if (!period)
3141 timeout = NULL;
3142 else
3143 {
3146 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
3147 }
3148 submit_timer = TRUE;
3149 }
3150 }
3151
3152 /* First remove existing timeout. */
3153 if (this->u.timer.timer_pending)
3154 {
3155 list_remove( &this->u.timer.timer_entry );
3156 this->u.timer.timer_pending = FALSE;
3157 }
3158
3159 /* If the timer was enabled, then add it back to the queue. */
3160 if (timeout)
3161 {
3162 this->u.timer.timeout = timestamp;
3163 this->u.timer.period = period;
3164 this->u.timer.window_length = window_length;
3165
/* Find the first queued timer that expires later than this one; the new
 * entry is inserted in front of wherever the loop stops. */
3166 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
3167 struct threadpool_object, u.timer.timer_entry )
3168 {
3169 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
3170 if (this->u.timer.timeout < other_timer->u.timer.timeout)
3171 break;
3172 }
3173 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
3174
3175 /* Wake up the timer thread when the timeout has to be updated. */
3176 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
3177 RtlWakeAllConditionVariable( &timerqueue.update_event );
3178
3179 this->u.timer.timer_pending = TRUE;
3180 }
3181
3183
3184 if (submit_timer)
3185 tp_object_submit( this, FALSE );
3186}
int64_t LONGLONG
Definition: typedefs.h:68

Referenced by init_threadpool().

◆ TpSetWait()

VOID WINAPI TpSetWait ( TP_WAIT *wait,
HANDLE  handle,
LARGE_INTEGER *timeout
)

Definition at line 3191 of file threadpool.c.

/* Associates `handle` with a wait object and moves it between the lists of
 * its bucket.
 *
 * With a non-NULL handle the object is appended to bucket->waiting and marked
 * wait_pending; with a NULL handle a previously pending object is appended to
 * bucket->reserved and wait_pending is cleared. In both cases the bucket's
 * update_event is set afterwards. A negative timeout->QuadPart is treated as
 * relative and turned into now.QuadPart - timestamp.
 *
 * NOTE(review): listing lines 3194, 3198, 3214-3215 and 3237 are absent from
 * this extract, so `timestamp` and `now` have no visible declaration and no
 * locking is visible - confirm against threadpool.c. */
3192{
3193 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3195
3196 TRACE( "%p %p %p\n", wait, handle, timeout );
3197
3199
3200 assert( this->u.wait.bucket );
3201 this->u.wait.handle = handle;
3202
3203 if (handle || this->u.wait.wait_pending)
3204 {
3205 struct waitqueue_bucket *bucket = this->u.wait.bucket;
3206 list_remove( &this->u.wait.wait_entry );
3207
3208 /* Convert relative timeout to absolute timestamp. */
3209 if (handle && timeout)
3210 {
3211 timestamp = timeout->QuadPart;
3212 if ((LONGLONG)timestamp < 0)
3213 {
3216 timestamp = now.QuadPart - timestamp;
3217 }
3218 }
3219
3220 /* Add wait object back into one of the queues. */
3221 if (handle)
3222 {
3223 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
3224 this->u.wait.wait_pending = TRUE;
3225 this->u.wait.timeout = timestamp;
3226 }
3227 else
3228 {
3229 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
3230 this->u.wait.wait_pending = FALSE;
3231 }
3232
3233 /* Wake up the wait queue thread. */
3234 NtSetEvent( bucket->update_event, NULL );
3235 }
3236
3238}

Referenced by init_threadpool(), RtlDeregisterWaitEx(), and RtlRegisterWait().

◆ TpSimpleTryPost()

NTSTATUS WINAPI TpSimpleTryPost ( PTP_SIMPLE_CALLBACK  callback,
PVOID  userdata,
TP_CALLBACK_ENVIRON *environment
)

Definition at line 3243 of file threadpool.c.

/* Allocates a threadpool object from the process heap, obtains a pool via
 * tp_threadpool_lock(), fills in the object as TP_OBJECT_TYPE_SIMPLE with the
 * given callback and hands it to tp_object_initialize().
 *
 * Returns STATUS_NO_MEMORY when the allocation fails, the non-zero status of
 * tp_threadpool_lock() when that fails (the object is freed first), and
 * STATUS_SUCCESS otherwise.
 *
 * NOTE(review): listing line 3248 is absent from this extract; `status` has no
 * visible declaration - confirm against threadpool.c. */
3245{
3246 struct threadpool_object *object;
3247 struct threadpool *pool;
3249
3250 TRACE( "%p %p %p\n", callback, userdata, environment );
3251
3252 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
3253 if (!object)
3254 return STATUS_NO_MEMORY;
3255
3256 status = tp_threadpool_lock( &pool, environment );
3257 if (status)
3258 {
3259 RtlFreeHeap( GetProcessHeap(), 0, object );
3260 return status;
3261 }
3262
3263 object->type = TP_OBJECT_TYPE_SIMPLE;
3264 object->u.simple.callback = callback;
3265 tp_object_initialize( object, pool, userdata, environment );
3266
3267 return STATUS_SUCCESS;
3268}

Referenced by init_threadpool(), RtlQueueWorkItem(), and TrySubmitThreadpoolCallback().

◆ TpStartAsyncIoOperation()

void WINAPI TpStartAsyncIoOperation ( TP_IO *io )

Definition at line 3273 of file threadpool.c.

/* Increments the I/O object's pending_count while holding the owning pool's
 * critical section. */
3274{
3275 struct threadpool_object *this = impl_from_TP_IO( io );
3276
3277 TRACE( "%p\n", io );
3278
3279 RtlEnterCriticalSection( &this->pool->cs );
3280
3281 this->u.io.pending_count++;
3282
3283 RtlLeaveCriticalSection( &this->pool->cs );
3284}

Referenced by init_threadpool().

◆ TpWaitForIoCompletion()

void WINAPI TpWaitForIoCompletion ( TP_IO *io,
BOOL  cancel_pending 
)

Definition at line 3289 of file threadpool.c.

/* Calls tp_object_cancel() first when cancel_pending is set, then
 * tp_object_wait() with group_wait == FALSE. */
3290{
3291 struct threadpool_object *this = impl_from_TP_IO( io );
3292
3293 TRACE( "%p %d\n", io, cancel_pending );
3294
3295 if (cancel_pending)
3296 tp_object_cancel( this );
3297 tp_object_wait( this, FALSE );
3298}

Referenced by init_threadpool().

◆ TpWaitForTimer()

VOID WINAPI TpWaitForTimer ( TP_TIMER *timer,
BOOL  cancel_pending 
)

Definition at line 3303 of file threadpool.c.

/* Calls tp_object_cancel() first when cancel_pending is set, then
 * tp_object_wait() with group_wait == FALSE. */
3304{
3305 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3306
3307 TRACE( "%p %d\n", timer, cancel_pending );
3308
3309 if (cancel_pending)
3310 tp_object_cancel( this );
3311 tp_object_wait( this, FALSE );
3312}

Referenced by init_threadpool().

◆ TpWaitForWait()

VOID WINAPI TpWaitForWait ( TP_WAIT *wait,
BOOL  cancel_pending 
)

Definition at line 3317 of file threadpool.c.

/* Calls tp_object_cancel() first when cancel_pending is set, then
 * tp_object_wait() with group_wait == FALSE. */
3318{
3319 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3320
3321 TRACE( "%p %d\n", wait, cancel_pending );
3322
3323 if (cancel_pending)
3324 tp_object_cancel( this );
3325 tp_object_wait( this, FALSE );
3326}

Referenced by init_threadpool(), and RtlDeregisterWaitEx().

◆ TpWaitForWork()

VOID WINAPI TpWaitForWork ( TP_WORK *work,
BOOL  cancel_pending 
)

Definition at line 3331 of file threadpool.c.

/* Calls tp_object_cancel() first when cancel_pending is set, then
 * tp_object_wait() with group_wait == FALSE. */
3332{
3333 struct threadpool_object *this = impl_from_TP_WORK( work );
3334
3335 TRACE( "%p %u\n", work, cancel_pending );
3336
3337 if (cancel_pending)
3338 tp_object_cancel( this );
3339 tp_object_wait( this, FALSE );
3340}

Referenced by init_threadpool().

◆ waitqueue_thread_proc()

static void CALLBACK waitqueue_thread_proc ( void *param )
static

Definition at line 1339 of file threadpool.c.

/* Thread procedure serving one wait-queue bucket; `param` is the bucket.
 *
 * Each loop iteration:
 *  - walks the wait objects, dispatching those whose timeout is at or before
 *    `now` and collecting the handles of the others (plus the earliest
 *    timeout) for the next wait;
 *  - if the bucket holds no objects, waits on update_event only, with a
 *    relative timeout of THREADPOOL_WORKER_TIMEOUT * -10000, and leaves the
 *    loop when that wait times out with the bucket still empty;
 *  - otherwise waits on the collected handles plus update_event and
 *    dispatches the object whose handle was signaled;
 *  - finally tries to merge a lightly used bucket into another one.
 * After the loop the bucket is unlinked, its event closed, its memory freed
 * and the thread exits.
 *
 * NOTE(review): listing lines 1340, 1342-1343, 1346, 1348, 1353, 1357, 1361,
 * 1378, 1380, 1402, 1405, 1413, 1415, 1419, 1435, 1449 and 1499 are absent
 * from this extract. As a result `now`, `timeout`, `status`, `handles` and
 * `objects` have no visible declaration, the head of the first list loop and
 * the lookup of `wait` for a signaled handle are not shown, and no locking is
 * visible - confirm against threadpool.c before relying on this listing. */
1341{
1344 struct waitqueue_bucket *bucket = param;
1345 struct threadpool_object *wait, *next;
1347 DWORD num_handles;
1349
1350 TRACE( "starting wait queue thread\n" );
1351 set_thread_name(L"wine_threadpool_waitqueue");
1352
1354
1355 for (;;)
1356 {
1358 timeout.QuadPart = MAXLONGLONG;
1359 num_handles = 0;
1360
1362 u.wait.wait_entry )
1363 {
1364 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1365 if (wait->u.wait.timeout <= now.QuadPart)
1366 {
1367 /* Wait object timed out. */
1368 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1369 {
1370 list_remove( &wait->u.wait.wait_entry );
1371 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1372 }
1373 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1374 {
1375 InterlockedIncrement( &wait->refcount );
1376 wait->num_pending_callbacks++;
1377 RtlEnterCriticalSection( &wait->pool->cs );
1379 RtlLeaveCriticalSection( &wait->pool->cs );
1381 }
1382 else tp_object_submit( wait, FALSE );
1383 }
1384 else
1385 {
/* Not yet expired: track the earliest timeout and take a temporary
 * reference while the handle is in the wait array. */
1386 if (wait->u.wait.timeout < timeout.QuadPart)
1387 timeout.QuadPart = wait->u.wait.timeout;
1388
1389 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1390 InterlockedIncrement( &wait->refcount );
1391 objects[num_handles] = wait;
1392 handles[num_handles] = wait->u.wait.handle;
1393 num_handles++;
1394 }
1395 }
1396
1397 if (!bucket->objcount)
1398 {
1399 /* All wait objects have been destroyed, if no new wait objects are created
1400 * within some amount of time, then we can shutdown this thread. */
1401 assert( num_handles == 0 );
1403 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1404 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout );
1406
1407 if (status == STATUS_TIMEOUT && !bucket->objcount)
1408 break;
1409 }
1410 else
1411 {
/* update_event occupies the slot after the object handles, so a status in
 * [STATUS_WAIT_0, STATUS_WAIT_0 + num_handles) refers to a wait object. */
1412 handles[num_handles] = bucket->update_event;
1414 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout );
1416
1417 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1418 {
1420 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1421 if (wait->u.wait.bucket)
1422 {
1423 /* Wait object signaled. */
1424 assert( wait->u.wait.bucket == bucket );
1425 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1426 {
1427 list_remove( &wait->u.wait.wait_entry );
1428 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1429 }
1430 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1431 {
1432 wait->u.wait.signaled++;
1433 wait->num_pending_callbacks++;
1434 RtlEnterCriticalSection( &wait->pool->cs );
1436 RtlLeaveCriticalSection( &wait->pool->cs );
1437 }
1438 else tp_object_submit( wait, TRUE );
1439 }
1440 else
1441 WARN("wait object %p triggered while object was destroyed\n", wait);
1442 }
1443
1444 /* Release temporary references to wait objects. */
1445 while (num_handles)
1446 {
1447 wait = objects[--num_handles];
1448 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1450 }
1451 }
1452
1453 /* Try to merge bucket with other threads. */
/* Only attempted when this bucket is non-empty and at most one third full;
 * the target must share the alertable setting and the combined count must
 * stay within two thirds of MAXIMUM_WAITQUEUE_OBJECTS. */
1454 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1455 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1456 {
1457 struct waitqueue_bucket *other_bucket;
1458 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1459 {
1460 if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable &&
1461 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1462 {
1463 other_bucket->objcount += bucket->objcount;
1464 bucket->objcount = 0;
1465
1466 /* Update reserved list. */
1467 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1468 {
1469 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1470 wait->u.wait.bucket = other_bucket;
1471 }
1472 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1473
1474 /* Update waiting list. */
1475 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1476 {
1477 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1478 wait->u.wait.bucket = other_bucket;
1479 }
1480 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1481
1482 /* Move bucket to the end, to keep the probability of
1483 * newly added wait objects as small as possible. */
1484 list_remove( &bucket->bucket_entry );
1485 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1486
/* Let the receiving bucket's thread notice its new objects. */
1487 NtSetEvent( other_bucket->update_event, NULL );
1488 break;
1489 }
1490 }
1491 }
1492 }
1493
1494 /* Remove this bucket from the list. */
1495 list_remove( &bucket->bucket_entry );
1496 if (!--waitqueue.num_buckets)
1497 assert( list_empty( &waitqueue.buckets ) );
1498
1500
1501 TRACE( "terminating wait queue thread\n" );
1502
1503 assert( bucket->objcount == 0 );
1504 assert( list_empty( &bucket->reserved ) );
1505 assert( list_empty( &bucket->waiting ) );
1506 NtClose( bucket->update_event );
1507
1508 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1509 RtlExitUserThread( 0 );
1510}
#define WARN(fmt,...)
Definition: precomp.h:61
static const CLSID * objects[]
Definition: apphelp.c:112
NTSTATUS NTAPI NtWaitForMultipleObjects(IN ULONG ObjectCount, IN PHANDLE HandleArray, IN WAIT_TYPE WaitType, IN BOOLEAN Alertable, IN PLARGE_INTEGER TimeOut OPTIONAL)
Definition: obwait.c:46
struct waitqueue_bucket * bucket
Definition: threadpool.c:231
static EFI_HANDLE * handles
Definition: uefidisk.c:62

Referenced by tp_waitqueue_lock().

◆ WINE_DEFAULT_DEBUG_CHANNEL()

WINE_DEFAULT_DEBUG_CHANNEL ( threadpool  )

Variable Documentation

◆ buckets

struct list buckets

Definition at line 324 of file threadpool.c.

Referenced by add_hash(), delete_hash_entry(), hash_lookup(), and new_hash().

◆ compl_port

HANDLE compl_port

Definition at line 88 of file threadpool.c.

◆ critsect_compl_debug

static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
static
Initial value:
=
{
0, 0, &old_threadpool.threadpool_compl_cs,
0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
}
static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
Definition: threadpool.c:83

Definition at line 83 of file threadpool.c.

◆ cs

◆ default_threadpool

struct threadpool* default_threadpool = NULL
static

Definition at line 439 of file threadpool.c.

Referenced by tp_threadpool_lock(), and tp_threadpool_shutdown().

◆ 

struct { ... } ioqueue
Initial value:
=
{
.cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
}
static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
Definition: threadpool.c:358

Referenced by ioqueue_thread_proc(), tp_ioqueue_lock(), and tp_ioqueue_unlock().

◆ ioqueue_debug

static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
static
Initial value:
=
{
0, 0, &ioqueue.cs,
0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
}

Definition at line 358 of file threadpool.c.

◆ num_buckets

LONG num_buckets

Definition at line 323 of file threadpool.c.

Referenced by hash_table_hash(), and hash_table_init().

◆ objcount

LONG objcount

Definition at line 286 of file threadpool.c.

◆ 

struct { ... } old_threadpool
Initial value:
=
{
NULL,
{ &critsect_compl_debug, -1, 0, 0, 0, 0 },
}

Referenced by RtlSetIoCompletionCallback().

◆ pending_timers

struct list pending_timers

Definition at line 288 of file threadpool.c.

◆ port

HANDLE port

Definition at line 366 of file threadpool.c.

◆ thread_running

BOOL thread_running

Definition at line 287 of file threadpool.c.

◆ threadpool_compl_cs

RTL_CRITICAL_SECTION threadpool_compl_cs

Definition at line 89 of file threadpool.c.

◆ 

struct { ... } timerqueue
Initial value:
=
{
{ &timerqueue_debug, -1, 0, 0, 0, 0 },
0,
LIST_INIT( timerqueue.pending_timers ),
}
static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
Definition: threadpool.c:280
#define LIST_INIT(head)
Definition: queue.h:197
#define RTL_CONDITION_VARIABLE_INIT
Definition: rtltypes.h:282

Referenced by timerqueue_thread_proc(), tp_timerqueue_lock(), tp_timerqueue_unlock(), and TpSetTimer().

◆ timerqueue_debug

static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
static
Initial value:
=
{
0, 0, &timerqueue.cs,
0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
}

Definition at line 280 of file threadpool.c.

◆ update_event

RTL_CONDITION_VARIABLE update_event

Definition at line 289 of file threadpool.c.

◆ 

struct { ... } waitqueue
Initial value:
=
{
{ &waitqueue_debug, -1, 0, 0, 0, 0 },
0,
}
static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
Definition: threadpool.c:317

Referenced by RtlRegisterWait(), tp_object_execute(), tp_waitqueue_lock(), tp_waitqueue_unlock(), TpSetWait(), and waitqueue_thread_proc().

◆ waitqueue_debug

static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
static
Initial value:
=
{
0, 0, &waitqueue.cs,
0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
}

Definition at line 317 of file threadpool.c.