blob: 919a4d99282e50a46823145bda549e266baac302 [file] [log] [blame]
// Copyright (c) 2026 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef QUICHE_QUIC_MOQT_MOQT_NAMESPACE_STREAM_H_
#define QUICHE_QUIC_MOQT_MOQT_NAMESPACE_STREAM_H_
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include "absl/base/nullability.h"
#include "absl/container/flat_hash_set.h"
#include "quiche/quic/moqt/moqt_bidi_stream.h"
#include "quiche/quic/moqt/moqt_fetch_task.h"
#include "quiche/quic/moqt/moqt_framer.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_names.h"
#include "quiche/quic/moqt/moqt_session_callbacks.h"
#include "quiche/quic/moqt/session_namespace_tree.h"
#include "quiche/common/quiche_circular_deque.h"
#include "quiche/common/quiche_weak_ptr.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
// This class will be owned by the webtransport stream.
class MoqtNamespaceSubscriberStream : public MoqtBidiStreamBase {
public:
// Assumes the caller will send or queue the SUBSCRIBE_NAMESPACE.
MoqtNamespaceSubscriberStream(
MoqtFramer* framer, uint64_t request_id,
BidiStreamDeletedCallback stream_deleted_callback,
SessionErrorCallback session_error_callback,
MoqtResponseCallback response_callback)
: MoqtBidiStreamBase(framer, std::move(stream_deleted_callback),
std::move(session_error_callback)),
request_id_(request_id),
response_callback_(std::move(response_callback)) {}
~MoqtNamespaceSubscriberStream() override;
// MoqtBidiStreamBase overrides.
void set_stream(webtransport::Stream* absl_nonnull stream) override;
void OnRequestOkMessage(const MoqtRequestOk& message) override;
void OnRequestErrorMessage(const MoqtRequestError& message) override;
void OnNamespaceMessage(const MoqtNamespace& message) override;
void OnNamespaceDoneMessage(const MoqtNamespaceDone& message) override;
// Send the prefix now so it is only stored in one place (the task).
std::unique_ptr<MoqtNamespaceTask> CreateTask(const TrackNamespace& prefix,
ObjectsAvailableCallback
absl_nonnull callback);
private:
// The class that will be passed to the application to consume namespace
// information. Owned by the application.
class NamespaceTask : public MoqtNamespaceTask {
public:
NamespaceTask(MoqtNamespaceSubscriberStream* absl_nonnull state,
const TrackNamespace& prefix,
ObjectsAvailableCallback absl_nonnull callback)
: MoqtNamespaceTask(),
prefix_(prefix),
state_(state),
callback_(std::move(callback)),
weak_ptr_factory_(this) {}
~NamespaceTask() override;
// MoqtNamespaceTask methods. A return value of kEof implies
// NAMESPACE_DONE for all outstanding namespaces.
GetNextResult GetNextSuffix(TrackNamespace& suffix,
TransactionType& type) override;
std::optional<webtransport::StreamErrorCode> GetStatus() override {
return error_;
}
const TrackNamespace& prefix() override { return prefix_; }
// Queues a suffix corresponding to a NAMESPACE (if |type| is kAdd) or a
// NAMESPACE_DONE (if |type| is kDelete).
void AddPendingSuffix(TrackNamespace suffix, TransactionType type);
// The stream is closed, so no more NAMESPACE messages are forthcoming.
// This is an implicit NAMESPACE_DONE for all published namespaces.
void DeclareEof();
quiche::QuicheWeakPtr<NamespaceTask> GetWeakPtr() {
return weak_ptr_factory_.Create();
}
private:
struct PendingSuffix {
TrackNamespace suffix;
TransactionType type;
};
static constexpr size_t kMaxPendingSuffixes = 100;
const TrackNamespace prefix_;
// Must be nonnull initially, will be nullptr if the stream is closed.
MoqtNamespaceSubscriberStream* state_;
quiche::QuicheCircularDeque<PendingSuffix> pending_suffixes_;
ObjectsAvailableCallback callback_;
std::optional<webtransport::StreamErrorCode> error_;
bool eof_ = false;
// Must be last.
quiche::QuicheWeakPtrFactory<NamespaceTask> weak_ptr_factory_;
};
const uint64_t request_id_;
MoqtResponseCallback response_callback_;
absl::flat_hash_set<TrackNamespace> published_suffixes_;
quiche::QuicheWeakPtr<NamespaceTask> task_;
};
class MoqtNamespacePublisherStream : public MoqtBidiStreamBase {
public:
// Constructor for the publisher side.
MoqtNamespacePublisherStream(
MoqtFramer* framer, webtransport::Stream* stream,
SessionErrorCallback session_error_callback, SessionNamespaceTree& tree,
MoqtIncomingSubscribeNamespaceCallbackNew& application);
~MoqtNamespacePublisherStream() override;
// MoqtBidiStreamBase overrides.
void OnSubscribeNamespaceMessage(
const MoqtSubscribeNamespace& message) override;
// TODO(martinduke): Implement this.
void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) override {
QUICHE_DLOG(INFO) << "Got SUBSCRIBE_UPDATE on Namespace stream";
}
private:
void ProcessNamespaces();
uint64_t request_id_;
SessionNamespaceTree& tree_;
MoqtIncomingSubscribeNamespaceCallbackNew& application_;
std::unique_ptr<MoqtNamespaceTask> task_;
absl::flat_hash_set<TrackNamespace> published_suffixes_;
};
} // namespace moqt
#endif // QUICHE_QUIC_MOQT_MOQT_NAMESPACE_STREAM_H_