Refactor the read part of WebTransport Stream API into ReadStream.

The following changes are included:
- Read()/ReadableBytes() are now a part of their own interface.
- Introduce PeekNextAvaliableRegion()/MarkConsumed() as a zero-copy alternative to read.
- Introduce a utility method for to use the peek API while taking care of reading FIN correctly.
- Make webtransport::Stream a direct subclass of TerminableStream to avoid diamond inheritance problem.

The long-term plan here is to abstract multiple kinds of streams for the goals of public APIs, including incoming QUIC streams, incoming WebTransport streams, and incoming HTTP request/response bodies.

PiperOrigin-RevId: 568695001
diff --git a/quiche/common/quiche_stream.h b/quiche/common/quiche_stream.h
index 072b3ca..9de876b 100644
--- a/quiche/common/quiche_stream.h
+++ b/quiche/common/quiche_stream.h
@@ -2,15 +2,19 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-// General-purpose abstractions for a write stream.
+// General-purpose abstractions for read/write streams.
 
 #ifndef QUICHE_COMMON_QUICHE_STREAM_H_
 #define QUICHE_COMMON_QUICHE_STREAM_H_
 
+#include <cstddef>
+#include <string>
+
 #include "absl/status/status.h"
 #include "absl/strings/string_view.h"
 #include "absl/types/span.h"
 #include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/quiche_callbacks.h"
 
 namespace quiche {
 
@@ -26,6 +30,92 @@
   virtual void AbruptlyTerminate(absl::Status error) = 0;
 };
 
