Implement client-initiated bidirectional streams in WebTransport over HTTP/3.

PiperOrigin-RevId: 366912320
Change-Id: Ia8e8cab56eb8a573941ac9f5cf467d609b4dd744
diff --git a/quic/core/http/end_to_end_test.cc b/quic/core/http/end_to_end_test.cc
index 90b4f5b..6a83d2e 100644
--- a/quic/core/http/end_to_end_test.cc
+++ b/quic/core/http/end_to_end_test.cc
@@ -724,6 +724,33 @@
     return visitor;
   }
 
+  std::string ReadDataFromWebTransportStreamUntilFin(
+      WebTransportStream* stream) {
+    std::string buffer;
+    while (true) {
+      bool can_read = false;
+      auto visitor = std::make_unique<MockStreamVisitor>();
+      EXPECT_CALL(*visitor, OnCanRead()).WillOnce(Assign(&can_read, true));
+      stream->SetVisitor(std::move(visitor));
+      client_->WaitUntil(5000 /*ms*/, [&can_read]() { return can_read; });
+      if (!can_read) {
+        ADD_FAILURE() << "Waiting for readable data on stream "
+                      << stream->GetStreamId() << " timed out";
+        return buffer;
+      }
+
+      WebTransportStream::ReadResult result = stream->Read(&buffer);
+      if (result.fin) {
+        return buffer;
+      }
+      if (result.bytes_read == 0) {
+        ADD_FAILURE() << "No progress made while reading from stream "
+                      << stream->GetStreamId();
+        return buffer;
+      }
+    }
+  }
+
   ScopedEnvironmentForThreads environment_;
   bool initialized_;
   // If true, the Initialize() function will create |client_| and starts to
@@ -5802,6 +5829,49 @@
   EXPECT_TRUE(result.fin);
 }
 
