Rename ANNOUNCE* to PUBLISH_NAMESPACE* (draft-14) PiperOrigin-RevId: 805409434
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index 2858390..4717cf2 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -531,46 +531,46 @@ WireBoolean(message.forward), WireKeyValuePairList(parameters)); } -quiche::QuicheBuffer MoqtFramer::SerializeAnnounce( - const MoqtAnnounce& message) { +quiche::QuicheBuffer MoqtFramer::SerializePublishNamespace( + const MoqtPublishNamespace& message) { KeyValuePairList parameters; VersionSpecificParametersToKeyValuePairList(message.parameters, parameters); if (!ValidateVersionSpecificParameters(parameters, - MoqtMessageType::kAnnounce)) { + MoqtMessageType::kPublishNamespace)) { QUICHE_BUG(QUICHE_BUG_invalid_parameters) << "Serializing invalid MoQT parameters"; return quiche::QuicheBuffer(); } - return SerializeControlMessage(MoqtMessageType::kAnnounce, + return SerializeControlMessage(MoqtMessageType::kPublishNamespace, WireVarInt62(message.request_id), WireTrackNamespace(message.track_namespace), WireKeyValuePairList(parameters)); } -quiche::QuicheBuffer MoqtFramer::SerializeAnnounceOk( - const MoqtAnnounceOk& message) { - return SerializeControlMessage(MoqtMessageType::kAnnounceOk, +quiche::QuicheBuffer MoqtFramer::SerializePublishNamespaceOk( + const MoqtPublishNamespaceOk& message) { + return SerializeControlMessage(MoqtMessageType::kPublishNamespaceOk, WireVarInt62(message.request_id)); } -quiche::QuicheBuffer MoqtFramer::SerializeAnnounceError( - const MoqtAnnounceError& message) { +quiche::QuicheBuffer MoqtFramer::SerializePublishNamespaceError( + const MoqtPublishNamespaceError& message) { return SerializeControlMessage( - MoqtMessageType::kAnnounceError, WireVarInt62(message.request_id), + MoqtMessageType::kPublishNamespaceError, WireVarInt62(message.request_id), WireVarInt62(message.error_code), WireStringWithVarInt62Length(message.error_reason)); } -quiche::QuicheBuffer MoqtFramer::SerializeUnannounce( - const MoqtUnannounce& message) { - return SerializeControlMessage(MoqtMessageType::kUnannounce, +quiche::QuicheBuffer MoqtFramer::SerializePublishNamespaceDone( + const MoqtPublishNamespaceDone& message) { + return SerializeControlMessage(MoqtMessageType::kPublishNamespaceDone, WireTrackNamespace(message.track_namespace)); } -quiche::QuicheBuffer MoqtFramer::SerializeAnnounceCancel( - const MoqtAnnounceCancel& message) { +quiche::QuicheBuffer MoqtFramer::SerializePublishNamespaceCancel( + const MoqtPublishNamespaceCancel& message) { return SerializeControlMessage( - MoqtMessageType::kAnnounceCancel, + MoqtMessageType::kPublishNamespaceCancel, WireTrackNamespace(message.track_namespace), WireVarInt62(message.error_code), WireStringWithVarInt62Length(message.error_reason));
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h index 0168758..e5a4a52 100644 --- a/quiche/quic/moqt/moqt_framer.h +++ b/quiche/quic/moqt/moqt_framer.h
@@ -56,12 +56,16 @@ quiche::QuicheBuffer SerializePublishDone(const MoqtPublishDone& message); quiche::QuicheBuffer SerializeSubscribeUpdate( const MoqtSubscribeUpdate& message); - quiche::QuicheBuffer SerializeAnnounce(const MoqtAnnounce& message); - quiche::QuicheBuffer SerializeAnnounceOk(const MoqtAnnounceOk& message); - quiche::QuicheBuffer SerializeAnnounceError(const MoqtAnnounceError& message); - quiche::QuicheBuffer SerializeUnannounce(const MoqtUnannounce& message); - quiche::QuicheBuffer SerializeAnnounceCancel( - const MoqtAnnounceCancel& message); + quiche::QuicheBuffer SerializePublishNamespace( + const MoqtPublishNamespace& message); + quiche::QuicheBuffer SerializePublishNamespaceOk( + const MoqtPublishNamespaceOk& message); + quiche::QuicheBuffer SerializePublishNamespaceError( + const MoqtPublishNamespaceError& message); + quiche::QuicheBuffer SerializePublishNamespaceDone( + const MoqtPublishNamespaceDone& message); + quiche::QuicheBuffer SerializePublishNamespaceCancel( + const MoqtPublishNamespaceCancel& message); quiche::QuicheBuffer SerializeTrackStatus(const MoqtTrackStatus& message); quiche::QuicheBuffer SerializeTrackStatusOk(const MoqtTrackStatusOk& message); quiche::QuicheBuffer SerializeTrackStatusError(
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc index a63dfb3..47c2fe2 100644 --- a/quiche/quic/moqt/moqt_framer_test.cc +++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -39,11 +39,11 @@ MoqtMessageType::kSubscribeError, MoqtMessageType::kUnsubscribe, MoqtMessageType::kPublishDone, - MoqtMessageType::kAnnounce, - MoqtMessageType::kAnnounceOk, - MoqtMessageType::kAnnounceError, - MoqtMessageType::kUnannounce, - MoqtMessageType::kAnnounceCancel, + MoqtMessageType::kPublishNamespace, + MoqtMessageType::kPublishNamespaceOk, + MoqtMessageType::kPublishNamespaceError, + MoqtMessageType::kPublishNamespaceDone, + MoqtMessageType::kPublishNamespaceCancel, MoqtMessageType::kTrackStatus, MoqtMessageType::kTrackStatusOk, MoqtMessageType::kTrackStatusError, @@ -144,25 +144,25 @@ auto data = std::get<MoqtPublishDone>(structured_data); return framer_.SerializePublishDone(data); } - case MoqtMessageType::kAnnounce: { - auto data = std::get<MoqtAnnounce>(structured_data); - return framer_.SerializeAnnounce(data); + case MoqtMessageType::kPublishNamespace: { + auto data = std::get<MoqtPublishNamespace>(structured_data); + return framer_.SerializePublishNamespace(data); } - case moqt::MoqtMessageType::kAnnounceOk: { - auto data = std::get<MoqtAnnounceOk>(structured_data); - return framer_.SerializeAnnounceOk(data); + case moqt::MoqtMessageType::kPublishNamespaceOk: { + auto data = std::get<MoqtPublishNamespaceOk>(structured_data); + return framer_.SerializePublishNamespaceOk(data); } - case moqt::MoqtMessageType::kAnnounceError: { - auto data = std::get<MoqtAnnounceError>(structured_data); - return framer_.SerializeAnnounceError(data); + case moqt::MoqtMessageType::kPublishNamespaceError: { + auto data = std::get<MoqtPublishNamespaceError>(structured_data); + return framer_.SerializePublishNamespaceError(data); } - case MoqtMessageType::kUnannounce: { - auto data = std::get<MoqtUnannounce>(structured_data); - return framer_.SerializeUnannounce(data); + case MoqtMessageType::kPublishNamespaceDone: { + auto data = std::get<MoqtPublishNamespaceDone>(structured_data); + return framer_.SerializePublishNamespaceDone(data); } - case moqt::MoqtMessageType::kAnnounceCancel: { - auto data = std::get<MoqtAnnounceCancel>(structured_data); - return framer_.SerializeAnnounceCancel(data); + case moqt::MoqtMessageType::kPublishNamespaceCancel: { + auto data = std::get<MoqtPublishNamespaceCancel>(structured_data); + return framer_.SerializePublishNamespaceCancel(data); } case moqt::MoqtMessageType::kTrackStatus: { auto data = std::get<MoqtTrackStatus>(structured_data);
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc index 6eeb973..5c232f9 100644 --- a/quiche/quic/moqt/moqt_integration_test.cc +++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -144,23 +144,24 @@ EXPECT_TRUE(success); } -TEST_F(MoqtIntegrationTest, AnnounceSuccessThenUnannounce) { +TEST_F(MoqtIntegrationTest, PublishNamespaceSuccessThenPublishNamespaceDone) { EstablishSession(); auto parameters = std::make_optional<VersionSpecificParameters>( AuthTokenType::kOutOfBand, "foo"); - EXPECT_CALL(server_callbacks_.incoming_announce_callback, + EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback, Call(TrackNamespace{"foo"}, parameters)) .WillOnce(Return(std::nullopt)); testing::MockFunction<void( TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error_message)> - announce_callback; - client_->session()->Announce(TrackNamespace{"foo"}, - announce_callback.AsStdFunction(), *parameters); + std::optional<MoqtPublishNamespaceErrorReason> error_message)> + publish_namespace_callback; + client_->session()->PublishNamespace( + TrackNamespace{"foo"}, publish_namespace_callback.AsStdFunction(), + *parameters); bool matches = false; - EXPECT_CALL(announce_callback, Call(_, _)) + EXPECT_CALL(publish_namespace_callback, Call(_, _)) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error) { + std::optional<MoqtPublishNamespaceErrorReason> error) { matches = true; EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); EXPECT_FALSE(error.has_value()); @@ -169,7 +170,7 @@ test_harness_.RunUntilWithDefaultTimeout([&]() { return matches; }); EXPECT_TRUE(success); matches = false; - EXPECT_CALL(server_callbacks_.incoming_announce_callback, Call(_, _)) + EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback, Call(_, _)) .WillOnce([&](TrackNamespace name, std::optional<VersionSpecificParameters> parameters) { matches = true; @@ -177,28 +178,29 @@ EXPECT_FALSE(parameters.has_value()); return std::nullopt; }); - client_->session()->Unannounce(TrackNamespace{"foo"}); + client_->session()->PublishNamespaceDone(TrackNamespace{"foo"}); success = test_harness_.RunUntilWithDefaultTimeout([&]() { return matches; }); EXPECT_TRUE(success); } -TEST_F(MoqtIntegrationTest, AnnounceSuccessThenCancel) { +TEST_F(MoqtIntegrationTest, PublishNamespaceSuccessThenCancel) { EstablishSession(); auto parameters = std::make_optional<VersionSpecificParameters>( AuthTokenType::kOutOfBand, "foo"); - EXPECT_CALL(server_callbacks_.incoming_announce_callback, + EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback, Call(TrackNamespace{"foo"}, parameters)) .WillOnce(Return(std::nullopt)); testing::MockFunction<void( TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error_message)> - announce_callback; - client_->session()->Announce(TrackNamespace{"foo"}, - announce_callback.AsStdFunction(), *parameters); + std::optional<MoqtPublishNamespaceErrorReason> error_message)> + publish_namespace_callback; + client_->session()->PublishNamespace( + TrackNamespace{"foo"}, publish_namespace_callback.AsStdFunction(), + *parameters); bool matches = false; - EXPECT_CALL(announce_callback, Call(_, _)) + EXPECT_CALL(publish_namespace_callback, Call(_, _)) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error) { + std::optional<MoqtPublishNamespaceErrorReason> error) { matches = true; EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); EXPECT_FALSE(error.has_value()); @@ -207,40 +209,41 @@ test_harness_.RunUntilWithDefaultTimeout([&]() { return matches; }); EXPECT_TRUE(success); matches = false; - EXPECT_CALL(announce_callback, Call(_, _)) + EXPECT_CALL(publish_namespace_callback, Call(_, _)) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error) { + std::optional<MoqtPublishNamespaceErrorReason> error) { matches = true; EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->error_code, RequestErrorCode::kInternalError); EXPECT_EQ(error->reason_phrase, "internal error"); }); - server_->session()->CancelAnnounce(TrackNamespace{"foo"}, - RequestErrorCode::kInternalError, - "internal error"); + server_->session()->CancelPublishNamespace(TrackNamespace{"foo"}, + RequestErrorCode::kInternalError, + "internal error"); success = test_harness_.RunUntilWithDefaultTimeout([&]() { return matches; }); EXPECT_TRUE(success); } -TEST_F(MoqtIntegrationTest, AnnounceSuccessSubscribeInResponse) { +TEST_F(MoqtIntegrationTest, PublishNamespaceSuccessSubscribeInResponse) { EstablishSession(); auto parameters = std::make_optional<VersionSpecificParameters>( AuthTokenType::kOutOfBand, "foo"); - EXPECT_CALL(server_callbacks_.incoming_announce_callback, + EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback, Call(TrackNamespace{"foo"}, parameters)) .WillOnce(Return(std::nullopt)); MockSubscribeRemoteTrackVisitor server_visitor; testing::MockFunction<void( TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error_message)> - announce_callback; - client_->session()->Announce(TrackNamespace{"foo"}, - announce_callback.AsStdFunction(), *parameters); + std::optional<MoqtPublishNamespaceErrorReason> error_message)> + publish_namespace_callback; + client_->session()->PublishNamespace( + TrackNamespace{"foo"}, publish_namespace_callback.AsStdFunction(), + *parameters); bool matches = false; - EXPECT_CALL(announce_callback, Call(_, _)) + EXPECT_CALL(publish_namespace_callback, Call(_, _)) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error) { + std::optional<MoqtPublishNamespaceErrorReason> error) { EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); FullTrackName track_name(track_namespace, "/catalog"); EXPECT_FALSE(error.has_value()); @@ -255,22 +258,23 @@ EXPECT_TRUE(success); } -TEST_F(MoqtIntegrationTest, AnnounceSuccessSendDataInResponse) { +TEST_F(MoqtIntegrationTest, PublishNamespaceSuccessSendDataInResponse) { EstablishSession(); - // Set up the server to subscribe to "data" track for the namespace announce - // it receives. + // Set up the server to subscribe to "data" track for the namespace + // publish_namespace it receives. auto parameters = std::make_optional<VersionSpecificParameters>( AuthTokenType::kOutOfBand, "foo"); MockSubscribeRemoteTrackVisitor server_visitor; - EXPECT_CALL(server_callbacks_.incoming_announce_callback, Call(_, parameters)) + EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback, + Call(_, parameters)) .WillOnce([&](const TrackNamespace& track_namespace, std::optional<VersionSpecificParameters> /*parameters*/) { FullTrackName track_name(track_namespace, "data"); server_->session()->SubscribeAbsolute( track_name, /*start_group=*/0, /*start_object=*/0, &server_visitor, VersionSpecificParameters()); - return std::optional<MoqtAnnounceErrorReason>(); + return std::optional<MoqtPublishNamespaceErrorReason>(); }); auto queue = std::make_shared<MoqtOutgoingQueue>( @@ -282,9 +286,9 @@ EXPECT_CALL(server_visitor, OnReply(_, _, _)).WillOnce([&]() { received_subscribe_ok = true; }); - client_->session()->Announce( + client_->session()->PublishNamespace( TrackNamespace{"test"}, - [](TrackNamespace, std::optional<MoqtAnnounceErrorReason>) {}, + [](TrackNamespace, std::optional<MoqtPublishNamespaceErrorReason>) {}, *parameters); bool success = test_harness_.RunUntilWithDefaultTimeout( [&]() { return received_subscribe_ok; }); @@ -474,19 +478,19 @@ EXPECT_EQ(expected, Location(99, 1)); } -TEST_F(MoqtIntegrationTest, AnnounceFailure) { +TEST_F(MoqtIntegrationTest, PublishNamespaceFailure) { EstablishSession(); testing::MockFunction<void( TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error_message)> - announce_callback; - client_->session()->Announce(TrackNamespace{"foo"}, - announce_callback.AsStdFunction(), - VersionSpecificParameters()); + std::optional<MoqtPublishNamespaceErrorReason> error_message)> + publish_namespace_callback; + client_->session()->PublishNamespace( + TrackNamespace{"foo"}, publish_namespace_callback.AsStdFunction(), + VersionSpecificParameters()); bool matches = false; - EXPECT_CALL(announce_callback, Call(_, _)) + EXPECT_CALL(publish_namespace_callback, Call(_, _)) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error) { + std::optional<MoqtPublishNamespaceErrorReason> error) { matches = true; EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); ASSERT_TRUE(error.has_value());
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc index c1f5dce..af3cba9 100644 --- a/quiche/quic/moqt/moqt_messages.cc +++ b/quiche/quic/moqt/moqt_messages.cc
@@ -185,7 +185,7 @@ MoqtMessageType::kSubscribe, MoqtMessageType::kSubscribeUpdate, MoqtMessageType::kSubscribeNamespace, - MoqtMessageType::kAnnounce, + MoqtMessageType::kPublishNamespace, MoqtMessageType::kTrackStatus, MoqtMessageType::kFetch}; const std::array<MoqtMessageType, 7> kAllowsDeliveryTimeout = { @@ -241,22 +241,22 @@ return "SUBSCRIBE_DONE"; case MoqtMessageType::kSubscribeUpdate: return "SUBSCRIBE_UPDATE"; - case MoqtMessageType::kAnnounceCancel: - return "ANNOUNCE_CANCEL"; + case MoqtMessageType::kPublishNamespaceCancel: + return "PUBLISH_NAMESPACE_CANCEL"; case MoqtMessageType::kTrackStatus: return "TRACK_STATUS"; case MoqtMessageType::kTrackStatusOk: return "TRACK_STATUS_OK"; case MoqtMessageType::kTrackStatusError: return "TRACK_STATUS_ERROR"; - case MoqtMessageType::kAnnounce: - return "ANNOUNCE"; - case MoqtMessageType::kAnnounceOk: - return "ANNOUNCE_OK"; - case MoqtMessageType::kAnnounceError: - return "ANNOUNCE_ERROR"; - case MoqtMessageType::kUnannounce: - return "UNANNOUNCE"; + case MoqtMessageType::kPublishNamespace: + return "PUBLISH_NAMESPACE"; + case MoqtMessageType::kPublishNamespaceOk: + return "PUBLISH_NAMESPACE_OK"; + case MoqtMessageType::kPublishNamespaceError: + return "PUBLISH_NAMESPACE_ERROR"; + case MoqtMessageType::kPublishNamespaceDone: + return "PUBLISH_NAMESPACE_DONE"; case MoqtMessageType::kGoAway: return "GOAWAY"; case MoqtMessageType::kSubscribeNamespace:
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index e208d06..2ab3320 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -254,13 +254,13 @@ kSubscribe = 0x03, kSubscribeOk = 0x04, kSubscribeError = 0x05, - kAnnounce = 0x06, - kAnnounceOk = 0x7, - kAnnounceError = 0x08, - kUnannounce = 0x09, + kPublishNamespace = 0x06, + kPublishNamespaceOk = 0x7, + kPublishNamespaceError = 0x08, + kPublishNamespaceDone = 0x09, kUnsubscribe = 0x0a, kPublishDone = 0x0b, - kAnnounceCancel = 0x0c, + kPublishNamespaceCancel = 0x0c, kTrackStatus = 0x0d, kTrackStatusOk = 0x0e, kTrackStatusError = 0x0f, @@ -368,15 +368,16 @@ bool operator==(const VersionSpecificParameters& other) const = default; }; -// Used for SUBSCRIBE_ERROR, ANNOUNCE_ERROR, ANNOUNCE_CANCEL, +// Used for SUBSCRIBE_ERROR, PUBLISH_NAMESPACE_ERROR, PUBLISH_NAMESPACE_CANCEL, // SUBSCRIBE_NAMESPACE_ERROR, and FETCH_ERROR. enum class QUICHE_EXPORT RequestErrorCode : uint64_t { kInternalError = 0x0, kUnauthorized = 0x1, kTimeout = 0x2, kNotSupported = 0x3, - kTrackDoesNotExist = 0x4, // SUBSCRIBE_ERROR and FETCH_ERROR only. - kUninterested = 0x4, // ANNOUNCE_ERROR and ANNOUNCE_CANCEL only. + kTrackDoesNotExist = 0x4, // SUBSCRIBE_ERROR and FETCH_ERROR only. + kUninterested = + 0x4, // PUBLISH_NAMESPACE_ERROR and PUBLISH_NAMESPACE_CANCEL only. kNamespacePrefixUnknown = 0x4, // SUBSCRIBE_NAMESPACE_ERROR only. kInvalidRange = 0x5, // SUBSCRIBE_ERROR and FETCH_ERROR only. kNamespacePrefixOverlap = 0x5, // SUBSCRIBE_NAMESPACE_ERROR only. @@ -394,7 +395,7 @@ }; // TODO(martinduke): These are deprecated. Replace them in the code. using MoqtSubscribeErrorReason = MoqtRequestError; -using MoqtAnnounceErrorReason = MoqtSubscribeErrorReason; +using MoqtPublishNamespaceErrorReason = MoqtSubscribeErrorReason; class TrackNamespace { public: @@ -730,27 +731,27 @@ VersionSpecificParameters parameters; }; -struct QUICHE_EXPORT MoqtAnnounce { +struct QUICHE_EXPORT MoqtPublishNamespace { uint64_t request_id; TrackNamespace track_namespace; VersionSpecificParameters parameters; }; -struct QUICHE_EXPORT MoqtAnnounceOk { +struct QUICHE_EXPORT MoqtPublishNamespaceOk { uint64_t request_id; }; -struct QUICHE_EXPORT MoqtAnnounceError { +struct QUICHE_EXPORT MoqtPublishNamespaceError { uint64_t request_id; RequestErrorCode error_code; std::string error_reason; }; -struct QUICHE_EXPORT MoqtUnannounce { +struct QUICHE_EXPORT MoqtPublishNamespaceDone { TrackNamespace track_namespace; }; -struct QUICHE_EXPORT MoqtAnnounceCancel { +struct QUICHE_EXPORT MoqtPublishNamespaceCancel { TrackNamespace track_namespace; RequestErrorCode error_code; std::string error_reason;
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index afe7e5e..f40dfd2 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -238,20 +238,20 @@ case MoqtMessageType::kSubscribeUpdate: bytes_read = ProcessSubscribeUpdate(reader); break; - case MoqtMessageType::kAnnounce: - bytes_read = ProcessAnnounce(reader); + case MoqtMessageType::kPublishNamespace: + bytes_read = ProcessPublishNamespace(reader); break; - case MoqtMessageType::kAnnounceOk: - bytes_read = ProcessAnnounceOk(reader); + case MoqtMessageType::kPublishNamespaceOk: + bytes_read = ProcessPublishNamespaceOk(reader); break; - case MoqtMessageType::kAnnounceError: - bytes_read = ProcessAnnounceError(reader); + case MoqtMessageType::kPublishNamespaceError: + bytes_read = ProcessPublishNamespaceError(reader); break; - case MoqtMessageType::kUnannounce: - bytes_read = ProcessUnannounce(reader); + case MoqtMessageType::kPublishNamespaceDone: + bytes_read = ProcessPublishNamespaceDone(reader); break; - case MoqtMessageType::kAnnounceCancel: - bytes_read = ProcessAnnounceCancel(reader); + case MoqtMessageType::kPublishNamespaceCancel: + bytes_read = ProcessPublishNamespaceCancel(reader); break; case MoqtMessageType::kTrackStatus: bytes_read = ProcessTrackStatus(reader); @@ -581,10 +581,11 @@ return reader.PreviouslyReadPayload().length(); } -size_t MoqtControlParser::ProcessAnnounce(quic::QuicDataReader& reader) { - MoqtAnnounce announce; - if (!reader.ReadVarInt62(&announce.request_id) || - !ReadTrackNamespace(reader, announce.track_namespace)) { +size_t MoqtControlParser::ProcessPublishNamespace( + quic::QuicDataReader& reader) { + MoqtPublishNamespace publish_namespace; + if (!reader.ReadVarInt62(&publish_namespace.request_id) || + !ReadTrackNamespace(reader, publish_namespace.track_namespace)) { return 0; } KeyValuePairList parameters; @@ -592,61 +593,67 @@ return 0; } if (!ValidateVersionSpecificParameters(parameters, - MoqtMessageType::kAnnounce)) { - ParseError("ANNOUNCE contains invalid parameters"); + MoqtMessageType::kPublishNamespace)) { + ParseError("PUBLISH_NAMESPACE contains invalid parameters"); return 0; } - if (!KeyValuePairListToVersionSpecificParameters(parameters, - announce.parameters)) { + if (!KeyValuePairListToVersionSpecificParameters( + parameters, publish_namespace.parameters)) { return 0; } - visitor_.OnAnnounceMessage(announce); + visitor_.OnPublishNamespaceMessage(publish_namespace); return reader.PreviouslyReadPayload().length(); } -size_t MoqtControlParser::ProcessAnnounceOk(quic::QuicDataReader& reader) { - MoqtAnnounceOk announce_ok; - if (!reader.ReadVarInt62(&announce_ok.request_id)) { +size_t MoqtControlParser::ProcessPublishNamespaceOk( + quic::QuicDataReader& reader) { + MoqtPublishNamespaceOk publish_namespace_ok; + if (!reader.ReadVarInt62(&publish_namespace_ok.request_id)) { return 0; } - visitor_.OnAnnounceOkMessage(announce_ok); + visitor_.OnPublishNamespaceOkMessage(publish_namespace_ok); return reader.PreviouslyReadPayload().length(); } -size_t MoqtControlParser::ProcessAnnounceError(quic::QuicDataReader& reader) { - MoqtAnnounceError announce_error; +size_t MoqtControlParser::ProcessPublishNamespaceError( + quic::QuicDataReader& reader) { + MoqtPublishNamespaceError publish_namespace_error; uint64_t error_code; - if (!reader.ReadVarInt62(&announce_error.request_id) || + if (!reader.ReadVarInt62(&publish_namespace_error.request_id) || !reader.ReadVarInt62(&error_code) || - !reader.ReadStringVarInt62(announce_error.error_reason)) { + !reader.ReadStringVarInt62(publish_namespace_error.error_reason)) { return 0; } - announce_error.error_code = static_cast<RequestErrorCode>(error_code); - visitor_.OnAnnounceErrorMessage(announce_error); + publish_namespace_error.error_code = + static_cast<RequestErrorCode>(error_code); + visitor_.OnPublishNamespaceErrorMessage(publish_namespace_error); return reader.PreviouslyReadPayload().length(); } -size_t MoqtControlParser::ProcessUnannounce(quic::QuicDataReader& reader) { - MoqtUnannounce unannounce; - if (!ReadTrackNamespace(reader, unannounce.track_namespace)) { +size_t MoqtControlParser::ProcessPublishNamespaceDone( + quic::QuicDataReader& reader) { + MoqtPublishNamespaceDone unpublish_namespace; + if (!ReadTrackNamespace(reader, unpublish_namespace.track_namespace)) { return 0; } - visitor_.OnUnannounceMessage(unannounce); + visitor_.OnPublishNamespaceDoneMessage(unpublish_namespace); return reader.PreviouslyReadPayload().length(); } -size_t MoqtControlParser::ProcessAnnounceCancel(quic::QuicDataReader& reader) { - MoqtAnnounceCancel announce_cancel; - if (!ReadTrackNamespace(reader, announce_cancel.track_namespace)) { +size_t MoqtControlParser::ProcessPublishNamespaceCancel( + quic::QuicDataReader& reader) { + MoqtPublishNamespaceCancel publish_namespace_cancel; + if (!ReadTrackNamespace(reader, publish_namespace_cancel.track_namespace)) { return 0; } uint64_t error_code; if (!reader.ReadVarInt62(&error_code) || - !reader.ReadStringVarInt62(announce_cancel.error_reason)) { + !reader.ReadStringVarInt62(publish_namespace_cancel.error_reason)) { return 0; } - announce_cancel.error_code = static_cast<RequestErrorCode>(error_code); - visitor_.OnAnnounceCancelMessage(announce_cancel); + publish_namespace_cancel.error_code = + static_cast<RequestErrorCode>(error_code); + visitor_.OnPublishNamespaceCancelMessage(publish_namespace_cancel); return reader.PreviouslyReadPayload().length(); } @@ -674,9 +681,9 @@ size_t MoqtControlParser::ProcessSubscribeNamespace( quic::QuicDataReader& reader) { - MoqtSubscribeNamespace subscribe_announces; - if (!reader.ReadVarInt62(&subscribe_announces.request_id) || - !ReadTrackNamespace(reader, subscribe_announces.track_namespace)) { + MoqtSubscribeNamespace subscribe_namespace; + if (!reader.ReadVarInt62(&subscribe_namespace.request_id) || + !ReadTrackNamespace(reader, subscribe_namespace.track_namespace)) { return 0; } KeyValuePairList parameters; @@ -689,10 +696,10 @@ return 0; } if (!KeyValuePairListToVersionSpecificParameters( - parameters, subscribe_announces.parameters)) { + parameters, subscribe_namespace.parameters)) { return 0; } - visitor_.OnSubscribeNamespaceMessage(subscribe_announces); + visitor_.OnSubscribeNamespaceMessage(subscribe_namespace); return reader.PreviouslyReadPayload().length(); }
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h index e10beb9..8ee32cb 100644 --- a/quiche/quic/moqt/moqt_parser.h +++ b/quiche/quic/moqt/moqt_parser.h
@@ -41,11 +41,16 @@ virtual void OnUnsubscribeMessage(const MoqtUnsubscribe& message) = 0; virtual void OnPublishDoneMessage(const MoqtPublishDone& message) = 0; virtual void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) = 0; - virtual void OnAnnounceMessage(const MoqtAnnounce& message) = 0; - virtual void OnAnnounceOkMessage(const MoqtAnnounceOk& message) = 0; - virtual void OnAnnounceErrorMessage(const MoqtAnnounceError& message) = 0; - virtual void OnUnannounceMessage(const MoqtUnannounce& message) = 0; - virtual void OnAnnounceCancelMessage(const MoqtAnnounceCancel& message) = 0; + virtual void OnPublishNamespaceMessage( + const MoqtPublishNamespace& message) = 0; + virtual void OnPublishNamespaceOkMessage( + const MoqtPublishNamespaceOk& message) = 0; + virtual void OnPublishNamespaceErrorMessage( + const MoqtPublishNamespaceError& message) = 0; + virtual void OnPublishNamespaceDoneMessage( + const MoqtPublishNamespaceDone& message) = 0; + virtual void OnPublishNamespaceCancelMessage( + const MoqtPublishNamespaceCancel& message) = 0; virtual void OnTrackStatusMessage(const MoqtTrackStatus& message) = 0; virtual void OnTrackStatusOkMessage(const MoqtTrackStatusOk& message) = 0; virtual void OnTrackStatusErrorMessage( @@ -128,11 +133,11 @@ size_t ProcessUnsubscribe(quic::QuicDataReader& reader); size_t ProcessPublishDone(quic::QuicDataReader& reader); size_t ProcessSubscribeUpdate(quic::QuicDataReader& reader); - size_t ProcessAnnounce(quic::QuicDataReader& reader); - size_t ProcessAnnounceOk(quic::QuicDataReader& reader); - size_t ProcessAnnounceError(quic::QuicDataReader& reader); - size_t ProcessUnannounce(quic::QuicDataReader& reader); - size_t ProcessAnnounceCancel(quic::QuicDataReader& reader); + size_t ProcessPublishNamespace(quic::QuicDataReader& reader); + size_t ProcessPublishNamespaceOk(quic::QuicDataReader& reader); + size_t ProcessPublishNamespaceError(quic::QuicDataReader& reader); + size_t ProcessPublishNamespaceDone(quic::QuicDataReader& reader); + size_t ProcessPublishNamespaceCancel(quic::QuicDataReader& reader); size_t ProcessTrackStatus(quic::QuicDataReader& reader); size_t ProcessTrackStatusOk(quic::QuicDataReader& reader); size_t ProcessTrackStatusError(quic::QuicDataReader& reader);
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc index eaee5c4..9b70469 100644 --- a/quiche/quic/moqt/moqt_parser_test.cc +++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -38,14 +38,14 @@ MoqtMessageType::kSubscribeUpdate, MoqtMessageType::kUnsubscribe, MoqtMessageType::kPublishDone, - MoqtMessageType::kAnnounceCancel, MoqtMessageType::kTrackStatus, MoqtMessageType::kTrackStatusOk, MoqtMessageType::kTrackStatusError, - MoqtMessageType::kAnnounce, - MoqtMessageType::kAnnounceOk, - MoqtMessageType::kAnnounceError, - MoqtMessageType::kUnannounce, + MoqtMessageType::kPublishNamespace, + MoqtMessageType::kPublishNamespaceOk, + MoqtMessageType::kPublishNamespaceError, + MoqtMessageType::kPublishNamespaceDone, + MoqtMessageType::kPublishNamespaceCancel, MoqtMessageType::kClientSetup, MoqtMessageType::kServerSetup, MoqtMessageType::kGoAway, @@ -955,35 +955,38 @@ EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } -TEST_F(MoqtMessageSpecificTest, AnnounceAuthorizationTokenTwice) { +TEST_F(MoqtMessageSpecificTest, PublishNamespaceAuthorizationTokenTwice) { webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kWebTrans, &stream, visitor_); - char announce[] = { + char publish_namespace[] = { 0x06, 0x00, 0x15, 0x02, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x02, // 2 params 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization = "bar" 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization = "bar" }; - stream.Receive(absl::string_view(announce, sizeof(announce)), false); + stream.Receive( + absl::string_view(publish_namespace, sizeof(publish_namespace)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 1); } -TEST_F(MoqtMessageSpecificTest, AnnounceHasDeliveryTimeout) { +TEST_F(MoqtMessageSpecificTest, PublishNamespaceHasDeliveryTimeout) { webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kWebTrans, &stream, visitor_); - char announce[] = { + char publish_namespace[] = { 0x06, 0x00, 0x11, 0x02, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x02, // 2 params 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_info = "bar" 0x02, 0x67, 0x10, // delivery_timeout = 10000 }; - stream.Receive(absl::string_view(announce, sizeof(announce)), false); + stream.Receive( + absl::string_view(publish_namespace, sizeof(publish_namespace)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, "ANNOUNCE contains invalid parameters"); + EXPECT_EQ(visitor_.parsing_error_, + "PUBLISH_NAMESPACE contains invalid parameters"); EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } @@ -1477,22 +1480,25 @@ } // All messages with TrackNamespace use ReadTrackNamespace too check this. Use -// ANNOUNCE. +// PUBLISH_NAMESPACE. TEST_F(MoqtMessageSpecificTest, NamespaceTooSmall) { webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); - char announce[7] = { + char publish_namespace[7] = { 0x06, 0x00, 0x04, 0x02, // request_id = 2 0x01, 0x00, // one empty namespace element 0x00, // no parameters }; - stream.Receive(absl::string_view(announce, sizeof(announce)), false); + stream.Receive( + absl::string_view(publish_namespace, sizeof(publish_namespace)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 1); EXPECT_EQ(visitor_.parsing_error_, std::nullopt); - --announce[2]; // Remove one element. - --announce[4]; - stream.Receive(absl::string_view(announce, sizeof(announce) - 1), false); + --publish_namespace[2]; // Remove one element. + --publish_namespace[4]; + stream.Receive( + absl::string_view(publish_namespace, sizeof(publish_namespace) - 1), + false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 1); EXPECT_EQ(visitor_.parsing_error_, "Invalid number of namespace elements"); @@ -1501,18 +1507,21 @@ TEST_F(MoqtMessageSpecificTest, NamespaceTooLarge) { webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); - char announce[39] = { + char publish_namespace[39] = { 0x06, 0x00, 0x23, 0x02, // type, length = 35, request_id = 2 0x20, // 32 namespace elements. This is the maximum. }; // 32 empty namespace elements + no parameters. - stream.Receive(absl::string_view(announce, sizeof(announce) - 1), false); + stream.Receive( + absl::string_view(publish_namespace, sizeof(publish_namespace) - 1), + false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 1); EXPECT_EQ(visitor_.parsing_error_, std::nullopt); - ++announce[2]; // Add one element. - ++announce[4]; - stream.Receive(absl::string_view(announce, sizeof(announce)), false); + ++publish_namespace[2]; // Add one element. + ++publish_namespace[4]; + stream.Receive( + absl::string_view(publish_namespace, sizeof(publish_namespace)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 1); EXPECT_EQ(visitor_.parsing_error_, "Invalid number of namespace elements");
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 783a0cf..e0dbc62 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -277,7 +277,7 @@ << peer_max_request_id_; return false; } - if (outgoing_subscribe_announces_.contains(track_namespace)) { + if (outgoing_subscribe_namespaces_.contains(track_namespace)) { std::move(callback)( track_namespace, RequestErrorCode::kInternalError, "SUBSCRIBE_NAMESPACE already outstanding for namespace"); @@ -291,15 +291,15 @@ SendControlMessage(framer_.SerializeSubscribeNamespace(message)); QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE_NAMESPACE message for " << message.track_namespace; - pending_outgoing_subscribe_announces_[message.request_id] = + pending_outgoing_subscribe_namespaces_[message.request_id] = PendingSubscribeNamespaceData{track_namespace, std::move(callback)}; - outgoing_subscribe_announces_.emplace(track_namespace); + outgoing_subscribe_namespaces_.emplace(track_namespace); return true; } bool MoqtSession::UnsubscribeNamespace(TrackNamespace track_namespace) { QUICHE_DCHECK(track_namespace.IsValid()); - if (!outgoing_subscribe_announces_.contains(track_namespace)) { + if (!outgoing_subscribe_namespaces_.contains(track_namespace)) { return false; } MoqtUnsubscribeNamespace message; @@ -307,19 +307,21 @@ SendControlMessage(framer_.SerializeUnsubscribeNamespace(message)); QUIC_DLOG(INFO) << ENDPOINT << "Sent UNSUBSCRIBE_NAMESPACE message for " << message.track_namespace; - outgoing_subscribe_announces_.erase(track_namespace); + outgoing_subscribe_namespaces_.erase(track_namespace); return true; } -void MoqtSession::Announce(TrackNamespace track_namespace, - MoqtOutgoingAnnounceCallback announce_callback, - VersionSpecificParameters parameters) { +void MoqtSession::PublishNamespace( + TrackNamespace track_namespace, + MoqtOutgoingPublishNamespaceCallback publish_namespace_callback, + VersionSpecificParameters parameters) { QUICHE_DCHECK(track_namespace.IsValid()); - if (outgoing_announces_.contains(track_namespace)) { - std::move(announce_callback)( + if (outgoing_publish_namespaces_.contains(track_namespace)) { + std::move(publish_namespace_callback)( track_namespace, - MoqtAnnounceErrorReason{RequestErrorCode::kInternalError, - "ANNOUNCE already outstanding for namespace"}); + MoqtPublishNamespaceErrorReason{ + RequestErrorCode::kInternalError, + "PUBLISH_NAMESPACE already outstanding for namespace"}); return; } if (next_request_id_ >= peer_max_request_id_) { @@ -330,51 +332,54 @@ SendControlMessage(framer_.SerializeRequestsBlocked(requests_blocked)); last_requests_blocked_sent_ = peer_max_request_id_; } - QUIC_DLOG(INFO) << ENDPOINT << "Tried to send ANNOUNCE with ID " + QUIC_DLOG(INFO) << ENDPOINT << "Tried to send PUBLISH_NAMESPACE with ID " << next_request_id_ << " which is greater than the maximum ID " << peer_max_request_id_; return; } if (received_goaway_ || sent_goaway_) { - QUIC_DLOG(INFO) << ENDPOINT << "Tried to send ANNOUNCE after GOAWAY"; + QUIC_DLOG(INFO) << ENDPOINT + << "Tried to send PUBLISH_NAMESPACE after GOAWAY"; return; } - MoqtAnnounce message; + MoqtPublishNamespace message; message.request_id = next_request_id_; next_request_id_ += 2; message.track_namespace = track_namespace; message.parameters = parameters; - SendControlMessage(framer_.SerializeAnnounce(message)); - QUIC_DLOG(INFO) << ENDPOINT << "Sent ANNOUNCE message for " + SendControlMessage(framer_.SerializePublishNamespace(message)); + QUIC_DLOG(INFO) << ENDPOINT << "Sent PUBLISH_NAMESPACE message for " << message.track_namespace; - pending_outgoing_announces_[message.request_id] = track_namespace; - outgoing_announces_[track_namespace] = std::move(announce_callback); + pending_outgoing_publish_namespaces_[message.request_id] = track_namespace; + outgoing_publish_namespaces_[track_namespace] = + std::move(publish_namespace_callback); } -bool MoqtSession::Unannounce(TrackNamespace track_namespace) { +bool MoqtSession::PublishNamespaceDone(TrackNamespace track_namespace) { QUICHE_DCHECK(track_namespace.IsValid()); - auto it = outgoing_announces_.find(track_namespace); - if (it == outgoing_announces_.end()) { - return false; // Could have been destroyed by ANNOUNCE_CANCEL. + auto it = outgoing_publish_namespaces_.find(track_namespace); + if (it == outgoing_publish_namespaces_.end()) { + return false; // Could have been destroyed by PUBLISH_NAMESPACE_CANCEL. } - MoqtUnannounce message; + MoqtPublishNamespaceDone message; message.track_namespace = track_namespace; - SendControlMessage(framer_.SerializeUnannounce(message)); - QUIC_DLOG(INFO) << ENDPOINT << "Sent UNANNOUNCE message for " + SendControlMessage(framer_.SerializePublishNamespaceDone(message)); + QUIC_DLOG(INFO) << ENDPOINT << "Sent PUBLISH_NAMESPACE_DONE message for " << message.track_namespace; - outgoing_announces_.erase(it); + outgoing_publish_namespaces_.erase(it); return true; } -void MoqtSession::CancelAnnounce(TrackNamespace track_namespace, - RequestErrorCode code, - absl::string_view reason) { +void MoqtSession::CancelPublishNamespace(TrackNamespace track_namespace, + RequestErrorCode code, + absl::string_view reason) { QUICHE_DCHECK(track_namespace.IsValid()); - MoqtAnnounceCancel message{track_namespace, code, std::string(reason)}; + MoqtPublishNamespaceCancel message{track_namespace, code, + std::string(reason)}; - SendControlMessage(framer_.SerializeAnnounceCancel(message)); - QUIC_DLOG(INFO) << ENDPOINT << "Sent ANNOUNCE_CANCEL message for " + SendControlMessage(framer_.SerializePublishNamespaceCancel(message)); + QUIC_DLOG(INFO) << ENDPOINT << "Sent PUBLISH_NAMESPACE_CANCEL message for " << message.track_namespace << " with reason " << reason; } @@ -1193,98 +1198,104 @@ it->second->set_delivery_timeout(message.parameters.delivery_timeout); } -void MoqtSession::ControlStream::OnAnnounceMessage( - const MoqtAnnounce& message) { +void MoqtSession::ControlStream::OnPublishNamespaceMessage( + const MoqtPublishNamespace& message) { if (!session_->ValidateRequestId(message.request_id)) { return; } if (session_->sent_goaway_) { - QUIC_DLOG(INFO) << ENDPOINT << "Received an ANNOUNCE after GOAWAY"; - MoqtAnnounceError error; + QUIC_DLOG(INFO) << ENDPOINT << "Received a PUBLISH_NAMESPACE after GOAWAY"; + MoqtPublishNamespaceError error; error.request_id = message.request_id; error.error_code = RequestErrorCode::kUnauthorized; - error.error_reason = "ANNOUNCE after GOAWAY"; - SendOrBufferMessage(session_->framer_.SerializeAnnounceError(error)); + error.error_reason = "PUBLISH_NAMESPACE after GOAWAY"; + SendOrBufferMessage( + session_->framer_.SerializePublishNamespaceError(error)); return; } - std::optional<MoqtAnnounceErrorReason> error = - session_->callbacks_.incoming_announce_callback(message.track_namespace, - message.parameters); + std::optional<MoqtPublishNamespaceErrorReason> error = + session_->callbacks_.incoming_publish_namespace_callback( + message.track_namespace, message.parameters); if (error.has_value()) { - MoqtAnnounceError reply; + MoqtPublishNamespaceError reply; reply.request_id = message.request_id; reply.error_code = error->error_code; reply.error_reason = error->reason_phrase; - SendOrBufferMessage(session_->framer_.SerializeAnnounceError(reply)); + SendOrBufferMessage( + session_->framer_.SerializePublishNamespaceError(reply)); return; } - MoqtAnnounceOk ok; + MoqtPublishNamespaceOk ok; ok.request_id = message.request_id; - SendOrBufferMessage(session_->framer_.SerializeAnnounceOk(ok)); + SendOrBufferMessage(session_->framer_.SerializePublishNamespaceOk(ok)); } -// Do not enforce that there is only one of OK or ERROR per ANNOUNCE. Upon -// ERROR, we immediately destroy the state. -void MoqtSession::ControlStream::OnAnnounceOkMessage( - const MoqtAnnounceOk& message) { - auto it = session_->pending_outgoing_announces_.find(message.request_id); - if (it == session_->pending_outgoing_announces_.end()) { +// Do not enforce that there is only one of OK or ERROR per PUBLISH_NAMESPACE. +// Upon ERROR, we immediately destroy the state. +void MoqtSession::ControlStream::OnPublishNamespaceOkMessage( + const MoqtPublishNamespaceOk& message) { + auto it = + session_->pending_outgoing_publish_namespaces_.find(message.request_id); + if (it == session_->pending_outgoing_publish_namespaces_.end()) { session_->Error(MoqtError::kProtocolViolation, - "Received ANNOUNCE_OK for unknown request_id"); + "Received PUBLISH_NAMESPACE_OK for unknown request_id"); return; } TrackNamespace track_namespace = it->second; - session_->pending_outgoing_announces_.erase(it); - auto callback_it = session_->outgoing_announces_.find(track_namespace); - if (callback_it == session_->outgoing_announces_.end()) { - // It might have already been destroyed due to UNANNOUNCE. + session_->pending_outgoing_publish_namespaces_.erase(it); + auto callback_it = + session_->outgoing_publish_namespaces_.find(track_namespace); + if (callback_it == session_->outgoing_publish_namespaces_.end()) { + // It might have already been destroyed due to PUBLISH_NAMESPACE_DONE. return; } std::move(callback_it->second)(track_namespace, std::nullopt); } -void MoqtSession::ControlStream::OnAnnounceErrorMessage( - const MoqtAnnounceError& message) { - auto it = session_->pending_outgoing_announces_.find(message.request_id); - if (it == session_->pending_outgoing_announces_.end()) { +void MoqtSession::ControlStream::OnPublishNamespaceErrorMessage( + const MoqtPublishNamespaceError& message) { + auto it = + session_->pending_outgoing_publish_namespaces_.find(message.request_id); + if (it == session_->pending_outgoing_publish_namespaces_.end()) { session_->Error(MoqtError::kProtocolViolation, - "Received ANNOUNCE_ERROR for unknown request_id"); + "Received PUBLISH_NAMESPACE_ERROR for unknown request_id"); return; } TrackNamespace track_namespace = it->second; - session_->pending_outgoing_announces_.erase(it); - auto it2 = session_->outgoing_announces_.find(track_namespace); - if (it2 == session_->outgoing_announces_.end()) { - return; // State might have been destroyed due to UNANNOUNCE. + session_->pending_outgoing_publish_namespaces_.erase(it); + auto it2 = session_->outgoing_publish_namespaces_.find(track_namespace); + if (it2 == session_->outgoing_publish_namespaces_.end()) { + return; // State might have been destroyed due to PUBLISH_NAMESPACE_DONE. } std::move(it2->second)( track_namespace, - MoqtAnnounceErrorReason{message.error_code, - std::string(message.error_reason)}); - session_->outgoing_announces_.erase(it2); + MoqtPublishNamespaceErrorReason{message.error_code, + std::string(message.error_reason)}); + session_->outgoing_publish_namespaces_.erase(it2); } -void MoqtSession::ControlStream::OnUnannounceMessage( - const MoqtUnannounce& message) { - session_->callbacks_.incoming_announce_callback(message.track_namespace, - std::nullopt); +void MoqtSession::ControlStream::OnPublishNamespaceDoneMessage( + const MoqtPublishNamespaceDone& message) { + session_->callbacks_.incoming_publish_namespace_callback( + message.track_namespace, std::nullopt); } -void MoqtSession::ControlStream::OnAnnounceCancelMessage( - const MoqtAnnounceCancel& message) { +void MoqtSession::ControlStream::OnPublishNamespaceCancelMessage( + const MoqtPublishNamespaceCancel& message) { // The spec currently says that if a later SUBSCRIBE arrives for this // namespace, that SHOULD be a session error. I'm hoping that via Issue #557, // this will go away. Regardless, a SHOULD will not compel the session to keep // state forever, so there is no support for this requirement. - auto it = session_->outgoing_announces_.find(message.track_namespace); - if (it == session_->outgoing_announces_.end()) { - return; // State might have been destroyed due to UNANNOUNCE. + auto it = + session_->outgoing_publish_namespaces_.find(message.track_namespace); + if (it == session_->outgoing_publish_namespaces_.end()) { + return; // State might have been destroyed due to PUBLISH_NAMESPACE_DONE. } std::move(it->second)( message.track_namespace, - MoqtAnnounceErrorReason{message.error_code, - std::string(message.error_reason)}); - session_->outgoing_announces_.erase(it); + MoqtPublishNamespaceErrorReason{message.error_code, + std::string(message.error_reason)}); + session_->outgoing_publish_namespaces_.erase(it); } void MoqtSession::ControlStream::OnTrackStatusMessage( @@ -1355,7 +1366,7 @@ session_->framer_.SerializeSubscribeNamespaceError(error)); return; } - if (!session_->incoming_subscribe_announces_.AddNamespace( + if (!session_->incoming_subscribe_namespace_.AddNamespace( message.track_namespace)) { QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE_NAMESPACE for " << message.track_namespace @@ -1369,7 +1380,7 @@ return; } std::optional<MoqtSubscribeErrorReason> result = - session_->callbacks_.incoming_subscribe_announces_callback( + session_->callbacks_.incoming_subscribe_namespace_callback( message.track_namespace, message.parameters); if (result.has_value()) { MoqtSubscribeNamespaceError error; @@ -1378,7 +1389,7 @@ error.error_reason = result->reason_phrase; SendOrBufferMessage( session_->framer_.SerializeSubscribeNamespaceError(error)); - session_->incoming_subscribe_announces_.RemoveNamespace( + session_->incoming_subscribe_namespace_.RemoveNamespace( message.track_namespace); return; } @@ -1390,21 +1401,21 @@ void MoqtSession::ControlStream::OnSubscribeNamespaceOkMessage( const MoqtSubscribeNamespaceOk& message) { auto it = - session_->pending_outgoing_subscribe_announces_.find(message.request_id); - if (it == session_->pending_outgoing_subscribe_announces_.end()) { + session_->pending_outgoing_subscribe_namespaces_.find(message.request_id); + if (it == session_->pending_outgoing_subscribe_namespaces_.end()) { session_->Error(MoqtError::kProtocolViolation, "Received SUBSCRIBE_NAMESPACE_OK for unknown request_id"); return; // UNSUBSCRIBE_NAMESPACE may already have deleted the entry. } std::move(it->second.callback)(it->second.track_namespace, std::nullopt, ""); - session_->pending_outgoing_subscribe_announces_.erase(it); + session_->pending_outgoing_subscribe_namespaces_.erase(it); } void MoqtSession::ControlStream::OnSubscribeNamespaceErrorMessage( const MoqtSubscribeNamespaceError& message) { auto it = - session_->pending_outgoing_subscribe_announces_.find(message.request_id); - if (it == session_->pending_outgoing_subscribe_announces_.end()) { + session_->pending_outgoing_subscribe_namespaces_.find(message.request_id); + if (it == session_->pending_outgoing_subscribe_namespaces_.end()) { session_->Error( MoqtError::kProtocolViolation, "Received SUBSCRIBE_NAMESPACE_ERROR for unknown request_id"); @@ -1412,17 +1423,17 @@ } std::move(it->second.callback)(it->second.track_namespace, message.error_code, absl::string_view(message.error_reason)); - session_->outgoing_subscribe_announces_.erase(it->second.track_namespace); - session_->pending_outgoing_subscribe_announces_.erase(it); + session_->outgoing_subscribe_namespaces_.erase(it->second.track_namespace); + session_->pending_outgoing_subscribe_namespaces_.erase(it); } void MoqtSession::ControlStream::OnUnsubscribeNamespaceMessage( const MoqtUnsubscribeNamespace& message) { // MoqtSession keeps no state here, so just tell the application. std::optional<MoqtSubscribeErrorReason> result = - session_->callbacks_.incoming_subscribe_announces_callback( + session_->callbacks_.incoming_subscribe_namespace_callback( message.track_namespace, std::nullopt); - session_->incoming_subscribe_announces_.RemoveNamespace( + session_->incoming_subscribe_namespace_.RemoveNamespace( message.track_namespace); }
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index 407a5c7..55a67cf 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -101,18 +101,22 @@ VersionSpecificParameters parameters); bool UnsubscribeNamespace(TrackNamespace track_namespace); - // Send an ANNOUNCE message for |track_namespace|, and call - // |announce_callback| when the response arrives. Will fail immediately if - // there is already an unresolved ANNOUNCE for that namespace. - void Announce(TrackNamespace track_namespace, - MoqtOutgoingAnnounceCallback announce_callback, - VersionSpecificParameters parameters); - // Returns true if message was sent, false if there is no ANNOUNCE to cancel. - bool Unannounce(TrackNamespace track_namespace); + // Send a PUBLISH_NAMESPACE message for |track_namespace|, and call + // |publish_namespace_callback| when the response arrives. Will fail + // immediately if there is already an unresolved PUBLISH_NAMESPACE for that + // namespace. + void PublishNamespace( + TrackNamespace track_namespace, + MoqtOutgoingPublishNamespaceCallback publish_namespace_callback, + VersionSpecificParameters parameters); + // Returns true if message was sent, false if there is no PUBLISH_NAMESPACE to + // cancel. + bool PublishNamespaceDone(TrackNamespace track_namespace); // Allows the subscriber to declare it will not subscribe to |track_namespace| // anymore. - void CancelAnnounce(TrackNamespace track_namespace, RequestErrorCode code, - absl::string_view reason_phrase); + void CancelPublishNamespace(TrackNamespace track_namespace, + RequestErrorCode code, + absl::string_view reason_phrase); // MoqtSessionInterface implementation. MoqtSessionCallbacks& callbacks() override { return callbacks_; } @@ -232,11 +236,16 @@ // There is no state to update for SUBSCRIBE_DONE. void OnPublishDoneMessage(const MoqtPublishDone& /*message*/) override; void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) override; - void OnAnnounceMessage(const MoqtAnnounce& message) override; - void OnAnnounceOkMessage(const MoqtAnnounceOk& message) override; - void OnAnnounceErrorMessage(const MoqtAnnounceError& message) override; - void OnUnannounceMessage(const MoqtUnannounce& message) override; - void OnAnnounceCancelMessage(const MoqtAnnounceCancel& message) override; + void OnPublishNamespaceMessage( + const MoqtPublishNamespace& message) override; + void OnPublishNamespaceOkMessage( + const MoqtPublishNamespaceOk& message) override; + void OnPublishNamespaceErrorMessage( + const MoqtPublishNamespaceError& message) override; + void OnPublishNamespaceDoneMessage( + const MoqtPublishNamespaceDone& /*message*/) override; + void OnPublishNamespaceCancelMessage( + const MoqtPublishNamespaceCancel& message) override; void OnTrackStatusMessage(const MoqtTrackStatus& message) override; void OnTrackStatusOkMessage(const MoqtTrackStatusOk& /*message*/) override { } @@ -830,25 +839,26 @@ absl::flat_hash_map<FullTrackName, MoqtPublishingMonitorInterface*> monitoring_interfaces_for_published_tracks_; - // Outgoing ANNOUNCE for which no OK or ERROR has been received. - absl::flat_hash_map<uint64_t, TrackNamespace> pending_outgoing_announces_; - // All outgoing ANNOUNCE. - absl::flat_hash_map<TrackNamespace, MoqtOutgoingAnnounceCallback> - outgoing_announces_; + // Outgoing PUBLISH_NAMESPACE for which no OK or ERROR has been received. + absl::flat_hash_map<uint64_t, TrackNamespace> + pending_outgoing_publish_namespaces_; + // All outgoing PUBLISH_NAMESPACE. + absl::flat_hash_map<TrackNamespace, MoqtOutgoingPublishNamespaceCallback> + outgoing_publish_namespaces_; // The value is nullptr after OK or ERROR is received. The entry is deleted // when sending UNSUBSCRIBE_NAMESPACE, to make sure the application doesn't - // unsubscribe from something that it isn't subscribed to. ANNOUNCEs that - // result from this subscription use incoming_announce_callback. + // unsubscribe from something that it isn't subscribed to. PUBLISH_NAMESPACEs + // that result from this subscription use incoming_publish_namespace_callback. struct PendingSubscribeNamespaceData { TrackNamespace track_namespace; MoqtOutgoingSubscribeNamespaceCallback callback; }; absl::flat_hash_map<uint64_t, PendingSubscribeNamespaceData> - pending_outgoing_subscribe_announces_; - absl::flat_hash_set<TrackNamespace> outgoing_subscribe_announces_; + pending_outgoing_subscribe_namespaces_; + absl::flat_hash_set<TrackNamespace> outgoing_subscribe_namespaces_; // It's an error if the namespaces overlap, so keep track of them. - NamespaceTree incoming_subscribe_announces_; + NamespaceTree incoming_subscribe_namespace_; // The minimum request ID the peer can use that is monotonically increasing. uint64_t next_incoming_request_id_ = 0;
diff --git a/quiche/quic/moqt/moqt_session_callbacks.h b/quiche/quic/moqt/moqt_session_callbacks.h index cc0a35c..f104de0 100644 --- a/quiche/quic/moqt/moqt_session_callbacks.h +++ b/quiche/quic/moqt/moqt_session_callbacks.h
@@ -29,10 +29,11 @@ // Called from the session destructor. using MoqtSessionDeletedCallback = quiche::SingleUseCallback<void()>; -// Called whenever an ANNOUNCE or UNANNOUNCE message is received from the peer. -// ANNOUNCE sets a value for |parameters|, UNANNOUNCE does not. -using MoqtIncomingAnnounceCallback = - quiche::MultiUseCallback<std::optional<MoqtAnnounceErrorReason>( +// Called whenever a PUBLISH_NAMESPACE or PUBLISH_NAMESPACE_DONE message is +// received from the peer. PUBLISH_NAMESPACE sets a value for |parameters|, +// PUBLISH_NAMESPACE_DONE does not. +using MoqtIncomingPublishNamespaceCallback = + quiche::MultiUseCallback<std::optional<MoqtPublishNamespaceErrorReason>( const TrackNamespace& track_namespace, const std::optional<VersionSpecificParameters>& parameters)>; @@ -46,12 +47,13 @@ const TrackNamespace& track_namespace, std::optional<VersionSpecificParameters> parameters)>; -inline std::optional<MoqtAnnounceErrorReason> DefaultIncomingAnnounceCallback( +inline std::optional<MoqtPublishNamespaceErrorReason> +DefaultIncomingPublishNamespaceCallback( const TrackNamespace& /*track_namespace*/, std::optional<VersionSpecificParameters> /*parameters*/) { - return std::optional(MoqtAnnounceErrorReason{ + return std::optional(MoqtPublishNamespaceErrorReason{ RequestErrorCode::kNotSupported, - "This endpoint does not accept incoming ANNOUNCE messages"}); + "This endpoint does not accept incoming PUBLISH_NAMESPACE messages"}); }; inline std::optional<MoqtSubscribeErrorReason> @@ -72,9 +74,9 @@ +[](absl::string_view) {}; MoqtSessionDeletedCallback session_deleted_callback = +[] {}; - MoqtIncomingAnnounceCallback incoming_announce_callback = - DefaultIncomingAnnounceCallback; - MoqtIncomingSubscribeNamespaceCallback incoming_subscribe_announces_callback = + MoqtIncomingPublishNamespaceCallback incoming_publish_namespace_callback = + DefaultIncomingPublishNamespaceCallback; + MoqtIncomingSubscribeNamespaceCallback incoming_subscribe_namespace_callback = DefaultIncomingSubscribeNamespaceCallback; const quic::QuicClock* clock = quic::QuicDefaultClock::Get(); };
diff --git a/quiche/quic/moqt/moqt_session_interface.h b/quiche/quic/moqt/moqt_session_interface.h index b60439d..cc2b51c 100644 --- a/quiche/quic/moqt/moqt_session_interface.h +++ b/quiche/quic/moqt/moqt_session_interface.h
@@ -63,17 +63,18 @@ using FetchResponseCallback = quiche::SingleUseCallback<void(std::unique_ptr<MoqtFetchTask> fetch_task)>; -// TODO(martinduke): MoqtOutgoingAnnounceCallback and +// TODO(martinduke): MoqtOutgoingPublishNamespaceCallback and // MoqtOutgoingSubscribeNamespaceCallback are deprecated. Remove. -// If |error_message| is nullopt, this is triggered by an ANNOUNCE_OK. -// Otherwise, it is triggered by ANNOUNCE_ERROR or ANNOUNCE_CANCEL. For -// ERROR or CANCEL, MoqtSession is deleting all ANNOUNCE state immediately -// after calling this callback. Alternatively, the application can call -// Unannounce() to delete the state. -using MoqtOutgoingAnnounceCallback = quiche::MultiUseCallback<void( +// If |error_message| is nullopt, this is triggered by a PUBLISH_NAMESPACE_OK. +// Otherwise, it is triggered by PUBLISH_NAMESPACE_ERROR or +// PUBLISH_NAMESPACE_CANCEL. For ERROR or CANCEL, MoqtSession is deleting all +// PUBLISH_NAMESPACE state immediately after calling this callback. +// Alternatively, the application can call PublishNamespaceDone() to delete the +// state. +using MoqtOutgoingPublishNamespaceCallback = quiche::MultiUseCallback<void( TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error)>; + std::optional<MoqtPublishNamespaceErrorReason> error)>; using MoqtOutgoingSubscribeNamespaceCallback = quiche::SingleUseCallback<void( TrackNamespace track_namespace, std::optional<RequestErrorCode> error, @@ -83,7 +84,7 @@ public: virtual ~MoqtSessionInterface() = default; - // TODO: move ANNOUNCE logic here. + // TODO: move PUBLISH_NAMESPACE logic here. // Callbacks for session-level events. virtual MoqtSessionCallbacks& callbacks() = 0; @@ -159,8 +160,8 @@ // TODO(martinduke): Add an API for absolute joining fetch. // TODO: Add SubscribeNamespace, UnsubscribeNamespace method. - // TODO: Add Announce, Unannounce method. - // TODO: Add AnnounceCancel method. + // TODO: Add PublishNamespace, PublishNamespaceDone method. + // TODO: Add PublishNamespaceCancel method. // TODO: Add TrackStatusRequest method. // TODO: Add SubscribeUpdate, PublishDone method.
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 5a62fb0..348e2c7 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -154,8 +154,8 @@ .WillByDefault(Return(MoqtForwardingPreference::kSubgroup)); } - // The publisher receives SUBSCRIBE and synchronously announces it will - // publish objects. + // The publisher receives SUBSCRIBE and synchronously publishes namespaces it + // supports. MoqtObjectListener* ReceiveSubscribeSynchronousOk( MockTrackPublisher* publisher, MoqtSubscribe& subscribe, MoqtControlParserVisitor* control_parser, uint64_t track_alias = 0) { @@ -384,112 +384,116 @@ stream_input->OnPublishMessage(publish); } -TEST_F(MoqtSessionTest, AnnounceWithOkAndCancel) { +TEST_F(MoqtSessionTest, PublishNamespaceWithOkAndCancel) { testing::MockFunction<void( TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error_message)> - announce_resolved_callback; + std::optional<MoqtPublishNamespaceErrorReason> error_message)> + publish_namespace_resolved_callback; std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream_)); - EXPECT_CALL(mock_stream_, - Writev(ControlMessageOfType(MoqtMessageType::kAnnounce), _)); - session_.Announce(TrackNamespace("foo"), - announce_resolved_callback.AsStdFunction(), - VersionSpecificParameters()); + EXPECT_CALL( + mock_stream_, + Writev(ControlMessageOfType(MoqtMessageType::kPublishNamespace), _)); + session_.PublishNamespace(TrackNamespace("foo"), + publish_namespace_resolved_callback.AsStdFunction(), + VersionSpecificParameters()); - MoqtAnnounceOk ok = { + MoqtPublishNamespaceOk ok = { /*request_id=*/0, }; - EXPECT_CALL(announce_resolved_callback, Call(_, _)) + EXPECT_CALL(publish_namespace_resolved_callback, Call(_, _)) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error) { + std::optional<MoqtPublishNamespaceErrorReason> error) { EXPECT_EQ(track_namespace, TrackNamespace("foo")); EXPECT_FALSE(error.has_value()); }); - stream_input->OnAnnounceOkMessage(ok); + stream_input->OnPublishNamespaceOkMessage(ok); - MoqtAnnounceCancel cancel = { + MoqtPublishNamespaceCancel cancel = { TrackNamespace("foo"), RequestErrorCode::kInternalError, /*error_reason=*/"Test error", }; - EXPECT_CALL(announce_resolved_callback, Call(_, _)) + EXPECT_CALL(publish_namespace_resolved_callback, Call(_, _)) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error) { + std::optional<MoqtPublishNamespaceErrorReason> error) { EXPECT_EQ(track_namespace, TrackNamespace("foo")); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->error_code, RequestErrorCode::kInternalError); EXPECT_EQ(error->reason_phrase, "Test error"); }); - stream_input->OnAnnounceCancelMessage(cancel); + stream_input->OnPublishNamespaceCancelMessage(cancel); // State is gone. - EXPECT_FALSE(session_.Unannounce(TrackNamespace("foo"))); + EXPECT_FALSE(session_.PublishNamespaceDone(TrackNamespace("foo"))); } -TEST_F(MoqtSessionTest, AnnounceWithOkAndUnannounce) { +TEST_F(MoqtSessionTest, PublishNamespaceWithOkAndPublishNamespaceDone) { testing::MockFunction<void( TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error_message)> - announce_resolved_callback; + std::optional<MoqtPublishNamespaceErrorReason> error_message)> + publish_namespace_resolved_callback; std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream_)); - EXPECT_CALL(mock_stream_, - Writev(ControlMessageOfType(MoqtMessageType::kAnnounce), _)); - session_.Announce(TrackNamespace{"foo"}, - announce_resolved_callback.AsStdFunction(), - VersionSpecificParameters()); + EXPECT_CALL( + mock_stream_, + Writev(ControlMessageOfType(MoqtMessageType::kPublishNamespace), _)); + session_.PublishNamespace(TrackNamespace{"foo"}, + publish_namespace_resolved_callback.AsStdFunction(), + VersionSpecificParameters()); - MoqtAnnounceOk ok = { + MoqtPublishNamespaceOk ok = { /*request_id=*/0, }; - EXPECT_CALL(announce_resolved_callback, Call(_, _)) + EXPECT_CALL(publish_namespace_resolved_callback, Call(_, _)) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error) { + std::optional<MoqtPublishNamespaceErrorReason> error) { EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); EXPECT_FALSE(error.has_value()); }); - stream_input->OnAnnounceOkMessage(ok); + stream_input->OnPublishNamespaceOkMessage(ok); EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream_)); - EXPECT_CALL(mock_stream_, - Writev(ControlMessageOfType(MoqtMessageType::kUnannounce), _)); - session_.Unannounce(TrackNamespace{"foo"}); + EXPECT_CALL( + mock_stream_, + Writev(ControlMessageOfType(MoqtMessageType::kPublishNamespaceDone), _)); + session_.PublishNamespaceDone(TrackNamespace{"foo"}); // State is gone. - EXPECT_FALSE(session_.Unannounce(TrackNamespace{"foo"})); + EXPECT_FALSE(session_.PublishNamespaceDone(TrackNamespace{"foo"})); } -TEST_F(MoqtSessionTest, AnnounceWithError) { +TEST_F(MoqtSessionTest, PublishNamespaceWithError) { testing::MockFunction<void( TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error_message)> - announce_resolved_callback; + std::optional<MoqtPublishNamespaceErrorReason> error_message)> + publish_namespace_resolved_callback; std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream_)); - EXPECT_CALL(mock_stream_, - Writev(ControlMessageOfType(MoqtMessageType::kAnnounce), _)); - session_.Announce(TrackNamespace{"foo"}, - announce_resolved_callback.AsStdFunction(), - VersionSpecificParameters()); + EXPECT_CALL( + mock_stream_, + Writev(ControlMessageOfType(MoqtMessageType::kPublishNamespace), _)); + session_.PublishNamespace(TrackNamespace{"foo"}, + publish_namespace_resolved_callback.AsStdFunction(), + VersionSpecificParameters()); - MoqtAnnounceError error = { + MoqtPublishNamespaceError error = { /*request_id=*/0, /*error_code=*/RequestErrorCode::kInternalError, /*reason_phrase=*/"Test error", }; - EXPECT_CALL(announce_resolved_callback, Call(_, _)) + EXPECT_CALL(publish_namespace_resolved_callback, Call(_, _)) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error) { + std::optional<MoqtPublishNamespaceErrorReason> error) { EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->error_code, RequestErrorCode::kInternalError); EXPECT_EQ(error->reason_phrase, "Test error"); }); - stream_input->OnAnnounceErrorMessage(error); + stream_input->OnPublishNamespaceErrorMessage(error); // State is gone. - EXPECT_FALSE(session_.Unannounce(TrackNamespace{"foo"})); + EXPECT_FALSE(session_.PublishNamespaceDone(TrackNamespace{"foo"})); } TEST_F(MoqtSessionTest, AsynchronousSubscribeReturnsOk) { @@ -940,88 +944,89 @@ EXPECT_EQ(MoqtSessionPeer::remote_track(&session_, 2), nullptr); } -TEST_F(MoqtSessionTest, ReplyToAnnounceWithOkThenUnannounce) { +TEST_F(MoqtSessionTest, ReplyToPublishNamespaceWithOkThenPublishNamespaceDone) { TrackNamespace track_namespace{"foo"}; std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); auto parameters = std::make_optional<VersionSpecificParameters>( AuthTokenType::kOutOfBand, "foo"); - MoqtAnnounce announce = { + MoqtPublishNamespace publish_namespace = { kDefaultPeerRequestId, track_namespace, *parameters, }; - EXPECT_CALL(session_callbacks_.incoming_announce_callback, + EXPECT_CALL(session_callbacks_.incoming_publish_namespace_callback, Call(track_namespace, parameters)) .WillOnce(Return(std::nullopt)); - EXPECT_CALL( - mock_stream_, - Writev(SerializedControlMessage(MoqtAnnounceOk{kDefaultPeerRequestId}), - _)); - stream_input->OnAnnounceMessage(announce); - MoqtUnannounce unannounce = { + EXPECT_CALL(mock_stream_, + Writev(SerializedControlMessage( + MoqtPublishNamespaceOk{kDefaultPeerRequestId}), + _)); + stream_input->OnPublishNamespaceMessage(publish_namespace); + MoqtPublishNamespaceDone unpublish_namespace = { track_namespace, }; - EXPECT_CALL(session_callbacks_.incoming_announce_callback, + EXPECT_CALL(session_callbacks_.incoming_publish_namespace_callback, Call(track_namespace, std::optional<VersionSpecificParameters>())) .WillOnce(Return(std::nullopt)); - stream_input->OnUnannounceMessage(unannounce); + stream_input->OnPublishNamespaceDoneMessage(unpublish_namespace); } -TEST_F(MoqtSessionTest, ReplyToAnnounceWithOkThenAnnounceCancel) { +TEST_F(MoqtSessionTest, + ReplyToPublishNamespaceWithOkThenPublishNamespaceCancel) { TrackNamespace track_namespace{"foo"}; std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); auto parameters = std::make_optional<VersionSpecificParameters>( AuthTokenType::kOutOfBand, "foo"); - MoqtAnnounce announce = { + MoqtPublishNamespace publish_namespace = { kDefaultPeerRequestId, track_namespace, *parameters, }; - EXPECT_CALL(session_callbacks_.incoming_announce_callback, + EXPECT_CALL(session_callbacks_.incoming_publish_namespace_callback, Call(track_namespace, parameters)) .WillOnce(Return(std::nullopt)); - EXPECT_CALL( - mock_stream_, - Writev(SerializedControlMessage(MoqtAnnounceOk{kDefaultPeerRequestId}), - _)); - stream_input->OnAnnounceMessage(announce); EXPECT_CALL(mock_stream_, - Writev(SerializedControlMessage(MoqtAnnounceCancel{ + Writev(SerializedControlMessage( + MoqtPublishNamespaceOk{kDefaultPeerRequestId}), + _)); + stream_input->OnPublishNamespaceMessage(publish_namespace); + EXPECT_CALL(mock_stream_, + Writev(SerializedControlMessage(MoqtPublishNamespaceCancel{ track_namespace, RequestErrorCode::kInternalError, "deadbeef"}), _)); - session_.CancelAnnounce(track_namespace, RequestErrorCode::kInternalError, - "deadbeef"); + session_.CancelPublishNamespace(track_namespace, + RequestErrorCode::kInternalError, "deadbeef"); } -TEST_F(MoqtSessionTest, ReplyToAnnounceWithError) { +TEST_F(MoqtSessionTest, ReplyToPublishNamespaceWithError) { TrackNamespace track_namespace{"foo"}; std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); auto parameters = std::make_optional<VersionSpecificParameters>( AuthTokenType::kOutOfBand, "foo"); - MoqtAnnounce announce = { + MoqtPublishNamespace publish_namespace = { kDefaultPeerRequestId, track_namespace, *parameters, }; - MoqtAnnounceErrorReason error = { + MoqtPublishNamespaceErrorReason error = { RequestErrorCode::kNotSupported, "deadbeef", }; - EXPECT_CALL(session_callbacks_.incoming_announce_callback, + EXPECT_CALL(session_callbacks_.incoming_publish_namespace_callback, Call(track_namespace, parameters)) .WillOnce(Return(error)); EXPECT_CALL( mock_stream_, - Writev(SerializedControlMessage(MoqtAnnounceError{ + Writev(SerializedControlMessage(MoqtPublishNamespaceError{ kDefaultPeerRequestId, error.error_code, error.reason_phrase}), _)); - stream_input->OnAnnounceMessage(announce); + stream_input->OnPublishNamespaceMessage(publish_namespace); } TEST_F(MoqtSessionTest, SubscribeNamespaceLifeCycle) { @@ -2741,7 +2746,7 @@ TrackNamespace track_namespace = TrackNamespace{"foo"}; auto parameters = std::make_optional<VersionSpecificParameters>( AuthTokenType::kOutOfBand, "foo"); - MoqtSubscribeNamespace announces = { + MoqtSubscribeNamespace publish_namespaces = { /*request_id=*/1, track_namespace, *parameters, @@ -2749,27 +2754,27 @@ webtransport::test::MockStream control_stream; std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &control_stream); - EXPECT_CALL(session_callbacks_.incoming_subscribe_announces_callback, + EXPECT_CALL(session_callbacks_.incoming_subscribe_namespace_callback, Call(_, parameters)) .WillOnce(Return(std::nullopt)); EXPECT_CALL( control_stream, Writev(ControlMessageOfType(MoqtMessageType::kSubscribeNamespaceOk), _)); - stream_input->OnSubscribeNamespaceMessage(announces); - MoqtUnsubscribeNamespace unsubscribe_announces = { + stream_input->OnSubscribeNamespaceMessage(publish_namespaces); + MoqtUnsubscribeNamespace ununsubscribe_namespaces = { TrackNamespace{"foo"}, }; - EXPECT_CALL(session_callbacks_.incoming_subscribe_announces_callback, + EXPECT_CALL(session_callbacks_.incoming_subscribe_namespace_callback, Call(track_namespace, std::optional<VersionSpecificParameters>())) .WillOnce(Return(std::nullopt)); - stream_input->OnUnsubscribeNamespaceMessage(unsubscribe_announces); + stream_input->OnUnsubscribeNamespaceMessage(ununsubscribe_namespaces); } TEST_F(MoqtSessionTest, IncomingSubscribeNamespaceWithError) { TrackNamespace track_namespace{"foo"}; auto parameters = std::make_optional<VersionSpecificParameters>( AuthTokenType::kOutOfBand, "foo"); - MoqtSubscribeNamespace announces = { + MoqtSubscribeNamespace publish_namespaces = { /*request_id=*/1, track_namespace, *parameters, @@ -2777,7 +2782,7 @@ webtransport::test::MockStream control_stream; std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &control_stream); - EXPECT_CALL(session_callbacks_.incoming_subscribe_announces_callback, + EXPECT_CALL(session_callbacks_.incoming_subscribe_namespace_callback, Call(_, parameters)) .WillOnce(Return( MoqtSubscribeErrorReason{RequestErrorCode::kUnauthorized, "foo"})); @@ -2785,24 +2790,24 @@ control_stream, Writev(ControlMessageOfType(MoqtMessageType::kSubscribeNamespaceError), _)); - stream_input->OnSubscribeNamespaceMessage(announces); + stream_input->OnSubscribeNamespaceMessage(publish_namespaces); // Try again, to verify that it was purged from the tree. - announces.request_id += 2; - EXPECT_CALL(session_callbacks_.incoming_subscribe_announces_callback, + publish_namespaces.request_id += 2; + EXPECT_CALL(session_callbacks_.incoming_subscribe_namespace_callback, Call(_, parameters)) .WillOnce(Return(std::nullopt)); EXPECT_CALL( control_stream, Writev(ControlMessageOfType(MoqtMessageType::kSubscribeNamespaceOk), _)); - stream_input->OnSubscribeNamespaceMessage(announces); + stream_input->OnSubscribeNamespaceMessage(publish_namespaces); } TEST_F(MoqtSessionTest, IncomingSubscribeNamespaceWithPrefixOverlap) { TrackNamespace track_namespace{"foo"}; auto parameters = std::make_optional<VersionSpecificParameters>( AuthTokenType::kOutOfBand, "foo"); - MoqtSubscribeNamespace announces = { + MoqtSubscribeNamespace publish_namespaces = { /*request_id=*/1, track_namespace, *parameters, @@ -2810,41 +2815,41 @@ webtransport::test::MockStream control_stream; std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &control_stream); - EXPECT_CALL(session_callbacks_.incoming_subscribe_announces_callback, + EXPECT_CALL(session_callbacks_.incoming_subscribe_namespace_callback, Call(_, parameters)) .WillOnce(Return(std::nullopt)); EXPECT_CALL( control_stream, Writev(ControlMessageOfType(MoqtMessageType::kSubscribeNamespaceOk), _)); - stream_input->OnSubscribeNamespaceMessage(announces); + stream_input->OnSubscribeNamespaceMessage(publish_namespaces); // Overlapping request is rejected. - announces.request_id += 2; - announces.track_namespace = TrackNamespace{"foo", "bar"}; + publish_namespaces.request_id += 2; + publish_namespaces.track_namespace = TrackNamespace{"foo", "bar"}; EXPECT_CALL( control_stream, Writev(ControlMessageOfType(MoqtMessageType::kSubscribeNamespaceError), _)); - stream_input->OnSubscribeNamespaceMessage(announces); + stream_input->OnSubscribeNamespaceMessage(publish_namespaces); // Remove the subscription. Now a later one will work. - MoqtUnsubscribeNamespace unsubscribe_announces = { + MoqtUnsubscribeNamespace ununsubscribe_namespaces = { TrackNamespace{"foo"}, }; - EXPECT_CALL(session_callbacks_.incoming_subscribe_announces_callback, + EXPECT_CALL(session_callbacks_.incoming_subscribe_namespace_callback, Call(track_namespace, std::optional<VersionSpecificParameters>())) .WillOnce(Return(std::nullopt)); - stream_input->OnUnsubscribeNamespaceMessage(unsubscribe_announces); + stream_input->OnUnsubscribeNamespaceMessage(ununsubscribe_namespaces); // Try again, it will work. - announces.request_id += 2; - EXPECT_CALL(session_callbacks_.incoming_subscribe_announces_callback, + publish_namespaces.request_id += 2; + EXPECT_CALL(session_callbacks_.incoming_subscribe_namespace_callback, Call(_, parameters)) .WillOnce(Return(std::nullopt)); EXPECT_CALL( control_stream, Writev(ControlMessageOfType(MoqtMessageType::kSubscribeNamespaceOk), _)); - stream_input->OnSubscribeNamespaceMessage(announces); + stream_input->OnSubscribeNamespaceMessage(publish_namespaces); } TEST_F(MoqtSessionTest, FetchThenOkThenCancel) { @@ -3411,10 +3416,10 @@ std::optional<RequestErrorCode> /*error*/, absl::string_view /*reason*/) {}, VersionSpecificParameters())); - session_.Announce( + session_.PublishNamespace( TrackNamespace{"foo"}, +[](TrackNamespace /*track_namespace*/, - std::optional<MoqtAnnounceErrorReason> /*error*/) {}, + std::optional<MoqtPublishNamespaceErrorReason> /*error*/) {}, VersionSpecificParameters()); EXPECT_FALSE(session_.Fetch( FullTrackName{TrackNamespace("foo"), "bar"}, @@ -3445,10 +3450,11 @@ mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kSubscribeError), _)); stream_input->OnSubscribeMessage(DefaultSubscribe()); - EXPECT_CALL(mock_stream_, - Writev(ControlMessageOfType(MoqtMessageType::kAnnounceError), _)); - stream_input->OnAnnounceMessage( - MoqtAnnounce(3, TrackNamespace("foo"), VersionSpecificParameters())); + EXPECT_CALL( + mock_stream_, + Writev(ControlMessageOfType(MoqtMessageType::kPublishNamespaceError), _)); + stream_input->OnPublishNamespaceMessage(MoqtPublishNamespace( + 3, TrackNamespace("foo"), VersionSpecificParameters())); EXPECT_CALL(mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kFetchError), _)); MoqtFetch fetch = DefaultFetch(); @@ -3465,7 +3471,7 @@ mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kTrackStatusError), _)); stream_input->OnTrackStatusMessage(track_status); - // Block all outgoing SUBSCRIBE, ANNOUNCE, GOAWAY,etc. + // Block all outgoing SUBSCRIBE, PUBLISH_NAMESPACE, GOAWAY,etc. EXPECT_CALL(mock_stream_, Writev).Times(0); MockSubscribeRemoteTrackVisitor remote_track_visitor; EXPECT_FALSE(session_.SubscribeCurrentObject( @@ -3477,10 +3483,10 @@ std::optional<RequestErrorCode> /*error*/, absl::string_view /*reason*/) {}, VersionSpecificParameters())); - session_.Announce( + session_.PublishNamespace( TrackNamespace{"foo"}, +[](TrackNamespace /*track_namespace*/, - std::optional<MoqtAnnounceErrorReason> /*error*/) {}, + std::optional<MoqtPublishNamespaceErrorReason> /*error*/) {}, VersionSpecificParameters()); EXPECT_FALSE(session_.Fetch( FullTrackName(TrackNamespace("foo"), "bar"),
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc index 8643c12..6073b87 100644 --- a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc +++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc
@@ -50,20 +50,20 @@ quiche::QuicheBuffer operator()(const MoqtSubscribeUpdate& message) { return framer.SerializeSubscribeUpdate(message); } - quiche::QuicheBuffer operator()(const MoqtAnnounce& message) { - return framer.SerializeAnnounce(message); + quiche::QuicheBuffer operator()(const MoqtPublishNamespace& message) { + return framer.SerializePublishNamespace(message); } - quiche::QuicheBuffer operator()(const MoqtAnnounceOk& message) { - return framer.SerializeAnnounceOk(message); + quiche::QuicheBuffer operator()(const MoqtPublishNamespaceOk& message) { + return framer.SerializePublishNamespaceOk(message); } - quiche::QuicheBuffer operator()(const MoqtAnnounceError& message) { - return framer.SerializeAnnounceError(message); + quiche::QuicheBuffer operator()(const MoqtPublishNamespaceError& message) { + return framer.SerializePublishNamespaceError(message); } - quiche::QuicheBuffer operator()(const MoqtUnannounce& message) { - return framer.SerializeUnannounce(message); + quiche::QuicheBuffer operator()(const MoqtPublishNamespaceDone& message) { + return framer.SerializePublishNamespaceDone(message); } - quiche::QuicheBuffer operator()(const MoqtAnnounceCancel& message) { - return framer.SerializeAnnounceCancel(message); + quiche::QuicheBuffer operator()(const MoqtPublishNamespaceCancel& message) { + return framer.SerializePublishNamespaceCancel(message); } quiche::QuicheBuffer operator()(const MoqtTrackStatus& message) { return framer.SerializeTrackStatus(message); @@ -153,19 +153,21 @@ void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) { frames_.push_back(message); } - void OnAnnounceMessage(const MoqtAnnounce& message) { + void OnPublishNamespaceMessage(const MoqtPublishNamespace& message) { frames_.push_back(message); } - void OnAnnounceOkMessage(const MoqtAnnounceOk& message) { + void OnPublishNamespaceOkMessage(const MoqtPublishNamespaceOk& message) { frames_.push_back(message); } - void OnAnnounceErrorMessage(const MoqtAnnounceError& message) { + void OnPublishNamespaceErrorMessage( + const MoqtPublishNamespaceError& message) { frames_.push_back(message); } - void OnUnannounceMessage(const MoqtUnannounce& message) { + void OnPublishNamespaceDoneMessage(const MoqtPublishNamespaceDone& message) { frames_.push_back(message); } - void OnAnnounceCancelMessage(const MoqtAnnounceCancel& message) { + void OnPublishNamespaceCancelMessage( + const MoqtPublishNamespaceCancel& message) { frames_.push_back(message); } void OnTrackStatusMessage(const MoqtTrackStatus& message) {
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.h b/quiche/quic/moqt/test_tools/moqt_framer_utils.h index b0e05d7..57e0d5b 100644 --- a/quiche/quic/moqt/test_tools/moqt_framer_utils.h +++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.h
@@ -28,9 +28,9 @@ using MoqtGenericFrame = std::variant< MoqtClientSetup, MoqtServerSetup, MoqtSubscribe, MoqtSubscribeOk, MoqtSubscribeError, MoqtUnsubscribe, MoqtPublishDone, MoqtSubscribeUpdate, - MoqtAnnounce, MoqtAnnounceOk, MoqtAnnounceError, MoqtUnannounce, - MoqtAnnounceCancel, MoqtTrackStatus, MoqtTrackStatusOk, - MoqtTrackStatusError, MoqtGoAway, MoqtSubscribeNamespace, + MoqtPublishNamespace, MoqtPublishNamespaceOk, MoqtPublishNamespaceError, + MoqtPublishNamespaceDone, MoqtPublishNamespaceCancel, MoqtTrackStatus, + MoqtTrackStatusOk, MoqtTrackStatusError, MoqtGoAway, MoqtSubscribeNamespace, MoqtSubscribeNamespaceOk, MoqtSubscribeNamespaceError, MoqtUnsubscribeNamespace, MoqtMaxRequestId, MoqtFetch, MoqtFetchCancel, MoqtFetchOk, MoqtFetchError, MoqtRequestsBlocked, MoqtPublish,
diff --git a/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h b/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h index be6f3b2..6998d98 100644 --- a/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h +++ b/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h
@@ -66,19 +66,23 @@ void OnPublishDoneMessage(const MoqtPublishDone& message) override { OnControlMessage(message); } - void OnAnnounceMessage(const MoqtAnnounce& message) override { + void OnPublishNamespaceMessage(const MoqtPublishNamespace& message) override { OnControlMessage(message); } - void OnAnnounceOkMessage(const MoqtAnnounceOk& message) override { + void OnPublishNamespaceOkMessage( + const MoqtPublishNamespaceOk& message) override { OnControlMessage(message); } - void OnAnnounceErrorMessage(const MoqtAnnounceError& message) override { + void OnPublishNamespaceErrorMessage( + const MoqtPublishNamespaceError& message) override { OnControlMessage(message); } - void OnUnannounceMessage(const MoqtUnannounce& message) override { + void OnPublishNamespaceDoneMessage( + const MoqtPublishNamespaceDone& message) override { OnControlMessage(message); } - void OnAnnounceCancelMessage(const MoqtAnnounceCancel& message) override { + void OnPublishNamespaceCancelMessage( + const MoqtPublishNamespaceCancel& message) override { OnControlMessage(message); } void OnTrackStatusMessage(const MoqtTrackStatus& message) override {
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator_harness.cc b/quiche/quic/moqt/test_tools/moqt_simulator_harness.cc index 9c743eb..b124720 100644 --- a/quiche/quic/moqt/test_tools/moqt_simulator_harness.cc +++ b/quiche/quic/moqt/test_tools/moqt_simulator_harness.cc
@@ -39,7 +39,7 @@ MoqtSessionCallbacks CreateCallbacks(quic::simulator::Simulator* simulator) { return MoqtSessionCallbacks( +[] {}, +[](absl::string_view) {}, +[](absl::string_view) {}, +[] {}, - DefaultIncomingAnnounceCallback, + DefaultIncomingPublishNamespaceCallback, DefaultIncomingSubscribeNamespaceCallback, simulator->GetClock()); } } // namespace
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index 13126e4..2af6e2b 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -69,16 +69,18 @@ public: virtual ~TestMessageBase() = default; - using MessageStructuredData = std::variant< - MoqtClientSetup, MoqtServerSetup, MoqtObject, MoqtSubscribe, - MoqtSubscribeOk, MoqtSubscribeError, MoqtUnsubscribe, MoqtPublishDone, - MoqtSubscribeUpdate, MoqtAnnounce, MoqtAnnounceOk, MoqtAnnounceError, - MoqtUnannounce, MoqtAnnounceCancel, MoqtTrackStatus, MoqtTrackStatusOk, - MoqtTrackStatusError, MoqtGoAway, MoqtSubscribeNamespace, - MoqtSubscribeNamespaceOk, MoqtSubscribeNamespaceError, - MoqtUnsubscribeNamespace, MoqtMaxRequestId, MoqtFetch, MoqtFetchCancel, - MoqtFetchOk, MoqtFetchError, MoqtRequestsBlocked, MoqtPublish, - MoqtPublishOk, MoqtPublishError, MoqtObjectAck>; + using MessageStructuredData = + std::variant<MoqtClientSetup, MoqtServerSetup, MoqtObject, MoqtSubscribe, + MoqtSubscribeOk, MoqtSubscribeError, MoqtUnsubscribe, + MoqtPublishDone, MoqtSubscribeUpdate, MoqtPublishNamespace, + MoqtPublishNamespaceOk, MoqtPublishNamespaceError, + MoqtPublishNamespaceDone, MoqtPublishNamespaceCancel, + MoqtTrackStatus, MoqtTrackStatusOk, MoqtTrackStatusError, + MoqtGoAway, MoqtSubscribeNamespace, MoqtSubscribeNamespaceOk, + MoqtSubscribeNamespaceError, MoqtUnsubscribeNamespace, + MoqtMaxRequestId, MoqtFetch, MoqtFetchCancel, MoqtFetchOk, + MoqtFetchError, MoqtRequestsBlocked, MoqtPublish, + MoqtPublishOk, MoqtPublishError, MoqtObjectAck>; // The total actual size of the message. size_t total_message_size() const { return wire_image_size_; } @@ -969,24 +971,24 @@ }; }; -class QUICHE_NO_EXPORT AnnounceMessage : public TestMessageBase { +class QUICHE_NO_EXPORT PublishNamespaceMessage : public TestMessageBase { public: - AnnounceMessage() : TestMessageBase() { + PublishNamespaceMessage() : TestMessageBase() { SetWireImage(raw_packet_, sizeof(raw_packet_)); } bool EqualFieldValues(MessageStructuredData& values) const override { - auto cast = std::get<MoqtAnnounce>(values); - if (cast.request_id != announce_.request_id) { - QUIC_LOG(INFO) << "ANNOUNCE request ID mismatch"; + auto cast = std::get<MoqtPublishNamespace>(values); + if (cast.request_id != publish_namespace_.request_id) { + QUIC_LOG(INFO) << "PUBLISH_NAMESPACE request ID mismatch"; return false; } - if (cast.track_namespace != announce_.track_namespace) { - QUIC_LOG(INFO) << "ANNOUNCE track namespace mismatch"; + if (cast.track_namespace != publish_namespace_.track_namespace) { + QUIC_LOG(INFO) << "PUBLISH_NAMESPACE track namespace mismatch"; return false; } - if (cast.parameters != announce_.parameters) { - QUIC_LOG(INFO) << "ANNOUNCE parameter mismatch"; + if (cast.parameters != publish_namespace_.parameters) { + QUIC_LOG(INFO) << "PUBLISH_NAMESPACE parameter mismatch"; return false; } return true; @@ -995,7 +997,7 @@ void ExpandVarints() override { ExpandVarintsImpl("vvv---vvv-----"); } MessageStructuredData structured_data() const override { - return TestMessageBase::MessageStructuredData(announce_); + return TestMessageBase::MessageStructuredData(publish_namespace_); } private: @@ -1006,23 +1008,23 @@ 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_tag = "bar" }; - MoqtAnnounce announce_ = { + MoqtPublishNamespace publish_namespace_ = { /*request_id=*/2, TrackNamespace{"foo"}, VersionSpecificParameters(AuthTokenType::kOutOfBand, "bar"), }; }; -class QUICHE_NO_EXPORT AnnounceOkMessage : public TestMessageBase { +class QUICHE_NO_EXPORT PublishNamespaceOkMessage : public TestMessageBase { public: - AnnounceOkMessage() : TestMessageBase() { + PublishNamespaceOkMessage() : TestMessageBase() { SetWireImage(raw_packet_, sizeof(raw_packet_)); } bool EqualFieldValues(MessageStructuredData& values) const override { - auto cast = std::get<MoqtAnnounceOk>(values); - if (cast.request_id != announce_ok_.request_id) { - QUIC_LOG(INFO) << "ANNOUNCE OK MESSAGE request ID mismatch"; + auto cast = std::get<MoqtPublishNamespaceOk>(values); + if (cast.request_id != publish_namespace_ok_.request_id) { + QUIC_LOG(INFO) << "PUBLISH_NAMESPACE OK MESSAGE request ID mismatch"; return false; } return true; @@ -1031,7 +1033,7 @@ void ExpandVarints() override { ExpandVarintsImpl("v"); } MessageStructuredData structured_data() const override { - return TestMessageBase::MessageStructuredData(announce_ok_); + return TestMessageBase::MessageStructuredData(publish_namespace_ok_); } private: @@ -1039,29 +1041,29 @@ 0x07, 0x00, 0x01, 0x01, // request_id = 1 }; - MoqtAnnounceOk announce_ok_ = { + MoqtPublishNamespaceOk publish_namespace_ok_ = { /*request_id=*/1, }; }; -class QUICHE_NO_EXPORT AnnounceErrorMessage : public TestMessageBase { +class QUICHE_NO_EXPORT PublishNamespaceErrorMessage : public TestMessageBase { public: - AnnounceErrorMessage() : TestMessageBase() { + PublishNamespaceErrorMessage() : TestMessageBase() { SetWireImage(raw_packet_, sizeof(raw_packet_)); } bool EqualFieldValues(MessageStructuredData& values) const override { - auto cast = std::get<MoqtAnnounceError>(values); - if (cast.request_id != announce_error_.request_id) { - QUIC_LOG(INFO) << "ANNOUNCE_ERROR request ID mismatch"; + auto cast = std::get<MoqtPublishNamespaceError>(values); + if (cast.request_id != publish_namespace_error_.request_id) { + QUIC_LOG(INFO) << "PUBLISH_NAMESPACE_ERROR request ID mismatch"; return false; } - if (cast.error_code != announce_error_.error_code) { - QUIC_LOG(INFO) << "ANNOUNCE_ERROR error code mismatch"; + if (cast.error_code != publish_namespace_error_.error_code) { + QUIC_LOG(INFO) << "PUBLISH_NAMESPACE_ERROR error code mismatch"; return false; } - if (cast.error_reason != announce_error_.error_reason) { - QUIC_LOG(INFO) << "ANNOUNCE_ERROR error reason mismatch"; + if (cast.error_reason != publish_namespace_error_.error_reason) { + QUIC_LOG(INFO) << "PUBLISH_NAMESPACE_ERROR error reason mismatch"; return false; } return true; @@ -1070,7 +1072,7 @@ void ExpandVarints() override { ExpandVarintsImpl("vvv---"); } MessageStructuredData structured_data() const override { - return TestMessageBase::MessageStructuredData(announce_error_); + return TestMessageBase::MessageStructuredData(publish_namespace_error_); } private: @@ -1080,23 +1082,23 @@ 0x03, 0x62, 0x61, 0x72, // reason_phrase = "bar" }; - MoqtAnnounceError announce_error_ = { + MoqtPublishNamespaceError publish_namespace_error_ = { /*request_id=*/1, RequestErrorCode::kNotSupported, /*reason_phrase=*/"bar", }; }; -class QUICHE_NO_EXPORT UnannounceMessage : public TestMessageBase { +class QUICHE_NO_EXPORT PublishNamespaceDoneMessage : public TestMessageBase { public: - UnannounceMessage() : TestMessageBase() { + PublishNamespaceDoneMessage() : TestMessageBase() { SetWireImage(raw_packet_, sizeof(raw_packet_)); } bool EqualFieldValues(MessageStructuredData& values) const override { - auto cast = std::get<MoqtUnannounce>(values); - if (cast.track_namespace != unannounce_.track_namespace) { - QUIC_LOG(INFO) << "UNANNOUNCE track namespace mismatch"; + auto cast = std::get<MoqtPublishNamespaceDone>(values); + if (cast.track_namespace != publish_namespace_done_.track_namespace) { + QUIC_LOG(INFO) << "PUBLISH_NAMESPACE_DONE track namespace mismatch"; return false; } return true; @@ -1105,7 +1107,7 @@ void ExpandVarints() override { ExpandVarintsImpl("vv---"); } MessageStructuredData structured_data() const override { - return TestMessageBase::MessageStructuredData(unannounce_); + return TestMessageBase::MessageStructuredData(publish_namespace_done_); } private: @@ -1113,29 +1115,29 @@ 0x09, 0x00, 0x05, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace }; - MoqtUnannounce unannounce_ = { + MoqtPublishNamespaceDone publish_namespace_done_ = { TrackNamespace("foo"), }; }; -class QUICHE_NO_EXPORT AnnounceCancelMessage : public TestMessageBase { +class QUICHE_NO_EXPORT PublishNamespaceCancelMessage : public TestMessageBase { public: - AnnounceCancelMessage() : TestMessageBase() { + PublishNamespaceCancelMessage() : TestMessageBase() { SetWireImage(raw_packet_, sizeof(raw_packet_)); } bool EqualFieldValues(MessageStructuredData& values) const override { - auto cast = std::get<MoqtAnnounceCancel>(values); - if (cast.track_namespace != announce_cancel_.track_namespace) { - QUIC_LOG(INFO) << "ANNOUNCE CANCEL track namespace mismatch"; + auto cast = std::get<MoqtPublishNamespaceCancel>(values); + if (cast.track_namespace != publish_namespace_cancel_.track_namespace) { + QUIC_LOG(INFO) << "PUBLISH_NAMESPACE CANCEL track namespace mismatch"; return false; } - if (cast.error_code != announce_cancel_.error_code) { - QUIC_LOG(INFO) << "ANNOUNCE CANCEL error code mismatch"; + if (cast.error_code != publish_namespace_cancel_.error_code) { + QUIC_LOG(INFO) << "PUBLISH_NAMESPACE CANCEL error code mismatch"; return false; } - if (cast.error_reason != announce_cancel_.error_reason) { - QUIC_LOG(INFO) << "ANNOUNCE CANCEL reason phrase mismatch"; + if (cast.error_reason != publish_namespace_cancel_.error_reason) { + QUIC_LOG(INFO) << "PUBLISH_NAMESPACE CANCEL reason phrase mismatch"; return false; } return true; @@ -1144,7 +1146,7 @@ void ExpandVarints() override { ExpandVarintsImpl("vv---vv---"); } MessageStructuredData structured_data() const override { - return TestMessageBase::MessageStructuredData(announce_cancel_); + return TestMessageBase::MessageStructuredData(publish_namespace_cancel_); } private: @@ -1155,7 +1157,7 @@ 0x03, 0x62, 0x61, 0x72, // error_reason = "bar" }; - MoqtAnnounceCancel announce_cancel_ = { + MoqtPublishNamespaceCancel publish_namespace_cancel_ = { TrackNamespace("foo"), RequestErrorCode::kNotSupported, /*error_reason=*/"bar", @@ -2061,16 +2063,16 @@ return std::make_unique<PublishDoneMessage>(); case MoqtMessageType::kSubscribeUpdate: return std::make_unique<SubscribeUpdateMessage>(); - case MoqtMessageType::kAnnounce: - return std::make_unique<AnnounceMessage>(); - case MoqtMessageType::kAnnounceOk: - return std::make_unique<AnnounceOkMessage>(); - case MoqtMessageType::kAnnounceError: - return std::make_unique<AnnounceErrorMessage>(); - case MoqtMessageType::kUnannounce: - return std::make_unique<UnannounceMessage>(); - case MoqtMessageType::kAnnounceCancel: - return std::make_unique<AnnounceCancelMessage>(); + case MoqtMessageType::kPublishNamespace: + return std::make_unique<PublishNamespaceMessage>(); + case MoqtMessageType::kPublishNamespaceOk: + return std::make_unique<PublishNamespaceOkMessage>(); + case MoqtMessageType::kPublishNamespaceError: + return std::make_unique<PublishNamespaceErrorMessage>(); + case MoqtMessageType::kPublishNamespaceDone: + return std::make_unique<PublishNamespaceDoneMessage>(); + case MoqtMessageType::kPublishNamespaceCancel: + return std::make_unique<PublishNamespaceCancelMessage>(); case MoqtMessageType::kTrackStatus: return std::make_unique<TrackStatusMessage>(); case MoqtMessageType::kTrackStatusOk:
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc index 1bfb70e..ab6f73c 100644 --- a/quiche/quic/moqt/tools/chat_client.cc +++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -27,6 +27,7 @@ #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_session.h" +#include "quiche/quic/moqt/moqt_session_interface.h" #include "quiche/quic/moqt/tools/moq_chat.h" #include "quiche/quic/moqt/tools/moqt_client.h" #include "quiche/quic/platform/api/quic_default_proof_providers.h" @@ -39,35 +40,37 @@ namespace moqt::moq_chat { -std::optional<MoqtAnnounceErrorReason> ChatClient::OnIncomingAnnounce( +std::optional<MoqtPublishNamespaceErrorReason> +ChatClient::OnIncomingPublishNamespace( const moqt::TrackNamespace& track_namespace, std::optional<VersionSpecificParameters> parameters) { if (track_namespace == GetUserNamespace(my_track_name_)) { - // Ignore ANNOUNCE for my own track. - return std::optional<MoqtAnnounceErrorReason>(); + // Ignore PUBLISH_NAMESPACE for my own track. + return std::optional<MoqtPublishNamespaceErrorReason>(); } std::optional<FullTrackName> track_name = ConstructTrackNameFromNamespace( track_namespace, GetChatId(my_track_name_)); if (!parameters.has_value()) { - std::cout << "UNANNOUNCE for " << track_namespace.ToString() << "\n"; + std::cout << "PUBLISH_NAMESPACE_DONE for " << track_namespace.ToString() + << "\n"; if (track_name.has_value() && other_users_.contains(*track_name)) { session_->Unsubscribe(*track_name); other_users_.erase(*track_name); } return std::nullopt; } - std::cout << "ANNOUNCE for " << track_namespace.ToString() << "\n"; + std::cout << "PUBLISH_NAMESPACE for " << track_namespace.ToString() << "\n"; if (!track_name.has_value()) { - std::cout << "ANNOUNCE rejected, invalid namespace\n"; - return std::make_optional<MoqtAnnounceErrorReason>( + std::cout << "PUBLISH_NAMESPACE rejected, invalid namespace\n"; + return std::make_optional<MoqtPublishNamespaceErrorReason>( RequestErrorCode::kTrackDoesNotExist, "Not a subscribed namespace"); } if (other_users_.contains(*track_name)) { - std::cout << "Duplicate ANNOUNCE, send OK and ignore\n"; + std::cout << "Duplicate PUBLISH_NAMESPACE, send OK and ignore\n"; return std::nullopt; } if (GetUsername(my_track_name_) == GetUsername(*track_name)) { - std::cout << "ANNOUNCE for a previous instance of my track, " + std::cout << "PUBLISH_NAMESPACE for a previous instance of my track, " "do not subscribe\n"; return std::nullopt; } @@ -78,7 +81,7 @@ ++subscribes_to_make_; other_users_.emplace(*track_name); } - return std::nullopt; // Send ANNOUNCE_OK. + return std::nullopt; // Send PUBLISH_NAMESPACE_OK. } ChatClient::ChatClient(const quic::QuicServerId& server_id, @@ -128,8 +131,8 @@ session_callbacks_.session_deleted_callback = [this]() { session_ = nullptr; }; - session_callbacks_.incoming_announce_callback = - absl::bind_front(&ChatClient::OnIncomingAnnounce, this); + session_callbacks_.incoming_publish_namespace_callback = + absl::bind_front(&ChatClient::OnIncomingPublishNamespace, this); interface_->Initialize( [this](absl::string_view input_message) { OnTerminalLineInput(input_message); @@ -150,9 +153,9 @@ return; } if (input_message == "/exit") { - // Clean teardown of SUBSCRIBE_NAMESPACE, ANNOUNCE, SUBSCRIBE. + // Clean teardown of SUBSCRIBE_NAMESPACE, PUBLISH_NAMESPACE, SUBSCRIBE. session_->UnsubscribeNamespace(GetChatNamespace(my_track_name_)); - session_->Unannounce(GetUserNamespace(my_track_name_)); + session_->PublishNamespaceDone(GetUserNamespace(my_track_name_)); for (const auto& track_name : other_users_) { session_->Unsubscribe(track_name); } @@ -205,7 +208,7 @@ client_->WriteToOutput(GetUsername(*it), object); } -bool ChatClient::AnnounceAndSubscribeNamespace() { +bool ChatClient::PublishNamespaceAndSubscribeNamespace() { session_ = client_->session(); if (session_ == nullptr) { std::cout << "Failed to connect.\n"; @@ -217,26 +220,29 @@ my_track_name_, MoqtForwardingPreference::kSubgroup); publisher_.Add(queue_); session_->set_publisher(&publisher_); - MoqtOutgoingAnnounceCallback announce_callback = + MoqtOutgoingPublishNamespaceCallback publish_namespace_callback = [this](TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> reason) { + std::optional<MoqtPublishNamespaceErrorReason> reason) { if (reason.has_value()) { - std::cout << "ANNOUNCE rejected, " << reason->reason_phrase << "\n"; - session_->Error(MoqtError::kInternalError, "Local ANNOUNCE rejected"); + std::cout << "PUBLISH_NAMESPACE rejected, " << reason->reason_phrase + << "\n"; + session_->Error(MoqtError::kInternalError, + "Local PUBLISH_NAMESPACE rejected"); return; } - std::cout << "ANNOUNCE for " << track_namespace.ToString() + std::cout << "PUBLISH_NAMESPACE for " << track_namespace.ToString() << " accepted\n"; return; }; std::cout << "Announcing " << GetUserNamespace(my_track_name_).ToString() << "\n"; - session_->Announce(GetUserNamespace(my_track_name_), - std::move(announce_callback), VersionSpecificParameters()); + session_->PublishNamespace(GetUserNamespace(my_track_name_), + std::move(publish_namespace_callback), + VersionSpecificParameters()); - // Send SUBSCRIBE_ANNOUNCE. Pop 3 levels of namespace to get to {moq-chat, - // chat-id} - MoqtOutgoingSubscribeNamespaceCallback subscribe_announces_callback = + // Send SUBSCRIBE_NAMESPACE. Pop 3 levels of namespace to get to + // {moq-chat, chat-id} + MoqtOutgoingSubscribeNamespaceCallback subscribe_namespace_callback = [this](TrackNamespace track_namespace, std::optional<RequestErrorCode> error, absl::string_view reason) { if (error.has_value()) { @@ -252,7 +258,7 @@ VersionSpecificParameters parameters( AuthTokenType::kOutOfBand, std::string(GetUsername(my_track_name_))); session_->SubscribeNamespace(GetChatNamespace(my_track_name_), - std::move(subscribe_announces_callback), + std::move(subscribe_namespace_callback), parameters); while (session_is_open_ && is_syncing()) {
diff --git a/quiche/quic/moqt/tools/chat_client.h b/quiche/quic/moqt/tools/chat_client.h index fcb9981..85becbf 100644 --- a/quiche/quic/moqt/tools/chat_client.h +++ b/quiche/quic/moqt/tools/chat_client.h
@@ -112,7 +112,7 @@ }; // Returns false on error. - bool AnnounceAndSubscribeNamespace(); + bool PublishNamespaceAndSubscribeNamespace(); bool session_is_open() const { return session_is_open_; } @@ -126,8 +126,8 @@ private: void RunEventLoop() { event_loop_->RunEventLoopOnce(kChatEventLoopDuration); } - // Callback for incoming announces. - std::optional<MoqtAnnounceErrorReason> OnIncomingAnnounce( + // Callback for incoming publish_namespaces. + std::optional<MoqtPublishNamespaceErrorReason> OnIncomingPublishNamespace( const moqt::TrackNamespace& track_namespace, std::optional<VersionSpecificParameters> parameters); @@ -150,7 +150,7 @@ absl::flat_hash_set<FullTrackName> other_users_; int subscribes_to_make_ = 0; - // Related to subscriptions/announces + // Related to subscriptions/publish_namespaces // TODO: One for each subscribe RemoteTrackVisitor remote_track_visitor_;
diff --git a/quiche/quic/moqt/tools/chat_client_bin.cc b/quiche/quic/moqt/tools/chat_client_bin.cc index e627ea4..c391c8c 100644 --- a/quiche/quic/moqt/tools/chat_client_bin.cc +++ b/quiche/quic/moqt/tools/chat_client_bin.cc
@@ -152,7 +152,7 @@ if (!client.Connect(path)) { return 1; } - if (!client.AnnounceAndSubscribeNamespace()) { + if (!client.PublishNamespaceAndSubscribeNamespace()) { return 1; } client.IoLoop();
diff --git a/quiche/quic/moqt/tools/chat_server.cc b/quiche/quic/moqt/tools/chat_server.cc index 4c64466..fa7d5d4 100644 --- a/quiche/quic/moqt/tools/chat_server.cc +++ b/quiche/quic/moqt/tools/chat_server.cc
@@ -26,32 +26,34 @@ namespace moqt::moq_chat { -std::optional<MoqtAnnounceErrorReason> -ChatServer::ChatServerSessionHandler::OnIncomingAnnounce( +std::optional<MoqtPublishNamespaceErrorReason> +ChatServer::ChatServerSessionHandler::OnIncomingPublishNamespace( const moqt::TrackNamespace& track_namespace, std::optional<VersionSpecificParameters> parameters) { if (track_name_.has_value() && GetUserNamespace(*track_name_) != track_namespace) { // ChatServer only supports one track per client session at a time. Return - // ANNOUNCE_OK and exit. + // PUBLISH_NAMESPACE_OK and exit. return std::nullopt; } - // Accept the ANNOUNCE regardless of the chat_id. + // Accept the PUBLISH_NAMESPACE regardless of the chat_id. track_name_ = ConstructTrackNameFromNamespace(track_namespace, GetChatId(track_namespace)); if (!track_name_.has_value()) { - std::cout << "Malformed ANNOUNCE namespace\n"; - return MoqtAnnounceErrorReason(RequestErrorCode::kTrackDoesNotExist, - "Not a valid namespace for this chat."); + std::cout << "Malformed PUBLISH_NAMESPACE namespace\n"; + return MoqtPublishNamespaceErrorReason( + RequestErrorCode::kTrackDoesNotExist, + "Not a valid namespace for this chat."); } if (!parameters.has_value()) { - std::cout << "Received UNANNOUNCE for " << track_namespace.ToString() - << "\n"; + std::cout << "Received PUBLISH_NAMESPACE_DONE for " + << track_namespace.ToString() << "\n"; server_->DeleteUser(*track_name_); track_name_.reset(); return std::nullopt; } - std::cout << "Received ANNOUNCE for " << track_namespace.ToString() << "\n"; + std::cout << "Received PUBLISH_NAMESPACE for " << track_namespace.ToString() + << "\n"; session_->SubscribeCurrentObject(*track_name_, server_->remote_track_visitor(), moqt::VersionSpecificParameters()); @@ -59,11 +61,11 @@ return std::nullopt; } -void ChatServer::ChatServerSessionHandler::OnOutgoingAnnounceReply( +void ChatServer::ChatServerSessionHandler::OnOutgoingPublishNamespaceReply( TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error_message) { + std::optional<MoqtPublishNamespaceErrorReason> error_message) { // Log the result; the server doesn't really care. - std::cout << "ANNOUNCE for " << track_namespace.ToString(); + std::cout << "PUBLISH_NAMESPACE for " << track_namespace.ToString(); if (error_message.has_value()) { std::cout << " failed with error: " << error_message->reason_phrase << "\n"; } else { @@ -74,8 +76,8 @@ ChatServer::ChatServerSessionHandler::ChatServerSessionHandler( MoqtSession* session, ChatServer* server) : session_(session), server_(server) { - session_->callbacks().incoming_announce_callback = absl::bind_front( - &ChatServer::ChatServerSessionHandler::OnIncomingAnnounce, this); + session_->callbacks().incoming_publish_namespace_callback = absl::bind_front( + &ChatServer::ChatServerSessionHandler::OnIncomingPublishNamespace, this); session_->callbacks().session_terminated_callback = [this](absl::string_view error_message) { std::cout << "Session terminated, reason = " << error_message << "\n"; @@ -84,7 +86,7 @@ server_->DeleteUser(*track_name_); } }; - session_->callbacks().incoming_subscribe_announces_callback = + session_->callbacks().incoming_subscribe_namespace_callback = [this](const moqt::TrackNamespace& chat_namespace, std::optional<VersionSpecificParameters> parameters) { if (parameters.has_value()) { @@ -104,19 +106,19 @@ if (!parameters.has_value()) { return std::optional<MoqtSubscribeErrorReason>(); } - // Send all ANNOUNCE. + // Send all PUBLISH_NAMESPACE. for (auto& [track_name, queue] : server_->user_queues_) { - std::cout << "Sending ANNOUNCE for " + std::cout << "Sending PUBLISH_NAMESPACE for " << GetUserNamespace(track_name).ToString() << "\n"; if (track_name_.has_value() && GetUsername(*track_name_) == GetUsername(track_name)) { - // Don't ANNOUNCE a client to itself. + // Don't PUBLISH_NAMESPACE a client to itself. continue; } - session_->Announce( + session_->PublishNamespace( GetUserNamespace(track_name), absl::bind_front(&ChatServer::ChatServerSessionHandler:: - OnOutgoingAnnounceReply, + OnOutgoingPublishNamespaceReply, this), moqt::VersionSpecificParameters()); } @@ -202,7 +204,7 @@ MoqtDeliveryOrder::kAscending, quic::QuicTime::Infinite()); publisher_.Add(user_queues_[track_name]); for (auto& session : sessions_) { - session.AnnounceIfSubscribed(track_name.track_namespace()); + session.PublishNamespaceIfSubscribed(track_name.track_namespace()); } } @@ -216,7 +218,7 @@ publisher_.Delete(track_name); TrackNamespace track_namespace = GetUserNamespace(track_name); for (auto& session : sessions_) { - session.UnannounceIfSubscribed(track_namespace); + session.PublishNamespaceDoneIfSubscribed(track_namespace); } if (user_queues_.empty()) { std::cout << "No more users!\n";
diff --git a/quiche/quic/moqt/tools/chat_server.h b/quiche/quic/moqt/tools/chat_server.h index 468845f..971e49e 100644 --- a/quiche/quic/moqt/tools/chat_server.h +++ b/quiche/quic/moqt/tools/chat_server.h
@@ -64,14 +64,14 @@ it_ = it; } - void AnnounceIfSubscribed(TrackNamespace track_namespace) { + void PublishNamespaceIfSubscribed(TrackNamespace track_namespace) { for (const TrackNamespace& subscribed_namespace : subscribed_namespaces_) { if (track_namespace.InNamespace(subscribed_namespace)) { - session_->Announce( + session_->PublishNamespace( track_namespace, absl::bind_front(&ChatServer::ChatServerSessionHandler:: - OnOutgoingAnnounceReply, + OnOutgoingPublishNamespaceReply, this), VersionSpecificParameters()); return; @@ -79,24 +79,24 @@ } } - void UnannounceIfSubscribed(TrackNamespace track_namespace) { + void PublishNamespaceDoneIfSubscribed(TrackNamespace track_namespace) { for (const TrackNamespace& subscribed_namespace : subscribed_namespaces_) { if (track_namespace.InNamespace(subscribed_namespace)) { - session_->Unannounce(track_namespace); + session_->PublishNamespaceDone(track_namespace); return; } } } private: - // Callback for incoming announces. - std::optional<MoqtAnnounceErrorReason> OnIncomingAnnounce( + // Callback for incoming publish_namespaces. + std::optional<MoqtPublishNamespaceErrorReason> OnIncomingPublishNamespace( const moqt::TrackNamespace& track_namespace, std::optional<VersionSpecificParameters> parameters); - void OnOutgoingAnnounceReply( + void OnOutgoingPublishNamespaceReply( TrackNamespace track_namespace, - std::optional<MoqtAnnounceErrorReason> error_message); + std::optional<MoqtPublishNamespaceErrorReason> error_message); MoqtSession* session_; // Not owned. // This design assumes that each server has exactly one username, although
diff --git a/quiche/quic/moqt/tools/moq_chat.h b/quiche/quic/moqt/tools/moq_chat.h index 9d43814..ea40a84 100644 --- a/quiche/quic/moqt/tools/moq_chat.h +++ b/quiche/quic/moqt/tools/moq_chat.h
@@ -44,7 +44,7 @@ std::optional<FullTrackName> ConstructTrackNameFromNamespace( const TrackNamespace& track_namespace, absl::string_view chat_id); -// Strips "chat" from the end of |track_name| to use in ANNOUNCE. +// Strips "chat" from the end of |track_name| to use in PUBLISH_NAMESPACE. const TrackNamespace& GetUserNamespace(const FullTrackName& track_name); // Returns {"moq-chat", chat-id}, useful for SUBSCRIBE_NAMESPACE.
diff --git a/quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc b/quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc index f808d47..0527dfe 100644 --- a/quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc +++ b/quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc
@@ -96,8 +96,8 @@ TEST_F(MoqChatEndToEndTest, EndToEndTest) { EXPECT_TRUE(client1_->Connect(moqt::moq_chat::kWebtransPath)); EXPECT_TRUE(client2_->Connect(moqt::moq_chat::kWebtransPath)); - EXPECT_TRUE(client1_->AnnounceAndSubscribeNamespace()); - EXPECT_TRUE(client2_->AnnounceAndSubscribeNamespace()); + EXPECT_TRUE(client1_->PublishNamespaceAndSubscribeNamespace()); + EXPECT_TRUE(client2_->PublishNamespaceAndSubscribeNamespace()); SendAndWaitForOutput(interface1_, interface2_, "client1", "Hello"); SendAndWaitForOutput(interface2_, interface1_, "client2", "Hi"); SendAndWaitForOutput(interface1_, interface2_, "client1", "How are you?"); @@ -113,8 +113,8 @@ TEST_F(MoqChatEndToEndTest, LeaveAndRejoin) { EXPECT_TRUE(client1_->Connect(moqt::moq_chat::kWebtransPath)); EXPECT_TRUE(client2_->Connect(moqt::moq_chat::kWebtransPath)); - EXPECT_TRUE(client1_->AnnounceAndSubscribeNamespace()); - EXPECT_TRUE(client2_->AnnounceAndSubscribeNamespace()); + EXPECT_TRUE(client1_->PublishNamespaceAndSubscribeNamespace()); + EXPECT_TRUE(client2_->PublishNamespaceAndSubscribeNamespace()); SendAndWaitForOutput(interface1_, interface2_, "client1", "Hello"); SendAndWaitForOutput(interface2_, interface1_, "client2", "Hi"); @@ -136,7 +136,7 @@ std::move(if1bptr), "test_chat", "client1", "device1", server_.moqt_server().quic_server().event_loop()); EXPECT_TRUE(client1_->Connect(moqt::moq_chat::kWebtransPath)); - EXPECT_TRUE(client1_->AnnounceAndSubscribeNamespace()); + EXPECT_TRUE(client1_->PublishNamespaceAndSubscribeNamespace()); SendAndWaitForOutput(interface1b_, interface2_, "client1", "Hello again"); SendAndWaitForOutput(interface2_, interface1b_, "client2", "Hi again"); }
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc index 8fd51fe..c59955f 100644 --- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc +++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -3,7 +3,7 @@ // found in the LICENSE file. // moqt_ingestion_server is a simple command-line utility that accepts incoming -// ANNOUNCE messages and records them into a file. +// PUBLISH_NAMESPACE messages and records them into a file. #include <sys/stat.h> @@ -112,21 +112,24 @@ explicit MoqtIngestionHandler(MoqtSession* session, absl::string_view output_root) : session_(session), output_root_(output_root) { - session_->callbacks().incoming_announce_callback = - absl::bind_front(&MoqtIngestionHandler::OnAnnounceReceived, this); + session_->callbacks().incoming_publish_namespace_callback = + absl::bind_front(&MoqtIngestionHandler::OnPublishNamespaceReceived, + this); } - // TODO(martinduke): Handle when |announce| is false (UNANNOUNCE). - std::optional<MoqtAnnounceErrorReason> OnAnnounceReceived( + // TODO(martinduke): Handle when |publish_namespace| is false + // (PUBLISH_NAMESPACE_DONE). + std::optional<MoqtPublishNamespaceErrorReason> OnPublishNamespaceReceived( TrackNamespace track_namespace, std::optional<VersionSpecificParameters> /*parameters*/) { if (!IsValidTrackNamespace(track_namespace) && !quiche::GetQuicheCommandLineFlag( FLAGS_allow_invalid_track_namespaces)) { - QUICHE_DLOG(WARNING) << "Rejected remote announce as it contained " - "disallowed characters; namespace: " - << track_namespace; - return MoqtAnnounceErrorReason{ + QUICHE_DLOG(WARNING) + << "Rejected remote publish_namespace as it contained " + "disallowed characters; namespace: " + << track_namespace; + return MoqtPublishNamespaceErrorReason{ RequestErrorCode::kInternalError, "Track namespace contains disallowed characters"}; } @@ -146,8 +149,9 @@ subscribed_namespaces_.erase(it); QUICHE_LOG(ERROR) << "Failed to create directory " << directory_path << "; " << status; - return MoqtAnnounceErrorReason{RequestErrorCode::kInternalError, - "Failed to create output directory"}; + return MoqtPublishNamespaceErrorReason{ + RequestErrorCode::kInternalError, + "Failed to create output directory"}; } std::string track_list = quiche::GetQuicheCommandLineFlag(FLAGS_tracks);
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h index 1dbc446..ac1571d 100644 --- a/quiche/quic/moqt/tools/moqt_mock_visitor.h +++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -12,13 +12,14 @@ #include <variant> #include "absl/status/status.h" -#include "absl/status/statusor.h" #include "absl/strings/string_view.h" #include "quiche/quic/core/quic_time.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_session.h" +#include "quiche/quic/moqt/moqt_session_callbacks.h" #include "quiche/quic/moqt/moqt_track.h" #include "quiche/common/platform/api/quiche_test.h" @@ -29,17 +30,17 @@ testing::MockFunction<void(absl::string_view)> goaway_received_callback; testing::MockFunction<void(absl::string_view)> session_terminated_callback; testing::MockFunction<void()> session_deleted_callback; - testing::MockFunction<std::optional<MoqtAnnounceErrorReason>( + testing::MockFunction<std::optional<MoqtPublishNamespaceErrorReason>( const TrackNamespace&, std::optional<VersionSpecificParameters>)> - incoming_announce_callback; + incoming_publish_namespace_callback; testing::MockFunction<std::optional<MoqtSubscribeErrorReason>( TrackNamespace, std::optional<VersionSpecificParameters>)> - incoming_subscribe_announces_callback; + incoming_subscribe_namespace_callback; MockSessionCallbacks() { - ON_CALL(incoming_announce_callback, Call(testing::_, testing::_)) - .WillByDefault(DefaultIncomingAnnounceCallback); - ON_CALL(incoming_subscribe_announces_callback, Call(testing::_, testing::_)) + ON_CALL(incoming_publish_namespace_callback, Call(testing::_, testing::_)) + .WillByDefault(DefaultIncomingPublishNamespaceCallback); + ON_CALL(incoming_subscribe_namespace_callback, Call(testing::_, testing::_)) .WillByDefault(DefaultIncomingSubscribeNamespaceCallback); } @@ -49,8 +50,8 @@ goaway_received_callback.AsStdFunction(), session_terminated_callback.AsStdFunction(), session_deleted_callback.AsStdFunction(), - incoming_announce_callback.AsStdFunction(), - incoming_subscribe_announces_callback.AsStdFunction()}; + incoming_publish_namespace_callback.AsStdFunction(), + incoming_subscribe_namespace_callback.AsStdFunction()}; } };