Notify WebTransport stream visitors about the change in the stream state.

This adds three new events that are in various form are required to implement WebTransport, OnResetStreamReceived, OnStopSendingReceived and OnAllOutgoingDataAcknowledged.

PiperOrigin-RevId: 398817385
diff --git a/quic/core/http/end_to_end_test.cc b/quic/core/http/end_to_end_test.cc
index 3e94bf9..f8446f3 100644
--- a/quic/core/http/end_to_end_test.cc
+++ b/quic/core/http/end_to_end_test.cc
@@ -727,13 +727,16 @@
   }
 
   std::string ReadDataFromWebTransportStreamUntilFin(
-      WebTransportStream* stream) {
+      WebTransportStream* stream, MockStreamVisitor* visitor = nullptr) {
     std::string buffer;
     while (true) {
       bool can_read = false;
-      auto visitor = std::make_unique<MockStreamVisitor>();
+      if (visitor == nullptr) {
+        auto visitor_owned = std::make_unique<MockStreamVisitor>();
+        visitor = visitor_owned.get();
+        stream->SetVisitor(std::move(visitor_owned));
+      }
       EXPECT_CALL(*visitor, OnCanRead()).WillOnce(Assign(&can_read, true));
-      stream->SetVisitor(std::move(visitor));
       client_->WaitUntil(5000 /*ms*/, [&can_read]() { return can_read; });
       if (!can_read) {
         ADD_FAILURE() << "Waiting for readable data on stream "
@@ -6096,6 +6099,13 @@
   WebTransportStream* outgoing_stream =
       session->OpenOutgoingUnidirectionalStream();
   ASSERT_TRUE(outgoing_stream != nullptr);
+
+  auto stream_visitor = std::make_unique<NiceMock<MockStreamVisitor>>();
+  bool data_acknowledged = false;
+  EXPECT_CALL(*stream_visitor, OnWriteSideInDataRecvdState())
+      .WillOnce(Assign(&data_acknowledged, true));
+  outgoing_stream->SetVisitor(std::move(stream_visitor));
+
   EXPECT_TRUE(outgoing_stream->Write("test"));
   EXPECT_TRUE(outgoing_stream->SendFin());
 
@@ -6111,6 +6121,10 @@
   WebTransportStream::ReadResult result = received_stream->Read(&received_data);
   EXPECT_EQ(received_data, "test");
   EXPECT_TRUE(result.fin);
+
+  client_->WaitUntil(2000,
+                     [&data_acknowledged]() { return data_acknowledged; });
+  EXPECT_TRUE(data_acknowledged);
 }
 
 TEST_P(EndToEndTest, WebTransportSessionUnidirectionalStreamSentEarly) {
@@ -6161,11 +6175,24 @@
 
   WebTransportStream* stream = session->OpenOutgoingBidirectionalStream();
   ASSERT_TRUE(stream != nullptr);
+
+  auto stream_visitor_owned = std::make_unique<NiceMock<MockStreamVisitor>>();
+  MockStreamVisitor* stream_visitor = stream_visitor_owned.get();
+  bool data_acknowledged = false;
+  EXPECT_CALL(*stream_visitor, OnWriteSideInDataRecvdState())
+      .WillOnce(Assign(&data_acknowledged, true));
+  stream->SetVisitor(std::move(stream_visitor_owned));
+
   EXPECT_TRUE(stream->Write("test"));
   EXPECT_TRUE(stream->SendFin());
 
-  std::string received_data = ReadDataFromWebTransportStreamUntilFin(stream);
+  std::string received_data =
+      ReadDataFromWebTransportStreamUntilFin(stream, stream_visitor);
   EXPECT_EQ(received_data, "test");
+
+  client_->WaitUntil(2000,
+                     [&data_acknowledged]() { return data_acknowledged; });
+  EXPECT_TRUE(data_acknowledged);
 }
 
 TEST_P(EndToEndTest, WebTransportSessionBidirectionalStreamWithBuffering) {
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index 365ad44..c33e763 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -24,6 +24,7 @@
 #include "quic/core/quic_utils.h"
 #include "quic/core/quic_versions.h"
 #include "quic/core/quic_write_blocked_list.h"
+#include "quic/core/web_transport_interface.h"
 #include "quic/platform/api/quic_bug_tracker.h"
 #include "quic/platform/api/quic_flag_utils.h"
 #include "quic/platform/api/quic_flags.h"
@@ -687,6 +688,12 @@
 
 void QuicSpdyStream::OnStreamReset(const QuicRstStreamFrame& frame) {
   if (web_transport_data_ != nullptr) {
+    WebTransportStreamVisitor* webtransport_visitor =
+        web_transport_data_->adapter.visitor();
+    if (webtransport_visitor != nullptr) {
+      webtransport_visitor->OnResetStreamReceived(
+          Http3ErrorToWebTransportOrDefault(frame.ietf_error_code));
+    }
     QuicStream::OnStreamReset(frame);
     return;
   }
@@ -743,6 +750,29 @@
   QuicStream::ResetWithError(error);
 }
 
+bool QuicSpdyStream::OnStopSending(QuicResetStreamError error) {
+  if (web_transport_data_ != nullptr) {
+    WebTransportStreamVisitor* visitor = web_transport_data_->adapter.visitor();
+    if (visitor != nullptr) {
+      visitor->OnStopSendingReceived(
+          Http3ErrorToWebTransportOrDefault(error.ietf_application_code()));
+    }
+  }
+
+  return QuicStream::OnStopSending(error);
+}
+
+void QuicSpdyStream::OnWriteSideInDataRecvdState() {
+  if (web_transport_data_ != nullptr) {
+    WebTransportStreamVisitor* visitor = web_transport_data_->adapter.visitor();
+    if (visitor != nullptr) {
+      visitor->OnWriteSideInDataRecvdState();
+    }
+  }
+
+  QuicStream::OnWriteSideInDataRecvdState();
+}
+
 void QuicSpdyStream::OnDataAvailable() {
   if (!VersionUsesHttp3(transport_version())) {
     // Sequencer must be blocked until headers are consumed.
diff --git a/quic/core/http/quic_spdy_stream.h b/quic/core/http/quic_spdy_stream.h
index 50befb0..65b3e5e 100644
--- a/quic/core/http/quic_spdy_stream.h
+++ b/quic/core/http/quic_spdy_stream.h
@@ -111,8 +111,8 @@
   // Override the base class to not discard response when receiving
   // QUIC_STREAM_NO_ERROR.
   void OnStreamReset(const QuicRstStreamFrame& frame) override;
-
   void ResetWithError(QuicResetStreamError error) override;
+  bool OnStopSending(QuicResetStreamError error) override;
 
   // Called by the sequencer when new data is available. Decodes the data and
   // calls OnBodyAvailable() to pass to the upper layer.
@@ -365,6 +365,8 @@
     ack_listener_ = std::move(ack_listener);
   }
 
+  void OnWriteSideInDataRecvdState() override;
+
  private:
   friend class test::QuicSpdyStreamPeer;
   friend class test::QuicStreamPeer;
diff --git a/quic/core/http/web_transport_http3.cc b/quic/core/http/web_transport_http3.cc
index 48dccfc..b9b511c 100644
--- a/quic/core/http/web_transport_http3.cc
+++ b/quic/core/http/web_transport_http3.cc
@@ -354,9 +354,34 @@
   session->OnStreamClosed(id());
 }
 
+void WebTransportHttp3UnidirectionalStream::OnStreamReset(
+    const QuicRstStreamFrame& frame) {
+  if (adapter_.visitor() != nullptr) {
+    adapter_.visitor()->OnResetStreamReceived(
+        Http3ErrorToWebTransportOrDefault(frame.ietf_error_code));
+  }
+  QuicStream::OnStreamReset(frame);
+}
+bool WebTransportHttp3UnidirectionalStream::OnStopSending(
+    QuicResetStreamError error) {
+  if (adapter_.visitor() != nullptr) {
+    adapter_.visitor()->OnStopSendingReceived(
+        Http3ErrorToWebTransportOrDefault(error.ietf_application_code()));
+  }
+  return QuicStream::OnStopSending(error);
+}
+void WebTransportHttp3UnidirectionalStream::OnWriteSideInDataRecvdState() {
+  if (adapter_.visitor() != nullptr) {
+    adapter_.visitor()->OnWriteSideInDataRecvdState();
+  }
+
+  QuicStream::OnWriteSideInDataRecvdState();
+}
+
 namespace {
 constexpr uint64_t kWebTransportMappedErrorCodeFirst = 0x52e4a40fa8db;
 constexpr uint64_t kWebTransportMappedErrorCodeLast = 0x52e4a40fa9e2;
+constexpr WebTransportStreamError kDefaultWebTransportError = 0;
 }  // namespace
 
 absl::optional<WebTransportStreamError> Http3ErrorToWebTransport(
@@ -377,6 +402,13 @@
   return result;
 }
 
+WebTransportStreamError Http3ErrorToWebTransportOrDefault(
+    uint64_t http3_error_code) {
+  absl::optional<WebTransportStreamError> result =
+      Http3ErrorToWebTransport(http3_error_code);
+  return result.has_value() ? *result : kDefaultWebTransportError;
+}
+
 uint64_t WebTransportErrorToHttp3(
     WebTransportStreamError webtransport_error_code) {
   return kWebTransportMappedErrorCodeFirst + webtransport_error_code +
diff --git a/quic/core/http/web_transport_http3.h b/quic/core/http/web_transport_http3.h
index 7401f39..ea1897c 100644
--- a/quic/core/http/web_transport_http3.h
+++ b/quic/core/http/web_transport_http3.h
@@ -11,6 +11,7 @@
 #include "absl/container/flat_hash_set.h"
 #include "absl/types/optional.h"
 #include "quic/core/http/quic_spdy_session.h"
+#include "quic/core/quic_error_codes.h"
 #include "quic/core/quic_stream.h"
 #include "quic/core/quic_types.h"
 #include "quic/core/web_transport_interface.h"
@@ -124,6 +125,9 @@
   void OnDataAvailable() override;
   void OnCanWriteNewData() override;
   void OnClose() override;
+  void OnStreamReset(const QuicRstStreamFrame& frame) override;
+  bool OnStopSending(QuicResetStreamError error) override;
+  void OnWriteSideInDataRecvdState() override;
 
   WebTransportStream* interface() { return &adapter_; }
   void SetUnblocked() { sequencer()->SetUnblocked(); }
@@ -144,6 +148,11 @@
 QUIC_EXPORT_PRIVATE absl::optional<WebTransportStreamError>
 Http3ErrorToWebTransport(uint64_t http3_error_code);
 
+// Same as above, but returns default error value (zero) when none could be
+// mapped.
+QUIC_EXPORT_PRIVATE WebTransportStreamError
+Http3ErrorToWebTransportOrDefault(uint64_t http3_error_code);
+
 // Remaps WebTransport error code into an HTTP/3 error code.
 QUIC_EXPORT_PRIVATE uint64_t
 WebTransportErrorToHttp3(WebTransportStreamError webtransport_error_code);
diff --git a/quic/core/quic_stream.cc b/quic/core/quic_stream.cc
index 4c83fe8..08c0358 100644
--- a/quic/core/quic_stream.cc
+++ b/quic/core/quic_stream.cc
@@ -334,6 +334,7 @@
       connection_error_(QUIC_NO_ERROR),
       read_side_closed_(false),
       write_side_closed_(false),
+      write_side_data_recvd_state_notified_(false),
       fin_buffered_(false),
       fin_sent_(false),
       fin_outstanding_(false),
@@ -1075,6 +1076,11 @@
     fin_outstanding_ = false;
     fin_lost_ = false;
   }
+  if (!IsWaitingForAcks() && write_side_closed_ &&
+      !write_side_data_recvd_state_notified_) {
+    OnWriteSideInDataRecvdState();
+    write_side_data_recvd_state_notified_ = true;
+  }
   if (!IsWaitingForAcks() && read_side_closed_ && write_side_closed_) {
     session_->MaybeCloseZombieStream(id_);
   }
diff --git a/quic/core/quic_stream.h b/quic/core/quic_stream.h
index 64e79bd..fcc7fe5 100644
--- a/quic/core/quic_stream.h
+++ b/quic/core/quic_stream.h
@@ -461,6 +461,11 @@
 
   QuicStreamSendBuffer& send_buffer() { return send_buffer_; }
 
+  // Called when the write side of the stream is closed, and all of the outgoing
+  // data has been acknowledged.  This corresponds to the "Data Recvd" state of
+  // RFC 9000.
+  virtual void OnWriteSideInDataRecvdState() {}
+
   // Return the current flow control send window in bytes.
   absl::optional<QuicByteCount> GetSendWindow() const;
   absl::optional<QuicByteCount> GetReceiveWindow() const;
@@ -518,6 +523,9 @@
   // True if the write side is closed, and further writes should fail.
   bool write_side_closed_;
 
+  // True if OnWriteSideInDataRecvdState() has already been called.
+  bool write_side_data_recvd_state_notified_;
+
   // True if the subclass has written a FIN with WriteOrBufferData, but it was
   // buffered in queued_data_ rather than being sent to the session.
   bool fin_buffered_;
diff --git a/quic/core/quic_stream_test.cc b/quic/core/quic_stream_test.cc
index f412870..5d3b84f 100644
--- a/quic/core/quic_stream_test.cc
+++ b/quic/core/quic_stream_test.cc
@@ -65,6 +65,8 @@
 
   MOCK_METHOD(void, OnCanWriteNewData, (), (override));
 
+  MOCK_METHOD(void, OnWriteSideInDataRecvdState, (), (override));
+
   using QuicStream::CanWriteNewData;
   using QuicStream::CanWriteNewDataAfterData;
   using QuicStream::CloseWriteSide;
@@ -882,6 +884,7 @@
   EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
 
   // FIN is acked.
+  EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState());
   EXPECT_TRUE(stream_->OnStreamFrameAcked(18, 0, true, QuicTime::Delta::Zero(),
                                           QuicTime::Zero(),
                                           &newly_acked_length));
@@ -925,6 +928,7 @@
   // FIN is not acked yet.
   EXPECT_TRUE(stream_->IsWaitingForAcks());
   EXPECT_TRUE(session_->HasUnackedStreamData());
+  EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState());
   EXPECT_TRUE(stream_->OnStreamFrameAcked(27, 0, true, QuicTime::Delta::Zero(),
                                           QuicTime::Zero(),
                                           &newly_acked_length));
@@ -1330,6 +1334,7 @@
   EXPECT_TRUE(session_->HasUnackedStreamData());
 
   // Ack Fin.
+  EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()).Times(1);
   EXPECT_TRUE(stream_->OnStreamFrameAcked(27, 0, true, QuicTime::Delta::Zero(),
                                           QuicTime::Zero(),
                                           &newly_acked_length));
