ReactOS Fundraising Campaign 2012
 
€ 4,410 / € 30,000

Information | Donate

Home | Info | Community | Development | myReactOS | Contact Us

  1. Home
  2. Community
  3. Development
  4. myReactOS
  5. Fundraiser 2012

  1. Main Page
  2. Alphabetical List
  3. Data Structures
  4. Directories
  5. File List
  6. Data Fields
  7. Globals
  8. Related Pages

ReactOS Development > Doxygen

work.c
Go to the documentation of this file.
00001 /*
00002  * COPYRIGHT:          See COPYING in the top level directory
00003  * PROJECT:            ReactOS Kernel
00004  * FILE:               ntoskrnl/ex/work.c
00005  * PURPOSE:            Manage system work queues and worker threads
00006  * PROGRAMMER:         Alex Ionescu (alex@relsoft.net)
00007  */
00008 
00009 /* INCLUDES ******************************************************************/
00010 
00011 #include <ntoskrnl.h>
00012 #define NDEBUG
00013 #include <debug.h>
00014 
00015 #if defined (ALLOC_PRAGMA)
00016 #pragma alloc_text(INIT, ExpInitializeWorkerThreads)
00017 #endif
00018 
00019 /* DATA **********************************************************************/
00020 
00021 /* Number of worker threads for each Queue */
00022 #define EX_HYPERCRITICAL_WORK_THREADS               1
00023 #define EX_DELAYED_WORK_THREADS                     3
00024 #define EX_CRITICAL_WORK_THREADS                    5
00025 
00026 /* Magic flag for dynamic worker threads */
00027 #define EX_DYNAMIC_WORK_THREAD                      0x80000000
00028 
00029 /* Worker thread priority increments (added to base priority) */
00030 #define EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT   7
00031 #define EX_CRITICAL_QUEUE_PRIORITY_INCREMENT        5
00032 #define EX_DELAYED_QUEUE_PRIORITY_INCREMENT         4
00033 
00034 /* The actual worker queue array */
00035 EX_WORK_QUEUE ExWorkerQueue[MaximumWorkQueue];
00036 
00037 /* Accounting of the total threads and registry hacked threads */
00038 ULONG ExpCriticalWorkerThreads;
00039 ULONG ExpDelayedWorkerThreads;
00040 ULONG ExpAdditionalCriticalWorkerThreads;
00041 ULONG ExpAdditionalDelayedWorkerThreads;
00042 
00043 /* Future support for stack swapping worker threads */
00044 BOOLEAN ExpWorkersCanSwap;
00045 LIST_ENTRY ExpWorkerListHead;
00046 FAST_MUTEX ExpWorkerSwapinMutex;
00047 
00048 /* The worker balance set manager events */
00049 KEVENT ExpThreadSetManagerEvent;
00050 KEVENT ExpThreadSetManagerShutdownEvent;
00051 
00052 /* Thread pointers for future worker thread shutdown support */
00053 PETHREAD ExpWorkerThreadBalanceManagerPtr;
00054 PETHREAD ExpLastWorkerThread;
00055 
00056 /* PRIVATE FUNCTIONS *********************************************************/
00057 
00058 /*++
00059  * @name ExpWorkerThreadEntryPoint
00060  *
00061  *     The ExpWorkerThreadEntryPoint routine is the entrypoint for any new
00062  *     worker thread created by teh system.
00063  *
00064  * @param Context
00065  *        Contains the work queue type masked with a flag specifing whether the
00066  *        thread is dynamic or not.
00067  *
00068  * @return None.
00069  *
00070  * @remarks A dynamic thread can timeout after 10 minutes of waiting on a queue
00071  *          while a static thread will never timeout.
00072  *
00073  *          Worker threads must return at IRQL == PASSIVE_LEVEL, must not have
00074  *          active impersonation info, and must not have disabled APCs.
00075  *
00076  *          NB: We will re-enable APCs for broken threads but all other cases
00077  *              will generate a bugcheck.
00078  *
00079  *--*/
00080 VOID
00081 NTAPI
00082 ExpWorkerThreadEntryPoint(IN PVOID Context)
00083 {
00084     PWORK_QUEUE_ITEM WorkItem;
00085     PLIST_ENTRY QueueEntry;
00086     WORK_QUEUE_TYPE WorkQueueType;
00087     PEX_WORK_QUEUE WorkQueue;
00088     LARGE_INTEGER Timeout;
00089     PLARGE_INTEGER TimeoutPointer = NULL;
00090     PETHREAD Thread = PsGetCurrentThread();
00091     KPROCESSOR_MODE WaitMode;
00092     EX_QUEUE_WORKER_INFO OldValue, NewValue;
00093 
00094     /* Check if this is a dyamic thread */
00095     if ((ULONG_PTR)Context & EX_DYNAMIC_WORK_THREAD)
00096     {
00097         /* It is, which means we will eventually time out after 10 minutes */
00098         Timeout.QuadPart = Int32x32To64(10, -10000000 * 60);
00099         TimeoutPointer = &Timeout;
00100     }
00101 
00102     /* Get Queue Type and Worker Queue */
00103     WorkQueueType = (WORK_QUEUE_TYPE)((ULONG_PTR)Context &
00104                                       ~EX_DYNAMIC_WORK_THREAD);
00105     WorkQueue = &ExWorkerQueue[WorkQueueType];
00106 
00107     /* Select the wait mode */
00108     WaitMode = (UCHAR)WorkQueue->Info.WaitMode;
00109 
00110     /* Nobody should have initialized this yet, do it now */
00111     ASSERT(Thread->ExWorkerCanWaitUser == 0);
00112     if (WaitMode == UserMode) Thread->ExWorkerCanWaitUser = TRUE;
00113 
00114     /* If we shouldn't swap, disable that feature */
00115     if (!ExpWorkersCanSwap) KeSetKernelStackSwapEnable(FALSE);
00116 
00117     /* Set the worker flags */
00118     do
00119     {
00120         /* Check if the queue is being disabled */
00121         if (WorkQueue->Info.QueueDisabled)
00122         {
00123             /* Re-enable stack swapping and kill us */
00124             KeSetKernelStackSwapEnable(TRUE);
00125             PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN);
00126         }
00127 
00128         /* Increase the worker count */
00129         OldValue = WorkQueue->Info;
00130         NewValue = OldValue;
00131         NewValue.WorkerCount++;
00132     }
00133     while (InterlockedCompareExchange((PLONG)&WorkQueue->Info,
00134                                       *(PLONG)&NewValue,
00135                                       *(PLONG)&OldValue) != *(PLONG)&OldValue);
00136 
00137     /* Success, you are now officially a worker thread! */
00138     Thread->ActiveExWorker = TRUE;
00139 
00140     /* Loop forever */
00141 ProcessLoop:
00142     for (;;)
00143     {
00144         /* Wait for Something to Happen on the Queue */
00145         QueueEntry = KeRemoveQueue(&WorkQueue->WorkerQueue,
00146                                    WaitMode,
00147                                    TimeoutPointer);
00148 
00149         /* Check if we timed out and quit this loop in that case */
00150         if ((NTSTATUS)(ULONG_PTR)QueueEntry == STATUS_TIMEOUT) break;
00151 
00152         /* Increment Processed Work Items */
00153         InterlockedIncrement((PLONG)&WorkQueue->WorkItemsProcessed);
00154 
00155         /* Get the Work Item */
00156         WorkItem = CONTAINING_RECORD(QueueEntry, WORK_QUEUE_ITEM, List);
00157 
00158         /* Make sure nobody is trying to play smart with us */
00159         ASSERT((ULONG_PTR)WorkItem->WorkerRoutine > MmUserProbeAddress);
00160 
00161         /* Call the Worker Routine */
00162         WorkItem->WorkerRoutine(WorkItem->Parameter);
00163 
00164         /* Make sure APCs are not disabled */
00165         if (Thread->Tcb.SpecialApcDisable)
00166         {
00167             /* We're nice and do it behind your back */
00168             DPRINT1("Warning: Broken Worker Thread: %p %lx %p came back "
00169                     "with APCs disabled!\n",
00170                     WorkItem->WorkerRoutine,
00171                     WorkItem->Parameter,
00172                     WorkItem);
00173             Thread->Tcb.SpecialApcDisable = 0;
00174         }
00175 
00176         /* Make sure it returned at right IRQL */
00177         if (KeGetCurrentIrql() != PASSIVE_LEVEL)
00178         {
00179             /* It didn't, bugcheck! */
00180             KeBugCheckEx(WORKER_THREAD_RETURNED_AT_BAD_IRQL,
00181                          (ULONG_PTR)WorkItem->WorkerRoutine,
00182                          KeGetCurrentIrql(),
00183                          (ULONG_PTR)WorkItem->Parameter,
00184                          (ULONG_PTR)WorkItem);
00185         }
00186 
00187         /* Make sure it returned with Impersionation Disabled */
00188         if (Thread->ActiveImpersonationInfo)
00189         {
00190             /* It didn't, bugcheck! */
00191             KeBugCheckEx(IMPERSONATING_WORKER_THREAD,
00192                          (ULONG_PTR)WorkItem->WorkerRoutine,
00193                          (ULONG_PTR)WorkItem->Parameter,
00194                          (ULONG_PTR)WorkItem,
00195                          0);
00196         }
00197     }
00198 
00199     /* This is a dynamic thread. Terminate it unless IRPs are pending */
00200     if (!IsListEmpty(&Thread->IrpList)) goto ProcessLoop;
00201 
00202     /* Don't terminate it if the queue is disabled either */
00203     if (WorkQueue->Info.QueueDisabled) goto ProcessLoop;
00204 
00205     /* Set the worker flags */
00206     do
00207     {
00208         /* Decrease the worker count */
00209         OldValue = WorkQueue->Info;
00210         NewValue = OldValue;
00211         NewValue.WorkerCount--;
00212     }
00213     while (InterlockedCompareExchange((PLONG)&WorkQueue->Info,
00214                                       *(PLONG)&NewValue,
00215                                       *(PLONG)&OldValue) != *(PLONG)&OldValue);
00216 
00217     /* Decrement dynamic thread count */
00218     InterlockedDecrement(&WorkQueue->DynamicThreadCount);
00219 
00220     /* We're not a worker thread anymore */
00221     Thread->ActiveExWorker = FALSE;
00222 
00223     /* Re-enable the stack swap */
00224     KeSetKernelStackSwapEnable(TRUE);
00225     return;
00226 }
00227 
00228 /*++
00229  * @name ExpCreateWorkerThread
00230  *
00231  *     The ExpCreateWorkerThread routine creates a new worker thread for the
00232  *     specified queue.
00233  **
00234  * @param QueueType
00235  *        Type of the queue to use for this thread. Valid values are:
00236  *          - DelayedWorkQueue
00237  *          - CriticalWorkQueue
00238  *          - HyperCriticalWorkQueue
00239  *
00240  * @param Dynamic
00241  *        Specifies whether or not this thread is a dynamic thread.
00242  *
00243  * @return None.
00244  *
00245  * @remarks HyperCritical work threads run at priority 7; Critical work threads
00246  *          run at priority 5, and delayed work threads run at priority 4.
00247  *
00248  *          This, worker threads cannot pre-empty a normal user-mode thread.
00249  *
00250  *--*/
00251 VOID
00252 NTAPI
00253 ExpCreateWorkerThread(WORK_QUEUE_TYPE WorkQueueType,
00254                       IN BOOLEAN Dynamic)
00255 {
00256     PETHREAD Thread;
00257     HANDLE hThread;
00258     ULONG Context;
00259     KPRIORITY Priority;
00260 
00261     /* Check if this is going to be a dynamic thread */
00262     Context = WorkQueueType;
00263 
00264     /* Add the dynamic mask */
00265     if (Dynamic) Context |= EX_DYNAMIC_WORK_THREAD;
00266 
00267     /* Create the System Thread */
00268     PsCreateSystemThread(&hThread,
00269                          THREAD_ALL_ACCESS,
00270                          NULL,
00271                          NULL,
00272                          NULL,
00273                          ExpWorkerThreadEntryPoint,
00274                          UlongToPtr(Context));
00275 
00276     /* If the thread is dynamic */
00277     if (Dynamic)
00278     {
00279         /* Increase the count */
00280         InterlockedIncrement(&ExWorkerQueue[WorkQueueType].DynamicThreadCount);
00281     }
00282 
00283     /* Set the priority */
00284     if (WorkQueueType == DelayedWorkQueue)
00285     {
00286         /* Priority == 4 */
00287         Priority = EX_DELAYED_QUEUE_PRIORITY_INCREMENT;
00288     }
00289     else if (WorkQueueType == CriticalWorkQueue)
00290     {
00291         /* Priority == 5 */
00292         Priority = EX_CRITICAL_QUEUE_PRIORITY_INCREMENT;
00293     }
00294     else
00295     {
00296         /* Priority == 7 */
00297         Priority = EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT;
00298     }
00299 
00300     /* Get the Thread */
00301     ObReferenceObjectByHandle(hThread,
00302                               THREAD_SET_INFORMATION,
00303                               PsThreadType,
00304                               KernelMode,
00305                               (PVOID*)&Thread,
00306                               NULL);
00307 
00308     /* Set the Priority */
00309     KeSetBasePriorityThread(&Thread->Tcb, Priority);
00310 
00311     /* Dereference and close handle */
00312     ObDereferenceObject(Thread);
00313     ObCloseHandle(hThread, KernelMode);
00314 }
00315 
00316 /*++
00317  * @name ExpCheckDynamicThreadCount
00318  *
00319  *     The ExpCheckDynamicThreadCount routine checks every queue and creates a
00320  *     dynamic thread if the queue seems to be deadlocked.
00321  *
00322  * @param None
00323  *
00324  * @return None.
00325  *
00326  * @remarks The algorithm for deciding if a new thread must be created is
00327  *          based on wether the queue has processed no new items in the last
00328  *          second, and new items are still enqueued.
00329  *
00330  *--*/
00331 VOID
00332 NTAPI
00333 ExpDetectWorkerThreadDeadlock(VOID)
00334 {
00335     ULONG i;
00336     PEX_WORK_QUEUE Queue;
00337 
00338     /* Loop the 3 queues */
00339     for (i = 0; i < MaximumWorkQueue; i++)
00340     {
00341         /* Get the queue */
00342         Queue = &ExWorkerQueue[i];
00343         ASSERT(Queue->DynamicThreadCount <= 16);
00344 
00345         /* Check if stuff is on the queue that still is unprocessed */
00346         if ((Queue->QueueDepthLastPass) &&
00347             (Queue->WorkItemsProcessed == Queue->WorkItemsProcessedLastPass) &&
00348             (Queue->DynamicThreadCount < 16))
00349         {
00350             /* Stuff is still on the queue and nobody did anything about it */
00351             DPRINT1("EX: Work Queue Deadlock detected: %d\n", i);
00352             ExpCreateWorkerThread(i, TRUE);
00353             DPRINT1("Dynamic threads queued %d\n", Queue->DynamicThreadCount);
00354         }
00355 
00356         /* Update our data */
00357         Queue->WorkItemsProcessedLastPass = Queue->WorkItemsProcessed;
00358         Queue->QueueDepthLastPass = KeReadStateQueue(&Queue->WorkerQueue);
00359     }
00360 }
00361 
00362 /*++
00363  * @name ExpCheckDynamicThreadCount
00364  *
00365  *     The ExpCheckDynamicThreadCount routine checks every queue and creates a
00366  *     dynamic thread if the queue requires one.
00367  *
00368  * @param None
00369  *
00370  * @return None.
00371  *
00372  * @remarks The algorithm for deciding if a new thread must be created is
00373  *          documented in the ExQueueWorkItem routine.
00374  *
00375  *--*/
00376 VOID
00377 NTAPI
00378 ExpCheckDynamicThreadCount(VOID)
00379 {
00380     ULONG i;
00381     PEX_WORK_QUEUE Queue;
00382 
00383     /* Loop the 3 queues */
00384     for (i = 0; i < MaximumWorkQueue; i++)
00385     {
00386         /* Get the queue */
00387         Queue = &ExWorkerQueue[i];
00388 
00389         /* Check if still need a new thread. See ExQueueWorkItem */
00390         if ((Queue->Info.MakeThreadsAsNecessary) &&
00391             (!IsListEmpty(&Queue->WorkerQueue.EntryListHead)) &&
00392             (Queue->WorkerQueue.CurrentCount <
00393              Queue->WorkerQueue.MaximumCount) &&
00394             (Queue->DynamicThreadCount < 16))
00395         {
00396             /* Create a new thread */
00397             DPRINT1("EX: Creating new dynamic thread as requested\n");
00398             ExpCreateWorkerThread(i, TRUE);
00399         }
00400     }
00401 }
00402 
00403 /*++
00404  * @name ExpWorkerThreadBalanceManager
00405  *
00406  *     The ExpWorkerThreadBalanceManager routine is the entrypoint for the
00407  *     worker thread balance set manager.
00408  *
00409  * @param Context
00410  *        Unused.
00411  *
00412  * @return None.
00413  *
00414  * @remarks The worker thread balance set manager listens every second, but can
00415  *          also be woken up by an event when a new thread is needed, or by the
00416  *          special shutdown event. This thread runs at priority 7.
00417  *
00418  *          This routine must run at IRQL == PASSIVE_LEVEL.
00419  *
00420  *--*/
00421 VOID
00422 NTAPI
00423 ExpWorkerThreadBalanceManager(IN PVOID Context)
00424 {
00425     KTIMER Timer;
00426     LARGE_INTEGER Timeout;
00427     NTSTATUS Status;
00428     PVOID WaitEvents[3];
00429     PAGED_CODE();
00430     UNREFERENCED_PARAMETER(Context);
00431 
00432     /* Raise our priority above all other worker threads */
00433     KeSetBasePriorityThread(KeGetCurrentThread(),
00434                             EX_CRITICAL_QUEUE_PRIORITY_INCREMENT + 1);
00435 
00436     /* Setup the timer */
00437     KeInitializeTimer(&Timer);
00438     Timeout.QuadPart = Int32x32To64(-1, 10000000);
00439 
00440     /* We'll wait on the periodic timer and also the emergency event */
00441     WaitEvents[0] = &Timer;
00442     WaitEvents[1] = &ExpThreadSetManagerEvent;
00443     WaitEvents[2] = &ExpThreadSetManagerShutdownEvent;
00444 
00445     /* Start wait loop */
00446     for (;;)
00447     {
00448         /* Wait for the timer */
00449         KeSetTimer(&Timer, Timeout, NULL);
00450         Status = KeWaitForMultipleObjects(3,
00451                                           WaitEvents,
00452                                           WaitAny,
00453                                           Executive,
00454                                           KernelMode,
00455                                           FALSE,
00456                                           NULL,
00457                                           NULL);
00458         if (Status == 0)
00459         {
00460             /* Our timer expired. Check for deadlocks */
00461             ExpDetectWorkerThreadDeadlock();
00462         }
00463         else if (Status == 1)
00464         {
00465             /* Someone notified us, verify if we should create a new thread */
00466             ExpCheckDynamicThreadCount();
00467         }
00468         else if (Status == 2)
00469         {
00470             /* We are shutting down. Cancel the timer */
00471             DPRINT1("System shutdown\n");
00472             KeCancelTimer(&Timer);
00473 
00474             /* Make sure we have a final thread */
00475             ASSERT(ExpLastWorkerThread);
00476 
00477             /* Wait for it */
00478             KeWaitForSingleObject(ExpLastWorkerThread,
00479                                   Executive,
00480                                   KernelMode,
00481                                   FALSE,
00482                                   NULL);
00483 
00484             /* Dereference it and kill us */
00485             ObDereferenceObject(ExpLastWorkerThread);
00486             PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN);
00487         }
00488     }
00489 }
00490 
00491 /*++
00492  * @name ExpInitializeWorkerThreads
00493  *
00494  *     The ExpInitializeWorkerThreads routine initializes worker thread and
00495  *     work queue support.
00496  *
00497  * @param None.
00498  *
00499  * @return None.
00500  *
00501  * @remarks This routine is only called once during system initialization.
00502  *
00503  *--*/
00504 VOID
00505 INIT_FUNCTION
00506 NTAPI
00507 ExpInitializeWorkerThreads(VOID)
00508 {
00509     ULONG WorkQueueType;
00510     ULONG CriticalThreads, DelayedThreads;
00511     HANDLE ThreadHandle;
00512     PETHREAD Thread;
00513     ULONG i;
00514 
00515     /* Setup the stack swap support */
00516     ExInitializeFastMutex(&ExpWorkerSwapinMutex);
00517     InitializeListHead(&ExpWorkerListHead);
00518     ExpWorkersCanSwap = TRUE;
00519 
00520     /* Set the number of critical and delayed threads. We shouldn't hardcode */
00521     DelayedThreads = EX_DELAYED_WORK_THREADS;
00522     CriticalThreads = EX_CRITICAL_WORK_THREADS;
00523 
00524     /* Protect against greedy registry modifications */
00525     ExpAdditionalDelayedWorkerThreads =
00526         min(ExpAdditionalDelayedWorkerThreads, 16);
00527     ExpAdditionalCriticalWorkerThreads =
00528         min(ExpAdditionalCriticalWorkerThreads, 16);
00529 
00530     /* Calculate final count */
00531     DelayedThreads += ExpAdditionalDelayedWorkerThreads;
00532     CriticalThreads += ExpAdditionalCriticalWorkerThreads;
00533 
00534     /* Initialize the Array */
00535     for (WorkQueueType = 0; WorkQueueType < MaximumWorkQueue; WorkQueueType++)
00536     {
00537         /* Clear the structure and initialize the queue */
00538         RtlZeroMemory(&ExWorkerQueue[WorkQueueType], sizeof(EX_WORK_QUEUE));
00539         KeInitializeQueue(&ExWorkerQueue[WorkQueueType].WorkerQueue, 0);
00540     }
00541 
00542     /* Dynamic threads are only used for the critical queue */
00543     ExWorkerQueue[CriticalWorkQueue].Info.MakeThreadsAsNecessary = TRUE;
00544 
00545     /* Initialize the balance set manager events */
00546     KeInitializeEvent(&ExpThreadSetManagerEvent, SynchronizationEvent, FALSE);
00547     KeInitializeEvent(&ExpThreadSetManagerShutdownEvent,
00548                       NotificationEvent,
00549                       FALSE);
00550 
00551     /* Create the built-in worker threads for the critical queue */
00552     for (i = 0; i < CriticalThreads; i++)
00553     {
00554         /* Create the thread */
00555         ExpCreateWorkerThread(CriticalWorkQueue, FALSE);
00556         ExpCriticalWorkerThreads++;
00557     }
00558 
00559     /* Create the built-in worker threads for the delayed queue */
00560     for (i = 0; i < DelayedThreads; i++)
00561     {
00562         /* Create the thread */
00563         ExpCreateWorkerThread(DelayedWorkQueue, FALSE);
00564         ExpDelayedWorkerThreads++;
00565     }
00566 
00567     /* Create the built-in worker thread for the hypercritical queue */
00568     ExpCreateWorkerThread(HyperCriticalWorkQueue, FALSE);
00569 
00570     /* Create the balance set manager thread */
00571     PsCreateSystemThread(&ThreadHandle,
00572                          THREAD_ALL_ACCESS,
00573                          NULL,
00574                          0,
00575                          NULL,
00576                          ExpWorkerThreadBalanceManager,
00577                          NULL);
00578 
00579     /* Get a pointer to it for the shutdown process */
00580     ObReferenceObjectByHandle(ThreadHandle,
00581                               THREAD_ALL_ACCESS,
00582                               NULL,
00583                               KernelMode,
00584                               (PVOID*)&Thread,
00585                               NULL);
00586     ExpWorkerThreadBalanceManagerPtr = Thread;
00587 
00588     /* Close the handle and return */
00589     ObCloseHandle(ThreadHandle, KernelMode);
00590 }
00591 
00592 VOID
00593 NTAPI
00594 ExpSetSwappingKernelApc(IN PKAPC Apc,
00595                         OUT PKNORMAL_ROUTINE *NormalRoutine,
00596                         IN OUT PVOID *NormalContext,
00597                         IN OUT PVOID *SystemArgument1,
00598                         IN OUT PVOID *SystemArgument2)
00599 {
00600     PBOOLEAN AllowSwap;
00601     PKEVENT Event = (PKEVENT)*SystemArgument1;
00602 
00603     /* Make sure it's an active worker */
00604     if (PsGetCurrentThread()->ActiveExWorker) 
00605     {
00606         /* Read the setting from the context flag */
00607         AllowSwap = (PBOOLEAN)NormalContext;
00608         KeSetKernelStackSwapEnable(*AllowSwap);
00609     }
00610 
00611     /* Let caller know that we're done */
00612     KeSetEvent(Event, 0, FALSE);
00613 }
00614 
00615 VOID
00616 NTAPI
00617 ExSwapinWorkerThreads(IN BOOLEAN AllowSwap)
00618 {
00619     KEVENT Event;
00620     PETHREAD CurrentThread = PsGetCurrentThread(), Thread;
00621     PEPROCESS Process = PsInitialSystemProcess;
00622     KAPC Apc;
00623     PAGED_CODE();
00624 
00625     /* Initialize an event so we know when we're done */
00626     KeInitializeEvent(&Event, NotificationEvent, FALSE);
00627 
00628     /* Lock this routine */
00629     ExAcquireFastMutex(&ExpWorkerSwapinMutex);
00630 
00631     /* New threads cannot swap anymore */
00632     ExpWorkersCanSwap = AllowSwap;
00633 
00634     /* Loop all threads in the system process */
00635     Thread = PsGetNextProcessThread(Process, NULL);
00636     while (Thread)
00637     {
00638         /* Skip threads with explicit permission to do this */
00639         if (Thread->ExWorkerCanWaitUser) goto Next;
00640 
00641         /* Check if we reached ourselves */
00642         if (Thread == CurrentThread)
00643         {
00644             /* Do it inline */
00645             KeSetKernelStackSwapEnable(AllowSwap);
00646         }
00647         else
00648         {
00649             /* Queue an APC */
00650             KeInitializeApc(&Apc,
00651                             &Thread->Tcb,
00652                             InsertApcEnvironment,
00653                             ExpSetSwappingKernelApc,
00654                             NULL,
00655                             NULL,
00656                             KernelMode,
00657                             &AllowSwap);
00658             if (KeInsertQueueApc(&Apc, &Event, NULL, 3))
00659             {
00660                 /* Wait for the APC to run */
00661                 KeWaitForSingleObject(&Event, Executive, KernelMode, FALSE, NULL);
00662                 KeClearEvent(&Event);
00663             }
00664         }
00665         
00666         /* Next thread */
00667 Next:
00668         Thread = PsGetNextProcessThread(Process, Thread);
00669     }
00670 
00671     /* Release the lock */
00672     ExReleaseFastMutex(&ExpWorkerSwapinMutex);
00673 }
00674 
00675 /* PUBLIC FUNCTIONS **********************************************************/
00676 
00677 /*++
00678  * @name ExQueueWorkItem
00679  * @implemented NT4
00680  *
00681  *     The ExQueueWorkItem routine acquires rundown protection for
00682  *     the specified descriptor.
00683  *
00684  * @param WorkItem
00685  *        Pointer to an initialized Work Queue Item structure. This structure
00686  *        must be located in nonpaged pool memory.
00687  *
00688  * @param QueueType
00689  *        Type of the queue to use for this item. Can be one of the following:
00690  *          - DelayedWorkQueue
00691  *          - CriticalWorkQueue
00692  *          - HyperCriticalWorkQueue
00693  *
00694  * @return None.
00695  *
00696  * @remarks This routine is obsolete. Use IoQueueWorkItem instead.
00697  *
00698  *          Callers of this routine must be running at IRQL <= DISPATCH_LEVEL.
00699  *
00700  *--*/
00701 VOID
00702 NTAPI
00703 ExQueueWorkItem(IN PWORK_QUEUE_ITEM WorkItem,
00704                 IN WORK_QUEUE_TYPE QueueType)
00705 {
00706     PEX_WORK_QUEUE WorkQueue = &ExWorkerQueue[QueueType];
00707     ASSERT(QueueType < MaximumWorkQueue);
00708     ASSERT(WorkItem->List.Flink == NULL);
00709 
00710     /* Don't try to trick us */
00711     if ((ULONG_PTR)WorkItem->WorkerRoutine < MmUserProbeAddress)
00712     {
00713         /* Bugcheck the system */
00714         KeBugCheckEx(WORKER_INVALID,
00715                      1,
00716                      (ULONG_PTR)WorkItem,
00717                      (ULONG_PTR)WorkItem->WorkerRoutine,
00718                      0);
00719     }
00720 
00721     /* Insert the Queue */
00722     KeInsertQueue(&WorkQueue->WorkerQueue, &WorkItem->List);
00723     ASSERT(!WorkQueue->Info.QueueDisabled);
00724 
00725     /*
00726      * Check if we need a new thread. Our decision is as follows:
00727      *  - This queue type must support Dynamic Threads (duh!)
00728      *  - It actually has to have unprocessed items
00729      *  - We have CPUs which could be handling another thread
00730      *  - We haven't abused our usage of dynamic threads.
00731      */
00732     if ((WorkQueue->Info.MakeThreadsAsNecessary) &&
00733         (!IsListEmpty(&WorkQueue->WorkerQueue.EntryListHead)) &&
00734         (WorkQueue->WorkerQueue.CurrentCount <
00735          WorkQueue->WorkerQueue.MaximumCount) &&
00736         (WorkQueue->DynamicThreadCount < 16))
00737     {
00738         /* Let the balance manager know about it */
00739         DPRINT1("Requesting a new thread. CurrentCount: %d. MaxCount: %d\n",
00740                 WorkQueue->WorkerQueue.CurrentCount,
00741                 WorkQueue->WorkerQueue.MaximumCount);
00742         KeSetEvent(&ExpThreadSetManagerEvent, 0, FALSE);
00743     }
00744 }
00745 
00746 /* EOF */
00747 

Generated on Sun May 27 2012 04:37:10 for ReactOS by doxygen 1.7.6.1

ReactOS is a registered trademark or a trademark of ReactOS Foundation in the United States and other countries.