blob: 9a2c404ee025d2b2cae38721e81a300f6e4e3708 [file] [log] [blame] [edit]
// Copyright 2024 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/core/web_transport_write_blocked_list.h"
#include <cstddef>
#include <optional>
#include <string>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_format.h"
#include "quiche/quic/core/quic_stream_priority.h"
#include "quiche/quic/core/quic_types.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/platform/api/quiche_logging.h"
namespace quic {
bool WebTransportWriteBlockedList::HasWriteBlockedDataStreams() const {
return main_schedule_.NumScheduledInPriorityRange(
std::nullopt, RemapUrgency(HttpStreamPriority::kMaximumUrgency,
/*is_http=*/true)) > 0;
}
size_t WebTransportWriteBlockedList::NumBlockedSpecialStreams() const {
return main_schedule_.NumScheduledInPriorityRange(
RemapUrgency(kStaticUrgency, /*is_http=*/false), std::nullopt);
}
size_t WebTransportWriteBlockedList::NumBlockedStreams() const {
size_t num_streams = main_schedule_.NumScheduled();
for (const auto& [key, scheduler] : web_transport_session_schedulers_) {
if (scheduler.HasScheduled()) {
num_streams += scheduler.NumScheduled();
// Account for the fact that the group itself has an entry in the main
// scheduler that does not correspond to any actual stream.
QUICHE_DCHECK(main_schedule_.IsScheduled(key));
--num_streams;
}
}
return num_streams;
}
void WebTransportWriteBlockedList::RegisterStream(
QuicStreamId stream_id, bool is_static_stream,
const QuicStreamPriority& raw_priority) {
QuicStreamPriority priority =
is_static_stream
? QuicStreamPriority(HttpStreamPriority{kStaticUrgency, true})
: raw_priority;
auto [unused, success] = priorities_.emplace(stream_id, priority);
if (!success) {
QUICHE_BUG(WTWriteBlocked_RegisterStream_already_registered)
<< "Tried to register stream " << stream_id
<< " that is already registered";
return;
}
if (priority.type() == QuicPriorityType::kHttp) {
absl::Status status = main_schedule_.Register(
ScheduleKey::HttpStream(stream_id),
RemapUrgency(priority.http().urgency, /*is_http=*/true));
QUICHE_BUG_IF(WTWriteBlocked_RegisterStream_http_scheduler, !status.ok())
<< status;
return;
}
QUICHE_DCHECK_EQ(priority.type(), QuicPriorityType::kWebTransport);
ScheduleKey group_key = ScheduleKey::WebTransportSession(priority);
auto [it, created_new] =
web_transport_session_schedulers_.try_emplace(group_key);
absl::Status status =
it->second.Register(stream_id, priority.web_transport().send_order);
QUICHE_BUG_IF(WTWriteBlocked_RegisterStream_data_scheduler, !status.ok())
<< status;
// If the group is new, register it with the main scheduler.
if (created_new) {
// The IETF draft requires the priority of data streams associated with an
// individual session to be equivalent to the priority of the control
// stream.
auto session_priority_it =
priorities_.find(priority.web_transport().session_id);
// It is possible for a stream to be (re-)registered while the control
// stream is already gone.
QUICHE_DLOG_IF(WARNING, session_priority_it == priorities_.end())
<< "Stream " << stream_id << " is associated with session ID "
<< priority.web_transport().session_id
<< ", but the session control stream is not registered; assuming "
"default urgency.";
QuicStreamPriority session_priority =
session_priority_it != priorities_.end() ? session_priority_it->second
: QuicStreamPriority();
status = main_schedule_.Register(
group_key,
RemapUrgency(session_priority.http().urgency, /*is_http=*/false));
QUICHE_BUG_IF(WTWriteBlocked_RegisterStream_main_scheduler, !status.ok())
<< status;
}
}
void WebTransportWriteBlockedList::UnregisterStream(QuicStreamId stream_id) {
auto map_it = priorities_.find(stream_id);
if (map_it == priorities_.end()) {
QUICHE_BUG(WTWriteBlocked_UnregisterStream_not_found)
<< "Stream " << stream_id << " not found";
return;
}
QuicStreamPriority priority = map_it->second;
priorities_.erase(map_it);
if (priority.type() != QuicPriorityType::kWebTransport) {
absl::Status status =
main_schedule_.Unregister(ScheduleKey::HttpStream(stream_id));
QUICHE_BUG_IF(WTWriteBlocked_UnregisterStream_http, !status.ok()) << status;
return;
}
ScheduleKey key = ScheduleKey::WebTransportSession(priority);
auto subscheduler_it = web_transport_session_schedulers_.find(key);
if (subscheduler_it == web_transport_session_schedulers_.end()) {
QUICHE_BUG(WTWriteBlocked_UnregisterStream_no_subscheduler)
<< "Stream " << stream_id
<< " is a WebTransport data stream, but has no scheduler for the "
"associated group";
return;
}
Subscheduler& subscheduler = subscheduler_it->second;
absl::Status status = subscheduler.Unregister(stream_id);
QUICHE_BUG_IF(WTWriteBlocked_UnregisterStream_subscheduler_stream_failed,
!status.ok())
<< status;
// If this is the last stream associated with the group, remove the group.
if (!subscheduler.HasRegistered()) {
status = main_schedule_.Unregister(key);
QUICHE_BUG_IF(WTWriteBlocked_UnregisterStream_subscheduler_failed,
!status.ok())
<< status;
web_transport_session_schedulers_.erase(subscheduler_it);
}
}
void WebTransportWriteBlockedList::UpdateStreamPriority(
QuicStreamId stream_id, const QuicStreamPriority& new_priority) {
QuicStreamPriority old_priority = GetPriorityOfStream(stream_id);
if (old_priority == new_priority) {
return;
}
bool was_blocked = IsStreamBlocked(stream_id);
UnregisterStream(stream_id);
RegisterStream(stream_id, /*is_static_stream=*/false, new_priority);
if (was_blocked) {
AddStream(stream_id);
}
if (new_priority.type() == QuicPriorityType::kHttp) {
for (auto& [key, subscheduler] : web_transport_session_schedulers_) {
QUICHE_DCHECK(key.has_group());
if (key.stream() == stream_id) {
absl::Status status =
main_schedule_.UpdatePriority(key, new_priority.http().urgency);
QUICHE_BUG_IF(WTWriteBlocked_UpdateStreamPriority_subscheduler_failed,
!status.ok())
<< status;
}
}
}
}
QuicStreamId WebTransportWriteBlockedList::PopFront() {
absl::StatusOr<ScheduleKey> main_key = main_schedule_.PopFront();
if (!main_key.ok()) {
QUICHE_BUG(WTWriteBlocked_PopFront_no_streams)
<< "PopFront() called when no streams scheduled: " << main_key.status();
return 0;
}
if (!main_key->has_group()) {
return main_key->stream();
}
auto it = web_transport_session_schedulers_.find(*main_key);
if (it == web_transport_session_schedulers_.end()) {
QUICHE_BUG(WTWriteBlocked_PopFront_no_subscheduler)
<< "Subscheduler for WebTransport group " << main_key->DebugString()
<< " not found";
return 0;
}
Subscheduler& subscheduler = it->second;
absl::StatusOr<QuicStreamId> result = subscheduler.PopFront();
if (!result.ok()) {
QUICHE_BUG(WTWriteBlocked_PopFront_subscheduler_empty)
<< "Subscheduler for group " << main_key->DebugString()
<< " is empty while in the main schedule";
return 0;
}
if (subscheduler.HasScheduled()) {
absl::Status status = main_schedule_.Schedule(*main_key);
QUICHE_BUG_IF(WTWriteBlocked_PopFront_reschedule_group, !status.ok())
<< status;
}
return *result;
}
void WebTransportWriteBlockedList::AddStream(QuicStreamId stream_id) {
QuicStreamPriority priority = GetPriorityOfStream(stream_id);
absl::Status status;
switch (priority.type()) {
case QuicPriorityType::kHttp:
status = main_schedule_.Schedule(ScheduleKey::HttpStream(stream_id));
QUICHE_BUG_IF(WTWriteBlocked_AddStream_http, !status.ok()) << status;
break;
case QuicPriorityType::kWebTransport:
status =
main_schedule_.Schedule(ScheduleKey::WebTransportSession(priority));
QUICHE_BUG_IF(WTWriteBlocked_AddStream_wt_main, !status.ok()) << status;
auto it = web_transport_session_schedulers_.find(
ScheduleKey::WebTransportSession(priority));
if (it == web_transport_session_schedulers_.end()) {
QUICHE_BUG(WTWriteBlocked_AddStream_no_subscheduler)
<< ScheduleKey::WebTransportSession(priority);
return;
}
Subscheduler& subscheduler = it->second;
status = subscheduler.Schedule(stream_id);
QUICHE_BUG_IF(WTWriteBlocked_AddStream_wt_sub, !status.ok()) << status;
break;
}
}
bool WebTransportWriteBlockedList::IsStreamBlocked(
QuicStreamId stream_id) const {
QuicStreamPriority priority = GetPriorityOfStream(stream_id);
switch (priority.type()) {
case QuicPriorityType::kHttp:
return main_schedule_.IsScheduled(ScheduleKey::HttpStream(stream_id));
case QuicPriorityType::kWebTransport:
auto it = web_transport_session_schedulers_.find(
ScheduleKey::WebTransportSession(priority));
if (it == web_transport_session_schedulers_.end()) {
QUICHE_BUG(WTWriteBlocked_IsStreamBlocked_no_subscheduler)
<< ScheduleKey::WebTransportSession(priority);
return false;
}
const Subscheduler& subscheduler = it->second;
return subscheduler.IsScheduled(stream_id);
}
QUICHE_NOTREACHED();
return false;
}
QuicStreamPriority WebTransportWriteBlockedList::GetPriorityOfStream(
QuicStreamId id) const {
auto it = priorities_.find(id);
if (it == priorities_.end()) {
QUICHE_BUG(WTWriteBlocked_GetPriorityOfStream_not_found)
<< "Stream " << id << " not found";
return QuicStreamPriority();
}
return it->second;
}
std::string WebTransportWriteBlockedList::ScheduleKey::DebugString() const {
return absl::StrFormat("(%d, %d)", stream_, group_);
}
bool WebTransportWriteBlockedList::ShouldYield(QuicStreamId id) const {
QuicStreamPriority priority = GetPriorityOfStream(id);
if (priority.type() == QuicPriorityType::kHttp) {
absl::StatusOr<bool> should_yield =
main_schedule_.ShouldYield(ScheduleKey::HttpStream(id));
QUICHE_BUG_IF(WTWriteBlocked_ShouldYield_http, !should_yield.ok())
<< should_yield.status();
return *should_yield;
}
QUICHE_DCHECK_EQ(priority.type(), QuicPriorityType::kWebTransport);
absl::StatusOr<bool> should_yield =
main_schedule_.ShouldYield(ScheduleKey::WebTransportSession(priority));
QUICHE_BUG_IF(WTWriteBlocked_ShouldYield_wt_main, !should_yield.ok())
<< should_yield.status();
if (*should_yield) {
return true;
}
auto it = web_transport_session_schedulers_.find(
ScheduleKey::WebTransportSession(priority));
if (it == web_transport_session_schedulers_.end()) {
QUICHE_BUG(WTWriteBlocked_ShouldYield_subscheduler_not_found)
<< "Subscheduler not found for "
<< ScheduleKey::WebTransportSession(priority);
return false;
}
const Subscheduler& subscheduler = it->second;
should_yield = subscheduler.ShouldYield(id);
QUICHE_BUG_IF(WTWriteBlocked_ShouldYield_wt_subscheduler, !should_yield.ok())
<< should_yield.status();
return *should_yield;
}
} // namespace quic