+TEST_P(EndToEndTest, WebTransportSessionBidirectionalStream) {
+  enable_web_transport_ = true;
+  ASSERT_TRUE(Initialize());
+
+  if (!version_.UsesHttp3()) {
+    return;
+  }
+
+  WebTransportHttp3* session =
+      CreateWebTransportSession("/echo", /*wait_for_server_response=*/true);
+  ASSERT_TRUE(session != nullptr);
+
+  WebTransportStream* stream = session->OpenOutgoingBidirectionalStream();
+  ASSERT_TRUE(stream != nullptr);
+  EXPECT_TRUE(stream->Write("test"));
+  EXPECT_TRUE(stream->SendFin());
+
+  std::string received_data = ReadDataFromWebTransportStreamUntilFin(stream);
+  EXPECT_EQ(received_data, "test");
+}
+
+TEST_P(EndToEndTest, WebTransportSessionBidirectionalStreamWithBuffering) {
+  enable_web_transport_ = true;
+  SetPacketLossPercentage(30);
+  ASSERT_TRUE(Initialize());
+
+  if (!version_.UsesHttp3()) {
+    return;
+  }
+
+  WebTransportHttp3* session =
+      CreateWebTransportSession("/echo", /*wait_for_server_response=*/false);
+  ASSERT_TRUE(session != nullptr);
+
+  WebTransportStream* stream = session->OpenOutgoingBidirectionalStream();
+  ASSERT_TRUE(stream != nullptr);
+  EXPECT_TRUE(stream->Write("test"));
+  EXPECT_TRUE(stream->SendFin());
+
+  std::string received_data = ReadDataFromWebTransportStreamUntilFin(stream);
+  EXPECT_EQ(received_data, "test");
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace quic
diff --git a/quic/core/http/http_decoder.cc b/quic/core/http/http_decoder.cc
index 6a5eee0..3544cfd 100644
--- a/quic/core/http/http_decoder.cc
+++ b/quic/core/http/http_decoder.cc
@@ -5,6 +5,7 @@
 #include "quic/core/http/http_decoder.h"
 
 #include <cstdint>
+#include <limits>
 
 #include "absl/base/attributes.h"
 #include "absl/strings/string_view.h"
@@ -20,8 +21,10 @@
 
 namespace quic {
 
-HttpDecoder::HttpDecoder(Visitor* visitor)
+HttpDecoder::HttpDecoder(Visitor* visitor) : HttpDecoder(visitor, Options()) {}
+HttpDecoder::HttpDecoder(Visitor* visitor, Options options)
     : visitor_(visitor),
+      allow_web_transport_stream_(options.allow_web_transport_stream),
       state_(STATE_READING_FRAME_TYPE),
       current_frame_type_(0),
       current_length_field_length_(0),
@@ -108,6 +111,15 @@
       case STATE_FINISH_PARSING:
         continue_processing = FinishParsing(&reader);
         break;
+      case STATE_PARSING_NO_LONGER_POSSIBLE:
+        continue_processing = false;
+        QUIC_BUG(HttpDecoder PARSING_NO_LONGER_POSSIBLE)
+            << "HttpDecoder called after an indefinite-length frame has been "
+               "received";
+        RaiseError(QUIC_INTERNAL_ERROR,
+                   "HttpDecoder called after an indefinite-length frame has "
+                   "been received");
+        break;
       case STATE_ERROR:
         break;
       default:
@@ -192,6 +204,19 @@
     QUICHE_DCHECK(success);
   }
 
+  // WEBTRANSPORT_STREAM frames are indefinitely long, and thus require
+  // special handling; the number after the frame type is actually the
+  // WebTransport session ID, and not the length.
+  if (allow_web_transport_stream_ &&
+      current_frame_type_ ==
+          static_cast<uint64_t>(HttpFrameType::WEBTRANSPORT_STREAM)) {
+    visitor_->OnWebTransportStreamFrameType(
+        current_length_field_length_ + current_type_field_length_,
+        current_frame_length_);
+    state_ = STATE_PARSING_NO_LONGER_POSSIBLE;
+    return false;
+  }
+
   if (current_frame_length_ > MaxFrameLength(current_frame_type_)) {
     RaiseError(QUIC_HTTP_FRAME_TOO_LARGE, "Frame is too large.");
     return false;
diff --git a/quic/core/http/http_decoder.h b/quic/core/http/http_decoder.h
index 30ae932..a351415 100644
--- a/quic/core/http/http_decoder.h
+++ b/quic/core/http/http_decoder.h
@@ -27,6 +27,11 @@
 // session.
 class QUIC_EXPORT_PRIVATE HttpDecoder {
  public:
+  struct QUIC_EXPORT_PRIVATE Options {
+    // Indicates that WEBTRANSPORT_STREAM should be parsed.
+    bool allow_web_transport_stream = false;
+  };
+
   class QUIC_EXPORT_PRIVATE Visitor {
    public:
     virtual ~Visitor() {}
@@ -109,6 +114,15 @@
     // Called when an ACCEPT_CH frame has been successfully parsed.
     virtual bool OnAcceptChFrame(const AcceptChFrame& frame) = 0;
 
+    // Called when a WEBTRANSPORT_STREAM frame type and the session ID varint
+    // immediately following it has been received.  Any further parsing should
+    // be done by the stream itself, and not the parser. Note that this does not
+    // return bool, because WEBTRANSPORT_STREAM always causes the parsing
+    // process to cease.
+    virtual void OnWebTransportStreamFrameType(
+        QuicByteCount header_length,
+        WebTransportSessionId session_id) = 0;
+
     // Called when a frame of unknown type |frame_type| has been received.
     // Frame type might be reserved, Visitor must make sure to ignore.
     // |header_length| and |payload_length| are the length of the frame header
@@ -126,6 +140,7 @@
 
   // |visitor| must be non-null, and must outlive HttpDecoder.
   explicit HttpDecoder(Visitor* visitor);
+  explicit HttpDecoder(Visitor* visitor, Options options);
 
   ~HttpDecoder();
 
@@ -162,6 +177,7 @@
     STATE_READING_FRAME_TYPE,
     STATE_READING_FRAME_PAYLOAD,
     STATE_FINISH_PARSING,
+    STATE_PARSING_NO_LONGER_POSSIBLE,
     STATE_ERROR
   };
 
@@ -241,6 +257,8 @@
 
   // Visitor to invoke when messages are parsed.
   Visitor* const visitor_;  // Unowned.
+  // Whether WEBTRANSPORT_STREAM should be parsed.
+  bool allow_web_transport_stream_;
   // Current state of the parsing.
   HttpDecoderState state_;
   // Type of the frame currently being parsed.
diff --git a/quic/core/http/http_decoder_test.cc b/quic/core/http/http_decoder_test.cc
index 4f518a4..aac020a 100644
--- a/quic/core/http/http_decoder_test.cc
+++ b/quic/core/http/http_decoder_test.cc
@@ -15,6 +15,7 @@
 #include "quic/core/http/http_frames.h"
 #include "quic/core/quic_data_writer.h"
 #include "quic/core/quic_versions.h"
+#include "quic/platform/api/quic_expect_bug.h"
 #include "quic/platform/api/quic_flags.h"
 #include "quic/platform/api/quic_test.h"
 #include "quic/test_tools/quic_test_utils.h"
@@ -109,6 +110,10 @@
               (QuicByteCount header_length),
               (override));
   MOCK_METHOD(bool, OnAcceptChFrame, (const AcceptChFrame& frame), (override));
+  MOCK_METHOD(void,
+              OnWebTransportStreamFrameType,
+              (QuicByteCount header_length, WebTransportSessionId session_id),
+              (override));
 
   MOCK_METHOD(bool,
               OnUnknownFrameStart,
@@ -1301,6 +1306,43 @@
   EXPECT_EQ("", decoder_.error_detail());
 }
 
+TEST_F(HttpDecoderTest, WebTransportStreamDisabled) {
+  InSequence s;
+
+  // Unknown frame of type 0x41 and length 0x104.
+  std::string input = absl::HexStringToBytes("40414104");
+  EXPECT_CALL(visitor_, OnUnknownFrameStart(0x41, input.size(), 0x104));
+  EXPECT_EQ(ProcessInput(input), input.size());
+}
+
+TEST(HttpDecoderTestNoFixture, WebTransportStream) {
+  HttpDecoder::Options options;
+  options.allow_web_transport_stream = true;
+  testing::StrictMock<MockVisitor> visitor;
+  HttpDecoder decoder(&visitor, options);
+
+  // WebTransport stream for session ID 0x104, with four bytes of extra data.
+  std::string input = absl::HexStringToBytes("40414104ffffffff");
+  EXPECT_CALL(visitor, OnWebTransportStreamFrameType(4, 0x104));
+  QuicByteCount bytes = decoder.ProcessInput(input.data(), input.size());
+  EXPECT_EQ(bytes, 4u);
+}
+
+TEST(HttpDecoderTestNoFixture, WebTransportStreamError) {
+  HttpDecoder::Options options;
+  options.allow_web_transport_stream = true;
+  testing::StrictMock<MockVisitor> visitor;
+  HttpDecoder decoder(&visitor, options);
+
+  std::string input = absl::HexStringToBytes("404100");
+  EXPECT_CALL(visitor, OnWebTransportStreamFrameType(_, _));
+  decoder.ProcessInput(input.data(), input.size());
+
+  EXPECT_CALL(visitor, OnError(_));
+  EXPECT_QUIC_BUG(decoder.ProcessInput(input.data(), input.size()),
+                  "HttpDecoder called after an indefinite-length frame");
+}
+
 TEST_F(HttpDecoderTest, DecodeSettings) {
   std::string input = absl::HexStringToBytes(
       "04"    // type (SETTINGS)
diff --git a/quic/core/http/http_encoder.cc b/quic/core/http/http_encoder.cc
index 1fa215c..2fbc238 100644
--- a/quic/core/http/http_encoder.cc
+++ b/quic/core/http/http_encoder.cc
@@ -248,4 +248,25 @@
   return 0;
 }
 
+QuicByteCount HttpEncoder::SerializeWebTransportStreamFrameHeader(
+    WebTransportSessionId session_id,
+    std::unique_ptr<char[]>* output) {
+  uint64_t stream_type =
+      static_cast<uint64_t>(HttpFrameType::WEBTRANSPORT_STREAM);
+  QuicByteCount header_length = QuicDataWriter::GetVarInt62Len(stream_type) +
+                                QuicDataWriter::GetVarInt62Len(session_id);
+
+  *output = std::make_unique<char[]>(header_length);
+  QuicDataWriter writer(header_length, output->get());
+  bool success =
+      writer.WriteVarInt62(stream_type) && writer.WriteVarInt62(session_id);
+  if (success && writer.remaining() == 0) {
+    return header_length;
+  }
+
+  QUIC_DLOG(ERROR) << "Http encoder failed when attempting to serialize "
+                      "WEBTRANSPORT_STREAM frame header.";
+  return 0;
+}
+
 }  // namespace quic
diff --git a/quic/core/http/http_encoder.h b/quic/core/http/http_encoder.h
index b6cb9ad..0a8aa84 100644
--- a/quic/core/http/http_encoder.h
+++ b/quic/core/http/http_encoder.h
@@ -8,6 +8,7 @@
 #include <memory>
 #include "quic/core/http/http_frames.h"
 #include "quic/core/quic_error_codes.h"
+#include "quic/core/quic_types.h"
 #include "quic/platform/api/quic_export.h"
 
 namespace quic {
@@ -56,6 +57,12 @@
   // Serializes a frame with reserved frame type specified in
   // https://tools.ietf.org/html/draft-ietf-quic-http-25#section-7.2.9.
   static QuicByteCount SerializeGreasingFrame(std::unique_ptr<char[]>* output);
+
+  // Serializes a WEBTRANSPORT_STREAM frame header as specified in
+  // https://www.ietf.org/archive/id/draft-ietf-webtrans-http3-00.html#name-client-initiated-bidirectio
+  static QuicByteCount SerializeWebTransportStreamFrameHeader(
+      WebTransportSessionId session_id,
+      std::unique_ptr<char[]>* output);
 };
 
 }  // namespace quic
diff --git a/quic/core/http/http_encoder_test.cc b/quic/core/http/http_encoder_test.cc
index c59dd85..66e988f 100644
--- a/quic/core/http/http_encoder_test.cc
+++ b/quic/core/http/http_encoder_test.cc
@@ -123,5 +123,18 @@
                                               output2, ABSL_ARRAYSIZE(output2));
 }
 
