State

We are experimenting with saving state for the Flux Operator. There are several cases, in increasing order of difficulty:

  • Saving state of the jobs queue and metadata, after runs are complete (between two MiniClusters)

  • Saving state of the jobs queue and metadata, pausing a queue in the middle and resuming

  • Saving state of the jobs queue and metadata, and including filesystem (storage) assets.

These small tutorials will walk through examples of each. The most likely way to do this is with the Flux Operator Python SDK (since we need to create multiple clusters in a reasonable way), but for the purposes of explanation, minicluster.yaml files are provided as well. One important note: we cannot control the timing of a pod termination. Flux can automatically load a saved archive at startup, but to wait for jobs to finish and then dump the archive anew, we rely on issuing a command to the MiniCluster (done by a script or workflow tool). This can likely be improved upon.

Saving Pending Jobs

Pausing scheduling and the queue in a populated queue

This example shows (via the Python SDK) how we can pause and stop a running queue and move the jobs to a new MiniCluster to continue.

Tutorial File

To run this example:

$ python sdk/python/v1alpha1/examples/state-pending-jobs-minicluster.py 

Using this example, we are able to (with slight modification) test:

  • Starting jobs on one cluster and running on another

  • Changing the size of the cluster to be larger

  • Changing the size of the cluster to be smaller

For the different cases, you can adjust the original size (and updated size) in the script by changing minicluster.size. In all cases the queue pauses and resumes successfully on the new cluster (regardless of size). Make sure (between runs) that you delete the previous archive so you aren’t carrying jobs from earlier runs into every new cluster!

$ minikube ssh -- rm /tmp/data/archive.tar.gz

The commands we are issuing to flux are:

# Stop the queue
flux queue stop

# This should wait for running jobs to finish
flux queue idle

# And then do the dump!
flux dump /state/archive.tar.gz

This means we stop the queue and wait for running jobs to finish, and then this state is loaded into the next cluster. If you run the example, you might want to insert an IPython.embed() before the delete command at the end, then interactively shell into the new MiniCluster (when the nodes are running) and look at the jobs:

$ kubectl exec -it -n flux-operator flux-sample-0-mbv54 -- sudo -u flux flux proxy local:///var/run/flux/local flux jobs -a

And always make sure to clean up your archive at the end!

$ minikube ssh -- rm /tmp/data/archive.tar.gz

The next (basic) example goes through the same ideas, but manually for each step so you can learn about what the script is doing.

Basic Saving Jobs and Metadata

Saving state of the jobs queue and metadata, after runs are complete (between two MiniClusters)

This example will walk through creating two MiniClusters - the first running a set of jobs (and finishing) and the second cluster then loading those states. The assets for these files are in examples/state/basic-job-completion. Note that in order for this to work, a shared storage location is required. Since it’s easier to submit multiple jobs interactively, we will do it that way. Here is the first minicluster.yaml to create:

apiVersion: flux-framework.org/v1alpha1
kind: MiniCluster
metadata:
  name: flux-sample
  namespace: flux-operator
spec:
  # Number of pods to create for MiniCluster
  size: 2

  # Make this interactive so we can launch a bunch of jobs!
  interactive: true

  # Define the archive load/save path here (in our volume mount that persists)
  archive:
    path: /state/archive.tar.gz

  # This volume needs to persist between MiniClusters so we can load the archive!
  volumes:
    data:
      storageClass: hostpath
      path: /tmp/data
      labels:
        type: "local"

  # This is a list because a pod can support multiple containers
  containers:
    - image: ghcr.io/flux-framework/flux-restful-api:latest
      volumes:
        data:
          path: /state

Note that interactive mode is set to true - this starts a broker that keeps running until we decide we are done. Since we are defining the archive path as /state/archive.tar.gz, before Flux is started we load an archive from that path (if it exists) with flux resource reload. This is done directly in the entrypoint. To have better control of the reverse sequence (saving the final state to that same location) we will run flux dump to that same archive as an interactive command. Note this is a simple approach that assumes we are OK replacing a previous state with a new one - for more complex workflows (where we might need to keep an original state) we will likely need to do something different. For the time being, let’s create this first minicluster to submit jobs to, and the plan will be that the second minicluster can load the previous job history. If you are using Minikube, make sure to pull first:

$ minikube ssh docker pull ghcr.io/flux-framework/flux-restful-api:latest
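If you want to vary the size between the first and second MiniCluster (as in the pending jobs example), one option is to generate the manifest instead of editing the YAML by hand. The following is a minimal sketch that assumes only the fields shown in the minicluster.yaml above. The function name minicluster_manifest is made up for illustration and is not part of the Flux Operator Python SDK. It emits JSON, which kubectl accepts in place of YAML.

```python
import json


def minicluster_manifest(size, archive_path="/state/archive.tar.gz"):
    """Build the MiniCluster from this tutorial as a plain dict.

    Hypothetical helper: it only mirrors the fields of the minicluster.yaml
    shown above and changes nothing but the size and the archive path.
    """
    return {
        "apiVersion": "flux-framework.org/v1alpha1",
        "kind": "MiniCluster",
        "metadata": {"name": "flux-sample", "namespace": "flux-operator"},
        "spec": {
            # Number of pods to create for the MiniCluster
            "size": size,
            # Keep the broker running so we can submit jobs ourselves
            "interactive": True,
            # Archive load/save path (inside the persistent volume mount)
            "archive": {"path": archive_path},
            "volumes": {
                "data": {
                    "storageClass": "hostpath",
                    "path": "/tmp/data",
                    "labels": {"type": "local"},
                }
            },
            "containers": [
                {
                    "image": "ghcr.io/flux-framework/flux-restful-api:latest",
                    "volumes": {"data": {"path": "/state"}},
                }
            ],
        },
    }


# Same cluster definition at two sizes (the sizes here are arbitrary examples)
first = minicluster_manifest(size=2)
second = minicluster_manifest(size=4)
print(json.dumps(first, indent=2))
```

Saved under a file name of your choosing, the printed JSON can be piped to kubectl apply -f - to create the cluster.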

Now let’s create it! You can either walk through this tutorial and learn about each step (continue below with kubectl apply) or you can run a demo script that runs the commands on your behalf:

$ /bin/bash ./examples/state/basic-job-completion/example.sh

View the Interactive Example Output

$ bash examples/state/basic-job-completion/example.sh
🌀️ Creating first MiniCluster...
minicluster.flux-framework.org/flux-sample created

🥱️ Sleeping 20 seconds to wait for cluster...Broker pod is flux-sample-0-qwsqw

🤓️ Contents of /tmp/data in MiniKube

✨️ Submitting jobs
ƒQK5i1V
ƒmXbRuh
ƒw92msM
ƒ27R1o9y
ƒ2JTUStw
ƒ2UhyUuD
ƒ2eVnjqH
ƒ2prDixw
ƒ2zV94Cw

🥱️ Waiting for jobs...
Jobs finished...
       JOBID USER     NAME       ST NTASKS NNODES     TIME INFO
    ƒ2zV94Cw flux     whoami      S      1      -        - 
    ƒ2prDixw flux     sleep       R      1      1   3.740s flux-sample-0
    ƒ2eVnjqH flux     whoami     CD      1      1   0.042s flux-sample-0
    ƒ2UhyUuD flux     sleep      CD      1      1   4.023s flux-sample-0
    ƒ2JTUStw flux     whoami     CD      1      1   0.045s flux-sample-0
    ƒ27R1o9y flux     sleep      CD      1      1   3.022s flux-sample-0
     ƒw92msM flux     whoami     CD      1      1   0.015s flux-sample-0
     ƒmXbRuh flux     sleep      CD      1      1   2.019s flux-sample-0

🥱️ Wait a minute to be sure we have saved...

🧊️ Current state directory at /var/lib/flux...
total 4332
-rw-r--r-- 1 flux flux  151552 Mar 12 16:36 content.sqlite
-rw-r--r-- 1 flux flux 4120032 Mar 12 16:37 content.sqlite-wal
-rw-r--r-- 1 flux flux    4096 Mar 12 16:36 job-archive.sqlite
-rw-r--r-- 1 flux flux   32768 Mar 12 16:37 job-archive.sqlite-shm
-rw-r--r-- 1 flux flux  123632 Mar 12 16:37 job-archive.sqlite-wal

🧊️ Current archive directory at /state... should be empty, not saved yet
total 0
Cleaning up...
minicluster.flux-framework.org "flux-sample" deleted
total 7
-rw-rw-r-- 1 docker docker 6165 Mar 12 16:38 archive.tar.gz

🌀️ Creating second MiniCluster
minicluster.flux-framework.org/flux-sample created

🥱️ Sleeping a minute to wait for cluster...
Broker pod is flux-sample-0-jpx76

🤓️ Contents of /tmp/data in MiniKube - should be populated with archive from first
total 7
-rw-rw-r-- 1 docker docker 6165 Mar 12 16:38 archive.tar.gz

🤓️ Inspecting state directory in new cluster...
total 1308
-rw-r--r-- 1 flux flux    4096 Mar 12 16:38 content.sqlite
-rw-r--r-- 1 flux flux 1281352 Mar 12 16:38 content.sqlite-wal
-rw-r--r-- 1 flux flux    4096 Mar 12 16:38 job-archive.sqlite
-rw-r--r-- 1 flux flux   32768 Mar 12 16:38 job-archive.sqlite-shm
-rw-r--r-- 1 flux flux   12392 Mar 12 16:38 job-archive.sqlite-wal

😎️ Looking to see if old job history exists...
       JOBID USER     NAME       ST NTASKS NNODES     TIME INFO
    ƒ2zV94Cw flux     whoami     CD      1      1   0.037s flux-sample-0
    ƒ2prDixw flux     sleep      CD      1      1   5.023s flux-sample-0
    ƒ2eVnjqH flux     whoami     CD      1      1   0.042s flux-sample-0
    ƒ2UhyUuD flux     sleep      CD      1      1   4.023s flux-sample-0
    ƒ2JTUStw flux     whoami     CD      1      1   0.045s flux-sample-0
    ƒ27R1o9y flux     sleep      CD      1      1   3.022s flux-sample-0
     ƒw92msM flux     whoami     CD      1      1   0.015s flux-sample-0
     ƒmXbRuh flux     sleep      CD      1      1   2.019s flux-sample-0
Cleaning up..
minicluster.flux-framework.org "flux-sample" deleted

Create the MiniCluster

This is how to create the MiniCluster:

$ kubectl apply -f examples/state/basic-job-completion/minicluster.yaml 
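The commands in the rest of this tutorial address the broker pod by name, and the suffix of that name (for example flux-sample-0-qwsqw in the demo output) changes every time. As a rough sketch, the name could be looked up from Python. This assumes the broker is the pod with index 0, as the demo output suggests, and that kubectl is on your path. The helper names are made up for illustration, and the sketch does not check that the pod is actually running yet.

```python
import json
import subprocess


def pick_broker_pod(pods_json, cluster_name="flux-sample"):
    """Return the name of the index-0 pod of the MiniCluster, or None.

    pods_json is the text printed by `kubectl get pods -o json`.
    """
    prefix = f"{cluster_name}-0-"
    for item in json.loads(pods_json).get("items", []):
        name = item["metadata"]["name"]
        if name.startswith(prefix):
            return name
    return None


def get_broker_pod(namespace="flux-operator", cluster_name="flux-sample"):
    """List the pods in the namespace with kubectl and pick the broker."""
    result = subprocess.run(
        ["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
        check=True,
        capture_output=True,
        text=True,
    )
    return pick_broker_pod(result.stdout, cluster_name)
```

Calling get_broker_pod() returns None until the pods exist, so a script would likely call it in a loop with a short sleep.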

At this point you can proceed to either Interactive Submit or Programmatic Submit.

Interactive Submit

And now we need to submit a bunch of jobs to run to completion. We can do this by shelling in (note this could also be done with the Flux Restful API for a more programmatic example). First, here is how to do this interactively:

# Shell to the pod
$ kubectl exec -it -n flux-operator flux-sample-0-gzqfl -- bash

Check out the state directory! This is where Flux stores job metadata:

$ ls /var/lib/flux/
content.sqlite  content.sqlite-shm  content.sqlite-wal  job-archive.sqlite  job-archive.sqlite-shm  job-archive.sqlite-wal

Let’s now connect to the Flux instance:

$ sudo -u flux flux proxy local:///var/run/flux/local

And now launch a bunch of jobs. It doesn’t matter what they are, go crazy.

for i in {1..5}
do
   flux submit sleep ${i}
   flux submit whoami
done

These will be done very quickly! You should see them all green (to indicate success) via:

$ flux jobs -a

Programmatic Submit

Or just do the entire thing from the command line! First, confirm the archive path is empty:

$ minikube ssh ls /tmp/data
# no output

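Then submit jobs - either one or many. The pod names in these commands come from different runs, so replace them with the name of your own broker pod:

$ kubectl exec -it -n flux-operator flux-sample-0-g6gv4 -- sudo -u flux flux proxy local:///var/run/flux/local flux submit sleep 2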

for i in {1..5}
do
   kubectl exec -it -n flux-operator flux-sample-0-g6gv4 -- sudo -u flux flux proxy local:///var/run/flux/local flux submit sleep ${i}
   kubectl exec -it -n flux-operator flux-sample-0-g6gv4 -- sudo -u flux flux proxy local:///var/run/flux/local flux submit whoami
done
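The same loop can be driven from a script. The sketch below only builds the kubectl argument lists and prints them. The helper names are hypothetical, and the pod name is the one from the example above, so substitute your own. The -it flags are left out on the assumption that no terminal is attached when a script runs the command.

```python
def flux_exec_argv(pod, *flux_args, namespace="flux-operator"):
    """Build the kubectl argv that runs one flux command in the broker pod."""
    return [
        "kubectl", "exec", "-n", namespace, pod, "--",
        "sudo", "-u", "flux",
        "flux", "proxy", "local:///var/run/flux/local",
        "flux", *flux_args,
    ]


def submit_commands(pod, count=5):
    """Mirror the shell loop: one sleep job and one whoami job per iteration."""
    commands = []
    for i in range(1, count + 1):
        commands.append(flux_exec_argv(pod, "submit", "sleep", str(i)))
        commands.append(flux_exec_argv(pod, "submit", "whoami"))
    return commands


# Print the commands instead of running them
for argv in submit_commands("flux-sample-0-g6gv4"):
    print(" ".join(argv))
```

To actually submit, each list can be passed to subprocess.run(argv, check=True).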

When you are done, you can see all the jobs:

$ kubectl exec -it -n flux-operator flux-sample-0-mbv54 -- sudo -u flux flux proxy local:///var/run/flux/local flux jobs -a

Then you can stop the queue, wait for jobs to finish, and request the dump. Note that we do this as an interactive command and not automatically because (for large dumps 💩️) we cannot ensure that the pod will be given enough time to finish the dump before it terminates.

$ kubectl exec -it -n flux-operator flux-sample-0-mbv54 -- sudo -u flux flux proxy local:///var/run/flux/local flux queue stop
$ kubectl exec -it -n flux-operator flux-sample-0-mbv54 -- sudo -u flux flux proxy local:///var/run/flux/local flux queue idle
$ kubectl exec -it -n flux-operator flux-sample-0-mbv54 -- sudo -u flux flux proxy local:///var/run/flux/local flux dump /state/archive.tar.gz
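The order of these three steps matters: dumping before the queue is idle could capture jobs that are still running. A script or workflow tool could enforce the order as in the sketch below, which uses flux dump as in the command list in the pending jobs example. The function name save_state and its run parameter are assumptions made for illustration (the parameter exists so the sequence can be tried without a cluster), and -it is omitted on the assumption that no terminal is attached.

```python
import subprocess


def save_state(pod, run=subprocess.run, namespace="flux-operator",
               archive="/state/archive.tar.gz"):
    """Stop the queue, wait for running jobs, then dump the archive.

    With check=True, subprocess.run raises if a step exits non-zero,
    so the dump is never attempted after a failed earlier step.
    """
    prefix = [
        "kubectl", "exec", "-n", namespace, pod, "--",
        "sudo", "-u", "flux",
        "flux", "proxy", "local:///var/run/flux/local",
        "flux",
    ]
    steps = [
        ["queue", "stop"],   # Stop the queue
        ["queue", "idle"],   # This should wait for running jobs to finish
        ["dump", archive],   # And then do the dump
    ]
    for step in steps:
        run(prefix + step, check=True)
    return steps


# Example call (needs a running MiniCluster and your own pod name):
# save_state("flux-sample-0-mbv54")
```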

After that, outside of the shell (if you didn’t already exit) let’s delete the MiniCluster.

$ kubectl delete -f examples/state/basic-job-completion/minicluster.yaml 

At this point, the Flux state should have been dumped to the archive path we requested, which is located at /tmp/data/archive.tar.gz in the MiniKube VM (/tmp/data is bound to /state, and the container was asked to save the archive to /state/archive.tar.gz).

$ minikube ssh -- ls -l /tmp/data/
total 7
-rw-rw-r-- 1 docker docker 6231 Mar 12 07:44 archive.tar.gz

Next, let’s bring up a second minicluster! This time, the entrypoint should find the existing archive and load it into the broker, and then we should see the jobs from the first cluster. We can use the same minicluster file!

$ kubectl apply -f examples/state/basic-job-completion/minicluster.yaml 

We can then test if this current setup has a memory of the jobs run on the first one:

$ kubectl exec -it -n flux-operator flux-sample-0-dpd42 -- sudo -u flux flux proxy local:///var/run/flux/local flux jobs -a
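To check the restored history with something other than your eyes, the flux jobs -a tables from the two clusters can be compared. The sketch below parses the default table layout shown in the example output earlier (job ID in the first column, state in the fourth) and assumes job names contain no spaces. The sample rows are copied from that output, and the function name is made up for illustration.

```python
from collections import Counter


def parse_flux_jobs(text):
    """Map job ID -> state from the default `flux jobs -a` table."""
    states = {}
    for line in text.splitlines():
        fields = line.split()
        # Skip blank lines and the header row
        if len(fields) < 4 or fields[0] == "JOBID":
            continue
        states[fields[0]] = fields[3]
    return states


before = """\
       JOBID USER     NAME       ST NTASKS NNODES     TIME INFO
    ƒ2zV94Cw flux     whoami      S      1      -        -
    ƒ2prDixw flux     sleep       R      1      1   3.740s flux-sample-0
    ƒ2eVnjqH flux     whoami     CD      1      1   0.042s flux-sample-0
"""
after = """\
       JOBID USER     NAME       ST NTASKS NNODES     TIME INFO
    ƒ2zV94Cw flux     whoami     CD      1      1   0.037s flux-sample-0
    ƒ2prDixw flux     sleep      CD      1      1   5.023s flux-sample-0
    ƒ2eVnjqH flux     whoami     CD      1      1   0.042s flux-sample-0
"""

first_cluster = parse_flux_jobs(before)
second_cluster = parse_flux_jobs(after)

# Jobs known to the first cluster that the second one has no record of
missing = sorted(set(first_cluster) - set(second_cluster))
print("missing from second cluster:", missing)
print("states in second cluster:", dict(Counter(second_cluster.values())))
```

An empty missing list means every job ID from the first cluster is also known to the second.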

And of course, clean up when you are done.

$ kubectl delete -f examples/state/basic-job-completion/minicluster.yaml 
$ minikube ssh -- rm -rf /tmp/data/archive.tar.gz

We also have this example demonstrated entirely in Python using the Flux Operator Python SDK.

What are next steps?

This is really cool! Intuitively what we need to do for the next example (stopping a queue of jobs that are running, meaning waiting for running jobs to finish and pausing the rest) is to submit jobs that will take much longer to run, and then figure out how to issue a command to the cluster to stop scheduling, stop the queue, wait for running jobs to finish, and then create the same archive. What isn’t clear is how it will work when Flux reloads the archive - will the jobs that weren’t run yet start? What commands should be the responsibility of the Operator vs. a client like the Python SDK? I’m not sure - we will find out!


Last update: Mar 30, 2023