mirror of https://github.com/docker/cli.git
Merge pull request #25853 from jhorwit2/jah/event-refactor
Refactor to new engine-api events api
This commit is contained in:
commit
e58c0c6e1b
|
@ -211,10 +211,7 @@ func runRun(dockerCli *command.DockerCli, flags *pflag.FlagSet, opts *runOptions
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
statusChan, err := waitExitOrRemoved(dockerCli, context.Background(), createResponse.ID, hostConfig.AutoRemove)
|
statusChan := waitExitOrRemoved(dockerCli, ctx, createResponse.ID, hostConfig.AutoRemove)
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Error waiting container's exit code: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
//start the container
|
//start the container
|
||||||
if err := client.ContainerStart(ctx, createResponse.ID, types.ContainerStartOptions{}); err != nil {
|
if err := client.ContainerStart(ctx, createResponse.ID, types.ContainerStartOptions{}); err != nil {
|
||||||
|
|
|
@ -108,7 +108,7 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error {
|
||||||
|
|
||||||
// 3. We should open a channel for receiving status code of the container
|
// 3. We should open a channel for receiving status code of the container
|
||||||
// no matter it's detached, removed on daemon side(--rm) or exit normally.
|
// no matter it's detached, removed on daemon side(--rm) or exit normally.
|
||||||
statusChan, statusErr := waitExitOrRemoved(dockerCli, context.Background(), c.ID, c.HostConfig.AutoRemove)
|
statusChan := waitExitOrRemoved(dockerCli, ctx, c.ID, c.HostConfig.AutoRemove)
|
||||||
startOptions := types.ContainerStartOptions{
|
startOptions := types.ContainerStartOptions{
|
||||||
CheckpointID: opts.checkpoint,
|
CheckpointID: opts.checkpoint,
|
||||||
}
|
}
|
||||||
|
@ -117,7 +117,7 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error {
|
||||||
if err := dockerCli.Client().ContainerStart(ctx, c.ID, startOptions); err != nil {
|
if err := dockerCli.Client().ContainerStart(ctx, c.ID, startOptions); err != nil {
|
||||||
cancelFun()
|
cancelFun()
|
||||||
<-cErr
|
<-cErr
|
||||||
if c.HostConfig.AutoRemove && statusErr == nil {
|
if c.HostConfig.AutoRemove {
|
||||||
// wait container to be removed
|
// wait container to be removed
|
||||||
<-statusChan
|
<-statusChan
|
||||||
}
|
}
|
||||||
|
@ -134,10 +134,6 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error {
|
||||||
return attchErr
|
return attchErr
|
||||||
}
|
}
|
||||||
|
|
||||||
if statusErr != nil {
|
|
||||||
return fmt.Errorf("can't get container's exit code: %v", statusErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
if status := <-statusChan; status != 0 {
|
if status := <-statusChan; status != 0 {
|
||||||
return cli.StatusError{StatusCode: status}
|
return cli.StatusError{StatusCode: status}
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,24 +63,22 @@ func runStats(dockerCli *command.DockerCli, opts *statsOptions) error {
|
||||||
options := types.EventsOptions{
|
options := types.EventsOptions{
|
||||||
Filters: f,
|
Filters: f,
|
||||||
}
|
}
|
||||||
resBody, err := dockerCli.Client().Events(ctx, options)
|
|
||||||
// Whether we successfully subscribed to events or not, we can now
|
eventq, errq := dockerCli.Client().Events(ctx, options)
|
||||||
|
|
||||||
|
// Whether we successfully subscribed to eventq or not, we can now
|
||||||
// unblock the main goroutine.
|
// unblock the main goroutine.
|
||||||
close(started)
|
close(started)
|
||||||
if err != nil {
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case event := <-eventq:
|
||||||
|
c <- event
|
||||||
|
case err := <-errq:
|
||||||
closeChan <- err
|
closeChan <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer resBody.Close()
|
|
||||||
|
|
||||||
system.DecodeEvents(resBody, func(event events.Message, err error) error {
|
|
||||||
if err != nil {
|
|
||||||
closeChan <- err
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
c <- event
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// waitFirst is a WaitGroup to wait first stat data's reach for each container
|
// waitFirst is a WaitGroup to wait first stat data's reach for each container
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package container
|
package container
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
@ -11,11 +10,10 @@ import (
|
||||||
"github.com/docker/docker/api/types/events"
|
"github.com/docker/docker/api/types/events"
|
||||||
"github.com/docker/docker/api/types/filters"
|
"github.com/docker/docker/api/types/filters"
|
||||||
"github.com/docker/docker/cli/command"
|
"github.com/docker/docker/cli/command"
|
||||||
"github.com/docker/docker/cli/command/system"
|
|
||||||
clientapi "github.com/docker/docker/client"
|
clientapi "github.com/docker/docker/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, containerID string, waitRemove bool) (chan int, error) {
|
func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, containerID string, waitRemove bool) chan int {
|
||||||
if len(containerID) == 0 {
|
if len(containerID) == 0 {
|
||||||
// containerID can never be empty
|
// containerID can never be empty
|
||||||
panic("Internal Error: waitExitOrRemoved needs a containerID as parameter")
|
panic("Internal Error: waitExitOrRemoved needs a containerID as parameter")
|
||||||
|
@ -24,11 +22,7 @@ func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, contai
|
||||||
statusChan := make(chan int)
|
statusChan := make(chan int)
|
||||||
exitCode := 125
|
exitCode := 125
|
||||||
|
|
||||||
eventProcessor := func(e events.Message, err error) error {
|
eventProcessor := func(e events.Message) bool {
|
||||||
if err != nil {
|
|
||||||
statusChan <- exitCode
|
|
||||||
return fmt.Errorf("failed to decode event: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
stopProcessing := false
|
stopProcessing := false
|
||||||
switch e.Status {
|
switch e.Status {
|
||||||
|
@ -53,11 +47,10 @@ func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, contai
|
||||||
|
|
||||||
if stopProcessing {
|
if stopProcessing {
|
||||||
statusChan <- exitCode
|
statusChan <- exitCode
|
||||||
// stop the loop processing
|
return true
|
||||||
return fmt.Errorf("done")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get events via Events API
|
// Get events via Events API
|
||||||
|
@ -67,14 +60,29 @@ func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, contai
|
||||||
options := types.EventsOptions{
|
options := types.EventsOptions{
|
||||||
Filters: f,
|
Filters: f,
|
||||||
}
|
}
|
||||||
resBody, err := dockerCli.Client().Events(ctx, options)
|
|
||||||
if err != nil {
|
eventCtx, cancel := context.WithCancel(ctx)
|
||||||
return nil, fmt.Errorf("can't get events from daemon: %v", err)
|
eventq, errq := dockerCli.Client().Events(eventCtx, options)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case evt := <-eventq:
|
||||||
|
if eventProcessor(evt) {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
go system.DecodeEvents(resBody, eventProcessor)
|
case err := <-errq:
|
||||||
|
logrus.Errorf("error getting events from daemon: %v", err)
|
||||||
|
statusChan <- exitCode
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return statusChan, nil
|
return statusChan
|
||||||
}
|
}
|
||||||
|
|
||||||
// getExitCode performs an inspect on the container. It returns
|
// getExitCode performs an inspect on the container. It returns
|
||||||
|
|
|
@ -63,13 +63,33 @@ func runEvents(dockerCli *command.DockerCli, opts *eventsOptions) error {
|
||||||
Filters: opts.filter.Value(),
|
Filters: opts.filter.Value(),
|
||||||
}
|
}
|
||||||
|
|
||||||
responseBody, err := dockerCli.Client().Events(context.Background(), options)
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
if err != nil {
|
events, errs := dockerCli.Client().Events(ctx, options)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
out := dockerCli.Out()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case event := <-events:
|
||||||
|
if err := handleEvent(out, event, tmpl); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer responseBody.Close()
|
case err := <-errs:
|
||||||
|
if err == io.EOF {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return streamEvents(dockerCli.Out(), responseBody, tmpl)
|
func handleEvent(out io.Writer, event eventtypes.Message, tmpl *template.Template) error {
|
||||||
|
if tmpl == nil {
|
||||||
|
return prettyPrintEvent(out, event)
|
||||||
|
}
|
||||||
|
|
||||||
|
return formatEvent(out, event, tmpl)
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeTemplate(format string) (*template.Template, error) {
|
func makeTemplate(format string) (*template.Template, error) {
|
||||||
|
@ -85,21 +105,6 @@ func makeTemplate(format string) (*template.Template, error) {
|
||||||
return tmpl, tmpl.Execute(ioutil.Discard, &eventtypes.Message{})
|
return tmpl, tmpl.Execute(ioutil.Discard, &eventtypes.Message{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// streamEvents decodes prints the incoming events in the provided output.
|
|
||||||
func streamEvents(out io.Writer, input io.Reader, tmpl *template.Template) error {
|
|
||||||
return DecodeEvents(input, func(event eventtypes.Message, err error) error {
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if tmpl == nil {
|
|
||||||
return prettyPrintEvent(out, event)
|
|
||||||
}
|
|
||||||
return formatEvent(out, event, tmpl)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
type eventProcessor func(event eventtypes.Message, err error) error
|
|
||||||
|
|
||||||
// prettyPrintEvent prints all types of event information.
|
// prettyPrintEvent prints all types of event information.
|
||||||
// Each output includes the event type, actor id, name and action.
|
// Each output includes the event type, actor id, name and action.
|
||||||
// Actor attributes are printed at the end if the actor has any.
|
// Actor attributes are printed at the end if the actor has any.
|
||||||
|
|
|
@ -1,14 +1,14 @@
|
||||||
package system
|
package system
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"io"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
eventtypes "github.com/docker/docker/api/types/events"
|
eventtypes "github.com/docker/docker/api/types/events"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type eventProcessor func(eventtypes.Message, error) error
|
||||||
|
|
||||||
// EventHandler is abstract interface for user to customize
|
// EventHandler is abstract interface for user to customize
|
||||||
// own handle functions of each type of events
|
// own handle functions of each type of events
|
||||||
type EventHandler interface {
|
type EventHandler interface {
|
||||||
|
@ -47,20 +47,3 @@ func (w *eventHandler) Watch(c <-chan eventtypes.Message) {
|
||||||
go h(e)
|
go h(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// DecodeEvents decodes event from input stream
|
|
||||||
func DecodeEvents(input io.Reader, ep eventProcessor) error {
|
|
||||||
dec := json.NewDecoder(input)
|
|
||||||
for {
|
|
||||||
var event eventtypes.Message
|
|
||||||
err := dec.Decode(&event)
|
|
||||||
if err != nil && err == io.EOF {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if procErr := ep(event, err); procErr != nil {
|
|
||||||
return procErr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue