232 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			232 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| /* eslint-disable no-process-exit */
 | |
| require('dotenv').load();
 | |
| var Rx = require('rx'),
 | |
|   uuid = require('node-uuid'),
 | |
|   assign = require('lodash/object/assign'),
 | |
|   mongodb = require('mongodb'),
 | |
|   secrets = require('../config/secrets');
 | |
| 
 | |
| const batchSize = 20;
 | |
| var MongoClient = mongodb.MongoClient;
 | |
| Rx.config.longStackSupport = true;
 | |
| 
 | |
| var providers = [
 | |
|   'facebook',
 | |
|   'twitter',
 | |
|   'google',
 | |
|   'github',
 | |
|   'linkedin'
 | |
| ];
 | |
| 
 | |
| // create async console.logs
 | |
| function debug() {
 | |
|   var args = [].slice.call(arguments);
 | |
|   process.nextTick(function() {
 | |
|     console.log.apply(console, args);
 | |
|   });
 | |
| }
 | |
| 
 | |
| function createConnection(URI) {
 | |
|   return Rx.Observable.create(function(observer) {
 | |
|     debug('connecting to db');
 | |
|     MongoClient.connect(URI, function(err, database) {
 | |
|       if (err) {
 | |
|         return observer.onError(err);
 | |
|       }
 | |
|       debug('db connected');
 | |
|       observer.onNext(database);
 | |
|       observer.onCompleted();
 | |
|     });
 | |
|   });
 | |
| }
 | |
| 
 | |
| function createQuery(db, collection, options, batchSize) {
 | |
|   return Rx.Observable.create(function(observer) {
 | |
|     var cursor = db.collection(collection).find({}, options);
 | |
|     cursor.batchSize(batchSize || 20);
 | |
|     // Cursor.each will yield all doc from a batch in the same tick,
 | |
|     // or schedule getting next batch on nextTick
 | |
|     debug('opening cursor for %s', collection);
 | |
|     cursor.each(function(err, doc) {
 | |
|       if (err) {
 | |
|         return observer.onError(err);
 | |
|       }
 | |
|       if (!doc) {
 | |
|         console.log('onCompleted');
 | |
|         return observer.onCompleted();
 | |
|       }
 | |
|       observer.onNext(doc);
 | |
|     });
 | |
| 
 | |
|     return Rx.Disposable.create(function() {
 | |
|       debug('closing cursor for %s', collection);
 | |
|       cursor.close();
 | |
|     });
 | |
|   });
 | |
| }
 | |
| 
 | |
| function getUserCount(db) {
 | |
|   return Rx.Observable.create(function(observer) {
 | |
|     var cursor = db.collection('users').count(function(err, count) {
 | |
|       if (err) {
 | |
|         return observer.onError(err);
 | |
|       }
 | |
|       observer.onNext(count);
 | |
|       observer.onCompleted();
 | |
| 
 | |
|       return Rx.Disposable.create(function() {
 | |
|         debug('closing user count');
 | |
|         cursor.close();
 | |
|       });
 | |
|     });
 | |
|   });
 | |
| }
 | |
| 
 | |
| function insertMany(db, collection, users, options) {
 | |
|   return Rx.Observable.create(function(observer) {
 | |
|     db.collection(collection).insertMany(users, options, function(err) {
 | |
|       if (err) {
 | |
|         return observer.onError(err);
 | |
|       }
 | |
|       observer.onNext();
 | |
|       observer.onCompleted();
 | |
|     });
 | |
|   });
 | |
| }
 | |
| 
 | |
| var count = 0;
 | |
| // will supply our db object
 | |
| var dbObservable = createConnection(secrets.db).replay();
 | |
| 
 | |
| var totalUser = dbObservable
 | |
|   .flatMap(function(db) {
 | |
|     return getUserCount(db);
 | |
|   })
 | |
|   .shareReplay();
 | |
| 
 | |
| var users = dbObservable
 | |
|   .flatMap(function(db) {
 | |
|     // returns user document, n users per loop where n is the batchsize.
 | |
|     return createQuery(db, 'users', {}, batchSize);
 | |
|   })
 | |
