blob: 504741867e259368c2444803794c7ceb48db3b5a [file] [log] [blame] [edit]
// Copyright 2025 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef QUICHE_QUIC_MOQT_RELAY_NAMESPACE_TREE_H_
#define QUICHE_QUIC_MOQT_RELAY_NAMESPACE_TREE_H_
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include "absl/base/nullability.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_session_interface.h"
#include "quiche/common/quiche_weak_ptr.h"
namespace moqt {
// A data structure for all namespaces an MOQT relay is aware of.
// For any given namespace, it stores all publishers, subscribers, and published
// tracks in that namespace.
// A subscriber must be notified of any publish in a child namespace, and a
// new PUBLISH(_NAMESPACE) has to find subscribers to parent namespaces.
// Therefore, this is a tree structure to easily and scalably move up and down
// the hierarchy to find parents or children.
class RelayNamespaceTree {
public:
// Adds a publisher to the namespace tree. The caller is responsible to call
// RemovePublisher if it goes away. |session| is stored as a WeakPtr.
void AddPublisher(const TrackNamespace& track_namespace,
MoqtSessionInterface* absl_nonnull session) {
Node* node = FindOrCreateNode(track_namespace);
if (node->publishers.empty()) {
NotifyAllParents(track_namespace, /*adding=*/true);
}
node->publishers.emplace(session->GetWeakPtr());
}
void RemovePublisher(const TrackNamespace& track_namespace,
MoqtSessionInterface* absl_nonnull session) {
Node* node = FindNode(track_namespace);
if (node == nullptr) {
return;
}
node->publishers.erase(session->GetWeakPtr());
// Tell all the namespace listeners.
if (node->publishers.empty()) {
TrackNamespace mutable_namespace = track_namespace;
NotifyAllParents(track_namespace, /*adding=*/false);
MaybePrune(node, mutable_namespace);
}
}
// The caller is responsible to call RemoveNamespaceListener if it goes away.
// Thus, it is safe to store it as a raw pointer.
void AddSubscriber(const TrackNamespace& track_namespace,
MoqtSessionInterface* absl_nonnull subscriber) {
Node* node = FindOrCreateNode(track_namespace);
node->subscribers.insert(subscriber->GetWeakPtr());
// Notify the listener of every published namespace and track in this
// namespace.
TrackNamespace mutable_namespace = track_namespace;
NotifyOfAllChildren(node, mutable_namespace, subscriber);
}
void RemoveSubscriber(const TrackNamespace& track_namespace,
MoqtSessionInterface* absl_nonnull subscriber) {
Node* node = FindNode(track_namespace);
if (node == nullptr) {
return;
}
node->subscribers.erase(subscriber->GetWeakPtr());
TrackNamespace mutable_namespace = track_namespace;
MaybePrune(node, mutable_namespace);
}
// Returns a raw pointer to the session that publishes the smallest namespace
// that contains |track_namespace|. If a WeakPtr is found to be invalid,
// deletes them from the tree.
MoqtSessionInterface* GetValidPublisher(
const TrackNamespace& track_namespace) const {
Node* node = FindNode(track_namespace);
TrackNamespace mutable_namespace = track_namespace;
while ((node == nullptr || node->publishers.empty()) &&
mutable_namespace.PopElement()) {
node = FindNode(mutable_namespace);
}
if (node == nullptr || node->publishers.empty()) {
return nullptr;
}
MoqtSessionInterface* upstream = node->publishers.begin()->GetIfAvailable();
if (!upstream) {
QUICHE_BUG(publisher_is_invalid)
<< "Publisher WeakPtr is invalid but not removed from the set";
return nullptr;
}
return upstream;
}
protected:
uint64_t NumNamespaces() const { return nodes_.size(); }
private:
struct Node {
explicit Node(absl::string_view element) : element(element) {}
const std::string element;
absl::flat_hash_set<Node*> children;
// Publishers of this namespace.
absl::flat_hash_set<quiche::QuicheWeakPtr<MoqtSessionInterface>> publishers;
// Just store the track name. Additional information will be in the
// TrackPublisher.
absl::flat_hash_set<std::string> published_tracks;
absl::flat_hash_set<quiche::QuicheWeakPtr<MoqtSessionInterface>>
subscribers;
bool CanPrune() const {
return children.empty() && publishers.empty() &&
published_tracks.empty() && subscribers.empty();
}
};
Node* FindNode(const TrackNamespace& track_namespace) const {
auto it = nodes_.find(track_namespace);
if (it == nodes_.end()) {
return nullptr;
}
return it->second.get();
}
Node* FindOrCreateNode(const TrackNamespace& track_namespace) {
auto [it, inserted] =
nodes_.emplace(track_namespace,
std::make_unique<Node>(track_namespace.tuple().back()));
if (!inserted) {
return it->second.get();
}
Node* node = it->second.get(); // store it in case it moves.
TrackNamespace mutable_namespace = track_namespace;
if (mutable_namespace.PopElement()) {
Node* parent = FindOrCreateNode(mutable_namespace);
parent->children.insert(node);
}
return node;
}
// Recursive function to notify |listener| of all published namespaces and
// tracks in and below |node|.
void NotifyOfAllChildren(Node* node, TrackNamespace& track_namespace,
MoqtSessionInterface* subscriber) {
// TODO(martinduke): Publish everything in node->published_tracks.
if (!node->publishers.empty()) {
subscriber->PublishNamespace(
track_namespace,
[](const TrackNamespace&, std::optional<MoqtRequestError>) {},
// TODO(martinduke): Add parameters.
VersionSpecificParameters());
}
for (auto child = node->children.begin(); child != node->children.end();
++child) {
track_namespace.AddElement((*child)->element);
NotifyOfAllChildren(*child, track_namespace, subscriber);
track_namespace.PopElement();
}
}
// If |adding| is true, sends PUBLISH_NAMESPACE to all subscribers to a
// parent namespace. If |adding| is false, sends PUBLISH_NAMESPACE_DONE.
void NotifyAllParents(const TrackNamespace& track_namespace, bool adding) {
TrackNamespace mutable_namespace = track_namespace;
do {
Node* node = FindNode(mutable_namespace);
if (node == nullptr) {
continue;
}
for (const quiche::QuicheWeakPtr<MoqtSessionInterface>& subscriber_ptr :
node->subscribers) {
MoqtSessionInterface* subscriber = subscriber_ptr.GetIfAvailable();
if (subscriber == nullptr) {
QUICHE_BUG(subscriber_is_invalid)
<< "Subscriber WeakPtr is invalid but not removed from the set";
continue;
}
if (adding) {
subscriber->PublishNamespace(
track_namespace,
[](const TrackNamespace&, std::optional<MoqtRequestError>) {},
// TODO(martinduke): Add parameters.
VersionSpecificParameters());
} else {
subscriber->PublishNamespaceDone(track_namespace);
}
}
} while (mutable_namespace.PopElement());
}
// If a node has no children, publishers, or subscribers, remove it and see
// if the same applies to its parent.
void MaybePrune(Node* node, TrackNamespace& track_namespace) {
if (node == nullptr || !node->CanPrune()) {
return;
}
Node* child = node; // Save the pointer before erasing.
nodes_.erase(track_namespace);
// child is now gone, do not dereference!
if (track_namespace.PopElement()) {
Node* parent = FindNode(track_namespace);
QUICHE_BUG_IF(quiche_bug_no_parent_namespace, parent == nullptr)
<< "Parent namespace not found for " << track_namespace;
if (parent != nullptr) {
parent->children.erase(child);
MaybePrune(parent, track_namespace);
}
}
}
// A map that allows quick access to any namespace without traversing the
// tree. Use unique_ptr so that it's pointer stable.
absl::flat_hash_map<TrackNamespace, std::unique_ptr<Node>> nodes_;
};
} // namespace moqt
#endif // QUICHE_QUIC_MOQT_RELAY_NAMESPACE_TREE_H_