Merge pull request #259 from aaronlehmann/service-progress-surface-error

progress: Show task error in place of progress bar
This commit is contained in:
Sebastiaan van Stijn 2017-07-09 00:04:38 -07:00 committed by GitHub
commit 7ae9bc141c
2 changed files with 524 additions and 66 deletions

View File

@ -6,6 +6,7 @@ import (
"io"
"os"
"os/signal"
"strings"
"time"
"github.com/docker/docker/api/types"
@ -29,6 +30,13 @@ var (
swarm.TaskStateReady: 7,
swarm.TaskStateStarting: 8,
swarm.TaskStateRunning: 9,
// The following states are not actually shown in progress
// output, but are used internally for ordering.
swarm.TaskStateComplete: 10,
swarm.TaskStateShutdown: 11,
swarm.TaskStateFailed: 12,
swarm.TaskStateRejected: 13,
}
longestState int
@ -40,22 +48,26 @@ const (
)
type progressUpdater interface {
update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error)
update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error)
}
func init() {
for state := range numberedStates {
if len(state) > longestState {
if !terminalState(state) && len(state) > longestState {
longestState = len(state)
}
}
}
func terminalState(state swarm.TaskState) bool {
return numberedStates[state] > numberedStates[swarm.TaskStateRunning]
}
func stateToProgress(state swarm.TaskState, rollback bool) int64 {
if !rollback {
return numberedStates[state]
}
return int64(len(numberedStates)) - numberedStates[state]
return numberedStates[swarm.TaskStateRunning] - numberedStates[state]
}
// ServiceProgress outputs progress information for convergence of a service.
@ -192,16 +204,16 @@ func ServiceProgress(ctx context.Context, client client.APIClient, serviceID str
}
}
func getActiveNodes(ctx context.Context, client client.APIClient) (map[string]swarm.Node, error) {
func getActiveNodes(ctx context.Context, client client.APIClient) (map[string]struct{}, error) {
nodes, err := client.NodeList(ctx, types.NodeListOptions{})
if err != nil {
return nil, err
}
activeNodes := make(map[string]swarm.Node)
activeNodes := make(map[string]struct{})
for _, n := range nodes {
if n.Status.State != swarm.NodeStateDown {
activeNodes[n.ID] = n
activeNodes[n.ID] = struct{}{}
}
}
return activeNodes, nil
@ -235,6 +247,18 @@ func writeOverallProgress(progressOut progress.Output, numerator, denominator in
})
}
func truncError(errMsg string) string {
// Remove newlines from the error, which corrupt the output.
errMsg = strings.Replace(errMsg, "\n", " ", -1)
// Limit the length to 75 characters, so that even on narrow terminals
// this will not overflow to the next line.
if len(errMsg) > 75 {
errMsg = errMsg[:74] + "…"
}
return errMsg
}
type replicatedProgressUpdater struct {
progressOut progress.Output
@ -246,8 +270,7 @@ type replicatedProgressUpdater struct {
done bool
}
// nolint: gocyclo
func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) {
func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error) {
if service.Spec.Mode.Replicated == nil || service.Spec.Mode.Replicated.Replicas == nil {
return false, errors.New("no replica count")
}
@ -267,27 +290,7 @@ func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.
u.initialized = true
}
// If there are multiple tasks with the same slot number, favor the one
// with the *lowest* desired state. This can happen in restart
// scenarios.
tasksBySlot := make(map[int]swarm.Task)
for _, task := range tasks {
if numberedStates[task.DesiredState] == 0 {
continue
}
if existingTask, ok := tasksBySlot[task.Slot]; ok {
if numberedStates[existingTask.DesiredState] <= numberedStates[task.DesiredState] {
continue
}
}
if task.NodeID != "" {
if _, nodeActive := activeNodes[task.NodeID]; nodeActive {
tasksBySlot[task.Slot] = task
}
} else {
tasksBySlot[task.Slot] = task
}
}
tasksBySlot := u.tasksBySlot(tasks, activeNodes)
// If we had reached a converged state, check if we are still converged.
if u.done {
@ -308,18 +311,11 @@ func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.
u.slotMap[task.Slot] = mappedSlot
}
if !u.done && replicas <= maxProgressBars && uint64(mappedSlot) <= replicas {
u.progressOut.WriteProgress(progress.Progress{
ID: fmt.Sprintf("%d/%d", mappedSlot, replicas),
Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State),
Current: stateToProgress(task.Status.State, rollback),
Total: maxProgress,
HideCounts: true,
})
}
if task.Status.State == swarm.TaskStateRunning {
if !terminalState(task.DesiredState) && task.Status.State == swarm.TaskStateRunning {
running++
}
u.writeTaskProgress(task, mappedSlot, replicas, rollback)
}
if !u.done {
@ -333,6 +329,62 @@ func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.
return running == replicas, nil
}
func (u *replicatedProgressUpdater) tasksBySlot(tasks []swarm.Task, activeNodes map[string]struct{}) map[int]swarm.Task {
// If there are multiple tasks with the same slot number, favor the one
// with the *lowest* desired state. This can happen in restart
// scenarios.
tasksBySlot := make(map[int]swarm.Task)
for _, task := range tasks {
if numberedStates[task.DesiredState] == 0 || numberedStates[task.Status.State] == 0 {
continue
}
if existingTask, ok := tasksBySlot[task.Slot]; ok {
if numberedStates[existingTask.DesiredState] < numberedStates[task.DesiredState] {
continue
}
// If the desired states match, observed state breaks
// ties. This can happen with the "start first" service
// update mode.
if numberedStates[existingTask.DesiredState] == numberedStates[task.DesiredState] &&
numberedStates[existingTask.Status.State] <= numberedStates[task.Status.State] {
continue
}
}
if task.NodeID != "" {
if _, nodeActive := activeNodes[task.NodeID]; !nodeActive {
continue
}
}
tasksBySlot[task.Slot] = task
}
return tasksBySlot
}
func (u *replicatedProgressUpdater) writeTaskProgress(task swarm.Task, mappedSlot int, replicas uint64, rollback bool) {
if u.done || replicas > maxProgressBars || uint64(mappedSlot) > replicas {
return
}
if task.Status.Err != "" {
u.progressOut.WriteProgress(progress.Progress{
ID: fmt.Sprintf("%d/%d", mappedSlot, replicas),
Action: truncError(task.Status.Err),
})
return
}
if !terminalState(task.DesiredState) && !terminalState(task.Status.State) {
u.progressOut.WriteProgress(progress.Progress{
ID: fmt.Sprintf("%d/%d", mappedSlot, replicas),
Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State),
Current: stateToProgress(task.Status.State, rollback),
Total: maxProgress,
HideCounts: true,
})
}
}
type globalProgressUpdater struct {
progressOut progress.Output
@ -340,22 +392,8 @@ type globalProgressUpdater struct {
done bool
}
func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) {
// If there are multiple tasks with the same node ID, favor the one
// with the *lowest* desired state. This can happen in restart
// scenarios.
tasksByNode := make(map[string]swarm.Task)
for _, task := range tasks {
if numberedStates[task.DesiredState] == 0 {
continue
}
if existingTask, ok := tasksByNode[task.NodeID]; ok {
if numberedStates[existingTask.DesiredState] <= numberedStates[task.DesiredState] {
continue
}
}
tasksByNode[task.NodeID] = task
}
func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error) {
tasksByNode := u.tasksByNode(tasks)
// We don't have perfect knowledge of how many nodes meet the
// constraints for this service. But the orchestrator creates tasks
@ -392,19 +430,12 @@ func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task
running := 0
for _, task := range tasksByNode {
if node, nodeActive := activeNodes[task.NodeID]; nodeActive {
if !u.done && nodeCount <= maxProgressBars {
u.progressOut.WriteProgress(progress.Progress{
ID: stringid.TruncateID(node.ID),
Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State),
Current: stateToProgress(task.Status.State, rollback),
Total: maxProgress,
HideCounts: true,
})
}
if task.Status.State == swarm.TaskStateRunning {
if _, nodeActive := activeNodes[task.NodeID]; nodeActive {
if !terminalState(task.DesiredState) && task.Status.State == swarm.TaskStateRunning {
running++
}
u.writeTaskProgress(task, nodeCount, rollback)
}
}
@ -418,3 +449,56 @@ func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task
return running == nodeCount, nil
}
func (u *globalProgressUpdater) tasksByNode(tasks []swarm.Task) map[string]swarm.Task {
// If there are multiple tasks with the same node ID, favor the one
// with the *lowest* desired state. This can happen in restart
// scenarios.
tasksByNode := make(map[string]swarm.Task)
for _, task := range tasks {
if numberedStates[task.DesiredState] == 0 || numberedStates[task.Status.State] == 0 {
continue
}
if existingTask, ok := tasksByNode[task.NodeID]; ok {
if numberedStates[existingTask.DesiredState] < numberedStates[task.DesiredState] {
continue
}
// If the desired states match, observed state breaks
// ties. This can happen with the "start first" service
// update mode.
if numberedStates[existingTask.DesiredState] == numberedStates[task.DesiredState] &&
numberedStates[existingTask.Status.State] <= numberedStates[task.Status.State] {
continue
}
}
tasksByNode[task.NodeID] = task
}
return tasksByNode
}
func (u *globalProgressUpdater) writeTaskProgress(task swarm.Task, nodeCount int, rollback bool) {
if u.done || nodeCount > maxProgressBars {
return
}
if task.Status.Err != "" {
u.progressOut.WriteProgress(progress.Progress{
ID: stringid.TruncateID(task.NodeID),
Action: truncError(task.Status.Err),
})
return
}
if !terminalState(task.DesiredState) && !terminalState(task.Status.State) {
u.progressOut.WriteProgress(progress.Progress{
ID: stringid.TruncateID(task.NodeID),
Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State),
Current: stateToProgress(task.Status.State, rollback),
Total: maxProgress,
HideCounts: true,
})
}
}

