add create identities on flatten users
This commit is contained in:
@ -67,6 +67,7 @@
|
|||||||
"mongodb": "^2.0.33",
|
"mongodb": "^2.0.33",
|
||||||
"morgan": "~1.5.0",
|
"morgan": "~1.5.0",
|
||||||
"node-slack": "0.0.7",
|
"node-slack": "0.0.7",
|
||||||
|
"node-uuid": "^1.4.3",
|
||||||
"nodemailer": "~1.3.0",
|
"nodemailer": "~1.3.0",
|
||||||
"passport-facebook": "^2.0.0",
|
"passport-facebook": "^2.0.0",
|
||||||
"passport-google-oauth": "^0.2.0",
|
"passport-google-oauth": "^0.2.0",
|
||||||
|
@ -1,12 +1,21 @@
|
|||||||
/* eslint-disable no-process-exit */
|
/* eslint-disable no-process-exit */
|
||||||
require('dotenv').load();
|
require('dotenv').load();
|
||||||
var assign = require('lodash/object/assign'),
|
var Rx = require('rx'),
|
||||||
Rx = require('rx'),
|
uuid = require('node-uuid'),
|
||||||
|
assign = require('lodash/object/assign'),
|
||||||
mongodb = require('mongodb'),
|
mongodb = require('mongodb'),
|
||||||
secrets = require('../config/secrets');
|
secrets = require('../config/secrets');
|
||||||
|
|
||||||
var MongoClient = mongodb.MongoClient;
|
var MongoClient = mongodb.MongoClient;
|
||||||
|
|
||||||
|
var providers = [
|
||||||
|
'facebook',
|
||||||
|
'twitter',
|
||||||
|
'google',
|
||||||
|
'github',
|
||||||
|
'linkedin'
|
||||||
|
];
|
||||||
|
|
||||||
function createConnection(URI) {
|
function createConnection(URI) {
|
||||||
return Rx.Observable.create(function(observer) {
|
return Rx.Observable.create(function(observer) {
|
||||||
MongoClient.connect(URI, function(err, database) {
|
MongoClient.connect(URI, function(err, database) {
|
||||||
@ -48,6 +57,7 @@ function insertMany(db, collection, users, options) {
|
|||||||
if (err) {
|
if (err) {
|
||||||
return observer.onError(err);
|
return observer.onError(err);
|
||||||
}
|
}
|
||||||
|
observer.onNext();
|
||||||
observer.onCompleted();
|
observer.onCompleted();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@ -56,7 +66,8 @@ function insertMany(db, collection, users, options) {
|
|||||||
var count = 0;
|
var count = 0;
|
||||||
// will supply our db object
|
// will supply our db object
|
||||||
var dbObservable = createConnection(secrets.db).shareReplay();
|
var dbObservable = createConnection(secrets.db).shareReplay();
|
||||||
dbObservable
|
|
||||||
|
var users = dbObservable
|
||||||
.flatMap(function(db) {
|
.flatMap(function(db) {
|
||||||
// returns user document, n users per loop where n is the batchsize.
|
// returns user document, n users per loop where n is the batchsize.
|
||||||
return createQuery(db, 'users', {});
|
return createQuery(db, 'users', {});
|
||||||
@ -66,7 +77,17 @@ dbObservable
|
|||||||
assign(user, user.portfolio, user.profile);
|
assign(user, user.portfolio, user.profile);
|
||||||
return user;
|
return user;
|
||||||
})
|
})
|
||||||
// batch them into arrays of twenty documents
|
.map(function(user) {
|
||||||
|
if (user.username) {
|
||||||
|
return user;
|
||||||
|
}
|
||||||
|
user.username = 'fcc' + uuid.v4().slice(0, 8);
|
||||||
|
return user;
|
||||||
|
})
|
||||||
|
.shareReplay();
|
||||||
|
|
||||||
|
// batch them into arrays of twenty documents
|
||||||
|
var userSavesCount = users
|
||||||
.bufferWithCount(20)
|
.bufferWithCount(20)
|
||||||
// get bd object ready for insert
|
// get bd object ready for insert
|
||||||
.withLatestFrom(dbObservable, function(users, db) {
|
.withLatestFrom(dbObservable, function(users, db) {
|
||||||
@ -80,10 +101,45 @@ dbObservable
|
|||||||
return insertMany(dats.db, 'user', dats.users, { w: 1 });
|
return insertMany(dats.db, 'user', dats.users, { w: 1 });
|
||||||
})
|
})
|
||||||
// count how many times insert completes
|
// count how many times insert completes
|
||||||
.count()
|
.count();
|
||||||
|
|
||||||
|
// create User Identities
|
||||||
|
var userIdentityCount = users
|
||||||
|
.flatMap(function(user) {
|
||||||
|
var ids = providers
|
||||||
|
.map(function(provider) {
|
||||||
|
return {
|
||||||
|
provider: provider,
|
||||||
|
externalId: user[provider]
|
||||||
|
};
|
||||||
|
})
|
||||||
|
.filter(function(ident) {
|
||||||
|
return !!ident.externalId;
|
||||||
|
});
|
||||||
|
|
||||||
|
return Rx.Observable.from(ids);
|
||||||
|
})
|
||||||
|
.bufferWithCount(20)
|
||||||
|
.withLatestFrom(dbObservable, function(identities, db) {
|
||||||
|
return {
|
||||||
|
identities: identities,
|
||||||
|
db: db
|
||||||
|
};
|
||||||
|
})
|
||||||
|
.flatMap(function(dats) {
|
||||||
|
// bulk insert into new collection for loopback
|
||||||
|
return insertMany(dats.db, 'userIdentity', dats.identities, { w: 1 });
|
||||||
|
})
|
||||||
|
// count how many times insert completes
|
||||||
|
.count();
|
||||||
|
|
||||||
|
Rx.Observable.merge(
|
||||||
|
userIdentityCount,
|
||||||
|
userSavesCount
|
||||||
|
)
|
||||||
.subscribe(
|
.subscribe(
|
||||||
function(_count) {
|
function(_count) {
|
||||||
count = _count * 20;
|
count += _count * 20;
|
||||||
},
|
},
|
||||||
function(err) {
|
function(err) {
|
||||||
console.log('an error occured', err);
|
console.log('an error occured', err);
|
||||||
|
Reference in New Issue
Block a user