R语言创建包和并行

创建R包

限制加载包的信息输出

  suppressMessages(require(DropletUtils)) # For the downsampling
  suppressMessages(require(purrr))
  suppressMessages(require(dplyr))
  suppressMessages(require(tidyr))

R中创建S4对象

要创建S4对象,我们可以使用setClass函数,在该函数中我们将传递对象名称,列名称以及将存储在列中的数据类型。例如,如果我们要创建一个具有名称数据和两个由x和y调用的数字列的S4,则可以使用setClass(“ data”,representation(x1 =“ numeric”,x2 =“ numeric”))。

setClass("data1",representation(x1="numeric",x2="numeric"))
data1<-new("data1",x1=rnorm(20),x2=rexp(20,1.12))
data1

上传到CRAN的准备工作

Description文件

(1)The Title field非介词类单词需要大写. 比如’Joint Dimension Reduction and Spatial Clustering for Single-Cell RNA Sequencing and Spatial Transcriptomics Data’;标题不能超过65个字符,可以用nchar函数来检查。错误例子如:‘Joint dimension reduction and spatial clustering for Single-cell RNA sequencing and spatial transcriptomics data’。 ## Add data to this packeages usethis::use_data(PC)

(2)在函数帮助文件中,每个keyword里面只能放一个关键词,如 keyword{DR-SC}; keyword{tSNE}。非标准的关键词用命令concept{}而不要用keyword. 一个keyword里面不能放多个关键词, 比如:keyword{DR-SC, tSNE, UMAP.}

本地检测package的error和note

利用CMD到Windows命令行,然后用R CMD check –as-cran GFM_1.1.0.tar.gz 可以用于初步本地检测是否存在Error, Warning和Note,比发邮件更方便。直到只有一个关于“New submission”的Note为止,不能包含一个error和warning。

Vignettes格式

其中需要注意output的设置是只输出html!另外,其中的R命令,需要设置关闭模式,即eval=FALSE,否则运行时间无法通过CRAN的审核。另外,使用Vignettes,description中必须添加两句话:VignetteBuilder: knitr Suggests: knitr, rmarkdown

---title: 'DR-SC: HCC1 Data Analysis'
author: "Wei Liu"
date: "`r Sys.Date()`"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{DR-SC HCC1}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>"
)
library("DR.SC")

global.R文件

新建一个R/global.R文件来声明nonvisible的函数和变量,避免出现warning,文件内容类似如下:

utils::globalVariables(c("makeSOCKcluster", "%dopar%", "detectCores",
                         "makeCluster", "stopCluster"));

若用并行计算:需要在namespace中添加:

import(parallel)
import(doSNOW)

邮件检测

初步本地检测合格了,再用邮件检测。具体操作为: check the corrrectness of Windows:

devtools::check_win_release();

然后你会收到CRAN在Windows平台检测的报告详情,你可以根据报告再做相应修改,直到只有一个note为止。

建立package网站

建立网站:https://pkgdown.r-lib.org/

parallel包并行

single_parallel <- function(func,num_core,iterable,varlist=NULL,...){
  "
  :param func: function to be parallel computing
  :param iteralbe:a dynamic parameter(vector銆乴ist) of func.
  :param ...: all static paramters of func.
  :return list whose length is equivalent to that of iterable.
  "
  #1.load package
  library(parallel)
  #2.count the number of cores
  # cores <- detectCores(logical = FALSE)
  #3.open parallel computing nodes
  cl <- makeCluster(num_core)
  #4.pass objects for each node.
  funcname <- deparse(substitute(func))
  varlist <- c(funcname,varlist)
  clusterExport(cl, varlist = varlist, envir = environment())
  # Put the reqiured functions in GFM package into all nodes.
  clusterCall(cl, function() library(MixPPCA))
  #5.start to parallel computing
  result <- parSapply(cl=cl,X=iterable,FUN=func,...)
  #6.close parallel computing
  stopCluster(cl)
  return(result)
}

parSingle_IC <- function(K, x1, Adj_sp1, q1, cov1, ...){
  reslist <- simulDRcluster(x1,Adj_sp = Adj_sp1, q=q1, K=K, cov=cov1, ...)
  return(c(reslist$bic, reslist$aic))
}

selectClustNumber <- function(X,Adj_sp, q, K_set= 3:10, cov='heter', num_core = 1, verbose=F){
  
  if (num_core > 1) {
    if (num_core > detectCores()) {
      warning("selectClustNumber:: the number of cores you're setting is larger than detected cores!")
      num_core = detectCores()
    }
  }
  nK <- length(K_set)
  
  
  icMat <- single_parallel(parSingle_IC, num_core=num_core,
                           iterable=K_set, x1=X, Adj_sp1=Adj_sp, q1=q, cov1=cov)
  icMat <- t(icMat)
  colnames(icMat) <- c('bic', 'aic')
  return(icMat)
}

doSNOW包并行

doSNOW的一个优势是它可以显示进度条

