DockerCLI/internal/containerizedengine/progress.go

216 lines
4.6 KiB
Go

package containerizedengine
import (
"context"
"encoding/json"
"fmt"
"io"
"strings"
"sync"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/remotes"
"github.com/docker/docker/pkg/jsonmessage"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
)
func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, out io.WriteCloser) {
var (
ticker = time.NewTicker(100 * time.Millisecond)
start = time.Now()
enc = json.NewEncoder(out)
statuses = map[string]statusInfo{}
done bool
)
defer ticker.Stop()
outer:
for {
select {
case <-ticker.C:
resolved := "resolved"
if !ongoing.isResolved() {
resolved = "resolving"
}
statuses[ongoing.name] = statusInfo{
Ref: ongoing.name,
Status: resolved,
}
keys := []string{ongoing.name}
activeSeen := map[string]struct{}{}
if !done {
active, err := cs.ListStatuses(ctx, "")
if err != nil {
logrus.Debugf("active check failed: %s", err)
continue
}
// update status of active entries!
for _, active := range active {
statuses[active.Ref] = statusInfo{
Ref: active.Ref,
Status: "downloading",
Offset: active.Offset,
Total: active.Total,
StartedAt: active.StartedAt,
UpdatedAt: active.UpdatedAt,
}
activeSeen[active.Ref] = struct{}{}
}
}
err := updateNonActive(ctx, ongoing, cs, statuses, &keys, activeSeen, &done, start)
if err != nil {
continue outer
}
var ordered []statusInfo
for _, key := range keys {
ordered = append(ordered, statuses[key])
}
for _, si := range ordered {
jm := si.JSONMessage()
err := enc.Encode(jm)
if err != nil {
logrus.Debugf("failed to encode progress message: %s", err)
}
}
if done {
out.Close()
return
}
case <-ctx.Done():
done = true // allow ui to update once more
}
}
}
func updateNonActive(ctx context.Context, ongoing *jobs, cs content.Store, statuses map[string]statusInfo, keys *[]string, activeSeen map[string]struct{}, done *bool, start time.Time) error {
for _, j := range ongoing.jobs() {
key := remotes.MakeRefKey(ctx, j)
*keys = append(*keys, key)
if _, ok := activeSeen[key]; ok {
continue
}
status, ok := statuses[key]
if !*done && (!ok || status.Status == "downloading") {
info, err := cs.Info(ctx, j.Digest)
if err != nil {
if !errdefs.IsNotFound(err) {
logrus.Debugf("failed to get content info: %s", err)
return err
}
statuses[key] = statusInfo{
Ref: key,
Status: "waiting",
}
} else if info.CreatedAt.After(start) {
statuses[key] = statusInfo{
Ref: key,
Status: "done",
Offset: info.Size,
Total: info.Size,
UpdatedAt: info.CreatedAt,
}
} else {
statuses[key] = statusInfo{
Ref: key,
Status: "exists",
}
}
} else if *done {
if ok {
if status.Status != "done" && status.Status != "exists" {
status.Status = "done"
statuses[key] = status
}
} else {
statuses[key] = statusInfo{
Ref: key,
Status: "done",
}
}
}
}
return nil
}
type jobs struct {
name string
added map[digest.Digest]struct{}
descs []ocispec.Descriptor
mu sync.Mutex
resolved bool
}
func newJobs(name string) *jobs {
return &jobs{
name: name,
added: map[digest.Digest]struct{}{},
}
}
func (j *jobs) add(desc ocispec.Descriptor) {
j.mu.Lock()
defer j.mu.Unlock()
j.resolved = true
if _, ok := j.added[desc.Digest]; ok {
return
}
j.descs = append(j.descs, desc)
j.added[desc.Digest] = struct{}{}
}
func (j *jobs) jobs() []ocispec.Descriptor {
j.mu.Lock()
defer j.mu.Unlock()
var descs []ocispec.Descriptor
return append(descs, j.descs...)
}
func (j *jobs) isResolved() bool {
j.mu.Lock()
defer j.mu.Unlock()
return j.resolved
}
// statusInfo holds the status info for an upload or download
type statusInfo struct {
Ref string
Status string
Offset int64
Total int64
StartedAt time.Time
UpdatedAt time.Time
}
func (s statusInfo) JSONMessage() jsonmessage.JSONMessage {
// Shorten the ID to use up less width on the display
id := s.Ref
if strings.Contains(id, ":") {
split := strings.SplitN(id, ":", 2)
id = split[1]
}
id = fmt.Sprintf("%.12s", id)
return jsonmessage.JSONMessage{
ID: id,
Status: s.Status,
Progress: &jsonmessage.JSONProgress{
Current: s.Offset,
Total: s.Total,
},
}
}