mirror of https://github.com/docker/cli.git
Add jobs support to CLI
* Added two new modes accepted by the `--mode` flag * `replicated-job` creates a replicated job * `global-job` creates a global job. * When using `replicated-job` mode, the `replicas` flag sets the `TotalCompletions` parameter of the job. This is the total number of tasks that will run * Added a new flag, `max-concurrent`, for use with `replicated-job` mode. This flag sets the `MaxConcurrent` parameter of the job, which is the maximum number of replicas the job will run simultaneously. * When using `replicated-job` or `global-job` mode, using any of the update parameter flags will result in an error, as jobs cannot be updated in the traditional sense. * Updated the `docker service ls` UI to include the completion status (completed vs total tasks) if the service is a job. * Updated the progress bars UI for service creation and update to support jobs. For jobs, there is displayed a bar covering the overall progress of the job (the number of tasks completed over the total number of tasks to complete). * Added documentation explaining the use of the new flags, and of jobs in general. Signed-off-by: Drew Erny <derny@mirantis.com>
This commit is contained in:
parent
2c3797015f
commit
9375644e34
|
@ -31,7 +31,7 @@ func newCreateCommand(dockerCli command.Cli) *cobra.Command {
|
|||
},
|
||||
}
|
||||
flags := cmd.Flags()
|
||||
flags.StringVar(&opts.mode, flagMode, "replicated", "Service mode (replicated or global)")
|
||||
flags.StringVar(&opts.mode, flagMode, "replicated", "Service mode (replicated, global, replicated-job, or global-job)")
|
||||
flags.StringVar(&opts.name, flagName, "", "Service name")
|
||||
|
||||
addServiceFlags(flags, opts, buildServiceDefaultFlagMapping())
|
||||
|
|
|
@ -596,6 +596,10 @@ func (c *serviceContext) Mode() string {
|
|||
return "global"
|
||||
case c.service.Spec.Mode.Replicated != nil:
|
||||
return "replicated"
|
||||
case c.service.Spec.Mode.ReplicatedJob != nil:
|
||||
return "replicated job"
|
||||
case c.service.Spec.Mode.GlobalJob != nil:
|
||||
return "global job"
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
|
@ -604,10 +608,33 @@ func (c *serviceContext) Mode() string {
|
|||
func (c *serviceContext) Replicas() string {
|
||||
s := &c.service
|
||||
|
||||
var running, desired uint64
|
||||
var running, desired, completed uint64
|
||||
if s.ServiceStatus != nil {
|
||||
running = c.service.ServiceStatus.RunningTasks
|
||||
desired = c.service.ServiceStatus.DesiredTasks
|
||||
completed = c.service.ServiceStatus.CompletedTasks
|
||||
}
|
||||
// for jobs, we will not include the max per node, even if it is set. jobs
|
||||
// include instead the progress of the job as a whole, in addition to the
|
||||
// current running state. the system respects max per node, but if we
|
||||
// included it in the list output, the lines for jobs would be entirely too
|
||||
// long and make the UI look bad.
|
||||
if s.Spec.Mode.ReplicatedJob != nil {
|
||||
return fmt.Sprintf(
|
||||
"%d/%d (%d/%d completed)",
|
||||
running, desired, completed, *s.Spec.Mode.ReplicatedJob.TotalCompletions,
|
||||
)
|
||||
}
|
||||
if s.Spec.Mode.GlobalJob != nil {
|
||||
// for global jobs, we need to do a little math. desired tasks are only
|
||||
// the tasks that have not yet actually reached the Completed state.
|
||||
// Completed tasks have reached the completed state. the TOTAL number
|
||||
// of tasks to run is the sum of the tasks desired to still complete,
|
||||
// and the tasks actually completed.
|
||||
return fmt.Sprintf(
|
||||
"%d/%d (%d/%d completed)",
|
||||
running, desired, completed, desired+completed,
|
||||
)
|
||||
}
|
||||
if r := c.maxReplicas(); r > 0 {
|
||||
return fmt.Sprintf("%d/%d (max %d per node)", running, desired, r)
|
||||
|
|
|
@ -15,6 +15,13 @@ import (
|
|||
)
|
||||
|
||||
func TestServiceContextWrite(t *testing.T) {
|
||||
var (
|
||||
// we need a pair of variables for setting the job parameters, because
|
||||
// those parameters take pointers to uint64, which we can't make as a
|
||||
// literal
|
||||
varThree uint64 = 3
|
||||
varTen uint64 = 10
|
||||
)
|
||||
cases := []struct {
|
||||
context formatter.Context
|
||||
expected string
|
||||
|
@ -38,6 +45,8 @@ func TestServiceContextWrite(t *testing.T) {
|
|||
01_baz baz global 1/3 *:80->8080/tcp
|
||||
04_qux2 qux2 replicated 3/3 (max 2 per node)
|
||||
03_qux10 qux10 replicated 2/3 (max 1 per node)
|
||||
05_job1 zarp1 replicated job 2/3 (5/10 completed)
|
||||
06_job2 zarp2 global job 1/1 (3/4 completed)
|
||||
`,
|
||||
},
|
||||
{
|
||||
|
@ -46,6 +55,8 @@ func TestServiceContextWrite(t *testing.T) {
|
|||
01_baz
|
||||
04_qux2
|
||||
03_qux10
|
||||
05_job1
|
||||
06_job2
|
||||
`,
|
||||
},
|
||||
{
|
||||
|
@ -55,6 +66,8 @@ bar replicated
|
|||
baz global
|
||||
qux2 replicated
|
||||
qux10 replicated
|
||||
zarp1 replicated job
|
||||
zarp2 global job
|
||||
`,
|
||||
},
|
||||
{
|
||||
|
@ -64,6 +77,8 @@ bar
|
|||
baz
|
||||
qux2
|
||||
qux10
|
||||
zarp1
|
||||
zarp2
|
||||
`,
|
||||
},
|
||||
// Raw Format
|
||||
|
@ -77,6 +92,8 @@ qux10
|
|||
id: 01_baz
|
||||
id: 04_qux2
|
||||
id: 03_qux10
|
||||
id: 05_job1
|
||||
id: 06_job2
|
||||
`,
|
||||
},
|
||||
// Custom Format
|
||||
|
@ -86,6 +103,8 @@ id: 03_qux10
|
|||
baz
|
||||
qux2
|
||||
qux10
|
||||
zarp1
|
||||
zarp2
|
||||
`,
|
||||
},
|
||||
}
|
||||
|
@ -170,6 +189,37 @@ qux10
|
|||
DesiredTasks: 3,
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "05_job1",
|
||||
Spec: swarm.ServiceSpec{
|
||||
Annotations: swarm.Annotations{Name: "zarp1"},
|
||||
Mode: swarm.ServiceMode{
|
||||
ReplicatedJob: &swarm.ReplicatedJob{
|
||||
MaxConcurrent: &varThree,
|
||||
TotalCompletions: &varTen,
|
||||
},
|
||||
},
|
||||
},
|
||||
ServiceStatus: &swarm.ServiceStatus{
|
||||
RunningTasks: 2,
|
||||
DesiredTasks: 3,
|
||||
CompletedTasks: 5,
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "06_job2",
|
||||
Spec: swarm.ServiceSpec{
|
||||
Annotations: swarm.Annotations{Name: "zarp2"},
|
||||
Mode: swarm.ServiceMode{
|
||||
GlobalJob: &swarm.GlobalJob{},
|
||||
},
|
||||
},
|
||||
ServiceStatus: &swarm.ServiceStatus{
|
||||
RunningTasks: 1,
|
||||
DesiredTasks: 1,
|
||||
CompletedTasks: 3,
|
||||
},
|
||||
},
|
||||
}
|
||||
out := bytes.NewBufferString("")
|
||||
testcase.context.Output = out
|
||||
|
|
|
@ -111,6 +111,8 @@ func AppendServiceStatus(ctx context.Context, c client.APIClient, services []swa
|
|||
status := map[string]*swarm.ServiceStatus{}
|
||||
taskFilter := filters.NewArgs()
|
||||
for i, s := range services {
|
||||
// there is no need in this switch to check for job modes. jobs are not
|
||||
// supported until after ServiceStatus was introduced.
|
||||
switch {
|
||||
case s.ServiceStatus != nil:
|
||||
// Server already returned service-status, so we don't
|
||||
|
|
|
@ -508,8 +508,9 @@ type serviceOptions struct {
|
|||
resources resourceOptions
|
||||
stopGrace opts.DurationOpt
|
||||
|
||||
replicas Uint64Opt
|
||||
mode string
|
||||
replicas Uint64Opt
|
||||
mode string
|
||||
maxConcurrent Uint64Opt
|
||||
|
||||
restartPolicy restartPolicyOptions
|
||||
constraints opts.ListOpts
|
||||
|
@ -554,18 +555,45 @@ func (options *serviceOptions) ToServiceMode() (swarm.ServiceMode, error) {
|
|||
switch options.mode {
|
||||
case "global":
|
||||
if options.replicas.Value() != nil {
|
||||
return serviceMode, errors.Errorf("replicas can only be used with replicated mode")
|
||||
return serviceMode, errors.Errorf("replicas can only be used with replicated or replicated-job mode")
|
||||
}
|
||||
|
||||
if options.maxReplicas > 0 {
|
||||
return serviceMode, errors.New("replicas-max-per-node can only be used with replicated mode")
|
||||
return serviceMode, errors.New("replicas-max-per-node can only be used with replicated or replicated-job mode")
|
||||
}
|
||||
if options.maxConcurrent.Value() != nil {
|
||||
return serviceMode, errors.New("max-concurrent can only be used with replicated-job mode")
|
||||
}
|
||||
|
||||
serviceMode.Global = &swarm.GlobalService{}
|
||||
case "replicated":
|
||||
if options.maxConcurrent.Value() != nil {
|
||||
return serviceMode, errors.New("max-concurrent can only be used with replicated-job mode")
|
||||
}
|
||||
|
||||
serviceMode.Replicated = &swarm.ReplicatedService{
|
||||
Replicas: options.replicas.Value(),
|
||||
}
|
||||
case "replicated-job":
|
||||
concurrent := options.maxConcurrent.Value()
|
||||
if concurrent == nil {
|
||||
concurrent = options.replicas.Value()
|
||||
}
|
||||
serviceMode.ReplicatedJob = &swarm.ReplicatedJob{
|
||||
MaxConcurrent: concurrent,
|
||||
TotalCompletions: options.replicas.Value(),
|
||||
}
|
||||
case "global-job":
|
||||
if options.maxReplicas > 0 {
|
||||
return serviceMode, errors.New("replicas-max-per-node can only be used with replicated or replicated-job mode")
|
||||
}
|
||||
if options.maxConcurrent.Value() != nil {
|
||||
return serviceMode, errors.New("max-concurrent can only be used with replicated-job mode")
|
||||
}
|
||||
if options.replicas.Value() != nil {
|
||||
return serviceMode, errors.Errorf("replicas can only be used with replicated or replicated-job mode")
|
||||
}
|
||||
serviceMode.GlobalJob = &swarm.GlobalJob{}
|
||||
default:
|
||||
return serviceMode, errors.Errorf("Unknown mode: %s, only replicated and global supported", options.mode)
|
||||
}
|
||||
|
@ -579,14 +607,13 @@ func (options *serviceOptions) ToStopGracePeriod(flags *pflag.FlagSet) *time.Dur
|
|||
return nil
|
||||
}
|
||||
|
||||
func (options *serviceOptions) ToService(ctx context.Context, apiClient client.NetworkAPIClient, flags *pflag.FlagSet) (swarm.ServiceSpec, error) {
|
||||
var service swarm.ServiceSpec
|
||||
|
||||
// makeEnv gets the environment variables from the command line options and
|
||||
// returns a slice of strings to use in the service spec when doing ToService
|
||||
func (options *serviceOptions) makeEnv() ([]string, error) {
|
||||
envVariables, err := opts.ReadKVEnvStrings(options.envFile.GetAll(), options.env.GetAll())
|
||||
if err != nil {
|
||||
return service, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
currentEnv := make([]string, 0, len(envVariables))
|
||||
for _, env := range envVariables { // need to process each var, in order
|
||||
k := strings.SplitN(env, "=", 2)[0]
|
||||
|
@ -601,6 +628,24 @@ func (options *serviceOptions) ToService(ctx context.Context, apiClient client.N
|
|||
currentEnv = append(currentEnv, env)
|
||||
}
|
||||
|
||||
return currentEnv, nil
|
||||
}
|
||||
|
||||
// ToService takes the set of flags passed to the command and converts them
|
||||
// into a service spec.
|
||||
//
|
||||
// Takes an API client as the second argument in order to resolve network names
|
||||
// from the flags into network IDs.
|
||||
//
|
||||
// Returns an error if any flags are invalid or contradictory.
|
||||
func (options *serviceOptions) ToService(ctx context.Context, apiClient client.NetworkAPIClient, flags *pflag.FlagSet) (swarm.ServiceSpec, error) {
|
||||
var service swarm.ServiceSpec
|
||||
|
||||
currentEnv, err := options.makeEnv()
|
||||
if err != nil {
|
||||
return service, err
|
||||
}
|
||||
|
||||
healthConfig, err := options.healthcheck.toHealthConfig()
|
||||
if err != nil {
|
||||
return service, err
|
||||
|
@ -611,6 +656,16 @@ func (options *serviceOptions) ToService(ctx context.Context, apiClient client.N
|
|||
return service, err
|
||||
}
|
||||
|
||||
updateConfig := options.update.updateConfig(flags)
|
||||
rollbackConfig := options.rollback.rollbackConfig(flags)
|
||||
|
||||
// update and rollback configuration is not supported for jobs. If these
|
||||
// flags are not set, then the values will be nil. If they are non-nil,
|
||||
// then return an error.
|
||||
if (serviceMode.ReplicatedJob != nil || serviceMode.GlobalJob != nil) && (updateConfig != nil || rollbackConfig != nil) {
|
||||
return service, errors.Errorf("update and rollback configuration is not supported for jobs")
|
||||
}
|
||||
|
||||
networks := convertNetworks(options.networks)
|
||||
for i, net := range networks {
|
||||
nwID, err := resolveNetworkID(ctx, apiClient, net.Target)
|
||||
|
@ -671,8 +726,8 @@ func (options *serviceOptions) ToService(ctx context.Context, apiClient client.N
|
|||
LogDriver: options.logDriver.toLogDriver(),
|
||||
},
|
||||
Mode: serviceMode,
|
||||
UpdateConfig: options.update.updateConfig(flags),
|
||||
RollbackConfig: options.rollback.rollbackConfig(flags),
|
||||
UpdateConfig: updateConfig,
|
||||
RollbackConfig: rollbackConfig,
|
||||
EndpointSpec: options.endpoint.ToEndpointSpec(),
|
||||
}
|
||||
|
||||
|
@ -769,6 +824,8 @@ func addServiceFlags(flags *pflag.FlagSet, opts *serviceOptions, defaultFlagValu
|
|||
|
||||
flags.Var(&opts.stopGrace, flagStopGracePeriod, flagDesc(flagStopGracePeriod, "Time to wait before force killing a container (ns|us|ms|s|m|h)"))
|
||||
flags.Var(&opts.replicas, flagReplicas, "Number of tasks")
|
||||
flags.Var(&opts.maxConcurrent, flagConcurrent, "Number of job tasks to run concurrently (default equal to --replicas)")
|
||||
flags.SetAnnotation(flagConcurrent, "version", []string{"1.41"})
|
||||
flags.Uint64Var(&opts.maxReplicas, flagMaxReplicas, defaultFlagValues.getUint64(flagMaxReplicas), "Maximum number of tasks per node (default 0 = unlimited)")
|
||||
flags.SetAnnotation(flagMaxReplicas, "version", []string{"1.40"})
|
||||
|
||||
|
@ -878,6 +935,7 @@ const (
|
|||
flagLimitCPU = "limit-cpu"
|
||||
flagLimitMemory = "limit-memory"
|
||||
flagMaxReplicas = "replicas-max-per-node"
|
||||
flagConcurrent = "max-concurrent"
|
||||
flagMode = "mode"
|
||||
flagMount = "mount"
|
||||
flagMountRemove = "mount-rm"
|
||||
|
|
|
@ -285,7 +285,7 @@ func TestToServiceMaxReplicasGlobalModeConflict(t *testing.T) {
|
|||
maxReplicas: 1,
|
||||
}
|
||||
_, err := opt.ToServiceMode()
|
||||
assert.Error(t, err, "replicas-max-per-node can only be used with replicated mode")
|
||||
assert.Error(t, err, "replicas-max-per-node can only be used with replicated or replicated-job mode")
|
||||
}
|
||||
|
||||
func TestToServiceSysCtls(t *testing.T) {
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"io"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -45,6 +46,7 @@ var (
|
|||
const (
|
||||
maxProgress = 9
|
||||
maxProgressBars = 20
|
||||
maxJobProgress = 10
|
||||
)
|
||||
|
||||
type progressUpdater interface {
|
||||
|
@ -53,7 +55,9 @@ type progressUpdater interface {
|
|||
|
||||
func init() {
|
||||
for state := range numberedStates {
|
||||
if !terminalState(state) && len(state) > longestState {
|
||||
// for jobs, we use the "complete" state, and so it should be factored
|
||||
// in to the computation of the longest state.
|
||||
if (!terminalState(state) || state == swarm.TaskStateComplete) && len(state) > longestState {
|
||||
longestState = len(state)
|
||||
}
|
||||
}
|
||||
|
@ -164,6 +168,18 @@ func ServiceProgress(ctx context.Context, client client.APIClient, serviceID str
|
|||
return err
|
||||
}
|
||||
if converged {
|
||||
// if the service is a job, there's no need to verify it. jobs are
|
||||
// stay done once they're done. skip the verification and just end
|
||||
// the progress monitoring.
|
||||
//
|
||||
// only job services have a non-nil job status, which means we can
|
||||
// use the presence of this field to check if the service is a job
|
||||
// here.
|
||||
if service.JobStatus != nil {
|
||||
progress.Message(progressOut, "", "job complete")
|
||||
return nil
|
||||
}
|
||||
|
||||
if convergedAt.IsZero() {
|
||||
convergedAt = time.Now()
|
||||
}
|
||||
|
@ -230,6 +246,14 @@ func initializeUpdater(service swarm.Service, progressOut progress.Output) (prog
|
|||
progressOut: progressOut,
|
||||
}, nil
|
||||
}
|
||||
if service.Spec.Mode.ReplicatedJob != nil {
|
||||
return newReplicatedJobProgressUpdater(service, progressOut), nil
|
||||
}
|
||||
if service.Spec.Mode.GlobalJob != nil {
|
||||
return &globalJobProgressUpdater{
|
||||
progressOut: progressOut,
|
||||
}, nil
|
||||
}
|
||||
return nil, errors.New("unrecognized service mode")
|
||||
}
|
||||
|
||||
|
@ -502,3 +526,322 @@ func (u *globalProgressUpdater) writeTaskProgress(task swarm.Task, nodeCount int
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
// replicatedJobProgressUpdater outputs the progress of a replicated job. This
|
||||
// progress consists of a few main elements.
|
||||
//
|
||||
// The first is the progress bar for the job as a whole. This shows the number
|
||||
// of completed out of total tasks for the job. Tasks that are currently
|
||||
// running are not counted.
|
||||
//
|
||||
// The second is the status of the "active" tasks for the job. We count a task
|
||||
// as "active" if it has any non-terminal state, not just running. This is
|
||||
// shown as a fraction of the maximum concurrent tasks that can be running,
|
||||
// which is the less of MaxConcurrent or TotalCompletions - completed tasks.
|
||||
type replicatedJobProgressUpdater struct {
|
||||
progressOut progress.Output
|
||||
|
||||
// jobIteration is the service's job iteration, used to exclude tasks
|
||||
// belonging to earlier iterations.
|
||||
jobIteration uint64
|
||||
|
||||
// concurrent is the value of MaxConcurrent as an int. That is, the maximum
|
||||
// number of tasks allowed to be run simultaneously.
|
||||
concurrent int
|
||||
|
||||
// total is the value of TotalCompletions, the number of complete tasks
|
||||
// desired.
|
||||
total int
|
||||
|
||||
// initialized is set to true after the first time update is called. the
|
||||
// first time update is called, the components of the progress UI are all
|
||||
// written out in an initial pass. this ensure that they will subsequently
|
||||
// be in order, no matter how they are updated.
|
||||
initialized bool
|
||||
|
||||
// progressDigits is the number digits in total, so that we know how much
|
||||
// to pad the job progress field with.
|
||||
//
|
||||
// when we're writing the number of completed over total tasks, we need to
|
||||
// pad the numerator with spaces, so that the bar doesn't jump around.
|
||||
// we'll compute that once on init, and then reuse it over and over.
|
||||
//
|
||||
// we compute this in the least clever way possible: convert to string
|
||||
// with strconv.Itoa, then take the len.
|
||||
progressDigits int
|
||||
|
||||
// activeDigits is the same, but for active tasks, and it applies to both
|
||||
// the numerator and denominator.
|
||||
activeDigits int
|
||||
}
|
||||
|
||||
func newReplicatedJobProgressUpdater(service swarm.Service, progressOut progress.Output) *replicatedJobProgressUpdater {
|
||||
u := &replicatedJobProgressUpdater{
|
||||
progressOut: progressOut,
|
||||
concurrent: int(*service.Spec.Mode.ReplicatedJob.MaxConcurrent),
|
||||
total: int(*service.Spec.Mode.ReplicatedJob.TotalCompletions),
|
||||
jobIteration: service.JobStatus.JobIteration.Index,
|
||||
}
|
||||
u.progressDigits = len(strconv.Itoa(u.total))
|
||||
u.activeDigits = len(strconv.Itoa(u.concurrent))
|
||||
|
||||
return u
|
||||
}
|
||||
|
||||
// update writes out the progress of the replicated job.
|
||||
func (u *replicatedJobProgressUpdater) update(_ swarm.Service, tasks []swarm.Task, _ map[string]struct{}, _ bool) (bool, error) {
|
||||
if !u.initialized {
|
||||
u.writeOverallProgress(0, 0)
|
||||
|
||||
// only write out progress bars if there will be less than the maximum
|
||||
if u.total <= maxProgressBars {
|
||||
for i := 1; i <= u.total; i++ {
|
||||
u.progressOut.WriteProgress(progress.Progress{
|
||||
ID: fmt.Sprintf("%d/%d", i, u.total),
|
||||
Action: " ",
|
||||
})
|
||||
}
|
||||
}
|
||||
u.initialized = true
|
||||
}
|
||||
|
||||
// tasksBySlot is a mapping of slot number to the task valid for that slot.
|
||||
// it deduplicated tasks occupying the same numerical slot but in different
|
||||
// states.
|
||||
tasksBySlot := make(map[int]swarm.Task)
|
||||
for _, task := range tasks {
|
||||
// first, check if the task belongs to this service iteration. skip
|
||||
// tasks belonging to other iterations.
|
||||
if task.JobIteration == nil || task.JobIteration.Index != u.jobIteration {
|
||||
continue
|
||||
}
|
||||
|
||||
// then, if the task is in an unknown state, ignore it.
|
||||
if numberedStates[task.DesiredState] == 0 ||
|
||||
numberedStates[task.Status.State] == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// finally, check if the task already exists in the map
|
||||
if existing, ok := tasksBySlot[task.Slot]; ok {
|
||||
// if so, use the task with the lower actual state
|
||||
if numberedStates[existing.Status.State] > numberedStates[task.Status.State] {
|
||||
tasksBySlot[task.Slot] = task
|
||||
}
|
||||
} else {
|
||||
// otherwise, just add it to the map.
|
||||
tasksBySlot[task.Slot] = task
|
||||
}
|
||||
}
|
||||
|
||||
activeTasks := 0
|
||||
completeTasks := 0
|
||||
|
||||
for i := 0; i < len(tasksBySlot); i++ {
|
||||
task := tasksBySlot[i]
|
||||
u.writeTaskProgress(task)
|
||||
|
||||
if numberedStates[task.Status.State] < numberedStates[swarm.TaskStateComplete] {
|
||||
activeTasks++
|
||||
}
|
||||
|
||||
if task.Status.State == swarm.TaskStateComplete {
|
||||
completeTasks++
|
||||
}
|
||||
}
|
||||
|
||||
u.writeOverallProgress(activeTasks, completeTasks)
|
||||
|
||||
return completeTasks == u.total, nil
|
||||
}
|
||||
|
||||
func (u *replicatedJobProgressUpdater) writeOverallProgress(active, completed int) {
|
||||
u.progressOut.WriteProgress(progress.Progress{
|
||||
ID: "job progress",
|
||||
Action: fmt.Sprintf(
|
||||
// * means "use the next positional arg to compute padding"
|
||||
"%*d out of %d complete", u.progressDigits, completed, u.total,
|
||||
),
|
||||
Current: int64(completed),
|
||||
Total: int64(u.total),
|
||||
HideCounts: true,
|
||||
})
|
||||
|
||||
// actualDesired is the lesser of MaxConcurrent, or the remaining tasks
|
||||
actualDesired := u.total - completed
|
||||
if actualDesired > u.concurrent {
|
||||
actualDesired = u.concurrent
|
||||
}
|
||||
|
||||
u.progressOut.WriteProgress(progress.Progress{
|
||||
ID: "active tasks",
|
||||
Action: fmt.Sprintf(
|
||||
// [n] notation lets us select a specific argument, 1-indexed
|
||||
// putting the [1] before the star means "make the string this
|
||||
// length". putting the [2] or the [3] means "use this argument
|
||||
// here"
|
||||
//
|
||||
// we pad both the numerator and the denominator because, as the
|
||||
// job reaches its conclusion, the number of possible concurrent
|
||||
// tasks will go down, as fewer than MaxConcurrent tasks are needed
|
||||
// to complete the job.
|
||||
"%[1]*[2]d out of %[1]*[3]d tasks", u.activeDigits, active, actualDesired,
|
||||
),
|
||||
})
|
||||
}
|
||||
|
||||
func (u *replicatedJobProgressUpdater) writeTaskProgress(task swarm.Task) {
|
||||
if u.total > maxProgressBars {
|
||||
return
|
||||
}
|
||||
|
||||
if task.Status.Err != "" {
|
||||
u.progressOut.WriteProgress(progress.Progress{
|
||||
ID: fmt.Sprintf("%d/%d", task.Slot+1, u.total),
|
||||
Action: truncError(task.Status.Err),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
u.progressOut.WriteProgress(progress.Progress{
|
||||
ID: fmt.Sprintf("%d/%d", task.Slot+1, u.total),
|
||||
Action: fmt.Sprintf("%-*s", longestState, task.Status.State),
|
||||
Current: numberedStates[task.Status.State],
|
||||
Total: maxJobProgress,
|
||||
HideCounts: true,
|
||||
})
|
||||
}
|
||||
|
||||
// globalJobProgressUpdater is the progressUpdater for GlobalJob-mode services.
|
||||
// Because GlobalJob services are so much simpler than ReplicatedJob services,
|
||||
// this updater is in turn simpler as well.
|
||||
type globalJobProgressUpdater struct {
|
||||
progressOut progress.Output
|
||||
|
||||
// initialized is used to detect the first pass of update, and to perform
|
||||
// first time initialization logic at that time.
|
||||
initialized bool
|
||||
|
||||
// total is the total number of tasks expected for this job
|
||||
total int
|
||||
|
||||
// progressDigits is the number of spaces to pad the numerator of the job
|
||||
// progress field
|
||||
progressDigits int
|
||||
|
||||
taskNodes map[string]struct{}
|
||||
}
|
||||
|
||||
func (u *globalJobProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, _ bool) (bool, error) {
|
||||
if !u.initialized {
|
||||
// if there are not yet tasks, then return early.
|
||||
if len(tasks) == 0 && len(activeNodes) != 0 {
|
||||
u.progressOut.WriteProgress(progress.Progress{
|
||||
ID: "job progress",
|
||||
Action: "waiting for tasks",
|
||||
})
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// when a global job starts, all of its tasks are created at once, so
|
||||
// we can use len(tasks) to know how many we're expecting.
|
||||
u.taskNodes = map[string]struct{}{}
|
||||
|
||||
for _, task := range tasks {
|
||||
// skip any tasks not belonging to this job iteration.
|
||||
if task.JobIteration == nil || task.JobIteration.Index != service.JobStatus.JobIteration.Index {
|
||||
continue
|
||||
}
|
||||
|
||||
// collect the list of all node IDs for this service.
|
||||
//
|
||||
// basically, global jobs will execute on any new nodes that join
|
||||
// the cluster in the future. to avoid making things complicated,
|
||||
// we will only check the progress of the initial set of nodes. if
|
||||
// any new nodes come online during the operation, we will ignore
|
||||
// them.
|
||||
u.taskNodes[task.NodeID] = struct{}{}
|
||||
}
|
||||
|
||||
u.total = len(u.taskNodes)
|
||||
u.progressDigits = len(strconv.Itoa(u.total))
|
||||
|
||||
u.writeOverallProgress(0)
|
||||
u.initialized = true
|
||||
}
|
||||
|
||||
// tasksByNodeID maps a NodeID to the latest task for that Node ID. this
|
||||
// lets us pick only the latest task for any given node.
|
||||
tasksByNodeID := map[string]swarm.Task{}
|
||||
|
||||
for _, task := range tasks {
|
||||
// skip any tasks not belonging to this job iteration
|
||||
if task.JobIteration == nil || task.JobIteration.Index != service.JobStatus.JobIteration.Index {
|
||||
continue
|
||||
}
|
||||
|
||||
// if the task is not on one of the initial set of nodes, ignore it.
|
||||
if _, ok := u.taskNodes[task.NodeID]; !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// if there is already a task recorded for this node, choose the one
|
||||
// with the lower state
|
||||
if oldtask, ok := tasksByNodeID[task.NodeID]; ok {
|
||||
if numberedStates[oldtask.Status.State] > numberedStates[task.Status.State] {
|
||||
tasksByNodeID[task.NodeID] = task
|
||||
}
|
||||
} else {
|
||||
tasksByNodeID[task.NodeID] = task
|
||||
}
|
||||
}
|
||||
|
||||
complete := 0
|
||||
for _, task := range tasksByNodeID {
|
||||
u.writeTaskProgress(task)
|
||||
if task.Status.State == swarm.TaskStateComplete {
|
||||
complete++
|
||||
}
|
||||
}
|
||||
|
||||
u.writeOverallProgress(complete)
|
||||
return complete == u.total, nil
|
||||
}
|
||||
|
||||
func (u *globalJobProgressUpdater) writeTaskProgress(task swarm.Task) {
|
||||
if u.total > maxProgressBars {
|
||||
return
|
||||
}
|
||||
|
||||
if task.Status.Err != "" {
|
||||
u.progressOut.WriteProgress(progress.Progress{
|
||||
ID: task.NodeID,
|
||||
Action: truncError(task.Status.Err),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
u.progressOut.WriteProgress(progress.Progress{
|
||||
ID: task.NodeID,
|
||||
Action: fmt.Sprintf("%-*s", longestState, task.Status.State),
|
||||
Current: numberedStates[task.Status.State],
|
||||
Total: maxJobProgress,
|
||||
HideCounts: true,
|
||||
})
|
||||
}
|
||||
|
||||
func (u *globalJobProgressUpdater) writeOverallProgress(complete int) {
|
||||
// all tasks for a global job are active at once, so we only write out the
|
||||
// total progress.
|
||||
u.progressOut.WriteProgress(progress.Progress{
|
||||
// see (*replicatedJobProgressUpdater).writeOverallProgress for an
|
||||
// explanation fo the advanced fmt use in this function.
|
||||
ID: "job progress",
|
||||
Action: fmt.Sprintf(
|
||||
"%*d out of %d complete", u.progressDigits, complete, u.total,
|
||||
),
|
||||
Current: int64(complete),
|
||||
Total: int64(u.total),
|
||||
HideCounts: true,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -42,6 +42,20 @@ func (u updaterTester) testUpdater(tasks []swarm.Task, expectedConvergence bool,
|
|||
assert.Check(u.t, is.DeepEqual(expectedProgress, u.p.p))
|
||||
}
|
||||
|
||||
func (u updaterTester) testUpdaterNoOrder(tasks []swarm.Task, expectedConvergence bool, expectedProgress []progress.Progress) {
|
||||
u.p.clear()
|
||||
converged, err := u.updater.update(u.service, tasks, u.activeNodes, u.rollback)
|
||||
assert.Check(u.t, err)
|
||||
assert.Check(u.t, is.Equal(expectedConvergence, converged))
|
||||
|
||||
// instead of checking that expected and actual match exactly, verify that
|
||||
// they are the same length, and every time from actual is in expected.
|
||||
assert.Check(u.t, is.Equal(len(expectedProgress), len(u.p.p)))
|
||||
for _, prog := range expectedProgress {
|
||||
assert.Check(u.t, is.Contains(u.p.p, prog))
|
||||
}
|
||||
}
|
||||
|
||||
func TestReplicatedProgressUpdaterOneReplica(t *testing.T) {
|
||||
replicas := uint64(1)
|
||||
|
||||
|
@ -373,3 +387,511 @@ func TestGlobalProgressUpdaterManyNodes(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestReplicatedJobProgressUpdaterSmall(t *testing.T) {
|
||||
concurrent := uint64(2)
|
||||
total := uint64(5)
|
||||
|
||||
service := swarm.Service{
|
||||
Spec: swarm.ServiceSpec{
|
||||
Mode: swarm.ServiceMode{
|
||||
ReplicatedJob: &swarm.ReplicatedJob{
|
||||
MaxConcurrent: &concurrent,
|
||||
TotalCompletions: &total,
|
||||
},
|
||||
},
|
||||
},
|
||||
JobStatus: &swarm.JobStatus{
|
||||
JobIteration: swarm.Version{Index: 1},
|
||||
},
|
||||
}
|
||||
|
||||
p := &mockProgress{}
|
||||
ut := updaterTester{
|
||||
t: t,
|
||||
updater: newReplicatedJobProgressUpdater(service, p),
|
||||
p: p,
|
||||
activeNodes: map[string]struct{}{"a": {}, "b": {}},
|
||||
service: service,
|
||||
}
|
||||
|
||||
// create some tasks belonging to a previous iteration
|
||||
tasks := []swarm.Task{
|
||||
{
|
||||
ID: "oldtask1",
|
||||
Slot: 0,
|
||||
NodeID: "",
|
||||
DesiredState: swarm.TaskStateComplete,
|
||||
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||
JobIteration: &swarm.Version{Index: 0},
|
||||
}, {
|
||||
ID: "oldtask2",
|
||||
Slot: 1,
|
||||
NodeID: "",
|
||||
DesiredState: swarm.TaskStateComplete,
|
||||
Status: swarm.TaskStatus{State: swarm.TaskStateComplete},
|
||||
JobIteration: &swarm.Version{Index: 0},
|
||||
},
|
||||
}
|
||||
|
||||
ut.testUpdater(tasks, false, []progress.Progress{
|
||||
// on the initial pass, we draw all of the progress bars at once, which
|
||||
// puts them in order for the rest of the operation
|
||||
{ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true},
|
||||
{ID: "active tasks", Action: "0 out of 2 tasks"},
|
||||
{ID: "1/5", Action: " "},
|
||||
{ID: "2/5", Action: " "},
|
||||
{ID: "3/5", Action: " "},
|
||||
{ID: "4/5", Action: " "},
|
||||
{ID: "5/5", Action: " "},
|
||||
// from here on, we draw as normal. as a side effect, we will have a
|
||||
// second update for the job progress and active tasks. This has no
|
||||
// practical effect on the UI, it's just a side effect of the update
|
||||
// logic.
|
||||
{ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true},
|
||||
{ID: "active tasks", Action: "0 out of 2 tasks"},
|
||||
})
|
||||
|
||||
// wipe the old tasks out of the list
|
||||
tasks = []swarm.Task{}
|
||||
tasks = append(tasks,
|
||||
swarm.Task{
|
||||
ID: "task1",
|
||||
Slot: 0,
|
||||
NodeID: "",
|
||||
DesiredState: swarm.TaskStateComplete,
|
||||
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||
JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index},
|
||||
},
|
||||
swarm.Task{
|
||||
ID: "task2",
|
||||
Slot: 1,
|
||||
NodeID: "",
|
||||
DesiredState: swarm.TaskStateComplete,
|
||||
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||
JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index},
|
||||
},
|
||||
)
|
||||
ut.testUpdater(tasks, false, []progress.Progress{
|
||||
{ID: "1/5", Action: "new ", Current: 1, Total: 10, HideCounts: true},
|
||||
{ID: "2/5", Action: "new ", Current: 1, Total: 10, HideCounts: true},
|
||||
{ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true},
|
||||
{ID: "active tasks", Action: "2 out of 2 tasks"},
|
||||
})
|
||||
|
||||
tasks[0].Status.State = swarm.TaskStatePreparing
|
||||
tasks[1].Status.State = swarm.TaskStateAssigned
|
||||
ut.testUpdater(tasks, false, []progress.Progress{
|
||||
{ID: "1/5", Action: "preparing", Current: 6, Total: 10, HideCounts: true},
|
||||
{ID: "2/5", Action: "assigned ", Current: 4, Total: 10, HideCounts: true},
|
||||
{ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true},
|
||||
{ID: "active tasks", Action: "2 out of 2 tasks"},
|
||||
})
|
||||
|
||||
tasks[0].Status.State = swarm.TaskStateRunning
|
||||
tasks[1].Status.State = swarm.TaskStatePreparing
|
||||
ut.testUpdater(tasks, false, []progress.Progress{
|
||||
{ID: "1/5", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||
{ID: "2/5", Action: "preparing", Current: 6, Total: 10, HideCounts: true},
|
||||
{ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true},
|
||||
{ID: "active tasks", Action: "2 out of 2 tasks"},
|
||||
})
|
||||
|
||||
tasks[0].Status.State = swarm.TaskStateComplete
|
||||
tasks[1].Status.State = swarm.TaskStateComplete
|
||||
ut.testUpdater(tasks, false, []progress.Progress{
|
||||
{ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "job progress", Action: "2 out of 5 complete", Current: 2, Total: 5, HideCounts: true},
|
||||
{ID: "active tasks", Action: "0 out of 2 tasks"},
|
||||
})
|
||||
|
||||
tasks = append(tasks,
|
||||
swarm.Task{
|
||||
ID: "task3",
|
||||
Slot: 2,
|
||||
NodeID: "",
|
||||
DesiredState: swarm.TaskStateComplete,
|
||||
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||
JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index},
|
||||
},
|
||||
swarm.Task{
|
||||
ID: "task4",
|
||||
Slot: 3,
|
||||
NodeID: "",
|
||||
DesiredState: swarm.TaskStateComplete,
|
||||
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||
JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index},
|
||||
},
|
||||
)
|
||||
|
||||
ut.testUpdater(tasks, false, []progress.Progress{
|
||||
{ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "3/5", Action: "new ", Current: 1, Total: 10, HideCounts: true},
|
||||
{ID: "4/5", Action: "new ", Current: 1, Total: 10, HideCounts: true},
|
||||
{ID: "job progress", Action: "2 out of 5 complete", Current: 2, Total: 5, HideCounts: true},
|
||||
{ID: "active tasks", Action: "2 out of 2 tasks"},
|
||||
})
|
||||
|
||||
tasks[2].Status.State = swarm.TaskStateRunning
|
||||
tasks[3].Status.State = swarm.TaskStateRunning
|
||||
ut.testUpdater(tasks, false, []progress.Progress{
|
||||
{ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "3/5", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||
{ID: "4/5", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||
{ID: "job progress", Action: "2 out of 5 complete", Current: 2, Total: 5, HideCounts: true},
|
||||
{ID: "active tasks", Action: "2 out of 2 tasks"},
|
||||
})
|
||||
|
||||
tasks[3].Status.State = swarm.TaskStateComplete
|
||||
tasks = append(tasks,
|
||||
swarm.Task{
|
||||
ID: "task5",
|
||||
Slot: 4,
|
||||
NodeID: "",
|
||||
DesiredState: swarm.TaskStateComplete,
|
||||
Status: swarm.TaskStatus{State: swarm.TaskStateRunning},
|
||||
JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index},
|
||||
},
|
||||
)
|
||||
ut.testUpdater(tasks, false, []progress.Progress{
|
||||
{ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "3/5", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||
{ID: "4/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "5/5", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||
{ID: "job progress", Action: "3 out of 5 complete", Current: 3, Total: 5, HideCounts: true},
|
||||
{ID: "active tasks", Action: "2 out of 2 tasks"},
|
||||
})
|
||||
|
||||
tasks[2].Status.State = swarm.TaskStateFailed
|
||||
tasks[2].Status.Err = "the task failed"
|
||||
ut.testUpdater(tasks, false, []progress.Progress{
|
||||
{ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "3/5", Action: "the task failed"},
|
||||
{ID: "4/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "5/5", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||
{ID: "job progress", Action: "3 out of 5 complete", Current: 3, Total: 5, HideCounts: true},
|
||||
{ID: "active tasks", Action: "1 out of 2 tasks"},
|
||||
})
|
||||
|
||||
tasks[4].Status.State = swarm.TaskStateComplete
|
||||
tasks = append(tasks,
|
||||
swarm.Task{
|
||||
ID: "task6",
|
||||
Slot: 2,
|
||||
NodeID: "",
|
||||
DesiredState: swarm.TaskStateComplete,
|
||||
Status: swarm.TaskStatus{State: swarm.TaskStateRunning},
|
||||
JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index},
|
||||
},
|
||||
)
|
||||
ut.testUpdater(tasks, false, []progress.Progress{
|
||||
{ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "3/5", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||
{ID: "4/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "5/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "job progress", Action: "4 out of 5 complete", Current: 4, Total: 5, HideCounts: true},
|
||||
{ID: "active tasks", Action: "1 out of 1 tasks"},
|
||||
})
|
||||
|
||||
tasks[5].Status.State = swarm.TaskStateComplete
|
||||
ut.testUpdater(tasks, true, []progress.Progress{
|
||||
{ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "3/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "4/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "5/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "job progress", Action: "5 out of 5 complete", Current: 5, Total: 5, HideCounts: true},
|
||||
{ID: "active tasks", Action: "0 out of 0 tasks"},
|
||||
})
|
||||
}
|
||||
|
||||
func TestReplicatedJobProgressUpdaterLarge(t *testing.T) {
|
||||
concurrent := uint64(10)
|
||||
total := uint64(50)
|
||||
|
||||
service := swarm.Service{
|
||||
Spec: swarm.ServiceSpec{
|
||||
Mode: swarm.ServiceMode{
|
||||
ReplicatedJob: &swarm.ReplicatedJob{
|
||||
MaxConcurrent: &concurrent,
|
||||
TotalCompletions: &total,
|
||||
},
|
||||
},
|
||||
},
|
||||
JobStatus: &swarm.JobStatus{
|
||||
JobIteration: swarm.Version{Index: 0},
|
||||
},
|
||||
}
|
||||
|
||||
p := &mockProgress{}
|
||||
ut := updaterTester{
|
||||
t: t,
|
||||
updater: newReplicatedJobProgressUpdater(service, p),
|
||||
p: p,
|
||||
activeNodes: map[string]struct{}{"a": {}, "b": {}},
|
||||
service: service,
|
||||
}
|
||||
|
||||
tasks := []swarm.Task{}
|
||||
|
||||
// see the comments in TestReplicatedJobProgressUpdaterSmall for why
|
||||
// we write this out twice.
|
||||
ut.testUpdater(tasks, false, []progress.Progress{
|
||||
{ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true},
|
||||
{ID: "active tasks", Action: " 0 out of 10 tasks"},
|
||||
// we don't write out individual status bars for a large job, only the
|
||||
// overall progress bar
|
||||
{ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true},
|
||||
{ID: "active tasks", Action: " 0 out of 10 tasks"},
|
||||
})
|
||||
|
||||
// first, create the initial batch of running tasks
|
||||
for i := 0; i < int(concurrent); i++ {
|
||||
tasks = append(tasks, swarm.Task{
|
||||
ID: strconv.Itoa(i),
|
||||
Slot: i,
|
||||
DesiredState: swarm.TaskStateComplete,
|
||||
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||
JobIteration: &swarm.Version{Index: 0},
|
||||
})
|
||||
|
||||
ut.testUpdater(tasks, false, []progress.Progress{
|
||||
{ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true},
|
||||
{ID: "active tasks", Action: fmt.Sprintf("%2d out of 10 tasks", i+1)},
|
||||
})
|
||||
}
|
||||
|
||||
// now, start moving tasks to completed, and starting new tasks after them.
|
||||
// to do this, we'll start at 0, mark a task complete, and then append a
|
||||
// new one. we'll stop before we get to the end, because the end has a
|
||||
// steadily decreasing denominator for the active tasks
|
||||
//
|
||||
// for 10 concurrent 50 total, this means we'll stop at 50 - 10 = 40 tasks
|
||||
// in the completed state, 10 tasks running. the last index in use will be
|
||||
// 39.
|
||||
for i := 0; i < int(total)-int(concurrent); i++ {
|
||||
tasks[i].Status.State = swarm.TaskStateComplete
|
||||
ut.testUpdater(tasks, false, []progress.Progress{
|
||||
{ID: "job progress", Action: fmt.Sprintf("%2d out of 50 complete", i+1), Current: int64(i + 1), Total: 50, HideCounts: true},
|
||||
{ID: "active tasks", Action: " 9 out of 10 tasks"},
|
||||
})
|
||||
|
||||
last := len(tasks)
|
||||
tasks = append(tasks, swarm.Task{
|
||||
ID: strconv.Itoa(last),
|
||||
Slot: last,
|
||||
DesiredState: swarm.TaskStateComplete,
|
||||
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||
JobIteration: &swarm.Version{Index: 0},
|
||||
})
|
||||
|
||||
ut.testUpdater(tasks, false, []progress.Progress{
|
||||
{ID: "job progress", Action: fmt.Sprintf("%2d out of 50 complete", i+1), Current: int64(i + 1), Total: 50, HideCounts: true},
|
||||
{ID: "active tasks", Action: "10 out of 10 tasks"},
|
||||
})
|
||||
}
|
||||
|
||||
// quick check, to make sure we did the math right when we wrote this code:
|
||||
// we do have 50 tasks in the slice, right?
|
||||
assert.Check(t, is.Equal(len(tasks), int(total)))
|
||||
|
||||
// now, we're down to our last 10 tasks, which are all running. We need to
|
||||
// wind these down
|
||||
for i := int(total) - int(concurrent) - 1; i < int(total); i++ {
|
||||
tasks[i].Status.State = swarm.TaskStateComplete
|
||||
ut.testUpdater(tasks, (i+1 == int(total)), []progress.Progress{
|
||||
{ID: "job progress", Action: fmt.Sprintf("%2d out of 50 complete", i+1), Current: int64(i + 1), Total: 50, HideCounts: true},
|
||||
{ID: "active tasks", Action: fmt.Sprintf("%2[1]d out of %2[1]d tasks", int(total)-(i+1))},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGlobalJobProgressUpdaterSmall(t *testing.T) {
|
||||
service := swarm.Service{
|
||||
Spec: swarm.ServiceSpec{
|
||||
Mode: swarm.ServiceMode{
|
||||
GlobalJob: &swarm.GlobalJob{},
|
||||
},
|
||||
},
|
||||
JobStatus: &swarm.JobStatus{
|
||||
JobIteration: swarm.Version{Index: 1},
|
||||
},
|
||||
}
|
||||
|
||||
p := &mockProgress{}
|
||||
ut := updaterTester{
|
||||
t: t,
|
||||
updater: &globalJobProgressUpdater{
|
||||
progressOut: p,
|
||||
},
|
||||
p: p,
|
||||
activeNodes: map[string]struct{}{"a": {}, "b": {}, "c": {}},
|
||||
service: service,
|
||||
}
|
||||
|
||||
tasks := []swarm.Task{
|
||||
{
|
||||
ID: "oldtask1",
|
||||
DesiredState: swarm.TaskStateComplete,
|
||||
Status: swarm.TaskStatus{State: swarm.TaskStateComplete},
|
||||
JobIteration: &swarm.Version{Index: 0},
|
||||
NodeID: "a",
|
||||
}, {
|
||||
ID: "oldtask2",
|
||||
DesiredState: swarm.TaskStateComplete,
|
||||
Status: swarm.TaskStatus{State: swarm.TaskStateComplete},
|
||||
JobIteration: &swarm.Version{Index: 0},
|
||||
NodeID: "b",
|
||||
}, {
|
||||
ID: "task1",
|
||||
DesiredState: swarm.TaskStateComplete,
|
||||
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||
JobIteration: &swarm.Version{Index: 1},
|
||||
NodeID: "a",
|
||||
}, {
|
||||
ID: "task2",
|
||||
DesiredState: swarm.TaskStateComplete,
|
||||
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||
JobIteration: &swarm.Version{Index: 1},
|
||||
NodeID: "b",
|
||||
}, {
|
||||
ID: "task3",
|
||||
DesiredState: swarm.TaskStateComplete,
|
||||
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||
JobIteration: &swarm.Version{Index: 1},
|
||||
NodeID: "c",
|
||||
},
|
||||
}
|
||||
|
||||
// we don't know how many tasks will be created until we get the initial
|
||||
// task list, so we should not write out any definitive answers yet.
|
||||
ut.testUpdater([]swarm.Task{}, false, []progress.Progress{
|
||||
{ID: "job progress", Action: "waiting for tasks"},
|
||||
})
|
||||
|
||||
ut.testUpdaterNoOrder(tasks, false, []progress.Progress{
|
||||
{ID: "job progress", Action: "0 out of 3 complete", Current: 0, Total: 3, HideCounts: true},
|
||||
{ID: "a", Action: "new ", Current: 1, Total: 10, HideCounts: true},
|
||||
{ID: "b", Action: "new ", Current: 1, Total: 10, HideCounts: true},
|
||||
{ID: "c", Action: "new ", Current: 1, Total: 10, HideCounts: true},
|
||||
{ID: "job progress", Action: "0 out of 3 complete", Current: 0, Total: 3, HideCounts: true},
|
||||
})
|
||||
|
||||
tasks[2].Status.State = swarm.TaskStatePreparing
|
||||
tasks[3].Status.State = swarm.TaskStateRunning
|
||||
tasks[4].Status.State = swarm.TaskStateAccepted
|
||||
ut.testUpdaterNoOrder(tasks, false, []progress.Progress{
|
||||
{ID: "a", Action: "preparing", Current: 6, Total: 10, HideCounts: true},
|
||||
{ID: "b", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||
{ID: "c", Action: "accepted ", Current: 5, Total: 10, HideCounts: true},
|
||||
{ID: "job progress", Action: "0 out of 3 complete", Current: 0, Total: 3, HideCounts: true},
|
||||
})
|
||||
|
||||
tasks[2].Status.State = swarm.TaskStateRunning
|
||||
tasks[3].Status.State = swarm.TaskStateComplete
|
||||
tasks[4].Status.State = swarm.TaskStateRunning
|
||||
ut.testUpdaterNoOrder(tasks, false, []progress.Progress{
|
||||
{ID: "a", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||
{ID: "b", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "c", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||
{ID: "job progress", Action: "1 out of 3 complete", Current: 1, Total: 3, HideCounts: true},
|
||||
})
|
||||
|
||||
tasks[2].Status.State = swarm.TaskStateFailed
|
||||
tasks[2].Status.Err = "task failed"
|
||||
tasks[4].Status.State = swarm.TaskStateComplete
|
||||
ut.testUpdaterNoOrder(tasks, false, []progress.Progress{
|
||||
{ID: "a", Action: "task failed"},
|
||||
{ID: "b", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "c", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "job progress", Action: "2 out of 3 complete", Current: 2, Total: 3, HideCounts: true},
|
||||
})
|
||||
|
||||
tasks = append(tasks, swarm.Task{
|
||||
ID: "task4",
|
||||
DesiredState: swarm.TaskStateComplete,
|
||||
Status: swarm.TaskStatus{State: swarm.TaskStatePreparing},
|
||||
NodeID: tasks[2].NodeID,
|
||||
JobIteration: &swarm.Version{Index: 1},
|
||||
})
|
||||
|
||||
ut.testUpdaterNoOrder(tasks, false, []progress.Progress{
|
||||
{ID: "a", Action: "preparing", Current: 6, Total: 10, HideCounts: true},
|
||||
{ID: "b", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "c", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "job progress", Action: "2 out of 3 complete", Current: 2, Total: 3, HideCounts: true},
|
||||
})
|
||||
|
||||
tasks[5].Status.State = swarm.TaskStateComplete
|
||||
ut.testUpdaterNoOrder(tasks, true, []progress.Progress{
|
||||
{ID: "a", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "b", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "c", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||
{ID: "job progress", Action: "3 out of 3 complete", Current: 3, Total: 3, HideCounts: true},
|
||||
})
|
||||
}
|
||||
|
||||
func TestGlobalJobProgressUpdaterLarge(t *testing.T) {
|
||||
service := swarm.Service{
|
||||
Spec: swarm.ServiceSpec{
|
||||
Mode: swarm.ServiceMode{
|
||||
GlobalJob: &swarm.GlobalJob{},
|
||||
},
|
||||
},
|
||||
JobStatus: &swarm.JobStatus{
|
||||
JobIteration: swarm.Version{Index: 1},
|
||||
},
|
||||
}
|
||||
|
||||
activeNodes := map[string]struct{}{}
|
||||
for i := 0; i < 50; i++ {
|
||||
activeNodes[fmt.Sprintf("node%v", i)] = struct{}{}
|
||||
}
|
||||
|
||||
p := &mockProgress{}
|
||||
ut := updaterTester{
|
||||
t: t,
|
||||
updater: &globalJobProgressUpdater{
|
||||
progressOut: p,
|
||||
},
|
||||
p: p,
|
||||
activeNodes: activeNodes,
|
||||
service: service,
|
||||
}
|
||||
|
||||
tasks := []swarm.Task{}
|
||||
for nodeID := range activeNodes {
|
||||
tasks = append(tasks, swarm.Task{
|
||||
ID: fmt.Sprintf("task%s", nodeID),
|
||||
NodeID: nodeID,
|
||||
DesiredState: swarm.TaskStateComplete,
|
||||
Status: swarm.TaskStatus{
|
||||
State: swarm.TaskStateNew,
|
||||
},
|
||||
JobIteration: &swarm.Version{Index: 1},
|
||||
})
|
||||
}
|
||||
|
||||
// no bars, because too many tasks
|
||||
ut.testUpdater(tasks, false, []progress.Progress{
|
||||
{ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true},
|
||||
{ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true},
|
||||
})
|
||||
|
||||
for i := range tasks {
|
||||
tasks[i].Status.State = swarm.TaskStateComplete
|
||||
ut.testUpdater(tasks, i+1 == len(activeNodes), []progress.Progress{
|
||||
{
|
||||
ID: "job progress",
|
||||
Action: fmt.Sprintf("%2d out of 50 complete", i+1),
|
||||
Current: int64(i + 1), Total: 50, HideCounts: true,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -102,12 +102,14 @@ func runServiceScale(ctx context.Context, dockerCli command.Cli, serviceID strin
|
|||
}
|
||||
|
||||
serviceMode := &service.Spec.Mode
|
||||
if serviceMode.Replicated == nil {
|
||||
return errors.Errorf("scale can only be used with replicated mode")
|
||||
if serviceMode.Replicated != nil {
|
||||
serviceMode.Replicated.Replicas = &scale
|
||||
} else if serviceMode.ReplicatedJob != nil {
|
||||
serviceMode.ReplicatedJob.TotalCompletions = &scale
|
||||
} else {
|
||||
return errors.Errorf("scale can only be used with replicated or replicated-job mode")
|
||||
}
|
||||
|
||||
serviceMode.Replicated.Replicas = &scale
|
||||
|
||||
response, err := client.ServiceUpdate(ctx, service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -26,3 +26,17 @@ replicas: 2/3 (max 1 per node)
|
|||
image:
|
||||
ports:
|
||||
|
||||
id: 05_job1
|
||||
name: zarp1
|
||||
mode: replicated job
|
||||
replicas: 2/3 (5/10 completed)
|
||||
image:
|
||||
ports:
|
||||
|
||||
id: 06_job2
|
||||
name: zarp2
|
||||
mode: global job
|
||||
replicas: 1/1 (3/4 completed)
|
||||
image:
|
||||
ports:
|
||||
|
||||
|
|
|
@ -41,7 +41,8 @@ Options:
|
|||
--limit-memory bytes Limit Memory
|
||||
--log-driver string Logging driver for service
|
||||
--log-opt list Logging driver options
|
||||
--mode string Service mode (replicated or global) (default "replicated")
|
||||
--max-concurrent Number of job tasks to run at once (default equal to --replicas)
|
||||
--mode string Service mode (replicated, global, replicated-job, or global-job) (default "replicated")
|
||||
--mount mount Attach a filesystem mount to the service
|
||||
--name string Service name
|
||||
--network network Network attachments
|
||||
|
@ -1115,6 +1116,66 @@ $ docker service create \
|
|||
nvidia/cuda
|
||||
```
|
||||
|
||||
### Running as a job
|
||||
|
||||
Jobs are a special kind of service designed to run an operation to completion
|
||||
and then stop, as opposed to running long-running daemons. When a Task
|
||||
belonging to a job exits successfully (return value 0), the Task is marked as
|
||||
"Completed", and is not run again.
|
||||
|
||||
Jobs are started by using one of two modes, `replicated-job` or `global-job`
|
||||
|
||||
```bash
|
||||
$ docker service create --name myjob \
|
||||
--mode replicated-job \
|
||||
bash "true"
|
||||
```
|
||||
|
||||
This command will run one Task, which will, using the `bash` image, execute the
|
||||
command `true`, which will return 0 and then exit.
|
||||
|
||||
Though Jobs are ultimately a different kind of service, they a couple of
|
||||
caveats compared to other services:
|
||||
|
||||
- None of the update or rollback configuration options are valid. Jobs can be
|
||||
updated, but cannot be rolled out or rolled back, making these configuration
|
||||
options moot.
|
||||
- Jobs are never restarted on reaching the `Complete` state. This means that
|
||||
for jobs, setting `--restart-condition` to `any` is the same as setting it to
|
||||
`on-failure`.
|
||||
|
||||
Jobs are available in both replicated and global modes.
|
||||
|
||||
#### Replicated Jobs
|
||||
|
||||
A replicated job is like a replicated service. Setting the `--replicas` flag
|
||||
will specify total number of iterations of a job to execute.
|
||||
|
||||
By default, all replicas of a replicated job will launch at once. To control
|
||||
the total number of replicas that are executing simultaneously at any one time,
|
||||
the `--max-concurrent` flag can be used:
|
||||
|
||||
```bash
|
||||
$ docker service create --name mythrottledjob \
|
||||
--mode replicated-job \
|
||||
--replicas 10 \
|
||||
--max-concurrent 2 \
|
||||
bash "true"
|
||||
```
|
||||
|
||||
The above command will execute 10 Tasks in total, but only 2 of them will be
|
||||
run at any given time.
|
||||
|
||||
#### Global Jobs
|
||||
|
||||
Global jobs are like global services, in that a Task is executed once on each node
|
||||
matching placement constraints. Global jobs are represented by the mode `global-job`.
|
||||
|
||||
Note that after a Global job is created, any new Nodes added to the cluster
|
||||
will have a Task from that job started on them. The Global Job does not as a
|
||||
whole have a "done" state, except insofar as every Node meeting the job's
|
||||
constraints has a Completed task.
|
||||
|
||||
## Related commands
|
||||
|
||||
* [service inspect](service_inspect.md)
|
||||
|
|
|
@ -39,14 +39,17 @@ On a manager node:
|
|||
```bash
|
||||
$ docker service ls
|
||||
|
||||
ID NAME MODE REPLICAS IMAGE
|
||||
c8wgl7q4ndfd frontend replicated 5/5 nginx:alpine
|
||||
dmu1ept4cxcf redis replicated 3/3 redis:3.0.6
|
||||
iwe3278osahj mongo global 7/7 mongo:3.3
|
||||
ID NAME MODE REPLICAS IMAGE
|
||||
c8wgl7q4ndfd frontend replicated 5/5 nginx:alpine
|
||||
dmu1ept4cxcf redis replicated 3/3 redis:3.0.6
|
||||
iwe3278osahj mongo global 7/7 mongo:3.3
|
||||
hh08h9uu8uwr job replicated-job 1/1 (3/5 completed) nginx:latest
|
||||
```
|
||||
|
||||
The `REPLICAS` column shows both the *actual* and *desired* number of tasks for
|
||||
the service.
|
||||
the service. If the service is in `replicated-job` or `global-job`, it will
|
||||
additionally show the completion status of the job as completed tasks over
|
||||
total tasks the job will execute.
|
||||
|
||||
### Filtering
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@ b4g08uwuairexjub6ome6usqh
|
|||
|
||||
$ docker service scale backend=10
|
||||
|
||||
backend: scale can only be used with replicated mode
|
||||
backend: scale can only be used with replicated or replicated-job mode
|
||||
```
|
||||
|
||||
Directly afterwards, run `docker service ls`, to see the actual number of
|
||||
|
|
|
@ -54,6 +54,7 @@ Options:
|
|||
--limit-memory bytes Limit Memory
|
||||
--log-driver string Logging driver for service
|
||||
--log-opt list Logging driver options
|
||||
--max-concurrent Number of job tasks to run at once (default equal to --replicas)
|
||||
--mount-add mount Add or update a mount on a service
|
||||
--mount-rm list Remove a mount by its target path
|
||||
--network-add network Add a network
|
||||
|
@ -298,6 +299,23 @@ See [`service create`](service_create.md#create-services-using-templates) for th
|
|||
`service update` supports the same `--isolation` flag as `service create`
|
||||
See [`service create`](service_create.md) for the reference.
|
||||
|
||||
### Updating Jobs
|
||||
|
||||
When a service is created as a job, by setting its mode to `replicated-job` or
|
||||
to `global-job` when doing `service create`, options for updating it are
|
||||
limited.
|
||||
|
||||
Updating a Job immediately stops any Tasks that are in progress. The operation
|
||||
creates a new set of Tasks for the job and effectively resets its completion
|
||||
status. If any Tasks were running before the update, they are stopped, and new
|
||||
Tasks are created.
|
||||
|
||||
Jobs cannot be rolled out or rolled back. None of the flags for configuring
|
||||
update or rollback settings are valid with job modes.
|
||||
|
||||
To run a job again with the same parameters that it was run previously, it can
|
||||
be force updated with the `--force` flag.
|
||||
|
||||
## Related commands
|
||||
|
||||
* [service create](service_create.md)
|
||||
|
|
Loading…
Reference in New Issue