Program Listing for File CBorStream.h

Return to documentation for file (src/desert_classes/CBorStream.h)

/****************************************************************************
 * Copyright (C) 2024 Davide Costa                                          *
 *                                                                          *
 * This file is part of RMW desert.                                         *
 *                                                                          *
 *   RMW desert is free software: you can redistribute it and/or modify it  *
 *   under the terms of the GNU General Public License as published by the  *
 *   Free Software Foundation, either version 3 of the License, or any      *
 *   later version.                                                         *
 *                                                                          *
 *   RMW desert is distributed in the hope that it will be useful,          *
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of         *
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the          *
 *   GNU General Public License for more details.                           *
 *                                                                          *
 *   You should have received a copy of the GNU General Public License      *
 *   along with RMW desert.  If not, see <http://www.gnu.org/licenses/>.    *
 ****************************************************************************/

#ifndef CBORSTREAM_H_
#define CBORSTREAM_H_

#include "TcpDaemon.h"
#include "TopicsConfig.h"

#include <map>
#include <queue>
#include <utility>
#include <vector>
#include <string>
#include <locale>
#include <codecvt>
#include <cstdint>
#include <cstdio>
#include <mutex>

#include "cbor/encoder.h"
#include "cbor/ieee754.h"
#include "cbor/decoder.h"
#include "cbor/parser.h"
#include "cbor/helper.h"

#include "half.hpp"

#define PUBLISHER_TYPE  0
#define SUBSCRIBER_TYPE 1
#define CLIENT_TYPE     2
#define SERVICE_TYPE    3

#define MAX_BUFFER_CAPACITY 100

template <typename T, int MaxLen, typename Container=std::deque<T>>
class CircularQueue : public std::queue<T, Container> {
  public:
    void push(const T& value)
    {
        if (this->size() == MaxLen)
        {
           this->c.pop_front();
        }
        std::queue<T, Container>::push(value);
    }
};

namespace cbor
{

class TxStream
{
  public:
    TxStream(uint8_t stream_type, std::string stream_name, uint8_t stream_identifier);

    void start_transmission(uint64_t sequence_id);
    void start_transmission();
    void end_transmission();

    TxStream & operator<<(const uint64_t n);
    TxStream & operator<<(const uint32_t n);
    TxStream & operator<<(const uint16_t n);
    TxStream & operator<<(const uint8_t n);
    TxStream & operator<<(const int64_t n);
    TxStream & operator<<(const int32_t n);
    TxStream & operator<<(const int16_t n);
    TxStream & operator<<(const int8_t n);
    TxStream & operator<<(const char n);
    TxStream & operator<<(const float f);
    TxStream & operator<<(const double d);
    TxStream & operator<<(const std::string s);
    TxStream & operator<<(const std::u16string s);
    TxStream & operator<<(const bool b);

    template<typename T>
    inline TxStream & operator<<(const std::vector<T> v)
    {
      *this << static_cast<const uint32_t>(v.size());
      return serialize_sequence(v.data(), v.size());
    }

    TxStream & operator<<(const std::vector<bool> v);

    template<typename T>
    inline TxStream & serialize_sequence(const T * items, size_t size)
    {
      for (size_t i = 0; i < size; ++i)
      {
        *this << items[i];
      }
      return *this;
    }

  private:
    uint8_t _stream_type;
    std::string _stream_name;
    uint8_t _stream_identifier;

    bool _overflow;
    uint8_t *  _packet;
    cbor_writer_t *  _writer;

    void new_packet();
    void handle_overrun(cbor_error_t result);

    std::string toUTF8(const std::u16string source);

};

class RxStream
{
  public:
    RxStream(uint8_t stream_type, std::string stream_name, uint8_t stream_identifier);

    ~RxStream();

    bool data_available(int64_t sequence_id = 0);

    void clear_buffer();

    RxStream & operator>>(uint64_t & n);
    RxStream & operator>>(uint32_t & n);
    RxStream & operator>>(uint16_t & n);
    RxStream & operator>>(uint8_t & n);
    RxStream & operator>>(int64_t & n);
    RxStream & operator>>(int32_t & n);
    RxStream & operator>>(int16_t & n);
    RxStream & operator>>(int8_t & n);

    template<typename T>
    RxStream & deserialize_integer(T & n);

    RxStream & operator>>(char & n);
    RxStream & operator>>(float & f);
    RxStream & operator>>(double & d);
    RxStream & operator>>(std::string & s);
    RxStream & operator>>(std::u16string & s);
    RxStream & operator>>(bool & b);

    template<typename T>
    inline RxStream & operator>>(std::vector<T> & v)
    {
      uint32_t size;
      *this >> size;
      v.resize(size);

      return deserialize_sequence(v.data(), size);
    }

    RxStream & operator>>(std::vector<bool> & v);

    template<typename T>
    inline RxStream & deserialize_sequence(T * items, size_t size)
    {
      for (size_t i = 0; i < size; ++i)
      {
        *this >> items[i];
      }
      return *this;
    }


    uint8_t get_type() const;
    std::string get_name() const;
    uint8_t get_identifier() const;

    void push_packet(std::vector<std::pair<void *, int>> packet);

    static void interpret_packets();

  private:
    uint8_t _stream_type;
    std::string _stream_name;
    uint8_t _stream_identifier;

    size_t _buffered_iterator;

    // packets: <packet <field, field_type>>
    std::vector<std::pair<void *, int>> _buffered_packet;
    CircularQueue<std::vector<std::pair<void *, int>>, MAX_BUFFER_CAPACITY> _received_packets;

    static const std::map<int, int> _stream_type_match_map;
    static std::vector<RxStream *> _listening_streams;

    union _cbor_value {
    int8_t i8;
    int16_t i16;
    int32_t i32;
    int64_t i64;
    float f32;
    double f64;
    uint8_t *bin;
    char *str;
    uint8_t str_copy[128];
    };

    static std::mutex _rx_mutex;

    static std::pair<void *, int> interpret_field(cbor_item_t * items, size_t i, union _cbor_value & val);
    std::u16string toUTF16(const std::string source);
};

}  // namespace cbor


#endif