Implement MoQT SUBSCRIBE_DONE.

PiperOrigin-RevId: 735875434
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index fb324f7..60ba2ea 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -571,18 +571,10 @@
 
 quiche::QuicheBuffer MoqtFramer::SerializeSubscribeDone(
     const MoqtSubscribeDone& message) {
-  if (message.final_id.has_value()) {
-    return SerializeControlMessage(
-        MoqtMessageType::kSubscribeDone, WireVarInt62(message.subscribe_id),
-        WireVarInt62(message.status_code),
-        WireStringWithVarInt62Length(message.reason_phrase), WireUint8(1),
-        WireVarInt62(message.final_id->group),
-        WireVarInt62(message.final_id->object));
-  }
   return SerializeControlMessage(
       MoqtMessageType::kSubscribeDone, WireVarInt62(message.subscribe_id),
-      WireVarInt62(message.status_code),
-      WireStringWithVarInt62Length(message.reason_phrase), WireUint8(0));
+      WireVarInt62(message.status_code), WireVarInt62(message.stream_count),
+      WireStringWithVarInt62Length(message.reason_phrase));
 }
 
 quiche::QuicheBuffer MoqtFramer::SerializeSubscribeUpdate(
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 992c3e9..71e59a0 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -79,6 +79,19 @@
     ConnectEndpoints();
   }
 
+  // Client subscribes to the latest object in |track_name|.
+  void SubscribeLatestObject(FullTrackName track_name,
+                             MockSubscribeRemoteTrackVisitor* visitor) {
+    bool received_ok = false;
+    EXPECT_CALL(*visitor, OnReply(track_name, std::optional<FullSequence>(),
+                                  std::optional<absl::string_view>()))
+        .WillOnce([&]() { received_ok = true; });
+    client_->session()->SubscribeCurrentObject(track_name, visitor);
+    bool success =
+        test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; });
+    EXPECT_TRUE(success);
+  }
+
  protected:
   quic::simulator::TestHarness test_harness_;
 
@@ -423,6 +436,7 @@
 
   MoqtKnownTrackPublisher publisher;
   server_->session()->set_publisher(&publisher);
+  // TODO(martinduke): Unmock this.
   auto track_publisher = std::make_shared<MockTrackPublisher>(full_track_name);
   publisher.Add(track_publisher);
 
@@ -443,6 +457,7 @@
 
   MoqtKnownTrackPublisher publisher;
   server_->session()->set_publisher(&publisher);
+  // TODO(martinduke): Unmock this.
   auto track_publisher = std::make_shared<MockTrackPublisher>(full_track_name);
   publisher.Add(track_publisher);
 
@@ -463,6 +478,7 @@
 
   MoqtKnownTrackPublisher publisher;
   server_->session()->set_publisher(&publisher);
+  // TODO(martinduke): Unmock this.
   auto track_publisher = std::make_shared<MockTrackPublisher>(full_track_name);
   publisher.Add(track_publisher);
 
@@ -491,6 +507,48 @@
   EXPECT_TRUE(success);
 }
 
+TEST_F(MoqtIntegrationTest, CleanSubscribeDone) {
+  EstablishSession();
+  FullTrackName full_track_name("foo", "bar");
+
+  MoqtKnownTrackPublisher publisher;
+  server_->session()->set_publisher(&publisher);
+  auto queue = std::make_shared<MoqtLiveRelayQueue>(
+      full_track_name, MoqtForwardingPreference::kSubgroup);
+  publisher.Add(queue);
+
+  MockSubscribeRemoteTrackVisitor client_visitor;
+  SubscribeLatestObject(full_track_name, &client_visitor);
+
+  // Deliver 3 objects on 2 streams.
+  queue->AddObject(FullSequence(0, 0), "object,0,0", false);
+  queue->AddObject(FullSequence(0, 1), "object,0,1", true);
+  queue->AddObject(FullSequence(1, 0), "object,1,0", true);
+  int received = 0;
+  EXPECT_CALL(client_visitor, OnObjectFragment).WillRepeatedly([&]() {
+    ++received;
+  });
+  bool success =
+      test_harness_.RunUntilWithDefaultTimeout([&]() { return received == 3; });
+  EXPECT_TRUE(success);
+
+  // Reject this subscribe because there already is one.
+  EXPECT_FALSE(client_->session()->SubscribeCurrentGroup(full_track_name,
+                                                         &client_visitor));
+  queue->RemoveAllSubscriptions();  // Induce a SUBSCRIBE_DONE.
+  bool subscribe_done = false;
+  EXPECT_CALL(client_visitor, OnSubscribeDone).WillOnce([&]() {
+    subscribe_done = true;
+  });
+  success = test_harness_.RunUntilWithDefaultTimeout(
+      [&]() { return subscribe_done; });
+  EXPECT_TRUE(success);
+  // Subscription is deleted; the client session should not immediately reject
+  // a new attempt.
+  EXPECT_TRUE(client_->session()->SubscribeCurrentGroup(full_track_name,
+                                                        &client_visitor));
+}
+
 TEST_F(MoqtIntegrationTest, ObjectAcks) {
   CreateDefaultEndpoints();
   WireUpEndpoints();
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 8c01b17..af78e4d 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -445,8 +445,8 @@
 struct QUICHE_EXPORT MoqtSubscribeDone {
   uint64_t subscribe_id;
   SubscribeDoneCode status_code;
+  uint64_t stream_count;
   std::string reason_phrase;
-  std::optional<FullSequence> final_id;
 };
 
 struct QUICHE_EXPORT MoqtSubscribeUpdate {
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index cef87a8..5233a6b 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -479,26 +479,14 @@
 
 size_t MoqtControlParser::ProcessSubscribeDone(quic::QuicDataReader& reader) {
   MoqtSubscribeDone subscribe_done;
-  uint8_t content_exists;
   uint64_t value;
   if (!reader.ReadVarInt62(&subscribe_done.subscribe_id) ||
       !reader.ReadVarInt62(&value) ||
-      !reader.ReadStringVarInt62(subscribe_done.reason_phrase) ||
-      !reader.ReadUInt8(&content_exists)) {
+      !reader.ReadVarInt62(&subscribe_done.stream_count) ||
+      !reader.ReadStringVarInt62(subscribe_done.reason_phrase)) {
     return 0;
   }
   subscribe_done.status_code = static_cast<SubscribeDoneCode>(value);
-  if (content_exists > 1) {
-    ParseError("SUBSCRIBE_DONE ContentExists has invalid value");
-    return 0;
-  }
-  if (content_exists == 1) {
-    subscribe_done.final_id = FullSequence();
-    if (!reader.ReadVarInt62(&subscribe_done.final_id->group) ||
-        !reader.ReadVarInt62(&subscribe_done.final_id->object)) {
-      return 0;
-    }
-  }
   visitor_.OnSubscribeDoneMessage(subscribe_done);
   return reader.PreviouslyReadPayload().length();
 }
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index 267cdb1..03b9856 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -1158,17 +1158,6 @@
             "Invalid group order value in SUBSCRIBE_OK");
 }
 
