import autoBind from 'auto-bind';
import * as protooClient from 'protoo-client';
import { DistributiveOmit } from 'react-redux';
import { BehaviorSubject, firstValueFrom } from 'rxjs';

import { PeerData, RemotePeerId, SessionId } from 'common/models/db/vo.interface';
import {
  FloofMessages,
  MediasoupConsumerId,
  MediasoupProducerId,
  P2pGenericSignalingMessage,
} from 'common/models/floof.interface';
import { defaultWsPort } from 'common/models/floof.interface';
import { PeerMessage } from 'common/models/peer-message.interface';
import { filterIsTruthy } from 'common/utils/custom-rx-operators';
import { encodeString } from 'common/utils/string-utils';
import { assertExhaustedType, queryParamsToUrlFriendlyString } from 'common/utils/ts-utils';
import { AnyPeerId } from 'store/room/room.model';

/**
 * Structural type describing the subset of the protoo-client `Peer` API that
 * this file relies on. `Protoo.connect` assigns a `protooClient.Peer` instance
 * to a variable of this type.
 */
export interface ProtooPeer {
  // NOTE(review): nothing in the visible code reads or writes `data.isConnected$`
  // — confirm whether it is still needed.
  data: {
    isConnected$: BehaviorSubject<boolean>;
  };
  /**
   * Sends a client-to-server request (room, p2p or sfu family).
   *
   * @param method - The `method` field of the request message type `T`.
   * @param data - The `data` field of the request message type `T`.
   * @returns A promise typed as the `response` declared on `T`.
   */
  request<
    T extends
      | FloofMessages['requests']['client-to-server']['room']
      | FloofMessages['requests']['client-to-server']['p2p']
      | FloofMessages['requests']['client-to-server']['sfu'],
  >(
    method: DistributiveOmit<T, 'response'>['method'],
    data: DistributiveOmit<T, 'response'>['data'],
  ): Promise<T['response']>;
  /** Registers a handler for one of the connection lifecycle events. */
  on(event: 'open' | 'close' | 'failed' | 'disconnected', handler: any): void;
  /** Registers a handler for server-to-client notifications (room, p2p or sfu family). */
  on<
    T extends
      | FloofMessages['notifications']['server-to-client']['room']
      | FloofMessages['notifications']['server-to-client']['p2p']
      | FloofMessages['notifications']['server-to-client']['sfu'],
  >(
    event: 'notification',
    handler: (notification: T) => void,
  ): void;
  /**
   * Registers a handler for server-to-client SFU requests. The handler receives
   * the request together with `accept` and `reject` callbacks.
   */
  on<T extends FloofMessages['requests']['server-to-client']['sfu']>(
    event: 'request',
    handler: (request: T, accept: () => void, reject: () => void) => void,
  ): void;
  /** Closes the peer. */
  close(): void;
}

/**
 * Lifecycle states accepted by `Protoo.setConnectionState`. Only `'connected'`
 * is treated as "connected" there; every other state maps to "not connected".
 */
export type ProtooConnectionState =
  | 'created'
  | 'connecting'
  | 'connected'
  | 'closed'
  | 'disconnected'
  | 'failed';

