| 
									
										
										
										
											2017-04-14 10:29:00 +02:00
										 |  |  | // Copyright 2016 The go-ethereum Authors | 
					
						
							| 
									
										
										
										
											2016-07-19 01:39:12 +02:00
										 |  |  | // This file is part of the go-ethereum library. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is free software: you can redistribute it and/or modify | 
					
						
							|  |  |  | // it under the terms of the GNU Lesser General Public License as published by | 
					
						
							|  |  |  | // the Free Software Foundation, either version 3 of the License, or | 
					
						
							|  |  |  | // (at your option) any later version. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The go-ethereum library is distributed in the hope that it will be useful, | 
					
						
							|  |  |  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | 
					
						
							|  |  |  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | 
					
						
							|  |  |  | // GNU Lesser General Public License for more details. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // You should have received a copy of the GNU Lesser General Public License | 
					
						
							|  |  |  | // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package event | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							| 
									
										
										
										
											2017-03-22 18:20:33 +01:00
										 |  |  | 	"context" | 
					
						
							| 
									
										
										
										
											2016-07-19 01:39:12 +02:00
										 |  |  | 	"sync" | 
					
						
							|  |  |  | 	"time" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/ethereum/go-ethereum/common/mclock" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Subscription represents a stream of events. The carrier of the events is typically a | 
					
						
							|  |  |  | // channel, but isn't part of the interface. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // Subscriptions can fail while established. Failures are reported through an error | 
					
						
							|  |  |  | // channel. It receives a value if there is an issue with the subscription (e.g. the | 
					
						
							|  |  |  | // network connection delivering the events has been closed). Only one value will ever be | 
					
						
							|  |  |  | // sent. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The error channel is closed when the subscription ends successfully (i.e. when the | 
					
						
							|  |  |  | // source of events is closed). It is also closed when Unsubscribe is called. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The Unsubscribe method cancels the sending of events. You must call Unsubscribe in all | 
					
						
							|  |  |  | // cases to ensure that resources related to the subscription are released. It can be | 
					
						
							|  |  |  | // called any number of times. | 
					
						
							|  |  |  | type Subscription interface { | 
					
						
							|  |  |  | 	Err() <-chan error // returns the error channel | 
					
						
							|  |  |  | 	Unsubscribe()      // cancels sending of events, closing the error channel | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-01-26 11:57:31 +01:00
										 |  |  | // NewSubscription runs a producer function as a subscription in a new goroutine. The | 
					
						
							|  |  |  | // channel given to the producer is closed when Unsubscribe is called. If fn returns an | 
					
						
							|  |  |  | // error, it is sent on the subscription's error channel. | 
					
						
							|  |  |  | func NewSubscription(producer func(<-chan struct{}) error) Subscription { | 
					
						
							| 
									
										
										
										
											2016-07-19 01:39:12 +02:00
										 |  |  | 	s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)} | 
					
						
							|  |  |  | 	go func() { | 
					
						
							|  |  |  | 		defer close(s.err) | 
					
						
							| 
									
										
										
										
											2017-01-26 11:57:31 +01:00
										 |  |  | 		err := producer(s.unsub) | 
					
						
							| 
									
										
										
										
											2016-07-19 01:39:12 +02:00
										 |  |  | 		s.mu.Lock() | 
					
						
							|  |  |  | 		defer s.mu.Unlock() | 
					
						
							|  |  |  | 		if !s.unsubscribed { | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				s.err <- err | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			s.unsubscribed = true | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 	return s | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type funcSub struct { | 
					
						
							|  |  |  | 	unsub        chan struct{} | 
					
						
							|  |  |  | 	err          chan error | 
					
						
							|  |  |  | 	mu           sync.Mutex | 
					
						
							|  |  |  | 	unsubscribed bool | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *funcSub) Unsubscribe() { | 
					
						
							|  |  |  | 	s.mu.Lock() | 
					
						
							|  |  |  | 	if s.unsubscribed { | 
					
						
							|  |  |  | 		s.mu.Unlock() | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	s.unsubscribed = true | 
					
						
							|  |  |  | 	close(s.unsub) | 
					
						
							|  |  |  | 	s.mu.Unlock() | 
					
						
							|  |  |  | 	// Wait for producer shutdown. | 
					
						
							|  |  |  | 	<-s.err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *funcSub) Err() <-chan error { | 
					
						
							|  |  |  | 	return s.err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Resubscribe calls fn repeatedly to keep a subscription established. When the | 
					
						
							|  |  |  | // subscription is established, Resubscribe waits for it to fail and calls fn again. This | 
					
						
							|  |  |  | // process repeats until Unsubscribe is called or the active subscription ends | 
					
						
							|  |  |  | // successfully. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // Resubscribe applies backoff between calls to fn. The time between calls is adapted | 
					
						
							|  |  |  | // based on the error rate, but will never exceed backoffMax. | 
					
						
							|  |  |  | func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription { | 
					
						
							|  |  |  | 	s := &resubscribeSub{ | 
					
						
							|  |  |  | 		waitTime:   backoffMax / 10, | 
					
						
							|  |  |  | 		backoffMax: backoffMax, | 
					
						
							|  |  |  | 		fn:         fn, | 
					
						
							|  |  |  | 		err:        make(chan error), | 
					
						
							|  |  |  | 		unsub:      make(chan struct{}), | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	go s.loop() | 
					
						
							|  |  |  | 	return s | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // A ResubscribeFunc attempts to establish a subscription. | 
					
						
							|  |  |  | type ResubscribeFunc func(context.Context) (Subscription, error) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type resubscribeSub struct { | 
					
						
							|  |  |  | 	fn                   ResubscribeFunc | 
					
						
							|  |  |  | 	err                  chan error | 
					
						
							|  |  |  | 	unsub                chan struct{} | 
					
						
							|  |  |  | 	unsubOnce            sync.Once | 
					
						
							|  |  |  | 	lastTry              mclock.AbsTime | 
					
						
							|  |  |  | 	waitTime, backoffMax time.Duration | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *resubscribeSub) Unsubscribe() { | 
					
						
							|  |  |  | 	s.unsubOnce.Do(func() { | 
					
						
							|  |  |  | 		s.unsub <- struct{}{} | 
					
						
							|  |  |  | 		<-s.err | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *resubscribeSub) Err() <-chan error { | 
					
						
							|  |  |  | 	return s.err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *resubscribeSub) loop() { | 
					
						
							|  |  |  | 	defer close(s.err) | 
					
						
							|  |  |  | 	var done bool | 
					
						
							|  |  |  | 	for !done { | 
					
						
							|  |  |  | 		sub := s.subscribe() | 
					
						
							|  |  |  | 		if sub == nil { | 
					
						
							|  |  |  | 			break | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		done = s.waitForError(sub) | 
					
						
							|  |  |  | 		sub.Unsubscribe() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *resubscribeSub) subscribe() Subscription { | 
					
						
							|  |  |  | 	subscribed := make(chan error) | 
					
						
							|  |  |  | 	var sub Subscription | 
					
						
							|  |  |  | 	for { | 
					
						
							|  |  |  | 		s.lastTry = mclock.Now() | 
					
						
							|  |  |  | 		ctx, cancel := context.WithCancel(context.Background()) | 
					
						
							|  |  |  | 		go func() { | 
					
						
							|  |  |  | 			rsub, err := s.fn(ctx) | 
					
						
							|  |  |  | 			sub = rsub | 
					
						
							|  |  |  | 			subscribed <- err | 
					
						
							|  |  |  | 		}() | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case err := <-subscribed: | 
					
						
							|  |  |  | 			cancel() | 
					
						
							| 
									
										
										
										
											2020-02-12 22:19:47 +08:00
										 |  |  | 			if err == nil { | 
					
						
							|  |  |  | 				if sub == nil { | 
					
						
							|  |  |  | 					panic("event: ResubscribeFunc returned nil subscription and no error") | 
					
						
							| 
									
										
										
										
											2016-07-19 01:39:12 +02:00
										 |  |  | 				} | 
					
						
							| 
									
										
										
										
											2020-02-12 22:19:47 +08:00
										 |  |  | 				return sub | 
					
						
							| 
									
										
										
										
											2016-07-19 01:39:12 +02:00
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2020-02-12 22:19:47 +08:00
										 |  |  | 			// Subscribing failed, wait before launching the next try. | 
					
						
							|  |  |  | 			if s.backoffWait() { | 
					
						
							|  |  |  | 				return nil // unsubscribed during wait | 
					
						
							| 
									
										
										
										
											2016-07-19 01:39:12 +02:00
										 |  |  | 			} | 
					
						
							|  |  |  | 		case <-s.unsub: | 
					
						
							|  |  |  | 			cancel() | 
					
						
							| 
									
										
										
										
											2020-02-12 22:19:47 +08:00
										 |  |  | 			<-subscribed // avoid leaking the s.fn goroutine. | 
					
						
							| 
									
										
										
										
											2016-07-19 01:39:12 +02:00
										 |  |  | 			return nil | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *resubscribeSub) waitForError(sub Subscription) bool { | 
					
						
							|  |  |  | 	defer sub.Unsubscribe() | 
					
						
							|  |  |  | 	select { | 
					
						
							|  |  |  | 	case err := <-sub.Err(): | 
					
						
							|  |  |  | 		return err == nil | 
					
						
							|  |  |  | 	case <-s.unsub: | 
					
						
							|  |  |  | 		return true | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *resubscribeSub) backoffWait() bool { | 
					
						
							|  |  |  | 	if time.Duration(mclock.Now()-s.lastTry) > s.backoffMax { | 
					
						
							|  |  |  | 		s.waitTime = s.backoffMax / 10 | 
					
						
							|  |  |  | 	} else { | 
					
						
							|  |  |  | 		s.waitTime *= 2 | 
					
						
							|  |  |  | 		if s.waitTime > s.backoffMax { | 
					
						
							|  |  |  | 			s.waitTime = s.backoffMax | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	t := time.NewTimer(s.waitTime) | 
					
						
							|  |  |  | 	defer t.Stop() | 
					
						
							|  |  |  | 	select { | 
					
						
							|  |  |  | 	case <-t.C: | 
					
						
							|  |  |  | 		return false | 
					
						
							|  |  |  | 	case <-s.unsub: | 
					
						
							|  |  |  | 		return true | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // SubscriptionScope provides a facility to unsubscribe multiple subscriptions at once. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // For code that handle more than one subscription, a scope can be used to conveniently | 
					
						
							|  |  |  | // unsubscribe all of them with a single call. The example demonstrates a typical use in a | 
					
						
							|  |  |  | // larger program. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The zero value is ready to use. | 
					
						
							|  |  |  | type SubscriptionScope struct { | 
					
						
							|  |  |  | 	mu     sync.Mutex | 
					
						
							|  |  |  | 	subs   map[*scopeSub]struct{} | 
					
						
							|  |  |  | 	closed bool | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type scopeSub struct { | 
					
						
							|  |  |  | 	sc *SubscriptionScope | 
					
						
							|  |  |  | 	s  Subscription | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Track starts tracking a subscription. If the scope is closed, Track returns nil. The | 
					
						
							|  |  |  | // returned subscription is a wrapper. Unsubscribing the wrapper removes it from the | 
					
						
							|  |  |  | // scope. | 
					
						
							|  |  |  | func (sc *SubscriptionScope) Track(s Subscription) Subscription { | 
					
						
							|  |  |  | 	sc.mu.Lock() | 
					
						
							|  |  |  | 	defer sc.mu.Unlock() | 
					
						
							|  |  |  | 	if sc.closed { | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if sc.subs == nil { | 
					
						
							|  |  |  | 		sc.subs = make(map[*scopeSub]struct{}) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	ss := &scopeSub{sc, s} | 
					
						
							|  |  |  | 	sc.subs[ss] = struct{}{} | 
					
						
							|  |  |  | 	return ss | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Close calls Unsubscribe on all tracked subscriptions and prevents further additions to | 
					
						
							|  |  |  | // the tracked set. Calls to Track after Close return nil. | 
					
						
							|  |  |  | func (sc *SubscriptionScope) Close() { | 
					
						
							|  |  |  | 	sc.mu.Lock() | 
					
						
							|  |  |  | 	defer sc.mu.Unlock() | 
					
						
							|  |  |  | 	if sc.closed { | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	sc.closed = true | 
					
						
							|  |  |  | 	for s := range sc.subs { | 
					
						
							|  |  |  | 		s.s.Unsubscribe() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	sc.subs = nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Count returns the number of tracked subscriptions. | 
					
						
							|  |  |  | // It is meant to be used for debugging. | 
					
						
							|  |  |  | func (sc *SubscriptionScope) Count() int { | 
					
						
							|  |  |  | 	sc.mu.Lock() | 
					
						
							|  |  |  | 	defer sc.mu.Unlock() | 
					
						
							|  |  |  | 	return len(sc.subs) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *scopeSub) Unsubscribe() { | 
					
						
							|  |  |  | 	s.s.Unsubscribe() | 
					
						
							|  |  |  | 	s.sc.mu.Lock() | 
					
						
							|  |  |  | 	defer s.sc.mu.Unlock() | 
					
						
							|  |  |  | 	delete(s.sc.subs, s) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *scopeSub) Err() <-chan error { | 
					
						
							|  |  |  | 	return s.s.Err() | 
					
						
							|  |  |  | } |