学科分类
目录
Spark技术栈

反射机制推断Schema

在Windows系统下开发Scala代码,可以使用本地环境测试,因此我们首先需要在本地磁盘准备文本数据文件,这里将HDFS中的/spark/person.txt文件下载到本地D:/spark/person.txt路径下。从文件4-1可以看出,当前数据文件共3列,我们可以非常容易的分析出,这三列分别是编号、姓名、年龄。但是计算机无法像人一样直观的感受字段的实际含义,因此我们需要通过反射机制来推断包含特定类型对象的Schema信息。

接下来我们打开IDEA开发工具,创建名为“spark_chapter04”的Maven工程,讲解实现反射机制推断Schema的开发流程。

1.添加Spark SQL依赖

在pom.xml文件中添加Spark SQL依赖,代码片段如下所示。

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>

2.编写代码

实现反射机制推断Schema需要定义一个case class样例类,定义字段和属性,样例类的参数名称会被利用反射机制作为列名,编写代码如文件1所示。

文件1 CaseClassSchema.scala

 1  import org.apache.spark.SparkContext
 2  import org.apache.spark.rdd.RDD
 3  import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 4  //定义样例类
 5  case class Person(id:Int,name:String,age:Int)
 6  object CaseClassSchema {
 7    def main(args: Array[String]): Unit = {
 8      //1.构建SparkSession
 9      val spark : SparkSession = SparkSession.builder()
 10                      .appName("CaseClassSchema ")
 11                      .master("local[2]")
 12                      .getOrCreate()
 13     //2.获取SparkContext
 14     val sc : SparkContext =spark.sparkContext
 15     //设置日志打印级别
 16     sc.setLogLevel("WARN")
 17     //3.读取文件
 18     val data: RDD[Array[String]] =
 19         sc.textFile("D://spark//person.txt").map(x=>x.split(" "))
 20     //4.将RDD与样例类关联
 21     val personRdd: RDD[Person] = 
 22                data.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
 23     //5.获取DataFrame
 24     //手动导入隐式转换
 25     import spark.implicits._
 26     val personDF: DataFrame = personRdd.toDF
 27     //------------DSL风格操作开始-------------
 28     //1.显示DataFrame的数据,默认显示20行
 29     personDF.show()
 30     //2.显示DataFrame的schema信息
 31     personDF.printSchema()
 32     //3.统计DataFrame中年龄大于30的人数
 33     println(personDF.filter($"age">30).count())
 34     //-----------DSL风格操作结束-------------
 35     //-----------SQL风格操作开始-------------
 36     //将DataFrame注册成表
 37     personDF.createOrReplaceTempView("t_person")
 38     spark.sql("select * from t_person").show()
 39     spark.sql("select * from t_person where name='zhangsan'").show()
 40     //-----------SQL风格操作结束-------------
 41     //关闭资源操作
 42     sc.stop()
 43     spark.stop()
 44   }
 45 }

在文件1中,第5行代码表示定义了一个Person的Case类,这是因为在利用反射机制推断RDD模式时,首先需要定义一个Case类,因为Spark SQL能够自动将包含Case类的RDD隐式转换成DataFrame,Case类定义了Table的结构,Case类的属性通过反射机制变成表的列名。第9-14行代码中通过SparkSession.builder()方法构建名为“spark”的SparkSession对象,并通过spark对象获取SparkContext。第18-26行代码中,通过sc对象读取文件,系统会将文件加载到内存中生成一个RDD,将RDD 与case class Person进行匹配,personRdd对象即为RDD【Person】,toDF()方法是将RDD转换为DataFrame,在调用toDF()方法之前需要手动添加“spark.implicits._”包。第29-39行代码表示当前创建DataFrame对象后,使用DSL和SQL两种语法操作风格进行数据查询。DataFrame操作和之前在Spark-Shell操作示例大致相同,因此这里将不再展示执行效果。

点击此处
隐藏目录