From 05c7a15e5b65dde12773afc75e4deb43775bd3a4 Mon Sep 17 00:00:00 2001
From: MaxJa4 <74194322+MaxJa4@users.noreply.github.com>
Date: Mon, 21 Aug 2023 13:26:00 +0200
Subject: [PATCH] Add concurrency level for parallel upload to Dropbox.

---
 cmd/backup/config.go                |  1 +
 cmd/backup/script.go                |  5 +-
 internal/storage/dropbox/dropbox.go | 89 +++++++++++++++++++++--------
 3 files changed, 69 insertions(+), 26 deletions(-)

diff --git a/cmd/backup/config.go b/cmd/backup/config.go
index b297304..b84f90c 100644
--- a/cmd/backup/config.go
+++ b/cmd/backup/config.go
@@ -72,6 +72,7 @@ type Config struct {
 	AzureStorageEndpoint    string `split_words:"true" default:"https://{{ .AccountName }}.blob.core.windows.net/"`
 	DropboxToken            string `split_words:"true"`
 	DropboxRemotePath       string `split_words:"true"`
+	DropboxConcurrencyLevel int    `split_words:"true" default:"2"`
 }
 
 func (c *Config) resolveSecret(envVar string, secretPath string) (string, error) {
diff --git a/cmd/backup/script.go b/cmd/backup/script.go
index c074105..fc48449 100644
--- a/cmd/backup/script.go
+++ b/cmd/backup/script.go
@@ -222,8 +222,9 @@ func newScript() (*script, error) {
 
 	if s.c.DropboxToken != "" {
 		dropboxConfig := dropbox.Config{
-			Token:      s.c.DropboxToken,
-			RemotePath: s.c.DropboxRemotePath,
+			Token:            s.c.DropboxToken,
+			RemotePath:       s.c.DropboxRemotePath,
+			ConcurrencyLevel: s.c.DropboxConcurrencyLevel,
 		}
 		dropboxBackend, err := dropbox.NewStorageBackend(dropboxConfig, logFunc)
 		if err != nil {
diff --git a/internal/storage/dropbox/dropbox.go b/internal/storage/dropbox/dropbox.go
index 7129f09..129655a 100644
--- a/internal/storage/dropbox/dropbox.go
+++ b/internal/storage/dropbox/dropbox.go
@@ -8,6 +8,7 @@ import (
 	"path/filepath"
 	"reflect"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/dropbox/dropbox-sdk-go-unofficial/v6/dropbox"
@@ -17,13 +18,15 @@ import (
 
 type dropboxStorage struct {
 	*storage.StorageBackend
-	client files.Client
+	client           files.Client
+	concurrencyLevel int
 }
 
 // Config allows to configure a Dropbox storage backend.
 type Config struct {
-	Token      string
-	RemotePath string
+	Token            string
+	RemotePath       string
+	ConcurrencyLevel int
 }
 
 // NewStorageBackend creates and initializes a new Dropbox storage backend.
@@ -34,12 +37,18 @@ func NewStorageBackend(opts Config, logFunc storage.Log) (storage.Backend, error
 
 	client := files.New(config)
 
+	if opts.ConcurrencyLevel < 1 {
+		logFunc(storage.LogLevelWarning, "Dropbox", "Concurrency level must be at least 1! Using 1 instead of %d.", opts.ConcurrencyLevel)
+		opts.ConcurrencyLevel = 1
+	}
+
 	return &dropboxStorage{
 		StorageBackend: &storage.StorageBackend{
 			DestinationPath: opts.RemotePath,
 			Log:             logFunc,
 		},
-		client: client,
+		client:           client,
+		concurrencyLevel: opts.ConcurrencyLevel,
 	}, nil
 }
 
@@ -85,39 +94,71 @@ func (b *dropboxStorage) Copy(file string) error {
 	const chunkSize = 148 * 1024 * 1024 // 148MB
 	var offset uint64 = 0
 
+	var guard = make(chan struct{}, b.concurrencyLevel)
+	var errorChn = make(chan error, b.concurrencyLevel)
+	var EOFChn = make(chan bool, b.concurrencyLevel)
+	var mu sync.Mutex
+
+loop:
 	for {
-		chunk := make([]byte, chunkSize)
-		bytesRead, err := r.Read(chunk)
-		if err != nil {
-			return fmt.Errorf("(*dropboxStorage).Copy: Error reading the file to be uploaded: %w", err)
-		}
-		chunk = chunk[:bytesRead]
-
-		uploadSessionAppendArg := files.NewUploadSessionAppendArg(
-			files.NewUploadSessionCursor(sessionId, offset),
-		)
-		isEOF := bytesRead < chunkSize
-		uploadSessionAppendArg.Close = isEOF
-
-		if err := b.client.UploadSessionAppendV2(uploadSessionAppendArg, bytes.NewReader(chunk)); err != nil {
-			return fmt.Errorf("(*dropboxStorage).Copy: Error appending the file to the upload session: %w", err)
+		guard <- struct{}{} // limit concurrency
+		select {
+		case err := <-errorChn: // error from goroutine
+			return err
+		case <-EOFChn: // EOF from goroutine
+			break loop
+		default:
 		}
-		if isEOF {
-			break
-		}
+		go func() {
+			defer func() { <-guard }()
+			chunk := make([]byte, chunkSize)
 
-		offset += uint64(bytesRead)
+			mu.Lock() // to preserve offset of chunks
+
+			select {
+			case <-EOFChn:
+				EOFChn <- true // put it back for outer loop
+				return // already EOF
+			default:
+			}
+
+			bytesRead, err := r.Read(chunk)
+			if err != nil {
+				errorChn <- fmt.Errorf("(*dropboxStorage).Copy: Error reading the file to be uploaded: %w", err)
+				return
+			}
+			chunk = chunk[:bytesRead]
+
+			uploadSessionAppendArg := files.NewUploadSessionAppendArg(
+				files.NewUploadSessionCursor(sessionId, offset),
+			)
+			isEOF := bytesRead < chunkSize
+			uploadSessionAppendArg.Close = isEOF
+			if isEOF {
+				EOFChn <- true
+			}
+			offset += uint64(bytesRead)
+
+			mu.Unlock()
+
+			if err := b.client.UploadSessionAppendV2(uploadSessionAppendArg, bytes.NewReader(chunk)); err != nil {
+				errorChn <- fmt.Errorf("(*dropboxStorage).Copy: Error appending the file to the upload session: %w", err)
+				return
+			}
+		}()
 	}
 
 	// Finish the upload session, commit the file (no new data added)
-	b.client.UploadSessionFinish(
+	_, err = b.client.UploadSessionFinish(
 		files.NewUploadSessionFinishArg(
 			files.NewUploadSessionCursor(sessionId, 0),
 			files.NewCommitInfo(filepath.Join(b.DestinationPath, name)),
 		), nil)
+	if err != nil {
+		return fmt.Errorf("(*dropboxStorage).Copy: Error finishing the upload session: %w", err)
+	}
 
 	b.Log(storage.LogLevelInfo, b.Name(), "Uploaded a copy of backup '%s' to Dropbox at path '%s'.", file, b.DestinationPath)
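
Note (illustrative sketch, not part of the patch): the Copy hunk above bounds the number of in-flight chunk uploads with a buffered channel used as a counting semaphore, sized by the new concurrency level. Below is a minimal, self-contained Go sketch of that pattern on its own; uploadChunk is a hypothetical placeholder standing in for the per-chunk Dropbox SDK call, not a function from this repository.

	package main

	import (
		"fmt"
		"sync"
	)

	// uploadChunk is a placeholder for the real per-chunk upload
	// (UploadSessionAppendV2 in the patch above).
	func uploadChunk(id int) error {
		fmt.Printf("uploading chunk %d\n", id)
		return nil
	}

	func main() {
		concurrencyLevel := 2 // e.g. the default DropboxConcurrencyLevel

		guard := make(chan struct{}, concurrencyLevel) // counting semaphore
		errs := make(chan error, concurrencyLevel)
		var wg sync.WaitGroup

		for id := 0; id < 10; id++ {
			guard <- struct{}{} // blocks while concurrencyLevel uploads are in flight
			wg.Add(1)
			go func(id int) {
				defer wg.Done()
				defer func() { <-guard }() // release the slot when done
				if err := uploadChunk(id); err != nil {
					select {
					case errs <- err: // keep the first few errors
					default: // drop the rest instead of blocking
					}
				}
			}(id)
		}

		wg.Wait()
		close(errs)
		for err := range errs {
			fmt.Println("chunk upload failed:", err)
		}
	}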
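On configuration: the struct tags in config.go follow the split_words convention used by envconfig-style loaders, so the new DropboxConcurrencyLevel field would presumably be set through a DROPBOX_CONCURRENCY_LEVEL environment variable (an assumption based on the tag, not stated in the patch). Per the diff, it defaults to 2 and NewStorageBackend clamps values below 1 to 1 with a warning.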