// Copyright 2023 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef QUICHE_QUIC_MOQT_MOQT_SESSION_H_
#define QUICHE_QUIC_MOQT_MOQT_SESSION_H_

#include <cstdint>
#include <optional>
#include <string>
#include <utility>

#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/moqt/moqt_framer.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_parser.h"
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/common/platform/api/quiche_export.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_callbacks.h"
#include "quiche/common/simple_buffer_allocator.h"
#include "quiche/web_transport/web_transport.h"

namespace moqt {

namespace test {
class MoqtSessionPeer;
}

using MoqtSessionEstablishedCallback = quiche::SingleUseCallback<void()>;
using MoqtSessionTerminatedCallback =
    quiche::SingleUseCallback<void(absl::string_view error_message)>;
using MoqtSessionDeletedCallback = quiche::SingleUseCallback<void()>;
// If |error_message| is nullopt, the ANNOUNCE was successful.
using MoqtOutgoingAnnounceCallback = quiche::SingleUseCallback<void(
    absl::string_view track_namespace,
    std::optional<MoqtAnnounceErrorReason> error)>;
using MoqtIncomingAnnounceCallback =
    quiche::MultiUseCallback<std::optional<MoqtAnnounceErrorReason>(
        absl::string_view track_namespace)>;

inline std::optional<MoqtAnnounceErrorReason> DefaultIncomingAnnounceCallback(
    absl::string_view /*track_namespace*/) {
  return std::optional(MoqtAnnounceErrorReason{
      MoqtAnnounceErrorCode::kAnnounceNotSupported,
      "This endpoint does not accept incoming ANNOUNCE messages"});
};

// Callbacks for session-level events.
struct MoqtSessionCallbacks {
  MoqtSessionEstablishedCallback session_established_callback = +[] {};
  MoqtSessionTerminatedCallback session_terminated_callback =
      +[](absl::string_view) {};
  MoqtSessionDeletedCallback session_deleted_callback = +[] {};

  MoqtIncomingAnnounceCallback incoming_announce_callback =
      DefaultIncomingAnnounceCallback;
};

class QUICHE_EXPORT MoqtSession : public webtransport::SessionVisitor {
 public:
  MoqtSession(webtransport::Session* session, MoqtSessionParameters parameters,
              MoqtSessionCallbacks callbacks = MoqtSessionCallbacks())
      : session_(session),
        parameters_(parameters),
        callbacks_(std::move(callbacks)),
        framer_(quiche::SimpleBufferAllocator::Get(),
                parameters.using_webtrans) {}
  ~MoqtSession() { std::move(callbacks_.session_deleted_callback)(); }

  // webtransport::SessionVisitor implementation.
  void OnSessionReady() override;
  void OnSessionClosed(webtransport::SessionErrorCode,
                       const std::string&) override;
  void OnIncomingBidirectionalStreamAvailable() override;
  void OnIncomingUnidirectionalStreamAvailable() override;
  void OnDatagramReceived(absl::string_view datagram) override;
  void OnCanCreateNewOutgoingBidirectionalStream() override {}
  void OnCanCreateNewOutgoingUnidirectionalStream() override {}

  void Error(MoqtError code, absl::string_view error);

  quic::Perspective perspective() const { return parameters_.perspective; }

  // Add to the list of tracks that can be subscribed to. Call this before
  // Announce() so that subscriptions can be processed correctly. If |visitor|
  // is nullptr, then incoming SUBSCRIBE for objects in the path will receive
  // SUBSCRIBE_OK, but never actually get the objects.
  void AddLocalTrack(const FullTrackName& full_track_name,
                     MoqtForwardingPreference forwarding_preference,
                     LocalTrack::Visitor* visitor);
  // Send an ANNOUNCE message for |track_namespace|, and call
  // |announce_callback| when the response arrives. Will fail immediately if
  // there is already an unresolved ANNOUNCE for that namespace.
  void Announce(absl::string_view track_namespace,
                MoqtOutgoingAnnounceCallback announce_callback);
  bool HasSubscribers(const FullTrackName& full_track_name) const;
  // Send an ANNOUNCE_CANCEL and delete local tracks in that namespace when all
  // subscriptions are closed for that track.
  void CancelAnnounce(absl::string_view track_namespace);

