194 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			194 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| 
								 | 
							
								// Copyright 2019 The go-ethereum Authors
							 | 
						||
| 
								 | 
							
								// This file is part of the go-ethereum library.
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// The go-ethereum library is free software: you can redistribute it and/or modify
							 | 
						||
| 
								 | 
							
								// it under the terms of the GNU Lesser General Public License as published by
							 | 
						||
| 
								 | 
							
								// the Free Software Foundation, either version 3 of the License, or
							 | 
						||
| 
								 | 
							
								// (at your option) any later version.
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// The go-ethereum library is distributed in the hope that it will be useful,
							 | 
						||
| 
								 | 
							
								// but WITHOUT ANY WARRANTY; without even the implied warranty of
							 | 
						||
| 
								 | 
							
								// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
							 | 
						||
| 
								 | 
							
								// GNU Lesser General Public License for more details.
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// You should have received a copy of the GNU Lesser General Public License
							 | 
						||
| 
								 | 
							
								// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								package localstore
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import (
							 | 
						||
| 
								 | 
							
									"bytes"
							 | 
						||
| 
								 | 
							
									"context"
							 | 
						||
| 
								 | 
							
									"errors"
							 | 
						||
| 
								 | 
							
									"fmt"
							 | 
						||
| 
								 | 
							
									"sync"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									"github.com/ethereum/go-ethereum/log"
							 | 
						||
| 
								 | 
							
									"github.com/ethereum/go-ethereum/swarm/shed"
							 | 
						||
| 
								 | 
							
									"github.com/ethereum/go-ethereum/swarm/storage"
							 | 
						||
| 
								 | 
							
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index.
							 | 
						||
| 
								 | 
							
								// Pull syncing index can be only subscribed to a particular proximity order bin. If since
							 | 
						||
| 
								 | 
							
								// is not nil, the iteration will start from the first item stored after that timestamp. If until is not nil,
							 | 
						||
| 
								 | 
							
								// only chunks stored up to this timestamp will be send to the channel, and the returned channel will be
							 | 
						||
| 
								 | 
							
								// closed. The since-until interval is open on the left and closed on the right (since,until]. Returned stop
							 | 
						||
| 
								 | 
							
								// function will terminate current and further iterations without errors, and also close the returned channel.
							 | 
						||
| 
								 | 
							
								// Make sure that you check the second returned parameter from the channel to stop iteration when its value
							 | 
						||
| 
								 | 
							
								// is false.
							 | 
						||
| 
								 | 
							
								func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkDescriptor) (c <-chan ChunkDescriptor, stop func()) {
							 | 
						||
| 
								 | 
							
									chunkDescriptors := make(chan ChunkDescriptor)
							 | 
						||
| 
								 | 
							
									trigger := make(chan struct{}, 1)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									db.pullTriggersMu.Lock()
							 | 
						||
| 
								 | 
							
									if _, ok := db.pullTriggers[bin]; !ok {
							 | 
						||
| 
								 | 
							
										db.pullTriggers[bin] = make([]chan struct{}, 0)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									db.pullTriggers[bin] = append(db.pullTriggers[bin], trigger)
							 | 
						||
| 
								 | 
							
									db.pullTriggersMu.Unlock()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// send signal for the initial iteration
							 | 
						||
| 
								 | 
							
									trigger <- struct{}{}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									stopChan := make(chan struct{})
							 | 
						||
| 
								 | 
							
									var stopChanOnce sync.Once
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// used to provide information from the iterator to
							 | 
						||
| 
								 | 
							
									// stop subscription when until chunk descriptor is reached
							 | 
						||
| 
								 | 
							
									var errStopSubscription = errors.New("stop subscription")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									go func() {
							 | 
						||
| 
								 | 
							
										// close the returned ChunkDescriptor channel at the end to
							 | 
						||
| 
								 | 
							
										// signal that the subscription is done
							 | 
						||
| 
								 | 
							
										defer close(chunkDescriptors)
							 | 
						||
| 
								 | 
							
										// sinceItem is the Item from which the next iteration
							 | 
						||
| 
								 | 
							
										// should start. The first iteration starts from the first Item.
							 | 
						||
| 
								 | 
							
										var sinceItem *shed.Item
							 | 
						||
| 
								 | 
							
										if since != nil {
							 | 
						||
| 
								 | 
							
											sinceItem = &shed.Item{
							 | 
						||
| 
								 | 
							
												Address:        since.Address,
							 | 
						||
| 
								 | 
							
												StoreTimestamp: since.StoreTimestamp,
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										for {
							 | 
						||
| 
								 | 
							
											select {
							 | 
						||
| 
								 | 
							
											case <-trigger:
							 | 
						||
| 
								 | 
							
												// iterate until:
							 | 
						||
| 
								 | 
							
												// - last index Item is reached
							 | 
						||
| 
								 | 
							
												// - subscription stop is called
							 | 
						||
| 
								 | 
							
												// - context is done
							 | 
						||
| 
								 | 
							
												err := db.pullIndex.Iterate(func(item shed.Item) (stop bool, err error) {
							 | 
						||
| 
								 | 
							
													select {
							 | 
						||
| 
								 | 
							
													case chunkDescriptors <- ChunkDescriptor{
							 | 
						||
| 
								 | 
							
														Address:        item.Address,
							 | 
						||
| 
								 | 
							
														StoreTimestamp: item.StoreTimestamp,
							 | 
						||
| 
								 | 
							
													}:
							 | 
						||
| 
								 | 
							
														// until chunk descriptor is sent
							 | 
						||
| 
								 | 
							
														// break the iteration
							 | 
						||
| 
								 | 
							
														if until != nil &&
							 | 
						||
| 
								 | 
							
															(item.StoreTimestamp >= until.StoreTimestamp ||
							 | 
						||
| 
								 | 
							
																bytes.Equal(item.Address, until.Address)) {
							 | 
						||
| 
								 | 
							
															return true, errStopSubscription
							 | 
						||
| 
								 | 
							
														}
							 | 
						||
| 
								 | 
							
														// set next iteration start item
							 | 
						||
| 
								 | 
							
														// when its chunk is successfully sent to channel
							 | 
						||
| 
								 | 
							
														sinceItem = &item
							 | 
						||
| 
								 | 
							
														return false, nil
							 | 
						||
| 
								 | 
							
													case <-stopChan:
							 | 
						||
| 
								 | 
							
														// gracefully stop the iteration
							 | 
						||
| 
								 | 
							
														// on stop
							 | 
						||
| 
								 | 
							
														return true, nil
							 | 
						||
| 
								 | 
							
													case <-db.close:
							 | 
						||
| 
								 | 
							
														// gracefully stop the iteration
							 | 
						||
| 
								 | 
							
														// on database close
							 | 
						||
| 
								 | 
							
														return true, nil
							 | 
						||
| 
								 | 
							
													case <-ctx.Done():
							 | 
						||
| 
								 | 
							
														return true, ctx.Err()
							 | 
						||
| 
								 | 
							
													}
							 | 
						||
| 
								 | 
							
												}, &shed.IterateOptions{
							 | 
						||
| 
								 | 
							
													StartFrom: sinceItem,
							 | 
						||
| 
								 | 
							
													// sinceItem was sent as the last Address in the previous
							 | 
						||
| 
								 | 
							
													// iterator call, skip it in this one
							 | 
						||
| 
								 | 
							
													SkipStartFromItem: true,
							 | 
						||
| 
								 | 
							
													Prefix:            []byte{bin},
							 | 
						||
| 
								 | 
							
												})
							 | 
						||
| 
								 | 
							
												if err != nil {
							 | 
						||
| 
								 | 
							
													if err == errStopSubscription {
							 | 
						||
| 
								 | 
							
														// stop subscription without any errors
							 | 
						||
| 
								 | 
							
														// if until is reached
							 | 
						||
| 
								 | 
							
														return
							 | 
						||
| 
								 | 
							
													}
							 | 
						||
| 
								 | 
							
													log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err)
							 | 
						||
| 
								 | 
							
													return
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
											case <-stopChan:
							 | 
						||
| 
								 | 
							
												// terminate the subscription
							 | 
						||
| 
								 | 
							
												// on stop
							 | 
						||
| 
								 | 
							
												return
							 | 
						||
| 
								 | 
							
											case <-db.close:
							 | 
						||
| 
								 | 
							
												// terminate the subscription
							 | 
						||
| 
								 | 
							
												// on database close
							 | 
						||
| 
								 | 
							
												return
							 | 
						||
| 
								 | 
							
											case <-ctx.Done():
							 | 
						||
| 
								 | 
							
												err := ctx.Err()
							 | 
						||
| 
								 | 
							
												if err != nil {
							 | 
						||
| 
								 | 
							
													log.Error("localstore pull subscription", "bin", bin, "since", since, "until", until, "err", err)
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
												return
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									stop = func() {
							 | 
						||
| 
								 | 
							
										stopChanOnce.Do(func() {
							 | 
						||
| 
								 | 
							
											close(stopChan)
							 | 
						||
| 
								 | 
							
										})
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										db.pullTriggersMu.Lock()
							 | 
						||
| 
								 | 
							
										defer db.pullTriggersMu.Unlock()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										for i, t := range db.pullTriggers[bin] {
							 | 
						||
| 
								 | 
							
											if t == trigger {
							 | 
						||
| 
								 | 
							
												db.pullTriggers[bin] = append(db.pullTriggers[bin][:i], db.pullTriggers[bin][i+1:]...)
							 | 
						||
| 
								 | 
							
												break
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									return chunkDescriptors, stop
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// ChunkDescriptor holds information required for Pull syncing. This struct
							 | 
						||
| 
								 | 
							
								// is provided by subscribing to pull index.
							 | 
						||
| 
								 | 
							
								type ChunkDescriptor struct {
							 | 
						||
| 
								 | 
							
									Address        storage.Address
							 | 
						||
| 
								 | 
							
									StoreTimestamp int64
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (c *ChunkDescriptor) String() string {
							 | 
						||
| 
								 | 
							
									if c == nil {
							 | 
						||
| 
								 | 
							
										return "none"
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									return fmt.Sprintf("%s stored at %v", c.Address.Hex(), c.StoreTimestamp)
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// triggerPullSubscriptions is used internally for starting iterations
							 | 
						||
| 
								 | 
							
								// on Pull subscriptions for a particular bin. When new item with address
							 | 
						||
| 
								 | 
							
								// that is in particular bin for DB's baseKey is added to pull index
							 | 
						||
| 
								 | 
							
								// this function should be called.
							 | 
						||
| 
								 | 
							
								func (db *DB) triggerPullSubscriptions(bin uint8) {
							 | 
						||
| 
								 | 
							
									db.pullTriggersMu.RLock()
							 | 
						||
| 
								 | 
							
									triggers, ok := db.pullTriggers[bin]
							 | 
						||
| 
								 | 
							
									db.pullTriggersMu.RUnlock()
							 | 
						||
| 
								 | 
							
									if !ok {
							 | 
						||
| 
								 | 
							
										return
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									for _, t := range triggers {
							 | 
						||
| 
								 | 
							
										select {
							 | 
						||
| 
								 | 
							
										case t <- struct{}{}:
							 | 
						||
| 
								 | 
							
										default:
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 |