Scale services concurrently

This commit is contained in:
Frederik Ring 2024-01-27 17:00:43 +01:00
parent d0f3f41fe7
commit cfa5f073d2
2 changed files with 122 additions and 95 deletions

View File

@ -16,6 +16,7 @@ import (
"path/filepath" "path/filepath"
"slices" "slices"
"strings" "strings"
"sync"
"text/template" "text/template"
"time" "time"
@ -319,19 +320,6 @@ func newScript() (*script, error) {
return s, nil return s, nil
} }
// noopWriteCloser adapts an io.Writer into an io.WriteCloser whose
// Close is a no-op, for APIs that insist on a closer.
type noopWriteCloser struct {
io.Writer
}
// Close implements io.Closer and always succeeds without side effects.
func (noopWriteCloser) Close() error {
return nil
}
// handledSwarmService records a swarm service that was scaled down for
// the backup, keeping its pre-backup replica count so it can be restored.
type handledSwarmService struct {
serviceID string
initialReplicaCount uint64
}
// stopContainersAndServices stops all Docker containers that are marked as to being // stopContainersAndServices stops all Docker containers that are marked as to being
// stopped during the backup and returns a function that can be called to // stopped during the backup and returns a function that can be called to
// restart everything that has been stopped. // restart everything that has been stopped.
@ -437,16 +425,19 @@ func (s *script) stopContainersAndServices() (func() error, error) {
} }
var scaledDownServices []swarm.Service var scaledDownServices []swarm.Service
var scaleDownErrors []error var scaleDownErrors concurrentSlice[error]
if isDockerSwarm { if isDockerSwarm {
wg := sync.WaitGroup{}
for _, svc := range servicesToScaleDown { for _, svc := range servicesToScaleDown {
wg.Add(1)
go func(svc handledSwarmService) {
defer wg.Done()
service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{}) service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{})
if err != nil { if err != nil {
scaleDownErrors = append( scaleDownErrors.append(
scaleDownErrors,
fmt.Errorf("(*script).stopContainersAndServices: error inspecting service %s: %w", svc.serviceID, err), fmt.Errorf("(*script).stopContainersAndServices: error inspecting service %s: %w", svc.serviceID, err),
) )
continue return
} }
var zero uint64 = 0 var zero uint64 = 0
serviceMode := &service.Spec.Mode serviceMode := &service.Spec.Mode
@ -454,17 +445,16 @@ func (s *script) stopContainersAndServices() (func() error, error) {
case serviceMode.Replicated != nil: case serviceMode.Replicated != nil:
serviceMode.Replicated.Replicas = &zero serviceMode.Replicated.Replicas = &zero
default: default:
scaleDownErrors = append( scaleDownErrors.append(
scaleDownErrors,
fmt.Errorf("(*script).stopContainersAndServices: labeled service %s has to be in replicated mode", service.Spec.Name), fmt.Errorf("(*script).stopContainersAndServices: labeled service %s has to be in replicated mode", service.Spec.Name),
) )
continue return
} }
response, err := s.cli.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{}) response, err := s.cli.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{})
if err != nil { if err != nil {
scaleDownErrors = append(scaleDownErrors, err) scaleDownErrors.append(err)
continue return
} }
for _, warning := range response.Warnings { for _, warning := range response.Warnings {
@ -474,7 +464,7 @@ func (s *script) stopContainersAndServices() (func() error, error) {
} }
if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil { if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil {
scaleDownErrors = append(scaleDownErrors, err) scaleDownErrors.append(err)
} else { } else {
scaledDownServices = append(scaledDownServices, service) scaledDownServices = append(scaledDownServices, service)
} }
@ -489,7 +479,7 @@ func (s *script) stopContainersAndServices() (func() error, error) {
}), }),
}) })
if err != nil { if err != nil {
scaleDownErrors = append(scaleDownErrors, err) scaleDownErrors.append(err)
break break
} }
if len(containers) == 0 { if len(containers) == 0 {
@ -497,7 +487,9 @@ func (s *script) stopContainersAndServices() (func() error, error) {
} }
time.Sleep(time.Second) time.Sleep(time.Second)
} }
}(svc)
} }
wg.Wait()
} }
s.stats.Containers = ContainersStats{ s.stats.Containers = ContainersStats{
@ -511,11 +503,11 @@ func (s *script) stopContainersAndServices() (func() error, error) {
All: uint(len(allServices)), All: uint(len(allServices)),
ToScaleDown: uint(len(servicesToScaleDown)), ToScaleDown: uint(len(servicesToScaleDown)),
ScaledDown: uint(len(scaledDownServices)), ScaledDown: uint(len(scaledDownServices)),
ScaleDownErrors: uint(len(scaleDownErrors)), ScaleDownErrors: uint(len(scaleDownErrors.value())),
} }
var initialErr error var initialErr error
allErrors := append(stopErrors, scaleDownErrors...) allErrors := append(stopErrors, scaleDownErrors.value()...)
if len(allErrors) != 0 { if len(allErrors) != 0 {
initialErr = fmt.Errorf( initialErr = fmt.Errorf(
"(*script).stopContainersAndServices: %d error(s) stopping containers: %w", "(*script).stopContainersAndServices: %d error(s) stopping containers: %w",
@ -565,13 +557,17 @@ func (s *script) stopContainersAndServices() (func() error, error) {
} }
} }
var scaleUpErrors []error var scaleUpErrors concurrentSlice[error]
if isDockerSwarm { if isDockerSwarm {
wg := &sync.WaitGroup{}
for _, svc := range servicesToScaleDown { for _, svc := range servicesToScaleDown {
wg.Add(1)
go func(svc handledSwarmService) {
defer wg.Done()
service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{}) service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{})
if err != nil { if err != nil {
scaleUpErrors = append(scaleUpErrors, err) scaleUpErrors.append(err)
continue return
} }
service.Spec.Mode.Replicated.Replicas = &svc.initialReplicaCount service.Spec.Mode.Replicated.Replicas = &svc.initialReplicaCount
@ -582,8 +578,8 @@ func (s *script) stopContainersAndServices() (func() error, error) {
types.ServiceUpdateOptions{}, types.ServiceUpdateOptions{},
) )
if err != nil { if err != nil {
scaleUpErrors = append(scaleUpErrors, err) scaleUpErrors.append(err)
continue return
} }
for _, warning := range response.Warnings { for _, warning := range response.Warnings {
s.logger.Warn( s.logger.Warn(
@ -591,12 +587,14 @@ func (s *script) stopContainersAndServices() (func() error, error) {
) )
} }
if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil { if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil {
scaleUpErrors = append(scaleUpErrors, err) scaleUpErrors.append(err)
} }
}(svc)
} }
wg.Wait()
} }
allErrors := append(restartErrors, scaleUpErrors...) allErrors := append(restartErrors, scaleUpErrors.value()...)
if len(allErrors) != 0 { if len(allErrors) != 0 {
return fmt.Errorf( return fmt.Errorf(
"stopContainers: %d error(s) restarting containers and services: %w", "stopContainers: %d error(s) restarting containers and services: %w",

View File

@ -8,6 +8,7 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"sync"
) )
var noop = func() error { return nil } var noop = func() error { return nil }
@ -50,3 +51,31 @@ func (b *bufferingWriter) Write(p []byte) (n int, err error) {
} }
return b.writer.Write(p) return b.writer.Write(p)
} }
// noopWriteCloser wraps an io.Writer so it satisfies io.WriteCloser;
// Close does nothing and never fails.
type noopWriteCloser struct {
io.Writer
}
// Close implements io.Closer as a no-op.
func (noopWriteCloser) Close() error {
return nil
}
// handledSwarmService identifies a swarm service being scaled down and
// remembers the replica count it had beforehand, so the service can be
// scaled back up to the same size afterwards.
type handledSwarmService struct {
serviceID string
initialReplicaCount uint64
}
type concurrentSlice[T any] struct {
val []T
sync.Mutex
}
func (c *concurrentSlice[T]) append(v T) {
c.Lock()
defer c.Unlock()
c.val = append(c.val, v)
}
func (c *concurrentSlice[T]) value() []T {
return c.val
}