Parallel Workflows on Kubernetes
Learn how parallel workflows running on Kubernetes can be used to solve two problems in distributed computing.
Applications are increasingly distributed, running on multiple machines and accessed by users from all over the world. By bundling the application code, the application runtime, and the required libraries into containers, and by managing those containers with orchestrators, we have addressed many of the challenges of building distributed systems. With container runtimes like Docker, we can deploy our applications across different environments. And with container orchestration tools like Kubernetes, we can scale our applications. We frequently have to break a distributed application into a collection of containers running on different machines, which requires us to coordinate execution and facilitate communication among those containers. One common scenario where this arises is composing a workflow out of multiple containers. In this article, we will learn how to build such workflows using Argo, a container-native workflow engine for Kubernetes. We will develop workflows for the following two examples of so-called "embarrassingly parallel" problems, for which Kubernetes is an ideal scaling platform:
N-Queens using genetic algorithms
Distributed search
N-Queens Using Genetic Algorithms
The N-Queens problem is to place N queens on an NxN chessboard so that no two queens attack each other. Since each queen must be on a different row and column, we can assume that queen i is placed in the i-th column. All solutions to the N-Queens problem can therefore be represented as N-tuples (q1, q2, …, qN) that are permutations of the N-tuple (1, 2, 3, …, N). The position of a number in the tuple represents the queen's column, while its value represents the queen's row. The size of the search space is N*(N-1)*...*1 = N!.
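To make this encoding concrete, here is a minimal Python sketch of the tuple representation together with a fitness measure that counts attacking pairs (the helper names are ours, for illustration only):

import random

def conflicts(board):
    # board[i] is the row of the queen in column i. Rows and columns are
    # unique by construction, so only diagonal attacks need checking.
    n = len(board)
    return sum(1
               for i in range(n)
               for j in range(i + 1, n)
               if abs(board[i] - board[j]) == j - i)  # shared diagonal

def random_board(n):
    # One candidate solution: a random permutation of (1, 2, ..., n).
    rows = list(range(1, n + 1))
    random.shuffle(rows)
    return tuple(rows)

board = random_board(8)
print(board, "attacking pairs:", conflicts(board))

A board is a solution exactly when conflicts(board) == 0.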
Genetic Algorithms
Genetic algorithms can be used to search for a permutation of N non-attacking queens. The pseudo-code shown below is a simple genetic algorithm to search for a solution.
create initial population
evaluate initial population
while not done
    select 3 individuals
    run mutation operator
    evaluate offspring
    if solution found, set done = true
end
The choice of the initial population, random here, determines how fast a solution is found. There are different ways to parallelize this algorithm to speed it up. One of the simplest is to run the same computation on multiple workers, each with a different randomly selected initial population. Once any worker has exited with success, no other worker should still be doing any work or writing any output; they should all exit as soon as one of them finds a solution.
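As a concrete illustration, here is a minimal Python sketch of the loop in the pseudo-code, reusing conflicts() and random_board() from the earlier sketch (the selection and mutation details are our assumptions; the actual implementation in the image may differ):

import random

def solve(n, population_size=100):
    # Mutation-only genetic search following the pseudo-code above.
    population = [random_board(n) for _ in range(population_size)]
    # evaluate initial population
    for board in population:
        if conflicts(board) == 0:
            return board
    while True:
        # select 3 individuals at random; keep the fittest as the parent
        parent = min(random.sample(population, 3), key=conflicts)
        # mutation operator: swap two columns, preserving the permutation
        child = list(parent)
        i, j = random.sample(range(n), 2)
        child[i], child[j] = child[j], child[i]
        child = tuple(child)
        # evaluate the offspring
        if conflicts(child) == 0:
            return child  # solution found
        # replace the least-fit individual with the offspring
        worst = max(range(population_size),
                    key=lambda k: conflicts(population[k]))
        population[worst] = child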
Argo Workflow
Let us create a workflow to be run on Kubernetes that runs this genetic algorithm along with subsequent processing tasks. A Kubernetes Job with a specified number of completions and a parallelism setting is used for parallel execution of pods. We create a Job object with completions and parallelism both set to 5, which launches 5 pods in parallel to search for a solution. A Redis service is used for pub/sub messaging: the pod that finishes first publishes a message on a channel on the Redis server, to which all workers are subscribed. On receiving the "finished" message, the other workers stop the search and exit.
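In Python with the redis-py client, this coordination might look roughly like the following (the host and channel names are our assumptions; the actual code in the image may differ):

import redis

r = redis.Redis(host="redis", port=6379)  # assumes a Service named "redis"
channel = r.pubsub()
channel.subscribe("nqueens")

def other_worker_finished():
    # Non-blocking check for a "finished" message from another worker.
    msg = channel.get_message(ignore_subscribe_messages=True)
    return msg is not None and msg["data"] == b"finished"

def announce_finished():
    # Tell the other workers to stop searching and exit.
    r.publish("nqueens", "finished")

Each worker would call other_worker_finished() between generations and exit early when it returns True, calling announce_finished() if it was the one that found the solution.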
The Docker image containing the Python implementation of the genetic algorithm described above can be pulled from Docker Hub. This image requires a Redis service to be running on the Kubernetes cluster; refer to the Kubernetes documentation on how to create and configure one.
> docker pull randhirkumars/n-queens-genetic-redis
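For a quick test setup, one simple way to get such a Redis service running is with kubectl (a minimal sketch; the Kubernetes documentation describes a fuller, production-ready setup):

> kubectl create deployment redis --image=redis
> kubectl expose deployment redis --port=6379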
The workflow then consists of the following steps:
Create a parallelized Kubernetes Job that launches 5 parallel workers. Once any pod has exited with success, no other pod will be doing any work. Return the job name and job uid as output parameters.
Using the uid of the job, query its associated pods and print their logs to stdout.
Delete the job using the job name.
The Argo workflow YAML with the above steps is shown below:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: nqueens-
spec:
  entrypoint: nqueens-job
  templates:
  - name: nqueens-job
    steps:
    - - name: nqueens-genetic-job
        template: nqueens-genetic-job
    - - name: print-solution
        template: print-solution
        arguments:
          parameters:
          - name: job-uid
            value: '{{steps.nqueens-genetic-job.outputs.parameters.job-uid}}'
    - - name: delete-job
        template: delete-job
        arguments:
          parameters:
          - name: job-name
            value: '{{steps.nqueens-genetic-job.outputs.parameters.job-name}}'
  - name: nqueens-genetic-job
    resource:
      action: create
      successCondition: status.succeeded > 1
      failureCondition: status.failed > 0
      manifest: |
        apiVersion: batch/v1
        kind: Job
        metadata:
          namespace: default
          name: nqueens
          labels:
            job: n-queens
        spec:
          parallelism: 5
          completions: 5
          template:
            metadata:
              labels:
                job: n-queens
            spec:
              containers:
              - name: worker
                image: randhirkumars/n-queens-genetic-redis
              restartPolicy: Never
    outputs:
      parameters:
      - name: job-name
        valueFrom:
          jsonPath: '{.metadata.name}'
      - name: job-uid
        valueFrom:
          jsonPath: '{.metadata.uid}'
  - name: print-solution
    inputs:
      parameters:
      - name: job-uid
    container:
      image: argoproj/argoexec:latest
      command: [sh, -c]
      args: ["
        for i in `kubectl get pods -l controller-uid={{inputs.parameters.job-uid}} -o name`; do
          kubectl logs $i;
        done
      "]
  - name: delete-job
    inputs:
      parameters:
      - name: job-name
    resource:
      action: delete
      manifest: |
        apiVersion: batch/v1
        kind: Job
        metadata:
          name: {{inputs.parameters.job-name}}
Notice that each step in the workflow is a Docker container. The first step uses the image we built, randhirkumars/n-queens-genetic-redis, and the second step uses a pre-built image from Docker Hub, argoproj/argoexec:latest. Submit the workflow using the Argo command line.
> argo submit nqueens.yaml
This launches the workflow, which can be visualized as a graph in the Argo UI.
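The workflow can also be followed from the command line with standard Argo CLI commands such as the following (the workflow name is a placeholder generated from the generateName prefix; exact output varies by Argo version):

> argo list
> argo get nqueens-<id>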
Once the workflow has run to completion, the solution is printed to the console of the pods associated with the Job. [Figure: a solution for a 14x14 chessboard.]
Distributed Search
In the N-Queens problem, we learned how to create a workflow that launched parallel jobs on Kubernetes, with the parallel jobs using an external Redis service for coordination. In this section, we look at another pattern for parallel workflows: distributed document search using the scatter/gather pattern. Here, the task is to search a large collection of documents for all documents that contain given words. To parallelize the task, we scatter the per-word search requests across nodes in the cluster; all the nodes have access to a shared volume hosting the documents to be searched. We then gather all the responses from the worker nodes into a single response. This task can be implemented in a workflow with the following steps.
When a search request comes in, parse the request and split the search string into words.
Loop through the words and farm out a leaf pod to search for each word.
Each of the pods returns a list of documents that match one of the words. Collate the search results and print the list of documents. (A local sketch of this scatter/gather shape follows; the Kubernetes version comes after it.)
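Before mapping this onto Kubernetes, here is a minimal local Python sketch of the same scatter/gather shape (the function names and directory layout are ours, for illustration only):

import concurrent.futures
import os

def search_word(word, docdir):
    # Leaf task: return the names of documents under docdir containing word.
    hits = []
    for name in os.listdir(docdir):
        path = os.path.join(docdir, name)
        if os.path.isfile(path):
            with open(path, errors="ignore") as f:
                if word in f.read():
                    hits.append(name)
    return hits

def scatter_gather(query, docdir):
    # Scatter one search task per word, then gather and deduplicate.
    words = query.split()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        results = pool.map(lambda w: search_word(w, docdir), words)
    return sorted({doc for hits in results for doc in hits})

print(scatter_gather("hello world", "/mnt/data"))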
Argo Workflow
The Argo workflow YAML with the above steps is shown below. Here, we store the documents to be searched and the results of the search on persistent volumes mounted on the pods.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dgrep-
spec:
  entrypoint: dgrep
  volumeClaimTemplates:
  - metadata:
      name: workdir
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 1Gi
  arguments:
    parameters:
    - name: searchstring
      value: hello world
  templates:
  - name: dgrep
    steps:
    - - name: data
        template: data
    - - name: generate
        template: generate
    - - name: search
        template: search
        arguments:
          parameters:
          - name: words
            value: "{{item}}"
        withParam: "{{steps.generate.outputs.result}}"
    - - name: collate
        template: collate
  - name: data
    container:
      image: alpine
      command: [sh, -c]
      args: ["touch /mnt/data/file1; touch /mnt/data/file2; echo -n hello > /mnt/data/file1; echo -n world > /mnt/data/file2"]
      volumeMounts:
      - name: workdir
        mountPath: /mnt/data
  - name: generate
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import json
        import sys
        json.dump([w for w in "{{workflow.parameters.searchstring}}".split()], sys.stdout)
  - name: search
    inputs:
      parameters:
      - name: words
    container:
      image: alpine
      command: [sh, -c]
      args: ["grep -rl {{inputs.parameters.words}} /mnt/data | awk -F/ '{ print $NF }' >> /mnt/data/output"]
      volumeMounts:
      - name: workdir
        mountPath: /mnt/data
  - name: collate
    container:
      image: alpine
      command: [sh, -c]
      args: ["cat /mnt/data/output | sort | uniq"]
      volumeMounts:
      - name: workdir
        mountPath: /mnt/data
Here we use the vanilla Docker image alpine for most of the steps (the generate step uses python:alpine3.6). The workflow, when submitted using the Argo command line, can be visualized on the Argo UI. The search string consists of two words, "hello" and "world." In response, two pods are created in parallel, each searching for one word. The individual search results from the pods are collated in the final step to print the list of documents.
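Given the sample data step above, which writes "hello" into file1 and "world" into file2, the collate pod's log should read something like:

file1
file2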
Conclusion
We have illustrated how to create parallel workflows on Kubernetes with the help of two examples. We used Argo to create workflows that can be specified as a directed acyclic graph (DAG). Argo allows us to define a container-native workflow on Kubernetes where each step in the workflow is a Docker container. Kubernetes is ideal for running parallel workflows and Argo reduces the complexity of designing such workflows.