Process incoming Unsubscribes.

Not in production.

PiperOrigin-RevId: 606667410
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 0c425a1..11feb9e 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -643,6 +643,13 @@
   session_->active_subscribes_.erase(it);
 }
 
+void MoqtSession::Stream::OnUnsubscribeMessage(const MoqtUnsubscribe& message) {
+  // Search all the tracks to find the subscribe ID.
+  for (auto& [name, track] : session_->local_tracks_) {
+    track.DeleteWindow(message.subscribe_id);
+  }
+}
+
 void MoqtSession::Stream::OnAnnounceMessage(const MoqtAnnounce& message) {
   if (!CheckIfIsControlStream()) {
     return;
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 9b77a11..911849a 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -157,7 +157,7 @@
     void OnSubscribeMessage(const MoqtSubscribe& message) override;
     void OnSubscribeOkMessage(const MoqtSubscribeOk& message) override;
     void OnSubscribeErrorMessage(const MoqtSubscribeError& message) override;
-    void OnUnsubscribeMessage(const MoqtUnsubscribe& /*message*/) override {}
+    void OnUnsubscribeMessage(const MoqtUnsubscribe& message) override;
     void OnSubscribeFinMessage(const MoqtSubscribeFin& /*message*/) override {}
     void OnSubscribeRstMessage(const MoqtSubscribeRst& /*message*/) override {}
     void OnAnnounceMessage(const MoqtAnnounce& message) override;
@@ -231,6 +231,7 @@
     MoqtSubscribe message;
     RemoteTrack::Visitor* visitor;
   };
+  // Outgoing SUBSCRIBEs that have not received SUBSCRIBE_OK or SUBSCRIBE_ERROR.
   absl::flat_hash_map<uint64_t, ActiveSubscribe> active_subscribes_;
   uint64_t next_subscribe_id_ = 0;
 
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 1f4f9c0..a865118 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -871,6 +871,22 @@
   EXPECT_TRUE(reported_error);
 }
 
+TEST_F(MoqtSessionTest, ReceiveUnsubscribe) {
+  FullTrackName ftn("foo", "bar");
+  MockLocalTrackVisitor visitor;
+  session_.AddLocalTrack(ftn, &visitor);
+  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 1, 3, 4);
+  EXPECT_TRUE(session_.HasSubscribers(ftn));
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  std::unique_ptr<MoqtParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+  MoqtUnsubscribe unsubscribe = {
+      /*subscribe_id=*/0,
+  };
+  stream_input->OnUnsubscribeMessage(unsubscribe);
+  EXPECT_FALSE(session_.HasSubscribers(ftn));
+}
+
 // TODO: Cover more error cases in the above
 
 }  // namespace test
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.cc b/quiche/quic/moqt/moqt_subscribe_windows.cc
index 335d2d8..9bea570 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows.cc
@@ -96,9 +96,9 @@
 std::vector<SubscribeWindow*> MoqtSubscribeWindows::SequenceIsSubscribed(
     FullSequence sequence) {
   std::vector<SubscribeWindow*> retval;
-  for (auto it = windows.begin(); it != windows.end(); it++) {
-    if (it->InWindow(sequence)) {
-      retval.push_back(&(*it));
+  for (auto& [subscribe_id, window] : windows_) {
+    if (window.InWindow(sequence)) {
+      retval.push_back(&(window));
     }
   }
   return retval;
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h
index f8def02..e6ec05d 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.h
+++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -6,11 +6,10 @@
 #define QUICHE_QUIC_MOQT_SUBSCRIBE_WINDOWS_H
 
 #include <cstdint>
-#include <list>
 #include <optional>
 #include <vector>
 
-#include "absl/container/flat_hash_map.h"
+#include "absl/container/node_hash_map.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/common/platform/api/quiche_export.h"
 #include "quiche/web_transport/web_transport.h"
@@ -77,12 +76,24 @@
 
   // |window| has already been converted into absolute sequence numbers. An
   // optimization could consolidate overlapping subscribe windows.
-  void AddWindow(SubscribeWindow window) { windows.push_front(window); }
+  void AddWindow(SubscribeWindow window) {
+    windows_.emplace(window.subscribe_id(), window);
+  }
+  void RemoveWindow(uint64_t subscribe_id) { windows_.erase(subscribe_id); }
 
-  bool IsEmpty() const { return windows.empty(); }
+  bool IsEmpty() const { return windows_.empty(); }
+
+  SubscribeWindow* GetWindow(uint64_t subscribe_id) {
+    auto it = windows_.find(subscribe_id);
+    if (it == windows_.end()) {
+      return nullptr;
+    }
+    return &it->second;
+  }
 
  private:
-  std::list<SubscribeWindow> windows;
+  // Indexed by Subscribe ID.
+  absl::node_hash_map<uint64_t, SubscribeWindow> windows_;
 };
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_subscribe_windows_test.cc b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
index 02e7007..8387a7e 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows_test.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
@@ -166,6 +166,15 @@
   EXPECT_EQ(hits[1]->subscribe_id(), 0);
 }
 
+TEST_F(MoqtSubscribeWindowsTest, AddGetRemoveWindow) {
+  windows_.AddWindow(SubscribeWindow(0, 1, 0, 3, 9));
+  SubscribeWindow* window = windows_.GetWindow(0);
+  EXPECT_EQ(window->subscribe_id(), 0);
+  EXPECT_EQ(windows_.GetWindow(1), nullptr);
+  windows_.RemoveWindow(0);
+  EXPECT_EQ(windows_.GetWindow(0), nullptr);
+}
+
 }  // namespace test
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index 4d04afc..e52b084 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -56,6 +56,9 @@
   }
 
   void AddWindow(SubscribeWindow window) { windows_.AddWindow(window); }
+  void DeleteWindow(uint64_t subscribe_id) {
+    windows_.RemoveWindow(subscribe_id);
+  }
 
   // Returns the largest observed sequence, but increments the object sequence
   // by one.
@@ -70,6 +73,10 @@
 
   bool HasSubscriber() const { return !windows_.IsEmpty(); }
 
+  SubscribeWindow* GetWindow(uint64_t subscribe_id) {
+    return windows_.GetWindow(subscribe_id);
+  }
+
  private:
   // This only needs to track subscriptions to current and future objects;
   // requests for objects in the past are forwarded to the application.
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index 4d85b6b..46fd3c5 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -42,6 +42,14 @@
   EXPECT_EQ(track_.track_alias(), 6);
 }
 
+TEST_F(LocalTrackTest, AddGetDeleteWindow) {
+  track_.AddWindow(SubscribeWindow(0, 4, 1));
+  EXPECT_EQ(track_.GetWindow(0)->subscribe_id(), 0);
+  EXPECT_EQ(track_.GetWindow(1), nullptr);
+  track_.DeleteWindow(0);
+  EXPECT_EQ(track_.GetWindow(0), nullptr);
+}
+
 TEST_F(LocalTrackTest, ShouldSend) {
   track_.AddWindow(SubscribeWindow(0, 4, 1));
   EXPECT_TRUE(track_.HasSubscriber());