Implement RESET_STREAM/STOP_SENDING in WebTransport.
This adds an API to QuicStream to send RESET_STREAM or STOP_SENDING individually, as that is required by the WebTransport API.
PiperOrigin-RevId: 400840270
diff --git a/quic/core/http/end_to_end_test.cc b/quic/core/http/end_to_end_test.cc
index eb1f66c..0964eff 100644
--- a/quic/core/http/end_to_end_test.cc
+++ b/quic/core/http/end_to_end_test.cc
@@ -80,7 +80,7 @@
using ::testing::Assign;
using ::testing::Invoke;
using ::testing::NiceMock;
-using testing::NotNull;
+using ::testing::UnorderedElementsAreArray;
namespace quic {
namespace test {
@@ -728,7 +728,15 @@
std::string ReadDataFromWebTransportStreamUntilFin(
WebTransportStream* stream, MockStreamVisitor* visitor = nullptr) {
+ QuicStreamId id = stream->GetStreamId();
std::string buffer;
+
+ // Try reading data if immediately available.
+ WebTransportStream::ReadResult result = stream->Read(&buffer);
+ if (result.fin) {
+ return buffer;
+ }
+
while (true) {
bool can_read = false;
if (visitor == nullptr) {
@@ -739,12 +747,17 @@
EXPECT_CALL(*visitor, OnCanRead()).WillOnce(Assign(&can_read, true));
client_->WaitUntil(5000 /*ms*/, [&can_read]() { return can_read; });
if (!can_read) {
- ADD_FAILURE() << "Waiting for readable data on stream "
- << stream->GetStreamId() << " timed out";
+ ADD_FAILURE() << "Waiting for readable data on stream " << id
+ << " timed out";
+ return buffer;
+ }
+ if (GetClientSession()->GetOrCreateSpdyDataStream(id) == nullptr) {
+ ADD_FAILURE() << "Stream " << id
+ << " was deleted while waiting for incoming data";
return buffer;
}
- WebTransportStream::ReadResult result = stream->Read(&buffer);
+ result = stream->Read(&buffer);
if (result.fin) {
return buffer;
}
@@ -756,6 +769,19 @@
}
}
+ void ReadAllIncomingWebTransportUnidirectionalStreams(
+ WebTransportSession* session) {
+ while (true) {
+ WebTransportStream* received_stream =
+ session->AcceptIncomingUnidirectionalStream();
+ if (received_stream == nullptr) {
+ break;
+ }
+ received_webtransport_unidirectional_streams_.push_back(
+ ReadDataFromWebTransportStreamUntilFin(received_stream));
+ }
+ }
+
void WaitForNewConnectionIds() {
// Wait until a new server CID is available for another migration.
const auto* client_connection = GetClientConnection();
@@ -796,6 +822,7 @@
int override_client_connection_id_length_ = -1;
uint8_t expected_server_connection_id_length_;
bool enable_web_transport_ = false;
+ std::vector<std::string> received_webtransport_unidirectional_streams_;
};
// Run all end to end tests with all supported versions.
@@ -6355,6 +6382,62 @@
EXPECT_TRUE(spdy_stream == nullptr);
}
+TEST_P(EndToEndTest, WebTransportSessionStreamTermination) {
+ enable_web_transport_ = true;
+ ASSERT_TRUE(Initialize());
+
+ if (!version_.UsesHttp3()) {
+ return;
+ }
+
+ WebTransportHttp3* session =
+ CreateWebTransportSession("/resets", /*wait_for_server_response=*/true);
+ ASSERT_TRUE(session != nullptr);
+
+ NiceMock<MockClientVisitor>& visitor = SetupWebTransportVisitor(session);
+ EXPECT_CALL(visitor, OnIncomingUnidirectionalStreamAvailable())
+ .WillRepeatedly([this, session]() {
+ ReadAllIncomingWebTransportUnidirectionalStreams(session);
+ });
+
+ WebTransportStream* stream = session->OpenOutgoingBidirectionalStream();
+ QuicStreamId id1 = stream->GetStreamId();
+ ASSERT_TRUE(stream != nullptr);
+ EXPECT_TRUE(stream->Write("test"));
+ stream->ResetWithUserCode(42);
+
+ // This read fails if the stream is closed in both directions, since that
+ // results in stream object being deleted.
+ std::string received_data = ReadDataFromWebTransportStreamUntilFin(stream);
+ EXPECT_LE(received_data.size(), 4u);
+
+ stream = session->OpenOutgoingBidirectionalStream();
+ QuicStreamId id2 = stream->GetStreamId();
+ ASSERT_TRUE(stream != nullptr);
+ EXPECT_TRUE(stream->Write("test"));
+ stream->SendStopSending(24);
+
+ std::array<std::string, 2> expected_log = {
+ absl::StrCat("Received reset for stream ", id1, " with error code 42"),
+ absl::StrCat("Received stop sending for stream ", id2,
+ " with error code 24"),
+ };
+ client_->WaitUntil(2000, [this, &expected_log]() {
+ return received_webtransport_unidirectional_streams_.size() >=
+ expected_log.size();
+ });
+ EXPECT_THAT(received_webtransport_unidirectional_streams_,
+ UnorderedElementsAreArray(expected_log));
+
+ // Since we closed the read side, cleanly closing the write side should result
+ // in the stream getting deleted.
+ ASSERT_TRUE(GetClientSession()->GetOrCreateSpdyDataStream(id2) != nullptr);
+ EXPECT_TRUE(stream->SendFin());
+ EXPECT_TRUE(client_->WaitUntil(2000, [this, id2]() {
+ return GetClientSession()->GetOrCreateSpdyDataStream(id2) == nullptr;
+ }));
+}
+
} // namespace
} // namespace test
} // namespace quic
diff --git a/quic/core/http/quic_spdy_stream.h b/quic/core/http/quic_spdy_stream.h
index ef7cd59..6a9f71f 100644
--- a/quic/core/http/quic_spdy_stream.h
+++ b/quic/core/http/quic_spdy_stream.h
@@ -24,6 +24,7 @@
#include "quic/core/http/http_encoder.h"
#include "quic/core/http/quic_header_list.h"
#include "quic/core/http/quic_spdy_stream_body_manager.h"
+#include "quic/core/http/web_transport_stream_adapter.h"
#include "quic/core/qpack/qpack_decoded_headers_accumulator.h"
#include "quic/core/quic_error_codes.h"
#include "quic/core/quic_packets.h"
@@ -31,7 +32,6 @@
#include "quic/core/quic_stream_sequencer.h"
#include "quic/core/quic_types.h"
#include "quic/core/web_transport_interface.h"
-#include "quic/core/web_transport_stream_adapter.h"
#include "quic/platform/api/quic_export.h"
#include "quic/platform/api/quic_flags.h"
#include "quic/platform/api/quic_socket_address.h"
diff --git a/quic/core/http/web_transport_http3.h b/quic/core/http/web_transport_http3.h
index ddcdd54..cd715a1 100644
--- a/quic/core/http/web_transport_http3.h
+++ b/quic/core/http/web_transport_http3.h
@@ -11,11 +11,11 @@
#include "absl/container/flat_hash_set.h"
#include "absl/types/optional.h"
#include "quic/core/http/quic_spdy_session.h"
+#include "quic/core/http/web_transport_stream_adapter.h"
#include "quic/core/quic_error_codes.h"
#include "quic/core/quic_stream.h"
#include "quic/core/quic_types.h"
#include "quic/core/web_transport_interface.h"
-#include "quic/core/web_transport_stream_adapter.h"
#include "spdy/core/spdy_header_block.h"
namespace quic {
diff --git a/quic/core/web_transport_stream_adapter.cc b/quic/core/http/web_transport_stream_adapter.cc
similarity index 86%
rename from quic/core/web_transport_stream_adapter.cc
rename to quic/core/http/web_transport_stream_adapter.cc
index 171a3c0..bea1377 100644
--- a/quic/core/web_transport_stream_adapter.cc
+++ b/quic/core/http/web_transport_stream_adapter.cc
@@ -2,7 +2,10 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include "quic/core/web_transport_stream_adapter.h"
+#include "quic/core/http/web_transport_stream_adapter.h"
+
+#include "quic/core/http/web_transport_http3.h"
+#include "quic/core/quic_error_codes.h"
namespace quic {
@@ -110,4 +113,15 @@
}
}
+void WebTransportStreamAdapter::ResetWithUserCode(
+ WebTransportStreamError error) {
+ stream_->ResetWriteSide(QuicResetStreamError(
+ QUIC_STREAM_CANCELLED, WebTransportErrorToHttp3(error)));
+}
+
+void WebTransportStreamAdapter::SendStopSending(WebTransportStreamError error) {
+ stream_->SendStopSending(QuicResetStreamError(
+ QUIC_STREAM_CANCELLED, WebTransportErrorToHttp3(error)));
+}
+
} // namespace quic
diff --git a/quic/core/web_transport_stream_adapter.h b/quic/core/http/web_transport_stream_adapter.h
similarity index 93%
rename from quic/core/web_transport_stream_adapter.h
rename to quic/core/http/web_transport_stream_adapter.h
index 2e4704b..a761f6f 100644
--- a/quic/core/web_transport_stream_adapter.h
+++ b/quic/core/http/web_transport_stream_adapter.h
@@ -8,6 +8,7 @@
#include "quic/core/quic_session.h"
#include "quic/core/quic_stream.h"
#include "quic/core/quic_stream_sequencer.h"
+#include "quic/core/quic_types.h"
#include "quic/core/web_transport_interface.h"
namespace quic {
@@ -34,12 +35,11 @@
}
QuicStreamId GetStreamId() const override { return stream_->id(); }
- void ResetWithUserCode(QuicRstStreamErrorCode error) override {
- stream_->Reset(error);
- }
+ void ResetWithUserCode(WebTransportStreamError error) override;
void ResetDueToInternalError() override {
stream_->Reset(QUIC_STREAM_INTERNAL_ERROR);
}
+ void SendStopSending(WebTransportStreamError error) override;
void MaybeResetDueToStreamObjectGone() override {
if (stream_->write_side_closed() && stream_->read_side_closed()) {
return;
diff --git a/quic/core/quic_stream.cc b/quic/core/quic_stream.cc
index 74a5d45..f055bdf 100644
--- a/quic/core/quic_stream.cc
+++ b/quic/core/quic_stream.cc
@@ -609,7 +609,24 @@
if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
session()->MaybeCloseZombieStream(id_);
- return;
+ }
+}
+
+void QuicStream::ResetWriteSide(QuicResetStreamError error) {
+ stream_error_ = error;
+ MaybeSendRstStream(error);
+
+ if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
+ session()->MaybeCloseZombieStream(id_);
+ }
+}
+
+void QuicStream::SendStopSending(QuicResetStreamError error) {
+ stream_error_ = error;
+ MaybeSendStopSending(error);
+
+ if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
+ session()->MaybeCloseZombieStream(id_);
}
}
diff --git a/quic/core/quic_stream.h b/quic/core/quic_stream.h
index f2485b9..dc129a2 100644
--- a/quic/core/quic_stream.h
+++ b/quic/core/quic_stream.h
@@ -180,6 +180,11 @@
// interface.
void Reset(QuicRstStreamErrorCode error);
+ // Reset() sends both RESET_STREAM and STOP_SENDING; the two methods below
+ // allow to send only one of those.
+ void ResetWriteSide(QuicResetStreamError error);
+ void SendStopSending(QuicResetStreamError error);
+
// Called by the subclass or the sequencer to close the entire connection from
// this end.
void OnUnrecoverableError(QuicErrorCode error,
diff --git a/quic/core/web_transport_interface.h b/quic/core/web_transport_interface.h
index 1240291..102fd1f 100644
--- a/quic/core/web_transport_interface.h
+++ b/quic/core/web_transport_interface.h
@@ -75,9 +75,9 @@
virtual QuicStreamId GetStreamId() const = 0;
// Resets the stream with the specified error code.
- // TODO(b/184048994): change the error code type based on IETF consensus.
- virtual void ResetWithUserCode(QuicRstStreamErrorCode error) = 0;
+ virtual void ResetWithUserCode(WebTransportStreamError error) = 0;
virtual void ResetDueToInternalError() = 0;
+ virtual void SendStopSending(WebTransportStreamError error) = 0;
// Called when the owning object has been garbage-collected.
virtual void MaybeResetDueToStreamObjectGone() = 0;
diff --git a/quic/quic_transport/quic_transport_stream.h b/quic/quic_transport/quic_transport_stream.h
index 87ef5fb..5d574af 100644
--- a/quic/quic_transport/quic_transport_stream.h
+++ b/quic/quic_transport/quic_transport_stream.h
@@ -10,11 +10,11 @@
#include "absl/base/attributes.h"
#include "absl/strings/string_view.h"
+#include "quic/core/http/web_transport_stream_adapter.h"
#include "quic/core/quic_session.h"
#include "quic/core/quic_stream.h"
#include "quic/core/quic_types.h"
#include "quic/core/web_transport_interface.h"
-#include "quic/core/web_transport_stream_adapter.h"
#include "quic/quic_transport/quic_transport_session_interface.h"
namespace quic {
@@ -50,12 +50,15 @@
QuicStreamId GetStreamId() const override { return id(); }
- void ResetWithUserCode(QuicRstStreamErrorCode error) override {
- adapter_.ResetWithUserCode(error);
+ void ResetWithUserCode(WebTransportStreamError /*error*/) override {
+ adapter_.ResetWithUserCode(0);
}
void ResetDueToInternalError() override {
adapter_.ResetDueToInternalError();
}
+ void SendStopSending(WebTransportStreamError /*error*/) override {
+ adapter_.SendStopSending(0);
+ }
void MaybeResetDueToStreamObjectGone() override {
adapter_.MaybeResetDueToStreamObjectGone();
}
diff --git a/quic/test_tools/quic_test_backend.cc b/quic/test_tools/quic_test_backend.cc
index 66d6769..e0d0a04 100644
--- a/quic/test_tools/quic_test_backend.cc
+++ b/quic/test_tools/quic_test_backend.cc
@@ -14,6 +14,7 @@
#include "quic/core/quic_simple_buffer_allocator.h"
#include "quic/core/web_transport_interface.h"
#include "quic/platform/api/quic_mem_slice.h"
+#include "quic/test_tools/web_transport_resets_backend.h"
#include "quic/tools/web_transport_test_visitors.h"
namespace quic {
@@ -58,6 +59,9 @@
std::make_unique<EchoWebTransportSessionVisitor>(session);
return response;
}
+ if (path == "/resets") {
+ return WebTransportResetsBackend(request_headers, session);
+ }
WebTransportResponse response;
response.response_headers[":status"] = "404";
diff --git a/quic/test_tools/web_transport_resets_backend.cc b/quic/test_tools/web_transport_resets_backend.cc
new file mode 100644
index 0000000..512619f
--- /dev/null
+++ b/quic/test_tools/web_transport_resets_backend.cc
@@ -0,0 +1,113 @@
+// Copyright (c) 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quic/test_tools/web_transport_resets_backend.h"
+
+#include <memory>
+
+#include "quic/core/web_transport_interface.h"
+#include "quic/tools/web_transport_test_visitors.h"
+#include "common/quiche_circular_deque.h"
+
+namespace quic {
+namespace test {
+
+namespace {
+
+class ResetsVisitor;
+
+class BidirectionalEchoVisitorWithLogging
+ : public WebTransportBidirectionalEchoVisitor {
+ public:
+ BidirectionalEchoVisitorWithLogging(WebTransportStream* stream,
+ ResetsVisitor* session_visitor)
+ : WebTransportBidirectionalEchoVisitor(stream),
+ session_visitor_(session_visitor) {}
+
+ void OnResetStreamReceived(WebTransportStreamError error) override;
+ void OnStopSendingReceived(WebTransportStreamError error) override;
+
+ private:
+ ResetsVisitor* session_visitor_; // Not owned.
+};
+
+class ResetsVisitor : public WebTransportVisitor {
+ public:
+ ResetsVisitor(WebTransportSession* session) : session_(session) {}
+
+ void OnSessionReady(const spdy::SpdyHeaderBlock& /*headers*/) override {}
+ void OnSessionClosed(WebTransportSessionError /*error_code*/,
+ const std::string& /*error_message*/) override {}
+
+ void OnIncomingBidirectionalStreamAvailable() override {
+ while (true) {
+ WebTransportStream* stream =
+ session_->AcceptIncomingBidirectionalStream();
+ if (stream == nullptr) {
+ return;
+ }
+ stream->SetVisitor(
+ std::make_unique<BidirectionalEchoVisitorWithLogging>(stream, this));
+ stream->visitor()->OnCanRead();
+ }
+ }
+ void OnIncomingUnidirectionalStreamAvailable() override {}
+
+ void OnDatagramReceived(absl::string_view /*datagram*/) override {}
+
+ void OnCanCreateNewOutgoingBidirectionalStream() override {}
+ void OnCanCreateNewOutgoingUnidirectionalStream() override {
+ MaybeSendLogsBack();
+ }
+
+ void Log(std::string line) {
+ log_.push_back(std::move(line));
+ MaybeSendLogsBack();
+ }
+
+ private:
+ void MaybeSendLogsBack() {
+ while (!log_.empty() &&
+ session_->CanOpenNextOutgoingUnidirectionalStream()) {
+ WebTransportStream* stream = session_->OpenOutgoingUnidirectionalStream();
+ stream->SetVisitor(
+ std::make_unique<WebTransportUnidirectionalEchoWriteVisitor>(
+ stream, log_.front()));
+ log_.pop_front();
+ stream->visitor()->OnCanWrite();
+ }
+ }
+
+ WebTransportSession* session_; // Not owned.
+ quiche::QuicheCircularDeque<std::string> log_;
+};
+
+void BidirectionalEchoVisitorWithLogging::OnResetStreamReceived(
+ WebTransportStreamError error) {
+ session_visitor_->Log(absl::StrCat("Received reset for stream ",
+ stream()->GetStreamId(),
+ " with error code ", error));
+ WebTransportBidirectionalEchoVisitor::OnResetStreamReceived(error);
+}
+void BidirectionalEchoVisitorWithLogging::OnStopSendingReceived(
+ WebTransportStreamError error) {
+ session_visitor_->Log(absl::StrCat("Received stop sending for stream ",
+ stream()->GetStreamId(),
+ " with error code ", error));
+ WebTransportBidirectionalEchoVisitor::OnStopSendingReceived(error);
+}
+
+} // namespace
+
+QuicSimpleServerBackend::WebTransportResponse WebTransportResetsBackend(
+ const spdy::Http2HeaderBlock& /*request_headers*/,
+ WebTransportSession* session) {
+ QuicSimpleServerBackend::WebTransportResponse response;
+ response.response_headers[":status"] = "200";
+ response.visitor = std::make_unique<ResetsVisitor>(session);
+ return response;
+}
+
+} // namespace test
+} // namespace quic
diff --git a/quic/test_tools/web_transport_resets_backend.h b/quic/test_tools/web_transport_resets_backend.h
new file mode 100644
index 0000000..dda06be
--- /dev/null
+++ b/quic/test_tools/web_transport_resets_backend.h
@@ -0,0 +1,23 @@
+// Copyright (c) 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_TEST_TOOLS_WEB_TRANSPORT_RESETS_BACKEND_H_
+#define QUICHE_QUIC_TEST_TOOLS_WEB_TRANSPORT_RESETS_BACKEND_H_
+
+#include "quic/test_tools/quic_test_backend.h"
+
+namespace quic {
+namespace test {
+
+// A backend for testing RESET_STREAM/STOP_SENDING behavior. Provides
+// bidirectional echo streams; whenever one of those receives RESET_STREAM or
+// STOP_SENDING, a log message is sent as a unidirectional stream.
+QuicSimpleServerBackend::WebTransportResponse WebTransportResetsBackend(
+ const spdy::Http2HeaderBlock& request_headers,
+ WebTransportSession* session);
+
+} // namespace test
+} // namespace quic
+
+#endif // QUICHE_QUIC_TEST_TOOLS_WEB_TRANSPORT_RESETS_BACKEND_H_
diff --git a/quic/tools/web_transport_test_visitors.h b/quic/tools/web_transport_test_visitors.h
index 3bc6471..b1f2246 100644
--- a/quic/tools/web_transport_test_visitors.h
+++ b/quic/tools/web_transport_test_visitors.h
@@ -55,6 +55,10 @@
}
void OnCanWrite() override {
+ if (stop_sending_received_) {
+ return;
+ }
+
if (!buffer_.empty()) {
bool success = stream_->Write(buffer_);
QUIC_DVLOG(1) << "Attempted writing on WebTransport bidirectional stream "
@@ -73,14 +77,26 @@
}
}
- void OnResetStreamReceived(WebTransportStreamError /*error*/) override {}
- void OnStopSendingReceived(WebTransportStreamError /*error*/) override {}
+ void OnResetStreamReceived(WebTransportStreamError /*error*/) override {
+ // Send FIN in response to a stream reset. We want to test that we can
+ // operate one side of the stream cleanly while the other is reset, thus
+ // replying with a FIN rather than a RESET_STREAM is more appropriate here.
+ send_fin_ = true;
+ OnCanWrite();
+ }
+ void OnStopSendingReceived(WebTransportStreamError /*error*/) override {
+ stop_sending_received_ = true;
+ }
void OnWriteSideInDataRecvdState() override {}
+ protected:
+ WebTransportStream* stream() { return stream_; }
+
private:
WebTransportStream* stream_;
std::string buffer_;
bool send_fin_ = false;
+ bool stop_sending_received_ = false;
};
// Buffers all of the data and calls |callback| with the entirety of the stream