lab1-实现StreamReassembler
CS144 lab1
lab1具体的相关事宜可以查看博客:https://kiprey.github.io/2021/11/cs144-lab1/
完整项目代码: CS144
StreamReassembler实现
StreamReassembler
类需要完成对底层发送来的数据的顺序进行正确的重组,如下图:
具体来说你只需要实现下列的方法:
class StreamReassembler
{
private:
// Your code here -- add private members as necessary.
size_t _unass_base; //!< The index of the first unassembled byte
size_t _unass_size; //!< The number of bytes in the substrings stored but
//!< not yet reassembled
bool _eof; //!< The last byte has arrived
std::string _buffer; //!< The unassembled strings
std::vector<bool> _bitmap; //!< buffer bitmap
ByteStream _output; //!< The reassembled in-order byte stream
size_t _capacity; //!< The maximum number of bytes
public:
explicit StreamReassembler(size_t capacity);
void push_substring(const std::string &data, uint64_t index, bool eof);
[[nodiscard]] const ByteStream &stream_out() const { return _output; }
ByteStream &stream_out() { return _output; }
size_t unassembled_bytes() const;
bool empty() const;
size_t ack_index() const;
};
方法有很多,具体最重要的方法是 push_substring(const string& data,uint64_t index,bool eof)
,该方法传入需要重排的数据,以及这个数据开始位置的绝对序号(index),还有表示是否是最后一块数据(eof).
我这里实现重排的逻辑如下:
首先需要一个 bitmap
记录每个序号的数据是否已经存入需要被重排的相应位置中,这个结构记录的数据作用有二:
- 对应序号的数据是否被加入重排队列.
- 已经重排的数据的下一个序号是否存在连续可重排数据.
第一个作用大家都能理解,那么第二个作用是什么意思呢?
每次把数据push进去之后,需要判断是否存在一个连续的需要重排的数据,且这个连续数据的第一个位置是已经重排数据的下一个位置,如果存在,那么需要写入已重排的缓冲区中.
整个数据重排的过程可以用下图进行表示:
具体实现
我们先设计如下成员变量:
size_t _unass_base; //!< The index of the first unassembled byte
size_t _unass_size; //!< The number of bytes in the substrings stored but
//!< not yet reassembled
bool _eof; //!< The last byte has arrived
std::string _buffer; //!< The unassembled strings
std::vector<bool> _bitmap; //!< buffer bitmap
ByteStream _output; //!< The reassembled in-order byte stream
size_t _capacity; //!< The maximum number of bytes
_unass_base
变量用于表示当前已经重排过多少数据,该值可以结合用户传入的index
算出用户的数据在待重排队列中的offset
._unass_size
变量表示待重排数据的长度._eof
表示传入的数据块是最后一个._buffer
是待重排的队列._output
是已重排的队列._capacity
待重排和已重拍队列的最大容量,且待重排和已重拍(未读取)的数据不能超过这个数值.
对于每个数据的重排,分为下列三种情况:
index-_unass_base>capacity
数据的标号顺序已经超过了当前重排器的最大范围.这个情况直接跳过暂时不进行重排.- 在满足不超过capacity的情况下,
index >= _unass_base
,需要计算 offset 然后插入待重排队列. - 在满足不超过capacity的情况下,
index + len > _unass_base
,同样计算offset插入队列,但这个offset的计算过程以及插入的位置均与第二种情况不同.
画图表示如下:
在执行完插入待重排队列的操作后,我们还需要检查是重排队列的队头是否存在待重排数据,如果存在,则把整个连续的部分都插入到已重排的队列中,当然整个插入的过程就好像溪水流入的过程一样,需要将这些连续的数据删除,并让后续的数据流到前面来,最后不够capacity的部分补0即可.
讲完整个实现逻辑,我们发现这两个队列非常适合用C++的 std::deque
容器来实现,它既可以按下标访问,又能够支持队列的所有操作.
但是我这里的实现并没有使用 deque 而是普通的 vector 和 string,我建议可以使用deque,网络上的几乎所有实现也都是用的deque.
实现源码如下:
//! \details This functions calls just after pushing a substring into the
//! _output stream. It aims to check if there exists any contiguous substrings
//! recorded earlier can be push into the stream.
void StreamReassembler::check_contiguous()
{
size_t len = 0;
if (_bitmap.front())
{
while (len < _capacity && _bitmap[len]) ++len;
}
if (len > 0)
{
_unass_base += len;
_unass_size -= len;
_output.write(_buffer.substr(0, len));
// 把流往前推进
for (size_t i = 0; i < _capacity; i++)
{
if (i + len < _capacity)
{
_bitmap[i] = _bitmap[i + len];
_buffer[i] = _buffer[i + len];
}
else { _bitmap[i] = false; }
}
}
}
//! \details This function accepts a substring (aka a segment) of bytes,
//! possibly out-of-order, from the logical stream, and assembles any newly
//! contiguous substrings and writes them into the output stream in order.
void StreamReassembler::push_substring(const string &data, const size_t index,
const bool eof)
{
if (eof) { _eof = true; }
size_t len = data.length();
if (len == 0 && _eof && _unass_size == 0)
{
_output.end_input();
return;
}
// ignore invalid index
if (index >= _unass_base + _capacity) return;
// 未重排数据的序号大于等于已经重排数据的下一个序号
if (index >= _unass_base)
{
int offset = index - _unass_base;
// 减去_output.buffer_size()是为了确保滑动窗口的大小(如果已经重排好的数据没有被读取,那么不一直接收更多的未重排数据)
size_t real_len = min(len, _capacity - _output.buffer_size() - offset);
if (real_len < len) { _eof = false; }
for (size_t i = 0; i < real_len; i++)
{
if (_bitmap[i + offset]) continue;
_buffer[i + offset] = data[i];
_bitmap[i + offset] = true;
_unass_size++;
}
}
// 如果未重排数据的序号包含了已经重排的情况,但是整体数据有部分数据未被重排
else if (index + len > _unass_base)
{
int offset = _unass_base - index;
size_t real_len = min(len - offset, _capacity - _output.buffer_size());
if (real_len < len - offset) { _eof = false; }
for (size_t i = 0; i < real_len; i++)
{
if (_bitmap[i]) continue;
_buffer[i] = data[i + offset];
_bitmap[i] = true;
_unass_size++;
}
}
check_contiguous();
if (_eof && _unass_size == 0) { _output.end_input(); }
}