ReactOS 0.4.16-dev-983-g23ad936
threadpool.c File Reference
#include <assert.h>
#include <stdarg.h>
#include <limits.h>
#include "ntstatus.h"
#include "winternl.h"
#include "wine/debug.h"
#include "wine/list.h"
#include "ntdll_misc.h"
Include dependency graph for threadpool.c:

Go to the source code of this file.

Classes

struct  rtl_work_item
 
struct  queue_timer
 
struct  timer_queue
 
struct  threadpool
 
struct  io_completion
 
struct  threadpool_object
 
struct  threadpool_instance
 
struct  threadpool_group
 
struct  waitqueue_bucket
 

Macros

#define WIN32_NO_STATUS
 
#define EXPIRE_NEVER   (~(ULONGLONG)0)
 
#define TIMER_QUEUE_MAGIC   0x516d6954 /* TimQ */
 
#define THREADPOOL_WORKER_TIMEOUT   5000
 
#define MAXIMUM_WAITQUEUE_OBJECTS   (MAXIMUM_WAIT_OBJECTS - 1)
 

Enumerations

enum  threadpool_objtype {
  TP_OBJECT_TYPE_SIMPLE , TP_OBJECT_TYPE_WORK , TP_OBJECT_TYPE_TIMER , TP_OBJECT_TYPE_WAIT ,
  TP_OBJECT_TYPE_IO
}
 

Functions

 WINE_DEFAULT_DEBUG_CHANNEL (threadpool)
 
static struct threadpool * impl_from_TP_POOL (TP_POOL *pool)
 
static struct threadpool_object * impl_from_TP_WORK (TP_WORK *work)
 
static struct threadpool_object * impl_from_TP_TIMER (TP_TIMER *timer)
 
static struct threadpool_object * impl_from_TP_WAIT (TP_WAIT *wait)
 
static struct threadpool_object * impl_from_TP_IO (TP_IO *io)
 
static struct threadpool_group * impl_from_TP_CLEANUP_GROUP (TP_CLEANUP_GROUP *group)
 
static struct threadpool_instance * impl_from_TP_CALLBACK_INSTANCE (TP_CALLBACK_INSTANCE *instance)
 
static void CALLBACK threadpool_worker_proc (void *param)
 
static void tp_object_submit (struct threadpool_object *object, BOOL signaled)
 
static void tp_object_execute (struct threadpool_object *object, BOOL wait_thread)
 
static void tp_object_prepare_shutdown (struct threadpool_object *object)
 
static BOOL tp_object_release (struct threadpool_object *object)
 
