Couler API Design
This document outlines the design of core Couler APIs to support multiple workflow backends. Please see this RFC for the initial design discussions.
Goals
- Design the core Couler APIs that can be implemented across different workflow engines.
- Provide a minimal set of APIs that are forward-looking, engine-agnostic, and unlikely to be deprecated over time.
Non-Goals
- Provide an exhaustive set of APIs that cover all use cases and all workflow engines.
- Provide implementation details.
Design
Core operations (`couler.ops`):
- `run_step(step_def)` where `step_def` is the step definition, which can be a container spec, a Python function, or a spec specific to the underlying workflow engine (e.g. a k8s CRD if Argo Workflows is used). A "step" represents a node in the workflow graph, i.e. the smallest unit of work. For the analogues of a Couler "step" in different backends, see the table of Couler concepts in the last section.
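The three kinds of step definition listed above suggest that a backend has to tell them apart before it can build a step. A minimal sketch of that dispatch follows. The helper name `classify_step_def`, the returned labels, and the choice to represent a container spec as a list of command strings and an engine-specific spec as a dict are all assumptions made for illustration, not part of the Couler design.

```python
from typing import Any, Callable, Dict, List, Union

# Hypothetical union of the three step-definition kinds named in the design.
StepDef = Union[List[str], Callable[..., Any], Dict[str, Any]]


def classify_step_def(step_def: StepDef) -> str:
    """Return which kind of step definition was passed (hypothetical helper)."""
    if callable(step_def):
        # A plain Python function whose body would run inside the step.
        return "python_function"
    if isinstance(step_def, (list, tuple)) and all(
        isinstance(arg, str) for arg in step_def
    ):
        # A container command such as ["sh", "-c", "echo hi"].
        return "container_command"
    if isinstance(step_def, dict):
        # An engine-specific spec, e.g. a Kubernetes resource manifest.
        return "engine_specific_spec"
    raise TypeError(f"unsupported step definition: {type(step_def).__name__}")


def hello() -> None:
    print("hello")


print(classify_step_def(hello))
print(classify_step_def(["sh", "-c", "echo hi"]))
print(classify_step_def({"kind": "Workflow"}))
```

Keeping the dispatch in one place would let each backend decide how to translate every kind without changing the `run_step` signature.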
Control flow (`couler.control_flows`):
- `map(func, *args, **kwargs)` where `*args` and `**kwargs` are the arguments passed to the function `func`.
- `when(cond, if_op, else_op)` where `cond` can be any of the following predicates: `equal()`, `not_equal()`, `bigger()`, `smaller()`, `bigger_equal()`, `smaller_equal()`.
  - The operation defined in `if_op` is executed when `cond` is true. Otherwise, `else_op` is executed.
- `while_loop(cond, func, *args, **kwargs)` where `cond` can be any one of the predicates mentioned above. `func`, `*args`, and `**kwargs` have the same meaning as in `map()`.
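To make the relationship between the predicates and `when()` concrete, here is a small stand-alone sketch. It evaluates conditions eagerly in local Python, which is only an illustration: a real backend would more likely translate the condition into the engine's own conditional construct. The `Condition` class, its fields, and the optional `else_op` default are assumptions introduced for this example.

```python
import operator
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class Condition:
    """A deferred comparison between two values (hypothetical type)."""

    name: str
    left: Any
    right: Any
    compare: Callable[[Any, Any], bool]

    def evaluate(self) -> bool:
        return self.compare(self.left, self.right)


# The six predicates named in the design, each building a Condition.
def equal(left: Any, right: Any) -> Condition:
    return Condition("equal", left, right, operator.eq)


def not_equal(left: Any, right: Any) -> Condition:
    return Condition("not_equal", left, right, operator.ne)


def bigger(left: Any, right: Any) -> Condition:
    return Condition("bigger", left, right, operator.gt)


def smaller(left: Any, right: Any) -> Condition:
    return Condition("smaller", left, right, operator.lt)


def bigger_equal(left: Any, right: Any) -> Condition:
    return Condition("bigger_equal", left, right, operator.ge)


def smaller_equal(left: Any, right: Any) -> Condition:
    return Condition("smaller_equal", left, right, operator.le)


def when(
    cond: Condition,
    if_op: Callable[[], Any],
    else_op: Optional[Callable[[], Any]] = None,
) -> Any:
    """Run if_op when cond holds; otherwise run else_op if one was given."""
    if cond.evaluate():
        return if_op()
    return else_op() if else_op is not None else None


print(when(equal("heads", "heads"), lambda: "it was heads", lambda: "it was tails"))
print(when(bigger(1, 2), lambda: "bigger", lambda: "not bigger"))
```

Representing a predicate as data rather than as a boolean is what would allow a backend to inspect it and emit an engine-level condition instead of evaluating it at definition time.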
Utilities (`couler.utils`):
- `submit(config=workflow_config(schedule="* * * * 1"))` where `config` is engine-specific.
- `get_status(workflow_name)`
- `get_logs(workflow_name)`
- `delete_workflow(workflow_name)`
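The `schedule` value in the `submit()` call above looks like a standard five-field cron expression, although the design does not state this and the format is ultimately engine-specific. Under that assumption, the sketch below splits the expression into named fields. The helper `describe_cron` and the field names are hypothetical and used only for illustration.

```python
from typing import Dict

# Field order of a standard five-field cron expression.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expr: str) -> Dict[str, str]:
    """Map each field of a five-field cron expression to its name."""
    parts = expr.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(
            f"expected {len(CRON_FIELDS)} fields, got {len(parts)}: {expr!r}"
        )
    return dict(zip(CRON_FIELDS, parts))


fields = describe_cron("* * * * 1")
print(fields)
```

Read this way, `"* * * * 1"` leaves every field unrestricted except the day of week, which is set to `1` (Monday in standard cron), so the workflow would be triggered every minute on Mondays.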
Backends (`couler.backends`):
- `get_backend()`
- `use_backend("argo")`
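One simple way to back the two functions above is a module-level registry that records the active backend name. The sketch below is an assumption about how this could work, not a description of Couler's implementation. The set `_SUPPORTED_BACKENDS`, the `None` default, and the `ValueError` on unknown names are all hypothetical.

```python
from typing import Optional

# Hypothetical registry; "argo" is the only backend named in this document.
_SUPPORTED_BACKENDS = {"argo"}
_current_backend: Optional[str] = None


def use_backend(name: str) -> None:
    """Select the workflow engine used by subsequent calls."""
    global _current_backend
    if name not in _SUPPORTED_BACKENDS:
        raise ValueError(f"unknown backend: {name!r}")
    _current_backend = name


def get_backend() -> Optional[str]:
    """Return the active backend name, or None if none has been selected."""
    return _current_backend


# Mirror the guard used in the workflow example.
if get_backend() != "argo":
    use_backend("argo")
print(get_backend())
```

Rejecting unknown names at selection time surfaces configuration mistakes before any step is defined.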
Minimal Working Workflow Example
An example workflow defined via some of the APIs mentioned above is shown below:
    import random
    import time

    import couler

    # Select the Argo backend if it is not already active.
    if couler.get_backend() != "argo":
        couler.use_backend("argo")

    def random_code():
        result = "heads" if random.randint(0, 1) == 0 else "tails"
        print(result)

    def flip_coin():
        return couler.run_step(
            image="python:alpine3.6",
            step_def=random_code,
        )

    # The alpine image provides sh but not bash, so the commands use sh.
    def heads():
        return couler.run_step(
            image="alpine:3.6",
            step_def=["sh", "-c", 'echo "it was heads"'],
        )

    def tails():
        return couler.run_step(
            image="alpine:3.6",
            step_def=["sh", "-c", 'echo "it was tails"'],
        )

    result = flip_coin()
    couler.when(couler.equal(result, "heads"), lambda: heads())
    couler.when(couler.equal(result, "tails"), lambda: tails())

    # workflow_config is the engine-specific configuration helper shown under
    # Utilities; its import path is not specified in this design.
    name = couler.submit(config=workflow_config(schedule="* * * * 1"))

    # Poll until the workflow is no longer running, then clean up if it completed.
    while couler.get_status(name) == "Running":
        time.sleep(5)
    if couler.get_status(name) == "Completed":
        couler.delete_workflow(name)
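The status polling at the end of the example can be factored into a reusable helper with a timeout, so that a stuck workflow does not block the caller forever. The sketch below is self-contained and does not import Couler: `wait_for_workflow`, its parameters, and the `fake_get_status` stand-in are hypothetical, and the status strings `"Running"` and `"Completed"` are taken from the example.

```python
import time
from typing import Callable, Iterator


def wait_for_workflow(
    get_status: Callable[[str], str],
    name: str,
    poll_seconds: float = 5.0,
    timeout_seconds: float = 600.0,
) -> str:
    """Poll get_status(name) until it stops returning "Running"."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status(name)
        if status != "Running":
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"workflow {name!r} still running after timeout")
        time.sleep(poll_seconds)


# Stand-in for a real status call: reports "Running" twice, then "Completed".
_statuses: Iterator[str] = iter(["Running", "Running", "Completed"])


def fake_get_status(name: str) -> str:
    return next(_statuses)


final = wait_for_workflow(fake_get_status, "demo", poll_seconds=0.0)
print(final)
```

With a real backend, the caller would pass the engine's status function in place of `fake_get_status` and delete the workflow only when the returned status indicates completion.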
Concept Analogies in Different Backends
To help visualize the Couler concepts and their analogues in different backends, the table below compares them. It will be updated as support for new backends is proposed:
| Concept \ Framework | Couler | Argo | Tekton | Airflow | Dagster | Prefect |
|---|---|---|---|---|---|---|
| Step | Step | Step | Step | Task | Solid | Task |
| Composite step | Reusable step | Template | Task | SubDag or TaskGroup | Composite Solid | TBA |
| Workflow | Workflow | Workflow | Pipeline | DAG | Pipeline | Flow |
Note that by "reusable step", we mean parameterized templates that can be used to define a Couler step where users only have to specify a few parameters.