Introduce MoQT demo ingestion server that writes incoming streams into a directory.
It is possible to test this server against the demo chat client by running a command among the lines of `moqt_ingestion_server /tmp/outdir --alsologtostderr --allow_invalid_track_namespaces --tracks=`.
PiperOrigin-RevId: 612845930
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 0deac7a..c6e4873 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1508,6 +1508,7 @@
"quic/moqt/tools/chat_client_bin.cc",
"quic/moqt/tools/moqt_client.cc",
"quic/moqt/tools/moqt_end_to_end_test.cc",
+ "quic/moqt/tools/moqt_ingestion_server_bin.cc",
"quic/moqt/tools/moqt_server.cc",
]
binary_http_hdrs = [
diff --git a/build/source_list.gni b/build/source_list.gni
index ea0fefd..72489d2 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1512,6 +1512,7 @@
"src/quiche/quic/moqt/tools/chat_client_bin.cc",
"src/quiche/quic/moqt/tools/moqt_client.cc",
"src/quiche/quic/moqt/tools/moqt_end_to_end_test.cc",
+ "src/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc",
"src/quiche/quic/moqt/tools/moqt_server.cc",
]
binary_http_hdrs = [
diff --git a/build/source_list.json b/build/source_list.json
index 6fe5adc..6bebbca 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1511,6 +1511,7 @@
"quiche/quic/moqt/tools/chat_client_bin.cc",
"quiche/quic/moqt/tools/moqt_client.cc",
"quiche/quic/moqt/tools/moqt_end_to_end_test.cc",
+ "quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc",
"quiche/quic/moqt/tools/moqt_server.cc"
],
"binary_http_hdrs": [
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
new file mode 100644
index 0000000..c7d465c
--- /dev/null
+++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -0,0 +1,246 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// moqt_ingestion_server is a simple command-line utility that accepts incoming
+// ANNOUNCE messages and records them into a file.
+
+#include <sys/stat.h>
+
+#include <cerrno>
+#include <cstdint>
+#include <fstream>
+#include <ios>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "absl/algorithm/container.h"
+#include "absl/container/node_hash_map.h"
+#include "absl/functional/bind_front.h"
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "absl/strings/ascii.h"
+#include "absl/strings/str_cat.h"
+#include "absl/strings/str_split.h"
+#include "absl/strings/string_view.h"
+#include "absl/time/clock.h"
+#include "absl/time/time.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/moqt/moqt_track.h"
+#include "quiche/quic/moqt/tools/moqt_server.h"
+#include "quiche/quic/platform/api/quic_ip_address.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/common/platform/api/quiche_command_line_flags.h"
+#include "quiche/common/platform/api/quiche_default_proof_providers.h"
+#include "quiche/common/platform/api/quiche_file_utils.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/quiche_ip_address.h"
+
+// Utility code for working with directories.
+// TODO: make those cross-platform and move into quiche_file_utils.h.
+namespace {
+absl::Status IsDirectory(absl::string_view path) {
+ std::string directory(path);
+ struct stat directory_stat;
+ int result = ::stat(directory.c_str(), &directory_stat);
+ if (result != 0) {
+ return absl::ErrnoToStatus(errno, "Failed to stat the directory");
+ }
+ if (!S_ISDIR(directory_stat.st_mode)) {
+ return absl::InvalidArgumentError("Requested path is not a directory");
+ }
+ return absl::OkStatus();
+}
+
+absl::Status MakeDirectory(absl::string_view path) {
+ int result = ::mkdir(std::string(path).c_str(), 0755);
+ if (result != 0) {
+ return absl::ErrnoToStatus(errno, "Failed to create directory");
+ }
+ return absl::OkStatus();
+}
+} // namespace
+
+DEFINE_QUICHE_COMMAND_LINE_FLAG(
+ bool, allow_invalid_track_namespaces, false,
+ "If true, invalid track namespaces will be escaped rather than rejected.");
+DEFINE_QUICHE_COMMAND_LINE_FLAG(
+ std::string, tracks, "video,audio",
+ "List of track names to request from the peer.");
+
+namespace moqt {
+namespace {
+
+bool IsValidTrackNamespaceChar(char c) {
+ // Since we using track namespaces for directory names, limit the set of
+ // allowed characters.
+ return absl::ascii_isalnum(c) || c == '-' || c == '_';
+}
+
+bool IsValidTrackNamespace(absl::string_view track_namespace) {
+ return absl::c_all_of(track_namespace, IsValidTrackNamespaceChar);
+}
+
+std::string CleanUpTrackNamespace(absl::string_view track_namespace) {
+ std::string output(track_namespace);
+ for (char& c : output) {
+ if (!IsValidTrackNamespaceChar(c)) {
+ c = '_';
+ }
+ }
+ return output;
+}
+
+// Maintains the state for individual incoming MoQT sessions.
+class MoqtIngestionHandler {
+ public:
+ explicit MoqtIngestionHandler(MoqtSession* session,
+ absl::string_view output_root)
+ : session_(session), output_root_(output_root) {
+ session_->callbacks().incoming_announce_callback =
+ absl::bind_front(&MoqtIngestionHandler::OnAnnounceReceived, this);
+ }
+
+ std::optional<MoqtAnnounceErrorReason> OnAnnounceReceived(
+ absl::string_view track_namespace) {
+ if (!IsValidTrackNamespace(track_namespace) &&
+ !quiche::GetQuicheCommandLineFlag(
+ FLAGS_allow_invalid_track_namespaces)) {
+ QUICHE_DLOG(WARNING) << "Rejected remote announce as it contained "
+ "disallowed characters; namespace: "
+ << track_namespace;
+ return MoqtAnnounceErrorReason{
+ MoqtAnnounceErrorCode::kInternalError,
+ "Track namespace contains disallowed characters"};
+ }
+
+ std::string directory_name = absl::StrCat(
+ CleanUpTrackNamespace(track_namespace), "_",
+ absl::FormatTime("%Y%m%d_%H%M%S", absl::Now(), absl::UTCTimeZone()));
+ std::string directory_path = quiche::JoinPath(output_root_, directory_name);
+ auto [it, added] = subscribed_namespaces_.emplace(
+ track_namespace, NamespaceHandler(directory_path));
+ if (!added) {
+ // Received before; should be handled by already existing subscriptions.
+ return std::nullopt;
+ }
+
+ if (absl::Status status = MakeDirectory(directory_path); !status.ok()) {
+ subscribed_namespaces_.erase(it);
+ QUICHE_LOG(ERROR) << "Failed to create directory " << directory_path
+ << "; " << status;
+ return MoqtAnnounceErrorReason{MoqtAnnounceErrorCode::kInternalError,
+ "Failed to create output directory"};
+ }
+
+ std::string track_list = quiche::GetQuicheCommandLineFlag(FLAGS_tracks);
+ std::vector<absl::string_view> tracks_to_subscribe =
+ absl::StrSplit(track_list, ',', absl::AllowEmpty());
+ for (absl::string_view track : tracks_to_subscribe) {
+ session_->SubscribeCurrentGroup(track_namespace, track, &it->second);
+ }
+
+ return std::nullopt;
+ }
+
+ private:
+ class NamespaceHandler : public RemoteTrack::Visitor {
+ public:
+ explicit NamespaceHandler(absl::string_view directory)
+ : directory_(directory) {}
+
+ void OnReply(
+ const FullTrackName& full_track_name,
+ std::optional<absl::string_view> error_reason_phrase) override {
+ if (error_reason_phrase.has_value()) {
+ QUICHE_LOG(ERROR) << "Failed to subscribe to the peer track "
+ << full_track_name.track_namespace << " "
+ << full_track_name.track_name << ": "
+ << *error_reason_phrase;
+ }
+ }
+
+ void OnObjectFragment(const FullTrackName& full_track_name,
+ uint64_t group_sequence, uint64_t object_sequence,
+ uint64_t /*object_send_order*/,
+ MoqtForwardingPreference /*forwarding_preference*/,
+ absl::string_view object,
+ bool /*end_of_message*/) override {
+ std::string file_name = absl::StrCat(group_sequence, "-", object_sequence,
+ ".", full_track_name.track_name);
+ std::string file_path = quiche::JoinPath(directory_, file_name);
+ std::ofstream output(file_path, std::ios::binary | std::ios::ate);
+ output.write(object.data(), object.size());
+ output.close();
+ }
+
+ private:
+ std::string directory_;
+ };
+
+ MoqtSession* session_; // Not owned.
+ std::string output_root_;
+ absl::node_hash_map<std::string, NamespaceHandler> subscribed_namespaces_;
+};
+
+absl::StatusOr<MoqtConfigureSessionCallback> IncomingSessionHandler(
+ std::string output_root, absl::string_view path) {
+ if (path != "/ingest") {
+ return absl::NotFoundError("Unknown endpoint; try \"/ingest\".");
+ }
+ return [output_root](MoqtSession* session) {
+ auto handler = std::make_unique<MoqtIngestionHandler>(session, output_root);
+ session->callbacks().session_deleted_callback = [handler =
+ std::move(handler)] {};
+ };
+}
+
+} // namespace
+} // namespace moqt
+
+DEFINE_QUICHE_COMMAND_LINE_FLAG(std::string, bind_address, "127.0.0.1",
+ "Local IP address to bind to");
+DEFINE_QUICHE_COMMAND_LINE_FLAG(uint16_t, port, 8000,
+ "Port for the server to listen on");
+
+int main(int argc, char** argv) {
+ const char* usage = "Usage: moqt_ingestion_server [options] output_directory";
+ std::vector<std::string> args =
+ quiche::QuicheParseCommandLineFlags(usage, argc, argv);
+ if (args.size() != 1) {
+ quiche::QuichePrintCommandLineFlagHelp(usage);
+ return 1;
+ }
+
+ std::string output_directory = args[0];
+ if (absl::Status stat_status = IsDirectory(output_directory);
+ !stat_status.ok()) {
+ if (absl::IsNotFound(stat_status)) {
+ absl::Status mkdir_status = MakeDirectory(output_directory);
+ if (!mkdir_status.ok()) {
+ QUICHE_LOG(ERROR) << "Failed to create output directory: "
+ << mkdir_status;
+ return 1;
+ }
+ } else {
+ QUICHE_LOG(ERROR) << stat_status;
+ return 1;
+ }
+ }
+
+ moqt::MoqtServer server(
+ quiche::CreateDefaultProofSource(),
+ absl::bind_front(moqt::IncomingSessionHandler, output_directory));
+ quiche::QuicheIpAddress bind_address;
+ QUICHE_CHECK(bind_address.FromString(
+ quiche::GetQuicheCommandLineFlag(FLAGS_bind_address)));
+ server.quic_server().CreateUDPSocketAndListen(quic::QuicSocketAddress(
+ bind_address, quiche::GetQuicheCommandLineFlag(FLAGS_port)));
+ server.quic_server().HandleEventsForever();
+
+ return 0;
+}