#ifndef BLOCKING_QUEUE_H #define BLOCKING_QUEUE_H //#include //#include #if defined(WIN32) || defined(_WIN64) #include #endif #include #include #include #include #include using namespace std; template class BlockingQueue { private: BlockingQueue(const BlockingQueue& rhs); BlockingQueue& operator =(const BlockingQueue& rhs); mutable std::mutex _mutex; std::condition_variable _condvar; typedef struct _dq { size_t bytes; uint32_t id; T t; }DQ; deque _queue; size_t bytes_; bool isShutDown; T tRet; public: BlockingQueue() : _mutex() , _condvar() , _queue() , isShutDown(false), bytes_(0) { } ~BlockingQueue() { ShutDown(); std::cout << "blocking queue release" << std::endl; } void Clear() { lock_guard lock(_mutex); _condvar.notify_all(); _queue.clear(); bytes_ = 0; } void ShutDown() { isShutDown = true; _condvar.notify_all(); _queue.clear(); bytes_ = 0; } bool IsShutDown() { return isShutDown; } void Put(const T task, size_t bytes, uint32_t id = -1) { lock_guard lock(_mutex); if (!isShutDown) { { DQ dq = { bytes, id, task }; _queue.push_back(dq); bytes_ += bytes; } _condvar.notify_all(); } } T Take(uint32_t* id = nullptr) { unique_lock lock(_mutex); if (_queue.size() <= 0) _condvar.wait(lock); if (isShutDown || _queue.empty()) { return tRet; } DQ front(_queue.front()); _queue.pop_front(); bytes_ -= front.bytes; if (id) *id = front.id; return front.t; } T Front(uint32_t* id = nullptr) { unique_lock lock(_mutex); if (_queue.size() <= 0) _condvar.wait(lock); if (isShutDown || _queue.empty()) { return tRet; } DQ front(_queue.front()); return front.t; } size_t Size(size_t* bytes = nullptr) const { lock_guard lock(_mutex); if (bytes) *bytes = bytes_; return _queue.size(); } }; #endif