static BOOL array_reserve (void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
 
static void set_thread_name (const WCHAR *name)
 
static void CALLBACK process_rtl_work_item (TP_CALLBACK_INSTANCE *instance, void *userdata)
 
NTSTATUS WINAPI RtlQueueWorkItem (PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags)
 
static DWORD CALLBACK iocp_poller (LPVOID Arg)
 
NTSTATUS WINAPI RtlSetIoCompletionCallback (HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
 
static PLARGE_INTEGER get_nt_timeout (PLARGE_INTEGER pTime, ULONG timeout)
 
static void queue_remove_timer (struct queue_timer *t)
 
static void timer_cleanup_callback (struct queue_timer *t)
 
static DWORD WINAPI timer_callback_wrapper (LPVOID p)
 
static ULONGLONG queue_current_time (void)
 
static void queue_add_timer (struct queue_timer *t, ULONGLONG time, BOOL set_event)
 
static void queue_move_timer (struct queue_timer *t, ULONGLONG time, BOOL set_event)
 
static void queue_timer_expire (struct timer_queue *q)
 
static ULONG queue_get_timeout (struct timer_queue *q)
 
static void WINAPI timer_queue_thread_proc (LPVOID p)
 
static void queue_destroy_timer (struct queue_timer *t)
 
NTSTATUS WINAPI RtlCreateTimerQueue (PHANDLE NewTimerQueue)
 
NTSTATUS WINAPI RtlDeleteTimerQueueEx (HANDLE TimerQueue, HANDLE CompletionEvent)
 
static struct timer_queue * get_timer_queue (HANDLE TimerQueue)
 
NTSTATUS WINAPI RtlCreateTimer (HANDLE TimerQueue, HANDLE *NewTimer, RTL_WAITORTIMERCALLBACKFUNC Callback, PVOID Parameter, DWORD DueTime, DWORD Period, ULONG Flags)
 
NTSTATUS WINAPI RtlUpdateTimer (HANDLE TimerQueue, HANDLE Timer, DWORD DueTime, DWORD Period)
 
NTSTATUS WINAPI RtlDeleteTimer (HANDLE TimerQueue, HANDLE Timer, HANDLE CompletionEvent)
 
static void CALLBACK timerqueue_thread_proc (void *param)
 
static NTSTATUS tp_new_worker_thread (struct threadpool *pool)
 
static NTSTATUS tp_timerqueue_lock (struct threadpool_object *timer)
 
static void tp_timerqueue_unlock (struct threadpool_object *timer)
 
static void CALLBACK waitqueue_thread_proc (void *param)
 
static NTSTATUS tp_waitqueue_lock (struct threadpool_object *wait)
 
static void tp_waitqueue_unlock (struct threadpool_object *wait)
 
static void CALLBACK ioqueue_thread_proc (void *param)
 
static NTSTATUS tp_ioqueue_lock (struct threadpool_object *io, HANDLE file)
 
static NTSTATUS tp_threadpool_alloc (struct threadpool **out)
 
static void tp_threadpool_shutdown (struct threadpool *pool)
 
static BOOL tp_threadpool_release (struct threadpool *pool)
 
static NTSTATUS tp_threadpool_lock (struct threadpool **out, TP_CALLBACK_ENVIRON *environment)
 
static void tp_threadpool_unlock (struct threadpool *pool)
 
static NTSTATUS tp_group_alloc (struct threadpool_group **out)
 
static void tp_group_shutdown (struct threadpool_group *group)
 
static BOOL tp_group_release (struct threadpool_group *group)
 
static void tp_object_initialize (struct threadpool_object *object, struct threadpool *pool, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
 
static void tp_object_prio_queue (struct threadpool_object *object)
 
static void tp_object_cancel (struct threadpool_object *object)
 
static BOOL object_is_finished (struct threadpool_object *object, BOOL group)
 
static void tp_object_wait (struct threadpool_object *object, BOOL group_wait)
 
static void tp_ioqueue_unlock (struct threadpool_object *io)
 
static struct list * threadpool_get_next_item (const struct threadpool *pool)
 
NTSTATUS WINAPI TpAllocCleanupGroup (TP_CLEANUP_GROUP **out)
 
NTSTATUS WINAPI TpAllocIoCompletion (TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback, void *userdata, TP_CALLBACK_ENVIRON *environment)
 
NTSTATUS WINAPI TpAllocPool (TP_POOL **out, PVOID reserved)
 
NTSTATUS WINAPI TpAllocTimer (TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
 
static NTSTATUS tp_alloc_wait (TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment, DWORD flags)
 
NTSTATUS WINAPI TpAllocWait (TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
 
NTSTATUS WINAPI TpAllocWork (TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
 
void WINAPI TpCancelAsyncIoOperation (TP_IO *io)
 
VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion (TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit)
 
NTSTATUS WINAPI TpCallbackMayRunLong (TP_CALLBACK_INSTANCE *instance)
 
VOID WINAPI TpCallbackReleaseMutexOnCompletion (TP_CALLBACK_INSTANCE *instance, HANDLE mutex)
 
VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion (TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count)
 
VOID WINAPI TpCallbackSetEventOnCompletion (TP_CALLBACK_INSTANCE *instance, HANDLE event)
 
VOID WINAPI TpCallbackUnloadDllOnCompletion (TP_CALLBACK_INSTANCE *instance, HMODULE module)
 
VOID WINAPI TpDisassociateCallback (TP_CALLBACK_INSTANCE *instance)
 
BOOL WINAPI TpIsTimerSet (TP_TIMER *timer)
 
VOID WINAPI TpPostWork (TP_WORK *work)
 
VOID WINAPI TpReleaseCleanupGroup (TP_CLEANUP_GROUP *group)
 
VOID WINAPI TpReleaseCleanupGroupMembers (TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata)
 
void WINAPI TpReleaseIoCompletion (TP_IO *io)
 
VOID WINAPI TpReleasePool (TP_POOL *pool)
 
VOID WINAPI TpReleaseTimer (TP_TIMER *timer)
 
VOID WINAPI TpReleaseWait (TP_WAIT *wait)
 
VOID WINAPI TpReleaseWork (TP_WORK *work)
 
VOID WINAPI TpSetPoolMaxThreads (TP_POOL *pool, DWORD maximum)
 
BOOL WINAPI TpSetPoolMinThreads (TP_POOL *pool, DWORD minimum)
 
VOID WINAPI TpSetTimer (TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length)
 
VOID WINAPI TpSetWait (TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout)
 
NTSTATUS WINAPI TpSimpleTryPost (PTP_SIMPLE_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
 
void WINAPI TpStartAsyncIoOperation (TP_IO *io)
 
void WINAPI TpWaitForIoCompletion (TP_IO *io, BOOL cancel_pending)
 
VOID WINAPI TpWaitForTimer (TP_TIMER *timer, BOOL cancel_pending)
 
VOID WINAPI TpWaitForWait (TP_WAIT *wait, BOOL cancel_pending)
 
VOID WINAPI TpWaitForWork (TP_WORK *work, BOOL cancel_pending)
 
NTSTATUS WINAPI TpSetPoolStackInformation (TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info)
 
NTSTATUS WINAPI TpQueryPoolStackInformation (TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info)
 
static void CALLBACK rtl_wait_callback (TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result)
 
NTSTATUS WINAPI RtlRegisterWait (HANDLE *out, HANDLE handle, RTL_WAITORTIMERCALLBACKFUNC callback, void *context, ULONG milliseconds, ULONG flags)
 
NTSTATUS WINAPI RtlDeregisterWaitEx (HANDLE handle, HANDLE event)
 
NTSTATUS WINAPI RtlDeregisterWait (HANDLE WaitHandle)
 

Variables

static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
 
struct {
   HANDLE   compl_port
 
   RTL_CRITICAL_SECTION   threadpool_compl_cs
 
} old_threadpool
 
static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
 
struct {
   CRITICAL_SECTION   cs
 
   LONG   objcount
 
   BOOL   thread_running
 
   struct list   pending_timers
 
   RTL_CONDITION_VARIABLE   update_event
 
} timerqueue
 
static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
 
struct {
   CRITICAL_SECTION   cs
 
   LONG   num_buckets
 
   struct list   buckets
 
} waitqueue
 
static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
 
struct {
   CRITICAL_SECTION   cs
 
   LONG   objcount
 
   BOOL   thread_running
 
   HANDLE   port
 
   RTL_CONDITION_VARIABLE   update_event
 
} ioqueue
 
static struct threadpool * default_threadpool = NULL
 

Macro Definition Documentation

◆ EXPIRE_NEVER

#define EXPIRE_NEVER   (~(ULONGLONG)0)

Definition at line 84 of file threadpool.c.

◆ MAXIMUM_WAITQUEUE_OBJECTS

#define MAXIMUM_WAITQUEUE_OBJECTS   (MAXIMUM_WAIT_OBJECTS - 1)

Definition at line 145 of file threadpool.c.

◆ THREADPOOL_WORKER_TIMEOUT

#define THREADPOOL_WORKER_TIMEOUT   5000

Definition at line 144 of file threadpool.c.

◆ TIMER_QUEUE_MAGIC

#define TIMER_QUEUE_MAGIC   0x516d6954 /* TimQ */

Definition at line 85 of file threadpool.c.

◆ WIN32_NO_STATUS

#define WIN32_NO_STATUS

Definition at line 63 of file threadpool.c.

Enumeration Type Documentation

◆ threadpool_objtype

Enumerator
TP_OBJECT_TYPE_SIMPLE 
TP_OBJECT_TYPE_WORK 
TP_OBJECT_TYPE_TIMER 
TP_OBJECT_TYPE_WAIT 
TP_OBJECT_TYPE_IO 

Definition at line 166 of file threadpool.c.

167{
173};
@ TP_OBJECT_TYPE_SIMPLE
Definition: threadpool.c:168
@ TP_OBJECT_TYPE_WORK
Definition: threadpool.c:169
@ TP_OBJECT_TYPE_TIMER
Definition: threadpool.c:170
@ TP_OBJECT_TYPE_IO
Definition: threadpool.c:172
@ TP_OBJECT_TYPE_WAIT
Definition: threadpool.c:171

Function Documentation

◆ array_reserve()

static BOOL array_reserve ( void **  elements,
unsigned int * capacity,
unsigned int  count,
unsigned int  size 
)
static

Definition at line 446 of file threadpool.c.

447{
448 unsigned int new_capacity, max_capacity;
449 void *new_elements;
450
451 if (count <= *capacity)
452 return TRUE;
453
454 max_capacity = ~(SIZE_T)0 / size;
455 if (count > max_capacity)
456 return FALSE;
457
458 new_capacity = max(4, *capacity);
459 while (new_capacity < count && new_capacity <= max_capacity / 2)
460 new_capacity *= 2;
461 if (new_capacity < count)
462 new_capacity = max_capacity;
463
464 if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size )))
465 return FALSE;
466
467 *elements = new_elements;
468 *capacity = new_capacity;
469
470 return TRUE;
471}
#define TRUE
Definition: types.h:120
#define FALSE
Definition: types.h:117
#define GetProcessHeap()
Definition: compat.h:736
GLuint GLuint GLsizei count
Definition: gl.h:1545
GLsizeiptr size
Definition: glext.h:5919
NTSYSAPI PVOID WINAPI RtlReAllocateHeap(HANDLE, ULONG, PVOID, SIZE_T)
Definition: heap.c:2686
#define max(a, b)
Definition: svc.c:63
ULONG_PTR SIZE_T
Definition: typedefs.h:80

Referenced by ioqueue_thread_proc().

◆ get_nt_timeout()

static PLARGE_INTEGER get_nt_timeout ( PLARGE_INTEGER  pTime,
ULONG  timeout 
)
inline static

Definition at line 632 of file threadpool.c.

633{
634 if (timeout == INFINITE) return NULL;
635 pTime->QuadPart = (ULONGLONG)timeout * -10000;
636 return pTime;
637}
#define NULL
Definition: types.h:112
#define INFINITE
Definition: serial.h:102
Definition: dhcpd.h:245
uint64_t ULONGLONG
Definition: typedefs.h:67
_In_ PCCERT_CONTEXT _In_opt_ LPFILETIME pTime
Definition: wincrypt.h:4837

Referenced by RtlRegisterWait(), and timer_queue_thread_proc().

◆ get_timer_queue()

static struct timer_queue * get_timer_queue ( HANDLE  TimerQueue)
static

Definition at line 957 of file threadpool.c.

958{
959 static struct timer_queue *default_timer_queue;
960
961 if (TimerQueue)
962 return TimerQueue;
963 else
964 {
965 if (!default_timer_queue)
966 {
967 HANDLE q;
969 if (status == STATUS_SUCCESS)
970 {
971 PVOID p = InterlockedCompareExchangePointer( (void **) &default_timer_queue, q, NULL );
972 if (p)
973 /* Got beat to the punch. */
975 }
976 }
977 return default_timer_queue;
978 }
979}
LONG NTSTATUS
Definition: precomp.h:26
GLdouble GLdouble GLdouble GLdouble q
Definition: gl.h:2063
GLfloat GLfloat p
Definition: glext.h:8902
#define InterlockedCompareExchangePointer
Definition: interlocked.h:129
NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
Definition: threadpool.c:867
NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
Definition: threadpool.c:913
#define STATUS_SUCCESS
Definition: shellext.h:65
Definition: ps.c:97

Referenced by RtlCreateTimer().

◆ impl_from_TP_CALLBACK_INSTANCE()

◆ impl_from_TP_CLEANUP_GROUP()

static struct threadpool_group * impl_from_TP_CLEANUP_GROUP ( TP_CLEANUP_GROUP * group)
inline static

Definition at line 425 of file threadpool.c.

426{
427 return (struct threadpool_group *)group;
428}
GLboolean GLuint group
Definition: glext.h:11120

Referenced by tp_object_initialize(), TpReleaseCleanupGroup(), and TpReleaseCleanupGroupMembers().

◆ impl_from_TP_IO()

static struct threadpool_object * impl_from_TP_IO ( TP_IO * io)
inline static

Definition at line 418 of file threadpool.c.

419{
420 struct threadpool_object *object = (struct threadpool_object *)io;
421 assert( object->type == TP_OBJECT_TYPE_IO );
422 return object;
423}
#define assert(x)
Definition: debug.h:53
static HANDLE PIO_APC_ROUTINE PVOID PIO_STATUS_BLOCK io
Definition: file.c:100

Referenced by TpCancelAsyncIoOperation(), TpReleaseIoCompletion(), TpStartAsyncIoOperation(), and TpWaitForIoCompletion().

◆ impl_from_TP_POOL()

static struct threadpool * impl_from_TP_POOL ( TP_POOL * pool)
inline static

◆ impl_from_TP_TIMER()

static struct threadpool_object * impl_from_TP_TIMER ( TP_TIMER * timer)
inline static

Definition at line 404 of file threadpool.c.

405{
406 struct threadpool_object *object = (struct threadpool_object *)timer;
408 return object;
409}
struct threadpool_object::@5143::@5146 timer

Referenced by TpIsTimerSet(), TpReleaseTimer(), TpSetTimer(), and TpWaitForTimer().

◆ impl_from_TP_WAIT()

static struct threadpool_object * impl_from_TP_WAIT ( TP_WAIT * wait)
inline static

Definition at line 411 of file threadpool.c.

412{
413 struct threadpool_object *object = (struct threadpool_object *)wait;
415 return object;
416}
struct threadpool_object::@5143::@5147 wait

Referenced by rtl_wait_callback(), RtlRegisterWait(), TpReleaseWait(), TpSetWait(), and TpWaitForWait().

◆ impl_from_TP_WORK()

static struct threadpool_object * impl_from_TP_WORK ( TP_WORK * work)
inline static

Definition at line 397 of file threadpool.c.

398{
399 struct threadpool_object *object = (struct threadpool_object *)work;
401 return object;
402}
struct threadpool_object::@5143::@5145 work

Referenced by TpPostWork(), TpReleaseWork(), and TpWaitForWork().

◆ iocp_poller()

static DWORD CALLBACK iocp_poller ( LPVOID  Arg)
static

Definition at line 544 of file threadpool.c.

545{
546 HANDLE cport = Arg;
547
548 while( TRUE )
549 {
553#ifdef __REACTOS__
555#else
557#endif
558 if (res)
559 {
560 ERR("NtRemoveIoCompletion failed: 0x%lx\n", res);
561 }
562 else
563 {
564 DWORD transferred = 0;
565 DWORD err = 0;
566
568 transferred = iosb.Information;
569 else
571
572 callback( err, transferred, overlapped );
573 }
574 }
575 return 0;
576}
#define ERR(fmt,...)
Definition: precomp.h:57
unsigned long DWORD
Definition: ntddk_ex.h:95
GLuint res
Definition: glext.h:9613
VOID(CALLBACK * PRTL_OVERLAPPED_COMPLETION_ROUTINE)(DWORD, DWORD, LPVOID)
Definition: winternl.h:2016
NTSYSAPI ULONG WINAPI RtlNtStatusToDosError(NTSTATUS)
NTSTATUS NTAPI NtRemoveIoCompletion(IN HANDLE IoCompletionHandle, OUT PVOID *KeyContext, OUT PVOID *ApcContext, OUT PIO_STATUS_BLOCK IoStatusBlock, IN PLARGE_INTEGER Timeout OPTIONAL)
Definition: iocomp.c:445
static IPrintDialogCallback callback
Definition: printdlg.c:326
static PIO_STATUS_BLOCK iosb
Definition: file.c:98
#define err(...)
namespace GUID const ADDRINFOEXW ADDRINFOEXW struct timeval OVERLAPPED * overlapped
Definition: sock.c:81
uint32_t * PULONG_PTR
Definition: typedefs.h:65

Referenced by RtlSetIoCompletionCallback().

◆ ioqueue_thread_proc()

static void CALLBACK ioqueue_thread_proc ( void * param)
static

Definition at line 1618 of file threadpool.c.

1620{
1621 struct io_completion *completion;
1622 struct threadpool_object *io;
1624#ifdef __REACTOS__
1625 PVOID key, value;
1626#else
1628#endif
1629 BOOL destroy, skip;
1631
1632 TRACE( "starting I/O completion thread\n" );
1633 set_thread_name(L"wine_threadpool_ioqueue");
1634
1636
1637 for (;;)
1638 {
1640 if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
1641 ERR("NtRemoveIoCompletion failed, status %#lx.\n", status);
1643
1644 destroy = skip = FALSE;
1645 io = (struct threadpool_object *)key;
1646
1647 TRACE( "io %p, iosb.Status %#lx.\n", io, iosb.Status );
1648
1649 if (io && (io->shutdown || io->u.io.shutting_down))
1650 {
1651 RtlEnterCriticalSection( &io->pool->cs );
1652 if (!io->u.io.pending_count)
1653 {
1654 if (io->u.io.skipped_count)
1655 --io->u.io.skipped_count;
1656
1657 if (io->u.io.skipped_count)
1658 skip = TRUE;
1659 else
1660 destroy = TRUE;
1661 }
1662 RtlLeaveCriticalSection( &io->pool->cs );
1663 if (skip) continue;
1664 }
1665
1666 if (destroy)
1667 {
1668 --ioqueue.objcount;
1669 TRACE( "Releasing io %p.\n", io );
1670 io->shutdown = TRUE;
1672 }
1673 else if (io)
1674 {
1675 RtlEnterCriticalSection( &io->pool->cs );
1676
1677 TRACE( "pending_count %u.\n", io->u.io.pending_count );
1678
1679 if (io->u.io.pending_count)
1680 {
1681 --io->u.io.pending_count;
1682 if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
1683 io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
1684 {
1685 ERR( "Failed to allocate memory.\n" );
1686 RtlLeaveCriticalSection( &io->pool->cs );
1687 continue;
1688 }
1689
1690 completion = &io->u.io.completions[io->u.io.completion_count++];
1691 completion->iosb = iosb;
1692#ifdef __REACTOS__
1693 completion->cvalue = (ULONG_PTR)value;
1694#else
1695 completion->cvalue = value;
1696#endif
1697
1699 }
1700 RtlLeaveCriticalSection( &io->pool->cs );
1701 }
1702
1703 if (!ioqueue.objcount)
1704 {
1705 /* All I/O objects have been destroyed; if no new objects are
1706 * created within some amount of time, then we can shutdown this
1707 * thread. */
1708 LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
1709 if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
1710 &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
1711 break;
1712 }
1713 }
1714
1715 ioqueue.thread_running = FALSE;
1717
1718 TRACE( "terminating I/O completion thread\n" );
1719
1720 RtlExitUserThread( 0 );
1721
1722#ifdef __REACTOS__
1723 return STATUS_SUCCESS;
1724#endif
1725}
void CALLBACK completion(DWORD dwError, DWORD cbTransferred, LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags)
Definition: WSARecv.c:16
void destroy(_Tp *__pointer)
Definition: _construct.h:278
#define skip(...)
Definition: atltest.h:64
#define STATUS_TIMEOUT
Definition: d3dkmdt.h:49
#define ULONG_PTR
Definition: config.h:101
unsigned int BOOL
Definition: ntddk_ex.h:94
NTSYSAPI NTSTATUS NTAPI RtlSleepConditionVariableCS(_Inout_ PRTL_CONDITION_VARIABLE ConditionVariable, _Inout_ PRTL_CRITICAL_SECTION CriticalSection, _In_opt_ PLARGE_INTEGER TimeOut)
NTSYSAPI NTSTATUS NTAPI RtlEnterCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSYSAPI VOID NTAPI RtlExitUserThread(_In_ NTSTATUS Status)
NTSYSAPI NTSTATUS NTAPI RtlLeaveCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
#define L(x)
Definition: ntvdm.h:50
static BOOL tp_object_release(struct threadpool_object *object)
Definition: threadpool.c:2255
static void set_thread_name(const WCHAR *name)
Definition: threadpool.c:473
static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
Definition: threadpool.c:446
static void tp_object_submit(struct threadpool_object *object, BOOL signaled)
Definition: threadpool.c:2122
#define THREADPOOL_WORKER_TIMEOUT
Definition: threadpool.c:144
static struct @5142 ioqueue
#define TRACE(s)
Definition: solgame.cpp:4
Definition: copy.c:22
uint32_t ULONG_PTR
Definition: typedefs.h:65
Definition: pdh_main.c:96

Referenced by tp_ioqueue_lock().

◆ object_is_finished()

static BOOL object_is_finished ( struct threadpool_object * object,
BOOL  group 
)
static

Definition at line 2187 of file threadpool.c.

2188{
2189 if (object->num_pending_callbacks)
2190 return FALSE;
2191 if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
2192 return FALSE;
2193
2194 if (group)
2195 return !object->num_running_callbacks;
2196 else
2197 return !object->num_associated_callbacks;
2198}

Referenced by tp_object_execute(), tp_object_wait(), TpCancelAsyncIoOperation(), and TpDisassociateCallback().

◆ process_rtl_work_item()

static void CALLBACK process_rtl_work_item ( TP_CALLBACK_INSTANCE * instance,
void * userdata 
)
static

Definition at line 484 of file threadpool.c.

485{
486 struct rtl_work_item *item = userdata;
487
488 TRACE("executing %p(%p)\n", item->function, item->context);
489 item->function( item->context );
490
492}
BOOLEAN NTAPI RtlFreeHeap(IN PVOID HeapHandle, IN ULONG Flags, IN PVOID HeapBase)
Definition: heap.c:634
static ATOM item
Definition: dde.c:856

Referenced by RtlQueueWorkItem().

◆ queue_add_timer()

static void queue_add_timer ( struct queue_timer * t,
ULONGLONG  time,
BOOL  set_event 
)
static

Definition at line 689 of file threadpool.c.

691{
692 /* We MUST hold the queue cs while calling this function. */
693 struct timer_queue *q = t->q;
694 struct list *ptr = &q->timers;
695
696 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
697
698 if (time != EXPIRE_NEVER)
699 LIST_FOR_EACH(ptr, &q->timers)
700 {
701 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
702 if (time < cur->expire)
703 break;
704 }
705 list_add_before(ptr, &t->entry);
706
707 t->expire = time;
708
709 /* If we insert at the head of the list, we need to expire sooner
710 than expected. */
711 if (set_event && &t->entry == list_head(&q->timers))
712 NtSetEvent(q->event, NULL);
713}
Definition: list.h:37
FxCollectionEntry * cur
GLdouble GLdouble t
Definition: gl.h:2047
uint32_t entry
Definition: isohybrid.c:63
__u16 time
Definition: mkdosfs.c:8
static PVOID ptr
Definition: dispmode.c:27
NTSTATUS NTAPI NtSetEvent(IN HANDLE EventHandle, OUT PLONG PreviousState OPTIONAL)
Definition: event.c:455
#define LIST_FOR_EACH(cursor, list)
Definition: list.h:188
__WINE_SERVER_LIST_INLINE void list_add_before(struct list *elem, struct list *to_add)
Definition: list.h:87
#define EXPIRE_NEVER
Definition: threadpool.c:84
Definition: list.h:15
ULONGLONG expire
Definition: threadpool.c:125
#define LIST_ENTRY(type)
Definition: queue.h:175

Referenced by queue_move_timer(), and RtlCreateTimer().

◆ queue_current_time()

static ULONGLONG queue_current_time ( void  )
inline static

Definition at line 682 of file threadpool.c.

683{
684 LARGE_INTEGER now, freq;
686 return now.QuadPart * 1000 / freq.QuadPart;
687}
time_t now
Definition: finger.c:65
NTSTATUS NTAPI NtQueryPerformanceCounter(OUT PLARGE_INTEGER PerformanceCounter, OUT PLARGE_INTEGER PerformanceFrequency OPTIONAL)
Definition: profile.c:278
LONGLONG QuadPart
Definition: typedefs.h:114

Referenced by queue_get_timeout(), queue_timer_expire(), RtlCreateTimer(), and RtlUpdateTimer().

◆ queue_destroy_timer()

static void queue_destroy_timer ( struct queue_timer * t)
static

Definition at line 840 of file threadpool.c.

841{
842 /* We MUST hold the queue cs while calling this function. */
843 t->destroy = TRUE;
844 if (t->runcount == 0)
845 /* Ensure a timer is promptly removed. If callbacks are pending,
846 it will be removed after the last one finishes by the callback
847 cleanup wrapper. */
849 else
850 /* Make sure no destroyed timer masks an active timer at the head
851 of the sorted list. */
853}
static void queue_move_timer(struct queue_timer *t, ULONGLONG time, BOOL set_event)
Definition: threadpool.c:715
static void queue_remove_timer(struct queue_timer *t)
Definition: threadpool.c:641

Referenced by RtlDeleteTimer(), and RtlDeleteTimerQueueEx().

◆ queue_get_timeout()

static ULONG queue_get_timeout ( struct timer_queue * q)
static

Definition at line 768 of file threadpool.c.

769{
770 struct queue_timer *t;
772
774 if (list_head(&q->timers))
775 {
776 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
777 assert(!t->destroy || t->expire == EXPIRE_NEVER);
778
779 if (t->expire != EXPIRE_NEVER)
780 {
782 timeout = t->expire < time ? 0 : t->expire - time;
783 }
784 }
786
787 return timeout;
788}
static ULONGLONG queue_current_time(void)
Definition: threadpool.c:682
uint32_t ULONG
Definition: typedefs.h:59

Referenced by timer_queue_thread_proc().

◆ queue_move_timer()

static void queue_move_timer ( struct queue_timer * t,
ULONGLONG  time,
BOOL  set_event 
)
inline static

Definition at line 715 of file threadpool.c.

717{
718 /* We MUST hold the queue cs while calling this function. */
719 list_remove(&t->entry);
720 queue_add_timer(t, time, set_event);
721}
static void list_remove(struct list_entry *entry)
Definition: list.h:90
static void queue_add_timer(struct queue_timer *t, ULONGLONG time, BOOL set_event)
Definition: threadpool.c:689

Referenced by queue_destroy_timer(), queue_timer_expire(), and RtlUpdateTimer().

◆ queue_remove_timer()

static void queue_remove_timer ( struct queue_timer * t)
static

Definition at line 641 of file threadpool.c.

642{
643 /* We MUST hold the queue cs while calling this function. This ensures
644 that we cannot queue another callback for this timer. The runcount
645 being zero makes sure we don't have any already queued. */
646 struct timer_queue *q = t->q;
647
648 assert(t->runcount == 0);
649 assert(t->destroy);
650
651 list_remove(&t->entry);
652 if (t->event)
653 NtSetEvent(t->event, NULL);
655
656 if (q->quit && list_empty(&q->timers))
657 NtSetEvent(q->event, NULL);
658}
static int list_empty(struct list_entry *head)
Definition: list.h:58

Referenced by queue_destroy_timer(), and timer_cleanup_callback().

◆ queue_timer_expire()

static void queue_timer_expire ( struct timer_queue * q)
static

Definition at line 723 of file threadpool.c.

724{
725 struct queue_timer *t = NULL;
726
728 if (list_head(&q->timers))
729 {
731 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
732 if (!t->destroy && t->expire <= ((now = queue_current_time())))
733 {
734 ++t->runcount;
735 if (t->period)
736 {
737 next = t->expire + t->period;
738 /* avoid trigger cascade if overloaded / hibernated */
739 if (next < now)
740 next = now + t->period;
741 }
742 else
745 }
746 else
747 t = NULL;
748 }
750
751 if (t)
752 {
753 if (t->flags & WT_EXECUTEINTIMERTHREAD)
755 else
756 {
758 = (t->flags
762 if (status != STATUS_SUCCESS)
764 }
765 }
766}
GLbitfield flags
Definition: glext.h:7161
NTSYSAPI NTSTATUS NTAPI RtlQueueWorkItem(_In_ WORKERCALLBACKFUNC Function, _In_opt_ PVOID Context, _In_ ULONG Flags)
static unsigned __int64 next
Definition: rand_nt.c:6
static DWORD WINAPI timer_callback_wrapper(LPVOID p)
Definition: threadpool.c:674
static void timer_cleanup_callback(struct queue_timer *t)
Definition: threadpool.c:660
#define WT_TRANSFER_IMPERSONATION
Definition: winnt_old.h:1074
#define WT_EXECUTEINPERSISTENTTHREAD
Definition: winnt_old.h:1073
#define WT_EXECUTEINIOTHREAD
Definition: winnt_old.h:1066
#define WT_EXECUTELONGFUNCTION
Definition: winnt_old.h:1070
#define WT_EXECUTEINTIMERTHREAD
Definition: winnt_old.h:1071

Referenced by timer_queue_thread_proc().

◆ rtl_wait_callback()

static void CALLBACK rtl_wait_callback ( TP_CALLBACK_INSTANCE * instance,
void * userdata,
TP_WAIT * wait,
TP_WAIT_RESULT  result 
)
static

Definition at line 3383 of file threadpool.c.

3384{
3385 struct threadpool_object *object = impl_from_TP_WAIT(wait);
3386 object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 );
3387}
GLuint64EXT * result
Definition: glext.h:11304
#define STATUS_WAIT_0
Definition: ntstatus.h:237
static struct threadpool_object * impl_from_TP_WAIT(TP_WAIT *wait)
Definition: threadpool.c:411

Referenced by RtlRegisterWait().

◆ RtlCreateTimer()

NTSTATUS WINAPI RtlCreateTimer ( HANDLE  TimerQueue,
HANDLE * NewTimer,
RTL_WAITORTIMERCALLBACKFUNC  Callback,
PVOID  Parameter,
DWORD  DueTime,
DWORD  Period,
ULONG  Flags 
)

Definition at line 1006 of file threadpool.c.

1010{
1012 struct queue_timer *t;
1013 struct timer_queue *q = get_timer_queue(TimerQueue);
1014
1015 if (!q) return STATUS_NO_MEMORY;
1016 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
1017
1018 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
1019 if (!t)
1020 return STATUS_NO_MEMORY;
1021
1022 t->q = q;
1023 t->runcount = 0;
1024 t->callback = Callback;
1025 t->param = Parameter;
1026 t->period = Period;
1027 t->flags = Flags;
1028 t->destroy = FALSE;
1029 t->event = NULL;
1030
1033 if (q->quit)
1035 else
1038
1039 if (status == STATUS_SUCCESS)
1040 *NewTimer = t;
1041 else
1043
1044 return status;
1045}
PVOID NTAPI RtlAllocateHeap(IN PVOID HeapHandle, IN ULONG Flags, IN SIZE_T Size)
Definition: heap.c:616
#define STATUS_INVALID_HANDLE
Definition: d3dkmdt.h:40
#define STATUS_NO_MEMORY
Definition: d3dkmdt.h:51
#define TIMER_QUEUE_MAGIC
Definition: threadpool.c:85
static struct timer_queue * get_timer_queue(HANDLE TimerQueue)
Definition: threadpool.c:957
_In_ WDFINTERRUPT _In_ PFN_WDF_INTERRUPT_SYNCHRONIZE Callback
Definition: wdfinterrupt.h:458
_In_ WDFTIMER _In_ LONGLONG DueTime
Definition: wdftimer.h:190
_Must_inspect_result_ _In_ ULONG Flags
Definition: wsk.h:170
_In_ LARGE_INTEGER _In_ ULONG Period
Definition: kefuncs.h:1313
_Inout_opt_ PVOID Parameter
Definition: rtltypes.h:336

◆ RtlCreateTimerQueue()

NTSTATUS WINAPI RtlCreateTimerQueue ( PHANDLE  NewTimerQueue)

Definition at line 867 of file threadpool.c.

868{
870 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
871 if (!q)
872 return STATUS_NO_MEMORY;
873
875 list_init(&q->timers);
876 q->quit = FALSE;
877 q->magic = TIMER_QUEUE_MAGIC;
879 if (status != STATUS_SUCCESS)
880 {
882 return status;
883 }
885 timer_queue_thread_proc, q, &q->thread, NULL);
886 if (status != STATUS_SUCCESS)
887 {
888 NtClose(q->event);
890 return status;
891 }
892
893 *NewTimerQueue = q;
894 return STATUS_SUCCESS;
895}
static void list_init(struct list_entry *head)
Definition: list.h:51
#define GetCurrentProcess()
Definition: compat.h:759
#define EVENT_ALL_ACCESS
Definition: isotest.c:82
NTSYSAPI NTSTATUS NTAPI RtlCreateUserThread(_In_ PVOID ThreadContext, _Out_ HANDLE *OutThreadHandle, _Reserved_ PVOID Reserved1, _Reserved_ PVOID Reserved2, _Reserved_ PVOID Reserved3, _Reserved_ PVOID Reserved4, _Reserved_ PVOID Reserved5, _Reserved_ PVOID Reserved6, _Reserved_ PVOID Reserved7, _Reserved_ PVOID Reserved8)
NTSYSAPI NTSTATUS NTAPI RtlInitializeCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSTATUS NTAPI NtClose(IN HANDLE Handle)
Definition: obhandle.c:3402
@ SynchronizationEvent
NTSTATUS NTAPI NtCreateEvent(OUT PHANDLE EventHandle, IN ACCESS_MASK DesiredAccess, IN POBJECT_ATTRIBUTES ObjectAttributes OPTIONAL, IN EVENT_TYPE EventType, IN BOOLEAN InitialState)
Definition: event.c:96
static void WINAPI timer_queue_thread_proc(LPVOID p)
Definition: threadpool.c:793

Referenced by BasepCreateDefaultTimerQueue(), CreateTimerQueue(), and get_timer_queue().

◆ RtlDeleteTimer()

NTSTATUS WINAPI RtlDeleteTimer ( HANDLE  TimerQueue,
HANDLE  Timer,
HANDLE  CompletionEvent 
)

Definition at line 1101 of file threadpool.c.

1103{
1104 struct queue_timer *t = Timer;
1105 struct timer_queue *q;
1107 HANDLE event = NULL;
1108
1109 if (!Timer)
1111 q = t->q;
1112 if (CompletionEvent == INVALID_HANDLE_VALUE)
1113 {
1115 if (status == STATUS_SUCCESS)
1117 }
1118 else if (CompletionEvent)
1119 event = CompletionEvent;
1120
1122 t->event = event;
1123 if (t->runcount == 0 && event)
1127
1128 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1129 {
1130 if (status == STATUS_PENDING)
1131 {
1134 }
1135 NtClose(event);
1136 }
1137
1138 return status;
1139}
#define STATUS_PENDING
Definition: d3dkmdt.h:43
#define INVALID_HANDLE_VALUE
Definition: compat.h:731
struct _cl_event * event
Definition: glext.h:7739
NTSYSAPI NTSTATUS NTAPI NtWaitForSingleObject(IN HANDLE hObject, IN BOOLEAN bAlertable, IN PLARGE_INTEGER Timeout)
#define STATUS_INVALID_PARAMETER_1
Definition: ntstatus.h:475
static void queue_destroy_timer(struct queue_timer *t)
Definition: threadpool.c:840

◆ RtlDeleteTimerQueueEx()

NTSTATUS WINAPI RtlDeleteTimerQueueEx ( HANDLE  TimerQueue,
HANDLE  CompletionEvent 
)

Definition at line 913 of file threadpool.c.

914{
915 struct timer_queue *q = TimerQueue;
916 struct queue_timer *t, *temp;
919
920 if (!q || q->magic != TIMER_QUEUE_MAGIC)
922
923 thread = q->thread;
924
926 q->quit = TRUE;
927 if (list_head(&q->timers))
928 /* When the last timer is removed, it will signal the timer thread to
929 exit... */
930 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
932 else
933 /* However if we have none, we must do it ourselves. */
934 NtSetEvent(q->event, NULL);
936
937 if (CompletionEvent == INVALID_HANDLE_VALUE)
938 {
941 }
942 else
943 {
944 if (CompletionEvent)
945 {
946 FIXME("asynchronous return on completion event unimplemented\n");
948 NtSetEvent(CompletionEvent, NULL);
949 }
951 }
952
954 return status;
955}
#define FIXME(fmt,...)
Definition: precomp.h:53
static HANDLE thread
Definition: service.c:33
static calc_node_t temp
Definition: rpn_ieee.c:38
#define LIST_FOR_EACH_ENTRY_SAFE(cursor, cursor2, list, type, field)
Definition: list.h:204

Referenced by get_timer_queue().

◆ RtlDeregisterWait()

NTSTATUS WINAPI RtlDeregisterWait ( HANDLE  WaitHandle)

Definition at line 3501 of file threadpool.c.

3502{
3503 return RtlDeregisterWaitEx(WaitHandle, NULL);
3504}
NTSYSAPI NTSTATUS NTAPI RtlDeregisterWaitEx(_In_ HANDLE hWaitHandle, _In_opt_ HANDLE hCompletionEvent)

◆ RtlDeregisterWaitEx()

NTSTATUS WINAPI RtlDeregisterWaitEx ( HANDLE  handle,
HANDLE  event 
)

Definition at line 3460 of file threadpool.c.

3461{
3462 struct threadpool_object *object = handle;
3464
3465 TRACE( "handle %p, event %p\n", handle, event );
3466
3467 if (!object) return STATUS_INVALID_HANDLE;
3468
3469 TpSetWait( (TP_WAIT *)object, NULL, NULL );
3470
3471 if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE );
3472 else
3473 {
3474 assert( object->completed_event == NULL );
3475 object->completed_event = event;
3476 }
3477
3478 RtlEnterCriticalSection( &object->pool->cs );
3479 if (object->num_pending_callbacks + object->num_running_callbacks
3480 + object->num_associated_callbacks) status = STATUS_PENDING;
3481 else status = STATUS_SUCCESS;
3482 RtlLeaveCriticalSection( &object->pool->cs );
3483
3484 TpReleaseWait( (TP_WAIT *)object );
3485 return status;
3486}
VOID WINAPI TpWaitForWait(TP_WAIT *wait, BOOL cancel_pending)
Definition: threadpool.c:3320
VOID WINAPI TpReleaseWait(TP_WAIT *wait)
Definition: threadpool.c:3044
VOID WINAPI TpSetWait(TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout)
Definition: threadpool.c:3194
struct _TP_WAIT TP_WAIT
Definition: winnt_old.h:4492

◆ RtlQueueWorkItem()

NTSTATUS WINAPI RtlQueueWorkItem ( PRTL_WORK_ITEM_ROUTINE  function,
PVOID  context,
ULONG  flags 
)

Definition at line 516 of file threadpool.c.

517{
518 TP_CALLBACK_ENVIRON environment;
519 struct rtl_work_item *item;
521
522 TRACE( "%p %p %lu\n", function, context, flags );
523
524 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
525 if (!item)
526 return STATUS_NO_MEMORY;
527
528 memset( &environment, 0, sizeof(environment) );
529 environment.Version = 1;
530 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
531 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
532
533 item->function = function;
534 item->context = context;
535
538 return status;
539}
#define memset(x, y, z)
Definition: compat.h:39
static void CALLBACK process_rtl_work_item(TP_CALLBACK_INSTANCE *instance, void *userdata)
Definition: threadpool.c:484
NTSTATUS WINAPI TpSimpleTryPost(PTP_SIMPLE_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
Definition: threadpool.c:3246
struct _TP_CALLBACK_ENVIRON_V3::@4347::@4348 s
union _TP_CALLBACK_ENVIRON_V3::@4347 u
Definition: http.c:7252
PVOID context
Definition: threadpool.c:81
PRTL_WORK_ITEM_ROUTINE function
Definition: threadpool.c:80

◆ RtlRegisterWait()

NTSTATUS WINAPI RtlRegisterWait ( HANDLE * out,
HANDLE  handle,
RTL_WAITORTIMERCALLBACKFUNC  callback,
void * context,
ULONG  milliseconds,
ULONG  flags 
)

Definition at line 3414 of file threadpool.c.

3416{
3417 struct threadpool_object *object;
3418 TP_CALLBACK_ENVIRON environment;
3421 TP_WAIT *wait;
3422
3423 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %lu, flags %lx\n",
3424 out, handle, callback, context, milliseconds, flags );
3425
3426 memset( &environment, 0, sizeof(environment) );
3427 environment.Version = 1;
3428 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
3429 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
3430
3432 if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags )))
3433 return status;
3434
3435 object = impl_from_TP_WAIT(wait);
3436 object->u.wait.rtl_callback = callback;
3437
3439 TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) );
3440
3441 *out = object;
3443
3444 return STATUS_SUCCESS;
3445}
static PLARGE_INTEGER get_nt_timeout(PLARGE_INTEGER pTime, ULONG timeout)
Definition: threadpool.c:632
static struct @5141 waitqueue
static NTSTATUS tp_alloc_wait(TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment, DWORD flags)
Definition: threadpool.c:2641
static void CALLBACK rtl_wait_callback(TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result)
Definition: threadpool.c:3383
ULONGLONG timeout
Definition: threadpool.c:227
wchar_t tm const _CrtWcstime_Writes_and_advances_ptr_ count wchar_t ** out
Definition: wcsftime.cpp:383
#define WT_EXECUTEONLYONCE
Definition: winnt_old.h:1069
#define WT_EXECUTEINWAITTHREAD
Definition: winnt_old.h:1068

