Source: index.js

/**
 * @copyright Copyright 2016-2017 Kevin Locke <kevin@kevinlocke.name>
 * @license MIT
 */

'use strict';

const util = require('util');

const AbortError = require('./lib/abort-error');
const EOFError = require('./lib/eof-error');
const SyncPromise = require('./lib/sync-promise');
const TimeoutError = require('./lib/timeout-error');

const debug = util.debuglog('promised-read');

/** Attempts to unshift result data down to a desired length.
 * @param {stream.Readable} stream Stream into which to unshift data.
 * @param {!Buffer|string|!Array} result Read result data.
 * @param {number} desiredLength Desired length of result after unshifting.
 * @param {boolean=} emptySlice Return an empty slice when all data is
 * unshifted, rather than <code>null</code>.
 * @return {Buffer|string|Array} Result data after unshifting, or
 * <code>null</code> if all data was unshifted and <code>emptySlice</code> is
 * falsey.
 * @private
 */
function tryUnshift(stream, result, desiredLength, emptySlice) {
  if (typeof stream.unshift !== 'function') {
    debug('Unable to unshift, stream does not have an unshift method.');
    return result;
  }

  const errorListeners = stream.listeners('error');
  stream.removeAllListeners('error');

  // Note:  Don't rely on the EventEmitter throwing on 'error' without
  // listeners, since it may be thrown in the stream's attached domain.
  let unshiftErr;
  function onUnshiftError(err) { unshiftErr = err; }
  stream.on('error', onUnshiftError);

  let resultLength = result.length;
  try {
    if (Array.isArray(result)) {
      while (resultLength > desiredLength && !unshiftErr) {
        stream.unshift(result[resultLength - 1]);
        if (!unshiftErr) {
          resultLength -= 1;
        }
      }
    } else {
      stream.unshift(result.slice(desiredLength));
      if (!unshiftErr) {
        resultLength = desiredLength;
      }
    }
  } catch (err) {
    unshiftErr = err;
  }

  if (unshiftErr) {
    debug('Unable to unshift data: ', unshiftErr);
  }

  stream.removeListener('error', onUnshiftError);
  errorListeners.forEach((errorListener) => {
    stream.on('error', errorListener);
  });

  return resultLength === 0 && !emptySlice ? null
    : resultLength < result.length ? result.slice(0, resultLength)
      : result;
}

/** Options for {@link read}, {@link readTo}, and {@link readUntil}.
 *
 * @typedef {{
 *   Promise: function(new:Promise)|undefined,
 *   cancellable: boolean|undefined,
 *   flowing: boolean|undefined,
 *   objectMode: boolean|undefined,
 *   timeout: number|undefined
 * }} ReadOptions
 * @property {function(new:Promise)=} Promise Promise type to return.  Default
 * for non-flowing streams is the global <code>Promise</code> type, when
 * available, or another Promises/A+ and ES6 compliant promise type.  Default
 * for flowing streams is a promise type which complies with Promises/A+ and
 * ES6 with the exception that it calls <code>onResolved</code> and
 * <code>onRejected</code> synchronously.
 * @property {boolean=} cancellable Provide <code>abortRead</code> and
 * <code>cancelRead</code> methods on the returned Promise which allow the
 * caller to abort or cancel an pending read.  This has no effect on
 * cancellation support provided by the Promise library, if any.  See
 * {@link CancellableReadPromise} for details.
 * @property {boolean=} flowing Assume that the stream is in flowing mode and
 * read data using <code>'data'</code> events.  This is the default for streams
 * without a <code>.read()</code> method.
 * @property {boolean=} objectMode Treat the stream as if it were created with
 * the <code>objectMode</code> option, meaning read results are returned in an
 * Array and are never combined.  (This is always true for non-string,
 * non-Buffer reads.)
 * @property {number=} timeout Cause the read to timeout after a given number
 * of milliseconds.  The promise will be rejected with a {@link TimeoutError}
 * which will have a <code>.read</code> property with any previously read
 * values.
 */
// var ReadOptions;

