// @todo enable the following disabled rules see OPENTOK-31136 for more info
/* eslint-disable no-shadow, prefer-rest-params, prefer-spread, no-param-reassign */
/* eslint-disable no-mixed-operators, no-cond-assign, one-var */
import createLogger from '../../helpers/log';
import sessionObjects from '../../ot/session/objects';
import Archive from '../../ot/archive';
import now from '../../helpers/now';
import Connection from '../../ot/connection';
import DelayedEventQueue from './DelayedEventQueue';
import Dispatcher from './Dispatcher';
import Stream from '../../ot/stream';
import StreamChannel from '../../ot/stream_channel';
import getMediaModeBySourceStreamId from '../../helpers/get-media-mode-by-source-stream-id';

const logging = createLogger('SessionDispatcher');

/**
 * Builds a minimal connection hash for a sender we only know by address.
 *
 * NOTE(review): the `capablities` key is spelled exactly as in the original;
 * presumably it matches what Connection.fromHash reads - confirm before renaming.
 *
 * @param {string} fromAddress - Used as the connection id.
 * @returns {Object} A fresh hash with empty data, capabilities and permissions,
 *   stamped with the current time in milliseconds.
 */
function constructBareConnection(fromAddress) {
  const creationTime = Date.now();

  const bareConnection = {
    id: fromAddress,
    creationTime,
    data: {},
    capablities: {},
    permissions: [],
  };

  return bareConnection;
}

/**
 * Converts a raw stream payload into a Stream instance.
 *
 * The owning connection is looked up in `session.connections` using
 * `dict.connectionId` when it is truthy, otherwise `dict.connection.id`.
 *
 * @param {Object} dict - Raw stream payload; `dict.channel` must be an array.
 * @param {Object} session - Session whose connection collection is consulted.
 * @returns {Stream} The newly constructed stream.
 */
function parseStream(dict, session) {
  const channels = dict.channel.map(rawChannel => new StreamChannel(rawChannel));
  const ownerId = dict.connectionId || dict.connection.id;
  const owner = session.connections.get(ownerId);
  const initials = dict.customProperties && dict.customProperties.initials;

  return new Stream(dict.id, dict.name, dict.creationTime, owner, session, channels, initials);
}

/**
 * Parses a raw stream payload and adds it to the session, unless the session
 * already holds a stream with the same id.
 *
 * @param {Object} streamParams - Raw stream payload (must expose `id`).
 * @param {Object} session - Session owning the `streams` collection.
 * @returns {Stream|undefined} The stream that was added, or undefined when a
 *   stream with that id was already present.
 */
function parseAndAddStreamToSession(streamParams, session) {
  const alreadyKnown = session.streams.has(streamParams.id);

  if (!alreadyKnown) {
    const created = parseStream(streamParams, session);
    session.streams.add(created);
    return created;
  }

  return undefined;
}

/**
 * Converts a raw archive payload into an Archive instance.
 *
 * @param {Object} archiveParams - Raw payload providing `id`, `name` and `status`.
 * @returns {Archive} The newly constructed archive.
 */
function parseArchive(archiveParams) {
  const { id, name, status } = archiveParams;
  return new Archive(id, name, status);
}

/**
 * Parses a raw archive payload and adds it to the session, unless the session
 * already holds an archive with the same id.
 *
 * @param {Object} archiveParams - Raw archive payload (must expose `id`).
 * @param {Object} session - Session owning the `archives` collection.
 * @returns {Archive|undefined} The archive that was added, or undefined when
 *   an archive with that id was already present.
 */
function parseAndAddArchiveToSession(archiveParams, session) {
  const alreadyKnown = session.archives.has(archiveParams.id);

  if (!alreadyKnown) {
    const created = parseArchive(archiveParams);
    session.archives.add(created);
    return created;
  }

  return undefined;
}

/**
 * Holds events that cannot be dispatched yet, grouped into one
 * DelayedEventQueue per key, and releases them when the matching trigger fires.
 *
 * @constructor
 * @param {Dispatcher} dispatcher - Handed to every DelayedEventQueue created here.
 */
