From 91dce0c63bc25a157dd6df65d2de57e5d01ca02b Mon Sep 17 00:00:00 2001 From: mbohlool Date: Wed, 30 Nov 2016 03:33:38 -0800 Subject: [PATCH] - Add context switch to kube config loader - Refactor kube config loader to be able to test it - Add test for kube config loader --- examples/example4.py | 62 ++++ kubernetes/config/__init__.py | 1 + kubernetes/config/kube_config.py | 326 +++++++++++------- kubernetes/config/kube_config_test.py | 469 ++++++++++++++++++++++++++ 4 files changed, 741 insertions(+), 117 deletions(-) create mode 100644 examples/example4.py create mode 100644 kubernetes/config/kube_config_test.py diff --git a/examples/example4.py b/examples/example4.py new file mode 100644 index 0000000000..af309b191e --- /dev/null +++ b/examples/example4.py @@ -0,0 +1,62 @@ +# Copyright 2016 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from kubernetes import client, config +from kubernetes.client import configuration + + +def main(): + config_file = os.path.join(os.path.expanduser('~'), '.kube', 'config') + contexts, active_context = config.list_kube_config_contexts(config_file) + if not contexts: + print("Cannot find any context in kube-config file.") + return + contexts = [context['name'] for context in contexts] + active_context = active_context['name'] + for i, context in enumerate(contexts): + format_str = "%d. %s" + if context == active_context: + format_str = "* " + format_str + print(format_str % (i, context)) + context = input("Enter context number: ") + context = int(context) + if context not in range(len(contexts)): + print( + "Number out of range. Using default context %s." % + active_context) + context_name = active_context + else: + context_name = contexts[context] + + # Configs can be set in Configuration class directly or using helper + # utility + config.load_kube_config(config_file, context_name) + + print("Active host is %s" % configuration.host) + + v1 = client.CoreV1Api() + print("Listing pods with their IPs:") + ret = v1.list_pod_for_all_namespaces(watch=False) + for item in ret.items: + print( + "%s\t%s\t%s" % + (item.status.pod_ip, + item.metadata.namespace, + item.metadata.name)) + + +if __name__ == '__main__': + main() diff --git a/kubernetes/config/__init__.py b/kubernetes/config/__init__.py index e52d8a4a6e..92632043c1 100644 --- a/kubernetes/config/__init__.py +++ b/kubernetes/config/__init__.py @@ -14,4 +14,5 @@ from .config_exception import ConfigException from .incluster_config import load_incluster_config +from .kube_config import list_kube_config_contexts from .kube_config import load_kube_config diff --git a/kubernetes/config/kube_config.py b/kubernetes/config/kube_config.py index cc1c483de5..e7e593aa19 100644 --- a/kubernetes/config/kube_config.py +++ b/kubernetes/config/kube_config.py @@ -22,151 +22,243 @@ from kubernetes.client import configuration from oauth2client.client import GoogleCredentials -_temp_files = [] +from .config_exception import ConfigException + +_temp_files = {} def _cleanup_temp_files(): - for f in _temp_files: - os.remove(f) + for temp_file in _temp_files.values(): + os.remove(temp_file) def _create_temp_file_with_content(content): if len(_temp_files) == 0: atexit.register(_cleanup_temp_files) + # Because we may change context several times, try to remember files we + # created and reuse them at a small memory cost. + content_key = str(content) + if content_key in _temp_files: + return _temp_files[content_key] _, name = tempfile.mkstemp() - _temp_files.append(name) - if isinstance(content, str): - content = content.encode('utf8') + _temp_files[content_key] = name with open(name, 'wb') as fd: - fd.write(base64.decodestring(content)) + fd.write(content.encode() if isinstance(content, str) else content) return name -def _file_from_file_or_data(o, file_key_name, data_key_name=None): - if not data_key_name: - data_key_name = file_key_name + "-data" - if data_key_name in o: - return _create_temp_file_with_content(o[data_key_name]) - if file_key_name in o: - return o[file_key_name] - - -def _data_from_file_or_data(o, file_key_name, data_key_name=None): - if not data_key_name: - data_key_name = file_key_name + "_data" - if data_key_name in o: - return o[data_key_name] - if file_key_name in o: - with open(o[file_key_name], 'r') as f: - data = f.read() - return data - - -def _load_gcp_token(user): - if 'auth-provider' not in user: - return - if 'name' not in user['auth-provider']: - return - if user['auth-provider']['name'] != 'gcp': - return - # Ignore configs in auth-provider and rely on GoogleCredentials - # caching and refresh mechanism. - # TODO: support gcp command based token ("cmd-path" config). - return (GoogleCredentials - .get_application_default() - .get_access_token() - .access_token) - - -def _load_authentication(user): - """Read authentication from kube-config user section. - - This function goes through various authetication methods in user section of - kubeconfig and stops if it founds a valid authentication method. The order - of authentication methods is: - - 1. GCP auth-provider - 2. token_data - 3. token field (point to a token file) - 4. username/password - """ - # Read authentication - token = _load_gcp_token(user) - if not token: - token = _data_from_file_or_data(user, 'tokenFile', 'token') - if token: - configuration.api_key['authorization'] = "bearer " + token - else: - if 'username' in user and 'password' in user: - configuration.api_key['authorization'] = urllib3.util.make_headers( - basic_auth=user['username'] + ':' + - user['password']).get('authorization') - - -def _load_cluster_info(cluster, user): - """Loads cluster information from kubeconfig such as host and SSL certs.""" - if 'server' in cluster: - configuration.host = cluster['server'] - if configuration.host.startswith("https"): - configuration.ssl_ca_cert = _file_from_file_or_data( - cluster, 'certificate-authority') - configuration.cert_file = _file_from_file_or_data( - user, 'client-certificate') - configuration.key_file = _file_from_file_or_data( - user, 'client-key') - - -class _node: - """Remembers each key's path and construct a relevant exception message - in case of missing keys.""" +class FileOrData(object): + """Utility class to read content of obj[%data_key_name] or file's + content of obj[%file_key_name] and represent it as file or data. + Note that the data is preferred. The obj[%file_key_name] will be used iff + obj['%data_key_name'] is not set or empty. Assumption is file content is + raw data and data field is base64 string.""" + + def __init__(self, obj, file_key_name, data_key_name=None): + if not data_key_name: + data_key_name = file_key_name + "-data" + self._file = None + self._data = None + if data_key_name in obj: + self._data = obj[data_key_name] + elif file_key_name in obj: + self._file = obj[file_key_name] + + def as_file(self): + """If obj[%data_key_name] exists, return name of a file with base64 + decoded obj[%data_key_name] content otherwise obj[%file_key_name].""" + use_data_if_no_file = not self._file and self._data + if use_data_if_no_file: + self._file = _create_temp_file_with_content( + base64.decodestring(self._data.encode())) + return self._file + + def as_data(self): + """If obj[%data_key_name] exists, Return obj[%data_key_name] otherwise + base64 encoded string of obj[%file_key_name] file content.""" + use_file_if_no_data = not self._data and self._file + if use_file_if_no_data: + with open(self._file) as f: + self._data = bytes.decode( + base64.encodestring(str.encode(f.read()))) + return self._data + + +class KubeConfigLoader(object): + + def __init__(self, config_dict, active_context=None, + get_google_credentials=None, client_configuration=None): + self._config = ConfigNode('kube-config', config_dict) + self._current_context = None + self._user = None + self._cluster = None + self.set_active_context(active_context) + if get_google_credentials: + self._get_google_credentials = get_google_credentials + else: + self._get_google_credentials = lambda: ( + GoogleCredentials.get_application_default() + .get_access_token().access_token) + if client_configuration: + self._client_configuration = client_configuration + else: + self._client_configuration = configuration + + def set_active_context(self, context_name=None): + if context_name is None: + context_name = self._config['current-context'] + self._current_context = self._config['contexts'].get_with_name( + context_name) + if self._current_context['context'].safe_get('user'): + self._user = self._config['users'].get_with_name( + self._current_context['context']['user'])['user'] + else: + self._user = None + self._cluster = self._config['clusters'].get_with_name( + self._current_context['context']['cluster'])['cluster'] + + def _load_authentication(self): + """Read authentication from kube-config user section if exists. + + This function goes through various authentication methods in user + section of kube-config and stops if it finds a valid authentication + method. The order of authentication methods is: + + 1. GCP auth-provider + 2. token_data + 3. token field (point to a token file) + 4. username/password + """ + if not self._user: + return + if self._load_gcp_token(): + return + if self._load_user_token(): + return + self._load_user_pass_token() + + def _load_gcp_token(self): + if 'auth-provider' not in self._user: + return + if 'name' not in self._user['auth-provider']: + return + if self._user['auth-provider']['name'] != 'gcp': + return + # Ignore configs in auth-provider and rely on GoogleCredentials + # caching and refresh mechanism. + # TODO: support gcp command based token ("cmd-path" config). + self.token = self._get_google_credentials() + return self.token + + def _load_user_token(self): + token = FileOrData(self._user, 'tokenFile', 'token').as_data() + if token: + self.token = token + return True + + def _load_user_pass_token(self): + if 'username' in self._user and 'password' in self._user: + self.token = urllib3.util.make_headers( + basic_auth=(self._user['username'] + ':' + + self._user['password'])).get('authorization') + return True + + def _load_cluster_info(self): + if 'server' in self._cluster: + self.host = self._cluster['server'] + if self.host.startswith("https"): + self.ssl_ca_cert = FileOrData( + self._cluster, 'certificate-authority').as_file() + self.cert_file = FileOrData( + self._user, 'client-certificate').as_file() + self.key_file = FileOrData(self._user, 'client-key').as_file() + + def _set_config(self): + if 'token' in self.__dict__: + self._client_configuration.api_key['authorization'] = self.token + # copy these keys directly from self to configuration object + keys = ['host', 'ssl_ca_cert', 'cert_file', 'key_file'] + for key in keys: + if key in self.__dict__: + setattr(self._client_configuration, key, getattr(self, key)) + + def load_and_set(self): + self._load_authentication() + self._load_cluster_info() + self._set_config() + + def list_contexts(self): + return [context.value for context in self._config['contexts']] + + @property + def current_context(self): + return self._current_context.value + + +class ConfigNode(object): + """Remembers each config key's path and construct a relevant exception + message in case of missing keys. The assumption is all access keys are + present in a well-formed kube-config.""" def __init__(self, name, value): - self._name = name - self._value = value + self.name = name + self.value = value def __contains__(self, key): - return key in self._value + return key in self.value + + def __len__(self): + return len(self.value) + + def safe_get(self, key): + if (isinstance(self.value, list) and isinstance(key, int) or + key in self.value): + return self.value[key] def __getitem__(self, key): - if key in self._value: - v = self._value[key] - if isinstance(v, dict) or isinstance(v, list): - return _node('%s/%s' % (self._name, key), v) - else: - return v - raise Exception( - 'Invalid kube-config file. Expected key %s in %s' - % (key, self._name)) + v = self.safe_get(key) + if not v: + raise ConfigException( + 'Invalid kube-config file. Expected key %s in %s' + % (key, self.name)) + if isinstance(v, dict) or isinstance(v, list): + return ConfigNode('%s/%s' % (self.name, key), v) + else: + return v def get_with_name(self, name): - if not isinstance(self._value, list): - raise Exception( + if not isinstance(self.value, list): + raise ConfigException( 'Invalid kube-config file. Expected %s to be a list' - % self._name) - for v in self._value: + % self.name) + for v in self.value: if 'name' not in v: - raise Exception( + raise ConfigException( 'Invalid kube-config file. ' 'Expected all values in %s list to have \'name\' key' - % self._name) + % self.name) if v['name'] == name: - return _node('%s[name=%s]' % (self._name, name), v) - raise Exception( - "Cannot find object with name %s in %s list" % (name, self._name)) + return ConfigNode('%s[name=%s]' % (self.name, name), v) + raise ConfigException( + 'Invalid kube-config file. ' + 'Expected object with name %s in %s list' % (name, self.name)) -def load_kube_config(config_file): - """Loads authentication and cluster information from kube-config file - and store them in kubernetes.client.configuration.""" - +def list_kube_config_contexts(config_file): with open(config_file) as f: - config = _node('kube-config', yaml.load(f)) + loader = KubeConfigLoader(config_dict=yaml.load(f)) + return loader.list_contexts(), loader.current_context - current_context = config['contexts'].get_with_name( - config['current-context'])['context'] - user = config['users'].get_with_name(current_context['user'])['user'] - cluster = config['clusters'].get_with_name( - current_context['cluster'])['cluster'] - _load_cluster_info(cluster, user) - _load_authentication(user) +def load_kube_config(config_file, context=None): + """Loads authentication and cluster information from kube-config file + and stores them in kubernetes.client.configuration. + + :param config_file: Name of the kube-config file. + :param context: set the active context. If is set to None, current_context + from config file will be used. + """ + + with open(config_file) as f: + KubeConfigLoader( + config_dict=yaml.load(f), active_context=context).load_and_set() diff --git a/kubernetes/config/kube_config_test.py b/kubernetes/config/kube_config_test.py new file mode 100644 index 0000000000..6c4cd24309 --- /dev/null +++ b/kubernetes/config/kube_config_test.py @@ -0,0 +1,469 @@ +# Copyright 2016 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import os +import tempfile +import unittest + +from .config_exception import ConfigException +from .kube_config import (ConfigNode, FileOrData, KubeConfigLoader, + _create_temp_file_with_content) + + +def _base64(string): + return base64.encodestring(string.encode()).decode() + + +TEST_FILE_KEY = "file" +TEST_DATA_KEY = "data" +TEST_FILENAME = "test-filename" + +TEST_DATA = "test-data" +TEST_DATA_BASE64 = _base64(TEST_DATA) + +TEST_ANOTHER_DATA = "another-test-data" +TEST_ANOTHER_DATA_BASE64 = _base64(TEST_ANOTHER_DATA) + +TEST_HOST = "test-host" +TEST_USERNAME = "me" +TEST_PASSWORD = "pass" +# token for me:pass +TEST_BASIC_TOKEN = "Basic bWU6cGFzcw==" + +TEST_SSL_HOST = "https://test-host" +TEST_CERTIFICATE_AUTH = "cert-auth" +TEST_CERTIFICATE_AUTH_BASE64 = _base64(TEST_CERTIFICATE_AUTH) +TEST_CLIENT_KEY = "client-key" +TEST_CLIENT_KEY_BASE64 = _base64(TEST_CLIENT_KEY) +TEST_CLIENT_CERT = "client-cert" +TEST_CLIENT_CERT_BASE64 = _base64(TEST_CLIENT_CERT) + + +class BaseTestCase(unittest.TestCase): + + def setUp(self): + self._temp_files = [] + + def tearDown(self): + for f in self._temp_files: + os.remove(f) + + def _create_temp_file(self, content=""): + handler, name = tempfile.mkstemp() + self._temp_files.append(name) + os.write(handler, str.encode(content)) + os.close(handler) + return name + + +class TestFileOrData(BaseTestCase): + + @staticmethod + def get_file_content(filename): + with open(filename) as f: + return f.read() + + def test_file_given_file(self): + obj = {TEST_FILE_KEY: TEST_FILENAME} + t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY) + self.assertEqual(TEST_FILENAME, t.as_file()) + + def test_file_given_data(self): + obj = {TEST_DATA_KEY: TEST_DATA_BASE64} + t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY, + data_key_name=TEST_DATA_KEY) + self.assertEqual(TEST_DATA, self.get_file_content(t.as_file())) + + def test_data_given_data(self): + obj = {TEST_DATA_KEY: TEST_DATA_BASE64} + t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY, + data_key_name=TEST_DATA_KEY) + self.assertEqual(TEST_DATA_BASE64, t.as_data()) + + def test_data_given_file(self): + obj = { + TEST_FILE_KEY: self._create_temp_file(content=TEST_DATA)} + t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY) + self.assertEqual(TEST_DATA_BASE64, t.as_data()) + + def test_data_given_file_and_data(self): + obj = { + TEST_DATA_KEY: TEST_DATA_BASE64, + TEST_FILE_KEY: self._create_temp_file( + content=TEST_ANOTHER_DATA)} + t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY, + data_key_name=TEST_DATA_KEY) + self.assertEqual(TEST_DATA_BASE64, t.as_data()) + + def test_file_given_file_and_data(self): + obj = { + TEST_DATA_KEY: TEST_DATA_BASE64, + TEST_FILE_KEY: self._create_temp_file( + content=TEST_ANOTHER_DATA)} + t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY, + data_key_name=TEST_DATA_KEY) + self.assertEqual(TEST_DATA, self.get_file_content(t.as_file())) + + def test_create_temp_file_with_content(self): + self.assertEqual(TEST_DATA, + self.get_file_content( + _create_temp_file_with_content(TEST_DATA))) + + +class TestConfigNode(BaseTestCase): + + test_obj = {"key1": "test", "key2": ["a", "b", "c"], + "key3": {"inner_key": "inner_value"}, + "with_names": [{"name": "test_name", "value": "test_value"}, + {"name": "test_name2", + "value": {"key1", "test"}}, + {"name": "test_name3", "value": [1, 2, 3]}]} + + def setUp(self): + super(TestConfigNode, self).setUp() + self.node = ConfigNode("test_obj", self.test_obj) + + def test_normal_map_array_operations(self): + self.assertEqual("test", self.node['key1']) + self.assertEqual(4, len(self.node)) + + self.assertEqual("test_obj/key2", self.node['key2'].name) + self.assertEqual(["a", "b", "c"], self.node['key2'].value) + self.assertEqual("b", self.node['key2'][1]) + self.assertEqual(3, len(self.node['key2'])) + + self.assertEqual("test_obj/key3", self.node['key3'].name) + self.assertEqual({"inner_key": "inner_value"}, self.node['key3'].value) + self.assertEqual("inner_value", self.node['key3']["inner_key"]) + self.assertEqual(1, len(self.node['key3'])) + + def test_get_with_name(self): + node = self.node["with_names"] + self.assertEqual( + "test_value", + node.get_with_name("test_name")["value"]) + self.assertTrue( + isinstance(node.get_with_name("test_name2"), ConfigNode)) + self.assertTrue( + isinstance(node.get_with_name("test_name3"), ConfigNode)) + self.assertEqual("test_obj/with_names[name=test_name2]", + node.get_with_name("test_name2").name) + self.assertEqual("test_obj/with_names[name=test_name3]", + node.get_with_name("test_name3").name) + + def expect_exception(self, func, message_part): + with self.assertRaises(ConfigException) as context: + func() + self.assertIn(message_part, str(context.exception)) + + def test_key_does_not_exists(self): + self.expect_exception(lambda: self.node['not-exists-key'], + "Expected key not-exists-key in test_obj") + self.expect_exception(lambda: self.node['key3']['not-exists-key'], + "Expected key not-exists-key in test_obj/key3") + + def test_get_with_name_on_invalid_object(self): + self.expect_exception( + lambda: self.node['key2'].get_with_name('no-name'), + "Expected all values in test_obj/key2 list to have \'name\' key") + + def test_get_with_name_on_non_list_object(self): + self.expect_exception( + lambda: self.node['key3'].get_with_name('no-name'), + "Expected test_obj/key3 to be a list") + + def test_get_with_name_on_name_does_not_exists(self): + self.expect_exception( + lambda: self.node['with_names'].get_with_name('no-name'), + "Expected object with name no-name in test_obj/with_names list") + + +class FakeConfig: + + FILE_KEYS = ["ssl_ca_cert", "key_file", "cert_file"] + + def __init__(self, token=None, **kwargs): + self.api_key = {} + if token: + self.api_key['authorization'] = token + + self.__dict__.update(kwargs) + + def __eq__(self, other): + if len(self.__dict__) != len(other.__dict__): + return + for k, v in self.__dict__.items(): + if k not in other.__dict__: + return + if k in self.FILE_KEYS: + try: + with open(v) as f1, open(other.__dict__[k]) as f2: + if f1.read() != f2.read(): + return + except IOError: + # fall back to only compare filenames in case we are + # testing the passing of filenames to the config + if other.__dict__[k] != v: + return + else: + if other.__dict__[k] != v: + return + return True + + def __repr__(self): + rep = "\n" + for k, v in self.__dict__.items(): + val = v + if k in self.FILE_KEYS: + try: + with open(v) as f: + val = "FILE: %s" % str.decode(f.read()) + except IOError as e: + val = "ERROR: %s" % str(e) + rep += "\t%s: %s\n" % (k, val) + return "Config(%s\n)" % rep + + +class TestKubeConfigLoader(BaseTestCase): + TEST_KUBE_CONFIG = { + "current-context": "no_user", + "contexts": [ + { + "name": "no_user", + "context": { + "cluster": "default" + } + }, + { + "name": "simple_token", + "context": { + "cluster": "default", + "user": "simple_token" + } + }, + { + "name": "gcp", + "context": { + "cluster": "default", + "user": "gcp" + } + }, + { + "name": "user_pass", + "context": { + "cluster": "default", + "user": "user_pass" + } + }, + { + "name": "ssl", + "context": { + "cluster": "ssl", + "user": "ssl" + } + }, + { + "name": "ssl-no_file", + "context": { + "cluster": "ssl-no_file", + "user": "ssl-no_file" + } + }, + ], + "clusters": [ + { + "name": "default", + "cluster": { + "server": TEST_HOST + } + }, + { + "name": "ssl-no_file", + "cluster": { + "server": TEST_SSL_HOST, + "certificate-authority": TEST_CERTIFICATE_AUTH, + } + }, + { + "name": "ssl", + "cluster": { + "server": TEST_SSL_HOST, + "certificate-authority-data": TEST_CERTIFICATE_AUTH_BASE64, + } + }, + ], + "users": [ + { + "name": "simple_token", + "user": { + "token": TEST_DATA_BASE64, + "username": TEST_USERNAME, # should be ignored + "password": TEST_PASSWORD, # should be ignored + } + }, + { + "name": "gcp", + "user": { + "auth-provider": { + "name": "gcp", + "access_token": "not_used", + }, + "token": TEST_DATA_BASE64, # should be ignored + "username": TEST_USERNAME, # should be ignored + "password": TEST_PASSWORD, # should be ignored + } + }, + { + "name": "user_pass", + "user": { + "username": TEST_USERNAME, # should be ignored + "password": TEST_PASSWORD, # should be ignored + } + }, + { + "name": "ssl-no_file", + "user": { + "token": TEST_DATA_BASE64, + "client-certificate": TEST_CLIENT_CERT, + "client-key": TEST_CLIENT_KEY, + } + }, + { + "name": "ssl", + "user": { + "token": TEST_DATA_BASE64, + "client-certificate-data": TEST_CLIENT_CERT_BASE64, + "client-key-data": TEST_CLIENT_KEY_BASE64, + } + }, + ] + } + + def test_no_user_context(self): + expected = FakeConfig(host=TEST_HOST) + actual = FakeConfig() + KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="no_user", + client_configuration=actual).load_and_set() + self.assertEqual(expected, actual) + + def test_simple_token(self): + expected = FakeConfig(host=TEST_HOST, token=TEST_DATA_BASE64) + actual = FakeConfig() + KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="simple_token", + client_configuration=actual).load_and_set() + self.assertEqual(expected, actual) + + def test_load_user_token(self): + loader = KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="simple_token") + self.assertTrue(loader._load_user_token()) + self.assertEqual(TEST_DATA_BASE64, loader.token) + + def test_gcp(self): + expected = FakeConfig(host=TEST_HOST, token=TEST_ANOTHER_DATA_BASE64) + actual = FakeConfig() + KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="gcp", + client_configuration=actual, + get_google_credentials=lambda: TEST_ANOTHER_DATA_BASE64) \ + .load_and_set() + self.assertEqual(expected, actual) + + def test_load_gcp_token(self): + loader = KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="gcp", + get_google_credentials=lambda: TEST_ANOTHER_DATA_BASE64) + self.assertTrue(loader._load_gcp_token()) + self.assertEqual(TEST_ANOTHER_DATA_BASE64, loader.token) + + def test_user_pass(self): + expected = FakeConfig(host=TEST_HOST, token=TEST_BASIC_TOKEN) + actual = FakeConfig() + KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="user_pass", + client_configuration=actual).load_and_set() + self.assertEqual(expected, actual) + + def test_load_user_pass_token(self): + loader = KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="user_pass") + self.assertTrue(loader._load_user_pass_token()) + self.assertEqual(TEST_BASIC_TOKEN, loader.token) + + def test_ssl_no_cert_files(self): + expected = FakeConfig( + host=TEST_SSL_HOST, + token=TEST_DATA_BASE64, + cert_file=TEST_CLIENT_CERT, + key_file=TEST_CLIENT_KEY, + ssl_ca_cert=TEST_CERTIFICATE_AUTH + ) + actual = FakeConfig() + KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="ssl-no_file", + client_configuration=actual).load_and_set() + self.assertEqual(expected, actual) + + def test_ssl(self): + expected = FakeConfig( + host=TEST_SSL_HOST, + token=TEST_DATA_BASE64, + cert_file=self._create_temp_file(TEST_CLIENT_CERT), + key_file=self._create_temp_file(TEST_CLIENT_KEY), + ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH) + ) + actual = FakeConfig() + KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="ssl", + client_configuration=actual).load_and_set() + self.assertEqual(expected, actual) + + def test_list_contexts(self): + loader = KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="no_user") + actual_contexts = loader.list_contexts() + expected_contexts = ConfigNode("", self.TEST_KUBE_CONFIG)['contexts'] + for actual in actual_contexts: + expected = expected_contexts.get_with_name(actual['name']) + self.assertEqual(expected.value, actual) + + def test_current_context(self): + loader = KubeConfigLoader(config_dict=self.TEST_KUBE_CONFIG) + expected_contexts = ConfigNode("", self.TEST_KUBE_CONFIG)['contexts'] + self.assertEqual(expected_contexts.get_with_name("no_user").value, + loader.current_context) + + def test_set_active_context(self): + loader = KubeConfigLoader(config_dict=self.TEST_KUBE_CONFIG) + loader.set_active_context("ssl") + expected_contexts = ConfigNode("", self.TEST_KUBE_CONFIG)['contexts'] + self.assertEqual(expected_contexts.get_with_name("ssl").value, + loader.current_context) + + +if __name__ == '__main__': + unittest.main()