blob: 8e23ebf1e2743c94e6dc0ff283f6df0032224c4a [file] [log] [blame]
// Copyright (c) 2025 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/moqt/tools/moqt_relay.h"
#include <cstdint>
#include <string>
#include <utility>
#include "absl/strings/string_view.h"
#include "quiche/quic/core/io/quic_event_loop.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/moqt/moqt_relay_publisher.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/tools/moqt_client.h"
#include "quiche/quic/moqt/tools/moqt_server.h"
#include "quiche/quic/test_tools/crypto_test_utils.h"
#include "quiche/common/platform/api/quiche_test.h"
#include "quiche/common/quiche_weak_ptr.h"
namespace moqt {
namespace test {
constexpr quic::QuicTime::Delta kEventLoopDuration =
quic::QuicTime::Delta::FromMilliseconds(50);
class TestMoqtRelay : public MoqtRelay {
public:
TestMoqtRelay(std::string bind_address, uint16_t bind_port,
absl::string_view default_upstream, bool ignore_certificate,
bool promiscuous_mode, quic::QuicEventLoop* event_loop)
: MoqtRelay(quic::test::crypto_test_utils::ProofSourceForTesting(),
bind_address, bind_port, default_upstream, ignore_certificate,
promiscuous_mode, event_loop) {}
quic::QuicEventLoop* server_event_loop() {
return server()->quic_server().event_loop();
}
void RunOneEvent() {
server_event_loop()->RunEventLoopOnce(kEventLoopDuration);
}
MoqtSession* client_session() {
return (client() == nullptr) ? nullptr : client()->session();
}
MoqtRelayPublisher* publisher() { return MoqtRelay::publisher(); }
};
// Test fixture that builds a chain of three relays on 127.0.0.1:
//   upstream_   (port 9991) has no default upstream, hence no client;
//   relay_      (port 9992) uses upstream_ as its default upstream;
//   downstream_ (port 9993) uses relay_ as its default upstream.
// relay_ is handed upstream_'s server event loop and downstream_ is handed
// relay_'s. All three are created with ignore_certificate = true and
// promiscuous_mode = false.
class MoqtRelayTest : public quiche::test::QuicheTest {
 public:
  // Constructs the chain and pumps events until relay_ and then downstream_
  // report a fully connected default upstream session.
  MoqtRelayTest()
      : upstream_("127.0.0.1", 9991, "", true, false, nullptr),  // no client.
        relay_("127.0.0.1", 9992, "https://127.0.0.1:9991", true, false,
               upstream_.server_event_loop()),
        downstream_("127.0.0.1", 9993, "https://127.0.0.1:9992", true, false,
                    relay_.server_event_loop()) {
    RunUntilConnected(relay_, upstream_);
    RunUntilConnected(downstream_, relay_);
  }

  // Returns true when |client|'s publisher holds a valid default upstream
  // session and that session is the same object as |client|'s client session.
  bool ClientFullyConnected(TestMoqtRelay& client) {
    return client.publisher()->GetDefaultUpstreamSession().IsValid() &&
           client.publisher()->GetDefaultUpstreamSession().GetIfAvailable() ==
               client.client_session();
  }

  // Runs |server|'s event loop, at most kMaxIterations times, until |client|
  // is fully connected. Records a fatal failure if it still is not connected
  // afterwards.
  void RunUntilConnected(TestMoqtRelay& client, TestMoqtRelay& server) {
    constexpr int kMaxIterations = 20;
    for (int i = 0; i < kMaxIterations && !ClientFullyConnected(client); ++i) {
      server.RunOneEvent();
    }
    // Assert on the connection state itself rather than on the loop counter:
    // a client that connects during the last permitted iteration has
    // succeeded and must not be reported as a timeout.
    ASSERT_TRUE(ClientFullyConnected(client))
        << "client not connected after " << kMaxIterations
        << " event loop iterations";
  }

