Scale services concurrently

This commit is contained in:
Frederik Ring 2024-01-27 17:00:43 +01:00
parent bb37b8b1d8
commit 7ad6fc9355
2 changed files with 122 additions and 95 deletions

View File

@ -16,6 +16,7 @@ import (
"path/filepath"
"slices"
"strings"
"sync"
"text/template"
"time"
@ -319,19 +320,6 @@ func newScript() (*script, error) {
return s, nil
}
type noopWriteCloser struct {
io.Writer
}
func (noopWriteCloser) Close() error {
return nil
}
type handledSwarmService struct {
serviceID string
initialReplicaCount uint64
}
// stopContainersAndServices stops all Docker containers that are marked as to being
// stopped during the backup and returns a function that can be called to
// restart everything that has been stopped.
@ -437,67 +425,71 @@ func (s *script) stopContainersAndServices() (func() error, error) {
}
var scaledDownServices []swarm.Service
var scaleDownErrors []error
var scaleDownErrors concurrentSlice[error]
if isDockerSwarm {
wg := sync.WaitGroup{}
for _, svc := range servicesToScaleDown {
service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{})
if err != nil {
scaleDownErrors = append(
scaleDownErrors,
fmt.Errorf("(*script).stopContainersAndServices: error inspecting service %s: %w", svc.serviceID, err),
)
continue
}
var zero uint64 = 0
serviceMode := &service.Spec.Mode
switch {
case serviceMode.Replicated != nil:
serviceMode.Replicated.Replicas = &zero
default:
scaleDownErrors = append(
scaleDownErrors,
fmt.Errorf("(*script).stopContainersAndServices: labeled service %s has to be in replicated mode", service.Spec.Name),
)
continue
}
response, err := s.cli.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{})
if err != nil {
scaleDownErrors = append(scaleDownErrors, err)
continue
}
for _, warning := range response.Warnings {
s.logger.Warn(
fmt.Sprintf("The Docker API returned a warning when scaling down service %s: %s", service.Spec.Name, warning),
)
}
if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil {
scaleDownErrors = append(scaleDownErrors, err)
} else {
scaledDownServices = append(scaledDownServices, service)
}
// progress.ServiceProgress returns too early, so we need to manually check
// whether all containers belonging to the service have actually been removed
for {
containers, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{
Filters: filters.NewArgs(filters.KeyValuePair{
Key: "label",
Value: fmt.Sprintf("com.docker.swarm.service.id=%s", service.ID),
}),
})
wg.Add(1)
go func(svc handledSwarmService) {
defer wg.Done()
service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{})
if err != nil {
scaleDownErrors = append(scaleDownErrors, err)
break
scaleDownErrors.append(
fmt.Errorf("(*script).stopContainersAndServices: error inspecting service %s: %w", svc.serviceID, err),
)
return
}
if len(containers) == 0 {
break
var zero uint64 = 0
serviceMode := &service.Spec.Mode
switch {
case serviceMode.Replicated != nil:
serviceMode.Replicated.Replicas = &zero
default:
scaleDownErrors.append(
fmt.Errorf("(*script).stopContainersAndServices: labeled service %s has to be in replicated mode", service.Spec.Name),
)
return
}
time.Sleep(time.Second)
}
response, err := s.cli.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{})
if err != nil {
scaleDownErrors.append(err)
return
}
for _, warning := range response.Warnings {
s.logger.Warn(
fmt.Sprintf("The Docker API returned a warning when scaling down service %s: %s", service.Spec.Name, warning),
)
}
if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil {
scaleDownErrors.append(err)
} else {
scaledDownServices = append(scaledDownServices, service)
}
// progress.ServiceProgress returns too early, so we need to manually check
// whether all containers belonging to the service have actually been removed
for {
containers, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{
Filters: filters.NewArgs(filters.KeyValuePair{
Key: "label",
Value: fmt.Sprintf("com.docker.swarm.service.id=%s", service.ID),
}),
})
if err != nil {
scaleDownErrors.append(err)
break
}
if len(containers) == 0 {
break
}
time.Sleep(time.Second)
}
}(svc)
}
wg.Wait()
}
s.stats.Containers = ContainersStats{
@ -511,11 +503,11 @@ func (s *script) stopContainersAndServices() (func() error, error) {
All: uint(len(allServices)),
ToScaleDown: uint(len(servicesToScaleDown)),
ScaledDown: uint(len(scaledDownServices)),
ScaleDownErrors: uint(len(scaleDownErrors)),
ScaleDownErrors: uint(len(scaleDownErrors.value())),
}
var initialErr error
allErrors := append(stopErrors, scaleDownErrors...)
allErrors := append(stopErrors, scaleDownErrors.value()...)
if len(allErrors) != 0 {
initialErr = fmt.Errorf(
"(*script).stopContainersAndServices: %d error(s) stopping containers: %w",
@ -565,38 +557,44 @@ func (s *script) stopContainersAndServices() (func() error, error) {
}
}
var scaleUpErrors []error
var scaleUpErrors concurrentSlice[error]
if isDockerSwarm {
wg := &sync.WaitGroup{}
for _, svc := range servicesToScaleDown {
service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{})
if err != nil {
scaleUpErrors = append(scaleUpErrors, err)
continue
}
wg.Add(1)
go func(svc handledSwarmService) {
defer wg.Done()
service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{})
if err != nil {
scaleUpErrors.append(err)
return
}
service.Spec.Mode.Replicated.Replicas = &svc.initialReplicaCount
response, err := s.cli.ServiceUpdate(
context.Background(),
service.ID,
service.Version, service.Spec,
types.ServiceUpdateOptions{},
)
if err != nil {
scaleUpErrors = append(scaleUpErrors, err)
continue
}
for _, warning := range response.Warnings {
s.logger.Warn(
fmt.Sprintf("The Docker API returned a warning when scaling up service %s: %s", service.Spec.Name, warning),
service.Spec.Mode.Replicated.Replicas = &svc.initialReplicaCount
response, err := s.cli.ServiceUpdate(
context.Background(),
service.ID,
service.Version, service.Spec,
types.ServiceUpdateOptions{},
)
}
if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil {
scaleUpErrors = append(scaleUpErrors, err)
}
if err != nil {
scaleUpErrors.append(err)
return
}
for _, warning := range response.Warnings {
s.logger.Warn(
fmt.Sprintf("The Docker API returned a warning when scaling up service %s: %s", service.Spec.Name, warning),
)
}
if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil {
scaleUpErrors.append(err)
}
}(svc)
}
wg.Wait()
}
allErrors := append(restartErrors, scaleUpErrors...)
allErrors := append(restartErrors, scaleUpErrors.value()...)
if len(allErrors) != 0 {
return fmt.Errorf(
"stopContainers: %d error(s) restarting containers and services: %w",

View File

@ -8,6 +8,7 @@ import (
"fmt"
"io"
"os"
"sync"
)
var noop = func() error { return nil }
@ -50,3 +51,31 @@ func (b *bufferingWriter) Write(p []byte) (n int, err error) {
}
return b.writer.Write(p)
}
type noopWriteCloser struct {
io.Writer
}
func (noopWriteCloser) Close() error {
return nil
}
type handledSwarmService struct {
serviceID string
initialReplicaCount uint64
}
type concurrentSlice[T any] struct {
val []T
sync.Mutex
}
func (c *concurrentSlice[T]) append(v T) {
c.Lock()
defer c.Unlock()
c.val = append(c.val, v)
}
func (c *concurrentSlice[T]) value() []T {
return c.val
}