|   .map(function(user) {
 | |
|     // flatten user
 | |
|     assign(user, user.portfolio, user.profile);
 | |
|     if (!user.username) {
 | |
|       user.username = 'fcc' + uuid.v4().slice(0, 8);
 | |
|     }
 | |
|     if (user.github) {
 | |
|       user.isGithubCool = true;
 | |
|     } else {
 | |
|       user.isMigrationGrandfathered = true;
 | |
|     }
 | |
|     providers.forEach(function(provider) {
 | |
|       user[provider + 'id'] = user[provider];
 | |
|       user[provider] = null;
 | |
|     });
 | |
|     user.rand = Math.random();
 | |
| 
 | |
|     return user;
 | |
|   })
 | |
|   .shareReplay();
 | |
| 
 | |
| // batch them into arrays of twenty documents
 | |
| var userSavesCount = users
 | |
|   .bufferWithCount(batchSize)
 | |
|   // get bd object ready for insert
 | |
|   .withLatestFrom(dbObservable, function(users, db) {
 | |
|     return {
 | |
|       users: users,
 | |
|       db: db
 | |
|     };
 | |
|   })
 | |
|   .flatMap(function(dats) {
 | |
|     // bulk insert into new collection for loopback
 | |
|     return insertMany(dats.db, 'user', dats.users, { w: 1 });
 | |
|   })
 | |
|   .flatMap(function() {
 | |
|     return totalUser;
 | |
|   })
 | |
|   .doOnNext(function(totalUsers) {
 | |
|     count = count + batchSize;
 | |
|     debug('user progress %s', count / totalUsers * 100);
 | |
|   })
 | |
|   // count how many times insert completes
 | |
|   .count();
 | |
| 
 | |
| // create User Identities
 | |
| var userIdentityCount = users
 | |
|   .flatMap(function(user) {
 | |
|     var ids = providers
 | |
|       .map(function(provider) {
 | |
|         return {
 | |
|           provider: provider,
 | |
|           externalId: user[provider + 'id'],
 | |
|           userId: user._id || user.id
 | |
|         };
 | |
|       })
 | |
|       .filter(function(ident) {
 | |
|         return !!ident.externalId;
 | |
|       });
 | |
| 
 | |
|     return Rx.Observable.from(ids);
 | |
|   })
 | |
|   .bufferWithCount(batchSize)
 | |
|   .withLatestFrom(dbObservable, function(identities, db) {
 | |
|     return {
 | |
|       identities: identities,
 | |
|       db: db
 | |
|     };
 | |
|   })
 | |
|   .flatMap(function(dats) {
 | |
|     // bulk insert into new collection for loopback
 | |
|     return insertMany(dats.db, 'userIdentity', dats.identities, { w: 1 });
 | |
|   })
 | |
|   // count how many times insert completes
 | |
|   .count();
 | |
| 
 | |
| /*
 | |
| var storyCount = dbObservable
 | |
|   .flatMap(function(db) {
 | |
|     return createQuery(db, 'stories', {}, batchSize);
 | |
|   })
 | |
|   .bufferWithCount(batchSize)
 | |
|   .withLatestFrom(dbObservable, function(stories, db) {
 | |
|     return {
 | |
|       stories: stories,
 | |
|       db: db
 | |
|     };
 | |
|   })
 | |
|   .flatMap(function(dats) {
 | |
|     return insertMany(dats.db, 'story', dats.stories, { w: 1 });
 | |
|   })
 | |
|   .count();
 | |
|   */
 | |
| 
 | |
| Rx.Observable.combineLatest(
 | |
|   userIdentityCount,
 | |
|   userSavesCount,
 | |
|   // storyCount,
 | |
|   function(userIdentCount, userCount) {
 | |
|     return {
 | |
|       userIdentCount: userIdentCount * batchSize,
 | |
|       userCount: userCount * batchSize
 | |
|       // storyCount: storyCount * batchSize
 | |
|     };
 | |
|   })
 | |
|   .subscribe(
 | |
|     function(countObj) {
 | |
|       console.log('next');
 | |
|       count = countObj;
 | |
|     },
 | |
|     function(err) {
 | |
|       console.error('an error occured', err, err.stack);
 | |
|     },
 | |
|     function() {
 | |
|       console.log('finished with ', count);
 | |
|       process.exit(0);
 | |
|     }
 | |
|   );
 | |
| 
 | |
| dbObservable.connect();
 |