DStream实例——实现网站热词排序
接下来,以实现网站热词排序为例,分析出用户对网站哪些词感兴趣或者不感兴趣,以此来增加用户感兴趣词的内容,减少不感兴趣词的内容,从而提升用户访问网站的流量。在SparkStreaming中是通过DStream编程实现热词排序,并将排名前三的热词输出到Mysql数据表中进行保存。具体实现步骤如下:
1. 创建数据库和表
在MySQL数据库中创建数据库和表,用于接收处理后的数据,具体语句如下:
mysql>create database spark;
mysql>use spark;
mysql>create table searchKeyWord(insert_time date, keyword varchar(30),
>search_count integer);
上述语句中,字段insert_time代表的是插入数据的日期;字段keyword代表的是热词; 字段search_count代表的是在指定的时间内该热词出现的次数。
2. 导入依赖
在pom.xml文件中,添加Mysql数据库的依赖,具体内容如下:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
3. 创建Scala类,实现热词排序
在spark_chapter07项目的/src/main/scala/cn.itcast.dstream文件夹下,创建一个名为“HotWordBySort”的Scala类,用于编写Spark Streaming应用程序,实现热词统计排序,具体实现代码如文件1所示。
文件1 HotWordBySort.scala
1 import java.sql.{DriverManager, Statement}
2 import org.apache.spark.{SparkConf, SparkContext}
3 import org.apache.spark.streaming.{Seconds, StreamingContext}
4 import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
5 object HotWordBySort {
6 def main(args: Array[String]): Unit = {
7 //1.创建SparkConf对象
8 val sparkConf: SparkConf = new SparkConf()
9 .setAppName("HotWordBySort").setMaster("local[2]")
10 //2.创建SparkContext对象
11 val sc: SparkContext = new SparkContext(sparkConf)
12 //3.设置日志级别
13 sc.setLogLevel("WARN")
14 //4.创建StreamingContext,需要两个参数,分别为SparkContext和批处理时间间隔
15 val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
16 //5.连接socket服务,需要socket服务地址、端口号及存储级别(默认的)
17 val dstream: ReceiverInputDStream[String] = ssc
18 .socketTextStream("192.168.121.134",9999)
19 //6.通过逗号分隔第一个字段和第二个字段
20 val itemPairs: DStream[(String, Int)] = dstream.map(line=>(line
21 .split(",")(0),1))
22 //7.调用reduceByKeyAndWindow操作,需要三个参数
23 val itemCount: DStream[(String, Int)] = itemPairs.reduceByKeyAndWindow
24 ((v1:Int, v2:Int)=>v1+v2,Seconds(60),Seconds(10))
25 //8.Dstream没有sortByKey操作,所以排序用transform实现,false降序,take(3)取前3
26 val hotWord=itemCount.transform(itemRDD=>{
27 val top3: Array[(String, Int)] = itemRDD.map(pair=>(pair._2,pair._1))
28 .sortByKey(false).map(pair=>(pair._2,pair._1)).take(3)
29 //9.将本地的集合(排名前三热词组成的集合)转成RDD
30 ssc.sparkContext.makeRDD(top3)
31 })
32 //10. 调用foreachRDD操作,将输出的数据保存到mysql数据库的表中
33 hotWord.foreachRDD(rdd=>{
34 val url="jdbc:mysql://192.168.121.134:3306/spark"
35 val user="root"
36 val password="123456"
37 Class.forName("com.mysql.jdbc.Driver")
38 val conn1=DriverManager.getConnection(url,user,password)
39 conn1.prepareStatement("delete from searchKeyWord where 1=1")
40 .executeUpdate()
41 conn1.close()
42 rdd.foreachPartition(partitionOfRecords=>{
43 val url="jdbc:mysql://192.168.121.134:3306/spark"
44 val user="root"
45 val password="123456"
46 Class.forName("com.mysql.jdbc.Driver")
47 val conn2=DriverManager.getConnection(url,user,password)
48 conn2.setAutoCommit(false)
49 val stat: Statement = conn2.createStatement()
50 partitionOfRecords.foreach(record=>{
51 stat.addBatch("insert into searchKeyWord
52 (insert_time,keyword,search_count) values
53 (now(),'"+record._1+"','"+record._2+"')")
54 })
55 stat.executeBatch()
56 conn2.commit()
57 conn2.close()
58 })
59 })
60 ssc.start()
61 ssc.awaitTermination()
62 ssc.stop()
63 }
64 }
上述代码中,第8-9行代码置本地测试环境;第11行代码创建SparkConf对象,用于配置Spark环境;第12行代码创建SparkContext对象,用于操作Spark集群;第13行代码设置日志输出级别;第15-17行代码创建StreamingContext对象,用于创建DStream对象,通过dstream对象连接socket服务,获取实时的流数据;第21-22行代码调用map转换操作,通过逗号将第一个字段和第二个字段进行切分;第23-24行代码调用reduceByKeyAndWindow窗口操作,计算10s内每个单词出现的次数;第26-31行代码调用transform、map转换操作和sortByKey排序操作最终对单词出现的次数进行降序,调用take()操作将热词排名前三的集合转成RDD;第33-58行代码调用foreachRDD输出操作,将输出的数据保存到MySQL数据库的数据表searchKeyWord中。
运行文件1中的代码,并在hadoop01 9999端口输入数据,具体内容如下:
[root@hadoop01 servers]# nc -lk 9999
hadoop,111
spark,222
hadoop,222
hadoop,222
hive,222
hive,333
在Mysql的窗口中,执行语句“select * from searchKeyWord”查看数据表searchKeyWord中的数据,具体内容如下:
mysql> select * from searchKeyWord;
+--------------+----------+-----------------+
| insert_time | keyword | search_count |
+--------------+----------+-----------------+
| 2018-12-04 | hadoop | 3 |
| 2018-12-04 | hive | 2 |
| 2018-12-04 | spark | 1 |
+--------------+----------+-----------------+
从上述内容可以看出,网站排名前三的热词已经输入到MySQL中的searchKeyWord表中。