Spark SQL操作MySQL
Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame进行一系列的操作后,还可以将数据重新写入到关系型数据中。关于Spark SQL对MySQL数据库的相关操作具体如下。
1. 读取MySQL数据库
通过SQLyog工具远程连接hadoop01节点的MySQL服务,并利用可视化操作界面创建名称为“spark”的数据库,并创建名称为“person”的数据表,以及向表中添加数据。
同样也可以在hadoop01节点上使用MySQL客户端创建数据库、数据表以及插入数据,具体命令如下。
#启动mysql客户端
$ mysql -u root –p # 屏幕提示输入密码
#创建spark数据库
mysql > CREATE database spark;
#创建person数据表
mysql > CREATE TABLE person (id INT(4),NAME CHAR(20),age INT(4));
#插入数据
mysql > INSERT INTO person VALUE(1,'zhangsan',18);
mysql > INSERT INTO person VALUE(2,'lisi',20);
mysql > SELECT * FROM person;
数据库和数据表创建成功后,如果想通过Spark SQL API方式访问MySQL数据库,需要在pom.xml配置文件中添加MySQL驱动连接包,依赖参数如下。
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
当所需依赖添加完毕后,就可以编写代码读取MySQL数据库中的数据,具体代码如文件1所示。
文件1 DataFromMysql.scala
1 import java.util.Properties
2 import org.apache.spark.sql.{DataFrame, SparkSession}
3 object DataFromMysql {
4 def main(args: Array[String]): Unit = {
5 //1、创建sparkSession对象
6 val spark: SparkSession = SparkSession.builder()
7 .appName("DataFromMysql")
8 .master("local[2]")
9 .getOrCreate()
10 //2、创建Properties对象,设置连接mysql的用户名和密码
11 val properties: Properties =new Properties()
12 properties.setProperty("user","root")
13 properties.setProperty("password","123456")
14 //3、读取mysql中的数据
15 **val mysqlDF : DataFrame = spark.read.jdbc**
16 **("jdbc:mysql://192.168.121.134:3306/spark","person",properties)**
17 //4、显示mysql中表的数据
18 mysqlDF.show()
19 spark.stop()
20 }
21 }
文件1中,第15-16行代码spark.read.jdbc()方法可以实现读取MySQL数据库中的数据,它需要url、table、properties三个参数,分别表示JDBC的url、数据表名、数据库的用户名和密码。
运行文件1中的代码,控制台输出内容如图1所示。
图1 Spark SQL查询MySQL数据
2. 向MySQL数据库写入数据
Spark SQL不仅能够查询MySQL数据库中的数据,还可以向表中插入新的数据,实现方式的具体代码如文件2所示。
文件2 SparkSqlToMysql.scala
1 import java.util.Properties
2 import org.apache.spark.rdd.RDD
3 import org.apache.spark.sql.{DataFrame, SparkSession}
4 //创建样例类Person
5 case class Person(id:Int,name:String,age:Int)
6 object SparkSqlToMysql {
7 def main(args: Array[String]): Unit = {
8 //1.创建sparkSession对象
9 val spark: SparkSession = SparkSession.builder()
10 .appName("SparkSqlToMysql")
11 .master("local[2]")
12 .getOrCreate()
13 //2.创建数据
14 val data = spark.sparkContext
15 .parallelize(Array("3,wangwu,22","4,zhaoliu,26"))
16 //3.按MySQL列名切分数据
17 val arrRDD: RDD[Array[String]] = data.map(_.split(","))
18 //4.RDD关联Person样例类
19 val personRDD: RDD[Person] =
20 arrRDD.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
21 //导入隐式转换
22 import spark.implicits._
23 //5.将RDD转换成DataFrame
24 val personDF: DataFrame = personRDD.toDF()
25 //6.设置JDBC配置参数
26 val prop =new Properties()
27 prop.setProperty("user","root")
28 prop.setProperty("password","123456")
29 prop.setProperty("driver","com.mysql.jdbc.Driver")
30 //7.写入数据
31 personDF.write.mode("append").jdbc(
32 "jdbc:mysql://192.168.121.134:3306/spark","spark.person",prop)
33 personDF.show()
34 }
35 }
在文件1代码中,第5行代码首先创建case class Person样例类,第9-12行代码用来创建SparkSession对象,第14-15行代码则通过spark.SparkContext.parallelize()方法创建一个RDD,该RDD值表示两个person数据,第17-24行代码表示将数据按照逗号切分并匹配case class Person中的字段用于转换成DataFrame对象,第26-29行代码表示设置JDBC配置参数,访问MySQL数据库,第31行代码personDF.write.mode()方法表示设置写入数据方式,该参数SaveMode是一个枚举类型,枚举参数分别有“Append”、“Overwrite”、“ErrorIfExists”、“Ignore”四个值,分别表示为追加、覆盖、表如果存在即报错(该值为默认值)、忽略新保存的数据。
运行文件1中的代码,返回SQLyog工具查看当前数据表,数据表内容如图2所示。
图2 person表数据
从图2中可以看出,新数据成功被写入到person数据表。