  // Returns true if SUBSCRIBE was sent. If there is already a subscription to
  // the track, the message will still be sent. However, the visitor will be
  // ignored.
  // Subscribe from (start_group, start_object) to the end of the track.
  bool SubscribeAbsolute(absl::string_view track_namespace,
                         absl::string_view name, uint64_t start_group,
                         uint64_t start_object, RemoteTrack::Visitor* visitor,
                         absl::string_view auth_info = "");
  // Subscribe from (start_group, start_object) to the end of end_group.
  bool SubscribeAbsolute(absl::string_view track_namespace,
                         absl::string_view name, uint64_t start_group,
                         uint64_t start_object, uint64_t end_group,
                         RemoteTrack::Visitor* visitor,
                         absl::string_view auth_info = "");
  // Subscribe from (start_group, start_object) to (end_group, end_object).
  bool SubscribeAbsolute(absl::string_view track_namespace,
                         absl::string_view name, uint64_t start_group,
                         uint64_t start_object, uint64_t end_group,
                         uint64_t end_object, RemoteTrack::Visitor* visitor,
                         absl::string_view auth_info = "");
  bool SubscribeCurrentObject(absl::string_view track_namespace,
                              absl::string_view name,
                              RemoteTrack::Visitor* visitor,
                              absl::string_view auth_info = "");
  bool SubscribeCurrentGroup(absl::string_view track_namespace,
                             absl::string_view name,
                             RemoteTrack::Visitor* visitor,
                             absl::string_view auth_info = "");

  // Returns false if it could not open a stream when necessary, or if the
  // track does not exist (there was no call to AddLocalTrack). Will still
  // return false is some streams succeed.
  // Also returns false if |payload_length| exists but is shorter than
  // |payload|.
  // |payload.length() >= |payload_length|, because the application can deliver
  // partial objects.
  bool PublishObject(const FullTrackName& full_track_name, uint64_t group_id,
                     uint64_t object_id, uint64_t object_send_order,
                     MoqtObjectStatus status, absl::string_view payload);
  void CloseObjectStream(const FullTrackName& full_track_name,
                         uint64_t group_id);
  // TODO: Add an API to send partial objects.

  MoqtSessionCallbacks& callbacks() { return callbacks_; }

 private:
  friend class test::MoqtSessionPeer;
  class QUICHE_EXPORT Stream : public webtransport::StreamVisitor,
                               public MoqtParserVisitor {
   public:
    Stream(MoqtSession* session, webtransport::Stream* stream)
        : session_(session),
          stream_(stream),
          parser_(session->parameters_.using_webtrans, *this) {}
    Stream(MoqtSession* session, webtransport::Stream* stream,
           bool is_control_stream)
        : session_(session),
          stream_(stream),
          parser_(session->parameters_.using_webtrans, *this),
          is_control_stream_(is_control_stream) {}

    // webtransport::StreamVisitor implementation.
    void OnCanRead() override;
    void OnCanWrite() override;
    void OnResetStreamReceived(webtransport::StreamErrorCode error) override;
    void OnStopSendingReceived(webtransport::StreamErrorCode error) override;
    void OnWriteSideInDataRecvdState() override {}

    // MoqtParserVisitor implementation.
    // TODO: Handle a stream FIN.
    void OnObjectMessage(const MoqtObject& message, absl::string_view payload,
                         bool end_of_message) override;
    void OnClientSetupMessage(const MoqtClientSetup& message) override;
    void OnServerSetupMessage(const MoqtServerSetup& message) override;
    void OnSubscribeMessage(const MoqtSubscribe& message) override;
    void OnSubscribeOkMessage(const MoqtSubscribeOk& message) override;
    void OnSubscribeErrorMessage(const MoqtSubscribeError& message) override;
    void OnUnsubscribeMessage(const MoqtUnsubscribe& message) override;
    // There is no state to update for SUBSCRIBE_DONE.
    void OnSubscribeDoneMessage(const MoqtSubscribeDone& /*message*/) override {
    }
    void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) override;
    void OnAnnounceMessage(const MoqtAnnounce& message) override;
    void OnAnnounceOkMessage(const MoqtAnnounceOk& message) override;
    void OnAnnounceErrorMessage(const MoqtAnnounceError& message) override;
    void OnAnnounceCancelMessage(const MoqtAnnounceCancel& message) override;
    void OnTrackStatusRequestMessage(
        const MoqtTrackStatusRequest& message) override {};
    void OnUnannounceMessage(const MoqtUnannounce& /*message*/) override {}
    void OnTrackStatusMessage(const MoqtTrackStatus& message) override {}
    void OnGoAwayMessage(const MoqtGoAway& /*message*/) override {}
    void OnParsingError(MoqtError error_code,
                        absl::string_view reason) override;

