blob: 0b97b82c3459f5d76ffde974af430eee414d8103 [file] [log] [blame]
// Copyright 2024 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef QUICHE_QUIC_MOQT_MOQT_FETCH_TASK_H_
#define QUICHE_QUIC_MOQT_MOQT_FETCH_TASK_H_
#include <cstdint>
#include <optional>
#include <string>
#include <utility>
#include <variant>
#include "absl/base/nullability.h"
#include "absl/status/status.h"
#include "quiche/quic/moqt/moqt_error.h"
#include "quiche/quic/moqt/moqt_key_value_pair.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_names.h"
#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/common/quiche_callbacks.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
// The callback we'll use for all request types going forward. Can only be used
// once; if the argument is nullopt, an OK response was received. Otherwise, an
// ERROR response was received.
using MoqtResponseCallback =
quiche::SingleUseCallback<void(std::optional<MoqtRequestErrorInfo>)>;
// TODO(martinduke): There are will be multiple instances of flow-controlled
// "pull" data retrieval tasks. It might be worthwhile to extract some common
// features into a base class.
using ObjectsAvailableCallback = quiche::MultiUseCallback<void()>;
// Potential results of a GetNextObject/GetNextMessage() call.
enum GetNextResult {
// The next object or message is available, and is placed into the reference
// specified by the caller.
kSuccess,
// The next object or message is not yet available (equivalent of EAGAIN).
kPending,
// The end of the response has been reached.
kEof,
// The request has failed; the error is available.
kError,
};
enum class TransactionType : uint8_t { kAdd, kDelete };
// A handle representing a fetch in progress. The fetch in question can be
// cancelled by deleting the object.
class MoqtFetchTask {
public:
// The request_id field will be ignored.
using FetchResponseCallback = quiche::SingleUseCallback<void(
std::variant<MoqtFetchOk, MoqtRequestError>)>;
virtual ~MoqtFetchTask() = default;
// TODO(martinduke): Replace with GetNextResult above.
// Potential results of a GetNextObject() call.
enum GetNextObjectResult {
// The next object is available, and is placed into the reference specified
// by the caller.
kSuccess,
// The next object is not yet available (equivalent of EAGAIN).
kPending,
// The end of fetch has been reached.
kEof,
// The fetch has failed; the error is available via GetStatus().
kError,
};
// Returns the next object received via the fetch, if available. MUST NOT
// return an object with status kObjectDoesNotExist.
virtual GetNextObjectResult GetNextObject(PublishedObject& output) = 0;
// Sets the callback that is called when GetNextObject() has previously
// returned kPending, but now a new object (or potentially an error or an
// end-of-fetch) is available. The application is responsible for calling
// GetNextObject() until it gets kPending; no further callback will occur
// until then.
// If an object is available immediately, the callback will be called
// immediately.
virtual void SetObjectAvailableCallback(
ObjectsAvailableCallback callback) = 0;
// One of these callbacks is called as soon as the data publisher has enough
// information for either FETCH_OK or FETCH_ERROR.
// If the appropriate response is already available, the callback will be
// called immediately.
virtual void SetFetchResponseCallback(FetchResponseCallback callback) = 0;
// Returns the error if fetch has completely failed, and OK otherwise.
virtual absl::Status GetStatus() = 0;
};
class MoqtNamespaceTask {
public:
virtual ~MoqtNamespaceTask() = default;
// The provided callback may be immediately invoked.
virtual void SetObjectsAvailableCallback(ObjectsAvailableCallback
absl_nullable callback) = 0;
// Returns the state of the message queue. If available, writes the suffix
// into |suffix|. If |type| is kAdd, it is from a NAMESPACE message. If |type|
// is kDelete, it is from a NAMESPACE_DONE message.
virtual GetNextResult GetNextSuffix(TrackNamespace& suffix,
TransactionType& type) = 0;
// Returns the error if request has completely failed, and nullopt otherwise.
virtual std::optional<webtransport::StreamErrorCode> GetStatus() = 0;
// Handle a REQUEST_UPDATE message.
virtual void Update(const MessageParameters& parameters,
MoqtResponseCallback response_callback) = 0;
// Returns the prefix for this task.
virtual const TrackNamespace& prefix() = 0;
};
// A fetch that starts out in the failed state.
class MoqtFailedFetch : public MoqtFetchTask {
public:
explicit MoqtFailedFetch(absl::Status status) : status_(std::move(status)) {}
GetNextObjectResult GetNextObject(PublishedObject&) override {
return kError;
}
absl::Status GetStatus() override { return status_; }
void SetObjectAvailableCallback(
ObjectsAvailableCallback /*callback*/) override {}
void SetFetchResponseCallback(FetchResponseCallback callback) override {
MoqtRequestError error{/*request_id=*/0, StatusToRequestErrorCode(status_),
std::nullopt, std::string(status_.message())};
std::move(callback)(error);
}
private:
absl::Status status_;
};
} // namespace moqt
#endif // QUICHE_QUIC_MOQT_MOQT_FETCH_TASK_H_