From 21d5d1fa9dfa652585f0398fe186557d406fedee Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Thu, 19 Jan 2017 15:27:37 -0800 Subject: [PATCH] Topology-aware scheduling This adds support for placement preferences in Swarm services. - Convert PlacementPreferences between GRPC API and HTTP API - Add --placement-pref, --placement-pref-add and --placement-pref-rm to CLI - Add support for placement preferences in service inspect --pretty - Add integration test Signed-off-by: Aaron Lehmann --- command/formatter/service.go | 18 ++++++++++- command/service/create.go | 2 ++ command/service/opts.go | 55 ++++++++++++++++++++++++++++++---- command/service/update.go | 44 +++++++++++++++++++++++++-- command/service/update_test.go | 30 +++++++++++++++++-- 5 files changed, 139 insertions(+), 10 deletions(-) diff --git a/command/formatter/service.go b/command/formatter/service.go index b13c5ee608..b8b476dd66 100644 --- a/command/formatter/service.go +++ b/command/formatter/service.go @@ -39,9 +39,12 @@ UpdateStatus: Message: {{ .UpdateStatusMessage }} {{- end }} Placement: -{{- if .TaskPlacementConstraints -}} +{{- if .TaskPlacementConstraints }} Constraints: {{ .TaskPlacementConstraints }} {{- end }} +{{- if .TaskPlacementPreferences }} + Preferences: {{ .TaskPlacementPreferences }} +{{- end }} {{- if .HasUpdateConfig }} UpdateConfig: Parallelism: {{ .UpdateParallelism }} @@ -211,6 +214,19 @@ func (ctx *serviceInspectContext) TaskPlacementConstraints() []string { return nil } +func (ctx *serviceInspectContext) TaskPlacementPreferences() []string { + if ctx.Service.Spec.TaskTemplate.Placement == nil { + return nil + } + var strings []string + for _, pref := range ctx.Service.Spec.TaskTemplate.Placement.Preferences { + if pref.Spread != nil { + strings = append(strings, "spread="+pref.Spread.SpreadDescriptor) + } + } + return strings +} + func (ctx *serviceInspectContext) HasUpdateConfig() bool { return ctx.Service.Spec.UpdateConfig != nil } diff --git a/command/service/create.go b/command/service/create.go index ab90868424..c2eb81727a 100644 --- a/command/service/create.go +++ b/command/service/create.go @@ -37,6 +37,8 @@ func newCreateCommand(dockerCli *command.DockerCli) *cobra.Command { flags.Var(&opts.envFile, flagEnvFile, "Read in a file of environment variables") flags.Var(&opts.mounts, flagMount, "Attach a filesystem mount to the service") flags.Var(&opts.constraints, flagConstraint, "Placement constraints") + flags.Var(&opts.placementPrefs, flagPlacementPref, "Add a placement preference") + flags.SetAnnotation(flagPlacementPref, "version", []string{"1.27"}) flags.Var(&opts.networks, flagNetwork, "Network attachments") flags.Var(&opts.secrets, flagSecret, "Specify secrets to expose to the service") flags.SetAnnotation(flagSecret, "version", []string{"1.25"}) diff --git a/command/service/opts.go b/command/service/opts.go index d8618e73ca..060f7017fa 100644 --- a/command/service/opts.go +++ b/command/service/opts.go @@ -1,6 +1,7 @@ package service import ( + "errors" "fmt" "strconv" "strings" @@ -117,6 +118,45 @@ func (f *floatValue) Value() float32 { return float32(*f) } +// placementPrefOpts holds a list of placement preferences. +type placementPrefOpts struct { + prefs []swarm.PlacementPreference + strings []string +} + +func (opts *placementPrefOpts) String() string { + if len(opts.strings) == 0 { + return "" + } + return fmt.Sprintf("%v", opts.strings) +} + +// Set validates the input value and adds it to the internal slices. +// Note: in the future strategies other than "spread", may be supported, +// as well as additional comma-separated options. +func (opts *placementPrefOpts) Set(value string) error { + fields := strings.Split(value, "=") + if len(fields) != 2 { + return errors.New(`placement preference must be of the format "="`) + } + if fields[0] != "spread" { + return fmt.Errorf("unsupported placement preference %s (only spread is supported)", fields[0]) + } + + opts.prefs = append(opts.prefs, swarm.PlacementPreference{ + Spread: &swarm.SpreadOver{ + SpreadDescriptor: fields[1], + }, + }) + opts.strings = append(opts.strings, value) + return nil +} + +// Type returns a string name for this Option type +func (opts *placementPrefOpts) Type() string { + return "pref" +} + type updateOptions struct { parallelism uint64 delay time.Duration @@ -283,11 +323,12 @@ type serviceOptions struct { replicas Uint64Opt mode string - restartPolicy restartPolicyOptions - constraints opts.ListOpts - update updateOptions - networks opts.ListOpts - endpoint endpointOptions + restartPolicy restartPolicyOptions + constraints opts.ListOpts + placementPrefs placementPrefOpts + update updateOptions + networks opts.ListOpts + endpoint endpointOptions registryAuth bool @@ -398,6 +439,7 @@ func (opts *serviceOptions) ToService() (swarm.ServiceSpec, error) { RestartPolicy: opts.restartPolicy.ToRestartPolicy(), Placement: &swarm.Placement{ Constraints: opts.constraints.GetAll(), + Preferences: opts.placementPrefs.prefs, }, LogDriver: opts.logDriver.toLogDriver(), }, @@ -473,6 +515,9 @@ func addServiceFlags(cmd *cobra.Command, opts *serviceOptions) { } const ( + flagPlacementPref = "placement-pref" + flagPlacementPrefAdd = "placement-pref-add" + flagPlacementPrefRemove = "placement-pref-rm" flagConstraint = "constraint" flagConstraintRemove = "constraint-rm" flagConstraintAdd = "constraint-add" diff --git a/command/service/update.go b/command/service/update.go index 7f461c90a9..1300e5e381 100644 --- a/command/service/update.go +++ b/command/service/update.go @@ -69,6 +69,10 @@ func newUpdateCommand(dockerCli *command.DockerCli) *cobra.Command { flags.SetAnnotation(flagSecretAdd, "version", []string{"1.25"}) flags.Var(&serviceOpts.mounts, flagMountAdd, "Add or update a mount on a service") flags.Var(&serviceOpts.constraints, flagConstraintAdd, "Add or update a placement constraint") + flags.Var(&serviceOpts.placementPrefs, flagPlacementPrefAdd, "Add a placement preference") + flags.SetAnnotation(flagPlacementPrefAdd, "version", []string{"1.27"}) + flags.Var(&placementPrefOpts{}, flagPlacementPrefRemove, "Remove a placement preference") + flags.SetAnnotation(flagPlacementPrefRemove, "version", []string{"1.27"}) flags.Var(&serviceOpts.endpoint.publishPorts, flagPublishAdd, "Add or update a published port") flags.Var(&serviceOpts.groups, flagGroupAdd, "Add an additional supplementary user group to the container") flags.SetAnnotation(flagGroupAdd, "version", []string{"1.25"}) @@ -260,7 +264,14 @@ func updateService(flags *pflag.FlagSet, spec *swarm.ServiceSpec) error { if task.Placement == nil { task.Placement = &swarm.Placement{} } - updatePlacement(flags, task.Placement) + updatePlacementConstraints(flags, task.Placement) + } + + if anyChanged(flags, flagPlacementPrefAdd, flagPlacementPrefRemove) { + if task.Placement == nil { + task.Placement = &swarm.Placement{} + } + updatePlacementPreferences(flags, task.Placement) } if err := updateReplicas(flags, &spec.Mode); err != nil { @@ -372,7 +383,7 @@ func anyChanged(flags *pflag.FlagSet, fields ...string) bool { return false } -func updatePlacement(flags *pflag.FlagSet, placement *swarm.Placement) { +func updatePlacementConstraints(flags *pflag.FlagSet, placement *swarm.Placement) { if flags.Changed(flagConstraintAdd) { values := flags.Lookup(flagConstraintAdd).Value.(*opts.ListOpts).GetAll() placement.Constraints = append(placement.Constraints, values...) @@ -391,6 +402,35 @@ func updatePlacement(flags *pflag.FlagSet, placement *swarm.Placement) { placement.Constraints = newConstraints } +func updatePlacementPreferences(flags *pflag.FlagSet, placement *swarm.Placement) { + var newPrefs []swarm.PlacementPreference + + if flags.Changed(flagPlacementPrefRemove) { + for _, existing := range placement.Preferences { + removed := false + for _, removal := range flags.Lookup(flagPlacementPrefRemove).Value.(*placementPrefOpts).prefs { + if removal.Spread != nil && existing.Spread != nil && removal.Spread.SpreadDescriptor == existing.Spread.SpreadDescriptor { + removed = true + break + } + } + if !removed { + newPrefs = append(newPrefs, existing) + } + } + } else { + newPrefs = placement.Preferences + } + + if flags.Changed(flagPlacementPrefAdd) { + for _, addition := range flags.Lookup(flagPlacementPrefAdd).Value.(*placementPrefOpts).prefs { + newPrefs = append(newPrefs, addition) + } + } + + placement.Preferences = newPrefs +} + func updateContainerLabels(flags *pflag.FlagSet, field *map[string]string) { if flags.Changed(flagContainerLabelAdd) { if *field == nil { diff --git a/command/service/update_test.go b/command/service/update_test.go index f2887e229d..422ab33dac 100644 --- a/command/service/update_test.go +++ b/command/service/update_test.go @@ -51,7 +51,7 @@ func TestUpdateLabelsRemoveALabelThatDoesNotExist(t *testing.T) { assert.Equal(t, len(labels), 1) } -func TestUpdatePlacement(t *testing.T) { +func TestUpdatePlacementConstraints(t *testing.T) { flags := newUpdateCommand(nil).Flags() flags.Set("constraint-add", "node=toadd") flags.Set("constraint-rm", "node!=toremove") @@ -60,12 +60,38 @@ func TestUpdatePlacement(t *testing.T) { Constraints: []string{"node!=toremove", "container=tokeep"}, } - updatePlacement(flags, placement) + updatePlacementConstraints(flags, placement) assert.Equal(t, len(placement.Constraints), 2) assert.Equal(t, placement.Constraints[0], "container=tokeep") assert.Equal(t, placement.Constraints[1], "node=toadd") } +func TestUpdatePlacementPrefs(t *testing.T) { + flags := newUpdateCommand(nil).Flags() + flags.Set("placement-pref-add", "spread=node.labels.dc") + flags.Set("placement-pref-rm", "spread=node.labels.rack") + + placement := &swarm.Placement{ + Preferences: []swarm.PlacementPreference{ + { + Spread: &swarm.SpreadOver{ + SpreadDescriptor: "node.labels.rack", + }, + }, + { + Spread: &swarm.SpreadOver{ + SpreadDescriptor: "node.labels.row", + }, + }, + }, + } + + updatePlacementPreferences(flags, placement) + assert.Equal(t, len(placement.Preferences), 2) + assert.Equal(t, placement.Preferences[0].Spread.SpreadDescriptor, "node.labels.row") + assert.Equal(t, placement.Preferences[1].Spread.SpreadDescriptor, "node.labels.dc") +} + func TestUpdateEnvironment(t *testing.T) { flags := newUpdateCommand(nil).Flags() flags.Set("env-add", "toadd=newenv")