Use MOQT standard SubscribeError codes from draft-02.
Also, do what the review for the preceding CL actually meant when it talked about 'enum class'
Move MoqtError in to moqt_messages.h, because it will be needed by the parser.
PiperOrigin-RevId: 601566414
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index a0b5f40..8666061 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -288,14 +288,14 @@
NeededVarIntLen(static_cast<uint64_t>(MoqtMessageType::kSubscribeError)) +
LengthPrefixedStringLength(message.track_namespace) +
LengthPrefixedStringLength(message.track_name) +
- NeededVarIntLen(message.error_code) +
+ NeededVarIntLen(static_cast<uint64_t>(message.error_code)) +
LengthPrefixedStringLength(message.reason_phrase);
quiche::QuicheBuffer buffer(allocator_, buffer_size);
quic::QuicDataWriter writer(buffer.size(), buffer.data());
writer.WriteVarInt62(static_cast<uint64_t>(MoqtMessageType::kSubscribeError));
writer.WriteStringPieceVarInt62(message.track_namespace);
writer.WriteStringPieceVarInt62(message.track_name);
- writer.WriteVarInt62(message.error_code);
+ writer.WriteVarInt62(static_cast<uint64_t>(message.error_code));
writer.WriteStringPieceVarInt62(message.reason_phrase);
QUICHE_DCHECK(writer.remaining() == 0);
return buffer;
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 773907b..8fd0c55 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -63,6 +63,16 @@
kServerSetup = 0x41,
};
+enum class QUICHE_EXPORT MoqtError : uint64_t {
+ kNoError = 0x0,
+ kGenericError = 0x1,
+ kUnauthorized = 0x2,
+ kProtocolViolation = 0x3,
+ kDuplicateTrackAlias = 0x4,
+ kParameterLengthMismatch = 0x5,
+ kGoawayTimeout = 0x10,
+};
+
enum class QUICHE_EXPORT MoqtRole : uint64_t {
kIngestion = 0x1,
kDelivery = 0x2,
@@ -189,10 +199,16 @@
quic::QuicTimeDelta expires = quic::QuicTimeDelta::FromMilliseconds(0);
};
+enum class QUICHE_EXPORT SubscribeErrorCode : uint64_t {
+ kGenericError = 0x0,
+ kInvalidRange = 0x1,
+ kRetryTrackAlias = 0x2,
+};
+
struct QUICHE_EXPORT MoqtSubscribeError {
absl::string_view track_namespace;
absl::string_view track_name;
- uint64_t error_code;
+ SubscribeErrorCode error_code;
absl::string_view reason_phrase;
};
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index 8a9213c..cde099b 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -395,9 +395,11 @@
if (!reader.ReadStringPieceVarInt62(&subscribe_error.track_name)) {
return 0;
}
- if (!reader.ReadVarInt62(&subscribe_error.error_code)) {
+ uint64_t error_code;
+ if (!reader.ReadVarInt62(&error_code)) {
return 0;
}
+ subscribe_error.error_code = static_cast<SubscribeErrorCode>(error_code);
if (!reader.ReadStringPieceVarInt62(&subscribe_error.reason_phrase)) {
return 0;
}
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index adb4689..c79e515 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -30,9 +30,6 @@
using ::quic::Perspective;
-constexpr uint64_t kMoqtErrorTrackDoesntExist = 1;
-constexpr uint64_t kMoqtErrorObjectDoesntExist = 2;
-
constexpr int kMaxBufferedObjects = 1000;
void MoqtSession::OnSessionReady() {
@@ -44,7 +41,7 @@
webtransport::Stream* control_stream =
session_->OpenOutgoingBidirectionalStream();
if (control_stream == nullptr) {
- Error(kGenericError, "Unable to open a control stream");
+ Error(MoqtError::kGenericError, "Unable to open a control stream");
return;
}
control_stream->SetVisitor(std::make_unique<Stream>(
@@ -60,7 +57,7 @@
quiche::QuicheBuffer serialized_setup = framer_.SerializeClientSetup(setup);
bool success = control_stream->Write(serialized_setup.AsStringView());
if (!success) {
- Error(kGenericError, "Failed to write client SETUP message");
+ Error(MoqtError::kGenericError, "Failed to write client SETUP message");
return;
}
QUIC_DLOG(INFO) << ENDPOINT << "Send the SETUP message";
@@ -82,7 +79,7 @@
while (webtransport::Stream* stream =
session_->AcceptIncomingBidirectionalStream()) {
if (control_stream_.has_value()) {
- Error(kProtocolViolation, "Bidirectional stream already open");
+ Error(MoqtError::kProtocolViolation, "Bidirectional stream already open");
return;
}
stream->SetVisitor(std::make_unique<Stream>(this, stream));
@@ -105,7 +102,7 @@
QUICHE_DLOG(INFO) << ENDPOINT << "MOQT session closed with code: "
<< static_cast<int>(code) << " and message: " << error;
error_ = std::string(error);
- session_->CloseSession(code, error);
+ session_->CloseSession(static_cast<uint64_t>(code), error);
std::move(session_terminated_callback_)(error);
}
@@ -129,7 +126,7 @@
bool success = session_->GetStreamById(*control_stream_)
->Write(framer_.SerializeAnnounce(message).AsStringView());
if (!success) {
- Error(kGenericError, "Failed to write ANNOUNCE message");
+ Error(MoqtError::kGenericError, "Failed to write ANNOUNCE message");
return;
}
QUIC_DLOG(INFO) << ENDPOINT << "Sent ANNOUNCE message for "
@@ -230,7 +227,8 @@
session_->GetStreamById(*control_stream_)
->Write(framer_.SerializeSubscribeRequest(message).AsStringView());
if (!success) {
- Error(kGenericError, "Failed to write SUBSCRIBE_REQUEST message");
+ Error(MoqtError::kGenericError,
+ "Failed to write SUBSCRIBE_REQUEST message");
return false;
}
QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE_REQUEST message for "
@@ -319,7 +317,7 @@
webtransport::StreamErrorCode error) {
if (is_control_stream_.has_value() && *is_control_stream_) {
session_->Error(
- kProtocolViolation,
+ MoqtError::kProtocolViolation,
absl::StrCat("Control stream reset with error code ", error));
}
}
@@ -327,7 +325,7 @@
webtransport::StreamErrorCode error) {
if (is_control_stream_.has_value() && *is_control_stream_) {
session_->Error(
- kProtocolViolation,
+ MoqtError::kProtocolViolation,
absl::StrCat("Control stream reset with error code ", error));
}
}
@@ -336,7 +334,7 @@
absl::string_view payload,
bool end_of_message) {
if (is_control_stream_ == true) {
- session_->Error(kProtocolViolation,
+ session_->Error(MoqtError::kProtocolViolation,
"Received OBJECT message on control stream");
return;
}
@@ -371,7 +369,7 @@
}
if (session_->num_buffered_objects_ >= kMaxBufferedObjects) {
session_->num_buffered_objects_++;
- session_->Error(kGenericError, "Too many buffered objects");
+ session_->Error(MoqtError::kGenericError, "Too many buffered objects");
return;
}
queue->push_back(BufferedObject(stream_->GetStreamId(), message, payload,
@@ -393,7 +391,7 @@
void MoqtSession::Stream::OnClientSetupMessage(const MoqtClientSetup& message) {
if (is_control_stream_.has_value()) {
if (!*is_control_stream_) {
- session_->Error(kProtocolViolation,
+ session_->Error(MoqtError::kProtocolViolation,
"Received SETUP on non-control stream");
return;
}
@@ -401,13 +399,14 @@
is_control_stream_ = true;
}
if (perspective() == Perspective::IS_CLIENT) {
- session_->Error(kProtocolViolation, "Received CLIENT_SETUP from server");
+ session_->Error(MoqtError::kProtocolViolation,
+ "Received CLIENT_SETUP from server");
return;
}
if (absl::c_find(message.supported_versions, session_->parameters_.version) ==
message.supported_versions.end()) {
// TODO(martinduke): Is this the right error code? See issue #346.
- session_->Error(kProtocolViolation,
+ session_->Error(MoqtError::kProtocolViolation,
absl::StrCat("Version mismatch: expected 0x",
absl::Hex(session_->parameters_.version)));
return;
@@ -420,7 +419,8 @@
bool success = stream_->Write(
session_->framer_.SerializeServerSetup(response).AsStringView());
if (!success) {
- session_->Error(kGenericError, "Failed to write server SETUP message");
+ session_->Error(MoqtError::kGenericError,
+ "Failed to write server SETUP message");
return;
}
QUIC_DLOG(INFO) << ENDPOINT << "Sent the SETUP message";
@@ -432,7 +432,7 @@
void MoqtSession::Stream::OnServerSetupMessage(const MoqtServerSetup& message) {
if (is_control_stream_.has_value()) {
if (!*is_control_stream_) {
- session_->Error(kProtocolViolation,
+ session_->Error(MoqtError::kProtocolViolation,
"Received SETUP on non-control stream");
return;
}
@@ -440,12 +440,13 @@
is_control_stream_ = true;
}
if (perspective() == Perspective::IS_SERVER) {
- session_->Error(kProtocolViolation, "Received SERVER_SETUP from client");
+ session_->Error(MoqtError::kProtocolViolation,
+ "Received SERVER_SETUP from client");
return;
}
if (message.selected_version != session_->parameters_.version) {
// TODO(martinduke): Is this the right error code? See issue #346.
- session_->Error(kProtocolViolation,
+ session_->Error(MoqtError::kProtocolViolation,
absl::StrCat("Version mismatch: expected 0x",
absl::Hex(session_->parameters_.version)));
return;
@@ -456,7 +457,7 @@
}
void MoqtSession::Stream::SendSubscribeError(
- const MoqtSubscribeRequest& message, uint64_t error_code,
+ const MoqtSubscribeRequest& message, SubscribeErrorCode error_code,
absl::string_view reason_phrase) {
MoqtSubscribeError subscribe_error;
subscribe_error.track_namespace = message.track_namespace;
@@ -467,7 +468,8 @@
stream_->Write(session_->framer_.SerializeSubscribeError(subscribe_error)
.AsStringView());
if (!success) {
- session_->Error(kGenericError, "Failed to write SUBSCRIBE_ERROR message");
+ session_->Error(MoqtError::kGenericError,
+ "Failed to write SUBSCRIBE_ERROR message");
}
}
@@ -485,7 +487,7 @@
QUIC_DLOG(INFO) << ENDPOINT << "Rejected because "
<< message.track_namespace << ":" << message.track_name
<< " does not exist";
- SendSubscribeError(message, kMoqtErrorTrackDoesntExist,
+ SendSubscribeError(message, SubscribeErrorCode::kGenericError,
"Track does not exist");
return;
}
@@ -503,7 +505,7 @@
std::optional<absl::string_view> past_objects_available =
track.visitor()->OnSubscribeRequestForPast(window);
if (!past_objects_available.has_value()) {
- SendSubscribeError(message, kMoqtErrorObjectDoesntExist,
+ SendSubscribeError(message, SubscribeErrorCode::kGenericError,
"Object does not exist");
return;
}
@@ -515,7 +517,8 @@
bool success = stream_->Write(
session_->framer_.SerializeSubscribeOk(subscribe_ok).AsStringView());
if (!success) {
- session_->Error(kGenericError, "Failed to write SUBSCRIBE_OK message");
+ session_->Error(MoqtError::kGenericError,
+ "Failed to write SUBSCRIBE_OK message");
return;
}
QUIC_DLOG(INFO) << ENDPOINT << "Created subscription for "
@@ -533,13 +536,14 @@
return;
}
if (session_->tracks_by_alias_.contains(message.track_id)) {
- session_->Error(kDuplicateTrackAlias, "Received duplicate track_alias");
+ session_->Error(MoqtError::kDuplicateTrackAlias,
+ "Received duplicate track_alias");
return;
}
auto it = session_->remote_tracks_.find(FullTrackName(
std::string(message.track_namespace), std::string(message.track_name)));
if (it == session_->remote_tracks_.end()) {
- session_->Error(kProtocolViolation,
+ session_->Error(MoqtError::kProtocolViolation,
"Received SUBSCRIBE_OK for nonexistent subscribe");
return;
}
@@ -586,13 +590,14 @@
auto it = session_->remote_tracks_.find(FullTrackName(
std::string(message.track_namespace), std::string(message.track_name)));
if (it == session_->remote_tracks_.end()) {
- session_->Error(kProtocolViolation,
+ session_->Error(MoqtError::kProtocolViolation,
"Received SUBSCRIBE_ERROR for nonexistent subscribe");
return;
}
QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_ERROR for "
<< message.track_namespace << ":" << message.track_name
- << ", error = " << message.reason_phrase;
+ << ", error = " << static_cast<int>(message.error_code)
+ << " (" << message.reason_phrase << ")";
if (it->second.visitor() != nullptr) {
it->second.visitor()->OnReply(it->second.full_track_name(),
message.reason_phrase);
@@ -608,7 +613,8 @@
bool success =
stream_->Write(session_->framer_.SerializeAnnounceOk(ok).AsStringView());
if (!success) {
- session_->Error(kGenericError, "Failed to write ANNOUNCE_OK message");
+ session_->Error(MoqtError::kGenericError,
+ "Failed to write ANNOUNCE_OK message");
return;
}
}
@@ -619,7 +625,7 @@
}
auto it = session_->pending_outgoing_announces_.find(message.track_namespace);
if (it == session_->pending_outgoing_announces_.end()) {
- session_->Error(kProtocolViolation,
+ session_->Error(MoqtError::kProtocolViolation,
"Received ANNOUNCE_OK for nonexistent announce");
return;
}
@@ -634,7 +640,7 @@
}
auto it = session_->pending_outgoing_announces_.find(message.track_namespace);
if (it == session_->pending_outgoing_announces_.end()) {
- session_->Error(kProtocolViolation,
+ session_->Error(MoqtError::kProtocolViolation,
"Received ANNOUNCE_ERROR for nonexistent announce");
return;
}
@@ -643,17 +649,18 @@
}
void MoqtSession::Stream::OnParsingError(absl::string_view reason) {
- session_->Error(kProtocolViolation, absl::StrCat("Parse error: ", reason));
+ session_->Error(MoqtError::kProtocolViolation,
+ absl::StrCat("Parse error: ", reason));
}
bool MoqtSession::Stream::CheckIfIsControlStream() {
if (!is_control_stream_.has_value()) {
- session_->Error(kProtocolViolation,
+ session_->Error(MoqtError::kProtocolViolation,
"Received SUBSCRIBE_REQUEST as first message");
return false;
}
if (!*is_control_stream_) {
- session_->Error(kProtocolViolation,
+ session_->Error(MoqtError::kProtocolViolation,
"Received SUBSCRIBE_REQUEST on non-control stream");
return false;
}
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index ec71181..6620b3c 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -47,16 +47,6 @@
MoqtSessionDeletedCallback session_deleted_callback = +[] {};
};
-enum MoqtError : uint64_t {
- kNoError = 0x0,
- kGenericError = 0x1,
- kUnauthorized = 0x2,
- kProtocolViolation = 0x3,
- kDuplicateTrackAlias = 0x4,
- kParameterLengthMismatch = 0x5,
- kGoawayTimeout = 0x10,
-};
-
class QUICHE_EXPORT MoqtSession : public webtransport::SessionVisitor {
public:
MoqtSession(webtransport::Session* session, MoqtSessionParameters parameters,
@@ -182,7 +172,7 @@
private:
friend class test::MoqtSessionPeer;
void SendSubscribeError(const MoqtSubscribeRequest& message,
- uint64_t error_code,
+ SubscribeErrorCode error_code,
absl::string_view reason_phrase);
bool CheckIfIsControlStream();
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 34e7fc3..eb231b2 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -235,13 +235,16 @@
TEST_F(MoqtSessionTest, Error) {
bool reported_error = false;
- EXPECT_CALL(mock_session_, CloseSession(kParameterLengthMismatch, "foo"))
+ EXPECT_CALL(
+ mock_session_,
+ CloseSession(static_cast<uint64_t>(MoqtError::kParameterLengthMismatch),
+ "foo"))
.Times(1);
EXPECT_CALL(session_callbacks_.session_terminated_callback, Call(_))
.WillOnce([&](absl::string_view error_message) {
reported_error = (error_message == "foo");
});
- session_.Error(kParameterLengthMismatch, "foo");
+ session_.Error(MoqtError::kParameterLengthMismatch, "foo");
EXPECT_TRUE(reported_error);
}
@@ -442,7 +445,7 @@
MoqtSubscribeError error = {
/*track_namespace=*/"foo",
/*track_name=*/"bar",
- /*error_code=*/1,
+ /*error_code=*/SubscribeErrorCode::kInvalidRange,
/*reason_phrase=*/"deadbeef",
};
correct_message = false;
@@ -701,8 +704,9 @@
bool reported_error = false;
EXPECT_CALL(mock_session_, AcceptIncomingBidirectionalStream())
.WillOnce(Return(&mock_stream));
- EXPECT_CALL(mock_session_, CloseSession(kProtocolViolation,
- "Bidirectional stream already open"))
+ EXPECT_CALL(mock_session_,
+ CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
+ "Bidirectional stream already open"))
.Times(1);
EXPECT_CALL(session_callbacks_.session_terminated_callback, Call(_))
.WillOnce([&](absl::string_view error_message) {
@@ -745,8 +749,9 @@
bool reported_error = false;
EXPECT_CALL(mock_session_, AcceptIncomingBidirectionalStream())
.WillOnce(Return(&mock_stream));
- EXPECT_CALL(mock_session_, CloseSession(kProtocolViolation,
- "Bidirectional stream already open"))
+ EXPECT_CALL(mock_session_,
+ CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
+ "Bidirectional stream already open"))
.Times(1);
EXPECT_CALL(session_callbacks_.session_terminated_callback, Call(_))
.WillOnce([&](absl::string_view error_message) {
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index ada4dea..09ebb64 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -487,7 +487,7 @@
MoqtSubscribeError subscribe_error_ = {
/*track_namespace=*/"foo",
/*track_name=*/"bar",
- /*subscribe=*/1,
+ /*subscribe=*/SubscribeErrorCode::kInvalidRange,
/*reason_phrase=*/"bar",
};
};
diff --git a/quiche/quic/moqt/tools/chat_client_bin.cc b/quiche/quic/moqt/tools/chat_client_bin.cc
index 85772c4..f2af2f7 100644
--- a/quiche/quic/moqt/tools/chat_client_bin.cc
+++ b/quiche/quic/moqt/tools/chat_client_bin.cc
@@ -147,7 +147,8 @@
std::optional<absl::string_view> message) {
if (message.has_value()) {
std::cout << "ANNOUNCE rejected, " << *message << "\n";
- session_->Error(moqt::kGenericError, "Local ANNOUNCE rejected");
+ session_->Error(moqt::MoqtError::kGenericError,
+ "Local ANNOUNCE rejected");
return;
}
std::cout << "ANNOUNCE for " << track_namespace << " accepted\n";
@@ -224,7 +225,7 @@
if (!got_version) {
// Chat server currently does not send version
if (line != "version=1") {
- session_->Error(moqt::kProtocolViolation,
+ session_->Error(moqt::MoqtError::kProtocolViolation,
"Catalog does not begin with version");
return;
}
@@ -278,7 +279,7 @@
subscribes_to_make_++;
} else {
if (it->second.from_group == group_sequence) {
- session_->Error(moqt::kProtocolViolation,
+ session_->Error(moqt::MoqtError::kProtocolViolation,
"User listed twice in Catalog");
return;
}