Process incoming Unsubscribes.
Not in production.
PiperOrigin-RevId: 606667410
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 0c425a1..11feb9e 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -643,6 +643,13 @@
session_->active_subscribes_.erase(it);
}
+void MoqtSession::Stream::OnUnsubscribeMessage(const MoqtUnsubscribe& message) {
+ // Search all the tracks to find the subscribe ID.
+ for (auto& [name, track] : session_->local_tracks_) {
+ track.DeleteWindow(message.subscribe_id);
+ }
+}
+
void MoqtSession::Stream::OnAnnounceMessage(const MoqtAnnounce& message) {
if (!CheckIfIsControlStream()) {
return;
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 9b77a11..911849a 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -157,7 +157,7 @@
void OnSubscribeMessage(const MoqtSubscribe& message) override;
void OnSubscribeOkMessage(const MoqtSubscribeOk& message) override;
void OnSubscribeErrorMessage(const MoqtSubscribeError& message) override;
- void OnUnsubscribeMessage(const MoqtUnsubscribe& /*message*/) override {}
+ void OnUnsubscribeMessage(const MoqtUnsubscribe& message) override;
void OnSubscribeFinMessage(const MoqtSubscribeFin& /*message*/) override {}
void OnSubscribeRstMessage(const MoqtSubscribeRst& /*message*/) override {}
void OnAnnounceMessage(const MoqtAnnounce& message) override;
@@ -231,6 +231,7 @@
MoqtSubscribe message;
RemoteTrack::Visitor* visitor;
};
+ // Outgoing SUBSCRIBEs that have not received SUBSCRIBE_OK or SUBSCRIBE_ERROR.
absl::flat_hash_map<uint64_t, ActiveSubscribe> active_subscribes_;
uint64_t next_subscribe_id_ = 0;
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 1f4f9c0..a865118 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -871,6 +871,22 @@
EXPECT_TRUE(reported_error);
}
+TEST_F(MoqtSessionTest, ReceiveUnsubscribe) {
+ FullTrackName ftn("foo", "bar");
+ MockLocalTrackVisitor visitor;
+ session_.AddLocalTrack(ftn, &visitor);
+ MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 1, 3, 4);
+ EXPECT_TRUE(session_.HasSubscribers(ftn));
+ StrictMock<webtransport::test::MockStream> mock_stream;
+ std::unique_ptr<MoqtParserVisitor> stream_input =
+ MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+ MoqtUnsubscribe unsubscribe = {
+ /*subscribe_id=*/0,
+ };
+ stream_input->OnUnsubscribeMessage(unsubscribe);
+ EXPECT_FALSE(session_.HasSubscribers(ftn));
+}
+
// TODO: Cover more error cases in the above
} // namespace test
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.cc b/quiche/quic/moqt/moqt_subscribe_windows.cc
index 335d2d8..9bea570 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows.cc
@@ -96,9 +96,9 @@
std::vector<SubscribeWindow*> MoqtSubscribeWindows::SequenceIsSubscribed(
FullSequence sequence) {
std::vector<SubscribeWindow*> retval;
- for (auto it = windows.begin(); it != windows.end(); it++) {
- if (it->InWindow(sequence)) {
- retval.push_back(&(*it));
+ for (auto& [subscribe_id, window] : windows_) {
+ if (window.InWindow(sequence)) {
+ retval.push_back(&(window));
}
}
return retval;
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h
index f8def02..e6ec05d 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.h
+++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -6,11 +6,10 @@
#define QUICHE_QUIC_MOQT_SUBSCRIBE_WINDOWS_H
#include <cstdint>
-#include <list>
#include <optional>
#include <vector>
-#include "absl/container/flat_hash_map.h"
+#include "absl/container/node_hash_map.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/common/platform/api/quiche_export.h"
#include "quiche/web_transport/web_transport.h"
@@ -77,12 +76,24 @@
// |window| has already been converted into absolute sequence numbers. An
// optimization could consolidate overlapping subscribe windows.
- void AddWindow(SubscribeWindow window) { windows.push_front(window); }
+ void AddWindow(SubscribeWindow window) {
+ windows_.emplace(window.subscribe_id(), window);
+ }
+ void RemoveWindow(uint64_t subscribe_id) { windows_.erase(subscribe_id); }
- bool IsEmpty() const { return windows.empty(); }
+ bool IsEmpty() const { return windows_.empty(); }
+
+ SubscribeWindow* GetWindow(uint64_t subscribe_id) {
+ auto it = windows_.find(subscribe_id);
+ if (it == windows_.end()) {
+ return nullptr;
+ }
+ return &it->second;
+ }
private:
- std::list<SubscribeWindow> windows;
+ // Indexed by Subscribe ID.
+ absl::node_hash_map<uint64_t, SubscribeWindow> windows_;
};
} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_subscribe_windows_test.cc b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
index 02e7007..8387a7e 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows_test.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
@@ -166,6 +166,15 @@
EXPECT_EQ(hits[1]->subscribe_id(), 0);
}
+TEST_F(MoqtSubscribeWindowsTest, AddGetRemoveWindow) {
+ windows_.AddWindow(SubscribeWindow(0, 1, 0, 3, 9));
+ SubscribeWindow* window = windows_.GetWindow(0);
+ EXPECT_EQ(window->subscribe_id(), 0);
+ EXPECT_EQ(windows_.GetWindow(1), nullptr);
+ windows_.RemoveWindow(0);
+ EXPECT_EQ(windows_.GetWindow(0), nullptr);
+}
+
} // namespace test
} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index 4d04afc..e52b084 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -56,6 +56,9 @@
}
void AddWindow(SubscribeWindow window) { windows_.AddWindow(window); }
+ void DeleteWindow(uint64_t subscribe_id) {
+ windows_.RemoveWindow(subscribe_id);
+ }
// Returns the largest observed sequence, but increments the object sequence
// by one.
@@ -70,6 +73,10 @@
bool HasSubscriber() const { return !windows_.IsEmpty(); }
+ SubscribeWindow* GetWindow(uint64_t subscribe_id) {
+ return windows_.GetWindow(subscribe_id);
+ }
+
private:
// This only needs to track subscriptions to current and future objects;
// requests for objects in the past are forwarded to the application.
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index 4d85b6b..46fd3c5 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -42,6 +42,14 @@
EXPECT_EQ(track_.track_alias(), 6);
}
+TEST_F(LocalTrackTest, AddGetDeleteWindow) {
+ track_.AddWindow(SubscribeWindow(0, 4, 1));
+ EXPECT_EQ(track_.GetWindow(0)->subscribe_id(), 0);
+ EXPECT_EQ(track_.GetWindow(1), nullptr);
+ track_.DeleteWindow(0);
+ EXPECT_EQ(track_.GetWindow(0), nullptr);
+}
+
TEST_F(LocalTrackTest, ShouldSend) {
track_.AddWindow(SubscribeWindow(0, 4, 1));
EXPECT_TRUE(track_.HasSubscriber());