// blob: e2c341fa0c3b736eb61e8ec7f51dbd254e182f59 [file] [log] [blame]
// Copyright (c) 2024 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef QUICHE_QUIC_MOQT_TOOLS_CHAT_CLIENT_H
#define QUICHE_QUIC_MOQT_TOOLS_CHAT_CLIENT_H
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include "absl/container/flat_hash_map.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/core/io/quic_event_loop.h"
#include "quiche/quic/core/quic_server_id.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/moqt/moqt_known_track_publisher.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_outgoing_queue.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/quic/moqt/tools/moq_chat.h"
#include "quiche/quic/moqt/tools/moqt_client.h"
#include "quiche/common/platform/api/quiche_export.h"
#include "quiche/common/quiche_callbacks.h"
namespace moqt {
// Length of a single pass of the chat event loop (500 ms). This is the value
// ChatClient::RunEventLoop() hands to QuicEventLoop::RunEventLoopOnce().
//
// Declared `inline` so that every translation unit including this header
// refers to one shared definition. A plain namespace-scope `constexpr` has
// internal linkage, which makes its use from inline functions defined in this
// header an ODR hazard.
inline constexpr quic::QuicTime::Delta kChatEventLoopDuration =
    quic::QuicTime::Delta::FromMilliseconds(500);
// Front end of a chat client. A ChatClient is handed a ChatUserInterface,
// which decides how the local user's input is captured and how messages from
// peers are displayed.
class ChatUserInterface {
 public:
  virtual ~ChatUserInterface() = default;

  // Prepares the interface for use; it cannot be used before this is called.
  // Initialization is kept out of the constructor because the constructor
  // might create the event loop.
  //   |callback|   - what the interface calls when there is user input.
  //   |event_loop| - the event loop the interface should use.
  virtual void Initialize(
      quiche::MultiUseCallback<void(absl::string_view)> callback,
      quic::QuicEventLoop* event_loop) = 0;

  // Emits a message received from a peer, attributed to |user|, to the user
  // output.
  virtual void WriteToOutput(absl::string_view user,
                             absl::string_view message) = 0;

  // Runs the event loop for a short interval, then returns.
  virtual void IoLoop() = 0;
};
class ChatClient {
public:
// If |event_loop| is nullptr, a new one will be created. If multiple
// endpoints are running on the same thread, as in tests, they should share
// an event loop.
ChatClient(const quic::QuicServerId& server_id, bool ignore_certificate,
std::unique_ptr<ChatUserInterface> interface,
quic::QuicEventLoop* event_loop = nullptr);
~ChatClient() {
if (session_ != nullptr) {
session_->Close();
session_ = nullptr;
}
}
// Establish the MoQT session. Returns false if it fails.
bool Connect(absl::string_view path, absl::string_view username,
absl::string_view chat_id);
void OnTerminalLineInput(absl::string_view input_message);
// Run the event loop until an input or output event is ready, or the
// session closes.
void IoLoop() {
while (session_is_open_) {
interface_->IoLoop();
}
}
void WriteToOutput(absl::string_view user, absl::string_view message) {
if (interface_ != nullptr) {
interface_->WriteToOutput(user, message);
}
}
quic::QuicEventLoop* event_loop() { return event_loop_; }
class QUICHE_EXPORT RemoteTrackVisitor
: public moqt::SubscribeRemoteTrack::Visitor {
public:
RemoteTrackVisitor(ChatClient* client) : client_(client) {}
void OnReply(const moqt::FullTrackName& full_track_name,
std::optional<FullSequence> largest_id,
std::optional<absl::string_view> reason_phrase) override;
void OnCanAckObjects(MoqtObjectAckFunction) override {}
void OnObjectFragment(const moqt::FullTrackName& full_track_name,
FullSequence sequence,
moqt::MoqtPriority publisher_priority,
moqt::MoqtObjectStatus status,
absl::string_view object,
bool end_of_message) override;
private:
ChatClient* client_;
};
// Returns false on error.
bool AnnounceAndSubscribe();
bool session_is_open() const { return session_is_open_; }
// Returns true if the client is still doing initial sync: retrieving the
// catalog, subscribing to all the users in it, and waiting for the server
// to subscribe to the local track.
bool is_syncing() const {
return !catalog_group_.has_value() || subscribes_to_make_ > 0 ||
(queue_ == nullptr || !queue_->HasSubscribers());
}
private:
void RunEventLoop() { event_loop_->RunEventLoopOnce(kChatEventLoopDuration); }
// Objects from the same catalog group arrive on the same stream, and in
// object sequence order.
void ProcessCatalog(absl::string_view object,
moqt::SubscribeRemoteTrack::Visitor* visitor,
uint64_t group_sequence, uint64_t object_sequence);
struct ChatUser {
moqt::FullTrackName full_track_name;
uint64_t from_group;
ChatUser(const moqt::FullTrackName& ftn, uint64_t group)
: full_track_name(ftn), from_group(group) {}
};
// Basic session information
std::string username_;
std::optional<moqt::MoqChatStrings> chat_strings_;
// General state variables
// The event loop to use for this client.
quic::QuicEventLoop* event_loop_;
// If the client created its own event loop, it will own it.
std::unique_ptr<quic::QuicEventLoop> local_event_loop_;
bool connect_failed_ = false;
bool session_is_open_ = false;
moqt::MoqtSession* session_ = nullptr;
moqt::MoqtKnownTrackPublisher publisher_;
std::unique_ptr<moqt::MoqtClient> client_;
moqt::MoqtSessionCallbacks session_callbacks_;
// Related to syncing.
std::optional<uint64_t> catalog_group_;
absl::flat_hash_map<std::string, ChatUser> other_users_;
int subscribes_to_make_ = 1;
// Related to subscriptions/announces
// TODO: One for each subscribe
std::unique_ptr<RemoteTrackVisitor> remote_track_visitor_;
// Handling outgoing messages
std::shared_ptr<moqt::MoqtOutgoingQueue> queue_;
// User interface for input and output.
std::unique_ptr<ChatUserInterface> interface_;
};
} // namespace moqt
#endif // QUICHE_QUIC_MOQT_TOOLS_CHAT_CLIENT_H