Support unidirectional streams in WebTransport over HTTP/3.

This CL adds support for sending, receiving, buffering, tracking and closing unidirectional streams attached to a WebTransport over HTTP/3 session.

This also fills in some of the gaps in the API and testing infrastructure that were needed to make this work.

PiperOrigin-RevId: 366108288
Change-Id: I16b779f8905b597b5f948f02a7c37e656ae44659
diff --git a/quic/core/http/end_to_end_test.cc b/quic/core/http/end_to_end_test.cc
index 3ac82a4..8b67c47 100644
--- a/quic/core/http/end_to_end_test.cc
+++ b/quic/core/http/end_to_end_test.cc
@@ -61,6 +61,7 @@
 #include "quic/test_tools/quic_test_client.h"
 #include "quic/test_tools/quic_test_server.h"
 #include "quic/test_tools/quic_test_utils.h"
+#include "quic/test_tools/quic_transport_test_tools.h"
 #include "quic/test_tools/server_thread.h"
 #include "quic/test_tools/simple_session_cache.h"
 #include "quic/tools/quic_backend_response.h"
@@ -77,6 +78,7 @@
 using spdy::SpdySerializedFrame;
 using spdy::SpdySettingsIR;
 using ::testing::_;
+using ::testing::Assign;
 using ::testing::Invoke;
 using ::testing::NiceMock;
 
@@ -709,10 +711,19 @@
     if (wait_for_server_response) {
       client_->WaitUntil(-1,
                          [stream]() { return stream->headers_decompressed(); });
+      EXPECT_TRUE(session->ready());
     }
     return session;
   }
 
+  NiceMock<MockClientVisitor>& SetupWebTransportVisitor(
+      WebTransportHttp3* session) {
+    auto visitor_owned = std::make_unique<NiceMock<MockClientVisitor>>();
+    NiceMock<MockClientVisitor>& visitor = *visitor_owned;
+    session->SetVisitor(std::move(visitor_owned));
+    return visitor;
+  }
+
   ScopedEnvironmentForThreads environment_;
   bool initialized_;
   // If true, the Initialize() function will create |client_| and starts to
@@ -5718,7 +5729,7 @@
   ADD_FAILURE() << "Client should not have 10 resumption tickets.";
 }
 
-TEST_P(EndToEndTest, WebTransportSession) {
+TEST_P(EndToEndTest, WebTransportSessionSetup) {
   enable_web_transport_ = true;
   ASSERT_TRUE(Initialize());
 
@@ -5757,6 +5768,71 @@
   server_thread_->Resume();
 }
 
+TEST_P(EndToEndTest, WebTransportSessionUnidirectionalStream) {
+  enable_web_transport_ = true;
+  ASSERT_TRUE(Initialize());
+
+  if (!version_.UsesHttp3()) {
+    return;
+  }
+
+  WebTransportHttp3* session =
+      CreateWebTransportSession("/echo", /*wait_for_server_response=*/true);
+  ASSERT_TRUE(session != nullptr);
+  NiceMock<MockClientVisitor>& visitor = SetupWebTransportVisitor(session);
+
+  WebTransportStream* outgoing_stream =
+      session->OpenOutgoingUnidirectionalStream();
+  ASSERT_TRUE(outgoing_stream != nullptr);
+  EXPECT_TRUE(outgoing_stream->Write("test"));
+  EXPECT_TRUE(outgoing_stream->SendFin());
+
+  bool stream_received = false;
+  EXPECT_CALL(visitor, OnIncomingUnidirectionalStreamAvailable())
+      .WillOnce(Assign(&stream_received, true));
+  client_->WaitUntil(2000, [&stream_received]() { return stream_received; });
+  EXPECT_TRUE(stream_received);
+  WebTransportStream* received_stream =
+      session->AcceptIncomingUnidirectionalStream();
+  ASSERT_TRUE(received_stream != nullptr);
+  std::string received_data;
+  received_stream->Read(&received_data);
+  EXPECT_EQ(received_data, "test");
+}
+
+TEST_P(EndToEndTest, WebTransportSessionUnidirectionalStreamSentEarly) {
+  enable_web_transport_ = true;
+  SetPacketLossPercentage(30);
+  ASSERT_TRUE(Initialize());
+
+  if (!version_.UsesHttp3()) {
+    return;
+  }
+
+  WebTransportHttp3* session =
+      CreateWebTransportSession("/echo", /*wait_for_server_response=*/false);
+  ASSERT_TRUE(session != nullptr);
+  NiceMock<MockClientVisitor>& visitor = SetupWebTransportVisitor(session);
+
+  WebTransportStream* outgoing_stream =
+      session->OpenOutgoingUnidirectionalStream();
+  ASSERT_TRUE(outgoing_stream != nullptr);
+  EXPECT_TRUE(outgoing_stream->Write("test"));
+  EXPECT_TRUE(outgoing_stream->SendFin());
+
+  bool stream_received = false;
+  EXPECT_CALL(visitor, OnIncomingUnidirectionalStreamAvailable())
+      .WillOnce(Assign(&stream_received, true));
+  client_->WaitUntil(5000, [&stream_received]() { return stream_received; });
+  EXPECT_TRUE(stream_received);
+  WebTransportStream* received_stream =
+      session->AcceptIncomingUnidirectionalStream();
+  ASSERT_TRUE(received_stream != nullptr);
+  std::string received_data;
+  received_stream->Read(&received_data);
+  EXPECT_EQ(received_data, "test");
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace quic
diff --git a/quic/core/http/http_constants.h b/quic/core/http/http_constants.h
index a5409b5..c13a1a8 100644
--- a/quic/core/http/http_constants.h
+++ b/quic/core/http/http_constants.h
@@ -22,6 +22,8 @@
   // https://quicwg.org/base-drafts/draft-ietf-quic-qpack.html#enc-dec-stream-def
   kQpackEncoderStream = 0x02,
   kQpackDecoderStream = 0x03,
+  // https://ietf-wg-webtrans.github.io/draft-ietf-webtrans-http3/draft-ietf-webtrans-http3.html#name-unidirectional-streams
+  kWebTransportUnidirectionalStream = 0x54,
 };
 
 // This includes control stream, QPACK encoder stream, and QPACK decoder stream.
diff --git a/quic/core/http/quic_spdy_client_stream.cc b/quic/core/http/quic_spdy_client_stream.cc
index 931bfb3..d264ba6 100644
--- a/quic/core/http/quic_spdy_client_stream.cc
+++ b/quic/core/http/quic_spdy_client_stream.cc
@@ -10,6 +10,7 @@
 #include "quic/core/http/quic_client_promised_info.h"
 #include "quic/core/http/quic_spdy_client_session.h"
 #include "quic/core/http/spdy_utils.h"
+#include "quic/core/http/web_transport_http3.h"
 #include "quic/core/quic_alarm.h"
 #include "quic/platform/api/quic_logging.h"
 #include "spdy/core/spdy_protocol.h"
@@ -58,6 +59,15 @@
     return;
   }
 
