18 #ifndef _WORKQUEUE_H_INCLUDED_ 19 #define _WORKQUEUE_H_INCLUDED_ 29 #include <condition_variable> 31 #ifdef MDU_INCLUDE_LOG 32 #include MDU_INCLUDE_LOG 63 WorkQueue(
const std::string& name,
size_t hi = 0,
size_t lo = 1)
64 : m_name(name), m_high(hi), m_low(lo), m_workers_exited(0),
65 m_ok(true), m_clients_waiting(0), m_workers_waiting(0),
66 m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0) {
70 if (!m_worker_threads.empty()) {
82 m_taskfreefunc = func;
93 bool start(
int nworkers,
void *(workproc)(
void *),
void *arg) {
94 std::unique_lock<std::mutex> lock(m_mutex);
95 for (
int i = 0; i < nworkers; i++) {
98 std::packaged_task<void *(void *)> task(workproc);
99 w.res = task.get_future();
100 w.thr = std::thread(std::move(task), arg);
102 w.thr = std::thread(workproc, arg);
104 m_worker_threads.push_back(std::move(w));
113 bool put(T t,
bool flushprevious =
false) {
114 std::unique_lock<std::mutex> lock(m_mutex);
116 LOGERR(
"WorkQueue::put:" << m_name <<
": !ok\n");
120 while (ok() && m_high > 0 && m_queue.size() >= m_high) {
132 while (!m_queue.empty()) {
133 if (m_taskfreefunc) {
134 T& d = m_queue.front();
142 if (m_workers_waiting > 0) {
144 m_wcond.notify_one();
169 std::unique_lock<std::mutex> lock(m_mutex);
171 LOGERR(
"WorkQueue::waitIdle:" << m_name <<
": not ok\n");
177 while (ok() && (m_queue.size() > 0 ||
178 m_workers_waiting != m_worker_threads.size())) {
193 std::unique_lock<std::mutex> lock(m_mutex);
194 LOGDEB(
"setTerminateAndWait:" << m_name <<
"\n");
196 if (m_worker_threads.empty()) {
203 while (m_workers_exited < m_worker_threads.size()) {
204 m_wcond.notify_all();
210 LOGDEB(
"" << m_name <<
": tasks " << m_tottasks <<
" nowakes " <<
211 m_nowake <<
" wsleeps " << m_workersleeps <<
" csleeps " <<
212 m_clientsleeps <<
"\n");
215 void *statusall = (
void*)1;
216 while (!m_worker_threads.empty()) {
218 void *status = m_worker_threads.front().res.get();
220 void *status = (
void*) 1;
222 m_worker_threads.front().thr.join();
223 if (status == (
void *)0) {
226 m_worker_threads.pop_front();
230 m_workers_exited = m_clients_waiting = m_workers_waiting =
231 m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
234 LOGDEB(
"setTerminateAndWait:" << m_name <<
" done\n");
243 bool take(T* tp,
size_t *szp = 0) {
244 std::unique_lock<std::mutex> lock(m_mutex);
246 LOGDEB(
"WorkQueue::take:" << m_name <<
": not ok\n");
250 while (ok() && m_queue.size() < m_low) {
253 if (m_queue.empty()) {
254 m_ccond.notify_all();
266 *tp = m_queue.front();
268 *szp = m_queue.size();
271 if (m_clients_waiting > 0) {
273 m_ccond.notify_one();
280 bool waitminsz(
size_t sz) {
281 std::unique_lock<std::mutex> lock(m_mutex);
286 while (ok() && m_queue.size() < sz) {
289 if (m_queue.empty()) {
290 m_ccond.notify_all();
311 LOGDEB(
"workerExit:" << m_name <<
"\n");
312 std::unique_lock<std::mutex> lock(m_mutex);
315 m_ccond.notify_all();
319 std::unique_lock<std::mutex> lock(m_mutex);
320 return m_queue.size();
325 bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
327 LOGDEB(
"WorkQueue:ok:" << m_name <<
": not ok m_ok " << m_ok <<
328 " m_workers_exited " << m_workers_exited <<
329 " m_worker_threads size " << m_worker_threads.size() <<
338 std::future<void *> res;
342 void (*m_taskfreefunc)(T&){
nullptr};
349 unsigned int m_workers_exited;
354 std::list<Worker> m_worker_threads;
357 std::queue<T> m_queue;
360 std::condition_variable m_ccond;
361 std::condition_variable m_wcond;
365 unsigned int m_clients_waiting;
366 unsigned int m_workers_waiting;
369 unsigned int m_tottasks;
370 unsigned int m_nowake;
371 unsigned int m_workersleeps;
372 unsigned int m_clientsleeps;
bool put(T t, bool flushprevious=false)
Add item to work queue, called from client.
Definition: workqueue.h:113
void workerExit()
Advertise exit and abort queue.
Definition: workqueue.h:310
A WorkQueue manages the synchronisation around a queue of work items, where a number of client thread...
Definition: workqueue.h:54
bool take(T *tp, size_t *szp=0)
Take task from queue.
Definition: workqueue.h:243
void setTaskFreeFunc(void(*func)(T &))
Task deleter. If put() is called with the flush option, and the tasks allocate memory, you need to set this function, which will be called on each task popped from the queue.
Definition: workqueue.h:81
bool waitIdle()
Wait until the queue is inactive.
Definition: workqueue.h:168
void * setTerminateAndWait()
Tell the workers to exit, and wait for them.
Definition: workqueue.h:192
WorkQueue(const std::string &name, size_t hi=0, size_t lo=1)
Create a WorkQueue.
Definition: workqueue.h:63
bool start(int nworkers, void *(workproc)(void *), void *arg)
Start the worker threads.
Definition: workqueue.h:93