目录

lab1-实现StreamReassembler


CS144 lab1

lab1具体的相关事宜可以查看博客:https://kiprey.github.io/2021/11/cs144-lab1/

完整项目代码: CS144

StreamReassembler实现

StreamReassembler 类需要完成对底层发送来的数据的顺序进行正确的重组,如下图:

https://img-blog.csdnimg.cn/1ba0899553ca4a688c0b5da9ff7160a8.png

具体来说你只需要实现下列的方法:

class StreamReassembler
{
private:
   // Your code here -- add private members as necessary.
   size_t _unass_base;   //!< The index of the first unassembled byte
   size_t _unass_size;   //!< The number of bytes in the substrings stored but
                         //!< not yet reassembled
   bool   _eof;          //!< The last byte has arrived
   std::string       _buffer;   //!< The unassembled strings
   std::vector<bool> _bitmap;   //!< buffer bitmap

   ByteStream _output;          //!< The reassembled in-order byte stream
   size_t     _capacity;        //!< The maximum number of bytes

public:

   explicit StreamReassembler(size_t capacity);

   void push_substring(const std::string &data, uint64_t index, bool eof);

   [[nodiscard]] const ByteStream &stream_out() const { return _output; }
   ByteStream                     &stream_out() { return _output; }

   size_t unassembled_bytes() const;

   bool empty() const;

   size_t ack_index() const;
};

方法有很多,具体最重要的方法是 push_substring(const string& data,uint64_t index,bool eof) ,该方法传入需要重排的数据,以及这个数据开始位置的绝对序号(index),还有表示是否是最后一块数据(eof).

我这里实现重排的逻辑如下:

首先需要一个 bitmap 记录每个序号的数据是否已经存入需要被重排的相应位置中,这个结构记录的数据作用有二:

  1. 对应序号的数据是否被加入重排队列.
  2. 已经重排的数据的下一个序号是否存在连续可重排数据.

第一个作用大家都能理解,那么第二个作用是什么意思呢?

每次把数据push进去之后,需要判断是否存在一个连续的需要重排的数据,且这个连续数据的第一个位置是已经重排数据的下一个位置,如果存在,那么需要写入已重排的缓冲区中.

整个数据重排的过程可以用下图进行表示:

https://img-blog.csdnimg.cn/2e19f427c418403384b55ae1911040b8.png

具体实现

我们先设计如下成员变量:

   size_t _unass_base;   //!< The index of the first unassembled byte
   size_t _unass_size;   //!< The number of bytes in the substrings stored but
                         //!< not yet reassembled
   bool   _eof;          //!< The last byte has arrived
   std::string       _buffer;   //!< The unassembled strings
   std::vector<bool> _bitmap;   //!< buffer bitmap

   ByteStream _output;          //!< The reassembled in-order byte stream
   size_t     _capacity;        //!< The maximum number of bytes
  • _unass_base 变量用于表示当前已经重排过多少数据,该值可以结合用户传入的 index 算出用户的数据在待重排队列中的 offset .
  • _unass_size 变量表示待重排数据的长度.
  • _eof 表示传入的数据块是最后一个.
  • _buffer 是待重排的队列.
  • _output 是已重排的队列.
  • _capacity 待重排和已重拍队列的最大容量,且待重排和已重拍(未读取)的数据不能超过这个数值.

对于每个数据的重排,分为下列三种情况:

  1. index-_unass_base>capacity 数据的标号顺序已经超过了当前重排器的最大范围.这个情况直接跳过暂时不进行重排.
  2. 在满足不超过capacity的情况下, index >= _unass_base ,需要计算 offset 然后插入待重排队列.
  3. 在满足不超过capacity的情况下,index + len > _unass_base ,同样计算offset插入队列,但这个offset的计算过程以及插入的位置均与第二种情况不同.

画图表示如下:

C:\Users\Alone\AppData\Roaming\Typora\typora-user-images\image-20230414003300518.png

在执行完插入待重排队列的操作后,我们还需要检查是重排队列的队头是否存在待重排数据,如果存在,则把整个连续的部分都插入到已重排的队列中,当然整个插入的过程就好像溪水流入的过程一样,需要将这些连续的数据删除,并让后续的数据流到前面来,最后不够capacity的部分补0即可.

讲完整个实现逻辑,我们发现这两个队列非常适合用C++的 std::deque 容器来实现,它既可以按下标访问,又能够支持队列的所有操作.

但是我这里的实现并没有使用 deque 而是普通的 vector 和 string,我建议可以使用deque,网络上的几乎所有实现也都是用的deque.

实现源码如下:

//! \details This functions calls just after pushing a substring into the
//! _output stream. It aims to check if there exists any contiguous substrings
//! recorded earlier can be push into the stream.
void StreamReassembler::check_contiguous()
{
   size_t len = 0;
   if (_bitmap.front())
   {
      while (len < _capacity && _bitmap[len]) ++len;
   }

   if (len > 0)
   {
      _unass_base += len;
      _unass_size -= len;
      _output.write(_buffer.substr(0, len));
      // 把流往前推进
      for (size_t i = 0; i < _capacity; i++)
      {
         if (i + len < _capacity)
         {
            _bitmap[i] = _bitmap[i + len];
            _buffer[i] = _buffer[i + len];
         }
         else { _bitmap[i] = false; }
      }
   }
}

//! \details This function accepts a substring (aka a segment) of bytes,
//! possibly out-of-order, from the logical stream, and assembles any newly
//! contiguous substrings and writes them into the output stream in order.
void StreamReassembler::push_substring(const string &data, const size_t index,
                                       const bool eof)
{
   if (eof) { _eof = true; }
   size_t len = data.length();
   if (len == 0 && _eof && _unass_size == 0)
   {
      _output.end_input();
      return;
   }
   // ignore invalid index
   if (index >= _unass_base + _capacity) return;
   // 未重排数据的序号大于等于已经重排数据的下一个序号
   if (index >= _unass_base)
   {
      int    offset   = index - _unass_base;
      // 减去_output.buffer_size()是为了确保滑动窗口的大小(如果已经重排好的数据没有被读取,那么不一直接收更多的未重排数据)
      size_t real_len = min(len, _capacity - _output.buffer_size() - offset);
      if (real_len < len) { _eof = false; }
      for (size_t i = 0; i < real_len; i++)
      {
         if (_bitmap[i + offset]) continue;
         _buffer[i + offset] = data[i];
         _bitmap[i + offset] = true;
         _unass_size++;
      }
   }
   // 如果未重排数据的序号包含了已经重排的情况,但是整体数据有部分数据未被重排
   else if (index + len > _unass_base)
   {
      int    offset   = _unass_base - index;
      size_t real_len = min(len - offset, _capacity - _output.buffer_size());
      if (real_len < len - offset) { _eof = false; }
      for (size_t i = 0; i < real_len; i++)
      {
         if (_bitmap[i]) continue;
         _buffer[i] = data[i + offset];
         _bitmap[i] = true;
         _unass_size++;
      }
   }
   check_contiguous();
   if (_eof && _unass_size == 0) { _output.end_input(); }
}