From 285fef282f3e27295a1a1abadaf1674fea46766a Mon Sep 17 00:00:00 2001 From: Zhang Wei Date: Tue, 19 Jul 2016 00:02:41 +0800 Subject: [PATCH] Enhancement: allow parallel stop Stop multiple containers in parallel to speed up stop process, allow maximum 50 parallel stops. Signed-off-by: Abhinav Dahiya Signed-off-by: Zhang Wei --- command/container/stop.go | 8 ++++++-- command/container/utils.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/command/container/stop.go b/command/container/stop.go index dddb7efa22..2f22fd09a4 100644 --- a/command/container/stop.go +++ b/command/container/stop.go @@ -39,11 +39,15 @@ func NewStopCommand(dockerCli *command.DockerCli) *cobra.Command { func runStop(dockerCli *command.DockerCli, opts *stopOptions) error { ctx := context.Background() + timeout := time.Duration(opts.time) * time.Second var errs []string + + errChan := parallelOperation(ctx, opts.containers, func(ctx context.Context, id string) error { + return dockerCli.Client().ContainerStop(ctx, id, &timeout) + }) for _, container := range opts.containers { - timeout := time.Duration(opts.time) * time.Second - if err := dockerCli.Client().ContainerStop(ctx, container, &timeout); err != nil { + if err := <-errChan; err != nil { errs = append(errs, err.Error()) } else { fmt.Fprintf(dockerCli.Out(), "%s\n", container) diff --git a/command/container/utils.go b/command/container/utils.go index 8c993dcce5..7e895834f9 100644 --- a/command/container/utils.go +++ b/command/container/utils.go @@ -90,3 +90,35 @@ func getExitCode(dockerCli *command.DockerCli, ctx context.Context, containerID } return c.State.Running, c.State.ExitCode, nil } + +func parallelOperation(ctx context.Context, cids []string, op func(ctx context.Context, id string) error) chan error { + if len(cids) == 0 { + return nil + } + const defaultParallel int = 50 + sem := make(chan struct{}, defaultParallel) + errChan := make(chan error) + + // make sure result is printed in correct order + output := map[string]chan error{} + for _, c := range cids { + output[c] = make(chan error, 1) + } + go func() { + for _, c := range cids { + err := <-output[c] + errChan <- err + } + }() + + go func() { + for _, c := range cids { + sem <- struct{}{} // Wait for active queue sem to drain. + go func(container string) { + output[container] <- op(ctx, container) + <-sem + }(c) + } + }() + return errChan +}