---
title: "Technical Documentation"
output:
  rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Technical Documentation}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{css echo=FALSE}
img {
    border: 0px !important;
    margin: 2em 2em 2em 2em !important;
}
code {
    border: 0px !important;
}
```

```{r echo=FALSE, results="hide"}
knitr::opts_chunk$set(
    cache = FALSE,
    echo = TRUE,
    collapse = TRUE,
    comment = "#>"
)
options(clustermq.scheduler = "local")
suppressPackageStartupMessages(library(clustermq))
```

## Worker API

### Base API and schedulers

The main worker functions are wrapped in an _R6_ class named `QSys`.
This provides a standardized API to the [lower-level
messages](https://mschubert.github.io/clustermq/articles/technicaldocs.html#zeromq-message-specification)
that are sent via [_ZeroMQ_](https://zeromq.org/).

The base class itself is extended by scheduler classes that add the required
functions for submitting and cleaning up jobs:

```
+ QSys
  |- Multicore
  |- LSF
  + SGE
    |- PBS
    |- Torque
  |- etc.
```
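
The derivation pattern itself looks roughly like the following _R6_ sketch
(class and method names here are illustrative, not _clustermq_'s actual
internals):

```{r eval=FALSE}
library(R6)

# illustrative base class: the real QSys defines the common messaging API
SchedulerBase = R6Class("SchedulerBase",
    public = list(
        submit_jobs = function(n) stop("derived scheduler must implement this")
    )
)

# illustrative derived class: adds the scheduler-specific job submission
MulticoreDemo = R6Class("MulticoreDemo",
    inherit = SchedulerBase,
    public = list(
        submit_jobs = function(n) message("forking ", n, " local processes")
    )
)

MulticoreDemo$new()$submit_jobs(3)
```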

The user-visible object is a worker `Pool` that wraps this class, and will
eventually allow managing different types of workers.

### Workers

#### Creating a worker pool

A pool of workers can be created using the `workers()` function, which
instantiates a `Pool` object of the corresponding `QSys`-derived scheduler
class. See `?workers` for details.

```{r eval=FALSE}
# start up a pool of three workers using the default scheduler
w = workers(n_jobs=3)

# if we make an unclean exit for whatever reason, clean up the jobs
on.exit(w$cleanup())
```
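
Which scheduler class is used is controlled by the `clustermq.scheduler`
option (set to `"local"` in this vignette's setup), for example:

```{r eval=FALSE}
# select the scheduler backend before creating the pool
options(clustermq.scheduler = "multicore")
w = workers(n_jobs=3)
```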

#### Worker startup

For workers that are started up via a scheduler, we do not know in advance
which machine they will run on. This is why every worker is started with the
TCP/IP address of the master socket that will distribute the work.

This is achieved by a call to R that is common to all schedulers:

```{sh eval=FALSE}
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
```
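
Here, `{{ master }}` is a template placeholder that is filled in with the
address of the main process, so the call a worker actually executes might look
like the following (host and port are illustrative):

```{r eval=FALSE}
# connect back to the main process listening at this TCP/IP address
clustermq:::worker("tcp://login-node.example.org:6124")
```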

#### Worker communication

On the master's side, we wait until a worker connects:

```{r eval=FALSE}
msg = w$recv() # this will block until a worker is ready
```

We can then send any expression to be evaluated on the worker using the `send`
method:

```{r eval=FALSE}
w$send(expression, ...)
```

After the expression, any variables that it references can be passed along
with the call in `...`. For the batch processing that `clustermq` usually
performs, the expression is a call to `work_chunk`, with the `chunk` data
added as a variable:

```{r eval=FALSE}
w$send(clustermq:::work_chunk(chunk, fun, const, rettype, common_seed),
       chunk = chunk(iter, submit_index))
```

#### Worker environment

We can add any number of objects to a worker environment using the `env`
method:

```{r eval=FALSE}
w$env(object=value, ...)
```

This call also invisibly returns a `data.frame` listing all objects currently
in the environment. If a user wants to inspect the environment without
changing it, they can call `w$env()` without arguments. The environment will
be propagated to all workers automatically in a greedy fashion.
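
For example, to set a (hypothetical) variable `n_perm` on all workers and then
list what is currently set:

```{r eval=FALSE}
w$env(n_perm=100) # add an object to the worker environment
w$env()           # inspect the environment without changing it
```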

### Main event loop

Putting the above together in an event loop, we get what is essentially
implemented in `master`. `w$send` invisibly returns an identifier that tracks
which call was submitted, and `w$current()` matches that identifier to the
reply received via `w$recv()`.

```{r eval=FALSE}
w = workers(3)
on.exit(w$cleanup())
w$env(...)

while (work_remaining || jobs_pending) { # pseudocode conditions
    res = w$recv() # the result of the call, or NULL for a new worker
    w$current()$call_ref # matches answer to request, -1 otherwise
    # handle result

    if (work_remaining)
        call_ref = w$send(expression, ...) # call_ref tracks request identity
    else
        w$send_shutdown()
}
```
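
As a (heavily simplified) concrete sketch of this structure, the following
loop squares the numbers 1 to 3 and collects the results by their `call_ref`
(illustration only, with no error handling):

```{r eval=FALSE}
w = workers(n_jobs=2)
on.exit(w$cleanup())

todo = as.list(1:3) # arguments for the calls we want evaluated
results = list()

while (length(todo) > 0 || length(results) < 3) {
    res = w$recv() # NULL if this is a newly connected worker
    ref = w$current()$call_ref # -1 if no request matches this answer
    if (ref != -1)
        results[[as.character(ref)]] = res

    if (length(todo) > 0) {
        w$send(x^2, x=todo[[1]])
        todo = todo[-1]
    } else
        w$send_shutdown()
}
```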

A loop of a similar structure can be used to extend `clustermq`. As an example,
[this was done by the _targets_
package](https://github.com/ropensci/targets/blob/1.2.2/R/class_clustermq.R).

## ZeroMQ message specification

Communication between the `master` (main event loop) and workers (`QSys` base
class) is organised in _messages_. These are chunks of serialized data sent via
_ZeroMQ_'s protocol (_ZMTP_). The parts of each message are called *frames*.

### Master - Worker communication

The master requests an evaluation in a multipart message consisting of the
frames listed below (with one additional frame if proxied). This is all
handled internally by _clustermq_.

* The worker identity frame or routing identifier
* A delimiter frame
* Worker status (`wlife_t`)
* The call to be evaluated
* _N_ repetitions of:
  * The variable name of an environment object that is not yet present on the
    worker
  * The variable value

If using a proxy, this will be followed by a `SEXP` that contains variable
names the proxy should add before forwarding to the worker.
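
Schematically, a direct request can be pictured as a multipart message with
the following layout (illustration only):

```
| identity | delimiter | wlife_t | call | name 1 | value 1 | ... | name N | value N |
```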

### Worker evaluation

A worker evaluates the call using the R C API:

```{c eval=FALSE}
R_tryEvalSilent(cmd, env, &err);
```

If an error occurs during this evaluation, it will be returned as an object
with class `worker_error`. If a developer wants to catch errors and warnings
in a more fine-grained manner, it is recommended to add their own
`callingHandlers` to `cmd` (as _clustermq_ does in its `work_chunk`).
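
Such a wrapper might look as follows (a minimal sketch; `expr` stands in for
the expression to be evaluated, and the handler shown is illustrative):

```{r eval=FALSE}
w$send(withCallingHandlers(expr,
    warning = function(cond) {
        # handle or log the warning, then resume evaluation
        invokeRestart("muffleWarning")
    }
))
```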

### Worker - Master communication

The result of this evaluation is then returned in a message with four (direct)
or five (proxied) frames:

* Worker identity frame (handled internally by _ZeroMQ_'s `ZMQ_REQ` socket)
* Empty frame (handled internally by _ZeroMQ_'s `ZMQ_REQ` socket)
* Worker status (`wlife_t`) that is handled internally by _clustermq_
* The result of the call (`SEXP`), visible to the user

If using a worker via SSH, these frames will be preceded by a routing identity
frame that is handled internally by _ZeroMQ_ and added or peeled off by the
proxy.
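
Schematically, the reply can be pictured as follows, where the leading routing
frame is only present when proxied (illustration only):

```
| [routing id] | identity | <empty> | wlife_t | result (SEXP) |
```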