Add functions to enable/disable QuartcPeer and obtain the last sequence numbers
sent by each of its sources.
These functions allow a test to stop sending and drain network traffic before
closing the connection, by disabling both peers and waiting until each receives
the sequence number last sent by the other.
Note that this might not work perfectly on lossy networks, as the last message
may be dropped. In this case, draining should use a timeout. For example, the
receiver may stop waiting based if it does not receive any frames for an RTT.
gfe-relnote: n/a (Quartc test-only)
PiperOrigin-RevId: 243880014
Change-Id: I17eb39fe9db65138094be885ba2330936dbeac4e
diff --git a/quic/quartc/test/quartc_data_source.cc b/quic/quartc/test/quartc_data_source.cc
index 22d4716..c109325 100644
--- a/quic/quartc/test/quartc_data_source.cc
+++ b/quic/quartc/test/quartc_data_source.cc
@@ -100,7 +100,7 @@
return std::max(bandwidth - allocated_bandwidth_, QuicBandwidth::Zero());
}
-bool QuartcDataSource::Enabled() {
+bool QuartcDataSource::Enabled() const {
return send_alarm_->IsSet();
}
diff --git a/quic/quartc/test/quartc_data_source.h b/quic/quartc/test/quartc_data_source.h
index 25717b0..557a1de 100644
--- a/quic/quartc/test/quartc_data_source.h
+++ b/quic/quartc/test/quartc_data_source.h
@@ -85,9 +85,13 @@
// Whether the data source is enabled. The data source only produces data
// when enabled. When first enabled, the data source starts sending
// immediately. When disabled, the data source stops sending immediately.
- bool Enabled();
+ bool Enabled() const;
void SetEnabled(bool value);
+ // Returns the sequence number of the last frame generated (or -1 if no frames
+ // have been generated).
+ int64_t sequence_number() const { return sequence_number_ - 1; }
+
private:
void GenerateFrame(QuicByteCount frame_size, QuicTime now);
diff --git a/quic/quartc/test/quartc_peer.cc b/quic/quartc/test/quartc_peer.cc
index 5970969..32425c7 100644
--- a/quic/quartc/test/quartc_peer.cc
+++ b/quic/quartc/test/quartc_peer.cc
@@ -16,6 +16,7 @@
: clock_(clock),
alarm_factory_(alarm_factory),
random_(random),
+ enabled_(false),
session_(nullptr),
configs_(configs) {}
@@ -23,6 +24,22 @@
session_->CloseConnection("~QuartcPeer()");
}
+void QuartcPeer::SetEnabled(bool value) {
+ enabled_ = value;
+ for (auto& source : data_sources_) {
+ source->SetEnabled(enabled_);
+ }
+}
+
+std::map<int32_t, int64_t> QuartcPeer::GetLastSequenceNumbers() const {
+ DCHECK_GE(configs_.size(), data_sources_.size());
+ std::map<int32_t, int64_t> out;
+ for (int i = 0; i < data_sources_.size(); ++i) {
+ out[configs_[i].id] = data_sources_[i]->sequence_number();
+ }
+ return out;
+}
+
void QuartcPeer::OnSessionCreated(QuartcSession* session) {
session_ = session;
@@ -49,21 +66,15 @@
const std::string& error_details) {
QUIC_LOG(WARNING) << "Connect failed, error=" << error
<< ", details=" << error_details;
- for (auto& source : data_sources_) {
- source->SetEnabled(false);
- }
+ SetEnabled(false);
}
void QuartcPeer::OnCryptoHandshakeComplete() {
- for (auto& source : data_sources_) {
- source->SetEnabled(true);
- }
+ SetEnabled(true);
}
void QuartcPeer::OnConnectionWritable() {
- for (auto& source : data_sources_) {
- source->SetEnabled(true);
- }
+ SetEnabled(true);
}
void QuartcPeer::OnIncomingStream(QuartcStream* stream) {
@@ -89,9 +100,7 @@
ConnectionCloseSource source) {
QUIC_LOG(INFO) << "Connection closed, error=" << error_code
<< ", details=" << error_details;
- for (auto& source : data_sources_) {
- source->SetEnabled(false);
- }
+ SetEnabled(false);
}
void QuartcPeer::OnMessageReceived(QuicStringPiece message) {
diff --git a/quic/quartc/test/quartc_peer.h b/quic/quartc/test/quartc_peer.h
index 502024c..32b29be 100644
--- a/quic/quartc/test/quartc_peer.h
+++ b/quic/quartc/test/quartc_peer.h
@@ -51,11 +51,22 @@
~QuartcPeer();
+ // Enable or disable this peer. Disabling a peer causes it to stop sending
+ // messages (which may be useful for flushing data during tests).
+ // A peer begins disabled. It automatically enables itself as soon as its
+ // session becomes writable, and disables itself when its session closes.
+ bool Enabled() const { return enabled_; }
+ void SetEnabled(bool value);
+
// Messages received from the peer, in the order they were received.
const std::vector<ReceivedMessage>& received_messages() const {
return received_messages_;
}
+ // Returns a map of source id to the sequence number of the last frame
+ // produced by that source.
+ std::map<int32_t, int64_t> GetLastSequenceNumbers() const;
+
// QuartcEndpoint::Delegate overrides.
void OnSessionCreated(QuartcSession* session) override;
void OnConnectError(QuicErrorCode error,
@@ -81,6 +92,9 @@
QuicAlarmFactory* alarm_factory_;
QuicRandom* random_;
+ // Whether the peer is currently sending.
+ bool enabled_;
+
// Session used for sending and receiving data. Not owned. Created by an
// external QuartcEndpoint and set in the |OnSessionCreated| callback.
QuartcSession* session_;
diff --git a/quic/quartc/test/quartc_peer_test.cc b/quic/quartc/test/quartc_peer_test.cc
index 06a7432..cf1e2f1 100644
--- a/quic/quartc/test/quartc_peer_test.cc
+++ b/quic/quartc/test/quartc_peer_test.cc
@@ -362,6 +362,37 @@
source_3_size);
}
+TEST_F(QuartcPeerTest, DisableAndDrainMessages) {
+ QuartcDataSource::Config config;
+ config.id = 1;
+ config.max_bandwidth = client_server_link_.bandwidth() * 0.5;
+ config.frame_interval = QuicTime::Delta::FromMilliseconds(10);
+
+ CreatePeers({config});
+ Connect();
+
+ simulator_.RunFor(QuicTime::Delta::FromSeconds(15));
+
+ // After these calls, we should observe no new messages.
+ server_peer_->SetEnabled(false);
+ client_peer_->SetEnabled(false);
+
+ std::map<int32_t, int64_t> last_sent_by_client =
+ client_peer_->GetLastSequenceNumbers();
+ std::map<int32_t, int64_t> last_sent_by_server =
+ server_peer_->GetLastSequenceNumbers();
+
+ simulator_.RunFor(QuicTime::Delta::FromSeconds(15));
+
+ ASSERT_FALSE(client_peer_->received_messages().empty());
+ EXPECT_EQ(client_peer_->received_messages().back().frame.sequence_number,
+ last_sent_by_server[1]);
+
+ ASSERT_FALSE(server_peer_->received_messages().empty());
+ EXPECT_EQ(server_peer_->received_messages().back().frame.sequence_number,
+ last_sent_by_client[1]);
+}
+
} // namespace
} // namespace test
} // namespace quic