◆ RtlSetIoCompletionCallback()

NTSTATUS WINAPI RtlSetIoCompletionCallback ( HANDLE  FileHandle,
PRTL_OVERLAPPED_COMPLETION_ROUTINE  Function,
ULONG  Flags 
)

Definition at line 594 of file threadpool.c.

595{
598
599 if (Flags) FIXME("Unknown value Flags=0x%lx\n", Flags);
600
601 if (!old_threadpool.compl_port)
602 {
604
605 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
606 if (!old_threadpool.compl_port)
607 {
608 HANDLE cport;
609
611 if (!res)
612 {
613 /* FIXME native can start additional threads in case of e.g. hung callback function. */
615 if (!res)
616 old_threadpool.compl_port = cport;
617 else
618 NtClose( cport );
619 }
620 }
621 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
622 if (res) return res;
623 }
624
625 info.CompletionPort = old_threadpool.compl_port;
626 info.CompletionKey = (ULONG_PTR)Function;
627
629}
_In_ CDROM_SCAN_FOR_SPECIAL_INFO _In_ PCDROM_SCAN_FOR_SPECIAL_HANDLER Function
Definition: cdrom.h:1156
_Must_inspect_result_ _In_opt_ PFLT_INSTANCE _Out_ PHANDLE FileHandle
Definition: fltkernel.h:1231
@ FileCompletionInformation
Definition: from_kernel.h:91
NTSTATUS NTAPI NtCreateIoCompletion(OUT PHANDLE IoCompletionHandle, IN ACCESS_MASK DesiredAccess, IN POBJECT_ATTRIBUTES ObjectAttributes, IN ULONG NumberOfConcurrentThreads)
Definition: iocomp.c:253
#define IO_COMPLETION_ALL_ACCESS
Definition: file.c:72
NTSYSAPI NTSTATUS NTAPI NtSetInformationFile(IN HANDLE hFile, OUT PIO_STATUS_BLOCK pIoStatusBlock, IN PVOID FileInformationBuffer, IN ULONG FileInformationBufferLength, IN FILE_INFORMATION_CLASS FileInfoClass)
Definition: iofunc.c:3096
static DWORD CALLBACK iocp_poller(LPVOID Arg)
Definition: threadpool.c:544
static struct @5139 old_threadpool
#define WT_EXECUTEDEFAULT
Definition: winnt_old.h:1065

◆ RtlUpdateTimer()

NTSTATUS WINAPI RtlUpdateTimer ( HANDLE  TimerQueue,
HANDLE  Timer,
DWORD  DueTime,
DWORD  Period 
)

Definition at line 1065 of file threadpool.c.

1067{
1068 struct queue_timer *t = Timer;
1069 struct timer_queue *q = t->q;
1070
1072 /* Can't change a timer if it was once-only or destroyed. */
1073 if (t->expire != EXPIRE_NEVER)
1074 {
1075 t->period = Period;
1077 }
1079
1080 return STATUS_SUCCESS;
1081}

◆ set_thread_name()

static void set_thread_name ( const WCHAR * name)
static

Definition at line 473 of file threadpool.c.

474{
475#ifndef __REACTOS__ // This is impossible on non vista+
476 THREAD_NAME_INFORMATION info;
477
478 RtlInitUnicodeString(&info.ThreadName, name);
479 NtSetInformationThread(GetCurrentThread(), ThreadNameInformation, &info, sizeof(info));
480#endif
481}
NTSYSAPI VOID NTAPI RtlInitUnicodeString(PUNICODE_STRING DestinationString, PCWSTR SourceString)
NTSTATUS NTAPI NtSetInformationThread(IN HANDLE ThreadHandle, IN THREADINFOCLASS ThreadInformationClass, IN PVOID ThreadInformation, IN ULONG ThreadInformationLength)
Definition: query.c:2067
Definition: name.c:39
HANDLE WINAPI GetCurrentThread(void)
Definition: proc.c:1148

Referenced by ioqueue_thread_proc(), threadpool_worker_proc(), timer_queue_thread_proc(), timerqueue_thread_proc(), and waitqueue_thread_proc().

◆ threadpool_get_next_item()

static struct list * threadpool_get_next_item ( const struct threadpool * pool)
static

Definition at line 2295 of file threadpool.c.

2296{
2297 struct list *ptr;
2298 unsigned int i;
2299
2300 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
2301 {
2302 if ((ptr = list_head( &pool->pools[i] )))
2303 break;
2304 }
2305
2306 return ptr;
2307}
#define ARRAY_SIZE(A)
Definition: main.h:20
GLsizei GLenum const GLvoid GLsizei GLenum GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLint GLint GLint GLshort GLshort GLshort GLubyte GLubyte GLubyte GLuint GLuint GLuint GLushort GLushort GLushort GLbyte GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLfloat GLint GLint GLint GLint GLshort GLshort GLshort GLshort GLubyte GLubyte GLubyte GLubyte GLuint GLuint GLuint GLuint GLushort GLushort GLushort GLushort GLboolean const GLdouble const GLfloat const GLint const GLshort const GLbyte const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLdouble const GLfloat const GLfloat const GLint const GLint const GLshort const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort GLenum GLenum GLenum GLfloat GLenum GLint GLenum GLenum GLenum GLfloat GLenum GLenum GLint GLenum GLfloat GLenum GLint GLint GLushort GLenum GLenum GLfloat GLenum GLenum GLint GLfloat const GLubyte GLenum GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLint GLint GLsizei GLsizei GLint GLenum GLenum const GLvoid GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLenum const GLdouble GLenum GLenum const GLfloat GLenum GLenum const GLint GLsizei GLuint GLfloat GLuint GLbitfield GLfloat GLint GLuint GLboolean GLenum GLfloat GLenum GLbitfield GLenum GLfloat GLfloat GLint GLint const GLfloat GLenum GLfloat GLfloat GLint GLint GLfloat GLfloat GLint GLint const GLfloat GLint GLfloat GLfloat GLint 
GLfloat GLfloat GLint GLfloat GLfloat const GLdouble const GLfloat const GLdouble const GLfloat GLint i
Definition: glfuncs.h:248

