Add 24-hour cache. Add jobs to sitemap. Possible warning about too many EventEmitter listeners.
		
			
				
	
	
		
			59 lines
		
	
	
		
			1.8 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			59 lines
		
	
	
		
			1.8 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
import Rx, { AsyncSubject, Observable } from 'rx';
 | 
						|
import moment from 'moment';
 | 
						|
import debugFactory from 'debug';
 | 
						|
 | 
						|
// module logger, created by the `debug` factory under the 'fcc:rxUtils' namespace
const debug = debugFactory('fcc:rxUtils');
 | 
						|
 | 
						|
// saveInstance(instance: Object) => Observable
//
// Wraps the node-style `instance.save(callback)` call in an observable.
// - When `instance` is falsy or has no `save` function, the observable emits
//   a single `undefined` value and completes.
// - Otherwise `instance.save` is called on subscription: a callback error is
//   forwarded through `onError`, a success emits the saved instance and
//   completes.
export function saveInstance(instance) {
  // `Observable.create` is a factory function, so it is called without `new`
  return Rx.Observable.create(function(observer) {
    if (!instance || typeof instance.save !== 'function') {
      debug('no instance or save method');
      observer.onNext();
      return observer.onCompleted();
    }
    return instance.save(function(err, savedInstance) {
      if (err) {
        return observer.onError(err);
      }
      debug('instance saved');
      observer.onNext(savedInstance);
      return observer.onCompleted();
    });
  });
}
 | 
						|
 | 
						|
// alias of saveInstance, exported under a second name
export const saveUser = saveInstance;
 | 
						|
 | 
						|
// observeQuery(Model: Object, methodName: String, query: Any) => Observable
//
// Looks up `Model[methodName]`, wraps it with `fromNodeCallback` using `Model`
// as the call context, and immediately invokes the wrapped function with
// `query`, returning the resulting observable.
export function observeQuery(Model, methodName, query) {
  const runQuery = Rx.Observable.fromNodeCallback(Model[methodName], Model);
  return runQuery(query);
}
 | 
						|
 | 
						|
// observeMethod(
//   context: Object, methodName: String
// ) => (query: Any) => Observable
//
// Wraps `context[methodName]` with `fromNodeCallback`, passing `context` as
// the call context, and returns the wrapped function without invoking it.
export function observeMethod(context, methodName) {
  const method = context[methodName];
  return Rx.Observable.fromNodeCallback(method, context);
}
 | 
						|
 | 
						|
// must be bound to an observable instance
// timeCache(time: Number, unit: String) => Observable
//
// Returns an observable that shares one subscription to the bound source for
// a window of `time` `unit`s (both are passed straight to `moment().add`).
// The first subscriber, or the first one after the window has expired,
// triggers a fresh subscription to the source through a new AsyncSubject;
// every subscriber is attached to the current subject.
// NOTE(review): the subject is subscribed directly to the source, so a source
// error presumably stays cached until the window expires - confirm intended.
export function timeCache(time, unit) {
  const source = this;
  let cache;
  let expireCacheAt;
  return Observable.create(observer => {
    // stale when no expiry has been set yet or the stored expiry has passed
    const isStale = !expireCacheAt || expireCacheAt < Date.now();
    if (isStale) {
      // new expiry as a millisecond timestamp
      expireCacheAt = moment().add(time, unit).valueOf();
      cache = new AsyncSubject();
      source.subscribe(cache);
    }
    return cache.subscribe(observer);
  });
}
 |