Rename SUBSCRIBE_DONE to PUBLISH_DONE. (moqt-draft-14) PiperOrigin-RevId: 805380602
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index 203f8b3..2858390 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -504,10 +504,10 @@ WireVarInt62(message.request_id)); } -quiche::QuicheBuffer MoqtFramer::SerializeSubscribeDone( - const MoqtSubscribeDone& message) { +quiche::QuicheBuffer MoqtFramer::SerializePublishDone( + const MoqtPublishDone& message) { return SerializeControlMessage( - MoqtMessageType::kSubscribeDone, WireVarInt62(message.request_id), + MoqtMessageType::kPublishDone, WireVarInt62(message.request_id), WireVarInt62(message.status_code), WireVarInt62(message.stream_count), WireStringWithVarInt62Length(message.error_reason)); }
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h index d1a6294..0168758 100644 --- a/quiche/quic/moqt/moqt_framer.h +++ b/quiche/quic/moqt/moqt_framer.h
@@ -53,7 +53,7 @@ const MoqtSubscribeError& message, MoqtMessageType message_type = MoqtMessageType::kSubscribeError); quiche::QuicheBuffer SerializeUnsubscribe(const MoqtUnsubscribe& message); - quiche::QuicheBuffer SerializeSubscribeDone(const MoqtSubscribeDone& message); + quiche::QuicheBuffer SerializePublishDone(const MoqtPublishDone& message); quiche::QuicheBuffer SerializeSubscribeUpdate( const MoqtSubscribeUpdate& message); quiche::QuicheBuffer SerializeAnnounce(const MoqtAnnounce& message);
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc index 5503c54..a63dfb3 100644 --- a/quiche/quic/moqt/moqt_framer_test.cc +++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -38,7 +38,7 @@ MoqtMessageType::kSubscribeOk, MoqtMessageType::kSubscribeError, MoqtMessageType::kUnsubscribe, - MoqtMessageType::kSubscribeDone, + MoqtMessageType::kPublishDone, MoqtMessageType::kAnnounce, MoqtMessageType::kAnnounceOk, MoqtMessageType::kAnnounceError, @@ -140,9 +140,9 @@ auto data = std::get<MoqtUnsubscribe>(structured_data); return framer_.SerializeUnsubscribe(data); } - case MoqtMessageType::kSubscribeDone: { - auto data = std::get<MoqtSubscribeDone>(structured_data); - return framer_.SerializeSubscribeDone(data); + case MoqtMessageType::kPublishDone: { + auto data = std::get<MoqtPublishDone>(structured_data); + return framer_.SerializePublishDone(data); } case MoqtMessageType::kAnnounce: { auto data = std::get<MoqtAnnounce>(structured_data);
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc index 05bda43..6eeb973 100644 --- a/quiche/quic/moqt/moqt_integration_test.cc +++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -602,7 +602,7 @@ EXPECT_TRUE(success); } -TEST_F(MoqtIntegrationTest, CleanSubscribeDone) { +TEST_F(MoqtIntegrationTest, CleanPublishDone) { EstablishSession(); FullTrackName full_track_name("foo", "bar"); @@ -633,7 +633,7 @@ full_track_name, &client_visitor, VersionSpecificParameters())); queue->RemoveAllSubscriptions(); // Induce a SUBSCRIBE_DONE. bool subscribe_done = false; - EXPECT_CALL(client_visitor, OnSubscribeDone).WillOnce([&]() { + EXPECT_CALL(client_visitor, OnPublishDone).WillOnce([&]() { subscribe_done = true; }); success = test_harness_.RunUntilWithDefaultTimeout(
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc index c28ad79..c1f5dce 100644 --- a/quiche/quic/moqt/moqt_messages.cc +++ b/quiche/quic/moqt/moqt_messages.cc
@@ -237,7 +237,7 @@ return "SUBSCRIBE_ERROR"; case MoqtMessageType::kUnsubscribe: return "UNSUBSCRIBE"; - case MoqtMessageType::kSubscribeDone: + case MoqtMessageType::kPublishDone: return "SUBSCRIBE_DONE"; case MoqtMessageType::kSubscribeUpdate: return "SUBSCRIBE_UPDATE";
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index 805e478..e208d06 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -259,7 +259,7 @@ kAnnounceError = 0x08, kUnannounce = 0x09, kUnsubscribe = 0x0a, - kSubscribeDone = 0x0b, + kPublishDone = 0x0b, kAnnounceCancel = 0x0c, kTrackStatus = 0x0d, kTrackStatusOk = 0x0e, @@ -703,7 +703,7 @@ uint64_t request_id; }; -enum class QUICHE_EXPORT SubscribeDoneCode : uint64_t { +enum class QUICHE_EXPORT PublishDoneCode : uint64_t { kInternalError = 0x0, kUnauthorized = 0x1, kTrackEnded = 0x2, @@ -714,9 +714,9 @@ kMalformedTrack = 0x7, }; -struct QUICHE_EXPORT MoqtSubscribeDone { +struct QUICHE_EXPORT MoqtPublishDone { uint64_t request_id; - SubscribeDoneCode status_code; + PublishDoneCode status_code; uint64_t stream_count; std::string error_reason; };
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index 0221e5a..afe7e5e 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -232,8 +232,8 @@ case MoqtMessageType::kUnsubscribe: bytes_read = ProcessUnsubscribe(reader); break; - case MoqtMessageType::kSubscribeDone: - bytes_read = ProcessSubscribeDone(reader); + case MoqtMessageType::kPublishDone: + bytes_read = ProcessPublishDone(reader); break; case MoqtMessageType::kSubscribeUpdate: bytes_read = ProcessSubscribeUpdate(reader); @@ -526,8 +526,8 @@ return reader.PreviouslyReadPayload().length(); } -size_t MoqtControlParser::ProcessSubscribeDone(quic::QuicDataReader& reader) { - MoqtSubscribeDone subscribe_done; +size_t MoqtControlParser::ProcessPublishDone(quic::QuicDataReader& reader) { + MoqtPublishDone subscribe_done; uint64_t value; if (!reader.ReadVarInt62(&subscribe_done.request_id) || !reader.ReadVarInt62(&value) || @@ -535,8 +535,8 @@ !reader.ReadStringVarInt62(subscribe_done.error_reason)) { return 0; } - subscribe_done.status_code = static_cast<SubscribeDoneCode>(value); - visitor_.OnSubscribeDoneMessage(subscribe_done); + subscribe_done.status_code = static_cast<PublishDoneCode>(value); + visitor_.OnPublishDoneMessage(subscribe_done); return reader.PreviouslyReadPayload().length(); }
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h index 9c287ba..e10beb9 100644 --- a/quiche/quic/moqt/moqt_parser.h +++ b/quiche/quic/moqt/moqt_parser.h
@@ -39,7 +39,7 @@ virtual void OnSubscribeOkMessage(const MoqtSubscribeOk& message) = 0; virtual void OnSubscribeErrorMessage(const MoqtSubscribeError& message) = 0; virtual void OnUnsubscribeMessage(const MoqtUnsubscribe& message) = 0; - virtual void OnSubscribeDoneMessage(const MoqtSubscribeDone& message) = 0; + virtual void OnPublishDoneMessage(const MoqtPublishDone& message) = 0; virtual void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) = 0; virtual void OnAnnounceMessage(const MoqtAnnounce& message) = 0; virtual void OnAnnounceOkMessage(const MoqtAnnounceOk& message) = 0; @@ -126,7 +126,7 @@ quic::QuicDataReader& reader, MoqtMessageType message_type = MoqtMessageType::kSubscribeError); size_t ProcessUnsubscribe(quic::QuicDataReader& reader); - size_t ProcessSubscribeDone(quic::QuicDataReader& reader); + size_t ProcessPublishDone(quic::QuicDataReader& reader); size_t ProcessSubscribeUpdate(quic::QuicDataReader& reader); size_t ProcessAnnounce(quic::QuicDataReader& reader); size_t ProcessAnnounceOk(quic::QuicDataReader& reader);
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc index c3e85e1..eaee5c4 100644 --- a/quiche/quic/moqt/moqt_parser_test.cc +++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -37,7 +37,7 @@ MoqtMessageType::kSubscribeError, MoqtMessageType::kSubscribeUpdate, MoqtMessageType::kUnsubscribe, - MoqtMessageType::kSubscribeDone, + MoqtMessageType::kPublishDone, MoqtMessageType::kAnnounceCancel, MoqtMessageType::kTrackStatus, MoqtMessageType::kTrackStatusOk,
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.h b/quiche/quic/moqt/moqt_relay_track_publisher.h index c379f34..20896e6 100644 --- a/quiche/quic/moqt/moqt_relay_track_publisher.h +++ b/quiche/quic/moqt/moqt_relay_track_publisher.h
@@ -71,7 +71,7 @@ const PublishedObjectMetadata& /*metadata*/, absl::string_view /*object*/, bool /*end_of_message*/) override {} - void OnSubscribeDone(FullTrackName /*full_track_name*/) override {} + void OnPublishDone(FullTrackName /*full_track_name*/) override {} void OnMalformedTrack(const FullTrackName& /*full_track_name*/) override {} // Publish a received object. Returns false if the object is invalid, given
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index ee45395..783a0cf 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -672,8 +672,8 @@ "Peer did not close session after GOAWAY"); } -bool MoqtSession::SubscribeIsDone(uint64_t request_id, SubscribeDoneCode code, - absl::string_view error_reason) { +bool MoqtSession::PublishIsDone(uint64_t request_id, PublishDoneCode code, + absl::string_view error_reason) { auto it = published_subscriptions_.find(request_id); if (it == published_subscriptions_.end()) { return false; @@ -683,12 +683,12 @@ std::vector<webtransport::StreamId> streams_to_reset = subscription.GetAllStreams(); - MoqtSubscribeDone subscribe_done; + MoqtPublishDone subscribe_done; subscribe_done.request_id = request_id; subscribe_done.status_code = code; subscribe_done.stream_count = subscription.streams_opened(); subscribe_done.error_reason = error_reason; - SendControlMessage(framer_.SerializeSubscribeDone(subscribe_done)); + SendControlMessage(framer_.SerializePublishDone(subscribe_done)); QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE_DONE message for " << subscription.publisher().GetTrackName(); // Clean up the subscription @@ -710,7 +710,7 @@ } void MoqtSession::DestroySubscription(SubscribeRemoteTrack* subscribe) { - subscribe->visitor()->OnSubscribeDone(subscribe->full_track_name()); + subscribe->visitor()->OnPublishDone(subscribe->full_track_name()); subscribe_by_name_.erase(subscribe->full_track_name()); if (subscribe->track_alias().has_value()) { subscribe_by_alias_.erase(*subscribe->track_alias()); @@ -1166,8 +1166,8 @@ session_->published_subscriptions_.erase(it); } -void MoqtSession::ControlStream::OnSubscribeDoneMessage( - const MoqtSubscribeDone& message) { +void MoqtSession::ControlStream::OnPublishDoneMessage( + const MoqtPublishDone& message) { auto it = session_->upstream_by_id_.find(message.request_id); if (it == session_->upstream_by_id_.end()) { return; @@ -1175,10 +1175,10 @@ auto* subscribe = static_cast<SubscribeRemoteTrack*>(it->second.get()); QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE_DONE for " << it->second->full_track_name(); - subscribe->OnSubscribeDone( + subscribe->OnPublishDone( message.stream_count, session_->callbacks_.clock, absl::WrapUnique(session_->alarm_factory_->CreateAlarm( - new SubscribeDoneDelegate(session_, subscribe)))); + new PublishDoneDelegate(session_, subscribe)))); session_->MaybeDestroySubscription(subscribe); } @@ -2092,8 +2092,8 @@ } void MoqtSession::PublishedSubscription::OnTrackPublisherGone() { - session_->SubscribeIsDone(request_id_, SubscribeDoneCode::kGoingAway, - "Publisher is gone"); + session_->PublishIsDone(request_id_, PublishDoneCode::kGoingAway, + "Publisher is gone"); } // TODO(martinduke): Revise to check if the last object has been delivered. @@ -2158,8 +2158,7 @@ stream_map().GetStreamsForGroup(group_id); if (delivery_timeout_.IsInfinite() && largest_sent_.has_value() && largest_sent_->group <= group_id) { - session_->SubscribeIsDone(request_id_, SubscribeDoneCode::kTooFarBehind, - ""); + session_->PublishIsDone(request_id_, PublishDoneCode::kTooFarBehind, ""); // No class access below this line! return; }
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index 571d659..407a5c7 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -230,7 +230,7 @@ void OnSubscribeErrorMessage(const MoqtSubscribeError& message) override; void OnUnsubscribeMessage(const MoqtUnsubscribe& message) override; // There is no state to update for SUBSCRIBE_DONE. - void OnSubscribeDoneMessage(const MoqtSubscribeDone& message) override; + void OnPublishDoneMessage(const MoqtPublishDone& /*message*/) override; void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) override; void OnAnnounceMessage(const MoqtAnnounce& message) override; void OnAnnounceOkMessage(const MoqtAnnounceOk& message) override; @@ -690,9 +690,9 @@ MoqtSession* session_; }; - class SubscribeDoneDelegate : public quic::QuicAlarm::DelegateWithoutContext { + class PublishDoneDelegate : public quic::QuicAlarm::DelegateWithoutContext { public: - SubscribeDoneDelegate(MoqtSession* session, SubscribeRemoteTrack* subscribe) + PublishDoneDelegate(MoqtSession* session, SubscribeRemoteTrack* subscribe) : session_(session), subscribe_(subscribe) {} void OnAlarm() override { session_->DestroySubscription(subscribe_); } @@ -704,8 +704,8 @@ // Private members of MoqtSession. // Returns true if SUBSCRIBE_DONE was sent. - bool SubscribeIsDone(uint64_t request_id, SubscribeDoneCode code, - absl::string_view error_reason); + bool PublishIsDone(uint64_t request_id, PublishDoneCode code, + absl::string_view error_reason); void MaybeDestroySubscription(SubscribeRemoteTrack* subscribe); void DestroySubscription(SubscribeRemoteTrack* subscribe);
diff --git a/quiche/quic/moqt/moqt_session_interface.h b/quiche/quic/moqt/moqt_session_interface.h index 5083a09..b60439d 100644 --- a/quiche/quic/moqt/moqt_session_interface.h +++ b/quiche/quic/moqt/moqt_session_interface.h
@@ -50,7 +50,7 @@ const PublishedObjectMetadata& metadata, absl::string_view object, bool end_of_message) = 0; - virtual void OnSubscribeDone(FullTrackName full_track_name) = 0; + virtual void OnPublishDone(FullTrackName full_track_name) = 0; // Called when the track is malformed per Section 2.5 of // draft-ietf-moqt-moq-transport-12. If the application is a relay, it MUST // terminate downstream delivery of the track. @@ -162,7 +162,7 @@ // TODO: Add Announce, Unannounce method. // TODO: Add AnnounceCancel method. // TODO: Add TrackStatusRequest method. - // TODO: Add SubscribeUpdate, SubscribeDone method. + // TODO: Add SubscribeUpdate, PublishDone method. virtual quiche::QuicheWeakPtr<MoqtSessionInterface> GetWeakPtr() = 0; };
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 967bc0a..5a62fb0 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -1392,9 +1392,9 @@ EXPECT_TRUE(correct_message); EXPECT_TRUE(fin); - struct MoqtSubscribeDone expected_subscribe_done = { + struct MoqtPublishDone expected_subscribe_done = { /*request_id=*/0, - SubscribeDoneCode::kTooFarBehind, + PublishDoneCode::kTooFarBehind, /*stream_count=*/1, /*error_reason=*/"", }; @@ -1458,9 +1458,9 @@ EXPECT_TRUE(correct_message); EXPECT_TRUE(fin); - struct MoqtSubscribeDone expected_subscribe_done = { + struct MoqtPublishDone expected_subscribe_done = { /*request_id=*/0, - SubscribeDoneCode::kTooFarBehind, + PublishDoneCode::kTooFarBehind, /*stream_count=*/1, /*error_reason=*/"", }; @@ -3531,7 +3531,7 @@ EXPECT_TRUE(reported_error); } -TEST_F(MoqtSessionTest, ReceiveSubscribeDoneWithOpenStreams) { +TEST_F(MoqtSessionTest, ReceivePublishDoneWithOpenStreams) { MockSubscribeRemoteTrackVisitor remote_track_visitor; webtransport::test::MockStream control_stream; std::unique_ptr<MoqtControlParserVisitor> stream_input = @@ -3578,19 +3578,19 @@ SubscribeRemoteTrack* track = MoqtSessionPeer::remote_track(&session_, 0); ASSERT_NE(track, nullptr); EXPECT_FALSE(track->all_streams_closed()); - stream_input->OnSubscribeDoneMessage( - MoqtSubscribeDone(0, SubscribeDoneCode::kTrackEnded, kNumStreams, "foo")); + stream_input->OnPublishDoneMessage( + MoqtPublishDone(0, PublishDoneCode::kTrackEnded, kNumStreams, "foo")); track = MoqtSessionPeer::remote_track(&session_, 0); ASSERT_NE(track, nullptr); EXPECT_FALSE(track->all_streams_closed()); - EXPECT_CALL(remote_track_visitor, OnSubscribeDone(_)); + EXPECT_CALL(remote_track_visitor, OnPublishDone(_)); for (uint64_t i = 0; i < kNumStreams; ++i) { data_streams[i].reset(); } EXPECT_EQ(MoqtSessionPeer::remote_track(&session_, 0), nullptr); } -TEST_F(MoqtSessionTest, ReceiveSubscribeDoneWithClosedStreams) { +TEST_F(MoqtSessionTest, ReceivePublishDoneWithClosedStreams) { MockSubscribeRemoteTrackVisitor remote_track_visitor; webtransport::test::MockStream control_stream; std::unique_ptr<MoqtControlParserVisitor> stream_input = @@ -3640,13 +3640,13 @@ SubscribeRemoteTrack* track = MoqtSessionPeer::remote_track(&session_, 0); ASSERT_NE(track, nullptr); EXPECT_FALSE(track->all_streams_closed()); - EXPECT_CALL(remote_track_visitor, OnSubscribeDone(_)); - stream_input->OnSubscribeDoneMessage( - MoqtSubscribeDone(0, SubscribeDoneCode::kTrackEnded, kNumStreams, "foo")); + EXPECT_CALL(remote_track_visitor, OnPublishDone(_)); + stream_input->OnPublishDoneMessage( + MoqtPublishDone(0, PublishDoneCode::kTrackEnded, kNumStreams, "foo")); EXPECT_EQ(MoqtSessionPeer::remote_track(&session_, 0), nullptr); } -TEST_F(MoqtSessionTest, SubscribeDoneTimeout) { +TEST_F(MoqtSessionTest, PublishDoneTimeout) { MockSubscribeRemoteTrackVisitor remote_track_visitor; webtransport::test::MockStream control_stream; std::unique_ptr<MoqtControlParserVisitor> stream_input = @@ -3697,13 +3697,13 @@ ASSERT_NE(track, nullptr); EXPECT_FALSE(track->all_streams_closed()); // stream_count includes a stream that was never sent. - stream_input->OnSubscribeDoneMessage(MoqtSubscribeDone( - 0, SubscribeDoneCode::kTrackEnded, kNumStreams + 1, "foo")); + stream_input->OnPublishDoneMessage( + MoqtPublishDone(0, PublishDoneCode::kTrackEnded, kNumStreams + 1, "foo")); EXPECT_FALSE(track->all_streams_closed()); auto* subscribe_done_alarm = static_cast<quic::test::MockAlarmFactory::TestAlarm*>( - MoqtSessionPeer::GetSubscribeDoneAlarm(track)); - EXPECT_CALL(remote_track_visitor, OnSubscribeDone(_)); + MoqtSessionPeer::GetPublishDoneAlarm(track)); + EXPECT_CALL(remote_track_visitor, OnPublishDone(_)); subscribe_done_alarm->Fire(); // quic::test::MockAlarmFactory::FireAlarm(subscribe_done_alarm);; EXPECT_EQ(MoqtSessionPeer::remote_track(&session_, 0), nullptr); @@ -3935,7 +3935,7 @@ const quiche::StreamWriteOptions& options) { correct_message = true; EXPECT_EQ(*ExtractMessageType(data[0].AsStringView()), - MoqtMessageType::kSubscribeDone); + MoqtMessageType::kPublishDone); return absl::OkStatus(); }); stream_input->OnSubscribeUpdateMessage(update);
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc index b61db08..bd3bf47 100644 --- a/quiche/quic/moqt/moqt_track.cc +++ b/quiche/quic/moqt/moqt_track.cc
@@ -30,9 +30,9 @@ namespace { -constexpr quic::QuicTimeDelta kMinSubscribeDoneTimeout = +constexpr quic::QuicTimeDelta kMinPublishDoneTimeout = quic::QuicTimeDelta::FromSeconds(1); -constexpr quic::QuicTimeDelta kMaxSubscribeDoneTimeout = +constexpr quic::QuicTimeDelta kMaxPublishDoneTimeout = quic::QuicTimeDelta::FromSeconds(10); } // namespace @@ -61,24 +61,24 @@ if (subscribe_done_alarm_ == nullptr) { return; } - MaybeSetSubscribeDoneAlarm(); + MaybeSetPublishDoneAlarm(); } -void SubscribeRemoteTrack::OnSubscribeDone( +void SubscribeRemoteTrack::OnPublishDone( uint64_t stream_count, const quic::QuicClock* clock, std::unique_ptr<quic::QuicAlarm> subscribe_done_alarm) { total_streams_ = stream_count; clock_ = clock; subscribe_done_alarm_ = std::move(subscribe_done_alarm); - MaybeSetSubscribeDoneAlarm(); + MaybeSetPublishDoneAlarm(); } -void SubscribeRemoteTrack::MaybeSetSubscribeDoneAlarm() { +void SubscribeRemoteTrack::MaybeSetPublishDoneAlarm() { if (currently_open_streams_ == 0 && total_streams_.has_value() && clock_ != nullptr) { quic::QuicTimeDelta timeout = - std::min(delivery_timeout_, kMaxSubscribeDoneTimeout); - timeout = std::max(timeout, kMinSubscribeDoneTimeout); + std::min(delivery_timeout_, kMaxPublishDoneTimeout); + timeout = std::max(timeout, kMinPublishDoneTimeout); subscribe_done_alarm_->Set(clock_->ApproximateNow() + timeout); } }
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h index 34dc3bf..ecb4670 100644 --- a/quiche/quic/moqt/moqt_track.h +++ b/quiche/quic/moqt/moqt_track.h
@@ -133,8 +133,8 @@ } void OnStreamOpened(); void OnStreamClosed(); - void OnSubscribeDone(uint64_t stream_count, const quic::QuicClock* clock, - std::unique_ptr<quic::QuicAlarm> subscribe_done_alarm); + void OnPublishDone(uint64_t stream_count, const quic::QuicClock* clock, + std::unique_ptr<quic::QuicAlarm> subscribe_done_alarm); bool all_streams_closed() const { return total_streams_.has_value() && *total_streams_ == streams_closed_; } @@ -154,7 +154,7 @@ friend class test::MoqtSessionPeer; friend class test::SubscribeRemoteTrackPeer; - void MaybeSetSubscribeDoneAlarm(); + void MaybeSetPublishDoneAlarm(); void FetchObjects(); std::unique_ptr<MoqtFetchTask> fetch_task_;
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc index 7656a9f..8643c12 100644 --- a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc +++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc
@@ -44,8 +44,8 @@ quiche::QuicheBuffer operator()(const MoqtUnsubscribe& message) { return framer.SerializeUnsubscribe(message); } - quiche::QuicheBuffer operator()(const MoqtSubscribeDone& message) { - return framer.SerializeSubscribeDone(message); + quiche::QuicheBuffer operator()(const MoqtPublishDone& message) { + return framer.SerializePublishDone(message); } quiche::QuicheBuffer operator()(const MoqtSubscribeUpdate& message) { return framer.SerializeSubscribeUpdate(message); @@ -147,7 +147,7 @@ void OnUnsubscribeMessage(const MoqtUnsubscribe& message) { frames_.push_back(message); } - void OnSubscribeDoneMessage(const MoqtSubscribeDone& message) { + void OnPublishDoneMessage(const MoqtPublishDone& message) { frames_.push_back(message); } void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) {
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.h b/quiche/quic/moqt/test_tools/moqt_framer_utils.h index b75a0c5..b0e05d7 100644 --- a/quiche/quic/moqt/test_tools/moqt_framer_utils.h +++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.h
@@ -27,7 +27,7 @@ // those two types. using MoqtGenericFrame = std::variant< MoqtClientSetup, MoqtServerSetup, MoqtSubscribe, MoqtSubscribeOk, - MoqtSubscribeError, MoqtUnsubscribe, MoqtSubscribeDone, MoqtSubscribeUpdate, + MoqtSubscribeError, MoqtUnsubscribe, MoqtPublishDone, MoqtSubscribeUpdate, MoqtAnnounce, MoqtAnnounceOk, MoqtAnnounceError, MoqtUnannounce, MoqtAnnounceCancel, MoqtTrackStatus, MoqtTrackStatusOk, MoqtTrackStatusError, MoqtGoAway, MoqtSubscribeNamespace,
diff --git a/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h b/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h index 76ca1fa..be6f3b2 100644 --- a/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h +++ b/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h
@@ -63,7 +63,7 @@ void OnUnsubscribeMessage(const MoqtUnsubscribe& message) override { OnControlMessage(message); } - void OnSubscribeDoneMessage(const MoqtSubscribeDone& message) override { + void OnPublishDoneMessage(const MoqtPublishDone& message) override { OnControlMessage(message); } void OnAnnounceMessage(const MoqtAnnounce& message) override {
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h index 18366c0..5ab0fc4 100644 --- a/quiche/quic/moqt/test_tools/moqt_session_peer.h +++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -230,7 +230,7 @@ ->delivery_timeout_alarm_.get(); } - static quic::QuicAlarm* GetSubscribeDoneAlarm( + static quic::QuicAlarm* GetPublishDoneAlarm( SubscribeRemoteTrack* subscription) { return subscription->subscribe_done_alarm_.get(); }
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index bbee41e..13126e4 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -71,7 +71,7 @@ using MessageStructuredData = std::variant< MoqtClientSetup, MoqtServerSetup, MoqtObject, MoqtSubscribe, - MoqtSubscribeOk, MoqtSubscribeError, MoqtUnsubscribe, MoqtSubscribeDone, + MoqtSubscribeOk, MoqtSubscribeError, MoqtUnsubscribe, MoqtPublishDone, MoqtSubscribeUpdate, MoqtAnnounce, MoqtAnnounceOk, MoqtAnnounceError, MoqtUnannounce, MoqtAnnounceCancel, MoqtTrackStatus, MoqtTrackStatusOk, MoqtTrackStatusError, MoqtGoAway, MoqtSubscribeNamespace, @@ -860,14 +860,14 @@ }; }; -class QUICHE_NO_EXPORT SubscribeDoneMessage : public TestMessageBase { +class QUICHE_NO_EXPORT PublishDoneMessage : public TestMessageBase { public: - SubscribeDoneMessage() : TestMessageBase() { + PublishDoneMessage() : TestMessageBase() { SetWireImage(raw_packet_, sizeof(raw_packet_)); } bool EqualFieldValues(MessageStructuredData& values) const override { - auto cast = std::get<MoqtSubscribeDone>(values); + auto cast = std::get<MoqtPublishDone>(values); if (cast.request_id != subscribe_done_.request_id) { QUIC_LOG(INFO) << "SUBSCRIBE_DONE request ID mismatch"; return false; @@ -901,9 +901,9 @@ 0x02, 0x68, 0x69, // error_reason = "hi" }; - MoqtSubscribeDone subscribe_done_ = { + MoqtPublishDone subscribe_done_ = { /*request_id=*/2, - /*error_code=*/SubscribeDoneCode::kTrackEnded, + /*error_code=*/PublishDoneCode::kTrackEnded, /*stream_count=*/5, /*error_reason=*/"hi", }; @@ -2057,8 +2057,8 @@ return std::make_unique<SubscribeErrorMessage>(); case MoqtMessageType::kUnsubscribe: return std::make_unique<UnsubscribeMessage>(); - case MoqtMessageType::kSubscribeDone: - return std::make_unique<SubscribeDoneMessage>(); + case MoqtMessageType::kPublishDone: + return std::make_unique<PublishDoneMessage>(); case MoqtMessageType::kSubscribeUpdate: return std::make_unique<SubscribeUpdateMessage>(); case MoqtMessageType::kAnnounce:
diff --git a/quiche/quic/moqt/tools/chat_client.h b/quiche/quic/moqt/tools/chat_client.h index 99071e9..fcb9981 100644 --- a/quiche/quic/moqt/tools/chat_client.h +++ b/quiche/quic/moqt/tools/chat_client.h
@@ -102,7 +102,7 @@ absl::string_view object, bool end_of_message) override; - void OnSubscribeDone(FullTrackName /*full_track_name*/) override {} + void OnPublishDone(FullTrackName /*full_track_name*/) override {} // TODO(martinduke): Implement this. void OnMalformedTrack(const FullTrackName& /*full_track_name*/) override {}
diff --git a/quiche/quic/moqt/tools/chat_server.h b/quiche/quic/moqt/tools/chat_server.h index eacf67e..468845f 100644 --- a/quiche/quic/moqt/tools/chat_server.h +++ b/quiche/quic/moqt/tools/chat_server.h
@@ -46,7 +46,7 @@ const PublishedObjectMetadata& metadata, absl::string_view object, bool end_of_message) override; - void OnSubscribeDone(FullTrackName /*full_track_name*/) override {} + void OnPublishDone(FullTrackName /*full_track_name*/) override {} // TODO(martinduke): Implement this. void OnMalformedTrack(const FullTrackName& full_track_name) override {}
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc index d139961..8fd51fe 100644 --- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc +++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -193,7 +193,7 @@ output.close(); } - void OnSubscribeDone(FullTrackName /*full_track_name*/) override {} + void OnPublishDone(FullTrackName /*full_track_name*/) override {} void OnMalformedTrack(const FullTrackName& /*full_track_name*/) override {} private:
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h index da2a337..1dbc446 100644 --- a/quiche/quic/moqt/tools/moqt_mock_visitor.h +++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -102,8 +102,7 @@ const PublishedObjectMetadata& metadata, absl::string_view object, bool end_of_message), (override)); - MOCK_METHOD(void, OnSubscribeDone, (FullTrackName full_track_name), - (override)); + MOCK_METHOD(void, OnPublishDone, (FullTrackName full_track_name), (override)); MOCK_METHOD(void, OnMalformedTrack, (const FullTrackName& full_track_name), (override)); };
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc index 4048ba0..4b7d539 100644 --- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc +++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -321,7 +321,7 @@ OnFullObject(metadata.location, object); } - void OnSubscribeDone(FullTrackName /*full_track_name*/) override {} + void OnPublishDone(FullTrackName /*full_track_name*/) override {} void OnMalformedTrack(const FullTrackName& /*full_track_name*/) override {} void OnFullObject(Location sequence, absl::string_view payload) {