300 lines
8.1 KiB
JavaScript
300 lines
8.1 KiB
JavaScript
|
|
||
|
/**
 * Module dependencies.
 */
|
||
|
|
||
|
var assert = require('assert');
|
||
|
var debug = require('debug')('stream-parser');
|
||
|
|
||
|
/**
 * Module exports.
 *
 * The module's only export is the `Parser` mixin function.
 */

module.exports = Parser;
|
||
|
|
||
|
/**
 * Parser states.
 *
 * Stored on the stream as `_parserState`; they tell the parser what to do
 * with incoming bytes.
 */

// no piece is being parsed (set on initialization and after a piece completes)
var INIT = -1;
// bytes are collected until the amount requested via `_bytes()` is reached
var BUFFERING = 0;
// bytes are discarded until the amount requested via `_skipBytes()` is reached
var SKIPPING = 1;
// bytes are forwarded to the output until the `_passthrough()` amount is reached
var PASSTHROUGH = 2;
|
||
|
|
||
|
/**
 * The `Parser` stream mixin works with either `Writable` or `Transform` stream
 * instances/subclasses. Provides a convenient generic "parsing" API:
 *
 *   _bytes(n, cb) - buffers "n" bytes and then calls "cb" with the "chunk"
 *   _skipBytes(n, cb) - skips "n" bytes and then calls "cb" when done
 *
 * If you extend a `Transform` stream, then the `_passthrough()` function is also
 * added:
 *
 *   _passthrough(n, cb) - passes through "n" bytes untouched and then calls "cb"
 *
 * The stream's `_transform` function (or its `_write` function, when it has no
 * `_transform`) is replaced with the parser's own implementation.
 *
 * @param {Stream} stream Transform or Writable stream instance to extend
 * @throws {Error} when `stream` has neither a `_transform` nor a `_write` function
 * @api public
 */

function Parser (stream) {
  var isTransform = stream && 'function' == typeof stream._transform;
  var isWritable = stream && 'function' == typeof stream._write;

  if (!isTransform && !isWritable) throw new Error('must pass a Writable or Transform stream in');
  debug('extending Parser into stream');

  // Transform streams and Writable streams get `_bytes()` and `_skipBytes()`
  stream._bytes = _bytes;
  stream._skipBytes = _skipBytes;

  if (isTransform) {
    // only Transform streams get the `_passthrough()` function
    stream._passthrough = _passthrough;

    // take control of the streams2 callback function for this stream
    stream._transform = transform;
  } else {
    // plain Writable stream: take control of `_write()` instead
    stream._write = write;
  }
}
|
||
|
|
||
|
/**
 * Sets up the parser bookkeeping properties on `stream`. Callers in this file
 * invoke it lazily, guarded by the `_parserInit` flag that it sets.
 *
 * @param {Stream} stream the stream to attach the parser state to
 * @api private
 */

function init (stream) {
  debug('initializing parser stream');

  // number of bytes left to parse for the next "chunk"
  stream._parserBytesLeft = 0;

  // array of Buffer instances that make up the next "chunk"
  stream._parserBuffers = [];

  // number of bytes parsed so far for the next "chunk"
  stream._parserBuffered = 0;

  // flag that keeps track of what the parser should do with bytes received
  stream._parserState = INIT;

  // the callback for the next "chunk"
  stream._parserCallback = null;

  // XXX: backwards compat with the old Transform API... remove at some point..
  if ('function' == typeof stream.push) {
    stream._parserOutput = stream.push.bind(stream);
  }

  // marks the stream as initialized so that callers skip `init()` next time
  stream._parserInit = true;
}
|
||
|
|
||
|
/**
 * Buffers `n` bytes and then invokes `fn` once that amount has been collected.
 *
 * Only one request may be pending at a time: an assertion fails when a
 * callback is already set, or when `n` is not a finite number greater than 0.
 *
 * @param {Number} n the number of bytes to buffer
 * @param {Function} fn callback function to invoke when `n` bytes are buffered
 * @api public
 */