    quic::Perspective perspective() const {
      return session_->parameters_.perspective;
    }

    webtransport::Stream* stream() const { return stream_; }

    // Sends a control message, or buffers it if there is insufficient flow
    // control credit.
    void SendOrBufferMessage(quiche::QuicheBuffer message, bool fin = false);

   private:
    friend class test::MoqtSessionPeer;
    void SendSubscribeError(const MoqtSubscribe& message,
                            SubscribeErrorCode error_code,
                            absl::string_view reason_phrase,
                            uint64_t track_alias);
    bool CheckIfIsControlStream();

    MoqtSession* session_;
    webtransport::Stream* stream_;
    MoqtParser parser_;
    // nullopt means "incoming stream, and we don't know if it's the control
    // stream or a data stream yet".
    std::optional<bool> is_control_stream_;
    std::string partial_object_;
  };

  // Returns true if SUBSCRIBE_DONE was sent.
  bool SubscribeIsDone(uint64_t subscribe_id, SubscribeDoneCode code,
                       absl::string_view reason_phrase);
  // Returns the pointer to the control stream, or nullptr if none is present.
  Stream* GetControlStream();
  // Sends a message on the control stream; QUICHE_DCHECKs if no control stream
  // is present.
  void SendControlMessage(quiche::QuicheBuffer message);

  // Returns false if the SUBSCRIBE isn't sent.
  bool Subscribe(MoqtSubscribe& message, RemoteTrack::Visitor* visitor);

  // Returns the stream ID if successful, nullopt if not.
  // TODO: Add a callback if stream creation is delayed.
  std::optional<webtransport::StreamId> OpenUnidirectionalStream();

  // Get FullTrackName and visitor for a subscribe_id and track_alias. Returns
  // nullptr if not present.
  std::pair<FullTrackName, RemoteTrack::Visitor*> TrackPropertiesFromAlias(
      const MoqtObject& message);

  webtransport::Session* session_;
  MoqtSessionParameters parameters_;
  MoqtSessionCallbacks callbacks_;
  MoqtFramer framer_;

  std::optional<webtransport::StreamId> control_stream_;
  std::string error_;

  // All the tracks the session is subscribed to, indexed by track_alias.
  // Multiple subscribes to the same track are recorded in a single
  // subscription.
  absl::flat_hash_map<uint64_t, RemoteTrack> remote_tracks_;
  // Look up aliases for remote tracks by name
  absl::flat_hash_map<FullTrackName, uint64_t> remote_track_aliases_;
  uint64_t next_remote_track_alias_ = 0;

  // All the tracks the peer can subscribe to.
  absl::flat_hash_map<FullTrackName, LocalTrack> local_tracks_;
  absl::flat_hash_map<uint64_t, FullTrackName> local_track_by_subscribe_id_;
  // This is only used to check for track_alias collisions.
  absl::flat_hash_set<uint64_t> used_track_aliases_;
  uint64_t next_local_track_alias_ = 0;

  // Indexed by subscribe_id.
  struct ActiveSubscribe {
    MoqtSubscribe message;
    RemoteTrack::Visitor* visitor;
    // The forwarding preference of the first received object, which all
    // subsequent objects must match.
    std::optional<MoqtForwardingPreference> forwarding_preference;
    // If true, an object has arrived for the subscription before SUBSCRIBE_OK
    // arrived.
    bool received_object = false;
  };
  // Outgoing SUBSCRIBEs that have not received SUBSCRIBE_OK or SUBSCRIBE_ERROR.
  absl::flat_hash_map<uint64_t, ActiveSubscribe> active_subscribes_;
  uint64_t next_subscribe_id_ = 0;

  // Indexed by track namespace.
  absl::flat_hash_map<std::string, MoqtOutgoingAnnounceCallback>
      pending_outgoing_announces_;

  // The role the peer advertised in its SETUP message. Initialize it to avoid
  // an uninitialized value if no SETUP arrives or it arrives with no Role
  // parameter, and other checks have changed/been disabled.
  MoqtRole peer_role_ = MoqtRole::kPubSub;
};

}  // namespace moqt

#endif  // QUICHE_QUIC_MOQT_MOQT_SESSION_H_