require(doSNOW) # 加载并行包
cores <- parallel::detectCores() # 探测有多少核
cl <- makeSOCKcluster(cores) # 设定并行核
registerDoSNOW(cl) # 注册该核
mydata <- matrix(rnorm(8000*500), ncol=500) 
pb <- txtProgressBar(min=1, max=8000, style=3) # 设置并行进度条
progress <- function(n) setTxtProgressBar(pb, n) # 进度条函数
opts <- list(progress=progress) # 并性函数的参数
result <- 
  foreach(i=1:8000, .packages=NULL, .options.snow=opts, # 注意这里的.package参数非常重要
          .combine='rbind') %dopar% { # dopar%后面的内容若有需要使用到某些包的函数,这些包的名字都需要添加到.pakcage参数中
            abc <- list(tau=i^2, sl = i)
            data.frame(tau=abc$tau, sl=abc$sl) #设定该并行内容的返回值。
          }
close(pb) # 关闭进度窗
stopCluster(cl) # 关闭并行核

我自己定义的函数:

K_set <- 2:4
nK <- length(K_set)
pb <- txtProgressBar(min=1, max=nK, style=3)
progress <- function(n) setTxtProgressBar(pb, n)
opts <- list(progress=progress)
k <- 1
res_vc <- foreach(k = 1:nK,.packages="MixPPCA" ,.options.snow=opts,
                  .combine='rbind') %dopar% {
    reslist <- simulDRcluster(X,Adj_sp = Adj_sp, q=q, K=K_set[k], cov="heter") 
  
    c(reslist$bic, reslist$aic)
}

两种并行版本的结合体

function(X,Adj_sp, q, K_set= 3:10, cov='heter', parallel="parallel", num_core = 1,...){
  
  
  nK <- length(K_set)
  if(!is.null(parallel)){
    if (num_core > 1) {
      if (num_core > detectCores()) {
        warning("selectClustNumber:: the number of cores you're setting is larger than detected cores!")
        num_core = detectCores()
      }
    }
    if(parallel=='doSNOW'){
      require(doSNOW)
      cl <- makeSOCKcluster(num_core)
      registerDoSNOW(cl)
      
      ## set Prgogress Bar
      pb <- txtProgressBar(min=1, max=nK, style=3)
      progress <- function(n) setTxtProgressBar(pb, n)
      opts <- list(progress=progress)
      k <- 1
      icMat <- foreach(k = 1:nK,.packages="MixPPCA" ,.options.snow=opts,
                       .combine='rbind') %dopar% {
                         reslist <- simulDRcluster(X,Adj_sp = Adj_sp, q=q, K=K_set[k],  ...) 
                         
                         c(reslist$bic, reslist$aic)
                       }
      close(pb)
      stopCluster(cl)
    }else if(parallel=='parallel'){
      library(parallel)
      
      cl <- makeCluster(num_core)
      clusterExport(cl, list("EMmPCpp", 
                             "EMmPCpp_heter", "icmem_heterCpp", "simulDRcluster"))
      cat("Starting parallel computing...")
      # clusterCall(cl, function() library(MixPPCA))
      # Run
      icMat <- parSapply(cl, X=K_set, parafun1, XX=X, Adj_sp=Adj_sp, q=q, ...)
      stopCluster(cl)
      icMat <- t(icMat)
    }
      
  }else{
    icMat <- matrix(NA, nK, 2)
    pb <- txtProgressBar()
    for(k in 1:nK){
      reslist <- simulDRcluster(X,Adj_sp = Adj_sp, q=q, K=K_set[k],  ...) 
      setTxtProgressBar(pb, k)
      icMat[k, ] <- c(reslist$bic, reslist$aic)
    }
    close(pb)
  }
  
  
  
  
  icMat <- cbind(K_set, icMat)
  colnames(icMat) <- c("K", 'bic', 'aic')
  row.names(icMat) <- as.character(K_set)
  return(icMat)
}

内部变量赋值保存

用一个外部矩阵或数组变量将并行的每次返回结果都保存下来,以防某次并行报错后,无结果返回。

pfun <- function(i){
  abc <- list(tau=i^2, sl = i)
  mydata[i,1] <<- abc$tau##change global variables
  mydata[i,2] <<- abc$sl
  cat("i=",i,"\n")
  return(data.frame(tau=abc$tau, sl=abc$sl))
}
# pfun(1)
library(doSNOW)
paral.size <- 1000
mydata <- matrix(NA,paral.size,2)
cores <- parallel::detectCores()##number of cores of system
cor.num <- makeSOCKcluster(cores)##set the number of parallel cores
registerDoSNOW(cor.num)##register cores
library(tcltk)
pb <- tkProgressBar(title="进度",label="已完成 %", min=0, max=100, initial = 0, width = 300)  
res1 <- foreach(i=1:paral.size,.combine='rbind') %do% {
  info<- sprintf("已完成 %d%%", round(i*100/paral.size))  
  setTkProgressBar(pb, value = i*100/paral.size, title = sprintf("进度 (%s)",info),label = info)  
  pfun(i)
}
close(pb)  
stopCluster(cor.num)##close cores

head(res1)
head(mydata)
## 来自李贵珍师妹的版本

为一般循环添加进度条

testit <- function(x = sort(runif(20)), ...)
{
    pb <- txtProgressBar(...)
    for(i in c(0, x, 1)) {Sys.sleep(0.5); setTxtProgressBar(pb, i)}
    Sys.sleep(1)
    close(pb)
}
testit(style=3)
testit(runif(10), style=2)
testit(style = 4)