Topology-aware scheduling

This adds support for placement preferences in Swarm services.

- Convert PlacementPreferences between GRPC API and HTTP API
- Add --placement-pref, --placement-pref-add and --placement-pref-rm to CLI
- Add support for placement preferences in service inspect --pretty
- Add integration test

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
This commit is contained in:
Aaron Lehmann 2017-01-19 15:27:37 -08:00
parent 7665cb52dc
commit 21d5d1fa9d
5 changed files with 139 additions and 10 deletions

View File

@ -39,9 +39,12 @@ UpdateStatus:
Message: {{ .UpdateStatusMessage }} Message: {{ .UpdateStatusMessage }}
{{- end }} {{- end }}
Placement: Placement:
{{- if .TaskPlacementConstraints -}} {{- if .TaskPlacementConstraints }}
Constraints: {{ .TaskPlacementConstraints }} Constraints: {{ .TaskPlacementConstraints }}
{{- end }} {{- end }}
{{- if .TaskPlacementPreferences }}
Preferences: {{ .TaskPlacementPreferences }}
{{- end }}
{{- if .HasUpdateConfig }} {{- if .HasUpdateConfig }}
UpdateConfig: UpdateConfig:
Parallelism: {{ .UpdateParallelism }} Parallelism: {{ .UpdateParallelism }}
@ -211,6 +214,19 @@ func (ctx *serviceInspectContext) TaskPlacementConstraints() []string {
return nil return nil
} }
func (ctx *serviceInspectContext) TaskPlacementPreferences() []string {
if ctx.Service.Spec.TaskTemplate.Placement == nil {
return nil
}
var strings []string
for _, pref := range ctx.Service.Spec.TaskTemplate.Placement.Preferences {
if pref.Spread != nil {
strings = append(strings, "spread="+pref.Spread.SpreadDescriptor)
}
}
return strings
}
func (ctx *serviceInspectContext) HasUpdateConfig() bool { func (ctx *serviceInspectContext) HasUpdateConfig() bool {
return ctx.Service.Spec.UpdateConfig != nil return ctx.Service.Spec.UpdateConfig != nil
} }

View File

