import { Injectable, inject } from "@angular/core";
import { Observable, ReplaySubject, from } from "rxjs";
import { map, share } from "rxjs/operators";

import { TranslateService } from "@ngx-translate/core";
import { KafkaConfig } from "../../../../models/shared";

import { AuthService } from "src/app/services/auth.service";
import { ZenApiService } from "src/app/services/zen-rpc-service";

@Injectable({
    providedIn: "root"
})
export class KafkaConfigsService {
    kafkaConfigs: Observable<KafkaConfig[]>;
    private kafkaConfigs$: ReplaySubject<KafkaConfig[]>;
    private dataStore: {
        configs: KafkaConfig[];
    };

    private zenApi = inject(ZenApiService);
    constructor(private authService: AuthService, private translate: TranslateService) {
        this.reset();

        this.authService.isLoggedIn.subscribe(isLoggedIn => {
            if (!isLoggedIn) this.reset();
        });
    }

    private reset() {
        this.dataStore = {
            configs: []
        };

        this.kafkaConfigs$ = new ReplaySubject(1) as ReplaySubject<KafkaConfig[]>;
        this.kafkaConfigs = this.kafkaConfigs$.asObservable();
    }

    private prepKafkaConfig(item: KafkaConfig) {
        return item;
    }

    private updateStore(kafkaConfig: KafkaConfig, merge: boolean): void {
        this.prepKafkaConfig(kafkaConfig);

        const currentIndex = this.dataStore.configs.findIndex(g => g.id === kafkaConfig.id);
        if (currentIndex === -1) {
            this.dataStore.configs.push(kafkaConfig);
            return;
        } else if (merge) {
            const current = this.dataStore.configs[currentIndex];

            Object.assign(current, kafkaConfig);

            const relationships = [];
            relationships.forEach(overwrite => {
                if (current[overwrite.id] == null) current[overwrite.obj] = null;
            });
        } else {
            this.dataStore.configs[currentIndex] = kafkaConfig;
        }
    }

    refreshKafkaConfigs(): Observable<KafkaConfig[]> {
        const kafkaConfigs = from(this.zenApi.client.kafkaConfigurations.list()).pipe(share());

        kafkaConfigs.subscribe(
            data => {
                const kafkas = data.body.result as unknown as KafkaConfig[];

                this.dataStore.configs.forEach((existing, existingIndex) => {
                    const newIndex = kafkas.findIndex(tp => tp.id === existing.id);
                    if (newIndex === -1) this.dataStore.configs.splice(existingIndex, 1);
                });

                kafkas.forEach(tp => this.updateStore(tp, true));

                this.kafkaConfigs$.next(Object.assign({}, this.dataStore).configs);
            },
            // eslint-disable-next-line no-console
            error => console.log(this.translate.instant("API_ERRORS.COULD_NOT_LOAD_TRANSCODING_PROFILES"), error)
        );

        return kafkaConfigs.pipe(map(r => r.body.result as unknown as KafkaConfig[]));
    }

    async testKafkaConfig(model: KafkaConfig): Promise<{ success: boolean; message: string }> {
        try {
            await this.zenApi.client.kafkaConfigurations.test({ params: { id: model.id } });
            return { success: true, message: "Test message sent successfully" };
        } catch (error) {
            return { success: false, message: error.error.error || "Failed testing Kafka configuration" };
        }
    }

    async createKafkaConfig(
        body: Omit<KafkaConfig, "id" | "created_at" | "updated_at" | "send_stats"> & { password: string }
    ): Promise<KafkaConfig | false> {
        try {
            const result = await this.zenApi.client.kafkaConfigurations.create({ body });
            const kafkaConfig = result.body.result as unknown as KafkaConfig;

            this.updateStore(kafkaConfig, false);

            this.kafkaConfigs$.next(Object.assign({}, this.dataStore).configs);
            return kafkaConfig;
        } catch (error) {
            return false;
        }
    }

    async updateKafkaConfig(
        id: number,
        body: Omit<KafkaConfig, "id" | "created_at" | "updated_at" | "send_stats">
    ): Promise<KafkaConfig | false> {
        try {
            const result = await this.zenApi.client.kafkaConfigurations.update({ params: { id }, body });
            const kafkaConfig = result.body.result as unknown as KafkaConfig;

            this.updateStore(kafkaConfig, false);

            this.kafkaConfigs$.next(Object.assign({}, this.dataStore).configs);
            return kafkaConfig;
        } catch (error) {
            return false;
        }
    }

    async deleteKafkaConfig({ id }: KafkaConfig): Promise<boolean> {
        try {
            const result = await this.zenApi.client.kafkaConfigurations.destroy({ params: { id } });
            const deletedId = result.body.result;

            const index = this.dataStore.configs.findIndex(tp => tp.id === deletedId);
            if (index !== -1) this.dataStore.configs.splice(index, 1);

            this.kafkaConfigs$.next(Object.assign({}, this.dataStore).configs);

            return true;
        } catch (error) {
            return false;
        }
    }
}
