mirror of https://github.com/docker/cli.git
Merge pull request #25853 from jhorwit2/jah/event-refactor
Refactor to new engine-api events api
This commit is contained in:
commit
0da5e77c67
69
events.go
69
events.go
|
@ -1,20 +1,71 @@
|
||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"encoding/json"
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types"
|
"github.com/docker/docker/api/types"
|
||||||
|
"github.com/docker/docker/api/types/events"
|
||||||
"github.com/docker/docker/api/types/filters"
|
"github.com/docker/docker/api/types/filters"
|
||||||
timetypes "github.com/docker/docker/api/types/time"
|
timetypes "github.com/docker/docker/api/types/time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Events returns a stream of events in the daemon in a ReadCloser.
|
// Events returns a stream of events in the daemon. It's up to the caller to close the stream
|
||||||
// It's up to the caller to close the stream.
|
// by cancelling the context. Once the stream has been completely read an io.EOF error will
|
||||||
func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io.ReadCloser, error) {
|
// be sent over the error channel. If an error is sent all processing will be stopped. It's up
|
||||||
|
// to the caller to reopen the stream in the event of an error by reinvoking this method.
|
||||||
|
func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) {
|
||||||
|
|
||||||
|
messages := make(chan events.Message)
|
||||||
|
errs := make(chan error, 1)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(errs)
|
||||||
|
|
||||||
|
query, err := buildEventsQueryParams(cli.version, options)
|
||||||
|
if err != nil {
|
||||||
|
errs <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := cli.get(ctx, "/events", query, nil)
|
||||||
|
if err != nil {
|
||||||
|
errs <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer resp.body.Close()
|
||||||
|
|
||||||
|
decoder := json.NewDecoder(resp.body)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
errs <- ctx.Err()
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
var event events.Message
|
||||||
|
if err := decoder.Decode(&event); err != nil {
|
||||||
|
errs <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case messages <- event:
|
||||||
|
case <-ctx.Done():
|
||||||
|
errs <- ctx.Err()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return messages, errs
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildEventsQueryParams(cliVersion string, options types.EventsOptions) (url.Values, error) {
|
||||||
query := url.Values{}
|
query := url.Values{}
|
||||||
ref := time.Now()
|
ref := time.Now()
|
||||||
|
|
||||||
|
@ -25,6 +76,7 @@ func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io.
|
||||||
}
|
}
|
||||||
query.Set("since", ts)
|
query.Set("since", ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
if options.Until != "" {
|
if options.Until != "" {
|
||||||
ts, err := timetypes.GetTimestamp(options.Until, ref)
|
ts, err := timetypes.GetTimestamp(options.Until, ref)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -32,17 +84,14 @@ func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io.
|
||||||
}
|
}
|
||||||
query.Set("until", ts)
|
query.Set("until", ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
if options.Filters.Len() > 0 {
|
if options.Filters.Len() > 0 {
|
||||||
filterJSON, err := filters.ToParamWithVersion(cli.version, options.Filters)
|
filterJSON, err := filters.ToParamWithVersion(cliVersion, options.Filters)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
query.Set("filters", filterJSON)
|
query.Set("filters", filterJSON)
|
||||||
}
|
}
|
||||||
|
|
||||||
serverResponse, err := cli.get(ctx, "/events", query, nil)
|
return query, nil
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return serverResponse.body, nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,9 @@ package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -11,6 +13,7 @@ import (
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types"
|
"github.com/docker/docker/api/types"
|
||||||
|
"github.com/docker/docker/api/types/events"
|
||||||
"github.com/docker/docker/api/types/filters"
|
"github.com/docker/docker/api/types/filters"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -36,7 +39,8 @@ func TestEventsErrorInOptions(t *testing.T) {
|
||||||
client := &Client{
|
client := &Client{
|
||||||
client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")),
|
client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")),
|
||||||
}
|
}
|
||||||
_, err := client.Events(context.Background(), e.options)
|
_, errs := client.Events(context.Background(), e.options)
|
||||||
|
err := <-errs
|
||||||
if err == nil || !strings.Contains(err.Error(), e.expectedError) {
|
if err == nil || !strings.Contains(err.Error(), e.expectedError) {
|
||||||
t.Fatalf("expected an error %q, got %v", e.expectedError, err)
|
t.Fatalf("expected an error %q, got %v", e.expectedError, err)
|
||||||
}
|
}
|
||||||
|
@ -47,39 +51,36 @@ func TestEventsErrorFromServer(t *testing.T) {
|
||||||
client := &Client{
|
client := &Client{
|
||||||
client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")),
|
client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")),
|
||||||
}
|
}
|
||||||
_, err := client.Events(context.Background(), types.EventsOptions{})
|
_, errs := client.Events(context.Background(), types.EventsOptions{})
|
||||||
|
err := <-errs
|
||||||
if err == nil || err.Error() != "Error response from daemon: Server error" {
|
if err == nil || err.Error() != "Error response from daemon: Server error" {
|
||||||
t.Fatalf("expected a Server Error, got %v", err)
|
t.Fatalf("expected a Server Error, got %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEvents(t *testing.T) {
|
func TestEvents(t *testing.T) {
|
||||||
|
|
||||||
expectedURL := "/events"
|
expectedURL := "/events"
|
||||||
|
|
||||||
filters := filters.NewArgs()
|
filters := filters.NewArgs()
|
||||||
filters.Add("label", "label1")
|
filters.Add("type", events.ContainerEventType)
|
||||||
filters.Add("label", "label2")
|
expectedFiltersJSON := fmt.Sprintf(`{"type":{"%s":true}}`, events.ContainerEventType)
|
||||||
expectedFiltersJSON := `{"label":{"label1":true,"label2":true}}`
|
|
||||||
|
|
||||||
eventsCases := []struct {
|
eventsCases := []struct {
|
||||||
options types.EventsOptions
|
options types.EventsOptions
|
||||||
|
events []events.Message
|
||||||
|
expectedEvents map[string]bool
|
||||||
expectedQueryParams map[string]string
|
expectedQueryParams map[string]string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
options: types.EventsOptions{
|
options: types.EventsOptions{
|
||||||
Since: "invalid but valid",
|
Filters: filters,
|
||||||
},
|
},
|
||||||
expectedQueryParams: map[string]string{
|
expectedQueryParams: map[string]string{
|
||||||
"since": "invalid but valid",
|
"filters": expectedFiltersJSON,
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
options: types.EventsOptions{
|
|
||||||
Until: "invalid but valid",
|
|
||||||
},
|
|
||||||
expectedQueryParams: map[string]string{
|
|
||||||
"until": "invalid but valid",
|
|
||||||
},
|
},
|
||||||
|
events: []events.Message{},
|
||||||
|
expectedEvents: make(map[string]bool),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
options: types.EventsOptions{
|
options: types.EventsOptions{
|
||||||
|
@ -88,6 +89,28 @@ func TestEvents(t *testing.T) {
|
||||||
expectedQueryParams: map[string]string{
|
expectedQueryParams: map[string]string{
|
||||||
"filters": expectedFiltersJSON,
|
"filters": expectedFiltersJSON,
|
||||||
},
|
},
|
||||||
|
events: []events.Message{
|
||||||
|
{
|
||||||
|
Type: "container",
|
||||||
|
ID: "1",
|
||||||
|
Action: "create",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: "container",
|
||||||
|
ID: "2",
|
||||||
|
Action: "die",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: "container",
|
||||||
|
ID: "3",
|
||||||
|
Action: "create",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedEvents: map[string]bool{
|
||||||
|
"1": true,
|
||||||
|
"2": true,
|
||||||
|
"3": true,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,29 +121,45 @@ func TestEvents(t *testing.T) {
|
||||||
return nil, fmt.Errorf("Expected URL '%s', got '%s'", expectedURL, req.URL)
|
return nil, fmt.Errorf("Expected URL '%s', got '%s'", expectedURL, req.URL)
|
||||||
}
|
}
|
||||||
query := req.URL.Query()
|
query := req.URL.Query()
|
||||||
|
|
||||||
for key, expected := range eventsCase.expectedQueryParams {
|
for key, expected := range eventsCase.expectedQueryParams {
|
||||||
actual := query.Get(key)
|
actual := query.Get(key)
|
||||||
if actual != expected {
|
if actual != expected {
|
||||||
return nil, fmt.Errorf("%s not set in URL query properly. Expected '%s', got %s", key, expected, actual)
|
return nil, fmt.Errorf("%s not set in URL query properly. Expected '%s', got %s", key, expected, actual)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
buffer := new(bytes.Buffer)
|
||||||
|
|
||||||
|
for _, e := range eventsCase.events {
|
||||||
|
b, _ := json.Marshal(e)
|
||||||
|
buffer.Write(b)
|
||||||
|
}
|
||||||
|
|
||||||
return &http.Response{
|
return &http.Response{
|
||||||
StatusCode: http.StatusOK,
|
StatusCode: http.StatusOK,
|
||||||
Body: ioutil.NopCloser(bytes.NewReader([]byte("response"))),
|
Body: ioutil.NopCloser(buffer),
|
||||||
}, nil
|
}, nil
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
body, err := client.Events(context.Background(), eventsCase.options)
|
|
||||||
if err != nil {
|
messages, errs := client.Events(context.Background(), eventsCase.options)
|
||||||
|
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case err := <-errs:
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
defer body.Close()
|
|
||||||
content, err := ioutil.ReadAll(body)
|
break loop
|
||||||
if err != nil {
|
case e := <-messages:
|
||||||
t.Fatal(err)
|
_, ok := eventsCase.expectedEvents[e.ID]
|
||||||
}
|
if !ok {
|
||||||
if string(content) != "response" {
|
t.Fatalf("event received not expected with action %s & id %s", e.Action, e.ID)
|
||||||
t.Fatalf("expected response to contain 'response', got %s", string(content))
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
|
|
||||||
"github.com/docker/docker/api/types"
|
"github.com/docker/docker/api/types"
|
||||||
"github.com/docker/docker/api/types/container"
|
"github.com/docker/docker/api/types/container"
|
||||||
|
"github.com/docker/docker/api/types/events"
|
||||||
"github.com/docker/docker/api/types/filters"
|
"github.com/docker/docker/api/types/filters"
|
||||||
"github.com/docker/docker/api/types/network"
|
"github.com/docker/docker/api/types/network"
|
||||||
"github.com/docker/docker/api/types/registry"
|
"github.com/docker/docker/api/types/registry"
|
||||||
|
@ -120,7 +121,7 @@ type SwarmAPIClient interface {
|
||||||
|
|
||||||
// SystemAPIClient defines API client methods for the system
|
// SystemAPIClient defines API client methods for the system
|
||||||
type SystemAPIClient interface {
|
type SystemAPIClient interface {
|
||||||
Events(ctx context.Context, options types.EventsOptions) (io.ReadCloser, error)
|
Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error)
|
||||||
Info(ctx context.Context) (types.Info, error)
|
Info(ctx context.Context) (types.Info, error)
|
||||||
RegistryLogin(ctx context.Context, auth types.AuthConfig) (types.AuthResponse, error)
|
RegistryLogin(ctx context.Context, auth types.AuthConfig) (types.AuthResponse, error)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue