ReactOS 0.4.15-dev-7942-gd23573b
workitem.c File Reference
#include <rtl.h>
#include <debug.h>
Include dependency graph for workitem.c:

Go to the source code of this file.

Classes

struct  _RTLP_IOWORKERTHREAD
 
struct  _RTLP_WORKITEM
 

Macros

#define NDEBUG
 
#define MAX_WORKERTHREADS   0x100
 
#define WORKERTHREAD_CREATION_THRESHOLD   0x5
 
#define IsThreadPoolInitialized()   (*((volatile LONG*)&ThreadPoolInitialized) == 1)
 

Typedefs

typedef struct _RTLP_IOWORKERTHREAD RTLP_IOWORKERTHREAD
 
typedef struct _RTLP_IOWORKERTHREADPRTLP_IOWORKERTHREAD
 
typedef struct _RTLP_WORKITEM RTLP_WORKITEM
 
typedef struct _RTLP_WORKITEMPRTLP_WORKITEM
 

Functions

NTSTATUS NTAPI RtlpStartThread (IN PTHREAD_START_ROUTINE Function, IN PVOID Parameter, OUT PHANDLE ThreadHandle)
 
NTSTATUS NTAPI RtlpExitThread (IN NTSTATUS ExitStatus)
 
static NTSTATUS RtlpInitializeThreadPool (VOID)
 
static NTSTATUS RtlpGetImpersonationToken (OUT PHANDLE TokenHandle)
 
static NTSTATUS RtlpStartWorkerThread (PTHREAD_START_ROUTINE StartRoutine)
 
static VOID NTAPI RtlpExecuteWorkItem (IN OUT PVOID NormalContext, IN OUT PVOID SystemArgument1, IN OUT PVOID SystemArgument2)
 
static NTSTATUS RtlpQueueWorkerThread (IN OUT PRTLP_WORKITEM WorkItem)
 
static VOID NTAPI RtlpExecuteIoWorkItem (IN OUT PVOID NormalContext, IN OUT PVOID SystemArgument1, IN OUT PVOID SystemArgument2)
 
static NTSTATUS RtlpQueueIoWorkerThread (IN OUT PRTLP_WORKITEM WorkItem)
 
static BOOLEAN RtlpIsIoPending (IN HANDLE ThreadHandle OPTIONAL)
 
static ULONG NTAPI RtlpIoWorkerThreadProc (IN PVOID Parameter)
 
static ULONG NTAPI RtlpWorkerThreadProc (IN PVOID Parameter)
 
NTSTATUS NTAPI RtlQueueWorkItem (IN WORKERCALLBACKFUNC Function, IN PVOID Context OPTIONAL, IN ULONG Flags)
 
NTSTATUS NTAPI RtlSetIoCompletionCallback (IN HANDLE FileHandle, IN PIO_APC_ROUTINE Callback, IN ULONG Flags)
 
NTSTATUS NTAPI RtlSetThreadPoolStartFunc (IN PRTL_START_POOL_THREAD StartPoolThread, IN PRTL_EXIT_POOL_THREAD ExitPoolThread)
 

Variables

PRTL_START_POOL_THREAD RtlpStartThreadFunc = RtlpStartThread
 
PRTL_EXIT_POOL_THREAD RtlpExitThreadFunc = RtlpExitThread
 
static LONG ThreadPoolInitialized = 0
 
static RTL_CRITICAL_SECTION ThreadPoolLock
 
static PRTLP_IOWORKERTHREAD PersistentIoThread
 
static LIST_ENTRY ThreadPoolIOWorkerThreadsList
 
static HANDLE ThreadPoolCompletionPort
 
static LONG ThreadPoolWorkerThreads
 
static LONG ThreadPoolWorkerThreadsRequests
 
static LONG ThreadPoolWorkerThreadsLongRequests
 
static LONG ThreadPoolIOWorkerThreads
 
static LONG ThreadPoolIOWorkerThreadsRequests
 
static LONG ThreadPoolIOWorkerThreadsLongRequests
 

Macro Definition Documentation

◆ IsThreadPoolInitialized

#define IsThreadPoolInitialized ( )    (*((volatile LONG*)&ThreadPoolInitialized) == 1)

Definition at line 78 of file workitem.c.

◆ MAX_WORKERTHREADS

#define MAX_WORKERTHREADS   0x100

Definition at line 48 of file workitem.c.

◆ NDEBUG

#define NDEBUG

Definition at line 13 of file workitem.c.

◆ WORKERTHREAD_CREATION_THRESHOLD

#define WORKERTHREAD_CREATION_THRESHOLD   0x5

Definition at line 49 of file workitem.c.

Typedef Documentation

◆ PRTLP_IOWORKERTHREAD

◆ PRTLP_WORKITEM

◆ RTLP_IOWORKERTHREAD

◆ RTLP_WORKITEM

Function Documentation

◆ RtlpExecuteIoWorkItem()

static VOID NTAPI RtlpExecuteIoWorkItem ( IN OUT PVOID  NormalContext,
IN OUT PVOID  SystemArgument1,
IN OUT PVOID  SystemArgument2 
)
static

Definition at line 308 of file workitem.c.

311{
313 BOOLEAN Impersonated = FALSE;
314 PRTLP_IOWORKERTHREAD IoThread = (PRTLP_IOWORKERTHREAD)NormalContext;
316
317 ASSERT(IoThread != NULL);
318
319 RtlFreeHeap(RtlGetProcessHeap(),
320 0,
322
323 if (WorkItem.TokenHandle != NULL)
324 {
327 &WorkItem.TokenHandle,
328 sizeof(HANDLE));
329
330 NtClose(WorkItem.TokenHandle);
331
332 if (NT_SUCCESS(Status))
333 {
334 Impersonated = TRUE;
335 }
336 }
337
339 {
340 DPRINT("RtlpExecuteIoWorkItem: Function: 0x%p Context: 0x%p ImpersonationToken: 0x%p\n", WorkItem.Function, WorkItem.Context, WorkItem.TokenHandle);
341
342 /* Execute the function */
343 WorkItem.Function(WorkItem.Context);
344 }
346 {
347 DPRINT1("Exception 0x%x while executing IO work item 0x%p\n", _SEH2_GetExceptionCode(), WorkItem.Function);
348 }
349 _SEH2_END;
350
351 if (Impersonated)
352 {
353 WorkItem.TokenHandle = NULL;
356 &WorkItem.TokenHandle,
357 sizeof(HANDLE));
358 if (!NT_SUCCESS(Status))
359 {
360 DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n", Status);
361 }
362 }
363
364 /* remove the long function flag */
366 {
368 if (NT_SUCCESS(Status))
369 {
370 IoThread->Flags &= ~WT_EXECUTELONGFUNCTION;
372 }
373 }
374
375 /* update the requests counter */
377
379 {
381 }
382}
unsigned char BOOLEAN
#define InterlockedDecrement
Definition: armddk.h:52
LONG NTSTATUS
Definition: precomp.h:26
#define DPRINT1
Definition: precomp.h:8
BOOLEAN NTAPI RtlFreeHeap(IN PVOID HeapHandle, IN ULONG Flags, IN PVOID HeapBase)
Definition: heap.c:608
#define NULL
Definition: types.h:112
#define TRUE
Definition: types.h:120
#define FALSE
Definition: types.h:117
#define NT_SUCCESS(StatCode)
Definition: apphelp.c:32
@ ThreadImpersonationToken
Definition: compat.h:940
#define _SEH2_END
Definition: filesup.c:22
#define _SEH2_TRY
Definition: filesup.c:19
Status
Definition: gdiplustypes.h:25
#define EXCEPTION_EXECUTE_HANDLER
Definition: excpt.h:85
#define ASSERT(a)
Definition: mode.c:44
NTSYSAPI NTSTATUS NTAPI RtlEnterCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSYSAPI NTSTATUS NTAPI RtlLeaveCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSTATUS NTAPI NtClose(IN HANDLE Handle)
Definition: obhandle.c:3402
NTSTATUS NTAPI NtSetInformationThread(IN HANDLE ThreadHandle, IN THREADINFOCLASS ThreadInformationClass, IN PVOID ThreadInformation, IN ULONG ThreadInformationLength)
Definition: query.c:2018
#define _SEH2_GetExceptionCode()
Definition: pseh2_64.h:159
#define _SEH2_EXCEPT(...)
Definition: pseh2_64.h:34
struct _RTLP_IOWORKERTHREAD * PRTLP_IOWORKERTHREAD
static LONG ThreadPoolIOWorkerThreadsLongRequests
Definition: workitem.c:76
static RTL_CRITICAL_SECTION ThreadPoolLock
Definition: workitem.c:67
static LONG ThreadPoolIOWorkerThreadsRequests
Definition: workitem.c:75
#define DPRINT
Definition: sndvol32.h:71
_Must_inspect_result_ _In_ PWDF_WORKITEM_CONFIG _In_ PWDF_OBJECT_ATTRIBUTES _Out_ WDFWORKITEM * WorkItem
Definition: wdfworkitem.h:115
#define WT_EXECUTELONGFUNCTION
Definition: winnt_old.h:1043
_In_opt_ PVOID _In_opt_ PVOID _In_opt_ PVOID SystemArgument2
Definition: ketypes.h:689
#define NtCurrentThread()

Referenced by RtlpQueueIoWorkerThread().

◆ RtlpExecuteWorkItem()

static VOID NTAPI RtlpExecuteWorkItem ( IN OUT PVOID  NormalContext,
IN OUT PVOID  SystemArgument1,
IN OUT PVOID  SystemArgument2 
)
static

Definition at line 194 of file workitem.c.

