network/newstream: new stream! protocol base implementation (#1500)

network/newstream: merge new stream protocol wire protocol changes, changes to docs, add basic protocol handlers and placeholders
This commit is contained in:
acud
2019-07-08 13:25:56 +03:00
committed by GitHub
parent 7deac5693c
commit af3b5e9ce1
5 changed files with 593 additions and 32 deletions

View File

@ -67,8 +67,8 @@ Wire Protocol Specifications
| Msg Name | From->To | Params | Example |
| -------- | -------- | -------- | ------- |
| StreamInfoReq | Client->Server | Streams`[]string` | `SYNC\|6, SYNC\|5` |
| StreamInfoRes | Server->Client | Streams`[]StreamDescriptor` <br>Stream`string`<br>Cursor`uint64`<br>Bounded`bool` | `SYNC\|6;CUR=1632;bounded, SYNC\|7;CUR=18433;bounded` |
| StreamInfoReq | Client->Server | Streams`[]ID` | `SYNC\|6, SYNC\|5` |
| StreamInfoRes | Server->Client | Streams`[]StreamDescriptor` <br>Stream`ID`<br>Cursor`uint64`<br>Bounded`bool` | `SYNC\|6;CUR=1632;bounded, SYNC\|7;CUR=18433;bounded` |
| GetRange | Client->Server| Ruid`uint`<br>Stream `string`<br>From`uint`<br>To`*uint`(nullable)<br>Roundtrip`bool` | `Ruid: 21321, Stream: SYNC\|6, From: 1, To: 100`(bounded), Roundtrip: true<br>`Stream: SYNC\|7, From: 109, Roundtrip: true`(unbounded) |
| OfferedHashes | Server->Client| Ruid`uint`<br>Hashes `[]byte` | `Ruid: 21321, Hashes: [cbcbbaddda, bcbbbdbbdc, ....]` |
| WantedHashes | Client->Server | Ruid`uint`<br>Bitvector`[]byte` | `Ruid: 21321, Bitvector: [0100100100] ` |
@ -81,65 +81,120 @@ Notes:
* two notions of bounded - on the stream level and on the localstore
* if TO is not specified - we assume unbounded stream, and we just send whatever, until at most, we fill up an entire batch.
### Message struct definitions:
### Message and interface definitions:
```go
// StreamProvider interface provides a lightweight abstraction that allows an easily-pluggable
// stream provider as part of the Stream! protocol specification.
type StreamProvider interface {
NeedData(ctx context.Context, key []byte) (need bool, wait func(context.Context) error)
Get(ctx context.Context, addr chunk.Address) ([]byte, error)
Put(ctx context.Context, addr chunk.Address, data []byte) (exists bool, err error)
Subscribe(ctx context.Context, key interface{}, from, to uint64) (<-chan chunk.Descriptor, func())
Cursor(interface{}) (uint64, error)
RunUpdateStreams(p *Peer)
StreamName() string
ParseKey(string) (interface{}, error)
EncodeKey(interface{}) (string, error)
StreamBehavior() StreamInitBehavior
Boundedness() bool
}
```
```go
type StreamInitBehavior int
```
```go
// StreamInfoReq is a request to get information about particular streams
type StreamInfoReq struct {
Streams []string
Streams []ID
}
```
```go
// StreamInfoRes is a response to StreamInfoReq with the corresponding stream descriptors
type StreamInfoRes struct {
Streams []StreamDescriptor
Streams []StreamDescriptor
}
```
```go
// StreamDescriptor describes an arbitrary stream
type StreamDescriptor struct {
Name string
Cursor uint
Bounded bool
Stream ID
Cursor uint64
Bounded bool
}
```
```go
// GetRange is a message sent from the downstream peer to the upstream peer asking for chunks
// within a particular interval for a certain stream
type GetRange struct {
Ruid uint
Stream string
From uint
To uint `rlp:nil`
BatchSize uint
Roundtrip bool
Ruid uint
Stream ID
From uint64
To uint64 `rlp:nil`
BatchSize uint
Roundtrip bool
}
```
```go
// OfferedHashes is a message sent from the upstream peer to the downstream peer allowing the latter
// to selectively ask for chunks within a particular requested interval
type OfferedHashes struct {
Ruid uint
LastIndex uint
Hashes []byte
Ruid uint
LastIndex uint
Hashes []byte
}
```
```go
// WantedHashes is a message sent from the downstream peer to the upstream peer in response
// to OfferedHashes in order to selectively ask for a particular chunks within an interval
type WantedHashes struct {
Ruid uint
BitVector []byte
Ruid uint
BitVector []byte
}
```
```go
// ChunkDelivery delivers a frame of chunks in response to a WantedHashes message
type ChunkDelivery struct {
Ruid uint
LastIndex uint
Chunks [][]byte
Ruid uint
LastIndex uint
Chunks []DeliveredChunk
}
```
```go
type BatchDone struct {
Ruid uint
Last uint
// DeliveredChunk encapsulates a particular chunk's underlying data within a ChunkDelivery message
type DeliveredChunk struct {
Addr storage.Address
Data []byte
}
```
```go
// StreamState is a message exchanged between two nodes to notify of changes or errors in a stream's state
type StreamState struct {
Stream string
Code uint16
Message string
Stream ID
Code uint16
Message string
}
```
```go
// Stream defines a unique stream identifier in a textual representation
type ID struct {
// Name is used for the Stream provider identification
Name string
// Key is the name of specific data stream within the stream provider. The semantics of this value
// is at the discretion of the stream provider implementation
Key string
}
```
@ -147,14 +202,14 @@ Message exchange examples:
======
Initial handshake - client queries server for stream states<br>
![handshake](https://raw.githubusercontent.com/ethersphere/swarm/stream-spec/docs/diagrams/stream-handshake.png)
![handshake](https://raw.githubusercontent.com/ethersphere/swarm/master/docs/diagrams/stream-handshake.png)
<br>
GetRange (bounded) - client requests a bounded range within a stream<br>
![bounded-range](https://raw.githubusercontent.com/ethersphere/swarm/stream-spec/docs/diagrams/stream-bounded.png)
![bounded-range](https://raw.githubusercontent.com/ethersphere/swarm/master/docs/diagrams/stream-bounded.png)
<br>
GetRange (unbounded) - client requests an unbounded range (specifies only `From` parameter)<br>
![unbounded-range](https://raw.githubusercontent.com/ethersphere/swarm/stream-spec/docs/diagrams/stream-unbounded.png)
![unbounded-range](https://raw.githubusercontent.com/ethersphere/swarm/master/docs/diagrams/stream-unbounded.png)
<br>
GetRange (no roundtrip) - client requests an unbounded or bounded range with no roundtrip configured<br>
![unbounded-range](https://raw.githubusercontent.com/ethersphere/swarm/stream-spec/docs/diagrams/stream-no-roundtrip.png)
![unbounded-range](https://raw.githubusercontent.com/ethersphere/swarm/master/docs/diagrams/stream-no-roundtrip.png)

View File

@ -0,0 +1,65 @@
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
package newstream
import (
"flag"
"io/ioutil"
"os"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethersphere/swarm/network"
"github.com/ethersphere/swarm/storage/localstore"
"github.com/ethersphere/swarm/storage/mock"
)
var (
loglevel = flag.Int("loglevel", 5, "verbosity of logs")
)
func init() {
flag.Parse()
log.PrintOrigins(true)
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(false))))
}
func newTestLocalStore(id enode.ID, addr *network.BzzAddr, globalStore mock.GlobalStorer) (localStore *localstore.DB, cleanup func(), err error) {
dir, err := ioutil.TempDir("", "swarm-stream-")
if err != nil {
return nil, nil, err
}
cleanup = func() {
os.RemoveAll(dir)
}
var mockStore *mock.NodeStore
if globalStore != nil {
mockStore = globalStore.NewNodeStore(common.BytesToAddress(id.Bytes()))
}
localStore, err = localstore.New(dir, addr.Over(), &localstore.Options{
MockStore: mockStore,
})
if err != nil {
cleanup()
return nil, nil, err
}
return localStore, cleanup, nil
}

100
network/newstream/peer.go Normal file
View File

@ -0,0 +1,100 @@
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
package newstream
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/network"
"github.com/ethersphere/swarm/network/bitvector"
"github.com/ethersphere/swarm/state"
)
var ErrEmptyBatch = errors.New("empty batch")
const (
HashSize = 32
BatchSize = 16
)
// Peer is the Peer extension for the streaming protocol
type Peer struct {
*network.BzzPeer
mtx sync.Mutex
providers map[string]StreamProvider
intervalsStore state.Store
streamCursorsMu sync.Mutex
streamCursors map[string]uint64 // key: Stream ID string representation, value: session cursor. Keeps cursors for all streams. when unset - we are not interested in that bin
dirtyStreams map[string]bool // key: stream ID, value: whether cursors for a stream should be updated
activeBoundedGets map[string]chan struct{}
openWants map[uint]*want // maintain open wants on the client side
openOffers map[uint]offer // maintain open offers on the server side
quit chan struct{} // closed when peer is going offline
}
// NewPeer is the constructor for Peer
func NewPeer(peer *network.BzzPeer, i state.Store, providers map[string]StreamProvider) *Peer {
p := &Peer{
BzzPeer: peer,
providers: providers,
intervalsStore: i,
streamCursors: make(map[string]uint64),
dirtyStreams: make(map[string]bool),
openWants: make(map[uint]*want),
openOffers: make(map[uint]offer),
quit: make(chan struct{}),
}
return p
}
func (p *Peer) Left() {
close(p.quit)
}
// HandleMsg is the message handler that delegates incoming messages
func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
switch msg := msg.(type) {
default:
return fmt.Errorf("unknown message type: %T", msg)
}
return nil
}
type offer struct {
ruid uint
stream ID
hashes []byte
requested time.Time
}
type want struct {
ruid uint
from uint64
to uint64
stream ID
hashes map[string]bool
bv *bitvector.BitVector
requested time.Time
remaining uint64
chunks chan chunk.Chunk
done chan error
}

167
network/newstream/stream.go Normal file
View File

@ -0,0 +1,167 @@
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
package newstream
import (
"sync"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network"
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/state"
)
// SlipStream implements node.Service
var _ node.Service = (*SlipStream)(nil)
var SyncerSpec = &protocols.Spec{
Name: "bzz-stream",
Version: 8,
MaxMsgSize: 10 * 1024 * 1024,
Messages: []interface{}{
StreamInfoReq{},
StreamInfoRes{},
GetRange{},
OfferedHashes{},
ChunkDelivery{},
WantedHashes{},
},
}
// SlipStream is the base type that handles all client/server operations on a node
// it is instantiated once per stream protocol instance, that is, it should have
// one instance per node
type SlipStream struct {
mtx sync.RWMutex
intervalsStore state.Store //every protocol would make use of this
peers map[enode.ID]*Peer
kad *network.Kademlia
providers map[string]StreamProvider
spec *protocols.Spec //this protocol's spec
balance protocols.Balance //implements protocols.Balance, for accounting
prices protocols.Prices //implements protocols.Prices, provides prices to accounting
quit chan struct{} // terminates registry goroutines
}
func NewSlipStream(intervalsStore state.Store, kad *network.Kademlia, providers ...StreamProvider) *SlipStream {
slipStream := &SlipStream{
intervalsStore: intervalsStore,
kad: kad,
peers: make(map[enode.ID]*Peer),
providers: make(map[string]StreamProvider),
quit: make(chan struct{}),
}
for _, p := range providers {
slipStream.providers[p.StreamName()] = p
}
slipStream.spec = SyncerSpec
return slipStream
}
func (s *SlipStream) getPeer(id enode.ID) *Peer {
s.mtx.Lock()
defer s.mtx.Unlock()
p := s.peers[id]
return p
}
func (s *SlipStream) addPeer(p *Peer) {
s.mtx.Lock()
defer s.mtx.Unlock()
s.peers[p.ID()] = p
}
func (s *SlipStream) removePeer(p *Peer) {
s.mtx.Lock()
defer s.mtx.Unlock()
if _, found := s.peers[p.ID()]; found {
log.Error("removing peer", "id", p.ID())
delete(s.peers, p.ID())
p.Left()
} else {
log.Warn("peer was marked for removal but not found", "peer", p.ID())
}
}
// Run is being dispatched when 2 nodes connect
func (s *SlipStream) Run(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := protocols.NewPeer(p, rw, s.spec)
bp := network.NewBzzPeer(peer)
np := network.NewPeer(bp, s.kad)
s.kad.On(np)
defer s.kad.Off(np)
sp := NewPeer(bp, s.intervalsStore, s.providers)
s.addPeer(sp)
defer s.removePeer(sp)
return peer.Run(sp.HandleMsg)
}
func (s *SlipStream) Protocols() []p2p.Protocol {
return []p2p.Protocol{
{
Name: "bzz-stream",
Version: 1,
Length: 10 * 1024 * 1024,
Run: s.Run,
},
}
}
func (s *SlipStream) APIs() []rpc.API {
return []rpc.API{
{
Namespace: "bzz-stream",
Version: "1.0",
Service: NewAPI(s),
Public: false,
},
}
}
// Additional public methods accessible through API for pss
type API struct {
*SlipStream
}
func NewAPI(s *SlipStream) *API {
return &API{SlipStream: s}
}
func (s *SlipStream) Start(server *p2p.Server) error {
log.Info("slip stream starting")
return nil
}
func (s *SlipStream) Stop() error {
log.Info("slip stream closing")
s.mtx.Lock()
defer s.mtx.Unlock()
close(s.quit)
return nil
}

174
network/newstream/wire.go Normal file
View File

@ -0,0 +1,174 @@
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
package newstream
import (
"context"
"fmt"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/storage"
)
// StreamProvider interface provides a lightweight abstraction that allows an easily-pluggable
// stream provider as part of the Stream! protocol specification.
// Since Stream! thoroughly defines the concepts of a stream, intervals, clients and servers, the
// interface therefore needs only a pluggable provider.
// The domain interpretable notions which are at the discretion of the implementing
// provider therefore are - sourcing data (get, put, subscribe for constant new data, and need data
// which is to decide whether to retrieve data or not), retrieving cursors from the data store, the
// implementation of which streams to maintain with a certain peer and providing functionality
// to expose, parse and encode values related to the string represntation of the stream
type StreamProvider interface {
// NeedData informs the caller whether a certain chunk needs to be fetched from another peer or not.
// Typically this will involve checking whether a certain chunk exists locally.
// In case a chunk does not exist locally - a `wait` function returns upon chunk delivery
NeedData(ctx context.Context, key []byte) (need bool, wait func(context.Context) error)
// Get a particular chunk identified by addr from the local storage
Get(ctx context.Context, addr chunk.Address) ([]byte, error)
// Put a certain chunk into the local storage
Put(ctx context.Context, addr chunk.Address, data []byte) (exists bool, err error)
// Subscribe to a data stream from an arbitrary data source
Subscribe(ctx context.Context, key interface{}, from, to uint64) (<-chan chunk.Descriptor, func())
// Cursor returns the last known Cursor for a given Stream Key
Cursor(interface{}) (uint64, error)
// RunUpdateStreams is a provider specific implementation on how to maintain running streams with
// an arbitrary Peer. This method should always be run in a separate goroutine
RunUpdateStreams(p *Peer)
// StreamName returns the Name of the Stream (see ID)
StreamName() string
// ParseStream from a standard pipe-separated string and return the Stream Key
ParseKey(string) (interface{}, error)
// EncodeStream from a Stream Key to a Stream pipe-separated string representation
EncodeKey(interface{}) (string, error)
// StreamBehavior defines how the stream behaves upon initialisation
StreamBehavior() StreamInitBehavior
Boundedness() bool
}
// StreamInitBehavior defines the stream behavior upon init
type StreamInitBehavior int
const (
// StreamIdle means that there is no initial automatic message exchange
// between the nodes when the protocol gets established
StreamIdle StreamInitBehavior = iota
// StreamGetCursors tells the two nodes to automatically fetch stream
// cursors from each other
StreamGetCursors
// StreamAutostart automatically starts fetching data from the streams
// once the cursors arrive
StreamAutostart
)
// StreamInfoReq is a request to get information about particular streams
type StreamInfoReq struct {
Streams []ID
}
// StreamInfoRes is a response to StreamInfoReq with the corresponding stream descriptors
type StreamInfoRes struct {
Streams []StreamDescriptor
}
// StreamDescriptor describes an arbitrary stream
type StreamDescriptor struct {
Stream ID
Cursor uint64
Bounded bool
}
// GetRange is a message sent from the downstream peer to the upstream peer asking for chunks
// within a particular interval for a certain stream
type GetRange struct {
Ruid uint
Stream ID
From uint64
To uint64 `rlp:nil`
BatchSize uint
Roundtrip bool
}
// OfferedHashes is a message sent from the upstream peer to the downstream peer allowing the latter
// to selectively ask for chunks within a particular requested interval
type OfferedHashes struct {
Ruid uint
LastIndex uint
Hashes []byte
}
// WantedHashes is a message sent from the downstream peer to the upstream peer in response
// to OfferedHashes in order to selectively ask for a particular chunks within an interval
type WantedHashes struct {
Ruid uint
BitVector []byte
}
// ChunkDelivery delivers a frame of chunks in response to a WantedHashes message
type ChunkDelivery struct {
Ruid uint
LastIndex uint
Chunks []DeliveredChunk
}
// DeliveredChunk encapsulates a particular chunk's underlying data within a ChunkDelivery message
type DeliveredChunk struct {
Addr storage.Address //chunk address
Data []byte //chunk data
}
// StreamState is a message exchanged between two nodes to notify of changes or errors in a stream's state
type StreamState struct {
Stream ID
Code uint16
Message string
}
// Stream defines a unique stream identifier in a textual representation
type ID struct {
// Name is used for the Stream provider identification
Name string
// Key is the name of specific data stream within the stream provider. The semantics of this value
// is at the discretion of the stream provider implementation
Key string
}
// NewID returns a new Stream ID for a particular stream Name and Key
func NewID(name string, key string) ID {
return ID{
Name: name,
Key: key,
}
}
// String return a stream id based on all Stream fields.
func (s ID) String() string {
return fmt.Sprintf("%s|%s", s.Name, s.Key)
}