blob: c3745b38c31af46b9a1c98cdb8d5c6f38549fc04 [file] [log] [blame]
// Copyright 2024 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/moqt/moqt_probe_manager.h"
#include <algorithm>
#include <memory>
#include <optional>
#include <utility>
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_stream.h"
#include "quiche/common/wire_serialization.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
namespace {
constexpr quic::QuicByteCount kWriteChunkSize = 4096;
constexpr char kZeroes[kWriteChunkSize] = {0};
} // namespace
std::optional<ProbeId> MoqtProbeManager::StartProbe(
quic::QuicByteCount probe_size, quic::QuicTimeDelta timeout,
Callback callback) {
if (probe_.has_value()) {
return std::nullopt;
}
ProbeId id = next_probe_id_++;
webtransport::Stream* stream = session_->OpenOutgoingUnidirectionalStream();
if (stream == nullptr) {
return std::nullopt;
}
probe_ = PendingProbe{
id, clock_->ApproximateNow(), clock_->ApproximateNow() + timeout,
probe_size, stream->GetStreamId(), std::move(callback)};
auto visitor_owned =
std::make_unique<ProbeStreamVisitor>(this, stream, id, probe_size);
ProbeStreamVisitor* visitor = visitor_owned.get();
stream->SetVisitor(std::move(visitor_owned));
stream->SetPriority(webtransport::StreamPriority{
/*send_group_id=*/0, /*send_order=*/kMoqtProbeStreamSendOrder});
visitor->OnCanWrite();
RescheduleAlarm();
return id;
}
std::optional<ProbeId> MoqtProbeManager::StopProbe() {
if (!probe_.has_value()) {
return std::nullopt;
}
ProbeId id = probe_->id;
ClosePendingProbe(ProbeStatus::kAborted);
return id;
}
void MoqtProbeManager::ProbeStreamVisitor::OnCanWrite() {
if (!ValidateProbe() || !stream_->CanWrite()) {
return;
}
if (!header_sent_) {
absl::Status status = quiche::WriteIntoStream(
*stream_, *quiche::SerializeIntoString(
quiche::WireVarInt62(MoqtDataStreamType::kPadding)));
QUICHE_DCHECK(status.ok()) << status; // Should succeed if CanWrite().
header_sent_ = true;
}
while (stream_->CanWrite() && data_remaining_ > 0) {
quic::QuicByteCount chunk_size = std::min(kWriteChunkSize, data_remaining_);
absl::string_view chunk(kZeroes, chunk_size);
quiche::StreamWriteOptions options;
options.set_send_fin(chunk_size == data_remaining_);
absl::Status status = stream_->Writev(absl::MakeSpan(&chunk, 1), options);
QUICHE_DCHECK(status.ok()) << status; // Should succeed if CanWrite().
data_remaining_ -= chunk_size;
}
}
void MoqtProbeManager::ProbeStreamVisitor::OnStopSendingReceived(
webtransport::StreamErrorCode error) {
if (!ValidateProbe()) {
return;
}
manager_->ClosePendingProbe(ProbeStatus::kAborted);
}
void MoqtProbeManager::ProbeStreamVisitor::OnWriteSideInDataRecvdState() {
if (!ValidateProbe()) {
return;
}
manager_->ClosePendingProbe(ProbeStatus::kSuccess);
}
void MoqtProbeManager::RescheduleAlarm() {
quic::QuicTime deadline =
probe_.has_value() ? probe_->deadline : quic::QuicTime::Zero();
timeout_alarm_->Update(deadline, quic::QuicTimeDelta::Zero());
}
void MoqtProbeManager::OnAlarm() {
if (probe_.has_value()) {
ClosePendingProbe(ProbeStatus::kTimeout);
}
RescheduleAlarm();
}
void MoqtProbeManager::ClosePendingProbe(ProbeStatus status) {
std::optional<PendingProbe> probe = std::move(probe_);
if (!probe.has_value()) {
QUICHE_BUG(ClosePendingProbe_no_probe);
return;
}
if (status != ProbeStatus::kSuccess) {
webtransport::Stream* stream = session_->GetStreamById(probe->stream_id);
if (stream != nullptr) {
// TODO: figure out the error code.
stream->ResetWithUserCode(0);
}
}
quic::QuicTime now = clock_->ApproximateNow();
std::move(probe->callback)(
ProbeResult{probe->id, status, probe->probe_size, now - probe->start});
}
} // namespace moqt