Better stack status check

Signed-off-by: Simon Ferquel <simon.ferquel@docker.com>
This commit is contained in:
Simon Ferquel 2018-05-14 15:44:55 +02:00
parent 8cb2e44d68
commit f38510b2d8
5 changed files with 550 additions and 86 deletions

View File

@ -7,6 +7,7 @@ import (
"github.com/docker/cli/cli/command"
"github.com/docker/cli/kubernetes"
cliv1beta1 "github.com/docker/cli/kubernetes/client/clientset/typed/compose/v1beta1"
flag "github.com/spf13/pflag"
kubeclient "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
@ -113,3 +114,11 @@ func (c *KubeCli) checkHostsMatch() error {
" Update $DOCKER_HOST (or pass -H), or use 'kubectl config use-context' to match.\n", daemonEndpoint.Hostname(), kubeEndpoint.Hostname())
return nil
}
func (c *KubeCli) stacksv1beta1() (cliv1beta1.StackInterface, error) {
raw, err := newStackV1Beta1(c.kubeConfig, c.kubeNamespace)
if err != nil {
return nil, err
}
return raw.stacks, nil
}

View File

@ -2,9 +2,12 @@ package kubernetes
import (
"fmt"
"io"
"github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/command/stack/loader"
"github.com/docker/cli/cli/command/stack/options"
"github.com/morikuni/aec"
"github.com/pkg/errors"
)
@ -39,10 +42,6 @@ func RunDeploy(dockerCli *KubeCli, opts options.Deploy) error {
configMaps := composeClient.ConfigMaps()
secrets := composeClient.Secrets()
services := composeClient.Services()
pods := composeClient.Pods()
watcher := DeployWatcher{
Pods: pods,
}
if err := stacks.IsColliding(services, stack); err != nil {
return err
@ -61,10 +60,109 @@ func RunDeploy(dockerCli *KubeCli, opts options.Deploy) error {
}
fmt.Fprintln(cmdOut, "Waiting for the stack to be stable and running...")
<-watcher.Watch(stack.name, stack.getServices())
fmt.Fprintf(cmdOut, "Stack %s is stable and running\n\n", stack.name)
return nil
v1beta1Cli, err := dockerCli.stacksv1beta1()
if err != nil {
return err
}
pods := composeClient.Pods()
watcher := &deployWatcher{
stacks: v1beta1Cli,
pods: pods,
}
statusUpdates := make(chan serviceStatus)
displayDone := make(chan struct{})
go func() {
defer close(displayDone)
display := newStatusDisplay(dockerCli.Out())
for status := range statusUpdates {
display.OnStatus(status)
}
}()
err = watcher.Watch(stack.name, stack.getServices(), statusUpdates)
close(statusUpdates)
<-displayDone
if err != nil {
return err
}
fmt.Fprintf(cmdOut, "\nStack %s is stable and running\n\n", stack.name)
return nil
}
type statusDisplay interface {
OnStatus(serviceStatus)
}
type metaServiceState string
const (
metaServiceStateReady = metaServiceState("Ready")
metaServiceStatePending = metaServiceState("Pending")
metaServiceStateFailed = metaServiceState("Failed")
)
func metaStateFromStatus(status serviceStatus) metaServiceState {
switch {
case status.podsReady > 0:
return metaServiceStateReady
case status.podsPending > 0:
return metaServiceStatePending
default:
return metaServiceStateFailed
}
}
type forwardOnlyStatusDisplay struct {
o *command.OutStream
states map[string]metaServiceState
}
func (d *forwardOnlyStatusDisplay) OnStatus(status serviceStatus) {
state := metaStateFromStatus(status)
if d.states[status.name] != state {
d.states[status.name] = state
fmt.Fprintf(d.o, "%s: %s\n", status.name, state)
}
}
type interactiveStatusDisplay struct {
o *command.OutStream
statuses []serviceStatus
}
func (d *interactiveStatusDisplay) OnStatus(status serviceStatus) {
b := aec.EmptyBuilder
for ix := 0; ix < len(d.statuses); ix++ {
b = b.Up(1).EraseLine(aec.EraseModes.All)
}
b = b.Column(0)
fmt.Fprint(d.o, b.ANSI)
updated := false
for ix, s := range d.statuses {
if s.name == status.name {
d.statuses[ix] = status
s = status
updated = true
}
displayInteractiveServiceStatus(s, d.o)
}
if !updated {
d.statuses = append(d.statuses, status)
displayInteractiveServiceStatus(status, d.o)
}
}
func displayInteractiveServiceStatus(status serviceStatus, o io.Writer) {
state := metaStateFromStatus(status)
totalFailed := status.podsFailed + status.podsSucceeded + status.podsUnknown
fmt.Fprintf(o, "%[1]s: %[2]s\t\t[pod status: %[3]d/%[6]d ready, %[4]d/%[6]d pending, %[5]d/%[6]d failed]\n", status.name, state,
status.podsReady, status.podsPending, totalFailed, status.podsTotal)
}
func newStatusDisplay(o *command.OutStream) statusDisplay {
if !o.IsTerminal() {
return &forwardOnlyStatusDisplay{o: o, states: map[string]metaServiceState{}}
}
return &interactiveStatusDisplay{o: o}
}

View File

@ -33,7 +33,7 @@ type stackV1Beta1 struct {
stacks composev1beta1.StackInterface
}
func newStackV1Beta1(config *rest.Config, namespace string) (StackClient, error) {
func newStackV1Beta1(config *rest.Config, namespace string) (*stackV1Beta1, error) {
client, err := composev1beta1.NewForConfig(config)
if err != nil {
return nil, err
@ -136,7 +136,7 @@ type stackV1Beta2 struct {
stacks composev1beta2.StackInterface
}
func newStackV1Beta2(config *rest.Config, namespace string) (StackClient, error) {
func newStackV1Beta2(config *rest.Config, namespace string) (*stackV1Beta2, error) {
client, err := composev1beta2.NewForConfig(config)
if err != nil {
return nil, err

View File

@ -1,116 +1,255 @@
package kubernetes
import (
"fmt"
"context"
"sync"
"time"
apiv1beta1 "github.com/docker/cli/kubernetes/compose/v1beta1"
"github.com/docker/cli/kubernetes/labels"
"github.com/pkg/errors"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/apimachinery/pkg/runtime"
runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
cache "k8s.io/client-go/tools/cache"
podutils "k8s.io/kubernetes/pkg/api/v1/pod"
)
type stackListWatch interface {
List(opts metav1.ListOptions) (*apiv1beta1.StackList, error)
Watch(opts metav1.ListOptions) (watch.Interface, error)
}
type podListWatch interface {
List(opts metav1.ListOptions) (*apiv1.PodList, error)
Watch(opts metav1.ListOptions) (watch.Interface, error)
}
// DeployWatcher watches a stack deployement
type DeployWatcher struct {
Pods corev1.PodInterface
type deployWatcher struct {
pods podListWatch
stacks stackListWatch
}
// Watch watches a stuck deployement and return a chan that will holds the state of the stack
func (w DeployWatcher) Watch(name string, serviceNames []string) chan bool {
stop := make(chan bool)
func (w *deployWatcher) Watch(name string, serviceNames []string, statusUpdates chan serviceStatus) error {
errC := make(chan error, 1)
defer close(errC)
go w.waitForPods(name, serviceNames, stop)
handlers := runtimeutil.ErrorHandlers
return stop
}
func (w DeployWatcher) waitForPods(stackName string, serviceNames []string, stop chan bool) {
starts := map[string]int32{}
for {
time.Sleep(1 * time.Second)
list, err := w.Pods.List(metav1.ListOptions{
LabelSelector: labels.SelectorForStack(stackName),
IncludeUninitialized: true,
// informer errors are reported using global error handlers
runtimeutil.ErrorHandlers = append(handlers, func(err error) {
errC <- err
})
if err != nil {
stop <- true
defer func() {
runtimeutil.ErrorHandlers = handlers
}()
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
defer func() {
cancel()
wg.Wait()
}()
wg.Add(2)
go func() {
defer wg.Done()
w.watchStackStatus(ctx, name, errC)
}()
go func() {
defer wg.Done()
w.waitForPods(ctx, name, serviceNames, errC, statusUpdates)
}()
return <-errC
}
type stackWatcher struct {
resultChan chan error
stackName string
}
var _ cache.ResourceEventHandler = &stackWatcher{}
func (sw *stackWatcher) OnAdd(obj interface{}) {
stack, ok := obj.(*apiv1beta1.Stack)
switch {
case !ok:
sw.resultChan <- errors.Errorf("stack %s has incorrect type", sw.stackName)
case stack.Status.Phase == apiv1beta1.StackFailure:
sw.resultChan <- errors.Errorf("stack %s failed with status %s: %s", sw.stackName, stack.Status.Phase, stack.Status.Message)
}
}
func (sw *stackWatcher) OnUpdate(oldObj, newObj interface{}) {
sw.OnAdd(newObj)
}
func (sw *stackWatcher) OnDelete(obj interface{}) {
}
func (w *deployWatcher) watchStackStatus(ctx context.Context, stackname string, e chan error) {
informer := newStackInformer(w.stacks, stackname)
sw := &stackWatcher{
resultChan: e,
}
informer.AddEventHandler(sw)
informer.Run(ctx.Done())
}
type serviceStatus struct {
name string
podsPending int
podsRunning int
podsSucceeded int
podsFailed int
podsUnknown int
podsReady int
podsTotal int
}
type podWatcher struct {
stackName string
services map[string]serviceStatus
resultChan chan error
starts map[string]int32
indexer cache.Indexer
statusUpdates chan serviceStatus
}
var _ cache.ResourceEventHandler = &podWatcher{}
func (pw *podWatcher) handlePod(obj interface{}) {
pod, ok := obj.(*apiv1.Pod)
if !ok {
pw.resultChan <- errors.Errorf("Pod has incorrect type in stack %s", pw.stackName)
return
}
for i := range list.Items {
pod := list.Items[i]
if pod.Status.Phase != apiv1.PodRunning {
continue
}
startCount := startCount(pod)
serviceName := pod.Labels[labels.ForServiceName]
if startCount != starts[serviceName] {
if startCount == 1 {
fmt.Printf(" - Service %s has one container running\n", serviceName)
} else {
fmt.Printf(" - Service %s was restarted %d %s\n", serviceName, startCount-1, timeTimes(startCount-1))
}
starts[serviceName] = startCount
}
}
if allReady(list.Items, serviceNames) {
stop <- true
return
pw.updateServiceStatus(serviceName)
if pw.allReady() {
select {
case pw.resultChan <- nil:
default:
// result has already been reported, just don't block
}
}
}
func startCount(pod apiv1.Pod) int32 {
restart := int32(0)
for _, status := range pod.Status.ContainerStatuses {
restart += status.RestartCount
func (pw *podWatcher) updateServiceStatus(serviceName string) {
pods, _ := pw.indexer.ByIndex("byservice", serviceName)
status := serviceStatus{name: serviceName}
for _, obj := range pods {
if pod, ok := obj.(*apiv1.Pod); ok {
switch pod.Status.Phase {
case apiv1.PodPending:
status.podsPending++
case apiv1.PodRunning:
status.podsRunning++
case apiv1.PodSucceeded:
status.podsSucceeded++
case apiv1.PodFailed:
status.podsFailed++
case apiv1.PodUnknown:
status.podsUnknown++
}
return 1 + restart
}
func allReady(pods []apiv1.Pod, serviceNames []string) bool {
serviceUp := map[string]bool{}
for _, pod := range pods {
if time.Since(pod.GetCreationTimestamp().Time) < 10*time.Second {
return false
}
ready := false
for _, cond := range pod.Status.Conditions {
if cond.Type == apiv1.PodReady && cond.Status == apiv1.ConditionTrue {
ready = true
if podutils.IsPodReady(pod) {
status.podsReady++
}
}
if !ready {
return false
}
status.podsTotal = len(pods)
oldStatus := pw.services[serviceName]
if oldStatus != status {
pw.statusUpdates <- status
}
pw.services[serviceName] = status
}
serviceName := pod.Labels[labels.ForServiceName]
serviceUp[serviceName] = true
}
for _, serviceName := range serviceNames {
if !serviceUp[serviceName] {
func (pw *podWatcher) allReady() bool {
for _, status := range pw.services {
if status.podsReady == 0 {
return false
}
}
return true
}
func timeTimes(n int32) string {
if n == 1 {
return "time"
func (pw *podWatcher) OnAdd(obj interface{}) {
pw.handlePod(obj)
}
return "times"
func (pw *podWatcher) OnUpdate(oldObj, newObj interface{}) {
pw.handlePod(newObj)
}
func (pw *podWatcher) OnDelete(obj interface{}) {
pw.handlePod(obj)
}
func (w *deployWatcher) waitForPods(ctx context.Context, stackName string, serviceNames []string, e chan error, statusUpdates chan serviceStatus) {
informer := newPodInformer(w.pods, stackName, cache.Indexers{
"byservice": func(obj interface{}) ([]string, error) {
pod, ok := obj.(*apiv1.Pod)
if !ok {
return nil, errors.Errorf("Pod has incorrect type in stack %s", stackName)
}
return []string{pod.Labels[labels.ForServiceName]}, nil
}})
services := map[string]serviceStatus{}
for _, name := range serviceNames {
services[name] = serviceStatus{name: name}
}
pw := &podWatcher{
stackName: stackName,
services: services,
resultChan: e,
starts: map[string]int32{},
indexer: informer.GetIndexer(),
statusUpdates: statusUpdates,
}
informer.AddEventHandler(pw)
informer.Run(ctx.Done())
}
func newPodInformer(podsClient podListWatch, stackName string, indexers cache.Indexers) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = labels.SelectorForStack(stackName)
options.IncludeUninitialized = true
return podsClient.List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = labels.SelectorForStack(stackName)
options.IncludeUninitialized = true
return podsClient.Watch(options)
},
},
&apiv1.Pod{},
time.Second*5,
indexers,
)
}
func newStackInformer(stacksClient stackListWatch, stackName string) cache.SharedInformer {
return cache.NewSharedInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = labels.SelectorForStack(stackName)
return stacksClient.List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = labels.SelectorForStack(stackName)
return stacksClient.Watch(options)
},
},
&apiv1beta1.Stack{},
time.Second*5,
)
}

View File

@ -0,0 +1,218 @@
package kubernetes
import (
"testing"
apiv1beta1 "github.com/docker/cli/kubernetes/compose/v1beta1"
composelabels "github.com/docker/cli/kubernetes/labels"
"github.com/gotestyourself/gotestyourself/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/watch"
k8stesting "k8s.io/client-go/testing"
)
var podsResource = apiv1.SchemeGroupVersion.WithResource("pods")
var podKind = apiv1.SchemeGroupVersion.WithKind("Pod")
var stacksResource = apiv1beta1.SchemeGroupVersion.WithResource("stacks")
var stackKind = apiv1beta1.SchemeGroupVersion.WithKind("Stack")
type testPodAndStackRepository struct {
fake *k8stesting.Fake
}
func (r *testPodAndStackRepository) stackListWatchForNamespace(ns string) *testStackListWatch {
return &testStackListWatch{fake: r.fake, ns: ns}
}
func (r *testPodAndStackRepository) podListWatchForNamespace(ns string) *testPodListWatch {
return &testPodListWatch{fake: r.fake, ns: ns}
}
func newTestPodAndStackRepository(initialPods []apiv1.Pod, initialStacks []apiv1beta1.Stack, podWatchHandler, stackWatchHandler k8stesting.WatchReactionFunc) *testPodAndStackRepository {
var scheme = runtime.NewScheme()
var codecs = serializer.NewCodecFactory(scheme)
metav1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"})
apiv1.AddToScheme(scheme)
apiv1beta1.AddToScheme(scheme)
o := k8stesting.NewObjectTracker(scheme, codecs.UniversalDecoder())
for _, obj := range initialPods {
if err := o.Add(&obj); err != nil {
panic(err)
}
}
for _, obj := range initialStacks {
if err := o.Add(&obj); err != nil {
panic(err)
}
}
fakePtr := &k8stesting.Fake{}
fakePtr.AddReactor("*", "*", k8stesting.ObjectReaction(o))
if podWatchHandler != nil {
fakePtr.AddWatchReactor(podsResource.Resource, podWatchHandler)
}
if stackWatchHandler != nil {
fakePtr.AddWatchReactor(stacksResource.Resource, stackWatchHandler)
}
fakePtr.AddWatchReactor("*", k8stesting.DefaultWatchReactor(watch.NewFake(), nil))
return &testPodAndStackRepository{fake: fakePtr}
}
type testStackListWatch struct {
fake *k8stesting.Fake
ns string
}
func (s *testStackListWatch) List(opts metav1.ListOptions) (*apiv1beta1.StackList, error) {
obj, err := s.fake.Invokes(k8stesting.NewListAction(stacksResource, stackKind, s.ns, opts), &apiv1beta1.StackList{})
if obj == nil {
return nil, err
}
label, _, _ := k8stesting.ExtractFromListOptions(opts)
if label == nil {
label = labels.Everything()
}
list := &apiv1beta1.StackList{}
for _, item := range obj.(*apiv1beta1.StackList).Items {
if label.Matches(labels.Set(item.Labels)) {
list.Items = append(list.Items, item)
}
}
return list, err
}
func (s *testStackListWatch) Watch(opts metav1.ListOptions) (watch.Interface, error) {
return s.fake.InvokesWatch(k8stesting.NewWatchAction(stacksResource, s.ns, opts))
}
type testPodListWatch struct {
fake *k8stesting.Fake
ns string
}
func (p *testPodListWatch) List(opts metav1.ListOptions) (*apiv1.PodList, error) {
obj, err := p.fake.Invokes(k8stesting.NewListAction(podsResource, podKind, p.ns, opts), &apiv1.PodList{})
if obj == nil {
return nil, err
}
label, _, _ := k8stesting.ExtractFromListOptions(opts)
if label == nil {
label = labels.Everything()
}
list := &apiv1.PodList{}
for _, item := range obj.(*apiv1.PodList).Items {
if label.Matches(labels.Set(item.Labels)) {
list.Items = append(list.Items, item)
}
}
return list, err
}
func (p *testPodListWatch) Watch(opts metav1.ListOptions) (watch.Interface, error) {
return p.fake.InvokesWatch(k8stesting.NewWatchAction(podsResource, p.ns, opts))
}
func TestDeployWatchOk(t *testing.T) {
stack := apiv1beta1.Stack{
ObjectMeta: metav1.ObjectMeta{Name: "test-stack", Namespace: "test-ns"},
}
serviceNames := []string{"svc1", "svc2"}
testRepo := newTestPodAndStackRepository(nil, []apiv1beta1.Stack{stack}, func(action k8stesting.Action) (handled bool, ret watch.Interface, err error) {
res := watch.NewFake()
go func() {
pod1 := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test1",
Namespace: "test-ns",
Labels: composelabels.ForService("test-stack", "svc1"),
},
Status: apiv1.PodStatus{
Phase: apiv1.PodRunning,
Conditions: []apiv1.PodCondition{
{
Type: apiv1.PodReady,
Status: apiv1.ConditionTrue,
},
},
},
}
pod2 := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test2",
Namespace: "test-ns",
Labels: composelabels.ForService("test-stack", "svc2"),
},
Status: apiv1.PodStatus{
Phase: apiv1.PodRunning,
Conditions: []apiv1.PodCondition{
{
Type: apiv1.PodReady,
Status: apiv1.ConditionTrue,
},
},
},
}
res.Add(pod1)
res.Add(pod2)
}()
return true, res, nil
}, nil)
testee := &deployWatcher{
stacks: testRepo.stackListWatchForNamespace("test-ns"),
pods: testRepo.podListWatchForNamespace("test-ns"),
}
statusUpdates := make(chan serviceStatus)
go func() {
for range statusUpdates {
}
}()
defer close(statusUpdates)
err := testee.Watch(stack.Name, serviceNames, statusUpdates)
assert.NilError(t, err)
}
func TestDeployReconcileFailure(t *testing.T) {
stack := apiv1beta1.Stack{
ObjectMeta: metav1.ObjectMeta{Name: "test-stack", Namespace: "test-ns"},
}
serviceNames := []string{"svc1", "svc2"}
testRepo := newTestPodAndStackRepository(nil, []apiv1beta1.Stack{stack}, nil, func(action k8stesting.Action) (handled bool, ret watch.Interface, err error) {
res := watch.NewFake()
go func() {
sfailed := stack
sfailed.Status = apiv1beta1.StackStatus{
Phase: apiv1beta1.StackFailure,
Message: "test error",
}
res.Modify(&sfailed)
}()
return true, res, nil
})
testee := &deployWatcher{
stacks: testRepo.stackListWatchForNamespace("test-ns"),
pods: testRepo.podListWatchForNamespace("test-ns"),
}
statusUpdates := make(chan serviceStatus)
go func() {
for range statusUpdates {
}
}()
defer close(statusUpdates)
err := testee.Watch(stack.Name, serviceNames, statusUpdates)
assert.ErrorContains(t, err, "Failure: test error")
}