const DelayedSessionEvents = function (dispatcher) {
  const queuesByKey = {};

  // Queue an event under `key`; the queue is created on first use and the
  // remaining arguments are forwarded to its `enqueue` untouched.
  this.enqueue = function enqueue(key, ...eventArgs) {
    if (!queuesByKey[key]) {
      queuesByKey[key] = new DelayedEventQueue(dispatcher);
    }
    const queue = queuesByKey[key];
    queue.enqueue(...eventArgs);
  };

  // Release everything queued under `connectionCreated<connection.id>`, if any.
  this.triggerConnectionCreated = function triggerConnectionCreated(connection) {
    const queue = queuesByKey[`connectionCreated${connection.id}`];
    if (queue) {
      queue.triggerAll();
    }
  };

  // Release the `sessionConnected` queue first, then the per-connection queue
  // of every connection in `connections`.
  this.triggerSessionConnected = function triggerSessionConnected(connections) {
    const sessionQueue = queuesByKey.sessionConnected;
    if (sessionQueue) {
      sessionQueue.triggerAll();
    }

    connections.forEach((connection) => {
      this.triggerConnectionCreated(connection);
    });
  };
};

// Raw `stream#created` payloads, keyed by stream id, that arrived before their
// connection was known to the session. They are parked by the `stream#created`
// handler and turned into streams by `addConnection` once the connection shows up.
// Declared at module scope, so it is shared by every SessionDispatcher created
// from this module.
const unconnectedStreams = {};

/**
 * Creates a Dispatcher and attaches the handlers that translate incoming
 * session, connection, stream, JSEP, signal, caption, archive and source
 * messages into calls on `session`, its collections, and the registered
 * publishers/subscribers.
 *
 * @param {Object} session - The session that receives the dispatched events.
 * @param {Object} [options]
 * @param {boolean} [options.connectionEventsSuppressed] - When truthy, the
 *   `stream#created` and `signal` handlers register the connection carried by
 *   the message themselves (via addConnection) before processing it.
 * @returns {Dispatcher} The dispatcher with all handlers attached.
 */