+TEST(HttpEncoderTest, SerializeWebTransportStreamFrameHeader) {
+  WebTransportSessionId session_id = 0x17;
+  char output[] = {0x40, 0x41,  // type (WEBTRANSPORT_STREAM)
+                   0x17};       // session ID
+
+  std::unique_ptr<char[]> buffer;
+  uint64_t length =
+      HttpEncoder::SerializeWebTransportStreamFrameHeader(session_id, &buffer);
+  EXPECT_EQ(sizeof(output), length);
+  quiche::test::CompareCharArraysWithHexError(
+      "WEBTRANSPORT_STREAM", buffer.get(), length, output, sizeof(output));
+}
+
 }  // namespace test
 }  // namespace quic
diff --git a/quic/core/http/http_frames.h b/quic/core/http/http_frames.h
index 0d55857..56e5348 100644
--- a/quic/core/http/http_frames.h
+++ b/quic/core/http/http_frames.h
@@ -34,6 +34,8 @@
   ACCEPT_CH = 0x89,
   // https://tools.ietf.org/html/draft-ietf-httpbis-priority-02
   PRIORITY_UPDATE_REQUEST_STREAM = 0xF0700,
+  // https://www.ietf.org/archive/id/draft-ietf-webtrans-http3-00.html
+  WEBTRANSPORT_STREAM = 0x41,
 };
 
 // 7.2.1.  DATA
