Refactor to new events api

Signed-off-by: Josh Horwitz <horwitzja@gmail.com>
This commit is contained in:
Josh Horwitz 2016-08-09 10:34:07 -10:00
parent 8004cf1c10
commit 9acc93282e
3 changed files with 127 additions and 38 deletions

View File

@ -1,20 +1,71 @@
package client package client
import ( import (
"io" "encoding/json"
"net/url" "net/url"
"time" "time"
"golang.org/x/net/context" "golang.org/x/net/context"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/filters"
timetypes "github.com/docker/docker/api/types/time" timetypes "github.com/docker/docker/api/types/time"
) )
// Events returns a stream of events in the daemon in a ReadCloser. // Events returns a stream of events in the daemon. It's up to the caller to close the stream
// It's up to the caller to close the stream. // by cancelling the context. Once the stream has been completely read an io.EOF error will
func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io.ReadCloser, error) { // be sent over the error channel. If an error is sent all processing will be stopped. It's up
// to the caller to reopen the stream in the event of an error by reinvoking this method.
func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) {
messages := make(chan events.Message)
errs := make(chan error, 1)
go func() {
defer close(errs)
query, err := buildEventsQueryParams(cli.version, options)
if err != nil {
errs <- err
return
}
resp, err := cli.get(ctx, "/events", query, nil)
if err != nil {
errs <- err
return
}
defer resp.body.Close()
decoder := json.NewDecoder(resp.body)
for {
select {
case <-ctx.Done():
errs <- ctx.Err()
return
default:
var event events.Message
if err := decoder.Decode(&event); err != nil {
errs <- err
return
}
select {
case messages <- event:
case <-ctx.Done():
errs <- ctx.Err()
return
}
}
}
}()
return messages, errs
}
func buildEventsQueryParams(cliVersion string, options types.EventsOptions) (url.Values, error) {
query := url.Values{} query := url.Values{}
ref := time.Now() ref := time.Now()
@ -25,6 +76,7 @@ func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io.
} }
query.Set("since", ts) query.Set("since", ts)
} }
if options.Until != "" { if options.Until != "" {
ts, err := timetypes.GetTimestamp(options.Until, ref) ts, err := timetypes.GetTimestamp(options.Until, ref)
if err != nil { if err != nil {
@ -32,17 +84,14 @@ func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io.
} }
query.Set("until", ts) query.Set("until", ts)
} }
if options.Filters.Len() > 0 { if options.Filters.Len() > 0 {
filterJSON, err := filters.ToParamWithVersion(cli.version, options.Filters) filterJSON, err := filters.ToParamWithVersion(cliVersion, options.Filters)
if err != nil { if err != nil {
return nil, err return nil, err
} }
query.Set("filters", filterJSON) query.Set("filters", filterJSON)
} }
serverResponse, err := cli.get(ctx, "/events", query, nil) return query, nil
if err != nil {
return nil, err
}
return serverResponse.body, nil
} }

View File

