
.. _program_listing_file_include_depthai_utility_LockingQueue.hpp:

Program Listing for File LockingQueue.hpp
=========================================

|exhale_lsh| :ref:`Return to documentation for file <file_include_depthai_utility_LockingQueue.hpp>` (``include/depthai/utility/LockingQueue.hpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   #pragma once
   #include <chrono>
   #include <condition_variable>
   #include <functional>
   #include <limits>
   #include <mutex>
   #include <queue>

   namespace dai {

   /**
    * FIFO queue of T whose operations are serialized by an internal mutex.
    *
    * The queue has a maximum size and a blocking flag:
    *  - blocking: push() waits until there is room (or destruct() is called);
    *  - non-blocking: push() discards the oldest elements to make room.
    *
    * Two condition variables are used: `signalPush` is notified after an element
    * was added, `signalPop` after elements were removed.
    */
   template <typename T>
   class LockingQueue {
      public:
       /// Creates a blocking queue with maxSize = std::numeric_limits<unsigned>::max().
       LockingQueue() = default;

       /// Creates a queue with the given size limit and blocking behavior.
       explicit LockingQueue(unsigned maxSize, bool blocking = true) : maxSize(maxSize), blocking(blocking) {}

       /// Sets the maximum size. Existing elements are not trimmed here; push() handles a reduced limit.
       void setMaxSize(unsigned sz) {
           // Lock first
           std::unique_lock<std::mutex> lock(guard);
           maxSize = sz;
       }

       /// Sets whether push() blocks when the queue is full.
       void setBlocking(bool bl) {
           // Lock first
           std::unique_lock<std::mutex> lock(guard);
           blocking = bl;
       }

       /// Returns the current maximum size.
       unsigned getMaxSize() const {
           // Lock first
           std::unique_lock<std::mutex> lock(guard);
           return maxSize;
       }

       /// Returns whether push() blocks when the queue is full.
       bool getBlocking() const {
           // Lock first
           std::unique_lock<std::mutex> lock(guard);
           return blocking;
       }

       /**
        * Marks the queue as destructed and wakes every thread waiting on it.
        *
        * Afterwards the waiting operations (waitAndPop, tryWaitAndPop, waitAndConsumeAll,
        * blocking push / tryWaitAndPush) return false. Calling it again has no effect.
        */
       void destruct() {
           std::unique_lock<std::mutex> lock(guard);
           if(!destructed) {
               signalPop.notify_all();
               signalPush.notify_all();
               destructed = true;
           }
       }
       ~LockingQueue() = default;

       /**
        * Waits up to `timeout` for the queue to become non-empty, then drains it.
        *
        * `callback` is invoked for each element in FIFO order while the internal
        * mutex is held; each element is removed after its callback returns.
        *
        * @return false on timeout or if the queue was destructed, true otherwise.
        */
       template <typename Rep, typename Period>
       bool waitAndConsumeAll(std::function<void(T&)> callback, std::chrono::duration<Rep, Period> timeout) {
           {
               std::unique_lock<std::mutex> lock(guard);

               // First checks predicate, then waits
               bool pred = signalPush.wait_for(lock, timeout, [this]() { return !queue.empty() || destructed; });
               if(!pred) return false;
               if(destructed) return false;

               // Continue here if and only if queue has any elements
               while(!queue.empty()) {
                   callback(queue.front());
                   queue.pop();
               }
           }

           signalPop.notify_all();
           return true;
       }

       /**
        * Waits (without timeout) for the queue to become non-empty, then drains it.
        *
        * `callback` is invoked for each element in FIFO order while the internal
        * mutex is held.
        *
        * @return false if the queue is empty after waking or was destructed, true otherwise.
        */
       bool waitAndConsumeAll(std::function<void(T&)> callback) {
           {
               std::unique_lock<std::mutex> lock(guard);

               signalPush.wait(lock, [this]() { return !queue.empty() || destructed; });
               if(queue.empty()) return false;
               if(destructed) return false;

               while(!queue.empty()) {
                   callback(queue.front());
                   queue.pop();
               }
           }

           signalPop.notify_all();
           return true;
       }

       /**
        * Drains the queue without waiting.
        *
        * @return false if the queue was empty (callback is never called), true otherwise.
        */
       bool consumeAll(std::function<void(T&)> callback) {
           {
               std::lock_guard<std::mutex> lock(guard);

               if(queue.empty()) return false;

               while(!queue.empty()) {
                   callback(queue.front());
                   queue.pop();
               }
           }

           signalPop.notify_all();
           return true;
       }

       /**
        * Appends a copy of `data`.
        *
        * - maxSize == 0: the queue is emptied, `data` is discarded and true is returned.
        * - non-blocking: oldest elements are dropped until there is room.
        * - blocking: waits until there is room or the queue is destructed.
        *
        * @return false only in blocking mode when the queue was destructed.
        */
       bool push(T const& data) {
           {
               std::unique_lock<std::mutex> lock(guard);
               if(maxSize == 0) {
                   // necessary if maxSize was changed
                   while(!queue.empty()) {
                       queue.pop();
                   }
                   return true;
               }
               if(!blocking) {
                   // if non blocking, remove as many oldest elements as necessary, so next one will fit
                   // necessary if maxSize was changed
                   while(queue.size() >= maxSize) {
                       queue.pop();
                   }
               } else {
                   signalPop.wait(lock, [this]() { return queue.size() < maxSize || destructed; });
                   if(destructed) return false;
               }

               queue.push(data);
           }
           signalPush.notify_all();
           return true;
       }

       /**
        * Same as push(), but in blocking mode waits at most `timeout` for room.
        *
        * @return false in blocking mode on timeout or if the queue was destructed, true otherwise.
        */
       template <typename Rep, typename Period>
       bool tryWaitAndPush(T const& data, std::chrono::duration<Rep, Period> timeout) {
           {
               std::unique_lock<std::mutex> lock(guard);
               if(maxSize == 0) {
                   // necessary if maxSize was changed
                   while(!queue.empty()) {
                       queue.pop();
                   }
                   return true;
               }
               if(!blocking) {
                   // if non blocking, remove as many oldest elements as necessary, so next one will fit
                   // necessary if maxSize was changed
                   while(queue.size() >= maxSize) {
                       queue.pop();
                   }
               } else {
                   // First checks predicate, then waits
                   bool pred = signalPop.wait_for(lock, timeout, [this]() { return queue.size() < maxSize || destructed; });
                   if(!pred) return false;
                   if(destructed) return false;
               }

               queue.push(data);
           }
           signalPush.notify_all();
           return true;
       }

       /// Returns true if the queue currently holds no elements.
       bool empty() const {
           std::lock_guard<std::mutex> lock(guard);
           return queue.empty();
       }

       /**
        * Copies the oldest element into `value` without removing it.
        *
        * @return false (leaving `value` untouched) if the queue is empty.
        */
       bool front(T& value) {
           std::unique_lock<std::mutex> lock(guard);
           if(queue.empty()) {
               return false;
           }

           value = queue.front();
           return true;
       }

       /**
        * Moves the oldest element into `value` and removes it, without waiting.
        *
        * @return false (leaving `value` untouched) if the queue is empty.
        */
       bool tryPop(T& value) {
           {
               std::lock_guard<std::mutex> lock(guard);
               if(queue.empty()) {
                   return false;
               }

               value = std::move(queue.front());
               queue.pop();
           }
           signalPop.notify_all();
           return true;
       }

       /**
        * Waits (without timeout) for an element, then moves it into `value` and removes it.
        *
        * @return false if the queue is empty after waking or was destructed, true otherwise.
        */
       bool waitAndPop(T& value) {
           {
               std::unique_lock<std::mutex> lock(guard);

               signalPush.wait(lock, [this]() { return (!queue.empty() || destructed); });
               if(queue.empty()) return false;
               if(destructed) return false;

               value = std::move(queue.front());
               queue.pop();
           }
           signalPop.notify_all();
           return true;
       }

       /**
        * Waits up to `timeout` for an element, then moves it into `value` and removes it.
        *
        * @return false on timeout or if the queue was destructed, true otherwise.
        */
       template <typename Rep, typename Period>
       bool tryWaitAndPop(T& value, std::chrono::duration<Rep, Period> timeout) {
           {
               std::unique_lock<std::mutex> lock(guard);

               // First checks predicate, then waits
               bool pred = signalPush.wait_for(lock, timeout, [this]() { return !queue.empty() || destructed; });
               if(!pred) return false;
               if(destructed) return false;

               value = std::move(queue.front());
               queue.pop();
           }
           signalPop.notify_all();
           return true;
       }

       /// Blocks until the queue is empty or destruct() has been called.
       void waitEmpty() {
           std::unique_lock<std::mutex> lock(guard);
           signalPop.wait(lock, [this]() { return queue.empty() || destructed; });
       }

      private:
       unsigned maxSize = std::numeric_limits<unsigned>::max();
       bool blocking = true;
       std::queue<T> queue;
       // mutable so that const accessors can lock it
       mutable std::mutex guard;
       bool destructed{false};
       // Notified after elements were removed
       std::condition_variable signalPop;
       // Notified after an element was added
       std::condition_variable signalPush;
   };

   }  // namespace dai