migration improvements
This commit is contained in:
@ -6,6 +6,7 @@ var Rx = require('rx'),
|
|||||||
mongodb = require('mongodb'),
|
mongodb = require('mongodb'),
|
||||||
secrets = require('../config/secrets');
|
secrets = require('../config/secrets');
|
||||||
|
|
||||||
|
const batchSize = 100;
|
||||||
var MongoClient = mongodb.MongoClient;
|
var MongoClient = mongodb.MongoClient;
|
||||||
Rx.config.longStackSupport = true;
|
Rx.config.longStackSupport = true;
|
||||||
|
|
||||||
@ -27,10 +28,12 @@ function debug() {
|
|||||||
|
|
||||||
function createConnection(URI) {
|
function createConnection(URI) {
|
||||||
return Rx.Observable.create(function(observer) {
|
return Rx.Observable.create(function(observer) {
|
||||||
|
debug('connecting to db');
|
||||||
MongoClient.connect(URI, function(err, database) {
|
MongoClient.connect(URI, function(err, database) {
|
||||||
if (err) {
|
if (err) {
|
||||||
return observer.onError(err);
|
return observer.onError(err);
|
||||||
}
|
}
|
||||||
|
debug('db connected');
|
||||||
observer.onNext(database);
|
observer.onNext(database);
|
||||||
observer.onCompleted();
|
observer.onCompleted();
|
||||||
});
|
});
|
||||||
@ -62,6 +65,23 @@ function createQuery(db, collection, options, batchSize) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function getUserCount(db) {
|
||||||
|
return Rx.Observable.create(function(observer) {
|
||||||
|
var cursor = db.collection('users').count(function(err, count) {
|
||||||
|
if (err) {
|
||||||
|
return observer.onError(err);
|
||||||
|
}
|
||||||
|
observer.onNext(count);
|
||||||
|
observer.onCompleted();
|
||||||
|
|
||||||
|
return Rx.Disposable.create(function() {
|
||||||
|
debug('closing user count');
|
||||||
|
cursor.close();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
function insertMany(db, collection, users, options) {
|
function insertMany(db, collection, users, options) {
|
||||||
return Rx.Observable.create(function(observer) {
|
return Rx.Observable.create(function(observer) {
|
||||||
db.collection(collection).insertMany(users, options, function(err) {
|
db.collection(collection).insertMany(users, options, function(err) {
|
||||||
@ -76,12 +96,18 @@ function insertMany(db, collection, users, options) {
|
|||||||
|
|
||||||
var count = 0;
|
var count = 0;
|
||||||
// will supply our db object
|
// will supply our db object
|
||||||
var dbObservable = createConnection(secrets.db).shareReplay();
|
var dbObservable = createConnection(secrets.db).replay();
|
||||||
|
|
||||||
|
var totalUser = dbObservable
|
||||||
|
.flatMap(function(db) {
|
||||||
|
return getUserCount(db);
|
||||||
|
})
|
||||||
|
.shareReplay();
|
||||||
|
|
||||||
var users = dbObservable
|
var users = dbObservable
|
||||||
.flatMap(function(db) {
|
.flatMap(function(db) {
|
||||||
// returns user document, n users per loop where n is the batchsize.
|
// returns user document, n users per loop where n is the batchsize.
|
||||||
return createQuery(db, 'users', {});
|
return createQuery(db, 'users', {}, batchSize);
|
||||||
})
|
})
|
||||||
.map(function(user) {
|
.map(function(user) {
|
||||||
// flatten user
|
// flatten user
|
||||||
@ -105,7 +131,7 @@ var users = dbObservable
|
|||||||
|
|
||||||
// batch them into arrays of twenty documents
|
// batch them into arrays of twenty documents
|
||||||
var userSavesCount = users
|
var userSavesCount = users
|
||||||
.bufferWithCount(20)
|
.bufferWithCount(batchSize)
|
||||||
// get bd object ready for insert
|
// get bd object ready for insert
|
||||||
.withLatestFrom(dbObservable, function(users, db) {
|
.withLatestFrom(dbObservable, function(users, db) {
|
||||||
return {
|
return {
|
||||||
@ -117,6 +143,13 @@ var userSavesCount = users
|
|||||||
// bulk insert into new collection for loopback
|
// bulk insert into new collection for loopback
|
||||||
return insertMany(dats.db, 'user', dats.users, { w: 1 });
|
return insertMany(dats.db, 'user', dats.users, { w: 1 });
|
||||||
})
|
})
|
||||||
|
.flatMap(function() {
|
||||||
|
return totalUser;
|
||||||
|
})
|
||||||
|
.doOnNext(function(totalUsers) {
|
||||||
|
count = count + batchSize;
|
||||||
|
debug('user progress %s', count / totalUsers * 100);
|
||||||
|
})
|
||||||
// count how many times insert completes
|
// count how many times insert completes
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
@ -137,7 +170,7 @@ var userIdentityCount = users
|
|||||||
|
|
||||||
return Rx.Observable.from(ids);
|
return Rx.Observable.from(ids);
|
||||||
})
|
})
|
||||||
.bufferWithCount(20)
|
.bufferWithCount(batchSize)
|
||||||
.withLatestFrom(dbObservable, function(identities, db) {
|
.withLatestFrom(dbObservable, function(identities, db) {
|
||||||
return {
|
return {
|
||||||
identities: identities,
|
identities: identities,
|
||||||
@ -153,9 +186,9 @@ var userIdentityCount = users
|
|||||||
|
|
||||||
var storyCount = dbObservable
|
var storyCount = dbObservable
|
||||||
.flatMap(function(db) {
|
.flatMap(function(db) {
|
||||||
return createQuery(db, 'stories', {});
|
return createQuery(db, 'stories', {}, batchSize);
|
||||||
})
|
})
|
||||||
.bufferWithCount(20)
|
.bufferWithCount(batchSize)
|
||||||
.withLatestFrom(dbObservable, function(stories, db) {
|
.withLatestFrom(dbObservable, function(stories, db) {
|
||||||
return {
|
return {
|
||||||
stories: stories,
|
stories: stories,
|
||||||
@ -173,9 +206,9 @@ Rx.Observable.combineLatest(
|
|||||||
storyCount,
|
storyCount,
|
||||||
function(userIdentCount, userCount, storyCount) {
|
function(userIdentCount, userCount, storyCount) {
|
||||||
return {
|
return {
|
||||||
userIdentCount: userIdentCount * 20,
|
userIdentCount: userIdentCount * batchSize,
|
||||||
userCount: userCount * 20,
|
userCount: userCount * batchSize,
|
||||||
storyCount: storyCount * 20
|
storyCount: storyCount * batchSize
|
||||||
};
|
};
|
||||||
})
|
})
|
||||||
.subscribe(
|
.subscribe(
|
||||||
@ -191,3 +224,5 @@ Rx.Observable.combineLatest(
|
|||||||
process.exit(0);
|
process.exit(0);
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
dbObservable.connect();
|
||||||
|
Reference in New Issue
Block a user