import { List, Map, Set } from "immutable";
import { Observable, Subject, Subscription } from "rxjs";
import { bufferTime, filter, map } from "rxjs/operators";
import { isDefined } from "../utils/misc";

export class Listener<T> {
  private streams = Map<string, Listener.Stream<T>>();

  private subject = new Subject<Listener.Event<T>>();

  private observable: Observable<List<Listener.Event<T>>> = this.subject.pipe(
    bufferTime(100),
    map((events) =>
      List(events)
        .groupBy((event) => event.subject)
        .map((eventsForStream): Listener.Event<T> | undefined => eventsForStream.last() as Listener.Event<T>)
        .toList()
        .filter<Listener.Event<T>>(isDefined)
    ),
    filter((events) => events.size !== 0)
  );

  public listen(subject: string, observable: Observable<T>): Subscription {
    const existingStream = this.streams.get(subject);

    // Theoretically, at this point of time stream.subscription should be defined
    if (existingStream && existingStream.subscription) {
      return existingStream.subscription;
    } else {
      // console.log("[Listener] Stream started: ", subject);

      // Registering a stream without subscription to make sure that no events can be lost once we subscribe
      const newStream: Listener.Stream<T> = {
        subject,
        observable,
        subscription: undefined,
        activated: false,
        lastValue: undefined
      };
      this.streams = this.streams.set(subject, newStream);

      const subscription = observable.subscribe((value) => {
        const stream = this.streams.get(subject);
        if (stream) {
          if (stream.activated) {
            this.subject.next({ subject, value });
          } else {
            // console.log("[Listener] Buffered a value for " + subject + " (activation pending)");
          }
          this.streams = this.streams.set(subject, { ...stream, lastValue: value });
        } else {
          console.warn("[Listener] Received an event for unregistered stream " + subject);
        }
      });

      this.streams = this.streams.set(subject, { ...newStream, subscription });

      return subscription;
    }
  }

  public activateSubjects(subjects: Set<string>): void {
    const activated = this.streams
      .deleteAll(this.streams.keySeq().toSet().subtract(subjects))
      .filterNot((stream) => stream.activated)
      .map((stream, subject) => {
        if (stream.lastValue !== undefined) {
          this.subject.next({ subject, value: stream.lastValue });
        }
        return { ...stream, activated: true };
      });
    if (!activated.isEmpty()) {
      // console.log("[Listener] Activated streams: " + activated.keySeq().join(", "));
      this.streams = this.streams.merge(activated);
    }
  }

  public watch(): Observable<List<Listener.Event<T>>> {
    return this.observable;
  }
}

export namespace Listener {
  export type ContextMatcher<Context> = (context: Context) => boolean;

  export interface Stream<T> {
    readonly subject: string;
    readonly observable: Observable<T>;
    readonly subscription: Subscription | undefined;
    readonly activated: boolean;
    readonly lastValue: T | undefined;
  }

  export interface Event<T> {
    readonly subject: string;
    readonly value: T;
  }
}
