Tengo algunos problemas con el uso de RCPP (y RCPP Armadillo) con el paquete paralelo, y estoy obteniendo resultados incorrectos dependiendo de la cantidad de núcleos que utilice para el cálculo.

Tengo una función compute_indices que calcula 3 juegos de índices para cada observación en mis datos. Lo hace creando primero un clúster (horquilla) usando parallel::makeCluster Dependiendo del número de núcleos que especifique. Luego, divide mis datos en partes iguales y se aplica (usando parallel::parLapply) una función meancorlsm en cada parte usando el objeto de clúster que creé anteriormente. Ahora meancorlsm es básicamente una envoltura alrededor de una función (llamada covArmadilloMclsmNormal) Escribí en RCPP y RCPP ARMADILLO, ya que estoy tratando de acelerar el cálculo. Sin embargo, tengo otra versión de esta función escrita enteramente en R (llamada meancorlsR) que uso para probar la corrección de la versión RCCPARMADILLO.

Ahora, si ejecuto compute_indices con el meancorlsm (esto lo hago por el primero usando sourceCpp() para hacer covArmadilloMclsmNormal disponible en el entorno global), obtengo respuestas parcialmente correctas dependiendo de la Número de núcleos que le digo compute_indices. Específicamente, si uso 4 núcleos, el primer 1/4 de los índices computados es correcto, si uso 2 núcleos, el primer 1/2 de mis resultados es correcto, y si uso un solo núcleo, todos mis resultados son correctos . Verifico la exactitud de los resultados utilizando la respuesta producida utilizando la versión R de meancorlsm (meancorlsR como se indicó anteriormente). Dado que estoy obteniendo resultados correctos cuando uso un solo núcleo, siento que la función RCPPARMADILLO es correcta y posiblemente, los diferentes hilos / trabajadores del clúster están interfiriendo entre sí durante el cálculo, de ahí la razón por la que obtengo este comportamiento extraño.

Abajo es compute_indices:

compute_indices <- function(dt, nr = ncol(dt), n_core = 4){ 
  par_cluster <- makeCluster(n_core, type = "FORK")
  # compute the column splits/partition for parallel processing
  splits <- parallel:::splitList(1:nr, n_core)
  # compute auxiliar support data
  data_means <- colMeans(dt, na.rm = T)
  data_vars <- apply(dt, MARGIN =  2, var)
  data_sds <- apply(dt, 2, sd)
  # compute Outliers using parapply
  vectors <- do.call(rbind, parLapply(par_cluster, splits,
                                      meancorlsm, dt, data_means,
                                      data_vars, data_sds))
  stopCluster(par_cluster)
  vectors
}

Y meancorlsm

meancorlsm<- function(i, mtx, means, vars, sds){
  pre_outl <- covArmadilloMclsmNormal(dti = mtx[,i], dt = mtx,
                                      dtmeans = means, dtvars = vars,
                                      dtsds = sds)
  colnames(pre_outl) <- c("sh", "mg", "ap")
  pre_outl
}

Con la función covArmadilloMclsmNormal rcpp:


#include <RcppArmadillo.h>
using namespace Rcpp;


//[[Rcpp::depends(RcppArmadillo)]]
// [[Rcpp::export]]

arma::mat covArmadilloMclsmNormal(arma::mat dti, arma::mat dt,
                     arma::vec dtmeans, arma::vec dtvars,
                     arma::vec dtsds){
  arma::mat out(dt.n_cols, dti.n_cols);
  out = arma::cov(dt, dti);
  int n = out.n_rows;
  int p = out.n_cols;
  //declare matrices to hold result
  arma::vec temp(n);
  arma::mat preout(3,p);
  for(int i = 0; i<p ; ++i){
    temp = out.col(i)/dtvars;
    preout(0,i) = arma::mean((out.col(i)/dtsds))/dtsds(i); 
    preout(1, i) = dtmeans(i) - arma::mean(temp % dtmeans);
    preout(2, i) = arma::mean(temp); 
  }

  return preout.t();
}

Ahora aquí está la versión R de meancorlsm que uso para pruebas:

meancorlsR <- function(i, mtx, means, vars, sds){
  pre_outl <- apply(cov(mtx, mtx[,i], use = "pairwise.complete.obs"), 2,
                    function(col){
                      tmp <- col/vars
                      c("sh" = mean(col/sds, na.rm = T), 
                        "mg" = mean(tmp * means, na.rm = T), 
                        "ap" = mean(tmp, na.rm = T)) 
                    })
  pre_outl[1,] <- pre_outl[1,]/sds[i] 
  pre_outl[2,] <- means[i] - pre_outl[2,] 
  t(pre_outl)
}

Puede reemplazar la función meancorlsm con meancorlsR en la función compute_indices y funcionará (para pruebas). Sin embargo, para la reproducibilidad inmediata, lo proporciono aquí como compute_indicesR.

compute_indicesR <- function(dt, nr = ncol(dt), n_core = 4){ 
  par_cluster <- makeCluster(n_core, type = "FORK")
  # compute the column splits/partition for parallel processing
  splits <- parallel:::splitList(1:nr, n_core)
  # compute auxiliar support data
  data_means <- colMeans(dt, na.rm = T)
  data_vars <- apply(dt, MARGIN =  2, var)
  data_sds <- apply(dt, 2, sd)
  # compute using parapply
  vectors <- do.call(rbind, parLapply(par_cluster, splits,
                                      meancorlsR, dt, data_means, 
                                      data_vars, data_sds))
  stopCluster(par_cluster)
  vectors
}

Finalmente, aquí hay un ejemplo mínimo para ejecutar:

library(Rcpp)
library(parallel)
# use this to source the Rcpp function from a file
# to make the covArmadilloMclsmNormal function available
sourceCpp("covArmadilloMclsmNormal.cpp")

data("attitude") # available in datasets in base R
dt <- t(as.matrix(attitude[1:10,])) #select first 10 row 

indices_rcpp4 <- compute_indices(dt) # using 4 cores
indices_rcpp2 <- compute_indices(dt, n_core = 2) # using 2 cores
indices_rcpp1 <- compute_indices(dt, n_core = 1) # 1 core
# using the R version
# already replaced the meancorlsm function to meancorlsR here  
indices_R <- compute_indicesR(dt) # R version

Espero que toda la salida coincida con la producida por la versión R independientemente del número de núcleos que especifique. Sin embargo, son diferentes. Aquí está el resultado que obtengo con la versión R:

"           sh               mg               ap 
1   0.634272567307155 -7.09315427645087   0.992492531586726
2   0.868144125333511  22.3206363514708   0.622504642756242
3   0.819231480417289  24.8027625928423   0.756088388472384
4   0.830462006956641 -6.26663378557611   1.03847748215856
5   0.836182582923674  15.0558414918816   0.901413435882058
6   0.648813304451793  23.4689784056255   0.40175151333289
7   0.839409670144446  3.73900558549848   0.883655665107456
8   0.781070895796188  13.1775081516076   0.810306856575333
9   0.772967959938828  2.24023877077873   1.1146249477264
10  0.826849986442202  3.31330282673472   0.910527502047015"

El resultado que obtuve con la versión RCPP usando 2 núcleos es

"           sh               mg               ap 
1   0.634272567307155 -7.09315427645086   0.992492531586726
2   0.868144125333511  22.3206363514708   0.622504642756242
3   0.819231480417289  24.8027625928424   0.756088388472384
4   0.830462006956641 -6.26663378557612   1.03847748215856
5   0.836182582923674  15.0558414918816   0.901413435882058
6   0.231943043232274  28.1832641199112   0.40175151333289
7   1.20839881621289   7.02471987121276   0.883655665107456
8   0.865212462148289  21.7489367230362   0.810306856575333
9   0.853048693647409  -10.474046943507   1.1146249477264
10  0.857055188335614  14.599017112449    0.910527502047015"

Mientras que para la versión RCCP usando 4 núcleos:

