ReactOS Fundraising Campaign 2012
 
€ 4,410 / € 30,000

Information | Donate

Home | Info | Community | Development | myReactOS | Contact Us

  1. Home
  2. Community
  3. Development
  4. myReactOS
  5. Fundraiser 2012

  1. Main Page
  2. Alphabetical List
  3. Data Structures
  4. Directories
  5. File List
  6. Data Fields
  7. Globals
  8. Related Pages

ReactOS Development > Doxygen

workitem.c
Go to the documentation of this file.
00001 /*
00002  * COPYRIGHT:         See COPYING in the top level directory
00003  * PROJECT:           ReactOS system libraries
00004  * PURPOSE:           Work Item implementation
00005  * FILE:              lib/rtl/workitem.c
00006  * PROGRAMMER:
00007  */
00008 
00009 /* INCLUDES *****************************************************************/
00010 
00011 #include <rtl.h>
00012 
00013 #define NDEBUG
00014 #include <debug.h>
00015 
00016 /* FUNCTIONS ***************************************************************/
00017 
00018 NTSTATUS
00019 NTAPI
00020 RtlpStartThread(IN PTHREAD_START_ROUTINE Function,
00021                 IN PVOID Parameter,
00022                 OUT PHANDLE ThreadHandle)
00023 {
00024     /* Create a native worker thread -- used for SMSS, CSRSS, etc... */
00025     return RtlCreateUserThread(NtCurrentProcess(),
00026                                NULL,
00027                                TRUE,
00028                                0,
00029                                0,
00030                                0,
00031                                Function,
00032                                Parameter,
00033                                ThreadHandle,
00034                                NULL);
00035 }
00036 
00037 NTSTATUS
00038 NTAPI
00039 RtlpExitThread(IN NTSTATUS ExitStatus)
00040 {
00041     /* Kill a native worker thread -- used for SMSS, CSRSS, etc... */
00042     return NtTerminateThread(NtCurrentThread(), ExitStatus);
00043 }
00044 
00045 PRTL_START_POOL_THREAD RtlpStartThreadFunc = RtlpStartThread;
00046 PRTL_EXIT_POOL_THREAD RtlpExitThreadFunc = RtlpExitThread;
00047 
00048 #define MAX_WORKERTHREADS   0x100
00049 #define WORKERTHREAD_CREATION_THRESHOLD 0x5
00050 
00051 typedef struct _RTLP_IOWORKERTHREAD
00052 {
00053     LIST_ENTRY ListEntry;
00054     HANDLE ThreadHandle;
00055     ULONG Flags;
00056 } RTLP_IOWORKERTHREAD, *PRTLP_IOWORKERTHREAD;
00057 
00058 typedef struct _RTLP_WORKITEM
00059 {
00060     WORKERCALLBACKFUNC Function;
00061     PVOID Context;
00062     ULONG Flags;
00063     HANDLE TokenHandle;
00064 } RTLP_WORKITEM, *PRTLP_WORKITEM;
00065 
00066 static LONG ThreadPoolInitialized = 0;
00067 static RTL_CRITICAL_SECTION ThreadPoolLock;
00068 static PRTLP_IOWORKERTHREAD PersistentIoThread;
00069 static LIST_ENTRY ThreadPoolIOWorkerThreadsList;
00070 static HANDLE ThreadPoolCompletionPort;
00071 static LONG ThreadPoolWorkerThreads;
00072 static LONG ThreadPoolWorkerThreadsRequests;
00073 static LONG ThreadPoolWorkerThreadsLongRequests;
00074 static LONG ThreadPoolIOWorkerThreads;
00075 static LONG ThreadPoolIOWorkerThreadsRequests;
00076 static LONG ThreadPoolIOWorkerThreadsLongRequests;
00077 
00078 #define IsThreadPoolInitialized() (*((volatile LONG*)&ThreadPoolInitialized) == 1)
00079 
00080 static NTSTATUS
00081 RtlpInitializeThreadPool(VOID)
00082 {
00083     NTSTATUS Status = STATUS_SUCCESS;
00084     LONG InitStatus;
00085 
00086     do
00087     {
00088         InitStatus = InterlockedCompareExchange(&ThreadPoolInitialized,
00089                                                  2,
00090                                                  0);
00091         if (InitStatus == 0)
00092         {
00093             /* We're the first thread to initialize the thread pool */
00094 
00095             InitializeListHead(&ThreadPoolIOWorkerThreadsList);
00096 
00097             PersistentIoThread = NULL;
00098 
00099             ThreadPoolWorkerThreads = 0;
00100             ThreadPoolWorkerThreadsRequests = 0;
00101             ThreadPoolWorkerThreadsLongRequests = 0;
00102             ThreadPoolIOWorkerThreads = 0;
00103             ThreadPoolIOWorkerThreadsRequests = 0;
00104             ThreadPoolIOWorkerThreadsLongRequests = 0;
00105 
00106             /* Initialize the lock */
00107             Status = RtlInitializeCriticalSection(&ThreadPoolLock);
00108             if (!NT_SUCCESS(Status))
00109                 goto Finish;
00110 
00111             /* Create the complection port */
00112             Status = NtCreateIoCompletion(&ThreadPoolCompletionPort,
00113                                           IO_COMPLETION_ALL_ACCESS,
00114                                           NULL,
00115                                           0);
00116             if (!NT_SUCCESS(Status))
00117             {
00118                 RtlDeleteCriticalSection(&ThreadPoolLock);
00119                 goto Finish;
00120             }
00121 
00122 Finish:
00123             /* Initialization done */
00124             InterlockedExchange(&ThreadPoolInitialized,
00125                                  1);
00126             break;
00127         }
00128         else if (InitStatus == 2)
00129         {
00130             LARGE_INTEGER Timeout;
00131 
00132             /* Another thread is currently initializing the thread pool!
00133                Poll after a short period of time to see if the initialization
00134                was completed */
00135 
00136             Timeout.QuadPart = -10000000LL; /* Wait for a second */
00137             NtDelayExecution(FALSE,
00138                              &Timeout);
00139         }
00140     } while (InitStatus != 1);
00141 
00142     return Status;
00143 }
00144 
00145 static NTSTATUS
00146 RtlpGetImpersonationToken(OUT PHANDLE TokenHandle)
00147 {
00148     NTSTATUS Status;
00149 
00150     Status = NtOpenThreadToken(NtCurrentThread(),
00151                                TOKEN_IMPERSONATE,
00152                                TRUE,
00153                                TokenHandle);
00154     if (Status == STATUS_NO_TOKEN || Status == STATUS_CANT_OPEN_ANONYMOUS)
00155     {
00156         *TokenHandle = NULL;
00157         Status = STATUS_SUCCESS;
00158     }
00159 
00160     return Status;
00161 }
00162 
00163 static NTSTATUS
00164 RtlpStartWorkerThread(PTHREAD_START_ROUTINE StartRoutine)
00165 {
00166     NTSTATUS Status;
00167     HANDLE ThreadHandle;
00168     LARGE_INTEGER Timeout;
00169     volatile LONG WorkerInitialized = 0;
00170 
00171     Timeout.QuadPart = -10000LL; /* Wait for 100ms */
00172 
00173     /* Start the thread */
00174     Status = RtlpStartThreadFunc(StartRoutine, (PVOID)&WorkerInitialized, &ThreadHandle);
00175     if (NT_SUCCESS(Status))
00176     {
00177         NtResumeThread(ThreadHandle, NULL);
00178         
00179         /* Poll until the thread got a chance to initialize */
00180         while (WorkerInitialized == 0)
00181         {
00182             NtDelayExecution(FALSE,
00183                              &Timeout);
00184         }
00185 
00186         NtClose(ThreadHandle);
00187     }
00188 
00189     return Status;
00190 }
00191 
00192 static VOID
00193 NTAPI
00194 RtlpExecuteWorkItem(IN OUT PVOID NormalContext,
00195                     IN OUT PVOID SystemArgument1,
00196                     IN OUT PVOID SystemArgument2)
00197 {
00198     NTSTATUS Status;
00199     BOOLEAN Impersonated = FALSE;
00200     RTLP_WORKITEM WorkItem = *(volatile RTLP_WORKITEM *)SystemArgument2;
00201 
00202     RtlFreeHeap(RtlGetProcessHeap(),
00203                 0,
00204                 SystemArgument2);
00205 
00206     if (WorkItem.TokenHandle != NULL)
00207     {
00208         Status = NtSetInformationThread(NtCurrentThread(),
00209                                         ThreadImpersonationToken,
00210                                         &WorkItem.TokenHandle,
00211                                         sizeof(HANDLE));
00212 
00213         NtClose(WorkItem.TokenHandle);
00214 
00215         if (NT_SUCCESS(Status))
00216         {
00217             Impersonated = TRUE;
00218         }
00219     }
00220 
00221     _SEH2_TRY
00222     {
00223         DPRINT("RtlpExecuteWorkItem: Function: 0x%p Context: 0x%p ImpersonationToken: 0x%p\n", WorkItem.Function, WorkItem.Context, WorkItem.TokenHandle);
00224 
00225         /* Execute the function */
00226         WorkItem.Function(WorkItem.Context);
00227     }
00228     _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER)
00229     {
00230         DPRINT1("Exception 0x%x while executing IO work item 0x%p\n", _SEH2_GetExceptionCode(), WorkItem.Function);
00231     }
00232     _SEH2_END;
00233 
00234     if (Impersonated)
00235     {
00236         WorkItem.TokenHandle = NULL;
00237         Status = NtSetInformationThread(NtCurrentThread(),
00238                                         ThreadImpersonationToken,
00239                                         &WorkItem.TokenHandle,
00240                                         sizeof(HANDLE));
00241         if (!NT_SUCCESS(Status))
00242         {
00243             DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n", Status);
00244         }
00245     }
00246 
00247     /* update the requests counter */
00248     InterlockedDecrement(&ThreadPoolWorkerThreadsRequests);
00249 
00250     if (WorkItem.Flags & WT_EXECUTELONGFUNCTION)
00251     {
00252         InterlockedDecrement(&ThreadPoolWorkerThreadsLongRequests);
00253     }
00254 }
00255 
00256 
00257 static NTSTATUS
00258 RtlpQueueWorkerThread(IN OUT PRTLP_WORKITEM WorkItem)
00259 {
00260     NTSTATUS Status = STATUS_SUCCESS;
00261 
00262     InterlockedIncrement(&ThreadPoolWorkerThreadsRequests);
00263 
00264     if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
00265     {
00266         InterlockedIncrement(&ThreadPoolWorkerThreadsLongRequests);
00267     }
00268 
00269     if (WorkItem->Flags & WT_EXECUTEINPERSISTENTTHREAD)
00270     {
00271         Status = RtlpInitializeTimerThread();
00272 
00273         if (NT_SUCCESS(Status))
00274         {
00275             /* Queue an APC in the timer thread */
00276             Status = NtQueueApcThread(TimerThreadHandle,
00277                                       RtlpExecuteWorkItem,
00278                                       NULL,
00279                                       NULL,
00280                                       WorkItem);
00281         }
00282     }
00283     else
00284     {
00285         /* Queue an IO completion message */
00286         Status = NtSetIoCompletion(ThreadPoolCompletionPort,
00287                                    RtlpExecuteWorkItem,
00288                                    WorkItem,
00289                                    STATUS_SUCCESS,
00290                                    0);
00291     }
00292 
00293     if (!NT_SUCCESS(Status))
00294     {
00295         InterlockedDecrement(&ThreadPoolWorkerThreadsRequests);
00296 
00297         if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
00298         {
00299             InterlockedDecrement(&ThreadPoolWorkerThreadsLongRequests);
00300         }
00301     }
00302 
00303     return Status;
00304 }
00305 
00306 static VOID
00307 NTAPI
00308 RtlpExecuteIoWorkItem(IN OUT PVOID NormalContext,
00309                       IN OUT PVOID SystemArgument1,
00310                       IN OUT PVOID SystemArgument2)
00311 {
00312     NTSTATUS Status;
00313     BOOLEAN Impersonated = FALSE;
00314     PRTLP_IOWORKERTHREAD IoThread = (PRTLP_IOWORKERTHREAD)NormalContext;
00315     RTLP_WORKITEM WorkItem = *(volatile RTLP_WORKITEM *)SystemArgument2;
00316 
00317     ASSERT(IoThread != NULL);
00318 
00319     RtlFreeHeap(RtlGetProcessHeap(),
00320                 0,
00321                 SystemArgument2);
00322 
00323     if (WorkItem.TokenHandle != NULL)
00324     {
00325         Status = NtSetInformationThread(NtCurrentThread(),
00326                                         ThreadImpersonationToken,
00327                                         &WorkItem.TokenHandle,
00328                                         sizeof(HANDLE));
00329 
00330         NtClose(WorkItem.TokenHandle);
00331 
00332         if (NT_SUCCESS(Status))
00333         {
00334             Impersonated = TRUE;
00335         }
00336     }
00337 
00338     _SEH2_TRY
00339     {
00340         DPRINT("RtlpExecuteIoWorkItem: Function: 0x%p Context: 0x%p ImpersonationToken: 0x%p\n", WorkItem.Function, WorkItem.Context, WorkItem.TokenHandle);
00341 
00342         /* Execute the function */
00343         WorkItem.Function(WorkItem.Context);
00344     }
00345     _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER)
00346     {
00347         DPRINT1("Exception 0x%x while executing IO work item 0x%p\n", _SEH2_GetExceptionCode(), WorkItem.Function);
00348     }
00349     _SEH2_END;
00350 
00351     if (Impersonated)
00352     {
00353         WorkItem.TokenHandle = NULL;
00354         Status = NtSetInformationThread(NtCurrentThread(),
00355                                         ThreadImpersonationToken,
00356                                         &WorkItem.TokenHandle,
00357                                         sizeof(HANDLE));
00358         if (!NT_SUCCESS(Status))
00359         {
00360             DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n", Status);
00361         }
00362     }
00363 
00364     /* remove the long function flag */
00365     if (WorkItem.Flags & WT_EXECUTELONGFUNCTION)
00366     {
00367         Status = RtlEnterCriticalSection(&ThreadPoolLock);
00368         if (NT_SUCCESS(Status))
00369         {
00370             IoThread->Flags &= ~WT_EXECUTELONGFUNCTION;
00371             RtlLeaveCriticalSection(&ThreadPoolLock);
00372         }
00373     }
00374 
00375     /* update the requests counter */
00376     InterlockedDecrement(&ThreadPoolIOWorkerThreadsRequests);
00377 
00378     if (WorkItem.Flags & WT_EXECUTELONGFUNCTION)
00379     {
00380         InterlockedDecrement(&ThreadPoolIOWorkerThreadsLongRequests);
00381     }
00382 }
00383 
00384 static NTSTATUS
00385 RtlpQueueIoWorkerThread(IN OUT PRTLP_WORKITEM WorkItem)
00386 {
00387     PLIST_ENTRY CurrentEntry;
00388     PRTLP_IOWORKERTHREAD IoThread = NULL;
00389     NTSTATUS Status = STATUS_SUCCESS;
00390 
00391     if (WorkItem->Flags & WT_EXECUTEINPERSISTENTIOTHREAD)
00392     {
00393         if (PersistentIoThread != NULL)
00394         {
00395             /* We already have a persistent IO worker thread */
00396             IoThread = PersistentIoThread;
00397         }
00398         else
00399         {
00400             /* We're not aware of any persistent IO worker thread. Search for a unused
00401                worker thread that doesn't have a long function queued */
00402             CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink;
00403             while (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
00404             {
00405                 IoThread = CONTAINING_RECORD(CurrentEntry,
00406                                              RTLP_IOWORKERTHREAD,
00407                                              ListEntry);
00408 
00409                 if (!(IoThread->Flags & WT_EXECUTELONGFUNCTION))
00410                     break;
00411 
00412                 CurrentEntry = CurrentEntry->Flink;
00413             }
00414 
00415             if (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
00416             {
00417                 /* Found a worker thread we can use. */
00418                 ASSERT(IoThread != NULL);
00419 
00420                 IoThread->Flags |= WT_EXECUTEINPERSISTENTIOTHREAD;
00421                 PersistentIoThread = IoThread;
00422             }
00423             else
00424             {
00425                 DPRINT1("Failed to find a worker thread for the persistent IO thread!\n");
00426                 return STATUS_NO_MEMORY;
00427             }
00428         }
00429     }
00430     else
00431     {
00432         /* Find a worker thread that is not currently executing a long function */
00433         CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink;
00434         while (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
00435         {
00436             IoThread = CONTAINING_RECORD(CurrentEntry,
00437                                          RTLP_IOWORKERTHREAD,
00438                                          ListEntry);
00439 
00440             if (!(IoThread->Flags & WT_EXECUTELONGFUNCTION))
00441             {
00442                 /* if we're trying to queue a long function then make sure we're not dealing
00443                    with the persistent thread */
00444                 if ((WorkItem->Flags & WT_EXECUTELONGFUNCTION) && !(IoThread->Flags & WT_EXECUTEINPERSISTENTIOTHREAD))
00445                 {
00446                     /* found a candidate */
00447                     break;
00448                 }
00449             }
00450 
00451             CurrentEntry = CurrentEntry->Flink;
00452         }
00453 
00454         if (CurrentEntry == &ThreadPoolIOWorkerThreadsList)
00455         {
00456             /* Couldn't find an appropriate thread, see if we can use the persistent thread (if it exists) for now */
00457             if (ThreadPoolIOWorkerThreads == 0)
00458             {
00459                 DPRINT1("Failed to find a worker thread for the work item 0x%p!\n");
00460                 ASSERT(IsListEmpty(&ThreadPoolIOWorkerThreadsList));
00461                 return STATUS_NO_MEMORY;
00462             }
00463             else
00464             {
00465                 /* pick the first worker thread */
00466                 CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink;
00467                 IoThread = CONTAINING_RECORD(CurrentEntry,
00468                                              RTLP_IOWORKERTHREAD,
00469                                              ListEntry);
00470 
00471                 /* Since this might be the persistent worker thread, don't run as a
00472                    long function */
00473                 WorkItem->Flags &= ~WT_EXECUTELONGFUNCTION;
00474             }
00475         }
00476 
00477         /* Move the picked thread to the end of the list. Since we're always searching
00478            from the beginning, this improves distribution of work items */
00479         RemoveEntryList(&IoThread->ListEntry);
00480         InsertTailList(&ThreadPoolIOWorkerThreadsList,
00481                        &IoThread->ListEntry);
00482     }
00483 
00484     ASSERT(IoThread != NULL);
00485 
00486     InterlockedIncrement(&ThreadPoolIOWorkerThreadsRequests);
00487 
00488     if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
00489     {
00490         /* We're about to queue a long function, mark the thread */
00491         IoThread->Flags |= WT_EXECUTELONGFUNCTION;
00492 
00493         InterlockedIncrement(&ThreadPoolIOWorkerThreadsLongRequests);
00494     }
00495 
00496     /* It's time to queue the work item */
00497     Status = NtQueueApcThread(IoThread->ThreadHandle,
00498                               RtlpExecuteIoWorkItem,
00499                               IoThread,
00500                               NULL,
00501                               WorkItem);
00502     if (!NT_SUCCESS(Status))
00503     {
00504         DPRINT1("Failed to queue APC for work item 0x%p\n", WorkItem->Function);
00505         InterlockedDecrement(&ThreadPoolIOWorkerThreadsRequests);
00506 
00507         if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
00508         {
00509             InterlockedDecrement(&ThreadPoolIOWorkerThreadsLongRequests);
00510         }
00511     }
00512 
00513     return Status;
00514 }
00515 
00516 static BOOLEAN
00517 RtlpIsIoPending(IN HANDLE ThreadHandle  OPTIONAL)
00518 {
00519     NTSTATUS Status;
00520     ULONG IoPending;
00521     BOOLEAN CreatedHandle = FALSE;
00522     BOOLEAN IsIoPending = TRUE;
00523 
00524     if (ThreadHandle == NULL)
00525     {
00526         Status = NtDuplicateObject(NtCurrentProcess(),
00527                                    NtCurrentThread(),
00528                                    NtCurrentProcess(),
00529                                    &ThreadHandle,
00530                                    0,
00531                                    0,
00532                                    DUPLICATE_SAME_ACCESS);
00533         if (!NT_SUCCESS(Status))
00534         {
00535             return IsIoPending;
00536         }
00537 
00538         CreatedHandle = TRUE;
00539     }
00540 
00541     Status = NtQueryInformationThread(ThreadHandle,
00542                                       ThreadIsIoPending,
00543                                       &IoPending,
00544                                       sizeof(IoPending),
00545                                       NULL);
00546     if (NT_SUCCESS(Status) && IoPending == 0)
00547     {
00548         IsIoPending = FALSE;
00549     }
00550 
00551     if (CreatedHandle)
00552     {
00553         NtClose(ThreadHandle);
00554     }
00555 
00556     return IsIoPending;
00557 }
00558 
00559 static ULONG
00560 NTAPI
00561 RtlpIoWorkerThreadProc(IN PVOID Parameter)
00562 {
00563     volatile RTLP_IOWORKERTHREAD ThreadInfo;
00564     LARGE_INTEGER Timeout;
00565     BOOLEAN Terminate;
00566     NTSTATUS Status = STATUS_SUCCESS;
00567 
00568     if (InterlockedIncrement(&ThreadPoolIOWorkerThreads) > MAX_WORKERTHREADS)
00569     {
00570         /* Oops, too many worker threads... */
00571         goto InitFailed;
00572     }
00573 
00574     /* Get a thread handle to ourselves */
00575     Status = NtDuplicateObject(NtCurrentProcess(),
00576                                NtCurrentThread(),
00577                                NtCurrentProcess(),
00578                                (PHANDLE)&ThreadInfo.ThreadHandle,
00579                                0,
00580                                0,
00581                                DUPLICATE_SAME_ACCESS);
00582     if (!NT_SUCCESS(Status))
00583     {
00584         DPRINT1("Failed to create handle to own thread! Status: 0x%x\n", Status);
00585 
00586 InitFailed:
00587         InterlockedDecrement(&ThreadPoolIOWorkerThreads);
00588 
00589         /* Signal initialization completion */
00590         InterlockedExchange((PLONG)Parameter,
00591                             1);
00592 
00593         RtlpExitThreadFunc(Status);
00594         return 0;
00595     }
00596 
00597     ThreadInfo.Flags = 0;
00598 
00599     /* Insert the thread into the list */
00600     InsertHeadList((PLIST_ENTRY)&ThreadPoolIOWorkerThreadsList,
00601                    (PLIST_ENTRY)&ThreadInfo.ListEntry);
00602 
00603     /* Signal initialization completion */
00604     InterlockedExchange((PLONG)Parameter,
00605                          1);
00606 
00607     for (;;)
00608     {
00609         Timeout.QuadPart = -50000000LL; /* Wait for 5 seconds by default */
00610 
00611 Wait:
00612         do
00613         {
00614             /* Perform an alertable wait, the work items are going to be executed as APCs */
00615             Status = NtDelayExecution(TRUE,
00616                                       &Timeout);
00617 
00618             /* Loop as long as we executed an APC */
00619         } while (Status != STATUS_SUCCESS);
00620 
00621         /* We timed out, let's see if we're allowed to terminate */
00622         Terminate = FALSE;
00623 
00624         Status = RtlEnterCriticalSection(&ThreadPoolLock);
00625         if (NT_SUCCESS(Status))
00626         {
00627             if (ThreadInfo.Flags & WT_EXECUTEINPERSISTENTIOTHREAD)
00628             {
00629                 /* This thread is supposed to be persistent. Don't terminate! */
00630                 RtlLeaveCriticalSection(&ThreadPoolLock);
00631 
00632                 Timeout.QuadPart = -0x7FFFFFFFFFFFFFFFLL;
00633                 goto Wait;
00634             }
00635 
00636             /* FIXME - figure out an effective method to determine if it's appropriate to
00637                        lower the number of threads. For now let's always terminate if there's
00638                        at least one thread and no queued items. */
00639             Terminate = (*((volatile LONG*)&ThreadPoolIOWorkerThreads) - *((volatile LONG*)&ThreadPoolIOWorkerThreadsLongRequests) >= WORKERTHREAD_CREATION_THRESHOLD) &&
00640                         (*((volatile LONG*)&ThreadPoolIOWorkerThreadsRequests) == 0);
00641 
00642             if (Terminate)
00643             {
00644                 /* Prevent termination as long as IO is pending */
00645                 Terminate = !RtlpIsIoPending(ThreadInfo.ThreadHandle);
00646             }
00647 
00648             if (Terminate)
00649             {
00650                 /* Rundown the thread and unlink it from the list */
00651                 InterlockedDecrement(&ThreadPoolIOWorkerThreads);
00652                 RemoveEntryList((PLIST_ENTRY)&ThreadInfo.ListEntry);
00653             }
00654 
00655             RtlLeaveCriticalSection(&ThreadPoolLock);
00656 
00657             if (Terminate)
00658             {
00659                 /* Break the infinite loop and terminate */
00660                 Status = STATUS_SUCCESS;
00661                 break;
00662             }
00663         }
00664         else
00665         {
00666             DPRINT1("Failed to acquire the thread pool lock!!! Status: 0x%x\n", Status);
00667             break;
00668         }
00669     }
00670 
00671     NtClose(ThreadInfo.ThreadHandle);
00672     RtlpExitThreadFunc(Status);
00673     return 0;
00674 }
00675 
00676 static ULONG
00677 NTAPI
00678 RtlpWorkerThreadProc(IN PVOID Parameter)
00679 {
00680     LARGE_INTEGER Timeout;
00681     BOOLEAN Terminate;
00682     PVOID SystemArgument2;
00683     IO_STATUS_BLOCK IoStatusBlock;
00684     ULONG TimeoutCount = 0;
00685     PKNORMAL_ROUTINE ApcRoutine;
00686     NTSTATUS Status = STATUS_SUCCESS;
00687 
00688     if (InterlockedIncrement(&ThreadPoolWorkerThreads) > MAX_WORKERTHREADS)
00689     {
00690         /* Signal initialization completion */
00691         InterlockedExchange((PLONG)Parameter,
00692                              1);
00693 
00694         /* Oops, too many worker threads... */
00695         RtlpExitThreadFunc(Status);
00696         return 0;
00697     }
00698 
00699     /* Signal initialization completion */
00700     InterlockedExchange((PLONG)Parameter,
00701                          1);
00702 
00703     for (;;)
00704     {
00705         Timeout.QuadPart = -50000000LL; /* Wait for 5 seconds by default */
00706 
00707         /* Dequeue a completion message */
00708         Status = NtRemoveIoCompletion(ThreadPoolCompletionPort,
00709                                       (PVOID*)&ApcRoutine,
00710                                       &SystemArgument2,
00711                                       &IoStatusBlock,
00712                                       &Timeout);
00713 
00714         if (Status == STATUS_SUCCESS)
00715         {
00716             TimeoutCount = 0;
00717 
00718             _SEH2_TRY
00719             {
00720                 /* Call the APC routine */
00721                 ApcRoutine(NULL,
00722                            (PVOID)IoStatusBlock.Information,
00723                            SystemArgument2);
00724             }
00725             _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER)
00726             {
00727             }
00728             _SEH2_END;
00729         }
00730         else
00731         {
00732             Terminate = FALSE;
00733 
00734             if (!NT_SUCCESS(RtlEnterCriticalSection(&ThreadPoolLock)))
00735                 continue;
00736 
00737             /* FIXME - this should be optimized, check if there's requests, etc */
00738 
00739             if (Status == STATUS_TIMEOUT)
00740             {
00741                 /* FIXME - we might want to optimize this */
00742                 if (TimeoutCount++ > 2 &&
00743                     *((volatile LONG*)&ThreadPoolWorkerThreads) - *((volatile LONG*)&ThreadPoolWorkerThreadsLongRequests) >= WORKERTHREAD_CREATION_THRESHOLD)
00744                 {
00745                     Terminate = TRUE;
00746                 }
00747             }
00748             else
00749                 Terminate = TRUE;
00750 
00751             RtlLeaveCriticalSection(&ThreadPoolLock);
00752 
00753             if (Terminate)
00754             {
00755                 /* Prevent termination as long as IO is pending */
00756                 Terminate = !RtlpIsIoPending(NULL);
00757             }
00758 
00759             if (Terminate)
00760             {
00761                 InterlockedDecrement(&ThreadPoolWorkerThreads);
00762                 Status = STATUS_SUCCESS;
00763                 break;
00764             }
00765         }
00766     }
00767 
00768     RtlpExitThreadFunc(Status);
00769     return 0;
00770 
00771 }
00772 
00773 /*
00774  * @implemented
00775  */
00776 NTSTATUS
00777 NTAPI
00778 RtlQueueWorkItem(IN WORKERCALLBACKFUNC Function,
00779                  IN PVOID Context  OPTIONAL,
00780                  IN ULONG Flags)
00781 {
00782     LONG FreeWorkers;
00783     NTSTATUS Status;
00784     PRTLP_WORKITEM WorkItem;
00785 
00786     DPRINT("RtlQueueWorkItem(0x%p, 0x%p, 0x%x)\n", Function, Context, Flags);
00787 
00788     /* Initialize the thread pool if not already initialized */
00789     if (!IsThreadPoolInitialized())
00790     {
00791         Status = RtlpInitializeThreadPool();
00792 
00793         if (!NT_SUCCESS(Status))
00794             return Status;
00795     }
00796 
00797     /* Allocate a work item */
00798     WorkItem = RtlAllocateHeap(RtlGetProcessHeap(),
00799                                0,
00800                                sizeof(RTLP_WORKITEM));
00801     if (WorkItem == NULL)
00802         return STATUS_NO_MEMORY;
00803 
00804     WorkItem->Function = Function;
00805     WorkItem->Context = Context;
00806     WorkItem->Flags = Flags;
00807 
00808     if (Flags & WT_TRANSFER_IMPERSONATION)
00809     {
00810         Status = RtlpGetImpersonationToken(&WorkItem->TokenHandle);
00811 
00812         if (!NT_SUCCESS(Status))
00813         {
00814             DPRINT1("Failed to get impersonation token! Status: 0x%x\n", Status);
00815             goto Cleanup;
00816         }
00817     }
00818     else
00819         WorkItem->TokenHandle = NULL;
00820 
00821     Status = RtlEnterCriticalSection(&ThreadPoolLock);
00822     if (NT_SUCCESS(Status))
00823     {
00824         if (Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINUITHREAD | WT_EXECUTEINPERSISTENTIOTHREAD))
00825         {
00826             /* FIXME - We should optimize the algorithm used to determine whether to grow the thread pool! */
00827 
00828             FreeWorkers = ThreadPoolIOWorkerThreads - ThreadPoolIOWorkerThreadsLongRequests;
00829 
00830             if (((Flags & (WT_EXECUTEINPERSISTENTIOTHREAD | WT_EXECUTELONGFUNCTION)) == WT_EXECUTELONGFUNCTION) &&
00831                 PersistentIoThread != NULL)
00832             {
00833                 /* We shouldn't queue a long function into the persistent IO thread */
00834                 FreeWorkers--;
00835             }
00836 
00837             /* See if it's a good idea to grow the pool */
00838             if (ThreadPoolIOWorkerThreads < MAX_WORKERTHREADS &&
00839                 (FreeWorkers <= 0 || ThreadPoolIOWorkerThreads - ThreadPoolIOWorkerThreadsRequests < WORKERTHREAD_CREATION_THRESHOLD))
00840             {
00841                 /* Grow the thread pool */
00842                 Status = RtlpStartWorkerThread(RtlpIoWorkerThreadProc);
00843 
00844                 if (!NT_SUCCESS(Status) && *((volatile LONG*)&ThreadPoolIOWorkerThreads) != 0)
00845                 {
00846                     /* We failed to create the thread, but there's at least one there so
00847                        we can at least queue the request */
00848                     Status = STATUS_SUCCESS;
00849                 }
00850             }
00851 
00852             if (NT_SUCCESS(Status))
00853             {
00854                 /* Queue a IO worker thread */
00855                 Status = RtlpQueueIoWorkerThread(WorkItem);
00856             }
00857         }
00858         else
00859         {
00860             /* FIXME - We should optimize the algorithm used to determine whether to grow the thread pool! */
00861 
00862             FreeWorkers = ThreadPoolWorkerThreads - ThreadPoolWorkerThreadsLongRequests;
00863 
00864             /* See if it's a good idea to grow the pool */
00865             if (ThreadPoolWorkerThreads < MAX_WORKERTHREADS &&
00866                 (FreeWorkers <= 0 || ThreadPoolWorkerThreads - ThreadPoolWorkerThreadsRequests < WORKERTHREAD_CREATION_THRESHOLD))
00867             {
00868                 /* Grow the thread pool */
00869                 Status = RtlpStartWorkerThread(RtlpWorkerThreadProc);
00870 
00871                 if (!NT_SUCCESS(Status) && *((volatile LONG*)&ThreadPoolWorkerThreads) != 0)
00872                 {
00873                     /* We failed to create the thread, but there's at least one there so
00874                        we can at least queue the request */
00875                     Status = STATUS_SUCCESS;
00876                 }
00877             }
00878 
00879             if (NT_SUCCESS(Status))
00880             {
00881                 /* Queue a normal worker thread */
00882                 Status = RtlpQueueWorkerThread(WorkItem);
00883             }
00884         }
00885 
00886         RtlLeaveCriticalSection(&ThreadPoolLock);
00887     }
00888 
00889     if (!NT_SUCCESS(Status))
00890     {
00891         if (WorkItem->TokenHandle != NULL)
00892         {
00893             NtClose(WorkItem->TokenHandle);
00894         }
00895 
00896 Cleanup:
00897         RtlFreeHeap(RtlGetProcessHeap(),
00898                     0,
00899                     WorkItem);
00900     }
00901 
00902     return Status;
00903 }
00904 
00905 /*
00906  * @unimplemented
00907  */
00908 NTSTATUS
00909 NTAPI
00910 RtlSetIoCompletionCallback(IN HANDLE FileHandle,
00911                            IN PIO_APC_ROUTINE Callback,
00912                            IN ULONG Flags)
00913 {
00914     UNIMPLEMENTED;
00915     return STATUS_NOT_IMPLEMENTED;
00916 }
00917 
00918 /*
00919  * @implemented
00920  */
00921 NTSTATUS
00922 NTAPI
00923 RtlSetThreadPoolStartFunc(IN PRTL_START_POOL_THREAD StartPoolThread,
00924                           IN PRTL_EXIT_POOL_THREAD ExitPoolThread)
00925 {
00926     RtlpStartThreadFunc = StartPoolThread;
00927     RtlpExitThreadFunc = ExitPoolThread;
00928     return STATUS_SUCCESS;
00929 }

Generated on Sun May 27 2012 04:28:06 for ReactOS by doxygen 1.7.6.1

ReactOS is a registered trademark or a trademark of ReactOS Foundation in the United States and other countries.