/** Delegate for a P2P session: generic + room callbacks plus the optional P2P callbacks. */
export type ProtooP2pDelegate = ProtooGeneric & ProtooRoomSpecific & ProtooP2pSpecific;
/** Delegate for an SFU session: generic + room callbacks plus the optional SFU callbacks. */
export type ProtooSfuDelegate = ProtooGeneric & ProtooRoomSpecific & ProtooSfuSpecific;
/** Intersection of both delegate flavours; this is the type the `Protoo` constructor accepts. */
export type ProtooDelegate = ProtooP2pDelegate & ProtooSfuDelegate;
/** Callbacks that every delegate must implement, regardless of session type. */
export interface ProtooGeneric {
  /**
   * Called by `Protoo` when its connected flag flips; it is not called when a
   * lifecycle event leaves the flag unchanged.
   */
  onConnectionStateChanged: (isConnected: boolean) => void;
}
/** Required callbacks for room-level server notifications. */
export interface ProtooRoomSpecific {
  /** Called for the `note-s2c-room-peer-joined` notification. */
  onRoomPeerJoined: (peerId: RemotePeerId) => void;
  /** Called for the `note-s2c-room-peer-left` notification. */
  onRoomPeerLeft: (peerId: RemotePeerId) => void;
  /** Called for the `note-s2c-room-peer-sent-message` notification with the sender and the message. */
  onRoomPeerMessage: (peerId: RemotePeerId, message: PeerMessage) => void;
}
/** Optional callbacks for P2P-specific server notifications. */
export interface ProtooP2pSpecific {
  /** Called for the `note-s2c-p2p-recv-generic-signaling-message` notification. */
  onP2pGenericSignalingMessage?: (peerId: RemotePeerId, message: P2pGenericSignalingMessage) => void;
}
/**
 * Optional callbacks for SFU-specific server notifications and requests.
 * Every member is optional; `Protoo` invokes each one only if it is defined.
 */
export interface ProtooSfuSpecific {
  /** Called for the `note-s2c-sfu-active-speaker` notification. */
  onSfuActiveSpeaker?: (peerId: RemotePeerId, volume?: number) => void;
  /** Called for the `note-s2c-sfu-audio-volumes` notification. */
  onSfuAudioVolumes?: (volumes: { volume: number; peerId: RemotePeerId }[]) => void;
  /** Called for the `note-s2c-sfu-consumer-closed` notification. */
  onSfuConsumerClosed?: (consumerId: MediasoupConsumerId) => void;
  /**
   * Called for the `note-s2c-sfu-consumer-layers-changed` notification.
   *
   * NOTE(review): the dispatcher in `Protoo.connect` currently passes only
   * `consumerId`, so `spatialLayer` and `temporalLayer` arrive as `undefined`.
   */
  onSfuConsumerLayersChanged?: (
    consumerId: MediasoupConsumerId,
    spatialLayer?: number | null,
    temporalLayer?: number | null,
  ) => void;
  /** Called for the `note-s2c-sfu-consumer-paused` notification. */
  onSfuConsumerPaused?: (consumerId: MediasoupConsumerId) => void;
  /** Called for the `note-s2c-sfu-consumer-resumed` notification. */
  onSfuConsumerResumed?: (consumerId: MediasoupConsumerId) => void;
  /** Called for the `note-s2c-sfu-consumer-score` notification. */
  onSfuConsumerScore?: (
    consumerId: MediasoupConsumerId,
    score: {
      score: number;
      // The score of the currently selected RTP stream of the producer.
      producerScore: number;
      // The scores of all RTP streams in the producer ordered by encoding (just
      // useful when the producer uses simulcast).
      producerScores: number[];
    },
  ) => void;
  /** Called for the `note-s2c-sfu-data-consumer-closed` notification. */
  onSfuDataConsumerClosed?: (dataConsumerId: MediasoupConsumerId) => void;
  /** Called for the `note-s2c-sfu-downlink-bwe` notification with the three reported bitrates. */
  onSfuDownlinkBwe?: (
    desiredBitrate: number,
    effectiveDesiredBitrate: number,
    availableBitrate: number,
  ) => void;
  // NOTE(review): the dispatch for `note-s2c-sfu-producer-score` in
  // `Protoo.connect` is commented out, so this callback is never invoked by the
  // visible code.
  onSfuProducerScore?: (peerId: RemotePeerId) => void;
  /**
   * Called for the `req-s2c-sfu-new-consumer` server request. `accept` and
   * `reject` forward to the underlying protoo request callbacks.
   */
  onSfuNewConsumerRequest?: (
    request: {
      peerId: RemotePeerId;
      producerId: MediasoupProducerId;
      id: MediasoupConsumerId;
      kind: any;
      rtpParameters: any;
      appData: any;
    },
    accept: () => void,
    reject: () => void,
  ) => void;
}

