DSL Recursion
Old Version
This page is about Kubeflow Pipelines V1, please see the V2 documentation for the latest information.
Note that while the V2 backend is able to run pipelines submitted by the V1 SDK, we strongly recommend migrating to the V2 SDK.
For reference, the final release of the V1 SDK was kfp==1.8.22, and its reference documentation is available here.
This page describes how to write recursive functions in the domain specific language (DSL) provided by the Kubeflow Pipelines SDK.
Motivation
Recursion is supported by almost all programming languages as a succinct way to express complex semantics. In machine learning workflows, recursion is especially useful for features such as multiple rounds of training, iterative model analysis, and hyperparameter tuning. Recursion support also covers looping, because it lets the same block of code run repeatedly and exit based on a dynamic condition.
How to write a recursive function
Decorator
Decorate the recursive function with kfp.dsl.graph_component as illustrated below. The decorator does not require any arguments.
import kfp.dsl as dsl

@dsl.graph_component
def graph_component_a(input_x):
    with dsl.Condition(input_x == 'value_x'):
        op_a = task_factory_a(input_x)
        op_b = task_factory_b().after(op_a)
        graph_component_a(op_b.output)

@dsl.pipeline(
    name='pipeline',
    description='shows how to use the recursion.'
)
def pipeline():
    op_a = task_factory_a()
    op_b = task_factory_b()
    graph_op_a = graph_component_a(op_a.output)
    graph_op_a.after(op_b)
    task_factory_c(op_a.output).after(graph_op_a)
Function signature
Define the function signature as you would for a standard Python function. When the pipeline is compiled, the input parameters are PipelineParam objects, not concrete values.
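The following plain-Python sketch illustrates why a recursive function wraps its comparison in dsl.Condition instead of a regular `if` statement. It does not use the SDK: `Placeholder` and `Comparison` are hypothetical stand-ins written for this illustration, on the assumption that a compile-time parameter records a comparison rather than evaluating it.

```python
from collections import namedtuple

# Hypothetical stand-ins for illustration only; these are NOT SDK classes.
Comparison = namedtuple("Comparison", "operator left right")


class Placeholder:
    """Mimics a parameter whose value is unknown until the pipeline runs."""

    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        # Record the comparison instead of evaluating it to True or False.
        return Comparison("==", self, other)

    def __hash__(self):
        return hash(self.name)

    def __repr__(self):
        return f"Placeholder({self.name!r})"


input_x = Placeholder("input_x")
cond = input_x == "value_x"

print(type(cond).__name__)  # Comparison
print(cond.operator, cond.right)
```

Because the comparison yields a description of the test rather than a boolean, a Python `if` on it could not branch on the run-time value. Under this assumption, the condition has to be handed to a construct that evaluates it when the pipeline executes.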
Function body
As in a pipeline function body, you can instantiate components, create conditions, use the input parameters from the function signature, and explicitly specify dependencies among components. In the example above, one condition is created inside the recursive function, and two components, op_a and op_b, are created inside that condition.
Call the recursive function in the pipeline function
You can pass a pipeline or component output to the recursive function and specify dependencies explicitly with the after() method, as you would for a ContainerOp. In the example above, the output of op_a defined in the pipeline is passed to the recursive function, and the task_factory_c component is specified to depend on graph_op_a. The recursive function can also be explicitly specified to depend on ContainerOps. For example, graph_op_a depends on op_b in the pipeline.
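The dependencies described above can be modeled as a small graph to see which execution orders they allow. This is only an illustrative sketch using the standard library's graphlib module (Python 3.9+). The `dependencies` dictionary is a hand-written model of the example pipeline, not something the SDK produces or exposes.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each key maps to the set of steps it must wait for.
# Step names mirror the example pipeline; the mapping is a hand-written model.
dependencies = {
    "op_a": set(),
    "op_b": set(),
    # Consumes op_a's output and is ordered after op_b via after().
    "graph_op_a": {"op_a", "op_b"},
    # Consumes op_a's output and is ordered after graph_op_a via after().
    "task_factory_c": {"op_a", "graph_op_a"},
}

# Any valid order runs op_a and op_b first, then the recursion, then the last task.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

In this model, op_a and op_b have no dependency on each other, so either may come first. The recursive function always comes third and the task_factory_c step always comes last.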
More examples
Here is another example where the recursive function call is at the end of the function body, similar to do-while loops.
import kfp.dsl as dsl

@dsl.graph_component
def graph_component_a(input_x):
    op_a = task_factory_a(input_x)
    op_b = task_factory_b().after(op_a)
    with dsl.Condition(op_b.output == 'value_x'):
        graph_component_a(op_b.output)

@dsl.pipeline(
    name='pipeline',
    description='shows how to use the recursion.'
)
def pipeline():
    op_a = task_factory_a()
    op_b = task_factory_b()
    graph_op_a = graph_component_a(op_a.output)
    graph_op_a.after(op_b)
    task_factory_c(op_a.output).after(graph_op_a)
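To make the difference between the two examples concrete, here is a plain-Python analogy that does not use the SDK. The helper names (`check_first`, `body_first`, `always_done`) and the `max_rounds` safety bound are hypothetical and exist only for this sketch. The `next_value` callback stands in for whatever op_b outputs at run time.

```python
def check_first(value, next_value, target="value_x", max_rounds=5):
    """First example: the condition wraps the whole body, like a while loop."""
    rounds = 0
    while value == target and rounds < max_rounds:
        rounds += 1                # stands in for running op_a and op_b
        value = next_value(value)  # stands in for op_b.output
    return rounds


def body_first(value, next_value, target="value_x", max_rounds=5):
    """Second example: the body runs first, then the condition, like do-while."""
    rounds = 0
    while True:
        rounds += 1                # stands in for running op_a and op_b
        value = next_value(value)  # stands in for op_b.output
        if value != target or rounds >= max_rounds:
            break
    return rounds


def always_done(_):
    """Hypothetical step output that never matches the target."""
    return "done"


print(check_first("other", always_done))  # 0
print(body_first("other", always_done))   # 1
```

With an input that does not match the target, the condition-first form never runs its body, while the body-first form runs it exactly once before checking.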
Limitations
- Type checking does not work for recursive functions. In other words, type annotations on the recursive function signature are not checked.
- Since the output of the recursive functions cannot be dynamically resolved, the downstream ContainerOps cannot access the output from the recursive functions.
- As a known issue, recursion fails when the function body contains multiple recursive function calls.
Next steps
- See recursion sample