ReactOS 0.4.16-dev-1505-g12fa72a
threadpool.c File Reference
#include <assert.h>
#include <stdarg.h>
#include <limits.h>
#include "ntstatus.h"
#include "winternl.h"
#include "wine/debug.h"
#include "wine/list.h"
#include "ntdll_misc.h"
Include dependency graph for threadpool.c:

Go to the source code of this file.

Classes

struct  rtl_work_item
 
struct  queue_timer
 
struct  timer_queue
 
struct  threadpool
 
struct  io_completion
 
struct  threadpool_object
 
struct  threadpool_instance
 
struct  threadpool_group
 
struct  waitqueue_bucket
 

Macros

#define WIN32_NO_STATUS
 
#define EXPIRE_NEVER   (~(ULONGLONG)0)
 
#define TIMER_QUEUE_MAGIC   0x516d6954 /* TimQ */
 
#define THREADPOOL_WORKER_TIMEOUT   5000
 
#define MAXIMUM_WAITQUEUE_OBJECTS   (MAXIMUM_WAIT_OBJECTS - 1)
 

Enumerations

enum  threadpool_objtype {
  TP_OBJECT_TYPE_SIMPLE , TP_OBJECT_TYPE_WORK , TP_OBJECT_TYPE_TIMER , TP_OBJECT_TYPE_WAIT ,
  TP_OBJECT_TYPE_IO
}
 

Functions

 WINE_DEFAULT_DEBUG_CHANNEL (threadpool)
 
static struct threadpool * impl_from_TP_POOL (TP_POOL *pool)
 
static struct threadpool_object * impl_from_TP_WORK (TP_WORK *work)
 
static struct threadpool_object * impl_from_TP_TIMER (TP_TIMER *timer)
 
static struct threadpool_object * impl_from_TP_WAIT (TP_WAIT *wait)
 
static struct threadpool_object * impl_from_TP_IO (TP_IO *io)
 
static struct threadpool_group * impl_from_TP_CLEANUP_GROUP (TP_CLEANUP_GROUP *group)
 
static struct threadpool_instance * impl_from_TP_CALLBACK_INSTANCE (TP_CALLBACK_INSTANCE *instance)
 
static void CALLBACK threadpool_worker_proc (void *param)
 
static void tp_object_submit (struct threadpool_object *object, BOOL signaled)
 
static void tp_object_execute (struct threadpool_object *object, BOOL wait_thread)
 
static void tp_object_prepare_shutdown (struct threadpool_object *object)
 
static BOOL tp_object_release (struct threadpool_object *object)
 