/**
 * Wraps a single protoo WebSocket connection to the floof server.
 *
 * Server notifications and requests are dispatched to the supplied delegate,
 * the connection lifecycle is collapsed into one boolean "connected" flag, and
 * `sendRequest` offers a typed way to issue client-to-server requests.
 */
export class Protoo {
  /** Peer created by the most recent `connect()`; absent while no connection exists. */
  private protooConnection?: ProtooPeer;
  /** Holds `true` while the current peer is open and `false` otherwise. */
  private readonly isConnected$ = new BehaviorSubject<boolean>(false);

  /**
   * @param peerId - Id of the local peer, sent to the server as a query parameter.
   * @param sessionId - Id of the session to join, sent to the server as a query parameter.
   * @param delegate - Receiver of connection-state changes, notifications and requests.
   */
  constructor(
    private readonly peerId: AnyPeerId,
    private readonly sessionId: SessionId,
    private readonly delegate: ProtooDelegate,
  ) {
    // Bind every method to this instance so methods can be passed around as callbacks.
    autoBind(this);
  }

  /**
   * Records a lifecycle state. Only `'connected'` counts as connected, and the
   * delegate is notified solely when the resulting boolean actually changes.
   */
  private setConnectionState(connectionState: ProtooConnectionState): void {
    const isConnected = connectionState === 'connected';
    if (this.isConnected$.value === isConnected) return;
    this.isConnected$.next(isConnected);
    this.delegate.onConnectionStateChanged(isConnected);
  }

  /** @returns Whether the current peer is open right now. */
  public isConnected(): boolean {
    return this.isConnected$.value;
  }