/** Promise type returned by {@link read}, {@link readTo}, and
 * {@link readUntil} for {@link ReadOptions.cancellable cancellable} reads.
 *
 * <p>The returned promise will be an instance of {@link ReadOptions.Promise},
 * if set, which has the additional methods defined for this type.</p>
 *
 * <p>Note that the method names were chosen to avoid conflict with the
 * existing promise cancellation methods under consideration (e.g.
 * <code>abort</code>, and <code>cancel</code>) since they may gain defined
 * semantics which differ from the methods described here.</p>
 *
 * <p>Any promises chained from the returned promise (e.g. returned from
 * calling <code>.then()</code>) will not share the methods defined on this
 * class, which prevents abort/cancel authority from being unintentionally
 * conveyed to other consumers of the read data or its dependencies.</p>
 *
 * @ template ReturnType
 * @constructor
 * @extends Promise<ReturnType>
 * @name CancellableReadPromise
 */
// function CancellableReadPromise() {}

function readInternal(stream, size, until, options) {
  const flowing =
    (options && options.flowing) || typeof stream.read !== 'function';
  let numSize =
    size === null || Number.isNaN(Number(size)) ? undefined : Number(size);
  let objectMode = Boolean(options && options.objectMode);
  const timeout = options && options.timeout;
  const ReadPromise = (options && options.Promise)
    || (flowing ? SyncPromise : Promise);

  let abortRead;
  let cancelRead;

  const promise = new ReadPromise((resolve, reject, cancelled) => {
    let isDoneReading = false;
    let result = null;
    let timeoutID;
    function doneReading() {
      if (isDoneReading) { return; }
      isDoneReading = true;
      /* eslint-disable no-use-before-define */
      stream.removeListener('data', onData);
      stream.removeListener('end', onEnd);
      stream.removeListener('error', doReject);
      stream.removeListener('readable', readPending);
      /* eslint-enable no-use-before-define */
      if (timeoutID) { clearTimeout(timeoutID); }
    }
    function doReject(err, unshiftResult) {
      doneReading();

      if (unshiftResult && result !== null && result.length > 0) {
        result = tryUnshift(stream, result, 0);
      }

      if (result !== null) {
        if (typeof err === 'object' && err !== null) {
          // If we have read some data, include it on the error so the caller
          // can use the partial result and to avoid losing data.
          err.read = result;
        } else {
          debug('Unable to set .read on non-object reject cause.  '
            + 'Discarding data.');
        }
      }
      reject(err);
    }
    function doResolve() {
      doneReading();
      resolve(result);
    }

    /** Aborts a pending read operation, causing the Promise to be rejected.
     *
     * If the read operation is not currently pending, this does nothing.
     *
     * @function
     * @name CancellableReadPromise#abortRead
     * @see ReadOptions.cancellable
     */
    abortRead = function() {
      if (isDoneReading) { return; }
      doReject(new AbortError('read aborted'), true);
    };

    /** Cancels a pending read operation, causing the Promise never to be
     * resolved or rejected.
     *
     * If the read operation is not currently pending, this does nothing.
     *
     * @function
     * @name CancellableReadPromise#cancelRead
     * @return {Buffer|string|Array} Any previously read data which could not
     * be unshifted, or <code>null</code> if all data was unshifted.
     * @see ReadOptions.cancellable
     */
    cancelRead = function() {
      if (isDoneReading) { return null; }
      // Note:  Must stop reading before unshifting to avoid emitting a
      // 'readable' event and immediately re-reading unshifted data.
      doneReading();
      if (result && result.length > 0) {
        result = tryUnshift(stream, result, 0);
      }
      return result;
    };

    // bluebird 3.x supports cancellation (when explicitly enabled by the
    // user).  It does not provide a way to query whether it is enabled
    // (AFAICT).  The third argument could be notify or something else.
    // Check for cancel method and config function to add certainty.
    // TODO:  Find a more reliable check.
    if (typeof cancelled === 'function'
        && typeof ReadPromise.prototype.cancel === 'function'
        && typeof ReadPromise.config === 'function') {
      cancelled(cancelRead);
    }

    stream.once('error', doReject);

    if (timeout !== undefined && timeout !== null) {
      timeoutID = setTimeout(() => {
        doReject(new TimeoutError(), true);
      }, timeout);
    }

    /** Calls the until function and handles its result.
     * @return {boolean} <code>true</code> if done reading, <code>false</code>
     * otherwise.
     * @private
     */
    function checkUntil(resultWithData, data, ended) {
      let desiredLength;
      try {
        desiredLength = until(resultWithData, data, ended);
      } catch (errUntil) {
        doReject(errUntil, true);
        return true;
      }

      const resultLength = result ? result.length : 0;
      if (typeof desiredLength === 'number') {
        if (desiredLength > resultLength) {
          debug(
            'until returned a desired length of %d.  '
              + 'Only have %d.  Reading up to %d.',
            desiredLength, resultLength, desiredLength,
          );
          numSize = desiredLength;
          size = desiredLength - resultLength;
        } else if (desiredLength >= 0) {
          debug(
            'until returned a desired length of %d out of %d',
            desiredLength, resultLength,
          );
          if (desiredLength < resultLength) {
            if (ended) {
              debug('Unable to unshift:  Can not unshift after end.');
            } else {
              result = tryUnshift(stream, result, desiredLength, true);
            }
          }
          doResolve();
          return true;
        } else {
          debug('until returned %d, continuing to read', desiredLength);
        }
      } else if (desiredLength === true) {
        debug('until returned true, read finished.');
        doResolve();
        return true;
      } else if (desiredLength !== undefined
          && desiredLength !== null
          && desiredLength !== false) {
        // Note:  Although this could be allowed, it causes an Error so that
        // future versions may add behavior for these values without causing
        // breakage.
        doReject(
          new TypeError(
            `non-numeric, non-boolean until() result: ${desiredLength}`,
          ),
          true,
        );
      } else {
        debug('until returned %s, continuing to read', desiredLength);
      }

      return false;
    }

    function onEnd() {
      if (until) {
        if (!checkUntil(result, null, true)) {
          doReject(new EOFError());
        }
      } else {
        doResolve();
      }
    }
    stream.once('end', onEnd);

    // Although reading stream internals is distasteful, it is less distasteful
    // than waiting endlessly for data that will never come because the caller
    // was not careful about handling the 'end' event.
    /* eslint-disable no-underscore-dangle */
    if (stream
        && stream._readableState
        && stream._readableState.endEmitted) {
      debug('Error:  stream has ended!  Calling read after end is unreliable!');
      onEnd();
      return;
    }
    /* eslint-enable no-underscore-dangle */

    let resultBuf;
    function onData(data) {
      if (result === null) {
        objectMode = objectMode
          || (typeof data !== 'string' && !(data instanceof Buffer));
        result = objectMode ? [data] : data;
      } else if (typeof result === 'string' && typeof data === 'string') {
        result += data;
      } else if (result instanceof Buffer && data instanceof Buffer) {
        // To avoid copying result on every read, make result a slice of
        // resultBuf which grows geometrically as necessary.
        const newResultSize = data.length + result.length;
        if (!resultBuf || newResultSize > resultBuf.length) {
          let newResultBufSize = resultBuf ? resultBuf.length : 128;
          while (newResultBufSize < newResultSize) {
            // Growth factor is a time/space tradeoff.  3/2 seems reasonable.
            // https://github.com/facebook/folly/blob/master/folly/docs/FBVector.md#memory-handling
            // Use right-shift for division to avoid unnecessary float+round
            // eslint-disable-next-line no-bitwise
            newResultBufSize = (newResultBufSize * 3) >>> 1;
          }
          resultBuf = Buffer.allocUnsafe
            ? Buffer.allocUnsafe(newResultBufSize)
            : Buffer.from(newResultBufSize);
          result.copy(resultBuf);
        }
        data.copy(resultBuf, result.length);
        result = resultBuf.slice(0, newResultSize);
      } else if (Array.isArray(result)) {
        result.push(data);
      // result/data mismatch, must be in objectMode.  result becomes an Array.
      // This case is really ugly.  Well-behaved streams shouldn't use it.
      } else {
        objectMode = true;
        result = [result, data];
      }

      // When doing a single read in objectMode, behave like stream.read(size):
      // ignore size and don't wrap the result in an Array.
      if (objectMode && !until) {
        // Since resolving to null indicates EOF, we do not permit returning
        // null if it was emitted by a 'data' event.  If there is a use-case
        // for this, I'd be open to adding an option to allow it.
        // Note:  Users can use readUntil(stream, untilTrue) to get [null].
        if (result[0] === null) {
          result.shift();
          return;
        }
        if (result.length > 1) {
          result = tryUnshift(stream, result, 1);
        }
        if (result.length === 1) {
          [result] = result;
        }
        doResolve();
        return;
      }

      // If we know the target read size, check if it has been reached
      if (numSize) {
        if (result.length >= numSize) {
          if (result.length > numSize) {
            result = tryUnshift(stream, result, numSize);
          }
          doResolve();
        }
        return;
      }

      if (!until) {
        doResolve();
        return;
      }

      checkUntil(result, data, false);
    }

    function readPending() {
      while (!isDoneReading) {
        const data = stream.read(size);
        if (data === null) {
          if (!isDoneReading) {
            stream.once('readable', readPending);
          }
          return;
        }

        onData(data);
      }
    }

    if (numSize !== undefined && numSize <= 0) {
      // .read(size) will always return null.  Do it once, then done.
      if (typeof stream.read === 'function') {
        result = stream.read(size);
      }
      doResolve();
      return;
    }

    if (!flowing) {
      readPending();
    } else {
      stream.on('data', onData);
    }
  });

  if (options && (options.cancellable || options.cancelable)) {
    promise.abortRead = abortRead;
    promise.cancelRead = cancelRead;
  }

  return promise;
}

