Add concurrency level for parallel upload to dropbox.

This commit is contained in:
MaxJa4 2023-08-21 13:26:00 +02:00
parent 6362b322c2
commit 05c7a15e5b
3 changed files with 69 additions and 26 deletions

View File

@ -72,6 +72,7 @@ type Config struct {
AzureStorageEndpoint string `split_words:"true" default:"https://{{ .AccountName }}.blob.core.windows.net/"` AzureStorageEndpoint string `split_words:"true" default:"https://{{ .AccountName }}.blob.core.windows.net/"`
DropboxToken string `split_words:"true"` DropboxToken string `split_words:"true"`
DropboxRemotePath string `split_words:"true"` DropboxRemotePath string `split_words:"true"`
DropboxConcurrencyLevel int `split_words:"true" default:"2"`
} }
func (c *Config) resolveSecret(envVar string, secretPath string) (string, error) { func (c *Config) resolveSecret(envVar string, secretPath string) (string, error) {

View File

@ -224,6 +224,7 @@ func newScript() (*script, error) {
dropboxConfig := dropbox.Config{ dropboxConfig := dropbox.Config{
Token: s.c.DropboxToken, Token: s.c.DropboxToken,
RemotePath: s.c.DropboxRemotePath, RemotePath: s.c.DropboxRemotePath,
ConcurrencyLevel: s.c.DropboxConcurrencyLevel,
} }
dropboxBackend, err := dropbox.NewStorageBackend(dropboxConfig, logFunc) dropboxBackend, err := dropbox.NewStorageBackend(dropboxConfig, logFunc)
if err != nil { if err != nil {

View File

@ -8,6 +8,7 @@ import (
"path/filepath" "path/filepath"
"reflect" "reflect"
"strings" "strings"
"sync"
"time" "time"
"github.com/dropbox/dropbox-sdk-go-unofficial/v6/dropbox" "github.com/dropbox/dropbox-sdk-go-unofficial/v6/dropbox"
@ -18,12 +19,14 @@ import (
type dropboxStorage struct { type dropboxStorage struct {
*storage.StorageBackend *storage.StorageBackend
client files.Client client files.Client
concurrencyLevel int
} }
// Config allows to configure a Dropbox storage backend. // Config allows to configure a Dropbox storage backend.
type Config struct { type Config struct {
Token string Token string
RemotePath string RemotePath string
ConcurrencyLevel int
} }
// NewStorageBackend creates and initializes a new Dropbox storage backend. // NewStorageBackend creates and initializes a new Dropbox storage backend.
@ -34,12 +37,18 @@ func NewStorageBackend(opts Config, logFunc storage.Log) (storage.Backend, error
client := files.New(config) client := files.New(config)
if opts.ConcurrencyLevel < 1 {
logFunc(storage.LogLevelWarning, "Dropbox", "Concurrency level must be at least 1! Using 1 instead of %d.", opts.ConcurrencyLevel)
opts.ConcurrencyLevel = 1
}
return &dropboxStorage{ return &dropboxStorage{
StorageBackend: &storage.StorageBackend{ StorageBackend: &storage.StorageBackend{
DestinationPath: opts.RemotePath, DestinationPath: opts.RemotePath,
Log: logFunc, Log: logFunc,
}, },
client: client, client: client,
concurrencyLevel: opts.ConcurrencyLevel,
}, nil }, nil
} }
@ -85,12 +94,39 @@ func (b *dropboxStorage) Copy(file string) error {
const chunkSize = 148 * 1024 * 1024 // 148MB const chunkSize = 148 * 1024 * 1024 // 148MB
var offset uint64 = 0 var offset uint64 = 0
var guard = make(chan struct{}, b.concurrencyLevel)
var errorChn = make(chan error, b.concurrencyLevel)
var EOFChn = make(chan bool, b.concurrencyLevel)
var mu sync.Mutex
loop:
for { for {
guard <- struct{}{} // limit concurrency
select {
case err := <-errorChn: // error from goroutine
return err
case <-EOFChn: // EOF from goroutine
break loop
default:
}
go func() {
defer func() { <-guard }()
chunk := make([]byte, chunkSize) chunk := make([]byte, chunkSize)
mu.Lock() // to preserve offset of chunks
select {
case <-EOFChn:
EOFChn <- true // put it back for outer loop
return // already EOF
default:
}
bytesRead, err := r.Read(chunk) bytesRead, err := r.Read(chunk)
if err != nil { if err != nil {
return fmt.Errorf("(*dropboxStorage).Copy: Error reading the file to be uploaded: %w", err) errorChn <- fmt.Errorf("(*dropboxStorage).Copy: Error reading the file to be uploaded: %w", err)
return
} }
chunk = chunk[:bytesRead] chunk = chunk[:bytesRead]
@ -99,25 +135,30 @@ func (b *dropboxStorage) Copy(file string) error {
) )
isEOF := bytesRead < chunkSize isEOF := bytesRead < chunkSize
uploadSessionAppendArg.Close = isEOF uploadSessionAppendArg.Close = isEOF
if isEOF {
EOFChn <- true
}
offset += uint64(bytesRead)
mu.Unlock()
if err := b.client.UploadSessionAppendV2(uploadSessionAppendArg, bytes.NewReader(chunk)); err != nil { if err := b.client.UploadSessionAppendV2(uploadSessionAppendArg, bytes.NewReader(chunk)); err != nil {
return fmt.Errorf("(*dropboxStorage).Copy: Error appending the file to the upload session: %w", err) errorChn <- fmt.Errorf("(*dropboxStorage).Copy: Error appending the file to the upload session: %w", err)
return
} }
}()
if isEOF {
break
}
offset += uint64(bytesRead)
} }
// Finish the upload session, commit the file (no new data added) // Finish the upload session, commit the file (no new data added)
b.client.UploadSessionFinish( _, err = b.client.UploadSessionFinish(
files.NewUploadSessionFinishArg( files.NewUploadSessionFinishArg(
files.NewUploadSessionCursor(sessionId, 0), files.NewUploadSessionCursor(sessionId, 0),
files.NewCommitInfo(filepath.Join(b.DestinationPath, name)), files.NewCommitInfo(filepath.Join(b.DestinationPath, name)),
), nil) ), nil)
if err != nil {
return fmt.Errorf("(*dropboxStorage).Copy: Error finishing the upload session: %w", err)
}
b.Log(storage.LogLevelInfo, b.Name(), "Uploaded a copy of backup '%s' to Dropbox at path '%s'.", file, b.DestinationPath) b.Log(storage.LogLevelInfo, b.Name(), "Uploaded a copy of backup '%s' to Dropbox at path '%s'.", file, b.DestinationPath)