197{
199 BOOLEAN Impersonated = FALSE;
201
202 RtlFreeHeap(RtlGetProcessHeap(),
203 0,
205
206 if (WorkItem.TokenHandle != NULL)
207 {
210 &WorkItem.TokenHandle,
211 sizeof(HANDLE));
212
213 NtClose(WorkItem.TokenHandle);
214
215 if (NT_SUCCESS(Status))
216 {
217 Impersonated = TRUE;
218 }
219 }
220
222 {
223 DPRINT("RtlpExecuteWorkItem: Function: 0x%p Context: 0x%p ImpersonationToken: 0x%p\n", WorkItem.Function, WorkItem.Context, WorkItem.TokenHandle);
224
225 /* Execute the function */
226 WorkItem.Function(WorkItem.Context);
227 }
229 {
230 DPRINT1("Exception 0x%x while executing IO work item 0x%p\n", _SEH2_GetExceptionCode(), WorkItem.Function);
231 }
232 _SEH2_END;
233
234 if (Impersonated)
235 {
236 WorkItem.TokenHandle = NULL;
239 &WorkItem.TokenHandle,
240 sizeof(HANDLE));
241 if (!NT_SUCCESS(Status))
242 {
243 DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n", Status);
244 }
245 }
246
247 /* update the requests counter */
249
251 {
253 }
254}
static LONG ThreadPoolWorkerThreadsLongRequests
Definition: workitem.c:73
static LONG ThreadPoolWorkerThreadsRequests
Definition: workitem.c:72

Referenced by RtlpQueueWorkerThread().

◆ RtlpExitThread()

NTSTATUS NTAPI RtlpExitThread ( IN NTSTATUS  ExitStatus)

Definition at line 39 of file workitem.c.

40{
41 /* Kill a native worker thread -- used for SMSS, CSRSS, etc... */
43}
_In_ NTSTATUS ExitStatus
Definition: psfuncs.h:867
NTSTATUS NtTerminateThread(IN HANDLE ThreadHandle OPTIONAL, IN NTSTATUS ExitStatus)
Definition: kill.c:1279

◆ RtlpGetImpersonationToken()

static NTSTATUS RtlpGetImpersonationToken ( OUT PHANDLE  TokenHandle)
static

Definition at line 146 of file workitem.c.

147{
149
152 TRUE,
155 {
156 *TokenHandle = NULL;
158 }
159
160 return Status;
161}
_In_ ACCESS_MASK _In_ ULONG _Out_ PHANDLE TokenHandle
Definition: psfuncs.h:726
NTSTATUS NTAPI NtOpenThreadToken(_In_ HANDLE ThreadHandle, _In_ ACCESS_MASK DesiredAccess, _In_ BOOLEAN OpenAsSelf, _Out_ PHANDLE TokenHandle)
Opens a token that is tied to a thread handle.
Definition: token.c:2474
#define STATUS_NO_TOKEN
Definition: ntstatus.h:360
#define STATUS_CANT_OPEN_ANONYMOUS
Definition: ntstatus.h:402
#define STATUS_SUCCESS
Definition: shellext.h:65
#define TOKEN_IMPERSONATE
Definition: setypes.h:927

Referenced by RtlQueueWorkItem().

◆ RtlpInitializeThreadPool()

static NTSTATUS RtlpInitializeThreadPool ( VOID  )
static

Definition at line 81 of file workitem.c.

82{
84 LONG InitStatus;
85
86 do
87 {
89 2,
90 0);
91 if (InitStatus == 0)
92 {
93 /* We're the first thread to initialize the thread pool */
94
96
98
105
106 /* Initialize the lock */
108 if (!NT_SUCCESS(Status))
109 goto Finish;
110
111 /* Create the complection port */
114 NULL,
115 0);
116 if (!NT_SUCCESS(Status))
117 {
119 goto Finish;
120 }
121
122Finish:
123 /* Initialization done */
125 1);
126 break;
127 }
128 else if (InitStatus == 2)
129 {
131
132 /* Another thread is currently initializing the thread pool!
133 Poll after a short period of time to see if the initialization
134 was completed */
135
136 Timeout.QuadPart = -10000000LL; /* Wait for a second */
138 &Timeout);
139 }
140 } while (InitStatus != 1);
141
142 return Status;
143}
#define InterlockedExchange
Definition: armddk.h:54
#define InitializeListHead(ListHead)
Definition: env_spec_w32.h:944
#define InterlockedCompareExchange
Definition: interlocked.h:104
NTSTATUS NTAPI NtCreateIoCompletion(OUT PHANDLE IoCompletionHandle, IN ACCESS_MASK DesiredAccess, IN POBJECT_ATTRIBUTES ObjectAttributes, IN ULONG NumberOfConcurrentThreads)
Definition: iocomp.c:253
#define IO_COMPLETION_ALL_ACCESS
Definition: file.c:72
NTSYSAPI NTSTATUS NTAPI RtlDeleteCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSYSAPI NTSTATUS NTAPI RtlInitializeCriticalSection(_In_ PRTL_CRITICAL_SECTION CriticalSection)
NTSTATUS NTAPI NtDelayExecution(IN BOOLEAN Alertable, IN PLARGE_INTEGER DelayInterval)
Definition: wait.c:876
long LONG
Definition: pedump.c:60
static ULONG Timeout
Definition: ping.c:61
static HANDLE ThreadPoolCompletionPort
Definition: workitem.c:70
static LONG ThreadPoolWorkerThreads
Definition: workitem.c:71
static LIST_ENTRY ThreadPoolIOWorkerThreadsList
Definition: workitem.c:69
static PRTLP_IOWORKERTHREAD PersistentIoThread
Definition: workitem.c:68
static LONG ThreadPoolInitialized
Definition: workitem.c:66
static LONG ThreadPoolIOWorkerThreads
Definition: workitem.c:74

