swarm: fix network/stream data races (#19051)

* swarm/network/stream: newStreamerTester cleanup only if err is nil

* swarm/network/stream: raise newStreamerTester waitForPeers timeout

* swarm/network/stream: fix data races in GetPeerSubscriptions

* swarm/storage: prevent data race on LDBStore.batchesC

https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-461775049

* swarm/network/stream: fix TestGetSubscriptionsRPC data race

https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-461768477

* swarm/network/stream: correctly use Simulation.Run callback

https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-461783804

* swarm/network: protect addrCountC in Kademlia.AddrCountC function

https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-462273444

* p2p/simulations: fix a deadlock calling getRandomNode with lock

https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-462317407

* swarm/network/stream: terminate disconnect goruotines in tests

* swarm/network/stream: reduce memory consumption when testing data races

* swarm/network/stream: add watchDisconnections helper function

* swarm/network/stream: add concurrent counter for tests

* swarm/network/stream: rename race/norace test files and use const

* swarm/network/stream: remove watchSim and its panic

* swarm/network/stream: pass context in watchDisconnections

* swarm/network/stream: add concurrent safe bool for watchDisconnections

* swarm/storage: fix LDBStore.batchesC data race by not closing it
This commit is contained in:
Janoš Guljaš
2019-02-13 13:03:23 +01:00
committed by Viktor Trón
parent d596bea2d5
commit 3fd6db2bf6
14 changed files with 274 additions and 197 deletions

View File

@@ -41,10 +41,10 @@ import (
func TestStreamerSubscribe(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()
stream := NewStream("foo", "", true)
err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top)
@@ -55,10 +55,10 @@ func TestStreamerSubscribe(t *testing.T) {
func TestStreamerRequestSubscription(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()
stream := NewStream("foo", "", false)
err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top)
@@ -146,10 +146,10 @@ func (self *testServer) Close() {
func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()
streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
return newTestClient(t), nil
@@ -239,10 +239,10 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()
stream := NewStream("foo", "", false)
@@ -306,10 +306,10 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()
stream := NewStream("foo", "", true)
@@ -372,10 +372,10 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 0), nil
@@ -416,10 +416,10 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()
stream := NewStream("foo", "", true)
@@ -479,10 +479,10 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()
stream := NewStream("foo", "", true)
@@ -544,10 +544,10 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()
stream := NewStream("foo", "", true)
@@ -643,10 +643,10 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 10), nil
@@ -780,10 +780,10 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
Syncing: SyncingDisabled,
MaxPeerServers: maxPeerServers,
})
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 0), nil
@@ -854,10 +854,10 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
MaxPeerServers: maxPeerServers,
})
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 0), nil
@@ -940,10 +940,10 @@ func TestHasPriceImplementation(t *testing.T) {
Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
})
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()
if r.prices == nil {
t.Fatal("No prices implementation available for the stream protocol")
@@ -1177,6 +1177,7 @@ starts the simulation, waits for SyncUpdateDelay in order to kick off
stream registration, then tests that there are subscriptions.
*/
func TestGetSubscriptionsRPC(t *testing.T) {
// arbitrarily set to 4
nodeCount := 4
// run with more nodes if `longrunning` flag is set
@@ -1188,19 +1189,16 @@ func TestGetSubscriptionsRPC(t *testing.T) {
// holds the msg code for SubscribeMsg
var subscribeMsgCode uint64
var ok bool
var expectedMsgCount = 0
var expectedMsgCount counter
// this channel signalizes that the expected amount of subscriptiosn is done
allSubscriptionsDone := make(chan struct{})
lock := sync.RWMutex{}
// after the test, we need to reset the subscriptionFunc to the default
defer func() { subscriptionFunc = doRequestSubscription }()
// we use this subscriptionFunc for this test: just increases count and calls the actual subscription
subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
lock.Lock()
expectedMsgCount++
lock.Unlock()
expectedMsgCount.inc()
doRequestSubscription(r, p, bin, subs)
return true
}
@@ -1290,24 +1288,24 @@ func TestGetSubscriptionsRPC(t *testing.T) {
select {
case <-allSubscriptionsDone:
case <-ctx.Done():
t.Fatal("Context timed out")
return errors.New("Context timed out")
}
log.Debug("Expected message count: ", "expectedMsgCount", expectedMsgCount)
log.Debug("Expected message count: ", "expectedMsgCount", expectedMsgCount.count())
//now iterate again, this time we call each node via RPC to get its subscriptions
realCount := 0
for _, node := range nodes {
//create rpc client
client, err := node.Client()
if err != nil {
t.Fatalf("create node 1 rpc client fail: %v", err)
return fmt.Errorf("create node 1 rpc client fail: %v", err)
}
//ask it for subscriptions
pstreams := make(map[string][]string)
err = client.Call(&pstreams, "stream_getPeerSubscriptions")
if err != nil {
t.Fatal(err)
return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err)
}
//length of the subscriptions can not be smaller than number of peers
log.Debug("node subscriptions", "node", node.String())
@@ -1324,8 +1322,9 @@ func TestGetSubscriptionsRPC(t *testing.T) {
}
}
// every node is mutually subscribed to each other, so the actual count is half of it
if realCount/2 != expectedMsgCount {
return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, expectedMsgCount)
emc := expectedMsgCount.count()
if realCount/2 != emc {
return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, emc)
}
return nil
})
@@ -1333,3 +1332,26 @@ func TestGetSubscriptionsRPC(t *testing.T) {
t.Fatal(result.Error)
}
}
// counter is used to concurrently increment
// and read an integer value.
type counter struct {
v int
mu sync.RWMutex
}
// Increment the counter.
func (c *counter) inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.v++
}
// Read the counter value.
func (c *counter) count() int {
c.mu.RLock()
defer c.mu.RUnlock()
return c.v
}