From cad20c759f7adf3e374149e59144a2655cc93398 Mon Sep 17 00:00:00 2001 From: Simon Ferquel Date: Mon, 8 Apr 2019 10:37:18 +0200 Subject: [PATCH] Support internal Load Balancing for Kubernetes stacks On the server v0.4.21 has introduced a better way of dealing with intra-stack networking: if the user can specify a list of endpoints exposed internally, we now can setup a ClusterIP for this to avoid the pitfalls of DNS-based load balancing. This exposes the feature using the "Expose" compose field, and adds an extra x-internal-service-type field to explicitly define how intra-stack networking is handled on a service. Signed-off-by: Simon Ferquel --- cli/command/stack/kubernetes/convert.go | 132 ++++++++++++++---- cli/command/stack/kubernetes/convert_test.go | 107 ++++++++++++++ .../testdata/compose-with-expose.yml | 9 ++ 3 files changed, 220 insertions(+), 28 deletions(-) create mode 100644 cli/command/stack/kubernetes/testdata/compose-with-expose.yml diff --git a/cli/command/stack/kubernetes/convert.go b/cli/command/stack/kubernetes/convert.go index a9cf5ba351..c1e066b2c3 100644 --- a/cli/command/stack/kubernetes/convert.go +++ b/cli/command/stack/kubernetes/convert.go @@ -14,8 +14,10 @@ import ( latest "github.com/docker/compose-on-kubernetes/api/compose/v1alpha3" "github.com/docker/compose-on-kubernetes/api/compose/v1beta1" "github.com/docker/compose-on-kubernetes/api/compose/v1beta2" + "github.com/docker/go-connections/nat" "github.com/pkg/errors" yaml "gopkg.in/yaml.v2" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -24,6 +26,8 @@ const ( pullSecretExtraField = "x-pull-secret" // pullPolicyExtraField is an extra field on ServiceConfigs usable to specify a pull policy pullPolicyExtraField = "x-pull-policy" + // internalServiceTypeExtraField is an extra field on ServiceConfigs to explicitly specify the kind of service to setup for intra-stack networking + internalServiceTypeExtraField = "x-internal-service-type" ) // NewStackConverter returns a converter from types.Config (compose) to the specified @@ -265,12 +269,19 @@ func fromComposeServiceConfig(s composeTypes.ServiceConfig, capabilities compose if err != nil { return latest.ServiceConfig{}, err } + if pullSecret != "" && !capabilities.hasPullSecrets { return latest.ServiceConfig{}, errors.Errorf("stack API version %s does not support pull secrets (field %q), please use version v1alpha3 or higher", capabilities.apiVersion, pullSecretExtraField) } if pullPolicy != "" && !capabilities.hasPullPolicies { return latest.ServiceConfig{}, errors.Errorf("stack API version %s does not support pull policies (field %q), please use version v1alpha3 or higher", capabilities.apiVersion, pullPolicyExtraField) } + + internalServiceType, internalPorts, err := setupIntraStackNetworking(s, capabilities) + if err != nil { + return latest.ServiceConfig{}, err + } + return latest.ServiceConfig{ Name: s.Name, CapAdd: s.CapAdd, @@ -286,31 +297,94 @@ func fromComposeServiceConfig(s composeTypes.ServiceConfig, capabilities compose RestartPolicy: fromComposeRestartPolicy(s.Deploy.RestartPolicy), Placement: fromComposePlacement(s.Deploy.Placement), }, - Entrypoint: s.Entrypoint, - Environment: s.Environment, - ExtraHosts: s.ExtraHosts, - Hostname: s.Hostname, - HealthCheck: fromComposeHealthcheck(s.HealthCheck), - Image: s.Image, - Ipc: s.Ipc, - Labels: s.Labels, - Pid: s.Pid, - Ports: fromComposePorts(s.Ports), - Privileged: s.Privileged, - ReadOnly: s.ReadOnly, - Secrets: fromComposeServiceSecrets(s.Secrets), - StdinOpen: s.StdinOpen, - StopGracePeriod: composetypes.ConvertDurationPtr(s.StopGracePeriod), - Tmpfs: s.Tmpfs, - Tty: s.Tty, - User: userID, - Volumes: fromComposeServiceVolumeConfig(s.Volumes), - WorkingDir: s.WorkingDir, - PullSecret: pullSecret, - PullPolicy: pullPolicy, + Entrypoint: s.Entrypoint, + Environment: s.Environment, + ExtraHosts: s.ExtraHosts, + Hostname: s.Hostname, + HealthCheck: fromComposeHealthcheck(s.HealthCheck), + Image: s.Image, + Ipc: s.Ipc, + Labels: s.Labels, + Pid: s.Pid, + Ports: fromComposePorts(s.Ports), + Privileged: s.Privileged, + ReadOnly: s.ReadOnly, + Secrets: fromComposeServiceSecrets(s.Secrets), + StdinOpen: s.StdinOpen, + StopGracePeriod: composetypes.ConvertDurationPtr(s.StopGracePeriod), + Tmpfs: s.Tmpfs, + Tty: s.Tty, + User: userID, + Volumes: fromComposeServiceVolumeConfig(s.Volumes), + WorkingDir: s.WorkingDir, + PullSecret: pullSecret, + PullPolicy: pullPolicy, + InternalServiceType: internalServiceType, + InternalPorts: internalPorts, }, nil } +func setupIntraStackNetworking(s composeTypes.ServiceConfig, capabilities composeCapabilities) (latest.InternalServiceType, []latest.InternalPort, error) { + internalServiceTypeRaw, err := resolveServiceExtra(s, internalServiceTypeExtraField) + if err != nil { + return latest.InternalServiceTypeAuto, nil, err + } + if internalServiceTypeRaw != "" && !capabilities.hasIntraStackLoadBalancing { + return latest.InternalServiceTypeAuto, nil, + errors.Errorf("stack API version %s does not support intra-stack load balancing (field %q), please use version v1alpha3 or higher", capabilities.apiVersion, internalServiceTypeExtraField) + } + if !capabilities.hasIntraStackLoadBalancing { + return latest.InternalServiceTypeAuto, nil, nil + } + internalServiceType, err := validateInternalServiceType(internalServiceTypeRaw) + if err != nil { + return latest.InternalServiceTypeAuto, nil, err + } + internalPorts, err := toInternalPorts(s.Expose) + if err != nil { + return latest.InternalServiceTypeAuto, nil, err + } + return internalServiceType, internalPorts, nil +} + +func validateInternalServiceType(raw string) (latest.InternalServiceType, error) { + internalServiceType := latest.InternalServiceType(raw) + switch internalServiceType { + case latest.InternalServiceTypeAuto, latest.InternalServiceTypeClusterIP, latest.InternalServiceTypeHeadless: + default: + return latest.InternalServiceTypeAuto, + errors.Errorf("invalid value %q for field %q, valid values are %q or %q", raw, + internalServiceTypeExtraField, + latest.InternalServiceTypeClusterIP, + latest.InternalServiceTypeHeadless) + } + return internalServiceType, nil +} + +func toInternalPorts(expose []string) ([]latest.InternalPort, error) { + var internalPorts []latest.InternalPort + for _, sourcePort := range expose { + proto, port := nat.SplitProtoPort(sourcePort) + start, end, err := nat.ParsePortRange(port) + if err != nil { + return nil, errors.Errorf("invalid format for expose: %q, error: %s", sourcePort, err) + } + for i := start; i <= end; i++ { + k8sProto := v1.Protocol(strings.ToUpper(proto)) + switch k8sProto { + case v1.ProtocolSCTP, v1.ProtocolTCP, v1.ProtocolUDP: + default: + return nil, errors.Errorf("invalid protocol for expose: %q, supported values are %q, %q and %q", sourcePort, v1.ProtocolSCTP, v1.ProtocolTCP, v1.ProtocolUDP) + } + internalPorts = append(internalPorts, latest.InternalPort{ + Port: int32(i), + Protocol: k8sProto, + }) + } + } + return internalPorts, nil +} + func resolveServiceExtra(s composeTypes.ServiceConfig, field string) (string, error) { if iface, ok := s.Extras[field]; ok { value, ok := iface.(string) @@ -489,14 +563,16 @@ var ( apiVersion: "v1beta2", } v1alpha3Capabilities = composeCapabilities{ - apiVersion: "v1alpha3", - hasPullSecrets: true, - hasPullPolicies: true, + apiVersion: "v1alpha3", + hasPullSecrets: true, + hasPullPolicies: true, + hasIntraStackLoadBalancing: true, } ) type composeCapabilities struct { - apiVersion string - hasPullSecrets bool - hasPullPolicies bool + apiVersion string + hasPullSecrets bool + hasPullPolicies bool + hasIntraStackLoadBalancing bool } diff --git a/cli/command/stack/kubernetes/convert_test.go b/cli/command/stack/kubernetes/convert_test.go index e8ce63a67c..14634eaa38 100644 --- a/cli/command/stack/kubernetes/convert_test.go +++ b/cli/command/stack/kubernetes/convert_test.go @@ -13,6 +13,7 @@ import ( "github.com/docker/compose-on-kubernetes/api/compose/v1beta2" "gotest.tools/assert" is "gotest.tools/assert/cmp" + "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -235,3 +236,109 @@ func TestHandlePullPolicy(t *testing.T) { }) } } + +func TestHandleInternalServiceType(t *testing.T) { + cases := []struct { + name string + value string + caps composeCapabilities + err string + expected v1alpha3.InternalServiceType + }{ + { + name: "v1beta1", + value: "ClusterIP", + caps: v1beta1Capabilities, + err: `stack API version v1beta1 does not support intra-stack load balancing (field "x-internal-service-type"), please use version v1alpha3 or higher`, + }, + { + name: "v1beta2", + value: "ClusterIP", + caps: v1beta2Capabilities, + err: `stack API version v1beta2 does not support intra-stack load balancing (field "x-internal-service-type"), please use version v1alpha3 or higher`, + }, + { + name: "v1alpha3", + value: "ClusterIP", + caps: v1alpha3Capabilities, + expected: v1alpha3.InternalServiceTypeClusterIP, + }, + { + name: "v1alpha3-invalid", + value: "invalid", + caps: v1alpha3Capabilities, + err: `invalid value "invalid" for field "x-internal-service-type", valid values are "ClusterIP" or "Headless"`, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + res, err := fromComposeServiceConfig(composetypes.ServiceConfig{ + Name: "test", + Image: "test", + Extras: map[string]interface{}{ + internalServiceTypeExtraField: c.value, + }, + }, c.caps) + if c.err == "" { + assert.NilError(t, err) + assert.Equal(t, res.InternalServiceType, c.expected) + } else { + assert.ErrorContains(t, err, c.err) + } + }) + } +} + +func TestIgnoreExpose(t *testing.T) { + testData := loadTestStackWith(t, "expose") + for _, version := range []string{"v1beta1", "v1beta2"} { + conv, err := NewStackConverter(version) + assert.NilError(t, err) + s, err := conv.FromCompose(ioutil.Discard, "test", testData) + assert.NilError(t, err) + assert.Equal(t, len(s.Spec.Services[0].InternalPorts), 0) + } +} + +func TestParseExpose(t *testing.T) { + testData := loadTestStackWith(t, "expose") + conv, err := NewStackConverter("v1alpha3") + assert.NilError(t, err) + s, err := conv.FromCompose(ioutil.Discard, "test", testData) + assert.NilError(t, err) + expected := []v1alpha3.InternalPort{ + { + Port: 1, + Protocol: v1.ProtocolTCP, + }, + { + Port: 2, + Protocol: v1.ProtocolTCP, + }, + { + Port: 3, + Protocol: v1.ProtocolTCP, + }, + { + Port: 4, + Protocol: v1.ProtocolTCP, + }, + { + Port: 5, + Protocol: v1.ProtocolUDP, + }, + { + Port: 6, + Protocol: v1.ProtocolUDP, + }, + { + Port: 7, + Protocol: v1.ProtocolUDP, + }, + { + Port: 8, + Protocol: v1.ProtocolUDP, + }, + } + assert.DeepEqual(t, s.Spec.Services[0].InternalPorts, expected) +} diff --git a/cli/command/stack/kubernetes/testdata/compose-with-expose.yml b/cli/command/stack/kubernetes/testdata/compose-with-expose.yml new file mode 100644 index 0000000000..4d0b6f7e7a --- /dev/null +++ b/cli/command/stack/kubernetes/testdata/compose-with-expose.yml @@ -0,0 +1,9 @@ +version: "3.7" +services: + test: + image: "some-image" + expose: + - "1" # default protocol, single port + - "2-4" # default protocol, port range + - "5/udp" # specific protocol, single port + - "6-8/udp" # specific protocol, port range \ No newline at end of file