mirror of https://github.com/docker/cli.git
Merge pull request #2262 from dperny/swarm-jobs
Add jobs support to CLI
This commit is contained in:
commit
4f058143c7
|
@ -31,7 +31,7 @@ func newCreateCommand(dockerCli command.Cli) *cobra.Command {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
flags := cmd.Flags()
|
flags := cmd.Flags()
|
||||||
flags.StringVar(&opts.mode, flagMode, "replicated", "Service mode (replicated or global)")
|
flags.StringVar(&opts.mode, flagMode, "replicated", "Service mode (replicated, global, replicated-job, or global-job)")
|
||||||
flags.StringVar(&opts.name, flagName, "", "Service name")
|
flags.StringVar(&opts.name, flagName, "", "Service name")
|
||||||
|
|
||||||
addServiceFlags(flags, opts, buildServiceDefaultFlagMapping())
|
addServiceFlags(flags, opts, buildServiceDefaultFlagMapping())
|
||||||
|
|
|
@ -596,6 +596,10 @@ func (c *serviceContext) Mode() string {
|
||||||
return "global"
|
return "global"
|
||||||
case c.service.Spec.Mode.Replicated != nil:
|
case c.service.Spec.Mode.Replicated != nil:
|
||||||
return "replicated"
|
return "replicated"
|
||||||
|
case c.service.Spec.Mode.ReplicatedJob != nil:
|
||||||
|
return "replicated job"
|
||||||
|
case c.service.Spec.Mode.GlobalJob != nil:
|
||||||
|
return "global job"
|
||||||
default:
|
default:
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
@ -604,10 +608,33 @@ func (c *serviceContext) Mode() string {
|
||||||
func (c *serviceContext) Replicas() string {
|
func (c *serviceContext) Replicas() string {
|
||||||
s := &c.service
|
s := &c.service
|
||||||
|
|
||||||
var running, desired uint64
|
var running, desired, completed uint64
|
||||||
if s.ServiceStatus != nil {
|
if s.ServiceStatus != nil {
|
||||||
running = c.service.ServiceStatus.RunningTasks
|
running = c.service.ServiceStatus.RunningTasks
|
||||||
desired = c.service.ServiceStatus.DesiredTasks
|
desired = c.service.ServiceStatus.DesiredTasks
|
||||||
|
completed = c.service.ServiceStatus.CompletedTasks
|
||||||
|
}
|
||||||
|
// for jobs, we will not include the max per node, even if it is set. jobs
|
||||||
|
// include instead the progress of the job as a whole, in addition to the
|
||||||
|
// current running state. the system respects max per node, but if we
|
||||||
|
// included it in the list output, the lines for jobs would be entirely too
|
||||||
|
// long and make the UI look bad.
|
||||||
|
if s.Spec.Mode.ReplicatedJob != nil {
|
||||||
|
return fmt.Sprintf(
|
||||||
|
"%d/%d (%d/%d completed)",
|
||||||
|
running, desired, completed, *s.Spec.Mode.ReplicatedJob.TotalCompletions,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
if s.Spec.Mode.GlobalJob != nil {
|
||||||
|
// for global jobs, we need to do a little math. desired tasks are only
|
||||||
|
// the tasks that have not yet actually reached the Completed state.
|
||||||
|
// Completed tasks have reached the completed state. the TOTAL number
|
||||||
|
// of tasks to run is the sum of the tasks desired to still complete,
|
||||||
|
// and the tasks actually completed.
|
||||||
|
return fmt.Sprintf(
|
||||||
|
"%d/%d (%d/%d completed)",
|
||||||
|
running, desired, completed, desired+completed,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
if r := c.maxReplicas(); r > 0 {
|
if r := c.maxReplicas(); r > 0 {
|
||||||
return fmt.Sprintf("%d/%d (max %d per node)", running, desired, r)
|
return fmt.Sprintf("%d/%d (max %d per node)", running, desired, r)
|
||||||
|
|
|
@ -15,6 +15,13 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestServiceContextWrite(t *testing.T) {
|
func TestServiceContextWrite(t *testing.T) {
|
||||||
|
var (
|
||||||
|
// we need a pair of variables for setting the job parameters, because
|
||||||
|
// those parameters take pointers to uint64, which we can't make as a
|
||||||
|
// literal
|
||||||
|
varThree uint64 = 3
|
||||||
|
varTen uint64 = 10
|
||||||
|
)
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
context formatter.Context
|
context formatter.Context
|
||||||
expected string
|
expected string
|
||||||
|
@ -38,6 +45,8 @@ func TestServiceContextWrite(t *testing.T) {
|
||||||
01_baz baz global 1/3 *:80->8080/tcp
|
01_baz baz global 1/3 *:80->8080/tcp
|
||||||
04_qux2 qux2 replicated 3/3 (max 2 per node)
|
04_qux2 qux2 replicated 3/3 (max 2 per node)
|
||||||
03_qux10 qux10 replicated 2/3 (max 1 per node)
|
03_qux10 qux10 replicated 2/3 (max 1 per node)
|
||||||
|
05_job1 zarp1 replicated job 2/3 (5/10 completed)
|
||||||
|
06_job2 zarp2 global job 1/1 (3/4 completed)
|
||||||
`,
|
`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -46,6 +55,8 @@ func TestServiceContextWrite(t *testing.T) {
|
||||||
01_baz
|
01_baz
|
||||||
04_qux2
|
04_qux2
|
||||||
03_qux10
|
03_qux10
|
||||||
|
05_job1
|
||||||
|
06_job2
|
||||||
`,
|
`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -55,6 +66,8 @@ bar replicated
|
||||||
baz global
|
baz global
|
||||||
qux2 replicated
|
qux2 replicated
|
||||||
qux10 replicated
|
qux10 replicated
|
||||||
|
zarp1 replicated job
|
||||||
|
zarp2 global job
|
||||||
`,
|
`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -64,6 +77,8 @@ bar
|
||||||
baz
|
baz
|
||||||
qux2
|
qux2
|
||||||
qux10
|
qux10
|
||||||
|
zarp1
|
||||||
|
zarp2
|
||||||
`,
|
`,
|
||||||
},
|
},
|
||||||
// Raw Format
|
// Raw Format
|
||||||
|
@ -77,6 +92,8 @@ qux10
|
||||||
id: 01_baz
|
id: 01_baz
|
||||||
id: 04_qux2
|
id: 04_qux2
|
||||||
id: 03_qux10
|
id: 03_qux10
|
||||||
|
id: 05_job1
|
||||||
|
id: 06_job2
|
||||||
`,
|
`,
|
||||||
},
|
},
|
||||||
// Custom Format
|
// Custom Format
|
||||||
|
@ -86,6 +103,8 @@ id: 03_qux10
|
||||||
baz
|
baz
|
||||||
qux2
|
qux2
|
||||||
qux10
|
qux10
|
||||||
|
zarp1
|
||||||
|
zarp2
|
||||||
`,
|
`,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -170,6 +189,37 @@ qux10
|
||||||
DesiredTasks: 3,
|
DesiredTasks: 3,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
ID: "05_job1",
|
||||||
|
Spec: swarm.ServiceSpec{
|
||||||
|
Annotations: swarm.Annotations{Name: "zarp1"},
|
||||||
|
Mode: swarm.ServiceMode{
|
||||||
|
ReplicatedJob: &swarm.ReplicatedJob{
|
||||||
|
MaxConcurrent: &varThree,
|
||||||
|
TotalCompletions: &varTen,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ServiceStatus: &swarm.ServiceStatus{
|
||||||
|
RunningTasks: 2,
|
||||||
|
DesiredTasks: 3,
|
||||||
|
CompletedTasks: 5,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "06_job2",
|
||||||
|
Spec: swarm.ServiceSpec{
|
||||||
|
Annotations: swarm.Annotations{Name: "zarp2"},
|
||||||
|
Mode: swarm.ServiceMode{
|
||||||
|
GlobalJob: &swarm.GlobalJob{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ServiceStatus: &swarm.ServiceStatus{
|
||||||
|
RunningTasks: 1,
|
||||||
|
DesiredTasks: 1,
|
||||||
|
CompletedTasks: 3,
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
out := bytes.NewBufferString("")
|
out := bytes.NewBufferString("")
|
||||||
testcase.context.Output = out
|
testcase.context.Output = out
|
||||||
|
|
|
@ -111,6 +111,8 @@ func AppendServiceStatus(ctx context.Context, c client.APIClient, services []swa
|
||||||
status := map[string]*swarm.ServiceStatus{}
|
status := map[string]*swarm.ServiceStatus{}
|
||||||
taskFilter := filters.NewArgs()
|
taskFilter := filters.NewArgs()
|
||||||
for i, s := range services {
|
for i, s := range services {
|
||||||
|
// there is no need in this switch to check for job modes. jobs are not
|
||||||
|
// supported until after ServiceStatus was introduced.
|
||||||
switch {
|
switch {
|
||||||
case s.ServiceStatus != nil:
|
case s.ServiceStatus != nil:
|
||||||
// Server already returned service-status, so we don't
|
// Server already returned service-status, so we don't
|
||||||
|
|
|
@ -508,8 +508,9 @@ type serviceOptions struct {
|
||||||
resources resourceOptions
|
resources resourceOptions
|
||||||
stopGrace opts.DurationOpt
|
stopGrace opts.DurationOpt
|
||||||
|
|
||||||
replicas Uint64Opt
|
replicas Uint64Opt
|
||||||
mode string
|
mode string
|
||||||
|
maxConcurrent Uint64Opt
|
||||||
|
|
||||||
restartPolicy restartPolicyOptions
|
restartPolicy restartPolicyOptions
|
||||||
constraints opts.ListOpts
|
constraints opts.ListOpts
|
||||||
|
@ -554,18 +555,45 @@ func (options *serviceOptions) ToServiceMode() (swarm.ServiceMode, error) {
|
||||||
switch options.mode {
|
switch options.mode {
|
||||||
case "global":
|
case "global":
|
||||||
if options.replicas.Value() != nil {
|
if options.replicas.Value() != nil {
|
||||||
return serviceMode, errors.Errorf("replicas can only be used with replicated mode")
|
return serviceMode, errors.Errorf("replicas can only be used with replicated or replicated-job mode")
|
||||||
}
|
}
|
||||||
|
|
||||||
if options.maxReplicas > 0 {
|
if options.maxReplicas > 0 {
|
||||||
return serviceMode, errors.New("replicas-max-per-node can only be used with replicated mode")
|
return serviceMode, errors.New("replicas-max-per-node can only be used with replicated or replicated-job mode")
|
||||||
|
}
|
||||||
|
if options.maxConcurrent.Value() != nil {
|
||||||
|
return serviceMode, errors.New("max-concurrent can only be used with replicated-job mode")
|
||||||
}
|
}
|
||||||
|
|
||||||
serviceMode.Global = &swarm.GlobalService{}
|
serviceMode.Global = &swarm.GlobalService{}
|
||||||
case "replicated":
|
case "replicated":
|
||||||
|
if options.maxConcurrent.Value() != nil {
|
||||||
|
return serviceMode, errors.New("max-concurrent can only be used with replicated-job mode")
|
||||||
|
}
|
||||||
|
|
||||||
serviceMode.Replicated = &swarm.ReplicatedService{
|
serviceMode.Replicated = &swarm.ReplicatedService{
|
||||||
Replicas: options.replicas.Value(),
|
Replicas: options.replicas.Value(),
|
||||||
}
|
}
|
||||||
|
case "replicated-job":
|
||||||
|
concurrent := options.maxConcurrent.Value()
|
||||||
|
if concurrent == nil {
|
||||||
|
concurrent = options.replicas.Value()
|
||||||
|
}
|
||||||
|
serviceMode.ReplicatedJob = &swarm.ReplicatedJob{
|
||||||
|
MaxConcurrent: concurrent,
|
||||||
|
TotalCompletions: options.replicas.Value(),
|
||||||
|
}
|
||||||
|
case "global-job":
|
||||||
|
if options.maxReplicas > 0 {
|
||||||
|
return serviceMode, errors.New("replicas-max-per-node can only be used with replicated or replicated-job mode")
|
||||||
|
}
|
||||||
|
if options.maxConcurrent.Value() != nil {
|
||||||
|
return serviceMode, errors.New("max-concurrent can only be used with replicated-job mode")
|
||||||
|
}
|
||||||
|
if options.replicas.Value() != nil {
|
||||||
|
return serviceMode, errors.Errorf("replicas can only be used with replicated or replicated-job mode")
|
||||||
|
}
|
||||||
|
serviceMode.GlobalJob = &swarm.GlobalJob{}
|
||||||
default:
|
default:
|
||||||
return serviceMode, errors.Errorf("Unknown mode: %s, only replicated and global supported", options.mode)
|
return serviceMode, errors.Errorf("Unknown mode: %s, only replicated and global supported", options.mode)
|
||||||
}
|
}
|
||||||
|
@ -579,14 +607,13 @@ func (options *serviceOptions) ToStopGracePeriod(flags *pflag.FlagSet) *time.Dur
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (options *serviceOptions) ToService(ctx context.Context, apiClient client.NetworkAPIClient, flags *pflag.FlagSet) (swarm.ServiceSpec, error) {
|
// makeEnv gets the environment variables from the command line options and
|
||||||
var service swarm.ServiceSpec
|
// returns a slice of strings to use in the service spec when doing ToService
|
||||||
|
func (options *serviceOptions) makeEnv() ([]string, error) {
|
||||||
envVariables, err := opts.ReadKVEnvStrings(options.envFile.GetAll(), options.env.GetAll())
|
envVariables, err := opts.ReadKVEnvStrings(options.envFile.GetAll(), options.env.GetAll())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return service, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
currentEnv := make([]string, 0, len(envVariables))
|
currentEnv := make([]string, 0, len(envVariables))
|
||||||
for _, env := range envVariables { // need to process each var, in order
|
for _, env := range envVariables { // need to process each var, in order
|
||||||
k := strings.SplitN(env, "=", 2)[0]
|
k := strings.SplitN(env, "=", 2)[0]
|
||||||
|
@ -601,6 +628,24 @@ func (options *serviceOptions) ToService(ctx context.Context, apiClient client.N
|
||||||
currentEnv = append(currentEnv, env)
|
currentEnv = append(currentEnv, env)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return currentEnv, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToService takes the set of flags passed to the command and converts them
|
||||||
|
// into a service spec.
|
||||||
|
//
|
||||||
|
// Takes an API client as the second argument in order to resolve network names
|
||||||
|
// from the flags into network IDs.
|
||||||
|
//
|
||||||
|
// Returns an error if any flags are invalid or contradictory.
|
||||||
|
func (options *serviceOptions) ToService(ctx context.Context, apiClient client.NetworkAPIClient, flags *pflag.FlagSet) (swarm.ServiceSpec, error) {
|
||||||
|
var service swarm.ServiceSpec
|
||||||
|
|
||||||
|
currentEnv, err := options.makeEnv()
|
||||||
|
if err != nil {
|
||||||
|
return service, err
|
||||||
|
}
|
||||||
|
|
||||||
healthConfig, err := options.healthcheck.toHealthConfig()
|
healthConfig, err := options.healthcheck.toHealthConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return service, err
|
return service, err
|
||||||
|
@ -611,6 +656,16 @@ func (options *serviceOptions) ToService(ctx context.Context, apiClient client.N
|
||||||
return service, err
|
return service, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
updateConfig := options.update.updateConfig(flags)
|
||||||
|
rollbackConfig := options.rollback.rollbackConfig(flags)
|
||||||
|
|
||||||
|
// update and rollback configuration is not supported for jobs. If these
|
||||||
|
// flags are not set, then the values will be nil. If they are non-nil,
|
||||||
|
// then return an error.
|
||||||
|
if (serviceMode.ReplicatedJob != nil || serviceMode.GlobalJob != nil) && (updateConfig != nil || rollbackConfig != nil) {
|
||||||
|
return service, errors.Errorf("update and rollback configuration is not supported for jobs")
|
||||||
|
}
|
||||||
|
|
||||||
networks := convertNetworks(options.networks)
|
networks := convertNetworks(options.networks)
|
||||||
for i, net := range networks {
|
for i, net := range networks {
|
||||||
nwID, err := resolveNetworkID(ctx, apiClient, net.Target)
|
nwID, err := resolveNetworkID(ctx, apiClient, net.Target)
|
||||||
|
@ -671,8 +726,8 @@ func (options *serviceOptions) ToService(ctx context.Context, apiClient client.N
|
||||||
LogDriver: options.logDriver.toLogDriver(),
|
LogDriver: options.logDriver.toLogDriver(),
|
||||||
},
|
},
|
||||||
Mode: serviceMode,
|
Mode: serviceMode,
|
||||||
UpdateConfig: options.update.updateConfig(flags),
|
UpdateConfig: updateConfig,
|
||||||
RollbackConfig: options.rollback.rollbackConfig(flags),
|
RollbackConfig: rollbackConfig,
|
||||||
EndpointSpec: options.endpoint.ToEndpointSpec(),
|
EndpointSpec: options.endpoint.ToEndpointSpec(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -769,6 +824,8 @@ func addServiceFlags(flags *pflag.FlagSet, opts *serviceOptions, defaultFlagValu
|
||||||
|
|
||||||
flags.Var(&opts.stopGrace, flagStopGracePeriod, flagDesc(flagStopGracePeriod, "Time to wait before force killing a container (ns|us|ms|s|m|h)"))
|
flags.Var(&opts.stopGrace, flagStopGracePeriod, flagDesc(flagStopGracePeriod, "Time to wait before force killing a container (ns|us|ms|s|m|h)"))
|
||||||
flags.Var(&opts.replicas, flagReplicas, "Number of tasks")
|
flags.Var(&opts.replicas, flagReplicas, "Number of tasks")
|
||||||
|
flags.Var(&opts.maxConcurrent, flagConcurrent, "Number of job tasks to run concurrently (default equal to --replicas)")
|
||||||
|
flags.SetAnnotation(flagConcurrent, "version", []string{"1.41"})
|
||||||
flags.Uint64Var(&opts.maxReplicas, flagMaxReplicas, defaultFlagValues.getUint64(flagMaxReplicas), "Maximum number of tasks per node (default 0 = unlimited)")
|
flags.Uint64Var(&opts.maxReplicas, flagMaxReplicas, defaultFlagValues.getUint64(flagMaxReplicas), "Maximum number of tasks per node (default 0 = unlimited)")
|
||||||
flags.SetAnnotation(flagMaxReplicas, "version", []string{"1.40"})
|
flags.SetAnnotation(flagMaxReplicas, "version", []string{"1.40"})
|
||||||
|
|
||||||
|
@ -878,6 +935,7 @@ const (
|
||||||
flagLimitCPU = "limit-cpu"
|
flagLimitCPU = "limit-cpu"
|
||||||
flagLimitMemory = "limit-memory"
|
flagLimitMemory = "limit-memory"
|
||||||
flagMaxReplicas = "replicas-max-per-node"
|
flagMaxReplicas = "replicas-max-per-node"
|
||||||
|
flagConcurrent = "max-concurrent"
|
||||||
flagMode = "mode"
|
flagMode = "mode"
|
||||||
flagMount = "mount"
|
flagMount = "mount"
|
||||||
flagMountRemove = "mount-rm"
|
flagMountRemove = "mount-rm"
|
||||||
|
|
|
@ -285,7 +285,7 @@ func TestToServiceMaxReplicasGlobalModeConflict(t *testing.T) {
|
||||||
maxReplicas: 1,
|
maxReplicas: 1,
|
||||||
}
|
}
|
||||||
_, err := opt.ToServiceMode()
|
_, err := opt.ToServiceMode()
|
||||||
assert.Error(t, err, "replicas-max-per-node can only be used with replicated mode")
|
assert.Error(t, err, "replicas-max-per-node can only be used with replicated or replicated-job mode")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestToServiceSysCtls(t *testing.T) {
|
func TestToServiceSysCtls(t *testing.T) {
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -45,6 +46,7 @@ var (
|
||||||
const (
|
const (
|
||||||
maxProgress = 9
|
maxProgress = 9
|
||||||
maxProgressBars = 20
|
maxProgressBars = 20
|
||||||
|
maxJobProgress = 10
|
||||||
)
|
)
|
||||||
|
|
||||||
type progressUpdater interface {
|
type progressUpdater interface {
|
||||||
|
@ -53,7 +55,9 @@ type progressUpdater interface {
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
for state := range numberedStates {
|
for state := range numberedStates {
|
||||||
if !terminalState(state) && len(state) > longestState {
|
// for jobs, we use the "complete" state, and so it should be factored
|
||||||
|
// in to the computation of the longest state.
|
||||||
|
if (!terminalState(state) || state == swarm.TaskStateComplete) && len(state) > longestState {
|
||||||
longestState = len(state)
|
longestState = len(state)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -164,6 +168,18 @@ func ServiceProgress(ctx context.Context, client client.APIClient, serviceID str
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if converged {
|
if converged {
|
||||||
|
// if the service is a job, there's no need to verify it. jobs are
|
||||||
|
// stay done once they're done. skip the verification and just end
|
||||||
|
// the progress monitoring.
|
||||||
|
//
|
||||||
|
// only job services have a non-nil job status, which means we can
|
||||||
|
// use the presence of this field to check if the service is a job
|
||||||
|
// here.
|
||||||
|
if service.JobStatus != nil {
|
||||||
|
progress.Message(progressOut, "", "job complete")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if convergedAt.IsZero() {
|
if convergedAt.IsZero() {
|
||||||
convergedAt = time.Now()
|
convergedAt = time.Now()
|
||||||
}
|
}
|
||||||
|
@ -230,6 +246,14 @@ func initializeUpdater(service swarm.Service, progressOut progress.Output) (prog
|
||||||
progressOut: progressOut,
|
progressOut: progressOut,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
if service.Spec.Mode.ReplicatedJob != nil {
|
||||||
|
return newReplicatedJobProgressUpdater(service, progressOut), nil
|
||||||
|
}
|
||||||
|
if service.Spec.Mode.GlobalJob != nil {
|
||||||
|
return &globalJobProgressUpdater{
|
||||||
|
progressOut: progressOut,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
return nil, errors.New("unrecognized service mode")
|
return nil, errors.New("unrecognized service mode")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -502,3 +526,322 @@ func (u *globalProgressUpdater) writeTaskProgress(task swarm.Task, nodeCount int
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// replicatedJobProgressUpdater outputs the progress of a replicated job. This
|
||||||
|
// progress consists of a few main elements.
|
||||||
|
//
|
||||||
|
// The first is the progress bar for the job as a whole. This shows the number
|
||||||
|
// of completed out of total tasks for the job. Tasks that are currently
|
||||||
|
// running are not counted.
|
||||||
|
//
|
||||||
|
// The second is the status of the "active" tasks for the job. We count a task
|
||||||
|
// as "active" if it has any non-terminal state, not just running. This is
|
||||||
|
// shown as a fraction of the maximum concurrent tasks that can be running,
|
||||||
|
// which is the less of MaxConcurrent or TotalCompletions - completed tasks.
|
||||||
|
type replicatedJobProgressUpdater struct {
|
||||||
|
progressOut progress.Output
|
||||||
|
|
||||||
|
// jobIteration is the service's job iteration, used to exclude tasks
|
||||||
|
// belonging to earlier iterations.
|
||||||
|
jobIteration uint64
|
||||||
|
|
||||||
|
// concurrent is the value of MaxConcurrent as an int. That is, the maximum
|
||||||
|
// number of tasks allowed to be run simultaneously.
|
||||||
|
concurrent int
|
||||||
|
|
||||||
|
// total is the value of TotalCompletions, the number of complete tasks
|
||||||
|
// desired.
|
||||||
|
total int
|
||||||
|
|
||||||
|
// initialized is set to true after the first time update is called. the
|
||||||
|
// first time update is called, the components of the progress UI are all
|
||||||
|
// written out in an initial pass. this ensure that they will subsequently
|
||||||
|
// be in order, no matter how they are updated.
|
||||||
|
initialized bool
|
||||||
|
|
||||||
|
// progressDigits is the number digits in total, so that we know how much
|
||||||
|
// to pad the job progress field with.
|
||||||
|
//
|
||||||
|
// when we're writing the number of completed over total tasks, we need to
|
||||||
|
// pad the numerator with spaces, so that the bar doesn't jump around.
|
||||||
|
// we'll compute that once on init, and then reuse it over and over.
|
||||||
|
//
|
||||||
|
// we compute this in the least clever way possible: convert to string
|
||||||
|
// with strconv.Itoa, then take the len.
|
||||||
|
progressDigits int
|
||||||
|
|
||||||
|
// activeDigits is the same, but for active tasks, and it applies to both
|
||||||
|
// the numerator and denominator.
|
||||||
|
activeDigits int
|
||||||
|
}
|
||||||
|
|
||||||
|
func newReplicatedJobProgressUpdater(service swarm.Service, progressOut progress.Output) *replicatedJobProgressUpdater {
|
||||||
|
u := &replicatedJobProgressUpdater{
|
||||||
|
progressOut: progressOut,
|
||||||
|
concurrent: int(*service.Spec.Mode.ReplicatedJob.MaxConcurrent),
|
||||||
|
total: int(*service.Spec.Mode.ReplicatedJob.TotalCompletions),
|
||||||
|
jobIteration: service.JobStatus.JobIteration.Index,
|
||||||
|
}
|
||||||
|
u.progressDigits = len(strconv.Itoa(u.total))
|
||||||
|
u.activeDigits = len(strconv.Itoa(u.concurrent))
|
||||||
|
|
||||||
|
return u
|
||||||
|
}
|
||||||
|
|
||||||
|
// update writes out the progress of the replicated job.
|
||||||
|
func (u *replicatedJobProgressUpdater) update(_ swarm.Service, tasks []swarm.Task, _ map[string]struct{}, _ bool) (bool, error) {
|
||||||
|
if !u.initialized {
|
||||||
|
u.writeOverallProgress(0, 0)
|
||||||
|
|
||||||
|
// only write out progress bars if there will be less than the maximum
|
||||||
|
if u.total <= maxProgressBars {
|
||||||
|
for i := 1; i <= u.total; i++ {
|
||||||
|
u.progressOut.WriteProgress(progress.Progress{
|
||||||
|
ID: fmt.Sprintf("%d/%d", i, u.total),
|
||||||
|
Action: " ",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
u.initialized = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// tasksBySlot is a mapping of slot number to the task valid for that slot.
|
||||||
|
// it deduplicated tasks occupying the same numerical slot but in different
|
||||||
|
// states.
|
||||||
|
tasksBySlot := make(map[int]swarm.Task)
|
||||||
|
for _, task := range tasks {
|
||||||
|
// first, check if the task belongs to this service iteration. skip
|
||||||
|
// tasks belonging to other iterations.
|
||||||
|
if task.JobIteration == nil || task.JobIteration.Index != u.jobIteration {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// then, if the task is in an unknown state, ignore it.
|
||||||
|
if numberedStates[task.DesiredState] == 0 ||
|
||||||
|
numberedStates[task.Status.State] == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// finally, check if the task already exists in the map
|
||||||
|
if existing, ok := tasksBySlot[task.Slot]; ok {
|
||||||
|
// if so, use the task with the lower actual state
|
||||||
|
if numberedStates[existing.Status.State] > numberedStates[task.Status.State] {
|
||||||
|
tasksBySlot[task.Slot] = task
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// otherwise, just add it to the map.
|
||||||
|
tasksBySlot[task.Slot] = task
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
activeTasks := 0
|
||||||
|
completeTasks := 0
|
||||||
|
|
||||||
|
for i := 0; i < len(tasksBySlot); i++ {
|
||||||
|
task := tasksBySlot[i]
|
||||||
|
u.writeTaskProgress(task)
|
||||||
|
|
||||||
|
if numberedStates[task.Status.State] < numberedStates[swarm.TaskStateComplete] {
|
||||||
|
activeTasks++
|
||||||
|
}
|
||||||
|
|
||||||
|
if task.Status.State == swarm.TaskStateComplete {
|
||||||
|
completeTasks++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
u.writeOverallProgress(activeTasks, completeTasks)
|
||||||
|
|
||||||
|
return completeTasks == u.total, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *replicatedJobProgressUpdater) writeOverallProgress(active, completed int) {
|
||||||
|
u.progressOut.WriteProgress(progress.Progress{
|
||||||
|
ID: "job progress",
|
||||||
|
Action: fmt.Sprintf(
|
||||||
|
// * means "use the next positional arg to compute padding"
|
||||||
|
"%*d out of %d complete", u.progressDigits, completed, u.total,
|
||||||
|
),
|
||||||
|
Current: int64(completed),
|
||||||
|
Total: int64(u.total),
|
||||||
|
HideCounts: true,
|
||||||
|
})
|
||||||
|
|
||||||
|
// actualDesired is the lesser of MaxConcurrent, or the remaining tasks
|
||||||
|
actualDesired := u.total - completed
|
||||||
|
if actualDesired > u.concurrent {
|
||||||
|
actualDesired = u.concurrent
|
||||||
|
}
|
||||||
|
|
||||||
|
u.progressOut.WriteProgress(progress.Progress{
|
||||||
|
ID: "active tasks",
|
||||||
|
Action: fmt.Sprintf(
|
||||||
|
// [n] notation lets us select a specific argument, 1-indexed
|
||||||
|
// putting the [1] before the star means "make the string this
|
||||||
|
// length". putting the [2] or the [3] means "use this argument
|
||||||
|
// here"
|
||||||
|
//
|
||||||
|
// we pad both the numerator and the denominator because, as the
|
||||||
|
// job reaches its conclusion, the number of possible concurrent
|
||||||
|
// tasks will go down, as fewer than MaxConcurrent tasks are needed
|
||||||
|
// to complete the job.
|
||||||
|
"%[1]*[2]d out of %[1]*[3]d tasks", u.activeDigits, active, actualDesired,
|
||||||
|
),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *replicatedJobProgressUpdater) writeTaskProgress(task swarm.Task) {
|
||||||
|
if u.total > maxProgressBars {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if task.Status.Err != "" {
|
||||||
|
u.progressOut.WriteProgress(progress.Progress{
|
||||||
|
ID: fmt.Sprintf("%d/%d", task.Slot+1, u.total),
|
||||||
|
Action: truncError(task.Status.Err),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
u.progressOut.WriteProgress(progress.Progress{
|
||||||
|
ID: fmt.Sprintf("%d/%d", task.Slot+1, u.total),
|
||||||
|
Action: fmt.Sprintf("%-*s", longestState, task.Status.State),
|
||||||
|
Current: numberedStates[task.Status.State],
|
||||||
|
Total: maxJobProgress,
|
||||||
|
HideCounts: true,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// globalJobProgressUpdater is the progressUpdater for GlobalJob-mode services.
|
||||||
|
// Because GlobalJob services are so much simpler than ReplicatedJob services,
|
||||||
|
// this updater is in turn simpler as well.
|
||||||
|
type globalJobProgressUpdater struct {
|
||||||
|
progressOut progress.Output
|
||||||
|
|
||||||
|
// initialized is used to detect the first pass of update, and to perform
|
||||||
|
// first time initialization logic at that time.
|
||||||
|
initialized bool
|
||||||
|
|
||||||
|
// total is the total number of tasks expected for this job
|
||||||
|
total int
|
||||||
|
|
||||||
|
// progressDigits is the number of spaces to pad the numerator of the job
|
||||||
|
// progress field
|
||||||
|
progressDigits int
|
||||||
|
|
||||||
|
taskNodes map[string]struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *globalJobProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, _ bool) (bool, error) {
|
||||||
|
if !u.initialized {
|
||||||
|
// if there are not yet tasks, then return early.
|
||||||
|
if len(tasks) == 0 && len(activeNodes) != 0 {
|
||||||
|
u.progressOut.WriteProgress(progress.Progress{
|
||||||
|
ID: "job progress",
|
||||||
|
Action: "waiting for tasks",
|
||||||
|
})
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// when a global job starts, all of its tasks are created at once, so
|
||||||
|
// we can use len(tasks) to know how many we're expecting.
|
||||||
|
u.taskNodes = map[string]struct{}{}
|
||||||
|
|
||||||
|
for _, task := range tasks {
|
||||||
|
// skip any tasks not belonging to this job iteration.
|
||||||
|
if task.JobIteration == nil || task.JobIteration.Index != service.JobStatus.JobIteration.Index {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// collect the list of all node IDs for this service.
|
||||||
|
//
|
||||||
|
// basically, global jobs will execute on any new nodes that join
|
||||||
|
// the cluster in the future. to avoid making things complicated,
|
||||||
|
// we will only check the progress of the initial set of nodes. if
|
||||||
|
// any new nodes come online during the operation, we will ignore
|
||||||
|
// them.
|
||||||
|
u.taskNodes[task.NodeID] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
u.total = len(u.taskNodes)
|
||||||
|
u.progressDigits = len(strconv.Itoa(u.total))
|
||||||
|
|
||||||
|
u.writeOverallProgress(0)
|
||||||
|
u.initialized = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// tasksByNodeID maps a NodeID to the latest task for that Node ID. this
|
||||||
|
// lets us pick only the latest task for any given node.
|
||||||
|
tasksByNodeID := map[string]swarm.Task{}
|
||||||
|
|
||||||
|
for _, task := range tasks {
|
||||||
|
// skip any tasks not belonging to this job iteration
|
||||||
|
if task.JobIteration == nil || task.JobIteration.Index != service.JobStatus.JobIteration.Index {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// if the task is not on one of the initial set of nodes, ignore it.
|
||||||
|
if _, ok := u.taskNodes[task.NodeID]; !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// if there is already a task recorded for this node, choose the one
|
||||||
|
// with the lower state
|
||||||
|
if oldtask, ok := tasksByNodeID[task.NodeID]; ok {
|
||||||
|
if numberedStates[oldtask.Status.State] > numberedStates[task.Status.State] {
|
||||||
|
tasksByNodeID[task.NodeID] = task
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tasksByNodeID[task.NodeID] = task
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
complete := 0
|
||||||
|
for _, task := range tasksByNodeID {
|
||||||
|
u.writeTaskProgress(task)
|
||||||
|
if task.Status.State == swarm.TaskStateComplete {
|
||||||
|
complete++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
u.writeOverallProgress(complete)
|
||||||
|
return complete == u.total, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *globalJobProgressUpdater) writeTaskProgress(task swarm.Task) {
|
||||||
|
if u.total > maxProgressBars {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if task.Status.Err != "" {
|
||||||
|
u.progressOut.WriteProgress(progress.Progress{
|
||||||
|
ID: task.NodeID,
|
||||||
|
Action: truncError(task.Status.Err),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
u.progressOut.WriteProgress(progress.Progress{
|
||||||
|
ID: task.NodeID,
|
||||||
|
Action: fmt.Sprintf("%-*s", longestState, task.Status.State),
|
||||||
|
Current: numberedStates[task.Status.State],
|
||||||
|
Total: maxJobProgress,
|
||||||
|
HideCounts: true,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *globalJobProgressUpdater) writeOverallProgress(complete int) {
|
||||||
|
// all tasks for a global job are active at once, so we only write out the
|
||||||
|
// total progress.
|
||||||
|
u.progressOut.WriteProgress(progress.Progress{
|
||||||
|
// see (*replicatedJobProgressUpdater).writeOverallProgress for an
|
||||||
|
// explanation fo the advanced fmt use in this function.
|
||||||
|
ID: "job progress",
|
||||||
|
Action: fmt.Sprintf(
|
||||||
|
"%*d out of %d complete", u.progressDigits, complete, u.total,
|
||||||
|
),
|
||||||
|
Current: int64(complete),
|
||||||
|
Total: int64(u.total),
|
||||||
|
HideCounts: true,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -42,6 +42,20 @@ func (u updaterTester) testUpdater(tasks []swarm.Task, expectedConvergence bool,
|
||||||
assert.Check(u.t, is.DeepEqual(expectedProgress, u.p.p))
|
assert.Check(u.t, is.DeepEqual(expectedProgress, u.p.p))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (u updaterTester) testUpdaterNoOrder(tasks []swarm.Task, expectedConvergence bool, expectedProgress []progress.Progress) {
|
||||||
|
u.p.clear()
|
||||||
|
converged, err := u.updater.update(u.service, tasks, u.activeNodes, u.rollback)
|
||||||
|
assert.Check(u.t, err)
|
||||||
|
assert.Check(u.t, is.Equal(expectedConvergence, converged))
|
||||||
|
|
||||||
|
// instead of checking that expected and actual match exactly, verify that
|
||||||
|
// they are the same length, and every time from actual is in expected.
|
||||||
|
assert.Check(u.t, is.Equal(len(expectedProgress), len(u.p.p)))
|
||||||
|
for _, prog := range expectedProgress {
|
||||||
|
assert.Check(u.t, is.Contains(u.p.p, prog))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestReplicatedProgressUpdaterOneReplica(t *testing.T) {
|
func TestReplicatedProgressUpdaterOneReplica(t *testing.T) {
|
||||||
replicas := uint64(1)
|
replicas := uint64(1)
|
||||||
|
|
||||||
|
@ -373,3 +387,511 @@ func TestGlobalProgressUpdaterManyNodes(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReplicatedJobProgressUpdaterSmall(t *testing.T) {
|
||||||
|
concurrent := uint64(2)
|
||||||
|
total := uint64(5)
|
||||||
|
|
||||||
|
service := swarm.Service{
|
||||||
|
Spec: swarm.ServiceSpec{
|
||||||
|
Mode: swarm.ServiceMode{
|
||||||
|
ReplicatedJob: &swarm.ReplicatedJob{
|
||||||
|
MaxConcurrent: &concurrent,
|
||||||
|
TotalCompletions: &total,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
JobStatus: &swarm.JobStatus{
|
||||||
|
JobIteration: swarm.Version{Index: 1},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
p := &mockProgress{}
|
||||||
|
ut := updaterTester{
|
||||||
|
t: t,
|
||||||
|
updater: newReplicatedJobProgressUpdater(service, p),
|
||||||
|
p: p,
|
||||||
|
activeNodes: map[string]struct{}{"a": {}, "b": {}},
|
||||||
|
service: service,
|
||||||
|
}
|
||||||
|
|
||||||
|
// create some tasks belonging to a previous iteration
|
||||||
|
tasks := []swarm.Task{
|
||||||
|
{
|
||||||
|
ID: "oldtask1",
|
||||||
|
Slot: 0,
|
||||||
|
NodeID: "",
|
||||||
|
DesiredState: swarm.TaskStateComplete,
|
||||||
|
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||||
|
JobIteration: &swarm.Version{Index: 0},
|
||||||
|
}, {
|
||||||
|
ID: "oldtask2",
|
||||||
|
Slot: 1,
|
||||||
|
NodeID: "",
|
||||||
|
DesiredState: swarm.TaskStateComplete,
|
||||||
|
Status: swarm.TaskStatus{State: swarm.TaskStateComplete},
|
||||||
|
JobIteration: &swarm.Version{Index: 0},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ut.testUpdater(tasks, false, []progress.Progress{
|
||||||
|
// on the initial pass, we draw all of the progress bars at once, which
|
||||||
|
// puts them in order for the rest of the operation
|
||||||
|
{ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true},
|
||||||
|
{ID: "active tasks", Action: "0 out of 2 tasks"},
|
||||||
|
{ID: "1/5", Action: " "},
|
||||||
|
{ID: "2/5", Action: " "},
|
||||||
|
{ID: "3/5", Action: " "},
|
||||||
|
{ID: "4/5", Action: " "},
|
||||||
|
{ID: "5/5", Action: " "},
|
||||||
|
// from here on, we draw as normal. as a side effect, we will have a
|
||||||
|
// second update for the job progress and active tasks. This has no
|
||||||
|
// practical effect on the UI, it's just a side effect of the update
|
||||||
|
// logic.
|
||||||
|
{ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true},
|
||||||
|
{ID: "active tasks", Action: "0 out of 2 tasks"},
|
||||||
|
})
|
||||||
|
|
||||||
|
// wipe the old tasks out of the list
|
||||||
|
tasks = []swarm.Task{}
|
||||||
|
tasks = append(tasks,
|
||||||
|
swarm.Task{
|
||||||
|
ID: "task1",
|
||||||
|
Slot: 0,
|
||||||
|
NodeID: "",
|
||||||
|
DesiredState: swarm.TaskStateComplete,
|
||||||
|
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||||
|
JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index},
|
||||||
|
},
|
||||||
|
swarm.Task{
|
||||||
|
ID: "task2",
|
||||||
|
Slot: 1,
|
||||||
|
NodeID: "",
|
||||||
|
DesiredState: swarm.TaskStateComplete,
|
||||||
|
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||||
|
JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
ut.testUpdater(tasks, false, []progress.Progress{
|
||||||
|
{ID: "1/5", Action: "new ", Current: 1, Total: 10, HideCounts: true},
|
||||||
|
{ID: "2/5", Action: "new ", Current: 1, Total: 10, HideCounts: true},
|
||||||
|
{ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true},
|
||||||
|
{ID: "active tasks", Action: "2 out of 2 tasks"},
|
||||||
|
})
|
||||||
|
|
||||||
|
tasks[0].Status.State = swarm.TaskStatePreparing
|
||||||
|
tasks[1].Status.State = swarm.TaskStateAssigned
|
||||||
|
ut.testUpdater(tasks, false, []progress.Progress{
|
||||||
|
{ID: "1/5", Action: "preparing", Current: 6, Total: 10, HideCounts: true},
|
||||||
|
{ID: "2/5", Action: "assigned ", Current: 4, Total: 10, HideCounts: true},
|
||||||
|
{ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true},
|
||||||
|
{ID: "active tasks", Action: "2 out of 2 tasks"},
|
||||||
|
})
|
||||||
|
|
||||||
|
tasks[0].Status.State = swarm.TaskStateRunning
|
||||||
|
tasks[1].Status.State = swarm.TaskStatePreparing
|
||||||
|
ut.testUpdater(tasks, false, []progress.Progress{
|
||||||
|
{ID: "1/5", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||||
|
{ID: "2/5", Action: "preparing", Current: 6, Total: 10, HideCounts: true},
|
||||||
|
{ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true},
|
||||||
|
{ID: "active tasks", Action: "2 out of 2 tasks"},
|
||||||
|
})
|
||||||
|
|
||||||
|
tasks[0].Status.State = swarm.TaskStateComplete
|
||||||
|
tasks[1].Status.State = swarm.TaskStateComplete
|
||||||
|
ut.testUpdater(tasks, false, []progress.Progress{
|
||||||
|
{ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "job progress", Action: "2 out of 5 complete", Current: 2, Total: 5, HideCounts: true},
|
||||||
|
{ID: "active tasks", Action: "0 out of 2 tasks"},
|
||||||
|
})
|
||||||
|
|
||||||
|
tasks = append(tasks,
|
||||||
|
swarm.Task{
|
||||||
|
ID: "task3",
|
||||||
|
Slot: 2,
|
||||||
|
NodeID: "",
|
||||||
|
DesiredState: swarm.TaskStateComplete,
|
||||||
|
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||||
|
JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index},
|
||||||
|
},
|
||||||
|
swarm.Task{
|
||||||
|
ID: "task4",
|
||||||
|
Slot: 3,
|
||||||
|
NodeID: "",
|
||||||
|
DesiredState: swarm.TaskStateComplete,
|
||||||
|
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||||
|
JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
ut.testUpdater(tasks, false, []progress.Progress{
|
||||||
|
{ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "3/5", Action: "new ", Current: 1, Total: 10, HideCounts: true},
|
||||||
|
{ID: "4/5", Action: "new ", Current: 1, Total: 10, HideCounts: true},
|
||||||
|
{ID: "job progress", Action: "2 out of 5 complete", Current: 2, Total: 5, HideCounts: true},
|
||||||
|
{ID: "active tasks", Action: "2 out of 2 tasks"},
|
||||||
|
})
|
||||||
|
|
||||||
|
tasks[2].Status.State = swarm.TaskStateRunning
|
||||||
|
tasks[3].Status.State = swarm.TaskStateRunning
|
||||||
|
ut.testUpdater(tasks, false, []progress.Progress{
|
||||||
|
{ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "3/5", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||||
|
{ID: "4/5", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||||
|
{ID: "job progress", Action: "2 out of 5 complete", Current: 2, Total: 5, HideCounts: true},
|
||||||
|
{ID: "active tasks", Action: "2 out of 2 tasks"},
|
||||||
|
})
|
||||||
|
|
||||||
|
tasks[3].Status.State = swarm.TaskStateComplete
|
||||||
|
tasks = append(tasks,
|
||||||
|
swarm.Task{
|
||||||
|
ID: "task5",
|
||||||
|
Slot: 4,
|
||||||
|
NodeID: "",
|
||||||
|
DesiredState: swarm.TaskStateComplete,
|
||||||
|
Status: swarm.TaskStatus{State: swarm.TaskStateRunning},
|
||||||
|
JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
ut.testUpdater(tasks, false, []progress.Progress{
|
||||||
|
{ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "3/5", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||||
|
{ID: "4/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "5/5", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||||
|
{ID: "job progress", Action: "3 out of 5 complete", Current: 3, Total: 5, HideCounts: true},
|
||||||
|
{ID: "active tasks", Action: "2 out of 2 tasks"},
|
||||||
|
})
|
||||||
|
|
||||||
|
tasks[2].Status.State = swarm.TaskStateFailed
|
||||||
|
tasks[2].Status.Err = "the task failed"
|
||||||
|
ut.testUpdater(tasks, false, []progress.Progress{
|
||||||
|
{ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "3/5", Action: "the task failed"},
|
||||||
|
{ID: "4/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "5/5", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||||
|
{ID: "job progress", Action: "3 out of 5 complete", Current: 3, Total: 5, HideCounts: true},
|
||||||
|
{ID: "active tasks", Action: "1 out of 2 tasks"},
|
||||||
|
})
|
||||||
|
|
||||||
|
tasks[4].Status.State = swarm.TaskStateComplete
|
||||||
|
tasks = append(tasks,
|
||||||
|
swarm.Task{
|
||||||
|
ID: "task6",
|
||||||
|
Slot: 2,
|
||||||
|
NodeID: "",
|
||||||
|
DesiredState: swarm.TaskStateComplete,
|
||||||
|
Status: swarm.TaskStatus{State: swarm.TaskStateRunning},
|
||||||
|
JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
ut.testUpdater(tasks, false, []progress.Progress{
|
||||||
|
{ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "3/5", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||||
|
{ID: "4/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "5/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "job progress", Action: "4 out of 5 complete", Current: 4, Total: 5, HideCounts: true},
|
||||||
|
{ID: "active tasks", Action: "1 out of 1 tasks"},
|
||||||
|
})
|
||||||
|
|
||||||
|
tasks[5].Status.State = swarm.TaskStateComplete
|
||||||
|
ut.testUpdater(tasks, true, []progress.Progress{
|
||||||
|
{ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "3/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "4/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "5/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "job progress", Action: "5 out of 5 complete", Current: 5, Total: 5, HideCounts: true},
|
||||||
|
{ID: "active tasks", Action: "0 out of 0 tasks"},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReplicatedJobProgressUpdaterLarge(t *testing.T) {
|
||||||
|
concurrent := uint64(10)
|
||||||
|
total := uint64(50)
|
||||||
|
|
||||||
|
service := swarm.Service{
|
||||||
|
Spec: swarm.ServiceSpec{
|
||||||
|
Mode: swarm.ServiceMode{
|
||||||
|
ReplicatedJob: &swarm.ReplicatedJob{
|
||||||
|
MaxConcurrent: &concurrent,
|
||||||
|
TotalCompletions: &total,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
JobStatus: &swarm.JobStatus{
|
||||||
|
JobIteration: swarm.Version{Index: 0},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
p := &mockProgress{}
|
||||||
|
ut := updaterTester{
|
||||||
|
t: t,
|
||||||
|
updater: newReplicatedJobProgressUpdater(service, p),
|
||||||
|
p: p,
|
||||||
|
activeNodes: map[string]struct{}{"a": {}, "b": {}},
|
||||||
|
service: service,
|
||||||
|
}
|
||||||
|
|
||||||
|
tasks := []swarm.Task{}
|
||||||
|
|
||||||
|
// see the comments in TestReplicatedJobProgressUpdaterSmall for why
|
||||||
|
// we write this out twice.
|
||||||
|
ut.testUpdater(tasks, false, []progress.Progress{
|
||||||
|
{ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true},
|
||||||
|
{ID: "active tasks", Action: " 0 out of 10 tasks"},
|
||||||
|
// we don't write out individual status bars for a large job, only the
|
||||||
|
// overall progress bar
|
||||||
|
{ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true},
|
||||||
|
{ID: "active tasks", Action: " 0 out of 10 tasks"},
|
||||||
|
})
|
||||||
|
|
||||||
|
// first, create the initial batch of running tasks
|
||||||
|
for i := 0; i < int(concurrent); i++ {
|
||||||
|
tasks = append(tasks, swarm.Task{
|
||||||
|
ID: strconv.Itoa(i),
|
||||||
|
Slot: i,
|
||||||
|
DesiredState: swarm.TaskStateComplete,
|
||||||
|
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||||
|
JobIteration: &swarm.Version{Index: 0},
|
||||||
|
})
|
||||||
|
|
||||||
|
ut.testUpdater(tasks, false, []progress.Progress{
|
||||||
|
{ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true},
|
||||||
|
{ID: "active tasks", Action: fmt.Sprintf("%2d out of 10 tasks", i+1)},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// now, start moving tasks to completed, and starting new tasks after them.
|
||||||
|
// to do this, we'll start at 0, mark a task complete, and then append a
|
||||||
|
// new one. we'll stop before we get to the end, because the end has a
|
||||||
|
// steadily decreasing denominator for the active tasks
|
||||||
|
//
|
||||||
|
// for 10 concurrent 50 total, this means we'll stop at 50 - 10 = 40 tasks
|
||||||
|
// in the completed state, 10 tasks running. the last index in use will be
|
||||||
|
// 39.
|
||||||
|
for i := 0; i < int(total)-int(concurrent); i++ {
|
||||||
|
tasks[i].Status.State = swarm.TaskStateComplete
|
||||||
|
ut.testUpdater(tasks, false, []progress.Progress{
|
||||||
|
{ID: "job progress", Action: fmt.Sprintf("%2d out of 50 complete", i+1), Current: int64(i + 1), Total: 50, HideCounts: true},
|
||||||
|
{ID: "active tasks", Action: " 9 out of 10 tasks"},
|
||||||
|
})
|
||||||
|
|
||||||
|
last := len(tasks)
|
||||||
|
tasks = append(tasks, swarm.Task{
|
||||||
|
ID: strconv.Itoa(last),
|
||||||
|
Slot: last,
|
||||||
|
DesiredState: swarm.TaskStateComplete,
|
||||||
|
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||||
|
JobIteration: &swarm.Version{Index: 0},
|
||||||
|
})
|
||||||
|
|
||||||
|
ut.testUpdater(tasks, false, []progress.Progress{
|
||||||
|
{ID: "job progress", Action: fmt.Sprintf("%2d out of 50 complete", i+1), Current: int64(i + 1), Total: 50, HideCounts: true},
|
||||||
|
{ID: "active tasks", Action: "10 out of 10 tasks"},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// quick check, to make sure we did the math right when we wrote this code:
|
||||||
|
// we do have 50 tasks in the slice, right?
|
||||||
|
assert.Check(t, is.Equal(len(tasks), int(total)))
|
||||||
|
|
||||||
|
// now, we're down to our last 10 tasks, which are all running. We need to
|
||||||
|
// wind these down
|
||||||
|
for i := int(total) - int(concurrent) - 1; i < int(total); i++ {
|
||||||
|
tasks[i].Status.State = swarm.TaskStateComplete
|
||||||
|
ut.testUpdater(tasks, (i+1 == int(total)), []progress.Progress{
|
||||||
|
{ID: "job progress", Action: fmt.Sprintf("%2d out of 50 complete", i+1), Current: int64(i + 1), Total: 50, HideCounts: true},
|
||||||
|
{ID: "active tasks", Action: fmt.Sprintf("%2[1]d out of %2[1]d tasks", int(total)-(i+1))},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGlobalJobProgressUpdaterSmall(t *testing.T) {
|
||||||
|
service := swarm.Service{
|
||||||
|
Spec: swarm.ServiceSpec{
|
||||||
|
Mode: swarm.ServiceMode{
|
||||||
|
GlobalJob: &swarm.GlobalJob{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
JobStatus: &swarm.JobStatus{
|
||||||
|
JobIteration: swarm.Version{Index: 1},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
p := &mockProgress{}
|
||||||
|
ut := updaterTester{
|
||||||
|
t: t,
|
||||||
|
updater: &globalJobProgressUpdater{
|
||||||
|
progressOut: p,
|
||||||
|
},
|
||||||
|
p: p,
|
||||||
|
activeNodes: map[string]struct{}{"a": {}, "b": {}, "c": {}},
|
||||||
|
service: service,
|
||||||
|
}
|
||||||
|
|
||||||
|
tasks := []swarm.Task{
|
||||||
|
{
|
||||||
|
ID: "oldtask1",
|
||||||
|
DesiredState: swarm.TaskStateComplete,
|
||||||
|
Status: swarm.TaskStatus{State: swarm.TaskStateComplete},
|
||||||
|
JobIteration: &swarm.Version{Index: 0},
|
||||||
|
NodeID: "a",
|
||||||
|
}, {
|
||||||
|
ID: "oldtask2",
|
||||||
|
DesiredState: swarm.TaskStateComplete,
|
||||||
|
Status: swarm.TaskStatus{State: swarm.TaskStateComplete},
|
||||||
|
JobIteration: &swarm.Version{Index: 0},
|
||||||
|
NodeID: "b",
|
||||||
|
}, {
|
||||||
|
ID: "task1",
|
||||||
|
DesiredState: swarm.TaskStateComplete,
|
||||||
|
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||||
|
JobIteration: &swarm.Version{Index: 1},
|
||||||
|
NodeID: "a",
|
||||||
|
}, {
|
||||||
|
ID: "task2",
|
||||||
|
DesiredState: swarm.TaskStateComplete,
|
||||||
|
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||||
|
JobIteration: &swarm.Version{Index: 1},
|
||||||
|
NodeID: "b",
|
||||||
|
}, {
|
||||||
|
ID: "task3",
|
||||||
|
DesiredState: swarm.TaskStateComplete,
|
||||||
|
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
|
||||||
|
JobIteration: &swarm.Version{Index: 1},
|
||||||
|
NodeID: "c",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// we don't know how many tasks will be created until we get the initial
|
||||||
|
// task list, so we should not write out any definitive answers yet.
|
||||||
|
ut.testUpdater([]swarm.Task{}, false, []progress.Progress{
|
||||||
|
{ID: "job progress", Action: "waiting for tasks"},
|
||||||
|
})
|
||||||
|
|
||||||
|
ut.testUpdaterNoOrder(tasks, false, []progress.Progress{
|
||||||
|
{ID: "job progress", Action: "0 out of 3 complete", Current: 0, Total: 3, HideCounts: true},
|
||||||
|
{ID: "a", Action: "new ", Current: 1, Total: 10, HideCounts: true},
|
||||||
|
{ID: "b", Action: "new ", Current: 1, Total: 10, HideCounts: true},
|
||||||
|
{ID: "c", Action: "new ", Current: 1, Total: 10, HideCounts: true},
|
||||||
|
{ID: "job progress", Action: "0 out of 3 complete", Current: 0, Total: 3, HideCounts: true},
|
||||||
|
})
|
||||||
|
|
||||||
|
tasks[2].Status.State = swarm.TaskStatePreparing
|
||||||
|
tasks[3].Status.State = swarm.TaskStateRunning
|
||||||
|
tasks[4].Status.State = swarm.TaskStateAccepted
|
||||||
|
ut.testUpdaterNoOrder(tasks, false, []progress.Progress{
|
||||||
|
{ID: "a", Action: "preparing", Current: 6, Total: 10, HideCounts: true},
|
||||||
|
{ID: "b", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||||
|
{ID: "c", Action: "accepted ", Current: 5, Total: 10, HideCounts: true},
|
||||||
|
{ID: "job progress", Action: "0 out of 3 complete", Current: 0, Total: 3, HideCounts: true},
|
||||||
|
})
|
||||||
|
|
||||||
|
tasks[2].Status.State = swarm.TaskStateRunning
|
||||||
|
tasks[3].Status.State = swarm.TaskStateComplete
|
||||||
|
tasks[4].Status.State = swarm.TaskStateRunning
|
||||||
|
ut.testUpdaterNoOrder(tasks, false, []progress.Progress{
|
||||||
|
{ID: "a", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||||
|
{ID: "b", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "c", Action: "running ", Current: 9, Total: 10, HideCounts: true},
|
||||||
|
{ID: "job progress", Action: "1 out of 3 complete", Current: 1, Total: 3, HideCounts: true},
|
||||||
|
})
|
||||||
|
|
||||||
|
tasks[2].Status.State = swarm.TaskStateFailed
|
||||||
|
tasks[2].Status.Err = "task failed"
|
||||||
|
tasks[4].Status.State = swarm.TaskStateComplete
|
||||||
|
ut.testUpdaterNoOrder(tasks, false, []progress.Progress{
|
||||||
|
{ID: "a", Action: "task failed"},
|
||||||
|
{ID: "b", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "c", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "job progress", Action: "2 out of 3 complete", Current: 2, Total: 3, HideCounts: true},
|
||||||
|
})
|
||||||
|
|
||||||
|
tasks = append(tasks, swarm.Task{
|
||||||
|
ID: "task4",
|
||||||
|
DesiredState: swarm.TaskStateComplete,
|
||||||
|
Status: swarm.TaskStatus{State: swarm.TaskStatePreparing},
|
||||||
|
NodeID: tasks[2].NodeID,
|
||||||
|
JobIteration: &swarm.Version{Index: 1},
|
||||||
|
})
|
||||||
|
|
||||||
|
ut.testUpdaterNoOrder(tasks, false, []progress.Progress{
|
||||||
|
{ID: "a", Action: "preparing", Current: 6, Total: 10, HideCounts: true},
|
||||||
|
{ID: "b", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "c", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "job progress", Action: "2 out of 3 complete", Current: 2, Total: 3, HideCounts: true},
|
||||||
|
})
|
||||||
|
|
||||||
|
tasks[5].Status.State = swarm.TaskStateComplete
|
||||||
|
ut.testUpdaterNoOrder(tasks, true, []progress.Progress{
|
||||||
|
{ID: "a", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "b", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "c", Action: "complete ", Current: 10, Total: 10, HideCounts: true},
|
||||||
|
{ID: "job progress", Action: "3 out of 3 complete", Current: 3, Total: 3, HideCounts: true},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGlobalJobProgressUpdaterLarge(t *testing.T) {
|
||||||
|
service := swarm.Service{
|
||||||
|
Spec: swarm.ServiceSpec{
|
||||||
|
Mode: swarm.ServiceMode{
|
||||||
|
GlobalJob: &swarm.GlobalJob{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
JobStatus: &swarm.JobStatus{
|
||||||
|
JobIteration: swarm.Version{Index: 1},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
activeNodes := map[string]struct{}{}
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
activeNodes[fmt.Sprintf("node%v", i)] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
p := &mockProgress{}
|
||||||
|
ut := updaterTester{
|
||||||
|
t: t,
|
||||||
|
updater: &globalJobProgressUpdater{
|
||||||
|
progressOut: p,
|
||||||
|
},
|
||||||
|
p: p,
|
||||||
|
activeNodes: activeNodes,
|
||||||
|
service: service,
|
||||||
|
}
|
||||||
|
|
||||||
|
tasks := []swarm.Task{}
|
||||||
|
for nodeID := range activeNodes {
|
||||||
|
tasks = append(tasks, swarm.Task{
|
||||||
|
ID: fmt.Sprintf("task%s", nodeID),
|
||||||
|
NodeID: nodeID,
|
||||||
|
DesiredState: swarm.TaskStateComplete,
|
||||||
|
Status: swarm.TaskStatus{
|
||||||
|
State: swarm.TaskStateNew,
|
||||||
|
},
|
||||||
|
JobIteration: &swarm.Version{Index: 1},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// no bars, because too many tasks
|
||||||
|
ut.testUpdater(tasks, false, []progress.Progress{
|
||||||
|
{ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true},
|
||||||
|
{ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true},
|
||||||
|
})
|
||||||
|
|
||||||
|
for i := range tasks {
|
||||||
|
tasks[i].Status.State = swarm.TaskStateComplete
|
||||||
|
ut.testUpdater(tasks, i+1 == len(activeNodes), []progress.Progress{
|
||||||
|
{
|
||||||
|
ID: "job progress",
|
||||||
|
Action: fmt.Sprintf("%2d out of 50 complete", i+1),
|
||||||
|
Current: int64(i + 1), Total: 50, HideCounts: true,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@@ -102,12 +102,14 @@ func runServiceScale(ctx context.Context, dockerCli command.Cli, serviceID strin
 	}
 	serviceMode := &service.Spec.Mode
-	if serviceMode.Replicated == nil {
-		return errors.Errorf("scale can only be used with replicated mode")
+	if serviceMode.Replicated != nil {
+		serviceMode.Replicated.Replicas = &scale
+	} else if serviceMode.ReplicatedJob != nil {
+		serviceMode.ReplicatedJob.TotalCompletions = &scale
+	} else {
+		return errors.Errorf("scale can only be used with replicated or replicated-job mode")
 	}
-	serviceMode.Replicated.Replicas = &scale

 	response, err := client.ServiceUpdate(ctx, service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{})
 	if err != nil {
 		return err
@@ -26,3 +26,17 @@ replicas: 2/3 (max 1 per node)
image:
ports:

id: 05_job1
name: zarp1
mode: replicated job
replicas: 2/3 (5/10 completed)
image:
ports:

id: 06_job2
name: zarp2
mode: global job
replicas: 1/1 (3/4 completed)
image:
ports:

@@ -41,7 +41,8 @@ Options:
       --limit-memory bytes     Limit Memory
       --log-driver string      Logging driver for service
       --log-opt list           Logging driver options
-      --mode string            Service mode (replicated or global) (default "replicated")
+      --max-concurrent         Number of job tasks to run at once (default equal to --replicas)
+      --mode string            Service mode (replicated, global, replicated-job, or global-job) (default "replicated")
       --mount mount            Attach a filesystem mount to the service
       --name string            Service name
       --network network        Network attachments
@@ -1115,6 +1116,66 @@ $ docker service create \
    nvidia/cuda
```

### Running as a job

Jobs are a special kind of service designed to run an operation to completion
and then stop, as opposed to running long-running daemons. When a Task
belonging to a job exits successfully (return value 0), the Task is marked as
"Completed", and is not run again.

Jobs are started by using one of two modes, `replicated-job` or `global-job`:

```bash
$ docker service create --name myjob \
                        --mode replicated-job \
                        bash "true"
```

This command runs one Task which, using the `bash` image, executes the command
`true`; the command returns 0, and the Task exits.

Though Jobs are ultimately a different kind of service, they have a couple of
caveats compared to other services:

- None of the update or rollback configuration options are valid. Jobs can be
  updated, but cannot be rolled out or rolled back, making these configuration
  options moot.
- Jobs are never restarted on reaching the `Complete` state. This means that
  for jobs, setting `--restart-condition` to `any` is the same as setting it to
  `on-failure`.

Jobs are available in both replicated and global modes.

#### Replicated Jobs

A replicated job is like a replicated service. Setting the `--replicas` flag
specifies the total number of iterations of the job to execute.

By default, all replicas of a replicated job launch at once. To control how
many replicas execute simultaneously at any one time, the `--max-concurrent`
flag can be used:

```bash
$ docker service create --name mythrottledjob \
                        --mode replicated-job \
                        --replicas 10 \
                        --max-concurrent 2 \
                        bash "true"
```

The above command executes 10 Tasks in total, but only 2 of them run at any
given time.

#### Global Jobs

Global jobs are like global services, in that a Task is executed once on each
node matching placement constraints. Global jobs are represented by the mode
`global-job`.

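As a minimal sketch (the service name `myglobaljob` is hypothetical, and the
`bash` image is reused from the examples above), a global job can be started in
the same way as a replicated job, only the mode differs:

```bash
# hypothetical service name; mirrors the replicated-job example above
$ docker service create --name myglobaljob \
                        --mode global-job \
                        bash "true"
```

This starts one such Task on every node that satisfies the job's placement
constraints.
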
Note that after a Global job is created, any new Nodes added to the cluster
will have a Task from that job started on them. The Global Job does not as a
whole have a "done" state, except insofar as every Node meeting the job's
constraints has a Completed task.

## Related commands

* [service inspect](service_inspect.md)
@@ -39,14 +39,17 @@ On a manager node:
```bash
$ docker service ls

ID            NAME      MODE            REPLICAS             IMAGE
c8wgl7q4ndfd  frontend  replicated      5/5                  nginx:alpine
dmu1ept4cxcf  redis     replicated      3/3                  redis:3.0.6
iwe3278osahj  mongo     global          7/7                  mongo:3.3
hh08h9uu8uwr  job       replicated-job  1/1 (3/5 completed)  nginx:latest
```

The `REPLICAS` column shows both the *actual* and *desired* number of tasks for
the service. If the service is in `replicated-job` or `global-job` mode, it
will additionally show the completion status of the job as completed tasks over
the total number of tasks the job will execute.

### Filtering
@@ -52,7 +52,7 @@ b4g08uwuairexjub6ome6usqh

 $ docker service scale backend=10

-backend: scale can only be used with replicated mode
+backend: scale can only be used with replicated or replicated-job mode
 ```

 Directly afterwards, run `docker service ls`, to see the actual number of
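Because scaling a `replicated-job` sets the total number of completions the job
will run, such a job can be scaled in the same way as a replicated service. A
minimal sketch (the job name `myjob` is hypothetical):

```bash
# hypothetical job name; for a replicated-job, scale adjusts total completions
$ docker service scale myjob=10
```
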
@@ -54,6 +54,7 @@ Options:
       --limit-memory bytes      Limit Memory
       --log-driver string       Logging driver for service
       --log-opt list            Logging driver options
+      --max-concurrent          Number of job tasks to run at once (default equal to --replicas)
       --mount-add mount         Add or update a mount on a service
       --mount-rm list           Remove a mount by its target path
       --network-add network     Add a network
@@ -295,6 +296,23 @@ See [`service create`](service_create.md#create-services-using-templates) for th
`service update` supports the same `--isolation` flag as `service create`
See [`service create`](service_create.md) for the reference.

### Updating Jobs

When a service is created as a job, by setting its mode to `replicated-job` or
to `global-job` when doing `service create`, the options for updating it are
limited.

Updating a Job immediately stops any Tasks that are in progress. The operation
creates a new set of Tasks for the job and effectively resets its completion
status. If any Tasks were running before the update, they are stopped, and new
Tasks are created.

Jobs cannot be rolled out or rolled back. None of the flags for configuring
update or rollback settings are valid with job modes.

To run a job again with the same parameters it was run with previously, it can
be force updated with the `--force` flag.

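As a minimal sketch (the service name `myjob` is hypothetical, reused from the
`service create` examples), forcing a job to run again looks like:

```bash
# hypothetical service name; --force re-runs the job with its existing parameters
$ docker service update --force myjob
```
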
## Related commands

* [service create](service_create.md)