import {Observable, Subject, zip} from "rxjs";

/**
 * Executes a specific asynchronous function parallel for the given data stream.
 * Executes as many functions parallel as maximally allowed.
 * Completes after having processed the given data.
 */
export class QueueSubject<T> extends Subject<T[]> {
    maxParallelExecutions: number = 1;

    /**
     * Holds the queued data.
     */
    private queue$$: Subject<T> = new Subject();

    /**
     * Handles the parallel processing of the data.
     */
    private queueProgressor$$: Subject<void> = new Subject();
    /**
     * Triggers the execution of the desired function.
     */
    private executeJob$: Observable<[T, void]> = zip(this.queue$$.asObservable(), this.queueProgressor$$.asObservable());
    private completeCounter: number = 0;

    constructor(protected executeFn: (value: T) => Promise<void>, maxParallelExecutions?: number) {
        super();
        this.maxParallelExecutions = maxParallelExecutions || this.maxParallelExecutions;

        // This subscription is automatically unsubscribed as soon as one of the observables inside zip() complete
        this.executeJob$.subscribe((value) => this.execute(value));

        // Emit as often as the desired count of concurrent downloads
        for (let i: number = 0; i < this.maxParallelExecutions; i++) {
            this.queueProgressor$$.next();
        }
    }

    private async execute(value: [T, void]): Promise<void> {
        // an empty value implies that all downloads have been queued beforehand
        if (value[0] === undefined) {
            // wait until all parallel executions finished and complete the subject
            if (++this.completeCounter >= this.maxParallelExecutions) {
                this.complete();
            }
        } else {
            try {
                await this.executeFn(value[0]);
            } catch(error) {
                // keep on going if one execution fails
                if (error) {
                    console.error(error);
                }
            }
            this.queueProgressor$$.next();
        }
    }

    /**
     * Queues the given data and starts processing the values parallel.
     * The subject will be completed, when all values have been processed!
     *
     * @Override
     */
    next(values: T[]): void {
        for (const value of values) {
            this.queue$$.next(value);
        }

        for (let i: number = 0; i < this.maxParallelExecutions; i++) {
            // Emit empty values to catch up to the count of emitted downloadQueueProgressor events.
            // This ensures that the queue progressor does not complete before the pending requests are finished.
            // This also signalises that the end of the queue is reached.
            this.queue$$.next();
        }
    }

    /**
     * Add the given values to the queue and wait for them to finish.
     */
    async awaitAll(values: T[], abortSubject?: Subject<any>): Promise<T[]> {
        abortSubject.subscribe(this);
        this.next(values);
        return this.toPromise();
    }

    /**
     * Complete the subject instance.
     *
     * @Override
     */
    complete(): void {
        this.queueProgressor$$.complete();
        this.queue$$.complete();
        this.clear();
        super.complete();
    }

    /**
     * Unsubscribe and close the subject instance.
     *
     * @Override
     */
    unsubscribe(): void {
        this.queueProgressor$$.unsubscribe();
        this.queue$$.unsubscribe();
        this.clear();
        super.unsubscribe();
    }

    clear(): void {}
}