-TEST_F(MoqtMessageSpecificTest, SubscribeDoneInvalidContentExists) {
-  MoqtControlParser parser(kRawQuic, visitor_);
-  SubscribeDoneMessage subscribe_done;
-  subscribe_done.SetInvalidContentExists();
-  parser.ProcessData(subscribe_done.PacketSample(), false);
-  EXPECT_EQ(visitor_.messages_received_, 0);
-  EXPECT_TRUE(visitor_.parsing_error_.has_value());
-  EXPECT_EQ(*visitor_.parsing_error_,
-            "SUBSCRIBE_DONE ContentExists has invalid value");
-}
-
 TEST_F(MoqtMessageSpecificTest, FetchInvalidRange) {
   MoqtControlParser parser(kRawQuic, visitor_);
   FetchMessage fetch;
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index d9b2c1b..7b1f300 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -420,10 +420,7 @@
   MoqtUnsubscribe message;
   message.subscribe_id = track->subscribe_id();
   SendControlMessage(framer_.SerializeUnsubscribe(message));
-  // Destroy state.
-  subscribe_by_name_.erase(name);
-  subscribe_by_alias_.erase(track->track_alias());
-  upstream_by_id_.erase(track->subscribe_id());
+  DestroySubscription(track);
 }
 
 bool MoqtSession::Fetch(const FullTrackName& name,
@@ -605,8 +602,8 @@
   MoqtSubscribeDone subscribe_done;
   subscribe_done.subscribe_id = subscribe_id;
   subscribe_done.status_code = code;
+  subscribe_done.stream_count = subscription.streams_opened();
   subscribe_done.reason_phrase = reason_phrase;
-  subscribe_done.final_id = subscription.largest_sent();
   SendControlMessage(framer_.SerializeSubscribeDone(subscribe_done));
   QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE_DONE message for "
                   << subscription.publisher().GetTrackName();
@@ -622,6 +619,18 @@
   return true;
 }
 
+void MoqtSession::MaybeDestroySubscription(SubscribeRemoteTrack* subscribe) {
+  if (subscribe != nullptr && subscribe->all_streams_closed()) {
+    DestroySubscription(subscribe);
+  }
+}
+
+void MoqtSession::DestroySubscription(SubscribeRemoteTrack* subscribe) {
+  subscribe->visitor()->OnSubscribeDone(subscribe->full_track_name());
+  subscribe_by_name_.erase(subscribe->full_track_name());
+  subscribe_by_alias_.erase(subscribe->track_alias());
+}
+
 bool MoqtSession::Subscribe(MoqtSubscribe& message,
                             SubscribeRemoteTrack::Visitor* visitor,
                             std::optional<uint64_t> provided_track_alias) {
@@ -683,7 +692,10 @@
   SendControlMessage(framer_.SerializeSubscribe(message));
   QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE message for "
                   << message.full_track_name;
-  auto track = std::make_unique<SubscribeRemoteTrack>(message, visitor);
+  auto track = std::make_unique<SubscribeRemoteTrack>(
+      message, visitor,
+      message.parameters.delivery_timeout.value_or(
+          quic::QuicTimeDelta::Infinite()));
   subscribe_by_name_.emplace(message.full_track_name, track.get());
   subscribe_by_alias_.emplace(message.track_alias, track.get());
   upstream_by_id_.emplace(message.subscribe_id, std::move(track));
@@ -1056,13 +1068,21 @@
                     "Received SUBSCRIBE_OK for a FETCH");
     return;
   }
-  QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for "
-                  << "subscribe_id = " << message.subscribe_id << " "
-                  << track->full_track_name();
+  if (message.largest_id.has_value()) {
+    QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for "
+                    << "subscribe_id = " << message.subscribe_id << " "
+                    << track->full_track_name()
+                    << " largest_id = " << *message.largest_id;
+  } else {
+    QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for "
+                    << "subscribe_id = " << message.subscribe_id << " "
+                    << track->full_track_name();
+  }
   SubscribeRemoteTrack* subscribe = static_cast<SubscribeRemoteTrack*>(track);
   subscribe->OnObjectOrOk();
   // TODO(martinduke): Handle expires field.
   // TODO(martinduke): Resize the window based on largest_id.
+  // TODO(martinduke): Handle delivery_timeout parameter.
   if (subscribe->visitor() != nullptr) {
     subscribe->visitor()->OnReply(track->full_track_name(), message.largest_id,
                                   std::nullopt);
@@ -1123,6 +1143,22 @@
   session_->published_subscriptions_.erase(it);
 }
 
