Clean up MoQT error cases:
- Added error codes from the spec.
- Reject opening of second bidirectional stream.
PiperOrigin-RevId: 601451814
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index b17a40f..adb4689 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -44,7 +44,7 @@
webtransport::Stream* control_stream =
session_->OpenOutgoingBidirectionalStream();
if (control_stream == nullptr) {
- Error("Unable to open a control stream");
+ Error(kGenericError, "Unable to open a control stream");
return;
}
control_stream->SetVisitor(std::make_unique<Stream>(
@@ -60,7 +60,7 @@
quiche::QuicheBuffer serialized_setup = framer_.SerializeClientSetup(setup);
bool success = control_stream->Write(serialized_setup.AsStringView());
if (!success) {
- Error("Failed to write client SETUP message");
+ Error(kGenericError, "Failed to write client SETUP message");
return;
}
QUIC_DLOG(INFO) << ENDPOINT << "Send the SETUP message";
@@ -81,6 +81,10 @@
void MoqtSession::OnIncomingBidirectionalStreamAvailable() {
while (webtransport::Stream* stream =
session_->AcceptIncomingBidirectionalStream()) {
+ if (control_stream_.has_value()) {
+ Error(kProtocolViolation, "Bidirectional stream already open");
+ return;
+ }
stream->SetVisitor(std::make_unique<Stream>(this, stream));
stream->visitor()->OnCanRead();
}
@@ -93,16 +97,15 @@
}
}
-void MoqtSession::Error(absl::string_view error) {
+void MoqtSession::Error(MoqtError code, absl::string_view error) {
if (!error_.empty()) {
// Avoid erroring out twice.
return;
}
- QUICHE_DLOG(INFO) << ENDPOINT
- << "MOQT session closed with message: " << error;
+ QUICHE_DLOG(INFO) << ENDPOINT << "MOQT session closed with code: "
+ << static_cast<int>(code) << " and message: " << error;
error_ = std::string(error);
- // TODO(vasilvv): figure out the error code.
- session_->CloseSession(1, error);
+ session_->CloseSession(code, error);
std::move(session_terminated_callback_)(error);
}
@@ -126,7 +129,7 @@
bool success = session_->GetStreamById(*control_stream_)
->Write(framer_.SerializeAnnounce(message).AsStringView());
if (!success) {
- Error("Failed to write ANNOUNCE message");
+ Error(kGenericError, "Failed to write ANNOUNCE message");
return;
}
QUIC_DLOG(INFO) << ENDPOINT << "Sent ANNOUNCE message for "
@@ -227,7 +230,7 @@
session_->GetStreamById(*control_stream_)
->Write(framer_.SerializeSubscribeRequest(message).AsStringView());
if (!success) {
- Error("Failed to write SUBSCRIBE_REQUEST message");
+ Error(kGenericError, "Failed to write SUBSCRIBE_REQUEST message");
return false;
}
QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE_REQUEST message for "
@@ -316,6 +319,7 @@
webtransport::StreamErrorCode error) {
if (is_control_stream_.has_value() && *is_control_stream_) {
session_->Error(
+ kProtocolViolation,
absl::StrCat("Control stream reset with error code ", error));
}
}
@@ -323,6 +327,7 @@
webtransport::StreamErrorCode error) {
if (is_control_stream_.has_value() && *is_control_stream_) {
session_->Error(
+ kProtocolViolation,
absl::StrCat("Control stream reset with error code ", error));
}
}
@@ -331,7 +336,8 @@
absl::string_view payload,
bool end_of_message) {
if (is_control_stream_ == true) {
- session_->Error("Received OBJECT message on control stream");
+ session_->Error(kProtocolViolation,
+ "Received OBJECT message on control stream");
return;
}
QUICHE_DLOG(INFO) << ENDPOINT << "Received OBJECT message on stream "
@@ -365,7 +371,7 @@
}
if (session_->num_buffered_objects_ >= kMaxBufferedObjects) {
session_->num_buffered_objects_++;
- session_->Error("Too many buffered objects");
+ session_->Error(kGenericError, "Too many buffered objects");
return;
}
queue->push_back(BufferedObject(stream_->GetStreamId(), message, payload,
@@ -387,19 +393,22 @@
void MoqtSession::Stream::OnClientSetupMessage(const MoqtClientSetup& message) {
if (is_control_stream_.has_value()) {
if (!*is_control_stream_) {
- session_->Error("Received SETUP on non-control stream");
+ session_->Error(kProtocolViolation,
+ "Received SETUP on non-control stream");
return;
}
} else {
is_control_stream_ = true;
}
if (perspective() == Perspective::IS_CLIENT) {
- session_->Error("Received CLIENT_SETUP from server");
+ session_->Error(kProtocolViolation, "Received CLIENT_SETUP from server");
return;
}
if (absl::c_find(message.supported_versions, session_->parameters_.version) ==
message.supported_versions.end()) {
- session_->Error(absl::StrCat("Version mismatch: expected 0x",
+ // TODO(martinduke): Is this the right error code? See issue #346.
+ session_->Error(kProtocolViolation,
+ absl::StrCat("Version mismatch: expected 0x",
absl::Hex(session_->parameters_.version)));
return;
}
@@ -411,7 +420,7 @@
bool success = stream_->Write(
session_->framer_.SerializeServerSetup(response).AsStringView());
if (!success) {
- session_->Error("Failed to write server SETUP message");
+ session_->Error(kGenericError, "Failed to write server SETUP message");
return;
}
QUIC_DLOG(INFO) << ENDPOINT << "Sent the SETUP message";
@@ -423,18 +432,21 @@
void MoqtSession::Stream::OnServerSetupMessage(const MoqtServerSetup& message) {
if (is_control_stream_.has_value()) {
if (!*is_control_stream_) {
- session_->Error("Received SETUP on non-control stream");
+ session_->Error(kProtocolViolation,
+ "Received SETUP on non-control stream");
return;
}
} else {
is_control_stream_ = true;
}
if (perspective() == Perspective::IS_SERVER) {
- session_->Error("Received SERVER_SETUP from client");
+ session_->Error(kProtocolViolation, "Received SERVER_SETUP from client");
return;
}
if (message.selected_version != session_->parameters_.version) {
- session_->Error(absl::StrCat("Version mismatch: expected 0x",
+ // TODO(martinduke): Is this the right error code? See issue #346.
+ session_->Error(kProtocolViolation,
+ absl::StrCat("Version mismatch: expected 0x",
absl::Hex(session_->parameters_.version)));
return;
}
@@ -455,7 +467,7 @@
stream_->Write(session_->framer_.SerializeSubscribeError(subscribe_error)
.AsStringView());
if (!success) {
- session_->Error("Failed to write SUBSCRIBE_ERROR message");
+ session_->Error(kGenericError, "Failed to write SUBSCRIBE_ERROR message");
}
}
@@ -503,7 +515,7 @@
bool success = stream_->Write(
session_->framer_.SerializeSubscribeOk(subscribe_ok).AsStringView());
if (!success) {
- session_->Error("Failed to write SUBSCRIBE_OK message");
+ session_->Error(kGenericError, "Failed to write SUBSCRIBE_OK message");
return;
}
QUIC_DLOG(INFO) << ENDPOINT << "Created subscription for "
@@ -521,13 +533,14 @@
return;
}
if (session_->tracks_by_alias_.contains(message.track_id)) {
- session_->Error("Received duplicate track_alias");
+ session_->Error(kDuplicateTrackAlias, "Received duplicate track_alias");
return;
}
auto it = session_->remote_tracks_.find(FullTrackName(
std::string(message.track_namespace), std::string(message.track_name)));
if (it == session_->remote_tracks_.end()) {
- session_->Error("Received SUBSCRIBE_OK for nonexistent subscribe");
+ session_->Error(kProtocolViolation,
+ "Received SUBSCRIBE_OK for nonexistent subscribe");
return;
}
// Note that if there are multiple SUBSCRIBE_OK for the same track,
@@ -573,7 +586,8 @@
auto it = session_->remote_tracks_.find(FullTrackName(
std::string(message.track_namespace), std::string(message.track_name)));
if (it == session_->remote_tracks_.end()) {
- session_->Error("Received SUBSCRIBE_ERROR for nonexistent subscribe");
+ session_->Error(kProtocolViolation,
+ "Received SUBSCRIBE_ERROR for nonexistent subscribe");
return;
}
QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_ERROR for "
@@ -594,7 +608,7 @@
bool success =
stream_->Write(session_->framer_.SerializeAnnounceOk(ok).AsStringView());
if (!success) {
- session_->Error("Failed to write ANNOUNCE_OK message");
+ session_->Error(kGenericError, "Failed to write ANNOUNCE_OK message");
return;
}
}
@@ -605,7 +619,8 @@
}
auto it = session_->pending_outgoing_announces_.find(message.track_namespace);
if (it == session_->pending_outgoing_announces_.end()) {
- session_->Error("Received ANNOUNCE_OK for nonexistent announce");
+ session_->Error(kProtocolViolation,
+ "Received ANNOUNCE_OK for nonexistent announce");
return;
}
std::move(it->second)(message.track_namespace, std::nullopt);
@@ -619,7 +634,8 @@
}
auto it = session_->pending_outgoing_announces_.find(message.track_namespace);
if (it == session_->pending_outgoing_announces_.end()) {
- session_->Error("Received ANNOUNCE_ERROR for nonexistent announce");
+ session_->Error(kProtocolViolation,
+ "Received ANNOUNCE_ERROR for nonexistent announce");
return;
}
std::move(it->second)(message.track_namespace, message.reason_phrase);
@@ -627,16 +643,18 @@
}
void MoqtSession::Stream::OnParsingError(absl::string_view reason) {
- session_->Error(absl::StrCat("Parse error: ", reason));
+ session_->Error(kProtocolViolation, absl::StrCat("Parse error: ", reason));
}
bool MoqtSession::Stream::CheckIfIsControlStream() {
if (!is_control_stream_.has_value()) {
- session_->Error("Received SUBSCRIBE_REQUEST as first message");
+ session_->Error(kProtocolViolation,
+ "Received SUBSCRIBE_REQUEST as first message");
return false;
}
if (!*is_control_stream_) {
- session_->Error("Received SUBSCRIBE_REQUEST on non-control stream");
+ session_->Error(kProtocolViolation,
+ "Received SUBSCRIBE_REQUEST on non-control stream");
return false;
}
return true;
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 645372b..ec71181 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -47,6 +47,16 @@
MoqtSessionDeletedCallback session_deleted_callback = +[] {};
};
+enum MoqtError : uint64_t {
+ kNoError = 0x0,
+ kGenericError = 0x1,
+ kUnauthorized = 0x2,
+ kProtocolViolation = 0x3,
+ kDuplicateTrackAlias = 0x4,
+ kParameterLengthMismatch = 0x5,
+ kGoawayTimeout = 0x10,
+};
+
class QUICHE_EXPORT MoqtSession : public webtransport::SessionVisitor {
public:
MoqtSession(webtransport::Session* session, MoqtSessionParameters parameters,
@@ -73,7 +83,7 @@
void OnCanCreateNewOutgoingBidirectionalStream() override {}
void OnCanCreateNewOutgoingUnidirectionalStream() override {}
- void Error(absl::string_view error);
+ void Error(MoqtError code, absl::string_view error);
quic::Perspective perspective() const { return parameters_.perspective; }
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 4e07a7e..34e7fc3 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -235,12 +235,13 @@
TEST_F(MoqtSessionTest, Error) {
bool reported_error = false;
- EXPECT_CALL(mock_session_, CloseSession(1, "foo")).Times(1);
+ EXPECT_CALL(mock_session_, CloseSession(kParameterLengthMismatch, "foo"))
+ .Times(1);
EXPECT_CALL(session_callbacks_.session_terminated_callback, Call(_))
.WillOnce([&](absl::string_view error_message) {
reported_error = (error_message == "foo");
});
- session_.Error("foo");
+ session_.Error(kParameterLengthMismatch, "foo");
EXPECT_TRUE(reported_error);
}
@@ -673,6 +674,88 @@
EXPECT_EQ(next_seq.object, 1);
}
+TEST_F(MoqtSessionTest, OneBidirectionalStreamClient) {
+ StrictMock<webtransport::test::MockStream> mock_stream;
+ EXPECT_CALL(mock_session_, OpenOutgoingBidirectionalStream())
+ .WillOnce(Return(&mock_stream));
+ std::unique_ptr<webtransport::StreamVisitor> visitor;
+ // Save a reference to MoqtSession::Stream
+ EXPECT_CALL(mock_stream, SetVisitor(_))
+ .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> new_visitor) {
+ visitor = std::move(new_visitor);
+ });
+ EXPECT_CALL(mock_stream, GetStreamId())
+ .WillOnce(Return(webtransport::StreamId(4)));
+ bool correct_message = false;
+ EXPECT_CALL(mock_stream, Writev(_, _))
+ .WillOnce([&](absl::Span<const absl::string_view> data,
+ const quiche::StreamWriteOptions& options) {
+ correct_message = true;
+ EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kClientSetup);
+ return absl::OkStatus();
+ });
+ session_.OnSessionReady();
+ EXPECT_TRUE(correct_message);
+
+ // Peer tries to open a bidi stream.
+ bool reported_error = false;
+ EXPECT_CALL(mock_session_, AcceptIncomingBidirectionalStream())
+ .WillOnce(Return(&mock_stream));
+ EXPECT_CALL(mock_session_, CloseSession(kProtocolViolation,
+ "Bidirectional stream already open"))
+ .Times(1);
+ EXPECT_CALL(session_callbacks_.session_terminated_callback, Call(_))
+ .WillOnce([&](absl::string_view error_message) {
+ reported_error = (error_message == "Bidirectional stream already open");
+ });
+ session_.OnIncomingBidirectionalStreamAvailable();
+ EXPECT_TRUE(reported_error);
+}
+
+TEST_F(MoqtSessionTest, OneBidirectionalStreamServer) {
+ MoqtSessionParameters server_parameters = {
+ /*version=*/MoqtVersion::kDraft01,
+ /*perspective=*/quic::Perspective::IS_SERVER,
+ /*using_webtrans=*/true,
+ /*path=*/"",
+ /*deliver_partial_objects=*/false,
+ };
+ MoqtSession server_session(&mock_session_, server_parameters,
+ session_callbacks_.AsSessionCallbacks());
+ StrictMock<webtransport::test::MockStream> mock_stream;
+ std::unique_ptr<MoqtParserVisitor> stream_input =
+ MoqtSessionPeer::CreateControlStream(&server_session, &mock_stream);
+ MoqtClientSetup setup = {
+ /*supported_versions*/ {MoqtVersion::kDraft01},
+ /*role=*/MoqtRole::kBoth,
+ /*path=*/std::nullopt,
+ };
+ bool correct_message = false;
+ EXPECT_CALL(mock_stream, Writev(_, _))
+ .WillOnce([&](absl::Span<const absl::string_view> data,
+ const quiche::StreamWriteOptions& options) {
+ correct_message = true;
+ EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kServerSetup);
+ return absl::OkStatus();
+ });
+ EXPECT_CALL(session_callbacks_.session_established_callback, Call()).Times(1);
+ stream_input->OnClientSetupMessage(setup);
+
+ // Peer tries to open a bidi stream.
+ bool reported_error = false;
+ EXPECT_CALL(mock_session_, AcceptIncomingBidirectionalStream())
+ .WillOnce(Return(&mock_stream));
+ EXPECT_CALL(mock_session_, CloseSession(kProtocolViolation,
+ "Bidirectional stream already open"))
+ .Times(1);
+ EXPECT_CALL(session_callbacks_.session_terminated_callback, Call(_))
+ .WillOnce([&](absl::string_view error_message) {
+ reported_error = (error_message == "Bidirectional stream already open");
+ });
+ server_session.OnIncomingBidirectionalStreamAvailable();
+ EXPECT_TRUE(reported_error);
+}
+
// TODO: Cover more error cases in the above
} // namespace test
diff --git a/quiche/quic/moqt/tools/chat_client_bin.cc b/quiche/quic/moqt/tools/chat_client_bin.cc
index 536709c..85772c4 100644
--- a/quiche/quic/moqt/tools/chat_client_bin.cc
+++ b/quiche/quic/moqt/tools/chat_client_bin.cc
@@ -147,7 +147,7 @@
std::optional<absl::string_view> message) {
if (message.has_value()) {
std::cout << "ANNOUNCE rejected, " << *message << "\n";
- session_->Error("Local ANNOUNCE rejected");
+ session_->Error(moqt::kGenericError, "Local ANNOUNCE rejected");
return;
}
std::cout << "ANNOUNCE for " << track_namespace << " accepted\n";
@@ -224,7 +224,8 @@
if (!got_version) {
// Chat server currently does not send version
if (line != "version=1") {
- session_->Error("Catalog does not begin with version");
+ session_->Error(moqt::kProtocolViolation,
+ "Catalog does not begin with version");
return;
}
got_version = true;
@@ -277,7 +278,8 @@
subscribes_to_make_++;
} else {
if (it->second.from_group == group_sequence) {
- session_->Error("User listed twice in Catalog");
+ session_->Error(moqt::kProtocolViolation,
+ "User listed twice in Catalog");
return;
}
it->second.from_group = group_sequence;