| 
									
										
										
										
											2014-07-04 19:38:53 +01:00
										 |  |  | package ethreact | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"sync" | 
					
						
							| 
									
										
										
										
											2014-08-23 19:00:52 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/ethereum/eth-go/ethlog" | 
					
						
							| 
									
										
										
										
											2014-07-04 19:38:53 +01:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | var logger = ethlog.NewLogger("REACTOR") | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-07-14 18:40:18 +01:00
										 |  |  | const ( | 
					
						
							|  |  |  | 	eventBufferSize int = 10 | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-07-04 19:38:53 +01:00
										 |  |  | type EventHandler struct { | 
					
						
							|  |  |  | 	lock  sync.RWMutex | 
					
						
							|  |  |  | 	name  string | 
					
						
							|  |  |  | 	chans []chan Event | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Post the Event with the reactor resource on the channels | 
					
						
							|  |  |  | // currently subscribed to the event | 
					
						
							|  |  |  | func (e *EventHandler) Post(event Event) { | 
					
						
							|  |  |  | 	e.lock.RLock() | 
					
						
							|  |  |  | 	defer e.lock.RUnlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// if we want to preserve order pushing to subscibed channels | 
					
						
							|  |  |  | 	// dispatching should be syncrounous | 
					
						
							| 
									
										
										
										
											2014-07-14 18:40:18 +01:00
										 |  |  | 	// this means if subscribed event channel is blocked | 
					
						
							| 
									
										
										
										
											2014-07-04 19:38:53 +01:00
										 |  |  | 	// the reactor dispatch will be blocked, so we need to mitigate by skipping | 
					
						
							|  |  |  | 	// rogue blocking subscribers | 
					
						
							|  |  |  | 	for i, ch := range e.chans { | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case ch <- event: | 
					
						
							|  |  |  | 		default: | 
					
						
							| 
									
										
										
										
											2014-08-23 19:00:52 +02:00
										 |  |  | 			logger.Debugf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name) | 
					
						
							| 
									
										
										
										
											2014-07-04 19:38:53 +01:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Add a subscriber to this event | 
					
						
							|  |  |  | func (e *EventHandler) Add(ch chan Event) { | 
					
						
							|  |  |  | 	e.lock.Lock() | 
					
						
							|  |  |  | 	defer e.lock.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	e.chans = append(e.chans, ch) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Remove a subscriber | 
					
						
							|  |  |  | func (e *EventHandler) Remove(ch chan Event) int { | 
					
						
							|  |  |  | 	e.lock.Lock() | 
					
						
							|  |  |  | 	defer e.lock.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for i, c := range e.chans { | 
					
						
							|  |  |  | 		if c == ch { | 
					
						
							|  |  |  | 			e.chans = append(e.chans[:i], e.chans[i+1:]...) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return len(e.chans) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-07-21 15:10:56 +01:00
										 |  |  | // Basic reactor event | 
					
						
							| 
									
										
										
										
											2014-07-04 19:38:53 +01:00
										 |  |  | type Event struct { | 
					
						
							|  |  |  | 	Resource interface{} | 
					
						
							|  |  |  | 	Name     string | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // The reactor basic engine. Acts as bridge | 
					
						
							|  |  |  | // between the events and the subscribers/posters | 
					
						
							|  |  |  | type ReactorEngine struct { | 
					
						
							| 
									
										
										
										
											2014-07-14 18:40:18 +01:00
										 |  |  | 	lock          sync.RWMutex | 
					
						
							|  |  |  | 	eventChannel  chan Event | 
					
						
							|  |  |  | 	eventHandlers map[string]*EventHandler | 
					
						
							|  |  |  | 	quit          chan chan error | 
					
						
							|  |  |  | 	running       bool | 
					
						
							|  |  |  | 	drained       chan bool | 
					
						
							| 
									
										
										
										
											2014-07-04 19:38:53 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func New() *ReactorEngine { | 
					
						
							|  |  |  | 	return &ReactorEngine{ | 
					
						
							| 
									
										
										
										
											2014-07-14 18:40:18 +01:00
										 |  |  | 		eventHandlers: make(map[string]*EventHandler), | 
					
						
							|  |  |  | 		eventChannel:  make(chan Event, eventBufferSize), | 
					
						
							|  |  |  | 		quit:          make(chan chan error, 1), | 
					
						
							|  |  |  | 		drained:       make(chan bool, 1), | 
					
						
							| 
									
										
										
										
											2014-07-04 19:38:53 +01:00
										 |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (reactor *ReactorEngine) Start() { | 
					
						
							|  |  |  | 	reactor.lock.Lock() | 
					
						
							|  |  |  | 	defer reactor.lock.Unlock() | 
					
						
							|  |  |  | 	if !reactor.running { | 
					
						
							|  |  |  | 		go func() { | 
					
						
							|  |  |  | 			for { | 
					
						
							|  |  |  | 				select { | 
					
						
							| 
									
										
										
										
											2014-07-14 18:40:18 +01:00
										 |  |  | 				case status := <-reactor.quit: | 
					
						
							|  |  |  | 					reactor.lock.Lock() | 
					
						
							|  |  |  | 					defer reactor.lock.Unlock() | 
					
						
							|  |  |  | 					reactor.running = false | 
					
						
							|  |  |  | 					logger.Infoln("stopped") | 
					
						
							|  |  |  | 					status <- nil | 
					
						
							|  |  |  | 					return | 
					
						
							| 
									
										
										
										
											2014-07-04 19:38:53 +01:00
										 |  |  | 				case event := <-reactor.eventChannel: | 
					
						
							|  |  |  | 					// needs to be called syncronously to keep order of events | 
					
						
							|  |  |  | 					reactor.dispatch(event) | 
					
						
							|  |  |  | 				default: | 
					
						
							| 
									
										
										
										
											2014-07-05 19:56:01 +01:00
										 |  |  | 					reactor.drained <- true // blocking till message is coming in | 
					
						
							| 
									
										
										
										
											2014-07-04 19:38:53 +01:00
										 |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		}() | 
					
						
							|  |  |  | 		reactor.running = true | 
					
						
							|  |  |  | 		logger.Infoln("started") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (reactor *ReactorEngine) Stop() { | 
					
						
							|  |  |  | 	if reactor.running { | 
					
						
							| 
									
										
										
										
											2014-07-14 18:40:18 +01:00
										 |  |  | 		status := make(chan error) | 
					
						
							|  |  |  | 		reactor.quit <- status | 
					
						
							| 
									
										
										
										
											2014-07-05 19:56:01 +01:00
										 |  |  | 		select { | 
					
						
							|  |  |  | 		case <-reactor.drained: | 
					
						
							| 
									
										
										
										
											2014-07-14 18:40:18 +01:00
										 |  |  | 		default: | 
					
						
							| 
									
										
										
										
											2014-07-05 19:56:01 +01:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2014-07-14 18:40:18 +01:00
										 |  |  | 		<-status | 
					
						
							| 
									
										
										
										
											2014-07-04 19:38:53 +01:00
										 |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (reactor *ReactorEngine) Flush() { | 
					
						
							| 
									
										
										
										
											2014-07-05 19:56:01 +01:00
										 |  |  | 	<-reactor.drained | 
					
						
							| 
									
										
										
										
											2014-07-04 19:38:53 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Subscribe a channel to the specified event | 
					
						
							|  |  |  | func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) { | 
					
						
							|  |  |  | 	reactor.lock.Lock() | 
					
						
							|  |  |  | 	defer reactor.lock.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	eventHandler := reactor.eventHandlers[event] | 
					
						
							|  |  |  | 	// Create a new event handler if one isn't available | 
					
						
							|  |  |  | 	if eventHandler == nil { | 
					
						
							|  |  |  | 		eventHandler = &EventHandler{name: event} | 
					
						
							|  |  |  | 		reactor.eventHandlers[event] = eventHandler | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	// Add the events channel to reactor event handler | 
					
						
							|  |  |  | 	eventHandler.Add(eventChannel) | 
					
						
							| 
									
										
										
										
											2014-07-05 19:56:01 +01:00
										 |  |  | 	logger.Debugf("added new subscription to %s", event) | 
					
						
							| 
									
										
										
										
											2014-07-04 19:38:53 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) { | 
					
						
							|  |  |  | 	reactor.lock.Lock() | 
					
						
							|  |  |  | 	defer reactor.lock.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	eventHandler := reactor.eventHandlers[event] | 
					
						
							|  |  |  | 	if eventHandler != nil { | 
					
						
							|  |  |  | 		len := eventHandler.Remove(eventChannel) | 
					
						
							|  |  |  | 		if len == 0 { | 
					
						
							|  |  |  | 			reactor.eventHandlers[event] = nil | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2014-07-05 19:56:01 +01:00
										 |  |  | 		logger.Debugf("removed subscription to %s", event) | 
					
						
							| 
									
										
										
										
											2014-07-04 19:38:53 +01:00
										 |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (reactor *ReactorEngine) Post(event string, resource interface{}) { | 
					
						
							|  |  |  | 	reactor.lock.Lock() | 
					
						
							|  |  |  | 	defer reactor.lock.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if reactor.running { | 
					
						
							|  |  |  | 		reactor.eventChannel <- Event{Resource: resource, Name: event} | 
					
						
							| 
									
										
										
										
											2014-07-05 19:56:01 +01:00
										 |  |  | 		select { | 
					
						
							|  |  |  | 		case <-reactor.drained: | 
					
						
							| 
									
										
										
										
											2014-07-14 18:40:18 +01:00
										 |  |  | 		default: | 
					
						
							| 
									
										
										
										
											2014-07-05 19:56:01 +01:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2014-07-04 19:38:53 +01:00
										 |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (reactor *ReactorEngine) dispatch(event Event) { | 
					
						
							|  |  |  | 	name := event.Name | 
					
						
							|  |  |  | 	eventHandler := reactor.eventHandlers[name] | 
					
						
							|  |  |  | 	// if no subscriptions to this event type - no event handler created | 
					
						
							|  |  |  | 	// then noone to notify | 
					
						
							|  |  |  | 	if eventHandler != nil { | 
					
						
							|  |  |  | 		// needs to be called syncronously | 
					
						
							|  |  |  | 		eventHandler.Post(event) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } |