编程方式定义Schema
当Case类不能提前定义的时候,就需要采用编程方式定义Schema信息,定义DataFrame主要包含3个步骤,具体如下:
(1)创建一个Row对象结构的RDD;
(2)基于StructType类型创建Schema;
(3)通过SparkSession提供的createDataFrame()方法来拼接Schema。
根据上述步骤,创建SparkSqlSchema.scala文件,使用编程方式定义Schema信息的具体代码如文件1所示。
文件1 SparkSqlSchema.scala
1 import org.apache.spark.SparkContext
2 import org.apache.spark.SparkContext
3 import org.apache.spark.rdd.RDD
4 import org.apache.spark.sql.types.
5 {IntegerType, StringType, StructField, StructType}
6 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
7 object SparkSqlSchema {
8 def main(args: Array[String]): Unit = {
9 //1.创建SparkSession
10 val spark: SparkSession = SparkSession.builder()
11 .appName("SparkSqlSchema")
12 .master("local[2]")
13 .getOrCreate()
14 //2.获取sparkContext对象
15 val sc: SparkContext = spark.sparkContext
16 //设置日志打印级别
17 sc.setLogLevel("WARN")
18 //3.加载数据
19 val dataRDD: RDD[String] = sc.textFile("D://spark//person.txt")
20 //4.切分每一行
21 val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
22 //5.加载数据到Row对象中
23 val personRDD: RDD[Row] =
24 dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt))
25 //6.创建Schema
26 val schema:StructType= StructType(Seq(
27 StructField("id", IntegerType, false),
28 StructField("name", StringType, false),
29 StructField("age", IntegerType, false)
30 ))
31 //7.利用personRDD与Schema创建DataFrame
32 val personDF: DataFrame = spark.createDataFrame(personRDD,schema)
33 //8.DSL操作显示DataFrame的数据结果
34 personDF.show()
35 //9.将DataFrame注册成表
36 personDF.createOrReplaceTempView("t_person")
37 //10.sql语句操作
38 spark.sql("select * from t_person").show()
39 //11.关闭资源
40 sc.stop()
41 spark.stop()
42 }
43 }
在文件1中,第10-24行代码表示将文件转换成为RDD的基本步骤,第26-30行代码即为编程方式定义Schema核心代码,Spark SQL提供了Class StructType(val fields: Array[StructField])类来表示模式信息,生成一个StructType对象,需要提供Fields作为输入参数,Fields是一个集合类型,StructField(name,dataType,nullable)参数分别表示为字段名称、字段数据类型、字段值是否允许为空值,根据person.txt文本数据文件分别设置“id”、“name”、“age”字段作为Schema,第32行代码表示通过调用spark.createDataFrame()方法将RDD和Schema进行合并转换为DataFrame,第34-41行代码即为操作DataFrame进行数据查询。