.. _program_listing_file_include_depthai_device_DataQueue.hpp:

Program Listing for File DataQueue.hpp
======================================

|exhale_lsh| :ref:`Return to documentation for file <file_include_depthai_device_DataQueue.hpp>` (``include/depthai/device/DataQueue.hpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   #pragma once

   // std
   #include <atomic>
   #include <chrono>
   #include <cstddef>
   #include <functional>
   #include <memory>
   #include <mutex>
   #include <stdexcept>
   #include <string>
   #include <thread>
   #include <unordered_map>
   #include <vector>

   // project
   #include "depthai/pipeline/datatype/ADatatype.hpp"
   #include "depthai/utility/LockingQueue.hpp"
   #include "depthai/xlink/XLinkConnection.hpp"

   // shared
   #include "depthai-shared/datatype/RawBuffer.hpp"
   #include "depthai-shared/xlink/XLinkConstants.hpp"

   namespace dai {

   /**
    * Receiving side of a named stream: messages are held in a LockingQueue of
    * std::shared_ptr<ADatatype> and handed out through the accessors below.
    *
    * Every inline accessor first checks `running` and throws
    * std::runtime_error(exceptionMessage) once it is false.
    *
    * NOTE(review): `readingThread` presumably fills the queue and invokes the registered
    * callbacks; the constructor and thread body are defined outside this header - confirm there.
    */
   class DataOutputQueue {
      public:
       /// Identifier type returned by addCallback() and accepted by removeCallback()
       using CallbackId = int;

      private:
       LockingQueue<std::shared_ptr<ADatatype>> queue;
       std::thread readingThread;
       std::atomic<bool> running{true};
       // Text of the std::runtime_error thrown by the accessors when `running` is false
       std::string exceptionMessage{""};
       const std::string name{""};
       std::mutex callbacksMtx;
       // NOTE(review): the first callback argument is a std::string, presumably the queue name - confirm
       std::unordered_map<CallbackId, std::function<void(std::string, std::shared_ptr<ADatatype>)>> callbacks;
       CallbackId uniqueCallbackId{0};

       // const std::chrono::milliseconds READ_TIMEOUT{500};

      public:
       // DataOutputQueue constructor
       DataOutputQueue(const std::shared_ptr<XLinkConnection> conn, const std::string& streamName, unsigned int maxSize = 16, bool blocking = true);
       ~DataOutputQueue();

       // The members below are only declared here; their definitions are not part of this header.
       bool isClosed() const;

       void close();

       void setBlocking(bool blocking);

       bool getBlocking() const;

       void setMaxSize(unsigned int maxSize);

       unsigned int getMaxSize() const;

       std::string getName() const;

       /// Registers a callback taking (std::string, message); returns the id to pass to removeCallback()
       CallbackId addCallback(std::function<void(std::string, std::shared_ptr<ADatatype>)>);

       /// Registers a callback taking only the message
       CallbackId addCallback(std::function<void(std::shared_ptr<ADatatype>)>);

       /// Registers a callback taking no arguments
       CallbackId addCallback(std::function<void()> callback);

       bool removeCallback(CallbackId callbackId);

       /**
        * Checks whether the message reported by queue.front() is of type T.
        *
        * @returns true only if queue.front() succeeds and the message can be dynamic_cast to T
        * @throws std::runtime_error if the queue is no longer running
        */
       template <class T>
       bool has() {
           if(!running) throw std::runtime_error(exceptionMessage.c_str());
           std::shared_ptr<ADatatype> val = nullptr;
           return queue.front(val) && dynamic_cast<T*>(val.get()) != nullptr;
       }

       /**
        * Checks whether the queue holds any message.
        *
        * @returns true if the queue is not empty
        * @throws std::runtime_error if the queue is no longer running
        */
       bool has() {
           if(!running) throw std::runtime_error(exceptionMessage.c_str());
           return !queue.empty();
       }

       /**
        * Pops one message via queue.tryPop() and casts it to T.
        *
        * @returns the message as T; nullptr if tryPop() fails or if the popped message is not a T
        *          (in the latter case the message has already been removed from the queue)
        * @throws std::runtime_error if the queue is no longer running
        */
       template <class T>
       std::shared_ptr<T> tryGet() {
           if(!running) throw std::runtime_error(exceptionMessage.c_str());
           std::shared_ptr<ADatatype> val = nullptr;
           if(!queue.tryPop(val)) return nullptr;
           return std::dynamic_pointer_cast<T>(val);
       }

       /// Same as tryGet<ADatatype>()
       std::shared_ptr<ADatatype> tryGet() {
           return tryGet<ADatatype>();
       }

       /**
        * Pops one message via queue.waitAndPop() and casts it to T.
        *
        * @returns the message as T, or nullptr if the popped message is not a T
        * @throws std::runtime_error if the queue is no longer running, or if waitAndPop() fails
        */
       template <class T>
       std::shared_ptr<T> get() {
           if(!running) throw std::runtime_error(exceptionMessage.c_str());
           std::shared_ptr<ADatatype> val = nullptr;
           if(!queue.waitAndPop(val)) {
               throw std::runtime_error(exceptionMessage.c_str());
           }
           return std::dynamic_pointer_cast<T>(val);
       }

       /// Same as get<ADatatype>()
       std::shared_ptr<ADatatype> get() {
           return get<ADatatype>();
       }

       /**
        * Fetches the message reported by queue.front() and casts it to T.
        *
        * @returns the message as T; nullptr if front() fails or the message is not a T
        * @throws std::runtime_error if the queue is no longer running
        */
       template <class T>
       std::shared_ptr<T> front() {
           if(!running) throw std::runtime_error(exceptionMessage.c_str());
           std::shared_ptr<ADatatype> val = nullptr;
           if(!queue.front(val)) return nullptr;
           return std::dynamic_pointer_cast<T>(val);
       }

       /// Same as front<ADatatype>()
       std::shared_ptr<ADatatype> front() {
           return front<ADatatype>();
       }

       /**
        * Pops one message via queue.tryWaitAndPop() with the given timeout and casts it to T.
        *
        * @param timeout duration forwarded to tryWaitAndPop()
        * @param[out] hasTimedout set to true when tryWaitAndPop() fails, false otherwise
        * @returns the message as T; nullptr when hasTimedout is set or the message is not a T
        * @throws std::runtime_error if the queue is no longer running
        */
       template <class T, typename Rep, typename Period>
       std::shared_ptr<T> get(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
           if(!running) throw std::runtime_error(exceptionMessage.c_str());
           std::shared_ptr<ADatatype> val = nullptr;
           if(!queue.tryWaitAndPop(val, timeout)) {
               hasTimedout = true;
               return nullptr;
           }
           hasTimedout = false;
           return std::dynamic_pointer_cast<T>(val);
       }

       /// Same as get<ADatatype>(timeout, hasTimedout)
       template <typename Rep, typename Period>
       std::shared_ptr<ADatatype> get(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
           return get<ADatatype>(timeout, hasTimedout);
       }

       /**
        * Drains the queue via queue.consumeAll(), casting each message to T.
        *
        * @returns one entry per consumed message; entries that are not a T are nullptr
        * @throws std::runtime_error if the queue is no longer running
        */
       template <class T>
       std::vector<std::shared_ptr<T>> tryGetAll() {
           if(!running) throw std::runtime_error(exceptionMessage.c_str());

           std::vector<std::shared_ptr<T>> messages;
           queue.consumeAll([&messages](std::shared_ptr<ADatatype>& msg) {
               // dynamic pointer cast may return nullptr
               // in which case that message in vector will be nullptr
               messages.push_back(std::dynamic_pointer_cast<T>(std::move(msg)));
           });

           return messages;
       }

       /// Same as tryGetAll<ADatatype>()
       std::vector<std::shared_ptr<ADatatype>> tryGetAll() {
           return tryGetAll<ADatatype>();
       }

       /**
        * Drains the queue via queue.waitAndConsumeAll(), casting each message to T.
        *
        * @returns one entry per consumed message; entries that are not a T are nullptr
        * @throws std::runtime_error if the queue is no longer running
        */
       template <class T>
       std::vector<std::shared_ptr<T>> getAll() {
           if(!running) throw std::runtime_error(exceptionMessage.c_str());

           std::vector<std::shared_ptr<T>> messages;
           queue.waitAndConsumeAll([&messages](std::shared_ptr<ADatatype>& msg) {
               // dynamic pointer cast may return nullptr
               // in which case that message in vector will be nullptr
               messages.push_back(std::dynamic_pointer_cast<T>(std::move(msg)));
           });

           return messages;
       }

       /// Same as getAll<ADatatype>()
       std::vector<std::shared_ptr<ADatatype>> getAll() {
           return getAll<ADatatype>();
       }

       /**
        * Drains the queue via queue.waitAndConsumeAll() with the given timeout, casting each message to T.
        *
        * @param timeout duration forwarded to waitAndConsumeAll()
        * @param[out] hasTimedout set to the negation of waitAndConsumeAll()'s return value
        * @returns one entry per consumed message; entries that are not a T are nullptr
        * @throws std::runtime_error if the queue is no longer running
        */
       template <class T, typename Rep, typename Period>
       std::vector<std::shared_ptr<T>> getAll(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
           if(!running) throw std::runtime_error(exceptionMessage.c_str());

           std::vector<std::shared_ptr<T>> messages;
           hasTimedout = !queue.waitAndConsumeAll(
               [&messages](std::shared_ptr<ADatatype>& msg) {
                   // dynamic pointer cast may return nullptr
                   // in which case that message in vector will be nullptr
                   messages.push_back(std::dynamic_pointer_cast<T>(std::move(msg)));
               },
               timeout);

           return messages;
       }

       /// Same as getAll<ADatatype>(timeout, hasTimedout)
       template <typename Rep, typename Period>
       std::vector<std::shared_ptr<ADatatype>> getAll(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
           return getAll<ADatatype>(timeout, hasTimedout);
       }
   };

   /**
    * Sending side of a named stream: outgoing messages are held in a LockingQueue of
    * std::shared_ptr<RawBuffer>.
    *
    * NOTE(review): `writingThread` presumably drains the queue; the constructor and thread body
    * are defined outside this header - confirm there.
    */
   class DataInputQueue {
       LockingQueue<std::shared_ptr<RawBuffer>> queue;
       std::thread writingThread;
       std::atomic<bool> running{true};
       std::string exceptionMessage;
       const std::string name;
       // Initialised to device::XLINK_USB_BUFFER_MAX_SIZE; adjustable through setMaxDataSize()
       std::atomic<std::size_t> maxDataSize{device::XLINK_USB_BUFFER_MAX_SIZE};

      public:
       DataInputQueue(const std::shared_ptr<XLinkConnection> conn,
                      const std::string& streamName,
                      unsigned int maxSize = 16,
                      bool blocking = true,
                      std::size_t maxDataSize = device::XLINK_USB_BUFFER_MAX_SIZE);
       ~DataInputQueue();

       // The members below are only declared here; their definitions are not part of this header.
       bool isClosed() const;

       void close();

       void setMaxDataSize(std::size_t maxSize);

       // NOTE(review): not const-qualified, unlike the other getters; left as declared because
       // the out-of-line definition is not visible here.
       std::size_t getMaxDataSize();

       void setBlocking(bool blocking);

       bool getBlocking() const;

       void setMaxSize(unsigned int maxSize);

       unsigned int getMaxSize() const;

       std::string getName() const;

       // send() overloads without a timeout return nothing
       void send(const std::shared_ptr<RawBuffer>& rawMsg);

       void send(const std::shared_ptr<ADatatype>& msg);

       void send(const ADatatype& msg);

       // send() overloads with a timeout return bool
       // NOTE(review): presumably false means the timeout elapsed before the message was queued - confirm
       bool send(const std::shared_ptr<RawBuffer>& rawMsg, std::chrono::milliseconds timeout);

       bool send(const std::shared_ptr<ADatatype>& msg, std::chrono::milliseconds timeout);

       bool send(const ADatatype& msg, std::chrono::milliseconds timeout);
   };

   }  // namespace dai