Run Jobs Using Python

Run Kueue jobs programmatically with Python

This guide is for batch users that have a basic understanding of interacting with Kubernetes from Python. For more information, see Kueue’s overview.

Before you begin

Check administer cluster quotas for details on the initial cluster setup. You’ll also need kubernetes python installed. We recommend a virtual environment.

python -m venv env
source env/bin/activate
pip install kubernetes requests

Note that the following versions were used for developing these examples:

  • Python: 3.9.12
  • kubernetes: 26.1.0
  • requests: 2.31.0

You can either follow the install instructions for Kueue, or use the install example, below.

Kueue in Python

Kueue at the core is a controller for a Custom Resource, and so to interact with it from Python we don’t need a custom SDK, but rather we can use the generic functions provided by the Kubernetes Python library. In this guide, we provide several examples for interacting with Kueue in this fashion. If you would like to request a new example or would like help for a specific use case, please open an issue.

Examples

The following examples demonstrate different use cases for using Kueue in Python.

Install Kueue

This example demonstrates installing Kueue to an existing cluster. You can save this script to your local machine as install-kueue-queues.py.

#!/usr/bin/env python3

from kubernetes import utils, config, client
import tempfile
import requests
import argparse

# install-kueue-queues.py will:
# 1. install queue from the latest or a specific version on GitHub
# This example will demonstrate installing Kueue and applying a YAML file (local) to install Kueue

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--version",
        help="Version of Kueue to install (if undefined, will install from master branch)",
        default=None,
    )
    return parser


def main():
    """
    Install Kueue and the Queue components.

    This will error if they are already installed.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()
    install_kueue(args.version)


def get_install_url(version):
    """
    Get the install version.

    If a version is specified, use it. Otherwise install
    from the main branch.
    """
    if version is not None:
        return f"https://github.com/kubernetes-sigs/kueue/releases/download/v{version}/manifests.yaml"
    return "https://github.com/kubernetes-sigs/kueue/config/default?ref=main"


def install_kueue(version):
    """
    Install Kueue of a particular version.
    """
    print("⭐️ Installing Kueue...")
    url = get_install_url(version)
    with tempfile.NamedTemporaryFile(delete=True) as install_yaml:
        res = requests.get(url)
        assert res.status_code == 200
        install_yaml.write(res.content)
        utils.create_from_yaml(api_client, install_yaml.name)


if __name__ == "__main__":
    main()

And then run as follows:

python install-kueue-queues.py 
⭐️ Installing Kueue...
⭐️ Applying queues from single-clusterqueue-setup.yaml...

You can also target a specific version:

python install-kueue-queues.py --version v0.8.3

Sample Job

For the next example, let’s start with a cluster with Kueue installed, and first create our queues:

#!/usr/bin/env python3

import argparse
from kubernetes import config, client

# create_job.py
# This example will demonstrate full steps to submit a Job.

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="generateName field to set for job",
        default="sample-job-",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="gcr.io/k8s-staging-perf-tests/sleep:v0.1.0",
    )
    parser.add_argument(
        "--args",
        nargs="+",
        help="args for container",
        default=["30s"],
    )
    return parser


def generate_job_crd(job_name, image, args):
    """
    Generate an equivalent job CRD to sample-job.yaml
    """
    metadata = client.V1ObjectMeta(
        generate_name=job_name, labels={"kueue.x-k8s.io/queue-name": "user-queue"}
    )

    # Job container
    container = client.V1Container(
        image=image,
        name="dummy-job",
        args=args,
        resources={
            "requests": {
                "cpu": 1,
                "memory": "200Mi",
            }
        },
    )

    # Job template
    template = {"spec": {"containers": [container], "restartPolicy": "Never"}}
    return client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=metadata,
        spec=client.V1JobSpec(
            parallelism=1, completions=3, suspend=True, template=template
        ),
    )


def main():
    """
    Run a job.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    crd = generate_job_crd(args.job_name, args.image, args.args)
    batch_api = client.BatchV1Api()
    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    batch_api.create_namespaced_job("default", crd)
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get jobs" to see jobs'
    )


if __name__ == "__main__":
    main()

And run as follows:

python sample-job.py
📦️ Container image selected is gcr.io/k8s-staging-perf-tests/sleep:v0.1.0...
⭐️ Creating sample job with prefix sample-job-...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs

or try changing the name (generateName) of the job:

python sample-job.py --job-name sleep-job-
📦️ Container image selected is gcr.io/k8s-staging-perf-tests/sleep:v0.1.0...
⭐️ Creating sample job with prefix sleep-job-...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs

You can also change the container image with --image and args with --args. For more customization, you can edit the example script.

Interact with Queues and Jobs

If you are developing an application that submits jobs and needs to interact with and check on them, you likely want to interact with queues or jobs directly. After running the example above, you can test the following example to interact with the results. Write the following to a script called sample-queue-control.py.

#!/usr/bin/env python3

import argparse
from kubernetes import config, client

# sample-queue-control.py
# This will show how to interact with queues

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Interact with Queues e",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--namespace",
        help="namespace to list for",
        default="default",
    )
    return parser


def main():
    """
    Get a listing of jobs in the queue
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    listing = crd_api.list_namespaced_custom_object(
        group="kueue.x-k8s.io",
        version="v1beta1",
        namespace=args.namespace,
        plural="localqueues",
    )
    list_queues(listing)

    listing = crd_api.list_namespaced_custom_object(
        group="batch",
        version="v1",
        namespace=args.namespace,
        plural="jobs",
    )
    list_jobs(listing)


def list_jobs(listing):
    """
    Iterate and show job metadata.
    """
    if not listing:
        print("💼️ There are no jobs.")
        return

    print("\n💼️ Jobs")
    for job in listing["items"]:
        jobname = job["metadata"]["name"]
        status = (
            "TBA" if "succeeded" not in job["status"] else job["status"]["succeeded"]
        )
        ready = job["status"]["ready"]
        print(f"Found job {jobname}")
        print(f"  Succeeded: {status}")
        print(f"  Ready: {ready}")


def list_queues(listing):
    """
    Helper function to iterate over and list queues.
    """
    if not listing:
        print("⛑️  There are no queues.")
        return

    print("\n⛑️  Local Queues")

    # This is listing queues
    for q in listing["items"]:
        print(f'Found queue {q["metadata"]["name"]}')
        print(f"  Admitted workloads: {q['status']['admittedWorkloads']}")
        print(f"  Pending workloads: {q['status']['pendingWorkloads']}")

        # And flavors with resources
        for f in q["status"]["flavorUsage"]:
            print(f'  Flavor {f["name"]} has resources {f["resources"]}')


if __name__ == "__main__":
    main()

To make the output more interesting, we can run a few random jobs first:

python sample-job.py
python sample-job.py
python sample-job.py --job-name tacos

And then run the script to see your queue and sample job that you submit previously.

python sample-queue-control.py
⛑️  Local Queues
Found queue user-queue
  Admitted workloads: 3
  Pending workloads: 0
  Flavor default-flavor has resources [{'name': 'cpu', 'total': '3'}, {'name': 'memory', 'total': '600Mi'}]

💼️ Jobs
Found job sample-job-8n5sb
  Succeeded: 3
  Ready: 0
Found job sample-job-gnxtl
  Succeeded: 1
  Ready: 0
Found job tacos46bqw
  Succeeded: 1
  Ready: 1

If you wanted to filter jobs to a specific queue, you can do this via the job labels under `job[“metadata”][“labels”][“kueue.x-k8s.io/queue-name”]’. To list a specific job by name, you can do:

from kubernetes import client, config

# Interact with batch
config.load_kube_config()
batch_api = client.BatchV1Api()

# This is providing the name, and namespace
job = batch_api.read_namespaced_job("tacos46bqw", "default")
print(job)

See the BatchV1 API documentation for more calls.

Flux Operator Job

For this example, we will be using the Flux Operator to submit a job, and specifically using the Python SDK to do this easily. Given our Python environment created in the setup, we can install this Python SDK directly to it as follows:

pip install fluxoperator

We will also need to install the Flux operator.

kubectl apply -f https://raw.githubusercontent.com/flux-framework/flux-operator/main/examples/dist/flux-operator.yaml

Write the following script to sample-flux-operator-job.py:

#!/usr/bin/env python3

import argparse
from kubernetes import config, client
import fluxoperator.models as models

# sample-flux-operator.py
# This example will demonstrate full steps to submit a Job via the Flux Operator.

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue Flux Operator Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="generateName field to set for job (job prefix does not work here)",
        default="hello-world",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="ghcr.io/flux-framework/flux-restful-api",
    )
    parser.add_argument(
        "--tasks",
        help="Number of tasks",
        default=1,
        type=int,
    )
    parser.add_argument(
        "--quiet",
        help="Do not show extra flux output (only hello worlds!)",
        action="store_true",
        default=False,
    )

    parser.add_argument(
        "--command",
        help="command to run",
        default="echo",
    )
    parser.add_argument(
        "--args", nargs="+", help="args for container", default=["hello", "world"]
    )
    return parser


def generate_minicluster_crd(job_name, image, command, args, quiet=False, tasks=1):
    """
    Generate a minicluster CRD
    """
    container = models.MiniClusterContainer(
        command=command + " " + " ".join(args),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "2Gi",
            }
        },
    )

    # 4 pods and 4 tasks will echo hello-world x 4
    spec = models.MiniClusterSpec(
        job_labels={"kueue.x-k8s.io/queue-name": "user-queue"},
        containers=[container],
        size=4,
        tasks=tasks,
        logging={"quiet": quiet},
    )

    return models.MiniCluster(
        kind="MiniCluster",
        api_version="flux-framework.org/v1alpha1",
        metadata=client.V1ObjectMeta(
            generate_name=job_name,
            namespace="default",
        ),
        spec=spec,
    )


def main():
    """
    Run an example job using the Flux Operator.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    minicluster = generate_minicluster_crd(
        args.job_name, args.image, args.command, args.args, args.quiet, args.tasks
    )
    crd_api = client.CustomObjectsApi()

    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    crd_api.create_namespaced_custom_object(
        group="flux-framework.org",
        version="v1alpha1",
        namespace="default",
        plural="miniclusters",
        body=minicluster,
    )
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get pods" to see pods'
    )