+  if (web_transport() != nullptr) {
+    web_transport()->HeadersReceived(response_headers_);
+    if (!web_transport()->ready()) {
+      // Rejected due to status not being 200, or other reason.
+      WriteOrBufferData("", /*fin=*/true, nullptr);
+      return;
+    }
+  }
+
   if (!ParseHeaderStatusCode(response_headers_, &response_code_)) {
     QUIC_DLOG(ERROR) << "Received invalid response code: "
                      << response_headers_[":status"].as_string()
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc
index 442e1eb..6d2006c 100644
--- a/quic/core/http/quic_spdy_session.cc
+++ b/quic/core/http/quic_spdy_session.cc
@@ -7,6 +7,7 @@
 #include <algorithm>
 #include <cstdint>
 #include <limits>
+#include <memory>
 #include <string>
 #include <utility>
 
@@ -18,6 +19,7 @@
 #include "quic/core/http/http_decoder.h"
 #include "quic/core/http/http_frames.h"
 #include "quic/core/http/quic_headers_stream.h"
+#include "quic/core/http/web_transport_http3.h"
 #include "quic/core/quic_error_codes.h"
 #include "quic/core/quic_types.h"
 #include "quic/core/quic_utils.h"
@@ -55,6 +57,8 @@
 
 namespace quic {
 
+ABSL_CONST_INIT const size_t kMaxUnassociatedWebTransportStreams = 24;
+
 namespace {
 
 #define ENDPOINT \
@@ -1438,18 +1442,37 @@
       }
       return qpack_decoder_receive_stream_;
     }
-    default:
-      if (GetQuicReloadableFlag(quic_unify_stop_sending)) {
-        QUIC_RELOADABLE_FLAG_COUNT(quic_unify_stop_sending);
-        MaybeSendStopSendingFrame(pending->id(),
-                                  QUIC_STREAM_STREAM_CREATION_ERROR);
-      } else {
-        // TODO(renjietang): deprecate SendStopSending() when the flag is
-        // deprecated.
-        SendStopSending(QUIC_STREAM_STREAM_CREATION_ERROR, pending->id());
+    case kWebTransportUnidirectionalStream: {
+      // Note that this checks whether WebTransport is enabled on the receiver
+      // side, as we may receive WebTransport streams before peer's SETTINGS are
+      // received.
+      // TODO(b/184156476): consider whether this means we should drop buffered
+      // streams if we don't receive indication of WebTransport support.
+      if (!WillNegotiateWebTransport()) {
+        // Treat as unknown stream type.
+        break;
       }
-      pending->StopReading();
+      QUIC_DVLOG(1) << ENDPOINT << "Created an incoming WebTransport stream "
+                    << pending->id();
+      auto stream_owned =
+          std::make_unique<WebTransportHttp3UnidirectionalStream>(pending,
+                                                                  this);
+      WebTransportHttp3UnidirectionalStream* stream = stream_owned.get();
+      ActivateStream(std::move(stream_owned));
+      return stream;
+    }
+    default:
+      break;
   }
+  if (GetQuicReloadableFlag(quic_unify_stop_sending)) {
+    QUIC_RELOADABLE_FLAG_COUNT(quic_unify_stop_sending);
+    MaybeSendStopSendingFrame(pending->id(), QUIC_STREAM_STREAM_CREATION_ERROR);
+  } else {
+    // TODO(renjietang): deprecate SendStopSending() when the flag is
+    // deprecated.
+    SendStopSending(QUIC_STREAM_STREAM_CREATION_ERROR, pending->id());
+  }
+  pending->StopReading();
   return nullptr;
 }
 
