18#ifndef _WORKQUEUE_H_INCLUDED_
19#define _WORKQUEUE_H_INCLUDED_
27#include <condition_variable>
30#include MDU_INCLUDE_LOG
35using namespace std::chrono_literals;
63 WorkQueue(
const std::string& name,
size_t hi = 0,
size_t lo = 1)
64 : m_name(name), m_high(hi), m_low(lo)
69 if (!m_worker_threads.empty()) {
83 m_taskfreefunc = func;
89 m_openforbusiness =
false;
100 bool start(
int nworkers,
void *(workproc)(
void *),
void *arg) {
101 std::unique_lock<std::mutex> lock(m_mutex);
102 for (
int i = 0; i < nworkers; i++) {
104 w.thr = std::thread(workproc, arg);
105 m_worker_threads.push_back(std::move(w));
114 bool put(T t,
bool flushprevious =
false) {
115 std::unique_lock<std::mutex> lock(m_mutex);
116 if (!ok() || !m_openforbusiness) {
117 LOGERR(
"WorkQueue::put: " << m_name <<
": ok: " << ok() <<
" openforbusiness " <<
118 m_openforbusiness <<
"\n");
121 LOGDEB2(
"WorkQueue::put: " << m_name <<
"\n");
123 while (ok() && m_high > 0 && m_queue.size() >= m_high) {
135 while (!m_queue.empty()) {
136 if (m_taskfreefunc) {
137 T& d = m_queue.front();
145 if (m_workers_waiting > 0) {
147 m_wcond.notify_one();
172 std::unique_lock<std::mutex> lock(m_mutex);
176 while (((m_queue.size() > 0 && m_workers_exited < m_worker_threads.size()) ||
177 (m_workers_waiting + m_workers_exited) < m_worker_threads.size())) {
178 LOGDEB0(
"waitIdle: " << m_name <<
" qsz " << m_queue.size() <<
179 " wwaiting " << m_workers_waiting <<
" wexit " << m_workers_exited <<
" nthr " <<
180 m_worker_threads.size() <<
"\n");
195 std::unique_lock<std::mutex> lock(m_mutex);
196 LOGDEB(
"setTerminateAndWait:" << m_name <<
"\n");
198 if (m_worker_threads.empty()) {
205 while (m_workers_exited < m_worker_threads.size()) {
206 m_wcond.notify_all();
212 LOGDEB(m_name <<
": tasks " << m_tottasks <<
" nowakes " << m_nowake <<
" wsleeps " <<
213 m_workersleeps <<
" csleeps " << m_clientsleeps <<
"\n");
216 void *statusall = (
void*)1;
217 while (!m_worker_threads.empty()) {
218 void *status = (
void*) 1;
219 m_worker_threads.front().thr.join();
220 if (status == (
void *)0) {
223 m_worker_threads.pop_front();
227 m_workers_exited = m_clients_waiting = m_workers_waiting =
228 m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
231 LOGDEB(
"setTerminateAndWait:" << m_name <<
" done\n");
240 bool take(T* tp,
size_t *szp =
nullptr, std::chrono::duration<double> waitdur = {-1ms}) {
241 std::unique_lock<std::mutex> lock(m_mutex);
243 LOGDEB(
"WorkQueue::take:" << m_name <<
": not ok\n");
247 while (ok() && m_queue.size() < m_low) {
250 if (m_queue.empty()) {
251 m_ccond.notify_all();
255 }
else if (m_wcond.wait_for(lock, waitdur) == std::cv_status::timeout) {
270 *tp = m_queue.front();
272 *szp = m_queue.size();
275 if (m_clients_waiting > 0) {
277 m_ccond.notify_one();
284 bool waitminsz(
size_t sz) {
285 std::unique_lock<std::mutex> lock(m_mutex);
290 while (ok() && m_queue.size() < sz) {
293 if (m_queue.empty()) {
294 m_ccond.notify_all();
315 LOGDEB(
"workerExit:" << m_name <<
"\n");
316 std::unique_lock<std::mutex> lock(m_mutex);
319 m_ccond.notify_all();
323 std::unique_lock<std::mutex> lock(m_mutex);
324 return m_queue.size();
329 bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
331 LOGDEB(
"WorkQueue:ok:" << m_name <<
": not ok m_ok " << m_ok <<
332 " m_workers_exited " << m_workers_exited <<
333 " m_worker_threads size " << m_worker_threads.size() <<
343 void (*m_taskfreefunc)(T&){
nullptr};
350 unsigned int m_workers_exited{0};
355 bool m_openforbusiness{
true};
358 std::list<Worker> m_worker_threads;
361 std::queue<T> m_queue;
364 std::condition_variable m_ccond;
365 std::condition_variable m_wcond;
369 unsigned int m_clients_waiting{0};
370 unsigned int m_workers_waiting{0};
373 unsigned int m_tottasks{0};
374 unsigned int m_nowake{0};
375 unsigned int m_workersleeps{0};
376 unsigned int m_clientsleeps{0};
A WorkQueue manages the synchronisation around a queue of work items, where a number of client thread...
Definition workqueue.h:54
void closeShop()
Forbid inputting new tasks.
Definition workqueue.h:88
bool start(int nworkers, void *(workproc)(void *), void *arg)
Start the worker threads.
Definition workqueue.h:100
bool waitIdle()
Wait until the queue is inactive.
Definition workqueue.h:171
bool put(T t, bool flushprevious=false)
Add item to work queue, called from client.
Definition workqueue.h:114
bool take(T *tp, size_t *szp=nullptr, std::chrono::duration< double > waitdur={-1ms})
Take task from queue.
Definition workqueue.h:240
void workerExit()
Advertise exit and abort queue.
Definition workqueue.h:314
void setTaskFreeFunc(void(*func)(T &))
Task deleter If put() is called with the flush option, and the tasks allocate memory,...
Definition workqueue.h:82
void * setTerminateAndWait()
Tell the workers to exit, and wait for them.
Definition workqueue.h:194
WorkQueue(const std::string &name, size_t hi=0, size_t lo=1)
Create a WorkQueue.
Definition workqueue.h:63