if __name__ == "__main__":
    main()

Now try running the example:

python sample-flux-operator-job.py
📦️ Container image selected is ghcr.io/flux-framework/flux-restful-api...
⭐️ Creating sample job with prefix hello-world...
Use:
"kubectl get queue" to see queue assignment
"kubectl get pods" to see pods

You’ll be able to almost immediately see the MiniCluster job admitted to the local queue:

kubectl get queue
NAME         CLUSTERQUEUE    PENDING WORKLOADS   ADMITTED WORKLOADS
user-queue   cluster-queue   0                   1

And the 4 pods running (we are creating a networked cluster with 4 nodes):

kubectl get pods
NAME                       READY   STATUS      RESTARTS   AGE
hello-world7qgqd-0-wp596   1/1     Running     0          7s
hello-world7qgqd-1-d7r87   1/1     Running     0          7s
hello-world7qgqd-2-rfn4t   1/1     Running     0          7s
hello-world7qgqd-3-blvtn   1/1     Running     0          7s

If you look at logs of the main broker pod (index 0 of the job above), there is a lot of output for debugging, and you can see “hello world” running at the end:

kubectl logs hello-world7qgqd-0-wp596 
Flux Operator Lead Broker Output
🌀 Submit Mode: flux start -o --config /etc/flux/config -Scron.directory=/etc/flux/system/cron.d   -Stbon.fanout=256   -Srundir=/run/flux    -Sstatedir=/var/lib/flux   -Slocal-uri=local:///run/flux/local     -Slog-stderr-level=6    -Slog-stderr-mode=local  flux submit  -n 1 --quiet  --watch echo hello world
broker.info[0]: start: none->join 0.399725ms
broker.info[0]: parent-none: join->init 0.030894ms
cron.info[0]: synchronizing cron tasks to event heartbeat.pulse
job-manager.info[0]: restart: 0 jobs
job-manager.info[0]: restart: 0 running jobs
job-manager.info[0]: restart: checkpoint.job-manager not found
broker.info[0]: rc1.0: running /etc/flux/rc1.d/01-sched-fluxion
sched-fluxion-resource.info[0]: version 0.27.0-15-gc90fbcc2
sched-fluxion-resource.warning[0]: create_reader: allowlist unsupported
sched-fluxion-resource.info[0]: populate_resource_db: loaded resources from core's resource.acquire
sched-fluxion-qmanager.info[0]: version 0.27.0-15-gc90fbcc2
broker.info[0]: rc1.0: running /etc/flux/rc1.d/02-cron
broker.info[0]: rc1.0: /etc/flux/rc1 Exited (rc=0) 0.5s
broker.info[0]: rc1-success: init->quorum 0.485239s
broker.info[0]: online: hello-world7qgqd-0 (ranks 0)
broker.info[0]: online: hello-world7qgqd-[0-3] (ranks 0-3)
broker.info[0]: quorum-full: quorum->run 0.354587s
hello world
broker.info[0]: rc2.0: flux submit -n 1 --quiet --watch echo hello world Exited (rc=0) 0.3s
broker.info[0]: rc2-success: run->cleanup 0.308392s
broker.info[0]: cleanup.0: flux queue stop --quiet --all --nocheckpoint Exited (rc=0) 0.1s
broker.info[0]: cleanup.1: flux cancel --user=all --quiet --states RUN Exited (rc=0) 0.1s
broker.info[0]: cleanup.2: flux queue idle --quiet Exited (rc=0) 0.1s
broker.info[0]: cleanup-success: cleanup->shutdown 0.252899s
broker.info[0]: children-complete: shutdown->finalize 47.6699ms
broker.info[0]: rc3.0: running /etc/flux/rc3.d/01-sched-fluxion
broker.info[0]: rc3.0: /etc/flux/rc3 Exited (rc=0) 0.2s
broker.info[0]: rc3-success: finalize->goodbye 0.212425s
broker.info[0]: goodbye: goodbye->exit 0.06917ms