diff --git a/quic/core/http/quic_receive_control_stream.cc b/quic/core/http/quic_receive_control_stream.cc
index ffc38a0..fbd7b2e 100644
--- a/quic/core/http/quic_receive_control_stream.cc
+++ b/quic/core/http/quic_receive_control_stream.cc
@@ -233,6 +233,13 @@
   return true;
 }
 
+void QuicReceiveControlStream::OnWebTransportStreamFrameType(
+    QuicByteCount /*header_length*/,
+    WebTransportSessionId /*session_id*/) {
+  QUIC_BUG(WEBTRANSPORT_STREAM on Control Stream)
+      << "Parsed WEBTRANSPORT_STREAM on a control stream.";
+}
+
 bool QuicReceiveControlStream::OnUnknownFrameStart(
     uint64_t frame_type,
     QuicByteCount /*header_length*/,
diff --git a/quic/core/http/quic_receive_control_stream.h b/quic/core/http/quic_receive_control_stream.h
index 71f0e05..4ed0104 100644
--- a/quic/core/http/quic_receive_control_stream.h
+++ b/quic/core/http/quic_receive_control_stream.h
@@ -58,6 +58,8 @@
   bool OnPriorityUpdateFrame(const PriorityUpdateFrame& frame) override;
   bool OnAcceptChFrameStart(QuicByteCount header_length) override;
   bool OnAcceptChFrame(const AcceptChFrame& frame) override;
+  void OnWebTransportStreamFrameType(QuicByteCount header_length,
+                                     WebTransportSessionId session_id) override;
   bool OnUnknownFrameStart(uint64_t frame_type,
                            QuicByteCount header_length,
                            QuicByteCount payload_length) override;
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc
index 6a56cba..86ddc94 100644
--- a/quic/core/http/quic_spdy_session.cc
+++ b/quic/core/http/quic_spdy_session.cc
@@ -160,6 +160,11 @@
     session_->OnAcceptChFrameReceivedViaAlps(frame);
     return true;
   }
+  void OnWebTransportStreamFrameType(
+      QuicByteCount /*header_length*/,
+      WebTransportSessionId /*session_id*/) override {
+    QUICHE_NOTREACHED();
+  }
   bool OnUnknownFrameStart(uint64_t /*frame_type*/,
                            QuicByteCount
                            /*header_length*/,
@@ -1810,6 +1815,23 @@
   return stream;
 }
 
+QuicSpdyStream* QuicSpdySession::CreateOutgoingBidirectionalWebTransportStream(
+    WebTransportHttp3* session) {
+  QuicSpdyStream* stream = CreateOutgoingBidirectionalStream();
+  if (stream == nullptr) {
+    return nullptr;
+  }
+  QuicStreamId stream_id = stream->id();
+  stream->ConvertToWebTransportDataStream(session->id());
+  if (stream->web_transport_stream() == nullptr) {
+    // An error in ConvertToWebTransportDataStream() would result in
+    // CONNECTION_CLOSE, thus we don't need to do anything here.
+    return nullptr;
+  }
+  session->AssociateStream(stream_id);
+  return stream;
+}
+
 #undef ENDPOINT  // undef for jumbo builds
 
 }  // namespace quic
