import { buffers, END, eventChannel, Task } from 'redux-saga'
import {
    all,
    call,
    cancel,
    cancelled,
    CancelledEffect,
    delay,
    fork,
    put,
    take,
} from 'redux-saga/effects'

import { Action } from '../ActionCreator'
import { debounceAndAggregateCalls, throttleCalls } from '../Aggregator'
import { loadFirebaseValue, snapshotToValue, snapshotToValueList } from './Firebase'
import { CollectionSyncActionCreators, FirebaseEntity } from './FirebaseCollectionReducer'

/**
 * Wraps a Firebase query listener in a redux-saga event channel.
 *
 * Each snapshot delivered for `event` is run through `converter` and the result
 * is emitted on the channel, unless the converter returned `null`/`undefined`.
 * The first `itemsToSkip` listener invocations are counted but never emitted.
 *
 * When Firebase reports a listener error, the error is logged and the channel
 * is terminated with `END`. Closing the channel detaches the listener.
 *
 * @param ref Query the listener is attached to.
 * @param converter Maps a snapshot to the value to emit; `null` means "emit nothing".
 * @param event Firebase event type to subscribe to.
 * @param itemsToSkip Number of leading events to ignore.
 * @returns An event channel backed by a dropping buffer of size 10.
 */
export function onEventChannel<T>(
    ref: firebase.database.Query,
    converter: f.Func1<firebase.database.DataSnapshot, T | null>,
    event: firebase.database.EventType,
    itemsToSkip: number,
) {
    let processedItems = 0
    return eventChannel(emit => {
        const callback = ref.on(
            event,
            dataSnapshot => {
                if (dataSnapshot && processedItems >= itemsToSkip) {
                    const v = converter(dataSnapshot)
                    // Only null/undefined mean "nothing to emit". Falsy payloads such as
                    // 0, false or '' are legitimate values and must reach the consumer.
                    if (v !== null && v !== undefined) {
                        emit(v)
                    }
                }
                // Counted for every invocation, including skipped and non-emitting ones.
                processedItems += 1
            },
            (error: Error) => {
                // eslint-disable-next-line no-console
                console.error(error)
                // Terminate the channel so that takers do not wait forever.
                emit(END)
            },
        )

        // Returns unsubscribe function
        return () => ref.off(event, callback as () => string | undefined)
    }, buffers.dropping<{} | null>(10))
}

/**
 * Wraps a Firebase query listener in a redux-saga event channel whose emissions
 * are routed through a `throttleCalls(interval)` aggregator.
 *
 * Each snapshot delivered for `event` is run through `converter`; results other
 * than `null`/`undefined` are handed to the aggregator together with the
 * channel's `emit` function.
 *
 * When Firebase reports a listener error, the error is logged and the channel
 * is terminated with `END`. Closing the channel detaches the listener.
 *
 * @param ref Query the listener is attached to.
 * @param converter Maps a snapshot to the value to emit; `null` means "emit nothing".
 * @param event Firebase event type to subscribe to.
 * @param interval Interval passed to `throttleCalls` (presumably milliseconds — confirm in Aggregator).
 * @returns An event channel backed by a dropping buffer of size 10.
 */
export function onEventChannelThrottling<T>(
    ref: firebase.database.Query,
    converter: f.Func1<firebase.database.DataSnapshot, T | null>,
    event: firebase.database.EventType,
    interval: number,
) {
    const aggregator = throttleCalls<number>(interval)
    return eventChannel(emit => {
        const callback = ref.on(
            event,
            dataSnapshot => {
                if (dataSnapshot) {
                    const v = converter(dataSnapshot)
                    // Only null/undefined mean "nothing to emit". A truthiness test would
                    // silently swallow 0, which is a valid value for a numeric stream.
                    if (v !== null && v !== undefined) {
                        // NOTE(review): the cast pins T to number although the function is
                        // generic; confirm whether throttleCalls can be instantiated with T.
                        aggregator(emit, v as number)
                    }
                }
            },
            (error: Error) => {
                // eslint-disable-next-line no-console
                console.error(error)
                // Terminate the channel so that takers do not wait forever.
                emit(END)
            },
        )

        // Returns unsubscribe function
        return () => ref.off(event, callback as () => string | undefined)
    }, buffers.dropping<{} | null>(10))
}

