diff --git a/cli/command/service/create.go b/cli/command/service/create.go
index 4c709eb3a6..ad6dcb3c8f 100644
--- a/cli/command/service/create.go
+++ b/cli/command/service/create.go
@@ -31,7 +31,7 @@ func newCreateCommand(dockerCli command.Cli) *cobra.Command {
 		},
 	}
 	flags := cmd.Flags()
-	flags.StringVar(&opts.mode, flagMode, "replicated", "Service mode (replicated or global)")
+	flags.StringVar(&opts.mode, flagMode, "replicated", "Service mode (replicated, global, replicated-job, or global-job)")
 	flags.StringVar(&opts.name, flagName, "", "Service name")
 	addServiceFlags(flags, opts, buildServiceDefaultFlagMapping())
diff --git a/cli/command/service/formatter.go b/cli/command/service/formatter.go
index 08a8360f64..d283d87fdc 100644
--- a/cli/command/service/formatter.go
+++ b/cli/command/service/formatter.go
@@ -596,6 +596,10 @@ func (c *serviceContext) Mode() string {
 		return "global"
 	case c.service.Spec.Mode.Replicated != nil:
 		return "replicated"
+	case c.service.Spec.Mode.ReplicatedJob != nil:
+		return "replicated job"
+	case c.service.Spec.Mode.GlobalJob != nil:
+		return "global job"
 	default:
 		return ""
 	}
@@ -604,10 +608,33 @@ func (c *serviceContext) Mode() string {
 func (c *serviceContext) Replicas() string {
 	s := &c.service
 
-	var running, desired uint64
+	var running, desired, completed uint64
 	if s.ServiceStatus != nil {
 		running = c.service.ServiceStatus.RunningTasks
 		desired = c.service.ServiceStatus.DesiredTasks
+		completed = c.service.ServiceStatus.CompletedTasks
+	}
+	// for jobs, we will not include the max per node, even if it is set. jobs
+	// instead include the progress of the job as a whole, in addition to the
+	// current running state. the system still respects max per node, but if
+	// we included it in the list output, the lines for jobs would be entirely
+	// too long and would make the UI look bad.
+	if s.Spec.Mode.ReplicatedJob != nil {
+		return fmt.Sprintf(
+			"%d/%d (%d/%d completed)",
+			running, desired, completed, *s.Spec.Mode.ReplicatedJob.TotalCompletions,
+		)
+	}
+	if s.Spec.Mode.GlobalJob != nil {
+		// for global jobs, we need to do a little math. DesiredTasks counts
+		// only the tasks that have not yet reached the Completed state, and
+		// CompletedTasks counts the tasks that have. the TOTAL number of
+		// tasks to run is therefore the sum of the tasks still desired to
+		// complete and the tasks already completed.
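+		// (for example, a global job with 3 tasks already completed, 2 tasks
+		// still desired, and 1 of those 2 currently running renders as
+		// "1/2 (3/5 completed)".)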
+		return fmt.Sprintf(
+			"%d/%d (%d/%d completed)",
+			running, desired, completed, desired+completed,
+		)
 	}
 	if r := c.maxReplicas(); r > 0 {
 		return fmt.Sprintf("%d/%d (max %d per node)", running, desired, r)
diff --git a/cli/command/service/formatter_test.go b/cli/command/service/formatter_test.go
index ef2693f01c..31510e27bb 100644
--- a/cli/command/service/formatter_test.go
+++ b/cli/command/service/formatter_test.go
@@ -15,6 +15,13 @@ import (
 )
 
 func TestServiceContextWrite(t *testing.T) {
+	var (
+		// we need a pair of variables for setting the job parameters, because
+		// those parameters take pointers to uint64, and we can't take the
+		// address of a literal
+		varThree uint64 = 3
+		varTen   uint64 = 10
+	)
 	cases := []struct {
 		context  formatter.Context
 		expected string
@@ -38,6 +45,8 @@ func TestServiceContextWrite(t *testing.T) {
 01_baz baz global 1/3 *:80->8080/tcp
 04_qux2 qux2 replicated 3/3 (max 2 per node)
 03_qux10 qux10 replicated 2/3 (max 1 per node)
+05_job1 zarp1 replicated job 2/3 (5/10 completed)
+06_job2 zarp2 global job 1/1 (3/4 completed)
 `,
 		},
 		{
@@ -46,6 +55,8 @@
 01_baz
 04_qux2
 03_qux10
+05_job1
+06_job2
 `,
 		},
 		{
@@ -55,6 +66,8 @@ bar replicated
 baz global
 qux2 replicated
 qux10 replicated
+zarp1 replicated job
+zarp2 global job
 `,
 		},
 		{
@@ -64,6 +77,8 @@ bar
 baz
 qux2
 qux10
+zarp1
+zarp2
 `,
 		},
 		// Raw Format
@@ -77,6 +92,8 @@ qux10
 id: 01_baz
 id: 04_qux2
 id: 03_qux10
+id: 05_job1
+id: 06_job2
 `,
 		},
 		// Custom Format
@@ -86,6 +103,8 @@ id: 03_qux10
 baz
 qux2
 qux10
+zarp1
+zarp2
 `,
 		},
 	}
@@ -170,6 +189,37 @@ qux10
 				DesiredTasks: 3,
 			},
 		},
+		{
+			ID: "05_job1",
+			Spec: swarm.ServiceSpec{
+				Annotations: swarm.Annotations{Name: "zarp1"},
+				Mode: swarm.ServiceMode{
+					ReplicatedJob: &swarm.ReplicatedJob{
+						MaxConcurrent:    &varThree,
+						TotalCompletions: &varTen,
+					},
+				},
+			},
+			ServiceStatus: &swarm.ServiceStatus{
+				RunningTasks:   2,
+				DesiredTasks:   3,
+				CompletedTasks: 5,
+			},
+		},
+		{
+			ID: "06_job2",
+			Spec: swarm.ServiceSpec{
+				Annotations: swarm.Annotations{Name: "zarp2"},
+				Mode: swarm.ServiceMode{
+					GlobalJob: &swarm.GlobalJob{},
+				},
+			},
+			ServiceStatus: &swarm.ServiceStatus{
+				RunningTasks:   1,
+				DesiredTasks:   1,
+				CompletedTasks: 3,
+			},
+		},
 	}
 	out := bytes.NewBufferString("")
 	testcase.context.Output = out
diff --git a/cli/command/service/list.go b/cli/command/service/list.go
index 61e414c6b7..56cb3a23ba 100644
--- a/cli/command/service/list.go
+++ b/cli/command/service/list.go
@@ -111,6 +111,8 @@ func AppendServiceStatus(ctx context.Context, c client.APIClient, services []swa
 	status := map[string]*swarm.ServiceStatus{}
 	taskFilter := filters.NewArgs()
 	for i, s := range services {
+		// there is no need in this switch to check for job modes, because
+		// jobs were not supported until after ServiceStatus was introduced.
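+		// (any API version that supports jobs also supports ServiceStatus,
+		// so the manual fallback below never applies to job services.)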
 		switch {
 		case s.ServiceStatus != nil:
 			// Server already returned service-status, so we don't
diff --git a/cli/command/service/opts.go b/cli/command/service/opts.go
index e0beeba00e..ab2931e873 100644
--- a/cli/command/service/opts.go
+++ b/cli/command/service/opts.go
@@ -508,8 +508,9 @@ type serviceOptions struct {
 	resources resourceOptions
 	stopGrace opts.DurationOpt
 
-	replicas Uint64Opt
-	mode     string
+	replicas      Uint64Opt
+	mode          string
+	maxConcurrent Uint64Opt
 
 	restartPolicy restartPolicyOptions
 	constraints   opts.ListOpts
@@ -554,18 +555,45 @@ func (options *serviceOptions) ToServiceMode() (swarm.ServiceMode, error) {
 	switch options.mode {
 	case "global":
 		if options.replicas.Value() != nil {
-			return serviceMode, errors.Errorf("replicas can only be used with replicated mode")
+			return serviceMode, errors.Errorf("replicas can only be used with replicated or replicated-job mode")
 		}
 		if options.maxReplicas > 0 {
-			return serviceMode, errors.New("replicas-max-per-node can only be used with replicated mode")
+			return serviceMode, errors.New("replicas-max-per-node can only be used with replicated or replicated-job mode")
+		}
+		if options.maxConcurrent.Value() != nil {
+			return serviceMode, errors.New("max-concurrent can only be used with replicated-job mode")
 		}
 		serviceMode.Global = &swarm.GlobalService{}
 	case "replicated":
+		if options.maxConcurrent.Value() != nil {
+			return serviceMode, errors.New("max-concurrent can only be used with replicated-job mode")
+		}
+
 		serviceMode.Replicated = &swarm.ReplicatedService{
 			Replicas: options.replicas.Value(),
 		}
+	case "replicated-job":
+		concurrent := options.maxConcurrent.Value()
+		if concurrent == nil {
+			concurrent = options.replicas.Value()
+		}
+		serviceMode.ReplicatedJob = &swarm.ReplicatedJob{
+			MaxConcurrent:    concurrent,
+			TotalCompletions: options.replicas.Value(),
+		}
+	case "global-job":
+		if options.maxReplicas > 0 {
+			return serviceMode, errors.New("replicas-max-per-node can only be used with replicated or replicated-job mode")
+		}
+		if options.maxConcurrent.Value() != nil {
+			return serviceMode, errors.New("max-concurrent can only be used with replicated-job mode")
+		}
+		if options.replicas.Value() != nil {
+			return serviceMode, errors.Errorf("replicas can only be used with replicated or replicated-job mode")
+		}
+		serviceMode.GlobalJob = &swarm.GlobalJob{}
 	default:
-		return serviceMode, errors.Errorf("Unknown mode: %s, only replicated and global supported", options.mode)
+		return serviceMode, errors.Errorf("Unknown mode: %s, only replicated, global, replicated-job, and global-job supported", options.mode)
 	}
@@ -579,14 +607,13 @@ func (options *serviceOptions) ToStopGracePeriod(flags *pflag.FlagSet) *time.Dur
 	return nil
 }
 
-func (options *serviceOptions) ToService(ctx context.Context, apiClient client.NetworkAPIClient, flags *pflag.FlagSet) (swarm.ServiceSpec, error) {
-	var service swarm.ServiceSpec
-
+// makeEnv reads the environment variables from the command-line options and
+// returns a slice of strings for use in the service spec built by ToService.
+func (options *serviceOptions) makeEnv() ([]string, error) {
 	envVariables, err := opts.ReadKVEnvStrings(options.envFile.GetAll(), options.env.GetAll())
 	if err != nil {
-		return service, err
+		return nil, err
 	}
-
 	currentEnv := make([]string, 0, len(envVariables))
 	for _, env := range envVariables { // need to process each var, in order
 		k := strings.SplitN(env, "=", 2)[0]
@@ -601,6 +628,24 @@ func (options *serviceOptions) ToService(ctx context.Context, apiClient client.N
 		currentEnv = append(currentEnv, env)
 	}
 
+	return currentEnv, nil
+}
+
+// ToService takes the set of flags passed to the command and converts them
+// into a service
spec. +// +// Takes an API client as the second argument in order to resolve network names +// from the flags into network IDs. +// +// Returns an error if any flags are invalid or contradictory. +func (options *serviceOptions) ToService(ctx context.Context, apiClient client.NetworkAPIClient, flags *pflag.FlagSet) (swarm.ServiceSpec, error) { + var service swarm.ServiceSpec + + currentEnv, err := options.makeEnv() + if err != nil { + return service, err + } + healthConfig, err := options.healthcheck.toHealthConfig() if err != nil { return service, err @@ -611,6 +656,16 @@ func (options *serviceOptions) ToService(ctx context.Context, apiClient client.N return service, err } + updateConfig := options.update.updateConfig(flags) + rollbackConfig := options.rollback.rollbackConfig(flags) + + // update and rollback configuration is not supported for jobs. If these + // flags are not set, then the values will be nil. If they are non-nil, + // then return an error. + if (serviceMode.ReplicatedJob != nil || serviceMode.GlobalJob != nil) && (updateConfig != nil || rollbackConfig != nil) { + return service, errors.Errorf("update and rollback configuration is not supported for jobs") + } + networks := convertNetworks(options.networks) for i, net := range networks { nwID, err := resolveNetworkID(ctx, apiClient, net.Target) @@ -671,8 +726,8 @@ func (options *serviceOptions) ToService(ctx context.Context, apiClient client.N LogDriver: options.logDriver.toLogDriver(), }, Mode: serviceMode, - UpdateConfig: options.update.updateConfig(flags), - RollbackConfig: options.rollback.rollbackConfig(flags), + UpdateConfig: updateConfig, + RollbackConfig: rollbackConfig, EndpointSpec: options.endpoint.ToEndpointSpec(), } @@ -769,6 +824,8 @@ func addServiceFlags(flags *pflag.FlagSet, opts *serviceOptions, defaultFlagValu flags.Var(&opts.stopGrace, flagStopGracePeriod, flagDesc(flagStopGracePeriod, "Time to wait before force killing a container (ns|us|ms|s|m|h)")) flags.Var(&opts.replicas, flagReplicas, "Number of tasks") + flags.Var(&opts.maxConcurrent, flagConcurrent, "Number of job tasks to run concurrently (default equal to --replicas)") + flags.SetAnnotation(flagConcurrent, "version", []string{"1.41"}) flags.Uint64Var(&opts.maxReplicas, flagMaxReplicas, defaultFlagValues.getUint64(flagMaxReplicas), "Maximum number of tasks per node (default 0 = unlimited)") flags.SetAnnotation(flagMaxReplicas, "version", []string{"1.40"}) @@ -878,6 +935,7 @@ const ( flagLimitCPU = "limit-cpu" flagLimitMemory = "limit-memory" flagMaxReplicas = "replicas-max-per-node" + flagConcurrent = "max-concurrent" flagMode = "mode" flagMount = "mount" flagMountRemove = "mount-rm" diff --git a/cli/command/service/opts_test.go b/cli/command/service/opts_test.go index a480f9c5c9..41560db698 100644 --- a/cli/command/service/opts_test.go +++ b/cli/command/service/opts_test.go @@ -285,7 +285,7 @@ func TestToServiceMaxReplicasGlobalModeConflict(t *testing.T) { maxReplicas: 1, } _, err := opt.ToServiceMode() - assert.Error(t, err, "replicas-max-per-node can only be used with replicated mode") + assert.Error(t, err, "replicas-max-per-node can only be used with replicated or replicated-job mode") } func TestToServiceSysCtls(t *testing.T) { diff --git a/cli/command/service/progress/progress.go b/cli/command/service/progress/progress.go index 4b9cdd7337..3f118d81a4 100644 --- a/cli/command/service/progress/progress.go +++ b/cli/command/service/progress/progress.go @@ -7,6 +7,7 @@ import ( "io" "os" "os/signal" + "strconv" "strings" "time" @@ -45,6 +46,7 
@@ var (
 const (
 	maxProgress     = 9
 	maxProgressBars = 20
+	maxJobProgress  = 10
 )
 
 type progressUpdater interface {
@@ -53,7 +55,9 @@ type progressUpdater interface {
 
 func init() {
 	for state := range numberedStates {
-		if !terminalState(state) && len(state) > longestState {
+		// for jobs, we use the "complete" state, and so it should be factored
+		// into the computation of the longest state.
+		if (!terminalState(state) || state == swarm.TaskStateComplete) && len(state) > longestState {
 			longestState = len(state)
 		}
 	}
@@ -164,6 +168,18 @@ func ServiceProgress(ctx context.Context, client client.APIClient, serviceID str
 			return err
 		}
 		if converged {
+			// if the service is a job, there's no need to verify it. jobs
+			// stay done once they're done. skip the verification and just end
+			// the progress monitoring.
+			//
+			// only job services have a non-nil job status, which means we can
+			// use the presence of this field to check if the service is a job
+			// here.
+			if service.JobStatus != nil {
+				progress.Message(progressOut, "", "job complete")
+				return nil
+			}
+
 			if convergedAt.IsZero() {
 				convergedAt = time.Now()
 			}
@@ -230,6 +246,14 @@ func initializeUpdater(service swarm.Service, progressOut progress.Output) (prog
 			progressOut: progressOut,
 		}, nil
 	}
+	if service.Spec.Mode.ReplicatedJob != nil {
+		return newReplicatedJobProgressUpdater(service, progressOut), nil
+	}
+	if service.Spec.Mode.GlobalJob != nil {
+		return &globalJobProgressUpdater{
+			progressOut: progressOut,
+		}, nil
+	}
 	return nil, errors.New("unrecognized service mode")
 }
 
@@ -502,3 +526,322 @@ func (u *globalProgressUpdater) writeTaskProgress(task swarm.Task, nodeCount int
 		})
 	}
 }
+
+// replicatedJobProgressUpdater outputs the progress of a replicated job. This
+// progress consists of a few main elements.
+//
+// The first is the progress bar for the job as a whole. This shows the number
+// of completed out of total tasks for the job. Tasks that are currently
+// running are not counted.
+//
+// The second is the status of the "active" tasks for the job. We count a task
+// as "active" if it has any non-terminal state, not just running. This is
+// shown as a fraction of the maximum concurrent tasks that can be running,
+// which is the lesser of MaxConcurrent or TotalCompletions - completed tasks.
+type replicatedJobProgressUpdater struct {
+	progressOut progress.Output
+
+	// jobIteration is the service's job iteration, used to exclude tasks
+	// belonging to earlier iterations.
+	jobIteration uint64
+
+	// concurrent is the value of MaxConcurrent as an int. That is, the maximum
+	// number of tasks allowed to be run simultaneously.
+	concurrent int
+
+	// total is the value of TotalCompletions, the number of complete tasks
+	// desired.
+	total int
+
+	// initialized is set to true after the first time update is called. on
+	// that first call, the components of the progress UI are all written out
+	// in an initial pass. this ensures that they will subsequently be in
+	// order, no matter how they are updated.
+	initialized bool
+
+	// progressDigits is the number of digits in total, so that we know how
+	// much to pad the job progress field with.
+	//
+	// when we're writing the number of completed over total tasks, we need to
+	// pad the numerator with spaces, so that the bar doesn't jump around.
+	// we'll compute that once on init, and then reuse it over and over.
+	//
+	// we compute this in the least clever way possible: convert to string
+	// with strconv.Itoa, then take the len.
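+	// (for example, total = 50 gives len(strconv.Itoa(50)) == 2, so a fresh
+	// job renders as " 0 out of 50 complete", with the zero padded to two
+	// characters.)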
+	progressDigits int
+
+	// activeDigits is the same, but for active tasks, and it applies to both
+	// the numerator and denominator.
+	activeDigits int
+}
+
+func newReplicatedJobProgressUpdater(service swarm.Service, progressOut progress.Output) *replicatedJobProgressUpdater {
+	u := &replicatedJobProgressUpdater{
+		progressOut:  progressOut,
+		concurrent:   int(*service.Spec.Mode.ReplicatedJob.MaxConcurrent),
+		total:        int(*service.Spec.Mode.ReplicatedJob.TotalCompletions),
+		jobIteration: service.JobStatus.JobIteration.Index,
+	}
+	u.progressDigits = len(strconv.Itoa(u.total))
+	u.activeDigits = len(strconv.Itoa(u.concurrent))
+
+	return u
+}
+
+// update writes out the progress of the replicated job.
+func (u *replicatedJobProgressUpdater) update(_ swarm.Service, tasks []swarm.Task, _ map[string]struct{}, _ bool) (bool, error) {
+	if !u.initialized {
+		u.writeOverallProgress(0, 0)
+
+		// only write out individual progress bars if there are no more than
+		// the maximum
+		if u.total <= maxProgressBars {
+			for i := 1; i <= u.total; i++ {
+				u.progressOut.WriteProgress(progress.Progress{
+					ID:     fmt.Sprintf("%d/%d", i, u.total),
+					Action: " ",
+				})
+			}
+		}
+		u.initialized = true
+	}
+
+	// tasksBySlot is a mapping of slot number to the task valid for that slot.
+	// it deduplicates tasks occupying the same numerical slot but in different
+	// states.
+	tasksBySlot := make(map[int]swarm.Task)
+	for _, task := range tasks {
+		// first, check if the task belongs to this service iteration. skip
+		// tasks belonging to other iterations.
+		if task.JobIteration == nil || task.JobIteration.Index != u.jobIteration {
+			continue
+		}
+
+		// then, if the task is in an unknown state, ignore it.
+		if numberedStates[task.DesiredState] == 0 ||
+			numberedStates[task.Status.State] == 0 {
+			continue
+		}
+
+		// finally, check if the task already exists in the map
+		if existing, ok := tasksBySlot[task.Slot]; ok {
+			// if so, use the task with the lower actual state
+			if numberedStates[existing.Status.State] > numberedStates[task.Status.State] {
+				tasksBySlot[task.Slot] = task
+			}
+		} else {
+			// otherwise, just add it to the map.
+			tasksBySlot[task.Slot] = task
+		}
+	}
+
+	activeTasks := 0
+	completeTasks := 0
+
+	for i := 0; i < len(tasksBySlot); i++ {
+		task := tasksBySlot[i]
+		u.writeTaskProgress(task)
+
+		if numberedStates[task.Status.State] < numberedStates[swarm.TaskStateComplete] {
+			activeTasks++
+		}
+
+		if task.Status.State == swarm.TaskStateComplete {
+			completeTasks++
+		}
+	}
+
+	u.writeOverallProgress(activeTasks, completeTasks)
+
+	return completeTasks == u.total, nil
+}
+
+func (u *replicatedJobProgressUpdater) writeOverallProgress(active, completed int) {
+	u.progressOut.WriteProgress(progress.Progress{
+		ID: "job progress",
+		Action: fmt.Sprintf(
+			// * means "use the next positional arg to compute padding"
+			"%*d out of %d complete", u.progressDigits, completed, u.total,
+		),
+		Current:    int64(completed),
+		Total:      int64(u.total),
+		HideCounts: true,
+	})
+
+	// actualDesired is the lesser of MaxConcurrent and the remaining tasks
+	actualDesired := u.total - completed
+	if actualDesired > u.concurrent {
+		actualDesired = u.concurrent
+	}
+
+	u.progressOut.WriteProgress(progress.Progress{
+		ID: "active tasks",
+		Action: fmt.Sprintf(
+			// [n] notation lets us select a specific argument, 1-indexed
+			// putting the [1] before the star means "make the string this
+			// length".
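+			//
+			// for example, fmt.Sprintf("%[1]*[2]d out of %[1]*[3]d tasks", 2, 9, 10)
+			// renders as " 9 out of 10 tasks".
+			//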
putting the [2] or the [3] means "use this argument + // here" + // + // we pad both the numerator and the denominator because, as the + // job reaches its conclusion, the number of possible concurrent + // tasks will go down, as fewer than MaxConcurrent tasks are needed + // to complete the job. + "%[1]*[2]d out of %[1]*[3]d tasks", u.activeDigits, active, actualDesired, + ), + }) +} + +func (u *replicatedJobProgressUpdater) writeTaskProgress(task swarm.Task) { + if u.total > maxProgressBars { + return + } + + if task.Status.Err != "" { + u.progressOut.WriteProgress(progress.Progress{ + ID: fmt.Sprintf("%d/%d", task.Slot+1, u.total), + Action: truncError(task.Status.Err), + }) + return + } + + u.progressOut.WriteProgress(progress.Progress{ + ID: fmt.Sprintf("%d/%d", task.Slot+1, u.total), + Action: fmt.Sprintf("%-*s", longestState, task.Status.State), + Current: numberedStates[task.Status.State], + Total: maxJobProgress, + HideCounts: true, + }) +} + +// globalJobProgressUpdater is the progressUpdater for GlobalJob-mode services. +// Because GlobalJob services are so much simpler than ReplicatedJob services, +// this updater is in turn simpler as well. +type globalJobProgressUpdater struct { + progressOut progress.Output + + // initialized is used to detect the first pass of update, and to perform + // first time initialization logic at that time. + initialized bool + + // total is the total number of tasks expected for this job + total int + + // progressDigits is the number of spaces to pad the numerator of the job + // progress field + progressDigits int + + taskNodes map[string]struct{} +} + +func (u *globalJobProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, _ bool) (bool, error) { + if !u.initialized { + // if there are not yet tasks, then return early. + if len(tasks) == 0 && len(activeNodes) != 0 { + u.progressOut.WriteProgress(progress.Progress{ + ID: "job progress", + Action: "waiting for tasks", + }) + return false, nil + } + + // when a global job starts, all of its tasks are created at once, so + // we can use len(tasks) to know how many we're expecting. + u.taskNodes = map[string]struct{}{} + + for _, task := range tasks { + // skip any tasks not belonging to this job iteration. + if task.JobIteration == nil || task.JobIteration.Index != service.JobStatus.JobIteration.Index { + continue + } + + // collect the list of all node IDs for this service. + // + // basically, global jobs will execute on any new nodes that join + // the cluster in the future. to avoid making things complicated, + // we will only check the progress of the initial set of nodes. if + // any new nodes come online during the operation, we will ignore + // them. + u.taskNodes[task.NodeID] = struct{}{} + } + + u.total = len(u.taskNodes) + u.progressDigits = len(strconv.Itoa(u.total)) + + u.writeOverallProgress(0) + u.initialized = true + } + + // tasksByNodeID maps a NodeID to the latest task for that Node ID. this + // lets us pick only the latest task for any given node. + tasksByNodeID := map[string]swarm.Task{} + + for _, task := range tasks { + // skip any tasks not belonging to this job iteration + if task.JobIteration == nil || task.JobIteration.Index != service.JobStatus.JobIteration.Index { + continue + } + + // if the task is not on one of the initial set of nodes, ignore it. 
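+		// (this keeps the task counts consistent with u.total, which was
+		// computed from the initial set of nodes.)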
+		if _, ok := u.taskNodes[task.NodeID]; !ok {
+			continue
+		}
+
+		// if there is already a task recorded for this node, choose the one
+		// with the lower state
+		if oldtask, ok := tasksByNodeID[task.NodeID]; ok {
+			if numberedStates[oldtask.Status.State] > numberedStates[task.Status.State] {
+				tasksByNodeID[task.NodeID] = task
+			}
+		} else {
+			tasksByNodeID[task.NodeID] = task
+		}
+	}
+
+	complete := 0
+	for _, task := range tasksByNodeID {
+		u.writeTaskProgress(task)
+		if task.Status.State == swarm.TaskStateComplete {
+			complete++
+		}
+	}
+
+	u.writeOverallProgress(complete)
+	return complete == u.total, nil
+}
+
+func (u *globalJobProgressUpdater) writeTaskProgress(task swarm.Task) {
+	if u.total > maxProgressBars {
+		return
+	}
+
+	if task.Status.Err != "" {
+		u.progressOut.WriteProgress(progress.Progress{
+			ID:     task.NodeID,
+			Action: truncError(task.Status.Err),
+		})
+		return
+	}
+
+	u.progressOut.WriteProgress(progress.Progress{
+		ID:         task.NodeID,
+		Action:     fmt.Sprintf("%-*s", longestState, task.Status.State),
+		Current:    numberedStates[task.Status.State],
+		Total:      maxJobProgress,
+		HideCounts: true,
+	})
+}
+
+func (u *globalJobProgressUpdater) writeOverallProgress(complete int) {
+	// all tasks for a global job are active at once, so we only write out the
+	// total progress.
+	u.progressOut.WriteProgress(progress.Progress{
+		// see (*replicatedJobProgressUpdater).writeOverallProgress for an
+		// explanation of the advanced fmt use in this function.
+		ID: "job progress",
+		Action: fmt.Sprintf(
+			"%*d out of %d complete", u.progressDigits, complete, u.total,
+		),
+		Current:    int64(complete),
+		Total:      int64(u.total),
+		HideCounts: true,
+	})
+}
diff --git a/cli/command/service/progress/progress_test.go b/cli/command/service/progress/progress_test.go
index d45ba7c289..0ff83d2787 100644
--- a/cli/command/service/progress/progress_test.go
+++ b/cli/command/service/progress/progress_test.go
@@ -42,6 +42,20 @@ func (u updaterTester) testUpdater(tasks []swarm.Task, expectedConvergence bool,
 	assert.Check(u.t, is.DeepEqual(expectedProgress, u.p.p))
 }
 
+func (u updaterTester) testUpdaterNoOrder(tasks []swarm.Task, expectedConvergence bool, expectedProgress []progress.Progress) {
+	u.p.clear()
+	converged, err := u.updater.update(u.service, tasks, u.activeNodes, u.rollback)
+	assert.Check(u.t, err)
+	assert.Check(u.t, is.Equal(expectedConvergence, converged))
+
+	// instead of checking that expected and actual match exactly, verify that
+	// they are the same length, and every item from expected is in actual.
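+	// (the order can't be relied on for global jobs, because tasks are
+	// collected into a map keyed by node ID, and Go randomizes map iteration
+	// order.)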
+ assert.Check(u.t, is.Equal(len(expectedProgress), len(u.p.p))) + for _, prog := range expectedProgress { + assert.Check(u.t, is.Contains(u.p.p, prog)) + } +} + func TestReplicatedProgressUpdaterOneReplica(t *testing.T) { replicas := uint64(1) @@ -373,3 +387,511 @@ func TestGlobalProgressUpdaterManyNodes(t *testing.T) { }) } } + +func TestReplicatedJobProgressUpdaterSmall(t *testing.T) { + concurrent := uint64(2) + total := uint64(5) + + service := swarm.Service{ + Spec: swarm.ServiceSpec{ + Mode: swarm.ServiceMode{ + ReplicatedJob: &swarm.ReplicatedJob{ + MaxConcurrent: &concurrent, + TotalCompletions: &total, + }, + }, + }, + JobStatus: &swarm.JobStatus{ + JobIteration: swarm.Version{Index: 1}, + }, + } + + p := &mockProgress{} + ut := updaterTester{ + t: t, + updater: newReplicatedJobProgressUpdater(service, p), + p: p, + activeNodes: map[string]struct{}{"a": {}, "b": {}}, + service: service, + } + + // create some tasks belonging to a previous iteration + tasks := []swarm.Task{ + { + ID: "oldtask1", + Slot: 0, + NodeID: "", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: 0}, + }, { + ID: "oldtask2", + Slot: 1, + NodeID: "", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateComplete}, + JobIteration: &swarm.Version{Index: 0}, + }, + } + + ut.testUpdater(tasks, false, []progress.Progress{ + // on the initial pass, we draw all of the progress bars at once, which + // puts them in order for the rest of the operation + {ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "0 out of 2 tasks"}, + {ID: "1/5", Action: " "}, + {ID: "2/5", Action: " "}, + {ID: "3/5", Action: " "}, + {ID: "4/5", Action: " "}, + {ID: "5/5", Action: " "}, + // from here on, we draw as normal. as a side effect, we will have a + // second update for the job progress and active tasks. This has no + // practical effect on the UI, it's just a side effect of the update + // logic. 
+ {ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "0 out of 2 tasks"}, + }) + + // wipe the old tasks out of the list + tasks = []swarm.Task{} + tasks = append(tasks, + swarm.Task{ + ID: "task1", + Slot: 0, + NodeID: "", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index}, + }, + swarm.Task{ + ID: "task2", + Slot: 1, + NodeID: "", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index}, + }, + ) + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "1/5", Action: "new ", Current: 1, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "new ", Current: 1, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "2 out of 2 tasks"}, + }) + + tasks[0].Status.State = swarm.TaskStatePreparing + tasks[1].Status.State = swarm.TaskStateAssigned + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "1/5", Action: "preparing", Current: 6, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "assigned ", Current: 4, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "2 out of 2 tasks"}, + }) + + tasks[0].Status.State = swarm.TaskStateRunning + tasks[1].Status.State = swarm.TaskStatePreparing + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "1/5", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "preparing", Current: 6, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "2 out of 2 tasks"}, + }) + + tasks[0].Status.State = swarm.TaskStateComplete + tasks[1].Status.State = swarm.TaskStateComplete + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "2 out of 5 complete", Current: 2, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "0 out of 2 tasks"}, + }) + + tasks = append(tasks, + swarm.Task{ + ID: "task3", + Slot: 2, + NodeID: "", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index}, + }, + swarm.Task{ + ID: "task4", + Slot: 3, + NodeID: "", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index}, + }, + ) + + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "3/5", Action: "new ", Current: 1, Total: 10, HideCounts: true}, + {ID: "4/5", Action: "new ", Current: 1, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "2 out of 5 complete", Current: 2, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "2 out of 2 tasks"}, + }) + + tasks[2].Status.State = swarm.TaskStateRunning + tasks[3].Status.State = swarm.TaskStateRunning + 
ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "3/5", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "4/5", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "2 out of 5 complete", Current: 2, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "2 out of 2 tasks"}, + }) + + tasks[3].Status.State = swarm.TaskStateComplete + tasks = append(tasks, + swarm.Task{ + ID: "task5", + Slot: 4, + NodeID: "", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateRunning}, + JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index}, + }, + ) + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "3/5", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "4/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "5/5", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "3 out of 5 complete", Current: 3, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "2 out of 2 tasks"}, + }) + + tasks[2].Status.State = swarm.TaskStateFailed + tasks[2].Status.Err = "the task failed" + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "3/5", Action: "the task failed"}, + {ID: "4/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "5/5", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "3 out of 5 complete", Current: 3, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "1 out of 2 tasks"}, + }) + + tasks[4].Status.State = swarm.TaskStateComplete + tasks = append(tasks, + swarm.Task{ + ID: "task6", + Slot: 2, + NodeID: "", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateRunning}, + JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index}, + }, + ) + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "3/5", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "4/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "5/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "4 out of 5 complete", Current: 4, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "1 out of 1 tasks"}, + }) + + tasks[5].Status.State = swarm.TaskStateComplete + ut.testUpdater(tasks, true, []progress.Progress{ + {ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "3/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "4/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "5/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "5 out of 5 complete", Current: 5, Total: 5, HideCounts: 
true}, + {ID: "active tasks", Action: "0 out of 0 tasks"}, + }) +} + +func TestReplicatedJobProgressUpdaterLarge(t *testing.T) { + concurrent := uint64(10) + total := uint64(50) + + service := swarm.Service{ + Spec: swarm.ServiceSpec{ + Mode: swarm.ServiceMode{ + ReplicatedJob: &swarm.ReplicatedJob{ + MaxConcurrent: &concurrent, + TotalCompletions: &total, + }, + }, + }, + JobStatus: &swarm.JobStatus{ + JobIteration: swarm.Version{Index: 0}, + }, + } + + p := &mockProgress{} + ut := updaterTester{ + t: t, + updater: newReplicatedJobProgressUpdater(service, p), + p: p, + activeNodes: map[string]struct{}{"a": {}, "b": {}}, + service: service, + } + + tasks := []swarm.Task{} + + // see the comments in TestReplicatedJobProgressUpdaterSmall for why + // we write this out twice. + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true}, + {ID: "active tasks", Action: " 0 out of 10 tasks"}, + // we don't write out individual status bars for a large job, only the + // overall progress bar + {ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true}, + {ID: "active tasks", Action: " 0 out of 10 tasks"}, + }) + + // first, create the initial batch of running tasks + for i := 0; i < int(concurrent); i++ { + tasks = append(tasks, swarm.Task{ + ID: strconv.Itoa(i), + Slot: i, + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: 0}, + }) + + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true}, + {ID: "active tasks", Action: fmt.Sprintf("%2d out of 10 tasks", i+1)}, + }) + } + + // now, start moving tasks to completed, and starting new tasks after them. + // to do this, we'll start at 0, mark a task complete, and then append a + // new one. we'll stop before we get to the end, because the end has a + // steadily decreasing denominator for the active tasks + // + // for 10 concurrent 50 total, this means we'll stop at 50 - 10 = 40 tasks + // in the completed state, 10 tasks running. the last index in use will be + // 39. + for i := 0; i < int(total)-int(concurrent); i++ { + tasks[i].Status.State = swarm.TaskStateComplete + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "job progress", Action: fmt.Sprintf("%2d out of 50 complete", i+1), Current: int64(i + 1), Total: 50, HideCounts: true}, + {ID: "active tasks", Action: " 9 out of 10 tasks"}, + }) + + last := len(tasks) + tasks = append(tasks, swarm.Task{ + ID: strconv.Itoa(last), + Slot: last, + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: 0}, + }) + + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "job progress", Action: fmt.Sprintf("%2d out of 50 complete", i+1), Current: int64(i + 1), Total: 50, HideCounts: true}, + {ID: "active tasks", Action: "10 out of 10 tasks"}, + }) + } + + // quick check, to make sure we did the math right when we wrote this code: + // we do have 50 tasks in the slice, right? + assert.Check(t, is.Equal(len(tasks), int(total))) + + // now, we're down to our last 10 tasks, which are all running. 
We need to + // wind these down + for i := int(total) - int(concurrent) - 1; i < int(total); i++ { + tasks[i].Status.State = swarm.TaskStateComplete + ut.testUpdater(tasks, (i+1 == int(total)), []progress.Progress{ + {ID: "job progress", Action: fmt.Sprintf("%2d out of 50 complete", i+1), Current: int64(i + 1), Total: 50, HideCounts: true}, + {ID: "active tasks", Action: fmt.Sprintf("%2[1]d out of %2[1]d tasks", int(total)-(i+1))}, + }) + } +} + +func TestGlobalJobProgressUpdaterSmall(t *testing.T) { + service := swarm.Service{ + Spec: swarm.ServiceSpec{ + Mode: swarm.ServiceMode{ + GlobalJob: &swarm.GlobalJob{}, + }, + }, + JobStatus: &swarm.JobStatus{ + JobIteration: swarm.Version{Index: 1}, + }, + } + + p := &mockProgress{} + ut := updaterTester{ + t: t, + updater: &globalJobProgressUpdater{ + progressOut: p, + }, + p: p, + activeNodes: map[string]struct{}{"a": {}, "b": {}, "c": {}}, + service: service, + } + + tasks := []swarm.Task{ + { + ID: "oldtask1", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateComplete}, + JobIteration: &swarm.Version{Index: 0}, + NodeID: "a", + }, { + ID: "oldtask2", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateComplete}, + JobIteration: &swarm.Version{Index: 0}, + NodeID: "b", + }, { + ID: "task1", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: 1}, + NodeID: "a", + }, { + ID: "task2", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: 1}, + NodeID: "b", + }, { + ID: "task3", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: 1}, + NodeID: "c", + }, + } + + // we don't know how many tasks will be created until we get the initial + // task list, so we should not write out any definitive answers yet. 
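+	// the "waiting for tasks" message asserted below is that non-answer.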
+ ut.testUpdater([]swarm.Task{}, false, []progress.Progress{ + {ID: "job progress", Action: "waiting for tasks"}, + }) + + ut.testUpdaterNoOrder(tasks, false, []progress.Progress{ + {ID: "job progress", Action: "0 out of 3 complete", Current: 0, Total: 3, HideCounts: true}, + {ID: "a", Action: "new ", Current: 1, Total: 10, HideCounts: true}, + {ID: "b", Action: "new ", Current: 1, Total: 10, HideCounts: true}, + {ID: "c", Action: "new ", Current: 1, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "0 out of 3 complete", Current: 0, Total: 3, HideCounts: true}, + }) + + tasks[2].Status.State = swarm.TaskStatePreparing + tasks[3].Status.State = swarm.TaskStateRunning + tasks[4].Status.State = swarm.TaskStateAccepted + ut.testUpdaterNoOrder(tasks, false, []progress.Progress{ + {ID: "a", Action: "preparing", Current: 6, Total: 10, HideCounts: true}, + {ID: "b", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "c", Action: "accepted ", Current: 5, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "0 out of 3 complete", Current: 0, Total: 3, HideCounts: true}, + }) + + tasks[2].Status.State = swarm.TaskStateRunning + tasks[3].Status.State = swarm.TaskStateComplete + tasks[4].Status.State = swarm.TaskStateRunning + ut.testUpdaterNoOrder(tasks, false, []progress.Progress{ + {ID: "a", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "b", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "c", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "1 out of 3 complete", Current: 1, Total: 3, HideCounts: true}, + }) + + tasks[2].Status.State = swarm.TaskStateFailed + tasks[2].Status.Err = "task failed" + tasks[4].Status.State = swarm.TaskStateComplete + ut.testUpdaterNoOrder(tasks, false, []progress.Progress{ + {ID: "a", Action: "task failed"}, + {ID: "b", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "c", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "2 out of 3 complete", Current: 2, Total: 3, HideCounts: true}, + }) + + tasks = append(tasks, swarm.Task{ + ID: "task4", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStatePreparing}, + NodeID: tasks[2].NodeID, + JobIteration: &swarm.Version{Index: 1}, + }) + + ut.testUpdaterNoOrder(tasks, false, []progress.Progress{ + {ID: "a", Action: "preparing", Current: 6, Total: 10, HideCounts: true}, + {ID: "b", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "c", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "2 out of 3 complete", Current: 2, Total: 3, HideCounts: true}, + }) + + tasks[5].Status.State = swarm.TaskStateComplete + ut.testUpdaterNoOrder(tasks, true, []progress.Progress{ + {ID: "a", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "b", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "c", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "3 out of 3 complete", Current: 3, Total: 3, HideCounts: true}, + }) +} + +func TestGlobalJobProgressUpdaterLarge(t *testing.T) { + service := swarm.Service{ + Spec: swarm.ServiceSpec{ + Mode: swarm.ServiceMode{ + GlobalJob: &swarm.GlobalJob{}, + }, + }, + JobStatus: &swarm.JobStatus{ + JobIteration: swarm.Version{Index: 1}, + }, + } + + activeNodes := map[string]struct{}{} + for i := 0; i < 50; i++ { + 
activeNodes[fmt.Sprintf("node%v", i)] = struct{}{} + } + + p := &mockProgress{} + ut := updaterTester{ + t: t, + updater: &globalJobProgressUpdater{ + progressOut: p, + }, + p: p, + activeNodes: activeNodes, + service: service, + } + + tasks := []swarm.Task{} + for nodeID := range activeNodes { + tasks = append(tasks, swarm.Task{ + ID: fmt.Sprintf("task%s", nodeID), + NodeID: nodeID, + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{ + State: swarm.TaskStateNew, + }, + JobIteration: &swarm.Version{Index: 1}, + }) + } + + // no bars, because too many tasks + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true}, + {ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true}, + }) + + for i := range tasks { + tasks[i].Status.State = swarm.TaskStateComplete + ut.testUpdater(tasks, i+1 == len(activeNodes), []progress.Progress{ + { + ID: "job progress", + Action: fmt.Sprintf("%2d out of 50 complete", i+1), + Current: int64(i + 1), Total: 50, HideCounts: true, + }, + }) + } +} diff --git a/cli/command/service/scale.go b/cli/command/service/scale.go index 5b656a7f33..1184241a48 100644 --- a/cli/command/service/scale.go +++ b/cli/command/service/scale.go @@ -102,12 +102,14 @@ func runServiceScale(ctx context.Context, dockerCli command.Cli, serviceID strin } serviceMode := &service.Spec.Mode - if serviceMode.Replicated == nil { - return errors.Errorf("scale can only be used with replicated mode") + if serviceMode.Replicated != nil { + serviceMode.Replicated.Replicas = &scale + } else if serviceMode.ReplicatedJob != nil { + serviceMode.ReplicatedJob.TotalCompletions = &scale + } else { + return errors.Errorf("scale can only be used with replicated or replicated-job mode") } - serviceMode.Replicated.Replicas = &scale - response, err := client.ServiceUpdate(ctx, service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{}) if err != nil { return err diff --git a/cli/command/service/testdata/service-context-write-raw.golden b/cli/command/service/testdata/service-context-write-raw.golden index feb100c9d7..fa78876c6a 100644 --- a/cli/command/service/testdata/service-context-write-raw.golden +++ b/cli/command/service/testdata/service-context-write-raw.golden @@ -26,3 +26,17 @@ replicas: 2/3 (max 1 per node) image: ports: +id: 05_job1 +name: zarp1 +mode: replicated job +replicas: 2/3 (5/10 completed) +image: +ports: + +id: 06_job2 +name: zarp2 +mode: global job +replicas: 1/1 (3/4 completed) +image: +ports: + diff --git a/docs/reference/commandline/service_create.md b/docs/reference/commandline/service_create.md index 4ae820fcc9..9088758b3a 100644 --- a/docs/reference/commandline/service_create.md +++ b/docs/reference/commandline/service_create.md @@ -41,7 +41,8 @@ Options: --limit-memory bytes Limit Memory --log-driver string Logging driver for service --log-opt list Logging driver options - --mode string Service mode (replicated or global) (default "replicated") + --max-concurrent Number of job tasks to run at once (default equal to --replicas) + --mode string Service mode (replicated, global, replicated-job, or global-job) (default "replicated") --mount mount Attach a filesystem mount to the service --name string Service name --network network Network attachments @@ -1115,6 +1116,66 @@ $ docker service create \ nvidia/cuda ``` +### Running as a job + +Jobs are a special kind of service designed to run an operation to completion +and then stop, as 
opposed to running long-running daemons. When a Task
+belonging to a job exits successfully (return value 0), the Task is marked as
+"Completed", and is not run again.
+
+Jobs are started by using one of two modes, `replicated-job` or `global-job`:
+
+```bash
+$ docker service create --name myjob \
+    --mode replicated-job \
+    bash "true"
+```
+
+This command will run one Task, which uses the `bash` image to execute the
+command `true`, which returns 0 and then exits.
+
+Though Jobs are ultimately a different kind of service, they have a couple of
+caveats compared to other services:
+
+- None of the update or rollback configuration options are valid. Jobs can be
+  updated, but cannot be rolled out or rolled back, making these configuration
+  options moot.
+- Jobs are never restarted on reaching the `Complete` state. This means that
+  for jobs, setting `--restart-condition` to `any` is the same as setting it to
+  `on-failure`.
+
+Jobs are available in both replicated and global modes.
+
+#### Replicated Jobs
+
+A replicated job is like a replicated service. Setting the `--replicas` flag
+will specify the total number of iterations of a job to execute.
+
+By default, all replicas of a replicated job will launch at once. To control
+the number of replicas executing at any one time, the `--max-concurrent` flag
+can be used:
+
+```bash
+$ docker service create --name mythrottledjob \
+    --mode replicated-job \
+    --replicas 10 \
+    --max-concurrent 2 \
+    bash "true"
+```
+
+The above command will execute 10 Tasks in total, but only 2 of them will be
+run at any given time.
+
+#### Global Jobs
+
+Global jobs are like global services, in that a Task is executed once on each
+node matching placement constraints. Global jobs are represented by the mode
+`global-job`.
+
+Note that after a Global job is created, any new Nodes added to the cluster
+will have a Task from that job started on them. The Global Job does not as a
+whole have a "done" state, except insofar as every Node meeting the job's
+constraints has a Completed task.
+
 ## Related commands
 
 * [service inspect](service_inspect.md)
diff --git a/docs/reference/commandline/service_ls.md b/docs/reference/commandline/service_ls.md
index acb1e2c1f8..efe1bced17 100644
--- a/docs/reference/commandline/service_ls.md
+++ b/docs/reference/commandline/service_ls.md
@@ -39,14 +39,17 @@ On a manager node:
 
 ```bash
 $ docker service ls
-ID                  NAME                MODE                REPLICAS            IMAGE
-c8wgl7q4ndfd        frontend            replicated          5/5                 nginx:alpine
-dmu1ept4cxcf        redis               replicated          3/3                 redis:3.0.6
-iwe3278osahj        mongo               global              7/7                 mongo:3.3
+ID                  NAME                MODE                REPLICAS               IMAGE
+c8wgl7q4ndfd        frontend            replicated          5/5                    nginx:alpine
+dmu1ept4cxcf        redis               replicated          3/3                    redis:3.0.6
+iwe3278osahj        mongo               global              7/7                    mongo:3.3
+hh08h9uu8uwr        job                 replicated-job      1/1 (3/5 completed)    nginx:latest
 ```
 
 The `REPLICAS` column shows both the *actual* and *desired* number of tasks for
-the service.
+the service. If the service is in `replicated-job` or `global-job` mode, it
+will additionally show the completion status of the job as completed tasks
+over the total tasks the job will execute.
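+
+For example, in the listing above, `1/1 (3/5 completed)` means that 1 task is
+currently running out of 1 currently desired, and that 3 of the job's 5 total
+completions have finished. A hypothetical listing taken after the same job
+finishes (assuming the daemon reports zero desired tasks once all completions
+are done) might look like this:
+
+```bash
+$ docker service ls --filter name=job
+ID                  NAME                MODE                REPLICAS               IMAGE
+hh08h9uu8uwr        job                 replicated-job      0/0 (5/5 completed)    nginx:latest
+```
+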
 ### Filtering
diff --git a/docs/reference/commandline/service_scale.md b/docs/reference/commandline/service_scale.md
index ee999dffdb..866669ffa7 100644
--- a/docs/reference/commandline/service_scale.md
+++ b/docs/reference/commandline/service_scale.md
@@ -52,7 +52,7 @@ b4g08uwuairexjub6ome6usqh
 
 $ docker service scale backend=10
 
-backend: scale can only be used with replicated mode
+backend: scale can only be used with replicated or replicated-job mode
 ```
 
 Directly afterwards, run `docker service ls`, to see the actual number of
diff --git a/docs/reference/commandline/service_update.md b/docs/reference/commandline/service_update.md
index 34559555e0..15af4a89f2 100644
--- a/docs/reference/commandline/service_update.md
+++ b/docs/reference/commandline/service_update.md
@@ -54,6 +54,7 @@ Options:
       --limit-memory bytes                 Limit Memory
       --log-driver string                  Logging driver for service
       --log-opt list                       Logging driver options
+      --max-concurrent                     Number of job tasks to run at once (default equal to --replicas)
       --mount-add mount                    Add or update a mount on a service
       --mount-rm list                      Remove a mount by its target path
       --network-add network                Add a network
@@ -298,6 +299,23 @@ See [`service create`](service_create.md#create-services-using-templates) for th
 `service update` supports the same `--isolation` flag as `service create`
 See [`service create`](service_create.md) for the reference.
 
+### Updating Jobs
+
+When a service is created as a job (by setting its mode to `replicated-job` or
+to `global-job` with `service create`), the options for updating it are
+limited.
+
+Updating a job immediately stops any Tasks that are in progress, creates a new
+set of Tasks for the job, and effectively resets the job's completion status.
+
+Jobs cannot be rolled out or rolled back. None of the flags for configuring
+update or rollback settings are valid with job modes.
+
+To run a job again with the same parameters it was run with previously, it can
+be force-updated with the `--force` flag.
+
 ## Related commands
 
 * [service create](service_create.md)