Home | Info | Community | Development | myReactOS | Contact Us
ReactOS Development > Doxygen > queue.c
Go to the documentation of this file.
00001 /* 00002 * PROJECT: ReactOS Kernel 00003 * LICENSE: GPL - See COPYING in the top level directory 00004 * FILE: ntoskrnl/ke/queue.c 00005 * PURPOSE: Implements kernel queues 00006 * PROGRAMMERS: Alex Ionescu (alex.ionescu@reactos.org) 00007 * Gunnar Dalsnes 00008 * Eric Kohl 00009 */ 00010 00011 /* INCLUDES ******************************************************************/ 00012 00013 #include <ntoskrnl.h> 00014 #define NDEBUG 00015 #include <debug.h> 00016 00017 /* PRIVATE FUNCTIONS *********************************************************/ 00018 00019 /* 00020 * Called when a thread which has a queue entry is entering a wait state 00021 */ 00022 VOID 00023 FASTCALL 00024 KiActivateWaiterQueue(IN PKQUEUE Queue) 00025 { 00026 PLIST_ENTRY QueueEntry; 00027 PLIST_ENTRY WaitEntry; 00028 PKWAIT_BLOCK WaitBlock; 00029 PKTHREAD Thread; 00030 ASSERT_QUEUE(Queue); 00031 00032 /* Decrement the number of active threads */ 00033 Queue->CurrentCount--; 00034 00035 /* Make sure the counts are OK */ 00036 if (Queue->CurrentCount < Queue->MaximumCount) 00037 { 00038 /* Get the Queue Entry */ 00039 QueueEntry = Queue->EntryListHead.Flink; 00040 00041 /* Get the Wait Entry */ 00042 WaitEntry = Queue->Header.WaitListHead.Blink; 00043 00044 /* Make sure that the Queue entries are not part of empty lists */ 00045 if ((WaitEntry != &Queue->Header.WaitListHead) && 00046 (QueueEntry != &Queue->EntryListHead)) 00047 { 00048 /* Remove this entry */ 00049 RemoveEntryList(QueueEntry); 00050 QueueEntry->Flink = NULL; 00051 00052 /* Decrease the Signal State */ 00053 Queue->Header.SignalState--; 00054 00055 /* Unwait the Thread */ 00056 WaitBlock = CONTAINING_RECORD(WaitEntry, 00057 KWAIT_BLOCK, 00058 WaitListEntry); 00059 Thread = WaitBlock->Thread; 00060 KiUnwaitThread(Thread, (LONG_PTR)QueueEntry, IO_NO_INCREMENT); 00061 } 00062 } 00063 } 00064 00065 /* 00066 * Returns the previous number of entries in the queue 00067 */ 00068 LONG 00069 NTAPI 00070 KiInsertQueue(IN PKQUEUE Queue, 
00071 IN PLIST_ENTRY Entry, 00072 IN BOOLEAN Head) 00073 { 00074 ULONG InitialState; 00075 PKTHREAD Thread = KeGetCurrentThread(); 00076 PKWAIT_BLOCK WaitBlock; 00077 PLIST_ENTRY WaitEntry; 00078 PKTIMER Timer; 00079 ASSERT_QUEUE(Queue); 00080 00081 /* Save the old state */ 00082 InitialState = Queue->Header.SignalState; 00083 00084 /* Get the Entry */ 00085 WaitEntry = Queue->Header.WaitListHead.Blink; 00086 00087 /* 00088 * Why the KeGetCurrentThread()->Queue != Queue? 00089 * KiInsertQueue might be called from an APC for the current thread. 00090 * -Gunnar 00091 */ 00092 if ((Queue->CurrentCount < Queue->MaximumCount) && 00093 (WaitEntry != &Queue->Header.WaitListHead) && 00094 ((Thread->Queue != Queue) || 00095 (Thread->WaitReason != WrQueue))) 00096 { 00097 /* Remove the wait entry */ 00098 RemoveEntryList(WaitEntry); 00099 00100 /* Get the Wait Block and Thread */ 00101 WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry); 00102 Thread = WaitBlock->Thread; 00103 00104 /* Remove the queue from the thread's wait list */ 00105 Thread->WaitStatus = (LONG_PTR)Entry; 00106 if (Thread->WaitListEntry.Flink) RemoveEntryList(&Thread->WaitListEntry); 00107 00108 /* Increase the active threads and remove any wait reason */ 00109 Queue->CurrentCount++; 00110 Thread->WaitReason = 0; 00111 00112 /* Check if there's a Thread Timer */ 00113 Timer = &Thread->Timer; 00114 if (Timer->Header.Inserted) KxRemoveTreeTimer(Timer); 00115 00116 /* Reschedule the Thread */ 00117 KiReadyThread(Thread); 00118 } 00119 else 00120 { 00121 /* Increase the Entries */ 00122 Queue->Header.SignalState++; 00123 00124 /* Check which mode we're using */ 00125 if (Head) 00126 { 00127 /* Insert in the head */ 00128 InsertHeadList(&Queue->EntryListHead, Entry); 00129 } 00130 else 00131 { 00132 /* Insert at the end */ 00133 InsertTailList(&Queue->EntryListHead, Entry); 00134 } 00135 } 00136 00137 /* Return the previous state */ 00138 return InitialState; 00139 } 00140 00141 /* PUBLIC 
FUNCTIONS **********************************************************/ 00142 00143 /* 00144 * @implemented 00145 */ 00146 VOID 00147 NTAPI 00148 KeInitializeQueue(IN PKQUEUE Queue, 00149 IN ULONG Count OPTIONAL) 00150 { 00151 /* Initialize the Header */ 00152 Queue->Header.Type = QueueObject; 00153 Queue->Header.Abandoned = 0; 00154 Queue->Header.Size = sizeof(KQUEUE) / sizeof(ULONG); 00155 Queue->Header.SignalState = 0; 00156 InitializeListHead(&(Queue->Header.WaitListHead)); 00157 00158 /* Initialize the Lists */ 00159 InitializeListHead(&Queue->EntryListHead); 00160 InitializeListHead(&Queue->ThreadListHead); 00161 00162 /* Set the Current and Maximum Count */ 00163 Queue->CurrentCount = 0; 00164 Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count; 00165 } 00166 00167 /* 00168 * @implemented 00169 */ 00170 LONG 00171 NTAPI 00172 KeInsertHeadQueue(IN PKQUEUE Queue, 00173 IN PLIST_ENTRY Entry) 00174 { 00175 LONG PreviousState; 00176 KIRQL OldIrql; 00177 ASSERT_QUEUE(Queue); 00178 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL); 00179 00180 /* Lock the Dispatcher Database */ 00181 OldIrql = KiAcquireDispatcherLock(); 00182 00183 /* Insert the Queue */ 00184 PreviousState = KiInsertQueue(Queue, Entry, TRUE); 00185 00186 /* Release the Dispatcher Lock */ 00187 KiReleaseDispatcherLock(OldIrql); 00188 00189 /* Return previous State */ 00190 return PreviousState; 00191 } 00192 00193 /* 00194 * @implemented 00195 */ 00196 LONG 00197 NTAPI 00198 KeInsertQueue(IN PKQUEUE Queue, 00199 IN PLIST_ENTRY Entry) 00200 { 00201 LONG PreviousState; 00202 KIRQL OldIrql; 00203 ASSERT_QUEUE(Queue); 00204 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL); 00205 00206 /* Lock the Dispatcher Database */ 00207 OldIrql = KiAcquireDispatcherLock(); 00208 00209 /* Insert the Queue */ 00210 PreviousState = KiInsertQueue(Queue, Entry, FALSE); 00211 00212 /* Release the Dispatcher Lock */ 00213 KiReleaseDispatcherLock(OldIrql); 00214 00215 /* Return previous State */ 00216 return 
PreviousState; 00217 } 00218 00219 /* 00220 * @implemented 00221 * 00222 * Returns number of entries in the queue 00223 */ 00224 LONG 00225 NTAPI 00226 KeReadStateQueue(IN PKQUEUE Queue) 00227 { 00228 /* Returns the Signal State */ 00229 ASSERT_QUEUE(Queue); 00230 return Queue->Header.SignalState; 00231 } 00232 00233 /* 00234 * @implemented 00235 */ 00236 PLIST_ENTRY 00237 NTAPI 00238 KeRemoveQueue(IN PKQUEUE Queue, 00239 IN KPROCESSOR_MODE WaitMode, 00240 IN PLARGE_INTEGER Timeout OPTIONAL) 00241 { 00242 PLIST_ENTRY QueueEntry; 00243 LONG_PTR Status; 00244 PKTHREAD Thread = KeGetCurrentThread(); 00245 PKQUEUE PreviousQueue; 00246 PKWAIT_BLOCK WaitBlock = &Thread->WaitBlock[0]; 00247 PKWAIT_BLOCK TimerBlock = &Thread->WaitBlock[TIMER_WAIT_BLOCK]; 00248 PKTIMER Timer = &Thread->Timer; 00249 BOOLEAN Swappable; 00250 PLARGE_INTEGER OriginalDueTime = Timeout; 00251 LARGE_INTEGER DueTime = {{0}}, NewDueTime, InterruptTime; 00252 ULONG Hand = 0; 00253 ASSERT_QUEUE(Queue); 00254 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL); 00255 00256 /* Check if the Lock is already held */ 00257 if (Thread->WaitNext) 00258 { 00259 /* It is, so next time don't do expect this */ 00260 Thread->WaitNext = FALSE; 00261 KxQueueThreadWait(); 00262 } 00263 else 00264 { 00265 /* Raise IRQL to synch, prepare the wait, then lock the database */ 00266 Thread->WaitIrql = KeRaiseIrqlToSynchLevel(); 00267 KxQueueThreadWait(); 00268 KiAcquireDispatcherLockAtDpcLevel(); 00269 } 00270 00271 /* 00272 * This is needed so that we can set the new queue right here, 00273 * before additional processing 00274 */ 00275 PreviousQueue = Thread->Queue; 00276 Thread->Queue = Queue; 00277 00278 /* Check if this is a different queue */ 00279 if (Queue != PreviousQueue) 00280 { 00281 /* Get the current entry */ 00282 QueueEntry = &Thread->QueueListEntry; 00283 if (PreviousQueue) 00284 { 00285 /* Remove from this list */ 00286 RemoveEntryList(QueueEntry); 00287 00288 /* Wake the queue */ 00289 
KiActivateWaiterQueue(PreviousQueue); 00290 } 00291 00292 /* Insert in this new Queue */ 00293 InsertTailList(&Queue->ThreadListHead, QueueEntry); 00294 } 00295 else 00296 { 00297 /* Same queue, decrement waiting threads */ 00298 Queue->CurrentCount--; 00299 } 00300 00301 /* Loop until the queue is processed */ 00302 while (TRUE) 00303 { 00304 /* Check if the counts are valid and if there is still a queued entry */ 00305 QueueEntry = Queue->EntryListHead.Flink; 00306 if ((Queue->CurrentCount < Queue->MaximumCount) && 00307 (QueueEntry != &Queue->EntryListHead)) 00308 { 00309 /* Decrease the number of entries */ 00310 Queue->Header.SignalState--; 00311 00312 /* Increase numbef of running threads */ 00313 Queue->CurrentCount++; 00314 00315 /* Check if the entry is valid. If not, bugcheck */ 00316 if (!(QueueEntry->Flink) || !(QueueEntry->Blink)) 00317 { 00318 /* Invalid item */ 00319 KeBugCheckEx(INVALID_WORK_QUEUE_ITEM, 00320 (ULONG_PTR)QueueEntry, 00321 (ULONG_PTR)Queue, 00322 (ULONG_PTR)NULL, 00323 (ULONG_PTR)((PWORK_QUEUE_ITEM)QueueEntry)-> 00324 WorkerRoutine); 00325 } 00326 00327 /* Remove the Entry */ 00328 RemoveEntryList(QueueEntry); 00329 QueueEntry->Flink = NULL; 00330 00331 /* Nothing to wait on */ 00332 break; 00333 } 00334 else 00335 { 00336 /* Check if a kernel APC is pending and we're below APC_LEVEL */ 00337 if ((Thread->ApcState.KernelApcPending) && 00338 !(Thread->SpecialApcDisable) && (Thread->WaitIrql < APC_LEVEL)) 00339 { 00340 /* Increment the count and unlock the dispatcher */ 00341 Queue->CurrentCount++; 00342 KiReleaseDispatcherLockFromDpcLevel(); 00343 KiExitDispatcher(Thread->WaitIrql); 00344 } 00345 else 00346 { 00347 /* Fail if there's a User APC Pending */ 00348 if ((WaitMode != KernelMode) && 00349 (Thread->ApcState.UserApcPending)) 00350 { 00351 /* Return the status and increase the pending threads */ 00352 QueueEntry = (PLIST_ENTRY)STATUS_USER_APC; 00353 Queue->CurrentCount++; 00354 break; 00355 } 00356 00357 /* Enable the Timeout 
Timer if there was any specified */ 00358 if (Timeout) 00359 { 00360 /* Check if the timer expired */ 00361 InterruptTime.QuadPart = KeQueryInterruptTime(); 00362 if ((ULONG64)InterruptTime.QuadPart >= Timer->DueTime.QuadPart) 00363 { 00364 /* It did, so we don't need to wait */ 00365 QueueEntry = (PLIST_ENTRY)STATUS_TIMEOUT; 00366 Queue->CurrentCount++; 00367 break; 00368 } 00369 00370 /* It didn't, so activate it */ 00371 Timer->Header.Inserted = TRUE; 00372 } 00373 00374 /* Insert the wait block in the list */ 00375 InsertTailList(&Queue->Header.WaitListHead, 00376 &WaitBlock->WaitListEntry); 00377 00378 /* Setup the wait information */ 00379 Thread->State = Waiting; 00380 00381 /* Add the thread to the wait list */ 00382 KiAddThreadToWaitList(Thread, Swappable); 00383 00384 /* Activate thread swap */ 00385 ASSERT(Thread->WaitIrql <= DISPATCH_LEVEL); 00386 KiSetThreadSwapBusy(Thread); 00387 00388 /* Check if we have a timer */ 00389 if (Timeout) 00390 { 00391 /* Insert it */ 00392 KxInsertTimer(Timer, Hand); 00393 } 00394 else 00395 { 00396 /* Otherwise, unlock the dispatcher */ 00397 KiReleaseDispatcherLockFromDpcLevel(); 00398 } 00399 00400 /* Do the actual swap */ 00401 Status = KiSwapThread(Thread, KeGetCurrentPrcb()); 00402 00403 /* Reset the wait reason */ 00404 Thread->WaitReason = 0; 00405 00406 /* Check if we were executing an APC */ 00407 if (Status != STATUS_KERNEL_APC) return (PLIST_ENTRY)Status; 00408 00409 /* Check if we had a timeout */ 00410 if (Timeout) 00411 { 00412 /* Recalculate due times */ 00413 Timeout = KiRecalculateDueTime(OriginalDueTime, 00414 &DueTime, 00415 &NewDueTime); 00416 } 00417 } 00418 00419 /* Start another wait */ 00420 Thread->WaitIrql = KeRaiseIrqlToSynchLevel(); 00421 KxQueueThreadWait(); 00422 KiAcquireDispatcherLockAtDpcLevel(); 00423 Queue->CurrentCount--; 00424 } 00425 } 00426 00427 /* Unlock Database and return */ 00428 KiReleaseDispatcherLockFromDpcLevel(); 00429 KiExitDispatcher(Thread->WaitIrql); 00430 return 
QueueEntry; 00431 } 00432 00433 /* 00434 * @implemented 00435 */ 00436 PLIST_ENTRY 00437 NTAPI 00438 KeRundownQueue(IN PKQUEUE Queue) 00439 { 00440 PLIST_ENTRY FirstEntry, NextEntry; 00441 PKTHREAD Thread; 00442 KIRQL OldIrql; 00443 ASSERT_QUEUE(Queue); 00444 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL); 00445 ASSERT(IsListEmpty(&Queue->Header.WaitListHead)); 00446 00447 /* Get the Dispatcher Lock */ 00448 OldIrql = KiAcquireDispatcherLock(); 00449 00450 /* Check if the list is empty */ 00451 FirstEntry = Queue->EntryListHead.Flink; 00452 if (FirstEntry == &Queue->EntryListHead) 00453 { 00454 /* We won't return anything */ 00455 FirstEntry = NULL; 00456 } 00457 else 00458 { 00459 /* Remove this entry */ 00460 RemoveEntryList(&Queue->EntryListHead); 00461 } 00462 00463 /* Loop the list */ 00464 while (!IsListEmpty(&Queue->ThreadListHead)) 00465 { 00466 /* Get the next entry */ 00467 NextEntry = Queue->ThreadListHead.Flink; 00468 00469 /* Get the associated thread */ 00470 Thread = CONTAINING_RECORD(NextEntry, KTHREAD, QueueListEntry); 00471 00472 /* Clear its queue */ 00473 Thread->Queue = NULL; 00474 00475 /* Remove this entry */ 00476 RemoveEntryList(NextEntry); 00477 } 00478 00479 /* Release the dispatcher lock */ 00480 KiReleaseDispatcherLockFromDpcLevel(); 00481 00482 /* Exit the dispatcher and return the first entry (if any) */ 00483 KiExitDispatcher(OldIrql); 00484 return FirstEntry; 00485 } 00486 00487 /* EOF */ Generated on Sun May 27 2012 04:26:13 for ReactOS by
1.7.6.1
|