Spark Streaming编程实战文档格式.docx
《Spark Streaming编程实战文档格式.docx》由会员分享,可在线阅读,更多相关《Spark Streaming编程实战文档格式.docx(12页珍藏版)》请在冰点文库上搜索。
<
filename>
<
port>
millisecond>
"
)
18.System.exit
(1)
19.}
20.
21.//获取指定文件总的行数
22.valfilename=args(0)
23.vallines=Source.fromFile(filename).getLines.toList
24.valfilerow=lines.length
25.
26.//指定监听某端口,当外部程序请求时建立连接
27.vallistener=newServerSocket(args
(1).toInt)
28.while(true){
29.valsocket=listener.accept()
30.newThread(){
31.overridedefrun={
32.printIn("
Gotclientconnectedfrom:
"
+socket.getInetAddress)
33.valout=newPrintWriter(socket.getOutputStream(),true)
34.while(true){
35.Thread.sleep(args
(2).toLong)
36.//当该端口接受请求时,随机获取某行数据发送给对方
37.valcontent=lines(index(filerow))
38.printIn(content)
39.out.write(content+'
\n'
)out.flush()
40.}
41.socket.close()
42.}
43.}.start()
44.}
45.}
46.}
在IDEA开发环境打包配置界面中:
∙首先需要在ClassPath加入Jar包(/app/scala-2.10.4/lib/scala—swing.jar/app/scala—2.10.4/lib/scala—library.jar/app/scala—2.10.4/lib/scala—actors.jar)。
∙然后单击“Build”→“BuildArtifacts”,选择“Build”或者“Rebuild”动作。
∙最后使用以下命令复制打包文件到Spark根目录下。
cd/home/hadoop/IdeaProjects/out/artifacts/LearnSpark_jar
cpLearnSpark.jar/app/hadoop/spark-1.1.0/
实例1:
读取文件演示
在该实例中,SparkStreaming将监控某目录中的文件,获取在该间隔时间段内变化的数据,然后通过SparkStreaming计算出该时间段内的单词统计数。
程序代码如下。
1.importorg.apache.spark.SparkConf
2.importorg.apache.spark.streaming.{Seconds,StreamingContext}
3.importorg.apache.spark.streaming.StreamingContext._
5.objectFileWordCount{
6.defmain(args:
Array[String]){
7.valsparkConf=newSparkConf().setAppName("
FileWordCount"
).setMaster("
local[2]"
8.
9.//创建Streaming的上下文,包括Spark的配置和时间间隔,这里时间间隔为20秒valssc=newStreamingContext(sparkConf,Seconds(20))
10.//指定监控的目录,这里为/home/hadoop/temp/
11.vallines=ssc.textFileStream("
/home/hadoop/temp/"
12.//对指定文件夹中变化的数据进行单词统计并且打印
13.valwords=lines.flatMap(_.split("
))
14.valwordCounts=words.map(x=>
(x,1)).reduceByKey(_+_)
15.wordCounts.print()
16.//启动Streaming
17.ssc.start()
18.ssc.awaitTermination()
20.}
运行代码的步骤共有三步。
1)创建Streaming监控目录。
创建/home/hadoop/temp为SparkStreaming监控的目录,在该目录中定时添加文件,然后由SparkStreaming统计出新添加的文件中的单词个数。
2)使用以下命令启动Spark集群。
$cd/app/hadoop/spark-1.1.0
$sbin/start-all.sh
3)在IDEA中运行Streaming程序。
在IDEA中运行该实例,由于该实例没有输入参数故不需要配置参数,在运行日志中将定时打印时间戳。
如果在监控目录中加入文件,则输出时间戳的同时将输出该时间段内新添加的文件的单词统计个数。
实例2:
网络数据演示
在该实例中将由流数据模拟器以1秒的频度发送模拟数据,SparkStreaming通过Socket接收流数据并每20秒运行一次来处理接收到的数据,处理完毕后打印该时间段内数据出现的频度,即在各处理段时间内的状态之间并无关系。
1.importorg.apache.spark.{SparkContext,SparkConf}
2.importorg.apache.spark.streaming.{Milliseconds,Seconds,StreamingContext}
3.importorg.apache.spark.streaming.StreamingContext._
4.importorg.apache.spark.storage.StorageLevel
5.
6.objectNetworkWordCount{
7.defmain(args:
8.valconf=newSparkConf().setAppName("
NetworkWordCount"
9.valsc=newSparkContext(conf)
10.valssc=newStreamingContext(sc,Seconds(20))
11.
12.//通过Socket获取数据,需要提供Socket的主机名和端口号,数据保存在内存和硬盘中
13.vallines=ssc.socketTextStream(args(0),args
(1).toInt,StorageLevel.MEMORY_AND_DISK_SER)
14.
15.//对读入的数据进行分割、计数
16.valwords=lines.flatMap(_.split("
"
17.valwordCounts=words.map(x=>
(x,1)).reduceByKey(_+_)wordCounts.print()
18.ssc.start()
19.ssc.awaitTermination()
20.}
21.}
运行代码的步骤共有四步。
1)启动流数据模拟器。
启动流数据模拟器,模拟器Socket的端口号为9999,频度为1秒。
在该实例中将定时发送/home/hadoop/upload/class7目录下的people.txt数据文件,其中,people.txt数据的内容如下。
1Michael
2Andy
3Justin
4
启动流数据模拟器的命令如下。
$java-cpLearnSpark.jarclass7.StreamingSimulation\
/home/hadoop/upload/class7/people.txt99991000
在没有程序连接时,该程序处于阻塞状态。
2)在IDEA中运行Streaming程序。
在IDEA中运行该实例,需要配置连接Socket的主机名和端口号,在这里配置主机名为hadoop1,端口号为9999。
3)观察模拟器发送情况。
IDEA中的SparkStreaming程序与模拟器建立连接,当模拟器检测到外部连接时开始发送测拭数据,数据是随机在指定的文件中获取的一行数据,时间间隔为1秒。
图1是一个模拟器发送情况的截图。
图1
模拟器发送情况的截图
4)观察统计结果。
在IDEA的运行窗口中,可以观测到统计结果。
通过分析可知,SparkStreaming每段时间内的单词数为20,正好是20秒内每秒发送数量的总和。
---------------------------
Time:
14369195400000ms
(Andy,2)
(Michael,9)
(Justin,9)
实例3:
Stateful演示
该实例为SparkStreaming状态操作,由流数据模拟器以1秒的频度发送模拟数据,SparkStreaming通过Socket接收流数据并每5秒运行一次来处理接收到的数据,处理完毕后打印程序启动后单词出现的频度。
也就是说,每次输出的结果不仅仅是统计该时段内接收到的数据,还包括前面所有时段的数据。
相比较实例2,在该实例中,各时间段内的状态之间是相关的。
1.importorg.apache.log4j.{Level,Logger}
2.importorg.apache.spark.{SparkContext,SparkConf}
3.importorg.apache.spark.streaming.{Seconds,StreamingContext}
4.importorg.apache.spark.streaming.StreamingContext._
6.objectStatefulWordCount{
8.if(args.length!
=2){
9.System.err.printIn("
StatefulWordCount<
10.System.exit
(1)
11.}
12.Logger.getLogger("
org.apache.spark"
).setLevel(Level.ERROR)
13.Logger.getLogger("
org.eclipse.jetty.server"
).setLevel(Level.OFF)
14.//定义更新状态方法,参数values为当前批次单词频度,state为以往批次单词频度
15.valupdateFunc=(values:
Seq[Int],state:
Option[Int])=>
{
16.valcurrentCount=values.foldLeft(0)(_+_)
17.valpreviousCount=state.getOrElse(0)
18.Some(currentCount+previousCount)
20.valconf=newSparkConf().setAppName("
StatefulWordCount"
21.valsc=newSparkContext(conf)
22.//创建StreamingContext,SparkSteaming运行时间间隔为5秒
23.valssc=newStreamingContext(sc,Seconds(5))
24.//定义checkpoint目录为当前目录
25.ssc.checkpoint("
."
26.//获取从Socket发送过来的数据
27.vallines=ssc.socketTextStream(args(0),args
(1).toInt)
28.valwords=lines.flatMap(_.split("
))
29.valwordCounts=words.map(x=>
(x,1))
30.
31.//使用updateStateByKey来更新状态,统计单词总的次数
32.valstateDstream=wordCounts.updateStateByKey[Int](updateFunc)
33.stateDstream.print()
34.ssc.start()
35.ssc.awaitTermination()
36.}
37.}
启动数据流模拟器和在IDEA启动应用程序的方法与实例2相同。
在IDEA的运行窗口中查看运行情况,可以观察到第一次统计的单词总数为0,第二次为5,第N次为5(N-1),即统计的单词总数为程序运行单词数的总和。
----------------------
14369196110000ms
14369196150000ms
(Andy,2)
(Michael,1)
(Justin,2)
实例4:
窗口演示
该实例为SparkStreaming窗口操作,由流数据模拟器以1秒的频度发送模拟数据,SparkStreaming通过Socket接收流数据并每10秒运行一次来处理接收到的数据,处理完毕后打印程序启动后单词出现的频度。
相比前面的实例,SparkStreaming窗口统计是通过reduceByKeyAndWindow()方法实现的,在该方法中需要指定窗口时间长度和滑动时间间隔。
程序代码如下:
3.importorg.apache.spark.storage.StorageLevel
4.importorg.apache.spark.streaming._
5.importorg.apache.spark.streaming.StreamingContext._
7.objectStatefulWordCount{
8.defmain(args:
9.if(args.length!
=4){
10.System.err.printIn("
WindowDuration>
slideDuration>
11."
12.System.exit
(1)
13.}
14.Logger.getLogger("
15.Logger.getLogger("
16.
17.valconf=newSparkConf().setAppName("
WindowWordCount"
18.valsc=newSparkContext(conf)
19.//创建StreamingContext
20.valssc=newStreamingContext(sc,Seconds(5))
21.//定义checkpoint目录为当前目录
22.ssc.checkpoint("
23.//通过Socket获取数据,需提供Socket的主机名和端口号,数据保存在内存和硬盘中
24.vallines=ssc.socketTextStream(args(0),args
(1).toInt,StorageLevel.MEMORY_ONLY_SER)
25.valwords=lines.flatMap(_.split("
26.//Windows操作,第一种方式为叠加处理,第二种方式为增量处理
27.valwordCounts=words.map(x=>
(x,1)).reduceByKeyAndWindow(_+_,_-_,Seconds(args
(2).toInt),Seconds(srg3).toInt)
28.wordCounts.print()
29.ssc.start()
30.ssc.awaitTermination()
31.}
32.}
在IDEA的运行窗口中,可以观察到第一次统计的单词总数为4,第二次为14,第N次为10(N-l)+4,即统计的单词总数为程序运行单词数的总和。
14369196740000ms
(Andy,1)
(Michael,2)
(Justin,1)
14369196750000ms
(Andy,4)
(Michael,5)
(Justin,5)