libupnpp 0.16.0
A C++ wrapper for the Portable UPnP reference library
workqueue.h
1/* Copyright (C) 2006-2016 J.F.Dockes
2 *
3 * This library is free software; you can redistribute it and/or
4 * modify it under the terms of the GNU Lesser General Public
5 * License as published by the Free Software Foundation; either
6 * version 2.1 of the License, or (at your option) any later version.
7 *
8 * This library is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public
14 * License along with this library; if not, write to the Free Software
15 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
16 * 02110-1301 USA
17 */
18#ifndef _WORKQUEUE_H_INCLUDED_
19#define _WORKQUEUE_H_INCLUDED_
20
21#include <chrono>
22#include <thread>
23#include <string>
24#include <queue>
25#include <list>
26#include <mutex>
27#include <condition_variable>
28
29#ifdef MDU_INCLUDE_LOG
30#include MDU_INCLUDE_LOG
31#else
32#include "log.h"
33#endif
34
35using namespace std::chrono_literals;
36
54template <class T> class WorkQueue {
55public:
56
63 WorkQueue(const std::string& name, size_t hi = 0, size_t lo = 1)
64 : m_name(name), m_high(hi), m_low(lo)
65 {
66 }
67
68 ~WorkQueue() {
69 if (!m_worker_threads.empty()) {
71 }
72 }
73 WorkQueue(const WorkQueue&) = delete;
74 WorkQueue& operator=(const WorkQueue&) = delete;
75
82 void setTaskFreeFunc(void (*func)(T&)) {
83 m_taskfreefunc = func;
84 }
85
88 void closeShop() {
89 m_openforbusiness = false;
90 }
91
100 bool start(int nworkers, void *(workproc)(void *), void *arg) {
101 std::unique_lock<std::mutex> lock(m_mutex);
102 for (int i = 0; i < nworkers; i++) {
103 Worker w;
104 w.thr = std::thread(workproc, arg);
105 m_worker_threads.push_back(std::move(w));
106 }
107 return true;
108 }
109
114 bool put(T t, bool flushprevious = false) {
115 std::unique_lock<std::mutex> lock(m_mutex);
116 if (!ok() || !m_openforbusiness) {
117 LOGERR("WorkQueue::put: " << m_name << ": ok: " << ok() << " openforbusiness " <<
118 m_openforbusiness << "\n");
119 return false;
120 }
121 LOGDEB2("WorkQueue::put: " << m_name << "\n");
122
123 while (ok() && m_high > 0 && m_queue.size() >= m_high) {
124 m_clientsleeps++;
125 // Keep the order: we test ok() AFTER the sleep...
126 m_clients_waiting++;
127 m_ccond.wait(lock);
128 if (!ok()) {
129 m_clients_waiting--;
130 return false;
131 }
132 m_clients_waiting--;
133 }
134 if (flushprevious) {
135 while (!m_queue.empty()) {
136 if (m_taskfreefunc) {
137 T& d = m_queue.front();
138 m_taskfreefunc(d);
139 }
140 m_queue.pop();
141 }
142 }
143
144 m_queue.push(t);
145 if (m_workers_waiting > 0) {
146 // Just wake one worker, there is only one new task.
147 m_wcond.notify_one();
148 } else {
149 m_nowake++;
150 }
151
152 return true;
153 }
154
171 bool waitIdle() {
172 std::unique_lock<std::mutex> lock(m_mutex);
173 // We're not done while:
174 // - the queue is not empty and we have some workers left
175 // - OR some workers are working (not exited or back waiting for a task).
176 while (((m_queue.size() > 0 && m_workers_exited < m_worker_threads.size()) ||
177 (m_workers_waiting + m_workers_exited) < m_worker_threads.size())) {
178 LOGDEB0("waitIdle: " << m_name << " qsz " << m_queue.size() <<
179 " wwaiting " << m_workers_waiting << " wexit " << m_workers_exited << " nthr " <<
180 m_worker_threads.size() << "\n");
181 m_clients_waiting++;
182 m_ccond.wait(lock);
183 m_clients_waiting--;
184 }
185
186 return ok();
187 }
188
195 std::unique_lock<std::mutex> lock(m_mutex);
196 LOGDEB("setTerminateAndWait:" << m_name << "\n");
197
198 if (m_worker_threads.empty()) {
199 // Already called ?
200 return (void*)0;
201 }
202
203 // Wait for all worker threads to have called workerExit()
204 m_ok = false;
205 while (m_workers_exited < m_worker_threads.size()) {
206 m_wcond.notify_all();
207 m_clients_waiting++;
208 m_ccond.wait(lock);
209 m_clients_waiting--;
210 }
211
212 LOGDEB(m_name << ": tasks " << m_tottasks << " nowakes " << m_nowake << " wsleeps " <<
213 m_workersleeps << " csleeps " << m_clientsleeps << "\n");
214 // Perform the thread joins and compute overall status
215 // Workers return (void*)1 if ok
216 void *statusall = (void*)1;
217 while (!m_worker_threads.empty()) {
218 void *status = (void*) 1;
219 m_worker_threads.front().thr.join();
220 if (status == (void *)0) {
221 statusall = status;
222 }
223 m_worker_threads.pop_front();
224 }
225
226 // Reset to start state.
227 m_workers_exited = m_clients_waiting = m_workers_waiting =
228 m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
229 m_ok = true;
230
231 LOGDEB("setTerminateAndWait:" << m_name << " done\n");
232 return statusall;
233 }
234
240 bool take(T* tp, size_t *szp = nullptr, std::chrono::duration<double> waitdur = {-1ms}) {
241 std::unique_lock<std::mutex> lock(m_mutex);
242 if (!ok()) {
243 LOGDEB("WorkQueue::take:" << m_name << ": not ok\n");
244 return false;
245 }
246
247 while (ok() && m_queue.size() < m_low) {
248 m_workersleeps++;
249 m_workers_waiting++;
250 if (m_queue.empty()) {
251 m_ccond.notify_all();
252 }
253 if (waitdur < 0ms) {
254 m_wcond.wait(lock);
255 } else if (m_wcond.wait_for(lock, waitdur) == std::cv_status::timeout) {
256 m_workers_waiting--;
257 *tp = nullptr;
258 return true;
259 }
260
261 if (!ok()) {
262 // !ok is a normal condition when shutting down
263 m_workers_waiting--;
264 return false;
265 }
266 m_workers_waiting--;
267 }
268
269 m_tottasks++;
270 *tp = m_queue.front();
271 if (szp) {
272 *szp = m_queue.size();
273 }
274 m_queue.pop();
275 if (m_clients_waiting > 0) {
276 // No reason to wake up more than one client thread
277 m_ccond.notify_one();
278 } else {
279 m_nowake++;
280 }
281 return true;
282 }
283
284 bool waitminsz(size_t sz) {
285 std::unique_lock<std::mutex> lock(m_mutex);
286 if (!ok()) {
287 return false;
288 }
289
290 while (ok() && m_queue.size() < sz) {
291 m_workersleeps++;
292 m_workers_waiting++;
293 if (m_queue.empty()) {
294 m_ccond.notify_all();
295 }
296 m_wcond.wait(lock);
297 if (!ok()) {
298 m_workers_waiting--;
299 return false;
300 }
301 m_workers_waiting--;
302 }
303 return true;
304 }
305
314 void workerExit() {
315 LOGDEB("workerExit:" << m_name << "\n");
316 std::unique_lock<std::mutex> lock(m_mutex);
317 m_workers_exited++;
318 m_ok = false;
319 m_ccond.notify_all();
320 }
321
322 size_t qsize() {
323 std::unique_lock<std::mutex> lock(m_mutex);
324 return m_queue.size();
325 }
326
327private:
328 bool ok() {
329 bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
330 if (!isok) {
331 LOGDEB("WorkQueue:ok:" << m_name << ": not ok m_ok " << m_ok <<
332 " m_workers_exited " << m_workers_exited <<
333 " m_worker_threads size " << m_worker_threads.size() <<
334 "\n");
335 }
336 return isok;
337 }
338
339 struct Worker {
340 std::thread thr;
341 };
342
343 void (*m_taskfreefunc)(T&){nullptr};
344 // Configuration
345 std::string m_name;
346 size_t m_high;
347 size_t m_low;
348
349 // Worker threads having called exit. Used to decide when we're done
350 unsigned int m_workers_exited{0};
351 // Status
352 bool m_ok{true};
353
354 // Accepting new tasks
355 bool m_openforbusiness{true};
356
357 // Our threads.
358 std::list<Worker> m_worker_threads;
359
360 // Jobs input queue
361 std::queue<T> m_queue;
362
363 // Synchronization
364 std::condition_variable m_ccond;
365 std::condition_variable m_wcond;
366 std::mutex m_mutex;
367
368 // Client/Worker threads currently waiting for a job
369 unsigned int m_clients_waiting{0};
370 unsigned int m_workers_waiting{0};
371
372 // Statistics
373 unsigned int m_tottasks{0};
374 unsigned int m_nowake{0};
375 unsigned int m_workersleeps{0};
376 unsigned int m_clientsleeps{0};
377};
378
379#endif /* _WORKQUEUE_H_INCLUDED_ */
A WorkQueue manages the synchronisation around a queue of work items, where a number of client thread...
Definition workqueue.h:54
void closeShop()
Forbid inputting new tasks.
Definition workqueue.h:88
bool start(int nworkers, void *(workproc)(void *), void *arg)
Start the worker threads.
Definition workqueue.h:100
bool waitIdle()
Wait until the queue is inactive.
Definition workqueue.h:171
bool put(T t, bool flushprevious=false)
Add item to work queue, called from client.
Definition workqueue.h:114
bool take(T *tp, size_t *szp=nullptr, std::chrono::duration< double > waitdur={-1ms})
Take task from queue.
Definition workqueue.h:240
void workerExit()
Advertise exit and abort queue.
Definition workqueue.h:314
void setTaskFreeFunc(void(*func)(T &))
Task deleter. If put() is called with the flush option, and the tasks allocate memory,...
Definition workqueue.h:82
void * setTerminateAndWait()
Tell the workers to exit, and wait for them.
Definition workqueue.h:194
WorkQueue(const std::string &name, size_t hi=0, size_t lo=1)
Create a WorkQueue.
Definition workqueue.h:63