Refactor holdHijackedConnection

It has been refactored to a hijackedIOStreamer type which has several
methods which are used to prepare input and handle streaming the input
and output separately.

Docker-DCO-1.1-Signed-off-by: Josh Hawn <josh.hawn@docker.com> (github: jlhawn)
This commit is contained in:
Josh Hawn 2017-05-17 11:40:56 -07:00
parent 38591f20d0
commit 6eca53c7ae
5 changed files with 190 additions and 90 deletions

View File

@ -109,7 +109,18 @@ func runAttach(dockerCli *command.DockerCli, opts *attachOptions) error {
logrus.Debugf("Error monitoring TTY size: %s", err) logrus.Debugf("Error monitoring TTY size: %s", err)
} }
} }
if err := holdHijackedConnection(ctx, dockerCli, c.Config.Tty, options.DetachKeys, in, dockerCli.Out(), dockerCli.Err(), resp); err != nil {
streamer := hijackedIOStreamer{
streams: dockerCli,
inputStream: in,
outputStream: dockerCli.Out(),
errorStream: dockerCli.Err(),
resp: resp,
tty: c.Config.Tty,
detachKeys: options.DetachKeys,
}
if err := streamer.stream(ctx); err != nil {
return err return err
} }

View File

@ -146,7 +146,17 @@ func runExec(dockerCli *command.DockerCli, options *execOptions, container strin
} }
defer resp.Close() defer resp.Close()
errCh = promise.Go(func() error { errCh = promise.Go(func() error {
return holdHijackedConnection(ctx, dockerCli, execConfig.Tty, execConfig.DetachKeys, in, out, stderr, resp) streamer := hijackedIOStreamer{
streams: dockerCli,
inputStream: in,
outputStream: out,
errorStream: stderr,
resp: resp,
tty: execConfig.Tty,
detachKeys: execConfig.DetachKeys,
}
return streamer.stream(ctx)
}) })
if execConfig.Tty && dockerCli.In().IsTerminal() { if execConfig.Tty && dockerCli.In().IsTerminal() {

View File

@ -1,6 +1,7 @@
package container package container
import ( import (
"fmt"
"io" "io"
"runtime" "runtime"
"sync" "sync"
@ -15,108 +16,166 @@ import (
) )
// The default escape key sequence: ctrl-p, ctrl-q // The default escape key sequence: ctrl-p, ctrl-q
// TODO: This could be moved to `pkg/term`.
var defaultEscapeKeys = []byte{16, 17} var defaultEscapeKeys = []byte{16, 17}
// holdHijackedConnection handles copying input to and output from streams to the // A hijackedIOStreamer handles copying input to and output from streams to the
// connection // connection.
// nolint: gocyclo type hijackedIOStreamer struct {
func holdHijackedConnection(ctx context.Context, streams command.Streams, tty bool, detachKeys string, inputStream io.ReadCloser, outputStream, errorStream io.Writer, resp types.HijackedResponse) error { streams command.Streams
var ( inputStream io.ReadCloser
err error outputStream io.Writer
restoreOnce sync.Once errorStream io.Writer
)
if inputStream != nil && tty {
if err := setRawTerminal(streams); err != nil {
return err
}
defer func() {
restoreOnce.Do(func() {
restoreTerminal(streams, inputStream)
})
}()
// Wrap the input to detect detach control sequence. resp types.HijackedResponse
// Use default detach sequence if an invalid sequence is given.
escapeKeys, err := term.ToBytes(detachKeys) tty bool
if len(escapeKeys) == 0 || err != nil { detachKeys string
escapeKeys = defaultEscapeKeys }
// stream handles setting up the IO and then begins streaming stdin/stdout
// to/from the hijacked connection, blocking until it is either done reading
// output, the user inputs the detach key sequence when in TTY mode, or when
// the given context is cancelled.
func (h *hijackedIOStreamer) stream(ctx context.Context) error {
restoreInput, err := h.setupInput()
if err != nil {
return fmt.Errorf("unable to setup input stream: %s", err)
} }
inputStream = ioutils.NewReadCloserWrapper(term.NewEscapeProxy(inputStream, escapeKeys), inputStream.Close) defer restoreInput()
}
receiveStdout := make(chan error, 1) outputDone := h.beginOutputStream(restoreInput)
if outputStream != nil || errorStream != nil { inputDone, detached := h.beginInputStream(restoreInput)
go func() {
// When TTY is ON, use regular copy
if tty && outputStream != nil {
_, err = io.Copy(outputStream, resp.Reader)
// we should restore the terminal as soon as possible once connection end
// so any following print messages will be in normal type.
if inputStream != nil {
restoreOnce.Do(func() {
restoreTerminal(streams, inputStream)
})
}
} else {
_, err = stdcopy.StdCopy(outputStream, errorStream, resp.Reader)
}
logrus.Debug("[hijack] End of stdout")
receiveStdout <- err
}()
}
stdinDone := make(chan struct{})
detachedC := make(chan term.EscapeError)
go func() {
if inputStream != nil {
_, inputErr := io.Copy(resp.Conn, inputStream)
// we should restore the terminal as soon as possible once connection end
// so any following print messages will be in normal type.
if tty {
restoreOnce.Do(func() {
restoreTerminal(streams, inputStream)
})
}
logrus.Debug("[hijack] End of stdin")
if detached, ok := inputErr.(term.EscapeError); ok {
detachedC <- detached
return
}
}
if err := resp.CloseWrite(); err != nil {
logrus.Debugf("Couldn't send EOF: %s", err)
}
close(stdinDone)
}()
select { select {
case err := <-receiveStdout: case err := <-outputDone:
if err != nil {
logrus.Debugf("Error receiveStdout: %s", err)
return err return err
} case <-inputDone:
case <-stdinDone: // Input stream has closed.
if outputStream != nil || errorStream != nil { if h.outputStream != nil || h.errorStream != nil {
// Wait for output to complete streaming.
select { select {
case err := <-receiveStdout: case err := <-outputDone:
if err != nil {
logrus.Debugf("Error receiveStdout: %s", err)
return err return err
}
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err()
} }
} }
case err := <-detachedC: return nil
case err := <-detached:
// Got a detach key sequence. // Got a detach key sequence.
return err return err
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err()
}
}
func (h *hijackedIOStreamer) setupInput() (restore func(), err error) {
if h.inputStream == nil || !h.tty {
// No need to setup input TTY.
// The restore func is a nop.
return func() {}, nil
} }
if err := setRawTerminal(h.streams); err != nil {
return nil, fmt.Errorf("unable to set IO streams as raw terminal: %s", err)
}
// Use sync.Once so we may call restore multiple times but ensure we
// only restore the terminal once.
var restoreOnce sync.Once
restore = func() {
restoreOnce.Do(func() {
restoreTerminal(h.streams, h.inputStream)
})
}
// Wrap the input to detect detach escape sequence.
// Use default escape keys if an invalid sequence is given.
escapeKeys := defaultEscapeKeys
if h.detachKeys != "" {
customEscapeKeys, err := term.ToBytes(h.detachKeys)
if err != nil {
logrus.Warnf("invalid detach escape keys, using default: %s", err)
} else {
escapeKeys = customEscapeKeys
}
}
h.inputStream = ioutils.NewReadCloserWrapper(term.NewEscapeProxy(h.inputStream, escapeKeys), h.inputStream.Close)
return restore, nil
}
func (h *hijackedIOStreamer) beginOutputStream(restoreInput func()) <-chan error {
if h.outputStream == nil && h.errorStream == nil {
// Ther is no need to copy output.
return nil return nil
}
outputDone := make(chan error)
go func() {
var err error
// When TTY is ON, use regular copy
if h.outputStream != nil && h.tty {
_, err = io.Copy(h.outputStream, h.resp.Reader)
// We should restore the terminal as soon as possible
// once the connection ends so any following print
// messages will be in normal type.
restoreInput()
} else {
_, err = stdcopy.StdCopy(h.outputStream, h.errorStream, h.resp.Reader)
}
logrus.Debug("[hijack] End of stdout")
if err != nil {
logrus.Debugf("Error receiveStdout: %s", err)
}
outputDone <- err
}()
return outputDone
}
func (h *hijackedIOStreamer) beginInputStream(restoreInput func()) (doneC <-chan struct{}, detachedC <-chan error) {
inputDone := make(chan struct{})
detached := make(chan error)
go func() {
if h.inputStream != nil {
_, err := io.Copy(h.resp.Conn, h.inputStream)
// We should restore the terminal as soon as possible
// once the connection ends so any following print
// messages will be in normal type.
restoreInput()
logrus.Debug("[hijack] End of stdin")
if _, ok := err.(term.EscapeError); ok {
detached <- err
return
}
if err != nil {
// This error will also occur on the receive
// side (from stdout) where it will be
// propogated back to the caller.
logrus.Debugf("Error sendStdin: %s", err)
}
}
if err := h.resp.CloseWrite(); err != nil {
logrus.Debugf("Couldn't send EOF: %s", err)
}
close(inputDone)
}()
return inputDone, detached
} }
func setRawTerminal(streams command.Streams) error { func setRawTerminal(streams command.Streams) error {

View File

@ -176,8 +176,8 @@ func runContainer(dockerCli *command.DockerCli, opts *runOptions, copts *contain
//start the container //start the container
if err := client.ContainerStart(ctx, createResponse.ID, types.ContainerStartOptions{}); err != nil { if err := client.ContainerStart(ctx, createResponse.ID, types.ContainerStartOptions{}); err != nil {
// If we have holdHijackedConnection, we should notify // If we have hijackedIOStreamer, we should notify
// holdHijackedConnection we are going to exit and wait // hijackedIOStreamer we are going to exit and wait
// to avoid the terminal are not restored. // to avoid the terminal are not restored.
if attach { if attach {
cancelFun() cancelFun()
@ -267,7 +267,17 @@ func attachContainer(
} }
*errCh = promise.Go(func() error { *errCh = promise.Go(func() error {
if errHijack := holdHijackedConnection(ctx, dockerCli, config.Tty, options.DetachKeys, in, out, cerr, resp); errHijack != nil { streamer := hijackedIOStreamer{
streams: dockerCli,
inputStream: in,
outputStream: out,
errorStream: cerr,
resp: resp,
tty: config.Tty,
detachKeys: options.DetachKeys,
}
if errHijack := streamer.stream(ctx); errHijack != nil {
return errHijack return errHijack
} }
return errAttach return errAttach

View File

@ -104,7 +104,17 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error {
} }
defer resp.Close() defer resp.Close()
cErr := promise.Go(func() error { cErr := promise.Go(func() error {
errHijack := holdHijackedConnection(ctx, dockerCli, c.Config.Tty, options.DetachKeys, in, dockerCli.Out(), dockerCli.Err(), resp) streamer := hijackedIOStreamer{
streams: dockerCli,
inputStream: in,
outputStream: dockerCli.Out(),
errorStream: dockerCli.Err(),
resp: resp,
tty: c.Config.Tty,
detachKeys: options.DetachKeys,
}
errHijack := streamer.stream(ctx)
if errHijack == nil { if errHijack == nil {
return errAttach return errAttach
} }