mirror of
https://github.com/offen/docker-volume-backup.git
synced 2024-11-26 15:10:27 +01:00
Factor out code for service updating
This commit is contained in:
parent
09cc1f5c60
commit
26bbc66cd5
@ -13,8 +13,53 @@ import (
|
|||||||
ctr "github.com/docker/docker/api/types/container"
|
ctr "github.com/docker/docker/api/types/container"
|
||||||
"github.com/docker/docker/api/types/filters"
|
"github.com/docker/docker/api/types/filters"
|
||||||
"github.com/docker/docker/api/types/swarm"
|
"github.com/docker/docker/api/types/swarm"
|
||||||
|
"github.com/docker/docker/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func scaleService(cli *client.Client, serviceID string, replicas uint64) ([]string, error) {
|
||||||
|
service, _, err := cli.ServiceInspectWithRaw(context.Background(), serviceID, types.ServiceInspectOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("scaleService: error inspecting service %s: %w", serviceID, err)
|
||||||
|
}
|
||||||
|
serviceMode := &service.Spec.Mode
|
||||||
|
switch {
|
||||||
|
case serviceMode.Replicated != nil:
|
||||||
|
serviceMode.Replicated.Replicas = &replicas
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("scaleService: service to be scaled %s has to be in replicated mode", service.Spec.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
response, err := cli.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("scaleService: error updating service: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
discardWriter := &noopWriteCloser{io.Discard}
|
||||||
|
if err := progress.ServiceProgress(context.Background(), cli, service.ID, discardWriter); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return response.Warnings, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func awaitContainerCountForService(cli *client.Client, serviceID string, count int) error {
|
||||||
|
for {
|
||||||
|
containers, err := cli.ContainerList(context.Background(), types.ContainerListOptions{
|
||||||
|
Filters: filters.NewArgs(filters.KeyValuePair{
|
||||||
|
Key: "label",
|
||||||
|
Value: fmt.Sprintf("com.docker.swarm.service.id=%s", serviceID),
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("awaitContainerCount: error listing containers: %w", err)
|
||||||
|
}
|
||||||
|
if len(containers) == count {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// stopContainersAndServices stops all Docker containers that are marked as to being
|
// stopContainersAndServices stops all Docker containers that are marked as to being
|
||||||
// stopped during the backup and returns a function that can be called to
|
// stopped during the backup and returns a function that can be called to
|
||||||
// restart everything that has been stopped.
|
// restart everything that has been stopped.
|
||||||
@ -28,7 +73,6 @@ func (s *script) stopContainersAndServices() (func() error, error) {
|
|||||||
return noop, fmt.Errorf("(*script).stopContainersAndServices: error getting docker info: %w", err)
|
return noop, fmt.Errorf("(*script).stopContainersAndServices: error getting docker info: %w", err)
|
||||||
}
|
}
|
||||||
isDockerSwarm := dockerInfo.Swarm.LocalNodeState != "inactive"
|
isDockerSwarm := dockerInfo.Swarm.LocalNodeState != "inactive"
|
||||||
discardWriter := &noopWriteCloser{io.Discard}
|
|
||||||
|
|
||||||
filterMatchLabel := fmt.Sprintf(
|
filterMatchLabel := fmt.Sprintf(
|
||||||
"docker-volume-backup.stop-during-backup=%s",
|
"docker-volume-backup.stop-during-backup=%s",
|
||||||
@ -119,7 +163,7 @@ func (s *script) stopContainersAndServices() (func() error, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var scaledDownServices []swarm.Service
|
var scaledDownServices []handledSwarmService
|
||||||
var scaleDownErrors concurrentSlice[error]
|
var scaleDownErrors concurrentSlice[error]
|
||||||
if isDockerSwarm {
|
if isDockerSwarm {
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
@ -127,60 +171,21 @@ func (s *script) stopContainersAndServices() (func() error, error) {
|
|||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(svc handledSwarmService) {
|
go func(svc handledSwarmService) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{})
|
warnings, err := scaleService(s.cli, svc.serviceID, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
scaleDownErrors.append(
|
|
||||||
fmt.Errorf("(*script).stopContainersAndServices: error inspecting service %s: %w", svc.serviceID, err),
|
|
||||||
)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var zero uint64 = 0
|
|
||||||
serviceMode := &service.Spec.Mode
|
|
||||||
switch {
|
|
||||||
case serviceMode.Replicated != nil:
|
|
||||||
serviceMode.Replicated.Replicas = &zero
|
|
||||||
default:
|
|
||||||
scaleDownErrors.append(
|
|
||||||
fmt.Errorf("(*script).stopContainersAndServices: labeled service %s has to be in replicated mode", service.Spec.Name),
|
|
||||||
)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
response, err := s.cli.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{})
|
|
||||||
if err != nil {
|
|
||||||
scaleDownErrors.append(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, warning := range response.Warnings {
|
|
||||||
s.logger.Warn(
|
|
||||||
fmt.Sprintf("The Docker API returned a warning when scaling down service %s: %s", service.Spec.Name, warning),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil {
|
|
||||||
scaleDownErrors.append(err)
|
scaleDownErrors.append(err)
|
||||||
} else {
|
} else {
|
||||||
scaledDownServices = append(scaledDownServices, service)
|
scaledDownServices = append(scaledDownServices, svc)
|
||||||
|
}
|
||||||
|
for _, warning := range warnings {
|
||||||
|
s.logger.Warn(
|
||||||
|
fmt.Sprintf("The Docker API returned a warning when scaling down service %s: %s", svc.serviceID, warning),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// progress.ServiceProgress returns too early, so we need to manually check
|
// progress.ServiceProgress returns too early, so we need to manually check
|
||||||
// whether all containers belonging to the service have actually been removed
|
// whether all containers belonging to the service have actually been removed
|
||||||
for {
|
if err := awaitContainerCountForService(s.cli, svc.serviceID, 0); err != nil {
|
||||||
containers, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{
|
|
||||||
Filters: filters.NewArgs(filters.KeyValuePair{
|
|
||||||
Key: "label",
|
|
||||||
Value: fmt.Sprintf("com.docker.swarm.service.id=%s", service.ID),
|
|
||||||
}),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
scaleDownErrors.append(err)
|
scaleDownErrors.append(err)
|
||||||
break
|
|
||||||
}
|
|
||||||
if len(containers) == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
}
|
}
|
||||||
}(svc)
|
}(svc)
|
||||||
}
|
}
|
||||||
@ -216,6 +221,9 @@ func (s *script) stopContainersAndServices() (func() error, error) {
|
|||||||
|
|
||||||
var restartErrors []error
|
var restartErrors []error
|
||||||
for _, container := range stoppedContainers {
|
for _, container := range stoppedContainers {
|
||||||
|
// in case a container was part of a swarm service, teh service requires to
|
||||||
|
// be force updated instead of restarting the container as it would otherwise
|
||||||
|
// remain in a "completed" state
|
||||||
if swarmServiceName, ok := container.Labels["com.docker.swarm.service.name"]; ok {
|
if swarmServiceName, ok := container.Labels["com.docker.swarm.service.name"]; ok {
|
||||||
servicesRequiringForceUpdate[swarmServiceName] = struct{}{}
|
servicesRequiringForceUpdate[swarmServiceName] = struct{}{}
|
||||||
continue
|
continue
|
||||||
@ -259,31 +267,16 @@ func (s *script) stopContainersAndServices() (func() error, error) {
|
|||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(svc handledSwarmService) {
|
go func(svc handledSwarmService) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{})
|
warnings, err := scaleService(s.cli, svc.serviceID, svc.initialReplicaCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
scaleUpErrors.append(err)
|
scaleDownErrors.append(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
for _, warning := range warnings {
|
||||||
service.Spec.Mode.Replicated.Replicas = &svc.initialReplicaCount
|
|
||||||
response, err := s.cli.ServiceUpdate(
|
|
||||||
context.Background(),
|
|
||||||
service.ID,
|
|
||||||
service.Version, service.Spec,
|
|
||||||
types.ServiceUpdateOptions{},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
scaleUpErrors.append(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, warning := range response.Warnings {
|
|
||||||
s.logger.Warn(
|
s.logger.Warn(
|
||||||
fmt.Sprintf("The Docker API returned a warning when scaling up service %s: %s", service.Spec.Name, warning),
|
fmt.Sprintf("The Docker API returned a warning when scaling up service %s: %s", svc.serviceID, warning),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil {
|
|
||||||
scaleUpErrors.append(err)
|
|
||||||
}
|
|
||||||
}(svc)
|
}(svc)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
Loading…
Reference in New Issue
Block a user