Referenced by threadpool_worker_proc().

◆ threadpool_worker_proc()

static void CALLBACK threadpool_worker_proc ( void * param)
static

Definition at line 2474 of file threadpool.c.

2476{
2477 struct threadpool *pool = param;
2479 struct list *ptr;
2480
2481 TRACE( "starting worker thread for pool %p\n", pool );
2482 set_thread_name(L"wine_threadpool_worker");
2483
2485 for (;;)
2486 {
2487 while ((ptr = threadpool_get_next_item( pool )))
2488 {
2489 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2490 assert( object->num_pending_callbacks > 0 );
2491
2492 /* If further pending callbacks are queued, move the work item to
2493 * the end of the pool list. Otherwise remove it from the pool. */
2494 list_remove( &object->pool_entry );
2495 if (object->num_pending_callbacks > 1)
2496 tp_object_prio_queue( object );
2497
2498 tp_object_execute( object, FALSE );
2499
2500 assert(pool->num_busy_workers);
2501 pool->num_busy_workers--;
2502
2503 tp_object_release( object );
2504 }
2505
2506 /* Shutdown worker thread if requested. */
2507 if (pool->shutdown)
2508 break;
2509
2510 /* Wait for new tasks or until the timeout expires. A thread only terminates
2511 * when no new tasks are available, and the number of threads can be
2512 * decreased without violating the min_workers limit. An exception is when
2513 * min_workers == 0, then objcount is used to detect if the last thread
2514 * can be terminated. */
2515 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2516 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2517 !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2518 (!pool->min_workers && !pool->objcount)))
2519 {
2520 break;
2521 }
2522 }
2523 pool->num_workers--;
2525
2526 TRACE( "terminating worker thread for pool %p\n", pool );
2528 RtlExitUserThread( 0 );
2529#ifdef __REACTOS__
2530 return STATUS_SUCCESS;
2531#endif
2532}
GLfloat param
Definition: glext.h:5796
static struct list * threadpool_get_next_item(const struct threadpool *pool)
Definition: threadpool.c:2295
static BOOL tp_threadpool_release(struct threadpool *pool)
Definition: threadpool.c:1848
static void tp_object_execute(struct threadpool_object *object, BOOL wait_thread)
Definition: threadpool.c:2315
static void tp_object_prio_queue(struct threadpool_object *object)
Definition: threadpool.c:2110
struct list pool_entry
Definition: threadpool.c:201

Referenced by tp_new_worker_thread().

◆ timer_callback_wrapper()

static DWORD WINAPI timer_callback_wrapper ( LPVOID  p)
static

Definition at line 674 of file threadpool.c.

675{
676 struct queue_timer *t = p;
677 t->callback(t->param, TRUE);
679 return 0;
680}

Referenced by queue_timer_expire().

◆ timer_cleanup_callback()

static void timer_cleanup_callback ( struct queue_timer * t)
static

Definition at line 660 of file threadpool.c.

661{
662 struct timer_queue *q = t->q;
664
665 assert(0 < t->runcount);
666 --t->runcount;
667
668 if (t->destroy && t->runcount == 0)
670
672}

Referenced by queue_timer_expire(), and timer_callback_wrapper().

◆ timer_queue_thread_proc()

static void WINAPI timer_queue_thread_proc ( LPVOID  p)
static

Definition at line 793 of file threadpool.c.

795{
796 struct timer_queue *q = p;
797 ULONG timeout_ms;
798
799 set_thread_name(L"wine_threadpool_timer_queue");
800 timeout_ms = INFINITE;
801 for (;;)
802 {
805 BOOL done = FALSE;
806
808 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
809
810 if (status == STATUS_WAIT_0)
811 {
812 /* There are two possible ways to trigger the event. Either
813 we are quitting and the last timer got removed, or a new
814 timer got put at the head of the list so we need to adjust
815 our timeout. */
817 if (q->quit && list_empty(&q->timers))
818 done = TRUE;
820 }
821 else if (status == STATUS_TIMEOUT)
823
824 if (done)
825 break;
826
827 timeout_ms = queue_get_timeout(q);
828 }
829
830 NtClose(q->event);
832 q->magic = 0;
835#ifdef __REACTOS__
836 return STATUS_SUCCESS;
837#endif
838}
NTSYSAPI NTSTATUS NTAPI RtlDeleteCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
static ULONG queue_get_timeout(struct timer_queue *q)
Definition: threadpool.c:768
static void queue_timer_expire(struct timer_queue *q)
Definition: threadpool.c:723

Referenced by RtlCreateTimerQueue().

◆ timerqueue_thread_proc()

static void CALLBACK timerqueue_thread_proc ( void * param)
static

Definition at line 1147 of file threadpool.c.

1149{
1150 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1151 struct threadpool_object *other_timer;
1153 struct list *ptr;
1154
1155 TRACE( "starting timer queue thread\n" );
1156 set_thread_name(L"wine_threadpool_timerqueue");
1157
1159 for (;;)
1160 {
1162
1163 /* Check for expired timers. */
1164 while ((ptr = list_head( &timerqueue.pending_timers )))
1165 {
1166 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1167 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1168 assert( timer->u.timer.timer_pending );
1169 if (timer->u.timer.timeout > now.QuadPart)
1170 break;
1171
1172 /* Queue a new callback in one of the worker threads. */
1173 list_remove( &timer->u.timer.timer_entry );
1174 timer->u.timer.timer_pending = FALSE;
1176
1177 /* Insert the timer back into the queue, except it's marked for shutdown. */
1178 if (timer->u.timer.period && !timer->shutdown)
1179 {
1180 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1181 if (timer->u.timer.timeout <= now.QuadPart)
1182 timer->u.timer.timeout = now.QuadPart + 1;
1183
1184 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1185 struct threadpool_object, u.timer.timer_entry )
1186 {
1187 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1188 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1189 break;
1190 }
1191 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1192 timer->u.timer.timer_pending = TRUE;
1193 }
1194 }
1195
1196 timeout_lower = timeout_upper = MAXLONGLONG;
1197
1198 /* Determine next timeout and use the window length to optimize wakeup times. */
1199 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1200 struct threadpool_object, u.timer.timer_entry )
1201 {
1202 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1203 if (other_timer->u.timer.timeout >= timeout_upper)
1204 break;
1205
1206 timeout_lower = other_timer->u.timer.timeout;
1207 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1208 if (new_timeout < timeout_upper)
1209 timeout_upper = new_timeout;
1210 }
1211
1212 /* Wait for timer update events or until the next timer expires. */
1213 if (timerqueue.objcount)
1214 {
1215 timeout.QuadPart = timeout_lower;
1217 continue;
1218 }
1219
1220 /* All timers have been destroyed, if no new timers are created
1221 * within some amount of time, then we can shutdown this thread. */
1222 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1223 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1224 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1225 {
1226 break;
1227 }
1228 }
1229
1230 timerqueue.thread_running = FALSE;
1232
1233 TRACE( "terminating timer queue thread\n" );
1234 RtlExitUserThread( 0 );
1235#ifdef __REACTOS__
1236 return STATUS_SUCCESS;
1237#endif
1238}
GLsizei GLenum const GLvoid GLsizei GLenum GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLint GLint GLint GLshort GLshort GLshort GLubyte GLubyte GLubyte GLuint GLuint GLuint GLushort GLushort GLushort GLbyte GLbyte GLbyte GLbyte GLdouble GLdouble GLdouble GLdouble GLfloat GLfloat GLfloat GLfloat GLint GLint GLint GLint GLshort GLshort GLshort GLshort GLubyte GLubyte GLubyte GLubyte GLuint GLuint GLuint GLuint GLushort GLushort GLushort GLushort GLboolean const GLdouble const GLfloat const GLint const GLshort const GLbyte const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLdouble const GLfloat const GLfloat const GLint const GLint const GLshort const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort const GLdouble const GLfloat const GLint const GLshort GLenum GLenum GLenum GLfloat GLenum GLint GLenum GLenum GLenum GLfloat GLenum GLenum GLint GLenum GLfloat GLenum GLint GLint GLushort GLenum GLenum GLfloat GLenum GLenum GLint GLfloat const GLubyte GLenum GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLint GLint GLsizei GLsizei GLint GLenum GLenum const GLvoid GLenum GLenum const GLfloat GLenum GLenum const GLint GLenum GLenum const GLdouble GLenum GLenum const GLfloat GLenum GLenum const GLint GLsizei GLuint GLfloat GLuint GLbitfield GLfloat GLint GLuint GLboolean GLenum GLfloat GLenum GLbitfield GLenum GLfloat GLfloat GLint GLint const GLfloat GLenum GLfloat GLfloat GLint GLint GLfloat GLfloat GLint GLint const GLfloat GLint GLfloat GLfloat GLint 
GLfloat GLfloat GLint GLfloat GLfloat const GLdouble * u
Definition: glfuncs.h:240
if(dx< 0)
Definition: linetemp.h:194
#define MAXLONGLONG
NTSTATUS NTAPI NtQuerySystemTime(OUT PLARGE_INTEGER SystemTime)
Definition: time.c:569
#define LIST_FOR_EACH_ENTRY(elem, list, type, field)
Definition: list.h:198
static struct @5140 timerqueue
enum threadpool_objtype type
Definition: threadpool.c:188
union threadpool_object::@5143 u

Referenced by tp_timerqueue_lock().

◆ tp_alloc_wait()

static NTSTATUS tp_alloc_wait ( TP_WAIT **  out,
PTP_WAIT_CALLBACK  callback,
PVOID  userdata,
TP_CALLBACK_ENVIRON * environment,
DWORD  flags 
)
static

Definition at line 2641 of file threadpool.c.

2643{
2644 struct threadpool_object *object;
2645 struct threadpool *pool;
2647
2648 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2649 if (!object)
2650 return STATUS_NO_MEMORY;
2651
2652 status = tp_threadpool_lock( &pool, environment );
2653 if (status)
2654 {
2655 RtlFreeHeap( GetProcessHeap(), 0, object );
2656 return status;
2657 }
2658
2659 object->type = TP_OBJECT_TYPE_WAIT;
2660 object->u.wait.callback = callback;
2661 object->u.wait.flags = flags;
2662
2663 status = tp_waitqueue_lock( object );
2664 if (status)
2665 {
2667 RtlFreeHeap( GetProcessHeap(), 0, object );
2668 return status;
2669 }
2670
2671 tp_object_initialize( object, pool, userdata, environment );
2672
2673 *out = (TP_WAIT *)object;
2674 return STATUS_SUCCESS;
2675}
static void tp_object_initialize(struct threadpool_object *object, struct threadpool *pool, PVOID userdata, TP_CALLBACK_ENVIRON *environment)
Definition: threadpool.c:2029
static NTSTATUS tp_waitqueue_lock(struct threadpool_object *wait)
Definition: threadpool.c:1518
static void tp_threadpool_unlock(struct threadpool *pool)
Definition: threadpool.c:1950
static NTSTATUS tp_threadpool_lock(struct threadpool **out, TP_CALLBACK_ENVIRON *environment)
Definition: threadpool.c:1877

Referenced by RtlRegisterWait(), and TpAllocWait().

◆ tp_group_alloc()

static NTSTATUS tp_group_alloc ( struct threadpool_group **  out)
static

Definition at line 1963 of file threadpool.c.

1964{
1965 struct threadpool_group *group;
1966
1967 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1968 if (!group)
1969 return STATUS_NO_MEMORY;
1970
1971 group->refcount = 1;
1972 group->shutdown = FALSE;
1973
1974#ifdef __REACTOS__
1976#else
1978
1979 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1980#endif
1981
1982 list_init( &group->members );
1983
1984 TRACE( "allocated group %p\n", group );
1985
1986 *out = group;
1987 return STATUS_SUCCESS;
1988}
NTSYSAPI NTSTATUS WINAPI RtlInitializeCriticalSectionEx(RTL_CRITICAL_SECTION *, ULONG, ULONG)
#define DWORD_PTR
Definition: treelist.c:76
#define RTL_CRITICAL_SECTION_FLAG_FORCE_DEBUG_INFO
Definition: winnt_old.h:1116

Referenced by TpAllocCleanupGroup().

◆ tp_group_release()

static BOOL tp_group_release ( struct threadpool_group * group)
static

Definition at line 2005 of file threadpool.c.

2006{
2007 if (InterlockedDecrement( &group->refcount ))
2008 return FALSE;
2009
2010 TRACE( "destroying group %p\n", group );
2011
2012 assert( group->shutdown );
2013 assert( list_empty( &group->members ) );
2014
2015#ifndef __REACTOS__
2016 group->cs.DebugInfo->Spare[0] = 0;
2017#endif
2019
2021 return TRUE;
2022}
#define InterlockedDecrement
Definition: armddk.h:52

Referenced by tp_object_release(), and TpReleaseCleanupGroup().

◆ tp_group_shutdown()

static void tp_group_shutdown ( struct threadpool_group * group)
static

Definition at line 1995 of file threadpool.c.

1996{
1997 group->shutdown = TRUE;
1998}

Referenced by TpReleaseCleanupGroup().

◆ tp_ioqueue_lock()

static NTSTATUS tp_ioqueue_lock ( struct threadpool_object * io,
HANDLE  file 
)
static

Definition at line 1727 of file threadpool.c.

1728{
1730
1731 assert( io->type == TP_OBJECT_TYPE_IO );
1732
1734
1735 if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
1737 {
1739 return status;
1740 }
1741
1742 if (!ioqueue.thread_running)
1743 {
1744 HANDLE thread;
1745
1747 0, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL )))
1748 {
1749 ioqueue.thread_running = TRUE;
1750 NtClose( thread );
1751 }
1752 }
1753
1754 if (status == STATUS_SUCCESS)
1755 {
1758
1759#ifdef __REACTOS__
1760 info.Port = ioqueue.port;
1761 info.Key = io;
1762#else
1763 info.CompletionPort = ioqueue.port;
1764 info.CompletionKey = (ULONG_PTR)io;
1765#endif
1766
1768 }
1769
1770 if (status == STATUS_SUCCESS)
1771 {
1772 if (!ioqueue.objcount++)
1773 RtlWakeConditionVariable( &ioqueue.update_event );
1774 }
1775
1777 return status;
1778}
NTSYSAPI VOID NTAPI RtlWakeConditionVariable(_Inout_ PRTL_CONDITION_VARIABLE ConditionVariable)
static void CALLBACK ioqueue_thread_proc(void *param)
Definition: threadpool.c:1618
Definition: fci.c:127

Referenced by TpAllocIoCompletion().

◆ tp_ioqueue_unlock()

static void tp_ioqueue_unlock ( struct threadpool_object * io)
static

Definition at line 2221 of file threadpool.c.

2222{
2223 assert( io->type == TP_OBJECT_TYPE_IO );
2224
2226
2227 assert(ioqueue.objcount);
2228
2229 if (!io->shutdown && !--ioqueue.objcount)
2230 NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 );
2231
2233}
NTSTATUS NTAPI NtSetIoCompletion(IN HANDLE IoCompletionPortHandle, IN PVOID CompletionKey, IN PVOID CompletionContext, IN NTSTATUS CompletionStatus, IN ULONG CompletionInformation)
Definition: iocomp.c:569

Referenced by tp_object_prepare_shutdown().

◆ tp_new_worker_thread()

static NTSTATUS tp_new_worker_thread ( struct threadpool * pool)
static

Definition at line 1245 of file threadpool.c.

1246{
1247 HANDLE thread;
1249
1251 pool->stack_info.StackReserve, pool->stack_info.StackCommit,
1253 if (status == STATUS_SUCCESS)
1254 {
1255 InterlockedIncrement( &pool->refcount );
1256 pool->num_workers++;
1257 NtClose( thread );
1258 }
1259 return status;
1260}
#define InterlockedIncrement
Definition: armddk.h:53
static void CALLBACK threadpool_worker_proc(void *param)
Definition: threadpool.c:2474

Referenced by tp_object_submit(), tp_threadpool_lock(), TpCallbackMayRunLong(), and TpSetPoolMinThreads().

◆ tp_object_cancel()

static void tp_object_cancel ( struct threadpool_object * object)
static

Definition at line 2161 of file threadpool.c.