function _bytes (n, fn) {
  assert(!this._parserCallback, 'there is already a "callback" set!');
  assert(isFinite(n) && n > 0, 'can only buffer a finite number of bytes > 0, got "' + n + '"');
  if (!this._parserInit) init(this);
  debug('buffering %o bytes', n);
  this._parserBytesLeft = n;
  this._parserCallback = fn;
  this._parserState = BUFFERING;
}
|
||
|
|
||
|
/**
 * Skips over the next `n` bytes, then invokes `fn` once that amount has
 * been discarded.
 *
 * Only one request may be pending at a time: an assertion fails when a
 * callback is already set, or when `n` is not greater than 0. Unlike
 * `_bytes()`, `n` is not required to be finite.
 *
 * @param {Number} n the number of bytes to discard
 * @param {Function} fn callback function to invoke when `n` bytes have been skipped
 * @api public
 */

function _skipBytes (n, fn) {
  assert(!this._parserCallback, 'there is already a "callback" set!');
  assert(n > 0, 'can only skip > 0 bytes, got "' + n + '"');
  if (!this._parserInit) init(this);
  debug('skipping %o bytes', n);
  this._parserBytesLeft = n;
  this._parserCallback = fn;
  this._parserState = SKIPPING;
}
|
||
|
|
||
|
/**
 * Passes through `n` bytes to the readable side of this stream untouched,
 * then invokes `fn` once that amount has been passed through.
 *
 * Only one request may be pending at a time: an assertion fails when a
 * callback is already set, or when `n` is not greater than 0. `n` is not
 * required to be finite.
 *
 * @param {Number} n the number of bytes to pass through
 * @param {Function} fn callback function to invoke when `n` bytes have passed through
 * @api public
 */

function _passthrough (n, fn) {
  assert(!this._parserCallback, 'There is already a "callback" set!');
  assert(n > 0, 'can only pass through > 0 bytes, got "' + n + '"');
  if (!this._parserInit) init(this);
  debug('passing through %o bytes', n);
  this._parserBytesLeft = n;
  this._parserCallback = fn;
  this._parserState = PASSTHROUGH;
}
|
||
|
|
||
|
/**
 * The `_write()` callback function implementation.
 *
 * Initializes the parser state on first use and hands the chunk to `data()`
 * with no output function.
 *
 * @param {Buffer} chunk the data that was written to the stream
 * @param {String|Function} encoding encoding, or the callback under the old API
 * @param {Function} fn callback passed on to `data()`
 * @api private
 */

function write (chunk, encoding, fn) {
  if (!this._parserInit) init(this);
  debug('write(%o bytes)', chunk.length);

  // XXX: old Writable stream API compat... remove at some point...
  if ('function' == typeof encoding) fn = encoding;

  data(this, chunk, null, fn);
}
|
||
|
|
||
|
/**
 * The `_transform()` callback function implementation.
 *
 * Initializes the parser state on first use and hands the chunk to `data()`
 * together with an output function.
 *
 * @param {Buffer} chunk the data that was written to the stream
 * @param {Function|*} output output function under the old API; any other
 *   value is replaced with the stream's `_parserOutput`
 * @param {Function} fn callback passed on to `data()`
 * @api private
 */

function transform (chunk, output, fn) {
  if (!this._parserInit) init(this);
  debug('transform(%o bytes)', chunk.length);

  // XXX: old Transform stream API compat... remove at some point...
  if ('function' != typeof output) {
    output = this._parserOutput;
  }

  data(this, chunk, output, fn);
}
|
||
|
|
||
|
/**
 * The internal buffering/passthrough logic...
 *
 * This `_data` function gets "trampolined" to prevent stack overflows for tight
 * loops. This technique requires us to return a "thunk" function for any
 * synchronous action. Async stuff breaks the trampoline, but that's ok since it's
 * working with a new stack at that point anyway.
 *
 * @param {Stream} stream the stream carrying the parser state
 * @param {Buffer} chunk the bytes to consume
 * @param {Function|null} output output function (`null` when called from `write()`)
 * @param {Function} fn callback; invoked with an Error when data arrives while
 *   no bytes are expected
 * @return {Function|*} a thunk for the trampoline to run, or the result of `fn`
 *   on the error path
 * @api private
 */

