initial commit of p2p package
This commit is contained in:
275
p2p/connection.go
Normal file
275
p2p/connection.go
Normal file
@@ -0,0 +1,275 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
// "fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/eth-go/ethutil"
|
||||
)
|
||||
|
||||
type Connection struct {
|
||||
conn net.Conn
|
||||
// conn NetworkConnection
|
||||
timeout time.Duration
|
||||
in chan []byte
|
||||
out chan []byte
|
||||
err chan *PeerError
|
||||
closingIn chan chan bool
|
||||
closingOut chan chan bool
|
||||
}
|
||||
|
||||
// const readBufferLength = 2 //for testing
|
||||
|
||||
const readBufferLength = 1440
|
||||
const partialsQueueSize = 10
|
||||
const maxPendingQueueSize = 1
|
||||
const defaultTimeout = 500
|
||||
|
||||
var magicToken = []byte{34, 64, 8, 145}
|
||||
|
||||
func (self *Connection) Open() {
|
||||
go self.startRead()
|
||||
go self.startWrite()
|
||||
}
|
||||
|
||||
func (self *Connection) Close() {
|
||||
self.closeIn()
|
||||
self.closeOut()
|
||||
}
|
||||
|
||||
func (self *Connection) closeIn() {
|
||||
errc := make(chan bool)
|
||||
self.closingIn <- errc
|
||||
<-errc
|
||||
}
|
||||
|
||||
func (self *Connection) closeOut() {
|
||||
errc := make(chan bool)
|
||||
self.closingOut <- errc
|
||||
<-errc
|
||||
}
|
||||
|
||||
func NewConnection(conn net.Conn, errchan chan *PeerError) *Connection {
|
||||
return &Connection{
|
||||
conn: conn,
|
||||
timeout: defaultTimeout,
|
||||
in: make(chan []byte),
|
||||
out: make(chan []byte),
|
||||
err: errchan,
|
||||
closingIn: make(chan chan bool, 1),
|
||||
closingOut: make(chan chan bool, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func (self *Connection) Read() <-chan []byte {
|
||||
return self.in
|
||||
}
|
||||
|
||||
func (self *Connection) Write() chan<- []byte {
|
||||
return self.out
|
||||
}
|
||||
|
||||
func (self *Connection) Error() <-chan *PeerError {
|
||||
return self.err
|
||||
}
|
||||
|
||||
func (self *Connection) startRead() {
|
||||
payloads := make(chan []byte)
|
||||
done := make(chan *PeerError)
|
||||
pending := [][]byte{}
|
||||
var head []byte
|
||||
var wait time.Duration // initally 0 (no delay)
|
||||
read := time.After(wait * time.Millisecond)
|
||||
|
||||
for {
|
||||
// if pending empty, nil channel blocks
|
||||
var in chan []byte
|
||||
if len(pending) > 0 {
|
||||
in = self.in // enable send case
|
||||
head = pending[0]
|
||||
} else {
|
||||
in = nil
|
||||
}
|
||||
|
||||
select {
|
||||
case <-read:
|
||||
go self.read(payloads, done)
|
||||
case err := <-done:
|
||||
if err == nil { // no error but nothing to read
|
||||
if len(pending) < maxPendingQueueSize {
|
||||
wait = 100
|
||||
} else if wait == 0 {
|
||||
wait = 100
|
||||
} else {
|
||||
wait = 2 * wait
|
||||
}
|
||||
} else {
|
||||
self.err <- err // report error
|
||||
wait = 100
|
||||
}
|
||||
read = time.After(wait * time.Millisecond)
|
||||
case payload := <-payloads:
|
||||
pending = append(pending, payload)
|
||||
if len(pending) < maxPendingQueueSize {
|
||||
wait = 0
|
||||
} else {
|
||||
wait = 100
|
||||
}
|
||||
read = time.After(wait * time.Millisecond)
|
||||
case in <- head:
|
||||
pending = pending[1:]
|
||||
case errc := <-self.closingIn:
|
||||
errc <- true
|
||||
close(self.in)
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (self *Connection) startWrite() {
|
||||
pending := [][]byte{}
|
||||
done := make(chan *PeerError)
|
||||
writing := false
|
||||
for {
|
||||
if len(pending) > 0 && !writing {
|
||||
writing = true
|
||||
go self.write(pending[0], done)
|
||||
}
|
||||
select {
|
||||
case payload := <-self.out:
|
||||
pending = append(pending, payload)
|
||||
case err := <-done:
|
||||
if err == nil {
|
||||
pending = pending[1:]
|
||||
writing = false
|
||||
} else {
|
||||
self.err <- err // report error
|
||||
}
|
||||
case errc := <-self.closingOut:
|
||||
errc <- true
|
||||
close(self.out)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func pack(payload []byte) (packet []byte) {
|
||||
length := ethutil.NumberToBytes(uint32(len(payload)), 32)
|
||||
// return error if too long?
|
||||
// Write magic token and payload length (first 8 bytes)
|
||||
packet = append(magicToken, length...)
|
||||
packet = append(packet, payload...)
|
||||
return
|
||||
}
|
||||
|
||||
func avoidPanic(done chan *PeerError) {
|
||||
if rec := recover(); rec != nil {
|
||||
err := NewPeerError(MiscError, " %v", rec)
|
||||
logger.Debugln(err)
|
||||
done <- err
|
||||
}
|
||||
}
|
||||
|
||||
func (self *Connection) write(payload []byte, done chan *PeerError) {
|
||||
defer avoidPanic(done)
|
||||
var err *PeerError
|
||||
_, ok := self.conn.Write(pack(payload))
|
||||
if ok != nil {
|
||||
err = NewPeerError(WriteError, " %v", ok)
|
||||
logger.Debugln(err)
|
||||
}
|
||||
done <- err
|
||||
}
|
||||
|
||||
func (self *Connection) read(payloads chan []byte, done chan *PeerError) {
|
||||
//defer avoidPanic(done)
|
||||
|
||||
partials := make(chan []byte, partialsQueueSize)
|
||||
errc := make(chan *PeerError)
|
||||
go self.readPartials(partials, errc)
|
||||
|
||||
packet := []byte{}
|
||||
length := 8
|
||||
start := true
|
||||
var err *PeerError
|
||||
out:
|
||||
for {
|
||||
// appends partials read via connection until packet is
|
||||
// - either parseable (>=8bytes)
|
||||
// - or complete (payload fully consumed)
|
||||
for len(packet) < length {
|
||||
partial, ok := <-partials
|
||||
if !ok { // partials channel is closed
|
||||
err = <-errc
|
||||
if err == nil && len(packet) > 0 {
|
||||
if start {
|
||||
err = NewPeerError(PacketTooShort, "%v", packet)
|
||||
} else {
|
||||
err = NewPeerError(PayloadTooShort, "%d < %d", len(packet), length)
|
||||
}
|
||||
}
|
||||
break out
|
||||
}
|
||||
packet = append(packet, partial...)
|
||||
}
|
||||
if start {
|
||||
// at least 8 bytes read, can validate packet
|
||||
if bytes.Compare(magicToken, packet[:4]) != 0 {
|
||||
err = NewPeerError(MagicTokenMismatch, " received %v", packet[:4])
|
||||
break
|
||||
}
|
||||
length = int(ethutil.BytesToNumber(packet[4:8]))
|
||||
packet = packet[8:]
|
||||
|
||||
if length > 0 {
|
||||
start = false // now consuming payload
|
||||
} else { //penalize peer but read on
|
||||
self.err <- NewPeerError(EmptyPayload, "")
|
||||
length = 8
|
||||
}
|
||||
} else {
|
||||
// packet complete (payload fully consumed)
|
||||
payloads <- packet[:length]
|
||||
packet = packet[length:] // resclice packet
|
||||
start = true
|
||||
length = 8
|
||||
}
|
||||
}
|
||||
|
||||
// this stops partials read via the connection, should we?
|
||||
//if err != nil {
|
||||
// select {
|
||||
// case errc <- err
|
||||
// default:
|
||||
//}
|
||||
done <- err
|
||||
}
|
||||
|
||||
func (self *Connection) readPartials(partials chan []byte, errc chan *PeerError) {
|
||||
defer close(partials)
|
||||
for {
|
||||
// Give buffering some time
|
||||
self.conn.SetReadDeadline(time.Now().Add(self.timeout * time.Millisecond))
|
||||
buffer := make([]byte, readBufferLength)
|
||||
// read partial from connection
|
||||
bytesRead, err := self.conn.Read(buffer)
|
||||
if err == nil || err.Error() == "EOF" {
|
||||
if bytesRead > 0 {
|
||||
partials <- buffer[:bytesRead]
|
||||
}
|
||||
if err != nil && err.Error() == "EOF" {
|
||||
break
|
||||
}
|
||||
} else {
|
||||
// unexpected error, report to errc
|
||||
err := NewPeerError(ReadError, " %v", err)
|
||||
logger.Debugln(err)
|
||||
errc <- err
|
||||
return // will close partials channel
|
||||
}
|
||||
}
|
||||
close(errc)
|
||||
}
|
Reference in New Issue
Block a user