Home | Info | Community | Development | myReactOS | Contact Us
ReactOS Development > Doxygenwork.c
Go to the documentation of this file.
00001 /* 00002 * COPYRIGHT: See COPYING in the top level directory 00003 * PROJECT: ReactOS Kernel 00004 * FILE: ntoskrnl/ex/work.c 00005 * PURPOSE: Manage system work queues and worker threads 00006 * PROGRAMMER: Alex Ionescu (alex@relsoft.net) 00007 */ 00008 00009 /* INCLUDES ******************************************************************/ 00010 00011 #include <ntoskrnl.h> 00012 #define NDEBUG 00013 #include <debug.h> 00014 00015 #if defined (ALLOC_PRAGMA) 00016 #pragma alloc_text(INIT, ExpInitializeWorkerThreads) 00017 #endif 00018 00019 /* DATA **********************************************************************/ 00020 00021 /* Number of worker threads for each Queue */ 00022 #define EX_HYPERCRITICAL_WORK_THREADS 1 00023 #define EX_DELAYED_WORK_THREADS 3 00024 #define EX_CRITICAL_WORK_THREADS 5 00025 00026 /* Magic flag for dynamic worker threads */ 00027 #define EX_DYNAMIC_WORK_THREAD 0x80000000 00028 00029 /* Worker thread priority increments (added to base priority) */ 00030 #define EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT 7 00031 #define EX_CRITICAL_QUEUE_PRIORITY_INCREMENT 5 00032 #define EX_DELAYED_QUEUE_PRIORITY_INCREMENT 4 00033 00034 /* The actual worker queue array */ 00035 EX_WORK_QUEUE ExWorkerQueue[MaximumWorkQueue]; 00036 00037 /* Accounting of the total threads and registry hacked threads */ 00038 ULONG ExpCriticalWorkerThreads; 00039 ULONG ExpDelayedWorkerThreads; 00040 ULONG ExpAdditionalCriticalWorkerThreads; 00041 ULONG ExpAdditionalDelayedWorkerThreads; 00042 00043 /* Future support for stack swapping worker threads */ 00044 BOOLEAN ExpWorkersCanSwap; 00045 LIST_ENTRY ExpWorkerListHead; 00046 FAST_MUTEX ExpWorkerSwapinMutex; 00047 00048 /* The worker balance set manager events */ 00049 KEVENT ExpThreadSetManagerEvent; 00050 KEVENT ExpThreadSetManagerShutdownEvent; 00051 00052 /* Thread pointers for future worker thread shutdown support */ 00053 PETHREAD ExpWorkerThreadBalanceManagerPtr; 00054 PETHREAD ExpLastWorkerThread; 00055 00056 /* PRIVATE FUNCTIONS *********************************************************/ 00057 00058 /*++ 00059 * @name ExpWorkerThreadEntryPoint 00060 * 00061 * The ExpWorkerThreadEntryPoint routine is the entrypoint for any new 00062 * worker thread created by teh system. 00063 * 00064 * @param Context 00065 * Contains the work queue type masked with a flag specifing whether the 00066 * thread is dynamic or not. 00067 * 00068 * @return None. 00069 * 00070 * @remarks A dynamic thread can timeout after 10 minutes of waiting on a queue 00071 * while a static thread will never timeout. 00072 * 00073 * Worker threads must return at IRQL == PASSIVE_LEVEL, must not have 00074 * active impersonation info, and must not have disabled APCs. 00075 * 00076 * NB: We will re-enable APCs for broken threads but all other cases 00077 * will generate a bugcheck. 00078 * 00079 *--*/ 00080 VOID 00081 NTAPI 00082 ExpWorkerThreadEntryPoint(IN PVOID Context) 00083 { 00084 PWORK_QUEUE_ITEM WorkItem; 00085 PLIST_ENTRY QueueEntry; 00086 WORK_QUEUE_TYPE WorkQueueType; 00087 PEX_WORK_QUEUE WorkQueue; 00088 LARGE_INTEGER Timeout; 00089 PLARGE_INTEGER TimeoutPointer = NULL; 00090 PETHREAD Thread = PsGetCurrentThread(); 00091 KPROCESSOR_MODE WaitMode; 00092 EX_QUEUE_WORKER_INFO OldValue, NewValue; 00093 00094 /* Check if this is a dyamic thread */ 00095 if ((ULONG_PTR)Context & EX_DYNAMIC_WORK_THREAD) 00096 { 00097 /* It is, which means we will eventually time out after 10 minutes */ 00098 Timeout.QuadPart = Int32x32To64(10, -10000000 * 60); 00099 TimeoutPointer = &Timeout; 00100 } 00101 00102 /* Get Queue Type and Worker Queue */ 00103 WorkQueueType = (WORK_QUEUE_TYPE)((ULONG_PTR)Context & 00104 ~EX_DYNAMIC_WORK_THREAD); 00105 WorkQueue = &ExWorkerQueue[WorkQueueType]; 00106 00107 /* Select the wait mode */ 00108 WaitMode = (UCHAR)WorkQueue->Info.WaitMode; 00109 00110 /* Nobody should have initialized this yet, do it now */ 00111 ASSERT(Thread->ExWorkerCanWaitUser == 0); 00112 if (WaitMode == UserMode) Thread->ExWorkerCanWaitUser = TRUE; 00113 00114 /* If we shouldn't swap, disable that feature */ 00115 if (!ExpWorkersCanSwap) KeSetKernelStackSwapEnable(FALSE); 00116 00117 /* Set the worker flags */ 00118 do 00119 { 00120 /* Check if the queue is being disabled */ 00121 if (WorkQueue->Info.QueueDisabled) 00122 { 00123 /* Re-enable stack swapping and kill us */ 00124 KeSetKernelStackSwapEnable(TRUE); 00125 PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN); 00126 } 00127 00128 /* Increase the worker count */ 00129 OldValue = WorkQueue->Info; 00130 NewValue = OldValue; 00131 NewValue.WorkerCount++; 00132 } 00133 while (InterlockedCompareExchange((PLONG)&WorkQueue->Info, 00134 *(PLONG)&NewValue, 00135 *(PLONG)&OldValue) != *(PLONG)&OldValue); 00136 00137 /* Success, you are now officially a worker thread! */ 00138 Thread->ActiveExWorker = TRUE; 00139 00140 /* Loop forever */ 00141 ProcessLoop: 00142 for (;;) 00143 { 00144 /* Wait for Something to Happen on the Queue */ 00145 QueueEntry = KeRemoveQueue(&WorkQueue->WorkerQueue, 00146 WaitMode, 00147 TimeoutPointer); 00148 00149 /* Check if we timed out and quit this loop in that case */ 00150 if ((NTSTATUS)(ULONG_PTR)QueueEntry == STATUS_TIMEOUT) break; 00151 00152 /* Increment Processed Work Items */ 00153 InterlockedIncrement((PLONG)&WorkQueue->WorkItemsProcessed); 00154 00155 /* Get the Work Item */ 00156 WorkItem = CONTAINING_RECORD(QueueEntry, WORK_QUEUE_ITEM, List); 00157 00158 /* Make sure nobody is trying to play smart with us */ 00159 ASSERT((ULONG_PTR)WorkItem->WorkerRoutine > MmUserProbeAddress); 00160 00161 /* Call the Worker Routine */ 00162 WorkItem->WorkerRoutine(WorkItem->Parameter); 00163 00164 /* Make sure APCs are not disabled */ 00165 if (Thread->Tcb.SpecialApcDisable) 00166 { 00167 /* We're nice and do it behind your back */ 00168 DPRINT1("Warning: Broken Worker Thread: %p %lx %p came back " 00169 "with APCs disabled!\n", 00170 WorkItem->WorkerRoutine, 00171 WorkItem->Parameter, 00172 WorkItem); 00173 Thread->Tcb.SpecialApcDisable = 0; 00174 } 00175 00176 /* Make sure it returned at right IRQL */ 00177 if (KeGetCurrentIrql() != PASSIVE_LEVEL) 00178 { 00179 /* It didn't, bugcheck! */ 00180 KeBugCheckEx(WORKER_THREAD_RETURNED_AT_BAD_IRQL, 00181 (ULONG_PTR)WorkItem->WorkerRoutine, 00182 KeGetCurrentIrql(), 00183 (ULONG_PTR)WorkItem->Parameter, 00184 (ULONG_PTR)WorkItem); 00185 } 00186 00187 /* Make sure it returned with Impersionation Disabled */ 00188 if (Thread->ActiveImpersonationInfo) 00189 { 00190 /* It didn't, bugcheck! */ 00191 KeBugCheckEx(IMPERSONATING_WORKER_THREAD, 00192 (ULONG_PTR)WorkItem->WorkerRoutine, 00193 (ULONG_PTR)WorkItem->Parameter, 00194 (ULONG_PTR)WorkItem, 00195 0); 00196 } 00197 } 00198 00199 /* This is a dynamic thread. Terminate it unless IRPs are pending */ 00200 if (!IsListEmpty(&Thread->IrpList)) goto ProcessLoop; 00201 00202 /* Don't terminate it if the queue is disabled either */ 00203 if (WorkQueue->Info.QueueDisabled) goto ProcessLoop; 00204 00205 /* Set the worker flags */ 00206 do 00207 { 00208 /* Decrease the worker count */ 00209 OldValue = WorkQueue->Info; 00210 NewValue = OldValue; 00211 NewValue.WorkerCount--; 00212 } 00213 while (InterlockedCompareExchange((PLONG)&WorkQueue->Info, 00214 *(PLONG)&NewValue, 00215 *(PLONG)&OldValue) != *(PLONG)&OldValue); 00216 00217 /* Decrement dynamic thread count */ 00218 InterlockedDecrement(&WorkQueue->DynamicThreadCount); 00219 00220 /* We're not a worker thread anymore */ 00221 Thread->ActiveExWorker = FALSE; 00222 00223 /* Re-enable the stack swap */ 00224 KeSetKernelStackSwapEnable(TRUE); 00225 return; 00226 } 00227 00228 /*++ 00229 * @name ExpCreateWorkerThread 00230 * 00231 * The ExpCreateWorkerThread routine creates a new worker thread for the 00232 * specified queue. 00233 ** 00234 * @param QueueType 00235 * Type of the queue to use for this thread. Valid values are: 00236 * - DelayedWorkQueue 00237 * - CriticalWorkQueue 00238 * - HyperCriticalWorkQueue 00239 * 00240 * @param Dynamic 00241 * Specifies whether or not this thread is a dynamic thread. 00242 * 00243 * @return None. 00244 * 00245 * @remarks HyperCritical work threads run at priority 7; Critical work threads 00246 * run at priority 5, and delayed work threads run at priority 4. 00247 * 00248 * This, worker threads cannot pre-empty a normal user-mode thread. 00249 * 00250 *--*/ 00251 VOID 00252 NTAPI 00253 ExpCreateWorkerThread(WORK_QUEUE_TYPE WorkQueueType, 00254 IN BOOLEAN Dynamic) 00255 { 00256 PETHREAD Thread; 00257 HANDLE hThread; 00258 ULONG Context; 00259 KPRIORITY Priority; 00260 00261 /* Check if this is going to be a dynamic thread */ 00262 Context = WorkQueueType; 00263 00264 /* Add the dynamic mask */ 00265 if (Dynamic) Context |= EX_DYNAMIC_WORK_THREAD; 00266 00267 /* Create the System Thread */ 00268 PsCreateSystemThread(&hThread, 00269 THREAD_ALL_ACCESS, 00270 NULL, 00271 NULL, 00272 NULL, 00273 ExpWorkerThreadEntryPoint, 00274 UlongToPtr(Context)); 00275 00276 /* If the thread is dynamic */ 00277 if (Dynamic) 00278 { 00279 /* Increase the count */ 00280 InterlockedIncrement(&ExWorkerQueue[WorkQueueType].DynamicThreadCount); 00281 } 00282 00283 /* Set the priority */ 00284 if (WorkQueueType == DelayedWorkQueue) 00285 { 00286 /* Priority == 4 */ 00287 Priority = EX_DELAYED_QUEUE_PRIORITY_INCREMENT; 00288 } 00289 else if (WorkQueueType == CriticalWorkQueue) 00290 { 00291 /* Priority == 5 */ 00292 Priority = EX_CRITICAL_QUEUE_PRIORITY_INCREMENT; 00293 } 00294 else 00295 { 00296 /* Priority == 7 */ 00297 Priority = EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT; 00298 } 00299 00300 /* Get the Thread */ 00301 ObReferenceObjectByHandle(hThread, 00302 THREAD_SET_INFORMATION, 00303 PsThreadType, 00304 KernelMode, 00305 (PVOID*)&Thread, 00306 NULL); 00307 00308 /* Set the Priority */ 00309 KeSetBasePriorityThread(&Thread->Tcb, Priority); 00310 00311 /* Dereference and close handle */ 00312 ObDereferenceObject(Thread); 00313 ObCloseHandle(hThread, KernelMode); 00314 } 00315 00316 /*++ 00317 * @name ExpCheckDynamicThreadCount 00318 * 00319 * The ExpCheckDynamicThreadCount routine checks every queue and creates a 00320 * dynamic thread if the queue seems to be deadlocked. 00321 * 00322 * @param None 00323 * 00324 * @return None. 00325 * 00326 * @remarks The algorithm for deciding if a new thread must be created is 00327 * based on wether the queue has processed no new items in the last 00328 * second, and new items are still enqueued. 00329 * 00330 *--*/ 00331 VOID 00332 NTAPI 00333 ExpDetectWorkerThreadDeadlock(VOID) 00334 { 00335 ULONG i; 00336 PEX_WORK_QUEUE Queue; 00337 00338 /* Loop the 3 queues */ 00339 for (i = 0; i < MaximumWorkQueue; i++) 00340 { 00341 /* Get the queue */ 00342 Queue = &ExWorkerQueue[i]; 00343 ASSERT(Queue->DynamicThreadCount <= 16); 00344 00345 /* Check if stuff is on the queue that still is unprocessed */ 00346 if ((Queue->QueueDepthLastPass) && 00347 (Queue->WorkItemsProcessed == Queue->WorkItemsProcessedLastPass) && 00348 (Queue->DynamicThreadCount < 16)) 00349 { 00350 /* Stuff is still on the queue and nobody did anything about it */ 00351 DPRINT1("EX: Work Queue Deadlock detected: %d\n", i); 00352 ExpCreateWorkerThread(i, TRUE); 00353 DPRINT1("Dynamic threads queued %d\n", Queue->DynamicThreadCount); 00354 } 00355 00356 /* Update our data */ 00357 Queue->WorkItemsProcessedLastPass = Queue->WorkItemsProcessed; 00358 Queue->QueueDepthLastPass = KeReadStateQueue(&Queue->WorkerQueue); 00359 } 00360 } 00361 00362 /*++ 00363 * @name ExpCheckDynamicThreadCount 00364 * 00365 * The ExpCheckDynamicThreadCount routine checks every queue and creates a 00366 * dynamic thread if the queue requires one. 00367 * 00368 * @param None 00369 * 00370 * @return None. 00371 * 00372 * @remarks The algorithm for deciding if a new thread must be created is 00373 * documented in the ExQueueWorkItem routine. 00374 * 00375 *--*/ 00376 VOID 00377 NTAPI 00378 ExpCheckDynamicThreadCount(VOID) 00379 { 00380 ULONG i; 00381 PEX_WORK_QUEUE Queue; 00382 00383 /* Loop the 3 queues */ 00384 for (i = 0; i < MaximumWorkQueue; i++) 00385 { 00386 /* Get the queue */ 00387 Queue = &ExWorkerQueue[i]; 00388 00389 /* Check if still need a new thread. See ExQueueWorkItem */ 00390 if ((Queue->Info.MakeThreadsAsNecessary) && 00391 (!IsListEmpty(&Queue->WorkerQueue.EntryListHead)) && 00392 (Queue->WorkerQueue.CurrentCount < 00393 Queue->WorkerQueue.MaximumCount) && 00394 (Queue->DynamicThreadCount < 16)) 00395 { 00396 /* Create a new thread */ 00397 DPRINT1("EX: Creating new dynamic thread as requested\n"); 00398 ExpCreateWorkerThread(i, TRUE); 00399 } 00400 } 00401 } 00402 00403 /*++ 00404 * @name ExpWorkerThreadBalanceManager 00405 * 00406 * The ExpWorkerThreadBalanceManager routine is the entrypoint for the 00407 * worker thread balance set manager. 00408 * 00409 * @param Context 00410 * Unused. 00411 * 00412 * @return None. 00413 * 00414 * @remarks The worker thread balance set manager listens every second, but can 00415 * also be woken up by an event when a new thread is needed, or by the 00416 * special shutdown event. This thread runs at priority 7. 00417 * 00418 * This routine must run at IRQL == PASSIVE_LEVEL. 00419 * 00420 *--*/ 00421 VOID 00422 NTAPI 00423 ExpWorkerThreadBalanceManager(IN PVOID Context) 00424 { 00425 KTIMER Timer; 00426 LARGE_INTEGER Timeout; 00427 NTSTATUS Status; 00428 PVOID WaitEvents[3]; 00429 PAGED_CODE(); 00430 UNREFERENCED_PARAMETER(Context); 00431 00432 /* Raise our priority above all other worker threads */ 00433 KeSetBasePriorityThread(KeGetCurrentThread(), 00434 EX_CRITICAL_QUEUE_PRIORITY_INCREMENT + 1); 00435 00436 /* Setup the timer */ 00437 KeInitializeTimer(&Timer); 00438 Timeout.QuadPart = Int32x32To64(-1, 10000000); 00439 00440 /* We'll wait on the periodic timer and also the emergency event */ 00441 WaitEvents[0] = &Timer; 00442 WaitEvents[1] = &ExpThreadSetManagerEvent; 00443 WaitEvents[2] = &ExpThreadSetManagerShutdownEvent; 00444 00445 /* Start wait loop */ 00446 for (;;) 00447 { 00448 /* Wait for the timer */ 00449 KeSetTimer(&Timer, Timeout, NULL); 00450 Status = KeWaitForMultipleObjects(3, 00451 WaitEvents, 00452 WaitAny, 00453 Executive, 00454 KernelMode, 00455 FALSE, 00456 NULL, 00457 NULL); 00458 if (Status == 0) 00459 { 00460 /* Our timer expired. Check for deadlocks */ 00461 ExpDetectWorkerThreadDeadlock(); 00462 } 00463 else if (Status == 1) 00464 { 00465 /* Someone notified us, verify if we should create a new thread */ 00466 ExpCheckDynamicThreadCount(); 00467 } 00468 else if (Status == 2) 00469 { 00470 /* We are shutting down. Cancel the timer */ 00471 DPRINT1("System shutdown\n"); 00472 KeCancelTimer(&Timer); 00473 00474 /* Make sure we have a final thread */ 00475 ASSERT(ExpLastWorkerThread); 00476 00477 /* Wait for it */ 00478 KeWaitForSingleObject(ExpLastWorkerThread, 00479 Executive, 00480 KernelMode, 00481 FALSE, 00482 NULL); 00483 00484 /* Dereference it and kill us */ 00485 ObDereferenceObject(ExpLastWorkerThread); 00486 PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN); 00487 } 00488 } 00489 } 00490 00491 /*++ 00492 * @name ExpInitializeWorkerThreads 00493 * 00494 * The ExpInitializeWorkerThreads routine initializes worker thread and 00495 * work queue support. 00496 * 00497 * @param None. 00498 * 00499 * @return None. 00500 * 00501 * @remarks This routine is only called once during system initialization. 00502 * 00503 *--*/ 00504 VOID 00505 INIT_FUNCTION 00506 NTAPI 00507 ExpInitializeWorkerThreads(VOID) 00508 { 00509 ULONG WorkQueueType; 00510 ULONG CriticalThreads, DelayedThreads; 00511 HANDLE ThreadHandle; 00512 PETHREAD Thread; 00513 ULONG i; 00514 00515 /* Setup the stack swap support */ 00516 ExInitializeFastMutex(&ExpWorkerSwapinMutex); 00517 InitializeListHead(&ExpWorkerListHead); 00518 ExpWorkersCanSwap = TRUE; 00519 00520 /* Set the number of critical and delayed threads. We shouldn't hardcode */ 00521 DelayedThreads = EX_DELAYED_WORK_THREADS; 00522 CriticalThreads = EX_CRITICAL_WORK_THREADS; 00523 00524 /* Protect against greedy registry modifications */ 00525 ExpAdditionalDelayedWorkerThreads = 00526 min(ExpAdditionalDelayedWorkerThreads, 16); 00527 ExpAdditionalCriticalWorkerThreads = 00528 min(ExpAdditionalCriticalWorkerThreads, 16); 00529 00530 /* Calculate final count */ 00531 DelayedThreads += ExpAdditionalDelayedWorkerThreads; 00532 CriticalThreads += ExpAdditionalCriticalWorkerThreads; 00533 00534 /* Initialize the Array */ 00535 for (WorkQueueType = 0; WorkQueueType < MaximumWorkQueue; WorkQueueType++) 00536 { 00537 /* Clear the structure and initialize the queue */ 00538 RtlZeroMemory(&ExWorkerQueue[WorkQueueType], sizeof(EX_WORK_QUEUE)); 00539 KeInitializeQueue(&ExWorkerQueue[WorkQueueType].WorkerQueue, 0); 00540 } 00541 00542 /* Dynamic threads are only used for the critical queue */ 00543 ExWorkerQueue[CriticalWorkQueue].Info.MakeThreadsAsNecessary = TRUE; 00544 00545 /* Initialize the balance set manager events */ 00546 KeInitializeEvent(&ExpThreadSetManagerEvent, SynchronizationEvent, FALSE); 00547 KeInitializeEvent(&ExpThreadSetManagerShutdownEvent, 00548 NotificationEvent, 00549 FALSE); 00550 00551 /* Create the built-in worker threads for the critical queue */ 00552 for (i = 0; i < CriticalThreads; i++) 00553 { 00554 /* Create the thread */ 00555 ExpCreateWorkerThread(CriticalWorkQueue, FALSE); 00556 ExpCriticalWorkerThreads++; 00557 } 00558 00559 /* Create the built-in worker threads for the delayed queue */ 00560 for (i = 0; i < DelayedThreads; i++) 00561 { 00562 /* Create the thread */ 00563 ExpCreateWorkerThread(DelayedWorkQueue, FALSE); 00564 ExpDelayedWorkerThreads++; 00565 } 00566 00567 /* Create the built-in worker thread for the hypercritical queue */ 00568 ExpCreateWorkerThread(HyperCriticalWorkQueue, FALSE); 00569 00570 /* Create the balance set manager thread */ 00571 PsCreateSystemThread(&ThreadHandle, 00572 THREAD_ALL_ACCESS, 00573 NULL, 00574 0, 00575 NULL, 00576 ExpWorkerThreadBalanceManager, 00577 NULL); 00578 00579 /* Get a pointer to it for the shutdown process */ 00580 ObReferenceObjectByHandle(ThreadHandle, 00581 THREAD_ALL_ACCESS, 00582 NULL, 00583 KernelMode, 00584 (PVOID*)&Thread, 00585 NULL); 00586 ExpWorkerThreadBalanceManagerPtr = Thread; 00587 00588 /* Close the handle and return */ 00589 ObCloseHandle(ThreadHandle, KernelMode); 00590 } 00591 00592 VOID 00593 NTAPI 00594 ExpSetSwappingKernelApc(IN PKAPC Apc, 00595 OUT PKNORMAL_ROUTINE *NormalRoutine, 00596 IN OUT PVOID *NormalContext, 00597 IN OUT PVOID *SystemArgument1, 00598 IN OUT PVOID *SystemArgument2) 00599 { 00600 PBOOLEAN AllowSwap; 00601 PKEVENT Event = (PKEVENT)*SystemArgument1; 00602 00603 /* Make sure it's an active worker */ 00604 if (PsGetCurrentThread()->ActiveExWorker) 00605 { 00606 /* Read the setting from the context flag */ 00607 AllowSwap = (PBOOLEAN)NormalContext; 00608 KeSetKernelStackSwapEnable(*AllowSwap); 00609 } 00610 00611 /* Let caller know that we're done */ 00612 KeSetEvent(Event, 0, FALSE); 00613 } 00614 00615 VOID 00616 NTAPI 00617 ExSwapinWorkerThreads(IN BOOLEAN AllowSwap) 00618 { 00619 KEVENT Event; 00620 PETHREAD CurrentThread = PsGetCurrentThread(), Thread; 00621 PEPROCESS Process = PsInitialSystemProcess; 00622 KAPC Apc; 00623 PAGED_CODE(); 00624 00625 /* Initialize an event so we know when we're done */ 00626 KeInitializeEvent(&Event, NotificationEvent, FALSE); 00627 00628 /* Lock this routine */ 00629 ExAcquireFastMutex(&ExpWorkerSwapinMutex); 00630 00631 /* New threads cannot swap anymore */ 00632 ExpWorkersCanSwap = AllowSwap; 00633 00634 /* Loop all threads in the system process */ 00635 Thread = PsGetNextProcessThread(Process, NULL); 00636 while (Thread) 00637 { 00638 /* Skip threads with explicit permission to do this */ 00639 if (Thread->ExWorkerCanWaitUser) goto Next; 00640 00641 /* Check if we reached ourselves */ 00642 if (Thread == CurrentThread) 00643 { 00644 /* Do it inline */ 00645 KeSetKernelStackSwapEnable(AllowSwap); 00646 } 00647 else 00648 { 00649 /* Queue an APC */ 00650 KeInitializeApc(&Apc, 00651 &Thread->Tcb, 00652 InsertApcEnvironment, 00653 ExpSetSwappingKernelApc, 00654 NULL, 00655 NULL, 00656 KernelMode, 00657 &AllowSwap); 00658 if (KeInsertQueueApc(&Apc, &Event, NULL, 3)) 00659 { 00660 /* Wait for the APC to run */ 00661 KeWaitForSingleObject(&Event, Executive, KernelMode, FALSE, NULL); 00662 KeClearEvent(&Event); 00663 } 00664 } 00665 00666 /* Next thread */ 00667 Next: 00668 Thread = PsGetNextProcessThread(Process, Thread); 00669 } 00670 00671 /* Release the lock */ 00672 ExReleaseFastMutex(&ExpWorkerSwapinMutex); 00673 } 00674 00675 /* PUBLIC FUNCTIONS **********************************************************/ 00676 00677 /*++ 00678 * @name ExQueueWorkItem 00679 * @implemented NT4 00680 * 00681 * The ExQueueWorkItem routine acquires rundown protection for 00682 * the specified descriptor. 00683 * 00684 * @param WorkItem 00685 * Pointer to an initialized Work Queue Item structure. This structure 00686 * must be located in nonpaged pool memory. 00687 * 00688 * @param QueueType 00689 * Type of the queue to use for this item. Can be one of the following: 00690 * - DelayedWorkQueue 00691 * - CriticalWorkQueue 00692 * - HyperCriticalWorkQueue 00693 * 00694 * @return None. 00695 * 00696 * @remarks This routine is obsolete. Use IoQueueWorkItem instead. 00697 * 00698 * Callers of this routine must be running at IRQL <= DISPATCH_LEVEL. 00699 * 00700 *--*/ 00701 VOID 00702 NTAPI 00703 ExQueueWorkItem(IN PWORK_QUEUE_ITEM WorkItem, 00704 IN WORK_QUEUE_TYPE QueueType) 00705 { 00706 PEX_WORK_QUEUE WorkQueue = &ExWorkerQueue[QueueType]; 00707 ASSERT(QueueType < MaximumWorkQueue); 00708 ASSERT(WorkItem->List.Flink == NULL); 00709 00710 /* Don't try to trick us */ 00711 if ((ULONG_PTR)WorkItem->WorkerRoutine < MmUserProbeAddress) 00712 { 00713 /* Bugcheck the system */ 00714 KeBugCheckEx(WORKER_INVALID, 00715 1, 00716 (ULONG_PTR)WorkItem, 00717 (ULONG_PTR)WorkItem->WorkerRoutine, 00718 0); 00719 } 00720 00721 /* Insert the Queue */ 00722 KeInsertQueue(&WorkQueue->WorkerQueue, &WorkItem->List); 00723 ASSERT(!WorkQueue->Info.QueueDisabled); 00724 00725 /* 00726 * Check if we need a new thread. Our decision is as follows: 00727 * - This queue type must support Dynamic Threads (duh!) 00728 * - It actually has to have unprocessed items 00729 * - We have CPUs which could be handling another thread 00730 * - We haven't abused our usage of dynamic threads. 00731 */ 00732 if ((WorkQueue->Info.MakeThreadsAsNecessary) && 00733 (!IsListEmpty(&WorkQueue->WorkerQueue.EntryListHead)) && 00734 (WorkQueue->WorkerQueue.CurrentCount < 00735 WorkQueue->WorkerQueue.MaximumCount) && 00736 (WorkQueue->DynamicThreadCount < 16)) 00737 { 00738 /* Let the balance manager know about it */ 00739 DPRINT1("Requesting a new thread. CurrentCount: %d. MaxCount: %d\n", 00740 WorkQueue->WorkerQueue.CurrentCount, 00741 WorkQueue->WorkerQueue.MaximumCount); 00742 KeSetEvent(&ExpThreadSetManagerEvent, 0, FALSE); 00743 } 00744 } 00745 00746 /* EOF */ 00747 Generated on Sun May 27 2012 04:37:10 for ReactOS by
1.7.6.1
|