Merge quiche::ReadStream and quiche::WriteStream into webtransport::Stream.

In practice, it is much easier to disable one side than to deal with complex inheritance hierarchy.

PiperOrigin-RevId: 872115117
diff --git a/build/source_list.bzl b/build/source_list.bzl
index c465704..4c3283f 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -59,7 +59,6 @@
     "common/quiche_simple_arena.h",
     "common/quiche_socket_address.h",
     "common/quiche_status_utils.h",
-    "common/quiche_stream.h",
     "common/quiche_string_tuple.h",
     "common/quiche_text_utils.h",
     "common/quiche_weak_ptr.h",
@@ -415,6 +414,7 @@
     "quic/platform/api/quic_thread.h",
     "web_transport/complete_buffer_visitor.h",
     "web_transport/encapsulated/encapsulated_web_transport.h",
+    "web_transport/stream_helpers.h",
     "web_transport/web_transport.h",
     "web_transport/web_transport_headers.h",
     "web_transport/web_transport_priority_scheduler.h",
@@ -794,7 +794,6 @@
     "common/platform/api/quiche_test.h",
     "common/platform/api/quiche_test_loopback.h",
     "common/platform/api/quiche_test_output.h",
-    "common/test_tools/mock_streams.h",
     "common/test_tools/quiche_test_utils.h",
     "http2/adapter/mock_http2_visitor.h",
     "http2/adapter/recording_http2_visitor.h",
@@ -1161,7 +1160,6 @@
     "common/simple_buffer_allocator_test.cc",
     "common/structured_headers_generated_test.cc",
     "common/structured_headers_test.cc",
-    "common/test_tools/mock_streams_test.cc",
     "common/test_tools/quiche_test_utils_test.cc",
     "common/vectorized_io_utils_test.cc",
     "common/wire_serialization_test.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index 55fd86b..f4b0dd1 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -59,7 +59,6 @@
     "src/quiche/common/quiche_simple_arena.h",
     "src/quiche/common/quiche_socket_address.h",
     "src/quiche/common/quiche_status_utils.h",
-    "src/quiche/common/quiche_stream.h",
     "src/quiche/common/quiche_string_tuple.h",
     "src/quiche/common/quiche_text_utils.h",
     "src/quiche/common/quiche_weak_ptr.h",
@@ -415,6 +414,7 @@
     "src/quiche/quic/platform/api/quic_thread.h",
     "src/quiche/web_transport/complete_buffer_visitor.h",
     "src/quiche/web_transport/encapsulated/encapsulated_web_transport.h",
+    "src/quiche/web_transport/stream_helpers.h",
     "src/quiche/web_transport/web_transport.h",
     "src/quiche/web_transport/web_transport_headers.h",
     "src/quiche/web_transport/web_transport_priority_scheduler.h",
@@ -794,7 +794,6 @@
     "src/quiche/common/platform/api/quiche_test.h",
     "src/quiche/common/platform/api/quiche_test_loopback.h",
     "src/quiche/common/platform/api/quiche_test_output.h",
-    "src/quiche/common/test_tools/mock_streams.h",
     "src/quiche/common/test_tools/quiche_test_utils.h",
     "src/quiche/http2/adapter/mock_http2_visitor.h",
     "src/quiche/http2/adapter/recording_http2_visitor.h",
@@ -1162,7 +1161,6 @@
     "src/quiche/common/simple_buffer_allocator_test.cc",
     "src/quiche/common/structured_headers_generated_test.cc",
     "src/quiche/common/structured_headers_test.cc",
-    "src/quiche/common/test_tools/mock_streams_test.cc",
     "src/quiche/common/test_tools/quiche_test_utils_test.cc",
     "src/quiche/common/vectorized_io_utils_test.cc",
     "src/quiche/common/wire_serialization_test.cc",
diff --git a/build/source_list.json b/build/source_list.json
index cd25bcd..66a8052 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -58,7 +58,6 @@
     "quiche/common/quiche_simple_arena.h",
     "quiche/common/quiche_socket_address.h",
     "quiche/common/quiche_status_utils.h",
-    "quiche/common/quiche_stream.h",
     "quiche/common/quiche_string_tuple.h",
     "quiche/common/quiche_text_utils.h",
     "quiche/common/quiche_weak_ptr.h",
@@ -414,6 +413,7 @@
     "quiche/quic/platform/api/quic_thread.h",
     "quiche/web_transport/complete_buffer_visitor.h",
     "quiche/web_transport/encapsulated/encapsulated_web_transport.h",
+    "quiche/web_transport/stream_helpers.h",
     "quiche/web_transport/web_transport.h",
     "quiche/web_transport/web_transport_headers.h",
     "quiche/web_transport/web_transport_priority_scheduler.h"
@@ -793,7 +793,6 @@
     "quiche/common/platform/api/quiche_test.h",
     "quiche/common/platform/api/quiche_test_loopback.h",
     "quiche/common/platform/api/quiche_test_output.h",
-    "quiche/common/test_tools/mock_streams.h",
     "quiche/common/test_tools/quiche_test_utils.h",
     "quiche/http2/adapter/mock_http2_visitor.h",
     "quiche/http2/adapter/recording_http2_visitor.h",
@@ -1161,7 +1160,6 @@
     "quiche/common/simple_buffer_allocator_test.cc",
     "quiche/common/structured_headers_generated_test.cc",
     "quiche/common/structured_headers_test.cc",
-    "quiche/common/test_tools/mock_streams_test.cc",
     "quiche/common/test_tools/quiche_test_utils_test.cc",
     "quiche/common/vectorized_io_utils_test.cc",
     "quiche/common/wire_serialization_test.cc",
diff --git a/quiche/common/quiche_stream.h b/quiche/common/quiche_stream.h
deleted file mode 100644
index c8718bd..0000000
--- a/quiche/common/quiche_stream.h
+++ /dev/null
@@ -1,214 +0,0 @@
-// Copyright 2023 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-// General-purpose abstractions for read/write streams.
-
-#ifndef QUICHE_COMMON_QUICHE_STREAM_H_
-#define QUICHE_COMMON_QUICHE_STREAM_H_
-
-#include <cstddef>
-#include <string>
-
-#include "absl/status/status.h"
-#include "absl/strings/string_view.h"
-#include "absl/types/span.h"
-#include "quiche/common/platform/api/quiche_export.h"
-#include "quiche/common/quiche_callbacks.h"
-#include "quiche/common/quiche_mem_slice.h"
-
-namespace quiche {
-
-// A shared base class for read and write stream to support abrupt termination.
-class QUICHE_EXPORT TerminableStream {
- public:
-  virtual ~TerminableStream() = default;
-
-  // Abruptly terminate the stream due to an error. If `error` is not OK, it may
-  // carry the error information that could be potentially communicated to the
-  // peer in case the stream is remote. If the stream is a duplex stream, both
-  // ends of the stream are terminated.
-  virtual void AbruptlyTerminate(absl::Status error) = 0;
-};
-
-// A general-purpose visitor API that gets notifications for ReadStream-related
-// events.
-class QUICHE_EXPORT ReadStreamVisitor {
- public:
-  virtual ~ReadStreamVisitor() = default;
-
-  // Called whenever the stream has new data available to read. Unless otherwise
-  // specified, QUICHE stream reads are level-triggered, which means that the
-  // callback will be called repeatedly as long as there is still data in the
-  // buffer.
-  virtual void OnCanRead() = 0;
-};
-
-// General purpose abstraction for a stream of data that can be read from the
-// network. The class is designed around the idea that a network stream stores
-// all of the received data in a sequence of contiguous buffers. Because of
-// that, there are two ways to read from a stream:
-//   - Read() will copy data into a user-provided buffer, reassembling it if it
-//     is split across multiple buffers internally.
-//   - PeekNextReadableRegion()/SkipBytes() let the caller access the underlying
-//     buffers directly, potentially avoiding the copying at the cost of the
-//     caller having to deal with discontinuities.
-class QUICHE_EXPORT ReadStream {
- public:
-  struct QUICHE_EXPORT ReadResult {
-    // Number of bytes actually read.
-    size_t bytes_read = 0;
-    // Whether the FIN has been received; if true, no further data will arrive
-    // on the stream, and the stream object can be soon potentially garbage
-    // collected.
-    bool fin = false;
-  };
-
-  struct PeekResult {
-    // The next available chunk in the sequencer buffer.
-    absl::string_view peeked_data;
-    // True if all of the data up to the FIN has been read.
-    bool fin_next = false;
-    // True if all of the data up to the FIN has been received (but not
-    // necessarily read).
-    bool all_data_received = false;
-
-    // Indicates that `SkipBytes()` will make progress if called.
-    bool has_data() const { return !peeked_data.empty() || fin_next; }
-  };
-
-  virtual ~ReadStream() = default;
-
-  // Reads at most `buffer.size()` bytes into `buffer`.
-  [[nodiscard]] virtual ReadResult Read(absl::Span<char> buffer) = 0;
-
-  // Reads all available data and appends it to the end of `output`.
-  [[nodiscard]] virtual ReadResult Read(std::string* output) = 0;
-
-  // Indicates the total number of bytes that can be read from the stream.
-  virtual size_t ReadableBytes() const = 0;
-
-  // Returns a contiguous buffer to read (or an empty buffer, if there is no
-  // data to read). See `ProcessAllReadableRegions` below for an example of how
-  // to use this method while handling FIN correctly.
-  virtual PeekResult PeekNextReadableRegion() const = 0;
-
-  // Equivalent to reading `bytes`, but does not perform any copying. `bytes`
-  // must be less than or equal to `ReadableBytes()`. The return value indicates
-  // if the FIN has been reached. `SkipBytes(0)` can be used to consume the FIN
-  // if it's the only thing remaining on the stream.
-  [[nodiscard]] virtual bool SkipBytes(size_t bytes) = 0;
-};
-
-// Calls `callback` for every contiguous chunk available inside the stream.
-// Returns true if the FIN has been reached.
-inline bool ProcessAllReadableRegions(
-    ReadStream& stream, UnretainedCallback<void(absl::string_view)> callback) {
-  for (;;) {
-    ReadStream::PeekResult peek_result = stream.PeekNextReadableRegion();
-    if (!peek_result.has_data()) {
-      return false;
-    }
-    callback(peek_result.peeked_data);
-    bool fin = stream.SkipBytes(peek_result.peeked_data.size());
-    if (fin) {
-      return true;
-    }
-  }
-}
-
-// A general-purpose visitor API that gets notifications for WriteStream-related
-// events.
-class QUICHE_EXPORT WriteStreamVisitor {
- public:
-  virtual ~WriteStreamVisitor() {}
-
-  // Called whenever the stream is not write-blocked and can accept new data.
-  virtual void OnCanWrite() = 0;
-};
-
-// Options for writing data into a WriteStream.
-class QUICHE_EXPORT StreamWriteOptions {
- public:
-  StreamWriteOptions() = default;
-
-  // If send_fin() is set to true, the write operation also sends a FIN on the
-  // stream.
-  bool send_fin() const { return send_fin_; }
-  void set_send_fin(bool send_fin) { send_fin_ = send_fin; }
-
-  // If buffer_unconditionally() is set to true, the write operation will buffer
-  // data even if the internal buffer limit is exceeded.
-  bool buffer_unconditionally() const { return buffer_unconditionally_; }
-  void set_buffer_unconditionally(bool value) {
-    buffer_unconditionally_ = value;
-  }
-
- private:
-  bool send_fin_ = false;
-  bool buffer_unconditionally_ = false;
-};
-
-inline constexpr StreamWriteOptions kDefaultStreamWriteOptions =
-    StreamWriteOptions();
-
-// WriteStream is an object that can accept a stream of bytes.
-//
-// The writes into a WriteStream are all-or-nothing.  A WriteStream object has
-// to either accept all data written into it by returning absl::OkStatus, or ask
-// the caller to try again once via OnCanWrite() by returning
-// absl::UnavailableError.
-class QUICHE_EXPORT WriteStream {
- public:
-  virtual ~WriteStream() {}
-
-  // Writes `data` into the stream.  If the write succeeds, the ownership is
-  // transferred to the stream; if it does not, the behavior is undefined -- the
-  // users of this API should check `CanWrite()` before calling `Writev()`.
-  virtual absl::Status Writev(absl::Span<QuicheMemSlice> data,
-                              const StreamWriteOptions& options) = 0;
-
-  // Indicates whether it is possible to write into stream right now.
-  virtual bool CanWrite() const = 0;
-
-  // Legacy convenience method for writing a single string_view.  New users
-  // should use quiche::SendFinOnStream instead, since this method does not
-  // return useful failure information.
-  [[nodiscard]] bool SendFin() {
-    StreamWriteOptions options;
-    options.set_send_fin(true);
-    return Writev(absl::Span<QuicheMemSlice>(), options).ok();
-  }
-
-  // Legacy convenience method for writing a single string_view.  New users
-  // should use quiche::WriteIntoStream instead, since this method does not
-  // return useful failure information.
-  [[nodiscard]] bool Write(absl::string_view data) {
-    QuicheMemSlice slice = QuicheMemSlice::Copy(data);
-    return Writev(absl::MakeSpan(&slice, 1), kDefaultStreamWriteOptions).ok();
-  }
-};
-
-// Convenience methods to write a single chunk of data into the stream.
-inline absl::Status WriteIntoStream(
-    WriteStream& stream, QuicheMemSlice slice,
-    const StreamWriteOptions& options = kDefaultStreamWriteOptions) {
-  return stream.Writev(absl::MakeSpan(&slice, 1), options);
-}
-inline absl::Status WriteIntoStream(
-    WriteStream& stream, absl::string_view data,
-    const StreamWriteOptions& options = kDefaultStreamWriteOptions) {
-  QuicheMemSlice slice = QuicheMemSlice::Copy(data);
-  return stream.Writev(absl::MakeSpan(&slice, 1), options);
-}
-
-// Convenience methods to send a FIN on the stream.
-inline absl::Status SendFinOnStream(WriteStream& stream) {
-  StreamWriteOptions options;
-  options.set_send_fin(true);
-  return stream.Writev(absl::Span<QuicheMemSlice>(), options);
-}
-
-}  // namespace quiche
-
-#endif  // QUICHE_COMMON_QUICHE_STREAM_H_
diff --git a/quiche/common/test_tools/mock_streams.h b/quiche/common/test_tools/mock_streams.h
deleted file mode 100644
index 7290490..0000000
--- a/quiche/common/test_tools/mock_streams.h
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright (c) 2023 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef QUICHE_COMMON_TEST_TOOLS_MOCK_STREAMS_H_
-#define QUICHE_COMMON_TEST_TOOLS_MOCK_STREAMS_H_
-
-#include <algorithm>
-#include <cstddef>
-#include <string>
-#include <utility>
-
-#include "absl/status/status.h"
-#include "absl/strings/string_view.h"
-#include "absl/types/span.h"
-#include "quiche/common/platform/api/quiche_test.h"
-#include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
-
-namespace quiche::test {
-
-// Mockable stream that stores all of the data into an std::string by default.
-class MockWriteStream : public quiche::WriteStream {
- public:
-  MockWriteStream() {
-    ON_CALL(*this, CanWrite()).WillByDefault(testing::Return(true));
-    ON_CALL(*this, Writev(testing::_, testing::_))
-        .WillByDefault([&](absl::Span<quiche::QuicheMemSlice> data,
-                           const StreamWriteOptions& options) {
-          return AppendToData(data, options);
-        });
-  }
-
-  MOCK_METHOD(absl::Status, Writev,
-              (absl::Span<quiche::QuicheMemSlice> data,
-               const StreamWriteOptions& options),
-              (override));
-  MOCK_METHOD(bool, CanWrite, (), (const, override));
-
-  absl::Status AppendToData(absl::Span<quiche::QuicheMemSlice> data,
-                            const StreamWriteOptions& options) {
-    for (const quiche::QuicheMemSlice& fragment : data) {
-      data_.append(fragment.data(), fragment.length());
-    }
-    ProcessOptions(options);
-    return absl::OkStatus();
-  }
-  void ProcessOptions(const StreamWriteOptions& options) {
-    fin_written_ |= options.send_fin();
-  }
-
-  std::string& data() { return data_; }
-  bool fin_written() { return fin_written_; }
-
- private:
-  std::string data_;
-  bool fin_written_ = false;
-};
-
-// Reads stream data from an std::string buffer.
-class ReadStreamFromString : public ReadStream {
- public:
-  explicit ReadStreamFromString(std::string* data) : data_(data) {}
-
-  ReadResult Read(absl::Span<char> buffer) override {
-    size_t data_to_copy = std::min(buffer.size(), data_->size());
-    std::copy(data_->begin(), data_->begin() + data_to_copy, buffer.begin());
-    *data_ = data_->substr(data_to_copy);
-    return ReadResult{data_to_copy, data_->empty() && fin_};
-  }
-  ReadResult Read(std::string* output) override {
-    size_t bytes = data_->size();
-    output->append(std::move(*data_));
-    data_->clear();
-    return ReadResult{bytes, fin_};
-  }
-
-  size_t ReadableBytes() const override { return data_->size(); }
-
-  virtual PeekResult PeekNextReadableRegion() const override {
-    PeekResult result;
-    result.peeked_data = *data_;
-    result.fin_next = data_->empty() && fin_;
-    result.all_data_received = fin_;
-    return result;
-  }
-
-  bool SkipBytes(size_t bytes) override {
-    *data_ = data_->substr(bytes);
-    return data_->empty() && fin_;
-  }
-
-  void set_fin() { fin_ = true; }
-
- private:
-  std::string* data_;
-  bool fin_ = false;
-};
-
-}  // namespace quiche::test
-
-#endif  // QUICHE_COMMON_TEST_TOOLS_MOCK_STREAMS_H_
diff --git a/quiche/common/test_tools/mock_streams_test.cc b/quiche/common/test_tools/mock_streams_test.cc
deleted file mode 100644
index 2e8a2e4..0000000
--- a/quiche/common/test_tools/mock_streams_test.cc
+++ /dev/null
@@ -1,63 +0,0 @@
-// Copyright (c) 2023 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "quiche/common/test_tools/mock_streams.h"
-
-#include <array>
-#include <string>
-
-#include "absl/types/span.h"
-#include "quiche/common/platform/api/quiche_test.h"
-#include "quiche/common/quiche_stream.h"
-#include "quiche/common/test_tools/quiche_test_utils.h"
-
-namespace quiche::test {
-namespace {
-
-using ::testing::ElementsAre;
-using ::testing::IsEmpty;
-
-TEST(MockWriteStreamTest, DefaultWrite) {
-  MockWriteStream stream;
-  QUICHE_EXPECT_OK(quiche::WriteIntoStream(stream, "test"));
-  EXPECT_EQ(stream.data(), "test");
-  EXPECT_FALSE(stream.fin_written());
-}
-
-TEST(ReadStreamFromStringTest, ReadIntoSpan) {
-  std::string source = "abcdef";
-  std::array<char, 3> buffer;
-  ReadStreamFromString stream(&source);
-  EXPECT_EQ(stream.ReadableBytes(), 6);
-
-  stream.Read(absl::MakeSpan(buffer));
-  EXPECT_THAT(buffer, ElementsAre('a', 'b', 'c'));
-  EXPECT_EQ(stream.ReadableBytes(), 3);
-
-  stream.Read(absl::MakeSpan(buffer));
-  EXPECT_THAT(buffer, ElementsAre('d', 'e', 'f'));
-  EXPECT_EQ(stream.ReadableBytes(), 0);
-  EXPECT_THAT(source, IsEmpty());
-}
-
-TEST(ReadStreamFromStringTest, ReadIntoString) {
-  std::string source = "abcdef";
-  std::string destination;
-  ReadStreamFromString stream(&source);
-  stream.Read(&destination);
-  EXPECT_EQ(destination, "abcdef");
-  EXPECT_THAT(source, IsEmpty());
-}
-
-TEST(ReadStreamFromStringTest, PeekAndSkip) {
-  std::string source = "abcdef";
-  ReadStreamFromString stream(&source);
-  EXPECT_EQ(stream.PeekNextReadableRegion().peeked_data, "abcdef");
-  stream.SkipBytes(2);
-  EXPECT_EQ(stream.PeekNextReadableRegion().peeked_data, "cdef");
-  EXPECT_EQ(source, "cdef");
-}
-
-}  // namespace
-}  // namespace quiche::test
diff --git a/quiche/quic/core/http/end_to_end_test.cc b/quiche/quic/core/http/end_to_end_test.cc
index 9e30120..9f49142 100644
--- a/quiche/quic/core/http/end_to_end_test.cc
+++ b/quiche/quic/core/http/end_to_end_test.cc
@@ -124,9 +124,10 @@
 #include "quiche/common/platform/api/quiche_reference_counted.h"
 #include "quiche/common/platform/api/quiche_test.h"
 #include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/common/simple_buffer_allocator.h"
 #include "quiche/common/test_tools/quiche_test_utils.h"
