学科分类
目录
Spark技术栈

Spark SQL操作MySQL

Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame进行一系列的操作后,还可以将数据重新写入到关系型数据中。关于Spark SQL对MySQL数据库的相关操作具体如下。

1. 读取MySQL数据库

通过SQLyog工具远程连接hadoop01节点的MySQL服务,并利用可视化操作界面创建名称为“spark”的数据库,并创建名称为“person”的数据表,以及向表中添加数据。

同样也可以在hadoop01节点上使用MySQL客户端创建数据库、数据表以及插入数据,具体命令如下。

#启动mysql客户端
$ mysql -u root –p  # 屏幕提示输入密码
#创建spark数据库
mysql > CREATE database spark;
#创建person数据表
mysql > CREATE TABLE person (id INT(4),NAME CHAR(20),age INT(4));
#插入数据
mysql > INSERT INTO person VALUE(1,'zhangsan',18);
mysql > INSERT INTO person VALUE(2,'lisi',20);
mysql > SELECT * FROM person;

数据库和数据表创建成功后,如果想通过Spark SQL API方式访问MySQL数据库,需要在pom.xml配置文件中添加MySQL驱动连接包,依赖参数如下。

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.38</version>
</dependency>

当所需依赖添加完毕后,就可以编写代码读取MySQL数据库中的数据,具体代码如文件1所示。

文件1 DataFromMysql.scala

 1  import java.util.Properties
 2  import org.apache.spark.sql.{DataFrame, SparkSession}
 3  object DataFromMysql {
 4    def main(args: Array[String]): Unit = {
 5      //1、创建sparkSession对象
 6      val spark: SparkSession = SparkSession.builder()
 7          .appName("DataFromMysql")
 8          .master("local[2]")
 9          .getOrCreate()
 10     //2、创建Properties对象,设置连接mysql的用户名和密码
 11     val properties: Properties =new Properties()
 12     properties.setProperty("user","root")
 13     properties.setProperty("password","123456")
 14     //3、读取mysql中的数据
 15     **val mysqlDF : DataFrame = spark.read.jdbc**
 16      **("jdbc:mysql://192.168.121.134:3306/spark","person",properties)**
 17     //4、显示mysql中表的数据
 18     mysqlDF.show()
 19     spark.stop()
 20   }
 21 }

文件1中,第15-16行代码spark.read.jdbc()方法可以实现读取MySQL数据库中的数据,它需要url、table、properties三个参数,分别表示JDBC的url、数据表名、数据库的用户名和密码。

运行文件1中的代码,控制台输出内容如图1所示。

img

图1 Spark SQL查询MySQL数据

2. 向MySQL数据库写入数据

​ Spark SQL不仅能够查询MySQL数据库中的数据,还可以向表中插入新的数据,实现方式的具体代码如文件2所示。

文件2 SparkSqlToMysql.scala

 1  import java.util.Properties
 2  import org.apache.spark.rdd.RDD
 3  import org.apache.spark.sql.{DataFrame, SparkSession}
 4  //创建样例类Person
 5  case class Person(id:Int,name:String,age:Int)
 6  object SparkSqlToMysql {
 7    def main(args: Array[String]): Unit = {
 8      //1.创建sparkSession对象
 9      val spark: SparkSession = SparkSession.builder()
 10         .appName("SparkSqlToMysql")
 11         .master("local[2]")
 12         .getOrCreate()
 13     //2.创建数据
 14     val data = spark.sparkContext
 15              .parallelize(Array("3,wangwu,22","4,zhaoliu,26"))
 16     //3.按MySQL列名切分数据
 17     val arrRDD: RDD[Array[String]] = data.map(_.split(","))
 18     //4.RDD关联Person样例类
 19     val personRDD: RDD[Person] = 
 20               arrRDD.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
 21     //导入隐式转换
 22     import spark.implicits._
 23     //5.将RDD转换成DataFrame
 24     val personDF: DataFrame = personRDD.toDF()
 25     //6.设置JDBC配置参数
 26     val prop =new Properties()
 27     prop.setProperty("user","root")
 28     prop.setProperty("password","123456")
 29     prop.setProperty("driver","com.mysql.jdbc.Driver")
 30     //7.写入数据
 31     personDF.write.mode("append").jdbc(
 32     "jdbc:mysql://192.168.121.134:3306/spark","spark.person",prop)
 33     personDF.show()
 34   }
 35 }

在文件1代码中,第5行代码首先创建case class Person样例类,第9-12行代码用来创建SparkSession对象,第14-15行代码则通过spark.SparkContext.parallelize()方法创建一个RDD,该RDD值表示两个person数据,第17-24行代码表示将数据按照逗号切分并匹配case class Person中的字段用于转换成DataFrame对象,第26-29行代码表示设置JDBC配置参数,访问MySQL数据库,第31行代码personDF.write.mode()方法表示设置写入数据方式,该参数SaveMode是一个枚举类型,枚举参数分别有“Append”、“Overwrite”、“ErrorIfExists”、“Ignore”四个值,分别表示为追加、覆盖、表如果存在即报错(该值为默认值)、忽略新保存的数据。

运行文件1中的代码,返回SQLyog工具查看当前数据表,数据表内容如图2所示。

img

图2 person表数据

从图2中可以看出,新数据成功被写入到person数据表。

点击此处
隐藏目录