先来看 2:WriteOrBufferData()
该函数在三种情况下无动作,即
- 非 fin 包,但是待发数据为空
- fin 包已经缓存了,说明理论上不再有数据需要发送了
- 写端已经关闭
从名字可以看出,该函数要么写数据要么缓存数据。
实际上是先将待发数据缓存到 stream 对象的 send_buffer 中,然后再发送。
这里用到了宏 GetQuicReloadableFlag(flag),它又用到了另一个宏 RELOADABLE(flag):
GetQuicReloadableFlag(flag) -> GetQuicReloadableFlagImpl(flag) ->
GetQuicFlag( RELOADABLE(flag) ) -> GetQuicFlagImpl(flag) -> return flag;
RELOADABLE(flag) -> FLAGS_quic_reloadable_flag_##flag
关于第二个宏的用法可以参考:
define 中 ## 的作用
无法缓存数据的前提是:
(stream data too long) && ( kMaxStreamLength - offset < data.length() )
第一个条件显然是指待发数据超过了 send_buffer 的可用空间;第二个条件,kMaxStreamLength 指一个 quic stream 上可以发送的最大字节量。而 stream_offset_ 表示已发送数据与已缓存数据之和。data.length() 显然是待发数据的长度,因此无法缓存的第二个条件便是 stream 受到流控了。
假设现在满足缓存的条件,看看在 send_buffer 中是怎么缓存的。
send_buffer_.SaveStreamData(&iov, 1, 0, data.length());
在 send_buffer 中,数据是保存在一个个 slice 中,其类型为 QuicMemSlice,容量为 max_data_slice_size,每次拷贝最多 max_data_slice_size 的数据到 send_buffer,拷贝的动作由 QuicUtils::CopyToBuffer() 完成。
那么 slice 的内存空间是谁分配的呢?
slice 构造的时候用了 allocator_,类型为 QuicBufferAllocator*,追溯到 QuicStream 对象构造的时候使用了
session->connection()->helper()->GetStreamSendBufferAllocator() 来初始化 allocator_,
helper() 是 Quic client 对象构造的时候创建的:
new QuicEpollConnectionHelper(epoll_server, QuicAllocator::SIMPLE);
因此可以发现 allocator_ 指向的类型为 SimpleBufferAllocator,该类实际就是对 new 和 delete 做了一次封装,其实际的内存申请动作为
new char[size]
。
3:WriteBufferedData()
session_->ShouldYield(id()) 判断当前 stream 是否需要被阻塞;
获取当前缓存的数据长度 write_length,如果当前 stream 的数据计入 connection 流控数据的话, 发送窗口应选择 stream 发送窗口 和 connection 发送窗口中较小的值。
数据发送:WritevDataInner();
数据发送完成后更新流控数据:
- OnStreamDataConsumed(): send_buffer 更新已发送的字节数,同时记录当前仍需被确认的字节数。
- AddBytesSent():流控对象更新 stream 和 connection 发送的总数据长度。
4:WritevDataInner()
其实现比较简单:
StreamSendingState state = fin ? FIN : NO_FIN;
if (fin && add_random_padding_after_fin_) {
state = FIN_AND_PADDING;
}
return session()->WritevData(this, id(), write_length, offset, state);
主要注意下 offset 的来源:QuicStream::stream_bytes_written(),表示当前 stream 已发送的总数据长度。
6:QuicSession::WritevData()
其实现也比较简单,有两个不发送数据的条件:
- 当前 stream 的 stream ID 为 1(kCryptoStreamId),却不是握手的 stream 对象,会话会保存握手的 stream 对象;
- 不是加密握手 stream,但是未加密数据,非加密握手 stream 已有共享密钥可以使用。
不满足上述两个条件则发送数据:
QuicConsumedData data =
connection_->SendStreamData(id, write_length, offset, state);
8:QuicConnection::SendStreamData()
如果是非 FIN 包,但待发数据长度为 0,则不发送数据。否则发送数据:
return packet_generator_.ConsumeData(id, write_length, offset, state);
10:QuicPacketGenerator::ConsumeData()
如果当前是加密握手 stream,且有待重传数据的话,优先将重传数据发送。
这个方法稍微复杂些,我们慢慢分析。但是抓住一条线就能更容易的理解了,那就是一个 packet 有大小限制,
帧的大小受限于 packet 的 payload 的大小,payload 指除 packet header 外的大小,一个 packet 有一个或多个 frame。
如果当前是加密握手 stream,且有待重传数据的话,优先将重传数据发送:
const bool flush = has_handshake && packet_creator_.HasPendingRetransmittableFrames();
SendQueuedFrames(flush);
接下来是下面的代码,来看看它的作用:
if (!packet_creator_.HasRoomForStreamFrame(id, offset)) {
packet_creator_.Flush();
}
目前 QuicPacketCreator 打开了一个 packet,可以一直添加 frame 直到无可用空间为止,此时需要继续往 packet 写数据,
因此需要先判断当前 packet 的剩余空间是否足够容纳一个完整的 frame。
那么它是怎么判断空间足够的呢?
首先看 HasRoomForStreamFrame() 的实现:
bool QuicPacketCreator::HasRoomForStreamFrame(QuicStreamId id, QuicStreamOffset offset) {
return BytesFree() > QuicFramer::GetMinStreamFrameSize(
framer_->transport_version(), id, offset, true);
}
显然,当可用空间大于一个 frame 最小所需空间即可满足。
BytesFree() 的计算:
size_t QuicPacketCreator::BytesFree() {
DCHECK_GE(max_plaintext_size_, PacketSize());
return max_plaintext_size_ -
std::min(max_plaintext_size_, PacketSize() + ExpansionOnNewFrame());
}
max_plaintext_size_ 默认值为 1350,ExpansionOnNewFrame() 为 2 或者 0,变化最大的是 PacketSize() 的值,
每当有新的 frame 添加到 packet 时,它的值就会增加相应的大小。因此 BytesFree() 的值代表当前 packet 最大可容纳的
payload 长度的剩余大小,即还差多少字节就达到 payload 的最大值。
接下来有一个变量:
run_fast_path = !has_handshake && state != FIN_AND_PADDING
&& !HasQueuedFrames()
&& write_length - total_bytes_consumed > kMaxPacketSize;
可见该变量为真,则必须满足四个条件:
- 当前的 stream 不是握手 stream
- stream 状态不为 FIN_AND_PADDING
- 无正在排队的帧
- 待写数据长度大于 kMaxPacketSize,说明需要多个 packet 才能将数据发送完。
满足的话会进入 ConsumeDataFastPath(),现在分析该函数:
它循环调用 QuicPacketCreator::CreateAndSerializeStreamFrame(),同时记录每次被消耗的数据大小,
累加到 total_bytes_consumed,直到待发数据全部被消耗。
现在分析 CreateAndSerializeStreamFrame():
构造 packet header 并写入 buffer 中,计算 packet payload 的大小,即:
const size_t available_size =
max_plaintext_size_ - writer.length() - min_frame_size;
构造一个 stream frame,填充 frame type、stream id、offset 等字段到 buffer 中:
if (!framer_->AppendTypeByte(QuicFrame(frame.get()),
/* no stream frame length */ true, &writer)) {
QUIC_BUG << "AppendTypeByte failed";
return;
}
if (!framer_->AppendStreamFrame(*frame, /* no stream frame length */ true,
&writer)) {
QUIC_BUG << "AppendStreamFrame failed";
return;
}
最后根据当前的加密级别队 payload 进行加密。最后将 packet 发送出去。
这里在分析下 QuicFramer::AppendStreamFrame(),它中间有这么一段代码:
if (!data_producer_->WriteStreamData(frame.stream_id, frame.offset,
frame.data_length, writer)) {
QUIC_BUG << "Writing frame data failed.";
return false;
}
上面这行语句的执行与下面这句的执行是互斥的:
if (!writer->WriteBytes(frame.data_buffer, frame.data_length)) {
QUIC_BUG << "Writing frame data failed.";
return false;
}
即,writer 的数据来源要么是 frame 已有的数据,要么是 send_buffer 中的数据(此时的拷贝动作以 offset为准)。
注意,writer 中的 buffer 是一个 packet 的数据,send_buffer 中数据是一个个的 slice,每一个 slice 有一个 offset,
代表它在所属 stream 的偏移量。send_buffer 的内存抽象如下图:
void QuicSession::Initialize() {
connection_->set_visitor(this);
connection_->SetSessionNotifier(this);
connection_->SetDataProducer(this);
connection_->SetFromConfig(config_);
DCHECK_EQ(kCryptoStreamId, GetMutableCryptoStream()->id());
static_stream_map_[kCryptoStreamId] = GetMutableCryptoStream();
}
最终定位到 QuicStreamSendBuffer::WriteStreamData(),这里的分析需要用到刚才 send_buffer 的内存图,
bool QuicStreamSendBuffer::WriteStreamData(QuicStreamOffset offset,
QuicByteCount data_length,
QuicDataWriter* writer) {
if (use_write_index_) {
return WriteStreamDataWithIndex(offset, data_length, writer);
}
for (const BufferedSlice& slice : buffered_slices_) {
if (data_length == 0 || offset < slice.offset) {
break;
}
if (offset >= slice.offset + slice.slice.length()) {
continue;
}
QuicByteCount slice_offset = offset - slice.offset;
QuicByteCount copy_length =
std::min(data_length, slice.slice.length() - slice_offset);
if (!writer->WriteBytes(slice.slice.data() + slice_offset, copy_length)) {
return false;
}
offset += copy_length;
data_length -= copy_length;
}
return data_length == 0;
}
先不看 use_write_index_ 为 true 的情况,for 循环所做的事就是遍历所有的 slice,
- 第一个 if 表示待发送数据长度为 0 ,或者当前的 offset 小于第一个 slice 的 offset(说明待拷贝数据中存在已经发送的数据),则不拷贝数据,
- 第二个 if 表示 offset 的位置在后面的 slice 中,因此跳到下一个 slice 中继续寻找
14:QuicPacketCreator::SerializePacket()
首先构造了一个 packet header,其具体的内容待会分析。
padding frame 的构造,其构造分为满填充和随机填充,满填充就是当 packet 的 payload 的长度小于 max_plaintext_size_ 时,
剩余字节数的大小就是 padding frame 的大小;随机填充,可能包含满填充,但是填充的大小是随机值。
接下来看它是如何构造一个完整的 packet,源码见 QuicFramer::BuildDataPacket():
设置 public_flags 的各个位,然后依次将 Public Flags、Connection ID(可选)、Quic version(可选)、Diversification Nonce(可选)、
Packet Number 写入缓冲区。其中 Packet Number 是可变长度,且有可能需要将主机序(小端)转换为网络序(大端),那么看看 Packet Number
写入缓冲区的过程,假设 Packet Number 的长度为 1 个字节,其存储单元为 uint32_t,从主机序到网络序如下图:
Packet Number 的值是从对应存储单位的低字节位置,即 0x78,转换为网络序以后,需要从 &value) + sizeof(value) – num_bytes 的位置
开始拷贝,num_bytes 指 Packet Number 的长度,即写入缓冲区的长度,本例为 1。至此,完成了 Packet Number 的缓冲区写入。
接下来构造 packet 的 payload,根据不同帧的格式依次将各帧的内容写入至缓冲区。
然后是对 payload 加密,加密过程设计到的两个函数 QuicFramer::EncryptInPlace() 和 NullEncrypter::EncryptPacket() 中相应参数如下图所示:
至此,一个完整的加密握手 packet 构造完成。
17:QuicConnection::SendOrQueuePacket()
这个函数做的事情比较简单,如果当前 connection 有其它 packet 正在排队或者发送当前 packet 失败,
立马将当前 packet 缓存到 queue_packets 中,等待下一次发送。