Implement HTTP/3 datagrams within WebTransport

The version of HTTP/3 datagrams here uses the "Datagram-Flow-Id = sf-integer" header definition; however, since the HTTP/3 datagram is currently highly in flux, this is very likely to be not a final version of this.

PiperOrigin-RevId: 367157595
Change-Id: I2f0a6c077c5cf91435809969d98b0fd0e418f0ec
diff --git a/quic/core/http/end_to_end_test.cc b/quic/core/http/end_to_end_test.cc
index 5cb4db2..5e070dc 100644
--- a/quic/core/http/end_to_end_test.cc
+++ b/quic/core/http/end_to_end_test.cc
@@ -5895,6 +5895,36 @@
   EXPECT_EQ(received_data, "test");
 }
 
+TEST_P(EndToEndTest, WebTransportDatagrams) {
+  enable_web_transport_ = true;
+  ASSERT_TRUE(Initialize());
+
+  if (!version_.UsesHttp3()) {
+    return;
+  }
+
+  WebTransportHttp3* session =
+      CreateWebTransportSession("/echo", /*wait_for_server_response=*/true);
+  ASSERT_TRUE(session != nullptr);
+  NiceMock<MockClientVisitor>& visitor = SetupWebTransportVisitor(session);
+
+  SimpleBufferAllocator allocator;
+  for (int i = 0; i < 10; i++) {
+    absl::string_view datagram = "test";
+    auto buffer = MakeUniqueBuffer(&allocator, datagram.size());
+    memcpy(buffer.get(), datagram.data(), datagram.size());
+    QuicMemSlice slice(std::move(buffer), datagram.size());
+    session->SendOrQueueDatagram(std::move(slice));
+  }
+
+  int received = 0;
+  EXPECT_CALL(visitor, OnDatagramReceived(_)).WillRepeatedly([&received]() {
+    received++;
+  });
+  client_->WaitUntil(5000, [&received]() { return received > 0; });
+  EXPECT_GT(received, 0);
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace quic
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc
index 86ddc94..fe75a05 100644
--- a/quic/core/http/quic_spdy_session.cc
+++ b/quic/core/http/quic_spdy_session.cc
@@ -509,8 +509,6 @@
       << "QuicSpdySession use after free. " << destruction_indicator_
       << QuicStackTrace();
   destruction_indicator_ = 987654321;
-  QUIC_BUG_IF(quic_bug_12477_3, !h3_datagram_registrations_.empty())
-      << "HTTP/3 datagram flow ID was not unregistered";
 }
 
 void QuicSpdySession::Initialize() {
@@ -1687,6 +1685,14 @@
       << "Attempted to unregister unknown HTTP/3 flow ID " << flow_id;
 }
 
+void QuicSpdySession::SetMaxTimeInQueueForFlowId(
+    QuicDatagramFlowId /*flow_id*/,
+    QuicTime::Delta max_time_in_queue) {
+  // TODO(b/184598230): implement this in a way that works for multiple sessions
+  // on a same connection.
+  datagram_queue()->SetMaxTimeInQueue(max_time_in_queue);
+}
+
 void QuicSpdySession::OnMessageReceived(absl::string_view message) {
   QuicSession::OnMessageReceived(message);
   if (!h3_datagram_supported_) {
diff --git a/quic/core/http/quic_spdy_session.h b/quic/core/http/quic_spdy_session.h
index f5903b3..e0bf096 100644
--- a/quic/core/http/quic_spdy_session.h
+++ b/quic/core/http/quic_spdy_session.h
@@ -429,6 +429,10 @@
   // Unregister a given HTTP/3 datagram flow ID.
   void UnregisterHttp3FlowId(QuicDatagramFlowId flow_id);
 
+  // Sets max time in queue for a specified datagram flow ID.
+  void SetMaxTimeInQueueForFlowId(QuicDatagramFlowId flow_id,
+                                  QuicTime::Delta max_time_in_queue);
+
   // Override from QuicSession to support HTTP/3 datagrams.
   void OnMessageReceived(absl::string_view message) override;
 
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc
index 3608946..7c22389 100644
--- a/quic/core/http/quic_spdy_session_test.cc
+++ b/quic/core/http/quic_spdy_session_test.cc
@@ -569,6 +569,8 @@
     headers.OnHeaderBlockStart();
     headers.OnHeader(":method", "CONNECT");
     headers.OnHeader(":protocol", "webtransport");
+    headers.OnHeader("datagram-flow-id",
+                     absl::StrCat(session_.GetNextDatagramFlowId()));
     stream->OnStreamHeaderList(/*fin=*/true, 0, headers);
     WebTransportHttp3* web_transport =
         session_.GetWebTransportSession(session_id);
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index ac39f48..4d4cc95 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -21,6 +21,7 @@
 #include "quic/core/qpack/qpack_decoder.h"
 #include "quic/core/qpack/qpack_encoder.h"
 #include "quic/core/quic_error_codes.h"
+#include "quic/core/quic_types.h"
 #include "quic/core/quic_utils.h"
 #include "quic/core/quic_versions.h"
 #include "quic/core/quic_write_blocked_list.h"
@@ -1242,6 +1243,7 @@
 
   std::string method;
   std::string protocol;
+  absl::optional<QuicDatagramFlowId> flow_id;
   for (const auto& header : header_list_) {
     const std::string& header_name = header.first;
     const std::string& header_value = header.second;
@@ -1257,14 +1259,25 @@
       }
       protocol = header_value;
     }
+    if (header_name == "datagram-flow-id") {
+      if (flow_id.has_value() || header_value.empty()) {
+        return;
+      }
+      QuicDatagramFlowId flow_id_out;
+      if (!absl::SimpleAtoi(header_value, &flow_id_out)) {
+        return;
+      }
+      flow_id = flow_id_out;
+    }
   }
 
