freeCodeCamp/flattenUser.js

95 lines
2.5 KiB
JavaScript
Raw Normal View History

2015-06-07 17:15:53 -07:00
/* eslint-disable no-process-exit */
require('dotenv').load();
var assign = require('lodash/object/assign'),
Rx = require('rx'),
mongodb = require('mongodb'),
secrets = require('../config/secrets');
var MongoClient = mongodb.MongoClient;
/**
 * Open a MongoDB connection and expose it as an observable.
 *
 * @param {String} URI - connection string handed to MongoClient.connect
 * @returns {Rx.Observable} emits the connected database handle once and then
 *   completes; signals an error if the connection attempt fails
 */
function createConnection(URI) {
  return Rx.Observable.create(function(observer) {
    MongoClient.connect(URI, function(err, database) {
      if (err) {
        return observer.onError(err);
      }
      observer.onNext(database);
      // Signal completion: operators that wait for their source to finish
      // (flatMap, count, ...) can never complete otherwise.
      observer.onCompleted();
    });
  });
}
2015-06-07 22:06:57 -07:00
/**
 * Stream every document of a collection as an observable sequence.
 *
 * @param {Object} db - connected database handle
 * @param {String} collection - name of the collection to read
 * @param {Object} options - passed as the second argument to `find`
 * @param {Number} [batchSize=20] - cursor batch size
 * @returns {Rx.Observable} emits one document per onNext and completes when
 *   the cursor yields a falsy document; disposing the subscription closes
 *   the cursor
 */
function createQuery(db, collection, options, batchSize) {
  return Rx.Observable.create(function(observer) {
    console.log('Creating cursor...');
    var cursor = db.collection(collection).find({}, options);
    cursor.batchSize(batchSize || 20);
    // Cursor.each will yield all doc from a batch in the same tick,
    // or schedule getting next batch on nextTick
    cursor.each(function(err, doc) {
      if (err) {
        return observer.onError(err);
      }
      // a falsy doc is treated as the end of the result set
      if (!doc) {
        return observer.onCompleted();
      }
      observer.onNext(doc);
    });
    // release the cursor when the subscription is disposed
    return Rx.Disposable.create(function() {
      console.log('Disposing cursor...');
      cursor.close();
    });
  });
}
2015-06-07 22:06:57 -07:00
/**
 * Bulk insert documents into a collection.
 *
 * @param {Object} db - connected database handle
 * @param {String} collection - name of the target collection
 * @param {Object[]} users - documents to insert
 * @param {Object} options - passed through to the driver's `insertMany`
 * @returns {Rx.Observable} emits the driver's insert result once and then
 *   completes; signals an error if the insert fails
 */
function insertMany(db, collection, users, options) {
  return Rx.Observable.create(function(observer) {
    db.collection(collection).insertMany(
      users,
      options,
      function(err, result) {
        if (err) {
          return observer.onError(err);
        }
        // Emit a value before completing: operators such as count() only
        // see onNext notifications, so a bare completion is invisible to them.
        observer.onNext(result);
        observer.onCompleted();
      }
    );
  });
}
// running total of documents written to the new collection
var count = 0;
// will supply our db object
// Only one handle is needed; take(1) ends the stream once it has arrived so
// that everything downstream is able to complete.
var dbObservable = createConnection(secrets.db).take(1).shareReplay();
dbObservable
  .flatMap(function(db) {
    // returns user document, n users per loop where n is the batchsize.
    return createQuery(db, 'users', {});
  })
  .map(function(user) {
    // flatten user: copy portfolio and profile fields onto the user itself
    assign(user, user.portfolio, user.profile);
    return user;
  })
  // batch them into arrays of at most twenty documents
  .bufferWithCount(20)
  // get db object ready for insert
  .withLatestFrom(dbObservable, function(users, db) {
    return {
      users: users,
      db: db
    };
  })
  .flatMap(function(dats) {
    // bulk insert into new collection for loopback
    return insertMany(dats.db, 'user', dats.users, { w: 1 })
      // toArray emits exactly once when the insert completes, regardless of
      // how many values the insert itself produced
      .toArray()
      // report the real size of this batch; the last one may be partial
      .map(function() {
        return dats.users.length;
      });
  })
  .subscribe(
    function(inserted) {
      count += inserted;
    },
    function(err) {
      console.log('an error occured', err);
    },
    function() {
      console.log('finished with %s documents processed', count);
    }
  );