Clean up logic for closing WebTransport sessions.

This fixes the bug where receiving a FIN without a close capsule would result in a stalled session.  This also adds a test to ensure we receive error codes and messages correctly.

PiperOrigin-RevId: 401065160
diff --git a/quic/core/http/end_to_end_test.cc b/quic/core/http/end_to_end_test.cc
index 0964eff..f5aa312 100644
--- a/quic/core/http/end_to_end_test.cc
+++ b/quic/core/http/end_to_end_test.cc
@@ -6382,6 +6382,71 @@
   EXPECT_TRUE(spdy_stream == nullptr);
 }
 
+TEST_P(EndToEndTest, WebTransportSessionCloseWithoutCapsule) {
+  enable_web_transport_ = true;
+  ASSERT_TRUE(Initialize());
+
+  if (!version_.UsesHttp3()) {
+    return;
+  }
+
+  WebTransportHttp3* session =
+      CreateWebTransportSession("/echo", /*wait_for_server_response=*/true);
+  ASSERT_TRUE(session != nullptr);
+  NiceMock<MockClientVisitor>& visitor = SetupWebTransportVisitor(session);
+
+  WebTransportStream* stream = session->OpenOutgoingBidirectionalStream();
+  ASSERT_TRUE(stream != nullptr);
+  QuicStreamId stream_id = stream->GetStreamId();
+  EXPECT_TRUE(stream->Write("test"));
+  // Keep stream open.
+
+  bool close_received = false;
+  EXPECT_CALL(visitor, OnSessionClosed(0, ""))
+      .WillOnce(Assign(&close_received, true));
+  session->CloseSessionWithFinOnlyForTests();
+  client_->WaitUntil(2000, [&]() { return close_received; });
+  EXPECT_TRUE(close_received);
+
+  QuicSpdyStream* spdy_stream =
+      GetClientSession()->GetOrCreateSpdyDataStream(stream_id);
+  EXPECT_TRUE(spdy_stream == nullptr);
+}
+
+TEST_P(EndToEndTest, WebTransportSessionReceiveClose) {
+  enable_web_transport_ = true;
+  ASSERT_TRUE(Initialize());
+
+  if (!version_.UsesHttp3()) {
+    return;
+  }
+
+  WebTransportHttp3* session = CreateWebTransportSession(
+      "/session-close", /*wait_for_server_response=*/true);
+  ASSERT_TRUE(session != nullptr);
+  NiceMock<MockClientVisitor>& visitor = SetupWebTransportVisitor(session);
+
+  WebTransportStream* stream = session->OpenOutgoingUnidirectionalStream();
+  ASSERT_TRUE(stream != nullptr);
+  QuicStreamId stream_id = stream->GetStreamId();
+  EXPECT_TRUE(stream->Write("42 test error"));
+  EXPECT_TRUE(stream->SendFin());
+
+  // Have some other streams open pending, to ensure they are closed properly.
+  stream = session->OpenOutgoingUnidirectionalStream();
+  stream = session->OpenOutgoingBidirectionalStream();
+
+  bool close_received = false;
+  EXPECT_CALL(visitor, OnSessionClosed(42, "test error"))
+      .WillOnce(Assign(&close_received, true));
+  client_->WaitUntil(2000, [&]() { return close_received; });
+  EXPECT_TRUE(close_received);
+
+  QuicSpdyStream* spdy_stream =
+      GetClientSession()->GetOrCreateSpdyDataStream(stream_id);
+  EXPECT_TRUE(spdy_stream == nullptr);
+}
+
 TEST_P(EndToEndTest, WebTransportSessionStreamTermination) {
   enable_web_transport_ = true;
   ASSERT_TRUE(Initialize());
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index 36bd22d..98422e0 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -1731,6 +1731,9 @@
   // in the capsule parser.
   if (sequencer()->IsClosed()) {
     capsule_parser_->ErrorIfThereIsRemainingBufferedData();
+    if (web_transport_ != nullptr) {
+      web_transport_->OnConnectStreamFinReceived();
+    }
     OnFinRead();
   }
 }
diff --git a/quic/core/http/web_transport_http3.cc b/quic/core/http/web_transport_http3.cc
index 253e4ee..a54d210 100644
--- a/quic/core/http/web_transport_http3.cc
+++ b/quic/core/http/web_transport_http3.cc
@@ -140,6 +140,33 @@
   MaybeNotifyClose();
 }
 
