Implement MoQT SUBSCRIBE_DONE.
PiperOrigin-RevId: 735875434
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index fb324f7..60ba2ea 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -571,18 +571,10 @@
quiche::QuicheBuffer MoqtFramer::SerializeSubscribeDone(
const MoqtSubscribeDone& message) {
- if (message.final_id.has_value()) {
- return SerializeControlMessage(
- MoqtMessageType::kSubscribeDone, WireVarInt62(message.subscribe_id),
- WireVarInt62(message.status_code),
- WireStringWithVarInt62Length(message.reason_phrase), WireUint8(1),
- WireVarInt62(message.final_id->group),
- WireVarInt62(message.final_id->object));
- }
return SerializeControlMessage(
MoqtMessageType::kSubscribeDone, WireVarInt62(message.subscribe_id),
- WireVarInt62(message.status_code),
- WireStringWithVarInt62Length(message.reason_phrase), WireUint8(0));
+ WireVarInt62(message.status_code), WireVarInt62(message.stream_count),
+ WireStringWithVarInt62Length(message.reason_phrase));
}
quiche::QuicheBuffer MoqtFramer::SerializeSubscribeUpdate(
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 992c3e9..71e59a0 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -79,6 +79,19 @@
ConnectEndpoints();
}
+ // Client subscribes to the latest object in |track_name|.
+ void SubscribeLatestObject(FullTrackName track_name,
+ MockSubscribeRemoteTrackVisitor* visitor) {
+ bool received_ok = false;
+ EXPECT_CALL(*visitor, OnReply(track_name, std::optional<FullSequence>(),
+ std::optional<absl::string_view>()))
+ .WillOnce([&]() { received_ok = true; });
+ client_->session()->SubscribeCurrentObject(track_name, visitor);
+ bool success =
+ test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; });
+ EXPECT_TRUE(success);
+ }
+
protected:
quic::simulator::TestHarness test_harness_;
@@ -423,6 +436,7 @@
MoqtKnownTrackPublisher publisher;
server_->session()->set_publisher(&publisher);
+ // TODO(martinduke): Unmock this.
auto track_publisher = std::make_shared<MockTrackPublisher>(full_track_name);
publisher.Add(track_publisher);
@@ -443,6 +457,7 @@
MoqtKnownTrackPublisher publisher;
server_->session()->set_publisher(&publisher);
+ // TODO(martinduke): Unmock this.
auto track_publisher = std::make_shared<MockTrackPublisher>(full_track_name);
publisher.Add(track_publisher);
@@ -463,6 +478,7 @@
MoqtKnownTrackPublisher publisher;
server_->session()->set_publisher(&publisher);
+ // TODO(martinduke): Unmock this.
auto track_publisher = std::make_shared<MockTrackPublisher>(full_track_name);
publisher.Add(track_publisher);
@@ -491,6 +507,48 @@
EXPECT_TRUE(success);
}
+TEST_F(MoqtIntegrationTest, CleanSubscribeDone) {
+ EstablishSession();
+ FullTrackName full_track_name("foo", "bar");
+
+ MoqtKnownTrackPublisher publisher;
+ server_->session()->set_publisher(&publisher);
+ auto queue = std::make_shared<MoqtLiveRelayQueue>(
+ full_track_name, MoqtForwardingPreference::kSubgroup);
+ publisher.Add(queue);
+
+ MockSubscribeRemoteTrackVisitor client_visitor;
+ SubscribeLatestObject(full_track_name, &client_visitor);
+
+ // Deliver 3 objects on 2 streams.
+ queue->AddObject(FullSequence(0, 0), "object,0,0", false);
+ queue->AddObject(FullSequence(0, 1), "object,0,1", true);
+ queue->AddObject(FullSequence(1, 0), "object,1,0", true);
+ int received = 0;
+ EXPECT_CALL(client_visitor, OnObjectFragment).WillRepeatedly([&]() {
+ ++received;
+ });
+ bool success =
+ test_harness_.RunUntilWithDefaultTimeout([&]() { return received == 3; });
+ EXPECT_TRUE(success);
+
+ // Reject this subscribe because there already is one.
+ EXPECT_FALSE(client_->session()->SubscribeCurrentGroup(full_track_name,
+ &client_visitor));
+ queue->RemoveAllSubscriptions(); // Induce a SUBSCRIBE_DONE.
+ bool subscribe_done = false;
+ EXPECT_CALL(client_visitor, OnSubscribeDone).WillOnce([&]() {
+ subscribe_done = true;
+ });
+ success = test_harness_.RunUntilWithDefaultTimeout(
+ [&]() { return subscribe_done; });
+ EXPECT_TRUE(success);
+ // Subscription is deleted; the client session should not immediately reject
+ // a new attempt.
+ EXPECT_TRUE(client_->session()->SubscribeCurrentGroup(full_track_name,
+ &client_visitor));
+}
+
TEST_F(MoqtIntegrationTest, ObjectAcks) {
CreateDefaultEndpoints();
WireUpEndpoints();
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 8c01b17..af78e4d 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -445,8 +445,8 @@
struct QUICHE_EXPORT MoqtSubscribeDone {
uint64_t subscribe_id;
SubscribeDoneCode status_code;
+ uint64_t stream_count;
std::string reason_phrase;
- std::optional<FullSequence> final_id;
};
struct QUICHE_EXPORT MoqtSubscribeUpdate {
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index cef87a8..5233a6b 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -479,26 +479,14 @@
size_t MoqtControlParser::ProcessSubscribeDone(quic::QuicDataReader& reader) {
MoqtSubscribeDone subscribe_done;
- uint8_t content_exists;
uint64_t value;
if (!reader.ReadVarInt62(&subscribe_done.subscribe_id) ||
!reader.ReadVarInt62(&value) ||
- !reader.ReadStringVarInt62(subscribe_done.reason_phrase) ||
- !reader.ReadUInt8(&content_exists)) {
+ !reader.ReadVarInt62(&subscribe_done.stream_count) ||
+ !reader.ReadStringVarInt62(subscribe_done.reason_phrase)) {
return 0;
}
subscribe_done.status_code = static_cast<SubscribeDoneCode>(value);
- if (content_exists > 1) {
- ParseError("SUBSCRIBE_DONE ContentExists has invalid value");
- return 0;
- }
- if (content_exists == 1) {
- subscribe_done.final_id = FullSequence();
- if (!reader.ReadVarInt62(&subscribe_done.final_id->group) ||
- !reader.ReadVarInt62(&subscribe_done.final_id->object)) {
- return 0;
- }
- }
visitor_.OnSubscribeDoneMessage(subscribe_done);
return reader.PreviouslyReadPayload().length();
}
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index 267cdb1..03b9856 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -1158,17 +1158,6 @@
"Invalid group order value in SUBSCRIBE_OK");
}
-TEST_F(MoqtMessageSpecificTest, SubscribeDoneInvalidContentExists) {
- MoqtControlParser parser(kRawQuic, visitor_);
- SubscribeDoneMessage subscribe_done;
- subscribe_done.SetInvalidContentExists();
- parser.ProcessData(subscribe_done.PacketSample(), false);
- EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(visitor_.parsing_error_.has_value());
- EXPECT_EQ(*visitor_.parsing_error_,
- "SUBSCRIBE_DONE ContentExists has invalid value");
-}
-
TEST_F(MoqtMessageSpecificTest, FetchInvalidRange) {
MoqtControlParser parser(kRawQuic, visitor_);
FetchMessage fetch;
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index d9b2c1b..7b1f300 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -420,10 +420,7 @@
MoqtUnsubscribe message;
message.subscribe_id = track->subscribe_id();
SendControlMessage(framer_.SerializeUnsubscribe(message));
- // Destroy state.
- subscribe_by_name_.erase(name);
- subscribe_by_alias_.erase(track->track_alias());
- upstream_by_id_.erase(track->subscribe_id());
+ DestroySubscription(track);
}
bool MoqtSession::Fetch(const FullTrackName& name,
@@ -605,8 +602,8 @@
MoqtSubscribeDone subscribe_done;
subscribe_done.subscribe_id = subscribe_id;
subscribe_done.status_code = code;
+ subscribe_done.stream_count = subscription.streams_opened();
subscribe_done.reason_phrase = reason_phrase;
- subscribe_done.final_id = subscription.largest_sent();
SendControlMessage(framer_.SerializeSubscribeDone(subscribe_done));
QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE_DONE message for "
<< subscription.publisher().GetTrackName();
@@ -622,6 +619,18 @@
return true;
}
+void MoqtSession::MaybeDestroySubscription(SubscribeRemoteTrack* subscribe) {
+ if (subscribe != nullptr && subscribe->all_streams_closed()) {
+ DestroySubscription(subscribe);
+ }
+}
+
+void MoqtSession::DestroySubscription(SubscribeRemoteTrack* subscribe) {
+ subscribe->visitor()->OnSubscribeDone(subscribe->full_track_name());
+ subscribe_by_name_.erase(subscribe->full_track_name());
+ subscribe_by_alias_.erase(subscribe->track_alias());
+}
+
bool MoqtSession::Subscribe(MoqtSubscribe& message,
SubscribeRemoteTrack::Visitor* visitor,
std::optional<uint64_t> provided_track_alias) {
@@ -683,7 +692,10 @@
SendControlMessage(framer_.SerializeSubscribe(message));
QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE message for "
<< message.full_track_name;
- auto track = std::make_unique<SubscribeRemoteTrack>(message, visitor);
+ auto track = std::make_unique<SubscribeRemoteTrack>(
+ message, visitor,
+ message.parameters.delivery_timeout.value_or(
+ quic::QuicTimeDelta::Infinite()));
subscribe_by_name_.emplace(message.full_track_name, track.get());
subscribe_by_alias_.emplace(message.track_alias, track.get());
upstream_by_id_.emplace(message.subscribe_id, std::move(track));
@@ -1056,13 +1068,21 @@
"Received SUBSCRIBE_OK for a FETCH");
return;
}
- QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for "
- << "subscribe_id = " << message.subscribe_id << " "
- << track->full_track_name();
+ if (message.largest_id.has_value()) {
+ QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for "
+ << "subscribe_id = " << message.subscribe_id << " "
+ << track->full_track_name()
+ << " largest_id = " << *message.largest_id;
+ } else {
+ QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for "
+ << "subscribe_id = " << message.subscribe_id << " "
+ << track->full_track_name();
+ }
SubscribeRemoteTrack* subscribe = static_cast<SubscribeRemoteTrack*>(track);
subscribe->OnObjectOrOk();
// TODO(martinduke): Handle expires field.
// TODO(martinduke): Resize the window based on largest_id.
+ // TODO(martinduke): Handle delivery_timeout parameter.
if (subscribe->visitor() != nullptr) {
subscribe->visitor()->OnReply(track->full_track_name(), message.largest_id,
std::nullopt);
@@ -1123,6 +1143,22 @@
session_->published_subscriptions_.erase(it);
}
+void MoqtSession::ControlStream::OnSubscribeDoneMessage(
+ const MoqtSubscribeDone& message) {
+ auto it = session_->upstream_by_id_.find(message.subscribe_id);
+ if (it == session_->upstream_by_id_.end()) {
+ return;
+ }
+ auto* subscribe = static_cast<SubscribeRemoteTrack*>(it->second.get());
+ QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE_DONE for "
+ << it->second->full_track_name();
+ subscribe->OnSubscribeDone(
+ message.stream_count, session_->callbacks_.clock,
+ absl::WrapUnique(session_->alarm_factory_->CreateAlarm(
+ new SubscribeDoneDelegate(session_, subscribe))));
+ session_->MaybeDestroySubscription(subscribe);
+}
+
void MoqtSession::ControlStream::OnSubscribeUpdateMessage(
const MoqtSubscribeUpdate& message) {
auto it = session_->published_subscriptions_.find(message.subscribe_id);
@@ -1606,11 +1642,28 @@
}
MoqtSession::IncomingDataStream::~IncomingDataStream() {
- if (parser_.track_alias().has_value() &&
- parser_.stream_type() == MoqtDataStreamType::kStreamHeaderFetch &&
- track_.IsValid()) {
+ QUICHE_DVLOG(1) << ENDPOINT << "Destroying incoming data stream "
+ << stream_->GetStreamId();
+ if (!parser_.track_alias().has_value()) {
+ QUIC_DVLOG(1) << ENDPOINT
+ << "Destroying incoming data stream before "
+ "learning track alias";
+ return;
+ }
+ if (!track_.IsValid()) {
+ return;
+ }
+ if (parser_.stream_type() == MoqtDataStreamType::kStreamHeaderFetch) {
session_->upstream_by_id_.erase(*parser_.track_alias());
}
+ // It's a subscribe.
+ SubscribeRemoteTrack* subscribe =
+ static_cast<SubscribeRemoteTrack*>(track_.GetIfAvailable());
+ if (subscribe == nullptr) {
+ return;
+ }
+ subscribe->OnStreamClosed();
+ session_->MaybeDestroySubscription(subscribe);
}
void MoqtSession::IncomingDataStream::MaybeReadOneObject() {
@@ -1648,17 +1701,31 @@
return;
}
}
- if (parser_.stream_type() != MoqtDataStreamType::kStreamHeaderFetch) {
+ bool knew_track_alias = parser_.track_alias().has_value();
+ if (parser_.stream_type() == MoqtDataStreamType::kStreamHeaderSubgroup) {
parser_.ReadAllData();
+ } else if (!knew_track_alias) {
+ parser_.ReadTrackAlias();
+ }
+ if (!parser_.track_alias().has_value()) {
return;
}
- bool learned_track_alias = false;
- if (!parser_.track_alias().has_value()) {
- learned_track_alias = true;
- parser_.ReadTrackAlias();
- if (!parser_.track_alias().has_value()) {
+ if (parser_.stream_type() == MoqtDataStreamType::kStreamHeaderSubgroup) {
+ if (knew_track_alias) {
return;
}
+ // This is a new stream for a subscribe. Notify the subscription.
+ auto it = session_->subscribe_by_alias_.find(*parser_.track_alias());
+ if (it == session_->subscribe_by_alias_.end()) {
+ QUIC_DLOG(INFO) << ENDPOINT
+ << "Received object for a track with no SUBSCRIBE";
+ // This is a not a session error because there might be an UNSUBSCRIBE in
+ // flight.
+ stream_->SendStopSending(kResetCodeSubscriptionGone);
+ return;
+ }
+ it->second->OnStreamOpened();
+ return;
}
auto it = session_->upstream_by_id_.find(*parser_.track_alias());
if (it == session_->upstream_by_id_.end()) {
@@ -1674,7 +1741,7 @@
return;
}
UpstreamFetch* fetch = static_cast<UpstreamFetch*>(it->second.get());
- if (learned_track_alias) {
+ if (!knew_track_alias) {
// If the task already exists (FETCH_OK has arrived), the callback will
// immediately execute to read the first object. Otherwise, it will only
// execute when the task is created or a cached object is read.
@@ -1986,6 +2053,7 @@
void MoqtSession::PublishedSubscription::OnDataStreamCreated(
webtransport::StreamId id, FullSequence start_sequence) {
+ ++streams_opened_;
stream_map().AddStream(start_sequence, id);
}
void MoqtSession::PublishedSubscription::OnDataStreamDestroyed(
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 482d70d..a62e58e 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -284,8 +284,7 @@
void OnSubscribeErrorMessage(const MoqtSubscribeError& message) override;
void OnUnsubscribeMessage(const MoqtUnsubscribe& message) override;
// There is no state to update for SUBSCRIBE_DONE.
- void OnSubscribeDoneMessage(const MoqtSubscribeDone& /*message*/) override {
- }
+ void OnSubscribeDoneMessage(const MoqtSubscribeDone& /*message*/) override;
void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) override;
void OnAnnounceMessage(const MoqtAnnounce& message) override;
void OnAnnounceOkMessage(const MoqtAnnounceOk& message) override;
@@ -478,6 +477,8 @@
return reset_subgroups_;
}
+ uint64_t streams_opened() const { return streams_opened_; }
+
private:
SendStreamMap& stream_map();
quic::Perspective perspective() const {
@@ -498,6 +499,7 @@
uint64_t track_alias_;
SubscribeWindow window_;
MoqtPriority subscriber_priority_;
+ uint64_t streams_opened_ = 0;
// The subscription will ignore any groups with a lower ID, so it doesn't
// need to track reset subgroups.
@@ -643,11 +645,25 @@
MoqtSession* session_;
};
- // Private members of MoqtSession.
+ class SubscribeDoneDelegate : public quic::QuicAlarm::DelegateWithoutContext {
+ public:
+ SubscribeDoneDelegate(MoqtSession* session, SubscribeRemoteTrack* subscribe)
+ : session_(session), subscribe_(subscribe) {}
+ void OnAlarm() override { session_->DestroySubscription(subscribe_); }
+
+ private:
+ MoqtSession* session_;
+ SubscribeRemoteTrack* subscribe_;
+ };
+
+ // Private members of MoqtSession.
// Returns true if SUBSCRIBE_DONE was sent.
bool SubscribeIsDone(uint64_t subscribe_id, SubscribeDoneCode code,
absl::string_view reason_phrase);
+ void MaybeDestroySubscription(SubscribeRemoteTrack* subscribe);
+ void DestroySubscription(SubscribeRemoteTrack* subscribe);
+
// Returns the pointer to the control stream, or nullptr if none is present.
ControlStream* GetControlStream();
// Sends a message on the control stream; QUICHE_DCHECKs if no control stream
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index d511ef7..c4e9332 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -4,6 +4,7 @@
#include "quiche/quic/moqt/moqt_session.h"
+#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
@@ -115,6 +116,61 @@
EXPECT_CALL(session_callbacks_.session_deleted_callback, Call());
}
+ // If visitor == nullptr, it's the first object in the stream, and will be
+ // assigned to the visitor the session creates.
+ // TODO(martinduke): Support delivering object payload.
+ void DeliverObject(MoqtObject& object, bool fin,
+ webtransport::test::MockSession& session,
+ webtransport::test::MockStream* stream,
+ std::unique_ptr<webtransport::StreamVisitor>& visitor,
+ MockSubscribeRemoteTrackVisitor* track_visitor) {
+ MoqtFramer framer(quiche::SimpleBufferAllocator::Get(), true);
+ quiche::QuicheBuffer buffer = framer.SerializeObjectHeader(
+ object, MoqtDataStreamType::kStreamHeaderSubgroup, visitor == nullptr);
+ size_t data_read = 0;
+ if (visitor == nullptr) { // It's the first object in the stream
+ EXPECT_CALL(session, AcceptIncomingUnidirectionalStream())
+ .WillOnce(Return(stream))
+ .WillOnce(Return(nullptr));
+ EXPECT_CALL(*stream, SetVisitor(_))
+ .WillOnce(Invoke(
+ [&](std::unique_ptr<webtransport::StreamVisitor> new_visitor) {
+ visitor = std::move(new_visitor);
+ }));
+ EXPECT_CALL(*stream, visitor()).WillRepeatedly(Invoke([&]() {
+ return visitor.get();
+ }));
+ }
+ EXPECT_CALL(*stream, PeekNextReadableRegion()).WillRepeatedly(Invoke([&]() {
+ return quiche::ReadStream::PeekResult(
+ absl::string_view(buffer.data() + data_read,
+ buffer.size() - data_read),
+ fin && data_read == buffer.size(), fin, );
+ }));
+ EXPECT_CALL(*stream, ReadableBytes()).WillRepeatedly(Invoke([&]() {
+ return buffer.size() - data_read;
+ }));
+ EXPECT_CALL(*stream, Read(testing::An<absl::Span<char>>()))
+ .WillRepeatedly(Invoke([&](absl::Span<char> bytes_to_read) {
+ size_t read_size =
+ std::min(bytes_to_read.size(), buffer.size() - data_read);
+ memcpy(bytes_to_read.data(), buffer.data() + data_read, read_size);
+ data_read += read_size;
+ return quiche::ReadStream::ReadResult(
+ read_size, fin && data_read == buffer.size());
+ }));
+ EXPECT_CALL(*stream, SkipBytes(_)).WillRepeatedly(Invoke([&](size_t bytes) {
+ data_read += bytes;
+ return fin && data_read == buffer.size();
+ }));
+ EXPECT_CALL(*track_visitor, OnObjectFragment).Times(1);
+ if (visitor == nullptr) {
+ session_.OnIncomingUnidirectionalStreamAvailable();
+ } else {
+ visitor->OnCanRead();
+ }
+ }
+
MockSessionCallbacks session_callbacks_;
webtransport::test::MockSession mock_session_;
MoqtSession session_;
@@ -3132,6 +3188,178 @@
EXPECT_TRUE(reported_error);
}
+TEST_F(MoqtSessionTest, ReceiveSubscribeDoneWithOpenStreams) {
+ MockSubscribeRemoteTrackVisitor remote_track_visitor;
+ webtransport::test::MockStream control_stream;
+ std::unique_ptr<MoqtControlParserVisitor> stream_input =
+ MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
+ EXPECT_CALL(mock_session_, GetStreamById(_))
+ .WillRepeatedly(Return(&control_stream));
+ EXPECT_CALL(control_stream,
+ Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
+ EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
+ &remote_track_visitor));
+ MoqtSubscribeOk ok = {
+ /*subscribe_id=*/0,
+ /*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
+ /*group_order=*/MoqtDeliveryOrder::kAscending,
+ /*largest_id=*/std::nullopt,
+ /*parameters=*/MoqtSubscribeParameters(),
+ };
+ stream_input->OnSubscribeOkMessage(ok);
+ constexpr uint64_t kNumStreams = 3;
+ webtransport::test::MockStream data[kNumStreams];
+ std::unique_ptr<webtransport::StreamVisitor> data_streams[kNumStreams];
+
+ MoqtObject object = {
+ /*track_alias=*/0,
+ /*group_id=*/0,
+ /*object_id=*/0,
+ /*publisher_priority=*/7,
+ std::vector<MoqtExtensionHeader>(),
+ /*object_status=*/MoqtObjectStatus::kGroupDoesNotExist,
+ /*subgroup_id=*/0,
+ /*payload_length=*/0,
+ };
+ for (uint64_t i = 0; i < kNumStreams; ++i) {
+ EXPECT_CALL(data[i], GetStreamId())
+ .WillRepeatedly(Return(kOutgoingUniStreamId + i * 4));
+ EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId + i * 4))
+ .WillRepeatedly(Return(&data[i]));
+ object.group_id = i;
+ DeliverObject(object, false, mock_session_, &data[i], data_streams[i],
+ &remote_track_visitor);
+ }
+ SubscribeRemoteTrack* track = MoqtSessionPeer::remote_track(&session_, 0);
+ ASSERT_NE(track, nullptr);
+ EXPECT_FALSE(track->all_streams_closed());
+ stream_input->OnSubscribeDoneMessage(
+ MoqtSubscribeDone(0, SubscribeDoneCode::kTrackEnded, kNumStreams, "foo"));
+ track = MoqtSessionPeer::remote_track(&session_, 0);
+ ASSERT_NE(track, nullptr);
+ EXPECT_FALSE(track->all_streams_closed());
+ EXPECT_CALL(remote_track_visitor, OnSubscribeDone(_));
+ for (uint64_t i = 0; i < kNumStreams; ++i) {
+ data_streams[i].reset();
+ }
+ EXPECT_EQ(MoqtSessionPeer::remote_track(&session_, 0), nullptr);
+}
+
+TEST_F(MoqtSessionTest, ReceiveSubscribeDoneWithClosedStreams) {
+ MockSubscribeRemoteTrackVisitor remote_track_visitor;
+ webtransport::test::MockStream control_stream;
+ std::unique_ptr<MoqtControlParserVisitor> stream_input =
+ MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
+ EXPECT_CALL(mock_session_, GetStreamById(_))
+ .WillRepeatedly(Return(&control_stream));
+ EXPECT_CALL(control_stream,
+ Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
+ EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
+ &remote_track_visitor));
+ MoqtSubscribeOk ok = {
+ /*subscribe_id=*/0,
+ /*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
+ /*group_order=*/MoqtDeliveryOrder::kAscending,
+ /*largest_id=*/std::nullopt,
+ /*parameters=*/MoqtSubscribeParameters(),
+ };
+ stream_input->OnSubscribeOkMessage(ok);
+ constexpr uint64_t kNumStreams = 3;
+ webtransport::test::MockStream data[kNumStreams];
+ std::unique_ptr<webtransport::StreamVisitor> data_streams[kNumStreams];
+
+ MoqtObject object = {
+ /*track_alias=*/0,
+ /*group_id=*/0,
+ /*object_id=*/0,
+ /*publisher_priority=*/7,
+ std::vector<MoqtExtensionHeader>(),
+ /*object_status=*/MoqtObjectStatus::kGroupDoesNotExist,
+ /*subgroup_id=*/0,
+ /*payload_length=*/0,
+ };
+ for (uint64_t i = 0; i < kNumStreams; ++i) {
+ EXPECT_CALL(data[i], GetStreamId())
+ .WillRepeatedly(Return(kOutgoingUniStreamId + i * 4));
+ EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId + i * 4))
+ .WillRepeatedly(Return(&data[i]));
+ object.group_id = i;
+ DeliverObject(object, true, mock_session_, &data[i], data_streams[i],
+ &remote_track_visitor);
+ }
+ for (uint64_t i = 0; i < kNumStreams; ++i) {
+ data_streams[i].reset();
+ }
+ SubscribeRemoteTrack* track = MoqtSessionPeer::remote_track(&session_, 0);
+ ASSERT_NE(track, nullptr);
+ EXPECT_FALSE(track->all_streams_closed());
+ EXPECT_CALL(remote_track_visitor, OnSubscribeDone(_));
+ stream_input->OnSubscribeDoneMessage(
+ MoqtSubscribeDone(0, SubscribeDoneCode::kTrackEnded, kNumStreams, "foo"));
+ EXPECT_EQ(MoqtSessionPeer::remote_track(&session_, 0), nullptr);
+}
+
+TEST_F(MoqtSessionTest, SubscribeDoneTimeout) {
+ MockSubscribeRemoteTrackVisitor remote_track_visitor;
+ webtransport::test::MockStream control_stream;
+ std::unique_ptr<MoqtControlParserVisitor> stream_input =
+ MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
+ EXPECT_CALL(mock_session_, GetStreamById(_))
+ .WillRepeatedly(Return(&control_stream));
+ EXPECT_CALL(control_stream,
+ Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
+ EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
+ &remote_track_visitor));
+ MoqtSubscribeOk ok = {
+ /*subscribe_id=*/0,
+ /*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
+ /*group_order=*/MoqtDeliveryOrder::kAscending,
+ /*largest_id=*/std::nullopt,
+ /*parameters=*/MoqtSubscribeParameters(),
+ };
+ stream_input->OnSubscribeOkMessage(ok);
+ constexpr uint64_t kNumStreams = 3;
+ webtransport::test::MockStream data[kNumStreams];
+ std::unique_ptr<webtransport::StreamVisitor> data_streams[kNumStreams];
+
+ MoqtObject object = {
+ /*track_alias=*/0,
+ /*group_id=*/0,
+ /*object_id=*/0,
+ /*publisher_priority=*/7,
+ std::vector<MoqtExtensionHeader>(),
+ /*object_status=*/MoqtObjectStatus::kGroupDoesNotExist,
+ /*subgroup_id=*/0,
+ /*payload_length=*/0,
+ };
+ for (uint64_t i = 0; i < kNumStreams; ++i) {
+ EXPECT_CALL(data[i], GetStreamId())
+ .WillRepeatedly(Return(kOutgoingUniStreamId + i * 4));
+ EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId + i * 4))
+ .WillRepeatedly(Return(&data[i]));
+ object.group_id = i;
+ DeliverObject(object, true, mock_session_, &data[i], data_streams[i],
+ &remote_track_visitor);
+ }
+ for (uint64_t i = 0; i < kNumStreams; ++i) {
+ data_streams[i].reset();
+ }
+ SubscribeRemoteTrack* track = MoqtSessionPeer::remote_track(&session_, 0);
+ ASSERT_NE(track, nullptr);
+ EXPECT_FALSE(track->all_streams_closed());
+ // stream_count includes a stream that was never sent.
+ stream_input->OnSubscribeDoneMessage(MoqtSubscribeDone(
+ 0, SubscribeDoneCode::kTrackEnded, kNumStreams + 1, "foo"));
+ EXPECT_FALSE(track->all_streams_closed());
+ auto* subscribe_done_alarm =
+ static_cast<quic::test::MockAlarmFactory::TestAlarm*>(
+ MoqtSessionPeer::GetSubscribeDoneAlarm(track));
+ EXPECT_CALL(remote_track_visitor, OnSubscribeDone(_));
+ subscribe_done_alarm->Fire();
+ // quic::test::MockAlarmFactory::FireAlarm(subscribe_done_alarm);;
+ EXPECT_EQ(MoqtSessionPeer::remote_track(&session_, 0), nullptr);
+}
+
// TODO: re-enable this test once this behavior is re-implemented.
#if 0
TEST_F(MoqtSessionTest, SubscribeUpdateClosesSubscription) {
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc
index eededa5..4e131a4 100644
--- a/quiche/quic/moqt/moqt_track.cc
+++ b/quiche/quic/moqt/moqt_track.cc
@@ -4,6 +4,8 @@
#include "quiche/quic/moqt/moqt_track.h"
+#include <algorithm>
+#include <cstdint>
#include <cstring>
#include <memory>
#include <optional>
@@ -11,6 +13,9 @@
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
+#include "quiche/quic/core/quic_alarm.h"
+#include "quiche/quic/core/quic_clock.h"
+#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
@@ -21,6 +26,15 @@
namespace moqt {
+namespace {
+
+constexpr quic::QuicTimeDelta kMinSubscribeDoneTimeout =
+ quic::QuicTimeDelta::FromSeconds(1);
+constexpr quic::QuicTimeDelta kMaxSubscribeDoneTimeout =
+ quic::QuicTimeDelta::FromSeconds(10);
+
+} // namespace
+
bool RemoteTrack::CheckDataStreamType(MoqtDataStreamType type) {
if (data_stream_type_.has_value()) {
return data_stream_type_.value() == type;
@@ -29,6 +43,42 @@
return true;
}
+void SubscribeRemoteTrack::OnStreamOpened() {
+ ++currently_open_streams_;
+ if (subscribe_done_alarm_ != nullptr && subscribe_done_alarm_->IsSet()) {
+ subscribe_done_alarm_->Cancel();
+ }
+}
+
+void SubscribeRemoteTrack::OnStreamClosed() {
+ ++streams_closed_;
+ --currently_open_streams_;
+ QUICHE_DCHECK_GE(currently_open_streams_, -1);
+ if (subscribe_done_alarm_ == nullptr) {
+ return;
+ }
+ MaybeSetSubscribeDoneAlarm();
+}
+
+void SubscribeRemoteTrack::OnSubscribeDone(
+ uint64_t stream_count, const quic::QuicClock* clock,
+ std::unique_ptr<quic::QuicAlarm> subscribe_done_alarm) {
+ total_streams_ = stream_count;
+ clock_ = clock;
+ subscribe_done_alarm_ = std::move(subscribe_done_alarm);
+ MaybeSetSubscribeDoneAlarm();
+}
+
+void SubscribeRemoteTrack::MaybeSetSubscribeDoneAlarm() {
+ if (currently_open_streams_ == 0 && total_streams_.has_value() &&
+ clock_ != nullptr) {
+ quic::QuicTimeDelta timeout =
+ std::min(delivery_timeout_, kMaxSubscribeDoneTimeout);
+ timeout = std::max(timeout, kMinSubscribeDoneTimeout);
+ subscribe_done_alarm_->Set(clock_->ApproximateNow() + timeout);
+ }
+}
+
void SubscribeRemoteTrack::OnJoiningFetchReady(
std::unique_ptr<MoqtFetchTask> fetch_task) {
fetch_task_ = std::move(fetch_task);
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index dd9f6cd..b1db0c3 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -12,6 +12,7 @@
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
+#include "quiche/quic/core/quic_alarm.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_priority.h"
@@ -25,6 +26,7 @@
namespace moqt {
namespace test {
+class MoqtSessionPeer;
class SubscribeRemoteTrackPeer;
} // namespace test
@@ -110,9 +112,11 @@
const FullTrackName& full_track_name, FullSequence sequence,
MoqtPriority publisher_priority, MoqtObjectStatus object_status,
absl::string_view object, bool end_of_message) = 0;
- // TODO(martinduke): Add final sequence numbers
+ virtual void OnSubscribeDone(FullTrackName full_track_name) = 0;
};
- SubscribeRemoteTrack(const MoqtSubscribe& subscribe, Visitor* visitor)
+ SubscribeRemoteTrack(
+ const MoqtSubscribe& subscribe, Visitor* visitor,
+ quic::QuicTimeDelta delivery_timeout = quic::QuicTimeDelta::Infinite())
: RemoteTrack(subscribe.full_track_name, subscribe.subscribe_id,
SubscribeWindow(subscribe.start_group.value_or(0),
subscribe.start_object.value_or(0),
@@ -120,7 +124,13 @@
UINT64_MAX)),
track_alias_(subscribe.track_alias),
visitor_(visitor),
+ delivery_timeout_(delivery_timeout),
subscribe_(std::make_unique<MoqtSubscribe>(subscribe)) {}
+ ~SubscribeRemoteTrack() override {
+ if (subscribe_done_alarm_ != nullptr) {
+ subscribe_done_alarm_->PermanentCancel();
+ }
+ }
void OnObjectOrOk() override {
subscribe_.reset(); // No SUBSCRIBE_ERROR, no need to store this anymore.
@@ -143,6 +153,13 @@
return (is_datagram_ == is_datagram);
}
}
+ void OnStreamOpened();
+ void OnStreamClosed();
+ void OnSubscribeDone(uint64_t stream_count, const quic::QuicClock* clock,
+ std::unique_ptr<quic::QuicAlarm> subscribe_done_alarm);
+ bool all_streams_closed() const {
+ return total_streams_.has_value() && *total_streams_ == streams_closed_;
+ }
// The application can request a Joining FETCH but also for FETCH objects to
// be delivered via SubscribeRemoteTrack::Visitor::OnObjectFragment(). When
@@ -151,13 +168,28 @@
void OnJoiningFetchReady(std::unique_ptr<MoqtFetchTask> fetch_task);
private:
+ friend class test::MoqtSessionPeer;
friend class test::SubscribeRemoteTrackPeer;
+
+ void MaybeSetSubscribeDoneAlarm();
+
void FetchObjects();
std::unique_ptr<MoqtFetchTask> fetch_task_;
const uint64_t track_alias_;
Visitor* visitor_;
std::optional<bool> is_datagram_;
+ int currently_open_streams_ = 0;
+ // Every stream that has received FIN or RESET_STREAM.
+ uint64_t streams_closed_ = 0;
+ // Value assigned on SUBSCRIBE_DONE. Can destroy subscription state if
+ // streams_closed_ == total_streams_.
+ std::optional<uint64_t> total_streams_;
+ // Timer to clean up the track if there are no open streams.
+ quic::QuicTimeDelta delivery_timeout_ = quic::QuicTimeDelta::Infinite();
+ std::unique_ptr<quic::QuicAlarm> subscribe_done_alarm_ = nullptr;
+ const quic::QuicClock* clock_ = nullptr;
+
// For convenience, store the subscribe message if it has to be re-sent with
// a new track alias.
std::unique_ptr<MoqtSubscribe> subscribe_;
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index a0d05da..13a6700 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -8,7 +8,10 @@
#include <optional>
#include <utility>
+#include "absl/memory/memory.h"
#include "absl/status/status.h"
+#include "quiche/quic/core/quic_alarm.h"
+#include "quiche/quic/core/quic_default_clock.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
@@ -20,10 +23,21 @@
namespace test {
+namespace {
+
using ::testing::_;
using ::testing::Invoke;
using ::testing::Return;
+class AlarmDelegate : public quic::QuicAlarm::DelegateWithoutContext {
+ public:
+ AlarmDelegate(bool* fired) : fired_(fired) {}
+ void OnAlarm() override { *fired_ = true; }
+ bool* fired_;
+};
+
+} // namespace
+
class SubscribeRemoteTrackPeer {
public:
static MoqtFetchTask* GetFetchTask(SubscribeRemoteTrack* track) {
@@ -79,56 +93,6 @@
EXPECT_FALSE(track_.InWindow(FullSequence(2, 0)));
}
-TEST_F(SubscribeRemoteTrackTest, JoiningFetch) {
- auto fetch_task = std::make_unique<MockFetchTask>();
- MockFetchTask* fetch = fetch_task.get();
- EXPECT_CALL(*fetch, GetStatus()).WillRepeatedly(Return(absl::OkStatus()));
- EXPECT_CALL(*fetch, GetNextObject(_))
- .WillOnce(Invoke([](PublishedObject& object) {
- object.sequence = FullSequence(0, 0);
- object.status = MoqtObjectStatus::kNormal;
- object.publisher_priority = 128;
- object.payload = quic::test::MemSliceFromString("foobar");
- object.fin_after_this = false;
- return MoqtFetchTask::GetNextObjectResult::kSuccess;
- }))
- .WillOnce(Return(MoqtFetchTask::GetNextObjectResult::kPending));
- EXPECT_CALL(visitor_, OnObjectFragment).Times(1);
- track_.OnJoiningFetchReady(std::move(fetch_task));
-
- EXPECT_CALL(*fetch, GetNextObject(_))
- .WillOnce(Invoke([](PublishedObject& object) {
- object.sequence = FullSequence(0, 1);
- object.status = MoqtObjectStatus::kNormal;
- object.publisher_priority = 128;
- object.payload = quic::test::MemSliceFromString("foobar");
- object.fin_after_this = false;
- return MoqtFetchTask::GetNextObjectResult::kSuccess;
- }))
- .WillOnce(Return(MoqtFetchTask::GetNextObjectResult::kEof));
- EXPECT_CALL(visitor_, OnObjectFragment).Times(1);
- fetch->objects_available_callback()();
- EXPECT_EQ(SubscribeRemoteTrackPeer::GetFetchTask(&track_), nullptr);
-}
-
-TEST_F(SubscribeRemoteTrackTest, JoiningFetchBadStatus) {
- auto fetch_task = std::make_unique<MockFetchTask>();
- MockFetchTask* fetch = fetch_task.get();
- EXPECT_CALL(*fetch, GetStatus()).WillOnce(Return(absl::NotFoundError("foo")));
- track_.OnJoiningFetchReady(std::move(fetch_task));
- EXPECT_EQ(SubscribeRemoteTrackPeer::GetFetchTask(&track_), nullptr);
-}
-
-TEST_F(SubscribeRemoteTrackTest, JoiningFetchErrorReturn) {
- auto fetch_task = std::make_unique<MockFetchTask>();
- MockFetchTask* fetch = fetch_task.get();
- EXPECT_CALL(*fetch, GetStatus()).WillRepeatedly(Return(absl::OkStatus()));
- EXPECT_CALL(*fetch, GetNextObject(_))
- .WillOnce(Return(MoqtFetchTask::GetNextObjectResult::kError));
- track_.OnJoiningFetchReady(std::move(fetch_task));
- EXPECT_EQ(SubscribeRemoteTrackPeer::GetFetchTask(&track_), nullptr);
-}
-
class UpstreamFetchTest : public quic::test::QuicTest {
protected:
UpstreamFetchTest()
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index 052a76a..4368ed4 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -232,6 +232,11 @@
->delivery_timeout_alarm_.get();
}
+ static quic::QuicAlarm* GetSubscribeDoneAlarm(
+ SubscribeRemoteTrack* subscription) {
+ return subscription->subscribe_done_alarm_.get();
+ }
+
static quic::QuicAlarm* GetGoAwayTimeoutAlarm(MoqtSession* session) {
return session->goaway_timeout_alarm_.get();
}
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 3021a2b..9da46f0 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -736,40 +736,36 @@
QUIC_LOG(INFO) << "SUBSCRIBE_DONE status code mismatch";
return false;
}
+ if (cast.stream_count != subscribe_done_.stream_count) {
+ QUIC_LOG(INFO) << "SUBSCRIBE_DONE stream count mismatch";
+ return false;
+ }
if (cast.reason_phrase != subscribe_done_.reason_phrase) {
QUIC_LOG(INFO) << "SUBSCRIBE_DONE reason phrase mismatch";
return false;
}
- if (cast.final_id != subscribe_done_.final_id) {
- QUIC_LOG(INFO) << "SUBSCRIBE_DONE final ID mismatch";
- return false;
- }
+
return true;
}
- void ExpandVarints() override { ExpandVarintsImpl("vvvvv---vv"); }
+ void ExpandVarints() override { ExpandVarintsImpl("vvvvvv--"); }
MessageStructuredData structured_data() const override {
return TestMessageBase::MessageStructuredData(subscribe_done_);
}
- void SetInvalidContentExists() {
- raw_packet_[7] = 0x02;
- SetWireImage(raw_packet_, sizeof(raw_packet_));
- }
-
private:
- uint8_t raw_packet_[10] = {
- 0x0b, 0x08, 0x02, 0x02, // subscribe_id = 2, error_code = 2,
+ uint8_t raw_packet_[8] = {
+ 0x0b, 0x06, 0x02, 0x02, // subscribe_id = 2, error_code = 2,
+ 0x05, // stream_count = 5
0x02, 0x68, 0x69, // reason_phrase = "hi"
- 0x01, 0x08, 0x0c, // final_id = (8,12)
};
MoqtSubscribeDone subscribe_done_ = {
/*subscribe_id=*/2,
/*error_code=*/SubscribeDoneCode::kTrackEnded,
+ /*stream_count=*/5,
/*reason_phrase=*/"hi",
- /*final_id=*/FullSequence(8, 12),
};
};
diff --git a/quiche/quic/moqt/tools/chat_client.h b/quiche/quic/moqt/tools/chat_client.h
index b56142c..046bfeb 100644
--- a/quiche/quic/moqt/tools/chat_client.h
+++ b/quiche/quic/moqt/tools/chat_client.h
@@ -104,6 +104,8 @@
absl::string_view object,
bool end_of_message) override;
+ void OnSubscribeDone(FullTrackName full_track_name) override {}
+
private:
ChatClient* client_;
};
diff --git a/quiche/quic/moqt/tools/chat_server.h b/quiche/quic/moqt/tools/chat_server.h
index 5a4c3b9..d9cdc70 100644
--- a/quiche/quic/moqt/tools/chat_server.h
+++ b/quiche/quic/moqt/tools/chat_server.h
@@ -47,6 +47,7 @@
moqt::MoqtPriority /*publisher_priority*/,
moqt::MoqtObjectStatus /*status*/,
absl::string_view object, bool end_of_message) override;
+ void OnSubscribeDone(FullTrackName /*full_track_name*/) override {}
private:
ChatServer* server_;
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
index 691ea9e..d19bff3 100644
--- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -192,6 +192,8 @@
output.close();
}
+ void OnSubscribeDone(FullTrackName /*full_track_name*/) override {}
+
private:
std::string directory_;
};
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h
index c91bca5..6cb7007 100644
--- a/quiche/quic/moqt/tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -101,6 +101,8 @@
MoqtPriority publisher_priority, MoqtObjectStatus status,
absl::string_view object, bool end_of_message),
(override));
+ MOCK_METHOD(void, OnSubscribeDone, (FullTrackName full_track_name),
+ (override));
};
class MockPublishingMonitorInterface : public MoqtPublishingMonitorInterface {
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
index edf965a..988ddee 100644
--- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -316,6 +316,8 @@
OnFullObject(sequence, object);
}
+ void OnSubscribeDone(FullTrackName /*full_track_name*/) override {}
+
void OnFullObject(FullSequence sequence, absl::string_view payload) {
QUICHE_CHECK_GE(payload.size(), 8u);
quiche::QuicheDataReader reader(payload);