+#include "quiche/web_transport/stream_helpers.h"
+#include "quiche/web_transport/web_transport.h"
 #include "quiche/web_transport/web_transport_headers.h"
 
 using quiche::HttpHeaderBlock;
@@ -7653,7 +7654,7 @@
       .WillOnce(Assign(&data_acknowledged, true));
   outgoing_stream->SetVisitor(std::move(stream_visitor));
 
-  QUICHE_EXPECT_OK(quiche::WriteIntoStream(*outgoing_stream, "test"));
+  QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*outgoing_stream, "test"));
   EXPECT_TRUE(outgoing_stream->SendFin());
 
   bool stream_received = false;
@@ -7694,7 +7695,7 @@
   WebTransportStream* outgoing_stream =
       session->OpenOutgoingUnidirectionalStream();
   ASSERT_TRUE(outgoing_stream != nullptr);
-  QUICHE_EXPECT_OK(quiche::WriteIntoStream(*outgoing_stream, "test"));
+  QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*outgoing_stream, "test"));
   EXPECT_TRUE(outgoing_stream->SendFin());
 
   bool stream_received = false;
@@ -7735,7 +7736,7 @@
       .WillOnce(Assign(&data_acknowledged, true));
   stream->SetVisitor(std::move(stream_visitor_owned));
 
-  QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "test"));
+  QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "test"));
   EXPECT_TRUE(stream->SendFin());
 
   std::string received_data =
@@ -7762,7 +7763,7 @@
 
   WebTransportStream* stream = session->OpenOutgoingBidirectionalStream();
   ASSERT_TRUE(stream != nullptr);
-  QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "test"));
+  QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "test"));
   EXPECT_TRUE(stream->SendFin());
 
   std::string received_data = ReadDataFromWebTransportStreamUntilFin(stream);
@@ -7797,7 +7798,7 @@
                              quiche::QuicheMemSlice::Copy("bar"),
                              quiche::QuicheMemSlice::Copy("test"),
                              quiche::QuicheMemSlice::Copy(kLongString)};
-  quiche::StreamWriteOptions options;
+  webtransport::StreamWriteOptions options;
   options.set_send_fin(true);
   QUICHE_EXPECT_OK(stream->Writev(absl::MakeSpan(write_vector), options));
 
@@ -7849,7 +7850,7 @@
   WebTransportStream* stream = session->OpenOutgoingBidirectionalStream();
   ASSERT_TRUE(stream != nullptr);
   QuicStreamId stream_id = stream->GetStreamId();
-  QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "test"));
+  QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "test"));
   // Keep stream open.
 
   bool close_received = false;
@@ -7881,7 +7882,7 @@
   WebTransportStream* stream = session->OpenOutgoingBidirectionalStream();
   ASSERT_TRUE(stream != nullptr);
   QuicStreamId stream_id = stream->GetStreamId();
-  QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "test"));
+  QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "test"));
   // Keep stream open.
 
   bool close_received = false;
@@ -7913,7 +7914,7 @@
   WebTransportStream* stream = session->OpenOutgoingUnidirectionalStream();
   ASSERT_TRUE(stream != nullptr);
   QuicStreamId stream_id = stream->GetStreamId();
-  QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "42 test error"));
+  QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "42 test error"));
   EXPECT_TRUE(stream->SendFin());
 
   // Have some other streams open pending, to ensure they are closed properly.
@@ -7945,7 +7946,7 @@
 
   WebTransportStream* stream = session->OpenOutgoingUnidirectionalStream();
   ASSERT_TRUE(stream != nullptr);
-  QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "DRAIN"));
+  QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "DRAIN"));
   EXPECT_TRUE(stream->SendFin());
 
   bool drain_received = false;
@@ -7976,7 +7977,7 @@
   WebTransportStream* stream = session->OpenOutgoingBidirectionalStream();
   QuicStreamId id1 = stream->GetStreamId();
   ASSERT_TRUE(stream != nullptr);
-  QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "test"));
+  QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "test"));
   stream->ResetWithUserCode(42);
 
   // This read fails if the stream is closed in both directions, since that
@@ -7987,7 +7988,7 @@
   stream = session->OpenOutgoingBidirectionalStream();
   QuicStreamId id2 = stream->GetStreamId();
   ASSERT_TRUE(stream != nullptr);
-  QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "test"));
+  QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "test"));
   stream->SendStopSending(100024);
 
   std::array<std::string, 2> expected_log = {
@@ -8070,7 +8071,7 @@
 
   WebTransportStream* stream = session->OpenOutgoingBidirectionalStream();
   ASSERT_TRUE(stream != nullptr);
-  QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "test"));
+  QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "test"));
   EXPECT_TRUE(stream->SendFin());
 
   EXPECT_TRUE(client_->WaitUntil(-1, [this, connect_stream_id]() {
@@ -8106,7 +8107,7 @@
   WebTransportStream* outgoing_stream =
       session->OpenOutgoingUnidirectionalStream();
   ASSERT_TRUE(outgoing_stream != nullptr);
-  QUICHE_EXPECT_OK(quiche::WriteIntoStream(*outgoing_stream, "test"));
+  QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*outgoing_stream, "test"));
   EXPECT_TRUE(outgoing_stream->SendFin());
 
   EXPECT_CALL(visitor, OnIncomingUnidirectionalStreamAvailable())
@@ -8131,7 +8132,7 @@
   MockWebTransportStreamVisitor* stream_visitor = stream_visitor_owned.get();
   stream->SetVisitor(std::move(stream_visitor_owned));
 
-  QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "test"));
+  QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "test"));
   EXPECT_TRUE(stream->SendFin());
 
   std::string received_data =
