Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
wip: refactor pod_cache to use Identity
Browse files Browse the repository at this point in the history
  • Loading branch information
stefansedich committed Nov 10, 2020
1 parent 9b86308 commit 69d4878
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 43 deletions.
6 changes: 4 additions & 2 deletions pkg/k8s/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ package k8s

import (
"context"
"k8s.io/api/core/v1"

"github.com/uswitch/kiam/pkg/aws/sts"
v1 "k8s.io/api/core/v1"
)

type PodGetter interface {
Expand All @@ -26,7 +28,7 @@ type PodAnnouncer interface {
// Will receive a Pod whenever there's a change/addition for a Pod with a role.
Pods() <-chan *v1.Pod
// Return whether there are still uncompleted pods in the specified role
IsActivePodsForRole(role string) (bool, error)
IsActivePodsForRole(identity *sts.RoleIdentity) (bool, error)
}

type NamespaceFinder interface {
Expand Down
38 changes: 24 additions & 14 deletions pkg/k8s/pod_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

log "github.com/sirupsen/logrus"
"github.com/uswitch/kiam/pkg/aws/sts"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
Expand All @@ -34,10 +35,10 @@ type PodCache struct {
// IP address so that Kiam can identify which role a Pod should assume. It periodically syncs the list of
// pods and can announce Pods. When announcing Pods via the channel it will drop events if the buffer
// is full- bufferSize determines how many.
func NewPodCache(source cache.ListerWatcher, syncInterval time.Duration, bufferSize int) *PodCache {
func NewPodCache(arnResolver sts.ARNResolver, source cache.ListerWatcher, syncInterval time.Duration, bufferSize int) *PodCache {
indexers := cache.Indexers{
indexPodIP: podIPIndex,
indexPodRole: podRoleIndex,
indexPodIP: podIPIndex,
indexPodRoleIdentity: podRoleIdentityIndex(arnResolver),
}
pods := make(chan *v1.Pod, bufferSize)
podHandler := &podHandler{pods}
Expand Down Expand Up @@ -70,8 +71,8 @@ func (s *PodCache) Pods() <-chan *v1.Pod {
// using the provided role. This is used to identify whether the
// role credentials should be maintained. Part of the PodAnnouncer
// interface
func (s *PodCache) IsActivePodsForRole(role string) (bool, error) {
items, err := s.indexer.ByIndex(indexPodRole, role)
func (s *PodCache) IsActivePodsForRole(identity *sts.RoleIdentity) (bool, error) {
items, err := s.indexer.ByIndex(indexPodRoleIdentity, identity.String())
if err != nil {
return false, err
}
Expand Down Expand Up @@ -138,8 +139,8 @@ func (s *PodCache) GetPodByIP(ip string) (*v1.Pod, error) {
}

const (
indexPodIP = "byIP"
indexPodRole = "byRole"
indexPodIP = "byIP"
indexPodRoleIdentity = "byRoleIdentity"
)

func podIPIndex(obj interface{}) ([]string, error) {
Expand All @@ -152,14 +153,23 @@ func podIPIndex(obj interface{}) ([]string, error) {
return []string{pod.Status.PodIP}, nil
}

func podRoleIndex(obj interface{}) ([]string, error) {
pod := obj.(*v1.Pod)
role := PodRole(pod)
if role == "" {
return []string{}, nil
}
// podRoleIdentityIndex builds a cache.IndexFunc that keys pods by the
// string form of their resolved role identity (role ARN, session name and
// external ID), so the cache can answer lookups per identity rather than
// per bare role name. Pods without a role annotation are not indexed.
func podRoleIdentityIndex(arnResolver sts.ARNResolver) func(obj interface{}) ([]string, error) {
	return func(obj interface{}) ([]string, error) {
		pod := obj.(*v1.Pod)
		role := PodRole(pod)
		if role == "" {
			// No role annotation: contribute no index keys for this pod.
			return []string{}, nil
		}

		sessionName := PodSessionName(pod)
		externalID := PodExternalID(pod)
		identity, err := sts.NewRoleIdentity(arnResolver, role, sessionName, externalID)
		if err != nil {
			return nil, err
		}

		// NOTE(review): the original span contained a stray leftover line
		// `return []string{role}, nil` (diff residue from the pre-refactor
		// code) that made this return unreachable; it has been removed.
		return []string{identity.String()}, nil
	}
}

// Run starts the controller processing updates. Blocks until the cache has synced
Expand Down
81 changes: 72 additions & 9 deletions pkg/k8s/pod_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ package k8s
import (
"context"
"fmt"
"testing"
"time"

"github.com/fortytw2/leaktest"
"github.com/uswitch/kiam/pkg/aws/sts"
"github.com/uswitch/kiam/pkg/testutil"
kt "k8s.io/client-go/tools/cache/testing"
"testing"
"time"
)

const bufferSize = 10
Expand All @@ -32,7 +34,8 @@ func TestFindsRunningPod(t *testing.T) {
defer cancel()

source := kt.NewFakeControllerSource()
c := NewPodCache(source, time.Second, bufferSize)
arnResolver := sts.DefaultResolver("arn:account:")
c := NewPodCache(arnResolver, source, time.Second, bufferSize)
source.Add(testutil.NewPodWithRole("ns", "name", "192.168.0.1", "Failed", "failed_role"))
source.Add(testutil.NewPodWithRole("ns", "name", "192.168.0.1", "Running", "running_role"))
c.Run(ctx)
Expand All @@ -54,32 +57,90 @@ func TestFindRoleActive(t *testing.T) {
defer cancel()

source := kt.NewFakeControllerSource()
c := NewPodCache(source, time.Second, bufferSize)
arnResolver := sts.DefaultResolver("arn:account:")
c := NewPodCache(arnResolver, source, time.Second, bufferSize)
source.Add(testutil.NewPodWithRole("ns", "name", "192.168.0.1", "Failed", "failed_role"))
source.Modify(testutil.NewPodWithRole("ns", "name", "192.168.0.1", "Failed", "running_role"))
source.Modify(testutil.NewPodWithRole("ns", "name", "192.168.0.1", "Running", "running_role"))
c.Run(ctx)
defer source.Shutdown()

active, _ := c.IsActivePodsForRole("failed_role")
identity, _ := sts.NewRoleIdentity(arnResolver, "failed_role", "", "")
active, _ := c.IsActivePodsForRole(identity)
if active {
t.Error("expected no active pods in failed_role")
}

active, _ = c.IsActivePodsForRole("running_role")
identity, _ = sts.NewRoleIdentity(arnResolver, "running_role", "", "")
active, _ = c.IsActivePodsForRole(identity)
if !active {
t.Error("expected running pod")
}
}

// TestFindRoleActiveWithSessionName verifies that pod activity is tracked
// per role identity: two pods sharing the role "reader" but using different
// session names are distinguished, so only the Running pod's identity is
// reported active.
func TestFindRoleActiveWithSessionName(t *testing.T) {
	defer leaktest.Check(t)()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	source := kt.NewFakeControllerSource()
	arnResolver := sts.DefaultResolver("arn:account:")
	c := NewPodCache(arnResolver, source, time.Second, bufferSize)
	source.Add(testutil.NewPodWithSessionName("ns", "active-reader", "192.168.0.1", "Running", "reader", "active-reader"))
	source.Add(testutil.NewPodWithSessionName("ns", "stopped-reader", "192.168.0.2", "Succeeded", "reader", "stopped-reader"))
	c.Run(ctx)
	defer source.Shutdown()

	// Fail fast on setup errors instead of discarding them with _ — a
	// broken identity would otherwise surface as a confusing assertion
	// failure below.
	identity, err := sts.NewRoleIdentity(arnResolver, "reader", "active-reader", "")
	if err != nil {
		t.Fatalf("unexpected error building identity: %s", err)
	}
	active, err := c.IsActivePodsForRole(identity)
	if err != nil {
		t.Fatalf("unexpected error checking active pods: %s", err)
	}
	if !active {
		t.Error("expected running pod for active-reader")
	}

	identity, err = sts.NewRoleIdentity(arnResolver, "reader", "stopped-reader", "")
	if err != nil {
		t.Fatalf("unexpected error building identity: %s", err)
	}
	active, err = c.IsActivePodsForRole(identity)
	if err != nil {
		t.Fatalf("unexpected error checking active pods: %s", err)
	}
	if active {
		t.Error("expected no active pods for stopped-reader")
	}
}

// TestFindRoleActiveWithExternalID verifies that pod activity is tracked
// per role identity when pods share a role but declare different external
// IDs: only the Running pod's identity is reported active.
func TestFindRoleActiveWithExternalID(t *testing.T) {
	defer leaktest.Check(t)()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	source := kt.NewFakeControllerSource()
	arnResolver := sts.DefaultResolver("arn:account:")
	c := NewPodCache(arnResolver, source, time.Second, bufferSize)
	source.Add(testutil.NewPodWithExternalID("ns", "active-reader", "192.168.0.1", "Running", "reader", "1234"))
	source.Add(testutil.NewPodWithExternalID("ns", "stopped-reader", "192.168.0.2", "Succeeded", "reader", "4321"))
	c.Run(ctx)
	defer source.Shutdown()

	// Fail fast on setup errors instead of discarding them with _ so a
	// bad identity is reported directly rather than as a false assertion.
	identity, err := sts.NewRoleIdentity(arnResolver, "reader", "", "1234")
	if err != nil {
		t.Fatalf("unexpected error building identity: %s", err)
	}
	active, err := c.IsActivePodsForRole(identity)
	if err != nil {
		t.Fatalf("unexpected error checking active pods: %s", err)
	}
	if !active {
		t.Error("expected running pod for active-reader")
	}

	identity, err = sts.NewRoleIdentity(arnResolver, "reader", "", "4321")
	if err != nil {
		t.Fatalf("unexpected error building identity: %s", err)
	}
	active, err = c.IsActivePodsForRole(identity)
	if err != nil {
		t.Fatalf("unexpected error checking active pods: %s", err)
	}
	if active {
		t.Error("expected no active pods for stopped-reader")
	}
}

func BenchmarkFindPodsByIP(b *testing.B) {
b.StopTimer()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

source := kt.NewFakeControllerSource()
c := NewPodCache(source, time.Second, bufferSize)
arnResolver := sts.DefaultResolver("arn:account:")
c := NewPodCache(arnResolver, source, time.Second, bufferSize)
for i := 0; i < 1000; i++ {
source.Add(testutil.NewPodWithRole("ns", fmt.Sprintf("name-%d", i), fmt.Sprintf("ip-%d", i), "Running", "foo_role"))
}
Expand All @@ -106,12 +167,14 @@ func BenchmarkIsActiveRole(b *testing.B) {
role := i % 100
source.Add(testutil.NewPodWithRole("ns", fmt.Sprintf("name-%d", i), fmt.Sprintf("ip-%d", i), "Running", fmt.Sprintf("role-%d", role)))
}
c := NewPodCache(source, time.Second, bufferSize)
arnResolver := sts.DefaultResolver("arn:account:")
c := NewPodCache(arnResolver, source, time.Second, bufferSize)
c.Run(ctx)

b.StartTimer()

for n := 0; n < b.N; n++ {
c.IsActivePodsForRole("role-0")
identity, _ := sts.NewRoleIdentity(arnResolver, "role-0", "", "")
c.IsActivePodsForRole(identity)
}
}
6 changes: 4 additions & 2 deletions pkg/k8s/testing/stub_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ package testing

import (
"context"

"github.com/uswitch/kiam/pkg/aws/sts"
"github.com/uswitch/kiam/pkg/k8s"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)

func NewStubFinder(pod *v1.Pod) *StubFinder {
Expand Down Expand Up @@ -51,7 +53,7 @@ func (f *stubAnnouncer) Pods() <-chan *v1.Pod {
return f.pods
}

func (f *stubAnnouncer) IsActivePodsForRole(role string) (bool, error) {
// IsActivePodsForRole satisfies the PodAnnouncer interface for tests; the
// stub unconditionally reports that active pods exist for any identity.
func (f *stubAnnouncer) IsActivePodsForRole(identity *sts.RoleIdentity) (bool, error) {
	return true, nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/prefetch/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (m *CredentialManager) Run(ctx context.Context, parallelRoutines int) {
func (m *CredentialManager) handleExpiring(ctx context.Context, credentials *sts.CachedCredentials) {
logger := log.WithFields(sts.CredentialsFields(credentials.Identity, credentials.Credentials))

active, err := m.IsRoleActive(credentials.Identity.Role.Name)
active, err := m.IsRoleActive(credentials.Identity)
if err != nil {
logger.Errorf("error checking whether role active: %s", err.Error())
return
Expand All @@ -104,6 +104,6 @@ func (m *CredentialManager) handleExpiring(ctx context.Context, credentials *sts
}
}

func (m *CredentialManager) IsRoleActive(role string) (bool, error) {
return m.announcer.IsActivePodsForRole(role)
// IsRoleActive reports whether the announcer still knows of uncompleted
// pods using the given role identity. It is consulted when cached
// credentials are expiring, to decide whether they should be renewed.
func (m *CredentialManager) IsRoleActive(identity *sts.RoleIdentity) (bool, error) {
	return m.announcer.IsActivePodsForRole(identity)
}
12 changes: 9 additions & 3 deletions pkg/server/server_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"time"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
log "github.com/sirupsen/logrus"
"github.com/uswitch/k8sc/official"
Expand All @@ -33,8 +36,6 @@ import (
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"net"
"time"
)

// KiamServerBuilder helps construct the KiamServer
Expand Down Expand Up @@ -109,7 +110,12 @@ func (b *KiamServerBuilder) WithKubernetesClient() (*KiamServerBuilder, error) {
return nil, err
}

podCache := k8s.NewPodCache(k8s.NewListWatch(client, k8s.ResourcePods), b.config.PodSyncInterval, b.config.PrefetchBufferSize)
arnResolver, err := newRoleARNResolver(b.config)
if err != nil {
return nil, err
}

podCache := k8s.NewPodCache(arnResolver, k8s.NewListWatch(client, k8s.ResourcePods), b.config.PodSyncInterval, b.config.PrefetchBufferSize)
nsCache := k8s.NewNamespaceCache(k8s.NewListWatch(client, k8s.ResourceNamespaces), time.Minute)

b.WithCaches(podCache, nsCache)
Expand Down
8 changes: 5 additions & 3 deletions pkg/server/server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package server

import (
"context"
"testing"
"time"

"github.com/fortytw2/leaktest"
"github.com/uswitch/kiam/pkg/aws/sts"
"github.com/uswitch/kiam/pkg/k8s"
"google.golang.org/grpc"
kt "k8s.io/client-go/tools/cache/testing"
"testing"
"time"
)

const (
Expand Down Expand Up @@ -120,7 +122,7 @@ func newTestServer(ctx context.Context) (*KiamServer, *kt.FakeControllerSource,
source := kt.NewFakeControllerSource()
defer source.Shutdown()

podCache := k8s.NewPodCache(source, time.Second, defaultBuffer)
podCache := k8s.NewPodCache(sts.DefaultResolver("arn:account:"), source, time.Second, defaultBuffer)
podCache.Run(ctx)
namespaceCache := k8s.NewNamespaceCache(source, time.Second)
namespaceCache.Run(ctx)
Expand Down
14 changes: 7 additions & 7 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestReturnsErrorWhenPodNotFound(t *testing.T) {
source := kt.NewFakeControllerSource()
defer source.Shutdown()

podCache := k8s.NewPodCache(source, time.Second, defaultBuffer)
podCache := k8s.NewPodCache(sts.DefaultResolver("arn:account:"), source, time.Second, defaultBuffer)
server := &KiamServer{pods: podCache}

_, err := server.GetPodCredentials(context.Background(), &pb.GetPodCredentialsRequest{})
Expand All @@ -61,7 +61,7 @@ func TestReturnsPolicyErrorWhenForbidden(t *testing.T) {
defer source.Shutdown()
source.Add(testutil.NewPodWithRole("ns", "name", "192.168.0.1", "Running", "running_role"))

podCache := k8s.NewPodCache(source, time.Second, defaultBuffer)
podCache := k8s.NewPodCache(sts.DefaultResolver("arn:account:"), source, time.Second, defaultBuffer)
podCache.Run(ctx)
server := &KiamServer{pods: podCache, assumePolicy: &forbidPolicy{}, arnResolver: sts.DefaultResolver("prefix")}

Expand All @@ -81,7 +81,7 @@ func TestReturnsAnnotatedPodRole(t *testing.T) {
defer source.Shutdown()
source.Add(testutil.NewPodWithRole("ns", "name", "192.168.0.1", "Running", "running_role"))

podCache := k8s.NewPodCache(source, time.Second, defaultBuffer)
podCache := k8s.NewPodCache(sts.DefaultResolver("arn:account:"), source, time.Second, defaultBuffer)
podCache.Run(ctx)

server := &KiamServer{pods: podCache, assumePolicy: &allowPolicy{}, credentialsProvider: &stubCredentialsProvider{accessKey: "A1234"}}
Expand All @@ -102,7 +102,7 @@ func TestReturnsErrorFromGetPodRoleWhenPodNotFound(t *testing.T) {
defer source.Shutdown()
source.Add(testutil.NewPodWithRole("ns", "name", "192.168.0.1", "Running", "running_role"))

podCache := k8s.NewPodCache(source, time.Second, defaultBuffer)
podCache := k8s.NewPodCache(sts.DefaultResolver("arn:account:"), source, time.Second, defaultBuffer)
podCache.Run(ctx)

server := &KiamServer{pods: podCache, assumePolicy: &allowPolicy{}, credentialsProvider: &stubCredentialsProvider{accessKey: "A1234"}}
Expand All @@ -126,7 +126,7 @@ func TestReturnsCredentials(t *testing.T) {
defer source.Shutdown()
source.Add(testutil.NewPodWithRole("ns", "name", "192.168.0.1", "Running", roleName))

podCache := k8s.NewPodCache(source, time.Second, defaultBuffer)
podCache := k8s.NewPodCache(sts.DefaultResolver("arn:account:"), source, time.Second, defaultBuffer)
podCache.Run(ctx)
server := &KiamServer{pods: podCache, assumePolicy: &allowPolicy{}, credentialsProvider: &stubCredentialsProvider{accessKey: "A1234"}, arnResolver: sts.DefaultResolver("prefix")}

Expand Down Expand Up @@ -158,7 +158,7 @@ func TestGetPodCredentialsWithSessionName(t *testing.T) {
source.Add(testutil.NewPodWithSessionName("ns", "name", "192.168.0.1", "Running", roleName, sessionName))

credentialsProvider := stubCredentialsProvider{accessKey: "A1234"}
podCache := k8s.NewPodCache(source, time.Second, defaultBuffer)
podCache := k8s.NewPodCache(sts.DefaultResolver("arn:account:"), source, time.Second, defaultBuffer)
podCache.Run(ctx)
server := &KiamServer{pods: podCache, assumePolicy: &allowPolicy{}, credentialsProvider: &credentialsProvider, arnResolver: sts.DefaultResolver("prefix")}

Expand Down Expand Up @@ -187,7 +187,7 @@ func TestGetPodCredentialsWithExternalID(t *testing.T) {
source.Add(testutil.NewPodWithExternalID("ns", "name", "192.168.0.1", "Running", roleName, externalID))

credentialsProvider := stubCredentialsProvider{accessKey: "A1234"}
podCache := k8s.NewPodCache(source, time.Second, defaultBuffer)
podCache := k8s.NewPodCache(sts.DefaultResolver("arn:account:"), source, time.Second, defaultBuffer)
podCache.Run(ctx)
server := &KiamServer{pods: podCache, assumePolicy: &allowPolicy{}, credentialsProvider: &credentialsProvider, arnResolver: sts.DefaultResolver("prefix")}

Expand Down

0 comments on commit 69d4878

Please sign in to comment.