import { noop } from 'lodash'
import MQTT from 'paho-mqtt'
import { eventChannel } from 'redux-saga'
import { cancelled, CancelledEffect, put, take } from 'redux-saga/effects'

import { Action } from '../ActionCreator'
import { aggregateCalls } from '../Aggregator'
import { logger } from '../logging/Logger'

export function onMqttEventChannel<T>(
    ref: MqttRef,
    converter: f.Func1<string, T>,
    aggregateInterval: number,
) {
    const client = new MQTT.Client(ref.url, ref.clientId)
    const connect = () => {
        client.connect({
            reconnect: true,
            onSuccess: () => {
                client.subscribe(ref.topic, { qos: 0 })
            },
        } as any)
    }
    return eventChannel(emit => {
        const aggregator = aggregateCalls(aggregateInterval)
        connect()
        client.onMessageArrived = (m: MQTT.Message) => {
            const v = converter(m.payloadString)
            if (v) {
                aggregator(emit, v)
            }
        }

        client.onConnectionLost = (e: MQTT.MQTTError) => {
            if (e.errorCode) {
                logger.handleError(new Error(e.errorMessage))
                setTimeout(connect, 1000)
            }
        }

        // Returns unsubscribe function
        return () => (client.isConnected() ? client.disconnect() : noop())
    })
}

export function* syncMqttWebsocket<T>(
    ref: MqttRef,
    action: f.Func1<T, Action<any>>,
    aggregateInterval: number,
) {
    const channelRef = onMqttEventChannel(ref, JSON.parse, aggregateInterval)
    try {
        while (true) {
            const value: T = yield take(channelRef)
            yield put(action(value))
        }
    } finally {
        const cancelledValue: CancelledEffect = yield cancelled()
        if (cancelledValue) {
            channelRef.close()
        }
    }
}

export interface MqttRef {
    url: string
    topic: string
    clientId: string
}