diff --git a/quic/core/http/quic_spdy_session.h b/quic/core/http/quic_spdy_session.h
index cc99b5b..f5903b3 100644
--- a/quic/core/http/quic_spdy_session.h
+++ b/quic/core/http/quic_spdy_session.h
@@ -435,6 +435,10 @@
   // Indicates whether the HTTP/3 session supports WebTransport.
   bool SupportsWebTransport();
 
+  // Indicates whether the HTTP/3 session will indicate WebTransport support to
+  // the peer.
+  bool WillNegotiateWebTransport();
+
   // Returns a WebTransport session by its session ID.  Returns nullptr if no
   // session is associated with the given ID.
   WebTransportHttp3* GetWebTransportSession(WebTransportSessionId id);
@@ -465,12 +469,23 @@
       WebTransportSessionId /*id*/) {
     return CanOpenNextOutgoingUnidirectionalStream();
   }
+  bool CanOpenOutgoingBidirectionalWebTransportStream(
+      WebTransportSessionId /*id*/) {
+    return CanOpenNextOutgoingBidirectionalStream();
+  }
 
   // Creates an outgoing unidirectional WebTransport stream.  Returns nullptr if
   // the stream cannot be created due to flow control or some other reason.
   WebTransportHttp3UnidirectionalStream*
   CreateOutgoingUnidirectionalWebTransportStream(WebTransportHttp3* session);
 
+  // Creates an outgoing bidirectional WebTransport stream.  Returns nullptr if
+  // the stream cannot be created due to flow control or some other reason.
+  QuicSpdyStream* CreateOutgoingBidirectionalWebTransportStream(
+      WebTransportHttp3* session);
+
+  QuicSpdyStream* GetOrCreateSpdyDataStream(const QuicStreamId stream_id);
+
  protected:
   // Override CreateIncomingStream(), CreateOutgoingBidirectionalStream() and
   // CreateOutgoingUnidirectionalStream() with QuicSpdyStream return type to
@@ -480,8 +495,6 @@
   virtual QuicSpdyStream* CreateOutgoingBidirectionalStream() = 0;
   virtual QuicSpdyStream* CreateOutgoingUnidirectionalStream() = 0;
 
-  QuicSpdyStream* GetOrCreateSpdyDataStream(const QuicStreamId stream_id);
-
   // If an incoming stream can be created, return true.
   virtual bool ShouldCreateIncomingStream(QuicStreamId id) = 0;
 
@@ -493,7 +506,6 @@
   // Indicates whether the underlying backend can accept and process
   // WebTransport sessions over HTTP/3.
   virtual bool ShouldNegotiateWebTransport();
-  bool WillNegotiateWebTransport();
 
   // Returns true if there are open HTTP requests.
   bool ShouldKeepConnectionAlive() const override;
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index 00c631b..e2df8f5 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -19,6 +19,7 @@
 #include "quic/core/http/web_transport_http3.h"
 #include "quic/core/qpack/qpack_decoder.h"
 #include "quic/core/qpack/qpack_encoder.h"
+#include "quic/core/quic_error_codes.h"
 #include "quic/core/quic_utils.h"
 #include "quic/core/quic_versions.h"
 #include "quic/core/quic_write_blocked_list.h"
@@ -166,6 +167,12 @@
     return false;
   }
 
+  void OnWebTransportStreamFrameType(
+      QuicByteCount header_length,
+      WebTransportSessionId session_id) override {
+    stream_->OnWebTransportStreamFrameType(header_length, session_id);
+  }
+
   bool OnUnknownFrameStart(uint64_t frame_type,
                            QuicByteCount header_length,
                            QuicByteCount payload_length) override {
@@ -194,6 +201,16 @@
                                                       : "Client:"  \
                                                         " ")
 
+namespace {
+HttpDecoder::Options HttpDecoderOptionsForBidiStream(
+    QuicSpdySession* spdy_session) {
+  HttpDecoder::Options options;
+  options.allow_web_transport_stream =
+      spdy_session->WillNegotiateWebTransport();
+  return options;
+}
+}  // namespace
+
 QuicSpdyStream::QuicSpdyStream(QuicStreamId id,
                                QuicSpdySession* spdy_session,
                                StreamType type)
@@ -208,7 +225,8 @@
       trailers_decompressed_(false),
       trailers_consumed_(false),
       http_decoder_visitor_(std::make_unique<HttpDecoderVisitor>(this)),
-      decoder_(http_decoder_visitor_.get()),
+      decoder_(http_decoder_visitor_.get(),
+               HttpDecoderOptionsForBidiStream(spdy_session)),
       sequencer_offset_(0),
       is_decoder_processing_input_(false),
       ack_listener_(nullptr),
