学科分类
目录
Spark技术栈

DStream窗口操作

在Spark Streaming中,为DStream提供了窗口操作,即在DStream流上,将一个可配置的长度设置为窗口,以一个可配置的速率向前移动窗口。根据窗口操作,对窗口内的数据进行计算,每次落在窗口内的RDD数据会被聚合起来计算,生成的RDD会作为Window DStream的一个RDD,窗口操作如图1所示。

img

图1 DStream的窗口操作

在图1中,该窗口操作的滑动窗口长度为3个时间单位,这3个时间单位内的3个RDD会被聚合起来进行计算处理,然后过了2个时间单位,又会对最近3个时间单位内的数据执行滑动窗口进行计算。

下面,通过一张表来列举DStream API提供的与窗口操作相关的方法,具体如表1所示。

表1 DStream API提供的与窗口操作相关的方法

方法名称 相关说明
window(windowLength,slideInterval) 返回基于源DStream的窗口进行批计算后的一个新DStream
countByWindow(windowLength,slideInterval) 返回基于滑动窗口的DStream中的元素数
reduceByWindow(func,windowLength,slideInterval) 基于滑动窗口的源DStream中的元素进行聚合操作,返回一个新DStream
reduceByKeyAndWindow(func,windowLength,slideInterval,[ numTasks ]) 基于滑动窗口对(K,V)类型的DStream中的值,按K应用聚合函数func进行聚合操作,返回一个新DStream
reduceByKeyAndWindow(func,invFuncwindowLength,slideInterval,[ numTasks ]) 更高效的reduceByKeyAndWindow()实现版本。每个窗口的聚合值,都是基于先前窗口的聚合值进行增量计算得到。该操作会对进入滑动窗口的新数据进行聚合操作,并对离开窗口的历史数据进行逆向聚合操作(即以InvFunc参数传入)
countByValueAndWindow(windowLength,slideInterval,[numTasks ]) 基于滑动窗口计算源DStream中每个RDD内每个元素出现的频次,返回一个由(K,V)组成的新的DStream,其中,K为RDD中的元素类型;V为元素在滑动窗口出现的次数

在表1中,列举了一些DStream API提供的与窗口操作相关的方法。下面,主要对window()和reduceByKeyAndWindow()这两个方法进行详细讲解。

  • window()

基于源DStream的窗口进行批次计算后,返回一个新DStream。

接下来,通过一个具体的案例来演示如何使用window()方法输出3个时间单位长度的数据。在spark_chapter07项目的/src/main/scala/cn.itcast.dstream目录下创建一个名为“WindowTest”的Scala类,主要用于编写Spark Streaming应用程序,实现输出3个时间单位中的所有元素,具体代码如文件1所示。

​ 文件1 WindowTest.scala

 1  import org.apache.spark.{SparkConf, SparkContext}
 2  import org.apache.spark.streaming.{Seconds, StreamingContext}
 3  import org.apache.spark.streaming.dstream.{DStream,ReceiverInputDStream}
 4  object WindowTest {
 5    def main(args: Array[String]): Unit = {
 6      //1.创建SparkConf对象
 7      val sparkConf: SparkConf = new SparkConf()
 8               .setAppName("WindowTest ").setMaster("local[2]")
 9      //2.创建SparkContext对象,它是所有任务计算的源头
 10     val sc: SparkContext = new SparkContext(sparkConf)
 11     //3.设置日志级别
 12     sc.setLogLevel("WARN")
 13     //4.创建StreamingContext,需要两个参数,分别为SparkContext和批处理时间间隔
 14     val ssc: StreamingContext = new StreamingContext(sc,Seconds(1))
 15     //5.连接socket服务,需要socket服务地址、端口号及存储级别(默认的)
 16     val dstream: ReceiverInputDStream[String] = ssc
 17                 .socketTextStream("192.168.121.134",9999)
 18     //6.按空格进行切分每一行
 19     val words: DStream[String] = dstream.flatMap(_.split(" "))
 20     //7.调用window操作,需要两个参数,窗口长度和滑动时间间隔
 21     val windowWords: DStream[String] = words.window(Seconds(3),Seconds(1))
 22     //8.打印输出结果
 23     windowWords.print()
 24     //9.开启流式计算
 25     ssc.start()
 26     //10.让程序一直运行,除非人为干预停止
 27     ssc.awaitTermination()
 28   }
 29 }

