// Copyright (c) 2024 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "quiche/quic/moqt/tools/chat_server.h"

#include <cstdint>
#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <utility>

#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/core/crypto/proof_source.h"
#include "quiche/quic/moqt/moqt_live_relay_queue.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_outgoing_queue.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/tools/moq_chat.h"
#include "quiche/quic/moqt/tools/moqt_server.h"
#include "quiche/common/platform/api/quiche_mem_slice.h"
#include "quiche/common/quiche_buffer_allocator.h"

namespace moqt {

ChatServer::ChatServerSessionHandler::ChatServerSessionHandler(
    MoqtSession* session, ChatServer* server)
    : session_(session), server_(server) {
  session_->callbacks().incoming_announce_callback =
      [&](absl::string_view track_namespace) {
        std::cout << "Received ANNOUNCE for " << track_namespace << "\n";
        username_ =
            server_->strings().GetUsernameFromTrackNamespace(track_namespace);
        if (username_->empty()) {
          std::cout << "Malformed ANNOUNCE namespace\n";
          return std::nullopt;
        }
        session_->SubscribeCurrentGroup(track_namespace, "",
                                        server_->remote_track_visitor());
        server_->AddUser(*username_);
        return std::nullopt;
      };
  // TODO(martinduke): Add a callback for UNANNOUNCE that deletes the user and
  // clears username_, but keeps the handler.
  session_->callbacks().session_terminated_callback =
      [&](absl::string_view error_message) {
        std::cout << "Session terminated, reason = " << error_message << "\n";
        session_ = nullptr;
        server_->DeleteSession(it_);
      };
  session_->set_publisher(server_->publisher());
}

ChatServer::ChatServerSessionHandler::~ChatServerSessionHandler() {
  if (username_.has_value()) {
    server_->DeleteUser(*username_);
  }
}

ChatServer::RemoteTrackVisitor::RemoteTrackVisitor(ChatServer* server)
    : server_(server) {}

void ChatServer::RemoteTrackVisitor::OnReply(
    const moqt::FullTrackName& full_track_name,
    std::optional<absl::string_view> reason_phrase) {
  std::cout << "Subscription to user " << full_track_name.track_namespace
            << " ";
  if (reason_phrase.has_value()) {
    std::cout << "REJECTED, reason = " << *reason_phrase << "\n";
    std::string username =
        server_->strings().GetUsernameFromFullTrackName(full_track_name);
    if (!username.empty()) {
      std::cout << "Rejection was for malformed namespace\n";
      return;
    }
    server_->DeleteUser(username);
  } else {
    std::cout << "ACCEPTED\n";
  }
}

void ChatServer::RemoteTrackVisitor::OnObjectFragment(
    const moqt::FullTrackName& full_track_name, uint64_t group_sequence,
    uint64_t object_sequence, moqt::MoqtPriority /*publisher_priority*/,
    moqt::MoqtObjectStatus status,
    moqt::MoqtForwardingPreference /*forwarding_preference*/,
    absl::string_view object, bool end_of_message) {
  if (!end_of_message) {
    std::cerr << "Error: received partial message despite requesting "
                 "buffering\n";
  }
  std::string username =
      server_->strings().GetUsernameFromFullTrackName(full_track_name);
  if (username.empty()) {
    std::cout << "Received user message with malformed namespace\n";
    return;
  }
  auto it = server_->user_queues_.find(username);
  if (it == server_->user_queues_.end()) {
    std::cerr << "Error: received message for unknown user " << username
              << "\n";
    return;
  }
  if (status != MoqtObjectStatus::kNormal) {
    it->second->AddObject(group_sequence, object_sequence, status, "");
    return;
  }
  if (!server_->WriteToFile(username, object)) {
    std::cout << username << ": " << object << "\n\n";
  }
  it->second->AddObject(group_sequence, object_sequence, status, object);
}

ChatServer::ChatServer(std::unique_ptr<quic::ProofSource> proof_source,
                       absl::string_view chat_id, absl::string_view output_file)
    : server_(std::move(proof_source), std::move(incoming_session_callback_)),
      strings_(chat_id),
      catalog_(std::make_shared<MoqtOutgoingQueue>(
          strings_.GetCatalogName(), MoqtForwardingPreference::kGroup)),
      remote_track_visitor_(this) {
  catalog_->AddObject(quiche::QuicheMemSlice(quiche::QuicheBuffer::Copy(
                          &allocator_, MoqChatStrings::kCatalogHeader)),
                      /*key=*/true);
  publisher_.Add(catalog_);
  if (!output_file.empty()) {
    output_filename_ = output_file;
  }
  if (!output_filename_.empty()) {
    output_file_.open(output_filename_);
    output_file_ << "Chat transcript:\n";
    output_file_.flush();
  }
}

void ChatServer::AddUser(absl::string_view username) {
  std::string catalog_data = absl::StrCat("+", username);
  catalog_->AddObject(quiche::QuicheMemSlice(quiche::QuicheBuffer::Copy(
                          &allocator_, catalog_data)),
                      /*key=*/false);
  // Add a local track.
  user_queues_[username] = std::make_shared<MoqtLiveRelayQueue>(
      strings_.GetFullTrackNameFromUsername(username),
      MoqtForwardingPreference::kObject);
  publisher_.Add(user_queues_[username]);
}

void ChatServer::DeleteUser(absl::string_view username) {
  // Delete from Catalog.
  std::string catalog_data = absl::StrCat("-", username);
  catalog_->AddObject(quiche::QuicheMemSlice(quiche::QuicheBuffer::Copy(
                          &allocator_, catalog_data)),
                      /*key=*/false);
  user_queues_.erase(username);
}

bool ChatServer::WriteToFile(absl::string_view username,
                             absl::string_view message) {
  if (!output_filename_.empty()) {
    output_file_ << username << ": " << message << "\n\n";
    output_file_.flush();
    return true;
  }
  return false;
}

absl::StatusOr<MoqtConfigureSessionCallback> ChatServer::IncomingSessionHandler(
    absl::string_view path) {
  if (!strings_.IsValidPath(path)) {
    return absl::NotFoundError("Unknown endpoint; try \"/moq-chat\".");
  }
  return [this](MoqtSession* session) {
    sessions_.emplace_front(session, this);
    // Add a self-reference so it can delete itself from ChatServer::sessions_.
    sessions_.front().set_iterator(sessions_.cbegin());
  };
}

}  // namespace moqt