+void MoqtSession::ControlStream::OnSubscribeDoneMessage(
+    const MoqtSubscribeDone& message) {
+  auto it = session_->upstream_by_id_.find(message.subscribe_id);
+  if (it == session_->upstream_by_id_.end()) {
+    return;
+  }
+  auto* subscribe = static_cast<SubscribeRemoteTrack*>(it->second.get());
+  QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE_DONE for "
+                  << it->second->full_track_name();
+  subscribe->OnSubscribeDone(
+      message.stream_count, session_->callbacks_.clock,
+      absl::WrapUnique(session_->alarm_factory_->CreateAlarm(
+          new SubscribeDoneDelegate(session_, subscribe))));
+  session_->MaybeDestroySubscription(subscribe);
+}
+
 void MoqtSession::ControlStream::OnSubscribeUpdateMessage(
     const MoqtSubscribeUpdate& message) {
   auto it = session_->published_subscriptions_.find(message.subscribe_id);
@@ -1606,11 +1642,28 @@
 }
 
 MoqtSession::IncomingDataStream::~IncomingDataStream() {
-  if (parser_.track_alias().has_value() &&
-      parser_.stream_type() == MoqtDataStreamType::kStreamHeaderFetch &&
-      track_.IsValid()) {
+  QUICHE_DVLOG(1) << ENDPOINT << "Destroying incoming data stream "
+                  << stream_->GetStreamId();
+  if (!parser_.track_alias().has_value()) {
+    QUIC_DVLOG(1) << ENDPOINT
+                  << "Destroying incoming data stream before "
+                     "learning track alias";
+    return;
+  }
+  if (!track_.IsValid()) {
+    return;
+  }
+  if (parser_.stream_type() == MoqtDataStreamType::kStreamHeaderFetch) {
     session_->upstream_by_id_.erase(*parser_.track_alias());
   }
+  // It's a subscribe.
+  SubscribeRemoteTrack* subscribe =
+      static_cast<SubscribeRemoteTrack*>(track_.GetIfAvailable());
+  if (subscribe == nullptr) {
+    return;
+  }
+  subscribe->OnStreamClosed();
+  session_->MaybeDestroySubscription(subscribe);
 }
 
 void MoqtSession::IncomingDataStream::MaybeReadOneObject() {
@@ -1648,17 +1701,31 @@
       return;
     }
   }
