From 56143dd39a8791fba608f50aadb3d8e51a12cb04 Mon Sep 17 00:00:00 2001 From: LilyFaFa <21621231@zju.edu.cn> Date: Thu, 31 Aug 2017 21:24:57 +0800 Subject: [PATCH] Add streaming Test --- pkg/hyper/fake_client_interface_test.go | 127 ++++++++++++++- pkg/hyper/streaming_test.go | 208 ++++++++++++++++++++++++ 2 files changed, 328 insertions(+), 7 deletions(-) create mode 100644 pkg/hyper/streaming_test.go diff --git a/pkg/hyper/fake_client_interface_test.go b/pkg/hyper/fake_client_interface_test.go index eed35456..9b77799a 100644 --- a/pkg/hyper/fake_client_interface_test.go +++ b/pkg/hyper/fake_client_interface_test.go @@ -40,6 +40,7 @@ type fakeClientInterface struct { imageInfoList []*types.ImageInfo version string apiVersion string + execCmd map[string]*[]string } func newFakeClientInterface(c clock.Clock) *fakeClientInterface { @@ -47,11 +48,13 @@ func newFakeClientInterface(c clock.Clock) *fakeClientInterface { Clock: c, containerInfoMap: make(map[string]*types.ContainerInfo), podInfoMap: make(map[string]*types.PodInfo), + execCmd: make(map[string]*[]string), } } type FakePod struct { PodID string + Status string PodVolume []*types.PodVolume } @@ -63,8 +66,12 @@ func (f *fakeClientInterface) SetFakePod(pods []*FakePod) { podSpec := types.PodSpec{ Volumes: p.PodVolume, } + podStatus := types.PodStatus{ + Phase: p.Status, + } podInfo := types.PodInfo{ - Spec: &podSpec, + Spec: &podSpec, + Status: &podStatus, } f.podInfoMap[p.PodID] = &podInfo @@ -78,6 +85,36 @@ func (f *fakeClientInterface) SetVersion(version string, apiVersion string) { f.apiVersion = apiVersion } +type FakeContainer struct { + ID string + Name string + Status string + PodID string +} + +func (f *fakeClientInterface) SetFakeContainers(containers []*FakeContainer) { + f.Lock() + defer f.Unlock() + for i := range containers { + c := containers[i] + container := types.Container{ + Name: c.Name, + ContainerID: c.ID, + } + containerStatus := types.ContainerStatus{ + ContainerID: c.ID, + Phase: c.Status, + } + containerInfo := types.ContainerInfo{ + Container: &container, + Status: &containerStatus, + PodID: c.PodID, + } + f.containerInfoMap[c.ID] = &containerInfo + } + +} + func (f *fakeClientInterface) CleanCalls() { f.Lock() defer f.Unlock() @@ -128,8 +165,28 @@ func (f *fakeClientInterface) PodUnpause(ctx context.Context, in *types.PodUnpau return nil, fmt.Errorf("Not implemented") } +type fakePublicAPI_ExecVMClient struct { + grpc.ClientStream +} + +func (x *fakePublicAPI_ExecVMClient) Send(m *types.ExecVMRequest) error { + return nil +} + +func (x *fakePublicAPI_ExecVMClient) Recv() (*types.ExecVMResponse, error) { + m := &types.ExecVMResponse{} + return m, io.EOF +} + +func (x *fakePublicAPI_ExecVMClient) CloseSend() error { + return nil +} + func (f *fakeClientInterface) ExecVM(ctx context.Context, opts ...grpc.CallOption) (types.PublicAPI_ExecVMClient, error) { - return nil, fmt.Errorf("Not implemented") + f.Lock() + defer f.Unlock() + f.called = append(f.called, "ExecVM") + return &fakePublicAPI_ExecVMClient{}, f.err } func (f *fakeClientInterface) ContainerList(ctx context.Context, in *types.ContainerListRequest, opts ...grpc.CallOption) (*types.ContainerListResponse, error) { @@ -282,27 +339,83 @@ func (f *fakeClientInterface) ContainerRemove(ctx context.Context, in *types.Con } func (f *fakeClientInterface) ExecCreate(ctx context.Context, in *types.ExecCreateRequest, opts ...grpc.CallOption) (*types.ExecCreateResponse, error) { - return nil, fmt.Errorf("Not implemented") + f.Lock() + defer f.Unlock() + f.called = append(f.called, "ExecCreate") + containerId := in.ContainerID + f.execCmd[containerId] = &in.Command + //The container's name is "sidecar" + i,we use i as the execID + ids := strings.Split(containerId, "*") + execID := ids[1] + return &types.ExecCreateResponse{ + ExecID: execID, + }, f.err +} + +type fakePublicAPI_ExecStartClient struct { + grpc.ClientStream +} + +func (x *fakePublicAPI_ExecStartClient) Send(m *types.ExecStartRequest) error { + return nil +} + +func (x *fakePublicAPI_ExecStartClient) Recv() (*types.ExecStartResponse, error) { + m := &types.ExecStartResponse{} + return m, io.EOF +} + +func (x *fakePublicAPI_ExecStartClient) CloseSend() error { + return nil } func (f *fakeClientInterface) ExecStart(ctx context.Context, opts ...grpc.CallOption) (types.PublicAPI_ExecStartClient, error) { - return nil, fmt.Errorf("Not implemented") + f.Lock() + defer f.Unlock() + f.called = append(f.called, "ExecStart") + return &fakePublicAPI_ExecStartClient{}, f.err } func (f *fakeClientInterface) ExecSignal(ctx context.Context, in *types.ExecSignalRequest, opts ...grpc.CallOption) (*types.ExecSignalResponse, error) { return nil, fmt.Errorf("Not implemented") } +type fakePublicAPI_AttachClient struct { + grpc.ClientStream +} + +func (x *fakePublicAPI_AttachClient) Send(m *types.AttachMessage) error { + return nil +} + +func (x *fakePublicAPI_AttachClient) Recv() (*types.AttachMessage, error) { + m := &types.AttachMessage{} + return m, io.EOF +} + +func (x *fakePublicAPI_AttachClient) CloseSend() error { + return nil +} + func (f *fakeClientInterface) Attach(ctx context.Context, opts ...grpc.CallOption) (types.PublicAPI_AttachClient, error) { - return nil, fmt.Errorf("Not implemented") + f.Lock() + defer f.Unlock() + f.called = append(f.called, "Attach") + return &fakePublicAPI_AttachClient{}, f.err } func (f *fakeClientInterface) Wait(ctx context.Context, in *types.WaitRequest, opts ...grpc.CallOption) (*types.WaitResponse, error) { - return nil, fmt.Errorf("Not implemented") + f.Lock() + defer f.Unlock() + f.called = append(f.called, "Wait") + return &types.WaitResponse{}, f.err } func (f *fakeClientInterface) TTYResize(ctx context.Context, in *types.TTYResizeRequest, opts ...grpc.CallOption) (*types.TTYResizeResponse, error) { - return nil, fmt.Errorf("Not implemented") + f.Lock() + defer f.Unlock() + f.called = append(f.called, "TTYResize") + return &types.TTYResizeResponse{}, nil } func (f *fakeClientInterface) ServiceList(ctx context.Context, in *types.ServiceListRequest, opts ...grpc.CallOption) (*types.ServiceListResponse, error) { diff --git a/pkg/hyper/streaming_test.go b/pkg/hyper/streaming_test.go new file mode 100644 index 00000000..52685a6b --- /dev/null +++ b/pkg/hyper/streaming_test.go @@ -0,0 +1,208 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hyper + +import ( + "fmt" + "io/ioutil" + "net/url" + "os" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + kubeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" + "k8s.io/kubernetes/pkg/kubelet/server/streaming" +) + +func newTestStreamingRuntime() (*streamingRuntime, *fakeClientInterface) { + publicClient := newFakeClientInterface(nil) + client := &Client{ + client: publicClient, + } + return &streamingRuntime{ + client: client, + }, publicClient +} + +func TestExec(t *testing.T) { + r, fakeClient := newTestStreamingRuntime() + container := "sidecar" + containerId, PodId := "c", "p" + containers := []*FakeContainer{} + //Create runnning containers for test + for i := 0; i < 2; i++ { + containerID := fmt.Sprintf("%s%s%d", containerId, "*", i) + podID := fmt.Sprintf("%s%s%d", PodId, "*", i) + containerName := fmt.Sprintf("%s%d", container, i) + container := &FakeContainer{ + ID: containerID, + Name: containerName, + Status: "running", + PodID: podID, + } + containers = append(containers, container) + } + fakeClient.SetFakeContainers(containers) + //Create a temporaty empty file + file, err := ioutil.TempFile("", "tmp") + assert.NoError(t, err) + defer os.Remove(file.Name()) + + rawContainerID := fmt.Sprintf("%s%s%d", containerId, "*", 0) + cmd := []string{ + "ls", + } + //Test streamingRuntime Exec + err = r.Exec(rawContainerID, cmd, file, nil, nil, false, nil) + assert.NoError(t, err) + //Test streamingRuntime Attach + rawContainerID = fmt.Sprintf("%s%s%d", containerId, "*", 1) + err = r.Attach(rawContainerID, file, nil, nil, false, nil) + assert.NoError(t, err) +} + +func TestPortForward(t *testing.T) { + r, fakeClient := newTestStreamingRuntime() + podId := "p" + pods := []*FakePod{} + //Create running pods for test + + podID := fmt.Sprintf("%s%s%d", podId, "*", 0) + pod := &FakePod{ + PodID: podID, + Status: "Running", + PodVolume: nil, + } + pods = append(pods, pod) + + fakeClient.SetFakePod(pods) + + //Create a temporaty empty file + file, err := ioutil.TempFile("", "tmp") + assert.NoError(t, err) + defer os.Remove(file.Name()) + err = r.PortForward(podID, int32(0), file) + assert.NoError(t, err) + +} + +func newTestRuntimeWithStreaming(url *url.URL) (*Runtime, *fakeClientInterface) { + publicClient := newFakeClientInterface(nil) + config := streaming.Config{ + BaseURL: url, + } + streamingServer, _ := streaming.NewServer(config, nil) + client := &Client{ + client: publicClient, + } + return &Runtime{ + client: client, + streamingServer: streamingServer, + }, publicClient +} + +func TestRuntimeExec(t *testing.T) { + host, scheme := "127.0.0.1", "http" + url := &url.URL{ + Scheme: scheme, + Host: host, + } + r, fakeClient := newTestRuntimeWithStreaming(url) + container := "sidecar" + containerId, PodId := "c", "p" + containers := []*FakeContainer{} + //Create runnning containers for test + for i := 0; i < 2; i++ { + containerID := fmt.Sprintf("%s%s%d", containerId, "*", i) + podID := fmt.Sprintf("%s%s%d", PodId, "*", i) + containerName := fmt.Sprintf("%s%d", container, i) + container := &FakeContainer{ + ID: containerID, + Name: containerName, + Status: "running", + PodID: podID, + } + containers = append(containers, container) + } + fakeClient.SetFakeContainers(containers) + //Test Runtime Exec + rawContainerID := fmt.Sprintf("%s%s%d", containerId, "*", 0) + execRequest := &kubeapi.ExecRequest{ + ContainerId: rawContainerID, + } + execResponse, err := r.Exec(execRequest) + assert.NoError(t, err) + //We cann't knew the token before it's created,eg:"http://127.0.0.1/exec/-BnwYvAM",-BnwYvAM is the token + urlRep := deleteToken(execResponse.Url) + expected := fmt.Sprintf("%s%s%s%s%s", scheme, "://", host, "/", "exec") + assert.Equal(t, urlRep, expected) + + //Test Runtime Attach + rawContainerID = fmt.Sprintf("%s%s%d", containerId, "*", 1) + attachRequest := &kubeapi.AttachRequest{ + ContainerId: rawContainerID, + } + attachResponse, err := r.Attach(attachRequest) + assert.NoError(t, err) + urlRep = deleteToken(attachResponse.Url) + expected = fmt.Sprintf("%s%s%s%s%s", scheme, "://", host, "/", "attach") + assert.Equal(t, urlRep, expected) +} + +func TestRuntimePortForward(t *testing.T) { + host, scheme := "127.0.0.1", "http" + url := &url.URL{ + Scheme: scheme, + Host: host, + } + r, fakeClient := newTestRuntimeWithStreaming(url) + podId := "p" + pods := []*FakePod{} + //Create running pods for test + podID := fmt.Sprintf("%s%s%d", podId, "*", 0) + pod := &FakePod{ + PodID: podID, + Status: "Running", + PodVolume: nil, + } + pods = append(pods, pod) + + fakeClient.SetFakePod(pods) + //Test Runtime PortForward + portForwardRequest := &kubeapi.PortForwardRequest{ + PodSandboxId: podID, + } + portForwardResponse, err := r.PortForward(portForwardRequest) + assert.NoError(t, err) + //We cann't knew the token before it's created,eg:"http://127.0.0.1/exec/-BnwYvAM",-BnwYvAM is the token + urlRep := deleteToken(portForwardResponse.Url) + expected := fmt.Sprintf("%s%s%s%s%s", scheme, "://", host, "/", "portforward") + assert.Equal(t, urlRep, expected) + +} + +//Used to remove the token from the string +func deleteToken(url string) string { + urlSplit := strings.Split(url, "/") + //urlRep := fmt.Sprintf("%s%s", urlSplit[0], "/") + urlRep := urlSplit[0] + for i := 1; i < len(urlSplit)-1; i++ { + urlRep = fmt.Sprintf("%s%s%s", urlRep, "/", urlSplit[i]) + } + return urlRep +}