你还在这样子Spark里面调MySQL Client吗?
Spark和MySQL老常用了
作为一名数据搬运功,项目上用Spark来处理的案例越来越多,Spark丰富的数据源对接,处理结果入库,作为轻量级目标库,MySQL当之无愧。针对Spark写入MySQL,Spark统一把他作为jdbc写的方式实现,Spark 使用JDBC connection providers(简称 CPs),底层使用JDBC源发起的JDBC连接,MySQLDialect实现JdbcDialect,通过canHandle(url : String)方法适配MySQL的URL:jdbc:mysql….进行链接后的一系列操作。想了解更多的举个爪子。
以上都不是我本篇章想重点说明的,重点在于,对于Spark jdbc write 提供的写方式有四种SaveMode.Append、SaveMode.Overwrite、SaveMode.ErrorIfExists、SaveMode.Ignore,分别是追加(需提前建表)、重写(删了表给你自动重建)、写入异常则终止(基于jdbc insert抛出异常)、写就完了(忽略错误,不管有无插入),这四种方式真的是你想要的,嗯,不想要的。更加灵活的SQL插入/更新,甚至删除才是我们想要的,如 INSERT INTO .. VALUES (?,?,?) ON DUPLICATE KEY UPDATE ..
// spark jdbc 写操作
dataframe.write.mode(SaveMode.Append).jdbc(url,table, connectionProperties)
灵活操作SQL语句
就是调用jdbc的老代码咯,初始化链接、创建预处理语句描述、占位适配符。然后老玩家了,上来就要按spark分区处理,按批次处理,关闭MySQL自动提交提高效率。嗯嗯,对,很棒。代码大概如下所示:
// 测试数据集
val rddz = spark.sparkContext.parallelize(Seq(Row(1,"a3","1"),Row(2,"bzzz","1"),Row(3,"a43","2"),Row(4,"a","634")))
// 常见的操作MySQL,考虑了分区操作,MySQL批处理,关闭自动提高效率,关闭链接(可补充try...catch..)
rddz.foreachPartition( partition => {
Class.forName("com.mysql.cj.jdbc.Driver").newInstance()
val conn = DriverManager.getConnection(url, username, password)
val ps: PreparedStatement = conn.prepareStatement("INSERT INTO testa (id,c1,c2) VALUES (?,?,?) ON DUPLICATE KEY UPDATE id = values( id), c1 = values( c1), c2 = values( c2)")
conn.setAutoCommit(false)
partition.foreach(row => {
ps.setInt(1, row.getAs[Int]("id"))
ps.setString(2, row.getAs[String]("c1"))
ps.setString(3, row.getAs[String]("c2"))
ps.addBatch()
} )
ps.executeBatch()
conn.commit()
ps.close()
conn.close()
})
想法很棒,现实很吐血。PreparedStatement 占位符填充太多代码冗余了,数问号“?” 找索引值,眼睛受罪得很。你还在这样子curd吗?
推荐的干货来了
- SparkCommonTools
官方说明提供了MySQL upsert或者自定义语句操作,但是相比上面,好在哪里呢?例如,下面官方案例代码模块:
val rddz = spark.sparkContext.parallelize(Seq(Row(1,"a3","1"),Row(2,"bzzz","1"),Row(3,"a43","2"),Row(4,"a","634")))
val struct =
StructType(
StructField("id", IntegerType, true) ::
StructField("c1", StringType, false) ::
StructField("c2", StringType, false) :: Nil)
// generate Sql
val sql = MySQLGen.mkUpsertSql(table ,struct)
println(sql)
// INSERT INTO testa (id,c1,c2) VALUES (?,?,?) ON DUPLICATE KEY UPDATE id = values( id), c1 = values( c1), c2 = values( c2)
// mapping configuration
val reConfig = spark.sparkContext.getConf.setAll(Map(
"spark.mysql.url" -> outMyqlUrl,
"spark.mysql.password" -> outPassword,
"spark.mysql.user" -> outUsername,
"spark.mysql.sql" -> sql,
"spark.mysql.driver" -> outMysqlDriverClass
))
// Implicit execution on partition batch SQL statement
MySQLWriter.MySQLSparkExecute(rddz,reConfig,struct)
// Explicit execution on partition batch SQL statement
val rddData = spark.sparkContext.parallelize(Seq((1,"a3","1"),(2,"bzzz","1"),(3,"a43","2"),(4,"a","634")))
MySQLWriter.MySQLSparkExecute[(Int,String,String)](rddData,reConfig,(unit,ps) => {
ps.setInt(1,unit._1)
ps.setString(2,unit._2)
ps.setString(3,unit._3)
ps.addBatch()
})
一个方法完成了常用的INSERT INTO … VALUES (?,?,?) ON DUPLICATE KEY UPDATE 操作,真香哈。通过 “spark.mysql.sql” -> sql, 注入SQL预处理模板。
MySQLWriter.MySQLSparkExecute(rddz,reConfig,struct)
这个实现方式称为隐式操作,相对的,也有显性操作,支持多种RDD格式。但是还是需要数“?”配索引值。只是为了方便扩展调用把。
MySQLWriter.MySQLSparkExecute[(Int,String,String)](rddData,reConfig,(unit,ps) => {
ps.setInt(1,unit._1)
ps.setString(2,unit._2)
ps.setString(3,unit._3)
ps.addBatch()
})
备注Maven依赖
<dependency>
<groupId>com.haooho.spark</groupId>
<artifactId>SparkCommonTools</artifactId>
<version>1.1</version>
</dependency>