static BOOL array_reserve (void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
 
static void set_thread_name (const WCHAR *name)
 
static void CALLBACK process_rtl_work_item (TP_CALLBACK_INSTANCE *instance, void *userdata)
 
NTSTATUS WINAPI RtlQueueWorkItem (PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags)
 
static DWORD CALLBACK iocp_poller (LPVOID Arg)
 
NTSTATUS WINAPI RtlSetIoCompletionCallback (HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
 
static PLARGE_INTEGER get_nt_timeout (PLARGE_INTEGER pTime, ULONG timeout)
 
static void queue_remove_timer (struct queue_timer *t)
 
static void timer_cleanup_callback (struct queue_timer *t)
 
static DWORD WINAPI timer_callback_wrapper (LPVOID p)
 
static ULONGLONG queue_current_time (void)
 
static void queue_add_timer (struct queue_timer *t, ULONGLONG time, BOOL set_event)
 
static void queue_move_timer (struct queue_timer *t, ULONGLONG time, BOOL set_event)
 
static void queue_timer_expire (struct timer_queue *q)
 
static ULONG queue_get_timeout (struct timer_queue *q)
 
static void WINAPI timer_queue_thread_proc (LPVOID p)
 
static void queue_destroy_timer (struct queue_timer *t)
 
NTSTATUS WINAPI RtlCreateTimerQueue (PHANDLE NewTimerQueue)
 
NTSTATUS WINAPI RtlDeleteTimerQueueEx (HANDLE TimerQueue, HANDLE CompletionEvent)
 
static struct timer_queue * get_timer_queue (HANDLE TimerQueue)
 
NTSTATUS WINAPI RtlCreateTimer (HANDLE TimerQueue, HANDLE *NewTimer, RTL_WAITORTIMERCALLBACKFUNC Callback, PVOID Parameter, DWORD DueTime, DWORD Period, ULONG Flags)
 
NTSTATUS WINAPI RtlUpdateTimer (HANDLE TimerQueue, HANDLE Timer, DWORD DueTime, DWORD Period)
 
NTSTATUS WINAPI RtlDeleteTimer (HANDLE TimerQueue, HANDLE Timer, HANDLE CompletionEvent)
 
static void CALLBACK timerqueue_thread_proc (void *param)
 
static NTSTATUS tp_new_worker_thread (struct threadpool *pool)
 
static NTSTATUS tp_timerqueue_lock (struct threadpool_object *timer)
 
static void tp_timerqueue_unlock (struct threadpool_object *timer)
 
static void CALLBACK waitqueue_thread_proc (void *param)
 
static NTSTATUS tp_waitqueue_lock (struct threadpool_object *wait)
 
static void tp_waitqueue_unlock (struct threadpool_object *wait)
 
static void CALLBACK ioqueue_thread_proc (void *param)
 
static NTSTATUS tp_ioqueue_lock (struct threadpool_object *io, HANDLE file)
 
static NTSTATUS tp_threadpool_alloc (struct threadpool **out)
 
static void tp_threadpool_shutdown (struct threadpool *pool)
 
static BOOL tp_threadpool_release (struct threadpool *pool)
 
static NTSTATUS tp_threadpool_lock (struct threadpool **out, TP_CALLBACK_ENVIRON *environment)
 
static void tp_threadpool_unlock (struct threadpool *pool)
 
static NTSTATUS tp_group_alloc (struct threadpool_group **out)
 
static void tp_group_shutdown (struct threadpool_group *group)
 
static BOOL tp_group_release (struct threadpool_group *group)
 
static void tp_object_initialize (struct threadpool_object *object, struct threadpool *pool, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
 
static void tp_object_prio_queue (struct threadpool_object *object)
 
static void tp_object_cancel (struct threadpool_object *object)
 
static BOOL object_is_finished (struct threadpool_object *object, BOOL group)
 
static void tp_object_wait (struct threadpool_object *object, BOOL group_wait)
 
static void tp_ioqueue_unlock (struct threadpool_object *io)
 
static struct list * threadpool_get_next_item (const struct threadpool *pool)
 
NTSTATUS WINAPI TpAllocCleanupGroup (TP_CLEANUP_GROUP **out)
 
NTSTATUS WINAPI TpAllocIoCompletion (TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback, void *userdata, TP_CALLBACK_ENVIRON *environment)
 
NTSTATUS WINAPI TpAllocPool (TP_POOL **out, PVOID reserved)
 
NTSTATUS WINAPI TpAllocTimer (TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
 
static NTSTATUS tp_alloc_wait (TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment, DWORD flags)
 
NTSTATUS WINAPI TpAllocWait (TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
 
NTSTATUS WINAPI TpAllocWork (TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
 
void WINAPI TpCancelAsyncIoOperation (TP_IO *io)
 
VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion (TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit)
 
NTSTATUS WINAPI TpCallbackMayRunLong (TP_CALLBACK_INSTANCE *instance)
 
VOID WINAPI TpCallbackReleaseMutexOnCompletion (TP_CALLBACK_INSTANCE *instance, HANDLE mutex)
 
VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion (TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count)
 
VOID WINAPI TpCallbackSetEventOnCompletion (TP_CALLBACK_INSTANCE *instance, HANDLE event)
 
VOID WINAPI TpCallbackUnloadDllOnCompletion (TP_CALLBACK_INSTANCE *instance, HMODULE module)
 
VOID WINAPI TpDisassociateCallback (TP_CALLBACK_INSTANCE *instance)
 
BOOL WINAPI TpIsTimerSet (TP_TIMER *timer)
 
VOID WINAPI TpPostWork (TP_WORK *work)
 
VOID WINAPI TpReleaseCleanupGroup (TP_CLEANUP_GROUP *group)
 
VOID WINAPI TpReleaseCleanupGroupMembers (TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata)
 
void WINAPI TpReleaseIoCompletion (TP_IO *io)
 
VOID WINAPI TpReleasePool (TP_POOL *pool)
 
VOID WINAPI TpReleaseTimer (TP_TIMER *timer)
 
VOID WINAPI TpReleaseWait (TP_WAIT *wait)
 
VOID WINAPI TpReleaseWork (TP_WORK *work)
 
VOID WINAPI TpSetPoolMaxThreads (TP_POOL *pool, DWORD maximum)
 
BOOL WINAPI TpSetPoolMinThreads (TP_POOL *pool, DWORD minimum)
 
VOID WINAPI TpSetTimer (TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length)
 
VOID WINAPI TpSetWait (TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout)
 
NTSTATUS WINAPI TpSimpleTryPost (PTP_SIMPLE_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
 
void WINAPI TpStartAsyncIoOperation (TP_IO *io)
 
void WINAPI TpWaitForIoCompletion (TP_IO *io, BOOL cancel_pending)
 
VOID WINAPI TpWaitForTimer (TP_TIMER *timer, BOOL cancel_pending)
 
VOID WINAPI TpWaitForWait (TP_WAIT *wait, BOOL cancel_pending)
 
VOID WINAPI TpWaitForWork (TP_WORK *work, BOOL cancel_pending)
 
NTSTATUS WINAPI TpSetPoolStackInformation (TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info)
 
NTSTATUS WINAPI TpQueryPoolStackInformation (TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info)
 
static void CALLBACK rtl_wait_callback (TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result)
 
NTSTATUS WINAPI RtlRegisterWait (HANDLE *out, HANDLE handle, RTL_WAITORTIMERCALLBACKFUNC callback, void *context, ULONG milliseconds, ULONG flags)
 
NTSTATUS WINAPI RtlDeregisterWaitEx (HANDLE handle, HANDLE event)
 
NTSTATUS WINAPI RtlDeregisterWait (HANDLE WaitHandle)
 

Variables

static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
 
struct {
   HANDLE   compl_port
 
   RTL_CRITICAL_SECTION   threadpool_compl_cs
 
} old_threadpool
 
static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
 
struct {
   CRITICAL_SECTION   cs
 
   LONG   objcount
 
   BOOL   thread_running
 
   struct list   pending_timers
 
   RTL_CONDITION_VARIABLE   update_event
 
} timerqueue
 
static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
 
struct {
   CRITICAL_SECTION   cs
 
   LONG   num_buckets
 
   struct list   buckets
 
} waitqueue
 
static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
 
struct {
   CRITICAL_SECTION   cs
 
   LONG   objcount
 
   BOOL   thread_running
 
   HANDLE   port
 
   RTL_CONDITION_VARIABLE   update_event
 
} ioqueue
 
static struct threadpool * default_threadpool = NULL
 

Macro Definition Documentation

◆ EXPIRE_NEVER

#define EXPIRE_NEVER   (~(ULONGLONG)0)

Definition at line 84 of file threadpool.c.

◆ MAXIMUM_WAITQUEUE_OBJECTS

#define MAXIMUM_WAITQUEUE_OBJECTS   (MAXIMUM_WAIT_OBJECTS - 1)

Definition at line 145 of file threadpool.c.

◆ THREADPOOL_WORKER_TIMEOUT

#define THREADPOOL_WORKER_TIMEOUT   5000

Definition at line 144 of file threadpool.c.

◆ TIMER_QUEUE_MAGIC

#define TIMER_QUEUE_MAGIC   0x516d6954 /* TimQ */

Definition at line 85 of file threadpool.c.

◆ WIN32_NO_STATUS

#define WIN32_NO_STATUS

Definition at line 63 of file threadpool.c.

Enumeration Type Documentation

◆ threadpool_objtype

Enumerator
TP_OBJECT_TYPE_SIMPLE 
TP_OBJECT_TYPE_WORK 
TP_OBJECT_TYPE_TIMER 
TP_OBJECT_TYPE_WAIT 
TP_OBJECT_TYPE_IO 

Definition at line 166 of file threadpool.c.

167{
173};
@ TP_OBJECT_TYPE_SIMPLE
Definition: threadpool.c:168
@ TP_OBJECT_TYPE_WORK
Definition: threadpool.c:169
@ TP_OBJECT_TYPE_TIMER
Definition: threadpool.c:170
@ TP_OBJECT_TYPE_IO
Definition: threadpool.c:172
@ TP_OBJECT_TYPE_WAIT
Definition: threadpool.c:171

Function Documentation

◆ array_reserve()

static BOOL array_reserve ( void **  elements,
unsigned int *  capacity,
unsigned int  count,
unsigned int  size 
)
static

Definition at line 446 of file threadpool.c.

447{
448 unsigned int new_capacity, max_capacity;
449 void *new_elements;
450
451 if (count <= *capacity)
452 return TRUE;
453
454 max_capacity = ~(SIZE_T)0 / size;
455 if (count > max_capacity)
456 return FALSE;
457
458 new_capacity = max(4, *capacity);
459 while (new_capacity < count && new_capacity <= max_capacity / 2)
460 new_capacity *= 2;
461 if (new_capacity < count)
462 new_capacity = max_capacity;
463
464 if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size )))
465 return FALSE;
466
467 *elements = new_elements;
468 *capacity = new_capacity;
469
470 return TRUE;
471}
#define TRUE
Definition: types.h:120
#define FALSE
Definition: types.h:117
#define GetProcessHeap()
Definition: compat.h:736
GLuint GLuint GLsizei count
Definition: gl.h:1545
GLsizeiptr size
Definition: glext.h:5919
NTSYSAPI PVOID WINAPI RtlReAllocateHeap(HANDLE, ULONG, PVOID, SIZE_T) __WINE_ALLOC_SIZE(4) __WINE_DEALLOC(RtlFreeHeap
#define max(a, b)
Definition: svc.c:63
ULONG_PTR SIZE_T
Definition: typedefs.h:80

Referenced by ioqueue_thread_proc().

◆ get_nt_timeout()

static PLARGE_INTEGER get_nt_timeout ( PLARGE_INTEGER  pTime,
ULONG  timeout 
)
inline static

Definition at line 632 of file threadpool.c.

633{
634 if (timeout == INFINITE) return NULL;
635 pTime->QuadPart = (ULONGLONG)timeout * -10000;
636 return pTime;
637}
#define NULL
Definition: types.h:112
#define INFINITE
Definition: serial.h:102
Definition: dhcpd.h:245
uint64_t ULONGLONG
Definition: typedefs.h:67
LONGLONG QuadPart
Definition: typedefs.h:114

Referenced by RtlRegisterWait(), and timer_queue_thread_proc().

◆ get_timer_queue()

static struct timer_queue * get_timer_queue ( HANDLE  TimerQueue)
static

Definition at line 959 of file threadpool.c.

960{
961 static struct timer_queue *default_timer_queue;
962
963 if (TimerQueue)
964 return TimerQueue;
965 else
966 {
967 if (!default_timer_queue)
968 {
969 HANDLE q;
971 if (status == STATUS_SUCCESS)
972 {
973 PVOID p = InterlockedCompareExchangePointer( (void **) &default_timer_queue, q, NULL );
974 if (p)
975 /* Got beat to the punch. */
977 }
978 }
979 return default_timer_queue;
980 }
981}
LONG NTSTATUS
Definition: precomp.h:26
GLdouble GLdouble GLdouble GLdouble q
Definition: gl.h:2063
GLfloat GLfloat p
Definition: glext.h:8902
#define InterlockedCompareExchangePointer
Definition: interlocked.h:144
NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
Definition: threadpool.c:869
NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
Definition: threadpool.c:915
#define STATUS_SUCCESS
Definition: shellext.h:65
Definition: ps.c:97

Referenced by RtlCreateTimer().

◆ impl_from_TP_CALLBACK_INSTANCE()

◆ impl_from_TP_CLEANUP_GROUP()

static struct threadpool_group * impl_from_TP_CLEANUP_GROUP ( TP_CLEANUP_GROUP * group)
inline static

Definition at line 425 of file threadpool.c.

426{
427 return (struct threadpool_group *)group;
428}
GLboolean GLuint group
Definition: glext.h:11120

Referenced by tp_object_initialize(), TpReleaseCleanupGroup(), and TpReleaseCleanupGroupMembers().

◆ impl_from_TP_IO()

static struct threadpool_object * impl_from_TP_IO ( TP_IO * io)
inline static

Definition at line 418 of file threadpool.c.

419{
420 struct threadpool_object *object = (struct threadpool_object *)io;
421 assert( object->type == TP_OBJECT_TYPE_IO );
422 return object;
423}
#define assert(x)
Definition: debug.h:53
static HANDLE PIO_APC_ROUTINE PVOID PIO_STATUS_BLOCK io
Definition: file.c:100

Referenced by TpCancelAsyncIoOperation(), TpReleaseIoCompletion(), TpStartAsyncIoOperation(), and TpWaitForIoCompletion().

◆ impl_from_TP_POOL()

static struct threadpool * impl_from_TP_POOL ( TP_POOL * pool)
inline static

◆ impl_from_TP_TIMER()

static struct threadpool_object * impl_from_TP_TIMER ( TP_TIMER * timer)
inline static

Definition at line 404 of file threadpool.c.

405{
406 struct threadpool_object *object = (struct threadpool_object *)timer;
408 return object;
409}
struct threadpool_object::@5262::@5265 timer

Referenced by TpIsTimerSet(), TpReleaseTimer(), TpSetTimer(), and TpWaitForTimer().

◆ impl_from_TP_WAIT()

static struct threadpool_object * impl_from_TP_WAIT ( TP_WAIT * wait)
inline static

Definition at line 411 of file threadpool.c.

412{
413 struct threadpool_object *object = (struct threadpool_object *)wait;
415 return object;
416}
struct threadpool_object::@5262::@5266 wait

Referenced by rtl_wait_callback(), RtlRegisterWait(), TpReleaseWait(), TpSetWait(), and TpWaitForWait().

◆ impl_from_TP_WORK()

static struct threadpool_object * impl_from_TP_WORK ( TP_WORK * work)
inline static

Definition at line 397 of file threadpool.c.

398{
399 struct threadpool_object *object = (struct threadpool_object *)work;
401 return object;
402}
struct threadpool_object::@5262::@5264 work

Referenced by TpPostWork(), TpReleaseWork(), and TpWaitForWork().

◆ iocp_poller()

static DWORD CALLBACK iocp_poller ( LPVOID  Arg)
static

Definition at line 544 of file threadpool.c.

545{
546 HANDLE cport = Arg;
547
548 while( TRUE )
549 {
553#ifdef __REACTOS__
555#else
557#endif
558 if (res)
559 {
560 ERR("NtRemoveIoCompletion failed: 0x%lx\n", res);
561 }
562 else
563 {
564 DWORD transferred = 0;
565 DWORD err = 0;
566
568 transferred = iosb.Information;
569 else
571
572 callback( err, transferred, overlapped );
573 }
574 }
575 return 0;
576}
#define ERR(fmt,...)
Definition: precomp.h:57
unsigned long DWORD
Definition: ntddk_ex.h:95
GLuint res
Definition: glext.h:9613
VOID(CALLBACK * PRTL_OVERLAPPED_COMPLETION_ROUTINE)(DWORD, DWORD, LPVOID)
Definition: winternl.h:3619
NTSYSAPI ULONG WINAPI RtlNtStatusToDosError(NTSTATUS)
NTSTATUS NTAPI NtRemoveIoCompletion(IN HANDLE IoCompletionHandle, OUT PVOID *KeyContext, OUT PVOID *ApcContext, OUT PIO_STATUS_BLOCK IoStatusBlock, IN PLARGE_INTEGER Timeout OPTIONAL)
Definition: iocomp.c:445
static IPrintDialogCallback callback
Definition: printdlg.c:326
static PIO_STATUS_BLOCK iosb
Definition: file.c:98
#define err(...)
namespace GUID const ADDRINFOEXW ADDRINFOEXW struct timeval OVERLAPPED * overlapped
Definition: sock.c:81
uint32_t * PULONG_PTR
Definition: typedefs.h:65

Referenced by RtlSetIoCompletionCallback().

◆ ioqueue_thread_proc()

static void CALLBACK ioqueue_thread_proc ( void * param)
static

Definition at line 1620 of file threadpool.c.

1622{
1623 struct io_completion *completion;
1624 struct threadpool_object *io;
1626#ifdef __REACTOS__
1627 PVOID key, value;
1628#else
1630#endif
1631 BOOL destroy, skip;
1633
1634 TRACE( "starting I/O completion thread\n" );
1635 set_thread_name(L"wine_threadpool_ioqueue");
1636
1638
1639 for (;;)
1640 {
1642 if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
1643 ERR("NtRemoveIoCompletion failed, status %#lx.\n", status);
1645
1646 destroy = skip = FALSE;
1647 io = (struct threadpool_object *)key;
1648
1649 TRACE( "io %p, iosb.Status %#lx.\n", io, iosb.Status );
1650
1651 if (io && (io->shutdown || io->u.io.shutting_down))
1652 {
1653 RtlEnterCriticalSection( &io->pool->cs );
1654 if (!io->u.io.pending_count)
1655 {
1656 if (io->u.io.skipped_count)
1657 --io->u.io.skipped_count;
1658
1659 if (io->u.io.skipped_count)
1660 skip = TRUE;
1661 else
1662 destroy = TRUE;
1663 }
1664 RtlLeaveCriticalSection( &io->pool->cs );
1665 if (skip) continue;
1666 }
1667
1668 if (destroy)
1669 {
1670 --ioqueue.objcount;
1671 TRACE( "Releasing io %p.\n", io );
1672 io->shutdown = TRUE;
1674 }
1675 else if (io)
1676 {
1677 RtlEnterCriticalSection( &io->pool->cs );
1678
1679 TRACE( "pending_count %u.\n", io->u.io.pending_count );
1680
1681 if (io->u.io.pending_count)
1682 {
1683 --io->u.io.pending_count;
1684 if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
1685 io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
1686 {
1687 ERR( "Failed to allocate memory.\n" );
1688 RtlLeaveCriticalSection( &io->pool->cs );
1689 continue;
1690 }
1691
1692 completion = &io->u.io.completions[io->u.io.completion_count++];
1693 completion->iosb = iosb;
1694#ifdef __REACTOS__
1695 completion->cvalue = (ULONG_PTR)value;
1696#else
1697 completion->cvalue = value;
1698#endif
1699
1701 }
1702 RtlLeaveCriticalSection( &io->pool->cs );
1703 }
1704
1705 if (!ioqueue.objcount)
1706 {
1707 /* All I/O objects have been destroyed; if no new objects are
1708 * created within some amount of time, then we can shutdown this
1709 * thread. */
1710 LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
1711 if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
1712 &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
1713 break;
1714 }
1715 }
1716
1717 ioqueue.thread_running = FALSE;
1719
1720 TRACE( "terminating I/O completion thread\n" );
1721
1722 RtlExitUserThread( 0 );
1723
1724#ifdef __REACTOS__
1725 return STATUS_SUCCESS;
1726#endif
1727}
void CALLBACK completion(DWORD dwError, DWORD cbTransferred, LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags)
Definition: WSARecv.c:16
void destroy(_Tp *__pointer)
Definition: _construct.h:278
#define skip(...)
Definition: atltest.h:64
#define STATUS_TIMEOUT
Definition: d3dkmdt.h:49
#define L(x)
Definition: resources.c:13
#define ULONG_PTR
Definition: config.h:101
unsigned int BOOL
Definition: ntddk_ex.h:94
NTSYSAPI NTSTATUS NTAPI RtlSleepConditionVariableCS(_Inout_ PRTL_CONDITION_VARIABLE ConditionVariable, _Inout_ PRTL_CRITICAL_SECTION CriticalSection, _In_opt_ PLARGE_INTEGER TimeOut)
NTSYSAPI NTSTATUS NTAPI RtlEnterCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSYSAPI VOID NTAPI RtlExitUserThread(_In_ NTSTATUS Status)
NTSYSAPI NTSTATUS NTAPI RtlLeaveCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
static BOOL tp_object_release(struct threadpool_object *object)
Definition: threadpool.c:2257
static void set_thread_name(const WCHAR *name)
Definition: threadpool.c:473
static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
Definition: threadpool.c:446
static void tp_object_submit(struct threadpool_object *object, BOOL signaled)
Definition: threadpool.c:2124
#define THREADPOOL_WORKER_TIMEOUT
Definition: threadpool.c:144
static struct @5261 ioqueue
#define TRACE(s)
Definition: solgame.cpp:4
Definition: copy.c:22
uint32_t ULONG_PTR
Definition: typedefs.h:65
Definition: pdh_main.c:96

Referenced by tp_ioqueue_lock().

◆ object_is_finished()

static BOOL object_is_finished ( struct threadpool_object * object,
BOOL  group 
)
static

Definition at line 2189 of file threadpool.c.

2190{
2191 if (object->num_pending_callbacks)
2192 return FALSE;
2193 if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
2194 return FALSE;
2195
2196 if (group)
2197 return !object->num_running_callbacks;
2198 else
2199 return !object->num_associated_callbacks;
2200}

Referenced by tp_object_execute(), tp_object_wait(), TpCancelAsyncIoOperation(), and TpDisassociateCallback().

◆ process_rtl_work_item()

static void CALLBACK process_rtl_work_item ( TP_CALLBACK_INSTANCE * instance,
void * userdata 
)
static

Definition at line 484 of file threadpool.c.

485{
486 struct rtl_work_item *item = userdata;
487
488 TRACE("executing %p(%p)\n", item->function, item->context);
489 item->function( item->context );
490
492}
BOOLEAN NTAPI RtlFreeHeap(IN PVOID HeapHandle, IN ULONG Flags, IN PVOID HeapBase)
Definition: heap.c:634

Referenced by RtlQueueWorkItem().

◆ queue_add_timer()

static void queue_add_timer ( struct queue_timer * t,
ULONGLONG  time,
BOOL  set_event 
)
static

Definition at line 689 of file threadpool.c.

691{
692 /* We MUST hold the queue cs while calling this function. */
693 struct timer_queue *q = t->q;
694 struct list *ptr = &q->timers;
695
696 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
697
698 if (time != EXPIRE_NEVER)
699 LIST_FOR_EACH(ptr, &q->timers)
700 {
701 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
702 if (time < cur->expire)
703 break;
704 }
705 list_add_before(ptr, &t->entry);
706
707 t->expire = time;
708
709 /* If we insert at the head of the list, we need to expire sooner
710 than expected. */
711 if (set_event && &t->entry == list_head(&q->timers))
712 NtSetEvent(q->event, NULL);
713}
Definition: list.h:37
static uacpi_status set_event(uacpi_u8 event, uacpi_u8 value)
Definition: event.c:84
FxCollectionEntry * cur
GLdouble GLdouble t
Definition: gl.h:2047
uint32_t entry
Definition: isohybrid.c:63
__u16 time
Definition: mkdosfs.c:8
static PVOID ptr
Definition: dispmode.c:27
NTSTATUS NTAPI NtSetEvent(IN HANDLE EventHandle, OUT PLONG PreviousState OPTIONAL)
Definition: event.c:455
#define LIST_FOR_EACH(cursor, list)
Definition: list.h:188
__WINE_SERVER_LIST_INLINE void list_add_before(struct list *elem, struct list *to_add)
Definition: list.h:87
#define EXPIRE_NEVER
Definition: threadpool.c:84
Definition: list.h:15
ULONGLONG expire
Definition: threadpool.c:125
#define LIST_ENTRY(type)
Definition: queue.h:175

Referenced by queue_move_timer(), and RtlCreateTimer().

◆ queue_current_time()

static ULONGLONG queue_current_time ( void  )
inline static

Definition at line 682 of file threadpool.c.

683{
684 LARGE_INTEGER now, freq;
686 return now.QuadPart * 1000 / freq.QuadPart;
687}
time_t now
Definition: finger.c:65
NTSTATUS NTAPI NtQueryPerformanceCounter(OUT PLARGE_INTEGER PerformanceCounter, OUT PLARGE_INTEGER PerformanceFrequency OPTIONAL)
Definition: profile.c:278

Referenced by queue_get_timeout(), queue_timer_expire(), RtlCreateTimer(), and RtlUpdateTimer().

◆ queue_destroy_timer()

static void queue_destroy_timer ( struct queue_timer * t)
static

Definition at line 842 of file threadpool.c.

843{
844 /* We MUST hold the queue cs while calling this function. */
845 t->destroy = TRUE;
846 if (t->runcount == 0)
847 /* Ensure a timer is promptly removed. If callbacks are pending,
848 it will be removed after the last one finishes by the callback
849 cleanup wrapper. */
851 else
852 /* Make sure no destroyed timer masks an active timer at the head
853 of the sorted list. */
855}
static void queue_move_timer(struct queue_timer *t, ULONGLONG time, BOOL set_event)
Definition: threadpool.c:715
static void queue_remove_timer(struct queue_timer *t)
Definition: threadpool.c:641

Referenced by RtlDeleteTimer(), and RtlDeleteTimerQueueEx().

◆ queue_get_timeout()

static ULONG queue_get_timeout ( struct timer_queue * q)
static

Definition at line 768 of file threadpool.c.

769{
770 struct queue_timer *t;
772
774 if (list_head(&q->timers))
775 {
776 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
777 assert(!t->destroy || t->expire == EXPIRE_NEVER);
778
779 if (t->expire != EXPIRE_NEVER)
780 {
782 timeout = t->expire < time ? 0 : t->expire - time;
783 }
784 }
786
787 return timeout;
788}
static ULONGLONG queue_current_time(void)
Definition: threadpool.c:682
uint32_t ULONG
Definition: typedefs.h:59

Referenced by timer_queue_thread_proc().

◆ queue_move_timer()

static void queue_move_timer ( struct queue_timer * t,
ULONGLONG  time,
BOOL  set_event 
)
inline static

Definition at line 715 of file threadpool.c.

717{
718 /* We MUST hold the queue cs while calling this function. */
719 list_remove(&t->entry);
721}
static void list_remove(struct list_entry *entry)
Definition: list.h:90
static void queue_add_timer(struct queue_timer *t, ULONGLONG time, BOOL set_event)
Definition: threadpool.c:689

Referenced by queue_destroy_timer(), queue_timer_expire(), and RtlUpdateTimer().

◆ queue_remove_timer()

static void queue_remove_timer ( struct queue_timer * t)
static

Definition at line 641 of file threadpool.c.

642{
643 /* We MUST hold the queue cs while calling this function. This ensures
644 that we cannot queue another callback for this timer. The runcount
645 being zero makes sure we don't have any already queued. */
646 struct timer_queue *q = t->q;
647
648 assert(t->runcount == 0);
649 assert(t->destroy);
650
651 list_remove(&t->entry);
652 if (t->event)
653 NtSetEvent(t->event, NULL);
655
656 if (q->quit && list_empty(&q->timers))
657 NtSetEvent(q->event, NULL);
658}
static int list_empty(struct list_entry *head)
Definition: list.h:58

Referenced by queue_destroy_timer(), and timer_cleanup_callback().

◆ queue_timer_expire()

static void queue_timer_expire ( struct timer_queue * q)
static

Definition at line 723 of file threadpool.c.

724{
725 struct queue_timer *t = NULL;
726
728 if (list_head(&q->timers))
729 {
731 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
732 if (!t->destroy && t->expire <= ((now = queue_current_time())))
733 {
734 ++t->runcount;
735 if (t->period)
736 {
737 next = t->expire + t->period;
738 /* avoid trigger cascade if overloaded / hibernated */
739 if (next < now)
740 next = now + t->period;
741 }
742 else
745 }
746 else
747 t = NULL;
748 }
750
751 if (t)
752 {
753 if (t->flags & WT_EXECUTEINTIMERTHREAD)
755 else
756 {
758 = (t->flags
762 if (status != STATUS_SUCCESS)
764 }
765 }
766}
GLbitfield flags
Definition: glext.h:7161
NTSYSAPI NTSTATUS NTAPI RtlQueueWorkItem(_In_ WORKERCALLBACKFUNC Function, _In_opt_ PVOID Context, _In_ ULONG Flags)
static unsigned __int64 next
Definition: rand_nt.c:6
static DWORD WINAPI timer_callback_wrapper(LPVOID p)
Definition: threadpool.c:674
static void timer_cleanup_callback(struct queue_timer *t)
Definition: threadpool.c:660
#define WT_TRANSFER_IMPERSONATION
Definition: winnt_old.h:1089
#define WT_EXECUTEINPERSISTENTTHREAD
Definition: winnt_old.h:1088
#define WT_EXECUTEINIOTHREAD
Definition: winnt_old.h:1081
#define WT_EXECUTELONGFUNCTION
Definition: winnt_old.h:1085
#define WT_EXECUTEINTIMERTHREAD
Definition: winnt_old.h:1086

Referenced by timer_queue_thread_proc().

◆ rtl_wait_callback()

static void CALLBACK rtl_wait_callback ( TP_CALLBACK_INSTANCE * instance,
void * userdata,
TP_WAIT * wait,
TP_WAIT_RESULT  result 
)
static

Definition at line 3386 of file threadpool.c.

3387{
3388 struct threadpool_object *object = impl_from_TP_WAIT(wait);
3389 object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 );
3390}
GLuint64EXT * result
Definition: glext.h:11304
#define STATUS_WAIT_0
Definition: ntstatus.h:237
static struct threadpool_object * impl_from_TP_WAIT(TP_WAIT *wait)
Definition: threadpool.c:411

Referenced by RtlRegisterWait().

◆ RtlCreateTimer()

NTSTATUS WINAPI RtlCreateTimer ( HANDLE  TimerQueue,
HANDLE * NewTimer,
RTL_WAITORTIMERCALLBACKFUNC  Callback,
PVOID  Parameter,
DWORD  DueTime,
DWORD  Period,
ULONG  Flags 
)

Definition at line 1008 of file threadpool.c.

1012{
1014 struct queue_timer *t;
1015 struct timer_queue *q = get_timer_queue(TimerQueue);
1016
1017 if (!q) return STATUS_NO_MEMORY;
1018 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
1019
1020 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
1021 if (!t)
1022 return STATUS_NO_MEMORY;
1023
1024 t->q = q;
1025 t->runcount = 0;
1026 t->callback = Callback;
1027 t->param = Parameter;
1028 t->period = Period;
1029 t->flags = Flags;
1030 t->destroy = FALSE;
1031 t->event = NULL;
1032
1035 if (q->quit)
1037 else
1040
1041 if (status == STATUS_SUCCESS)
1042 *NewTimer = t;
1043 else
1045
1046 return status;
1047}
PVOID NTAPI RtlAllocateHeap(IN PVOID HeapHandle, IN ULONG Flags, IN SIZE_T Size)
Definition: heap.c:616
#define STATUS_INVALID_HANDLE
Definition: d3dkmdt.h:40
#define STATUS_NO_MEMORY
Definition: d3dkmdt.h:51
#define TIMER_QUEUE_MAGIC
Definition: threadpool.c:85
static struct timer_queue * get_timer_queue(HANDLE TimerQueue)
Definition: threadpool.c:959
_In_ WDFINTERRUPT _In_ PFN_WDF_INTERRUPT_SYNCHRONIZE Callback
Definition: wdfinterrupt.h:458
_In_ WDFTIMER _In_ LONGLONG DueTime
Definition: wdftimer.h:190
_Must_inspect_result_ _In_ ULONG Flags
Definition: wsk.h:170
_In_ LARGE_INTEGER _In_ ULONG Period
Definition: kefuncs.h:1313
_Inout_opt_ PVOID Parameter
Definition: rtltypes.h:336

◆ RtlCreateTimerQueue()

NTSTATUS WINAPI RtlCreateTimerQueue ( PHANDLE  NewTimerQueue)

Definition at line 869 of file threadpool.c.

870{
872 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
873 if (!q)
874 return STATUS_NO_MEMORY;
875
877 list_init(&q->timers);
878 q->quit = FALSE;
879 q->magic = TIMER_QUEUE_MAGIC;
881 if (status != STATUS_SUCCESS)
882 {
884 return status;
885 }
887 timer_queue_thread_proc, q, &q->thread, NULL);
888 if (status != STATUS_SUCCESS)
889 {
890 NtClose(q->event);
892 return status;
893 }
894
895 *NewTimerQueue = q;
896 return STATUS_SUCCESS;
897}
static void list_init(struct list_entry *head)
Definition: list.h:51
#define GetCurrentProcess()
Definition: compat.h:759
#define EVENT_ALL_ACCESS
Definition: isotest.c:82
NTSYSAPI NTSTATUS NTAPI RtlCreateUserThread(_In_ PVOID ThreadContext, _Out_ HANDLE *OutThreadHandle, _Reserved_ PVOID Reserved1, _Reserved_ PVOID Reserved2, _Reserved_ PVOID Reserved3, _Reserved_ PVOID Reserved4, _Reserved_ PVOID Reserved5, _Reserved_ PVOID Reserved6, _Reserved_ PVOID Reserved7, _Reserved_ PVOID Reserved8)
NTSYSAPI NTSTATUS NTAPI RtlInitializeCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSTATUS NTAPI NtClose(IN HANDLE Handle)
Definition: obhandle.c:3402
@ SynchronizationEvent
NTSTATUS NTAPI NtCreateEvent(OUT PHANDLE EventHandle, IN ACCESS_MASK DesiredAccess, IN POBJECT_ATTRIBUTES ObjectAttributes OPTIONAL, IN EVENT_TYPE EventType, IN BOOLEAN InitialState)
Definition: event.c:96
static void WINAPI timer_queue_thread_proc(LPVOID p)
Definition: threadpool.c:793

Referenced by get_timer_queue().

◆ RtlDeleteTimer()

NTSTATUS WINAPI RtlDeleteTimer ( HANDLE  TimerQueue,
HANDLE  Timer,
HANDLE  CompletionEvent 
)

Definition at line 1103 of file threadpool.c.

1105{
1106 struct queue_timer *t = Timer;
1107 struct timer_queue *q;
1109 HANDLE event = NULL;
1110
1111 if (!Timer)
1113 q = t->q;
1114 if (CompletionEvent == INVALID_HANDLE_VALUE)
1115 {
1117 if (status == STATUS_SUCCESS)
1119 }
1120 else if (CompletionEvent)
1121 event = CompletionEvent;
1122
1124 t->event = event;
1125 if (t->runcount == 0 && event)
1129
1130 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1131 {
1132 if (status == STATUS_PENDING)
1133 {
1136 }
1137 NtClose(event);
1138 }
1139
1140 return status;
1141}
#define STATUS_PENDING
Definition: d3dkmdt.h:43
#define INVALID_HANDLE_VALUE
Definition: compat.h:731
struct _cl_event * event
Definition: glext.h:7739
NTSYSAPI NTSTATUS NTAPI NtWaitForSingleObject(IN HANDLE hObject, IN BOOLEAN bAlertable, IN PLARGE_INTEGER Timeout)
#define STATUS_INVALID_PARAMETER_1
Definition: ntstatus.h:475
static void queue_destroy_timer(struct queue_timer *t)
Definition: threadpool.c:842

◆ RtlDeleteTimerQueueEx()

NTSTATUS WINAPI RtlDeleteTimerQueueEx ( HANDLE  TimerQueue,
HANDLE  CompletionEvent 
)

Definition at line 915 of file threadpool.c.

916{
917 struct timer_queue *q = TimerQueue;
918 struct queue_timer *t, *temp;
921
922 if (!q || q->magic != TIMER_QUEUE_MAGIC)
924
925 thread = q->thread;
926
928 q->quit = TRUE;
929 if (list_head(&q->timers))
930 /* When the last timer is removed, it will signal the timer thread to
931 exit... */
932 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
934 else
935 /* However if we have none, we must do it ourselves. */
936 NtSetEvent(q->event, NULL);
938
939 if (CompletionEvent == INVALID_HANDLE_VALUE)
940 {
943 }
944 else
945 {
946 if (CompletionEvent)
947 {
948 FIXME("asynchronous return on completion event unimplemented\n");
950 NtSetEvent(CompletionEvent, NULL);
951 }
953 }
954
956 return status;
957}
#define FIXME(fmt,...)
Definition: precomp.h:53
static HANDLE thread
Definition: service.c:33
static calc_node_t temp
Definition: rpn_ieee.c:38
#define LIST_FOR_EACH_ENTRY_SAFE(cursor, cursor2, list, type, field)
Definition: list.h:204

Referenced by get_timer_queue().

◆ RtlDeregisterWait()

NTSTATUS WINAPI RtlDeregisterWait ( HANDLE  WaitHandle)

Definition at line 3504 of file threadpool.c.

3505{
3506 return RtlDeregisterWaitEx(WaitHandle, NULL);
3507}
NTSYSAPI NTSTATUS NTAPI RtlDeregisterWaitEx(_In_ HANDLE hWaitHandle, _In_opt_ HANDLE hCompletionEvent)

◆ RtlDeregisterWaitEx()

NTSTATUS WINAPI RtlDeregisterWaitEx ( HANDLE  handle,
HANDLE  event 
)

Definition at line 3463 of file threadpool.c.

3464{
3465 struct threadpool_object *object = handle;
3467
3468 TRACE( "handle %p, event %p\n", handle, event );
3469
3470 if (!object) return STATUS_INVALID_HANDLE;
3471
3472 TpSetWait( (TP_WAIT *)object, NULL, NULL );
3473
3474 if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE );
3475 else
3476 {
3477 assert( object->completed_event == NULL );
3478 object->completed_event = event;
3479 }
3480
3481 RtlEnterCriticalSection( &object->pool->cs );
3482 if (object->num_pending_callbacks + object->num_running_callbacks
3483 + object->num_associated_callbacks) status = STATUS_PENDING;
3484 else status = STATUS_SUCCESS;
3485 RtlLeaveCriticalSection( &object->pool->cs );
3486
3487 TpReleaseWait( (TP_WAIT *)object );
3488 return status;
3489}
NTSYSAPI void WINAPI TpWaitForWait(TP_WAIT *, BOOL)
Definition: threadpool.c:3322
NTSYSAPI void WINAPI TpSetWait(TP_WAIT *, HANDLE, LARGE_INTEGER *)
Definition: threadpool.c:3196
NTSYSAPI void WINAPI TpReleaseWait(TP_WAIT *)
Definition: threadpool.c:3046
struct _TP_WAIT TP_WAIT
Definition: winnt_old.h:4650

◆ RtlQueueWorkItem()

NTSTATUS WINAPI RtlQueueWorkItem ( PRTL_WORK_ITEM_ROUTINE  function,
PVOID  context,
ULONG  flags 
)

Definition at line 516 of file threadpool.c.

517{
518 TP_CALLBACK_ENVIRON environment;
519 struct rtl_work_item *item;
521
522 TRACE( "%p %p %lu\n", function, context, flags );
523
524 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
525 if (!item)
526 return STATUS_NO_MEMORY;
527
528 memset( &environment, 0, sizeof(environment) );
529 environment.Version = 1;
530 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
531 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
532
533 item->function = function;
534 item->context = context;
535
538 return status;
539}
NTSYSAPI NTSTATUS WINAPI TpSimpleTryPost(PTP_SIMPLE_CALLBACK, PVOID, TP_CALLBACK_ENVIRON *)
Definition: threadpool.c:3248
#define memset(x, y, z)
Definition: compat.h:39
static void CALLBACK process_rtl_work_item(TP_CALLBACK_INSTANCE *instance, void *userdata)
Definition: threadpool.c:484
struct _TP_CALLBACK_ENVIRON_V3::@4475::@4476 s
union _TP_CALLBACK_ENVIRON_V3::@4475 u
Definition: http.c:7252
PVOID context
Definition: threadpool.c:81
PRTL_WORK_ITEM_ROUTINE function
Definition: threadpool.c:80

◆ RtlRegisterWait()

NTSTATUS WINAPI RtlRegisterWait ( HANDLE *out,
HANDLE  handle,
RTL_WAITORTIMERCALLBACKFUNC  callback,
void *context,
ULONG  milliseconds,
ULONG  flags 
)

Definition at line 3417 of file threadpool.c.

3419{
3420 struct threadpool_object *object;
3421 TP_CALLBACK_ENVIRON environment;
3424 TP_WAIT *wait;
3425
3426 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %lu, flags %lx\n",
3427 out, handle, callback, context, milliseconds, flags );
3428
3429 memset( &environment, 0, sizeof(environment) );
3430 environment.Version = 1;
3431 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
3432 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
3433
3435 if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags )))
3436 return status;
3437
3438 object = impl_from_TP_WAIT(wait);
3439 object->u.wait.rtl_callback = callback;
3440
3442 TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) );
3443
3444 *out = object;
3446
3447 return STATUS_SUCCESS;
3448}
static struct @5260 waitqueue
static PLARGE_INTEGER get_nt_timeout(PLARGE_INTEGER pTime, ULONG timeout)
Definition: threadpool.c:632
static NTSTATUS tp_alloc_wait(TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment, DWORD flags)
Definition: threadpool.c:2643
static void CALLBACK rtl_wait_callback(TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result)
Definition: threadpool.c:3386
ULONGLONG timeout
Definition: threadpool.c:227
wchar_t tm const _CrtWcstime_Writes_and_advances_ptr_ count wchar_t ** out
Definition: wcsftime.cpp:383
#define WT_EXECUTEONLYONCE
Definition: winnt_old.h:1084
#define WT_EXECUTEINWAITTHREAD
Definition: winnt_old.h:1083

