学科分类
目录
Spark技术栈

编程方式定义Schema

当Case类不能提前定义的时候,就需要采用编程方式定义Schema信息,定义DataFrame主要包含3个步骤,具体如下:

(1)创建一个Row对象结构的RDD;

(2)基于StructType类型创建Schema;

(3)通过SparkSession提供的createDataFrame()方法来拼接Schema。

根据上述步骤,创建SparkSqlSchema.scala文件,使用编程方式定义Schema信息的具体代码如文件1所示。

文件1 SparkSqlSchema.scala

1  import org.apache.spark.SparkContext
 2  import org.apache.spark.SparkContext
 3  import org.apache.spark.rdd.RDD
 4  import org.apache.spark.sql.types.
 5              {IntegerType, StringType, StructField, StructType}
 6  import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 7  object SparkSqlSchema {
 8    def main(args: Array[String]): Unit = {
 9      //1.创建SparkSession
 10     val spark: SparkSession = SparkSession.builder()
 11         .appName("SparkSqlSchema")
 12         .master("local[2]")
 13         .getOrCreate()
 14     //2.获取sparkContext对象
 15     val sc: SparkContext = spark.sparkContext
 16     //设置日志打印级别
 17     sc.setLogLevel("WARN")
 18     //3.加载数据
 19     val dataRDD: RDD[String] = sc.textFile("D://spark//person.txt")
 20     //4.切分每一行
 21     val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
 22     //5.加载数据到Row对象中
 23     val personRDD: RDD[Row] =
 24            dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt))
 25     //6.创建Schema
 26     val schema:StructType= StructType(Seq(
 27       StructField("id", IntegerType, false),
 28       StructField("name", StringType, false),
 29       StructField("age", IntegerType, false)
 30     ))
 31     //7.利用personRDD与Schema创建DataFrame
 32     val personDF: DataFrame = spark.createDataFrame(personRDD,schema)
 33     //8.DSL操作显示DataFrame的数据结果
 34     personDF.show()
 35     //9.将DataFrame注册成表
 36     personDF.createOrReplaceTempView("t_person")
 37     //10.sql语句操作
 38     spark.sql("select * from t_person").show()
 39     //11.关闭资源
 40     sc.stop()
 41     spark.stop()
 42   }
 43 }

在文件1中,第10-24行代码表示将文件转换成为RDD的基本步骤,第26-30行代码即为编程方式定义Schema核心代码,Spark SQL提供了Class StructType(val fields: Array[StructField])类来表示模式信息,生成一个StructType对象,需要提供Fields作为输入参数,Fields是一个集合类型,StructField(name,dataType,nullable)参数分别表示为字段名称、字段数据类型、字段值是否允许为空值,根据person.txt文本数据文件分别设置“id”、“name”、“age”字段作为Schema,第32行代码表示通过调用spark.createDataFrame()方法将RDD和Schema进行合并转换为DataFrame,第34-41行代码即为操作DataFrame进行数据查询。

点击此处
隐藏目录