代码案例:
import base64
import requests
def main():
# 你的 Kubernetes API Server 地址
api_server_url = 'https://xx.xx.xx.xx:6443'
# 你的 Base64 编码后的服务端证书字符串
encoded_server_cert = 'LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tC' # 在这里填写 Base64 编码后的客户端证书字符串
# 你的 Base64 编码后的客户端私钥字符串
encoded_client_key = 'LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLR' # 在这里填写 Base64 编码后的客户端私钥字符串
# 对 Base64 编码后的证书进行解码
server_cert_data = base64.b64decode(encoded_server_cert)
client_cert_data = base64.b64decode(encoded_client_cert)
client_key_data = base64.b64decode(encoded_client_key)
# 将解码后的证书数据写入文件
with open('server.crt', 'wb') as server_cert_file, \
open('client.crt', 'wb') as client_cert_file, \
open('client.key', 'wb') as client_key_file:
server_cert_file.write(server_cert_data)
client_cert_file.write(client_cert_data)
client_key_file.write(client_key_data)
try:
# 构建请求示例,获取 Pod 列表
pods_url = f'{api_server_url}/api/v1/nodes'
response = requests.get(pods_url, verify='server.crt', cert=('client.crt', 'client.key'))
# 处理响应结果
if response.status_code == 200:
pods_data = response.json()
print("Pods in the 'default' namespace:")
for pod in pods_data['items']:
print(pod['metadata']['name'])
else:
print(f"Failed to get pods. Status code: {response.status_code}")
print("Error message:", response.text)
except requests.exceptions.RequestException as e:
print("Request Exception:", e)
if __name__ == "__main__":
main()
版权声明:本文为LONG_Yi_1994原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。