◆ RtlSetIoCompletionCallback()

NTSTATUS WINAPI RtlSetIoCompletionCallback ( HANDLE  FileHandle,
PRTL_OVERLAPPED_COMPLETION_ROUTINE  Function,
ULONG  Flags 
)

Definition at line 594 of file threadpool.c.

595{
598
599 if (Flags) FIXME("Unknown value Flags=0x%lx\n", Flags);
600
601 if (!old_threadpool.compl_port)
602 {
604
605 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
606 if (!old_threadpool.compl_port)
607 {
608 HANDLE cport;
609
611 if (!res)
612 {
613 /* FIXME native can start additional threads in case of e.g. hung callback function. */
615 if (!res)
616 old_threadpool.compl_port = cport;
617 else
618 NtClose( cport );
619 }
620 }
621 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
622 if (res) return res;
623 }
624
625 info.CompletionPort = old_threadpool.compl_port;
626 info.CompletionKey = (ULONG_PTR)Function;
627
629}
_In_ CDROM_SCAN_FOR_SPECIAL_INFO _In_ PCDROM_SCAN_FOR_SPECIAL_HANDLER Function
Definition: cdrom.h:1156
_Must_inspect_result_ _In_opt_ PFLT_INSTANCE _Out_ PHANDLE FileHandle
Definition: fltkernel.h:1231
@ FileCompletionInformation
Definition: from_kernel.h:91
NTSTATUS NTAPI NtCreateIoCompletion(OUT PHANDLE IoCompletionHandle, IN ACCESS_MASK DesiredAccess, IN POBJECT_ATTRIBUTES ObjectAttributes, IN ULONG NumberOfConcurrentThreads)
Definition: iocomp.c:253
#define IO_COMPLETION_ALL_ACCESS
Definition: file.c:72
NTSYSAPI NTSTATUS NTAPI NtSetInformationFile(IN HANDLE hFile, OUT PIO_STATUS_BLOCK pIoStatusBlock, IN PVOID FileInformationBuffer, IN ULONG FileInformationBufferLength, IN FILE_INFORMATION_CLASS FileInfoClass)
Definition: iofunc.c:3096
static struct @5258 old_threadpool
static DWORD CALLBACK iocp_poller(LPVOID Arg)
Definition: threadpool.c:544
#define WT_EXECUTEDEFAULT
Definition: winnt_old.h:1080

◆ RtlUpdateTimer()

NTSTATUS WINAPI RtlUpdateTimer ( HANDLE  TimerQueue,
HANDLE  Timer,
DWORD  DueTime,
DWORD  Period 
)

Definition at line 1067 of file threadpool.c.

1069{
1070 struct queue_timer *t = Timer;
1071 struct timer_queue *q = t->q;
1072
1074 /* Can't change a timer if it was once-only or destroyed. */
1075 if (t->expire != EXPIRE_NEVER)
1076 {
1077 t->period = Period;
1079 }
1081
1082 return STATUS_SUCCESS;
1083}

◆ set_thread_name()

static void set_thread_name ( const WCHAR *name)
static

Definition at line 473 of file threadpool.c.

474{
475#ifndef __REACTOS__ // This is impossible on non vista+
477
478 RtlInitUnicodeString(&info.ThreadName, name);
480#endif
481}
@ ThreadNameInformation
Definition: winternl.h:2319
NTSYSAPI VOID NTAPI RtlInitUnicodeString(PUNICODE_STRING DestinationString, PCWSTR SourceString)
NTSTATUS NTAPI NtSetInformationThread(IN HANDLE ThreadHandle, IN THREADINFOCLASS ThreadInformationClass, IN PVOID ThreadInformation, IN ULONG ThreadInformationLength)
Definition: query.c:2074
Definition: name.c:39
HANDLE WINAPI GetCurrentThread(void)
Definition: proc.c:1148

Referenced by ioqueue_thread_proc(), threadpool_worker_proc(), timer_queue_thread_proc(), timerqueue_thread_proc(), and waitqueue_thread_proc().

◆ threadpool_get_next_item()

static struct list * threadpool_get_next_item ( const struct threadpool *pool)
static

Definition at line 2297 of file threadpool.c.

2298{
2299 struct list *ptr;
2300 unsigned int i;
2301
2302 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
2303 {
2304 if ((ptr = list_head( &pool->pools[i] )))
2305 break;
2306 }
2307
2308 return ptr;
2309}
#define ARRAY_SIZE(A)
Definition: main.h:20
GLsizei GLenum const GLvoid GLsizei GLenum GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLint GLint GLint GLshort GLshort GLshort GLubyte GLubyte GLubyte GLuint GLuint GLuint GLushort GLushort GLushort GLbyte GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLfloat GLint GLint GLint GLint GLshort GLshort GLshort GLshort GLubyte GLubyte GLubyte GLubyte GLuint GLuint GLuint GLuint GLushort GLushort GLushort GLushort GLboolean const GLdouble const GLfloat const GLint const GLshort const GLbyte const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLdouble const GLfloat const GLfloat const GLint const GLint const GLshort const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort GLenum GLenum GLenum GLfloat GLenum GLint GLenum GLenum GLenum GLfloat GLenum GLenum GLint GLenum GLfloat GLenum GLint GLint GLushort GLenum GLenum GLfloat GLenum GLenum GLint GLfloat const GLubyte GLenum GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLint GLint GLsizei GLsizei GLint GLenum GLenum const GLvoid GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLenum const GLdouble GLenum GLenum const GLfloat GLenum GLenum const GLint GLsizei GLuint GLfloat GLuint GLbitfield GLfloat GLint GLuint GLboolean GLenum GLfloat GLenum GLbitfield GLenum GLfloat GLfloat GLint GLint const GLfloat GLenum GLfloat GLfloat GLint GLint GLfloat GLfloat GLint GLint const GLfloat GLint GLfloat GLfloat GLint 
GLfloat GLfloat GLint GLfloat GLfloat const GLdouble const GLfloat const GLdouble const GLfloat GLint i
Definition: glfuncs.h:248

