From 28f273e7ee82dc75ebfe0bc8011e7de1f27384b9 Mon Sep 17 00:00:00 2001 From: Berkeley Martinez Date: Sun, 7 Jun 2015 22:06:57 -0700 Subject: [PATCH] save users into new `user` collection --- seed/flattenUser.js | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/seed/flattenUser.js b/seed/flattenUser.js index a01ccfb420..f919d6b471 100644 --- a/seed/flattenUser.js +++ b/seed/flattenUser.js @@ -18,10 +18,10 @@ function createConnection(URI) { }); } -function createQuery(db, collection, selection, options, batchSize) { +function createQuery(db, collection, options, batchSize) { return Rx.Observable.create(function (observer) { console.log('Creating cursor...'); - var cursor = db.collection(collection).find(selection, options); + var cursor = db.collection(collection).find({}, options); cursor.batchSize(batchSize || 20); // Cursor.each will yield all doc from a batch in the same tick, // or schedule getting next batch on nextTick @@ -42,9 +42,9 @@ function createQuery(db, collection, selection, options, batchSize) { }); } -function saveUser(user) { +function insertMany(db, collection, users, options) { return Rx.Observable.create(function(observer) { - user.save(function(err) { + db.collection(collection).insertMany(users, options, function(err) { if (err) { return observer.onError(err); } @@ -54,21 +54,36 @@ function saveUser(user) { } var count = 0; -createConnection(secrets.db) +// will supply our db object +var dbObservable = createConnection(secrets.db).shareReplay(); +dbObservable .flatMap(function(db) { + // returns user document, n users per loop where n is the batchsize. return createQuery(db, 'users', {}); }) .map(function(user) { + // flatten user assign(user, user.portfolio, user.profile); return user; }) - .flatMap(function(user) { - return saveUser(user); + // batch them into arrays of twenty documents + .bufferWithCount(20) + // get bd object ready for insert + .withLatestFrom(dbObservable, function(users, db) { + return { + users: users, + db: db + }; }) + .flatMap(function(dats) { + // bulk insert into new collection for loopback + return insertMany(dats.db, 'user', dats.users, { w: 1 }); + }) + // count how many times insert completes .count() .subscribe( function(_count) { - count = _count; + count = _count * 20; }, function(err) { console.log('an error occured', err);