ISSUE-242

Andrea Spacca
2019-06-23 09:11:54 +02:00
parent 14c48cb4d8
commit 703987c516
4 changed files with 94 additions and 196 deletions


@@ -1,9 +1,12 @@
package server
import (
"bytes"
"encoding/json"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"io"
"io/ioutil"
"log"
@@ -11,11 +14,9 @@ import (
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"github.com/goamz/goamz/s3"
"github.com/aws/aws-sdk-go/service/s3"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
@@ -124,18 +125,17 @@ func (s *LocalStorage) Put(token string, filename string, reader io.Reader, cont
type S3Storage struct {
Storage
bucket string
session *session.Session
s3 *s3.S3
logger *log.Logger
noMultipart bool
}
func NewS3Storage(accessKey, secretKey, bucketName, endpoint string, logger *log.Logger, disableMultipart bool) (*S3Storage, error) {
sess := getAwsSession(accessKey, secretKey, endpoint)
return &S3Storage{bucket: bucketName, s3: s3.New(sess), session: sess, logger: logger, noMultipart: disableMultipart}, nil
}
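
NewS3Storage now delegates session construction to getAwsSession, which is not shown in this diff. A minimal sketch of such a helper, assuming static credentials via the aws/credentials package; the region is a placeholder, not taken from this commit:

// Hypothetical sketch of the getAwsSession helper referenced above; it is
// not part of this diff, so credential handling and region are assumptions.
func getAwsSession(accessKey, secretKey, endpoint string) *session.Session {
	return session.Must(session.NewSession(&aws.Config{
		Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""), // import "github.com/aws/aws-sdk-go/aws/credentials"
		Endpoint:    aws.String(endpoint),
		Region:      aws.String("eu-west-1"), // placeholder region
	}))
}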
func (s *S3Storage) Type() string {
@@ -145,16 +145,20 @@ func (s *S3Storage) Type() string {
func (s *S3Storage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
key := fmt.Sprintf("%s/%s", token, filename)
headRequest := &s3.HeadObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
}
// fetch content type and content length from the object metadata
response, err := s.s3.HeadObject(headRequest)
if err != nil {
return
}
if response.ContentType != nil {
contentType = *response.ContentType
}
if response.ContentLength != nil {
contentLength = uint64(*response.ContentLength)
}
return
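
As originally committed, this code called s.s3.HeadObjectRequest without sending the request, so the output struct was never populated and errors were silently dropped; the version above uses HeadObject, which builds and sends in one call. If the request-builder style is wanted (for example to attach a context or custom handlers), the request has to be sent explicitly; a sketch reusing the names above:

req, response := s.s3.HeadObjectRequest(headRequest)
if err = req.Send(); err != nil {
	return
}
// response is only populated once Send has succeeded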
@@ -165,26 +169,32 @@ func (s *S3Storage) IsNotExist(err error) bool {
return false
}
s.logger.Printf("IsNotExist: %s, %#v", err.Error(), err)
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeNoSuchKey:
return true
}
}
return false
}
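
For context, a hypothetical caller (not part of this diff; storage, w, token, and filename are assumed names) might use IsNotExist to map a missing object to a 404:

if _, _, err := storage.Head(token, filename); err != nil {
	if storage.IsNotExist(err) {
		http.Error(w, "not found", http.StatusNotFound)
		return
	}
	http.Error(w, "internal error", http.StatusInternalServerError)
	return
}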
func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
key := fmt.Sprintf("%s/%s", token, filename)
getRequest := &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
}
// fetch content type and content length from the response metadata
response, err := s.s3.GetObject(getRequest)
if err != nil {
return
}
if response.ContentType != nil {
contentType = *response.ContentType
}
if response.ContentLength != nil {
contentLength = uint64(*response.ContentLength)
}
reader = response.Body
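
GetObject hands the body back as an io.ReadCloser, and the caller owns it: it must be closed after use or the HTTP connection leaks. A hypothetical handler (names and imports assumed, not from this diff) could stream it straight to the client:

reader, contentType, contentLength, err := storage.Get(token, filename)
if err != nil {
	// handle lookup failure
	return
}
defer reader.Close()

w.Header().Set("Content-Type", contentType)
w.Header().Set("Content-Length", strconv.FormatUint(contentLength, 10))
io.Copy(w, reader)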
@@ -193,123 +203,23 @@ func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, co
func (s *S3Storage) Delete(token string, filename string) (err error) {
metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
deleteRequest := &s3.DeleteObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(metadata),
}
_, err = s.s3.DeleteObject(deleteRequest)
if err != nil {
return
}
key := fmt.Sprintf("%s/%s", token, filename)
deleteRequest = &s3.DeleteObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
}
_, err = s.s3.DeleteObject(deleteRequest)
return
}
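
Each DeleteObject call above is its own round trip. aws-sdk-go also exposes a batch delete; a sketch of removing both keys in one request, using the same bucket and key names as above:

_, err = s.s3.DeleteObjects(&s3.DeleteObjectsInput{
	Bucket: aws.String(s.bucket),
	Delete: &s3.Delete{
		Objects: []*s3.ObjectIdentifier{
			{Key: aws.String(metadata)},
			{Key: aws.String(key)},
		},
	},
})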
@@ -318,12 +228,29 @@ func (s *S3Storage) Put(token string, filename string, reader io.Reader, content
key := fmt.Sprintf("%s/%s", token, filename)
s.logger.Printf("Uploading file %s to S3 Bucket", filename)
var concurrency int
if !s.noMultipart {
concurrency = 20
} else {
concurrency = 1
}
// Create an uploader with the session and custom options
uploader := s3manager.NewUploader(s.session, func(u *s3manager.Uploader) {
u.PartSize = (1 << 20) * 5 // The minimum/default allowed part size is 5MB
u.Concurrency = concurrency // default is 5
u.MaxUploadParts = concurrency
u.LeavePartsOnError = false
})
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
Body: reader,
})
return
}