Referenced by threadpool_worker_proc().

◆ threadpool_worker_proc()

static void CALLBACK threadpool_worker_proc ( void *param)
static

Definition at line 2476 of file threadpool.c.

2478{
2479 struct threadpool *pool = param;
2481 struct list *ptr;
2482
2483 TRACE( "starting worker thread for pool %p\n", pool );
2484 set_thread_name(L"wine_threadpool_worker");
2485
2487 for (;;)
2488 {
2489 while ((ptr = threadpool_get_next_item( pool )))
2490 {
2491 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2492 assert( object->num_pending_callbacks > 0 );
2493
2494 /* If further pending callbacks are queued, move the work item to
2495 * the end of the pool list. Otherwise remove it from the pool. */
2496 list_remove( &object->pool_entry );
2497 if (object->num_pending_callbacks > 1)
2498 tp_object_prio_queue( object );
2499
2500 tp_object_execute( object, FALSE );
2501
2502 assert(pool->num_busy_workers);
2503 pool->num_busy_workers--;
2504
2505 tp_object_release( object );
2506 }
2507
2508 /* Shutdown worker thread if requested. */
2509 if (pool->shutdown)
2510 break;
2511
2512 /* Wait for new tasks or until the timeout expires. A thread only terminates
2513 * when no new tasks are available, and the number of threads can be
2514 * decreased without violating the min_workers limit. An exception is when
2515 * min_workers == 0, then objcount is used to detect if the last thread
2516 * can be terminated. */
2517 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2518 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2519 !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2520 (!pool->min_workers && !pool->objcount)))
2521 {
2522 break;
2523 }
2524 }
2525 pool->num_workers--;
2527
2528 TRACE( "terminating worker thread for pool %p\n", pool );
2530 RtlExitUserThread( 0 );
2531#ifdef __REACTOS__
2532 return STATUS_SUCCESS;
2533#endif
2534}
GLfloat param
Definition: glext.h:5796
static struct list * threadpool_get_next_item(const struct threadpool *pool)
Definition: threadpool.c:2297
static BOOL tp_threadpool_release(struct threadpool *pool)
Definition: threadpool.c:1850
static void tp_object_execute(struct threadpool_object *object, BOOL wait_thread)
Definition: threadpool.c:2317
static void tp_object_prio_queue(struct threadpool_object *object)
Definition: threadpool.c:2112
struct list pool_entry
Definition: threadpool.c:201

Referenced by tp_new_worker_thread().

◆ timer_callback_wrapper()

static DWORD WINAPI timer_callback_wrapper ( LPVOID  p)
static

Definition at line 674 of file threadpool.c.

675{
676 struct queue_timer *t = p;
677 t->callback(t->param, TRUE);
679 return 0;
680}

Referenced by queue_timer_expire().

◆ timer_cleanup_callback()

static void timer_cleanup_callback ( struct queue_timer *t)
static

Definition at line 660 of file threadpool.c.

661{
662 struct timer_queue *q = t->q;
664
665 assert(0 < t->runcount);
666 --t->runcount;
667
668 if (t->destroy && t->runcount == 0)
670
672}

Referenced by queue_timer_expire(), and timer_callback_wrapper().

◆ timer_queue_thread_proc()

static void WINAPI timer_queue_thread_proc ( LPVOID  p)
static

Definition at line 793 of file threadpool.c.

795{
796 struct timer_queue *q = p;
797 ULONG timeout_ms;
798
799 set_thread_name(L"wine_threadpool_timer_queue");
800 timeout_ms = INFINITE;
801 for (;;)
802 {
805 BOOL done = FALSE;
806
808 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
809
810 if (status == STATUS_WAIT_0)
811 {
812 /* There are two possible ways to trigger the event. Either
813 we are quitting and the last timer got removed, or a new
814 timer got put at the head of the list so we need to adjust
815 our timeout. */
817 if (q->quit && list_empty(&q->timers))
818 done = TRUE;
820 }
821 else if (status == STATUS_TIMEOUT)
823
824 if (done)
825 break;
826
827 timeout_ms = queue_get_timeout(q);
828 }
829
830 NtClose(q->event);
832 q->magic = 0;
835#ifdef __REACTOS__
836 return STATUS_SUCCESS;
837#endif
838}
NTSYSAPI NTSTATUS NTAPI RtlDeleteCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
static ULONG queue_get_timeout(struct timer_queue *q)
Definition: threadpool.c:768
static void queue_timer_expire(struct timer_queue *q)
Definition: threadpool.c:723

Referenced by RtlCreateTimerQueue().

◆ timerqueue_thread_proc()

static void CALLBACK timerqueue_thread_proc ( void *param)
static

Definition at line 1149 of file threadpool.c.

1151{
1152 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1153 struct threadpool_object *other_timer;
1155 struct list *ptr;
1156
1157 TRACE( "starting timer queue thread\n" );
1158 set_thread_name(L"wine_threadpool_timerqueue");
1159
1161 for (;;)
1162 {
1164
1165 /* Check for expired timers. */
1166 while ((ptr = list_head( &timerqueue.pending_timers )))
1167 {
1168 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1169 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1170 assert( timer->u.timer.timer_pending );
1171 if (timer->u.timer.timeout > now.QuadPart)
1172 break;
1173
1174 /* Queue a new callback in one of the worker threads. */
1175 list_remove( &timer->u.timer.timer_entry );
1176 timer->u.timer.timer_pending = FALSE;
1178
1179 /* Insert the timer back into the queue, except it's marked for shutdown. */
1180 if (timer->u.timer.period && !timer->shutdown)
1181 {
1182 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1183 if (timer->u.timer.timeout <= now.QuadPart)
1184 timer->u.timer.timeout = now.QuadPart + 1;
1185
1186 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1187 struct threadpool_object, u.timer.timer_entry )
1188 {
1189 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1190 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1191 break;
1192 }
1193 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1194 timer->u.timer.timer_pending = TRUE;
1195 }
1196 }
1197
1198 timeout_lower = timeout_upper = MAXLONGLONG;
1199
1200 /* Determine next timeout and use the window length to optimize wakeup times. */
1201 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1202 struct threadpool_object, u.timer.timer_entry )
1203 {
1204 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1205 if (other_timer->u.timer.timeout >= timeout_upper)
1206 break;
1207
1208 timeout_lower = other_timer->u.timer.timeout;
1209 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1210 if (new_timeout < timeout_upper)
1211 timeout_upper = new_timeout;
1212 }
1213
1214 /* Wait for timer update events or until the next timer expires. */
1215 if (timerqueue.objcount)
1216 {
1217 timeout.QuadPart = timeout_lower;
1219 continue;
1220 }
1221
1222 /* All timers have been destroyed, if no new timers are created
1223 * within some amount of time, then we can shutdown this thread. */
1224 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1225 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1226 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1227 {
1228 break;
1229 }
1230 }
1231
1232 timerqueue.thread_running = FALSE;
1234
1235 TRACE( "terminating timer queue thread\n" );
1236 RtlExitUserThread( 0 );
1237#ifdef __REACTOS__
1238 return STATUS_SUCCESS;
1239#endif
1240}
GLsizei GLenum const GLvoid GLsizei GLenum GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLint GLint GLint GLshort GLshort GLshort GLubyte GLubyte GLubyte GLuint GLuint GLuint GLushort GLushort GLushort GLbyte GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLfloat GLint GLint GLint GLint GLshort GLshort GLshort GLshort GLubyte GLubyte GLubyte GLubyte GLuint GLuint GLuint GLuint GLushort GLushort GLushort GLushort GLboolean const GLdouble const GLfloat const GLint const GLshort const GLbyte const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLdouble const GLfloat const GLfloat const GLint const GLint const GLshort const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort GLenum GLenum GLenum GLfloat GLenum GLint GLenum GLenum GLenum GLfloat GLenum GLenum GLint GLenum GLfloat GLenum GLint GLint GLushort GLenum GLenum GLfloat GLenum GLenum GLint GLfloat const GLubyte GLenum GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLint GLint GLsizei GLsizei GLint GLenum GLenum const GLvoid GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLenum const GLdouble GLenum GLenum const GLfloat GLenum GLenum const GLint GLsizei GLuint GLfloat GLuint GLbitfield GLfloat GLint GLuint GLboolean GLenum GLfloat GLenum GLbitfield GLenum GLfloat GLfloat GLint GLint const GLfloat GLenum GLfloat GLfloat GLint GLint GLfloat GLfloat GLint GLint const GLfloat GLint GLfloat GLfloat GLint 
GLfloat GLfloat GLint GLfloat GLfloat const GLdouble * u
Definition: glfuncs.h:240
if(dx< 0)
Definition: linetemp.h:194
#define MAXLONGLONG
NTSTATUS NTAPI NtQuerySystemTime(OUT PLARGE_INTEGER SystemTime)
Definition: time.c:569
#define LIST_FOR_EACH_ENTRY(elem, list, type, field)
Definition: list.h:198
static struct @5259 timerqueue
enum threadpool_objtype type
Definition: threadpool.c:188
union threadpool_object::@5262 u

Referenced by tp_timerqueue_lock().

◆ tp_alloc_wait()

static NTSTATUS tp_alloc_wait ( TP_WAIT **  out,
PTP_WAIT_CALLBACK  callback,
PVOID  userdata,
TP_CALLBACK_ENVIRON *environment,
DWORD  flags 
)
static

Definition at line 2643 of file threadpool.c.

2645{
2646 struct threadpool_object *object;
2647 struct threadpool *pool;
2649
2650 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2651 if (!object)
2652 return STATUS_NO_MEMORY;
2653
2654 status = tp_threadpool_lock( &pool, environment );
2655 if (status)
2656 {
2657 RtlFreeHeap( GetProcessHeap(), 0, object );
2658 return status;
2659 }
2660
2661 object->type = TP_OBJECT_TYPE_WAIT;
2662 object->u.wait.callback = callback;
2663 object->u.wait.flags = flags;
2664
2665 status = tp_waitqueue_lock( object );
2666 if (status)
2667 {
2669 RtlFreeHeap( GetProcessHeap(), 0, object );
2670 return status;
2671 }
2672
2673 tp_object_initialize( object, pool, userdata, environment );
2674
2675 *out = (TP_WAIT *)object;
2676 return STATUS_SUCCESS;
2677}
static void tp_object_initialize(struct threadpool_object *object, struct threadpool *pool, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
Definition: threadpool.c:2031
static NTSTATUS tp_waitqueue_lock(struct threadpool_object *wait)
Definition: threadpool.c:1520
static void tp_threadpool_unlock(struct threadpool *pool)
Definition: threadpool.c:1952
static NTSTATUS tp_threadpool_lock(struct threadpool **out, TP_CALLBACK_ENVIRON *environment)
Definition: threadpool.c:1879

Referenced by RtlRegisterWait(), and TpAllocWait().

◆ tp_group_alloc()

static NTSTATUS tp_group_alloc ( struct threadpool_group **  out)
static

Definition at line 1965 of file threadpool.c.

1966{
1967 struct threadpool_group *group;
1968
1969 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1970 if (!group)
1971 return STATUS_NO_MEMORY;
1972
1973 group->refcount = 1;
1974 group->shutdown = FALSE;
1975
1976#ifdef __REACTOS__
1978#else
1980
1981 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1982#endif
1983
1984 list_init( &group->members );
1985
1986 TRACE( "allocated group %p\n", group );
1987
1988 *out = group;
1989 return STATUS_SUCCESS;
1990}
NTSYSAPI NTSTATUS WINAPI RtlInitializeCriticalSectionEx(RTL_CRITICAL_SECTION *, ULONG, ULONG)
#define DWORD_PTR
Definition: treelist.c:76
#define RTL_CRITICAL_SECTION_FLAG_FORCE_DEBUG_INFO
Definition: winnt_old.h:1134

Referenced by TpAllocCleanupGroup().

◆ tp_group_release()

static BOOL tp_group_release ( struct threadpool_group *group)
static

Definition at line 2007 of file threadpool.c.

2008{
2009 if (InterlockedDecrement( &group->refcount ))
2010 return FALSE;
2011
2012 TRACE( "destroying group %p\n", group );
2013
2014 assert( group->shutdown );
2015 assert( list_empty( &group->members ) );
2016
2017#ifndef __REACTOS__
2018 group->cs.DebugInfo->Spare[0] = 0;
2019#endif
2021
2023 return TRUE;
2024}
#define InterlockedDecrement
Definition: armddk.h:52

Referenced by tp_object_release(), and TpReleaseCleanupGroup().

◆ tp_group_shutdown()

static void tp_group_shutdown ( struct threadpool_group *group)
static

Definition at line 1997 of file threadpool.c.

1998{
1999 group->shutdown = TRUE;
2000}

Referenced by TpReleaseCleanupGroup().

◆ tp_ioqueue_lock()

static NTSTATUS tp_ioqueue_lock ( struct threadpool_object *io,
HANDLE  file 
)
static

Definition at line 1729 of file threadpool.c.

1730{
1732
1733 assert( io->type == TP_OBJECT_TYPE_IO );
1734
1736
1737 if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
1739 {
1741 return status;
1742 }
1743
1744 if (!ioqueue.thread_running)
1745 {
1746 HANDLE thread;
1747
1749 0, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL )))
1750 {
1751 ioqueue.thread_running = TRUE;
1752 NtClose( thread );
1753 }
1754 }
1755
1756 if (status == STATUS_SUCCESS)
1757 {
1760
1761#ifdef __REACTOS__
1762 info.Port = ioqueue.port;
1763 info.Key = io;
1764#else
1765 info.CompletionPort = ioqueue.port;
1766 info.CompletionKey = (ULONG_PTR)io;
1767#endif
1768
1770 }
1771
1772 if (status == STATUS_SUCCESS)
1773 {
1774 if (!ioqueue.objcount++)
1775 RtlWakeConditionVariable( &ioqueue.update_event );
1776 }
1777
1779 return status;
1780}
NTSYSAPI VOID NTAPI RtlWakeConditionVariable(_Inout_ PRTL_CONDITION_VARIABLE ConditionVariable)
static void CALLBACK ioqueue_thread_proc(void *param)
Definition: threadpool.c:1620
Definition: fci.c:127

Referenced by TpAllocIoCompletion().

◆ tp_ioqueue_unlock()

static void tp_ioqueue_unlock ( struct threadpool_object *io)
static

Definition at line 2223 of file threadpool.c.

2224{
2225 assert( io->type == TP_OBJECT_TYPE_IO );
2226
2228
2229 assert(ioqueue.objcount);
2230
2231 if (!io->shutdown && !--ioqueue.objcount)
2232 NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 );
2233
2235}
NTSTATUS NTAPI NtSetIoCompletion(IN HANDLE IoCompletionPortHandle, IN PVOID CompletionKey, IN PVOID CompletionContext, IN NTSTATUS CompletionStatus, IN ULONG CompletionInformation)
Definition: iocomp.c:569

Referenced by tp_object_prepare_shutdown().

◆ tp_new_worker_thread()

static NTSTATUS tp_new_worker_thread ( struct threadpool *pool)
static

Definition at line 1247 of file threadpool.c.

1248{
1249 HANDLE thread;
1251
1253 pool->stack_info.StackReserve, pool->stack_info.StackCommit,
1255 if (status == STATUS_SUCCESS)
1256 {
1257 InterlockedIncrement( &pool->refcount );
1258 pool->num_workers++;
1259 NtClose( thread );
1260 }
1261 return status;
1262}
#define InterlockedIncrement
Definition: armddk.h:53
static void CALLBACK threadpool_worker_proc(void *param)
Definition: threadpool.c:2476

Referenced by tp_object_submit(), tp_threadpool_lock(), TpCallbackMayRunLong(), and TpSetPoolMinThreads().

◆ tp_object_cancel()

static void tp_object_cancel ( struct threadpool_object *object)
static

Definition at line 2163 of file threadpool.c.

2164{
2165 struct threadpool *pool = object->pool;
2166 LONG pending_callbacks = 0;
2167
2169 if (object->num_pending_callbacks)
2170 {
2171 pending_callbacks = object->num_pending_callbacks;
2172 object->num_pending_callbacks = 0;
2173 list_remove( &object->pool_entry );
2174
2175 if (object->type == TP_OBJECT_TYPE_WAIT)
2176 object->u.wait.signaled = 0;
2177 }
2178 if (object->type == TP_OBJECT_TYPE_IO)
2179 {
2180 object->u.io.skipped_count += object->u.io.pending_count;
2181 object->u.io.pending_count = 0;
2182 }
2184
2185 while (pending_callbacks--)
2186 tp_object_release( object );
2187}
long LONG
Definition: pedump.c:60

