From 09cc1f5c6097ae2664deaf14db089fbb3864cf8e Mon Sep 17 00:00:00 2001 From: Frederik Ring Date: Sat, 27 Jan 2024 18:41:03 +0100 Subject: [PATCH] Move docker interaction code into own file --- cmd/backup/docker.go | 309 +++++++++++++++++++++++++++++++++++++++++++ cmd/backup/script.go | 301 ----------------------------------------- 2 files changed, 309 insertions(+), 301 deletions(-) create mode 100644 cmd/backup/docker.go diff --git a/cmd/backup/docker.go b/cmd/backup/docker.go new file mode 100644 index 0000000..33f3721 --- /dev/null +++ b/cmd/backup/docker.go @@ -0,0 +1,309 @@ +package main + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + "time" + + "github.com/docker/cli/cli/command/service/progress" + "github.com/docker/docker/api/types" + ctr "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/api/types/swarm" +) + +// stopContainersAndServices stops all Docker containers that are marked as to being +// stopped during the backup and returns a function that can be called to +// restart everything that has been stopped. +func (s *script) stopContainersAndServices() (func() error, error) { + if s.cli == nil { + return noop, nil + } + + dockerInfo, err := s.cli.Info(context.Background()) + if err != nil { + return noop, fmt.Errorf("(*script).stopContainersAndServices: error getting docker info: %w", err) + } + isDockerSwarm := dockerInfo.Swarm.LocalNodeState != "inactive" + discardWriter := &noopWriteCloser{io.Discard} + + filterMatchLabel := fmt.Sprintf( + "docker-volume-backup.stop-during-backup=%s", + s.c.BackupStopContainerLabel, + ) + + allContainers, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{}) + if err != nil { + return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for containers: %w", err) + } + containersToStop, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{ + Filters: filters.NewArgs(filters.KeyValuePair{ + Key: "label", + Value: filterMatchLabel, + }), + }) + if err != nil { + return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for containers to stop: %w", err) + } + + var allServices []swarm.Service + var servicesToScaleDown []handledSwarmService + if isDockerSwarm { + allServices, err = s.cli.ServiceList(context.Background(), types.ServiceListOptions{}) + if err != nil { + return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for services: %w", err) + } + matchingServices, err := s.cli.ServiceList(context.Background(), types.ServiceListOptions{ + Filters: filters.NewArgs(filters.KeyValuePair{ + Key: "label", + Value: filterMatchLabel, + }), + Status: true, + }) + for _, s := range matchingServices { + servicesToScaleDown = append(servicesToScaleDown, handledSwarmService{ + serviceID: s.ID, + initialReplicaCount: *s.Spec.Mode.Replicated.Replicas, + }) + } + if err != nil { + return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for services to scale down: %w", err) + } + } + + if len(containersToStop) == 0 && len(servicesToScaleDown) == 0 { + return noop, nil + } + + if isDockerSwarm { + for _, container := range containersToStop { + if swarmServiceID, ok := container.Labels["com.docker.swarm.service.id"]; ok { + parentService, _, err := s.cli.ServiceInspectWithRaw(context.Background(), swarmServiceID, types.ServiceInspectOptions{}) + if err != nil { + return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for parent service with ID %s: %w", swarmServiceID, err) + } + for label := range parentService.Spec.Labels { + if label == "docker-volume-backup.stop-during-backup" { + return noop, fmt.Errorf( + "(*script).stopContainersAndServices: container %s is labeled to stop but has parent service %s which is also labeled, cannot continue", + container.Names[0], + parentService.Spec.Name, + ) + } + } + } + } + } + + s.logger.Info( + fmt.Sprintf( + "Stopping %d out of %d running container(s) and scaling down %d out of %d active service(s) as they were labeled %s.", + len(containersToStop), + len(allContainers), + len(servicesToScaleDown), + len(allServices), + filterMatchLabel, + ), + ) + + var stoppedContainers []types.Container + var stopErrors []error + for _, container := range containersToStop { + if err := s.cli.ContainerStop(context.Background(), container.ID, ctr.StopOptions{}); err != nil { + stopErrors = append(stopErrors, err) + } else { + stoppedContainers = append(stoppedContainers, container) + } + } + + var scaledDownServices []swarm.Service + var scaleDownErrors concurrentSlice[error] + if isDockerSwarm { + wg := sync.WaitGroup{} + for _, svc := range servicesToScaleDown { + wg.Add(1) + go func(svc handledSwarmService) { + defer wg.Done() + service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{}) + if err != nil { + scaleDownErrors.append( + fmt.Errorf("(*script).stopContainersAndServices: error inspecting service %s: %w", svc.serviceID, err), + ) + return + } + var zero uint64 = 0 + serviceMode := &service.Spec.Mode + switch { + case serviceMode.Replicated != nil: + serviceMode.Replicated.Replicas = &zero + default: + scaleDownErrors.append( + fmt.Errorf("(*script).stopContainersAndServices: labeled service %s has to be in replicated mode", service.Spec.Name), + ) + return + } + + response, err := s.cli.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{}) + if err != nil { + scaleDownErrors.append(err) + return + } + + for _, warning := range response.Warnings { + s.logger.Warn( + fmt.Sprintf("The Docker API returned a warning when scaling down service %s: %s", service.Spec.Name, warning), + ) + } + + if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil { + scaleDownErrors.append(err) + } else { + scaledDownServices = append(scaledDownServices, service) + } + + // progress.ServiceProgress returns too early, so we need to manually check + // whether all containers belonging to the service have actually been removed + for { + containers, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{ + Filters: filters.NewArgs(filters.KeyValuePair{ + Key: "label", + Value: fmt.Sprintf("com.docker.swarm.service.id=%s", service.ID), + }), + }) + if err != nil { + scaleDownErrors.append(err) + break + } + if len(containers) == 0 { + break + } + time.Sleep(time.Second) + } + }(svc) + } + wg.Wait() + } + + s.stats.Containers = ContainersStats{ + All: uint(len(allContainers)), + ToStop: uint(len(containersToStop)), + Stopped: uint(len(stoppedContainers)), + StopErrors: uint(len(stopErrors)), + } + + s.stats.Services = ServicesStats{ + All: uint(len(allServices)), + ToScaleDown: uint(len(servicesToScaleDown)), + ScaledDown: uint(len(scaledDownServices)), + ScaleDownErrors: uint(len(scaleDownErrors.value())), + } + + var initialErr error + allErrors := append(stopErrors, scaleDownErrors.value()...) + if len(allErrors) != 0 { + initialErr = fmt.Errorf( + "(*script).stopContainersAndServices: %d error(s) stopping containers: %w", + len(allErrors), + errors.Join(allErrors...), + ) + } + + return func() error { + servicesRequiringForceUpdate := map[string]struct{}{} + + var restartErrors []error + for _, container := range stoppedContainers { + if swarmServiceName, ok := container.Labels["com.docker.swarm.service.name"]; ok { + servicesRequiringForceUpdate[swarmServiceName] = struct{}{} + continue + } + if err := s.cli.ContainerStart(context.Background(), container.ID, types.ContainerStartOptions{}); err != nil { + restartErrors = append(restartErrors, err) + } + } + + if len(servicesRequiringForceUpdate) != 0 { + services, _ := s.cli.ServiceList(context.Background(), types.ServiceListOptions{}) + for serviceName := range servicesRequiringForceUpdate { + var serviceMatch swarm.Service + for _, service := range services { + if service.Spec.Name == serviceName { + serviceMatch = service + break + } + } + if serviceMatch.ID == "" { + restartErrors = append( + restartErrors, + fmt.Errorf("(*script).stopContainersAndServices: couldn't find service with name %s", serviceName), + ) + continue + } + serviceMatch.Spec.TaskTemplate.ForceUpdate += 1 + if _, err := s.cli.ServiceUpdate( + context.Background(), serviceMatch.ID, + serviceMatch.Version, serviceMatch.Spec, types.ServiceUpdateOptions{}, + ); err != nil { + restartErrors = append(restartErrors, err) + } + } + } + + var scaleUpErrors concurrentSlice[error] + if isDockerSwarm { + wg := &sync.WaitGroup{} + for _, svc := range servicesToScaleDown { + wg.Add(1) + go func(svc handledSwarmService) { + defer wg.Done() + service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{}) + if err != nil { + scaleUpErrors.append(err) + return + } + + service.Spec.Mode.Replicated.Replicas = &svc.initialReplicaCount + response, err := s.cli.ServiceUpdate( + context.Background(), + service.ID, + service.Version, service.Spec, + types.ServiceUpdateOptions{}, + ) + if err != nil { + scaleUpErrors.append(err) + return + } + for _, warning := range response.Warnings { + s.logger.Warn( + fmt.Sprintf("The Docker API returned a warning when scaling up service %s: %s", service.Spec.Name, warning), + ) + } + if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil { + scaleUpErrors.append(err) + } + }(svc) + } + wg.Wait() + } + + allErrors := append(restartErrors, scaleUpErrors.value()...) + if len(allErrors) != 0 { + return fmt.Errorf( + "stopContainers: %d error(s) restarting containers and services: %w", + len(allErrors), + errors.Join(allErrors...), + ) + } + s.logger.Info( + fmt.Sprintf( + "Restarted %d container(s) and %d service(s).", + len(stoppedContainers), + len(scaledDownServices), + ), + ) + return nil + }, initialErr +} diff --git a/cmd/backup/script.go b/cmd/backup/script.go index 71eadb8..65b8009 100644 --- a/cmd/backup/script.go +++ b/cmd/backup/script.go @@ -5,8 +5,6 @@ package main import ( "bytes" - "context" - "errors" "fmt" "io" "io/fs" @@ -16,7 +14,6 @@ import ( "path/filepath" "slices" "strings" - "sync" "text/template" "time" @@ -31,11 +28,6 @@ import ( openpgp "github.com/ProtonMail/go-crypto/openpgp/v2" "github.com/containrrr/shoutrrr" "github.com/containrrr/shoutrrr/pkg/router" - "github.com/docker/cli/cli/command/service/progress" - "github.com/docker/docker/api/types" - ctr "github.com/docker/docker/api/types/container" - "github.com/docker/docker/api/types/filters" - "github.com/docker/docker/api/types/swarm" "github.com/docker/docker/client" "github.com/leekchan/timeutil" "github.com/offen/envconfig" @@ -320,299 +312,6 @@ func newScript() (*script, error) { return s, nil } -// stopContainersAndServices stops all Docker containers that are marked as to being -// stopped during the backup and returns a function that can be called to -// restart everything that has been stopped. -func (s *script) stopContainersAndServices() (func() error, error) { - if s.cli == nil { - return noop, nil - } - - dockerInfo, err := s.cli.Info(context.Background()) - if err != nil { - return noop, fmt.Errorf("(*script).stopContainersAndServices: error getting docker info: %w", err) - } - isDockerSwarm := dockerInfo.Swarm.LocalNodeState != "inactive" - discardWriter := &noopWriteCloser{io.Discard} - - filterMatchLabel := fmt.Sprintf( - "docker-volume-backup.stop-during-backup=%s", - s.c.BackupStopContainerLabel, - ) - - allContainers, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{}) - if err != nil { - return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for containers: %w", err) - } - containersToStop, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{ - Filters: filters.NewArgs(filters.KeyValuePair{ - Key: "label", - Value: filterMatchLabel, - }), - }) - if err != nil { - return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for containers to stop: %w", err) - } - - var allServices []swarm.Service - var servicesToScaleDown []handledSwarmService - if isDockerSwarm { - allServices, err = s.cli.ServiceList(context.Background(), types.ServiceListOptions{}) - if err != nil { - return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for services: %w", err) - } - matchingServices, err := s.cli.ServiceList(context.Background(), types.ServiceListOptions{ - Filters: filters.NewArgs(filters.KeyValuePair{ - Key: "label", - Value: filterMatchLabel, - }), - Status: true, - }) - for _, s := range matchingServices { - servicesToScaleDown = append(servicesToScaleDown, handledSwarmService{ - serviceID: s.ID, - initialReplicaCount: *s.Spec.Mode.Replicated.Replicas, - }) - } - if err != nil { - return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for services to scale down: %w", err) - } - } - - if len(containersToStop) == 0 && len(servicesToScaleDown) == 0 { - return noop, nil - } - - if isDockerSwarm { - for _, container := range containersToStop { - if swarmServiceID, ok := container.Labels["com.docker.swarm.service.id"]; ok { - parentService, _, err := s.cli.ServiceInspectWithRaw(context.Background(), swarmServiceID, types.ServiceInspectOptions{}) - if err != nil { - return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for parent service with ID %s: %w", swarmServiceID, err) - } - for label := range parentService.Spec.Labels { - if label == "docker-volume-backup.stop-during-backup" { - return noop, fmt.Errorf( - "(*script).stopContainersAndServices: container %s is labeled to stop but has parent service %s which is also labeled, cannot continue", - container.Names[0], - parentService.Spec.Name, - ) - } - } - } - } - } - - s.logger.Info( - fmt.Sprintf( - "Stopping %d out of %d running container(s) and scaling down %d out of %d active service(s) as they were labeled %s.", - len(containersToStop), - len(allContainers), - len(servicesToScaleDown), - len(allServices), - filterMatchLabel, - ), - ) - - var stoppedContainers []types.Container - var stopErrors []error - for _, container := range containersToStop { - if err := s.cli.ContainerStop(context.Background(), container.ID, ctr.StopOptions{}); err != nil { - stopErrors = append(stopErrors, err) - } else { - stoppedContainers = append(stoppedContainers, container) - } - } - - var scaledDownServices []swarm.Service - var scaleDownErrors concurrentSlice[error] - if isDockerSwarm { - wg := sync.WaitGroup{} - for _, svc := range servicesToScaleDown { - wg.Add(1) - go func(svc handledSwarmService) { - defer wg.Done() - service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{}) - if err != nil { - scaleDownErrors.append( - fmt.Errorf("(*script).stopContainersAndServices: error inspecting service %s: %w", svc.serviceID, err), - ) - return - } - var zero uint64 = 0 - serviceMode := &service.Spec.Mode - switch { - case serviceMode.Replicated != nil: - serviceMode.Replicated.Replicas = &zero - default: - scaleDownErrors.append( - fmt.Errorf("(*script).stopContainersAndServices: labeled service %s has to be in replicated mode", service.Spec.Name), - ) - return - } - - response, err := s.cli.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{}) - if err != nil { - scaleDownErrors.append(err) - return - } - - for _, warning := range response.Warnings { - s.logger.Warn( - fmt.Sprintf("The Docker API returned a warning when scaling down service %s: %s", service.Spec.Name, warning), - ) - } - - if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil { - scaleDownErrors.append(err) - } else { - scaledDownServices = append(scaledDownServices, service) - } - - // progress.ServiceProgress returns too early, so we need to manually check - // whether all containers belonging to the service have actually been removed - for { - containers, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{ - Filters: filters.NewArgs(filters.KeyValuePair{ - Key: "label", - Value: fmt.Sprintf("com.docker.swarm.service.id=%s", service.ID), - }), - }) - if err != nil { - scaleDownErrors.append(err) - break - } - if len(containers) == 0 { - break - } - time.Sleep(time.Second) - } - }(svc) - } - wg.Wait() - } - - s.stats.Containers = ContainersStats{ - All: uint(len(allContainers)), - ToStop: uint(len(containersToStop)), - Stopped: uint(len(stoppedContainers)), - StopErrors: uint(len(stopErrors)), - } - - s.stats.Services = ServicesStats{ - All: uint(len(allServices)), - ToScaleDown: uint(len(servicesToScaleDown)), - ScaledDown: uint(len(scaledDownServices)), - ScaleDownErrors: uint(len(scaleDownErrors.value())), - } - - var initialErr error - allErrors := append(stopErrors, scaleDownErrors.value()...) - if len(allErrors) != 0 { - initialErr = fmt.Errorf( - "(*script).stopContainersAndServices: %d error(s) stopping containers: %w", - len(allErrors), - errors.Join(allErrors...), - ) - } - - return func() error { - servicesRequiringForceUpdate := map[string]struct{}{} - - var restartErrors []error - for _, container := range stoppedContainers { - if swarmServiceName, ok := container.Labels["com.docker.swarm.service.name"]; ok { - servicesRequiringForceUpdate[swarmServiceName] = struct{}{} - continue - } - if err := s.cli.ContainerStart(context.Background(), container.ID, types.ContainerStartOptions{}); err != nil { - restartErrors = append(restartErrors, err) - } - } - - if len(servicesRequiringForceUpdate) != 0 { - services, _ := s.cli.ServiceList(context.Background(), types.ServiceListOptions{}) - for serviceName := range servicesRequiringForceUpdate { - var serviceMatch swarm.Service - for _, service := range services { - if service.Spec.Name == serviceName { - serviceMatch = service - break - } - } - if serviceMatch.ID == "" { - restartErrors = append( - restartErrors, - fmt.Errorf("(*script).stopContainersAndServices: couldn't find service with name %s", serviceName), - ) - continue - } - serviceMatch.Spec.TaskTemplate.ForceUpdate += 1 - if _, err := s.cli.ServiceUpdate( - context.Background(), serviceMatch.ID, - serviceMatch.Version, serviceMatch.Spec, types.ServiceUpdateOptions{}, - ); err != nil { - restartErrors = append(restartErrors, err) - } - } - } - - var scaleUpErrors concurrentSlice[error] - if isDockerSwarm { - wg := &sync.WaitGroup{} - for _, svc := range servicesToScaleDown { - wg.Add(1) - go func(svc handledSwarmService) { - defer wg.Done() - service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{}) - if err != nil { - scaleUpErrors.append(err) - return - } - - service.Spec.Mode.Replicated.Replicas = &svc.initialReplicaCount - response, err := s.cli.ServiceUpdate( - context.Background(), - service.ID, - service.Version, service.Spec, - types.ServiceUpdateOptions{}, - ) - if err != nil { - scaleUpErrors.append(err) - return - } - for _, warning := range response.Warnings { - s.logger.Warn( - fmt.Sprintf("The Docker API returned a warning when scaling up service %s: %s", service.Spec.Name, warning), - ) - } - if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil { - scaleUpErrors.append(err) - } - }(svc) - } - wg.Wait() - } - - allErrors := append(restartErrors, scaleUpErrors.value()...) - if len(allErrors) != 0 { - return fmt.Errorf( - "stopContainers: %d error(s) restarting containers and services: %w", - len(allErrors), - errors.Join(allErrors...), - ) - } - s.logger.Info( - fmt.Sprintf( - "Restarted %d container(s) and %d service(s).", - len(stoppedContainers), - len(scaledDownServices), - ), - ) - return nil - }, initialErr -} - // createArchive creates a tar archive of the configured backup location and // saves it to disk. func (s *script) createArchive() error {