From d8ab3840e02bb1f0e1f1df4767eb492fcddaac76 Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Thu, 16 Feb 2017 17:05:36 -0800 Subject: [PATCH] Synchronous service create and service update Change "service create" and "service update" to wait until the creation or update finishes, when --detach=false is specified. Show progress bars for the overall operation and for each individual task (when there are a small enough number of tasks), unless "-q" / "--quiet" is specified. Signed-off-by: Aaron Lehmann --- command/service/create.go | 16 +- command/service/helpers.go | 39 +++ command/service/opts.go | 6 + command/service/progress/progress.go | 409 +++++++++++++++++++++++++++ command/service/update.go | 15 +- 5 files changed, 479 insertions(+), 6 deletions(-) create mode 100644 command/service/helpers.go create mode 100644 command/service/progress/progress.go diff --git a/command/service/create.go b/command/service/create.go index 7fd0884930..76b61f6c2e 100644 --- a/command/service/create.go +++ b/command/service/create.go @@ -7,6 +7,7 @@ import ( "github.com/docker/docker/cli" "github.com/docker/docker/cli/command" "github.com/spf13/cobra" + "github.com/spf13/pflag" "golang.org/x/net/context" ) @@ -22,7 +23,7 @@ func newCreateCommand(dockerCli *command.DockerCli) *cobra.Command { if len(args) > 1 { opts.args = args[1:] } - return runCreate(dockerCli, opts) + return runCreate(dockerCli, cmd.Flags(), opts) }, } flags := cmd.Flags() @@ -58,7 +59,7 @@ func newCreateCommand(dockerCli *command.DockerCli) *cobra.Command { return cmd } -func runCreate(dockerCli *command.DockerCli, opts *serviceOptions) error { +func runCreate(dockerCli *command.DockerCli, flags *pflag.FlagSet, opts *serviceOptions) error { apiClient := dockerCli.Client() createOpts := types.ServiceCreateOptions{} @@ -104,5 +105,14 @@ func runCreate(dockerCli *command.DockerCli, opts *serviceOptions) error { } fmt.Fprintf(dockerCli.Out(), "%s\n", response.ID) - return nil + + if opts.detach { + if !flags.Changed("detach") { + fmt.Fprintln(dockerCli.Err(), "Since --detach=false was not specified, tasks will be created in the background.\n"+ + "In a future release, --detach=false will become the default.") + } + return nil + } + + return waitOnService(ctx, dockerCli, response.ID, opts) } diff --git a/command/service/helpers.go b/command/service/helpers.go new file mode 100644 index 0000000000..2289369908 --- /dev/null +++ b/command/service/helpers.go @@ -0,0 +1,39 @@ +package service + +import ( + "io" + + "github.com/docker/docker/cli/command" + "github.com/docker/docker/cli/command/service/progress" + "github.com/docker/docker/pkg/jsonmessage" + "golang.org/x/net/context" +) + +// waitOnService waits for the service to converge. It outputs a progress bar, +// if appopriate based on the CLI flags. +func waitOnService(ctx context.Context, dockerCli *command.DockerCli, serviceID string, opts *serviceOptions) error { + errChan := make(chan error, 1) + pipeReader, pipeWriter := io.Pipe() + + go func() { + errChan <- progress.ServiceProgress(ctx, dockerCli.Client(), serviceID, pipeWriter) + }() + + if opts.quiet { + go func() { + for { + var buf [1024]byte + if _, err := pipeReader.Read(buf[:]); err != nil { + return + } + } + }() + return <-errChan + } + + err := jsonmessage.DisplayJSONMessagesToStream(pipeReader, dockerCli.Out(), nil) + if err == nil { + err = <-errChan + } + return err +} diff --git a/command/service/opts.go b/command/service/opts.go index 1ff6575c09..cdfe513177 100644 --- a/command/service/opts.go +++ b/command/service/opts.go @@ -333,6 +333,9 @@ func convertExtraHostsToSwarmHosts(extraHosts []string) []string { } type serviceOptions struct { + detach bool + quiet bool + name string labels opts.ListOpts containerLabels opts.ListOpts @@ -496,6 +499,9 @@ func (opts *serviceOptions) ToService() (swarm.ServiceSpec, error) { // addServiceFlags adds all flags that are common to both `create` and `update`. // Any flags that are not common are added separately in the individual command func addServiceFlags(flags *pflag.FlagSet, opts *serviceOptions) { + flags.BoolVarP(&opts.detach, "detach", "d", true, "Exit immediately instead of waiting for the service to converge") + flags.BoolVarP(&opts.quiet, "quiet", "q", false, "Suppress progress output") + flags.StringVarP(&opts.workdir, flagWorkdir, "w", "", "Working directory inside the container") flags.StringVarP(&opts.user, flagUser, "u", "", "Username or UID (format: [:])") flags.StringVar(&opts.hostname, flagHostname, "", "Container hostname") diff --git a/command/service/progress/progress.go b/command/service/progress/progress.go new file mode 100644 index 0000000000..ccc7e60cfc --- /dev/null +++ b/command/service/progress/progress.go @@ -0,0 +1,409 @@ +package progress + +import ( + "errors" + "fmt" + "io" + "os" + "os/signal" + "time" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/api/types/swarm" + "github.com/docker/docker/client" + "github.com/docker/docker/pkg/progress" + "github.com/docker/docker/pkg/streamformatter" + "github.com/docker/docker/pkg/stringid" + "golang.org/x/net/context" +) + +var ( + numberedStates = map[swarm.TaskState]int64{ + swarm.TaskStateNew: 1, + swarm.TaskStateAllocated: 2, + swarm.TaskStatePending: 3, + swarm.TaskStateAssigned: 4, + swarm.TaskStateAccepted: 5, + swarm.TaskStatePreparing: 6, + swarm.TaskStateReady: 7, + swarm.TaskStateStarting: 8, + swarm.TaskStateRunning: 9, + } + + longestState int +) + +const ( + maxProgress = 9 + maxProgressBars = 20 +) + +type progressUpdater interface { + update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) +} + +func init() { + for state := range numberedStates { + if len(state) > longestState { + longestState = len(state) + } + } +} + +func stateToProgress(state swarm.TaskState, rollback bool) int64 { + if !rollback { + return numberedStates[state] + } + return int64(len(numberedStates)) - numberedStates[state] +} + +// ServiceProgress outputs progress information for convergence of a service. +func ServiceProgress(ctx context.Context, client client.APIClient, serviceID string, progressWriter io.WriteCloser) error { + defer progressWriter.Close() + + progressOut := streamformatter.NewJSONStreamFormatter().NewProgressOutput(progressWriter, false) + + sigint := make(chan os.Signal, 1) + signal.Notify(sigint, os.Interrupt) + defer signal.Stop(sigint) + + taskFilter := filters.NewArgs() + taskFilter.Add("service", serviceID) + taskFilter.Add("_up-to-date", "true") + + getUpToDateTasks := func() ([]swarm.Task, error) { + return client.TaskList(ctx, types.TaskListOptions{Filters: taskFilter}) + } + + var ( + updater progressUpdater + converged bool + convergedAt time.Time + monitor = 5 * time.Second + rollback bool + ) + + for { + service, _, err := client.ServiceInspectWithRaw(ctx, serviceID) + if err != nil { + return err + } + + if service.Spec.UpdateConfig != nil && service.Spec.UpdateConfig.Monitor != 0 { + monitor = service.Spec.UpdateConfig.Monitor + } + + if updater == nil { + updater, err = initializeUpdater(service, progressOut) + if err != nil { + return err + } + } + + if service.UpdateStatus != nil { + switch service.UpdateStatus.State { + case swarm.UpdateStateUpdating: + rollback = false + case swarm.UpdateStateCompleted: + if !converged { + return nil + } + case swarm.UpdateStatePaused: + return fmt.Errorf("service update paused: %s", service.UpdateStatus.Message) + case swarm.UpdateStateRollbackStarted: + if !rollback && service.UpdateStatus.Message != "" { + progressOut.WriteProgress(progress.Progress{ + ID: "rollback", + Action: service.UpdateStatus.Message, + }) + } + rollback = true + case swarm.UpdateStateRollbackPaused: + return fmt.Errorf("service rollback paused: %s", service.UpdateStatus.Message) + case swarm.UpdateStateRollbackCompleted: + if !converged { + return fmt.Errorf("service rolled back: %s", service.UpdateStatus.Message) + } + } + } + if converged && time.Since(convergedAt) >= monitor { + return nil + } + + tasks, err := getUpToDateTasks() + if err != nil { + return err + } + + activeNodes, err := getActiveNodes(ctx, client) + if err != nil { + return err + } + + converged, err = updater.update(service, tasks, activeNodes, rollback) + if err != nil { + return err + } + if converged { + if convergedAt.IsZero() { + convergedAt = time.Now() + } + wait := monitor - time.Since(convergedAt) + if wait >= 0 { + progressOut.WriteProgress(progress.Progress{ + // Ideally this would have no ID, but + // the progress rendering code behaves + // poorly on an "action" with no ID. It + // returns the cursor to the beginning + // of the line, so the first character + // may be difficult to read. Then the + // output is overwritten by the shell + // prompt when the command finishes. + ID: "verify", + Action: fmt.Sprintf("Waiting %d seconds to verify that tasks are stable...", wait/time.Second+1), + }) + } + } else { + if !convergedAt.IsZero() { + progressOut.WriteProgress(progress.Progress{ + ID: "verify", + Action: "Detected task failure", + }) + } + convergedAt = time.Time{} + } + + select { + case <-time.After(200 * time.Millisecond): + case <-sigint: + if !converged { + progress.Message(progressOut, "", "Operation continuing in background.") + progress.Messagef(progressOut, "", "Use `docker service ps %s` to check progress.", serviceID) + } + return nil + } + } +} + +func getActiveNodes(ctx context.Context, client client.APIClient) (map[string]swarm.Node, error) { + nodes, err := client.NodeList(ctx, types.NodeListOptions{}) + if err != nil { + return nil, err + } + + activeNodes := make(map[string]swarm.Node) + for _, n := range nodes { + if n.Status.State != swarm.NodeStateDown { + activeNodes[n.ID] = n + } + } + return activeNodes, nil +} + +func initializeUpdater(service swarm.Service, progressOut progress.Output) (progressUpdater, error) { + if service.Spec.Mode.Replicated != nil && service.Spec.Mode.Replicated.Replicas != nil { + return &replicatedProgressUpdater{ + progressOut: progressOut, + }, nil + } + if service.Spec.Mode.Global != nil { + return &globalProgressUpdater{ + progressOut: progressOut, + }, nil + } + return nil, errors.New("unrecognized service mode") +} + +func writeOverallProgress(progressOut progress.Output, numerator, denominator int, rollback bool) { + if rollback { + progressOut.WriteProgress(progress.Progress{ + ID: "overall progress", + Action: fmt.Sprintf("rolling back update: %d out of %d tasks", numerator, denominator), + }) + return + } + progressOut.WriteProgress(progress.Progress{ + ID: "overall progress", + Action: fmt.Sprintf("%d out of %d tasks", numerator, denominator), + }) +} + +type replicatedProgressUpdater struct { + progressOut progress.Output + + // used for maping slots to a contiguous space + // this also causes progress bars to appear in order + slotMap map[int]int + + initialized bool + done bool +} + +func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) { + if service.Spec.Mode.Replicated == nil || service.Spec.Mode.Replicated.Replicas == nil { + return false, errors.New("no replica count") + } + replicas := *service.Spec.Mode.Replicated.Replicas + + if !u.initialized { + u.slotMap = make(map[int]int) + + // Draw progress bars in order + writeOverallProgress(u.progressOut, 0, int(replicas), rollback) + + if replicas <= maxProgressBars { + for i := uint64(1); i <= replicas; i++ { + progress.Update(u.progressOut, fmt.Sprintf("%d/%d", i, replicas), " ") + } + } + u.initialized = true + } + + // If there are multiple tasks with the same slot number, favor the one + // with the *lowest* desired state. This can happen in restart + // scenarios. + tasksBySlot := make(map[int]swarm.Task) + for _, task := range tasks { + if numberedStates[task.DesiredState] == 0 { + continue + } + if existingTask, ok := tasksBySlot[task.Slot]; ok { + if numberedStates[existingTask.DesiredState] <= numberedStates[task.DesiredState] { + continue + } + } + if _, nodeActive := activeNodes[task.NodeID]; nodeActive { + tasksBySlot[task.Slot] = task + } + } + + // If we had reached a converged state, check if we are still converged. + if u.done { + for _, task := range tasksBySlot { + if task.Status.State != swarm.TaskStateRunning { + u.done = false + break + } + } + } + + running := uint64(0) + + for _, task := range tasksBySlot { + mappedSlot := u.slotMap[task.Slot] + if mappedSlot == 0 { + mappedSlot = len(u.slotMap) + 1 + u.slotMap[task.Slot] = mappedSlot + } + + if !u.done && replicas <= maxProgressBars && uint64(mappedSlot) <= replicas { + u.progressOut.WriteProgress(progress.Progress{ + ID: fmt.Sprintf("%d/%d", mappedSlot, replicas), + Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State), + Current: stateToProgress(task.Status.State, rollback), + Total: maxProgress, + HideCounts: true, + }) + } + if task.Status.State == swarm.TaskStateRunning { + running++ + } + } + + if !u.done { + writeOverallProgress(u.progressOut, int(running), int(replicas), rollback) + + if running == replicas { + u.done = true + } + } + + return running == replicas, nil +} + +type globalProgressUpdater struct { + progressOut progress.Output + + initialized bool + done bool +} + +func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) { + // If there are multiple tasks with the same node ID, favor the one + // with the *lowest* desired state. This can happen in restart + // scenarios. + tasksByNode := make(map[string]swarm.Task) + for _, task := range tasks { + if numberedStates[task.DesiredState] == 0 { + continue + } + if existingTask, ok := tasksByNode[task.NodeID]; ok { + if numberedStates[existingTask.DesiredState] <= numberedStates[task.DesiredState] { + continue + } + } + tasksByNode[task.NodeID] = task + } + + // We don't have perfect knowledge of how many nodes meet the + // constraints for this service. But the orchestrator creates tasks + // for all eligible nodes at the same time, so we should see all those + // nodes represented among the up-to-date tasks. + nodeCount := len(tasksByNode) + + if !u.initialized { + if nodeCount == 0 { + // Two possibilities: either the orchestrator hasn't created + // the tasks yet, or the service doesn't meet constraints for + // any node. Either way, we wait. + u.progressOut.WriteProgress(progress.Progress{ + ID: "overall progress", + Action: "waiting for new tasks", + }) + return false, nil + } + + writeOverallProgress(u.progressOut, 0, nodeCount, rollback) + u.initialized = true + } + + // If we had reached a converged state, check if we are still converged. + if u.done { + for _, task := range tasksByNode { + if task.Status.State != swarm.TaskStateRunning { + u.done = false + break + } + } + } + + running := 0 + + for _, task := range tasksByNode { + if node, nodeActive := activeNodes[task.NodeID]; nodeActive { + if !u.done && nodeCount <= maxProgressBars { + u.progressOut.WriteProgress(progress.Progress{ + ID: stringid.TruncateID(node.ID), + Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State), + Current: stateToProgress(task.Status.State, rollback), + Total: maxProgress, + HideCounts: true, + }) + } + if task.Status.State == swarm.TaskStateRunning { + running++ + } + } + } + + if !u.done { + writeOverallProgress(u.progressOut, running, nodeCount, rollback) + + if running == nodeCount { + u.done = true + } + } + + return running == nodeCount, nil +} diff --git a/command/service/update.go b/command/service/update.go index 77b980f599..afa0f807e9 100644 --- a/command/service/update.go +++ b/command/service/update.go @@ -31,7 +31,7 @@ func newUpdateCommand(dockerCli *command.DockerCli) *cobra.Command { Short: "Update a service", Args: cli.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - return runUpdate(dockerCli, cmd.Flags(), args[0]) + return runUpdate(dockerCli, cmd.Flags(), serviceOpts, args[0]) }, } @@ -93,7 +93,7 @@ func newListOptsVar() *opts.ListOpts { return opts.NewListOptsRef(&[]string{}, nil) } -func runUpdate(dockerCli *command.DockerCli, flags *pflag.FlagSet, serviceID string) error { +func runUpdate(dockerCli *command.DockerCli, flags *pflag.FlagSet, opts *serviceOptions, serviceID string) error { apiClient := dockerCli.Client() ctx := context.Background() @@ -195,7 +195,16 @@ func runUpdate(dockerCli *command.DockerCli, flags *pflag.FlagSet, serviceID str } fmt.Fprintf(dockerCli.Out(), "%s\n", serviceID) - return nil + + if opts.detach { + if !flags.Changed("detach") { + fmt.Fprintln(dockerCli.Err(), "Since --detach=false was not specified, tasks will be updated in the background.\n"+ + "In a future release, --detach=false will become the default.") + } + return nil + } + + return waitOnService(ctx, dockerCli, serviceID, opts) } func updateService(flags *pflag.FlagSet, spec *swarm.ServiceSpec) error {