Referenced by TpReleaseCleanupGroupMembers(), TpWaitForIoCompletion(), TpWaitForTimer(), TpWaitForWait(), and TpWaitForWork().

◆ tp_object_execute()

static void tp_object_execute ( struct threadpool_object *object,
BOOL  wait_thread 
)
static

Definition at line 2317 of file threadpool.c.

2318{
2319 TP_CALLBACK_INSTANCE *callback_instance;
2322 struct threadpool *pool = object->pool;
2323 TP_WAIT_RESULT wait_result = 0;
2325
2326 object->num_pending_callbacks--;
2327
2328 /* For wait objects check if they were signaled or have timed out. */
2329 if (object->type == TP_OBJECT_TYPE_WAIT)
2330 {
2331 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2332 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2333 }
2334 else if (object->type == TP_OBJECT_TYPE_IO)
2335 {
2336 assert( object->u.io.completion_count );
2337 completion = object->u.io.completions[--object->u.io.completion_count];
2338 }
2339
2340 /* Leave critical section and do the actual callback. */
2341 object->num_associated_callbacks++;
2342 object->num_running_callbacks++;
2344 if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
2345
2346 /* Initialize threadpool instance struct. */
2347 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2348 instance.object = object;
2349 instance.threadid = GetCurrentThreadId();
2350 instance.associated = TRUE;
2351 instance.may_run_long = object->may_run_long;
2352 instance.cleanup.critical_section = NULL;
2353 instance.cleanup.mutex = NULL;
2354 instance.cleanup.semaphore = NULL;
2355 instance.cleanup.semaphore_count = 0;
2356 instance.cleanup.event = NULL;
2357 instance.cleanup.library = NULL;
2358
2359 switch (object->type)
2360 {
2362 {
2363 TRACE( "executing simple callback %p(%p, %p)\n",
2364 object->u.simple.callback, callback_instance, object->userdata );
2365 object->u.simple.callback( callback_instance, object->userdata );
2366 TRACE( "callback %p returned\n", object->u.simple.callback );
2367 break;
2368 }
2369
2371 {
2372 TRACE( "executing work callback %p(%p, %p, %p)\n",
2373 object->u.work.callback, callback_instance, object->userdata, object );
2374 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2375 TRACE( "callback %p returned\n", object->u.work.callback );
2376 break;
2377 }
2378
2380 {
2381 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2382 object->u.timer.callback, callback_instance, object->userdata, object );
2383 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2384 TRACE( "callback %p returned\n", object->u.timer.callback );
2385 break;
2386 }
2387
2389 {
2390 TRACE( "executing wait callback %p(%p, %p, %p, %lu)\n",
2391 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2392 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2393 TRACE( "callback %p returned\n", object->u.wait.callback );
2394 break;
2395 }
2396
2397 case TP_OBJECT_TYPE_IO:
2398 {
2399 TRACE( "executing I/O callback %p(%p, %p, %#Ix, %p, %p)\n",
2400 object->u.io.callback, callback_instance, object->userdata,
2401 completion.cvalue, &completion.iosb, (TP_IO *)object );
2402 object->u.io.callback( callback_instance, object->userdata,
2403 (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
2404 TRACE( "callback %p returned\n", object->u.io.callback );
2405 break;
2406 }
2407
2408 default:
2409 assert(0);
2410 break;
2411 }
2412
2413 /* Execute finalization callback. */
2414 if (object->finalization_callback)
2415 {
2416 TRACE( "executing finalization callback %p(%p, %p)\n",
2417 object->finalization_callback, callback_instance, object->userdata );
2418 object->finalization_callback( callback_instance, object->userdata );
2419 TRACE( "callback %p returned\n", object->finalization_callback );
2420 }
2421
2422 /* Execute cleanup tasks. */
2423 if (instance.cleanup.critical_section)
2424 {
2425 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2426 }
2427 if (instance.cleanup.mutex)
2428 {
2429 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2430 if (status != STATUS_SUCCESS) goto skip_cleanup;
2431 }
2432 if (instance.cleanup.semaphore)
2433 {
2434 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2435 if (status != STATUS_SUCCESS) goto skip_cleanup;
2436 }
2437 if (instance.cleanup.event)
2438 {
2439 status = NtSetEvent( instance.cleanup.event, NULL );
2440 if (status != STATUS_SUCCESS) goto skip_cleanup;
2441 }
2442 if (instance.cleanup.library)
2443 {
2444 LdrUnloadDll( instance.cleanup.library );
2445 }
2446
2447skip_cleanup:
2448 if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs );
2450
2451 /* Simple callbacks are automatically shutdown after execution. */
2452 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2453 {
2455 object->shutdown = TRUE;
2456 }
2457
2458 object->num_running_callbacks--;
2459 if (object_is_finished( object, TRUE ))
2460 RtlWakeAllConditionVariable( &object->group_finished_event );
2461
2462 if (instance.associated)
2463 {
2464 object->num_associated_callbacks--;
2465 if (object_is_finished( object, FALSE ))
2466 RtlWakeAllConditionVariable( &object->finished_event );
2467 }
2468}
#define WAIT_TIMEOUT
Definition: dderror.h:14
NTSTATUS NTAPI LdrUnloadDll(_In_ PVOID BaseAddress)
Definition: ldrapi.c:1291
NTSTATUS NTAPI NtReleaseMutant(IN HANDLE MutantHandle, IN PLONG PreviousCount OPTIONAL)
Definition: mutant.c:296
NTSYSAPI VOID NTAPI RtlWakeAllConditionVariable(_Inout_ PRTL_CONDITION_VARIABLE ConditionVariable)
static BOOL object_is_finished(struct threadpool_object *object, BOOL group)
Definition: threadpool.c:2189
static void tp_object_prepare_shutdown(struct threadpool_object *object)
Definition: threadpool.c:2242
NTSTATUS NTAPI NtReleaseSemaphore(IN HANDLE SemaphoreHandle, IN LONG ReleaseCount, OUT PLONG PreviousCount OPTIONAL)
Definition: sem.c:295
DWORD WINAPI GetCurrentThreadId(void)
Definition: thread.c:459
#define WAIT_OBJECT_0
Definition: winbase.h:439
DWORD TP_WAIT_RESULT
Definition: winnt_old.h:4653
struct _TP_WORK TP_WORK
Definition: winnt_old.h:4647
struct _TP_IO TP_IO
Definition: winnt_old.h:4651
struct _TP_TIMER TP_TIMER
Definition: winnt_old.h:4649
struct _TP_CALLBACK_INSTANCE TP_CALLBACK_INSTANCE
Definition: winnt_old.h:4648

Referenced by threadpool_worker_proc(), and waitqueue_thread_proc().

◆ tp_object_initialize()

static void tp_object_initialize ( struct threadpool_object * object,
struct threadpool * pool,
PVOID  userdata,
TP_CALLBACK_ENVIRON * environment 
)
static

Definition at line 2031 of file threadpool.c.

2033{
2034 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
2035
2036 object->refcount = 1;
2037 object->shutdown = FALSE;
2038
2039 object->pool = pool;
2040 object->group = NULL;
2041 object->userdata = userdata;
2042 object->group_cancel_callback = NULL;
2043 object->finalization_callback = NULL;
2044 object->may_run_long = 0;
2045 object->race_dll = NULL;
2046 object->priority = TP_CALLBACK_PRIORITY_NORMAL;
2047
2048 memset( &object->group_entry, 0, sizeof(object->group_entry) );
2049 object->is_group_member = FALSE;
2050
2051 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
2052 RtlInitializeConditionVariable( &object->finished_event );
2053 RtlInitializeConditionVariable( &object->group_finished_event );
2054 object->completed_event = NULL;
2055 object->num_pending_callbacks = 0;
2056 object->num_running_callbacks = 0;
2057 object->num_associated_callbacks = 0;
2058
2059 if (environment)
2060 {
2061 if (environment->Version != 1 && environment->Version != 3)
2062 FIXME( "unsupported environment version %lu\n", environment->Version );
2063
2064 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
2065 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
2066 object->finalization_callback = environment->FinalizationCallback;
2067 object->may_run_long = environment->u.s.LongFunction != 0;
2068 object->race_dll = environment->RaceDll;
2069#ifndef __REACTOS__ //Windows 7 stuff
2070 if (environment->Version == 3)
2071 {
2072 TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
2073
2074 object->priority = environment_v3->CallbackPriority;
2075 assert( object->priority < ARRAY_SIZE(pool->pools) );
2076 }
2077#endif
2078 if (environment->ActivationContext)
2079 FIXME( "activation context not supported yet\n" );
2080
2081 if (environment->u.s.Persistent)
2082 FIXME( "persistent threads not supported yet\n" );
2083 }
2084
2085 if (object->race_dll)
2086 LdrAddRefDll( 0, object->race_dll );
2087
2088 TRACE( "allocated object %p of type %u\n", object, object->type );
2089
2090 /* For simple callbacks we have to run tp_object_submit before adding this object
2091 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
2092 * will be set, and tp_object_submit would fail with an assertion. */
2093
2094 if (is_simple_callback)
2095 tp_object_submit( object, FALSE );
2096
2097 if (object->group)
2098 {
2099 struct threadpool_group *group = object->group;
2100 InterlockedIncrement( &group->refcount );
2101
2103 list_add_tail( &group->members, &object->group_entry );
2104 object->is_group_member = TRUE;
2106 }
2107
2108 if (is_simple_callback)
2109 tp_object_release( object );
2110}
static void list_add_tail(struct list_entry *head, struct list_entry *entry)
Definition: list.h:83
NTSTATUS NTAPI LdrAddRefDll(_In_ ULONG Flags, _In_ PVOID BaseAddress)
Definition: ldrapi.c:1205
NTSYSAPI VOID NTAPI RtlInitializeConditionVariable(_Out_ PRTL_CONDITION_VARIABLE ConditionVariable)
static struct threadpool_group * impl_from_TP_CLEANUP_GROUP(TP_CLEANUP_GROUP *group)
Definition: threadpool.c:425
PTP_CLEANUP_GROUP CleanupGroup
Definition: winnt_old.h:4695
PTP_SIMPLE_CALLBACK FinalizationCallback
Definition: winnt_old.h:4699
TP_CALLBACK_PRIORITY CallbackPriority
Definition: winnt_old.h:4708
PTP_CLEANUP_GROUP_CANCEL_CALLBACK CleanupGroupCancelCallback
Definition: winnt_old.h:4696
struct _ACTIVATION_CONTEXT * ActivationContext
Definition: winnt_old.h:4698
@ TP_CALLBACK_PRIORITY_NORMAL
Definition: winnt_old.h:4658

Referenced by tp_alloc_wait(), TpAllocIoCompletion(), TpAllocTimer(), TpAllocWork(), and TpSimpleTryPost().

◆ tp_object_prepare_shutdown()

static void tp_object_prepare_shutdown ( struct threadpool_object * object)
static

Definition at line 2242 of file threadpool.c.

2243{
2244 if (object->type == TP_OBJECT_TYPE_TIMER)
2245 tp_timerqueue_unlock( object );
2246 else if (object->type == TP_OBJECT_TYPE_WAIT)
2247 tp_waitqueue_unlock( object );
2248 else if (object->type == TP_OBJECT_TYPE_IO)
2249 tp_ioqueue_unlock( object );
2250}
static void tp_waitqueue_unlock(struct threadpool_object *wait)
Definition: threadpool.c:1598
static void tp_ioqueue_unlock(struct threadpool_object *io)
Definition: threadpool.c:2223
static void tp_timerqueue_unlock(struct threadpool_object *timer)
Definition: threadpool.c:1312

Referenced by tp_object_execute(), TpReleaseCleanupGroupMembers(), TpReleaseIoCompletion(), TpReleaseTimer(), TpReleaseWait(), and TpReleaseWork().

◆ tp_object_prio_queue()

static void tp_object_prio_queue ( struct threadpool_object * object)
static

Definition at line 2112 of file threadpool.c.

2113{
2114 ++object->pool->num_busy_workers;
2115 list_add_tail( &object->pool->pools[object->priority], &object->pool_entry );
2116}

Referenced by threadpool_worker_proc(), and tp_object_submit().

◆ tp_object_release()

static BOOL tp_object_release ( struct threadpool_object * object)
static

Definition at line 2257 of file threadpool.c.

2258{
2259 if (InterlockedDecrement( &object->refcount ))
2260 return FALSE;
2261
2262 TRACE( "destroying object %p of type %u\n", object, object->type );
2263
2264 assert( object->shutdown );
2265 assert( !object->num_pending_callbacks );
2266 assert( !object->num_running_callbacks );
2267 assert( !object->num_associated_callbacks );
2268
2269 /* release reference to the group */
2270 if (object->group)
2271 {
2272 struct threadpool_group *group = object->group;
2273
2275 if (object->is_group_member)
2276 {
2277 list_remove( &object->group_entry );
2278 object->is_group_member = FALSE;
2279 }
2281
2283 }
2284
2286
2287 if (object->race_dll)
2288 LdrUnloadDll( object->race_dll );
2289
2290 if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE)
2291 NtSetEvent( object->completed_event, NULL );
2292
2293 RtlFreeHeap( GetProcessHeap(), 0, object );
2294 return TRUE;
2295}
static BOOL tp_group_release(struct threadpool_group *group)
Definition: threadpool.c:2007

Referenced by ioqueue_thread_proc(), threadpool_worker_proc(), tp_object_cancel(), tp_object_initialize(), TpReleaseCleanupGroupMembers(), TpReleaseIoCompletion(), TpReleaseTimer(), TpReleaseWait(), TpReleaseWork(), and waitqueue_thread_proc().

◆ tp_object_submit()

static void tp_object_submit ( struct threadpool_object * object,
BOOL  signaled 
)
static

Definition at line 2124 of file threadpool.c.

2125{
2126 struct threadpool *pool = object->pool;
2128
2129 assert( !object->shutdown );
2130 assert( !pool->shutdown );
2131
2133
2134 /* Start new worker threads if required. */
2135 if (pool->num_busy_workers >= pool->num_workers &&
2136 pool->num_workers < pool->max_workers)
2138
2139 /* Queue work item and increment refcount. */
2140 InterlockedIncrement( &object->refcount );
2141 if (!object->num_pending_callbacks++)
2142 tp_object_prio_queue( object );
2143
2144 /* Count how often the object was signaled. */
2145 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
2146 object->u.wait.signaled++;
2147
2148 /* No new thread started - wake up one existing thread. */
2149 if (status != STATUS_SUCCESS)
2150 {
2151 assert( pool->num_workers > 0 );
2152 RtlWakeConditionVariable( &pool->update_event );
2153 }
2154
2156}
static NTSTATUS tp_new_worker_thread(struct threadpool *pool)
Definition: threadpool.c:1247
#define STATUS_UNSUCCESSFUL
Definition: udferr_usr.h:132

Referenced by ioqueue_thread_proc(), timerqueue_thread_proc(), tp_object_initialize(), TpPostWork(), TpSetTimer(), and waitqueue_thread_proc().

◆ tp_object_wait()

static void tp_object_wait ( struct threadpool_object * object,
BOOL  group_wait 
)
static

Definition at line 2208 of file threadpool.c.

2209{
2210 struct threadpool *pool = object->pool;
2211
2213 while (!object_is_finished( object, group_wait ))
2214 {
2215 if (group_wait)
2216 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
2217 else
2218 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2219 }
2221}

Referenced by TpReleaseCleanupGroupMembers(), TpWaitForIoCompletion(), TpWaitForTimer(), TpWaitForWait(), and TpWaitForWork().

◆ tp_threadpool_alloc()

static NTSTATUS tp_threadpool_alloc ( struct threadpool **  out)
static

Definition at line 1787 of file threadpool.c.

1788{
1789#ifdef __REACTOS__
1790 IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->ProcessEnvironmentBlock->ImageBaseAddress );
1791#else
1793#endif
1794 struct threadpool *pool;
1795 unsigned int i;
1796
1797 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1798 if (!pool)
1799 return STATUS_NO_MEMORY;
1800
1801 pool->refcount = 1;
1802 pool->objcount = 0;
1803 pool->shutdown = FALSE;
1804
1805#ifdef __REACTOS__
1807#else
1809
1810 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1811#endif
1812
1813 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1814 list_init( &pool->pools[i] );
1815 RtlInitializeConditionVariable( &pool->update_event );
1816
1817 pool->max_workers = 500;
1818 pool->min_workers = 0;
1819 pool->num_workers = 0;
1820 pool->num_busy_workers = 0;
1821 pool->stack_info.StackReserve = nt->OptionalHeader.SizeOfStackReserve;
1822 pool->stack_info.StackCommit = nt->OptionalHeader.SizeOfStackCommit;
1823
1824 TRACE( "allocated threadpool %p\n", pool );
1825
1826 *out = pool;
1827 return STATUS_SUCCESS;
1828}
#define RtlImageNtHeader
Definition: compat.h:806
PPEB Peb
Definition: dllmain.c:27
#define NtCurrentTeb
IMAGE_NT_HEADERS nt
Definition: module.c:50
IMAGE_OPTIONAL_HEADER32 OptionalHeader
Definition: ntddk_ex.h:184
PVOID ImageBaseAddress
Definition: ntddk_ex.h:245

