Implement sending and receiving DRAIN_WEBTRANSPORT_SESSION
PiperOrigin-RevId: 540692201
diff --git a/quiche/common/capsule.cc b/quiche/common/capsule.cc
index 4c5ad5a..66d4a66 100644
--- a/quiche/common/capsule.cc
+++ b/quiche/common/capsule.cc
@@ -36,6 +36,8 @@
return "LEGACY_DATAGRAM_WITHOUT_CONTEXT";
case CapsuleType::CLOSE_WEBTRANSPORT_SESSION:
return "CLOSE_WEBTRANSPORT_SESSION";
+ case CapsuleType::DRAIN_WEBTRANSPORT_SESSION:
+ return "DRAIN_WEBTRANSPORT_SESSION";
case CapsuleType::ADDRESS_REQUEST:
return "ADDRESS_REQUEST";
case CapsuleType::ADDRESS_ASSIGN:
@@ -129,6 +131,10 @@
",error_message=\"", error_message, "\")");
}
+std::string DrainWebTransportSessionCapsule::ToString() const {
+ return "DRAIN_WEBTRANSPORT_SESSION()";
+}
+
std::string AddressRequestCapsule::ToString() const {
std::string rv = "ADDRESS_REQUEST[";
for (auto requested_address : requested_addresses) {
@@ -293,6 +299,8 @@
WireUint32(capsule.close_web_transport_session_capsule().error_code),
WireBytes(
capsule.close_web_transport_session_capsule().error_message));
+ case CapsuleType::DRAIN_WEBTRANSPORT_SESSION:
+ return SerializeCapsuleFields(capsule.capsule_type(), allocator);
case CapsuleType::ADDRESS_REQUEST:
return SerializeCapsuleFields(
capsule.capsule_type(), allocator,
@@ -414,6 +422,8 @@
capsule.error_message = reader.ReadRemainingPayload();
return Capsule(std::move(capsule));
}
+ case CapsuleType::DRAIN_WEBTRANSPORT_SESSION:
+ return Capsule(DrainWebTransportSessionCapsule());
case CapsuleType::ADDRESS_REQUEST: {
AddressRequestCapsule capsule;
while (!reader.IsDoneReading()) {
diff --git a/quiche/common/capsule.h b/quiche/common/capsule.h
index 08bde5b..7220ee4 100644
--- a/quiche/common/capsule.h
+++ b/quiche/common/capsule.h
@@ -13,6 +13,7 @@
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
+#include "quiche/common/platform/api/quiche_export.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_ip_address.h"
@@ -29,6 +30,7 @@
// <https://datatracker.ietf.org/doc/draft-ietf-webtrans-http3/>
CLOSE_WEBTRANSPORT_SESSION = 0x2843,
+ DRAIN_WEBTRANSPORT_SESSION = 0x78ae,
// draft-ietf-masque-connect-ip-03.
ADDRESS_ASSIGN = 0x1ECA6A00,
@@ -106,6 +108,13 @@
error_message == other.error_message;
}
};
+struct QUICHE_EXPORT DrainWebTransportSessionCapsule {
+ std::string ToString() const;
+ CapsuleType capsule_type() const {
+ return CapsuleType::DRAIN_WEBTRANSPORT_SESSION;
+ }
+ bool operator==(const DrainWebTransportSessionCapsule&) const { return true; }
+};
// MASQUE CONNECT-IP.
struct QUICHE_EXPORT PrefixWithId {
@@ -320,7 +329,8 @@
private:
absl::variant<DatagramCapsule, LegacyDatagramCapsule,
LegacyDatagramWithoutContextCapsule,
- CloseWebTransportSessionCapsule, AddressRequestCapsule,
+ CloseWebTransportSessionCapsule,
+ DrainWebTransportSessionCapsule, AddressRequestCapsule,
AddressAssignCapsule, RouteAdvertisementCapsule,
WebTransportStreamDataCapsule, WebTransportResetStreamCapsule,
WebTransportStopSendingCapsule, WebTransportMaxStreamsCapsule,
diff --git a/quiche/common/capsule_test.cc b/quiche/common/capsule_test.cc
index 5ed4d1a..ae55aaf 100644
--- a/quiche/common/capsule_test.cc
+++ b/quiche/common/capsule_test.cc
@@ -136,6 +136,20 @@
TestSerialization(expected_capsule, capsule_fragment);
}
+TEST_F(CapsuleTest, DrainWebTransportStreamCapsule) {
+ std::string capsule_fragment = absl::HexStringToBytes(
+ "800078ae" // DRAIN_WEBTRANSPORT_STREAM capsule type
+ "00" // capsule length
+ );
+ Capsule expected_capsule = Capsule(DrainWebTransportSessionCapsule());
+ {
+ EXPECT_CALL(visitor_, OnCapsule(expected_capsule));
+ ASSERT_TRUE(capsule_parser_.IngestCapsuleFragment(capsule_fragment));
+ }
+ ValidateParserIsEmpty();
+ TestSerialization(expected_capsule, capsule_fragment);
+}
+
TEST_F(CapsuleTest, AddressAssignCapsule) {
std::string capsule_fragment = absl::HexStringToBytes(
"9ECA6A00" // ADDRESS_ASSIGN capsule type
diff --git a/quiche/common/wire_serialization.h b/quiche/common/wire_serialization.h
index 89a54f2..7cc2596 100644
--- a/quiche/common/wire_serialization.h
+++ b/quiche/common/wire_serialization.h
@@ -347,6 +347,10 @@
QUICHE_RETURN_IF_ERROR(SerializeIntoWriterCore(writer, argno, data1));
return SerializeIntoWriterCore(writer, argno + 1, rest...);
}
+
+inline absl::Status SerializeIntoWriterCore(QuicheDataWriter&, int) {
+ return absl::OkStatus();
+}
} // namespace wire_serialization_internal
// SerializeIntoWriter(writer, d1, d2, ... dN) serializes all of supplied data
@@ -369,6 +373,7 @@
size_t ComputeLengthOnWire(T1 data1, Ts... rest) {
return data1.GetLengthOnWire() + ComputeLengthOnWire(rest...);
}
+inline size_t ComputeLengthOnWire() { return 0; }
// SerializeIntoBuffer(allocator, d1, d2, ... dN) computes the length required
// to store the supplied data, allocates the buffer of appropriate size using
diff --git a/quiche/common/wire_serialization_test.cc b/quiche/common/wire_serialization_test.cc
index b1dea91..9abdda9 100644
--- a/quiche/common/wire_serialization_test.cc
+++ b/quiche/common/wire_serialization_test.cc
@@ -252,5 +252,7 @@
#endif
}
+TEST(SerializationTest, Empty) { ExpectEncodingHex("nothing", ""); }
+
} // namespace
} // namespace quiche::test
diff --git a/quiche/quic/core/http/end_to_end_test.cc b/quiche/quic/core/http/end_to_end_test.cc
index 6404559..52efb1a 100644
--- a/quiche/quic/core/http/end_to_end_test.cc
+++ b/quiche/quic/core/http/end_to_end_test.cc
@@ -6804,6 +6804,29 @@
EXPECT_TRUE(spdy_stream == nullptr);
}
+TEST_P(EndToEndTest, WebTransportSessionReceiveDrain) {
+ enable_web_transport_ = true;
+ ASSERT_TRUE(Initialize());
+
+ if (!version_.UsesHttp3()) {
+ return;
+ }
+
+ WebTransportHttp3* session = CreateWebTransportSession(
+ "/session-close", /*wait_for_server_response=*/true);
+ ASSERT_TRUE(session != nullptr);
+
+ WebTransportStream* stream = session->OpenOutgoingUnidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+ QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "DRAIN"));
+ EXPECT_TRUE(stream->SendFin());
+
+ bool drain_received = false;
+ session->SetOnDraining([&drain_received] { drain_received = true; });
+ client_->WaitUntil(2000, [&]() { return drain_received; });
+ EXPECT_TRUE(drain_received);
+}
+
TEST_P(EndToEndTest, WebTransportSessionStreamTermination) {
enable_web_transport_ = true;
ASSERT_TRUE(Initialize());
diff --git a/quiche/quic/core/http/quic_spdy_stream.cc b/quiche/quic/core/http/quic_spdy_stream.cc
index 9a479f3..24a0a16 100644
--- a/quiche/quic/core/http/quic_spdy_stream.cc
+++ b/quiche/quic/core/http/quic_spdy_stream.cc
@@ -1397,6 +1397,14 @@
capsule.close_web_transport_session_capsule().error_code,
capsule.close_web_transport_session_capsule().error_message);
return true;
+ case CapsuleType::DRAIN_WEBTRANSPORT_SESSION:
+ if (web_transport_ == nullptr) {
+ QUIC_DLOG(ERROR) << ENDPOINT << "Received capsule " << capsule
+ << " for a non-WebTransport stream.";
+ return false;
+ }
+ web_transport_->OnDrainSessionReceived();
+ return true;
case CapsuleType::ADDRESS_ASSIGN:
if (connect_ip_visitor_ == nullptr) {
return true;
diff --git a/quiche/quic/core/http/web_transport_http3.cc b/quiche/quic/core/http/web_transport_http3.cc
index 6e55cdb..6db4eae 100644
--- a/quiche/quic/core/http/web_transport_http3.cc
+++ b/quiche/quic/core/http/web_transport_http3.cc
@@ -276,6 +276,14 @@
connect_stream_->SetMaxDatagramTimeInQueue(QuicTimeDelta(max_time_in_queue));
}
+void WebTransportHttp3::NotifySessionDraining() {
+ if (!drain_sent_) {
+ connect_stream_->WriteCapsule(
+ quiche::Capsule(quiche::DrainWebTransportSessionCapsule()));
+ drain_sent_ = true;
+ }
+}
+
void WebTransportHttp3::OnHttp3Datagram(QuicStreamId stream_id,
absl::string_view payload) {
QUICHE_DCHECK_EQ(stream_id, connect_stream_->id());
@@ -297,6 +305,8 @@
}
}
+void WebTransportHttp3::OnDrainSessionReceived() { OnGoAwayReceived(); }
+
WebTransportHttp3UnidirectionalStream::WebTransportHttp3UnidirectionalStream(
PendingStream* pending, QuicSpdySession* session)
: QuicStream(pending, session, /*is_static=*/false),
diff --git a/quiche/quic/core/http/web_transport_http3.h b/quiche/quic/core/http/web_transport_http3.h
index eb54c06..e74335e 100644
--- a/quiche/quic/core/http/web_transport_http3.h
+++ b/quiche/quic/core/http/web_transport_http3.h
@@ -90,6 +90,7 @@
QuicByteCount GetMaxDatagramSize() const override;
void SetDatagramMaxTimeInQueue(absl::Duration max_time_in_queue) override;
+ void NotifySessionDraining() override;
void SetOnDraining(quiche::SingleUseCallback<void()> callback) override {
drain_callback_ = std::move(callback);
}
@@ -106,6 +107,7 @@
}
void OnGoAwayReceived();
+ void OnDrainSessionReceived();
private:
// Notifies the visitor that the connection has been closed. Ensures that the
@@ -130,6 +132,7 @@
WebTransportHttp3RejectionReason rejection_reason_ =
WebTransportHttp3RejectionReason::kNone;
+ bool drain_sent_ = false;
// Those are set to default values, which are used if the session is not
// closed cleanly using an appropriate capsule.
WebTransportSessionError error_code_ = 0;
diff --git a/quiche/quic/test_tools/quic_test_backend.cc b/quiche/quic/test_tools/quic_test_backend.cc
index be8068b..4383f23 100644
--- a/quiche/quic/test_tools/quic_test_backend.cc
+++ b/quiche/quic/test_tools/quic_test_backend.cc
@@ -23,7 +23,8 @@
// sends a unidirectional stream of format "code message" to this endpoint, it
// will close the session with the corresponding error code and error message.
// For instance, sending "42 test error" will cause it to be closed with code 42
-// and message "test error".
+// and message "test error". As a special case, sending "DRAIN" would result in
+// a DRAIN_WEBTRANSPORT_SESSION capsule being sent.
class SessionCloseVisitor : public WebTransportVisitor {
public:
SessionCloseVisitor(WebTransportSession* session) : session_(session) {}
@@ -41,6 +42,10 @@
stream->SetVisitor(
std::make_unique<WebTransportUnidirectionalEchoReadVisitor>(
stream, [this](const std::string& data) {
+ if (data == "DRAIN") {
+ session_->NotifySessionDraining();
+ return;
+ }
std::pair<absl::string_view, absl::string_view> parsed =
absl::StrSplit(data, absl::MaxSplits(' ', 1));
WebTransportSessionError error_code = 0;
diff --git a/quiche/web_transport/test_tools/mock_web_transport.h b/quiche/web_transport/test_tools/mock_web_transport.h
index 2614581..e102a08 100644
--- a/quiche/web_transport/test_tools/mock_web_transport.h
+++ b/quiche/web_transport/test_tools/mock_web_transport.h
@@ -75,6 +75,7 @@
MOCK_METHOD(uint64_t, GetMaxDatagramSize, (), (const, override));
MOCK_METHOD(void, SetDatagramMaxTimeInQueue,
(absl::Duration max_time_in_queue), (override));
+ MOCK_METHOD(void, NotifySessionDraining, (), (override));
MOCK_METHOD(void, SetOnDraining, (quiche::SingleUseCallback<void()>),
(override));
};
diff --git a/quiche/web_transport/web_transport.h b/quiche/web_transport/web_transport.h
index 20598f1..6cb7811 100644
--- a/quiche/web_transport/web_transport.h
+++ b/quiche/web_transport/web_transport.h
@@ -215,6 +215,9 @@
// being silently dropped.
virtual void SetDatagramMaxTimeInQueue(absl::Duration max_time_in_queue) = 0;
+ // Sends a DRAIN_WEBTRANSPORT_SESSION capsule or an equivalent signal to the
+ // peer indicating that the session is draining.
+ virtual void NotifySessionDraining() = 0;
// Notifies that either the session itself (DRAIN_WEBTRANSPORT_SESSION
// capsule), or the underlying connection (HTTP GOAWAY) is being drained by
// the peer.