微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

k8s python api二次封装 例子

k8s python api二次封装
pip install pprint   kubernetes
import urllib3
from pprint import pprint
from kubernetes import client
from os import path
import yaml

class K8sApi(object):
    def __init__(self):
        # self.config = config.kube_config.load_kube_config()
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        self.configuration = client.Configuration()
        self.configuration.host = "https://192.168.100.111:6443"
        self.configuration.api_key[‘authorization‘] = ‘Bearer token‘
        self.configuration.verify_ssl = False
        self.k8s_apps_v1 = client.AppsV1Api(client.apiclient(self.configuration))
        self.Api_Instance = client.CoreV1Api(client.apiclient(self.configuration))
        self.Api_Instance_Extensions = client.ExtensionsV1beta1Api(client.apiclient(self.configuration))

    ####################################################################################################################

    def list_deployment(self,namespace="default"):
        api_response = self.k8s_apps_v1.list_namespaced_deployment(namespace)
        return api_response

    def read_deployment(self,name="Nginx-deployment",namespace="default"):
        api_response = self.k8s_apps_v1.read_namespaced_deployment(name,namespace)
        return api_response

    def create_deployment(self,file="deploy-Nginx.yaml"):
        with open(path.join(path.dirname(__file__),file)) as f:
            dep = yaml.safe_load(f)
            resp = self.k8s_apps_v1.create_namespaced_deployment(
                body=dep,namespace="default")
            return resp

    def replace_deployment(self,file="deploy-Nginx.yaml",namespace="default"):
        with open(path.join(path.dirname(__file__),file)) as f:
            dep = yaml.safe_load(f)
            resp = self.k8s_apps_v1.replace_namespaced_deployment(name,namespace,body=dep)
            return resp

    def delete_deployment(self,namespace="default"):
        api_response = self.k8s_apps_v1.delete_namespaced_deployment(name,namespace)
        return api_response

    ####################################################################################################################

    def list_node(self):
        api_response = self.Api_Instance.list_node()
        data = {}
        for i in api_response.items:
            data[i.Metadata.name] = {"name": i.Metadata.name,"status": i.status.conditions[-1].type if i.status.conditions[
                                                                                   -1].status == "True" else "NotReady","ip": i.status.addresses[0].address,"kubelet_version": i.status.node_info.kubelet_version,"os_image": i.status.node_info.os_image,}
        return data

    def list_pod(self):
        api_response = self.Api_Instance.list_pod_for_all_namespaces()
        data = {}
        for i in api_response.items:
            data[i.Metadata.name] = {"ip": i.status.pod_ip,"namespace": i.Metadata.namespace}
        return data

    def read_pod(self,name="Nginx-pod",namespace="default"):
        api_response = self.Api_Instance.read_namespaced_pod(name,namespace)
        return api_response

    def create_pod(self,file="pod-Nginx.yaml",file)) as f:
            dep = yaml.safe_load(f)
            api_response = self.Api_Instance.create_namespaced_pod(namespace,body=dep)
            return api_response

    def replace_pod(self,namespace="default"):
        dep = self.read_pod(name)
        api_response = self.Api_Instance.replace_namespaced_pod(name,body=dep)
        return api_response

    def delete_pod(self,namespace="default"):
        api_response = self.Api_Instance.delete_namespaced_pod(name,namespace)
        return api_response

    ####################################################################################################################

    def list_service(self):
        api_response = self.Api_Instance.list_service_for_all_namespaces()
        return api_response

    def read_service(self,name="",namespace="default"):
        api_response = self.Api_Instance.read_namespaced_service(name,namespace)
        return api_response

    def create_service(self,file="service-Nginx.yaml",file)) as f:
            dep = yaml.safe_load(f)
            api_response = self.Api_Instance.create_namespaced_service(namespace,body=dep)
            return api_response

    def replace_service(self,name="hequan",namespace="default"):
        dep = self.read_pod(name)
        api_response = self.Api_Instance.replace_namespaced_service(name,body=dep)
        return api_response

    def delete_service(self,namespace="default"):
        api_response = self.Api_Instance.delete_namespaced_service(name,namespace)
        return api_response

    ####################################################################################################################

    def list_ingress(self):
        api_response = self.Api_Instance_Extensions.list_ingress_for_all_namespaces()
        return api_response

    def read_ingress(self,namespace="default"):
        api_response = self.Api_Instance_Extensions.read_namespaced_ingress(name,namespace)
        return api_response

    def create_ingress(self,file="ingress-Nginx.yaml",file)) as f:
            dep = yaml.safe_load(f)
            api_response = self.Api_Instance_Extensions.create_namespaced_ingress(namespace,body=dep)
            return api_response

    def replace_ingress(self,file)) as f:
            dep = yaml.safe_load(f)
            api_response = self.Api_Instance_Extensions.replace_namespaced_ingress(name=name,namespace=namespace,body=dep)
            return api_response

    def delete_ingress(self,namespace="default"):
        api_response = self.Api_Instance_Extensions.delete_namespaced_ingress(name,namespace)
        return api_response

    #####################################################################################################################

    def list_stateful(self):
        api_response = self.k8s_apps_v1.list_stateful_set_for_all_namespaces()
        return api_response

    def read_stateful(self,namespace="default"):
        api_response = self.k8s_apps_v1.read_namespaced_stateful_set(name,namespace)
        return api_response

    def create_stateful(self,file)) as f:
            dep = yaml.safe_load(f)
            resp = self.k8s_apps_v1.create_namespaced_stateful_set(
                body=dep,namespace="default")
            return resp

    def replace_stateful(self,file)) as f:
            dep = yaml.safe_load(f)
            resp = self.k8s_apps_v1.replace_namespaced_stateful_set(name,body=dep)
            return resp

    def delete_stateful(self,namespace="default"):
        api_response = self.k8s_apps_v1.delete_namespaced_stateful_set(name,namespace)
        return api_response

    ####################################################################################################################

if __name__ == ‘__main__‘:
    def test():
        obj = K8sApi()
        ret = obj.list_deployment()
        pprint(ret)

    test()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