From cfa5f073d2b31b3b64e7d81108f4ce905885119a Mon Sep 17 00:00:00 2001
From: Frederik Ring
Date: Sat, 27 Jan 2024 17:00:43 +0100
Subject: [PATCH] Scale services concurrently

---
 cmd/backup/script.go | 188 +++++++++++++++++++++----------------------
 cmd/backup/util.go   |  29 +++++++
 2 files changed, 122 insertions(+), 95 deletions(-)

diff --git a/cmd/backup/script.go b/cmd/backup/script.go
index 50d3d70..71eadb8 100644
--- a/cmd/backup/script.go
+++ b/cmd/backup/script.go
@@ -16,6 +16,7 @@ import (
 	"path/filepath"
 	"slices"
 	"strings"
+	"sync"
 	"text/template"
 	"time"
 
@@ -319,19 +320,6 @@ func newScript() (*script, error) {
 	return s, nil
 }
 
-type noopWriteCloser struct {
-	io.Writer
-}
-
-func (noopWriteCloser) Close() error {
-	return nil
-}
-
-type handledSwarmService struct {
-	serviceID           string
-	initialReplicaCount uint64
-}
-
 // stopContainersAndServices stops all Docker containers that are marked as to being
 // stopped during the backup and returns a function that can be called to
 // restart everything that has been stopped.
@@ -437,67 +425,71 @@ func (s *script) stopContainersAndServices() (func() error, error) {
 	}
 
 	var scaledDownServices []swarm.Service
-	var scaleDownErrors []error
+	var scaleDownErrors concurrentSlice[error]
 	if isDockerSwarm {
+		wg := sync.WaitGroup{}
 		for _, svc := range servicesToScaleDown {
-			service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{})
-			if err != nil {
-				scaleDownErrors = append(
-					scaleDownErrors,
-					fmt.Errorf("(*script).stopContainersAndServices: error inspecting service %s: %w", svc.serviceID, err),
-				)
-				continue
-			}
-			var zero uint64 = 0
-			serviceMode := &service.Spec.Mode
-			switch {
-			case serviceMode.Replicated != nil:
-				serviceMode.Replicated.Replicas = &zero
-			default:
-				scaleDownErrors = append(
-					scaleDownErrors,
-					fmt.Errorf("(*script).stopContainersAndServices: labeled service %s has to be in replicated mode", service.Spec.Name),
-				)
-				continue
-			}
-
-			response, err := s.cli.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{})
-			if err != nil {
-				scaleDownErrors = append(scaleDownErrors, err)
-				continue
-			}
-
-			for _, warning := range response.Warnings {
-				s.logger.Warn(
-					fmt.Sprintf("The Docker API returned a warning when scaling down service %s: %s", service.Spec.Name, warning),
-				)
-			}
-
-			if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil {
-				scaleDownErrors = append(scaleDownErrors, err)
-			} else {
-				scaledDownServices = append(scaledDownServices, service)
-			}
-
-			// progress.ServiceProgress returns too early, so we need to manually check
-			// whether all containers belonging to the service have actually been removed
-			for {
-				containers, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{
-					Filters: filters.NewArgs(filters.KeyValuePair{
-						Key:   "label",
-						Value: fmt.Sprintf("com.docker.swarm.service.id=%s", service.ID),
-					}),
-				})
+			wg.Add(1)
+			go func(svc handledSwarmService) {
+				defer wg.Done()
+				service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{})
 				if err != nil {
-					scaleDownErrors = append(scaleDownErrors, err)
-					break
+					scaleDownErrors.append(
+						fmt.Errorf("(*script).stopContainersAndServices: error inspecting service %s: %w", svc.serviceID, err),
+					)
+					return
 				}
-				if len(containers) == 0 {
-					break
+				var zero uint64 = 0
+				serviceMode := &service.Spec.Mode
+				switch {
+				case serviceMode.Replicated != nil:
+					serviceMode.Replicated.Replicas = &zero
+				default:
+					scaleDownErrors.append(
+						fmt.Errorf("(*script).stopContainersAndServices: labeled service %s has to be in replicated mode", service.Spec.Name),
+					)
+					return
 				}
-				time.Sleep(time.Second)
-			}
+
+				response, err := s.cli.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{})
+				if err != nil {
+					scaleDownErrors.append(err)
+					return
+				}
+
+				for _, warning := range response.Warnings {
+					s.logger.Warn(
+						fmt.Sprintf("The Docker API returned a warning when scaling down service %s: %s", service.Spec.Name, warning),
+					)
+				}
+
+				if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil {
+					scaleDownErrors.append(err)
+				} else {
+					scaledDownServices = append(scaledDownServices, service)
+				}
+
+				// progress.ServiceProgress returns too early, so we need to manually check
+				// whether all containers belonging to the service have actually been removed
+				for {
+					containers, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{
+						Filters: filters.NewArgs(filters.KeyValuePair{
+							Key:   "label",
+							Value: fmt.Sprintf("com.docker.swarm.service.id=%s", service.ID),
+						}),
+					})
+					if err != nil {
+						scaleDownErrors.append(err)
+						break
+					}
+					if len(containers) == 0 {
+						break
+					}
+					time.Sleep(time.Second)
+				}
+			}(svc)
 		}
+		wg.Wait()
 	}
 
 	s.stats.Containers = ContainersStats{
@@ -511,11 +503,11 @@ func (s *script) stopContainersAndServices() (func() error, error) {
 		All:             uint(len(allServices)),
 		ToScaleDown:     uint(len(servicesToScaleDown)),
 		ScaledDown:      uint(len(scaledDownServices)),
-		ScaleDownErrors: uint(len(scaleDownErrors)),
+		ScaleDownErrors: uint(len(scaleDownErrors.value())),
 	}
 
 	var initialErr error
-	allErrors := append(stopErrors, scaleDownErrors...)
+	allErrors := append(stopErrors, scaleDownErrors.value()...)
 	if len(allErrors) != 0 {
 		initialErr = fmt.Errorf(
 			"(*script).stopContainersAndServices: %d error(s) stopping containers: %w",
@@ -565,38 +557,44 @@ func (s *script) stopContainersAndServices() (func() error, error) {
 		}
 	}
 
-	var scaleUpErrors []error
+	var scaleUpErrors concurrentSlice[error]
 	if isDockerSwarm {
+		wg := &sync.WaitGroup{}
 		for _, svc := range servicesToScaleDown {
-			service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{})
-			if err != nil {
-				scaleUpErrors = append(scaleUpErrors, err)
-				continue
-			}
+			wg.Add(1)
+			go func(svc handledSwarmService) {
+				defer wg.Done()
+				service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{})
+				if err != nil {
+					scaleUpErrors.append(err)
+					return
+				}
 
-			service.Spec.Mode.Replicated.Replicas = &svc.initialReplicaCount
-			response, err := s.cli.ServiceUpdate(
-				context.Background(),
-				service.ID,
-				service.Version, service.Spec,
-				types.ServiceUpdateOptions{},
-			)
-			if err != nil {
-				scaleUpErrors = append(scaleUpErrors, err)
-				continue
-			}
-			for _, warning := range response.Warnings {
-				s.logger.Warn(
-					fmt.Sprintf("The Docker API returned a warning when scaling up service %s: %s", service.Spec.Name, warning),
+				service.Spec.Mode.Replicated.Replicas = &svc.initialReplicaCount
+				response, err := s.cli.ServiceUpdate(
+					context.Background(),
+					service.ID,
+					service.Version, service.Spec,
+					types.ServiceUpdateOptions{},
 				)
-			}
-			if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil {
-				scaleUpErrors = append(scaleUpErrors, err)
-			}
+				if err != nil {
+					scaleUpErrors.append(err)
+					return
+				}
+				for _, warning := range response.Warnings {
+					s.logger.Warn(
+						fmt.Sprintf("The Docker API returned a warning when scaling up service %s: %s", service.Spec.Name, warning),
+					)
+				}
+				if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil {
+					scaleUpErrors.append(err)
+				}
+			}(svc)
 		}
+		wg.Wait()
 	}
 
-	allErrors := append(restartErrors, scaleUpErrors...)
+	allErrors := append(restartErrors, scaleUpErrors.value()...)
 	if len(allErrors) != 0 {
 		return fmt.Errorf(
 			"stopContainers: %d error(s) restarting containers and services: %w",
diff --git a/cmd/backup/util.go b/cmd/backup/util.go
index c349e7b..e13da86 100644
--- a/cmd/backup/util.go
+++ b/cmd/backup/util.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"sync"
 )
 
 var noop = func() error { return nil }
@@ -50,3 +51,31 @@ func (b *bufferingWriter) Write(p []byte) (n int, err error) {
 	}
 	return b.writer.Write(p)
 }
+
+type noopWriteCloser struct {
+	io.Writer
+}
+
+func (noopWriteCloser) Close() error {
+	return nil
+}
+
+type handledSwarmService struct {
+	serviceID           string
+	initialReplicaCount uint64
+}
+
+type concurrentSlice[T any] struct {
+	val []T
+	sync.Mutex
+}
+
+func (c *concurrentSlice[T]) append(v T) {
+	c.Lock()
+	defer c.Unlock()
+	c.val = append(c.val, v)
+}
+
+func (c *concurrentSlice[T]) value() []T {
+	return c.val
+}
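
Note on the pattern: the core of this change is a fan-out/fan-in built on the new concurrentSlice helper in cmd/backup/util.go. Each service is handled in its own goroutine, errors are appended through the mutex-guarded slice, and results are only read back after wg.Wait() has returned. The standalone sketch below reduces the pattern to its essentials so it can be run in isolation; the service names and the failure condition are made up for illustration, everything else mirrors what the patch does.

// Standalone sketch of the concurrent scale-down error collection.
// The "broken-svc" failure is hypothetical; in the patch the work inside
// the goroutine is the Docker service inspect/update sequence.
package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

type concurrentSlice[T any] struct {
	val []T
	sync.Mutex
}

func (c *concurrentSlice[T]) append(v T) {
	c.Lock()
	defer c.Unlock()
	c.val = append(c.val, v)
}

func (c *concurrentSlice[T]) value() []T {
	return c.val
}

func main() {
	services := []string{"app", "db", "broken-svc"}
	var scaleDownErrors concurrentSlice[error]

	wg := sync.WaitGroup{}
	for _, name := range services {
		wg.Add(1)
		// Passing the loop variable as an argument avoids capturing the
		// shared iteration variable (required before Go 1.22), matching
		// the `go func(svc handledSwarmService)` shape in the patch.
		go func(name string) {
			defer wg.Done()
			if strings.HasPrefix(name, "broken") {
				scaleDownErrors.append(fmt.Errorf("error scaling down service %s", name))
				return
			}
			// ... scale the service down here ...
		}(name)
	}
	wg.Wait()

	// Reading via value() without locking is safe here because every
	// writer goroutine has already returned.
	if errs := scaleDownErrors.value(); len(errs) != 0 {
		fmt.Println(errors.Join(errs...))
	}
}

One design consequence worth noticing: value() deliberately takes no lock, which is sound only because the patch never calls it until after wg.Wait(), when no goroutine can still be appending.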