2162{
2163 struct threadpool *pool = object->pool;
2164 LONG pending_callbacks = 0;
2165
2167 if (object->num_pending_callbacks)
2168 {
2169 pending_callbacks = object->num_pending_callbacks;
2170 object->num_pending_callbacks = 0;
2171 list_remove( &object->pool_entry );
2172
2173 if (object->type == TP_OBJECT_TYPE_WAIT)
2174 object->u.wait.signaled = 0;
2175 }
2176 if (object->type == TP_OBJECT_TYPE_IO)
2177 {
2178 object->u.io.skipped_count += object->u.io.pending_count;
2179 object->u.io.pending_count = 0;
2180 }
2182
2183 while (pending_callbacks--)
2184 tp_object_release( object );
2185}
long LONG
Definition: pedump.c:60

Referenced by TpReleaseCleanupGroupMembers(), TpWaitForIoCompletion(), TpWaitForTimer(), TpWaitForWait(), and TpWaitForWork().

◆ tp_object_execute()

static void tp_object_execute ( struct threadpool_object * object,
BOOL  wait_thread 
)
static

Definition at line 2315 of file threadpool.c.

2316{
2317 TP_CALLBACK_INSTANCE *callback_instance;
2320 struct threadpool *pool = object->pool;
2321 TP_WAIT_RESULT wait_result = 0;
2323
2324 object->num_pending_callbacks--;
2325
2326 /* For wait objects check if they were signaled or have timed out. */
2327 if (object->type == TP_OBJECT_TYPE_WAIT)
2328 {
2329 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2330 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2331 }
2332 else if (object->type == TP_OBJECT_TYPE_IO)
2333 {
2334 assert( object->u.io.completion_count );
2335 completion = object->u.io.completions[--object->u.io.completion_count];
2336 }
2337
2338 /* Leave critical section and do the actual callback. */
2339 object->num_associated_callbacks++;
2340 object->num_running_callbacks++;
2342 if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
2343
2344 /* Initialize threadpool instance struct. */
2345 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2346 instance.object = object;
2347 instance.threadid = GetCurrentThreadId();
2348 instance.associated = TRUE;
2349 instance.may_run_long = object->may_run_long;
2350 instance.cleanup.critical_section = NULL;
2351 instance.cleanup.mutex = NULL;
2352 instance.cleanup.semaphore = NULL;
2353 instance.cleanup.semaphore_count = 0;
2354 instance.cleanup.event = NULL;
2355 instance.cleanup.library = NULL;
2356
2357 switch (object->type)
2358 {
2360 {
2361 TRACE( "executing simple callback %p(%p, %p)\n",
2362 object->u.simple.callback, callback_instance, object->userdata );
2363 object->u.simple.callback( callback_instance, object->userdata );
2364 TRACE( "callback %p returned\n", object->u.simple.callback );
2365 break;
2366 }
2367
2369 {
2370 TRACE( "executing work callback %p(%p, %p, %p)\n",
2371 object->u.work.callback, callback_instance, object->userdata, object );
2372 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2373 TRACE( "callback %p returned\n", object->u.work.callback );
2374 break;
2375 }
2376
2378 {
2379 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2380 object->u.timer.callback, callback_instance, object->userdata, object );
2381 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2382 TRACE( "callback %p returned\n", object->u.timer.callback );
2383 break;
2384 }
2385
2387 {
2388 TRACE( "executing wait callback %p(%p, %p, %p, %lu)\n",
2389 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2390 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2391 TRACE( "callback %p returned\n", object->u.wait.callback );
2392 break;
2393 }
2394
2395 case TP_OBJECT_TYPE_IO:
2396 {
2397 TRACE( "executing I/O callback %p(%p, %p, %#Ix, %p, %p)\n",
2398 object->u.io.callback, callback_instance, object->userdata,
2399 completion.cvalue, &completion.iosb, (TP_IO *)object );
2400 object->u.io.callback( callback_instance, object->userdata,
2401 (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
2402 TRACE( "callback %p returned\n", object->u.io.callback );
2403 break;
2404 }
2405
2406 default:
2407 assert(0);
2408 break;
2409 }
2410
2411 /* Execute finalization callback. */
2412 if (object->finalization_callback)
2413 {
2414 TRACE( "executing finalization callback %p(%p, %p)\n",
2415 object->finalization_callback, callback_instance, object->userdata );
2416 object->finalization_callback( callback_instance, object->userdata );
2417 TRACE( "callback %p returned\n", object->finalization_callback );
2418 }
2419
2420 /* Execute cleanup tasks. */
2421 if (instance.cleanup.critical_section)
2422 {
2423 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2424 }
2425 if (instance.cleanup.mutex)
2426 {
2427 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2428 if (status != STATUS_SUCCESS) goto skip_cleanup;
2429 }
2430 if (instance.cleanup.semaphore)
2431 {
2432 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2433 if (status != STATUS_SUCCESS) goto skip_cleanup;
2434 }
2435 if (instance.cleanup.event)
2436 {
2437 status = NtSetEvent( instance.cleanup.event, NULL );
2438 if (status != STATUS_SUCCESS) goto skip_cleanup;
2439 }
2440 if (instance.cleanup.library)
2441 {
2442 LdrUnloadDll( instance.cleanup.library );
2443 }
2444
2445skip_cleanup:
2446 if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs );
2448
2449 /* Simple callbacks are automatically shutdown after execution. */
2450 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2451 {
2453 object->shutdown = TRUE;
2454 }
2455
2456 object->num_running_callbacks--;
2457 if (object_is_finished( object, TRUE ))
2458 RtlWakeAllConditionVariable( &object->group_finished_event );
2459
2460 if (instance.associated)
2461 {
2462 object->num_associated_callbacks--;
2463 if (object_is_finished( object, FALSE ))
2464 RtlWakeAllConditionVariable( &object->finished_event );
2465 }
2466}
#define WAIT_TIMEOUT
Definition: dderror.h:14
NTSTATUS NTAPI LdrUnloadDll(_In_ PVOID BaseAddress)
Definition: ldrapi.c:1291
NTSTATUS NTAPI NtReleaseMutant(IN HANDLE MutantHandle, IN PLONG PreviousCount OPTIONAL)
Definition: mutant.c:296
NTSYSAPI VOID NTAPI RtlWakeAllConditionVariable(_Inout_ PRTL_CONDITION_VARIABLE ConditionVariable)
static BOOL object_is_finished(struct threadpool_object *object, BOOL group)
Definition: threadpool.c:2187
static void tp_object_prepare_shutdown(struct threadpool_object *object)
Definition: threadpool.c:2240
NTSTATUS NTAPI NtReleaseSemaphore(IN HANDLE SemaphoreHandle, IN LONG ReleaseCount, OUT PLONG PreviousCount OPTIONAL)
Definition: sem.c:295
DWORD WINAPI GetCurrentThreadId(void)
Definition: thread.c:459
#define WAIT_OBJECT_0
Definition: winbase.h:432
DWORD TP_WAIT_RESULT
Definition: winnt_old.h:4495
struct _TP_WORK TP_WORK
Definition: winnt_old.h:4489
struct _TP_IO TP_IO
Definition: winnt_old.h:4493
struct _TP_TIMER TP_TIMER
Definition: winnt_old.h:4491
struct _TP_CALLBACK_INSTANCE TP_CALLBACK_INSTANCE
Definition: winnt_old.h:4490

Referenced by threadpool_worker_proc(), and waitqueue_thread_proc().

◆ tp_object_initialize()

static void tp_object_initialize ( struct threadpool_object * object,
struct threadpool * pool,
PVOID  userdata,
TP_CALLBACK_ENVIRON * environment
)
static

Definition at line 2029 of file threadpool.c.

2031{
2032 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
2033
2034 object->refcount = 1;
2035 object->shutdown = FALSE;
2036
2037 object->pool = pool;
2038 object->group = NULL;
2039 object->userdata = userdata;
2040 object->group_cancel_callback = NULL;
2041 object->finalization_callback = NULL;
2042 object->may_run_long = 0;
2043 object->race_dll = NULL;
2044 object->priority = TP_CALLBACK_PRIORITY_NORMAL;
2045
2046 memset( &object->group_entry, 0, sizeof(object->group_entry) );
2047 object->is_group_member = FALSE;
2048
2049 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
2050 RtlInitializeConditionVariable( &object->finished_event );
2051 RtlInitializeConditionVariable( &object->group_finished_event );
2052 object->completed_event = NULL;
2053 object->num_pending_callbacks = 0;
2054 object->num_running_callbacks = 0;
2055 object->num_associated_callbacks = 0;
2056
2057 if (environment)
2058 {
2059 if (environment->Version != 1 && environment->Version != 3)
2060 FIXME( "unsupported environment version %lu\n", environment->Version );
2061
2062 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
2063 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
2064 object->finalization_callback = environment->FinalizationCallback;
2065 object->may_run_long = environment->u.s.LongFunction != 0;
2066 object->race_dll = environment->RaceDll;
2067#ifndef __REACTOS__ //Windows 7 stuff
2068 if (environment->Version == 3)
2069 {
2070 TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
2071
2072 object->priority = environment_v3->CallbackPriority;
2073 assert( object->priority < ARRAY_SIZE(pool->pools) );
2074 }
2075#endif
2076 if (environment->ActivationContext)
2077 FIXME( "activation context not supported yet\n" );
2078
2079 if (environment->u.s.Persistent)
2080 FIXME( "persistent threads not supported yet\n" );
2081 }
2082
2083 if (object->race_dll)
2084 LdrAddRefDll( 0, object->race_dll );
2085
2086 TRACE( "allocated object %p of type %u\n", object, object->type );
2087
2088 /* For simple callbacks we have to run tp_object_submit before adding this object
2089 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
2090 * will be set, and tp_object_submit would fail with an assertion. */
2091
2092 if (is_simple_callback)
2093 tp_object_submit( object, FALSE );
2094
2095 if (object->group)
2096 {
2097 struct threadpool_group *group = object->group;
2098 InterlockedIncrement( &group->refcount );
2099
2101 list_add_tail( &group->members, &object->group_entry );
2102 object->is_group_member = TRUE;
2104 }
2105
2106 if (is_simple_callback)
2107 tp_object_release( object );
2108}
static void list_add_tail(struct list_entry *head, struct list_entry *entry)
Definition: list.h:83
NTSTATUS NTAPI LdrAddRefDll(_In_ ULONG Flags, _In_ PVOID BaseAddress)
Definition: ldrapi.c:1205
NTSYSAPI VOID NTAPI RtlInitializeConditionVariable(_Out_ PRTL_CONDITION_VARIABLE ConditionVariable)
static struct threadpool_group * impl_from_TP_CLEANUP_GROUP(TP_CLEANUP_GROUP *group)
Definition: threadpool.c:425
PTP_CLEANUP_GROUP CleanupGroup
Definition: winnt_old.h:4537
PTP_SIMPLE_CALLBACK FinalizationCallback
Definition: winnt_old.h:4541
TP_CALLBACK_PRIORITY CallbackPriority
Definition: winnt_old.h:4550
PTP_CLEANUP_GROUP_CANCEL_CALLBACK CleanupGroupCancelCallback
Definition: winnt_old.h:4538
struct _ACTIVATION_CONTEXT * ActivationContext
Definition: winnt_old.h:4540
@ TP_CALLBACK_PRIORITY_NORMAL
Definition: winnt_old.h:4500

Referenced by tp_alloc_wait(), TpAllocIoCompletion(), TpAllocTimer(), TpAllocWork(), and TpSimpleTryPost().

◆ tp_object_prepare_shutdown()

static void tp_object_prepare_shutdown ( struct threadpool_object * object)
static

Definition at line 2240 of file threadpool.c.

2241{
2242 if (object->type == TP_OBJECT_TYPE_TIMER)
2243 tp_timerqueue_unlock( object );
2244 else if (object->type == TP_OBJECT_TYPE_WAIT)
2245 tp_waitqueue_unlock( object );
2246 else if (object->type == TP_OBJECT_TYPE_IO)
2247 tp_ioqueue_unlock( object );
2248}
static void tp_waitqueue_unlock(struct threadpool_object *wait)
Definition: threadpool.c:1596
static void tp_ioqueue_unlock(struct threadpool_object *io)
Definition: threadpool.c:2221
static void tp_timerqueue_unlock(struct threadpool_object *timer)
Definition: threadpool.c:1310

Referenced by tp_object_execute(), TpReleaseCleanupGroupMembers(), TpReleaseIoCompletion(), TpReleaseTimer(), TpReleaseWait(), and TpReleaseWork().

◆ tp_object_prio_queue()

static void tp_object_prio_queue ( struct threadpool_object * object)
static

Definition at line 2110 of file threadpool.c.

2111{
2112 ++object->pool->num_busy_workers;
2113 list_add_tail( &object->pool->pools[object->priority], &object->pool_entry );
2114}

Referenced by threadpool_worker_proc(), and tp_object_submit().

◆ tp_object_release()

static BOOL tp_object_release ( struct threadpool_object * object)
static

Definition at line 2255 of file threadpool.c.

2256{
2257 if (InterlockedDecrement( &object->refcount ))
2258 return FALSE;
2259
2260 TRACE( "destroying object %p of type %u\n", object, object->type );
2261
2262 assert( object->shutdown );
2263 assert( !object->num_pending_callbacks );
2264 assert( !object->num_running_callbacks );
2265 assert( !object->num_associated_callbacks );
2266
2267 /* release reference to the group */
2268 if (object->group)
2269 {
2270 struct threadpool_group *group = object->group;
2271
2273 if (object->is_group_member)
2274 {
2275 list_remove( &object->group_entry );
2276 object->is_group_member = FALSE;
2277 }
2279
2281 }
2282
2284
2285 if (object->race_dll)
2286 LdrUnloadDll( object->race_dll );
2287
2288 if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE)
2289 NtSetEvent( object->completed_event, NULL );
2290
2291 RtlFreeHeap( GetProcessHeap(), 0, object );
2292 return TRUE;
2293}
static BOOL tp_group_release(struct threadpool_group *group)
Definition: threadpool.c:2005

Referenced by ioqueue_thread_proc(), threadpool_worker_proc(), tp_object_cancel(), tp_object_initialize(), TpReleaseCleanupGroupMembers(), TpReleaseIoCompletion(), TpReleaseTimer(), TpReleaseWait(), TpReleaseWork(), and waitqueue_thread_proc().

◆ tp_object_submit()

static void tp_object_submit ( struct threadpool_object * object,
BOOL  signaled 
)
static

Definition at line 2122 of file threadpool.c.

2123{
2124 struct threadpool *pool = object->pool;
2126
2127 assert( !object->shutdown );
2128 assert( !pool->shutdown );
2129
2131
2132 /* Start new worker threads if required. */
2133 if (pool->num_busy_workers >= pool->num_workers &&
2134 pool->num_workers < pool->max_workers)
2136
2137 /* Queue work item and increment refcount. */
2138 InterlockedIncrement( &object->refcount );
2139 if (!object->num_pending_callbacks++)
2140 tp_object_prio_queue( object );
2141
2142 /* Count how often the object was signaled. */
2143 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
2144 object->u.wait.signaled++;
2145
2146 /* No new thread started - wake up one existing thread. */
2147 if (status != STATUS_SUCCESS)
2148 {
2149 assert( pool->num_workers > 0 );
2150 RtlWakeConditionVariable( &pool->update_event );
2151 }
2152
2154}
static NTSTATUS tp_new_worker_thread(struct threadpool *pool)
Definition: threadpool.c:1245
#define STATUS_UNSUCCESSFUL
Definition: udferr_usr.h:132

Referenced by ioqueue_thread_proc(), timerqueue_thread_proc(), tp_object_initialize(), TpPostWork(), TpSetTimer(), and waitqueue_thread_proc().

◆ tp_object_wait()

static void tp_object_wait ( struct threadpool_object * object,
BOOL  group_wait 
)
static

Definition at line 2206 of file threadpool.c.

2207{
2208 struct threadpool *pool = object->pool;
2209
2211 while (!object_is_finished( object, group_wait ))
2212 {
2213 if (group_wait)
2214 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
2215 else
2216 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2217 }
2219}

Referenced by TpReleaseCleanupGroupMembers(), TpWaitForIoCompletion(), TpWaitForTimer(), TpWaitForWait(), and TpWaitForWork().

◆ tp_threadpool_alloc()

static NTSTATUS tp_threadpool_alloc ( struct threadpool **  out)
static

Definition at line 1785 of file threadpool.c.

1786{
1787#ifdef __REACTOS__
1788 IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->ProcessEnvironmentBlock->ImageBaseAddress );
1789#else
1791#endif
1792 struct threadpool *pool;
1793 unsigned int i;
1794
1795 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1796 if (!pool)
1797 return STATUS_NO_MEMORY;
1798
1799 pool->refcount = 1;
1800 pool->objcount = 0;
1801 pool->shutdown = FALSE;
1802
1803#ifdef __REACTOS__
1805#else
1807
1808 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1809#endif
1810
1811 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1812 list_init( &pool->pools[i] );
1813 RtlInitializeConditionVariable( &pool->update_event );
1814
1815 pool->max_workers = 500;
1816 pool->min_workers = 0;
1817 pool->num_workers = 0;
1818 pool->num_busy_workers = 0;
1819 pool->stack_info.StackReserve = nt->OptionalHeader.SizeOfStackReserve;
1820 pool->stack_info.StackCommit = nt->OptionalHeader.SizeOfStackCommit;
1821
1822 TRACE( "allocated threadpool %p\n", pool );
1823
1824 *out = pool;
1825 return STATUS_SUCCESS;
1826}
#define RtlImageNtHeader
Definition: compat.h:806
PPEB Peb
Definition: dllmain.c:27
#define NtCurrentTeb
IMAGE_NT_HEADERS nt
Definition: module.c:50
IMAGE_OPTIONAL_HEADER32 OptionalHeader
Definition: ntddk_ex.h:184
PVOID ImageBaseAddress
Definition: ntddk_ex.h:245

