Make MoqtControlParser read directly from the stream, similar to what MoqtDataParser already does.
The immediate cause for this is a recurring (but very difficult to isolate bug) in which something like this happens:
1. Parser receives a control frame.
2. The application visitor does not like the contents of the frame, so it terminates the connection.
3. MoqtSession closes the underlying WebTransport session, which results in the control stream being reset.
4. Control stream being reset results in the receive buffer being deleted.
5. When the visitor returns, the parser attempts to parse the next chunk, not realizing that the buffer in question has been deleted.
I spent some time trying to come up with a workaround, but none of them seemed to work, so I gave up and just rewrote the thing to call the stream directly; that way, if the stream drops its buffers, the read will just fail.
(this CL does not have a repro for this bug, since triggering it consistently turns out to be really hard due to the way parser does its own buffering; a downstream CL has an ASAN crash that triggers it, and I've also randomly hit it in the integration tests before)
As a side effect, I also made the parser automatically fail on any FIN on the control stream (we used to handle those at message boundaries, which is not necessary).
PiperOrigin-RevId: 744787214
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index 71fd8d5..5282470 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -14,6 +14,7 @@
#include "absl/base/casts.h"
#include "absl/cleanup/cleanup.h"
+#include "absl/container/fixed_array.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
@@ -66,83 +67,119 @@
return false;
}
+std::optional<uint64_t> ReadVarInt62FromStream(quiche::ReadStream& stream,
+ bool& fin_read) {
+ fin_read = false;
+
+ quiche::ReadStream::PeekResult peek_result = stream.PeekNextReadableRegion();
+ if (peek_result.peeked_data.empty()) {
+ if (peek_result.fin_next) {
+ fin_read = stream.SkipBytes(0);
+ QUICHE_DCHECK(fin_read);
+ }
+ return std::nullopt;
+ }
+ char first_byte = peek_result.peeked_data[0];
+ size_t varint_size =
+ 1 << ((absl::bit_cast<uint8_t>(first_byte) & 0b11000000) >> 6);
+ if (stream.ReadableBytes() < varint_size) {
+ return std::nullopt;
+ }
+
+ char buffer[8];
+ absl::Span<char> bytes_to_read =
+ absl::MakeSpan(buffer).subspan(0, varint_size);
+ quiche::ReadStream::ReadResult read_result = stream.Read(bytes_to_read);
+ QUICHE_DCHECK_EQ(read_result.bytes_read, varint_size);
+ fin_read = read_result.fin;
+
+ quiche::QuicheDataReader reader(buffer, read_result.bytes_read);
+ uint64_t result;
+ bool success = reader.ReadVarInt62(&result);
+ QUICHE_DCHECK(success);
+ QUICHE_DCHECK(reader.IsDoneReading());
+ return result;
+}
+
} // namespace
-// The buffering philosophy is complicated, to minimize copying. Here is an
-// overview:
-// If the entire message body is present (except for OBJECT payload), it is
-// parsed and delivered. If not, the partial body is buffered. (requiring a
-// copy).
-// Any OBJECT payload is always delivered to the application without copying.
-// If something has been buffered, when more data arrives copy just enough of it
-// to finish parsing that thing, then resume normal processing.
-void MoqtControlParser::ProcessData(absl::string_view data, bool fin) {
+void MoqtControlParser::ReadAndDispatchMessages() {
if (no_more_data_) {
ParseError("Data after end of stream");
+ return;
}
if (processing_) {
return;
}
processing_ = true;
auto on_return = absl::MakeCleanup([&] { processing_ = false; });
- // Check for early fin
- if (fin) {
- no_more_data_ = true;
- if (!buffered_message_.empty() && data.empty()) {
- ParseError("End of stream before complete message");
+ while (!no_more_data_) {
+ bool fin_read = false;
+
+ // Read the message type.
+ if (!message_type_.has_value()) {
+ message_type_ = ReadVarInt62FromStream(stream_, fin_read);
+ if (fin_read) {
+ ParseError("FIN on control stream");
+ return;
+ }
+ if (!message_type_.has_value()) {
+ return;
+ }
+ }
+ QUICHE_DCHECK(message_type_.has_value());
+
+ // Read the message length.
+ if (!message_size_.has_value()) {
+ message_size_ = ReadVarInt62FromStream(stream_, fin_read);
+ if (fin_read) {
+ ParseError("FIN on control stream");
+ return;
+ }
+ if (!message_size_.has_value()) {
+ return;
+ }
+
+ if (*message_size_ > kMaxMessageHeaderSize) {
+ ParseError(MoqtError::kInternalError,
+ absl::StrCat("Cannot parse control messages more than ",
+ kMaxMessageHeaderSize, " bytes"));
+ return;
+ }
+ }
+ QUICHE_DCHECK(message_size_.has_value());
+
+ // Read the message if it's fully received.
+ //
+ // CAUTION: if the flow control windows are too low, and
+ // kMaxMessageHeaderSize is too high, this will cause a deadlock.
+ if (stream_.ReadableBytes() < *message_size_) {
return;
}
- }
- std::optional<quic::QuicDataReader> reader = std::nullopt;
- size_t original_buffer_size = buffered_message_.size();
- if (!buffered_message_.empty()) {
- absl::StrAppend(&buffered_message_, data);
- reader.emplace(buffered_message_);
- } else {
- // No message in progress.
- reader.emplace(data);
- }
- size_t total_processed = 0;
- while (!reader->IsDoneReading()) {
- size_t message_len = ProcessMessage(reader->PeekRemainingPayload());
- if (message_len == 0) {
- if (reader->BytesRemaining() > kMaxMessageHeaderSize) {
- ParseError(MoqtError::kInternalError,
- "Cannot parse non-OBJECT messages > 2KB");
- return;
- }
- if (fin) {
- ParseError("FIN after incomplete message");
- return;
- }
- if (buffered_message_.empty()) {
- // If the buffer is not empty, |data| has already been copied there.
- absl::StrAppend(&buffered_message_, reader->PeekRemainingPayload());
- }
- break;
+ absl::FixedArray<char> message(*message_size_);
+ quiche::ReadStream::ReadResult result =
+ stream_.Read(absl::MakeSpan(message));
+ if (result.bytes_read != *message_size_) {
+ ParseError("Stream returned incorrect ReadableBytes");
+ return;
}
- // A message was successfully processed.
- total_processed += message_len;
- reader->Seek(message_len);
- }
- if (original_buffer_size > 0) {
- buffered_message_.erase(0, total_processed);
+ if (result.fin) {
+ ParseError("FIN on control stream");
+ return;
+ }
+
+ ProcessMessage(absl::string_view(message.data(), message.size()),
+ static_cast<MoqtMessageType>(*message_type_));
+ message_type_.reset();
+ message_size_.reset();
}
}
-size_t MoqtControlParser::ProcessMessage(absl::string_view data) {
- uint64_t value, length;
+size_t MoqtControlParser::ProcessMessage(absl::string_view data,
+ MoqtMessageType message_type) {
quic::QuicDataReader reader(data);
- if (!reader.ReadVarInt62(&value) || !reader.ReadVarInt62(&length)) {
- return 0;
- }
- if (length > reader.BytesRemaining()) {
- return 0;
- }
- auto type = static_cast<MoqtMessageType>(value);
- size_t message_header_length = reader.PreviouslyReadPayload().length();
size_t bytes_read;
- switch (type) {
+ switch (message_type) {
case MoqtMessageType::kClientSetup:
bytes_read = ProcessClientSetup(reader);
break;
@@ -229,7 +266,7 @@
bytes_read = 0;
break;
}
- if ((bytes_read - message_header_length) != length) {
+ if (bytes_read != data.size() || bytes_read == 0) {
ParseError("Message length does not match payload length");
return 0;
}
@@ -1007,42 +1044,9 @@
}
}
-std::optional<uint64_t> MoqtDataParser::ReadVarInt62(bool& fin_read) {
- fin_read = false;
-
- quiche::ReadStream::PeekResult peek_result = stream_.PeekNextReadableRegion();
- if (peek_result.peeked_data.empty()) {
- if (peek_result.fin_next) {
- fin_read = stream_.SkipBytes(0);
- QUICHE_DCHECK(fin_read);
- }
- return std::nullopt;
- }
- char first_byte = peek_result.peeked_data[0];
- size_t varint_size =
- 1 << ((absl::bit_cast<uint8_t>(first_byte) & 0b11000000) >> 6);
- if (stream_.ReadableBytes() < varint_size) {
- return std::nullopt;
- }
-
- char buffer[8];
- absl::Span<char> bytes_to_read =
- absl::MakeSpan(buffer).subspan(0, varint_size);
- quiche::ReadStream::ReadResult read_result = stream_.Read(bytes_to_read);
- QUICHE_DCHECK_EQ(read_result.bytes_read, varint_size);
- fin_read = read_result.fin;
-
- quiche::QuicheDataReader reader(buffer, read_result.bytes_read);
- uint64_t result;
- bool success = reader.ReadVarInt62(&result);
- QUICHE_DCHECK(success);
- QUICHE_DCHECK(reader.IsDoneReading());
- return result;
-}
-
std::optional<uint64_t> MoqtDataParser::ReadVarInt62NoFin() {
bool fin_read = false;
- std::optional<uint64_t> result = ReadVarInt62(fin_read);
+ std::optional<uint64_t> result = ReadVarInt62FromStream(stream_, fin_read);
if (fin_read) {
ParseError("Unexpected FIN received in the middle of a header");
return std::nullopt;
@@ -1206,7 +1210,8 @@
case kStatus: {
bool fin_read = false;
- std::optional<uint64_t> value_read = ReadVarInt62(fin_read);
+ std::optional<uint64_t> value_read =
+ ReadVarInt62FromStream(stream_, fin_read);
if (value_read.has_value()) {
metadata_.object_status = IntegerToObjectStatus(*value_read);
if (metadata_.object_status == MoqtObjectStatus::kInvalidObjectStatus) {
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h
index d9e5d0f..37c759e 100644
--- a/quiche/quic/moqt/moqt_parser.h
+++ b/quiche/quic/moqt/moqt_parser.h
@@ -86,27 +86,21 @@
class QUICHE_EXPORT MoqtControlParser {
public:
- MoqtControlParser(bool uses_web_transport, MoqtControlParserVisitor& visitor)
- : visitor_(visitor), uses_web_transport_(uses_web_transport) {}
+ MoqtControlParser(bool uses_web_transport, quiche::ReadStream* stream,
+ MoqtControlParserVisitor& visitor)
+ : visitor_(visitor),
+ stream_(*stream),
+ uses_web_transport_(uses_web_transport) {}
~MoqtControlParser() = default;
- // Take a buffer from the transport in |data|. Parse each complete message and
- // call the appropriate visitor function. If |fin| is true, there
- // is no more data arriving on the stream, so the parser will deliver any
- // message encoded as to run to the end of the stream.
- // All bytes can be freed. Calls OnParsingError() when there is a parsing
- // error.
- // Any calls after sending |fin| = true will be ignored.
- // TODO(martinduke): Figure out what has to happen if the message arrives via
- // datagram rather than a stream.
- void ProcessData(absl::string_view data, bool fin);
+ void ReadAndDispatchMessages();
private:
// The central switch statement to dispatch a message to the correct
// Process* function. Returns 0 if it could not parse the full messsage
// (except for object payload). Otherwise, returns the number of bytes
// processed.
- size_t ProcessMessage(absl::string_view data);
+ size_t ProcessMessage(absl::string_view data, MoqtMessageType message_type);
// The Process* functions parse the serialized data into the appropriate
// structs, and call the relevant visitor function for further action. Returns
@@ -169,11 +163,13 @@
FullTrackName& full_track_name);
MoqtControlParserVisitor& visitor_;
+ quiche::ReadStream& stream_;
bool uses_web_transport_;
bool no_more_data_ = false; // Fatal error or fin. No more parsing.
bool parsing_error_ = false;
- std::string buffered_message_;
+ std::optional<uint64_t> message_type_;
+ std::optional<uint64_t> message_size_;
bool processing_ = false; // True if currently in ProcessData(), to prevent
// re-entrancy.
@@ -245,8 +241,6 @@
void ReadDataUntil(StopCondition stop_condition);
- // Reads a single varint from the underlying stream.
- std::optional<uint64_t> ReadVarInt62(bool& fin_read);
// Reads a single varint from the underlying stream. Triggers a parse error if
// a FIN has been encountered.
std::optional<uint64_t> ReadVarInt62NoFin();
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index bec4458..a2270db 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -242,7 +242,9 @@
MoqtParserTest()
: message_type_(GetParam().message_type),
webtrans_(GetParam().uses_web_transport),
- control_parser_(GetParam().uses_web_transport, visitor_),
+ control_stream_(/*stream_id=*/0),
+ control_parser_(GetParam().uses_web_transport, &control_stream_,
+ visitor_),
data_stream_(/*stream_id=*/0),
data_parser_(&data_stream_, &visitor_) {}
@@ -264,7 +266,8 @@
data_stream_.Receive(data, fin);
data_parser_.ReadAllData();
} else {
- control_parser_.ProcessData(data, fin);
+ control_stream_.Receive(data, /*fin=*/false);
+ control_parser_.ReadAndDispatchMessages();
}
}
@@ -272,6 +275,7 @@
MoqtParserTestVisitor visitor_;
GeneralizedMessageType message_type_;
bool webtrans_;
+ webtransport::test::InMemoryStream control_stream_;
MoqtControlParser control_parser_;
webtransport::test::InMemoryStream data_stream_;
MoqtDataParser data_parser_;
@@ -284,7 +288,7 @@
TEST_P(MoqtParserTest, OneMessage) {
std::unique_ptr<TestMessageBase> message = MakeMessage();
ProcessData(message->PacketSample(), true);
- EXPECT_EQ(visitor_.messages_received_, 1);
+ ASSERT_EQ(visitor_.messages_received_, 1);
EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
EXPECT_TRUE(visitor_.end_of_message_);
if (IsDataStream()) {
@@ -295,11 +299,11 @@
TEST_P(MoqtParserTest, OneMessageWithLongVarints) {
std::unique_ptr<TestMessageBase> message = MakeMessage();
message->ExpandVarints();
- ProcessData(message->PacketSample(), true);
+ ProcessData(message->PacketSample(), false);
EXPECT_EQ(visitor_.messages_received_, 1);
EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
EXPECT_TRUE(visitor_.end_of_message_);
- EXPECT_FALSE(visitor_.parsing_error_.has_value());
+ EXPECT_EQ(visitor_.parsing_error_, std::nullopt);
if (IsDataStream()) {
EXPECT_EQ(visitor_.object_payload(), "foo");
}
@@ -379,6 +383,9 @@
}
TEST_P(MoqtParserTest, EarlyFin) {
+ if (!IsDataStream()) {
+ return;
+ }
std::unique_ptr<TestMessageBase> message = MakeMessage();
size_t first_data_size = message->total_message_size() - 1;
ProcessData(message->PacketSample().substr(0, first_data_size), true);
@@ -389,6 +396,9 @@
}
TEST_P(MoqtParserTest, SeparateEarlyFin) {
+ if (!IsDataStream()) {
+ return;
+ }
std::unique_ptr<TestMessageBase> message = MakeMessage();
size_t first_data_size = message->total_message_size() - 1;
ProcessData(message->PacketSample().substr(0, first_data_size), false);
@@ -420,7 +430,7 @@
std::unique_ptr<TestMessageBase> message = MakeMessage();
message->DecreasePayloadLengthByOne();
ProcessData(message->PacketSample(), false);
- EXPECT_EQ(visitor_.messages_received_, 1);
+ EXPECT_EQ(visitor_.messages_received_, 0);
EXPECT_EQ(visitor_.parsing_error_,
"Message length does not match payload length");
}
@@ -550,7 +560,8 @@
}
TEST_F(MoqtMessageSpecificTest, ClientSetupMaxSubscribeIdAppearsTwice) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char setup[] = {
0x40, 0x40, 0x0f, 0x02, 0x01, 0x02, // versions
0x03, // 3 params
@@ -558,76 +569,81 @@
0x02, 0x01, 0x32, // max_subscribe_id = 50
0x02, 0x01, 0x32, // max_subscribe_id = 50
};
- parser.ProcessData(absl::string_view(setup, sizeof(setup)), false);
+ stream.Receive(absl::string_view(setup, sizeof(setup)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(visitor_.parsing_error_.has_value());
- EXPECT_EQ(*visitor_.parsing_error_,
+ EXPECT_EQ(visitor_.parsing_error_,
"MAX_SUBSCRIBE_ID parameter appears twice in SETUP");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
TEST_F(MoqtMessageSpecificTest, SetupPathFromServer) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char setup[] = {
0x40, 0x41, 0x07,
0x01, // version = 1
0x01, // 1 param
0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo"
};
- parser.ProcessData(absl::string_view(setup, sizeof(setup)), false);
+ stream.Receive(absl::string_view(setup, sizeof(setup)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(visitor_.parsing_error_.has_value());
- EXPECT_EQ(*visitor_.parsing_error_, "PATH parameter in SERVER_SETUP");
+ EXPECT_EQ(visitor_.parsing_error_, "PATH parameter in SERVER_SETUP");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
TEST_F(MoqtMessageSpecificTest, SetupPathAppearsTwice) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char setup[] = {
0x40, 0x40, 0x0e, 0x02, 0x01, 0x02, // versions = 1, 2
0x02, // 2 params
0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo"
0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo"
};
- parser.ProcessData(absl::string_view(setup, sizeof(setup)), false);
+ stream.Receive(absl::string_view(setup, sizeof(setup)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(visitor_.parsing_error_.has_value());
- EXPECT_EQ(*visitor_.parsing_error_,
+ EXPECT_EQ(visitor_.parsing_error_,
"PATH parameter appears twice in CLIENT_SETUP");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
TEST_F(MoqtMessageSpecificTest, SetupPathOverWebtrans) {
- MoqtControlParser parser(kWebTrans, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kWebTrans, &stream, visitor_);
char setup[] = {
0x40, 0x40, 0x09, 0x02, 0x01, 0x02, // versions = 1, 2
0x01, // 1 param
0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo"
};
- parser.ProcessData(absl::string_view(setup, sizeof(setup)), false);
+ stream.Receive(absl::string_view(setup, sizeof(setup)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(visitor_.parsing_error_.has_value());
- EXPECT_EQ(*visitor_.parsing_error_,
+ EXPECT_EQ(visitor_.parsing_error_,
"WebTransport connection is using PATH parameter in SETUP");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
TEST_F(MoqtMessageSpecificTest, SetupPathMissing) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char setup[] = {
0x40, 0x40, 0x04, 0x02, 0x01, 0x02, // versions = 1, 2
0x00, // no param
};
- parser.ProcessData(absl::string_view(setup, sizeof(setup)), false);
+ stream.Receive(absl::string_view(setup, sizeof(setup)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(visitor_.parsing_error_.has_value());
- EXPECT_EQ(*visitor_.parsing_error_,
+ EXPECT_EQ(visitor_.parsing_error_,
"PATH SETUP parameter missing from Client message over QUIC");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
TEST_F(MoqtMessageSpecificTest, ServerSetupMaxSubscribeIdAppearsTwice) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char setup[] = {
0x40, 0x40, 0x0f, 0x02, 0x01, 0x02, // versions = 1, 2
0x03, // 4 params
@@ -635,16 +651,17 @@
0x02, 0x01, 0x32, // max_subscribe_id = 50
0x02, 0x01, 0x32, // max_subscribe_id = 50
};
- parser.ProcessData(absl::string_view(setup, sizeof(setup)), false);
+ stream.Receive(absl::string_view(setup, sizeof(setup)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(visitor_.parsing_error_.has_value());
- EXPECT_EQ(*visitor_.parsing_error_,
+ EXPECT_EQ(visitor_.parsing_error_,
"MAX_SUBSCRIBE_ID parameter appears twice in SETUP");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
TEST_F(MoqtMessageSpecificTest, SubscribeAuthorizationInfoTwice) {
- MoqtControlParser parser(kWebTrans, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kWebTrans, &stream, visitor_);
char subscribe[] = {
0x03, 0x1a, 0x01, 0x02, 0x01,
0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
@@ -655,7 +672,8 @@
0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
- parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
+ stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
EXPECT_EQ(visitor_.parsing_error_,
"AUTHORIZATION_INFO parameter appears twice");
@@ -663,7 +681,8 @@
}
TEST_F(MoqtMessageSpecificTest, SubscribeDeliveryTimeoutTwice) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
0x03, 0x18, 0x01, 0x02, 0x01,
0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
@@ -674,7 +693,8 @@
0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000
0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000
};
- parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
+ stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
EXPECT_EQ(visitor_.parsing_error_,
"DELIVERY_TIMEOUT parameter appears twice");
@@ -682,7 +702,8 @@
}
TEST_F(MoqtMessageSpecificTest, SubscribeDeliveryTimeoutMalformed) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
0x03, 0x14, 0x01, 0x02, 0x01,
0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
@@ -692,7 +713,8 @@
0x01, // one param
0x03, 0x01, 0x67, 0x10, // delivery_timeout = 10000
};
- parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
+ stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
EXPECT_EQ(visitor_.parsing_error_,
"Parameter length does not match varint encoding");
@@ -700,7 +722,8 @@
}
TEST_F(MoqtMessageSpecificTest, SubscribeMaxCacheDurationTwice) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
0x03, 0x18, 0x01, 0x02, 0x01,
0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
@@ -711,7 +734,8 @@
0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000
0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000
};
- parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
+ stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
EXPECT_EQ(visitor_.parsing_error_,
"MAX_CACHE_DURATION parameter appears twice");
@@ -719,7 +743,8 @@
}
TEST_F(MoqtMessageSpecificTest, SubscribeMaxCacheDurationMalformed) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
0x03, 0x14, 0x01, 0x02, 0x01,
0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
@@ -729,7 +754,8 @@
0x01, // one param
0x04, 0x01, 0x67, 0x10, // max_cache_duration = 10000
};
- parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
+ stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
EXPECT_EQ(visitor_.parsing_error_,
"Parameter length does not match varint encoding");
@@ -737,7 +763,8 @@
}
TEST_F(MoqtMessageSpecificTest, SubscribeOkHasAuthorizationInfo) {
- MoqtControlParser parser(kWebTrans, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kWebTrans, &stream, visitor_);
char subscribe_ok[] = {
0x04, 0x10, 0x01, 0x03, // subscribe_id = 1, expires = 3
0x02, 0x01, // group_order = 2, content exists
@@ -746,56 +773,60 @@
0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000
0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
- parser.ProcessData(absl::string_view(subscribe_ok, sizeof(subscribe_ok)),
- false);
+ stream.Receive(absl::string_view(subscribe_ok, sizeof(subscribe_ok)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
EXPECT_EQ(visitor_.parsing_error_, "SUBSCRIBE_OK has authorization info");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
TEST_F(MoqtMessageSpecificTest, SubscribeUpdateHasAuthorizationInfo) {
- MoqtControlParser parser(kWebTrans, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kWebTrans, &stream, visitor_);
char subscribe_update[] = {
0x02, 0x0b, 0x02, 0x03, 0x01, 0x05, // start and end sequences
0xaa, // priority = 0xaa
0x01, // 1 parameter
0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
- parser.ProcessData(
- absl::string_view(subscribe_update, sizeof(subscribe_update)), false);
+ stream.Receive(absl::string_view(subscribe_update, sizeof(subscribe_update)),
+ false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
EXPECT_EQ(visitor_.parsing_error_, "SUBSCRIBE_UPDATE has authorization info");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
TEST_F(MoqtMessageSpecificTest, AnnounceAuthorizationInfoTwice) {
- MoqtControlParser parser(kWebTrans, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kWebTrans, &stream, visitor_);
char announce[] = {
0x06, 0x10, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x02, // 2 params
0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
- parser.ProcessData(absl::string_view(announce, sizeof(announce)), false);
+ stream.Receive(absl::string_view(announce, sizeof(announce)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(visitor_.parsing_error_.has_value());
- EXPECT_EQ(*visitor_.parsing_error_,
+ EXPECT_EQ(visitor_.parsing_error_,
"AUTHORIZATION_INFO parameter appears twice");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
TEST_F(MoqtMessageSpecificTest, AnnounceHasDeliveryTimeout) {
- MoqtControlParser parser(kWebTrans, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kWebTrans, &stream, visitor_);
char announce[] = {
0x06, 0x0f, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x02, // 2 params
0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000
};
- parser.ProcessData(absl::string_view(announce, sizeof(announce)), false);
+ stream.Receive(absl::string_view(announce, sizeof(announce)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(visitor_.parsing_error_.has_value());
- EXPECT_EQ(*visitor_.parsing_error_, "ANNOUNCE has delivery timeout");
+ EXPECT_EQ(visitor_.parsing_error_, "ANNOUNCE has delivery timeout");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
@@ -829,11 +860,12 @@
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
-TEST_F(MoqtMessageSpecificTest, DataAfterFin) {
- MoqtControlParser parser(kRawQuic, visitor_);
- parser.ProcessData(absl::string_view(), true); // Find FIN
- parser.ProcessData("foo", false);
- EXPECT_EQ(visitor_.parsing_error_, "Data after end of stream");
+TEST_F(MoqtMessageSpecificTest, ControlStreamFin) {
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
+ stream.Receive(absl::string_view(), true); // Find FIN
+ parser.ReadAndDispatchMessages();
+ EXPECT_EQ(visitor_.parsing_error_, "FIN on control stream");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
@@ -855,7 +887,8 @@
}
TEST_F(MoqtMessageSpecificTest, Setup2KB) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char big_message[2 * kMaxMessageHeaderSize];
quic::QuicDataWriter writer(sizeof(big_message), big_message);
writer.WriteVarInt62(static_cast<uint64_t>(MoqtMessageType::kServerSetup));
@@ -866,29 +899,31 @@
writer.WriteVarInt62(kMaxMessageHeaderSize); // very long parameter
writer.WriteRepeatedByte(0x04, kMaxMessageHeaderSize);
// Send incomplete message
- parser.ProcessData(absl::string_view(big_message, writer.length() - 1),
- false);
+ stream.Receive(absl::string_view(big_message, writer.length() - 1), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(visitor_.parsing_error_.has_value());
- EXPECT_EQ(*visitor_.parsing_error_, "Cannot parse non-OBJECT messages > 2KB");
+ EXPECT_EQ(visitor_.parsing_error_,
+ "Cannot parse control messages more than 2048 bytes");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kInternalError);
}
TEST_F(MoqtMessageSpecificTest, UnknownMessageType) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char message[6];
quic::QuicDataWriter writer(sizeof(message), message);
writer.WriteVarInt62(0xbeef); // unknown message type
writer.WriteVarInt62(0x1); // length
writer.WriteVarInt62(0x1); // payload
- parser.ProcessData(absl::string_view(message, writer.length()), false);
+ stream.Receive(absl::string_view(message, writer.length()), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(visitor_.parsing_error_.has_value());
- EXPECT_EQ(*visitor_.parsing_error_, "Unknown message type");
+ EXPECT_EQ(visitor_.parsing_error_, "Unknown message type");
}
TEST_F(MoqtMessageSpecificTest, LatestObject) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
0x03, 0x15, 0x01, 0x02, // id and alias
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
@@ -898,8 +933,9 @@
0x01, // 1 parameter
0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
- parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
- EXPECT_EQ(visitor_.messages_received_, 1);
+ stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
+ parser.ReadAndDispatchMessages();
+ ASSERT_EQ(visitor_.messages_received_, 1);
EXPECT_FALSE(visitor_.parsing_error_.has_value());
MoqtSubscribe message =
std::get<MoqtSubscribe>(visitor_.last_message_.value());
@@ -908,7 +944,8 @@
}
TEST_F(MoqtMessageSpecificTest, InvalidDeliveryOrder) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
0x03, 0x15, 0x01, 0x02, // id and alias
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
@@ -918,13 +955,15 @@
0x01, // 1 parameter
0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
- parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
+ stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
EXPECT_THAT(visitor_.parsing_error_, Optional(HasSubstr("group order")));
}
TEST_F(MoqtMessageSpecificTest, AbsoluteStart) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
0x03, 0x17, 0x01, 0x02, // id and alias
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
@@ -936,8 +975,9 @@
0x01, // 1 parameter
0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
- parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
- EXPECT_EQ(visitor_.messages_received_, 1);
+ stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
+ parser.ReadAndDispatchMessages();
+ ASSERT_EQ(visitor_.messages_received_, 1);
EXPECT_FALSE(visitor_.parsing_error_.has_value());
MoqtSubscribe message =
std::get<MoqtSubscribe>(visitor_.last_message_.value());
@@ -947,7 +987,8 @@
}
TEST_F(MoqtMessageSpecificTest, AbsoluteRange) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
0x03, 0x18, 0x01, 0x02, // id and alias
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
@@ -960,8 +1001,9 @@
0x01, // 1 parameter
0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
- parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
- EXPECT_EQ(visitor_.messages_received_, 1);
+ stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
+ parser.ReadAndDispatchMessages();
+ ASSERT_EQ(visitor_.messages_received_, 1);
EXPECT_FALSE(visitor_.parsing_error_.has_value());
MoqtSubscribe message =
std::get<MoqtSubscribe>(visitor_.last_message_.value());
@@ -971,7 +1013,8 @@
}
TEST_F(MoqtMessageSpecificTest, AbsoluteRangeEndGroupTooLow) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
0x03, 0x18, 0x01, 0x02, // id and alias
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
@@ -984,14 +1027,15 @@
0x01, // 1 parameter
0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
- parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
+ stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(visitor_.parsing_error_.has_value());
- EXPECT_EQ(*visitor_.parsing_error_, "End group is less than start group");
+ EXPECT_EQ(visitor_.parsing_error_, "End group is less than start group");
}
TEST_F(MoqtMessageSpecificTest, AbsoluteRangeExactlyOneObject) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
0x03, 0x13, 0x01, 0x02, // id and alias
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
@@ -1003,45 +1047,51 @@
0x04, // end_group = 4
0x00, // no parameters
};
- parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
+ stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 1);
}
TEST_F(MoqtMessageSpecificTest, SubscribeUpdateExactlyOneObject) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe_update[] = {
0x02, 0x06, 0x02, 0x03, 0x01, 0x04, // start and end sequences
0x20, // priority
0x00, // No parameters
};
- parser.ProcessData(
- absl::string_view(subscribe_update, sizeof(subscribe_update)), false);
+ stream.Receive(absl::string_view(subscribe_update, sizeof(subscribe_update)),
+ false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 1);
}
TEST_F(MoqtMessageSpecificTest, SubscribeUpdateEndGroupTooLow) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe_update[] = {
0x02, 0x0b, 0x02, 0x03, 0x01, 0x03, // start and end sequences
0x20, // priority
0x01, // 1 parameter
0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
- parser.ProcessData(
- absl::string_view(subscribe_update, sizeof(subscribe_update)), false);
+ stream.Receive(absl::string_view(subscribe_update, sizeof(subscribe_update)),
+ false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(visitor_.parsing_error_.has_value());
- EXPECT_EQ(*visitor_.parsing_error_, "End group is less than start group");
+ EXPECT_EQ(visitor_.parsing_error_, "End group is less than start group");
}
TEST_F(MoqtMessageSpecificTest, ObjectAckNegativeDelta) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char object_ack[] = {
0x71, 0x84, 0x05, // type
0x01, 0x10, 0x20, // subscribe ID, group, object
0x40, 0x81, // -0x40 time delta
};
- parser.ProcessData(absl::string_view(object_ack, sizeof(object_ack)), false);
+ stream.Receive(absl::string_view(object_ack, sizeof(object_ack)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.parsing_error_, std::nullopt);
ASSERT_EQ(visitor_.messages_received_, 1);
MoqtObjectAck message =
@@ -1055,7 +1105,8 @@
TEST_F(MoqtMessageSpecificTest, AllMessagesTogether) {
char buffer[5000];
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
size_t write = 0;
size_t read = 0;
int fully_received = 0;
@@ -1068,9 +1119,9 @@
memcpy(buffer + write, message->PacketSample().data(),
message->total_message_size());
size_t new_read = write + message->total_message_size() / 2;
- parser.ProcessData(absl::string_view(buffer + read, new_read - read),
- false);
- EXPECT_EQ(visitor_.messages_received_, fully_received);
+ stream.Receive(absl::string_view(buffer + read, new_read - read), false);
+ parser.ReadAndDispatchMessages();
+ ASSERT_EQ(visitor_.messages_received_, fully_received);
if (prev_message != nullptr) {
EXPECT_TRUE(prev_message->EqualFieldValues(*visitor_.last_message_));
}
@@ -1080,7 +1131,8 @@
prev_message = std::move(message);
}
// Deliver the rest
- parser.ProcessData(absl::string_view(buffer + read, write - read), true);
+ stream.Receive(absl::string_view(buffer + read, write - read), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, fully_received);
EXPECT_TRUE(prev_message->EqualFieldValues(*visitor_.last_message_));
EXPECT_FALSE(visitor_.parsing_error_.has_value());
@@ -1136,57 +1188,62 @@
}
TEST_F(MoqtMessageSpecificTest, SubscribeOkInvalidContentExists) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
SubscribeOkMessage subscribe_ok;
subscribe_ok.SetInvalidContentExists();
- parser.ProcessData(subscribe_ok.PacketSample(), false);
+ stream.Receive(subscribe_ok.PacketSample(), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(visitor_.parsing_error_.has_value());
- EXPECT_EQ(*visitor_.parsing_error_,
+ EXPECT_EQ(visitor_.parsing_error_,
"SUBSCRIBE_OK ContentExists has invalid value");
}
TEST_F(MoqtMessageSpecificTest, SubscribeOkInvalidDeliveryOrder) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
SubscribeOkMessage subscribe_ok;
subscribe_ok.SetInvalidDeliveryOrder();
- parser.ProcessData(subscribe_ok.PacketSample(), false);
+ stream.Receive(subscribe_ok.PacketSample(), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(visitor_.parsing_error_.has_value());
- EXPECT_EQ(*visitor_.parsing_error_,
+ EXPECT_EQ(visitor_.parsing_error_,
"Invalid group order value in SUBSCRIBE_OK");
}
TEST_F(MoqtMessageSpecificTest, FetchInvalidRange) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
FetchMessage fetch;
fetch.SetEndObject(1, 1);
- parser.ProcessData(fetch.PacketSample(), false);
+ stream.Receive(fetch.PacketSample(), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(visitor_.parsing_error_.has_value());
- EXPECT_EQ(*visitor_.parsing_error_,
+ EXPECT_EQ(visitor_.parsing_error_,
"End object comes before start object in FETCH");
}
TEST_F(MoqtMessageSpecificTest, FetchInvalidRange2) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
FetchMessage fetch;
fetch.SetEndObject(0, std::nullopt);
- parser.ProcessData(fetch.PacketSample(), false);
+ stream.Receive(fetch.PacketSample(), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(visitor_.parsing_error_.has_value());
- EXPECT_EQ(*visitor_.parsing_error_,
+ EXPECT_EQ(visitor_.parsing_error_,
"End object comes before start object in FETCH");
}
TEST_F(MoqtMessageSpecificTest, FetchInvalidGroupOrder) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
FetchMessage fetch;
fetch.SetGroupOrder(3);
- parser.ProcessData(fetch.PacketSample(), false);
+ stream.Receive(fetch.PacketSample(), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(visitor_.parsing_error_.has_value());
- EXPECT_EQ(*visitor_.parsing_error_,
+ EXPECT_EQ(visitor_.parsing_error_,
"Invalid group order value in FETCH message");
}
@@ -1208,25 +1265,28 @@
// All messages with TrackNamespace use ReadTrackNamespace too check this. Use
// ANNOUNCE_OK for the test because it's small.
TEST_F(MoqtMessageSpecificTest, NamespaceTooSmall) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char announce_ok[] = {
0x07, 0x03, // type, length
0x01, 0x01, 'a', // 1 namespace element
};
- parser.ProcessData(absl::string_view(announce_ok, sizeof(announce_ok)),
- false);
+ stream.Receive(absl::string_view(announce_ok, sizeof(announce_ok)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 1);
EXPECT_EQ(visitor_.parsing_error_, std::nullopt);
announce_ok[1] -= 2; // Remove one element.
announce_ok[2] = 0x00;
- parser.ProcessData(absl::string_view(announce_ok, sizeof(announce_ok) - 2),
- false);
+ stream.Receive(absl::string_view(announce_ok, sizeof(announce_ok) - 2),
+ false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 1);
EXPECT_EQ(visitor_.parsing_error_, "Invalid number of namespace elements");
}
TEST_F(MoqtMessageSpecificTest, NamespaceTooLarge) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
char announce_ok[70] = {
0x07, 0x40, 0x41, // type, length = 65
0x20, // 32 namespace elements. This is the maximum.
@@ -1235,22 +1295,25 @@
announce_ok[i] = 0x01;
announce_ok[i + 1] = 'a' + i;
}
- parser.ProcessData(absl::string_view(announce_ok, sizeof(announce_ok) - 2),
- false);
+ stream.Receive(absl::string_view(announce_ok, sizeof(announce_ok) - 2),
+ false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 1);
EXPECT_EQ(visitor_.parsing_error_, std::nullopt);
announce_ok[2] += 2; // Add one element.
++announce_ok[3];
- parser.ProcessData(absl::string_view(announce_ok, sizeof(announce_ok)),
- false);
+ stream.Receive(absl::string_view(announce_ok, sizeof(announce_ok)), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 1);
EXPECT_EQ(visitor_.parsing_error_, "Invalid number of namespace elements");
}
TEST_F(MoqtMessageSpecificTest, JoiningFetch) {
- MoqtControlParser parser(kRawQuic, visitor_);
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
JoiningFetchMessage message;
- parser.ProcessData(message.PacketSample(), false);
+ stream.Receive(message.PacketSample(), false);
+ parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 1);
EXPECT_EQ(visitor_.parsing_error_, std::nullopt);
EXPECT_TRUE(visitor_.last_message_.has_value() &&
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 48dd8f0..c1ee3d0 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -811,30 +811,18 @@
return true;
}
-template <class Parser>
-static void ForwardStreamDataToParser(webtransport::Stream& stream,
- Parser& parser) {
- bool fin =
- quiche::ProcessAllReadableRegions(stream, [&](absl::string_view chunk) {
- parser.ProcessData(chunk, /*end_of_stream=*/false);
- });
- if (fin) {
- parser.ProcessData("", /*end_of_stream=*/true);
- }
-}
-
MoqtSession::ControlStream::ControlStream(MoqtSession* session,
webtransport::Stream* stream)
: session_(session),
stream_(stream),
- parser_(session->parameters_.using_webtrans, *this) {
+ parser_(session->parameters_.using_webtrans, stream, *this) {
stream_->SetPriority(
webtransport::StreamPriority{/*send_group_id=*/kMoqtSendGroupId,
/*send_order=*/kMoqtControlStreamSendOrder});
}
void MoqtSession::ControlStream::OnCanRead() {
- ForwardStreamDataToParser(*stream_, parser_);
+ parser_.ReadAndDispatchMessages();
}
void MoqtSession::ControlStream::OnCanWrite() {
// We buffer serialized control frames unconditionally, thus OnCanWrite()
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc
index 709907e..9f7aadf 100644
--- a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc
+++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc
@@ -19,6 +19,7 @@
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_stream.h"
#include "quiche/common/simple_buffer_allocator.h"
+#include "quiche/web_transport/test_tools/in_memory_stream.h"
namespace moqt::test {
@@ -303,8 +304,10 @@
std::vector<MoqtGenericFrame> ParseGenericMessage(absl::string_view body) {
std::vector<MoqtGenericFrame> result;
GenericMessageParseVisitor visitor(&result);
- MoqtControlParser parser(/*uses_web_transport=*/true, visitor);
- parser.ProcessData(body, /*fin=*/true);
+ webtransport::test::InMemoryStream stream(/*id=*/0);
+ MoqtControlParser parser(/*uses_web_transport=*/true, &stream, visitor);
+ stream.Receive(body, /*fin=*/false);
+ parser.ReadAndDispatchMessages();
return result;
}