import { OperatorFunction, ObservableInput } from '../types';
import { operate } from '../util/lift';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { innerFrom } from '../observable/innerFrom';
/**
* Compares all values of two observables in sequence using an optional comparator function
* and returns an observable of a single boolean value representing whether or not the two sequences
* are equal.
*
* Checks to see of all values emitted by both observables are equal, in order.
*
* ![](sequenceEqual.png)
*
* `sequenceEqual` subscribes to source observable and `compareTo` `ObservableInput` (that internally
* gets converted to an observable) and buffers incoming values from each observable. Whenever either
* observable emits a value, the value is buffered and the buffers are shifted and compared from the bottom
* up; If any value pair doesn't match, the returned observable will emit `false` and complete. If one of the
* observables completes, the operator will wait for the other observable to complete; If the other
* observable emits before completing, the returned observable will emit `false` and complete. If one observable never
* completes or emits after the other completes, the returned observable will never complete.
*
* ## Example
*
* Figure out if the Konami code matches
*
* ```ts
* import { from, fromEvent, map, bufferCount, mergeMap, sequenceEqual } from 'rxjs';
*
* const codes = from([
* 'ArrowUp',
* 'ArrowUp',
* 'ArrowDown',
* 'ArrowDown',
* 'ArrowLeft',
* 'ArrowRight',
* 'ArrowLeft',
* 'ArrowRight',
* 'KeyB',
* 'KeyA',
* 'Enter', // no start key, clearly.
* ]);
*
* const keys = fromEvent(document, 'keyup').pipe(map(e => e.code));
* const matches = keys.pipe(
* bufferCount(11, 1),
* mergeMap(last11 => from(last11).pipe(sequenceEqual(codes)))
* );
* matches.subscribe(matched => console.log('Successful cheat at Contra? ', matched));
* ```
*
* @see {@link combineLatest}
* @see {@link zip}
* @see {@link withLatestFrom}
*
* @param compareTo The `ObservableInput` sequence to compare the source sequence to.
* @param comparator An optional function to compare each value pair.
*
* @return A function that returns an Observable that emits a single boolean
* value representing whether or not the values emitted by the source
* Observable and provided `ObservableInput` were equal in sequence.
*/
export function sequenceEqual(
compareTo: ObservableInput,
comparator: (a: T, b: T) => boolean = (a, b) => a === b
): OperatorFunction {
return operate((source, subscriber) => {
// The state for the source observable
const aState = createState();
// The state for the compareTo observable;
const bState = createState();
/** A utility to emit and complete */
const emit = (isEqual: boolean) => {
subscriber.next(isEqual);
subscriber.complete();
};
/**
* Creates a subscriber that subscribes to one of the sources, and compares its collected
* state -- `selfState` -- to the other source's collected state -- `otherState`. This
* is used for both streams.
*/
const createSubscriber = (selfState: SequenceState, otherState: SequenceState) => {
const sequenceEqualSubscriber = createOperatorSubscriber(
subscriber,
(a: T) => {
const { buffer, complete } = otherState;
if (buffer.length === 0) {
// If there's no values in the other buffer
// and the other stream is complete, we know
// this isn't a match, because we got one more value.
// Otherwise, we push onto our buffer, so when the other
// stream emits, it can pull this value off our buffer and check it
// at the appropriate time.
complete ? emit(false) : selfState.buffer.push(a);
} else {
// If the other stream *does* have values in its buffer,
// pull the oldest one off so we can compare it to what we
// just got. If it wasn't a match, emit `false` and complete.
!comparator(a, buffer.shift()!) && emit(false);
}
},
() => {
// Or observable completed
selfState.complete = true;
const { complete, buffer } = otherState;
// If the other observable is also complete, and there's
// still stuff left in their buffer, it doesn't match, if their
// buffer is empty, then it does match. This is because we can't
// possibly get more values here anymore.
complete && emit(buffer.length === 0);
// Be sure to clean up our stream as soon as possible if we can.
sequenceEqualSubscriber?.unsubscribe();
}
);
return sequenceEqualSubscriber;
};
// Subscribe to each source.
source.subscribe(createSubscriber(aState, bState));
innerFrom(compareTo).subscribe(createSubscriber(bState, aState));
});
}
/**
* A simple structure for the data used to test each sequence
*/
interface SequenceState {
/** A temporary store for arrived values before they are checked */
buffer: T[];
/** Whether or not the sequence source has completed. */
complete: boolean;
}
/**
* Creates a simple structure that is used to represent
* data used to test each sequence.
*/
function createState(): SequenceState {
return {
buffer: [],
complete: false,
};
}