From 174f85fabdc83e6f3e941ac8d170c7ac8fc0dcca Mon Sep 17 00:00:00 2001 From: Frederik Ring Date: Sat, 27 Jan 2024 19:26:39 +0100 Subject: [PATCH] Factor out code for service updating --- cmd/backup/docker.go | 131 ++++++++++++++++++++----------------------- 1 file changed, 62 insertions(+), 69 deletions(-) diff --git a/cmd/backup/docker.go b/cmd/backup/docker.go index 33f3721..ed60170 100644 --- a/cmd/backup/docker.go +++ b/cmd/backup/docker.go @@ -13,8 +13,53 @@ import ( ctr "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/swarm" + "github.com/docker/docker/client" ) +func scaleService(cli *client.Client, serviceID string, replicas uint64) ([]string, error) { + service, _, err := cli.ServiceInspectWithRaw(context.Background(), serviceID, types.ServiceInspectOptions{}) + if err != nil { + return nil, fmt.Errorf("scaleService: error inspecting service %s: %w", serviceID, err) + } + serviceMode := &service.Spec.Mode + switch { + case serviceMode.Replicated != nil: + serviceMode.Replicated.Replicas = &replicas + default: + return nil, fmt.Errorf("scaleService: service to be scaled %s has to be in replicated mode", service.Spec.Name) + } + + response, err := cli.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{}) + if err != nil { + return nil, fmt.Errorf("scaleService: error updating service: %w", err) + } + + discardWriter := &noopWriteCloser{io.Discard} + if err := progress.ServiceProgress(context.Background(), cli, service.ID, discardWriter); err != nil { + return nil, err + } + return response.Warnings, nil +} + +func awaitContainerCountForService(cli *client.Client, serviceID string, count int) error { + for { + containers, err := cli.ContainerList(context.Background(), types.ContainerListOptions{ + Filters: filters.NewArgs(filters.KeyValuePair{ + Key: "label", + Value: fmt.Sprintf("com.docker.swarm.service.id=%s", serviceID), + }), + }) + if err != nil { + return fmt.Errorf("awaitContainerCount: error listing containers: %w", err) + } + if len(containers) == count { + break + } + time.Sleep(time.Second) + } + return nil +} + // stopContainersAndServices stops all Docker containers that are marked as to being // stopped during the backup and returns a function that can be called to // restart everything that has been stopped. @@ -28,7 +73,6 @@ func (s *script) stopContainersAndServices() (func() error, error) { return noop, fmt.Errorf("(*script).stopContainersAndServices: error getting docker info: %w", err) } isDockerSwarm := dockerInfo.Swarm.LocalNodeState != "inactive" - discardWriter := &noopWriteCloser{io.Discard} filterMatchLabel := fmt.Sprintf( "docker-volume-backup.stop-during-backup=%s", @@ -119,7 +163,7 @@ func (s *script) stopContainersAndServices() (func() error, error) { } } - var scaledDownServices []swarm.Service + var scaledDownServices []handledSwarmService var scaleDownErrors concurrentSlice[error] if isDockerSwarm { wg := sync.WaitGroup{} @@ -127,60 +171,21 @@ func (s *script) stopContainersAndServices() (func() error, error) { wg.Add(1) go func(svc handledSwarmService) { defer wg.Done() - service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{}) + warnings, err := scaleService(s.cli, svc.serviceID, 0) if err != nil { - scaleDownErrors.append( - fmt.Errorf("(*script).stopContainersAndServices: error inspecting service %s: %w", svc.serviceID, err), - ) - return - } - var zero uint64 = 0 - serviceMode := &service.Spec.Mode - switch { - case serviceMode.Replicated != nil: - serviceMode.Replicated.Replicas = &zero - default: - scaleDownErrors.append( - fmt.Errorf("(*script).stopContainersAndServices: labeled service %s has to be in replicated mode", service.Spec.Name), - ) - return - } - - response, err := s.cli.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{}) - if err != nil { - scaleDownErrors.append(err) - return - } - - for _, warning := range response.Warnings { - s.logger.Warn( - fmt.Sprintf("The Docker API returned a warning when scaling down service %s: %s", service.Spec.Name, warning), - ) - } - - if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil { scaleDownErrors.append(err) } else { - scaledDownServices = append(scaledDownServices, service) + scaledDownServices = append(scaledDownServices, svc) + } + for _, warning := range warnings { + s.logger.Warn( + fmt.Sprintf("The Docker API returned a warning when scaling down service %s: %s", svc.serviceID, warning), + ) } - // progress.ServiceProgress returns too early, so we need to manually check // whether all containers belonging to the service have actually been removed - for { - containers, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{ - Filters: filters.NewArgs(filters.KeyValuePair{ - Key: "label", - Value: fmt.Sprintf("com.docker.swarm.service.id=%s", service.ID), - }), - }) - if err != nil { - scaleDownErrors.append(err) - break - } - if len(containers) == 0 { - break - } - time.Sleep(time.Second) + if err := awaitContainerCountForService(s.cli, svc.serviceID, 0); err != nil { + scaleDownErrors.append(err) } }(svc) } @@ -216,6 +221,9 @@ func (s *script) stopContainersAndServices() (func() error, error) { var restartErrors []error for _, container := range stoppedContainers { + // in case a container was part of a swarm service, teh service requires to + // be force updated instead of restarting the container as it would otherwise + // remain in a "completed" state if swarmServiceName, ok := container.Labels["com.docker.swarm.service.name"]; ok { servicesRequiringForceUpdate[swarmServiceName] = struct{}{} continue @@ -259,31 +267,16 @@ func (s *script) stopContainersAndServices() (func() error, error) { wg.Add(1) go func(svc handledSwarmService) { defer wg.Done() - service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{}) + warnings, err := scaleService(s.cli, svc.serviceID, svc.initialReplicaCount) if err != nil { - scaleUpErrors.append(err) + scaleDownErrors.append(err) return } - - service.Spec.Mode.Replicated.Replicas = &svc.initialReplicaCount - response, err := s.cli.ServiceUpdate( - context.Background(), - service.ID, - service.Version, service.Spec, - types.ServiceUpdateOptions{}, - ) - if err != nil { - scaleUpErrors.append(err) - return - } - for _, warning := range response.Warnings { + for _, warning := range warnings { s.logger.Warn( - fmt.Sprintf("The Docker API returned a warning when scaling up service %s: %s", service.Spec.Name, warning), + fmt.Sprintf("The Docker API returned a warning when scaling up service %s: %s", svc.serviceID, warning), ) } - if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil { - scaleUpErrors.append(err) - } }(svc) } wg.Wait()