"           sh               mg               ap 
1   0.634272567307155 -7.09315427645086   0.992492531586726
2   0.868144125333511  22.3206363514708   0.622504642756242
3   0.819231480417289  24.8027625928424   0.756088388472384
4   0.648794650804865 -10.2666337855761   1.03847748215856
5   1.25119408317776   5.48441292045304   0.901413435882058
6   0.231943043232274  28.1832641199112   0.40175151333289
7   1.20839881621289   7.02471987121276   0.883655665107456
8   0.487272877566209  3.60607958017906   0.810306856575333
9   1.50139103326263  -6.75976122922128   1.1146249477264
10  1.01076542369015   15.4561599695919   0.910527502047015"

La versión RCCP que usa un solo núcleo produjo la misma respuesta que la versión R, que es el resultado correcto. También es interesante que la columna ap de la respuesta permaneció igual, sin importar el número de núcleos que utilicé mientras cambia la columna sh y la columna mg.

Finalmente, mi plataforma es Ubuntu 16.04. Parece que los clusters FORK no funcionan en Windows, por lo que es posible que no pueda reproducir este resultado. Sin embargo, obtuve el mismo comportamiento incluso con el clúster PSOCK (en cuyo caso usé clusterEvalQ() para obtener las funciones de CPP necesarias para que estén disponibles para los trabajadores). Cualquier ayuda o visión en cuanto a lo que estoy haciendo mal es muy apreciado.

3
Segun Ojo 13 jul. 2019 a las 23:02

1 respuesta

La mejor respuesta

No importa mis comentarios, malinterpreté la documentación de Armadillo.

Su código C ++ está indexando el Helper dtmeans y dtsds vectores con i, pero i siempre comienza en cero para cada instancia paralela, Por lo tanto, debe pasar un desplazamiento que indique cuántas columnas se saltó:

//[[Rcpp::depends(RcppArmadillo)]]

#include <RcppArmadillo.h>

//[[Rcpp::export]]
arma::mat covArmadilloMclsmNormal(arma::mat dti, arma::mat dt, int offset,
                                  arma::vec dtmeans, arma::vec dtvars, arma::vec dtsds)
{
  arma::mat out = arma::cov(dt, dti);
  size_t p = out.n_cols;

  arma::mat preout(p,3);
  for (int i = 0; i < p; ++i) {
    arma::vec temp = out.col(i) / dtvars;
    preout(i,0) = arma::mean((out.col(i) / dtsds)) / dtsds(i + offset); 
    preout(i,1) = dtmeans(i + offset) - arma::mean(temp % dtmeans);
    preout(i,2) = arma::mean(temp); 
  }

  return preout;
}

Así que en consecuencia:

meancorlsm <- function(i, mtx, means, vars, sds){
  pre_outl <- covArmadilloMclsmNormal(dti = mtx[,i, drop = FALSE], dt = mtx, offset = min(i) - 1L,
                                      dtmeans = means, dtvars = vars,
                                      dtsds = sds)
  colnames(pre_outl) <- c("sh", "mg", "ap")
  pre_outl
}

Incluso puedes corroborarlo secuencialmente:

data("attitude") # available in datasets in base R
dt <- t(as.matrix(attitude[1:10,])) #select first 10 row 

# compute the column splits/partition for parallel processing
splits <- parallel:::splitList(1:ncol(dt), 2)

# compute auxiliary support data
data_means <- colMeans(dt, na.rm = T)
data_vars <- apply(dt, MARGIN =  2, var)
data_sds <- apply(dt, 2, sd)

do.call(rbind, lapply(splits,
                      meancorlsm, dt, data_means,
                      data_vars, data_sds))
             sh        mg        ap
 [1,] 0.6342726 -7.093154 0.9924925
 [2,] 0.8681441 22.320636 0.6225046
 [3,] 0.8192315 24.802763 0.7560884
 [4,] 0.8304620 -6.266634 1.0384775
 [5,] 0.8361826 15.055841 0.9014134
 [6,] 0.6488133 23.468978 0.4017515
 [7,] 0.8394097  3.739006 0.8836557
 [8,] 0.7810709 13.177508 0.8103069
 [9,] 0.7729680  2.240239 1.1146249
[10,] 0.8268500  3.313303 0.9105275

Por cierto, creo que las matrices de asignación previa en el código C ++ no son de uso, si simplemente lo sobrescribirán con =.

1
Alexis 14 jul. 2019 a las 15:13