event: address Feed review issues event: clarify role of NewSubscription function event: more Feed review fixes * take sendLock after dropping f.mu * add constant for number of special cases event: fix subscribing/unsubscribing while Send is blocked
		
			
				
	
	
		
			295 lines
		
	
	
		
			6.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			295 lines
		
	
	
		
			6.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2016 The go-ethereum Authors
 | |
| // This file is part of the go-ethereum library.
 | |
| //
 | |
| // The go-ethereum library is free software: you can redistribute it and/or modify
 | |
| // it under the terms of the GNU Lesser General Public License as published by
 | |
| // the Free Software Foundation, either version 3 of the License, or
 | |
| // (at your option) any later version.
 | |
| //
 | |
| // The go-ethereum library is distributed in the hope that it will be useful,
 | |
| // but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 | |
| // GNU Lesser General Public License for more details.
 | |
| //
 | |
| // You should have received a copy of the GNU Lesser General Public License
 | |
| // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
| package event
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"reflect"
 | |
| 	"sync"
 | |
| 	"testing"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
| func TestFeedPanics(t *testing.T) {
 | |
| 	{
 | |
| 		var f Feed
 | |
| 		f.Send(int(2))
 | |
| 		want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))}
 | |
| 		if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil {
 | |
| 			t.Error(err)
 | |
| 		}
 | |
| 	}
 | |
| 	{
 | |
| 		var f Feed
 | |
| 		ch := make(chan int)
 | |
| 		f.Subscribe(ch)
 | |
| 		want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))}
 | |
| 		if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil {
 | |
| 			t.Error(err)
 | |
| 		}
 | |
| 	}
 | |
| 	{
 | |
| 		var f Feed
 | |
| 		f.Send(int(2))
 | |
| 		want := feedTypeError{op: "Subscribe", got: reflect.TypeOf(make(chan uint64)), want: reflect.TypeOf(make(chan<- int))}
 | |
| 		if err := checkPanic(want, func() { f.Subscribe(make(chan uint64)) }); err != nil {
 | |
| 			t.Error(err)
 | |
| 		}
 | |
| 	}
 | |
| 	{
 | |
| 		var f Feed
 | |
| 		if err := checkPanic(errBadChannel, func() { f.Subscribe(make(<-chan int)) }); err != nil {
 | |
| 			t.Error(err)
 | |
| 		}
 | |
| 	}
 | |
