libupnpp  0.16.0
A C++ wrapper for the Portable UPnP reference library
workqueue.h
1 /* Copyright (C) 2006-2016 J.F.Dockes
2  *
3  * This library is free software; you can redistribute it and/or
4  * modify it under the terms of the GNU Lesser General Public
5  * License as published by the Free Software Foundation; either
6  * version 2.1 of the License, or (at your option) any later version.
7  *
8  * This library is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11  * Lesser General Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public
14  * License along with this library; if not, write to the Free Software
15  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
16  * 02110-1301 USA
17  */
18 #ifndef _WORKQUEUE_H_INCLUDED_
19 #define _WORKQUEUE_H_INCLUDED_
20 
21 #include <thread>
22 #if HAVE_STD_FUTURE
23 #include <future>
24 #endif
25 #include <string>
26 #include <queue>
27 #include <list>
28 #include <mutex>
29 #include <condition_variable>
30 
31 #ifdef MDU_INCLUDE_LOG
32 #include MDU_INCLUDE_LOG
33 #else
34 #include "log.h"
35 #endif
36 
54 template <class T> class WorkQueue {
55 public:
56 
63  WorkQueue(const std::string& name, size_t hi = 0, size_t lo = 1)
64  : m_name(name), m_high(hi), m_low(lo), m_workers_exited(0),
65  m_ok(true), m_clients_waiting(0), m_workers_waiting(0),
66  m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0) {
67  }
68 
69  ~WorkQueue() {
70  if (!m_worker_threads.empty()) {
72  }
73  }
74 
81  void setTaskFreeFunc(void (*func)(T&)) {
82  m_taskfreefunc = func;
83  }
84 
93  bool start(int nworkers, void *(workproc)(void *), void *arg) {
94  std::unique_lock<std::mutex> lock(m_mutex);
95  for (int i = 0; i < nworkers; i++) {
96  Worker w;
97 #if HAVE_STD_FUTURE
98  std::packaged_task<void *(void *)> task(workproc);
99  w.res = task.get_future();
100  w.thr = std::thread(std::move(task), arg);
101 #else
102  w.thr = std::thread(workproc, arg);
103 #endif
104  m_worker_threads.push_back(std::move(w));
105  }
106  return true;
107  }
108 
113  bool put(T t, bool flushprevious = false) {
114  std::unique_lock<std::mutex> lock(m_mutex);
115  if (!ok()) {
116  LOGERR("WorkQueue::put:" << m_name << ": !ok\n");
117  return false;
118  }
119 
120  while (ok() && m_high > 0 && m_queue.size() >= m_high) {
121  m_clientsleeps++;
122  // Keep the order: we test ok() AFTER the sleep...
123  m_clients_waiting++;
124  m_ccond.wait(lock);
125  if (!ok()) {
126  m_clients_waiting--;
127  return false;
128  }
129  m_clients_waiting--;
130  }
131  if (flushprevious) {
132  while (!m_queue.empty()) {
133  if (m_taskfreefunc) {
134  T& d = m_queue.front();
135  m_taskfreefunc(d);
136  }
137  m_queue.pop();
138  }
139  }
140 
141  m_queue.push(t);
142  if (m_workers_waiting > 0) {
143  // Just wake one worker, there is only one new task.
144  m_wcond.notify_one();
145  } else {
146  m_nowake++;
147  }
148 
149  return true;
150  }
151 
168  bool waitIdle() {
169  std::unique_lock<std::mutex> lock(m_mutex);
170  if (!ok()) {
171  LOGERR("WorkQueue::waitIdle:" << m_name << ": not ok\n");
172  return false;
173  }
174 
175  // We're done when the queue is empty AND all workers are back
176  // waiting for a task.
177  while (ok() && (m_queue.size() > 0 ||
178  m_workers_waiting != m_worker_threads.size())) {
179  m_clients_waiting++;
180  m_ccond.wait(lock);
181  m_clients_waiting--;
182  }
183 
184  return ok();
185  }
186 
193  std::unique_lock<std::mutex> lock(m_mutex);
194  LOGDEB("setTerminateAndWait:" << m_name << "\n");
195 
196  if (m_worker_threads.empty()) {
197  // Already called ?
198  return (void*)0;
199  }
200 
201  // Wait for all worker threads to have called workerExit()
202  m_ok = false;
203  while (m_workers_exited < m_worker_threads.size()) {
204  m_wcond.notify_all();
205  m_clients_waiting++;
206  m_ccond.wait(lock);
207  m_clients_waiting--;
208  }
209 
210  LOGDEB("" << m_name << ": tasks " << m_tottasks << " nowakes " <<
211  m_nowake << " wsleeps " << m_workersleeps << " csleeps " <<
212  m_clientsleeps << "\n");
213  // Perform the thread joins and compute overall status
214  // Workers return (void*)1 if ok
215  void *statusall = (void*)1;
216  while (!m_worker_threads.empty()) {
217 #if HAVE_STD_FUTURE
218  void *status = m_worker_threads.front().res.get();
219 #else
220  void *status = (void*) 1;
221 #endif
222  m_worker_threads.front().thr.join();
223  if (status == (void *)0) {
224  statusall = status;
225  }
226  m_worker_threads.pop_front();
227  }
228 
229  // Reset to start state.
230  m_workers_exited = m_clients_waiting = m_workers_waiting =
231  m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
232  m_ok = true;
233 
234  LOGDEB("setTerminateAndWait:" << m_name << " done\n");
235  return statusall;
236  }
237 
243  bool take(T* tp, size_t *szp = 0) {
244  std::unique_lock<std::mutex> lock(m_mutex);
245  if (!ok()) {
246  LOGDEB("WorkQueue::take:" << m_name << ": not ok\n");
247  return false;
248  }
249 
250  while (ok() && m_queue.size() < m_low) {
251  m_workersleeps++;
252  m_workers_waiting++;
253  if (m_queue.empty()) {
254  m_ccond.notify_all();
255  }
256  m_wcond.wait(lock);
257  if (!ok()) {
258  // !ok is a normal condition when shutting down
259  m_workers_waiting--;
260  return false;
261  }
262  m_workers_waiting--;
263  }
264 
265  m_tottasks++;
266  *tp = m_queue.front();
267  if (szp) {
268  *szp = m_queue.size();
269  }
270  m_queue.pop();
271  if (m_clients_waiting > 0) {
272  // No reason to wake up more than one client thread
273  m_ccond.notify_one();
274  } else {
275  m_nowake++;
276  }
277  return true;
278  }
279 
280  bool waitminsz(size_t sz) {
281  std::unique_lock<std::mutex> lock(m_mutex);
282  if (!ok()) {
283  return false;
284  }
285 
286  while (ok() && m_queue.size() < sz) {
287  m_workersleeps++;
288  m_workers_waiting++;
289  if (m_queue.empty()) {
290  m_ccond.notify_all();
291  }
292  m_wcond.wait(lock);
293  if (!ok()) {
294  m_workers_waiting--;
295  return false;
296  }
297  m_workers_waiting--;
298  }
299  return true;
300  }
301 
310  void workerExit() {
311  LOGDEB("workerExit:" << m_name << "\n");
312  std::unique_lock<std::mutex> lock(m_mutex);
313  m_workers_exited++;
314  m_ok = false;
315  m_ccond.notify_all();
316  }
317 
318  size_t qsize() {
319  std::unique_lock<std::mutex> lock(m_mutex);
320  return m_queue.size();
321  }
322 
323 private:
324  bool ok() {
325  bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
326  if (!isok) {
327  LOGDEB("WorkQueue:ok:" << m_name << ": not ok m_ok " << m_ok <<
328  " m_workers_exited " << m_workers_exited <<
329  " m_worker_threads size " << m_worker_threads.size() <<
330  "\n");
331  }
332  return isok;
333  }
334 
335  struct Worker {
336  std::thread thr;
337 #if HAVE_STD_FUTURE
338  std::future<void *> res;
339 #endif
340  };
341 
342  void (*m_taskfreefunc)(T&){nullptr};
343  // Configuration
344  std::string m_name;
345  size_t m_high;
346  size_t m_low;
347 
348  // Worker threads having called exit. Used to decide when we're done
349  unsigned int m_workers_exited;
350  // Status
351  bool m_ok;
352 
353  // Our threads.
354  std::list<Worker> m_worker_threads;
355 
356  // Jobs input queue
357  std::queue<T> m_queue;
358 
359  // Synchronization
360  std::condition_variable m_ccond;
361  std::condition_variable m_wcond;
362  std::mutex m_mutex;
363 
364  // Client/Worker threads currently waiting for a job
365  unsigned int m_clients_waiting;
366  unsigned int m_workers_waiting;
367 
368  // Statistics
369  unsigned int m_tottasks;
370  unsigned int m_nowake;
371  unsigned int m_workersleeps;
372  unsigned int m_clientsleeps;
373 };
374 
375 #endif /* _WORKQUEUE_H_INCLUDED_ */
376 
bool put(T t, bool flushprevious=false)
Add item to work queue, called from client.
Definition: workqueue.h:113
void workerExit()
Advertise exit and abort queue.
Definition: workqueue.h:310
A WorkQueue manages the synchronisation around a queue of work items, where a number of client thread...
Definition: workqueue.h:54
bool take(T *tp, size_t *szp=0)
Take task from queue.
Definition: workqueue.h:243
void setTaskFreeFunc(void(*func)(T &))
Task deleter. If put() is called with the flush option, and the tasks allocate memory, you need to set this function, which will be called on each task popped from the queue.
Definition: workqueue.h:81
bool waitIdle()
Wait until the queue is inactive.
Definition: workqueue.h:168
void * setTerminateAndWait()
Tell the workers to exit, and wait for them.
Definition: workqueue.h:192
WorkQueue(const std::string &name, size_t hi=0, size_t lo=1)
Create a WorkQueue.
Definition: workqueue.h:63
bool start(int nworkers, void *(workproc)(void *), void *arg)
Start the worker threads.
Definition: workqueue.h:93