@@ -271,6 +289,10 @@
     SpdyHeaderBlock header_block,
     bool fin,
     QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
+  if (!AssertNotWebTransportDataStream("writing headers")) {
+    return 0;
+  }
+
   QuicConnection::ScopedPacketFlusher flusher(spdy_session_->connection());
   // Send stream type for server push stream
   if (VersionUsesHttp3(transport_version()) && type() == WRITE_UNIDIRECTIONAL &&
@@ -305,6 +327,9 @@
 }
 
 void QuicSpdyStream::WriteOrBufferBody(absl::string_view data, bool fin) {
+  if (!AssertNotWebTransportDataStream("writing body data")) {
+    return;
+  }
   if (!VersionUsesHttp3(transport_version()) || data.length() == 0) {
     WriteOrBufferData(data, fin, nullptr);
     return;
@@ -703,6 +728,11 @@
 }
 
 void QuicSpdyStream::OnStreamReset(const QuicRstStreamFrame& frame) {
+  if (web_transport_data_ != nullptr) {
+    QuicStream::OnStreamReset(frame);
+    return;
+  }
+
   // TODO(bnc): Merge the two blocks below when both
   // quic_abort_qpack_on_stream_reset and quic_fix_on_stream_reset are
   // deprecated.
@@ -743,7 +773,7 @@
 
 void QuicSpdyStream::Reset(QuicRstStreamErrorCode error) {
   if (VersionUsesHttp3(transport_version()) && !fin_received() &&
-      spdy_session_->qpack_decoder()) {
+      spdy_session_->qpack_decoder() && web_transport_data_ == nullptr) {
     QUIC_CODE_COUNT_N(quic_abort_qpack_on_stream_reset, 2, 2);
     spdy_session_->qpack_decoder()->OnStreamReset(id());
     if (GetQuicReloadableFlag(quic_abort_qpack_on_stream_reset)) {
@@ -766,6 +796,11 @@
     return;
   }
 
+  if (web_transport_data_ != nullptr) {
+    web_transport_data_->adapter.OnDataAvailable();
+    return;
+  }
+
   if (!spdy_session()->ShouldProcessIncomingRequests()) {
     spdy_session()->OnStreamWaitingForClientSettings(id());
     return;
@@ -797,6 +832,9 @@
     if (blocked_on_decoding_headers_) {
       return;
     }
+    if (web_transport_data_ != nullptr) {
+      return;
+    }
   }
 
   // Do not call OnBodyAvailable() until headers are consumed.
@@ -832,6 +870,20 @@
   if (web_transport_ != nullptr) {
     web_transport_->CloseAllAssociatedStreams();
   }
+  if (web_transport_data_ != nullptr) {
+    WebTransportHttp3* web_transport =
+        spdy_session_->GetWebTransportSession(web_transport_data_->session_id);
+    if (web_transport == nullptr) {
+      // Since there is no guaranteed destruction order for streams, the session
+      // could be already removed from the stream map by the time we reach here.
+      QUIC_DLOG(WARNING) << ENDPOINT << "WebTransport stream " << id()
+                         << " attempted to notify parent session "
+                         << web_transport_data_->session_id
+                         << ", but the session could not be found.";
+      return;
+    }
+    web_transport->OnStreamClosed(id());
+  }
 }
 
 void QuicSpdyStream::OnCanWrite() {
@@ -1071,6 +1123,36 @@
   return OnHeadersFrameEnd();
 }
 
+void QuicSpdyStream::OnWebTransportStreamFrameType(
+    QuicByteCount header_length,
+    WebTransportSessionId session_id) {
+  QUIC_DVLOG(1) << ENDPOINT << " Received WEBTRANSPORT_STREAM on stream "
+                << id() << " for session " << session_id;
+  sequencer()->MarkConsumed(header_length);
+
+  if (headers_payload_length_ > 0 || headers_decompressed_) {
+    QUIC_PEER_BUG(WEBTRANSPORT_STREAM received on HTTP request)
+        << ENDPOINT << "Stream " << id()
+        << " tried to convert to WebTransport, but it already "
+           "has HTTP data on it";
+    Reset(QUIC_STREAM_FRAME_UNEXPECTED);
+  }
+  if (QuicUtils::IsOutgoingStreamId(spdy_session_->version(), id(),
+                                    spdy_session_->perspective())) {
+    QUIC_PEER_BUG(WEBTRANSPORT_STREAM received on outgoing request)
+        << ENDPOINT << "Stream " << id()
+        << " tried to convert to WebTransport, but only the "
+           "initiator of the stream can do it.";
+    Reset(QUIC_STREAM_FRAME_UNEXPECTED);
+  }
+
+  QUICHE_DCHECK(web_transport_ == nullptr);
+  web_transport_data_ =
+      std::make_unique<WebTransportDataStream>(this, session_id);
+  spdy_session_->AssociateIncomingWebTransportStreamWithSession(session_id,
+                                                                id());
+}
+
 bool QuicSpdyStream::OnUnknownFrameStart(uint64_t frame_type,
                                          QuicByteCount header_length,
                                          QuicByteCount payload_length) {
@@ -1208,5 +1290,63 @@
       std::make_unique<WebTransportHttp3>(spdy_session_, this, id());
 }
 
+void QuicSpdyStream::OnCanWriteNewData() {
+  if (web_transport_data_ != nullptr) {
+    web_transport_data_->adapter.OnCanWriteNewData();
+  }
+}
+
+bool QuicSpdyStream::AssertNotWebTransportDataStream(
+    absl::string_view operation) {
+  if (web_transport_data_ != nullptr) {
+    QUIC_BUG(Invalid operation on WebTransport stream)
+        << "Attempted to " << operation << " on WebTransport data stream "
+        << id() << " associated with session "
+        << web_transport_data_->session_id;
+    OnUnrecoverableError(QUIC_INTERNAL_ERROR,
+                         absl::StrCat("Attempted to ", operation,
+                                      " on WebTransport data stream"));
+    return false;
+  }
+  return true;
+}
+
+void QuicSpdyStream::ConvertToWebTransportDataStream(
+    WebTransportSessionId session_id) {
+  if (send_buffer().stream_offset() != 0) {
+    QUIC_BUG(Sending WEBTRANSPORT_STREAM when data already sent)
+        << "Attempted to send a WEBTRANSPORT_STREAM frame when other data has "
+           "already been sent on the stream.";
+    OnUnrecoverableError(QUIC_INTERNAL_ERROR,
+                         "Attempted to send a WEBTRANSPORT_STREAM frame when "
+                         "other data has already been sent on the stream.");
+    return;
+  }
+
+  std::unique_ptr<char[]> header;
+  QuicByteCount header_size =
+      HttpEncoder::SerializeWebTransportStreamFrameHeader(session_id, &header);
+  if (header_size == 0) {
+    QUIC_BUG(Failed to serialize WEBTRANSPORT_STREAM)
+        << "Failed to serialize a WEBTRANSPORT_STREAM frame.";
+    OnUnrecoverableError(QUIC_INTERNAL_ERROR,
+                         "Failed to serialize a WEBTRANSPORT_STREAM frame.");
+    return;
+  }
+
+  WriteOrBufferData(absl::string_view(header.get(), header_size), /*fin=*/false,
+                    nullptr);
+  web_transport_data_ =
+      std::make_unique<WebTransportDataStream>(this, session_id);
+  QUIC_DVLOG(1) << ENDPOINT << "Successfully opened WebTransport data stream "
+                << id() << " for session " << session_id;
+}
+
+QuicSpdyStream::WebTransportDataStream::WebTransportDataStream(
+    QuicSpdyStream* stream,
+    WebTransportSessionId session_id)
+    : session_id(session_id),
+      adapter(stream->spdy_session_, stream, stream->sequencer()) {}
+
 #undef ENDPOINT  // undef for jumbo builds
 }  // namespace quic
diff --git a/quic/core/http/quic_spdy_stream.h b/quic/core/http/quic_spdy_stream.h
index ef3c846..5f96d83 100644
--- a/quic/core/http/quic_spdy_stream.h
+++ b/quic/core/http/quic_spdy_stream.h
@@ -25,6 +25,9 @@
 #include "quic/core/quic_packets.h"
 #include "quic/core/quic_stream.h"
 #include "quic/core/quic_stream_sequencer.h"
+#include "quic/core/quic_types.h"
+#include "quic/core/web_transport_interface.h"
+#include "quic/core/web_transport_stream_adapter.h"
 #include "quic/platform/api/quic_export.h"
 #include "quic/platform/api/quic_flags.h"
 #include "quic/platform/api/quic_socket_address.h"
@@ -223,6 +226,24 @@
   // Returns the WebTransport session owned by this stream, if one exists.
   WebTransportHttp3* web_transport() { return web_transport_.get(); }
 
+  // Returns the WebTransport data stream associated with this QUIC stream, or
+  // null if this is not a WebTransport data stream.
+  WebTransportStream* web_transport_stream() {
+    if (web_transport_data_ == nullptr) {
+      return nullptr;
+    }
+    return &web_transport_data_->adapter;
+  }
+
+  // Sends a WEBTRANSPORT_STREAM frame and sets up the appropriate metadata.
+  void ConvertToWebTransportDataStream(WebTransportSessionId session_id);
+
+  void OnCanWriteNewData() override;
+
+  // If this stream is a WebTransport data stream, closes the connection with an
+  // error, and returns false.
+  bool AssertNotWebTransportDataStream(absl::string_view operation);
+
  protected:
   // Called when the received headers are too large. By default this will
   // reset the stream.
@@ -254,6 +275,14 @@
   friend class QuicStreamUtils;
   class HttpDecoderVisitor;
 
+  struct QUIC_EXPORT_PRIVATE WebTransportDataStream {
+    WebTransportDataStream(QuicSpdyStream* stream,
+                           WebTransportSessionId session_id);
+
+    WebTransportSessionId session_id;
+    WebTransportStreamAdapter adapter;
+  };
+
   // Called by HttpDecoderVisitor.
   bool OnDataFrameStart(QuicByteCount header_length,
                         QuicByteCount payload_length);
@@ -269,6 +298,8 @@
                                 QuicByteCount header_block_length);
   bool OnPushPromiseFramePayload(absl::string_view payload);
   bool OnPushPromiseFrameEnd();
+  void OnWebTransportStreamFrameType(QuicByteCount header_length,
+                                     WebTransportSessionId session_id);
   bool OnUnknownFrameStart(uint64_t frame_type,
                            QuicByteCount header_length,
                            QuicByteCount payload_length);
@@ -347,6 +378,10 @@
   // If this stream is a WebTransport extended CONNECT stream, contains the
   // WebTransport session associated with this stream.
   std::unique_ptr<WebTransportHttp3> web_transport_;
+
+  // If this stream is a WebTransport data stream, |web_transport_data_|
+  // contains all of the associated metadata.
+  std::unique_ptr<WebTransportDataStream> web_transport_data_;
 };
 
 }  // namespace quic
