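// code_util/BlockingQueue.h
//
// Repository-viewer metadata captured together with this copy of the file:
//   134 lines, 2.0 KiB, language shown as "C"
//   (viewer links: Raw, Permalink, Normal View, History)
//
// Timestamp shown by the viewer: 2023-02-24 09:24:48 +00:00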
#ifndef BLOCKING_QUEUE_H
#define BLOCKING_QUEUE_H
//#include <boost/thread/mutex.hpp>
//#include <boost/thread/condition_variable.hpp>
#if defined(WIN32) || defined(_WIN64)
#include <Windows.h>
#endif
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <exception>
#include <iostream>
#include <mutex>
#include <utility>
using namespace std;
/// @brief Unbounded FIFO queue guarded by a mutex, with blocking consumers.
///
/// Each stored item carries a caller-supplied byte count and a 32-bit id.
/// The queue keeps a running total of the byte counts of the items it holds.
///
/// All public member functions lock the internal mutex, so they may be called
/// concurrently. The destructor is the exception: the object must not be
/// destroyed while another thread is still inside one of its member functions.
///
/// @tparam T Item type. Must be default-constructible, because Take() and
///           Front() return `T()` when they have no item to hand out.
template <typename T>
class BlockingQueue
{
public:
    BlockingQueue()
        : _mutex()
        , _condvar()
        , _queue()
        , bytes_(0)
        , wakeEpoch_(0)
        , isShutDown(false)
    {
    }

    // The queue owns a mutex and a condition variable; it is not copyable.
    BlockingQueue(const BlockingQueue& rhs) = delete;
    BlockingQueue& operator=(const BlockingQueue& rhs) = delete;

    /// Shuts the queue down (waking any blocked consumer) and logs the release.
    ~BlockingQueue()
    {
        ShutDown();
        std::cout << "blocking queue release" << std::endl;
    }

    /// @brief Drops every queued item and resets the byte total to zero.
    ///
    /// Consumers currently blocked in Take()/Front() are woken and return a
    /// value-initialised `T`. The queue stays usable afterwards.
    void Clear()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _queue.clear();
        bytes_ = 0;
        // Bump the epoch so that waiters can tell this wake-up apart from a
        // spurious one and return instead of going back to sleep.
        ++wakeEpoch_;
        _condvar.notify_all();
    }

    /// @brief Permanently stops the queue.
    ///
    /// Queued items are discarded, blocked consumers are released, later
    /// Put() calls are ignored and later Take()/Front() calls return `T()`
    /// immediately. Calling it more than once is harmless.
    void ShutDown()
    {
        // The state change must happen under the mutex; otherwise it races
        // with Put/Take/Front/Size/Clear, which touch the same members.
        std::lock_guard<std::mutex> lock(_mutex);
        isShutDown = true;
        _queue.clear();
        bytes_ = 0;
        _condvar.notify_all();
    }

    /// @return true once ShutDown() has been called.
    bool IsShutDown() const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return isShutDown;
    }

    /// @brief Appends an item to the back of the queue and wakes consumers.
    ///
    /// Does nothing if the queue has been shut down.
    ///
    /// @param task  Item to enqueue.
    /// @param bytes Size attributed to the item; added to the byte total.
    /// @param id    Tag stored with the item (defaults to 0xFFFFFFFF).
    void Put(T task, std::size_t bytes, std::uint32_t id = static_cast<std::uint32_t>(-1))
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (isShutDown)
            return;

        _queue.push_back(Entry{ bytes, id, std::move(task) });
        bytes_ += bytes;
        // notify_all rather than notify_one: a thread waiting in Front() does
        // not consume the item, so waking only one thread could leave a
        // Take() waiter asleep although an item is available.
        _condvar.notify_all();
    }

    /// @brief Removes and returns the oldest item, blocking while the queue is empty.
    ///
    /// @param id Optional out-parameter; receives the id stored with the item.
    ///           Left untouched when no item is returned.
    /// @return The oldest item, or `T()` if the queue is shut down or was
    ///         cleared while this call was waiting.
    T Take(std::uint32_t* id = nullptr)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        if (!WaitForItem(lock))
            return T();

        Entry& head = _queue.front();
        T value(std::move(head.t));
        if (id)
            *id = head.id;
        bytes_ -= head.bytes;
        _queue.pop_front();
        return value;
    }

    /// @brief Returns a copy of the oldest item without removing it,
    ///        blocking while the queue is empty.
    ///
    /// @param id Optional out-parameter; receives the id stored with the item.
    ///           Left untouched when no item is returned.
    /// @return A copy of the oldest item, or `T()` if the queue is shut down
    ///         or was cleared while this call was waiting.
    T Front(std::uint32_t* id = nullptr)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        if (!WaitForItem(lock))
            return T();

        const Entry& head = _queue.front();
        if (id)
            *id = head.id;
        return head.t;
    }

    /// @brief Reports how many items are queued.
    ///
    /// @param bytes Optional out-parameter; receives the sum of the byte
    ///              counts of the queued items.
    /// @return Number of queued items.
    std::size_t Size(std::size_t* bytes = nullptr) const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (bytes)
            *bytes = bytes_;
        return _queue.size();
    }

private:
    // One queued item together with its bookkeeping data.
    struct Entry
    {
        std::size_t bytes;   // size attributed to the item by the producer
        std::uint32_t id;    // caller-defined tag
        T t;                 // the item itself
    };

    /// Blocks (with @p lock held on _mutex) until an item is available, the
    /// queue is shut down, or Clear() is called.
    ///
    /// Waiting on a predicate means a spurious wake-up, or losing the race
    /// for an item to another consumer, puts the thread back to sleep
    /// instead of making it return early. Checking the shut-down flag inside
    /// the predicate means a call made after ShutDown() never blocks.
    ///
    /// @return true if the front of the queue may be read.
    bool WaitForItem(std::unique_lock<std::mutex>& lock)
    {
        const std::size_t epochAtEntry = wakeEpoch_;
        _condvar.wait(lock, [this, epochAtEntry] {
            return isShutDown || !_queue.empty() || wakeEpoch_ != epochAtEntry;
        });
        return !isShutDown && !_queue.empty();
    }

    mutable std::mutex _mutex;          // guards every member below
    std::condition_variable _condvar;   // signalled by Put, Clear and ShutDown
    std::deque<Entry> _queue;
    std::size_t bytes_;                 // sum of Entry::bytes over _queue
    std::size_t wakeEpoch_;             // incremented by every Clear()
    bool isShutDown;
};
#endif