/** Reads from a stream.Readable.
 * @param {stream.Readable} stream Stream from which to read.
 * @param {number=} size Number of bytes to read.  If <code>stream.read</code>
 * is a function, <code>size</code> is passed to it, guaranteeing maximum
 * result size.  Otherwise, <code>'data'</code> events will be consumed until
 * <code>size</code> bytes are read, making it a minimum rather than an exact
 * value.
 * @param {ReadOptions=} options Options.
 * @return {Promise<Buffer|string|*>|CancellableReadPromise<Buffer|string|*>}
 * Promise with result of read or Error.  Result may be shorter than
 * <code>size</code> if <code>'end'</code> occurs and will be <code>null</code>
 * if no data can be read.  If an error occurs after reading some data, the
 * <code>.read</code> property of the error object will contain the partial
 * read result.  The promise is resolved synchronously for streams in flowing
 * mode (see README.md for details).
 */
function read(stream, size, options) {
  if (!options && typeof size === 'object') {
    options = size;
    size = null;
  }
  return readInternal(stream, size, undefined, options);
}

/** Reads from a stream.Readable until a given test is satisfied.
 * @param {stream.Readable} stream Stream from which to read.
 * @param {function((!Buffer|string|!Array), (Buffer|string|*)): number|boolean}
 * test Test function called with the data read so far and the most recent
 * chunk read.  If it returns a negative or falsey value, more data will be
 * read.  If it returns a non-negative number and the stream can be unshifted,
 * that many bytes will be returned and the others will be unshifted into the
 * stream.  Otherwise, all data read will be returned.  If it returns a number
 * larger than the length of the data read so far, enough data to reach the
 * requested length will be read before returning.  Non-numeric, non-boolean
 * values will result in an error.
 * @param {ReadOptions=} options Options.
 * @return {Promise<!Buffer|string|!Array>|
 * CancellableReadPromise<!Buffer|string|!Array>} Promise with the data read
 * and not unshifted, or an Error if one occurred.  If <code>'end'</code> is
 * emitted before <code>until</code> returns a non-negative/true value, an
 * {@link EOFError} is returned.  If an error occurs after reading some data,
 * the <code>.read</code> property of the error object will contain the partial
 * read result.  The promise is resolved synchronously for streams in flowing
 * mode (see README.md for details).
 */
