Create a test utility for sending and receiving data from QuartcDataSources.

QuartcPeer serves as a delegate for QuartcEndpoint, QuartcSession, and
QuartcDataSource.  It does not own a QuartcEndpoint--it instead expects the
endpoint to be set up by a test.  It waits for the endpoint to create a session,
then initializes QuartcDataSources according to a vector of configs.  It then
sends the data produced by those sources as QUIC datagrams.

QuartcPeer is responsible for adapting the data sources to the interface
provided by the session.  It adjusts bitrates and maximum frame sizes in order
to ensure that ensure that the data produced fits in the session (in accordance
with the session's max datagram frame size and bandwidth estimate).

QuartcPeer tracks messages it receives.  It parses each message as a frame sent
by QuartcDataSource, then attaches a receive timestamp.

This cl also fixes some bugs in QuartcDataSource.  It now produces at least
enough data to include the full frame header in each of its frames.  It no
longer reschedules the send alarm if SetEnabled(true) is called when the source
is already enabled.

gfe-relnote: n/a (Quartc only)
PiperOrigin-RevId: 243879297
Change-Id: I79412e9582fbbe98eaeaf33dca3ee621261c4846
diff --git a/quic/quartc/test/quartc_data_source.cc b/quic/quartc/test/quartc_data_source.cc
index 8f81ccb..22d4716 100644
--- a/quic/quartc/test/quartc_data_source.cc
+++ b/quic/quartc/test/quartc_data_source.cc
@@ -48,6 +48,7 @@
   out->sequence_number = sequence_number;
   out->send_time =
       QuicTime::Zero() + QuicTime::Delta::FromMicroseconds(time_bits);
+  out->size = data.size();
   out->payload = std::string(payload.data(), payload.size());
 
   return true;
@@ -81,10 +82,8 @@
   }
 
   QuicByteCount bytes =
-      std::max(kDataFrameHeaderSize,
-               allocated_bandwidth_.ToBytesPerPeriod(time_since_last_send));
-  while (config_.max_frame_size >= kDataFrameHeaderSize &&
-         bytes > config_.max_frame_size) {
+      allocated_bandwidth_.ToBytesPerPeriod(time_since_last_send);
+  while (config_.max_frame_size > 0 && bytes > config_.max_frame_size) {
     GenerateFrame(config_.max_frame_size, now);
     bytes -= config_.max_frame_size;
   }
@@ -106,9 +105,11 @@
 }
 
 void QuartcDataSource::SetEnabled(bool value) {
-  if (value) {
-    send_alarm_->Set(clock_->Now());
-  } else {
+  if (Enabled() == value) {
+    return;
+  }
+
+  if (!value) {
     send_alarm_->Cancel();
 
     // Reset the last send time.  When re-enabled, the data source should
@@ -116,10 +117,14 @@
     // bandwidth allocation and frame interval, not a huge frame accounting for
     // all the time since it was disabled.
     last_send_time_ = QuicTime::Zero();
+    return;
   }
+
+  send_alarm_->Set(clock_->Now());
 }
 
 void QuartcDataSource::GenerateFrame(QuicByteCount frame_size, QuicTime now) {
+  frame_size = std::max(frame_size, kDataFrameHeaderSize);
   if (buffer_.size() < frame_size) {
     buffer_.resize(frame_size);
   }
diff --git a/quic/quartc/test/quartc_data_source.h b/quic/quartc/test/quartc_data_source.h
index 0e851d3..25717b0 100644
--- a/quic/quartc/test/quartc_data_source.h
+++ b/quic/quartc/test/quartc_data_source.h
@@ -31,6 +31,9 @@
   int32_t source_id = -1;
   int64_t sequence_number = -1;
   QuicTime send_time = QuicTime::Zero();
+
+  // Total size, including header and payload.
+  QuicByteCount size = 0;
   std::string payload;
 };
 
@@ -52,9 +55,10 @@
     QuicTime::Delta frame_interval = QuicTime::Delta::FromMilliseconds(10);
 
     // Maximum size of frames produced by this source.  If this value is greater
-    // than or equal to kDataFrameHeaderSize, the source may produce multiple
-    // frames with the same timestamp rather than a single frame that is larger
-    // than this size.  If less than kDataFrameHeaderSize, the value is ignored.
+    // than 0, the source may produce multiple frames with the same timestamp
+    // rather than a single frame that is larger than this size.
+    // If less than kDataFrameHeaderSize, the source produces frames of
+    // kDataFrameHeaderSize.
     QuicByteCount max_frame_size = 0;
   };
 
diff --git a/quic/quartc/test/quartc_data_source_test.cc b/quic/quartc/test/quartc_data_source_test.cc
index d9a1cfe..88029b8 100644
--- a/quic/quartc/test/quartc_data_source_test.cc
+++ b/quic/quartc/test/quartc_data_source_test.cc
@@ -118,6 +118,35 @@
             21 * config.frame_interval);
 }
 
