chore(server): Move api-server in to it's own DIR
This commit is contained in:
committed by
mrugesh mohapatra
parent
9fba6bce4c
commit
46a217d0a5
58
api-server/server/utils/rx.js
Normal file
58
api-server/server/utils/rx.js
Normal file
@@ -0,0 +1,58 @@
|
||||
import Rx, { AsyncSubject, Observable } from 'rx';
|
||||
import moment from 'moment';
|
||||
import debugFactory from 'debug';
|
||||
|
||||
const debug = debugFactory('fcc:rxUtils');
|
||||
|
||||
export function saveInstance(instance) {
|
||||
return new Rx.Observable.create(function(observer) {
|
||||
if (!instance || typeof instance.save !== 'function') {
|
||||
debug('no instance or save method');
|
||||
observer.onNext();
|
||||
return observer.onCompleted();
|
||||
}
|
||||
return instance.save(function(err, savedInstance) {
|
||||
if (err) {
|
||||
return observer.onError(err);
|
||||
}
|
||||
debug('instance saved');
|
||||
observer.onNext(savedInstance);
|
||||
return observer.onCompleted();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// alias saveInstance
|
||||
export const saveUser = saveInstance;
|
||||
|
||||
// observeQuery(Model: Object, methodName: String, query: Any) => Observable
|
||||
export function observeQuery(Model, methodName, query) {
|
||||
return Rx.Observable.fromNodeCallback(Model[methodName], Model)(query);
|
||||
}
|
||||
|
||||
// observeMethod(
|
||||
// context: Object, methodName: String
|
||||
// ) => (query: Any) => Observable
|
||||
export function observeMethod(context, methodName) {
|
||||
return Rx.Observable.fromNodeCallback(context[methodName], context);
|
||||
}
|
||||
|
||||
// must be bound to an observable instance
|
||||
// timeCache(amount: Number, unit: String) => Observable
|
||||
export function timeCache(time, unit) {
|
||||
const source = this;
|
||||
let cache;
|
||||
let expireCacheAt;
|
||||
return Observable.create(observable => {
|
||||
// if there is no expire time set
|
||||
// or if expireCacheAt is smaller than now,
|
||||
// set new expire time in MS and create new subscription to source
|
||||
if (!expireCacheAt || expireCacheAt < Date.now()) {
|
||||
// set expire in ms;
|
||||
expireCacheAt = moment().add(time, unit).valueOf();
|
||||
cache = new AsyncSubject();
|
||||
source.subscribe(cache);
|
||||
}
|
||||
return cache.subscribe(observable);
|
||||
});
|
||||
}
|
Reference in New Issue
Block a user