Project import generated by Copybara.

PiperOrigin-RevId: 237361882
Change-Id: I109a68f44db867b20f8c6a7732b0ce657133e52a
diff --git a/quic/core/quic_stream.cc b/quic/core/quic_stream.cc
new file mode 100644
index 0000000..86207b6
--- /dev/null
+++ b/quic/core/quic_stream.cc
@@ -0,0 +1,1101 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/core/quic_stream.h"
+
+#include "net/third_party/quiche/src/quic/core/quic_flow_controller.h"
+#include "net/third_party/quiche/src/quic/core/quic_session.h"
+#include "net/third_party/quiche/src/quic/core/quic_utils.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_str_cat.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_string.h"
+
+using spdy::SpdyPriority;
+
+namespace quic {
+
+#define ENDPOINT \
+  (perspective_ == Perspective::IS_SERVER ? "Server: " : "Client: ")
+
+namespace {
+
+size_t GetInitialStreamFlowControlWindowToSend(QuicSession* session) {
+  return session->config()->GetInitialStreamFlowControlWindowToSend();
+}
+
+size_t GetReceivedFlowControlWindow(QuicSession* session) {
+  if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) {
+    return session->config()->ReceivedInitialStreamFlowControlWindowBytes();
+  }
+
+  return kMinimumFlowControlSendWindow;
+}
+
+}  // namespace
+
+// static
+const SpdyPriority QuicStream::kDefaultPriority;
+
+PendingStream::PendingStream(QuicStreamId id, QuicSession* session)
+    : id_(id),
+      session_(session),
+      stream_bytes_read_(0),
+      fin_received_(false),
+      connection_flow_controller_(session->flow_controller()),
+      flow_controller_(session,
+                       id,
+                       /*is_connection_flow_controller*/ false,
+                       GetReceivedFlowControlWindow(session),
+                       GetInitialStreamFlowControlWindowToSend(session),
+                       kStreamReceiveWindowLimit,
+                       session_->flow_controller()->auto_tune_receive_window(),
+                       session_->flow_controller()),
+      sequencer_(this) {}
+
+void PendingStream::OnDataAvailable() {
+  QUIC_BUG << "OnDataAvailable should not be called.";
+  CloseConnectionWithDetails(QUIC_INTERNAL_ERROR, "Unexpected data available");
+}
+
+void PendingStream::OnFinRead() {
+  QUIC_BUG << "OnFinRead should not be called.";
+  CloseConnectionWithDetails(QUIC_INTERNAL_ERROR, "Unexpected fin read");
+}
+
+void PendingStream::AddBytesConsumed(QuicByteCount bytes) {
+  QUIC_BUG << "AddBytesConsumed should not be called.";
+  CloseConnectionWithDetails(QUIC_INTERNAL_ERROR, "Unexpected bytes consumed");
+}
+
+void PendingStream::Reset(QuicRstStreamErrorCode error) {
+  session_->SendRstStream(id_, error, 0);
+}
+
+void PendingStream::CloseConnectionWithDetails(QuicErrorCode error,
+                                               const QuicString& details) {
+  session_->connection()->CloseConnection(
+      error, details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+}
+
+QuicStreamId PendingStream::id() const {
+  return id_;
+}
+
+const QuicSocketAddress& PendingStream::PeerAddressOfLatestPacket() const {
+  return session_->connection()->last_packet_source_address();
+}
+
+void PendingStream::OnStreamFrame(const QuicStreamFrame& frame) {
+  DCHECK_EQ(frame.stream_id, id_);
+  DCHECK_NE(0u, frame.offset);
+
+  bool is_stream_too_long =
+      (frame.offset > kMaxStreamLength) ||
+      (kMaxStreamLength - frame.offset < frame.data_length);
+  if (is_stream_too_long) {
+    // Close connection if stream becomes too long.
+    QUIC_PEER_BUG
+        << "Receive stream frame reaches max stream length. frame offset "
+        << frame.offset << " length " << frame.data_length;
+    CloseConnectionWithDetails(
+        QUIC_STREAM_LENGTH_OVERFLOW,
+        "Peer sends more data than allowed on this stream.");
+    return;
+  }
+
+  if (frame.fin) {
+    fin_received_ = true;
+  }
+
+  // This count includes duplicate data received.
+  size_t frame_payload_size = frame.data_length;
+  stream_bytes_read_ += frame_payload_size;
+
+  // Flow control is interested in tracking highest received offset.
+  // Only interested in received frames that carry data.
+  if (frame_payload_size > 0 &&
+      MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
+    // As the highest received offset has changed, check to see if this is a
+    // violation of flow control.
+    if (flow_controller_.FlowControlViolation() ||
+        connection_flow_controller_->FlowControlViolation()) {
+      CloseConnectionWithDetails(
+          QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
+          "Flow control violation after increasing offset");
+      return;
+    }
+  }
+
+  sequencer_.OnStreamFrame(frame);
+}
+
+void PendingStream::OnRstStreamFrame(const QuicRstStreamFrame& frame) {
+  DCHECK_EQ(frame.stream_id, id_);
+
+  if (frame.byte_offset > kMaxStreamLength) {
+    // Peer are not suppose to write bytes more than maxium allowed.
+    CloseConnectionWithDetails(QUIC_STREAM_LENGTH_OVERFLOW,
+                               "Reset frame stream offset overflow.");
+    return;
+  }
+  MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
+  if (flow_controller_.FlowControlViolation() ||
+      connection_flow_controller_->FlowControlViolation()) {
+    CloseConnectionWithDetails(
+        QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
+        "Flow control violation after increasing offset");
+    return;
+  }
+}
+
+bool PendingStream::MaybeIncreaseHighestReceivedOffset(
+    QuicStreamOffset new_offset) {
+  uint64_t increment =
+      new_offset - flow_controller_.highest_received_byte_offset();
+  if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) {
+    return false;
+  }
+
+  // If |new_offset| increased the stream flow controller's highest received
+  // offset, increase the connection flow controller's value by the incremental
+  // difference.
+  connection_flow_controller_->UpdateHighestReceivedOffset(
+      connection_flow_controller_->highest_received_byte_offset() + increment);
+  return true;
+}
+
+QuicStream::QuicStream(PendingStream pending, StreamType type)
+    : QuicStream(pending.id_,
+                 pending.session_,
+                 std::move(pending.sequencer_),
+                 /*is_static=*/false,
+                 type,
+                 pending.stream_bytes_read_,
+                 pending.fin_received_,
+                 std::move(pending.flow_controller_),
+                 pending.connection_flow_controller_) {
+  sequencer_.set_stream(this);
+}
+
+QuicStream::QuicStream(QuicStreamId id,
+                       QuicSession* session,
+                       bool is_static,
+                       StreamType type)
+    : QuicStream(id,
+                 session,
+                 QuicStreamSequencer(this),
+                 is_static,
+                 type,
+                 0,
+                 false,
+                 QuicFlowController(
+                     session,
+                     id,
+                     /*is_connection_flow_controller*/ false,
+                     GetReceivedFlowControlWindow(session),
+                     GetInitialStreamFlowControlWindowToSend(session),
+                     kStreamReceiveWindowLimit,
+                     session->flow_controller()->auto_tune_receive_window(),
+                     session->flow_controller()),
+                 session->flow_controller()) {}
+
+QuicStream::QuicStream(QuicStreamId id,
+                       QuicSession* session,
+                       QuicStreamSequencer sequencer,
+                       bool is_static,
+                       StreamType type,
+                       uint64_t stream_bytes_read,
+                       bool fin_received,
+                       QuicFlowController flow_controller,
+                       QuicFlowController* connection_flow_controller)
+    : sequencer_(std::move(sequencer)),
+      id_(id),
+      session_(session),
+      priority_(kDefaultPriority),
+      stream_bytes_read_(stream_bytes_read),
+      stream_error_(QUIC_STREAM_NO_ERROR),
+      connection_error_(QUIC_NO_ERROR),
+      read_side_closed_(false),
+      write_side_closed_(false),
+      fin_buffered_(false),
+      fin_sent_(false),
+      fin_outstanding_(false),
+      fin_lost_(false),
+      fin_received_(fin_received),
+      rst_sent_(false),
+      rst_received_(false),
+      perspective_(session_->perspective()),
+      flow_controller_(std::move(flow_controller)),
+      connection_flow_controller_(connection_flow_controller),
+      stream_contributes_to_connection_flow_control_(true),
+      busy_counter_(0),
+      add_random_padding_after_fin_(false),
+      send_buffer_(
+          session->connection()->helper()->GetStreamSendBufferAllocator()),
+      buffered_data_threshold_(GetQuicFlag(FLAGS_quic_buffered_data_threshold)),
+      is_static_(is_static),
+      deadline_(QuicTime::Zero()),
+      type_(session->connection()->transport_version() == QUIC_VERSION_99
+                ? QuicUtils::GetStreamType(id_,
+                                           perspective_,
+                                           session->IsIncomingStream(id_))
+                : type) {
+  if (type_ == WRITE_UNIDIRECTIONAL) {
+    set_fin_received(true);
+    CloseReadSide();
+  } else if (type_ == READ_UNIDIRECTIONAL) {
+    set_fin_sent(true);
+    CloseWriteSide();
+  }
+  SetFromConfig();
+  session_->RegisterStreamPriority(id, is_static_, priority_);
+}
+
+QuicStream::~QuicStream() {
+  if (session_ != nullptr && IsWaitingForAcks()) {
+    QUIC_DVLOG(1)
+        << ENDPOINT << "Stream " << id_
+        << " gets destroyed while waiting for acks. stream_bytes_outstanding = "
+        << send_buffer_.stream_bytes_outstanding()
+        << ", fin_outstanding: " << fin_outstanding_;
+  }
+  if (session_ != nullptr) {
+    session_->UnregisterStreamPriority(id(), is_static_);
+  }
+}
+
+void QuicStream::SetFromConfig() {}
+
+void QuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
+  DCHECK_EQ(frame.stream_id, id_);
+
+  DCHECK(!(read_side_closed_ && write_side_closed_));
+
+  if (type_ == WRITE_UNIDIRECTIONAL) {
+    CloseConnectionWithDetails(
+        QUIC_DATA_RECEIVED_ON_WRITE_UNIDIRECTIONAL_STREAM,
+        "Data received on write unidirectional stream");
+    return;
+  }
+
+  bool is_stream_too_long =
+      (frame.offset > kMaxStreamLength) ||
+      (kMaxStreamLength - frame.offset < frame.data_length);
+  if (is_stream_too_long) {
+    // Close connection if stream becomes too long.
+    QUIC_PEER_BUG << "Receive stream frame on stream " << id_
+                  << " reaches max stream length. frame offset " << frame.offset
+                  << " length " << frame.data_length << ". "
+                  << sequencer_.DebugString();
+    CloseConnectionWithDetails(
+        QUIC_STREAM_LENGTH_OVERFLOW,
+        QuicStrCat("Peer sends more data than allowed on stream ", id_,
+                   ". frame: offset = ", frame.offset, ", length = ",
+                   frame.data_length, ". ", sequencer_.DebugString()));
+    return;
+  }
+  if (frame.fin) {
+    fin_received_ = true;
+    if (fin_sent_) {
+      session_->StreamDraining(id_);
+    }
+  }
+
+  if (read_side_closed_) {
+    QUIC_DLOG(INFO)
+        << ENDPOINT << "Stream " << frame.stream_id
+        << " is closed for reading. Ignoring newly received stream data.";
+    // The subclass does not want to read data:  blackhole the data.
+    return;
+  }
+
+  // This count includes duplicate data received.
+  size_t frame_payload_size = frame.data_length;
+  stream_bytes_read_ += frame_payload_size;
+
+  // Flow control is interested in tracking highest received offset.
+  // Only interested in received frames that carry data.
+  if (frame_payload_size > 0 &&
+      MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
+    // As the highest received offset has changed, check to see if this is a
+    // violation of flow control.
+    if (flow_controller_.FlowControlViolation() ||
+        connection_flow_controller_->FlowControlViolation()) {
+      CloseConnectionWithDetails(
+          QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
+          "Flow control violation after increasing offset");
+      return;
+    }
+  }
+
+  sequencer_.OnStreamFrame(frame);
+}
+
+int QuicStream::num_frames_received() const {
+  return sequencer_.num_frames_received();
+}
+
+int QuicStream::num_duplicate_frames_received() const {
+  return sequencer_.num_duplicate_frames_received();
+}
+
+void QuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
+  rst_received_ = true;
+  if (frame.byte_offset > kMaxStreamLength) {
+    // Peer are not suppose to write bytes more than maxium allowed.
+    CloseConnectionWithDetails(QUIC_STREAM_LENGTH_OVERFLOW,
+                               "Reset frame stream offset overflow.");
+    return;
+  }
+  MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
+  if (flow_controller_.FlowControlViolation() ||
+      connection_flow_controller_->FlowControlViolation()) {
+    CloseConnectionWithDetails(
+        QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
+        "Flow control violation after increasing offset");
+    return;
+  }
+
+  stream_error_ = frame.error_code;
+  // Google QUIC closes both sides of the stream in response to a
+  // RESET_STREAM, IETF QUIC closes only the read side.
+  if (transport_version() != QUIC_VERSION_99) {
+    CloseWriteSide();
+  }
+  CloseReadSide();
+}
+
+void QuicStream::OnConnectionClosed(QuicErrorCode error,
+                                    ConnectionCloseSource /*source*/) {
+  if (read_side_closed_ && write_side_closed_) {
+    return;
+  }
+  if (error != QUIC_NO_ERROR) {
+    stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
+    connection_error_ = error;
+  }
+
+  CloseWriteSide();
+  CloseReadSide();
+}
+
+void QuicStream::OnFinRead() {
+  DCHECK(sequencer_.IsClosed());
+  // OnFinRead can be called due to a FIN flag in a headers block, so there may
+  // have been no OnStreamFrame call with a FIN in the frame.
+  fin_received_ = true;
+  // If fin_sent_ is true, then CloseWriteSide has already been called, and the
+  // stream will be destroyed by CloseReadSide, so don't need to call
+  // StreamDraining.
+  CloseReadSide();
+}
+
+void QuicStream::Reset(QuicRstStreamErrorCode error) {
+  stream_error_ = error;
+  // Sending a RstStream results in calling CloseStream.
+  session()->SendRstStream(id(), error, stream_bytes_written());
+  rst_sent_ = true;
+}
+
+void QuicStream::CloseConnectionWithDetails(QuicErrorCode error,
+                                            const QuicString& details) {
+  session()->connection()->CloseConnection(
+      error, details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+}
+
+SpdyPriority QuicStream::priority() const {
+  return priority_;
+}
+
+void QuicStream::SetPriority(SpdyPriority priority) {
+  priority_ = priority;
+  session_->UpdateStreamPriority(id(), priority);
+}
+
+void QuicStream::WriteOrBufferData(
+    QuicStringPiece data,
+    bool fin,
+    QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
+  if (data.empty() && !fin) {
+    QUIC_BUG << "data.empty() && !fin";
+    return;
+  }
+
+  if (fin_buffered_) {
+    QUIC_BUG << "Fin already buffered";
+    return;
+  }
+  if (write_side_closed_) {
+    QUIC_DLOG(ERROR) << ENDPOINT
+                     << "Attempt to write when the write side is closed";
+    if (type_ == READ_UNIDIRECTIONAL) {
+      CloseConnectionWithDetails(
+          QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
+          "Try to send data on read unidirectional stream");
+    }
+    return;
+  }
+
+  QuicConsumedData consumed_data(0, false);
+  fin_buffered_ = fin;
+
+  bool had_buffered_data = HasBufferedData();
+  // Do not respect buffered data upper limit as WriteOrBufferData guarantees
+  // all data to be consumed.
+  if (data.length() > 0) {
+    struct iovec iov(QuicUtils::MakeIovec(data));
+    QuicStreamOffset offset = send_buffer_.stream_offset();
+    if (kMaxStreamLength - offset < data.length()) {
+      QUIC_BUG << "Write too many data via stream " << id_;
+      CloseConnectionWithDetails(
+          QUIC_STREAM_LENGTH_OVERFLOW,
+          QuicStrCat("Write too many data via stream ", id_));
+      return;
+    }
+    send_buffer_.SaveStreamData(&iov, 1, 0, data.length());
+    OnDataBuffered(offset, data.length(), ack_listener);
+  }
+  if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
+    // Write data if there is no buffered data before.
+    WriteBufferedData();
+  }
+}
+
+void QuicStream::OnCanWrite() {
+  if (HasDeadlinePassed()) {
+    OnDeadlinePassed();
+    return;
+  }
+  if (HasPendingRetransmission()) {
+    WritePendingRetransmission();
+    // Exit early to allow other streams to write pending retransmissions if
+    // any.
+    return;
+  }
+
+  if (write_side_closed_) {
+    QUIC_DLOG(ERROR)
+        << ENDPOINT << "Stream " << id()
+        << " attempting to write new data when the write side is closed";
+    return;
+  }
+  if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) {
+    WriteBufferedData();
+  }
+  if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) {
+    // Notify upper layer to write new data when buffered data size is below
+    // low water mark.
+    OnCanWriteNewData();
+  }
+}
+
+void QuicStream::MaybeSendBlocked() {
+  if (flow_controller_.ShouldSendBlocked()) {
+    session_->SendBlocked(id_);
+  }
+  if (!stream_contributes_to_connection_flow_control_) {
+    return;
+  }
+  if (connection_flow_controller_->ShouldSendBlocked()) {
+    session_->SendBlocked(QuicUtils::GetInvalidStreamId(transport_version()));
+  }
+  // If the stream is blocked by connection-level flow control but not by
+  // stream-level flow control, add the stream to the write blocked list so that
+  // the stream will be given a chance to write when a connection-level
+  // WINDOW_UPDATE arrives.
+  if (connection_flow_controller_->IsBlocked() &&
+      !flow_controller_.IsBlocked()) {
+    session_->MarkConnectionLevelWriteBlocked(id());
+  }
+}
+
+QuicConsumedData QuicStream::WritevData(const struct iovec* iov,
+                                        int iov_count,
+                                        bool fin) {
+  if (write_side_closed_) {
+    QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id()
+                     << "attempting to write when the write side is closed";
+    if (type_ == READ_UNIDIRECTIONAL) {
+      CloseConnectionWithDetails(
+          QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
+          "Try to send data on read unidirectional stream");
+    }
+    return QuicConsumedData(0, false);
+  }
+
+  // How much data was provided.
+  size_t write_length = 0;
+  if (iov != nullptr) {
+    for (int i = 0; i < iov_count; ++i) {
+      write_length += iov[i].iov_len;
+    }
+  }
+
+  QuicConsumedData consumed_data(0, false);
+  if (fin_buffered_) {
+    QUIC_BUG << "Fin already buffered";
+    return consumed_data;
+  }
+
+  if (kMaxStreamLength - send_buffer_.stream_offset() < write_length) {
+    QUIC_BUG << "Write too many data via stream " << id_;
+    CloseConnectionWithDetails(
+        QUIC_STREAM_LENGTH_OVERFLOW,
+        QuicStrCat("Write too many data via stream ", id_));
+    return consumed_data;
+  }
+
+  bool had_buffered_data = HasBufferedData();
+  if (CanWriteNewData()) {
+    // Save all data if buffered data size is below low water mark.
+    consumed_data.bytes_consumed = write_length;
+    if (consumed_data.bytes_consumed > 0) {
+      QuicStreamOffset offset = send_buffer_.stream_offset();
+      send_buffer_.SaveStreamData(iov, iov_count, 0, write_length);
+      OnDataBuffered(offset, write_length, nullptr);
+    }
+  }
+  consumed_data.fin_consumed =
+      consumed_data.bytes_consumed == write_length && fin;
+  fin_buffered_ = consumed_data.fin_consumed;
+
+  if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
+    // Write data if there is no buffered data before.
+    WriteBufferedData();
+  }
+
+  return consumed_data;
+}
+
+QuicConsumedData QuicStream::WriteMemSlices(QuicMemSliceSpan span, bool fin) {
+  QuicConsumedData consumed_data(0, false);
+  if (span.empty() && !fin) {
+    QUIC_BUG << "span.empty() && !fin";
+    return consumed_data;
+  }
+
+  if (fin_buffered_) {
+    QUIC_BUG << "Fin already buffered";
+    return consumed_data;
+  }
+
+  if (write_side_closed_) {
+    QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id()
+                     << "attempting to write when the write side is closed";
+    if (type_ == READ_UNIDIRECTIONAL) {
+      CloseConnectionWithDetails(
+          QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
+          "Try to send data on read unidirectional stream");
+    }
+    return consumed_data;
+  }
+
+  bool had_buffered_data = HasBufferedData();
+  if (CanWriteNewData() || span.empty()) {
+    consumed_data.fin_consumed = fin;
+    if (!span.empty()) {
+      // Buffer all data if buffered data size is below limit.
+      QuicStreamOffset offset = send_buffer_.stream_offset();
+      consumed_data.bytes_consumed =
+          span.SaveMemSlicesInSendBuffer(&send_buffer_);
+      if (offset > send_buffer_.stream_offset() ||
+          kMaxStreamLength < send_buffer_.stream_offset()) {
+        QUIC_BUG << "Write too many data via stream " << id_;
+        CloseConnectionWithDetails(
+            QUIC_STREAM_LENGTH_OVERFLOW,
+            QuicStrCat("Write too many data via stream ", id_));
+        return consumed_data;
+      }
+      OnDataBuffered(offset, consumed_data.bytes_consumed, nullptr);
+    }
+  }
+  fin_buffered_ = consumed_data.fin_consumed;
+
+  if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
+    // Write data if there is no buffered data before.
+    WriteBufferedData();
+  }
+
+  return consumed_data;
+}
+
+bool QuicStream::HasPendingRetransmission() const {
+  return send_buffer_.HasPendingRetransmission() || fin_lost_;
+}
+
+bool QuicStream::IsStreamFrameOutstanding(QuicStreamOffset offset,
+                                          QuicByteCount data_length,
+                                          bool fin) const {
+  return send_buffer_.IsStreamDataOutstanding(offset, data_length) ||
+         (fin && fin_outstanding_);
+}
+
+QuicConsumedData QuicStream::WritevDataInner(size_t write_length,
+                                             QuicStreamOffset offset,
+                                             bool fin) {
+  StreamSendingState state = fin ? FIN : NO_FIN;
+  if (fin && add_random_padding_after_fin_) {
+    state = FIN_AND_PADDING;
+  }
+  return session()->WritevData(this, id(), write_length, offset, state);
+}
+
+void QuicStream::CloseReadSide() {
+  if (read_side_closed_) {
+    return;
+  }
+  QUIC_DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
+
+  read_side_closed_ = true;
+  sequencer_.ReleaseBuffer();
+
+  if (write_side_closed_) {
+    QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
+    session_->CloseStream(id());
+  }
+}
+
+void QuicStream::CloseWriteSide() {
+  if (write_side_closed_) {
+    return;
+  }
+  QUIC_DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
+
+  write_side_closed_ = true;
+  if (read_side_closed_) {
+    QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
+    session_->CloseStream(id());
+  }
+}
+
+bool QuicStream::HasBufferedData() const {
+  DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
+  return send_buffer_.stream_offset() > stream_bytes_written();
+}
+
+QuicTransportVersion QuicStream::transport_version() const {
+  return session_->connection()->transport_version();
+}
+
+HandshakeProtocol QuicStream::handshake_protocol() const {
+  return session_->connection()->version().handshake_protocol;
+}
+
+void QuicStream::StopReading() {
+  QUIC_DVLOG(1) << ENDPOINT << "Stop reading from stream " << id();
+  sequencer_.StopReading();
+}
+
+const QuicSocketAddress& QuicStream::PeerAddressOfLatestPacket() const {
+  return session_->connection()->last_packet_source_address();
+}
+
+void QuicStream::OnClose() {
+  CloseReadSide();
+  CloseWriteSide();
+
+  if (!fin_sent_ && !rst_sent_) {
+    // For flow control accounting, tell the peer how many bytes have been
+    // written on this stream before termination. Done here if needed, using a
+    // RST_STREAM frame.
+    QUIC_DLOG(INFO) << ENDPOINT << "Sending RST_STREAM in OnClose: " << id();
+    session_->SendRstStream(id(), QUIC_RST_ACKNOWLEDGEMENT,
+                            stream_bytes_written());
+    session_->OnStreamDoneWaitingForAcks(id_);
+    rst_sent_ = true;
+  }
+
+  if (flow_controller_.FlowControlViolation() ||
+      connection_flow_controller_->FlowControlViolation()) {
+    return;
+  }
+  // The stream is being closed and will not process any further incoming bytes.
+  // As there may be more bytes in flight, to ensure that both endpoints have
+  // the same connection level flow control state, mark all unreceived or
+  // buffered bytes as consumed.
+  QuicByteCount bytes_to_consume =
+      flow_controller_.highest_received_byte_offset() -
+      flow_controller_.bytes_consumed();
+  AddBytesConsumed(bytes_to_consume);
+}
+
+void QuicStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
+  if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) {
+    // Let session unblock this stream.
+    session_->MarkConnectionLevelWriteBlocked(id_);
+  }
+}
+
+bool QuicStream::MaybeIncreaseHighestReceivedOffset(
+    QuicStreamOffset new_offset) {
+  uint64_t increment =
+      new_offset - flow_controller_.highest_received_byte_offset();
+  if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) {
+    return false;
+  }
+
+  // If |new_offset| increased the stream flow controller's highest received
+  // offset, increase the connection flow controller's value by the incremental
+  // difference.
+  if (stream_contributes_to_connection_flow_control_) {
+    connection_flow_controller_->UpdateHighestReceivedOffset(
+        connection_flow_controller_->highest_received_byte_offset() +
+        increment);
+  }
+  return true;
+}
+
+void QuicStream::AddBytesSent(QuicByteCount bytes) {
+  flow_controller_.AddBytesSent(bytes);
+  if (stream_contributes_to_connection_flow_control_) {
+    connection_flow_controller_->AddBytesSent(bytes);
+  }
+}
+
+void QuicStream::AddBytesConsumed(QuicByteCount bytes) {
+  // Only adjust stream level flow controller if still reading.
+  if (!read_side_closed_) {
+    flow_controller_.AddBytesConsumed(bytes);
+  }
+
+  if (stream_contributes_to_connection_flow_control_) {
+    connection_flow_controller_->AddBytesConsumed(bytes);
+  }
+}
+
+void QuicStream::UpdateSendWindowOffset(QuicStreamOffset new_window) {
+  if (flow_controller_.UpdateSendWindowOffset(new_window)) {
+    // Let session unblock this stream.
+    session_->MarkConnectionLevelWriteBlocked(id_);
+  }
+}
+
+void QuicStream::AddRandomPaddingAfterFin() {
+  add_random_padding_after_fin_ = true;
+}
+
+bool QuicStream::OnStreamFrameAcked(QuicStreamOffset offset,
+                                    QuicByteCount data_length,
+                                    bool fin_acked,
+                                    QuicTime::Delta ack_delay_time,
+                                    QuicByteCount* newly_acked_length) {
+  QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Acking "
+                << "[" << offset << ", " << offset + data_length << "]"
+                << " fin = " << fin_acked;
+  *newly_acked_length = 0;
+  if (!send_buffer_.OnStreamDataAcked(offset, data_length,
+                                      newly_acked_length)) {
+    CloseConnectionWithDetails(QUIC_INTERNAL_ERROR,
+                               "Trying to ack unsent data.");
+    return false;
+  }
+  if (!fin_sent_ && fin_acked) {
+    CloseConnectionWithDetails(QUIC_INTERNAL_ERROR,
+                               "Trying to ack unsent fin.");
+    return false;
+  }
+  // Indicates whether ack listener's OnPacketAcked should be called.
+  const bool new_data_acked =
+      *newly_acked_length > 0 || (fin_acked && fin_outstanding_);
+  if (fin_acked) {
+    fin_outstanding_ = false;
+    fin_lost_ = false;
+  }
+  if (!IsWaitingForAcks()) {
+    session_->OnStreamDoneWaitingForAcks(id_);
+  }
+  return new_data_acked;
+}
+
+void QuicStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
+                                            QuicByteCount data_length,
+                                            bool fin_retransmitted) {
+  send_buffer_.OnStreamDataRetransmitted(offset, data_length);
+  if (fin_retransmitted) {
+    fin_lost_ = false;
+  }
+}
+
+void QuicStream::OnStreamFrameLost(QuicStreamOffset offset,
+                                   QuicByteCount data_length,
+                                   bool fin_lost) {
+  QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Losting "
+                << "[" << offset << ", " << offset + data_length << "]"
+                << " fin = " << fin_lost;
+  if (data_length > 0) {
+    send_buffer_.OnStreamDataLost(offset, data_length);
+  }
+  if (fin_lost && fin_outstanding_) {
+    fin_lost_ = true;
+  }
+}
+
+bool QuicStream::RetransmitStreamData(QuicStreamOffset offset,
+                                      QuicByteCount data_length,
+                                      bool fin) {
+  if (HasDeadlinePassed()) {
+    OnDeadlinePassed();
+    return true;
+  }
+  QuicIntervalSet<QuicStreamOffset> retransmission(offset,
+                                                   offset + data_length);
+  retransmission.Difference(bytes_acked());
+  bool retransmit_fin = fin && fin_outstanding_;
+  if (retransmission.Empty() && !retransmit_fin) {
+    return true;
+  }
+  QuicConsumedData consumed(0, false);
+  for (const auto& interval : retransmission) {
+    QuicStreamOffset retransmission_offset = interval.min();
+    QuicByteCount retransmission_length = interval.max() - interval.min();
+    const bool can_bundle_fin =
+        retransmit_fin && (retransmission_offset + retransmission_length ==
+                           stream_bytes_written());
+    consumed = session()->WritevData(this, id_, retransmission_length,
+                                     retransmission_offset,
+                                     can_bundle_fin ? FIN : NO_FIN);
+    QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
+                  << " is forced to retransmit stream data ["
+                  << retransmission_offset << ", "
+                  << retransmission_offset + retransmission_length
+                  << ") and fin: " << can_bundle_fin
+                  << ", consumed: " << consumed;
+    OnStreamFrameRetransmitted(retransmission_offset, consumed.bytes_consumed,
+                               consumed.fin_consumed);
+    if (can_bundle_fin) {
+      retransmit_fin = !consumed.fin_consumed;
+    }
+    if (consumed.bytes_consumed < retransmission_length ||
+        (can_bundle_fin && !consumed.fin_consumed)) {
+      // Connection is write blocked.
+      return false;
+    }
+  }
+  if (retransmit_fin) {
+    QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
+                  << " retransmits fin only frame.";
+    consumed = session()->WritevData(this, id_, 0, stream_bytes_written(), FIN);
+    if (!consumed.fin_consumed) {
+      return false;
+    }
+  }
+  return true;
+}
+
+bool QuicStream::IsWaitingForAcks() const {
+  return (!rst_sent_ || stream_error_ == QUIC_STREAM_NO_ERROR) &&
+         (send_buffer_.stream_bytes_outstanding() || fin_outstanding_);
+}
+
+size_t QuicStream::ReadableBytes() const {
+  return sequencer_.ReadableBytes();
+}
+
+bool QuicStream::WriteStreamData(QuicStreamOffset offset,
+                                 QuicByteCount data_length,
+                                 QuicDataWriter* writer) {
+  DCHECK_LT(0u, data_length);
+  QUIC_DVLOG(2) << ENDPOINT << "Write stream " << id_ << " data from offset "
+                << offset << " length " << data_length;
+  return send_buffer_.WriteStreamData(offset, data_length, writer);
+}
+
+void QuicStream::WriteBufferedData() {
+  DCHECK(!write_side_closed_ && (HasBufferedData() || fin_buffered_));
+
+  if (session_->ShouldYield(id())) {
+    session_->MarkConnectionLevelWriteBlocked(id());
+    return;
+  }
+
+  // Size of buffered data.
+  size_t write_length = BufferedDataBytes();
+
+  // A FIN with zero data payload should not be flow control blocked.
+  bool fin_with_zero_data = (fin_buffered_ && write_length == 0);
+
+  bool fin = fin_buffered_;
+
+  // How much data flow control permits to be written.
+  QuicByteCount send_window = flow_controller_.SendWindowSize();
+  if (stream_contributes_to_connection_flow_control_) {
+    send_window =
+        std::min(send_window, connection_flow_controller_->SendWindowSize());
+  }
+
+  if (send_window == 0 && !fin_with_zero_data) {
+    // Quick return if nothing can be sent.
+    MaybeSendBlocked();
+    return;
+  }
+
+  if (write_length > send_window) {
+    // Don't send the FIN unless all the data will be sent.
+    fin = false;
+
+    // Writing more data would be a violation of flow control.
+    write_length = static_cast<size_t>(send_window);
+    QUIC_DVLOG(1) << "stream " << id() << " shortens write length to "
+                  << write_length << " due to flow control";
+  }
+  if (session_->session_decides_what_to_write()) {
+    session_->SetTransmissionType(NOT_RETRANSMISSION);
+  }
+  QuicConsumedData consumed_data =
+      WritevDataInner(write_length, stream_bytes_written(), fin);
+
+  OnStreamDataConsumed(consumed_data.bytes_consumed);
+
+  AddBytesSent(consumed_data.bytes_consumed);
+  QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " sends "
+                << stream_bytes_written() << " bytes "
+                << " and has buffered data " << BufferedDataBytes() << " bytes."
+                << " fin is sent: " << consumed_data.fin_consumed
+                << " fin is buffered: " << fin_buffered_;
+
+  // The write may have generated a write error causing this stream to be
+  // closed. If so, simply return without marking the stream write blocked.
+  if (write_side_closed_) {
+    return;
+  }
+
+  if (consumed_data.bytes_consumed == write_length) {
+    if (!fin_with_zero_data) {
+      MaybeSendBlocked();
+    }
+    if (fin && consumed_data.fin_consumed) {
+      fin_sent_ = true;
+      fin_outstanding_ = true;
+      if (fin_received_) {
+        session_->StreamDraining(id_);
+      }
+      CloseWriteSide();
+    } else if (fin && !consumed_data.fin_consumed) {
+      session_->MarkConnectionLevelWriteBlocked(id());
+    }
+  } else {
+    session_->MarkConnectionLevelWriteBlocked(id());
+  }
+  if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) {
+    busy_counter_ = 0;
+  }
+}
+
+uint64_t QuicStream::BufferedDataBytes() const {
+  DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
+  return send_buffer_.stream_offset() - stream_bytes_written();
+}
+
+bool QuicStream::CanWriteNewData() const {
+  return BufferedDataBytes() < buffered_data_threshold_;
+}
+
+bool QuicStream::CanWriteNewDataAfterData(QuicByteCount length) const {
+  return (BufferedDataBytes() + length) < buffered_data_threshold_;
+}
+
+uint64_t QuicStream::stream_bytes_written() const {
+  return send_buffer_.stream_bytes_written();
+}
+
+const QuicIntervalSet<QuicStreamOffset>& QuicStream::bytes_acked() const {
+  return send_buffer_.bytes_acked();
+}
+
+void QuicStream::OnStreamDataConsumed(size_t bytes_consumed) {
+  send_buffer_.OnStreamDataConsumed(bytes_consumed);
+}
+
+void QuicStream::WritePendingRetransmission() {
+  while (HasPendingRetransmission()) {
+    QuicConsumedData consumed(0, false);
+    if (!send_buffer_.HasPendingRetransmission()) {
+      QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
+                    << " retransmits fin only frame.";
+      consumed =
+          session()->WritevData(this, id_, 0, stream_bytes_written(), FIN);
+      fin_lost_ = !consumed.fin_consumed;
+      if (fin_lost_) {
+        // Connection is write blocked.
+        return;
+      }
+    } else {
+      StreamPendingRetransmission pending =
+          send_buffer_.NextPendingRetransmission();
+      // Determine whether the lost fin can be bundled with the data.
+      const bool can_bundle_fin =
+          fin_lost_ &&
+          (pending.offset + pending.length == stream_bytes_written());
+      consumed =
+          session()->WritevData(this, id_, pending.length, pending.offset,
+                                can_bundle_fin ? FIN : NO_FIN);
+      QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
+                    << " tries to retransmit stream data [" << pending.offset
+                    << ", " << pending.offset + pending.length
+                    << ") and fin: " << can_bundle_fin
+                    << ", consumed: " << consumed;
+      OnStreamFrameRetransmitted(pending.offset, consumed.bytes_consumed,
+                                 consumed.fin_consumed);
+      if (consumed.bytes_consumed < pending.length ||
+          (can_bundle_fin && !consumed.fin_consumed)) {
+        // Connection is write blocked.
+        return;
+      }
+    }
+  }
+}
+
+bool QuicStream::MaybeSetTtl(QuicTime::Delta ttl) {
+  if (is_static_) {
+    QUIC_BUG << "Cannot set TTL of a static stream.";
+    return false;
+  }
+  if (deadline_.IsInitialized()) {
+    QUIC_DLOG(WARNING) << "Deadline has already been set.";
+    return false;
+  }
+  if (!session()->session_decides_what_to_write()) {
+    QUIC_DLOG(WARNING) << "This session does not support stream TTL yet.";
+    return false;
+  }
+  QuicTime now = session()->connection()->clock()->ApproximateNow();
+  deadline_ = now + ttl;
+  return true;
+}
+
+bool QuicStream::HasDeadlinePassed() const {
+  if (!deadline_.IsInitialized()) {
+    // No deadline has been set.
+    return false;
+  }
+  DCHECK(session()->session_decides_what_to_write());
+  QuicTime now = session()->connection()->clock()->ApproximateNow();
+  if (now < deadline_) {
+    return false;
+  }
+  // TTL expired.
+  QUIC_DVLOG(1) << "stream " << id() << " deadline has passed";
+  return true;
+}
+
+void QuicStream::OnDeadlinePassed() {
+  Reset(QUIC_STREAM_TTL_EXPIRED);
+}
+
+void QuicStream::SendStopSending(uint16_t code) {
+  if (transport_version() != QUIC_VERSION_99) {
+    // If the connection is not version 99, do nothing.
+    // Do not QUIC_BUG or anything; the application really does not need to know
+    // what version the connection is in.
+    return;
+  }
+  session_->SendStopSending(code, id_);
+}
+
+void QuicStream::OnStopSending(uint16_t code) {}
+
+}  // namespace quic