View File

@ -0,0 +1,374 @@
package progress
import (
"fmt"
"strconv"
"testing"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/pkg/progress"
"github.com/stretchr/testify/assert"
)
type mockProgress struct {
p []progress.Progress
}
func (mp *mockProgress) WriteProgress(p progress.Progress) error {
mp.p = append(mp.p, p)
return nil
}
func (mp *mockProgress) clear() {
mp.p = nil
}
type updaterTester struct {
t *testing.T
updater progressUpdater
p *mockProgress
service swarm.Service
activeNodes map[string]struct{}
rollback bool
}
func (u updaterTester) testUpdater(tasks []swarm.Task, expectedConvergence bool, expectedProgress []progress.Progress) {
u.p.clear()
converged, err := u.updater.update(u.service, tasks, u.activeNodes, u.rollback)
assert.NoError(u.t, err)
assert.Equal(u.t, expectedConvergence, converged)
assert.Equal(u.t, expectedProgress, u.p.p)
}
func TestReplicatedProgressUpdaterOneReplica(t *testing.T) {
replicas := uint64(1)
service := swarm.Service{
Spec: swarm.ServiceSpec{
Mode: swarm.ServiceMode{
Replicated: &swarm.ReplicatedService{
Replicas: &replicas,
},
},
},
}
p := &mockProgress{}
updaterTester := updaterTester{
t: t,
updater: &replicatedProgressUpdater{
progressOut: p,
},
p: p,
activeNodes: map[string]struct{}{"a": {}, "b": {}},
service: service,
}
tasks := []swarm.Task{}
updaterTester.testUpdater(tasks, false,
[]progress.Progress{
{ID: "overall progress", Action: "0 out of 1 tasks"},
{ID: "1/1", Action: " "},
{ID: "overall progress", Action: "0 out of 1 tasks"},
})
// Task with DesiredState beyond Running is ignored
tasks = append(tasks,
swarm.Task{ID: "1",
NodeID: "a",
DesiredState: swarm.TaskStateShutdown,
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
})
updaterTester.testUpdater(tasks, false,
[]progress.Progress{
{ID: "overall progress", Action: "0 out of 1 tasks"},
})
// Task with valid DesiredState and State updates progress bar
tasks[0].DesiredState = swarm.TaskStateRunning
updaterTester.testUpdater(tasks, false,
[]progress.Progress{
{ID: "1/1", Action: "new ", Current: 1, Total: 9, HideCounts: true},
{ID: "overall progress", Action: "0 out of 1 tasks"},
})
// If the task exposes an error, we should show that instead of the
// progress bar.
tasks[0].Status.Err = "something is wrong"
updaterTester.testUpdater(tasks, false,
[]progress.Progress{
{ID: "1/1", Action: "something is wrong"},
{ID: "overall progress", Action: "0 out of 1 tasks"},
})
// When the task reaches running, update should return true
tasks[0].Status.Err = ""
tasks[0].Status.State = swarm.TaskStateRunning
updaterTester.testUpdater(tasks, true,
[]progress.Progress{
{ID: "1/1", Action: "running ", Current: 9, Total: 9, HideCounts: true},
{ID: "overall progress", Action: "1 out of 1 tasks"},
})
// If the task fails, update should return false again
tasks[0].Status.Err = "task failed"
tasks[0].Status.State = swarm.TaskStateFailed
updaterTester.testUpdater(tasks, false,
[]progress.Progress{
{ID: "1/1", Action: "task failed"},
{ID: "overall progress", Action: "0 out of 1 tasks"},
})
// If the task is restarted, progress output should be shown for the
// replacement task, not the old task.
tasks[0].DesiredState = swarm.TaskStateShutdown
tasks = append(tasks,
swarm.Task{ID: "2",
NodeID: "b",
DesiredState: swarm.TaskStateRunning,
Status: swarm.TaskStatus{State: swarm.TaskStateRunning},
})
updaterTester.testUpdater(tasks, true,
[]progress.Progress{
{ID: "1/1", Action: "running ", Current: 9, Total: 9, HideCounts: true},
{ID: "overall progress", Action: "1 out of 1 tasks"},
})
// Add a new task while the current one is still running, to simulate
// "start-then-stop" updates.
tasks = append(tasks,
swarm.Task{ID: "3",
NodeID: "b",
DesiredState: swarm.TaskStateRunning,
Status: swarm.TaskStatus{State: swarm.TaskStatePreparing},
})
updaterTester.testUpdater(tasks, false,
[]progress.Progress{
{ID: "1/1", Action: "preparing", Current: 6, Total: 9, HideCounts: true},
{ID: "overall progress", Action: "0 out of 1 tasks"},
})
}
func TestReplicatedProgressUpdaterManyReplicas(t *testing.T) {
replicas := uint64(50)
service := swarm.Service{
Spec: swarm.ServiceSpec{
Mode: swarm.ServiceMode{
Replicated: &swarm.ReplicatedService{
Replicas: &replicas,
},
},
},
}
p := &mockProgress{}
updaterTester := updaterTester{
t: t,
updater: &replicatedProgressUpdater{
progressOut: p,
},
p: p,
activeNodes: map[string]struct{}{"a": {}, "b": {}},
service: service,
}
tasks := []swarm.Task{}
// No per-task progress bars because there are too many replicas
updaterTester.testUpdater(tasks, false,
[]progress.Progress{
{ID: "overall progress", Action: fmt.Sprintf("0 out of %d tasks", replicas)},
{ID: "overall progress", Action: fmt.Sprintf("0 out of %d tasks", replicas)},
})
for i := 0; i != int(replicas); i++ {
tasks = append(tasks,
swarm.Task{
ID: strconv.Itoa(i),
Slot: i + 1,
NodeID: "a",
DesiredState: swarm.TaskStateRunning,
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
})
if i%2 == 1 {
tasks[i].NodeID = "b"
}
updaterTester.testUpdater(tasks, false,
[]progress.Progress{
{ID: "overall progress", Action: fmt.Sprintf("%d out of %d tasks", i, replicas)},
})
tasks[i].Status.State = swarm.TaskStateRunning
updaterTester.testUpdater(tasks, uint64(i) == replicas-1,
[]progress.Progress{
{ID: "overall progress", Action: fmt.Sprintf("%d out of %d tasks", i+1, replicas)},
})
}
}
func TestGlobalProgressUpdaterOneNode(t *testing.T) {
service := swarm.Service{
Spec: swarm.ServiceSpec{
Mode: swarm.ServiceMode{
Global: &swarm.GlobalService{},
},
},
}
p := &mockProgress{}
updaterTester := updaterTester{
t: t,
updater: &globalProgressUpdater{
progressOut: p,
},
p: p,
activeNodes: map[string]struct{}{"a": {}, "b": {}},
service: service,
}
tasks := []swarm.Task{}
updaterTester.testUpdater(tasks, false,
[]progress.Progress{
{ID: "overall progress", Action: "waiting for new tasks"},
})
// Task with DesiredState beyond Running is ignored
tasks = append(tasks,
swarm.Task{ID: "1",
NodeID: "a",
DesiredState: swarm.TaskStateShutdown,
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
})
updaterTester.testUpdater(tasks, false,
[]progress.Progress{
{ID: "overall progress", Action: "0 out of 1 tasks"},
{ID: "overall progress", Action: "0 out of 1 tasks"},
})
// Task with valid DesiredState and State updates progress bar
tasks[0].DesiredState = swarm.TaskStateRunning
updaterTester.testUpdater(tasks, false,
[]progress.Progress{
{ID: "a", Action: "new ", Current: 1, Total: 9, HideCounts: true},
{ID: "overall progress", Action: "0 out of 1 tasks"},
})
// If the task exposes an error, we should show that instead of the
// progress bar.
tasks[0].Status.Err = "something is wrong"
updaterTester.testUpdater(tasks, false,
[]progress.Progress{
{ID: "a", Action: "something is wrong"},
{ID: "overall progress", Action: "0 out of 1 tasks"},
})
// When the task reaches running, update should return true
tasks[0].Status.Err = ""
tasks[0].Status.State = swarm.TaskStateRunning
updaterTester.testUpdater(tasks, true,
[]progress.Progress{
{ID: "a", Action: "running ", Current: 9, Total: 9, HideCounts: true},
{ID: "overall progress", Action: "1 out of 1 tasks"},
})
// If the task fails, update should return false again
tasks[0].Status.Err = "task failed"
tasks[0].Status.State = swarm.TaskStateFailed
updaterTester.testUpdater(tasks, false,
[]progress.Progress{
{ID: "a", Action: "task failed"},
{ID: "overall progress", Action: "0 out of 1 tasks"},
})
// If the task is restarted, progress output should be shown for the
// replacement task, not the old task.
tasks[0].DesiredState = swarm.TaskStateShutdown
tasks = append(tasks,
swarm.Task{ID: "2",
NodeID: "a",
DesiredState: swarm.TaskStateRunning,
Status: swarm.TaskStatus{State: swarm.TaskStateRunning},
})
updaterTester.testUpdater(tasks, true,
[]progress.Progress{
{ID: "a", Action: "running ", Current: 9, Total: 9, HideCounts: true},
{ID: "overall progress", Action: "1 out of 1 tasks"},
})
// Add a new task while the current one is still running, to simulate
// "start-then-stop" updates.
tasks = append(tasks,
swarm.Task{ID: "3",
NodeID: "a",
DesiredState: swarm.TaskStateRunning,
Status: swarm.TaskStatus{State: swarm.TaskStatePreparing},
})
updaterTester.testUpdater(tasks, false,
[]progress.Progress{
{ID: "a", Action: "preparing", Current: 6, Total: 9, HideCounts: true},
{ID: "overall progress", Action: "0 out of 1 tasks"},
})
}
func TestGlobalProgressUpdaterManyNodes(t *testing.T) {
nodes := 50
service := swarm.Service{
Spec: swarm.ServiceSpec{
Mode: swarm.ServiceMode{
Global: &swarm.GlobalService{},
},
},
}
p := &mockProgress{}
updaterTester := updaterTester{
t: t,
updater: &globalProgressUpdater{
progressOut: p,
},
p: p,
activeNodes: map[string]struct{}{},
service: service,
}
for i := 0; i != nodes; i++ {
updaterTester.activeNodes[strconv.Itoa(i)] = struct{}{}
}
tasks := []swarm.Task{}
updaterTester.testUpdater(tasks, false,
[]progress.Progress{
{ID: "overall progress", Action: "waiting for new tasks"},
})
for i := 0; i != nodes; i++ {
tasks = append(tasks,
swarm.Task{
ID: "task" + strconv.Itoa(i),
NodeID: strconv.Itoa(i),
DesiredState: swarm.TaskStateRunning,
Status: swarm.TaskStatus{State: swarm.TaskStateNew},
})
}
updaterTester.testUpdater(tasks, false,
[]progress.Progress{
{ID: "overall progress", Action: fmt.Sprintf("0 out of %d tasks", nodes)},
{ID: "overall progress", Action: fmt.Sprintf("0 out of %d tasks", nodes)},
})
for i := 0; i != nodes; i++ {
tasks[i].Status.State = swarm.TaskStateRunning
updaterTester.testUpdater(tasks, i == nodes-1,
[]progress.Progress{
{ID: "overall progress", Action: fmt.Sprintf("%d out of %d tasks", i+1, nodes)},
})
}
}