Record MOQT-specific stream annotations into QUIC traces if available.
PiperOrigin-RevId: 817917687
diff --git a/build/source_list.bzl b/build/source_list.bzl
index daff02a..5180db2 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1573,6 +1573,7 @@
"quic/moqt/moqt_session_callbacks.h",
"quic/moqt/moqt_session_interface.h",
"quic/moqt/moqt_subscribe_windows.h",
+ "quic/moqt/moqt_trace_recorder.h",
"quic/moqt/moqt_track.h",
"quic/moqt/relay_namespace_tree.h",
"quic/moqt/session_namespace_tree.h",
@@ -1598,6 +1599,7 @@
"quic/moqt/moqt_relay_track_publisher.cc",
"quic/moqt/moqt_session.cc",
"quic/moqt/moqt_subscribe_windows.cc",
+ "quic/moqt/moqt_trace_recorder.cc",
"quic/moqt/moqt_track.cc",
"quic/moqt/tools/chat_client.cc",
"quic/moqt/tools/chat_server.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index 93b2900..a8fbd1c 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1577,6 +1577,7 @@
"src/quiche/quic/moqt/moqt_session_callbacks.h",
"src/quiche/quic/moqt/moqt_session_interface.h",
"src/quiche/quic/moqt/moqt_subscribe_windows.h",
+ "src/quiche/quic/moqt/moqt_trace_recorder.h",
"src/quiche/quic/moqt/moqt_track.h",
"src/quiche/quic/moqt/relay_namespace_tree.h",
"src/quiche/quic/moqt/session_namespace_tree.h",
@@ -1602,6 +1603,7 @@
"src/quiche/quic/moqt/moqt_relay_track_publisher.cc",
"src/quiche/quic/moqt/moqt_session.cc",
"src/quiche/quic/moqt/moqt_subscribe_windows.cc",
+ "src/quiche/quic/moqt/moqt_trace_recorder.cc",
"src/quiche/quic/moqt/moqt_track.cc",
"src/quiche/quic/moqt/tools/chat_client.cc",
"src/quiche/quic/moqt/tools/chat_server.cc",
diff --git a/build/source_list.json b/build/source_list.json
index c62f5e3..5614381 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1576,6 +1576,7 @@
"quiche/quic/moqt/moqt_session_callbacks.h",
"quiche/quic/moqt/moqt_session_interface.h",
"quiche/quic/moqt/moqt_subscribe_windows.h",
+ "quiche/quic/moqt/moqt_trace_recorder.h",
"quiche/quic/moqt/moqt_track.h",
"quiche/quic/moqt/relay_namespace_tree.h",
"quiche/quic/moqt/session_namespace_tree.h",
@@ -1601,6 +1602,7 @@
"quiche/quic/moqt/moqt_relay_track_publisher.cc",
"quiche/quic/moqt/moqt_session.cc",
"quiche/quic/moqt/moqt_subscribe_windows.cc",
+ "quiche/quic/moqt/moqt_trace_recorder.cc",
"quiche/quic/moqt/moqt_track.cc",
"quiche/quic/moqt/tools/chat_client.cc",
"quiche/quic/moqt/tools/chat_server.cc",
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index e5d3f8d..ac28942 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -31,6 +31,7 @@
#include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h"
#include "quiche/quic/test_tools/quic_test_utils.h"
#include "quiche/quic/test_tools/simulator/test_harness.h"
+#include "quic_trace/quic_trace.pb.h"
#include "quiche/common/platform/api/quiche_test.h"
namespace moqt::test {
@@ -60,6 +61,10 @@
server_->session()->callbacks() = server_callbacks_.AsSessionCallbacks();
server_->session()->callbacks().clock =
test_harness_.simulator().GetClock();
+
+ client_->RecordTrace();
+ client_->session()->trace_recorder().set_trace(
+ client_->trace_visitor()->trace());
}
void WireUpEndpoints() { test_harness_.WireUpEndpoints(); }
@@ -842,7 +847,8 @@
EstablishSession();
MoqtProbeManager probe_manager(client_->session()->session(),
test_harness_.simulator().GetClock(),
- *test_harness_.simulator().GetAlarmFactory());
+ *test_harness_.simulator().GetAlarmFactory(),
+ &client_->session()->trace_recorder());
constexpr quic::QuicBandwidth kModelBandwidth =
quic::simulator::TestHarness::kServerBandwidth;
@@ -850,14 +856,77 @@
constexpr quic::QuicTimeDelta kProbeTimeout =
kModelBandwidth.TransferTime(kProbeSize) * 10;
bool probe_done = false;
- probe_manager.StartProbe(kProbeSize, kProbeTimeout,
- [&probe_done](const ProbeResult& result) {
- probe_done = true;
- EXPECT_EQ(result.status, ProbeStatus::kSuccess);
- });
+ std::optional<ProbeId> probe_id = probe_manager.StartProbe(
+ kProbeSize, kProbeTimeout, [&probe_done](const ProbeResult& result) {
+ probe_done = true;
+ EXPECT_EQ(result.status, ProbeStatus::kSuccess);
+ });
bool success =
test_harness_.RunUntilWithDefaultTimeout([&]() { return probe_done; });
EXPECT_TRUE(success);
+
+ int probe_streams = 0;
+ for (const quic_trace::StreamAnnotation& annotation :
+ client_->trace_visitor()->trace()->stream_annotations()) {
+ if (annotation.has_moqt_probe_stream()) {
+ ++probe_streams;
+ EXPECT_EQ(probe_id, annotation.moqt_probe_stream().probe_id());
+ }
+ }
+ EXPECT_EQ(probe_streams, 1);
+}
+
+TEST_F(MoqtIntegrationTest, RecordTrace) {
+ EstablishSession();
+ MoqtKnownTrackPublisher publisher;
+ client_->session()->set_publisher(&publisher);
+
+ MockSubscribeRemoteTrackVisitor client_visitor;
+ auto queue = std::make_shared<MoqtOutgoingQueue>(
+ FullTrackName{"test", "subgroup"}, MoqtForwardingPreference::kSubgroup);
+ publisher.Add(queue);
+
+ server_->session()->SubscribeCurrentObject(FullTrackName("test", "subgroup"),
+ &client_visitor,
+ VersionSpecificParameters());
+ bool subscribed = false;
+ EXPECT_CALL(client_visitor, OnReply)
+ .WillOnce([&](const FullTrackName&,
+ std::variant<SubscribeOkData, MoqtRequestError>) {
+ subscribed = true;
+ });
+ bool success =
+ test_harness_.RunUntilWithDefaultTimeout([&]() { return subscribed; });
+ EXPECT_TRUE(success);
+
+ queue->AddObject(MemSliceFromString("object"), /*key=*/true);
+ int received = 0;
+ EXPECT_CALL(client_visitor,
+ OnObjectFragment(_,
+ MetadataLocationAndStatus(
+ Location{0, 0}, MoqtObjectStatus::kNormal),
+ "object", true))
+ .WillOnce([&] { ++received; });
+
+ success =
+ test_harness_.RunUntilWithDefaultTimeout([&]() { return received >= 1; });
+ EXPECT_TRUE(success);
+
+ int control_streams = 0;
+ int subgroup_streams = 0;
+ for (const quic_trace::StreamAnnotation& annotation :
+ client_->trace_visitor()->trace()->stream_annotations()) {
+ if (annotation.moqt_control_stream()) {
+ ++control_streams;
+ }
+ if (annotation.has_moqt_subgroup_stream()) {
+ ++subgroup_streams;
+ EXPECT_EQ(annotation.moqt_subgroup_stream().group_id(), 0);
+ EXPECT_EQ(annotation.moqt_subgroup_stream().subgroup_id(), 0);
+ }
+ }
+ EXPECT_EQ(control_streams, 1);
+ EXPECT_EQ(subgroup_streams, 1);
}
} // namespace
diff --git a/quiche/quic/moqt/moqt_probe_manager.cc b/quiche/quic/moqt/moqt_probe_manager.cc
index 2ad86df..3ff5bcc 100644
--- a/quiche/quic/moqt/moqt_probe_manager.cc
+++ b/quiche/quic/moqt/moqt_probe_manager.cc
@@ -42,6 +42,7 @@
if (stream == nullptr) {
return std::nullopt;
}
+ trace_recorder_->RecordProbeStreamCreated(stream->GetStreamId(), id);
probe_ = PendingProbe{
id, clock_->ApproximateNow(), clock_->ApproximateNow() + timeout,
diff --git a/quiche/quic/moqt/moqt_probe_manager.h b/quiche/quic/moqt/moqt_probe_manager.h
index 0458a92..f2d8639 100644
--- a/quiche/quic/moqt/moqt_probe_manager.h
+++ b/quiche/quic/moqt/moqt_probe_manager.h
@@ -14,6 +14,7 @@
#include "quiche/quic/core/quic_clock.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_trace_recorder.h"
#include "quiche/common/quiche_callbacks.h"
#include "quiche/web_transport/web_transport.h"
@@ -68,9 +69,11 @@
public:
explicit MoqtProbeManager(webtransport::Session* session,
const quic::QuicClock* clock,
- quic::QuicAlarmFactory& alarm_factory)
+ quic::QuicAlarmFactory& alarm_factory,
+ MoqtTraceRecorder* trace_recorder)
: session_(session),
clock_(clock),
+ trace_recorder_(trace_recorder),
timeout_alarm_(alarm_factory.CreateAlarm(new AlarmDelegate(this))) {}
// MoqtProbeManagerInterface implementation.
@@ -140,6 +143,7 @@
std::optional<PendingProbe> probe_;
webtransport::Session* session_;
const quic::QuicClock* clock_;
+ MoqtTraceRecorder* trace_recorder_;
std::unique_ptr<quic::QuicAlarm> timeout_alarm_;
ProbeId next_probe_id_ = 0;
};
diff --git a/quiche/quic/moqt/moqt_probe_manager_test.cc b/quiche/quic/moqt/moqt_probe_manager_test.cc
index 9e41f9b..d267886 100644
--- a/quiche/quic/moqt/moqt_probe_manager_test.cc
+++ b/quiche/quic/moqt/moqt_probe_manager_test.cc
@@ -15,6 +15,7 @@
#include "absl/types/span.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_trace_recorder.h"
#include "quiche/quic/test_tools/mock_clock.h"
#include "quiche/quic/test_tools/quic_test_utils.h"
#include "quiche/common/platform/api/quiche_logging.h"
@@ -74,10 +75,12 @@
class MoqtProbeManagerTest : public quiche::test::QuicheTest {
protected:
- MoqtProbeManagerTest() : manager_(&session_, &clock_, alarm_factory_) {}
+ MoqtProbeManagerTest()
+ : manager_(&session_, &clock_, alarm_factory_, &trace_recorder_) {}
webtransport::test::MockSession session_;
quic::MockClock clock_;
+ MoqtTraceRecorder trace_recorder_;
quic::test::MockAlarmFactory alarm_factory_;
MoqtProbeManager manager_;
};
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index f9b47a2..64ba595 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -936,6 +936,7 @@
stream_->SetPriority(
webtransport::StreamPriority{/*send_group_id=*/kMoqtSendGroupId,
/*send_order=*/kMoqtControlStreamSendOrder});
+ session->trace_recorder_.RecordControlStreamCreated(stream->GetStreamId());
}
void MoqtSession::ControlStream::OnCanRead() {
@@ -2320,6 +2321,10 @@
next_object_(parameters.first_object),
session_liveness_(session->liveness_token_) {
UpdateSendOrder(subscription);
+ if (subscription.track_alias().has_value()) {
+ session->trace_recorder_.RecordSubgroupStreamCreated(
+ stream->GetStreamId(), *subscription.track_alias(), parameters.index);
+ }
}
MoqtSession::OutgoingDataStream::~OutgoingDataStream() {
@@ -2574,4 +2579,13 @@
delivery_timeout_alarm_->Set(deadline);
}
+MoqtSession::PublishedFetch::FetchStreamVisitor::FetchStreamVisitor(
+ std::shared_ptr<PublishedFetch> fetch, webtransport::Stream* stream)
+ : fetch_(fetch), stream_(stream) {
+ fetch->fetch_task()->SetObjectAvailableCallback(
+ [this]() { this->OnCanWrite(); });
+ fetch->session()->trace_recorder_.RecordFetchStreamCreated(
+ stream->GetStreamId());
+}
+
} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index f3e812d..14c17ff 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -31,6 +31,7 @@
#include "quiche/quic/moqt/moqt_session_callbacks.h"
#include "quiche/quic/moqt/moqt_session_interface.h"
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/quic/moqt/moqt_trace_recorder.h"
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/quic/moqt/session_namespace_tree.h"
#include "quiche/common/platform/api/quiche_export.h"
@@ -197,6 +198,8 @@
void UseAlternateDeliveryTimeout() { alternate_delivery_timeout_ = true; }
+ MoqtTraceRecorder& trace_recorder() { return trace_recorder_; }
+
private:
friend class test::MoqtSessionPeer;
@@ -582,11 +585,7 @@
class FetchStreamVisitor : public webtransport::StreamVisitor {
public:
FetchStreamVisitor(std::shared_ptr<PublishedFetch> fetch,
- webtransport::Stream* stream)
- : fetch_(fetch), stream_(stream) {
- fetch->fetch_task()->SetObjectAvailableCallback(
- [this]() { this->OnCanWrite(); });
- }
+ webtransport::Stream* stream);
~FetchStreamVisitor() {
std::shared_ptr<PublishedFetch> fetch = fetch_.lock();
if (fetch != nullptr) {
@@ -877,6 +876,8 @@
// the first object of group n+1 arrives.
bool alternate_delivery_timeout_ = false;
+ MoqtTraceRecorder trace_recorder_;
+
quiche::QuicheWeakPtrFactory<MoqtSessionInterface> weak_ptr_factory_;
// Must be last. Token used to make sure that the streams do not call into
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 1f32f9d..9005ed1 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -265,7 +265,7 @@
visitor = std::move(new_visitor);
});
EXPECT_CALL(mock_stream_, GetStreamId())
- .WillOnce(Return(webtransport::StreamId(4)));
+ .WillRepeatedly(Return(webtransport::StreamId(4)));
EXPECT_CALL(mock_session_, GetStreamById(4)).WillOnce(Return(&mock_stream_));
EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] { return visitor.get(); });
EXPECT_CALL(mock_stream_,
@@ -1919,7 +1919,7 @@
visitor = std::move(new_visitor);
});
EXPECT_CALL(mock_stream_, GetStreamId())
- .WillOnce(Return(webtransport::StreamId(4)));
+ .WillRepeatedly(Return(webtransport::StreamId(4)));
EXPECT_CALL(mock_session_, GetStreamById(4)).WillOnce(Return(&mock_stream_));
EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] { return visitor.get(); });
EXPECT_CALL(mock_stream_,
diff --git a/quiche/quic/moqt/moqt_trace_recorder.cc b/quiche/quic/moqt/moqt_trace_recorder.cc
new file mode 100644
index 0000000..549d6ac
--- /dev/null
+++ b/quiche/quic/moqt/moqt_trace_recorder.cc
@@ -0,0 +1,58 @@
+// Copyright 2025 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_trace_recorder.h"
+
+#include <cstdint>
+
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quic_trace/quic_trace.pb.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace moqt {
+
+void MoqtTraceRecorder::RecordControlStreamCreated(
+ webtransport::StreamId stream_id) {
+ if (trace_ == nullptr) {
+ return;
+ }
+ quic_trace::StreamAnnotation* annotation = trace_->add_stream_annotations();
+ annotation->set_stream_id(stream_id);
+ annotation->set_moqt_control_stream(true);
+}
+
+void MoqtTraceRecorder::RecordSubgroupStreamCreated(
+ webtransport::StreamId stream_id, uint64_t track_alias,
+ DataStreamIndex index) {
+ if (trace_ == nullptr) {
+ return;
+ }
+ quic_trace::StreamAnnotation* annotation = trace_->add_stream_annotations();
+ annotation->set_stream_id(stream_id);
+ annotation->mutable_moqt_subgroup_stream()->set_track_alias(track_alias);
+ annotation->mutable_moqt_subgroup_stream()->set_group_id(index.group);
+ annotation->mutable_moqt_subgroup_stream()->set_subgroup_id(index.subgroup);
+}
+
+void MoqtTraceRecorder::RecordFetchStreamCreated(
+ webtransport::StreamId stream_id) {
+ if (trace_ == nullptr) {
+ return;
+ }
+ quic_trace::StreamAnnotation* annotation = trace_->add_stream_annotations();
+ annotation->set_stream_id(stream_id);
+ annotation->mutable_moqt_fetch_stream();
+}
+
+void MoqtTraceRecorder::RecordProbeStreamCreated(
+ webtransport::StreamId stream_id, uint64_t probe_id) {
+ if (trace_ == nullptr) {
+ return;
+ }
+ quic_trace::StreamAnnotation* annotation = trace_->add_stream_annotations();
+ annotation->set_stream_id(stream_id);
+ annotation->mutable_moqt_probe_stream()->set_probe_id(probe_id);
+}
+
+} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_trace_recorder.h b/quiche/quic/moqt/moqt_trace_recorder.h
new file mode 100644
index 0000000..80c84bb
--- /dev/null
+++ b/quiche/quic/moqt/moqt_trace_recorder.h
@@ -0,0 +1,52 @@
+// Copyright 2025 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_MOQT_TRACE_RECORDER_H_
+#define QUICHE_QUIC_MOQT_MOQT_TRACE_RECORDER_H_
+
+#include <cstdint>
+
+#include "absl/base/nullability.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quic_trace/quic_trace.pb.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace moqt {
+
+// Records MOQT-specific information into the provided QUIC trace proto. The
+// wrapped trace can be nullptr, in which case no recording takes place.
+class MoqtTraceRecorder {
+ public:
+ MoqtTraceRecorder() : trace_(nullptr) {}
+ explicit MoqtTraceRecorder(quic_trace::Trace* absl_nullable trace)
+ : trace_(trace) {}
+
+ MoqtTraceRecorder(const MoqtTraceRecorder&) = delete;
+ MoqtTraceRecorder(MoqtTraceRecorder&&) = delete;
+ MoqtTraceRecorder& operator=(const MoqtTraceRecorder&) = delete;
+ MoqtTraceRecorder& operator=(MoqtTraceRecorder&&) = delete;
+
+ void set_trace(quic_trace::Trace* absl_nullable trace) { trace_ = trace; }
+
+ // Annotates the specified stream as the MOQT control stream.
+ void RecordControlStreamCreated(webtransport::StreamId stream_id);
+
+ // Annotates the specified stream as an MOQT subgroup data stream.
+ void RecordSubgroupStreamCreated(webtransport::StreamId stream_id,
+ uint64_t track_alias, DataStreamIndex index);
+
+ // Annotates the specified stream as an MOQT fetch data stream.
+ void RecordFetchStreamCreated(webtransport::StreamId stream_id);
+
+ // Annotates the specified stream as an MOQT probe stream.
+ void RecordProbeStreamCreated(webtransport::StreamId stream_id,
+ uint64_t probe_id);
+
+ private:
+ quic_trace::Trace* absl_nullable trace_;
+};
+
+} // namespace moqt
+
+#endif // QUICHE_QUIC_MOQT_MOQT_TRACE_RECORDER_H_
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
index 9bf70ec..059b59d 100644
--- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -39,6 +39,7 @@
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/moqt_session_interface.h"
+#include "quiche/quic/moqt/moqt_trace_recorder.h"
#include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h"
#include "quiche/quic/test_tools/simulator/actor.h"
#include "quiche/quic/test_tools/simulator/link.h"
@@ -415,6 +416,9 @@
timeout);
}
client_endpoint_.RecordTrace();
+ QUICHE_DCHECK(client_endpoint_.trace_visitor() != nullptr);
+ client_endpoint_.session()->trace_recorder().set_trace(
+ client_endpoint_.trace_visitor()->trace());
}
MoqtSession* client_session() { return client_endpoint_.session(); }
diff --git a/quiche/quic/test_tools/simulator/quic_endpoint_base.h b/quiche/quic/test_tools/simulator/quic_endpoint_base.h
index 336b2f9..797bfd6 100644
--- a/quiche/quic/test_tools/simulator/quic_endpoint_base.h
+++ b/quiche/quic/test_tools/simulator/quic_endpoint_base.h
@@ -7,6 +7,7 @@
#include <memory>
+#include "absl/base/nullability.h"
#include "absl/container/flat_hash_map.h"
#include "quiche/quic/core/crypto/null_decrypter.h"
#include "quiche/quic/core/crypto/null_encrypter.h"
@@ -69,6 +70,11 @@
// Queue::ListenerInterface method.
void OnPacketDequeued() override;
+ // Returns the trace visitor associated with the connection.
+ QuicTraceVisitor* absl_nullable trace_visitor() {
+ return trace_visitor_.get();
+ }
+
protected:
// A Writer object that writes into the |nic_tx_queue_|.
class Writer : public QuicPacketWriter {