/**
 * Wraps a Firebase query listener in a redux-saga event channel whose emissions
 * are routed through a `debounceAndAggregateCalls(aggregateInterval)` aggregator.
 *
 * Each snapshot delivered for `event` is run through `converter`; results other
 * than `null`/`undefined` are handed to the aggregator together with the
 * channel's `emit` function. The first `itemsToSkip` listener invocations are
 * counted but never forwarded.
 *
 * When Firebase reports a listener error, the error is logged and the channel
 * is terminated with `END`, matching the other channel factories in this file.
 * Closing the channel detaches the listener.
 *
 * @param ref Query the listener is attached to.
 * @param converter Maps a snapshot to the value to forward; `null` means "forward nothing".
 * @param event Firebase event type to subscribe to.
 * @param itemsToSkip Number of leading events to ignore.
 * @param aggregateInterval Interval passed to `debounceAndAggregateCalls`
 *     (presumably milliseconds — confirm in Aggregator).
 * @returns An unbuffered event channel.
 */
export function onEventChannelAggregating<T>(
    ref: firebase.database.Query,
    converter: f.Func1<firebase.database.DataSnapshot, T | null>,
    event: firebase.database.EventType,
    itemsToSkip: number,
    aggregateInterval: number,
) {
    let processedItems = 0
    const aggregator = debounceAndAggregateCalls(aggregateInterval)
    return eventChannel(emit => {
        const callback = ref.on(
            event,
            dataSnapshot => {
                if (dataSnapshot && processedItems >= itemsToSkip) {
                    const v = converter(dataSnapshot)
                    // Only null/undefined mean "nothing to forward".
                    if (v !== null && v !== undefined) {
                        aggregator(emit, v)
                    }
                }
                // Counted for every invocation, including skipped and non-emitting ones.
                processedItems += 1
            },
            (error: Error) => {
                // Without this handler a cancelled listener (e.g. lost read permission)
                // would leave the consuming saga blocked on `take` indefinitely.
                // eslint-disable-next-line no-console
                console.error(error)
                emit(END)
            },
        )

        // Returns unsubscribe function
        return () => ref.off(event, callback as () => string | undefined)
    })
}

/**
 * Loads a collection once, dispatches it via `actions.initialSet`, then keeps
 * the store in sync with `child_added` / `child_removed` / `child_changed`
 * events for `cancellationTime` milliseconds, after which the three listener
 * tasks are cancelled.
 *
 * @param ref Query identifying the collection.
 * @param actions Action creators invoked for the initial set and for each kind of child event.
 * @param bufferTime Aggregation interval handed to `onEventChannelAggregating`.
 * @param cancellationTime Delay in milliseconds before the listener tasks are cancelled.
 */
export function* syncCollectionCancellable<T, U extends FirebaseEntity>(
    ref: firebase.database.Query,
    actions: CollectionSyncActionCreators<T, U>,
    bufferTime: number = 500,
    cancellationTime: number = 900000, // Default cancellation time: 15 minutes (in milliseconds)
) {
    // Builds a saga that forwards everything received for `event` to `actions[callbackKey]`.
    // `itemsToSkip` lets the `child_added` listener ignore the events Firebase replays for
    // children that were already delivered by the initial load.
    function factory(
        callbackKey: keyof CollectionSyncActionCreators<any, any>,
        event: firebase.database.EventType,
        itemsToSkip: number,
    ) {
        return function* () {
            const channelRef = onEventChannelAggregating(
                ref,
                snapshotToValue,
                event,
                itemsToSkip,
                bufferTime,
            )
            try {
                while (true) {
                    const value: T[] = yield take(channelRef)
                    yield put(actions[callbackKey](value))
                }
            } finally {
                // Always detach the Firebase listener, not only on cancellation: an error
                // thrown while dispatching would otherwise leak it. Closing a channel that
                // is already closed is a no-op.
                channelRef.close()
            }
        }
    }

    const items: T[] = yield call(loadFirebaseValue, ref, snapshotToValueList)
    yield put(actions.initialSet(items))

    const addedTask: Task = yield fork(factory('added', 'child_added', items.length))
    const removedTask: Task = yield fork(factory('removed', 'child_removed', 0))
    const changedTask: Task = yield fork(factory('changed', 'child_changed', 0))

    yield delay(cancellationTime)

    yield cancel(addedTask)
    yield cancel(removedTask)
    yield cancel(changedTask)
}

/**
 * Loads a collection once, dispatches it via `actions.initialSet`, then forks
 * three listener sagas that keep the store in sync with `child_added`,
 * `child_removed` and `child_changed` events.
 *
 * @param ref Query identifying the collection.
 * @param actions Action creators invoked for the initial set and for each kind of child event.
 * @param bufferTime Aggregation interval handed to `onEventChannelAggregating`.
 * @returns The forked listener tasks, in the order added / removed / changed.
 */
