import { Observable } from '../Observable'; import { Subject } from '../Subject'; import { Subscription } from '../Subscription'; import { ObservableInput, OperatorFunction } from '../types'; import { operate } from '../util/lift'; import { innerFrom } from '../observable/innerFrom'; import { createOperatorSubscriber } from './OperatorSubscriber'; import { noop } from '../util/noop'; import { arrRemove } from '../util/arrRemove'; /** * Branch out the source Observable values as a nested Observable starting from * an emission from `openings` and ending when the output of `closingSelector` * emits. * * It's like {@link bufferToggle}, but emits a nested * Observable instead of an array. * * ![](windowToggle.png) * * Returns an Observable that emits windows of items it collects from the source * Observable. The output Observable emits windows that contain those items * emitted by the source Observable between the time when the `openings` * Observable emits an item and when the Observable returned by * `closingSelector` emits an item. * * ## Example * * Every other second, emit the click events from the next 500ms * * ```ts * import { fromEvent, interval, windowToggle, EMPTY, mergeAll } from 'rxjs'; * * const clicks = fromEvent(document, 'click'); * const openings = interval(1000); * const result = clicks.pipe( * windowToggle(openings, i => i % 2 ? interval(500) : EMPTY), * mergeAll() * ); * result.subscribe(x => console.log(x)); * ``` * * @see {@link window} * @see {@link windowCount} * @see {@link windowTime} * @see {@link windowWhen} * @see {@link bufferToggle} * * @param {Observable} openings An observable of notifications to start new * windows. * @param {function(value: O): Observable} closingSelector A function that takes * the value emitted by the `openings` observable and returns an Observable, * which, when it emits a next notification, signals that the * associated window should complete. * @return A function that returns an Observable of windows, which in turn are * Observables. */ export function windowToggle( openings: ObservableInput, closingSelector: (openValue: O) => ObservableInput ): OperatorFunction> { return operate((source, subscriber) => { const windows: Subject[] = []; const handleError = (err: any) => { while (0 < windows.length) { windows.shift()!.error(err); } subscriber.error(err); }; innerFrom(openings).subscribe( createOperatorSubscriber( subscriber, (openValue) => { const window = new Subject(); windows.push(window); const closingSubscription = new Subscription(); const closeWindow = () => { arrRemove(windows, window); window.complete(); closingSubscription.unsubscribe(); }; let closingNotifier: Observable; try { closingNotifier = innerFrom(closingSelector(openValue)); } catch (err) { handleError(err); return; } subscriber.next(window.asObservable()); closingSubscription.add(closingNotifier.subscribe(createOperatorSubscriber(subscriber, closeWindow, noop, handleError))); }, noop ) ); // Subscribe to the source to get things started. source.subscribe( createOperatorSubscriber( subscriber, (value: T) => { // Copy the windows array before we emit to // make sure we don't have issues with reentrant code. const windowsCopy = windows.slice(); for (const window of windowsCopy) { window.next(value); } }, () => { // Complete all of our windows before we complete. while (0 < windows.length) { windows.shift()!.complete(); } subscriber.complete(); }, handleError, () => { // Add this finalization so that all window subjects are // disposed of. This way, if a user tries to subscribe // to a window *after* the outer subscription has been unsubscribed, // they will get an error, instead of waiting forever to // see if a value arrives. while (0 < windows.length) { windows.shift()!.unsubscribe(); } } ) ); }); }