progress: Light refactor

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
This commit is contained in:
Aaron Lehmann 2017-06-28 17:48:23 -07:00
parent dd3eae84e1
commit c9b92a328d
1 changed files with 103 additions and 86 deletions

View File

@ -265,7 +265,6 @@ type replicatedProgressUpdater struct {
done bool done bool
} }
// nolint: gocyclo
func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error) { func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error) {
if service.Spec.Mode.Replicated == nil || service.Spec.Mode.Replicated.Replicas == nil { if service.Spec.Mode.Replicated == nil || service.Spec.Mode.Replicated.Replicas == nil {
return false, errors.New("no replica count") return false, errors.New("no replica count")
@ -286,34 +285,7 @@ func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.
u.initialized = true u.initialized = true
} }
// If there are multiple tasks with the same slot number, favor the one tasksBySlot := u.tasksBySlot(tasks, activeNodes)
// with the *lowest* desired state. This can happen in restart
// scenarios.
tasksBySlot := make(map[int]swarm.Task)
for _, task := range tasks {
if numberedStates[task.DesiredState] == 0 || numberedStates[task.Status.State] == 0 {
continue
}
if existingTask, ok := tasksBySlot[task.Slot]; ok {
if numberedStates[existingTask.DesiredState] < numberedStates[task.DesiredState] {
continue
}
// If the desired states match, observed state breaks
// ties. This can happen with the "start first" service
// update mode.
if numberedStates[existingTask.DesiredState] == numberedStates[task.DesiredState] &&
numberedStates[existingTask.Status.State] <= numberedStates[task.Status.State] {
continue
}
}
if task.NodeID != "" {
if _, nodeActive := activeNodes[task.NodeID]; nodeActive {
tasksBySlot[task.Slot] = task
}
} else {
tasksBySlot[task.Slot] = task
}
}
// If we had reached a converged state, check if we are still converged. // If we had reached a converged state, check if we are still converged.
if u.done { if u.done {
@ -338,27 +310,7 @@ func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.
running++ running++
} }
if u.done || replicas > maxProgressBars || uint64(mappedSlot) > replicas { u.writeTaskProgress(task, mappedSlot, replicas, rollback)
continue
}
if task.Status.Err != "" {
u.progressOut.WriteProgress(progress.Progress{
ID: fmt.Sprintf("%d/%d", mappedSlot, replicas),
Action: truncError(task.Status.Err),
})
continue
}
if !terminalState(task.DesiredState) && !terminalState(task.Status.State) {
u.progressOut.WriteProgress(progress.Progress{
ID: fmt.Sprintf("%d/%d", mappedSlot, replicas),
Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State),
Current: stateToProgress(task.Status.State, rollback),
Total: maxProgress,
HideCounts: true,
})
}
} }
if !u.done { if !u.done {
@ -372,28 +324,19 @@ func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.
return running == replicas, nil return running == replicas, nil
} }
type globalProgressUpdater struct { func (u *replicatedProgressUpdater) tasksBySlot(tasks []swarm.Task, activeNodes map[string]struct{}) map[int]swarm.Task {
progressOut progress.Output // If there are multiple tasks with the same slot number, favor the one
initialized bool
done bool
}
// nolint: gocyclo
func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error) {
// If there are multiple tasks with the same node ID, favor the one
// with the *lowest* desired state. This can happen in restart // with the *lowest* desired state. This can happen in restart
// scenarios. // scenarios.
tasksByNode := make(map[string]swarm.Task) tasksBySlot := make(map[int]swarm.Task)
for _, task := range tasks { for _, task := range tasks {
if numberedStates[task.DesiredState] == 0 || numberedStates[task.Status.State] == 0 { if numberedStates[task.DesiredState] == 0 || numberedStates[task.Status.State] == 0 {
continue continue
} }
if existingTask, ok := tasksByNode[task.NodeID]; ok { if existingTask, ok := tasksBySlot[task.Slot]; ok {
if numberedStates[existingTask.DesiredState] < numberedStates[task.DesiredState] { if numberedStates[existingTask.DesiredState] < numberedStates[task.DesiredState] {
continue continue
} }
// If the desired states match, observed state breaks // If the desired states match, observed state breaks
// ties. This can happen with the "start first" service // ties. This can happen with the "start first" service
// update mode. // update mode.
@ -401,11 +344,52 @@ func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task
numberedStates[existingTask.Status.State] <= numberedStates[task.Status.State] { numberedStates[existingTask.Status.State] <= numberedStates[task.Status.State] {
continue continue
} }
}
if task.NodeID != "" {
if _, nodeActive := activeNodes[task.NodeID]; !nodeActive {
continue
}
}
tasksBySlot[task.Slot] = task
}
return tasksBySlot
}
func (u *replicatedProgressUpdater) writeTaskProgress(task swarm.Task, mappedSlot int, replicas uint64, rollback bool) {
if u.done || replicas > maxProgressBars || uint64(mappedSlot) > replicas {
return
} }
tasksByNode[task.NodeID] = task
if task.Status.Err != "" {
u.progressOut.WriteProgress(progress.Progress{
ID: fmt.Sprintf("%d/%d", mappedSlot, replicas),
Action: truncError(task.Status.Err),
})
return
} }
if !terminalState(task.DesiredState) && !terminalState(task.Status.State) {
u.progressOut.WriteProgress(progress.Progress{
ID: fmt.Sprintf("%d/%d", mappedSlot, replicas),
Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State),
Current: stateToProgress(task.Status.State, rollback),
Total: maxProgress,
HideCounts: true,
})
}
}
type globalProgressUpdater struct {
progressOut progress.Output
initialized bool
done bool
}
func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error) {
tasksByNode := u.tasksByNode(tasks)
// We don't have perfect knowledge of how many nodes meet the // We don't have perfect knowledge of how many nodes meet the
// constraints for this service. But the orchestrator creates tasks // constraints for this service. But the orchestrator creates tasks
// for all eligible nodes at the same time, so we should see all those // for all eligible nodes at the same time, so we should see all those
@ -446,27 +430,7 @@ func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task
running++ running++
} }
if u.done || nodeCount > maxProgressBars { u.writeTaskProgress(task, nodeCount, rollback)
continue
}
if task.Status.Err != "" {
u.progressOut.WriteProgress(progress.Progress{
ID: stringid.TruncateID(task.NodeID),
Action: truncError(task.Status.Err),
})
continue
}
if !terminalState(task.DesiredState) && !terminalState(task.Status.State) {
u.progressOut.WriteProgress(progress.Progress{
ID: stringid.TruncateID(task.NodeID),
Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State),
Current: stateToProgress(task.Status.State, rollback),
Total: maxProgress,
HideCounts: true,
})
}
} }
} }
@ -480,3 +444,56 @@ func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task
return running == nodeCount, nil return running == nodeCount, nil
} }
func (u *globalProgressUpdater) tasksByNode(tasks []swarm.Task) map[string]swarm.Task {
// If there are multiple tasks with the same node ID, favor the one
// with the *lowest* desired state. This can happen in restart
// scenarios.
tasksByNode := make(map[string]swarm.Task)
for _, task := range tasks {
if numberedStates[task.DesiredState] == 0 || numberedStates[task.Status.State] == 0 {
continue
}
if existingTask, ok := tasksByNode[task.NodeID]; ok {
if numberedStates[existingTask.DesiredState] < numberedStates[task.DesiredState] {
continue
}
// If the desired states match, observed state breaks
// ties. This can happen with the "start first" service
// update mode.
if numberedStates[existingTask.DesiredState] == numberedStates[task.DesiredState] &&
numberedStates[existingTask.Status.State] <= numberedStates[task.Status.State] {
continue
}
}
tasksByNode[task.NodeID] = task
}
return tasksByNode
}
func (u *globalProgressUpdater) writeTaskProgress(task swarm.Task, nodeCount int, rollback bool) {
if u.done || nodeCount > maxProgressBars {
return
}
if task.Status.Err != "" {
u.progressOut.WriteProgress(progress.Progress{
ID: stringid.TruncateID(task.NodeID),
Action: truncError(task.Status.Err),
})
return
}
if !terminalState(task.DesiredState) && !terminalState(task.Status.State) {
u.progressOut.WriteProgress(progress.Progress{
ID: stringid.TruncateID(task.NodeID),
Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State),
Current: stateToProgress(task.Status.State, rollback),
Total: maxProgress,
HideCounts: true,
})
}
}