From 686106d544ecc3b6ffd4db2b665d3bc879a58d8c Mon Sep 17 00:00:00 2001 From: Jules Laplace Date: Mon, 24 Sep 2012 16:22:07 -0400 Subject: ok --- node_modules/mongoose/lib/querystream.js | 179 +++++++++++++++++++++++++++++++ 1 file changed, 179 insertions(+) create mode 100644 node_modules/mongoose/lib/querystream.js (limited to 'node_modules/mongoose/lib/querystream.js') diff --git a/node_modules/mongoose/lib/querystream.js b/node_modules/mongoose/lib/querystream.js new file mode 100644 index 0000000..4093e60 --- /dev/null +++ b/node_modules/mongoose/lib/querystream.js @@ -0,0 +1,179 @@ + +/** + * Module dependencies. + */ + +var Stream = require('stream').Stream +var utils = require('./utils') + +/** + * QueryStream + * + * Returns a stream interface for the `query`. + * + * @param {Query} query + * @return {Stream} + */ + +function QueryStream (query) { + Stream.call(this); + + this.query = query; + this.readable = true; + this.paused = false; + this._cursor = null; + this._destroyed = null; + this._fields = null; + this._ticks = 0; + + // give time to hook up events + var self = this; + process.nextTick(function () { + self._init(); + }); +} + +/** + * Inherit from Stream + * @private + */ + +QueryStream.prototype.__proto__ = Stream.prototype; + +/** + * Flag stating whether or not this stream is readable. + */ + +QueryStream.prototype.readable; + +/** + * Flag stating whether or not this stream is paused. + */ + +QueryStream.prototype.paused; + +/** + * Initialize the query. + * @private + */ + +QueryStream.prototype._init = function () { + if (this._destroyed) return; + + var query = this.query + , model = query.model + , options = query._optionsForExec(model) + , self = this + + try { + query.cast(model); + } catch (err) { + return self.destroy(err); + } + + self._fields = utils.clone(options.fields = query._fields); + + model.collection.find(query._conditions, options, function (err, cursor) { + if (err) return self.destroy(err); + self._cursor = cursor; + self._next(); + }); +} + +/** + * Pull the next document from the cursor. + * @private + */ + +QueryStream.prototype._next = function () { + if (this.paused || this._destroyed) return; + + var self = this; + + // nextTick is necessary to avoid stack overflows when + // dealing with large result sets. yield occasionally. + if (!(++this._ticks % 20)) { + process.nextTick(function () { + self._cursor.nextObject(function (err, doc) { + self._onNextObject(err, doc); + }); + }); + } else { + self._cursor.nextObject(function (err, doc) { + self._onNextObject(err, doc); + }); + } +} + +/** + * Handle each document as its returned from the cursor + * transforming the raw `doc` from -native into a model + * instance. + * + * @private + */ + +QueryStream.prototype._onNextObject = function (err, doc) { + if (err) return this.destroy(err); + + // when doc is null we hit the end of the cursor + if (!doc) { + return this.destroy(); + } + + var instance = new this.query.model(undefined, this._fields); + + // skip _id for pre-init hooks + delete instance._doc._id; + + var self = this; + instance.init(doc, this.query, function (err) { + if (err) return self.destroy(err); + self.emit('data', instance); + self._next(); + }); +} + +/** + * Pauses this stream. + */ + +QueryStream.prototype.pause = function () { + this.paused = true; +} + +/** + * Resumes this stream. + */ + +QueryStream.prototype.resume = function () { + this.paused = false; + this._next(); +} + +/** + * Destroys the stream, closing the underlying + * cursor. No more events will be emitted. + */ + +QueryStream.prototype.destroy = function (err) { + if (this._destroyed) return; + this._destroyed = true; + this.readable = false; + + if (this._cursor) { + this._cursor.close(); + } + + if (err) { + this.emit('error', err); + } + + this.emit('close'); +} + +// TODO - maybe implement the -native raw option to pass binary? +//QueryStream.prototype.setEncoding = function () { +//} + +module.exports = exports = QueryStream; -- cgit v1.2.3-70-g09d2