Split range tracking logic in QuicStreamSendBuffer from the actual buffer management.
This will simplify introducing alternative buffer implementations.
PiperOrigin-RevId: 762049799
diff --git a/build/source_list.bzl b/build/source_list.bzl
index d6a55f4..37e3f2b 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -362,6 +362,7 @@
"quic/core/quic_stream_id_manager.h",
"quic/core/quic_stream_priority.h",
"quic/core/quic_stream_send_buffer.h",
+ "quic/core/quic_stream_send_buffer_base.h",
"quic/core/quic_stream_sequencer.h",
"quic/core/quic_stream_sequencer_buffer.h",
"quic/core/quic_sustained_bandwidth_recorder.h",
@@ -684,6 +685,7 @@
"quic/core/quic_stream_id_manager.cc",
"quic/core/quic_stream_priority.cc",
"quic/core/quic_stream_send_buffer.cc",
+ "quic/core/quic_stream_send_buffer_base.cc",
"quic/core/quic_stream_sequencer.cc",
"quic/core/quic_stream_sequencer_buffer.cc",
"quic/core/quic_sustained_bandwidth_recorder.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index e0371a5..6a24274 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -362,6 +362,7 @@
"src/quiche/quic/core/quic_stream_id_manager.h",
"src/quiche/quic/core/quic_stream_priority.h",
"src/quiche/quic/core/quic_stream_send_buffer.h",
+ "src/quiche/quic/core/quic_stream_send_buffer_base.h",
"src/quiche/quic/core/quic_stream_sequencer.h",
"src/quiche/quic/core/quic_stream_sequencer_buffer.h",
"src/quiche/quic/core/quic_sustained_bandwidth_recorder.h",
@@ -684,6 +685,7 @@
"src/quiche/quic/core/quic_stream_id_manager.cc",
"src/quiche/quic/core/quic_stream_priority.cc",
"src/quiche/quic/core/quic_stream_send_buffer.cc",
+ "src/quiche/quic/core/quic_stream_send_buffer_base.cc",
"src/quiche/quic/core/quic_stream_sequencer.cc",
"src/quiche/quic/core/quic_stream_sequencer_buffer.cc",
"src/quiche/quic/core/quic_sustained_bandwidth_recorder.cc",
diff --git a/build/source_list.json b/build/source_list.json
index 947bbf7..d7a7523 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -361,6 +361,7 @@
"quiche/quic/core/quic_stream_id_manager.h",
"quiche/quic/core/quic_stream_priority.h",
"quiche/quic/core/quic_stream_send_buffer.h",
+ "quiche/quic/core/quic_stream_send_buffer_base.h",
"quiche/quic/core/quic_stream_sequencer.h",
"quiche/quic/core/quic_stream_sequencer_buffer.h",
"quiche/quic/core/quic_sustained_bandwidth_recorder.h",
@@ -683,6 +684,7 @@
"quiche/quic/core/quic_stream_id_manager.cc",
"quiche/quic/core/quic_stream_priority.cc",
"quiche/quic/core/quic_stream_send_buffer.cc",
+ "quiche/quic/core/quic_stream_send_buffer_base.cc",
"quiche/quic/core/quic_stream_sequencer.cc",
"quiche/quic/core/quic_stream_sequencer_buffer.cc",
"quiche/quic/core/quic_sustained_bandwidth_recorder.cc",
diff --git a/quiche/quic/core/http/quic_spdy_session_test.cc b/quiche/quic/core/http/quic_spdy_session_test.cc
index 9bd5564..dfec18a 100644
--- a/quiche/quic/core/http/quic_spdy_session_test.cc
+++ b/quiche/quic/core/http/quic_spdy_session_test.cc
@@ -1961,7 +1961,7 @@
session_->WritePriority(id, parent_stream_id,
Spdy3PriorityToHttp2Weight(priority), exclusive);
- QuicStreamSendBuffer& send_buffer =
+ QuicStreamSendBufferBase& send_buffer =
QuicStreamPeer::SendBuffer(headers_stream);
ASSERT_EQ(1u, send_buffer.size());
@@ -1970,10 +1970,8 @@
SpdyFramer spdy_framer(SpdyFramer::ENABLE_COMPRESSION);
SpdySerializedFrame frame = spdy_framer.SerializeFrame(priority_frame);
- const quiche::QuicheMemSlice& slice =
- QuicStreamSendBufferPeer::CurrentWriteSlice(&send_buffer)->slice;
EXPECT_EQ(absl::string_view(frame.data(), frame.size()),
- absl::string_view(slice.data(), slice.length()));
+ send_buffer.LatestWriteForTest());
}
TEST_P(QuicSpdySessionTestClient, Http3ServerPush) {
@@ -4153,13 +4151,11 @@
stream->WriteHeaders(std::move(headers), /* fin = */ true, nullptr);
EXPECT_TRUE(headers_stream->HasBufferedData());
- QuicStreamSendBuffer& send_buffer =
+ QuicStreamSendBufferBase& send_buffer =
QuicStreamPeer::SendBuffer(headers_stream);
ASSERT_EQ(1u, send_buffer.size());
- const quiche::QuicheMemSlice& slice =
- QuicStreamSendBufferPeer::CurrentWriteSlice(&send_buffer)->slice;
- absl::string_view stream_data(slice.data(), slice.length());
+ absl::string_view stream_data = send_buffer.LatestWriteForTest();
std::string expected_stream_data_1;
ASSERT_TRUE(
diff --git a/quiche/quic/core/quic_stream.cc b/quiche/quic/core/quic_stream.cc
index 92df383..2d28091 100644
--- a/quiche/quic/core/quic_stream.cc
+++ b/quiche/quic/core/quic_stream.cc
@@ -387,8 +387,8 @@
stream_contributes_to_connection_flow_control_(true),
busy_counter_(0),
add_random_padding_after_fin_(false),
- send_buffer_(
- session->connection()->helper()->GetStreamSendBufferAllocator()),
+ send_buffer_(std::make_unique<QuicStreamSendBuffer>(
+ session->connection()->helper()->GetStreamSendBufferAllocator())),
buffered_data_threshold_(GetQuicFlag(quic_buffered_data_threshold)),
is_static_(is_static),
deadline_(QuicTime::Zero()),
@@ -420,7 +420,7 @@
QUIC_DVLOG(1)
<< ENDPOINT << "Stream " << id_
<< " gets destroyed while waiting for acks. stream_bytes_outstanding = "
- << send_buffer_.stream_bytes_outstanding()
+ << send_buffer_->stream_bytes_outstanding()
<< ", fin_outstanding: " << fin_outstanding_;
}
if (stream_delegate_ != nullptr && type_ != CRYPTO) {
@@ -681,7 +681,7 @@
type_ == READ_UNIDIRECTIONAL) {
return false;
}
- reliable_size_ = send_buffer_.stream_offset();
+ reliable_size_ = send_buffer_->stream_offset();
return true;
}
@@ -725,7 +725,7 @@
// Notionally ack unreliable, previously consumed data so that it's not
// retransmitted, and the buffer can free the memory.
QuicByteCount newly_acked;
- send_buffer_.OnStreamDataAcked(
+ send_buffer_->OnStreamDataAcked(
reliable_size_, stream_bytes_written() - reliable_size_, &newly_acked);
fin_outstanding_ = false; // Do not wait to close until FIN is acked.
fin_lost_ = false;
@@ -818,7 +818,7 @@
// Do not respect buffered data upper limit as WriteOrBufferData guarantees
// all data to be consumed.
if (!data.empty()) {
- QuicStreamOffset offset = send_buffer_.stream_offset();
+ QuicStreamOffset offset = send_buffer_->stream_offset();
if (kMaxStreamLength - offset < data.length()) {
QUIC_BUG(quic_bug_10586_4) << "Write too many data via stream " << id_;
OnUnrecoverableError(
@@ -826,7 +826,7 @@
absl::StrCat("Write too many data via stream ", id_));
return;
}
- send_buffer_.SaveStreamData(data);
+ send_buffer_->SaveStreamData(data);
OnDataBuffered(offset, data.length(), ack_listener);
}
if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
@@ -921,10 +921,10 @@
consumed_data.fin_consumed = fin;
if (!span.empty()) {
// Buffer all data if buffered data size is below limit.
- QuicStreamOffset offset = send_buffer_.stream_offset();
- consumed_data.bytes_consumed = send_buffer_.SaveMemSliceSpan(span);
- if (offset > send_buffer_.stream_offset() ||
- kMaxStreamLength < send_buffer_.stream_offset()) {
+ QuicStreamOffset offset = send_buffer_->stream_offset();
+ consumed_data.bytes_consumed = send_buffer_->SaveMemSliceSpan(span);
+ if (offset > send_buffer_->stream_offset() ||
+ kMaxStreamLength < send_buffer_->stream_offset()) {
QUIC_BUG(quic_bug_10586_8) << "Write too many data via stream " << id_;
OnUnrecoverableError(
QUIC_STREAM_LENGTH_OVERFLOW,
@@ -945,13 +945,13 @@
}
bool QuicStream::HasPendingRetransmission() const {
- return send_buffer_.HasPendingRetransmission() || fin_lost_;
+ return send_buffer_->HasPendingRetransmission() || fin_lost_;
}
bool QuicStream::IsStreamFrameOutstanding(QuicStreamOffset offset,
QuicByteCount data_length,
bool fin) const {
- return send_buffer_.IsStreamDataOutstanding(offset, data_length) ||
+ return send_buffer_->IsStreamDataOutstanding(offset, data_length) ||
(fin && fin_outstanding_);
}
@@ -1045,8 +1045,8 @@
}
bool QuicStream::HasBufferedData() const {
- QUICHE_DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
- return (send_buffer_.stream_offset() > stream_bytes_written() &&
+ QUICHE_DCHECK_GE(send_buffer_->stream_offset(), stream_bytes_written());
+ return (send_buffer_->stream_offset() > stream_bytes_written() &&
(!rst_stream_at_sent_ || reliable_size_ > stream_bytes_written()));
}
@@ -1241,8 +1241,8 @@
<< "[" << offset << ", " << offset + data_length << "]"
<< " fin = " << fin_acked;
*newly_acked_length = 0;
- if (!send_buffer_.OnStreamDataAcked(offset, data_length,
- newly_acked_length)) {
+ if (!send_buffer_->OnStreamDataAcked(offset, data_length,
+ newly_acked_length)) {
OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent data.");
return false;
}
@@ -1288,7 +1288,7 @@
void QuicStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
QuicByteCount data_length,
bool fin_retransmitted) {
- send_buffer_.OnStreamDataRetransmitted(offset, data_length);
+ send_buffer_->OnStreamDataRetransmitted(offset, data_length);
if (fin_retransmitted) {
fin_lost_ = false;
}
@@ -1300,7 +1300,7 @@
<< "[" << offset << ", " << offset + data_length << "]"
<< " fin = " << fin_lost;
if (data_length > 0) {
- send_buffer_.OnStreamDataLost(offset, data_length);
+ send_buffer_->OnStreamDataLost(offset, data_length);
}
if (fin_lost && fin_outstanding_) {
fin_lost_ = true;
@@ -1365,7 +1365,7 @@
bool QuicStream::IsWaitingForAcks() const {
return (!rst_sent_ || stream_error_.ok()) &&
- (send_buffer_.stream_bytes_outstanding() || fin_outstanding_);
+ (send_buffer_->stream_bytes_outstanding() || fin_outstanding_);
}
bool QuicStream::WriteStreamData(QuicStreamOffset offset,
@@ -1374,7 +1374,7 @@
QUICHE_DCHECK_LT(0u, data_length);
QUIC_DVLOG(2) << ENDPOINT << "Write stream " << id_ << " data from offset "
<< offset << " length " << data_length;
- return send_buffer_.WriteStreamData(offset, data_length, writer);
+ return send_buffer_->WriteStreamData(offset, data_length, writer);
}
void QuicStream::WriteBufferedData(EncryptionLevel level) {
@@ -1484,8 +1484,8 @@
}
uint64_t QuicStream::BufferedDataBytes() const {
- QUICHE_DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
- return send_buffer_.stream_offset() - stream_bytes_written();
+ QUICHE_DCHECK_GE(send_buffer_->stream_offset(), stream_bytes_written());
+ return send_buffer_->stream_offset() - stream_bytes_written();
}
bool QuicStream::CanWriteNewData() const {
@@ -1497,21 +1497,21 @@
}
uint64_t QuicStream::stream_bytes_written() const {
- return send_buffer_.stream_bytes_written();
+ return send_buffer_->stream_bytes_written();
}
const QuicIntervalSet<QuicStreamOffset>& QuicStream::bytes_acked() const {
- return send_buffer_.bytes_acked();
+ return send_buffer_->bytes_acked();
}
void QuicStream::OnStreamDataConsumed(QuicByteCount bytes_consumed) {
- send_buffer_.OnStreamDataConsumed(bytes_consumed);
+ send_buffer_->OnStreamDataConsumed(bytes_consumed);
}
void QuicStream::WritePendingRetransmission() {
while (HasPendingRetransmission()) {
QuicConsumedData consumed(0, false);
- if (!send_buffer_.HasPendingRetransmission()) {
+ if (!send_buffer_->HasPendingRetransmission()) {
QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
<< " retransmits fin only frame.";
consumed = stream_delegate_->WritevData(
@@ -1524,7 +1524,7 @@
}
} else {
StreamPendingRetransmission pending =
- send_buffer_.NextPendingRetransmission();
+ send_buffer_->NextPendingRetransmission();
// Determine whether the lost fin can be bundled with the data.
const bool can_bundle_fin =
fin_lost_ &&
diff --git a/quiche/quic/core/quic_stream.h b/quiche/quic/core/quic_stream.h
index 3f49572..4a7b738 100644
--- a/quiche/quic/core/quic_stream.h
+++ b/quiche/quic/core/quic_stream.h
@@ -20,6 +20,7 @@
#include <cstddef>
#include <cstdint>
#include <list>
+#include <memory>
#include <optional>
#include <string>
@@ -34,6 +35,7 @@
#include "quiche/quic/core/quic_packets.h"
#include "quiche/quic/core/quic_stream_priority.h"
#include "quiche/quic/core/quic_stream_send_buffer.h"
+#include "quiche/quic/core/quic_stream_send_buffer_base.h"
#include "quiche/quic/core/quic_stream_sequencer.h"
#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/core/session_notifier_interface.h"
@@ -523,9 +525,9 @@
const QuicIntervalSet<QuicStreamOffset>& bytes_acked() const;
- const QuicStreamSendBuffer& send_buffer() const { return send_buffer_; }
+ const QuicStreamSendBufferBase& send_buffer() const { return *send_buffer_; }
- QuicStreamSendBuffer& send_buffer() { return send_buffer_; }
+ QuicStreamSendBufferBase& send_buffer() { return *send_buffer_; }
// Called when the write side of the stream is closed, and all of the outgoing
// data has been acknowledged. This corresponds to the "Data Recvd" state of
@@ -644,7 +646,7 @@
// Send buffer of this stream. Send buffer is cleaned up when data gets acked
// or discarded.
- QuicStreamSendBuffer send_buffer_;
+ std::unique_ptr<QuicStreamSendBufferBase> send_buffer_;
// Latched value of quic_buffered_data_threshold.
const QuicByteCount buffered_data_threshold_;
diff --git a/quiche/quic/core/quic_stream_send_buffer.cc b/quiche/quic/core/quic_stream_send_buffer.cc
index c747eaa..f2b9f51 100644
--- a/quiche/quic/core/quic_stream_send_buffer.cc
+++ b/quiche/quic/core/quic_stream_send_buffer.cc
@@ -13,6 +13,7 @@
#include "quiche/quic/core/quic_data_writer.h"
#include "quiche/quic/core/quic_interval.h"
#include "quiche/quic/core/quic_interval_set.h"
+#include "quiche/quic/core/quic_stream_send_buffer_base.h"
#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/platform/api/quic_bug_tracker.h"
#include "quiche/quic/platform/api/quic_flags.h"
@@ -48,17 +49,10 @@
return QuicInterval<std::size_t>(offset, offset + length);
}
-bool StreamPendingRetransmission::operator==(
- const StreamPendingRetransmission& other) const {
- return offset == other.offset && length == other.length;
-}
-
QuicStreamSendBuffer::QuicStreamSendBuffer(
quiche::QuicheBufferAllocator* allocator)
: allocator_(allocator) {}
-QuicStreamSendBuffer::~QuicStreamSendBuffer() {}
-
void QuicStreamSendBuffer::SaveStreamData(absl::string_view data) {
QUIC_DVLOG(2) << "Save stream data offset " << stream_offset_ << " length "
<< data.length();
@@ -105,11 +99,6 @@
return total;
}
-void QuicStreamSendBuffer::OnStreamDataConsumed(size_t bytes_consumed) {
- stream_bytes_written_ += bytes_consumed;
- stream_bytes_outstanding_ += bytes_consumed;
-}
-
bool QuicStreamSendBuffer::WriteStreamData(QuicStreamOffset offset,
QuicByteCount data_length,
QuicDataWriter* writer) {
@@ -137,97 +126,6 @@
return data_length == 0;
}
-bool QuicStreamSendBuffer::OnStreamDataAcked(
- QuicStreamOffset offset, QuicByteCount data_length,
- QuicByteCount* newly_acked_length) {
- QUIC_DVLOG(2) << "Marking data acked at offset " << offset << " length "
- << data_length;
- *newly_acked_length = 0;
- if (data_length == 0) {
- return true;
- }
- if (bytes_acked_.Empty() || offset >= bytes_acked_.rbegin()->max() ||
- bytes_acked_.IsDisjoint(
- QuicInterval<QuicStreamOffset>(offset, offset + data_length))) {
- // Optimization for the typical case, when all data is newly acked.
- if (stream_bytes_outstanding_ < data_length) {
- return false;
- }
- bytes_acked_.AddOptimizedForAppend(offset, offset + data_length);
- *newly_acked_length = data_length;
- stream_bytes_outstanding_ -= data_length;
- pending_retransmissions_.Difference(offset, offset + data_length);
- if (!FreeMemSlices(offset, offset + data_length)) {
- return false;
- }
- CleanUpBufferedSlices();
- return true;
- }
- // Exit if no new data gets acked.
- if (bytes_acked_.Contains(offset, offset + data_length)) {
- return true;
- }
- // Execute the slow path if newly acked data fill in existing holes.
- QuicIntervalSet<QuicStreamOffset> newly_acked(offset, offset + data_length);
- newly_acked.Difference(bytes_acked_);
- for (const auto& interval : newly_acked) {
- *newly_acked_length += (interval.max() - interval.min());
- }
- if (stream_bytes_outstanding_ < *newly_acked_length) {
- return false;
- }
- stream_bytes_outstanding_ -= *newly_acked_length;
- bytes_acked_.Add(offset, offset + data_length);
- pending_retransmissions_.Difference(offset, offset + data_length);
- if (newly_acked.Empty()) {
- return true;
- }
- if (!FreeMemSlices(newly_acked.begin()->min(), newly_acked.rbegin()->max())) {
- return false;
- }
- CleanUpBufferedSlices();
- return true;
-}
-
-void QuicStreamSendBuffer::OnStreamDataLost(QuicStreamOffset offset,
- QuicByteCount data_length) {
- if (data_length == 0) {
- return;
- }
- QuicIntervalSet<QuicStreamOffset> bytes_lost(offset, offset + data_length);
- bytes_lost.Difference(bytes_acked_);
- if (bytes_lost.Empty()) {
- return;
- }
- for (const auto& lost : bytes_lost) {
- pending_retransmissions_.Add(lost.min(), lost.max());
- }
-}
-
-void QuicStreamSendBuffer::OnStreamDataRetransmitted(
- QuicStreamOffset offset, QuicByteCount data_length) {
- if (data_length == 0) {
- return;
- }
- pending_retransmissions_.Difference(offset, offset + data_length);
-}
-
-bool QuicStreamSendBuffer::HasPendingRetransmission() const {
- return !pending_retransmissions_.Empty();
-}
-
-StreamPendingRetransmission QuicStreamSendBuffer::NextPendingRetransmission()
- const {
- if (HasPendingRetransmission()) {
- const auto pending = pending_retransmissions_.begin();
- return {pending->min(), pending->max() - pending->min()};
- }
- QUIC_BUG(quic_bug_10853_3)
- << "NextPendingRetransmission is called unexpected with no "
- "pending retransmissions.";
- return {0, 0};
-}
-
bool QuicStreamSendBuffer::FreeMemSlices(QuicStreamOffset start,
QuicStreamOffset end) {
auto it = interval_deque_.DataBegin();
@@ -256,7 +154,7 @@
break;
}
if (!it->slice.empty() &&
- bytes_acked_.Contains(it->offset, it->offset + it->slice.length())) {
+ bytes_acked().Contains(it->offset, it->offset + it->slice.length())) {
it->slice.Reset();
}
}
@@ -270,12 +168,20 @@
}
}
-bool QuicStreamSendBuffer::IsStreamDataOutstanding(
- QuicStreamOffset offset, QuicByteCount data_length) const {
- return data_length > 0 &&
- !bytes_acked_.Contains(offset, offset + data_length);
+size_t QuicStreamSendBuffer::size() const { return interval_deque_.Size(); }
+
+void QuicStreamSendBuffer::SetStreamOffsetForTest(QuicStreamOffset new_offset) {
+ QuicStreamSendBufferBase::SetStreamOffsetForTest(new_offset);
+ stream_offset_ = new_offset;
}
-size_t QuicStreamSendBuffer::size() const { return interval_deque_.Size(); }
+absl::string_view QuicStreamSendBuffer::LatestWriteForTest() {
+ absl::string_view last_slice = "";
+ for (auto it = interval_deque_.DataBegin(); it != interval_deque_.DataEnd();
+ ++it) {
+ last_slice = it->slice.AsStringView();
+ }
+ return last_slice;
+}
} // namespace quic
diff --git a/quiche/quic/core/quic_stream_send_buffer.h b/quiche/quic/core/quic_stream_send_buffer.h
index ca23694..fa88e6d 100644
--- a/quiche/quic/core/quic_stream_send_buffer.h
+++ b/quiche/quic/core/quic_stream_send_buffer.h
@@ -12,7 +12,7 @@
#include "absl/types/span.h"
#include "quiche/quic/core/quic_interval.h"
#include "quiche/quic/core/quic_interval_deque.h"
-#include "quiche/quic/core/quic_interval_set.h"
+#include "quiche/quic/core/quic_stream_send_buffer_base.h"
#include "quiche/quic/core/quic_types.h"
#include "quiche/common/platform/api/quiche_export.h"
#include "quiche/common/quiche_buffer_allocator.h"
@@ -22,8 +22,7 @@
namespace test {
class QuicStreamSendBufferPeer;
-class QuicStreamPeer;
-} // namespace test
+}
class QuicDataWriter;
@@ -49,19 +48,6 @@
QuicStreamOffset offset;
};
-struct QUICHE_EXPORT StreamPendingRetransmission {
- constexpr StreamPendingRetransmission(QuicStreamOffset offset,
- QuicByteCount length)
- : offset(offset), length(length) {}
-
- // Starting offset of this pending retransmission.
- QuicStreamOffset offset;
- // Length of this pending retransmission.
- QuicByteCount length;
-
- bool operator==(const StreamPendingRetransmission& other) const;
-};
-
// QuicStreamSendBuffer contains a list of QuicStreamDataSlices. New data slices
// are added to the tail of the list. Data slices are removed from the head of
// the list when they get fully acked. Stream data can be retrieved and acked
@@ -69,86 +55,45 @@
// it cannot be written after it is marked as acked. Stream data can be written
// out-of-order within those bounds, but note that in-order wites are O(1)
// whereas out-of-order writes are O(log(n)), see QuicIntervalDeque for details.
-class QUICHE_EXPORT QuicStreamSendBuffer {
+class QUICHE_EXPORT QuicStreamSendBuffer : public QuicStreamSendBufferBase {
public:
explicit QuicStreamSendBuffer(quiche::QuicheBufferAllocator* allocator);
- QuicStreamSendBuffer(const QuicStreamSendBuffer& other) = delete;
- QuicStreamSendBuffer(QuicStreamSendBuffer&& other) = delete;
- ~QuicStreamSendBuffer();
// Save |data| to send buffer.
- void SaveStreamData(absl::string_view data);
+ void SaveStreamData(absl::string_view data) override;
// Save |slice| to send buffer.
- void SaveMemSlice(quiche::QuicheMemSlice slice);
+ void SaveMemSlice(quiche::QuicheMemSlice slice) override;
// Save all slices in |span| to send buffer. Return total bytes saved.
- QuicByteCount SaveMemSliceSpan(absl::Span<quiche::QuicheMemSlice> span);
-
- // Called when |bytes_consumed| bytes has been consumed by the stream.
- void OnStreamDataConsumed(size_t bytes_consumed);
+ QuicByteCount SaveMemSliceSpan(
+ absl::Span<quiche::QuicheMemSlice> span) override;
// Write |data_length| of data starts at |offset|. Returns true if all data
// was successfully written. Returns false if the writer fails to write, or if
// the data was already marked as acked, or if the data was never saved in the
// first place.
bool WriteStreamData(QuicStreamOffset offset, QuicByteCount data_length,
- QuicDataWriter* writer);
-
- // Called when data [offset, offset + data_length) is acked or removed as
- // stream is canceled. Removes fully acked data slice from send buffer. Set
- // |newly_acked_length|. Returns false if trying to ack unsent data.
- bool OnStreamDataAcked(QuicStreamOffset offset, QuicByteCount data_length,
- QuicByteCount* newly_acked_length);
-
- // Called when data [offset, offset + data_length) is considered as lost.
- void OnStreamDataLost(QuicStreamOffset offset, QuicByteCount data_length);
-
- // Called when data [offset, offset + length) was retransmitted.
- void OnStreamDataRetransmitted(QuicStreamOffset offset,
- QuicByteCount data_length);
-
- // Returns true if there is pending retransmissions.
- bool HasPendingRetransmission() const;
-
- // Returns next pending retransmissions.
- StreamPendingRetransmission NextPendingRetransmission() const;
-
- // Returns true if data [offset, offset + data_length) is outstanding and
- // waiting to be acked. Returns false otherwise.
- bool IsStreamDataOutstanding(QuicStreamOffset offset,
- QuicByteCount data_length) const;
+ QuicDataWriter* writer) override;
// Number of data slices in send buffer.
- size_t size() const;
+ size_t size() const override;
- QuicStreamOffset stream_offset() const { return stream_offset_; }
+ QuicStreamOffset stream_offset() const override { return stream_offset_; }
- uint64_t stream_bytes_written() const { return stream_bytes_written_; }
-
- uint64_t stream_bytes_outstanding() const {
- return stream_bytes_outstanding_;
- }
-
- const QuicIntervalSet<QuicStreamOffset>& bytes_acked() const {
- return bytes_acked_;
- }
-
- const QuicIntervalSet<QuicStreamOffset>& pending_retransmissions() const {
- return pending_retransmissions_;
- }
+ void SetStreamOffsetForTest(QuicStreamOffset new_offset) override;
+ absl::string_view LatestWriteForTest() override;
private:
friend class test::QuicStreamSendBufferPeer;
- friend class test::QuicStreamPeer;
// Called when data within offset [start, end) gets acked. Frees fully
// acked buffered slices if any. Returns false if the corresponding data does
// not exist or has been acked.
- bool FreeMemSlices(QuicStreamOffset start, QuicStreamOffset end);
+ bool FreeMemSlices(QuicStreamOffset start, QuicStreamOffset end) override;
// Cleanup acked data from the start of the interval.
- void CleanUpBufferedSlices();
+ void CleanUpBufferedSlices() override;
QuicIntervalDeque<BufferedSlice> interval_deque_;
@@ -156,18 +101,6 @@
QuicStreamOffset stream_offset_ = 0;
quiche::QuicheBufferAllocator* allocator_;
-
- // Bytes that have been consumed by the stream.
- uint64_t stream_bytes_written_ = 0;
-
- // Bytes that have been consumed and are waiting to be acked.
- uint64_t stream_bytes_outstanding_ = 0;
-
- // Offsets of data that has been acked.
- QuicIntervalSet<QuicStreamOffset> bytes_acked_;
-
- // Data considered as lost and needs to be retransmitted.
- QuicIntervalSet<QuicStreamOffset> pending_retransmissions_;
};
} // namespace quic
diff --git a/quiche/quic/core/quic_stream_send_buffer_base.cc b/quiche/quic/core/quic_stream_send_buffer_base.cc
new file mode 100644
index 0000000..1c15c2d
--- /dev/null
+++ b/quiche/quic/core/quic_stream_send_buffer_base.cc
@@ -0,0 +1,133 @@
+// Copyright (c) 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/core/quic_stream_send_buffer_base.h"
+
+#include <cstddef>
+
+#include "quiche/quic/core/quic_interval.h"
+#include "quiche/quic/core/quic_interval_set.h"
+#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/platform/api/quic_bug_tracker.h"
+#include "quiche/quic/platform/api/quic_logging.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+#include "quiche/common/quiche_mem_slice.h"
+
+namespace quic {
+
+bool StreamPendingRetransmission::operator==(
+ const StreamPendingRetransmission& other) const {
+ return offset == other.offset && length == other.length;
+}
+
+void QuicStreamSendBufferBase::OnStreamDataConsumed(size_t bytes_consumed) {
+ stream_bytes_written_ += bytes_consumed;
+ stream_bytes_outstanding_ += bytes_consumed;
+}
+
+bool QuicStreamSendBufferBase::OnStreamDataAcked(
+ QuicStreamOffset offset, QuicByteCount data_length,
+ QuicByteCount* newly_acked_length) {
+ QUIC_DVLOG(2) << "Marking data acked at offset " << offset << " length "
+ << data_length;
+ *newly_acked_length = 0;
+ if (data_length == 0) {
+ return true;
+ }
+ if (bytes_acked_.Empty() || offset >= bytes_acked_.rbegin()->max() ||
+ bytes_acked_.IsDisjoint(
+ QuicInterval<QuicStreamOffset>(offset, offset + data_length))) {
+ // Optimization for the typical case, when all data is newly acked.
+ if (stream_bytes_outstanding_ < data_length) {
+ return false;
+ }
+ bytes_acked_.AddOptimizedForAppend(offset, offset + data_length);
+ *newly_acked_length = data_length;
+ stream_bytes_outstanding_ -= data_length;
+ pending_retransmissions_.Difference(offset, offset + data_length);
+ if (!FreeMemSlices(offset, offset + data_length)) {
+ return false;
+ }
+ CleanUpBufferedSlices();
+ return true;
+ }
+ // Exit if no new data gets acked.
+ if (bytes_acked_.Contains(offset, offset + data_length)) {
+ return true;
+ }
+ // Execute the slow path if newly acked data fill in existing holes.
+ QuicIntervalSet<QuicStreamOffset> newly_acked(offset, offset + data_length);
+ newly_acked.Difference(bytes_acked_);
+ for (const auto& interval : newly_acked) {
+ *newly_acked_length += (interval.max() - interval.min());
+ }
+ if (stream_bytes_outstanding_ < *newly_acked_length) {
+ return false;
+ }
+ stream_bytes_outstanding_ -= *newly_acked_length;
+ bytes_acked_.Add(offset, offset + data_length);
+ pending_retransmissions_.Difference(offset, offset + data_length);
+ if (newly_acked.Empty()) {
+ return true;
+ }
+ if (!FreeMemSlices(newly_acked.begin()->min(), newly_acked.rbegin()->max())) {
+ return false;
+ }
+ CleanUpBufferedSlices();
+ return true;
+}
+
+void QuicStreamSendBufferBase::OnStreamDataLost(QuicStreamOffset offset,
+ QuicByteCount data_length) {
+ if (data_length == 0) {
+ return;
+ }
+ QuicIntervalSet<QuicStreamOffset> bytes_lost(offset, offset + data_length);
+ bytes_lost.Difference(bytes_acked_);
+ if (bytes_lost.Empty()) {
+ return;
+ }
+ for (const auto& lost : bytes_lost) {
+ pending_retransmissions_.Add(lost.min(), lost.max());
+ }
+}
+
+void QuicStreamSendBufferBase::OnStreamDataRetransmitted(
+ QuicStreamOffset offset, QuicByteCount data_length) {
+ if (data_length == 0) {
+ return;
+ }
+ pending_retransmissions_.Difference(offset, offset + data_length);
+}
+
+bool QuicStreamSendBufferBase::HasPendingRetransmission() const {
+ return !pending_retransmissions_.Empty();
+}
+
+StreamPendingRetransmission
+QuicStreamSendBufferBase::NextPendingRetransmission() const {
+ if (HasPendingRetransmission()) {
+ const auto pending = pending_retransmissions_.begin();
+ return {pending->min(), pending->max() - pending->min()};
+ }
+ QUIC_BUG(quic_bug_10853_3)
+ << "NextPendingRetransmission is called unexpected with no "
+ "pending retransmissions.";
+ return {0, 0};
+}
+
+bool QuicStreamSendBufferBase::IsStreamDataOutstanding(
+ QuicStreamOffset offset, QuicByteCount data_length) const {
+ return data_length > 0 &&
+ !bytes_acked_.Contains(offset, offset + data_length);
+}
+
+void QuicStreamSendBufferBase::SetStreamOffsetForTest(
+ QuicStreamOffset new_offset) {
+ stream_bytes_written_ = new_offset;
+ stream_bytes_outstanding_ = new_offset;
+}
+
+} // namespace quic
diff --git a/quiche/quic/core/quic_stream_send_buffer_base.h b/quiche/quic/core/quic_stream_send_buffer_base.h
new file mode 100644
index 0000000..b2d32ff
--- /dev/null
+++ b/quiche/quic/core/quic_stream_send_buffer_base.h
@@ -0,0 +1,145 @@
+// Copyright (c) 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_CORE_QUIC_STREAM_SEND_BUFFER_BASE_H_
+#define QUICHE_QUIC_CORE_QUIC_STREAM_SEND_BUFFER_BASE_H_
+
+#include <cstddef>
+#include <cstdint>
+
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "quiche/quic/core/quic_interval_set.h"
+#include "quiche/quic/core/quic_types.h"
+#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/quiche_mem_slice.h"
+
+namespace quic {
+
+namespace test {
+class QuicStreamSendBufferPeer;
+class QuicStreamPeer;
+} // namespace test
+
+class QuicDataWriter;
+
+struct QUICHE_EXPORT StreamPendingRetransmission {
+ constexpr StreamPendingRetransmission(QuicStreamOffset offset,
+ QuicByteCount length)
+ : offset(offset), length(length) {}
+
+ // Starting offset of this pending retransmission.
+ QuicStreamOffset offset;
+ // Length of this pending retransmission.
+ QuicByteCount length;
+
+ bool operator==(const StreamPendingRetransmission& other) const;
+};
+
+// Base class for different implementations of QuicStreamSendBuffer.
+//
+// TODO: b/417402601 - merge those classes back once we are done experimenting
+// with different implementations.
+class QUICHE_EXPORT QuicStreamSendBufferBase {
+ public:
+ QuicStreamSendBufferBase() = default;
+ QuicStreamSendBufferBase(const QuicStreamSendBufferBase& other) = delete;
+ QuicStreamSendBufferBase(QuicStreamSendBufferBase&& other) = delete;
+ virtual ~QuicStreamSendBufferBase() = default;
+
+ // Save |data| to send buffer.
+ virtual void SaveStreamData(absl::string_view data) = 0;
+
+ // Save |slice| to send buffer.
+ virtual void SaveMemSlice(quiche::QuicheMemSlice slice) = 0;
+
+ // Save all slices in |span| to send buffer. Return total bytes saved.
+ virtual QuicByteCount SaveMemSliceSpan(
+ absl::Span<quiche::QuicheMemSlice> span) = 0;
+
+ // Called when |bytes_consumed| bytes has been consumed by the stream.
+ virtual void OnStreamDataConsumed(size_t bytes_consumed);
+
+ // Write |data_length| of data starts at |offset|. Returns true if all data
+ // was successfully written. Returns false if the writer fails to write, or if
+ // the data was already marked as acked, or if the data was never saved in the
+ // first place.
+ virtual bool WriteStreamData(QuicStreamOffset offset,
+ QuicByteCount data_length,
+ QuicDataWriter* writer) = 0;
+
+ // Called when data [offset, offset + data_length) is acked or removed as
+ // stream is canceled. Removes fully acked data slice from send buffer. Set
+ // |newly_acked_length|. Returns false if trying to ack unsent data.
+ bool OnStreamDataAcked(QuicStreamOffset offset, QuicByteCount data_length,
+ QuicByteCount* newly_acked_length);
+
+ // Called when data [offset, offset + data_length) is considered as lost.
+ void OnStreamDataLost(QuicStreamOffset offset, QuicByteCount data_length);
+
+ // Called when data [offset, offset + length) was retransmitted.
+ void OnStreamDataRetransmitted(QuicStreamOffset offset,
+ QuicByteCount data_length);
+
+ // Returns true if there is pending retransmissions.
+ bool HasPendingRetransmission() const;
+
+ // Returns next pending retransmissions.
+ StreamPendingRetransmission NextPendingRetransmission() const;
+
+ // Returns true if data [offset, offset + data_length) is outstanding and
+ // waiting to be acked. Returns false otherwise.
+ bool IsStreamDataOutstanding(QuicStreamOffset offset,
+ QuicByteCount data_length) const;
+
+ // Number of data slices in send buffer.
+ virtual size_t size() const = 0;
+
+ virtual QuicStreamOffset stream_offset() const = 0;
+
+ uint64_t stream_bytes_written() const { return stream_bytes_written_; }
+
+ uint64_t stream_bytes_outstanding() const {
+ return stream_bytes_outstanding_;
+ }
+
+ const QuicIntervalSet<QuicStreamOffset>& bytes_acked() const {
+ return bytes_acked_;
+ }
+
+ const QuicIntervalSet<QuicStreamOffset>& pending_retransmissions() const {
+ return pending_retransmissions_;
+ }
+
+ virtual void SetStreamOffsetForTest(QuicStreamOffset new_offset);
+ virtual absl::string_view LatestWriteForTest() = 0;
+
+ private:
+ friend class test::QuicStreamSendBufferPeer;
+ friend class test::QuicStreamPeer;
+
+ // Called when data within offset [start, end) gets acked. Frees fully
+ // acked buffered slices if any. Returns false if the corresponding data does
+ // not exist or has been acked.
+ virtual bool FreeMemSlices(QuicStreamOffset start, QuicStreamOffset end) = 0;
+
+ // Cleanup acked data from the start of the interval.
+ virtual void CleanUpBufferedSlices() = 0;
+
+ // Bytes that have been consumed by the stream.
+ uint64_t stream_bytes_written_ = 0;
+
+ // Bytes that have been consumed and are waiting to be acked.
+ uint64_t stream_bytes_outstanding_ = 0;
+
+ // Offsets of data that has been acked.
+ QuicIntervalSet<QuicStreamOffset> bytes_acked_;
+
+ // Data considered as lost and needs to be retransmitted.
+ QuicIntervalSet<QuicStreamOffset> pending_retransmissions_;
+};
+
+} // namespace quic
+
+#endif // QUICHE_QUIC_CORE_QUIC_STREAM_SEND_BUFFER_BASE_H_
diff --git a/quiche/quic/test_tools/quic_stream_peer.cc b/quiche/quic/test_tools/quic_stream_peer.cc
index 0fdaa58..cb42642 100644
--- a/quiche/quic/test_tools/quic_stream_peer.cc
+++ b/quiche/quic/test_tools/quic_stream_peer.cc
@@ -7,6 +7,7 @@
#include <list>
#include "quiche/quic/core/quic_stream.h"
+#include "quiche/quic/core/quic_stream_send_buffer_base.h"
#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/test_tools/quic_flow_controller_peer.h"
#include "quiche/quic/test_tools/quic_stream_send_buffer_peer.h"
@@ -22,10 +23,7 @@
// static
void QuicStreamPeer::SetStreamBytesWritten(
QuicStreamOffset stream_bytes_written, QuicStream* stream) {
- stream->send_buffer_.stream_bytes_written_ = stream_bytes_written;
- stream->send_buffer_.stream_bytes_outstanding_ = stream_bytes_written;
- QuicStreamSendBufferPeer::SetStreamOffset(&stream->send_buffer_,
- stream_bytes_written);
+ stream->send_buffer_->SetStreamOffsetForTest(stream_bytes_written);
}
// static
@@ -101,8 +99,8 @@
}
// static
-QuicStreamSendBuffer& QuicStreamPeer::SendBuffer(QuicStream* stream) {
- return stream->send_buffer_;
+QuicStreamSendBufferBase& QuicStreamPeer::SendBuffer(QuicStream* stream) {
+ return stream->send_buffer();
}
// static
diff --git a/quiche/quic/test_tools/quic_stream_peer.h b/quiche/quic/test_tools/quic_stream_peer.h
index 3525deb..996b78d 100644
--- a/quiche/quic/test_tools/quic_stream_peer.h
+++ b/quiche/quic/test_tools/quic_stream_peer.h
@@ -9,6 +9,7 @@
#include "quiche/quic/core/quic_packets.h"
#include "quiche/quic/core/quic_stream_send_buffer.h"
+#include "quiche/quic/core/quic_stream_send_buffer_base.h"
#include "quiche/quic/core/quic_stream_sequencer.h"
#include "quiche/quic/core/quic_types.h"
@@ -45,7 +46,7 @@
static void SetFinReceived(QuicStream* stream);
static void SetFinSent(QuicStream* stream);
- static QuicStreamSendBuffer& SendBuffer(QuicStream* stream);
+ static QuicStreamSendBufferBase& SendBuffer(QuicStream* stream);
};
} // namespace test