function readUntil(stream, until, options) {
  if (typeof until !== 'function') {
    // Note:  Synchronous Yaku emits unhandledRejection before returning.
    // Best current option is to use an async promise, even when flowing
    const ReadPromise = (options && options.Promise) || Promise;
    return ReadPromise.reject(new TypeError('until must be a function'));
  }
  return readInternal(stream, undefined, until, options);
}

/** Reads from a stream.Readable until a given value is found.
 *
 * <p>This function calls {@link readUntil} with an <code>until</code> function
 * which uses <code>.indexOf</code> to search for <code>needle</code>.  When
 * reading Buffers and performance is paramount, consider using
 * {@link readUntil} directly with an optional function for the problem (e.g.
 * {@link
 * https://www.npmjs.com/package/buffer-indexof-fast buffer-indexof-fast} for
 * single-character search).</p>
 *
 * <p>Doc note:  options should be a ReadToOptions type which extends
 * {@link ReadOptions}, but record types can't currently be extended.
 * See {@link https://github.com/google/closure-compiler/issues/604}.</p>
 *
 * @param {stream.Readable} stream Stream from which to read.
 * @param {!Buffer|string|*} needle Value to search for in the read result.
 * The stream will be read until this value is found or <code>'end'</code> or
 * <code>'error'</code> is emitted.
 * @param {ReadOptions=} options Options.  This function additionally supports
 * an <code>endOK</code> option which prevents {@link EOFError} on
 * <code>'end'</code>.
 * @return {Promise<Buffer|string|Array>|
 * CancellableReadPromise<Buffer|string|Array>} Promise with the data read, up
 * to and including <code>needle</code>, or an Error if one occurs.  If
 * <code>stream</code> does not support <code>unshift</code>, the result may
 * include additional data.  If <code>'end'</code> is emitted before
 * <code>needle</code> is found, an {@link EOFError} is returned, unless
 * <code>options.endOK</code> is truthy in which case any remaining data is
 * returned or <code>null</code> if none was read.  If an error occurs after
 * reading some data, the <code>.read</code> property of the error object will
 * contain the partial read result.  The promise is resolved synchronously for
 * streams in flowing mode (see README.md for details).
 */
