import { getProgramArgs } from '../../app/utils/programArgsUtils';

angular.module('signalboost').factory('signalStream', [
    '$log',
    'ComputationManager',
    '$q',
    'featureEnabled',
    'urlOverridesService',
    function ($log, ComputationManager, $q, featureEnabled, urlOverridesService) {
        // Resolution used when jobOpts.resolution is not provided. getStartTime
        // subtracts five times this value from a Date.now() timestamp, so it is
        // presumably expressed in milliseconds.
        const DEFAULT_RESOLUTION = 1000;

        // jobOpts properties that stream() copies into the computation
        // parameters of non-incident jobs. Only truthy values are copied.
        const JOB_OPTION_PROPS = [
            'disabledPublishLabels',
            'disabledDetectLabels',
            'enabledDetectLabels',
            'disableAllMetricPublishes',
            'disableAllEventPublishes',
            'sampleSize',
            'filter',
            'maxDelayMs',
            'computingFor',
            'offsetByMaxDelay',
            'withDerivedMetadata',
            'replaceOnlyFilter',
            'fallbackResolutionMs',
            'immediate',
            'usedByDetectorUI',
            'useCache',
        ];

        // jobOpts properties that streamPreflight() copies into the computation
        // parameters. Only truthy values are copied.
        const PREFLIGHT_JOB_PARAMS = [
            'enabledDetectLabels',
            'disabledDetectLabels',
            'disableAllEventPublishes',
            'maxDelayMs',
            'computingFor',
            'timezone',
        ];

        // jobOpts properties that stream() copies unconditionally (even when
        // undefined) into the computation parameters when jobOpts.incidentId is set.
        const INCIDENT_COMPUTATION_PARAMS = ['incidentId', 'boundaryType', 'selectedIdentifiers'];

        // Maps a jobOpts property name to the computation parameter name it is
        // sent under. Properties without an entry keep their own name.
        const ALIASED_OPTIONS = {
            maxDelayMs: 'maxDelay',
            signalFlowText: 'program',
        };

        // Values of message.type that msgCallback routes to a handler.
        const MESSAGE_TYPES = {
            DATA: 'data',
            METADATA: 'metadata',
            EVENT: 'event',
            CONTROL_MESSAGE: 'control-message',
            MESSAGE: 'message',
        };

        // Values of message.message.messageCode. Exposed on the service; within
        // this file only DETECT_INPUT_CONTEXTS is acted upon.
        const MESSAGES = {
            DETECT_INPUT_CONTEXTS: 'DETECT_INPUT_CONTEXTS',
            JOB_RUNNING_RESOLUTION: 'JOB_RUNNING_RESOLUTION',
        };

        // Values of message.event handled by onControlMessage for messages of
        // type MESSAGE_TYPES.CONTROL_MESSAGE.
        const CONTROL_MESSAGES = {
            STREAM_START: 'STREAM_START',
            JOB_START: 'JOB_START',
            END_OF_CHANNEL: 'END_OF_CHANNEL',
        };

        // Public API of the signalStream service. stream and streamPreflight are
        // function declarations defined further down (hoisted).
        return {
            MESSAGES,
            stream,
            streamPreflight,
        };

        /*
      jobOpts contains both computation parameters (mostly enumerated in
      INCIDENT_COMPUTATION_PARAMS and JOB_OPTION_PROPS) as well as
      callbacks. The following callbacks are invoked at various points of the
      life of a stream:
        MANDATORY CALLBACKS
        - callback: invoked on receipt of data. Unless jobOpts.bulk is set, it
          is invoked once per data point and passed an object containing:
          - timestamp (integer): the message's logicalTimestampMs
          - value
          - tsid (string)
          When jobOpts.bulk is set, it is instead invoked once per data message
          and passed a map of tsid to { timestamp, value } followed by the
          message's logicalTimestampMs.
        - metaDataUpdated: invoked when metadata is received for a time series
          that is not an EventTimeSeries, and passes:
          - metadata (object)
          - tsid (string)

        OPTIONAL CALLBACKS
        - eventCallback: invoked when message type is MESSAGE_TYPES.EVENT and
          passes the event, which contains at least the following fields:
          - wasSimulated (boolean)
          - timestampMs (integer)
          - tsId (string) ...note capitalization
          - metadata (object)
        - onFeedback: invoked if message type is MESSAGE_TYPES.MESSAGE and passes
          syntheticMessageDigest, which is an array containing all of the
          message.message objects that have been passed in previous messages of
          the same type.
        - onStreamError: invoked on stream error, passes the following objects:
          - jobId (string)
          - errorType (string)
          - message (object)
        - streamRequestedCallback: invoked when message type is
          MESSAGE_TYPES.CONTROL_MESSAGE and message.event is
          CONTROL_MESSAGES.STREAM_START and passes:
          - message (object)
        - streamStartCallback: invoked when message type is
          MESSAGE_TYPES.CONTROL_MESSAGE and message.event is
          CONTROL_MESSAGES.JOB_START and passes:
          - jobId (string)
        - streamStopCallback: invoked when message type is
          MESSAGE_TYPES.CONTROL_MESSAGE and message.event is
          CONTROL_MESSAGES.END_OF_CHANNEL
        - timestampAdvanceCallback: invoked when message type is MESSAGE_TYPES.DATA
          and passes:
          - message.logicalTimestampMs (integer)
    */
        /**
         * Starts a regular (non-preflight) streaming computation.
         *
         * Computation parameters are assembled from the incident fields when
         * jobOpts.incidentId is set, otherwise from the truthy job options plus
         * the SignalFlow program text. The time window comes from the absolute
         * global time override when one is active, else from jobOpts.
         *
         * @param {Object} jobOpts computation options and callbacks.
         * @returns {Object|null} the object returned by streamComputation, or
         *   null when neither signalFlowText nor incidentId is provided.
         */
        function stream(jobOpts) {
            const hasProgramOrIncident = Boolean(jobOpts.signalFlowText || jobOpts.incidentId);
            if (!hasProgramOrIncident) {
                $log.warn('No signalflow or incident id');
                return null;
            }

            const targetResolution = jobOpts.resolution || DEFAULT_RESOLUTION;
            const compParams = {};

            if (jobOpts.incidentId) {
                // Incident jobs forward every incident field, even when undefined.
                INCIDENT_COMPUTATION_PARAMS.forEach((property) => {
                    compParams[property] = jobOpts[property];
                });
            } else {
                // Regular jobs forward only the options that carry a truthy value.
                JOB_OPTION_PROPS.filter((prop) => jobOpts[prop]).forEach((prop) => {
                    const paramName = ALIASED_OPTIONS[prop] || prop;
                    compParams[paramName] = jobOpts[prop];
                });

                compParams.program = jobOpts.signalFlowText;
            }

            const globalTime = urlOverridesService.getGlobalTimeAbsolute();
            const absoluteGlobalTime = globalTime
                ? { absoluteStart: globalTime.start, absoluteEnd: globalTime.end }
                : null;

            // Only a non-relative global time overrides the job's own window.
            const isAbsoluteWindow = Boolean(globalTime) && !globalTime.relative;

            let start;
            if (isAbsoluteWindow && globalTime.start) {
                start = globalTime.start;
            } else {
                start = getStartTime(jobOpts, targetResolution);
            }

            // Evaluates to a falsy value (spread as nothing) when the feature is off.
            const programArgs =
                featureEnabled('dashboardTimeWindow') &&
                getProgramArgs(jobOpts, absoluteGlobalTime);

            angular.extend(compParams, {
                start,
                resolution: targetResolution,
                ...programArgs,
            });

            if (isAbsoluteWindow && globalTime.end) {
                compParams.stop = globalTime.end;
            } else if (jobOpts.stopTime) {
                compParams.stop = getStopTime(jobOpts);
            }

            if (jobOpts.timezone) {
                compParams.timezone = jobOpts.timezone;
            }

            return streamComputation(jobOpts, compParams, false);
        }

        /**
         * Starts a preflight streaming computation for a SignalFlow program.
         *
         * The start time is always derived from DEFAULT_RESOLUTION; a stop time
         * is added only when jobOpts.stopTime is truthy. Truthy options listed
         * in PREFLIGHT_JOB_PARAMS are then copied over (renamed through
         * ALIASED_OPTIONS where applicable).
         *
         * @param {Object} jobOpts computation options and callbacks.
         * @returns {Object|null} the object returned by streamComputation, or
         *   null when jobOpts.signalFlowText is missing.
         */
        function streamPreflight(jobOpts) {
            const program = jobOpts.signalFlowText;
            if (!program) {
                $log.warn('No signalflow');
                return null;
            }

            const compParams = {
                program,
                start: getStartTime(jobOpts, DEFAULT_RESOLUTION),
            };

            if (jobOpts.stopTime) {
                compParams.stop = getStopTime(jobOpts);
            }

            PREFLIGHT_JOB_PARAMS.filter((prop) => jobOpts[prop]).forEach((prop) => {
                const paramName = ALIASED_OPTIONS[prop] || prop;
                compParams[paramName] = jobOpts[prop];
            });

            return streamComputation(jobOpts, compParams, true);
        }

        function streamComputation(jobOpts, compParams, isPreflight) {
            // Exposed externally in returned streamObj
            const metaDataMap = {};
            const etsMetaDataMap = {};

            // Private variables
            const targetResolution = jobOpts.resolution || DEFAULT_RESOLUTION;
            const syntheticMessageDigest = [];
            let computation = null;
            let stopped = false;
            let latestTimeStampSeen = null;
            let messageBuffer = [];
            let processMessages = true;

            const computationPreFetchPromise = $q.when().then(function () {
                computation = ComputationManager.streamComputation(
                    compParams,
                    isPreflight,
                    msgCallback,
                    msgErrback
                );
            });

            const streamObj = {
                resolution: targetResolution,
                stopStream,
                suspendStreaming() {
                    processMessages = false;
                },
                resumeStreaming() {
                    processMessages = true;
                    messageBuffer.forEach(msgCallback);
                    messageBuffer = [];
                },
                getLatestTimeStamp() {
                    return latestTimeStampSeen;
                },
                setJobOpts(_) {
                    jobOpts = _;
                },
                getJobOpts() {
                    return jobOpts;
                },
                uniqueKeyToTSIDMap: {},
                metaDataMap,
                etsMetaDataMap,
            };

            return streamObj;

            //---------------------------------------
            // Public functions
            //---------------------------------------

            /**
             * Stops the underlying computation once it has been created.
             * Calling it again after the first call only logs a warning.
             */
            function stopStream() {
                if (!stopped) {
                    stopped = true;

                    // TODO (kevin): add reason and passthru
                    computationPreFetchPromise.then(function () {
                        return computation.stop();
                    });
                    return;
                }

                $log.warn('Tried to stop a stopped job!');
            }

            //---------------------------------------
            // Private functions
            //---------------------------------------

            /**
             * Records metadata for a regular time series: tags the properties
             * object with its tsid, stores it in metaDataMap, and notifies
             * jobOpts.metaDataUpdated with the properties and the tsId.
             */
            function metaDataCallback(metadata) {
                const { properties, tsId } = metadata;

                // The properties object itself is mutated and shared with the map.
                properties.tsid = tsId;
                metaDataMap[tsId] = properties;

                jobOpts.metaDataUpdated(properties, tsId);
            }

            /**
             * Records metadata for an event time series: tags the properties
             * object with its tsid and stores it in etsMetaDataMap. No job
             * callback is invoked.
             */
            function etsMetaDataCallback(metadata) {
                const { properties, tsId } = metadata;

                properties.tsid = tsId;
                etsMetaDataMap[tsId] = properties;
            }

            /**
             * Forwards a STREAM_START control message to the optional
             * jobOpts.streamRequestedCallback.
             */
            function onStreamRequested(message) {
                if (!jobOpts.streamRequestedCallback) {
                    return;
                }

                jobOpts.streamRequestedCallback(message);
            }

            /**
             * Handles the start of a job: empties metaDataMap in place, invokes
             * the optional jobOpts.streamStartCallback with the job id, and then
             * records the id on streamObj.viewJobId.
             */
            function onStreamStart(jobId) {
                // Delete keys rather than reassigning: the same map object is
                // exposed on streamObj.
                for (const tsid of Object.keys(metaDataMap)) {
                    delete metaDataMap[tsid];
                }

                const hasStartCallback = Boolean(jobOpts.streamStartCallback);
                if (hasStartCallback) {
                    jobOpts.streamStartCallback(jobId);
                }

                streamObj.viewJobId = jobId;
            }

            /**
             * Entry point for every message of the computation. While streaming
             * is suspended the message is buffered; otherwise it is routed to
             * the handler matching message.type. Unknown types are logged.
             */
            function msgCallback(message) {
                if (!processMessages) {
                    messageBuffer.push(message);
                    return;
                }

                switch (message.type) {
                    case MESSAGE_TYPES.DATA:
                        onDataMessage(message);
                        break;
                    case MESSAGE_TYPES.METADATA:
                        onMetadataMessage(message);
                        break;
                    case MESSAGE_TYPES.EVENT:
                        onEventMessage(message);
                        break;
                    case MESSAGE_TYPES.MESSAGE:
                        onMessageMessage(message);
                        break;
                    case MESSAGE_TYPES.CONTROL_MESSAGE:
                        onControlMessage(message);
                        break;
                    default:
                        $log.warn('did not route message type ' + message.type);
                }
            }

            /**
             * Handles a data message: remembers its logical timestamp, reports
             * it through the optional jobOpts.timestampAdvanceCallback, then
             * delivers the points in bulk or one by one depending on
             * jobOpts.bulk.
             */
            function onDataMessage(msg) {
                const logicalTimestamp = msg.logicalTimestampMs;
                latestTimeStampSeen = logicalTimestamp;

                if (jobOpts.timestampAdvanceCallback) {
                    jobOpts.timestampAdvanceCallback(logicalTimestamp);
                }

                if (jobOpts.bulk) {
                    fireBulkCallback(msg);
                    return;
                }

                fireIndividualCallbacks(msg);
            }

            /**
             * Delivers a data message to jobOpts.callback one point at a time.
             *
             * Every point in message.data produces a call with
             * { timestamp, value, tsid }. Afterwards every time series known to
             * metaDataMap that did not report in this message gets a
             * synthesized call with value null, so consumers see an explicit
             * gap for non-reporters.
             *
             * @param {Object} message data message with logicalTimestampMs and
             *   a data array of { tsId, value } points.
             */
            function fireIndividualCallbacks(message) {
                const timestamp = message.logicalTimestampMs;

                // Snapshot the known series before any callback runs.
                const knownTsids = Object.keys(metaDataMap);
                const reportedTsids = new Set();

                message.data.forEach(function (point) {
                    // Object keys are strings, so normalize before comparing.
                    reportedTsids.add(String(point.tsId));

                    jobOpts.callback({
                        timestamp,
                        value: point.value,
                        tsid: point.tsId,
                    });
                });

                // synthesize nulls for non-reporters. The previous code passed
                // the iteratee as a second argument to Object.entries, which
                // ignores it, so no nulls were ever emitted.
                knownTsids.forEach(function (tsid) {
                    if (!reportedTsids.has(tsid)) {
                        jobOpts.callback({
                            timestamp,
                            value: null,
                            tsid,
                        });
                    }
                });
            }

            /**
             * Delivers a data message to jobOpts.callback in a single call.
             *
             * The callback receives a map of tsid to { timestamp, value } and
             * the message's logical timestamp. Every time series known to
             * metaDataMap is present in the map with value null unless the
             * message carries a point for it.
             *
             * @param {Object} msg data message with logicalTimestampMs and a
             *   data array of { tsId, value } points.
             */
            function fireBulkCallback(msg) {
                const timestamp = msg.logicalTimestampMs;
                const messageMap = {};

                Object.keys(metaDataMap).forEach(function (tsid) {
                    messageMap[tsid] = { timestamp, value: null };
                });

                msg.data.forEach(function (point) {
                    // Assign a whole entry instead of writing `.value` on an
                    // existing one: a point whose tsId has no metadata yet used
                    // to throw a TypeError here. It is now passed through, as
                    // it already is in the non-bulk path.
                    messageMap[point.tsId] = { timestamp, value: point.value };
                });

                jobOpts.callback(messageMap, timestamp);
            }

            /**
             * Routes a metadata message to the event-time-series handler when
             * its properties.sf_type is 'EventTimeSeries', and to the regular
             * metadata handler otherwise.
             */
            function onMetadataMessage(message) {
                const isEventTimeSeries = message.properties.sf_type === 'EventTimeSeries';
                const handleMetadata = isEventTimeSeries ? etsMetaDataCallback : metaDataCallback;

                handleMetadata(message);
            }

            /**
             * Forwards an event message to the optional jobOpts.eventCallback.
             */
            function onEventMessage(message) {
                if (!jobOpts.eventCallback) {
                    return;
                }

                jobOpts.eventCallback(message);
            }

            /**
             * Handles an informational message.
             *
             * DETECT_INPUT_CONTEXTS messages are first applied to the metadata.
             * Then, if jobOpts.onFeedback is provided, the message payload is
             * appended to the running digest and the whole digest is passed to
             * onFeedback.
             */
            function onMessageMessage(msg) {
                const payload = msg.message;

                if (payload.messageCode === MESSAGES.DETECT_INPUT_CONTEXTS) {
                    onDetectInputContextsMessage(msg);
                }

                if (!jobOpts.onFeedback) {
                    return;
                }

                const contexts = payload.blockContexts;
                const hasContexts = Boolean(contexts) && contexts.length > 0;

                if (!hasContexts) {
                    syntheticMessageDigest.push(payload);
                } else {
                    // The internal contract is one blockContext per digest
                    // entry, so a message carrying an array of blockContexts is
                    // expanded into one deep copy per context.
                    contexts.forEach((blockContext) => {
                        const perContextCopy = angular.copy(payload);
                        perContextCopy.blockContext = blockContext;
                        delete perContextCopy.blockContexts;
                        syntheticMessageDigest.push(perContextCopy);
                    });
                }

                jobOpts.onFeedback(syntheticMessageDigest);
            }

            /**
             * Applies a DETECT_INPUT_CONTEXTS message to the known metadata.
             *
             * For every input context key, each detector-derived metadata
             * entry whose sf_metric ends with that key is renamed to
             * "<fragment> (<metric without its last dot-separated segment>)".
             * The metadata objects in metaDataMap are mutated in place.
             *
             * @param {Object} msg message whose message.contents.inputContexts
             *   maps a key to a definition carrying a `fragment` string.
             */
            function onDetectInputContextsMessage(msg) {
                angular.forEach(msg.message.contents.inputContexts, function (def, key) {
                    angular.forEach(metaDataMap, function (metadata) {
                        // Use a real suffix test. The previous
                        // `indexOf(key) === length - key.length` check missed
                        // metrics that also contain the key earlier on, and
                        // wrongly matched when the key was absent and exactly
                        // one character longer than the metric (-1 === -1).
                        const isMatch =
                            metadata.sf_detectorDerived && metadata.sf_metric.endsWith(key);

                        if (!isMatch) {
                            return;
                        }

                        // Drop the last dot-separated segment of the metric name.
                        const splitMetric = metadata.sf_metric.split('.');
                        splitMetric.pop();

                        metadata.sf_metric = `${def.fragment} (${splitMetric.join('.')})`;
                    });
                });
            }

            /**
             * Dispatches a control message according to message.event:
             * STREAM_START and JOB_START go to their handlers, END_OF_CHANNEL
             * invokes the optional jobOpts.streamStopCallback. Other events
             * are ignored.
             */
            function onControlMessage(message) {
                switch (message.event) {
                    case CONTROL_MESSAGES.STREAM_START:
                        onStreamRequested(message);
                        break;
                    case CONTROL_MESSAGES.JOB_START:
                        onStreamStart(message.handle);
                        break;
                    case CONTROL_MESSAGES.END_OF_CHANNEL:
                        if (jobOpts.streamStopCallback) {
                            jobOpts.streamStopCallback();
                        }
                        break;
                    default:
                        break;
                }
            }

            /**
             * Error callback of the computation. Reports the failure to the
             * optional jobOpts.onStreamError as (msg.channel, 'jobStart', msg).
             */
            function msgErrback(msg) {
                if (!jobOpts.onStreamError) {
                    return;
                }

                const errorType = 'jobStart';
                jobOpts.onStreamError(msg.channel, errorType, msg);
            }
        }

        /**
         * Computes the start timestamp of a job: the current time minus the
         * absolute value of jobOpts.historyrange, minus five resolution
         * intervals.
         *
         * @param {Object} jobOpts options carrying historyrange.
         * @param {number} targetResolution resolution of the job.
         * @returns {number} start timestamp relative to Date.now().
         */
        function getStartTime(jobOpts, targetResolution) {
            const now = Date.now();
            const historyStart = now - Math.abs(jobOpts.historyrange);

            return historyStart - targetResolution * 5;
        }

        /**
         * Computes the stop timestamp of a job: the current time plus
         * jobOpts.stopTime.
         *
         * @param {Object} jobOpts options carrying stopTime.
         * @returns {number} stop timestamp relative to Date.now().
         */
        function getStopTime(jobOpts) {
            const now = Date.now();

            return now + jobOpts.stopTime;
        }
    },
]);