  /**
   * Opens a new protoo connection to `hostname` and wires all peer events to
   * the delegate. A peer left over from an earlier `connect()` is closed first.
   *
   * @param hostname - Host of the floof server; the port is `defaultWsPort`.
   * @param peerData - Local peer data, JSON-serialised and encoded into the URL.
   */
  public connect(hostname: string, peerData: PeerData): void {
    // Close a previous peer (if any) instead of silently overwriting the
    // reference; otherwise its socket and handlers would stay alive.
    this.disconnect();

    const protooPort = defaultWsPort;
    const protooUrl = `wss://${hostname}:${protooPort}/?${queryParamsToUrlFriendlyString({
      sessionId: this.sessionId,
      peerId: this.peerId,
      encodedPeerData: encodeString(JSON.stringify(peerData)),
    })}`;
    this.setConnectionState('connecting');
    const protooTransport = new protooClient.WebSocketTransport(protooUrl, {
      retry: { retries: 0, minTimeout: 1000, maxTimeout: 2000 },
    });
    const peer: ProtooPeer = (this.protooConnection = new protooClient.Peer(protooTransport));

    // Lifecycle events from a peer that has since been replaced must not touch
    // the state (or the reference) that now belongs to a newer connection.
    const isCurrentPeer = (): boolean => this.protooConnection === peer;

    peer.on('open', () => {
      if (!isCurrentPeer()) return;
      console.log('connected to floof', protooUrl, performance.now());
      this.setConnectionState('connected');
    });

    peer.on('close', () => {
      if (!isCurrentPeer()) return;
      console.log('floof connection closed', protooUrl, performance.now());
      delete this.protooConnection;
      this.setConnectionState('closed');
    });
    peer.on('disconnected', () => {
      if (!isCurrentPeer()) return;
      // TODO: how should we handle this?
      console.log('disconnected from floof', performance.now());
      this.setConnectionState('disconnected');
    });
    peer.on('failed', () => {
      if (!isCurrentPeer()) return;
      // TODO: how should we handle this?
      this.setConnectionState('failed');
    });
    peer.on('notification', (note) => {
      switch (note.method) {
        case 'note-s2c-room-peer-joined':
          this.delegate.onRoomPeerJoined(note.data.peerId);
          break;

        case 'note-s2c-room-peer-left':
          this.delegate.onRoomPeerLeft(note.data.peerId);
          break;

        case 'note-s2c-room-peer-sent-message':
          this.delegate.onRoomPeerMessage(note.data.peerId, note.data.peerMessage.peerMessage);
          break;

        case 'note-s2c-p2p-recv-generic-signaling-message':
          this.delegate.onP2pGenericSignalingMessage?.(note.data.peerId, note.data.message);
          break;
        case 'note-s2c-sfu-active-speaker':
          this.delegate.onSfuActiveSpeaker?.(note.data.peerId, note.data.volume);
          break;
        case 'note-s2c-sfu-audio-volumes':
          this.delegate.onSfuAudioVolumes?.(note.data.volumes);
          break;
        case 'note-s2c-sfu-consumer-closed':
          this.delegate.onSfuConsumerClosed?.(note.data.consumerId);
          break;
        case 'note-s2c-sfu-consumer-layers-changed':
          // NOTE(review): the delegate signature also accepts spatialLayer and
          // temporalLayer, but only consumerId is forwarded here. Confirm the
          // payload's field names in FloofMessages and pass them through.
          this.delegate.onSfuConsumerLayersChanged?.(note.data.consumerId);
          break;
        case 'note-s2c-sfu-consumer-paused':
          this.delegate.onSfuConsumerPaused?.(note.data.consumerId);
          break;
        case 'note-s2c-sfu-consumer-resumed':
          this.delegate.onSfuConsumerResumed?.(note.data.consumerId);
          break;
        case 'note-s2c-sfu-consumer-score':
          this.delegate.onSfuConsumerScore?.(note.data.consumerId, note.data.score);
          break;
        case 'note-s2c-sfu-data-consumer-closed':
          this.delegate.onSfuDataConsumerClosed?.(note.data.consumerId);
          break;
        case 'note-s2c-sfu-downlink-bwe':
          this.delegate.onSfuDownlinkBwe?.(
            note.data.desiredBitrate,
            note.data.effectiveDesiredBitrate,
            note.data.availableBitrate,
          );
          break;
        case 'note-s2c-sfu-producer-score':
          // this.delegate.onSfuProducerScore?.(note.data.producerId, score);
          break;
        default:
          assertExhaustedType(note);
      }
    });
    peer.on('request', (request, accept, reject) => {
      switch (request.method) {
        case 'req-s2c-sfu-new-consumer':
          if (this.delegate.onSfuNewConsumerRequest) {
            this.delegate.onSfuNewConsumerRequest(
              request.data,
              () => accept(),
              () => reject(),
            );
          } else {
            // No handler registered: answer anyway so the server-side request
            // is not left pending.
            reject();
          }
          break;
        default:
          // Unknown request method: reject rather than leave it unanswered.
          reject();
      }
    });
  }

  /** Closes the current peer, if any, and forgets it. Safe to call when not connected. */
  public disconnect(): void {
    this.protooConnection?.close();
    delete this.protooConnection;
  }

  /**
   * Sends a typed client-to-server request once the connection is open.
   *
   * The call waits, without a timeout, until the connected flag becomes `true`.
   * If no peer exists at that point, or the request fails, the failure is
   * logged (for request errors) and the promise resolves with `undefined`
   * rather than rejecting.
   *
   * @param request - The request message without its `response` member.
   * @returns The server response, or `undefined` as described above.
   */
  public async sendRequest<
    T extends
      | FloofMessages['requests']['client-to-server']['room']
      | FloofMessages['requests']['client-to-server']['p2p']
      | FloofMessages['requests']['client-to-server']['sfu'],
  >(request: DistributiveOmit<T, 'response'>): Promise<T['response']> {
    await firstValueFrom(this.isConnected$.pipe(filterIsTruthy()));
    const connection = this.protooConnection;
    if (!connection) return;
    try {
      return (await connection.request(request.method, request.data)) as T['response'];
    } catch (error) {
      // Best effort: log and resolve with undefined. Narrow first so that a
      // non-Error rejection value cannot throw while being logged.
      const message = error instanceof Error ? error.message : error;
      console.log(message, request.method, request.data);
      return;
    }
  }
}
