Pipelines¶
Note: This documentation is based on Kedro 0.15.4. If you spot anything that is incorrect, please create an issue or pull request.
In this section we introduce the concept of a pipeline.
Relevant API documentation: Pipeline
To benefit from Kedro’s automatic dependency resolution, nodes can be chained in a pipeline. A pipeline is a list of nodes that use a shared set of variables.
Building pipelines¶
In the following example, we construct a simple pipeline that computes the variance of a set of numbers. In practice, pipelines can use more complicated node definitions, and variables usually correspond to entire datasets:
from kedro.pipeline import Pipeline, node

def mean(xs, n):
    return sum(xs) / n

def mean_sos(xs, n):
    return sum(x * x for x in xs) / n

def variance(m, m2):
    return m2 - m * m

pipeline = Pipeline([
    node(len, 'xs', 'n'),
    node(mean, ['xs', 'n'], 'm', name='mean node'),
    node(mean_sos, ['xs', 'n'], 'm2', name='mean sos'),
    node(variance, ['m', 'm2'], 'v', name='variance node')
])
describe can be used to understand what nodes are part of the pipeline:
print(pipeline.describe())
Output:
#### Pipeline execution order ####
Name: None
Inputs: xs
len([xs]) -> [n]
mean node
mean sos
variance node
Outputs: v
##################################
Tagging pipeline nodes¶
You can also tag your Pipeline by providing a tags argument, which will tag all of the pipeline's nodes.
pipeline = Pipeline(
    [node(..., name="node1"), node(..., name="node2", tags="node_tag")], tags="pipeline_tag"
)
Node node1 will only be tagged with pipeline_tag, while node2 will have both node_tag and pipeline_tag.
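If you want to check which tags ended up on which node, you can inspect the nodes directly. The snippet below is an illustrative sketch that reuses the mean function from the variance example above and assumes Node exposes a tags attribute:
# Illustrative sketch: tag a small pipeline and inspect the tags on each node.
tagged_pipeline = Pipeline(
    [
        node(len, 'xs', 'n', name='node1'),
        node(mean, ['xs', 'n'], 'm', name='node2', tags='node_tag'),
    ],
    tags='pipeline_tag',
)
for tagged_node in tagged_pipeline.nodes:
    print(tagged_node.name, sorted(tagged_node.tags))
# node1 should carry only 'pipeline_tag', node2 both 'node_tag' and 'pipeline_tag'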
Merging pipelines¶
You can merge multiple pipelines as shown below. Note that, in this case, pipeline_de and pipeline_ds are expanded to a list of their underlying nodes, which are simply merged together:
pipeline_de = Pipeline([
    node(len, 'xs', 'n'),
    node(mean, ['xs', 'n'], 'm')
])

pipeline_ds = Pipeline([
    node(mean_sos, ['xs', 'n'], 'm2'),
    node(variance, ['m', 'm2'], 'v')
])

last_node = node(print, 'v', None)

pipeline_all = Pipeline([
    pipeline_de,
    pipeline_ds,
    last_node
])
print(pipeline_all.describe())
Output:
#### Pipeline execution order ####
Name: None
Inputs: xs
len([xs]) -> [n]
mean([n,xs]) -> [m]
mean_sos([n,xs]) -> [m2]
variance([m,m2]) -> [v]
print([v]) -> None
Outputs: None
##################################
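Pipelines can also be combined with the + operator, which the modular pipeline examples later in this section rely on. A minimal sketch of the same merge expressed that way (the standalone node is wrapped in a Pipeline first):
pipeline_all = pipeline_de + pipeline_ds + Pipeline([last_node])
print(pipeline_all.describe())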
Fetching pipeline nodes¶
Pipelines provide access to their nodes in a topological order to enable custom functionality, e.g. custom visualisation of pipelines. Each node has information about its inputs and outputs:
nodes = pipeline.nodes
nodes
Output:
Out[5]:
[Node(len, 'xs', 'n', None),
Node(mean, ['xs', 'n'], 'm', 'mean node'),
Node(mean_sos, ['xs', 'n'], 'm2', 'mean sos'),
Node(variance, ['m', 'm2'], 'v', 'variance node')]
nodes[0].inputs
Output:
Out[6]: ['xs']
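For example, here is a rough sketch of a custom text "visualisation" that walks the nodes in topological order and prints what each one consumes and produces. It assumes the symmetric outputs attribute alongside the inputs attribute shown above:
# Sketch: list each node's inputs and outputs in execution order.
for pipeline_node in pipeline.nodes:
    print(pipeline_node.inputs, "->", pipeline_node.outputs)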
Modular pipelines¶
As your Kedro project evolves and gets more sophisticated, you may find that a single ("master") pipeline no longer serves the purpose of discoverability due to its complexity. It may also be quite hard to port any reusable parts of the original pipeline into a different project. The solution is to split the pipeline into several logically isolated, reusable components, i.e. modular pipelines.
Modular pipelines serve the following main purposes:
- Discoverability: a modular pipeline represents a logically isolated unit of work that is much easier to develop, test and maintain
- Portability: the proposed internal structure of a modular pipeline makes it easy to copy the pipeline between projects
Modular pipeline structure¶
Note: Although Kedro does not enforce the structure below, we strongly encourage you to follow it when developing your modular pipelines. Future versions of Kedro may assume this structure, which would make your modular pipelines compatible with new Kedro features out of the box.
Here is the proposed structure of Kedro project pipelines:
- modular pipelines:
  - src/new_kedro_project/pipelines/data_engineering - Data Engineering pipeline
  - src/new_kedro_project/pipelines/data_science - Data Science pipeline
- master (or __default__) pipeline:
  - src/new_kedro_project/pipeline.py - combines the two modular pipelines from above
new-kedro-project
├── .ipython/
├── conf/
├── data/
├── docs/
├── logs/
├── notebooks/
├── references/
├── results/
├── src
│ ├── new_kedro_project
│ │ ├── pipelines
│ │ │ ├── data_engineering
│ │ │ │ ├── __init__.py
│ │ │ │ ├── nodes.py
│ │ │ │ ├── pipeline.py
│ │ │ │ ├── requirements.txt
│ │ │ │ └── README.md
│ │ │ ├── data_science
│ │ │ │ ├── __init__.py
│ │ │ │ ├── nodes.py
│ │ │ │ ├── pipeline.py
│ │ │ │ ├── requirements.txt
│ │ │ │ └── README.md
│ │ │ └── __init__.py
│ │ ├── __init__.py
│ │ ├── nodes.py
│ │ ├── pipeline.py
│ │ └── run.py
│ ├── tests
│ │ ├── __init__.py
│ │ └── test_run.py
│ ├── requirements.txt
│ └── setup.py
├── .kedro.yml
├── README.md
├── kedro_cli.py
└── setup.cfg
Requirements¶
Here is a list of requirements to make your pipeline modular:
- Each modular pipeline should be placed in a Python module src/<python_package>/pipelines/<pipeline_name>
- src/<python_package>/pipelines should be a Python package itself
- A modular pipeline must expose a function called create_pipeline at the top level of its package. Calling create_pipeline with no arguments should return an instance of a Pipeline:
import new_kedro_project.pipelines.data_engineering as de
de_pipeline = de.create_pipeline() # type: kedro.pipeline.Pipeline
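One common way to satisfy the last requirement is to implement create_pipeline in the modular pipeline's pipeline.py and re-export it from the package's __init__.py. This is an illustrative sketch of one possible layout, not something Kedro enforces:
# src/new_kedro_project/pipelines/data_engineering/__init__.py
# re-export create_pipeline so it is available at the top level of the package
from .pipeline import create_pipeline  # noqa: F401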
Best practice¶
This is a list of recommendations for developing modular pipelines. They are not enforced by Kedro at this stage, however this may change in future versions.
- A modular pipeline should include a README.md as the main documentation source for the end users, with all the information regarding the execution of the pipeline
- A modular pipeline may have external dependencies specified in requirements.txt. Those, however, are not currently installed by the kedro install command, so the users of your pipeline would have to run pip install -r src/<python_package>/pipelines/<pipeline_name>/requirements.txt
- To ensure portability, a modular pipeline should use relative imports when accessing its own objects and absolute imports otherwise. Example from src/<python_package>/pipelines/data_engineering/pipeline.py:
from external_package import add  # importing from external package

from kedro.pipeline import node, Pipeline

from .nodes import node1_func, node2_func  # importing its own node functions

def create_pipeline():
    node1 = node(func=node1_func, inputs='a', outputs='b')
    node2 = node(func=node2_func, inputs='c', outputs='d')
    node3 = node(func=add, inputs=['b', 'd'], outputs='sum')

    return Pipeline([node1, node2, node3])
- Modular pipelines should not depend on the main Python package (new_kedro_project in this example) as this would break portability to another project
- The master pipeline should import and instantiate modular pipelines as shown in this example from src/<python_package>/pipeline.py:
from typing import Dict

from kedro.pipeline import Pipeline

from new_kedro_project.pipelines import data_engineering as de, data_science as ds

def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
    data_engineering_pipeline = de.create_pipeline()
    data_science_pipeline = ds.create_pipeline()
    pipeline_all = data_engineering_pipeline + data_science_pipeline

    return {
        "de": data_engineering_pipeline,
        "__default__": pipeline_all
    }
Note: To find out how you can run a pipeline by its name, please see the Running a pipeline by name section below.
Configuration¶
Nested configuration in modular pipelines is not currently supported by Kedro. This means that putting config files (like catalog.yml) in src/<python_package>/pipelines/<pipeline_name>/conf will have no effect on the Kedro project configuration.
The recommended way to apply changes to the project catalog or any other project configuration is to document those changes in the README.md of your modular pipeline. For example, you may instruct the users to copy catalog.yml into their top-level configuration like this:
mkdir -p conf/base/pipelines/data_engineering # create a separate folder for the pipeline configs
cp -r src/<python_package>/pipelines/data_engineering/conf/* conf/base/pipelines/data_engineering # copy the pipeline configs
Datasets¶
It is important to keep in mind that Kedro resolves node execution order based on their input and output datasets. For example, if node 1 outputs the dataset A, and node 2 requires the dataset A as an input, node 1 is guaranteed to be executed before node 2 when Kedro runs the pipeline.
As a modular pipeline developer, you may not know how your pipeline will be integrated in the downstream projects and what data catalog configuration they may have. Therefore, it is crucial to make it clear in the pipeline documentation what datasets (names and types) are required as inputs by your modular pipeline and what datasets it produces as outputs.
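One convenient way to draft that part of the documentation is to ask the pipeline itself for the free inputs it expects and the final outputs it produces, using the inputs() and outputs() methods shown later in this section. A minimal sketch, assuming the data_engineering modular pipeline from the earlier example:
# Sketch: list the datasets a modular pipeline expects and produces,
# so they can be documented in its README.md.
import new_kedro_project.pipelines.data_engineering as de

de_pipeline = de.create_pipeline()
print("Required inputs:", de_pipeline.inputs())
print("Produced outputs:", de_pipeline.outputs())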
Connecting existing pipelines¶
When two existing pipelines need to work together, they should be connected by their datasets. However, the dataset names might differ, requiring manual fixes to be applied to the pipeline itself. An alternative solution is to transform an existing pipeline. Consider this example:
cook_pipeline = Pipeline([
    node(defrost, 'frozen_meat', 'meat'),
    node(grill, 'meat', 'grilled_meat'),
])

lunch_pipeline = Pipeline([
    node(eat, 'food', None),
])
A simple cook_pipeline + lunch_pipeline doesn't work, because the food input needs to be mapped to the grilled_meat output. Here is how it can be done; all three resulting pipelines do the job equally well:
final_pipeline1 = cook_pipeline.transform(datasets={"grilled_meat": "food"}) + lunch_pipeline
final_pipeline2 = cook_pipeline + lunch_pipeline.transform(datasets={"food": "grilled_meat"})
final_pipeline3 = cook_pipeline.transform(datasets={"grilled_meat": "new_name"}) + \
    lunch_pipeline.transform(datasets={"food": "new_name"})
Using a modular pipeline twice¶
Consider the example:
cook_pipeline = Pipeline([
    node(defrost, "frozen_meat", "meat", name="defrost_node"),
    node(grill, "meat", "grilled_meat"),
])

breakfast_pipeline = Pipeline([
    node(eat_breakfast, "breakfast_food", None),
])

lunch_pipeline = Pipeline([
    node(eat_lunch, "lunch_food", None),
])
Now we need to "defrost" two different types of food and feed them to different pipelines. We can't simply use the cook_pipeline twice, because the internal dataset names will conflict. We might try to call transform and rename all the datasets, but the conflicting, explicitly set name="defrost_node" would remain. The right solution is:
pipeline = (
    cook_pipeline.transform(
        datasets={"grilled_meat": "breakfast_food"}, prefix="breakfast"
    )
    + breakfast_pipeline
    + cook_pipeline.transform(
        datasets={"grilled_meat": "lunch_food"}, prefix="lunch"
    )
    + lunch_pipeline
)
prefix="lunch"
renames all datasets and nodes, prefixing them with "lunch."
,
except those datasets that we rename explicitly (grilled_meat
).
The resulting pipeline now has two separate nodes, breakfast.defrost_node
and
lunch.defrost_node
. Also two separate datasets breakfast.meat
and lunch.meat
connect the nodes inside the pipelines, causing no confusion between them.
Bad pipelines¶
As you may have noticed, pipelines can usually readily resolve their dependencies. In some cases, resolution is not possible and pipelines are not well-formed.
Pipeline with bad nodes¶
In this case we have a pipeline consisting of a single node with no input and output:
try:
    Pipeline([
        node(lambda: print('!'), None, None)
    ])
except Exception as e:
    print(e)
Output:
Invalid Node definition: it must have some `inputs` or `outputs`.
Format should be: node(function, inputs, outputs)
Pipeline with circular dependencies¶
For every two variables where the first depends on the second, there must not be a way in which the second also depends on the first; otherwise, a circular dependency will prevent us from compiling the pipeline.
The first node captures the relationship of how to calculate y from x and the second captures how to calculate x knowing y. Both cannot coexist in the same pipeline:
try:
    Pipeline([
        node(lambda x: x + 1, 'x', 'y', name='first node'),
        node(lambda y: y - 1, 'y', 'x', name='second node')
    ])
except Exception as e:
    print(e)
Output:
Circular dependencies exist among these items: ['first node: <lambda>([x]) -> [y]', 'second node: <lambda>([y]) -> [x]']
Running pipelines¶
When running pipelines, it can be useful to check the inputs and outputs of a pipeline:
pipeline.inputs()
Output:
Out[7]: {'xs'}
pipeline.outputs()
Output:
Out[8]: {'v'}
Runners¶
Runners are different execution mechanisms for running pipelines. They all inherit from AbstractRunner. You can use SequentialRunner to execute pipeline nodes one-by-one based on their dependencies.
We recommend using SequentialRunner in cases where:
- the pipeline has limited branching
- the pipeline is fast
- the resource-consuming steps require most of a scarce resource (e.g., significant RAM, disk memory or CPU)
- PySpark is being used
Now we can execute the pipeline by providing a runner and values for each of the inputs.
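As a rough illustration of the Python API (the usual route in a Kedro project is the kedro run command shown next), the variance pipeline from the beginning of this section could be run in memory along these lines; this mirrors the DataCatalog usage in the IO examples later in this section, where the second argument feeds in-memory values for the free inputs:
from kedro.io import DataCatalog
from kedro.runner import SequentialRunner

# feed the free input `xs` in memory and run the pipeline sequentially
catalog = DataCatalog({}, dict(xs=[1, 2, 3]))
SequentialRunner().run(pipeline, catalog)
# for this input the result is {'v': 0.666...}, matching the IO example below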
From the command line, you can run the pipeline as follows:
kedro run
Output:
2019-04-26 17:19:01,341 - root - INFO - ** Kedro project new-kedro-project
2019-04-26 17:19:01,360 - kedro.io.data_catalog - INFO - Loading data from `example_iris_data` (CSVLocalDataSet)...
2019-04-26 17:19:01,387 - kedro.io.data_catalog - INFO - Loading data from `parameters` (MemoryDataSet)...
2019-04-26 17:19:01,387 - kedro.pipeline.node - INFO - Running node: split_data([example_iris_data,params:example_test_data_ratio]) -> [example_test_x,example_test_y,example_train_x,example_train_y]
2019-04-26 17:19:01,437 - kedro.io.data_catalog - INFO - Saving data to `example_test_x` (MemoryDataSet)...
2019-04-26 17:19:01,439 - kedro.io.data_catalog - INFO - Saving data to `example_train_y` (MemoryDataSet)...
2019-04-26 17:19:01,443 - kedro.io.data_catalog - INFO - Saving data to `example_train_x` (MemoryDataSet)...
2019-04-26 17:19:01,447 - kedro.io.data_catalog - INFO - Saving data to `example_test_y` (MemoryDataSet)...
2019-04-26 17:19:01,447 - kedro.runner.sequential_runner - INFO - Completed 1 out of 4 tasks
2019-04-26 17:19:01,448 - kedro.io.data_catalog - INFO - Loading data from `example_train_x` (MemoryDataSet)...
2019-04-26 17:19:01,454 - kedro.io.data_catalog - INFO - Loading data from `example_train_y` (MemoryDataSet)...
2019-04-26 17:19:01,461 - kedro.io.data_catalog - INFO - Loading data from `parameters` (MemoryDataSet)...
2019-04-26 17:19:01,461 - kedro.pipeline.node - INFO - Running node: train_model([example_train_x,example_train_y,parameters]) -> [example_model]
2019-04-26 17:19:01,887 - kedro.io.data_catalog - INFO - Saving data to `example_model` (MemoryDataSet)...
2019-04-26 17:19:01,887 - kedro.runner.sequential_runner - INFO - Completed 2 out of 4 tasks
2019-04-26 17:19:01,888 - kedro.io.data_catalog - INFO - Loading data from `example_test_x` (MemoryDataSet)...
2019-04-26 17:19:01,888 - kedro.io.data_catalog - INFO - Loading data from `example_model` (MemoryDataSet)...
2019-04-26 17:19:01,888 - kedro.pipeline.node - INFO - Running node: predict([example_model,example_test_x]) -> [example_predictions]
2019-04-26 17:19:01,890 - kedro.io.data_catalog - INFO - Saving data to `example_predictions` (MemoryDataSet)...
2019-04-26 17:19:01,891 - kedro.runner.sequential_runner - INFO - Completed 3 out of 4 tasks
2019-04-26 17:19:01,891 - kedro.io.data_catalog - INFO - Loading data from `example_predictions` (MemoryDataSet)...
2019-04-26 17:19:01,891 - kedro.io.data_catalog - INFO - Loading data from `example_test_y` (MemoryDataSet)...
2019-04-26 17:19:01,891 - kedro.pipeline.node - INFO - Running node: report_accuracy([example_predictions,example_test_y]) -> None
2019-04-26 17:19:01,892 - new_kedro_project.nodes.example - INFO - Model accuracy on test set: 96.67%
2019-04-26 17:19:01,892 - kedro.runner.sequential_runner - INFO - Completed 4 out of 4 tasks
2019-04-26 17:19:01,892 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.
which will run the pipeline using SequentialRunner by default. You can also explicitly use SequentialRunner as follows:
kedro run --runner=SequentialRunner
In case you want to run the pipeline using ParallelRunner, add a flag as follows:
kedro run --parallel
or
kedro run --runner=ParallelRunner
Output:
2019-04-26 17:20:45,012 - root - INFO - ** Kedro project new-kedro-project
2019-04-26 17:20:45,081 - kedro.io.data_catalog - INFO - Loading data from `example_iris_data` (CSVLocalDataSet)...
2019-04-26 17:20:45,099 - kedro.io.data_catalog - INFO - Loading data from `parameters` (MemoryDataSet)...
2019-04-26 17:20:45,099 - kedro.pipeline.node - INFO - Running node: split_data([example_iris_data,params:example_test_data_ratio]) -> [example_test_x,example_test_y,example_train_x,example_train_y]
2019-04-26 17:20:45,115 - kedro.io.data_catalog - INFO - Saving data to `example_test_x` (AutoProxy[MemoryDataSet])...
2019-04-26 17:20:45,121 - kedro.io.data_catalog - INFO - Saving data to `example_test_y` (AutoProxy[MemoryDataSet])...
2019-04-26 17:20:45,123 - kedro.io.data_catalog - INFO - Saving data to `example_train_x` (AutoProxy[MemoryDataSet])...
2019-04-26 17:20:45,125 - kedro.io.data_catalog - INFO - Saving data to `example_train_y` (AutoProxy[MemoryDataSet])...
2019-04-26 17:20:45,135 - kedro.io.data_catalog - INFO - Loading data from `example_train_x` (AutoProxy[MemoryDataSet])...
2019-04-26 17:20:45,140 - kedro.io.data_catalog - INFO - Loading data from `example_train_y` (AutoProxy[MemoryDataSet])...
2019-04-26 17:20:45,142 - kedro.io.data_catalog - INFO - Loading data from `parameters` (MemoryDataSet)...
2019-04-26 17:20:45,142 - kedro.pipeline.node - INFO - Running node: train_model([example_train_x,example_train_y,parameters]) -> [example_model]
2019-04-26 17:20:45,437 - kedro.io.data_catalog - INFO - Saving data to `example_model` (AutoProxy[MemoryDataSet])...
2019-04-26 17:20:45,444 - kedro.io.data_catalog - INFO - Loading data from `example_test_x` (AutoProxy[MemoryDataSet])...
2019-04-26 17:20:45,449 - kedro.io.data_catalog - INFO - Loading data from `example_model` (AutoProxy[MemoryDataSet])...
2019-04-26 17:20:45,449 - kedro.pipeline.node - INFO - Running node: predict([example_model,example_test_x]) -> [example_predictions]
2019-04-26 17:20:45,451 - kedro.io.data_catalog - INFO - Saving data to `example_predictions` (AutoProxy[MemoryDataSet])...
2019-04-26 17:20:45,457 - kedro.io.data_catalog - INFO - Loading data from `example_predictions` (AutoProxy[MemoryDataSet])...
2019-04-26 17:20:45,461 - kedro.io.data_catalog - INFO - Loading data from `example_test_y` (AutoProxy[MemoryDataSet])...
2019-04-26 17:20:45,461 - kedro.pipeline.node - INFO - Running node: report_accuracy([example_predictions,example_test_y]) -> None
2019-04-26 17:20:45,466 - new_kedro_project.nodes.example - INFO - Model accuracy on test set: 100.00%
2019-04-26 17:20:45,494 - kedro.runner.parallel_runner - INFO - Pipeline execution completed successfully.
Note: You cannot use both --parallel and --runner flags at the same time (e.g. kedro run --parallel --runner=SequentialRunner raises an exception).
Running a pipeline by name¶
To run a pipeline by its name, you need to add your new pipeline to the create_pipelines() function in src/<python_package>/pipeline.py as below:
def create_pipelines(**kwargs):
    """Create the project's pipeline.

    Args:
        kwargs: Ignore any additional arguments added in the future.

    Returns:
        A mapping from a pipeline name to a ``Pipeline`` object.

    """
    data_engineering_pipeline = de.create_pipeline()
    data_science_pipeline = ds.create_pipeline()
    my_pipeline = Pipeline(
        [
            # your definition goes here
        ]
    )

    return {
        "de": data_engineering_pipeline,
        "my_pipeline": my_pipeline,
        "__default__": data_engineering_pipeline + data_science_pipeline,
    }
Then from the command line, execute the following:
kedro run --pipeline my_pipeline
Note: kedro run without the --pipeline option runs the __default__ pipeline from the dictionary returned by create_pipelines().
Applying decorators on pipelines¶
You can apply decorators to whole pipelines in the same way you apply decorators to single nodes. For example, if you want to apply the decorators defined in the earlier section to all pipeline nodes simultaneously, you can do so as follows:
hello_pipeline = Pipeline([
    node(say_hello, 'name1', None),
    node(say_hello, 'name2', None)
]).decorate(apply_g, apply_h)

SequentialRunner().run(hello_pipeline, DataCatalog({}, dict(name1="Kedro", name2="Python")))
Output:
Hello f(h(g(Kedro)))!
Hello f(h(g(Python)))!
Out[9]: {}
Kedro has a couple of built-in decorators, which can be useful for monitoring your pipeline. You can find the built-in decorators in kedro.pipeline.decorators:
- log_time will log the time taken for executing your node
- mem_profile will log the max memory usage of your node. mem_profile needs to perform at least 4 memory snapshots of your node, and it will take them every 100ms, therefore it can only be used for nodes which take more than half a second to run.
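For example, here is a minimal sketch of applying the built-in log_time decorator to the hello_pipeline above; decorate returns a new pipeline, so the result is assigned to a new variable:
from kedro.pipeline.decorators import log_time

# apply log_time to every node of the pipeline; execution time is logged per node
timed_pipeline = hello_pipeline.decorate(log_time)
SequentialRunner().run(timed_pipeline, DataCatalog({}, dict(name1="Kedro", name2="Python")))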
Running pipelines with IO¶
The above definition of pipelines only applies to non-stateful or "pure" pipelines that do not interact with the outside world. In practice, we would like to interact with APIs, databases, files and other sources of data. By combining IO and pipelines, we can tackle these more complex use cases.
By using DataCatalog from the IO module we are still able to write pure functions that work with our data and outsource file saving and loading to DataCatalog.
Through DataCatalog, we can control where inputs are loaded from, where intermediate variables get persisted and ultimately the location to which output variables are written:
from kedro.io import DataCatalog, MemoryDataSet

io = DataCatalog(dict(
    xs=MemoryDataSet()
))

io.list()
Output:
Out[10]: ['xs']
io.save('xs', [1, 2, 3])
SequentialRunner().run(pipeline, catalog=io)
Output:
Out[11]: {'v': 0.666666666666667}
In this simple example, we defined a MemoryDataSet called xs to store our inputs, saved our input list [1, 2, 3] into xs, then instantiated SequentialRunner and called its run method with the pipeline and data catalog instances.
Outputting to a file¶
We can also use IO to save outputs to a file. In this example, we define a custom LambdaDataSet that serialises the output to a local file:
import pickle

from kedro.io import LambdaDataSet

def save(value):
    with open("./data/07_model_output/variance.pickle", "wb") as f:
        pickle.dump(value, f)

def load():
    with open("./data/07_model_output/variance.pickle", "rb") as f:
        return pickle.load(f)

pickler = LambdaDataSet(load=load, save=save)
io.add('v', pickler)
It is important to make sure that the data catalog variable name v matches the name v in the pipeline definition.
Next we can confirm that this LambdaDataSet works:
io.save('v', 5)
io.load('v')
Output:
Out[12]: 5
Finally, let’s run the pipeline again now serialising the output:
SequentialRunner().run(pipeline, catalog=io)
Output:
Out[13]: {}
Because the output has been persisted to a local file we don’t see it directly, but it can be retrieved from the catalog:
io.load('v')
Output:
Out[14]: 0.666666666666667
import os

# clean up the file created above
try:
    os.remove("./data/07_model_output/variance.pickle")
except FileNotFoundError:
    pass
Partial pipelines¶
Sometimes it is desirable to work with only a subset of a pipeline’s nodes. Let’s look at the example pipeline we created earlier:
print(pipeline.describe())
Output:
#### Pipeline execution order ####
Name: None
Inputs: xs
len([xs]) -> [n]
mean node
mean sos
variance node
Outputs: v
##################################
Partial pipeline starting from inputs¶
One way to specify a partial pipeline is by providing a set of pre-calculated inputs which should serve as a start of the partial pipeline. For example, in order to fetch the partial pipeline running from input m2
downstream you can specify it like this:
print(pipeline.from_inputs('m2').describe())
Output:
#### Pipeline execution order ####
Name: None
Inputs: m, m2
variance node
Outputs: v
##################################
Specifying that the partial pipeline from inputs m and xs is needed will result in the following pipeline:
print(pipeline.from_inputs('m', 'xs').describe())
Output:
#### Pipeline execution order ####
Name: None
Inputs: xs
len([xs]) -> [n]
mean node
mean sos
variance node
Outputs: v
##################################
As can be seen from the pipeline description, adding m to the from_inputs list does not guarantee that it will not be recomputed if another provided input, such as xs, forces its recomputation.
Partial pipeline starting from nodes¶
Another way of selecting a partial pipeline is by specifying the nodes which should be used as a start of the new pipeline. For example you can do as follows:
print(pipeline.from_nodes('mean node').describe())
Output:
#### Pipeline execution order ####
Name: None
Inputs: m2, n, xs
mean node
variance node
Outputs: v
##################################
As you can see, this will create a partial pipeline starting from the specified node and continuing to all other nodes downstream.
You can run the resulting partial pipeline by running the following command in your terminal window:
kedro run --from-nodes="mean node"
Partial pipeline ending at nodes¶
Similarly, you can specify the nodes which should be used as an end of the new pipeline. For example, you can do as follows:
print(pipeline.to_nodes('mean node').describe())
Output:
#### Pipeline execution order ####
Name: None
Inputs: xs
len([xs]) -> [n]
mean node
Outputs: m
##################################
As you can see, this will create a partial pipeline starting at the top and ending with the specified node.
You can run the resulting partial pipeline by running the following command in your terminal window:
kedro run --to-nodes="mean node"
Furthermore, you can combine these two flags to specify a range of nodes to be included in the new pipeline. This would look like:
kedro run --from-nodes A --to-nodes Z
or, when specifying multiple nodes:
kedro run --from-nodes A,D --to-nodes X,Y,Z
Partial pipeline from nodes with tags¶
One can also create a partial pipeline from the nodes that have specific tags attached to them. In order to construct a partial pipeline out of nodes that have both tag t1
AND tag t2
, you can run the following:
print(pipeline.only_nodes_with_tags('t1', 't2').describe())
Output:
#### Pipeline execution order ####
Name: None
Inputs: None
Outputs: None
##################################
To construct a partial pipeline out of nodes that have tag t1 OR tag t2, please execute the following:
partial_pipeline = pipeline.only_nodes_with_tags('t1') + pipeline.only_nodes_with_tags('t2')
print(partial_pipeline.describe())
Output:
#### Pipeline execution order ####
Name: None
Inputs: None
Outputs: None
##################################
Running only some nodes¶
Sometimes you might need to run only some of the nodes in a pipeline. To do that, you can do as follows:
print(pipeline.only_nodes('mean node', 'mean sos').describe())
Output:
#### Pipeline execution order ####
Name: None
Inputs: n, xs
mean node
mean sos
Outputs: m, m2
##################################
This will create a partial pipeline, consisting solely of the nodes you specify as arguments in the method call.
You can check this out for yourself by updating the definition of the first node in the example code provided in pipeline.py as follows:
node(
    split_data,
    ["example_iris_data", "parameters"],
    dict(
        train_x="example_train_x",
        train_y="example_train_y",
        test_x="example_test_x",
        test_y="example_test_y",
    ),
    name="node1",
),
and then run the following command in your terminal window:
kedro run --node=node1
You may specify multiple names like so:
kedro run --node=node1 --node=node2
Note: The run will only succeed if all the inputs required by those nodes already exist, i.e. already produced or present in the data catalog.
Recreating missing outputs¶
Kedro supports the automatic generation of partial pipelines that take into account existing node outputs. This can be helpful to avoid re-running nodes that take a long time:
print(pipeline.describe())
Output:
#### Pipeline execution order ####
Name: None
Inputs: xs
len([xs]) -> [n]
mean node
mean sos
variance node
Outputs: v
##################################
To demonstrate this, let us save the intermediate output n using a JSONLocalDataSet.
from kedro.io import DataCatalog, JSONLocalDataSet, MemoryDataSet

n_json = JSONLocalDataSet(filepath="./data/07_model_output/len.json")
io = DataCatalog(dict(
    xs=MemoryDataSet([1, 2, 3]),
    n=n_json,
))
Because n was not saved previously, checking for its existence returns False:
io.exists('n')
Output:
Out[15]: False
Running the pipeline calculates n and saves the result to disk:
SequentialRunner().run(pipeline, io)
Output:
Out[16]: {'v': 0.666666666666667}
io.exists('n')
Output:
Out[17]: True
We can avoid re-calculating n (and all other results that have already been saved) by using the Runner.run_only_missing method. Note that the first node of the original pipeline (len([xs]) -> [n]) has been removed:
SequentialRunner().run_only_missing(pipeline, io)
Output:
Out[18]: {'v': 0.666666666666667}
# clean up the file created above
try:
    os.remove("./data/07_model_output/len.json")
except FileNotFoundError:
    pass