R语言创建包和并行
内容目录
创建R包
限制加载包的信息输出
suppressMessages(require(DropletUtils)) # For the downsampling
suppressMessages(require(purrr))
suppressMessages(require(dplyr))
suppressMessages(require(tidyr))
R中创建S4对象
要创建S4对象,我们可以使用setClass函数,在该函数中我们将传递对象名称,列名称以及将存储在列中的数据类型。例如,如果我们要创建一个具有名称数据和两个由x和y调用的数字列的S4,则可以使用setClass(“ data”,representation(x1 =“ numeric”,x2 =“ numeric”))。
setClass("data1",representation(x1="numeric",x2="numeric"))
data1<-new("data1",x1=rnorm(20),x2=rexp(20,1.12))
data1
上传到CRAN的准备工作
Description文件
(1)The Title field非介词类单词需要大写. 比如’Joint Dimension Reduction and Spatial Clustering for Single-Cell RNA Sequencing and Spatial Transcriptomics Data’;标题不能超过65个字符,可以用nchar函数来检查。错误例子如:‘Joint dimension reduction and spatial clustering for Single-cell RNA sequencing and spatial transcriptomics data’。 ## Add data to this packeages usethis::use_data(PC)
(2)在函数帮助文件中,每个keyword里面只能放一个关键词,如 keyword{DR-SC}; keyword{tSNE}。非标准的关键词用命令concept{}而不要用keyword. 一个keyword里面不能放多个关键词, 比如:keyword{DR-SC, tSNE, UMAP.}
本地检测package的error和note
利用CMD到Windows命令行,然后用R CMD check –as-cran GFM_1.1.0.tar.gz 可以用于初步本地检测是否存在Error, Warning和Note,比发邮件更方便。直到只有一个关于“New submission”的Note为止,不能包含一个error和warning。
Vignettes格式
其中需要注意output的设置是只输出html!另外,其中的R命令,需要设置关闭模式,即eval=FALSE,否则运行时间无法通过CRAN的审核。另外,使用Vignettes,description中必须添加两句话:VignetteBuilder: knitr Suggests: knitr, rmarkdown
---title: 'DR-SC: HCC1 Data Analysis'
author: "Wei Liu"
date: "`r Sys.Date()`"
output: rmarkdown::html_vignette
vignette: >
%\VignetteIndexEntry{DR-SC HCC1}
%\VignetteEngine{knitr::rmarkdown}
%\VignetteEncoding{UTF-8}
---
knitr::opts_chunk$set(
collapse = TRUE,
comment = "#>"
)
library("DR.SC")
global.R文件
新建一个R/global.R文件来声明nonvisible的函数和变量,避免出现warning,文件内容类似如下:
utils::globalVariables(c("makeSOCKcluster", "%dopar%", "detectCores",
"makeCluster", "stopCluster"));
若用并行计算:需要在namespace中添加:
import(parallel)
import(doSNOW)
邮件检测
初步本地检测合格了,再用邮件检测。具体操作为: check the corrrectness of Windows:
devtools::check_win_release();
然后你会收到CRAN在Windows平台检测的报告详情,你可以根据报告再做相应修改,直到只有一个note为止。
建立package网站
parallel包并行
single_parallel <- function(func,num_core,iterable,varlist=NULL,...){
"
:param func: function to be parallel computing
:param iteralbe:a dynamic parameter(vector銆乴ist) of func.
:param ...: all static paramters of func.
:return list whose length is equivalent to that of iterable.
"
#1.load package
library(parallel)
#2.count the number of cores
# cores <- detectCores(logical = FALSE)
#3.open parallel computing nodes
cl <- makeCluster(num_core)
#4.pass objects for each node.
funcname <- deparse(substitute(func))
varlist <- c(funcname,varlist)
clusterExport(cl, varlist = varlist, envir = environment())
# Put the reqiured functions in GFM package into all nodes.
clusterCall(cl, function() library(MixPPCA))
#5.start to parallel computing
result <- parSapply(cl=cl,X=iterable,FUN=func,...)
#6.close parallel computing
stopCluster(cl)
return(result)
}
parSingle_IC <- function(K, x1, Adj_sp1, q1, cov1, ...){
reslist <- simulDRcluster(x1,Adj_sp = Adj_sp1, q=q1, K=K, cov=cov1, ...)
return(c(reslist$bic, reslist$aic))
}
selectClustNumber <- function(X,Adj_sp, q, K_set= 3:10, cov='heter', num_core = 1, verbose=F){
if (num_core > 1) {
if (num_core > detectCores()) {
warning("selectClustNumber:: the number of cores you're setting is larger than detected cores!")
num_core = detectCores()
}
}
nK <- length(K_set)
icMat <- single_parallel(parSingle_IC, num_core=num_core,
iterable=K_set, x1=X, Adj_sp1=Adj_sp, q1=q, cov1=cov)
icMat <- t(icMat)
colnames(icMat) <- c('bic', 'aic')
return(icMat)
}
doSNOW包并行
doSNOW的一个优势是它可以显示进度条
require(doSNOW) # 加载并行包
cores <- parallel::detectCores() # 探测有多少核
cl <- makeSOCKcluster(cores) # 设定并行核
registerDoSNOW(cl) # 注册该核
mydata <- matrix(rnorm(8000*500), ncol=500)
pb <- txtProgressBar(min=1, max=8000, style=3) # 设置并行进度条
progress <- function(n) setTxtProgressBar(pb, n) # 进度条函数
opts <- list(progress=progress) # 并性函数的参数
result <-
foreach(i=1:8000, .packages=NULL, .options.snow=opts, # 注意这里的.package参数非常重要
.combine='rbind') %dopar% { # dopar%后面的内容若有需要使用到某些包的函数,这些包的名字都需要添加到.pakcage参数中
abc <- list(tau=i^2, sl = i)
data.frame(tau=abc$tau, sl=abc$sl) #设定该并行内容的返回值。
}
close(pb) # 关闭进度窗
stopCluster(cl) # 关闭并行核
我自己定义的函数:
K_set <- 2:4
nK <- length(K_set)
pb <- txtProgressBar(min=1, max=nK, style=3)
progress <- function(n) setTxtProgressBar(pb, n)
opts <- list(progress=progress)
k <- 1
res_vc <- foreach(k = 1:nK,.packages="MixPPCA" ,.options.snow=opts,
.combine='rbind') %dopar% {
reslist <- simulDRcluster(X,Adj_sp = Adj_sp, q=q, K=K_set[k], cov="heter")
c(reslist$bic, reslist$aic)
}
两种并行版本的结合体
function(X,Adj_sp, q, K_set= 3:10, cov='heter', parallel="parallel", num_core = 1,...){
nK <- length(K_set)
if(!is.null(parallel)){
if (num_core > 1) {
if (num_core > detectCores()) {
warning("selectClustNumber:: the number of cores you're setting is larger than detected cores!")
num_core = detectCores()
}
}
if(parallel=='doSNOW'){
require(doSNOW)
cl <- makeSOCKcluster(num_core)
registerDoSNOW(cl)
## set Prgogress Bar
pb <- txtProgressBar(min=1, max=nK, style=3)
progress <- function(n) setTxtProgressBar(pb, n)
opts <- list(progress=progress)
k <- 1
icMat <- foreach(k = 1:nK,.packages="MixPPCA" ,.options.snow=opts,
.combine='rbind') %dopar% {
reslist <- simulDRcluster(X,Adj_sp = Adj_sp, q=q, K=K_set[k], ...)
c(reslist$bic, reslist$aic)
}
close(pb)
stopCluster(cl)
}else if(parallel=='parallel'){
library(parallel)
cl <- makeCluster(num_core)
clusterExport(cl, list("EMmPCpp",
"EMmPCpp_heter", "icmem_heterCpp", "simulDRcluster"))
cat("Starting parallel computing...")
# clusterCall(cl, function() library(MixPPCA))
# Run
icMat <- parSapply(cl, X=K_set, parafun1, XX=X, Adj_sp=Adj_sp, q=q, ...)
stopCluster(cl)
icMat <- t(icMat)
}
}else{
icMat <- matrix(NA, nK, 2)
pb <- txtProgressBar()
for(k in 1:nK){
reslist <- simulDRcluster(X,Adj_sp = Adj_sp, q=q, K=K_set[k], ...)
setTxtProgressBar(pb, k)
icMat[k, ] <- c(reslist$bic, reslist$aic)
}
close(pb)
}
icMat <- cbind(K_set, icMat)
colnames(icMat) <- c("K", 'bic', 'aic')
row.names(icMat) <- as.character(K_set)
return(icMat)
}
内部变量赋值保存
用一个外部矩阵或数组变量将并行的每次返回结果都保存下来,以防某次并行报错后,无结果返回。
pfun <- function(i){
abc <- list(tau=i^2, sl = i)
mydata[i,1] <<- abc$tau##change global variables
mydata[i,2] <<- abc$sl
cat("i=",i,"\n")
return(data.frame(tau=abc$tau, sl=abc$sl))
}
# pfun(1)
library(doSNOW)
paral.size <- 1000
mydata <- matrix(NA,paral.size,2)
cores <- parallel::detectCores()##number of cores of system
cor.num <- makeSOCKcluster(cores)##set the number of parallel cores
registerDoSNOW(cor.num)##register cores
library(tcltk)
pb <- tkProgressBar(title="进度",label="已完成 %", min=0, max=100, initial = 0, width = 300)
res1 <- foreach(i=1:paral.size,.combine='rbind') %do% {
info<- sprintf("已完成 %d%%", round(i*100/paral.size))
setTkProgressBar(pb, value = i*100/paral.size, title = sprintf("进度 (%s)",info),label = info)
pfun(i)
}
close(pb)
stopCluster(cor.num)##close cores
head(res1)
head(mydata)
## 来自李贵珍师妹的版本
为一般循环添加进度条
testit <- function(x = sort(runif(20)), ...)
{
pb <- txtProgressBar(...)
for(i in c(0, x, 1)) {Sys.sleep(0.5); setTxtProgressBar(pb, i)}
Sys.sleep(1)
close(pb)
}
testit(style=3)
testit(runif(10), style=2)
testit(style = 4)