Created
June 13, 2022 02:17
-
-
Save TVect/69f5f98bb9f575b644bf135c747dfd31 to your computer and use it in GitHub Desktop.
k8s job + python api
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import logging | |
| import yaml | |
| from string import Template | |
| from kubernetes import client, config, watch | |
| import enum | |
# Module-level logging: the logger itself accepts DEBUG and above, while the
# console handler attached to it only emits INFO and above.
fmt = '%(asctime)s - %(funcName)s - %(lineno)s - %(levelname)s - %(message)s'
formatter = logging.Formatter(fmt)

console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO)

logger = logging.getLogger(__name__)
logger.addHandler(console_handler)
logger.setLevel(logging.DEBUG)
class KubernetesEventType(enum.Enum):
    """String constants compared against Kubernetes watch events.

    The members cover two different fields of a watch event:

    * ``ADDED`` / ``MODIFIED`` / ``DELETE`` are compared with the event's
      ``type`` field.
    * ``PENDING`` / ``SUCCEEDED`` are compared with the watched pod's
      ``status.phase``.
    """

    PENDING = "Pending"
    ADDED = "ADDED"
    MODIFIED = "MODIFIED"
    # The API server reports removals with the type string "DELETED"; the
    # previous value "DELETE" could never match a real watch event.
    DELETE = "DELETED"
    SUCCEEDED = "Succeeded"