-  if (parser_.stream_type() != MoqtDataStreamType::kStreamHeaderFetch) {
+  bool knew_track_alias = parser_.track_alias().has_value();
+  if (parser_.stream_type() == MoqtDataStreamType::kStreamHeaderSubgroup) {
     parser_.ReadAllData();
+  } else if (!knew_track_alias) {
+    parser_.ReadTrackAlias();
+  }
+  if (!parser_.track_alias().has_value()) {
     return;
   }
-  bool learned_track_alias = false;
-  if (!parser_.track_alias().has_value()) {
-    learned_track_alias = true;
-    parser_.ReadTrackAlias();
-    if (!parser_.track_alias().has_value()) {
+  if (parser_.stream_type() == MoqtDataStreamType::kStreamHeaderSubgroup) {
+    if (knew_track_alias) {
       return;
     }
+    // This is a new stream for a subscribe. Notify the subscription.
+    auto it = session_->subscribe_by_alias_.find(*parser_.track_alias());
+    if (it == session_->subscribe_by_alias_.end()) {
+      QUIC_DLOG(INFO) << ENDPOINT
+                      << "Received object for a track with no SUBSCRIBE";
+      // This is a not a session error because there might be an UNSUBSCRIBE in
+      // flight.
+      stream_->SendStopSending(kResetCodeSubscriptionGone);
+      return;
+    }
+    it->second->OnStreamOpened();
+    return;
   }
   auto it = session_->upstream_by_id_.find(*parser_.track_alias());
   if (it == session_->upstream_by_id_.end()) {
@@ -1674,7 +1741,7 @@
     return;
   }
   UpstreamFetch* fetch = static_cast<UpstreamFetch*>(it->second.get());
-  if (learned_track_alias) {
+  if (!knew_track_alias) {
     // If the task already exists (FETCH_OK has arrived), the callback will
     // immediately execute to read the first object. Otherwise, it will only
     // execute when the task is created or a cached object is read.
@@ -1986,6 +2053,7 @@
 
 void MoqtSession::PublishedSubscription::OnDataStreamCreated(
     webtransport::StreamId id, FullSequence start_sequence) {
+  ++streams_opened_;
   stream_map().AddStream(start_sequence, id);
 }
 void MoqtSession::PublishedSubscription::OnDataStreamDestroyed(
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 482d70d..a62e58e 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -284,8 +284,7 @@
     void OnSubscribeErrorMessage(const MoqtSubscribeError& message) override;
     void OnUnsubscribeMessage(const MoqtUnsubscribe& message) override;
     // There is no state to update for SUBSCRIBE_DONE.
-    void OnSubscribeDoneMessage(const MoqtSubscribeDone& /*message*/) override {
-    }
+    void OnSubscribeDoneMessage(const MoqtSubscribeDone& /*message*/) override;
     void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) override;
     void OnAnnounceMessage(const MoqtAnnounce& message) override;
     void OnAnnounceOkMessage(const MoqtAnnounceOk& message) override;
@@ -478,6 +477,8 @@
       return reset_subgroups_;
     }
 
+    uint64_t streams_opened() const { return streams_opened_; }
+
    private:
     SendStreamMap& stream_map();
     quic::Perspective perspective() const {
@@ -498,6 +499,7 @@
     uint64_t track_alias_;
     SubscribeWindow window_;
     MoqtPriority subscriber_priority_;
+    uint64_t streams_opened_ = 0;
 
     // The subscription will ignore any groups with a lower ID, so it doesn't
     // need to track reset subgroups.
@@ -643,11 +645,25 @@
     MoqtSession* session_;
   };
 
-  // Private members of MoqtSession.
+  class SubscribeDoneDelegate : public quic::QuicAlarm::DelegateWithoutContext {
+   public:
+    SubscribeDoneDelegate(MoqtSession* session, SubscribeRemoteTrack* subscribe)
+        : session_(session), subscribe_(subscribe) {}
 
+    void OnAlarm() override { session_->DestroySubscription(subscribe_); }
+
+   private:
+    MoqtSession* session_;
+    SubscribeRemoteTrack* subscribe_;
+  };
+
+  // Private members of MoqtSession.
   // Returns true if SUBSCRIBE_DONE was sent.
   bool SubscribeIsDone(uint64_t subscribe_id, SubscribeDoneCode code,
                        absl::string_view reason_phrase);
+  void MaybeDestroySubscription(SubscribeRemoteTrack* subscribe);
+  void DestroySubscription(SubscribeRemoteTrack* subscribe);
+
   // Returns the pointer to the control stream, or nullptr if none is present.
   ControlStream* GetControlStream();
   // Sends a message on the control stream; QUICHE_DCHECKs if no control stream
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index d511ef7..c4e9332 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -4,6 +4,7 @@
 
 #include "quiche/quic/moqt/moqt_session.h"
 
+#include <algorithm>
 #include <cstdint>
 #include <cstring>
 #include <memory>
@@ -115,6 +116,61 @@
     EXPECT_CALL(session_callbacks_.session_deleted_callback, Call());
   }
 
+  // If visitor == nullptr, it's the first object in the stream, and will be
+  // assigned to the visitor the session creates.
+  // TODO(martinduke): Support delivering object payload.
+  void DeliverObject(MoqtObject& object, bool fin,
+                     webtransport::test::MockSession& session,
+                     webtransport::test::MockStream* stream,
+                     std::unique_ptr<webtransport::StreamVisitor>& visitor,
+                     MockSubscribeRemoteTrackVisitor* track_visitor) {
+    MoqtFramer framer(quiche::SimpleBufferAllocator::Get(), true);
+    quiche::QuicheBuffer buffer = framer.SerializeObjectHeader(
+        object, MoqtDataStreamType::kStreamHeaderSubgroup, visitor == nullptr);
+    size_t data_read = 0;
+    if (visitor == nullptr) {  // It's the first object in the stream
+      EXPECT_CALL(session, AcceptIncomingUnidirectionalStream())
+          .WillOnce(Return(stream))
+          .WillOnce(Return(nullptr));
+      EXPECT_CALL(*stream, SetVisitor(_))
+          .WillOnce(Invoke(
+              [&](std::unique_ptr<webtransport::StreamVisitor> new_visitor) {
+                visitor = std::move(new_visitor);
+              }));
+      EXPECT_CALL(*stream, visitor()).WillRepeatedly(Invoke([&]() {
+        return visitor.get();
+      }));
+    }
+    EXPECT_CALL(*stream, PeekNextReadableRegion()).WillRepeatedly(Invoke([&]() {
+      return quiche::ReadStream::PeekResult(
+          absl::string_view(buffer.data() + data_read,
+                            buffer.size() - data_read),
+          fin && data_read == buffer.size(), fin, );
+    }));
+    EXPECT_CALL(*stream, ReadableBytes()).WillRepeatedly(Invoke([&]() {
+      return buffer.size() - data_read;
+    }));
+    EXPECT_CALL(*stream, Read(testing::An<absl::Span<char>>()))
+        .WillRepeatedly(Invoke([&](absl::Span<char> bytes_to_read) {
+          size_t read_size =
+              std::min(bytes_to_read.size(), buffer.size() - data_read);
+          memcpy(bytes_to_read.data(), buffer.data() + data_read, read_size);
+          data_read += read_size;
+          return quiche::ReadStream::ReadResult(
+              read_size, fin && data_read == buffer.size());
+        }));
+    EXPECT_CALL(*stream, SkipBytes(_)).WillRepeatedly(Invoke([&](size_t bytes) {
+      data_read += bytes;
+      return fin && data_read == buffer.size();
+    }));
+    EXPECT_CALL(*track_visitor, OnObjectFragment).Times(1);
+    if (visitor == nullptr) {
+      session_.OnIncomingUnidirectionalStreamAvailable();
+    } else {
+      visitor->OnCanRead();
+    }
+  }
+
   MockSessionCallbacks session_callbacks_;
   webtransport::test::MockSession mock_session_;
   MoqtSession session_;
@@ -3132,6 +3188,178 @@
   EXPECT_TRUE(reported_error);
 }
 
+TEST_F(MoqtSessionTest, ReceiveSubscribeDoneWithOpenStreams) {
+  MockSubscribeRemoteTrackVisitor remote_track_visitor;
+  webtransport::test::MockStream control_stream;
+  std::unique_ptr<MoqtControlParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
+  EXPECT_CALL(mock_session_, GetStreamById(_))
+      .WillRepeatedly(Return(&control_stream));
+  EXPECT_CALL(control_stream,
+              Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
+  EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
+                                              &remote_track_visitor));
+  MoqtSubscribeOk ok = {
+      /*subscribe_id=*/0,
+      /*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
+      /*group_order=*/MoqtDeliveryOrder::kAscending,
+      /*largest_id=*/std::nullopt,
+      /*parameters=*/MoqtSubscribeParameters(),
+  };
+  stream_input->OnSubscribeOkMessage(ok);
+  constexpr uint64_t kNumStreams = 3;
+  webtransport::test::MockStream data[kNumStreams];
+  std::unique_ptr<webtransport::StreamVisitor> data_streams[kNumStreams];
+
+  MoqtObject object = {
+      /*track_alias=*/0,
+      /*group_id=*/0,
+      /*object_id=*/0,
+      /*publisher_priority=*/7,
+      std::vector<MoqtExtensionHeader>(),
+      /*object_status=*/MoqtObjectStatus::kGroupDoesNotExist,
+      /*subgroup_id=*/0,
+      /*payload_length=*/0,
+  };
+  for (uint64_t i = 0; i < kNumStreams; ++i) {
+    EXPECT_CALL(data[i], GetStreamId())
+        .WillRepeatedly(Return(kOutgoingUniStreamId + i * 4));
+    EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId + i * 4))
+        .WillRepeatedly(Return(&data[i]));
+    object.group_id = i;
+    DeliverObject(object, false, mock_session_, &data[i], data_streams[i],
+                  &remote_track_visitor);
+  }
+  SubscribeRemoteTrack* track = MoqtSessionPeer::remote_track(&session_, 0);
+  ASSERT_NE(track, nullptr);
+  EXPECT_FALSE(track->all_streams_closed());
+  stream_input->OnSubscribeDoneMessage(
+      MoqtSubscribeDone(0, SubscribeDoneCode::kTrackEnded, kNumStreams, "foo"));
+  track = MoqtSessionPeer::remote_track(&session_, 0);
+  ASSERT_NE(track, nullptr);
+  EXPECT_FALSE(track->all_streams_closed());
+  EXPECT_CALL(remote_track_visitor, OnSubscribeDone(_));
+  for (uint64_t i = 0; i < kNumStreams; ++i) {
+    data_streams[i].reset();
+  }
+  EXPECT_EQ(MoqtSessionPeer::remote_track(&session_, 0), nullptr);
+}
+
+TEST_F(MoqtSessionTest, ReceiveSubscribeDoneWithClosedStreams) {
+  MockSubscribeRemoteTrackVisitor remote_track_visitor;
+  webtransport::test::MockStream control_stream;
+  std::unique_ptr<MoqtControlParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
+  EXPECT_CALL(mock_session_, GetStreamById(_))
+      .WillRepeatedly(Return(&control_stream));
+  EXPECT_CALL(control_stream,
+              Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
+  EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
+                                              &remote_track_visitor));
+  MoqtSubscribeOk ok = {
+      /*subscribe_id=*/0,
+      /*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
+      /*group_order=*/MoqtDeliveryOrder::kAscending,
+      /*largest_id=*/std::nullopt,
+      /*parameters=*/MoqtSubscribeParameters(),
+  };
+  stream_input->OnSubscribeOkMessage(ok);
+  constexpr uint64_t kNumStreams = 3;
+  webtransport::test::MockStream data[kNumStreams];
+  std::unique_ptr<webtransport::StreamVisitor> data_streams[kNumStreams];
+
+  MoqtObject object = {
+      /*track_alias=*/0,
+      /*group_id=*/0,
+      /*object_id=*/0,
+      /*publisher_priority=*/7,
+      std::vector<MoqtExtensionHeader>(),
+      /*object_status=*/MoqtObjectStatus::kGroupDoesNotExist,
+      /*subgroup_id=*/0,
+      /*payload_length=*/0,
+  };
+  for (uint64_t i = 0; i < kNumStreams; ++i) {
+    EXPECT_CALL(data[i], GetStreamId())
+        .WillRepeatedly(Return(kOutgoingUniStreamId + i * 4));
+    EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId + i * 4))
+        .WillRepeatedly(Return(&data[i]));
+    object.group_id = i;
+    DeliverObject(object, true, mock_session_, &data[i], data_streams[i],
+                  &remote_track_visitor);
+  }
+  for (uint64_t i = 0; i < kNumStreams; ++i) {
+    data_streams[i].reset();
+  }
+  SubscribeRemoteTrack* track = MoqtSessionPeer::remote_track(&session_, 0);
+  ASSERT_NE(track, nullptr);
+  EXPECT_FALSE(track->all_streams_closed());
+  EXPECT_CALL(remote_track_visitor, OnSubscribeDone(_));
+  stream_input->OnSubscribeDoneMessage(
+      MoqtSubscribeDone(0, SubscribeDoneCode::kTrackEnded, kNumStreams, "foo"));
+  EXPECT_EQ(MoqtSessionPeer::remote_track(&session_, 0), nullptr);
+}
+
+TEST_F(MoqtSessionTest, SubscribeDoneTimeout) {
+  MockSubscribeRemoteTrackVisitor remote_track_visitor;
+  webtransport::test::MockStream control_stream;
+  std::unique_ptr<MoqtControlParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
+  EXPECT_CALL(mock_session_, GetStreamById(_))
+      .WillRepeatedly(Return(&control_stream));
+  EXPECT_CALL(control_stream,
+              Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
+  EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
+                                              &remote_track_visitor));
+  MoqtSubscribeOk ok = {
+      /*subscribe_id=*/0,
+      /*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
+      /*group_order=*/MoqtDeliveryOrder::kAscending,
+      /*largest_id=*/std::nullopt,
+      /*parameters=*/MoqtSubscribeParameters(),
+  };
+  stream_input->OnSubscribeOkMessage(ok);
+  constexpr uint64_t kNumStreams = 3;
+  webtransport::test::MockStream data[kNumStreams];
+  std::unique_ptr<webtransport::StreamVisitor> data_streams[kNumStreams];
+
+  MoqtObject object = {
+      /*track_alias=*/0,
+      /*group_id=*/0,
+      /*object_id=*/0,
+      /*publisher_priority=*/7,
+      std::vector<MoqtExtensionHeader>(),
+      /*object_status=*/MoqtObjectStatus::kGroupDoesNotExist,
+      /*subgroup_id=*/0,
+      /*payload_length=*/0,
+  };
+  for (uint64_t i = 0; i < kNumStreams; ++i) {
+    EXPECT_CALL(data[i], GetStreamId())
+        .WillRepeatedly(Return(kOutgoingUniStreamId + i * 4));
+    EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId + i * 4))
+        .WillRepeatedly(Return(&data[i]));
+    object.group_id = i;
+    DeliverObject(object, true, mock_session_, &data[i], data_streams[i],
+                  &remote_track_visitor);
+  }
+  for (uint64_t i = 0; i < kNumStreams; ++i) {
+    data_streams[i].reset();
+  }
+  SubscribeRemoteTrack* track = MoqtSessionPeer::remote_track(&session_, 0);
+  ASSERT_NE(track, nullptr);
+  EXPECT_FALSE(track->all_streams_closed());
+  // stream_count includes a stream that was never sent.
+  stream_input->OnSubscribeDoneMessage(MoqtSubscribeDone(
+      0, SubscribeDoneCode::kTrackEnded, kNumStreams + 1, "foo"));
+  EXPECT_FALSE(track->all_streams_closed());
+  auto* subscribe_done_alarm =
+      static_cast<quic::test::MockAlarmFactory::TestAlarm*>(
+          MoqtSessionPeer::GetSubscribeDoneAlarm(track));
+  EXPECT_CALL(remote_track_visitor, OnSubscribeDone(_));
+  subscribe_done_alarm->Fire();
+  // quic::test::MockAlarmFactory::FireAlarm(subscribe_done_alarm);;
+  EXPECT_EQ(MoqtSessionPeer::remote_track(&session_, 0), nullptr);
+}
+
 // TODO: re-enable this test once this behavior is re-implemented.
 #if 0
 TEST_F(MoqtSessionTest, SubscribeUpdateClosesSubscription) {
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc
index eededa5..4e131a4 100644
--- a/quiche/quic/moqt/moqt_track.cc
+++ b/quiche/quic/moqt/moqt_track.cc
@@ -4,6 +4,8 @@
 
 #include "quiche/quic/moqt/moqt_track.h"
 
+#include <algorithm>
+#include <cstdint>
 #include <cstring>
 #include <memory>
 #include <optional>
@@ -11,6 +13,9 @@
 
 #include "absl/status/status.h"
 #include "absl/strings/string_view.h"
+#include "quiche/quic/core/quic_alarm.h"
+#include "quiche/quic/core/quic_clock.h"
+#include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/common/platform/api/quiche_bug_tracker.h"
@@ -21,6 +26,15 @@
 
 namespace moqt {
 
+namespace {
+
+constexpr quic::QuicTimeDelta kMinSubscribeDoneTimeout =
+    quic::QuicTimeDelta::FromSeconds(1);
+constexpr quic::QuicTimeDelta kMaxSubscribeDoneTimeout =
+    quic::QuicTimeDelta::FromSeconds(10);
+
+}  // namespace
+
 bool RemoteTrack::CheckDataStreamType(MoqtDataStreamType type) {
   if (data_stream_type_.has_value()) {
     return data_stream_type_.value() == type;
@@ -29,6 +43,42 @@
   return true;
 }
 
+void SubscribeRemoteTrack::OnStreamOpened() {
+  ++currently_open_streams_;
+  if (subscribe_done_alarm_ != nullptr && subscribe_done_alarm_->IsSet()) {
+    subscribe_done_alarm_->Cancel();
+  }
+}
+
+void SubscribeRemoteTrack::OnStreamClosed() {
+  ++streams_closed_;
+  --currently_open_streams_;
+  QUICHE_DCHECK_GE(currently_open_streams_, -1);
+  if (subscribe_done_alarm_ == nullptr) {
+    return;
+  }
+  MaybeSetSubscribeDoneAlarm();
+}
+
+void SubscribeRemoteTrack::OnSubscribeDone(
+    uint64_t stream_count, const quic::QuicClock* clock,
+    std::unique_ptr<quic::QuicAlarm> subscribe_done_alarm) {
+  total_streams_ = stream_count;
+  clock_ = clock;
+  subscribe_done_alarm_ = std::move(subscribe_done_alarm);
+  MaybeSetSubscribeDoneAlarm();
+}
+
+void SubscribeRemoteTrack::MaybeSetSubscribeDoneAlarm() {
+  if (currently_open_streams_ == 0 && total_streams_.has_value() &&
+      clock_ != nullptr) {
+    quic::QuicTimeDelta timeout =
+        std::min(delivery_timeout_, kMaxSubscribeDoneTimeout);
+    timeout = std::max(timeout, kMinSubscribeDoneTimeout);
+    subscribe_done_alarm_->Set(clock_->ApproximateNow() + timeout);
+  }
+}
+
 void SubscribeRemoteTrack::OnJoiningFetchReady(
     std::unique_ptr<MoqtFetchTask> fetch_task) {
   fetch_task_ = std::move(fetch_task);
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index dd9f6cd..b1db0c3 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -12,6 +12,7 @@
 
 #include "absl/status/status.h"
 #include "absl/strings/string_view.h"
+#include "quiche/quic/core/quic_alarm.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_priority.h"
@@ -25,6 +26,7 @@
 namespace moqt {
 
 namespace test {
+class MoqtSessionPeer;
 class SubscribeRemoteTrackPeer;
 }  // namespace test
 
@@ -110,9 +112,11 @@
         const FullTrackName& full_track_name, FullSequence sequence,
         MoqtPriority publisher_priority, MoqtObjectStatus object_status,
         absl::string_view object, bool end_of_message) = 0;
-    // TODO(martinduke): Add final sequence numbers
+    virtual void OnSubscribeDone(FullTrackName full_track_name) = 0;
   };
-  SubscribeRemoteTrack(const MoqtSubscribe& subscribe, Visitor* visitor)
+  SubscribeRemoteTrack(
+      const MoqtSubscribe& subscribe, Visitor* visitor,
+      quic::QuicTimeDelta delivery_timeout = quic::QuicTimeDelta::Infinite())
       : RemoteTrack(subscribe.full_track_name, subscribe.subscribe_id,
                     SubscribeWindow(subscribe.start_group.value_or(0),
                                     subscribe.start_object.value_or(0),
@@ -120,7 +124,13 @@
                                     UINT64_MAX)),
         track_alias_(subscribe.track_alias),
         visitor_(visitor),
+        delivery_timeout_(delivery_timeout),
         subscribe_(std::make_unique<MoqtSubscribe>(subscribe)) {}
+  ~SubscribeRemoteTrack() override {
+    if (subscribe_done_alarm_ != nullptr) {
+      subscribe_done_alarm_->PermanentCancel();
+    }
+  }
 
   void OnObjectOrOk() override {
     subscribe_.reset();  // No SUBSCRIBE_ERROR, no need to store this anymore.
@@ -143,6 +153,13 @@
       return (is_datagram_ == is_datagram);
     }
   }
+  void OnStreamOpened();
+  void OnStreamClosed();
+  void OnSubscribeDone(uint64_t stream_count, const quic::QuicClock* clock,
+                       std::unique_ptr<quic::QuicAlarm> subscribe_done_alarm);
+  bool all_streams_closed() const {
+    return total_streams_.has_value() && *total_streams_ == streams_closed_;
+  }
 
   // The application can request a Joining FETCH but also for FETCH objects to
   // be delivered via SubscribeRemoteTrack::Visitor::OnObjectFragment(). When
@@ -151,13 +168,28 @@
   void OnJoiningFetchReady(std::unique_ptr<MoqtFetchTask> fetch_task);
 
  private:
+  friend class test::MoqtSessionPeer;
   friend class test::SubscribeRemoteTrackPeer;
+
+  void MaybeSetSubscribeDoneAlarm();
+
   void FetchObjects();
   std::unique_ptr<MoqtFetchTask> fetch_task_;
 
   const uint64_t track_alias_;
   Visitor* visitor_;
   std::optional<bool> is_datagram_;
+  int currently_open_streams_ = 0;
+  // Every stream that has received FIN or RESET_STREAM.
+  uint64_t streams_closed_ = 0;
+  // Value assigned on SUBSCRIBE_DONE. Can destroy subscription state if
+  // streams_closed_ == total_streams_.
+  std::optional<uint64_t> total_streams_;
+  // Timer to clean up the track if there are no open streams.
+  quic::QuicTimeDelta delivery_timeout_ = quic::QuicTimeDelta::Infinite();
+  std::unique_ptr<quic::QuicAlarm> subscribe_done_alarm_ = nullptr;
+  const quic::QuicClock* clock_ = nullptr;
+
   // For convenience, store the subscribe message if it has to be re-sent with
   // a new track alias.
   std::unique_ptr<MoqtSubscribe> subscribe_;
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index a0d05da..13a6700 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -8,7 +8,10 @@
 #include <optional>
 #include <utility>
 
+#include "absl/memory/memory.h"
 #include "absl/status/status.h"
+#include "quiche/quic/core/quic_alarm.h"
+#include "quiche/quic/core/quic_default_clock.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
@@ -20,10 +23,21 @@
 
 namespace test {
 
+namespace {
+
 using ::testing::_;
 using ::testing::Invoke;
 using ::testing::Return;
 
+class AlarmDelegate : public quic::QuicAlarm::DelegateWithoutContext {
+ public:
+  AlarmDelegate(bool* fired) : fired_(fired) {}
+  void OnAlarm() override { *fired_ = true; }
+  bool* fired_;
+};
+
+}  // namespace
+
 class SubscribeRemoteTrackPeer {
  public:
   static MoqtFetchTask* GetFetchTask(SubscribeRemoteTrack* track) {
@@ -79,56 +93,6 @@
   EXPECT_FALSE(track_.InWindow(FullSequence(2, 0)));
 }
 
-TEST_F(SubscribeRemoteTrackTest, JoiningFetch) {
-  auto fetch_task = std::make_unique<MockFetchTask>();
-  MockFetchTask* fetch = fetch_task.get();
-  EXPECT_CALL(*fetch, GetStatus()).WillRepeatedly(Return(absl::OkStatus()));
-  EXPECT_CALL(*fetch, GetNextObject(_))
-      .WillOnce(Invoke([](PublishedObject& object) {
-        object.sequence = FullSequence(0, 0);
-        object.status = MoqtObjectStatus::kNormal;
-        object.publisher_priority = 128;
-        object.payload = quic::test::MemSliceFromString("foobar");
-        object.fin_after_this = false;
-        return MoqtFetchTask::GetNextObjectResult::kSuccess;
-      }))
-      .WillOnce(Return(MoqtFetchTask::GetNextObjectResult::kPending));
-  EXPECT_CALL(visitor_, OnObjectFragment).Times(1);
-  track_.OnJoiningFetchReady(std::move(fetch_task));
-
-  EXPECT_CALL(*fetch, GetNextObject(_))
-      .WillOnce(Invoke([](PublishedObject& object) {
-        object.sequence = FullSequence(0, 1);
-        object.status = MoqtObjectStatus::kNormal;
-        object.publisher_priority = 128;
-        object.payload = quic::test::MemSliceFromString("foobar");
-        object.fin_after_this = false;
-        return MoqtFetchTask::GetNextObjectResult::kSuccess;
-      }))
-      .WillOnce(Return(MoqtFetchTask::GetNextObjectResult::kEof));
-  EXPECT_CALL(visitor_, OnObjectFragment).Times(1);
-  fetch->objects_available_callback()();
-  EXPECT_EQ(SubscribeRemoteTrackPeer::GetFetchTask(&track_), nullptr);
-}
-
-TEST_F(SubscribeRemoteTrackTest, JoiningFetchBadStatus) {
-  auto fetch_task = std::make_unique<MockFetchTask>();
-  MockFetchTask* fetch = fetch_task.get();
-  EXPECT_CALL(*fetch, GetStatus()).WillOnce(Return(absl::NotFoundError("foo")));
-  track_.OnJoiningFetchReady(std::move(fetch_task));
-  EXPECT_EQ(SubscribeRemoteTrackPeer::GetFetchTask(&track_), nullptr);
-}
-
-TEST_F(SubscribeRemoteTrackTest, JoiningFetchErrorReturn) {
-  auto fetch_task = std::make_unique<MockFetchTask>();
-  MockFetchTask* fetch = fetch_task.get();
-  EXPECT_CALL(*fetch, GetStatus()).WillRepeatedly(Return(absl::OkStatus()));
-  EXPECT_CALL(*fetch, GetNextObject(_))
-      .WillOnce(Return(MoqtFetchTask::GetNextObjectResult::kError));
-  track_.OnJoiningFetchReady(std::move(fetch_task));
-  EXPECT_EQ(SubscribeRemoteTrackPeer::GetFetchTask(&track_), nullptr);
-}
-
 class UpstreamFetchTest : public quic::test::QuicTest {
  protected:
   UpstreamFetchTest()
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index 052a76a..4368ed4 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -232,6 +232,11 @@
         ->delivery_timeout_alarm_.get();
   }
 
+  static quic::QuicAlarm* GetSubscribeDoneAlarm(
+      SubscribeRemoteTrack* subscription) {
+    return subscription->subscribe_done_alarm_.get();
+  }
+
   static quic::QuicAlarm* GetGoAwayTimeoutAlarm(MoqtSession* session) {
     return session->goaway_timeout_alarm_.get();
   }
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 3021a2b..9da46f0 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -736,40 +736,36 @@
       QUIC_LOG(INFO) << "SUBSCRIBE_DONE status code mismatch";
       return false;
     }
+    if (cast.stream_count != subscribe_done_.stream_count) {
+      QUIC_LOG(INFO) << "SUBSCRIBE_DONE stream count mismatch";
+      return false;
+    }
     if (cast.reason_phrase != subscribe_done_.reason_phrase) {
       QUIC_LOG(INFO) << "SUBSCRIBE_DONE reason phrase mismatch";
       return false;
     }
-    if (cast.final_id != subscribe_done_.final_id) {
-      QUIC_LOG(INFO) << "SUBSCRIBE_DONE final ID mismatch";
-      return false;
-    }
+
     return true;
   }
 
-  void ExpandVarints() override { ExpandVarintsImpl("vvvvv---vv"); }
+  void ExpandVarints() override { ExpandVarintsImpl("vvvvvv--"); }
 
   MessageStructuredData structured_data() const override {
     return TestMessageBase::MessageStructuredData(subscribe_done_);
   }
 
-  void SetInvalidContentExists() {
-    raw_packet_[7] = 0x02;
-    SetWireImage(raw_packet_, sizeof(raw_packet_));
-  }
-
  private:
-  uint8_t raw_packet_[10] = {
-      0x0b, 0x08, 0x02, 0x02,  // subscribe_id = 2, error_code = 2,
+  uint8_t raw_packet_[8] = {
+      0x0b, 0x06, 0x02, 0x02,  // subscribe_id = 2, error_code = 2,
+      0x05,                    // stream_count = 5
       0x02, 0x68, 0x69,        // reason_phrase = "hi"
-      0x01, 0x08, 0x0c,        // final_id = (8,12)
   };
 
   MoqtSubscribeDone subscribe_done_ = {
       /*subscribe_id=*/2,
       /*error_code=*/SubscribeDoneCode::kTrackEnded,
+      /*stream_count=*/5,
       /*reason_phrase=*/"hi",
-      /*final_id=*/FullSequence(8, 12),
   };
 };
 
diff --git a/quiche/quic/moqt/tools/chat_client.h b/quiche/quic/moqt/tools/chat_client.h
index b56142c..046bfeb 100644
--- a/quiche/quic/moqt/tools/chat_client.h
+++ b/quiche/quic/moqt/tools/chat_client.h
@@ -104,6 +104,8 @@
                           absl::string_view object,
                           bool end_of_message) override;
 
+    void OnSubscribeDone(FullTrackName full_track_name) override {}
+
    private:
     ChatClient* client_;
   };