@@ -1790,6 +1813,77 @@
   streams_waiting_for_settings_.insert(id);
 }
 
+void QuicSpdySession::AssociateIncomingWebTransportStreamWithSession(
+    WebTransportSessionId session_id,
+    QuicStreamId stream_id) {
+  if (QuicUtils::IsOutgoingStreamId(version(), stream_id, perspective())) {
+    QUIC_BUG(AssociateIncomingWebTransportStreamWithSession got outgoing stream)
+        << ENDPOINT
+        << "AssociateIncomingWebTransportStreamWithSession() got an outgoing "
+           "stream ID: "
+        << stream_id;
+    return;
+  }
+  WebTransportHttp3* session = GetWebTransportSession(session_id);
+  if (session != nullptr) {
+    QUIC_DVLOG(1) << ENDPOINT
+                  << "Successfully associated incoming WebTransport stream "
+                  << stream_id << " with session ID " << session_id;
+
+    session->AssociateStream(stream_id);
+    return;
+  }
+  // Evict the oldest streams until we are under the limit.
+  while (buffered_streams_.size() >= kMaxUnassociatedWebTransportStreams) {
+    QUIC_DVLOG(1) << ENDPOINT << "Removing stream "
+                  << buffered_streams_.front().stream_id
+                  << " from buffered streams as the queue is full.";
+    ResetStream(buffered_streams_.front().stream_id,
+                QUIC_STREAM_WEBTRANSPORT_BUFFERED_STREAMS_LIMIT_EXCEEDED);
+    buffered_streams_.pop_front();
+  }
+  QUIC_DVLOG(1) << ENDPOINT << "Received a WebTransport stream " << stream_id
+                << " for session ID " << session_id
+                << " but cannot associate it; buffering instead.";
+  buffered_streams_.push_back(
+      BufferedWebTransportStream{session_id, stream_id});
+}
+
+void QuicSpdySession::ProcessBufferedWebTransportStreamsForSession(
+    WebTransportHttp3* session) {
+  const WebTransportSessionId session_id = session->id();
+  QUIC_DVLOG(1) << "Processing buffered WebTransport streams for "
+                << session_id;
+  auto it = buffered_streams_.begin();
+  while (it != buffered_streams_.end()) {
+    if (it->session_id == session_id) {
+      QUIC_DVLOG(1) << "Unbuffered and associated WebTransport stream "
+                    << it->stream_id << " with session " << it->session_id;
+      session->AssociateStream(it->stream_id);
+      it = buffered_streams_.erase(it);
+    } else {
+      it++;
+    }
+  }
+}
+
+WebTransportHttp3UnidirectionalStream*
+QuicSpdySession::CreateOutgoingUnidirectionalWebTransportStream(
+    WebTransportHttp3* session) {
+  if (!CanOpenNextOutgoingUnidirectionalStream()) {
+    return nullptr;
+  }
+
+  QuicStreamId stream_id = GetNextOutgoingUnidirectionalStreamId();
+  auto stream_owned = std::make_unique<WebTransportHttp3UnidirectionalStream>(
+      stream_id, this, session->id());
+  WebTransportHttp3UnidirectionalStream* stream = stream_owned.get();
+  ActivateStream(std::move(stream_owned));
+  stream->WritePreamble();
+  session->AssociateStream(stream_id);
+  return stream;
+}
+
 #undef ENDPOINT  // undef for jumbo builds
 
 }  // namespace quic
diff --git a/quic/core/http/quic_spdy_session.h b/quic/core/http/quic_spdy_session.h
index 3a9b60e..703c018 100644
--- a/quic/core/http/quic_spdy_session.h
+++ b/quic/core/http/quic_spdy_session.h
@@ -6,6 +6,7 @@
 #define QUICHE_QUIC_CORE_HTTP_QUIC_SPDY_SESSION_H_
 
 #include <cstddef>
+#include <list>
 #include <memory>
 #include <string>
 
@@ -25,10 +26,12 @@
 #include "quic/core/qpack/qpack_encoder_stream_sender.h"
 #include "quic/core/qpack/qpack_receive_stream.h"
 #include "quic/core/qpack/qpack_send_stream.h"
+#include "quic/core/quic_circular_deque.h"
 #include "quic/core/quic_session.h"
 #include "quic/core/quic_time.h"
 #include "quic/core/quic_types.h"
 #include "quic/core/quic_versions.h"
