Implement RESET_STREAM/STOP_SENDING in WebTransport.

This adds an API to QuicStream to send RESET_STREAM or STOP_SENDING individually, as that is required by the WebTransport API.

PiperOrigin-RevId: 400840270
diff --git a/quic/core/http/end_to_end_test.cc b/quic/core/http/end_to_end_test.cc
index eb1f66c..0964eff 100644
--- a/quic/core/http/end_to_end_test.cc
+++ b/quic/core/http/end_to_end_test.cc
@@ -80,7 +80,7 @@
 using ::testing::Assign;
 using ::testing::Invoke;
 using ::testing::NiceMock;
-using testing::NotNull;
+using ::testing::UnorderedElementsAreArray;
 
 namespace quic {
 namespace test {
@@ -728,7 +728,15 @@
 
   std::string ReadDataFromWebTransportStreamUntilFin(
       WebTransportStream* stream, MockStreamVisitor* visitor = nullptr) {
+    QuicStreamId id = stream->GetStreamId();
     std::string buffer;
+
+    // Try reading data if immediately available.
+    WebTransportStream::ReadResult result = stream->Read(&buffer);
+    if (result.fin) {
+      return buffer;
+    }
+
     while (true) {
       bool can_read = false;
       if (visitor == nullptr) {
@@ -739,12 +747,17 @@
       EXPECT_CALL(*visitor, OnCanRead()).WillOnce(Assign(&can_read, true));
       client_->WaitUntil(5000 /*ms*/, [&can_read]() { return can_read; });
       if (!can_read) {
-        ADD_FAILURE() << "Waiting for readable data on stream "
-                      << stream->GetStreamId() << " timed out";
+        ADD_FAILURE() << "Waiting for readable data on stream " << id
+                      << " timed out";
+        return buffer;
+      }
+      if (GetClientSession()->GetOrCreateSpdyDataStream(id) == nullptr) {
+        ADD_FAILURE() << "Stream " << id
+                      << " was deleted while waiting for incoming data";
         return buffer;
       }
 
-      WebTransportStream::ReadResult result = stream->Read(&buffer);
+      result = stream->Read(&buffer);
       if (result.fin) {
         return buffer;
       }
@@ -756,6 +769,19 @@
     }
   }
 
+  void ReadAllIncomingWebTransportUnidirectionalStreams(
+      WebTransportSession* session) {
+    while (true) {
+      WebTransportStream* received_stream =
+          session->AcceptIncomingUnidirectionalStream();
+      if (received_stream == nullptr) {
+        break;
+      }
+      received_webtransport_unidirectional_streams_.push_back(
+          ReadDataFromWebTransportStreamUntilFin(received_stream));
+    }
+  }
+
   void WaitForNewConnectionIds() {
     // Wait until a new server CID is available for another migration.
     const auto* client_connection = GetClientConnection();
@@ -796,6 +822,7 @@
   int override_client_connection_id_length_ = -1;
   uint8_t expected_server_connection_id_length_;
   bool enable_web_transport_ = false;
+  std::vector<std::string> received_webtransport_unidirectional_streams_;
 };
 
 // Run all end to end tests with all supported versions.
@@ -6355,6 +6382,62 @@
   EXPECT_TRUE(spdy_stream == nullptr);
 }
 
+TEST_P(EndToEndTest, WebTransportSessionStreamTermination) {
+  enable_web_transport_ = true;
+  ASSERT_TRUE(Initialize());
+
+  if (!version_.UsesHttp3()) {
+    return;
+  }
+
+  WebTransportHttp3* session =
+      CreateWebTransportSession("/resets", /*wait_for_server_response=*/true);
+  ASSERT_TRUE(session != nullptr);
+
+  NiceMock<MockClientVisitor>& visitor = SetupWebTransportVisitor(session);
+  EXPECT_CALL(visitor, OnIncomingUnidirectionalStreamAvailable())
+      .WillRepeatedly([this, session]() {
+        ReadAllIncomingWebTransportUnidirectionalStreams(session);
+      });
+
+  WebTransportStream* stream = session->OpenOutgoingBidirectionalStream();
+  QuicStreamId id1 = stream->GetStreamId();
+  ASSERT_TRUE(stream != nullptr);
+  EXPECT_TRUE(stream->Write("test"));
+  stream->ResetWithUserCode(42);
+
+  // This read fails if the stream is closed in both directions, since that
+  // results in stream object being deleted.
+  std::string received_data = ReadDataFromWebTransportStreamUntilFin(stream);
+  EXPECT_LE(received_data.size(), 4u);
+
+  stream = session->OpenOutgoingBidirectionalStream();
+  QuicStreamId id2 = stream->GetStreamId();
+  ASSERT_TRUE(stream != nullptr);
+  EXPECT_TRUE(stream->Write("test"));
+  stream->SendStopSending(24);
+
+  std::array<std::string, 2> expected_log = {
+      absl::StrCat("Received reset for stream ", id1, " with error code 42"),
+      absl::StrCat("Received stop sending for stream ", id2,
+                   " with error code 24"),
+  };
+  client_->WaitUntil(2000, [this, &expected_log]() {
+    return received_webtransport_unidirectional_streams_.size() >=
+           expected_log.size();
+  });
+  EXPECT_THAT(received_webtransport_unidirectional_streams_,
+              UnorderedElementsAreArray(expected_log));
+
+  // Since we closed the read side, cleanly closing the write side should result
+  // in the stream getting deleted.
+  ASSERT_TRUE(GetClientSession()->GetOrCreateSpdyDataStream(id2) != nullptr);
+  EXPECT_TRUE(stream->SendFin());
+  EXPECT_TRUE(client_->WaitUntil(2000, [this, id2]() {
+    return GetClientSession()->GetOrCreateSpdyDataStream(id2) == nullptr;
+  }));
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace quic
diff --git a/quic/core/http/quic_spdy_stream.h b/quic/core/http/quic_spdy_stream.h
index ef7cd59..6a9f71f 100644
--- a/quic/core/http/quic_spdy_stream.h
+++ b/quic/core/http/quic_spdy_stream.h
@@ -24,6 +24,7 @@
 #include "quic/core/http/http_encoder.h"
 #include "quic/core/http/quic_header_list.h"
 #include "quic/core/http/quic_spdy_stream_body_manager.h"
+#include "quic/core/http/web_transport_stream_adapter.h"
 #include "quic/core/qpack/qpack_decoded_headers_accumulator.h"
 #include "quic/core/quic_error_codes.h"
 #include "quic/core/quic_packets.h"
@@ -31,7 +32,6 @@
 #include "quic/core/quic_stream_sequencer.h"
 #include "quic/core/quic_types.h"
 #include "quic/core/web_transport_interface.h"
-#include "quic/core/web_transport_stream_adapter.h"
 #include "quic/platform/api/quic_export.h"
 #include "quic/platform/api/quic_flags.h"
 #include "quic/platform/api/quic_socket_address.h"
diff --git a/quic/core/http/web_transport_http3.h b/quic/core/http/web_transport_http3.h
index ddcdd54..cd715a1 100644
--- a/quic/core/http/web_transport_http3.h
+++ b/quic/core/http/web_transport_http3.h
@@ -11,11 +11,11 @@
 #include "absl/container/flat_hash_set.h"
 #include "absl/types/optional.h"
 #include "quic/core/http/quic_spdy_session.h"
+#include "quic/core/http/web_transport_stream_adapter.h"
 #include "quic/core/quic_error_codes.h"
 #include "quic/core/quic_stream.h"
 #include "quic/core/quic_types.h"
 #include "quic/core/web_transport_interface.h"
-#include "quic/core/web_transport_stream_adapter.h"
 #include "spdy/core/spdy_header_block.h"
 
 namespace quic {
diff --git a/quic/core/web_transport_stream_adapter.cc b/quic/core/http/web_transport_stream_adapter.cc
similarity index 86%
rename from quic/core/web_transport_stream_adapter.cc
rename to quic/core/http/web_transport_stream_adapter.cc
index 171a3c0..bea1377 100644
--- a/quic/core/web_transport_stream_adapter.cc
+++ b/quic/core/http/web_transport_stream_adapter.cc
@@ -2,7 +2,10 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-#include "quic/core/web_transport_stream_adapter.h"
+#include "quic/core/http/web_transport_stream_adapter.h"
+
+#include "quic/core/http/web_transport_http3.h"
+#include "quic/core/quic_error_codes.h"
 
 namespace quic {
 
@@ -110,4 +113,15 @@
   }
 }
 
+void WebTransportStreamAdapter::ResetWithUserCode(
+    WebTransportStreamError error) {
+  stream_->ResetWriteSide(QuicResetStreamError(
+      QUIC_STREAM_CANCELLED, WebTransportErrorToHttp3(error)));
+}
+
+void WebTransportStreamAdapter::SendStopSending(WebTransportStreamError error) {
+  stream_->SendStopSending(QuicResetStreamError(
+      QUIC_STREAM_CANCELLED, WebTransportErrorToHttp3(error)));
+}
+
 }  // namespace quic
diff --git a/quic/core/web_transport_stream_adapter.h b/quic/core/http/web_transport_stream_adapter.h
similarity index 93%
rename from quic/core/web_transport_stream_adapter.h
rename to quic/core/http/web_transport_stream_adapter.h
index 2e4704b..a761f6f 100644
--- a/quic/core/web_transport_stream_adapter.h
+++ b/quic/core/http/web_transport_stream_adapter.h
@@ -8,6 +8,7 @@
 #include "quic/core/quic_session.h"
 #include "quic/core/quic_stream.h"
 #include "quic/core/quic_stream_sequencer.h"
+#include "quic/core/quic_types.h"
 #include "quic/core/web_transport_interface.h"
 
 namespace quic {
@@ -34,12 +35,11 @@
   }
   QuicStreamId GetStreamId() const override { return stream_->id(); }
 
-  void ResetWithUserCode(QuicRstStreamErrorCode error) override {
-    stream_->Reset(error);
-  }
+  void ResetWithUserCode(WebTransportStreamError error) override;
   void ResetDueToInternalError() override {
     stream_->Reset(QUIC_STREAM_INTERNAL_ERROR);
   }
+  void SendStopSending(WebTransportStreamError error) override;
   void MaybeResetDueToStreamObjectGone() override {
     if (stream_->write_side_closed() && stream_->read_side_closed()) {
       return;
diff --git a/quic/core/quic_stream.cc b/quic/core/quic_stream.cc
index 74a5d45..f055bdf 100644
--- a/quic/core/quic_stream.cc
+++ b/quic/core/quic_stream.cc
@@ -609,7 +609,24 @@
 
   if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
     session()->MaybeCloseZombieStream(id_);
-    return;
+  }
+}
+
+void QuicStream::ResetWriteSide(QuicResetStreamError error) {
+  stream_error_ = error;
+  MaybeSendRstStream(error);
+
+  if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
+    session()->MaybeCloseZombieStream(id_);
+  }
+}
+
+void QuicStream::SendStopSending(QuicResetStreamError error) {
+  stream_error_ = error;
+  MaybeSendStopSending(error);
+
+  if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
+    session()->MaybeCloseZombieStream(id_);
   }
 }
 
