ReactOS  0.4.13-dev-259-g5ca9c9c
worker.c
Go to the documentation of this file.
1 /*
2  * COPYRIGHT: See COPYING in the top level directory
3  * PROJECT: ReactOS Kernel Streaming
4  * FILE: drivers/ksfilter/ks/worker.c
5  * PURPOSE: KS Allocator functions
6  * PROGRAMMER: Johannes Anderwald
7  */
8 
9 #include "precomp.h"
10 
11 #define NDEBUG
12 #include <debug.h>
13 
14 /* ===============================================================
15  Worker Management Functions
16 */
17 
18 typedef struct
19 {
21 
28 
31 
32 VOID
33 NTAPI
36 {
37  PKSIWORKER KsWorker;
38  KIRQL OldLevel;
39  PWORK_QUEUE_ITEM WorkItem;
41 
42 
43  /* get ks worker implementation */
44  KsWorker = (PKSIWORKER)Context;
45 
46  /* acquire back the lock */
47  KeAcquireSpinLock(&KsWorker->Lock, &OldLevel);
48 
49  do
50  {
51  /* sanity check */
52  ASSERT(!IsListEmpty(&KsWorker->QueuedWorkItems));
53 
54  /* remove first entry */
55  Entry = RemoveHeadList(&KsWorker->QueuedWorkItems);
56  /* get offset to work item */
58 
59  /* release lock as the callback might call one KsWorker functions */
60  KeReleaseSpinLock(&KsWorker->Lock, OldLevel);
61 
62  /* now dispatch the work */
63  WorkItem->WorkerRoutine(WorkItem->Parameter);
64 
65  /* acquire back the lock */
66  KeAcquireSpinLock(&KsWorker->Lock, &OldLevel);
67 
68  /* decrement queued work item count */
70 
71  }while(KsWorker->QueuedWorkItemCount);
72 
73  /* release the lock */
74  KeReleaseSpinLock(&KsWorker->Lock, OldLevel);
75 
76  /* signal completion event */
77  KeSetEvent(&KsWorker->Event, IO_NO_INCREMENT, FALSE);
78 
79 }
80 
81 
82 /*
83  @implemented
84 */
87 NTAPI
89  IN WORK_QUEUE_TYPE WorkQueueType,
90  OUT PKSWORKER* Worker)
91 {
92  PKSIWORKER KsWorker;
93 
94 
95  if (WorkQueueType != CriticalWorkQueue &&
96  WorkQueueType != DelayedWorkQueue &&
97  WorkQueueType != HyperCriticalWorkQueue)
98  {
100  }
101 
102  /* allocate worker context */
103  KsWorker = AllocateItem(NonPagedPool, sizeof(KSIWORKER));
104  if (!KsWorker)
106 
107  /* initialize the work ctx */
108  ExInitializeWorkItem(&KsWorker->WorkItem, WorkItemRoutine, (PVOID)KsWorker);
109  /* setup type */
110  KsWorker->Type = WorkQueueType;
111  /* Initialize work item queue */
112  InitializeListHead(&KsWorker->QueuedWorkItems);
113  /* initialize work item lock */
114  KeInitializeSpinLock(&KsWorker->Lock);
115  /* initialize event */
116  KeInitializeEvent(&KsWorker->Event, NotificationEvent, FALSE);
117 
118  *Worker = KsWorker;
119  return STATUS_SUCCESS;
120 }
121 
122 /*
123  @implemented
124 */
125 KSDDKAPI
126 VOID
127 NTAPI
129  IN PKSWORKER Worker)
130 {
131  PKSIWORKER KsWorker;
132  KIRQL OldIrql;
133 
134  if (!Worker)
135  return;
136 
137  /* get ks worker implementation */
138  KsWorker = (PKSIWORKER)Worker;
139  /* acquire spinlock */
140  KeAcquireSpinLock(&KsWorker->Lock, &OldIrql);
141  /* fake status running to avoid work items to be queued by the counted worker */
142  KsWorker->Counter = 1;
143  /* is there currently a work item active */
144  if (KsWorker->QueuedWorkItemCount)
145  {
146  /* release the lock */
147  KeReleaseSpinLock(&KsWorker->Lock, OldIrql);
148  /* wait for the worker routine to finish */
150  }
151  else
152  {
153  /* no work item active, just release the lock */
154  KeReleaseSpinLock(&KsWorker->Lock, OldIrql);
155  }
156  /* free worker context */
157  FreeItem(KsWorker);
158 }
159 
160 /*
161  @implemented
162 */
163 KSDDKAPI
164 NTSTATUS
165 NTAPI
167  IN WORK_QUEUE_TYPE WorkQueueType,
168  IN PWORK_QUEUE_ITEM CountedWorkItem,
169  OUT PKSWORKER* Worker)
170 {
172  PKSIWORKER KsWorker;
173 
174  /* check for counted work item parameter */
175  if (!CountedWorkItem)
177 
178  /* create the work ctx */
179  Status = KsRegisterWorker(WorkQueueType, Worker);
180  /* check for success */
181  if (NT_SUCCESS(Status))
182  {
183  /* get ks worker implementation */
184  KsWorker = *(PKSIWORKER*)Worker;
185  /* store counted work item */
186  KsWorker->CountedWorkItem = CountedWorkItem;
187  }
188 
189  return Status;
190 }
191 
192 /*
193  @implemented
194 */
195 KSDDKAPI
196 ULONG
197 NTAPI
199  IN PKSWORKER Worker)
200 {
201  PKSIWORKER KsWorker;
202  LONG Counter;
203 
204  /* did the caller pass a work ctx */
205  if (!Worker)
207 
208  /* get ks worker implementation */
209  KsWorker = (PKSIWORKER)Worker;
210  /* decrement counter */
211  Counter = InterlockedDecrement(&KsWorker->Counter);
212  /* return result */
213  return Counter;
214 }
215 
216 /*
217  @implemented
218 */
219 KSDDKAPI
220 ULONG
221 NTAPI
223  IN PKSWORKER Worker)
224 {
225  PKSIWORKER KsWorker;
226  LONG Counter;
227 
228  /* did the caller pass a work ctx */
229  if (!Worker)
231 
232  /* get ks worker implementation */
233  KsWorker = (PKSIWORKER)Worker;
234  /* increment counter */
235  Counter = InterlockedIncrement(&KsWorker->Counter);
236  if (Counter == 1)
237  {
238  /* this is the first work item in list, so queue a real work item */
239  KsQueueWorkItem(Worker, KsWorker->CountedWorkItem);
240  }
241 
242  /* return current counter */
243  return Counter;
244 }
245 
246 /*
247  @implemented
248 */
249 KSDDKAPI
250 NTSTATUS
251 NTAPI
253  IN PKSWORKER Worker,
254  IN PWORK_QUEUE_ITEM WorkItem)
255 {
256  PKSIWORKER KsWorker;
257  KIRQL OldIrql;
258 
259  /* check for all parameters */
260  if (!Worker || !WorkItem)
262 
263  /* get ks worker implementation */
264  KsWorker = (PKSIWORKER)Worker;
265  /* lock the work queue */
266  KeAcquireSpinLock(&KsWorker->Lock, &OldIrql);
267  /* insert work item to list */
268  InsertTailList(&KsWorker->QueuedWorkItems, &WorkItem->List);
269  /* increment active count */
271  /* is this the first work item */
272  if (KsWorker->QueuedWorkItemCount == 1)
273  {
274  /* clear event */
275  KeClearEvent(&KsWorker->Event);
276  /* it is, queue it */
277  ExQueueWorkItem(&KsWorker->WorkItem, KsWorker->Type);
278  }
279  /* release lock */
280  KeReleaseSpinLock(&KsWorker->Lock, OldIrql);
281 
282  return STATUS_SUCCESS;
283 }
KSDDKAPI NTSTATUS NTAPI KsRegisterWorker(IN WORK_QUEUE_TYPE WorkQueueType, OUT PKSWORKER *Worker)
Definition: worker.c:88
#define IN
Definition: typedefs.h:38
VOID NTAPI ExQueueWorkItem(IN PWORK_QUEUE_ITEM WorkItem, IN WORK_QUEUE_TYPE QueueType)
Definition: work.c:717
#define STATUS_INSUFFICIENT_RESOURCES
Definition: udferr_usr.h:158
KSDDKAPI NTSTATUS NTAPI KsQueueWorkItem(IN PKSWORKER Worker, IN PWORK_QUEUE_ITEM WorkItem)
Definition: worker.c:252
struct _Entry Entry
Definition: kefuncs.h:640
#define STATUS_INVALID_PARAMETER
Definition: udferr_usr.h:135
PVOID AllocateItem(IN POOL_TYPE PoolType, IN SIZE_T NumberOfBytes)
Definition: misc.c:30
LONG NTSTATUS
Definition: precomp.h:26
KEVENT Event
Definition: worker.c:22
VOID NTAPI WorkItemRoutine(IN PVOID Context)
Definition: worker.c:34
#define InsertTailList(ListHead, Entry)
struct KSIWORKER * PKSIWORKER
LONG NTAPI KeSetEvent(IN PKEVENT Event, IN KPRIORITY Increment, IN BOOLEAN Wait)
Definition: eventobj.c:159
LONG Counter
Definition: worker.c:25
NTSTATUS NTAPI KeWaitForSingleObject(IN PVOID Object, IN KWAIT_REASON WaitReason, IN KPROCESSOR_MODE WaitMode, IN BOOLEAN Alertable, IN PLARGE_INTEGER Timeout OPTIONAL)
Definition: wait.c:416
volatile PVOID Parameter
Definition: extypes.h:205
#define KSDDKAPI
Definition: ks.h:40
_Must_inspect_result_ FORCEINLINE BOOLEAN IsListEmpty(_In_ const LIST_ENTRY *ListHead)
Definition: rtlfuncs.h:57
WORK_QUEUE_ITEM WorkItem
Definition: worker.c:20
KSDDKAPI ULONG NTAPI KsIncrementCountedWorker(IN PKSWORKER Worker)
Definition: worker.c:222
UCHAR KIRQL
Definition: env_spec_w32.h:591
VOID FreeItem(IN PVOID Item)
Definition: misc.c:43
NTSTATUS(* NTAPI)(IN PFILE_FULL_EA_INFORMATION EaBuffer, IN ULONG EaLength, OUT PULONG ErrorOffset)
Definition: IoEaTest.cpp:117
long LONG
Definition: pedump.c:60
KSPIN_LOCK Lock
Definition: worker.c:23
FORCEINLINE VOID KeInitializeSpinLock(_Out_ PKSPIN_LOCK SpinLock)
Definition: kefuncs.h:251
smooth NULL
Definition: ftsmooth.c:416
#define STATUS_INVALID_PARAMETER_2
Definition: ntstatus.h:462
FORCEINLINE PLIST_ENTRY RemoveHeadList(_Inout_ PLIST_ENTRY ListHead)
Definition: rtlfuncs.h:128
PFLT_MESSAGE_WAITER_QUEUE CONTAINING_RECORD(Csq, DEVICE_EXTENSION, IrpQueue)) -> WaiterQ.mLock) _IRQL_raises_(DISPATCH_LEVEL) VOID NTAPI FltpAcquireMessageWaiterLock(_In_ PIO_CSQ Csq, _Out_ PKIRQL Irql)
Definition: Messaging.c:560
KSDDKAPI ULONG NTAPI KsDecrementCountedWorker(IN PKSWORKER Worker)
Definition: worker.c:198
#define ExInitializeWorkItem(Item, Routine, Context)
Definition: exfuncs.h:265
LIST_ENTRY List
Definition: psmgr.c:57
#define NT_SUCCESS(StatCode)
Definition: apphelp.c:32
#define KeAcquireSpinLock(sl, irql)
Definition: env_spec_w32.h:609
KSDDKAPI NTSTATUS NTAPI KsRegisterCountedWorker(IN WORK_QUEUE_TYPE WorkQueueType, IN PWORK_QUEUE_ITEM CountedWorkItem, OUT PKSWORKER *Worker)
Definition: worker.c:166
ASSERT((InvokeOnSuccess||InvokeOnError||InvokeOnCancel) ?(CompletionRoutine !=NULL) :TRUE)
enum _WORK_QUEUE_TYPE WORK_QUEUE_TYPE
#define InterlockedDecrement
Definition: armddk.h:52
_Requires_lock_held_ Interrupt _Releases_lock_ Interrupt _In_ _IRQL_restores_ KIRQL OldIrql
Definition: kefuncs.h:803
PWORKER_THREAD_ROUTINE WorkerRoutine
Definition: extypes.h:204
Definition: typedefs.h:117
Status
Definition: gdiplustypes.h:24
#define KeInitializeEvent(pEvt, foo, foo2)
Definition: env_spec_w32.h:477
#define InterlockedIncrement
Definition: armddk.h:53
#define InitializeListHead(ListHead)
Definition: env_spec_w32.h:944
ULONG KSPIN_LOCK
Definition: env_spec_w32.h:72
struct _WORK_QUEUE_ITEM * PWORK_QUEUE_ITEM
#define KeReleaseSpinLock(sl, irql)
Definition: env_spec_w32.h:627
static LARGE_INTEGER Counter
Definition: clock.c:43
KSDDKAPI VOID NTAPI KsUnregisterWorker(IN PKSWORKER Worker)
Definition: worker.c:128
#define OUT
Definition: typedefs.h:39
unsigned int ULONG
Definition: retypes.h:1
#define IO_NO_INCREMENT
Definition: iotypes.h:565
PWORK_QUEUE_ITEM CountedWorkItem
Definition: worker.c:29
VOID NTAPI KeClearEvent(IN PKEVENT Event)
Definition: eventobj.c:22
return STATUS_SUCCESS
Definition: btrfs.c:2745
WORK_QUEUE_TYPE Type
Definition: worker.c:24
LIST_ENTRY QueuedWorkItems
Definition: worker.c:27
LONG QueuedWorkItemCount
Definition: worker.c:26
base of all file and directory entries
Definition: entries.h:82