|  | // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 
|  | // Use of this source code is governed by a BSD-style license that can be | 
|  | // found in the LICENSE file. | 
|  |  | 
|  | #include "quic/core/quic_stream_sequencer.h" | 
|  |  | 
|  | #include <algorithm> | 
|  | #include <cstddef> | 
|  | #include <limits> | 
|  | #include <string> | 
|  | #include <utility> | 
|  |  | 
|  | #include "absl/strings/str_cat.h" | 
|  | #include "absl/strings/string_view.h" | 
|  | #include "quic/core/quic_clock.h" | 
|  | #include "quic/core/quic_error_codes.h" | 
|  | #include "quic/core/quic_packets.h" | 
|  | #include "quic/core/quic_stream.h" | 
|  | #include "quic/core/quic_stream_sequencer_buffer.h" | 
|  | #include "quic/core/quic_types.h" | 
|  | #include "quic/core/quic_utils.h" | 
|  | #include "quic/platform/api/quic_bug_tracker.h" | 
|  | #include "quic/platform/api/quic_flag_utils.h" | 
|  | #include "quic/platform/api/quic_flags.h" | 
|  | #include "quic/platform/api/quic_logging.h" | 
|  | #include "quic/platform/api/quic_stack_trace.h" | 
|  |  | 
|  | namespace quic { | 
|  |  | 
|  | QuicStreamSequencer::QuicStreamSequencer(StreamInterface* quic_stream) | 
|  | : stream_(quic_stream), | 
|  | buffered_frames_(kStreamReceiveWindowLimit), | 
|  | highest_offset_(0), | 
|  | close_offset_(std::numeric_limits<QuicStreamOffset>::max()), | 
|  | blocked_(false), | 
|  | num_frames_received_(0), | 
|  | num_duplicate_frames_received_(0), | 
|  | ignore_read_data_(false), | 
|  | level_triggered_(false) {} | 
|  |  | 
|  | QuicStreamSequencer::~QuicStreamSequencer() { | 
|  | if (stream_ == nullptr) { | 
|  | QUIC_BUG(quic_bug_10858_1) << "Double free'ing QuicStreamSequencer at " | 
|  | << this << ". " << QuicStackTrace(); | 
|  | } | 
|  | stream_ = nullptr; | 
|  | } | 
|  |  | 
|  | void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) { | 
|  | QUICHE_DCHECK_LE(frame.offset + frame.data_length, close_offset_); | 
|  | ++num_frames_received_; | 
|  | const QuicStreamOffset byte_offset = frame.offset; | 
|  | const size_t data_len = frame.data_length; | 
|  |  | 
|  | if (frame.fin && | 
|  | (!CloseStreamAtOffset(frame.offset + data_len) || data_len == 0)) { | 
|  | return; | 
|  | } | 
|  | if (stream_->version().HasIetfQuicFrames() && data_len == 0) { | 
|  | QUICHE_DCHECK(!frame.fin); | 
|  | // Ignore empty frame with no fin. | 
|  | return; | 
|  | } | 
|  | OnFrameData(byte_offset, data_len, frame.data_buffer); | 
|  | } | 
|  |  | 
|  | void QuicStreamSequencer::OnCryptoFrame(const QuicCryptoFrame& frame) { | 
|  | ++num_frames_received_; | 
|  | OnFrameData(frame.offset, frame.data_length, frame.data_buffer); | 
|  | } | 
|  |  | 
|  | void QuicStreamSequencer::OnFrameData(QuicStreamOffset byte_offset, | 
|  | size_t data_len, | 
|  | const char* data_buffer) { | 
|  | highest_offset_ = std::max(highest_offset_, byte_offset + data_len); | 
|  | const size_t previous_readable_bytes = buffered_frames_.ReadableBytes(); | 
|  | size_t bytes_written; | 
|  | std::string error_details; | 
|  | QuicErrorCode result = buffered_frames_.OnStreamData( | 
|  | byte_offset, absl::string_view(data_buffer, data_len), &bytes_written, | 
|  | &error_details); | 
|  | if (result != QUIC_NO_ERROR) { | 
|  | std::string details = | 
|  | absl::StrCat("Stream ", stream_->id(), ": ", | 
|  | QuicErrorCodeToString(result), ": ", error_details); | 
|  | QUIC_LOG_FIRST_N(WARNING, 50) << QuicErrorCodeToString(result); | 
|  | QUIC_LOG_FIRST_N(WARNING, 50) << details; | 
|  | stream_->OnUnrecoverableError(result, details); | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (bytes_written == 0) { | 
|  | ++num_duplicate_frames_received_; | 
|  | // Silently ignore duplicates. | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (blocked_) { | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (level_triggered_) { | 
|  | if (buffered_frames_.ReadableBytes() > previous_readable_bytes) { | 
|  | // Readable bytes has changed, let stream decide if to inform application | 
|  | // or not. | 
|  | if (ignore_read_data_) { | 
|  | FlushBufferedFrames(); | 
|  | } else { | 
|  | stream_->OnDataAvailable(); | 
|  | } | 
|  | } | 
|  | return; | 
|  | } | 
|  | const bool stream_unblocked = | 
|  | previous_readable_bytes == 0 && buffered_frames_.ReadableBytes() > 0; | 
|  | if (stream_unblocked) { | 
|  | if (ignore_read_data_) { | 
|  | FlushBufferedFrames(); | 
|  | } else { | 
|  | stream_->OnDataAvailable(); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | bool QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) { | 
|  | const QuicStreamOffset kMaxOffset = | 
|  | std::numeric_limits<QuicStreamOffset>::max(); | 
|  |  | 
|  | // If there is a scheduled close, the new offset should match it. | 
|  | if (close_offset_ != kMaxOffset && offset != close_offset_) { | 
|  | stream_->OnUnrecoverableError( | 
|  | QUIC_STREAM_SEQUENCER_INVALID_STATE, | 
|  | absl::StrCat( | 
|  | "Stream ", stream_->id(), " received new final offset: ", offset, | 
|  | ", which is different from close offset: ", close_offset_)); | 
|  | return false; | 
|  | } | 
|  |  | 
|  | // The final offset should be no less than the highest offset that is | 
|  | // received. | 
|  | if (offset < highest_offset_) { | 
|  | stream_->OnUnrecoverableError( | 
|  | QUIC_STREAM_SEQUENCER_INVALID_STATE, | 
|  | absl::StrCat( | 
|  | "Stream ", stream_->id(), " received fin with offset: ", offset, | 
|  | ", which reduces current highest offset: ", highest_offset_)); | 
|  | return false; | 
|  | } | 
|  |  | 
|  | close_offset_ = offset; | 
|  |  | 
|  | MaybeCloseStream(); | 
|  | return true; | 
|  | } | 
|  |  | 
|  | void QuicStreamSequencer::MaybeCloseStream() { | 
|  | if (blocked_ || !IsClosed()) { | 
|  | return; | 
|  | } | 
|  |  | 
|  | QUIC_DVLOG(1) << "Passing up termination, as we've processed " | 
|  | << buffered_frames_.BytesConsumed() << " of " << close_offset_ | 
|  | << " bytes."; | 
|  | // This will cause the stream to consume the FIN. | 
|  | // Technically it's an error if |num_bytes_consumed| isn't exactly | 
|  | // equal to |close_offset|, but error handling seems silly at this point. | 
|  | if (ignore_read_data_) { | 
|  | // The sequencer is discarding stream data and must notify the stream on | 
|  | // receipt of a FIN because the consumer won't. | 
|  | stream_->OnFinRead(); | 
|  | } else { | 
|  | stream_->OnDataAvailable(); | 
|  | } | 
|  | buffered_frames_.Clear(); | 
|  | } | 
|  |  | 
|  | int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) const { | 
|  | QUICHE_DCHECK(!blocked_); | 
|  | return buffered_frames_.GetReadableRegions(iov, iov_len); | 
|  | } | 
|  |  | 
|  | bool QuicStreamSequencer::GetReadableRegion(iovec* iov) const { | 
|  | QUICHE_DCHECK(!blocked_); | 
|  | return buffered_frames_.GetReadableRegion(iov); | 
|  | } | 
|  |  | 
|  | bool QuicStreamSequencer::PeekRegion(QuicStreamOffset offset, | 
|  | iovec* iov) const { | 
|  | QUICHE_DCHECK(!blocked_); | 
|  | return buffered_frames_.PeekRegion(offset, iov); | 
|  | } | 
|  |  | 
|  | void QuicStreamSequencer::Read(std::string* buffer) { | 
|  | QUICHE_DCHECK(!blocked_); | 
|  | buffer->resize(buffer->size() + ReadableBytes()); | 
|  | iovec iov; | 
|  | iov.iov_len = ReadableBytes(); | 
|  | iov.iov_base = &(*buffer)[buffer->size() - iov.iov_len]; | 
|  | Readv(&iov, 1); | 
|  | } | 
|  |  | 
|  | size_t QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) { | 
|  | QUICHE_DCHECK(!blocked_); | 
|  | std::string error_details; | 
|  | size_t bytes_read; | 
|  | QuicErrorCode read_error = | 
|  | buffered_frames_.Readv(iov, iov_len, &bytes_read, &error_details); | 
|  | if (read_error != QUIC_NO_ERROR) { | 
|  | std::string details = | 
|  | absl::StrCat("Stream ", stream_->id(), ": ", error_details); | 
|  | stream_->OnUnrecoverableError(read_error, details); | 
|  | return bytes_read; | 
|  | } | 
|  |  | 
|  | stream_->AddBytesConsumed(bytes_read); | 
|  | return bytes_read; | 
|  | } | 
|  |  | 
|  | bool QuicStreamSequencer::HasBytesToRead() const { | 
|  | return buffered_frames_.HasBytesToRead(); | 
|  | } | 
|  |  | 
|  | size_t QuicStreamSequencer::ReadableBytes() const { | 
|  | return buffered_frames_.ReadableBytes(); | 
|  | } | 
|  |  | 
|  | bool QuicStreamSequencer::IsClosed() const { | 
|  | return buffered_frames_.BytesConsumed() >= close_offset_; | 
|  | } | 
|  |  | 
|  | void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed) { | 
|  | QUICHE_DCHECK(!blocked_); | 
|  | bool result = buffered_frames_.MarkConsumed(num_bytes_consumed); | 
|  | if (!result) { | 
|  | QUIC_BUG(quic_bug_10858_2) | 
|  | << "Invalid argument to MarkConsumed." | 
|  | << " expect to consume: " << num_bytes_consumed | 
|  | << ", but not enough bytes available. " << DebugString(); | 
|  | stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); | 
|  | return; | 
|  | } | 
|  | stream_->AddBytesConsumed(num_bytes_consumed); | 
|  | } | 
|  |  | 
|  | void QuicStreamSequencer::SetBlockedUntilFlush() { | 
|  | blocked_ = true; | 
|  | } | 
|  |  | 
|  | void QuicStreamSequencer::SetUnblocked() { | 
|  | blocked_ = false; | 
|  | if (IsClosed() || HasBytesToRead()) { | 
|  | stream_->OnDataAvailable(); | 
|  | } | 
|  | } | 
|  |  | 
|  | void QuicStreamSequencer::StopReading() { | 
|  | if (ignore_read_data_) { | 
|  | return; | 
|  | } | 
|  | ignore_read_data_ = true; | 
|  | FlushBufferedFrames(); | 
|  | } | 
|  |  | 
|  | void QuicStreamSequencer::ReleaseBuffer() { | 
|  | buffered_frames_.ReleaseWholeBuffer(); | 
|  | } | 
|  |  | 
|  | void QuicStreamSequencer::ReleaseBufferIfEmpty() { | 
|  | if (buffered_frames_.Empty()) { | 
|  | buffered_frames_.ReleaseWholeBuffer(); | 
|  | } | 
|  | } | 
|  |  | 
|  | void QuicStreamSequencer::FlushBufferedFrames() { | 
|  | QUICHE_DCHECK(ignore_read_data_); | 
|  | size_t bytes_flushed = buffered_frames_.FlushBufferedFrames(); | 
|  | QUIC_DVLOG(1) << "Flushing buffered data at offset " | 
|  | << buffered_frames_.BytesConsumed() << " length " | 
|  | << bytes_flushed << " for stream " << stream_->id(); | 
|  | stream_->AddBytesConsumed(bytes_flushed); | 
|  | MaybeCloseStream(); | 
|  | } | 
|  |  | 
|  | size_t QuicStreamSequencer::NumBytesBuffered() const { | 
|  | return buffered_frames_.BytesBuffered(); | 
|  | } | 
|  |  | 
|  | QuicStreamOffset QuicStreamSequencer::NumBytesConsumed() const { | 
|  | return buffered_frames_.BytesConsumed(); | 
|  | } | 
|  |  | 
|  | const std::string QuicStreamSequencer::DebugString() const { | 
|  | // clang-format off | 
|  | return absl::StrCat("QuicStreamSequencer:", | 
|  | "\n  bytes buffered: ", NumBytesBuffered(), | 
|  | "\n  bytes consumed: ", NumBytesConsumed(), | 
|  | "\n  has bytes to read: ", HasBytesToRead() ? "true" : "false", | 
|  | "\n  frames received: ", num_frames_received(), | 
|  | "\n  close offset bytes: ", close_offset_, | 
|  | "\n  is closed: ", IsClosed() ? "true" : "false"); | 
|  | // clang-format on | 
|  | } | 
|  |  | 
|  | }  // namespace quic |