ReactOS  r75907
queue.c
Go to the documentation of this file.
1 /*
2  * PROJECT: ReactOS Kernel
3  * LICENSE: GPL - See COPYING in the top level directory
4  * FILE: ntoskrnl/ke/queue.c
5  * PURPOSE: Implements kernel queues
6  * PROGRAMMERS: Alex Ionescu (alex.ionescu@reactos.org)
7  * Gunnar Dalsnes
8  * Eric Kohl
9  */
10 
11 /* INCLUDES ******************************************************************/
12 
13 #include <ntoskrnl.h>
14 #define NDEBUG
15 #include <debug.h>
16 
17 /* PRIVATE FUNCTIONS *********************************************************/
18 
19 /*
20  * Called when a thread which has a queue entry is entering a wait state
21  */
22 VOID
25 {
26  PLIST_ENTRY QueueEntry;
27  PLIST_ENTRY WaitEntry;
28  PKWAIT_BLOCK WaitBlock;
30  ASSERT_QUEUE(Queue);
31 
32  /* Decrement the number of active threads */
33  Queue->CurrentCount--;
34 
35  /* Make sure the counts are OK */
36  if (Queue->CurrentCount < Queue->MaximumCount)
37  {
38  /* Get the Queue Entry */
39  QueueEntry = Queue->EntryListHead.Flink;
40 
41  /* Get the Wait Entry */
42  WaitEntry = Queue->Header.WaitListHead.Blink;
43 
44  /* Make sure that the Queue entries are not part of empty lists */
45  if ((WaitEntry != &Queue->Header.WaitListHead) &&
46  (QueueEntry != &Queue->EntryListHead))
47  {
48  /* Remove this entry */
49  RemoveEntryList(QueueEntry);
50  QueueEntry->Flink = NULL;
51 
52  /* Decrease the Signal State */
53  Queue->Header.SignalState--;
54 
55  /* Unwait the Thread */
56  WaitBlock = CONTAINING_RECORD(WaitEntry,
58  WaitListEntry);
59  Thread = WaitBlock->Thread;
60  KiUnwaitThread(Thread, (LONG_PTR)QueueEntry, IO_NO_INCREMENT);
61  }
62  }
63 }
64 
65 /*
66  * Returns the previous number of entries in the queue
67  */
68 LONG
69 NTAPI
72  IN BOOLEAN Head)
73 {
76  PKWAIT_BLOCK WaitBlock;
77  PLIST_ENTRY WaitEntry;
78  PKTIMER Timer;
79  ASSERT_QUEUE(Queue);
80 
81  /* Save the old state */
82  InitialState = Queue->Header.SignalState;
83 
84  /* Get the Entry */
85  WaitEntry = Queue->Header.WaitListHead.Blink;
86 
87  /*
88  * Why the KeGetCurrentThread()->Queue != Queue?
89  * KiInsertQueue might be called from an APC for the current thread.
90  * -Gunnar
91  */
92  if ((Queue->CurrentCount < Queue->MaximumCount) &&
93  (WaitEntry != &Queue->Header.WaitListHead) &&
94  ((Thread->Queue != Queue) ||
95  (Thread->WaitReason != WrQueue)))
96  {
97  /* Remove the wait entry */
98  RemoveEntryList(WaitEntry);
99 
100  /* Get the Wait Block and Thread */
101  WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
102  Thread = WaitBlock->Thread;
103 
104  /* Remove the queue from the thread's wait list */
105  Thread->WaitStatus = (LONG_PTR)Entry;
106  if (Thread->WaitListEntry.Flink) RemoveEntryList(&Thread->WaitListEntry);
107 
108  /* Increase the active threads and remove any wait reason */
109  Queue->CurrentCount++;
110  Thread->WaitReason = 0;
111 
112  /* Check if there's a Thread Timer */
113  Timer = &Thread->Timer;
114  if (Timer->Header.Inserted) KxRemoveTreeTimer(Timer);
115 
116  /* Reschedule the Thread */
117  KiReadyThread(Thread);
118  }
119  else
120  {
121  /* Increase the Entries */
122  Queue->Header.SignalState++;
123 
124  /* Check which mode we're using */
125  if (Head)
126  {
127  /* Insert in the head */
128  InsertHeadList(&Queue->EntryListHead, Entry);
129  }
130  else
131  {
132  /* Insert at the end */
133  InsertTailList(&Queue->EntryListHead, Entry);
134  }
135  }
136 
137  /* Return the previous state */
138  return InitialState;
139 }
140 
141 /* PUBLIC FUNCTIONS **********************************************************/
142 
143 /*
144  * @implemented
145  */
146 VOID
147 NTAPI
150 {
151  /* Initialize the Header */
152  Queue->Header.Type = QueueObject;
153  Queue->Header.Abandoned = 0;
154  Queue->Header.Size = sizeof(KQUEUE) / sizeof(ULONG);
155  Queue->Header.SignalState = 0;
156  InitializeListHead(&(Queue->Header.WaitListHead));
157 
158  /* Initialize the Lists */
159  InitializeListHead(&Queue->EntryListHead);
160  InitializeListHead(&Queue->ThreadListHead);
161 
162  /* Set the Current and Maximum Count */
163  Queue->CurrentCount = 0;
164  Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count;
165 }
166 
167 /*
168  * @implemented
169  */
170 LONG
171 NTAPI
174 {
176  KIRQL OldIrql;
177  ASSERT_QUEUE(Queue);
179 
180  /* Lock the Dispatcher Database */
181  OldIrql = KiAcquireDispatcherLock();
182 
183  /* Insert the Queue */
184  PreviousState = KiInsertQueue(Queue, Entry, TRUE);
185 
186  /* Release the Dispatcher Lock */
187  KiReleaseDispatcherLock(OldIrql);
188 
189  /* Return previous State */
190  return PreviousState;
191 }
192 
193 /*
194  * @implemented
195  */
196 LONG
197 NTAPI
200 {
202  KIRQL OldIrql;
203  ASSERT_QUEUE(Queue);
205 
206  /* Lock the Dispatcher Database */
207  OldIrql = KiAcquireDispatcherLock();
208 
209  /* Insert the Queue */
210  PreviousState = KiInsertQueue(Queue, Entry, FALSE);
211 
212  /* Release the Dispatcher Lock */
213  KiReleaseDispatcherLock(OldIrql);
214 
215  /* Return previous State */
216  return PreviousState;
217 }
218 
219 /*
220  * @implemented
221  *
222  * Returns number of entries in the queue
223  */
224 LONG
225 NTAPI
227 {
228  /* Returns the Signal State */
229  ASSERT_QUEUE(Queue);
230  return Queue->Header.SignalState;
231 }
232 
233 /*
234  * @implemented
235  */
237 NTAPI
239  IN KPROCESSOR_MODE WaitMode,
241 {
242  PLIST_ENTRY QueueEntry;
245  PKQUEUE PreviousQueue;
246  PKWAIT_BLOCK WaitBlock = &Thread->WaitBlock[0];
247  PKWAIT_BLOCK TimerBlock = &Thread->WaitBlock[TIMER_WAIT_BLOCK];
248  PKTIMER Timer = &Thread->Timer;
249  BOOLEAN Swappable;
250  PLARGE_INTEGER OriginalDueTime = Timeout;
251  LARGE_INTEGER DueTime = {{0}}, NewDueTime, InterruptTime;
252  ULONG Hand = 0;
253  ASSERT_QUEUE(Queue);
255 
256  /* Check if the Lock is already held */
257  if (Thread->WaitNext)
258  {
259  /* It is, so next time don't do expect this */
260  Thread->WaitNext = FALSE;
262  }
263  else
264  {
265  /* Raise IRQL to synch, prepare the wait, then lock the database */
266  Thread->WaitIrql = KeRaiseIrqlToSynchLevel();
269  }
270 
271  /*
272  * This is needed so that we can set the new queue right here,
273  * before additional processing
274  */
275  PreviousQueue = Thread->Queue;
276  Thread->Queue = Queue;
277 
278  /* Check if this is a different queue */
279  if (Queue != PreviousQueue)
280  {
281  /* Get the current entry */
282  QueueEntry = &Thread->QueueListEntry;
283  if (PreviousQueue)
284  {
285  /* Remove from this list */
286  RemoveEntryList(QueueEntry);
287 
288  /* Wake the queue */
289  KiActivateWaiterQueue(PreviousQueue);
290  }
291 
292  /* Insert in this new Queue */
293  InsertTailList(&Queue->ThreadListHead, QueueEntry);
294  }
295  else
296  {
297  /* Same queue, decrement waiting threads */
298  Queue->CurrentCount--;
299  }
300 
301  /* Loop until the queue is processed */
302  while (TRUE)
303  {
304  /* Check if the counts are valid and if there is still a queued entry */
305  QueueEntry = Queue->EntryListHead.Flink;
306  if ((Queue->CurrentCount < Queue->MaximumCount) &&
307  (QueueEntry != &Queue->EntryListHead))
308  {
309  /* Decrease the number of entries */
310  Queue->Header.SignalState--;
311 
312  /* Increase numbef of running threads */
313  Queue->CurrentCount++;
314 
315  /* Check if the entry is valid. If not, bugcheck */
316  if (!(QueueEntry->Flink) || !(QueueEntry->Blink))
317  {
318  /* Invalid item */
319  KeBugCheckEx(INVALID_WORK_QUEUE_ITEM,
320  (ULONG_PTR)QueueEntry,
321  (ULONG_PTR)Queue,
322  (ULONG_PTR)NULL,
323  (ULONG_PTR)((PWORK_QUEUE_ITEM)QueueEntry)->
324  WorkerRoutine);
325  }
326 
327  /* Remove the Entry */
328  RemoveEntryList(QueueEntry);
329  QueueEntry->Flink = NULL;
330 
331  /* Nothing to wait on */
332  break;
333  }
334  else
335  {
336  /* Check if a kernel APC is pending and we're below APC_LEVEL */
337  if ((Thread->ApcState.KernelApcPending) &&
338  !(Thread->SpecialApcDisable) && (Thread->WaitIrql < APC_LEVEL))
339  {
340  /* Increment the count and unlock the dispatcher */
341  Queue->CurrentCount++;
343  KiExitDispatcher(Thread->WaitIrql);
344  }
345  else
346  {
347  /* Fail if there's a User APC Pending */
348  if ((WaitMode != KernelMode) &&
349  (Thread->ApcState.UserApcPending))
350  {
351  /* Return the status and increase the pending threads */
352  QueueEntry = (PLIST_ENTRY)STATUS_USER_APC;
353  Queue->CurrentCount++;
354  break;
355  }
356 
357  /* Enable the Timeout Timer if there was any specified */
358  if (Timeout)
359  {
360  /* Check if the timer expired */
361  InterruptTime.QuadPart = KeQueryInterruptTime();
362  if ((ULONG64)InterruptTime.QuadPart >= Timer->DueTime.QuadPart)
363  {
364  /* It did, so we don't need to wait */
365  QueueEntry = (PLIST_ENTRY)STATUS_TIMEOUT;
366  Queue->CurrentCount++;
367  break;
368  }
369 
370  /* It didn't, so activate it */
371  Timer->Header.Inserted = TRUE;
372  }
373 
374  /* Insert the wait block in the list */
375  InsertTailList(&Queue->Header.WaitListHead,
376  &WaitBlock->WaitListEntry);
377 
378  /* Setup the wait information */
379  Thread->State = Waiting;
380 
381  /* Add the thread to the wait list */
382  KiAddThreadToWaitList(Thread, Swappable);
383 
384  /* Activate thread swap */
385  ASSERT(Thread->WaitIrql <= DISPATCH_LEVEL);
386  KiSetThreadSwapBusy(Thread);
387 
388  /* Check if we have a timer */
389  if (Timeout)
390  {
391  /* Insert it */
392  KxInsertTimer(Timer, Hand);
393  }
394  else
395  {
396  /* Otherwise, unlock the dispatcher */
398  }
399 
400  /* Do the actual swap */
401  Status = KiSwapThread(Thread, KeGetCurrentPrcb());
402 
403  /* Reset the wait reason */
404  Thread->WaitReason = 0;
405 
406  /* Check if we were executing an APC */
407  if (Status != STATUS_KERNEL_APC) return (PLIST_ENTRY)Status;
408 
409  /* Check if we had a timeout */
410  if (Timeout)
411  {
412  /* Recalculate due times */
413  Timeout = KiRecalculateDueTime(OriginalDueTime,
414  &DueTime,
415  &NewDueTime);
416  }
417  }
418 
419  /* Start another wait */
420  Thread->WaitIrql = KeRaiseIrqlToSynchLevel();
423  Queue->CurrentCount--;
424  }
425  }
426 
427  /* Unlock Database and return */
429  KiExitDispatcher(Thread->WaitIrql);
430  return QueueEntry;
431 }
432 
433 /*
434  * @implemented
435  */
437 NTAPI
439 {
440  PLIST_ENTRY FirstEntry, NextEntry;
442  KIRQL OldIrql;
443  ASSERT_QUEUE(Queue);
445  ASSERT(IsListEmpty(&Queue->Header.WaitListHead));
446 
447  /* Get the Dispatcher Lock */
448  OldIrql = KiAcquireDispatcherLock();
449 
450  /* Check if the list is empty */
451  FirstEntry = Queue->EntryListHead.Flink;
452  if (FirstEntry == &Queue->EntryListHead)
453  {
454  /* We won't return anything */
455  FirstEntry = NULL;
456  }
457  else
458  {
459  /* Remove this entry */
460  RemoveEntryList(&Queue->EntryListHead);
461  }
462 
463  /* Loop the list */
464  while (!IsListEmpty(&Queue->ThreadListHead))
465  {
466  /* Get the next entry */
467  NextEntry = Queue->ThreadListHead.Flink;
468 
469  /* Get the associated thread */
470  Thread = CONTAINING_RECORD(NextEntry, KTHREAD, QueueListEntry);
471 
472  /* Clear its queue */
473  Thread->Queue = NULL;
474 
475  /* Remove this entry */
476  RemoveEntryList(NextEntry);
477  }
478 
479  /* Release the dispatcher lock */
481 
482  /* Exit the dispatcher and return the first entry (if any) */
483  KiExitDispatcher(OldIrql);
484  return FirstEntry;
485 }
486 
487 /* EOF */
struct _LIST_ENTRY * PLIST_ENTRY
#define ASSERT_IRQL_LESS_OR_EQUAL(x)
Definition: debug.h:250
#define IN
Definition: typedefs.h:39
VOID FASTCALL KiUnwaitThread(IN PKTHREAD Thread, IN LONG_PTR WaitStatus, IN KPRIORITY Increment)
Definition: wait.c:89
#define TRUE
Definition: types.h:120
LONG NTAPI KeInsertHeadQueue(IN PKQUEUE Queue, IN PLIST_ENTRY Entry)
Definition: queue.c:172
SHORT SpecialApcDisable
Definition: ketypes.h:1052
LIST_ENTRY WaitListEntry
Definition: ketypes.h:444
ASSERT((InvokeOnSuccess||InvokeOnError||InvokeOnCancel)?(CompletionRoutine!=NULL):TRUE)
_In_ LARGE_INTEGER DueTime
Definition: kefuncs.h:524
struct _LIST_ENTRY * Blink
Definition: typedefs.h:121
FORCEINLINE VOID InsertHeadList(_Inout_ PLIST_ENTRY ListHead, _Inout_ __drv_aliasesMem PLIST_ENTRY Entry)
Definition: rtlfuncs.h:201
VOID FASTCALL KiActivateWaiterQueue(IN PKQUEUE Queue)
Definition: queue.c:24
_Must_inspect_result_ _In_ PFLT_CALLBACK_DATA _In_ PFLT_DEFERRED_IO_WORKITEM_ROUTINE WorkerRoutine
Definition: fltkernel.h:1977
FORCEINLINE VOID KiSetThreadSwapBusy(IN PKTHREAD Thread)
Definition: ke_x.h:204
FORCEINLINE struct _KPRCB * KeGetCurrentPrcb(VOID)
Definition: ketypes.h:1054
FORCEINLINE VOID KxRemoveTreeTimer(IN PKTIMER Timer)
Definition: ke_x.h:995
struct _KTHREAD * Thread
Definition: ketypes.h:445
_Inout_ __drv_aliasesMem PSLIST_ENTRY _Inout_ PSLIST_ENTRY _In_ ULONG Count
Definition: exfuncs.h:1015
#define InsertTailList(ListHead, Entry)
#define FASTCALL
Definition: nt_native.h:50
_Must_inspect_result_ FORCEINLINE BOOLEAN IsListEmpty(_In_ const LIST_ENTRY *ListHead)
Definition: rtlfuncs.h:57
uint32_t ULONG_PTR
Definition: typedefs.h:64
PLIST_ENTRY NTAPI KeRundownQueue(IN PKQUEUE Queue)
Definition: queue.c:438
#define TIMER_WAIT_BLOCK
Definition: ke.h:155
FORCEINLINE BOOLEAN RemoveEntryList(_In_ PLIST_ENTRY Entry)
Definition: rtlfuncs.h:105
#define STATUS_TIMEOUT
Definition: ntstatus.h:81
UCHAR KIRQL
Definition: env_spec_w32.h:591
LONG NTAPI KiInsertQueue(IN PKQUEUE Queue, IN PLIST_ENTRY Entry, IN BOOLEAN Head)
Definition: queue.c:70
NTSTATUS(* NTAPI)(IN PFILE_FULL_EA_INFORMATION EaBuffer, IN ULONG EaLength, OUT PULONG ErrorOffset)
Definition: IoEaTest.cpp:117
ULONGLONG QuadPart
Definition: ms-dtyp.idl:185
#define FALSE
Definition: types.h:117
long LONG
Definition: pedump.c:60
DISPATCHER_HEADER Header
Definition: ketypes.h:825
KAPC_STATE ApcState
Definition: ketypes.h:969
uint64_t ULONG64
Definition: typedefs.h:66
ULONG WaitNext
Definition: ketypes.h:952
#define KiAddThreadToWaitList(Thread, Swappable)
Definition: ke_x.h:816
LONG NTAPI KeReadStateQueue(IN PKQUEUE Queue)
Definition: queue.c:226
smooth NULL
Definition: ftsmooth.c:513
LIST_ENTRY QueueListEntry
Definition: ketypes.h:1180
FORCEINLINE VOID KxInsertTimer(IN PKTIMER Timer, IN ULONG Hand)
Definition: ke_x.h:915
#define STATUS_KERNEL_APC
Definition: ntstatus.h:79
KTIMER Timer
Definition: ketypes.h:1060
UCHAR WaitReason
Definition: ketypes.h:1249
VOID FASTCALL KiExitDispatcher(KIRQL OldIrql)
VOID NTAPI KiReadyThread(IN PKTHREAD Thread)
Definition: thrdschd.c:429
struct _LIST_ENTRY * Flink
Definition: typedefs.h:120
unsigned char BOOLEAN
_In_ ACCESS_MASK _In_opt_ POBJECT_ATTRIBUTES _In_ EVENT_TYPE _In_ BOOLEAN InitialState
Definition: exfuncs.h:165
_In_opt_ PFILE_OBJECT _In_opt_ PETHREAD Thread
Definition: fltkernel.h:2653
PKQUEUE Queue
Definition: ketypes.h:1044
CCHAR KPROCESSOR_MODE
Definition: ketypes.h:7
ULARGE_INTEGER DueTime
Definition: ketypes.h:826
LIST_ENTRY WaitListEntry
Definition: ketypes.h:1041
_Requires_lock_held_ Interrupt _Releases_lock_ Interrupt _In_ _IRQL_restores_ KIRQL OldIrql
Definition: kefuncs.h:803
PLIST_ENTRY NTAPI KeRemoveQueue(IN PKQUEUE Queue, IN KPROCESSOR_MODE WaitMode, IN PLARGE_INTEGER Timeout OPTIONAL)
Definition: queue.c:238
Definition: typedefs.h:118
CCHAR KeNumberProcessors
Definition: krnlinit.c:35
Status
Definition: gdiplustypes.h:24
LONG_PTR WaitStatus
Definition: ketypes.h:1002
#define DISPATCH_LEVEL
Definition: env_spec_w32.h:696
#define STATUS_USER_APC
Definition: ntstatus.h:78
_In_ PLARGE_INTEGER _In_opt_ PTIMER_APC_ROUTINE _In_opt_ PVOID _In_ BOOLEAN _In_opt_ LONG _Out_opt_ PBOOLEAN PreviousState
Definition: zwfuncs.h:428
FORCEINLINE PLARGE_INTEGER KiRecalculateDueTime(IN PLARGE_INTEGER OriginalDueTime, IN PLARGE_INTEGER DueTime, IN OUT PLARGE_INTEGER NewDueTime)
Definition: ke_x.h:777
UINT Timer
Definition: capclock.c:11
static ULONG Timeout
Definition: ping.c:61
FORCEINLINE VOID KiReleaseDispatcherLock(IN KIRQL OldIrql)
Definition: ke_x.h:152
FORCEINLINE KIRQL KiAcquireDispatcherLock(VOID)
Definition: ke_x.h:144
LONG_PTR FASTCALL KiSwapThread(IN PKTHREAD Thread, IN PKPRCB Prcb)
Definition: thrdschd.c:355
#define InitializeListHead(ListHead)
Definition: env_spec_w32.h:944
LONG NTAPI KeInsertQueue(IN PKQUEUE Queue, IN PLIST_ENTRY Entry)
Definition: queue.c:198
#define KxQueueThreadWait()
Definition: ke_x.h:1199
__int3264 LONG_PTR
Definition: mstsclib_h.h:276
#define ASSERT_QUEUE(Q)
Definition: ketypes.h:1262
VOID NTAPI KeInitializeQueue(IN PKQUEUE Queue, IN ULONG Count OPTIONAL)
Definition: queue.c:148
FORCEINLINE VOID KiReleaseDispatcherLockFromDpcLevel(VOID)
Definition: ke_x.h:168
volatile UCHAR State
Definition: ketypes.h:997
unsigned int ULONG
Definition: retypes.h:1
#define IO_NO_INCREMENT
Definition: iotypes.h:565
#define CONTAINING_RECORD(address, type, field)
Definition: typedefs.h:260
struct _KQUEUE KQUEUE
#define KeGetCurrentThread
Definition: hal.h:44
#define APC_LEVEL
Definition: env_spec_w32.h:695
ULONGLONG NTAPI KeQueryInterruptTime(VOID)
Definition: clock.c:203
IN HDEVINFO IN PSP_DEVINFO_DATA DeviceInfoData OPTIONAL
Definition: devinst.c:44
base of all file and directory entries
Definition: entries.h:82
VOID NTAPI KeBugCheckEx(_In_ ULONG BugCheckCode, _In_ ULONG_PTR BugCheckParameter1, _In_ ULONG_PTR BugCheckParameter2, _In_ ULONG_PTR BugCheckParameter3, _In_ ULONG_PTR BugCheckParameter4)
Definition: rtlcompat.c:90
#define LONG_PTR
Definition: generated.c:23
FORCEINLINE VOID KiAcquireDispatcherLockAtDpcLevel(VOID)
Definition: ke_x.h:160
KIRQL WaitIrql
Definition: ketypes.h:999
KIRQL NTAPI KeRaiseIrqlToSynchLevel(VOID)
Definition: pic.c:156