+#include "quic/platform/api/quic_containers.h"
 #include "quic/platform/api/quic_export.h"
 #include "spdy/core/http2_frame_decoder_adapter.h"
 
@@ -38,6 +41,10 @@
 class QuicSpdySessionPeer;
 }  // namespace test
 
+class WebTransportHttp3UnidirectionalStream;
+
+QUIC_EXPORT_PRIVATE extern const size_t kMaxUnassociatedWebTransportStreams;
+
 class QUIC_EXPORT_PRIVATE Http3DebugVisitor {
  public:
   Http3DebugVisitor();
@@ -461,6 +468,24 @@
 
   void OnStreamWaitingForClientSettings(QuicStreamId id);
 
+  // Links the specified stream with a WebTransport session.  If the session is
+  // not present, the stream is buffered until a corresponding session is found.
+  void AssociateIncomingWebTransportStreamWithSession(
+      WebTransportSessionId session_id,
+      QuicStreamId stream_id);
+
+  void ProcessBufferedWebTransportStreamsForSession(WebTransportHttp3* session);
+
+  bool CanOpenOutgoingUnidirectionalWebTransportStream(
+      WebTransportSessionId /*id*/) {
+    return CanOpenNextOutgoingUnidirectionalStream();
+  }
+
+  // Creates an outgoing unidirectional WebTransport stream.  Returns nullptr if
+  // the stream cannot be created due to flow control or some other reason.
+  WebTransportHttp3UnidirectionalStream*
+  CreateOutgoingUnidirectionalWebTransportStream(WebTransportHttp3* session);
+
  protected:
   // Override CreateIncomingStream(), CreateOutgoingBidirectionalStream() and
   // CreateOutgoingUnidirectionalStream() with QuicSpdyStream return type to
@@ -536,6 +561,11 @@
 
   class SpdyFramerVisitor;
 
+  struct QUIC_EXPORT_PRIVATE BufferedWebTransportStream {
+    WebTransportSessionId session_id;
+    QuicStreamId stream_id;
+  };
+
   // The following methods are called by the SimpleVisitor.
 
   // Called when a HEADERS frame has been received.
@@ -683,6 +713,11 @@
   // If ShouldBufferRequestsUntilSettings() is true, all streams that are
   // blocked by that are tracked here.
   absl::flat_hash_set<QuicStreamId> streams_waiting_for_settings_;
+
+  // WebTransport streams that do not have a session associated with them.
+  // Limited to kMaxUnassociatedWebTransportStreams; when the list is full,
+  // oldest streams are evicted first.
+  std::list<BufferedWebTransportStream> buffered_streams_;
 };
 
 }  // namespace quic
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc
index d434931..cf51fd0 100644
--- a/quic/core/http/quic_spdy_session_test.cc
+++ b/quic/core/http/quic_spdy_session_test.cc
@@ -20,6 +20,8 @@
 #include "quic/core/frames/quic_streams_blocked_frame.h"
 #include "quic/core/http/http_constants.h"
 #include "quic/core/http/http_encoder.h"
+#include "quic/core/http/quic_header_list.h"
+#include "quic/core/http/web_transport_http3.h"
 #include "quic/core/qpack/qpack_header_table.h"
 #include "quic/core/quic_config.h"
 #include "quic/core/quic_crypto_stream.h"
@@ -526,6 +528,53 @@
     testing::Mock::VerifyAndClearExpectations(connection_);
   }
 
+  void ReceiveWebTransportSettings() {
+    SettingsFrame settings;
+    settings.values[SETTINGS_H3_DATAGRAM] = 1;
+    settings.values[SETTINGS_WEBTRANS_DRAFT00] = 1;
+    std::string data =
+        std::string(1, kControlStream) + EncodeSettings(settings);
+    QuicStreamId control_stream_id =
+        session_.perspective() == Perspective::IS_SERVER
+            ? GetNthClientInitiatedUnidirectionalStreamId(transport_version(),
+                                                          3)
+            : GetNthServerInitiatedUnidirectionalStreamId(transport_version(),
+                                                          3);
+    QuicStreamFrame frame(control_stream_id, /*fin=*/false, /*offset=*/0, data);
+    session_.OnStreamFrame(frame);
+  }
+
+  void ReceiveWebTransportSession(WebTransportSessionId session_id) {
+    SetQuicReloadableFlag(quic_accept_empty_stream_frame_with_no_fin, true);
+    QuicStreamFrame frame(session_id, /*fin=*/false, /*offset=*/0,
+                          absl::string_view());
+    session_.OnStreamFrame(frame);
+    QuicSpdyStream* stream =
+        static_cast<QuicSpdyStream*>(session_.GetOrCreateStream(session_id));
+    QuicHeaderList headers;
+    headers.OnHeaderBlockStart();
+    headers.OnHeader(":method", "CONNECT");
+    headers.OnHeader(":protocol", "webtransport");
+    stream->OnStreamHeaderList(/*fin=*/true, 0, headers);
+    WebTransportHttp3* web_transport =
+        session_.GetWebTransportSession(session_id);
+    ASSERT_TRUE(web_transport != nullptr);
+    spdy::SpdyHeaderBlock header_block;
+    web_transport->HeadersReceived(header_block);
+  }
+
+  void ReceiveWebTransportUnidirectionalStream(WebTransportSessionId session_id,
+                                               QuicStreamId stream_id) {
+    char buffer[256];
+    QuicDataWriter data_writer(sizeof(buffer), buffer);
+    ASSERT_TRUE(data_writer.WriteVarInt62(kWebTransportUnidirectionalStream));
+    ASSERT_TRUE(data_writer.WriteVarInt62(session_id));
+    ASSERT_TRUE(data_writer.WriteStringPiece("test data"));
+    std::string data(buffer, data_writer.length());
+    QuicStreamFrame frame(stream_id, /*fin=*/false, /*offset=*/0, data);
+    session_.OnStreamFrame(frame);
+  }
+
   MockQuicConnectionHelper helper_;
   MockAlarmFactory alarm_factory_;
   StrictMock<MockQuicConnection>* connection_;
