| // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "quiche/quic/test_tools/simulator/queue.h" |
| |
| #include "quiche/quic/platform/api/quic_logging.h" |
| #include "quiche/quic/test_tools/simulator/simulator.h" |
| |
| namespace quic { |
| namespace simulator { |
| |
| Queue::ListenerInterface::~ListenerInterface() {} |
| |
| Queue::Queue(Simulator* simulator, std::string name, QuicByteCount capacity) |
| : Actor(simulator, name), |
| capacity_(capacity), |
| bytes_queued_(0), |
| aggregation_threshold_(0), |
| aggregation_timeout_(QuicTime::Delta::Infinite()), |
| current_bundle_(0), |
| current_bundle_bytes_(0), |
| tx_port_(nullptr), |
| listener_(nullptr) { |
| aggregation_timeout_alarm_.reset(simulator_->GetAlarmFactory()->CreateAlarm( |
| new AggregationAlarmDelegate(this))); |
| } |
| |
| Queue::~Queue() { aggregation_timeout_alarm_->PermanentCancel(); } |
| |
| void Queue::set_tx_port(ConstrainedPortInterface* port) { tx_port_ = port; } |
| |
| void Queue::AcceptPacket(std::unique_ptr<Packet> packet) { |
| if (packet->size + bytes_queued_ > capacity_) { |
| QUIC_DVLOG(1) << "Queue [" << name() << "] has received a packet from [" |
| << packet->source << "] to [" << packet->destination |
| << "] which is over capacity. Dropping it."; |
| QUIC_DVLOG(1) << "Queue size: " << bytes_queued_ << " out of " << capacity_ |
| << ". Packet size: " << packet->size; |
| return; |
| } |
| |
| bytes_queued_ += packet->size; |
| queue_.emplace_back(std::move(packet), current_bundle_); |
| |
| if (IsAggregationEnabled()) { |
| current_bundle_bytes_ += queue_.front().packet->size; |
| if (!aggregation_timeout_alarm_->IsSet()) { |
| aggregation_timeout_alarm_->Set(clock_->Now() + aggregation_timeout_); |
| } |
| if (current_bundle_bytes_ >= aggregation_threshold_) { |
| NextBundle(); |
| } |
| } |
| |
| ScheduleNextPacketDequeue(); |
| } |
| |
| void Queue::Act() { |
| QUICHE_DCHECK(!queue_.empty()); |
| if (tx_port_->TimeUntilAvailable().IsZero()) { |
| QUICHE_DCHECK(bytes_queued_ >= queue_.front().packet->size); |
| bytes_queued_ -= queue_.front().packet->size; |
| |
| tx_port_->AcceptPacket(std::move(queue_.front().packet)); |
| queue_.pop_front(); |
| if (listener_ != nullptr) { |
| listener_->OnPacketDequeued(); |
| } |
| } |
| |
| ScheduleNextPacketDequeue(); |
| } |
| |
| void Queue::EnableAggregation(QuicByteCount aggregation_threshold, |
| QuicTime::Delta aggregation_timeout) { |
| QUICHE_DCHECK_EQ(bytes_queued_, 0u); |
| QUICHE_DCHECK_GT(aggregation_threshold, 0u); |
| QUICHE_DCHECK(!aggregation_timeout.IsZero()); |
| QUICHE_DCHECK(!aggregation_timeout.IsInfinite()); |
| |
| aggregation_threshold_ = aggregation_threshold; |
| aggregation_timeout_ = aggregation_timeout; |
| } |
| |
| Queue::AggregationAlarmDelegate::AggregationAlarmDelegate(Queue* queue) |
| : queue_(queue) {} |
| |
| void Queue::AggregationAlarmDelegate::OnAlarm() { |
| queue_->NextBundle(); |
| queue_->ScheduleNextPacketDequeue(); |
| } |
| |
| Queue::EnqueuedPacket::EnqueuedPacket(std::unique_ptr<Packet> packet, |
| AggregationBundleNumber bundle) |
| : packet(std::move(packet)), bundle(bundle) {} |
| |
| Queue::EnqueuedPacket::EnqueuedPacket(EnqueuedPacket&& other) = default; |
| |
| Queue::EnqueuedPacket::~EnqueuedPacket() = default; |
| |
| void Queue::NextBundle() { |
| current_bundle_++; |
| current_bundle_bytes_ = 0; |
| aggregation_timeout_alarm_->Cancel(); |
| } |
| |
| void Queue::ScheduleNextPacketDequeue() { |
| if (queue_.empty()) { |
| QUICHE_DCHECK_EQ(bytes_queued_, 0u); |
| return; |
| } |
| |
| if (IsAggregationEnabled() && queue_.front().bundle == current_bundle_) { |
| return; |
| } |
| |
| QuicTime::Delta time_until_available = QuicTime::Delta::Zero(); |
| if (tx_port_) { |
| time_until_available = tx_port_->TimeUntilAvailable(); |
| } |
| |
| Schedule(clock_->Now() + time_until_available); |
| } |
| |
| } // namespace simulator |
| } // namespace quic |