diff --git a/quic/core/http/web_transport_http3.cc b/quic/core/http/web_transport_http3.cc
index dac35f7..3d1812e 100644
--- a/quic/core/http/web_transport_http3.cc
+++ b/quic/core/http/web_transport_http3.cc
@@ -91,7 +91,17 @@
 }
 
 WebTransportStream* WebTransportHttp3::AcceptIncomingBidirectionalStream() {
-  // TODO(vasilvv): implement this.
+  while (!incoming_bidirectional_streams_.empty()) {
+    QuicStreamId id = incoming_bidirectional_streams_.front();
+    incoming_bidirectional_streams_.pop_front();
+    QuicSpdyStream* stream = session_->GetOrCreateSpdyDataStream(id);
+    if (stream == nullptr) {
+      // Skip the streams that were reset in between the time they were
+      // receieved and the time the client has polled for them.
+      continue;
+    }
+    return stream->web_transport_stream();
+  }
   return nullptr;
 }
 
@@ -112,15 +122,20 @@
 }
 
 bool WebTransportHttp3::CanOpenNextOutgoingBidirectionalStream() {
-  // TODO(vasilvv): implement this.
-  return false;
+  return session_->CanOpenOutgoingBidirectionalWebTransportStream(id_);
 }
 bool WebTransportHttp3::CanOpenNextOutgoingUnidirectionalStream() {
   return session_->CanOpenOutgoingUnidirectionalWebTransportStream(id_);
 }
 WebTransportStream* WebTransportHttp3::OpenOutgoingBidirectionalStream() {
-  // TODO(vasilvv): implement this.
-  return nullptr;
+  QuicSpdyStream* stream =
+      session_->CreateOutgoingBidirectionalWebTransportStream(this);
+  if (stream == nullptr) {
+    // If stream cannot be created due to flow control or other errors, return
+    // nullptr.
+    return nullptr;
+  }
+  return stream->web_transport_stream();
 }
 
 WebTransportStream* WebTransportHttp3::OpenOutgoingUnidirectionalStream() {
diff --git a/quic/test_tools/quic_test_backend.cc b/quic/test_tools/quic_test_backend.cc
index 5defee7..bf7fc66 100644
--- a/quic/test_tools/quic_test_backend.cc
+++ b/quic/test_tools/quic_test_backend.cc
@@ -27,7 +27,18 @@
   void OnSessionReady() override {}
 
   void OnIncomingBidirectionalStreamAvailable() override {
-    // TODO(vasilvv): implement once bidirectional streams are supported.
+    while (true) {
+      WebTransportStream* stream =
+          session_->AcceptIncomingBidirectionalStream();
+      if (stream == nullptr) {
+        return;
+      }
+      QUIC_DVLOG(1) << "EchoWebTransportServer received a bidirectional stream "
+                    << stream->GetStreamId();
+      stream->SetVisitor(
+          std::make_unique<WebTransportBidirectionalEchoVisitor>(stream));
+      stream->visitor()->OnCanRead();
+    }
   }
 
   void OnIncomingUnidirectionalStreamAvailable() override {