Introduce MoQT demo ingestion server that writes incoming streams into a directory.

It is possible to test this server against the demo chat client by running a command among the lines of `moqt_ingestion_server /tmp/outdir --alsologtostderr --allow_invalid_track_namespaces --tracks=`.

PiperOrigin-RevId: 612845930
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 0deac7a..c6e4873 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1508,6 +1508,7 @@
     "quic/moqt/tools/chat_client_bin.cc",
     "quic/moqt/tools/moqt_client.cc",
     "quic/moqt/tools/moqt_end_to_end_test.cc",
+    "quic/moqt/tools/moqt_ingestion_server_bin.cc",
     "quic/moqt/tools/moqt_server.cc",
 ]
 binary_http_hdrs = [
diff --git a/build/source_list.gni b/build/source_list.gni
index ea0fefd..72489d2 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1512,6 +1512,7 @@
     "src/quiche/quic/moqt/tools/chat_client_bin.cc",
     "src/quiche/quic/moqt/tools/moqt_client.cc",
     "src/quiche/quic/moqt/tools/moqt_end_to_end_test.cc",
+    "src/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc",
     "src/quiche/quic/moqt/tools/moqt_server.cc",
 ]
 binary_http_hdrs = [
diff --git a/build/source_list.json b/build/source_list.json
index 6fe5adc..6bebbca 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1511,6 +1511,7 @@
     "quiche/quic/moqt/tools/chat_client_bin.cc",
     "quiche/quic/moqt/tools/moqt_client.cc",
     "quiche/quic/moqt/tools/moqt_end_to_end_test.cc",
+    "quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc",
     "quiche/quic/moqt/tools/moqt_server.cc"
   ],
   "binary_http_hdrs": [
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
new file mode 100644
index 0000000..c7d465c
--- /dev/null
+++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -0,0 +1,246 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// moqt_ingestion_server is a simple command-line utility that accepts incoming
+// ANNOUNCE messages and records them into a file.
+
+#include <sys/stat.h>
+
+#include <cerrno>
+#include <cstdint>
+#include <fstream>
+#include <ios>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "absl/algorithm/container.h"
+#include "absl/container/node_hash_map.h"
+#include "absl/functional/bind_front.h"
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "absl/strings/ascii.h"
+#include "absl/strings/str_cat.h"
+#include "absl/strings/str_split.h"
+#include "absl/strings/string_view.h"
+#include "absl/time/clock.h"
+#include "absl/time/time.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/moqt/moqt_track.h"
+#include "quiche/quic/moqt/tools/moqt_server.h"
+#include "quiche/quic/platform/api/quic_ip_address.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/common/platform/api/quiche_command_line_flags.h"
+#include "quiche/common/platform/api/quiche_default_proof_providers.h"
+#include "quiche/common/platform/api/quiche_file_utils.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/quiche_ip_address.h"
+
+// Utility code for working with directories.
+// TODO: make those cross-platform and move into quiche_file_utils.h.
+namespace {
+absl::Status IsDirectory(absl::string_view path) {
+  std::string directory(path);
+  struct stat directory_stat;
+  int result = ::stat(directory.c_str(), &directory_stat);
+  if (result != 0) {
+    return absl::ErrnoToStatus(errno, "Failed to stat the directory");
+  }
+  if (!S_ISDIR(directory_stat.st_mode)) {
+    return absl::InvalidArgumentError("Requested path is not a directory");
+  }
+  return absl::OkStatus();
+}
+
+absl::Status MakeDirectory(absl::string_view path) {
+  int result = ::mkdir(std::string(path).c_str(), 0755);
+  if (result != 0) {
+    return absl::ErrnoToStatus(errno, "Failed to create directory");
+  }
+  return absl::OkStatus();
+}
+}  // namespace
+
+DEFINE_QUICHE_COMMAND_LINE_FLAG(
+    bool, allow_invalid_track_namespaces, false,
+    "If true, invalid track namespaces will be escaped rather than rejected.");
+DEFINE_QUICHE_COMMAND_LINE_FLAG(
+    std::string, tracks, "video,audio",
+    "List of track names to request from the peer.");
+
+namespace moqt {
+namespace {
+
+bool IsValidTrackNamespaceChar(char c) {
+  // Since we using track namespaces for directory names, limit the set of
+  // allowed characters.
+  return absl::ascii_isalnum(c) || c == '-' || c == '_';
+}
+
+bool IsValidTrackNamespace(absl::string_view track_namespace) {
+  return absl::c_all_of(track_namespace, IsValidTrackNamespaceChar);
+}
+
+std::string CleanUpTrackNamespace(absl::string_view track_namespace) {
+  std::string output(track_namespace);
+  for (char& c : output) {
+    if (!IsValidTrackNamespaceChar(c)) {
+      c = '_';
+    }
+  }
+  return output;
+}
+
+// Maintains the state for individual incoming MoQT sessions.
+class MoqtIngestionHandler {
+ public:
+  explicit MoqtIngestionHandler(MoqtSession* session,
+                                absl::string_view output_root)
+      : session_(session), output_root_(output_root) {
+    session_->callbacks().incoming_announce_callback =
+        absl::bind_front(&MoqtIngestionHandler::OnAnnounceReceived, this);
+  }
+
+  std::optional<MoqtAnnounceErrorReason> OnAnnounceReceived(
+      absl::string_view track_namespace) {
+    if (!IsValidTrackNamespace(track_namespace) &&
+        !quiche::GetQuicheCommandLineFlag(
+            FLAGS_allow_invalid_track_namespaces)) {
+      QUICHE_DLOG(WARNING) << "Rejected remote announce as it contained "
+                              "disallowed characters; namespace: "
+                           << track_namespace;
+      return MoqtAnnounceErrorReason{
+          MoqtAnnounceErrorCode::kInternalError,
+          "Track namespace contains disallowed characters"};
+    }
+
+    std::string directory_name = absl::StrCat(
+        CleanUpTrackNamespace(track_namespace), "_",
+        absl::FormatTime("%Y%m%d_%H%M%S", absl::Now(), absl::UTCTimeZone()));
+    std::string directory_path = quiche::JoinPath(output_root_, directory_name);
+    auto [it, added] = subscribed_namespaces_.emplace(
+        track_namespace, NamespaceHandler(directory_path));
+    if (!added) {
+      // Received before; should be handled by already existing subscriptions.
+      return std::nullopt;
+    }
+
+    if (absl::Status status = MakeDirectory(directory_path); !status.ok()) {
+      subscribed_namespaces_.erase(it);
+      QUICHE_LOG(ERROR) << "Failed to create directory " << directory_path
+                        << "; " << status;
+      return MoqtAnnounceErrorReason{MoqtAnnounceErrorCode::kInternalError,
+                                     "Failed to create output directory"};
+    }
+
+    std::string track_list = quiche::GetQuicheCommandLineFlag(FLAGS_tracks);
+    std::vector<absl::string_view> tracks_to_subscribe =
+        absl::StrSplit(track_list, ',', absl::AllowEmpty());
+    for (absl::string_view track : tracks_to_subscribe) {
+      session_->SubscribeCurrentGroup(track_namespace, track, &it->second);
+    }
+
+    return std::nullopt;
+  }
+
+ private:
+  class NamespaceHandler : public RemoteTrack::Visitor {
+   public:
+    explicit NamespaceHandler(absl::string_view directory)
+        : directory_(directory) {}
+
+    void OnReply(
+        const FullTrackName& full_track_name,
+        std::optional<absl::string_view> error_reason_phrase) override {
+      if (error_reason_phrase.has_value()) {
+        QUICHE_LOG(ERROR) << "Failed to subscribe to the peer track "
+                          << full_track_name.track_namespace << " "
+                          << full_track_name.track_name << ": "
+                          << *error_reason_phrase;
+      }
+    }
+
+    void OnObjectFragment(const FullTrackName& full_track_name,
+                          uint64_t group_sequence, uint64_t object_sequence,
+                          uint64_t /*object_send_order*/,
+                          MoqtForwardingPreference /*forwarding_preference*/,
+                          absl::string_view object,
+                          bool /*end_of_message*/) override {
+      std::string file_name = absl::StrCat(group_sequence, "-", object_sequence,
+                                           ".", full_track_name.track_name);
+      std::string file_path = quiche::JoinPath(directory_, file_name);
+      std::ofstream output(file_path, std::ios::binary | std::ios::ate);
+      output.write(object.data(), object.size());
+      output.close();
+    }
+
+   private:
+    std::string directory_;
+  };
+
+  MoqtSession* session_;  // Not owned.
+  std::string output_root_;
+  absl::node_hash_map<std::string, NamespaceHandler> subscribed_namespaces_;
+};
+
+absl::StatusOr<MoqtConfigureSessionCallback> IncomingSessionHandler(
+    std::string output_root, absl::string_view path) {
+  if (path != "/ingest") {
+    return absl::NotFoundError("Unknown endpoint; try \"/ingest\".");
+  }
+  return [output_root](MoqtSession* session) {
+    auto handler = std::make_unique<MoqtIngestionHandler>(session, output_root);
+    session->callbacks().session_deleted_callback = [handler =
+                                                         std::move(handler)] {};
+  };
+}
+
+}  // namespace
+}  // namespace moqt
+
+DEFINE_QUICHE_COMMAND_LINE_FLAG(std::string, bind_address, "127.0.0.1",
+                                "Local IP address to bind to");
+DEFINE_QUICHE_COMMAND_LINE_FLAG(uint16_t, port, 8000,
+                                "Port for the server to listen on");
+
+int main(int argc, char** argv) {
+  const char* usage = "Usage: moqt_ingestion_server [options] output_directory";
+  std::vector<std::string> args =
+      quiche::QuicheParseCommandLineFlags(usage, argc, argv);
+  if (args.size() != 1) {
+    quiche::QuichePrintCommandLineFlagHelp(usage);
+    return 1;
+  }
+
+  std::string output_directory = args[0];
+  if (absl::Status stat_status = IsDirectory(output_directory);
+      !stat_status.ok()) {
+    if (absl::IsNotFound(stat_status)) {
+      absl::Status mkdir_status = MakeDirectory(output_directory);
+      if (!mkdir_status.ok()) {
+        QUICHE_LOG(ERROR) << "Failed to create output directory: "
+                          << mkdir_status;
+        return 1;
+      }
+    } else {
+      QUICHE_LOG(ERROR) << stat_status;
+      return 1;
+    }
+  }
+
+  moqt::MoqtServer server(
+      quiche::CreateDefaultProofSource(),
+      absl::bind_front(moqt::IncomingSessionHandler, output_directory));
+  quiche::QuicheIpAddress bind_address;
+  QUICHE_CHECK(bind_address.FromString(
+      quiche::GetQuicheCommandLineFlag(FLAGS_bind_address)));
+  server.quic_server().CreateUDPSocketAndListen(quic::QuicSocketAddress(
+      bind_address, quiche::GetQuicheCommandLineFlag(FLAGS_port)));
+  server.quic_server().HandleEventsForever();
+
+  return 0;
+}