Merge pull request #31144 from aaronlehmann/synchronous-service-commands

Synchronous service create and service update
Victor Vieux 2017-04-03 17:44:03 -07:00 committed by GitHub
commit 1551ffaa38
5 changed files with 479 additions and 6 deletions
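
With this change, docker service create and docker service update gain a --detach/-d flag. It currently defaults to true (printing a notice that the default will flip in a future release); with --detach=false, both commands wait for the service to converge while rendering per-task progress bars, and Ctrl-C detaches while the operation continues server-side. The new progress package can also be driven directly. The following is a minimal sketch, not part of this PR: the helper name, the use of client.NewEnvClient, and the hard-coded service ID are all illustrative.

package main

import (
	"io"
	"io/ioutil"

	"github.com/docker/docker/cli/command/service/progress"
	"github.com/docker/docker/client"
	"golang.org/x/net/context"
)

// waitForConvergence is a hypothetical helper mirroring what
// waitOnService does in this PR: run ServiceProgress in a goroutine
// and consume its output until the service converges.
func waitForConvergence(ctx context.Context, apiClient client.APIClient, serviceID string) error {
	pipeReader, pipeWriter := io.Pipe()
	errChan := make(chan error, 1)
	go func() {
		// ServiceProgress closes pipeWriter when it returns.
		errChan <- progress.ServiceProgress(ctx, apiClient, serviceID, pipeWriter)
	}()
	// Discard the JSON progress stream; the CLI renders it instead.
	io.Copy(ioutil.Discard, pipeReader)
	return <-errChan
}

func main() {
	apiClient, err := client.NewEnvClient() // reads DOCKER_HOST etc.
	if err != nil {
		panic(err)
	}
	if err := waitForConvergence(context.Background(), apiClient, "my-service-id"); err != nil {
		panic(err)
	}
}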

View File: cli/command/service/create.go

@@ -7,6 +7,7 @@ import (
"github.com/docker/docker/cli"
"github.com/docker/docker/cli/command"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"golang.org/x/net/context"
)
@@ -22,7 +23,7 @@ func newCreateCommand(dockerCli *command.DockerCli) *cobra.Command {
if len(args) > 1 {
opts.args = args[1:]
}
- return runCreate(dockerCli, opts)
+ return runCreate(dockerCli, cmd.Flags(), opts)
},
}
flags := cmd.Flags()
@@ -58,7 +59,7 @@ func newCreateCommand(dockerCli *command.DockerCli) *cobra.Command {
return cmd
}
- func runCreate(dockerCli *command.DockerCli, opts *serviceOptions) error {
+ func runCreate(dockerCli *command.DockerCli, flags *pflag.FlagSet, opts *serviceOptions) error {
apiClient := dockerCli.Client()
createOpts := types.ServiceCreateOptions{}
@@ -104,5 +105,14 @@ func runCreate(dockerCli *command.DockerCli, opts *serviceOptions) error {
}
fmt.Fprintf(dockerCli.Out(), "%s\n", response.ID)
- return nil
+ if opts.detach {
+ 	if !flags.Changed("detach") {
+ 		fmt.Fprintln(dockerCli.Err(), "Since --detach=false was not specified, tasks will be created in the background.\n"+
+ 			"In a future release, --detach=false will become the default.")
+ 	}
+ 	return nil
+ }
+ return waitOnService(ctx, dockerCli, response.ID, opts)
}
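
The notice above is printed only when the user did not set --detach explicitly: flags.Changed reports whether a flag was passed on the command line, regardless of the value it was set to. A standalone sketch of that pflag behavior (hypothetical demo, not from this PR):

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	fs := pflag.NewFlagSet("demo", pflag.ContinueOnError)
	detach := fs.Bool("detach", true, "exit immediately")

	fs.Parse([]string{}) // flag absent: default applies
	fmt.Println(*detach, fs.Changed("detach")) // true false -> warning path

	fs.Parse([]string{"--detach=true"}) // explicitly set, even to the default value
	fmt.Println(*detach, fs.Changed("detach")) // true true -> no warning
}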

View File: cli/command/service/helpers.go

@@ -0,0 +1,39 @@
package service
import (
"io"
"github.com/docker/docker/cli/command"
"github.com/docker/docker/cli/command/service/progress"
"github.com/docker/docker/pkg/jsonmessage"
"golang.org/x/net/context"
)
// waitOnService waits for the service to converge. It outputs a progress bar,
// if appropriate based on the CLI flags.
func waitOnService(ctx context.Context, dockerCli *command.DockerCli, serviceID string, opts *serviceOptions) error {
errChan := make(chan error, 1)
pipeReader, pipeWriter := io.Pipe()
go func() {
errChan <- progress.ServiceProgress(ctx, dockerCli.Client(), serviceID, pipeWriter)
}()
if opts.quiet {
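// Drain the pipe: ServiceProgress blocks on writes to pipeWriter
// until something reads them, and in quiet mode the progress
// output is simply discarded.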
go func() {
for {
var buf [1024]byte
if _, err := pipeReader.Read(buf[:]); err != nil {
return
}
}
}()
return <-errChan
}
err := jsonmessage.DisplayJSONMessagesToStream(pipeReader, dockerCli.Out(), nil)
if err == nil {
err = <-errChan
}
return err
}
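
In the non-quiet path, the pipe carries a stream of JSON-encoded progress messages that DisplayJSONMessagesToStream renders as progress bars. A sketch of a consumer reading that stream directly (hypothetical helper, not in this PR; assumes each message decodes as a jsonmessage.JSONMessage):

package progressdemo

import (
	"encoding/json"
	"fmt"
	"io"

	"github.com/docker/docker/pkg/jsonmessage"
)

// printProgress decodes the stream that ServiceProgress writes to the
// pipe and prints one line per update instead of drawing bars.
func printProgress(r io.Reader) {
	dec := json.NewDecoder(r)
	for {
		var msg jsonmessage.JSONMessage
		if err := dec.Decode(&msg); err != nil {
			return // io.EOF once the write end of the pipe is closed
		}
		fmt.Printf("%s: %s\n", msg.ID, msg.Status)
	}
}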

View File: cli/command/service/opts.go

@@ -333,6 +333,9 @@ func convertExtraHostsToSwarmHosts(extraHosts []string) []string {
}
type serviceOptions struct {
+ detach bool
+ quiet bool
name string
labels opts.ListOpts
containerLabels opts.ListOpts
@@ -496,6 +499,9 @@ func (opts *serviceOptions) ToService() (swarm.ServiceSpec, error) {
// addServiceFlags adds all flags that are common to both `create` and `update`.
// Any flags that are not common are added separately in the individual command
func addServiceFlags(flags *pflag.FlagSet, opts *serviceOptions) {
+ flags.BoolVarP(&opts.detach, "detach", "d", true, "Exit immediately instead of waiting for the service to converge")
+ flags.BoolVarP(&opts.quiet, "quiet", "q", false, "Suppress progress output")
flags.StringVarP(&opts.workdir, flagWorkdir, "w", "", "Working directory inside the container")
flags.StringVarP(&opts.user, flagUser, "u", "", "Username or UID (format: <name|uid>[:<group|gid>])")
flags.StringVar(&opts.hostname, flagHostname, "", "Container hostname")

View File: cli/command/service/progress/progress.go

@@ -0,0 +1,409 @@
package progress
import (
"errors"
"fmt"
"io"
"os"
"os/signal"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"golang.org/x/net/context"
)
var (
numberedStates = map[swarm.TaskState]int64{
swarm.TaskStateNew: 1,
swarm.TaskStateAllocated: 2,
swarm.TaskStatePending: 3,
swarm.TaskStateAssigned: 4,
swarm.TaskStateAccepted: 5,
swarm.TaskStatePreparing: 6,
swarm.TaskStateReady: 7,
swarm.TaskStateStarting: 8,
swarm.TaskStateRunning: 9,
}
longestState int
)
const (
maxProgress = 9
maxProgressBars = 20
)
type progressUpdater interface {
update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error)
}
func init() {
for state := range numberedStates {
if len(state) > longestState {
longestState = len(state)
}
}
}
func stateToProgress(state swarm.TaskState, rollback bool) int64 {
if !rollback {
return numberedStates[state]
}
return int64(len(numberedStates)) - numberedStates[state]
}
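// Illustration (not part of the diff): with the nine numbered states
// above, the bar fills toward "running" during an update and drains
// during a rollback:
//
//	stateToProgress(swarm.TaskStateRunning, false) == 9 // full bar
//	stateToProgress(swarm.TaskStateRunning, true)  == 0 // empty bar
//	stateToProgress(swarm.TaskStateNew, true)      == 8 // nearly full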
// ServiceProgress outputs progress information for convergence of a service.
func ServiceProgress(ctx context.Context, client client.APIClient, serviceID string, progressWriter io.WriteCloser) error {
defer progressWriter.Close()
progressOut := streamformatter.NewJSONStreamFormatter().NewProgressOutput(progressWriter, false)
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Interrupt)
defer signal.Stop(sigint)
taskFilter := filters.NewArgs()
taskFilter.Add("service", serviceID)
taskFilter.Add("_up-to-date", "true")
getUpToDateTasks := func() ([]swarm.Task, error) {
return client.TaskList(ctx, types.TaskListOptions{Filters: taskFilter})
}
var (
updater progressUpdater
converged bool
convergedAt time.Time
monitor = 5 * time.Second
rollback bool
)
for {
service, _, err := client.ServiceInspectWithRaw(ctx, serviceID)
if err != nil {
return err
}
if service.Spec.UpdateConfig != nil && service.Spec.UpdateConfig.Monitor != 0 {
monitor = service.Spec.UpdateConfig.Monitor
}
if updater == nil {
updater, err = initializeUpdater(service, progressOut)
if err != nil {
return err
}
}
if service.UpdateStatus != nil {
switch service.UpdateStatus.State {
case swarm.UpdateStateUpdating:
rollback = false
case swarm.UpdateStateCompleted:
if !converged {
return nil
}
case swarm.UpdateStatePaused:
return fmt.Errorf("service update paused: %s", service.UpdateStatus.Message)
case swarm.UpdateStateRollbackStarted:
if !rollback && service.UpdateStatus.Message != "" {
progressOut.WriteProgress(progress.Progress{
ID: "rollback",
Action: service.UpdateStatus.Message,
})
}
rollback = true
case swarm.UpdateStateRollbackPaused:
return fmt.Errorf("service rollback paused: %s", service.UpdateStatus.Message)
case swarm.UpdateStateRollbackCompleted:
if !converged {
return fmt.Errorf("service rolled back: %s", service.UpdateStatus.Message)
}
}
}
if converged && time.Since(convergedAt) >= monitor {
return nil
}
tasks, err := getUpToDateTasks()
if err != nil {
return err
}
activeNodes, err := getActiveNodes(ctx, client)
if err != nil {
return err
}
converged, err = updater.update(service, tasks, activeNodes, rollback)
if err != nil {
return err
}
if converged {
if convergedAt.IsZero() {
convergedAt = time.Now()
}
wait := monitor - time.Since(convergedAt)
if wait >= 0 {
progressOut.WriteProgress(progress.Progress{
// Ideally this would have no ID, but
// the progress rendering code behaves
// poorly on an "action" with no ID. It
// returns the cursor to the beginning
// of the line, so the first character
// may be difficult to read. Then the
// output is overwritten by the shell
// prompt when the command finishes.
ID: "verify",
Action: fmt.Sprintf("Waiting %d seconds to verify that tasks are stable...", wait/time.Second+1),
})
}
} else {
if !convergedAt.IsZero() {
progressOut.WriteProgress(progress.Progress{
ID: "verify",
Action: "Detected task failure",
})
}
convergedAt = time.Time{}
}
select {
case <-time.After(200 * time.Millisecond):
case <-sigint:
if !converged {
progress.Message(progressOut, "", "Operation continuing in background.")
progress.Messagef(progressOut, "", "Use `docker service ps %s` to check progress.", serviceID)
}
return nil
}
}
}
func getActiveNodes(ctx context.Context, client client.APIClient) (map[string]swarm.Node, error) {
nodes, err := client.NodeList(ctx, types.NodeListOptions{})
if err != nil {
return nil, err
}
activeNodes := make(map[string]swarm.Node)
for _, n := range nodes {
if n.Status.State != swarm.NodeStateDown {
activeNodes[n.ID] = n
}
}
return activeNodes, nil
}
func initializeUpdater(service swarm.Service, progressOut progress.Output) (progressUpdater, error) {
if service.Spec.Mode.Replicated != nil && service.Spec.Mode.Replicated.Replicas != nil {
return &replicatedProgressUpdater{
progressOut: progressOut,
}, nil
}
if service.Spec.Mode.Global != nil {
return &globalProgressUpdater{
progressOut: progressOut,
}, nil
}
return nil, errors.New("unrecognized service mode")
}
func writeOverallProgress(progressOut progress.Output, numerator, denominator int, rollback bool) {
if rollback {
progressOut.WriteProgress(progress.Progress{
ID: "overall progress",
Action: fmt.Sprintf("rolling back update: %d out of %d tasks", numerator, denominator),
})
return
}
progressOut.WriteProgress(progress.Progress{
ID: "overall progress",
Action: fmt.Sprintf("%d out of %d tasks", numerator, denominator),
})
}
type replicatedProgressUpdater struct {
progressOut progress.Output
// used for mapping slots to a contiguous space
// this also causes progress bars to appear in order
slotMap map[int]int
initialized bool
done bool
}
func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) {
if service.Spec.Mode.Replicated == nil || service.Spec.Mode.Replicated.Replicas == nil {
return false, errors.New("no replica count")
}
replicas := *service.Spec.Mode.Replicated.Replicas
if !u.initialized {
u.slotMap = make(map[int]int)
// Draw progress bars in order
writeOverallProgress(u.progressOut, 0, int(replicas), rollback)
if replicas <= maxProgressBars {
for i := uint64(1); i <= replicas; i++ {
progress.Update(u.progressOut, fmt.Sprintf("%d/%d", i, replicas), " ")
}
}
u.initialized = true
}
// If there are multiple tasks with the same slot number, favor the one
// with the *lowest* desired state. This can happen in restart
// scenarios.
tasksBySlot := make(map[int]swarm.Task)
for _, task := range tasks {
if numberedStates[task.DesiredState] == 0 {
continue
}
if existingTask, ok := tasksBySlot[task.Slot]; ok {
if numberedStates[existingTask.DesiredState] <= numberedStates[task.DesiredState] {
continue
}
}
if _, nodeActive := activeNodes[task.NodeID]; nodeActive {
tasksBySlot[task.Slot] = task
}
}
// If we had reached a converged state, check if we are still converged.
if u.done {
for _, task := range tasksBySlot {
if task.Status.State != swarm.TaskStateRunning {
u.done = false
break
}
}
}
running := uint64(0)
for _, task := range tasksBySlot {
mappedSlot := u.slotMap[task.Slot]
if mappedSlot == 0 {
mappedSlot = len(u.slotMap) + 1
u.slotMap[task.Slot] = mappedSlot
}
if !u.done && replicas <= maxProgressBars && uint64(mappedSlot) <= replicas {
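// %-[1]*s left-justifies the state name in a column
// longestState characters wide so the bars line up.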
u.progressOut.WriteProgress(progress.Progress{
ID: fmt.Sprintf("%d/%d", mappedSlot, replicas),
Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State),
Current: stateToProgress(task.Status.State, rollback),
Total: maxProgress,
HideCounts: true,
})
}
if task.Status.State == swarm.TaskStateRunning {
running++
}
}
if !u.done {
writeOverallProgress(u.progressOut, int(running), int(replicas), rollback)
if running == replicas {
u.done = true
}
}
return running == replicas, nil
}
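// Illustration (not part of the diff): slot numbers reported by the
// orchestrator can be sparse, e.g. {4, 7, 2}. slotMap assigns bar
// indices in order of first appearance (4->1, 7->2, 2->3), so a
// three-replica service always renders bars 1/3, 2/3 and 3/3.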
type globalProgressUpdater struct {
progressOut progress.Output
initialized bool
done bool
}
func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) {
// If there are multiple tasks with the same node ID, favor the one
// with the *lowest* desired state. This can happen in restart
// scenarios.
tasksByNode := make(map[string]swarm.Task)
for _, task := range tasks {
if numberedStates[task.DesiredState] == 0 {
continue
}
if existingTask, ok := tasksByNode[task.NodeID]; ok {
if numberedStates[existingTask.DesiredState] <= numberedStates[task.DesiredState] {
continue
}
}
tasksByNode[task.NodeID] = task
}
// We don't have perfect knowledge of how many nodes meet the
// constraints for this service. But the orchestrator creates tasks
// for all eligible nodes at the same time, so we should see all those
// nodes represented among the up-to-date tasks.
nodeCount := len(tasksByNode)
if !u.initialized {
if nodeCount == 0 {
// Two possibilities: either the orchestrator hasn't created
// the tasks yet, or the service doesn't meet constraints for
// any node. Either way, we wait.
u.progressOut.WriteProgress(progress.Progress{
ID: "overall progress",
Action: "waiting for new tasks",
})
return false, nil
}
writeOverallProgress(u.progressOut, 0, nodeCount, rollback)
u.initialized = true
}
// If we had reached a converged state, check if we are still converged.
if u.done {
for _, task := range tasksByNode {
if task.Status.State != swarm.TaskStateRunning {
u.done = false
break
}
}
}
running := 0
for _, task := range tasksByNode {
if node, nodeActive := activeNodes[task.NodeID]; nodeActive {
if !u.done && nodeCount <= maxProgressBars {
u.progressOut.WriteProgress(progress.Progress{
ID: stringid.TruncateID(node.ID),
Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State),
Current: stateToProgress(task.Status.State, rollback),
Total: maxProgress,
HideCounts: true,
})
}
if task.Status.State == swarm.TaskStateRunning {
running++
}
}
}
if !u.done {
writeOverallProgress(u.progressOut, running, nodeCount, rollback)
if running == nodeCount {
u.done = true
}
}
return running == nodeCount, nil
}
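
Put together, the rendered output for a converged three-replica service looks roughly like this (illustrative only; the exact bar rendering comes from pkg/progress):

overall progress: 3 out of 3 tasks
1/3: running    [==================================================>]
2/3: running    [==================================================>]
3/3: running    [==================================================>]
verify: Waiting 5 seconds to verify that tasks are stable...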

View File: cli/command/service/update.go

@@ -31,7 +31,7 @@ func newUpdateCommand(dockerCli *command.DockerCli) *cobra.Command {
Short: "Update a service",
Args: cli.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
- return runUpdate(dockerCli, cmd.Flags(), args[0])
+ return runUpdate(dockerCli, cmd.Flags(), serviceOpts, args[0])
},
}
@@ -93,7 +93,7 @@ func newListOptsVar() *opts.ListOpts {
return opts.NewListOptsRef(&[]string{}, nil)
}
- func runUpdate(dockerCli *command.DockerCli, flags *pflag.FlagSet, serviceID string) error {
+ func runUpdate(dockerCli *command.DockerCli, flags *pflag.FlagSet, opts *serviceOptions, serviceID string) error {
apiClient := dockerCli.Client()
ctx := context.Background()
@@ -195,7 +195,16 @@ func runUpdate(dockerCli *command.DockerCli, flags *pflag.FlagSet, serviceID str
}
fmt.Fprintf(dockerCli.Out(), "%s\n", serviceID)
- return nil
+ if opts.detach {
+ 	if !flags.Changed("detach") {
+ 		fmt.Fprintln(dockerCli.Err(), "Since --detach=false was not specified, tasks will be updated in the background.\n"+
+ 			"In a future release, --detach=false will become the default.")
+ 	}
+ 	return nil
+ }
+ return waitOnService(ctx, dockerCli, serviceID, opts)
}
func updateService(flags *pflag.FlagSet, spec *swarm.ServiceSpec) error {