@@ -3566,19 +3615,120 @@
 
   CompleteHandshake();
 
-  SettingsFrame server_settings;
-  server_settings.values[SETTINGS_H3_DATAGRAM] = 1;
-  server_settings.values[SETTINGS_WEBTRANS_DRAFT00] = 1;
-  std::string data =
-      std::string(1, kControlStream) + EncodeSettings(server_settings);
-  QuicStreamId stream_id =
-      GetNthClientInitiatedUnidirectionalStreamId(transport_version(), 3);
-  QuicStreamFrame frame(stream_id, /*fin=*/false, /*offset=*/0, data);
-  session_.OnStreamFrame(frame);
+  ReceiveWebTransportSettings();
   EXPECT_TRUE(session_.SupportsWebTransport());
   EXPECT_TRUE(session_.ShouldProcessIncomingRequests());
 }
 
+TEST_P(QuicSpdySessionTestServer, BufferingIncomingStreams) {
+  if (!version().UsesHttp3()) {
+    return;
+  }
+  SetQuicReloadableFlag(quic_h3_datagram, true);
+  session_.set_supports_webtransport(true);
+
+  CompleteHandshake();
+  QuicStreamId session_id =
+      GetNthClientInitiatedBidirectionalStreamId(transport_version(), 1);
+
+  QuicStreamId data_stream_id =
+      GetNthClientInitiatedUnidirectionalStreamId(transport_version(), 4);
+  ReceiveWebTransportUnidirectionalStream(session_id, data_stream_id);
+
+  ReceiveWebTransportSettings();
+
+  ReceiveWebTransportSession(session_id);
+  WebTransportHttp3* web_transport =
+      session_.GetWebTransportSession(session_id);
+  ASSERT_TRUE(web_transport != nullptr);
+
+  EXPECT_EQ(web_transport->NumberOfAssociatedStreams(), 1u);
+
+  EXPECT_CALL(*connection_, SendControlFrame(_))
+      .WillRepeatedly(Invoke(&ClearControlFrame));
+  EXPECT_CALL(*connection_, OnStreamReset(session_id, _));
+  EXPECT_CALL(
+      *connection_,
+      OnStreamReset(data_stream_id, QUIC_STREAM_WEBTRANSPORT_SESSION_GONE));
+  session_.ResetStream(session_id, QUIC_STREAM_INTERNAL_ERROR);
+}
+
+TEST_P(QuicSpdySessionTestServer, BufferingIncomingStreamsLimit) {
+  if (!version().UsesHttp3()) {
+    return;
+  }
+  SetQuicReloadableFlag(quic_h3_datagram, true);
+  session_.set_supports_webtransport(true);
+
+  CompleteHandshake();
+  QuicStreamId session_id =
+      GetNthClientInitiatedBidirectionalStreamId(transport_version(), 1);
+
+  const int streams_to_send = kMaxUnassociatedWebTransportStreams + 4;
+  EXPECT_CALL(*connection_, SendControlFrame(_))
+      .WillRepeatedly(Invoke(&ClearControlFrame));
+  EXPECT_CALL(*connection_,
+              OnStreamReset(
+                  _, QUIC_STREAM_WEBTRANSPORT_BUFFERED_STREAMS_LIMIT_EXCEEDED))
+      .Times(4);
+  for (int i = 0; i < streams_to_send; i++) {
+    QuicStreamId data_stream_id =
+        GetNthClientInitiatedUnidirectionalStreamId(transport_version(), 4 + i);
+    ReceiveWebTransportUnidirectionalStream(session_id, data_stream_id);
+  }
+
+  ReceiveWebTransportSettings();
+
+  ReceiveWebTransportSession(session_id);
+  WebTransportHttp3* web_transport =
+      session_.GetWebTransportSession(session_id);
+  ASSERT_TRUE(web_transport != nullptr);
+
+  EXPECT_EQ(web_transport->NumberOfAssociatedStreams(),
+            kMaxUnassociatedWebTransportStreams);
+
+  EXPECT_CALL(*connection_, SendControlFrame(_))
+      .WillRepeatedly(Invoke(&ClearControlFrame));
+  EXPECT_CALL(*connection_, OnStreamReset(_, _))
+      .Times(kMaxUnassociatedWebTransportStreams + 1);
+  session_.ResetStream(session_id, QUIC_STREAM_INTERNAL_ERROR);
+}
+
+TEST_P(QuicSpdySessionTestServer, ResetOutgoingWebTransportStreams) {
+  if (!version().UsesHttp3()) {
+    return;
+  }
+  SetQuicReloadableFlag(quic_h3_datagram, true);
+  session_.set_supports_webtransport(true);
+
+  CompleteHandshake();
+  QuicStreamId session_id =
+      GetNthClientInitiatedBidirectionalStreamId(transport_version(), 1);
+
+  ReceiveWebTransportSettings();
+  ReceiveWebTransportSession(session_id);
+  WebTransportHttp3* web_transport =
+      session_.GetWebTransportSession(session_id);
+  ASSERT_TRUE(web_transport != nullptr);
+
+  session_.set_writev_consumes_all_data(true);
+  EXPECT_TRUE(web_transport->CanOpenNextOutgoingUnidirectionalStream());
+  EXPECT_EQ(web_transport->NumberOfAssociatedStreams(), 0u);
+  WebTransportStream* stream =
+      web_transport->OpenOutgoingUnidirectionalStream();
+  EXPECT_EQ(web_transport->NumberOfAssociatedStreams(), 1u);
+  ASSERT_TRUE(stream != nullptr);
+  QuicStreamId stream_id = stream->GetStreamId();
+
+  EXPECT_CALL(*connection_, SendControlFrame(_))
+      .WillRepeatedly(Invoke(&ClearControlFrame));
+  EXPECT_CALL(*connection_, OnStreamReset(session_id, _));
+  EXPECT_CALL(*connection_,
+              OnStreamReset(stream_id, QUIC_STREAM_WEBTRANSPORT_SESSION_GONE));
+  session_.ResetStream(session_id, QUIC_STREAM_INTERNAL_ERROR);
+  EXPECT_EQ(web_transport->NumberOfAssociatedStreams(), 0u);
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace quic
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index 1302bf2..1a03885 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -857,6 +857,10 @@
     visitor_ = nullptr;
     visitor->OnClose(this);
   }
