cp: Do not block transfer on writing to terminal

This moves all the terminal writing to a goroutine that updates the
terminal periodically.
In our MITM copier we just use an atomic to add to the total number of
bytes read/written, the goroutine reads the total and updates the
terminal as needed.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
Brian Goff 2023-03-30 17:22:17 +00:00
parent b9a1b0928a
commit eb392ff4ce
1 changed files with 77 additions and 58 deletions

View File

@ -1,12 +1,15 @@
package container package container
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"io" "io"
"os" "os"
"os/signal"
"path/filepath" "path/filepath"
"strings" "strings"
"sync/atomic"
"time" "time"
"github.com/docker/cli/cli" "github.com/docker/cli/cli"
@ -50,11 +53,7 @@ type cpConfig struct {
// copying files to/from a container. // copying files to/from a container.
type copyProgressPrinter struct { type copyProgressPrinter struct {
io.ReadCloser io.ReadCloser
total *float64 total *int64
writer io.Writer
isTerm bool
header string
lastUpdate time.Time
} }
const ( const (
@ -65,35 +64,62 @@ const (
func (pt *copyProgressPrinter) Read(p []byte) (int, error) { func (pt *copyProgressPrinter) Read(p []byte) (int, error) {
n, err := pt.ReadCloser.Read(p) n, err := pt.ReadCloser.Read(p)
isFirst := *pt.total == 0 atomic.AddInt64(pt.total, int64(n))
if n > 0 {
*pt.total += float64(n)
}
if !pt.isTerm {
return n, err return n, err
}
func copyProgress(ctx context.Context, dst io.Writer, header string, total *int64) (func(), <-chan struct{}) {
done := make(chan struct{})
if !streams.NewOut(dst).IsTerminal() {
close(done)
return func() {}, done
} }
doUpdate := func() { fmt.Fprint(dst, aec.Save)
fmt.Fprint(pt.writer, aec.Hide) fmt.Fprint(dst, "Preparing to copy...")
fmt.Fprint(pt.writer, aec.Column(uint(len(pt.header)+1)))
fmt.Fprint(pt.writer, aec.EraseLine(aec.EraseModes.Tail)) restore := func() {
fmt.Fprint(pt.writer, units.HumanSize(*pt.total)) fmt.Fprint(dst, aec.Restore)
fmt.Fprint(pt.writer, aec.Show) fmt.Fprint(dst, aec.EraseLine(aec.EraseModes.All))
pt.lastUpdate = time.Now()
} }
if isFirst { go func() {
fmt.Fprint(pt.writer, aec.Restore) defer close(done)
fmt.Fprint(pt.writer, aec.EraseLine(aec.EraseModes.All)) fmt.Fprint(dst, aec.Hide)
fmt.Fprint(pt.writer, pt.header) defer fmt.Fprint(dst, aec.Show)
doUpdate()
return n, err fmt.Fprint(dst, aec.Restore)
fmt.Fprint(dst, aec.EraseLine(aec.EraseModes.All))
fmt.Fprint(dst, header)
var last int64
fmt.Fprint(dst, progressHumanSize(last))
buf := bytes.NewBuffer(nil)
ticker := time.NewTicker(copyProgressUpdateThreshold)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
n := atomic.LoadInt64(total)
if n == last {
// Don't write to the terminal, if we don't need to.
continue
} }
if err != nil || time.Since(pt.lastUpdate) > copyProgressUpdateThreshold { // Write to the buffer first to avoid flickering and context switching
doUpdate() fmt.Fprint(buf, aec.Column(uint(len(header)+1)))
fmt.Fprint(buf, aec.EraseLine(aec.EraseModes.Tail))
fmt.Fprint(buf, progressHumanSize(n))
buf.WriteTo(dst)
buf.Reset()
last += n
} }
return n, err }
}()
return restore, done
} }
// NewCopyCommand creates a new `docker cp` command // NewCopyCommand creates a new `docker cp` command
@ -139,6 +165,10 @@ func NewCopyCommand(dockerCli command.Cli) *cobra.Command {
return cmd return cmd
} }
func progressHumanSize(n int64) string {
return units.HumanSizeWithPrecision(float64(n), 3)
}
func runCopy(dockerCli command.Cli, opts copyOptions) error { func runCopy(dockerCli command.Cli, opts copyOptions) error {
srcContainer, srcPath := splitCpArg(opts.source) srcContainer, srcPath := splitCpArg(opts.source)
destContainer, destPath := splitCpArg(opts.destination) destContainer, destPath := splitCpArg(opts.destination)
@ -219,6 +249,9 @@ func copyFromContainer(ctx context.Context, dockerCli command.Cli, copyConfig cp
} }
ctx, cancel := signal.NotifyContext(ctx, os.Interrupt)
defer cancel()
content, stat, err := client.CopyFromContainer(ctx, copyConfig.container, srcPath) content, stat, err := client.CopyFromContainer(ctx, copyConfig.container, srcPath)
if err != nil { if err != nil {
return err return err
@ -237,14 +270,11 @@ func copyFromContainer(ctx context.Context, dockerCli command.Cli, copyConfig cp
RebaseName: rebaseName, RebaseName: rebaseName,
} }
var copiedSize float64 var copiedSize int64
if !copyConfig.quiet { if !copyConfig.quiet {
content = &copyProgressPrinter{ content = &copyProgressPrinter{
ReadCloser: content, ReadCloser: content,
writer: dockerCli.Err(),
total: &copiedSize, total: &copiedSize,
isTerm: streams.NewOut(dockerCli.Err()).IsTerminal(),
header: copyFromContainerHeader,
} }
} }
@ -258,10 +288,12 @@ func copyFromContainer(ctx context.Context, dockerCli command.Cli, copyConfig cp
return archive.CopyTo(preArchive, srcInfo, dstPath) return archive.CopyTo(preArchive, srcInfo, dstPath)
} }
restore := prepareTTYCopyProgress(dockerCli) restore, done := copyProgress(ctx, dockerCli.Err(), copyFromContainerHeader, &copiedSize)
res := archive.CopyTo(preArchive, srcInfo, dstPath) res := archive.CopyTo(preArchive, srcInfo, dstPath)
cancel()
<-done
restore() restore()
fmt.Fprintln(dockerCli.Err(), "Successfully copied", units.HumanSize(copiedSize), "to", dstPath) fmt.Fprintln(dockerCli.Err(), "Successfully copied", progressHumanSize(copiedSize), "to", dstPath)
return res return res
} }
@ -318,7 +350,7 @@ func copyToContainer(ctx context.Context, dockerCli command.Cli, copyConfig cpCo
var ( var (
content io.ReadCloser content io.ReadCloser
resolvedDstPath string resolvedDstPath string
copiedSize float64 copiedSize int64
) )
if srcPath == "-" { if srcPath == "-" {
@ -363,10 +395,7 @@ func copyToContainer(ctx context.Context, dockerCli command.Cli, copyConfig cpCo
if !copyConfig.quiet { if !copyConfig.quiet {
content = &copyProgressPrinter{ content = &copyProgressPrinter{
ReadCloser: content, ReadCloser: content,
writer: dockerCli.Err(),
total: &copiedSize, total: &copiedSize,
isTerm: streams.NewOut(dockerCli.Err()).IsTerminal(),
header: copyToContainerHeader,
} }
} }
} }
@ -380,27 +409,17 @@ func copyToContainer(ctx context.Context, dockerCli command.Cli, copyConfig cpCo
return client.CopyToContainer(ctx, copyConfig.container, resolvedDstPath, content, options) return client.CopyToContainer(ctx, copyConfig.container, resolvedDstPath, content, options)
} }
restore := prepareTTYCopyProgress(dockerCli) ctx, cancel := signal.NotifyContext(ctx, os.Interrupt)
restore, done := copyProgress(ctx, dockerCli.Err(), copyToContainerHeader, &copiedSize)
res := client.CopyToContainer(ctx, copyConfig.container, resolvedDstPath, content, options) res := client.CopyToContainer(ctx, copyConfig.container, resolvedDstPath, content, options)
cancel()
<-done
restore() restore()
fmt.Fprintln(dockerCli.Err(), "Successfully copied", units.HumanSize(copiedSize), "to", copyConfig.container+":"+dstInfo.Path) fmt.Fprintln(dockerCli.Err(), "Successfully copied", progressHumanSize(copiedSize), "to", copyConfig.container+":"+dstInfo.Path)
return res return res
} }
func prepareTTYCopyProgress(cli command.Cli) func() {
if !streams.NewOut(cli.Err()).IsTerminal() {
return func() {}
}
fmt.Fprint(cli.Err(), aec.Save)
fmt.Fprintln(cli.Err(), "Preparing to copy...")
return func() {
fmt.Fprint(cli.Err(), aec.Restore)
fmt.Fprint(cli.Err(), aec.EraseLine(aec.EraseModes.All))
}
}
// We use `:` as a delimiter between CONTAINER and PATH, but `:` could also be // We use `:` as a delimiter between CONTAINER and PATH, but `:` could also be
// in a valid LOCALPATH, like `file:name.txt`. We can resolve this ambiguity by // in a valid LOCALPATH, like `file:name.txt`. We can resolve this ambiguity by
// requiring a LOCALPATH with a `:` to be made explicit with a relative or // requiring a LOCALPATH with a `:` to be made explicit with a relative or