  // Declaration order matters: the constructor initializes relay_ from
  // upstream_ and downstream_ from relay_.
  TestMoqtRelay upstream_;
  TestMoqtRelay relay_;
  TestMoqtRelay downstream_;
};
// Checks the state left behind by the fixture constructor: downstream_ and
// relay_ each have a client session that is wired to their own publisher and
// registered as that publisher's default upstream session, while upstream_
// has neither a client session nor a default upstream session.
TEST_F(MoqtRelayTest, NodeChainEstablished) {
  // downstream_: client session <-> publisher <-> default upstream session.
  MoqtSession* const downstream_session = downstream_.client_session();
  ASSERT_NE(downstream_session, nullptr);
  MoqtRelayPublisher* const downstream_publisher = downstream_.publisher();
  EXPECT_EQ(downstream_session->publisher(), downstream_publisher);
  ASSERT_NE(downstream_publisher, nullptr);
  EXPECT_EQ(downstream_publisher->GetDefaultUpstreamSession().GetIfAvailable(),
            downstream_session->GetWeakPtr().GetIfAvailable());

  // relay_: same relationships as downstream_.
  MoqtSession* const relay_session = relay_.client_session();
  ASSERT_NE(relay_session, nullptr);
  MoqtRelayPublisher* const relay_publisher = relay_.publisher();
  EXPECT_EQ(relay_session->publisher(), relay_publisher);
  ASSERT_NE(relay_publisher, nullptr);
  EXPECT_EQ(relay_publisher->GetDefaultUpstreamSession().GetIfAvailable(),
            relay_session->GetWeakPtr().GetIfAvailable());

  // upstream_: a publisher exists, but there is no client session and no
  // default upstream session.
  EXPECT_EQ(upstream_.client_session(), nullptr);
  MoqtRelayPublisher* const upstream_publisher = upstream_.publisher();
  ASSERT_NE(upstream_publisher, nullptr);
  EXPECT_EQ(upstream_publisher->GetDefaultUpstreamSession().GetIfAvailable(),
            nullptr);
}
// Invokes the session-terminated callback of relay_'s client session with an
// empty string and expects the publisher's default upstream session to be
// invalid afterwards.
TEST_F(MoqtRelayTest, CloseSession) {
  MoqtSession* const session = relay_.client_session();
  ASSERT_NE(session, nullptr);
  // The callback is moved out of the session's callbacks before being called.
  std::move(session->callbacks().session_terminated_callback)("");
  EXPECT_FALSE(relay_.publisher()->GetDefaultUpstreamSession().IsValid());
}
#if 0 // TODO(martinduke): Re-enable these tests when GOAWAY support exists.
// Invokes the GOAWAY callback on relay_'s client session with the same
// upstream URI, then checks that a different session becomes the publisher's
// default upstream session and that terminating the original session
// afterwards leaves the new default in place.
// Currently compiled out by the enclosing #if 0.
TEST_F(MoqtRelayTest, GoAwayAtClient) {
ASSERT_NE(relay_.client_session(), nullptr);
// Remember the current default session so it can be compared with its
// replacement. The GOAWAY below provides the same URI again.
MoqtSessionInterface* original_default_session =
relay_.publisher()->GetDefaultUpstreamSession().GetIfAvailable();
EXPECT_NE(original_default_session, nullptr);
std::move(relay_.client_session()->callbacks().goaway_received_callback)(
"https://127.0.0.1:9991");
// Pump events until relay_'s default upstream session matches its client
// session again.
RunUntilConnected(relay_, upstream_);
EXPECT_TRUE(relay_.publisher()->GetDefaultUpstreamSession().IsValid());
EXPECT_EQ(relay_.publisher()->GetDefaultUpstreamSession().GetIfAvailable(),
relay_.client_session());
EXPECT_NE(relay_.publisher()->GetDefaultUpstreamSession().GetIfAvailable(),
original_default_session);
// Terminating the original session does nothing.
std::move(original_default_session->callbacks().session_terminated_callback)(
"test");
EXPECT_TRUE(relay_.publisher()->GetDefaultUpstreamSession().IsValid());
EXPECT_EQ(relay_.publisher()->GetDefaultUpstreamSession().GetIfAvailable(),
relay_.client_session());
}
// Invokes the GOAWAY callback twice in a row, first on the original default
// session and then on its replacement, and checks that each time a different
// session becomes the publisher's default upstream session. Finally
// terminates the second session and checks that the third stays the default.
// Currently compiled out by the enclosing #if 0.
TEST_F(MoqtRelayTest, TwoGoAwaysAtClient) {
ASSERT_NE(relay_.client_session(), nullptr);
// Remember the current default session so it can be compared with its
// replacement. The GOAWAY below provides the same URI again.
MoqtSessionInterface* original_default_session =
relay_.publisher()->GetDefaultUpstreamSession().GetIfAvailable();
EXPECT_NE(original_default_session, nullptr);
std::move(relay_.client_session()->callbacks().goaway_received_callback)(
"https://127.0.0.1:9991");
RunUntilConnected(relay_, upstream_);
EXPECT_TRUE(relay_.publisher()->GetDefaultUpstreamSession().IsValid());
EXPECT_EQ(relay_.publisher()->GetDefaultUpstreamSession().GetIfAvailable(),
relay_.client_session());
EXPECT_NE(relay_.publisher()->GetDefaultUpstreamSession().GetIfAvailable(),
original_default_session);
// The original session still exists, but the second session also receives
// a GOAWAY.
MoqtSessionInterface* second_default_session =
relay_.publisher()->GetDefaultUpstreamSession().GetIfAvailable();
EXPECT_NE(second_default_session, nullptr);
EXPECT_NE(second_default_session, original_default_session);
std::move(second_default_session->callbacks().goaway_received_callback)(
"https://127.0.0.1:9991");
RunUntilConnected(relay_, upstream_);
EXPECT_TRUE(relay_.publisher()->GetDefaultUpstreamSession().IsValid());
EXPECT_EQ(relay_.publisher()->GetDefaultUpstreamSession().GetIfAvailable(),
relay_.client_session());
EXPECT_NE(relay_.publisher()->GetDefaultUpstreamSession().GetIfAvailable(),
second_default_session);
// The original session has now been destroyed, along with its client. The
// test might reuse original_default_session's address for the third session,
// unfortunately, so comparing a pointer to original_default_session is
// dangerous.
// second_default_session still exists, so this call doesn't segfault.
std::move(second_default_session->callbacks().session_terminated_callback)(
"test");
// Third session is still connected.
EXPECT_TRUE(relay_.publisher()->GetDefaultUpstreamSession().IsValid());
EXPECT_EQ(relay_.publisher()->GetDefaultUpstreamSession().GetIfAvailable(),
relay_.client_session());
EXPECT_NE(relay_.publisher()->GetDefaultUpstreamSession().GetIfAvailable(),
second_default_session);
}
#endif
// TODO(martinduke): Write tests for server sessions once there is related state
// that we can access.
} // namespace test
} // namespace moqt