+TEST_F(QuartcDataSourceTest, EnablingTwiceDoesNotChangeSchedule) {
+  QuartcDataSource::Config config;
+  config.frame_interval = QuicTime::Delta::FromMilliseconds(20);
+
+  source_ = QuicMakeUnique<QuartcDataSource>(
+      simulator_.GetClock(), simulator_.GetAlarmFactory(),
+      simulator_.GetRandomGenerator(), config, &delegate_);
+  source_->AllocateBandwidth(
+      QuicBandwidth::FromBytesAndTimeDelta(1000, config.frame_interval));
+
+  // The first frame is produced immediately (but asynchronously) upon enabling
+  // the source.
+  source_->SetEnabled(true);
+  simulator_.RunFor(QuicTime::Delta::FromMicroseconds(1));
+  EXPECT_EQ(delegate_.frames().size(), 1);
+
+  // Enabling the source again does not re-schedule the alarm.
+  source_->SetEnabled(true);
+  simulator_.RunFor(QuicTime::Delta::FromMicroseconds(1));
+  EXPECT_EQ(delegate_.frames().size(), 1);
+
+  // The second frame is sent at the expected interval after the first.
+  ASSERT_TRUE(
+      simulator_.RunUntil([this] { return delegate_.frames().size() == 2; }));
+
+  EXPECT_EQ(delegate_.frames()[1].send_time - delegate_.frames()[0].send_time,
+            config.frame_interval);
+}
+
 TEST_F(QuartcDataSourceTest, ProducesFramesWithConfiguredSourceId) {
   QuartcDataSource::Config config;
   config.id = 7;
@@ -148,9 +177,10 @@
   ASSERT_EQ(delegate_.frames().size(), 1);
   EXPECT_EQ(delegate_.frames()[0].payload.size(),
             bytes_per_frame - kDataFrameHeaderSize);
+  EXPECT_EQ(delegate_.frames()[0].size, bytes_per_frame);
 }
 
-TEST_F(QuartcDataSourceTest, AlwaysProducesParseableHeader) {
+TEST_F(QuartcDataSourceTest, ProducesParseableHeaderWhenNotEnoughBandwidth) {
   QuartcDataSource::Config config;
   source_ = QuicMakeUnique<QuartcDataSource>(
       simulator_.GetClock(), simulator_.GetAlarmFactory(),
@@ -166,6 +196,7 @@
 
   ASSERT_EQ(delegate_.frames().size(), 1);
   EXPECT_EQ(delegate_.frames()[0].payload.size(), 0);
+  EXPECT_EQ(delegate_.frames()[0].size, kDataFrameHeaderSize);
 
   // Header fields are still present and parseable.
   EXPECT_EQ(delegate_.frames()[0].source_id, 0);
@@ -225,10 +256,12 @@
   simulator_.RunFor(config.frame_interval);
 
   // The frames produced use min_bandwidth instead of the lower allocation.
+  QuicByteCount bytes_per_frame =
+      config.min_bandwidth.ToBytesPerPeriod(config.frame_interval);
   ASSERT_EQ(delegate_.frames().size(), 1);
   EXPECT_EQ(delegate_.frames()[0].payload.size(),
-            config.min_bandwidth.ToBytesPerPeriod(config.frame_interval) -
-                kDataFrameHeaderSize);
+            bytes_per_frame - kDataFrameHeaderSize);
+  EXPECT_EQ(delegate_.frames()[0].size, bytes_per_frame);
 }
 
 TEST_F(QuartcDataSourceTest, AllocateClampsToMax) {
@@ -247,10 +280,12 @@
   simulator_.RunFor(config.frame_interval);
 
   // The frames produced use max_bandwidth instead of the higher allocation.
+  QuicByteCount bytes_per_frame =
+      config.max_bandwidth.ToBytesPerPeriod(config.frame_interval);
   ASSERT_EQ(delegate_.frames().size(), 1);
   EXPECT_EQ(delegate_.frames()[0].payload.size(),
-            config.max_bandwidth.ToBytesPerPeriod(config.frame_interval) -
-                kDataFrameHeaderSize);
+            bytes_per_frame - kDataFrameHeaderSize);
+  EXPECT_EQ(delegate_.frames()[0].size, bytes_per_frame);
 }
 
 TEST_F(QuartcDataSourceTest, MaxFrameSize) {
@@ -282,9 +317,64 @@
 
     // Each of the frames should have the configured maximum size.
     EXPECT_EQ(frame.payload.size(), bytes_per_frame - kDataFrameHeaderSize);
+    EXPECT_EQ(frame.size, bytes_per_frame);
   }
 }
 
