Spark手册Word文档下载推荐.docx
《Spark手册Word文档下载推荐.docx》由会员分享,可在线阅读,更多相关《Spark手册Word文档下载推荐.docx(23页珍藏版)》请在冰点文库上搜索。
![Spark手册Word文档下载推荐.docx](https://file1.bingdoc.com/fileroot1/2023-5/4/7f3b3e94-de93-45d6-9ed8-8309af883415/7f3b3e94-de93-45d6-9ed8-8309af8834151.gif)
2.在大型数据集上进行交互式分析,数据科学家可以在数据集上做ad-hoc查询。
毫无疑问,历经数年发展,Hadoop生态圈中的丰富工具已深受用户喜爱,然而这里仍然存在众多问题给使用带来了挑战:
1.每个用例都需要多个不同的技术堆栈来支撑,在不同使用场景下,大量的解决方案往往捉襟见肘。
2.在生产环境中机构往往需要精通数门技术。
3.许多技术存在版本兼容性问题。
4.无法在并行job中更快地共享数据。
而通过ApacheSpark,上述问题迎刃而解!
ApacheSpark是一个轻量级的内存集群计算平台,通过不同的组件来支撑批、流和交互式用例,如下图。
二、关于ApacheSpark
ApacheSpark是个开源和兼容Hadoop的集群计算平台。
由加州大学伯克利分校的AMPLabs开发,作为BerkeleyDataAnalyticsStack(BDAS)的一部分,当下由大数据公司Databricks保驾护航,更是Apache旗下的顶级项目,下图显示了ApacheSpark堆栈中的不同组件。
ApacheSpark的5大优势:
1.更高的性能,因为数据被加载到集群主机的分布式内存中。
数据可以被快速的转换迭代,并缓存用以后续的频繁访问需求。
很多对Spark感兴趣的朋友可能也会听过这样一句话——在数据全部加载到内存的情况下,Spark可以比Hadoop快100倍,在内存不够存放所有数据的情况下快Hadoop10倍。
2.通过建立在Java、Scala、Python、SQL(应对交互式查询)的标准API以方便各行各业使用,同时还含有大量开箱即用的机器学习库。
3.与现有Hadoopv1(SIMR)和2.x(YARN)生态兼容,因此机构可以进行无缝迁移。
4.方便下载和安装。
方便的shell(REPL:
Read-Eval-Print-Loop)可以对API进行交互式的学习。
5.借助高等级的架构提高生产力,从而可以讲精力放到计算上。
同时,ApacheSpark由Scala实现,代码非常简洁。
三、安装ApacheSpark
下表列出了一些重要链接和先决条件:
CurrentRelease
1.0.1@
DownloadsPage
https:
//spark.apache.org/downloads.html
JDKVersion(Required)
1.6orhigher
ScalaVersion(Required)
2.10orhigher
Python(Optional)
[2.6,3.0)
SimpleBuildTool(Required)
http:
//www.scala-sbt.org
DevelopmentVersion
gitclonegit:
//
BuildingInstructions
//spark.apache.org/docs/latest/building-with-maven.html
Maven
3.0orhigher
如图6所示,ApacheSpark的部署方式包括standalone、HadoopV1SIMR、Hadoop2YARN/Mesos。
ApacheSpark需求一定的Java、Scala或Python知识。
这里,我们将专注standalone配置下的安装和运行。
1.安装JDK1.6+、Scala2.10+、Python[2.6,3]和sbt
2.下载ApacheSpark1.0.1Release
3.在指定目录下Untar和Unzipspark-1.0.1.tgz
akuntamukkala@localhost~/Downloads$pwd
/Users/akuntamukkala/Downloadsakuntamukkala@localhost~/Downloads$tar-zxvfspark-1.0.1.tgz-C/Users/akuntamukkala/spark
4.运行sbt建立ApacheSpark
akuntamukkala@localhost~/spark/spark-1.0.1$pwd/Users/akuntamukkala/spark/spark-1.0.1akuntamukkala@localhost~/spark/spark-1.0.1$sbt/sbtassembly
5.发布Scala的ApacheSparkstandaloneREPL
/Users/akuntamukkala/spark/spark-1.0.1/bin/spark-shell
如果是Python
/Users/akuntamukkala/spark/spark-1.0.1/bin/pyspark
6.查看SparkUI@
//localhost:
4040
四、ApacheSpark的工作模式
Spark引擎提供了在集群中所有主机上进行分布式内存数据处理的能力,下图显示了一个典型Sparkjob的处理流程。
下图显示了ApacheSpark如何在集群中执行一个作业。
Master控制数据如何被分割,利用了数据本地性,并在Slaves上跟踪所有分布式计算。
在某个Slave不可用时,其存储的数据会分配给其他可用的Slaves。
虽然当下(1.0.1版本)Master还存在单点故障,但后期必然会被修复。
五、弹性分布式数据集(ResilientDistributedDataset,RDD)
弹性分布式数据集(RDD,从Spark1.3版本开始已被DataFrame替代)是ApacheSpark的核心理念。
它是由数据组成的不可变分布式集合,其主要进行两个操作:
transformation和action。
Transformation是类似在RDD上做filter()、map()或union()以生成另一个RDD的操作,而action则是count()、first()、take(n)、collect()等促发一个计算并返回值到Master或者稳定存储系统的操作。
Transformations一般都是lazy的,直到action执行后才会被执行。
SparkMaster/Driver会保存RDD上的Transformations。
这样一来,如果某个RDD丢失(也就是salves宕掉),它可以快速和便捷地转换到集群中存活的主机上。
这也就是RDD的弹性所在。
下图展示了Transformation的lazy:
我们可以通过下面示例来理解这个概念:
从文本中发现5个最常用的word。
下图显示了一个可能的解决方案。
在上面命令中,我们对文本进行读取并且建立字符串的RDD。
每个条目代表了文本中的1行。
scala>
valhamlet=sc.textFile(“/Users/akuntamukkala/temp/gutenburg.txt”)
hamlet:
org.apache.spark.rdd.RDD[String]=MappedRDD[1]attextFileat<
console>
:
12
valtopWordCount=hamlet.flatMap(str=>
str.split(““)).filter(!
_.isEmpty).map(word=>
(word,1)).reduceByKey(_+_).map{case(word,count)=>
(count,word)}.sortByKey(false)
topWordCount:
org.apache.spark.rdd.RDD[(Int,String)]=MapPartitionsRDD[10]atsortByKeyat<
14
1.
通过上述命令我们可以发现这个操作非常简单——通过简单的ScalaAPI来连接transformations和actions。
2.
可能存在某些words被1个以上空格分隔的情况,导致有些words是空字符串,因此需要使用filter(!
_.isEmpty)将它们过滤掉。
3.
每个word都被映射成一个键值对:
map(word=>
(word,1))。
4.
为了合计所有计数,这里需要调用一个reduce步骤——reduceByKey(_+_)。
_+_可以非常便捷地为每个key赋值。
5.
我们得到了words以及各自的counts,下一步需要做的是根据counts排序。
在ApacheSpark,用户只能根据key排序,而不是值。
因此,这里需要使用map{case(word,count)=>
(count,word)}将(word,count)流转到(count,word)。
6.
需要计算最常用的5个words,因此需要使用sortByKey(false)做一个计数的递减排序。
上述命令包含了一个.take(5)(anactionoperation,whichtriggerscomputation)和在/Users/akuntamukkala/temp/gutenburg.txt文本中输出10个最常用的words。
在Pythonshell中用户可以实现同样的功能。
RDDlineage可以通过toDebugString(一个值得记住的操作)来跟踪。
topWordCount.take(5).foreach(x=>
println(x))
(1044,the)
(730,and)
(679,of)
(648,to)
(511,I)
常用的Transformations:
Transformation&
Purpose
Example&
Result
filter(func)
Purpose:
newRDDbyselectingthosedataelementsonwhichfuncreturnstrue
valrdd=sc.parallelize(List(“ABC”,”BCD”,”DEF”))scala>
valfiltered=rdd.filter(_.contains(“C”))scala>
filtered.collect()
Result:
Array[String]=Array(ABC,BCD)
map(func)
returnnewRDDbyapplyingfunconeachdataelement
valrdd=sc.parallelize(List(1,2,3,4,5))scala>
valtimes2=rdd.map(_*2)scala>
times2.collect()
Array[Int]=Array(2,4,6,8,10)
flatMap(func)
SimilartomapbutfuncreturnsaSeqinsteadofavalue.Forexample,mappingasentenceintoaSeqofwords
valrdd=sc.parallelize(List(“Sparkisawesome”,”Itisfun”))scala>
valfm=rdd.flatMap(str=>
str.split(““))scala>
fm.collect()
Array[String]=Array(Spark,is,awesome,It,is,fun)
reduceByKey(func,[numTasks])
Toaggregatevaluesofakeyusingafunction.“numTasks”isanoptionalparametertospecifynumberofreducetasks
valword1=fm.map(word=>
(word,1))scala>
valwrdCnt=word1.reduceByKey(_+_)scala>
wrdCnt.collect()
Array[(String,Int)]=Array((is,2),(It,1),(awesome,1),(Spark,1),(fun,1))
groupByKey([numTasks])
Toconvert(K,V)to(K,Iterable<
V>
)
valcntWrd=wrdCnt.map{case(word,count)=>
(count,word)}scala>
cntWrd.groupByKey().collect()
Array[(Int,Iterable[String])]=Array((1,ArrayBuffer(It,awesome,Spark,fun)),(2,ArrayBuffer(is)))
distinct([numTasks])
EliminateduplicatesfromRDD
fm.distinct().collect()
Array[String]=Array(is,It,awesome,Spark,fun)
常用的集合操作:
TransformationandPurpose
ExampleandResult
union()
newRDDcontainingallelementsfromsourceRDDandargument.
Scala>
valrdd1=sc.parallelize(List(‘A’,’B’))
valrdd2=sc.parallelize(List(‘B’,’C’))
rdd1.union(rdd2).collect()
Array[Char]=Array(A,B,B,C)
intersection()
newRDDcontainingonlycommonelementsfromsourceRDDandargument.
rdd1.intersection(rdd2).collect()
Array[Char]=Array(B)
cartesian()
newRDDcrossproductofallelementsfromsourceRDDandargument
rdd1.cartesian(rdd2).collect()
Array[(Char,Char)]=Array((A,B),(A,C),(B,B),(B,C))
subtract()
newRDDcreatedbyremovingdataelementsinsourceRDDincommonwithargument
rdd1.subtract(rdd2).collect()
Array[Char]=Array(A)
join(RDD,[numTasks])
Wheninvokedon(K,V)and(K,W),thisoperationcreatesanewRDDof(K,(V,W))
valpersonFruit=sc.parallelize(Seq((“Andy”,“Apple”),(“Bob”,“Banana”),(“Charlie”,“Cherry”),(“Andy”,”Apricot”)))
valpersonSE=sc.parallelize(Seq((“Andy”,“Google”),(“Bob”,“Bing”),(“Charlie”,“Yahoo”),(“Bob”,”AltaVista”)))
personFruit.join(personSE).collect()
Array[(String,(String,String))]=Array((Andy,(Apple,Google)),(Andy,(Apricot,Google)),(Charlie,(Cherry,Yahoo)),(Bob,(Banana,Bing)),(Bob,(Banana,AltaVista)))
cogroup(RDD,[numTasks])
personFruit.cogroup(personSe).collect()
Array[(String,(Iterable[String],Iterable[String]))]=Array((Andy,(ArrayBuffer(Apple,Apricot),ArrayBuffer(google))),(Charlie,(ArrayBuffer(Cherry),ArrayBuffer(Yahoo))),(Bob,(ArrayBuffer(Banana),ArrayBuffer(Bing,AltaVista))))
更多transformations信息,请查看http:
//spark.apache.org/docs/latest/programming-guide.html#transformations
常用的actions
Action&
count()
getthenumberofdataelementsintheRDD
valrdd=sc.parallelize(list(‘A’,’B’,’c’))scala>
rdd.count()
long=3
collect()
getallthedataelementsinanRDDasanarray
rdd.collect()
Array[char]=Array(A,B,c)
reduce(func)
AggregatethedataelementsinanRDDusingthisfunctionwhichtakestwoargumentsandreturnsone
valrdd=sc.parallelize(list(1,2,3,4))scala>
rdd.reduce(_+_)
Int=10
take(n)
fetchfirstndataelementsinanRDD.computedbydriverprogram.
rdd.take
(2)
Array[Int]=Array(1,2)
foreach(func)
executefunctionforeachdataelementinRDD.usuallyusedtoupdateanaccumulator(discussedlater)orinteractingwithexternalsystems.
valrd