summaryrefslogtreecommitdiff
path: root/node_modules/mongoose/lib/querystream.js
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/mongoose/lib/querystream.js')
-rw-r--r--node_modules/mongoose/lib/querystream.js179
1 files changed, 179 insertions, 0 deletions
diff --git a/node_modules/mongoose/lib/querystream.js b/node_modules/mongoose/lib/querystream.js
new file mode 100644
index 0000000..4093e60
--- /dev/null
+++ b/node_modules/mongoose/lib/querystream.js
@@ -0,0 +1,179 @@
+
+/**
+ * Module dependencies.
+ */
+
+var Stream = require('stream').Stream
+var utils = require('./utils')
+
+/**
+ * QueryStream
+ *
+ * Returns a stream interface for the `query`.
+ *
+ * @param {Query} query
+ * @return {Stream}
+ */
+
+function QueryStream (query) {
+ Stream.call(this);
+
+ this.query = query;
+ this.readable = true;
+ this.paused = false;
+ this._cursor = null;
+ this._destroyed = null;
+ this._fields = null;
+ this._ticks = 0;
+
+ // give time to hook up events
+ var self = this;
+ process.nextTick(function () {
+ self._init();
+ });
+}
+
+/**
+ * Inherit from Stream
+ * @private
+ */
+
+QueryStream.prototype.__proto__ = Stream.prototype;
+
+/**
+ * Flag stating whether or not this stream is readable.
+ */
+
+QueryStream.prototype.readable;
+
+/**
+ * Flag stating whether or not this stream is paused.
+ */
+
+QueryStream.prototype.paused;
+
+/**
+ * Initialize the query.
+ * @private
+ */
+
+QueryStream.prototype._init = function () {
+ if (this._destroyed) return;
+
+ var query = this.query
+ , model = query.model
+ , options = query._optionsForExec(model)
+ , self = this
+
+ try {
+ query.cast(model);
+ } catch (err) {
+ return self.destroy(err);
+ }
+
+ self._fields = utils.clone(options.fields = query._fields);
+
+ model.collection.find(query._conditions, options, function (err, cursor) {
+ if (err) return self.destroy(err);
+ self._cursor = cursor;
+ self._next();
+ });
+}
+
+/**
+ * Pull the next document from the cursor.
+ * @private
+ */
+
+QueryStream.prototype._next = function () {
+ if (this.paused || this._destroyed) return;
+
+ var self = this;
+
+ // nextTick is necessary to avoid stack overflows when
+ // dealing with large result sets. yield occasionally.
+ if (!(++this._ticks % 20)) {
+ process.nextTick(function () {
+ self._cursor.nextObject(function (err, doc) {
+ self._onNextObject(err, doc);
+ });
+ });
+ } else {
+ self._cursor.nextObject(function (err, doc) {
+ self._onNextObject(err, doc);
+ });
+ }
+}
+
+/**
+ * Handle each document as its returned from the cursor
+ * transforming the raw `doc` from -native into a model
+ * instance.
+ *
+ * @private
+ */
+
+QueryStream.prototype._onNextObject = function (err, doc) {
+ if (err) return this.destroy(err);
+
+ // when doc is null we hit the end of the cursor
+ if (!doc) {
+ return this.destroy();
+ }
+
+ var instance = new this.query.model(undefined, this._fields);
+
+ // skip _id for pre-init hooks
+ delete instance._doc._id;
+
+ var self = this;
+ instance.init(doc, this.query, function (err) {
+ if (err) return self.destroy(err);
+ self.emit('data', instance);
+ self._next();
+ });
+}
+
+/**
+ * Pauses this stream.
+ */
+
+QueryStream.prototype.pause = function () {
+ this.paused = true;
+}
+
+/**
+ * Resumes this stream.
+ */
+
+QueryStream.prototype.resume = function () {
+ this.paused = false;
+ this._next();
+}
+
+/**
+ * Destroys the stream, closing the underlying
+ * cursor. No more events will be emitted.
+ */
+
+QueryStream.prototype.destroy = function (err) {
+ if (this._destroyed) return;
+ this._destroyed = true;
+ this.readable = false;
+
+ if (this._cursor) {
+ this._cursor.close();
+ }
+
+ if (err) {
+ this.emit('error', err);
+ }
+
+ this.emit('close');
+}
+
+// TODO - maybe implement the -native raw option to pass binary?
+//QueryStream.prototype.setEncoding = function () {
+//}
+
+module.exports = exports = QueryStream;