Home | Info | Community | Development | myReactOS | Contact Us
ReactOS Development > Doxygenworkitem.c
Go to the documentation of this file.
00001 /* 00002 * COPYRIGHT: See COPYING in the top level directory 00003 * PROJECT: ReactOS system libraries 00004 * PURPOSE: Work Item implementation 00005 * FILE: lib/rtl/workitem.c 00006 * PROGRAMMER: 00007 */ 00008 00009 /* INCLUDES *****************************************************************/ 00010 00011 #include <rtl.h> 00012 00013 #define NDEBUG 00014 #include <debug.h> 00015 00016 /* FUNCTIONS ***************************************************************/ 00017 00018 NTSTATUS 00019 NTAPI 00020 RtlpStartThread(IN PTHREAD_START_ROUTINE Function, 00021 IN PVOID Parameter, 00022 OUT PHANDLE ThreadHandle) 00023 { 00024 /* Create a native worker thread -- used for SMSS, CSRSS, etc... */ 00025 return RtlCreateUserThread(NtCurrentProcess(), 00026 NULL, 00027 TRUE, 00028 0, 00029 0, 00030 0, 00031 Function, 00032 Parameter, 00033 ThreadHandle, 00034 NULL); 00035 } 00036 00037 NTSTATUS 00038 NTAPI 00039 RtlpExitThread(IN NTSTATUS ExitStatus) 00040 { 00041 /* Kill a native worker thread -- used for SMSS, CSRSS, etc... */ 00042 return NtTerminateThread(NtCurrentThread(), ExitStatus); 00043 } 00044 00045 PRTL_START_POOL_THREAD RtlpStartThreadFunc = RtlpStartThread; 00046 PRTL_EXIT_POOL_THREAD RtlpExitThreadFunc = RtlpExitThread; 00047 00048 #define MAX_WORKERTHREADS 0x100 00049 #define WORKERTHREAD_CREATION_THRESHOLD 0x5 00050 00051 typedef struct _RTLP_IOWORKERTHREAD 00052 { 00053 LIST_ENTRY ListEntry; 00054 HANDLE ThreadHandle; 00055 ULONG Flags; 00056 } RTLP_IOWORKERTHREAD, *PRTLP_IOWORKERTHREAD; 00057 00058 typedef struct _RTLP_WORKITEM 00059 { 00060 WORKERCALLBACKFUNC Function; 00061 PVOID Context; 00062 ULONG Flags; 00063 HANDLE TokenHandle; 00064 } RTLP_WORKITEM, *PRTLP_WORKITEM; 00065 00066 static LONG ThreadPoolInitialized = 0; 00067 static RTL_CRITICAL_SECTION ThreadPoolLock; 00068 static PRTLP_IOWORKERTHREAD PersistentIoThread; 00069 static LIST_ENTRY ThreadPoolIOWorkerThreadsList; 00070 static HANDLE ThreadPoolCompletionPort; 00071 static LONG ThreadPoolWorkerThreads; 00072 static LONG ThreadPoolWorkerThreadsRequests; 00073 static LONG ThreadPoolWorkerThreadsLongRequests; 00074 static LONG ThreadPoolIOWorkerThreads; 00075 static LONG ThreadPoolIOWorkerThreadsRequests; 00076 static LONG ThreadPoolIOWorkerThreadsLongRequests; 00077 00078 #define IsThreadPoolInitialized() (*((volatile LONG*)&ThreadPoolInitialized) == 1) 00079 00080 static NTSTATUS 00081 RtlpInitializeThreadPool(VOID) 00082 { 00083 NTSTATUS Status = STATUS_SUCCESS; 00084 LONG InitStatus; 00085 00086 do 00087 { 00088 InitStatus = InterlockedCompareExchange(&ThreadPoolInitialized, 00089 2, 00090 0); 00091 if (InitStatus == 0) 00092 { 00093 /* We're the first thread to initialize the thread pool */ 00094 00095 InitializeListHead(&ThreadPoolIOWorkerThreadsList); 00096 00097 PersistentIoThread = NULL; 00098 00099 ThreadPoolWorkerThreads = 0; 00100 ThreadPoolWorkerThreadsRequests = 0; 00101 ThreadPoolWorkerThreadsLongRequests = 0; 00102 ThreadPoolIOWorkerThreads = 0; 00103 ThreadPoolIOWorkerThreadsRequests = 0; 00104 ThreadPoolIOWorkerThreadsLongRequests = 0; 00105 00106 /* Initialize the lock */ 00107 Status = RtlInitializeCriticalSection(&ThreadPoolLock); 00108 if (!NT_SUCCESS(Status)) 00109 goto Finish; 00110 00111 /* Create the complection port */ 00112 Status = NtCreateIoCompletion(&ThreadPoolCompletionPort, 00113 IO_COMPLETION_ALL_ACCESS, 00114 NULL, 00115 0); 00116 if (!NT_SUCCESS(Status)) 00117 { 00118 RtlDeleteCriticalSection(&ThreadPoolLock); 00119 goto Finish; 00120 } 00121 00122 Finish: 00123 /* Initialization done */ 00124 InterlockedExchange(&ThreadPoolInitialized, 00125 1); 00126 break; 00127 } 00128 else if (InitStatus == 2) 00129 { 00130 LARGE_INTEGER Timeout; 00131 00132 /* Another thread is currently initializing the thread pool! 00133 Poll after a short period of time to see if the initialization 00134 was completed */ 00135 00136 Timeout.QuadPart = -10000000LL; /* Wait for a second */ 00137 NtDelayExecution(FALSE, 00138 &Timeout); 00139 } 00140 } while (InitStatus != 1); 00141 00142 return Status; 00143 } 00144 00145 static NTSTATUS 00146 RtlpGetImpersonationToken(OUT PHANDLE TokenHandle) 00147 { 00148 NTSTATUS Status; 00149 00150 Status = NtOpenThreadToken(NtCurrentThread(), 00151 TOKEN_IMPERSONATE, 00152 TRUE, 00153 TokenHandle); 00154 if (Status == STATUS_NO_TOKEN || Status == STATUS_CANT_OPEN_ANONYMOUS) 00155 { 00156 *TokenHandle = NULL; 00157 Status = STATUS_SUCCESS; 00158 } 00159 00160 return Status; 00161 } 00162 00163 static NTSTATUS 00164 RtlpStartWorkerThread(PTHREAD_START_ROUTINE StartRoutine) 00165 { 00166 NTSTATUS Status; 00167 HANDLE ThreadHandle; 00168 LARGE_INTEGER Timeout; 00169 volatile LONG WorkerInitialized = 0; 00170 00171 Timeout.QuadPart = -10000LL; /* Wait for 100ms */ 00172 00173 /* Start the thread */ 00174 Status = RtlpStartThreadFunc(StartRoutine, (PVOID)&WorkerInitialized, &ThreadHandle); 00175 if (NT_SUCCESS(Status)) 00176 { 00177 NtResumeThread(ThreadHandle, NULL); 00178 00179 /* Poll until the thread got a chance to initialize */ 00180 while (WorkerInitialized == 0) 00181 { 00182 NtDelayExecution(FALSE, 00183 &Timeout); 00184 } 00185 00186 NtClose(ThreadHandle); 00187 } 00188 00189 return Status; 00190 } 00191 00192 static VOID 00193 NTAPI 00194 RtlpExecuteWorkItem(IN OUT PVOID NormalContext, 00195 IN OUT PVOID SystemArgument1, 00196 IN OUT PVOID SystemArgument2) 00197 { 00198 NTSTATUS Status; 00199 BOOLEAN Impersonated = FALSE; 00200 RTLP_WORKITEM WorkItem = *(volatile RTLP_WORKITEM *)SystemArgument2; 00201 00202 RtlFreeHeap(RtlGetProcessHeap(), 00203 0, 00204 SystemArgument2); 00205 00206 if (WorkItem.TokenHandle != NULL) 00207 { 00208 Status = NtSetInformationThread(NtCurrentThread(), 00209 ThreadImpersonationToken, 00210 &WorkItem.TokenHandle, 00211 sizeof(HANDLE)); 00212 00213 NtClose(WorkItem.TokenHandle); 00214 00215 if (NT_SUCCESS(Status)) 00216 { 00217 Impersonated = TRUE; 00218 } 00219 } 00220 00221 _SEH2_TRY 00222 { 00223 DPRINT("RtlpExecuteWorkItem: Function: 0x%p Context: 0x%p ImpersonationToken: 0x%p\n", WorkItem.Function, WorkItem.Context, WorkItem.TokenHandle); 00224 00225 /* Execute the function */ 00226 WorkItem.Function(WorkItem.Context); 00227 } 00228 _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER) 00229 { 00230 DPRINT1("Exception 0x%x while executing IO work item 0x%p\n", _SEH2_GetExceptionCode(), WorkItem.Function); 00231 } 00232 _SEH2_END; 00233 00234 if (Impersonated) 00235 { 00236 WorkItem.TokenHandle = NULL; 00237 Status = NtSetInformationThread(NtCurrentThread(), 00238 ThreadImpersonationToken, 00239 &WorkItem.TokenHandle, 00240 sizeof(HANDLE)); 00241 if (!NT_SUCCESS(Status)) 00242 { 00243 DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n", Status); 00244 } 00245 } 00246 00247 /* update the requests counter */ 00248 InterlockedDecrement(&ThreadPoolWorkerThreadsRequests); 00249 00250 if (WorkItem.Flags & WT_EXECUTELONGFUNCTION) 00251 { 00252 InterlockedDecrement(&ThreadPoolWorkerThreadsLongRequests); 00253 } 00254 } 00255 00256 00257 static NTSTATUS 00258 RtlpQueueWorkerThread(IN OUT PRTLP_WORKITEM WorkItem) 00259 { 00260 NTSTATUS Status = STATUS_SUCCESS; 00261 00262 InterlockedIncrement(&ThreadPoolWorkerThreadsRequests); 00263 00264 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION) 00265 { 00266 InterlockedIncrement(&ThreadPoolWorkerThreadsLongRequests); 00267 } 00268 00269 if (WorkItem->Flags & WT_EXECUTEINPERSISTENTTHREAD) 00270 { 00271 Status = RtlpInitializeTimerThread(); 00272 00273 if (NT_SUCCESS(Status)) 00274 { 00275 /* Queue an APC in the timer thread */ 00276 Status = NtQueueApcThread(TimerThreadHandle, 00277 RtlpExecuteWorkItem, 00278 NULL, 00279 NULL, 00280 WorkItem); 00281 } 00282 } 00283 else 00284 { 00285 /* Queue an IO completion message */ 00286 Status = NtSetIoCompletion(ThreadPoolCompletionPort, 00287 RtlpExecuteWorkItem, 00288 WorkItem, 00289 STATUS_SUCCESS, 00290 0); 00291 } 00292 00293 if (!NT_SUCCESS(Status)) 00294 { 00295 InterlockedDecrement(&ThreadPoolWorkerThreadsRequests); 00296 00297 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION) 00298 { 00299 InterlockedDecrement(&ThreadPoolWorkerThreadsLongRequests); 00300 } 00301 } 00302 00303 return Status; 00304 } 00305 00306 static VOID 00307 NTAPI 00308 RtlpExecuteIoWorkItem(IN OUT PVOID NormalContext, 00309 IN OUT PVOID SystemArgument1, 00310 IN OUT PVOID SystemArgument2) 00311 { 00312 NTSTATUS Status; 00313 BOOLEAN Impersonated = FALSE; 00314 PRTLP_IOWORKERTHREAD IoThread = (PRTLP_IOWORKERTHREAD)NormalContext; 00315 RTLP_WORKITEM WorkItem = *(volatile RTLP_WORKITEM *)SystemArgument2; 00316 00317 ASSERT(IoThread != NULL); 00318 00319 RtlFreeHeap(RtlGetProcessHeap(), 00320 0, 00321 SystemArgument2); 00322 00323 if (WorkItem.TokenHandle != NULL) 00324 { 00325 Status = NtSetInformationThread(NtCurrentThread(), 00326 ThreadImpersonationToken, 00327 &WorkItem.TokenHandle, 00328 sizeof(HANDLE)); 00329 00330 NtClose(WorkItem.TokenHandle); 00331 00332 if (NT_SUCCESS(Status)) 00333 { 00334 Impersonated = TRUE; 00335 } 00336 } 00337 00338 _SEH2_TRY 00339 { 00340 DPRINT("RtlpExecuteIoWorkItem: Function: 0x%p Context: 0x%p ImpersonationToken: 0x%p\n", WorkItem.Function, WorkItem.Context, WorkItem.TokenHandle); 00341 00342 /* Execute the function */ 00343 WorkItem.Function(WorkItem.Context); 00344 } 00345 _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER) 00346 { 00347 DPRINT1("Exception 0x%x while executing IO work item 0x%p\n", _SEH2_GetExceptionCode(), WorkItem.Function); 00348 } 00349 _SEH2_END; 00350 00351 if (Impersonated) 00352 { 00353 WorkItem.TokenHandle = NULL; 00354 Status = NtSetInformationThread(NtCurrentThread(), 00355 ThreadImpersonationToken, 00356 &WorkItem.TokenHandle, 00357 sizeof(HANDLE)); 00358 if (!NT_SUCCESS(Status)) 00359 { 00360 DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n", Status); 00361 } 00362 } 00363 00364 /* remove the long function flag */ 00365 if (WorkItem.Flags & WT_EXECUTELONGFUNCTION) 00366 { 00367 Status = RtlEnterCriticalSection(&ThreadPoolLock); 00368 if (NT_SUCCESS(Status)) 00369 { 00370 IoThread->Flags &= ~WT_EXECUTELONGFUNCTION; 00371 RtlLeaveCriticalSection(&ThreadPoolLock); 00372 } 00373 } 00374 00375 /* update the requests counter */ 00376 InterlockedDecrement(&ThreadPoolIOWorkerThreadsRequests); 00377 00378 if (WorkItem.Flags & WT_EXECUTELONGFUNCTION) 00379 { 00380 InterlockedDecrement(&ThreadPoolIOWorkerThreadsLongRequests); 00381 } 00382 } 00383 00384 static NTSTATUS 00385 RtlpQueueIoWorkerThread(IN OUT PRTLP_WORKITEM WorkItem) 00386 { 00387 PLIST_ENTRY CurrentEntry; 00388 PRTLP_IOWORKERTHREAD IoThread = NULL; 00389 NTSTATUS Status = STATUS_SUCCESS; 00390 00391 if (WorkItem->Flags & WT_EXECUTEINPERSISTENTIOTHREAD) 00392 { 00393 if (PersistentIoThread != NULL) 00394 { 00395 /* We already have a persistent IO worker thread */ 00396 IoThread = PersistentIoThread; 00397 } 00398 else 00399 { 00400 /* We're not aware of any persistent IO worker thread. Search for a unused 00401 worker thread that doesn't have a long function queued */ 00402 CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink; 00403 while (CurrentEntry != &ThreadPoolIOWorkerThreadsList) 00404 { 00405 IoThread = CONTAINING_RECORD(CurrentEntry, 00406 RTLP_IOWORKERTHREAD, 00407 ListEntry); 00408 00409 if (!(IoThread->Flags & WT_EXECUTELONGFUNCTION)) 00410 break; 00411 00412 CurrentEntry = CurrentEntry->Flink; 00413 } 00414 00415 if (CurrentEntry != &ThreadPoolIOWorkerThreadsList) 00416 { 00417 /* Found a worker thread we can use. */ 00418 ASSERT(IoThread != NULL); 00419 00420 IoThread->Flags |= WT_EXECUTEINPERSISTENTIOTHREAD; 00421 PersistentIoThread = IoThread; 00422 } 00423 else 00424 { 00425 DPRINT1("Failed to find a worker thread for the persistent IO thread!\n"); 00426 return STATUS_NO_MEMORY; 00427 } 00428 } 00429 } 00430 else 00431 { 00432 /* Find a worker thread that is not currently executing a long function */ 00433 CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink; 00434 while (CurrentEntry != &ThreadPoolIOWorkerThreadsList) 00435 { 00436 IoThread = CONTAINING_RECORD(CurrentEntry, 00437 RTLP_IOWORKERTHREAD, 00438 ListEntry); 00439 00440 if (!(IoThread->Flags & WT_EXECUTELONGFUNCTION)) 00441 { 00442 /* if we're trying to queue a long function then make sure we're not dealing 00443 with the persistent thread */ 00444 if ((WorkItem->Flags & WT_EXECUTELONGFUNCTION) && !(IoThread->Flags & WT_EXECUTEINPERSISTENTIOTHREAD)) 00445 { 00446 /* found a candidate */ 00447 break; 00448 } 00449 } 00450 00451 CurrentEntry = CurrentEntry->Flink; 00452 } 00453 00454 if (CurrentEntry == &ThreadPoolIOWorkerThreadsList) 00455 { 00456 /* Couldn't find an appropriate thread, see if we can use the persistent thread (if it exists) for now */ 00457 if (ThreadPoolIOWorkerThreads == 0) 00458 { 00459 DPRINT1("Failed to find a worker thread for the work item 0x%p!\n"); 00460 ASSERT(IsListEmpty(&ThreadPoolIOWorkerThreadsList)); 00461 return STATUS_NO_MEMORY; 00462 } 00463 else 00464 { 00465 /* pick the first worker thread */ 00466 CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink; 00467 IoThread = CONTAINING_RECORD(CurrentEntry, 00468 RTLP_IOWORKERTHREAD, 00469 ListEntry); 00470 00471 /* Since this might be the persistent worker thread, don't run as a 00472 long function */ 00473 WorkItem->Flags &= ~WT_EXECUTELONGFUNCTION; 00474 } 00475 } 00476 00477 /* Move the picked thread to the end of the list. Since we're always searching 00478 from the beginning, this improves distribution of work items */ 00479 RemoveEntryList(&IoThread->ListEntry); 00480 InsertTailList(&ThreadPoolIOWorkerThreadsList, 00481 &IoThread->ListEntry); 00482 } 00483 00484 ASSERT(IoThread != NULL); 00485 00486 InterlockedIncrement(&ThreadPoolIOWorkerThreadsRequests); 00487 00488 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION) 00489 { 00490 /* We're about to queue a long function, mark the thread */ 00491 IoThread->Flags |= WT_EXECUTELONGFUNCTION; 00492 00493 InterlockedIncrement(&ThreadPoolIOWorkerThreadsLongRequests); 00494 } 00495 00496 /* It's time to queue the work item */ 00497 Status = NtQueueApcThread(IoThread->ThreadHandle, 00498 RtlpExecuteIoWorkItem, 00499 IoThread, 00500 NULL, 00501 WorkItem); 00502 if (!NT_SUCCESS(Status)) 00503 { 00504 DPRINT1("Failed to queue APC for work item 0x%p\n", WorkItem->Function); 00505 InterlockedDecrement(&ThreadPoolIOWorkerThreadsRequests); 00506 00507 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION) 00508 { 00509 InterlockedDecrement(&ThreadPoolIOWorkerThreadsLongRequests); 00510 } 00511 } 00512 00513 return Status; 00514 } 00515 00516 static BOOLEAN 00517 RtlpIsIoPending(IN HANDLE ThreadHandle OPTIONAL) 00518 { 00519 NTSTATUS Status; 00520 ULONG IoPending; 00521 BOOLEAN CreatedHandle = FALSE; 00522 BOOLEAN IsIoPending = TRUE; 00523 00524 if (ThreadHandle == NULL) 00525 { 00526 Status = NtDuplicateObject(NtCurrentProcess(), 00527 NtCurrentThread(), 00528 NtCurrentProcess(), 00529 &ThreadHandle, 00530 0, 00531 0, 00532 DUPLICATE_SAME_ACCESS); 00533 if (!NT_SUCCESS(Status)) 00534 { 00535 return IsIoPending; 00536 } 00537 00538 CreatedHandle = TRUE; 00539 } 00540 00541 Status = NtQueryInformationThread(ThreadHandle, 00542 ThreadIsIoPending, 00543 &IoPending, 00544 sizeof(IoPending), 00545 NULL); 00546 if (NT_SUCCESS(Status) && IoPending == 0) 00547 { 00548 IsIoPending = FALSE; 00549 } 00550 00551 if (CreatedHandle) 00552 { 00553 NtClose(ThreadHandle); 00554 } 00555 00556 return IsIoPending; 00557 } 00558 00559 static ULONG 00560 NTAPI 00561 RtlpIoWorkerThreadProc(IN PVOID Parameter) 00562 { 00563 volatile RTLP_IOWORKERTHREAD ThreadInfo; 00564 LARGE_INTEGER Timeout; 00565 BOOLEAN Terminate; 00566 NTSTATUS Status = STATUS_SUCCESS; 00567 00568 if (InterlockedIncrement(&ThreadPoolIOWorkerThreads) > MAX_WORKERTHREADS) 00569 { 00570 /* Oops, too many worker threads... */ 00571 goto InitFailed; 00572 } 00573 00574 /* Get a thread handle to ourselves */ 00575 Status = NtDuplicateObject(NtCurrentProcess(), 00576 NtCurrentThread(), 00577 NtCurrentProcess(), 00578 (PHANDLE)&ThreadInfo.ThreadHandle, 00579 0, 00580 0, 00581 DUPLICATE_SAME_ACCESS); 00582 if (!NT_SUCCESS(Status)) 00583 { 00584 DPRINT1("Failed to create handle to own thread! Status: 0x%x\n", Status); 00585 00586 InitFailed: 00587 InterlockedDecrement(&ThreadPoolIOWorkerThreads); 00588 00589 /* Signal initialization completion */ 00590 InterlockedExchange((PLONG)Parameter, 00591 1); 00592 00593 RtlpExitThreadFunc(Status); 00594 return 0; 00595 } 00596 00597 ThreadInfo.Flags = 0; 00598 00599 /* Insert the thread into the list */ 00600 InsertHeadList((PLIST_ENTRY)&ThreadPoolIOWorkerThreadsList, 00601 (PLIST_ENTRY)&ThreadInfo.ListEntry); 00602 00603 /* Signal initialization completion */ 00604 InterlockedExchange((PLONG)Parameter, 00605 1); 00606 00607 for (;;) 00608 { 00609 Timeout.QuadPart = -50000000LL; /* Wait for 5 seconds by default */ 00610 00611 Wait: 00612 do 00613 { 00614 /* Perform an alertable wait, the work items are going to be executed as APCs */ 00615 Status = NtDelayExecution(TRUE, 00616 &Timeout); 00617 00618 /* Loop as long as we executed an APC */ 00619 } while (Status != STATUS_SUCCESS); 00620 00621 /* We timed out, let's see if we're allowed to terminate */ 00622 Terminate = FALSE; 00623 00624 Status = RtlEnterCriticalSection(&ThreadPoolLock); 00625 if (NT_SUCCESS(Status)) 00626 { 00627 if (ThreadInfo.Flags & WT_EXECUTEINPERSISTENTIOTHREAD) 00628 { 00629 /* This thread is supposed to be persistent. Don't terminate! */ 00630 RtlLeaveCriticalSection(&ThreadPoolLock); 00631 00632 Timeout.QuadPart = -0x7FFFFFFFFFFFFFFFLL; 00633 goto Wait; 00634 } 00635 00636 /* FIXME - figure out an effective method to determine if it's appropriate to 00637 lower the number of threads. For now let's always terminate if there's 00638 at least one thread and no queued items. */ 00639 Terminate = (*((volatile LONG*)&ThreadPoolIOWorkerThreads) - *((volatile LONG*)&ThreadPoolIOWorkerThreadsLongRequests) >= WORKERTHREAD_CREATION_THRESHOLD) && 00640 (*((volatile LONG*)&ThreadPoolIOWorkerThreadsRequests) == 0); 00641 00642 if (Terminate) 00643 { 00644 /* Prevent termination as long as IO is pending */ 00645 Terminate = !RtlpIsIoPending(ThreadInfo.ThreadHandle); 00646 } 00647 00648 if (Terminate) 00649 { 00650 /* Rundown the thread and unlink it from the list */ 00651 InterlockedDecrement(&ThreadPoolIOWorkerThreads); 00652 RemoveEntryList((PLIST_ENTRY)&ThreadInfo.ListEntry); 00653 } 00654 00655 RtlLeaveCriticalSection(&ThreadPoolLock); 00656 00657 if (Terminate) 00658 { 00659 /* Break the infinite loop and terminate */ 00660 Status = STATUS_SUCCESS; 00661 break; 00662 } 00663 } 00664 else 00665 { 00666 DPRINT1("Failed to acquire the thread pool lock!!! Status: 0x%x\n", Status); 00667 break; 00668 } 00669 } 00670 00671 NtClose(ThreadInfo.ThreadHandle); 00672 RtlpExitThreadFunc(Status); 00673 return 0; 00674 } 00675 00676 static ULONG 00677 NTAPI 00678 RtlpWorkerThreadProc(IN PVOID Parameter) 00679 { 00680 LARGE_INTEGER Timeout; 00681 BOOLEAN Terminate; 00682 PVOID SystemArgument2; 00683 IO_STATUS_BLOCK IoStatusBlock; 00684 ULONG TimeoutCount = 0; 00685 PKNORMAL_ROUTINE ApcRoutine; 00686 NTSTATUS Status = STATUS_SUCCESS; 00687 00688 if (InterlockedIncrement(&ThreadPoolWorkerThreads) > MAX_WORKERTHREADS) 00689 { 00690 /* Signal initialization completion */ 00691 InterlockedExchange((PLONG)Parameter, 00692 1); 00693 00694 /* Oops, too many worker threads... */ 00695 RtlpExitThreadFunc(Status); 00696 return 0; 00697 } 00698 00699 /* Signal initialization completion */ 00700 InterlockedExchange((PLONG)Parameter, 00701 1); 00702 00703 for (;;) 00704 { 00705 Timeout.QuadPart = -50000000LL; /* Wait for 5 seconds by default */ 00706 00707 /* Dequeue a completion message */ 00708 Status = NtRemoveIoCompletion(ThreadPoolCompletionPort, 00709 (PVOID*)&ApcRoutine, 00710 &SystemArgument2, 00711 &IoStatusBlock, 00712 &Timeout); 00713 00714 if (Status == STATUS_SUCCESS) 00715 { 00716 TimeoutCount = 0; 00717 00718 _SEH2_TRY 00719 { 00720 /* Call the APC routine */ 00721 ApcRoutine(NULL, 00722 (PVOID)IoStatusBlock.Information, 00723 SystemArgument2); 00724 } 00725 _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER) 00726 { 00727 } 00728 _SEH2_END; 00729 } 00730 else 00731 { 00732 Terminate = FALSE; 00733 00734 if (!NT_SUCCESS(RtlEnterCriticalSection(&ThreadPoolLock))) 00735 continue; 00736 00737 /* FIXME - this should be optimized, check if there's requests, etc */ 00738 00739 if (Status == STATUS_TIMEOUT) 00740 { 00741 /* FIXME - we might want to optimize this */ 00742 if (TimeoutCount++ > 2 && 00743 *((volatile LONG*)&ThreadPoolWorkerThreads) - *((volatile LONG*)&ThreadPoolWorkerThreadsLongRequests) >= WORKERTHREAD_CREATION_THRESHOLD) 00744 { 00745 Terminate = TRUE; 00746 } 00747 } 00748 else 00749 Terminate = TRUE; 00750 00751 RtlLeaveCriticalSection(&ThreadPoolLock); 00752 00753 if (Terminate) 00754 { 00755 /* Prevent termination as long as IO is pending */ 00756 Terminate = !RtlpIsIoPending(NULL); 00757 } 00758 00759 if (Terminate) 00760 { 00761 InterlockedDecrement(&ThreadPoolWorkerThreads); 00762 Status = STATUS_SUCCESS; 00763 break; 00764 } 00765 } 00766 } 00767 00768 RtlpExitThreadFunc(Status); 00769 return 0; 00770 00771 } 00772 00773 /* 00774 * @implemented 00775 */ 00776 NTSTATUS 00777 NTAPI 00778 RtlQueueWorkItem(IN WORKERCALLBACKFUNC Function, 00779 IN PVOID Context OPTIONAL, 00780 IN ULONG Flags) 00781 { 00782 LONG FreeWorkers; 00783 NTSTATUS Status; 00784 PRTLP_WORKITEM WorkItem; 00785 00786 DPRINT("RtlQueueWorkItem(0x%p, 0x%p, 0x%x)\n", Function, Context, Flags); 00787 00788 /* Initialize the thread pool if not already initialized */ 00789 if (!IsThreadPoolInitialized()) 00790 { 00791 Status = RtlpInitializeThreadPool(); 00792 00793 if (!NT_SUCCESS(Status)) 00794 return Status; 00795 } 00796 00797 /* Allocate a work item */ 00798 WorkItem = RtlAllocateHeap(RtlGetProcessHeap(), 00799 0, 00800 sizeof(RTLP_WORKITEM)); 00801 if (WorkItem == NULL) 00802 return STATUS_NO_MEMORY; 00803 00804 WorkItem->Function = Function; 00805 WorkItem->Context = Context; 00806 WorkItem->Flags = Flags; 00807 00808 if (Flags & WT_TRANSFER_IMPERSONATION) 00809 { 00810 Status = RtlpGetImpersonationToken(&WorkItem->TokenHandle); 00811 00812 if (!NT_SUCCESS(Status)) 00813 { 00814 DPRINT1("Failed to get impersonation token! Status: 0x%x\n", Status); 00815 goto Cleanup; 00816 } 00817 } 00818 else 00819 WorkItem->TokenHandle = NULL; 00820 00821 Status = RtlEnterCriticalSection(&ThreadPoolLock); 00822 if (NT_SUCCESS(Status)) 00823 { 00824 if (Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINUITHREAD | WT_EXECUTEINPERSISTENTIOTHREAD)) 00825 { 00826 /* FIXME - We should optimize the algorithm used to determine whether to grow the thread pool! */ 00827 00828 FreeWorkers = ThreadPoolIOWorkerThreads - ThreadPoolIOWorkerThreadsLongRequests; 00829 00830 if (((Flags & (WT_EXECUTEINPERSISTENTIOTHREAD | WT_EXECUTELONGFUNCTION)) == WT_EXECUTELONGFUNCTION) && 00831 PersistentIoThread != NULL) 00832 { 00833 /* We shouldn't queue a long function into the persistent IO thread */ 00834 FreeWorkers--; 00835 } 00836 00837 /* See if it's a good idea to grow the pool */ 00838 if (ThreadPoolIOWorkerThreads < MAX_WORKERTHREADS && 00839 (FreeWorkers <= 0 || ThreadPoolIOWorkerThreads - ThreadPoolIOWorkerThreadsRequests < WORKERTHREAD_CREATION_THRESHOLD)) 00840 { 00841 /* Grow the thread pool */ 00842 Status = RtlpStartWorkerThread(RtlpIoWorkerThreadProc); 00843 00844 if (!NT_SUCCESS(Status) && *((volatile LONG*)&ThreadPoolIOWorkerThreads) != 0) 00845 { 00846 /* We failed to create the thread, but there's at least one there so 00847 we can at least queue the request */ 00848 Status = STATUS_SUCCESS; 00849 } 00850 } 00851 00852 if (NT_SUCCESS(Status)) 00853 { 00854 /* Queue a IO worker thread */ 00855 Status = RtlpQueueIoWorkerThread(WorkItem); 00856 } 00857 } 00858 else 00859 { 00860 /* FIXME - We should optimize the algorithm used to determine whether to grow the thread pool! */ 00861 00862 FreeWorkers = ThreadPoolWorkerThreads - ThreadPoolWorkerThreadsLongRequests; 00863 00864 /* See if it's a good idea to grow the pool */ 00865 if (ThreadPoolWorkerThreads < MAX_WORKERTHREADS && 00866 (FreeWorkers <= 0 || ThreadPoolWorkerThreads - ThreadPoolWorkerThreadsRequests < WORKERTHREAD_CREATION_THRESHOLD)) 00867 { 00868 /* Grow the thread pool */ 00869 Status = RtlpStartWorkerThread(RtlpWorkerThreadProc); 00870 00871 if (!NT_SUCCESS(Status) && *((volatile LONG*)&ThreadPoolWorkerThreads) != 0) 00872 { 00873 /* We failed to create the thread, but there's at least one there so 00874 we can at least queue the request */ 00875 Status = STATUS_SUCCESS; 00876 } 00877 } 00878 00879 if (NT_SUCCESS(Status)) 00880 { 00881 /* Queue a normal worker thread */ 00882 Status = RtlpQueueWorkerThread(WorkItem); 00883 } 00884 } 00885 00886 RtlLeaveCriticalSection(&ThreadPoolLock); 00887 } 00888 00889 if (!NT_SUCCESS(Status)) 00890 { 00891 if (WorkItem->TokenHandle != NULL) 00892 { 00893 NtClose(WorkItem->TokenHandle); 00894 } 00895 00896 Cleanup: 00897 RtlFreeHeap(RtlGetProcessHeap(), 00898 0, 00899 WorkItem); 00900 } 00901 00902 return Status; 00903 } 00904 00905 /* 00906 * @unimplemented 00907 */ 00908 NTSTATUS 00909 NTAPI 00910 RtlSetIoCompletionCallback(IN HANDLE FileHandle, 00911 IN PIO_APC_ROUTINE Callback, 00912 IN ULONG Flags) 00913 { 00914 UNIMPLEMENTED; 00915 return STATUS_NOT_IMPLEMENTED; 00916 } 00917 00918 /* 00919 * @implemented 00920 */ 00921 NTSTATUS 00922 NTAPI 00923 RtlSetThreadPoolStartFunc(IN PRTL_START_POOL_THREAD StartPoolThread, 00924 IN PRTL_EXIT_POOL_THREAD ExitPoolThread) 00925 { 00926 RtlpStartThreadFunc = StartPoolThread; 00927 RtlpExitThreadFunc = ExitPoolThread; 00928 return STATUS_SUCCESS; 00929 } Generated on Sun May 27 2012 04:28:06 for ReactOS by
1.7.6.1
|