export function* syncCollection<T, U extends FirebaseEntity>(
    ref: firebase.database.Query,
    actions: CollectionSyncActionCreators<T, U>,
    bufferTime: number = 500,
) {
    // skip n initial elements to prevent added callback for initial listener attachment
    function factory(
        callbackKey: keyof CollectionSyncActionCreators<any, any>,
        event: firebase.database.EventType,
        itemsToSkip: number,
    ) {
        return function* () {
            const channelRef = onEventChannelAggregating(
                ref,
                snapshotToValue,
                event,
                itemsToSkip,
                bufferTime,
            )
            try {
                while (true) {
                    const value: T[] = yield take(channelRef)
                    yield put(actions[callbackKey](value))
                }
            } finally {
                // Always detach the Firebase listener, not only on cancellation: an error
                // thrown while dispatching would otherwise leak it. Closing a channel that
                // is already closed is a no-op.
                channelRef.close()
            }
        }
    }

    const items: T[] = yield call(loadFirebaseValue, ref, snapshotToValueList)
    yield put(actions.initialSet(items))

    // `all` over fork effects resolves to the task descriptors of the forked sagas.
    const tasks: Task[] = yield all([
        fork(factory('added', 'child_added', items.length)),
        fork(factory('removed', 'child_removed', 0)),
        fork(factory('changed', 'child_changed', 0)),
    ])
    return tasks
}

/**
 * Listens to `value` events on `ref` and dispatches `action(value)` for each
 * value received from the channel.
 *
 * @param ref Query to observe.
 * @param action Action creator invoked with every received value.
 * @param dropFirst When true, the first `value` event delivered by the listener is ignored.
 */
export function* syncValue<T>(
    ref: firebase.database.Query,
    action: f.Func1<T, Action<any>>,
    dropFirst: boolean,
) {
    // Skip the first event inside the channel, which counts raw listener invocations.
    // Tracking "first" on received values instead would discard the first real update
    // whenever the initial snapshot produces no emission (converter returned null).
    const channelRef = onEventChannel(ref, snapshotToValue, 'value', dropFirst ? 1 : 0)
    try {
        while (true) {
            const value: T = yield take(channelRef)
            yield put(action(value))
        }
    } finally {
        // Always detach the Firebase listener, not only on cancellation: an error thrown
        // while dispatching would otherwise leak it. Closing a channel that is already
        // closed is a no-op.
        channelRef.close()
    }
}

/**
 * Listens to `value` events on `ref` and runs the generator returned by
 * `callback(value)` for each value received from the channel.
 *
 * @param ref Query to observe.
 * @param callback Generator function executed with every received value.
 * @param dropFirst When true, the first `value` event delivered by the listener is ignored.
 */
export function* syncValueWithCallback<T>(
    ref: firebase.database.Query,
    callback: (value: T) => Generator,
    dropFirst: boolean,
) {
    // Skip the first event inside the channel, which counts raw listener invocations.
    // Tracking "first" on received values instead would discard the first real update
    // whenever the initial snapshot produces no emission (converter returned null).
    const channelRef = onEventChannel(ref, snapshotToValue, 'value', dropFirst ? 1 : 0)
    try {
        while (true) {
            const value: T = yield take(channelRef)
            yield callback(value)
        }
    } finally {
        // Always detach the Firebase listener, not only on cancellation: an error thrown
        // by the callback would otherwise leak it. Closing a channel that is already
        // closed is a no-op.
        channelRef.close()
    }
}

/**
 * Listens to `value` events on `ref` through a throttling channel and
 * dispatches `action(value)` for each value received from it.
 *
 * @param ref Query to observe.
 * @param action Action creator invoked with every received value.
 * @param interval Throttle interval handed to `onEventChannelThrottling`.
 */
export function* syncValueThrottle<T>(
    ref: firebase.database.Query,
    action: f.Func1<T, Action<any>>,
    interval: number,
) {
    const channelRef = onEventChannelThrottling(ref, snapshotToValue, 'value', interval)
    try {
        while (true) {
            const value: T = yield take(channelRef)
            yield put(action(value))
        }
    } finally {
        // Always detach the Firebase listener, not only on cancellation: an error thrown
        // while dispatching would otherwise leak it. Closing a channel that is already
        // closed is a no-op.
        channelRef.close()
    }
}
