Couler API Design
This document outlines the design of core Couler APIs to support multiple workflow backends. Please see this RFC for the initial design discussions.
Goals
- Design the core Couler APIs that can be implemented across different workflow engines.
- Provide a minimal set of APIs that are forward-looking, engine-agnostic, and unlikely to be deprecated over time.
Non-Goals
- Provide an exhaustive set of APIs that cover all use cases and all workflow engines.
- Provide implementation details.
Design
Core operations (couler.ops):
- `run_step(step_def)` where `step_def` is the step definition, which can be a container spec, a Python function, or a spec that is specific to the underlying workflow engine (e.g. a k8s CRD if Argo Workflows is used). A "step" represents a node in the workflow graph, i.e. the "smallest" unit in some sense. For an analogy of a Couler "step" in different backends, please see the table of Couler concepts in the last section.
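To make the three accepted forms of `step_def` concrete, the following is a minimal sketch of how a `run_step` implementation might classify its argument. It is not Couler's implementation: the `Step` record, the `classify_step_def` helper, the `kind` labels, and the choice to represent a container spec as a command list and an engine-specific spec as a dict are all assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Step:
    """Hypothetical record for one node in the workflow graph."""
    name: str
    kind: str      # "function", "container", or "engine_spec"
    payload: Any   # the original step_def, untouched


def classify_step_def(step_def: Any) -> str:
    """Return which of the three documented forms step_def takes."""
    if callable(step_def):
        return "function"
    if isinstance(step_def, dict):
        # Assumption: engine-specific specs (e.g. a k8s CRD) arrive as dicts.
        return "engine_spec"
    if (
        isinstance(step_def, (list, tuple))
        and step_def
        and all(isinstance(arg, str) for arg in step_def)
    ):
        # Assumption: a container spec is given as its command line.
        return "container"
    raise TypeError(f"unsupported step_def: {step_def!r}")


def run_step(step_def: Any, name: Optional[str] = None) -> Step:
    """Build a Step record; a real backend would also schedule it."""
    kind = classify_step_def(step_def)
    if name is None:
        # Functions contribute their own name; other forms fall back to the kind.
        name = getattr(step_def, "__name__", kind)
    return Step(name=name, kind=kind, payload=step_def)
```

Keeping the classification separate from `run_step` means each backend could reuse it and only decide how to translate each kind into its own step object.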
Control flow (couler.control_flows):
- `map(func, *args, **kwargs)` where `*args` and `**kwargs` correspond to the arguments passed to the function `func`.
- `when(cond, if_op, else_op)` where `cond` can be any of the following predicates: `equal()`, `not_equal()`, `bigger()`, `smaller()`, `bigger_equal()`, `smaller_equal()`.
  - The operation defined in `if_op` will be executed when `cond` is true. Otherwise, `else_op` will be executed.
- `while_loop(cond, func, *args, **kwargs)` where `cond` can be any one of the predicates mentioned above. `func`, `*args`, and `**kwargs` are similar to `map()`'s.
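The relationship between the predicates and `when()` can be sketched as follows. This is an illustrative stand-in rather than Couler code: the `Predicate` class is hypothetical, and the sketch evaluates the condition locally, whereas a real backend would more likely translate it into the engine's own conditional syntax and defer evaluation until the upstream step has produced its output.

```python
import operator
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class Predicate:
    """Hypothetical deferred comparison between two values."""
    op: Callable[[Any, Any], bool]
    left: Any
    right: Any

    def evaluate(self) -> bool:
        return self.op(self.left, self.right)


# The six predicates named in this design, each wrapping a standard operator.
def equal(a, b): return Predicate(operator.eq, a, b)
def not_equal(a, b): return Predicate(operator.ne, a, b)
def bigger(a, b): return Predicate(operator.gt, a, b)
def smaller(a, b): return Predicate(operator.lt, a, b)
def bigger_equal(a, b): return Predicate(operator.ge, a, b)
def smaller_equal(a, b): return Predicate(operator.le, a, b)


def when(cond: Predicate,
         if_op: Callable[[], Any],
         else_op: Optional[Callable[[], Any]] = None) -> Any:
    """Run if_op when cond holds, otherwise else_op (if one was given)."""
    branch = if_op if cond.evaluate() else else_op
    return branch() if branch is not None else None
```

Passing the operations as zero-argument callables, as the workflow example in this document does with `lambda`, ensures that only the selected branch is ever executed.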
Utilities (couler.utils):
- `submit(config=workflow_config(schedule="* * * * 1"))` where `config` is engine-specific.
- `get_status(workflow_name)`
- `get_logs(workflow_name)`
- `delete_workflow(workflow_name)`
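The lifecycle these utilities imply (submit, inspect, delete) can be sketched with an in-memory stand-in for a workflow engine. Everything here is hypothetical: the `WorkflowConfig` and `InMemoryEngine` classes, the generated workflow names, and the `complete()` hook are illustrative only. The status strings `"Running"` and `"Completed"` are taken from the workflow example in this document, and the five-field check assumes the schedule is a standard cron expression.

```python
import itertools
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class WorkflowConfig:
    """Hypothetical engine-specific settings; only `schedule` appears in this design."""
    schedule: Optional[str] = None  # assumed to be a five-field cron expression

    def __post_init__(self):
        if self.schedule is not None and len(self.schedule.split()) != 5:
            raise ValueError("schedule must have five cron fields")


@dataclass
class _Record:
    status: str = "Running"
    logs: List[str] = field(default_factory=list)


class InMemoryEngine:
    """Toy engine that tracks workflows in a dict instead of a cluster."""

    def __init__(self):
        self._workflows: Dict[str, _Record] = {}
        self._ids = itertools.count(1)

    def submit(self, config: Optional[WorkflowConfig] = None) -> str:
        name = f"workflow-{next(self._ids)}"
        schedule = config.schedule if config is not None else None
        self._workflows[name] = _Record(logs=[f"submitted, schedule={schedule!r}"])
        return name

    def get_status(self, workflow_name: str) -> str:
        return self._workflows[workflow_name].status

    def get_logs(self, workflow_name: str) -> List[str]:
        return list(self._workflows[workflow_name].logs)

    def delete_workflow(self, workflow_name: str) -> None:
        del self._workflows[workflow_name]

    def complete(self, workflow_name: str) -> None:
        """Hook for this sketch only: mark a workflow as finished."""
        self._workflows[workflow_name].status = "Completed"
```

Note that `submit` returns the workflow name, which is the handle the other three utilities take as their only argument.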
Backends (couler.backends):
- `get_backend()`
- `use_backend("argo")`
Minimal Working Workflow Example
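One simple way to realize this pair of functions is a module-level setting guarded by a list of known backend names. The sketch below is an assumption about how that could look, not Couler's implementation; in particular, `_SUPPORTED_BACKENDS` and the initial value of `None` are hypothetical, and only `"argo"` is named as a backend in this design.

```python
from typing import Optional

# Hypothetical registry; only "argo" is named in this design.
_SUPPORTED_BACKENDS = {"argo"}
_current_backend: Optional[str] = None


def get_backend() -> Optional[str]:
    """Return the name of the active backend, or None if none was selected."""
    return _current_backend


def use_backend(name: str) -> None:
    """Select the backend that later API calls should target."""
    global _current_backend
    if name not in _SUPPORTED_BACKENDS:
        raise ValueError(f"unknown backend: {name!r}")
    _current_backend = name
```

Rejecting unknown names at selection time surfaces a typo immediately instead of at submission.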
An example workflow defined via some of the APIs mentioned above is shown below:
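    import time

    import couler

    # Select the Argo backend unless it is already active.
    if couler.get_backend() != "argo":
        couler.use_backend("argo")


    def random_code():
        # Imported here on the assumption that the function body is shipped
        # to the step's container and must therefore be self-contained.
        import random

        result = "heads" if random.randint(0, 1) == 0 else "tails"
        print(result)


    def flip_coin():
        return couler.run_step(
            image="python:alpine3.6",
            step_def=random_code,
        )


    def heads():
        # alpine images ship sh rather than bash.
        return couler.run_step(
            image="alpine:3.6",
            step_def=["sh", "-c", 'echo "it was heads"'],
        )


    def tails():
        return couler.run_step(
            image="alpine:3.6",
            step_def=["sh", "-c", 'echo "it was tails"'],
        )


    result = flip_coin()
    couler.when(couler.equal(result, "heads"), lambda: heads())
    couler.when(couler.equal(result, "tails"), lambda: tails())

    # workflow_config is the engine-specific config helper from the Utilities
    # section; this design does not specify which module exposes it.
    name = couler.submit(config=workflow_config(schedule="* * * * 1"))

    # Poll until the workflow leaves the "Running" state, then clean up.
    # The 5-second interval is an arbitrary choice for this example.
    while couler.get_status(name) == "Running":
        time.sleep(5)
    if couler.get_status(name) == "Completed":
        couler.delete_workflow(name)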
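The status polling at the end of the example could be factored into a small helper with a timeout, so that a workflow stuck in `"Running"` does not block the caller forever. The helper below is a sketch under assumptions: `wait_for_workflow` and its parameters are hypothetical names, and the status function is passed in as an argument (in practice it would be something like `couler.get_status`) so the helper can be exercised without a workflow engine.

```python
import time
from typing import Callable


def wait_for_workflow(
    get_status: Callable[[str], str],
    name: str,
    poll_seconds: float = 5.0,
    timeout_seconds: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the workflow is no longer "Running" and return its final status."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status(name)
        if status != "Running":
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"workflow {name!r} still running after {timeout_seconds}s")
        sleep(poll_seconds)
```

A caller would then check whether the returned status is `"Completed"` before deleting the workflow.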
Concept Analogies in Different Backends
To help visualize the concepts and their analogies across backends, the table below compares them. It will be updated as support for new backends is proposed:
| Concept\Framework | Couler | Argo | Tekton | Airflow | Dagster | Prefect |
|---|---|---|---|---|---|---|
| Step | Step | Step | Step | Task | Solid | Task |
| Composite step | Reusable step | Template | Task | SubDAG or TaskGroup | Composite Solid | TBA |
| Workflow | Workflow | Workflow | Pipeline | DAG | Pipeline | Flow |
Note that by "reusable step", we mean parameterized templates that can be used to define a Couler step where users only have to specify a few parameters.
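As an illustration of what such a parameterized template might look like, the sketch below defines a step once with a placeholder and renders it with different parameter values. The `ReusableStep` class and its `render` method are hypothetical and are not part of Couler; the placeholder syntax comes from Python's standard `string.Template`.

```python
from dataclasses import dataclass
from string import Template
from typing import Dict, List


@dataclass(frozen=True)
class ReusableStep:
    """Hypothetical parameterized template for a container step."""
    image: str
    command: List[str]  # entries may contain $placeholders

    def render(self, **params: str) -> Dict[str, object]:
        """Fill in the placeholders; a missing parameter raises KeyError."""
        return {
            "image": self.image,
            "command": [Template(part).substitute(params) for part in self.command],
        }


# Defined once, then reused with only the message changing.
echo = ReusableStep(image="alpine:3.6", command=["sh", "-c", 'echo "$message"'])
heads_spec = echo.render(message="it was heads")
tails_spec = echo.render(message="it was tails")
```

With a template like this, the `heads()` and `tails()` functions in the workflow example would differ only in the `message` parameter they supply.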