Embarrassingly Parallel Computing with googleComputeEngineR

Why?

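Suppose you want to run 100 regressions. Each one takes an hour, and the only difference between them is the data set they use. This is an embarrassingly parallel problem: no regression depends on the result of another, so they can all run at the same time.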
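To get a feel for the potential gain, here is a rough back-of-the-envelope calculation. It assumes ideal scheduling, no data-transfer overhead, and a cluster of 3 workers (the size used later in this post); real timings will be worse.

```r
# Rough estimate of wall-clock time, ignoring all overhead (an idealization)
n_jobs        <- 100  # number of regressions
hours_per_job <- 1    # each regression takes about one hour
n_workers     <- 3    # assumed cluster size

# One machine runs the jobs one after another
sequential_hours <- n_jobs * hours_per_job

# With several workers, the jobs are processed in rounds of n_workers at a time
parallel_hours <- ceiling(n_jobs / n_workers) * hours_per_job

sequential_hours  # 100
parallel_hours    # 34
```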

Before you start

I will assume that:

  • you have a Google Compute Engine account,
  • you have correctly installed and configured googleComputeEngineR.

Create some fake data

library(googleComputeEngineR)
library(dplyr)
library(stringr)
library(future)
library(future.apply)

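set.seed(12618)
n <- 10000

# make sure the output folder exists before writing to it
dir.create("data", showWarnings = FALSE)

fakeData <- list()
for (ii in 1:100) {
  # seed = TRUE gives each future its own reproducible random-number stream
  fakeData[[ii]] <- future({
    fakeDF <- data.frame(x = rnorm(n, 0, 1), e = rnorm(n, 0, 1)) %>%
      mutate(y = 0.5 * x + e) %>%
      select(-e)
    fname <- paste0("./data/file", str_pad(ii, 3, pad = "0"), ".RDS")
    saveRDS(fakeDF, file = fname)
    paste0(fname, " has been written")
  }, seed = TRUE)
}

# collect the status message from each future
v <- lapply(fakeData, FUN = value)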
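Before spending money on VMs, it can be worth confirming that the data-generating process behaves as intended. The following is a minimal sketch that rebuilds one data set in memory (rather than reading it back from ./data) and checks that a regression recovers a slope close to 0.5. The name checkDF is only for illustration.

```r
# Rebuild one fake data set in memory, using the same recipe: y = 0.5 * x + e
set.seed(12618)
n <- 10000
x <- rnorm(n, 0, 1)
e <- rnorm(n, 0, 1)
checkDF <- data.frame(x = x, y = 0.5 * x + e)

# Fit the same model that will later run on the cluster
fit <- lm(y ~ x, data = checkDF)

# The coefficient on x should be close to 0.5
coef(fit)
```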

Create a cluster

# names for your cluster
vm_names <- c("vm1","vm2","vm3")

## create the cluster using default template for r-base
## creates jobs that are creating VMs in background

jobs <- lapply(vm_names, function(x) {
  gce_vm_template(template = "r-base",
                  predefined_type = "f1-micro",
                  name = x,
                  dynamic_image = gce_tag_container("im-rstudio"),
                  wait = FALSE)
})


jobs

## check status of jobs
lapply(jobs, gce_get_op)

## wait for all the jobs to complete so that the VMs are ready
vms <- lapply(jobs, gce_wait)

## get the VM objects
vms <- lapply(vm_names, gce_vm)

## set up SSH for the VMs
vms <- lapply(vms, gce_ssh_setup,
              username = "ignacio",
              key.pub = "/home/ignacio/.ssh/id_rsa.pub",
              key.private = "/home/ignacio/.ssh/id_rsa")


# gce_ssh(vms[[1]], "echo foo")

# What I want to do -------------------------------------------------------

# I want to run 100 regressions using this cluster of 3 nodes
# lm(formula = y~x, data = df)

my_files <- list.files("data")

my_data <- lapply(paste0("./data/",my_files), readRDS)

my_func <- function(data){
  lm(formula = y~x, data = data)
}


## make a future cluster
plan(cluster, workers = as.cluster(vms))

result <- future_lapply(my_data, my_func) 



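## future_lapply() returns only after every regression has finished,
## so this check should report everything as resolved
resolved(result)

## your list of fitted regressions is now available
result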

## shutdown instances when finished
lapply(vms, gce_vm_stop)

## delete instances when finished
lapply(vms, gce_vm_delete)

Things to consider

Sending data to the nodes and receiving results from them takes time. If you only care about a coefficient, your function should send back only that coefficient instead of the whole lm object. You may also have noticed that in this example I first load all the data onto my local computer. Instead, I could save the data in a bucket and modify my script so that I pass only the name of the file each node should read. Even better, I could use BigQuery or some other database.
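Both ideas can be sketched as follows. This is a minimal illustration, not the setup used above: it runs locally with plan(sequential) as a stand-in for the cluster, writes three small data sets to a temporary folder, and uses a hypothetical helper, slope_from_file. On real VMs the files would have to live somewhere every node can read (a bucket, for example), which this sketch does not cover.

```r
library(future)
library(future.apply)

# Local stand-in for the cluster; with real VMs you would use
# plan(cluster, workers = ...) instead
plan(sequential)

# Write a few small fake data sets to a temporary folder (illustrative paths)
data_dir <- file.path(tempdir(), "data")
dir.create(data_dir, showWarnings = FALSE)
set.seed(12618)
for (ii in 1:3) {
  x <- rnorm(1000)
  df <- data.frame(x = x, y = 0.5 * x + rnorm(1000))
  saveRDS(df, file.path(data_dir, sprintf("file%03d.RDS", ii)))
}

# Each worker receives only a file name, reads the data itself,
# and sends back a single number instead of the whole lm object
slope_from_file <- function(path) {
  df <- readRDS(path)
  unname(coef(lm(y ~ x, data = df))["x"])
}

my_paths <- list.files(data_dir, pattern = "\\.RDS$", full.names = TRUE)
slopes <- future_sapply(my_paths, slope_from_file, USE.NAMES = FALSE)
slopes
```

Only a short character string travels to each worker and only one number travels back, so the transfer cost no longer grows with the size of the data or of the fitted model.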


About Ignacio Martinez
research economist, tech enthusiast