Use long-running session in builder

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>

Add incremental context send support

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
Tonis Tiigi 2017-05-15 14:14:31 -07:00
parent ae8d049f9e
commit b95638a5ac
6 changed files with 309 additions and 18 deletions


@@ -34,6 +34,7 @@ type DiskUsageContext struct {
Images []*types.ImageSummary
Containers []*types.Container
Volumes []*types.Volume
BuilderSize int64
}
func (ctx *DiskUsageContext) startSubsection(format string) (*template.Template, error) {
@@ -97,6 +98,13 @@ func (ctx *DiskUsageContext) Write() (err error) {
return err
}
err = ctx.contextFormat(tmpl, &diskUsageBuilderContext{
builderSize: ctx.BuilderSize,
})
if err != nil {
return err
}
diskUsageContainersCtx := diskUsageContainersContext{containers: []*types.Container{}}
diskUsageContainersCtx.header = map[string]string{
"Type": typeHeader,
@@ -179,6 +187,9 @@ func (ctx *DiskUsageContext) verboseWrite() (err error) {
}
}
ctx.postFormat(tmpl, newVolumeContext())
// And build cache
fmt.Fprintf(ctx.Output, "\nBuild cache usage: %s\n\n", units.HumanSize(float64(ctx.BuilderSize)))
return
}
@@ -357,3 +368,32 @@ func (c *diskUsageVolumesContext) Reclaimable() string {
return fmt.Sprintf("%s", units.HumanSize(float64(reclaimable)))
}
type diskUsageBuilderContext struct {
HeaderContext
builderSize int64
}
func (c *diskUsageBuilderContext) MarshalJSON() ([]byte, error) {
return marshalJSON(c)
}
func (c *diskUsageBuilderContext) Type() string {
return "Build Cache"
}
func (c *diskUsageBuilderContext) TotalCount() string {
return ""
}
func (c *diskUsageBuilderContext) Active() string {
return ""
}
func (c *diskUsageBuilderContext) Size() string {
return units.HumanSize(float64(c.builderSize))
}
func (c *diskUsageBuilderContext) Reclaimable() string {
return c.Size()
}


@@ -23,6 +23,7 @@ func TestDiskUsageContextFormatWrite(t *testing.T) {
Images 0 0 0B 0B
Containers 0 0 0B 0B
Local Volumes 0 0 0B 0B
Build Cache 0B 0B
`,
},
{
@@ -38,6 +39,9 @@ CONTAINER ID IMAGE COMMAND LOCAL VOLUMES
Local Volumes space usage:
VOLUME NAME LINKS SIZE
Build cache usage: 0B
`,
},
// Errors
@@ -70,6 +74,7 @@ VOLUME NAME LINKS SIZE
Images 0 0 0B 0B
Containers 0 0 0B 0B
Local Volumes 0 0 0B 0B
Build Cache 0B 0B
`,
},
{
@@ -82,6 +87,7 @@ Local Volumes 0 0 0B
Images 0
Containers 0
Local Volumes 0
Build Cache
`,
},
// Raw Format
@@ -109,6 +115,12 @@ active: 0
size: 0B
reclaimable: 0B
type: Build Cache
total:
active:
size: 0B
reclaimable: 0B
`,
},
}


@@ -12,6 +12,7 @@ import (
"regexp"
"runtime"
"github.com/Sirupsen/logrus"
"github.com/docker/cli/cli" "github.com/docker/cli/cli"
"github.com/docker/cli/cli/command" "github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/command/image/build" "github.com/docker/cli/cli/command/image/build"
@ -61,6 +62,7 @@ type buildOptions struct {
squash bool squash bool
target string target string
imageIDFile string imageIDFile string
stream bool
}
// dockerfileFromStdin returns true when the user specified that the Dockerfile
@@ -133,6 +135,10 @@ func NewBuildCommand(dockerCli *command.DockerCli) *cobra.Command {
flags.SetAnnotation("squash", "experimental", nil)
flags.SetAnnotation("squash", "version", []string{"1.25"})
flags.BoolVar(&options.stream, "stream", false, "Stream attaches to server to negotiate build context")
flags.SetAnnotation("stream", "experimental", nil)
flags.SetAnnotation("stream", "version", []string{"1.31"})
return cmd
}
@@ -163,6 +169,7 @@ func runBuild(dockerCli *command.DockerCli, options buildOptions) error {
relDockerfile string
progBuff io.Writer
buildBuff io.Writer
remote string
)
if options.dockerfileFromStdin() {
@@ -188,6 +195,7 @@ func runBuild(dockerCli *command.DockerCli, options buildOptions) error {
switch {
case options.contextFromStdin():
// buildCtx is tar archive. if stdin was dockerfile then it is wrapped
buildCtx, relDockerfile, err = build.GetContextFromReader(dockerCli.In(), options.dockerfileName)
case isLocalDir(specifiedContext):
contextDir, relDockerfile, err = build.GetContextFromLocalDir(specifiedContext, options.dockerfileName)
@@ -211,7 +219,8 @@ func runBuild(dockerCli *command.DockerCli, options buildOptions) error {
contextDir = tempDir
}
// read from a directory into tar archive
if buildCtx == nil && !options.stream {
excludes, err := build.ReadDockerignore(contextDir)
if err != nil {
return err
@@ -242,24 +251,45 @@ func runBuild(dockerCli *command.DockerCli, options buildOptions) error {
}
}
// replace Dockerfile if it was added from stdin and there is archive context
if dockerfileCtx != nil && buildCtx != nil {
buildCtx, relDockerfile, err = build.AddDockerfileToBuildContext(dockerfileCtx, buildCtx)
if err != nil {
return err
}
}
// if streaming and dockerfile was not from stdin then read from file
// to the same reader that is usually stdin
if options.stream && dockerfileCtx == nil {
dockerfileCtx, err = os.Open(relDockerfile)
if err != nil {
return errors.Wrapf(err, "failed to open %s", relDockerfile)
}
defer dockerfileCtx.Close()
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var resolvedTags []*resolvedTag
if command.IsTrusted() {
translator := func(ctx context.Context, ref reference.NamedTagged) (reference.Canonical, error) {
return TrustedReference(ctx, dockerCli, ref, nil)
}
// if there is a tar wrapper, the dockerfile needs to be replaced inside it
if buildCtx != nil {
// Wrap the tar archive to replace the Dockerfile entry with the rewritten
// Dockerfile which uses trusted pulls.
buildCtx = replaceDockerfileTarWrapper(ctx, buildCtx, relDockerfile, translator, &resolvedTags)
} else if dockerfileCtx != nil {
// if there was no archive context, still do the possible replacements in the Dockerfile
newDockerfile, _, err := rewriteDockerfileFrom(ctx, dockerfileCtx, translator)
if err != nil {
return err
}
dockerfileCtx = ioutil.NopCloser(bytes.NewBuffer(newDockerfile))
}
}
// Setup an upload progress bar
@@ -268,7 +298,43 @@ func runBuild(dockerCli *command.DockerCli, options buildOptions) error {
progressOutput = &lastProgressOutput{output: progressOutput}
}
// if up to this point nothing has set the context then we must have
// another way for sending it (streaming) and set the context to the Dockerfile
if dockerfileCtx != nil && buildCtx == nil {
buildCtx = dockerfileCtx
}
s, err := trySession(dockerCli, contextDir)
if err != nil {
return err
}
var body io.Reader
if buildCtx != nil && !options.stream {
body = progress.NewProgressReader(buildCtx, progressOutput, 0, "", "Sending build context to Docker daemon")
}
// add context stream to the session
if options.stream && s != nil {
syncDone := make(chan error) // used to signal first progress reporting completed.
// progress would also send errors but don't need it here as errors
// are handled by session.Run() and ImageBuild()
if err := addDirToSession(s, contextDir, progressOutput, syncDone); err != nil {
return err
}
buf := newBufferedWriter(syncDone, buildBuff)
defer func() {
select {
case <-buf.flushed:
case <-ctx.Done():
}
}()
buildBuff = buf
remote = clientSessionRemote
body = buildCtx
}
authConfigs, _ := dockerCli.GetAllCredentials()
buildOptions := types.ImageBuildOptions{
@@ -299,6 +365,18 @@ func runBuild(dockerCli *command.DockerCli, options buildOptions) error {
Squash: options.squash,
ExtraHosts: options.extraHosts.GetAll(),
Target: options.target,
RemoteContext: remote,
}
if s != nil {
go func() {
logrus.Debugf("running session: %v", s.UUID())
if err := s.Run(ctx, dockerCli.Client().DialSession); err != nil {
logrus.Error(err)
cancel() // cancel progress context
}
}()
buildOptions.SessionID = s.UUID()
}
response, err := dockerCli.Client().ImageBuild(ctx, body, buildOptions)
@@ -306,6 +384,7 @@ func runBuild(dockerCli *command.DockerCli, options buildOptions) error {
if options.quiet {
fmt.Fprintf(dockerCli.Err(), "%s", progBuff)
}
cancel()
return err
}
defer response.Body.Close()
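For reference, the request the streaming path ends up making is an ordinary ImageBuild call whose body carries only the Dockerfile, with RemoteContext and SessionID pointing the daemon at the running session. A minimal sketch against the client API used above; the helper name and its parameters are hypothetical, error handling is trimmed, and the session is assumed to have been created and started elsewhere:

import (
	"io"
	"os"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/client"
	"golang.org/x/net/context"
)

func buildOverSession(ctx context.Context, cli client.ImageAPIClient, dockerfile io.Reader, sessionID string) error {
	resp, err := cli.ImageBuild(ctx, dockerfile, types.ImageBuildOptions{
		Dockerfile:    "Dockerfile",
		RemoteContext: "client-session", // the clientSessionRemote constant from this commit
		SessionID:     sessionID,
	})
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	_, err = io.Copy(os.Stdout, resp.Body) // raw JSON progress stream from the daemon
	return err
}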


@@ -0,0 +1,151 @@
package image
import (
"bytes"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"sync"
"time"
"github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/command/image/build"
cliconfig "github.com/docker/cli/cli/config"
"github.com/docker/docker/api/types/versions"
"github.com/docker/docker/client/session"
"github.com/docker/docker/client/session/filesync"
"github.com/docker/docker/pkg/progress"
"github.com/pkg/errors"
"golang.org/x/time/rate"
)
const clientSessionRemote = "client-session"
func isSessionSupported(dockerCli *command.DockerCli) bool {
return dockerCli.ServerInfo().HasExperimental && versions.GreaterThanOrEqualTo(dockerCli.Client().ClientVersion(), "1.31")
}
func trySession(dockerCli *command.DockerCli, contextDir string) (*session.Session, error) {
var s *session.Session
if isSessionSupported(dockerCli) {
sharedKey, err := getBuildSharedKey(contextDir)
if err != nil {
return nil, errors.Wrap(err, "failed to get build shared key")
}
s, err = session.NewSession(filepath.Base(contextDir), sharedKey)
if err != nil {
return nil, errors.Wrap(err, "failed to create session")
}
}
return s, nil
}
func addDirToSession(session *session.Session, contextDir string, progressOutput progress.Output, done chan error) error {
excludes, err := build.ReadDockerignore(contextDir)
if err != nil {
return err
}
p := &sizeProgress{out: progressOutput, action: "Streaming build context to Docker daemon"}
workdirProvider := filesync.NewFSSyncProvider(contextDir, excludes)
session.Allow(workdirProvider)
// this will be replaced on parallel build jobs. keep the current
// progressbar for now
if snpc, ok := workdirProvider.(interface {
SetNextProgressCallback(func(int, bool), chan error)
}); ok {
snpc.SetNextProgressCallback(p.update, done)
}
return nil
}
type sizeProgress struct {
out progress.Output
action string
limiter *rate.Limiter
}
func (sp *sizeProgress) update(size int, last bool) {
if sp.limiter == nil {
sp.limiter = rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
}
if last || sp.limiter.Allow() {
sp.out.WriteProgress(progress.Progress{Action: sp.action, Current: int64(size), LastUpdate: last})
}
}
type bufferedWriter struct {
done chan error
io.Writer
buf *bytes.Buffer
flushed chan struct{}
mu sync.Mutex
}
func newBufferedWriter(done chan error, w io.Writer) *bufferedWriter {
bw := &bufferedWriter{done: done, Writer: w, buf: new(bytes.Buffer), flushed: make(chan struct{})}
go func() {
<-done
bw.flushBuffer()
}()
return bw
}
func (bw *bufferedWriter) Write(dt []byte) (int, error) {
select {
case <-bw.done:
bw.flushBuffer()
return bw.Writer.Write(dt)
default:
return bw.buf.Write(dt)
}
}
func (bw *bufferedWriter) flushBuffer() {
bw.mu.Lock()
select {
case <-bw.flushed:
default:
bw.Writer.Write(bw.buf.Bytes())
close(bw.flushed)
}
bw.mu.Unlock()
}
func getBuildSharedKey(dir string) (string, error) {
// build session is hash of build dir with node based randomness
s := sha256.Sum256([]byte(fmt.Sprintf("%s:%s", tryNodeIdentifier(), dir)))
return hex.EncodeToString(s[:]), nil
}
func tryNodeIdentifier() (out string) {
out = cliconfig.Dir() // return config dir as default on permission error
if err := os.MkdirAll(cliconfig.Dir(), 0700); err == nil {
sessionFile := filepath.Join(cliconfig.Dir(), ".buildNodeID")
if _, err := os.Lstat(sessionFile); err != nil {
if os.IsNotExist(err) { // create a new file with stored randomness
b := make([]byte, 32)
if _, err := rand.Read(b); err != nil {
return
}
if err := ioutil.WriteFile(sessionFile, []byte(hex.EncodeToString(b)), 0600); err != nil {
return
}
}
}
dt, err := ioutil.ReadFile(sessionFile)
if err == nil {
return string(dt)
}
}
return
}
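The bufferedWriter above exists so that build output does not interleave with the context-sync progress bar: everything written before the done channel fires is held in memory, then flushed in order, and later writes pass straight through. A throwaway sketch of that behaviour, written as if it sat in the same package (the helper name is hypothetical; fmt and os are already imported by this file):

func demoBufferedWriter() {
	done := make(chan error)
	bw := newBufferedWriter(done, os.Stdout)

	fmt.Fprintln(bw, "Step 1/2 : FROM busybox") // held in the buffer while the context is syncing
	close(done)                                 // first context sync finished
	<-bw.flushed                                // buffered output is written out, in order
	fmt.Fprintln(bw, "Step 2/2 : RUN true")     // subsequent writes go straight to os.Stdout
}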


@@ -58,6 +58,7 @@ func runDiskUsage(dockerCli *command.DockerCli, opts diskUsageOptions) error {
Format: formatter.NewDiskUsageFormat(format),
},
LayersSize: du.LayersSize,
BuilderSize: du.BuilderSize,
Images: du.Images,
Containers: du.Containers,
Volumes: du.Volumes,


@@ -9,6 +9,7 @@ import (
"github.com/docker/cli/opts"
units "github.com/docker/go-units"
"github.com/spf13/cobra"
"golang.org/x/net/context"
)
type pruneOptions struct {
@@ -49,6 +50,7 @@ const (
- all volumes not used by at least one container
- all networks not used by at least one container
%s
- all build cache
Are you sure you want to continue?`
danglingImageDesc = "- all dangling images"
@@ -97,6 +99,12 @@ func runPrune(dockerCli command.Cli, options pruneOptions) error {
fmt.Fprintln(dockerCli.Out(), output)
}
report, err := dockerCli.Client().BuildCachePrune(context.Background())
if err != nil {
return err
}
spaceReclaimed += report.SpaceReclaimed
fmt.Fprintln(dockerCli.Out(), "Total reclaimed space:", units.HumanSize(float64(spaceReclaimed))) fmt.Fprintln(dockerCli.Out(), "Total reclaimed space:", units.HumanSize(float64(spaceReclaimed)))
return nil return nil
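BuildCachePrune is a plain client call, so the same cleanup can be driven outside the CLI. A minimal standalone sketch (hypothetical program; it assumes a reachable daemon that exposes the build-cache endpoint of API 1.31, and the method signature matches the client version vendored by this commit):

package main

import (
	"fmt"

	"github.com/docker/docker/client"
	units "github.com/docker/go-units"
	"golang.org/x/net/context"
)

func main() {
	cli, err := client.NewEnvClient() // reads DOCKER_HOST and friends
	if err != nil {
		panic(err)
	}
	report, err := cli.BuildCachePrune(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Println("Build cache reclaimed:", units.HumanSize(float64(report.SpaceReclaimed)))
}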