export default function SessionDispatcher(session, { connectionEventsSuppressed } = {}) {
  const dispatcher = new Dispatcher();
  // Set to true at the end of the session#read handler; until then, signals
  // sent from our own connection are queued instead of dispatched.
  let sessionStateReceived = false;
  const delayedSessionEvents = new DelayedSessionEvents(dispatcher);

  // Forward the dispatcher's reconnect lifecycle events to the session's
  // private API.
  dispatcher.on('reconnecting', () => {
    session._.reconnecting();
  });

  dispatcher.on('reconnected', () => {
    session._.reconnected();
  });

  // On `close`, destroy our own connection with the given reason, unless there
  // is no connection yet or it already reports a destroyed reason (in which
  // case we only log it).
  dispatcher.on('close', (reason) => {
    const { connection: ownConnection } = session;

    if (ownConnection) {
      if (!ownConnection.destroyedReason()) {
        ownConnection.destroy(reason);
        return;
      }

      logging.debug(
        'Socket was closed but the connection had already been destroyed. ' +
        `Reason: ${ownConnection.destroyedReason()}`
      );
    }
  });

  /**
   * Adds a connection to the session, both on connection#created and on
   * session#read. It is also called by the stream#created and signal handlers
   * when connection events are suppressed.
   *
   * A connection whose id is already in `session.connections` is never added
   * twice; the existing instance is returned instead. Our own connection is
   * only added when `sessionRead` is truthy.
   *
   * Once the connection has been handled, every parked `stream#created`
   * payload in `unconnectedStreams` that belongs to it is parsed, added to the
   * session and removed from the parking area.
   *
   * @param {Object} connection - Raw connection hash (must expose `id`).
   * @param {boolean} [sessionRead] - True when called from session#read.
   * @returns {Connection} The existing or newly created connection.
   */
  const addConnection = function (connection, sessionRead) {
    if (session.connections.has(connection.id)) {
      // Don't add a duplicate connection, since we add them not only on connection#created, but
      // also when stream#created or signal has an associated connection.
      return session.connections.get(connection.id);
    }

    const created = Connection.fromHash(connection);
    const isRemote = session.connection && created.id !== session.connection.id;
    if (sessionRead || isRemote) {
      session.connections.add(created);
      delayedSessionEvents.triggerConnectionCreated(created);
    }

    Object.keys(unconnectedStreams).forEach((streamId) => {
      const streamParams = unconnectedStreams[streamId];
      if (!streamParams) {
        return;
      }

      // A parked payload may name its connection through `connectionId` or
      // through a nested `connection` object. Resolve it with the same
      // precedence used when the stream was parked, instead of assuming that
      // `connection` is always present.
      const streamConnectionId = streamParams.connectionId ||
        (streamParams.connection && streamParams.connection.id);
      if (streamConnectionId !== created.id) {
        return;
      }

      // dispatch streamCreated event now that the connectionCreated has been dispatched
      parseAndAddStreamToSession(streamParams, session);
      delete unconnectedStreams[streamId];

      const payload = {
        debug: sessionRead ? 'connection came in session#read' :
          'connection came in connection#created',
        streamId: streamParams.id,
        connectionId: created.id,
      };
      session.logEvent('streamCreated', 'warning', payload);
    });

    return created;
  };

  // Handles the initial session state. Every connection (our own included),
  // stream and archive in the payload is added to the session, the collected
  // state is handed to the transaction callback, and finally the events that
  // were delayed until the session state was known are released.
  dispatcher.on('session#read', (content, transactionId) => {
    const state = {
      streams: [],
      connections: [],
      archives: [],
    };

    content.connection.forEach((connectionParams) => {
      state.connections.push(addConnection(connectionParams, true));
    });

    content.stream.forEach((streamParams) => {
      state.streams.push(parseAndAddStreamToSession(streamParams, session));
    });

    // The archive list is read from `archive` or, failing that, `archives`.
    // Treat a payload that carries neither as "no archives" rather than
    // throwing before the transaction callback has been triggered.
    const archiveList = content.archive || content.archives || [];
    archiveList.forEach((archiveParams) => {
      state.archives.push(parseAndAddArchiveToSession(archiveParams, session));
    });

    dispatcher.triggerCallback(transactionId, null, state);

    sessionStateReceived = true;
    delayedSessionEvents.triggerSessionConnected(session.connections);
  });

  // Handles session-wide updates. Only the 'mute' reason is acted on: the
  // session is force-muted with the received state and mute-on-entry is
  // switched on or off to match. `active` counts only when it is exactly `true`.
  dispatcher.on('session#update', (content) => {
    const isMuteUpdate = content.reason === 'mute';

    if (isMuteUpdate) {
      const active = content.active === true;
      session._.forceMute({ active });

      if (!active) {
        session._.disableMuteOnEntry();
      } else {
        session._.enableMuteOnEntry();
      }
    } else {
      logging.debug('session#update that handle this reason is not currently implemented');
    }
  });

  // On `session#muted`, turn mute-on-entry on and force-mute the session as active.
  dispatcher.on('session#muted', () => {
    session._.enableMuteOnEntry();
    session._.forceMute({ active: true });
  });

  // Register the new connection. `sessionRead` is left undefined here, so
  // addConnection does not add our own connection.
  dispatcher.on('connection#created', (connection) => {
    addConnection(connection);
  });

  // Destroys the connection that was reported as deleted. The first argument
  // is used as the lookup key into `session.connections`; unknown keys are
  // only logged.
  dispatcher.on('connection#deleted', (connectionKey, reason) => {
    const knownConnection = session.connections.get(connectionKey);

    if (knownConnection) {
      knownConnection.destroy(reason);
      return;
    }

    logging.warn('A connection was deleted that we do not know about');
  });

  // Handles a newly created stream. When the stream's connection is already
  // known, the stream is added to the session right away; otherwise the raw
  // payload is parked in `unconnectedStreams` until addConnection sees the
  // connection.
  dispatcher.on('stream#created', (content, transactionId) => {
    const { id: streamId, sourceStreamId, connection } = content;
    let stream;

    if (connectionEventsSuppressed) {
      // With connection events suppressed, the connection object carried by
      // the message is registered here. Messages without a usable connection
      // id are logged and dropped.
      if (connection == null || connection.id == null) {
        session.logEvent('SessionDispatcher:stream#created', 'Event', {
          connection,
          info: 'Stream did not contain a connection object. Event ignored',
        });
        return;
      }
      addConnection(connection);
    }

    // Same precedence as parseStream: `connectionId` wins over `connection.id`.
    const connectionId = content.connectionId || connection.id;

    if (session.connections.has(connectionId)) {
      // Returns undefined when the session already holds a stream with this id.
      stream = parseAndAddStreamToSession(content, session);
    } else {
      // Connection not known yet: park the raw payload and log the ordering problem.
      unconnectedStreams[streamId] = content;

      const payload = {
        debug: 'eventOrderError -- streamCreated event before connectionCreated',
        streamId,
        sourceStreamId: getMediaModeBySourceStreamId(sourceStreamId),
      };
      session.logEvent('streamCreated', 'warning', payload);
    }

    if (stream) {
      // If the parsed stream exposes a publisher, hand it the stream.
      if (stream.publisher) {
        stream.publisher.setStream(stream);
      }

      dispatcher.triggerCallback(transactionId, null, stream);
    } else if (session.sessionInfo.isAdaptiveEnabled && sourceStreamId === 'P2P') {
      // No stream was created here (it was already in the session, or it was parked above).
      // A P2P stream was created in an adaptive session, so let's request a new subscriber for it
      sessionObjects.subscribers.where({ streamId }).forEach((subscriber) => {
        subscriber._.startRoutedToRelayedTransition();
      });
    }
  });

  // Handles a deleted stream. In an adaptive session, losing the P2P source
  // does not destroy the stream: the matching subscribers start the
  // relayed-to-routed transition instead. Otherwise the stream, if still
  // known, is destroyed with the given reason.
  dispatcher.on('stream#deleted', (streamId, reason, content) => {
    const deletedStream = session.streams.get(streamId);
    const cameFromP2p = content.sourceStreamId === 'P2P';
    const isAdaptiveP2pStream = session.sessionInfo.isAdaptiveEnabled && cameFromP2p;

    if (isAdaptiveP2pStream) {
      // In adaptive sessions, when the P2P stream is destroyed we need to start the transition
      // from relayed to routed.
      sessionObjects.subscribers.where({ streamId }).forEach((subscriber) => {
        subscriber._.startRelayedToRoutedTransition();
      });
      return;
    }

    // A missing stream means it has been previously deleted; nothing to do.
    if (deletedStream) {
      deletedStream.destroy(reason);
    }
  });

  // Applies the updated properties to a known stream; unknown stream ids are
  // logged as errors.
  dispatcher.on('stream#updated', (streamId, content) => {
    const target = session.streams.get(streamId);

    if (target) {
      target._.update(content);
      return;
    }

    logging.error(
      `A stream does not exist with the id of ${streamId}, for stream#updated message!`
    );
    // @todo error
  });

  // Handles a `stream#update` request (a different event from `stream#updated`).
  // Only the 'mute' reason is acted on, and the stream is force-muted only when
  // `content.active` is truthy; an inactive mute update is ignored.
  dispatcher.on('stream#update', (streamId, content) => {
    const stream = session.streams.get(streamId);

    if (!stream) {
      // The message names the event handled here, so logs are not confused
      // with the `stream#updated` handler.
      logging.error(`A stream does not exist with the id of ${
        streamId}, for stream#update message!`);
      // @todo error
      return;
    }

    // Read the payload only once the stream is known to exist.
    const { reason, active } = content;

    if (reason !== 'mute') {
      logging.debug('stream#update that handle this reason is not currently implemented');
      return;
    }

    if (active) {
      stream._.forceMute(content);
    }
  });

  // Applies a channel update to the stream that owns the channel.
  dispatcher.on('streamChannel#updated', (streamId, channelId, content) => {
    // In the P2P leg of AMR-enabled sessions, messages are received from both P2P and Mantis.
    // We only want to dispatch one message, see VIDCS-35.
    const isDuplicateP2pMessage =
      session.sessionInfo.isAdaptiveEnabled && content.sourceStreamId === 'P2P';
    if (isDuplicateP2pMessage) {
      return;
    }

    // Only look the stream up when a stream id was actually supplied.
    const stream = streamId ? session.streams.get(streamId) : undefined;
    if (!stream) {
      logging.error('Unable to determine streamId, or the stream does not ' +
        'exist, for streamChannel message!');
      // @todo error
      return;
    }

    stream._.updateChannel(channelId, content);
  });

  // Dispatch JSEP messages
  //
  // generateoffer:
  // Request to generate an offer for another peer (or Prism). This kicks
  // off the JSEP process.
  //
  // answer:
  // A response to another peer's offer; it contains our constraints
  // and requirements.
  //
  // pranswer:
  // A provisional answer, i.e. not the final one.
  //
  // candidate:
  // Delivered to every matching subscriber and publisher (presumably an
  // ICE candidate - confirm).
  //

  /**
   * Routes one JSEP message to the publisher/subscriber objects that should
   * process it.
   *
   * Subscribers are matched by `message.params.subscriber` (as widget id) when
   * it is present and by stream id otherwise; publishers are always matched by
   * stream id. Unknown methods are logged and ignored, and nothing happens when
   * no recipient matches.
   *
   * @param {string} method - JSEP method name (bound per event at registration).
   * @param {string} streamId - Stream the message refers to.
   * @param {string} fromAddress - Address of the sender.
   * @param {Object} message - Raw message; `message.params` must be defined.
   */
  const jsepHandler = (method, streamId, fromAddress, message) => {
    const byStreamId = { streamId };
    const { subscribers, publishers } = sessionObjects;

    // if we don't know the subscriber, we will just match the stream id
    const subscriberFilter = message.params.subscriber
      ? { widgetId: message.params.subscriber }
      : byStreamId;

    // Determine which subscriber/publisher objects should receive this message.
    let recipients;
    if (method === 'offer') {
      // First match only, subscribers taking precedence over publishers.
      recipients = []
        .concat(subscribers.where(subscriberFilter), publishers.where(byStreamId))
        .slice(0, 1);
    } else if (method === 'answer' || method === 'pranswer') {
      // First match only, publishers taking precedence over subscribers.
      recipients = []
        .concat(publishers.where(byStreamId), subscribers.where(subscriberFilter))
        .slice(0, 1);
    } else if (method === 'generateoffer' || method === 'unsubscribe') {
      recipients = publishers.where(byStreamId);
    } else if (method === 'candidate') {
      recipients = [].concat(subscribers.where(subscriberFilter), publishers.where(byStreamId));
    } else {
      logging.debug(`jsep#${method} is not currently implemented`);
      return;
    }

    if (recipients.length === 0) {
      return;
    }

    // Senders whose address starts with "symphony." get a connection created
    // on the fly when the session does not know them yet.
    const knownSender = session.connections.get(fromAddress);
    if (!knownSender && fromAddress.match(/^symphony\./)) {
      const symphonyConnection = Connection.fromHash({
        id: fromAddress,
        creationTime: Math.floor(now()),
      });

      session.connections.add(symphonyConnection);
    }

    recipients.forEach(async (recipient) => {
      try {
        // processMessage in subscriber is async, we need to await to catch
        // any errors that may be thrown by processMessage
        await recipient.processMessage(method, fromAddress, message);
      } catch (err) {
        logging.error('Error occurred during processMessage', err);
      }
    });
  };

  // Every jsep#<method> event shares jsepHandler; the method name is bound as
  // its first argument, so the event's own arguments follow as
  // (streamId, fromAddress, message).
  dispatcher.on('jsep#offer', jsepHandler.bind(null, 'offer'));
  dispatcher.on('jsep#answer', jsepHandler.bind(null, 'answer'));
  dispatcher.on('jsep#pranswer', jsepHandler.bind(null, 'pranswer'));
  dispatcher.on('jsep#generateoffer', jsepHandler.bind(null, 'generateoffer'));
  dispatcher.on('jsep#unsubscribe', jsepHandler.bind(null, 'unsubscribe'));
  dispatcher.on('jsep#candidate', jsepHandler.bind(null, 'candidate'));

  // Applies a subscriber channel update to the stream it belongs to; a missing
  // or unknown stream id is logged as an error.
  dispatcher.on('subscriberChannel#updated', (streamId, channelId, content) => {
    const isKnownStream = Boolean(streamId) && session.streams.has(streamId);

    if (isKnownStream) {
      const stream = session.streams.get(streamId);
      stream._.updateChannel(channelId, content);
      return;
    }

    logging.error('Unable to determine streamId, or the stream does not ' +
      'exist, for subscriberChannel#updated message!');
    // @todo error
  });

  // Passes `content.active` to the subscriber's disableVideo. Both the stream
  // and the subscriber must be known, otherwise the message is logged and
  // dropped.
  dispatcher.on('subscriberChannel#update', (subscriberId, streamId, content) => {
    const isKnownStream = Boolean(streamId) && session.streams.has(streamId);
    if (!isKnownStream) {
      logging.error('Unable to determine streamId, or the stream does not ' +
        'exist, for subscriberChannel#update message!');
      // @todo error
      return;
    }

    // Hint to update for congestion control from the Media Server
    const { subscribers } = sessionObjects;
    if (!subscribers.has(subscriberId)) {
      logging.error('Unable to determine subscriberId, or the subscriber ' +
        'does not exist, for subscriberChannel#update message!');
      // @todo error
      return;
    }

    // We assume that an update on a Subscriber channel is to disableVideo
    // we may need to be more specific in the future
    const subscriber = subscribers.get(subscriberId);
    subscriber.disableVideo(content.active);
  });

  // Note: subscriber#created and subscriber#deleted messages are available but we currently
  // don't have a use for them.

  // Dispatches an incoming signal, or queues it until it can be dispatched:
  //  - from our own connection: dispatched once the session state has been
  //    received, queued under `sessionConnected` before that;
  //  - from a known connection: dispatched immediately;
  //  - from the empty address: dispatched with a null connection;
  //  - from an unknown connection: queued until that connection is created.
  dispatcher.on('signal', (fromAddress, content) => {
    if (connectionEventsSuppressed) {
      // With connection events suppressed, the sender's connection comes from
      // the signal itself; when it is missing a bare one is built from the
      // address and the fact is logged.
      let senderConnection = content.connection || content.fromConnection;
      const hasUsableConnection = senderConnection != null && senderConnection.id != null;

      if (!hasUsableConnection) {
        senderConnection = constructBareConnection(fromAddress);
        session.logEvent('SessionDispatcher:Signal', 'Event', {
          fromAddress,
          connection: content.connection,
          fromConnection: content.fromConnection,
          info: 'Signal did not contain a connection object. One has been constructed',
          constructedConnection: senderConnection,
        });
      }

      addConnection(senderConnection);
    }

    const { type: signalType, data } = content;
    const fromConnection = session.connections.get(fromAddress);
    const isOwnSignal = session.connection && fromAddress === session.connection.connectionId;

    if (isOwnSignal) {
      if (!sessionStateReceived) {
        delayedSessionEvents.enqueue('sessionConnected',
          'signal', fromAddress, signalType, data);
        return;
      }
      session._.dispatchSignal(fromConnection, signalType, data);
      return;
    }

    if (session.connections.get(fromAddress)) {
      session._.dispatchSignal(fromConnection, signalType, data);
      return;
    }

    if (fromAddress === '') { // Server originated signal
      session._.dispatchSignal(null, signalType, data);
      return;
    }

    delayedSessionEvents.enqueue(`connectionCreated${fromAddress}`,
      'signal', fromAddress, signalType, data);
  });

  // Captions are forwarded to the session untouched, together with the sender's address.
  dispatcher.on('caption', (fromAddress, content) => {
    session._.dispatchCaption(fromAddress, content);
  });

  // Add the new archive to the session (a no-op when its id is already known).
  dispatcher.on('archive#created', (archive) => {
    parseAndAddArchiveToSession(archive, session);
  });

  // Applies the update to a known archive; unknown archive ids are logged as
  // errors.
  dispatcher.on('archive#updated', (archiveId, update) => {
    const target = session.archives.get(archiveId);

    if (target) {
      target._.update(update);
      return;
    }

    logging.error(
      `An archive does not exist with the id of ${archiveId}, for archive#updated message!`
    );
    // @todo error
  });

  // On `source#create`, log the request and ask every publisher of the stream
  // to start the routed-to-relayed transition.
  dispatcher.on('source#create', (sourceStreamId, streamId, reason) => {
    const eventPayload = {
      sourceStreamId: getMediaModeBySourceStreamId(sourceStreamId),
      streamId,
      reason,
    };
    session.logEvent('SessionDispatcher:source#create', 'Event', eventPayload);

    logging.debug(
      `Received a request from RUMOR to start a transition to ${sourceStreamId} ` +
      `for the stream ID ${streamId} with the reason: ${reason}`
    );

    const streamPublishers = sessionObjects.publishers.where({ streamId });
    streamPublishers.forEach((publisher) => {
      publisher._.startRoutedToRelayedTransition();
    });
  });

  // On `source#delete`, log the request and ask every publisher of the stream
  // to start the relayed-to-routed transition.
  dispatcher.on('source#delete', (sourceStreamId, streamId, reason) => {
    const eventPayload = {
      sourceStreamId: getMediaModeBySourceStreamId(sourceStreamId),
      streamId,
      reason,
    };
    session.logEvent('SessionDispatcher:source#delete', 'Event', eventPayload);

    logging.debug(
      'Received a request from RUMOR to start a transition from P2P to MANTIS ' +
      `for the stream ID ${streamId} with the reason: ${reason}`
    );

    const streamPublishers = sessionObjects.publishers.where({ streamId });
    streamPublishers.forEach((publisher) => {
      publisher._.startRelayedToRoutedTransition();
    });
  });

  return dispatcher;
}
