blob: 7e5fb7faa0b6e59207c6d995750c2e666d5650a0 [file] [log] [blame]
// Copyright 2023 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef QUICHE_QUIC_MOQT_MOQT_SESSION_H_
#define QUICHE_QUIC_MOQT_MOQT_SESSION_H_
#include <cstdint>
#include <optional>
#include <string>
#include <utility>
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/moqt/moqt_framer.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_parser.h"
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/common/platform/api/quiche_export.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_callbacks.h"
#include "quiche/common/simple_buffer_allocator.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
namespace test {
class MoqtSessionPeer;
}
using MoqtSessionEstablishedCallback = quiche::SingleUseCallback<void()>;
using MoqtSessionTerminatedCallback =
quiche::SingleUseCallback<void(absl::string_view error_message)>;
using MoqtSessionDeletedCallback = quiche::SingleUseCallback<void()>;
// If |error_message| is nullopt, the ANNOUNCE was successful.
using MoqtOutgoingAnnounceCallback = quiche::SingleUseCallback<void(
absl::string_view track_namespace,
std::optional<MoqtAnnounceErrorReason> error)>;
using MoqtIncomingAnnounceCallback =
quiche::MultiUseCallback<std::optional<MoqtAnnounceErrorReason>(
absl::string_view track_namespace)>;
inline std::optional<MoqtAnnounceErrorReason> DefaultIncomingAnnounceCallback(
absl::string_view /*track_namespace*/) {
return std::optional(MoqtAnnounceErrorReason{
MoqtAnnounceErrorCode::kAnnounceNotSupported,
"This endpoint does not accept incoming ANNOUNCE messages"});
};
// Callbacks for session-level events.
struct MoqtSessionCallbacks {
MoqtSessionEstablishedCallback session_established_callback = +[] {};
MoqtSessionTerminatedCallback session_terminated_callback =
+[](absl::string_view) {};
MoqtSessionDeletedCallback session_deleted_callback = +[] {};
MoqtIncomingAnnounceCallback incoming_announce_callback =
DefaultIncomingAnnounceCallback;
};
class QUICHE_EXPORT MoqtSession : public webtransport::SessionVisitor {
public:
MoqtSession(webtransport::Session* session, MoqtSessionParameters parameters,
MoqtSessionCallbacks callbacks = MoqtSessionCallbacks())
: session_(session),
parameters_(parameters),
callbacks_(std::move(callbacks)),
framer_(quiche::SimpleBufferAllocator::Get(),
parameters.using_webtrans) {}
~MoqtSession() { std::move(callbacks_.session_deleted_callback)(); }
// webtransport::SessionVisitor implementation.
void OnSessionReady() override;
void OnSessionClosed(webtransport::SessionErrorCode,
const std::string&) override;
void OnIncomingBidirectionalStreamAvailable() override;
void OnIncomingUnidirectionalStreamAvailable() override;
void OnDatagramReceived(absl::string_view datagram) override;
void OnCanCreateNewOutgoingBidirectionalStream() override {}
void OnCanCreateNewOutgoingUnidirectionalStream() override {}
void Error(MoqtError code, absl::string_view error);
quic::Perspective perspective() const { return parameters_.perspective; }
// Add to the list of tracks that can be subscribed to. Call this before
// Announce() so that subscriptions can be processed correctly. If |visitor|
// is nullptr, then incoming SUBSCRIBE for objects in the path will receive
// SUBSCRIBE_OK, but never actually get the objects.
void AddLocalTrack(const FullTrackName& full_track_name,
MoqtForwardingPreference forwarding_preference,
LocalTrack::Visitor* visitor);
// Send an ANNOUNCE message for |track_namespace|, and call
// |announce_callback| when the response arrives. Will fail immediately if
// there is already an unresolved ANNOUNCE for that namespace.
void Announce(absl::string_view track_namespace,
MoqtOutgoingAnnounceCallback announce_callback);
bool HasSubscribers(const FullTrackName& full_track_name) const;
// Returns true if SUBSCRIBE was sent. If there is already a subscription to
// the track, the message will still be sent. However, the visitor will be
// ignored.
// Subscribe from (start_group, start_object) to the end of the track.
bool SubscribeAbsolute(absl::string_view track_namespace,
absl::string_view name, uint64_t start_group,
uint64_t start_object, RemoteTrack::Visitor* visitor,
absl::string_view auth_info = "");
// Subscribe from (start_group, start_object) to the end of end_group.
bool SubscribeAbsolute(absl::string_view track_namespace,
absl::string_view name, uint64_t start_group,
uint64_t start_object, uint64_t end_group,
RemoteTrack::Visitor* visitor,
absl::string_view auth_info = "");
// Subscribe from (start_group, start_object) to (end_group, end_object).
bool SubscribeAbsolute(absl::string_view track_namespace,
absl::string_view name, uint64_t start_group,
uint64_t start_object, uint64_t end_group,
uint64_t end_object, RemoteTrack::Visitor* visitor,
absl::string_view auth_info = "");
bool SubscribeCurrentObject(absl::string_view track_namespace,
absl::string_view name,
RemoteTrack::Visitor* visitor,
absl::string_view auth_info = "");
bool SubscribeCurrentGroup(absl::string_view track_namespace,
absl::string_view name,
RemoteTrack::Visitor* visitor,
absl::string_view auth_info = "");
// Returns true if SUBSCRIBE_DONE was sent.
bool SubscribeIsDone(uint64_t subscribe_id, SubscribeDoneCode code,
absl::string_view reason_phrase);
// Returns false if it could not open a stream when necessary, or if the
// track does not exist (there was no call to AddLocalTrack). Will still
// return false is some streams succeed.
// Also returns false if |payload_length| exists but is shorter than
// |payload|.
// |payload.length() >= |payload_length|, because the application can deliver
// partial objects.
bool PublishObject(const FullTrackName& full_track_name, uint64_t group_id,
uint64_t object_id, uint64_t object_send_order,
absl::string_view payload, bool end_of_stream);
void CloseObjectStream(const FullTrackName& full_track_name,
uint64_t group_id);
// TODO: Add an API to send partial objects.
MoqtSessionCallbacks& callbacks() { return callbacks_; }
private:
friend class test::MoqtSessionPeer;
class QUICHE_EXPORT Stream : public webtransport::StreamVisitor,
public MoqtParserVisitor {
public:
Stream(MoqtSession* session, webtransport::Stream* stream)
: session_(session),
stream_(stream),
parser_(session->parameters_.using_webtrans, *this) {}
Stream(MoqtSession* session, webtransport::Stream* stream,
bool is_control_stream)
: session_(session),
stream_(stream),
parser_(session->parameters_.using_webtrans, *this),
is_control_stream_(is_control_stream) {}
// webtransport::StreamVisitor implementation.
void OnCanRead() override;
void OnCanWrite() override;
void OnResetStreamReceived(webtransport::StreamErrorCode error) override;
void OnStopSendingReceived(webtransport::StreamErrorCode error) override;
void OnWriteSideInDataRecvdState() override {}
// MoqtParserVisitor implementation.
// TODO: Handle a stream FIN.
void OnObjectMessage(const MoqtObject& message, absl::string_view payload,
bool end_of_message) override;
void OnClientSetupMessage(const MoqtClientSetup& message) override;
void OnServerSetupMessage(const MoqtServerSetup& message) override;
void OnSubscribeMessage(const MoqtSubscribe& message) override;
void OnSubscribeOkMessage(const MoqtSubscribeOk& message) override;
void OnSubscribeErrorMessage(const MoqtSubscribeError& message) override;
void OnUnsubscribeMessage(const MoqtUnsubscribe& message) override;
void OnSubscribeDoneMessage(const MoqtSubscribeDone& /*message*/) override {
}
void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) override;
void OnAnnounceMessage(const MoqtAnnounce& message) override;
void OnAnnounceOkMessage(const MoqtAnnounceOk& message) override;
void OnAnnounceErrorMessage(const MoqtAnnounceError& message) override;
void OnAnnounceCancelMessage(const MoqtAnnounceCancel& message) override {};
void OnTrackStatusRequestMessage(
const MoqtTrackStatusRequest& message) override {};
void OnUnannounceMessage(const MoqtUnannounce& /*message*/) override {}
void OnTrackStatusMessage(const MoqtTrackStatus& message) override {}
void OnGoAwayMessage(const MoqtGoAway& /*message*/) override {}
void OnParsingError(MoqtError error_code,
absl::string_view reason) override;
quic::Perspective perspective() const {
return session_->parameters_.perspective;
}
webtransport::Stream* stream() const { return stream_; }
// Sends a control message, or buffers it if there is insufficient flow
// control credit.
void SendOrBufferMessage(quiche::QuicheBuffer message, bool fin = false);
private:
friend class test::MoqtSessionPeer;
void SendSubscribeError(const MoqtSubscribe& message,
SubscribeErrorCode error_code,
absl::string_view reason_phrase,
uint64_t track_alias);
bool CheckIfIsControlStream();
MoqtSession* session_;
webtransport::Stream* stream_;
MoqtParser parser_;
// nullopt means "incoming stream, and we don't know if it's the control
// stream or a data stream yet".
std::optional<bool> is_control_stream_;
std::string partial_object_;
};
// Returns the pointer to the control stream, or nullptr if none is present.
Stream* GetControlStream();
// Sends a message on the control stream; QUICHE_DCHECKs if no control stream
// is present.
void SendControlMessage(quiche::QuicheBuffer message);
// Returns false if the SUBSCRIBE isn't sent.
bool Subscribe(MoqtSubscribe& message, RemoteTrack::Visitor* visitor);
// Returns the stream ID if successful, nullopt if not.
// TODO: Add a callback if stream creation is delayed.
std::optional<webtransport::StreamId> OpenUnidirectionalStream();
// Get FullTrackName and visitor for a subscribe_id and track_alias. Returns
// nullptr if not present.
std::pair<FullTrackName, RemoteTrack::Visitor*> TrackPropertiesFromAlias(
const MoqtObject& message);
webtransport::Session* session_;
MoqtSessionParameters parameters_;
MoqtSessionCallbacks callbacks_;
MoqtFramer framer_;
std::optional<webtransport::StreamId> control_stream_;
std::string error_;
// All the tracks the session is subscribed to, indexed by track_alias.
// Multiple subscribes to the same track are recorded in a single
// subscription.
absl::flat_hash_map<uint64_t, RemoteTrack> remote_tracks_;
// Look up aliases for remote tracks by name
absl::flat_hash_map<FullTrackName, uint64_t> remote_track_aliases_;
uint64_t next_remote_track_alias_ = 0;
// All the tracks the peer can subscribe to.
absl::flat_hash_map<FullTrackName, LocalTrack> local_tracks_;
absl::flat_hash_map<uint64_t, FullTrackName> local_track_by_subscribe_id_;
// This is only used to check for track_alias collisions.
absl::flat_hash_set<uint64_t> used_track_aliases_;
uint64_t next_local_track_alias_ = 0;
// Indexed by subscribe_id.
struct ActiveSubscribe {
MoqtSubscribe message;
RemoteTrack::Visitor* visitor;
// The forwarding preference of the first received object, which all
// subsequent objects must match.
std::optional<MoqtForwardingPreference> forwarding_preference;
// If true, an object has arrived for the subscription before SUBSCRIBE_OK
// arrived.
bool received_object = false;
};
// Outgoing SUBSCRIBEs that have not received SUBSCRIBE_OK or SUBSCRIBE_ERROR.
absl::flat_hash_map<uint64_t, ActiveSubscribe> active_subscribes_;
uint64_t next_subscribe_id_ = 0;
// Indexed by track namespace.
absl::flat_hash_map<std::string, MoqtOutgoingAnnounceCallback>
pending_outgoing_announces_;
// The role the peer advertised in its SETUP message. Initialize it to avoid
// an uninitialized value if no SETUP arrives or it arrives with no Role
// parameter, and other checks have changed/been disabled.
MoqtRole peer_role_ = MoqtRole::kPubSub;
};
} // namespace moqt
#endif // QUICHE_QUIC_MOQT_MOQT_SESSION_H_