Refactor to new events api

Signed-off-by: Josh Horwitz <horwitzja@gmail.com>
This commit is contained in:
Josh Horwitz 2016-08-09 10:34:07 -10:00
parent bfbdb15f55
commit d700b90576
6 changed files with 67 additions and 80 deletions

View File

@ -211,10 +211,7 @@ func runRun(dockerCli *command.DockerCli, flags *pflag.FlagSet, opts *runOptions
}) })
} }
statusChan, err := waitExitOrRemoved(dockerCli, context.Background(), createResponse.ID, hostConfig.AutoRemove) statusChan := waitExitOrRemoved(dockerCli, ctx, createResponse.ID, hostConfig.AutoRemove)
if err != nil {
return fmt.Errorf("Error waiting container's exit code: %v", err)
}
//start the container //start the container
if err := client.ContainerStart(ctx, createResponse.ID, types.ContainerStartOptions{}); err != nil { if err := client.ContainerStart(ctx, createResponse.ID, types.ContainerStartOptions{}); err != nil {

View File

@ -108,7 +108,7 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error {
// 3. We should open a channel for receiving status code of the container // 3. We should open a channel for receiving status code of the container
// no matter it's detached, removed on daemon side(--rm) or exit normally. // no matter it's detached, removed on daemon side(--rm) or exit normally.
statusChan, statusErr := waitExitOrRemoved(dockerCli, context.Background(), c.ID, c.HostConfig.AutoRemove) statusChan := waitExitOrRemoved(dockerCli, ctx, c.ID, c.HostConfig.AutoRemove)
startOptions := types.ContainerStartOptions{ startOptions := types.ContainerStartOptions{
CheckpointID: opts.checkpoint, CheckpointID: opts.checkpoint,
} }
@ -117,7 +117,7 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error {
if err := dockerCli.Client().ContainerStart(ctx, c.ID, startOptions); err != nil { if err := dockerCli.Client().ContainerStart(ctx, c.ID, startOptions); err != nil {
cancelFun() cancelFun()
<-cErr <-cErr
if c.HostConfig.AutoRemove && statusErr == nil { if c.HostConfig.AutoRemove {
// wait container to be removed // wait container to be removed
<-statusChan <-statusChan
} }
@ -134,10 +134,6 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error {
return attchErr return attchErr
} }
if statusErr != nil {
return fmt.Errorf("can't get container's exit code: %v", statusErr)
}
if status := <-statusChan; status != 0 { if status := <-statusChan; status != 0 {
return cli.StatusError{StatusCode: status} return cli.StatusError{StatusCode: status}
} }

View File

@ -63,24 +63,22 @@ func runStats(dockerCli *command.DockerCli, opts *statsOptions) error {
options := types.EventsOptions{ options := types.EventsOptions{
Filters: f, Filters: f,
} }
resBody, err := dockerCli.Client().Events(ctx, options)
// Whether we successfully subscribed to events or not, we can now eventq, errq := dockerCli.Client().Events(ctx, options)
// Whether we successfully subscribed to eventq or not, we can now
// unblock the main goroutine. // unblock the main goroutine.
close(started) close(started)
if err != nil {
closeChan <- err
return
}
defer resBody.Close()
system.DecodeEvents(resBody, func(event events.Message, err error) error { for {
if err != nil { select {
case event := <-eventq:
c <- event
case err := <-errq:
closeChan <- err closeChan <- err
return nil return
} }
c <- event }
return nil
})
} }
// waitFirst is a WaitGroup to wait first stat data's reach for each container // waitFirst is a WaitGroup to wait first stat data's reach for each container

View File

@ -1,7 +1,6 @@
package container package container
import ( import (
"fmt"
"strconv" "strconv"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -11,11 +10,10 @@ import (
"github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/filters"
"github.com/docker/docker/cli/command" "github.com/docker/docker/cli/command"
"github.com/docker/docker/cli/command/system"
clientapi "github.com/docker/docker/client" clientapi "github.com/docker/docker/client"
) )
func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, containerID string, waitRemove bool) (chan int, error) { func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, containerID string, waitRemove bool) chan int {
if len(containerID) == 0 { if len(containerID) == 0 {
// containerID can never be empty // containerID can never be empty
panic("Internal Error: waitExitOrRemoved needs a containerID as parameter") panic("Internal Error: waitExitOrRemoved needs a containerID as parameter")
@ -24,11 +22,7 @@ func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, contai
statusChan := make(chan int) statusChan := make(chan int)
exitCode := 125 exitCode := 125
eventProcessor := func(e events.Message, err error) error { eventProcessor := func(e events.Message) bool {
if err != nil {
statusChan <- exitCode
return fmt.Errorf("failed to decode event: %v", err)
}
stopProcessing := false stopProcessing := false
switch e.Status { switch e.Status {
@ -53,11 +47,10 @@ func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, contai
if stopProcessing { if stopProcessing {
statusChan <- exitCode statusChan <- exitCode
// stop the loop processing return true
return fmt.Errorf("done")
} }
return nil return false
} }
// Get events via Events API // Get events via Events API
@ -67,14 +60,29 @@ func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, contai
options := types.EventsOptions{ options := types.EventsOptions{
Filters: f, Filters: f,
} }
resBody, err := dockerCli.Client().Events(ctx, options)
if err != nil {
return nil, fmt.Errorf("can't get events from daemon: %v", err)
}
go system.DecodeEvents(resBody, eventProcessor) eventCtx, cancel := context.WithCancel(ctx)
eventq, errq := dockerCli.Client().Events(eventCtx, options)
return statusChan, nil go func() {
defer cancel()
for {
select {
case evt := <-eventq:
if eventProcessor(evt) {
return
}
case err := <-errq:
logrus.Errorf("error getting events from daemon: %v", err)
statusChan <- exitCode
return
}
}
}()
return statusChan
} }
// getExitCode performs an inspect on the container. It returns // getExitCode performs an inspect on the container. It returns

View File

@ -63,13 +63,33 @@ func runEvents(dockerCli *command.DockerCli, opts *eventsOptions) error {
Filters: opts.filter.Value(), Filters: opts.filter.Value(),
} }
responseBody, err := dockerCli.Client().Events(context.Background(), options) ctx, cancel := context.WithCancel(context.Background())
if err != nil { events, errs := dockerCli.Client().Events(ctx, options)
return err defer cancel()
}
defer responseBody.Close()
return streamEvents(dockerCli.Out(), responseBody, tmpl) out := dockerCli.Out()
for {
select {
case event := <-events:
if err := handleEvent(out, event, tmpl); err != nil {
return err
}
case err := <-errs:
if err == io.EOF {
return nil
}
return err
}
}
}
func handleEvent(out io.Writer, event eventtypes.Message, tmpl *template.Template) error {
if tmpl == nil {
return prettyPrintEvent(out, event)
}
return formatEvent(out, event, tmpl)
} }
func makeTemplate(format string) (*template.Template, error) { func makeTemplate(format string) (*template.Template, error) {
@ -85,21 +105,6 @@ func makeTemplate(format string) (*template.Template, error) {
return tmpl, tmpl.Execute(ioutil.Discard, &eventtypes.Message{}) return tmpl, tmpl.Execute(ioutil.Discard, &eventtypes.Message{})
} }
// streamEvents decodes prints the incoming events in the provided output.
func streamEvents(out io.Writer, input io.Reader, tmpl *template.Template) error {
return DecodeEvents(input, func(event eventtypes.Message, err error) error {
if err != nil {
return err
}
if tmpl == nil {
return prettyPrintEvent(out, event)
}
return formatEvent(out, event, tmpl)
})
}
type eventProcessor func(event eventtypes.Message, err error) error
// prettyPrintEvent prints all types of event information. // prettyPrintEvent prints all types of event information.
// Each output includes the event type, actor id, name and action. // Each output includes the event type, actor id, name and action.
// Actor attributes are printed at the end if the actor has any. // Actor attributes are printed at the end if the actor has any.

View File

@ -1,14 +1,14 @@
package system package system
import ( import (
"encoding/json"
"io"
"sync" "sync"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
eventtypes "github.com/docker/docker/api/types/events" eventtypes "github.com/docker/docker/api/types/events"
) )
type eventProcessor func(eventtypes.Message, error) error
// EventHandler is abstract interface for user to customize // EventHandler is abstract interface for user to customize
// own handle functions of each type of events // own handle functions of each type of events
type EventHandler interface { type EventHandler interface {
@ -47,20 +47,3 @@ func (w *eventHandler) Watch(c <-chan eventtypes.Message) {
go h(e) go h(e)
} }
} }
// DecodeEvents decodes event from input stream
func DecodeEvents(input io.Reader, ep eventProcessor) error {
dec := json.NewDecoder(input)
for {
var event eventtypes.Message
err := dec.Decode(&event)
if err != nil && err == io.EOF {
break
}
if procErr := ep(event, err); procErr != nil {
return procErr
}
}
return nil
}