diff --git a/quic/core/web_transport_interface.h b/quic/core/web_transport_interface.h
index dbfce33..8a1de3e 100644
--- a/quic/core/web_transport_interface.h
+++ b/quic/core/web_transport_interface.h
@@ -29,6 +29,14 @@
   virtual void OnCanRead() = 0;
   // Called whenever the stream is not write-blocked and can accept new data.
   virtual void OnCanWrite() = 0;
+
+  // Called when RESET_STREAM is received for the stream.
+  virtual void OnResetStreamReceived(WebTransportStreamError error) = 0;
+  // Called when STOP_SENDING is received for the stream.
+  virtual void OnStopSendingReceived(WebTransportStreamError error) = 0;
+  // Called when the write side of the stream is closed and all of the data sent
+  // has been acknowledged ("Data Recvd" state of RFC 9000).
+  virtual void OnWriteSideInDataRecvdState() = 0;
 };
 
 // A stream (either bidirectional or unidirectional) that is contained within a
diff --git a/quic/test_tools/quic_transport_test_tools.h b/quic/test_tools/quic_transport_test_tools.h
index afa8b54..5babfd2 100644
--- a/quic/test_tools/quic_transport_test_tools.h
+++ b/quic/test_tools/quic_transport_test_tools.h
@@ -34,6 +34,12 @@
  public:
   MOCK_METHOD(void, OnCanRead, (), (override));
   MOCK_METHOD(void, OnCanWrite, (), (override));
