diff --git a/loopbackMigration.js b/loopbackMigration.js index e300bd217b..ab86785b34 100644 --- a/loopbackMigration.js +++ b/loopbackMigration.js @@ -6,6 +6,7 @@ var Rx = require('rx'), mongodb = require('mongodb'), secrets = require('../config/secrets'); +const batchSize = 100; var MongoClient = mongodb.MongoClient; Rx.config.longStackSupport = true; @@ -27,10 +28,12 @@ function debug() { function createConnection(URI) { return Rx.Observable.create(function(observer) { + debug('connecting to db'); MongoClient.connect(URI, function(err, database) { if (err) { return observer.onError(err); } + debug('db connected'); observer.onNext(database); observer.onCompleted(); }); @@ -62,6 +65,23 @@ function createQuery(db, collection, options, batchSize) { }); } +function getUserCount(db) { + return Rx.Observable.create(function(observer) { + var cursor = db.collection('users').count(function(err, count) { + if (err) { + return observer.onError(err); + } + observer.onNext(count); + observer.onCompleted(); + + return Rx.Disposable.create(function() { + debug('closing user count'); + cursor.close(); + }); + }); + }); +} + function insertMany(db, collection, users, options) { return Rx.Observable.create(function(observer) { db.collection(collection).insertMany(users, options, function(err) { @@ -76,12 +96,18 @@ function insertMany(db, collection, users, options) { var count = 0; // will supply our db object -var dbObservable = createConnection(secrets.db).shareReplay(); +var dbObservable = createConnection(secrets.db).replay(); + +var totalUser = dbObservable + .flatMap(function(db) { + return getUserCount(db); + }) + .shareReplay(); var users = dbObservable .flatMap(function(db) { // returns user document, n users per loop where n is the batchsize. - return createQuery(db, 'users', {}); + return createQuery(db, 'users', {}, batchSize); }) .map(function(user) { // flatten user @@ -105,7 +131,7 @@ var users = dbObservable // batch them into arrays of twenty documents var userSavesCount = users - .bufferWithCount(20) + .bufferWithCount(batchSize) // get bd object ready for insert .withLatestFrom(dbObservable, function(users, db) { return { @@ -117,6 +143,13 @@ var userSavesCount = users // bulk insert into new collection for loopback return insertMany(dats.db, 'user', dats.users, { w: 1 }); }) + .flatMap(function() { + return totalUser; + }) + .doOnNext(function(totalUsers) { + count = count + batchSize; + debug('user progress %s', count / totalUsers * 100); + }) // count how many times insert completes .count(); @@ -137,7 +170,7 @@ var userIdentityCount = users return Rx.Observable.from(ids); }) - .bufferWithCount(20) + .bufferWithCount(batchSize) .withLatestFrom(dbObservable, function(identities, db) { return { identities: identities, @@ -153,9 +186,9 @@ var userIdentityCount = users var storyCount = dbObservable .flatMap(function(db) { - return createQuery(db, 'stories', {}); + return createQuery(db, 'stories', {}, batchSize); }) - .bufferWithCount(20) + .bufferWithCount(batchSize) .withLatestFrom(dbObservable, function(stories, db) { return { stories: stories, @@ -173,9 +206,9 @@ Rx.Observable.combineLatest( storyCount, function(userIdentCount, userCount, storyCount) { return { - userIdentCount: userIdentCount * 20, - userCount: userCount * 20, - storyCount: storyCount * 20 + userIdentCount: userIdentCount * batchSize, + userCount: userCount * batchSize, + storyCount: storyCount * batchSize }; }) .subscribe( @@ -191,3 +224,5 @@ Rx.Observable.combineLatest( process.exit(0); } ); + +dbObservable.connect();