Swarm MRUs: Adaptive frequency / Predictable lookups / API simplification (#17559)
* swarm/storage/mru: Adaptive Frequency swarm/storage/mru/lookup: fixed getBaseTime Added NewEpoch constructor swarm/api/client: better error handling in GetResource() swarm/storage/mru: Renamed structures. Renamed ResourceMetadata to ResourceID. Renamed ResourceID.Name to ResourceID.Topic swarm/storage/mru: Added binarySerializer interface and test tools swarm/storage/mru/lookup: Changed base time to time and + marshallers swarm/storage/mru: Added ResourceID (former resourceMetadata) swarm/storage/mru: Added ResourceViewId and serialization tests swarm/storage/mru/lookup: fixed epoch unmarshaller. Added Epoch Equals swarm/storage/mru: Fixes as per review comments cmd/swarm: reworded resource create/update help text regarding topic swarm/storage/mru: Added UpdateLookup and serializer tests swarm/storage/mru: Added UpdateHeader, serializers and tests swarm/storage/mru: changed UpdateAddr / epoch to Base() swarm/storage/mru: Added resourceUpdate serializer and tests swarm/storage/mru: Added SignedResourceUpdate tests and serializers swarm/storage/mru/lookup: fixed GetFirstEpoch bug swarm/storage/mru: refactor, comments, cleanup Also added tests for Topic swarm/storage/mru: handler tests pass swarm/storage/mru: all resource package tests pass swarm/storage/mru: resource test pass after adding timestamp checking support swarm/storage/mru: Added JSON serializers to ResourceIDView structures swarm/storage/mru: Sever, client, API test pass swarm/storage/mru: server test pass swarm/storage/mru: Added topic length check swarm/storage/mru: removed some literals, improved "previous lookup" test case swarm/storage/mru: some fixes and comments as per review swarm/storage/mru: first working version without metadata chunk swarm/storage/mru: Various fixes as per review swarm/storage/mru: client test pass swarm/storage/mru: resource query strings and manifest-less queries swarm/storage/mru: simplify naming swarm/storage/mru: first autofreq working version swarm/storage/mru: renamed ToValues to AppendValues swarm/resource/mru: Added ToValues / FromValues for URL query strings swarm/storage/mru: Changed POST resource to work with query strings. No more JSON. swarm/storage/mru: removed resourceid swarm/storage/mru: Opened up structures swarm/storage/mru: Merged Request and SignedResourceUpdate swarm/storage/mru: removed initial data from CLI resource create swarm/storage/mru: Refactor Topic as a direct fixed-length array swarm/storage/mru/lookup: Comprehensive GetNextLevel tests swarm/storage/mru: Added comments Added length checks in Topic swarm/storage/mru: fixes in tests and some code comments swarm/storage/mru/lookup: new optimized lookup algorithm swarm/api: moved getResourceView to api out of server swarm/storage/mru: Lookup algorithm working swarm/storage/mru: comments and renamed NewLookupParams Deleted commented code swarm/storage/mru/lookup: renamed Epoch.LaterThan to After swarm/storage/mru/lookup: Comments and tidying naming swarm/storage/mru: fix lookup algorithm swarm/storage/mru: exposed lookup hint removed updateheader swarm/storage/mru/lookup: changed GetNextEpoch for initial values swarm/storage/mru: resource tests pass swarm/storage/mru: valueSerializer interface and tests swarm/storage/mru/lookup: Comments, improvements, fixes, more tests swarm/storage/mru: renamed UpdateLookup to ID, LookupParams to Query swarm/storage/mru: renamed query receiver var swarm/cmd: MRU CLI tests * cmd/swarm: remove rogue fmt * swarm/storage/mru: Add version / header for future use * swarm/storage/mru: Fixes/comments as per review cmd/swarm: remove rogue fmt swarm/storage/mru: Add version / header for future use- * swarm/storage/mru: fix linter errors * cmd/swarm: Speeded up TestCLIResourceUpdate
This commit is contained in:
committed by
Martin Holst Swende
parent
0da3b17a11
commit
2c110c81ee
@@ -21,11 +21,12 @@ package mru
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/ethereum/go-ethereum/swarm/chunk"
|
||||
"github.com/ethereum/go-ethereum/swarm/storage/mru/lookup"
|
||||
|
||||
"github.com/ethereum/go-ethereum/swarm/log"
|
||||
"github.com/ethereum/go-ethereum/swarm/storage"
|
||||
)
|
||||
@@ -33,7 +34,7 @@ import (
|
||||
type Handler struct {
|
||||
chunkStore *storage.NetStore
|
||||
HashSize int
|
||||
resources map[uint64]*resource
|
||||
resources map[uint64]*cacheEntry
|
||||
resourceLock sync.RWMutex
|
||||
storeTimeout time.Duration
|
||||
queryMaxPeriods uint32
|
||||
@@ -42,12 +43,10 @@ type Handler struct {
|
||||
// HandlerParams pass parameters to the Handler constructor NewHandler
|
||||
// Signer and TimestampProvider are mandatory parameters
|
||||
type HandlerParams struct {
|
||||
QueryMaxPeriods uint32
|
||||
}
|
||||
|
||||
// hashPool contains a pool of ready hashers
|
||||
var hashPool sync.Pool
|
||||
var minimumChunkLength int
|
||||
|
||||
// init initializes the package and hashPool
|
||||
func init() {
|
||||
@@ -56,19 +55,12 @@ func init() {
|
||||
return storage.MakeHashFunc(resourceHashAlgorithm)()
|
||||
},
|
||||
}
|
||||
if minimumMetadataLength < minimumUpdateDataLength {
|
||||
minimumChunkLength = minimumMetadataLength
|
||||
} else {
|
||||
minimumChunkLength = minimumUpdateDataLength
|
||||
}
|
||||
}
|
||||
|
||||
// NewHandler creates a new Mutable Resource API
|
||||
func NewHandler(params *HandlerParams) *Handler {
|
||||
rh := &Handler{
|
||||
resources: make(map[uint64]*resource),
|
||||
storeTimeout: defaultStoreTimeout,
|
||||
queryMaxPeriods: params.QueryMaxPeriods,
|
||||
resources: make(map[uint64]*cacheEntry),
|
||||
}
|
||||
|
||||
for i := 0; i < hasherCount; i++ {
|
||||
@@ -88,44 +80,25 @@ func (h *Handler) SetStore(store *storage.NetStore) {
|
||||
}
|
||||
|
||||
// Validate is a chunk validation method
|
||||
// If it looks like a resource update, the chunk address is checked against the ownerAddr of the update's signature
|
||||
// If it looks like a resource update, the chunk address is checked against the userAddr of the update's signature
|
||||
// It implements the storage.ChunkValidator interface
|
||||
func (h *Handler) Validate(chunkAddr storage.Address, data []byte) bool {
|
||||
dataLength := len(data)
|
||||
if dataLength < minimumChunkLength || dataLength > chunk.DefaultSize+8 {
|
||||
if dataLength < minimumSignedUpdateLength {
|
||||
return false
|
||||
}
|
||||
|
||||
//metadata chunks have the first two bytes set to zero
|
||||
if data[0] == 0 && data[1] == 0 && dataLength >= minimumMetadataLength {
|
||||
//metadata chunk
|
||||
rootAddr, _ := metadataHash(data)
|
||||
valid := bytes.Equal(chunkAddr, rootAddr)
|
||||
if !valid {
|
||||
log.Debug("Invalid root metadata chunk with address", "addr", chunkAddr.Hex())
|
||||
}
|
||||
return valid
|
||||
}
|
||||
|
||||
// if it is not a metadata chunk, check if it is a properly formatted update chunk with
|
||||
// check if it is a properly formatted update chunk with
|
||||
// valid signature and proof of ownership of the resource it is trying
|
||||
// to update
|
||||
|
||||
// First, deserialize the chunk
|
||||
var r SignedResourceUpdate
|
||||
var r Request
|
||||
if err := r.fromChunk(chunkAddr, data); err != nil {
|
||||
log.Debug("Invalid resource chunk", "addr", chunkAddr.Hex(), "err", err.Error())
|
||||
return false
|
||||
}
|
||||
|
||||
// check that the lookup information contained in the chunk matches the updateAddr (chunk search key)
|
||||
// that was used to retrieve this chunk
|
||||
// if this validation fails, someone forged a chunk.
|
||||
if !bytes.Equal(chunkAddr, r.updateHeader.UpdateAddr()) {
|
||||
log.Debug("period,version,rootAddr contained in update chunk do not match updateAddr", "addr", chunkAddr.Hex())
|
||||
return false
|
||||
}
|
||||
|
||||
// Verify signatures and that the signer actually owns the resource
|
||||
// If it fails, it means either the signature is not valid, data is corrupted
|
||||
// or someone is trying to update someone else's resource.
|
||||
@@ -138,301 +111,134 @@ func (h *Handler) Validate(chunkAddr storage.Address, data []byte) bool {
|
||||
}
|
||||
|
||||
// GetContent retrieves the data payload of the last synced update of the Mutable Resource
|
||||
func (h *Handler) GetContent(rootAddr storage.Address) (storage.Address, []byte, error) {
|
||||
rsrc := h.get(rootAddr)
|
||||
if rsrc == nil || !rsrc.isSynced() {
|
||||
return nil, nil, NewError(ErrNotFound, " does not exist or is not synced")
|
||||
func (h *Handler) GetContent(view *View) (storage.Address, []byte, error) {
|
||||
if view == nil {
|
||||
return nil, nil, NewError(ErrInvalidValue, "view is nil")
|
||||
}
|
||||
rsrc := h.get(view)
|
||||
if rsrc == nil {
|
||||
return nil, nil, NewError(ErrNotFound, "resource does not exist")
|
||||
}
|
||||
return rsrc.lastKey, rsrc.data, nil
|
||||
}
|
||||
|
||||
// GetLastPeriod retrieves the period of the last synced update of the Mutable Resource
|
||||
func (h *Handler) GetLastPeriod(rootAddr storage.Address) (uint32, error) {
|
||||
rsrc := h.get(rootAddr)
|
||||
if rsrc == nil {
|
||||
return 0, NewError(ErrNotFound, " does not exist")
|
||||
} else if !rsrc.isSynced() {
|
||||
return 0, NewError(ErrNotSynced, " is not synced")
|
||||
}
|
||||
return rsrc.period, nil
|
||||
}
|
||||
|
||||
// GetVersion retrieves the period of the last synced update of the Mutable Resource
|
||||
func (h *Handler) GetVersion(rootAddr storage.Address) (uint32, error) {
|
||||
rsrc := h.get(rootAddr)
|
||||
if rsrc == nil {
|
||||
return 0, NewError(ErrNotFound, " does not exist")
|
||||
} else if !rsrc.isSynced() {
|
||||
return 0, NewError(ErrNotSynced, " is not synced")
|
||||
}
|
||||
return rsrc.version, nil
|
||||
}
|
||||
|
||||
// New creates a new metadata chunk out of the request passed in.
|
||||
func (h *Handler) New(ctx context.Context, request *Request) error {
|
||||
|
||||
// frequency 0 is invalid
|
||||
if request.metadata.Frequency == 0 {
|
||||
return NewError(ErrInvalidValue, "frequency cannot be 0 when creating a resource")
|
||||
}
|
||||
|
||||
// make sure owner is set to something
|
||||
if request.metadata.Owner == zeroAddr {
|
||||
return NewError(ErrInvalidValue, "ownerAddr must be set to create a new metadata chunk")
|
||||
}
|
||||
|
||||
// create the meta chunk and store it in swarm
|
||||
chunk, metaHash, err := request.metadata.newChunk()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if request.metaHash != nil && !bytes.Equal(request.metaHash, metaHash) ||
|
||||
request.rootAddr != nil && !bytes.Equal(request.rootAddr, chunk.Address()) {
|
||||
return NewError(ErrInvalidValue, "metaHash in UpdateRequest does not match actual metadata")
|
||||
}
|
||||
|
||||
request.metaHash = metaHash
|
||||
request.rootAddr = chunk.Address()
|
||||
|
||||
h.chunkStore.Put(ctx, chunk)
|
||||
log.Debug("new resource", "name", request.metadata.Name, "startTime", request.metadata.StartTime, "frequency", request.metadata.Frequency, "owner", request.metadata.Owner)
|
||||
|
||||
// create the internal index for the resource and populate it with its metadata
|
||||
rsrc := &resource{
|
||||
resourceUpdate: resourceUpdate{
|
||||
updateHeader: updateHeader{
|
||||
UpdateLookup: UpdateLookup{
|
||||
rootAddr: chunk.Address(),
|
||||
},
|
||||
},
|
||||
},
|
||||
ResourceMetadata: request.metadata,
|
||||
updated: time.Now(),
|
||||
}
|
||||
h.set(chunk.Address(), rsrc)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewUpdateRequest prepares an UpdateRequest structure with all the necessary information to
|
||||
// NewRequest prepares a Request structure with all the necessary information to
|
||||
// just add the desired data and sign it.
|
||||
// The resulting structure can then be signed and passed to Handler.Update to be verified and sent
|
||||
func (h *Handler) NewUpdateRequest(ctx context.Context, rootAddr storage.Address) (updateRequest *Request, err error) {
|
||||
|
||||
if rootAddr == nil {
|
||||
return nil, NewError(ErrInvalidValue, "rootAddr cannot be nil")
|
||||
func (h *Handler) NewRequest(ctx context.Context, view *View) (request *Request, err error) {
|
||||
if view == nil {
|
||||
return nil, NewError(ErrInvalidValue, "view cannot be nil")
|
||||
}
|
||||
|
||||
// Make sure we have a cache of the metadata chunk
|
||||
rsrc, err := h.Load(ctx, rootAddr)
|
||||
now := TimestampProvider.Now().Time
|
||||
request = new(Request)
|
||||
request.Header.Version = ProtocolVersion
|
||||
|
||||
query := NewQueryLatest(view, lookup.NoClue)
|
||||
|
||||
rsrc, err := h.Lookup(ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
now := TimestampProvider.Now()
|
||||
|
||||
updateRequest = new(Request)
|
||||
updateRequest.period, err = getNextPeriod(rsrc.StartTime.Time, now.Time, rsrc.Frequency)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if _, err = h.lookup(rsrc, LookupLatestVersionInPeriod(rsrc.rootAddr, updateRequest.period)); err != nil {
|
||||
if err.(*Error).code != ErrNotFound {
|
||||
return nil, err
|
||||
}
|
||||
// not finding updates means that there is a network error
|
||||
// or that the resource really does not have updates in this period.
|
||||
// or that the resource really does not have updates
|
||||
}
|
||||
|
||||
updateRequest.multihash = rsrc.multihash
|
||||
updateRequest.rootAddr = rsrc.rootAddr
|
||||
updateRequest.metaHash = rsrc.metaHash
|
||||
updateRequest.metadata = rsrc.ResourceMetadata
|
||||
request.View = *view
|
||||
|
||||
// if we already have an update for this period then increment version
|
||||
// resource object MUST be in sync for version to be correct, but we checked this earlier in the method already
|
||||
if h.hasUpdate(rootAddr, updateRequest.period) {
|
||||
updateRequest.version = rsrc.version + 1
|
||||
// if we already have an update, then find next epoch
|
||||
if rsrc != nil {
|
||||
request.Epoch = lookup.GetNextEpoch(rsrc.Epoch, now)
|
||||
} else {
|
||||
updateRequest.version = 1
|
||||
request.Epoch = lookup.GetFirstEpoch(now)
|
||||
}
|
||||
|
||||
return updateRequest, nil
|
||||
return request, nil
|
||||
}
|
||||
|
||||
// Lookup retrieves a specific or latest version of the resource update with metadata chunk at params.Root
|
||||
// Lookup works differently depending on the configuration of `LookupParams`
|
||||
// See the `LookupParams` documentation and helper functions:
|
||||
// `LookupLatest`, `LookupLatestVersionInPeriod` and `LookupVersion`
|
||||
// Lookup retrieves a specific or latest version of the resource
|
||||
// Lookup works differently depending on the configuration of `ID`
|
||||
// See the `ID` documentation and helper functions:
|
||||
// `LookupLatest` and `LookupBefore`
|
||||
// When looking for the latest update, it starts at the next period after the current time.
|
||||
// upon failure tries the corresponding keys of each previous period until one is found
|
||||
// (or startTime is reached, in which case there are no updates).
|
||||
func (h *Handler) Lookup(ctx context.Context, params *LookupParams) (*resource, error) {
|
||||
func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error) {
|
||||
|
||||
rsrc := h.get(params.rootAddr)
|
||||
if rsrc == nil {
|
||||
return nil, NewError(ErrNothingToReturn, "resource not loaded")
|
||||
timeLimit := query.TimeLimit
|
||||
if timeLimit == 0 { // if time limit is set to zero, the user wants to get the latest update
|
||||
timeLimit = TimestampProvider.Now().Time
|
||||
}
|
||||
return h.lookup(rsrc, params)
|
||||
}
|
||||
|
||||
// LookupPrevious returns the resource before the one currently loaded in the resource cache
|
||||
// This is useful where resource updates are used incrementally in contrast to
|
||||
// merely replacing content.
|
||||
// Requires a cached resource object to determine the current state of the resource.
|
||||
func (h *Handler) LookupPrevious(ctx context.Context, params *LookupParams) (*resource, error) {
|
||||
rsrc := h.get(params.rootAddr)
|
||||
if rsrc == nil {
|
||||
return nil, NewError(ErrNothingToReturn, "resource not loaded")
|
||||
if query.Hint == lookup.NoClue { // try to use our cache
|
||||
entry := h.get(&query.View)
|
||||
if entry != nil && entry.Epoch.Time <= timeLimit { // avoid bad hints
|
||||
query.Hint = entry.Epoch
|
||||
}
|
||||
}
|
||||
if !rsrc.isSynced() {
|
||||
return nil, NewError(ErrNotSynced, "LookupPrevious requires synced resource.")
|
||||
} else if rsrc.period == 0 {
|
||||
return nil, NewError(ErrNothingToReturn, " not found")
|
||||
}
|
||||
var version, period uint32
|
||||
if rsrc.version > 1 {
|
||||
version = rsrc.version - 1
|
||||
period = rsrc.period
|
||||
} else if rsrc.period == 1 {
|
||||
return nil, NewError(ErrNothingToReturn, "Current update is the oldest")
|
||||
} else {
|
||||
version = 0
|
||||
period = rsrc.period - 1
|
||||
}
|
||||
return h.lookup(rsrc, NewLookupParams(rsrc.rootAddr, period, version, params.Limit))
|
||||
}
|
||||
|
||||
// base code for public lookup methods
|
||||
func (h *Handler) lookup(rsrc *resource, params *LookupParams) (*resource, error) {
|
||||
|
||||
lp := *params
|
||||
// we can't look for anything without a store
|
||||
if h.chunkStore == nil {
|
||||
return nil, NewError(ErrInit, "Call Handler.SetStore() before performing lookups")
|
||||
}
|
||||
|
||||
var specificperiod bool
|
||||
if lp.period > 0 {
|
||||
specificperiod = true
|
||||
} else {
|
||||
// get the current time and the next period
|
||||
now := TimestampProvider.Now()
|
||||
var ul ID
|
||||
ul.View = query.View
|
||||
var readCount int
|
||||
|
||||
var period uint32
|
||||
period, err := getNextPeriod(rsrc.StartTime.Time, now.Time, rsrc.Frequency)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lp.period = period
|
||||
}
|
||||
|
||||
// start from the last possible period, and iterate previous ones
|
||||
// (unless we want a specific period only) until we find a match.
|
||||
// If we hit startTime we're out of options
|
||||
var specificversion bool
|
||||
if lp.version > 0 {
|
||||
specificversion = true
|
||||
} else {
|
||||
lp.version = 1
|
||||
}
|
||||
|
||||
var hops uint32
|
||||
if lp.Limit == 0 {
|
||||
lp.Limit = h.queryMaxPeriods
|
||||
}
|
||||
log.Trace("resource lookup", "period", lp.period, "version", lp.version, "limit", lp.Limit)
|
||||
for lp.period > 0 {
|
||||
if lp.Limit != 0 && hops > lp.Limit {
|
||||
return nil, NewErrorf(ErrPeriodDepth, "Lookup exceeded max period hops (%d)", lp.Limit)
|
||||
}
|
||||
updateAddr := lp.UpdateAddr()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultRetrieveTimeout)
|
||||
// Invoke the lookup engine.
|
||||
// The callback will be called every time the lookup algorithm needs to guess
|
||||
requestPtr, err := lookup.Lookup(timeLimit, query.Hint, func(epoch lookup.Epoch, now uint64) (interface{}, error) {
|
||||
readCount++
|
||||
ul.Epoch = epoch
|
||||
ctx, cancel := context.WithTimeout(ctx, defaultRetrieveTimeout)
|
||||
defer cancel()
|
||||
|
||||
chunk, err := h.chunkStore.Get(ctx, updateAddr)
|
||||
if err == nil {
|
||||
if specificversion {
|
||||
return h.updateIndex(rsrc, chunk)
|
||||
}
|
||||
// check if we have versions > 1. If a version fails, the previous version is used and returned.
|
||||
log.Trace("rsrc update version 1 found, checking for version updates", "period", lp.period, "updateAddr", updateAddr)
|
||||
for {
|
||||
newversion := lp.version + 1
|
||||
updateAddr := lp.UpdateAddr()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultRetrieveTimeout)
|
||||
defer cancel()
|
||||
|
||||
newchunk, err := h.chunkStore.Get(ctx, updateAddr)
|
||||
if err != nil {
|
||||
return h.updateIndex(rsrc, chunk)
|
||||
}
|
||||
chunk = newchunk
|
||||
lp.version = newversion
|
||||
log.Trace("version update found, checking next", "version", lp.version, "period", lp.period, "updateAddr", updateAddr)
|
||||
}
|
||||
chunk, err := h.chunkStore.Get(ctx, ul.Addr())
|
||||
if err != nil { // TODO: check for catastrophic errors other than chunk not found
|
||||
return nil, nil
|
||||
}
|
||||
if specificperiod {
|
||||
break
|
||||
}
|
||||
log.Trace("rsrc update not found, checking previous period", "period", lp.period, "updateAddr", updateAddr)
|
||||
lp.period--
|
||||
hops++
|
||||
}
|
||||
return nil, NewError(ErrNotFound, "no updates found")
|
||||
}
|
||||
|
||||
// Load retrieves the Mutable Resource metadata chunk stored at rootAddr
|
||||
// Upon retrieval it creates/updates the index entry for it with metadata corresponding to the chunk contents
|
||||
func (h *Handler) Load(ctx context.Context, rootAddr storage.Address) (*resource, error) {
|
||||
//TODO: Maybe add timeout to context, defaultRetrieveTimeout?
|
||||
ctx, cancel := context.WithTimeout(ctx, defaultRetrieveTimeout)
|
||||
defer cancel()
|
||||
chunk, err := h.chunkStore.Get(ctx, rootAddr)
|
||||
var request Request
|
||||
if err := request.fromChunk(chunk.Address(), chunk.Data()); err != nil {
|
||||
return nil, nil
|
||||
}
|
||||
if request.Time <= timeLimit {
|
||||
return &request, nil
|
||||
}
|
||||
return nil, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, NewError(ErrNotFound, err.Error())
|
||||
}
|
||||
|
||||
// create the index entry
|
||||
rsrc := &resource{}
|
||||
|
||||
if err := rsrc.ResourceMetadata.binaryGet(chunk.Data()); err != nil { // Will fail if this is not really a metadata chunk
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rsrc.rootAddr, rsrc.metaHash = metadataHash(chunk.Data())
|
||||
if !bytes.Equal(rsrc.rootAddr, rootAddr) {
|
||||
return nil, NewError(ErrCorruptData, "Corrupt metadata chunk")
|
||||
log.Info(fmt.Sprintf("Resource lookup finished in %d lookups", readCount))
|
||||
|
||||
request, _ := requestPtr.(*Request)
|
||||
if request == nil {
|
||||
return nil, NewError(ErrNotFound, "no updates found")
|
||||
}
|
||||
h.set(rootAddr, rsrc)
|
||||
log.Trace("resource index load", "rootkey", rootAddr, "name", rsrc.ResourceMetadata.Name, "starttime", rsrc.ResourceMetadata.StartTime, "frequency", rsrc.ResourceMetadata.Frequency)
|
||||
return rsrc, nil
|
||||
return h.updateCache(request)
|
||||
|
||||
}
|
||||
|
||||
// update mutable resource index map with specified content
|
||||
func (h *Handler) updateIndex(rsrc *resource, chunk storage.Chunk) (*resource, error) {
|
||||
// update mutable resource cache map with specified content
|
||||
func (h *Handler) updateCache(request *Request) (*cacheEntry, error) {
|
||||
|
||||
// retrieve metadata from chunk data and check that it matches this mutable resource
|
||||
var r SignedResourceUpdate
|
||||
if err := r.fromChunk(chunk.Address(), chunk.Data()); err != nil {
|
||||
return nil, err
|
||||
updateAddr := request.Addr()
|
||||
log.Trace("resource cache update", "topic", request.Topic.Hex(), "updatekey", updateAddr, "epoch time", request.Epoch.Time, "epoch level", request.Epoch.Level)
|
||||
|
||||
rsrc := h.get(&request.View)
|
||||
if rsrc == nil {
|
||||
rsrc = &cacheEntry{}
|
||||
h.set(&request.View, rsrc)
|
||||
}
|
||||
log.Trace("resource index update", "name", rsrc.ResourceMetadata.Name, "updatekey", chunk.Address(), "period", r.period, "version", r.version)
|
||||
|
||||
// update our rsrcs entry map
|
||||
rsrc.lastKey = chunk.Address()
|
||||
rsrc.period = r.period
|
||||
rsrc.version = r.version
|
||||
rsrc.updated = time.Now()
|
||||
rsrc.data = make([]byte, len(r.data))
|
||||
rsrc.multihash = r.multihash
|
||||
copy(rsrc.data, r.data)
|
||||
rsrc.lastKey = updateAddr
|
||||
rsrc.ResourceUpdate = request.ResourceUpdate
|
||||
rsrc.Reader = bytes.NewReader(rsrc.data)
|
||||
log.Debug("resource synced", "name", rsrc.ResourceMetadata.Name, "updateAddr", chunk.Address(), "period", rsrc.period, "version", rsrc.version)
|
||||
h.set(chunk.Address(), rsrc)
|
||||
return rsrc, nil
|
||||
}
|
||||
|
||||
@@ -442,23 +248,16 @@ func (h *Handler) updateIndex(rsrc *resource, chunk storage.Chunk) (*resource, e
|
||||
// Note that a Mutable Resource update cannot span chunks, and thus has a MAX NET LENGTH 4096, INCLUDING update header data and signature. An error will be returned if the total length of the chunk payload will exceed this limit.
|
||||
// Update can only check if the caller is trying to overwrite the very last known version, otherwise it just puts the update
|
||||
// on the network.
|
||||
func (h *Handler) Update(ctx context.Context, r *SignedResourceUpdate) (storage.Address, error) {
|
||||
return h.update(ctx, r)
|
||||
}
|
||||
|
||||
// create and commit an update
|
||||
func (h *Handler) update(ctx context.Context, r *SignedResourceUpdate) (updateAddr storage.Address, err error) {
|
||||
func (h *Handler) Update(ctx context.Context, r *Request) (updateAddr storage.Address, err error) {
|
||||
|
||||
// we can't update anything without a store
|
||||
if h.chunkStore == nil {
|
||||
return nil, NewError(ErrInit, "Call Handler.SetStore() before updating")
|
||||
}
|
||||
|
||||
rsrc := h.get(r.rootAddr)
|
||||
if rsrc != nil && rsrc.period != 0 && rsrc.version != 0 && // This is the only cheap check we can do for sure
|
||||
rsrc.period == r.period && rsrc.version >= r.version { // without having to lookup update chunks
|
||||
|
||||
return nil, NewError(ErrInvalidValue, "A former update in this period is already known to exist")
|
||||
rsrc := h.get(&r.View)
|
||||
if rsrc != nil && rsrc.Epoch.Equals(r.Epoch) { // This is the only cheap check we can do for sure
|
||||
return nil, NewError(ErrInvalidValue, "A former update in this epoch is already known to exist")
|
||||
}
|
||||
|
||||
chunk, err := r.toChunk() // Serialize the update into a chunk. Fails if data is too big
|
||||
@@ -468,49 +267,32 @@ func (h *Handler) update(ctx context.Context, r *SignedResourceUpdate) (updateAd
|
||||
|
||||
// send the chunk
|
||||
h.chunkStore.Put(ctx, chunk)
|
||||
log.Trace("resource update", "updateAddr", r.updateAddr, "lastperiod", r.period, "version", r.version, "data", chunk.Data(), "multihash", r.multihash)
|
||||
|
||||
// update our resources map entry if the new update is older than the one we have, if we have it.
|
||||
if rsrc != nil && (r.period > rsrc.period || (rsrc.period == r.period && r.version > rsrc.version)) {
|
||||
rsrc.period = r.period
|
||||
rsrc.version = r.version
|
||||
log.Trace("resource update", "updateAddr", r.idAddr, "epoch time", r.Epoch.Time, "epoch level", r.Epoch.Level, "data", chunk.Data())
|
||||
// update our resources map cache entry if the new update is older than the one we have, if we have it.
|
||||
if rsrc != nil && r.Epoch.After(rsrc.Epoch) {
|
||||
rsrc.Epoch = r.Epoch
|
||||
rsrc.data = make([]byte, len(r.data))
|
||||
rsrc.updated = time.Now()
|
||||
rsrc.lastKey = r.updateAddr
|
||||
rsrc.multihash = r.multihash
|
||||
rsrc.lastKey = r.idAddr
|
||||
copy(rsrc.data, r.data)
|
||||
rsrc.Reader = bytes.NewReader(rsrc.data)
|
||||
}
|
||||
return r.updateAddr, nil
|
||||
|
||||
return r.idAddr, nil
|
||||
}
|
||||
|
||||
// Retrieves the resource index value for the given nameHash
|
||||
func (h *Handler) get(rootAddr storage.Address) *resource {
|
||||
if len(rootAddr) < storage.AddressLength {
|
||||
log.Warn("Handler.get with invalid rootAddr")
|
||||
return nil
|
||||
}
|
||||
hashKey := *(*uint64)(unsafe.Pointer(&rootAddr[0]))
|
||||
// Retrieves the resource cache value for the given nameHash
|
||||
func (h *Handler) get(view *View) *cacheEntry {
|
||||
mapKey := view.mapKey()
|
||||
h.resourceLock.RLock()
|
||||
defer h.resourceLock.RUnlock()
|
||||
rsrc := h.resources[hashKey]
|
||||
rsrc := h.resources[mapKey]
|
||||
return rsrc
|
||||
}
|
||||
|
||||
// Sets the resource index value for the given nameHash
|
||||
func (h *Handler) set(rootAddr storage.Address, rsrc *resource) {
|
||||
if len(rootAddr) < storage.AddressLength {
|
||||
log.Warn("Handler.set with invalid rootAddr")
|
||||
return
|
||||
}
|
||||
hashKey := *(*uint64)(unsafe.Pointer(&rootAddr[0]))
|
||||
// Sets the resource cache value for the given View
|
||||
func (h *Handler) set(view *View, rsrc *cacheEntry) {
|
||||
mapKey := view.mapKey()
|
||||
h.resourceLock.Lock()
|
||||
defer h.resourceLock.Unlock()
|
||||
h.resources[hashKey] = rsrc
|
||||
}
|
||||
|
||||
// Checks if we already have an update on this resource, according to the value in the current state of the resource index
|
||||
func (h *Handler) hasUpdate(rootAddr storage.Address, period uint32) bool {
|
||||
rsrc := h.get(rootAddr)
|
||||
return rsrc != nil && rsrc.period == period
|
||||
h.resources[mapKey] = rsrc
|
||||
}
|
||||
|
Reference in New Issue
Block a user