Merge pull request #1 from stefanbenten/storj-storage-adapter
Implement Storj storage adapter
This commit is contained in:
38
cmd/cmd.go
38
cmd/cmd.go
@ -149,6 +149,30 @@ var globalFlags = []cli.Flag{
|
||||
Usage: "",
|
||||
Value: googleapi.DefaultUploadChunkSize / 1024 / 1024,
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "storj-endpoint",
|
||||
Usage: "Satellite Address including Port.",
|
||||
Value: "",
|
||||
EnvVar: "STORJ_ENDPOINT",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "storj-apikey",
|
||||
Usage: "",
|
||||
Value: "",
|
||||
EnvVar: "STORJ_API_KEY",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "storj-bucket",
|
||||
Usage: "",
|
||||
Value: "",
|
||||
EnvVar: "STORJ_BUCKET",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "storj-enckey",
|
||||
Usage: "Encryption Key for local file encryption",
|
||||
Value: "",
|
||||
EnvVar: "STORJ_ENC_KEY",
|
||||
},
|
||||
cli.IntFlag{
|
||||
Name: "rate-limit",
|
||||
Usage: "requests per minute",
|
||||
@ -362,6 +386,20 @@ func New() *Cmd {
|
||||
} else {
|
||||
options = append(options, server.UseStorage(storage))
|
||||
}
|
||||
case "storj":
|
||||
if endpoint := c.String("storj-endpoint"); endpoint == "" {
|
||||
panic("storj-endpoint not set.")
|
||||
} else if apiKey := c.String("storj-apikey"); apiKey == "" {
|
||||
panic("storj-apikey not set.")
|
||||
} else if bucket := c.String("storj-bucket"); bucket == "" {
|
||||
panic("storj-enckey not set.")
|
||||
} else if encKey := c.String("storj-enckey"); encKey == "" {
|
||||
panic("storj-bucket not set.")
|
||||
} else if storage, err := server.NewStorjStorage(endpoint, apiKey, bucket, encKey, logger); err != nil {
|
||||
panic(err)
|
||||
} else {
|
||||
options = append(options, server.UseStorage(storage))
|
||||
}
|
||||
case "local":
|
||||
if v := c.String("basedir"); v == "" {
|
||||
panic("basedir not set.")
|
||||
|
2
go.mod
2
go.mod
@ -15,7 +15,6 @@ require (
|
||||
github.com/golang/gddo v0.0.0-20190815223733-287de01127ef
|
||||
github.com/gorilla/mux v1.7.3
|
||||
github.com/gorilla/securecookie v1.1.1 // indirect
|
||||
github.com/kr/pretty v0.1.0 // indirect
|
||||
github.com/mattn/go-colorable v0.1.2 // indirect
|
||||
github.com/mattn/go-isatty v0.0.9 // indirect
|
||||
github.com/microcosm-cc/bluemonday v1.0.2
|
||||
@ -31,6 +30,7 @@ require (
|
||||
google.golang.org/api v0.9.0
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127
|
||||
gopkg.in/russross/blackfriday.v2 v2.0.1
|
||||
storj.io/storj v0.19.0
|
||||
)
|
||||
|
||||
replace gopkg.in/russross/blackfriday.v2 v2.0.1 => github.com/russross/blackfriday/v2 v2.0.1
|
||||
|
@ -17,11 +17,14 @@ import (
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||
"github.com/zeebo/errs"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/oauth2"
|
||||
"golang.org/x/oauth2/google"
|
||||
"google.golang.org/api/drive/v3"
|
||||
"google.golang.org/api/googleapi"
|
||||
"storj.io/storj/lib/uplink"
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
type Storage interface {
|
||||
@ -561,3 +564,113 @@ func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
|
||||
|
||||
json.NewEncoder(f).Encode(token)
|
||||
}
|
||||
|
||||
var uplinkFailure = errs.Class("uplink failure")
|
||||
|
||||
type StorjStorage struct {
|
||||
Storage
|
||||
uplink *uplink.Uplink
|
||||
project *uplink.Project
|
||||
bucket *uplink.Bucket
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
func NewStorjStorage(endpoint, apiKey, bucket, encKey string, logger *log.Logger) (*StorjStorage, error) {
|
||||
var instance StorjStorage
|
||||
var err error
|
||||
|
||||
ctx := context.TODO()
|
||||
|
||||
instance.uplink, err = uplink.NewUplink(ctx, nil)
|
||||
if err != nil {
|
||||
return nil, uplinkFailure.New("could not create new Uplink Instance: %v", err)
|
||||
}
|
||||
|
||||
key, err := uplink.ParseAPIKey(apiKey)
|
||||
if err != nil {
|
||||
return nil, uplinkFailure.New("could not parse api key: %v", err)
|
||||
}
|
||||
|
||||
instance.project, err = instance.uplink.OpenProject(ctx, endpoint, key)
|
||||
if err != nil {
|
||||
return nil, uplinkFailure.New("could not open project: %v", err)
|
||||
}
|
||||
|
||||
saltenckey, err := instance.project.SaltedKeyFromPassphrase(ctx, encKey)
|
||||
if err != nil {
|
||||
return nil, uplinkFailure.New("could not generate salted enc key: %v", err)
|
||||
}
|
||||
|
||||
access := uplink.NewEncryptionAccessWithDefaultKey(*saltenckey)
|
||||
|
||||
instance.bucket, err = instance.project.OpenBucket(ctx, bucket, access)
|
||||
if err != nil {
|
||||
return nil, uplinkFailure.New("could not open bucket %q: %v", bucket, err)
|
||||
}
|
||||
|
||||
instance.logger = logger
|
||||
|
||||
return &instance, nil
|
||||
}
|
||||
|
||||
func (s *StorjStorage) Type() string {
|
||||
return "storj"
|
||||
}
|
||||
|
||||
func (s *StorjStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
|
||||
key := storj.JoinPaths(token, filename)
|
||||
|
||||
ctx := context.TODO()
|
||||
|
||||
obj, err := s.bucket.OpenObject(ctx, key)
|
||||
if err != nil {
|
||||
return "", 0, fmt.Errorf("unable to open object %v", err)
|
||||
}
|
||||
contentType = obj.Meta.ContentType
|
||||
contentLength = uint64(obj.Meta.Size)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *StorjStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
|
||||
key := storj.JoinPaths(token, filename)
|
||||
|
||||
s.logger.Printf("Getting file %s from Storj Bucket", filename)
|
||||
|
||||
ctx := context.TODO()
|
||||
|
||||
obj, err := s.bucket.OpenObject(ctx, key)
|
||||
if err != nil {
|
||||
return nil, "", 0, uplinkFailure.New("unable to open object %v", err)
|
||||
}
|
||||
contentType = obj.Meta.ContentType
|
||||
contentLength = uint64(obj.Meta.Size)
|
||||
reader, err = obj.DownloadRange(ctx, 0, -1)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *StorjStorage) Delete(token string, filename string) (err error) {
|
||||
key := storj.JoinPaths(token, filename)
|
||||
|
||||
s.logger.Printf("Deleting file %s from Storj Bucket", filename)
|
||||
|
||||
ctx := context.TODO()
|
||||
|
||||
err = s.bucket.DeleteObject(ctx, key)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *StorjStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
|
||||
key := storj.JoinPaths(token, filename)
|
||||
|
||||
s.logger.Printf("Uploading file %s to Storj Bucket", filename)
|
||||
|
||||
ctx := context.TODO()
|
||||
|
||||
err = s.bucket.UploadObject(ctx, key, reader, &uplink.UploadOptions{ContentType: contentType})
|
||||
if err != nil {
|
||||
return uplinkFailure.New("could not upload: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user