function readTo(stream, needle, options) {
  const endOK = Boolean(options && (options.endOK || options.endOk));
  let needleForIndexOf;
  let needleLength;
  function until(result, chunk, ended) {
    if (ended) {
      return endOK ? result ? result.length : 0 : -1;
    }

    if (Array.isArray(result)) {
      // objectMode.  Use strict equality, like Array.prototype.indexOf
      return chunk === needle ? result.length : -1;
    }

    // Calculate the length of the needle, as used by indexOf and perform the
    // type conversion done by indexOf once, to avoid converting on every call
    if (needleLength === undefined) {
      if (typeof result === 'string') {
        needleForIndexOf = String(needle);
        needleLength = needleForIndexOf.length;
      } else if (result instanceof Buffer) {
        if (typeof needle === 'number') {
          // buffertools requires a Buffer or string
          // buffer-indexof-polyfill converts number to Buffer on each call
          needleForIndexOf = result.indexOf ? needle
            : Buffer.from ? Buffer.from([needle])
              : Buffer.from([needle]);
          needleLength = 1;
        } else if (typeof needle === 'string') {
          needleForIndexOf = needle;
          needleLength = Buffer.byteLength(needle);
        } else if (needle instanceof Buffer) {
          needleForIndexOf = needle;
          needleLength = needle.length;
        }
      }

      if (needleLength === undefined) {
        throw new TypeError(`Unsupported indexOf argument types: ${
          Object.prototype.toString.call(result)}.indexOf(${
          Object.prototype.toString.call(needle)})`);
      }

      // Buffer.prototype.indexOf returns -1 for 0-length string/Buffer.
      // To be consistent with string, we return 0.
      // Note:  If removing this check, remove + 1 from start calc when 0.
      if (needleLength === 0) {
        return 0;
      }
    }

    const start =
      Math.max((result.length - chunk.length - needleLength) + 1, 0);
    const needleIndex = result.indexOf(needleForIndexOf, start);
    if (needleIndex < 0) {
      return -1;
    }

    return needleIndex + needleLength;
  }
  return readInternal(stream, undefined, until, options);
}

