| // Copyright 2025 The Chromium Authors. All rights reserved. | 
 | // Use of this source code is governed by a BSD-style license that can be | 
 | // found in the LICENSE file. | 
 |  | 
 | #ifndef QUICHE_QUIC_MOQT_RELAY_NAMESPACE_TREE_H_ | 
 | #define QUICHE_QUIC_MOQT_RELAY_NAMESPACE_TREE_H_ | 
 |  | 
 | #include <cstdint> | 
 | #include <memory> | 
 | #include <optional> | 
 | #include <string> | 
 |  | 
 | #include "absl/base/nullability.h" | 
 | #include "absl/container/flat_hash_map.h" | 
 | #include "absl/container/flat_hash_set.h" | 
 | #include "absl/strings/string_view.h" | 
 | #include "quiche/quic/moqt/moqt_messages.h" | 
 | #include "quiche/quic/moqt/moqt_session_interface.h" | 
 | #include "quiche/common/quiche_weak_ptr.h" | 
 |  | 
 | namespace moqt { | 
 |  | 
 | // A data structure for all namespaces an MOQT relay is aware of. | 
 | // For any given namespace, it stores all publishers, subscribers, and published | 
 | // tracks in that namespace. | 
 | // A subscriber must be notified of any publish in a child namespace, and a | 
 | // new PUBLISH(_NAMESPACE) has to find subscribers to parent namespaces. | 
 | // Therefore, this is a tree structure to easily and scalably move up and down | 
 | // the hierarchy to find parents or children. | 
 | class RelayNamespaceTree { | 
 |  public: | 
 |   // Adds a publisher to the namespace tree. The caller is responsible to call | 
 |   // RemovePublisher if it goes away. |session| is stored as a WeakPtr. | 
 |   void AddPublisher(const TrackNamespace& track_namespace, | 
 |                     MoqtSessionInterface* absl_nonnull session) { | 
 |     Node* node = FindOrCreateNode(track_namespace); | 
 |     if (node->publishers.empty()) { | 
 |       NotifyAllParents(track_namespace, /*adding=*/true); | 
 |     } | 
 |     node->publishers.emplace(session->GetWeakPtr()); | 
 |   } | 
 |  | 
 |   void RemovePublisher(const TrackNamespace& track_namespace, | 
 |                        MoqtSessionInterface* absl_nonnull session) { | 
 |     Node* node = FindNode(track_namespace); | 
 |     if (node == nullptr) { | 
 |       return; | 
 |     } | 
 |     node->publishers.erase(session->GetWeakPtr()); | 
 |     // Tell all the namespace listeners. | 
 |     if (node->publishers.empty()) { | 
 |       TrackNamespace mutable_namespace = track_namespace; | 
 |       NotifyAllParents(track_namespace, /*adding=*/false); | 
 |       MaybePrune(node, mutable_namespace); | 
 |     } | 
 |   } | 
 |  | 
 |   // The caller is responsible to call RemoveNamespaceListener if it goes away. | 
 |   // Thus, it is safe to store it as a raw pointer. | 
 |   void AddSubscriber(const TrackNamespace& track_namespace, | 
 |                      MoqtSessionInterface* absl_nonnull subscriber) { | 
 |     Node* node = FindOrCreateNode(track_namespace); | 
 |     node->subscribers.insert(subscriber->GetWeakPtr()); | 
 |     // Notify the listener of every published namespace and track in this | 
 |     // namespace. | 
 |     TrackNamespace mutable_namespace = track_namespace; | 
 |     NotifyOfAllChildren(node, mutable_namespace, subscriber); | 
 |   } | 
 |  | 
 |   void RemoveSubscriber(const TrackNamespace& track_namespace, | 
 |                         MoqtSessionInterface* absl_nonnull subscriber) { | 
 |     Node* node = FindNode(track_namespace); | 
 |     if (node == nullptr) { | 
 |       return; | 
 |     } | 
 |     node->subscribers.erase(subscriber->GetWeakPtr()); | 
 |     TrackNamespace mutable_namespace = track_namespace; | 
 |     MaybePrune(node, mutable_namespace); | 
 |   } | 
 |  | 
 |   // Returns a raw pointer to the session that publishes the smallest namespace | 
 |   // that contains |track_namespace|. If a WeakPtr is found to be invalid, | 
 |   // deletes them from the tree. | 
 |   MoqtSessionInterface* GetValidPublisher( | 
 |       const TrackNamespace& track_namespace) const { | 
 |     Node* node = FindNode(track_namespace); | 
 |     TrackNamespace mutable_namespace = track_namespace; | 
 |     while ((node == nullptr || node->publishers.empty()) && | 
 |            mutable_namespace.PopElement()) { | 
 |       node = FindNode(mutable_namespace); | 
 |     } | 
 |     if (node == nullptr || node->publishers.empty()) { | 
 |       return nullptr; | 
 |     } | 
 |     MoqtSessionInterface* upstream = node->publishers.begin()->GetIfAvailable(); | 
 |     if (!upstream) { | 
 |       QUICHE_BUG(publisher_is_invalid) | 
 |           << "Publisher WeakPtr is invalid but not removed from the set"; | 
 |       return nullptr; | 
 |     } | 
 |     return upstream; | 
 |   } | 
 |  | 
 |  protected: | 
 |   uint64_t NumNamespaces() const { return nodes_.size(); } | 
 |  | 
 |  private: | 
 |   struct Node { | 
 |     explicit Node(absl::string_view element) : element(element) {} | 
 |  | 
 |     const std::string element; | 
 |     absl::flat_hash_set<Node*> children; | 
 |  | 
 |     // Publishers of this namespace. | 
 |     absl::flat_hash_set<quiche::QuicheWeakPtr<MoqtSessionInterface>> publishers; | 
 |     // Just store the track name. Additional information will be in the | 
 |     // TrackPublisher. | 
 |     absl::flat_hash_set<std::string> published_tracks; | 
 |     absl::flat_hash_set<quiche::QuicheWeakPtr<MoqtSessionInterface>> | 
 |         subscribers; | 
 |     bool CanPrune() const { | 
 |       return children.empty() && publishers.empty() && | 
 |              published_tracks.empty() && subscribers.empty(); | 
 |     } | 
 |   }; | 
 |  | 
 |   Node* FindNode(const TrackNamespace& track_namespace) const { | 
 |     auto it = nodes_.find(track_namespace); | 
 |     if (it == nodes_.end()) { | 
 |       return nullptr; | 
 |     } | 
 |     return it->second.get(); | 
 |   } | 
 |  | 
 |   Node* FindOrCreateNode(const TrackNamespace& track_namespace) { | 
 |     auto [it, inserted] = | 
 |         nodes_.emplace(track_namespace, | 
 |                        std::make_unique<Node>(track_namespace.tuple().back())); | 
 |     if (!inserted) { | 
 |       return it->second.get(); | 
 |     } | 
 |     Node* node = it->second.get();  // store it in case it moves. | 
 |     TrackNamespace mutable_namespace = track_namespace; | 
 |     if (mutable_namespace.PopElement()) { | 
 |       Node* parent = FindOrCreateNode(mutable_namespace); | 
 |       parent->children.insert(node); | 
 |     } | 
 |     return node; | 
 |   } | 
 |  | 
 |   // Recursive function to notify |listener| of all published namespaces and | 
 |   // tracks in and below |node|. | 
 |   void NotifyOfAllChildren(Node* node, TrackNamespace& track_namespace, | 
 |                            MoqtSessionInterface* subscriber) { | 
 |     // TODO(martinduke): Publish everything in node->published_tracks. | 
 |     if (!node->publishers.empty()) { | 
 |       subscriber->PublishNamespace( | 
 |           track_namespace, | 
 |           [](const TrackNamespace&, std::optional<MoqtRequestError>) {}, | 
 |           // TODO(martinduke): Add parameters. | 
 |           VersionSpecificParameters()); | 
 |     } | 
 |     for (auto child = node->children.begin(); child != node->children.end(); | 
 |          ++child) { | 
 |       track_namespace.AddElement((*child)->element); | 
 |       NotifyOfAllChildren(*child, track_namespace, subscriber); | 
 |       track_namespace.PopElement(); | 
 |     } | 
 |   } | 
 |  | 
 |   // If |adding| is true, sends PUBLISH_NAMESPACE to all subscribers to a | 
 |   // parent namespace. If |adding| is false, sends PUBLISH_NAMESPACE_DONE. | 
 |   void NotifyAllParents(const TrackNamespace& track_namespace, bool adding) { | 
 |     TrackNamespace mutable_namespace = track_namespace; | 
 |     do { | 
 |       Node* node = FindNode(mutable_namespace); | 
 |       if (node == nullptr) { | 
 |         continue; | 
 |       } | 
 |       for (const quiche::QuicheWeakPtr<MoqtSessionInterface>& subscriber_ptr : | 
 |            node->subscribers) { | 
 |         MoqtSessionInterface* subscriber = subscriber_ptr.GetIfAvailable(); | 
 |         if (subscriber == nullptr) { | 
 |           QUICHE_BUG(subscriber_is_invalid) | 
 |               << "Subscriber WeakPtr is invalid but not removed from the set"; | 
 |           continue; | 
 |         } | 
 |         if (adding) { | 
 |           subscriber->PublishNamespace( | 
 |               track_namespace, | 
 |               [](const TrackNamespace&, std::optional<MoqtRequestError>) {}, | 
 |               // TODO(martinduke): Add parameters. | 
 |               VersionSpecificParameters()); | 
 |         } else { | 
 |           subscriber->PublishNamespaceDone(track_namespace); | 
 |         } | 
 |       } | 
 |     } while (mutable_namespace.PopElement()); | 
 |   } | 
 |  | 
 |   // If a node has no children, publishers, or subscribers, remove it and see | 
 |   // if the same applies to its parent. | 
 |   void MaybePrune(Node* node, TrackNamespace& track_namespace) { | 
 |     if (node == nullptr || !node->CanPrune()) { | 
 |       return; | 
 |     } | 
 |     Node* child = node;  // Save the pointer before erasing. | 
 |     nodes_.erase(track_namespace); | 
 |     // child is now gone, do not dereference! | 
 |     if (track_namespace.PopElement()) { | 
 |       Node* parent = FindNode(track_namespace); | 
 |       QUICHE_BUG_IF(quiche_bug_no_parent_namespace, parent == nullptr) | 
 |           << "Parent namespace not found for " << track_namespace; | 
 |       if (parent != nullptr) { | 
 |         parent->children.erase(child); | 
 |         MaybePrune(parent, track_namespace); | 
 |       } | 
 |     } | 
 |   } | 
 |  | 
 |   // A map that allows quick access to any namespace without traversing the | 
 |   // tree. Use unique_ptr so that it's pointer stable. | 
 |   absl::flat_hash_map<TrackNamespace, std::unique_ptr<Node>> nodes_; | 
 | }; | 
 |  | 
 | }  // namespace moqt | 
 |  | 
 | #endif  // QUICHE_QUIC_MOQT_RELAY_NAMESPACE_TREE_H_ |