class KubeJobClient:
    """Create, watch, inspect and delete one Kubernetes batch Job.

    The client remembers the most recently created job in ``created_job``;
    ``watch_job``, ``get_job_status``, ``is_completed`` and ``delete_job``
    all operate on that job and raise ``RuntimeError`` when none exists yet.
    """

    # Watch event type the API server sends when an object is removed.
    _EVENT_DELETED = "DELETED"
    # Pod phase reported when the pod's containers terminated unsuccessfully.
    _POD_PHASE_FAILED = "Failed"

    def __init__(self, config_file=None) -> None:
        """Load the kube config and build the API clients.

        Args:
            config_file: Path to a kube config file. Defaults to
                ``kube_config_dev_local.yaml`` next to this module.
        """
        if config_file is None:
            config_file = os.path.join(
                os.path.dirname(__file__), "kube_config_dev_local.yaml")
        config.load_kube_config(config_file)
        self.job_api = client.BatchV1Api()
        self.core_api = client.CoreV1Api()
        self.created_job = None

    def _require_job(self, message="job is not yet created."):
        """Return the created job, or raise ``RuntimeError`` if there is none."""
        if self.created_job is None:
            raise RuntimeError(message)
        return self.created_job

    def create_job(self,
                   job_name,
                   image,
                   command="",
                   resources_limits=None,
                   namespace="drugai-jobs"):
        """Build a single-container Job in code and submit it.

        Args:
            job_name: Name of the job; also used as the container name and
                as the value of the pod label ``app``.
            image: Container image to run.
            command: Container command; omitted from the container spec
                when falsy. NOTE(review): the Kubernetes container spec
                expects a list of strings here - confirm callers pass one.
            resources_limits: Resource limits for the container. Defaults
                to ``{"cpu": "1", "memory": "1G"}``.
            namespace: Namespace the job is created in.
        """
        if resources_limits is None:
            # Build the default per call so callers never share one dict.
            resources_limits = {"cpu": "1", "memory": "1G"}
        logger.info(
            f"Creating job with name: {job_name} and image name: {image}")

        container_kwargs = {
            "name": job_name,
            "image": image,
            "resources": {"limits": resources_limits},
        }
        if command:
            container_kwargs["command"] = command
        container = client.V1Container(**container_kwargs)

        job = client.V1Job(
            api_version="batch/v1",
            kind="Job",
            metadata=client.V1ObjectMeta(name=job_name, namespace=namespace),
            spec=client.V1JobSpec(
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(labels={"app": job_name}),
                    spec=client.V1PodSpec(
                        restart_policy="Never",
                        containers=[container],
                    ),
                ),
                backoff_limit=1,
            ),
        )
        api_response = self.job_api.create_namespaced_job(
            body=job, namespace=namespace)
        logger.info("Job created. status=%s", api_response.status)
        self.created_job = job

    def create_job_from_template(self,
                                 job_name,
                                 image,
                                 command=None,
                                 resources_limits=None,
                                 namespace="drugai-jobs"):
        """Render ``template.yaml`` (next to this module) and submit it.

        The template is filled with ``string.Template.safe_substitute``
        using the keys ``job_name``, ``image``, ``namespace`` and
        ``resource_limits``; placeholders without a matching key are left
        untouched. The rendered text is then parsed with ``yaml.safe_load``.

        Args:
            job_name: Value substituted for ``job_name``.
            image: Value substituted for ``image``.
            command: Optional command, set on the first container of the
                rendered manifest when truthy.
            resources_limits: Value substituted for ``resource_limits``
                (its ``str()`` form is inserted into the template text).
                Defaults to ``{"cpu": 1, "memory": "1G"}``.
            namespace: Namespace the job is created in; also substituted
                for ``namespace``.
        """
        if resources_limits is None:
            # Build the default per call so callers never share one dict.
            resources_limits = {"cpu": 1, "memory": "1G"}

        template_path = os.path.join(os.path.dirname(__file__), "template.yaml")
        with open(template_path) as fr:
            content_template = Template(fr.read())
        content_str = content_template.safe_substitute({
            "job_name": job_name,
            "image": image,
            "namespace": namespace,
            "resource_limits": resources_limits
        })
        content = yaml.safe_load(content_str)
        if command:
            # ``containers`` is a list in a pod spec, so the command has to
            # be set on a container entry, not on the list itself.
            containers = content["spec"]["template"]["spec"]["containers"]
            containers[0]["command"] = command

        job = self.job_api.create_namespaced_job(body=content, namespace=namespace)
        logger.info("Job created. status=%s", job.status)
        self.created_job = job

    def watch_job(self, namespace: str = "drugai-jobs"):
        """Follow the created job's pod: wait for start, stream logs, await the end.

        Pods are selected with the first label of the job's pod template.
        Each pod watch is limited to 60 seconds.

        Args:
            namespace: Namespace in which the job's pods are looked up.

        Raises:
            RuntimeError: If no job has been created yet, or the job's pod
                template carries no labels to select pods by.
        """
        job = self._require_job("Cannot watch job that's not yet created.")
        logger.info("Starting job watcher.")

        labels = job.spec.template.metadata.labels
        if not labels:
            raise RuntimeError("Cannot watch job whose pod template has no labels.")
        label_key, label_value = next(iter(labels.items()))
        label_selector = "{}={}".format(label_key, label_value)

        # Deletions arrive with the type "DELETED"; the enum value is
        # accepted as well so either spelling is recognised.
        deleted_types = (self._EVENT_DELETED, KubernetesEventType.DELETE.value)

        # Phase 1: wait until the pod leaves the Pending phase.
        job_running = False
        w = watch.Watch()
        for event in w.stream(
                func=self.core_api.list_namespaced_pod,
                namespace=namespace,
                label_selector=label_selector,
                timeout_seconds=60,
        ):
            # Check deletion first: a removed pod must not count as running.
            if event["type"] in deleted_types:
                logger.info("Job was deleted before startup.")
                w.stop()
            elif event["object"].status.phase != KubernetesEventType.PENDING.value:
                job_running = True
                w.stop()

        if not job_running:
            return

        # Phase 2: stream the log of the first matching pod.
        pods = self.core_api.list_namespaced_pod(
            namespace=namespace,
            label_selector=label_selector,
        ).items
        if not pods:
            logger.warning("No pod found for selector %s.", label_selector)
            return
        pod_name = pods[0].metadata.name

        w = watch.Watch()
        for line in w.stream(
                func=self.core_api.read_namespaced_pod_log,
                name=pod_name,
                namespace=namespace,
        ):
            logger.info(line)

        # Phase 3: wait for the pod to reach a terminal phase.
        w = watch.Watch()
        for event in w.stream(
                func=self.core_api.list_namespaced_pod,
                namespace=namespace,
                label_selector=label_selector,
                timeout_seconds=60,
        ):
            phase = event["object"].status.phase
            if phase == KubernetesEventType.SUCCEEDED.value:
                logger.info("Job successfully completed!")
                w.stop()
            elif phase == self._POD_PHASE_FAILED:
                # Without this branch a failed pod would keep the watch
                # open until the timeout expires.
                logger.error("Job pod %s failed.", event["object"].metadata.name)
                w.stop()

    def get_job_status(self):
        """Return the ``status`` of the created job as read from the API.

        Raises:
            RuntimeError: If no job has been created yet.
        """
        metadata = self._require_job().metadata
        job = self.job_api.read_namespaced_job_status(
            name=metadata.name, namespace=metadata.namespace)
        return job.status

    def is_completed(self):
        """Return True when the created job's status has a completion time.

        NOTE(review): a failed job presumably never gets a completion time,
        so False may also mean "failed" - confirm against the Job API docs.

        Raises:
            RuntimeError: If no job has been created yet.
        """
        return self.get_job_status().completion_time is not None

    def delete_job(self):
        """Delete the created job and return the API response.

        The deletion uses foreground propagation and a 5 second grace period.

        Raises:
            RuntimeError: If no job has been created yet.
        """
        metadata = self._require_job().metadata
        return self.job_api.delete_namespaced_job(
            name=metadata.name,
            body=client.V1DeleteOptions(propagation_policy='Foreground', grace_period_seconds=5),
            namespace=metadata.namespace)
if __name__ == "__main__":
    import time

    # Demo run: submit one template-based job and follow it to completion.
    job_client = KubeJobClient()

    # The timestamp suffix gives every run a distinct job name.
    timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
    job_params = dict(
        job_name=f"batch-job-{timestamp}",
        image="luksa/batch-job",
        command="",
        resources_limits={"cpu": "1", "memory": "1G"},
        namespace="drugai-jobs",
    )

    # Alternative that builds the job in code instead of from the template:
    # job_client.create_job(**job_params)
    job_client.create_job_from_template(**job_params)
    job_client.watch_job()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment