Examples¶
Hello World!¶
Let's start by running a very simple workflow template to echo "hello world" using the docker/whalesay
container image from DockerHub.
You can run this directly from your shell with a simple docker command:
$ docker run docker/whalesay cowsay "hello world"
_____________
< hello world >
-------------
\
\
\
## .
## ## ## ==
## ## ## ## ===
/""""""""""""""""___/ ===
~~~ {~~ ~~~~ ~~~ ~~~~ ~~ ~ / ===- ~~~
\______ o __/
\ \ __/
\____\______/
With Couler, we can run the same container on a Kubernetes cluster using an Argo Workflows as the workflow engine.
import couler.argo as couler
from couler.argo_submitter import ArgoSubmitter
couler.run_container(
image="docker/whalesay", command=["cowsay"], args=["hello world"]
)
submitter = ArgoSubmitter()
result = couler.run(submitter=submitter)
Coin Flip¶
This example combines the use of a Python function result, along with conditionals,
to take a dynamic path in the workflow. In this example, depending on the result
of the first step defined in flip_coin()
, the template will either run the
heads()
step or the tails()
step.
Steps can be defined via either couler.run_script()
for Python functions or couler.run_container()
for containers. In addition,
the conditional logic to decide whether to flip the coin in this example
is defined via the combined use of couler.when()
and couler.equal()
.
import couler.argo as couler
from couler.argo_submitter import ArgoSubmitter
def random_code():
import random
res = "heads" if random.randint(0, 1) == 0 else "tails"
print(res)
def flip_coin():
return couler.run_script(image="python:alpine3.6", source=random_code)
def heads():
return couler.run_container(
image="alpine:3.6", command=["sh", "-c", 'echo "it was heads"']
)
def tails():
return couler.run_container(
image="alpine:3.6", command=["sh", "-c", 'echo "it was tails"']
)
result = flip_coin()
couler.when(couler.equal(result, "heads"), lambda: heads())
couler.when(couler.equal(result, "tails"), lambda: tails())
submitter = ArgoSubmitter()
couler.run(submitter=submitter)
DAG¶
This example demonstrates different ways to define the workflow as a directed-acyclic graph (DAG) by specifying the
dependencies of each task via couler.set_dependencies()
and couler.dag()
. Please see the code comments for the
specific shape of DAG that we've defined in linear()
and diamond()
.
import couler.argo as couler
from couler.argo_submitter import ArgoSubmitter
def job(name):
couler.run_container(
image="docker/whalesay:latest",
command=["cowsay"],
args=[name],
step_name=name,
)
# A
# / \
# B C
# /
# D
def linear():
couler.set_dependencies(lambda: job(name="A"), dependencies=None)
couler.set_dependencies(lambda: job(name="B"), dependencies=["A"])
couler.set_dependencies(lambda: job(name="C"), dependencies=["A"])
couler.set_dependencies(lambda: job(name="D"), dependencies=["B"])
# A
# / \
# B C
# \ /
# D
def diamond():
couler.dag(
[
[lambda: job(name="A")],
[lambda: job(name="A"), lambda: job(name="B")], # A -> B
[lambda: job(name="A"), lambda: job(name="C")], # A -> C
[lambda: job(name="B"), lambda: job(name="D")], # B -> D
[lambda: job(name="C"), lambda: job(name="D")], # C -> D
]
)
linear()
submitter = ArgoSubmitter()
couler.run(submitter=submitter)
Note that the current version only works with Argo Workflows but we are actively working on the design of the unified interface that is extensible to additional workflow engines. Please stay tuned for more updates and we welcome any feedback and contributions from the community.