Referenced by tp_threadpool_lock(), and TpAllocPool().

◆ tp_threadpool_lock()

static NTSTATUS tp_threadpool_lock ( struct threadpool **  out,
TP_CALLBACK_ENVIRON * environment
)
static

Definition at line 1877 of file threadpool.c.

1878{
1879 struct threadpool *pool = NULL;
1881
1882 if (environment)
1883 {
1884#ifndef __REACTOS__ //Windows 7 stuff
1885 /* Validate environment parameters. */
1886 if (environment->Version == 3)
1887 {
1888 TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1889
1890 switch (environment3->CallbackPriority)
1891 {
1895 break;
1896 default:
1898 }
1899 }
1900#endif
1901 pool = (struct threadpool *)environment->Pool;
1902 }
1903
1904 if (!pool)
1905 {
1906 if (!default_threadpool)
1907 {
1909 if (status != STATUS_SUCCESS)
1910 return status;
1911
1913 {
1916 }
1917 }
1918
1920 }
1921
1923
1924 /* Make sure that the threadpool has at least one thread. */
1925 if (!pool->num_workers)
1927
1928 /* Keep a reference, and increment objcount to ensure that the
1929 * last thread doesn't terminate. */
1930 if (status == STATUS_SUCCESS)
1931 {
1932 InterlockedIncrement( &pool->refcount );
1933 pool->objcount++;
1934 }
1935
1937
1938 if (status != STATUS_SUCCESS)
1939 return status;
1940
1941 *out = pool;
1942 return STATUS_SUCCESS;
1943}
static struct threadpool * default_threadpool
Definition: threadpool.c:444
static NTSTATUS tp_threadpool_alloc(struct threadpool **out)
Definition: threadpool.c:1785
static void tp_threadpool_shutdown(struct threadpool *pool)
Definition: threadpool.c:1835
#define STATUS_INVALID_PARAMETER
Definition: udferr_usr.h:135
@ TP_CALLBACK_PRIORITY_HIGH
Definition: winnt_old.h:4499
@ TP_CALLBACK_PRIORITY_LOW
Definition: winnt_old.h:4501

Referenced by tp_alloc_wait(), TpAllocIoCompletion(), TpAllocTimer(), TpAllocWork(), and TpSimpleTryPost().

◆ tp_threadpool_release()

static BOOL tp_threadpool_release ( struct threadpool * pool)
static

Definition at line 1848 of file threadpool.c.

1849{
1850 unsigned int i;
1851
1852 if (InterlockedDecrement( &pool->refcount ))
1853 return FALSE;
1854
1855 TRACE( "destroying threadpool %p\n", pool );
1856
1857 assert( pool->shutdown );
1858 assert( !pool->objcount );
1859 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1860 assert( list_empty( &pool->pools[i] ) );
1861#ifndef __REACTOS__
1862 pool->cs.DebugInfo->Spare[0] = 0;
1863#endif
1865
1867 return TRUE;
1868}

Referenced by threadpool_worker_proc(), tp_threadpool_lock(), tp_threadpool_unlock(), and TpReleasePool().

◆ tp_threadpool_shutdown()

static void tp_threadpool_shutdown ( struct threadpool * pool)
static

Definition at line 1835 of file threadpool.c.

1836{
1838
1839 pool->shutdown = TRUE;
1840 RtlWakeAllConditionVariable( &pool->update_event );
1841}

Referenced by tp_threadpool_lock(), and TpReleasePool().

◆ tp_threadpool_unlock()

static void tp_threadpool_unlock ( struct threadpool * pool)
static

Definition at line 1950 of file threadpool.c.

1951{
1953 pool->objcount--;
1956}

Referenced by tp_alloc_wait(), tp_object_release(), TpAllocIoCompletion(), and TpAllocTimer().

◆ tp_timerqueue_lock()

static NTSTATUS tp_timerqueue_lock ( struct threadpool_object * timer)
static

Definition at line 1268 of file threadpool.c.

1269{
1271 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1272
1273 timer->u.timer.timer_initialized = FALSE;
1274 timer->u.timer.timer_pending = FALSE;
1275 timer->u.timer.timer_set = FALSE;
1276 timer->u.timer.timeout = 0;
1277 timer->u.timer.period = 0;
1278 timer->u.timer.window_length = 0;
1279
1281
1282 /* Make sure that the timerqueue thread is running. */
1283 if (!timerqueue.thread_running)
1284 {
1285 HANDLE thread;
1288 if (status == STATUS_SUCCESS)
1289 {
1290 timerqueue.thread_running = TRUE;
1291 NtClose( thread );
1292 }
1293 }
1294
1295 if (status == STATUS_SUCCESS)
1296 {
1297 timer->u.timer.timer_initialized = TRUE;
1298 timerqueue.objcount++;
1299 }
1300
1302 return status;
1303}
static void CALLBACK timerqueue_thread_proc(void *param)
Definition: threadpool.c:1147

Referenced by TpAllocTimer().

◆ tp_timerqueue_unlock()

static void tp_timerqueue_unlock ( struct threadpool_object * timer)
static

Definition at line 1310 of file threadpool.c.

1311{
1312 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1313
1315 if (timer->u.timer.timer_initialized)
1316 {
1317 /* If timer was pending, remove it. */
1318 if (timer->u.timer.timer_pending)
1319 {
1320 list_remove( &timer->u.timer.timer_entry );
1321 timer->u.timer.timer_pending = FALSE;
1322 }
1323
1324 /* If the last timer object was destroyed, then wake up the thread. */
1325 if (!--timerqueue.objcount)
1326 {
1327 assert( list_empty( &timerqueue.pending_timers ) );
1328 RtlWakeAllConditionVariable( &timerqueue.update_event );
1329 }
1330
1331 timer->u.timer.timer_initialized = FALSE;
1332 }
1334}

Referenced by tp_object_prepare_shutdown().

◆ tp_waitqueue_lock()

static NTSTATUS tp_waitqueue_lock ( struct threadpool_object * wait)
static

Definition at line 1518 of file threadpool.c.

1519{
1520 struct waitqueue_bucket *bucket;
1522 HANDLE thread;
1523 BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0;
1524 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1525
1526 wait->u.wait.signaled = 0;
1527 wait->u.wait.bucket = NULL;
1528 wait->u.wait.wait_pending = FALSE;
1529 wait->u.wait.timeout = 0;
1530 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1531
1533
1534 /* Try to assign to existing bucket if possible. */
1535 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1536 {
1537 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable)
1538 {
1539 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1540 wait->u.wait.bucket = bucket;
1541 bucket->objcount++;
1542
1544 goto out;
1545 }
1546 }
1547
1548 /* Create a new bucket and corresponding worker thread. */
1549 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1550 if (!bucket)
1551 {
1553 goto out;
1554 }
1555
1556 bucket->objcount = 0;
1557 bucket->alertable = alertable;
1558 list_init( &bucket->reserved );
1559 list_init( &bucket->waiting );
1560
1563 if (status)
1564 {
1565 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1566 goto out;
1567 }
1568
1571 if (status == STATUS_SUCCESS)
1572 {
1573 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1574 waitqueue.num_buckets++;
1575
1576 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1577 wait->u.wait.bucket = bucket;
1578 bucket->objcount++;
1579
1580 NtClose( thread );
1581 }
1582 else
1583 {
1584 NtClose( bucket->update_event );
1585 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1586 }
1587
1588out:
1590 return status;
1591}
ULONG(NTAPI * PTHREAD_START_ROUTINE)(PVOID Parameter)
Definition: rtltypes.h:560
#define MAXIMUM_WAITQUEUE_OBJECTS
Definition: threadpool.c:145
static void CALLBACK waitqueue_thread_proc(void *param)
Definition: threadpool.c:1342
struct list waiting
Definition: threadpool.c:356
HANDLE update_event
Definition: threadpool.c:357
struct list bucket_entry
Definition: threadpool.c:353
struct list reserved
Definition: threadpool.c:355

Referenced by tp_alloc_wait().

◆ tp_waitqueue_unlock()

static void tp_waitqueue_unlock ( struct threadpool_object * wait)
static

Definition at line 1596 of file threadpool.c.

1597{
1598 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1599
1601 if (wait->u.wait.bucket)
1602 {
1603 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1604 assert( bucket->objcount > 0 );
1605
1606 list_remove( &wait->u.wait.wait_entry );
1607 wait->u.wait.bucket = NULL;
1608 bucket->objcount--;
1609
1610 NtSetEvent( bucket->update_event, NULL );
1611 }
1613}

Referenced by tp_object_prepare_shutdown().

◆ TpAllocCleanupGroup()

NTSTATUS WINAPI TpAllocCleanupGroup ( TP_CLEANUP_GROUP **  out)

Definition at line 2537 of file threadpool.c.

2538{
2539 TRACE( "%p\n", out );
2540
2541 return tp_group_alloc( (struct threadpool_group **)out );
2542}
static NTSTATUS tp_group_alloc(struct threadpool_group **out)
Definition: threadpool.c:1963

Referenced by init_threadpool().

◆ TpAllocIoCompletion()

NTSTATUS WINAPI TpAllocIoCompletion ( TP_IO **  out,
HANDLE  file,
PTP_IO_CALLBACK  callback,
void * userdata,
TP_CALLBACK_ENVIRON * environment
)

Definition at line 2547 of file threadpool.c.

2549{
2550 struct threadpool_object *object;
2551 struct threadpool *pool;
2553
2554 TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
2555
2556 if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) )))
2557 return STATUS_NO_MEMORY;
2558
2559 if ((status = tp_threadpool_lock( &pool, environment )))
2560 {
2561 RtlFreeHeap( GetProcessHeap(), 0, object );
2562 return status;
2563 }
2564
2565 object->type = TP_OBJECT_TYPE_IO;
2566 object->u.io.callback = callback;
2567 if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) )))
2568 {
2570 RtlFreeHeap( GetProcessHeap(), 0, object );
2571 return status;
2572 }
2573
2574 if ((status = tp_ioqueue_lock( object, file )))
2575 {
2577 RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
2578 RtlFreeHeap( GetProcessHeap(), 0, object );
2579 return status;
2580 }
2581
2582 tp_object_initialize( object, pool, userdata, environment );
2583
2584 *out = (TP_IO *)object;
2585 return STATUS_SUCCESS;
2586}
#define HEAP_ZERO_MEMORY
Definition: compat.h:134
static NTSTATUS tp_ioqueue_lock(struct threadpool_object *io, HANDLE file)
Definition: threadpool.c:1727

Referenced by init_threadpool().

◆ TpAllocPool()

NTSTATUS WINAPI TpAllocPool ( TP_POOL **  out,
PVOID  reserved 
)

Definition at line 2591 of file threadpool.c.

2592{
2593 TRACE( "%p %p\n", out, reserved );
2594
2595 if (reserved)
2596 FIXME( "reserved argument is nonzero (%p)\n", reserved );
2597
2598 return tp_threadpool_alloc( (struct threadpool **)out );
2599}
r reserved
Definition: btrfs.c:3006

Referenced by init_threadpool().

◆ TpAllocTimer()

NTSTATUS WINAPI TpAllocTimer ( TP_TIMER **  out,
PTP_TIMER_CALLBACK  callback,
PVOID  userdata,
TP_CALLBACK_ENVIRON * environment
)

Definition at line 2604 of file threadpool.c.

2606{
2607 struct threadpool_object *object;
2608 struct threadpool *pool;
2610
2611 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2612
2613 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2614 if (!object)
2615 return STATUS_NO_MEMORY;
2616
2617 status = tp_threadpool_lock( &pool, environment );
2618 if (status)
2619 {
2620 RtlFreeHeap( GetProcessHeap(), 0, object );
2621 return status;
2622 }
2623
2624 object->type = TP_OBJECT_TYPE_TIMER;
2625 object->u.timer.callback = callback;
2626
2627 status = tp_timerqueue_lock( object );
2628 if (status)
2629 {
2631 RtlFreeHeap( GetProcessHeap(), 0, object );
2632 return status;
2633 }
2634
2635 tp_object_initialize( object, pool, userdata, environment );
2636
2637 *out = (TP_TIMER *)object;
2638 return STATUS_SUCCESS;
2639}
static NTSTATUS tp_timerqueue_lock(struct threadpool_object *timer)
Definition: threadpool.c:1268

Referenced by init_threadpool().

◆ TpAllocWait()

NTSTATUS WINAPI TpAllocWait ( TP_WAIT **  out,
PTP_WAIT_CALLBACK  callback,
PVOID  userdata,
TP_CALLBACK_ENVIRON * environment
)

Definition at line 2680 of file threadpool.c.

2682{
2683 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2684 return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE );
2685}

Referenced by init_threadpool().

◆ TpAllocWork()

NTSTATUS WINAPI TpAllocWork ( TP_WORK **  out,
PTP_WORK_CALLBACK  callback,
PVOID  userdata,
TP_CALLBACK_ENVIRON * environment
)

Definition at line 2690 of file threadpool.c.

2692{
2693 struct threadpool_object *object;
2694 struct threadpool *pool;
2696
2697 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2698
2699 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2700 if (!object)
2701 return STATUS_NO_MEMORY;
2702
2703 status = tp_threadpool_lock( &pool, environment );
2704 if (status)
2705 {
2706 RtlFreeHeap( GetProcessHeap(), 0, object );
2707 return status;
2708 }
2709
2710 object->type = TP_OBJECT_TYPE_WORK;
2711 object->u.work.callback = callback;
2712 tp_object_initialize( object, pool, userdata, environment );
2713
2714 *out = (TP_WORK *)object;
2715 return STATUS_SUCCESS;
2716}

Referenced by init_threadpool().

◆ TpCallbackLeaveCriticalSectionOnCompletion()

VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion ( TP_CALLBACK_INSTANCE * instance,
CRITICAL_SECTION * crit
)

Definition at line 2743 of file threadpool.c.

2744{
2746
2747 TRACE( "%p %p\n", instance, crit );
2748
2749 if (!this->cleanup.critical_section)
2750 this->cleanup.critical_section = crit;
2751}
static void cleanup(void)
Definition: main.c:1335
static struct threadpool_instance * impl_from_TP_CALLBACK_INSTANCE(TP_CALLBACK_INSTANCE *instance)
Definition: threadpool.c:430

◆ TpCallbackMayRunLong()

NTSTATUS WINAPI TpCallbackMayRunLong ( TP_CALLBACK_INSTANCE * instance)

Definition at line 2756 of file threadpool.c.

2757{
2759 struct threadpool_object *object = this->object;
2760 struct threadpool *pool;
2762
2763 TRACE( "%p\n", instance );
2764
2765 if (this->threadid != GetCurrentThreadId())
2766 {
2767 ERR("called from wrong thread, ignoring\n");
2768 return STATUS_UNSUCCESSFUL; /* FIXME */
2769 }
2770
2771 if (this->may_run_long)
2772 return STATUS_SUCCESS;
2773
2774 pool = object->pool;
2776
2777 /* Start new worker threads if required. */
2778 if (pool->num_busy_workers >= pool->num_workers)
2779 {
2780 if (pool->num_workers < pool->max_workers)
2781 {
2783 }
2784 else
2785 {
2787 }
2788 }
2789
2791 this->may_run_long = TRUE;
2792 return status;
2793}
#define STATUS_TOO_MANY_THREADS
Definition: ntstatus.h:533

Referenced by init_threadpool().

◆ TpCallbackReleaseMutexOnCompletion()

VOID WINAPI TpCallbackReleaseMutexOnCompletion ( TP_CALLBACK_INSTANCE * instance,
HANDLE  mutex 
)

Definition at line 2798 of file threadpool.c.

2799{
2801
2802 TRACE( "%p %p\n", instance, mutex );
2803
2804 if (!this->cleanup.mutex)
2805 this->cleanup.mutex = mutex;
2806}
Definition: module.h:456

◆ TpCallbackReleaseSemaphoreOnCompletion()

VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion ( TP_CALLBACK_INSTANCE * instance,
HANDLE  semaphore,
DWORD  count 
)

Definition at line 2811 of file threadpool.c.

2812{
2814
2815 TRACE( "%p %p %lu\n", instance, semaphore, count );
2816
2817 if (!this->cleanup.semaphore)
2818 {
2819 this->cleanup.semaphore = semaphore;
2820 this->cleanup.semaphore_count = count;
2821 }
2822}
HANDLE semaphore
Definition: threadpool.c:1889