diff --git a/quic/core/quic_stream.h b/quic/core/quic_stream.h
index f2485b9..dc129a2 100644
--- a/quic/core/quic_stream.h
+++ b/quic/core/quic_stream.h
@@ -180,6 +180,11 @@
   // interface.
   void Reset(QuicRstStreamErrorCode error);
 
+  // Reset() sends both RESET_STREAM and STOP_SENDING; the two methods below
+  // allow to send only one of those.
+  void ResetWriteSide(QuicResetStreamError error);
+  void SendStopSending(QuicResetStreamError error);
+
   // Called by the subclass or the sequencer to close the entire connection from
   // this end.
   void OnUnrecoverableError(QuicErrorCode error,
diff --git a/quic/core/web_transport_interface.h b/quic/core/web_transport_interface.h
index 1240291..102fd1f 100644
--- a/quic/core/web_transport_interface.h
+++ b/quic/core/web_transport_interface.h
@@ -75,9 +75,9 @@
   virtual QuicStreamId GetStreamId() const = 0;
 
   // Resets the stream with the specified error code.
-  // TODO(b/184048994): change the error code type based on IETF consensus.
-  virtual void ResetWithUserCode(QuicRstStreamErrorCode error) = 0;
+  virtual void ResetWithUserCode(WebTransportStreamError error) = 0;
   virtual void ResetDueToInternalError() = 0;
+  virtual void SendStopSending(WebTransportStreamError error) = 0;
   // Called when the owning object has been garbage-collected.
   virtual void MaybeResetDueToStreamObjectGone() = 0;
 
diff --git a/quic/quic_transport/quic_transport_stream.h b/quic/quic_transport/quic_transport_stream.h
index 87ef5fb..5d574af 100644
--- a/quic/quic_transport/quic_transport_stream.h
+++ b/quic/quic_transport/quic_transport_stream.h
@@ -10,11 +10,11 @@
 
 #include "absl/base/attributes.h"
 #include "absl/strings/string_view.h"
+#include "quic/core/http/web_transport_stream_adapter.h"
 #include "quic/core/quic_session.h"
 #include "quic/core/quic_stream.h"
 #include "quic/core/quic_types.h"
 #include "quic/core/web_transport_interface.h"
-#include "quic/core/web_transport_stream_adapter.h"
 #include "quic/quic_transport/quic_transport_session_interface.h"
 
 namespace quic {
@@ -50,12 +50,15 @@
 
   QuicStreamId GetStreamId() const override { return id(); }
 
-  void ResetWithUserCode(QuicRstStreamErrorCode error) override {
-    adapter_.ResetWithUserCode(error);
+  void ResetWithUserCode(WebTransportStreamError /*error*/) override {
+    adapter_.ResetWithUserCode(0);
   }
   void ResetDueToInternalError() override {
     adapter_.ResetDueToInternalError();
   }
+  void SendStopSending(WebTransportStreamError /*error*/) override {
+    adapter_.SendStopSending(0);
+  }
   void MaybeResetDueToStreamObjectGone() override {
     adapter_.MaybeResetDueToStreamObjectGone();
   }
diff --git a/quic/test_tools/quic_test_backend.cc b/quic/test_tools/quic_test_backend.cc
index 66d6769..e0d0a04 100644
--- a/quic/test_tools/quic_test_backend.cc
+++ b/quic/test_tools/quic_test_backend.cc
@@ -14,6 +14,7 @@
 #include "quic/core/quic_simple_buffer_allocator.h"
 #include "quic/core/web_transport_interface.h"
 #include "quic/platform/api/quic_mem_slice.h"
+#include "quic/test_tools/web_transport_resets_backend.h"
 #include "quic/tools/web_transport_test_visitors.h"
 
 namespace quic {
@@ -58,6 +59,9 @@
         std::make_unique<EchoWebTransportSessionVisitor>(session);
     return response;
   }
+  if (path == "/resets") {
+    return WebTransportResetsBackend(request_headers, session);
+  }
 
   WebTransportResponse response;
   response.response_headers[":status"] = "404";
diff --git a/quic/test_tools/web_transport_resets_backend.cc b/quic/test_tools/web_transport_resets_backend.cc
new file mode 100644
index 0000000..512619f
--- /dev/null
+++ b/quic/test_tools/web_transport_resets_backend.cc
@@ -0,0 +1,113 @@
+// Copyright (c) 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quic/test_tools/web_transport_resets_backend.h"
+
+#include <memory>
+
+#include "quic/core/web_transport_interface.h"
+#include "quic/tools/web_transport_test_visitors.h"
+#include "common/quiche_circular_deque.h"
+
+namespace quic {
+namespace test {
+
+namespace {
+
+class ResetsVisitor;
+
+class BidirectionalEchoVisitorWithLogging
+    : public WebTransportBidirectionalEchoVisitor {
+ public:
+  BidirectionalEchoVisitorWithLogging(WebTransportStream* stream,
+                                      ResetsVisitor* session_visitor)
+      : WebTransportBidirectionalEchoVisitor(stream),
+        session_visitor_(session_visitor) {}
+
+  void OnResetStreamReceived(WebTransportStreamError error) override;
+  void OnStopSendingReceived(WebTransportStreamError error) override;
+
+ private:
+  ResetsVisitor* session_visitor_;  // Not owned.
+};
+
+class ResetsVisitor : public WebTransportVisitor {
+ public:
+  ResetsVisitor(WebTransportSession* session) : session_(session) {}
+
+  void OnSessionReady(const spdy::SpdyHeaderBlock& /*headers*/) override {}
+  void OnSessionClosed(WebTransportSessionError /*error_code*/,
+                       const std::string& /*error_message*/) override {}
+
+  void OnIncomingBidirectionalStreamAvailable() override {
+    while (true) {
+      WebTransportStream* stream =
+          session_->AcceptIncomingBidirectionalStream();
+      if (stream == nullptr) {
+        return;
+      }
+      stream->SetVisitor(
+          std::make_unique<BidirectionalEchoVisitorWithLogging>(stream, this));
+      stream->visitor()->OnCanRead();
+    }
+  }
+  void OnIncomingUnidirectionalStreamAvailable() override {}
+
+  void OnDatagramReceived(absl::string_view /*datagram*/) override {}
+
+  void OnCanCreateNewOutgoingBidirectionalStream() override {}
+  void OnCanCreateNewOutgoingUnidirectionalStream() override {
+    MaybeSendLogsBack();
+  }
+
+  void Log(std::string line) {
+    log_.push_back(std::move(line));
+    MaybeSendLogsBack();
+  }
+
+ private:
+  void MaybeSendLogsBack() {
+    while (!log_.empty() &&
+           session_->CanOpenNextOutgoingUnidirectionalStream()) {
+      WebTransportStream* stream = session_->OpenOutgoingUnidirectionalStream();
+      stream->SetVisitor(
+          std::make_unique<WebTransportUnidirectionalEchoWriteVisitor>(
+              stream, log_.front()));
+      log_.pop_front();
+      stream->visitor()->OnCanWrite();
+    }
+  }
+
+  WebTransportSession* session_;  // Not owned.
+  quiche::QuicheCircularDeque<std::string> log_;
+};
+
+void BidirectionalEchoVisitorWithLogging::OnResetStreamReceived(
+    WebTransportStreamError error) {
+  session_visitor_->Log(absl::StrCat("Received reset for stream ",
+                                     stream()->GetStreamId(),
+                                     " with error code ", error));
+  WebTransportBidirectionalEchoVisitor::OnResetStreamReceived(error);
+}
+void BidirectionalEchoVisitorWithLogging::OnStopSendingReceived(
+    WebTransportStreamError error) {
+  session_visitor_->Log(absl::StrCat("Received stop sending for stream ",
+                                     stream()->GetStreamId(),
+                                     " with error code ", error));
+  WebTransportBidirectionalEchoVisitor::OnStopSendingReceived(error);
+}
+
+}  // namespace
+
+QuicSimpleServerBackend::WebTransportResponse WebTransportResetsBackend(
+    const spdy::Http2HeaderBlock& /*request_headers*/,
+    WebTransportSession* session) {
+  QuicSimpleServerBackend::WebTransportResponse response;
+  response.response_headers[":status"] = "200";
+  response.visitor = std::make_unique<ResetsVisitor>(session);
+  return response;
+}
+
+}  // namespace test
+}  // namespace quic
diff --git a/quic/test_tools/web_transport_resets_backend.h b/quic/test_tools/web_transport_resets_backend.h
new file mode 100644
index 0000000..dda06be
--- /dev/null
+++ b/quic/test_tools/web_transport_resets_backend.h
@@ -0,0 +1,23 @@
+// Copyright (c) 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_TEST_TOOLS_WEB_TRANSPORT_RESETS_BACKEND_H_
+#define QUICHE_QUIC_TEST_TOOLS_WEB_TRANSPORT_RESETS_BACKEND_H_
+
+#include "quic/test_tools/quic_test_backend.h"
+
+namespace quic {
+namespace test {
+
+// A backend for testing RESET_STREAM/STOP_SENDING behavior.  Provides
+// bidirectional echo streams; whenever one of those receives RESET_STREAM or
+// STOP_SENDING, a log message is sent as a unidirectional stream.
+QuicSimpleServerBackend::WebTransportResponse WebTransportResetsBackend(
+    const spdy::Http2HeaderBlock& request_headers,
+    WebTransportSession* session);
+
+}  // namespace test
+}  // namespace quic
+
+#endif  // QUICHE_QUIC_TEST_TOOLS_WEB_TRANSPORT_RESETS_BACKEND_H_
diff --git a/quic/tools/web_transport_test_visitors.h b/quic/tools/web_transport_test_visitors.h
index 3bc6471..b1f2246 100644
--- a/quic/tools/web_transport_test_visitors.h
+++ b/quic/tools/web_transport_test_visitors.h
@@ -55,6 +55,10 @@
   }
 
   void OnCanWrite() override {
+    if (stop_sending_received_) {
+      return;
+    }
+
     if (!buffer_.empty()) {
       bool success = stream_->Write(buffer_);
       QUIC_DVLOG(1) << "Attempted writing on WebTransport bidirectional stream "
@@ -73,14 +77,26 @@
     }
   }
 
-  void OnResetStreamReceived(WebTransportStreamError /*error*/) override {}
-  void OnStopSendingReceived(WebTransportStreamError /*error*/) override {}
+  void OnResetStreamReceived(WebTransportStreamError /*error*/) override {
+    // Send FIN in response to a stream reset.  We want to test that we can
+    // operate one side of the stream cleanly while the other is reset, thus
+    // replying with a FIN rather than a RESET_STREAM is more appropriate here.
+    send_fin_ = true;
+    OnCanWrite();
+  }
+  void OnStopSendingReceived(WebTransportStreamError /*error*/) override {
+    stop_sending_received_ = true;
+  }
   void OnWriteSideInDataRecvdState() override {}
 
+ protected:
+  WebTransportStream* stream() { return stream_; }
+
  private:
   WebTransportStream* stream_;
   std::string buffer_;
   bool send_fin_ = false;
+  bool stop_sending_received_ = false;
 };
 
 // Buffers all of the data and calls |callback| with the entirety of the stream