From f38510b2d839ccd09146bf0541af8b1f6253177e Mon Sep 17 00:00:00 2001 From: Simon Ferquel Date: Mon, 14 May 2018 15:44:55 +0200 Subject: [PATCH] Better stack status check Signed-off-by: Simon Ferquel --- cli/command/stack/kubernetes/cli.go | 9 + cli/command/stack/kubernetes/deploy.go | 112 ++++++- cli/command/stack/kubernetes/stackclient.go | 4 +- cli/command/stack/kubernetes/watcher.go | 293 ++++++++++++++----- cli/command/stack/kubernetes/watcher_test.go | 218 ++++++++++++++ 5 files changed, 550 insertions(+), 86 deletions(-) create mode 100644 cli/command/stack/kubernetes/watcher_test.go diff --git a/cli/command/stack/kubernetes/cli.go b/cli/command/stack/kubernetes/cli.go index 27bc819fa4..16c280114a 100644 --- a/cli/command/stack/kubernetes/cli.go +++ b/cli/command/stack/kubernetes/cli.go @@ -7,6 +7,7 @@ import ( "github.com/docker/cli/cli/command" "github.com/docker/cli/kubernetes" + cliv1beta1 "github.com/docker/cli/kubernetes/client/clientset/typed/compose/v1beta1" flag "github.com/spf13/pflag" kubeclient "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" @@ -113,3 +114,11 @@ func (c *KubeCli) checkHostsMatch() error { " Update $DOCKER_HOST (or pass -H), or use 'kubectl config use-context' to match.\n", daemonEndpoint.Hostname(), kubeEndpoint.Hostname()) return nil } + +func (c *KubeCli) stacksv1beta1() (cliv1beta1.StackInterface, error) { + raw, err := newStackV1Beta1(c.kubeConfig, c.kubeNamespace) + if err != nil { + return nil, err + } + return raw.stacks, nil +} diff --git a/cli/command/stack/kubernetes/deploy.go b/cli/command/stack/kubernetes/deploy.go index 48030a60f4..877cb1ef89 100644 --- a/cli/command/stack/kubernetes/deploy.go +++ b/cli/command/stack/kubernetes/deploy.go @@ -2,9 +2,12 @@ package kubernetes import ( "fmt" + "io" + "github.com/docker/cli/cli/command" "github.com/docker/cli/cli/command/stack/loader" "github.com/docker/cli/cli/command/stack/options" + "github.com/morikuni/aec" "github.com/pkg/errors" ) @@ -39,10 +42,6 @@ func RunDeploy(dockerCli *KubeCli, opts options.Deploy) error { configMaps := composeClient.ConfigMaps() secrets := composeClient.Secrets() services := composeClient.Services() - pods := composeClient.Pods() - watcher := DeployWatcher{ - Pods: pods, - } if err := stacks.IsColliding(services, stack); err != nil { return err @@ -61,10 +60,109 @@ func RunDeploy(dockerCli *KubeCli, opts options.Deploy) error { } fmt.Fprintln(cmdOut, "Waiting for the stack to be stable and running...") + v1beta1Cli, err := dockerCli.stacksv1beta1() + if err != nil { + return err + } - <-watcher.Watch(stack.name, stack.getServices()) - - fmt.Fprintf(cmdOut, "Stack %s is stable and running\n\n", stack.name) + pods := composeClient.Pods() + watcher := &deployWatcher{ + stacks: v1beta1Cli, + pods: pods, + } + statusUpdates := make(chan serviceStatus) + displayDone := make(chan struct{}) + go func() { + defer close(displayDone) + display := newStatusDisplay(dockerCli.Out()) + for status := range statusUpdates { + display.OnStatus(status) + } + }() + err = watcher.Watch(stack.name, stack.getServices(), statusUpdates) + close(statusUpdates) + <-displayDone + if err != nil { + return err + } + fmt.Fprintf(cmdOut, "\nStack %s is stable and running\n\n", stack.name) return nil + +} + +type statusDisplay interface { + OnStatus(serviceStatus) +} +type metaServiceState string + +const ( + metaServiceStateReady = metaServiceState("Ready") + metaServiceStatePending = metaServiceState("Pending") + metaServiceStateFailed = metaServiceState("Failed") +) + +func 
metaStateFromStatus(status serviceStatus) metaServiceState { + switch { + case status.podsReady > 0: + return metaServiceStateReady + case status.podsPending > 0: + return metaServiceStatePending + default: + return metaServiceStateFailed + } +} + +type forwardOnlyStatusDisplay struct { + o *command.OutStream + states map[string]metaServiceState +} + +func (d *forwardOnlyStatusDisplay) OnStatus(status serviceStatus) { + state := metaStateFromStatus(status) + if d.states[status.name] != state { + d.states[status.name] = state + fmt.Fprintf(d.o, "%s: %s\n", status.name, state) + } +} + +type interactiveStatusDisplay struct { + o *command.OutStream + statuses []serviceStatus +} + +func (d *interactiveStatusDisplay) OnStatus(status serviceStatus) { + b := aec.EmptyBuilder + for ix := 0; ix < len(d.statuses); ix++ { + b = b.Up(1).EraseLine(aec.EraseModes.All) + } + b = b.Column(0) + fmt.Fprint(d.o, b.ANSI) + updated := false + for ix, s := range d.statuses { + if s.name == status.name { + d.statuses[ix] = status + s = status + updated = true + } + displayInteractiveServiceStatus(s, d.o) + } + if !updated { + d.statuses = append(d.statuses, status) + displayInteractiveServiceStatus(status, d.o) + } +} + +func displayInteractiveServiceStatus(status serviceStatus, o io.Writer) { + state := metaStateFromStatus(status) + totalFailed := status.podsFailed + status.podsSucceeded + status.podsUnknown + fmt.Fprintf(o, "%[1]s: %[2]s\t\t[pod status: %[3]d/%[6]d ready, %[4]d/%[6]d pending, %[5]d/%[6]d failed]\n", status.name, state, + status.podsReady, status.podsPending, totalFailed, status.podsTotal) +} + +func newStatusDisplay(o *command.OutStream) statusDisplay { + if !o.IsTerminal() { + return &forwardOnlyStatusDisplay{o: o, states: map[string]metaServiceState{}} + } + return &interactiveStatusDisplay{o: o} } diff --git a/cli/command/stack/kubernetes/stackclient.go b/cli/command/stack/kubernetes/stackclient.go index 4a9852850d..b59a50d476 100644 --- a/cli/command/stack/kubernetes/stackclient.go +++ b/cli/command/stack/kubernetes/stackclient.go @@ -33,7 +33,7 @@ type stackV1Beta1 struct { stacks composev1beta1.StackInterface } -func newStackV1Beta1(config *rest.Config, namespace string) (StackClient, error) { +func newStackV1Beta1(config *rest.Config, namespace string) (*stackV1Beta1, error) { client, err := composev1beta1.NewForConfig(config) if err != nil { return nil, err @@ -136,7 +136,7 @@ type stackV1Beta2 struct { stacks composev1beta2.StackInterface } -func newStackV1Beta2(config *rest.Config, namespace string) (StackClient, error) { +func newStackV1Beta2(config *rest.Config, namespace string) (*stackV1Beta2, error) { client, err := composev1beta2.NewForConfig(config) if err != nil { return nil, err diff --git a/cli/command/stack/kubernetes/watcher.go b/cli/command/stack/kubernetes/watcher.go index cfe28800b1..93d3358b4b 100644 --- a/cli/command/stack/kubernetes/watcher.go +++ b/cli/command/stack/kubernetes/watcher.go @@ -1,116 +1,255 @@ package kubernetes import ( - "fmt" + "context" + "sync" "time" + apiv1beta1 "github.com/docker/cli/kubernetes/compose/v1beta1" "github.com/docker/cli/kubernetes/labels" + "github.com/pkg/errors" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/apimachinery/pkg/runtime" + runtimeutil "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" + podutils "k8s.io/kubernetes/pkg/api/v1/pod" ) +type stackListWatch interface { + 
List(opts metav1.ListOptions) (*apiv1beta1.StackList, error)
+	Watch(opts metav1.ListOptions) (watch.Interface, error)
+}
+
+type podListWatch interface {
+	List(opts metav1.ListOptions) (*apiv1.PodList, error)
+	Watch(opts metav1.ListOptions) (watch.Interface, error)
+}
+
-// DeployWatcher watches a stack deployement
-type DeployWatcher struct {
-	Pods corev1.PodInterface
+// deployWatcher watches a stack deployment
+type deployWatcher struct {
+	pods   podListWatch
+	stacks stackListWatch
 }
 
-// Watch watches a stuck deployement and return a chan that will holds the state of the stack
-func (w DeployWatcher) Watch(name string, serviceNames []string) chan bool {
-	stop := make(chan bool)
+// Watch watches a stack deployment and sends service status updates until the stack is stable and running, or returns an error if the deployment fails
+func (w *deployWatcher) Watch(name string, serviceNames []string, statusUpdates chan serviceStatus) error {
+	errC := make(chan error, 1)
+	defer close(errC)
 
-	go w.waitForPods(name, serviceNames, stop)
+	handlers := runtimeutil.ErrorHandlers
 
-	return stop
+	// informer errors are reported using global error handlers
+	runtimeutil.ErrorHandlers = append(handlers, func(err error) {
+		errC <- err
+	})
+	defer func() {
+		runtimeutil.ErrorHandlers = handlers
+	}()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	wg := sync.WaitGroup{}
+	defer func() {
+		cancel()
+		wg.Wait()
+	}()
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		w.watchStackStatus(ctx, name, errC)
+	}()
+	go func() {
+		defer wg.Done()
+		w.waitForPods(ctx, name, serviceNames, errC, statusUpdates)
+	}()
+
+	return <-errC
 }
 
-func (w DeployWatcher) waitForPods(stackName string, serviceNames []string, stop chan bool) {
-	starts := map[string]int32{}
+type stackWatcher struct {
+	resultChan chan error
+	stackName  string
+}
 
-	for {
-		time.Sleep(1 * time.Second)
+var _ cache.ResourceEventHandler = &stackWatcher{}
 
-		list, err := w.Pods.List(metav1.ListOptions{
-			LabelSelector:        labels.SelectorForStack(stackName),
-			IncludeUninitialized: true,
-		})
-		if err != nil {
-			stop <- true
-			return
-		}
+func (sw *stackWatcher) OnAdd(obj interface{}) {
+	stack, ok := obj.(*apiv1beta1.Stack)
+	switch {
+	case !ok:
+		sw.resultChan <- errors.Errorf("stack %s has incorrect type", sw.stackName)
+	case stack.Status.Phase == apiv1beta1.StackFailure:
+		sw.resultChan <- errors.Errorf("stack %s failed with status %s: %s", sw.stackName, stack.Status.Phase, stack.Status.Message)
+	}
+}
 
-		for i := range list.Items {
-			pod := list.Items[i]
-			if pod.Status.Phase != apiv1.PodRunning {
-				continue
-			}
+func (sw *stackWatcher) OnUpdate(oldObj, newObj interface{}) {
+	sw.OnAdd(newObj)
+}
 
-			startCount := startCount(pod)
-			serviceName := pod.Labels[labels.ForServiceName]
-			if startCount != starts[serviceName] {
-				if startCount == 1 {
-					fmt.Printf(" - Service %s has one container running\n", serviceName)
-				} else {
-					fmt.Printf(" - Service %s was restarted %d %s\n", serviceName, startCount-1, timeTimes(startCount-1))
-				}
+func (sw *stackWatcher) OnDelete(obj interface{}) {
+}
 
-				starts[serviceName] = startCount
-			}
-		}
+func (w *deployWatcher) watchStackStatus(ctx context.Context, stackname string, e chan error) {
+	informer := newStackInformer(w.stacks, stackname)
+	sw := &stackWatcher{
+		resultChan: e,
+	}
+	informer.AddEventHandler(sw)
+	informer.Run(ctx.Done())
+}
 
-		if allReady(list.Items, serviceNames) {
-			stop <- true
-			return
+type serviceStatus struct {
+	name          string
+	podsPending   int
+	podsRunning   int
+	podsSucceeded int
+	podsFailed    int
+	podsUnknown   int
+	podsReady     int
+	podsTotal     int
+}
+
+type podWatcher struct {
+	stackName     string
+	services      map[string]serviceStatus
+	resultChan    chan error
+	starts        
map[string]int32 + indexer cache.Indexer + statusUpdates chan serviceStatus +} + +var _ cache.ResourceEventHandler = &podWatcher{} + +func (pw *podWatcher) handlePod(obj interface{}) { + pod, ok := obj.(*apiv1.Pod) + if !ok { + pw.resultChan <- errors.Errorf("Pod has incorrect type in stack %s", pw.stackName) + return + } + serviceName := pod.Labels[labels.ForServiceName] + pw.updateServiceStatus(serviceName) + if pw.allReady() { + select { + case pw.resultChan <- nil: + default: + // result has already been reported, just don't block } } } -func startCount(pod apiv1.Pod) int32 { - restart := int32(0) - - for _, status := range pod.Status.ContainerStatuses { - restart += status.RestartCount - } - - return 1 + restart -} - -func allReady(pods []apiv1.Pod, serviceNames []string) bool { - serviceUp := map[string]bool{} - - for _, pod := range pods { - if time.Since(pod.GetCreationTimestamp().Time) < 10*time.Second { - return false - } - - ready := false - for _, cond := range pod.Status.Conditions { - if cond.Type == apiv1.PodReady && cond.Status == apiv1.ConditionTrue { - ready = true +func (pw *podWatcher) updateServiceStatus(serviceName string) { + pods, _ := pw.indexer.ByIndex("byservice", serviceName) + status := serviceStatus{name: serviceName} + for _, obj := range pods { + if pod, ok := obj.(*apiv1.Pod); ok { + switch pod.Status.Phase { + case apiv1.PodPending: + status.podsPending++ + case apiv1.PodRunning: + status.podsRunning++ + case apiv1.PodSucceeded: + status.podsSucceeded++ + case apiv1.PodFailed: + status.podsFailed++ + case apiv1.PodUnknown: + status.podsUnknown++ + } + if podutils.IsPodReady(pod) { + status.podsReady++ } } - - if !ready { - return false - } - - serviceName := pod.Labels[labels.ForServiceName] - serviceUp[serviceName] = true } + status.podsTotal = len(pods) + oldStatus := pw.services[serviceName] + if oldStatus != status { + pw.statusUpdates <- status + } + pw.services[serviceName] = status +} - for _, serviceName := range serviceNames { - if !serviceUp[serviceName] { +func (pw *podWatcher) allReady() bool { + for _, status := range pw.services { + if status.podsReady == 0 { return false } } - return true } -func timeTimes(n int32) string { - if n == 1 { - return "time" - } - - return "times" +func (pw *podWatcher) OnAdd(obj interface{}) { + pw.handlePod(obj) +} + +func (pw *podWatcher) OnUpdate(oldObj, newObj interface{}) { + pw.handlePod(newObj) +} + +func (pw *podWatcher) OnDelete(obj interface{}) { + pw.handlePod(obj) +} + +func (w *deployWatcher) waitForPods(ctx context.Context, stackName string, serviceNames []string, e chan error, statusUpdates chan serviceStatus) { + informer := newPodInformer(w.pods, stackName, cache.Indexers{ + "byservice": func(obj interface{}) ([]string, error) { + pod, ok := obj.(*apiv1.Pod) + if !ok { + return nil, errors.Errorf("Pod has incorrect type in stack %s", stackName) + } + return []string{pod.Labels[labels.ForServiceName]}, nil + }}) + services := map[string]serviceStatus{} + for _, name := range serviceNames { + services[name] = serviceStatus{name: name} + } + pw := &podWatcher{ + stackName: stackName, + services: services, + resultChan: e, + starts: map[string]int32{}, + indexer: informer.GetIndexer(), + statusUpdates: statusUpdates, + } + informer.AddEventHandler(pw) + informer.Run(ctx.Done()) +} + +func newPodInformer(podsClient podListWatch, stackName string, indexers cache.Indexers) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) 
(runtime.Object, error) { + options.LabelSelector = labels.SelectorForStack(stackName) + options.IncludeUninitialized = true + return podsClient.List(options) + }, + + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.LabelSelector = labels.SelectorForStack(stackName) + options.IncludeUninitialized = true + return podsClient.Watch(options) + }, + }, + &apiv1.Pod{}, + time.Second*5, + indexers, + ) +} + +func newStackInformer(stacksClient stackListWatch, stackName string) cache.SharedInformer { + return cache.NewSharedInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.LabelSelector = labels.SelectorForStack(stackName) + return stacksClient.List(options) + }, + + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.LabelSelector = labels.SelectorForStack(stackName) + return stacksClient.Watch(options) + }, + }, + &apiv1beta1.Stack{}, + time.Second*5, + ) } diff --git a/cli/command/stack/kubernetes/watcher_test.go b/cli/command/stack/kubernetes/watcher_test.go new file mode 100644 index 0000000000..80c7b5fd8d --- /dev/null +++ b/cli/command/stack/kubernetes/watcher_test.go @@ -0,0 +1,218 @@ +package kubernetes + +import ( + "testing" + + apiv1beta1 "github.com/docker/cli/kubernetes/compose/v1beta1" + composelabels "github.com/docker/cli/kubernetes/labels" + "github.com/gotestyourself/gotestyourself/assert" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/watch" + k8stesting "k8s.io/client-go/testing" +) + +var podsResource = apiv1.SchemeGroupVersion.WithResource("pods") +var podKind = apiv1.SchemeGroupVersion.WithKind("Pod") +var stacksResource = apiv1beta1.SchemeGroupVersion.WithResource("stacks") +var stackKind = apiv1beta1.SchemeGroupVersion.WithKind("Stack") + +type testPodAndStackRepository struct { + fake *k8stesting.Fake +} + +func (r *testPodAndStackRepository) stackListWatchForNamespace(ns string) *testStackListWatch { + return &testStackListWatch{fake: r.fake, ns: ns} +} +func (r *testPodAndStackRepository) podListWatchForNamespace(ns string) *testPodListWatch { + return &testPodListWatch{fake: r.fake, ns: ns} +} + +func newTestPodAndStackRepository(initialPods []apiv1.Pod, initialStacks []apiv1beta1.Stack, podWatchHandler, stackWatchHandler k8stesting.WatchReactionFunc) *testPodAndStackRepository { + var scheme = runtime.NewScheme() + var codecs = serializer.NewCodecFactory(scheme) + metav1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"}) + apiv1.AddToScheme(scheme) + apiv1beta1.AddToScheme(scheme) + + o := k8stesting.NewObjectTracker(scheme, codecs.UniversalDecoder()) + for _, obj := range initialPods { + if err := o.Add(&obj); err != nil { + panic(err) + } + } + for _, obj := range initialStacks { + if err := o.Add(&obj); err != nil { + panic(err) + } + } + fakePtr := &k8stesting.Fake{} + fakePtr.AddReactor("*", "*", k8stesting.ObjectReaction(o)) + if podWatchHandler != nil { + fakePtr.AddWatchReactor(podsResource.Resource, podWatchHandler) + } + if stackWatchHandler != nil { + fakePtr.AddWatchReactor(stacksResource.Resource, stackWatchHandler) + } + fakePtr.AddWatchReactor("*", k8stesting.DefaultWatchReactor(watch.NewFake(), nil)) + return &testPodAndStackRepository{fake: fakePtr} +} + +type testStackListWatch struct { + fake 
*k8stesting.Fake + ns string +} + +func (s *testStackListWatch) List(opts metav1.ListOptions) (*apiv1beta1.StackList, error) { + obj, err := s.fake.Invokes(k8stesting.NewListAction(stacksResource, stackKind, s.ns, opts), &apiv1beta1.StackList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := k8stesting.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &apiv1beta1.StackList{} + for _, item := range obj.(*apiv1beta1.StackList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} +func (s *testStackListWatch) Watch(opts metav1.ListOptions) (watch.Interface, error) { + return s.fake.InvokesWatch(k8stesting.NewWatchAction(stacksResource, s.ns, opts)) +} + +type testPodListWatch struct { + fake *k8stesting.Fake + ns string +} + +func (p *testPodListWatch) List(opts metav1.ListOptions) (*apiv1.PodList, error) { + obj, err := p.fake.Invokes(k8stesting.NewListAction(podsResource, podKind, p.ns, opts), &apiv1.PodList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := k8stesting.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &apiv1.PodList{} + for _, item := range obj.(*apiv1.PodList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err + +} +func (p *testPodListWatch) Watch(opts metav1.ListOptions) (watch.Interface, error) { + return p.fake.InvokesWatch(k8stesting.NewWatchAction(podsResource, p.ns, opts)) +} + +func TestDeployWatchOk(t *testing.T) { + stack := apiv1beta1.Stack{ + ObjectMeta: metav1.ObjectMeta{Name: "test-stack", Namespace: "test-ns"}, + } + + serviceNames := []string{"svc1", "svc2"} + testRepo := newTestPodAndStackRepository(nil, []apiv1beta1.Stack{stack}, func(action k8stesting.Action) (handled bool, ret watch.Interface, err error) { + res := watch.NewFake() + go func() { + pod1 := &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test1", + Namespace: "test-ns", + Labels: composelabels.ForService("test-stack", "svc1"), + }, + Status: apiv1.PodStatus{ + Phase: apiv1.PodRunning, + Conditions: []apiv1.PodCondition{ + { + Type: apiv1.PodReady, + Status: apiv1.ConditionTrue, + }, + }, + }, + } + pod2 := &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test2", + Namespace: "test-ns", + Labels: composelabels.ForService("test-stack", "svc2"), + }, + Status: apiv1.PodStatus{ + Phase: apiv1.PodRunning, + Conditions: []apiv1.PodCondition{ + { + Type: apiv1.PodReady, + Status: apiv1.ConditionTrue, + }, + }, + }, + } + res.Add(pod1) + res.Add(pod2) + }() + + return true, res, nil + }, nil) + + testee := &deployWatcher{ + stacks: testRepo.stackListWatchForNamespace("test-ns"), + pods: testRepo.podListWatchForNamespace("test-ns"), + } + + statusUpdates := make(chan serviceStatus) + go func() { + for range statusUpdates { + } + }() + defer close(statusUpdates) + err := testee.Watch(stack.Name, serviceNames, statusUpdates) + assert.NilError(t, err) +} + +func TestDeployReconcileFailure(t *testing.T) { + stack := apiv1beta1.Stack{ + ObjectMeta: metav1.ObjectMeta{Name: "test-stack", Namespace: "test-ns"}, + } + + serviceNames := []string{"svc1", "svc2"} + testRepo := newTestPodAndStackRepository(nil, []apiv1beta1.Stack{stack}, nil, func(action k8stesting.Action) (handled bool, ret watch.Interface, err error) { + res := watch.NewFake() + go func() { + sfailed := stack + sfailed.Status = apiv1beta1.StackStatus{ + Phase: 
apiv1beta1.StackFailure, + Message: "test error", + } + res.Modify(&sfailed) + }() + + return true, res, nil + }) + + testee := &deployWatcher{ + stacks: testRepo.stackListWatchForNamespace("test-ns"), + pods: testRepo.podListWatchForNamespace("test-ns"), + } + + statusUpdates := make(chan serviceStatus) + go func() { + for range statusUpdates { + } + }() + defer close(statusUpdates) + err := testee.Watch(stack.Name, serviceNames, statusUpdates) + assert.ErrorContains(t, err, "Failure: test error") +}