Add MoqtProbeManager This is an auxiliary class responsible for sending and timing out probes. PiperOrigin-RevId: 688186757
diff --git a/build/source_list.bzl b/build/source_list.bzl index 5aeac68..4f150d5 100644 --- a/build/source_list.bzl +++ b/build/source_list.bzl
@@ -1514,6 +1514,7 @@ "quic/moqt/moqt_outgoing_queue.h", "quic/moqt/moqt_parser.h", "quic/moqt/moqt_priority.h", + "quic/moqt/moqt_probe_manager.h", "quic/moqt/moqt_publisher.h", "quic/moqt/moqt_session.h", "quic/moqt/moqt_subscribe_windows.h", @@ -1545,6 +1546,8 @@ "quic/moqt/moqt_parser_test.cc", "quic/moqt/moqt_priority.cc", "quic/moqt/moqt_priority_test.cc", + "quic/moqt/moqt_probe_manager.cc", + "quic/moqt/moqt_probe_manager_test.cc", "quic/moqt/moqt_session.cc", "quic/moqt/moqt_session_test.cc", "quic/moqt/moqt_subscribe_windows.cc",
diff --git a/build/source_list.gni b/build/source_list.gni index ce85b0e..977a2eb 100644 --- a/build/source_list.gni +++ b/build/source_list.gni
@@ -1518,6 +1518,7 @@ "src/quiche/quic/moqt/moqt_outgoing_queue.h", "src/quiche/quic/moqt/moqt_parser.h", "src/quiche/quic/moqt/moqt_priority.h", + "src/quiche/quic/moqt/moqt_probe_manager.h", "src/quiche/quic/moqt/moqt_publisher.h", "src/quiche/quic/moqt/moqt_session.h", "src/quiche/quic/moqt/moqt_subscribe_windows.h", @@ -1549,6 +1550,8 @@ "src/quiche/quic/moqt/moqt_parser_test.cc", "src/quiche/quic/moqt/moqt_priority.cc", "src/quiche/quic/moqt/moqt_priority_test.cc", + "src/quiche/quic/moqt/moqt_probe_manager.cc", + "src/quiche/quic/moqt/moqt_probe_manager_test.cc", "src/quiche/quic/moqt/moqt_session.cc", "src/quiche/quic/moqt/moqt_session_test.cc", "src/quiche/quic/moqt/moqt_subscribe_windows.cc",
diff --git a/build/source_list.json b/build/source_list.json index c10a0a9..0ebf8f2 100644 --- a/build/source_list.json +++ b/build/source_list.json
@@ -1517,6 +1517,7 @@ "quiche/quic/moqt/moqt_outgoing_queue.h", "quiche/quic/moqt/moqt_parser.h", "quiche/quic/moqt/moqt_priority.h", + "quiche/quic/moqt/moqt_probe_manager.h", "quiche/quic/moqt/moqt_publisher.h", "quiche/quic/moqt/moqt_session.h", "quiche/quic/moqt/moqt_subscribe_windows.h", @@ -1548,6 +1549,8 @@ "quiche/quic/moqt/moqt_parser_test.cc", "quiche/quic/moqt/moqt_priority.cc", "quiche/quic/moqt/moqt_priority_test.cc", + "quiche/quic/moqt/moqt_probe_manager.cc", + "quiche/quic/moqt/moqt_probe_manager_test.cc", "quiche/quic/moqt/moqt_session.cc", "quiche/quic/moqt/moqt_session_test.cc", "quiche/quic/moqt/moqt_subscribe_windows.cc",
diff --git a/quiche/quic/moqt/moqt_priority.cc b/quiche/quic/moqt/moqt_priority.cc index 22a8f7a..b55546e 100644 --- a/quiche/quic/moqt/moqt_priority.cc +++ b/quiche/quic/moqt/moqt_priority.cc
@@ -75,5 +75,7 @@ const webtransport::SendOrder kMoqtControlStreamSendOrder = std::numeric_limits<webtransport::SendOrder>::max(); +const webtransport::SendOrder kMoqtProbeStreamSendOrder = + std::numeric_limits<webtransport::SendOrder>::min(); } // namespace moqt
diff --git a/quiche/quic/moqt/moqt_priority.h b/quiche/quic/moqt/moqt_priority.h index 15f8a55..a16d7e0 100644 --- a/quiche/quic/moqt/moqt_priority.h +++ b/quiche/quic/moqt/moqt_priority.h
@@ -39,6 +39,9 @@ // WebTransport send order set on the MoQT control stream. QUICHE_EXPORT extern const webtransport::SendOrder kMoqtControlStreamSendOrder; +// WebTransport send order set on MoQT bandwidth probe streams. +QUICHE_EXPORT extern const webtransport::SendOrder kMoqtProbeStreamSendOrder; + } // namespace moqt #endif // QUICHE_QUIC_MOQT_MOQT_PRIORITY_H_
diff --git a/quiche/quic/moqt/moqt_probe_manager.cc b/quiche/quic/moqt/moqt_probe_manager.cc new file mode 100644 index 0000000..c3745b3 --- /dev/null +++ b/quiche/quic/moqt/moqt_probe_manager.cc
@@ -0,0 +1,137 @@ +// Copyright 2024 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "quiche/quic/moqt/moqt_probe_manager.h" + +#include <algorithm> +#include <memory> +#include <optional> +#include <utility> + +#include "absl/status/status.h" +#include "absl/strings/string_view.h" +#include "absl/types/span.h" +#include "quiche/quic/core/quic_time.h" +#include "quiche/quic/core/quic_types.h" +#include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_priority.h" +#include "quiche/common/platform/api/quiche_bug_tracker.h" +#include "quiche/common/platform/api/quiche_logging.h" +#include "quiche/common/quiche_stream.h" +#include "quiche/common/wire_serialization.h" +#include "quiche/web_transport/web_transport.h" + +namespace moqt { + +namespace { +constexpr quic::QuicByteCount kWriteChunkSize = 4096; +constexpr char kZeroes[kWriteChunkSize] = {0}; +} // namespace + +std::optional<ProbeId> MoqtProbeManager::StartProbe( + quic::QuicByteCount probe_size, quic::QuicTimeDelta timeout, + Callback callback) { + if (probe_.has_value()) { + return std::nullopt; + } + + ProbeId id = next_probe_id_++; + webtransport::Stream* stream = session_->OpenOutgoingUnidirectionalStream(); + if (stream == nullptr) { + return std::nullopt; + } + + probe_ = PendingProbe{ + id, clock_->ApproximateNow(), clock_->ApproximateNow() + timeout, + probe_size, stream->GetStreamId(), std::move(callback)}; + auto visitor_owned = + std::make_unique<ProbeStreamVisitor>(this, stream, id, probe_size); + ProbeStreamVisitor* visitor = visitor_owned.get(); + stream->SetVisitor(std::move(visitor_owned)); + stream->SetPriority(webtransport::StreamPriority{ + /*send_group_id=*/0, /*send_order=*/kMoqtProbeStreamSendOrder}); + visitor->OnCanWrite(); + RescheduleAlarm(); + return id; +} + +std::optional<ProbeId> MoqtProbeManager::StopProbe() { + if (!probe_.has_value()) { + return std::nullopt; + } + ProbeId id = probe_->id; + ClosePendingProbe(ProbeStatus::kAborted); + return id; +} + +void MoqtProbeManager::ProbeStreamVisitor::OnCanWrite() { + if (!ValidateProbe() || !stream_->CanWrite()) { + return; + } + + if (!header_sent_) { + absl::Status status = quiche::WriteIntoStream( + *stream_, *quiche::SerializeIntoString( + quiche::WireVarInt62(MoqtDataStreamType::kPadding))); + QUICHE_DCHECK(status.ok()) << status; // Should succeed if CanWrite(). + header_sent_ = true; + } + + while (stream_->CanWrite() && data_remaining_ > 0) { + quic::QuicByteCount chunk_size = std::min(kWriteChunkSize, data_remaining_); + absl::string_view chunk(kZeroes, chunk_size); + quiche::StreamWriteOptions options; + options.set_send_fin(chunk_size == data_remaining_); + absl::Status status = stream_->Writev(absl::MakeSpan(&chunk, 1), options); + QUICHE_DCHECK(status.ok()) << status; // Should succeed if CanWrite(). + data_remaining_ -= chunk_size; + } +} + +void MoqtProbeManager::ProbeStreamVisitor::OnStopSendingReceived( + webtransport::StreamErrorCode error) { + if (!ValidateProbe()) { + return; + } + manager_->ClosePendingProbe(ProbeStatus::kAborted); +} + +void MoqtProbeManager::ProbeStreamVisitor::OnWriteSideInDataRecvdState() { + if (!ValidateProbe()) { + return; + } + manager_->ClosePendingProbe(ProbeStatus::kSuccess); +} + +void MoqtProbeManager::RescheduleAlarm() { + quic::QuicTime deadline = + probe_.has_value() ? probe_->deadline : quic::QuicTime::Zero(); + timeout_alarm_->Update(deadline, quic::QuicTimeDelta::Zero()); +} + +void MoqtProbeManager::OnAlarm() { + if (probe_.has_value()) { + ClosePendingProbe(ProbeStatus::kTimeout); + } + RescheduleAlarm(); +} + +void MoqtProbeManager::ClosePendingProbe(ProbeStatus status) { + std::optional<PendingProbe> probe = std::move(probe_); + if (!probe.has_value()) { + QUICHE_BUG(ClosePendingProbe_no_probe); + return; + } + if (status != ProbeStatus::kSuccess) { + webtransport::Stream* stream = session_->GetStreamById(probe->stream_id); + if (stream != nullptr) { + // TODO: figure out the error code. + stream->ResetWithUserCode(0); + } + } + quic::QuicTime now = clock_->ApproximateNow(); + std::move(probe->callback)( + ProbeResult{probe->id, status, probe->probe_size, now - probe->start}); +} +} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_probe_manager.h b/quiche/quic/moqt/moqt_probe_manager.h new file mode 100644 index 0000000..0458a92 --- /dev/null +++ b/quiche/quic/moqt/moqt_probe_manager.h
@@ -0,0 +1,149 @@ +// Copyright 2024 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_MOQT_MOQT_PROBE_MANAGER_H_ +#define QUICHE_QUIC_MOQT_MOQT_PROBE_MANAGER_H_ + +#include <cstdint> +#include <memory> +#include <optional> + +#include "quiche/quic/core/quic_alarm.h" +#include "quiche/quic/core/quic_alarm_factory.h" +#include "quiche/quic/core/quic_clock.h" +#include "quiche/quic/core/quic_time.h" +#include "quiche/quic/core/quic_types.h" +#include "quiche/common/quiche_callbacks.h" +#include "quiche/web_transport/web_transport.h" + +namespace moqt { + +// ID of a probe. +using ProbeId = uint64_t; + +// Potential outcomes of a proge. +enum class ProbeStatus : uint8_t { + // Probe has finished successfully. + kSuccess, + // Probe has timed out. + kTimeout, + // Probe has been aborted, via a STOP_SENDING or for some other reason. + kAborted, +}; + +// Represents the results of a probe. +struct ProbeResult { + ProbeId id; + ProbeStatus status; + // The number of bytes requested on the probe. + quic::QuicByteCount probe_size; + // Time elapsed between the time the probe was requested and now. + quic::QuicTimeDelta time_elapsed; +}; + +// Interface used to mock out MoqtProbeManager. +class MoqtProbeManagerInterface { + public: + using Callback = quiche::SingleUseCallback<void(const ProbeResult& result)>; + + virtual ~MoqtProbeManagerInterface() = default; + + // Starts the probe. Returns the ID of the probe, or nullopt if the probe + // cannot be started. Will fail if a probe is already pending. + virtual std::optional<ProbeId> StartProbe(quic::QuicByteCount probe_size, + quic::QuicTimeDelta timeout, + Callback callback) = 0; + // Cancels the currently pending probe. + virtual std::optional<ProbeId> StopProbe() = 0; +}; + +namespace test { +class MoqtProbeManagerPeer; +} + +// MoqtProbeManager keeps track of the pending bandwidth probe, including +// ensuring there is only one probe pending, and handling the timeout. +class MoqtProbeManager : public MoqtProbeManagerInterface { + public: + explicit MoqtProbeManager(webtransport::Session* session, + const quic::QuicClock* clock, + quic::QuicAlarmFactory& alarm_factory) + : session_(session), + clock_(clock), + timeout_alarm_(alarm_factory.CreateAlarm(new AlarmDelegate(this))) {} + + // MoqtProbeManagerInterface implementation. + std::optional<ProbeId> StartProbe(quic::QuicByteCount probe_size, + quic::QuicTimeDelta timeout, + Callback callback) override; + std::optional<ProbeId> StopProbe() override; + + private: + friend class ::moqt::test::MoqtProbeManagerPeer; + + struct PendingProbe { + ProbeId id; + quic::QuicTime start; + quic::QuicTime deadline; + quic::QuicByteCount probe_size; + webtransport::StreamId stream_id; + Callback callback; + }; + + class ProbeStreamVisitor : public webtransport::StreamVisitor { + public: + ProbeStreamVisitor(MoqtProbeManager* manager, webtransport::Stream* stream, + ProbeId probe_id, quic::QuicByteCount probe_size) + : manager_(manager), + stream_(stream), + probe_id_(probe_id), + data_remaining_(probe_size) {} + + void OnCanRead() override {} + void OnCanWrite() override; + void OnResetStreamReceived(webtransport::StreamErrorCode error) override {} + void OnStopSendingReceived(webtransport::StreamErrorCode error) override; + void OnWriteSideInDataRecvdState() override; + + private: + // Ensures the stream is associated with the currently active probe. + bool ValidateProbe() { + if (!manager_->probe_.has_value() || manager_->probe_->id != probe_id_) { + // TODO: figure out the error code. + stream_->ResetWithUserCode(0); + return false; + } + return true; + } + + MoqtProbeManager* manager_; + webtransport::Stream* stream_; + ProbeId probe_id_; + bool header_sent_ = false; + quic::QuicByteCount data_remaining_; + }; + + class AlarmDelegate : public quic::QuicAlarm::DelegateWithoutContext { + public: + explicit AlarmDelegate(MoqtProbeManager* manager) : manager_(manager) {} + void OnAlarm() override { manager_->OnAlarm(); } + + private: + MoqtProbeManager* manager_; + }; + + void RescheduleAlarm(); + void OnAlarm(); + void ClosePendingProbe(ProbeStatus status); + + std::optional<PendingProbe> probe_; + webtransport::Session* session_; + const quic::QuicClock* clock_; + std::unique_ptr<quic::QuicAlarm> timeout_alarm_; + ProbeId next_probe_id_ = 0; +}; + +} // namespace moqt + +#endif // QUICHE_QUIC_MOQT_MOQT_PROBE_MANAGER_H_
diff --git a/quiche/quic/moqt/moqt_probe_manager_test.cc b/quiche/quic/moqt/moqt_probe_manager_test.cc new file mode 100644 index 0000000..a60cfa2 --- /dev/null +++ b/quiche/quic/moqt/moqt_probe_manager_test.cc
@@ -0,0 +1,234 @@ +// Copyright 2024 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "quiche/quic/moqt/moqt_probe_manager.h" + +#include <cstddef> +#include <memory> +#include <optional> +#include <string> +#include <utility> + +#include "absl/status/status.h" +#include "absl/strings/string_view.h" +#include "absl/types/span.h" +#include "quiche/quic/core/quic_time.h" +#include "quiche/quic/core/quic_types.h" +#include "quiche/quic/test_tools/mock_clock.h" +#include "quiche/quic/test_tools/quic_test_utils.h" +#include "quiche/common/platform/api/quiche_logging.h" +#include "quiche/common/platform/api/quiche_test.h" +#include "quiche/common/quiche_stream.h" +#include "quiche/web_transport/test_tools/mock_web_transport.h" +#include "quiche/web_transport/web_transport.h" + +namespace moqt::test { + +using TestAlarm = quic::test::MockAlarmFactory::TestAlarm; + +class MoqtProbeManagerPeer { + public: + static TestAlarm* GetAlarm(MoqtProbeManager& manager) { + return static_cast<TestAlarm*>(manager.timeout_alarm_.get()); + } +}; + +namespace { + +using ::testing::_; +using ::testing::Return; + +// Two-byte varint. +constexpr size_t kProbeStreamHeaderSize = 2; + +class MockStream : public webtransport::test::MockStream { + public: + MockStream(webtransport::StreamId id) : id_(id) {} + + webtransport::StreamId GetStreamId() const override { return id_; } + absl::Status Writev(absl::Span<const absl::string_view> data, + const quiche::StreamWriteOptions& options) override { + QUICHE_CHECK(!fin_) << "FIN written twice."; + for (absl::string_view chunk : data) { + data_.append(chunk); + } + fin_ = options.send_fin(); + return absl::OkStatus(); + } + void SetVisitor(std::unique_ptr<webtransport::StreamVisitor> visitor) { + visitor_ = std::move(visitor); + } + webtransport::StreamVisitor* visitor() override { return visitor_.get(); } + + absl::string_view data() const { return data_; } + bool fin() const { return fin_; } + + private: + webtransport::StreamId id_; + std::unique_ptr<webtransport::StreamVisitor> visitor_ = nullptr; + std::string data_; + bool fin_ = false; +}; + +class MoqtProbeManagerTest : public quiche::test::QuicheTest { + protected: + MoqtProbeManagerTest() : manager_(&session_, &clock_, alarm_factory_) {} + + webtransport::test::MockSession session_; + quic::MockClock clock_; + quic::test::MockAlarmFactory alarm_factory_; + MoqtProbeManager manager_; +}; + +TEST_F(MoqtProbeManagerTest, AddProbe) { + constexpr webtransport::StreamId kStreamId = 17; + constexpr quic::QuicByteCount kProbeSize = 8192 + 1; + constexpr quic::QuicTimeDelta kProbeDuration = + quic::QuicTimeDelta::FromMilliseconds(100); + + MockStream stream(kStreamId); + EXPECT_CALL(session_, OpenOutgoingUnidirectionalStream()) + .WillOnce(Return(&stream)); + EXPECT_CALL(stream, CanWrite()).WillRepeatedly(Return(true)); + std::optional<ProbeResult> result; + std::optional<ProbeId> probe_id = + manager_.StartProbe(kProbeSize, 3 * kProbeDuration, + [&](const ProbeResult& r) { result = r; }); + ASSERT_NE(probe_id, std::nullopt); + ASSERT_EQ(result, std::nullopt); + + EXPECT_TRUE(stream.fin()); + EXPECT_EQ(stream.data().size(), kProbeSize + kProbeStreamHeaderSize); + + clock_.AdvanceTime(kProbeDuration); + stream.visitor()->OnWriteSideInDataRecvdState(); + + ASSERT_NE(result, std::nullopt); + EXPECT_EQ(result->id, probe_id); + EXPECT_EQ(result->status, ProbeStatus::kSuccess); + EXPECT_EQ(result->probe_size, kProbeSize); + EXPECT_EQ(result->time_elapsed, kProbeDuration); +} + +TEST_F(MoqtProbeManagerTest, AddProbeWriteBlockedInTheMiddle) { + constexpr webtransport::StreamId kStreamId = 17; + constexpr quic::QuicByteCount kProbeSize = 8192 + 1; + constexpr quic::QuicTimeDelta kProbeDuration = + quic::QuicTimeDelta::FromMilliseconds(100); + + MockStream stream(kStreamId); + EXPECT_CALL(session_, OpenOutgoingUnidirectionalStream()) + .WillOnce(Return(&stream)); + EXPECT_CALL(stream, CanWrite()) + .WillOnce(Return(true)) + .WillOnce(Return(true)) + .WillOnce(Return(false)); + std::optional<ProbeId> probe_id = manager_.StartProbe( + kProbeSize, 3 * kProbeDuration, [&](const ProbeResult& r) {}); + ASSERT_NE(probe_id, std::nullopt); + + EXPECT_FALSE(stream.fin()); + EXPECT_LT(stream.data().size(), kProbeSize); + + EXPECT_CALL(stream, CanWrite()).WillRepeatedly(Return(true)); + stream.visitor()->OnCanWrite(); + EXPECT_TRUE(stream.fin()); + EXPECT_EQ(stream.data().size(), kProbeSize + kProbeStreamHeaderSize); +} + +TEST_F(MoqtProbeManagerTest, ProbeCancelledByPeer) { + constexpr webtransport::StreamId kStreamId = 17; + constexpr quic::QuicByteCount kProbeSize = 8192 + 1; + constexpr quic::QuicTimeDelta kProbeDuration = + quic::QuicTimeDelta::FromMilliseconds(100); + + MockStream stream(kStreamId); + EXPECT_CALL(session_, OpenOutgoingUnidirectionalStream()) + .WillOnce(Return(&stream)); + EXPECT_CALL(stream, CanWrite()).WillRepeatedly(Return(true)); + std::optional<ProbeResult> result; + std::optional<ProbeId> probe_id = + manager_.StartProbe(kProbeSize, 3 * kProbeDuration, + [&](const ProbeResult& r) { result = r; }); + ASSERT_NE(probe_id, std::nullopt); + ASSERT_EQ(result, std::nullopt); + + EXPECT_TRUE(stream.fin()); + EXPECT_EQ(stream.data().size(), kProbeSize + kProbeStreamHeaderSize); + + clock_.AdvanceTime(kProbeDuration * 0.5); + stream.visitor()->OnStopSendingReceived(/*error=*/0); + + ASSERT_NE(result, std::nullopt); + EXPECT_EQ(result->id, probe_id); + EXPECT_EQ(result->status, ProbeStatus::kAborted); + EXPECT_EQ(result->time_elapsed, kProbeDuration * 0.5); +} + +TEST_F(MoqtProbeManagerTest, ProbeCancelledByClient) { + constexpr webtransport::StreamId kStreamId = 17; + constexpr quic::QuicByteCount kProbeSize = 8192 + 1; + constexpr quic::QuicTimeDelta kProbeDuration = + quic::QuicTimeDelta::FromMilliseconds(100); + + MockStream stream(kStreamId); + EXPECT_CALL(session_, OpenOutgoingUnidirectionalStream()) + .WillOnce(Return(&stream)); + EXPECT_CALL(stream, CanWrite()).WillRepeatedly(Return(true)); + std::optional<ProbeResult> result; + std::optional<ProbeId> probe_id = + manager_.StartProbe(kProbeSize, 3 * kProbeDuration, + [&](const ProbeResult& r) { result = r; }); + ASSERT_NE(probe_id, std::nullopt); + ASSERT_EQ(result, std::nullopt); + + EXPECT_TRUE(stream.fin()); + EXPECT_EQ(stream.data().size(), kProbeSize + kProbeStreamHeaderSize); + + EXPECT_CALL(session_, GetStreamById(kStreamId)).WillOnce(Return(&stream)); + EXPECT_CALL(stream, ResetWithUserCode(_)); + clock_.AdvanceTime(kProbeDuration * 0.5); + manager_.StopProbe(); + ASSERT_NE(result, std::nullopt); + EXPECT_EQ(result->id, probe_id); + EXPECT_EQ(result->status, ProbeStatus::kAborted); + EXPECT_EQ(result->time_elapsed, kProbeDuration * 0.5); +} + +TEST_F(MoqtProbeManagerTest, Timeout) { + constexpr webtransport::StreamId kStreamId = 17; + constexpr quic::QuicByteCount kProbeSize = 8192 + 1; + constexpr quic::QuicTimeDelta kProbeDuration = + quic::QuicTimeDelta::FromMilliseconds(100); + const quic::QuicTimeDelta kTimeout = 0.5 * kProbeDuration; + + MockStream stream(kStreamId); + EXPECT_CALL(session_, OpenOutgoingUnidirectionalStream()) + .WillOnce(Return(&stream)); + EXPECT_CALL(stream, CanWrite()).WillRepeatedly(Return(true)); + std::optional<ProbeResult> result; + std::optional<ProbeId> probe_id = manager_.StartProbe( + kProbeSize, kTimeout, [&](const ProbeResult& r) { result = r; }); + ASSERT_NE(probe_id, std::nullopt); + ASSERT_EQ(result, std::nullopt); + + EXPECT_TRUE(stream.fin()); + EXPECT_EQ(stream.data().size(), kProbeSize + kProbeStreamHeaderSize); + + clock_.AdvanceTime(kTimeout); + TestAlarm* alarm = MoqtProbeManagerPeer::GetAlarm(manager_); + EXPECT_EQ(alarm->deadline(), clock_.Now()); + + EXPECT_CALL(session_, GetStreamById(kStreamId)).WillOnce(Return(&stream)); + EXPECT_CALL(stream, ResetWithUserCode(_)); + alarm->Fire(); + ASSERT_NE(result, std::nullopt); + EXPECT_EQ(result->id, probe_id); + EXPECT_EQ(result->status, ProbeStatus::kTimeout); + EXPECT_EQ(result->probe_size, kProbeSize); + EXPECT_EQ(result->time_elapsed, kTimeout); +} + +} // namespace +} // namespace moqt::test