| // Copyright 2023 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "quiche/quic/moqt/moqt_session.h" |
| |
| #include <cstdint> |
| #include <memory> |
| #include <optional> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "absl/algorithm/container.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/string_view.h" |
| #include "quiche/quic/core/quic_types.h" |
| #include "quiche/quic/moqt/moqt_messages.h" |
| #include "quiche/quic/moqt/moqt_subscribe_windows.h" |
| #include "quiche/quic/moqt/moqt_track.h" |
| #include "quiche/common/platform/api/quiche_logging.h" |
| #include "quiche/common/quiche_buffer_allocator.h" |
| #include "quiche/common/quiche_stream.h" |
| #include "quiche/web_transport/web_transport.h" |
| |
| #define ENDPOINT \ |
| (perspective() == Perspective::IS_SERVER ? "MoQT Server: " : "MoQT Client: ") |
| |
| namespace moqt { |
| |
| using ::quic::Perspective; |
| |
| constexpr int kMaxBufferedObjects = 1000; |
| |
| void MoqtSession::OnSessionReady() { |
| QUICHE_DLOG(INFO) << ENDPOINT << "Underlying session ready"; |
| if (parameters_.perspective == Perspective::IS_SERVER) { |
| return; |
| } |
| |
| webtransport::Stream* control_stream = |
| session_->OpenOutgoingBidirectionalStream(); |
| if (control_stream == nullptr) { |
| Error(MoqtError::kGenericError, "Unable to open a control stream"); |
| return; |
| } |
| control_stream->SetVisitor(std::make_unique<Stream>( |
| this, control_stream, /*is_control_stream=*/true)); |
| control_stream_ = control_stream->GetStreamId(); |
| MoqtClientSetup setup = MoqtClientSetup{ |
| .supported_versions = std::vector<MoqtVersion>{parameters_.version}, |
| .role = MoqtRole::kBoth, |
| }; |
| if (!parameters_.using_webtrans) { |
| setup.path = parameters_.path; |
| } |
| quiche::QuicheBuffer serialized_setup = framer_.SerializeClientSetup(setup); |
| bool success = control_stream->Write(serialized_setup.AsStringView()); |
| if (!success) { |
| Error(MoqtError::kGenericError, "Failed to write client SETUP message"); |
| return; |
| } |
| QUIC_DLOG(INFO) << ENDPOINT << "Send the SETUP message"; |
| } |
| |
| void MoqtSession::OnSessionClosed(webtransport::SessionErrorCode, |
| const std::string& error_message) { |
| if (!error_.empty()) { |
| // Avoid erroring out twice. |
| return; |
| } |
| QUICHE_DLOG(INFO) << ENDPOINT << "Underlying session closed with message: " |
| << error_message; |
| error_ = error_message; |
| std::move(session_terminated_callback_)(error_message); |
| } |
| |
| void MoqtSession::OnIncomingBidirectionalStreamAvailable() { |
| while (webtransport::Stream* stream = |
| session_->AcceptIncomingBidirectionalStream()) { |
| if (control_stream_.has_value()) { |
| Error(MoqtError::kProtocolViolation, "Bidirectional stream already open"); |
| return; |
| } |
| stream->SetVisitor(std::make_unique<Stream>(this, stream)); |
| stream->visitor()->OnCanRead(); |
| } |
| } |
| void MoqtSession::OnIncomingUnidirectionalStreamAvailable() { |
| while (webtransport::Stream* stream = |
| session_->AcceptIncomingUnidirectionalStream()) { |
| stream->SetVisitor(std::make_unique<Stream>(this, stream)); |
| stream->visitor()->OnCanRead(); |
| } |
| } |
| |
| void MoqtSession::Error(MoqtError code, absl::string_view error) { |
| if (!error_.empty()) { |
| // Avoid erroring out twice. |
| return; |
| } |
| QUICHE_DLOG(INFO) << ENDPOINT << "MOQT session closed with code: " |
| << static_cast<int>(code) << " and message: " << error; |
| error_ = std::string(error); |
| session_->CloseSession(static_cast<uint64_t>(code), error); |
| std::move(session_terminated_callback_)(error); |
| } |
| |
| void MoqtSession::AddLocalTrack(const FullTrackName& full_track_name, |
| LocalTrack::Visitor* visitor) { |
| local_tracks_.try_emplace(full_track_name, full_track_name, |
| next_track_alias_++, visitor); |
| } |
| |
| // TODO: Create state that allows ANNOUNCE_OK/ERROR on spurious namespaces to |
| // trigger session errors. |
| void MoqtSession::Announce(absl::string_view track_namespace, |
| MoqtAnnounceCallback announce_callback) { |
| if (pending_outgoing_announces_.contains(track_namespace)) { |
| std::move(announce_callback)( |
| track_namespace, "ANNOUNCE message already outstanding for namespace"); |
| return; |
| } |
| MoqtAnnounce message; |
| message.track_namespace = track_namespace; |
| bool success = session_->GetStreamById(*control_stream_) |
| ->Write(framer_.SerializeAnnounce(message).AsStringView()); |
| if (!success) { |
| Error(MoqtError::kGenericError, "Failed to write ANNOUNCE message"); |
| return; |
| } |
| QUIC_DLOG(INFO) << ENDPOINT << "Sent ANNOUNCE message for " |
| << message.track_namespace; |
| pending_outgoing_announces_[track_namespace] = std::move(announce_callback); |
| } |
| |
| bool MoqtSession::HasSubscribers(const FullTrackName& full_track_name) const { |
| auto it = local_tracks_.find(full_track_name); |
| return (it != local_tracks_.end() && it->second.HasSubscriber()); |
| } |
| |
| bool MoqtSession::SubscribeAbsolute(absl::string_view track_namespace, |
| absl::string_view name, |
| uint64_t start_group, uint64_t start_object, |
| RemoteTrack::Visitor* visitor, |
| absl::string_view auth_info) { |
| MoqtSubscribeRequest message; |
| message.track_namespace = track_namespace; |
| message.track_name = name; |
| message.start_group = MoqtSubscribeLocation(true, start_group); |
| message.start_object = MoqtSubscribeLocation(true, start_object); |
| message.end_group = std::nullopt; |
| message.end_object = std::nullopt; |
| if (!auth_info.empty()) { |
| message.authorization_info = std::move(auth_info); |
| } |
| return Subscribe(message, visitor); |
| } |
| |
| bool MoqtSession::SubscribeAbsolute(absl::string_view track_namespace, |
| absl::string_view name, |
| uint64_t start_group, uint64_t start_object, |
| uint64_t end_group, uint64_t end_object, |
| RemoteTrack::Visitor* visitor, |
| absl::string_view auth_info) { |
| if (end_group < start_group) { |
| QUIC_DLOG(ERROR) << "Subscription end is before beginning"; |
| return false; |
| } |
| if (end_group == start_group && end_object < start_object) { |
| QUIC_DLOG(ERROR) << "Subscription end is before beginning"; |
| return false; |
| } |
| MoqtSubscribeRequest message; |
| message.track_namespace = track_namespace; |
| message.track_name = name; |
| message.start_group = MoqtSubscribeLocation(true, start_group); |
| message.start_object = MoqtSubscribeLocation(true, start_object); |
| message.end_group = MoqtSubscribeLocation(true, end_group); |
| message.end_object = MoqtSubscribeLocation(true, end_object); |
| if (!auth_info.empty()) { |
| message.authorization_info = std::move(auth_info); |
| } |
| return Subscribe(message, visitor); |
| } |
| |
| bool MoqtSession::SubscribeRelative(absl::string_view track_namespace, |
| absl::string_view name, int64_t start_group, |
| int64_t start_object, |
| RemoteTrack::Visitor* visitor, |
| absl::string_view auth_info) { |
| MoqtSubscribeRequest message; |
| message.track_namespace = track_namespace; |
| message.track_name = name; |
| message.start_group = MoqtSubscribeLocation(false, start_group); |
| message.start_object = MoqtSubscribeLocation(false, start_object); |
| message.end_group = std::nullopt; |
| message.end_object = std::nullopt; |
| if (!auth_info.empty()) { |
| message.authorization_info = std::move(auth_info); |
| } |
| return Subscribe(message, visitor); |
| } |
| |
| bool MoqtSession::SubscribeCurrentGroup(absl::string_view track_namespace, |
| absl::string_view name, |
| RemoteTrack::Visitor* visitor, |
| absl::string_view auth_info) { |
| MoqtSubscribeRequest message; |
| message.track_namespace = track_namespace; |
| message.track_name = name; |
| // First object of current group. |
| message.start_group = MoqtSubscribeLocation(false, (uint64_t)0); |
| message.start_object = MoqtSubscribeLocation(true, (int64_t)0); |
| message.end_group = std::nullopt; |
| message.end_object = std::nullopt; |
| if (!auth_info.empty()) { |
| message.authorization_info = std::move(auth_info); |
| } |
| return Subscribe(message, visitor); |
| } |
| |
| bool MoqtSession::Subscribe(const MoqtSubscribeRequest& message, |
| RemoteTrack::Visitor* visitor) { |
| // TODO(martinduke): support authorization info |
| bool success = |
| session_->GetStreamById(*control_stream_) |
| ->Write(framer_.SerializeSubscribeRequest(message).AsStringView()); |
| if (!success) { |
| Error(MoqtError::kGenericError, |
| "Failed to write SUBSCRIBE_REQUEST message"); |
| return false; |
| } |
| QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE_REQUEST message for " |
| << message.track_namespace << ":" << message.track_name; |
| FullTrackName ftn(std::string(message.track_namespace), |
| std::string(message.track_name)); |
| remote_tracks_.try_emplace(ftn, ftn, visitor); |
| return true; |
| } |
| |
| std::optional<webtransport::StreamId> MoqtSession::OpenUnidirectionalStream() { |
| if (!session_->CanOpenNextOutgoingUnidirectionalStream()) { |
| return std::nullopt; |
| } |
| webtransport::Stream* new_stream = |
| session_->OpenOutgoingUnidirectionalStream(); |
| if (new_stream == nullptr) { |
| return std::nullopt; |
| } |
| new_stream->SetVisitor(std::make_unique<Stream>(this, new_stream, false)); |
| return new_stream->GetStreamId(); |
| } |
| |
| // increment object_sequence or group_sequence depending on |start_new_group| |
| void MoqtSession::PublishObjectToStream(webtransport::StreamId stream_id, |
| FullTrackName full_track_name, |
| bool start_new_group, |
| absl::string_view payload) { |
| // TODO: check that the peer is subscribed to the next sequence. |
| webtransport::Stream* stream = session_->GetStreamById(stream_id); |
| if (stream == nullptr) { |
| QUICHE_DLOG(ERROR) << ENDPOINT << "Sending OBJECT to nonexistent stream"; |
| return; |
| } |
| auto track_it = local_tracks_.find(full_track_name); |
| if (track_it == local_tracks_.end()) { |
| QUICHE_DLOG(ERROR) << ENDPOINT << "Sending OBJECT for nonexistent track"; |
| return; |
| } |
| MoqtObject object; |
| LocalTrack& track = track_it->second; |
| object.track_id = track.track_alias(); |
| FullSequence& next_sequence = track.next_sequence_mutable(); |
| object.group_sequence = next_sequence.group; |
| if (start_new_group) { |
| ++object.group_sequence; |
| object.object_sequence = 0; |
| } else { |
| object.object_sequence = next_sequence.object; |
| } |
| next_sequence.group = object.group_sequence; |
| next_sequence.object = object.object_sequence + 1; |
| if (!track.ShouldSend(object.group_sequence, object.object_sequence)) { |
| QUICHE_LOG(INFO) << ENDPOINT << "Not sending object " |
| << full_track_name.track_namespace << ":" |
| << full_track_name.track_name << " with sequence " |
| << object.group_sequence << ":" << object.object_sequence |
| << " because peer is not subscribed"; |
| return; |
| } |
| object.object_send_order = 0; |
| object.payload_length = payload.size(); |
| bool success = |
| stream->Write(framer_.SerializeObject(object, payload).AsStringView()); |
| if (!success) { |
| QUICHE_DLOG(ERROR) << ENDPOINT << "Failed to write OBJECT message"; |
| return; |
| } |
| QUICHE_LOG(INFO) << ENDPOINT << "Sending object " |
| << full_track_name.track_namespace << ":" |
| << full_track_name.track_name << " with sequence " |
| << object.group_sequence << ":" << object.object_sequence; |
| } |
| |
| void MoqtSession::Stream::OnCanRead() { |
| bool fin = |
| quiche::ProcessAllReadableRegions(*stream_, [&](absl::string_view chunk) { |
| parser_.ProcessData(chunk, /*end_of_stream=*/false); |
| }); |
| if (fin) { |
| parser_.ProcessData("", /*end_of_stream=*/true); |
| } |
| } |
| void MoqtSession::Stream::OnCanWrite() {} |
| void MoqtSession::Stream::OnResetStreamReceived( |
| webtransport::StreamErrorCode error) { |
| if (is_control_stream_.has_value() && *is_control_stream_) { |
| session_->Error( |
| MoqtError::kProtocolViolation, |
| absl::StrCat("Control stream reset with error code ", error)); |
| } |
| } |
| void MoqtSession::Stream::OnStopSendingReceived( |
| webtransport::StreamErrorCode error) { |
| if (is_control_stream_.has_value() && *is_control_stream_) { |
| session_->Error( |
| MoqtError::kProtocolViolation, |
| absl::StrCat("Control stream reset with error code ", error)); |
| } |
| } |
| |
| void MoqtSession::Stream::OnObjectMessage(const MoqtObject& message, |
| absl::string_view payload, |
| bool end_of_message) { |
| if (is_control_stream_ == true) { |
| session_->Error(MoqtError::kProtocolViolation, |
| "Received OBJECT message on control stream"); |
| return; |
| } |
| QUICHE_DLOG(INFO) << ENDPOINT << "Received OBJECT message on stream " |
| << stream_->GetStreamId() << " for track alias " |
| << message.track_id << " with sequence " |
| << message.group_sequence << ":" << message.object_sequence |
| << " length " << payload.size() << " explicit length " |
| << (message.payload_length.has_value() |
| ? (int)*message.payload_length |
| : -1) |
| << (end_of_message ? "F" : ""); |
| if (!session_->parameters_.deliver_partial_objects) { |
| if (!end_of_message) { // Buffer partial object. |
| absl::StrAppend(&partial_object_, payload); |
| return; |
| } |
| if (!partial_object_.empty()) { // Completes the object |
| absl::StrAppend(&partial_object_, payload); |
| payload = absl::string_view(partial_object_); |
| } |
| } |
| auto it = session_->tracks_by_alias_.find(message.track_id); |
| if (it == session_->tracks_by_alias_.end()) { |
| // No SUBSCRIBE_OK received with this alias, buffer it. |
| auto it2 = session_->object_queue_.find(message.track_id); |
| std::vector<BufferedObject>* queue; |
| if (it2 == session_->object_queue_.end()) { |
| queue = &session_->object_queue_[message.track_id]; |
| } else { |
| queue = &it2->second; |
| } |
| if (session_->num_buffered_objects_ >= kMaxBufferedObjects) { |
| session_->num_buffered_objects_++; |
| session_->Error(MoqtError::kGenericError, "Too many buffered objects"); |
| return; |
| } |
| queue->push_back(BufferedObject(stream_->GetStreamId(), message, payload, |
| end_of_message)); |
| QUIC_DLOG(INFO) << ENDPOINT << "Buffering OBJECT for track alias " |
| << message.track_id; |
| return; |
| } |
| RemoteTrack* subscription = it->second; |
| if (subscription->visitor() != nullptr) { |
| subscription->visitor()->OnObjectFragment( |
| subscription->full_track_name(), stream_->GetStreamId(), |
| message.group_sequence, message.object_sequence, |
| message.object_send_order, payload, end_of_message); |
| } |
| partial_object_.clear(); |
| } |
| |
| void MoqtSession::Stream::OnClientSetupMessage(const MoqtClientSetup& message) { |
| if (is_control_stream_.has_value()) { |
| if (!*is_control_stream_) { |
| session_->Error(MoqtError::kProtocolViolation, |
| "Received SETUP on non-control stream"); |
| return; |
| } |
| } else { |
| is_control_stream_ = true; |
| } |
| if (perspective() == Perspective::IS_CLIENT) { |
| session_->Error(MoqtError::kProtocolViolation, |
| "Received CLIENT_SETUP from server"); |
| return; |
| } |
| if (absl::c_find(message.supported_versions, session_->parameters_.version) == |
| message.supported_versions.end()) { |
| // TODO(martinduke): Is this the right error code? See issue #346. |
| session_->Error(MoqtError::kProtocolViolation, |
| absl::StrCat("Version mismatch: expected 0x", |
| absl::Hex(session_->parameters_.version))); |
| return; |
| } |
| QUICHE_DLOG(INFO) << ENDPOINT << "Received the SETUP message"; |
| if (session_->parameters_.perspective == Perspective::IS_SERVER) { |
| MoqtServerSetup response; |
| response.selected_version = session_->parameters_.version; |
| response.role = MoqtRole::kBoth; |
| bool success = stream_->Write( |
| session_->framer_.SerializeServerSetup(response).AsStringView()); |
| if (!success) { |
| session_->Error(MoqtError::kGenericError, |
| "Failed to write server SETUP message"); |
| return; |
| } |
| QUIC_DLOG(INFO) << ENDPOINT << "Sent the SETUP message"; |
| } |
| // TODO: handle role and path. |
| std::move(session_->session_established_callback_)(); |
| } |
| |
| void MoqtSession::Stream::OnServerSetupMessage(const MoqtServerSetup& message) { |
| if (is_control_stream_.has_value()) { |
| if (!*is_control_stream_) { |
| session_->Error(MoqtError::kProtocolViolation, |
| "Received SETUP on non-control stream"); |
| return; |
| } |
| } else { |
| is_control_stream_ = true; |
| } |
| if (perspective() == Perspective::IS_SERVER) { |
| session_->Error(MoqtError::kProtocolViolation, |
| "Received SERVER_SETUP from client"); |
| return; |
| } |
| if (message.selected_version != session_->parameters_.version) { |
| // TODO(martinduke): Is this the right error code? See issue #346. |
| session_->Error(MoqtError::kProtocolViolation, |
| absl::StrCat("Version mismatch: expected 0x", |
| absl::Hex(session_->parameters_.version))); |
| return; |
| } |
| QUIC_DLOG(INFO) << ENDPOINT << "Received the SETUP message"; |
| // TODO: handle role and path. |
| std::move(session_->session_established_callback_)(); |
| } |
| |
| void MoqtSession::Stream::SendSubscribeError( |
| const MoqtSubscribeRequest& message, SubscribeErrorCode error_code, |
| absl::string_view reason_phrase) { |
| MoqtSubscribeError subscribe_error; |
| subscribe_error.track_namespace = message.track_namespace; |
| subscribe_error.track_name = message.track_name; |
| subscribe_error.error_code = error_code; |
| subscribe_error.reason_phrase = reason_phrase; |
| bool success = |
| stream_->Write(session_->framer_.SerializeSubscribeError(subscribe_error) |
| .AsStringView()); |
| if (!success) { |
| session_->Error(MoqtError::kGenericError, |
| "Failed to write SUBSCRIBE_ERROR message"); |
| } |
| } |
| |
| void MoqtSession::Stream::OnSubscribeRequestMessage( |
| const MoqtSubscribeRequest& message) { |
| std::string reason_phrase = ""; |
| if (!CheckIfIsControlStream()) { |
| return; |
| } |
| QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE_REQUEST for " |
| << message.track_namespace << ":" << message.track_name; |
| auto it = session_->local_tracks_.find(FullTrackName( |
| std::string(message.track_namespace), std::string(message.track_name))); |
| if (it == session_->local_tracks_.end()) { |
| QUIC_DLOG(INFO) << ENDPOINT << "Rejected because " |
| << message.track_namespace << ":" << message.track_name |
| << " does not exist"; |
| SendSubscribeError(message, SubscribeErrorCode::kGenericError, |
| "Track does not exist"); |
| return; |
| } |
| LocalTrack& track = it->second; |
| std::optional<FullSequence> start = session_->LocationToAbsoluteNumber( |
| track, message.start_group, message.start_object); |
| QUICHE_DCHECK(start.has_value()); // Parser enforces this. |
| std::optional<FullSequence> end = session_->LocationToAbsoluteNumber( |
| track, message.end_group, message.end_object); |
| if (start < track.next_sequence() && track.visitor() != nullptr) { |
| SubscribeWindow window = end.has_value() |
| ? SubscribeWindow(start->group, start->object, |
| end->group, end->object) |
| : SubscribeWindow(start->group, start->object); |
| std::optional<absl::string_view> past_objects_available = |
| track.visitor()->OnSubscribeRequestForPast(window); |
| if (!past_objects_available.has_value()) { |
| SendSubscribeError(message, SubscribeErrorCode::kGenericError, |
| "Object does not exist"); |
| return; |
| } |
| } |
| MoqtSubscribeOk subscribe_ok; |
| subscribe_ok.track_namespace = message.track_namespace; |
| subscribe_ok.track_name = message.track_name; |
| subscribe_ok.track_id = track.track_alias(); |
| bool success = stream_->Write( |
| session_->framer_.SerializeSubscribeOk(subscribe_ok).AsStringView()); |
| if (!success) { |
| session_->Error(MoqtError::kGenericError, |
| "Failed to write SUBSCRIBE_OK message"); |
| return; |
| } |
| QUIC_DLOG(INFO) << ENDPOINT << "Created subscription for " |
| << message.track_namespace << ":" << message.track_name; |
| if (!end.has_value()) { |
| track.AddWindow(SubscribeWindow(start->group, start->object)); |
| return; |
| } |
| track.AddWindow( |
| SubscribeWindow(start->group, start->object, end->group, end->object)); |
| } |
| |
| void MoqtSession::Stream::OnSubscribeOkMessage(const MoqtSubscribeOk& message) { |
| if (!CheckIfIsControlStream()) { |
| return; |
| } |
| if (session_->tracks_by_alias_.contains(message.track_id)) { |
| session_->Error(MoqtError::kDuplicateTrackAlias, |
| "Received duplicate track_alias"); |
| return; |
| } |
| auto it = session_->remote_tracks_.find(FullTrackName( |
| std::string(message.track_namespace), std::string(message.track_name))); |
| if (it == session_->remote_tracks_.end()) { |
| session_->Error(MoqtError::kProtocolViolation, |
| "Received SUBSCRIBE_OK for nonexistent subscribe"); |
| return; |
| } |
| // Note that if there are multiple SUBSCRIBE_OK for the same track, |
| // RemoteTrack.track_alias() will be the last alias received, but |
| // tracks_by_alias_ will have an entry for every track_alias received. |
| // TODO: revise this data structure to make it easier to clean up |
| // RemoteTracks, unless draft changes make it irrelevant. |
| QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for " |
| << message.track_namespace << ":" << message.track_name |
| << ", track_alias = " << message.track_id; |
| RemoteTrack& track = it->second; |
| track.set_track_alias(message.track_id); |
| session_->tracks_by_alias_[message.track_id] = &track; |
| // TODO: handle expires. |
| if (track.visitor() != nullptr) { |
| track.visitor()->OnReply(track.full_track_name(), std::nullopt); |
| } |
| // Clear the buffer for this track alias. |
| auto it2 = session_->object_queue_.find(message.track_id); |
| if (it2 == session_->object_queue_.end() || track.visitor() == nullptr) { |
| // Nothing is buffered, or the app hasn't registered a visitor anyway. |
| return; |
| } |
| QUIC_DLOG(INFO) << ENDPOINT << "Processing buffered OBJECTs for track_alias " |
| << message.track_id; |
| std::vector<BufferedObject>& queue = it2->second; |
| for (BufferedObject& to_deliver : queue) { |
| track.visitor()->OnObjectFragment( |
| track.full_track_name(), to_deliver.stream_id, |
| to_deliver.message.group_sequence, to_deliver.message.object_sequence, |
| to_deliver.message.object_send_order, to_deliver.payload, |
| to_deliver.eom); |
| session_->num_buffered_objects_--; |
| } |
| session_->object_queue_.erase(it2); |
| } |
| |
| void MoqtSession::Stream::OnSubscribeErrorMessage( |
| const MoqtSubscribeError& message) { |
| if (!CheckIfIsControlStream()) { |
| return; |
| } |
| auto it = session_->remote_tracks_.find(FullTrackName( |
| std::string(message.track_namespace), std::string(message.track_name))); |
| if (it == session_->remote_tracks_.end()) { |
| session_->Error(MoqtError::kProtocolViolation, |
| "Received SUBSCRIBE_ERROR for nonexistent subscribe"); |
| return; |
| } |
| QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_ERROR for " |
| << message.track_namespace << ":" << message.track_name |
| << ", error = " << static_cast<int>(message.error_code) |
| << " (" << message.reason_phrase << ")"; |
| if (it->second.visitor() != nullptr) { |
| it->second.visitor()->OnReply(it->second.full_track_name(), |
| message.reason_phrase); |
| } |
| } |
| |
| void MoqtSession::Stream::OnAnnounceMessage(const MoqtAnnounce& message) { |
| if (!CheckIfIsControlStream()) { |
| return; |
| } |
| MoqtAnnounceOk ok; |
| ok.track_namespace = message.track_namespace; |
| bool success = |
| stream_->Write(session_->framer_.SerializeAnnounceOk(ok).AsStringView()); |
| if (!success) { |
| session_->Error(MoqtError::kGenericError, |
| "Failed to write ANNOUNCE_OK message"); |
| return; |
| } |
| } |
| |
| void MoqtSession::Stream::OnAnnounceOkMessage(const MoqtAnnounceOk& message) { |
| if (!CheckIfIsControlStream()) { |
| return; |
| } |
| auto it = session_->pending_outgoing_announces_.find(message.track_namespace); |
| if (it == session_->pending_outgoing_announces_.end()) { |
| session_->Error(MoqtError::kProtocolViolation, |
| "Received ANNOUNCE_OK for nonexistent announce"); |
| return; |
| } |
| std::move(it->second)(message.track_namespace, std::nullopt); |
| session_->pending_outgoing_announces_.erase(it); |
| } |
| |
| void MoqtSession::Stream::OnAnnounceErrorMessage( |
| const MoqtAnnounceError& message) { |
| if (!CheckIfIsControlStream()) { |
| return; |
| } |
| auto it = session_->pending_outgoing_announces_.find(message.track_namespace); |
| if (it == session_->pending_outgoing_announces_.end()) { |
| session_->Error(MoqtError::kProtocolViolation, |
| "Received ANNOUNCE_ERROR for nonexistent announce"); |
| return; |
| } |
| std::move(it->second)(message.track_namespace, message.reason_phrase); |
| session_->pending_outgoing_announces_.erase(it); |
| } |
| |
| void MoqtSession::Stream::OnParsingError(absl::string_view reason) { |
| session_->Error(MoqtError::kProtocolViolation, |
| absl::StrCat("Parse error: ", reason)); |
| } |
| |
| bool MoqtSession::Stream::CheckIfIsControlStream() { |
| if (!is_control_stream_.has_value()) { |
| session_->Error(MoqtError::kProtocolViolation, |
| "Received SUBSCRIBE_REQUEST as first message"); |
| return false; |
| } |
| if (!*is_control_stream_) { |
| session_->Error(MoqtError::kProtocolViolation, |
| "Received SUBSCRIBE_REQUEST on non-control stream"); |
| return false; |
| } |
| return true; |
| } |
| |
| std::optional<FullSequence> MoqtSession::LocationToAbsoluteNumber( |
| const LocalTrack& track, const std::optional<MoqtSubscribeLocation>& group, |
| const std::optional<MoqtSubscribeLocation>& object) { |
| FullSequence sequence; |
| if (!group.has_value() || !object.has_value()) { |
| return std::nullopt; |
| } |
| if (group->absolute) { |
| sequence.group = group->absolute_value; |
| } else { |
| sequence.group = track.next_sequence().group + group->relative_value; |
| } |
| if (object->absolute) { |
| sequence.object = object->absolute_value; |
| } else { |
| // Subtract 1 because the relative value is computed from the largest sent |
| // sequence number, not the next one. |
| sequence.object = track.next_sequence().object + object->relative_value - 1; |
| } |
| return sequence; |
| } |
| |
| } // namespace moqt |