
Parallel processing

speeding up skytrackr computation

Koen Hufkens

1 Introduction

Light loggers are often deployed in bulk. To speed up processing, you can easily convert a standard serial {dplyr} based processing chain to a parallel one using the {multidplyr} package. To demonstrate this I’ll use a fake dataset with two loggers, created from the included demo data.

2 Data setup

For this exercise you will need the {skytrackr}, {dplyr} and {multidplyr} libraries loaded. I use the included demo dataset cc876 and load it into two different data frames, renaming the logger in one of them, before merging both data frames. This creates a dataset with two loggers (albeit containing the same measurements), suitable to demonstrate parallel processing.

library(skytrackr)
library(dplyr)
library(multidplyr)

# creating a fake dataset
# by duplicating data and
# renaming the logger
df1 <- skytrackr::cc876
df1$logger <- "CC888"
df2 <- skytrackr::cc876
df <- bind_rows(df1, df2)
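
A quick sanity check confirms that the merged data frame now contains two loggers:

# confirm both loggers are present
unique(df$logger)
#> [1] "CC888" "CC876"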

Next I’ll define the cluster setup using the new_cluster() function. In general you can safely use all but one of the cores on your local machine (n - 1); to detect the number of cores on your machine you can use parallel::detectCores(). Most modern computers have at least four cores, so with only two loggers to process a three-core cluster would leave one core unused. I therefore force the cluster size to two; if more data is presented it will be distributed over the available cores.

# detect number of cores automatically
# n <- parallel::detectCores() - 1

# in this case I force them to two (2)
n <- 2

# create a new cluster
cluster <- new_cluster(n)
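
If you prefer the cluster size to adapt to whatever machine the code runs on, a minimal sketch using parallel::detectCores() could look as follows:

# portable alternative: use all but one of the
# available cores, but always keep at least one
n <- max(parallel::detectCores() - 1, 1)
cluster <- new_cluster(n)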

Since every worker (CPU) processes its data in isolation, you need to explicitly specify the libraries you wish to use on the cluster. In this case, we provide the cluster with the {skytrackr} library (and its dependencies).

# Make sure the "skytrackr" library
# is made available
cluster_library(cluster, "skytrackr")
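
Optionally, you can verify that the library is indeed attached on every worker by evaluating an expression on each of them with the {multidplyr} cluster_call() function:

# check that skytrackr is attached on each worker,
# returning one logical value per worker
cluster_call(cluster, "skytrackr" %in% (.packages()))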

With the cluster details specified I can now partition the data for distribution across the different CPUs. The partitioning is done by the standard {dplyr} group_by() function, followed by the {multidplyr} partition() function with the cluster specification as an argument. This transforms the tibble data frame into a partitioned data frame, or party_df.

# split tasks by logger
# across cluster partitions
df_logger <- df |>
  group_by(logger) |>
  partition(cluster)

print(df_logger)
#> Source: party_df [17,280 x 6]
#> Groups: logger
#> Shards: 2 [8,640--8,640 rows]
#> 
#> # A data frame: 17,280 × 6
#>   logger date_time           date         hour measurement value
#>   <chr>  <dttm>              <date>      <dbl> <chr>       <dbl>
#> 1 CC876  2021-08-02 00:04:10 2021-08-02 0.0694 lux          0.08
#> 2 CC876  2021-08-02 00:09:10 2021-08-02 0.153  lux          0.08
#> 3 CC876  2021-08-02 00:14:10 2021-08-02 0.236  lux          0.08
#> 4 CC876  2021-08-02 00:19:10 2021-08-02 0.319  lux          0.08
#> 5 CC876  2021-08-02 00:24:10 2021-08-02 0.403  lux          0.08
#> 6 CC876  2021-08-02 00:29:10 2021-08-02 0.486  lux          0.08
#> # ℹ 17,274 more rows

3 Parallel processing

Finally, after the setup, we can call the main parallel routine. It follows the routine specified in the main README with a few exceptions: the generation of the mask, the step-selection function and the setting of the random seed must be included in the do() statement. The parallel sessions do not have access to shared memory, so we have to define the mask, step-selection function and random seed for each run (logger) separately.

# run the analysis in parallel
# on the cluster (local or remote)
locations <- df_logger |>
  group_by(logger) |>
  do({

    # set seed per parallel unit
    set.seed(1)

    # define land mask with a bounding box
    # and an off-shore buffer (in km); in addition
    # you can specify the resolution of the resulting raster
    mask <- stk_mask(
      bbox = c(-20, -40, 60, 60), # xmin, ymin, xmax, ymax
      buffer = 150, # in km
      resolution = 0.5 # map grid in degrees
    )

    # define a step selection distribution
    ssf <- function(x, shape = 0.9, scale = 100, tolerance = 1500){
      # normalize over expected range with km increments
      norm <- sum(stats::dgamma(1:tolerance, shape = shape, scale = scale))
      prob <- stats::dgamma(x, shape = shape, scale = scale) / norm
      return(prob)
    }

    # run skytrackr() on the current group's data,
    # referenced as . within do()
    skytrackr(
      .,
      mask = mask,
      step_selection = ssf,
      plot = FALSE,
      verbose = FALSE,
      start_location = c(51.08, 3.73),
      tolerance = 1500, # in km
      scale = log(c(0.00001, 50)),
      range = c(0.09, 148),
      control = list(
        sampler = 'DEzs',
        settings = list(
          burnin = 250,
          iterations = 3000,
          message = FALSE
        )
      )
    )
  })
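
Alternatively, objects which are identical across runs, such as the mask and the step-selection function, can be defined once locally and copied to every worker using the {multidplyr} cluster_copy() function. A minimal sketch of this approach is shown below (note that the random seed still has to be set inside the do() statement):

# define the mask and step-selection
# function once, locally
mask <- stk_mask(
  bbox = c(-20, -40, 60, 60),
  buffer = 150,
  resolution = 0.5
)

ssf <- function(x, shape = 0.9, scale = 100, tolerance = 1500){
  norm <- sum(stats::dgamma(1:tolerance, shape = shape, scale = scale))
  stats::dgamma(x, shape = shape, scale = scale) / norm
}

# copy both objects to every worker,
# so they can be referenced inside do()
cluster_copy(cluster, c("mask", "ssf"))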

The output of the parallel run is a party_df data frame, which is incompatible with {skytrackr} functions. A simple conversion back to a regular data frame is possible by calling the as.data.frame() function (dropping the {multidplyr} ancillary data), making the data compatible with, for example, stk_map().

# drop the parallel processing info
locations <- locations |>
  as.data.frame()
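
With the ancillary data dropped, the locations can be passed to downstream {skytrackr} functions, for example to map the estimated tracks (assuming, as suggested above, that stk_map() accepts the location data frame directly):

# map the estimated locations for both loggers
stk_map(locations)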
