MoQT Upstream FETCH data streams.
Also, added track_alias() query to Parser API.
PiperOrigin-RevId: 710104119
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h
index 1b4de80..b05fdcc 100644
--- a/quiche/quic/moqt/moqt_parser.h
+++ b/quiche/quic/moqt/moqt_parser.h
@@ -202,6 +202,13 @@
// Returns the type of the unidirectional stream, if already known.
std::optional<MoqtDataStreamType> stream_type() const { return type_; }
+ // Returns the track alias, if already known.
+ std::optional<uint64_t> track_alias() const {
+ return (next_input_ == kStreamType || next_input_ == kTrackAlias)
+ ? std::optional<uint64_t>()
+ : metadata_.track_alias;
+ }
+
private:
friend class test::MoqtDataParserPeer;
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h
index 010db7f..0f378ed 100644
--- a/quiche/quic/moqt/moqt_publisher.h
+++ b/quiche/quic/moqt/moqt_publisher.h
@@ -77,7 +77,9 @@
// Sets the callback that is called when GetNextObject() has previously
// returned kPending, but now a new object (or potentially an error or an
- // end-of-fetch) is available.
+ // end-of-fetch) is available. The application is responsible for calling
+ // GetNextObject() until it gets kPending; no further callback will occur
+ // until then.
virtual void SetObjectAvailableCallback(
ObjectsAvailableCallback callback) = 0;
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 8496188..a7473b1 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -36,6 +36,7 @@
#include "quiche/quic/platform/api/quic_logging.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_stream.h"
#include "quiche/common/simple_buffer_allocator.h"
@@ -959,7 +960,6 @@
subscribe->visitor()->OnReply(track->full_track_name(), message.largest_id,
std::nullopt);
}
- subscribe->OnObjectOrOk();
}
void MoqtSession::ControlStream::OnSubscribeErrorMessage(
@@ -1385,19 +1385,114 @@
return;
}
track->OnObjectOrOk();
- SubscribeRemoteTrack* subscribe = static_cast<SubscribeRemoteTrack*>(track);
- if (subscribe->visitor() != nullptr) {
- subscribe->visitor()->OnObjectFragment(
- track->full_track_name(),
- FullSequence{message.group_id, message.subgroup_id.value_or(0),
- message.object_id},
- message.publisher_priority, message.object_status, payload,
- end_of_message);
+ if (!track->is_fetch()) {
+ SubscribeRemoteTrack* subscribe = static_cast<SubscribeRemoteTrack*>(track);
+ if (subscribe->visitor() != nullptr) {
+ subscribe->visitor()->OnObjectFragment(
+ track->full_track_name(),
+ FullSequence{message.group_id, message.subgroup_id.value_or(0),
+ message.object_id},
+ message.publisher_priority, message.object_status, payload,
+ end_of_message);
+ }
+ } else { // FETCH
+ UpstreamFetch* fetch = static_cast<UpstreamFetch*>(track);
+ UpstreamFetch::UpstreamFetchTask* task = fetch->task();
+ if (task == nullptr) {
+ // The application killed the FETCH.
+ stream_->SendStopSending(kResetCodeSubscriptionGone);
+ return;
+ }
+ if (!task->HasObject()) {
+ task->NewObject(message);
+ }
+ if (task->NeedsMorePayload() && !payload.empty()) {
+ task->AppendPayloadToObject(payload);
+ }
}
partial_object_.clear();
}
-void MoqtSession::IncomingDataStream::OnCanRead() { parser_.ReadAllData(); }
+MoqtSession::IncomingDataStream::~IncomingDataStream() {
+ if (parser_.track_alias().has_value() &&
+ parser_.stream_type() == MoqtDataStreamType::kStreamHeaderFetch) {
+ RemoteTrack* track = session_->RemoteTrackById(*parser_.track_alias());
+ if (track != nullptr && track->is_fetch()) {
+ session_->upstream_by_id_.erase(*parser_.track_alias());
+ }
+ }
+}
+
+void MoqtSession::IncomingDataStream::MaybeReadOneObject() {
+ if (!parser_.track_alias().has_value() ||
+ parser_.stream_type() != MoqtDataStreamType::kStreamHeaderFetch) {
+ QUICHE_BUG(quic_bug_read_one_object_parser_unexpected_state)
+ << "Requesting object, parser in unexpected state";
+ }
+ RemoteTrack* track = session_->RemoteTrackById(*parser_.track_alias());
+ if (track == nullptr || !track->is_fetch()) {
+ QUICHE_BUG(quic_bug_read_one_object_track_unexpected_state)
+ << "Requesting object, track in unexpected state";
+ return;
+ }
+ UpstreamFetch* fetch = static_cast<UpstreamFetch*>(track);
+ UpstreamFetch::UpstreamFetchTask* task = fetch->task();
+ if (task == nullptr) {
+ return;
+ }
+ if (task->HasObject() && !task->NeedsMorePayload()) {
+ return;
+ }
+ parser_.ReadAtMostOneObject();
+ // If it read an object, it called OnObjectMessage and may have altered the
+ // task's object state.
+ if (task->HasObject() && !task->NeedsMorePayload()) {
+ task->NotifyNewObject();
+ }
+}
+
+void MoqtSession::IncomingDataStream::OnCanRead() {
+ if (!parser_.stream_type().has_value()) {
+ parser_.ReadStreamType();
+ if (!parser_.stream_type().has_value()) {
+ return;
+ }
+ }
+ if (parser_.stream_type() != MoqtDataStreamType::kStreamHeaderFetch) {
+ parser_.ReadAllData();
+ return;
+ }
+ bool learned_track_alias = false;
+ if (!parser_.track_alias().has_value()) {
+ learned_track_alias = true;
+ parser_.ReadTrackAlias();
+ if (!parser_.track_alias().has_value()) {
+ return;
+ }
+ }
+ auto it = session_->upstream_by_id_.find(*parser_.track_alias());
+ if (it == session_->upstream_by_id_.end()) {
+ QUIC_DLOG(INFO) << ENDPOINT << "Received object for a track with no FETCH";
+ // This is a not a session error because there might be an UNSUBSCRIBE in
+ // flight.
+ stream_->SendStopSending(kResetCodeSubscriptionGone);
+ return;
+ }
+ if (it->second == nullptr) {
+ QUICHE_BUG(quiche_bug_moqt_fetch_pointer_is_null)
+ << "Fetch pointer is null";
+ return;
+ }
+ UpstreamFetch* fetch = static_cast<UpstreamFetch*>(it->second.get());
+ if (learned_track_alias) {
+ // If the task already exists (FETCH_OK has arrived), the callback will
+ // immediately execute to read the first object. Otherwise, it will only
+ // execute when the task is created or a cached object is read.
+ fetch->OnStreamOpened([this]() { MaybeReadOneObject(); });
+ return;
+ }
+ MaybeReadOneObject();
+}
void MoqtSession::IncomingDataStream::OnControlMessageReceived() {
session_->Error(MoqtError::kProtocolViolation,
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index adf4b7e..18c02e2 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -314,6 +314,7 @@
public:
IncomingDataStream(MoqtSession* session, webtransport::Stream* stream)
: session_(session), stream_(stream), parser_(stream, this) {}
+ ~IncomingDataStream();
// webtransport::StreamVisitor implementation.
void OnCanRead() override;
@@ -335,6 +336,8 @@
webtransport::Stream* stream() const { return stream_; }
+ void MaybeReadOneObject();
+
private:
friend class test::MoqtSessionPeer;
void OnControlMessageReceived();
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 172a152..ae2a3ca 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -8,6 +8,7 @@
#include <cstring>
#include <memory>
#include <optional>
+#include <queue>
#include <string>
#include <utility>
#include <vector>
@@ -19,17 +20,22 @@
#include "quiche/quic/core/quic_data_reader.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_framer.h"
#include "quiche/quic/moqt/moqt_known_track_publisher.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_parser.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/quic/moqt/test_tools/moqt_framer_utils.h"
#include "quiche/quic/moqt/test_tools/moqt_session_peer.h"
#include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
#include "quiche/quic/platform/api/quic_test.h"
#include "quiche/quic/test_tools/quic_test_utils.h"
+#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_stream.h"
+#include "quiche/common/simple_buffer_allocator.h"
+#include "quiche/web_transport/test_tools/in_memory_stream.h"
#include "quiche/web_transport/test_tools/mock_web_transport.h"
#include "quiche/web_transport/web_transport.h"
@@ -2256,6 +2262,218 @@
EXPECT_EQ(fetch_task->GetStatus().message(), "No username provided");
}
+// The application takes objects as they arrive.
+TEST_F(MoqtSessionTest, IncomingFetchObjectsGreedyApp) {
+ webtransport::test::MockStream mock_stream;
+ std::unique_ptr<MoqtControlParserVisitor> stream_input =
+ MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+ std::unique_ptr<MoqtFetchTask> fetch_task;
+ uint64_t expected_object_id = 0;
+ session_.Fetch(
+ FullTrackName("foo", "bar"),
+ [&](std::unique_ptr<MoqtFetchTask> task) {
+ fetch_task = std::move(task);
+ fetch_task->SetObjectAvailableCallback([&]() {
+ PublishedObject object;
+ MoqtFetchTask::GetNextObjectResult result;
+ do {
+ result = fetch_task->GetNextObject(object);
+ if (result == MoqtFetchTask::GetNextObjectResult::kSuccess) {
+ EXPECT_EQ(object.sequence.object, expected_object_id);
+ ++expected_object_id;
+ }
+ } while (result != MoqtFetchTask::GetNextObjectResult::kPending);
+ });
+ },
+ FullSequence(0, 0), 4, std::nullopt, 128, std::nullopt,
+ MoqtSubscribeParameters());
+ // Build queue of packets to arrive.
+ std::queue<quiche::QuicheBuffer> headers;
+ std::queue<std::string> payloads;
+ MoqtObject object = {
+ /*subscribe_id=*/0,
+ /*group_id, object_id=*/0,
+ 0,
+ /*publisher_priority=*/128,
+ /*status=*/MoqtObjectStatus::kNormal,
+ /*subgroup=*/0,
+ /*payload_length=*/3,
+ };
+ MoqtFramer framer_(quiche::SimpleBufferAllocator::Get(), true);
+ for (int i = 0; i < 4; ++i) {
+ object.object_id = i;
+ headers.push(framer_.SerializeObjectHeader(
+ object, MoqtDataStreamType::kStreamHeaderFetch, i == 0));
+ payloads.push("foo");
+ }
+
+ // Open stream, deliver two objects before FETCH_OK. Neither should be read.
+ webtransport::test::InMemoryStream data_stream(kIncomingUniStreamId);
+ data_stream.SetVisitor(
+ MoqtSessionPeer::CreateIncomingStreamVisitor(&session_, &data_stream));
+ for (int i = 0; i < 2; ++i) {
+ data_stream.Receive(headers.front().AsStringView(), false);
+ data_stream.Receive(payloads.front(), false);
+ headers.pop();
+ payloads.pop();
+ }
+ EXPECT_EQ(fetch_task, nullptr);
+ EXPECT_GT(data_stream.ReadableBytes(), 0);
+
+ // FETCH_OK arrives, objects are delivered.
+ MoqtFetchOk ok = {
+ /*subscribe_id=*/0,
+ /*group_order=*/MoqtDeliveryOrder::kAscending,
+ /*largest_id=*/FullSequence(3, 25),
+ MoqtSubscribeParameters(),
+ };
+ stream_input->OnFetchOkMessage(ok);
+ ASSERT_NE(fetch_task, nullptr);
+ EXPECT_EQ(expected_object_id, 2);
+
+ // Deliver the rest of the objects.
+ for (int i = 2; i < 4; ++i) {
+ data_stream.Receive(headers.front().AsStringView(), false);
+ data_stream.Receive(payloads.front(), false);
+ headers.pop();
+ payloads.pop();
+ }
+ EXPECT_EQ(expected_object_id, 4);
+}
+
+TEST_F(MoqtSessionTest, IncomingFetchObjectsSlowApp) {
+ webtransport::test::MockStream mock_stream;
+ std::unique_ptr<MoqtControlParserVisitor> stream_input =
+ MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+ std::unique_ptr<MoqtFetchTask> fetch_task;
+ uint64_t expected_object_id = 0;
+ bool objects_available = false;
+ session_.Fetch(
+ FullTrackName("foo", "bar"),
+ [&](std::unique_ptr<MoqtFetchTask> task) {
+ fetch_task = std::move(task);
+ fetch_task->SetObjectAvailableCallback(
+ [&]() { objects_available = true; });
+ },
+ FullSequence(0, 0), 4, std::nullopt, 128, std::nullopt,
+ MoqtSubscribeParameters());
+ // Build queue of packets to arrive.
+ std::queue<quiche::QuicheBuffer> headers;
+ std::queue<std::string> payloads;
+ MoqtObject object = {
+ /*subscribe_id=*/0,
+ /*group_id, object_id=*/0,
+ 0,
+ /*publisher_priority=*/128,
+ /*status=*/MoqtObjectStatus::kNormal,
+ /*subgroup=*/0,
+ /*payload_length=*/3,
+ };
+ MoqtFramer framer_(quiche::SimpleBufferAllocator::Get(), true);
+ for (int i = 0; i < 4; ++i) {
+ object.object_id = i;
+ headers.push(framer_.SerializeObjectHeader(
+ object, MoqtDataStreamType::kStreamHeaderFetch, i == 0));
+ payloads.push("foo");
+ }
+
+ // Open stream, deliver two objects before FETCH_OK. Neither should be read.
+ webtransport::test::InMemoryStream data_stream(kIncomingUniStreamId);
+ data_stream.SetVisitor(
+ MoqtSessionPeer::CreateIncomingStreamVisitor(&session_, &data_stream));
+ for (int i = 0; i < 2; ++i) {
+ data_stream.Receive(headers.front().AsStringView(), false);
+ data_stream.Receive(payloads.front(), false);
+ headers.pop();
+ payloads.pop();
+ }
+ EXPECT_EQ(fetch_task, nullptr);
+ EXPECT_GT(data_stream.ReadableBytes(), 0);
+
+ // FETCH_OK arrives, objects are available.
+ MoqtFetchOk ok = {
+ /*subscribe_id=*/0,
+ /*group_order=*/MoqtDeliveryOrder::kAscending,
+ /*largest_id=*/FullSequence(3, 25),
+ MoqtSubscribeParameters(),
+ };
+ stream_input->OnFetchOkMessage(ok);
+ ASSERT_NE(fetch_task, nullptr);
+ EXPECT_TRUE(objects_available);
+
+ // Get the objects
+ MoqtFetchTask::GetNextObjectResult result;
+ do {
+ PublishedObject new_object;
+ result = fetch_task->GetNextObject(new_object);
+ if (result == MoqtFetchTask::GetNextObjectResult::kSuccess) {
+ EXPECT_EQ(new_object.sequence.object, expected_object_id);
+ ++expected_object_id;
+ }
+ } while (result != MoqtFetchTask::GetNextObjectResult::kPending);
+ EXPECT_EQ(expected_object_id, 2);
+ objects_available = false;
+
+ // Deliver the rest of the objects.
+ for (int i = 2; i < 4; ++i) {
+ data_stream.Receive(headers.front().AsStringView(), false);
+ data_stream.Receive(payloads.front(), false);
+ headers.pop();
+ payloads.pop();
+ }
+ EXPECT_TRUE(objects_available);
+ EXPECT_EQ(expected_object_id, 2); // Not delivered yet.
+ // Get the objects
+ do {
+ PublishedObject new_object;
+ result = fetch_task->GetNextObject(new_object);
+ if (result == MoqtFetchTask::GetNextObjectResult::kSuccess) {
+ EXPECT_EQ(new_object.sequence.object, expected_object_id);
+ ++expected_object_id;
+ }
+ } while (result != MoqtFetchTask::GetNextObjectResult::kPending);
+ EXPECT_EQ(expected_object_id, 4);
+}
+
+TEST_F(MoqtSessionTest, PartialObjectFetch) {
+ MoqtSessionParameters parameters(quic::Perspective::IS_CLIENT);
+ parameters.deliver_partial_objects = true;
+ MoqtSession session(&mock_session_, parameters,
+ session_callbacks_.AsSessionCallbacks());
+ webtransport::test::InMemoryStream stream(kIncomingUniStreamId);
+ std::unique_ptr<MoqtFetchTask> fetch_task =
+ MoqtSessionPeer::CreateUpstreamFetch(&session, &stream);
+ UpstreamFetch::UpstreamFetchTask* task =
+ static_cast<UpstreamFetch::UpstreamFetchTask*>(fetch_task.get());
+ ASSERT_NE(task, nullptr);
+ EXPECT_FALSE(task->HasObject());
+ bool object_ready = false;
+ task->SetObjectAvailableCallback([&]() { object_ready = true; });
+ MoqtObject object = {
+ /*subscribe_id=*/0,
+ /*group_id, object_id=*/0,
+ 0,
+ /*publisher_priority=*/128,
+ /*status=*/MoqtObjectStatus::kNormal,
+ /*subgroup=*/0,
+ /*payload_length=*/6,
+ };
+ MoqtFramer framer_(quiche::SimpleBufferAllocator::Get(), true);
+ quiche::QuicheBuffer header = framer_.SerializeObjectHeader(
+ object, MoqtDataStreamType::kStreamHeaderFetch, true);
+ stream.Receive(header.AsStringView(), false);
+ EXPECT_FALSE(task->HasObject());
+ EXPECT_FALSE(object_ready);
+ stream.Receive("foo", false);
+ EXPECT_TRUE(task->HasObject());
+ EXPECT_TRUE(task->NeedsMorePayload());
+ EXPECT_FALSE(object_ready);
+ stream.Receive("bar", false);
+ EXPECT_TRUE(object_ready);
+ EXPECT_TRUE(task->HasObject());
+ EXPECT_FALSE(task->NeedsMorePayload());
+}
+
// TODO: re-enable this test once this behavior is re-implemented.
#if 0
TEST_F(MoqtSessionTest, SubscribeUpdateClosesSubscription) {
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc
index 31980d1..d8e31a3 100644
--- a/quiche/quic/moqt/moqt_track.cc
+++ b/quiche/quic/moqt/moqt_track.cc
@@ -4,6 +4,7 @@
#include "quiche/quic/moqt/moqt_track.h"
+#include <cstring>
#include <memory>
#include <optional>
#include <utility>
@@ -12,7 +13,10 @@
#include "absl/strings/string_view.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_publisher.h"
-#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/platform/api/quiche_bug_tracker.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+#include "quiche/common/simple_buffer_allocator.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
@@ -39,6 +43,18 @@
std::move(callback));
task_ = task->weak_ptr();
std::move(ok_callback_)(std::move(task));
+ if (can_read_callback_) {
+ task_.GetIfAvailable()->set_can_read_callback(
+ std::move(can_read_callback_));
+ }
+}
+
+void UpstreamFetch::OnStreamOpened(CanReadCallback can_read_callback) {
+ if (task_.IsValid()) {
+ task_.GetIfAvailable()->set_can_read_callback(std::move(can_read_callback));
+ } else {
+ can_read_callback_ = std::move(can_read_callback);
+ }
}
UpstreamFetch::UpstreamFetchTask::~UpstreamFetchTask() {
@@ -56,17 +72,47 @@
if (eof_) {
return kEof;
}
+ need_object_available_callback_ = true;
return kPending;
}
- output = *std::move(next_object_);
+ quiche::QuicheMemSlice message_slice(std::move(payload_));
+ output.sequence = FullSequence(next_object_->group_id,
+ next_object_->subgroup_id.value_or(0),
+ next_object_->object_id);
+ output.status = next_object_->object_status;
+ output.publisher_priority = next_object_->publisher_priority;
+ output.payload = std::move(message_slice);
+ output.fin_after_this = false;
next_object_.reset();
+ can_read_callback_();
return kSuccess;
}
-void UpstreamFetch::UpstreamFetchTask::NewObject(PublishedObject& object) {
- QUICHE_DCHECK(!next_object_.has_value());
- next_object_ = std::move(object);
- if (object_available_callback_) {
+void UpstreamFetch::UpstreamFetchTask::NewObject(const MoqtObject& message) {
+ next_object_ = message;
+ payload_ = quiche::QuicheBuffer(quiche::SimpleBufferAllocator::Get(),
+ message.payload_length);
+}
+
+void UpstreamFetch::UpstreamFetchTask::AppendPayloadToObject(
+ absl::string_view payload) {
+ QUICHE_BUG_IF(quic_bug_AppendPayloadToObjectCalledEarly,
+ !next_object_.has_value())
+ << "AppendPayloadToObject called without an object";
+ QUICHE_BUG_IF(quic_bug_AlreadyGotPayload, next_object_->payload_length == 0)
+ << "AppendPayloadToObject called after payload was already full";
+ // Copy |payload| to the right spot in the buffer.
+ memcpy(payload_.data() + payload_.size() - next_object_->payload_length,
+ payload.data(), payload.length());
+ next_object_->payload_length -= payload.length();
+}
+
+void UpstreamFetch::UpstreamFetchTask::NotifyNewObject() {
+ QUICHE_BUG_IF(quic_bug_NotifyNewObjectCalledEarly,
+ !next_object_.has_value() || next_object_->payload_length > 0)
+ << "NotifyNewObject called without a full object in store";
+ if (need_object_available_callback_ && object_available_callback_) {
+ need_object_available_callback_ = false;
object_available_callback_();
}
}
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index fe158e1..6c8bab3 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -17,6 +17,7 @@
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_callbacks.h"
#include "quiche/common/quiche_weak_ptr.h"
#include "quiche/web_transport/web_transport.h"
@@ -195,13 +196,27 @@
return weak_ptr_factory_.Create();
}
- // Manage the relationship with the data stream.
- void OnStreamOpened(CanReadCallback callback) {
+ // MoqtSession should not use this function; use
+ // UpstreamFetch::OnStreamOpened() instead, in case the task does not exist
+ // yet.
+ void set_can_read_callback(CanReadCallback callback) {
can_read_callback_ = std::move(callback);
+ can_read_callback_(); // Accept the first object.
}
// Called when the data stream receives a new object.
- void NewObject(PublishedObject& object);
+ void NewObject(const MoqtObject& message);
+ void AppendPayloadToObject(absl::string_view payload);
+ // MoqtSession calls this for a hint if the object has been read.
+ bool HasObject() const { return next_object_.has_value(); }
+ bool NeedsMorePayload() const {
+ return next_object_.has_value() && next_object_->payload_length > 0;
+ }
+ // MoqtSession calls NotifyNewObject() after NewObject() because it has to
+ // exit the parser loop before the callback possibly causes another read.
+ // Furthermore, NewObject() may be a partial object, and so
+ // NotifyNewObject() is called only when the object is complete.
+ void NotifyNewObject();
// Deletes callbacks to session or stream, updates the status. If |error|
// has no value, will append an EOF to the object stream.
@@ -214,10 +229,22 @@
absl::Status status_;
TaskDestroyedCallback task_destroyed_callback_;
- // Object delivery state.
- std::optional<PublishedObject> next_object_;
+ // Object delivery state. The payload_length member is used to track the
+ // payload bytes not yet received. The application receives a
+ // PublishedObject that is constructed from next_object_ and payload_.
+ std::optional<MoqtObject> next_object_;
+ // Store payload separately. Will be converted into QuicheMemSlice only when
+ // complete, since QuicheMemSlice is immutable.
+ quiche::QuicheBuffer payload_;
+
+ // The task should only call object_available_callback_ when the last result
+ // was kPending. Otherwise, there can be recursive loops of
+ // GetNextObjectResult().
+ bool need_object_available_callback_ = true;
bool eof_ = false; // The next object is EOF.
+ // The Fetch task signals the application when it has new objects.
ObjectsAvailableCallback object_available_callback_;
+ // The Fetch task signals the stream when it has dispensed of an object.
CanReadCallback can_read_callback_;
// Must be last.
@@ -230,9 +257,16 @@
UpstreamFetchTask* task() { return task_.GetIfAvailable(); }
+ // Manage the relationship with the data stream.
+ void OnStreamOpened(CanReadCallback callback);
+
private:
quiche::QuicheWeakPtr<UpstreamFetchTask> task_;
+ // Before FetchTask is created, an incoming stream will register the callback
+ // here instead.
+ CanReadCallback can_read_callback_;
+
// Initial values from Fetch() call.
FetchResponseCallback ok_callback_; // Will be destroyed on FETCH_OK.
};
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index 8072535..ecf1ed3 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -149,15 +149,35 @@
PublishedObject object;
EXPECT_EQ(fetch_task_->GetNextObject(object),
MoqtFetchTask::GetNextObjectResult::kPending);
- PublishedObject new_object(FullSequence(3, 0),
- MoqtObjectStatus::kGroupDoesNotExist, 128,
- quiche::QuicheMemSlice(), false);
+ MoqtObject new_object = {1, 3, 0, 128, MoqtObjectStatus::kNormal, 0, 6};
+ bool got_object = false;
fetch_task_->SetObjectAvailableCallback([&]() {
+ got_object = true;
EXPECT_EQ(fetch_task_->GetNextObject(object),
MoqtFetchTask::GetNextObjectResult::kSuccess);
- EXPECT_EQ(object.sequence, new_object.sequence);
+ EXPECT_EQ(object.sequence, FullSequence(3, 0, 0));
+ EXPECT_EQ(object.payload.AsStringView(), "foobar");
});
+ int got_read_callback = 0;
+ fetch_.OnStreamOpened([&]() { ++got_read_callback; });
+ EXPECT_FALSE(fetch_.task()->HasObject());
+ EXPECT_FALSE(fetch_.task()->NeedsMorePayload());
fetch_.task()->NewObject(new_object);
+ EXPECT_TRUE(fetch_.task()->HasObject());
+ EXPECT_TRUE(fetch_.task()->NeedsMorePayload());
+ fetch_.task()->AppendPayloadToObject("foo");
+ EXPECT_TRUE(fetch_.task()->HasObject());
+ EXPECT_TRUE(fetch_.task()->NeedsMorePayload());
+ fetch_.task()->AppendPayloadToObject("bar");
+ EXPECT_TRUE(fetch_.task()->HasObject());
+ EXPECT_FALSE(fetch_.task()->NeedsMorePayload());
+ EXPECT_FALSE(got_object);
+ EXPECT_EQ(got_read_callback, 1); // Call from OnStreamOpened().
+ fetch_.task()->NotifyNewObject();
+ EXPECT_FALSE(fetch_.task()->HasObject());
+ EXPECT_FALSE(fetch_.task()->NeedsMorePayload());
+ EXPECT_EQ(got_read_callback, 2); // Call from GetNextObjectResult().
+ EXPECT_TRUE(got_object);
}
} // namespace test
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index e867376..dc94e7b 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -10,6 +10,8 @@
#include <optional>
#include <utility>
+
+#include "absl/status/status.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_parser.h"
#include "quiche/quic/moqt/moqt_priority.h"
@@ -57,6 +59,14 @@
return new_stream;
}
+ static std::unique_ptr<webtransport::StreamVisitor>
+ CreateIncomingStreamVisitor(MoqtSession* session,
+ webtransport::Stream* stream) {
+ auto new_stream =
+ std::make_unique<MoqtSession::IncomingDataStream>(session, stream);
+ return new_stream;
+ }
+
// In the test OnSessionReady, the session creates a stream and then passes
// its unique_ptr to the mock webtransport stream. This function casts
// that unique_ptr into a MoqtSession::Stream*, which is a private class of
@@ -156,6 +166,43 @@
uint64_t subscribe_id) {
return *session->published_subscriptions_[subscribe_id]->largest_sent();
}
+
+ // Adds an upstream fetch and a stream ready to receive data.
+ static std::unique_ptr<MoqtFetchTask> CreateUpstreamFetch(
+ MoqtSession* session, webtransport::Stream* stream) {
+ MoqtFetch fetch_message = {
+ 0,
+ FullTrackName{"foo", "bar"},
+ 128,
+ std::nullopt,
+ FullSequence{0, 0},
+ 4,
+ std::nullopt,
+ MoqtSubscribeParameters(),
+ };
+ std::unique_ptr<MoqtFetchTask> task;
+ auto [it, success] = session->upstream_by_id_.try_emplace(
+ 0, std::make_unique<UpstreamFetch>(
+ fetch_message, [&](std::unique_ptr<MoqtFetchTask> fetch_task) {
+ task = std::move(fetch_task);
+ }));
+ QUICHE_DCHECK(success);
+ UpstreamFetch* fetch = static_cast<UpstreamFetch*>(it->second.get());
+ // Initialize the fetch task
+ fetch->OnFetchResult(
+ FullSequence{4, 10}, absl::OkStatus(),
+ [=, session_ptr = session, fetch_id = fetch_message.subscribe_id]() {
+ session_ptr->CancelFetch(fetch_id);
+ });
+ ;
+ auto mock_session =
+ static_cast<webtransport::test::MockSession*>(session->session());
+ EXPECT_CALL(*mock_session, AcceptIncomingUnidirectionalStream())
+ .WillOnce(testing::Return(stream))
+ .WillOnce(testing::Return(nullptr));
+ session->OnIncomingUnidirectionalStreamAvailable();
+ return task;
+ }
};
} // namespace moqt::test