Referenced by RtlQueueWorkItem(), and RtlSetIoCompletionCallback().

◆ RtlpIoWorkerThreadProc()

static ULONG NTAPI RtlpIoWorkerThreadProc ( IN PVOID  Parameter)
static

Definition at line 561 of file workitem.c.

562{
565 BOOLEAN Terminate;
567
569 {
570 /* Oops, too many worker threads... */
571 goto InitFailed;
572 }
573
574 /* Get a thread handle to ourselves */
578 (PHANDLE)&ThreadInfo.ThreadHandle,
579 0,
580 0,
582 if (!NT_SUCCESS(Status))
583 {
584 DPRINT1("Failed to create handle to own thread! Status: 0x%x\n", Status);
585
586InitFailed:
588
589 /* Signal initialization completion */
591 1);
592
594 return 0;
595 }
596
597 ThreadInfo.Flags = 0;
598
599 /* Insert the thread into the list */
601 (PLIST_ENTRY)&ThreadInfo.ListEntry);
602
603 /* Signal initialization completion */
605 1);
606
607 for (;;)
608 {
609 Timeout.QuadPart = -50000000LL; /* Wait for 5 seconds by default */
610
611Wait:
612 do
613 {
614 /* Perform an alertable wait, the work items are going to be executed as APCs */
616 &Timeout);
617
618 /* Loop as long as we executed an APC */
619 } while (Status != STATUS_SUCCESS);
620
621 /* We timed out, let's see if we're allowed to terminate */
622 Terminate = FALSE;
623
625 if (NT_SUCCESS(Status))
626 {
628 {
629 /* This thread is supposed to be persistent. Don't terminate! */
631
632 Timeout.QuadPart = -0x7FFFFFFFFFFFFFFFLL;
633 goto Wait;
634 }
635
636 /* FIXME - figure out an effective method to determine if it's appropriate to
637 lower the number of threads. For now let's always terminate if there's
638 at least one thread and no queued items. */
640 (*((volatile LONG*)&ThreadPoolIOWorkerThreadsRequests) == 0);
641
642 if (Terminate)
643 {
644 /* Prevent termination as long as IO is pending */
645 Terminate = !RtlpIsIoPending(ThreadInfo.ThreadHandle);
646 }
647
648 if (Terminate)
649 {
650 /* Rundown the thread and unlink it from the list */
653 }
654
656
657 if (Terminate)
658 {
659 /* Break the infinite loop and terminate */
661 break;
662 }
663 }
664 else
665 {
666 DPRINT1("Failed to acquire the thread pool lock!!! Status: 0x%x\n", Status);
667 break;
668 }
669 }
670
671 NtClose(ThreadInfo.ThreadHandle);
673 return 0;
674}
#define InterlockedIncrement
Definition: armddk.h:53
#define RemoveEntryList(Entry)
Definition: env_spec_w32.h:986
#define InsertHeadList(ListHead, Entry)
struct _ThreadInfo ThreadInfo
#define NtCurrentProcess()
Definition: nt_native.h:1657
PVOID *typedef PHANDLE
Definition: ntsecpkg.h:455
NTSTATUS NTAPI NtDuplicateObject(IN HANDLE SourceProcessHandle, IN HANDLE SourceHandle, IN HANDLE TargetProcessHandle OPTIONAL, OUT PHANDLE TargetHandle OPTIONAL, IN ACCESS_MASK DesiredAccess, IN ULONG HandleAttributes, IN ULONG Options)
Definition: obhandle.c:3410
#define WT_EXECUTEINPERSISTENTIOTHREAD
Definition: queuetest.c:12
#define WORKERTHREAD_CREATION_THRESHOLD
Definition: workitem.c:49
#define MAX_WORKERTHREADS
Definition: workitem.c:48
static BOOLEAN RtlpIsIoPending(IN HANDLE ThreadHandle OPTIONAL)
Definition: workitem.c:517
PRTL_EXIT_POOL_THREAD RtlpExitThreadFunc
Definition: workitem.c:46
Definition: typedefs.h:120
int32_t * PLONG
Definition: typedefs.h:58
_In_ WDFDPC _In_ BOOLEAN Wait
Definition: wdfdpc.h:170
#define DUPLICATE_SAME_ACCESS
_Inout_opt_ PVOID Parameter
Definition: rtltypes.h:323

Referenced by RtlQueueWorkItem().

◆ RtlpIsIoPending()

static BOOLEAN RtlpIsIoPending ( IN HANDLE ThreadHandle  OPTIONAL)
static

Definition at line 517 of file workitem.c.