Referenced by init_threadpool().

◆ TpCallbackSetEventOnCompletion()

VOID WINAPI TpCallbackSetEventOnCompletion ( TP_CALLBACK_INSTANCE * instance,
HANDLE  event 
)

Definition at line 2827 of file threadpool.c.

2828{
2830
2831 TRACE( "%p %p\n", instance, event );
2832
2833 if (!this->cleanup.event)
2834 this->cleanup.event = event;
2835}

◆ TpCallbackUnloadDllOnCompletion()

VOID WINAPI TpCallbackUnloadDllOnCompletion ( TP_CALLBACK_INSTANCE * instance,
HMODULE  module 
)

Definition at line 2840 of file threadpool.c.

2841{
2843
2844 TRACE( "%p %p\n", instance, module );
2845
2846 if (!this->cleanup.library)
2847 this->cleanup.library = module;
2848}

◆ TpCancelAsyncIoOperation()

void WINAPI TpCancelAsyncIoOperation ( TP_IO * io)

Definition at line 2721 of file threadpool.c.

2722{
2723 struct threadpool_object *this = impl_from_TP_IO( io );
2724
2725 TRACE( "%p\n", io );
2726
2727 RtlEnterCriticalSection( &this->pool->cs );
2728
2729 TRACE("pending_count %u.\n", this->u.io.pending_count);
2730
2731 this->u.io.pending_count--;
2732 if (object_is_finished( this, TRUE ))
2734 if (object_is_finished( this, FALSE ))
2736
2737 RtlLeaveCriticalSection( &this->pool->cs );
2738}
static struct threadpool_object * impl_from_TP_IO(TP_IO *io)
Definition: threadpool.c:418
RTL_CONDITION_VARIABLE finished_event
Definition: threadpool.c:202
RTL_CONDITION_VARIABLE group_finished_event
Definition: threadpool.c:203

Referenced by init_threadpool().

◆ TpDisassociateCallback()

VOID WINAPI TpDisassociateCallback ( TP_CALLBACK_INSTANCE * instance)

Definition at line 2853 of file threadpool.c.

2854{
2856 struct threadpool_object *object = this->object;
2857 struct threadpool *pool;
2858
2859 TRACE( "%p\n", instance );
2860
2861 if (this->threadid != GetCurrentThreadId())
2862 {
2863 ERR("called from wrong thread, ignoring\n");
2864 return;
2865 }
2866
2867 if (!this->associated)
2868 return;
2869
2870 pool = object->pool;
2872
2873 object->num_associated_callbacks--;
2874 if (object_is_finished( object, FALSE ))
2875 RtlWakeAllConditionVariable( &object->finished_event );
2876
2878 this->associated = FALSE;
2879}

Referenced by init_threadpool().

◆ TpIsTimerSet()

BOOL WINAPI TpIsTimerSet ( TP_TIMER * timer)

Definition at line 2884 of file threadpool.c.

2885{
2886 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2887
2888 TRACE( "%p\n", timer );
2889
2890 return this->u.timer.timer_set;
2891}
static struct threadpool_object * impl_from_TP_TIMER(TP_TIMER *timer)
Definition: threadpool.c:404

Referenced by init_threadpool().

◆ TpPostWork()

VOID WINAPI TpPostWork ( TP_WORK * work)

Definition at line 2896 of file threadpool.c.

2897{
2898 struct threadpool_object *this = impl_from_TP_WORK( work );
2899
2900 TRACE( "%p\n", work );
2901
2902 tp_object_submit( this, FALSE );
2903}
static struct threadpool_object * impl_from_TP_WORK(TP_WORK *work)
Definition: threadpool.c:397

Referenced by init_threadpool().

◆ TpQueryPoolStackInformation()

NTSTATUS WINAPI TpQueryPoolStackInformation ( TP_POOL * pool,
TP_POOL_STACK_INFORMATION * stack_info
)

Definition at line 3367 of file threadpool.c.

3368{
3369 struct threadpool *this = impl_from_TP_POOL( pool );
3370
3371 TRACE( "%p %p\n", pool, stack_info );
3372
3373 if (!stack_info)
3375
3376 RtlEnterCriticalSection( &this->cs );
3377 *stack_info = this->stack_info;
3378 RtlLeaveCriticalSection( &this->cs );
3379
3380 return STATUS_SUCCESS;
3381}
CRITICAL_SECTION cs
Definition: threadpool.c:290
static struct threadpool * impl_from_TP_POOL(TP_POOL *pool)
Definition: threadpool.c:392
TP_POOL_STACK_INFORMATION stack_info
Definition: threadpool.c:163

Referenced by init_threadpool().

◆ TpReleaseCleanupGroup()

VOID WINAPI TpReleaseCleanupGroup ( TP_CLEANUP_GROUP * group)

Definition at line 2908 of file threadpool.c.

2909{
2911
2912 TRACE( "%p\n", group );
2913
2914 tp_group_shutdown( this );
2915 tp_group_release( this );
2916}
static void tp_group_shutdown(struct threadpool_group *group)
Definition: threadpool.c:1995

Referenced by init_threadpool().

◆ TpReleaseCleanupGroupMembers()

VOID WINAPI TpReleaseCleanupGroupMembers ( TP_CLEANUP_GROUP * group,
BOOL  cancel_pending,
PVOID  userdata 
)

Definition at line 2921 of file threadpool.c.

2922{
2924 struct threadpool_object *object, *next;
2925 struct list members;
2926
2927 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2928
2929 RtlEnterCriticalSection( &this->cs );
2930
2931 /* Unset group, increase references, and mark objects for shutdown */
2932 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2933 {
2934 assert( object->group == this );
2935 assert( object->is_group_member );
2936
2937 if (InterlockedIncrement( &object->refcount ) == 1)
2938 {
2939 /* Object is basically already destroyed, but group reference
2940 * was not deleted yet. We can safely ignore this object. */
2941 InterlockedDecrement( &object->refcount );
2942 list_remove( &object->group_entry );
2943 object->is_group_member = FALSE;
2944 continue;
2945 }
2946
2947 object->is_group_member = FALSE;
2949 }
2950
2951 /* Move members to a new temporary list */
2952 list_init( &members );
2953 list_move_tail( &members, &this->members );
2954
2955 RtlLeaveCriticalSection( &this->cs );
2956
2957 /* Cancel pending callbacks if requested */
2958 if (cancel_pending)
2959 {
2960 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2961 {
2962 tp_object_cancel( object );
2963 }
2964 }
2965
2966 /* Wait for remaining callbacks to finish */
2967 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2968 {
2969 tp_object_wait( object, TRUE );
2970
2971 if (!object->shutdown)
2972 {
2973 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2974 if (cancel_pending && object->group_cancel_callback)
2975 {
2976 TRACE( "executing group cancel callback %p(%p, %p)\n",
2977 object->group_cancel_callback, object->userdata, userdata );
2978 object->group_cancel_callback( object->userdata, userdata );
2979 TRACE( "callback %p returned\n", object->group_cancel_callback );
2980 }
2981
2982 if (object->type != TP_OBJECT_TYPE_SIMPLE)
2983 tp_object_release( object );
2984 }
2985
2986 object->shutdown = TRUE;
2987 tp_object_release( object );
2988 }
2989}
static void list_move_tail(struct list_head *list, struct list_head *head)
Definition: list.h:122
static void tp_object_wait(struct threadpool_object *object, BOOL group_wait)
Definition: threadpool.c:2206
static void tp_object_cancel(struct threadpool_object *object)
Definition: threadpool.c:2161

Referenced by init_threadpool().

◆ TpReleaseIoCompletion()

void WINAPI TpReleaseIoCompletion ( TP_IO * io)

Definition at line 2994 of file threadpool.c.

/* Release an I/O threadpool object.
 * Under the pool lock the object is flagged as shutting down and it is
 * checked whether any operations are still counted as pending or skipped.
 * Only when both counters are zero is the object marked shut down and a
 * reference dropped here. */
2995{
2996 struct threadpool_object *this = impl_from_TP_IO( io );
2997 BOOL can_destroy;
2998
2999 TRACE( "%p\n", io );
3000
3001 RtlEnterCriticalSection( &this->pool->cs );
3002 this->u.io.shutting_down = TRUE;
3003 can_destroy = !this->u.io.pending_count && !this->u.io.skipped_count;
3004 RtlLeaveCriticalSection( &this->pool->cs );
3005
3006 if (can_destroy)
3007 {
/* NOTE(review): listing line 3008 is absent from this extract - check the original source. */
3009 this->shutdown = TRUE;
3010 tp_object_release( this );
3011 }
3012}
INT WSAAPI shutdown(IN SOCKET s, IN INT how)
Definition: sockctrl.c:506

Referenced by init_threadpool().

◆ TpReleasePool()

VOID WINAPI TpReleasePool ( TP_POOL * pool)

Definition at line 3017 of file threadpool.c.

/* Release a threadpool: shut the pool down via tp_threadpool_shutdown(),
 * then drop a reference with tp_threadpool_release(). */
3018{
3019 struct threadpool *this = impl_from_TP_POOL( pool );
3020
3021 TRACE( "%p\n", pool );
3022
3023 tp_threadpool_shutdown( this );
3024 tp_threadpool_release( this );
3025}

Referenced by init_threadpool().

◆ TpReleaseTimer()

VOID WINAPI TpReleaseTimer ( TP_TIMER * timer)

Definition at line 3030 of file threadpool.c.

/* Release a timer object: mark it as shut down and drop a reference. */
3031{
3032 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3033
3034 TRACE( "%p\n", timer );
3035
/* NOTE(review): listing line 3036 is absent from this extract - check the original source. */
3037 this->shutdown = TRUE;
3038 tp_object_release( this );
3039}

Referenced by init_threadpool().

◆ TpReleaseWait()

VOID WINAPI TpReleaseWait ( TP_WAIT * wait)

Definition at line 3044 of file threadpool.c.

/* Release a wait object: mark it as shut down and drop a reference. */
3045{
3046 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3047
3048 TRACE( "%p\n", wait );
3049
/* NOTE(review): listing line 3050 is absent from this extract - check the original source. */
3051 this->shutdown = TRUE;
3052 tp_object_release( this );
3053}

Referenced by init_threadpool(), and RtlDeregisterWaitEx().

◆ TpReleaseWork()

VOID WINAPI TpReleaseWork ( TP_WORK * work)

Definition at line 3058 of file threadpool.c.

/* Release a work object: mark it as shut down and drop a reference. */
3059{
3060 struct threadpool_object *this = impl_from_TP_WORK( work );
3061
3062 TRACE( "%p\n", work );
3063
/* NOTE(review): listing line 3064 is absent from this extract - check the original source. */
3065 this->shutdown = TRUE;
3066 tp_object_release( this );
3067}

Referenced by init_threadpool().

◆ TpSetPoolMaxThreads()

VOID WINAPI TpSetPoolMaxThreads ( TP_POOL * pool,
DWORD  maximum 
)

Definition at line 3072 of file threadpool.c.

/* Set the maximum number of worker threads of a pool.
 * The stored maximum is never less than 1, and min_workers is lowered if
 * it would otherwise exceed the new maximum. Both fields are updated while
 * holding the pool's critical section. */
3073{
3074 struct threadpool *this = impl_from_TP_POOL( pool );
3075
3076 TRACE( "%p %lu\n", pool, maximum );
3077
3078 RtlEnterCriticalSection( &this->cs );
3079 this->max_workers = max( maximum, 1 );
3080 this->min_workers = min( this->min_workers, this->max_workers );
3081 RtlLeaveCriticalSection( &this->cs );
3082}
#define min(a, b)
Definition: monoChain.cc:55
int max_workers
Definition: threadpool.c:158
int min_workers
Definition: threadpool.c:159

Referenced by init_threadpool().

◆ TpSetPoolMinThreads()

BOOL WINAPI TpSetPoolMinThreads ( TP_POOL * pool,
DWORD  minimum 
)

Definition at line 3087 of file threadpool.c.

/* Set the minimum number of worker threads of a pool.
 * While holding the pool's critical section, new workers are started until
 * num_workers reaches the requested minimum or starting a worker fails.
 * The new minimum is stored (and max_workers raised to match if needed)
 * only when status is STATUS_SUCCESS. Returns !status. */
3088{
3089 struct threadpool *this = impl_from_TP_POOL( pool );
/* NOTE(review): listing line 3090 is absent from this extract; it presumably
 * declares and initialises 'status' - confirm against the original source. */
3091
3092 TRACE( "%p %lu\n", pool, minimum );
3093
3094 RtlEnterCriticalSection( &this->cs );
3095
3096 while (this->num_workers < minimum)
3097 {
3098 status = tp_new_worker_thread( this );
3099 if (status != STATUS_SUCCESS)
3100 break;
3101 }
3102
3103 if (status == STATUS_SUCCESS)
3104 {
3105 this->min_workers = minimum;
3106 this->max_workers = max( this->min_workers, this->max_workers );
3107 }
3108
3109 RtlLeaveCriticalSection( &this->cs );
3110 return !status;
3111}
int num_workers
Definition: threadpool.c:160

◆ TpSetPoolStackInformation()

NTSTATUS WINAPI TpSetPoolStackInformation ( TP_POOL * pool,
TP_POOL_STACK_INFORMATION * stack_info 
)

Definition at line 3348 of file threadpool.c.

/* Store caller-supplied stack information in the pool.
 * The structure pointed to by stack_info is copied into the pool while
 * holding the pool's critical section; STATUS_SUCCESS is returned after
 * the copy. */
3349{
3350 struct threadpool *this = impl_from_TP_POOL( pool );
3351
3352 TRACE( "%p %p\n", pool, stack_info );
3353
3354 if (!stack_info)
/* NOTE(review): listing line 3355 (the statement controlled by the 'if' above)
 * is absent from this extract; presumably an early error return - confirm
 * against the original source. */
3356
3357 RtlEnterCriticalSection( &this->cs );
3358 this->stack_info = *stack_info;
3359 RtlLeaveCriticalSection( &this->cs );
3360
3361 return STATUS_SUCCESS;
3362}

Referenced by init_threadpool().

◆ TpSetTimer()

VOID WINAPI TpSetTimer ( TP_TIMER * timer,
LARGE_INTEGER * timeout,
LONG  period,
LONG  window_length 
)

Definition at line 3116 of file threadpool.c.

/* (Re)arm or disarm a timer object.
 * A NULL timeout disarms the timer. A negative timeout value is treated as
 * relative and converted to an absolute timestamp; a zero timeout causes the
 * timer to be submitted immediately, and is re-queued only if a period is
 * given. An armed timer is inserted into timerqueue.pending_timers in
 * ascending timeout order. */
3117{
3118 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3119 struct threadpool_object *other_timer;
3120 BOOL submit_timer = FALSE;
/* NOTE(review): listing line 3121 is absent from this extract; presumably the
 * declaration of 'timestamp' - confirm against the original source. */
3122
3123 TRACE( "%p %p %lu %lu\n", timer, timeout, period, window_length );
3124
/* NOTE(review): listing line 3125 is absent from this extract. */
3126
3127 assert( this->u.timer.timer_initialized );
3128 this->u.timer.timer_set = timeout != NULL;
3129
3130 /* Convert relative timeout to absolute timestamp and handle a timeout
3131 * of zero, which means that the timer is submitted immediately. */
3132 if (timeout)
3133 {
3134 timestamp = timeout->QuadPart;
3135 if ((LONGLONG)timestamp < 0)
3136 {
/* NOTE(review): listing lines 3137-3138 are absent; presumably they declare
 * and fill 'now' - confirm. timestamp is negative here, so the subtraction
 * below yields now plus the magnitude of the relative timeout. */
3139 timestamp = now.QuadPart - timestamp;
3140 }
3141 else if (!timestamp)
3142 {
3143 if (!period)
3144 timeout = NULL;
3145 else
3146 {
/* NOTE(review): listing lines 3147-3148 are absent; presumably they declare
 * and fill 'now' - confirm. period is scaled by 10000 before being added. */
3149 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
3150 }
3151 submit_timer = TRUE;
3152 }
3153 }
3154
3155 /* First remove existing timeout. */
3156 if (this->u.timer.timer_pending)
3157 {
3158 list_remove( &this->u.timer.timer_entry );
3159 this->u.timer.timer_pending = FALSE;
3160 }
3161
3162 /* If the timer was enabled, then add it back to the queue. */
3163 if (timeout)
3164 {
3165 this->u.timer.timeout = timestamp;
3166 this->u.timer.period = period;
3167 this->u.timer.window_length = window_length;
3168
/* Find the first pending timer that expires later than this one and insert in front of it. */
3169 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
3170 struct threadpool_object, u.timer.timer_entry )
3171 {
3172 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
3173 if (this->u.timer.timeout < other_timer->u.timer.timeout)
3174 break;
3175 }
3176 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
3177
3178 /* Wake up the timer thread when the timeout has to be updated. */
3179 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
3180 RtlWakeAllConditionVariable( &timerqueue.update_event );
3181
3182 this->u.timer.timer_pending = TRUE;
3183 }
3184
/* NOTE(review): listing line 3185 is absent from this extract. */
3186
3187 if (submit_timer)
3188 tp_object_submit( this, FALSE );
3189}
int64_t LONGLONG
Definition: typedefs.h:68

