summaryrefslogtreecommitdiff
path: root/node_modules/mongoose/test/model.stream.test.js
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/mongoose/test/model.stream.test.js')
-rw-r--r--node_modules/mongoose/test/model.stream.test.js232
1 files changed, 232 insertions, 0 deletions
diff --git a/node_modules/mongoose/test/model.stream.test.js b/node_modules/mongoose/test/model.stream.test.js
new file mode 100644
index 0000000..a010b0e
--- /dev/null
+++ b/node_modules/mongoose/test/model.stream.test.js
@@ -0,0 +1,232 @@
+
+/**
+ * Test dependencies.
+ */
+
+var start = require('./common')
+ , should = require('should')
+ , mongoose = start.mongoose
+ , utils = require('../lib/utils')
+ , random = utils.random
+ , Query = require('../lib/query')
+ , Schema = mongoose.Schema
+ , SchemaType = mongoose.SchemaType
+ , CastError = SchemaType.CastError
+ , ObjectId = Schema.ObjectId
+ , MongooseBuffer = mongoose.Types.Buffer
+ , DocumentObjectId = mongoose.Types.ObjectId
+ , fs = require('fs')
+
+var names = ('Aaden Aaron Adrian Aditya Agustin Jim Bob Jonah Frank Sally Lucy').split(' ');
+
+/**
+ * Setup.
+ */
+
+var Person = new Schema({
+ name: String
+});
+
+mongoose.model('PersonForStream', Person);
+var collection = 'personforstream_' + random();
+
+;(function setup () {
+ var db = start()
+ , P = db.model('PersonForStream', collection)
+
+ var people = names.map(function (name) {
+ return { name: name };
+ });
+
+ P.create(people, function (err) {
+ should.strictEqual(null, err);
+ db.close();
+ assignExports();
+ });
+})()
+
+function assignExports () { var o = {
+
+ 'cursor stream': function () {
+ var db = start()
+ , P = db.model('PersonForStream', collection)
+ , i = 0
+ , closed = 0
+ , paused = 0
+ , resumed = 0
+ , err
+
+ var stream = P.find({}).stream();
+
+ stream.on('data', function (doc) {
+ should.strictEqual(true, !! doc.name);
+ should.strictEqual(true, !! doc._id);
+
+ if (paused > 0 && 0 === resumed) {
+ err = new Error('data emitted during pause');
+ return done();
+ }
+
+ if (++i === 3) {
+ stream.paused.should.be.false;
+ stream.pause();
+ stream.paused.should.be.true;
+ paused++;
+
+ setTimeout(function () {
+ stream.paused.should.be.true;
+ resumed++;
+ stream.resume();
+ stream.paused.should.be.false;
+ }, 20);
+ }
+ });
+
+ stream.on('error', function (er) {
+ err = er;
+ done();
+ });
+
+ stream.on('close', function () {
+ closed++;
+ done();
+ });
+
+ function done () {
+ db.close();
+ should.strictEqual(undefined, err);
+ should.equal(i, names.length);
+ closed.should.equal(1);
+ paused.should.equal(1);
+ resumed.should.equal(1);
+ stream._cursor.isClosed().should.be.true;
+ }
+ }
+
+, 'immediately destroying a stream prevents the query from executing': function () {
+ var db = start()
+ , P = db.model('PersonForStream', collection)
+ , i = 0
+
+ var stream = P.where('name', 'Jonah').select('name').findOne().stream();
+
+ stream.on('data', function () {
+ i++;
+ })
+ stream.on('close', done);
+ stream.on('error', done);
+
+ stream.destroy();
+
+ function done (err) {
+ should.strictEqual(undefined, err);
+ i.should.equal(0);
+ process.nextTick(function () {
+ db.close();
+ should.strictEqual(null, stream._fields);
+ })
+ }
+ }
+
+, 'destroying a stream stops it': function () {
+ var db = start()
+ , P = db.model('PersonForStream', collection)
+ , finished = 0
+ , i = 0
+
+ var stream = P.where('name').$exists().limit(10).only('_id').stream();
+
+ should.strictEqual(null, stream._destroyed);
+ stream.readable.should.be.true;
+
+ stream.on('data', function (doc) {
+ should.strictEqual(undefined, doc.name);
+ if (++i === 5) {
+ stream.destroy();
+ stream.readable.should.be.false;
+ }
+ });
+
+ stream.on('close', done);
+ stream.on('error', done);
+
+ function done (err) {
+ ++finished;
+ setTimeout(function () {
+ db.close();
+ should.strictEqual(undefined, err);
+ i.should.equal(5);
+ finished.should.equal(1);
+ stream._destroyed.should.equal(true);
+ stream.readable.should.be.false;
+ stream._cursor.isClosed().should.be.true;
+ }, 150)
+ }
+ }
+
+, 'cursor stream errors': function () {
+ var db = start({ server: { auto_reconnect: false }})
+ , P = db.model('PersonForStream', collection)
+ , finished = 0
+ , closed = 0
+ , i = 0
+
+ var stream = P.find().batchSize(5).stream();
+
+ stream.on('data', function (doc) {
+ if (++i === 5) {
+ db.close();
+ }
+ });
+
+ stream.on('close', function () {
+ closed++;
+ });
+
+ stream.on('error', done);
+
+ function done (err) {
+ ++finished;
+ setTimeout(function () {
+ should.equal('no open connections', err.message);
+ i.should.equal(5);
+ closed.should.equal(1);
+ finished.should.equal(1);
+ stream._destroyed.should.equal(true);
+ stream.readable.should.be.false;
+ stream._cursor.isClosed().should.be.true;
+ }, 150)
+ }
+ }
+
+, 'cursor stream pipe': function () {
+ var db = start()
+ , P = db.model('PersonForStream', collection)
+ , filename = '/tmp/_mongoose_stream_out.txt'
+ , out = fs.createWriteStream(filename)
+
+ var stream = P.find().sort('name', 1).limit(20).stream();
+ stream.pipe(out);
+
+ stream.on('error', done);
+ stream.on('close', done);
+
+ function done (err) {
+ db.close();
+ should.strictEqual(undefined, err);
+ var contents = fs.readFileSync(filename, 'utf8');
+ ;/Aaden/.test(contents).should.be.true;
+ ;/Aaron/.test(contents).should.be.true;
+ ;/Adrian/.test(contents).should.be.true;
+ ;/Aditya/.test(contents).should.be.true;
+ ;/Agustin/.test(contents).should.be.true;
+ fs.unlink(filename);
+ }
+ }
+}
+
+// end exports
+
+utils.merge(exports, o);
+
+}