server: propagate context to storage layer (#455)

This change propagates the context of the actual user request through.
Additionally it configures the Storj Backend to do in memory buffering instead of relying on temporary files in /tmp/ folders.
Fixes #448
This commit is contained in:
Stefan Benten
2021-12-26 17:17:28 +01:00
committed by GitHub
parent f062af9fc5
commit 492731e31f
3 changed files with 86 additions and 97 deletions

View File

@ -1,19 +1,10 @@
package server
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/drive/v3"
"google.golang.org/api/googleapi"
"io"
"io/ioutil"
"log"
@ -23,6 +14,17 @@ import (
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/drive/v3"
"google.golang.org/api/googleapi"
"storj.io/common/fpath"
"storj.io/common/storj"
"storj.io/uplink"
)
@ -30,17 +32,17 @@ import (
// Storage is the interface for storage operation
type Storage interface {
// Get retrieves a file from storage
Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error)
Get(ctx context.Context, token string, filename string) (reader io.ReadCloser, contentLength uint64, err error)
// Head retrieves content length of a file from storage
Head(token string, filename string) (contentLength uint64, err error)
Head(ctx context.Context, token string, filename string) (contentLength uint64, err error)
// Put saves a file on storage
Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
// Delete removes a file from storage
Delete(token string, filename string) error
Delete(ctx context.Context, token string, filename string) error
// IsNotExist indicates if a file doesn't exist on storage
IsNotExist(err error) bool
// Purge cleans up the storage
Purge(days time.Duration) error
Purge(ctx context.Context, days time.Duration) error
// Type returns the storage type
Type() string
@ -64,7 +66,7 @@ func (s *LocalStorage) Type() string {
}
// Head retrieves content length of a file from storage
func (s *LocalStorage) Head(token string, filename string) (contentLength uint64, err error) {
func (s *LocalStorage) Head(ctx context.Context, token string, filename string) (contentLength uint64, err error) {
path := filepath.Join(s.basedir, token, filename)
var fi os.FileInfo
@ -78,7 +80,7 @@ func (s *LocalStorage) Head(token string, filename string) (contentLength uint64
}
// Get retrieves a file from storage
func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
func (s *LocalStorage) Get(ctx context.Context, token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
path := filepath.Join(s.basedir, token, filename)
// content type , content length
@ -97,7 +99,7 @@ func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser,
}
// Delete removes a file from storage
func (s *LocalStorage) Delete(token string, filename string) (err error) {
func (s *LocalStorage) Delete(ctx context.Context, token string, filename string) (err error) {
metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
_ = os.Remove(metadata)
@ -107,7 +109,7 @@ func (s *LocalStorage) Delete(token string, filename string) (err error) {
}
// Purge cleans up the storage
func (s *LocalStorage) Purge(days time.Duration) (err error) {
func (s *LocalStorage) Purge(ctx context.Context, days time.Duration) (err error) {
err = filepath.Walk(s.basedir,
func(path string, info os.FileInfo, err error) error {
if err != nil {
@ -138,7 +140,7 @@ func (s *LocalStorage) IsNotExist(err error) bool {
}
// Put saves a file on storage
func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
func (s *LocalStorage) Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
var f io.WriteCloser
var err error
@ -193,7 +195,7 @@ func (s *S3Storage) Type() string {
}
// Head retrieves content length of a file from storage
func (s *S3Storage) Head(token string, filename string) (contentLength uint64, err error) {
func (s *S3Storage) Head(ctx context.Context, token string, filename string) (contentLength uint64, err error) {
key := fmt.Sprintf("%s/%s", token, filename)
headRequest := &s3.HeadObjectInput{
@ -202,7 +204,7 @@ func (s *S3Storage) Head(token string, filename string) (contentLength uint64, e
}
// content type , content length
response, err := s.s3.HeadObject(headRequest)
response, err := s.s3.HeadObjectWithContext(ctx, headRequest)
if err != nil {
return
}
@ -215,7 +217,7 @@ func (s *S3Storage) Head(token string, filename string) (contentLength uint64, e
}
// Purge cleans up the storage
func (s *S3Storage) Purge(days time.Duration) (err error) {
func (s *S3Storage) Purge(ctx context.Context, days time.Duration) (err error) {
// NOOP expiration is set at upload time
return nil
}
@ -237,7 +239,7 @@ func (s *S3Storage) IsNotExist(err error) bool {
}
// Get retrieves a file from storage
func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
func (s *S3Storage) Get(ctx context.Context, token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
key := fmt.Sprintf("%s/%s", token, filename)
getRequest := &s3.GetObjectInput{
@ -245,7 +247,7 @@ func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, co
Key: aws.String(key),
}
response, err := s.s3.GetObject(getRequest)
response, err := s.s3.GetObjectWithContext(ctx, getRequest)
if err != nil {
return
}
@ -259,14 +261,14 @@ func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, co
}
// Delete removes a file from storage
func (s *S3Storage) Delete(token string, filename string) (err error) {
func (s *S3Storage) Delete(ctx context.Context, token string, filename string) (err error) {
metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
deleteRequest := &s3.DeleteObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(metadata),
}
_, err = s.s3.DeleteObject(deleteRequest)
_, err = s.s3.DeleteObjectWithContext(ctx, deleteRequest)
if err != nil {
return
}
@ -277,13 +279,13 @@ func (s *S3Storage) Delete(token string, filename string) (err error) {
Key: aws.String(key),
}
_, err = s.s3.DeleteObject(deleteRequest)
_, err = s.s3.DeleteObjectWithContext(ctx, deleteRequest)
return
}
// Put saves a file on storage
func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
func (s *S3Storage) Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
key := fmt.Sprintf("%s/%s", token, filename)
s.logger.Printf("Uploading file %s to S3 Bucket", filename)
@ -305,7 +307,7 @@ func (s *S3Storage) Put(token string, filename string, reader io.Reader, content
expire = aws.Time(time.Now().Add(s.purgeDays))
}
_, err = uploader.Upload(&s3manager.UploadInput{
_, err = uploader.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
Body: reader,
@ -339,7 +341,7 @@ func NewGDriveStorage(clientJSONFilepath string, localConfigPath string, basedir
}
// ToDo: Upgrade deprecated version
srv, err := drive.New(getGDriveClient(config, localConfigPath, logger)) // nolint: staticcheck
srv, err := drive.New(getGDriveClient(context.TODO(), config, localConfigPath, logger)) // nolint: staticcheck
if err != nil {
return nil, err
}
@ -468,7 +470,7 @@ func (s *GDrive) Type() string {
}
// Head retrieves content length of a file from storage
func (s *GDrive) Head(token string, filename string) (contentLength uint64, err error) {
func (s *GDrive) Head(ctx context.Context, token string, filename string) (contentLength uint64, err error) {
var fileID string
fileID, err = s.findID(filename, token)
if err != nil {
@ -486,7 +488,7 @@ func (s *GDrive) Head(token string, filename string) (contentLength uint64, err
}
// Get retrieves a file from storage
func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
func (s *GDrive) Get(ctx context.Context, token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
var fileID string
fileID, err = s.findID(filename, token)
if err != nil {
@ -505,7 +507,6 @@ func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, conte
contentLength = uint64(fi.Size)
ctx := context.Background()
var res *http.Response
res, err = s.service.Files.Get(fileID).Context(ctx).Download()
if err != nil {
@ -518,7 +519,7 @@ func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, conte
}
// Delete removes a file from storage
func (s *GDrive) Delete(token string, filename string) (err error) {
func (s *GDrive) Delete(ctx context.Context, token string, filename string) (err error) {
metadata, _ := s.findID(fmt.Sprintf("%s.metadata", filename), token)
_ = s.service.Files.Delete(metadata).Do()
@ -533,7 +534,7 @@ func (s *GDrive) Delete(token string, filename string) (err error) {
}
// Purge cleans up the storage
func (s *GDrive) Purge(days time.Duration) (err error) {
func (s *GDrive) Purge(ctx context.Context, days time.Duration) (err error) {
nextPageToken := ""
expirationDate := time.Now().Add(-1 * days).Format(time.RFC3339)
@ -578,7 +579,7 @@ func (s *GDrive) IsNotExist(err error) bool {
}
// Put saves a file on storage
func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
func (s *GDrive) Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
dirID, err := s.findID("", token)
if err != nil {
return err
@ -607,7 +608,6 @@ func (s *GDrive) Put(token string, filename string, reader io.Reader, contentTyp
MimeType: contentType,
}
ctx := context.Background()
_, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
if err != nil {
@ -618,19 +618,19 @@ func (s *GDrive) Put(token string, filename string, reader io.Reader, contentTyp
}
// Retrieve a token, saves the token, then returns the generated client.
func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
func getGDriveClient(ctx context.Context, config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
tokenFile := filepath.Join(localConfigPath, gdriveTokenJSONFile)
tok, err := gDriveTokenFromFile(tokenFile)
if err != nil {
tok = getGDriveTokenFromWeb(config, logger)
tok = getGDriveTokenFromWeb(ctx, config, logger)
saveGDriveToken(tokenFile, tok, logger)
}
return config.Client(context.Background(), tok)
return config.Client(ctx, tok)
}
// Request a token from the web, then returns the retrieved token.
func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token {
func getGDriveTokenFromWeb(ctx context.Context, config *oauth2.Config, logger *log.Logger) *oauth2.Token {
authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
fmt.Printf("Go to the following link in your browser then type the "+
"authorization code: \n%v\n", authURL)
@ -640,7 +640,7 @@ func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.To
logger.Fatalf("Unable to read authorization code %v", err)
}
tok, err := config.Exchange(context.TODO(), authCode)
tok, err := config.Exchange(ctx, authCode)
if err != nil {
logger.Fatalf("Unable to retrieve token from web %v", err)
}
@ -688,7 +688,9 @@ func NewStorjStorage(access, bucket string, purgeDays int, logger *log.Logger) (
var instance StorjStorage
var err error
ctx := context.TODO()
pCtx := context.TODO()
ctx := fpath.WithTempData(pCtx, "", true)
parsedAccess, err := uplink.ParseAccess(access)
if err != nil {
@ -720,12 +722,10 @@ func (s *StorjStorage) Type() string {
}
// Head retrieves content length of a file from storage
func (s *StorjStorage) Head(token string, filename string) (contentLength uint64, err error) {
func (s *StorjStorage) Head(ctx context.Context, token string, filename string) (contentLength uint64, err error) {
key := storj.JoinPaths(token, filename)
ctx := context.TODO()
obj, err := s.project.StatObject(ctx, s.bucket.Name, key)
obj, err := s.project.StatObject(fpath.WithTempData(ctx, "", true), s.bucket.Name, key)
if err != nil {
return 0, err
}
@ -736,14 +736,12 @@ func (s *StorjStorage) Head(token string, filename string) (contentLength uint64
}
// Get retrieves a file from storage
func (s *StorjStorage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
func (s *StorjStorage) Get(ctx context.Context, token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
key := storj.JoinPaths(token, filename)
s.logger.Printf("Getting file %s from Storj Bucket", filename)
ctx := context.TODO()
download, err := s.project.DownloadObject(ctx, s.bucket.Name, key, nil)
download, err := s.project.DownloadObject(fpath.WithTempData(ctx, "", true), s.bucket.Name, key, nil)
if err != nil {
return nil, 0, err
}
@ -755,38 +753,34 @@ func (s *StorjStorage) Get(token string, filename string) (reader io.ReadCloser,
}
// Delete removes a file from storage
func (s *StorjStorage) Delete(token string, filename string) (err error) {
func (s *StorjStorage) Delete(ctx context.Context, token string, filename string) (err error) {
key := storj.JoinPaths(token, filename)
s.logger.Printf("Deleting file %s from Storj Bucket", filename)
ctx := context.TODO()
_, err = s.project.DeleteObject(ctx, s.bucket.Name, key)
_, err = s.project.DeleteObject(fpath.WithTempData(ctx, "", true), s.bucket.Name, key)
return
}
// Purge cleans up the storage
func (s *StorjStorage) Purge(days time.Duration) (err error) {
func (s *StorjStorage) Purge(ctx context.Context, days time.Duration) (err error) {
// NOOP expiration is set at upload time
return nil
}
// Put saves a file on storage
func (s *StorjStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
func (s *StorjStorage) Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
key := storj.JoinPaths(token, filename)
s.logger.Printf("Uploading file %s to Storj Bucket", filename)
ctx := context.TODO()
var uploadOptions *uplink.UploadOptions
if s.purgeDays.Hours() > 0 {
uploadOptions = &uplink.UploadOptions{Expires: time.Now().Add(s.purgeDays)}
}
writer, err := s.project.UploadObject(ctx, s.bucket.Name, key, uploadOptions)
writer, err := s.project.UploadObject(fpath.WithTempData(ctx, "", true), s.bucket.Name, key, uploadOptions)
if err != nil {
return err
}