518{
520 ULONG IoPending;
521 BOOLEAN CreatedHandle = FALSE;
522 BOOLEAN IsIoPending = TRUE;
523
524 if (ThreadHandle == NULL)
525 {
529 &ThreadHandle,
530 0,
531 0,
533 if (!NT_SUCCESS(Status))
534 {
535 return IsIoPending;
536 }
537
538 CreatedHandle = TRUE;
539 }
540
541 Status = NtQueryInformationThread(ThreadHandle,
543 &IoPending,
544 sizeof(IoPending),
545 NULL);
546 if (NT_SUCCESS(Status) && IoPending == 0)
547 {
548 IsIoPending = FALSE;
549 }
550
551 if (CreatedHandle)
552 {
553 NtClose(ThreadHandle);
554 }
555
556 return IsIoPending;
557}
@ ThreadIsIoPending
Definition: compat.h:951
NTSTATUS NTAPI NtQueryInformationThread(IN HANDLE ThreadHandle, IN THREADINFOCLASS ThreadInformationClass, OUT PVOID ThreadInformation, IN ULONG ThreadInformationLength, OUT PULONG ReturnLength OPTIONAL)
Definition: query.c:2624
uint32_t ULONG
Definition: typedefs.h:59

Referenced by RtlpIoWorkerThreadProc(), and RtlpWorkerThreadProc().

◆ RtlpQueueIoWorkerThread()

static NTSTATUS RtlpQueueIoWorkerThread ( IN OUT PRTLP_WORKITEM  WorkItem)
static

Definition at line 385 of file workitem.c.

386{
387 PLIST_ENTRY CurrentEntry;
388 PRTLP_IOWORKERTHREAD IoThread = NULL;
390
392 {
394 {
395 /* We already have a persistent IO worker thread */
396 IoThread = PersistentIoThread;
397 }
398 else
399 {
400 /* We're not aware of any persistent IO worker thread. Search for a unused
401 worker thread that doesn't have a long function queued */
403 while (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
404 {
405 IoThread = CONTAINING_RECORD(CurrentEntry,
407 ListEntry);
408
409 if (!(IoThread->Flags & WT_EXECUTELONGFUNCTION))
410 break;
411
412 CurrentEntry = CurrentEntry->Flink;
413 }
414
415 if (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
416 {
417 /* Found a worker thread we can use. */
418 ASSERT(IoThread != NULL);
419
421 PersistentIoThread = IoThread;
422 }
423 else
424 {
425 DPRINT1("Failed to find a worker thread for the persistent IO thread!\n");
426 return STATUS_NO_MEMORY;
427 }
428 }
429 }
430 else
431 {
432 /* Find a worker thread that is not currently executing a long function */
434 while (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
435 {
436 IoThread = CONTAINING_RECORD(CurrentEntry,
438 ListEntry);
439
440 if (!(IoThread->Flags & WT_EXECUTELONGFUNCTION))
441 {
442 /* if we're trying to queue a long function then make sure we're not dealing
443 with the persistent thread */
445 {
446 /* found a candidate */
447 break;
448 }
449 }
450
451 CurrentEntry = CurrentEntry->Flink;
452 }
453
454 if (CurrentEntry == &ThreadPoolIOWorkerThreadsList)
455 {
456 /* Couldn't find an appropriate thread, see if we can use the persistent thread (if it exists) for now */
458 {
459 DPRINT1("Failed to find a worker thread for the work item 0x%p!\n", WorkItem);
461 return STATUS_NO_MEMORY;
462 }
463 else
464 {
465 /* pick the first worker thread */
467 IoThread = CONTAINING_RECORD(CurrentEntry,
469 ListEntry);
470
471 /* Since this might be the persistent worker thread, don't run as a
472 long function */
473 WorkItem->Flags &= ~WT_EXECUTELONGFUNCTION;
474 }
475 }
476
477 /* Move the picked thread to the end of the list. Since we're always searching
478 from the beginning, this improves distribution of work items */
479 RemoveEntryList(&IoThread->ListEntry);
481 &IoThread->ListEntry);
482 }
483
484 ASSERT(IoThread != NULL);
485
487
488 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
489 {
490 /* We're about to queue a long function, mark the thread */
491 IoThread->Flags |= WT_EXECUTELONGFUNCTION;
492
494 }
495
496 /* It's time to queue the work item */
499 IoThread,
500 NULL,
501 WorkItem);
502 if (!NT_SUCCESS(Status))
503 {
504 DPRINT1("Failed to queue APC for work item 0x%p\n", WorkItem->Function);
506
507 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
508 {
510 }
511 }
512
513 return Status;
514}
#define InsertTailList(ListHead, Entry)
#define IsListEmpty(ListHead)
Definition: env_spec_w32.h:954
NTSTATUS NTAPI NtQueueApcThread(IN HANDLE ThreadHandle, IN PKNORMAL_ROUTINE ApcRoutine, IN PVOID NormalContext, IN PVOID SystemArgument1, IN PVOID SystemArgument2)
Definition: state.c:600
#define STATUS_NO_MEMORY
Definition: ntstatus.h:260
static VOID NTAPI RtlpExecuteIoWorkItem(IN OUT PVOID NormalContext, IN OUT PVOID SystemArgument1, IN OUT PVOID SystemArgument2)
Definition: workitem.c:308
struct _LIST_ENTRY * Flink
Definition: typedefs.h:121
LIST_ENTRY ListEntry
Definition: workitem.c:53
#define CONTAINING_RECORD(address, type, field)
Definition: typedefs.h:260

Referenced by RtlQueueWorkItem().

◆ RtlpQueueWorkerThread()

static NTSTATUS RtlpQueueWorkerThread ( IN OUT PRTLP_WORKITEM  WorkItem)
static

Definition at line 258 of file workitem.c.

259{
261
263
264 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
265 {
267 }
268
270 {
272
273 if (NT_SUCCESS(Status))
274 {
275 /* Queue an APC in the timer thread */
278 NULL,
279 NULL,
280 WorkItem);
281 }
282 }
283 else
284 {
285 /* Queue an IO completion message */
288 WorkItem,
290 0);
291 }
292
293 if (!NT_SUCCESS(Status))
294 {
296
297 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
298 {
300 }
301 }
302
303 return Status;
304}
NTSTATUS NTAPI NtSetIoCompletion(IN HANDLE IoCompletionPortHandle, IN PVOID CompletionKey, IN PVOID CompletionContext, IN NTSTATUS CompletionStatus, IN ULONG CompletionInformation)
Definition: iocomp.c:569
HANDLE TimerThreadHandle
Definition: timerqueue.c:24
NTSTATUS RtlpInitializeTimerThread(VOID)
Definition: timerqueue.c:27
static VOID NTAPI RtlpExecuteWorkItem(IN OUT PVOID NormalContext, IN OUT PVOID SystemArgument1, IN OUT PVOID SystemArgument2)
Definition: workitem.c:194
#define WT_EXECUTEINPERSISTENTTHREAD
Definition: winnt_old.h:1046

