学科分类
目录
Spark技术栈

DStream实例——实现网站热词排序

接下来,以实现网站热词排序为例,分析出用户对网站哪些词感兴趣或者不感兴趣,以此来增加用户感兴趣词的内容,减少不感兴趣词的内容,从而提升用户访问网站的流量。在SparkStreaming中是通过DStream编程实现热词排序,并将排名前三的热词输出到Mysql数据表中进行保存。具体实现步骤如下:

1. 创建数据库和表

在MySQL数据库中创建数据库和表,用于接收处理后的数据,具体语句如下:

mysql>create database spark;
mysql>use spark;
mysql>create table searchKeyWord(insert_time date, keyword varchar(30),
​    >search_count integer);

上述语句中,字段insert_time代表的是插入数据的日期;字段keyword代表的是热词; 字段search_count代表的是在指定的时间内该热词出现的次数。

2. 导入依赖

在pom.xml文件中,添加Mysql数据库的依赖,具体内容如下:

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version>
  </dependency>

3. 创建Scala类,实现热词排序

在spark_chapter07项目的/src/main/scala/cn.itcast.dstream文件夹下,创建一个名为“HotWordBySort”的Scala类,用于编写Spark Streaming应用程序,实现热词统计排序,具体实现代码如文件1所示。

​ 文件1 HotWordBySort.scala

 1  import java.sql.{DriverManager, Statement}
 2  import org.apache.spark.{SparkConf, SparkContext}
 3  import org.apache.spark.streaming.{Seconds, StreamingContext}
 4  import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
 5  object HotWordBySort {
 6    def main(args: Array[String]): Unit = {
 7      //1.创建SparkConf对象
 8      val sparkConf: SparkConf = new SparkConf()
 9             .setAppName("HotWordBySort").setMaster("local[2]")
 10     //2.创建SparkContext对象
 11     val sc: SparkContext = new SparkContext(sparkConf)
 12     //3.设置日志级别
 13     sc.setLogLevel("WARN")
 14     //4.创建StreamingContext,需要两个参数,分别为SparkContext和批处理时间间隔
 15     val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
 16     //5.连接socket服务,需要socket服务地址、端口号及存储级别(默认的)
 17     val dstream: ReceiverInputDStream[String] = ssc
 18                 .socketTextStream("192.168.121.134",9999)
 19     //6.通过逗号分隔第一个字段和第二个字段
 20     val itemPairs: DStream[(String, Int)] = dstream.map(line=>(line
 21                            .split(",")(0),1))
 22     //7.调用reduceByKeyAndWindow操作,需要三个参数
 23     val itemCount: DStream[(String, Int)] = itemPairs.reduceByKeyAndWindow
 24             ((v1:Int, v2:Int)=>v1+v2,Seconds(60),Seconds(10))
 25     //8.Dstream没有sortByKey操作,所以排序用transform实现,false降序,take(3)取前3 
 26     val hotWord=itemCount.transform(itemRDD=>{
 27       val top3: Array[(String, Int)] = itemRDD.map(pair=>(pair._2,pair._1))
 28          .sortByKey(false).map(pair=>(pair._2,pair._1)).take(3)
 29       //9.将本地的集合(排名前三热词组成的集合)转成RDD
 30       ssc.sparkContext.makeRDD(top3)
 31     })
 32     //10. 调用foreachRDD操作,将输出的数据保存到mysql数据库的表中
 33     hotWord.foreachRDD(rdd=>{
 34       val url="jdbc:mysql://192.168.121.134:3306/spark"
 35       val user="root"
 36       val password="123456"
 37       Class.forName("com.mysql.jdbc.Driver")
 38       val conn1=DriverManager.getConnection(url,user,password)
 39       conn1.prepareStatement("delete from searchKeyWord where 1=1")
 40                              .executeUpdate()
 41       conn1.close()
 42       rdd.foreachPartition(partitionOfRecords=>{
 43         val url="jdbc:mysql://192.168.121.134:3306/spark"
 44         val user="root"
 45         val password="123456"
 46         Class.forName("com.mysql.jdbc.Driver")
 47         val conn2=DriverManager.getConnection(url,user,password)
 48         conn2.setAutoCommit(false)
 49         val stat: Statement = conn2.createStatement()
 50         partitionOfRecords.foreach(record=>{
 51         stat.addBatch("insert into searchKeyWord
 52               (insert_time,keyword,search_count) values
 53               (now(),'"+record._1+"','"+record._2+"')")
 54         })
 55         stat.executeBatch()
 56         conn2.commit()
 57         conn2.close()
 58     })
 59   })
 60   ssc.start()
 61   ssc.awaitTermination()
 62   ssc.stop()
 63   }
 64 }

上述代码中,第8-9行代码置本地测试环境;第11行代码创建SparkConf对象,用于配置Spark环境;第12行代码创建SparkContext对象,用于操作Spark集群;第13行代码设置日志输出级别;第15-17行代码创建StreamingContext对象,用于创建DStream对象,通过dstream对象连接socket服务,获取实时的流数据;第21-22行代码调用map转换操作,通过逗号将第一个字段和第二个字段进行切分;第23-24行代码调用reduceByKeyAndWindow窗口操作,计算10s内每个单词出现的次数;第26-31行代码调用transform、map转换操作和sortByKey排序操作最终对单词出现的次数进行降序,调用take()操作将热词排名前三的集合转成RDD;第33-58行代码调用foreachRDD输出操作,将输出的数据保存到MySQL数据库的数据表searchKeyWord中。

运行文件1中的代码,并在hadoop01 9999端口输入数据,具体内容如下:

[root@hadoop01 servers]# nc -lk 9999
hadoop,111
spark,222
hadoop,222
hadoop,222
hive,222
hive,333

​ 在Mysql的窗口中,执行语句“select * from searchKeyWord”查看数据表searchKeyWord中的数据,具体内容如下:

mysql> select * from searchKeyWord;
+--------------+----------+-----------------+
| insert_time | keyword |   search_count  |
+--------------+----------+-----------------+
| 2018-12-04  |  hadoop  |     3    |
| 2018-12-04  |  hive   |     2    |
| 2018-12-04  |  spark  |        1     |
+--------------+----------+-----------------+

从上述内容可以看出,网站排名前三的热词已经输入到MySQL中的searchKeyWord表中。

点击此处
隐藏目录