If you submit and ask for four tasks, you’ll see “hello world” four times:

python sample-flux-operator-job.py --tasks 4
...
broker.info[0]: quorum-full: quorum->run 23.5812s
hello world
hello world
hello world
hello world

You can further customize the job, and can ask questions on the Flux Operator issues board. Finally, for instructions for how to do this with YAML outside of Python, see Run A Flux MiniCluster.

MPI Operator Job

For this example, we will be using the MPI Operator to submit a job, and specifically using the Python SDK to do this easily. Given our Python environment created in the setup, we can install this Python SDK directly to it as follows:

git clone --depth 1 https://github.com/kubeflow/mpi-operator /tmp/mpijob
cd /tmp/mpijob/sdk/python/v2beta1
python setup.py install
cd -

Importantly, the MPI Operator must be installed before Kueue for this to work! Let’s start from scratch with a new Kind cluster. We will also need to install the MPI operator and Kueue. Here we install the exact versions tested with this example:

kubectl apply -f https://github.com/kubeflow/mpi-operator/releases/download/v0.4.0/mpi-operator.yaml
kubectl apply -f https://github.com/kubernetes-sigs/kueue/releases/download/v0.4.0/manifests.yaml

Check the mpi-operator release page and Kueue release page for alternate versions. You need to wait until Kueue is ready. You can determine this as follows:

# Wait until you see all pods in the kueue-system are Running
kubectl get pods -n kueue-system

When Kueue is ready:

kubectl apply -f https://raw.githubusercontent.com/kubernetes-sigs/kueue/main/site/static/examples/admin/single-clusterqueue-setup.yaml

Now try running the example MPI job.

python sample-mpijob.py
📦️ Container image selected is mpioperator/mpi-pi:openmpi...
⭐️ Creating sample job with prefix pi...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs
#!/usr/bin/env python3

import argparse
from kubernetes import config, client
import mpijob.models as models

# sample-mpijob.py
# This example will demonstrate full steps to submit a Job via the MPI Operator

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue MPI Operator Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="generateName field to set for job (job prefix does not work here)",
        default="pi",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="mpioperator/mpi-pi:openmpi",
    )
    parser.add_argument(
        "--command",
        help="command to run",
        default="mpirun",
    )
    parser.add_argument(
        "--args",
        nargs="+",
        help="args for container",
        default=["-n", "2", "/home/mpiuser/pi"],
    )
    return parser


