DStream窗口操作
在Spark Streaming中,为DStream提供了窗口操作,即在DStream流上,将一个可配置的长度设置为窗口,以一个可配置的速率向前移动窗口。根据窗口操作,对窗口内的数据进行计算,每次落在窗口内的RDD数据会被聚合起来计算,生成的RDD会作为Window DStream的一个RDD,窗口操作如图1所示。
图1 DStream的窗口操作
在图1中,该窗口操作的滑动窗口长度为3个时间单位,这3个时间单位内的3个RDD会被聚合起来进行计算处理,然后过了2个时间单位,又会对最近3个时间单位内的数据执行滑动窗口进行计算。
下面,通过一张表来列举DStream API提供的与窗口操作相关的方法,具体如表1所示。
表1 DStream API提供的与窗口操作相关的方法
方法名称 | 相关说明 |
---|---|
window(windowLength,slideInterval) | 返回基于源DStream的窗口进行批计算后的一个新DStream |
countByWindow(windowLength,slideInterval) | 返回基于滑动窗口的DStream中的元素数 |
reduceByWindow(func,windowLength,slideInterval) | 基于滑动窗口的源DStream中的元素进行聚合操作,返回一个新DStream |
reduceByKeyAndWindow(func,windowLength,slideInterval,[ numTasks ]) | 基于滑动窗口对(K,V)类型的DStream中的值,按K应用聚合函数func进行聚合操作,返回一个新DStream |
reduceByKeyAndWindow(func,invFuncwindowLength,slideInterval,[ numTasks ]) | 更高效的reduceByKeyAndWindow()实现版本。每个窗口的聚合值,都是基于先前窗口的聚合值进行增量计算得到。该操作会对进入滑动窗口的新数据进行聚合操作,并对离开窗口的历史数据进行逆向聚合操作(即以InvFunc参数传入) |
countByValueAndWindow(windowLength,slideInterval,[numTasks ]) | 基于滑动窗口计算源DStream中每个RDD内每个元素出现的频次,返回一个由(K,V)组成的新的DStream,其中,K为RDD中的元素类型;V为元素在滑动窗口出现的次数 |
在表1中,列举了一些DStream API提供的与窗口操作相关的方法。下面,主要对window()和reduceByKeyAndWindow()这两个方法进行详细讲解。
- window()
基于源DStream的窗口进行批次计算后,返回一个新DStream。
接下来,通过一个具体的案例来演示如何使用window()方法输出3个时间单位长度的数据。在spark_chapter07项目的/src/main/scala/cn.itcast.dstream目录下创建一个名为“WindowTest”的Scala类,主要用于编写Spark Streaming应用程序,实现输出3个时间单位中的所有元素,具体代码如文件1所示。
文件1 WindowTest.scala
1 import org.apache.spark.{SparkConf, SparkContext}
2 import org.apache.spark.streaming.{Seconds, StreamingContext}
3 import org.apache.spark.streaming.dstream.{DStream,ReceiverInputDStream}
4 object WindowTest {
5 def main(args: Array[String]): Unit = {
6 //1.创建SparkConf对象
7 val sparkConf: SparkConf = new SparkConf()
8 .setAppName("WindowTest ").setMaster("local[2]")
9 //2.创建SparkContext对象,它是所有任务计算的源头
10 val sc: SparkContext = new SparkContext(sparkConf)
11 //3.设置日志级别
12 sc.setLogLevel("WARN")
13 //4.创建StreamingContext,需要两个参数,分别为SparkContext和批处理时间间隔
14 val ssc: StreamingContext = new StreamingContext(sc,Seconds(1))
15 //5.连接socket服务,需要socket服务地址、端口号及存储级别(默认的)
16 val dstream: ReceiverInputDStream[String] = ssc
17 .socketTextStream("192.168.121.134",9999)
18 //6.按空格进行切分每一行
19 val words: DStream[String] = dstream.flatMap(_.split(" "))
20 //7.调用window操作,需要两个参数,窗口长度和滑动时间间隔
21 val windowWords: DStream[String] = words.window(Seconds(3),Seconds(1))
22 //8.打印输出结果
23 windowWords.print()
24 //9.开启流式计算
25 ssc.start()
26 //10.让程序一直运行,除非人为干预停止
27 ssc.awaitTermination()
28 }
29 }
上述代码中,第7-8行代码创建SparkConf对象,用于配置Spark环境;第10行代码创建SparkContext对象,用于操作Spark集群;第12行代码设置日志输出级别;第14-17行代码创建StreamingContext对象,用于创建DStream对象,通过dstream对象连接socket服务,获取实时的流数据;第19行代码通过dstream对象的flatMap()方法将实时的流数据用空格进行切分;第21行代码调用window()方法,用于限制窗口的长度。
运行文件1中的代码,在hadoop01 9999端口每秒输入一个数字,具体内容如下:
[root@hadoop01 servers]# nc -lk 9999
1
2
3
4
5
打开IDEA工具,可以看到控制台输出窗口长度为3个时间单位中的所有元素,输出的内容如图2所示。
图2 window()方法的操作
从图2可以看出,窗口长度为3个时间单位以内的元素都可以输出,而到第4个时间单位的时候就看不到数字1,接着当第5个时间单位时,就看不到数字2,这说明此时1和2已经不在当前的窗口中。
- reduceByKeyAndWindow()
基于滑动窗口对(Key,Value)类型的DStream中的值,按Key应用聚合函数进行聚合操作,返回一个新DStream。
接下来,通过一个具体的案例,来演示如何使用reduceByKeyAndWindow()方法统计3个时间单位内不同字母出现的次数。在spark_chapter07项目的/src/main/scala/cn.itcast.dstream目录下创建一个名为“reduceByKeyAndWindowTest”的Scala类,用于编写Spark Streaming应用程序,具体代码如文件2所示。
文件2 ReduceByKeyAndWindowTest.scala
1 import org.apache.spark.{SparkConf, SparkContext}
2 import org.apache.spark.streaming.{Seconds, StreamingContext}
3 import org.apache.spark.streaming.dstream.{DStream,ReceiverInputDStream}
4 object ReduceByKeyAndWindowTest {
5 def main(args: Array[String]): Unit = {
6 //1.创建SparkConf对象
7 val sparkConf: SparkConf = new SparkConf()
8 .setAppName("ReduceByKeyAndWindowTest ").setMaster("local[2]")
9 //2.创建SparkContext对象,它是所有任务计算的源头
10 val sc: SparkContext = new SparkContext(sparkConf)
11 //3.设置日志级别
12 sc.setLogLevel("WARN")
13 //4.创建StreamingContext,需要两个参数,分别为SparkContext和批处理时间间隔
14 val ssc: StreamingContext = new StreamingContext(sc,Seconds(1))
30 //5.连接socket服务,需要socket服务地址、端口号及存储级别(默认的)
15 val dstream: ReceiverInputDStream[String] = ssc
31 .socketTextStream("192.168.121.134",9999)
32 //6.按空格进行切分每一行, 并将切分的单词出现次数记录为1
33 val wordAndOne: DStream[(String, Int)] = dstream.flatMap(_.split(" "))
34 .map(word =>(word,1))
35 //7.调用reduceByKeyAndWindow操作
36 val windowWords: DStream[(String, Int)] = wordAndOne
37 .reduceByKeyAndWindow((a:Int, b:Int)=>(a+b),Seconds(3),Seconds(1))
16 //8.打印输出结果
17 windowWords.print()
18 //9.开启流式计算
19 ssc.start()
20 //10.让程序一直运行,除非人为干预停止
21 ssc.awaitTermination()
22 }
23 }
在上述代码中,调用reduceByKeyAndWindow()方法需要三个参数,分别是函数、窗口长度及时间间隔。其中,窗口长度和时间间隔必须是批处理时间间隔的整数倍。
运行文件2中的代码,在hadoop01 9999端口每秒输入一个字母,具体内容如下:
[root@hadoop01 servers]# nc -lk 9999
a
a
b
b
c
打开IDEA工具,可以看到控制台输出窗口长度为3个时间单位内不同字母出现的次数,输出内容如图3所示。
图3 reduceByKeyAndWindow()方法的操作
从图3可以看出,当时间为4s(即1550799799000 ms)时,最前面的字母a已经不在当前的窗口中,因此字母a的次数为1;当时间为5s(即1550799800000 ms)时,第二个字母a也不在当前窗口中,因此就没有输出字母a出现的次数。