+
+  MOCK_METHOD(void, OnResetStreamReceived, (WebTransportStreamError error),
+              (override));
+  MOCK_METHOD(void, OnStopSendingReceived, (WebTransportStreamError error),
+              (override));
+  MOCK_METHOD(void, OnWriteSideInDataRecvdState, (), (override));
 };
 
 }  // namespace test
diff --git a/quic/tools/web_transport_test_visitors.h b/quic/tools/web_transport_test_visitors.h
index 9d7a5fc..3bc6471 100644
--- a/quic/tools/web_transport_test_visitors.h
+++ b/quic/tools/web_transport_test_visitors.h
@@ -29,6 +29,10 @@
 
   void OnCanWrite() override {}
 
+  void OnResetStreamReceived(WebTransportStreamError /*error*/) override {}
+  void OnStopSendingReceived(WebTransportStreamError /*error*/) override {}
+  void OnWriteSideInDataRecvdState() override {}
+
  private:
   WebTransportStream* stream_;
 };
@@ -69,6 +73,10 @@
     }
   }
 
+  void OnResetStreamReceived(WebTransportStreamError /*error*/) override {}
+  void OnStopSendingReceived(WebTransportStreamError /*error*/) override {}
+  void OnWriteSideInDataRecvdState() override {}
+
  private:
   WebTransportStream* stream_;
   std::string buffer_;
@@ -100,6 +108,10 @@
 
   void OnCanWrite() override { QUIC_NOTREACHED(); }
 
+  void OnResetStreamReceived(WebTransportStreamError /*error*/) override {}
+  void OnStopSendingReceived(WebTransportStreamError /*error*/) override {}
+  void OnWriteSideInDataRecvdState() override {}
+
  private:
   WebTransportStream* stream_;
   std::string buffer_;
@@ -129,6 +141,10 @@
     QUICHE_DCHECK(fin_sent);
   }
 
+  void OnResetStreamReceived(WebTransportStreamError /*error*/) override {}
+  void OnStopSendingReceived(WebTransportStreamError /*error*/) override {}
+  void OnWriteSideInDataRecvdState() override {}
+
  private:
   WebTransportStream* stream_;
   std::string data_;