import {QueueSubject} from "SHARED_PATH/models/queue-subject.model";
import {Semaphore} from "./semaphore.model";

/**
 * A {@link QueueSubject} whose execute function is additionally gated by a {@link Semaphore}.
 *
 * The semaphore is created with the queue's `maxParallelExecutions`, and every execution
 * requests a share of it whose size is computed from the value via `calcPortion`.
 * NOTE(review): the exact meaning of a "portion" is defined by `Semaphore.getByFn`,
 * which is not visible from this file — confirm against the semaphore implementation.
 */
export class ThrottleQueueSubject<T> extends QueueSubject<T> {
    /** Gate shared by all executions of this queue. */
    private readonly semaphore: Semaphore;

    /**
     * @param executeFn Work to perform for each prepared value.
     * @param calcPortion Computes the semaphore portion requested for a prepared value.
     * @param prepareValueFn Converts the raw queued value into the value handed to `executeFn`.
     * @param maxParallelExecutions Forwarded to the base class; the resulting
     *        `this.maxParallelExecutions` is also used to size the semaphore.
     */
    constructor(executeFn: (value: T) => Promise<void>, private calcPortion: (value: T) => number, prepareValueFn: (initValue: any) => T, maxParallelExecutions?: number) {
        super(executeFn, maxParallelExecutions);

        // Read the value from the base class rather than the raw parameter, which may be undefined.
        this.semaphore = new Semaphore(this.maxParallelExecutions);
        this.initExecuteFn(executeFn, prepareValueFn);
    }

    /**
     * Replace the execute function with a wrapper that acquires the semaphore before
     * running `executeFn`, restricting how many functions are executed in parallel.
     *
     * @param executeFn The original execute function to wrap.
     * @param prepareValueFn Optional conversion applied to the raw value before anything else;
     *        when omitted, the raw value is used as-is.
     */
    private initExecuteFn(executeFn: (value: T) => Promise<void>, prepareValueFn?: (initValue: any) => T): void {
        this.executeFn = async (initValue: any) => {
            // The value is prepared first because the requested portion is computed from it.
            const value: T = prepareValueFn ? await prepareValueFn(initValue) : initValue;
            // `any` is kept because the return type of `getByFn` is not visible here;
            // the resolved value is invoked below to give the acquired share back.
            const release: any = await this.semaphore.getByFn(() => this.calcPortion(value));

            try {
                await executeFn(value);
            } finally {
                // Always give the share back, even when `executeFn` rejects. Otherwise every
                // failed task would permanently shrink the available capacity.
                release();
            }
        };
    }

    /**
     * Clear the queue via the base class, then release everything held by the semaphore.
     *
     * @Override
     */
    clear(): void {
        super.clear();
        this.semaphore.releaseAll();
    }
}