// Copyright 2024 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// moqt_ingestion_server is a simple command-line utility that accepts incoming
// ANNOUNCE messages and records them into a file.

#include <sys/stat.h>

#include <cerrno>
#include <cstdint>
#include <fstream>
#include <ios>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

#include "absl/algorithm/container.h"
#include "absl/container/node_hash_map.h"
#include "absl/functional/bind_front.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/ascii.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/quic/moqt/tools/moqt_server.h"
#include "quiche/quic/platform/api/quic_ip_address.h"
#include "quiche/quic/platform/api/quic_socket_address.h"
#include "quiche/common/platform/api/quiche_command_line_flags.h"
#include "quiche/common/platform/api/quiche_default_proof_providers.h"
#include "quiche/common/platform/api/quiche_file_utils.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_ip_address.h"

// Utility code for working with directories.
// TODO: make those cross-platform and move into quiche_file_utils.h.
namespace {
absl::Status IsDirectory(absl::string_view path) {
  std::string directory(path);
  struct stat directory_stat;
  int result = ::stat(directory.c_str(), &directory_stat);
  if (result != 0) {
    return absl::ErrnoToStatus(errno, "Failed to stat the directory");
  }
  if (!S_ISDIR(directory_stat.st_mode)) {
    return absl::InvalidArgumentError("Requested path is not a directory");
  }
  return absl::OkStatus();
}

absl::Status MakeDirectory(absl::string_view path) {
  int result = ::mkdir(std::string(path).c_str(), 0755);
  if (result != 0) {
    return absl::ErrnoToStatus(errno, "Failed to create directory");
  }
  return absl::OkStatus();
}
}  // namespace

DEFINE_QUICHE_COMMAND_LINE_FLAG(
    bool, allow_invalid_track_namespaces, false,
    "If true, invalid track namespaces will be escaped rather than rejected.");
DEFINE_QUICHE_COMMAND_LINE_FLAG(
    std::string, tracks, "video,audio",
    "List of track names to request from the peer.");

namespace moqt {
namespace {

bool IsValidTrackNamespaceChar(char c) {
  // Since we using track namespaces for directory names, limit the set of
  // allowed characters.
  return absl::ascii_isalnum(c) || c == '-' || c == '_';
}

bool IsValidTrackNamespace(FullTrackName track_namespace) {
  for (const auto& element : track_namespace.tuple()) {
    if (!absl::c_all_of(element, IsValidTrackNamespaceChar)) {
      return false;
    }
  }
  return true;
}

FullTrackName CleanUpTrackNamespace(FullTrackName track_namespace) {
  FullTrackName output;
  for (auto& it : track_namespace.tuple()) {
    std::string element = it;
    for (char& c : element) {
      if (!IsValidTrackNamespaceChar(c)) {
        c = '_';
      }
    }
    output.AddElement(element);
  }
  return output;
}

// Maintains the state for individual incoming MoQT sessions.
class MoqtIngestionHandler {
 public:
  explicit MoqtIngestionHandler(MoqtSession* session,
                                absl::string_view output_root)
      : session_(session), output_root_(output_root) {
    session_->callbacks().incoming_announce_callback =
        absl::bind_front(&MoqtIngestionHandler::OnAnnounceReceived, this);
  }

  // TODO(martinduke): Handle when |announce| is false (UNANNOUNCE).
  std::optional<MoqtAnnounceErrorReason> OnAnnounceReceived(
      FullTrackName track_namespace, AnnounceEvent /*announce*/) {
    if (!IsValidTrackNamespace(track_namespace) &&
        !quiche::GetQuicheCommandLineFlag(
            FLAGS_allow_invalid_track_namespaces)) {
      QUICHE_DLOG(WARNING) << "Rejected remote announce as it contained "
                              "disallowed characters; namespace: "
                           << track_namespace;
      return MoqtAnnounceErrorReason{
          SubscribeErrorCode::kInternalError,
          "Track namespace contains disallowed characters"};
    }

    std::string directory_name = absl::StrCat(
        CleanUpTrackNamespace(track_namespace), "_",
        absl::FormatTime("%Y%m%d_%H%M%S", absl::Now(), absl::UTCTimeZone()));
    std::string directory_path = quiche::JoinPath(output_root_, directory_name);
    auto [it, added] = subscribed_namespaces_.emplace(
        track_namespace, NamespaceHandler(directory_path));
    if (!added) {
      // Received before; should be handled by already existing subscriptions.
      return std::nullopt;
    }

    if (absl::Status status = MakeDirectory(directory_path); !status.ok()) {
      subscribed_namespaces_.erase(it);
      QUICHE_LOG(ERROR) << "Failed to create directory " << directory_path
                        << "; " << status;
      return MoqtAnnounceErrorReason{SubscribeErrorCode::kInternalError,
                                     "Failed to create output directory"};
    }

    std::string track_list = quiche::GetQuicheCommandLineFlag(FLAGS_tracks);
    std::vector<absl::string_view> tracks_to_subscribe =
        absl::StrSplit(track_list, ',', absl::AllowEmpty());
    for (absl::string_view track : tracks_to_subscribe) {
      FullTrackName full_track_name = track_namespace;
      full_track_name.AddElement(track);
      session_->JoiningFetch(full_track_name, &it->second, 0,
                             VersionSpecificParameters());
    }

    return std::nullopt;
  }

