277 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			277 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
|   | package p2p | ||
|  | 
 | ||
|  | import ( | ||
|  | 	"container/heap" | ||
|  | 	"crypto/rand" | ||
|  | 	"fmt" | ||
|  | 	"net" | ||
|  | 	"time" | ||
|  | 
 | ||
|  | 	"github.com/ethereum/go-ethereum/logger" | ||
|  | 	"github.com/ethereum/go-ethereum/logger/glog" | ||
|  | 	"github.com/ethereum/go-ethereum/p2p/discover" | ||
|  | ) | ||
|  | 
 | ||
// dialHistoryExpiration is how long a node stays in the dial history
// after a dial to it has finished. Nodes in the history are not
// redialed, so this is the minimum pause between two dials to the
// same node.
const dialHistoryExpiration = 30 * time.Second

// emptyLookupDelay is how long a discovery lookup task sleeps when
// the lookup returned no results. This can happen if the table
// becomes empty (i.e. not often).
const emptyLookupDelay = 10 * time.Second
|  | 
 | ||
|  | // dialstate schedules dials and discovery lookups. | ||
|  | // it get's a chance to compute new tasks on every iteration | ||
|  | // of the main loop in Server.run. | ||
|  | type dialstate struct { | ||
|  | 	maxDynDials int | ||
|  | 	ntab        discoverTable | ||
|  | 
 | ||
|  | 	lookupRunning bool | ||
|  | 	bootstrapped  bool | ||
|  | 
 | ||
|  | 	dialing     map[discover.NodeID]connFlag | ||
|  | 	lookupBuf   []*discover.Node // current discovery lookup results | ||
|  | 	randomNodes []*discover.Node // filled from Table | ||
|  | 	static      map[discover.NodeID]*discover.Node | ||
|  | 	hist        *dialHistory | ||
|  | } | ||
|  | 
 | ||
|  | type discoverTable interface { | ||
|  | 	Self() *discover.Node | ||
|  | 	Close() | ||
|  | 	Bootstrap([]*discover.Node) | ||
|  | 	Lookup(target discover.NodeID) []*discover.Node | ||
|  | 	ReadRandomNodes([]*discover.Node) int | ||
|  | } | ||
|  | 
 | ||
|  | // the dial history remembers recent dials. | ||
|  | type dialHistory []pastDial | ||
|  | 
 | ||
|  | // pastDial is an entry in the dial history. | ||
|  | type pastDial struct { | ||
|  | 	id  discover.NodeID | ||
|  | 	exp time.Time | ||
|  | } | ||
|  | 
 | ||
|  | type task interface { | ||
|  | 	Do(*Server) | ||
|  | } | ||
|  | 
 | ||
|  | // A dialTask is generated for each node that is dialed. | ||
|  | type dialTask struct { | ||
|  | 	flags connFlag | ||
|  | 	dest  *discover.Node | ||
|  | } | ||
|  | 
 | ||
|  | // discoverTask runs discovery table operations. | ||
|  | // Only one discoverTask is active at any time. | ||
|  | // | ||
|  | // If bootstrap is true, the task runs Table.Bootstrap, | ||
|  | // otherwise it performs a random lookup and leaves the | ||
|  | // results in the task. | ||
|  | type discoverTask struct { | ||
|  | 	bootstrap bool | ||
|  | 	results   []*discover.Node | ||
|  | } | ||
|  | 
 | ||
// waitExpireTask sleeps for the embedded duration. It is created
// when no other task exists, so that the loop in Server.run keeps
// ticking.
type waitExpireTask struct {
	time.Duration
}
|  | 
 | ||
|  | func newDialState(static []*discover.Node, ntab discoverTable, maxdyn int) *dialstate { | ||
|  | 	s := &dialstate{ | ||
|  | 		maxDynDials: maxdyn, | ||
|  | 		ntab:        ntab, | ||
|  | 		static:      make(map[discover.NodeID]*discover.Node), | ||
|  | 		dialing:     make(map[discover.NodeID]connFlag), | ||
|  | 		randomNodes: make([]*discover.Node, maxdyn/2), | ||
|  | 		hist:        new(dialHistory), | ||
|  | 	} | ||
|  | 	for _, n := range static { | ||
|  | 		s.static[n.ID] = n | ||
|  | 	} | ||
|  | 	return s | ||
|  | } | ||
|  | 
 | ||