+void WebTransportHttp3::OnConnectStreamFinReceived() {
+  // If we already received a CLOSE_WEBTRANSPORT_SESSION capsule, we don't need
+  // to do anything about receiving a FIN, since we already sent one in
+  // response.
+  if (close_received_) {
+    return;
+  }
+  close_received_ = true;
+  if (close_sent_) {
+    QUIC_DLOG(INFO) << "Ignoring received FIN as we've already sent our close.";
+    return;
+  }
+
+  connect_stream_->WriteOrBufferBody("", /*fin=*/true);
+  MaybeNotifyClose();
+}
+
+void WebTransportHttp3::CloseSessionWithFinOnlyForTests() {
+  QUICHE_DCHECK(!close_sent_);
+  close_sent_ = true;
+  if (close_received_) {
+    return;
+  }
+
+  connect_stream_->WriteOrBufferBody("", /*fin=*/true);
+}
+
 void WebTransportHttp3::HeadersReceived(const spdy::SpdyHeaderBlock& headers) {
   if (session_->perspective() == Perspective::IS_CLIENT) {
     int status_code;
diff --git a/quic/core/http/web_transport_http3.h b/quic/core/http/web_transport_http3.h
index cd715a1..7a72931 100644
--- a/quic/core/http/web_transport_http3.h
+++ b/quic/core/http/web_transport_http3.h
@@ -57,6 +57,12 @@
                     absl::string_view error_message) override;
   void OnCloseReceived(WebTransportSessionError error_code,
                        absl::string_view error_message);
+  void OnConnectStreamFinReceived();
+
+  // It is legal for WebTransport to be closed without a
+  // CLOSE_WEBTRANSPORT_SESSION capsule.  We always send a capsule, but we still
+  // need to ensure we handle this case correctly.
+  void CloseSessionWithFinOnlyForTests();
 
   // Return the earliest incoming stream that has been received by the session
   // but has not been accepted.  Returns nullptr if there are no incoming
diff --git a/quic/test_tools/quic_test_backend.cc b/quic/test_tools/quic_test_backend.cc
index e0d0a04..3cfe927 100644
--- a/quic/test_tools/quic_test_backend.cc
+++ b/quic/test_tools/quic_test_backend.cc
@@ -20,6 +20,51 @@
 namespace quic {
 namespace test {
 
+namespace {
+
+// SessionCloseVisitor implements the "/session-close" endpoint.  If the client
+// sends a unidirectional stream of format "code message" to this endpoint, it
+// will close the session with the corresponding error code and error message.
+// For instance, sending "42 test error" will cause it to be closed with code 42
+// and message "test error".
+class SessionCloseVisitor : public WebTransportVisitor {
+ public:
+  SessionCloseVisitor(WebTransportSession* session) : session_(session) {}
+
+  void OnSessionReady(const spdy::SpdyHeaderBlock& /*headers*/) override {}
+  void OnSessionClosed(WebTransportSessionError /*error_code*/,
+                       const std::string& /*error_message*/) override {}
+
+  void OnIncomingBidirectionalStreamAvailable() override {}
+  void OnIncomingUnidirectionalStreamAvailable() override {
+    WebTransportStream* stream = session_->AcceptIncomingUnidirectionalStream();
+    if (stream == nullptr) {
+      return;
+    }
+    stream->SetVisitor(
+        std::make_unique<WebTransportUnidirectionalEchoReadVisitor>(
+            stream, [this](const std::string& data) {
+              std::pair<absl::string_view, absl::string_view> parsed =
+                  absl::StrSplit(data, absl::MaxSplits(' ', 1));
+              WebTransportSessionError error_code = 0;
+              bool success = absl::SimpleAtoi(parsed.first, &error_code);
+              QUICHE_DCHECK(success) << data;
+              session_->CloseSession(error_code, parsed.second);
+            }));
+    stream->visitor()->OnCanRead();
+  }
+
+  void OnDatagramReceived(absl::string_view /*datagram*/) override {}
+
+  void OnCanCreateNewOutgoingBidirectionalStream() override {}
+  void OnCanCreateNewOutgoingUnidirectionalStream() override {}
+
+ private:
+  WebTransportSession* session_;  // Not owned.
+};
+
+}  // namespace
+
 QuicSimpleServerBackend::WebTransportResponse
 QuicTestBackend::ProcessWebTransportRequest(
     const spdy::Http2HeaderBlock& request_headers,
@@ -62,6 +107,12 @@
   if (path == "/resets") {
     return WebTransportResetsBackend(request_headers, session);
   }
+  if (path == "/session-close") {
+    WebTransportResponse response;
+    response.response_headers[":status"] = "200";
+    response.visitor = std::make_unique<SessionCloseVisitor>(session);
+    return response;
+  }
 
   WebTransportResponse response;
   response.response_headers[":status"] = "404";