Source: index.js

/**
 * @copyright Copyright 2016 Kevin Locke <kevin@kevinlocke.name>
 * @license MIT
 */

'use strict';

const { EventEmitter } = require('events');
const util = require('util');

const debug = util.debuglog('stream-compare');

/** Comparison type.
 * @enum {string}
 * @private
 */
const CompareType = {
  /** A full (non-incremental) comparison. */
  checkpoint: 'checkpoint',
  /** An incremental comparison. */
  incremental: 'incremental',
  /** A full comparison followed by <code>'end'</code>. */
  last: 'last',
};

/** Defines the available read policies.
 * @enum {string}
 */
const ReadPolicy = {
  /** Reads are done concurrently using <code>'data'</code> events. */
  flowing: 'flowing',
  /** Reads from the stream which has output the least data, measured in
   * bytes/chars for non-<code>objectMode</code> or values for
   * <code>objectMode</code>. */
  least: 'least',
  /** No reads are done.  When using this readPolicy, be sure to either add
   * <code>'data'</code> to events, add other <code>'data'</code> listeners,
   * <code>.read()</code> the data elsewhere, or call <code>.resume()</code> on
   * the streams so that the data will be read and <code>'end'</code> can be
   * reached. */
  none: 'none',
};

/** Default option values.
 * @const
 * @private
 */
const DEFAULT_OPTIONS = {
  abortOnError: false,
  delay: 0,
  endEvents: ['end', 'error'],
  // Observe Readable events other than 'data' by default
  events: ['close', 'end', 'error'],
  objectMode: false,
  /** @type {!ReadPolicy} */
  readPolicy: 'least',
};

/** Caller-visible stream state for comparison.
 *
 * Guarantees/Invariants:
 *
 * <ul>
 * <li>Equivalent states are {@link assert.deepStrictEqual}.</li>
 * <li>States can be round-tripped to JSON at any point.</li>
 * <li>States are owned by the caller, so any additional properties (which are
 *   permitted to violate the above guarantees) are preserved and the same
 *   state object is always returned.</li>
 * </ul>
 *
 * <p>As a result, objects of this class have no methods and do not contain any
 * non-state information (e.g. the stream itself or the comparison options)
 * and their prototype is never used.</p>
 *
 * @constructor
 */
function StreamState() {
  /** Has the stream emitted <code>'end'</code> or <code>'error'</code>. */
  this.ended = false;
  /** Events emitted by the stream.
   * @type !Array.<!{name: string, args: !Array}> */
  this.events = [];
  /** Are more events expected on this stream?
   *
   * Initially true, currently set false once an event in options.endEvents has
   * been emitted and no additional events have been emitted since the event
   * queue was last cleared (i.e. after setImmediate).
   */
  this.expectEvents = true;
  /** Data returned/emitted by the stream (as an <code>Array</code> if in
   * <code>objectMode</code>).
   * @type Array|Buffer|string */
  this.data = undefined;
  /** Count of total objects read in <code>objectMode</code>, bytes/chars read
   * otherwise. */
  this.totalDataLen = 0;
}

/** Options for {@link streamCompare}.
 *
 * @ template CompareResult
 * @typedef {{
 *   abortOnError: boolean|undefined,
 *   compare: ((function(!StreamState,!StreamState): CompareResult)|undefined),
 *   delay: number|undefined,
 *   endEvents: Array<string>|undefined,
 *   events: Array<string>|undefined,
 *   incremental:
 *     ((function(!StreamState,!StreamState): CompareResult)|undefined),
 *   objectMode: boolean|undefined,
 *   readPolicy: ReadPolicy|undefined
 * }} StreamCompareOptions
 * @property {boolean=} abortOnError Abort comparison and return error emitted
 * by either stream.  (default: <code>false</code>)
 * @property {function(!StreamState,!StreamState)=} compare Comparison function
 * which will be called with a StreamState object for each stream, after both
 * streams have ended.  The value returned by this function will resolve the
 * returned promise and be passed to the callback as its second argument.  A
 * value thrown by this function will reject the promise and be passed to the
 * callback as its first argument.  This function is required if incremental is
 * not specified.
 * @property {number=} delay Delay (in ms) after both streams have emitted
 * their last expected event before comparing. (default: <code>0</code>)
 * @property {Array<string>=} endEvents Names of events which signal the end of
 * a stream.  Final compare is performed once both streams have emitted an end
 * event.  (default: <code>['end', 'error']</code>)
 * @property {Array<string>=} events Names of events to compare.
 * (default: <code>['close', 'end', 'error']</code>)
 * @property {function(!StreamState,!StreamState)=} incremental Incremental
 * comparison function which will be called periodically with a StreamState
 * object for each stream.  This function may modify the StreamState objects to
 * remove data not required for later comparisons (e.g. common output) and may
 * perform the comparison before the streams have ended (e.g. due to early
 * differences).  Any non-null, non-undefined value returned by this function
 * will finish the comparison, resolve the returned promise, and be passed to
 * the callback as its second argument. A value thrown by this function will
 * finish the comparison, reject the promise and be passed to the callback as
 * its first argument.  If compare is not specified, this function will also be
 * called for the final comparison.
 * @property {boolean=} objectMode Collect values read into an Array.  This
 * allows comparison of read values without concatenation and comparison of
 * non-string/Buffer types.
 * @property {ReadPolicy=} readPolicy Scheduling discipline for reads from th
 * streams.  (default: <code>'least'</code>)
 */
