Fixed some instabilites. Changed default concurrency to 6.

This commit is contained in:
MaxJa4 2023-08-21 14:33:37 +02:00
parent 05c7a15e5b
commit f29b4c0994
2 changed files with 19 additions and 6 deletions

View File

@ -72,7 +72,7 @@ type Config struct {
AzureStorageEndpoint string `split_words:"true" default:"https://{{ .AccountName }}.blob.core.windows.net/"` AzureStorageEndpoint string `split_words:"true" default:"https://{{ .AccountName }}.blob.core.windows.net/"`
DropboxToken string `split_words:"true"` DropboxToken string `split_words:"true"`
DropboxRemotePath string `split_words:"true"` DropboxRemotePath string `split_words:"true"`
DropboxConcurrencyLevel int `split_words:"true" default:"2"` DropboxConcurrencyLevel int `split_words:"true" default:"6"`
} }
func (c *Config) resolveSecret(envVar string, secretPath string) (string, error) { func (c *Config) resolveSecret(envVar string, secretPath string) (string, error) {

View File

@ -63,11 +63,16 @@ func (b *dropboxStorage) Copy(file string) error {
folderArg := files.NewCreateFolderArg(b.DestinationPath) folderArg := files.NewCreateFolderArg(b.DestinationPath)
if _, err := b.client.CreateFolderV2(folderArg); err != nil { if _, err := b.client.CreateFolderV2(folderArg); err != nil {
if err.(files.CreateFolderV2APIError).EndpointError.Path.Tag == files.WriteErrorConflict { switch err := err.(type) {
case files.CreateFolderV2APIError:
if err.EndpointError.Path.Tag == files.WriteErrorConflict {
b.Log(storage.LogLevelInfo, b.Name(), "Destination path '%s' already exists in Dropbox, no new directory required.", b.DestinationPath) b.Log(storage.LogLevelInfo, b.Name(), "Destination path '%s' already exists in Dropbox, no new directory required.", b.DestinationPath)
} else { } else {
return fmt.Errorf("(*dropboxStorage).Copy: Error creating directory '%s' in Dropbox: %w", b.DestinationPath, err) return fmt.Errorf("(*dropboxStorage).Copy: Error creating directory '%s' in Dropbox: %w", b.DestinationPath, err)
} }
default:
return fmt.Errorf("(*dropboxStorage).Copy: Error creating directory '%s' in Dropbox: %w", b.DestinationPath, err)
}
} }
r, err := os.Open(file) r, err := os.Open(file)
@ -98,6 +103,7 @@ func (b *dropboxStorage) Copy(file string) error {
var errorChn = make(chan error, b.concurrencyLevel) var errorChn = make(chan error, b.concurrencyLevel)
var EOFChn = make(chan bool, b.concurrencyLevel) var EOFChn = make(chan bool, b.concurrencyLevel)
var mu sync.Mutex var mu sync.Mutex
var wg sync.WaitGroup
loop: loop:
for { for {
@ -106,12 +112,17 @@ loop:
case err := <-errorChn: // error from goroutine case err := <-errorChn: // error from goroutine
return err return err
case <-EOFChn: // EOF from goroutine case <-EOFChn: // EOF from goroutine
wg.Wait() // wait for all goroutines to finish
break loop break loop
default: default:
} }
go func() { go func() {
defer func() { <-guard }() defer func() {
wg.Done()
<-guard
}()
wg.Add(1)
chunk := make([]byte, chunkSize) chunk := make([]byte, chunkSize)
mu.Lock() // to preserve offset of chunks mu.Lock() // to preserve offset of chunks
@ -119,6 +130,7 @@ loop:
select { select {
case <-EOFChn: case <-EOFChn:
EOFChn <- true // put it back for outer loop EOFChn <- true // put it back for outer loop
mu.Unlock()
return // already EOF return // already EOF
default: default:
} }
@ -126,6 +138,7 @@ loop:
bytesRead, err := r.Read(chunk) bytesRead, err := r.Read(chunk)
if err != nil { if err != nil {
errorChn <- fmt.Errorf("(*dropboxStorage).Copy: Error reading the file to be uploaded: %w", err) errorChn <- fmt.Errorf("(*dropboxStorage).Copy: Error reading the file to be uploaded: %w", err)
mu.Unlock()
return return
} }
chunk = chunk[:bytesRead] chunk = chunk[:bytesRead]