diff --git a/quiche/quic/moqt/tools/chat_server.h b/quiche/quic/moqt/tools/chat_server.h
index 5a4c3b9..d9cdc70 100644
--- a/quiche/quic/moqt/tools/chat_server.h
+++ b/quiche/quic/moqt/tools/chat_server.h
@@ -47,6 +47,7 @@
         moqt::MoqtPriority /*publisher_priority*/,
         moqt::MoqtObjectStatus /*status*/,
         absl::string_view object, bool end_of_message) override;
+    void OnSubscribeDone(FullTrackName /*full_track_name*/) override {}
 
    private:
     ChatServer* server_;
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
index 691ea9e..d19bff3 100644
--- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -192,6 +192,8 @@
       output.close();
     }
 
+    void OnSubscribeDone(FullTrackName /*full_track_name*/) override {}
+
    private:
     std::string directory_;
   };
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h
index c91bca5..6cb7007 100644
--- a/quiche/quic/moqt/tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -101,6 +101,8 @@
                MoqtPriority publisher_priority, MoqtObjectStatus status,
                absl::string_view object, bool end_of_message),
               (override));
+  MOCK_METHOD(void, OnSubscribeDone, (FullTrackName full_track_name),
+              (override));
 };
 
 class MockPublishingMonitorInterface : public MoqtPublishingMonitorInterface {
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
index edf965a..988ddee 100644
--- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -316,6 +316,8 @@
     OnFullObject(sequence, object);
   }
 
+  void OnSubscribeDone(FullTrackName /*full_track_name*/) override {}
+
   void OnFullObject(FullSequence sequence, absl::string_view payload) {
     QUICHE_CHECK_GE(payload.size(), 8u);
     quiche::QuicheDataReader reader(payload);