Object handling in MoQT chat client. Threaded model was fatally flawed; now it works correctly. PiperOrigin-RevId: 608007963
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 11feb9e..40285d5 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -259,7 +259,7 @@ return new_stream->GetStreamId(); } -bool MoqtSession::PublishObject(FullTrackName& full_track_name, +bool MoqtSession::PublishObject(const FullTrackName& full_track_name, uint64_t group_id, uint64_t object_id, uint64_t object_send_order, MoqtForwardingPreference forwarding_preference, @@ -330,8 +330,8 @@ ++failures; continue; } - QUICHE_LOG(INFO) << ENDPOINT << "Sending object " - << full_track_name.track_namespace << ":" + QUICHE_LOG(INFO) << ENDPOINT << "Sending object length " << payload.length() + << " for " << full_track_name.track_namespace << ":" << full_track_name.track_name << " with sequence " << object.group_id << ":" << object.object_id << " on stream " << *stream_id; @@ -599,7 +599,7 @@ } MoqtSubscribe& subscribe = it->second.message; QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for " - << "subscribe_id = " << message.subscribe_id + << "subscribe_id = " << message.subscribe_id << " " << subscribe.track_namespace << ":" << subscribe.track_name; // Copy the Remote Track from session_->active_subscribes_ to // session_->remote_tracks_.
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index 911849a..ee0720f 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -117,7 +117,7 @@ // |payload|. // |payload.length() >= |payload_length|, because the application can deliver // partial objects. - bool PublishObject(FullTrackName& full_track_name, uint64_t group_id, + bool PublishObject(const FullTrackName& full_track_name, uint64_t group_id, uint64_t object_id, uint64_t object_send_order, MoqtForwardingPreference forwarding_preference, absl::string_view payload,
diff --git a/quiche/quic/moqt/tools/chat_client_bin.cc b/quiche/quic/moqt/tools/chat_client_bin.cc index c5cb070..cab2790 100644 --- a/quiche/quic/moqt/tools/chat_client_bin.cc +++ b/quiche/quic/moqt/tools/chat_client_bin.cc
@@ -2,6 +2,9 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include <poll.h> +#include <unistd.h> + #include <cstdint> #include <iostream> #include <memory> @@ -26,13 +29,11 @@ #include "quiche/quic/moqt/tools/moqt_client.h" #include "quiche/quic/platform/api/quic_default_proof_providers.h" #include "quiche/quic/platform/api/quic_socket_address.h" -#include "quiche/quic/platform/api/quic_thread.h" #include "quiche/quic/tools/fake_proof_verifier.h" #include "quiche/quic/tools/quic_name_lookup.h" #include "quiche/quic/tools/quic_url.h" #include "quiche/common/platform/api/quiche_command_line_flags.h" #include "quiche/common/platform/api/quiche_export.h" -#include "quiche/common/quiche_circular_deque.h" DEFINE_QUICHE_COMMAND_LINE_FLAG( bool, disable_certificate_verification, false, @@ -79,14 +80,20 @@ bool session_is_open() const { return session_is_open_; } bool is_syncing() const { - return catalog_group_.has_value() || subscribes_to_make_ > 0 || + return !catalog_group_.has_value() || subscribes_to_make_ > 0 || !session_->HasSubscribers(my_track_name_); } void RunEventLoop() { - event_loop_->RunEventLoopOnce(quic::QuicTime::Delta::FromSeconds(5)); + event_loop_->RunEventLoopOnce(quic::QuicTime::Delta::FromMilliseconds(500)); } + const moqt::FullTrackName& my_track_name() { return my_track_name_; } + + moqt::MoqtSession* session() { return session_; } + + moqt::FullSequence& next_sequence() { return next_sequence_; } + class QUICHE_EXPORT RemoteTrackVisitor : public moqt::RemoteTrack::Visitor { public: RemoteTrackVisitor(ChatClient* client) : client_(client) {} @@ -125,6 +132,13 @@ return; } // TODO: Message is from another chat participant + std::string username = full_track_name.track_namespace; + username = username.substr(username.find_last_of('/') + 1); + if (!client_->other_users_.contains(username)) { + std::cout << "Username " << username << "doesn't exist\n"; + return; + } + std::cout << username << ": " << object << "\n"; } private: @@ -166,41 +180,6 @@ return true; } - class InputHandler : quic::QuicThread { - public: - explicit InputHandler(ChatClient* client) - : quic::QuicThread("InputThread"), client_(client) {} - - void Run() final { - while (client_->session_is_open_) { - std::string message_to_send; - std::cin >> message_to_send; // Waiting to start input - client_->entering_data_ = true; - std::cout << "> "; - std::cin >> message_to_send; - client_->entering_data_ = false; - while (!client_->incoming_messages_.empty()) { - std::cout << client_->incoming_messages_.front() << "\n"; - client_->incoming_messages_.pop_front(); - } - if (message_to_send.empty()) { - continue; - } - if (message_to_send == ":exit") { - std::cout << "Exiting the app.\n"; - // TODO: Close the session. - client_->session_is_open_ = false; - break; - } - // TODO: Send the message - std::cout << client_->username_ << ": " << message_to_send << "\n"; - } - } - - private: - ChatClient* client_; - }; - private: moqt::FullTrackName UsernameToTrackName(absl::string_view username) { return moqt::FullTrackName( @@ -303,9 +282,9 @@ }; // Basic session information - std::string chat_id_; - std::string username_; - moqt::FullTrackName my_track_name_; + const std::string chat_id_; + const std::string username_; + const moqt::FullTrackName my_track_name_; // General state variables std::unique_ptr<quic::QuicEventLoop> event_loop_; @@ -325,8 +304,7 @@ std::unique_ptr<RemoteTrackVisitor> remote_track_visitor_; // Handling incoming and outgoing messages - quiche::QuicheCircularDeque<std::string> incoming_messages_; - bool entering_data_ = false; + moqt::FullSequence next_sequence_ = {0, 0}; }; // A client for MoQT over chat, used for interop testing. See @@ -358,11 +336,36 @@ } if (client.session_is_open()) { std::cout << "Fully connected. Press ENTER to begin input of message, " - << "ENTER when done.\n"; + << "ENTER when done. Exit session if the message is ':exit'\n"; } - ChatClient::InputHandler input_thread(&client); - while (client.session_is_open()) { - client.RunEventLoop(); + bool session_is_open = client.session_is_open(); + struct pollfd poll_settings = { + 0, + POLLIN, + POLLIN, + }; + while (session_is_open) { + std::string message_to_send; + while (poll(&poll_settings, 1, 0) <= 0) { + client.RunEventLoop(); + } + std::cin.ignore(10, '\n'); + std::cout << username << ": "; + std::getline(std::cin, message_to_send); + if (message_to_send.empty()) { + continue; + } + if (message_to_send == ":exit") { + std::cout << "Exiting the app.\n"; + // TODO: Close the session. + session_is_open = false; + break; + } + client.session()->PublishObject( + client.my_track_name(), client.next_sequence().group++, + client.next_sequence().object, /*object_send_order=*/0, + moqt::MoqtForwardingPreference::kObject, message_to_send, + /*payload_length=*/std::nullopt, true); } return 0; }