Split range tracking logic in QuicStreamSendBuffer from the actual buffer management. This will simplify introducing alternative buffer implementations. PiperOrigin-RevId: 762049799
diff --git a/build/source_list.bzl b/build/source_list.bzl index d6a55f4..37e3f2b 100644 --- a/build/source_list.bzl +++ b/build/source_list.bzl
@@ -362,6 +362,7 @@ "quic/core/quic_stream_id_manager.h", "quic/core/quic_stream_priority.h", "quic/core/quic_stream_send_buffer.h", + "quic/core/quic_stream_send_buffer_base.h", "quic/core/quic_stream_sequencer.h", "quic/core/quic_stream_sequencer_buffer.h", "quic/core/quic_sustained_bandwidth_recorder.h", @@ -684,6 +685,7 @@ "quic/core/quic_stream_id_manager.cc", "quic/core/quic_stream_priority.cc", "quic/core/quic_stream_send_buffer.cc", + "quic/core/quic_stream_send_buffer_base.cc", "quic/core/quic_stream_sequencer.cc", "quic/core/quic_stream_sequencer_buffer.cc", "quic/core/quic_sustained_bandwidth_recorder.cc",
diff --git a/build/source_list.gni b/build/source_list.gni index e0371a5..6a24274 100644 --- a/build/source_list.gni +++ b/build/source_list.gni
@@ -362,6 +362,7 @@ "src/quiche/quic/core/quic_stream_id_manager.h", "src/quiche/quic/core/quic_stream_priority.h", "src/quiche/quic/core/quic_stream_send_buffer.h", + "src/quiche/quic/core/quic_stream_send_buffer_base.h", "src/quiche/quic/core/quic_stream_sequencer.h", "src/quiche/quic/core/quic_stream_sequencer_buffer.h", "src/quiche/quic/core/quic_sustained_bandwidth_recorder.h", @@ -684,6 +685,7 @@ "src/quiche/quic/core/quic_stream_id_manager.cc", "src/quiche/quic/core/quic_stream_priority.cc", "src/quiche/quic/core/quic_stream_send_buffer.cc", + "src/quiche/quic/core/quic_stream_send_buffer_base.cc", "src/quiche/quic/core/quic_stream_sequencer.cc", "src/quiche/quic/core/quic_stream_sequencer_buffer.cc", "src/quiche/quic/core/quic_sustained_bandwidth_recorder.cc",
diff --git a/build/source_list.json b/build/source_list.json index 947bbf7..d7a7523 100644 --- a/build/source_list.json +++ b/build/source_list.json
@@ -361,6 +361,7 @@ "quiche/quic/core/quic_stream_id_manager.h", "quiche/quic/core/quic_stream_priority.h", "quiche/quic/core/quic_stream_send_buffer.h", + "quiche/quic/core/quic_stream_send_buffer_base.h", "quiche/quic/core/quic_stream_sequencer.h", "quiche/quic/core/quic_stream_sequencer_buffer.h", "quiche/quic/core/quic_sustained_bandwidth_recorder.h", @@ -683,6 +684,7 @@ "quiche/quic/core/quic_stream_id_manager.cc", "quiche/quic/core/quic_stream_priority.cc", "quiche/quic/core/quic_stream_send_buffer.cc", + "quiche/quic/core/quic_stream_send_buffer_base.cc", "quiche/quic/core/quic_stream_sequencer.cc", "quiche/quic/core/quic_stream_sequencer_buffer.cc", "quiche/quic/core/quic_sustained_bandwidth_recorder.cc",
diff --git a/quiche/quic/core/http/quic_spdy_session_test.cc b/quiche/quic/core/http/quic_spdy_session_test.cc index 9bd5564..dfec18a 100644 --- a/quiche/quic/core/http/quic_spdy_session_test.cc +++ b/quiche/quic/core/http/quic_spdy_session_test.cc
@@ -1961,7 +1961,7 @@ session_->WritePriority(id, parent_stream_id, Spdy3PriorityToHttp2Weight(priority), exclusive); - QuicStreamSendBuffer& send_buffer = + QuicStreamSendBufferBase& send_buffer = QuicStreamPeer::SendBuffer(headers_stream); ASSERT_EQ(1u, send_buffer.size()); @@ -1970,10 +1970,8 @@ SpdyFramer spdy_framer(SpdyFramer::ENABLE_COMPRESSION); SpdySerializedFrame frame = spdy_framer.SerializeFrame(priority_frame); - const quiche::QuicheMemSlice& slice = - QuicStreamSendBufferPeer::CurrentWriteSlice(&send_buffer)->slice; EXPECT_EQ(absl::string_view(frame.data(), frame.size()), - absl::string_view(slice.data(), slice.length())); + send_buffer.LatestWriteForTest()); } TEST_P(QuicSpdySessionTestClient, Http3ServerPush) { @@ -4153,13 +4151,11 @@ stream->WriteHeaders(std::move(headers), /* fin = */ true, nullptr); EXPECT_TRUE(headers_stream->HasBufferedData()); - QuicStreamSendBuffer& send_buffer = + QuicStreamSendBufferBase& send_buffer = QuicStreamPeer::SendBuffer(headers_stream); ASSERT_EQ(1u, send_buffer.size()); - const quiche::QuicheMemSlice& slice = - QuicStreamSendBufferPeer::CurrentWriteSlice(&send_buffer)->slice; - absl::string_view stream_data(slice.data(), slice.length()); + absl::string_view stream_data = send_buffer.LatestWriteForTest(); std::string expected_stream_data_1; ASSERT_TRUE(
diff --git a/quiche/quic/core/quic_stream.cc b/quiche/quic/core/quic_stream.cc index 92df383..2d28091 100644 --- a/quiche/quic/core/quic_stream.cc +++ b/quiche/quic/core/quic_stream.cc
@@ -387,8 +387,8 @@ stream_contributes_to_connection_flow_control_(true), busy_counter_(0), add_random_padding_after_fin_(false), - send_buffer_( - session->connection()->helper()->GetStreamSendBufferAllocator()), + send_buffer_(std::make_unique<QuicStreamSendBuffer>( + session->connection()->helper()->GetStreamSendBufferAllocator())), buffered_data_threshold_(GetQuicFlag(quic_buffered_data_threshold)), is_static_(is_static), deadline_(QuicTime::Zero()), @@ -420,7 +420,7 @@ QUIC_DVLOG(1) << ENDPOINT << "Stream " << id_ << " gets destroyed while waiting for acks. stream_bytes_outstanding = " - << send_buffer_.stream_bytes_outstanding() + << send_buffer_->stream_bytes_outstanding() << ", fin_outstanding: " << fin_outstanding_; } if (stream_delegate_ != nullptr && type_ != CRYPTO) { @@ -681,7 +681,7 @@ type_ == READ_UNIDIRECTIONAL) { return false; } - reliable_size_ = send_buffer_.stream_offset(); + reliable_size_ = send_buffer_->stream_offset(); return true; } @@ -725,7 +725,7 @@ // Notionally ack unreliable, previously consumed data so that it's not // retransmitted, and the buffer can free the memory. QuicByteCount newly_acked; - send_buffer_.OnStreamDataAcked( + send_buffer_->OnStreamDataAcked( reliable_size_, stream_bytes_written() - reliable_size_, &newly_acked); fin_outstanding_ = false; // Do not wait to close until FIN is acked. fin_lost_ = false; @@ -818,7 +818,7 @@ // Do not respect buffered data upper limit as WriteOrBufferData guarantees // all data to be consumed. if (!data.empty()) { - QuicStreamOffset offset = send_buffer_.stream_offset(); + QuicStreamOffset offset = send_buffer_->stream_offset(); if (kMaxStreamLength - offset < data.length()) { QUIC_BUG(quic_bug_10586_4) << "Write too many data via stream " << id_; OnUnrecoverableError( @@ -826,7 +826,7 @@ absl::StrCat("Write too many data via stream ", id_)); return; } - send_buffer_.SaveStreamData(data); + send_buffer_->SaveStreamData(data); OnDataBuffered(offset, data.length(), ack_listener); } if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) { @@ -921,10 +921,10 @@ consumed_data.fin_consumed = fin; if (!span.empty()) { // Buffer all data if buffered data size is below limit. - QuicStreamOffset offset = send_buffer_.stream_offset(); - consumed_data.bytes_consumed = send_buffer_.SaveMemSliceSpan(span); - if (offset > send_buffer_.stream_offset() || - kMaxStreamLength < send_buffer_.stream_offset()) { + QuicStreamOffset offset = send_buffer_->stream_offset(); + consumed_data.bytes_consumed = send_buffer_->SaveMemSliceSpan(span); + if (offset > send_buffer_->stream_offset() || + kMaxStreamLength < send_buffer_->stream_offset()) { QUIC_BUG(quic_bug_10586_8) << "Write too many data via stream " << id_; OnUnrecoverableError( QUIC_STREAM_LENGTH_OVERFLOW, @@ -945,13 +945,13 @@ } bool QuicStream::HasPendingRetransmission() const { - return send_buffer_.HasPendingRetransmission() || fin_lost_; + return send_buffer_->HasPendingRetransmission() || fin_lost_; } bool QuicStream::IsStreamFrameOutstanding(QuicStreamOffset offset, QuicByteCount data_length, bool fin) const { - return send_buffer_.IsStreamDataOutstanding(offset, data_length) || + return send_buffer_->IsStreamDataOutstanding(offset, data_length) || (fin && fin_outstanding_); } @@ -1045,8 +1045,8 @@ } bool QuicStream::HasBufferedData() const { - QUICHE_DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written()); - return (send_buffer_.stream_offset() > stream_bytes_written() && + QUICHE_DCHECK_GE(send_buffer_->stream_offset(), stream_bytes_written()); + return (send_buffer_->stream_offset() > stream_bytes_written() && (!rst_stream_at_sent_ || reliable_size_ > stream_bytes_written())); } @@ -1241,8 +1241,8 @@ << "[" << offset << ", " << offset + data_length << "]" << " fin = " << fin_acked; *newly_acked_length = 0; - if (!send_buffer_.OnStreamDataAcked(offset, data_length, - newly_acked_length)) { + if (!send_buffer_->OnStreamDataAcked(offset, data_length, + newly_acked_length)) { OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent data."); return false; } @@ -1288,7 +1288,7 @@ void QuicStream::OnStreamFrameRetransmitted(QuicStreamOffset offset, QuicByteCount data_length, bool fin_retransmitted) { - send_buffer_.OnStreamDataRetransmitted(offset, data_length); + send_buffer_->OnStreamDataRetransmitted(offset, data_length); if (fin_retransmitted) { fin_lost_ = false; } @@ -1300,7 +1300,7 @@ << "[" << offset << ", " << offset + data_length << "]" << " fin = " << fin_lost; if (data_length > 0) { - send_buffer_.OnStreamDataLost(offset, data_length); + send_buffer_->OnStreamDataLost(offset, data_length); } if (fin_lost && fin_outstanding_) { fin_lost_ = true; @@ -1365,7 +1365,7 @@ bool QuicStream::IsWaitingForAcks() const { return (!rst_sent_ || stream_error_.ok()) && - (send_buffer_.stream_bytes_outstanding() || fin_outstanding_); + (send_buffer_->stream_bytes_outstanding() || fin_outstanding_); } bool QuicStream::WriteStreamData(QuicStreamOffset offset, @@ -1374,7 +1374,7 @@ QUICHE_DCHECK_LT(0u, data_length); QUIC_DVLOG(2) << ENDPOINT << "Write stream " << id_ << " data from offset " << offset << " length " << data_length; - return send_buffer_.WriteStreamData(offset, data_length, writer); + return send_buffer_->WriteStreamData(offset, data_length, writer); } void QuicStream::WriteBufferedData(EncryptionLevel level) { @@ -1484,8 +1484,8 @@ } uint64_t QuicStream::BufferedDataBytes() const { - QUICHE_DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written()); - return send_buffer_.stream_offset() - stream_bytes_written(); + QUICHE_DCHECK_GE(send_buffer_->stream_offset(), stream_bytes_written()); + return send_buffer_->stream_offset() - stream_bytes_written(); } bool QuicStream::CanWriteNewData() const { @@ -1497,21 +1497,21 @@ } uint64_t QuicStream::stream_bytes_written() const { - return send_buffer_.stream_bytes_written(); + return send_buffer_->stream_bytes_written(); } const QuicIntervalSet<QuicStreamOffset>& QuicStream::bytes_acked() const { - return send_buffer_.bytes_acked(); + return send_buffer_->bytes_acked(); } void QuicStream::OnStreamDataConsumed(QuicByteCount bytes_consumed) { - send_buffer_.OnStreamDataConsumed(bytes_consumed); + send_buffer_->OnStreamDataConsumed(bytes_consumed); } void QuicStream::WritePendingRetransmission() { while (HasPendingRetransmission()) { QuicConsumedData consumed(0, false); - if (!send_buffer_.HasPendingRetransmission()) { + if (!send_buffer_->HasPendingRetransmission()) { QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " retransmits fin only frame."; consumed = stream_delegate_->WritevData( @@ -1524,7 +1524,7 @@ } } else { StreamPendingRetransmission pending = - send_buffer_.NextPendingRetransmission(); + send_buffer_->NextPendingRetransmission(); // Determine whether the lost fin can be bundled with the data. const bool can_bundle_fin = fin_lost_ &&
diff --git a/quiche/quic/core/quic_stream.h b/quiche/quic/core/quic_stream.h index 3f49572..4a7b738 100644 --- a/quiche/quic/core/quic_stream.h +++ b/quiche/quic/core/quic_stream.h
@@ -20,6 +20,7 @@ #include <cstddef> #include <cstdint> #include <list> +#include <memory> #include <optional> #include <string> @@ -34,6 +35,7 @@ #include "quiche/quic/core/quic_packets.h" #include "quiche/quic/core/quic_stream_priority.h" #include "quiche/quic/core/quic_stream_send_buffer.h" +#include "quiche/quic/core/quic_stream_send_buffer_base.h" #include "quiche/quic/core/quic_stream_sequencer.h" #include "quiche/quic/core/quic_types.h" #include "quiche/quic/core/session_notifier_interface.h" @@ -523,9 +525,9 @@ const QuicIntervalSet<QuicStreamOffset>& bytes_acked() const; - const QuicStreamSendBuffer& send_buffer() const { return send_buffer_; } + const QuicStreamSendBufferBase& send_buffer() const { return *send_buffer_; } - QuicStreamSendBuffer& send_buffer() { return send_buffer_; } + QuicStreamSendBufferBase& send_buffer() { return *send_buffer_; } // Called when the write side of the stream is closed, and all of the outgoing // data has been acknowledged. This corresponds to the "Data Recvd" state of @@ -644,7 +646,7 @@ // Send buffer of this stream. Send buffer is cleaned up when data gets acked // or discarded. - QuicStreamSendBuffer send_buffer_; + std::unique_ptr<QuicStreamSendBufferBase> send_buffer_; // Latched value of quic_buffered_data_threshold. const QuicByteCount buffered_data_threshold_;
diff --git a/quiche/quic/core/quic_stream_send_buffer.cc b/quiche/quic/core/quic_stream_send_buffer.cc index c747eaa..f2b9f51 100644 --- a/quiche/quic/core/quic_stream_send_buffer.cc +++ b/quiche/quic/core/quic_stream_send_buffer.cc
@@ -13,6 +13,7 @@ #include "quiche/quic/core/quic_data_writer.h" #include "quiche/quic/core/quic_interval.h" #include "quiche/quic/core/quic_interval_set.h" +#include "quiche/quic/core/quic_stream_send_buffer_base.h" #include "quiche/quic/core/quic_types.h" #include "quiche/quic/platform/api/quic_bug_tracker.h" #include "quiche/quic/platform/api/quic_flags.h" @@ -48,17 +49,10 @@ return QuicInterval<std::size_t>(offset, offset + length); } -bool StreamPendingRetransmission::operator==( - const StreamPendingRetransmission& other) const { - return offset == other.offset && length == other.length; -} - QuicStreamSendBuffer::QuicStreamSendBuffer( quiche::QuicheBufferAllocator* allocator) : allocator_(allocator) {} -QuicStreamSendBuffer::~QuicStreamSendBuffer() {} - void QuicStreamSendBuffer::SaveStreamData(absl::string_view data) { QUIC_DVLOG(2) << "Save stream data offset " << stream_offset_ << " length " << data.length(); @@ -105,11 +99,6 @@ return total; } -void QuicStreamSendBuffer::OnStreamDataConsumed(size_t bytes_consumed) { - stream_bytes_written_ += bytes_consumed; - stream_bytes_outstanding_ += bytes_consumed; -} - bool QuicStreamSendBuffer::WriteStreamData(QuicStreamOffset offset, QuicByteCount data_length, QuicDataWriter* writer) { @@ -137,97 +126,6 @@ return data_length == 0; } -bool QuicStreamSendBuffer::OnStreamDataAcked( - QuicStreamOffset offset, QuicByteCount data_length, - QuicByteCount* newly_acked_length) { - QUIC_DVLOG(2) << "Marking data acked at offset " << offset << " length " - << data_length; - *newly_acked_length = 0; - if (data_length == 0) { - return true; - } - if (bytes_acked_.Empty() || offset >= bytes_acked_.rbegin()->max() || - bytes_acked_.IsDisjoint( - QuicInterval<QuicStreamOffset>(offset, offset + data_length))) { - // Optimization for the typical case, when all data is newly acked. - if (stream_bytes_outstanding_ < data_length) { - return false; - } - bytes_acked_.AddOptimizedForAppend(offset, offset + data_length); - *newly_acked_length = data_length; - stream_bytes_outstanding_ -= data_length; - pending_retransmissions_.Difference(offset, offset + data_length); - if (!FreeMemSlices(offset, offset + data_length)) { - return false; - } - CleanUpBufferedSlices(); - return true; - } - // Exit if no new data gets acked. - if (bytes_acked_.Contains(offset, offset + data_length)) { - return true; - } - // Execute the slow path if newly acked data fill in existing holes. - QuicIntervalSet<QuicStreamOffset> newly_acked(offset, offset + data_length); - newly_acked.Difference(bytes_acked_); - for (const auto& interval : newly_acked) { - *newly_acked_length += (interval.max() - interval.min()); - } - if (stream_bytes_outstanding_ < *newly_acked_length) { - return false; - } - stream_bytes_outstanding_ -= *newly_acked_length; - bytes_acked_.Add(offset, offset + data_length); - pending_retransmissions_.Difference(offset, offset + data_length); - if (newly_acked.Empty()) { - return true; - } - if (!FreeMemSlices(newly_acked.begin()->min(), newly_acked.rbegin()->max())) { - return false; - } - CleanUpBufferedSlices(); - return true; -} - -void QuicStreamSendBuffer::OnStreamDataLost(QuicStreamOffset offset, - QuicByteCount data_length) { - if (data_length == 0) { - return; - } - QuicIntervalSet<QuicStreamOffset> bytes_lost(offset, offset + data_length); - bytes_lost.Difference(bytes_acked_); - if (bytes_lost.Empty()) { - return; - } - for (const auto& lost : bytes_lost) { - pending_retransmissions_.Add(lost.min(), lost.max()); - } -} - -void QuicStreamSendBuffer::OnStreamDataRetransmitted( - QuicStreamOffset offset, QuicByteCount data_length) { - if (data_length == 0) { - return; - } - pending_retransmissions_.Difference(offset, offset + data_length); -} - -bool QuicStreamSendBuffer::HasPendingRetransmission() const { - return !pending_retransmissions_.Empty(); -} - -StreamPendingRetransmission QuicStreamSendBuffer::NextPendingRetransmission() - const { - if (HasPendingRetransmission()) { - const auto pending = pending_retransmissions_.begin(); - return {pending->min(), pending->max() - pending->min()}; - } - QUIC_BUG(quic_bug_10853_3) - << "NextPendingRetransmission is called unexpected with no " - "pending retransmissions."; - return {0, 0}; -} - bool QuicStreamSendBuffer::FreeMemSlices(QuicStreamOffset start, QuicStreamOffset end) { auto it = interval_deque_.DataBegin(); @@ -256,7 +154,7 @@ break; } if (!it->slice.empty() && - bytes_acked_.Contains(it->offset, it->offset + it->slice.length())) { + bytes_acked().Contains(it->offset, it->offset + it->slice.length())) { it->slice.Reset(); } } @@ -270,12 +168,20 @@ } } -bool QuicStreamSendBuffer::IsStreamDataOutstanding( - QuicStreamOffset offset, QuicByteCount data_length) const { - return data_length > 0 && - !bytes_acked_.Contains(offset, offset + data_length); +size_t QuicStreamSendBuffer::size() const { return interval_deque_.Size(); } + +void QuicStreamSendBuffer::SetStreamOffsetForTest(QuicStreamOffset new_offset) { + QuicStreamSendBufferBase::SetStreamOffsetForTest(new_offset); + stream_offset_ = new_offset; } -size_t QuicStreamSendBuffer::size() const { return interval_deque_.Size(); } +absl::string_view QuicStreamSendBuffer::LatestWriteForTest() { + absl::string_view last_slice = ""; + for (auto it = interval_deque_.DataBegin(); it != interval_deque_.DataEnd(); + ++it) { + last_slice = it->slice.AsStringView(); + } + return last_slice; +} } // namespace quic
diff --git a/quiche/quic/core/quic_stream_send_buffer.h b/quiche/quic/core/quic_stream_send_buffer.h index ca23694..fa88e6d 100644 --- a/quiche/quic/core/quic_stream_send_buffer.h +++ b/quiche/quic/core/quic_stream_send_buffer.h
@@ -12,7 +12,7 @@ #include "absl/types/span.h" #include "quiche/quic/core/quic_interval.h" #include "quiche/quic/core/quic_interval_deque.h" -#include "quiche/quic/core/quic_interval_set.h" +#include "quiche/quic/core/quic_stream_send_buffer_base.h" #include "quiche/quic/core/quic_types.h" #include "quiche/common/platform/api/quiche_export.h" #include "quiche/common/quiche_buffer_allocator.h" @@ -22,8 +22,7 @@ namespace test { class QuicStreamSendBufferPeer; -class QuicStreamPeer; -} // namespace test +} class QuicDataWriter; @@ -49,19 +48,6 @@ QuicStreamOffset offset; }; -struct QUICHE_EXPORT StreamPendingRetransmission { - constexpr StreamPendingRetransmission(QuicStreamOffset offset, - QuicByteCount length) - : offset(offset), length(length) {} - - // Starting offset of this pending retransmission. - QuicStreamOffset offset; - // Length of this pending retransmission. - QuicByteCount length; - - bool operator==(const StreamPendingRetransmission& other) const; -}; - // QuicStreamSendBuffer contains a list of QuicStreamDataSlices. New data slices // are added to the tail of the list. Data slices are removed from the head of // the list when they get fully acked. Stream data can be retrieved and acked @@ -69,86 +55,45 @@ // it cannot be written after it is marked as acked. Stream data can be written // out-of-order within those bounds, but note that in-order wites are O(1) // whereas out-of-order writes are O(log(n)), see QuicIntervalDeque for details. -class QUICHE_EXPORT QuicStreamSendBuffer { +class QUICHE_EXPORT QuicStreamSendBuffer : public QuicStreamSendBufferBase { public: explicit QuicStreamSendBuffer(quiche::QuicheBufferAllocator* allocator); - QuicStreamSendBuffer(const QuicStreamSendBuffer& other) = delete; - QuicStreamSendBuffer(QuicStreamSendBuffer&& other) = delete; - ~QuicStreamSendBuffer(); // Save |data| to send buffer. - void SaveStreamData(absl::string_view data); + void SaveStreamData(absl::string_view data) override; // Save |slice| to send buffer. - void SaveMemSlice(quiche::QuicheMemSlice slice); + void SaveMemSlice(quiche::QuicheMemSlice slice) override; // Save all slices in |span| to send buffer. Return total bytes saved. - QuicByteCount SaveMemSliceSpan(absl::Span<quiche::QuicheMemSlice> span); - - // Called when |bytes_consumed| bytes has been consumed by the stream. - void OnStreamDataConsumed(size_t bytes_consumed); + QuicByteCount SaveMemSliceSpan( + absl::Span<quiche::QuicheMemSlice> span) override; // Write |data_length| of data starts at |offset|. Returns true if all data // was successfully written. Returns false if the writer fails to write, or if // the data was already marked as acked, or if the data was never saved in the // first place. bool WriteStreamData(QuicStreamOffset offset, QuicByteCount data_length, - QuicDataWriter* writer); - - // Called when data [offset, offset + data_length) is acked or removed as - // stream is canceled. Removes fully acked data slice from send buffer. Set - // |newly_acked_length|. Returns false if trying to ack unsent data. - bool OnStreamDataAcked(QuicStreamOffset offset, QuicByteCount data_length, - QuicByteCount* newly_acked_length); - - // Called when data [offset, offset + data_length) is considered as lost. - void OnStreamDataLost(QuicStreamOffset offset, QuicByteCount data_length); - - // Called when data [offset, offset + length) was retransmitted. - void OnStreamDataRetransmitted(QuicStreamOffset offset, - QuicByteCount data_length); - - // Returns true if there is pending retransmissions. - bool HasPendingRetransmission() const; - - // Returns next pending retransmissions. - StreamPendingRetransmission NextPendingRetransmission() const; - - // Returns true if data [offset, offset + data_length) is outstanding and - // waiting to be acked. Returns false otherwise. - bool IsStreamDataOutstanding(QuicStreamOffset offset, - QuicByteCount data_length) const; + QuicDataWriter* writer) override; // Number of data slices in send buffer. - size_t size() const; + size_t size() const override; - QuicStreamOffset stream_offset() const { return stream_offset_; } + QuicStreamOffset stream_offset() const override { return stream_offset_; } - uint64_t stream_bytes_written() const { return stream_bytes_written_; } - - uint64_t stream_bytes_outstanding() const { - return stream_bytes_outstanding_; - } - - const QuicIntervalSet<QuicStreamOffset>& bytes_acked() const { - return bytes_acked_; - } - - const QuicIntervalSet<QuicStreamOffset>& pending_retransmissions() const { - return pending_retransmissions_; - } + void SetStreamOffsetForTest(QuicStreamOffset new_offset) override; + absl::string_view LatestWriteForTest() override; private: friend class test::QuicStreamSendBufferPeer; - friend class test::QuicStreamPeer; // Called when data within offset [start, end) gets acked. Frees fully // acked buffered slices if any. Returns false if the corresponding data does // not exist or has been acked. - bool FreeMemSlices(QuicStreamOffset start, QuicStreamOffset end); + bool FreeMemSlices(QuicStreamOffset start, QuicStreamOffset end) override; // Cleanup acked data from the start of the interval. - void CleanUpBufferedSlices(); + void CleanUpBufferedSlices() override; QuicIntervalDeque<BufferedSlice> interval_deque_; @@ -156,18 +101,6 @@ QuicStreamOffset stream_offset_ = 0; quiche::QuicheBufferAllocator* allocator_; - - // Bytes that have been consumed by the stream. - uint64_t stream_bytes_written_ = 0; - - // Bytes that have been consumed and are waiting to be acked. - uint64_t stream_bytes_outstanding_ = 0; - - // Offsets of data that has been acked. - QuicIntervalSet<QuicStreamOffset> bytes_acked_; - - // Data considered as lost and needs to be retransmitted. - QuicIntervalSet<QuicStreamOffset> pending_retransmissions_; }; } // namespace quic
diff --git a/quiche/quic/core/quic_stream_send_buffer_base.cc b/quiche/quic/core/quic_stream_send_buffer_base.cc new file mode 100644 index 0000000..1c15c2d --- /dev/null +++ b/quiche/quic/core/quic_stream_send_buffer_base.cc
@@ -0,0 +1,133 @@ +// Copyright (c) 2017 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "quiche/quic/core/quic_stream_send_buffer_base.h" + +#include <cstddef> + +#include "quiche/quic/core/quic_interval.h" +#include "quiche/quic/core/quic_interval_set.h" +#include "quiche/quic/core/quic_types.h" +#include "quiche/quic/platform/api/quic_bug_tracker.h" +#include "quiche/quic/platform/api/quic_logging.h" +#include "quiche/common/platform/api/quiche_logging.h" +#include "quiche/common/quiche_buffer_allocator.h" +#include "quiche/common/quiche_mem_slice.h" + +namespace quic { + +bool StreamPendingRetransmission::operator==( + const StreamPendingRetransmission& other) const { + return offset == other.offset && length == other.length; +} + +void QuicStreamSendBufferBase::OnStreamDataConsumed(size_t bytes_consumed) { + stream_bytes_written_ += bytes_consumed; + stream_bytes_outstanding_ += bytes_consumed; +} + +bool QuicStreamSendBufferBase::OnStreamDataAcked( + QuicStreamOffset offset, QuicByteCount data_length, + QuicByteCount* newly_acked_length) { + QUIC_DVLOG(2) << "Marking data acked at offset " << offset << " length " + << data_length; + *newly_acked_length = 0; + if (data_length == 0) { + return true; + } + if (bytes_acked_.Empty() || offset >= bytes_acked_.rbegin()->max() || + bytes_acked_.IsDisjoint( + QuicInterval<QuicStreamOffset>(offset, offset + data_length))) { + // Optimization for the typical case, when all data is newly acked. + if (stream_bytes_outstanding_ < data_length) { + return false; + } + bytes_acked_.AddOptimizedForAppend(offset, offset + data_length); + *newly_acked_length = data_length; + stream_bytes_outstanding_ -= data_length; + pending_retransmissions_.Difference(offset, offset + data_length); + if (!FreeMemSlices(offset, offset + data_length)) { + return false; + } + CleanUpBufferedSlices(); + return true; + } + // Exit if no new data gets acked. + if (bytes_acked_.Contains(offset, offset + data_length)) { + return true; + } + // Execute the slow path if newly acked data fill in existing holes. + QuicIntervalSet<QuicStreamOffset> newly_acked(offset, offset + data_length); + newly_acked.Difference(bytes_acked_); + for (const auto& interval : newly_acked) { + *newly_acked_length += (interval.max() - interval.min()); + } + if (stream_bytes_outstanding_ < *newly_acked_length) { + return false; + } + stream_bytes_outstanding_ -= *newly_acked_length; + bytes_acked_.Add(offset, offset + data_length); + pending_retransmissions_.Difference(offset, offset + data_length); + if (newly_acked.Empty()) { + return true; + } + if (!FreeMemSlices(newly_acked.begin()->min(), newly_acked.rbegin()->max())) { + return false; + } + CleanUpBufferedSlices(); + return true; +} + +void QuicStreamSendBufferBase::OnStreamDataLost(QuicStreamOffset offset, + QuicByteCount data_length) { + if (data_length == 0) { + return; + } + QuicIntervalSet<QuicStreamOffset> bytes_lost(offset, offset + data_length); + bytes_lost.Difference(bytes_acked_); + if (bytes_lost.Empty()) { + return; + } + for (const auto& lost : bytes_lost) { + pending_retransmissions_.Add(lost.min(), lost.max()); + } +} + +void QuicStreamSendBufferBase::OnStreamDataRetransmitted( + QuicStreamOffset offset, QuicByteCount data_length) { + if (data_length == 0) { + return; + } + pending_retransmissions_.Difference(offset, offset + data_length); +} + +bool QuicStreamSendBufferBase::HasPendingRetransmission() const { + return !pending_retransmissions_.Empty(); +} + +StreamPendingRetransmission +QuicStreamSendBufferBase::NextPendingRetransmission() const { + if (HasPendingRetransmission()) { + const auto pending = pending_retransmissions_.begin(); + return {pending->min(), pending->max() - pending->min()}; + } + QUIC_BUG(quic_bug_10853_3) + << "NextPendingRetransmission is called unexpected with no " + "pending retransmissions."; + return {0, 0}; +} + +bool QuicStreamSendBufferBase::IsStreamDataOutstanding( + QuicStreamOffset offset, QuicByteCount data_length) const { + return data_length > 0 && + !bytes_acked_.Contains(offset, offset + data_length); +} + +void QuicStreamSendBufferBase::SetStreamOffsetForTest( + QuicStreamOffset new_offset) { + stream_bytes_written_ = new_offset; + stream_bytes_outstanding_ = new_offset; +} + +} // namespace quic
diff --git a/quiche/quic/core/quic_stream_send_buffer_base.h b/quiche/quic/core/quic_stream_send_buffer_base.h new file mode 100644 index 0000000..b2d32ff --- /dev/null +++ b/quiche/quic/core/quic_stream_send_buffer_base.h
@@ -0,0 +1,145 @@ +// Copyright (c) 2017 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_CORE_QUIC_STREAM_SEND_BUFFER_BASE_H_ +#define QUICHE_QUIC_CORE_QUIC_STREAM_SEND_BUFFER_BASE_H_ + +#include <cstddef> +#include <cstdint> + +#include "absl/strings/string_view.h" +#include "absl/types/span.h" +#include "quiche/quic/core/quic_interval_set.h" +#include "quiche/quic/core/quic_types.h" +#include "quiche/common/platform/api/quiche_export.h" +#include "quiche/common/quiche_mem_slice.h" + +namespace quic { + +namespace test { +class QuicStreamSendBufferPeer; +class QuicStreamPeer; +} // namespace test + +class QuicDataWriter; + +struct QUICHE_EXPORT StreamPendingRetransmission { + constexpr StreamPendingRetransmission(QuicStreamOffset offset, + QuicByteCount length) + : offset(offset), length(length) {} + + // Starting offset of this pending retransmission. + QuicStreamOffset offset; + // Length of this pending retransmission. + QuicByteCount length; + + bool operator==(const StreamPendingRetransmission& other) const; +}; + +// Base class for different implementations of QuicStreamSendBuffer. +// +// TODO: b/417402601 - merge those classes back once we are done experimenting +// with different implementations. +class QUICHE_EXPORT QuicStreamSendBufferBase { + public: + QuicStreamSendBufferBase() = default; + QuicStreamSendBufferBase(const QuicStreamSendBufferBase& other) = delete; + QuicStreamSendBufferBase(QuicStreamSendBufferBase&& other) = delete; + virtual ~QuicStreamSendBufferBase() = default; + + // Save |data| to send buffer. + virtual void SaveStreamData(absl::string_view data) = 0; + + // Save |slice| to send buffer. + virtual void SaveMemSlice(quiche::QuicheMemSlice slice) = 0; + + // Save all slices in |span| to send buffer. Return total bytes saved. + virtual QuicByteCount SaveMemSliceSpan( + absl::Span<quiche::QuicheMemSlice> span) = 0; + + // Called when |bytes_consumed| bytes has been consumed by the stream. + virtual void OnStreamDataConsumed(size_t bytes_consumed); + + // Write |data_length| of data starts at |offset|. Returns true if all data + // was successfully written. Returns false if the writer fails to write, or if + // the data was already marked as acked, or if the data was never saved in the + // first place. + virtual bool WriteStreamData(QuicStreamOffset offset, + QuicByteCount data_length, + QuicDataWriter* writer) = 0; + + // Called when data [offset, offset + data_length) is acked or removed as + // stream is canceled. Removes fully acked data slice from send buffer. Set + // |newly_acked_length|. Returns false if trying to ack unsent data. + bool OnStreamDataAcked(QuicStreamOffset offset, QuicByteCount data_length, + QuicByteCount* newly_acked_length); + + // Called when data [offset, offset + data_length) is considered as lost. + void OnStreamDataLost(QuicStreamOffset offset, QuicByteCount data_length); + + // Called when data [offset, offset + length) was retransmitted. + void OnStreamDataRetransmitted(QuicStreamOffset offset, + QuicByteCount data_length); + + // Returns true if there is pending retransmissions. + bool HasPendingRetransmission() const; + + // Returns next pending retransmissions. + StreamPendingRetransmission NextPendingRetransmission() const; + + // Returns true if data [offset, offset + data_length) is outstanding and + // waiting to be acked. Returns false otherwise. + bool IsStreamDataOutstanding(QuicStreamOffset offset, + QuicByteCount data_length) const; + + // Number of data slices in send buffer. + virtual size_t size() const = 0; + + virtual QuicStreamOffset stream_offset() const = 0; + + uint64_t stream_bytes_written() const { return stream_bytes_written_; } + + uint64_t stream_bytes_outstanding() const { + return stream_bytes_outstanding_; + } + + const QuicIntervalSet<QuicStreamOffset>& bytes_acked() const { + return bytes_acked_; + } + + const QuicIntervalSet<QuicStreamOffset>& pending_retransmissions() const { + return pending_retransmissions_; + } + + virtual void SetStreamOffsetForTest(QuicStreamOffset new_offset); + virtual absl::string_view LatestWriteForTest() = 0; + + private: + friend class test::QuicStreamSendBufferPeer; + friend class test::QuicStreamPeer; + + // Called when data within offset [start, end) gets acked. Frees fully + // acked buffered slices if any. Returns false if the corresponding data does + // not exist or has been acked. + virtual bool FreeMemSlices(QuicStreamOffset start, QuicStreamOffset end) = 0; + + // Cleanup acked data from the start of the interval. + virtual void CleanUpBufferedSlices() = 0; + + // Bytes that have been consumed by the stream. + uint64_t stream_bytes_written_ = 0; + + // Bytes that have been consumed and are waiting to be acked. + uint64_t stream_bytes_outstanding_ = 0; + + // Offsets of data that has been acked. + QuicIntervalSet<QuicStreamOffset> bytes_acked_; + + // Data considered as lost and needs to be retransmitted. + QuicIntervalSet<QuicStreamOffset> pending_retransmissions_; +}; + +} // namespace quic + +#endif // QUICHE_QUIC_CORE_QUIC_STREAM_SEND_BUFFER_BASE_H_
diff --git a/quiche/quic/test_tools/quic_stream_peer.cc b/quiche/quic/test_tools/quic_stream_peer.cc index 0fdaa58..cb42642 100644 --- a/quiche/quic/test_tools/quic_stream_peer.cc +++ b/quiche/quic/test_tools/quic_stream_peer.cc
@@ -7,6 +7,7 @@ #include <list> #include "quiche/quic/core/quic_stream.h" +#include "quiche/quic/core/quic_stream_send_buffer_base.h" #include "quiche/quic/core/quic_types.h" #include "quiche/quic/test_tools/quic_flow_controller_peer.h" #include "quiche/quic/test_tools/quic_stream_send_buffer_peer.h" @@ -22,10 +23,7 @@ // static void QuicStreamPeer::SetStreamBytesWritten( QuicStreamOffset stream_bytes_written, QuicStream* stream) { - stream->send_buffer_.stream_bytes_written_ = stream_bytes_written; - stream->send_buffer_.stream_bytes_outstanding_ = stream_bytes_written; - QuicStreamSendBufferPeer::SetStreamOffset(&stream->send_buffer_, - stream_bytes_written); + stream->send_buffer_->SetStreamOffsetForTest(stream_bytes_written); } // static @@ -101,8 +99,8 @@ } // static -QuicStreamSendBuffer& QuicStreamPeer::SendBuffer(QuicStream* stream) { - return stream->send_buffer_; +QuicStreamSendBufferBase& QuicStreamPeer::SendBuffer(QuicStream* stream) { + return stream->send_buffer(); } // static
diff --git a/quiche/quic/test_tools/quic_stream_peer.h b/quiche/quic/test_tools/quic_stream_peer.h index 3525deb..996b78d 100644 --- a/quiche/quic/test_tools/quic_stream_peer.h +++ b/quiche/quic/test_tools/quic_stream_peer.h
@@ -9,6 +9,7 @@ #include "quiche/quic/core/quic_packets.h" #include "quiche/quic/core/quic_stream_send_buffer.h" +#include "quiche/quic/core/quic_stream_send_buffer_base.h" #include "quiche/quic/core/quic_stream_sequencer.h" #include "quiche/quic/core/quic_types.h" @@ -45,7 +46,7 @@ static void SetFinReceived(QuicStream* stream); static void SetFinSent(QuicStream* stream); - static QuicStreamSendBuffer& SendBuffer(QuicStream* stream); + static QuicStreamSendBufferBase& SendBuffer(QuicStream* stream); }; } // namespace test