+// A general-purpose visitor API that gets notifications for ReadStream-related
+// events.
+class QUICHE_EXPORT ReadStreamVisitor {
+ public:
+  virtual ~ReadStreamVisitor() = default;
+
+  // Called whenever the stream has new data available to read. Unless otherwise
+  // specified, QUICHE stream reads are level-triggered, which means that the
+  // callback will be called repeatedly as long as there is still data in the
+  // buffer.
+  virtual void OnCanRead() = 0;
+};
+
+// General purpose abstraction for a stream of data that can be read from the
+// network. The class is designed around the idea that a network stream stores
+// all of the received data in a sequence of contiguous buffers. Because of
+// that, there are two ways to read from a stream:
+//   - Read() will copy data into a user-provided buffer, reassembling it if it
+//     is split across multiple buffers internally.
+//   - PeekNextReadableRegion()/SkipBytes() let the caller access the underlying
+//     buffers directly, potentially avoiding the copying at the cost of the
+//     caller having to deal with discontinuities.
+class QUICHE_EXPORT ReadStream {
+ public:
+  struct QUICHE_EXPORT ReadResult {
+    // Number of bytes actually read.
+    size_t bytes_read = 0;
+    // Whether the FIN has been received; if true, no further data will arrive
+    // on the stream, and the stream object can be soon potentially garbage
+    // collected.
+    bool fin = false;
+  };
+
+  struct PeekResult {
+    // The next available chunk in the sequencer buffer.
+    absl::string_view peeked_data;
+    // True if all of the data up to the FIN has been read.
+    bool fin_next = false;
+    // True if all of the data up to the FIN has been received (but not
+    // necessarily read).
+    bool all_data_received = false;
+
+    // Indicates that `SkipBytes()` will make progress if called.
+    bool has_data() const { return !peeked_data.empty() || fin_next; }
+  };
+
+  virtual ~ReadStream() = default;
+
+  // Reads at most `buffer.size()` bytes into `buffer`.
+  [[nodiscard]] virtual ReadResult Read(absl::Span<char> buffer) = 0;
+
+  // Reads all available data and appends it to the end of `output`.
+  [[nodiscard]] virtual ReadResult Read(std::string* output) = 0;
+
+  // Indicates the total number of bytes that can be read from the stream.
+  virtual size_t ReadableBytes() const = 0;
+
+  // Returns a contiguous buffer to read (or an empty buffer, if there is no
+  // data to read). See `ProcessAllReadableRegions` below for an example of how
+  // to use this method while handling FIN correctly.
+  virtual PeekResult PeekNextReadableRegion() const = 0;
+
+  // Equivalent to reading `bytes`, but does not perform any copying. `bytes`
+  // must be less than or equal to `ReadableBytes()`. The return value indicates
+  // if the FIN has been reached. `SkipBytes(0)` can be used to consume the FIN
+  // if it's the only thing remaining on the stream.
+  [[nodiscard]] virtual bool SkipBytes(size_t bytes) = 0;
+};
+
+// Calls `callback` for every contiguous chunk available inside the stream.
+// Returns true if the FIN has been reached.
+inline bool ProcessAllReadableRegions(
+    ReadStream& stream, UnretainedCallback<void(absl::string_view)> callback) {
+  for (;;) {
+    ReadStream::PeekResult peek_result = stream.PeekNextReadableRegion();
+    if (!peek_result.has_data()) {
+      return false;
+    }
+    callback(peek_result.peeked_data);
+    bool fin = stream.SkipBytes(peek_result.peeked_data.size());
+    if (fin) {
+      return true;
+    }
+  }
+}
+
 // A general-purpose visitor API that gets notifications for WriteStream-related
 // events.
 class QUICHE_EXPORT WriteStreamVisitor {
@@ -59,7 +149,7 @@
 // to either accept all data written into it by returning absl::OkStatus, or ask
 // the caller to try again once via OnCanWrite() by returning
 // absl::UnavailableError.
-class QUICHE_EXPORT WriteStream : public TerminableStream {
+class QUICHE_EXPORT WriteStream {
  public:
   virtual ~WriteStream() {}
 
diff --git a/quiche/quic/core/http/web_transport_stream_adapter.cc b/quiche/quic/core/http/web_transport_stream_adapter.cc
index f484249..18928f3 100644
--- a/quiche/quic/core/http/web_transport_stream_adapter.cc
+++ b/quiche/quic/core/http/web_transport_stream_adapter.cc
@@ -4,13 +4,26 @@
 
 #include "quiche/quic/core/http/web_transport_stream_adapter.h"
 
+#include <cstddef>
+#include <string>
+#include <vector>
+
 #include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
 #include "quiche/quic/core/http/web_transport_http3.h"
 #include "quiche/quic/core/quic_error_codes.h"
+#include "quiche/quic/core/quic_session.h"
+#include "quiche/quic/core/quic_stream.h"
+#include "quiche/quic/core/quic_stream_sequencer.h"
 #include "quiche/quic/core/quic_types.h"
-#include "quiche/common/platform/api/quiche_mem_slice.h"
-#include "quiche/common/quiche_buffer_allocator.h"
+#include "quiche/quic/core/web_transport_interface.h"
+#include "quiche/quic/platform/api/quic_bug_tracker.h"
+#include "quiche/quic/platform/api/quic_flags.h"
+#include "quiche/quic/platform/api/quic_logging.h"
 #include "quiche/common/quiche_mem_slice_storage.h"
+#include "quiche/common/quiche_stream.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace quic {
@@ -120,6 +133,28 @@
   return sequencer_->ReadableBytes();
 }
 
+quiche::ReadStream::PeekResult
+WebTransportStreamAdapter::PeekNextReadableRegion() const {
+  iovec iov;
+  PeekResult result;
+  if (sequencer_->GetReadableRegion(&iov)) {
+    result.peeked_data =
+        absl::string_view(static_cast<const char*>(iov.iov_base), iov.iov_len);
+  }
+  result.fin_next = sequencer_->IsClosed();
+  result.all_data_received = sequencer_->IsAllDataAvailable();
+  return result;
+}
+
+bool WebTransportStreamAdapter::SkipBytes(size_t bytes) {
+  sequencer_->MarkConsumed(bytes);
+  if (!fin_read_ && sequencer_->IsClosed()) {
+    fin_read_ = true;
+    stream_->OnFinRead();
+  }
+  return sequencer_->IsClosed();
+}
+
 void WebTransportStreamAdapter::OnDataAvailable() {
   if (visitor_ == nullptr) {
     return;
diff --git a/quiche/quic/core/http/web_transport_stream_adapter.h b/quiche/quic/core/http/web_transport_stream_adapter.h
index 24c250a..004a62f 100644
--- a/quiche/quic/core/http/web_transport_stream_adapter.h
+++ b/quiche/quic/core/http/web_transport_stream_adapter.h
@@ -5,18 +5,30 @@
 #ifndef QUICHE_QUIC_CORE_HTTP_WEB_TRANSPORT_STREAM_ADAPTER_H_
 #define QUICHE_QUIC_CORE_HTTP_WEB_TRANSPORT_STREAM_ADAPTER_H_
 
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "absl/base/attributes.h"
+#include "absl/status/status.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "quiche/quic/core/quic_error_codes.h"
 #include "quiche/quic/core/quic_session.h"
 #include "quiche/quic/core/quic_stream.h"
 #include "quiche/quic/core/quic_stream_sequencer.h"
 #include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/core/web_transport_interface.h"
+#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/quiche_stream.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace quic {
 
 // Converts WebTransportStream API calls into QuicStream API calls.  The users
 // of this class can either subclass it, or wrap around it.
-class QUICHE_EXPORT WebTransportStreamAdapter : public WebTransportStream {
+class QUICHE_EXPORT WebTransportStreamAdapter : public webtransport::Stream {
  public:
   WebTransportStreamAdapter(QuicSession* session, QuicStream* stream,
                             QuicStreamSequencer* sequencer);
@@ -29,6 +41,8 @@
   bool CanWrite() const override;
   void AbruptlyTerminate(absl::Status error) override;
   size_t ReadableBytes() const override;
+  PeekResult PeekNextReadableRegion() const override;
+  bool SkipBytes(size_t bytes) override;
   void SetVisitor(std::unique_ptr<WebTransportStreamVisitor> visitor) override {
     visitor_ = std::move(visitor);
   }
diff --git a/quiche/quic/core/quic_generic_session_test.cc b/quiche/quic/core/quic_generic_session_test.cc
index 16febf0..ce5d184 100644
--- a/quiche/quic/core/quic_generic_session_test.cc
+++ b/quiche/quic/core/quic_generic_session_test.cc
@@ -33,6 +33,8 @@
 #include "quiche/quic/test_tools/simulator/test_harness.h"
 #include "quiche/quic/test_tools/web_transport_test_tools.h"
 #include "quiche/quic/tools/web_transport_test_visitors.h"
+#include "quiche/common/quiche_stream.h"
+#include "quiche/common/test_tools/quiche_test_utils.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace quic::test {
@@ -263,6 +265,76 @@
   EXPECT_EQ(buffer, "Stream One");
 }
 
+TEST_F(QuicGenericSessionTest, EchoStreamsUsingPeekApi) {
+  CreateDefaultEndpoints(kEchoServer);
+  WireUpEndpoints();
+  RunHandshake();
+
+  // Send two streams, a bidirectional and a unidirectional one, but only send
+  // FIN on the second one.
+  webtransport::Stream* stream1 =
+      client_->session()->OpenOutgoingBidirectionalStream();
+  EXPECT_TRUE(stream1->Write("Stream One"));
+  webtransport::Stream* stream2 =
+      client_->session()->OpenOutgoingUnidirectionalStream();
+  EXPECT_TRUE(stream2->Write("Stream Two"));
+  EXPECT_TRUE(stream2->SendFin());
+
+  // Wait until the unidirectional stream is received back.
+  bool stream_received_unidi = false;
+  EXPECT_CALL(*client_->visitor(), OnIncomingUnidirectionalStreamAvailable())
+      .WillOnce(Assign(&stream_received_unidi, true));
+  ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(
+      [&]() { return stream_received_unidi; }));
+
+  // Receive the unidirectional echo reply.
+  webtransport::Stream* reply =
+      client_->session()->AcceptIncomingUnidirectionalStream();
+  ASSERT_TRUE(reply != nullptr);
+  std::string buffer;
+  quiche::ReadStream::PeekResult peek_result = reply->PeekNextReadableRegion();
+  EXPECT_EQ(peek_result.peeked_data, "Stream Two");
+  EXPECT_EQ(peek_result.fin_next, false);
+  EXPECT_EQ(peek_result.all_data_received, true);
+  bool fin_received =
+      quiche::ProcessAllReadableRegions(*reply, [&](absl::string_view chunk) {
+        buffer.append(chunk.data(), chunk.size());
+        return true;
+      });
+  EXPECT_TRUE(fin_received);
+  EXPECT_EQ(buffer, "Stream Two");
+
+  // Receive the bidirectional stream reply without a FIN.
+  ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(
+      [&]() { return stream1->PeekNextReadableRegion().has_data(); }));
+  peek_result = stream1->PeekNextReadableRegion();
+  EXPECT_EQ(peek_result.peeked_data, "Stream One");
+  EXPECT_EQ(peek_result.fin_next, false);
+  EXPECT_EQ(peek_result.all_data_received, false);
+  fin_received = stream1->SkipBytes(strlen("Stream One"));
+  EXPECT_FALSE(fin_received);
+  peek_result = stream1->PeekNextReadableRegion();
+  EXPECT_EQ(peek_result.peeked_data, "");
+  EXPECT_EQ(peek_result.fin_next, false);
+  EXPECT_EQ(peek_result.all_data_received, false);
+
+  // Send FIN on the first stream, and expect to receive it back.
+  EXPECT_TRUE(stream1->SendFin());
+  ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(
+      [&]() { return stream1->PeekNextReadableRegion().all_data_received; }));
+  peek_result = stream1->PeekNextReadableRegion();
+  EXPECT_EQ(peek_result.peeked_data, "");
+  EXPECT_EQ(peek_result.fin_next, true);
+  EXPECT_EQ(peek_result.all_data_received, true);
+
+  // Read FIN and expect the stream to get garbage collected.
+  webtransport::StreamId id = stream1->GetStreamId();
+  EXPECT_TRUE(client_->session()->GetStreamById(id) != nullptr);
+  fin_received = stream1->SkipBytes(0);
+  EXPECT_TRUE(fin_received);
+  EXPECT_TRUE(client_->session()->GetStreamById(id) == nullptr);
+}
+
 TEST_F(QuicGenericSessionTest, EchoDatagram) {
   CreateDefaultEndpoints(kEchoServer);
   WireUpEndpoints();
diff --git a/quiche/web_transport/test_tools/mock_web_transport.h b/quiche/web_transport/test_tools/mock_web_transport.h
index 8855c9c..9ebf239 100644
--- a/quiche/web_transport/test_tools/mock_web_transport.h
+++ b/quiche/web_transport/test_tools/mock_web_transport.h
@@ -42,6 +42,8 @@
               (absl::Span<const absl::string_view> data,
                const quiche::StreamWriteOptions& options),
               (override));
+  MOCK_METHOD(PeekResult, PeekNextReadableRegion, (), (const, override));
+  MOCK_METHOD(bool, SkipBytes, (size_t bytes), (override));
   MOCK_METHOD(bool, CanWrite, (), (const, override));
   MOCK_METHOD(void, AbruptlyTerminate, (absl::Status), (override));
   MOCK_METHOD(size_t, ReadableBytes, (), (const, override));
diff --git a/quiche/web_transport/web_transport.h b/quiche/web_transport/web_transport.h
index 741be73..be08f89 100644
--- a/quiche/web_transport/web_transport.h
+++ b/quiche/web_transport/web_transport.h
@@ -93,13 +93,11 @@
 // events related to a WebTransport stream.  The visitor object is owned by the
 // stream itself, meaning that if the stream is ever fully closed, the visitor
 // will be garbage-collected.
-class QUICHE_EXPORT StreamVisitor : public quiche::WriteStreamVisitor {
+class QUICHE_EXPORT StreamVisitor : public quiche::ReadStreamVisitor,
+                                    public quiche::WriteStreamVisitor {
  public:
   virtual ~StreamVisitor() {}
 
-  // Called whenever the stream has readable data available.
-  virtual void OnCanRead() = 0;
-
   // Called when RESET_STREAM is received for the stream.
   virtual void OnResetStreamReceived(StreamErrorCode error) = 0;
   // Called when STOP_SENDING is received for the stream.
@@ -112,27 +110,12 @@
 
 // A stream (either bidirectional or unidirectional) that is contained within a
 // WebTransport session.
-class QUICHE_EXPORT Stream : public quiche::WriteStream {
+class QUICHE_EXPORT Stream : public quiche::ReadStream,
+                             public quiche::WriteStream,
+                             public quiche::TerminableStream {
  public:
-  struct QUICHE_EXPORT ReadResult {
-    // Number of bytes actually read.
-    size_t bytes_read;
-    // Whether the FIN has been received; if true, no further data will arrive
-    // on the stream, and the stream object can be soon potentially garbage
-    // collected.
-    bool fin;
-  };
-
   virtual ~Stream() {}
 
-  // Reads at most |buffer.size()| bytes into |buffer|.
-  [[nodiscard]] virtual ReadResult Read(absl::Span<char> buffer) = 0;
-  // Reads all available data and appends it to the end of |output|.
-  [[nodiscard]] virtual ReadResult Read(std::string* output) = 0;
-
-  // Indicates the number of bytes that can be read from the stream.
-  virtual size_t ReadableBytes() const = 0;
-
   // An ID that is unique within the session.  Those are not exposed to the user
   // via the web API, but can be used internally for bookkeeping and
   // diagnostics.