Referenced by init_threadpool().

◆ TpSetWait()

VOID WINAPI TpSetWait ( TP_WAIT * wait,
HANDLE  handle,
LARGE_INTEGER * timeout 
)

Definition at line 3194 of file threadpool.c.

/* Set or clear the handle a wait object waits on.
 * The handle is stored in the object. If a handle is given, or the object
 * was pending, the object is moved to the bucket's 'waiting' list (handle
 * set) or 'reserved' list (handle cleared), and the bucket's update event
 * is signaled so its wait queue thread re-evaluates its lists. */
3195{
3196 struct threadpool_object *this = impl_from_TP_WAIT( wait );
/* NOTE(review): listing line 3197 is absent from this extract; presumably the
 * declaration of 'timestamp' - confirm against the original source. */
3198
3199 TRACE( "%p %p %p\n", wait, handle, timeout );
3200
/* NOTE(review): listing line 3201 is absent from this extract. */
3202
3203 assert( this->u.wait.bucket );
3204 this->u.wait.handle = handle;
3205
3206 if (handle || this->u.wait.wait_pending)
3207 {
3208 struct waitqueue_bucket *bucket = this->u.wait.bucket;
3209 list_remove( &this->u.wait.wait_entry );
3210
3211 /* Convert relative timeout to absolute timestamp. */
3212 if (handle && timeout)
3213 {
3214 timestamp = timeout->QuadPart;
3215 if ((LONGLONG)timestamp < 0)
3216 {
/* NOTE(review): listing lines 3217-3218 are absent; presumably they declare
 * and fill 'now' - confirm. */
3219 timestamp = now.QuadPart - timestamp;
3220 }
3221 }
3222
3223 /* Add wait object back into one of the queues. */
3224 if (handle)
3225 {
3226 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
3227 this->u.wait.wait_pending = TRUE;
3228 this->u.wait.timeout = timestamp;
3229 }
3230 else
3231 {
3232 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
3233 this->u.wait.wait_pending = FALSE;
3234 }
3235
3236 /* Wake up the wait queue thread. */
3237 NtSetEvent( bucket->update_event, NULL );
3238 }
3239
/* NOTE(review): listing line 3240 is absent from this extract. */
3241}

Referenced by init_threadpool(), RtlDeregisterWaitEx(), and RtlRegisterWait().

◆ TpSimpleTryPost()

NTSTATUS WINAPI TpSimpleTryPost ( PTP_SIMPLE_CALLBACK  callback,
PVOID  userdata,
TP_CALLBACK_ENVIRON * environment 
)

Definition at line 3246 of file threadpool.c.

/* Create a simple callback object from a callback and its user data.
 * Allocates the object from the process heap, obtains a pool through
 * tp_threadpool_lock() for the given environment, then sets the type and
 * callback and hands the object to tp_object_initialize().
 * Returns STATUS_NO_MEMORY if the allocation fails, the status from
 * tp_threadpool_lock() if that fails (the object is freed first), and
 * STATUS_SUCCESS otherwise. */
3248{
3249 struct threadpool_object *object;
3250 struct threadpool *pool;
/* NOTE(review): listing line 3251 is absent from this extract; presumably the
 * declaration of 'status' - confirm against the original source. */
3252
3253 TRACE( "%p %p %p\n", callback, userdata, environment );
3254
3255 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
3256 if (!object)
3257 return STATUS_NO_MEMORY;
3258
3259 status = tp_threadpool_lock( &pool, environment );
3260 if (status)
3261 {
3262 RtlFreeHeap( GetProcessHeap(), 0, object );
3263 return status;
3264 }
3265
3266 object->type = TP_OBJECT_TYPE_SIMPLE;
3267 object->u.simple.callback = callback;
3268 tp_object_initialize( object, pool, userdata, environment );
3269
3270 return STATUS_SUCCESS;
3271}

Referenced by init_threadpool(), and RtlQueueWorkItem().

◆ TpStartAsyncIoOperation()

void WINAPI TpStartAsyncIoOperation ( TP_IO * io)

Definition at line 3276 of file threadpool.c.

/* Record the start of an asynchronous I/O operation by incrementing the
 * I/O object's pending_count while holding the pool's critical section. */
3277{
3278 struct threadpool_object *this = impl_from_TP_IO( io );
3279
3280 TRACE( "%p\n", io );
3281
3282 RtlEnterCriticalSection( &this->pool->cs );
3283
3284 this->u.io.pending_count++;
3285
3286 RtlLeaveCriticalSection( &this->pool->cs );
3287}

Referenced by init_threadpool().

◆ TpWaitForIoCompletion()

void WINAPI TpWaitForIoCompletion ( TP_IO * io,
BOOL  cancel_pending 
)

Definition at line 3292 of file threadpool.c.

/* Wait on an I/O object: when cancel_pending is set, tp_object_cancel() is
 * called first; tp_object_wait() is then called with group_wait = FALSE. */
3293{
3294 struct threadpool_object *this = impl_from_TP_IO( io );
3295
3296 TRACE( "%p %d\n", io, cancel_pending );
3297
3298 if (cancel_pending)
3299 tp_object_cancel( this );
3300 tp_object_wait( this, FALSE );
3301}

Referenced by init_threadpool().

◆ TpWaitForTimer()

VOID WINAPI TpWaitForTimer ( TP_TIMER * timer,
BOOL  cancel_pending 
)

Definition at line 3306 of file threadpool.c.

/* Wait on a timer object: when cancel_pending is set, tp_object_cancel() is
 * called first; tp_object_wait() is then called with group_wait = FALSE. */
3307{
3308 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3309
3310 TRACE( "%p %d\n", timer, cancel_pending );
3311
3312 if (cancel_pending)
3313 tp_object_cancel( this );
3314 tp_object_wait( this, FALSE );
3315}

Referenced by init_threadpool().

◆ TpWaitForWait()

VOID WINAPI TpWaitForWait ( TP_WAIT * wait,
BOOL  cancel_pending 
)

Definition at line 3320 of file threadpool.c.

/* Wait on a wait object: when cancel_pending is set, tp_object_cancel() is
 * called first; tp_object_wait() is then called with group_wait = FALSE. */
3321{
3322 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3323
3324 TRACE( "%p %d\n", wait, cancel_pending );
3325
3326 if (cancel_pending)
3327 tp_object_cancel( this );
3328 tp_object_wait( this, FALSE );
3329}

Referenced by init_threadpool(), and RtlDeregisterWaitEx().

◆ TpWaitForWork()

VOID WINAPI TpWaitForWork ( TP_WORK * work,
BOOL  cancel_pending 
)

Definition at line 3334 of file threadpool.c.

/* Wait on a work object: when cancel_pending is set, tp_object_cancel() is
 * called first; tp_object_wait() is then called with group_wait = FALSE. */
3335{
3336 struct threadpool_object *this = impl_from_TP_WORK( work );
3337
3338 TRACE( "%p %u\n", work, cancel_pending );
3339
3340 if (cancel_pending)
3341 tp_object_cancel( this );
3342 tp_object_wait( this, FALSE );
3343}

Referenced by init_threadpool().

◆ waitqueue_thread_proc()

static void CALLBACK waitqueue_thread_proc ( void * param)
static

Definition at line 1342 of file threadpool.c.

/* Thread procedure servicing one wait queue bucket; 'param' is the bucket.
 *
 * Each loop iteration:
 *  - walks wait objects, dispatching those whose timeout has passed and
 *    collecting the handle (plus a temporary reference) of every other one;
 *  - waits on the collected handles together with the bucket's update event,
 *    or on the update event alone when the bucket has no objects;
 *  - dispatches the wait object whose handle was signaled, if any;
 *  - tries to merge a lightly loaded bucket into another compatible bucket.
 *
 * The loop ends when the wait on an empty bucket times out and the bucket is
 * still empty; the bucket is then unlinked, its event closed, its memory
 * freed and the thread exits.
 *
 * NOTE(review): several listing lines (1345, 1346, 1349, 1351, 1356, 1360,
 * 1364, 1381, 1383, 1405, 1408, 1416, 1418, 1422, 1452, 1502) are absent
 * from this extract; declarations, locking and some calls are therefore not
 * visible here - consult the original source before relying on this text. */
1344{
1347 struct waitqueue_bucket *bucket = param;
1348 struct threadpool_object *wait, *next;
1350 DWORD num_handles;
1352
1353 TRACE( "starting wait queue thread\n" );
1354 set_thread_name(L"wine_threadpool_waitqueue");
1355
1357
1358 for (;;)
1359 {
1361 timeout.QuadPart = MAXLONGLONG;
1362 num_handles = 0;
1363
/* NOTE(review): the loop header on listing line 1364 is absent; the line below is its continuation. */
1365 u.wait.wait_entry )
1366 {
1367 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1368 if (wait->u.wait.timeout <= now.QuadPart)
1369 {
1370 /* Wait object timed out. */
1371 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1372 {
1373 list_remove( &wait->u.wait.wait_entry );
1374 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1375 }
1376 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1377 {
1378 InterlockedIncrement( &wait->refcount );
1379 wait->num_pending_callbacks++;
1380 RtlEnterCriticalSection( &wait->pool->cs );
1382 RtlLeaveCriticalSection( &wait->pool->cs );
1384 }
1385 else tp_object_submit( wait, FALSE );
1386 }
1387 else
1388 {
/* Track the earliest remaining timeout to bound the wait below. */
1389 if (wait->u.wait.timeout < timeout.QuadPart)
1390 timeout.QuadPart = wait->u.wait.timeout;
1391
1392 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1393 InterlockedIncrement( &wait->refcount );
1394 objects[num_handles] = wait;
1395 handles[num_handles] = wait->u.wait.handle;
1396 num_handles++;
1397 }
1398 }
1399
1400 if (!bucket->objcount)
1401 {
1402 /* All wait objects have been destroyed, if no new wait objects are created
1403 * within some amount of time, then we can shutdown this thread. */
1404 assert( num_handles == 0 );
1406 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1407 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout );
1409
1410 if (status == STATUS_TIMEOUT && !bucket->objcount)
1411 break;
1412 }
1413 else
1414 {
/* The update event occupies the slot after the collected wait handles. */
1415 handles[num_handles] = bucket->update_event;
1417 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout );
1419
/* A status inside this range corresponds to one of the wait handles, not the update event. */
1420 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1421 {
1423 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1424 if (wait->u.wait.bucket)
1425 {
1426 /* Wait object signaled. */
1427 assert( wait->u.wait.bucket == bucket );
1428 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1429 {
1430 list_remove( &wait->u.wait.wait_entry );
1431 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1432 }
1433 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1434 {
1435 wait->u.wait.signaled++;
1436 wait->num_pending_callbacks++;
1437 RtlEnterCriticalSection( &wait->pool->cs );
1439 RtlLeaveCriticalSection( &wait->pool->cs );
1440 }
1441 else tp_object_submit( wait, TRUE );
1442 }
1443 else
1444 WARN("wait object %p triggered while object was destroyed\n", wait);
1445 }
1446
1447 /* Release temporary references to wait objects. */
1448 while (num_handles)
1449 {
1450 wait = objects[--num_handles];
1451 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1453 }
1454 }
1455
1456 /* Try to merge bucket with other threads. */
/* Only attempted when this bucket holds at most one third of
 * MAXIMUM_WAITQUEUE_OBJECTS; the target must have the same alertable
 * setting and the combined count must not exceed two thirds. */
1457 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1458 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1459 {
1460 struct waitqueue_bucket *other_bucket;
1461 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1462 {
1463 if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable &&
1464 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1465 {
1466 other_bucket->objcount += bucket->objcount;
1467 bucket->objcount = 0;
1468
1469 /* Update reserved list. */
1470 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1471 {
1472 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1473 wait->u.wait.bucket = other_bucket;
1474 }
1475 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1476
1477 /* Update waiting list. */
1478 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1479 {
1480 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1481 wait->u.wait.bucket = other_bucket;
1482 }
1483 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1484
1485 /* Move bucket to the end, to keep the probability of
1486 * newly added wait objects as small as possible. */
1487 list_remove( &bucket->bucket_entry );
1488 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1489
/* Signal the receiving bucket's update event after handing over the objects. */
1490 NtSetEvent( other_bucket->update_event, NULL );
1491 break;
1492 }
1493 }
1494 }
1495 }
1496
1497 /* Remove this bucket from the list. */
1498 list_remove( &bucket->bucket_entry );
1499 if (!--waitqueue.num_buckets)
1500 assert( list_empty( &waitqueue.buckets ) );
1501
1503
1504 TRACE( "terminating wait queue thread\n" );
1505
1506 assert( bucket->objcount == 0 );
1507 assert( list_empty( &bucket->reserved ) );
1508 assert( list_empty( &bucket->waiting ) );
1509 NtClose( bucket->update_event );
1510
1511 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1512 RtlExitUserThread( 0 );
1513}
#define WARN(fmt,...)
Definition: precomp.h:61
static const CLSID * objects[]
Definition: apphelp.c:112
NTSTATUS NTAPI NtWaitForMultipleObjects(IN ULONG ObjectCount, IN PHANDLE HandleArray, IN WAIT_TYPE WaitType, IN BOOLEAN Alertable, IN PLARGE_INTEGER TimeOut OPTIONAL)
Definition: obwait.c:46
struct waitqueue_bucket * bucket
Definition: threadpool.c:236
static EFI_HANDLE * handles
Definition: uefidisk.c:62

Referenced by tp_waitqueue_lock().

◆ WINE_DEFAULT_DEBUG_CHANNEL()

WINE_DEFAULT_DEBUG_CHANNEL ( threadpool  )

Variable Documentation

◆ buckets

struct list buckets

Definition at line 329 of file threadpool.c.

Referenced by add_hash(), delete_hash_entry(), hash_lookup(), and new_hash().

◆ compl_port

HANDLE compl_port

Definition at line 93 of file threadpool.c.

◆ critsect_compl_debug

static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
static
Initial value:
=
{
0, 0, &old_threadpool.threadpool_compl_cs,
0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
}
static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
Definition: threadpool.c:88

Definition at line 88 of file threadpool.c.

◆ cs

◆ default_threadpool

struct threadpool* default_threadpool = NULL
static

Definition at line 444 of file threadpool.c.

Referenced by tp_threadpool_lock(), and tp_threadpool_shutdown().

◆ 

struct { ... } ioqueue
Initial value:
=
{
.cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
}
static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
Definition: threadpool.c:363

Referenced by ioqueue_thread_proc(), tp_ioqueue_lock(), and tp_ioqueue_unlock().

◆ ioqueue_debug

static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
static
Initial value:
=
{
0, 0, &ioqueue.cs,
0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
}

Definition at line 363 of file threadpool.c.

◆ num_buckets

LONG num_buckets

Definition at line 328 of file threadpool.c.

Referenced by hash_table_hash(), and hash_table_init().

◆ objcount

LONG objcount

Definition at line 291 of file threadpool.c.

◆ 

struct { ... } old_threadpool
Initial value:
=
{
NULL,
{ &critsect_compl_debug, -1, 0, 0, 0, 0 },
}

Referenced by RtlSetIoCompletionCallback().

◆ pending_timers

struct list pending_timers

Definition at line 293 of file threadpool.c.

◆ port

HANDLE port

Definition at line 371 of file threadpool.c.

◆ thread_running

BOOL thread_running

Definition at line 292 of file threadpool.c.

◆ threadpool_compl_cs

RTL_CRITICAL_SECTION threadpool_compl_cs

Definition at line 94 of file threadpool.c.

◆ 

struct { ... } timerqueue
Initial value:
=
{
{ &timerqueue_debug, -1, 0, 0, 0, 0 },
0,
LIST_INIT( timerqueue.pending_timers ),
}
static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
Definition: threadpool.c:285
#define LIST_INIT(head)
Definition: queue.h:197
#define RTL_CONDITION_VARIABLE_INIT
Definition: rtltypes.h:282

Referenced by timerqueue_thread_proc(), tp_timerqueue_lock(), tp_timerqueue_unlock(), and TpSetTimer().

◆ timerqueue_debug

static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
static
Initial value:
=
{
0, 0, &timerqueue.cs,
0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
}

Definition at line 285 of file threadpool.c.

◆ update_event

RTL_CONDITION_VARIABLE update_event

Definition at line 294 of file threadpool.c.

◆ 

struct { ... } waitqueue
Initial value:
=
{
{ &waitqueue_debug, -1, 0, 0, 0, 0 },
0,
}
static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
Definition: threadpool.c:322

Referenced by RtlRegisterWait(), tp_object_execute(), tp_waitqueue_lock(), tp_waitqueue_unlock(), TpSetWait(), and waitqueue_thread_proc().

◆ waitqueue_debug

static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
static
Initial value:
=
{
0, 0, &waitqueue.cs,
0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
}

Definition at line 322 of file threadpool.c.