Introductory example - using the APIΒΆ

This small tutorial walks through the basics of using an API. The most basic thing to do is submit a job to the API, list, get statuses, cancel. We can use our example client for this.

import json
import os
import sys
import time

import matplotlib.pyplot as plt
from flux_restful_client.main import get_client

# This is expected to be rendered from docs root
here = os.path.dirname(os.path.abspath(os.getcwd()))

# This is here for the nice thumbnail :)
image = plt.imread(os.path.join(here, "images", "logo.png"))
fig = plt.imshow(image)
plt.axis("off")
plt.show()

Here we instantiate a client. If you need authentication, this can optionally take a user and token, or also derive from the FLUX_USER and FLUX_TOKEN in the environment.

cli = get_client()

Let’s list the nodes in our cluster!

print("Listing nodes")
res = cli.list_nodes()
if res:
    print(json.dumps(res, indent=4))
Listing nodes
{
    "nodes": [
        "57d0f60fce2e"
    ]
}

Now let’s submit a job to Flux.

print("😴 Submitting job sleep 60")
res = cli.submit(command=["sleep", 60])

# This is an indication something went wrong - detail has an error.
if res and "detail" in res:
    print(res["detail"])
    sys.exit()
😴 Submitting job sleep 60

To require auth, the server should be startup with these variables in the environment (and the first two found by the client here) variables exported: FLUX_USER=fluxuser FLUX_TOKEN=12345 FLUX_REQUIRE_AUTH=true

And finally, let’s get job info.

print("πŸ“ Getting job info...")
res = cli.jobs(res["id"])
if res:
    print(json.dumps(res, indent=4))
πŸ“ Getting job info...
{
    "id": 361295786278912,
    "userid": 0,
    "urgency": 16,
    "priority": 16,
    "t_submit": 1668305924.5898263,
    "t_depend": 1668305924.5898263,
    "t_run": 1668305924.6036532,
    "state": "RUN",
    "name": "sleep",
    "ntasks": 1,
    "ncores": 1,
    "duration": 0.0,
    "nnodes": 1,
    "ranks": "3",
    "nodelist": "57d0f60fce2e",
    "expiration": 4821905924.0,
    "result": "",
    "returncode": "",
    "runtime": 0.4636096954345703,
    "waitstatus": "",
    "exception": {
        "occurred": "",
        "severity": "",
        "type": "",
        "note": ""
    }
}

And job logs This will be added to the client

print("😴 Submitting job to echo pancakes πŸ₯žπŸ₯žπŸ₯ž")
res = cli.submit(command="echo pancakes are really just morning cakes.")
time.sleep(5)
res = cli.output(res["id"])
if res:
    print(json.dumps(res, indent=4))
😴 Submitting job to echo pancakes πŸ₯žπŸ₯žπŸ₯ž
{
    "Output": [
        "pancakes are really just morning cakes.\n"
    ]
}

Now let’s submit three jobs in unison so we can list them back! Submit the job to flux

print("Submitting 3 jobs to sleep!")
for seconds in [10, 20, 30]:
    cli.submit(command=["sleep", seconds])
res = cli.jobs()
if res:
    print(json.dumps(res, indent=4))