// var StreamCompareOptions;

/** Promise returned by {@link streamCompare}.
 *
 * @ template CompareResult
 * @constructor
 * @name StreamComparePromise
 * @extends Promise<CompareResult>
 */
// var StreamComparePromise;

/** Compares the output of two Readable streams.
 *
 * @ template CompareResult
 * @param {!stream.Readable} stream1 First stream to compare.
 * @param {!stream.Readable} stream2 Second stream to compare.
 * @param {!StreamCompareOptions<CompareResult>|
 * function(!StreamState,!StreamState): CompareResult}
 * optionsOrCompare Options, or a comparison function (as described in
 * {@link options.compare}).
 * @return {StreamComparePromise<CompareResult>} A <code>Promise</code> with
 * the comparison result or error.
 */
function streamCompare(stream1, stream2, optionsOrCompare) {
  let options;
  if (optionsOrCompare) {
    if (typeof optionsOrCompare === 'function') {
      options = { compare: optionsOrCompare };
    } else if (typeof optionsOrCompare === 'object') {
      options = optionsOrCompare;
    } else {
      throw new TypeError('optionsOrCompare must be an object or function');
    }
  }

  options = { ...DEFAULT_OPTIONS, ...options };
  if (!options.compare) {
    options.compare = options.incremental;
  }

  // Can change this to duck typing if there are non-EventEmitter streams
  if (!(stream1 instanceof EventEmitter)) {
    throw new TypeError('stream1 must be an EventEmitter');
  }
  // Can change this to duck typing if there are non-EventEmitter streams
  if (!(stream2 instanceof EventEmitter)) {
    throw new TypeError('stream2 must be an EventEmitter');
  }
  if (options.readPolicy === 'least'
      && (typeof stream1.read !== 'function'
       || typeof stream2.read !== 'function')) {
    throw new TypeError('streams must have .read() for readPolicy \'least\'');
  }
  if (typeof options.compare !== 'function') {
    throw new TypeError('options.compare must be a function');
  }
  if (!options.endEvents
      || typeof options.endEvents !== 'object'
      || options.endEvents.length !== Math.floor(options.endEvents.length)) {
    throw new TypeError('options.endEvents must be Array-like');
  }
  options.endEvents = Array.prototype.slice.call(options.endEvents);
  if (!options.events
      || typeof options.events !== 'object'
      || options.events.length !== Math.floor(options.events.length)) {
    throw new TypeError('options.events must be Array-like');
  }
  options.events = Array.prototype.slice.call(options.events);
  if (options.incremental && typeof options.incremental !== 'function') {
    throw new TypeError('options.incremental must be a function');
  }
  if (typeof options.readPolicy !== 'string') {
    throw new TypeError('options.readPolicy must be a string');
  }
  if (!hasOwnProperty.call(ReadPolicy, options.readPolicy)) {
    throw new RangeError(`Invalid options.readPolicy '${
      options.readPolicy}'`);
  }

  let reject;
  let resolve;
  // eslint-disable-next-line promise/param-names
  const promise = new Promise((resolveArg, rejectArg) => {
    resolve = resolveArg;
    reject = rejectArg;
  });
  const state1 = new StreamState();
  const state2 = new StreamState();
  let isDone = false;
  const listeners1 = {};
  const listeners2 = {};
  let lastEventImmediate1;
  let lastEventImmediate2;
  let postEndTimeout;

  /** Gets the name of a stream for logging purposes.
   * @private
   */
  function streamName(stream) {
    return stream === stream1 ? 'stream1'
      : stream === stream2 ? 'stream2'
        : 'unknown stream';
  }

  function done(err, result) {
    isDone = true;

    debug('Unregistering stream event listeners...');

    /* eslint-disable no-use-before-define */

    Object.keys(listeners1).forEach((eventName) => {
      stream1.removeListener(eventName, listeners1[eventName]);
    });
    stream1.removeListener('readable', readNext);
    stream1.removeListener('error', onStreamError);
    stream1.removeListener('end', readNextOnEnd);
    options.endEvents.forEach((eventName) => {
      stream1.removeListener(eventName, endListener1);
    });

    Object.keys(listeners2).forEach((eventName) => {
      stream2.removeListener(eventName, listeners2[eventName]);
    });
    stream2.removeListener('readable', readNext);
    stream2.removeListener('error', onStreamError);
    stream2.removeListener('end', readNextOnEnd);
    options.endEvents.forEach((eventName) => {
      stream2.removeListener(eventName, endListener2);
    });

    /* eslint-enable no-use-before-define */

    clearImmediate(lastEventImmediate1);
    clearImmediate(lastEventImmediate2);
    clearTimeout(postEndTimeout);

    debug('Comparison finished.');
  }

  function onStreamError(err) {
    debug(`${streamName(this)} emitted error`, err);
    reject(err);
    done();
  }

  function doCompare(compareFn, type) {
    debug('Performing %s compare.', type);

    let hasResultOrError = false;
    try {
      const result = compareFn(state1, state2);
      if (result !== undefined && result !== null) {
        debug('Comparison produced a result:', result);
        hasResultOrError = true;
        resolve(result);
      }
    } catch (err) {
      debug('Comparison produced an error:', err);
      hasResultOrError = true;
      reject(err);
    }

    if (hasResultOrError) {
      done();
      return true;
    } if (type === CompareType.last) {
      resolve();
      done();
      return true;
    }

    return false;
  }

  /** Compares the states of the two streams non-incrementally.
   * @function
   * @name StreamComparePromise#checkpoint
   */
  promise.checkpoint = function checkpoint() {
    if (isDone) {
      debug('Ignoring checkpoint() after settling.');
      return;
    }

    doCompare(options.compare, CompareType.checkpoint);
  };

  /** Compares the states of the two streams non-incrementally then ends the
   * comparison whether or not compare produced a result or error.
   * @function
   * @name StreamComparePromise#end
   */
  promise.end = function end() {
    if (isDone) {
      debug('Ignoring end() after settling.');
      return;
    }

    doCompare(options.compare, CompareType.last);
  };

  function lastEventListener(stream, state) {
    debug(`Not expecting more events from ${streamName(stream)}.`);

    state.expectEvents = false;

    if (options.incremental) {
      if (doCompare(options.incremental, CompareType.incremental)) {
        return;
      }
    }

    if (!state1.expectEvents && !state2.expectEvents) {
      const postEventsCompare =
        () => doCompare(options.compare, CompareType.last);
      if (options.delay) {
        debug(`All streams have ended.  Delaying for ${options.delay
        }ms before final compare.`);
        postEndTimeout = setTimeout(postEventsCompare, options.delay);
      } else {
        postEventsCompare();
      }
    }
  }

  function anyEventListener1() {
    // If waiting for the last event on this stream, move to end of queue.
    if (lastEventImmediate1) {
      clearImmediate(lastEventImmediate1);
      lastEventImmediate1 = setImmediate(lastEventListener, stream1, state1);
    }
  }

  function anyEventListener2() {
    // If waiting for the last event on this stream, move to end of queue.
    if (lastEventImmediate2) {
      clearImmediate(lastEventImmediate2);
      lastEventImmediate2 = setImmediate(lastEventListener, stream2, state2);
    }
  }

  // Note:  Add event listeners before endListeners so end/error is recorded
  options.events.forEach((eventName) => {
    if (listeners1[eventName]) {
      return;
    }

    if (options.abortOnError && eventName === 'error') {
      // Error event is always immediately fatal.
      return;
    }

    function listener(...args) {
      this.events.push({
        name: eventName,
        args: Array.prototype.slice.call(args),
      });

      if (options.incremental) {
        doCompare(options.incremental, CompareType.incremental);
      }
    }

    function listener1(...args) {
      debug(`'${eventName}' event from stream1.`);
      listener.apply(state1, args);
      anyEventListener1();
    }
    listeners1[eventName] = listener1;
    stream1.on(eventName, listener1);

    function listener2(...args) {
      debug(`'${eventName}' event from stream2.`);
      listener.apply(state2, args);
      anyEventListener2();
    }
    listeners2[eventName] = listener2;
    stream2.on(eventName, listener2);
  });

  /** Handles stream end events.
   * @this {!Readable}
   * @private
   */
  function endListener(state) {
    // Note:  If incremental is conclusive for 'end' event, this will be called
    // with isDone === true, since removeListener doesn't affect listeners for
    // an event which is already in-progress.
    if (state.ended || isDone) {
      return;
    }

    state.ended = true;

    debug(`${streamName(this)} has ended.`);

    if (options.incremental) {
      if (doCompare(options.incremental, CompareType.incremental)) {
        return;
      }
    }

    if (state === state1) {
      lastEventImmediate1 = setImmediate(lastEventListener, this, state);
    } else {
      lastEventImmediate2 = setImmediate(lastEventListener, this, state);
    }
  }

  function endListener1() {
    anyEventListener1();
    endListener.call(this, state1);
  }
  function endListener2() {
    anyEventListener2();
    endListener.call(this, state2);
  }
  options.endEvents.forEach((eventName) => {
    if (!options.abortOnError || eventName !== 'error') {
      stream1.on(eventName, endListener1);
      stream2.on(eventName, endListener2);
    }
  });

  if (options.abortOnError) {
    stream1.once('error', onStreamError);
    stream2.once('error', onStreamError);
  }

  /** Adds data to a stream state.
   *
   * This function should be a method of StreamState, but that would violate
   * our guarantees.  We call it as if it were to convey this behavior and to
   * avoid ESLint no-param-reassign.
   *
   * @this {!StreamState}
   * @param {*} data Data read from the stream for this StreamState.
   * @private
   */
  function addData(data) {
    if (options.objectMode) {
      if (!this.data) {
        this.data = [data];
      } else {
        this.data.push(data);
      }
      this.totalDataLen += 1;
    } else if (typeof data !== 'string' && !(data instanceof Buffer)) {
      throw new TypeError(`expected string or Buffer, got ${
        Object.prototype.toString.call(data)}.  Need objectMode?`);
    } else if (this.data === null || this.data === undefined) {
      this.data = data;
      this.totalDataLen += data.length;
    } else if (typeof this.data === 'string' && typeof data === 'string') {
      // perf:  Avoid unnecessary string concatenation
      if (this.data.length === 0) {
        this.data = data;
      } else if (data.length > 0) {
        this.data += data;
      }
      this.totalDataLen += data.length;
    } else if (this.data instanceof Buffer && data instanceof Buffer) {
      // perf:  Avoid unnecessary Buffer concatenation
      if (this.data.length === 0) {
        this.data = data;
      } else if (data.length > 0) {
        // FIXME:  Potential performance issue if data or this.data are large.
        // Should append to a Buffer we control and store a slice in .data
        this.data = Buffer.concat(
          [this.data, data],
          this.data.length + data.length,
        );
      }
      this.totalDataLen += data.length;
    } else {
      throw new TypeError(`read returned ${
        Object.prototype.toString.call(data)}, previously ${
        Object.prototype.toString.call(this.data)
      }.  Need objectMode?`);
    }
  }

  /** Handles data read from the stream for a given state.
   * @private
   */
  function handleData(state, data) {
    debug('Read data from ', streamName(this));

    try {
      addData.call(state, data);
    } catch (err) {
      debug(`Error adding data from ${streamName(this)}`, err);
      reject(err);
      done();
      return;
    }

    if (options.incremental) {
      doCompare(options.incremental, CompareType.incremental);
    }
  }

  /** Reads from the non-ended stream which has the smallest totalDataLen.
   * @private
   */
  function readNext() {
    let stream, state;

    while (!isDone) {
      if (!state1.ended
          && (state2.ended || state1.totalDataLen <= state2.totalDataLen)) {
        stream = stream1;
        state = state1;
      } else if (!state2.ended) {
        stream = stream2;
        state = state2;
      } else {
        debug('All streams have ended.  No further reads.');
        return;
      }

      const data = stream.read();
      if (data === null) {
        debug(`Waiting for ${streamName(stream)} to be readable...`);
        stream.once('readable', readNext);
        return;
      }

      handleData.call(stream, state, data);
    }
  }

  /** Reads data when an 'end' event occurs.
   *
   * If 'end' occurs on the stream for which readNext is waiting for
   * 'readable', that event will never occur and it needs to start reading
   * from the other stream.
   *
   * @private
   */
  function readNextOnEnd() {
    // Remove pending 'readable' listener.
    // This is primarily for the case where readNext was listening for
    // 'readable' from the stream which _did_not_ emit 'end', which would
    // cause readNext to be listening twice when .read() returns null.
    // It also handles the case where a broken stream implementation emits
    // 'readable' after 'end'.
    stream1.removeListener('readable', readNext);
    stream2.removeListener('readable', readNext);
    return readNext.call(this);
  }

  switch (options.readPolicy) {
    case 'flowing':
      debug('Will read from streams in flowing mode.');
      stream1.on('data', handleData.bind(stream1, state1));
      stream2.on('data', handleData.bind(stream2, state2));
      break;

    case 'least':
      debug('Will read from stream with least output.');
      stream1.once('end', readNextOnEnd);
      stream2.once('end', readNextOnEnd);
      process.nextTick(readNext);
      break;

    default:
      debug('Not reading from streams.');
      break;
  }

  return promise;
}

streamCompare.makeIncremental = require('./lib/make-incremental');

module.exports = streamCompare;