| 	{
 | |
| 		var f Feed
 | |
| 		if err := checkPanic(errBadChannel, func() { f.Subscribe(int(0)) }); err != nil {
 | |
| 			t.Error(err)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func checkPanic(want error, fn func()) (err error) {
 | |
| 	defer func() {
 | |
| 		panic := recover()
 | |
| 		if panic == nil {
 | |
| 			err = fmt.Errorf("didn't panic")
 | |
| 		} else if !reflect.DeepEqual(panic, want) {
 | |
| 			err = fmt.Errorf("panicked with wrong error: got %q, want %q", panic, want)
 | |
| 		}
 | |
| 	}()
 | |
| 	fn()
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func TestFeed(t *testing.T) {
 | |
| 	var feed Feed
 | |
| 	var done, subscribed sync.WaitGroup
 | |
| 	subscriber := func(i int) {
 | |
| 		defer done.Done()
 | |
| 
 | |
| 		subchan := make(chan int)
 | |
| 		sub := feed.Subscribe(subchan)
 | |
| 		timeout := time.NewTimer(2 * time.Second)
 | |
| 		subscribed.Done()
 | |
| 
 | |
| 		select {
 | |
| 		case v := <-subchan:
 | |
| 			if v != 1 {
 | |
| 				t.Errorf("%d: received value %d, want 1", i, v)
 | |
| 			}
 | |
| 		case <-timeout.C:
 | |
| 			t.Errorf("%d: receive timeout", i)
 | |
| 		}
 | |
| 
 | |
| 		sub.Unsubscribe()
 | |
| 		select {
 | |
| 		case _, ok := <-sub.Err():
 | |
| 			if ok {
 | |
| 				t.Errorf("%d: error channel not closed after unsubscribe", i)
 | |
| 			}
 | |
| 		case <-timeout.C:
 | |
| 			t.Errorf("%d: unsubscribe timeout", i)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	const n = 1000
 | |
| 	done.Add(n)
 | |
| 	subscribed.Add(n)
 | |
| 	for i := 0; i < n; i++ {
 | |
| 		go subscriber(i)
 | |
| 	}
 | |
| 	subscribed.Wait()
 | |
| 	if nsent := feed.Send(1); nsent != n {
 | |
| 		t.Errorf("first send delivered %d times, want %d", nsent, n)
 | |
| 	}
 | |
| 	if nsent := feed.Send(2); nsent != 0 {
 | |
| 		t.Errorf("second send delivered %d times, want 0", nsent)
 | |
| 	}
 | |
| 	done.Wait()
 | |
| }
 | |
| 
 | |
| func TestFeedSubscribeSameChannel(t *testing.T) {
 | |
| 	var (
 | |
| 		feed Feed
 | |
| 		done sync.WaitGroup
 | |
| 		ch   = make(chan int)
 | |
| 		sub1 = feed.Subscribe(ch)
 | |
| 		sub2 = feed.Subscribe(ch)
 | |
| 		_    = feed.Subscribe(ch)
 | |
| 	)
 | |
| 	expectSends := func(value, n int) {
 | |
| 		if nsent := feed.Send(value); nsent != n {
 | |
| 			t.Errorf("send delivered %d times, want %d", nsent, n)
 | |
| 		}
 | |
| 		done.Done()
 | |
| 	}
 | |
| 	expectRecv := func(wantValue, n int) {
 | |
| 		for i := 0; i < n; i++ {
 | |
| 			if v := <-ch; v != wantValue {
 | |
| 				t.Errorf("received %d, want %d", v, wantValue)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	done.Add(1)
 | |
| 	go expectSends(1, 3)
 | |
| 	expectRecv(1, 3)
 | |
| 	done.Wait()
 | |
| 
 | |
| 	sub1.Unsubscribe()
 | |
| 
 | |
| 	done.Add(1)
 | |
| 	go expectSends(2, 2)
 | |
| 	expectRecv(2, 2)
 | |
| 	done.Wait()
 | |
| 
 | |
| 	sub2.Unsubscribe()
 | |
| 
 | |
| 	done.Add(1)
 | |
| 	go expectSends(3, 1)
 | |
| 	expectRecv(3, 1)
 | |
| 	done.Wait()
 | |
| }
 | |
| 
 | |
| func TestFeedSubscribeBlockedPost(t *testing.T) {
 | |
| 	var (
 | |
| 		feed   Feed
 | |
| 		nsends = 2000
 | |
| 		ch1    = make(chan int)
 | |
| 		ch2    = make(chan int)
 | |
| 		wg     sync.WaitGroup
 | |
| 	)
 | |
| 	defer wg.Wait()
 | |
| 
 | |
| 	feed.Subscribe(ch1)
 | |
| 	wg.Add(nsends)
 | |
| 	for i := 0; i < nsends; i++ {
 | |
| 		go func() {
 | |
| 			feed.Send(99)
 | |
| 			wg.Done()
 | |
| 		}()
 | |
| 	}
 | |
| 
 | |
| 	sub2 := feed.Subscribe(ch2)
 | |
| 	defer sub2.Unsubscribe()
 | |
| 
 | |
| 	// We're done when ch1 has received N times.
 | |
| 	// The number of receives on ch2 depends on scheduling.
 | |
| 	for i := 0; i < nsends; {
 | |
| 		select {
 | |
| 		case <-ch1:
 | |
| 			i++
 | |
| 		case <-ch2:
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func TestFeedUnsubscribeBlockedPost(t *testing.T) {
 | |
| 	var (
 | |
| 		feed   Feed
 | |
| 		nsends = 200
 | |
| 		chans  = make([]chan int, 2000)
 | |
| 		subs   = make([]Subscription, len(chans))
 | |
| 		bchan  = make(chan int)
 | |
| 		bsub   = feed.Subscribe(bchan)
 | |
| 		wg     sync.WaitGroup
 | |
| 	)
 | |
| 	for i := range chans {
 | |
| 		chans[i] = make(chan int, nsends)
 | |
| 	}
 | |
| 
 | |
| 	// Queue up some Sends. None of these can make progress while bchan isn't read.
 | |
| 	wg.Add(nsends)
 | |
| 	for i := 0; i < nsends; i++ {
 | |
| 		go func() {
 | |
| 			feed.Send(99)
 | |
| 			wg.Done()
 | |
| 		}()
 | |
| 	}
 | |
| 	// Subscribe the other channels.
 | |
| 	for i, ch := range chans {
 | |
| 		subs[i] = feed.Subscribe(ch)
 | |
| 	}
 | |
| 	// Unsubscribe them again.
 | |
| 	for _, sub := range subs {
 | |
| 		sub.Unsubscribe()
 | |
| 	}
 | |
| 	// Unblock the Sends.
 | |
| 	bsub.Unsubscribe()
 | |
| 	wg.Wait()
 | |
| }
 | |
| 
 | |
| func TestFeedUnsubscribeFromInbox(t *testing.T) {
 | |
| 	var (
 | |
| 		feed Feed
 | |
| 		ch1  = make(chan int)
 | |
| 		ch2  = make(chan int)
 | |
| 		sub1 = feed.Subscribe(ch1)
 | |
| 		sub2 = feed.Subscribe(ch1)
 | |
| 		sub3 = feed.Subscribe(ch2)
 | |
| 	)
 | |
| 	if len(feed.inbox) != 3 {
 | |
| 		t.Errorf("inbox length != 3 after subscribe")
 | |
| 	}
 | |
| 	if len(feed.sendCases) != 1 {
 | |
| 		t.Errorf("sendCases is non-empty after unsubscribe")
 | |
| 	}
 | |
| 
 | |
| 	sub1.Unsubscribe()
 | |
| 	sub2.Unsubscribe()
 | |
| 	sub3.Unsubscribe()
 | |
| 	if len(feed.inbox) != 0 {
 | |
| 		t.Errorf("inbox is non-empty after unsubscribe")
 | |
| 	}
 | |
| 	if len(feed.sendCases) != 1 {
 | |
| 		t.Errorf("sendCases is non-empty after unsubscribe")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func BenchmarkFeedSend1000(b *testing.B) {
 | |
| 	var (
 | |
| 		done  sync.WaitGroup
 | |
| 		feed  Feed
 | |
| 		nsubs = 1000
 | |
| 	)
 | |
| 	subscriber := func(ch <-chan int) {
 | |
| 		for i := 0; i < b.N; i++ {
 | |
| 			<-ch
 | |
| 		}
 | |
| 		done.Done()
 | |
| 	}
 | |
| 	done.Add(nsubs)
 | |
| 	for i := 0; i < nsubs; i++ {
 | |
| 		ch := make(chan int, 200)
 | |
| 		feed.Subscribe(ch)
 | |
| 		go subscriber(ch)
 | |
| 	}
 | |
| 
 | |
| 	// The actual benchmark.
 | |
| 	b.ResetTimer()
 | |
| 	for i := 0; i < b.N; i++ {
 | |
| 		if feed.Send(i) != nsubs {
 | |
| 			panic("wrong number of sends")
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	b.StopTimer()
 | |
| 	done.Wait()
 | |
| }
 |