@ -2,7 +2,9 @@ package client
import ( import (
"bytes" "bytes"
"encoding/json"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strings" "strings"
@ -11,6 +13,7 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/filters"
) )
@ -36,7 +39,8 @@ func TestEventsErrorInOptions(t *testing.T) {
client := &Client{ client := &Client{
client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")), client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")),
} }
_, err := client.Events(context.Background(), e.options) _, errs := client.Events(context.Background(), e.options)
err := <-errs
if err == nil || !strings.Contains(err.Error(), e.expectedError) { if err == nil || !strings.Contains(err.Error(), e.expectedError) {
t.Fatalf("expected an error %q, got %v", e.expectedError, err) t.Fatalf("expected an error %q, got %v", e.expectedError, err)
} }
@ -47,39 +51,36 @@ func TestEventsErrorFromServer(t *testing.T) {
client := &Client{ client := &Client{
client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")), client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")),
} }
_, err := client.Events(context.Background(), types.EventsOptions{}) _, errs := client.Events(context.Background(), types.EventsOptions{})
err := <-errs
if err == nil || err.Error() != "Error response from daemon: Server error" { if err == nil || err.Error() != "Error response from daemon: Server error" {
t.Fatalf("expected a Server Error, got %v", err) t.Fatalf("expected a Server Error, got %v", err)
} }
} }
func TestEvents(t *testing.T) { func TestEvents(t *testing.T) {
expectedURL := "/events" expectedURL := "/events"
filters := filters.NewArgs() filters := filters.NewArgs()
filters.Add("label", "label1") filters.Add("type", events.ContainerEventType)
filters.Add("label", "label2") expectedFiltersJSON := fmt.Sprintf(`{"type":{"%s":true}}`, events.ContainerEventType)
expectedFiltersJSON := `{"label":{"label1":true,"label2":true}}`
eventsCases := []struct { eventsCases := []struct {
options types.EventsOptions options types.EventsOptions
events []events.Message
expectedEvents map[string]bool
expectedQueryParams map[string]string expectedQueryParams map[string]string
}{ }{
{ {
options: types.EventsOptions{ options: types.EventsOptions{
Since: "invalid but valid", Filters: filters,
}, },
expectedQueryParams: map[string]string{ expectedQueryParams: map[string]string{
"since": "invalid but valid", "filters": expectedFiltersJSON,
},
},
{
options: types.EventsOptions{
Until: "invalid but valid",
},
expectedQueryParams: map[string]string{
"until": "invalid but valid",
}, },
events: []events.Message{},
expectedEvents: make(map[string]bool),
}, },
{ {
options: types.EventsOptions{ options: types.EventsOptions{
@ -88,6 +89,28 @@ func TestEvents(t *testing.T) {
expectedQueryParams: map[string]string{ expectedQueryParams: map[string]string{
"filters": expectedFiltersJSON, "filters": expectedFiltersJSON,
}, },
events: []events.Message{
{
Type: "container",
ID: "1",
Action: "create",
},
{
Type: "container",
ID: "2",
Action: "die",
},
{
Type: "container",
ID: "3",
Action: "create",
},
},
expectedEvents: map[string]bool{
"1": true,
"2": true,
"3": true,
},
}, },
} }
@ -98,29 +121,45 @@ func TestEvents(t *testing.T) {
return nil, fmt.Errorf("Expected URL '%s', got '%s'", expectedURL, req.URL) return nil, fmt.Errorf("Expected URL '%s', got '%s'", expectedURL, req.URL)
} }
query := req.URL.Query() query := req.URL.Query()
for key, expected := range eventsCase.expectedQueryParams { for key, expected := range eventsCase.expectedQueryParams {
actual := query.Get(key) actual := query.Get(key)
if actual != expected { if actual != expected {
return nil, fmt.Errorf("%s not set in URL query properly. Expected '%s', got %s", key, expected, actual) return nil, fmt.Errorf("%s not set in URL query properly. Expected '%s', got %s", key, expected, actual)
} }
} }
buffer := new(bytes.Buffer)
for _, e := range eventsCase.events {
b, _ := json.Marshal(e)
buffer.Write(b)
}
return &http.Response{ return &http.Response{
StatusCode: http.StatusOK, StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader([]byte("response"))), Body: ioutil.NopCloser(buffer),
}, nil }, nil
}), }),
} }
body, err := client.Events(context.Background(), eventsCase.options)
if err != nil { messages, errs := client.Events(context.Background(), eventsCase.options)
loop:
for {
select {
case err := <-errs:
if err != nil && err != io.EOF {
t.Fatal(err) t.Fatal(err)
} }
defer body.Close()
content, err := ioutil.ReadAll(body) break loop
if err != nil { case e := <-messages:
t.Fatal(err) _, ok := eventsCase.expectedEvents[e.ID]
} if !ok {
if string(content) != "response" { t.Fatalf("event received not expected with action %s & id %s", e.Action, e.ID)
t.Fatalf("expected response to contain 'response', got %s", string(content)) }
}
} }
} }
} }

View File

@ -6,6 +6,7 @@ import (
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/network" "github.com/docker/docker/api/types/network"
"github.com/docker/docker/api/types/registry" "github.com/docker/docker/api/types/registry"
@ -120,7 +121,7 @@ type SwarmAPIClient interface {
// SystemAPIClient defines API client methods for the system // SystemAPIClient defines API client methods for the system
type SystemAPIClient interface { type SystemAPIClient interface {
Events(ctx context.Context, options types.EventsOptions) (io.ReadCloser, error) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error)
Info(ctx context.Context) (types.Info, error) Info(ctx context.Context) (types.Info, error)
RegistryLogin(ctx context.Context, auth types.AuthConfig) (types.AuthResponse, error) RegistryLogin(ctx context.Context, auth types.AuthConfig) (types.AuthResponse, error)
} }