Referenced by RtlQueueWorkItem().

◆ RtlpStartThread()

NTSTATUS NTAPI RtlpStartThread ( IN PTHREAD_START_ROUTINE  Function,
IN PVOID  Parameter,
OUT PHANDLE  ThreadHandle 
)

Definition at line 20 of file workitem.c.

23{
24 /* Create a native worker thread -- used for SMSS, CSRSS, etc... */
26 NULL,
27 TRUE,
28 0,
29 0,
30 0,
33 ThreadHandle,
34 NULL);
35}
_In_ CDROM_SCAN_FOR_SPECIAL_INFO _In_ PCDROM_SCAN_FOR_SPECIAL_HANDLER Function
Definition: cdrom.h:1156
NTSYSAPI NTSTATUS NTAPI RtlCreateUserThread(_In_ PVOID ThreadContext, _Out_ HANDLE *OutThreadHandle, _Reserved_ PVOID Reserved1, _Reserved_ PVOID Reserved2, _Reserved_ PVOID Reserved3, _Reserved_ PVOID Reserved4, _Reserved_ PVOID Reserved5, _Reserved_ PVOID Reserved6, _Reserved_ PVOID Reserved7, _Reserved_ PVOID Reserved8)

◆ RtlpStartWorkerThread()

static NTSTATUS RtlpStartWorkerThread ( PTHREAD_START_ROUTINE  StartRoutine)
static

Definition at line 164 of file workitem.c.

165{
167 HANDLE ThreadHandle;
169 volatile LONG WorkerInitialized = 0;
170
171 Timeout.QuadPart = -10000LL; /* Wait for 100ms */
172
173 /* Start the thread */
174 Status = RtlpStartThreadFunc(StartRoutine, (PVOID)&WorkerInitialized, &ThreadHandle);
175 if (NT_SUCCESS(Status))
176 {
177 NtResumeThread(ThreadHandle, NULL);
178
179 /* Poll until the thread got a chance to initialize */
180 while (WorkerInitialized == 0)
181 {
183 &Timeout);
184 }
185
186 NtClose(ThreadHandle);
187 }
188
189 return Status;
190}
NTSTATUS NTAPI NtResumeThread(IN HANDLE ThreadHandle, OUT PULONG SuspendCount OPTIONAL)
Definition: state.c:290
PRTL_START_POOL_THREAD RtlpStartThreadFunc
Definition: workitem.c:45
_In_ ULONG _In_opt_ POBJECT_ATTRIBUTES _In_opt_ HANDLE _Out_opt_ PCLIENT_ID _In_ PKSTART_ROUTINE StartRoutine
Definition: psfuncs.h:91

Referenced by RtlQueueWorkItem().

◆ RtlpWorkerThreadProc()

static ULONG NTAPI RtlpWorkerThreadProc ( IN PVOID  Parameter)
static

Definition at line 678 of file workitem.c.

679{
681 BOOLEAN Terminate;
684 ULONG TimeoutCount = 0;
687
689 {
690 /* Signal initialization completion */
692 1);
693
694 /* Oops, too many worker threads... */
696 return 0;
697 }
698
699 /* Signal initialization completion */
701 1);
702
703 for (;;)
704 {
705 Timeout.QuadPart = -50000000LL; /* Wait for 5 seconds by default */
706
707 /* Dequeue a completion message */
709 (PVOID*)&ApcRoutine,
712 &Timeout);
713
714 if (Status == STATUS_SUCCESS)
715 {
716 TimeoutCount = 0;
717
719 {
720 /* Call the APC routine */
724 }
726 {
727 (void)0;
728 }
729 _SEH2_END;
730 }
731 else
732 {
733 Terminate = FALSE;
734
736 continue;
737
738 /* FIXME - this should be optimized, check if there's requests, etc */
739
740 if (Status == STATUS_TIMEOUT)
741 {
742 /* FIXME - we might want to optimize this */
743 if (TimeoutCount++ > 2 &&
745 {
746 Terminate = TRUE;
747 }
748 }
749 else
750 Terminate = TRUE;
751
753
754 if (Terminate)
755 {
756 /* Prevent termination as long as IO is pending */
757 Terminate = !RtlpIsIoPending(NULL);
758 }
759
760 if (Terminate)
761 {
764 break;
765 }
766 }
767 }
768
770 return 0;
771
772}
NTSTATUS NTAPI NtRemoveIoCompletion(IN HANDLE IoCompletionHandle, OUT PVOID *KeyContext, OUT PVOID *ApcContext, OUT PIO_STATUS_BLOCK IoStatusBlock, IN PLARGE_INTEGER Timeout OPTIONAL)
Definition: iocomp.c:445
static OUT PIO_STATUS_BLOCK IoStatusBlock
Definition: pipe.c:75
_In_opt_ HANDLE _In_opt_ PIO_APC_ROUTINE ApcRoutine
Definition: iofuncs.h:726
VOID(NTAPI * PKNORMAL_ROUTINE)(IN PVOID NormalContext OPTIONAL, IN PVOID SystemArgument1 OPTIONAL, IN PVOID SystemArgument2 OPTIONAL)
Definition: ketypes.h:744
#define STATUS_TIMEOUT
Definition: ntstatus.h:81

Referenced by RtlQueueWorkItem().

◆ RtlQueueWorkItem()

NTSTATUS NTAPI RtlQueueWorkItem ( IN WORKERCALLBACKFUNC  Function,
IN PVOID Context  OPTIONAL,
IN ULONG  Flags 
)

Definition at line 779 of file workitem.c.

782{
783 LONG FreeWorkers;
786
787 DPRINT("RtlQueueWorkItem(0x%p, 0x%p, 0x%x)\n", Function, Context, Flags);
788
789 /* Initialize the thread pool if not already initialized */
791 {
793
794 if (!NT_SUCCESS(Status))
795 return Status;
796 }
797
798 /* Allocate a work item */
799 WorkItem = RtlAllocateHeap(RtlGetProcessHeap(),
800 0,
801 sizeof(RTLP_WORKITEM));
802 if (WorkItem == NULL)
803 return STATUS_NO_MEMORY;
804
805 WorkItem->Function = Function;
806 WorkItem->Context = Context;
807 WorkItem->Flags = Flags;
808
810 {
812
813 if (!NT_SUCCESS(Status))
814 {
815 DPRINT1("Failed to get impersonation token! Status: 0x%x\n", Status);
816 goto Cleanup;
817 }
818 }
819 else
820 WorkItem->TokenHandle = NULL;
821
823 if (NT_SUCCESS(Status))
824 {
826 {
827 /* FIXME - We should optimize the algorithm used to determine whether to grow the thread pool! */
828
830
833 {
834 /* We shouldn't queue a long function into the persistent IO thread */
835 FreeWorkers--;
836 }
837
838 /* See if it's a good idea to grow the pool */
841 {
842 /* Grow the thread pool */
844
845 if (!NT_SUCCESS(Status) && *((volatile LONG*)&ThreadPoolIOWorkerThreads) != 0)
846 {
847 /* We failed to create the thread, but there's at least one there so
848 we can at least queue the request */
850 }
851 }
852
853 if (NT_SUCCESS(Status))
854 {
855 /* Queue a IO worker thread */
857 }
858 }
859 else
860 {
861 /* FIXME - We should optimize the algorithm used to determine whether to grow the thread pool! */
862
864
865 /* See if it's a good idea to grow the pool */
868 {
869 /* Grow the thread pool */
871
872 if (!NT_SUCCESS(Status) && *((volatile LONG*)&ThreadPoolWorkerThreads) != 0)
873 {
874 /* We failed to create the thread, but there's at least one there so
875 we can at least queue the request */
877 }
878 }
879
880 if (NT_SUCCESS(Status))
881 {
882 /* Queue a normal worker thread */
884 }
885 }
886
888 }
889
890 if (!NT_SUCCESS(Status))
891 {
892 if (WorkItem->TokenHandle != NULL)
893 {
894 NtClose(WorkItem->TokenHandle);
895 }
896
897Cleanup:
898 RtlFreeHeap(RtlGetProcessHeap(),
899 0,
900 WorkItem);
901 }
902
903 return Status;
904}
PVOID NTAPI RtlAllocateHeap(IN PVOID HeapHandle, IN ULONG Flags, IN SIZE_T Size)
Definition: heap.c:590
static const WCHAR Cleanup[]
Definition: register.c:80
static NTSTATUS RtlpInitializeThreadPool(VOID)
Definition: workitem.c:81
static NTSTATUS RtlpGetImpersonationToken(OUT PHANDLE TokenHandle)
Definition: workitem.c:146
static ULONG NTAPI RtlpWorkerThreadProc(IN PVOID Parameter)
Definition: workitem.c:678
#define IsThreadPoolInitialized()
Definition: workitem.c:78
static ULONG NTAPI RtlpIoWorkerThreadProc(IN PVOID Parameter)
Definition: workitem.c:561
static NTSTATUS RtlpStartWorkerThread(PTHREAD_START_ROUTINE StartRoutine)
Definition: workitem.c:164
static NTSTATUS RtlpQueueIoWorkerThread(IN OUT PRTLP_WORKITEM WorkItem)
Definition: workitem.c:385
static NTSTATUS RtlpQueueWorkerThread(IN OUT PRTLP_WORKITEM WorkItem)
Definition: workitem.c:258
#define WT_TRANSFER_IMPERSONATION
Definition: winnt_old.h:1047
#define WT_EXECUTEINIOTHREAD
Definition: winnt_old.h:1039
#define WT_EXECUTEINUITHREAD
Definition: winnt_old.h:1040
_Must_inspect_result_ _In_ ULONG Flags
Definition: wsk.h:170