+
+  if (web_transport_ != nullptr) {
+    web_transport_->CloseAllAssociatedStreams();
+  }
 }
 
 void QuicSpdyStream::OnCanWrite() {
diff --git a/quic/core/http/web_transport_http3.cc b/quic/core/http/web_transport_http3.cc
index 9523d99..dac35f7 100644
--- a/quic/core/http/web_transport_http3.cc
+++ b/quic/core/http/web_transport_http3.cc
@@ -6,11 +6,21 @@
 
 #include <memory>
 
+#include "absl/strings/string_view.h"
 #include "quic/core/http/quic_spdy_session.h"
 #include "quic/core/http/quic_spdy_stream.h"
+#include "quic/core/quic_data_reader.h"
+#include "quic/core/quic_data_writer.h"
+#include "quic/core/quic_stream.h"
+#include "quic/core/quic_types.h"
 #include "quic/core/quic_utils.h"
+#include "quic/core/quic_versions.h"
+#include "quic/platform/api/quic_bug_tracker.h"
 #include "common/platform/api/quiche_logging.h"
 
+#define ENDPOINT \
+  (session_->perspective() == Perspective::IS_SERVER ? "Server: " : "Client: ")
+
 namespace quic {
 
 namespace {
@@ -36,18 +46,68 @@
   QUICHE_DCHECK_EQ(connect_stream_->id(), id);
 }
 
-void WebTransportHttp3::HeadersReceived(
-    const spdy::SpdyHeaderBlock& /*headers*/) {
+void WebTransportHttp3::AssociateStream(QuicStreamId stream_id) {
+  streams_.insert(stream_id);
+
+  ParsedQuicVersion version = session_->version();
+  if (QuicUtils::IsOutgoingStreamId(version, stream_id,
+                                    session_->perspective())) {
+    return;
+  }
+  if (QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
+    incoming_bidirectional_streams_.push_back(stream_id);
+    visitor_->OnIncomingBidirectionalStreamAvailable();
+  } else {
+    incoming_unidirectional_streams_.push_back(stream_id);
+    visitor_->OnIncomingUnidirectionalStreamAvailable();
+  }
+}
+
+void WebTransportHttp3::CloseAllAssociatedStreams() {
+  // Copy the stream list before iterating over it, as calls to ResetStream()
+  // can potentially mutate |streams_|.
+  std::vector<QuicStreamId> streams(streams_.begin(), streams_.end());
+  streams_.clear();
+  for (QuicStreamId id : streams) {
+    session_->ResetStream(id, QUIC_STREAM_WEBTRANSPORT_SESSION_GONE);
+  }
+}
+
+void WebTransportHttp3::HeadersReceived(const spdy::SpdyHeaderBlock& headers) {
+  if (session_->perspective() == Perspective::IS_CLIENT) {
+    auto it = headers.find(":status");
+    if (it == headers.end() || it->second != "200") {
+      QUIC_DVLOG(1) << ENDPOINT
+                    << "Received WebTransport headers from server without "
+                       "status 200, rejecting.";
+      return;
+    }
+  }
+
+  QUIC_DVLOG(1) << ENDPOINT << "WebTransport session " << id_ << " ready.";
   ready_ = true;
   visitor_->OnSessionReady();
+  session_->ProcessBufferedWebTransportStreamsForSession(this);
 }
 
 WebTransportStream* WebTransportHttp3::AcceptIncomingBidirectionalStream() {
   // TODO(vasilvv): implement this.
   return nullptr;
 }
+
 WebTransportStream* WebTransportHttp3::AcceptIncomingUnidirectionalStream() {
-  // TODO(vasilvv): implement this.
+  while (!incoming_unidirectional_streams_.empty()) {
+    QuicStreamId id = incoming_unidirectional_streams_.front();
+    incoming_unidirectional_streams_.pop_front();
+    QuicStream* stream = session_->GetOrCreateStream(id);
+    if (stream == nullptr) {
+      // Skip the streams that were reset in between the time they were
+      // received and the time the client has polled for them.
+      continue;
+    }
+    return static_cast<WebTransportHttp3UnidirectionalStream*>(stream)
+        ->interface();
+  }
   return nullptr;
 }
 
@@ -56,16 +116,21 @@
   return false;
 }
 bool WebTransportHttp3::CanOpenNextOutgoingUnidirectionalStream() {
-  // TODO(vasilvv): implement this.
-  return false;
+  return session_->CanOpenOutgoingUnidirectionalWebTransportStream(id_);
 }
 WebTransportStream* WebTransportHttp3::OpenOutgoingBidirectionalStream() {
   // TODO(vasilvv): implement this.
   return nullptr;
 }
+
 WebTransportStream* WebTransportHttp3::OpenOutgoingUnidirectionalStream() {
-  // TODO(vasilvv): implement this.
-  return nullptr;
+  WebTransportHttp3UnidirectionalStream* stream =
+      session_->CreateOutgoingUnidirectionalWebTransportStream(this);
+  if (stream == nullptr) {
+    // If stream cannot be created due to flow control, return nullptr.
+    return nullptr;
+  }
+  return stream->interface();
 }
 
 MessageStatus WebTransportHttp3::SendOrQueueDatagram(
@@ -78,4 +143,104 @@
   // TODO(vasilvv): implement this.
 }
 
+WebTransportHttp3UnidirectionalStream::WebTransportHttp3UnidirectionalStream(
+    PendingStream* pending,
+    QuicSpdySession* session)
+    : QuicStream(pending, session, READ_UNIDIRECTIONAL, /*is_static=*/false),
+      session_(session),
+      adapter_(session, this, sequencer()),
+      needs_to_send_preamble_(false) {}
+
+WebTransportHttp3UnidirectionalStream::WebTransportHttp3UnidirectionalStream(
+    QuicStreamId id,
+    QuicSpdySession* session,
+    WebTransportSessionId session_id)
+    : QuicStream(id, session, /*is_static=*/false, WRITE_UNIDIRECTIONAL),
+      session_(session),
+      adapter_(session, this, sequencer()),
+      session_id_(session_id),
+      needs_to_send_preamble_(true) {}
+
+void WebTransportHttp3UnidirectionalStream::WritePreamble() {
+  if (!needs_to_send_preamble_ || !session_id_.has_value()) {
+    QUIC_BUG(WebTransportHttp3UnidirectionalStream duplicate preamble)
+        << ENDPOINT << "Sending preamble on stream ID " << id()
+        << " at the wrong time.";
+    OnUnrecoverableError(QUIC_INTERNAL_ERROR,
+                         "Attempting to send a WebTransport unidirectional "
+                         "stream preamble at the wrong time.");
+    return;
+  }
+
+  QuicConnection::ScopedPacketFlusher flusher(session_->connection());
+  char buffer[sizeof(uint64_t) * 2];  // varint62, varint62
+  QuicDataWriter writer(sizeof(buffer), buffer);
+  bool success = true;
+  success = success && writer.WriteVarInt62(kWebTransportUnidirectionalStream);
+  success = success && writer.WriteVarInt62(*session_id_);
+  QUICHE_DCHECK(success);
+  WriteOrBufferData(absl::string_view(buffer, writer.length()), /*fin=*/false,
+                    /*ack_listener=*/nullptr);
+  QUIC_DVLOG(1) << ENDPOINT << "Sent stream type and session ID ("
+                << *session_id_ << ") on WebTransport stream " << id();
+  needs_to_send_preamble_ = false;
+}
+
+bool WebTransportHttp3UnidirectionalStream::ReadSessionId() {
+  iovec iov;
+  if (!sequencer()->GetReadableRegion(&iov)) {
+    return false;
+  }
+  QuicDataReader reader(static_cast<const char*>(iov.iov_base), iov.iov_len);
+  WebTransportSessionId session_id;
+  uint8_t session_id_length = reader.PeekVarInt62Length();
+  if (!reader.ReadVarInt62(&session_id)) {
+    // If all of the data has been received, and we still cannot associate the
+    // stream with a session, consume all of the data so that the stream can
+    // be closed.
+    if (sequencer()->NumBytesConsumed() + sequencer()->NumBytesBuffered() >=
+        sequencer()->close_offset()) {
+      QUIC_DLOG(WARNING)
+          << ENDPOINT << "Failed to associate WebTransport stream " << id()
+          << " with a session because the stream ended prematurely.";
+      sequencer()->MarkConsumed(sequencer()->NumBytesBuffered());
+    }
+    return false;
+  }
+  sequencer()->MarkConsumed(session_id_length);
+  session_id_ = session_id;
+  session_->AssociateIncomingWebTransportStreamWithSession(session_id, id());
+  return true;
+}
+
+void WebTransportHttp3UnidirectionalStream::OnDataAvailable() {
+  if (!session_id_.has_value()) {
+    if (!ReadSessionId()) {
+      return;
+    }
+  }
+
+  adapter_.OnDataAvailable();
+}
+
+void WebTransportHttp3UnidirectionalStream::OnCanWriteNewData() {
+  adapter_.OnCanWriteNewData();
+}
+
+void WebTransportHttp3UnidirectionalStream::OnClose() {
+  QuicStream::OnClose();
+
+  if (!session_id_.has_value()) {
+    return;
+  }
+  WebTransportHttp3* session = session_->GetWebTransportSession(*session_id_);
+  if (session == nullptr) {
+    QUIC_DLOG(WARNING) << ENDPOINT << "WebTransport stream " << id()
+                       << " attempted to notify parent session " << *session_id_
+                       << ", but the session could not be found.";
+    return;
+  }
+  session->OnStreamClosed(id());
+}
+
 }  // namespace quic
diff --git a/quic/core/http/web_transport_http3.h b/quic/core/http/web_transport_http3.h
index b2da0ed..cc622c2 100644
--- a/quic/core/http/web_transport_http3.h
+++ b/quic/core/http/web_transport_http3.h
@@ -7,8 +7,12 @@
 
 #include <memory>
 
+#include "absl/container/flat_hash_set.h"
+#include "absl/types/optional.h"
+#include "quic/core/quic_stream.h"
 #include "quic/core/quic_types.h"
 #include "quic/core/web_transport_interface.h"
+#include "quic/core/web_transport_stream_adapter.h"
 #include "spdy/core/spdy_header_block.h"
 
 namespace quic {
@@ -33,6 +37,13 @@
   }
 
   WebTransportSessionId id() { return id_; }
+  bool ready() { return ready_; }
+
+  void AssociateStream(QuicStreamId stream_id);
+  void OnStreamClosed(QuicStreamId stream_id) { streams_.erase(stream_id); }
+  void CloseAllAssociatedStreams();
+
+  size_t NumberOfAssociatedStreams() { return streams_.size(); }
 
   // Return the earliest incoming stream that has been received by the session
   // but has not been accepted.  Returns nullptr if there are no incoming
@@ -55,6 +66,42 @@
   // |ready_| is set to true when the peer has seen both sets of headers.
   bool ready_ = false;
   std::unique_ptr<WebTransportVisitor> visitor_;
+  absl::flat_hash_set<QuicStreamId> streams_;
+  QuicCircularDeque<QuicStreamId> incoming_bidirectional_streams_;
+  QuicCircularDeque<QuicStreamId> incoming_unidirectional_streams_;
+};
+
+class QUIC_EXPORT_PRIVATE WebTransportHttp3UnidirectionalStream
+    : public QuicStream {
+ public:
+  // Incoming stream.
+  WebTransportHttp3UnidirectionalStream(PendingStream* pending,
+                                        QuicSpdySession* session);
+  // Outgoing stream.
+  WebTransportHttp3UnidirectionalStream(QuicStreamId id,
+                                        QuicSpdySession* session,
+                                        WebTransportSessionId session_id);
+
+  // Sends the stream type and the session ID on the stream.
+  void WritePreamble();
+
+  // Implementation of QuicStream.
+  void OnDataAvailable() override;
+  void OnCanWriteNewData() override;
+  void OnClose() override;
+
+  WebTransportStream* interface() { return &adapter_; }
+  void SetUnblocked() { sequencer()->SetUnblocked(); }
+
+ private:
+  QuicSpdySession* session_;
+  WebTransportStreamAdapter adapter_;
+  absl::optional<WebTransportSessionId> session_id_;
+  bool needs_to_send_preamble_;
+
+  bool ReadSessionId();
+  // Closes the stream if all of the data has been received.
+  void MaybeCloseIncompleteStream();
 };
 
 }  // namespace quic