import { Observable } from '../Observable'; import { OperatorFunction, ObservableInput } from '../types'; import { Subject } from '../Subject'; import { operate } from '../util/lift'; import { createOperatorSubscriber } from './OperatorSubscriber'; import { noop } from '../util/noop'; import { innerFrom } from '../observable/innerFrom'; /** * Branch out the source Observable values as a nested Observable whenever * `windowBoundaries` emits. * * It's like {@link buffer}, but emits a nested Observable * instead of an array. * * ![](window.png) * * Returns an Observable that emits windows of items it collects from the source * Observable. The output Observable emits connected, non-overlapping * windows. It emits the current window and opens a new one whenever the * `windowBoundaries` emits an item. `windowBoundaries` can be any type that * `ObservableInput` accepts. It internally gets converted to an Observable. * Because each window is an Observable, the output is a higher-order Observable. * * ## Example * * In every window of 1 second each, emit at most 2 click events * * ```ts * import { fromEvent, interval, window, map, take, mergeAll } from 'rxjs'; * * const clicks = fromEvent(document, 'click'); * const sec = interval(1000); * const result = clicks.pipe( * window(sec), * map(win => win.pipe(take(2))), // take at most 2 emissions from each window * mergeAll() // flatten the Observable-of-Observables * ); * result.subscribe(x => console.log(x)); * ``` * * @see {@link windowCount} * @see {@link windowTime} * @see {@link windowToggle} * @see {@link windowWhen} * @see {@link buffer} * * @param windowBoundaries An `ObservableInput` that completes the * previous window and starts a new window. * @return A function that returns an Observable of windows, which are * Observables emitting values of the source Observable. */ export function window(windowBoundaries: ObservableInput): OperatorFunction> { return operate((source, subscriber) => { let windowSubject: Subject = new Subject(); subscriber.next(windowSubject.asObservable()); const errorHandler = (err: any) => { windowSubject.error(err); subscriber.error(err); }; // Subscribe to our source source.subscribe( createOperatorSubscriber( subscriber, (value) => windowSubject?.next(value), () => { windowSubject.complete(); subscriber.complete(); }, errorHandler ) ); // Subscribe to the window boundaries. innerFrom(windowBoundaries).subscribe( createOperatorSubscriber( subscriber, () => { windowSubject.complete(); subscriber.next((windowSubject = new Subject())); }, noop, errorHandler ) ); return () => { // Unsubscribing the subject ensures that anyone who has captured // a reference to this window that tries to use it after it can // no longer get values from the source will get an ObjectUnsubscribedError. windowSubject?.unsubscribe(); windowSubject = null!; }; }); }