DataFrame的创建
在Spark2.0版本之前,Spark SQL中的SQLContext是创建DataFrame和执行SQL的入口,我们可以利用HiveContext接口,通过HiveQL语句操作Hive表数据,实现数据查询功能。而在Spark2.0之后,Spark使用全新的SparkSession接口替代SQLContext及HiveContext接口完成数据的加载、转换、处理等功能。
创建SparkSession对象可以通过“SparkSession.builder().getOrCreate()”方法获取,但当我们使用Spark-Shell编写程序时,Spark-Shell客户端会默认提供了一个名为“sc”的SparkContext对象和一个名为“spark”的SparkSession对象,因此我们可以直接使用这两个对象,不需要自行创建。启动Spark-Shell命令如下所示。
$ spark-shell --master local[2]
在启动Spark-Shell完成后,效果如图1所示。
图1 启动Spark-Shell
在图1中可以看出,SparkContext、SparkSession对象已创建完成。创建DataFrame有多种方式,最基本的方式是从一个已经存在的RDD调用toDF()方法进行转换得到DataFrame,或者通过Spark读取数据源直接创建。
在创建DataFrame之前,为了支持RDD转换成DataFrame及后续的SQL操作,需要导入spark.implicits._包启用隐式转换。若使用SparkSession方式创建DataFrame,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame,具体操作API如表1所示。
表1 spark.read操作
代码示例 | 描述 |
---|---|
spark.read.text("people.txt") | 读取txt格式的文本文件,创建DataFrame |
spark.read.csv ("people.csv") | 读取csv格式的文本文件,创建DataFrame |
spark.read.json("people.json") | 读取json格式的文本文件,创建DataFrame |
spark.read.parquet("people.parquet") | 读取parquet格式的文本文件,创建DataFrame |
1.数据准备
在HDFS文件系统中的/spark目录中有一个person.txt文件,内容如文件1所示。
文件1 person.txt
1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 jerry 40
2.通过文件直接创建DataFrame
我们通过Spark读取数据源的方式进行创建DataFrame,在Spark-Shell输入下列代码:
scala > val personDF = spark.read.text("/spark/person.txt")
personDF: org.apache.spark.sql.DataFrame = [value: String]
scala > personDF.printSchema()
root
|-- value: String (Nullable = true)
从上述返回结果personDF的属性可以看出,创建DataFrame对象完成,之后调用DataFrame的printSchema()方法可以打印当前对象的Schema元数据信息。从返回结果可以看出,当前value字段是String数据类型,并且还可以为Null。
使用DataFrame的show()方法可以查看当前DataFrame的结果数据,具体代码和返回结果如下所示。
scala > personDF.show()
+-------------+
| value |
+-------------+
|1 zhangsan 20|
|2 lisi 29|
|3 wangwu 25|
|4 zhaoliu 30|
|5 tianqi 35|
|6 jerry 40|
+-------------+
从上述返回结果看出,当前personDF对象中的6条记录就对应了person.txt文本文件中的数据。
3.RDD转换DataFrame
调用RDD的toDF()方法,可以将RDD转换为DataFrame对象,具体代码如下所示。
1 scala > val lineRDD = sc.textFile("/spark/person.txt").map(_.split(" "))
2 lineRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[6] at
3 map at <console>:24
4 scala > case class Person(id:Int,name:String,age:Int)
5 defined class Person
6 scala > val personRDD =
lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
7 personRDD: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[7] at map
8 at <console>:27
9 scala > val personDF = personRDD.toDF()
10 personDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more
11 field]
12 scala > personDF.show
13 +----+--------+----+
14 | id | name | age|
15 +----+--------+----+
16 | 1 |zhangsan | 20|
17 | 2 |lisi | 29|
18 | 3 |wangwu | 25|
19 | 4 |zhaoliu | 30|
20 | 5 |tianqi | 35|
21 | 6 |jerry | 40|
22 +----+--------+----+
23 scala > personDF.printSchema
24 root
25 |-- id: integer (nullable = false)
26 |-- name: string (nullable = true)
27 |-- age: integer (nullable = false)
在上述代码中,第1行代码将文本文件转换成RDD,第4行代码定义Person样例类,相当于定义表的Schema元数据信息,第6行代码表示使RDD中的数组数据与样例类进行关联,最终会将RDD[Array[String]]更改为RDD[Person],第9行代码表示调用RDD的toDF()方法,就可以把RDD转换成了DataFrame了。第12-27行代码表示调用DataFrame方法并从返回结果可以看出,RDD对象成功转换DataFrame。