| 
									
										
										
										
											2014-10-14 01:56:24 +02:00
										 |  |  | // Package event implements an event multiplexer. | 
					
						
							|  |  |  | package event | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"errors" | 
					
						
							| 
									
										
										
										
											2014-10-16 18:59:28 +02:00
										 |  |  | 	"fmt" | 
					
						
							| 
									
										
										
										
											2014-10-14 01:56:24 +02:00
										 |  |  | 	"reflect" | 
					
						
							|  |  |  | 	"sync" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-10-16 18:07:27 +02:00
										 |  |  | // Subscription is implemented by event subscriptions. | 
					
						
							| 
									
										
										
										
											2014-10-14 01:56:24 +02:00
										 |  |  | type Subscription interface { | 
					
						
							| 
									
										
										
										
											2014-10-16 18:07:27 +02:00
										 |  |  | 	// Chan returns a channel that carries events. | 
					
						
							|  |  |  | 	// Implementations should return the same channel | 
					
						
							|  |  |  | 	// for any subsequent calls to Chan. | 
					
						
							| 
									
										
										
										
											2014-10-14 01:56:24 +02:00
										 |  |  | 	Chan() <-chan interface{} | 
					
						
							| 
									
										
										
										
											2014-10-16 18:07:27 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Unsubscribe stops delivery of events to a subscription. | 
					
						
							|  |  |  | 	// The event channel is closed. | 
					
						
							|  |  |  | 	// Unsubscribe can be called more than once. | 
					
						
							| 
									
										
										
										
											2014-10-14 01:56:24 +02:00
										 |  |  | 	Unsubscribe() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // A TypeMux dispatches events to registered receivers. Receivers can be | 
					
						
							|  |  |  | // registered to handle events of certain type. Any operation | 
					
						
							|  |  |  | // called after mux is stopped will return ErrMuxClosed. | 
					
						
							| 
									
										
										
										
											2014-10-16 18:10:09 +02:00
										 |  |  | // | 
					
						
							|  |  |  | // The zero value is ready to use. | 
					
						
							| 
									
										
										
										
											2014-10-14 01:56:24 +02:00
										 |  |  | type TypeMux struct { | 
					
						
							|  |  |  | 	mutex   sync.RWMutex | 
					
						
							|  |  |  | 	subm    map[reflect.Type][]*muxsub | 
					
						
							|  |  |  | 	stopped bool | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-10-16 18:07:27 +02:00
										 |  |  | // ErrMuxClosed is returned when Posting on a closed TypeMux. | 
					
						
							| 
									
										
										
										
											2014-10-14 01:56:24 +02:00
										 |  |  | var ErrMuxClosed = errors.New("event: mux closed") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Subscribe creates a subscription for events of the given types. The | 
					
						
							|  |  |  | // subscription's channel is closed when it is unsubscribed | 
					
						
							|  |  |  | // or the mux is closed. | 
					
						
							|  |  |  | func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { | 
					
						
							|  |  |  | 	sub := newsub(mux) | 
					
						
							|  |  |  | 	mux.mutex.Lock() | 
					
						
							| 
									
										
										
										
											2014-10-16 18:59:28 +02:00
										 |  |  | 	defer mux.mutex.Unlock() | 
					
						
							| 
									
										
										
										
											2014-10-14 01:56:24 +02:00
										 |  |  | 	if mux.stopped { | 
					
						
							|  |  |  | 		close(sub.postC) | 
					
						
							|  |  |  | 	} else { | 
					
						
							| 
									
										
										
										
											2014-10-16 18:10:09 +02:00
										 |  |  | 		if mux.subm == nil { | 
					
						
							|  |  |  | 			mux.subm = make(map[reflect.Type][]*muxsub) | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2014-10-14 01:56:24 +02:00
										 |  |  | 		for _, t := range types { | 
					
						
							|  |  |  | 			rtyp := reflect.TypeOf(t) | 
					
						
							|  |  |  | 			oldsubs := mux.subm[rtyp] | 
					
						
							| 
									
										
										
										
											2014-10-16 18:59:28 +02:00
										 |  |  | 			if find(oldsubs, sub) != -1 { | 
					
						
							|  |  |  | 				panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp)) | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2014-10-14 01:56:24 +02:00
										 |  |  | 			subs := make([]*muxsub, len(oldsubs)+1) | 
					
						
							|  |  |  | 			copy(subs, oldsubs) | 
					
						
							|  |  |  | 			subs[len(oldsubs)] = sub | 
					
						
							|  |  |  | 			mux.subm[rtyp] = subs | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return sub | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Post sends an event to all receivers registered for the given type. | 
					
						
							|  |  |  | // It returns ErrMuxClosed if the mux has been stopped. | 
					
						
							|  |  |  | func (mux *TypeMux) Post(ev interface{}) error { | 
					
						
							|  |  |  | 	rtyp := reflect.TypeOf(ev) | 
					
						
							|  |  |  | 	mux.mutex.RLock() | 
					
						
							|  |  |  | 	if mux.stopped { | 
					
						
							|  |  |  | 		mux.mutex.RUnlock() | 
					
						
							|  |  |  | 		return ErrMuxClosed | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	subs := mux.subm[rtyp] | 
					
						
							|  |  |  | 	mux.mutex.RUnlock() | 
					
						
							|  |  |  | 	for _, sub := range subs { | 
					
						
							|  |  |  | 		sub.deliver(ev) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Stop closes a mux. The mux can no longer be used. | 
					
						
							|  |  |  | // Future Post calls will fail with ErrMuxClosed. | 
					
						
							|  |  |  | // Stop blocks until all current deliveries have finished. | 
					
						
							|  |  |  | func (mux *TypeMux) Stop() { | 
					
						
							|  |  |  | 	mux.mutex.Lock() | 
					
						
							|  |  |  | 	for _, subs := range mux.subm { | 
					
						
							|  |  |  | 		for _, sub := range subs { | 
					
						
							|  |  |  | 			sub.closewait() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	mux.subm = nil | 
					
						
							|  |  |  | 	mux.stopped = true | 
					
						
							|  |  |  | 	mux.mutex.Unlock() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (mux *TypeMux) del(s *muxsub) { | 
					
						
							|  |  |  | 	mux.mutex.Lock() | 
					
						
							|  |  |  | 	for typ, subs := range mux.subm { | 
					
						
							|  |  |  | 		if pos := find(subs, s); pos >= 0 { | 
					
						
							|  |  |  | 			if len(subs) == 1 { | 
					
						
							|  |  |  | 				delete(mux.subm, typ) | 
					
						
							|  |  |  | 			} else { | 
					
						
							|  |  |  | 				mux.subm[typ] = posdelete(subs, pos) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	s.mux.mutex.Unlock() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func find(slice []*muxsub, item *muxsub) int { | 
					
						
							|  |  |  | 	for i, v := range slice { | 
					
						
							|  |  |  | 		if v == item { | 
					
						
							|  |  |  | 			return i | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return -1 | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func posdelete(slice []*muxsub, pos int) []*muxsub { | 
					
						
							|  |  |  | 	news := make([]*muxsub, len(slice)-1) | 
					
						
							|  |  |  | 	copy(news[:pos], slice[:pos]) | 
					
						
							|  |  |  | 	copy(news[pos:], slice[pos+1:]) | 
					
						
							|  |  |  | 	return news | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type muxsub struct { | 
					
						
							|  |  |  | 	mux     *TypeMux | 
					
						
							| 
									
										
										
										
											2014-10-16 18:08:48 +02:00
										 |  |  | 	closeMu sync.Mutex | 
					
						
							| 
									
										
										
										
											2014-10-14 01:56:24 +02:00
										 |  |  | 	closing chan struct{} | 
					
						
							| 
									
										
										
										
											2014-10-16 18:08:48 +02:00
										 |  |  | 	closed  bool | 
					
						
							| 
									
										
										
										
											2014-10-14 01:56:24 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// these two are the same channel. they are stored separately so | 
					
						
							|  |  |  | 	// postC can be set to nil without affecting the return value of | 
					
						
							|  |  |  | 	// Chan. | 
					
						
							| 
									
										
										
										
											2014-10-16 18:08:48 +02:00
										 |  |  | 	postMu sync.RWMutex | 
					
						
							|  |  |  | 	readC  <-chan interface{} | 
					
						
							|  |  |  | 	postC  chan<- interface{} | 
					
						
							| 
									
										
										
										
											2014-10-14 01:56:24 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func newsub(mux *TypeMux) *muxsub { | 
					
						
							|  |  |  | 	c := make(chan interface{}) | 
					
						
							|  |  |  | 	return &muxsub{ | 
					
						
							|  |  |  | 		mux:     mux, | 
					
						
							|  |  |  | 		readC:   c, | 
					
						
							|  |  |  | 		postC:   c, | 
					
						
							|  |  |  | 		closing: make(chan struct{}), | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *muxsub) Chan() <-chan interface{} { | 
					
						
							|  |  |  | 	return s.readC | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *muxsub) Unsubscribe() { | 
					
						
							|  |  |  | 	s.mux.del(s) | 
					
						
							|  |  |  | 	s.closewait() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *muxsub) closewait() { | 
					
						
							| 
									
										
										
										
											2014-10-16 18:08:48 +02:00
										 |  |  | 	s.closeMu.Lock() | 
					
						
							|  |  |  | 	defer s.closeMu.Unlock() | 
					
						
							|  |  |  | 	if s.closed { | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2014-10-14 01:56:24 +02:00
										 |  |  | 	close(s.closing) | 
					
						
							| 
									
										
										
										
											2014-10-16 18:08:48 +02:00
										 |  |  | 	s.closed = true | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	s.postMu.Lock() | 
					
						
							| 
									
										
										
										
											2014-10-14 01:56:24 +02:00
										 |  |  | 	close(s.postC) | 
					
						
							|  |  |  | 	s.postC = nil | 
					
						
							| 
									
										
										
										
											2014-10-16 18:08:48 +02:00
										 |  |  | 	s.postMu.Unlock() | 
					
						
							| 
									
										
										
										
											2014-10-14 01:56:24 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *muxsub) deliver(ev interface{}) { | 
					
						
							| 
									
										
										
										
											2014-10-16 18:08:48 +02:00
										 |  |  | 	s.postMu.RLock() | 
					
						
							| 
									
										
										
										
											2014-10-14 01:56:24 +02:00
										 |  |  | 	select { | 
					
						
							|  |  |  | 	case s.postC <- ev: | 
					
						
							|  |  |  | 	case <-s.closing: | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2014-10-16 18:08:48 +02:00
										 |  |  | 	s.postMu.RUnlock() | 
					
						
							| 
									
										
										
										
											2014-10-14 01:56:24 +02:00
										 |  |  | } |