From 38931dd23d5f4816be77b062e33c3d1c0d80ea37 Mon Sep 17 00:00:00 2001 From: Berkeley Martinez Date: Sun, 7 Jun 2015 17:15:53 -0700 Subject: [PATCH 1/2] initial script to flatten user object --- flattenUser.js | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 flattenUser.js diff --git a/flattenUser.js b/flattenUser.js new file mode 100644 index 0000000000..a01ccfb420 --- /dev/null +++ b/flattenUser.js @@ -0,0 +1,79 @@ +/* eslint-disable no-process-exit */ +require('dotenv').load(); +var assign = require('lodash/object/assign'), + Rx = require('rx'), + mongodb = require('mongodb'), + secrets = require('../config/secrets'); + +var MongoClient = mongodb.MongoClient; + +function createConnection(URI) { + return Rx.Observable.create(function(observer) { + MongoClient.connect(URI, function(err, database) { + if (err) { + return observer.onError(err); + } + observer.onNext(database); + }); + }); +} + +function createQuery(db, collection, selection, options, batchSize) { + return Rx.Observable.create(function (observer) { + console.log('Creating cursor...'); + var cursor = db.collection(collection).find(selection, options); + cursor.batchSize(batchSize || 20); + // Cursor.each will yield all doc from a batch in the same tick, + // or schedule getting next batch on nextTick + cursor.each(function (err, doc) { + if (err) { + return observer.onError(err); + } + if (!doc) { + return observer.onCompleted(); + } + observer.onNext(doc); + }); + + return Rx.Disposable.create(function () { + console.log('Disposing cursor...'); + cursor.close(); + }); + }); +} + +function saveUser(user) { + return Rx.Observable.create(function(observer) { + user.save(function(err) { + if (err) { + return observer.onError(err); + } + observer.onCompleted(); + }); + }); +} + +var count = 0; +createConnection(secrets.db) + .flatMap(function(db) { + return createQuery(db, 'users', {}); + }) + .map(function(user) { + assign(user, user.portfolio, 
user.profile); + return user; + }) + .flatMap(function(user) { + return saveUser(user); + }) + .count() + .subscribe( + function(_count) { + count = _count; + }, + function(err) { + console.log('an error occured', err); + }, + function() { + console.log('finished with %s documents processed', count); + } + ); From 0e240ba6300d96f006737fe8a8410783bdbc76a6 Mon Sep 17 00:00:00 2001 From: Berkeley Martinez Date: Sun, 7 Jun 2015 22:06:57 -0700 Subject: [PATCH 2/2] save users into new `user` collection --- flattenUser.js | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/flattenUser.js b/flattenUser.js index a01ccfb420..f919d6b471 100644 --- a/flattenUser.js +++ b/flattenUser.js @@ -18,10 +18,10 @@ function createConnection(URI) { }); } -function createQuery(db, collection, selection, options, batchSize) { +function createQuery(db, collection, options, batchSize) { return Rx.Observable.create(function (observer) { console.log('Creating cursor...'); - var cursor = db.collection(collection).find(selection, options); + var cursor = db.collection(collection).find({}, options); cursor.batchSize(batchSize || 20); // Cursor.each will yield all doc from a batch in the same tick, // or schedule getting next batch on nextTick @@ -42,9 +42,9 @@ function createQuery(db, collection, selection, options, batchSize) { }); } -function saveUser(user) { +function insertMany(db, collection, users, options) { return Rx.Observable.create(function(observer) { - user.save(function(err) { + db.collection(collection).insertMany(users, options, function(err) { if (err) { return observer.onError(err); } @@ -54,21 +54,36 @@ function saveUser(user) { } var count = 0; -createConnection(secrets.db) +// will supply our db object +var dbObservable = createConnection(secrets.db).shareReplay(); +dbObservable .flatMap(function(db) { + // returns user document, n users per loop where n is the batchsize. 
return createQuery(db, 'users', {}); }) .map(function(user) { + // flatten user assign(user, user.portfolio, user.profile); return user; }) - .flatMap(function(user) { - return saveUser(user); + // batch them into arrays of twenty documents + .bufferWithCount(20) + // get db object ready for insert + .withLatestFrom(dbObservable, function(users, db) { + return { + users: users, + db: db + }; }) + .flatMap(function(dats) { + // bulk insert into new collection for loopback + return insertMany(dats.db, 'user', dats.users, { w: 1 }); + }) + // count how many times insert completes .count() .subscribe( function(_count) { - count = _count; + count = _count * 20; }, function(err) { console.log('an error occured', err);