李昕垚的博客

sparklyr初探

在大数据炒的和房子一样的时代,spark 可谓无人不知,无人不晓。spark做为开源的Apache计算引擎,可以做低延时、分布式的计算,具备处理大数据的能力,另外还内置机器学习。spark在前一段时间支持了R语言,RStudio又推出了新包 sparklyr ,为了避免落伍,我赶紧瞄一眼sparklyr是何方神圣

sparklyr是R与spark的一个接口,可以将复杂的计算任务交给spark处理,做统计或者展现的话再用R处理,提高效率。可贵的是,sparklyr结合dplyr处理数据的函数,这样就不用去学其他的编程语言了

连接spark

1
2
library(devtools)
install_github("rstudio/sparklyr") # 下载

查看可用的spark版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
library(sparklyr)
> spark_available_versions()
spark hadoop
1 1.6.2 2.6
2 1.6.2 2.4
3 1.6.2 2.3
4 1.6.2 cdh4
5 1.6.1 2.6
6 1.6.1 2.4
7 1.6.1 2.3
8 1.6.1 cdh4
9 1.6.0 2.6
10 1.6.0 2.4
11 1.6.0 2.3
12 1.6.0 cdh4
13 2.0.0-preview 2.7
14 2.0.0-preview 2.6
15 2.0.0-preview 2.4
16 2.0.0-preview 2.3
install
1 spark_install(version = "1.6.2", hadoop_version = "2.6")
2 spark_install(version = "1.6.2", hadoop_version = "2.4")
3 spark_install(version = "1.6.2", hadoop_version = "2.3")
4 spark_install(version = "1.6.2", hadoop_version = "cdh4")
5 spark_install(version = "1.6.1", hadoop_version = "2.6")
6 spark_install(version = "1.6.1", hadoop_version = "2.4")
7 spark_install(version = "1.6.1", hadoop_version = "2.3")
8 spark_install(version = "1.6.1", hadoop_version = "cdh4")
9 spark_install(version = "1.6.0", hadoop_version = "2.6")
10 spark_install(version = "1.6.0", hadoop_version = "2.4")
11 spark_install(version = "1.6.0", hadoop_version = "2.3")
12 spark_install(version = "1.6.0", hadoop_version = "cdh4")
13 spark_install(version = "2.0.0-preview", hadoop_version = "2.7")
14 spark_install(version = "2.0.0-preview", hadoop_version = "2.6")
15 spark_install(version = "2.0.0-preview", hadoop_version = "2.4")
16 spark_install(version = "2.0.0-preview", hadoop_version = "2.3")

下载一个spark版本

1
spark_install(version = "1.6.2", hadoop_version = "2.6")

本地连接spark

1
sc = spark_connect(master = "local")

RStudio也提供了spark连接的界面

几个重要的函数

  • 将文件的数据读入spark:spark_read_csvspark_read_jsonspark_read_parquet。以csv为例
1
spark_read_csv(sc,name = "mydata",path = "xx.csv",header = TRUE,charset = "GB18030")
  • 将本地R数据集copy to spark
1
2
3
library(nycflights13)
flights = copy_to(sc,flights,"flights")
src_tbls(sc) # 列出source里的tbls
  • 将数据从spark复制到R内存
1
mydata1 = collect(flights)
  • 将结果保存为临时数据集
1
compute(mydata1,"flights")
  • 将dplyr的语句转化为spark sql
1
2
3
4
5
bestworst = flights %>%
group_by(year,month,day) %>%
select(dep_delay) %>%
filter(dep_delay == min(dep_delay) | dep_delay == max(dep_delay))
sql_render(bestworst) # 返回sql结果
  • 将结果写入HDFS
1
spark_write_parquet(tbl,"hdfs://hdfs.company.org:9000/hdfs-path/data")

意外发现了dplyr里的抽样函数,可设置replace = TRUE

1
2
sample_n(data,10) # 随机取10条
sample_frac(data,0.01) # 随机抽1%

spark机器学习的部分有时间再看,未完待续……

请李昕垚吃个糖?