-  if (method != "CONNECT" || protocol != "webtransport") {
+  if (method != "CONNECT" || protocol != "webtransport" ||
+      !flow_id.has_value()) {
     return;
   }
 
   web_transport_ =
-      std::make_unique<WebTransportHttp3>(spdy_session_, this, id());
+      std::make_unique<WebTransportHttp3>(spdy_session_, this, id(), *flow_id);
 }
 
 void QuicSpdyStream::MaybeProcessSentWebTransportHeaders(
@@ -1286,8 +1299,11 @@
     return;
   }
 
+  QuicDatagramFlowId flow_id = spdy_session_->GetNextDatagramFlowId();
+  headers["datagram-flow-id"] = absl::StrCat(flow_id);
+
   web_transport_ =
-      std::make_unique<WebTransportHttp3>(spdy_session_, this, id());
+      std::make_unique<WebTransportHttp3>(spdy_session_, this, id(), flow_id);
 }
 
 void QuicSpdyStream::OnCanWriteNewData() {
diff --git a/quic/core/http/quic_spdy_stream_test.cc b/quic/core/http/quic_spdy_stream_test.cc
index ea62753..bb464bd 100644
--- a/quic/core/http/quic_spdy_stream_test.cc
+++ b/quic/core/http/quic_spdy_stream_test.cc
@@ -3122,6 +3122,7 @@
   spdy::SpdyHeaderBlock headers;
   headers[":method"] = "CONNECT";
   headers[":protocol"] = "webtransport";
+  headers["datagram-flow-id"] = absl::StrCat(session_->GetNextDatagramFlowId());
   stream_->WriteHeaders(std::move(headers), /*fin=*/false, nullptr);
   ASSERT_TRUE(stream_->web_transport() != nullptr);
   EXPECT_EQ(stream_->id(), stream_->web_transport()->id());
@@ -3138,6 +3139,8 @@
 
   headers_[":method"] = "CONNECT";
   headers_[":protocol"] = "webtransport";
+  headers_["datagram-flow-id"] =
+      absl::StrCat(session_->GetNextDatagramFlowId());
 
   stream_->OnStreamHeadersPriority(
       spdy::SpdyStreamPrecedence(kV3HighestPriority));
diff --git a/quic/core/http/web_transport_http3.cc b/quic/core/http/web_transport_http3.cc
index 3d1812e..2a105c2 100644
--- a/quic/core/http/web_transport_http3.cc
+++ b/quic/core/http/web_transport_http3.cc
@@ -36,14 +36,17 @@
 
 WebTransportHttp3::WebTransportHttp3(QuicSpdySession* session,
                                      QuicSpdyStream* connect_stream,
-                                     WebTransportSessionId id)
+                                     WebTransportSessionId id,
+                                     QuicDatagramFlowId flow_id)
     : session_(session),
       connect_stream_(connect_stream),
       id_(id),
