Object handling in MoQT chat client.

Threaded model was fatally flawed; now it works correctly.

PiperOrigin-RevId: 608007963
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 11feb9e..40285d5 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -259,7 +259,7 @@
   return new_stream->GetStreamId();
 }
 
-bool MoqtSession::PublishObject(FullTrackName& full_track_name,
+bool MoqtSession::PublishObject(const FullTrackName& full_track_name,
                                 uint64_t group_id, uint64_t object_id,
                                 uint64_t object_send_order,
                                 MoqtForwardingPreference forwarding_preference,
@@ -330,8 +330,8 @@
       ++failures;
       continue;
     }
-    QUICHE_LOG(INFO) << ENDPOINT << "Sending object "
-                     << full_track_name.track_namespace << ":"
+    QUICHE_LOG(INFO) << ENDPOINT << "Sending object length " << payload.length()
+                     << " for " << full_track_name.track_namespace << ":"
                      << full_track_name.track_name << " with sequence "
                      << object.group_id << ":" << object.object_id
                      << " on stream " << *stream_id;
@@ -599,7 +599,7 @@
   }
   MoqtSubscribe& subscribe = it->second.message;
   QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for "
-                  << "subscribe_id = " << message.subscribe_id
+                  << "subscribe_id = " << message.subscribe_id << " "
                   << subscribe.track_namespace << ":" << subscribe.track_name;
   // Copy the Remote Track from session_->active_subscribes_ to
   // session_->remote_tracks_.
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 911849a..ee0720f 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -117,7 +117,7 @@
   // |payload|.
   // |payload.length() >= |payload_length|, because the application can deliver
   // partial objects.
