From 1ef585f65dd0079a22286ec404940685baa1f80b Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Tue, 27 Jun 2017 14:57:47 -0700 Subject: [PATCH 1/3] progress: Show task error in place of progress bar If a task encounters an error, the interactive "service create" and "service update" commands should show that error instead of showing a stuck progress bar. To validate: docker service create --detach=false --name broken --restart-condition=none --replicas 3 busybox asdf and docker service create --detach=false --name broken --mode global --restart-condition none busybox asdf Signed-off-by: Aaron Lehmann --- cli/command/service/progress/progress.go | 71 ++++++++++++++++++++---- 1 file changed, 61 insertions(+), 10 deletions(-) diff --git a/cli/command/service/progress/progress.go b/cli/command/service/progress/progress.go index d8300ce8d2..7c3f751a68 100644 --- a/cli/command/service/progress/progress.go +++ b/cli/command/service/progress/progress.go @@ -6,6 +6,7 @@ import ( "io" "os" "os/signal" + "strings" "time" "github.com/docker/docker/api/types" @@ -29,6 +30,13 @@ var ( swarm.TaskStateReady: 7, swarm.TaskStateStarting: 8, swarm.TaskStateRunning: 9, + + // The following states are not actually shown in progress + // output, but are used internally for ordering. + swarm.TaskStateComplete: 10, + swarm.TaskStateShutdown: 11, + swarm.TaskStateFailed: 12, + swarm.TaskStateRejected: 13, } longestState int @@ -45,17 +53,21 @@ type progressUpdater interface { func init() { for state := range numberedStates { - if len(state) > longestState { + if !terminalState(state) && len(state) > longestState { longestState = len(state) } } } +func terminalState(state swarm.TaskState) bool { + return numberedStates[state] > numberedStates[swarm.TaskStateRunning] +} + func stateToProgress(state swarm.TaskState, rollback bool) int64 { if !rollback { return numberedStates[state] } - return int64(len(numberedStates)) - numberedStates[state] + return numberedStates[swarm.TaskStateRunning] - numberedStates[state] } // ServiceProgress outputs progress information for convergence of a service. @@ -230,6 +242,18 @@ func writeOverallProgress(progressOut progress.Output, numerator, denominator in }) } +func truncError(errMsg string) string { + // Remove newlines from the error, which corrupt the output. + errMsg = strings.Replace(errMsg, "\n", " ", -1) + + // Limit the length to 75 characters, so that even on narrow terminals + // this will not overflow to the next line. + if len(errMsg) > 75 { + errMsg = errMsg[:74] + "…" + } + return errMsg +} + type replicatedProgressUpdater struct { progressOut progress.Output @@ -303,7 +327,23 @@ func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm. u.slotMap[task.Slot] = mappedSlot } - if !u.done && replicas <= maxProgressBars && uint64(mappedSlot) <= replicas { + if !terminalState(task.DesiredState) && task.Status.State == swarm.TaskStateRunning { + running++ + } + + if u.done || replicas > maxProgressBars || uint64(mappedSlot) > replicas { + continue + } + + if task.Status.Err != "" { + u.progressOut.WriteProgress(progress.Progress{ + ID: fmt.Sprintf("%d/%d", mappedSlot, replicas), + Action: truncError(task.Status.Err), + }) + continue + } + + if !terminalState(task.DesiredState) && !terminalState(task.Status.State) { u.progressOut.WriteProgress(progress.Progress{ ID: fmt.Sprintf("%d/%d", mappedSlot, replicas), Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State), @@ -312,9 +352,6 @@ func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm. HideCounts: true, }) } - if task.Status.State == swarm.TaskStateRunning { - running++ - } } if !u.done { @@ -335,6 +372,7 @@ type globalProgressUpdater struct { done bool } +// nolint: gocyclo func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) { // If there are multiple tasks with the same node ID, favor the one // with the *lowest* desired state. This can happen in restart @@ -388,7 +426,23 @@ func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task for _, task := range tasksByNode { if node, nodeActive := activeNodes[task.NodeID]; nodeActive { - if !u.done && nodeCount <= maxProgressBars { + if !terminalState(task.DesiredState) && task.Status.State == swarm.TaskStateRunning { + running++ + } + + if u.done || nodeCount > maxProgressBars { + continue + } + + if task.Status.Err != "" { + u.progressOut.WriteProgress(progress.Progress{ + ID: stringid.TruncateID(node.ID), + Action: truncError(task.Status.Err), + }) + continue + } + + if !terminalState(task.DesiredState) && !terminalState(task.Status.State) { u.progressOut.WriteProgress(progress.Progress{ ID: stringid.TruncateID(node.ID), Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State), @@ -397,9 +451,6 @@ func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task HideCounts: true, }) } - if task.Status.State == swarm.TaskStateRunning { - running++ - } } } From dd3eae84e1ae11ae11c013b549933002074da814 Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Wed, 28 Jun 2017 17:29:57 -0700 Subject: [PATCH 2/3] progress: Add unit tests Signed-off-by: Aaron Lehmann --- cli/command/service/progress/progress.go | 42 +- cli/command/service/progress/progress_test.go | 374 ++++++++++++++++++ 2 files changed, 403 insertions(+), 13 deletions(-) create mode 100644 cli/command/service/progress/progress_test.go diff --git a/cli/command/service/progress/progress.go b/cli/command/service/progress/progress.go index 7c3f751a68..94bf229b3a 100644 --- a/cli/command/service/progress/progress.go +++ b/cli/command/service/progress/progress.go @@ -48,7 +48,7 @@ const ( ) type progressUpdater interface { - update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) + update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error) } func init() { @@ -199,16 +199,16 @@ func ServiceProgress(ctx context.Context, client client.APIClient, serviceID str } } -func getActiveNodes(ctx context.Context, client client.APIClient) (map[string]swarm.Node, error) { +func getActiveNodes(ctx context.Context, client client.APIClient) (map[string]struct{}, error) { nodes, err := client.NodeList(ctx, types.NodeListOptions{}) if err != nil { return nil, err } - activeNodes := make(map[string]swarm.Node) + activeNodes := make(map[string]struct{}) for _, n := range nodes { if n.Status.State != swarm.NodeStateDown { - activeNodes[n.ID] = n + activeNodes[n.ID] = struct{}{} } } return activeNodes, nil @@ -266,7 +266,7 @@ type replicatedProgressUpdater struct { } // nolint: gocyclo -func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) { +func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error) { if service.Spec.Mode.Replicated == nil || service.Spec.Mode.Replicated.Replicas == nil { return false, errors.New("no replica count") } @@ -291,11 +291,18 @@ func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm. // scenarios. tasksBySlot := make(map[int]swarm.Task) for _, task := range tasks { - if numberedStates[task.DesiredState] == 0 { + if numberedStates[task.DesiredState] == 0 || numberedStates[task.Status.State] == 0 { continue } if existingTask, ok := tasksBySlot[task.Slot]; ok { - if numberedStates[existingTask.DesiredState] <= numberedStates[task.DesiredState] { + if numberedStates[existingTask.DesiredState] < numberedStates[task.DesiredState] { + continue + } + // If the desired states match, observed state breaks + // ties. This can happen with the "start first" service + // update mode. + if numberedStates[existingTask.DesiredState] == numberedStates[task.DesiredState] && + numberedStates[existingTask.Status.State] <= numberedStates[task.Status.State] { continue } } @@ -373,19 +380,28 @@ type globalProgressUpdater struct { } // nolint: gocyclo -func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) { +func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error) { // If there are multiple tasks with the same node ID, favor the one // with the *lowest* desired state. This can happen in restart // scenarios. tasksByNode := make(map[string]swarm.Task) for _, task := range tasks { - if numberedStates[task.DesiredState] == 0 { + if numberedStates[task.DesiredState] == 0 || numberedStates[task.Status.State] == 0 { continue } if existingTask, ok := tasksByNode[task.NodeID]; ok { - if numberedStates[existingTask.DesiredState] <= numberedStates[task.DesiredState] { + if numberedStates[existingTask.DesiredState] < numberedStates[task.DesiredState] { continue } + + // If the desired states match, observed state breaks + // ties. This can happen with the "start first" service + // update mode. + if numberedStates[existingTask.DesiredState] == numberedStates[task.DesiredState] && + numberedStates[existingTask.Status.State] <= numberedStates[task.Status.State] { + continue + } + } tasksByNode[task.NodeID] = task } @@ -425,7 +441,7 @@ func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task running := 0 for _, task := range tasksByNode { - if node, nodeActive := activeNodes[task.NodeID]; nodeActive { + if _, nodeActive := activeNodes[task.NodeID]; nodeActive { if !terminalState(task.DesiredState) && task.Status.State == swarm.TaskStateRunning { running++ } @@ -436,7 +452,7 @@ func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task if task.Status.Err != "" { u.progressOut.WriteProgress(progress.Progress{ - ID: stringid.TruncateID(node.ID), + ID: stringid.TruncateID(task.NodeID), Action: truncError(task.Status.Err), }) continue @@ -444,7 +460,7 @@ func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task if !terminalState(task.DesiredState) && !terminalState(task.Status.State) { u.progressOut.WriteProgress(progress.Progress{ - ID: stringid.TruncateID(node.ID), + ID: stringid.TruncateID(task.NodeID), Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State), Current: stateToProgress(task.Status.State, rollback), Total: maxProgress, diff --git a/cli/command/service/progress/progress_test.go b/cli/command/service/progress/progress_test.go new file mode 100644 index 0000000000..d1c118f7d7 --- /dev/null +++ b/cli/command/service/progress/progress_test.go @@ -0,0 +1,374 @@ +package progress + +import ( + "fmt" + "strconv" + "testing" + + "github.com/docker/docker/api/types/swarm" + "github.com/docker/docker/pkg/progress" + "github.com/stretchr/testify/assert" +) + +type mockProgress struct { + p []progress.Progress +} + +func (mp *mockProgress) WriteProgress(p progress.Progress) error { + mp.p = append(mp.p, p) + return nil +} + +func (mp *mockProgress) clear() { + mp.p = nil +} + +type updaterTester struct { + t *testing.T + updater progressUpdater + p *mockProgress + service swarm.Service + activeNodes map[string]struct{} + rollback bool +} + +func (u updaterTester) testUpdater(tasks []swarm.Task, expectedConvergence bool, expectedProgress []progress.Progress) { + u.p.clear() + + converged, err := u.updater.update(u.service, tasks, u.activeNodes, u.rollback) + assert.NoError(u.t, err) + assert.Equal(u.t, expectedConvergence, converged) + assert.Equal(u.t, expectedProgress, u.p.p) +} + +func TestReplicatedProgressUpdaterOneReplica(t *testing.T) { + replicas := uint64(1) + + service := swarm.Service{ + Spec: swarm.ServiceSpec{ + Mode: swarm.ServiceMode{ + Replicated: &swarm.ReplicatedService{ + Replicas: &replicas, + }, + }, + }, + } + + p := &mockProgress{} + updaterTester := updaterTester{ + t: t, + updater: &replicatedProgressUpdater{ + progressOut: p, + }, + p: p, + activeNodes: map[string]struct{}{"a": {}, "b": {}}, + service: service, + } + + tasks := []swarm.Task{} + + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "overall progress", Action: "0 out of 1 tasks"}, + {ID: "1/1", Action: " "}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) + + // Task with DesiredState beyond Running is ignored + tasks = append(tasks, + swarm.Task{ID: "1", + NodeID: "a", + DesiredState: swarm.TaskStateShutdown, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + }) + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) + + // Task with valid DesiredState and State updates progress bar + tasks[0].DesiredState = swarm.TaskStateRunning + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "1/1", Action: "new ", Current: 1, Total: 9, HideCounts: true}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) + + // If the task exposes an error, we should show that instead of the + // progress bar. + tasks[0].Status.Err = "something is wrong" + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "1/1", Action: "something is wrong"}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) + + // When the task reaches running, update should return true + tasks[0].Status.Err = "" + tasks[0].Status.State = swarm.TaskStateRunning + updaterTester.testUpdater(tasks, true, + []progress.Progress{ + {ID: "1/1", Action: "running ", Current: 9, Total: 9, HideCounts: true}, + {ID: "overall progress", Action: "1 out of 1 tasks"}, + }) + + // If the task fails, update should return false again + tasks[0].Status.Err = "task failed" + tasks[0].Status.State = swarm.TaskStateFailed + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "1/1", Action: "task failed"}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) + + // If the task is restarted, progress output should be shown for the + // replacement task, not the old task. + tasks[0].DesiredState = swarm.TaskStateShutdown + tasks = append(tasks, + swarm.Task{ID: "2", + NodeID: "b", + DesiredState: swarm.TaskStateRunning, + Status: swarm.TaskStatus{State: swarm.TaskStateRunning}, + }) + updaterTester.testUpdater(tasks, true, + []progress.Progress{ + {ID: "1/1", Action: "running ", Current: 9, Total: 9, HideCounts: true}, + {ID: "overall progress", Action: "1 out of 1 tasks"}, + }) + + // Add a new task while the current one is still running, to simulate + // "start-then-stop" updates. + tasks = append(tasks, + swarm.Task{ID: "3", + NodeID: "b", + DesiredState: swarm.TaskStateRunning, + Status: swarm.TaskStatus{State: swarm.TaskStatePreparing}, + }) + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "1/1", Action: "preparing", Current: 6, Total: 9, HideCounts: true}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) +} + +func TestReplicatedProgressUpdaterManyReplicas(t *testing.T) { + replicas := uint64(50) + + service := swarm.Service{ + Spec: swarm.ServiceSpec{ + Mode: swarm.ServiceMode{ + Replicated: &swarm.ReplicatedService{ + Replicas: &replicas, + }, + }, + }, + } + + p := &mockProgress{} + updaterTester := updaterTester{ + t: t, + updater: &replicatedProgressUpdater{ + progressOut: p, + }, + p: p, + activeNodes: map[string]struct{}{"a": {}, "b": {}}, + service: service, + } + + tasks := []swarm.Task{} + + // No per-task progress bars because there are too many replicas + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "overall progress", Action: fmt.Sprintf("0 out of %d tasks", replicas)}, + {ID: "overall progress", Action: fmt.Sprintf("0 out of %d tasks", replicas)}, + }) + + for i := 0; i != int(replicas); i++ { + tasks = append(tasks, + swarm.Task{ + ID: strconv.Itoa(i), + Slot: i + 1, + NodeID: "a", + DesiredState: swarm.TaskStateRunning, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + }) + + if i%2 == 1 { + tasks[i].NodeID = "b" + } + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "overall progress", Action: fmt.Sprintf("%d out of %d tasks", i, replicas)}, + }) + + tasks[i].Status.State = swarm.TaskStateRunning + updaterTester.testUpdater(tasks, uint64(i) == replicas-1, + []progress.Progress{ + {ID: "overall progress", Action: fmt.Sprintf("%d out of %d tasks", i+1, replicas)}, + }) + } +} + +func TestGlobalProgressUpdaterOneNode(t *testing.T) { + service := swarm.Service{ + Spec: swarm.ServiceSpec{ + Mode: swarm.ServiceMode{ + Global: &swarm.GlobalService{}, + }, + }, + } + + p := &mockProgress{} + updaterTester := updaterTester{ + t: t, + updater: &globalProgressUpdater{ + progressOut: p, + }, + p: p, + activeNodes: map[string]struct{}{"a": {}, "b": {}}, + service: service, + } + + tasks := []swarm.Task{} + + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "overall progress", Action: "waiting for new tasks"}, + }) + + // Task with DesiredState beyond Running is ignored + tasks = append(tasks, + swarm.Task{ID: "1", + NodeID: "a", + DesiredState: swarm.TaskStateShutdown, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + }) + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "overall progress", Action: "0 out of 1 tasks"}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) + + // Task with valid DesiredState and State updates progress bar + tasks[0].DesiredState = swarm.TaskStateRunning + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "a", Action: "new ", Current: 1, Total: 9, HideCounts: true}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) + + // If the task exposes an error, we should show that instead of the + // progress bar. + tasks[0].Status.Err = "something is wrong" + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "a", Action: "something is wrong"}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) + + // When the task reaches running, update should return true + tasks[0].Status.Err = "" + tasks[0].Status.State = swarm.TaskStateRunning + updaterTester.testUpdater(tasks, true, + []progress.Progress{ + {ID: "a", Action: "running ", Current: 9, Total: 9, HideCounts: true}, + {ID: "overall progress", Action: "1 out of 1 tasks"}, + }) + + // If the task fails, update should return false again + tasks[0].Status.Err = "task failed" + tasks[0].Status.State = swarm.TaskStateFailed + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "a", Action: "task failed"}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) + + // If the task is restarted, progress output should be shown for the + // replacement task, not the old task. + tasks[0].DesiredState = swarm.TaskStateShutdown + tasks = append(tasks, + swarm.Task{ID: "2", + NodeID: "a", + DesiredState: swarm.TaskStateRunning, + Status: swarm.TaskStatus{State: swarm.TaskStateRunning}, + }) + updaterTester.testUpdater(tasks, true, + []progress.Progress{ + {ID: "a", Action: "running ", Current: 9, Total: 9, HideCounts: true}, + {ID: "overall progress", Action: "1 out of 1 tasks"}, + }) + + // Add a new task while the current one is still running, to simulate + // "start-then-stop" updates. + tasks = append(tasks, + swarm.Task{ID: "3", + NodeID: "a", + DesiredState: swarm.TaskStateRunning, + Status: swarm.TaskStatus{State: swarm.TaskStatePreparing}, + }) + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "a", Action: "preparing", Current: 6, Total: 9, HideCounts: true}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) +} + +func TestGlobalProgressUpdaterManyNodes(t *testing.T) { + nodes := 50 + + service := swarm.Service{ + Spec: swarm.ServiceSpec{ + Mode: swarm.ServiceMode{ + Global: &swarm.GlobalService{}, + }, + }, + } + + p := &mockProgress{} + updaterTester := updaterTester{ + t: t, + updater: &globalProgressUpdater{ + progressOut: p, + }, + p: p, + activeNodes: map[string]struct{}{}, + service: service, + } + + for i := 0; i != nodes; i++ { + updaterTester.activeNodes[strconv.Itoa(i)] = struct{}{} + } + + tasks := []swarm.Task{} + + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "overall progress", Action: "waiting for new tasks"}, + }) + + for i := 0; i != nodes; i++ { + tasks = append(tasks, + swarm.Task{ + ID: "task" + strconv.Itoa(i), + NodeID: strconv.Itoa(i), + DesiredState: swarm.TaskStateRunning, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + }) + } + + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "overall progress", Action: fmt.Sprintf("0 out of %d tasks", nodes)}, + {ID: "overall progress", Action: fmt.Sprintf("0 out of %d tasks", nodes)}, + }) + + for i := 0; i != nodes; i++ { + tasks[i].Status.State = swarm.TaskStateRunning + updaterTester.testUpdater(tasks, i == nodes-1, + []progress.Progress{ + {ID: "overall progress", Action: fmt.Sprintf("%d out of %d tasks", i+1, nodes)}, + }) + } +} From c9b92a328d11741a7062d1cfa774427342d63c7b Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Wed, 28 Jun 2017 17:48:23 -0700 Subject: [PATCH 3/3] progress: Light refactor Signed-off-by: Aaron Lehmann --- cli/command/service/progress/progress.go | 189 ++++++++++++----------- 1 file changed, 103 insertions(+), 86 deletions(-) diff --git a/cli/command/service/progress/progress.go b/cli/command/service/progress/progress.go index 94bf229b3a..5882b49576 100644 --- a/cli/command/service/progress/progress.go +++ b/cli/command/service/progress/progress.go @@ -265,7 +265,6 @@ type replicatedProgressUpdater struct { done bool } -// nolint: gocyclo func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error) { if service.Spec.Mode.Replicated == nil || service.Spec.Mode.Replicated.Replicas == nil { return false, errors.New("no replica count") @@ -286,34 +285,7 @@ func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm. u.initialized = true } - // If there are multiple tasks with the same slot number, favor the one - // with the *lowest* desired state. This can happen in restart - // scenarios. - tasksBySlot := make(map[int]swarm.Task) - for _, task := range tasks { - if numberedStates[task.DesiredState] == 0 || numberedStates[task.Status.State] == 0 { - continue - } - if existingTask, ok := tasksBySlot[task.Slot]; ok { - if numberedStates[existingTask.DesiredState] < numberedStates[task.DesiredState] { - continue - } - // If the desired states match, observed state breaks - // ties. This can happen with the "start first" service - // update mode. - if numberedStates[existingTask.DesiredState] == numberedStates[task.DesiredState] && - numberedStates[existingTask.Status.State] <= numberedStates[task.Status.State] { - continue - } - } - if task.NodeID != "" { - if _, nodeActive := activeNodes[task.NodeID]; nodeActive { - tasksBySlot[task.Slot] = task - } - } else { - tasksBySlot[task.Slot] = task - } - } + tasksBySlot := u.tasksBySlot(tasks, activeNodes) // If we had reached a converged state, check if we are still converged. if u.done { @@ -338,27 +310,7 @@ func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm. running++ } - if u.done || replicas > maxProgressBars || uint64(mappedSlot) > replicas { - continue - } - - if task.Status.Err != "" { - u.progressOut.WriteProgress(progress.Progress{ - ID: fmt.Sprintf("%d/%d", mappedSlot, replicas), - Action: truncError(task.Status.Err), - }) - continue - } - - if !terminalState(task.DesiredState) && !terminalState(task.Status.State) { - u.progressOut.WriteProgress(progress.Progress{ - ID: fmt.Sprintf("%d/%d", mappedSlot, replicas), - Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State), - Current: stateToProgress(task.Status.State, rollback), - Total: maxProgress, - HideCounts: true, - }) - } + u.writeTaskProgress(task, mappedSlot, replicas, rollback) } if !u.done { @@ -372,28 +324,19 @@ func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm. return running == replicas, nil } -type globalProgressUpdater struct { - progressOut progress.Output - - initialized bool - done bool -} - -// nolint: gocyclo -func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error) { - // If there are multiple tasks with the same node ID, favor the one +func (u *replicatedProgressUpdater) tasksBySlot(tasks []swarm.Task, activeNodes map[string]struct{}) map[int]swarm.Task { + // If there are multiple tasks with the same slot number, favor the one // with the *lowest* desired state. This can happen in restart // scenarios. - tasksByNode := make(map[string]swarm.Task) + tasksBySlot := make(map[int]swarm.Task) for _, task := range tasks { if numberedStates[task.DesiredState] == 0 || numberedStates[task.Status.State] == 0 { continue } - if existingTask, ok := tasksByNode[task.NodeID]; ok { + if existingTask, ok := tasksBySlot[task.Slot]; ok { if numberedStates[existingTask.DesiredState] < numberedStates[task.DesiredState] { continue } - // If the desired states match, observed state breaks // ties. This can happen with the "start first" service // update mode. @@ -401,11 +344,52 @@ func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task numberedStates[existingTask.Status.State] <= numberedStates[task.Status.State] { continue } - } - tasksByNode[task.NodeID] = task + if task.NodeID != "" { + if _, nodeActive := activeNodes[task.NodeID]; !nodeActive { + continue + } + } + tasksBySlot[task.Slot] = task } + return tasksBySlot +} + +func (u *replicatedProgressUpdater) writeTaskProgress(task swarm.Task, mappedSlot int, replicas uint64, rollback bool) { + if u.done || replicas > maxProgressBars || uint64(mappedSlot) > replicas { + return + } + + if task.Status.Err != "" { + u.progressOut.WriteProgress(progress.Progress{ + ID: fmt.Sprintf("%d/%d", mappedSlot, replicas), + Action: truncError(task.Status.Err), + }) + return + } + + if !terminalState(task.DesiredState) && !terminalState(task.Status.State) { + u.progressOut.WriteProgress(progress.Progress{ + ID: fmt.Sprintf("%d/%d", mappedSlot, replicas), + Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State), + Current: stateToProgress(task.Status.State, rollback), + Total: maxProgress, + HideCounts: true, + }) + } +} + +type globalProgressUpdater struct { + progressOut progress.Output + + initialized bool + done bool +} + +func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error) { + tasksByNode := u.tasksByNode(tasks) + // We don't have perfect knowledge of how many nodes meet the // constraints for this service. But the orchestrator creates tasks // for all eligible nodes at the same time, so we should see all those @@ -446,27 +430,7 @@ func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task running++ } - if u.done || nodeCount > maxProgressBars { - continue - } - - if task.Status.Err != "" { - u.progressOut.WriteProgress(progress.Progress{ - ID: stringid.TruncateID(task.NodeID), - Action: truncError(task.Status.Err), - }) - continue - } - - if !terminalState(task.DesiredState) && !terminalState(task.Status.State) { - u.progressOut.WriteProgress(progress.Progress{ - ID: stringid.TruncateID(task.NodeID), - Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State), - Current: stateToProgress(task.Status.State, rollback), - Total: maxProgress, - HideCounts: true, - }) - } + u.writeTaskProgress(task, nodeCount, rollback) } } @@ -480,3 +444,56 @@ func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task return running == nodeCount, nil } + +func (u *globalProgressUpdater) tasksByNode(tasks []swarm.Task) map[string]swarm.Task { + // If there are multiple tasks with the same node ID, favor the one + // with the *lowest* desired state. This can happen in restart + // scenarios. + tasksByNode := make(map[string]swarm.Task) + for _, task := range tasks { + if numberedStates[task.DesiredState] == 0 || numberedStates[task.Status.State] == 0 { + continue + } + if existingTask, ok := tasksByNode[task.NodeID]; ok { + if numberedStates[existingTask.DesiredState] < numberedStates[task.DesiredState] { + continue + } + + // If the desired states match, observed state breaks + // ties. This can happen with the "start first" service + // update mode. + if numberedStates[existingTask.DesiredState] == numberedStates[task.DesiredState] && + numberedStates[existingTask.Status.State] <= numberedStates[task.Status.State] { + continue + } + + } + tasksByNode[task.NodeID] = task + } + + return tasksByNode +} + +func (u *globalProgressUpdater) writeTaskProgress(task swarm.Task, nodeCount int, rollback bool) { + if u.done || nodeCount > maxProgressBars { + return + } + + if task.Status.Err != "" { + u.progressOut.WriteProgress(progress.Progress{ + ID: stringid.TruncateID(task.NodeID), + Action: truncError(task.Status.Err), + }) + return + } + + if !terminalState(task.DesiredState) && !terminalState(task.Status.State) { + u.progressOut.WriteProgress(progress.Progress{ + ID: stringid.TruncateID(task.NodeID), + Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State), + Current: stateToProgress(task.Status.State, rollback), + Total: maxProgress, + HideCounts: true, + }) + } +}