def generate_job_crd(job_name, image, command, args):
    """
    Generate an equivalent job CRD to sample-job.yaml
    """
    metadata = client.V1ObjectMeta(
        name=job_name, labels={"kueue.x-k8s.io/queue-name": "user-queue"}
    )

    # containers for launcher and worker
    launcher_container = client.V1Container(
        image=image,
        name="mpi-launcher",
        command=[command],
        args=args,
        security_context=client.V1SecurityContext(run_as_user=1000),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "1Gi",
            }
        },
    )

    worker_container = client.V1Container(
        image=image,
        name="mpi-worker",
        command=["/usr/sbin/sshd"],
        args=["-De", "-f", "/home/mpiuser/.sshd_config"],
        security_context=client.V1SecurityContext(run_as_user=1000),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "1Gi",
            }
        },
    )

    # Create the Launcher and worker replica specs
    launcher = models.V2beta1ReplicaSpec(
        replicas=1,
        template=client.V1PodTemplateSpec(
            spec=client.V1PodSpec(containers=[launcher_container])
        ),
    )

    worker = models.V2beta1ReplicaSpec(
        replicas=2,
        template=client.V1PodTemplateSpec(
            spec=client.V1PodSpec(containers=[worker_container])
        ),
    )

    # runPolicy for jobspec
    policy = models.V2beta1RunPolicy(
        clean_pod_policy="Running", ttl_seconds_after_finished=60
    )

    # Create the jobspec
    jobspec = models.V2beta1MPIJobSpec(
        slots_per_worker=1,
        run_policy=policy,
        ssh_auth_mount_path="/home/mpiuser/.ssh",
        mpi_replica_specs={"Launcher": launcher, "Worker": worker},
    )
    return models.V2beta1MPIJob(
        metadata=metadata,
        api_version="kubeflow.org/v2beta1",
        kind="MPIJob",
        spec=jobspec,
    )


def main():
    """
    Run an MPI job. This requires the MPI Operator to be installed.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    crd = generate_job_crd(args.job_name, args.image, args.command, args.args)
    crd_api = client.CustomObjectsApi()

    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    crd_api.create_namespaced_custom_object(
        group="kubeflow.org",
        version="v2beta1",
        namespace="default",
        plural="mpijobs",
        body=crd,
    )
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get jobs" to see jobs'
    )


if __name__ == "__main__":
    main()

After submit, you can see that the queue has an admitted workload!

$ kubectl get queue
NAME         CLUSTERQUEUE    PENDING WORKLOADS   ADMITTED WORKLOADS
user-queue   cluster-queue   0                   1

And that the job “pi-launcher” has started:

$ kubectl get jobs
NAME          COMPLETIONS   DURATION   AGE
pi-launcher   0/1           9s         9s

The MPI Operator works by way of a central launcher interacting with nodes via ssh. We can inspect a worker and the launcher to get a glimpse of how both work:

$ kubectl logs pods/pi-worker-1 
Server listening on 0.0.0.0 port 22.
Server listening on :: port 22.
Accepted publickey for mpiuser from 10.244.0.8 port 51694 ssh2: ECDSA SHA256:rgZdwufXolOkUPA1w0bf780BNJC8e4/FivJb1/F7OOI
Received disconnect from 10.244.0.8 port 51694:11: disconnected by user
Disconnected from user mpiuser 10.244.0.8 port 51694
Received signal 15; terminating.

The job is fairly quick, and we can see the output of pi in the launcher:

$ kubectl logs pods/pi-launcher-f4gqv 
Warning: Permanently added 'pi-worker-0.pi-worker.default.svc,10.244.0.7' (ECDSA) to the list of known hosts.
Warning: Permanently added 'pi-worker-1.pi-worker.default.svc,10.244.0.9' (ECDSA) to the list of known hosts.
Rank 1 on host pi-worker-1
Workers: 2
Rank 0 on host pi-worker-0
pi is approximately 3.1410376000000002

That looks like pi! 🎉️🥧️ If you are interested in running this same example with YAML outside of Python, see Run an MPIJob.