diff --git a/quiche/quic/core/http/web_transport_stream_adapter.cc b/quiche/quic/core/http/web_transport_stream_adapter.cc
index 47357c6..5546930 100644
--- a/quiche/quic/core/http/web_transport_stream_adapter.cc
+++ b/quiche/quic/core/http/web_transport_stream_adapter.cc
@@ -25,7 +25,6 @@
 #include "quiche/quic/platform/api/quic_logging.h"
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace quic {
@@ -66,7 +65,7 @@
 
 absl::Status WebTransportStreamAdapter::Writev(
     absl::Span<quiche::QuicheMemSlice> data,
-    const quiche::StreamWriteOptions& options) {
+    const webtransport::StreamWriteOptions& options) {
   if (data.empty() && !options.send_fin()) {
     return absl::InvalidArgumentError(
         "Writev() called without any data or a FIN");
@@ -118,21 +117,12 @@
   return CheckBeforeStreamWrite().ok();
 }
 
-void WebTransportStreamAdapter::AbruptlyTerminate(absl::Status error) {
-  QUIC_DLOG(WARNING) << (session_->perspective() == Perspective::IS_CLIENT
-                             ? "Client: "
-                             : "Server: ")
-                     << "Abruptly terminating stream " << stream_->id()
-                     << " due to the following error: " << error;
-  ResetDueToInternalError();
-}
-
 size_t WebTransportStreamAdapter::ReadableBytes() const {
   QUICHE_DCHECK(sequencer_->level_triggered());
   return sequencer_->ReadableBytes();
 }
 
-quiche::ReadStream::PeekResult
+webtransport::Stream::PeekResult
 WebTransportStreamAdapter::PeekNextReadableRegion() const {
   iovec iov;
   PeekResult result;
diff --git a/quiche/quic/core/http/web_transport_stream_adapter.h b/quiche/quic/core/http/web_transport_stream_adapter.h
index 7107dd9..91192bf 100644
--- a/quiche/quic/core/http/web_transport_stream_adapter.h
+++ b/quiche/quic/core/http/web_transport_stream_adapter.h
@@ -24,7 +24,6 @@
 #include "quiche/quic/core/web_transport_interface.h"
 #include "quiche/common/platform/api/quiche_export.h"
 #include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace quic {
@@ -41,9 +40,8 @@
   ABSL_MUST_USE_RESULT ReadResult Read(absl::Span<char> output) override;
   ABSL_MUST_USE_RESULT ReadResult Read(std::string* output) override;
   absl::Status Writev(absl::Span<quiche::QuicheMemSlice> data,
-                      const quiche::StreamWriteOptions& options) override;
+                      const webtransport::StreamWriteOptions& options) override;
   bool CanWrite() const override;
-  void AbruptlyTerminate(absl::Status error) override;
   size_t ReadableBytes() const override;
   PeekResult PeekNextReadableRegion() const override;
   bool SkipBytes(size_t bytes) override;
diff --git a/quiche/quic/core/quic_generic_session_test.cc b/quiche/quic/core/quic_generic_session_test.cc
index 45dfef5..ce6a6b2 100644
--- a/quiche/quic/core/quic_generic_session_test.cc
+++ b/quiche/quic/core/quic_generic_session_test.cc
@@ -43,8 +43,8 @@
 #include "quiche/quic/tools/web_transport_test_visitors.h"
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/common/test_tools/quiche_test_utils.h"
+#include "quiche/web_transport/stream_helpers.h"
 #include "quiche/web_transport/test_tools/mock_web_transport.h"
 #include "quiche/web_transport/web_transport.h"
 
@@ -55,7 +55,7 @@
 
 constexpr char kZeroes[8192] = {0};
 
-absl::Status SendZeroes(quiche::WriteStream& stream, int num_of_zeroes,
+absl::Status SendZeroes(webtransport::Stream& stream, int num_of_zeroes,
                         bool fin = false) {
   std::vector<quiche::QuicheMemSlice> slices;
   slices.reserve(num_of_zeroes / sizeof(kZeroes) + 1);
@@ -66,7 +66,7 @@
         quiche::QuicheMemSlice(kZeroes, chunk_size, +[](absl::string_view) {}));
     remaining -= chunk_size;
   }
-  quiche::StreamWriteOptions options;
+  webtransport::StreamWriteOptions options;
   options.set_send_fin(fin);
   return stream.Writev(absl::MakeSpan(slices), options);
 }
@@ -330,12 +330,13 @@
       client_->session()->AcceptIncomingUnidirectionalStream();
   ASSERT_TRUE(reply != nullptr);
   std::string buffer;
-  quiche::ReadStream::PeekResult peek_result = reply->PeekNextReadableRegion();
+  webtransport::Stream::PeekResult peek_result =
+      reply->PeekNextReadableRegion();
   EXPECT_EQ(peek_result.peeked_data, "Stream Two");
   EXPECT_EQ(peek_result.fin_next, false);
   EXPECT_EQ(peek_result.all_data_received, true);
-  bool fin_received =
-      quiche::ProcessAllReadableRegions(*reply, [&](absl::string_view chunk) {
+  bool fin_received = webtransport::ProcessAllReadableRegions(
+      *reply, [&](absl::string_view chunk) {
         buffer.append(chunk.data(), chunk.size());
         return true;
       });
@@ -516,17 +517,17 @@
   ASSERT_TRUE(stream != nullptr);
 
   ASSERT_TRUE(stream->CanWrite());
-  absl::Status status = quiche::WriteIntoStream(*stream, buffer);
+  absl::Status status = webtransport::WriteIntoStream(*stream, buffer);
   QUICHE_EXPECT_OK(status);
   EXPECT_FALSE(stream->CanWrite());
 
-  status = quiche::WriteIntoStream(*stream, buffer);
+  status = webtransport::WriteIntoStream(*stream, buffer);
   EXPECT_THAT(status, StatusIs(absl::StatusCode::kUnavailable));
 
-  quiche::StreamWriteOptions options;
+  webtransport::StreamWriteOptions options;
   options.set_buffer_unconditionally(true);
   options.set_send_fin(true);
-  status = quiche::WriteIntoStream(*stream, buffer, options);
+  status = webtransport::WriteIntoStream(*stream, buffer, options);
   QUICHE_EXPECT_OK(status);
   EXPECT_FALSE(stream->CanWrite());
 
@@ -534,7 +535,7 @@
   for (;;) {
     test_harness_.RunUntilWithDefaultTimeout(
         [&] { return stream->PeekNextReadableRegion().has_data(); });
-    quiche::ReadStream::PeekResult result = stream->PeekNextReadableRegion();
+    webtransport::Stream::PeekResult result = stream->PeekNextReadableRegion();
     total_received += result.peeked_data.size();
     bool fin_consumed = stream->SkipBytes(result.peeked_data.size());
     if (fin_consumed) {
diff --git a/quiche/quic/moqt/moqt_bidi_stream.h b/quiche/quic/moqt/moqt_bidi_stream.h
index d0ded95..98971f8 100644
--- a/quiche/quic/moqt/moqt_bidi_stream.h
+++ b/quiche/quic/moqt/moqt_bidi_stream.h
@@ -27,7 +27,7 @@
 #include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/quiche_callbacks.h"
 #include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
+#include "quiche/web_transport/stream_helpers.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
@@ -214,7 +214,7 @@
   void Fin() {
     fin_queued_ = true;
     if (pending_messages_.empty()) {
-      if (stream_ != nullptr && !SendFinOnStream(*stream_).ok()) {
+      if (stream_ != nullptr && !webtransport::SendFinOnStream(*stream_).ok()) {
         std::move(session_error_callback_)(MoqtError::kInternalError,
                                            "Failed to send FIN");
       }
@@ -260,7 +260,7 @@
 
  private:
   void SendMessage(quiche::QuicheBuffer message, bool fin) {
-    quiche::StreamWriteOptions options;
+    webtransport::StreamWriteOptions options;
     options.set_send_fin(fin);
     // TODO: while we buffer unconditionally, we should still at some point tear
     // down the connection if we've buffered too many control messages;
diff --git a/quiche/quic/moqt/moqt_bidi_stream_test.cc b/quiche/quic/moqt/moqt_bidi_stream_test.cc
index cf0cb88..dedd3c8 100644
--- a/quiche/quic/moqt/moqt_bidi_stream_test.cc
+++ b/quiche/quic/moqt/moqt_bidi_stream_test.cc
@@ -16,8 +16,8 @@
 #include "quiche/quic/moqt/test_tools/moqt_framer_utils.h"
 #include "quiche/common/platform/api/quiche_test.h"
 #include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/web_transport/test_tools/mock_web_transport.h"
+#include "quiche/web_transport/web_transport.h"
 
 using ::testing::_;
 using ::testing::Return;
@@ -220,7 +220,7 @@
     EXPECT_CALL(mock_stream_,
                 Writev(ControlMessageOfType(MoqtMessageType::kRequestError), _))
         .WillOnce([](absl::Span<quiche::QuicheMemSlice>,
-                     const quiche::StreamWriteOptions& options) {
+                     const webtransport::StreamWriteOptions& options) {
           EXPECT_TRUE(options.send_fin());
           return absl::OkStatus();
         });
@@ -232,7 +232,7 @@
   stream_->set_stream(&mock_stream_);
   EXPECT_CALL(mock_stream_, Writev)
       .WillOnce([](absl::Span<quiche::QuicheMemSlice>,
-                   const quiche::StreamWriteOptions& options) {
+                   const webtransport::StreamWriteOptions& options) {
         EXPECT_TRUE(options.send_fin());
         return absl::OkStatus();
       });
diff --git a/quiche/quic/moqt/moqt_namespace_stream.cc b/quiche/quic/moqt/moqt_namespace_stream.cc
index 3ac34a3..f43967b 100644
--- a/quiche/quic/moqt/moqt_namespace_stream.cc
+++ b/quiche/quic/moqt/moqt_namespace_stream.cc
@@ -21,7 +21,6 @@
 #include "quiche/quic/moqt/moqt_session_callbacks.h"
 #include "quiche/quic/moqt/session_namespace_tree.h"
 #include "quiche/common/platform/api/quiche_logging.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
diff --git a/quiche/quic/moqt/moqt_namespace_stream_test.cc b/quiche/quic/moqt/moqt_namespace_stream_test.cc
index e538c27..9b4b2be 100644
--- a/quiche/quic/moqt/moqt_namespace_stream_test.cc
+++ b/quiche/quic/moqt/moqt_namespace_stream_test.cc
@@ -25,8 +25,8 @@
 #include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h"
 #include "quiche/common/platform/api/quiche_test.h"
 #include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/web_transport/test_tools/mock_web_transport.h"
+#include "quiche/web_transport/web_transport.h"
 
 namespace moqt::test {
 namespace {
@@ -324,7 +324,7 @@
       .WillOnce([](TrackNamespace& ns, TransactionType& type) { return kEof; });
   EXPECT_CALL(mock_stream_, Writev)
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> slices,
-                    const quiche::StreamWriteOptions& options) {
+                    const webtransport::StreamWriteOptions& options) {
         EXPECT_EQ(slices.size(), 1);
         EXPECT_EQ(slices[0].data()[0],
                   static_cast<uint8_t>(MoqtMessageType::kNamespaceDone));
@@ -332,7 +332,7 @@
         return absl::OkStatus();
       })
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> slices,
-                    const quiche::StreamWriteOptions& options) {
+                    const webtransport::StreamWriteOptions& options) {
         EXPECT_EQ(slices.size(), 0);
         EXPECT_TRUE(options.send_fin());
         return absl::OkStatus();
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index 3e5a446..e1c663b 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -33,7 +33,7 @@
 #include "quiche/common/platform/api/quiche_bug_tracker.h"
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/quiche_data_reader.h"
-#include "quiche/common/quiche_stream.h"
+#include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
 
@@ -48,11 +48,12 @@
 
 // |fin_read| is set to true if there is a FIN anywhere before the end of the
 // varint.
-std::optional<uint64_t> ReadVarInt62FromStream(quiche::ReadStream& stream,
+std::optional<uint64_t> ReadVarInt62FromStream(webtransport::Stream& stream,
                                                bool& fin_read) {
   fin_read = false;
 
-  quiche::ReadStream::PeekResult peek_result = stream.PeekNextReadableRegion();
+  webtransport::Stream::PeekResult peek_result =
+      stream.PeekNextReadableRegion();
   if (peek_result.peeked_data.empty()) {
     if (peek_result.fin_next) {
       fin_read = stream.SkipBytes(0);
@@ -73,7 +74,7 @@
   char buffer[8];
   absl::Span<char> bytes_to_read =
       absl::MakeSpan(buffer).subspan(0, varint_size);
-  quiche::ReadStream::ReadResult read_result = stream.Read(bytes_to_read);
+  webtransport::Stream::ReadResult read_result = stream.Read(bytes_to_read);
   QUICHE_DCHECK_EQ(read_result.bytes_read, varint_size);
   fin_read = read_result.fin;
 
@@ -459,7 +460,7 @@
         return;
       }
       std::array<char, 2> size_bytes;
-      quiche::ReadStream::ReadResult result =
+      webtransport::Stream::ReadResult result =
           stream_.Read(absl::MakeSpan(size_bytes));
       if (result.bytes_read != 2) {
         ParseError(MoqtError::kInternalError,
@@ -489,7 +490,7 @@
       return;
     }
     absl::FixedArray<char> message(*message_size_);
-    quiche::ReadStream::ReadResult result =
+    webtransport::Stream::ReadResult result =
         stream_.Read(absl::MakeSpan(message));
     if (result.bytes_read != *message_size_) {
       ParseError("Stream returned incorrect ReadableBytes");
@@ -1205,7 +1206,7 @@
 
 std::optional<uint8_t> MoqtDataParser::ReadUint8NoFin() {
   char buffer[1];
-  quiche::ReadStream::ReadResult read_result =
+  webtransport::Stream::ReadResult read_result =
       stream_.Read(absl::MakeSpan(buffer));
   if (read_result.bytes_read == 0) {
     return std::nullopt;
@@ -1421,7 +1422,7 @@
     case kExtensionBody:
     case kData: {
       while (payload_length_remaining_ > 0) {
-        quiche::ReadStream::PeekResult peek_result =
+        webtransport::Stream::PeekResult peek_result =
             stream_.PeekNextReadableRegion();
         if (!peek_result.has_data()) {
           return;
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h
index 79c10a1..d0585ea 100644
--- a/quiche/quic/moqt/moqt_parser.h
+++ b/quiche/quic/moqt/moqt_parser.h
@@ -22,7 +22,7 @@
 #include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/common/platform/api/quiche_export.h"
 #include "quiche/common/quiche_callbacks.h"
-#include "quiche/common/quiche_stream.h"
+#include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
 
@@ -89,7 +89,7 @@
 
 class QUICHE_EXPORT MoqtMessageTypeParser {
  public:
-  MoqtMessageTypeParser(quiche::ReadStream* stream) : stream_(*stream) {}
+  MoqtMessageTypeParser(webtransport::Stream* stream) : stream_(*stream) {}
   ~MoqtMessageTypeParser() = default;
 
   // Returns false if there was a FIN.
@@ -97,13 +97,13 @@
   std::optional<uint64_t> message_type() const { return message_type_; }
 
  private:
-  quiche::ReadStream& stream_;
+  webtransport::Stream& stream_;
   std::optional<uint64_t> message_type_;
 };
 
 class QUICHE_EXPORT MoqtControlParser {
  public:
-  MoqtControlParser(bool uses_web_transport, quiche::ReadStream* stream,
+  MoqtControlParser(bool uses_web_transport, webtransport::Stream* stream,
                     MoqtControlParserVisitor& visitor)
       : visitor_(visitor),
         stream_(*stream),
@@ -177,7 +177,7 @@
                                         MessageParameters& out);
 
   MoqtControlParserVisitor& visitor_;
-  quiche::ReadStream& stream_;
+  webtransport::Stream& stream_;
   bool uses_web_transport_;
   bool no_more_data_ = false;  // Fatal error or fin. No more parsing.
   bool parsing_error_ = false;
@@ -206,7 +206,7 @@
   // `stream` must outlive the parser.  The parser does not configure itself as
   // a listener for the read events of the stream; it is responsibility of the
   // caller to do so via one of the read methods below.
-  explicit MoqtDataParser(quiche::ReadStream* stream,
+  explicit MoqtDataParser(webtransport::Stream* stream,
                           MoqtDataParserVisitor* visitor)
       : stream_(*stream), visitor_(*visitor) {}
 
@@ -282,7 +282,7 @@
 
   void ParseError(absl::string_view reason);
 
-  quiche::ReadStream& stream_;
+  webtransport::Stream& stream_;
   MoqtDataParserVisitor& visitor_;
 
   bool no_more_data_ = false;  // Fatal error or fin. No more parsing.
diff --git a/quiche/quic/moqt/moqt_probe_manager.cc b/quiche/quic/moqt/moqt_probe_manager.cc
index fefab71..953fe6e 100644
--- a/quiche/quic/moqt/moqt_probe_manager.cc
+++ b/quiche/quic/moqt/moqt_probe_manager.cc
@@ -19,8 +19,8 @@
 #include "quiche/common/platform/api/quiche_bug_tracker.h"
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/common/wire_serialization.h"
+#include "quiche/web_transport/stream_helpers.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
@@ -73,7 +73,7 @@
   }
 
   if (!header_sent_) {
-    absl::Status status = quiche::WriteIntoStream(
+    absl::Status status = webtransport::WriteIntoStream(
         *stream_, *quiche::SerializeIntoString(quiche::WireVarInt62(
                       MoqtDataStreamType::Padding().value())));
     QUICHE_DCHECK(status.ok()) << status;  // Should succeed if CanWrite().
@@ -84,7 +84,7 @@
     quic::QuicByteCount chunk_size = std::min(kWriteChunkSize, data_remaining_);
     quiche::QuicheMemSlice chunk(
         kZeroes, chunk_size, +[](absl::string_view) {});
-    quiche::StreamWriteOptions options;
+    webtransport::StreamWriteOptions options;
     options.set_send_fin(chunk_size == data_remaining_);
     absl::Status status = stream_->Writev(absl::MakeSpan(&chunk, 1), options);
     QUICHE_DCHECK(status.ok()) << status;  // Should succeed if CanWrite().
diff --git a/quiche/quic/moqt/moqt_probe_manager_test.cc b/quiche/quic/moqt/moqt_probe_manager_test.cc
index 5b7c2f3..9f97832 100644
--- a/quiche/quic/moqt/moqt_probe_manager_test.cc
+++ b/quiche/quic/moqt/moqt_probe_manager_test.cc
@@ -21,7 +21,6 @@
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/platform/api/quiche_test.h"
 #include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/web_transport/test_tools/mock_web_transport.h"
 #include "quiche/web_transport/web_transport.h"
 
@@ -49,8 +48,9 @@
   MockStream(webtransport::StreamId id) : id_(id) {}
 
   webtransport::StreamId GetStreamId() const override { return id_; }
-  absl::Status Writev(absl::Span<quiche::QuicheMemSlice> data,
-                      const quiche::StreamWriteOptions& options) override {
+  absl::Status Writev(
+      absl::Span<quiche::QuicheMemSlice> data,
+      const webtransport::StreamWriteOptions& options) override {
     QUICHE_CHECK(!fin_) << "FIN written twice.";
     for (const quiche::QuicheMemSlice& slice : data) {
       data_.append(slice.AsStringView());
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index fff6197..09354b2 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -50,7 +50,6 @@
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/common/quiche_weak_ptr.h"
 #include "quiche/web_transport/web_transport.h"
 
@@ -681,7 +680,7 @@
       case MoqtFetchTask::GetNextObjectResult::kEof:
         // TODO(martinduke): Either prefetch the next object, or alter the API
         // so that we're not sending FIN in a separate frame.
-        if (!quiche::SendFinOnStream(*stream_).ok()) {
+        if (!webtransport::SendFinOnStream(*stream_).ok()) {
           QUIC_DVLOG(1) << "Sending FIN onStream " << stream_->GetStreamId()
                         << " failed";
         }
@@ -2405,7 +2404,7 @@
   // memslices so that we can avoid a copy here.
   std::array write_vector = {
       quiche::QuicheMemSlice(std::move(serialized_header)), std::move(payload)};
-  quiche::StreamWriteOptions options;
+  webtransport::StreamWriteOptions options;
   options.set_send_fin(fin);
   absl::Status write_status =
       stream->Writev(absl::MakeSpan(write_vector), options);
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 4f0feb7..cb65ccd 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -46,7 +46,6 @@
 #include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/quiche_data_reader.h"
 #include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/common/quiche_weak_ptr.h"
 #include "quiche/web_transport/test_tools/in_memory_stream.h"
 #include "quiche/web_transport/test_tools/mock_web_transport.h"
@@ -228,7 +227,7 @@
       });
     }
     EXPECT_CALL(*stream, PeekNextReadableRegion()).WillRepeatedly([&]() {
-      return quiche::ReadStream::PeekResult(
+      return webtransport::Stream::PeekResult(
           absl::string_view(buffer.data() + data_read,
                             buffer.size() - data_read),
           fin && data_read == buffer.size(), fin);
@@ -242,7 +241,7 @@
               std::min(bytes_to_read.size(), buffer.size() - data_read);
           memcpy(bytes_to_read.data(), buffer.data() + data_read, read_size);
           data_read += read_size;
-          return quiche::ReadStream::ReadResult(
+          return webtransport::Stream::ReadResult(
               read_size, fin && data_read == buffer.size());
         });
     EXPECT_CALL(*stream, SkipBytes(_)).WillRepeatedly([&](size_t bytes) {
@@ -326,7 +325,7 @@
       });
   EXPECT_CALL(mock_stream_, PeekNextReadableRegion())
       .WillOnce(Return(
-          quiche::ReadStream::PeekResult(absl::string_view(), false, false)));
+          webtransport::Stream::PeekResult(absl::string_view(), false, false)));
   server_session.OnIncomingBidirectionalStreamAvailable();
 }
 
@@ -1320,7 +1319,7 @@
   const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f, 0x00, 0x0a};
   EXPECT_CALL(mock_stream_, Writev(_, _))
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const quiche::StreamWriteOptions& options) {
+                    const webtransport::StreamWriteOptions& options) {
         correct_message =
             absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
         fin |= options.send_fin();
@@ -1377,7 +1376,7 @@
   const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f};
   EXPECT_CALL(mock_stream_, Writev(_, _))
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const quiche::StreamWriteOptions& options) {
+                    const webtransport::StreamWriteOptions& options) {
         correct_message =
             absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
         fin = options.send_fin();
@@ -1431,7 +1430,7 @@
   const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f};
   EXPECT_CALL(mock_stream_, Writev(_, _))
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const quiche::StreamWriteOptions& options) {
+                    const webtransport::StreamWriteOptions& options) {
         correct_message =
             absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
         fin |= options.send_fin();
@@ -1499,7 +1498,7 @@
   const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f};
   EXPECT_CALL(mock_stream_, Writev(_, _))
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const quiche::StreamWriteOptions& options) {
+                    const webtransport::StreamWriteOptions& options) {
         correct_message =
             absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
         fin |= options.send_fin();
@@ -1569,7 +1568,7 @@
   const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f};
   EXPECT_CALL(mock_stream_, Writev(_, _))
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const quiche::StreamWriteOptions& options) {
+                    const webtransport::StreamWriteOptions& options) {
         correct_message =
             absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
         fin |= options.send_fin();
@@ -1625,7 +1624,7 @@
   const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f};
   EXPECT_CALL(mock_stream_, Writev)
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const quiche::StreamWriteOptions& options) {
+                    const webtransport::StreamWriteOptions& options) {
         correct_message =
             absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
         fin = options.send_fin();
@@ -1649,7 +1648,7 @@
   fin = false;
   EXPECT_CALL(mock_stream_, Writev)
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const quiche::StreamWriteOptions& options) {
+                    const webtransport::StreamWriteOptions& options) {
         EXPECT_TRUE(data.empty());
         fin = options.send_fin();
         return absl::OkStatus();
@@ -1688,7 +1687,7 @@
   const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x7f, 0x00, 0x00};
   EXPECT_CALL(mock_stream_, Writev(_, _))
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const quiche::StreamWriteOptions& options) {
+                    const webtransport::StreamWriteOptions& options) {
         correct_message =
             absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
         fin = options.send_fin();
@@ -1737,7 +1736,7 @@
   });
   EXPECT_CALL(mock_stream_, Writev(_, _))
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const quiche::StreamWriteOptions& options) {
+                    const webtransport::StreamWriteOptions& options) {
         correct_message =
             absl::StartsWith(data[0].AsStringView(), kExpectedMessage2);
         fin = options.send_fin();
@@ -1779,7 +1778,7 @@
   const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x7f, 0x00, 0x00};
   EXPECT_CALL(mock_stream_, Writev(_, _))
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const quiche::StreamWriteOptions& options) {
+                    const webtransport::StreamWriteOptions& options) {
         correct_message =
             absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
         fin = options.send_fin();
@@ -2117,7 +2116,7 @@
       .WillOnce(Return(std::nullopt));
   EXPECT_CALL(mock_stream_, Writev)
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const quiche::StreamWriteOptions& options) {
+                    const webtransport::StreamWriteOptions& options) {
         // The stream type omits the priority.
         EXPECT_TRUE(static_cast<const uint8_t>(data[0].AsStringView()[0]) &
                     MoqtDataStreamType::kDefaultPriority);
@@ -2269,21 +2268,21 @@
   EXPECT_CALL(mock_stream2, CanWrite()).WillRepeatedly(Return(true));
   EXPECT_CALL(mock_stream0, Writev(_, _))
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const quiche::StreamWriteOptions& options) {
+                    const webtransport::StreamWriteOptions& options) {
         // The Group ID is the 3rd byte of the stream.
         EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 0);
         return absl::OkStatus();
       });
   EXPECT_CALL(mock_stream1, Writev(_, _))
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const quiche::StreamWriteOptions& options) {
+                    const webtransport::StreamWriteOptions& options) {
         // The Group ID is the 3rd byte of the stream.
         EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 1);
         return absl::OkStatus();
       });
   EXPECT_CALL(mock_stream2, Writev(_, _))
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const quiche::StreamWriteOptions& options) {
+                    const webtransport::StreamWriteOptions& options) {
         // The Group ID is the 3rd byte of the stream.
         EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 2);
         return absl::OkStatus();
@@ -2369,7 +2368,7 @@
   EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true));
   EXPECT_CALL(mock_stream0, Writev)
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const quiche::StreamWriteOptions& options) {
+                    const webtransport::StreamWriteOptions& options) {
         // Check track alias is 14.
         EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[1]), 14);
         // Check Group ID is 0
@@ -2405,7 +2404,7 @@
   EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true));
   EXPECT_CALL(mock_stream1, Writev(_, _))
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const quiche::StreamWriteOptions& options) {
+                    const webtransport::StreamWriteOptions& options) {
         // Check track alias is 15.
         EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[1]), 15);
         // Check Group ID is 0
