Implement client-initiated bidirectional streams in WebTransport over HTTP/3. PiperOrigin-RevId: 366912320 Change-Id: Ia8e8cab56eb8a573941ac9f5cf467d609b4dd744
diff --git a/quic/core/http/end_to_end_test.cc b/quic/core/http/end_to_end_test.cc index 90b4f5b..6a83d2e 100644 --- a/quic/core/http/end_to_end_test.cc +++ b/quic/core/http/end_to_end_test.cc
@@ -724,6 +724,33 @@ return visitor; } + std::string ReadDataFromWebTransportStreamUntilFin( + WebTransportStream* stream) { + std::string buffer; + while (true) { + bool can_read = false; + auto visitor = std::make_unique<MockStreamVisitor>(); + EXPECT_CALL(*visitor, OnCanRead()).WillOnce(Assign(&can_read, true)); + stream->SetVisitor(std::move(visitor)); + client_->WaitUntil(5000 /*ms*/, [&can_read]() { return can_read; }); + if (!can_read) { + ADD_FAILURE() << "Waiting for readable data on stream " + << stream->GetStreamId() << " timed out"; + return buffer; + } + + WebTransportStream::ReadResult result = stream->Read(&buffer); + if (result.fin) { + return buffer; + } + if (result.bytes_read == 0) { + ADD_FAILURE() << "No progress made while reading from stream " + << stream->GetStreamId(); + return buffer; + } + } + } + ScopedEnvironmentForThreads environment_; bool initialized_; // If true, the Initialize() function will create |client_| and starts to @@ -5802,6 +5829,49 @@ EXPECT_TRUE(result.fin); } +TEST_P(EndToEndTest, WebTransportSessionBidirectionalStream) { + enable_web_transport_ = true; + ASSERT_TRUE(Initialize()); + + if (!version_.UsesHttp3()) { + return; + } + + WebTransportHttp3* session = + CreateWebTransportSession("/echo", /*wait_for_server_response=*/true); + ASSERT_TRUE(session != nullptr); + + WebTransportStream* stream = session->OpenOutgoingBidirectionalStream(); + ASSERT_TRUE(stream != nullptr); + EXPECT_TRUE(stream->Write("test")); + EXPECT_TRUE(stream->SendFin()); + + std::string received_data = ReadDataFromWebTransportStreamUntilFin(stream); + EXPECT_EQ(received_data, "test"); +} + +TEST_P(EndToEndTest, WebTransportSessionBidirectionalStreamWithBuffering) { + enable_web_transport_ = true; + SetPacketLossPercentage(30); + ASSERT_TRUE(Initialize()); + + if (!version_.UsesHttp3()) { + return; + } + + WebTransportHttp3* session = + CreateWebTransportSession("/echo", /*wait_for_server_response=*/false); + ASSERT_TRUE(session != nullptr); + + WebTransportStream* stream = session->OpenOutgoingBidirectionalStream(); + ASSERT_TRUE(stream != nullptr); + EXPECT_TRUE(stream->Write("test")); + EXPECT_TRUE(stream->SendFin()); + + std::string received_data = ReadDataFromWebTransportStreamUntilFin(stream); + EXPECT_EQ(received_data, "test"); +} + } // namespace } // namespace test } // namespace quic
diff --git a/quic/core/http/http_decoder.cc b/quic/core/http/http_decoder.cc index 6a5eee0..3544cfd 100644 --- a/quic/core/http/http_decoder.cc +++ b/quic/core/http/http_decoder.cc
@@ -5,6 +5,7 @@ #include "quic/core/http/http_decoder.h" #include <cstdint> +#include <limits> #include "absl/base/attributes.h" #include "absl/strings/string_view.h" @@ -20,8 +21,10 @@ namespace quic { -HttpDecoder::HttpDecoder(Visitor* visitor) +HttpDecoder::HttpDecoder(Visitor* visitor) : HttpDecoder(visitor, Options()) {} +HttpDecoder::HttpDecoder(Visitor* visitor, Options options) : visitor_(visitor), + allow_web_transport_stream_(options.allow_web_transport_stream), state_(STATE_READING_FRAME_TYPE), current_frame_type_(0), current_length_field_length_(0), @@ -108,6 +111,15 @@ case STATE_FINISH_PARSING: continue_processing = FinishParsing(&reader); break; + case STATE_PARSING_NO_LONGER_POSSIBLE: + continue_processing = false; + QUIC_BUG(HttpDecoder PARSING_NO_LONGER_POSSIBLE) + << "HttpDecoder called after an indefinite-length frame has been " + "received"; + RaiseError(QUIC_INTERNAL_ERROR, + "HttpDecoder called after an indefinite-length frame has " + "been received"); + break; case STATE_ERROR: break; default: @@ -192,6 +204,19 @@ QUICHE_DCHECK(success); } + // WEBTRANSPORT_STREAM frames are indefinitely long, and thus require + // special handling; the number after the frame type is actually the + // WebTransport session ID, and not the length. + if (allow_web_transport_stream_ && + current_frame_type_ == + static_cast<uint64_t>(HttpFrameType::WEBTRANSPORT_STREAM)) { + visitor_->OnWebTransportStreamFrameType( + current_length_field_length_ + current_type_field_length_, + current_frame_length_); + state_ = STATE_PARSING_NO_LONGER_POSSIBLE; + return false; + } + if (current_frame_length_ > MaxFrameLength(current_frame_type_)) { RaiseError(QUIC_HTTP_FRAME_TOO_LARGE, "Frame is too large."); return false;
diff --git a/quic/core/http/http_decoder.h b/quic/core/http/http_decoder.h index 30ae932..a351415 100644 --- a/quic/core/http/http_decoder.h +++ b/quic/core/http/http_decoder.h
@@ -27,6 +27,11 @@ // session. class QUIC_EXPORT_PRIVATE HttpDecoder { public: + struct QUIC_EXPORT_PRIVATE Options { + // Indicates that WEBTRANSPORT_STREAM should be parsed. + bool allow_web_transport_stream = false; + }; + class QUIC_EXPORT_PRIVATE Visitor { public: virtual ~Visitor() {} @@ -109,6 +114,15 @@ // Called when an ACCEPT_CH frame has been successfully parsed. virtual bool OnAcceptChFrame(const AcceptChFrame& frame) = 0; + // Called when a WEBTRANSPORT_STREAM frame type and the session ID varint + // immediately following it has been received. Any further parsing should + // be done by the stream itself, and not the parser. Note that this does not + // return bool, because WEBTRANSPORT_STREAM always causes the parsing + // process to cease. + virtual void OnWebTransportStreamFrameType( + QuicByteCount header_length, + WebTransportSessionId session_id) = 0; + // Called when a frame of unknown type |frame_type| has been received. // Frame type might be reserved, Visitor must make sure to ignore. // |header_length| and |payload_length| are the length of the frame header @@ -126,6 +140,7 @@ // |visitor| must be non-null, and must outlive HttpDecoder. explicit HttpDecoder(Visitor* visitor); + explicit HttpDecoder(Visitor* visitor, Options options); ~HttpDecoder(); @@ -162,6 +177,7 @@ STATE_READING_FRAME_TYPE, STATE_READING_FRAME_PAYLOAD, STATE_FINISH_PARSING, + STATE_PARSING_NO_LONGER_POSSIBLE, STATE_ERROR }; @@ -241,6 +257,8 @@ // Visitor to invoke when messages are parsed. Visitor* const visitor_; // Unowned. + // Whether WEBTRANSPORT_STREAM should be parsed. + bool allow_web_transport_stream_; // Current state of the parsing. HttpDecoderState state_; // Type of the frame currently being parsed.
diff --git a/quic/core/http/http_decoder_test.cc b/quic/core/http/http_decoder_test.cc index 4f518a4..aac020a 100644 --- a/quic/core/http/http_decoder_test.cc +++ b/quic/core/http/http_decoder_test.cc
@@ -15,6 +15,7 @@ #include "quic/core/http/http_frames.h" #include "quic/core/quic_data_writer.h" #include "quic/core/quic_versions.h" +#include "quic/platform/api/quic_expect_bug.h" #include "quic/platform/api/quic_flags.h" #include "quic/platform/api/quic_test.h" #include "quic/test_tools/quic_test_utils.h" @@ -109,6 +110,10 @@ (QuicByteCount header_length), (override)); MOCK_METHOD(bool, OnAcceptChFrame, (const AcceptChFrame& frame), (override)); + MOCK_METHOD(void, + OnWebTransportStreamFrameType, + (QuicByteCount header_length, WebTransportSessionId session_id), + (override)); MOCK_METHOD(bool, OnUnknownFrameStart, @@ -1301,6 +1306,43 @@ EXPECT_EQ("", decoder_.error_detail()); } +TEST_F(HttpDecoderTest, WebTransportStreamDisabled) { + InSequence s; + + // Unknown frame of type 0x41 and length 0x104. + std::string input = absl::HexStringToBytes("40414104"); + EXPECT_CALL(visitor_, OnUnknownFrameStart(0x41, input.size(), 0x104)); + EXPECT_EQ(ProcessInput(input), input.size()); +} + +TEST(HttpDecoderTestNoFixture, WebTransportStream) { + HttpDecoder::Options options; + options.allow_web_transport_stream = true; + testing::StrictMock<MockVisitor> visitor; + HttpDecoder decoder(&visitor, options); + + // WebTransport stream for session ID 0x104, with four bytes of extra data. + std::string input = absl::HexStringToBytes("40414104ffffffff"); + EXPECT_CALL(visitor, OnWebTransportStreamFrameType(4, 0x104)); + QuicByteCount bytes = decoder.ProcessInput(input.data(), input.size()); + EXPECT_EQ(bytes, 4u); +} + +TEST(HttpDecoderTestNoFixture, WebTransportStreamError) { + HttpDecoder::Options options; + options.allow_web_transport_stream = true; + testing::StrictMock<MockVisitor> visitor; + HttpDecoder decoder(&visitor, options); + + std::string input = absl::HexStringToBytes("404100"); + EXPECT_CALL(visitor, OnWebTransportStreamFrameType(_, _)); + decoder.ProcessInput(input.data(), input.size()); + + EXPECT_CALL(visitor, OnError(_)); + EXPECT_QUIC_BUG(decoder.ProcessInput(input.data(), input.size()), + "HttpDecoder called after an indefinite-length frame"); +} + TEST_F(HttpDecoderTest, DecodeSettings) { std::string input = absl::HexStringToBytes( "04" // type (SETTINGS)
diff --git a/quic/core/http/http_encoder.cc b/quic/core/http/http_encoder.cc index 1fa215c..2fbc238 100644 --- a/quic/core/http/http_encoder.cc +++ b/quic/core/http/http_encoder.cc
@@ -248,4 +248,25 @@ return 0; } +QuicByteCount HttpEncoder::SerializeWebTransportStreamFrameHeader( + WebTransportSessionId session_id, + std::unique_ptr<char[]>* output) { + uint64_t stream_type = + static_cast<uint64_t>(HttpFrameType::WEBTRANSPORT_STREAM); + QuicByteCount header_length = QuicDataWriter::GetVarInt62Len(stream_type) + + QuicDataWriter::GetVarInt62Len(session_id); + + *output = std::make_unique<char[]>(header_length); + QuicDataWriter writer(header_length, output->get()); + bool success = + writer.WriteVarInt62(stream_type) && writer.WriteVarInt62(session_id); + if (success && writer.remaining() == 0) { + return header_length; + } + + QUIC_DLOG(ERROR) << "Http encoder failed when attempting to serialize " + "WEBTRANSPORT_STREAM frame header."; + return 0; +} + } // namespace quic
diff --git a/quic/core/http/http_encoder.h b/quic/core/http/http_encoder.h index b6cb9ad..0a8aa84 100644 --- a/quic/core/http/http_encoder.h +++ b/quic/core/http/http_encoder.h
@@ -8,6 +8,7 @@ #include <memory> #include "quic/core/http/http_frames.h" #include "quic/core/quic_error_codes.h" +#include "quic/core/quic_types.h" #include "quic/platform/api/quic_export.h" namespace quic { @@ -56,6 +57,12 @@ // Serializes a frame with reserved frame type specified in // https://tools.ietf.org/html/draft-ietf-quic-http-25#section-7.2.9. static QuicByteCount SerializeGreasingFrame(std::unique_ptr<char[]>* output); + + // Serializes a WEBTRANSPORT_STREAM frame header as specified in + // https://www.ietf.org/archive/id/draft-ietf-webtrans-http3-00.html#name-client-initiated-bidirectio + static QuicByteCount SerializeWebTransportStreamFrameHeader( + WebTransportSessionId session_id, + std::unique_ptr<char[]>* output); }; } // namespace quic
diff --git a/quic/core/http/http_encoder_test.cc b/quic/core/http/http_encoder_test.cc index c59dd85..66e988f 100644 --- a/quic/core/http/http_encoder_test.cc +++ b/quic/core/http/http_encoder_test.cc
@@ -123,5 +123,18 @@ output2, ABSL_ARRAYSIZE(output2)); } +TEST(HttpEncoderTest, SerializeWebTransportStreamFrameHeader) { + WebTransportSessionId session_id = 0x17; + char output[] = {0x40, 0x41, // type (WEBTRANSPORT_STREAM) + 0x17}; // session ID + + std::unique_ptr<char[]> buffer; + uint64_t length = + HttpEncoder::SerializeWebTransportStreamFrameHeader(session_id, &buffer); + EXPECT_EQ(sizeof(output), length); + quiche::test::CompareCharArraysWithHexError( + "WEBTRANSPORT_STREAM", buffer.get(), length, output, sizeof(output)); +} + } // namespace test } // namespace quic
diff --git a/quic/core/http/http_frames.h b/quic/core/http/http_frames.h index 0d55857..56e5348 100644 --- a/quic/core/http/http_frames.h +++ b/quic/core/http/http_frames.h
@@ -34,6 +34,8 @@ ACCEPT_CH = 0x89, // https://tools.ietf.org/html/draft-ietf-httpbis-priority-02 PRIORITY_UPDATE_REQUEST_STREAM = 0xF0700, + // https://www.ietf.org/archive/id/draft-ietf-webtrans-http3-00.html + WEBTRANSPORT_STREAM = 0x41, }; // 7.2.1. DATA
diff --git a/quic/core/http/quic_receive_control_stream.cc b/quic/core/http/quic_receive_control_stream.cc index ffc38a0..fbd7b2e 100644 --- a/quic/core/http/quic_receive_control_stream.cc +++ b/quic/core/http/quic_receive_control_stream.cc
@@ -233,6 +233,13 @@ return true; } +void QuicReceiveControlStream::OnWebTransportStreamFrameType( + QuicByteCount /*header_length*/, + WebTransportSessionId /*session_id*/) { + QUIC_BUG(WEBTRANSPORT_STREAM on Control Stream) + << "Parsed WEBTRANSPORT_STREAM on a control stream."; +} + bool QuicReceiveControlStream::OnUnknownFrameStart( uint64_t frame_type, QuicByteCount /*header_length*/,
diff --git a/quic/core/http/quic_receive_control_stream.h b/quic/core/http/quic_receive_control_stream.h index 71f0e05..4ed0104 100644 --- a/quic/core/http/quic_receive_control_stream.h +++ b/quic/core/http/quic_receive_control_stream.h
@@ -58,6 +58,8 @@ bool OnPriorityUpdateFrame(const PriorityUpdateFrame& frame) override; bool OnAcceptChFrameStart(QuicByteCount header_length) override; bool OnAcceptChFrame(const AcceptChFrame& frame) override; + void OnWebTransportStreamFrameType(QuicByteCount header_length, + WebTransportSessionId session_id) override; bool OnUnknownFrameStart(uint64_t frame_type, QuicByteCount header_length, QuicByteCount payload_length) override;
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc index 6a56cba..86ddc94 100644 --- a/quic/core/http/quic_spdy_session.cc +++ b/quic/core/http/quic_spdy_session.cc
@@ -160,6 +160,11 @@ session_->OnAcceptChFrameReceivedViaAlps(frame); return true; } + void OnWebTransportStreamFrameType( + QuicByteCount /*header_length*/, + WebTransportSessionId /*session_id*/) override { + QUICHE_NOTREACHED(); + } bool OnUnknownFrameStart(uint64_t /*frame_type*/, QuicByteCount /*header_length*/, @@ -1810,6 +1815,23 @@ return stream; } +QuicSpdyStream* QuicSpdySession::CreateOutgoingBidirectionalWebTransportStream( + WebTransportHttp3* session) { + QuicSpdyStream* stream = CreateOutgoingBidirectionalStream(); + if (stream == nullptr) { + return nullptr; + } + QuicStreamId stream_id = stream->id(); + stream->ConvertToWebTransportDataStream(session->id()); + if (stream->web_transport_stream() == nullptr) { + // An error in ConvertToWebTransportDataStream() would result in + // CONNECTION_CLOSE, thus we don't need to do anything here. + return nullptr; + } + session->AssociateStream(stream_id); + return stream; +} + #undef ENDPOINT // undef for jumbo builds } // namespace quic
diff --git a/quic/core/http/quic_spdy_session.h b/quic/core/http/quic_spdy_session.h index cc99b5b..f5903b3 100644 --- a/quic/core/http/quic_spdy_session.h +++ b/quic/core/http/quic_spdy_session.h
@@ -435,6 +435,10 @@ // Indicates whether the HTTP/3 session supports WebTransport. bool SupportsWebTransport(); + // Indicates whether the HTTP/3 session will indicate WebTransport support to + // the peer. + bool WillNegotiateWebTransport(); + // Returns a WebTransport session by its session ID. Returns nullptr if no // session is associated with the given ID. WebTransportHttp3* GetWebTransportSession(WebTransportSessionId id); @@ -465,12 +469,23 @@ WebTransportSessionId /*id*/) { return CanOpenNextOutgoingUnidirectionalStream(); } + bool CanOpenOutgoingBidirectionalWebTransportStream( + WebTransportSessionId /*id*/) { + return CanOpenNextOutgoingBidirectionalStream(); + } // Creates an outgoing unidirectional WebTransport stream. Returns nullptr if // the stream cannot be created due to flow control or some other reason. WebTransportHttp3UnidirectionalStream* CreateOutgoingUnidirectionalWebTransportStream(WebTransportHttp3* session); + // Creates an outgoing bidirectional WebTransport stream. Returns nullptr if + // the stream cannot be created due to flow control or some other reason. + QuicSpdyStream* CreateOutgoingBidirectionalWebTransportStream( + WebTransportHttp3* session); + + QuicSpdyStream* GetOrCreateSpdyDataStream(const QuicStreamId stream_id); + protected: // Override CreateIncomingStream(), CreateOutgoingBidirectionalStream() and // CreateOutgoingUnidirectionalStream() with QuicSpdyStream return type to @@ -480,8 +495,6 @@ virtual QuicSpdyStream* CreateOutgoingBidirectionalStream() = 0; virtual QuicSpdyStream* CreateOutgoingUnidirectionalStream() = 0; - QuicSpdyStream* GetOrCreateSpdyDataStream(const QuicStreamId stream_id); - // If an incoming stream can be created, return true. virtual bool ShouldCreateIncomingStream(QuicStreamId id) = 0; @@ -493,7 +506,6 @@ // Indicates whether the underlying backend can accept and process // WebTransport sessions over HTTP/3. virtual bool ShouldNegotiateWebTransport(); - bool WillNegotiateWebTransport(); // Returns true if there are open HTTP requests. bool ShouldKeepConnectionAlive() const override;
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc index 00c631b..e2df8f5 100644 --- a/quic/core/http/quic_spdy_stream.cc +++ b/quic/core/http/quic_spdy_stream.cc
@@ -19,6 +19,7 @@ #include "quic/core/http/web_transport_http3.h" #include "quic/core/qpack/qpack_decoder.h" #include "quic/core/qpack/qpack_encoder.h" +#include "quic/core/quic_error_codes.h" #include "quic/core/quic_utils.h" #include "quic/core/quic_versions.h" #include "quic/core/quic_write_blocked_list.h" @@ -166,6 +167,12 @@ return false; } + void OnWebTransportStreamFrameType( + QuicByteCount header_length, + WebTransportSessionId session_id) override { + stream_->OnWebTransportStreamFrameType(header_length, session_id); + } + bool OnUnknownFrameStart(uint64_t frame_type, QuicByteCount header_length, QuicByteCount payload_length) override { @@ -194,6 +201,16 @@ : "Client:" \ " ") +namespace { +HttpDecoder::Options HttpDecoderOptionsForBidiStream( + QuicSpdySession* spdy_session) { + HttpDecoder::Options options; + options.allow_web_transport_stream = + spdy_session->WillNegotiateWebTransport(); + return options; +} +} // namespace + QuicSpdyStream::QuicSpdyStream(QuicStreamId id, QuicSpdySession* spdy_session, StreamType type) @@ -208,7 +225,8 @@ trailers_decompressed_(false), trailers_consumed_(false), http_decoder_visitor_(std::make_unique<HttpDecoderVisitor>(this)), - decoder_(http_decoder_visitor_.get()), + decoder_(http_decoder_visitor_.get(), + HttpDecoderOptionsForBidiStream(spdy_session)), sequencer_offset_(0), is_decoder_processing_input_(false), ack_listener_(nullptr), @@ -271,6 +289,10 @@ SpdyHeaderBlock header_block, bool fin, QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) { + if (!AssertNotWebTransportDataStream("writing headers")) { + return 0; + } + QuicConnection::ScopedPacketFlusher flusher(spdy_session_->connection()); // Send stream type for server push stream if (VersionUsesHttp3(transport_version()) && type() == WRITE_UNIDIRECTIONAL && @@ -305,6 +327,9 @@ } void QuicSpdyStream::WriteOrBufferBody(absl::string_view data, bool fin) { + if (!AssertNotWebTransportDataStream("writing body data")) { + return; + } if (!VersionUsesHttp3(transport_version()) || data.length() == 0) { WriteOrBufferData(data, fin, nullptr); return; @@ -703,6 +728,11 @@ } void QuicSpdyStream::OnStreamReset(const QuicRstStreamFrame& frame) { + if (web_transport_data_ != nullptr) { + QuicStream::OnStreamReset(frame); + return; + } + // TODO(bnc): Merge the two blocks below when both // quic_abort_qpack_on_stream_reset and quic_fix_on_stream_reset are // deprecated. @@ -743,7 +773,7 @@ void QuicSpdyStream::Reset(QuicRstStreamErrorCode error) { if (VersionUsesHttp3(transport_version()) && !fin_received() && - spdy_session_->qpack_decoder()) { + spdy_session_->qpack_decoder() && web_transport_data_ == nullptr) { QUIC_CODE_COUNT_N(quic_abort_qpack_on_stream_reset, 2, 2); spdy_session_->qpack_decoder()->OnStreamReset(id()); if (GetQuicReloadableFlag(quic_abort_qpack_on_stream_reset)) { @@ -766,6 +796,11 @@ return; } + if (web_transport_data_ != nullptr) { + web_transport_data_->adapter.OnDataAvailable(); + return; + } + if (!spdy_session()->ShouldProcessIncomingRequests()) { spdy_session()->OnStreamWaitingForClientSettings(id()); return; @@ -797,6 +832,9 @@ if (blocked_on_decoding_headers_) { return; } + if (web_transport_data_ != nullptr) { + return; + } } // Do not call OnBodyAvailable() until headers are consumed. @@ -832,6 +870,20 @@ if (web_transport_ != nullptr) { web_transport_->CloseAllAssociatedStreams(); } + if (web_transport_data_ != nullptr) { + WebTransportHttp3* web_transport = + spdy_session_->GetWebTransportSession(web_transport_data_->session_id); + if (web_transport == nullptr) { + // Since there is no guaranteed destruction order for streams, the session + // could be already removed from the stream map by the time we reach here. + QUIC_DLOG(WARNING) << ENDPOINT << "WebTransport stream " << id() + << " attempted to notify parent session " + << web_transport_data_->session_id + << ", but the session could not be found."; + return; + } + web_transport->OnStreamClosed(id()); + } } void QuicSpdyStream::OnCanWrite() { @@ -1071,6 +1123,36 @@ return OnHeadersFrameEnd(); } +void QuicSpdyStream::OnWebTransportStreamFrameType( + QuicByteCount header_length, + WebTransportSessionId session_id) { + QUIC_DVLOG(1) << ENDPOINT << " Received WEBTRANSPORT_STREAM on stream " + << id() << " for session " << session_id; + sequencer()->MarkConsumed(header_length); + + if (headers_payload_length_ > 0 || headers_decompressed_) { + QUIC_PEER_BUG(WEBTRANSPORT_STREAM received on HTTP request) + << ENDPOINT << "Stream " << id() + << " tried to convert to WebTransport, but it already " + "has HTTP data on it"; + Reset(QUIC_STREAM_FRAME_UNEXPECTED); + } + if (QuicUtils::IsOutgoingStreamId(spdy_session_->version(), id(), + spdy_session_->perspective())) { + QUIC_PEER_BUG(WEBTRANSPORT_STREAM received on outgoing request) + << ENDPOINT << "Stream " << id() + << " tried to convert to WebTransport, but only the " + "initiator of the stream can do it."; + Reset(QUIC_STREAM_FRAME_UNEXPECTED); + } + + QUICHE_DCHECK(web_transport_ == nullptr); + web_transport_data_ = + std::make_unique<WebTransportDataStream>(this, session_id); + spdy_session_->AssociateIncomingWebTransportStreamWithSession(session_id, + id()); +} + bool QuicSpdyStream::OnUnknownFrameStart(uint64_t frame_type, QuicByteCount header_length, QuicByteCount payload_length) { @@ -1208,5 +1290,63 @@ std::make_unique<WebTransportHttp3>(spdy_session_, this, id()); } +void QuicSpdyStream::OnCanWriteNewData() { + if (web_transport_data_ != nullptr) { + web_transport_data_->adapter.OnCanWriteNewData(); + } +} + +bool QuicSpdyStream::AssertNotWebTransportDataStream( + absl::string_view operation) { + if (web_transport_data_ != nullptr) { + QUIC_BUG(Invalid operation on WebTransport stream) + << "Attempted to " << operation << " on WebTransport data stream " + << id() << " associated with session " + << web_transport_data_->session_id; + OnUnrecoverableError(QUIC_INTERNAL_ERROR, + absl::StrCat("Attempted to ", operation, + " on WebTransport data stream")); + return false; + } + return true; +} + +void QuicSpdyStream::ConvertToWebTransportDataStream( + WebTransportSessionId session_id) { + if (send_buffer().stream_offset() != 0) { + QUIC_BUG(Sending WEBTRANSPORT_STREAM when data already sent) + << "Attempted to send a WEBTRANSPORT_STREAM frame when other data has " + "already been sent on the stream."; + OnUnrecoverableError(QUIC_INTERNAL_ERROR, + "Attempted to send a WEBTRANSPORT_STREAM frame when " + "other data has already been sent on the stream."); + return; + } + + std::unique_ptr<char[]> header; + QuicByteCount header_size = + HttpEncoder::SerializeWebTransportStreamFrameHeader(session_id, &header); + if (header_size == 0) { + QUIC_BUG(Failed to serialize WEBTRANSPORT_STREAM) + << "Failed to serialize a WEBTRANSPORT_STREAM frame."; + OnUnrecoverableError(QUIC_INTERNAL_ERROR, + "Failed to serialize a WEBTRANSPORT_STREAM frame."); + return; + } + + WriteOrBufferData(absl::string_view(header.get(), header_size), /*fin=*/false, + nullptr); + web_transport_data_ = + std::make_unique<WebTransportDataStream>(this, session_id); + QUIC_DVLOG(1) << ENDPOINT << "Successfully opened WebTransport data stream " + << id() << " for session " << session_id; +} + +QuicSpdyStream::WebTransportDataStream::WebTransportDataStream( + QuicSpdyStream* stream, + WebTransportSessionId session_id) + : session_id(session_id), + adapter(stream->spdy_session_, stream, stream->sequencer()) {} + #undef ENDPOINT // undef for jumbo builds } // namespace quic
diff --git a/quic/core/http/quic_spdy_stream.h b/quic/core/http/quic_spdy_stream.h index ef3c846..5f96d83 100644 --- a/quic/core/http/quic_spdy_stream.h +++ b/quic/core/http/quic_spdy_stream.h
@@ -25,6 +25,9 @@ #include "quic/core/quic_packets.h" #include "quic/core/quic_stream.h" #include "quic/core/quic_stream_sequencer.h" +#include "quic/core/quic_types.h" +#include "quic/core/web_transport_interface.h" +#include "quic/core/web_transport_stream_adapter.h" #include "quic/platform/api/quic_export.h" #include "quic/platform/api/quic_flags.h" #include "quic/platform/api/quic_socket_address.h" @@ -223,6 +226,24 @@ // Returns the WebTransport session owned by this stream, if one exists. WebTransportHttp3* web_transport() { return web_transport_.get(); } + // Returns the WebTransport data stream associated with this QUIC stream, or + // null if this is not a WebTransport data stream. + WebTransportStream* web_transport_stream() { + if (web_transport_data_ == nullptr) { + return nullptr; + } + return &web_transport_data_->adapter; + } + + // Sends a WEBTRANSPORT_STREAM frame and sets up the appropriate metadata. + void ConvertToWebTransportDataStream(WebTransportSessionId session_id); + + void OnCanWriteNewData() override; + + // If this stream is a WebTransport data stream, closes the connection with an + // error, and returns false. + bool AssertNotWebTransportDataStream(absl::string_view operation); + protected: // Called when the received headers are too large. By default this will // reset the stream. @@ -254,6 +275,14 @@ friend class QuicStreamUtils; class HttpDecoderVisitor; + struct QUIC_EXPORT_PRIVATE WebTransportDataStream { + WebTransportDataStream(QuicSpdyStream* stream, + WebTransportSessionId session_id); + + WebTransportSessionId session_id; + WebTransportStreamAdapter adapter; + }; + // Called by HttpDecoderVisitor. bool OnDataFrameStart(QuicByteCount header_length, QuicByteCount payload_length); @@ -269,6 +298,8 @@ QuicByteCount header_block_length); bool OnPushPromiseFramePayload(absl::string_view payload); bool OnPushPromiseFrameEnd(); + void OnWebTransportStreamFrameType(QuicByteCount header_length, + WebTransportSessionId session_id); bool OnUnknownFrameStart(uint64_t frame_type, QuicByteCount header_length, QuicByteCount payload_length); @@ -347,6 +378,10 @@ // If this stream is a WebTransport extended CONNECT stream, contains the // WebTransport session associated with this stream. std::unique_ptr<WebTransportHttp3> web_transport_; + + // If this stream is a WebTransport data stream, |web_transport_data_| + // contains all of the associated metadata. + std::unique_ptr<WebTransportDataStream> web_transport_data_; }; } // namespace quic
diff --git a/quic/core/http/web_transport_http3.cc b/quic/core/http/web_transport_http3.cc index dac35f7..3d1812e 100644 --- a/quic/core/http/web_transport_http3.cc +++ b/quic/core/http/web_transport_http3.cc
@@ -91,7 +91,17 @@ } WebTransportStream* WebTransportHttp3::AcceptIncomingBidirectionalStream() { - // TODO(vasilvv): implement this. + while (!incoming_bidirectional_streams_.empty()) { + QuicStreamId id = incoming_bidirectional_streams_.front(); + incoming_bidirectional_streams_.pop_front(); + QuicSpdyStream* stream = session_->GetOrCreateSpdyDataStream(id); + if (stream == nullptr) { + // Skip the streams that were reset in between the time they were + // receieved and the time the client has polled for them. + continue; + } + return stream->web_transport_stream(); + } return nullptr; } @@ -112,15 +122,20 @@ } bool WebTransportHttp3::CanOpenNextOutgoingBidirectionalStream() { - // TODO(vasilvv): implement this. - return false; + return session_->CanOpenOutgoingBidirectionalWebTransportStream(id_); } bool WebTransportHttp3::CanOpenNextOutgoingUnidirectionalStream() { return session_->CanOpenOutgoingUnidirectionalWebTransportStream(id_); } WebTransportStream* WebTransportHttp3::OpenOutgoingBidirectionalStream() { - // TODO(vasilvv): implement this. - return nullptr; + QuicSpdyStream* stream = + session_->CreateOutgoingBidirectionalWebTransportStream(this); + if (stream == nullptr) { + // If stream cannot be created due to flow control or other errors, return + // nullptr. + return nullptr; + } + return stream->web_transport_stream(); } WebTransportStream* WebTransportHttp3::OpenOutgoingUnidirectionalStream() {
diff --git a/quic/test_tools/quic_test_backend.cc b/quic/test_tools/quic_test_backend.cc index 5defee7..bf7fc66 100644 --- a/quic/test_tools/quic_test_backend.cc +++ b/quic/test_tools/quic_test_backend.cc
@@ -27,7 +27,18 @@ void OnSessionReady() override {} void OnIncomingBidirectionalStreamAvailable() override { - // TODO(vasilvv): implement once bidirectional streams are supported. + while (true) { + WebTransportStream* stream = + session_->AcceptIncomingBidirectionalStream(); + if (stream == nullptr) { + return; + } + QUIC_DVLOG(1) << "EchoWebTransportServer received a bidirectional stream " + << stream->GetStreamId(); + stream->SetVisitor( + std::make_unique<WebTransportBidirectionalEchoVisitor>(stream)); + stream->visitor()->OnCanRead(); + } } void OnIncomingUnidirectionalStreamAvailable() override {