function _data (stream, chunk, output, fn) {
  if (stream._parserBytesLeft <= 0) {
    return fn(new Error('got data but not currently parsing anything'));
  }

  if (chunk.length <= stream._parserBytesLeft) {
    // small buffer fits within the "_parserBytesLeft" window
    return function () {
      return process(stream, chunk, output, fn);
    };
  } else {
    // large buffer needs to be sliced on "_parserBytesLeft" and processed
    return function () {
      var b = chunk.slice(0, stream._parserBytesLeft);
      return process(stream, b, output, function (err) {
        if (err) return fn(err);
        if (chunk.length > b.length) {
          // feed the remainder of the chunk back through `_data`
          return function () {
            return _data(stream, chunk.slice(b.length), output, fn);
          };
        }
      });
    };
  }
}
|
||
|
|
||
|
/**
 * The internal `process` function gets called by the `data` function when
 * something "interesting" happens. This function takes care of buffering the
 * bytes when buffering, passing through the bytes when doing that, and invoking
 * the user callback when the number of bytes has been reached.
 *
 * The user callback is invoked with, in order: the buffered chunk (BUFFERING
 * state only), the `output` function (when one was given), and a trampolined
 * `fn` (only when the callback declares more parameters than the arguments
 * collected so far, which marks it as async).
 *
 * @param {Stream} stream the stream carrying the parser state
 * @param {Buffer} chunk bytes to consume
 * @param {Function|null} output output function, called with `chunk` in the
 *   PASSTHROUGH state
 * @param {Function} fn continuation for the trampoline
 * @return {Function|undefined} `fn` when more bytes are needed, when the user
 *   callback is sync, or when an async user callback returns `fn` itself;
 *   otherwise `undefined` (including when the piece completes with no user
 *   callback set)
 * @api private
 */

function process (stream, chunk, output, fn) {
  stream._parserBytesLeft -= chunk.length;
  debug('%o bytes left for stream piece', stream._parserBytesLeft);

  if (stream._parserState === BUFFERING) {
    // buffer
    stream._parserBuffers.push(chunk);
    stream._parserBuffered += chunk.length;
  } else if (stream._parserState === PASSTHROUGH) {
    // passthrough
    output(chunk);
  }
  // don't need to do anything for the SKIPPING case

  if (0 === stream._parserBytesLeft) {
    // done with stream "piece", invoke the callback
    var cb = stream._parserCallback;
    if (cb && stream._parserState === BUFFERING && stream._parserBuffers.length > 1) {
      // the piece arrived in several chunks: join them into a single Buffer
      chunk = Buffer.concat(stream._parserBuffers, stream._parserBuffered);
    }
    if (stream._parserState !== BUFFERING) {
      // skipping / passthrough callbacks do not receive a chunk
      chunk = null;
    }

    // reset the state *before* invoking the callback, so that the callback
    // can immediately request the next piece
    stream._parserCallback = null;
    stream._parserBuffered = 0;
    stream._parserState = INIT;
    stream._parserBuffers.splice(0); // empty

    if (cb) {
      var args = [];
      if (chunk) {
        // buffered
        args.push(chunk);
      }
      if (output) {
        // on a Transform stream, has "output" function
        args.push(output);
      }
      var isAsync = cb.length > args.length;
      if (isAsync) {
        args.push(trampoline(fn));
      }
      // invoke cb
      var rtn = cb.apply(stream, args);
      if (!isAsync || fn === rtn) return fn;
    }
  } else {
    // need more bytes
    return fn;
  }
}
|
||
|
|
||
|
// Trampolined entry point into `_data`: `write()` and `transform()` call this,
// and any thunk functions returned along the way are run in a loop.
var data = trampoline(_data);
|
||
|
|
||
|
/**
 * Generic thunk-based "trampoline" helper function.
 *
 * The returned wrapper calls `fn` with the wrapper's own `this` and arguments.
 * While the result is a function (a "thunk"), it is called with no arguments
 * and its return value becomes the new result. The first non-function result
 * is returned.
 *
 * @param {Function} fn input function
 * @return {Function} "trampolined" function
 * @api private
 */

function trampoline (fn) {
  return function () {
    var result = fn.apply(this, arguments);

    // keep bouncing until something other than a thunk comes back
    while ('function' == typeof result) {
      result = result();
    }

    return result;
  };
}
|