p2p, swarm: fix node up races by granular locking (#18976)
* swarm/network: DRY out repeated giga comment I not necessarily agree with the way we wait for event propagation. But I truly disagree with having duplicated giga comments. * p2p/simulations: encapsulate Node.Up field so we avoid data races The Node.Up field was accessed concurrently without "proper" locking. There was a lock on Network and that was used sometimes to access the field. Other times the locking was missed and we had a data race. For example: https://github.com/ethereum/go-ethereum/pull/18464 The case above was solved, but there were still intermittent/hard to reproduce races. So let's solve the issue permanently. resolves: ethersphere/go-ethereum#1146 * p2p/simulations: fix unmarshal of simulations.Node Making Node.Up field private in13292ee897
broke TestHTTPNetwork and TestHTTPSnapshot. Because the default UnmarshalJSON does not handle unexported fields. Important: The fix is partial and not proper to my taste. But I cut scope as I think the fix may require a change to the current serialization format. New ticket: https://github.com/ethersphere/go-ethereum/issues/1177 * p2p/simulations: Add a sanity test case for Node.Config UnmarshalJSON * p2p/simulations: revert back to defer Unlock() pattern for Network It's a good patten to call `defer Unlock()` right after `Lock()` so (new) error cases won't miss to unlock. Let's get back to that pattern. The patten was abandoned in85a79b3ad3
, while fixing a data race. That data race does not exist anymore, since the Node.Up field got hidden behind its own lock. * p2p/simulations: consistent naming for test providers Node.UnmarshalJSON * p2p/simulations: remove JSON annotation from private fields of Node As unexported fields are not serialized. * p2p/simulations: fix deadlock in Network.GetRandomDownNode() Problem: GetRandomDownNode() locks -> getDownNodeIDs() -> GetNodes() tries to lock -> deadlock On Network type, unexported functions must assume that `net.lock` is already acquired and should not call exported functions which might try to lock again. * p2p/simulations: ensure method conformity for Network Connect* methods were moved to p2p/simulations.Network from swarm/network/simulation. However these new methods did not follow the pattern of Network methods, i.e., all exported method locks the whole Network either for read or write. * p2p/simulations: fix deadlock during network shutdown `TestDiscoveryPersistenceSimulationSimAdapter` often got into deadlock. The execution was stuck on two locks, i.e, `Kademlia.lock` and `p2p/simulations.Network.lock`. Usually the test got stuck once in each 20 executions with high confidence. `Kademlia` was stuck in `Kademlia.EachAddr()` and `Network` in `Network.Stop()`. Solution: in `Network.Stop()` `net.lock` must be released before calling `node.Stop()` as stopping a node (somehow - I did not find the exact code path) causes `Network.InitConn()` to be called from `Kademlia.SuggestPeer()` and that blocks on `net.lock`. Related ticket: https://github.com/ethersphere/go-ethereum/issues/1223 * swarm/state: simplify if statement in DBStore.Put() * p2p/simulations: remove faulty godoc from private function The comment started with the wrong method name. The method is simple and self explanatory. Also, it's private. => Let's just remove the comment.
This commit is contained in:
committed by
Viktor Trón
parent
12ca3b172a
commit
50b872bf05
@ -137,7 +137,7 @@ func (net *Network) Config() *NetworkConfig {
|
||||
// StartAll starts all nodes in the network
|
||||
func (net *Network) StartAll() error {
|
||||
for _, node := range net.Nodes {
|
||||
if node.Up {
|
||||
if node.Up() {
|
||||
continue
|
||||
}
|
||||
if err := net.Start(node.ID()); err != nil {
|
||||
@ -150,7 +150,7 @@ func (net *Network) StartAll() error {
|
||||
// StopAll stops all nodes in the network
|
||||
func (net *Network) StopAll() error {
|
||||
for _, node := range net.Nodes {
|
||||
if !node.Up {
|
||||
if !node.Up() {
|
||||
continue
|
||||
}
|
||||
if err := net.Stop(node.ID()); err != nil {
|
||||
@ -169,27 +169,23 @@ func (net *Network) Start(id enode.ID) error {
|
||||
// snapshots
|
||||
func (net *Network) startWithSnapshots(id enode.ID, snapshots map[string][]byte) error {
|
||||
net.lock.Lock()
|
||||
defer net.lock.Unlock()
|
||||
|
||||
node := net.getNode(id)
|
||||
if node == nil {
|
||||
net.lock.Unlock()
|
||||
return fmt.Errorf("node %v does not exist", id)
|
||||
}
|
||||
if node.Up {
|
||||
net.lock.Unlock()
|
||||
if node.Up() {
|
||||
return fmt.Errorf("node %v already up", id)
|
||||
}
|
||||
log.Trace("Starting node", "id", id, "adapter", net.nodeAdapter.Name())
|
||||
if err := node.Start(snapshots); err != nil {
|
||||
net.lock.Unlock()
|
||||
log.Warn("Node startup failed", "id", id, "err", err)
|
||||
return err
|
||||
}
|
||||
node.Up = true
|
||||
node.SetUp(true)
|
||||
log.Info("Started node", "id", id)
|
||||
ev := NewEvent(node)
|
||||
net.lock.Unlock()
|
||||
|
||||
net.events.Send(ev)
|
||||
|
||||
// subscribe to peer events
|
||||
@ -220,7 +216,7 @@ func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub
|
||||
if node == nil {
|
||||
return
|
||||
}
|
||||
node.Up = false
|
||||
node.SetUp(false)
|
||||
ev := NewEvent(node)
|
||||
net.events.Send(ev)
|
||||
}()
|
||||
@ -258,30 +254,42 @@ func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub
|
||||
|
||||
// Stop stops the node with the given ID
|
||||
func (net *Network) Stop(id enode.ID) error {
|
||||
net.lock.Lock()
|
||||
node := net.getNode(id)
|
||||
if node == nil {
|
||||
net.lock.Unlock()
|
||||
return fmt.Errorf("node %v does not exist", id)
|
||||
}
|
||||
if !node.Up {
|
||||
net.lock.Unlock()
|
||||
return fmt.Errorf("node %v already down", id)
|
||||
}
|
||||
node.Up = false
|
||||
net.lock.Unlock()
|
||||
// IMPORTANT: node.Stop() must NOT be called under net.lock as
|
||||
// node.Reachable() closure has a reference to the network and
|
||||
// calls net.InitConn() what also locks the network. => DEADLOCK
|
||||
// That holds until the following ticket is not resolved:
|
||||
|
||||
err := node.Stop()
|
||||
if err != nil {
|
||||
var err error
|
||||
|
||||
node, err := func() (*Node, error) {
|
||||
net.lock.Lock()
|
||||
node.Up = true
|
||||
net.lock.Unlock()
|
||||
defer net.lock.Unlock()
|
||||
|
||||
node := net.getNode(id)
|
||||
if node == nil {
|
||||
return nil, fmt.Errorf("node %v does not exist", id)
|
||||
}
|
||||
if !node.Up() {
|
||||
return nil, fmt.Errorf("node %v already down", id)
|
||||
}
|
||||
node.SetUp(false)
|
||||
return node, nil
|
||||
}()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = node.Stop() // must be called without net.lock
|
||||
|
||||
net.lock.Lock()
|
||||
defer net.lock.Unlock()
|
||||
|
||||
if err != nil {
|
||||
node.SetUp(true)
|
||||
return err
|
||||
}
|
||||
log.Info("Stopped node", "id", id, "err", err)
|
||||
net.lock.Lock()
|
||||
ev := ControlEvent(node)
|
||||
net.lock.Unlock()
|
||||
net.events.Send(ev)
|
||||
return nil
|
||||
}
|
||||
@ -289,8 +297,14 @@ func (net *Network) Stop(id enode.ID) error {
|
||||
// Connect connects two nodes together by calling the "admin_addPeer" RPC
|
||||
// method on the "one" node so that it connects to the "other" node
|
||||
func (net *Network) Connect(oneID, otherID enode.ID) error {
|
||||
net.lock.Lock()
|
||||
defer net.lock.Unlock()
|
||||
return net.connect(oneID, otherID)
|
||||
}
|
||||
|
||||
func (net *Network) connect(oneID, otherID enode.ID) error {
|
||||
log.Debug("Connecting nodes with addPeer", "id", oneID, "other", otherID)
|
||||
conn, err := net.InitConn(oneID, otherID)
|
||||
conn, err := net.initConn(oneID, otherID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -388,6 +402,14 @@ func (net *Network) GetNode(id enode.ID) *Node {
|
||||
return net.getNode(id)
|
||||
}
|
||||
|
||||
func (net *Network) getNode(id enode.ID) *Node {
|
||||
i, found := net.nodeMap[id]
|
||||
if !found {
|
||||
return nil
|
||||
}
|
||||
return net.Nodes[i]
|
||||
}
|
||||
|
||||
// GetNode gets the node with the given name, returning nil if the node does
|
||||
// not exist
|
||||
func (net *Network) GetNodeByName(name string) *Node {
|
||||
@ -410,28 +432,29 @@ func (net *Network) GetNodes() (nodes []*Node) {
|
||||
net.lock.RLock()
|
||||
defer net.lock.RUnlock()
|
||||
|
||||
nodes = append(nodes, net.Nodes...)
|
||||
return nodes
|
||||
return net.getNodes()
|
||||
}
|
||||
|
||||
func (net *Network) getNode(id enode.ID) *Node {
|
||||
i, found := net.nodeMap[id]
|
||||
if !found {
|
||||
return nil
|
||||
}
|
||||
return net.Nodes[i]
|
||||
func (net *Network) getNodes() (nodes []*Node) {
|
||||
nodes = append(nodes, net.Nodes...)
|
||||
return nodes
|
||||
}
|
||||
|
||||
// GetRandomUpNode returns a random node on the network, which is running.
|
||||
func (net *Network) GetRandomUpNode(excludeIDs ...enode.ID) *Node {
|
||||
net.lock.RLock()
|
||||
defer net.lock.RUnlock()
|
||||
return net.getRandomUpNode(excludeIDs...)
|
||||
}
|
||||
|
||||
// GetRandomUpNode returns a random node on the network, which is running.
|
||||
func (net *Network) getRandomUpNode(excludeIDs ...enode.ID) *Node {
|
||||
return net.getRandomNode(net.getUpNodeIDs(), excludeIDs)
|
||||
}
|
||||
|
||||
func (net *Network) getUpNodeIDs() (ids []enode.ID) {
|
||||
for _, node := range net.Nodes {
|
||||
if node.Up {
|
||||
if node.Up() {
|
||||
ids = append(ids, node.ID())
|
||||
}
|
||||
}
|
||||
@ -446,8 +469,8 @@ func (net *Network) GetRandomDownNode(excludeIDs ...enode.ID) *Node {
|
||||
}
|
||||
|
||||
func (net *Network) getDownNodeIDs() (ids []enode.ID) {
|
||||
for _, node := range net.GetNodes() {
|
||||
if !node.Up {
|
||||
for _, node := range net.getNodes() {
|
||||
if !node.Up() {
|
||||
ids = append(ids, node.ID())
|
||||
}
|
||||
}
|
||||
@ -539,6 +562,10 @@ func (net *Network) getConn(oneID, otherID enode.ID) *Conn {
|
||||
func (net *Network) InitConn(oneID, otherID enode.ID) (*Conn, error) {
|
||||
net.lock.Lock()
|
||||
defer net.lock.Unlock()
|
||||
return net.initConn(oneID, otherID)
|
||||
}
|
||||
|
||||
func (net *Network) initConn(oneID, otherID enode.ID) (*Conn, error) {
|
||||
if oneID == otherID {
|
||||
return nil, fmt.Errorf("refusing to connect to self %v", oneID)
|
||||
}
|
||||
@ -602,8 +629,21 @@ type Node struct {
|
||||
// Config if the config used to created the node
|
||||
Config *adapters.NodeConfig `json:"config"`
|
||||
|
||||
// Up tracks whether or not the node is running
|
||||
Up bool `json:"up"`
|
||||
// up tracks whether or not the node is running
|
||||
up bool
|
||||
upMu sync.RWMutex
|
||||
}
|
||||
|
||||
func (n *Node) Up() bool {
|
||||
n.upMu.RLock()
|
||||
defer n.upMu.RUnlock()
|
||||
return n.up
|
||||
}
|
||||
|
||||
func (n *Node) SetUp(up bool) {
|
||||
n.upMu.Lock()
|
||||
defer n.upMu.Unlock()
|
||||
n.up = up
|
||||
}
|
||||
|
||||
// ID returns the ID of the node
|
||||
@ -637,10 +677,29 @@ func (n *Node) MarshalJSON() ([]byte, error) {
|
||||
}{
|
||||
Info: n.NodeInfo(),
|
||||
Config: n.Config,
|
||||
Up: n.Up,
|
||||
Up: n.Up(),
|
||||
})
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements json.Unmarshaler interface so that we don't lose
|
||||
// Node.up status. IMPORTANT: The implementation is incomplete; we lose
|
||||
// p2p.NodeInfo.
|
||||
func (n *Node) UnmarshalJSON(raw []byte) error {
|
||||
// TODO: How should we turn back NodeInfo into n.Node?
|
||||
// Ticket: https://github.com/ethersphere/go-ethereum/issues/1177
|
||||
node := struct {
|
||||
Config *adapters.NodeConfig `json:"config,omitempty"`
|
||||
Up bool `json:"up"`
|
||||
}{}
|
||||
if err := json.Unmarshal(raw, &node); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n.SetUp(node.Up)
|
||||
n.Config = node.Config
|
||||
return nil
|
||||
}
|
||||
|
||||
// Conn represents a connection between two nodes in the network
|
||||
type Conn struct {
|
||||
// One is the node which initiated the connection
|
||||
@ -660,10 +719,10 @@ type Conn struct {
|
||||
|
||||
// nodesUp returns whether both nodes are currently up
|
||||
func (c *Conn) nodesUp() error {
|
||||
if !c.one.Up {
|
||||
if !c.one.Up() {
|
||||
return fmt.Errorf("one %v is not up", c.One)
|
||||
}
|
||||
if !c.other.Up {
|
||||
if !c.other.Up() {
|
||||
return fmt.Errorf("other %v is not up", c.Other)
|
||||
}
|
||||
return nil
|
||||
@ -735,7 +794,7 @@ func (net *Network) snapshot(addServices []string, removeServices []string) (*Sn
|
||||
}
|
||||
for i, node := range net.Nodes {
|
||||
snap.Nodes[i] = NodeSnapshot{Node: *node}
|
||||
if !node.Up {
|
||||
if !node.Up() {
|
||||
continue
|
||||
}
|
||||
snapshots, err := node.Snapshots()
|
||||
@ -790,7 +849,7 @@ func (net *Network) Load(snap *Snapshot) error {
|
||||
if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil {
|
||||
return err
|
||||
}
|
||||
if !n.Node.Up {
|
||||
if !n.Node.Up() {
|
||||
continue
|
||||
}
|
||||
if err := net.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil {
|
||||
@ -862,7 +921,7 @@ func (net *Network) Load(snap *Snapshot) error {
|
||||
// Start connecting.
|
||||
for _, conn := range snap.Conns {
|
||||
|
||||
if !net.GetNode(conn.One).Up || !net.GetNode(conn.Other).Up {
|
||||
if !net.GetNode(conn.One).Up() || !net.GetNode(conn.Other).Up() {
|
||||
//in this case, at least one of the nodes of a connection is not up,
|
||||
//so it would result in the snapshot `Load` to fail
|
||||
continue
|
||||
@ -916,7 +975,7 @@ func (net *Network) executeControlEvent(event *Event) {
|
||||
}
|
||||
|
||||
func (net *Network) executeNodeEvent(e *Event) error {
|
||||
if !e.Node.Up {
|
||||
if !e.Node.Up() {
|
||||
return net.Stop(e.Node.ID())
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user