Added whisper interface for xeth, added examples, updated RPC

* Added RPC methods for whisper
* Added whisper example
This commit is contained in:
obscuren
2015-01-30 13:25:12 +01:00
parent c48644490f
commit c03d403437
10 changed files with 296 additions and 17 deletions

View File

@ -44,18 +44,22 @@ type EthereumApi struct {
xeth *xeth.XEth
filterManager *filter.FilterManager
mut sync.RWMutex
logs map[int]state.Logs
logMut sync.RWMutex
logs map[int]state.Logs
messagesMut sync.RWMutex
messages map[int][]xeth.WhisperMessage
db ethutil.Database
}
func NewEthereumApi(xeth *xeth.XEth) *EthereumApi {
func NewEthereumApi(eth *xeth.XEth) *EthereumApi {
db, _ := ethdb.NewLDBDatabase("dapps")
api := &EthereumApi{
xeth: xeth,
filterManager: filter.NewFilterManager(xeth.Backend().EventMux()),
xeth: eth,
filterManager: filter.NewFilterManager(eth.Backend().EventMux()),
logs: make(map[int]state.Logs),
messages: make(map[int][]xeth.WhisperMessage),
db: db,
}
go api.filterManager.Start()
@ -67,8 +71,8 @@ func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) erro
var id int
filter := core.NewFilter(self.xeth.Backend())
filter.LogsCallback = func(logs state.Logs) {
self.mut.Lock()
defer self.mut.Unlock()
self.logMut.Lock()
defer self.logMut.Unlock()
self.logs[id] = append(self.logs[id], logs...)
}
@ -79,8 +83,8 @@ func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) erro
}
func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error {
self.mut.RLock()
defer self.mut.RUnlock()
self.logMut.RLock()
defer self.logMut.RUnlock()
*reply = toLogs(self.logs[id])
@ -257,6 +261,44 @@ func (p *EthereumApi) DbGet(args *DbArgs, reply *interface{}) error {
return nil
}
func (p *EthereumApi) NewWhisperIdentity(reply *interface{}) error {
*reply = p.xeth.Whisper().NewIdentity()
return nil
}
func (p *EthereumApi) NewWhisperFilter(args *xeth.Options, reply *interface{}) error {
var id int
args.Fn = func(msg xeth.WhisperMessage) {
p.messagesMut.Lock()
defer p.messagesMut.Unlock()
p.messages[id] = append(p.messages[id], msg)
}
id = p.xeth.Whisper().Watch(args)
*reply = id
return nil
}
func (self *EthereumApi) MessagesChanged(id int, reply *interface{}) error {
self.messagesMut.RLock()
defer self.messagesMut.RUnlock()
*reply = self.messages[id]
self.messages[id] = nil // empty the messages
return nil
}
func (p *EthereumApi) WhisperPost(args *WhisperMessageArgs, reply *interface{}) error {
err := p.xeth.Whisper().Post(args.Payload, args.To, args.From, args.Topics, args.Priority, args.Ttl)
if err != nil {
return err
}
*reply = true
return nil
}
func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error {
// Spec at https://github.com/ethereum/wiki/wiki/Generic-ON-RPC
rpclogger.DebugDetailf("%T %s", req.Params, req.Params)
@ -354,6 +396,26 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error
return err
}
return p.DbGet(args, reply)
case "shh_newIdentity":
return p.NewWhisperIdentity(reply)
case "shh_newFilter":
args, err := req.ToWhisperFilterArgs()
if err != nil {
return err
}
return p.NewWhisperFilter(args, reply)
case "shh_changed":
args, err := req.ToWhisperChangedArgs()
if err != nil {
return err
}
return p.MessagesChanged(args, reply)
case "shh_post":
args, err := req.ToWhisperPostArgs()
if err != nil {
return nil
}
return p.WhisperPost(args, reply)
default:
return NewErrorResponse(fmt.Sprintf("%v %s", ErrorNotImplemented, req.Method))
}