---
title: "Advanced: How pipelines can modify themselves at runtime"
output:
  rmarkdown::html_vignette:
    toc: true
    toc_depth: 4
description: >
  Shows how you can setup pipelines to modify themselves at runtime, which,
  for example, allows for changing pipeline parameters based on intermediate
  results or even dynamically modify the pipeline's own structure during
  a pipeline run.

vignette: >
  %\VignetteIndexEntry{Advanced: How pipelines can modify themselves at runtime}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r knitr-setup, include = FALSE}
require(pipeflow)

knitr::opts_chunk$set(
  comment = "#",
  prompt = FALSE,
  tidy = FALSE,
  cache = FALSE,
  collapse = TRUE
)

old <- options(width = 100L)
library(ggplot2)
```

While the `pipeflow` package mostly aims to be an easy and intuitive framework,
it does provide some very advanced features and interfaces that allow for
a very flexible manipulation of the pipeline objects. In this vignette we will
introduce some of these features using simple examples in order to give you
an idea of what is possible.

### Changing pipeline parameters at runtime

Let's first define a pipeline, which fits a linear model, checks it's
residuals for normality using the Shapiro-Wilk test, and plots the
residuals.

```{r}
library(pipeflow)
library(ggplot2)

pip <- pipe_new("my-pipeline") |>

    pipe_add(
        "fit",
        function(data = ~data, xVar = "x", yVar = "y")
        {
            lm(paste(yVar, "~", xVar), data = data)
        }
    ) |>

    pipe_add(
        "residual_shapiro_p_value",
        function(fit = ~fit)
        {
            residuals <- residuals(fit)
            p <- shapiro.test(residuals)$p.value
            p
        },
        keepOut = TRUE
    ) |>

    pipe_add(
        "plot",
        function(fit = ~fit, pointColor = "black")
        {
            data <- data.frame(
                fitted = predict(fit),
                residuals = residuals(fit)
            )

            ggplot(data, aes(x = fitted, y = residuals)) +
                geom_point(shape = 21, color = pointColor) +
                geom_hline(yintercept = 0, linetype = "dashed") +
                theme_minimal()
        },
        keepOut = TRUE
    )
```

So our pipeline looks like this
```{r}
pip
```

and we can run it like this

```{r, fig.alt = "residual-plot"}
pip$set_data(airquality)
pip$set_params(list(xVar = "Ozone", yVar = "Temp"))
pip$run()$collect_out()
```

Now let's imagine, we want to change the color of the points in the plot depending on the
Shapiro-Wilk test result. The obvious way to do this would be to change the `plot` step
by passing the test result to the `plot` step function and change the color there.

However, here we are interested in another way that would keep the `plot` function
unchanged. For example, we could run the pipeline a second time as follows:

```{r, fig.alt = "residual-plot"}
if (pip$get_out("residual_shapiro_p_value") < 0.05) {
    pip$set_params(list(pointColor = "red"))
    pip$run()$collect_out()
}
```

As was mentioned in another vignette, this solution is not ideal, as it requires to run
additional code outside the pipeline framework. To solve this issue, we
therefore basically have to set the parameter from within the pipeline during
execution. That is, we have to make the pipeline aware of itself, which
can be done by passing the pipeline object as a parameter.

Let's update the `residual_shapiro_p_value` step in the above example.
```{r}
pip$replace_step(
    "residual_shapiro_p_value",
    function(
        fit = ~fit,
        .self = NULL
    ) {
        residuals <- residuals(fit)
        p <- shapiro.test(residuals)$p.value

        if (!is.null(.self) && p < 0.05) {
            .self$set_params(list(pointColor = "red"))
        }

        p
    },
    keepOut = TRUE
)
```

Now we just have to make sure to set the `.self` parameter.

```{r, fig.alt = "residual-plot"}
pip$set_data(airquality)
pip$set_params(list(xVar = "Ozone", yVar = "Temp", .self = pip))
pip$run()$collect_out()
```

This simple "trick" opens up a wide range of possibilities for pipeline
modifications at runtime. As we will show in the next section, this is not limited
to changing parameters but can also be used to modify the pipeline structure itself.

### Changing the pipeline structure at runtime

Subsequently, the pipeline steps will be comprised only of very basic functions
in order to keep the examples simple. The focus here is on the pipeline structure
and how it can be modified at runtime.

```{r}
pip <- pipe_new("my-pipeline") |>

    pipe_add(
        "f1",
        function(x = ~data) {
            x + 1
        }
    ) |>

    pipe_add(
        "f2",
        function(x = ~f1) {
            x + 2
        }
    ) |>

    pipe_add(
        "f3",
        function(x = ~f2) {
            x + 3
        }
    )