@ -37,6 +37,8 @@ func newCreateCommand(dockerCli *command.DockerCli) *cobra.Command {
flags.Var(&opts.envFile, flagEnvFile, "Read in a file of environment variables") flags.Var(&opts.envFile, flagEnvFile, "Read in a file of environment variables")
flags.Var(&opts.mounts, flagMount, "Attach a filesystem mount to the service") flags.Var(&opts.mounts, flagMount, "Attach a filesystem mount to the service")
flags.Var(&opts.constraints, flagConstraint, "Placement constraints") flags.Var(&opts.constraints, flagConstraint, "Placement constraints")
flags.Var(&opts.placementPrefs, flagPlacementPref, "Add a placement preference")
flags.SetAnnotation(flagPlacementPref, "version", []string{"1.27"})
flags.Var(&opts.networks, flagNetwork, "Network attachments") flags.Var(&opts.networks, flagNetwork, "Network attachments")
flags.Var(&opts.secrets, flagSecret, "Specify secrets to expose to the service") flags.Var(&opts.secrets, flagSecret, "Specify secrets to expose to the service")
flags.SetAnnotation(flagSecret, "version", []string{"1.25"}) flags.SetAnnotation(flagSecret, "version", []string{"1.25"})

View File

@ -1,6 +1,7 @@
package service package service
import ( import (
"errors"
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
@ -117,6 +118,45 @@ func (f *floatValue) Value() float32 {
return float32(*f) return float32(*f)
} }
// placementPrefOpts holds a list of placement preferences.
type placementPrefOpts struct {
prefs []swarm.PlacementPreference
strings []string
}
func (opts *placementPrefOpts) String() string {
if len(opts.strings) == 0 {
return ""
}
return fmt.Sprintf("%v", opts.strings)
}
// Set validates the input value and adds it to the internal slices.
// Note: in the future strategies other than "spread", may be supported,
// as well as additional comma-separated options.
func (opts *placementPrefOpts) Set(value string) error {
fields := strings.Split(value, "=")
if len(fields) != 2 {
return errors.New(`placement preference must be of the format "<strategy>=<arg>"`)
}
if fields[0] != "spread" {
return fmt.Errorf("unsupported placement preference %s (only spread is supported)", fields[0])
}
opts.prefs = append(opts.prefs, swarm.PlacementPreference{
Spread: &swarm.SpreadOver{
SpreadDescriptor: fields[1],
},
})
opts.strings = append(opts.strings, value)
return nil
}
// Type returns a string name for this Option type
func (opts *placementPrefOpts) Type() string {
return "pref"
}
type updateOptions struct { type updateOptions struct {
parallelism uint64 parallelism uint64
delay time.Duration delay time.Duration
@ -285,6 +325,7 @@ type serviceOptions struct {
restartPolicy restartPolicyOptions restartPolicy restartPolicyOptions
constraints opts.ListOpts constraints opts.ListOpts
placementPrefs placementPrefOpts
update updateOptions update updateOptions
networks opts.ListOpts networks opts.ListOpts
endpoint endpointOptions endpoint endpointOptions
@ -398,6 +439,7 @@ func (opts *serviceOptions) ToService() (swarm.ServiceSpec, error) {
RestartPolicy: opts.restartPolicy.ToRestartPolicy(), RestartPolicy: opts.restartPolicy.ToRestartPolicy(),
Placement: &swarm.Placement{ Placement: &swarm.Placement{
Constraints: opts.constraints.GetAll(), Constraints: opts.constraints.GetAll(),
Preferences: opts.placementPrefs.prefs,
}, },
LogDriver: opts.logDriver.toLogDriver(), LogDriver: opts.logDriver.toLogDriver(),
}, },
@ -473,6 +515,9 @@ func addServiceFlags(cmd *cobra.Command, opts *serviceOptions) {
} }
const ( const (
flagPlacementPref = "placement-pref"
flagPlacementPrefAdd = "placement-pref-add"
flagPlacementPrefRemove = "placement-pref-rm"
flagConstraint = "constraint" flagConstraint = "constraint"
flagConstraintRemove = "constraint-rm" flagConstraintRemove = "constraint-rm"
flagConstraintAdd = "constraint-add" flagConstraintAdd = "constraint-add"

View File

@ -69,6 +69,10 @@ func newUpdateCommand(dockerCli *command.DockerCli) *cobra.Command {
flags.SetAnnotation(flagSecretAdd, "version", []string{"1.25"}) flags.SetAnnotation(flagSecretAdd, "version", []string{"1.25"})
flags.Var(&serviceOpts.mounts, flagMountAdd, "Add or update a mount on a service") flags.Var(&serviceOpts.mounts, flagMountAdd, "Add or update a mount on a service")
flags.Var(&serviceOpts.constraints, flagConstraintAdd, "Add or update a placement constraint") flags.Var(&serviceOpts.constraints, flagConstraintAdd, "Add or update a placement constraint")
flags.Var(&serviceOpts.placementPrefs, flagPlacementPrefAdd, "Add a placement preference")
flags.SetAnnotation(flagPlacementPrefAdd, "version", []string{"1.27"})
flags.Var(&placementPrefOpts{}, flagPlacementPrefRemove, "Remove a placement preference")
flags.SetAnnotation(flagPlacementPrefRemove, "version", []string{"1.27"})
flags.Var(&serviceOpts.endpoint.publishPorts, flagPublishAdd, "Add or update a published port") flags.Var(&serviceOpts.endpoint.publishPorts, flagPublishAdd, "Add or update a published port")
flags.Var(&serviceOpts.groups, flagGroupAdd, "Add an additional supplementary user group to the container") flags.Var(&serviceOpts.groups, flagGroupAdd, "Add an additional supplementary user group to the container")
flags.SetAnnotation(flagGroupAdd, "version", []string{"1.25"}) flags.SetAnnotation(flagGroupAdd, "version", []string{"1.25"})
@ -260,7 +264,14 @@ func updateService(flags *pflag.FlagSet, spec *swarm.ServiceSpec) error {
if task.Placement == nil { if task.Placement == nil {
task.Placement = &swarm.Placement{} task.Placement = &swarm.Placement{}
} }
updatePlacement(flags, task.Placement) updatePlacementConstraints(flags, task.Placement)
}
if anyChanged(flags, flagPlacementPrefAdd, flagPlacementPrefRemove) {
if task.Placement == nil {
task.Placement = &swarm.Placement{}
}
updatePlacementPreferences(flags, task.Placement)
} }
if err := updateReplicas(flags, &spec.Mode); err != nil { if err := updateReplicas(flags, &spec.Mode); err != nil {
@ -372,7 +383,7 @@ func anyChanged(flags *pflag.FlagSet, fields ...string) bool {
return false return false
} }
func updatePlacement(flags *pflag.FlagSet, placement *swarm.Placement) { func updatePlacementConstraints(flags *pflag.FlagSet, placement *swarm.Placement) {
if flags.Changed(flagConstraintAdd) { if flags.Changed(flagConstraintAdd) {
values := flags.Lookup(flagConstraintAdd).Value.(*opts.ListOpts).GetAll() values := flags.Lookup(flagConstraintAdd).Value.(*opts.ListOpts).GetAll()
placement.Constraints = append(placement.Constraints, values...) placement.Constraints = append(placement.Constraints, values...)
@ -391,6 +402,35 @@ func updatePlacement(flags *pflag.FlagSet, placement *swarm.Placement) {
placement.Constraints = newConstraints placement.Constraints = newConstraints
} }
func updatePlacementPreferences(flags *pflag.FlagSet, placement *swarm.Placement) {
var newPrefs []swarm.PlacementPreference
if flags.Changed(flagPlacementPrefRemove) {
for _, existing := range placement.Preferences {
removed := false
for _, removal := range flags.Lookup(flagPlacementPrefRemove).Value.(*placementPrefOpts).prefs {
if removal.Spread != nil && existing.Spread != nil && removal.Spread.SpreadDescriptor == existing.Spread.SpreadDescriptor {
removed = true
break
}
}
if !removed {
newPrefs = append(newPrefs, existing)
}
}
} else {
newPrefs = placement.Preferences
}
if flags.Changed(flagPlacementPrefAdd) {
for _, addition := range flags.Lookup(flagPlacementPrefAdd).Value.(*placementPrefOpts).prefs {
newPrefs = append(newPrefs, addition)
}
}
placement.Preferences = newPrefs
}
func updateContainerLabels(flags *pflag.FlagSet, field *map[string]string) { func updateContainerLabels(flags *pflag.FlagSet, field *map[string]string) {
if flags.Changed(flagContainerLabelAdd) { if flags.Changed(flagContainerLabelAdd) {
if *field == nil { if *field == nil {

View File

@ -51,7 +51,7 @@ func TestUpdateLabelsRemoveALabelThatDoesNotExist(t *testing.T) {
assert.Equal(t, len(labels), 1) assert.Equal(t, len(labels), 1)
} }
func TestUpdatePlacement(t *testing.T) { func TestUpdatePlacementConstraints(t *testing.T) {
flags := newUpdateCommand(nil).Flags() flags := newUpdateCommand(nil).Flags()
flags.Set("constraint-add", "node=toadd") flags.Set("constraint-add", "node=toadd")
flags.Set("constraint-rm", "node!=toremove") flags.Set("constraint-rm", "node!=toremove")
@ -60,12 +60,38 @@ func TestUpdatePlacement(t *testing.T) {
Constraints: []string{"node!=toremove", "container=tokeep"}, Constraints: []string{"node!=toremove", "container=tokeep"},
} }
updatePlacement(flags, placement) updatePlacementConstraints(flags, placement)
assert.Equal(t, len(placement.Constraints), 2) assert.Equal(t, len(placement.Constraints), 2)
assert.Equal(t, placement.Constraints[0], "container=tokeep") assert.Equal(t, placement.Constraints[0], "container=tokeep")
assert.Equal(t, placement.Constraints[1], "node=toadd") assert.Equal(t, placement.Constraints[1], "node=toadd")
} }
func TestUpdatePlacementPrefs(t *testing.T) {
flags := newUpdateCommand(nil).Flags()
flags.Set("placement-pref-add", "spread=node.labels.dc")
flags.Set("placement-pref-rm", "spread=node.labels.rack")
placement := &swarm.Placement{
Preferences: []swarm.PlacementPreference{
{
Spread: &swarm.SpreadOver{
SpreadDescriptor: "node.labels.rack",
},
},
{
Spread: &swarm.SpreadOver{
SpreadDescriptor: "node.labels.row",
},
},
},
}
updatePlacementPreferences(flags, placement)
assert.Equal(t, len(placement.Preferences), 2)
assert.Equal(t, placement.Preferences[0].Spread.SpreadDescriptor, "node.labels.row")
assert.Equal(t, placement.Preferences[1].Spread.SpreadDescriptor, "node.labels.dc")
}
func TestUpdateEnvironment(t *testing.T) { func TestUpdateEnvironment(t *testing.T) {
flags := newUpdateCommand(nil).Flags() flags := newUpdateCommand(nil).Flags()
flags.Set("env-add", "toadd=newenv") flags.Set("env-add", "toadd=newenv")