Skip to content

Instantly share code, notes, and snippets.

@TVect
Created June 13, 2022 02:17
Show Gist options
  • Select an option

  • Save TVect/69f5f98bb9f575b644bf135c747dfd31 to your computer and use it in GitHub Desktop.

Select an option

Save TVect/69f5f98bb9f575b644bf135c747dfd31 to your computer and use it in GitHub Desktop.
k8s job + python api
import os
import logging
import yaml
from string import Template
from kubernetes import client, config, watch
import enum
# Module logger: the logger itself accepts DEBUG and above, while the console
# handler attached below only emits INFO and above.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Record layout: timestamp, calling function, line number, level, message.
fmt = '%(asctime)s - %(funcName)s - %(lineno)s - %(levelname)s - %(message)s'
formatter = logging.Formatter(fmt)

console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
class KubernetesEventType(enum.Enum):
    """String constants compared against Kubernetes watch events and pod state.

    The members mix two vocabularies: ``PENDING`` and ``SUCCEEDED`` are pod
    phases (compared with ``pod.status.phase``), while ``ADDED``, ``MODIFIED``
    and ``DELETE`` are watch event types (compared with ``event["type"]``).
    """

    PENDING = "Pending"
    ADDED = "ADDED"
    MODIFIED = "MODIFIED"
    # The API server reports deletions with the event type "DELETED"; the
    # previous value "DELETE" could never match a real watch event. The member
    # name is kept so existing references keep working.
    DELETE = "DELETED"
    SUCCEEDED = "Succeeded"
