Send MoQT SUBSCRIBE_DONE when all objects are delivered. Tracks objects published and not yet published separately. This does not cover the case where the first unpublished object begins a new group, because we don't have state where the last published group ends. The patch for that will be in another CL. PiperOrigin-RevId: 643336131
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index 8d68733..c117946 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -156,6 +156,7 @@ object = other.object; return *this; } + FullSequence next() const { return FullSequence{group, object + 1}; } template <typename H> friend H AbslHashValue(H h, const FullSequence& m);
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc index 7e6662e..18808ee 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -74,8 +74,8 @@ queue.AddObject(MemSliceFromString("a"), true); queue.AddObject(MemSliceFromString("b"), false); queue.AddObject(MemSliceFromString("c"), false); - queue.CallSubscribeForPast( - SubscribeWindow(0, MoqtForwardingPreference::kGroup, 0, 0)); + queue.CallSubscribeForPast(SubscribeWindow( + 0, MoqtForwardingPreference::kGroup, FullSequence(0, 3), 0, 0)); } TEST(MoqtOutgoingQueue, SingleGroupPastSubscribeFromMidGroup) { @@ -92,8 +92,8 @@ queue.AddObject(MemSliceFromString("a"), true); queue.AddObject(MemSliceFromString("b"), false); queue.AddObject(MemSliceFromString("c"), false); - queue.CallSubscribeForPast( - SubscribeWindow(0, MoqtForwardingPreference::kGroup, 0, 1)); + queue.CallSubscribeForPast(SubscribeWindow( + 0, MoqtForwardingPreference::kGroup, FullSequence(0, 3), 0, 1)); } TEST(MoqtOutgoingQueue, TwoGroups) { @@ -140,8 +140,8 @@ queue.AddObject(MemSliceFromString("d"), true); queue.AddObject(MemSliceFromString("e"), false); queue.AddObject(MemSliceFromString("f"), false); - queue.CallSubscribeForPast( - SubscribeWindow(0, MoqtForwardingPreference::kGroup, 0, 1)); + queue.CallSubscribeForPast(SubscribeWindow( + 0, MoqtForwardingPreference::kGroup, FullSequence(1, 3), 0, 1)); } TEST(MoqtOutgoingQueue, FiveGroups) { @@ -212,8 +212,8 @@ queue.AddObject(MemSliceFromString("h"), false); queue.AddObject(MemSliceFromString("i"), true); queue.AddObject(MemSliceFromString("j"), false); - queue.CallSubscribeForPast( - SubscribeWindow(0, MoqtForwardingPreference::kGroup, 0, 0)); + queue.CallSubscribeForPast(SubscribeWindow( + 0, MoqtForwardingPreference::kGroup, FullSequence(4, 2), 0, 0)); } } // namespace
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index bb86c12..3c41df1 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -8,6 +8,7 @@ #include <cstdint> #include <memory> #include <optional> +#include <set> #include <string> #include <utility> #include <vector> @@ -422,7 +423,8 @@ "immediately closed"; return false; } - track.SentSequence(FullSequence(group_id, object_id)); + FullSequence sequence{group_id, object_id}; + track.SentSequence(sequence); std::vector<SubscribeWindow*> subscriptions = track.ShouldSend({group_id, object_id}); if (subscriptions.empty()) { @@ -439,8 +441,11 @@ int failures = 0; quiche::StreamWriteOptions write_options; write_options.set_send_fin(end_of_stream); + absl::flat_hash_set<uint64_t> subscribes_to_close; for (auto subscription : subscriptions) { - subscription->OnObjectDelivered(FullSequence(group_id, object_id)); + if (subscription->OnObjectSent(sequence)) { + subscribes_to_close.insert(subscription->subscribe_id()); + } if (forwarding_preference == MoqtForwardingPreference::kDatagram) { object.subscribe_id = subscription->subscribe_id(); quiche::QuicheBuffer datagram = @@ -452,7 +457,7 @@ } bool new_stream = false; std::optional<webtransport::StreamId> stream_id = - subscription->GetStreamForSequence(FullSequence(group_id, object_id)); + subscription->GetStreamForSequence(sequence); if (!stream_id.has_value()) { new_stream = true; stream_id = OpenUnidirectionalStream(); @@ -491,6 +496,9 @@ subscription->RemoveStream(group_id, object_id); } } + for (uint64_t subscribe_id : subscribes_to_close) { + SubscribeIsDone(subscribe_id, SubscribeDoneCode::kSubscriptionEnded, ""); + } return (failures == 0); } @@ -741,11 +749,8 @@ } LocalTrack::Visitor::PublishPastObjectsCallback publish_past_objects; SubscribeWindow window = - end.has_value() - ? SubscribeWindow(message.subscribe_id, track.forwarding_preference(), - start.group, start.object, end->group, end->object) - : SubscribeWindow(message.subscribe_id, track.forwarding_preference(), - start.group, start.object); + SubscribeWindow(message.subscribe_id, track.forwarding_preference(), + track.next_sequence(), start, end); if (start < track.next_sequence() && track.visitor() != nullptr) { absl::StatusOr<LocalTrack::Visitor::PublishPastObjectsCallback> past_objects_available = track.visitor()->OnSubscribeForPast(window);
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index 7e5fb7f..90ac894 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -127,9 +127,6 @@ absl::string_view name, RemoteTrack::Visitor* visitor, absl::string_view auth_info = ""); - // Returns true if SUBSCRIBE_DONE was sent. - bool SubscribeIsDone(uint64_t subscribe_id, SubscribeDoneCode code, - absl::string_view reason_phrase); // Returns false if it could not open a stream when necessary, or if the // track does not exist (there was no call to AddLocalTrack). Will still @@ -222,6 +219,9 @@ std::string partial_object_; }; + // Returns true if SUBSCRIBE_DONE was sent. + bool SubscribeIsDone(uint64_t subscribe_id, SubscribeDoneCode code, + absl::string_view reason_phrase); // Returns the pointer to the control stream, or nullptr if none is present. Stream* GetControlStream(); // Sends a message on the control stream; QUICHE_DCHECKs if no control stream
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 0b40053..89877b2 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -1233,7 +1233,7 @@ MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0); // Get the window, set the maximum delivered. LocalTrack& track = MoqtSessionPeer::local_track(&session_, ftn); - track.GetWindow(0)->OnObjectDelivered(FullSequence(7, 3)); + track.GetWindow(0)->OnObjectSent(FullSequence(7, 3)); // Update the end to fall at the last delivered object. MoqtSubscribeUpdate update = { /*subscribe_id=*/0,
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.cc b/quiche/quic/moqt/moqt_subscribe_windows.cc index bcbb40a..3e07a0c 100644 --- a/quiche/quic/moqt/moqt_subscribe_windows.cc +++ b/quiche/quic/moqt/moqt_subscribe_windows.cc
@@ -54,6 +54,41 @@ send_streams_.erase(index); } +bool SubscribeWindow::OnObjectSent(FullSequence sequence) { + if (!largest_delivered_.has_value() || *largest_delivered_ < sequence) { + largest_delivered_ = sequence; + } + // Update next_to_backfill_ + if (sequence < original_next_object_ && next_to_backfill_.has_value() && + *next_to_backfill_ <= sequence) { + next_to_backfill_ = sequence.next(); + if (next_to_backfill_ == original_next_object_ || + *next_to_backfill_ == end_) { + // Redelivery is complete. + next_to_backfill_ = std::nullopt; + } + } + // TODO(martinduke): If the subscription ends in a full group with undefined + // object sequence, the only way to know to send SUBSCRIBE_DONE is by getting + // an upstream SUBSCRIBE_DONE. + return (!next_to_backfill_.has_value() && end_.has_value() && + *end_ <= sequence); +} + +bool SubscribeWindow::UpdateStartEnd(FullSequence start, + std::optional<FullSequence> end) { + // Can't make the subscription window bigger. + if (!InWindow(start)) { + return false; + } + if (end_.has_value() && (!end.has_value() || *end_ < *end)) { + return false; + } + start_ = start; + end_ = end; + return true; +} + FullSequence SubscribeWindow::SequenceToIndex(FullSequence sequence) const { switch (forwarding_preference_) { case MoqtForwardingPreference::kTrack:
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h index 40c2564..3bb2482 100644 --- a/quiche/quic/moqt/moqt_subscribe_windows.h +++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -21,23 +21,38 @@ // subscribed, the streams involved, and the subscribe IDs. class QUICHE_EXPORT SubscribeWindow { public: - // Creates a half-open window. + // Creates a half-open window. |next_object| is the expected sequence number + // of the next published object on the track. SubscribeWindow(uint64_t subscribe_id, MoqtForwardingPreference forwarding_preference, - uint64_t start_group, uint64_t start_object) - : subscribe_id_(subscribe_id), - start_({start_group, start_object}), - forwarding_preference_(forwarding_preference) {} + FullSequence next_object, uint64_t start_group, + uint64_t start_object) + : SubscribeWindow(subscribe_id, forwarding_preference, next_object, + FullSequence(start_group, start_object), std::nullopt) { + } // Creates a closed window. SubscribeWindow(uint64_t subscribe_id, MoqtForwardingPreference forwarding_preference, - uint64_t start_group, uint64_t start_object, - uint64_t end_group, uint64_t end_object) + FullSequence next_object, uint64_t start_group, + uint64_t start_object, uint64_t end_group, + uint64_t end_object) + : SubscribeWindow(subscribe_id, forwarding_preference, next_object, + FullSequence(start_group, start_object), + FullSequence(end_group, end_object)) {} + + SubscribeWindow(uint64_t subscribe_id, + MoqtForwardingPreference forwarding_preference, + FullSequence next_object, FullSequence start, + std::optional<FullSequence> end) : subscribe_id_(subscribe_id), - start_({start_group, start_object}), - end_(FullSequence(end_group, end_object)), - forwarding_preference_(forwarding_preference) {} + start_(start), + end_(end), + original_next_object_(next_object), + forwarding_preference_(forwarding_preference) { + next_to_backfill_ = + (start < next_object) ? start : std::optional<FullSequence>(); + } uint64_t subscribe_id() const { return subscribe_id_; } @@ -59,29 +74,15 @@ return forwarding_preference_; } - void OnObjectDelivered(FullSequence sequence) { - if (!largest_delivered_.has_value() || *largest_delivered_ < sequence) { - largest_delivered_ = sequence; - } - } + // Returns true if the object delivery completed the subscription + bool OnObjectSent(FullSequence sequence); std::optional<FullSequence>& largest_delivered() { return largest_delivered_; } // Returns true if the updated values are valid. - bool UpdateStartEnd(FullSequence start, std::optional<FullSequence> end) { - // Can't make the subscription window bigger. - if (!InWindow(start)) { - return false; - } - if (end_.has_value() && (!end.has_value() || *end_ < *end)) { - return false; - } - start_ = start; - end_ = end; - return true; - } + bool UpdateStartEnd(FullSequence start, std::optional<FullSequence> end); private: // Converts an object sequence number into one that matches the way that @@ -90,8 +91,13 @@ const uint64_t subscribe_id_; FullSequence start_; - std::optional<FullSequence> end_ = std::nullopt; + std::optional<FullSequence> end_; std::optional<FullSequence> largest_delivered_; + // The next sequence number to be redelivered, because it was published prior + // to the subscription. Is nullopt if no redeliveries are needed. + std::optional<FullSequence> next_to_backfill_; + // The first unpublished sequence number when the subscribe arrived. + const FullSequence original_next_object_; // Store open streams for this subscription. If the forwarding preference is // kTrack, there is one entry under sequence (0, 0). If kGroup, each entry is // under (group, 0). If kObject, it's tracked under the full sequence. If @@ -115,19 +121,19 @@ // |start_group| and |start_object| must be absolute sequence numbers. An // optimization could consolidate overlapping subscribe windows. - void AddWindow(uint64_t subscribe_id, uint64_t start_group, - uint64_t start_object) { + void AddWindow(uint64_t subscribe_id, FullSequence next_object, + uint64_t start_group, uint64_t start_object) { windows_.emplace(subscribe_id, SubscribeWindow(subscribe_id, forwarding_preference_, - start_group, start_object)); + next_object, start_group, start_object)); } - void AddWindow(uint64_t subscribe_id, uint64_t start_group, - uint64_t start_object, uint64_t end_group, - uint64_t end_object) { + void AddWindow(uint64_t subscribe_id, FullSequence next_object, + uint64_t start_group, uint64_t start_object, + uint64_t end_group, uint64_t end_object) { windows_.emplace( subscribe_id, - SubscribeWindow(subscribe_id, forwarding_preference_, start_group, - start_object, end_group, end_object)); + SubscribeWindow(subscribe_id, forwarding_preference_, next_object, + start_group, start_object, end_group, end_object)); } void RemoveWindow(uint64_t subscribe_id) { windows_.erase(subscribe_id); }
diff --git a/quiche/quic/moqt/moqt_subscribe_windows_test.cc b/quiche/quic/moqt/moqt_subscribe_windows_test.cc index a144474..487cc5d 100644 --- a/quiche/quic/moqt/moqt_subscribe_windows_test.cc +++ b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
@@ -4,13 +4,13 @@ #include "quiche/quic/moqt/moqt_subscribe_windows.h" +#include <cstdint> #include <optional> #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/platform/api/quic_expect_bug.h" #include "quiche/quic/platform/api/quic_test.h" #include "quiche/common/platform/api/quiche_export.h" -#include "quiche/web_transport/web_transport.h" namespace moqt { @@ -21,15 +21,14 @@ SubscribeWindowTest() {} const uint64_t subscribe_id_ = 2; - const uint64_t start_group_ = 4; - const uint64_t start_object_ = 0; - const uint64_t end_group_ = 5; - const uint64_t end_object_ = 5; + const FullSequence right_edge_{4, 5}; + const FullSequence start_{4, 0}; + const FullSequence end_{5, 5}; }; TEST_F(SubscribeWindowTest, Queries) { SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject, - start_group_, start_object_, end_group_, end_object_); + right_edge_, start_, end_); EXPECT_EQ(window.subscribe_id(), 2); EXPECT_TRUE(window.InWindow(FullSequence(4, 0))); EXPECT_TRUE(window.InWindow(FullSequence(5, 5))); @@ -40,7 +39,7 @@ TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdTrack) { SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kTrack, - start_group_, start_object_, end_group_, end_object_); + right_edge_, start_, end_); window.AddStream(4, 0, 2); EXPECT_QUIC_BUG(window.AddStream(5, 2, 6), "Stream already added"); EXPECT_EQ(*window.GetStreamForSequence(FullSequence(5, 2)), 2); @@ -50,7 +49,7 @@ TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdGroup) { SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kGroup, - start_group_, start_object_, end_group_, end_object_); + right_edge_, start_, end_); window.AddStream(4, 0, 2); EXPECT_FALSE(window.GetStreamForSequence(FullSequence(5, 0)).has_value()); window.AddStream(5, 2, 6); @@ -63,7 +62,7 @@ TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdObject) { SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject, - start_group_, start_object_, end_group_, end_object_); + right_edge_, start_, end_); window.AddStream(4, 0, 2); window.AddStream(4, 1, 6); window.AddStream(4, 2, 10); @@ -78,47 +77,65 @@ TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdDatagram) { SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kDatagram, - start_group_, start_object_, end_group_, end_object_); + right_edge_, start_, end_); EXPECT_QUIC_BUG(window.AddStream(4, 0, 2), "Adding a stream for datagram"); } -TEST_F(SubscribeWindowTest, OnObjectDelivered) { +TEST_F(SubscribeWindowTest, OnObjectSent) { SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject, - start_group_, start_object_, end_group_, end_object_); + right_edge_, start_, end_); EXPECT_FALSE(window.largest_delivered().has_value()); - window.OnObjectDelivered(FullSequence(4, 1)); + EXPECT_FALSE(window.OnObjectSent(FullSequence(4, 1))); EXPECT_TRUE(window.largest_delivered().has_value()); EXPECT_EQ(window.largest_delivered().value(), FullSequence(4, 1)); - window.OnObjectDelivered(FullSequence(4, 2)); + EXPECT_FALSE(window.OnObjectSent(FullSequence(4, 2))); EXPECT_EQ(window.largest_delivered().value(), FullSequence(4, 2)); - window.OnObjectDelivered(FullSequence(4, 0)); + EXPECT_FALSE(window.OnObjectSent(FullSequence(4, 0))); EXPECT_EQ(window.largest_delivered().value(), FullSequence(4, 2)); } +TEST_F(SubscribeWindowTest, AllObjectsUnpublishedAtStart) { + SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject, + FullSequence(0, 0), FullSequence(0, 0), + FullSequence(0, 1)); + EXPECT_FALSE(window.OnObjectSent(FullSequence(0, 0))); + EXPECT_TRUE(window.OnObjectSent(FullSequence(0, 1))); +} + +TEST_F(SubscribeWindowTest, AllObjectsPublishedAtStart) { + SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject, + FullSequence(4, 0), FullSequence(0, 0), + FullSequence(0, 1)); + EXPECT_FALSE(window.OnObjectSent(FullSequence(0, 0))); + EXPECT_TRUE(window.OnObjectSent(FullSequence(0, 1))); +} + +TEST_F(SubscribeWindowTest, SomeObjectsUnpublishedAtStart) { + SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject, + FullSequence(0, 1), FullSequence(0, 0), + FullSequence(0, 1)); + EXPECT_FALSE(window.OnObjectSent(FullSequence(0, 0))); + EXPECT_TRUE(window.OnObjectSent(FullSequence(0, 1))); +} + TEST_F(SubscribeWindowTest, UpdateStartEnd) { SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject, - start_group_, start_object_, end_group_, end_object_); - EXPECT_TRUE( - window.UpdateStartEnd(FullSequence(start_group_, start_object_ + 1), - FullSequence(end_group_, end_object_ - 1))); - EXPECT_FALSE(window.InWindow(FullSequence(start_group_, start_object_))); - EXPECT_FALSE(window.InWindow(FullSequence(end_group_, end_object_))); + right_edge_, start_, end_); + EXPECT_TRUE(window.UpdateStartEnd(start_.next(), + FullSequence(end_.group, end_.object - 1))); + EXPECT_FALSE(window.InWindow(FullSequence(start_.group, start_.object))); + EXPECT_FALSE(window.InWindow(FullSequence(end_.group, end_.object))); EXPECT_FALSE( - window.UpdateStartEnd(FullSequence(start_group_, start_object_), - FullSequence(end_group_, end_object_ - 1))); - EXPECT_FALSE( - window.UpdateStartEnd(FullSequence(start_group_, start_object_ + 1), - FullSequence(end_group_, end_object_))); + window.UpdateStartEnd(start_, FullSequence(end_.group, end_.object - 1))); + EXPECT_FALSE(window.UpdateStartEnd(start_.next(), end_)); } TEST_F(SubscribeWindowTest, UpdateStartEndOpenEnded) { SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject, - start_group_, start_object_); - EXPECT_TRUE(window.UpdateStartEnd(FullSequence(start_group_, start_object_), - FullSequence(end_group_, end_object_))); - EXPECT_FALSE(window.InWindow(FullSequence(end_group_, end_object_ + 1))); - EXPECT_FALSE(window.UpdateStartEnd(FullSequence(start_group_, start_object_), - std::nullopt)); + right_edge_, start_, std::nullopt); + EXPECT_TRUE(window.UpdateStartEnd(start_, end_)); + EXPECT_FALSE(window.InWindow(end_.next())); + EXPECT_FALSE(window.UpdateStartEnd(start_, std::nullopt)); } class QUICHE_EXPORT MoqtSubscribeWindowsTest : public quic::test::QuicTest { @@ -129,16 +146,16 @@ TEST_F(MoqtSubscribeWindowsTest, IsEmpty) { EXPECT_TRUE(windows_.IsEmpty()); - windows_.AddWindow(0, 1, 3); + windows_.AddWindow(0, FullSequence(2, 1), 1, 3); EXPECT_FALSE(windows_.IsEmpty()); } TEST_F(MoqtSubscribeWindowsTest, IsSubscribed) { EXPECT_TRUE(windows_.IsEmpty()); // The first two windows overlap; the third is open-ended. - windows_.AddWindow(0, 1, 0, 3, 9); - windows_.AddWindow(1, 2, 4, 4, 3); - windows_.AddWindow(2, 10, 0); + windows_.AddWindow(0, FullSequence(0, 0), 1, 0, 3, 9); + windows_.AddWindow(1, FullSequence(0, 0), 2, 4, 4, 3); + windows_.AddWindow(2, FullSequence(0, 0), 10, 0); EXPECT_FALSE(windows_.IsEmpty()); EXPECT_TRUE(windows_.SequenceIsSubscribed(FullSequence(0, 8)).empty()); auto hits = windows_.SequenceIsSubscribed(FullSequence(1, 0)); @@ -155,7 +172,7 @@ } TEST_F(MoqtSubscribeWindowsTest, AddGetRemoveWindow) { - windows_.AddWindow(0, 1, 0, 3, 9); + windows_.AddWindow(0, FullSequence(2, 5), 1, 0, 3, 9); SubscribeWindow* window = windows_.GetWindow(0); EXPECT_EQ(window->subscribe_id(), 0); EXPECT_EQ(windows_.GetWindow(1), nullptr);
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h index 5f2957a..eaa20c6 100644 --- a/quiche/quic/moqt/moqt_track.h +++ b/quiche/quic/moqt/moqt_track.h
@@ -68,14 +68,14 @@ void AddWindow(uint64_t subscribe_id, uint64_t start_group, uint64_t start_object) { - windows_.AddWindow(subscribe_id, start_group, start_object); + windows_.AddWindow(subscribe_id, next_sequence_, start_group, start_object); } void AddWindow(uint64_t subscribe_id, uint64_t start_group, uint64_t start_object, uint64_t end_group, uint64_t end_object) { - windows_.AddWindow(subscribe_id, start_group, start_object, end_group, - end_object); + windows_.AddWindow(subscribe_id, next_sequence_, start_group, start_object, + end_group, end_object); } void DeleteWindow(uint64_t subscribe_id) {