blob: 31980d11b58f6454de89b4e04184ae0753f43b45 [file] [log] [blame]
// Copyright 2024 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file
#include "quiche/quic/moqt/moqt_track.h"
#include <memory>
#include <optional>
#include <utility>
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
bool RemoteTrack::CheckDataStreamType(MoqtDataStreamType type) {
if (data_stream_type_.has_value()) {
return data_stream_type_.value() == type;
}
data_stream_type_ = type;
return true;
}
UpstreamFetch::~UpstreamFetch() {
if (task_.IsValid()) {
// Notify the task (which the application owns) that nothing more is coming.
// If this has already been called, UpstreamFetchTask will ignore it.
task_.GetIfAvailable()->OnStreamAndFetchClosed(kResetCodeUnknown, "");
}
}
void UpstreamFetch::OnFetchResult(FullSequence largest_id, absl::Status status,
TaskDestroyedCallback callback) {
auto task = std::make_unique<UpstreamFetchTask>(largest_id, status,
std::move(callback));
task_ = task->weak_ptr();
std::move(ok_callback_)(std::move(task));
}
UpstreamFetch::UpstreamFetchTask::~UpstreamFetchTask() {
if (task_destroyed_callback_) {
std::move(task_destroyed_callback_)();
}
}
MoqtFetchTask::GetNextObjectResult
UpstreamFetch::UpstreamFetchTask::GetNextObject(PublishedObject& output) {
if (!next_object_.has_value()) {
if (!status_.ok()) {
return kError;
}
if (eof_) {
return kEof;
}
return kPending;
}
output = *std::move(next_object_);
next_object_.reset();
return kSuccess;
}
void UpstreamFetch::UpstreamFetchTask::NewObject(PublishedObject& object) {
QUICHE_DCHECK(!next_object_.has_value());
next_object_ = std::move(object);
if (object_available_callback_) {
object_available_callback_();
}
}
void UpstreamFetch::UpstreamFetchTask::OnStreamAndFetchClosed(
std::optional<webtransport::StreamErrorCode> error,
absl::string_view reason_phrase) {
if (eof_ || error.has_value()) {
return;
}
// Delete callbacks, because IncomingDataStream and UpstreamFetch are gone.
can_read_callback_ = nullptr;
task_destroyed_callback_ = nullptr;
if (!error.has_value()) { // This was a FIN.
eof_ = true;
} else {
status_ = MoqtStreamErrorToStatus(*error, reason_phrase);
}
if (object_available_callback_) {
object_available_callback_();
}
}
} // namespace moqt