// "until" function which returns true once ended
function untilEnded(result, chunk, ended) {
  return ended;
}

/** Reads from a stream.Readable until 'end' is emitted.
 * @param {stream.Readable} stream Stream from which to read.
 * @param {ReadOptions=} options Options.
 * @return {Promise<!Buffer|string|!Array>|
 * CancellableReadPromise<!Buffer|string|!Array>} Promise with the data read,
 * <code>null</code> if no data was read, or an <code>Error</code> if one
 * occurred.  If an error occurs after reading some data, the
 * <code>.read</code> property of the error object will contain the partial
 * read result.  The promise is resolved synchronously for streams in flowing
 * mode (see README.md for details).
 */
function readToEnd(stream, options) {
  return readInternal(stream, undefined, untilEnded, options);
}

/** Reads from a stream.Readable until a given expression is matched.
 *
 * <p>This function calls {@link readUntil} with an <code>until</code> function
 * which applies <code>regexp</code> to the data read.</p>
 *
 * <p>Doc note:  options should be a ReadToMatchOptions type which extends
 * ReadToOptions, but record types can't currently be extended.
 * See {@link https://github.com/google/closure-compiler/issues/604}.</p>
 *
 * @param {stream.Readable<string>} stream Stream from which to read.  This
 * stream must produce strings (so call <code>.setEncoding</code> if necessary).
 * @param {!RegExp|string} regexp Expression to find in the read result.
 * The stream will be read until this value is matched or <code>'end'</code> or
 * <code>'error'</code> is emitted.
 * @param {ReadOptions=} options Options.  This function additionally supports
 * an <code>endOK</code> option which prevents {@link EOFError} on
 * <code>'end'</code> and a <code>maxMatchLen</code> option which specifies
 * the maximum length of a match, which allow additional search optimizations.
 * @return {Promise<string>|CancellableReadPromise<string>} Promise with the
 * data read, up to and including the data matched by <code>regexp</code>, or
 * an Error if one occurs.  If <code>stream</code> does not support
 * <code>unshift</code>, the result may include additional data.  If
 * <code>'end'</code> is emitted before <code>regexp</code> is matched, an
 * {@link EOFError} is returned, unless <code>options.endOK</code> is truthy in
 * which case any remaining data is returned or <code>null</code> if none was
 * read.  If an error occurs after reading some data, the <code>.read</code>
 * property of the error object will contain the partial read result.  The
 * promise is resolved synchronously for streams in flowing mode (see README.md
 * for details).
 */
function readToMatch(stream, regexp, options) {
  const endOK = Boolean(options && (options.endOK || options.endOk));
  const maxMatchLen = Number(options && options.maxMatchLen);
  // Convert to RegExp where necessary, like String.prototype.match
  // Make sure RegExp has global flag so lastIndex will be set
  if (!(regexp instanceof RegExp)) {
    try {
      regexp = new RegExp(regexp, 'g');
    } catch (errRegExp) {
      // Note:  Synchronous Yaku emits unhandledRejection before returning.
      // Best current option is to use an async promise, even when flowing
      const ReadPromise = (options && options.Promise) || Promise;
      return ReadPromise.reject(errRegExp);
    }
  } else if (!regexp.global) {
    regexp = new RegExp(regexp.source, `${regexp.flags || ''}g`);
  }
  function until(result, chunk, ended) {
    if (ended) {
      return endOK ? result ? result.length : 0 : -1;
    }

    if (typeof result !== 'string') {
      throw new TypeError('readToMatch requires a string stream'
          + ' (use constructor options.encoding or .setEncoding method)');
    }

    regexp.lastIndex = maxMatchLen
      ? Math.max((result.length - chunk.length - maxMatchLen) + 1, 0)
      : 0;
    if (regexp.test(result)) {
      return regexp.lastIndex;
    }

    return -1;
  }
  return readInternal(stream, undefined, until, options);
}

module.exports = {
  AbortError,
  EOFError,
  TimeoutError,
  read,
  readUntil,
  readTo,
  readToEnd,
  readToMatch,
};