上述代码中,第7-8行代码创建SparkConf对象,用于配置Spark环境;第10行代码创建SparkContext对象,用于操作Spark集群;第12行代码设置日志输出级别;第14-17行代码创建StreamingContext对象,用于创建DStream对象,通过dstream对象连接socket服务,获取实时的流数据;第19行代码通过dstream对象的flatMap()方法将实时的流数据用空格进行切分;第21行代码调用window()方法,用于限制窗口的长度。

运行文件1中的代码,在hadoop01 9999端口每秒输入一个数字,具体内容如下:

[root@hadoop01 servers]# nc -lk 9999
1
2
3
4
5

打开IDEA工具,可以看到控制台输出窗口长度为3个时间单位中的所有元素,输出的内容如图2所示。

img

图2 window()方法的操作

从图2可以看出,窗口长度为3个时间单位以内的元素都可以输出,而到第4个时间单位的时候就看不到数字1,接着当第5个时间单位时,就看不到数字2,这说明此时1和2已经不在当前的窗口中。

  • reduceByKeyAndWindow()

基于滑动窗口对(Key,Value)类型的DStream中的值,按Key应用聚合函数进行聚合操作,返回一个新DStream。

接下来,通过一个具体的案例,来演示如何使用reduceByKeyAndWindow()方法统计3个时间单位内不同字母出现的次数。在spark_chapter07项目的/src/main/scala/cn.itcast.dstream目录下创建一个名为“reduceByKeyAndWindowTest”的Scala类,用于编写Spark Streaming应用程序,具体代码如文件2所示。

​ 文件2 ReduceByKeyAndWindowTest.scala

 1  import org.apache.spark.{SparkConf, SparkContext}
 2  import org.apache.spark.streaming.{Seconds, StreamingContext}
 3  import org.apache.spark.streaming.dstream.{DStream,ReceiverInputDStream}
 4  object ReduceByKeyAndWindowTest {
 5    def main(args: Array[String]): Unit = {
 6      //1.创建SparkConf对象
 7      val sparkConf: SparkConf = new SparkConf()
 8       .setAppName("ReduceByKeyAndWindowTest ").setMaster("local[2]")
 9      //2.创建SparkContext对象,它是所有任务计算的源头
 10     val sc: SparkContext = new SparkContext(sparkConf)
 11     //3.设置日志级别
 12     sc.setLogLevel("WARN")
 13     //4.创建StreamingContext,需要两个参数,分别为SparkContext和批处理时间间隔
 14     val ssc: StreamingContext = new StreamingContext(sc,Seconds(1))
 30     //5.连接socket服务,需要socket服务地址、端口号及存储级别(默认的)
 15     val dstream: ReceiverInputDStream[String] = ssc
 31                 .socketTextStream("192.168.121.134",9999)
 32     //6.按空格进行切分每一行, 并将切分的单词出现次数记录为1
 33     val wordAndOne: DStream[(String, Int)] = dstream.flatMap(_.split(" "))
 34                           .map(word =>(word,1))
 35     //7.调用reduceByKeyAndWindow操作
 36     val windowWords: DStream[(String, Int)] = wordAndOne
 37       .reduceByKeyAndWindow((a:Int, b:Int)=>(a+b),Seconds(3),Seconds(1))
 16     //8.打印输出结果
 17     windowWords.print()
 18     //9.开启流式计算
 19     ssc.start()
 20     //10.让程序一直运行,除非人为干预停止
 21     ssc.awaitTermination()
 22   }
 23 }

在上述代码中,调用reduceByKeyAndWindow()方法需要三个参数,分别是函数、窗口长度及时间间隔。其中,窗口长度和时间间隔必须是批处理时间间隔的整数倍。

运行文件2中的代码,在hadoop01 9999端口每秒输入一个字母,具体内容如下:

[root@hadoop01 servers]# nc -lk 9999
a
a
b
b
c

打开IDEA工具,可以看到控制台输出窗口长度为3个时间单位内不同字母出现的次数,输出内容如图3所示。

img

图3 reduceByKeyAndWindow()方法的操作

从图3可以看出,当时间为4s(即1550799799000 ms)时,最前面的字母a已经不在当前的窗口中,因此字母a的次数为1;当时间为5s(即1550799800000 ms)时,第二个字母a也不在当前窗口中,因此就没有输出字母a出现的次数。

点击此处
隐藏目录