import { Subject, merge, Subscription } from 'rxjs';
import { debounceTime, takeUntil, repeat, switchMap, map, distinctUntilChanged, tap } from 'rxjs/operators';

/**
 * Per-key bookkeeping record stored in the registry of InputDebounceManager.
 * Bundles the two subjects that drive one key's notification pipeline with the
 * subscription that keeps that pipeline running.
 */
interface InputDebounceManagerRegistryItem<TValue> {
  /** Handle on the running pipeline, kept so it can be unsubscribed from later. */
  subscription: Subscription;
  /** Emitting here requests a notification before the debounce time has elapsed. */
  forceNotificationSubj: Subject<unknown>;
  /** Subject into which every reported value change for this item is pushed. */
  changesSubj: Subject<TValue>;
}

/**
 * Receives value updates from inputs, identified by key, and republishes them
 * on `change$` once the input has been quiet for `debounceDuration`
 * milliseconds, or immediately when a notification is forced.
 */
export class InputDebounceManager<TKey, TValue> {
  /** The source of changes that consumers may subscribe to via change$ */
  private readonly globalChangesSubj = new Subject<{ key: TKey, value: TValue }>();
  /** Contains all tracked items, one per key that has reported a change. */
  private readonly registry = new Map<TKey, InputDebounceManagerRegistryItem<TValue>>();

  /** Stream of `{ key, value }` notifications for every tracked item. */
  readonly change$ = this.globalChangesSubj.asObservable();

  /**
   * The time (in milliseconds) it will take notifications to reach change$ subscribers.
   * The value is read when a key is first tracked, so changing it afterwards only
   * affects keys that are registered later.
   */
  debounceDuration = 10000;

  /**
   * Unsubscribes from all internal subscriptions and forgets every tracked item.
   * Changes still waiting on their debounce are dropped, not delivered.
   */
  clear(): void {
    this.registry.forEach(item => item.subscription.unsubscribe());
    this.registry.clear();
  }

  /** Forces a notification on all items waiting to notify changes. */
  forceAll(): void {
    // Iterate over a snapshot: a change$ subscriber reacting to a forced
    // notification may call onChange() and add keys to the registry.
    Array.from(this.registry.entries()).forEach(([key, item]) => item.forceNotificationSubj.next(key));
  }

  /**
   * Ignores remaining debounce time and forces change$ to be called for the item with matching key.
   * Does nothing for a key that has never reported a change.
   */
  forceNotification(key: TKey): void {
    // Plain lookup on purpose: an unknown key has no value to publish, so there
    // is no reason to allocate subjects and a subscription for it.
    this.registry.get(key)?.forceNotificationSubj.next(key);
  }

  /**
   * Report that an item with the matching key has been changed with value.
   *
   * @param key identifies the item that changed; it is tracked from the first call on.
   * @param value the new value of the item.
   * @param immediateNotification when truthy, the value is published without waiting for the debounce.
   */
  onChange(key: TKey, value: TValue, immediateNotification?: boolean): void {
    const item = this.getRegistryItem(key);
    // The value must be pushed first so the forced path below has something to emit.
    item.changesSubj.next(value);
    if (immediateNotification) {
      item.forceNotificationSubj.next(key);
    }
  }

  /**
   * Returns the registry item for `key`, creating it and starting its
   * notification pipeline on first use.
   */
  private getRegistryItem(key: TKey): InputDebounceManagerRegistryItem<TValue> {
    const existing = this.registry.get(key);
    if (existing) {
      return existing;
    }

    const changesSubj = new Subject<TValue>();
    const forceNotificationSubj = new Subject<unknown>();

    const change$ = merge(
      // handle debounced changes
      changesSubj.pipe(
        debounceTime(this.debounceDuration),
        takeUntil(forceNotificationSubj), // cancel the pending debounce after a force.
        repeat()  // start listening to debounces again.
      ),
      // handle forced updates: remember the latest value and emit it each time a force arrives.
      changesSubj.pipe(
        switchMap(latest => forceNotificationSubj.pipe(map(() => latest)))
      )
    ).pipe(
      // A force after the debounce already delivered the value would publish it
      // a second time; consecutive identical values are dropped here.
      distinctUntilChanged(),
      tap(value => this.globalChangesSubj.next({ key, value }))
    );

    const subscription = change$.subscribe();
    const item: InputDebounceManagerRegistryItem<TValue> = { changesSubj, forceNotificationSubj, subscription };
    this.registry.set(key, item);
    return item;
  }
}