Submitting 3 jobs to sleep!
{
    "jobs": [
        {
            "id": 361393211572224
        },
        {
            "id": 361392691478528
        },
        {
            "id": 361392037167104
        },
        {
            "id": 361295786278912
        },
        {
            "id": 360411408891904
        },
        {
            "id": 361304158109696
        },
        {
            "id": 360500059701248
        },
        {
            "id": 360499539607552
        },
        {
            "id": 360499053068288
        },
        {
            "id": 360502072967168
        },
        {
            "id": 360412012871680
        },
        {
            "id": 356049030742016
        },
        {
            "id": 356053023719424
        },
        {
            "id": 356051731873792
        },
        {
            "id": 356051111116800
        },
        {
            "id": 356054131015680
        },
        {
            "id": 356049668276224
        },
        {
            "id": 323743595364352
        },
        {
            "id": 323754819321856
        },
        {
            "id": 323754148233216
        },
        {
            "id": 323752906719232
        },
        {
            "id": 323756681592832
        },
        {
            "id": 323752151744512
        },
        {
            "id": 322373450465280
        },
        {
            "id": 322377242116096
        },
        {
            "id": 322376034156544
        },
        {
            "id": 322375480508416
        },
        {
            "id": 322379221827584
        },
        {
            "id": 322374071222272
        },
        {
            "id": 314610330632192
        },
        {
            "id": 314609810538496
        },
        {
            "id": 313911777689600
        },
        {
            "id": 314609307222016
        },
        {
            "id": 314380113674240
        },
        {
            "id": 310431579111424
        },
        {
            "id": 310434615787520
        },
        {
            "id": 310434011807744
        },
        {
            "id": 310433458159616
        },
        {
            "id": 310443658706944
        },
        {
            "id": 310432183091200
        },
        {
            "id": 306433887305728
        },
        {
            "id": 306436286447616
        },
        {
            "id": 305828548575232
        },
        {
            "id": 306435061710848
        },
        {
            "id": 306434508062720
        },
        {
            "id": 306436873650176
        },
        {
            "id": 244509770252288
        },
        {
            "id": 116794505297920
        },
        {
            "id": 39064489164800
        },
        {
            "id": 38133571780608
        },
        {
            "id": 36512691388416
        },
        {
            "id": 30820953751552
        },
        {
            "id": 26132493631488
        }
    ]
}

And this is how to search (with a start, length, or query)

print("πŸŒ“ Querying jobs!")
res = cli.search("sleep", start=1, length=2)
if res:
    print(json.dumps(res, indent=4))
πŸŒ“ Querying jobs!
{
    "data": [
        {
            "id": 361392691478528,
            "userid": 0,
            "urgency": 16,
            "priority": 16,
            "t_submit": 1668305930.3657866,
            "t_depend": 1668305930.3657866,
            "t_run": 1668305930.3788106,
            "state": "RUN",
            "name": "sleep",
            "ntasks": 1,
            "ncores": 1,
            "duration": 0.0,
            "nnodes": 1,
            "ranks": "3",
            "nodelist": "57d0f60fce2e",
            "expiration": 4821905930.0,
            "result": "",
            "returncode": "",
            "runtime": 0.09533405303955078,
            "waitstatus": "",
            "exception": {
                "occurred": "",
                "severity": "",
                "type": "",
                "note": ""
            }
        },
        {
            "id": 361392037167104,
            "userid": 0,
            "urgency": 16,
            "priority": 16,
            "t_submit": 1668305930.331216,
            "t_depend": 1668305930.331216,
            "t_run": 1668305930.3446293,
            "state": "RUN",
            "name": "sleep",
            "ntasks": 1,
            "ncores": 1,
            "duration": 0.0,
            "nnodes": 1,
            "ranks": "3",
            "nodelist": "57d0f60fce2e",
            "expiration": 4821905930.0,
            "result": "",
            "returncode": "",
            "runtime": 0.13027596473693848,
            "waitstatus": "",
            "exception": {
                "occurred": "",
                "severity": "",
                "type": "",
                "note": ""
            }
        }
    ],
    "draw": 1,
    "recordsTotal": 53,
    "recordsFiltered": 2
}

Finally, let’s submit and cancel a job

print("Submitting job sleep 60 intending to cancel..")
res = cli.submit(command=["sleep", 60])
if res:
    print(json.dumps(res, indent=4))
    print("Requesting job cancel..")
    res = cli.cancel(res["id"])
    print(json.dumps(res, indent=4))
Submitting job sleep 60 intending to cancel..
{
    "Message": "Job submit.",
    "id": 361395661045760
}
Requesting job cancel..
{
    "Message": "Job is requested to cancel."
}

And this would be how you stop your cluster service

print("Stopping the service...")
# res = cli.stop_service()
Stopping the service...

Total running time of the script: ( 0 minutes 6.262 seconds)

Gallery generated by Sphinx-Gallery


Last update: Mar 10, 2024