|  | // Copyright (c) 2023 The Chromium Authors. All rights reserved. | 
|  | // Use of this source code is governed by a BSD-style license that can be | 
|  | // found in the LICENSE file. | 
|  |  | 
|  | #include "quiche/quic/moqt/tools/chat_client.h" | 
|  |  | 
|  | #include <poll.h> | 
|  | #include <unistd.h> | 
|  |  | 
|  | #include <cstdint> | 
|  | #include <iostream> | 
|  | #include <memory> | 
|  | #include <optional> | 
|  | #include <sstream> | 
|  | #include <string> | 
|  | #include <utility> | 
|  | #include <vector> | 
|  |  | 
|  | #include "absl/container/flat_hash_map.h" | 
|  | #include "absl/strings/str_split.h" | 
|  | #include "absl/strings/string_view.h" | 
|  | #include "quiche/quic/core/crypto/proof_verifier.h" | 
|  | #include "quiche/quic/core/io/quic_default_event_loop.h" | 
|  | #include "quiche/quic/core/io/quic_event_loop.h" | 
|  | #include "quiche/quic/core/quic_default_clock.h" | 
|  | #include "quiche/quic/core/quic_server_id.h" | 
|  | #include "quiche/quic/moqt/moqt_known_track_publisher.h" | 
|  | #include "quiche/quic/moqt/moqt_messages.h" | 
|  | #include "quiche/quic/moqt/moqt_outgoing_queue.h" | 
|  | #include "quiche/quic/moqt/moqt_priority.h" | 
|  | #include "quiche/quic/moqt/moqt_session.h" | 
|  | #include "quiche/quic/moqt/moqt_track.h" | 
|  | #include "quiche/quic/moqt/tools/moqt_client.h" | 
|  | #include "quiche/quic/platform/api/quic_default_proof_providers.h" | 
|  | #include "quiche/quic/platform/api/quic_socket_address.h" | 
|  | #include "quiche/quic/tools/fake_proof_verifier.h" | 
|  | #include "quiche/quic/tools/quic_name_lookup.h" | 
|  | #include "quiche/common/platform/api/quiche_mem_slice.h" | 
|  | #include "quiche/common/quiche_buffer_allocator.h" | 
|  | #include "quiche/common/simple_buffer_allocator.h" | 
|  |  | 
|  | namespace moqt { | 
|  |  | 
|  | ChatClient::ChatClient(const quic::QuicServerId& server_id, | 
|  | bool ignore_certificate, | 
|  | std::unique_ptr<ChatUserInterface> interface, | 
|  | quic::QuicEventLoop* event_loop) | 
|  | : event_loop_(event_loop), interface_(std::move(interface)) { | 
|  | if (event_loop_ == nullptr) { | 
|  | quic::QuicDefaultClock* clock = quic::QuicDefaultClock::Get(); | 
|  | local_event_loop_ = quic::GetDefaultEventLoop()->Create(clock); | 
|  | event_loop_ = local_event_loop_.get(); | 
|  | } | 
|  |  | 
|  | quic::QuicSocketAddress peer_address = | 
|  | quic::tools::LookupAddress(AF_UNSPEC, server_id); | 
|  | std::unique_ptr<quic::ProofVerifier> verifier; | 
|  | if (ignore_certificate) { | 
|  | verifier = std::make_unique<quic::FakeProofVerifier>(); | 
|  | } else { | 
|  | verifier = quic::CreateDefaultProofVerifier(server_id.host()); | 
|  | } | 
|  |  | 
|  | client_ = std::make_unique<MoqtClient>(peer_address, server_id, | 
|  | std::move(verifier), event_loop_); | 
|  | session_callbacks_.session_established_callback = [this]() { | 
|  | std::cout << "Session established\n"; | 
|  | session_is_open_ = true; | 
|  | }; | 
|  | session_callbacks_.session_terminated_callback = | 
|  | [this](absl::string_view error_message) { | 
|  | std::cerr << "Closed session, reason = " << error_message << "\n"; | 
|  | session_is_open_ = false; | 
|  | connect_failed_ = true; | 
|  | }; | 
|  | session_callbacks_.session_deleted_callback = [this]() { | 
|  | session_ = nullptr; | 
|  | }; | 
|  | interface_->Initialize( | 
|  | [this](absl::string_view input_message) { | 
|  | OnTerminalLineInput(input_message); | 
|  | }, | 
|  | event_loop_); | 
|  | } | 
|  |  | 
|  | bool ChatClient::Connect(absl::string_view path, absl::string_view username, | 
|  | absl::string_view chat_id) { | 
|  | username_ = username; | 
|  | chat_strings_.emplace(chat_id); | 
|  | client_->Connect(std::string(path), std::move(session_callbacks_)); | 
|  | while (!session_is_open_ && !connect_failed_) { | 
|  | RunEventLoop(); | 
|  | } | 
|  | return (!connect_failed_); | 
|  | } | 
|  |  | 
|  | void ChatClient::OnTerminalLineInput(absl::string_view input_message) { | 
|  | if (input_message.empty()) { | 
|  | return; | 
|  | } | 
|  | if (input_message == "/exit") { | 
|  | session_is_open_ = false; | 
|  | return; | 
|  | } | 
|  | quiche::QuicheMemSlice message_slice(quiche::QuicheBuffer::Copy( | 
|  | quiche::SimpleBufferAllocator::Get(), input_message)); | 
|  | queue_->AddObject(std::move(message_slice), /*key=*/true); | 
|  | } | 
|  |  | 
|  | void ChatClient::RemoteTrackVisitor::OnReply( | 
|  | const FullTrackName& full_track_name, | 
|  | std::optional<FullSequence> /*largest_id*/, | 
|  | std::optional<absl::string_view> reason_phrase) { | 
|  | client_->subscribes_to_make_--; | 
|  | if (full_track_name == client_->chat_strings_->GetCatalogName()) { | 
|  | std::cout << "Subscription to catalog "; | 
|  | } else { | 
|  | std::cout << "Subscription to user " << full_track_name.ToString() << " "; | 
|  | } | 
|  | if (reason_phrase.has_value()) { | 
|  | std::cout << "REJECTED, reason = " << *reason_phrase << "\n"; | 
|  | } else { | 
|  | std::cout << "ACCEPTED\n"; | 
|  | } | 
|  | } | 
|  |  | 
|  | void ChatClient::RemoteTrackVisitor::OnObjectFragment( | 
|  | const FullTrackName& full_track_name, FullSequence sequence, | 
|  | MoqtPriority /*publisher_priority*/, MoqtObjectStatus /*status*/, | 
|  | absl::string_view object, bool end_of_message) { | 
|  | if (!end_of_message) { | 
|  | std::cerr << "Error: received partial message despite requesting " | 
|  | "buffering\n"; | 
|  | } | 
|  | if (full_track_name == client_->chat_strings_->GetCatalogName()) { | 
|  | if (sequence.group < client_->catalog_group_) { | 
|  | std::cout << "Ignoring old catalog"; | 
|  | return; | 
|  | } | 
|  | client_->ProcessCatalog(object, this, sequence.group, sequence.object); | 
|  | return; | 
|  | } | 
|  | std::string username( | 
|  | client_->chat_strings_->GetUsernameFromFullTrackName(full_track_name)); | 
|  | if (!client_->other_users_.contains(username)) { | 
|  | std::cout << "Username " << username << "doesn't exist\n"; | 
|  | return; | 
|  | } | 
|  | if (object.empty()) { | 
|  | return; | 
|  | } | 
|  | client_->WriteToOutput(username, object); | 
|  | } | 
|  |  | 
|  | bool ChatClient::AnnounceAndSubscribe() { | 
|  | session_ = client_->session(); | 
|  | if (session_ == nullptr) { | 
|  | std::cout << "Failed to connect.\n"; | 
|  | return false; | 
|  | } | 
|  | if (!username_.empty()) { | 
|  | // A server log might choose to not provide a username, thus getting all | 
|  | // the messages without adding itself to the catalog. | 
|  | FullTrackName my_track_name = | 
|  | chat_strings_->GetFullTrackNameFromUsername(username_); | 
|  | queue_ = std::make_shared<MoqtOutgoingQueue>( | 
|  | my_track_name, MoqtForwardingPreference::kSubgroup); | 
|  | publisher_.Add(queue_); | 
|  | session_->set_publisher(&publisher_); | 
|  | MoqtOutgoingAnnounceCallback announce_callback = | 
|  | [this](FullTrackName track_namespace, | 
|  | std::optional<MoqtAnnounceErrorReason> reason) { | 
|  | if (reason.has_value()) { | 
|  | std::cout << "ANNOUNCE rejected, " << reason->reason_phrase << "\n"; | 
|  | session_->Error(MoqtError::kInternalError, | 
|  | "Local ANNOUNCE rejected"); | 
|  | return; | 
|  | } | 
|  | std::cout << "ANNOUNCE for " << track_namespace.ToString() | 
|  | << " accepted\n"; | 
|  | return; | 
|  | }; | 
|  | FullTrackName my_track_namespace = my_track_name; | 
|  | my_track_namespace.NameToNamespace(); | 
|  | std::cout << "Announcing " << my_track_namespace.ToString() << "\n"; | 
|  | session_->Announce(my_track_namespace, std::move(announce_callback)); | 
|  | } | 
|  | remote_track_visitor_ = std::make_unique<RemoteTrackVisitor>(this); | 
|  | FullTrackName catalog_name = chat_strings_->GetCatalogName(); | 
|  | if (!session_->SubscribeCurrentGroup( | 
|  | catalog_name, remote_track_visitor_.get(), | 
|  | MoqtSubscribeParameters{username_, std::nullopt, std::nullopt, | 
|  | std::nullopt})) { | 
|  | std::cout << "Failed to get catalog\n"; | 
|  | return false; | 
|  | } | 
|  | while (session_is_open_ && is_syncing()) { | 
|  | RunEventLoop(); | 
|  | } | 
|  | return session_is_open_; | 
|  | } | 
|  |  | 
|  | void ChatClient::ProcessCatalog(absl::string_view object, | 
|  | SubscribeRemoteTrack::Visitor* visitor, | 
|  | uint64_t group_sequence, | 
|  | uint64_t object_sequence) { | 
|  | std::string message(object); | 
|  | std::istringstream f(message); | 
|  | // std::string line; | 
|  | bool got_version = true; | 
|  | if (object_sequence == 0) { | 
|  | std::cout << "Received new Catalog. Users:\n"; | 
|  | got_version = false; | 
|  | } | 
|  | std::vector<absl::string_view> lines = | 
|  | absl::StrSplit(object, '\n', absl::SkipEmpty()); | 
|  | for (absl::string_view line : lines) { | 
|  | if (!got_version) { | 
|  | if (line != "version=1") { | 
|  | session_->Error(MoqtError::kProtocolViolation, | 
|  | "Catalog does not begin with version"); | 
|  | return; | 
|  | } | 
|  | got_version = true; | 
|  | continue; | 
|  | } | 
|  | std::string user; | 
|  | bool add = true; | 
|  | if (object_sequence > 0) { | 
|  | switch (line[0]) { | 
|  | case '-': | 
|  | add = false; | 
|  | break; | 
|  | case '+': | 
|  | break; | 
|  | default: | 
|  | std::cerr << "Catalog update with neither + nor -\n"; | 
|  | return; | 
|  | } | 
|  | user = line.substr(1, line.size() - 1); | 
|  | } else { | 
|  | user = line; | 
|  | } | 
|  | if (username_ == user) { | 
|  | std::cout << user << "\n"; | 
|  | continue; | 
|  | } | 
|  | if (!add) { | 
|  | session_->Unsubscribe(chat_strings_->GetFullTrackNameFromUsername(user)); | 
|  | std::cout << user << "left the chat\n"; | 
|  | other_users_.erase(user); | 
|  | continue; | 
|  | } | 
|  | if (object_sequence == 0) { | 
|  | std::cout << user << "\n"; | 
|  | } else { | 
|  | std::cout << user << " joined the chat\n"; | 
|  | } | 
|  | auto it = other_users_.find(user); | 
|  | if (it == other_users_.end()) { | 
|  | FullTrackName to_subscribe = | 
|  | chat_strings_->GetFullTrackNameFromUsername(user); | 
|  | auto new_user = other_users_.emplace( | 
|  | std::make_pair(user, ChatUser(to_subscribe, group_sequence))); | 
|  | ChatUser& user_record = new_user.first->second; | 
|  | session_->SubscribeCurrentGroup(user_record.full_track_name, visitor); | 
|  | subscribes_to_make_++; | 
|  | } else { | 
|  | if (it->second.from_group == group_sequence) { | 
|  | session_->Error(MoqtError::kProtocolViolation, | 
|  | "User listed twice in Catalog"); | 
|  | return; | 
|  | } | 
|  | it->second.from_group = group_sequence; | 
|  | } | 
|  | } | 
|  | if (object_sequence == 0) {  // Eliminate users that are no longer present | 
|  | absl::erase_if(other_users_, [&](const auto& kv) { | 
|  | return kv.second.from_group != group_sequence; | 
|  | }); | 
|  | } | 
|  | catalog_group_ = group_sequence; | 
|  | } | 
|  |  | 
|  | }  // namespace moqt |