@@ -2460,7 +2459,7 @@
   if (second_result == MoqtFetchTask::GetNextObjectResult::kEof) {
     EXPECT_CALL(data_stream, Writev)
         .WillOnce([](absl::Span<quiche::QuicheMemSlice> data,
-                     const quiche::StreamWriteOptions& options) {
+                     const webtransport::StreamWriteOptions& options) {
           quic::QuicDataReader reader(data[0].AsStringView());
           uint64_t type;
           EXPECT_TRUE(reader.ReadVarInt62(&type));
@@ -2469,7 +2468,7 @@
           return absl::OkStatus();
         })
         .WillOnce([](absl::Span<quiche::QuicheMemSlice> data,
-                     const quiche::StreamWriteOptions& options) {
+                     const webtransport::StreamWriteOptions& options) {
           EXPECT_TRUE(data.empty());
           EXPECT_TRUE(options.send_fin());
           return absl::OkStatus();
@@ -2478,7 +2477,7 @@
   }
   EXPECT_CALL(data_stream, Writev)
       .WillOnce([](absl::Span<quiche::QuicheMemSlice> data,
-                   const quiche::StreamWriteOptions& options) {
+                   const webtransport::StreamWriteOptions& options) {
         quic::QuicDataReader reader(data[0].AsStringView());
         uint64_t type;
         EXPECT_TRUE(reader.ReadVarInt62(&type));
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc
index 5858001..970ee46 100644
--- a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc
+++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc
@@ -17,8 +17,8 @@
 #include "quiche/quic/moqt/moqt_parser.h"
 #include "quiche/common/platform/api/quiche_test.h"
 #include "quiche/common/quiche_buffer_allocator.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/web_transport/test_tools/in_memory_stream.h"
+#include "quiche/web_transport/web_transport.h"
 
 namespace moqt::test {
 
@@ -213,7 +213,7 @@
 
 absl::Status StoreSubscribe::operator()(
     absl::Span<const absl::string_view> data,
-    const quiche::StreamWriteOptions& options) const {
+    const webtransport::StreamWriteOptions& options) const {
   std::string merged_message = absl::StrJoin(data, "");
   std::vector<MoqtGenericFrame> frames = ParseGenericMessage(merged_message);
   if (frames.size() != 1 || !std::holds_alternative<MoqtSubscribe>(frames[0])) {
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.h b/quiche/quic/moqt/test_tools/moqt_framer_utils.h
index cd5135d..6aa81aa 100644
--- a/quiche/quic/moqt/test_tools/moqt_framer_utils.h
+++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.h
@@ -19,7 +19,7 @@
 #include "quiche/common/platform/api/quiche_test.h"
 #include "quiche/common/quiche_data_reader.h"
 #include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
+#include "quiche/web_transport/web_transport.h"
 
 namespace moqt::test {
 
@@ -83,8 +83,9 @@
       : subscribe_(subscribe) {}
 
   // quiche::WriteStream::Writev() implementation.
-  absl::Status operator()(absl::Span<const absl::string_view> data,
-                          const quiche::StreamWriteOptions& options) const;
+  absl::Status operator()(
+      absl::Span<const absl::string_view> data,
+      const webtransport::StreamWriteOptions& options) const;
 
  private:
   std::optional<MoqtSubscribe>* subscribe_;
diff --git a/quiche/quic/tools/web_transport_test_visitors.h b/quiche/quic/tools/web_transport_test_visitors.h
index e33eedd..88ee359 100644
--- a/quiche/quic/tools/web_transport_test_visitors.h
+++ b/quiche/quic/tools/web_transport_test_visitors.h
@@ -5,16 +5,18 @@
 #ifndef QUICHE_QUIC_TOOLS_WEB_TRANSPORT_TEST_VISITORS_H_
 #define QUICHE_QUIC_TOOLS_WEB_TRANSPORT_TEST_VISITORS_H_
 
+#include <memory>
 #include <string>
 
 #include "absl/status/status.h"
+#include "absl/strings/string_view.h"
 #include "quiche/quic/core/web_transport_interface.h"
 #include "quiche/quic/platform/api/quic_logging.h"
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/quiche_circular_deque.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/common/simple_buffer_allocator.h"
 #include "quiche/web_transport/complete_buffer_visitor.h"
+#include "quiche/web_transport/stream_helpers.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace quic {
@@ -32,7 +34,7 @@
                   << " bytes from WebTransport stream "
                   << stream_->GetStreamId() << ", fin: " << result.fin;
     if (bidi_ && result.fin) {
-      absl::Status status = quiche::SendFinOnStream(*stream_);
+      absl::Status status = webtransport::SendFinOnStream(*stream_);
       QUICHE_DCHECK(status.ok()) << status;
     }
   }
@@ -114,7 +116,7 @@
     }
 
     if (!buffer_.empty()) {
-      absl::Status status = quiche::WriteIntoStream(*stream_, buffer_);
+      absl::Status status = webtransport::WriteIntoStream(*stream_, buffer_);
       QUIC_DVLOG(1) << "Attempted writing on WebTransport bidirectional stream "
                     << stream_->GetStreamId() << ", success: " << status;
       if (!status.ok()) {
@@ -125,7 +127,7 @@
     }
 
     if (send_fin_ && !fin_sent_) {
-      absl::Status status = quiche::SendFinOnStream(*stream_);
+      absl::Status status = webtransport::SendFinOnStream(*stream_);
       if (status.ok()) {
         fin_sent_ = true;
       }
diff --git a/quiche/web_transport/complete_buffer_visitor.cc b/quiche/web_transport/complete_buffer_visitor.cc
index 45980c5..a2fbd3f 100644
--- a/quiche/web_transport/complete_buffer_visitor.cc
+++ b/quiche/web_transport/complete_buffer_visitor.cc
@@ -7,8 +7,10 @@
 #include <string>
 #include <utility>
 
+#include "absl/status/status.h"
 #include "quiche/common/platform/api/quiche_logging.h"
-#include "quiche/common/quiche_stream.h"
+#include "quiche/web_transport/stream_helpers.h"
+#include "quiche/web_transport/web_transport.h"
 
 namespace webtransport {
 
@@ -30,10 +32,9 @@
   if (!stream_->CanWrite()) {
     return;
   }
-  quiche::StreamWriteOptions options;
+  StreamWriteOptions options;
   options.set_send_fin(true);
-  absl::Status status =
-      quiche::WriteIntoStream(*stream_, *outgoing_data_, options);
+  absl::Status status = WriteIntoStream(*stream_, *outgoing_data_, options);
   if (!status.ok()) {
     QUICHE_DLOG(WARNING) << "Write from OnCanWrite() failed: " << status;
     return;
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport.cc b/quiche/web_transport/encapsulated/encapsulated_web_transport.cc
index 5a00803..ccdf6bb 100644
--- a/quiche/web_transport/encapsulated/encapsulated_web_transport.cc
+++ b/quiche/web_transport/encapsulated/encapsulated_web_transport.cc
@@ -11,7 +11,6 @@
 #include <cstddef>
 #include <cstdint>
 #include <cstring>
-#include <iterator>
 #include <memory>
 #include <optional>
 #include <string>
@@ -19,7 +18,6 @@
 #include <utility>
 #include <vector>
 
-#include "absl/algorithm/container.h"
 #include "absl/container/node_hash_map.h"
 #include "absl/status/status.h"
 #include "absl/status/statusor.h"
@@ -36,8 +34,8 @@
 #include "quiche/common/quiche_circular_deque.h"
 #include "quiche/common/quiche_mem_slice.h"
 #include "quiche/common/quiche_status_utils.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/common/vectorized_io_utils.h"
+#include "quiche/web_transport/stream_helpers.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace webtransport {
@@ -69,8 +67,7 @@
 
 void EncapsulatedSession::InitializeClient(
     std::unique_ptr<SessionVisitor> visitor,
-    quiche::HttpHeaderBlock& /*outgoing_headers*/, quiche::WriteStream* writer,
-    quiche::ReadStream* reader) {
+    quiche::HttpHeaderBlock& /*outgoing_headers*/, Stream* underlying) {
   if (state_ != kUninitialized) {
     OnFatalError("Called InitializeClient() in an invalid state");
     return;
@@ -81,16 +78,14 @@
   }
 
   visitor_ = std::move(visitor);
-  writer_ = writer;
-  reader_ = reader;
+  underlying_ = underlying;
   state_ = kWaitingForHeaders;
 }
 
 void EncapsulatedSession::InitializeServer(
     std::unique_ptr<SessionVisitor> visitor,
     const quiche::HttpHeaderBlock& /*incoming_headers*/,
-    quiche::HttpHeaderBlock& /*outgoing_headers*/, quiche::WriteStream* writer,
-    quiche::ReadStream* reader) {
+    quiche::HttpHeaderBlock& /*outgoing_headers*/, Stream* underlying) {
   if (state_ != kUninitialized) {
     OnFatalError("Called InitializeServer() in an invalid state");
     return;
@@ -101,8 +96,7 @@
   }
 
   visitor_ = std::move(visitor);
-  writer_ = writer;
-  reader_ = reader;
+  underlying_ = underlying;
   OpenSession();
 }
 void EncapsulatedSession::ProcessIncomingServerHeaders(
@@ -239,7 +233,7 @@
     // datagrams are not subject to queueing.
     case kWaitingForHeaders:
     case kSessionOpen:
-      write_blocked = !writer_->CanWrite();
+      write_blocked = !underlying_->CanWrite();
       break;
     case kSessionClosing:
     case kSessionClosed:
@@ -261,7 +255,7 @@
   std::array spans = {quiche::QuicheMemSlice(std::move(buffer)),
                       quiche::QuicheMemSlice::Copy(datagram)};
   absl::Status write_status =
-      writer_->Writev(absl::MakeSpan(spans), quiche::StreamWriteOptions());
+      underlying_->Writev(absl::MakeSpan(spans), StreamWriteOptions());
   if (!write_status.ok()) {
     OnWriteError(write_status);
     return DatagramStatus{
@@ -281,7 +275,7 @@
 }
 
 void EncapsulatedSession::OnCanWrite() {
-  if (state_ == kUninitialized || !writer_) {
+  if (state_ == kUninitialized || !underlying_) {
     OnFatalError("Trying to write before the session is initialized");
     return;
   }
@@ -291,7 +285,7 @@
   }
 
   if (state_ == kSessionClosing) {
-    if (writer_->CanWrite()) {
+    if (underlying_->CanWrite()) {
       CloseWebTransportSessionCapsule capsule{
           buffered_session_close_.error_code,
           buffered_session_close_.error_message};
@@ -309,9 +303,9 @@
     return;
   }
 
-  while (writer_->CanWrite() && !control_capsule_queue_.empty()) {
-    absl::Status write_status = quiche::WriteIntoStream(
-        *writer_, control_capsule_queue_.front().AsStringView());
+  while (underlying_->CanWrite() && !control_capsule_queue_.empty()) {
+    absl::Status write_status = WriteIntoStream(
+        *underlying_, control_capsule_queue_.front().AsStringView());
     if (!write_status.ok()) {
       OnWriteError(write_status);
       return;
@@ -319,7 +313,7 @@
     control_capsule_queue_.pop_front();
   }
 
-  while (writer_->CanWrite()) {
+  while (underlying_->CanWrite()) {
     absl::StatusOr<StreamId> next_id = scheduler_.PopFront();
     if (!next_id.ok()) {
       QUICHE_DCHECK_EQ(next_id.status().code(), absl::StatusCode::kNotFound);
@@ -340,8 +334,8 @@
   if (state_ == kSessionClosed || state_ == kSessionClosing) {
     return;
   }
-  bool has_fin = quiche::ProcessAllReadableRegions(
-      *reader_, [&](absl::string_view fragment) {
+  bool has_fin =
+      ProcessAllReadableRegions(*underlying_, [&](absl::string_view fragment) {
         capsule_parser_.IngestCapsuleFragment(fragment);
       });
   if (has_fin) {
@@ -488,9 +482,9 @@
 absl::Status EncapsulatedSession::SendFin(absl::string_view data) {
   QUICHE_DCHECK(!fin_sent_);
   fin_sent_ = true;
-  quiche::StreamWriteOptions options;
+  StreamWriteOptions options;
   options.set_send_fin(true);
-  return quiche::WriteIntoStream(*writer_, data, options);
+  return WriteIntoStream(*underlying_, data, options);
 }
 
 void EncapsulatedSession::OnSessionClosed(SessionErrorCode error_code,
@@ -516,8 +510,7 @@
 }
 
 void EncapsulatedSession::OnFatalError(absl::string_view error_message) {
-  QUICHE_DLOG(ERROR) << "Fatal error in encapsulated WebTransport: "
-                     << error_message;
+  QUICHE_DLOG(ERROR) << "Fatal error in encapsulated  " << error_message;
   state_ = kSessionClosed;
   if (fatal_error_callback_) {
     std::move(fatal_error_callback_)(error_message);
@@ -549,7 +542,7 @@
   }
 }
 
-quiche::ReadStream::ReadResult EncapsulatedSession::InnerStream::Read(
+Stream::ReadResult EncapsulatedSession::InnerStream::Read(
     absl::Span<char> output) {
   const size_t total_size = output.size();
   for (const IncomingRead& read : incoming_reads_) {
@@ -563,8 +556,7 @@
   bool fin_consumed = SkipBytes(total_size);
   return ReadResult{total_size, fin_consumed};
 }
-quiche::ReadStream::ReadResult EncapsulatedSession::InnerStream::Read(
-    std::string* output) {
+Stream::ReadResult EncapsulatedSession::InnerStream::Read(std::string* output) {
   const size_t total_size = ReadableBytes();
   const size_t initial_offset = output->size();
   output->resize(initial_offset + total_size);
@@ -577,8 +569,8 @@
   }
   return total_size;
 }
-quiche::ReadStream::PeekResult
-EncapsulatedSession::InnerStream::PeekNextReadableRegion() const {
+Stream::PeekResult EncapsulatedSession::InnerStream::PeekNextReadableRegion()
+    const {
   if (incoming_reads_.empty()) {
     return PeekResult{absl::string_view(), fin_received_, fin_received_};
   }
@@ -614,7 +606,7 @@
 
 absl::Status EncapsulatedSession::InnerStream::Writev(
     const absl::Span<quiche::QuicheMemSlice> data,
-    const quiche::StreamWriteOptions& options) {
+    const StreamWriteOptions& options) {
   // TODO: support zero copy.
   std::vector<absl::string_view> views;
   views.reserve(data.size());
@@ -641,8 +633,8 @@
     session_->OnFatalError("Stream not registered with the scheduler");
     return absl::InternalError("Stream not registered with the scheduler");
   }
-  const bool write_blocked = !session_->writer_->CanWrite() || *should_yield ||
-                             !pending_write_.empty();
+  const bool write_blocked = !session_->underlying_->CanWrite() ||
+                             *should_yield || !pending_write_.empty();
   if (write_blocked) {
     fin_buffered_ = options.send_fin();
     for (absl::string_view chunk : views) {
@@ -681,7 +673,7 @@
 
 void EncapsulatedSession::InnerStream::FlushPendingWrite() {
   QUICHE_DCHECK(!write_side_closed_);
-  QUICHE_DCHECK(session_->writer_->CanWrite());
+  QUICHE_DCHECK(session_->underlying_->CanWrite());
   QUICHE_DCHECK(!pending_write_.empty());
   absl::string_view to_write = pending_write_;
   size_t bytes_written =
@@ -716,8 +708,8 @@
     // TODO: support zero copy.
     views_to_write.push_back(quiche::QuicheMemSlice::Copy(view));
   }
-  absl::Status write_status = session_->writer_->Writev(
-      absl::MakeSpan(views_to_write), quiche::kDefaultStreamWriteOptions);
+  absl::Status write_status = session_->underlying_->Writev(
+      absl::MakeSpan(views_to_write), kDefaultStreamWriteOptions);
   if (!write_status.ok()) {
     session_->OnWriteError(write_status);
     return 0;
@@ -725,12 +717,6 @@
   return total_size;
 }
 
-void EncapsulatedSession::InnerStream::AbruptlyTerminate(absl::Status error) {
-  QUICHE_DLOG(INFO) << "Abruptly terminating the stream due to error: "
-                    << error;
-  ResetDueToInternalError();
-}
-
 void EncapsulatedSession::InnerStream::ResetWithUserCode(
     StreamErrorCode error) {
   if (reset_frame_sent_) {
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport.h b/quiche/web_transport/encapsulated/encapsulated_web_transport.h
index 44f489a..cb0d416 100644
--- a/quiche/web_transport/encapsulated/encapsulated_web_transport.h
+++ b/quiche/web_transport/encapsulated/encapsulated_web_transport.h
@@ -26,7 +26,6 @@
 #include "quiche/common/quiche_callbacks.h"
 #include "quiche/common/quiche_circular_deque.h"
 #include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/common/simple_buffer_allocator.h"
 #include "quiche/web_transport/web_transport.h"
 #include "quiche/web_transport/web_transport_priority_scheduler.h"
@@ -47,9 +46,8 @@
 // arbitrary bidirectional bytestream that can be prefixed with HTTP headers.
 // Specification: https://datatracker.ietf.org/doc/draft-ietf-webtrans-http2/
 class QUICHE_EXPORT EncapsulatedSession
-    : public webtransport::Session,
-      public quiche::WriteStreamVisitor,
-      public quiche::ReadStreamVisitor,
+    : public Session,
+      public StreamVisitor,
       public quiche::CapsuleParser::Visitor {
  public:
   // The state machine of the transport.
@@ -77,16 +75,14 @@
   // thus, the headers are necessary to initialize the session.
   void InitializeClient(std::unique_ptr<SessionVisitor> visitor,
                         quiche::HttpHeaderBlock& outgoing_headers,
-                        quiche::WriteStream* writer,
-                        quiche::ReadStream* reader);
+                        Stream* underlying);
   void InitializeServer(std::unique_ptr<SessionVisitor> visitor,
                         const quiche::HttpHeaderBlock& incoming_headers,
                         quiche::HttpHeaderBlock& outgoing_headers,
-                        quiche::WriteStream* writer,
-                        quiche::ReadStream* reader);
+                        Stream* underlying);
   void ProcessIncomingServerHeaders(const quiche::HttpHeaderBlock& headers);
 
-  // webtransport::Session implementation.
+  // Session implementation.
   void CloseSession(SessionErrorCode error_code,
                     absl::string_view error_message) override;
   Stream* AcceptIncomingBidirectionalStream() override;
@@ -105,10 +101,16 @@
   void SetOnDraining(quiche::SingleUseCallback<void()> callback) override;
   std::optional<std::string> GetNegotiatedSubprotocol() const override;
 
-  // quiche::WriteStreamVisitor implementation.
+  // StreamVisitor implementation.
   void OnCanWrite() override;
-  // quiche::ReadStreamVisitor implementation.
   void OnCanRead() override;
+  void OnResetStreamReceived(StreamErrorCode) override {
+    OnFatalError("Underlying stream was reset");
+  }
+  void OnStopSendingReceived(StreamErrorCode) override {
+    OnFatalError("Underlying stream was reset");
+  }
+  void OnWriteSideInDataRecvdState() override {}
   // quiche::CapsuleParser::Visitor implementation.
   bool OnCapsule(const quiche::Capsule& capsule) override;
   void OnCapsuleParseFailure(absl::string_view error_message) override;
@@ -147,12 +149,9 @@
 
     // WriteStream implementation.
     absl::Status Writev(absl::Span<quiche::QuicheMemSlice> data,
-                        const quiche::StreamWriteOptions& options) override;
+                        const StreamWriteOptions& options) override;
     bool CanWrite() const override;
 
-    // TerminableStream implementation.
-    void AbruptlyTerminate(absl::Status error) override;
-
     // Stream implementation.
     StreamId GetStreamId() const override { return id_; }
     StreamVisitor* visitor() override { return visitor_.get(); }
@@ -224,8 +223,7 @@
   std::unique_ptr<SessionVisitor> visitor_ = nullptr;
   FatalErrorCallback fatal_error_callback_;
   quiche::SingleUseCallback<void()> draining_callback_;
-  quiche::WriteStream* writer_ = nullptr;  // Not owned.
-  quiche::ReadStream* reader_ = nullptr;   // Not owned.
+  Stream* underlying_ = nullptr;  // Not owned.
   quiche::QuicheBufferAllocator* allocator_ =
       quiche::SimpleBufferAllocator::Get();
   quiche::CapsuleParser capsule_parser_;
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc b/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc
index 6f27bc6..f80d932 100644
--- a/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc
+++ b/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc
@@ -8,7 +8,6 @@
 #include <memory>
 #include <string>
 #include <utility>
-#include <vector>
 
 #include "absl/status/status.h"
 #include "absl/strings/string_view.h"
@@ -17,11 +16,10 @@
 #include "quiche/common/http/http_header_block.h"
 #include "quiche/common/platform/api/quiche_test.h"
 #include "quiche/common/quiche_buffer_allocator.h"
-#include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/common/simple_buffer_allocator.h"
-#include "quiche/common/test_tools/mock_streams.h"
 #include "quiche/common/test_tools/quiche_test_utils.h"
+#include "quiche/web_transport/stream_helpers.h"
+#include "quiche/web_transport/test_tools/in_memory_stream.h"
 #include "quiche/web_transport/test_tools/mock_web_transport.h"
 #include "quiche/web_transport/web_transport.h"
 
@@ -41,20 +39,15 @@
 class EncapsulatedWebTransportTest : public quiche::test::QuicheTest,
                                      public quiche::CapsuleParser::Visitor {
  public:
-  EncapsulatedWebTransportTest() : parser_(this), reader_(&read_buffer_) {
+  EncapsulatedWebTransportTest() : parser_(this), underlying_(0) {
     ON_CALL(fatal_error_callback_, Call(_))
         .WillByDefault([](absl::string_view error) {
           ADD_FAILURE() << "Fatal session error: " << error;
         });
-    ON_CALL(writer_, Writev(_, _))
-        .WillByDefault([&](absl::Span<quiche::QuicheMemSlice> data,
-                           const quiche::StreamWriteOptions& options) {
-          for (const quiche::QuicheMemSlice& fragment : data) {
-            parser_.IngestCapsuleFragment(fragment.AsStringView());
-          }
-          writer_.ProcessOptions(options);
-          return absl::OkStatus();
-        });
+    ON_CALL(underlying_, OnWrite).WillByDefault([&](absl::string_view data) {
+      parser_.IngestCapsuleFragment(data);
+      return absl::OkStatus();
+    });
   }
 
   std::unique_ptr<EncapsulatedSession> CreateTransport(
@@ -80,7 +73,7 @@
   void ProcessIncomingCapsule(const Capsule& capsule) {
     quiche::QuicheBuffer buffer =
         quiche::SerializeCapsule(capsule, quiche::SimpleBufferAllocator::Get());
-    read_buffer_.append(buffer.data(), buffer.size());
+    underlying_.Receive(buffer.AsStringView());
     session_->OnCanRead();
   }
 
@@ -88,23 +81,21 @@
   void ProcessIncomingCapsule(const CapsuleType& capsule) {
     quiche::QuicheBuffer buffer = quiche::SerializeCapsule(
         quiche::Capsule(capsule), quiche::SimpleBufferAllocator::Get());
-    read_buffer_.append(buffer.data(), buffer.size());
+    underlying_.Receive(buffer.AsStringView());
     session_->OnCanRead();
   }
 
   void DefaultHandshakeForClient(EncapsulatedSession& session) {
     quiche::HttpHeaderBlock outgoing_headers, incoming_headers;
     session.InitializeClient(CreateAndStoreVisitor(), outgoing_headers,
-                             &writer_, &reader_);
+                             &underlying_);
     EXPECT_CALL(*visitor_, OnSessionReady());
     session.ProcessIncomingServerHeaders(incoming_headers);
   }
 
  protected:
   quiche::CapsuleParser parser_;
-  quiche::test::MockWriteStream writer_;
-  std::string read_buffer_;
-  quiche::test::ReadStreamFromString reader_;
+  InMemoryStreamWithMockWrite underlying_;
   MockSessionVisitor* visitor_ = nullptr;
   EncapsulatedSession* session_ = nullptr;
   testing::MockFunction<void(absl::string_view)> fatal_error_callback_;
@@ -127,8 +118,8 @@
       CreateTransport(Perspective::kClient);
   quiche::HttpHeaderBlock outgoing_headers, incoming_headers;
   EXPECT_EQ(session->state(), EncapsulatedSession::kUninitialized);
-  session->InitializeClient(CreateAndStoreVisitor(), outgoing_headers, &writer_,
-                            &reader_);
+  session->InitializeClient(CreateAndStoreVisitor(), outgoing_headers,
+                            &underlying_);
   EXPECT_EQ(session->state(), EncapsulatedSession::kWaitingForHeaders);
   EXPECT_CALL(*visitor_, OnSessionReady());
   session->ProcessIncomingServerHeaders(incoming_headers);
@@ -143,7 +134,7 @@
   std::unique_ptr<SessionVisitor> visitor = CreateAndStoreVisitor();
   EXPECT_CALL(*visitor_, OnSessionReady());
   session->InitializeServer(std::move(visitor), outgoing_headers,
-                            incoming_headers, &writer_, &reader_);
+                            incoming_headers, &underlying_);
   EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen);
 }
 
@@ -162,7 +153,7 @@
   EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test close")));
   session->CloseSession(0x1234, "test close");
   EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosed);
-  EXPECT_TRUE(writer_.fin_written());
+  EXPECT_TRUE(underlying_.fin_sent());
 
   EXPECT_CALL(fatal_error_callback_, Call(_))
       .WillOnce([](absl::string_view error) {
@@ -175,7 +166,8 @@
   std::unique_ptr<EncapsulatedSession> session =
       CreateTransport(Perspective::kClient);
   DefaultHandshakeForClient(*session);
-  EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(false));
+  EXPECT_CALL(underlying_, GetWriteStatus)
+      .WillOnce(Return(absl::UnavailableError("Write-blocked")));
   EXPECT_CALL(*this, OnCapsule(_)).Times(0);
   EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen);
   session->CloseSession(0x1234, "test close");
@@ -188,11 +180,12 @@
               "test close");
     return true;
   });
-  EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(true));
+  EXPECT_CALL(underlying_, GetWriteStatus)
+      .WillRepeatedly(Return(absl::OkStatus()));
   EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test close")));
   session->OnCanWrite();
   EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosed);
-  EXPECT_TRUE(writer_.fin_written());
+  EXPECT_TRUE(underlying_.fin_sent());
 }
 
 TEST_F(EncapsulatedWebTransportTest, ReceiveFin) {
@@ -201,9 +194,9 @@
   DefaultHandshakeForClient(*session);
 
   EXPECT_CALL(*visitor_, OnSessionClosed(0, IsEmpty()));
-  reader_.set_fin();
+  underlying_.Receive("", /*fin=*/true);
   session->OnCanRead();
-  EXPECT_TRUE(writer_.fin_written());
+  EXPECT_TRUE(underlying_.fin_sent());
 }
 
 TEST_F(EncapsulatedWebTransportTest, ReceiveCloseSession) {
@@ -213,8 +206,8 @@
 
   EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test")));
   ProcessIncomingCapsule(Capsule::CloseWebTransportSession(0x1234, "test"));
-  EXPECT_TRUE(writer_.fin_written());
-  reader_.set_fin();
+  EXPECT_TRUE(underlying_.fin_sent());
+  underlying_.Receive("", /*fin=*/true);
   session->OnCanRead();
 }
 
@@ -225,7 +218,7 @@
 
   EXPECT_CALL(fatal_error_callback_, Call(HasSubstr("too much capsule data")))
       .WillOnce([] {});
-  read_buffer_ = std::string(2 * 1024 * 1024, '\xff');
+  underlying_.Receive(std::string(2 * 1024 * 1024, '\xff'));
   session->OnCanRead();
 }
 
@@ -246,8 +239,8 @@
   std::unique_ptr<EncapsulatedSession> session =
       CreateTransport(Perspective::kClient);
   quiche::HttpHeaderBlock outgoing_headers;
-  session->InitializeClient(CreateAndStoreVisitor(), outgoing_headers, &writer_,
-                            &reader_);
+  session->InitializeClient(CreateAndStoreVisitor(), outgoing_headers,
+                            &underlying_);
   EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
     EXPECT_EQ(capsule.capsule_type(), quiche::CapsuleType::DATAGRAM);
     EXPECT_EQ(capsule.datagram_capsule().http_datagram_payload, "test");
@@ -317,9 +310,11 @@
   std::unique_ptr<EncapsulatedSession> session =
       CreateTransport(Perspective::kClient);
   DefaultHandshakeForClient(*session);
-  EXPECT_CALL(writer_, Writev(_, _))
+  // Let the CanWrite() check pass, then fail on the actual write.
+  EXPECT_CALL(underlying_, GetWriteStatus)
+      .WillOnce(Return(absl::OkStatus()))
       .WillOnce(Return(absl::InternalError("Test write error")));
-  EXPECT_CALL(fatal_error_callback_, Call(_))
+  EXPECT_CALL(fatal_error_callback_, Call)
       .WillOnce([](absl::string_view error) {
         EXPECT_THAT(error, HasSubstr("Test write error"));
       });
@@ -331,9 +326,11 @@
   std::unique_ptr<EncapsulatedSession> session =
       CreateTransport(Perspective::kClient);
   DefaultHandshakeForClient(*session);
-  EXPECT_CALL(writer_, Writev(_, _))
+  // Let the CanWrite() check pass, then fail on the actual write.
+  EXPECT_CALL(underlying_, GetWriteStatus)
+      .WillOnce(Return(absl::OkStatus()))
       .WillOnce(Return(absl::InternalError("Test write error")));
-  EXPECT_CALL(fatal_error_callback_, Call(_))
+  EXPECT_CALL(fatal_error_callback_, Call)
       .WillOnce([](absl::string_view error) {
         EXPECT_THAT(error, HasSubstr("Test write error"));
       });
@@ -358,13 +355,13 @@
   EXPECT_EQ(stream->visitor(), nullptr);
   EXPECT_EQ(stream->ReadableBytes(), 4u);
 
-  quiche::ReadStream::PeekResult peek = stream->PeekNextReadableRegion();
+  Stream::PeekResult peek = stream->PeekNextReadableRegion();
   EXPECT_EQ(peek.peeked_data, "test");
   EXPECT_FALSE(peek.fin_next);
   EXPECT_FALSE(peek.all_data_received);
 
   std::string buffer;
-  quiche::ReadStream::ReadResult read = stream->Read(&buffer);
+  Stream::ReadResult read = stream->Read(&buffer);
   EXPECT_EQ(read.bytes_read, 4);
   EXPECT_FALSE(read.fin);
   EXPECT_EQ(buffer, "test");
@@ -418,7 +415,7 @@
 
   ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "ef", true});
 
-  quiche::ReadStream::PeekResult peek = stream->PeekNextReadableRegion();
+  Stream::PeekResult peek = stream->PeekNextReadableRegion();
   EXPECT_EQ(peek.peeked_data, "abcd");
   EXPECT_FALSE(peek.fin_next);
   EXPECT_TRUE(peek.all_data_received);
@@ -449,7 +446,7 @@
   EXPECT_EQ(stream->ReadableBytes(), 6u);
 
   std::array<char, 3> buffer;
-  quiche::ReadStream::ReadResult read = stream->Read(absl::MakeSpan(buffer));
+  Stream::ReadResult read = stream->Read(absl::MakeSpan(buffer));
   EXPECT_THAT(buffer, ElementsAre('a', 'b', 'c'));
   EXPECT_EQ(read.bytes_read, 3);
   EXPECT_FALSE(read.fin);
@@ -473,7 +470,7 @@
 
   for (int i = 0; i < 64; i++) {
     std::array<char, 1024> buffer;
-    quiche::ReadStream::ReadResult read = stream->Read(absl::MakeSpan(buffer));
+    Stream::ReadResult read = stream->Read(absl::MakeSpan(buffer));
     EXPECT_EQ(read.bytes_read, 1024);
     EXPECT_EQ(read.fin, i == 63);
   }
@@ -552,7 +549,7 @@
   EXPECT_CALL(*visitor, OnDelete()).WillOnce([&] { deleted = true; });
   EXPECT_CALL(*this, OnCapsule(_)).WillOnce(Return(true));
 
-  quiche::StreamWriteOptions options;
+  StreamWriteOptions options;
   options.set_send_fin(true);
   EXPECT_THAT(stream->Writev(absl::Span<quiche::QuicheMemSlice>(), options),
               StatusIs(absl::StatusCode::kOk));
@@ -576,7 +573,7 @@
     EXPECT_EQ(capsule.web_transport_stream_data().data, "test");
     return true;
   });
-  absl::Status status = quiche::WriteIntoStream(*stream, "test");
+  absl::Status status = WriteIntoStream(*stream, "test");
   EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
 }
 
@@ -594,10 +591,10 @@
     EXPECT_EQ(capsule.web_transport_stream_data().data, "test");
     return true;
   });
-  quiche::StreamWriteOptions options;
+  StreamWriteOptions options;
   options.set_send_fin(true);
   EXPECT_TRUE(stream->CanWrite());
-  absl::Status status = quiche::WriteIntoStream(*stream, "test", options);
+  absl::Status status = WriteIntoStream(*stream, "test", options);
   EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
   EXPECT_FALSE(stream->CanWrite());
 }
@@ -616,7 +613,7 @@
     EXPECT_EQ(capsule.web_transport_stream_data().data, "");
     return true;
   });
-  quiche::StreamWriteOptions options;
+  StreamWriteOptions options;
   options.set_send_fin(true);
   EXPECT_TRUE(stream->CanWrite());
   absl::Status status =
@@ -632,15 +629,17 @@
   Stream* stream = session->OpenOutgoingUnidirectionalStream();
   ASSERT_TRUE(stream != nullptr);
 
-  EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(false));
-  absl::Status status = quiche::WriteIntoStream(*stream, "abc");
+  EXPECT_CALL(underlying_, GetWriteStatus)
+      .WillOnce(Return(absl::UnavailableError("Write-blocked")));
+  absl::Status status = WriteIntoStream(*stream, "abc");
   EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
 
   // While the stream cannot be written right now, we should be still able to
   // buffer data into it.
   EXPECT_TRUE(stream->CanWrite());
-  EXPECT_CALL(writer_, CanWrite()).WillRepeatedly(Return(true));
-  status = quiche::WriteIntoStream(*stream, "def");
+  EXPECT_CALL(underlying_, GetWriteStatus)
+      .WillRepeatedly(Return(absl::OkStatus()));
+  status = WriteIntoStream(*stream, "def");
   EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
 
   EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
@@ -659,13 +658,16 @@
   Stream* stream = session->OpenOutgoingUnidirectionalStream();
   ASSERT_TRUE(stream != nullptr);
 
-  EXPECT_CALL(writer_, CanWrite()).Times(2).WillRepeatedly(Return(false));
-  absl::Status status = quiche::WriteIntoStream(*stream, "abc");
+  EXPECT_CALL(underlying_, GetWriteStatus)
+      .Times(2)
+      .WillRepeatedly(Return(absl::UnavailableError("Write-blocked")));
+  absl::Status status = WriteIntoStream(*stream, "abc");
   EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
-  status = quiche::WriteIntoStream(*stream, "def");
+  status = WriteIntoStream(*stream, "def");
   EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
 
-  EXPECT_CALL(writer_, CanWrite()).WillRepeatedly(Return(true));
+  EXPECT_CALL(underlying_, GetWriteStatus)
+      .WillRepeatedly(Return(absl::OkStatus()));
   EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
     EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM);
     EXPECT_EQ(capsule.web_transport_stream_data().stream_id, 2u);
@@ -685,12 +687,14 @@
   ASSERT_TRUE(stream2 != nullptr);
 
   EXPECT_CALL(*this, OnCapsule(_)).Times(0);
-  EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(false));
-  absl::Status status = quiche::WriteIntoStream(*stream1, "abc");
+  EXPECT_CALL(underlying_, GetWriteStatus)
+      .WillOnce(Return(absl::UnavailableError("Write-blocked")));
+  absl::Status status = WriteIntoStream(*stream1, "abc");
   EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
   // ShouldYield will return false here, causing the write to get buffered.
-  EXPECT_CALL(writer_, CanWrite()).WillRepeatedly(Return(true));
-  status = quiche::WriteIntoStream(*stream2, "abc");
+  EXPECT_CALL(underlying_, GetWriteStatus)
+      .WillRepeatedly(Return(absl::OkStatus()));
+  status = WriteIntoStream(*stream2, "abc");
   EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
 
   std::vector<StreamId> writes;
diff --git a/quiche/web_transport/stream_helpers.h b/quiche/web_transport/stream_helpers.h
new file mode 100644
index 0000000..e7ca5d4
--- /dev/null
+++ b/quiche/web_transport/stream_helpers.h
@@ -0,0 +1,59 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// Helper functions for webtransport::Stream API.
+
+#ifndef QUICHE_COMMON_QUICHE_STREAM_H_
+#define QUICHE_COMMON_QUICHE_STREAM_H_
+
+#include "absl/status/status.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "quiche/common/quiche_callbacks.h"
+#include "quiche/common/quiche_mem_slice.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace webtransport {
+
+// Calls `callback` for every contiguous chunk available inside the stream.
+// Returns true if the FIN has been reached.
+inline bool ProcessAllReadableRegions(
+    Stream& stream,
+    quiche::UnretainedCallback<void(absl::string_view)> callback) {
+  for (;;) {
+    Stream::PeekResult peek_result = stream.PeekNextReadableRegion();
+    if (!peek_result.has_data()) {
+      return false;
+    }
+    callback(peek_result.peeked_data);
+    bool fin = stream.SkipBytes(peek_result.peeked_data.size());
+    if (fin) {
+      return true;
+    }
+  }
+}
+
+// Convenience methods to write a single chunk of data into the stream.
+inline absl::Status WriteIntoStream(
+    Stream& stream, quiche::QuicheMemSlice slice,
+    const StreamWriteOptions& options = kDefaultStreamWriteOptions) {
+  return stream.Writev(absl::MakeSpan(&slice, 1), options);
+}
+inline absl::Status WriteIntoStream(
+    Stream& stream, absl::string_view data,
+    const StreamWriteOptions& options = kDefaultStreamWriteOptions) {
+  quiche::QuicheMemSlice slice = quiche::QuicheMemSlice::Copy(data);
+  return stream.Writev(absl::MakeSpan(&slice, 1), options);
+}
+
+// Convenience methods to send a FIN on the stream.
+inline absl::Status SendFinOnStream(Stream& stream) {
+  StreamWriteOptions options;
+  options.set_send_fin(true);
+  return stream.Writev(absl::Span<quiche::QuicheMemSlice>(), options);
+}
+
+}  // namespace webtransport
+
+#endif  // QUICHE_COMMON_QUICHE_STREAM_H_
diff --git a/quiche/web_transport/test_tools/in_memory_stream.cc b/quiche/web_transport/test_tools/in_memory_stream.cc
index 167b90e..87f49a8 100644
--- a/quiche/web_transport/test_tools/in_memory_stream.cc
+++ b/quiche/web_transport/test_tools/in_memory_stream.cc
@@ -15,13 +15,12 @@
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/platform/api/quiche_test.h"
 #include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/common/vectorized_io_utils.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace webtransport::test {
 
-quiche::ReadStream::ReadResult InMemoryStream::Read(absl::Span<char> output) {
+Stream::ReadResult InMemoryStream::Read(absl::Span<char> output) {
   std::vector<absl::string_view> chunks;
   for (absl::string_view chunk : buffer_.Chunks()) {
     chunks.push_back(chunk);
@@ -31,7 +30,7 @@
   return ReadResult{bytes_read, buffer_.empty() && fin_received_};
 }
 
-quiche::ReadStream::ReadResult InMemoryStream::Read(std::string* output) {
+Stream::ReadResult InMemoryStream::Read(std::string* output) {
   ReadResult result;
   result.bytes_read = buffer_.size();
   result.fin = fin_received_;
@@ -42,7 +41,7 @@
 
 size_t InMemoryStream::ReadableBytes() const { return buffer_.size(); }
 
-quiche::ReadStream::PeekResult InMemoryStream::PeekNextReadableRegion() const {
+Stream::PeekResult InMemoryStream::PeekNextReadableRegion() const {
   if (buffer_.empty()) {
     return PeekResult{"", fin_received_, fin_received_};
   }
@@ -58,8 +57,9 @@
   return buffer_.empty() && fin_received_;
 }
 
-absl::Status InMemoryStream::Writev(absl::Span<quiche::QuicheMemSlice> data,
-                                    const quiche::StreamWriteOptions& options) {
+absl::Status InMemoryStream::Writev(
+    absl::Span<quiche::QuicheMemSlice> data,
+    const webtransport::StreamWriteOptions& options) {
   absl::Status status = GetWriteStatusWithExtraChecks();
   if (!status.ok()) {
     return status;
diff --git a/quiche/web_transport/test_tools/in_memory_stream.h b/quiche/web_transport/test_tools/in_memory_stream.h
index a35e8ea..5a27bde 100644
--- a/quiche/web_transport/test_tools/in_memory_stream.h
+++ b/quiche/web_transport/test_tools/in_memory_stream.h
@@ -18,7 +18,6 @@
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/platform/api/quiche_test.h"
 #include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace webtransport::test {
@@ -31,22 +30,19 @@
  public:
   explicit InMemoryStream(StreamId id) : id_(id) {}
 
-  // quiche::ReadStream implementation.
+  // webtransport::Stream implementation.
   [[nodiscard]] ReadResult Read(absl::Span<char> output) override;
   [[nodiscard]] ReadResult Read(std::string* output) override;
   size_t ReadableBytes() const override;
   PeekResult PeekNextReadableRegion() const override;
   bool SkipBytes(size_t bytes) override;
 
-  // quiche::WriteStream implementation.
   absl::Status Writev(absl::Span<quiche::QuicheMemSlice> data,
-                      const quiche::StreamWriteOptions& options) override;
+                      const StreamWriteOptions& options) override;
   bool CanWrite() const override {
     return GetWriteStatusWithExtraChecks().ok();
   }
 
-  void AbruptlyTerminate(absl::Status) override { Terminate(); }
-
   // webtransport::Stream implementation.
   StreamId GetStreamId() const override { return id_; }
   void ResetWithUserCode(StreamErrorCode) override {
diff --git a/quiche/web_transport/test_tools/in_memory_stream_test.cc b/quiche/web_transport/test_tools/in_memory_stream_test.cc
index 2c53df7..8068dfb 100644
--- a/quiche/web_transport/test_tools/in_memory_stream_test.cc
+++ b/quiche/web_transport/test_tools/in_memory_stream_test.cc
@@ -13,8 +13,8 @@
 #include "absl/types/span.h"
 #include "quiche/common/platform/api/quiche_test.h"
 #include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/common/test_tools/quiche_test_utils.h"
+#include "quiche/web_transport/stream_helpers.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace webtransport::test {
@@ -77,7 +77,7 @@
   EXPECT_TRUE(result.all_data_received);
 
   std::string merged_result;
-  bool fin_reached = quiche::ProcessAllReadableRegions(
+  bool fin_reached = ProcessAllReadableRegions(
       stream,
       [&](absl::string_view chunk) { absl::StrAppend(&merged_result, chunk); });
   EXPECT_EQ(merged_result, absl::StrCat(chunk_a, chunk_b));
@@ -89,8 +89,8 @@
   EXPECT_TRUE(stream.CanWrite());
 
   std::array write_vector = {quiche::QuicheMemSlice::Copy("test")};
-  quiche::StreamWriteOptions options;
   EXPECT_CALL(stream, OnWrite("test"));
+  StreamWriteOptions options;
   QUICHE_EXPECT_OK(stream.Writev(absl::MakeSpan(write_vector), options));
   EXPECT_FALSE(stream.fin_sent());
 
@@ -114,7 +114,7 @@
   EXPECT_TRUE(stream.CanWrite());
 
   std::array write_vector = {quiche::QuicheMemSlice::Copy("foo")};
-  quiche::StreamWriteOptions options;
+  StreamWriteOptions options;
   QUICHE_EXPECT_OK(stream.Writev(absl::MakeSpan(write_vector), options));
   EXPECT_FALSE(stream.fin_sent());
 
diff --git a/quiche/web_transport/test_tools/mock_web_transport.h b/quiche/web_transport/test_tools/mock_web_transport.h
index 77b1595..c3aa295 100644
--- a/quiche/web_transport/test_tools/mock_web_transport.h
+++ b/quiche/web_transport/test_tools/mock_web_transport.h
@@ -21,7 +21,6 @@
 #include "quiche/common/platform/api/quiche_test.h"
 #include "quiche/common/quiche_callbacks.h"
 #include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/quiche_stream.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace webtransport {
@@ -42,12 +41,11 @@
   MOCK_METHOD(ReadResult, Read, (std::string * output), (override));
   MOCK_METHOD(absl::Status, Writev,
               (absl::Span<quiche::QuicheMemSlice> data,
-               const quiche::StreamWriteOptions& options),
+               const StreamWriteOptions& options),
               (override));
   MOCK_METHOD(PeekResult, PeekNextReadableRegion, (), (const, override));
   MOCK_METHOD(bool, SkipBytes, (size_t bytes), (override));
   MOCK_METHOD(bool, CanWrite, (), (const, override));
-  MOCK_METHOD(void, AbruptlyTerminate, (absl::Status), (override));
   MOCK_METHOD(size_t, ReadableBytes, (), (const, override));
   MOCK_METHOD(StreamId, GetStreamId, (), (const, override));
   MOCK_METHOD(void, ResetWithUserCode, (StreamErrorCode error), (override));
diff --git a/quiche/web_transport/web_transport.h b/quiche/web_transport/web_transport.h
index 2c60d72..1e84efb 100644
--- a/quiche/web_transport/web_transport.h
+++ b/quiche/web_transport/web_transport.h
@@ -16,12 +16,13 @@
 
 // The dependencies of this API should be kept minimal and independent of
 // specific transport implementations.
+#include "absl/status/status.h"
 #include "absl/strings/string_view.h"
 #include "absl/time/time.h"
 #include "absl/types/span.h"
 #include "quiche/common/platform/api/quiche_export.h"
 #include "quiche/common/quiche_callbacks.h"
-#include "quiche/common/quiche_stream.h"
+#include "quiche/common/quiche_mem_slice.h"
 
 namespace webtransport {
 
@@ -117,11 +118,19 @@
 // events related to a WebTransport stream.  The visitor object is owned by the
 // stream itself, meaning that if the stream is ever fully closed, the visitor
 // will be garbage-collected.
-class QUICHE_EXPORT StreamVisitor : public quiche::ReadStreamVisitor,
-                                    public quiche::WriteStreamVisitor {
+class QUICHE_EXPORT StreamVisitor {
  public:
   virtual ~StreamVisitor() {}
 
+  // Called whenever the stream has new data available to read. Unless otherwise
+  // specified, QUICHE stream reads are level-triggered, which means that the
+  // callback will be called repeatedly as long as there is still data in the
+  // buffer.
+  virtual void OnCanRead() = 0;
+
+  // Called whenever the stream is not write-blocked and can accept new data.
+  virtual void OnCanWrite() = 0;
+
   // Called when RESET_STREAM is received for the stream.
   virtual void OnResetStreamReceived(StreamErrorCode error) = 0;
   // Called when STOP_SENDING is received for the stream.
@@ -132,14 +141,119 @@
   virtual void OnWriteSideInDataRecvdState() = 0;
 };
 
+// Options for writing data into a WriteStream.
+class QUICHE_EXPORT StreamWriteOptions {
+ public:
+  StreamWriteOptions() = default;
+
+  // If send_fin() is set to true, the write operation also sends a FIN on the
+  // stream.
+  bool send_fin() const { return send_fin_; }
+  void set_send_fin(bool send_fin) { send_fin_ = send_fin; }
+
+  // If buffer_unconditionally() is set to true, the write operation will buffer
+  // data even if the internal buffer limit is exceeded.
+  bool buffer_unconditionally() const { return buffer_unconditionally_; }
+  void set_buffer_unconditionally(bool value) {
+    buffer_unconditionally_ = value;
+  }
+
+ private:
+  bool send_fin_ = false;
+  bool buffer_unconditionally_ = false;
+};
+
+inline constexpr StreamWriteOptions kDefaultStreamWriteOptions =
+    StreamWriteOptions();
+
 // A stream (either bidirectional or unidirectional) that is contained within a
 // WebTransport session.
-class QUICHE_EXPORT Stream : public quiche::ReadStream,
-                             public quiche::WriteStream,
-                             public quiche::TerminableStream {
+//
+// This interface is designed around the idea that a network stream stores all
+// of the received data in a sequence of contiguous buffers. Because of that,
+// there are two ways to read from a stream:
+//   - Read() will copy data into a user-provided buffer, reassembling it if it
+//     is split across multiple buffers internally.
+//   - PeekNextReadableRegion()/SkipBytes() let the caller access the underlying
+//     buffers directly, potentially avoiding the copying at the cost of the
+//     caller having to deal with discontinuities.
+//
+// The writes into a WebTransport stream are all-or-nothing.  A Stream object
+// has to either accept all data written into it by returning absl::OkStatus, or
+// ask the caller to try again once via OnCanWrite() by returning
+// absl::UnavailableError.
+class QUICHE_EXPORT Stream {
  public:
+  struct QUICHE_EXPORT ReadResult {
+    // Number of bytes actually read.
+    size_t bytes_read = 0;
+    // Whether the FIN has been received; if true, no further data will arrive
+    // on the stream, and the stream object can be soon potentially garbage
+    // collected.
+    bool fin = false;
+  };
+
+  struct PeekResult {
+    // The next available chunk in the sequencer buffer.
+    absl::string_view peeked_data;
+    // True if all of the data up to the FIN has been read.
+    bool fin_next = false;
+    // True if all of the data up to the FIN has been received (but not
+    // necessarily read).
+    bool all_data_received = false;
+
+    // Indicates that `SkipBytes()` will make progress if called.
+    bool has_data() const { return !peeked_data.empty() || fin_next; }
+  };
+
   virtual ~Stream() {}
 
+  // Reads at most `buffer.size()` bytes into `buffer`.
+  [[nodiscard]] virtual ReadResult Read(absl::Span<char> buffer) = 0;
+
+  // Reads all available data and appends it to the end of `output`.
+  [[nodiscard]] virtual ReadResult Read(std::string* output) = 0;
+
+  // Indicates the total number of bytes that can be read from the stream.
+  virtual size_t ReadableBytes() const = 0;
+
+  // Returns a contiguous buffer to read (or an empty buffer, if there is no
+  // data to read). See `ProcessAllReadableRegions` below for an example of how
+  // to use this method while handling FIN correctly.
+  virtual PeekResult PeekNextReadableRegion() const = 0;
+
+  // Equivalent to reading `bytes`, but does not perform any copying. `bytes`
+  // must be less than or equal to `ReadableBytes()`. The return value indicates
+  // if the FIN has been reached. `SkipBytes(0)` can be used to consume the FIN
+  // if it's the only thing remaining on the stream.
+  [[nodiscard]] virtual bool SkipBytes(size_t bytes) = 0;
+
+  // Writes `data` into the stream.  If the write succeeds, the ownership is
+  // transferred to the stream; if it does not, the behavior is undefined -- the
+  // users of this API should check `CanWrite()` before calling `Writev()`.
+  virtual absl::Status Writev(absl::Span<quiche::QuicheMemSlice> data,
+                              const StreamWriteOptions& options) = 0;
+
+  // Indicates whether it is possible to write into stream right now.
+  virtual bool CanWrite() const = 0;
+
+  // Legacy convenience method for writing a single string_view.  New users
+  // should use quiche::SendFinOnStream instead, since this method does not
+  // return useful failure information.
+  [[nodiscard]] bool SendFin() {
+    StreamWriteOptions options;
+    options.set_send_fin(true);
+    return Writev(absl::Span<quiche::QuicheMemSlice>(), options).ok();
+  }
+
+  // Legacy convenience method for writing a single string_view.  New users
+  // should use quiche::WriteIntoStream instead, since this method does not
+  // return useful failure information.
+  [[nodiscard]] bool Write(absl::string_view data) {
+    quiche::QuicheMemSlice slice = quiche::QuicheMemSlice::Copy(data);
+    return Writev(absl::MakeSpan(&slice, 1), kDefaultStreamWriteOptions).ok();
+  }
+
   // An ID that is unique within the session.  Those are not exposed to the user
   // via the web API, but can be used internally for bookkeeping and
   // diagnostics.