blob: e17e8d03e31f7976aca9eca1a68374e9ba51a611 [file]
// Copyright (c) 2024 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include "absl/strings/string_view.h"
#include "quiche/quic/core/io/quic_event_loop.h"
#include "quiche/quic/core/quic_server_id.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/test_tools/mock_moqt_session.h"
#include "quiche/quic/moqt/tools/chat_client.h"
#include "quiche/quic/moqt/tools/moq_chat.h"
#include "quiche/quic/moqt/tools/moqt_relay.h"
#include "quiche/quic/test_tools/crypto_test_utils.h"
#include "quiche/common/platform/api/quiche_test.h"
#include "quiche/common/quiche_callbacks.h"
namespace moqt {
namespace test {
using ::testing::_;
constexpr absl::string_view kChatHostname = "127.0.0.1";
class MockChatUserInterface : public moqt::moq_chat::ChatUserInterface {
public:
void Initialize(quiche::MultiUseCallback<void(absl::string_view)> callback,
quic::QuicEventLoop* event_loop) override {
callback_ = std::move(callback);
event_loop_ = event_loop;
}
void IoLoop() override {
event_loop_->RunEventLoopOnce(moqt::moq_chat::kChatEventLoopDuration);
}
MOCK_METHOD(void, WriteToOutput,
(absl::string_view user, absl::string_view message), (override));
void SendMessage(absl::string_view message) { callback_(message); }
private:
quiche::MultiUseCallback<void(absl::string_view)> callback_;
quic::QuicEventLoop* event_loop_;
std::string message_;
};
class TestMoqtRelay : public moqt::MoqtRelay {
public:
TestMoqtRelay(absl::string_view bind_address, uint16_t bind_port,
absl::string_view default_upstream, bool ignore_certificate)
: MoqtRelay(quic::test::crypto_test_utils::ProofSourceForTesting(),
std::string(bind_address), bind_port, default_upstream,
ignore_certificate) {}
using MoqtRelay::publisher;
using MoqtRelay::server;
};
class MoqChatEndToEndTest : public quiche::test::QuicheTest {
public:
MoqChatEndToEndTest() : relay_(kChatHostname, 0, "", true) {
auto if1ptr = std::make_unique<MockChatUserInterface>();
auto if2ptr = std::make_unique<MockChatUserInterface>();
interface1_ = if1ptr.get();
interface2_ = if2ptr.get();
uint16_t port = relay_.server()->port();
client1_ = std::make_unique<moqt::moq_chat::ChatClient>(
quic::QuicServerId(std::string(kChatHostname), port), true,
std::move(if1ptr), "test_chat", "client1", "device1",
relay_.server()->event_loop());
client2_ = std::make_unique<moqt::moq_chat::ChatClient>(
quic::QuicServerId(std::string(kChatHostname), port), true,
std::move(if2ptr), "test_chat", "client2", "device2",
relay_.server()->event_loop());
}
void SendAndWaitForOutput(MockChatUserInterface* sender,
MockChatUserInterface* receiver,
absl::string_view sender_name,
absl::string_view message) {
bool message_to_output = false;
EXPECT_CALL(*receiver, WriteToOutput(sender_name, message)).WillOnce([&] {
message_to_output = true;
});
sender->SendMessage(message);
while (!message_to_output) {
relay_.server()->WaitForEvents();
}
}
TestMoqtRelay relay_;
MockChatUserInterface *interface1_, *interface2_;
std::unique_ptr<moqt::moq_chat::ChatClient> client1_, client2_;
};
TEST_F(MoqChatEndToEndTest, EndToEndTest) {
EXPECT_TRUE(client1_->Connect(moqt::moq_chat::kWebtransPath));
EXPECT_TRUE(client2_->Connect(moqt::moq_chat::kWebtransPath));
EXPECT_TRUE(client1_->PublishNamespaceAndSubscribeNamespace());
EXPECT_TRUE(client2_->PublishNamespaceAndSubscribeNamespace());
while (client1_->is_syncing() || client2_->is_syncing()) {
relay_.server()->WaitForEvents();
}
SendAndWaitForOutput(interface1_, interface2_, "client1", "Hello");
SendAndWaitForOutput(interface2_, interface1_, "client2", "Hi");
SendAndWaitForOutput(interface1_, interface2_, "client1", "How are you?");
SendAndWaitForOutput(interface2_, interface1_, "client2", "Good, and you?");
SendAndWaitForOutput(interface1_, interface2_, "client1", "I'm fine");
SendAndWaitForOutput(interface2_, interface1_, "client2", "Goodbye");
interface1_->SendMessage("/exit");
EXPECT_CALL(*interface2_, WriteToOutput(_, _)).Times(0);
relay_.server()->WaitForEvents();
}
TEST_F(MoqChatEndToEndTest, EndToEndTestUngracefulClose) {
EXPECT_TRUE(client1_->Connect(moqt::moq_chat::kWebtransPath));
EXPECT_TRUE(client2_->Connect(moqt::moq_chat::kWebtransPath));
EXPECT_TRUE(client1_->PublishNamespaceAndSubscribeNamespace());
EXPECT_TRUE(client2_->PublishNamespaceAndSubscribeNamespace());
while (client1_->is_syncing() || client2_->is_syncing()) {
relay_.server()->WaitForEvents();
}
SendAndWaitForOutput(interface1_, interface2_, "client1", "Hello");
SendAndWaitForOutput(interface2_, interface1_, "client2", "Hi");
SendAndWaitForOutput(interface1_, interface2_, "client1", "How are you?");
SendAndWaitForOutput(interface2_, interface1_, "client2", "Good, and you?");
SendAndWaitForOutput(interface1_, interface2_, "client1", "I'm fine");
SendAndWaitForOutput(interface2_, interface1_, "client2", "Goodbye");
}
TEST_F(MoqChatEndToEndTest, LeaveAndRejoin) {
EXPECT_TRUE(client1_->Connect(moqt::moq_chat::kWebtransPath));
EXPECT_TRUE(client2_->Connect(moqt::moq_chat::kWebtransPath));
EXPECT_TRUE(client1_->PublishNamespaceAndSubscribeNamespace());
EXPECT_TRUE(client2_->PublishNamespaceAndSubscribeNamespace());
while (client1_->is_syncing() || client2_->is_syncing()) {
relay_.server()->WaitForEvents();
}
SendAndWaitForOutput(interface1_, interface2_, "client1", "Hello");
SendAndWaitForOutput(interface2_, interface1_, "client2", "Hi");
// Add a probe to see how many publishers are tracked on the relay.
MockMoqtSession namespace_probe;
EXPECT_CALL(namespace_probe, PublishNamespace).Times(3);
relay_.publisher()->AddNamespaceSubscriber(
TrackNamespace(moq_chat::kBasePath), &namespace_probe);
bool namespace_done = false;
EXPECT_CALL(namespace_probe, PublishNamespaceDone)
.WillOnce([&](const TrackNamespace&) {
namespace_done = true;
return true;
});
interface1_->SendMessage("/exit");
while (client1_->session_is_open()) {
relay_.server()->WaitForEvents();
}
client1_.reset();
while (!namespace_done) {
// Wait for the relay's session cleanup to send PUBLISH_NAMESPACE_DONE
// and PUBLISH_DONE.
relay_.server()->WaitForEvents();
}
// Create a new client with the same username and Reconnect.
auto if1bptr = std::make_unique<MockChatUserInterface>();
MockChatUserInterface* interface1b_ = if1bptr.get();
uint16_t port = relay_.server()->port();
client1_ = std::make_unique<moqt::moq_chat::ChatClient>(
quic::QuicServerId(std::string(kChatHostname), port), true,
std::move(if1bptr), "test_chat", "client1", "device1",
relay_.server()->event_loop());
EXPECT_TRUE(client1_->Connect(moqt::moq_chat::kWebtransPath));
EXPECT_TRUE(client1_->PublishNamespaceAndSubscribeNamespace());
while (client1_->is_syncing() || client2_->is_syncing()) {
relay_.server()->WaitForEvents();
}
SendAndWaitForOutput(interface1b_, interface2_, "client1", "Hello again");
SendAndWaitForOutput(interface2_, interface1b_, "client2", "Hi again");
// Cleanup the probe.
relay_.publisher()->RemoveNamespaceSubscriber(
TrackNamespace(moq_chat::kBasePath), &namespace_probe);
}
} // namespace test
} // namespace moqt