 private:
  class NamespaceHandler : public SubscribeRemoteTrack::Visitor {
   public:
    explicit NamespaceHandler(absl::string_view directory)
        : directory_(directory) {}

    void OnReply(
        const FullTrackName& full_track_name,
        std::optional<Location> /*largest_id*/,
        std::optional<absl::string_view> error_reason_phrase) override {
      if (error_reason_phrase.has_value()) {
        QUICHE_LOG(ERROR) << "Failed to subscribe to the peer track "
                          << full_track_name << ": " << *error_reason_phrase;
      }
    }

    void OnCanAckObjects(MoqtObjectAckFunction) override {}

    void OnObjectFragment(const FullTrackName& full_track_name,
                          Location sequence,
                          MoqtPriority /*publisher_priority*/,
                          MoqtObjectStatus /*status*/, absl::string_view object,
                          bool /*end_of_message*/) override {
      std::string file_name = absl::StrCat(sequence.group, "-", sequence.object,
                                           ".", full_track_name.tuple().back());
      std::string file_path = quiche::JoinPath(directory_, file_name);
      std::ofstream output(file_path, std::ios::binary | std::ios::ate);
      output.write(object.data(), object.size());
      output.close();
    }

    void OnSubscribeDone(FullTrackName /*full_track_name*/) override {}

   private:
    std::string directory_;
  };

  MoqtSession* session_;  // Not owned.
  std::string output_root_;
  absl::node_hash_map<FullTrackName, NamespaceHandler> subscribed_namespaces_;
};

absl::StatusOr<MoqtConfigureSessionCallback> IncomingSessionHandler(
    std::string output_root, absl::string_view path) {
  if (path != "/ingest") {
    return absl::NotFoundError("Unknown endpoint; try \"/ingest\".");
  }
  return [output_root](MoqtSession* session) {
    auto handler = std::make_unique<MoqtIngestionHandler>(session, output_root);
    session->callbacks().session_deleted_callback = [handler =
                                                         std::move(handler)] {};
  };
}

}  // namespace
}  // namespace moqt

DEFINE_QUICHE_COMMAND_LINE_FLAG(std::string, bind_address, "127.0.0.1",
                                "Local IP address to bind to");
DEFINE_QUICHE_COMMAND_LINE_FLAG(uint16_t, port, 8000,
                                "Port for the server to listen on");

int main(int argc, char** argv) {
  const char* usage = "Usage: moqt_ingestion_server [options] output_directory";
  std::vector<std::string> args =
      quiche::QuicheParseCommandLineFlags(usage, argc, argv);
  if (args.size() != 1) {
    quiche::QuichePrintCommandLineFlagHelp(usage);
    return 1;
  }

  std::string output_directory = args[0];
  if (absl::Status stat_status = IsDirectory(output_directory);
      !stat_status.ok()) {
    if (absl::IsNotFound(stat_status)) {
      absl::Status mkdir_status = MakeDirectory(output_directory);
      if (!mkdir_status.ok()) {
        QUICHE_LOG(ERROR) << "Failed to create output directory: "
                          << mkdir_status;
        return 1;
      }
    } else {
      QUICHE_LOG(ERROR) << stat_status;
      return 1;
    }
  }

  moqt::MoqtServer server(
      quiche::CreateDefaultProofSource(),
      absl::bind_front(moqt::IncomingSessionHandler, output_directory));
  quiche::QuicheIpAddress bind_address;
  QUICHE_CHECK(bind_address.FromString(
      quiche::GetQuicheCommandLineFlag(FLAGS_bind_address)));
  server.quic_server().CreateUDPSocketAndListen(quic::QuicSocketAddress(
      bind_address, quiche::GetQuicheCommandLineFlag(FLAGS_port)));
  server.quic_server().HandleEventsForever();

  return 0;
}
