---
title: "Callback Hooks"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Callback Hooks}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---


## Setup

```r
library(jobqueue)
q <- Queue$new(workers = 1)
```


## Introduction

Callbacks are the cornerstone of asynchronous programming.

If you want to calculate `2 + 2` and show the results, the synchronous programming approach would be:

```r
message('Result = ', 2 + 2)
#> Result = 4
```

In asynchronous programming, this task is broken apart into two discrete steps: *computation* and *result handling*. Using jobqueue, this looks like:

```r
job <- q$run({ 2 + 2 })
job$on('done', ~message('Result = ', .$result))
#> Result = 4
```

The asynchronous format allows parts of your code to run independently, in this case on separate R processes. When a part finishes, the callback will be executed to allow you to work with the result.

Callbacks are not limited to a Job finishing. See the "Triggers" section for a list of all events that can trigger a callback.

> **Important**
> 
> Hooks are evaluated on the main process, not on the background processes. Keep callback
> functions fast so they do not delay Job handling.



## A Callback Function

The callback function should accept one argument: the object that triggered the callback. Functions accepting zero or multiple arguments are also allowed.

R's `\(x)` shorthand for anonymous functions (R >= 4.1.0) works well here.

jobqueue also accepts formula-style lambdas such as `~message(.$state)` (see `rlang::as_function()`).

```r
# Queue Hooks
hook <- function (queue) { message('Queue is ', queue$state) }
hook <- \(q) message('Queue is ', q$state)
hook <- ~message('Queue is ', .$state)

# Worker Hooks
hook <- function (worker) { message('Worker is ', worker$state) }
hook <- \(w) message('Worker is ', w$state)
hook <- ~message('Worker is ', .$state)

# Job Hooks
hook <- function (job) { message('Job is ', job$state) }
hook <- \(j) message('Job is ', j$state)
hook <- ~message('Job is ', .$state)
```


## Triggers

Queue, Worker, and Job objects update their `$state` as described in the tables below. Each time the state changes, any callbacks registered to the new state are executed. In addition, you can register `state = '*'` or `state = '.next'`, which trigger on a state change regardless of the new state's name.


### Special Triggers

| State          | Triggers                                                |
|----------------|---------------------------------------------------------|
| `'*'`          | Every time the state changes.                           |
| `'.next'`      | Only one time, the next time the state changes.         |
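
To make the difference between named, `'*'`, and `'.next'` triggers concrete, here is a toy state machine in base R. It is only a sketch: `new_emitter()` and `set_state()` are hypothetical names, and running callbacks in registration order is an assumption rather than a description of jobqueue's internals.

```r
# Toy state machine, NOT jobqueue's implementation.
new_emitter <- function () {
  hooks <- list()   # each entry: list(state = <chr>, func = <function>)
  self  <- new.env()
  self$state <- 'created'

  # Register a callback for a state name, '*', or '.next'.
  self$on <- function (state, func) {
    hooks[[length(hooks) + 1]] <<- list(state = state, func = func)
    invisible(self)
  }

  # Change state, then run every matching callback.
  self$set_state <- function (new_state) {
    self$state <- new_state
    current <- hooks
    # '.next' hooks are one-shot: drop them before running callbacks.
    hooks <<- Filter(function (h) h$state != '.next', hooks)
    for (h in current)
      if (h$state %in% c(new_state, '*', '.next'))
        h$func(self)
    invisible(self)
  }

  self
}

events <- character()
obj    <- new_emitter()
obj$on('done',  function (x) events <<- c(events, paste('done hook:',  x$state)))
obj$on('*',     function (x) events <<- c(events, paste('* hook:',     x$state)))
obj$on('.next', function (x) events <<- c(events, paste('.next hook:', x$state)))

obj$set_state('running')
obj$set_state('done')

writeLines(events)
#> * hook: running
#> .next hook: running
#> done hook: done
#> * hook: done
```

The `'.next'` callback fires once, on the first change, while the `'*'` callback fires on both changes.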


### Queue States

| State          | Triggers                                                |
|----------------|---------------------------------------------------------|
| `'starting'`   | After initialization, before Workers are started.       |
| `'idle'`       | When all Workers are idle. Also, after initial startup. |
| `'busy'`       | At least one worker is busy.                            |
| `'stopped'`    | After `<Queue>$stop()` is called.                       |


### Worker States

| State          | Triggers                                                |
|----------------|---------------------------------------------------------|
| `'starting'`   | Background process is being configured.                 |
| `'idle'`       | Waiting on Jobs to be submitted.                        |
| `'busy'`       | After a Job starts running.                             |
| `'stopped'`    | After `<Worker>$stop()` is called.                      |


### Job States

| State          | Triggers                                                |
|----------------|---------------------------------------------------------|
| `'created'`    | After `Job$new()` initialization.                       |
| `'submitted'`  | After `<Job>$queue` is assigned.                        |
| `'queued'`     | After `stop_id` and `copy_id` are resolved.             |
| `'starting'`   | Before evaluation begins.                               |
| `'running'`    | After `<Job>$worker` is set and evaluation begins.      |
| `'done'`       | After `<Job>$output` is assigned.                       |



## Attaching

Callbacks can be attached to Queue, Worker, or Job objects.

You can add callbacks either when you create the object with `$new()`, or later with `$on()`.

```r
hook <- ~message(.$uid, ' is ', .$state)

q <- Queue$new(hooks  = list(q_idle = hook))
w <- Worker$new(hooks = list(idle   = hook))
j <- Job$new(hooks    = list(done   = hook))

q$on('busy',     hook)
w$on('busy',     hook)
j$on('starting', hook)
```

In `Queue$new()`, `hooks` can set hooks for the Queue, Worker, and Job objects. The rules are:

* Prefixing with `q_`, `w_`, or `j_` attaches the hook to the Queue, Workers, or Jobs, respectively.
* Non-prefixed hooks are attached to Jobs.
* Alternatively, a list of lists can be assigned to hooks, of the format:
  
```r
Queue$new(
  'hooks' = list(
    'queue'  = list(idle = hook), 
    'worker' = list(idle = hook), 
    'job'    = list(done = hook) ))
```
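
The prefix rules above can be sketched as a small base R helper. `split_hooks()` is a hypothetical function written for illustration, not part of jobqueue, and it handles only the flat, prefixed format.

```r
# Hypothetical helper mirroring the prefix rules: 'q_' -> queue,
# 'w_' -> worker, and 'j_' or no prefix -> job.
split_hooks <- function (hooks) {
  out <- list(queue = list(), worker = list(), job = list())
  for (name in names(hooks)) {
    target <- switch(
      substr(name, 1, 2),
      'q_' = 'queue',
      'w_' = 'worker',
      'job' )                            # default: 'j_' prefix or no prefix
    state <- sub('^[qwj]_', '', name)    # strip the prefix, if any
    out[[target]][[state]] <- hooks[[name]]
  }
  out
}

hook <- function (x) NULL
res  <- split_hooks(list(q_idle = hook, w_busy = hook, j_done = hook, queued = hook))

names(res$queue)
#> [1] "idle"
names(res$worker)
#> [1] "busy"
names(res$job)
#> [1] "done"   "queued"
```

Note that the unprefixed `queued` state lands in the Job hooks even though it begins with `q`, because only the two-character prefix `q_` is treated as a Queue marker.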



## Removing

Callbacks attached with `$new()` cannot be removed.

When you attach a callback with `$on()`, the return value is a function, which, when called, will remove that callback from the object.

```r
job <- Job$new(
  'expr'  = { 3.14 }, 
  'hooks' = list(done = ~message('ABC')) )

off <- job$on('done', ~message('XYZ'))
off()

q$submit(job)
#> ABC
```
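
Returning a removal function from `$on()` is a common closure pattern. The base R sketch below shows one way it could work; `new_registry()` is a hypothetical stand-in and makes no claim about how jobqueue stores its hooks.

```r
# Toy hook registry, NOT jobqueue's implementation.
new_registry <- function () {
  hooks   <- list()
  next_id <- 0

  on <- function (state, func) {
    next_id <<- next_id + 1
    id <- as.character(next_id)
    hooks[[id]] <<- list(state = state, func = func)
    # The returned closure remembers `id`, so it removes only this hook.
    invisible(function () { hooks[[id]] <<- NULL; invisible(NULL) })
  }

  run <- function (state) {
    for (h in hooks) if (h$state == state) h$func()
    invisible(NULL)
  }

  list(on = on, run = run, count = function () length(hooks))
}

reg <- new_registry()
reg$on('done', function () message('ABC'))
off <- reg$on('done', function () message('XYZ'))
off()

reg$run('done')
#> ABC
```

Because each closure captures its own `id`, calling `off()` a second time is harmless and other hooks are unaffected.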


## Default Job Hooks

When you create a Queue (with `Queue$new()`), you can define a set of callbacks to automatically apply to any Jobs that are created with the `<Queue>$run()` command.

```r
n <- 0
q <- Queue$new(hooks = list(created = ~{ n <<- n + 1 } ))

for (i in 1:5) q$run({ 'Hi' })

n
#> [1] 5
```

How does `Queue$new()` know to apply the `hooks` to Jobs instead of Queues or Workers? Unless otherwise indicated, `Queue$new()` `hooks` are assumed to be for Jobs. You can also explicitly specify that `hooks` are for Jobs by using the formats described in the "Attaching" section above.

```r
q <- Queue$new(hooks = list(j_created = ~{ n <<- n + 1 } ))
# or
q <- Queue$new(hooks = list(job = list(created = ~{ n <<- n + 1 } )))
```

If you set `hooks` in `<Queue>$run()`, those hooks will REPLACE the Job hooks from `Queue$new()`.

```r
n <- 0
q <- Queue$new(hooks = list(created = ~{ n <<- n + 1 } ))

for (i in 1:3) q$run({ 'Hi' }, hooks = list(done = ~message(.$result)))
#> Hi
#> Hi
#> Hi

n
#> [1] 0
```
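
If you want a Job to keep the default hooks and gain extra ones, one option is to build the combined list yourself before passing it on. The sketch below uses only base R; `default_hooks` and `extra_hooks` are illustrative names, and it assumes that `hooks` accepts any named list of functions, as in the examples above.

```r
default_hooks <- list(created = function (job) message('created'))
extra_hooks   <- list(done    = function (job) message('done'))

# modifyList() keeps entries from the first list and adds or overrides
# entries by name from the second.
merged <- modifyList(default_hooks, extra_hooks)

names(merged)
#> [1] "created" "done"
```

The `merged` list could then be supplied as `hooks = merged` in `<Queue>$run()`.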


## Use Case: Priority Setting

Below, we'll set up a callback function that triggers when each Job enters the `'queued'` state. It will modify the Job, adding a custom `<Job>$priority` field to the Job object. Then it will modify the Queue's internal list of jobs (`<Queue>$jobs`), sorting them according to each Job's `<Job>$priority`. Last, it will attach additional callbacks to the Job to output timing information upon exit from the `'queued'` state and upon entry into the `'done'` state.

```r
library(glue)
library(jobqueue)

# Our callback/hook function.
prioritize <- function (job) {

  queue      <- job$queue
  queue_jobs <- job$queue$jobs

  # Apply a random priority to this job.
  job$priority <- round(runif(1) * 10) - 5
  
  # Sort all this Queue's jobs by priority (including this job).
  priorities     <- sapply(queue_jobs, `[[`, 'priority')
  job$queue$jobs <- queue_jobs[order(priorities)]
  
  # Add hooks to this job to report queued/total times.
  t1    <- Sys.time()
  tdiff <- function () format(round(Sys.time() - t1, 1))
  
  job$on('.next', ~message(glue(
    'Job {.$uid} (priority {.$priority}) was {.$state} after {tdiff()}' )))
    
  job$on('done', ~message(glue(
    'Job {.$uid} (priority {.$priority}) finished in {tdiff()}' )))
}

# A single worker best illustrates processing order.
q <- Queue$new(
  'workers' = 1, 
  'hooks'   = list(queued = prioritize) )

for (i in 1:5) {
  job <- q$run({ 3.14 })
  message(glue_data(job, 'Created Job {uid} with priority {priority}'))
}
#> Job J11 (priority -3) was dispatched after 0.1 secs
#> Created Job J11 with priority -3
#> Created Job J12 with priority -2
#> Created Job J13 with priority 1
#> Created Job J14 with priority 0
#> Created Job J15 with priority -1
#> Job J11 (priority -3) finished in 0.7 secs
#> Job J12 (priority -2) was dispatched after 0.6 secs
#> Job J12 (priority -2) finished in 1.1 secs
#> Job J15 (priority -1) was dispatched after 0.7 secs
#> Job J15 (priority -1) finished in 1.3 secs
#> Job J14 (priority 0) was dispatched after 1.4 secs
#> Job J14 (priority 0) finished in 2 secs
#> Job J13 (priority 1) was dispatched after 2.1 secs
#> Job J13 (priority 1) finished in 2.6 secs
```

In an actual application, you could set `<Job>$priority` based on `<Job>$vars`.

```r
prioritize <- function (job) {
  
  # Give priority to lower number of replications
  job$priority <- job$vars$replications
  
  queue_jobs     <- job$queue$jobs
  priorities     <- sapply(queue_jobs, `[[`, 'priority')
  job$queue$jobs <- queue_jobs[order(priorities)]
}
q <- Queue$new(hooks = list(queued = prioritize))

#                                        vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
for (reps in 5:1) job <- q$run({ 3.14 }, vars = list(replications = reps))
```

Or, define the custom `<Job>$priority` field in `<Queue>$run()`.

```r
prioritize <- function (job) {
  queue_jobs     <- job$queue$jobs
  priorities     <- sapply(queue_jobs, `[[`, 'priority')
  job$queue$jobs <- queue_jobs[order(priorities)]
}
q <- Queue$new(hooks = list(queued = prioritize))

#                                        vvvvvvvvvvvvvvv
for (reps in 5:1) job <- q$run({ 3.14 }, priority = reps)
```

Or, set `<Job>$priority` in a hook that triggers before `prioritize()` is triggered.

```r
set_priority <- function (job) {
  job$priority <- job$vars$replications
}

prioritize <- function (job) {
  queue_jobs     <- job$queue$jobs
  priorities     <- sapply(queue_jobs, `[[`, 'priority')
  job$queue$jobs <- queue_jobs[order(priorities)]
}

#                           vvvvvvvvvvvvvvvvvvvvvv
q <- Queue$new(hooks = list(created = set_priority, queued = prioritize))
for (reps in 5:1) job <- q$run({ 3.14 }, vars = list(replications = reps))
```
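
The sorting step shared by the `prioritize()` variants above can be tried on its own. In this sketch, plain lists stand in for Job objects (an assumption made for illustration), and `vapply()` replaces `sapply()` so that a missing priority is reported immediately.

```r
# Plain lists stand in for Job objects here.
jobs <- list(
  list(uid = 'J1', priority =  1),
  list(uid = 'J2', priority = -3),
  list(uid = 'J3', priority =  1),
  list(uid = 'J4', priority =  0) )

# vapply() stops with a clear error if any job lacks a single numeric
# priority. sapply() would return a list instead, and order() would
# then fail with a less helpful message.
priorities <- vapply(jobs, function (j) j$priority, numeric(1))
sorted     <- jobs[order(priorities)]

vapply(sorted, `[[`, character(1), 'uid')
#> [1] "J2" "J4" "J1" "J3"
```

`order()` leaves ties in their original order, so Jobs with equal priority (here `J1` and `J3`) still run in submission order.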


## Use Case: Rate Limiting

Say you're hosting a web service, where users are allowed to submit one Job every 30 seconds. Jobs can take more than 30 seconds, so the solution is more complex than setting `stop_id = user_id`. And what if you want to stop the new Job instead of the old one?

```r
rate_limit <- function (job) {

  job$t_start <- Sys.time()

  for (j in job$queue$jobs)
    if (j$user_id == job$user_id)
      if (difftime(job$t_start, j$t_start, units = 'secs') < 30)
        job$stop('Rate Limit Exceeded')
}

q <- Queue$new(hooks = list(submitted = rate_limit))

j_A1 <- q$run({ 42 }, user_id = 'A')
j_B1 <- q$run({ 42 }, user_id = 'B')
j_B2 <- q$run({ 42 }, user_id = 'B')

j_B1$result
#> [1] 42

j_B2$result
#> <interrupt: Rate Limit Exceeded>
```

Note that the above code won't completely solve the rate limiting task. If a user's Job only takes five seconds to complete, then they could submit a Job every six seconds and the Queue would be none the wiser.

To make the Queue aware of previously completed Jobs, store each user's most recent Job start time outside the Job objects, as in the solution below.

```r
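# Most recent accepted start time for each user, keyed by user_id.
t_user <- list()

rate_limit <- function (job) {
  t_start <- Sys.time()
  t_prev  <- t_user[[job$user_id]]   # NULL on a user's first submission

  # Compare in seconds explicitly; plain subtraction picks its own units.
  if (!is.null(t_prev) && difftime(t_start, t_prev, units = 'secs') < 30) {
    job$stop('Rate Limit Exceeded')
  } else {
    t_user[[job$user_id]] <<- t_start
  }
}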

q <- Queue$new(hooks = list(created = rate_limit))
```
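
The timing check can be exercised without starting a Queue. In the sketch below, `make_rate_limiter()` and `mock_job()` are hypothetical stand-ins, and the `now` argument exists only so the test can supply fixed timestamps. It also guards against a common pitfall: subtracting two `POSIXct` values yields a `difftime` whose units are chosen automatically, so a 90 second gap is stored as `1.5` (minutes) and would compare as less than `30`.

```r
# Hypothetical helpers for testing the rate-limit logic in isolation.
make_rate_limiter <- function (min_gap_secs = 30) {
  t_user <- list()   # last accepted start time per user
  function (job, now = Sys.time()) {
    t_prev   <- t_user[[job$user_id]]
    too_soon <- !is.null(t_prev) &&
      difftime(now, t_prev, units = 'secs') < min_gap_secs
    if (too_soon) job$stop('Rate Limit Exceeded')
    else          t_user[[job$user_id]] <<- now
    invisible(!too_soon)
  }
}

# Minimal mock with the two members the hook uses: user_id and stop().
mock_job <- function (user_id) {
  job <- new.env()
  job$user_id <- user_id
  job$stopped <- FALSE
  job$stop    <- function (reason) job$stopped <- TRUE
  job
}

rate_limit <- make_rate_limiter(30)
t0 <- as.POSIXct('2024-01-01 12:00:00', tz = 'UTC')

a1 <- mock_job('A'); rate_limit(a1, now = t0)        # first Job: accepted
a2 <- mock_job('A'); rate_limit(a2, now = t0 + 6)    # 6 s later: stopped
a3 <- mock_job('A'); rate_limit(a3, now = t0 + 90)   # 90 s later: accepted
b1 <- mock_job('B'); rate_limit(b1, now = t0 + 7)    # other user: accepted

c(a1$stopped, a2$stopped, a3$stopped, b1$stopped)
#> [1] FALSE  TRUE FALSE FALSE
```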

In the first example, we attached the hook to `'submitted'` because that is when `<Job>$queue` becomes available in callbacks. In the second example, we attached it to `'created'` instead because that solution does not need `<Job>$queue`. Check the state order listed in the "Triggers" section above, and attach callbacks as early as possible to expedite Job handling.