class KubeJobClient:
    """Create, watch, inspect and delete a single Kubernetes batch Job.

    The client remembers the most recently created Job in ``created_job``;
    the watch/status/delete methods all operate on that Job and raise
    ``RuntimeError`` when no Job has been created yet.
    """

    def __init__(self) -> None:
        """Load the kubeconfig stored next to this module and build API clients."""
        config.load_kube_config(
            os.path.join(os.path.dirname(__file__), "kube_config_dev_local.yaml"))
        self.job_api = client.BatchV1Api()
        self.core_api = client.CoreV1Api()
        # Job object returned by the API server for the last created Job.
        self.created_job = None

    @staticmethod
    def _normalize_command(command):
        """Return *command* as a list of arguments, or ``None`` when it is empty.

        The container ``command`` field is a list of strings, so a plain
        string is split with shell-like quoting rules instead of being sent
        as-is.
        """
        if not command:
            return None
        if isinstance(command, str):
            import shlex  # local import: only needed for string commands
            return shlex.split(command)
        return list(command)

    def _pod_label_selector(self):
        """Build a label selector matching the pods of the created Job.

        Every label of the Job's pod template is used (comma means AND), so
        the selector does not depend on the order of the label mapping.
        """
        labels = self.created_job.spec.template.metadata.labels
        if not labels:
            raise RuntimeError("Created job has no pod template labels to select on.")
        return ",".join(f"{key}={value}" for key, value in labels.items())

    def _created_job_ref(self):
        """Return ``(name, namespace)`` of the created Job.

        Raises:
            RuntimeError: if no Job has been created yet.
        """
        if self.created_job is None:
            raise RuntimeError("job is not yet created.")
        metadata = self.created_job.metadata
        return metadata.name, metadata.namespace

    def create_job(self,
                   job_name,
                   image,
                   command="",
                   resources_limits=None,
                   namespace="drugai-jobs"):
        """Create a Job with one container built from client model objects.

        Args:
            job_name: Name of the Job; also used as the container name and as
                the value of the ``app`` pod label.
            image: Container image to run.
            command: Optional container command, either a list of arguments
                or a string (split with shell-like rules). Empty means the
                image's default command is used.
            resources_limits: Resource limits for the container. Defaults to
                ``{"cpu": "1", "memory": "1G"}``.
            namespace: Namespace to create the Job in.
        """
        if resources_limits is None:
            # Fresh dict per call instead of a shared mutable default.
            resources_limits = {"cpu": "1", "memory": "1G"}
        logger.info(
            "Creating job with name: %s and image name: %s", job_name, image)

        container_kwargs = {
            "name": job_name,
            "image": image,
            "resources": {"limits": resources_limits},
        }
        command = self._normalize_command(command)
        if command:
            container_kwargs["command"] = command
        container = client.V1Container(**container_kwargs)

        job = client.V1Job(
            api_version="batch/v1",
            kind="Job",
            metadata=client.V1ObjectMeta(name=job_name, namespace=namespace),
            spec=client.V1JobSpec(
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(labels={"app": job_name}),
                    spec=client.V1PodSpec(
                        restart_policy="Never",
                        containers=[container],
                    ),
                ),
                backoff_limit=1,
            ),
        )
        api_response = self.job_api.create_namespaced_job(body=job, namespace=namespace)
        logger.info("Job created. status=%s", api_response.status)
        # Keep the server's view of the Job, as create_job_from_template does.
        self.created_job = api_response

    def create_job_from_template(self,
                                 job_name,
                                 image,
                                 command=None,
                                 resources_limits=None,
                                 namespace="drugai-jobs"):
        """Create a Job from the ``template.yaml`` file next to this module.

        The template text is filled in with ``string.Template`` placeholders
        ``job_name``, ``image``, ``namespace`` and ``resource_limits`` and
        then parsed as YAML.

        Args:
            job_name: Value substituted for ``$job_name``.
            image: Value substituted for ``$image``.
            command: Optional container command (list of arguments or a
                string split with shell-like rules). When given, it is set on
                the first container of the pod template.
            resources_limits: Value substituted for ``$resource_limits``.
                Defaults to ``{"cpu": 1, "memory": "1G"}``.
            namespace: Value substituted for ``$namespace`` and the namespace
                the Job is created in.
        """
        if resources_limits is None:
            # Fresh dict per call instead of a shared mutable default.
            resources_limits = {"cpu": 1, "memory": "1G"}

        template_path = os.path.join(os.path.dirname(__file__), "template.yaml")
        with open(template_path) as fr:
            content_template = Template(fr.read())
        # safe_substitute leaves unknown placeholders untouched instead of raising.
        content_str = content_template.safe_substitute({
            "job_name": job_name,
            "image": image,
            "namespace": namespace,
            "resource_limits": resources_limits
        })
        content = yaml.safe_load(content_str)

        command = self._normalize_command(command)
        if command:
            # "containers" is a list, so the command goes on its first entry.
            content["spec"]["template"]["spec"]["containers"][0]["command"] = command

        job = self.job_api.create_namespaced_job(body=content, namespace=namespace)
        logger.info("Job created. status=%s", job.status)
        self.created_job = job

    def watch_job(self, namespace: str = "drugai-jobs"):
        """Wait for the Job's pod to start, stream its logs, then wait for success.

        Each pod watch is limited to 60 seconds. Only the ``Succeeded`` phase
        is reported; if the pod never reaches it the final watch simply ends
        at its timeout.

        Args:
            namespace: Namespace to look for the Job's pods in. It is not
                derived from the created Job, so it must match the namespace
                the Job was created in.

        Raises:
            RuntimeError: if no Job has been created yet.
        """
        if self.created_job is None:
            raise RuntimeError("Cannot watch job that's not yet created.")
        logger.info("Starting job watcher.")
        selector = self._pod_label_selector()
        # The API server reports deletions as "DELETED"; the enum value is
        # accepted as well so the check works with either spelling.
        deleted_types = {KubernetesEventType.DELETE.value, "DELETED"}

        # Phase 1: wait until a pod of the Job leaves the Pending phase.
        pod_name = None
        w = watch.Watch()
        for event in w.stream(
                func=self.core_api.list_namespaced_pod,
                namespace=namespace,
                label_selector=selector,
                timeout_seconds=60,
        ):
            # Check deletion first: a deleted pod must not count as started.
            if event["type"] in deleted_types:
                logger.info("Job was deleted before startup.")
                w.stop()
            elif event["object"].status.phase != KubernetesEventType.PENDING.value:
                # Remember the pod seen by the watch rather than listing again.
                pod_name = event["object"].metadata.name
                w.stop()

        if pod_name is None:
            # The pod was deleted or never left Pending within the timeout.
            logger.info("No started pod to follow; stopping job watcher.")
            return

        # Phase 2: stream the pod's log lines until the stream ends.
        w = watch.Watch()
        for line in w.stream(
                func=self.core_api.read_namespaced_pod_log,
                name=pod_name,
                namespace=namespace,
        ):
            logger.info(line)

        # Phase 3: confirm that the pod reached the Succeeded phase.
        w = watch.Watch()
        for event in w.stream(
                func=self.core_api.list_namespaced_pod,
                namespace=namespace,
                label_selector=selector,
                timeout_seconds=60,
        ):
            if event["object"].status.phase == KubernetesEventType.SUCCEEDED.value:
                logger.info("Job successfully completed!")
                # stop() lets the stream finish and release its connection.
                w.stop()

    def get_job_status(self):
        """Return the current status object of the created Job.

        Raises:
            RuntimeError: if no Job has been created yet.
        """
        job_name, namespace = self._created_job_ref()
        job = self.job_api.read_namespaced_job_status(name=job_name, namespace=namespace)
        return job.status

    def is_completed(self):
        """Return True when the created Job's status has a completion time.

        Raises:
            RuntimeError: if no Job has been created yet.
        """
        return self.get_job_status().completion_time is not None

    def delete_job(self):
        """Delete the created Job and return the API response.

        Uses foreground propagation with a 5 second grace period.

        Raises:
            RuntimeError: if no Job has been created yet.
        """
        job_name, namespace = self._created_job_ref()
        return self.job_api.delete_namespaced_job(
            name=job_name,
            body=client.V1DeleteOptions(propagation_policy='Foreground', grace_period_seconds=5),
            namespace=namespace)
if __name__ == "__main__":
    import time

    job_client = KubeJobClient()

    # A timestamp suffix gives each run its own job name.
    timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
    job_params = dict(
        job_name=f"batch-job-{timestamp}",
        image="luksa/batch-job",
        command="",
        resources_limits={"cpu": "1", "memory": "1G"},
        namespace="drugai-jobs",
    )

    # Alternative: build the Job from client model objects instead of the template.
    # job_client.create_job(**job_params)
    job_client.create_job_from_template(**job_params)
    job_client.watch_job()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment