ReactOS 0.4.16-dev-36-g301675c
queue.c
Go to the documentation of this file.
1/*
2 * PROJECT: ReactOS Kernel
3 * LICENSE: GPL - See COPYING in the top level directory
4 * FILE: ntoskrnl/ke/queue.c
5 * PURPOSE: Implements kernel queues
6 * PROGRAMMERS: Alex Ionescu (alex.ionescu@reactos.org)
7 * Gunnar Dalsnes
8 * Eric Kohl
9 */
10
11/* INCLUDES ******************************************************************/
12
13#include <ntoskrnl.h>
14#define NDEBUG
15#include <debug.h>
16
17/* PRIVATE FUNCTIONS *********************************************************/
18
19/*
20 * Called when a thread which has a queue entry is entering a wait state.
 *
 * Lowers the queue's active thread count. If the count is then below the
 * queue's maximum, and both the entry list and the wait list are non-empty,
 * the first queued entry is unlinked, the signal state is lowered, and the
 * wait block at the tail of the wait list is resolved to its thread.
 *
 * NOTE(review): the declarator line and the statement that finally hands
 * the entry to that thread are not visible in this listing - confirm
 * against the full file before relying on this description.
21 */
22VOID
25{
26 PLIST_ENTRY QueueEntry;
27 PLIST_ENTRY WaitEntry;
28 PKWAIT_BLOCK WaitBlock;
31
32 /* Decrement the number of active threads */
33 Queue->CurrentCount--;
34
35 /* Only hand out work while the active count is below the maximum */
36 if (Queue->CurrentCount < Queue->MaximumCount)
37 {
38 /* Get the first Queue Entry (head of the entry list) */
39 QueueEntry = Queue->EntryListHead.Flink;
40
41 /* Get the last Wait Entry (tail of the wait list) */
42 WaitEntry = Queue->Header.WaitListHead.Blink;
43
44 /* Proceed only if neither list is empty, i.e. neither pointer is
   * its own list head */
45 if ((WaitEntry != &Queue->Header.WaitListHead) &&
46 (QueueEntry != &Queue->EntryListHead))
47 {
48 /* Unlink the queued entry and clear its Flink
   * (presumably marking it as no longer inserted - TODO confirm) */
49 RemoveEntryList(QueueEntry);
50 QueueEntry->Flink = NULL;
51
52 /* Decrease the Signal State to account for the removed entry */
53 Queue->Header.SignalState--;
54
55 /* Resolve the wait list entry to its wait block and owning thread */
56 WaitBlock = CONTAINING_RECORD(WaitEntry,
58 WaitListEntry);
59 Thread = WaitBlock->Thread;
61 }
62 }
63}
64
65/*
66 * Returns the previous number of entries in the queue, i.e. the header's
 * SignalState as sampled before this call changes anything.
 *
 * The entry is either handed directly to the thread whose wait block sits
 * at the tail of the queue's wait list, or linked into the entry list - at
 * the head when Head is TRUE, otherwise at the tail.
 *
 * NOTE(review): part of the signature, some local declarations and the
 * rescheduling call are not visible in this listing.
67 */
68LONG
72 IN BOOLEAN Head)
73{
76 PKWAIT_BLOCK WaitBlock;
77 PLIST_ENTRY WaitEntry;
80
81 /* Save the old state; this is the value returned to the caller */
82 InitialState = Queue->Header.SignalState;
83
84 /* Get the wait entry at the tail of the wait list */
85 WaitEntry = Queue->Header.WaitListHead.Blink;
86
87 /*
88 * Why the KeGetCurrentThread()->Queue != Queue?
89 * KiInsertQueue might be called from an APC for the current thread.
90 * -Gunnar
 *
 * Direct hand-off requires all of: active count below the maximum, a
 * non-empty wait list, and Thread either not associated with this queue
 * or not waiting with reason WrQueue.
 * NOTE(review): Thread is presumably the current thread at this point;
 * its initialisation is not visible here.
91 */
92 if ((Queue->CurrentCount < Queue->MaximumCount) &&
93 (WaitEntry != &Queue->Header.WaitListHead) &&
94 ((Thread->Queue != Queue) ||
95 (Thread->WaitReason != WrQueue)))
96 {
97 /* Remove the wait entry */
98 RemoveEntryList(WaitEntry);
99
100 /* Get the Wait Block and Thread (Thread now names the waiter) */
101 WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
102 Thread = WaitBlock->Thread;
103
104 /* Pass the entry to the waiter through its wait status, then unlink
   * the thread's own WaitListEntry if it is currently linked */
105 Thread->WaitStatus = (LONG_PTR)Entry;
106 if (Thread->WaitListEntry.Flink) RemoveEntryList(&Thread->WaitListEntry);
107
108 /* Increase the active threads and remove any wait reason */
109 Queue->CurrentCount++;
110 Thread->WaitReason = 0;
111
112 /* If the thread's timer is marked inserted, remove it */
113 Timer = &Thread->Timer;
114 if (Timer->Header.Inserted) KxRemoveTreeTimer(Timer);
115
116 /* Reschedule the Thread */
118 }
119 else
120 {
121 /* No hand-off possible: count the entry as pending */
122 Queue->Header.SignalState++;
123
124 /* Check which mode we're using */
125 if (Head)
126 {
127 /* Insert in the head */
128 InsertHeadList(&Queue->EntryListHead, Entry);
129 }
130 else
131 {
132 /* Insert at the end */
133 InsertTailList(&Queue->EntryListHead, Entry);
134 }
135 }
136
137 /* Return the previous state */
138 return InitialState;
139}
140
141/* PUBLIC FUNCTIONS **********************************************************/
142
143/*
144 * @implemented
 *
 * Puts a KQUEUE into its initial, empty state: the dispatcher header is
 * typed as a queue object with a signal state of 0, the wait, entry and
 * thread lists are emptied, and the active thread count is reset to 0.
 * A Count of 0 selects KeNumberProcessors as the maximum count.
145 */
146VOID
147NTAPI
150{
151 /* Initialize the Header */
152 Queue->Header.Type = QueueObject;
153 Queue->Header.Abandoned = 0;
 /* Size is stored as a number of ULONGs, not bytes */
154 Queue->Header.Size = sizeof(KQUEUE) / sizeof(ULONG);
155 Queue->Header.SignalState = 0;
156 InitializeListHead(&(Queue->Header.WaitListHead));
157
158 /* Initialize the entry list and the thread list */
159 InitializeListHead(&Queue->EntryListHead);
160 InitializeListHead(&Queue->ThreadListHead);
161
162 /* Set the Current and Maximum Count; 0 means "use the processor count" */
163 Queue->CurrentCount = 0;
164 Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count;
165}
166
167/*
168 * @implemented
 *
 * Returns the previous state obtained while inserting into the queue.
 * NOTE(review): the declarator and the lock / insert / unlock statements
 * are not visible in this listing; presumably the entry is inserted at the
 * head of the queue under the dispatcher lock - confirm against the full
 * file.
169 */
170LONG
171NTAPI
174{
179
180 /* Lock the Dispatcher Database */
182
183 /* Insert the entry into the Queue */
185
186 /* Release the Dispatcher Lock */
188
189 /* Return previous State */
190 return PreviousState;
191}
192
193/*
194 * @implemented
 *
 * Returns the previous state obtained while inserting into the queue.
 * NOTE(review): the declarator and the lock / insert / unlock statements
 * are not visible in this listing; presumably the entry is inserted at the
 * tail of the queue under the dispatcher lock - confirm against the full
 * file.
195 */
196LONG
197NTAPI
200{
205
206 /* Lock the Dispatcher Database */
208
209 /* Insert the entry into the Queue */
211
212 /* Release the Dispatcher Lock */
214
215 /* Return previous State */
216 return PreviousState;
217}
218
219/*
220 * @implemented
221 *
222 * Returns number of entries in the queue, read directly from the
 * SignalState field of the queue's dispatcher header.
223 */
224LONG
225NTAPI
227{
228 /* Return the Signal State, which counts the pending entries */
230 return Queue->Header.SignalState;
231}
232
233/*
234 * @implemented
 *
 * Takes the entry at the head of the queue's entry list on behalf of
 * Thread, waiting when no entry can be taken.
 *
 * Thread is first associated with Queue (leaving any previous queue).
 * The loop then either dequeues an entry - allowed only while the active
 * thread count is below the maximum - or prepares a wait on the queue.
 *
 * Returns the removed list entry, or STATUS_USER_APC / STATUS_TIMEOUT cast
 * to PLIST_ENTRY when the wait is abandoned for one of those reasons.
 *
 * NOTE(review): the declarator, the declaration of Thread, the dispatcher
 * lock handling, the wait setup and the actual thread swap are not visible
 * in this listing - confirm those steps against the full file.
235 */
237NTAPI
239 IN KPROCESSOR_MODE WaitMode,
241{
242 PLIST_ENTRY QueueEntry;
245 PKQUEUE PreviousQueue;
246 PKWAIT_BLOCK WaitBlock = &Thread->WaitBlock[0];
247 PKWAIT_BLOCK TimerBlock = &Thread->WaitBlock[TIMER_WAIT_BLOCK];
248 PKTIMER Timer = &Thread->Timer;
249 BOOLEAN Swappable;
250 PLARGE_INTEGER OriginalDueTime = Timeout;
251 LARGE_INTEGER DueTime = {{0}}, NewDueTime, InterruptTime;
252 ULONG Hand = 0;
255
256 /* Check if the Lock is already held */
257 if (Thread->WaitNext)
258 {
259 /* It is, so next time don't expect this */
260 Thread->WaitNext = FALSE;
262 }
263 else
264 {
265 /* Raise IRQL to synch, prepare the wait, then lock the database */
266 Thread->WaitIrql = KeRaiseIrqlToSynchLevel();
269 }
270
271 /*
272 * This is needed so that we can set the new queue right here,
273 * before additional processing
274 */
275 PreviousQueue = Thread->Queue;
276 Thread->Queue = Queue;
277
278 /* Check if this is a different queue */
279 if (Queue != PreviousQueue)
280 {
281 /* Get the thread's link in the per-queue thread list */
282 QueueEntry = &Thread->QueueListEntry;
283 if (PreviousQueue)
284 {
285 /* Remove the thread from the previous queue's thread list */
286 RemoveEntryList(QueueEntry);
287
288 /* Wake the previous queue, which has just lost this thread */
289 KiActivateWaiterQueue(PreviousQueue);
290 }
291
292 /* Insert in this new Queue */
293 InsertTailList(&Queue->ThreadListHead, QueueEntry);
294 }
295 else
296 {
297 /* Same queue: this thread stops counting as active while it
   * looks for (or waits for) work */
298 Queue->CurrentCount--;
299 }
300
301 /* Loop until the queue is processed */
302 while (TRUE)
303 {
304 /* Check if the counts are valid and if there is still a queued entry */
305 QueueEntry = Queue->EntryListHead.Flink;
306 if ((Queue->CurrentCount < Queue->MaximumCount) &&
307 (QueueEntry != &Queue->EntryListHead))
308 {
309 /* Decrease the number of entries */
310 Queue->Header.SignalState--;
311
312 /* Increase number of running threads */
313 Queue->CurrentCount++;
314
315 /* Check if the entry is valid (both links non-NULL). If not, bugcheck */
316 if (!(QueueEntry->Flink) || !(QueueEntry->Blink))
317 {
318 /* Invalid item */
319 KeBugCheckEx(INVALID_WORK_QUEUE_ITEM,
320 (ULONG_PTR)QueueEntry,
323 (ULONG_PTR)((PWORK_QUEUE_ITEM)QueueEntry)->
325 }
326
327 /* Remove the Entry and clear its forward link */
328 RemoveEntryList(QueueEntry);
329 QueueEntry->Flink = NULL;
330
331 /* Nothing to wait on */
332 break;
333 }
334 else
335 {
336 /* Check if a kernel APC is pending, special APCs are not disabled
   * and the wait IRQL is below APC_LEVEL */
337 if ((Thread->ApcState.KernelApcPending) &&
338 !(Thread->SpecialApcDisable) && (Thread->WaitIrql < APC_LEVEL))
339 {
340 /* Increment the count and unlock the dispatcher */
341 Queue->CurrentCount++;
343 KiExitDispatcher(Thread->WaitIrql);
344 }
345 else
346 {
347 /* Fail if there's a User APC Pending (non-kernel waits only) */
348 if ((WaitMode != KernelMode) &&
349 (Thread->ApcState.UserApcPending))
350 {
351 /* Return the status and restore the active thread count */
352 QueueEntry = (PLIST_ENTRY)STATUS_USER_APC;
353 Queue->CurrentCount++;
354 break;
355 }
356
357 /* Enable the Timeout Timer if there was any specified */
358 if (Timeout)
359 {
360 /* Check if the timer's due time has already been reached */
361 InterruptTime.QuadPart = KeQueryInterruptTime();
362 if ((ULONG64)InterruptTime.QuadPart >= Timer->DueTime.QuadPart)
363 {
364 /* It did, so we don't need to wait */
365 QueueEntry = (PLIST_ENTRY)STATUS_TIMEOUT;
366 Queue->CurrentCount++;
367 break;
368 }
369
370 /* It didn't, so activate it */
371 Timer->Header.Inserted = TRUE;
372 }
373
374 /* Insert the wait block at the tail of the queue's wait list */
375 InsertTailList(&Queue->Header.WaitListHead,
376 &WaitBlock->WaitListEntry);
377
378 /* Setup the wait information */
379 Thread->State = Waiting;
380
381 /* Add the thread to the wait list */
382 KiAddThreadToWaitList(Thread, Swappable);
383
384 /* Activate thread swap */
385 ASSERT(Thread->WaitIrql <= DISPATCH_LEVEL);
387
388 /* Check if we have a timer */
389 if (Timeout)
390 {
391 /* Insert it */
392 KxInsertTimer(Timer, Hand);
393 }
394 else
395 {
396 /* Otherwise, unlock the dispatcher */
398 }
399
400 /* Do the actual swap */
402
403 /* Reset the wait reason */
404 Thread->WaitReason = 0;
405
406 /* Check if we were executing an APC */
408
409 /* Check if we had a timeout */
410 if (Timeout)
411 {
412 /* Recalculate due times from the caller's original timeout */
413 Timeout = KiRecalculateDueTime(OriginalDueTime,
414 &DueTime,
415 &NewDueTime);
416 }
417 }
418
419 /* Start another wait: raise IRQL again and stop counting this
   * thread as active before re-examining the queue */
420 Thread->WaitIrql = KeRaiseIrqlToSynchLevel();
423 Queue->CurrentCount--;
424 }
425 }
426
427 /* Unlock Database and return the entry or the status cast to a pointer */
429 KiExitDispatcher(Thread->WaitIrql);
430 return QueueEntry;
431}
432
433/*
434 * @implemented
 *
 * Runs down a queue: detaches the pending entries from the queue's entry
 * list and dissociates every thread linked on the queue's thread list.
 * The wait list is asserted to be empty on entry.
 *
 * Returns the first pending entry, or NULL if the entry list was empty.
 *
 * NOTE(review): the declarator, remaining declarations and the dispatcher
 * lock acquire/release statements are not visible in this listing.
435 */
437NTAPI
439{
440 PLIST_ENTRY FirstEntry, NextEntry;
445 ASSERT(IsListEmpty(&Queue->Header.WaitListHead));
446
447 /* Get the Dispatcher Lock */
449
450 /* Check if the list is empty */
451 FirstEntry = Queue->EntryListHead.Flink;
452 if (FirstEntry == &Queue->EntryListHead)
453 {
454 /* We won't return anything */
455 FirstEntry = NULL;
456 }
457 else
458 {
459 /* Unlink the list head itself (not an entry): the pending entries
   * stay linked to one another and are reachable from FirstEntry */
460 RemoveEntryList(&Queue->EntryListHead);
461 }
462
463 /* Loop until no thread is associated with the queue any more */
464 while (!IsListEmpty(&Queue->ThreadListHead))
465 {
466 /* Get the next entry */
467 NextEntry = Queue->ThreadListHead.Flink;
468
469 /* Get the associated thread */
470 Thread = CONTAINING_RECORD(NextEntry, KTHREAD, QueueListEntry);
471
472 /* Clear its queue */
473 Thread->Queue = NULL;
474
475 /* Remove the thread from the queue's thread list */
476 RemoveEntryList(NextEntry);
477 }
478
479 /* Release the dispatcher lock */
481
482 /* Exit the dispatcher and return the first entry (if any) */
484 return FirstEntry;
485}
486
487/* EOF */
unsigned char BOOLEAN
#define NULL
Definition: types.h:112
#define TRUE
Definition: types.h:120
#define FALSE
Definition: types.h:117
#define RemoveEntryList(Entry)
Definition: env_spec_w32.h:986
#define InsertTailList(ListHead, Entry)
#define InsertHeadList(ListHead, Entry)
#define IsListEmpty(ListHead)
Definition: env_spec_w32.h:954
UCHAR KIRQL
Definition: env_spec_w32.h:591
#define APC_LEVEL
Definition: env_spec_w32.h:695
#define InitializeListHead(ListHead)
Definition: env_spec_w32.h:944
#define DISPATCH_LEVEL
Definition: env_spec_w32.h:696
_In_opt_ PFILE_OBJECT _In_opt_ PETHREAD Thread
Definition: fltkernel.h:2653
_Must_inspect_result_ _In_ PFLT_CALLBACK_DATA _In_ PFLT_DEFERRED_IO_WORKITEM_ROUTINE WorkerRoutine
Definition: fltkernel.h:1977
Status
Definition: gdiplustypes.h:25
KIRQL NTAPI KeRaiseIrqlToSynchLevel(VOID)
Definition: pic.c:156
#define KeGetCurrentThread
Definition: hal.h:55
FORCEINLINE VOID KiReleaseDispatcherLock(IN KIRQL OldIrql)
Definition: ke_x.h:157
FORCEINLINE VOID KiSetThreadSwapBusy(IN PKTHREAD Thread)
Definition: ke_x.h:210
FORCEINLINE VOID KxRemoveTreeTimer(IN PKTIMER Timer)
Definition: ke_x.h:1004
FORCEINLINE PLARGE_INTEGER KiRecalculateDueTime(IN PLARGE_INTEGER OriginalDueTime, IN PLARGE_INTEGER DueTime, IN OUT PLARGE_INTEGER NewDueTime)
Definition: ke_x.h:785
#define KiAddThreadToWaitList(Thread, Swappable)
Definition: ke_x.h:824
#define KxQueueThreadWait()
Definition: ke_x.h:1208
FORCEINLINE VOID KiReleaseDispatcherLockFromSynchLevel(VOID)
Definition: ke_x.h:174
FORCEINLINE VOID KiAcquireDispatcherLockAtSynchLevel(VOID)
Definition: ke_x.h:165
FORCEINLINE KIRQL KiAcquireDispatcherLock(VOID)
Definition: ke_x.h:149
FORCEINLINE VOID KxInsertTimer(IN PKTIMER Timer, IN ULONG Hand)
Definition: ke_x.h:923
CCHAR KeNumberProcessors
Definition: krnlinit.c:35
#define ASSERT(a)
Definition: mode.c:44
unsigned __int64 ULONG64
Definition: imports.h:198
__int3264 LONG_PTR
Definition: mstsclib_h.h:276
#define KernelMode
Definition: asm.h:34
FORCEINLINE struct _KPRCB * KeGetCurrentPrcb(VOID)
Definition: ketypes.h:1161
_In_ ACCESS_MASK _In_opt_ POBJECT_ATTRIBUTES _In_ EVENT_TYPE _In_ BOOLEAN InitialState
Definition: exfuncs.h:169
@ QueueObject
Definition: ketypes.h:410
@ Waiting
Definition: ketypes.h:393
int Count
Definition: noreturn.cpp:7
#define FASTCALL
Definition: nt_native.h:50
VOID NTAPI KiReadyThread(IN PKTHREAD Thread)
Definition: thrdschd.c:429
LONG_PTR FASTCALL KiSwapThread(IN PKTHREAD Thread, IN PKPRCB Prcb)
Definition: thrdschd.c:355
VOID FASTCALL KiUnwaitThread(IN PKTHREAD Thread, IN LONG_PTR WaitStatus, IN KPRIORITY Increment)
Definition: wait.c:89
#define TIMER_WAIT_BLOCK
Definition: ke.h:164
VOID FASTCALL KiExitDispatcher(KIRQL OldIrql)
LONG NTAPI KeReadStateQueue(IN PKQUEUE Queue)
Definition: queue.c:226
LONG NTAPI KeInsertHeadQueue(IN PKQUEUE Queue, IN PLIST_ENTRY Entry)
Definition: queue.c:172
LONG NTAPI KeInsertQueue(IN PKQUEUE Queue, IN PLIST_ENTRY Entry)
Definition: queue.c:198
LONG NTAPI KiInsertQueue(IN PKQUEUE Queue, IN PLIST_ENTRY Entry, IN BOOLEAN Head)
Definition: queue.c:70
PLIST_ENTRY NTAPI KeRundownQueue(IN PKQUEUE Queue)
Definition: queue.c:438
VOID NTAPI KeInitializeQueue(IN PKQUEUE Queue, IN ULONG Count OPTIONAL)
Definition: queue.c:148
VOID FASTCALL KiActivateWaiterQueue(IN PKQUEUE Queue)
Definition: queue.c:24
PLIST_ENTRY NTAPI KeRemoveQueue(IN PKQUEUE Queue, IN KPROCESSOR_MODE WaitMode, IN PLARGE_INTEGER Timeout OPTIONAL)
Definition: queue.c:238
#define STATUS_TIMEOUT
Definition: ntstatus.h:81
#define STATUS_USER_APC
Definition: ntstatus.h:78
#define STATUS_KERNEL_APC
Definition: ntstatus.h:79
long LONG
Definition: pedump.c:60
static ULONG Timeout
Definition: ping.c:61
VOID NTAPI KeBugCheckEx(_In_ ULONG BugCheckCode, _In_ ULONG_PTR BugCheckParameter1, _In_ ULONG_PTR BugCheckParameter2, _In_ ULONG_PTR BugCheckParameter3, _In_ ULONG_PTR BugCheckParameter4)
Definition: rtlcompat.c:108
#define ASSERT_IRQL_LESS_OR_EQUAL(x)
Definition: debug.h:251
#define KeQueryInterruptTime()
Definition: ke.h:37
PULONG MinorVersion OPTIONAL
Definition: CrossNt.h:68
base of all file and directory entries
Definition: entries.h:83
struct _KTHREAD * Thread
Definition: ketypes.h:465
LIST_ENTRY WaitListEntry
Definition: ketypes.h:456
Definition: typedefs.h:120
struct _LIST_ENTRY * Blink
Definition: typedefs.h:122
struct _LIST_ENTRY * Flink
Definition: typedefs.h:121
#define LONG_PTR
Definition: treelist.c:79
struct _LIST_ENTRY * PLIST_ENTRY
#define NTAPI
Definition: typedefs.h:36
uint32_t ULONG_PTR
Definition: typedefs.h:65
#define IN
Definition: typedefs.h:39
#define CONTAINING_RECORD(address, type, field)
Definition: typedefs.h:260
uint32_t ULONG
Definition: typedefs.h:59
_In_ WDF_POWER_DEVICE_STATE PreviousState
Definition: wdfdevice.h:829
_Must_inspect_result_ _In_ WDFDEVICE _In_ PIRP _In_ WDFQUEUE Queue
Definition: wdfdevice.h:2225
_In_ WDFTIMER _In_ LONGLONG DueTime
Definition: wdftimer.h:190
#define IO_NO_INCREMENT
Definition: iotypes.h:598
_Requires_lock_held_ Interrupt _Releases_lock_ Interrupt _In_ _IRQL_restores_ KIRQL OldIrql
Definition: kefuncs.h:778
#define ASSERT_QUEUE(Q)
Definition: ketypes.h:1413
@ WrQueue
Definition: ketypes.h:430
struct _KQUEUE KQUEUE
CCHAR KPROCESSOR_MODE
Definition: ketypes.h:7