Add MoqtSessionInterface to make mocking out a MoqtSession easier.
This CL only moves the subscriber-side APIs into the interface, leaving the publisher side as a TODO.
Also add comments and cleans up things a bit while we're at it.
PiperOrigin-RevId: 740952968
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 17ab549..b6c75a5 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1544,6 +1544,8 @@
"quic/moqt/moqt_probe_manager.h",
"quic/moqt/moqt_publisher.h",
"quic/moqt/moqt_session.h",
+ "quic/moqt/moqt_session_callbacks.h",
+ "quic/moqt/moqt_session_interface.h",
"quic/moqt/moqt_subscribe_windows.h",
"quic/moqt/moqt_track.h",
"quic/moqt/test_tools/moqt_framer_utils.h",
diff --git a/build/source_list.gni b/build/source_list.gni
index 5465b6a..d66f724 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1548,6 +1548,8 @@
"src/quiche/quic/moqt/moqt_probe_manager.h",
"src/quiche/quic/moqt/moqt_publisher.h",
"src/quiche/quic/moqt/moqt_session.h",
+ "src/quiche/quic/moqt/moqt_session_callbacks.h",
+ "src/quiche/quic/moqt/moqt_session_interface.h",
"src/quiche/quic/moqt/moqt_subscribe_windows.h",
"src/quiche/quic/moqt/moqt_track.h",
"src/quiche/quic/moqt/test_tools/moqt_framer_utils.h",
diff --git a/build/source_list.json b/build/source_list.json
index 1a9fe83..8ae8fe1 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1547,6 +1547,8 @@
"quiche/quic/moqt/moqt_probe_manager.h",
"quiche/quic/moqt/moqt_publisher.h",
"quiche/quic/moqt/moqt_session.h",
+ "quiche/quic/moqt/moqt_session_callbacks.h",
+ "quiche/quic/moqt/moqt_session_interface.h",
"quiche/quic/moqt/moqt_subscribe_windows.h",
"quiche/quic/moqt/moqt_track.h",
"quiche/quic/moqt/test_tools/moqt_framer_utils.h",
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 7707755..a85e212 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -86,7 +86,8 @@
EXPECT_CALL(*visitor, OnReply(track_name, std::optional<FullSequence>(),
std::optional<absl::string_view>()))
.WillOnce([&]() { received_ok = true; });
- client_->session()->SubscribeCurrentObject(track_name, visitor);
+ client_->session()->SubscribeCurrentObject(track_name, visitor,
+ MoqtSubscribeParameters());
bool success =
test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; });
EXPECT_TRUE(success);
@@ -236,7 +237,8 @@
FullTrackName track_name = track_namespace;
track_name.AddElement("/catalog");
EXPECT_FALSE(error.has_value());
- server_->session()->SubscribeCurrentGroup(track_name, &server_visitor);
+ server_->session()->SubscribeCurrentGroup(track_name, &server_visitor,
+ MoqtSubscribeParameters());
});
EXPECT_CALL(server_visitor, OnReply(_, _, _)).WillOnce([&]() {
matches = true;
@@ -259,7 +261,8 @@
FullTrackName track_name = track_namespace;
track_name.AddElement("data");
server_->session()->SubscribeAbsolute(
- track_name, /*start_group=*/0, /*start_object=*/0, &server_visitor);
+ track_name, /*start_group=*/0, /*start_object=*/0, &server_visitor,
+ MoqtSubscribeParameters());
return std::optional<MoqtAnnounceErrorReason>();
});
@@ -322,7 +325,8 @@
queue->AddObject(MemSliceFromString("object 5"), /*key=*/false);
client_->session()->SubscribeCurrentGroup(FullTrackName("test", name),
- &client_visitor);
+ &client_visitor,
+ MoqtSubscribeParameters());
int received = 0;
EXPECT_CALL(client_visitor, OnReply);
EXPECT_CALL(client_visitor,
@@ -453,7 +457,8 @@
});
EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason))
.WillOnce([&]() { received_ok = true; });
- client_->session()->SubscribeAbsolute(full_track_name, 0, 0, &client_visitor);
+ client_->session()->SubscribeAbsolute(full_track_name, 0, 0, &client_visitor,
+ MoqtSubscribeParameters());
bool success =
test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; });
EXPECT_TRUE(success);
@@ -478,7 +483,8 @@
});
EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason))
.WillOnce([&]() { received_ok = true; });
- client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor);
+ client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor,
+ MoqtSubscribeParameters());
bool success =
test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; });
EXPECT_TRUE(success);
@@ -503,7 +509,8 @@
});
EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason))
.WillOnce([&]() { received_ok = true; });
- client_->session()->SubscribeCurrentGroup(full_track_name, &client_visitor);
+ client_->session()->SubscribeCurrentGroup(full_track_name, &client_visitor,
+ MoqtSubscribeParameters());
bool success =
test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; });
EXPECT_TRUE(success);
@@ -517,7 +524,8 @@
bool received_ok = false;
EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason))
.WillOnce([&]() { received_ok = true; });
- client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor);
+ client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor,
+ MoqtSubscribeParameters());
bool success =
test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; });
EXPECT_TRUE(success);
@@ -549,8 +557,8 @@
EXPECT_TRUE(success);
// Reject this subscribe because there already is one.
- EXPECT_FALSE(client_->session()->SubscribeCurrentGroup(full_track_name,
- &client_visitor));
+ EXPECT_FALSE(client_->session()->SubscribeCurrentGroup(
+ full_track_name, &client_visitor, MoqtSubscribeParameters()));
queue->RemoveAllSubscriptions(); // Induce a SUBSCRIBE_DONE.
bool subscribe_done = false;
EXPECT_CALL(client_visitor, OnSubscribeDone).WillOnce([&]() {
@@ -561,8 +569,8 @@
EXPECT_TRUE(success);
// Subscription is deleted; the client session should not immediately reject
// a new attempt.
- EXPECT_TRUE(client_->session()->SubscribeCurrentGroup(full_track_name,
- &client_visitor));
+ EXPECT_TRUE(client_->session()->SubscribeCurrentGroup(
+ full_track_name, &client_visitor, MoqtSubscribeParameters()));
}
TEST_F(MoqtIntegrationTest, ObjectAcks) {
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index fb578f8..0d0bdfd 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -28,6 +28,8 @@
#include "quiche/quic/moqt/moqt_parser.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/quic/moqt/moqt_session_callbacks.h"
+#include "quiche/quic/moqt/moqt_session_interface.h"
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/common/platform/api/quiche_export.h"
@@ -42,70 +44,6 @@
class MoqtSessionPeer;
}
-using MoqtSessionEstablishedCallback = quiche::SingleUseCallback<void()>;
-using MoqtSessionGoAwayCallback =
- quiche::SingleUseCallback<void(absl::string_view new_session_uri)>;
-using MoqtSessionTerminatedCallback =
- quiche::SingleUseCallback<void(absl::string_view error_message)>;
-using MoqtSessionDeletedCallback = quiche::SingleUseCallback<void()>;
-
-enum class SubscribeEvent { kSubscribe, kUnsubscribe };
-enum class AnnounceEvent { kAnnounce, kUnannounce };
-
-// If |error_message| is nullopt, this is triggered by an ANNOUNCE_OK.
-// Otherwise, it is triggered by ANNOUNCE_ERROR or ANNOUNCE_CANCEL. For
-// ERROR or CANCEL, MoqtSession is deleting all ANNOUNCE state immediately
-// after calling this callback. Alternatively, the application can call
-// Unannounce() to delete the state.
-using MoqtOutgoingAnnounceCallback = quiche::MultiUseCallback<void(
- FullTrackName track_namespace,
- std::optional<MoqtAnnounceErrorReason> error)>;
-using MoqtIncomingAnnounceCallback =
- quiche::MultiUseCallback<std::optional<MoqtAnnounceErrorReason>(
- const FullTrackName& track_namespace, AnnounceEvent announce_type)>;
-using MoqtOutgoingSubscribeAnnouncesCallback = quiche::SingleUseCallback<void(
- FullTrackName track_namespace, std::optional<SubscribeErrorCode> error,
- absl::string_view reason)>;
-// If the return value is nullopt, the Session will respond with
-// SUBSCRIBE_ANNOUNCES_OK. Otherwise, it will respond with
-// SUBSCRIBE_ANNOUNCES_ERROR.
-// If |subscribe_type| is kUnsubscribe, this is an UNSUBSCRIBE_ANNOUNCES message
-// and the return value will be ignored.
-using MoqtIncomingSubscribeAnnouncesCallback =
- quiche::MultiUseCallback<std::optional<MoqtSubscribeErrorReason>(
- const FullTrackName& track_namespace, SubscribeEvent subscribe_type)>;
-
-inline std::optional<MoqtAnnounceErrorReason> DefaultIncomingAnnounceCallback(
- const FullTrackName& /*track_namespace*/, AnnounceEvent /*announce*/) {
- return std::optional(MoqtAnnounceErrorReason{
- SubscribeErrorCode::kNotSupported,
- "This endpoint does not accept incoming ANNOUNCE messages"});
-};
-
-inline std::optional<MoqtSubscribeErrorReason>
-DefaultIncomingSubscribeAnnouncesCallback(const FullTrackName& track_namespace,
- SubscribeEvent /*subscribe_type*/) {
- return MoqtSubscribeErrorReason{
- SubscribeErrorCode::kUnauthorized,
- "This endpoint does not support incoming SUBSCRIBE_ANNOUNCES messages"};
-}
-
-// Callbacks for session-level events.
-struct MoqtSessionCallbacks {
- MoqtSessionEstablishedCallback session_established_callback = +[] {};
- MoqtSessionGoAwayCallback goaway_received_callback =
- +[](absl::string_view) {};
- MoqtSessionTerminatedCallback session_terminated_callback =
- +[](absl::string_view) {};
- MoqtSessionDeletedCallback session_deleted_callback = +[] {};
-
- MoqtIncomingAnnounceCallback incoming_announce_callback =
- DefaultIncomingAnnounceCallback;
- MoqtIncomingSubscribeAnnouncesCallback incoming_subscribe_announces_callback =
- DefaultIncomingSubscribeAnnouncesCallback;
- const quic::QuicClock* clock = quic::QuicDefaultClock::Get();
-};
-
struct SubscriptionWithQueuedStream {
webtransport::SendOrder send_order;
uint64_t subscription_id;
@@ -124,7 +62,8 @@
quic::QuicTimeDelta delta_from_deadline) = 0;
};
-class QUICHE_EXPORT MoqtSession : public webtransport::SessionVisitor {
+class QUICHE_EXPORT MoqtSession : public MoqtSessionInterface,
+ public webtransport::SessionVisitor {
public:
MoqtSession(webtransport::Session* session, MoqtSessionParameters parameters,
std::unique_ptr<quic::QuicAlarmFactory> alarm_factory,
@@ -147,7 +86,7 @@
void OnCanCreateNewOutgoingBidirectionalStream() override {}
void OnCanCreateNewOutgoingUnidirectionalStream() override;
- void Error(MoqtError code, absl::string_view error);
+ void Error(MoqtError code, absl::string_view error) override;
quic::Perspective perspective() const { return parameters_.perspective; }
@@ -174,21 +113,21 @@
// the track, the message will still be sent. However, the visitor will be
// ignored.
// Subscribe from (start_group, start_object) to the end of the track.
- bool SubscribeAbsolute(
- const FullTrackName& name, uint64_t start_group, uint64_t start_object,
- SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters = MoqtSubscribeParameters());
+ bool SubscribeAbsolute(const FullTrackName& name, uint64_t start_group,
+ uint64_t start_object,
+ SubscribeRemoteTrack::Visitor* visitor,
+ MoqtSubscribeParameters parameters) override;
// Subscribe from (start_group, start_object) to the end of end_group.
- bool SubscribeAbsolute(
- const FullTrackName& name, uint64_t start_group, uint64_t start_object,
- uint64_t end_group, SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters = MoqtSubscribeParameters());
- bool SubscribeCurrentObject(
- const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters = MoqtSubscribeParameters());
- bool SubscribeCurrentGroup(
- const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters = MoqtSubscribeParameters());
+ bool SubscribeAbsolute(const FullTrackName& name, uint64_t start_group,
+ uint64_t start_object, uint64_t end_group,
+ SubscribeRemoteTrack::Visitor* visitor,
+ MoqtSubscribeParameters parameters) override;
+ bool SubscribeCurrentObject(const FullTrackName& name,
+ SubscribeRemoteTrack::Visitor* visitor,
+ MoqtSubscribeParameters parameters) override;
+ bool SubscribeCurrentGroup(const FullTrackName& name,
+ SubscribeRemoteTrack::Visitor* visitor,
+ MoqtSubscribeParameters parameters) override;
// Returns false if the subscription is not found. The session immediately
// destroys all subscription state.
void Unsubscribe(const FullTrackName& name);
@@ -200,32 +139,33 @@
FullSequence start, uint64_t end_group,
std::optional<uint64_t> end_object, MoqtPriority priority,
std::optional<MoqtDeliveryOrder> delivery_order,
- MoqtSubscribeParameters parameters = MoqtSubscribeParameters());
+ MoqtSubscribeParameters parameters) override;
// Sends both a SUBSCRIBE and a joining FETCH, beginning |num_previous_groups|
// groups before the current group. The Fetch will not be flow controlled,
// instead using |visitor| to deliver fetched objects when they arrive. Gaps
// in the FETCH will not be filled by with ObjectDoesNotExist. If the FETCH
// fails for any reason, the application will not receive a notification; it
// will just appear to be missing objects.
- bool JoiningFetch(
- const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor,
- uint64_t num_previous_groups,
- MoqtSubscribeParameters parameters = MoqtSubscribeParameters());
+ bool JoiningFetch(const FullTrackName& name,
+ SubscribeRemoteTrack::Visitor* visitor,
+ uint64_t num_previous_groups,
+ MoqtSubscribeParameters parameters) override;
// Sends both a SUBSCRIBE and a joining FETCH, beginning |num_previous_groups|
// groups before the current group. The application provides |callback| to
// fully control acceptance of Fetched objects.
- bool JoiningFetch(
- const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor,
- FetchResponseCallback callback, uint64_t num_previous_groups,
- MoqtPriority priority, std::optional<MoqtDeliveryOrder> delivery_order,
- MoqtSubscribeParameters parameters = MoqtSubscribeParameters());
+ bool JoiningFetch(const FullTrackName& name,
+ SubscribeRemoteTrack::Visitor* visitor,
+ FetchResponseCallback callback,
+ uint64_t num_previous_groups, MoqtPriority priority,
+ std::optional<MoqtDeliveryOrder> delivery_order,
+ MoqtSubscribeParameters parameters) override;
// Send a GOAWAY message to the peer. |new_session_uri| must be empty if
// called by the client.
void GoAway(absl::string_view new_session_uri);
webtransport::Session* session() { return session_; }
- MoqtSessionCallbacks& callbacks() { return callbacks_; }
+ MoqtSessionCallbacks& callbacks() override { return callbacks_; }
MoqtPublisher* publisher() { return publisher_; }
void set_publisher(MoqtPublisher* publisher) { publisher_ = publisher; }
bool support_object_acks() const { return parameters_.support_object_acks; }
diff --git a/quiche/quic/moqt/moqt_session_callbacks.h b/quiche/quic/moqt/moqt_session_callbacks.h
new file mode 100644
index 0000000..2ae19bb
--- /dev/null
+++ b/quiche/quic/moqt/moqt_session_callbacks.h
@@ -0,0 +1,81 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_MOQT_SESSION_CALLBACKS_H_
+#define QUICHE_QUIC_MOQT_MOQT_SESSION_CALLBACKS_H_
+
+#include <optional>
+
+#include "absl/strings/string_view.h"
+#include "quiche/quic/core/quic_clock.h"
+#include "quiche/quic/core/quic_default_clock.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/common/quiche_callbacks.h"
+
+namespace moqt {
+
+enum class SubscribeEvent { kSubscribe, kUnsubscribe };
+enum class AnnounceEvent { kAnnounce, kUnannounce };
+
+// Called when the SETUP message from the peer is received.
+using MoqtSessionEstablishedCallback = quiche::SingleUseCallback<void()>;
+
+// Called when a GOAWAY message is received from the server.
+using MoqtSessionGoAwayCallback =
+ quiche::SingleUseCallback<void(absl::string_view new_session_uri)>;
+
+// Called when the session is terminated.
+using MoqtSessionTerminatedCallback =
+ quiche::SingleUseCallback<void(absl::string_view error_message)>;
+
+// Called from the session destructor.
+using MoqtSessionDeletedCallback = quiche::SingleUseCallback<void()>;
+
+// Called whenever an ANNOUNCE or UNANNOUNCE message is received from the peer.
+using MoqtIncomingAnnounceCallback =
+ quiche::MultiUseCallback<std::optional<MoqtAnnounceErrorReason>(
+ const FullTrackName& track_namespace, AnnounceEvent announce_type)>;
+
+// Called whenever SUBSCRIBE_ANNOUNCES or UNSUBSCRIBE_ANNOUNCES is received from
+// the peer. For SUBSCRIBE_ANNOUNCES, the return value indicates whether to
+// return an OK or an ERROR; for UNSUBSCRIBE_ANNOUNCES, the return value is
+// ignored.
+using MoqtIncomingSubscribeAnnouncesCallback =
+ quiche::MultiUseCallback<std::optional<MoqtSubscribeErrorReason>(
+ const FullTrackName& track_namespace, SubscribeEvent subscribe_type)>;
+
+inline std::optional<MoqtAnnounceErrorReason> DefaultIncomingAnnounceCallback(
+ const FullTrackName& /*track_namespace*/, AnnounceEvent /*announce*/) {
+ return std::optional(MoqtAnnounceErrorReason{
+ SubscribeErrorCode::kNotSupported,
+ "This endpoint does not accept incoming ANNOUNCE messages"});
+};
+
+inline std::optional<MoqtSubscribeErrorReason>
+DefaultIncomingSubscribeAnnouncesCallback(const FullTrackName& track_namespace,
+ SubscribeEvent /*subscribe_type*/) {
+ return MoqtSubscribeErrorReason{
+ SubscribeErrorCode::kUnauthorized,
+ "This endpoint does not support incoming SUBSCRIBE_ANNOUNCES messages"};
+}
+
+// Callbacks for session-level events.
+struct MoqtSessionCallbacks {
+ MoqtSessionEstablishedCallback session_established_callback = +[] {};
+ MoqtSessionGoAwayCallback goaway_received_callback =
+ +[](absl::string_view) {};
+ MoqtSessionTerminatedCallback session_terminated_callback =
+ +[](absl::string_view) {};
+ MoqtSessionDeletedCallback session_deleted_callback = +[] {};
+
+ MoqtIncomingAnnounceCallback incoming_announce_callback =
+ DefaultIncomingAnnounceCallback;
+ MoqtIncomingSubscribeAnnouncesCallback incoming_subscribe_announces_callback =
+ DefaultIncomingSubscribeAnnouncesCallback;
+ const quic::QuicClock* clock = quic::QuicDefaultClock::Get();
+};
+
+} // namespace moqt
+
+#endif // QUICHE_QUIC_MOQT_MOQT_SESSION_CALLBACKS_H_
diff --git a/quiche/quic/moqt/moqt_session_interface.h b/quiche/quic/moqt/moqt_session_interface.h
new file mode 100644
index 0000000..a814bae
--- /dev/null
+++ b/quiche/quic/moqt/moqt_session_interface.h
@@ -0,0 +1,107 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_MOQT_SESSION_INTERFACE_H_
+#define QUICHE_QUIC_MOQT_MOQT_SESSION_INTERFACE_H_
+
+#include <cstdint>
+#include <optional>
+
+#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_session_callbacks.h"
+#include "quiche/quic/moqt/moqt_track.h"
+#include "quiche/common/quiche_callbacks.h"
+
+namespace moqt {
+
+// If |error_message| is nullopt, this is triggered by an ANNOUNCE_OK.
+// Otherwise, it is triggered by ANNOUNCE_ERROR or ANNOUNCE_CANCEL. For
+// ERROR or CANCEL, MoqtSession is deleting all ANNOUNCE state immediately
+// after calling this callback. Alternatively, the application can call
+// Unannounce() to delete the state.
+using MoqtOutgoingAnnounceCallback = quiche::MultiUseCallback<void(
+ FullTrackName track_namespace,
+ std::optional<MoqtAnnounceErrorReason> error)>;
+
+using MoqtOutgoingSubscribeAnnouncesCallback = quiche::SingleUseCallback<void(
+ FullTrackName track_namespace, std::optional<SubscribeErrorCode> error,
+ absl::string_view reason)>;
+
+class MoqtSessionInterface {
+ public:
+ virtual ~MoqtSessionInterface() = default;
+
+ // TODO: move ANNOUNCE logic here.
+
+ // Callbacks for session-level events.
+ virtual MoqtSessionCallbacks& callbacks() = 0;
+
+ // Close the session with a fatal error.
+ virtual void Error(MoqtError code, absl::string_view error) = 0;
+
+ // Methods below send a SUBSCRIBE for the specified track, and return true if
+ // SUBSCRIBE was actually sent.
+
+ // Subscribe from (start_group, start_object) to the end of the track.
+ virtual bool SubscribeAbsolute(const FullTrackName& name,
+ uint64_t start_group, uint64_t start_object,
+ SubscribeRemoteTrack::Visitor* visitor,
+ MoqtSubscribeParameters parameters) = 0;
+ // Subscribe from (start_group, start_object) to the end of end_group.
+ virtual bool SubscribeAbsolute(const FullTrackName& name,
+ uint64_t start_group, uint64_t start_object,
+ uint64_t end_group,
+ SubscribeRemoteTrack::Visitor* visitor,
+ MoqtSubscribeParameters parameters) = 0;
+ // Subscribe to all objects that are larger than the current Largest
+ // Group/Object ID.
+ virtual bool SubscribeCurrentObject(const FullTrackName& name,
+ SubscribeRemoteTrack::Visitor* visitor,
+ MoqtSubscribeParameters parameters) = 0;
+ // TODO(vasilvv): remove.
+ [[deprecated]] virtual bool SubscribeCurrentGroup(
+ const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor,
+ MoqtSubscribeParameters parameters) = 0;
+
+ // Sends an UNSUBSCRIBE message and removes all of the state related to the
+ // subscription. Returns false if the subscription is not found.
+ virtual void Unsubscribe(const FullTrackName& name) = 0;
+
+ // Sends a FETCH for a pre-specified object range. Once a FETCH_OK or a
+ // FETCH_ERROR is received, `callback` is called with a MoqtFetchTask that can
+ // be used to process the FETCH further. To cancel a FETCH, simply destroy
+ // the MoqtFetchTask.
+ virtual bool Fetch(const FullTrackName& name, FetchResponseCallback callback,
+ FullSequence start, uint64_t end_group,
+ std::optional<uint64_t> end_object, MoqtPriority priority,
+ std::optional<MoqtDeliveryOrder> delivery_order,
+ MoqtSubscribeParameters parameters) = 0;
+
+ // Sends both a SUBSCRIBE and a joining FETCH, beginning `num_previous_groups`
+ // groups before the current group. The Fetch will not be flow controlled,
+ // instead using |visitor| to deliver fetched objects when they arrive. Gaps
+ // in the FETCH will not be filled by with ObjectDoesNotExist. If the FETCH
+ // fails for any reason, the application will not receive a notification; it
+ // will just appear to be missing objects.
+ virtual bool JoiningFetch(const FullTrackName& name,
+ SubscribeRemoteTrack::Visitor* visitor,
+ uint64_t num_previous_groups,
+ MoqtSubscribeParameters parameters) = 0;
+
+ // Sends both a SUBSCRIBE and a joining FETCH, beginning `num_previous_groups`
+ // groups before the current group. `callback` acts the same way as the
+ // callback for the regular Fetch() call.
+ virtual bool JoiningFetch(const FullTrackName& name,
+ SubscribeRemoteTrack::Visitor* visitor,
+ FetchResponseCallback callback,
+ uint64_t num_previous_groups, MoqtPriority priority,
+ std::optional<MoqtDeliveryOrder> delivery_order,
+ MoqtSubscribeParameters parameters) = 0;
+};
+
+} // namespace moqt
+
+#endif // QUICHE_QUIC_MOQT_MOQT_SESSION_INTERFACE_H_
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 5829417..8242ee1 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -605,16 +605,19 @@
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
EXPECT_TRUE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
- &remote_track_visitor));
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
EXPECT_CALL(
mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribesBlocked), _))
.Times(1);
EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo2", "bar2"),
- &remote_track_visitor));
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
// Second time does not send SUBSCRIBES_BLOCKED.
EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo2", "bar2"),
- &remote_track_visitor));
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
}
TEST_F(MoqtSessionTest, SubscribeDuplicateTrackName) {
@@ -626,9 +629,11 @@
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
EXPECT_TRUE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
- &remote_track_visitor));
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
- &remote_track_visitor));
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
}
TEST_F(MoqtSessionTest, SubscribeWithOk) {
@@ -639,7 +644,8 @@
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
- &remote_track_visitor);
+ &remote_track_visitor,
+ MoqtSubscribeParameters());
MoqtSubscribeOk ok = {
/*subscribe_id=*/0,
@@ -667,7 +673,8 @@
mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribesBlocked), _));
EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
- &remote_track_visitor));
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
MoqtMaxSubscribeId max_subscribe_id = {
/*max_subscribe_id=*/kDefaultInitialMaxSubscribeId + 1,
};
@@ -676,7 +683,8 @@
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
EXPECT_TRUE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
- &remote_track_visitor));
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
}
TEST_F(MoqtSessionTest, LowerMaxSubscribeIdIsAnError) {
@@ -715,7 +723,8 @@
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
- &remote_track_visitor);
+ &remote_track_visitor,
+ MoqtSubscribeParameters());
MoqtSubscribeError error = {
/*subscribe_id=*/0,
@@ -2354,7 +2363,8 @@
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kFetch), _));
EXPECT_TRUE(session_.JoiningFetch(FullTrackName("foo", "bar"),
- &remote_track_visitor, 0));
+ &remote_track_visitor, 0,
+ MoqtSubscribeParameters()));
EXPECT_CALL(remote_track_visitor, OnReply).Times(1);
stream_input->OnSubscribeOkMessage(
@@ -2953,7 +2963,8 @@
EXPECT_CALL(mock_stream_, Writev).Times(0);
MockSubscribeRemoteTrackVisitor remote_track_visitor;
EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
- &remote_track_visitor));
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
EXPECT_FALSE(session_.SubscribeAnnounces(
FullTrackName{"foo"}, +[](FullTrackName /*track_namespace*/,
std::optional<SubscribeErrorCode> /*error*/,
@@ -2965,7 +2976,7 @@
EXPECT_FALSE(session_.Fetch(
FullTrackName{"foo", "bar"},
+[](std::unique_ptr<MoqtFetchTask> /*fetch_task*/) {}, FullSequence(0, 0),
- 5, std::nullopt, 127, std::nullopt));
+ 5, std::nullopt, 127, std::nullopt, MoqtSubscribeParameters()));
// Error on additional GOAWAY.
EXPECT_CALL(mock_session_,
CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
@@ -3009,7 +3020,8 @@
EXPECT_CALL(mock_stream_, Writev).Times(0);
MockSubscribeRemoteTrackVisitor remote_track_visitor;
EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
- &remote_track_visitor));
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
EXPECT_FALSE(session_.SubscribeAnnounces(
FullTrackName{"foo"}, +[](FullTrackName /*track_namespace*/,
std::optional<SubscribeErrorCode> /*error*/,
@@ -3021,7 +3033,7 @@
EXPECT_FALSE(session_.Fetch(
FullTrackName{"foo", "bar"},
+[](std::unique_ptr<MoqtFetchTask> /*fetch_task*/) {}, FullSequence(0, 0),
- 5, std::nullopt, 127, std::nullopt));
+ 5, std::nullopt, 127, std::nullopt, MoqtSubscribeParameters()));
session_.GoAway("");
// GoAway timer fires.
auto* goaway_alarm = static_cast<quic::test::MockAlarmFactory::TestAlarm*>(
@@ -3077,7 +3089,8 @@
EXPECT_CALL(control_stream,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
- &remote_track_visitor));
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
MoqtSubscribeOk ok = {
/*subscribe_id=*/0,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
@@ -3134,7 +3147,8 @@
EXPECT_CALL(control_stream,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
- &remote_track_visitor));
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
MoqtSubscribeOk ok = {
/*subscribe_id=*/0,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
@@ -3188,7 +3202,8 @@
EXPECT_CALL(control_stream,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
- &remote_track_visitor));
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
MoqtSubscribeOk ok = {
/*subscribe_id=*/0,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
diff --git a/quiche/quic/moqt/tools/chat_server.cc b/quiche/quic/moqt/tools/chat_server.cc
index 95ee033..8d7a3c6 100644
--- a/quiche/quic/moqt/tools/chat_server.cc
+++ b/quiche/quic/moqt/tools/chat_server.cc
@@ -49,8 +49,8 @@
return std::nullopt;
}
std::cout << "Received ANNOUNCE for " << track_namespace.ToString() << "\n";
- session_->SubscribeCurrentGroup(*track_name_,
- server_->remote_track_visitor());
+ session_->SubscribeCurrentGroup(*track_name_, server_->remote_track_visitor(),
+ MoqtSubscribeParameters());
server_->AddUser(*track_name_);
return std::nullopt;
}
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
index d19bff3..e4d01cd 100644
--- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -154,7 +154,8 @@
for (absl::string_view track : tracks_to_subscribe) {
FullTrackName full_track_name = track_namespace;
full_track_name.AddElement(track);
- session_->SubscribeCurrentGroup(full_track_name, &it->second);
+ session_->SubscribeCurrentGroup(full_track_name, &it->second,
+ MoqtSubscribeParameters());
}
return std::nullopt;