blob: f984a7130aeb1ad9aa2e304a0baae2a0f8770819 [file] [log] [blame]
// Copyright (c) 2024 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/moqt/tools/chat_server.h"
#include <cstdint>
#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/core/crypto/proof_source.h"
#include "quiche/quic/moqt/moqt_live_relay_queue.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_outgoing_queue.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/tools/moq_chat.h"
#include "quiche/quic/moqt/tools/moqt_server.h"
#include "quiche/common/platform/api/quiche_mem_slice.h"
#include "quiche/common/quiche_buffer_allocator.h"
namespace moqt {
ChatServer::ChatServerSessionHandler::ChatServerSessionHandler(
MoqtSession* session, ChatServer* server)
: session_(session), server_(server) {
session_->callbacks().incoming_announce_callback =
[&](absl::string_view track_namespace) {
std::cout << "Received ANNOUNCE for " << track_namespace << "\n";
username_ =
server_->strings().GetUsernameFromTrackNamespace(track_namespace);
if (username_->empty()) {
std::cout << "Malformed ANNOUNCE namespace\n";
return std::nullopt;
}
session_->SubscribeCurrentGroup(track_namespace, "",
server_->remote_track_visitor());
server_->AddUser(*username_);
return std::nullopt;
};
// TODO(martinduke): Add a callback for UNANNOUNCE that deletes the user and
// clears username_, but keeps the handler.
session_->callbacks().session_terminated_callback =
[&](absl::string_view error_message) {
std::cout << "Session terminated, reason = " << error_message << "\n";
session_ = nullptr;
server_->DeleteSession(it_);
};
session_->set_publisher(server_->publisher());
}
ChatServer::ChatServerSessionHandler::~ChatServerSessionHandler() {
if (username_.has_value()) {
server_->DeleteUser(*username_);
}
}
ChatServer::RemoteTrackVisitor::RemoteTrackVisitor(ChatServer* server)
: server_(server) {}
void ChatServer::RemoteTrackVisitor::OnReply(
const moqt::FullTrackName& full_track_name,
std::optional<absl::string_view> reason_phrase) {
std::cout << "Subscription to user " << full_track_name.track_namespace
<< " ";
if (reason_phrase.has_value()) {
std::cout << "REJECTED, reason = " << *reason_phrase << "\n";
std::string username =
server_->strings().GetUsernameFromFullTrackName(full_track_name);
if (!username.empty()) {
std::cout << "Rejection was for malformed namespace\n";
return;
}
server_->DeleteUser(username);
} else {
std::cout << "ACCEPTED\n";
}
}
void ChatServer::RemoteTrackVisitor::OnObjectFragment(
const moqt::FullTrackName& full_track_name, uint64_t group_sequence,
uint64_t object_sequence, moqt::MoqtPriority /*publisher_priority*/,
moqt::MoqtObjectStatus status,
moqt::MoqtForwardingPreference /*forwarding_preference*/,
absl::string_view object, bool end_of_message) {
if (!end_of_message) {
std::cerr << "Error: received partial message despite requesting "
"buffering\n";
}
std::string username =
server_->strings().GetUsernameFromFullTrackName(full_track_name);
if (username.empty()) {
std::cout << "Received user message with malformed namespace\n";
return;
}
auto it = server_->user_queues_.find(username);
if (it == server_->user_queues_.end()) {
std::cerr << "Error: received message for unknown user " << username
<< "\n";
return;
}
if (status != MoqtObjectStatus::kNormal) {
it->second->AddObject(group_sequence, object_sequence, status, "");
return;
}
if (!server_->WriteToFile(username, object)) {
std::cout << username << ": " << object << "\n\n";
}
it->second->AddObject(group_sequence, object_sequence, status, object);
}
ChatServer::ChatServer(std::unique_ptr<quic::ProofSource> proof_source,
absl::string_view chat_id, absl::string_view output_file)
: server_(std::move(proof_source), std::move(incoming_session_callback_)),
strings_(chat_id),
catalog_(std::make_shared<MoqtOutgoingQueue>(
strings_.GetCatalogName(), MoqtForwardingPreference::kGroup)),
remote_track_visitor_(this) {
catalog_->AddObject(quiche::QuicheMemSlice(quiche::QuicheBuffer::Copy(
&allocator_, MoqChatStrings::kCatalogHeader)),
/*key=*/true);
publisher_.Add(catalog_);
if (!output_file.empty()) {
output_filename_ = output_file;
}
if (!output_filename_.empty()) {
output_file_.open(output_filename_);
output_file_ << "Chat transcript:\n";
output_file_.flush();
}
}
void ChatServer::AddUser(absl::string_view username) {
std::string catalog_data = absl::StrCat("+", username);
catalog_->AddObject(quiche::QuicheMemSlice(quiche::QuicheBuffer::Copy(
&allocator_, catalog_data)),
/*key=*/false);
// Add a local track.
user_queues_[username] = std::make_shared<MoqtLiveRelayQueue>(
strings_.GetFullTrackNameFromUsername(username),
MoqtForwardingPreference::kObject);
publisher_.Add(user_queues_[username]);
}
void ChatServer::DeleteUser(absl::string_view username) {
// Delete from Catalog.
std::string catalog_data = absl::StrCat("-", username);
catalog_->AddObject(quiche::QuicheMemSlice(quiche::QuicheBuffer::Copy(
&allocator_, catalog_data)),
/*key=*/false);
user_queues_.erase(username);
}
bool ChatServer::WriteToFile(absl::string_view username,
absl::string_view message) {
if (!output_filename_.empty()) {
output_file_ << username << ": " << message << "\n\n";
output_file_.flush();
return true;
}
return false;
}
absl::StatusOr<MoqtConfigureSessionCallback> ChatServer::IncomingSessionHandler(
absl::string_view path) {
if (!strings_.IsValidPath(path)) {
return absl::NotFoundError("Unknown endpoint; try \"/moq-chat\".");
}
return [this](MoqtSession* session) {
sessions_.emplace_front(session, this);
// Add a self-reference so it can delete itself from ChatServer::sessions_.
sessions_.front().set_iterator(sessions_.cbegin());
};
}
} // namespace moqt