◆ RtlSetIoCompletionCallback()

NTSTATUS NTAPI RtlSetIoCompletionCallback ( IN HANDLE  FileHandle,
IN PIO_APC_ROUTINE  Callback,
IN ULONG  Flags 
)

Definition at line 911 of file workitem.c.

914{
916 FILE_COMPLETION_INFORMATION FileCompletionInfo;
918
919 DPRINT("RtlSetIoCompletionCallback(0x%p, 0x%p, 0x%x)\n", FileHandle, Callback, Flags);
920
921 /* Initialize the thread pool if not already initialized */
923 {
925 if (!NT_SUCCESS(Status))
926 return Status;
927 }
928
929 FileCompletionInfo.Port = ThreadPoolCompletionPort;
930 FileCompletionInfo.Key = (PVOID)Callback;
931
934 &FileCompletionInfo,
935 sizeof(FileCompletionInfo),
937
938 return Status;
939}
_Must_inspect_result_ _In_opt_ PFLT_INSTANCE _Out_ PHANDLE FileHandle
Definition: fltkernel.h:1231
@ FileCompletionInformation
Definition: from_kernel.h:91
NTSYSAPI NTSTATUS NTAPI NtSetInformationFile(IN HANDLE hFile, OUT PIO_STATUS_BLOCK pIoStatusBlock, IN PVOID FileInformationBuffer, IN ULONG FileInformationBufferLength, IN FILE_INFORMATION_CLASS FileInfoClass)
Definition: iofunc.c:3096
void * PVOID
Definition: typedefs.h:50
_In_ WDFINTERRUPT _In_ PFN_WDF_INTERRUPT_SYNCHRONIZE Callback
Definition: wdfinterrupt.h:458

◆ RtlSetThreadPoolStartFunc()

NTSTATUS NTAPI RtlSetThreadPoolStartFunc ( IN PRTL_START_POOL_THREAD  StartPoolThread,
IN PRTL_EXIT_POOL_THREAD  ExitPoolThread 
)

Definition at line 946 of file workitem.c.

948{
949 RtlpStartThreadFunc = StartPoolThread;
950 RtlpExitThreadFunc = ExitPoolThread;
951 return STATUS_SUCCESS;
952}

Variable Documentation

◆ PersistentIoThread

PRTLP_IOWORKERTHREAD PersistentIoThread
static

Definition at line 68 of file workitem.c.

Referenced by RtlpInitializeThreadPool(), RtlpQueueIoWorkerThread(), and RtlQueueWorkItem().

◆ RtlpExitThreadFunc

◆ RtlpStartThreadFunc

◆ ThreadPoolCompletionPort

HANDLE ThreadPoolCompletionPort
static

◆ ThreadPoolInitialized

LONG ThreadPoolInitialized = 0
static

Definition at line 66 of file workitem.c.

Referenced by RtlpInitializeThreadPool().

◆ ThreadPoolIOWorkerThreads

LONG ThreadPoolIOWorkerThreads
static

◆ ThreadPoolIOWorkerThreadsList

LIST_ENTRY ThreadPoolIOWorkerThreadsList
static

◆ ThreadPoolIOWorkerThreadsLongRequests

LONG ThreadPoolIOWorkerThreadsLongRequests
static

◆ ThreadPoolIOWorkerThreadsRequests

LONG ThreadPoolIOWorkerThreadsRequests
static

◆ ThreadPoolLock

◆ ThreadPoolWorkerThreads

LONG ThreadPoolWorkerThreads
static

Definition at line 71 of file workitem.c.

Referenced by RtlpInitializeThreadPool(), RtlpWorkerThreadProc(), and RtlQueueWorkItem().

◆ ThreadPoolWorkerThreadsLongRequests

LONG ThreadPoolWorkerThreadsLongRequests
static

◆ ThreadPoolWorkerThreadsRequests

LONG ThreadPoolWorkerThreadsRequests
static