+TEST_F(QuartcDataSourceTest, ProducesParseableHeaderWhenMaxFrameSizeTooSmall) {
+  QuartcDataSource::Config config;
+  config.max_frame_size = kDataFrameHeaderSize - 1;
+  source_ = QuicMakeUnique<QuartcDataSource>(
+      simulator_.GetClock(), simulator_.GetAlarmFactory(),
+      simulator_.GetRandomGenerator(), config, &delegate_);
+
+  source_->AllocateBandwidth(
+      QuicBandwidth::FromBytesAndTimeDelta(200, config.frame_interval));
+  source_->SetEnabled(true);
+
+  QuicTime start_time = simulator_.GetClock()->Now();
+  simulator_.RunFor(config.frame_interval);
+
+  ASSERT_GE(delegate_.frames().size(), 1);
+  EXPECT_EQ(delegate_.frames()[0].payload.size(), 0);
+  EXPECT_EQ(delegate_.frames()[0].size, kDataFrameHeaderSize);
+
+  // Header fields are still present and parseable.
+  EXPECT_EQ(delegate_.frames()[0].source_id, 0);
+  EXPECT_EQ(delegate_.frames()[0].sequence_number, 0);
+  EXPECT_EQ(delegate_.frames()[0].send_time, start_time);
+}
+
+TEST_F(QuartcDataSourceTest, ProducesParseableHeaderWhenLeftoverSizeTooSmall) {
+  QuartcDataSource::Config config;
+  config.max_frame_size = 200;
+  source_ = QuicMakeUnique<QuartcDataSource>(
+      simulator_.GetClock(), simulator_.GetAlarmFactory(),
+      simulator_.GetRandomGenerator(), config, &delegate_);
+
+  // Allocate enough bandwidth to send a 200-byte frame and a 1-byte frame.
+  source_->AllocateBandwidth(
+      QuicBandwidth::FromBytesAndTimeDelta(201, config.frame_interval));
+  source_->SetEnabled(true);
+
+  QuicTime start_time = simulator_.GetClock()->Now();
+  simulator_.RunFor(config.frame_interval);
+
+  ASSERT_EQ(delegate_.frames().size(), 2);
+  EXPECT_EQ(delegate_.frames()[0].payload.size(), 200 - kDataFrameHeaderSize);
+  EXPECT_EQ(delegate_.frames()[0].size, 200);
+
+  // The second frame, using the 1 leftover byte from the first, rounds up to
+  // the minimum frame size (just the header and no payload).
+  EXPECT_EQ(delegate_.frames()[1].payload.size(), 0);
+  EXPECT_EQ(delegate_.frames()[1].size, kDataFrameHeaderSize);
+
+  // Header fields are still present and parseable.
+  EXPECT_EQ(delegate_.frames()[1].source_id, 0);
+  EXPECT_EQ(delegate_.frames()[1].sequence_number, 1);
+  EXPECT_EQ(delegate_.frames()[1].send_time, start_time);
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace quic
diff --git a/quic/quartc/test/quartc_peer.cc b/quic/quartc/test/quartc_peer.cc
new file mode 100644
index 0000000..5970969
--- /dev/null
+++ b/quic/quartc/test/quartc_peer.cc
@@ -0,0 +1,116 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/quartc/test/quartc_peer.h"
+
+#include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h"
+
+namespace quic {
+namespace test {
+
+QuartcPeer::QuartcPeer(const QuicClock* clock,
+                       QuicAlarmFactory* alarm_factory,
+                       QuicRandom* random,
+                       const std::vector<QuartcDataSource::Config>& configs)
+    : clock_(clock),
+      alarm_factory_(alarm_factory),
+      random_(random),
+      session_(nullptr),
+      configs_(configs) {}
+
+QuartcPeer::~QuartcPeer() {
+  session_->CloseConnection("~QuartcPeer()");
+}
+
+void QuartcPeer::OnSessionCreated(QuartcSession* session) {
+  session_ = session;
+
+  session_->SetDelegate(this);
+  session_->StartCryptoHandshake();
+
+  QuicByteCount largest_message_payload =
+      session_->GetGuaranteedLargestMessagePayload();
+  for (auto& config : configs_) {
+    // Clamp maximum frame sizes to the largest supported by the session before
+    // creating data sources.
+    config.max_frame_size =
+        config.max_frame_size > 0
+            ? std::min(config.max_frame_size, largest_message_payload)
+            : largest_message_payload;
+    QUIC_LOG(INFO) << "Set max frame size for source " << config.id << " to "
+                   << config.max_frame_size;
+    data_sources_.push_back(QuicMakeUnique<QuartcDataSource>(
+        clock_, alarm_factory_, random_, config, this));
+  }
+}
+
+void QuartcPeer::OnConnectError(QuicErrorCode error,
+                                const std::string& error_details) {
+  QUIC_LOG(WARNING) << "Connect failed, error=" << error
+                    << ", details=" << error_details;
+  for (auto& source : data_sources_) {
+    source->SetEnabled(false);
+  }
+}
+
+void QuartcPeer::OnCryptoHandshakeComplete() {
+  for (auto& source : data_sources_) {
+    source->SetEnabled(true);
+  }
+}
+
+void QuartcPeer::OnConnectionWritable() {
+  for (auto& source : data_sources_) {
+    source->SetEnabled(true);
+  }
+}
+
+void QuartcPeer::OnIncomingStream(QuartcStream* stream) {
+  QUIC_LOG(DFATAL) << "Unexpected incoming stream, id=" << stream->id();
+}
+
+void QuartcPeer::OnCongestionControlChange(QuicBandwidth bandwidth_estimate,
+                                           QuicBandwidth pacing_rate,
+                                           QuicTime::Delta latest_rtt) {
+  // Note: this is fairly crude rate adaptation and makes no effort to account
+  // for overhead.  The congestion controller is assumed to account for this.
+  // It may do so by detecting overuse and pushing back on its bandwidth
+  // estimate, or it may explicitly subtract overhead before surfacing its
+  // estimate.
+  QuicBandwidth available = std::min(bandwidth_estimate, pacing_rate);
+  for (auto& source : data_sources_) {
+    available = source->AllocateBandwidth(available);
+  }
+}
+
+void QuartcPeer::OnConnectionClosed(QuicErrorCode error_code,
+                                    const std::string& error_details,
+                                    ConnectionCloseSource source) {
+  QUIC_LOG(INFO) << "Connection closed, error=" << error_code
+                 << ", details=" << error_details;
+  for (auto& source : data_sources_) {
+    source->SetEnabled(false);
+  }
+}
+
+void QuartcPeer::OnMessageReceived(QuicStringPiece message) {
+  ReceivedMessage received;
+  received.receive_time = clock_->Now();
+
+  if (!ParsedQuartcDataFrame::Parse(message, &received.frame)) {
+    QUIC_LOG(DFATAL) << "Failed to parse incoming message as test data frame: ["
+                     << message << "]";
+  }
+  received_messages_.push_back(received);
+}
+
+void QuartcPeer::OnDataProduced(const char* data, size_t length) {
+  // Further packetization is not required, as sources are configured to produce
+  // frames that fit within message payloads.
+  DCHECK_LE(length, session_->GetCurrentLargestMessagePayload());
+  session_->SendOrQueueMessage(std::string(data, length));
+}
+
+}  // namespace test
+}  // namespace quic
diff --git a/quic/quartc/test/quartc_peer.h b/quic/quartc/test/quartc_peer.h
new file mode 100644
index 0000000..502024c
--- /dev/null
+++ b/quic/quartc/test/quartc_peer.h
@@ -0,0 +1,105 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_QUARTC_TEST_QUARTC_PEER_H_
+#define QUICHE_QUIC_QUARTC_TEST_QUARTC_PEER_H_
+
+#include <string>
+
+#include "net/third_party/quiche/src/quic/core/crypto/quic_random.h"
+#include "net/third_party/quiche/src/quic/core/quic_alarm_factory.h"
+#include "net/third_party/quiche/src/quic/core/quic_bandwidth.h"
+#include "net/third_party/quiche/src/quic/core/quic_error_codes.h"
+#include "net/third_party/quiche/src/quic/core/quic_time.h"
+#include "net/third_party/quiche/src/quic/core/quic_types.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_containers.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h"
+#include "net/third_party/quiche/src/quic/quartc/quartc_endpoint.h"
+#include "net/third_party/quiche/src/quic/quartc/quartc_session.h"
+#include "net/third_party/quiche/src/quic/quartc/quartc_stream.h"
+#include "net/third_party/quiche/src/quic/quartc/test/quartc_data_source.h"
+
+namespace quic {
+namespace test {
+
+// ParsedQuartcDataFrame with a receive_time.
+struct ReceivedMessage {
+  ParsedQuartcDataFrame frame;
+  QuicTime receive_time = QuicTime::Zero();
+};
+
+// Test utility that adapts QuartcDataSources to a QuartcSession.
+// The utility creates and manages a set of QuartcDataSources.  It sends the
+// data produced by those sources as QUIC datagram frames.  It reconfigures the
+// maximum frame size of each source in order to fit test frames into QUIC
+// datagram frames.  It also adjusts the bitrate of each source to fit within
+// the bandwidth available to the session.
+class QuartcPeer : public QuartcEndpoint::Delegate,
+                   public QuartcSession::Delegate,
+                   public QuartcDataSource::Delegate {
+ public:
+  // Creates a QuartcPeer that sends data from a set of sources described by
+  // |configs|.  Note that the max frame size of each config may be adjusted in
+  // order to fit within the constraints of the QUIC session.
+  QuartcPeer(const QuicClock* clock,
+             QuicAlarmFactory* alarm_factory,
+             QuicRandom* random,
+             const std::vector<QuartcDataSource::Config>& configs);
+  QuartcPeer(QuartcPeer&) = delete;
+  QuartcPeer& operator=(QuartcPeer&) = delete;
+
+  ~QuartcPeer();
+
+  // Messages received from the peer, in the order they were received.
+  const std::vector<ReceivedMessage>& received_messages() const {
+    return received_messages_;
+  }
+
+  // QuartcEndpoint::Delegate overrides.
+  void OnSessionCreated(QuartcSession* session) override;
+  void OnConnectError(QuicErrorCode error,
+                      const std::string& error_details) override;
+
+  // QuartcSession::Delegate overrides.
+  void OnCryptoHandshakeComplete() override;
+  void OnConnectionWritable() override;
+  void OnIncomingStream(QuartcStream* stream) override;
+  void OnCongestionControlChange(QuicBandwidth bandwidth_estimate,
+                                 QuicBandwidth pacing_rate,
+                                 QuicTime::Delta latest_rtt) override;
+  void OnConnectionClosed(QuicErrorCode error_code,
+                          const std::string& error_details,
+                          ConnectionCloseSource source) override;
+  void OnMessageReceived(QuicStringPiece message) override;
+
+  // QuartcDataSource::Delegate overrides.
+  void OnDataProduced(const char* data, size_t length) override;
+
+ private:
+  const QuicClock* clock_;
+  QuicAlarmFactory* alarm_factory_;
+  QuicRandom* random_;
+
+  // Session used for sending and receiving data.  Not owned.  Created by an
+  // external QuartcEndpoint and set in the |OnSessionCreated| callback.
+  QuartcSession* session_;
+
+  // Saved copy of the configs for data sources.  These configs may be modified
+  // before |data_sources_| are initialized (for example, to set appropriate
+  // max frame sizes).
+  std::vector<QuartcDataSource::Config> configs_;
+
+  // Data sources are initialized once the session is created and enabled once
+  // the session is able to send.
+  std::vector<std::unique_ptr<QuartcDataSource>> data_sources_;
+
+  // Messages received by this peer from the remote peer.  Stored in the order
+  // they are received.
+  std::vector<ReceivedMessage> received_messages_;
+};
+
+}  // namespace test
+}  // namespace quic
+
+#endif  // QUICHE_QUIC_QUARTC_FAKE_QUARTC_PEER_H_
diff --git a/quic/quartc/test/quartc_peer_test.cc b/quic/quartc/test/quartc_peer_test.cc
new file mode 100644
index 0000000..06a7432
--- /dev/null
+++ b/quic/quartc/test/quartc_peer_test.cc
@@ -0,0 +1,367 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/quartc/test/quartc_peer.h"
+
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "net/third_party/quiche/src/quic/core/quic_bandwidth.h"
+#include "net/third_party/quiche/src/quic/core/quic_constants.h"
+#include "net/third_party/quiche/src/quic/core/quic_time.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
+#include "net/third_party/quiche/src/quic/quartc/quartc_endpoint.h"
+#include "net/third_party/quiche/src/quic/quartc/simulated_packet_transport.h"
+#include "net/third_party/quiche/src/quic/test_tools/simulator/link.h"
+#include "net/third_party/quiche/src/quic/test_tools/simulator/simulator.h"
+
+namespace quic {
+namespace test {
+namespace {
+
+class QuartcPeerTest : public QuicTest {
+ protected:
+  QuartcPeerTest()
+      : client_transport_(&simulator_,
+                          "client_transport",
+                          "server_transport",
+                          10 * kDefaultMaxPacketSize),
+        server_transport_(&simulator_,
+                          "server_transport",
+                          "client_transport",
+                          10 * kDefaultMaxPacketSize),
+        client_server_link_(&client_transport_,
+                            &server_transport_,
+                            QuicBandwidth::FromKBitsPerSecond(512),
+                            QuicTime::Delta::FromMilliseconds(100)) {}
+
+  void CreatePeers(const std::vector<QuartcDataSource::Config>& configs) {
+    client_peer_ = QuicMakeUnique<QuartcPeer>(
+        simulator_.GetClock(), simulator_.GetAlarmFactory(),
+        simulator_.GetRandomGenerator(), configs);
+    server_peer_ = QuicMakeUnique<QuartcPeer>(
+        simulator_.GetClock(), simulator_.GetAlarmFactory(),
+        simulator_.GetRandomGenerator(), configs);
+  }
+
+  void Connect() {
+    DCHECK(client_peer_);
+    DCHECK(server_peer_);
+
+    server_endpoint_ = QuicMakeUnique<QuartcServerEndpoint>(
+        simulator_.GetAlarmFactory(), simulator_.GetClock(), server_peer_.get(),
+        QuartcSessionConfig());
+    client_endpoint_ = QuicMakeUnique<QuartcClientEndpoint>(
+        simulator_.GetAlarmFactory(), simulator_.GetClock(), client_peer_.get(),
+        QuartcSessionConfig(), server_endpoint_->server_crypto_config());
+
+    server_endpoint_->Connect(&server_transport_);
+    client_endpoint_->Connect(&client_transport_);
+  }
+
+  simulator::Simulator simulator_;
+  simulator::SimulatedQuartcPacketTransport client_transport_;
+  simulator::SimulatedQuartcPacketTransport server_transport_;
+  simulator::SymmetricLink client_server_link_;
+
+  std::unique_ptr<QuartcClientEndpoint> client_endpoint_;
+  std::unique_ptr<QuartcPeer> client_peer_;
+
+  std::unique_ptr<QuartcServerEndpoint> server_endpoint_;
+  std::unique_ptr<QuartcPeer> server_peer_;
+};
+
+const ReceivedMessage& FindLastMessageFromSource(
+    const std::vector<ReceivedMessage>& messages,
+    int32_t source_id) {
+  const auto& it = std::find_if(messages.rbegin(), messages.rend(),
+                                [source_id](const ReceivedMessage& r) {
+                                  return r.frame.source_id == source_id;
+                                });
+  return *it;
+}
+
+TEST_F(QuartcPeerTest, SendReceiveMessages) {
+  QuicTime start_time = simulator_.GetClock()->Now();
+
+  QuartcDataSource::Config config;
+  config.id = 1;
+
+  CreatePeers({config});
+  Connect();
+
+  ASSERT_TRUE(simulator_.RunUntil([this] {
+    return !client_peer_->received_messages().empty() &&
+           !server_peer_->received_messages().empty();
+  }));
+
+  QuicTime end_time = simulator_.GetClock()->Now();
+
+  // Sanity checks on messages.
+  const ReceivedMessage& client_message = client_peer_->received_messages()[0];
+  EXPECT_EQ(client_message.frame.source_id, 1);
+  EXPECT_EQ(client_message.frame.sequence_number, 0);
+  EXPECT_GE(client_message.frame.send_time, start_time);
+  EXPECT_LE(client_message.receive_time, end_time);
+
+  const ReceivedMessage& server_message = server_peer_->received_messages()[0];
+  EXPECT_EQ(server_message.frame.source_id, 1);
+  EXPECT_EQ(server_message.frame.sequence_number, 0);
+  EXPECT_GE(server_message.frame.send_time, start_time);
+  EXPECT_LE(server_message.receive_time, end_time);
+}
+
+TEST_F(QuartcPeerTest, MaxFrameSizeUnset) {
+  // Configure the source with no max frame size, and a framerate and max
+  // bandwidth that allows very large frames (larger than will fit in a packet).
+  QuartcDataSource::Config config;
+  config.id = 1;
+  config.frame_interval = QuicTime::Delta::FromMilliseconds(20);
+  config.max_bandwidth = QuicBandwidth::FromBytesAndTimeDelta(
+      2 * kDefaultMaxPacketSize, config.frame_interval);
+
+  CreatePeers({config});
+  Connect();
+
+  // Run long enough for the bandwidth estimate to ramp up.
+  simulator_.RunFor(QuicTime::Delta::FromSeconds(15));
+
+  // The peers generate frames that fit in one packet.
+  EXPECT_LT(client_peer_->received_messages().back().frame.size,
+            kDefaultMaxPacketSize);
+  EXPECT_LT(server_peer_->received_messages().back().frame.size,
+            kDefaultMaxPacketSize);
+}
+
+TEST_F(QuartcPeerTest, MaxFrameSizeLargerThanPacketSize) {
+  // Configure the source with a max frame size larger than the packet size.
+  QuartcDataSource::Config config;
+  config.id = 1;
+  config.max_frame_size = 2 * kDefaultMaxPacketSize;
+
+  CreatePeers({config});
+  Connect();
+
+  // Run long enough for the bandwidth estimate to ramp up.
+  simulator_.RunFor(QuicTime::Delta::FromSeconds(15));
+
+  // The peers generate frames that fit in one packet.
+  EXPECT_LT(client_peer_->received_messages().back().frame.size,
+            kDefaultMaxPacketSize);
+  EXPECT_LT(server_peer_->received_messages().back().frame.size,
+            kDefaultMaxPacketSize);
+}
+
+TEST_F(QuartcPeerTest, MaxFrameSizeSmallerThanPacketSize) {
+  QuartcDataSource::Config config;
+  config.id = 1;
+  config.max_frame_size = 100;
+  // Note that a long frame interval helps to ensure that the test produces
+  // enough bytes per frame to reach max_frame_size.
+  config.frame_interval = QuicTime::Delta::FromMilliseconds(100);
+
+  CreatePeers({config});
+  Connect();
+
+  // Run long enough for the bandwidth estimate to ramp up.
+  // Note that the ramp-up time is longer for this test because of the long
+  // frame_interval.  This seems especially true when QUIC enables all flags for
+  // testing.
+  simulator_.RunFor(QuicTime::Delta::FromSeconds(30));
+
+  EXPECT_EQ(client_peer_->received_messages().back().frame.size, 100);
+  EXPECT_EQ(server_peer_->received_messages().back().frame.size, 100);
+}
+
+TEST_F(QuartcPeerTest, MaxFrameSizeSmallerThanFrameHeader) {
+  QuartcDataSource::Config config;
+  config.id = 1;
+  config.max_frame_size = kDataFrameHeaderSize - 1;
+
+  CreatePeers({config});
+  Connect();
+
+  // Run long enough for the bandwidth estimate to ramp up.
+  simulator_.RunFor(QuicTime::Delta::FromSeconds(15));
+
+  // Max frame sizes smaller than the header are ignored, and the frame size is
+  // limited by packet size.
+  EXPECT_LT(client_peer_->received_messages().back().frame.size,
+            kDefaultMaxPacketSize);
+  EXPECT_LT(server_peer_->received_messages().back().frame.size,
+            kDefaultMaxPacketSize);
+}
+
+TEST_F(QuartcPeerTest, SendReceiveMultipleSources) {
+  QuicTime start_time = simulator_.GetClock()->Now();
+
+  // Note: use of really long frame intervals means that each source should send
+  // one frame during this test.  This simplifies expectations for received
+  // data.
+  QuartcDataSource::Config config_1;
+  config_1.id = 1;
+  config_1.max_bandwidth = QuicBandwidth::FromKBitsPerSecond(32);
+  config_1.frame_interval = QuicTime::Delta::FromSeconds(10);
+
+  QuartcDataSource::Config config_2;
+  config_2.id = 2;
+  config_2.max_bandwidth = QuicBandwidth::FromKBitsPerSecond(64);
+  config_2.frame_interval = QuicTime::Delta::FromSeconds(10);
+
+  QuartcDataSource::Config config_3;
+  config_3.id = 3;
+  config_3.max_bandwidth = QuicBandwidth::FromKBitsPerSecond(128);
+  config_3.frame_interval = QuicTime::Delta::FromSeconds(10);
+
+  CreatePeers({config_1, config_2, config_3});
+  Connect();
+
+  ASSERT_TRUE(simulator_.RunUntil([this] {
+    return client_peer_->received_messages().size() == 3 &&
+           server_peer_->received_messages().size() == 3;
+  }));
+
+  QuicTime end_time = simulator_.GetClock()->Now();
+
+  // Sanity checks on messages.
+  const auto& order = [](const ReceivedMessage& lhs,
+                         const ReceivedMessage& rhs) {
+    return lhs.frame.source_id < rhs.frame.source_id;
+  };
+
+  std::vector<ReceivedMessage> client_messages =
+      client_peer_->received_messages();
+  std::sort(client_messages.begin(), client_messages.end(), order);
+  for (int i = 0; i < client_messages.size(); ++i) {
+    EXPECT_EQ(client_messages[i].frame.source_id, i + 1);
+    EXPECT_EQ(client_messages[i].frame.sequence_number, 0);
+    EXPECT_GE(client_messages[i].frame.send_time, start_time);
+    EXPECT_LE(client_messages[i].receive_time, end_time);
+  }
+
+  std::vector<ReceivedMessage> server_messages =
+      server_peer_->received_messages();
+  std::sort(server_messages.begin(), server_messages.end(), order);
+  for (int i = 0; i < server_messages.size(); ++i) {
+    EXPECT_EQ(server_messages[i].frame.source_id, i + 1);
+    EXPECT_EQ(server_messages[i].frame.sequence_number, 0);
+    EXPECT_GE(server_messages[i].frame.send_time, start_time);
+    EXPECT_LE(server_messages[i].receive_time, end_time);
+  }
+}
+
+TEST_F(QuartcPeerTest, BandwidthAllocationWithEnoughAvailable) {
+  QuartcDataSource::Config config_1;
+  config_1.id = 1;
+  config_1.max_bandwidth = QuicBandwidth::FromKBitsPerSecond(32);
+  config_1.frame_interval = QuicTime::Delta::FromMilliseconds(100);
+
+  QuartcDataSource::Config config_2;
+  config_2.id = 2;
+  config_2.max_bandwidth = QuicBandwidth::FromKBitsPerSecond(64);
+  config_2.frame_interval = QuicTime::Delta::FromMilliseconds(25);
+
+  QuartcDataSource::Config config_3;
+  config_3.id = 3;
+  config_3.max_bandwidth = QuicBandwidth::FromKBitsPerSecond(128);
+  config_3.frame_interval = QuicTime::Delta::FromMilliseconds(10);
+
+  CreatePeers({config_1, config_2, config_3});
+  Connect();
+
+  // Run for long enough that bandwidth ramps up and meets the requirements of
+  // all three sources.
+  simulator_.RunFor(QuicTime::Delta::FromSeconds(15));
+
+  // The last message from each source should be the size allowed by that
+  // source's maximum bandwidth and frame interval.
+  QuicByteCount source_1_size =
+      config_1.max_bandwidth.ToBytesPerPeriod(config_1.frame_interval);
+  QuicByteCount source_2_size =
+      config_2.max_bandwidth.ToBytesPerPeriod(config_2.frame_interval);
+  QuicByteCount source_3_size =
+      config_3.max_bandwidth.ToBytesPerPeriod(config_3.frame_interval);
+
+  const std::vector<ReceivedMessage>& client_messages =
+      client_peer_->received_messages();
+  EXPECT_EQ(FindLastMessageFromSource(client_messages, 1).frame.size,
+            source_1_size);
+  EXPECT_EQ(FindLastMessageFromSource(client_messages, 2).frame.size,
+            source_2_size);
+  EXPECT_EQ(FindLastMessageFromSource(client_messages, 3).frame.size,
+            source_3_size);
+
+  const std::vector<ReceivedMessage>& server_messages =
+      server_peer_->received_messages();
+  EXPECT_EQ(FindLastMessageFromSource(server_messages, 1).frame.size,
+            source_1_size);
+  EXPECT_EQ(FindLastMessageFromSource(server_messages, 2).frame.size,
+            source_2_size);
+  EXPECT_EQ(FindLastMessageFromSource(server_messages, 3).frame.size,
+            source_3_size);
+}
+
+TEST_F(QuartcPeerTest, BandwidthAllocationWithoutEnoughAvailable) {
+  QuartcDataSource::Config config_1;
+  config_1.id = 1;
+  config_1.max_bandwidth = client_server_link_.bandwidth() * 0.5;
+  config_1.frame_interval = QuicTime::Delta::FromMilliseconds(10);
+
+  QuartcDataSource::Config config_2;
+  config_2.id = 2;
+  config_2.min_bandwidth = QuicBandwidth::FromKBitsPerSecond(32);
+  config_2.max_bandwidth = client_server_link_.bandwidth();
+  config_2.frame_interval = QuicTime::Delta::FromMilliseconds(5);
+
+  QuartcDataSource::Config config_3;
+  config_3.id = 3;
+  config_3.min_bandwidth = QuicBandwidth::FromKBitsPerSecond(32);
+  config_3.max_bandwidth = client_server_link_.bandwidth() * 2;
+  config_3.frame_interval = QuicTime::Delta::FromMilliseconds(20);
+
+  CreatePeers({config_1, config_2, config_3});
+  Connect();
+
+  // Run for long enough that bandwidth ramps up to link capacity.
+  simulator_.RunFor(QuicTime::Delta::FromSeconds(15));
+
+  const std::vector<ReceivedMessage>& client_messages =
+      client_peer_->received_messages();
+  const std::vector<ReceivedMessage>& server_messages =
+      server_peer_->received_messages();
+
+  // Source 1 eventually ramps up to full bandwidth.
+  const QuicByteCount source_1_size =
+      config_1.max_bandwidth.ToBytesPerPeriod(config_1.frame_interval);
+  EXPECT_EQ(FindLastMessageFromSource(client_messages, 1).frame.size,
+            source_1_size);
+  EXPECT_EQ(FindLastMessageFromSource(server_messages, 1).frame.size,
+            source_1_size);
+
+  // Source 2 takes the remainder of available bandwidth.  However, the exact
+  // value depends on the bandwidth estimate.
+  const QuicByteCount source_2_min =
+      config_2.min_bandwidth.ToBytesPerPeriod(config_2.frame_interval);
+  const QuicByteCount source_2_max =
+      config_2.max_bandwidth.ToBytesPerPeriod(config_2.frame_interval);
+  EXPECT_GT(FindLastMessageFromSource(client_messages, 2).frame.size,
+            source_2_min);
+  EXPECT_LT(FindLastMessageFromSource(client_messages, 2).frame.size,
+            source_2_max);
+  EXPECT_GT(FindLastMessageFromSource(server_messages, 2).frame.size,
+            source_2_min);
+  EXPECT_LT(FindLastMessageFromSource(server_messages, 2).frame.size,
+            source_2_max);
+
+  // Source 3 gets only its minimum bandwidth.
+  const QuicByteCount source_3_size =
+      config_3.min_bandwidth.ToBytesPerPeriod(config_3.frame_interval);
+  EXPECT_EQ(FindLastMessageFromSource(client_messages, 3).frame.size,
+            source_3_size);
+  EXPECT_EQ(FindLastMessageFromSource(server_messages, 3).frame.size,
+            source_3_size);
+}
+
+}  // namespace
+}  // namespace test
+}  // namespace quic