+      flow_id_(flow_id),
       visitor_(std::make_unique<NoopWebTransportVisitor>()) {
   QUICHE_DCHECK(session_->SupportsWebTransport());
   QUICHE_DCHECK(IsValidWebTransportSessionId(id, session_->version()));
   QUICHE_DCHECK_EQ(connect_stream_->id(), id);
+  session_->RegisterHttp3FlowId(flow_id, this);
 }
 
 void WebTransportHttp3::AssociateStream(QuicStreamId stream_id) {
@@ -71,6 +74,7 @@
   for (QuicStreamId id : streams) {
     session_->ResetStream(id, QUIC_STREAM_WEBTRANSPORT_SESSION_GONE);
   }
+  session_->UnregisterHttp3FlowId(flow_id_);
 }
 
 void WebTransportHttp3::HeadersReceived(const spdy::SpdyHeaderBlock& headers) {
@@ -148,14 +152,20 @@
   return stream->interface();
 }
 
-MessageStatus WebTransportHttp3::SendOrQueueDatagram(
-    QuicMemSlice /*datagram*/) {
-  // TODO(vasilvv): implement this.
-  return MessageStatus::MESSAGE_STATUS_UNSUPPORTED;
+MessageStatus WebTransportHttp3::SendOrQueueDatagram(QuicMemSlice datagram) {
+  return session_->SendHttp3Datagram(
+      flow_id_, absl::string_view(datagram.data(), datagram.length()));
 }
+
 void WebTransportHttp3::SetDatagramMaxTimeInQueue(
-    QuicTime::Delta /*max_time_in_queue*/) {
-  // TODO(vasilvv): implement this.
+    QuicTime::Delta max_time_in_queue) {
+  session_->SetMaxTimeInQueueForFlowId(flow_id_, max_time_in_queue);
+}
+
+void WebTransportHttp3::OnHttp3Datagram(QuicDatagramFlowId flow_id,
+                                        absl::string_view payload) {
+  QUICHE_DCHECK_EQ(flow_id, flow_id_);
+  visitor_->OnDatagramReceived(payload);
 }
 
 WebTransportHttp3UnidirectionalStream::WebTransportHttp3UnidirectionalStream(
diff --git a/quic/core/http/web_transport_http3.h b/quic/core/http/web_transport_http3.h
index cc622c2..0c91c2b 100644
--- a/quic/core/http/web_transport_http3.h
+++ b/quic/core/http/web_transport_http3.h
@@ -9,6 +9,7 @@
 
 #include "absl/container/flat_hash_set.h"
 #include "absl/types/optional.h"
+#include "quic/core/http/quic_spdy_session.h"
 #include "quic/core/quic_stream.h"
 #include "quic/core/quic_types.h"
 #include "quic/core/web_transport_interface.h"
@@ -25,11 +26,14 @@
 //
 // WebTransport over HTTP/3 specification:
 // <https://datatracker.ietf.org/doc/html/draft-ietf-webtrans-http3>
-class QUIC_EXPORT_PRIVATE WebTransportHttp3 : public WebTransportSession {
+class QUIC_EXPORT_PRIVATE WebTransportHttp3
+    : public WebTransportSession,
+      public QuicSpdySession::Http3DatagramVisitor {
  public:
   WebTransportHttp3(QuicSpdySession* session,
                     QuicSpdyStream* connect_stream,
-                    WebTransportSessionId id);
+                    WebTransportSessionId id,
+                    QuicDatagramFlowId flow_id);
 
   void HeadersReceived(const spdy::SpdyHeaderBlock& headers);
   void SetVisitor(std::unique_ptr<WebTransportVisitor> visitor) {
@@ -59,10 +63,14 @@
   MessageStatus SendOrQueueDatagram(QuicMemSlice datagram) override;
   void SetDatagramMaxTimeInQueue(QuicTime::Delta max_time_in_queue) override;
 
+  void OnHttp3Datagram(QuicDatagramFlowId flow_id,
+                       absl::string_view payload) override;
+
  private:
   QuicSpdySession* const session_;        // Unowned.
   QuicSpdyStream* const connect_stream_;  // Unowned.
   const WebTransportSessionId id_;
+  const QuicDatagramFlowId flow_id_;
   // |ready_| is set to true when the peer has seen both sets of headers.
   bool ready_ = false;
   std::unique_ptr<WebTransportVisitor> visitor_;