Referenced by tp_threadpool_lock(), and TpAllocPool().

◆ tp_threadpool_lock()

static NTSTATUS tp_threadpool_lock ( struct threadpool **  out,
TP_CALLBACK_ENVIRON * environment 
)
static

Definition at line 1879 of file threadpool.c.

1880{
1881 struct threadpool *pool = NULL;
1883
1884 if (environment)
1885 {
1886#ifndef __REACTOS__ //Windows 7 stuff
1887 /* Validate environment parameters. */
1888 if (environment->Version == 3)
1889 {
1890 TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1891
1892 switch (environment3->CallbackPriority)
1893 {
1897 break;
1898 default:
1900 }
1901 }
1902#endif
1903 pool = (struct threadpool *)environment->Pool;
1904 }
1905
1906 if (!pool)
1907 {
1908 if (!default_threadpool)
1909 {
1911 if (status != STATUS_SUCCESS)
1912 return status;
1913
1915 {
1918 }
1919 }
1920
1922 }
1923
1925
1926 /* Make sure that the threadpool has at least one thread. */
1927 if (!pool->num_workers)
1929
1930 /* Keep a reference, and increment objcount to ensure that the
1931 * last thread doesn't terminate. */
1932 if (status == STATUS_SUCCESS)
1933 {
1934 InterlockedIncrement( &pool->refcount );
1935 pool->objcount++;
1936 }
1937
1939
1940 if (status != STATUS_SUCCESS)
1941 return status;
1942
1943 *out = pool;
1944 return STATUS_SUCCESS;
1945}
static struct threadpool * default_threadpool
Definition: threadpool.c:444
static NTSTATUS tp_threadpool_alloc(struct threadpool **out)
Definition: threadpool.c:1787
static void tp_threadpool_shutdown(struct threadpool *pool)
Definition: threadpool.c:1837
#define STATUS_INVALID_PARAMETER
Definition: udferr_usr.h:135
@ TP_CALLBACK_PRIORITY_HIGH
Definition: winnt_old.h:4657
@ TP_CALLBACK_PRIORITY_LOW
Definition: winnt_old.h:4659

Referenced by tp_alloc_wait(), TpAllocIoCompletion(), TpAllocTimer(), TpAllocWork(), and TpSimpleTryPost().

◆ tp_threadpool_release()

static BOOL tp_threadpool_release ( struct threadpool * pool)
static

Definition at line 1850 of file threadpool.c.

1851{
1852 unsigned int i;
1853
1854 if (InterlockedDecrement( &pool->refcount ))
1855 return FALSE;
1856
1857 TRACE( "destroying threadpool %p\n", pool );
1858
1859 assert( pool->shutdown );
1860 assert( !pool->objcount );
1861 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1862 assert( list_empty( &pool->pools[i] ) );
1863#ifndef __REACTOS__
1864 pool->cs.DebugInfo->Spare[0] = 0;
1865#endif
1867
1869 return TRUE;
1870}

Referenced by threadpool_worker_proc(), tp_threadpool_lock(), tp_threadpool_unlock(), and TpReleasePool().

◆ tp_threadpool_shutdown()

static void tp_threadpool_shutdown ( struct threadpool * pool)
static

Definition at line 1837 of file threadpool.c.

1838{
1840
1841 pool->shutdown = TRUE;
1842 RtlWakeAllConditionVariable( &pool->update_event );
1843}

Referenced by tp_threadpool_lock(), and TpReleasePool().

◆ tp_threadpool_unlock()

static void tp_threadpool_unlock ( struct threadpool * pool)
static

Definition at line 1952 of file threadpool.c.

1953{
1955 pool->objcount--;
1958}

Referenced by tp_alloc_wait(), tp_object_release(), TpAllocIoCompletion(), and TpAllocTimer().

◆ tp_timerqueue_lock()

static NTSTATUS tp_timerqueue_lock ( struct threadpool_object * timer)
static

Definition at line 1270 of file threadpool.c.

1271{
1273 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1274
1275 timer->u.timer.timer_initialized = FALSE;
1276 timer->u.timer.timer_pending = FALSE;
1277 timer->u.timer.timer_set = FALSE;
1278 timer->u.timer.timeout = 0;
1279 timer->u.timer.period = 0;
1280 timer->u.timer.window_length = 0;
1281
1283
1284 /* Make sure that the timerqueue thread is running. */
1285 if (!timerqueue.thread_running)
1286 {
1287 HANDLE thread;
1290 if (status == STATUS_SUCCESS)
1291 {
1292 timerqueue.thread_running = TRUE;
1293 NtClose( thread );
1294 }
1295 }
1296
1297 if (status == STATUS_SUCCESS)
1298 {
1299 timer->u.timer.timer_initialized = TRUE;
1300 timerqueue.objcount++;
1301 }
1302
1304 return status;
1305}
static void CALLBACK timerqueue_thread_proc(void *param)
Definition: threadpool.c:1149

Referenced by TpAllocTimer().

◆ tp_timerqueue_unlock()

static void tp_timerqueue_unlock ( struct threadpool_object * timer)
static

Definition at line 1312 of file threadpool.c.

1313{
1314 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1315
1317 if (timer->u.timer.timer_initialized)
1318 {
1319 /* If timer was pending, remove it. */
1320 if (timer->u.timer.timer_pending)
1321 {
1322 list_remove( &timer->u.timer.timer_entry );
1323 timer->u.timer.timer_pending = FALSE;
1324 }
1325
1326 /* If the last timer object was destroyed, then wake up the thread. */
1327 if (!--timerqueue.objcount)
1328 {
1329 assert( list_empty( &timerqueue.pending_timers ) );
1330 RtlWakeAllConditionVariable( &timerqueue.update_event );
1331 }
1332
1333 timer->u.timer.timer_initialized = FALSE;
1334 }
1336}

Referenced by tp_object_prepare_shutdown().

◆ tp_waitqueue_lock()

static NTSTATUS tp_waitqueue_lock ( struct threadpool_object * wait)
static

Definition at line 1520 of file threadpool.c.

1521{
1522 struct waitqueue_bucket *bucket;
1524 HANDLE thread;
1525 BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0;
1526 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1527
1528 wait->u.wait.signaled = 0;
1529 wait->u.wait.bucket = NULL;
1530 wait->u.wait.wait_pending = FALSE;
1531 wait->u.wait.timeout = 0;
1532 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1533
1535
1536 /* Try to assign to existing bucket if possible. */
1537 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1538 {
1539 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable)
1540 {
1541 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1542 wait->u.wait.bucket = bucket;
1543 bucket->objcount++;
1544
1546 goto out;
1547 }
1548 }
1549
1550 /* Create a new bucket and corresponding worker thread. */
1551 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1552 if (!bucket)
1553 {
1555 goto out;
1556 }
1557
1558 bucket->objcount = 0;
1559 bucket->alertable = alertable;
1560 list_init( &bucket->reserved );
1561 list_init( &bucket->waiting );
1562
1565 if (status)
1566 {
1567 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1568 goto out;
1569 }
1570
1573 if (status == STATUS_SUCCESS)
1574 {
1575 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1576 waitqueue.num_buckets++;
1577
1578 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1579 wait->u.wait.bucket = bucket;
1580 bucket->objcount++;
1581
1582 NtClose( thread );
1583 }
1584 else
1585 {
1586 NtClose( bucket->update_event );
1587 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1588 }
1589
1590out:
1592 return status;
1593}
ULONG(NTAPI * PTHREAD_START_ROUTINE)(PVOID Parameter)
Definition: rtltypes.h:560
#define MAXIMUM_WAITQUEUE_OBJECTS
Definition: threadpool.c:145
static void CALLBACK waitqueue_thread_proc(void *param)
Definition: threadpool.c:1344
struct list waiting
Definition: threadpool.c:356
HANDLE update_event
Definition: threadpool.c:357
struct list bucket_entry
Definition: threadpool.c:353
struct list reserved
Definition: threadpool.c:355

Referenced by tp_alloc_wait().

◆ tp_waitqueue_unlock()

static void tp_waitqueue_unlock ( struct threadpool_object * wait)
static

Definition at line 1598 of file threadpool.c.

1599{
1600 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1601
1603 if (wait->u.wait.bucket)
1604 {
1605 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1606 assert( bucket->objcount > 0 );
1607
1608 list_remove( &wait->u.wait.wait_entry );
1609 wait->u.wait.bucket = NULL;
1610 bucket->objcount--;
1611
1612 NtSetEvent( bucket->update_event, NULL );
1613 }
1615}

Referenced by tp_object_prepare_shutdown().

◆ TpAllocCleanupGroup()

NTSTATUS WINAPI TpAllocCleanupGroup ( TP_CLEANUP_GROUP **  out)

Definition at line 2539 of file threadpool.c.

2540{
2541 TRACE( "%p\n", out );
2542
2543 return tp_group_alloc( (struct threadpool_group **)out );
2544}
static NTSTATUS tp_group_alloc(struct threadpool_group **out)
Definition: threadpool.c:1965

Referenced by CreateThreadpoolCleanupGroup(), and init_threadpool().

◆ TpAllocIoCompletion()

NTSTATUS WINAPI TpAllocIoCompletion ( TP_IO **  out,
HANDLE  file,
PTP_IO_CALLBACK  callback,
void * userdata,
TP_CALLBACK_ENVIRON * environment 
)

Definition at line 2549 of file threadpool.c.

2551{
2552 struct threadpool_object *object;
2553 struct threadpool *pool;
2555
2556 TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
2557
2558 if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) )))
2559 return STATUS_NO_MEMORY;
2560
2561 if ((status = tp_threadpool_lock( &pool, environment )))
2562 {
2563 RtlFreeHeap( GetProcessHeap(), 0, object );
2564 return status;
2565 }
2566
2567 object->type = TP_OBJECT_TYPE_IO;
2568 object->u.io.callback = callback;
2569 if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) )))
2570 {
2572 RtlFreeHeap( GetProcessHeap(), 0, object );
2573 return status;
2574 }
2575
2576 if ((status = tp_ioqueue_lock( object, file )))
2577 {
2579 RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
2580 RtlFreeHeap( GetProcessHeap(), 0, object );
2581 return status;
2582 }
2583
2584 tp_object_initialize( object, pool, userdata, environment );
2585
2586 *out = (TP_IO *)object;
2587 return STATUS_SUCCESS;
2588}
#define HEAP_ZERO_MEMORY
Definition: compat.h:134
static NTSTATUS tp_ioqueue_lock(struct threadpool_object *io, HANDLE file)
Definition: threadpool.c:1729

Referenced by CreateThreadpoolIo(), and init_threadpool().

◆ TpAllocPool()

NTSTATUS WINAPI TpAllocPool ( TP_POOL **  out,
PVOID  reserved 
)

Definition at line 2593 of file threadpool.c.

2594{
2595 TRACE( "%p %p\n", out, reserved );
2596
2597 if (reserved)
2598 FIXME( "reserved argument is nonzero (%p)\n", reserved );
2599
2600 return tp_threadpool_alloc( (struct threadpool **)out );
2601}
r reserved
Definition: btrfs.c:3006

Referenced by CreateThreadpool(), and init_threadpool().

◆ TpAllocTimer()

NTSTATUS WINAPI TpAllocTimer ( TP_TIMER **  out,
PTP_TIMER_CALLBACK  callback,
PVOID  userdata,
TP_CALLBACK_ENVIRON * environment 
)

Definition at line 2606 of file threadpool.c.

2608{
2609 struct threadpool_object *object;
2610 struct threadpool *pool;
2612
2613 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2614
2615 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2616 if (!object)
2617 return STATUS_NO_MEMORY;
2618
2619 status = tp_threadpool_lock( &pool, environment );
2620 if (status)
2621 {
2622 RtlFreeHeap( GetProcessHeap(), 0, object );
2623 return status;
2624 }
2625
2626 object->type = TP_OBJECT_TYPE_TIMER;
2627 object->u.timer.callback = callback;
2628
2629 status = tp_timerqueue_lock( object );
2630 if (status)
2631 {
2633 RtlFreeHeap( GetProcessHeap(), 0, object );
2634 return status;
2635 }
2636
2637 tp_object_initialize( object, pool, userdata, environment );
2638
2639 *out = (TP_TIMER *)object;
2640 return STATUS_SUCCESS;
2641}
static NTSTATUS tp_timerqueue_lock(struct threadpool_object *timer)
Definition: threadpool.c:1270

Referenced by CreateThreadpoolTimer(), and init_threadpool().

◆ TpAllocWait()

NTSTATUS WINAPI TpAllocWait ( TP_WAIT **  out,
PTP_WAIT_CALLBACK  callback,
PVOID  userdata,
TP_CALLBACK_ENVIRON * environment 
)

Definition at line 2682 of file threadpool.c.

2684{
2685 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2686 return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE );
2687}

Referenced by CreateThreadpoolWait(), and init_threadpool().

◆ TpAllocWork()

NTSTATUS WINAPI TpAllocWork ( TP_WORK **  out,
PTP_WORK_CALLBACK  callback,
PVOID  userdata,
TP_CALLBACK_ENVIRON * environment 
)

Definition at line 2692 of file threadpool.c.

2694{
2695 struct threadpool_object *object;
2696 struct threadpool *pool;
2698
2699 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2700
2701 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2702 if (!object)
2703 return STATUS_NO_MEMORY;
2704
2705 status = tp_threadpool_lock( &pool, environment );
2706 if (status)
2707 {
2708 RtlFreeHeap( GetProcessHeap(), 0, object );
2709 return status;
2710 }
2711
2712 object->type = TP_OBJECT_TYPE_WORK;
2713 object->u.work.callback = callback;
2714 tp_object_initialize( object, pool, userdata, environment );
2715
2716 *out = (TP_WORK *)object;
2717 return STATUS_SUCCESS;
2718}

Referenced by CreateThreadpoolWork(), and init_threadpool().

◆ TpCallbackLeaveCriticalSectionOnCompletion()

VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion ( TP_CALLBACK_INSTANCE * instance,
CRITICAL_SECTION * crit 
)

Definition at line 2745 of file threadpool.c.

2746{
2748
2749 TRACE( "%p %p\n", instance, crit );
2750
2751 if (!this->cleanup.critical_section)
2752 this->cleanup.critical_section = crit;
2753}
static void cleanup(void)
Definition: main.c:1335
static struct threadpool_instance * impl_from_TP_CALLBACK_INSTANCE(TP_CALLBACK_INSTANCE *instance)
Definition: threadpool.c:430

◆ TpCallbackMayRunLong()

NTSTATUS WINAPI TpCallbackMayRunLong ( TP_CALLBACK_INSTANCE * instance)

Definition at line 2758 of file threadpool.c.

2759{
2761 struct threadpool_object *object = this->object;
2762 struct threadpool *pool;
2764
2765 TRACE( "%p\n", instance );
2766
2767 if (this->threadid != GetCurrentThreadId())
2768 {
2769 ERR("called from wrong thread, ignoring\n");
2770 return STATUS_UNSUCCESSFUL; /* FIXME */
2771 }
2772
2773 if (this->may_run_long)
2774 return STATUS_SUCCESS;
2775
2776 pool = object->pool;
2778
2779 /* Start new worker threads if required. */
2780 if (pool->num_busy_workers >= pool->num_workers)
2781 {
2782 if (pool->num_workers < pool->max_workers)
2783 {
2785 }
2786 else
2787 {
2789 }
2790 }
2791
2793 this->may_run_long = TRUE;
2794 return status;
2795}
#define STATUS_TOO_MANY_THREADS
Definition: ntstatus.h:533

Referenced by CallbackMayRunLong(), and init_threadpool().

◆ TpCallbackReleaseMutexOnCompletion()

VOID WINAPI TpCallbackReleaseMutexOnCompletion ( TP_CALLBACK_INSTANCE * instance,
HANDLE  mutex 
)

Definition at line 2800 of file threadpool.c.

2801{
2803
2804 TRACE( "%p %p\n", instance, mutex );
2805
2806 if (!this->cleanup.mutex)
2807 this->cleanup.mutex = mutex;
2808}
Definition: module.h:456

◆ TpCallbackReleaseSemaphoreOnCompletion()

VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion ( TP_CALLBACK_INSTANCE * instance,
HANDLE  semaphore,
DWORD  count 
)

Definition at line 2813 of file threadpool.c.

2814{
2816
2817 TRACE( "%p %p %lu\n", instance, semaphore, count );
2818
2819 if (!this->cleanup.semaphore)
2820 {
2821 this->cleanup.semaphore = semaphore;
2822 this->cleanup.semaphore_count = count;
2823 }
2824}
HANDLE semaphore
Definition: threadpool.c:1889

Referenced by init_threadpool().

◆ TpCallbackSetEventOnCompletion()

