你还是这样子在Spark里面调MySQL Client吗?

你还在这样子Spark里面调MySQL Client吗?

Spark和MySQL老常用了

作为一名数据搬运功,项目上用Spark来处理的案例越来越多,Spark丰富的数据源对接,处理结果入库,作为轻量级目标库,MySQL当之无愧。针对Spark写入MySQL,Spark统一把他作为jdbc写的方式实现,Spark 使用JDBC connection providers(简称 CPs),底层使用JDBC源发起的JDBC连接,MySQLDialect实现JdbcDialect,通过canHandle(url : String)方法适配MySQL的URL:jdbc:mysql….进行链接后的一系列操作。想了解更多的举个爪子。

以上都不是我本篇章想重点说明的,重点在于,对于Spark jdbc write 提供的写方式有四种SaveMode.Append、SaveMode.Overwrite、SaveMode.ErrorIfExists、SaveMode.Ignore,分别是追加(需提前建表)、重写(删了表给你自动重建)、写入异常则终止(基于jdbc insert抛出异常)、写就完了(忽略错误,不管有无插入),这四种方式真的是你想要的,嗯,不想要的。更加灵活的SQL插入/更新,甚至删除才是我们想要的,如 INSERT INTO .. VALUES (?,?,?) ON DUPLICATE KEY UPDATE ..

// spark jdbc 写操作
dataframe.write.mode(SaveMode.Append).jdbc(url,table, connectionProperties)

灵活操作SQL语句

就是调用jdbc的老代码咯,初始化链接、创建预处理语句描述、占位适配符。然后老玩家了,上来就要按spark分区处理,按批次处理,关闭MySQL自动提交提高效率。嗯嗯,对,很棒。代码大概如下所示:

// 测试数据集
val rddz = spark.sparkContext.parallelize(Seq(Row(1,"a3","1"),Row(2,"bzzz","1"),Row(3,"a43","2"),Row(4,"a","634")))

// 常见的操作MySQL,考虑了分区操作,MySQL批处理,关闭自动提高效率,关闭链接(可补充try...catch..)
rddz.foreachPartition( partition => {
  Class.forName("com.mysql.cj.jdbc.Driver").newInstance()
  val conn = DriverManager.getConnection(url, username, password)
  val ps: PreparedStatement =  conn.prepareStatement("INSERT INTO testa (id,c1,c2) VALUES (?,?,?) ON DUPLICATE KEY UPDATE  id = values( id), c1 = values( c1), c2 = values( c2)")
  conn.setAutoCommit(false)
  partition.foreach(row => {
    ps.setInt(1, row.getAs[Int]("id"))
    ps.setString(2, row.getAs[String]("c1"))
    ps.setString(3, row.getAs[String]("c2"))

    ps.addBatch()
  } )
  ps.executeBatch()
  conn.commit()
  ps.close()
  conn.close()
})

想法很棒,现实很吐血。PreparedStatement 占位符填充太多代码冗余了,数问号“?” 找索引值,眼睛受罪得很。你还在这样子curd吗?

推荐的干货来了

  • SparkCommonTools

官方说明提供了MySQL upsert或者自定义语句操作,但是相比上面,好在哪里呢?例如,下面官方案例代码模块:


 val rddz = spark.sparkContext.parallelize(Seq(Row(1,"a3","1"),Row(2,"bzzz","1"),Row(3,"a43","2"),Row(4,"a","634")))
  val struct =
      StructType(
        StructField("id", IntegerType, true) ::
          StructField("c1", StringType, false) ::
          StructField("c2", StringType, false) :: Nil)

    // generate Sql
    val sql = MySQLGen.mkUpsertSql(table ,struct)
    println(sql)
    // INSERT INTO testa (id,c1,c2) VALUES (?,?,?) ON DUPLICATE KEY UPDATE  id = values( id), c1 = values( c1), c2 = values( c2)
    
    // mapping configuration
    val reConfig = spark.sparkContext.getConf.setAll(Map(
      "spark.mysql.url" -> outMyqlUrl,
      "spark.mysql.password" -> outPassword,
      "spark.mysql.user" -> outUsername,
      "spark.mysql.sql" -> sql,
      "spark.mysql.driver" -> outMysqlDriverClass
    ))

    // Implicit execution on partition batch SQL statement
    MySQLWriter.MySQLSparkExecute(rddz,reConfig,struct)

    // Explicit execution on partition batch SQL statement
    val rddData =  spark.sparkContext.parallelize(Seq((1,"a3","1"),(2,"bzzz","1"),(3,"a43","2"),(4,"a","634")))
    MySQLWriter.MySQLSparkExecute[(Int,String,String)](rddData,reConfig,(unit,ps) => {
          ps.setInt(1,unit._1)
          ps.setString(2,unit._2)
          ps.setString(3,unit._3)

          ps.addBatch()
      })

一个方法完成了常用的INSERT INTO … VALUES (?,?,?) ON DUPLICATE KEY UPDATE 操作,真香哈。通过 “spark.mysql.sql” -> sql, 注入SQL预处理模板。

MySQLWriter.MySQLSparkExecute(rddz,reConfig,struct)

这个实现方式称为隐式操作,相对的,也有显性操作,支持多种RDD格式。但是还是需要数“?”配索引值。只是为了方便扩展调用把。

 MySQLWriter.MySQLSparkExecute[(Int,String,String)](rddData,reConfig,(unit,ps) => {
          ps.setInt(1,unit._1)
          ps.setString(2,unit._2)
          ps.setString(3,unit._3)

          ps.addBatch()
      })

备注Maven依赖

<dependency>
    <groupId>com.haooho.spark</groupId>
    <artifactId>SparkCommonTools</artifactId>
    <version>1.1</version>
</dependency>
Share