|  | func (s *dialstate) addStatic(n *discover.Node) { | ||
|  | 	s.static[n.ID] = n | ||
|  | } | ||
|  | 
 | ||
|  | func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task { | ||
|  | 	var newtasks []task | ||
|  | 	addDial := func(flag connFlag, n *discover.Node) bool { | ||
|  | 		_, dialing := s.dialing[n.ID] | ||
|  | 		if dialing || peers[n.ID] != nil || s.hist.contains(n.ID) { | ||
|  | 			return false | ||
|  | 		} | ||
|  | 		s.dialing[n.ID] = flag | ||
|  | 		newtasks = append(newtasks, &dialTask{flags: flag, dest: n}) | ||
|  | 		return true | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// Compute number of dynamic dials necessary at this point. | ||
|  | 	needDynDials := s.maxDynDials | ||
|  | 	for _, p := range peers { | ||
|  | 		if p.rw.is(dynDialedConn) { | ||
|  | 			needDynDials-- | ||
|  | 		} | ||
|  | 	} | ||
|  | 	for _, flag := range s.dialing { | ||
|  | 		if flag&dynDialedConn != 0 { | ||
|  | 			needDynDials-- | ||
|  | 		} | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// Expire the dial history on every invocation. | ||
|  | 	s.hist.expire(now) | ||
|  | 
 | ||
|  | 	// Create dials for static nodes if they are not connected. | ||
|  | 	for _, n := range s.static { | ||
|  | 		addDial(staticDialedConn, n) | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// Use random nodes from the table for half of the necessary | ||
|  | 	// dynamic dials. | ||
|  | 	randomCandidates := needDynDials / 2 | ||
|  | 	if randomCandidates > 0 && s.bootstrapped { | ||
|  | 		n := s.ntab.ReadRandomNodes(s.randomNodes) | ||
|  | 		for i := 0; i < randomCandidates && i < n; i++ { | ||
|  | 			if addDial(dynDialedConn, s.randomNodes[i]) { | ||
|  | 				needDynDials-- | ||
|  | 			} | ||
|  | 		} | ||
|  | 	} | ||
|  | 	// Create dynamic dials from random lookup results, removing tried | ||
|  | 	// items from the result buffer. | ||
|  | 	i := 0 | ||
|  | 	for ; i < len(s.lookupBuf) && needDynDials > 0; i++ { | ||
|  | 		if addDial(dynDialedConn, s.lookupBuf[i]) { | ||
|  | 			needDynDials-- | ||
|  | 		} | ||
|  | 	} | ||
|  | 	s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])] | ||
|  | 	// Launch a discovery lookup if more candidates are needed. The | ||
|  | 	// first discoverTask bootstraps the table and won't return any | ||
|  | 	// results. | ||
|  | 	if len(s.lookupBuf) < needDynDials && !s.lookupRunning { | ||
|  | 		s.lookupRunning = true | ||
|  | 		newtasks = append(newtasks, &discoverTask{bootstrap: !s.bootstrapped}) | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// Launch a timer to wait for the next node to expire if all | ||
|  | 	// candidates have been tried and no task is currently active. | ||
|  | 	// This should prevent cases where the dialer logic is not ticked | ||
|  | 	// because there are no pending events. | ||
|  | 	if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 { | ||
|  | 		t := &waitExpireTask{s.hist.min().exp.Sub(now)} | ||
|  | 		newtasks = append(newtasks, t) | ||
|  | 	} | ||
|  | 	return newtasks | ||
|  | } | ||
|  | 
 | ||
|  | func (s *dialstate) taskDone(t task, now time.Time) { | ||
|  | 	switch t := t.(type) { | ||
|  | 	case *dialTask: | ||
|  | 		s.hist.add(t.dest.ID, now.Add(dialHistoryExpiration)) | ||
|  | 		delete(s.dialing, t.dest.ID) | ||
|  | 	case *discoverTask: | ||
|  | 		if t.bootstrap { | ||
|  | 			s.bootstrapped = true | ||
|  | 		} | ||
|  | 		s.lookupRunning = false | ||
|  | 		s.lookupBuf = append(s.lookupBuf, t.results...) | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | func (t *dialTask) Do(srv *Server) { | ||
|  | 	addr := &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)} | ||
|  | 	glog.V(logger.Debug).Infof("dialing %v\n", t.dest) | ||
|  | 	fd, err := srv.Dialer.Dial("tcp", addr.String()) | ||
|  | 	if err != nil { | ||
|  | 		glog.V(logger.Detail).Infof("dial error: %v", err) | ||
|  | 		return | ||
|  | 	} | ||
|  | 	srv.setupConn(fd, t.flags, t.dest) | ||
|  | } | ||
|  | func (t *dialTask) String() string { | ||
|  | 	return fmt.Sprintf("%v %x %v:%d", t.flags, t.dest.ID[:8], t.dest.IP, t.dest.TCP) | ||
|  | } | ||
|  | 
 | ||
|  | func (t *discoverTask) Do(srv *Server) { | ||
|  | 	if t.bootstrap { | ||
|  | 		srv.ntab.Bootstrap(srv.BootstrapNodes) | ||
|  | 	} else { | ||
|  | 		var target discover.NodeID | ||
|  | 		rand.Read(target[:]) | ||
|  | 		t.results = srv.ntab.Lookup(target) | ||
|  | 		// newTasks generates a lookup task whenever dynamic dials are | ||
|  | 		// necessary. Lookups need to take some time, otherwise the | ||
|  | 		// event loop spins too fast. An empty result can only be | ||
|  | 		// returned if the table is empty. | ||
|  | 		if len(t.results) == 0 { | ||
|  | 			time.Sleep(emptyLookupDelay) | ||
|  | 		} | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | func (t *discoverTask) String() (s string) { | ||
|  | 	if t.bootstrap { | ||
|  | 		s = "discovery bootstrap" | ||
|  | 	} else { | ||
|  | 		s = "discovery lookup" | ||
|  | 	} | ||
|  | 	if len(t.results) > 0 { | ||
|  | 		s += fmt.Sprintf(" (%d results)", len(t.results)) | ||
|  | 	} | ||
|  | 	return s | ||
|  | } | ||
|  | 
 | ||
|  | func (t waitExpireTask) Do(*Server) { | ||
|  | 	time.Sleep(t.Duration) | ||
|  | } | ||
|  | func (t waitExpireTask) String() string { | ||
|  | 	return fmt.Sprintf("wait for dial hist expire (%v)", t.Duration) | ||
|  | } | ||
|  | 
 | ||
|  | // Use only these methods to access or modify dialHistory. | ||
|  | func (h dialHistory) min() pastDial { | ||
|  | 	return h[0] | ||
|  | } | ||
|  | func (h *dialHistory) add(id discover.NodeID, exp time.Time) { | ||
|  | 	heap.Push(h, pastDial{id, exp}) | ||
|  | } | ||
|  | func (h dialHistory) contains(id discover.NodeID) bool { | ||
|  | 	for _, v := range h { | ||
|  | 		if v.id == id { | ||
|  | 			return true | ||
|  | 		} | ||
|  | 	} | ||
|  | 	return false | ||
|  | } | ||
|  | func (h *dialHistory) expire(now time.Time) { | ||
|  | 	for h.Len() > 0 && h.min().exp.Before(now) { | ||
|  | 		heap.Pop(h) | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | // heap.Interface boilerplate | ||
|  | func (h dialHistory) Len() int           { return len(h) } | ||
|  | func (h dialHistory) Less(i, j int) bool { return h[i].exp.Before(h[j].exp) } | ||
|  | func (h dialHistory) Swap(i, j int)      { h[i], h[j] = h[j], h[i] } | ||
|  | func (h *dialHistory) Push(x interface{}) { | ||
|  | 	*h = append(*h, x.(pastDial)) | ||
|  | } | ||
|  | func (h *dialHistory) Pop() interface{} { | ||
|  | 	old := *h | ||
|  | 	n := len(old) | ||
|  | 	x := old[n-1] | ||
|  | 	*h = old[0 : n-1] | ||
|  | 	return x | ||
|  | } |