VOID WINAPI TpCallbackSetEventOnCompletion ( TP_CALLBACK_INSTANCE * instance,
HANDLE  event 
)

Definition at line 2829 of file threadpool.c.

2830{
2832
2833 TRACE( "%p %p\n", instance, event );
2834
2835 if (!this->cleanup.event)
2836 this->cleanup.event = event;
2837}

◆ TpCallbackUnloadDllOnCompletion()

VOID WINAPI TpCallbackUnloadDllOnCompletion ( TP_CALLBACK_INSTANCE * instance,
HMODULE  module 
)

Definition at line 2842 of file threadpool.c.

2843{
2845
2846 TRACE( "%p %p\n", instance, module );
2847
2848 if (!this->cleanup.library)
2849 this->cleanup.library = module;
2850}

◆ TpCancelAsyncIoOperation()

void WINAPI TpCancelAsyncIoOperation ( TP_IO * io)

Definition at line 2723 of file threadpool.c.

2724{
2725 struct threadpool_object *this = impl_from_TP_IO( io );
2726
2727 TRACE( "%p\n", io );
2728
2729 RtlEnterCriticalSection( &this->pool->cs );
2730
2731 TRACE("pending_count %u.\n", this->u.io.pending_count);
2732
2733 this->u.io.pending_count--;
2734 if (object_is_finished( this, TRUE ))
2736 if (object_is_finished( this, FALSE ))
2738
2739 RtlLeaveCriticalSection( &this->pool->cs );
2740}
static struct threadpool_object * impl_from_TP_IO(TP_IO *io)
Definition: threadpool.c:418
RTL_CONDITION_VARIABLE finished_event
Definition: threadpool.c:202
RTL_CONDITION_VARIABLE group_finished_event
Definition: threadpool.c:203

Referenced by init_threadpool().

◆ TpDisassociateCallback()

VOID WINAPI TpDisassociateCallback ( TP_CALLBACK_INSTANCE * instance)

Definition at line 2855 of file threadpool.c.

2856{
2858 struct threadpool_object *object = this->object;
2859 struct threadpool *pool;
2860
2861 TRACE( "%p\n", instance );
2862
2863 if (this->threadid != GetCurrentThreadId())
2864 {
2865 ERR("called from wrong thread, ignoring\n");
2866 return;
2867 }
2868
2869 if (!this->associated)
2870 return;
2871
2872 pool = object->pool;
2874
2875 object->num_associated_callbacks--;
2876 if (object_is_finished( object, FALSE ))
2877 RtlWakeAllConditionVariable( &object->finished_event );
2878
2880 this->associated = FALSE;
2881}

Referenced by init_threadpool().

◆ TpIsTimerSet()

BOOL WINAPI TpIsTimerSet ( TP_TIMER * timer)

Definition at line 2886 of file threadpool.c.

2887{
2888 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2889
2890 TRACE( "%p\n", timer );
2891
2892 return this->u.timer.timer_set;
2893}
static struct threadpool_object * impl_from_TP_TIMER(TP_TIMER *timer)
Definition: threadpool.c:404

Referenced by init_threadpool().

◆ TpPostWork()

VOID WINAPI TpPostWork ( TP_WORK * work)

Definition at line 2898 of file threadpool.c.

2899{
2900 struct threadpool_object *this = impl_from_TP_WORK( work );
2901
2902 TRACE( "%p\n", work );
2903
2904 tp_object_submit( this, FALSE );
2905}
static struct threadpool_object * impl_from_TP_WORK(TP_WORK *work)
Definition: threadpool.c:397

Referenced by init_threadpool().

◆ TpQueryPoolStackInformation()

NTSTATUS WINAPI TpQueryPoolStackInformation ( TP_POOL * pool,
TP_POOL_STACK_INFORMATION * stack_info 
)

Definition at line 3369 of file threadpool.c.

3370{
3371 struct threadpool *this = impl_from_TP_POOL( pool );
3372
3373 TRACE( "%p %p\n", pool, stack_info );
3374
3375 if (!stack_info)
3377
3378 RtlEnterCriticalSection( &this->cs );
3379 *stack_info = this->stack_info;
3380 RtlLeaveCriticalSection( &this->cs );
3381
3382 return STATUS_SUCCESS;
3383}
CRITICAL_SECTION cs
Definition: threadpool.c:290
static struct threadpool * impl_from_TP_POOL(TP_POOL *pool)
Definition: threadpool.c:392
TP_POOL_STACK_INFORMATION stack_info
Definition: threadpool.c:163

Referenced by init_threadpool(), and QueryThreadpoolStackInformation().

◆ TpReleaseCleanupGroup()

VOID WINAPI TpReleaseCleanupGroup ( TP_CLEANUP_GROUP * group)

Definition at line 2910 of file threadpool.c.

2911{
2913
2914 TRACE( "%p\n", group );
2915
2916 tp_group_shutdown( this );
2917 tp_group_release( this );
2918}
static void tp_group_shutdown(struct threadpool_group *group)
Definition: threadpool.c:1997

Referenced by init_threadpool().

◆ TpReleaseCleanupGroupMembers()

VOID WINAPI TpReleaseCleanupGroupMembers ( TP_CLEANUP_GROUP * group,
BOOL  cancel_pending,
PVOID  userdata 
)

Definition at line 2923 of file threadpool.c.

2924{
2926 struct threadpool_object *object, *next;
2927 struct list members;
2928
2929 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2930
2931 RtlEnterCriticalSection( &this->cs );
2932
2933 /* Unset group, increase references, and mark objects for shutdown */
2934 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2935 {
2936 assert( object->group == this );
2937 assert( object->is_group_member );
2938
2939 if (InterlockedIncrement( &object->refcount ) == 1)
2940 {
2941 /* Object is basically already destroyed, but group reference
2942 * was not deleted yet. We can safely ignore this object. */
2943 InterlockedDecrement( &object->refcount );
2944 list_remove( &object->group_entry );
2945 object->is_group_member = FALSE;
2946 continue;
2947 }
2948
2949 object->is_group_member = FALSE;
2951 }
2952
2953 /* Move members to a new temporary list */
2954 list_init( &members );
2955 list_move_tail( &members, &this->members );
2956
2957 RtlLeaveCriticalSection( &this->cs );
2958
2959 /* Cancel pending callbacks if requested */
2960 if (cancel_pending)
2961 {
2962 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2963 {
2964 tp_object_cancel( object );
2965 }
2966 }
2967
2968 /* Wait for remaining callbacks to finish */
2969 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2970 {
2971 tp_object_wait( object, TRUE );
2972
2973 if (!object->shutdown)
2974 {
2975 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2976 if (cancel_pending && object->group_cancel_callback)
2977 {
2978 TRACE( "executing group cancel callback %p(%p, %p)\n",
2979 object->group_cancel_callback, object->userdata, userdata );
2980 object->group_cancel_callback( object->userdata, userdata );
2981 TRACE( "callback %p returned\n", object->group_cancel_callback );
2982 }
2983
2984 if (object->type != TP_OBJECT_TYPE_SIMPLE)
2985 tp_object_release( object );
2986 }
2987
2988 object->shutdown = TRUE;
2989 tp_object_release( object );
2990 }
2991}
static void list_move_tail(struct list_head *list, struct list_head *head)
Definition: list.h:122
static void tp_object_wait(struct threadpool_object *object, BOOL group_wait)
Definition: threadpool.c:2208
static void tp_object_cancel(struct threadpool_object *object)
Definition: threadpool.c:2163

Referenced by init_threadpool().

◆ TpReleaseIoCompletion()

void WINAPI TpReleaseIoCompletion ( TP_IO * io)

Definition at line 2996 of file threadpool.c.

/* Releases an I/O completion object. While holding the owning pool's lock the
 * object is flagged as shutting down and the pending/skipped counters are
 * sampled. Only when both counters are zero is the object prepared for
 * shutdown, marked as shut down and released here; otherwise this function
 * does nothing further. */
2997{
2998 struct threadpool_object *this = impl_from_TP_IO( io );
2999 BOOL can_destroy;
3000
3001 TRACE( "%p\n", io );
3002
3003 RtlEnterCriticalSection( &this->pool->cs );
3004 this->u.io.shutting_down = TRUE;
3005 can_destroy = !this->u.io.pending_count && !this->u.io.skipped_count;
3006 RtlLeaveCriticalSection( &this->pool->cs );
3007
3008 if (can_destroy)
3009 {
3010 tp_object_prepare_shutdown( this );
3011 this->shutdown = TRUE;
3012 tp_object_release( this );
3013 }
3014}
INT WSAAPI shutdown(IN SOCKET s, IN INT how)
Definition: sockctrl.c:506

Referenced by init_threadpool().

◆ TpReleasePool()

VOID WINAPI TpReleasePool ( TP_POOL * pool)

Definition at line 3019 of file threadpool.c.

/* Releases a thread pool handle: resolves the implementation object, then
 * calls tp_threadpool_shutdown() followed by tp_threadpool_release() on it. */
3020{
3021 struct threadpool *this = impl_from_TP_POOL( pool );
3022
3023 TRACE( "%p\n", pool );
3024
3025 tp_threadpool_shutdown( this );
3026 tp_threadpool_release( this );
3027}

Referenced by init_threadpool().

◆ TpReleaseTimer()

VOID WINAPI TpReleaseTimer ( TP_TIMER * timer)

Definition at line 3032 of file threadpool.c.

/* Releases a timer object: prepares it for shutdown, marks it as shut down
 * and drops one reference via tp_object_release(). */
3033{
3034 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3035
3036 TRACE( "%p\n", timer );
3037
3038 tp_object_prepare_shutdown( this );
3039 this->shutdown = TRUE;
3040 tp_object_release( this );
3041}

Referenced by init_threadpool().

◆ TpReleaseWait()

VOID WINAPI TpReleaseWait ( TP_WAIT * wait)

Definition at line 3046 of file threadpool.c.

/* Releases a wait object: prepares it for shutdown, marks it as shut down
 * and drops one reference via tp_object_release(). */
3047{
3048 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3049
3050 TRACE( "%p\n", wait );
3051
3052 tp_object_prepare_shutdown( this );
3053 this->shutdown = TRUE;
3054 tp_object_release( this );
3055}

Referenced by init_threadpool(), and RtlDeregisterWaitEx().

◆ TpReleaseWork()

VOID WINAPI TpReleaseWork ( TP_WORK * work)

Definition at line 3060 of file threadpool.c.

/* Releases a work object: prepares it for shutdown, marks it as shut down
 * and drops one reference via tp_object_release(). */
3061{
3062 struct threadpool_object *this = impl_from_TP_WORK( work );
3063
3064 TRACE( "%p\n", work );
3065
3066 tp_object_prepare_shutdown( this );
3067 this->shutdown = TRUE;
3068 tp_object_release( this );
3069}

Referenced by init_threadpool().

◆ TpSetPoolMaxThreads()

VOID WINAPI TpSetPoolMaxThreads ( TP_POOL * pool,
DWORD  maximum 
)

Definition at line 3074 of file threadpool.c.

/* Sets the upper bound on worker threads for a pool. The stored maximum is
 * never less than 1, and min_workers is lowered when it would otherwise
 * exceed the new maximum. Both fields are updated under the pool's lock. */
3075{
3076 struct threadpool *this = impl_from_TP_POOL( pool );
3077
3078 TRACE( "%p %lu\n", pool, maximum );
3079
3080 RtlEnterCriticalSection( &this->cs );
3081 this->max_workers = max( maximum, 1 );
3082 this->min_workers = min( this->min_workers, this->max_workers );
3083 RtlLeaveCriticalSection( &this->cs );
3084}
#define min(a, b)
Definition: monoChain.cc:55
int max_workers
Definition: threadpool.c:158
int min_workers
Definition: threadpool.c:159

Referenced by init_threadpool().

◆ TpSetPoolMinThreads()

BOOL WINAPI TpSetPoolMinThreads ( TP_POOL * pool,
DWORD  minimum 
)

Definition at line 3089 of file threadpool.c.

/* Sets the lower bound on worker threads for a pool. Under the pool's lock,
 * new workers are started with tp_new_worker_thread() until num_workers
 * reaches the requested minimum or a start attempt fails. Only on success is
 * min_workers stored and max_workers raised to at least that value.
 * Returns nonzero when status is STATUS_SUCCESS, zero otherwise. */
3090{
3091 struct threadpool *this = impl_from_TP_POOL( pool );
3092 NTSTATUS status = STATUS_SUCCESS;
3093
3094 TRACE( "%p %lu\n", pool, minimum );
3095
3096 RtlEnterCriticalSection( &this->cs );
3097
3098 while (this->num_workers < minimum)
3099 {
3100 status = tp_new_worker_thread( this );
3101 if (status != STATUS_SUCCESS)
3102 break;
3103 }
3104
3105 if (status == STATUS_SUCCESS)
3106 {
3107 this->min_workers = minimum;
3108 this->max_workers = max( this->min_workers, this->max_workers );
3109 }
3110
3111 RtlLeaveCriticalSection( &this->cs );
3112 return !status;
3113}
int num_workers
Definition: threadpool.c:160

◆ TpSetPoolStackInformation()

NTSTATUS WINAPI TpSetPoolStackInformation ( TP_POOL * pool,
TP_POOL_STACK_INFORMATION * stack_info
)

Definition at line 3350 of file threadpool.c.

/* Copies the caller-supplied stack information into the pool under the
 * pool's lock. A NULL stack_info is rejected with STATUS_INVALID_PARAMETER
 * before the lock is taken; otherwise STATUS_SUCCESS is returned. */
3351{
3352 struct threadpool *this = impl_from_TP_POOL( pool );
3353
3354 TRACE( "%p %p\n", pool, stack_info );
3355
3356 if (!stack_info)
3357 return STATUS_INVALID_PARAMETER;
3358
3359 RtlEnterCriticalSection( &this->cs );
3360 this->stack_info = *stack_info;
3361 RtlLeaveCriticalSection( &this->cs );
3362
3363 return STATUS_SUCCESS;
3364}

Referenced by init_threadpool(), and SetThreadpoolStackInformation().

◆ TpSetTimer()

VOID WINAPI TpSetTimer ( TP_TIMER * timer,
LARGE_INTEGER * timeout,
LONG  period,
LONG  window_length 
)

Definition at line 3118 of file threadpool.c.

/* Arms, re-arms or disarms a timer object.
 *
 * timeout == NULL disarms the timer. A negative timeout is relative and is
 * converted to an absolute timestamp using the current system time. A
 * timeout of exactly zero submits the callback immediately (after the queue
 * lock is dropped); with a non-zero period the timer is additionally queued
 * for now + period (period is scaled by 10000), without a period it is not
 * queued at all.
 *
 * The pending list is kept sorted by ascending timeout; the timer thread is
 * woken only when this timer became the new head of the list. All queue
 * manipulation happens under timerqueue.cs. */
3119{
3120 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3121 struct threadpool_object *other_timer;
3122 BOOL submit_timer = FALSE;
3123 ULONGLONG timestamp;
3124
3125 TRACE( "%p %p %lu %lu\n", timer, timeout, period, window_length );
3126
3127 RtlEnterCriticalSection( &timerqueue.cs );
3128
3129 assert( this->u.timer.timer_initialized );
3130 this->u.timer.timer_set = timeout != NULL;
3131
3132 /* Convert relative timeout to absolute timestamp and handle a timeout
3133 * of zero, which means that the timer is submitted immediately. */
3134 if (timeout)
3135 {
3136 timestamp = timeout->QuadPart;
3137 if ((LONGLONG)timestamp < 0)
3138 {
3139 LARGE_INTEGER now;
3140 NtQuerySystemTime( &now );
3141 timestamp = now.QuadPart - timestamp;
3142 }
3143 else if (!timestamp)
3144 {
3145 if (!period)
3146 timeout = NULL;
3147 else
3148 {
3149 LARGE_INTEGER now;
3150 NtQuerySystemTime( &now );
3151 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
3152 }
3153 submit_timer = TRUE;
3154 }
3155 }
3156
3157 /* First remove existing timeout. */
3158 if (this->u.timer.timer_pending)
3159 {
3160 list_remove( &this->u.timer.timer_entry );
3161 this->u.timer.timer_pending = FALSE;
3162 }
3163
3164 /* If the timer was enabled, then add it back to the queue. */
3165 if (timeout)
3166 {
3167 this->u.timer.timeout = timestamp;
3168 this->u.timer.period = period;
3169 this->u.timer.window_length = window_length;
3170
3171 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
3172 struct threadpool_object, u.timer.timer_entry )
3173 {
3174 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
3175 if (this->u.timer.timeout < other_timer->u.timer.timeout)
3176 break;
3177 }
3178 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
3179
3180 /* Wake up the timer thread when the timeout has to be updated. */
3181 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
3182 RtlWakeAllConditionVariable( &timerqueue.update_event );
3183
3184 this->u.timer.timer_pending = TRUE;
3185 }
3186
3187 RtlLeaveCriticalSection( &timerqueue.cs );
3188
3189 if (submit_timer)
3190 tp_object_submit( this, FALSE );
3191}
int64_t LONGLONG
Definition: typedefs.h:68

Referenced by init_threadpool().