```

This pipeline just adds 1, 2, and 3 to the input data, respectively.

```{r}
pip$set_data(1)$run()
pip
```

The `out` column in the table shows the output of each step. Now let's modify the
last step of the pipeline such that if the input is greater than 10, the pipeline
will replace the last step with a new step that now instead of `f2` references `f1`
and subtracts 3 from the input.

#### Modify a step

```{r}
pip <- pipe_new("my-pipeline") |>

    pipe_add(
        "f1",
        function(x = ~data) {
            x + 1
        }
    ) |>

    pipe_add(
        "f2",
        function(x = ~f1, .self = NULL)
        {
            if (x > 10 && !is.null(.self))
            {
                .self$replace_step(
                    "f3",
                    function(x = ~f1) {
                        x - 3
                    }
                )
            }
            x + 2
        }
    ) |>

    pipe_add(
        "f3",
        function(x = ~f2) {
            x + 3
        }
    )
```

If we run the pipeline as before, nothing will change.

```{r}
pip$set_params(list(.self = pip))
pip$set_data(1)$run()
pip
```

Now let's try it with an input of 10.

```{r}
pip$set_data(10)$run()
pip
```

We see that both the output of the pipeline and the dependencies of the last step
have changed.

#### Insert and remove steps

For our last example, instead of just replacing, we will go a bit further to
insert and remove steps. The pipeline definition is as follows:

```{r}
pip <- pipe_new(
        "my-pipeline"
    ) |>

    pipe_add(
        "f1",
        function(x = ~data) {
            x + 1
        }
    ) |>

    pipe_add(
        "f2",
        function(x = ~f1, .self = NULL)
        {
            if (x > 10 && !is.null(.self)) {
                .self$insert_after(
                    afterStep = "f1",
                    step = "f2a",
                    function(x = ~f1) {
                        x + 21
                    }
                )
                .self$insert_after(
                    afterStep = "f2a",
                    step = "f2b",
                    function(x = ~f2a) {
                        x + 22
                    }
                )
                .self$replace_step(
                    "f3",
                    function(x = ~f2b) {
                        x + 30
                    }
                )
                .self$remove_step("f2")
                return(.self)
            }
            x + 2
        }
    ) |>

    pipe_add(
        "f3",
        function(x = ~f2) {
            x + 3
        }
    )

```

Basically, if the input is greater than 10, we will insert two new steps
after `f1`, remove `f2`, and replace `f3` with a new step that adds 30 to the
input.

Also note that we return the pipeline object in this case.
This is important, because the pipeline's `run` function has an argument
`recursive`, which by default is set to `TRUE` and means that if a step
returns a pipeline, the run of the current pipeline is aborted and the
returned pipeline is re-run.

Let's see the pipeline structure before running it.

```{r}
pip
```

And now let's run it with an input of 10.

```{r}
pip$set_params(list(.self = pip))
pip$set_data(10)$run()
```

The log output shows the abort and re-run of the pipeline. Let's see the final structure
and step outputs.

```{r}
pip
```

The final structure is as expected with the new steps inserted and the old step removed.
As mentioned before, this is just a simple example to show the possibilities. I leave it
to the user to come up with more sensible and complex use cases.

```{r, include = FALSE}
options(old)
```