Support internal Load Balancing for Kubernetes stacks

On the server v0.4.21 has introduced a better way of dealing with
intra-stack networking: if the user can specify a list of endpoints
exposed internally, we now can setup a ClusterIP for this to avoid the
pitfalls of DNS-based load balancing.
This exposes the feature using the "Expose" compose field, and adds an
extra x-internal-service-type field to explicitly define how intra-stack
networking is handled on a service.

Signed-off-by: Simon Ferquel <simon.ferquel@docker.com>
This commit is contained in:
Simon Ferquel 2019-04-08 10:37:18 +02:00
parent 5bbb56bfee
commit cad20c759f
3 changed files with 220 additions and 28 deletions

View File

@ -14,8 +14,10 @@ import (
latest "github.com/docker/compose-on-kubernetes/api/compose/v1alpha3"
"github.com/docker/compose-on-kubernetes/api/compose/v1beta1"
"github.com/docker/compose-on-kubernetes/api/compose/v1beta2"
"github.com/docker/go-connections/nat"
"github.com/pkg/errors"
yaml "gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@ -24,6 +26,8 @@ const (
pullSecretExtraField = "x-pull-secret"
// pullPolicyExtraField is an extra field on ServiceConfigs usable to specify a pull policy
pullPolicyExtraField = "x-pull-policy"
// internalServiceTypeExtraField is an extra field on ServiceConfigs to explicitly specify the kind of service to setup for intra-stack networking
internalServiceTypeExtraField = "x-internal-service-type"
)
// NewStackConverter returns a converter from types.Config (compose) to the specified
@ -265,12 +269,19 @@ func fromComposeServiceConfig(s composeTypes.ServiceConfig, capabilities compose
if err != nil {
return latest.ServiceConfig{}, err
}
if pullSecret != "" && !capabilities.hasPullSecrets {
return latest.ServiceConfig{}, errors.Errorf("stack API version %s does not support pull secrets (field %q), please use version v1alpha3 or higher", capabilities.apiVersion, pullSecretExtraField)
}
if pullPolicy != "" && !capabilities.hasPullPolicies {
return latest.ServiceConfig{}, errors.Errorf("stack API version %s does not support pull policies (field %q), please use version v1alpha3 or higher", capabilities.apiVersion, pullPolicyExtraField)
}
internalServiceType, internalPorts, err := setupIntraStackNetworking(s, capabilities)
if err != nil {
return latest.ServiceConfig{}, err
}
return latest.ServiceConfig{
Name: s.Name,
CapAdd: s.CapAdd,
@ -286,31 +297,94 @@ func fromComposeServiceConfig(s composeTypes.ServiceConfig, capabilities compose
RestartPolicy: fromComposeRestartPolicy(s.Deploy.RestartPolicy),
Placement: fromComposePlacement(s.Deploy.Placement),
},
Entrypoint: s.Entrypoint,
Environment: s.Environment,
ExtraHosts: s.ExtraHosts,
Hostname: s.Hostname,
HealthCheck: fromComposeHealthcheck(s.HealthCheck),
Image: s.Image,
Ipc: s.Ipc,
Labels: s.Labels,
Pid: s.Pid,
Ports: fromComposePorts(s.Ports),
Privileged: s.Privileged,
ReadOnly: s.ReadOnly,
Secrets: fromComposeServiceSecrets(s.Secrets),
StdinOpen: s.StdinOpen,
StopGracePeriod: composetypes.ConvertDurationPtr(s.StopGracePeriod),
Tmpfs: s.Tmpfs,
Tty: s.Tty,
User: userID,
Volumes: fromComposeServiceVolumeConfig(s.Volumes),
WorkingDir: s.WorkingDir,
PullSecret: pullSecret,
PullPolicy: pullPolicy,
Entrypoint: s.Entrypoint,
Environment: s.Environment,
ExtraHosts: s.ExtraHosts,
Hostname: s.Hostname,
HealthCheck: fromComposeHealthcheck(s.HealthCheck),
Image: s.Image,
Ipc: s.Ipc,
Labels: s.Labels,
Pid: s.Pid,
Ports: fromComposePorts(s.Ports),
Privileged: s.Privileged,
ReadOnly: s.ReadOnly,
Secrets: fromComposeServiceSecrets(s.Secrets),
StdinOpen: s.StdinOpen,
StopGracePeriod: composetypes.ConvertDurationPtr(s.StopGracePeriod),
Tmpfs: s.Tmpfs,
Tty: s.Tty,
User: userID,
Volumes: fromComposeServiceVolumeConfig(s.Volumes),
WorkingDir: s.WorkingDir,
PullSecret: pullSecret,
PullPolicy: pullPolicy,
InternalServiceType: internalServiceType,
InternalPorts: internalPorts,
}, nil
}
func setupIntraStackNetworking(s composeTypes.ServiceConfig, capabilities composeCapabilities) (latest.InternalServiceType, []latest.InternalPort, error) {
internalServiceTypeRaw, err := resolveServiceExtra(s, internalServiceTypeExtraField)
if err != nil {
return latest.InternalServiceTypeAuto, nil, err
}
if internalServiceTypeRaw != "" && !capabilities.hasIntraStackLoadBalancing {
return latest.InternalServiceTypeAuto, nil,
errors.Errorf("stack API version %s does not support intra-stack load balancing (field %q), please use version v1alpha3 or higher", capabilities.apiVersion, internalServiceTypeExtraField)
}
if !capabilities.hasIntraStackLoadBalancing {
return latest.InternalServiceTypeAuto, nil, nil
}
internalServiceType, err := validateInternalServiceType(internalServiceTypeRaw)
if err != nil {
return latest.InternalServiceTypeAuto, nil, err
}
internalPorts, err := toInternalPorts(s.Expose)
if err != nil {
return latest.InternalServiceTypeAuto, nil, err
}
return internalServiceType, internalPorts, nil
}
func validateInternalServiceType(raw string) (latest.InternalServiceType, error) {
internalServiceType := latest.InternalServiceType(raw)
switch internalServiceType {
case latest.InternalServiceTypeAuto, latest.InternalServiceTypeClusterIP, latest.InternalServiceTypeHeadless:
default:
return latest.InternalServiceTypeAuto,
errors.Errorf("invalid value %q for field %q, valid values are %q or %q", raw,
internalServiceTypeExtraField,
latest.InternalServiceTypeClusterIP,
latest.InternalServiceTypeHeadless)
}
return internalServiceType, nil
}
func toInternalPorts(expose []string) ([]latest.InternalPort, error) {
var internalPorts []latest.InternalPort
for _, sourcePort := range expose {
proto, port := nat.SplitProtoPort(sourcePort)
start, end, err := nat.ParsePortRange(port)
if err != nil {
return nil, errors.Errorf("invalid format for expose: %q, error: %s", sourcePort, err)
}
for i := start; i <= end; i++ {
k8sProto := v1.Protocol(strings.ToUpper(proto))
switch k8sProto {
case v1.ProtocolSCTP, v1.ProtocolTCP, v1.ProtocolUDP:
default:
return nil, errors.Errorf("invalid protocol for expose: %q, supported values are %q, %q and %q", sourcePort, v1.ProtocolSCTP, v1.ProtocolTCP, v1.ProtocolUDP)
}
internalPorts = append(internalPorts, latest.InternalPort{
Port: int32(i),
Protocol: k8sProto,
})
}
}
return internalPorts, nil
}
func resolveServiceExtra(s composeTypes.ServiceConfig, field string) (string, error) {
if iface, ok := s.Extras[field]; ok {
value, ok := iface.(string)
@ -489,14 +563,16 @@ var (
apiVersion: "v1beta2",
}
v1alpha3Capabilities = composeCapabilities{
apiVersion: "v1alpha3",
hasPullSecrets: true,
hasPullPolicies: true,
apiVersion: "v1alpha3",
hasPullSecrets: true,
hasPullPolicies: true,
hasIntraStackLoadBalancing: true,
}
)
type composeCapabilities struct {
apiVersion string
hasPullSecrets bool
hasPullPolicies bool
apiVersion string
hasPullSecrets bool
hasPullPolicies bool
hasIntraStackLoadBalancing bool
}

View File

@ -13,6 +13,7 @@ import (
"github.com/docker/compose-on-kubernetes/api/compose/v1beta2"
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@ -235,3 +236,109 @@ func TestHandlePullPolicy(t *testing.T) {
})
}
}
func TestHandleInternalServiceType(t *testing.T) {
cases := []struct {
name string
value string
caps composeCapabilities
err string
expected v1alpha3.InternalServiceType
}{
{
name: "v1beta1",
value: "ClusterIP",
caps: v1beta1Capabilities,
err: `stack API version v1beta1 does not support intra-stack load balancing (field "x-internal-service-type"), please use version v1alpha3 or higher`,
},
{
name: "v1beta2",
value: "ClusterIP",
caps: v1beta2Capabilities,
err: `stack API version v1beta2 does not support intra-stack load balancing (field "x-internal-service-type"), please use version v1alpha3 or higher`,
},
{
name: "v1alpha3",
value: "ClusterIP",
caps: v1alpha3Capabilities,
expected: v1alpha3.InternalServiceTypeClusterIP,
},
{
name: "v1alpha3-invalid",
value: "invalid",
caps: v1alpha3Capabilities,
err: `invalid value "invalid" for field "x-internal-service-type", valid values are "ClusterIP" or "Headless"`,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
res, err := fromComposeServiceConfig(composetypes.ServiceConfig{
Name: "test",
Image: "test",
Extras: map[string]interface{}{
internalServiceTypeExtraField: c.value,
},
}, c.caps)
if c.err == "" {
assert.NilError(t, err)
assert.Equal(t, res.InternalServiceType, c.expected)
} else {
assert.ErrorContains(t, err, c.err)
}
})
}
}
func TestIgnoreExpose(t *testing.T) {
testData := loadTestStackWith(t, "expose")
for _, version := range []string{"v1beta1", "v1beta2"} {
conv, err := NewStackConverter(version)
assert.NilError(t, err)
s, err := conv.FromCompose(ioutil.Discard, "test", testData)
assert.NilError(t, err)
assert.Equal(t, len(s.Spec.Services[0].InternalPorts), 0)
}
}
func TestParseExpose(t *testing.T) {
testData := loadTestStackWith(t, "expose")
conv, err := NewStackConverter("v1alpha3")
assert.NilError(t, err)
s, err := conv.FromCompose(ioutil.Discard, "test", testData)
assert.NilError(t, err)
expected := []v1alpha3.InternalPort{
{
Port: 1,
Protocol: v1.ProtocolTCP,
},
{
Port: 2,
Protocol: v1.ProtocolTCP,
},
{
Port: 3,
Protocol: v1.ProtocolTCP,
},
{
Port: 4,
Protocol: v1.ProtocolTCP,
},
{
Port: 5,
Protocol: v1.ProtocolUDP,
},
{
Port: 6,
Protocol: v1.ProtocolUDP,
},
{
Port: 7,
Protocol: v1.ProtocolUDP,
},
{
Port: 8,
Protocol: v1.ProtocolUDP,
},
}
assert.DeepEqual(t, s.Spec.Services[0].InternalPorts, expected)
}

View File

@ -0,0 +1,9 @@
version: "3.7"
services:
test:
image: "some-image"
expose:
- "1" # default protocol, single port
- "2-4" # default protocol, port range
- "5/udp" # specific protocol, single port
- "6-8/udp" # specific protocol, port range