◆ TpSetWait()

VOID WINAPI TpSetWait ( TP_WAIT * wait,
HANDLE  handle,
LARGE_INTEGER * timeout
)

Definition at line 3196 of file threadpool.c.

/* Arms or disarms a wait object.
 *
 * The new handle is always stored. When a handle is given, or a wait was
 * pending, the object is unlinked from its current bucket list and re-queued:
 * onto the bucket's waiting list (with the computed timeout) if handle is
 * non-NULL, otherwise onto the reserved list. A negative timeout is relative
 * and is converted to an absolute timestamp; without a timeout the timestamp
 * stays at MAXLONGLONG. The bucket's thread is then signalled through
 * update_event. Everything runs under waitqueue.cs. */
3197{
3198 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3199 ULONGLONG timestamp = MAXLONGLONG;
3200
3201 TRACE( "%p %p %p\n", wait, handle, timeout );
3202
3203 RtlEnterCriticalSection( &waitqueue.cs );
3204
3205 assert( this->u.wait.bucket );
3206 this->u.wait.handle = handle;
3207
3208 if (handle || this->u.wait.wait_pending)
3209 {
3210 struct waitqueue_bucket *bucket = this->u.wait.bucket;
3211 list_remove( &this->u.wait.wait_entry );
3212
3213 /* Convert relative timeout to absolute timestamp. */
3214 if (handle && timeout)
3215 {
3216 timestamp = timeout->QuadPart;
3217 if ((LONGLONG)timestamp < 0)
3218 {
3219 LARGE_INTEGER now;
3220 NtQuerySystemTime( &now );
3221 timestamp = now.QuadPart - timestamp;
3222 }
3223 }
3224
3225 /* Add wait object back into one of the queues. */
3226 if (handle)
3227 {
3228 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
3229 this->u.wait.wait_pending = TRUE;
3230 this->u.wait.timeout = timestamp;
3231 }
3232 else
3233 {
3234 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
3235 this->u.wait.wait_pending = FALSE;
3236 }
3237
3238 /* Wake up the wait queue thread. */
3239 NtSetEvent( bucket->update_event, NULL );
3240 }
3241
3242 RtlLeaveCriticalSection( &waitqueue.cs );
3243}

Referenced by init_threadpool(), RtlDeregisterWaitEx(), and RtlRegisterWait().

◆ TpSimpleTryPost()

NTSTATUS WINAPI TpSimpleTryPost ( PTP_SIMPLE_CALLBACK  callback,
PVOID  userdata,
TP_CALLBACK_ENVIRON * environment
)

Definition at line 3248 of file threadpool.c.

/* Creates a one-shot "simple" callback object.
 *
 * The object is heap-allocated first; STATUS_NO_MEMORY is returned if that
 * fails. A pool is then obtained through tp_threadpool_lock(); on failure the
 * allocation is freed and that status is returned. On success the object is
 * typed as TP_OBJECT_TYPE_SIMPLE, given its callback, and handed to
 * tp_object_initialize(); the function then returns STATUS_SUCCESS. */
3250{
3251 struct threadpool_object *object;
3252 struct threadpool *pool;
3253 NTSTATUS status;
3254
3255 TRACE( "%p %p %p\n", callback, userdata, environment );
3256
3257 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
3258 if (!object)
3259 return STATUS_NO_MEMORY;
3260
3261 status = tp_threadpool_lock( &pool, environment );
3262 if (status)
3263 {
3264 RtlFreeHeap( GetProcessHeap(), 0, object );
3265 return status;
3266 }
3267
3268 object->type = TP_OBJECT_TYPE_SIMPLE;
3269 object->u.simple.callback = callback;
3270 tp_object_initialize( object, pool, userdata, environment );
3271
3272 return STATUS_SUCCESS;
3273}

Referenced by init_threadpool(), RtlQueueWorkItem(), and TrySubmitThreadpoolCallback().

◆ TpStartAsyncIoOperation()

void WINAPI TpStartAsyncIoOperation ( TP_IO * io)

Definition at line 3278 of file threadpool.c.

/* Records the start of an asynchronous I/O operation by incrementing the
 * object's pending_count while holding the owning pool's lock. */
3279{
3280 struct threadpool_object *this = impl_from_TP_IO( io );
3281
3282 TRACE( "%p\n", io );
3283
3284 RtlEnterCriticalSection( &this->pool->cs );
3285
3286 this->u.io.pending_count++;
3287
3288 RtlLeaveCriticalSection( &this->pool->cs );
3289}

Referenced by init_threadpool().

◆ TpWaitForIoCompletion()

void WINAPI TpWaitForIoCompletion ( TP_IO * io,
BOOL  cancel_pending 
)

Definition at line 3294 of file threadpool.c.

/* Waits on an I/O completion object via tp_object_wait() (non-group wait).
 * When cancel_pending is set, tp_object_cancel() is called first. */
3295{
3296 struct threadpool_object *this = impl_from_TP_IO( io );
3297
3298 TRACE( "%p %d\n", io, cancel_pending );
3299
3300 if (cancel_pending)
3301 tp_object_cancel( this );
3302 tp_object_wait( this, FALSE );
3303}

Referenced by init_threadpool().

◆ TpWaitForTimer()

VOID WINAPI TpWaitForTimer ( TP_TIMER * timer,
BOOL  cancel_pending 
)

Definition at line 3308 of file threadpool.c.

/* Waits on a timer object via tp_object_wait() (non-group wait).
 * When cancel_pending is set, tp_object_cancel() is called first. */
3309{
3310 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3311
3312 TRACE( "%p %d\n", timer, cancel_pending );
3313
3314 if (cancel_pending)
3315 tp_object_cancel( this );
3316 tp_object_wait( this, FALSE );
3317}

Referenced by init_threadpool().

◆ TpWaitForWait()

VOID WINAPI TpWaitForWait ( TP_WAIT * wait,
BOOL  cancel_pending 
)

Definition at line 3322 of file threadpool.c.

/* Waits on a wait object via tp_object_wait() (non-group wait).
 * When cancel_pending is set, tp_object_cancel() is called first. */
3323{
3324 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3325
3326 TRACE( "%p %d\n", wait, cancel_pending );
3327
3328 if (cancel_pending)
3329 tp_object_cancel( this );
3330 tp_object_wait( this, FALSE );
3331}

Referenced by init_threadpool(), and RtlDeregisterWaitEx().

◆ TpWaitForWork()

VOID WINAPI TpWaitForWork ( TP_WORK * work,
BOOL  cancel_pending 
)

Definition at line 3336 of file threadpool.c.

/* Waits on a work object via tp_object_wait() (non-group wait).
 * When cancel_pending is set, tp_object_cancel() is called first. */
3337{
3338 struct threadpool_object *this = impl_from_TP_WORK( work );
3339
3340 TRACE( "%p %u\n", work, cancel_pending );
3341
3342 if (cancel_pending)
3343 tp_object_cancel( this );
3344 tp_object_wait( this, FALSE );
3345}

Referenced by init_threadpool().

◆ waitqueue_thread_proc()

static void CALLBACK waitqueue_thread_proc ( void * param)
static

Definition at line 1344 of file threadpool.c.

/* Thread procedure servicing one wait-queue bucket (passed as param).
 *
 * Each loop iteration, with waitqueue.cs held:
 *  - walks the bucket's waiting list; entries whose timeout has passed are
 *    dispatched (inline in this thread for WT_EXECUTEINWAITTHREAD /
 *    WT_EXECUTEINIOTHREAD, otherwise via tp_object_submit), all others are
 *    collected into handles[]/objects[] with a temporary reference and the
 *    earliest timeout is tracked;
 *  - if the bucket has no objects, waits only on update_event for
 *    THREADPOOL_WORKER_TIMEOUT and leaves the loop when that times out with
 *    the bucket still empty;
 *  - otherwise waits on all collected handles plus update_event, dispatches
 *    the object whose handle was signalled, and drops the temporary
 *    references;
 *  - finally tries to merge a sparsely used bucket into another bucket with
 *    the same alertable setting.
 * waitqueue.cs is released around every blocking wait and re-acquired
 * afterwards. On exit the bucket is unlinked, its event closed, its memory
 * freed, and the thread terminates. */
1346{
1347 struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
1348 HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
1349 struct waitqueue_bucket *bucket = param;
1350 struct threadpool_object *wait, *next;
1351 LARGE_INTEGER now, timeout;
1352 DWORD num_handles;
1353 NTSTATUS status;
1354
1355 TRACE( "starting wait queue thread\n" );
1356 set_thread_name(L"wine_threadpool_waitqueue");
1357
1358 RtlEnterCriticalSection( &waitqueue.cs );
1359
1360 for (;;)
1361 {
1362 NtQuerySystemTime( &now );
1363 timeout.QuadPart = MAXLONGLONG;
1364 num_handles = 0;
1365
1366 LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
1367 u.wait.wait_entry )
1368 {
1369 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1370 if (wait->u.wait.timeout <= now.QuadPart)
1371 {
1372 /* Wait object timed out. */
1373 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1374 {
1375 list_remove( &wait->u.wait.wait_entry );
1376 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1377 }
1378 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1379 {
1380 InterlockedIncrement( &wait->refcount );
1381 wait->num_pending_callbacks++;
1382 RtlEnterCriticalSection( &wait->pool->cs );
1383 tp_object_execute( wait, TRUE );
1384 RtlLeaveCriticalSection( &wait->pool->cs );
1385 tp_object_release( wait );
1386 }
1387 else tp_object_submit( wait, FALSE );
1388 }
1389 else
1390 {
1391 if (wait->u.wait.timeout < timeout.QuadPart)
1392 timeout.QuadPart = wait->u.wait.timeout;
1393
1394 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1395 InterlockedIncrement( &wait->refcount );
1396 objects[num_handles] = wait;
1397 handles[num_handles] = wait->u.wait.handle;
1398 num_handles++;
1399 }
1400 }
1401
1402 if (!bucket->objcount)
1403 {
1404 /* All wait objects have been destroyed, if no new wait objects are created
1405 * within some amount of time, then we can shutdown this thread. */
1406 assert( num_handles == 0 );
1407 RtlLeaveCriticalSection( &waitqueue.cs );
1408 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1409 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout );
1410 RtlEnterCriticalSection( &waitqueue.cs );
1411
1412 if (status == STATUS_TIMEOUT && !bucket->objcount)
1413 break;
1414 }
1415 else
1416 {
1417 handles[num_handles] = bucket->update_event;
1418 RtlLeaveCriticalSection( &waitqueue.cs );
1419 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout );
1420 RtlEnterCriticalSection( &waitqueue.cs );
1421
1422 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1423 {
1424 wait = objects[status - STATUS_WAIT_0];
1425 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1426 if (wait->u.wait.bucket)
1427 {
1428 /* Wait object signaled. */
1429 assert( wait->u.wait.bucket == bucket );
1430 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1431 {
1432 list_remove( &wait->u.wait.wait_entry );
1433 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1434 }
1435 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1436 {
1437 wait->u.wait.signaled++;
1438 wait->num_pending_callbacks++;
1439 RtlEnterCriticalSection( &wait->pool->cs );
1440 tp_object_execute( wait, TRUE );
1441 RtlLeaveCriticalSection( &wait->pool->cs );
1442 }
1443 else tp_object_submit( wait, TRUE );
1444 }
1445 else
1446 WARN("wait object %p triggered while object was destroyed\n", wait);
1447 }
1448
1449 /* Release temporary references to wait objects. */
1450 while (num_handles)
1451 {
1452 wait = objects[--num_handles];
1453 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1454 tp_object_release( wait );
1455 }
1456 }
1457
1458 /* Try to merge bucket with other threads. */
1459 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1460 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1461 {
1462 struct waitqueue_bucket *other_bucket;
1463 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1464 {
1465 if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable &&
1466 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1467 {
1468 other_bucket->objcount += bucket->objcount;
1469 bucket->objcount = 0;
1470
1471 /* Update reserved list. */
1472 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1473 {
1474 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1475 wait->u.wait.bucket = other_bucket;
1476 }
1477 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1478
1479 /* Update waiting list. */
1480 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1481 {
1482 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1483 wait->u.wait.bucket = other_bucket;
1484 }
1485 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1486
1487 /* Move bucket to the end, to keep the probability of
1488 * newly added wait objects as small as possible. */
1489 list_remove( &bucket->bucket_entry );
1490 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1491
1492 NtSetEvent( other_bucket->update_event, NULL );
1493 break;
1494 }
1495 }
1496 }
1497 }
1498
1499 /* Remove this bucket from the list. */
1500 list_remove( &bucket->bucket_entry );
1501 if (!--waitqueue.num_buckets)
1502 assert( list_empty( &waitqueue.buckets ) );
1503
1504 RtlLeaveCriticalSection( &waitqueue.cs );
1505
1506 TRACE( "terminating wait queue thread\n" );
1507
1508 assert( bucket->objcount == 0 );
1509 assert( list_empty( &bucket->reserved ) );
1510 assert( list_empty( &bucket->waiting ) );
1511 NtClose( bucket->update_event );
1512
1513 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1514 RtlExitUserThread( 0 );
1515}
#define WARN(fmt,...)
Definition: precomp.h:61
static const CLSID * objects[]
Definition: apphelp.c:112
NTSTATUS NTAPI NtWaitForMultipleObjects(IN ULONG ObjectCount, IN PHANDLE HandleArray, IN WAIT_TYPE WaitType, IN BOOLEAN Alertable, IN PLARGE_INTEGER TimeOut OPTIONAL)
Definition: obwait.c:46
struct waitqueue_bucket * bucket
Definition: threadpool.c:236
static EFI_HANDLE * handles
Definition: uefidisk.c:62

Referenced by tp_waitqueue_lock().

◆ WINE_DEFAULT_DEBUG_CHANNEL()

WINE_DEFAULT_DEBUG_CHANNEL ( threadpool  )

Variable Documentation

◆ buckets

struct list buckets

Definition at line 329 of file threadpool.c.

Referenced by add_hash(), delete_hash_entry(), hash_lookup(), and new_hash().

◆ compl_port

HANDLE compl_port

Definition at line 93 of file threadpool.c.

◆ critsect_compl_debug

static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
static
Initial value:
=
{
0, 0, &old_threadpool.threadpool_compl_cs,
0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
}
static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
Definition: threadpool.c:88

Definition at line 88 of file threadpool.c.

◆ cs

◆ default_threadpool

struct threadpool* default_threadpool = NULL
static

Definition at line 444 of file threadpool.c.

Referenced by tp_threadpool_lock(), and tp_threadpool_shutdown().

◆ 

struct { ... } ioqueue
Initial value:
=
{
.cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
}
static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
Definition: threadpool.c:363

Referenced by ioqueue_thread_proc(), tp_ioqueue_lock(), and tp_ioqueue_unlock().

◆ ioqueue_debug

static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
static
Initial value:
=
{
0, 0, &ioqueue.cs,
0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
}

Definition at line 363 of file threadpool.c.

◆ num_buckets

LONG num_buckets

Definition at line 328 of file threadpool.c.

Referenced by hash_table_hash(), and hash_table_init().

◆ objcount

LONG objcount

Definition at line 291 of file threadpool.c.

◆ 

struct { ... } old_threadpool
Initial value:
=
{
NULL,
{ &critsect_compl_debug, -1, 0, 0, 0, 0 },
}

Referenced by RtlSetIoCompletionCallback().

◆ pending_timers

struct list pending_timers

Definition at line 293 of file threadpool.c.

◆ port

HANDLE port

Definition at line 371 of file threadpool.c.

◆ thread_running

BOOL thread_running

Definition at line 292 of file threadpool.c.

◆ threadpool_compl_cs

RTL_CRITICAL_SECTION threadpool_compl_cs

Definition at line 94 of file threadpool.c.

◆ 

struct { ... } timerqueue
Initial value:
=
{
{ &timerqueue_debug, -1, 0, 0, 0, 0 },
0,
LIST_INIT( timerqueue.pending_timers ),
}
static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
Definition: threadpool.c:285
#define LIST_INIT(head)
Definition: queue.h:197
#define RTL_CONDITION_VARIABLE_INIT
Definition: rtltypes.h:282

Referenced by timerqueue_thread_proc(), tp_timerqueue_lock(), tp_timerqueue_unlock(), and TpSetTimer().

◆ timerqueue_debug

static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
static
Initial value:
=
{
0, 0, &timerqueue.cs,
0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
}

Definition at line 285 of file threadpool.c.

◆ update_event

RTL_CONDITION_VARIABLE update_event

Definition at line 294 of file threadpool.c.

◆ 

struct { ... } waitqueue
Initial value:
=
{
{ &waitqueue_debug, -1, 0, 0, 0, 0 },
0,
}
static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
Definition: threadpool.c:322

Referenced by RtlRegisterWait(), tp_object_execute(), tp_waitqueue_lock(), tp_waitqueue_unlock(), TpSetWait(), and waitqueue_thread_proc().

◆ waitqueue_debug

static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
static
Initial value:
=
{
0, 0, &waitqueue.cs,
0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
}

Definition at line 322 of file threadpool.c.