blob: d738fd6d84b84db9e1d9b9c2853020dc2814c2e7 [file] [log] [blame]
Bence Békybac04052022-04-07 15:44:29 -04001// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "quiche/quic/core/quic_stream.h"
6
martinduke890a21c2024-09-18 07:46:32 -07007#include <cstddef>
Bence Békybac04052022-04-07 15:44:29 -04008#include <memory>
vasilvv243b2622023-11-07 17:01:30 -08009#include <optional>
Bence Békybac04052022-04-07 15:44:29 -040010#include <string>
11#include <utility>
QUICHE team7baee702024-05-21 11:18:15 -070012#include <vector>
Bence Békybac04052022-04-07 15:44:29 -040013
14#include "absl/base/macros.h"
15#include "absl/memory/memory.h"
16#include "absl/strings/string_view.h"
Bence Békybac04052022-04-07 15:44:29 -040017#include "quiche/quic/core/crypto/null_encrypter.h"
renjietang20dbb382024-06-20 14:31:32 -070018#include "quiche/quic/core/frames/quic_connection_close_frame.h"
martinduke890a21c2024-09-18 07:46:32 -070019#include "quiche/quic/core/frames/quic_reset_stream_at_frame.h"
Bence Békybac04052022-04-07 15:44:29 -040020#include "quiche/quic/core/frames/quic_rst_stream_frame.h"
21#include "quiche/quic/core/quic_connection.h"
22#include "quiche/quic/core/quic_constants.h"
23#include "quiche/quic/core/quic_error_codes.h"
24#include "quiche/quic/core/quic_types.h"
25#include "quiche/quic/core/quic_utils.h"
26#include "quiche/quic/core/quic_versions.h"
27#include "quiche/quic/core/quic_write_blocked_list.h"
28#include "quiche/quic/platform/api/quic_expect_bug.h"
29#include "quiche/quic/platform/api/quic_flags.h"
30#include "quiche/quic/platform/api/quic_logging.h"
31#include "quiche/quic/platform/api/quic_test.h"
32#include "quiche/quic/test_tools/quic_config_peer.h"
33#include "quiche/quic/test_tools/quic_connection_peer.h"
34#include "quiche/quic/test_tools/quic_flow_controller_peer.h"
35#include "quiche/quic/test_tools/quic_session_peer.h"
36#include "quiche/quic/test_tools/quic_stream_peer.h"
37#include "quiche/quic/test_tools/quic_stream_sequencer_peer.h"
38#include "quiche/quic/test_tools/quic_test_utils.h"
39#include "quiche/common/quiche_mem_slice_storage.h"
40
41using testing::_;
42using testing::AnyNumber;
43using testing::AtLeast;
44using testing::InSequence;
45using testing::Invoke;
46using testing::InvokeWithoutArgs;
47using testing::Return;
48using testing::StrictMock;
49
50namespace quic {
51namespace test {
52namespace {
53
54const char kData1[] = "FooAndBar";
55const char kData2[] = "EepAndBaz";
56const QuicByteCount kDataLen = 9;
martindukea12ff5d2022-10-12 17:25:51 -070057const uint8_t kPacket0ByteConnectionId = 0;
58const uint8_t kPacket8ByteConnectionId = 8;
Bence Békybac04052022-04-07 15:44:29 -040059
60class TestStream : public QuicStream {
61 public:
62 TestStream(QuicStreamId id, QuicSession* session, StreamType type)
63 : QuicStream(id, session, /*is_static=*/false, type) {
64 sequencer()->set_level_triggered(true);
65 }
66
67 TestStream(PendingStream* pending, QuicSession* session, bool is_static)
68 : QuicStream(pending, session, is_static) {}
69
70 MOCK_METHOD(void, OnDataAvailable, (), (override));
71
72 MOCK_METHOD(void, OnCanWriteNewData, (), (override));
73
74 MOCK_METHOD(void, OnWriteSideInDataRecvdState, (), (override));
75
76 using QuicStream::CanWriteNewData;
77 using QuicStream::CanWriteNewDataAfterData;
78 using QuicStream::CloseWriteSide;
79 using QuicStream::fin_buffered;
80 using QuicStream::MaybeSendStopSending;
81 using QuicStream::OnClose;
82 using QuicStream::WriteMemSlices;
83 using QuicStream::WriteOrBufferData;
84
martinduke890a21c2024-09-18 07:46:32 -070085 void ConsumeData(size_t num_bytes) {
86 char buffer[1024];
87 ASSERT_GT(ABSL_ARRAYSIZE(buffer), num_bytes);
88 struct iovec iov;
89 iov.iov_base = buffer;
90 iov.iov_len = num_bytes;
91 ASSERT_EQ(num_bytes, QuicStreamPeer::sequencer(this)->Readv(&iov, 1));
92 }
93
Bence Békybac04052022-04-07 15:44:29 -040094 private:
95 std::string data_;
96};
97
98class QuicStreamTest : public QuicTestWithParam<ParsedQuicVersion> {
99 public:
100 QuicStreamTest()
101 : zero_(QuicTime::Delta::Zero()),
102 supported_versions_(AllSupportedVersions()) {}
103
104 void Initialize(Perspective perspective = Perspective::IS_SERVER) {
105 ParsedQuicVersionVector version_vector;
106 version_vector.push_back(GetParam());
107 connection_ = new StrictMock<MockQuicConnection>(
108 &helper_, &alarm_factory_, perspective, version_vector);
109 connection_->AdvanceTime(QuicTime::Delta::FromSeconds(1));
110 session_ = std::make_unique<StrictMock<MockQuicSession>>(connection_);
111 session_->Initialize();
112 connection_->SetEncrypter(
113 ENCRYPTION_FORWARD_SECURE,
114 std::make_unique<NullEncrypter>(connection_->perspective()));
115 QuicConfigPeer::SetReceivedInitialSessionFlowControlWindow(
116 session_->config(), kMinimumFlowControlSendWindow);
117 QuicConfigPeer::SetReceivedInitialMaxStreamDataBytesUnidirectional(
118 session_->config(), kMinimumFlowControlSendWindow);
119 QuicConfigPeer::SetReceivedInitialMaxStreamDataBytesIncomingBidirectional(
120 session_->config(), kMinimumFlowControlSendWindow);
121 QuicConfigPeer::SetReceivedInitialMaxStreamDataBytesOutgoingBidirectional(
122 session_->config(), kMinimumFlowControlSendWindow);
123 QuicConfigPeer::SetReceivedMaxUnidirectionalStreams(session_->config(), 10);
124 session_->OnConfigNegotiated();
125
126 stream_ = new StrictMock<TestStream>(kTestStreamId, session_.get(),
127 BIDIRECTIONAL);
128 EXPECT_NE(nullptr, stream_);
renjietang89540a62022-12-01 14:46:26 -0800129 EXPECT_CALL(*session_, ShouldKeepConnectionAlive())
130 .WillRepeatedly(Return(true));
Bence Békybac04052022-04-07 15:44:29 -0400131 // session_ now owns stream_.
132 session_->ActivateStream(absl::WrapUnique(stream_));
133 // Ignore resetting when session_ is terminated.
134 EXPECT_CALL(*session_, MaybeSendStopSendingFrame(kTestStreamId, _))
135 .Times(AnyNumber());
136 EXPECT_CALL(*session_, MaybeSendRstStreamFrame(kTestStreamId, _, _))
137 .Times(AnyNumber());
138 write_blocked_list_ =
139 QuicSessionPeer::GetWriteBlockedStreams(session_.get());
140 }
141
142 bool fin_sent() { return stream_->fin_sent(); }
143 bool rst_sent() { return stream_->rst_sent(); }
144
145 bool HasWriteBlockedStreams() {
146 return write_blocked_list_->HasWriteBlockedSpecialStream() ||
147 write_blocked_list_->HasWriteBlockedDataStreams();
148 }
149
150 QuicConsumedData CloseStreamOnWriteError(
151 QuicStreamId id, QuicByteCount /*write_length*/,
152 QuicStreamOffset /*offset*/, StreamSendingState /*state*/,
vasilvv243b2622023-11-07 17:01:30 -0800153 TransmissionType /*type*/, std::optional<EncryptionLevel> /*level*/) {
Bence Békybac04052022-04-07 15:44:29 -0400154 session_->ResetStream(id, QUIC_STREAM_CANCELLED);
155 return QuicConsumedData(1, false);
156 }
157
158 bool ClearResetStreamFrame(const QuicFrame& frame) {
159 EXPECT_EQ(RST_STREAM_FRAME, frame.type);
160 DeleteFrame(&const_cast<QuicFrame&>(frame));
161 return true;
162 }
163
164 bool ClearStopSendingFrame(const QuicFrame& frame) {
165 EXPECT_EQ(STOP_SENDING_FRAME, frame.type);
166 DeleteFrame(&const_cast<QuicFrame&>(frame));
167 return true;
168 }
169
170 protected:
171 MockQuicConnectionHelper helper_;
172 MockAlarmFactory alarm_factory_;
173 MockQuicConnection* connection_;
174 std::unique_ptr<MockQuicSession> session_;
175 StrictMock<TestStream>* stream_;
vasilvv5d896432023-03-20 06:11:07 -0700176 QuicWriteBlockedListInterface* write_blocked_list_;
Bence Békybac04052022-04-07 15:44:29 -0400177 QuicTime::Delta zero_;
178 ParsedQuicVersionVector supported_versions_;
bnc862751f2022-04-13 08:33:42 -0700179 QuicStreamId kTestStreamId = GetNthClientInitiatedBidirectionalStreamId(
180 GetParam().transport_version, 1);
Bence Békybac04052022-04-07 15:44:29 -0400181 const QuicStreamId kTestPendingStreamId =
182 GetNthClientInitiatedUnidirectionalStreamId(GetParam().transport_version,
183 1);
184};
185
186INSTANTIATE_TEST_SUITE_P(QuicStreamTests, QuicStreamTest,
187 ::testing::ValuesIn(AllSupportedVersions()),
188 ::testing::PrintToStringParamName());
189
190using PendingStreamTest = QuicStreamTest;
191
192INSTANTIATE_TEST_SUITE_P(PendingStreamTests, PendingStreamTest,
193 ::testing::ValuesIn(CurrentSupportedHttp3Versions()),
194 ::testing::PrintToStringParamName());
195
196TEST_P(PendingStreamTest, PendingStreamStaticness) {
197 Initialize();
198
199 PendingStream pending(kTestPendingStreamId, session_.get());
200 TestStream stream(&pending, session_.get(), false);
201 EXPECT_FALSE(stream.is_static());
202
203 PendingStream pending2(kTestPendingStreamId + 4, session_.get());
204 TestStream stream2(&pending2, session_.get(), true);
205 EXPECT_TRUE(stream2.is_static());
206}
207
208TEST_P(PendingStreamTest, PendingStreamType) {
209 Initialize();
210
211 PendingStream pending(kTestPendingStreamId, session_.get());
212 TestStream stream(&pending, session_.get(), false);
213 EXPECT_EQ(stream.type(), READ_UNIDIRECTIONAL);
214}
215
216TEST_P(PendingStreamTest, PendingStreamTypeOnClient) {
217 Initialize(Perspective::IS_CLIENT);
218
219 QuicStreamId server_initiated_pending_stream_id =
220 GetNthServerInitiatedUnidirectionalStreamId(session_->transport_version(),
221 1);
222 PendingStream pending(server_initiated_pending_stream_id, session_.get());
223 TestStream stream(&pending, session_.get(), false);
224 EXPECT_EQ(stream.type(), READ_UNIDIRECTIONAL);
225}
226
227TEST_P(PendingStreamTest, PendingStreamTooMuchData) {
228 Initialize();
229
230 PendingStream pending(kTestPendingStreamId, session_.get());
231 // Receive a stream frame that violates flow control: the byte offset is
232 // higher than the receive window offset.
233 QuicStreamFrame frame(kTestPendingStreamId, false,
234 kInitialSessionFlowControlWindowForTest + 1, ".");
235
236 // Stream should not accept the frame, and the connection should be closed.
237 EXPECT_CALL(*connection_,
238 CloseConnection(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, _, _));
239 pending.OnStreamFrame(frame);
240}
241
242TEST_P(PendingStreamTest, PendingStreamTooMuchDataInRstStream) {
243 Initialize();
244
245 PendingStream pending1(kTestPendingStreamId, session_.get());
246 // Receive a rst stream frame that violates flow control: the byte offset is
247 // higher than the receive window offset.
248 QuicRstStreamFrame frame1(kInvalidControlFrameId, kTestPendingStreamId,
249 QUIC_STREAM_CANCELLED,
250 kInitialSessionFlowControlWindowForTest + 1);
251
252 // Pending stream should not accept the frame, and the connection should be
253 // closed.
254 EXPECT_CALL(*connection_,
255 CloseConnection(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, _, _));
256 pending1.OnRstStreamFrame(frame1);
257
258 QuicStreamId bidirection_stream_id = QuicUtils::GetFirstBidirectionalStreamId(
259 session_->transport_version(), Perspective::IS_CLIENT);
260 PendingStream pending2(bidirection_stream_id, session_.get());
261 // Receive a rst stream frame that violates flow control: the byte offset is
262 // higher than the receive window offset.
263 QuicRstStreamFrame frame2(kInvalidControlFrameId, bidirection_stream_id,
264 QUIC_STREAM_CANCELLED,
265 kInitialSessionFlowControlWindowForTest + 1);
266 // Bidirectional Pending stream should not accept the frame, and the
267 // connection should be closed.
268 EXPECT_CALL(*connection_,
269 CloseConnection(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, _, _));
270 pending2.OnRstStreamFrame(frame2);
271}
272
273TEST_P(PendingStreamTest, PendingStreamRstStream) {
274 Initialize();
275
276 PendingStream pending(kTestPendingStreamId, session_.get());
277 QuicStreamOffset final_byte_offset = 7;
278 QuicRstStreamFrame frame(kInvalidControlFrameId, kTestPendingStreamId,
279 QUIC_STREAM_CANCELLED, final_byte_offset);
280
281 // Pending stream should accept the frame and not close the connection.
282 EXPECT_CALL(*connection_, CloseConnection(_, _, _)).Times(0);
283 pending.OnRstStreamFrame(frame);
284}
285
286TEST_P(PendingStreamTest, PendingStreamWindowUpdate) {
287 Initialize();
288
289 QuicStreamId bidirection_stream_id = QuicUtils::GetFirstBidirectionalStreamId(
290 session_->transport_version(), Perspective::IS_CLIENT);
291 PendingStream pending(bidirection_stream_id, session_.get());
292 QuicWindowUpdateFrame frame(kInvalidControlFrameId, bidirection_stream_id,
293 kDefaultFlowControlSendWindow * 2);
294 pending.OnWindowUpdateFrame(frame);
295 TestStream stream(&pending, session_.get(), false);
296
297 EXPECT_EQ(QuicStreamPeer::SendWindowSize(&stream),
298 kDefaultFlowControlSendWindow * 2);
299}
300
301TEST_P(PendingStreamTest, PendingStreamStopSending) {
302 Initialize();
303
304 QuicStreamId bidirection_stream_id = QuicUtils::GetFirstBidirectionalStreamId(
305 session_->transport_version(), Perspective::IS_CLIENT);
306 PendingStream pending(bidirection_stream_id, session_.get());
307 QuicResetStreamError error =
308 QuicResetStreamError::FromInternal(QUIC_STREAM_INTERNAL_ERROR);
309 pending.OnStopSending(error);
310 EXPECT_TRUE(pending.GetStopSendingErrorCode());
311 auto actual_error = *pending.GetStopSendingErrorCode();
312 EXPECT_EQ(actual_error, error);
313}
314
315TEST_P(PendingStreamTest, FromPendingStream) {
316 Initialize();
317
318 PendingStream pending(kTestPendingStreamId, session_.get());
319
320 QuicStreamFrame frame(kTestPendingStreamId, false, 2, ".");
321 pending.OnStreamFrame(frame);
322 pending.OnStreamFrame(frame);
323 QuicStreamFrame frame2(kTestPendingStreamId, true, 3, ".");
324 pending.OnStreamFrame(frame2);
325
326 TestStream stream(&pending, session_.get(), false);
327 EXPECT_EQ(3, stream.num_frames_received());
328 EXPECT_EQ(3u, stream.stream_bytes_read());
329 EXPECT_EQ(1, stream.num_duplicate_frames_received());
330 EXPECT_EQ(true, stream.fin_received());
331 EXPECT_EQ(frame2.offset + 1, stream.highest_received_byte_offset());
332 EXPECT_EQ(frame2.offset + 1,
333 session_->flow_controller()->highest_received_byte_offset());
334}
335
336TEST_P(PendingStreamTest, FromPendingStreamThenData) {
337 Initialize();
338
339 PendingStream pending(kTestPendingStreamId, session_.get());
340
341 QuicStreamFrame frame(kTestPendingStreamId, false, 2, ".");
342 pending.OnStreamFrame(frame);
343
344 auto stream = new TestStream(&pending, session_.get(), false);
345 session_->ActivateStream(absl::WrapUnique(stream));
346
347 QuicStreamFrame frame2(kTestPendingStreamId, true, 3, ".");
348 stream->OnStreamFrame(frame2);
349
350 EXPECT_EQ(2, stream->num_frames_received());
351 EXPECT_EQ(2u, stream->stream_bytes_read());
352 EXPECT_EQ(true, stream->fin_received());
353 EXPECT_EQ(frame2.offset + 1, stream->highest_received_byte_offset());
354 EXPECT_EQ(frame2.offset + 1,
355 session_->flow_controller()->highest_received_byte_offset());
356}
357
martinduke890a21c2024-09-18 07:46:32 -0700358TEST_P(PendingStreamTest, ResetStreamAt) {
359 Initialize();
360 if (!VersionHasIetfQuicFrames(session_->transport_version())) {
361 return;
362 }
363
364 PendingStream pending(kTestPendingStreamId, session_.get());
365
366 QuicResetStreamAtFrame rst(0, kTestPendingStreamId, QUIC_STREAM_CANCELLED,
367 100, 3);
368 pending.OnResetStreamAtFrame(rst);
369 QuicStreamFrame frame(kTestPendingStreamId, false, 2, ".");
370 pending.OnStreamFrame(frame);
371
372 auto stream = new TestStream(&pending, session_.get(), false);
373 session_->ActivateStream(absl::WrapUnique(stream));
374
375 EXPECT_FALSE(stream->rst_received());
376 EXPECT_FALSE(stream->read_side_closed());
377 EXPECT_CALL(*stream, OnDataAvailable()).WillOnce([&]() {
378 stream->ConsumeData(3);
379 });
380 QuicStreamFrame frame2(kTestPendingStreamId, false, 0, "..");
381 stream->OnStreamFrame(frame2);
382 EXPECT_TRUE(stream->read_side_closed());
383 EXPECT_TRUE(stream->rst_received());
384}
385
Bence Békybac04052022-04-07 15:44:29 -0400386TEST_P(QuicStreamTest, WriteAllData) {
387 Initialize();
388
389 QuicByteCount length =
390 1 + QuicPacketCreator::StreamFramePacketOverhead(
martindukea12ff5d2022-10-12 17:25:51 -0700391 connection_->transport_version(), kPacket8ByteConnectionId,
392 kPacket0ByteConnectionId, !kIncludeVersion,
Bence Békybac04052022-04-07 15:44:29 -0400393 !kIncludeDiversificationNonce, PACKET_4BYTE_PACKET_NUMBER,
dschinazi35c0ff72022-08-16 12:10:06 -0700394 quiche::VARIABLE_LENGTH_INTEGER_LENGTH_0,
395 quiche::VARIABLE_LENGTH_INTEGER_LENGTH_0, 0u);
Bence Békybac04052022-04-07 15:44:29 -0400396 connection_->SetMaxPacketLength(length);
397
398 EXPECT_CALL(*session_, WritevData(kTestStreamId, _, _, _, _, _))
399 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
400 stream_->WriteOrBufferData(kData1, false, nullptr);
401 EXPECT_FALSE(HasWriteBlockedStreams());
402}
403
404TEST_P(QuicStreamTest, NoBlockingIfNoDataOrFin) {
405 Initialize();
406
407 // Write no data and no fin. If we consume nothing we should not be write
408 // blocked.
409 EXPECT_QUIC_BUG(
410 stream_->WriteOrBufferData(absl::string_view(), false, nullptr), "");
411 EXPECT_FALSE(HasWriteBlockedStreams());
412}
413
414TEST_P(QuicStreamTest, BlockIfOnlySomeDataConsumed) {
415 Initialize();
416
417 // Write some data and no fin. If we consume some but not all of the data,
418 // we should be write blocked a not all the data was consumed.
419 EXPECT_CALL(*session_, WritevData(kTestStreamId, _, _, _, _, _))
420 .WillOnce(InvokeWithoutArgs([this]() {
421 return session_->ConsumeData(stream_->id(), 1u, 0u, NO_FIN,
vasilvv243b2622023-11-07 17:01:30 -0800422 NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -0400423 }));
424 stream_->WriteOrBufferData(absl::string_view(kData1, 2), false, nullptr);
425 EXPECT_TRUE(session_->HasUnackedStreamData());
426 ASSERT_EQ(1u, write_blocked_list_->NumBlockedStreams());
427 EXPECT_EQ(1u, stream_->BufferedDataBytes());
428}
429
430TEST_P(QuicStreamTest, BlockIfFinNotConsumedWithData) {
431 Initialize();
432
433 // Write some data and no fin. If we consume all the data but not the fin,
434 // we should be write blocked because the fin was not consumed.
435 // (This should never actually happen as the fin should be sent out with the
436 // last data)
437 EXPECT_CALL(*session_, WritevData(kTestStreamId, _, _, _, _, _))
438 .WillOnce(InvokeWithoutArgs([this]() {
439 return session_->ConsumeData(stream_->id(), 2u, 0u, NO_FIN,
vasilvv243b2622023-11-07 17:01:30 -0800440 NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -0400441 }));
442 stream_->WriteOrBufferData(absl::string_view(kData1, 2), true, nullptr);
443 EXPECT_TRUE(session_->HasUnackedStreamData());
444 ASSERT_EQ(1u, write_blocked_list_->NumBlockedStreams());
445}
446
447TEST_P(QuicStreamTest, BlockIfSoloFinNotConsumed) {
448 Initialize();
449
450 // Write no data and a fin. If we consume nothing we should be write blocked,
451 // as the fin was not consumed.
452 EXPECT_CALL(*session_, WritevData(kTestStreamId, _, _, _, _, _))
453 .WillOnce(Return(QuicConsumedData(0, false)));
454 stream_->WriteOrBufferData(absl::string_view(), true, nullptr);
455 ASSERT_EQ(1u, write_blocked_list_->NumBlockedStreams());
456}
457
458TEST_P(QuicStreamTest, CloseOnPartialWrite) {
459 Initialize();
460
461 // Write some data and no fin. However, while writing the data
462 // close the stream and verify that MarkConnectionLevelWriteBlocked does not
463 // crash with an unknown stream.
464 EXPECT_CALL(*session_, WritevData(kTestStreamId, _, _, _, _, _))
465 .WillOnce(Invoke(this, &QuicStreamTest::CloseStreamOnWriteError));
466 stream_->WriteOrBufferData(absl::string_view(kData1, 2), false, nullptr);
467 ASSERT_EQ(0u, write_blocked_list_->NumBlockedStreams());
468}
469
470TEST_P(QuicStreamTest, WriteOrBufferData) {
471 Initialize();
472
473 EXPECT_FALSE(HasWriteBlockedStreams());
474 QuicByteCount length =
475 1 + QuicPacketCreator::StreamFramePacketOverhead(
martindukea12ff5d2022-10-12 17:25:51 -0700476 connection_->transport_version(), kPacket8ByteConnectionId,
477 kPacket0ByteConnectionId, !kIncludeVersion,
Bence Békybac04052022-04-07 15:44:29 -0400478 !kIncludeDiversificationNonce, PACKET_4BYTE_PACKET_NUMBER,
dschinazi35c0ff72022-08-16 12:10:06 -0700479 quiche::VARIABLE_LENGTH_INTEGER_LENGTH_0,
480 quiche::VARIABLE_LENGTH_INTEGER_LENGTH_0, 0u);
Bence Békybac04052022-04-07 15:44:29 -0400481 connection_->SetMaxPacketLength(length);
482
483 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
484 .WillOnce(InvokeWithoutArgs([this]() {
485 return session_->ConsumeData(stream_->id(), kDataLen - 1, 0u, NO_FIN,
vasilvv243b2622023-11-07 17:01:30 -0800486 NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -0400487 }));
488 stream_->WriteOrBufferData(kData1, false, nullptr);
489
490 EXPECT_TRUE(session_->HasUnackedStreamData());
491 EXPECT_EQ(1u, stream_->BufferedDataBytes());
492 EXPECT_TRUE(HasWriteBlockedStreams());
493
494 // Queue a bytes_consumed write.
495 stream_->WriteOrBufferData(kData2, false, nullptr);
496 EXPECT_EQ(10u, stream_->BufferedDataBytes());
497 // Make sure we get the tail of the first write followed by the bytes_consumed
498 InSequence s;
499 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
500 .WillOnce(InvokeWithoutArgs([this]() {
501 return session_->ConsumeData(stream_->id(), kDataLen - 1, kDataLen - 1,
vasilvv243b2622023-11-07 17:01:30 -0800502 NO_FIN, NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -0400503 }));
504 EXPECT_CALL(*stream_, OnCanWriteNewData());
505 stream_->OnCanWrite();
506 EXPECT_TRUE(session_->HasUnackedStreamData());
507
508 // And finally the end of the bytes_consumed.
509 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
510 .WillOnce(InvokeWithoutArgs([this]() {
511 return session_->ConsumeData(stream_->id(), 2u, 2 * kDataLen - 2,
vasilvv243b2622023-11-07 17:01:30 -0800512 NO_FIN, NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -0400513 }));
514 EXPECT_CALL(*stream_, OnCanWriteNewData());
515 stream_->OnCanWrite();
516 EXPECT_TRUE(session_->HasUnackedStreamData());
517}
518
519TEST_P(QuicStreamTest, WriteOrBufferDataReachStreamLimit) {
520 Initialize();
521 std::string data("aaaaa");
522 QuicStreamPeer::SetStreamBytesWritten(kMaxStreamLength - data.length(),
523 stream_);
524 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
525 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
526 stream_->WriteOrBufferData(data, false, nullptr);
527 EXPECT_TRUE(session_->HasUnackedStreamData());
vasilvvac2e30d2022-06-02 14:26:59 -0700528 EXPECT_QUIC_BUG(
529 {
530 EXPECT_CALL(*connection_,
531 CloseConnection(QUIC_STREAM_LENGTH_OVERFLOW, _, _));
532 stream_->WriteOrBufferData("a", false, nullptr);
533 },
534 "Write too many data via stream");
Bence Békybac04052022-04-07 15:44:29 -0400535}
536
537TEST_P(QuicStreamTest, ConnectionCloseAfterStreamClose) {
538 Initialize();
539
540 QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
541 QUIC_STREAM_CANCELLED, 1234);
542 stream_->OnStreamReset(rst_frame);
543 if (VersionHasIetfQuicFrames(session_->transport_version())) {
544 // Create and inject a STOP SENDING frame to complete the close
545 // of the stream. This is only needed for version 99/IETF QUIC.
546 QuicStopSendingFrame stop_sending(kInvalidControlFrameId, stream_->id(),
547 QUIC_STREAM_CANCELLED);
548 session_->OnStopSendingFrame(stop_sending);
549 }
550 EXPECT_THAT(stream_->stream_error(), IsStreamError(QUIC_STREAM_CANCELLED));
551 EXPECT_THAT(stream_->connection_error(), IsQuicNoError());
renjietang20dbb382024-06-20 14:31:32 -0700552 QuicConnectionCloseFrame frame;
553 frame.quic_error_code = QUIC_INTERNAL_ERROR;
554 stream_->OnConnectionClosed(frame, ConnectionCloseSource::FROM_SELF);
Bence Békybac04052022-04-07 15:44:29 -0400555 EXPECT_THAT(stream_->stream_error(), IsStreamError(QUIC_STREAM_CANCELLED));
556 EXPECT_THAT(stream_->connection_error(), IsQuicNoError());
557}
558
559TEST_P(QuicStreamTest, RstAlwaysSentIfNoFinSent) {
560 // For flow control accounting, a stream must send either a FIN or a RST frame
561 // before termination.
562 // Test that if no FIN has been sent, we send a RST.
563
564 Initialize();
565 EXPECT_FALSE(fin_sent());
566 EXPECT_FALSE(rst_sent());
567
568 // Write some data, with no FIN.
569 EXPECT_CALL(*session_, WritevData(kTestStreamId, _, _, _, _, _))
570 .WillOnce(InvokeWithoutArgs([this]() {
571 return session_->ConsumeData(stream_->id(), 1u, 0u, NO_FIN,
vasilvv243b2622023-11-07 17:01:30 -0800572 NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -0400573 }));
574 stream_->WriteOrBufferData(absl::string_view(kData1, 1), false, nullptr);
575 EXPECT_TRUE(session_->HasUnackedStreamData());
576 EXPECT_FALSE(fin_sent());
577 EXPECT_FALSE(rst_sent());
578
579 // Now close the stream, and expect that we send a RST.
580 EXPECT_CALL(*session_, MaybeSendRstStreamFrame(kTestStreamId, _, _));
581 QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
582 QUIC_STREAM_CANCELLED, 1234);
583 stream_->OnStreamReset(rst_frame);
584 if (VersionHasIetfQuicFrames(session_->transport_version())) {
585 // Create and inject a STOP SENDING frame to complete the close
586 // of the stream. This is only needed for version 99/IETF QUIC.
587 QuicStopSendingFrame stop_sending(kInvalidControlFrameId, stream_->id(),
588 QUIC_STREAM_CANCELLED);
589 session_->OnStopSendingFrame(stop_sending);
590 }
591 EXPECT_FALSE(session_->HasUnackedStreamData());
592 EXPECT_FALSE(fin_sent());
593 EXPECT_TRUE(rst_sent());
594}
595
596TEST_P(QuicStreamTest, RstNotSentIfFinSent) {
597 // For flow control accounting, a stream must send either a FIN or a RST frame
598 // before termination.
599 // Test that if a FIN has been sent, we don't also send a RST.
600
601 Initialize();
602 EXPECT_FALSE(fin_sent());
603 EXPECT_FALSE(rst_sent());
604
605 // Write some data, with FIN.
606 EXPECT_CALL(*session_, WritevData(kTestStreamId, _, _, _, _, _))
607 .WillOnce(InvokeWithoutArgs([this]() {
608 return session_->ConsumeData(stream_->id(), 1u, 0u, FIN,
vasilvv243b2622023-11-07 17:01:30 -0800609 NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -0400610 }));
611 stream_->WriteOrBufferData(absl::string_view(kData1, 1), true, nullptr);
612 EXPECT_TRUE(fin_sent());
613 EXPECT_FALSE(rst_sent());
614
615 // Now close the stream, and expect that we do not send a RST.
616 QuicStreamPeer::CloseReadSide(stream_);
617 stream_->CloseWriteSide();
618 EXPECT_TRUE(fin_sent());
619 EXPECT_FALSE(rst_sent());
620}
621
622TEST_P(QuicStreamTest, OnlySendOneRst) {
623 // For flow control accounting, a stream must send either a FIN or a RST frame
624 // before termination.
625 // Test that if a stream sends a RST, it doesn't send an additional RST during
626 // OnClose() (this shouldn't be harmful, but we shouldn't do it anyway...)
627
628 Initialize();
629 EXPECT_FALSE(fin_sent());
630 EXPECT_FALSE(rst_sent());
631
632 // Reset the stream.
633 EXPECT_CALL(*session_, MaybeSendRstStreamFrame(kTestStreamId, _, _)).Times(1);
634 stream_->Reset(QUIC_STREAM_CANCELLED);
635 EXPECT_FALSE(fin_sent());
636 EXPECT_TRUE(rst_sent());
637
638 // Now close the stream (any further resets being sent would break the
639 // expectation above).
640 QuicStreamPeer::CloseReadSide(stream_);
641 stream_->CloseWriteSide();
642 EXPECT_FALSE(fin_sent());
643 EXPECT_TRUE(rst_sent());
644}
645
646TEST_P(QuicStreamTest, StreamFlowControlMultipleWindowUpdates) {
647 Initialize();
648
649 // If we receive multiple WINDOW_UPDATES (potentially out of order), then we
650 // want to make sure we latch the largest offset we see.
651
652 // Initially should be default.
653 EXPECT_EQ(kMinimumFlowControlSendWindow,
654 QuicStreamPeer::SendWindowOffset(stream_));
655
656 // Check a single WINDOW_UPDATE results in correct offset.
657 QuicWindowUpdateFrame window_update_1(kInvalidControlFrameId, stream_->id(),
658 kMinimumFlowControlSendWindow + 5);
659 stream_->OnWindowUpdateFrame(window_update_1);
660 EXPECT_EQ(window_update_1.max_data,
661 QuicStreamPeer::SendWindowOffset(stream_));
662
663 // Now send a few more WINDOW_UPDATES and make sure that only the largest is
664 // remembered.
665 QuicWindowUpdateFrame window_update_2(kInvalidControlFrameId, stream_->id(),
666 1);
667 QuicWindowUpdateFrame window_update_3(kInvalidControlFrameId, stream_->id(),
668 kMinimumFlowControlSendWindow + 10);
669 QuicWindowUpdateFrame window_update_4(kInvalidControlFrameId, stream_->id(),
670 5678);
671 stream_->OnWindowUpdateFrame(window_update_2);
672 stream_->OnWindowUpdateFrame(window_update_3);
673 stream_->OnWindowUpdateFrame(window_update_4);
674 EXPECT_EQ(window_update_3.max_data,
675 QuicStreamPeer::SendWindowOffset(stream_));
676}
677
678TEST_P(QuicStreamTest, FrameStats) {
679 Initialize();
680
681 EXPECT_EQ(0, stream_->num_frames_received());
682 EXPECT_EQ(0, stream_->num_duplicate_frames_received());
683 QuicStreamFrame frame(stream_->id(), false, 0, ".");
684 EXPECT_CALL(*stream_, OnDataAvailable()).Times(2);
685 stream_->OnStreamFrame(frame);
686 EXPECT_EQ(1, stream_->num_frames_received());
687 EXPECT_EQ(0, stream_->num_duplicate_frames_received());
688 stream_->OnStreamFrame(frame);
689 EXPECT_EQ(2, stream_->num_frames_received());
690 EXPECT_EQ(1, stream_->num_duplicate_frames_received());
691 QuicStreamFrame frame2(stream_->id(), false, 1, "abc");
692 stream_->OnStreamFrame(frame2);
693}
694
695// Verify that when we receive a packet which violates flow control (i.e. sends
696// too much data on the stream) that the stream sequencer never sees this frame,
697// as we check for violation and close the connection early.
698TEST_P(QuicStreamTest, StreamSequencerNeverSeesPacketsViolatingFlowControl) {
699 Initialize();
700
701 // Receive a stream frame that violates flow control: the byte offset is
702 // higher than the receive window offset.
703 QuicStreamFrame frame(stream_->id(), false,
704 kInitialSessionFlowControlWindowForTest + 1, ".");
705 EXPECT_GT(frame.offset, QuicStreamPeer::ReceiveWindowOffset(stream_));
706
707 // Stream should not accept the frame, and the connection should be closed.
708 EXPECT_CALL(*connection_,
709 CloseConnection(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, _, _));
710 stream_->OnStreamFrame(frame);
711}
712
713// Verify that after the consumer calls StopReading(), the stream still sends
714// flow control updates.
715TEST_P(QuicStreamTest, StopReadingSendsFlowControl) {
716 Initialize();
717
718 stream_->StopReading();
719
720 // Connection should not get terminated due to flow control errors.
721 EXPECT_CALL(*connection_,
722 CloseConnection(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, _, _))
723 .Times(0);
724 EXPECT_CALL(*session_, WriteControlFrame(_, _))
725 .Times(AtLeast(1))
726 .WillRepeatedly(Invoke(&ClearControlFrameWithTransmissionType));
727
728 std::string data(1000, 'x');
729 for (QuicStreamOffset offset = 0;
730 offset < 2 * kInitialStreamFlowControlWindowForTest;
731 offset += data.length()) {
732 QuicStreamFrame frame(stream_->id(), false, offset, data);
733 stream_->OnStreamFrame(frame);
734 }
735 EXPECT_LT(kInitialStreamFlowControlWindowForTest,
736 QuicStreamPeer::ReceiveWindowOffset(stream_));
737}
738
739TEST_P(QuicStreamTest, FinalByteOffsetFromFin) {
740 Initialize();
741
742 EXPECT_FALSE(stream_->HasReceivedFinalOffset());
743
744 QuicStreamFrame stream_frame_no_fin(stream_->id(), false, 1234, ".");
745 stream_->OnStreamFrame(stream_frame_no_fin);
746 EXPECT_FALSE(stream_->HasReceivedFinalOffset());
747
748 QuicStreamFrame stream_frame_with_fin(stream_->id(), true, 1234, ".");
749 stream_->OnStreamFrame(stream_frame_with_fin);
750 EXPECT_TRUE(stream_->HasReceivedFinalOffset());
751}
752
753TEST_P(QuicStreamTest, FinalByteOffsetFromRst) {
754 Initialize();
755
756 EXPECT_FALSE(stream_->HasReceivedFinalOffset());
757 QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
758 QUIC_STREAM_CANCELLED, 1234);
759 stream_->OnStreamReset(rst_frame);
760 EXPECT_TRUE(stream_->HasReceivedFinalOffset());
761}
762
763TEST_P(QuicStreamTest, InvalidFinalByteOffsetFromRst) {
764 Initialize();
765
766 EXPECT_FALSE(stream_->HasReceivedFinalOffset());
767 QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
768 QUIC_STREAM_CANCELLED, 0xFFFFFFFFFFFF);
769 // Stream should not accept the frame, and the connection should be closed.
770 EXPECT_CALL(*connection_,
771 CloseConnection(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, _, _));
772 stream_->OnStreamReset(rst_frame);
773 EXPECT_TRUE(stream_->HasReceivedFinalOffset());
774}
775
776TEST_P(QuicStreamTest, FinalByteOffsetFromZeroLengthStreamFrame) {
777 // When receiving Trailers, an empty stream frame is created with the FIN set,
778 // and is passed to OnStreamFrame. The Trailers may be sent in advance of
779 // queued body bytes being sent, and thus the final byte offset may exceed
780 // current flow control limits. Flow control should only be concerned with
781 // data that has actually been sent/received, so verify that flow control
782 // ignores such a stream frame.
783 Initialize();
784
785 EXPECT_FALSE(stream_->HasReceivedFinalOffset());
786 const QuicStreamOffset kByteOffsetExceedingFlowControlWindow =
787 kInitialSessionFlowControlWindowForTest + 1;
788 const QuicStreamOffset current_stream_flow_control_offset =
789 QuicStreamPeer::ReceiveWindowOffset(stream_);
790 const QuicStreamOffset current_connection_flow_control_offset =
791 QuicFlowControllerPeer::ReceiveWindowOffset(session_->flow_controller());
792 ASSERT_GT(kByteOffsetExceedingFlowControlWindow,
793 current_stream_flow_control_offset);
794 ASSERT_GT(kByteOffsetExceedingFlowControlWindow,
795 current_connection_flow_control_offset);
796 QuicStreamFrame zero_length_stream_frame_with_fin(
797 stream_->id(), /*fin=*/true, kByteOffsetExceedingFlowControlWindow,
798 absl::string_view());
799 EXPECT_EQ(0, zero_length_stream_frame_with_fin.data_length);
800
801 EXPECT_CALL(*connection_, CloseConnection(_, _, _)).Times(0);
802 stream_->OnStreamFrame(zero_length_stream_frame_with_fin);
803 EXPECT_TRUE(stream_->HasReceivedFinalOffset());
804
805 // The flow control receive offset values should not have changed.
806 EXPECT_EQ(current_stream_flow_control_offset,
807 QuicStreamPeer::ReceiveWindowOffset(stream_));
808 EXPECT_EQ(
809 current_connection_flow_control_offset,
810 QuicFlowControllerPeer::ReceiveWindowOffset(session_->flow_controller()));
811}
812
813TEST_P(QuicStreamTest, OnStreamResetOffsetOverflow) {
814 Initialize();
815 QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
816 QUIC_STREAM_CANCELLED, kMaxStreamLength + 1);
817 EXPECT_CALL(*connection_, CloseConnection(QUIC_STREAM_LENGTH_OVERFLOW, _, _));
818 stream_->OnStreamReset(rst_frame);
819}
820
821TEST_P(QuicStreamTest, OnStreamFrameUpperLimit) {
822 Initialize();
823
824 // Modify receive window offset and sequencer buffer total_bytes_read_ to
825 // avoid flow control violation.
826 QuicStreamPeer::SetReceiveWindowOffset(stream_, kMaxStreamLength + 5u);
827 QuicFlowControllerPeer::SetReceiveWindowOffset(session_->flow_controller(),
828 kMaxStreamLength + 5u);
829 QuicStreamSequencerPeer::SetFrameBufferTotalBytesRead(
830 QuicStreamPeer::sequencer(stream_), kMaxStreamLength - 10u);
831
832 EXPECT_CALL(*connection_, CloseConnection(QUIC_STREAM_LENGTH_OVERFLOW, _, _))
833 .Times(0);
834 QuicStreamFrame stream_frame(stream_->id(), false, kMaxStreamLength - 1, ".");
835 stream_->OnStreamFrame(stream_frame);
836 QuicStreamFrame stream_frame2(stream_->id(), true, kMaxStreamLength, "");
837 stream_->OnStreamFrame(stream_frame2);
838}
839
840TEST_P(QuicStreamTest, StreamTooLong) {
841 Initialize();
Bence Békybac04052022-04-07 15:44:29 -0400842 QuicStreamFrame stream_frame(stream_->id(), false, kMaxStreamLength, ".");
843 EXPECT_QUIC_PEER_BUG(
vasilvvac2e30d2022-06-02 14:26:59 -0700844 {
845 EXPECT_CALL(*connection_,
846 CloseConnection(QUIC_STREAM_LENGTH_OVERFLOW, _, _))
847 .Times(1);
848 stream_->OnStreamFrame(stream_frame);
849 },
Bence Békybac04052022-04-07 15:44:29 -0400850 absl::StrCat("Receive stream frame on stream ", stream_->id(),
851 " reaches max stream length"));
852}
853
854TEST_P(QuicStreamTest, SetDrainingIncomingOutgoing) {
855 // Don't have incoming data consumed.
856 Initialize();
857
858 // Incoming data with FIN.
859 QuicStreamFrame stream_frame_with_fin(stream_->id(), true, 1234, ".");
860 stream_->OnStreamFrame(stream_frame_with_fin);
861 // The FIN has been received but not consumed.
862 EXPECT_TRUE(stream_->HasReceivedFinalOffset());
863 EXPECT_FALSE(QuicStreamPeer::read_side_closed(stream_));
864 EXPECT_FALSE(stream_->reading_stopped());
865
866 EXPECT_EQ(1u, QuicSessionPeer::GetNumOpenDynamicStreams(session_.get()));
867
868 // Outgoing data with FIN.
869 EXPECT_CALL(*session_, WritevData(kTestStreamId, _, _, _, _, _))
870 .WillOnce(InvokeWithoutArgs([this]() {
871 return session_->ConsumeData(stream_->id(), 2u, 0u, FIN,
vasilvv243b2622023-11-07 17:01:30 -0800872 NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -0400873 }));
874 stream_->WriteOrBufferData(absl::string_view(kData1, 2), true, nullptr);
875 EXPECT_TRUE(stream_->write_side_closed());
876
877 EXPECT_EQ(1u, QuicSessionPeer::GetNumDrainingStreams(session_.get()));
878 EXPECT_EQ(0u, QuicSessionPeer::GetNumOpenDynamicStreams(session_.get()));
879}
880
881TEST_P(QuicStreamTest, SetDrainingOutgoingIncoming) {
882 // Don't have incoming data consumed.
883 Initialize();
884
885 // Outgoing data with FIN.
886 EXPECT_CALL(*session_, WritevData(kTestStreamId, _, _, _, _, _))
887 .WillOnce(InvokeWithoutArgs([this]() {
888 return session_->ConsumeData(stream_->id(), 2u, 0u, FIN,
vasilvv243b2622023-11-07 17:01:30 -0800889 NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -0400890 }));
891 stream_->WriteOrBufferData(absl::string_view(kData1, 2), true, nullptr);
892 EXPECT_TRUE(stream_->write_side_closed());
893
894 EXPECT_EQ(1u, QuicSessionPeer::GetNumOpenDynamicStreams(session_.get()));
895
896 // Incoming data with FIN.
897 QuicStreamFrame stream_frame_with_fin(stream_->id(), true, 1234, ".");
898 stream_->OnStreamFrame(stream_frame_with_fin);
899 // The FIN has been received but not consumed.
900 EXPECT_TRUE(stream_->HasReceivedFinalOffset());
901 EXPECT_FALSE(QuicStreamPeer::read_side_closed(stream_));
902 EXPECT_FALSE(stream_->reading_stopped());
903
904 EXPECT_EQ(1u, QuicSessionPeer::GetNumDrainingStreams(session_.get()));
905 EXPECT_EQ(0u, QuicSessionPeer::GetNumOpenDynamicStreams(session_.get()));
906}
907
908TEST_P(QuicStreamTest, EarlyResponseFinHandling) {
909 // Verify that if the server completes the response before reading the end of
910 // the request, the received FIN is recorded.
911
912 Initialize();
913 EXPECT_CALL(*connection_, CloseConnection(_, _, _)).Times(0);
914 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
915 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
916
917 // Receive data for the request.
918 EXPECT_CALL(*stream_, OnDataAvailable()).Times(1);
919 QuicStreamFrame frame1(stream_->id(), false, 0, "Start");
920 stream_->OnStreamFrame(frame1);
921 // When QuicSimpleServerStream sends the response, it calls
922 // QuicStream::CloseReadSide() first.
923 QuicStreamPeer::CloseReadSide(stream_);
924 // Send data and FIN for the response.
925 stream_->WriteOrBufferData(kData1, false, nullptr);
926 EXPECT_TRUE(QuicStreamPeer::read_side_closed(stream_));
927 // Receive remaining data and FIN for the request.
928 QuicStreamFrame frame2(stream_->id(), true, 0, "End");
929 stream_->OnStreamFrame(frame2);
930 EXPECT_TRUE(stream_->fin_received());
931 EXPECT_TRUE(stream_->HasReceivedFinalOffset());
932}
933
934TEST_P(QuicStreamTest, StreamWaitsForAcks) {
935 Initialize();
936 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
937 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
938 // Stream is not waiting for acks initially.
939 EXPECT_FALSE(stream_->IsWaitingForAcks());
940 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
941 EXPECT_FALSE(session_->HasUnackedStreamData());
942
943 // Send kData1.
944 stream_->WriteOrBufferData(kData1, false, nullptr);
945 EXPECT_TRUE(session_->HasUnackedStreamData());
946 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
947 EXPECT_TRUE(stream_->IsWaitingForAcks());
948 QuicByteCount newly_acked_length = 0;
949 EXPECT_TRUE(stream_->OnStreamFrameAcked(0, 9, false, QuicTime::Delta::Zero(),
950 QuicTime::Zero(),
951 &newly_acked_length));
952 EXPECT_EQ(9u, newly_acked_length);
953 // Stream is not waiting for acks as all sent data is acked.
954 EXPECT_FALSE(stream_->IsWaitingForAcks());
955 EXPECT_FALSE(session_->HasUnackedStreamData());
956 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
957
958 // Send kData2.
959 stream_->WriteOrBufferData(kData2, false, nullptr);
960 EXPECT_TRUE(stream_->IsWaitingForAcks());
961 EXPECT_TRUE(session_->HasUnackedStreamData());
962 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
963 // Send FIN.
964 stream_->WriteOrBufferData("", true, nullptr);
965 // Fin only frame is not stored in send buffer.
966 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
967
968 // kData2 is retransmitted.
969 stream_->OnStreamFrameRetransmitted(9, 9, false);
970
971 // kData2 is acked.
972 EXPECT_TRUE(stream_->OnStreamFrameAcked(9, 9, false, QuicTime::Delta::Zero(),
973 QuicTime::Zero(),
974 &newly_acked_length));
975 EXPECT_EQ(9u, newly_acked_length);
976 // Stream is waiting for acks as FIN is not acked.
977 EXPECT_TRUE(stream_->IsWaitingForAcks());
978 EXPECT_TRUE(session_->HasUnackedStreamData());
979 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
980
981 // FIN is acked.
982 EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState());
983 EXPECT_TRUE(stream_->OnStreamFrameAcked(18, 0, true, QuicTime::Delta::Zero(),
984 QuicTime::Zero(),
985 &newly_acked_length));
986 EXPECT_EQ(0u, newly_acked_length);
987 EXPECT_FALSE(stream_->IsWaitingForAcks());
988 EXPECT_FALSE(session_->HasUnackedStreamData());
989 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
990}
991
992TEST_P(QuicStreamTest, StreamDataGetAckedOutOfOrder) {
993 Initialize();
994 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
995 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
996 // Send data.
997 stream_->WriteOrBufferData(kData1, false, nullptr);
998 stream_->WriteOrBufferData(kData1, false, nullptr);
999 stream_->WriteOrBufferData(kData1, false, nullptr);
1000 stream_->WriteOrBufferData("", true, nullptr);
1001 EXPECT_EQ(3u, QuicStreamPeer::SendBuffer(stream_).size());
1002 EXPECT_TRUE(stream_->IsWaitingForAcks());
1003 EXPECT_TRUE(session_->HasUnackedStreamData());
1004 QuicByteCount newly_acked_length = 0;
1005 EXPECT_TRUE(stream_->OnStreamFrameAcked(9, 9, false, QuicTime::Delta::Zero(),
1006 QuicTime::Zero(),
1007 &newly_acked_length));
1008 EXPECT_TRUE(session_->HasUnackedStreamData());
1009 EXPECT_EQ(9u, newly_acked_length);
1010 EXPECT_EQ(3u, QuicStreamPeer::SendBuffer(stream_).size());
1011 EXPECT_TRUE(stream_->OnStreamFrameAcked(18, 9, false, QuicTime::Delta::Zero(),
1012 QuicTime::Zero(),
1013 &newly_acked_length));
1014 EXPECT_TRUE(session_->HasUnackedStreamData());
1015 EXPECT_EQ(9u, newly_acked_length);
1016 EXPECT_EQ(3u, QuicStreamPeer::SendBuffer(stream_).size());
1017 EXPECT_TRUE(stream_->OnStreamFrameAcked(0, 9, false, QuicTime::Delta::Zero(),
1018 QuicTime::Zero(),
1019 &newly_acked_length));
1020 EXPECT_TRUE(session_->HasUnackedStreamData());
1021 EXPECT_EQ(9u, newly_acked_length);
1022 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
1023 // FIN is not acked yet.
1024 EXPECT_TRUE(stream_->IsWaitingForAcks());
1025 EXPECT_TRUE(session_->HasUnackedStreamData());
1026 EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState());
1027 EXPECT_TRUE(stream_->OnStreamFrameAcked(27, 0, true, QuicTime::Delta::Zero(),
1028 QuicTime::Zero(),
1029 &newly_acked_length));
1030 EXPECT_EQ(0u, newly_acked_length);
1031 EXPECT_FALSE(stream_->IsWaitingForAcks());
1032 EXPECT_FALSE(session_->HasUnackedStreamData());
1033}
1034
1035TEST_P(QuicStreamTest, CancelStream) {
1036 Initialize();
1037 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1038 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1039 EXPECT_FALSE(stream_->IsWaitingForAcks());
1040 EXPECT_FALSE(session_->HasUnackedStreamData());
1041 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
1042
1043 stream_->WriteOrBufferData(kData1, false, nullptr);
1044 EXPECT_TRUE(stream_->IsWaitingForAcks());
1045 EXPECT_TRUE(session_->HasUnackedStreamData());
1046 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
1047 // Cancel stream.
1048 stream_->MaybeSendStopSending(QUIC_STREAM_NO_ERROR);
1049 // stream still waits for acks as the error code is QUIC_STREAM_NO_ERROR, and
1050 // data is going to be retransmitted.
1051 EXPECT_TRUE(stream_->IsWaitingForAcks());
1052 EXPECT_TRUE(session_->HasUnackedStreamData());
1053 EXPECT_CALL(*connection_,
1054 OnStreamReset(stream_->id(), QUIC_STREAM_CANCELLED));
1055 EXPECT_CALL(*session_, WriteControlFrame(_, _))
1056 .Times(AtLeast(1))
1057 .WillRepeatedly(Invoke(&ClearControlFrameWithTransmissionType));
1058
1059 EXPECT_CALL(*session_, MaybeSendRstStreamFrame(_, _, _))
1060 .WillOnce(InvokeWithoutArgs([this]() {
1061 session_->ReallyMaybeSendRstStreamFrame(
1062 stream_->id(), QUIC_STREAM_CANCELLED,
1063 stream_->stream_bytes_written());
1064 }));
1065
1066 stream_->Reset(QUIC_STREAM_CANCELLED);
1067 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
1068 // Stream stops waiting for acks as data is not going to be retransmitted.
1069 EXPECT_FALSE(stream_->IsWaitingForAcks());
1070 EXPECT_FALSE(session_->HasUnackedStreamData());
1071}
1072
1073TEST_P(QuicStreamTest, RstFrameReceivedStreamNotFinishSending) {
1074 if (VersionHasIetfQuicFrames(GetParam().transport_version)) {
1075 // In IETF QUIC, receiving a RESET_STREAM will only close the read side. The
1076 // stream itself is not closed and will not send reset.
1077 return;
1078 }
1079
1080 Initialize();
1081 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1082 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1083 EXPECT_FALSE(stream_->IsWaitingForAcks());
1084 EXPECT_FALSE(session_->HasUnackedStreamData());
1085 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
1086
1087 stream_->WriteOrBufferData(kData1, false, nullptr);
1088 EXPECT_TRUE(stream_->IsWaitingForAcks());
1089 EXPECT_TRUE(session_->HasUnackedStreamData());
1090 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
1091
1092 // RST_STREAM received.
1093 QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
1094 QUIC_STREAM_CANCELLED, 9);
1095
1096 EXPECT_CALL(
1097 *session_,
1098 MaybeSendRstStreamFrame(
1099 stream_->id(),
1100 QuicResetStreamError::FromInternal(QUIC_RST_ACKNOWLEDGEMENT), 9));
1101 stream_->OnStreamReset(rst_frame);
1102 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
1103 // Stream stops waiting for acks as it does not finish sending and rst is
1104 // sent.
1105 EXPECT_FALSE(stream_->IsWaitingForAcks());
1106 EXPECT_FALSE(session_->HasUnackedStreamData());
1107}
1108
1109TEST_P(QuicStreamTest, RstFrameReceivedStreamFinishSending) {
1110 Initialize();
1111 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1112 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1113 EXPECT_FALSE(stream_->IsWaitingForAcks());
1114 EXPECT_FALSE(session_->HasUnackedStreamData());
1115 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
1116
1117 stream_->WriteOrBufferData(kData1, true, nullptr);
1118 EXPECT_TRUE(stream_->IsWaitingForAcks());
1119 EXPECT_TRUE(session_->HasUnackedStreamData());
1120
1121 // RST_STREAM received.
1122 QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
1123 QUIC_STREAM_CANCELLED, 1234);
1124 stream_->OnStreamReset(rst_frame);
1125 // Stream still waits for acks as it finishes sending and has unacked data.
1126 EXPECT_TRUE(stream_->IsWaitingForAcks());
1127 EXPECT_TRUE(session_->HasUnackedStreamData());
1128 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
1129}
1130
1131TEST_P(QuicStreamTest, ConnectionClosed) {
1132 Initialize();
1133 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1134 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1135 EXPECT_FALSE(stream_->IsWaitingForAcks());
1136 EXPECT_FALSE(session_->HasUnackedStreamData());
1137 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
1138
1139 stream_->WriteOrBufferData(kData1, false, nullptr);
1140 EXPECT_TRUE(stream_->IsWaitingForAcks());
1141 EXPECT_TRUE(session_->HasUnackedStreamData());
1142 EXPECT_CALL(
1143 *session_,
1144 MaybeSendRstStreamFrame(
1145 stream_->id(),
1146 QuicResetStreamError::FromInternal(QUIC_RST_ACKNOWLEDGEMENT), 9));
1147 QuicConnectionPeer::SetConnectionClose(connection_);
renjietang20dbb382024-06-20 14:31:32 -07001148 QuicConnectionCloseFrame frame;
1149 frame.quic_error_code = QUIC_INTERNAL_ERROR;
1150 stream_->OnConnectionClosed(frame, ConnectionCloseSource::FROM_SELF);
Bence Békybac04052022-04-07 15:44:29 -04001151 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
1152 // Stream stops waiting for acks as connection is going to close.
1153 EXPECT_FALSE(stream_->IsWaitingForAcks());
1154 EXPECT_FALSE(session_->HasUnackedStreamData());
1155}
1156
1157TEST_P(QuicStreamTest, CanWriteNewDataAfterData) {
birenroyef686222022-09-12 11:34:34 -07001158 SetQuicFlag(quic_buffered_data_threshold, 100);
Bence Békybac04052022-04-07 15:44:29 -04001159 Initialize();
1160 EXPECT_TRUE(stream_->CanWriteNewDataAfterData(99));
1161 EXPECT_FALSE(stream_->CanWriteNewDataAfterData(100));
1162}
1163
1164TEST_P(QuicStreamTest, WriteBufferedData) {
1165 // Set buffered data low water mark to be 100.
birenroyef686222022-09-12 11:34:34 -07001166 SetQuicFlag(quic_buffered_data_threshold, 100);
Bence Békybac04052022-04-07 15:44:29 -04001167
1168 Initialize();
1169 std::string data(1024, 'a');
1170 EXPECT_TRUE(stream_->CanWriteNewData());
1171
1172 // Testing WriteOrBufferData.
1173 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1174 .WillOnce(InvokeWithoutArgs([this]() {
1175 return session_->ConsumeData(stream_->id(), 100u, 0u, NO_FIN,
vasilvv243b2622023-11-07 17:01:30 -08001176 NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -04001177 }));
1178 stream_->WriteOrBufferData(data, false, nullptr);
1179 stream_->WriteOrBufferData(data, false, nullptr);
1180 stream_->WriteOrBufferData(data, false, nullptr);
1181 EXPECT_TRUE(stream_->IsWaitingForAcks());
1182
1183 // Verify all data is saved.
1184 EXPECT_EQ(3 * data.length() - 100, stream_->BufferedDataBytes());
1185
1186 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1187 .WillOnce(InvokeWithoutArgs([this]() {
1188 return session_->ConsumeData(stream_->id(), 100, 100u, NO_FIN,
vasilvv243b2622023-11-07 17:01:30 -08001189 NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -04001190 }));
1191 // Buffered data size > threshold, do not ask upper layer for more data.
1192 EXPECT_CALL(*stream_, OnCanWriteNewData()).Times(0);
1193 stream_->OnCanWrite();
1194 EXPECT_EQ(3 * data.length() - 200, stream_->BufferedDataBytes());
1195 EXPECT_FALSE(stream_->CanWriteNewData());
1196
1197 // Send buffered data to make buffered data size < threshold.
1198 QuicByteCount data_to_write =
birenroyef686222022-09-12 11:34:34 -07001199 3 * data.length() - 200 - GetQuicFlag(quic_buffered_data_threshold) + 1;
Bence Békybac04052022-04-07 15:44:29 -04001200 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1201 .WillOnce(InvokeWithoutArgs([this, data_to_write]() {
1202 return session_->ConsumeData(stream_->id(), data_to_write, 200u, NO_FIN,
vasilvv243b2622023-11-07 17:01:30 -08001203 NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -04001204 }));
1205 // Buffered data size < threshold, ask upper layer for more data.
1206 EXPECT_CALL(*stream_, OnCanWriteNewData()).Times(1);
1207 stream_->OnCanWrite();
birenroyef686222022-09-12 11:34:34 -07001208 EXPECT_EQ(
1209 static_cast<uint64_t>(GetQuicFlag(quic_buffered_data_threshold) - 1),
1210 stream_->BufferedDataBytes());
Bence Békybac04052022-04-07 15:44:29 -04001211 EXPECT_TRUE(stream_->CanWriteNewData());
1212
1213 // Flush all buffered data.
1214 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1215 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1216 EXPECT_CALL(*stream_, OnCanWriteNewData()).Times(1);
1217 stream_->OnCanWrite();
1218 EXPECT_EQ(0u, stream_->BufferedDataBytes());
1219 EXPECT_FALSE(stream_->HasBufferedData());
1220 EXPECT_TRUE(stream_->CanWriteNewData());
1221
1222 // Testing Writev.
1223 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1224 .WillOnce(Return(QuicConsumedData(0, false)));
1225 struct iovec iov = {const_cast<char*>(data.data()), data.length()};
1226 quiche::QuicheMemSliceStorage storage(
1227 &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(),
1228 1024);
1229 QuicConsumedData consumed = stream_->WriteMemSlices(storage.ToSpan(), false);
1230
1231 // There is no buffered data before, all data should be consumed without
1232 // respecting buffered data upper limit.
1233 EXPECT_EQ(data.length(), consumed.bytes_consumed);
1234 EXPECT_FALSE(consumed.fin_consumed);
1235 EXPECT_EQ(data.length(), stream_->BufferedDataBytes());
1236 EXPECT_FALSE(stream_->CanWriteNewData());
1237
1238 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(0);
1239 quiche::QuicheMemSliceStorage storage2(
1240 &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(),
1241 1024);
1242 consumed = stream_->WriteMemSlices(storage2.ToSpan(), false);
1243 // No Data can be consumed as buffered data is beyond upper limit.
1244 EXPECT_EQ(0u, consumed.bytes_consumed);
1245 EXPECT_FALSE(consumed.fin_consumed);
1246 EXPECT_EQ(data.length(), stream_->BufferedDataBytes());
1247
birenroyef686222022-09-12 11:34:34 -07001248 data_to_write = data.length() - GetQuicFlag(quic_buffered_data_threshold) + 1;
Bence Békybac04052022-04-07 15:44:29 -04001249 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1250 .WillOnce(InvokeWithoutArgs([this, data_to_write]() {
1251 return session_->ConsumeData(stream_->id(), data_to_write, 0u, NO_FIN,
vasilvv243b2622023-11-07 17:01:30 -08001252 NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -04001253 }));
1254
1255 EXPECT_CALL(*stream_, OnCanWriteNewData()).Times(1);
1256 stream_->OnCanWrite();
birenroyef686222022-09-12 11:34:34 -07001257 EXPECT_EQ(
1258 static_cast<uint64_t>(GetQuicFlag(quic_buffered_data_threshold) - 1),
1259 stream_->BufferedDataBytes());
Bence Békybac04052022-04-07 15:44:29 -04001260 EXPECT_TRUE(stream_->CanWriteNewData());
1261
1262 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(0);
1263 // All data can be consumed as buffered data is below upper limit.
1264 quiche::QuicheMemSliceStorage storage3(
1265 &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(),
1266 1024);
1267 consumed = stream_->WriteMemSlices(storage3.ToSpan(), false);
1268 EXPECT_EQ(data.length(), consumed.bytes_consumed);
1269 EXPECT_FALSE(consumed.fin_consumed);
birenroyef686222022-09-12 11:34:34 -07001270 EXPECT_EQ(data.length() + GetQuicFlag(quic_buffered_data_threshold) - 1,
Bence Békybac04052022-04-07 15:44:29 -04001271 stream_->BufferedDataBytes());
1272 EXPECT_FALSE(stream_->CanWriteNewData());
1273}
1274
1275TEST_P(QuicStreamTest, WritevDataReachStreamLimit) {
1276 Initialize();
1277 std::string data("aaaaa");
1278 QuicStreamPeer::SetStreamBytesWritten(kMaxStreamLength - data.length(),
1279 stream_);
1280 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1281 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1282 struct iovec iov = {const_cast<char*>(data.data()), 5u};
1283 quiche::QuicheMemSliceStorage storage(
1284 &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(),
1285 1024);
1286 QuicConsumedData consumed = stream_->WriteMemSlices(storage.ToSpan(), false);
1287 EXPECT_EQ(data.length(), consumed.bytes_consumed);
1288 struct iovec iov2 = {const_cast<char*>(data.data()), 1u};
1289 quiche::QuicheMemSliceStorage storage2(
1290 &iov2, 1,
1291 session_->connection()->helper()->GetStreamSendBufferAllocator(), 1024);
vasilvvac2e30d2022-06-02 14:26:59 -07001292 EXPECT_QUIC_BUG(
1293 {
1294 EXPECT_CALL(*connection_,
1295 CloseConnection(QUIC_STREAM_LENGTH_OVERFLOW, _, _));
1296 stream_->WriteMemSlices(storage2.ToSpan(), false);
1297 },
1298 "Write too many data via stream");
Bence Békybac04052022-04-07 15:44:29 -04001299}
1300
1301TEST_P(QuicStreamTest, WriteMemSlices) {
1302 // Set buffered data low water mark to be 100.
birenroyef686222022-09-12 11:34:34 -07001303 SetQuicFlag(quic_buffered_data_threshold, 100);
Bence Békybac04052022-04-07 15:44:29 -04001304
1305 Initialize();
1306 constexpr QuicByteCount kDataSize = 1024;
1307 quiche::QuicheBufferAllocator* allocator =
1308 connection_->helper()->GetStreamSendBufferAllocator();
1309 std::vector<quiche::QuicheMemSlice> vector1;
1310 vector1.push_back(
1311 quiche::QuicheMemSlice(quiche::QuicheBuffer(allocator, kDataSize)));
1312 vector1.push_back(
1313 quiche::QuicheMemSlice(quiche::QuicheBuffer(allocator, kDataSize)));
1314 std::vector<quiche::QuicheMemSlice> vector2;
1315 vector2.push_back(
1316 quiche::QuicheMemSlice(quiche::QuicheBuffer(allocator, kDataSize)));
1317 vector2.push_back(
1318 quiche::QuicheMemSlice(quiche::QuicheBuffer(allocator, kDataSize)));
1319 absl::Span<quiche::QuicheMemSlice> span1(vector1);
1320 absl::Span<quiche::QuicheMemSlice> span2(vector2);
1321
1322 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1323 .WillOnce(InvokeWithoutArgs([this]() {
1324 return session_->ConsumeData(stream_->id(), 100u, 0u, NO_FIN,
vasilvv243b2622023-11-07 17:01:30 -08001325 NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -04001326 }));
1327 // There is no buffered data before, all data should be consumed.
1328 QuicConsumedData consumed = stream_->WriteMemSlices(span1, false);
1329 EXPECT_EQ(2048u, consumed.bytes_consumed);
1330 EXPECT_FALSE(consumed.fin_consumed);
1331 EXPECT_EQ(2 * kDataSize - 100, stream_->BufferedDataBytes());
1332 EXPECT_FALSE(stream_->fin_buffered());
1333
1334 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(0);
1335 // No Data can be consumed as buffered data is beyond upper limit.
1336 consumed = stream_->WriteMemSlices(span2, true);
1337 EXPECT_EQ(0u, consumed.bytes_consumed);
1338 EXPECT_FALSE(consumed.fin_consumed);
1339 EXPECT_EQ(2 * kDataSize - 100, stream_->BufferedDataBytes());
1340 EXPECT_FALSE(stream_->fin_buffered());
1341
1342 QuicByteCount data_to_write =
birenroyef686222022-09-12 11:34:34 -07001343 2 * kDataSize - 100 - GetQuicFlag(quic_buffered_data_threshold) + 1;
Bence Békybac04052022-04-07 15:44:29 -04001344 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1345 .WillOnce(InvokeWithoutArgs([this, data_to_write]() {
1346 return session_->ConsumeData(stream_->id(), data_to_write, 100u, NO_FIN,
vasilvv243b2622023-11-07 17:01:30 -08001347 NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -04001348 }));
1349 EXPECT_CALL(*stream_, OnCanWriteNewData()).Times(1);
1350 stream_->OnCanWrite();
birenroyef686222022-09-12 11:34:34 -07001351 EXPECT_EQ(
1352 static_cast<uint64_t>(GetQuicFlag(quic_buffered_data_threshold) - 1),
1353 stream_->BufferedDataBytes());
Bence Békybac04052022-04-07 15:44:29 -04001354 // Try to write slices2 again.
1355 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(0);
1356 consumed = stream_->WriteMemSlices(span2, true);
1357 EXPECT_EQ(2048u, consumed.bytes_consumed);
1358 EXPECT_TRUE(consumed.fin_consumed);
birenroyef686222022-09-12 11:34:34 -07001359 EXPECT_EQ(2 * kDataSize + GetQuicFlag(quic_buffered_data_threshold) - 1,
Bence Békybac04052022-04-07 15:44:29 -04001360 stream_->BufferedDataBytes());
1361 EXPECT_TRUE(stream_->fin_buffered());
1362
1363 // Flush all buffered data.
1364 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1365 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1366 stream_->OnCanWrite();
1367 EXPECT_CALL(*stream_, OnCanWriteNewData()).Times(0);
1368 EXPECT_FALSE(stream_->HasBufferedData());
1369 EXPECT_TRUE(stream_->write_side_closed());
1370}
1371
1372TEST_P(QuicStreamTest, WriteMemSlicesReachStreamLimit) {
1373 Initialize();
1374 QuicStreamPeer::SetStreamBytesWritten(kMaxStreamLength - 5u, stream_);
1375 std::vector<std::pair<char*, size_t>> buffers;
1376 quiche::QuicheMemSlice slice1 = MemSliceFromString("12345");
1377 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1378 .WillOnce(InvokeWithoutArgs([this]() {
1379 return session_->ConsumeData(stream_->id(), 5u, 0u, NO_FIN,
vasilvv243b2622023-11-07 17:01:30 -08001380 NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -04001381 }));
1382 // There is no buffered data before, all data should be consumed.
1383 QuicConsumedData consumed = stream_->WriteMemSlice(std::move(slice1), false);
1384 EXPECT_EQ(5u, consumed.bytes_consumed);
1385
1386 quiche::QuicheMemSlice slice2 = MemSliceFromString("6");
vasilvvac2e30d2022-06-02 14:26:59 -07001387 EXPECT_QUIC_BUG(
1388 {
1389 EXPECT_CALL(*connection_,
1390 CloseConnection(QUIC_STREAM_LENGTH_OVERFLOW, _, _));
1391 stream_->WriteMemSlice(std::move(slice2), false);
1392 },
1393 "Write too many data via stream");
Bence Békybac04052022-04-07 15:44:29 -04001394}
1395
1396TEST_P(QuicStreamTest, StreamDataGetAckedMultipleTimes) {
1397 Initialize();
1398 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1399 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1400 EXPECT_FALSE(stream_->IsWaitingForAcks());
1401 EXPECT_FALSE(session_->HasUnackedStreamData());
1402
1403 // Send [0, 27) and fin.
1404 stream_->WriteOrBufferData(kData1, false, nullptr);
1405 stream_->WriteOrBufferData(kData1, false, nullptr);
1406 stream_->WriteOrBufferData(kData1, true, nullptr);
1407 EXPECT_EQ(3u, QuicStreamPeer::SendBuffer(stream_).size());
1408 EXPECT_TRUE(stream_->IsWaitingForAcks());
1409 EXPECT_TRUE(session_->HasUnackedStreamData());
1410 // Ack [0, 9), [5, 22) and [18, 26)
1411 // Verify [0, 9) 9 bytes are acked.
1412 QuicByteCount newly_acked_length = 0;
1413 EXPECT_TRUE(stream_->OnStreamFrameAcked(0, 9, false, QuicTime::Delta::Zero(),
1414 QuicTime::Zero(),
1415 &newly_acked_length));
1416 EXPECT_EQ(9u, newly_acked_length);
1417 EXPECT_EQ(2u, QuicStreamPeer::SendBuffer(stream_).size());
1418 // Verify [9, 22) 13 bytes are acked.
1419 EXPECT_TRUE(stream_->OnStreamFrameAcked(5, 17, false, QuicTime::Delta::Zero(),
1420 QuicTime::Zero(),
1421 &newly_acked_length));
1422 EXPECT_EQ(13u, newly_acked_length);
1423 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
1424 // Verify [22, 26) 4 bytes are acked.
1425 EXPECT_TRUE(stream_->OnStreamFrameAcked(18, 8, false, QuicTime::Delta::Zero(),
1426 QuicTime::Zero(),
1427 &newly_acked_length));
1428 EXPECT_EQ(4u, newly_acked_length);
1429 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
1430 EXPECT_TRUE(stream_->IsWaitingForAcks());
1431 EXPECT_TRUE(session_->HasUnackedStreamData());
1432
1433 // Ack [0, 27). Verify [26, 27) 1 byte is acked.
1434 EXPECT_TRUE(stream_->OnStreamFrameAcked(26, 1, false, QuicTime::Delta::Zero(),
1435 QuicTime::Zero(),
1436 &newly_acked_length));
1437 EXPECT_EQ(1u, newly_acked_length);
1438 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
1439 EXPECT_TRUE(stream_->IsWaitingForAcks());
1440 EXPECT_TRUE(session_->HasUnackedStreamData());
1441
1442 // Ack Fin.
1443 EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()).Times(1);
1444 EXPECT_TRUE(stream_->OnStreamFrameAcked(27, 0, true, QuicTime::Delta::Zero(),
1445 QuicTime::Zero(),
1446 &newly_acked_length));
1447 EXPECT_EQ(0u, newly_acked_length);
1448 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
1449 EXPECT_FALSE(stream_->IsWaitingForAcks());
1450 EXPECT_FALSE(session_->HasUnackedStreamData());
1451
1452 // Ack [10, 27) and fin. No new data is acked.
1453 EXPECT_FALSE(
1454 stream_->OnStreamFrameAcked(10, 17, true, QuicTime::Delta::Zero(),
1455 QuicTime::Zero(), &newly_acked_length));
1456 EXPECT_EQ(0u, newly_acked_length);
1457 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
1458 EXPECT_FALSE(stream_->IsWaitingForAcks());
1459 EXPECT_FALSE(session_->HasUnackedStreamData());
1460}
1461
1462TEST_P(QuicStreamTest, OnStreamFrameLost) {
1463 Initialize();
1464
1465 // Send [0, 9).
1466 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1467 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1468 stream_->WriteOrBufferData(kData1, false, nullptr);
1469 EXPECT_FALSE(stream_->HasBufferedData());
1470 EXPECT_TRUE(stream_->IsStreamFrameOutstanding(0, 9, false));
1471
1472 // Try to send [9, 27), but connection is blocked.
1473 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1474 .WillOnce(Return(QuicConsumedData(0, false)));
1475 stream_->WriteOrBufferData(kData2, false, nullptr);
1476 stream_->WriteOrBufferData(kData2, false, nullptr);
1477 EXPECT_TRUE(stream_->HasBufferedData());
1478 EXPECT_FALSE(stream_->HasPendingRetransmission());
1479
1480 // Lost [0, 9). When stream gets a chance to write, only lost data is
1481 // transmitted.
1482 stream_->OnStreamFrameLost(0, 9, false);
1483 EXPECT_TRUE(stream_->HasPendingRetransmission());
1484 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1485 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1486 EXPECT_CALL(*stream_, OnCanWriteNewData()).Times(1);
1487 stream_->OnCanWrite();
1488 EXPECT_FALSE(stream_->HasPendingRetransmission());
1489 EXPECT_TRUE(stream_->HasBufferedData());
1490
1491 // This OnCanWrite causes [9, 27) to be sent.
1492 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1493 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1494 stream_->OnCanWrite();
1495 EXPECT_FALSE(stream_->HasBufferedData());
1496
1497 // Send a fin only frame.
1498 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1499 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1500 stream_->WriteOrBufferData("", true, nullptr);
1501
1502 // Lost [9, 27) and fin.
1503 stream_->OnStreamFrameLost(9, 18, false);
1504 stream_->OnStreamFrameLost(27, 0, true);
1505 EXPECT_TRUE(stream_->HasPendingRetransmission());
1506
1507 // Ack [9, 18).
1508 QuicByteCount newly_acked_length = 0;
1509 EXPECT_TRUE(stream_->OnStreamFrameAcked(9, 9, false, QuicTime::Delta::Zero(),
1510 QuicTime::Zero(),
1511 &newly_acked_length));
1512 EXPECT_EQ(9u, newly_acked_length);
1513 EXPECT_FALSE(stream_->IsStreamFrameOutstanding(9, 3, false));
1514 EXPECT_TRUE(stream_->HasPendingRetransmission());
1515 // This OnCanWrite causes [18, 27) and fin to be retransmitted. Verify fin can
1516 // be bundled with data.
1517 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1518 .WillOnce(InvokeWithoutArgs([this]() {
1519 return session_->ConsumeData(stream_->id(), 9u, 18u, FIN,
vasilvv243b2622023-11-07 17:01:30 -08001520 NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -04001521 }));
1522 stream_->OnCanWrite();
1523 EXPECT_FALSE(stream_->HasPendingRetransmission());
1524 // Lost [9, 18) again, but it is not considered as lost because kData2
1525 // has been acked.
1526 stream_->OnStreamFrameLost(9, 9, false);
1527 EXPECT_FALSE(stream_->HasPendingRetransmission());
1528 EXPECT_TRUE(stream_->IsStreamFrameOutstanding(27, 0, true));
1529}
1530
1531TEST_P(QuicStreamTest, CannotBundleLostFin) {
1532 Initialize();
1533
1534 // Send [0, 18) and fin.
1535 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1536 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1537 stream_->WriteOrBufferData(kData1, false, nullptr);
1538 stream_->WriteOrBufferData(kData2, true, nullptr);
1539
1540 // Lost [0, 9) and fin.
1541 stream_->OnStreamFrameLost(0, 9, false);
1542 stream_->OnStreamFrameLost(18, 0, true);
1543
1544 // Retransmit lost data. Verify [0, 9) and fin are retransmitted in two
1545 // frames.
1546 InSequence s;
1547 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1548 .WillOnce(InvokeWithoutArgs([this]() {
1549 return session_->ConsumeData(stream_->id(), 9u, 0u, NO_FIN,
vasilvv243b2622023-11-07 17:01:30 -08001550 NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -04001551 }));
1552 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1553 .WillOnce(Return(QuicConsumedData(0, true)));
1554 stream_->OnCanWrite();
1555}
1556
1557TEST_P(QuicStreamTest, MarkConnectionLevelWriteBlockedOnWindowUpdateFrame) {
1558 Initialize();
1559
1560 // Set the config to a small value so that a newly created stream has small
1561 // send flow control window.
1562 QuicConfigPeer::SetReceivedInitialStreamFlowControlWindow(session_->config(),
1563 100);
1564 QuicConfigPeer::SetReceivedInitialMaxStreamDataBytesIncomingBidirectional(
1565 session_->config(), 100);
1566 auto stream = new TestStream(GetNthClientInitiatedBidirectionalStreamId(
1567 GetParam().transport_version, 2),
1568 session_.get(), BIDIRECTIONAL);
1569 session_->ActivateStream(absl::WrapUnique(stream));
1570
1571 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1572 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
QUICHE team1a271082022-05-18 10:22:22 -07001573 EXPECT_CALL(*session_, SendBlocked(_, _)).Times(1);
Bence Békybac04052022-04-07 15:44:29 -04001574 std::string data(1024, '.');
1575 stream->WriteOrBufferData(data, false, nullptr);
1576 EXPECT_FALSE(HasWriteBlockedStreams());
1577
1578 QuicWindowUpdateFrame window_update(kInvalidControlFrameId, stream_->id(),
1579 1234);
1580
1581 stream->OnWindowUpdateFrame(window_update);
1582 // Verify stream is marked connection level write blocked.
1583 EXPECT_TRUE(HasWriteBlockedStreams());
1584 EXPECT_TRUE(stream->HasBufferedData());
1585}
1586
1587// Regression test for b/73282665.
1588TEST_P(QuicStreamTest,
1589 MarkConnectionLevelWriteBlockedOnWindowUpdateFrameWithNoBufferedData) {
1590 Initialize();
1591
1592 // Set the config to a small value so that a newly created stream has small
1593 // send flow control window.
1594 QuicConfigPeer::SetReceivedInitialStreamFlowControlWindow(session_->config(),
1595 100);
1596 QuicConfigPeer::SetReceivedInitialMaxStreamDataBytesIncomingBidirectional(
1597 session_->config(), 100);
1598 auto stream = new TestStream(GetNthClientInitiatedBidirectionalStreamId(
1599 GetParam().transport_version, 2),
1600 session_.get(), BIDIRECTIONAL);
1601 session_->ActivateStream(absl::WrapUnique(stream));
1602
1603 std::string data(100, '.');
1604 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1605 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
QUICHE team1a271082022-05-18 10:22:22 -07001606 EXPECT_CALL(*session_, SendBlocked(_, _)).Times(1);
Bence Békybac04052022-04-07 15:44:29 -04001607 stream->WriteOrBufferData(data, false, nullptr);
1608 EXPECT_FALSE(HasWriteBlockedStreams());
1609
1610 QuicWindowUpdateFrame window_update(kInvalidControlFrameId, stream_->id(),
1611 120);
1612 stream->OnWindowUpdateFrame(window_update);
1613 EXPECT_FALSE(stream->HasBufferedData());
1614 // Verify stream is marked as blocked although there is no buffered data.
1615 EXPECT_TRUE(HasWriteBlockedStreams());
1616}
1617
1618TEST_P(QuicStreamTest, RetransmitStreamData) {
1619 Initialize();
1620 InSequence s;
1621
1622 // Send [0, 18) with fin.
1623 EXPECT_CALL(*session_, WritevData(stream_->id(), _, _, _, _, _))
1624 .Times(2)
1625 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1626 stream_->WriteOrBufferData(kData1, false, nullptr);
1627 stream_->WriteOrBufferData(kData1, true, nullptr);
1628 // Ack [10, 13).
1629 QuicByteCount newly_acked_length = 0;
1630 stream_->OnStreamFrameAcked(10, 3, false, QuicTime::Delta::Zero(),
1631 QuicTime::Zero(), &newly_acked_length);
1632 EXPECT_EQ(3u, newly_acked_length);
1633 // Retransmit [0, 18) with fin, and only [0, 8) is consumed.
1634 EXPECT_CALL(*session_, WritevData(stream_->id(), 10, 0, NO_FIN, _, _))
1635 .WillOnce(InvokeWithoutArgs([this]() {
1636 return session_->ConsumeData(stream_->id(), 8, 0u, NO_FIN,
vasilvv243b2622023-11-07 17:01:30 -08001637 NOT_RETRANSMISSION, std::nullopt);
Bence Békybac04052022-04-07 15:44:29 -04001638 }));
1639 EXPECT_FALSE(stream_->RetransmitStreamData(0, 18, true, PTO_RETRANSMISSION));
1640
1641 // Retransmit [0, 18) with fin, and all is consumed.
1642 EXPECT_CALL(*session_, WritevData(stream_->id(), 10, 0, NO_FIN, _, _))
1643 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1644 EXPECT_CALL(*session_, WritevData(stream_->id(), 5, 13, FIN, _, _))
1645 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1646 EXPECT_TRUE(stream_->RetransmitStreamData(0, 18, true, PTO_RETRANSMISSION));
1647
1648 // Retransmit [0, 8) with fin, and all is consumed.
1649 EXPECT_CALL(*session_, WritevData(stream_->id(), 8, 0, NO_FIN, _, _))
1650 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1651 EXPECT_CALL(*session_, WritevData(stream_->id(), 0, 18, FIN, _, _))
1652 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1653 EXPECT_TRUE(stream_->RetransmitStreamData(0, 8, true, PTO_RETRANSMISSION));
1654}
1655
1656TEST_P(QuicStreamTest, ResetStreamOnTtlExpiresRetransmitLostData) {
1657 Initialize();
1658
1659 EXPECT_CALL(*session_, WritevData(stream_->id(), 200, 0, FIN, _, _))
1660 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1661 std::string body(200, 'a');
1662 stream_->WriteOrBufferData(body, true, nullptr);
1663
1664 // Set TTL to be 1 s.
1665 QuicTime::Delta ttl = QuicTime::Delta::FromSeconds(1);
1666 ASSERT_TRUE(stream_->MaybeSetTtl(ttl));
1667 // Verify data gets retransmitted because TTL does not expire.
1668 EXPECT_CALL(*session_, WritevData(stream_->id(), 100, 0, NO_FIN, _, _))
1669 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1670 EXPECT_TRUE(stream_->RetransmitStreamData(0, 100, false, PTO_RETRANSMISSION));
1671 stream_->OnStreamFrameLost(100, 100, true);
1672 EXPECT_TRUE(stream_->HasPendingRetransmission());
1673
1674 connection_->AdvanceTime(QuicTime::Delta::FromSeconds(1));
1675 // Verify stream gets reset because TTL expires.
1676 if (session_->version().UsesHttp3()) {
1677 EXPECT_CALL(*session_,
1678 MaybeSendStopSendingFrame(_, QuicResetStreamError::FromInternal(
1679 QUIC_STREAM_TTL_EXPIRED)))
1680 .Times(1);
1681 }
1682 EXPECT_CALL(
1683 *session_,
1684 MaybeSendRstStreamFrame(
1685 _, QuicResetStreamError::FromInternal(QUIC_STREAM_TTL_EXPIRED), _))
1686 .Times(1);
1687 stream_->OnCanWrite();
1688}
1689
1690TEST_P(QuicStreamTest, ResetStreamOnTtlExpiresEarlyRetransmitData) {
1691 Initialize();
1692
1693 EXPECT_CALL(*session_, WritevData(stream_->id(), 200, 0, FIN, _, _))
1694 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1695 std::string body(200, 'a');
1696 stream_->WriteOrBufferData(body, true, nullptr);
1697
1698 // Set TTL to be 1 s.
1699 QuicTime::Delta ttl = QuicTime::Delta::FromSeconds(1);
1700 ASSERT_TRUE(stream_->MaybeSetTtl(ttl));
1701
1702 connection_->AdvanceTime(QuicTime::Delta::FromSeconds(1));
1703 // Verify stream gets reset because TTL expires.
1704 if (session_->version().UsesHttp3()) {
1705 EXPECT_CALL(*session_,
1706 MaybeSendStopSendingFrame(_, QuicResetStreamError::FromInternal(
1707 QUIC_STREAM_TTL_EXPIRED)))
1708 .Times(1);
1709 }
1710 EXPECT_CALL(
1711 *session_,
1712 MaybeSendRstStreamFrame(
1713 _, QuicResetStreamError::FromInternal(QUIC_STREAM_TTL_EXPIRED), _))
1714 .Times(1);
1715 stream_->RetransmitStreamData(0, 100, false, PTO_RETRANSMISSION);
1716}
1717
1718// Test that OnStreamReset does one-way (read) closes if version 99, two way
1719// (read and write) if not version 99.
1720TEST_P(QuicStreamTest, OnStreamResetReadOrReadWrite) {
1721 Initialize();
1722 EXPECT_FALSE(stream_->write_side_closed());
1723 EXPECT_FALSE(QuicStreamPeer::read_side_closed(stream_));
1724
1725 QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
1726 QUIC_STREAM_CANCELLED, 1234);
1727 stream_->OnStreamReset(rst_frame);
1728 if (VersionHasIetfQuicFrames(connection_->transport_version())) {
1729 // Version 99/IETF QUIC should close just the read side.
1730 EXPECT_TRUE(QuicStreamPeer::read_side_closed(stream_));
1731 EXPECT_FALSE(stream_->write_side_closed());
1732 } else {
1733 // Google QUIC should close both sides of the stream.
1734 EXPECT_TRUE(stream_->write_side_closed());
1735 EXPECT_TRUE(QuicStreamPeer::read_side_closed(stream_));
1736 }
1737}
1738
1739TEST_P(QuicStreamTest, WindowUpdateForReadOnlyStream) {
1740 Initialize();
1741
1742 QuicStreamId stream_id = QuicUtils::GetFirstUnidirectionalStreamId(
1743 connection_->transport_version(), Perspective::IS_CLIENT);
1744 TestStream stream(stream_id, session_.get(), READ_UNIDIRECTIONAL);
1745 QuicWindowUpdateFrame window_update_frame(kInvalidControlFrameId, stream_id,
1746 0);
1747 EXPECT_CALL(
1748 *connection_,
1749 CloseConnection(
1750 QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
1751 "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.", _));
1752 stream.OnWindowUpdateFrame(window_update_frame);
1753}
1754
1755TEST_P(QuicStreamTest, RstStreamFrameChangesCloseOffset) {
1756 Initialize();
1757
1758 QuicStreamFrame stream_frame(stream_->id(), true, 0, "abc");
1759 EXPECT_CALL(*stream_, OnDataAvailable());
1760 stream_->OnStreamFrame(stream_frame);
1761 QuicRstStreamFrame rst(kInvalidControlFrameId, stream_->id(),
1762 QUIC_STREAM_CANCELLED, 0u);
1763
1764 EXPECT_CALL(*connection_, CloseConnection(QUIC_STREAM_MULTIPLE_OFFSET, _, _));
1765 stream_->OnStreamReset(rst);
1766}
1767
1768// Regression test for b/176073284.
1769TEST_P(QuicStreamTest, EmptyStreamFrameWithNoFin) {
1770 Initialize();
1771 QuicStreamFrame empty_stream_frame(stream_->id(), false, 0, "");
1772 if (stream_->version().HasIetfQuicFrames()) {
1773 EXPECT_CALL(*connection_,
1774 CloseConnection(QUIC_EMPTY_STREAM_FRAME_NO_FIN, _, _))
1775 .Times(0);
1776 } else {
1777 EXPECT_CALL(*connection_,
1778 CloseConnection(QUIC_EMPTY_STREAM_FRAME_NO_FIN, _, _));
1779 }
1780 EXPECT_CALL(*stream_, OnDataAvailable()).Times(0);
1781 stream_->OnStreamFrame(empty_stream_frame);
1782}
1783
1784TEST_P(QuicStreamTest, SendRstWithCustomIetfCode) {
1785 Initialize();
1786 QuicResetStreamError error(QUIC_STREAM_CANCELLED, 0x1234abcd);
1787 EXPECT_CALL(*session_, MaybeSendRstStreamFrame(kTestStreamId, error, _))
1788 .Times(1);
1789 stream_->ResetWithError(error);
1790 EXPECT_TRUE(rst_sent());
1791}
1792
martinduke890a21c2024-09-18 07:46:32 -07001793TEST_P(QuicStreamTest, ResetWhenOffsetReached) {
1794 Initialize();
1795 if (!VersionHasIetfQuicFrames(session_->transport_version())) {
1796 return;
1797 }
1798 QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 400, 100);
1799 stream_->OnResetStreamAtFrame(rst); // Nothing happens.
1800
1801 // Send data to reach reliable_offset.
1802 char data[100];
1803 EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
1804 stream_->ConsumeData(99);
1805 });
1806 stream_->OnStreamFrame(
1807 QuicStreamFrame(stream_->id(), false, 0, absl::string_view(data, 99)));
1808 EXPECT_FALSE(stream_->rst_received());
1809 EXPECT_FALSE(stream_->read_side_closed());
1810 EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
1811 stream_->ConsumeData(1);
1812 });
1813 stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 99,
1814 absl::string_view(data + 99, 1)));
1815 EXPECT_TRUE(stream_->rst_received());
1816 EXPECT_TRUE(stream_->read_side_closed());
1817}
1818
1819TEST_P(QuicStreamTest, ResetWhenOffsetReachedOutOfOrder) {
1820 Initialize();
1821 if (!VersionHasIetfQuicFrames(session_->transport_version())) {
1822 return;
1823 }
1824 QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 400, 100);
1825 stream_->OnResetStreamAtFrame(rst); // Nothing happens.
1826
1827 // Send data to reach reliable_offset.
1828 char data[100];
1829 stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 99,
1830 absl::string_view(data + 99, 1)));
1831 EXPECT_FALSE(stream_->rst_received());
1832 EXPECT_FALSE(stream_->read_side_closed());
1833 EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
1834 stream_->ConsumeData(100);
1835 });
1836 stream_->OnStreamFrame(
1837 QuicStreamFrame(stream_->id(), false, 0, absl::string_view(data, 99)));
1838 EXPECT_TRUE(stream_->rst_received());
1839 EXPECT_TRUE(stream_->read_side_closed());
1840}
1841
1842TEST_P(QuicStreamTest, HigherReliableSizeIgnored) {
1843 Initialize();
1844 if (!VersionHasIetfQuicFrames(session_->transport_version())) {
1845 return;
1846 }
1847 QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 400, 100);
1848 stream_->OnResetStreamAtFrame(rst); // Nothing happens.
1849 QuicResetStreamAtFrame rst2(0, stream_->id(), QUIC_STREAM_CANCELLED, 400,
1850 200);
1851 stream_->OnResetStreamAtFrame(rst2); // Ignored.
1852
1853 // Send data to reach reliable_offset.
1854 char data[100];
1855 EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
1856 stream_->ConsumeData(99);
1857 });
1858 stream_->OnStreamFrame(
1859 QuicStreamFrame(stream_->id(), false, 0, absl::string_view(data, 99)));
1860 EXPECT_FALSE(stream_->rst_received());
1861 EXPECT_FALSE(stream_->read_side_closed());
1862 EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
1863 stream_->ConsumeData(1);
1864 });
1865 stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 99,
1866 absl::string_view(data + 99, 1)));
1867 EXPECT_TRUE(stream_->rst_received());
1868 EXPECT_TRUE(stream_->read_side_closed());
1869}
1870
1871TEST_P(QuicStreamTest, InstantReset) {
1872 Initialize();
1873 if (!VersionHasIetfQuicFrames(session_->transport_version())) {
1874 return;
1875 }
1876 char data[100];
1877 EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
1878 stream_->ConsumeData(100);
1879 });
1880 stream_->OnStreamFrame(
1881 QuicStreamFrame(stream_->id(), false, 0, absl::string_view(data, 100)));
1882 QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 400, 100);
1883 EXPECT_FALSE(stream_->rst_received());
1884 EXPECT_FALSE(stream_->read_side_closed());
1885 stream_->OnResetStreamAtFrame(rst);
1886 EXPECT_TRUE(stream_->rst_received());
1887 EXPECT_TRUE(stream_->read_side_closed());
1888}
1889
1890TEST_P(QuicStreamTest, ResetIgnoredDueToFin) {
1891 Initialize();
1892 if (!VersionHasIetfQuicFrames(session_->transport_version())) {
1893 return;
1894 }
1895 char data[100];
1896 EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
1897 stream_->ConsumeData(98);
1898 });
1899 stream_->OnStreamFrame(
1900 QuicStreamFrame(stream_->id(), false, 0, absl::string_view(data, 98)));
1901 QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 100, 99);
1902 stream_->OnResetStreamAtFrame(rst); // Nothing happens.
1903 // There is no call to OnFinRead() because the stream is responsible for
1904 // doing that.
1905 EXPECT_FALSE(stream_->rst_received());
1906 EXPECT_FALSE(stream_->read_side_closed());
1907 EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
1908 stream_->ConsumeData(2);
1909 stream_->OnFinRead();
1910 });
1911 stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), true, 98,
1912 absl::string_view(data + 98, 2)));
1913 EXPECT_FALSE(stream_->rst_received());
1914 EXPECT_TRUE(stream_->read_side_closed());
1915}
1916
1917TEST_P(QuicStreamTest, ReliableOffsetBeyondFin) {
1918 Initialize();
1919 if (!VersionHasIetfQuicFrames(session_->transport_version())) {
1920 return;
1921 }
1922 char data[100];
1923 stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), true, 98,
1924 absl::string_view(data + 98, 2)));
1925 EXPECT_CALL(*connection_, CloseConnection(QUIC_STREAM_MULTIPLE_OFFSET, _, _))
1926 .Times(1);
1927 QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 101, 101);
1928 stream_->OnResetStreamAtFrame(rst); // Nothing happens.
1929}
1930
1931TEST_P(QuicStreamTest, FinBeforeReliableOffset) {
1932 Initialize();
1933 if (!VersionHasIetfQuicFrames(session_->transport_version())) {
1934 return;
1935 }
1936 QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 101, 101);
1937 stream_->OnResetStreamAtFrame(rst); // Nothing happens.
1938 char data[100];
1939 EXPECT_CALL(*connection_, CloseConnection(QUIC_STREAM_MULTIPLE_OFFSET, _, _))
1940 .Times(1);
1941 stream_->OnStreamFrame(
1942 QuicStreamFrame(stream_->id(), true, 0, absl::string_view(data, 100)));
1943}
1944
Bence Békybac04052022-04-07 15:44:29 -04001945} // namespace
1946} // namespace test
1947} // namespace quic