Giter Club home page Giter Club logo

ethernodes-org / -usr-bin-env-python-copyright-2018-the-kubernetes-authors.-licensed-under-the-apache-licens Goto Github PK

View Code? Open in Web Editor NEW
2.0 2.0 3.0 9 KB

#!/usr/bin/env python # Copyright 2018 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import base64 import datetime import json import os import shutil import tempfile import unittest import mock import yaml from six import PY3, next from kubernetes.client import Configuration from .config_exception import ConfigException from .kube_config import (ENV_KUBECONFIG_PATH_SEPARATOR, ConfigNode, FileOrData, KubeConfigLoader, KubeConfigMerger, _cleanup_temp_files, _create_temp_file_with_content, list_kube_config_contexts, load_kube_config, new_client_from_config) BEARER_TOKEN_FORMAT = "Bearer %s" EXPIRY_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" # should be less than kube_config.EXPIRY_SKEW_PREVENTION_DELAY PAST_EXPIRY_TIMEDELTA = 2 # should be more than kube_config.EXPIRY_SKEW_PREVENTION_DELAY FUTURE_EXPIRY_TIMEDELTA = 60 NON_EXISTING_FILE = "zz_non_existing_file_472398324" def _base64(string): return base64.standard_b64encode(string.encode()).decode() def _urlsafe_unpadded_b64encode(string): return base64.urlsafe_b64encode(string.encode()).decode().rstrip("=") def _format_expiry_datetime(dt): return dt.strftime(EXPIRY_DATETIME_FORMAT) def _get_expiry(loader, active_context): expired_gcp_conf = ( item for item in loader._config.value.get("users") if item.get("name") == active_context) return next(expired_gcp_conf).get("user").get("auth-provider") \ .get("config").get("expiry") def _raise_exception(st): raise Exception(st) TEST_FILE_KEY = "file" TEST_DATA_KEY = "data" TEST_FILENAME = "test-filename" TEST_DATA = "test-data" TEST_DATA_BASE64 = _base64(TEST_DATA) TEST_ANOTHER_DATA = "another-test-data" TEST_ANOTHER_DATA_BASE64 = _base64(TEST_ANOTHER_DATA) TEST_HOST = "test-host" TEST_USERNAME = "me" TEST_PASSWORD = "pass" # token for me:pass TEST_BASIC_TOKEN = "Basic bWU6cGFzcw==" DATETIME_EXPIRY_PAST = datetime.datetime.utcnow() - datetime.timedelta( minutes=PAST_EXPIRY_TIMEDELTA) DATETIME_EXPIRY_FUTURE = datetime.datetime.utcnow() + datetime.timedelta( minutes=FUTURE_EXPIRY_TIMEDELTA) TEST_TOKEN_EXPIRY_PAST = _format_expiry_datetime(DATETIME_EXPIRY_PAST) TEST_SSL_HOST = "https://test-host" TEST_CERTIFICATE_AUTH = "cert-auth" TEST_CERTIFICATE_AUTH_BASE64 = _base64(TEST_CERTIFICATE_AUTH) TEST_CLIENT_KEY = "client-key" TEST_CLIENT_KEY_BASE64 = _base64(TEST_CLIENT_KEY) TEST_CLIENT_CERT = "client-cert" TEST_CLIENT_CERT_BASE64 = _base64(TEST_CLIENT_CERT) TEST_OIDC_TOKEN = "test-oidc-token" TEST_OIDC_INFO = "{\"name\": \"test\"}" TEST_OIDC_BASE = ".".join([ _urlsafe_unpadded_b64encode(TEST_OIDC_TOKEN), _urlsafe_unpadded_b64encode(TEST_OIDC_INFO) ]) TEST_OIDC_LOGIN = ".".join( [TEST_OIDC_BASE, _urlsafe_unpadded_b64encode(TEST_CLIENT_CERT_BASE64)]) TEST_OIDC_TOKEN = "Bearer %s" % TEST_OIDC_LOGIN TEST_OIDC_EXP = "{\"name\": \"test\",\"exp\": 536457600}" TEST_OIDC_EXP_BASE = _urlsafe_unpadded_b64encode( TEST_OIDC_TOKEN) + "." + _urlsafe_unpadded_b64encode(TEST_OIDC_EXP) TEST_OIDC_EXPIRED_LOGIN = ".".join( [TEST_OIDC_EXP_BASE, _urlsafe_unpadded_b64encode(TEST_CLIENT_CERT)]) TEST_OIDC_CONTAINS_RESERVED_CHARACTERS = ".".join([ _urlsafe_unpadded_b64encode(TEST_OIDC_TOKEN), _urlsafe_unpadded_b64encode(TEST_OIDC_INFO).replace("a", "+"), _urlsafe_unpadded_b64encode(TEST_CLIENT_CERT) ]) TEST_OIDC_INVALID_PADDING_LENGTH = ".".join([ _urlsafe_unpadded_b64encode(TEST_OIDC_TOKEN), "aaaaa", _urlsafe_unpadded_b64encode(TEST_CLIENT_CERT) ]) TEST_OIDC_CA = _base64(TEST_CERTIFICATE_AUTH) class BaseTestCase(unittest.TestCase): def setUp(self): self._temp_files = [] def tearDown(self): for f in self._temp_files: os.remove(f) def _create_temp_file(self, content=""): handler, name = tempfile.mkstemp() self._temp_files.append(name) os.write(handler, str.encode(content)) os.close(handler) return name def expect_exception(self, func, message_part, *args, **kwargs): with self.assertRaises(ConfigException) as context: func(*args, **kwargs) self.assertIn(message_part, str(context.exception)) class TestFileOrData(BaseTestCase): @staticmethod def get_file_content(filename): with open(filename) as f: return f.read() def test_file_given_file(self): temp_filename = _create_temp_file_with_content(TEST_DATA) obj = {TEST_FILE_KEY: temp_filename} t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY) self.assertEqual(TEST_DATA, self.get_file_content(t.as_file())) def test_file_given_non_existing_file(self): temp_filename = NON_EXISTING_FILE obj = {TEST_FILE_KEY: temp_filename} t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY) self.expect_exception(t.as_file, "does not exists") def test_file_given_data(self): obj = {TEST_DATA_KEY: TEST_DATA_BASE64} t = FileOrData( obj=obj, file_key_name=TEST_FILE_KEY, data_key_name=TEST_DATA_KEY) self.assertEqual(TEST_DATA, self.get_file_content(t.as_file())) def test_file_given_data_no_base64(self): obj = {TEST_DATA_KEY: TEST_DATA} t = FileOrData( obj=obj, file_key_name=TEST_FILE_KEY, data_key_name=TEST_DATA_KEY, base64_file_content=False) self.assertEqual(TEST_DATA, self.get_file_content(t.as_file())) def test_data_given_data(self): obj = {TEST_DATA_KEY: TEST_DATA_BASE64} t = FileOrData( obj=obj, file_key_name=TEST_FILE_KEY, data_key_name=TEST_DATA_KEY) self.assertEqual(TEST_DATA_BASE64, t.as_data()) def test_data_given_file(self): obj = {TEST_FILE_KEY: self._create_temp_file(content=TEST_DATA)} t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY) self.assertEqual(TEST_DATA_BASE64, t.as_data()) def test_data_given_file_no_base64(self): obj = {TEST_FILE_KEY: self._create_temp_file(content=TEST_DATA)} t = FileOrData( obj=obj, file_key_name=TEST_FILE_KEY, base64_file_content=False) self.assertEqual(TEST_DATA, t.as_data()) def test_data_given_file_and_data(self): obj = { TEST_DATA_KEY: TEST_DATA_BASE64, TEST_FILE_KEY: self._create_temp_file(content=TEST_ANOTHER_DATA) } t = FileOrData( obj=obj, file_key_name=TEST_FILE_KEY, data_key_name=TEST_DATA_KEY) self.assertEqual(TEST_DATA_BASE64, t.as_data()) def test_file_given_file_and_data(self): obj = { TEST_DATA_KEY: TEST_DATA_BASE64, TEST_FILE_KEY: self._create_temp_file(content=TEST_ANOTHER_DATA) } t = FileOrData( obj=obj, file_key_name=TEST_FILE_KEY, data_key_name=TEST_DATA_KEY) self.assertEqual(TEST_DATA, self.get_file_content(t.as_file())) def test_file_with_custom_dirname(self): tempfile = self._create_temp_file(content=TEST_DATA) tempfile_dir = os.path.dirname(tempfile) tempfile_basename = os.path.basename(tempfile) obj = {TEST_FILE_KEY: tempfile_basename} t = FileOrData( obj=obj, file_key_name=TEST_FILE_KEY, file_base_path=tempfile_dir) self.assertEqual(TEST_DATA, self.get_file_content(t.as_file())) def test_create_temp_file_with_content(self): self.assertEqual( TEST_DATA, self.get_file_content(_create_temp_file_with_content(TEST_DATA))) _cleanup_temp_files() def test_file_given_data_bytes(self): obj = {TEST_DATA_KEY: TEST_DATA_BASE64.encode()} t = FileOrData( obj=obj, file_key_name=TEST_FILE_KEY, data_key_name=TEST_DATA_KEY) self.assertEqual(TEST_DATA, self.get_file_content(t.as_file())) def test_file_given_data_bytes_no_base64(self): obj = {TEST_DATA_KEY: TEST_DATA.encode()} t = FileOrData( obj=obj, file_key_name=TEST_FILE_KEY, data_key_name=TEST_DATA_KEY, base64_file_content=False) self.assertEqual(TEST_DATA, self.get_file_content(t.as_file())) class TestConfigNode(BaseTestCase): test_obj = { "key1": "test", "key2": ["a", "b", "c"], "key3": { "inner_key": "inner_value" }, "with_names": [{ "name": "test_name", "value": "test_value" }, { "name": "test_name2", "value": {"key1", "test"} }, { "name": "test_name3", "value": [1, 2, 3] }], "with_names_dup": [{ "name": "test_name", "value": "test_value" }, { "name": "test_name", "value": {"key1", "test"} }, { "name": "test_name3", "value": [1, 2, 3] }] } def setUp(self): super(TestConfigNode, self).setUp() self.node = ConfigNode("test_obj", self.test_obj) def test_normal_map_array_operations(self): self.assertEqual("test", self.node["key1"]) self.assertEqual(5, len(self.node)) self.assertEqual("test_obj/key2", self.node["key2"].name) self.assertEqual(["a", "b", "c"], self.node["key2"].value) self.assertEqual("b", self.node["key2"][1]) self.assertEqual(3, len(self.node["key2"])) self.assertEqual("test_obj/key3", self.node["key3"].name) self.assertEqual({"inner_key": "inner_value"}, self.node["key3"].value) self.assertEqual("inner_value", self.node["key3"]["inner_key"]) self.assertEqual(1, len(self.node["key3"])) def test_get_with_name(self): node = self.node["with_names"] self.assertEqual("test_value", node.get_with_name("test_name")["value"]) self.assertTrue(isinstance(node.get_with_name("test_name2"), ConfigNode)) self.assertTrue(isinstance(node.get_with_name("test_name3"), ConfigNode)) self.assertEqual("test_obj/with_names[name=test_name2]", node.get_with_name("test_name2").name) self.assertEqual("test_obj/with_names[name=test_name3]", node.get_with_name("test_name3").name) def test_key_does_not_exists(self): self.expect_exception(lambda: self.node["not-exists-key"], "Expected key not-exists-key in test_obj") self.expect_exception(lambda: self.node["key3"]["not-exists-key"], "Expected key not-exists-key in test_obj/key3") def test_get_with_name_on_invalid_object(self): self.expect_exception( lambda: self.node["key2"].get_with_name("no-name"), "Expected all values in test_obj/key2 list to have \'name\' key") def test_get_with_name_on_non_list_object(self): self.expect_exception(lambda: self.node["key3"].get_with_name("no-name"), "Expected test_obj/key3 to be a list") def test_get_with_name_on_name_does_not_exists(self): self.expect_exception( lambda: self.node["with_names"].get_with_name("no-name"), "Expected object with name no-name in test_obj/with_names list") def test_get_with_name_on_duplicate_name(self): self.expect_exception( lambda: self.node["with_names_dup"].get_with_name("test_name"), "Expected only one object with name test_name in " "test_obj/with_names_dup list") class FakeConfig: FILE_KEYS = ["ssl_ca_cert", "key_file", "cert_file"] def __init__(self, token=None, **kwargs): self.api_key = {} if token: self.api_key["authorization"] = token self.__dict__.update(kwargs) def __eq__(self, other): if len(self.__dict__) != len(other.__dict__): return for k, v in self.__dict__.items(): if k not in other.__dict__: return if k in self.FILE_KEYS: if v and other.__dict__[k]: try: with open(v) as f1, open(other.__dict__[k]) as f2: if f1.read() != f2.read(): return except IOError: # fall back to only compare filenames in case we are # testing the passing of filenames to the config if other.__dict__[k] != v: return else: if other.__dict__[k] != v: return else: if other.__dict__[k] != v: return return True def __repr__(self): rep = "\n" for k, v in self.__dict__.items(): val = v if k in self.FILE_KEYS: try: with open(v) as f: val = "FILE: %s" % str.decode(f.read()) except IOError as e: val = "ERROR: %s" % str(e) rep += "\t%s: %s\n" % (k, val) return "Config(%s\n)" % rep class TestKubeConfigLoader(BaseTestCase): TEST_KUBE_CONFIG = { "current-context": "no_user", "contexts": [ { "name": "no_user", "context": { "cluster": "default" } }, { "name": "simple_token", "context": { "cluster": "default", "user": "simple_token" } }, { "name": "gcp", "context": { "cluster": "default", "user": "gcp" } }, { "name": "expired_gcp", "context": { "cluster": "default", "user": "expired_gcp" } }, { "name": "expired_gcp_refresh", "context": { "cluster": "default", "user": "expired_gcp_refresh" } }, { "name": "oidc", "context": { "cluster": "default", "user": "oidc" } }, { "name": "expired_oidc", "context": { "cluster": "default", "user": "expired_oidc" } }, { "name": "expired_oidc_nocert", "context": { "cluster": "default", "user": "expired_oidc_nocert" } }, { "name": "oidc_contains_reserved_character", "context": { "cluster": "default", "user": "oidc_contains_reserved_character" } }, { "name": "oidc_invalid_padding_length", "context": { "cluster": "default", "user": "oidc_invalid_padding_length" } }, { "name": "user_pass", "context": { "cluster": "default", "user": "user_pass" } }, { "name": "ssl", "context": { "cluster": "ssl", "user": "ssl" } }, { "name": "no_ssl_verification", "context": { "cluster": "no_ssl_verification", "user": "ssl" } }, { "name": "ssl-no_file", "context": { "cluster": "ssl-no_file", "user": "ssl-no_file" } }, { "name": "ssl-local-file", "context": { "cluster": "ssl-local-file", "user": "ssl-local-file" } }, { "name": "non_existing_user", "context": { "cluster": "default", "user": "non_existing_user" } }, { "name": "exec_cred_user", "context": { "cluster": "default", "user": "exec_cred_user" } }, ], "clusters": [ { "name": "default", "cluster": { "server": TEST_HOST } }, { "name": "ssl-no_file", "cluster": { "server": TEST_SSL_HOST, "certificate-authority": TEST_CERTIFICATE_AUTH, } }, { "name": "ssl-local-file", "cluster": { "server": TEST_SSL_HOST, "certificate-authority": "cert_test", } }, { "name": "ssl", "cluster": { "server": TEST_SSL_HOST, "certificate-authority-data": TEST_CERTIFICATE_AUTH_BASE64, } }, { "name": "no_ssl_verification", "cluster": { "server": TEST_SSL_HOST, "insecure-skip-tls-verify": "true", } }, ], "users": [ { "name": "simple_token", "user": { "token": TEST_DATA_BASE64, "username": TEST_USERNAME, # should be ignored "password": TEST_PASSWORD, # should be ignored } }, { "name": "gcp", "user": { "auth-provider": { "name": "gcp", "config": { "access-token": TEST_DATA_BASE64, } }, "token": TEST_DATA_BASE64, # should be ignored "username": TEST_USERNAME, # should be ignored "password": TEST_PASSWORD, # should be ignored } }, { "name": "expired_gcp", "user": { "auth-provider": { "name": "gcp", "config": { "access-token": TEST_DATA_BASE64, "expiry": TEST_TOKEN_EXPIRY_PAST, # always in past } }, "token": TEST_DATA_BASE64, # should be ignored "username": TEST_USERNAME, # should be ignored "password": TEST_PASSWORD, # should be ignored } }, # Duplicated from "expired_gcp" so test_load_gcp_token_with_refresh # is isolated from test_gcp_get_api_key_with_prefix. { "name": "expired_gcp_refresh", "user": { "auth-provider": { "name": "gcp", "config": { "access-token": TEST_DATA_BASE64, "expiry": TEST_TOKEN_EXPIRY_PAST, # always in past } }, "token": TEST_DATA_BASE64, # should be ignored "username": TEST_USERNAME, # should be ignored "password": TEST_PASSWORD, # should be ignored } }, { "name": "oidc", "user": { "auth-provider": { "name": "oidc", "config": { "id-token": TEST_OIDC_LOGIN } } } }, { "name": "expired_oidc", "user": { "auth-provider": { "name": "oidc", "config": { "client-id": "tectonic-kubectl", "client-secret": "FAKE_SECRET", "id-token": TEST_OIDC_EXPIRED_LOGIN, "idp-certificate-authority-data": TEST_OIDC_CA, "idp-issuer-url": "https://example.org/identity", "refresh-token": "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk" } } } }, { "name": "expired_oidc_nocert", "user": { "auth-provider": { "name": "oidc", "config": { "client-id": "tectonic-kubectl", "client-secret": "FAKE_SECRET", "id-token": TEST_OIDC_EXPIRED_LOGIN, "idp-issuer-url": "https://example.org/identity", "refresh-token": "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk" } } } }, { "name": "oidc_contains_reserved_character", "user": { "auth-provider": { "name": "oidc", "config": { "client-id": "tectonic-kubectl", "client-secret": "FAKE_SECRET", "id-token": TEST_OIDC_CONTAINS_RESERVED_CHARACTERS, "idp-issuer-url": "https://example.org/identity", "refresh-token": "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk" } } } }, { "name": "oidc_invalid_padding_length", "user": { "auth-provider": { "name": "oidc", "config": { "client-id": "tectonic-kubectl", "client-secret": "FAKE_SECRET", "id-token": TEST_OIDC_INVALID_PADDING_LENGTH, "idp-issuer-url": "https://example.org/identity", "refresh-token": "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk" } } } }, { "name": "user_pass", "user": { "username": TEST_USERNAME, # should be ignored "password": TEST_PASSWORD, # should be ignored } }, { "name": "ssl-no_file", "user": { "token": TEST_DATA_BASE64, "client-certificate": TEST_CLIENT_CERT, "client-key": TEST_CLIENT_KEY, } }, { "name": "ssl-local-file", "user": { "tokenFile": "token_file", "client-certificate": "client_cert", "client-key": "client_key", } }, { "name": "ssl", "user": { "token": TEST_DATA_BASE64, "client-certificate-data": TEST_CLIENT_CERT_BASE64, "client-key-data": TEST_CLIENT_KEY_BASE64, } }, { "name": "exec_cred_user", "user": { "exec": { "apiVersion": "client.authentication.k8s.io/v1beta1", "command": "aws-iam-authenticator", "args": ["token", "-i", "dummy-cluster"] } } }, ] } def test_no_user_context(self): expected = FakeConfig(host=TEST_HOST) actual = FakeConfig() KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="no_user").load_and_set(actual) self.assertEqual(expected, actual) def test_simple_token(self): expected = FakeConfig( host=TEST_HOST, token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64) actual = FakeConfig() KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="simple_token").load_and_set(actual) self.assertEqual(expected, actual) def test_load_user_token(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="simple_token") self.assertTrue(loader._load_user_token()) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, loader.token) def test_gcp_no_refresh(self): fake_config = FakeConfig() # swagger-generated config has this, but FakeConfig does not. self.assertFalse(hasattr(fake_config, "get_api_key_with_prefix")) KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="gcp", get_google_credentials=lambda: _raise_exception("SHOULD NOT BE CALLED") ).load_and_set(fake_config) # Should now be populated with a gcp token fetcher. self.assertIsNotNone(fake_config.get_api_key_with_prefix) self.assertEqual(TEST_HOST, fake_config.host) # For backwards compatibility, authorization field should still be set. self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, fake_config.api_key["authorization"]) def test_load_gcp_token_no_refresh(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="gcp", get_google_credentials=lambda: _raise_exception("SHOULD NOT BE CALLED")) self.assertTrue(loader._load_auth_provider_token()) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, loader.token) def test_load_gcp_token_with_refresh(self): def cred(): return None cred.token = TEST_ANOTHER_DATA_BASE64 cred.expiry = datetime.datetime.utcnow() loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="expired_gcp", get_google_credentials=lambda: cred) original_expiry = _get_expiry(loader, "expired_gcp") self.assertTrue(loader._load_auth_provider_token()) new_expiry = _get_expiry(loader, "expired_gcp") # assert that the configs expiry actually updates self.assertTrue(new_expiry > original_expiry) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64, loader.token) def test_gcp_get_api_key_with_prefix(self): class cred_old: token = TEST_DATA_BASE64 expiry = DATETIME_EXPIRY_PAST class cred_new: token = TEST_ANOTHER_DATA_BASE64 expiry = DATETIME_EXPIRY_FUTURE fake_config = FakeConfig() _get_google_credentials = mock.Mock() _get_google_credentials.side_effect = [cred_old, cred_new] loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="expired_gcp_refresh", get_google_credentials=_get_google_credentials) loader.load_and_set(fake_config) original_expiry = _get_expiry(loader, "expired_gcp_refresh") # Call GCP token fetcher. token = fake_config.get_api_key_with_prefix() new_expiry = _get_expiry(loader, "expired_gcp_refresh") self.assertTrue(new_expiry > original_expiry) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64, loader.token) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64, token) def test_oidc_no_refresh(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="oidc", ) self.assertTrue(loader._load_auth_provider_token()) self.assertEqual(TEST_OIDC_TOKEN, loader.token) @mock.patch("kubernetes.config.kube_config.OAuth2Session.refresh_token") @mock.patch("kubernetes.config.kube_config.ApiClient.request") def test_oidc_with_refresh(self, mock_ApiClient, mock_OAuth2Session): mock_response = mock.MagicMock() type(mock_response).status = mock.PropertyMock(return_value=200) type(mock_response).data = mock.PropertyMock( return_value=json.dumps( {"token_endpoint": "https://example.org/identity/token"})) mock_ApiClient.return_value = mock_response mock_OAuth2Session.return_value = { "id_token": "abc123", "refresh_token": "newtoken123" } loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="expired_oidc", ) self.assertTrue(loader._load_auth_provider_token()) self.assertEqual("Bearer abc123", loader.token) @mock.patch("kubernetes.config.kube_config.OAuth2Session.refresh_token") @mock.patch("kubernetes.config.kube_config.ApiClient.request") def test_oidc_with_refresh_nocert(self, mock_ApiClient, mock_OAuth2Session): mock_response = mock.MagicMock() type(mock_response).status = mock.PropertyMock(return_value=200) type(mock_response).data = mock.PropertyMock( return_value=json.dumps( {"token_endpoint": "https://example.org/identity/token"})) mock_ApiClient.return_value = mock_response mock_OAuth2Session.return_value = { "id_token": "abc123", "refresh_token": "newtoken123" } loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="expired_oidc_nocert", ) self.assertTrue(loader._load_auth_provider_token()) self.assertEqual("Bearer abc123", loader.token) def test_oidc_fails_if_contains_reserved_chars(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="oidc_contains_reserved_character", ) self.assertEqual( loader._load_oid_token("oidc_contains_reserved_character"), None, ) def test_oidc_fails_if_invalid_padding_length(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="oidc_invalid_padding_length", ) self.assertEqual( loader._load_oid_token("oidc_invalid_padding_length"), None, ) def test_user_pass(self): expected = FakeConfig(host=TEST_HOST, token=TEST_BASIC_TOKEN) actual = FakeConfig() KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="user_pass").load_and_set(actual) self.assertEqual(expected, actual) def test_load_user_pass_token(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="user_pass") self.assertTrue(loader._load_user_pass_token()) self.assertEqual(TEST_BASIC_TOKEN, loader.token) def test_ssl_no_cert_files(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="ssl-no_file") self.expect_exception(loader.load_and_set, "does not exists", FakeConfig()) def test_ssl(self): expected = FakeConfig( host=TEST_SSL_HOST, token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, cert_file=self._create_temp_file(TEST_CLIENT_CERT), key_file=self._create_temp_file(TEST_CLIENT_KEY), ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH)) actual = FakeConfig() KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="ssl").load_and_set(actual) self.assertEqual(expected, actual) def test_ssl_no_verification(self): expected = FakeConfig( host=TEST_SSL_HOST, token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, cert_file=self._create_temp_file(TEST_CLIENT_CERT), key_file=self._create_temp_file(TEST_CLIENT_KEY), verify_ssl=False, ssl_ca_cert=None, ) actual = FakeConfig() KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="no_ssl_verification").load_and_set(actual) self.assertEqual(expected, actual) def test_list_contexts(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="no_user") actual_contexts = loader.list_contexts() expected_contexts = ConfigNode("", self.TEST_KUBE_CONFIG)["contexts"] for actual in actual_contexts: expected = expected_contexts.get_with_name(actual["name"]) self.assertEqual(expected.value, actual) def test_current_context(self): loader = KubeConfigLoader(config_dict=self.TEST_KUBE_CONFIG) expected_contexts = ConfigNode("", self.TEST_KUBE_CONFIG)["contexts"] self.assertEqual( expected_contexts.get_with_name("no_user").value, loader.current_context) def test_set_active_context(self): loader = KubeConfigLoader(config_dict=self.TEST_KUBE_CONFIG) loader.set_active_context("ssl") expected_contexts = ConfigNode("", self.TEST_KUBE_CONFIG)["contexts"] self.assertEqual( expected_contexts.get_with_name("ssl").value, loader.current_context) def test_ssl_with_relative_ssl_files(self): expected = FakeConfig( host=TEST_SSL_HOST, token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, cert_file=self._create_temp_file(TEST_CLIENT_CERT), key_file=self._create_temp_file(TEST_CLIENT_KEY), ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH)) try: temp_dir = tempfile.mkdtemp() actual = FakeConfig() with open(os.path.join(temp_dir, "cert_test"), "wb") as fd: fd.write(TEST_CERTIFICATE_AUTH.encode()) with open(os.path.join(temp_dir, "client_cert"), "wb") as fd: fd.write(TEST_CLIENT_CERT.encode()) with open(os.path.join(temp_dir, "client_key"), "wb") as fd: fd.write(TEST_CLIENT_KEY.encode()) with open(os.path.join(temp_dir, "token_file"), "wb") as fd: fd.write(TEST_DATA_BASE64.encode()) KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="ssl-local-file", config_base_path=temp_dir).load_and_set(actual) self.assertEqual(expected, actual) finally: shutil.rmtree(temp_dir) def test_load_kube_config(self): expected = FakeConfig( host=TEST_HOST, token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64) config_file = self._create_temp_file(yaml.safe_dump(self.TEST_KUBE_CONFIG)) actual = FakeConfig() load_kube_config( config_file=config_file, context="simple_token", client_configuration=actual) self.assertEqual(expected, actual) def test_list_kube_config_contexts(self): config_file = self._create_temp_file(yaml.safe_dump(self.TEST_KUBE_CONFIG)) contexts, active_context = list_kube_config_contexts( config_file=config_file) self.assertDictEqual(self.TEST_KUBE_CONFIG["contexts"][0], active_context) if PY3: self.assertCountEqual(self.TEST_KUBE_CONFIG["contexts"], contexts) else: self.assertItemsEqual(self.TEST_KUBE_CONFIG["contexts"], contexts) def test_new_client_from_config(self): config_file = self._create_temp_file(yaml.safe_dump(self.TEST_KUBE_CONFIG)) client = new_client_from_config( config_file=config_file, context="simple_token") self.assertEqual(TEST_HOST, client.configuration.host) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, client.configuration.api_key["authorization"]) def test_no_users_section(self): expected = FakeConfig(host=TEST_HOST) actual = FakeConfig() test_kube_config = self.TEST_KUBE_CONFIG.copy() del test_kube_config["users"] KubeConfigLoader( config_dict=test_kube_config, active_context="gcp").load_and_set(actual) self.assertEqual(expected, actual) def test_non_existing_user(self): expected = FakeConfig(host=TEST_HOST) actual = FakeConfig() KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="non_existing_user").load_and_set(actual) self.assertEqual(expected, actual) @mock.patch("kubernetes.config.kube_config.ExecProvider.run") def test_user_exec_auth(self, mock): token = "dummy" mock.return_value = {"token": token} expected = FakeConfig( host=TEST_HOST, api_key={"authorization": BEARER_TOKEN_FORMAT % token}) actual = FakeConfig() KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="exec_cred_user").load_and_set(actual) self.assertEqual(expected, actual) class TestKubernetesClientConfiguration(BaseTestCase): # Verifies properties of kubernetes.client.Configuration. # These tests guard against changes to the upstream configuration class, # since GCP authorization overrides get_api_key_with_prefix to refresh its # token regularly. def test_get_api_key_with_prefix_exists(self): self.assertTrue(hasattr(Configuration, "get_api_key_with_prefix")) def test_get_api_key_with_prefix_returns_token(self): expected_token = "expected_token" config = Configuration() config.api_key["authorization"] = expected_token self.assertEqual(expected_token, config.get_api_key_with_prefix("authorization")) def test_auth_settings_calls_get_api_key_with_prefix(self): expected_token = "expected_token" def fake_get_api_key_with_prefix(identifier): self.assertEqual("authorization", identifier) return expected_token config = Configuration() config.get_api_key_with_prefix = fake_get_api_key_with_prefix self.assertEqual(expected_token, config.auth_settings()["BearerToken"]["value"]) class TestKubeConfigMerger(BaseTestCase): TEST_KUBE_CONFIG_PART1 = { "current-context": "no_user", "contexts": [{ "name": "no_user", "context": { "cluster": "default" } },], "clusters": [{ "name": "default", "cluster": { "server": TEST_HOST } },], "users": [] } TEST_KUBE_CONFIG_PART2 = { "current-context": "", "contexts": [ { "name": "ssl", "context": { "cluster": "ssl", "user": "ssl" } }, { "name": "simple_token", "context": { "cluster": "default", "user": "simple_token" } }, ], "clusters": [{ "name": "ssl", "cluster": { "server": TEST_SSL_HOST, "certificate-authority-data": TEST_CERTIFICATE_AUTH_BASE64, } },], "users": [{ "name": "ssl", "user": { "token": TEST_DATA_BASE64, "client-certificate-data": TEST_CLIENT_CERT_BASE64, "client-key-data": TEST_CLIENT_KEY_BASE64, } },] } TEST_KUBE_CONFIG_PART3 = { "current-context": "no_user", "contexts": [ { "name": "expired_oidc", "context": { "cluster": "default", "user": "expired_oidc" } }, { "name": "ssl", "context": { "cluster": "skipped-part2-defined-this-context", "user": "skipped" } }, ], "clusters": [], "users": [ { "name": "expired_oidc", "user": { "auth-provider": { "name": "oidc", "config": { "client-id": "tectonic-kubectl", "client-secret": "FAKE_SECRET", "id-token": TEST_OIDC_EXPIRED_LOGIN, "idp-certificate-authority-data": TEST_OIDC_CA, "idp-issuer-url": "https://example.org/identity", "refresh-token": "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk" } } } }, { "name": "simple_token", "user": { "token": TEST_DATA_BASE64, "username": TEST_USERNAME, # should be ignored "password": TEST_PASSWORD, # should be ignored } }, ] } def _create_multi_config(self): files = [] for part in (self.TEST_KUBE_CONFIG_PART1, self.TEST_KUBE_CONFIG_PART2, self.TEST_KUBE_CONFIG_PART3): files.append(self._create_temp_file(yaml.safe_dump(part))) return ENV_KUBECONFIG_PATH_SEPARATOR.join(files) def test_list_kube_config_contexts(self): kubeconfigs = self._create_multi_config() expected_contexts = [{ "context": { "cluster": "default" }, "name": "no_user" }, { "context": { "cluster": "ssl", "user": "ssl" }, "name": "ssl" }, { "context": { "cluster": "default", "user": "simple_token" }, "name": "simple_token" }, { "context": { "cluster": "default", "user": "expired_oidc" }, "name": "expired_oidc" }] contexts, active_context = list_kube_config_contexts( config_file=kubeconfigs) self.assertEqual(contexts, expected_contexts) self.assertEqual(active_context, expected_contexts[0]) def test_new_client_from_config(self): kubeconfigs = self._create_multi_config() client = new_client_from_config( config_file=kubeconfigs, context="simple_token") self.assertEqual(TEST_HOST, client.configuration.host) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, client.configuration.api_key["authorization"]) def test_save_changes(self): kubeconfigs = self._create_multi_config() # load configuration, update token, save config kconf = KubeConfigMerger(kubeconfigs) user = kconf.config["users"].get_with_name("expired_oidc")["user"] provider = user["auth-provider"]["config"] provider.value["id-token"] = "token-changed" kconf.save_changes() # re-read configuration kconf = KubeConfigMerger(kubeconfigs) user = kconf.config["users"].get_with_name("expired_oidc")["user"] provider = user["auth-provider"]["config"] # new token self.assertEqual(provider.value["id-token"], "token-changed") if __name__ == "__main__": unittest.main()

License: MIT License

-usr-bin-env-python-copyright-2018-the-kubernetes-authors.-licensed-under-the-apache-licens's People

Contributors

matthelonianxl avatar

Stargazers

 avatar  avatar

Watchers

 avatar  avatar

Forkers

gear273

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.