-  bool PublishObject(FullTrackName& full_track_name, uint64_t group_id,
+  bool PublishObject(const FullTrackName& full_track_name, uint64_t group_id,
                      uint64_t object_id, uint64_t object_send_order,
                      MoqtForwardingPreference forwarding_preference,
                      absl::string_view payload,
diff --git a/quiche/quic/moqt/tools/chat_client_bin.cc b/quiche/quic/moqt/tools/chat_client_bin.cc
index c5cb070..cab2790 100644
--- a/quiche/quic/moqt/tools/chat_client_bin.cc
+++ b/quiche/quic/moqt/tools/chat_client_bin.cc
@@ -2,6 +2,9 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+#include <poll.h>
+#include <unistd.h>
+
 #include <cstdint>
 #include <iostream>
 #include <memory>
@@ -26,13 +29,11 @@
 #include "quiche/quic/moqt/tools/moqt_client.h"
 #include "quiche/quic/platform/api/quic_default_proof_providers.h"
 #include "quiche/quic/platform/api/quic_socket_address.h"
-#include "quiche/quic/platform/api/quic_thread.h"
 #include "quiche/quic/tools/fake_proof_verifier.h"
 #include "quiche/quic/tools/quic_name_lookup.h"
 #include "quiche/quic/tools/quic_url.h"
 #include "quiche/common/platform/api/quiche_command_line_flags.h"
 #include "quiche/common/platform/api/quiche_export.h"
-#include "quiche/common/quiche_circular_deque.h"
 
 DEFINE_QUICHE_COMMAND_LINE_FLAG(
     bool, disable_certificate_verification, false,
@@ -79,14 +80,20 @@
 
   bool session_is_open() const { return session_is_open_; }
   bool is_syncing() const {
-    return catalog_group_.has_value() || subscribes_to_make_ > 0 ||
+    return !catalog_group_.has_value() || subscribes_to_make_ > 0 ||
            !session_->HasSubscribers(my_track_name_);
   }
 
   void RunEventLoop() {
-    event_loop_->RunEventLoopOnce(quic::QuicTime::Delta::FromSeconds(5));
+    event_loop_->RunEventLoopOnce(quic::QuicTime::Delta::FromMilliseconds(500));
   }
 
+  const moqt::FullTrackName& my_track_name() { return my_track_name_; }
+
+  moqt::MoqtSession* session() { return session_; }
+
+  moqt::FullSequence& next_sequence() { return next_sequence_; }
+
   class QUICHE_EXPORT RemoteTrackVisitor : public moqt::RemoteTrack::Visitor {
    public:
     RemoteTrackVisitor(ChatClient* client) : client_(client) {}
@@ -125,6 +132,13 @@
         return;
       }
       // TODO: Message is from another chat participant
+      std::string username = full_track_name.track_namespace;
+      username = username.substr(username.find_last_of('/') + 1);
+      if (!client_->other_users_.contains(username)) {
+        std::cout << "Username " << username << "doesn't exist\n";
+        return;
+      }
+      std::cout << username << ": " << object << "\n";
     }
 
    private:
@@ -166,41 +180,6 @@
     return true;
   }
 
-  class InputHandler : quic::QuicThread {
-   public:
-    explicit InputHandler(ChatClient* client)
-        : quic::QuicThread("InputThread"), client_(client) {}
-
-    void Run() final {
-      while (client_->session_is_open_) {
-        std::string message_to_send;
-        std::cin >> message_to_send;  // Waiting to start input
-        client_->entering_data_ = true;
-        std::cout << "> ";
-        std::cin >> message_to_send;
-        client_->entering_data_ = false;
-        while (!client_->incoming_messages_.empty()) {
-          std::cout << client_->incoming_messages_.front() << "\n";
-          client_->incoming_messages_.pop_front();
-        }
-        if (message_to_send.empty()) {
-          continue;
-        }
-        if (message_to_send == ":exit") {
-          std::cout << "Exiting the app.\n";
-          // TODO: Close the session.
-          client_->session_is_open_ = false;
-          break;
-        }
-        // TODO: Send the message
-        std::cout << client_->username_ << ": " << message_to_send << "\n";
-      }
-    }
-
-   private:
-    ChatClient* client_;
-  };
-
  private:
   moqt::FullTrackName UsernameToTrackName(absl::string_view username) {
     return moqt::FullTrackName(
@@ -303,9 +282,9 @@
   };
 
   // Basic session information
-  std::string chat_id_;
-  std::string username_;
-  moqt::FullTrackName my_track_name_;
+  const std::string chat_id_;
+  const std::string username_;
+  const moqt::FullTrackName my_track_name_;
 
   // General state variables
   std::unique_ptr<quic::QuicEventLoop> event_loop_;
@@ -325,8 +304,7 @@
   std::unique_ptr<RemoteTrackVisitor> remote_track_visitor_;
 
   // Handling incoming and outgoing messages
-  quiche::QuicheCircularDeque<std::string> incoming_messages_;
-  bool entering_data_ = false;
+  moqt::FullSequence next_sequence_ = {0, 0};
 };
 
 // A client for MoQT over chat, used for interop testing. See
@@ -358,11 +336,36 @@
   }
   if (client.session_is_open()) {
     std::cout << "Fully connected. Press ENTER to begin input of message, "
-              << "ENTER when done.\n";
+              << "ENTER when done. Exit session if the message is ':exit'\n";
   }
-  ChatClient::InputHandler input_thread(&client);
-  while (client.session_is_open()) {
-    client.RunEventLoop();
+  bool session_is_open = client.session_is_open();
+  struct pollfd poll_settings = {
+      0,
+      POLLIN,
+      POLLIN,
+  };
+  while (session_is_open) {
+    std::string message_to_send;
+    while (poll(&poll_settings, 1, 0) <= 0) {
+      client.RunEventLoop();
+    }
+    std::cin.ignore(10, '\n');
+    std::cout << username << ": ";
+    std::getline(std::cin, message_to_send);
+    if (message_to_send.empty()) {
+      continue;
+    }
+    if (message_to_send == ":exit") {
+      std::cout << "Exiting the app.\n";
+      // TODO: Close the session.
+      session_is_open = false;
+      break;
+    }
+    client.session()->PublishObject(
+        client.my_track_name(), client.next_sequence().group++,
+        client.next_sequence().object, /*object_send_order=*/0,
+        moqt::MoqtForwardingPreference::kObject, message_to_send,
+        /*payload_length=*/std::nullopt, true);
   }
   return 0;
 }