From fde0ddb324788a51a94bd0f605bb8f935db2f7f4 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 6 Jan 2015 10:58:36 +0100 Subject: [PATCH 1/8] cmd/rlpdump: remove stray return --- cmd/rlpdump/main.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/rlpdump/main.go b/cmd/rlpdump/main.go index 8f1c4a8c2e..8567dcff83 100644 --- a/cmd/rlpdump/main.go +++ b/cmd/rlpdump/main.go @@ -110,8 +110,7 @@ func dump(s *rlp.Stream, depth int) error { s.List() defer s.ListEnd() if size == 0 { - fmt.Printf(ws(depth) + "[]") - return nil + fmt.Print(ws(depth) + "[]") } else { fmt.Println(ws(depth) + "[") for i := 0; ; i++ { From be977858562941ed8b8bd96eff65fbca1d1c4e4f Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 6 Jan 2015 11:07:05 +0100 Subject: [PATCH 2/8] cmd/evm: add dummy implementation for GetHash Fixes the build. AFAIK evm does not bother keeping a chain and cannot provide a real implementation. --- cmd/evm/main.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cmd/evm/main.go b/cmd/evm/main.go index 84ab0dc274..f902c99e51 100644 --- a/cmd/evm/main.go +++ b/cmd/evm/main.go @@ -131,6 +131,12 @@ func (self *VMEnv) Value() *big.Int { return self.value } func (self *VMEnv) GasLimit() *big.Int { return big.NewInt(1000000000) } func (self *VMEnv) Depth() int { return 0 } func (self *VMEnv) SetDepth(i int) { self.depth = i } +func (self *VMEnv) GetHash(n uint64) []byte { + if self.block.Number().Cmp(big.NewInt(int64(n))) == 0 { + return self.block.Hash() + } + return nil +} func (self *VMEnv) AddLog(log state.Log) { self.state.AddLog(log) } From 545e14691bd467992c905aa34fac71e25ef76108 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 6 Jan 2015 11:09:47 +0100 Subject: [PATCH 3/8] cmd/peerserver: fix for new client identity type --- cmd/peerserver/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/peerserver/main.go b/cmd/peerserver/main.go index eb0900f8b4..ce0c7ba0c1 100644 --- a/cmd/peerserver/main.go +++ b/cmd/peerserver/main.go @@ -35,7 +35,7 @@ func main() { srv := p2p.Server{ MaxPeers: 100, - Identity: p2p.NewSimpleClientIdentity("Ethereum(G)", "0.1", "Peer Server Two", string(marshaled)), + Identity: p2p.NewSimpleClientIdentity("Ethereum(G)", "0.1", "Peer Server Two", marshaled), ListenAddr: ":30301", NAT: p2p.UPNP(), } From 4c8c115a7633e39b85738cd7919c7d3e3e722e7a Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 6 Jan 2015 11:10:20 +0100 Subject: [PATCH 4/8] cmd/peerserver: use NoDial, don't use seed peers --- cmd/peerserver/main.go | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/cmd/peerserver/main.go b/cmd/peerserver/main.go index ce0c7ba0c1..7a5d5708ea 100644 --- a/cmd/peerserver/main.go +++ b/cmd/peerserver/main.go @@ -18,9 +18,7 @@ package main import ( "crypto/elliptic" - "fmt" "log" - "net" "os" "github.com/ethereum/go-ethereum/crypto" @@ -38,19 +36,10 @@ func main() { Identity: p2p.NewSimpleClientIdentity("Ethereum(G)", "0.1", "Peer Server Two", marshaled), ListenAddr: ":30301", NAT: p2p.UPNP(), + NoDial: true, } if err := srv.Start(); err != nil { - fmt.Println("could not start server:", err) - os.Exit(1) + log.Fatal("could not start server:", err) } - - // add seed peers - seed, err := net.ResolveTCPAddr("tcp", "poc-8.ethdev.com:30303") - if err != nil { - fmt.Println("couldn't resolve:", err) - } else { - srv.SuggestPeer(seed.IP, seed.Port, nil) - } - select {} } From 36e1e5f15142b37801844a072eb46ea67fbc8868 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 6 Jan 2015 11:15:37 +0100 Subject: [PATCH 5/8] cmd/peerserver: add some command line switches --- cmd/peerserver/main.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/cmd/peerserver/main.go b/cmd/peerserver/main.go index 7a5d5708ea..341c4dbb9d 100644 --- a/cmd/peerserver/main.go +++ b/cmd/peerserver/main.go @@ -18,6 +18,7 @@ package main import ( "crypto/elliptic" + "flag" "log" "os" @@ -26,7 +27,19 @@ import ( "github.com/ethereum/go-ethereum/p2p" ) +var ( + natType = flag.String("nat", "", "NAT traversal implementation") + pmpGateway = flag.String("gateway", "", "gateway address for NAT-PMP") + listenAddr = flag.String("addr", ":30301", "listen address") +) + func main() { + flag.Parse() + nat, err := p2p.ParseNAT(*natType, *pmpGateway) + if err != nil { + log.Fatal("invalid nat:", err) + } + logger.AddLogSystem(logger.NewStdLogSystem(os.Stdout, log.LstdFlags, logger.InfoLevel)) key, _ := crypto.GenerateKey() marshaled := elliptic.Marshal(crypto.S256(), key.PublicKey.X, key.PublicKey.Y) @@ -34,8 +47,8 @@ func main() { srv := p2p.Server{ MaxPeers: 100, Identity: p2p.NewSimpleClientIdentity("Ethereum(G)", "0.1", "Peer Server Two", marshaled), - ListenAddr: ":30301", - NAT: p2p.UPNP(), + ListenAddr: *listenAddr, + NAT: nat, NoDial: true, } if err := srv.Start(); err != nil { From eb0e7b1b8120852a1d56aa0ebd3a98e652965635 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 6 Jan 2015 11:35:09 +0100 Subject: [PATCH 6/8] eth, p2p: remove EncodeMsg from p2p.MsgWriter ...and make it a top-level function instead. The original idea behind having EncodeMsg in the interface was that implementations might be able to encode RLP data to their underlying writer directly instead of buffering the encoded data. The encoder will buffer anyway, so that doesn't matter anymore. Given the recent problems with EncodeMsg (copy-pasted implementation bug) I'd rather implement once, correctly. --- eth/protocol.go | 8 ++++---- eth/protocol_test.go | 4 ---- p2p/message.go | 20 +++++++++----------- p2p/message_test.go | 6 +++--- p2p/peer_test.go | 4 ++-- p2p/protocol.go | 10 +++++----- p2p/protocol_test.go | 4 ++-- 7 files changed, 25 insertions(+), 31 deletions(-) diff --git a/eth/protocol.go b/eth/protocol.go index b67e5aaeab..723ab5502e 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -140,7 +140,7 @@ func (self *ethProtocol) handle() error { return self.protoError(ErrDecode, "->msg %v: %v", msg, err) } hashes := self.chainManager.GetBlockHashesFromHash(request.Hash, request.Amount) - return self.rw.EncodeMsg(BlockHashesMsg, ethutil.ByteSliceToInterface(hashes)...) + return p2p.EncodeMsg(self.rw, BlockHashesMsg, ethutil.ByteSliceToInterface(hashes)...) case BlockHashesMsg: // TODO: redo using lazy decode , this way very inefficient on known chains @@ -185,7 +185,7 @@ func (self *ethProtocol) handle() error { break } } - return self.rw.EncodeMsg(BlocksMsg, blocks...) + return p2p.EncodeMsg(self.rw, BlocksMsg, blocks...) case BlocksMsg: msgStream := rlp.NewStream(msg.Payload) @@ -298,12 +298,12 @@ func (self *ethProtocol) handleStatus() error { func (self *ethProtocol) requestBlockHashes(from []byte) error { self.peer.Debugf("fetching hashes (%d) %x...\n", blockHashesBatchSize, from[0:4]) - return self.rw.EncodeMsg(GetBlockHashesMsg, interface{}(from), uint64(blockHashesBatchSize)) + return p2p.EncodeMsg(self.rw, GetBlockHashesMsg, interface{}(from), uint64(blockHashesBatchSize)) } func (self *ethProtocol) requestBlocks(hashes [][]byte) error { self.peer.Debugf("fetching %v blocks", len(hashes)) - return self.rw.EncodeMsg(GetBlocksMsg, ethutil.ByteSliceToInterface(hashes)...) + return p2p.EncodeMsg(self.rw, GetBlocksMsg, ethutil.ByteSliceToInterface(hashes)...) } func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *protocolError) { diff --git a/eth/protocol_test.go b/eth/protocol_test.go index ab2aa289f0..224b59abd3 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -41,10 +41,6 @@ func (self *testMsgReadWriter) WriteMsg(msg p2p.Msg) error { return nil } -func (self *testMsgReadWriter) EncodeMsg(code uint64, data ...interface{}) error { - return self.WriteMsg(p2p.NewMsg(code, data...)) -} - func (self *testMsgReadWriter) ReadMsg() (p2p.Msg, error) { msg, ok := <-self.in if !ok { diff --git a/p2p/message.go b/p2p/message.go index a6f62ec4c8..daf2bf05c1 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -71,14 +71,11 @@ type MsgReader interface { } type MsgWriter interface { - // WriteMsg sends an existing message. - // The Payload reader of the message is consumed. + // WriteMsg sends a message. It will block until the message's + // Payload has been consumed by the other end. + // // Note that messages can be sent only once. WriteMsg(Msg) error - - // EncodeMsg writes an RLP-encoded message with the given - // code and data elements. - EncodeMsg(code uint64, data ...interface{}) error } // MsgReadWriter provides reading and writing of encoded messages. @@ -87,6 +84,12 @@ type MsgReadWriter interface { MsgWriter } +// EncodeMsg writes an RLP-encoded message with the given code and +// data elements. +func EncodeMsg(w MsgWriter, code uint64, data ...interface{}) error { + return w.WriteMsg(NewMsg(code, data...)) +} + var magicToken = []byte{34, 64, 8, 145} func writeMsg(w io.Writer, msg Msg) error { @@ -209,11 +212,6 @@ func (p *MsgPipeRW) WriteMsg(msg Msg) error { return ErrPipeClosed } -// EncodeMsg is a convenient shorthand for sending an RLP-encoded message. -func (p *MsgPipeRW) EncodeMsg(code uint64, data ...interface{}) error { - return p.WriteMsg(NewMsg(code, data...)) -} - // ReadMsg returns a message sent on the other end of the pipe. func (p *MsgPipeRW) ReadMsg() (Msg, error) { if atomic.LoadInt32(p.closed) == 0 { diff --git a/p2p/message_test.go b/p2p/message_test.go index 066d2516d4..5cde9abf5b 100644 --- a/p2p/message_test.go +++ b/p2p/message_test.go @@ -75,8 +75,8 @@ func TestDecodeRealMsg(t *testing.T) { func ExampleMsgPipe() { rw1, rw2 := MsgPipe() go func() { - rw1.EncodeMsg(8, []byte{0, 0}) - rw1.EncodeMsg(5, []byte{1, 1}) + EncodeMsg(rw1, 8, []byte{0, 0}) + EncodeMsg(rw1, 5, []byte{1, 1}) rw1.Close() }() @@ -100,7 +100,7 @@ loop: rw1, rw2 := MsgPipe() done := make(chan struct{}) go func() { - if err := rw1.EncodeMsg(1); err == nil { + if err := EncodeMsg(rw1, 1); err == nil { t.Error("EncodeMsg returned nil error") } else if err != ErrPipeClosed { t.Error("EncodeMsg returned wrong error: got %v, want %v", err, ErrPipeClosed) diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 5b9e9e7847..4ee88f112b 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -126,10 +126,10 @@ func TestPeerProtoEncodeMsg(t *testing.T) { Name: "a", Length: 2, Run: func(peer *Peer, rw MsgReadWriter) error { - if err := rw.EncodeMsg(2); err == nil { + if err := EncodeMsg(rw, 2); err == nil { t.Error("expected error for out-of-range msg code, got nil") } - if err := rw.EncodeMsg(1, "foo", "bar"); err != nil { + if err := EncodeMsg(rw, 1, "foo", "bar"); err != nil { t.Errorf("write error: %v", err) } return nil diff --git a/p2p/protocol.go b/p2p/protocol.go index dd8cbc4ecd..969937076b 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -119,14 +119,14 @@ func (bp *baseProtocol) loop(quit <-chan error) error { getPeersTick := time.NewTicker(10 * time.Second) defer getPeersTick.Stop() - err := bp.rw.EncodeMsg(getPeersMsg) + err := EncodeMsg(bp.rw, getPeersMsg) for err == nil { select { case err = <-quit: return err case <-getPeersTick.C: - err = bp.rw.EncodeMsg(getPeersMsg) + err = EncodeMsg(bp.rw, getPeersMsg) case event := <-activity.Chan(): ping.Reset(pingTimeout) lastActive = event.(time.Time) @@ -134,7 +134,7 @@ func (bp *baseProtocol) loop(quit <-chan error) error { if lastActive.Add(pingTimeout * 2).Before(t) { err = newPeerError(errPingTimeout, "") } else if lastActive.Add(pingTimeout).Before(t) { - err = bp.rw.EncodeMsg(pingMsg) + err = EncodeMsg(bp.rw, pingMsg) } } } @@ -164,7 +164,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { return discRequestedError(reason[0]) case pingMsg: - return bp.rw.EncodeMsg(pongMsg) + return EncodeMsg(bp.rw, pongMsg) case pongMsg: @@ -177,7 +177,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { // // TODO: add event mechanism to notify baseProtocol for new peers if len(peers) > 0 { - return bp.rw.EncodeMsg(peersMsg, peers...) + return EncodeMsg(bp.rw, peersMsg, peers...) } case peersMsg: diff --git a/p2p/protocol_test.go b/p2p/protocol_test.go index ce25b3e1b5..ba5e95c021 100644 --- a/p2p/protocol_test.go +++ b/p2p/protocol_test.go @@ -93,7 +93,7 @@ func TestBaseProtocolDisconnect(t *testing.T) { if err := expectMsg(rw2, handshakeMsg); err != nil { t.Error(err) } - err := rw2.EncodeMsg(handshakeMsg, + err := EncodeMsg(rw2, handshakeMsg, baseProtocolVersion, "", []interface{}{}, @@ -106,7 +106,7 @@ func TestBaseProtocolDisconnect(t *testing.T) { if err := expectMsg(rw2, getPeersMsg); err != nil { t.Error(err) } - if err := rw2.EncodeMsg(discMsg, DiscQuitting); err != nil { + if err := EncodeMsg(rw2, discMsg, DiscQuitting); err != nil { t.Error(err) } From b0ff946b55c23f0fffc50a700bcb255f95855afc Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 6 Jan 2015 12:14:29 +0100 Subject: [PATCH 7/8] p2p: move peerList back into baseProtocol It had been moved to Peer, probably for debugging. --- p2p/peer.go | 22 ---------------------- p2p/protocol.go | 24 +++++++++++++++++++++++- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/p2p/peer.go b/p2p/peer.go index 0d7eec9f46..2380a3285b 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -460,25 +460,3 @@ func (r *eofSignal) Read(buf []byte) (int, error) { } return n, err } - -func (peer *Peer) PeerList() []interface{} { - peers := peer.otherPeers() - ds := make([]interface{}, 0, len(peers)) - for _, p := range peers { - p.infolock.Lock() - addr := p.listenAddr - p.infolock.Unlock() - // filter out this peer and peers that are not listening or - // have not completed the handshake. - // TODO: track previously sent peers and exclude them as well. - if p == peer || addr == nil { - continue - } - ds = append(ds, addr) - } - ourAddr := peer.ourListenAddr - if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { - ds = append(ds, ourAddr) - } - return ds -} diff --git a/p2p/protocol.go b/p2p/protocol.go index 969937076b..1d121a8855 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -169,7 +169,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { case pongMsg: case getPeersMsg: - peers := bp.peer.PeerList() + peers := bp.peerList() // this is dangerous. the spec says that we should _delay_ // sending the response if no new information is available. // this means that would need to send a response later when @@ -264,3 +264,25 @@ func (bp *baseProtocol) handshakeMsg() Msg { bp.peer.ourID.Pubkey()[1:], ) } + +func (bp *baseProtocol) peerList() []interface{} { + peers := bp.peer.otherPeers() + ds := make([]interface{}, 0, len(peers)) + for _, p := range peers { + p.infolock.Lock() + addr := p.listenAddr + p.infolock.Unlock() + // filter out this peer and peers that are not listening or + // have not completed the handshake. + // TODO: track previously sent peers and exclude them as well. + if p == bp.peer || addr == nil { + continue + } + ds = append(ds, addr) + } + ourAddr := bp.peer.ourListenAddr + if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { + ds = append(ds, ourAddr) + } + return ds +} From 3caa4ad1baba3019c06733e1a80d78d9a57137bb Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 6 Jan 2015 12:15:51 +0100 Subject: [PATCH 8/8] p2p: improve test for peers message The test now checks that the number of of addresses is correct and terminates cleanly. --- p2p/protocol_test.go | 64 +++++++++++++++++++++++++++++--------------- 1 file changed, 43 insertions(+), 21 deletions(-) diff --git a/p2p/protocol_test.go b/p2p/protocol_test.go index ba5e95c021..b1d10ac536 100644 --- a/p2p/protocol_test.go +++ b/p2p/protocol_test.go @@ -4,6 +4,7 @@ import ( "fmt" "net" "reflect" + "sync" "testing" "github.com/ethereum/go-ethereum/crypto" @@ -36,50 +37,71 @@ func newTestPeer() (peer *Peer) { } func TestBaseProtocolPeers(t *testing.T) { - cannedPeerList := []*peerAddr{ + peerList := []*peerAddr{ {IP: net.ParseIP("1.2.3.4"), Port: 2222, Pubkey: []byte{}}, {IP: net.ParseIP("5.6.7.8"), Port: 3333, Pubkey: []byte{}}, } - var ownAddr *peerAddr = &peerAddr{IP: net.ParseIP("1.3.5.7"), Port: 1111, Pubkey: []byte{}} + listenAddr := &peerAddr{IP: net.ParseIP("1.3.5.7"), Port: 1111, Pubkey: []byte{}} rw1, rw2 := MsgPipe() + defer rw1.Close() + wg := new(sync.WaitGroup) + // run matcher, close pipe when addresses have arrived - addrChan := make(chan *peerAddr, len(cannedPeerList)) + numPeers := len(peerList) + 1 + addrChan := make(chan *peerAddr) + wg.Add(1) go func() { - for _, want := range cannedPeerList { - got := <-addrChan - t.Logf("got peer: %+v", got) + i := 0 + for got := range addrChan { + var want *peerAddr + switch { + case i < len(peerList): + want = peerList[i] + case i == len(peerList): + want = listenAddr // listenAddr should be the last thing sent + } + t.Logf("got peer %d/%d: %v", i+1, numPeers, got) if !reflect.DeepEqual(want, got) { - t.Errorf("mismatch: got %#v, want %#v", got, want) + t.Errorf("mismatch: got %+v, want %+v", got, want) + } + i++ + if i == numPeers { + break } } - close(addrChan) - var own []*peerAddr - var got *peerAddr - for got = range addrChan { - own = append(own, got) + if i != numPeers { + t.Errorf("wrong number of peers received: got %d, want %d", i, numPeers) } - if len(own) != 1 || !reflect.DeepEqual(ownAddr, own[0]) { - t.Errorf("mismatch: peers own address is incorrectly or not given, got %v, want %#v", ownAddr) - } - rw2.Close() + rw1.Close() + wg.Done() }() - // run first peer + + // run first peer (in background) peer1 := newTestPeer() - peer1.ourListenAddr = ownAddr + peer1.ourListenAddr = listenAddr peer1.otherPeers = func() []*Peer { - pl := make([]*Peer, len(cannedPeerList)) - for i, addr := range cannedPeerList { + pl := make([]*Peer, len(peerList)) + for i, addr := range peerList { pl[i] = &Peer{listenAddr: addr} } return pl } - go runBaseProtocol(peer1, rw1) + wg.Add(1) + go func() { + runBaseProtocol(peer1, rw1) + wg.Done() + }() + // run second peer peer2 := newTestPeer() peer2.newPeerAddr = addrChan // feed peer suggestions into matcher if err := runBaseProtocol(peer2, rw2); err != ErrPipeClosed { t.Errorf("peer2 terminated with unexpected error: %v", err) } + + // terminate matcher + close(addrChan) + wg.Wait() } func TestBaseProtocolDisconnect(t *testing.T) {