我试图插入和更新MySql上使用SparkSQLDataFrames和JDBC连接的一些数据。
我已经成功地使用SaveMode插入新数据。追加。有没有办法从SparkSQL更新MySql Table中已经存在的数据?
我要插入的代码是:
myDataFrame。写模式(SaveMode.Append)。jdbc(JDBCurl、mySqlTable、connectionProperties)
如果我切换到保存模式。覆盖它会删除完整的表并创建一个新表,我正在寻找MySql中提供的类似“ON DUPLICATE KEY UPDATE”的内容
覆盖org.apache.spark.sql.execution.datasources.jdbc
JdbcUtils.scala
插入到
以替换到
import java.sql.{Connection, Driver, DriverManager, PreparedStatement, ResultSet, SQLException}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import com.typesafe.scalalogging.Logger
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.jdbc.{DriverRegistry, DriverWrapper, JDBCOptions}
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}
/**
* Util functions for JDBC tables.
*/
object UpdateJdbcUtils {
val logger = Logger(this.getClass)
/**
* Returns a factory for creating connections to the given JDBC URL.
*
* @param options - JDBC options that contains url, table and other information.
*/
def createConnectionFactory(options: JDBCOptions): () => Connection = {
val driverClass: String = options.driverClass
() => {
DriverRegistry.register(driverClass)
val driver: Driver = DriverManager.getDrivers.asScala.collectFirst {
case d: DriverWrapper if d.wrapped.getClass.getCanonicalName == driverClass => d
case d if d.getClass.getCanonicalName == driverClass => d
}.getOrElse {
throw new IllegalStateException(
s"Did not find registered driver with class $driverClass")
}
driver.connect(options.url, options.asConnectionProperties)
}
}
/**
* Returns a PreparedStatement that inserts a row into table via conn.
*/
def insertStatement(conn: Connection, table: String, rddSchema: StructType, dialect: JdbcDialect)
: PreparedStatement = {
val columns = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
val sql = s"REPLACE INTO $table ($columns) VALUES ($placeholders)"
conn.prepareStatement(sql)
}
/**
* Retrieve standard jdbc types.
*
* @param dt The datatype (e.g. [[org.apache.spark.sql.types.StringType]])
* @return The default JdbcType for this DataType
*/
def getCommonJDBCType(dt: DataType): Option[JdbcType] = {
dt match {
case IntegerType => Option(JdbcType("INTEGER", java.sql.Types.INTEGER))
case LongType => Option(JdbcType("BIGINT", java.sql.Types.BIGINT))
case DoubleType => Option(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
case FloatType => Option(JdbcType("REAL", java.sql.Types.FLOAT))
case ShortType => Option(JdbcType("INTEGER", java.sql.Types.SMALLINT))
case ByteType => Option(JdbcType("BYTE", java.sql.Types.TINYINT))
case BooleanType => Option(JdbcType("BIT(1)", java.sql.Types.BIT))
case StringType => Option(JdbcType("TEXT", java.sql.Types.CLOB))
case BinaryType => Option(JdbcType("BLOB", java.sql.Types.BLOB))
case TimestampType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP))
case DateType => Option(JdbcType("DATE", java.sql.Types.DATE))
case t: DecimalType => Option(
JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
case _ => None
}
}
private def getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType = {
dialect.getJDBCType(dt).orElse(getCommonJDBCType(dt)).getOrElse(
throw new IllegalArgumentException(s"Can't get JDBC type for ${dt.simpleString}"))
}
// A `JDBCValueGetter` is responsible for getting a value from `ResultSet` into a field
// for `MutableRow`. The last argument `Int` means the index for the value to be set in
// the row and also used for the value in `ResultSet`.
private type JDBCValueGetter = (ResultSet, InternalRow, Int) => Unit
// A `JDBCValueSetter` is responsible for setting a value from `Row` into a field for
// `PreparedStatement`. The last argument `Int` means the index for the value to be set
// in the SQL statement and also used for the value in `Row`.
private type JDBCValueSetter = (PreparedStatement, Row, Int) => Unit
/**
* Saves a partition of a DataFrame to the JDBC database. This is done in
* a single database transaction (unless isolation level is "NONE")
* in order to avoid repeatedly inserting data as much as possible.
*
* It is still theoretically possible for rows in a DataFrame to be
* inserted into the database more than once if a stage somehow fails after
* the commit occurs but before the stage can return successfully.
*
* This is not a closure inside saveTable() because apparently cosmetic
* implementation changes elsewhere might easily render such a closure
* non-Serializable. Instead, we explicitly close over all variables that
* are used.
*/
def savePartition(
getConnection: () => Connection,
table: String,
iterator: Iterator[Row],
rddSchema: StructType,
nullTypes: Array[Int],
batchSize: Int,
dialect: JdbcDialect,
isolationLevel: Int): Iterator[Byte] = {
val conn = getConnection()
var committed = false
var finalIsolationLevel = Connection.TRANSACTION_NONE
if (isolationLevel != Connection.TRANSACTION_NONE) {
try {
val metadata = conn.getMetaData
if (metadata.supportsTransactions()) {
// Update to at least use the default isolation, if any transaction level
// has been chosen and transactions are supported
val defaultIsolation = metadata.getDefaultTransactionIsolation
finalIsolationLevel = defaultIsolation
if (metadata.supportsTransactionIsolationLevel(isolationLevel)) {
// Finally update to actually requested level if possible
finalIsolationLevel = isolationLevel
} else {
logger.warn(s"Requested isolation level $isolationLevel is not supported; " +
s"falling back to default isolation level $defaultIsolation")
}
} else {
logger.warn(s"Requested isolation level $isolationLevel, but transactions are unsupported")
}
} catch {
case NonFatal(e) => logger.warn("Exception while detecting transaction support", e)
}
}
val supportsTransactions = finalIsolationLevel != Connection.TRANSACTION_NONE
try {
if (supportsTransactions) {
conn.setAutoCommit(false) // Everything in the same db transaction.
conn.setTransactionIsolation(finalIsolationLevel)
}
val stmt = insertStatement(conn, table, rddSchema, dialect)
val setters: Array[JDBCValueSetter] = rddSchema.fields.map(_.dataType)
.map(makeSetter(conn, dialect, _))
val numFields = rddSchema.fields.length
try {
var rowCount = 0
while (iterator.hasNext) {
val row = iterator.next()
var i = 0
while (i < numFields) {
if (row.isNullAt(i)) {
stmt.setNull(i + 1, nullTypes(i))
} else {
setters(i).apply(stmt, row, i)
}
i = i + 1
}
stmt.addBatch()
rowCount += 1
if (rowCount % batchSize == 0) {
stmt.executeBatch()
rowCount = 0
}
}
if (rowCount > 0) {
stmt.executeBatch()
}
} finally {
stmt.close()
}
if (supportsTransactions) {
conn.commit()
}
committed = true
Iterator.empty
} catch {
case e: SQLException =>
val cause = e.getNextException
if (cause != null && e.getCause != cause) {
if (e.getCause == null) {
e.initCause(cause)
} else {
e.addSuppressed(cause)
}
}
throw e
} finally {
if (!committed) {
// The stage must fail. We got here through an exception path, so
// let the exception through unless rollback() or close() want to
// tell the user about another problem.
if (supportsTransactions) {
conn.rollback()
}
conn.close()
} else {
// The stage must succeed. We cannot propagate any exception close() might throw.
try {
conn.close()
} catch {
case e: Exception => logger.warn("Transaction succeeded, but closing failed", e)
}
}
}
}
/**
* Saves the RDD to the database in a single transaction.
*/
def saveTable(
df: DataFrame,
url: String,
table: String,
options: JDBCOptions) {
val dialect = JdbcDialects.get(url)
val nullTypes: Array[Int] = df.schema.fields.map { field =>
getJdbcType(field.dataType, dialect).jdbcNullType
}
val rddSchema = df.schema
val getConnection: () => Connection = createConnectionFactory(options)
val batchSize = options.batchSize
val isolationLevel = options.isolationLevel
df.foreachPartition(iterator => savePartition(
getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel)
)
}
private def makeSetter(
conn: Connection,
dialect: JdbcDialect,
dataType: DataType): JDBCValueSetter = dataType match {
case IntegerType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setInt(pos + 1, row.getInt(pos))
case LongType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setLong(pos + 1, row.getLong(pos))
case DoubleType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setDouble(pos + 1, row.getDouble(pos))
case FloatType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setFloat(pos + 1, row.getFloat(pos))
case ShortType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setInt(pos + 1, row.getShort(pos))
case ByteType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setInt(pos + 1, row.getByte(pos))
case BooleanType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setBoolean(pos + 1, row.getBoolean(pos))
case StringType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setString(pos + 1, row.getString(pos))
case BinaryType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setBytes(pos + 1, row.getAs[Array[Byte]](pos))
case TimestampType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos))
case DateType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setDate(pos + 1, row.getAs[java.sql.Date](pos))
case t: DecimalType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setBigDecimal(pos + 1, row.getDecimal(pos))
case ArrayType(et, _) =>
// remove type length parameters from end of type name
val typeName = getJdbcType(et, dialect).databaseTypeDefinition
.toLowerCase.split("\\(")(0)
(stmt: PreparedStatement, row: Row, pos: Int) =>
val array = conn.createArrayOf(
typeName,
row.getSeq[AnyRef](pos).toArray)
stmt.setArray(pos + 1, array)
case _ =>
(_: PreparedStatement, _: Row, pos: Int) =>
throw new IllegalArgumentException(
s"Can't translate non-null value for field $pos")
}
}
用法:
val url = s"jdbc:mysql://$host/$database?useUnicode=true&characterEncoding=UTF-8"
val parameters: Map[String, String] = Map(
"url" -> url,
"dbtable" -> table,
"driver" -> "com.mysql.jdbc.Driver",
"numPartitions" -> numPartitions.toString,
"user" -> user,
"password" -> password
)
val options = new JDBCOptions(parameters)
for (d <- data) {
UpdateJdbcUtils.saveTable(d, url, table, options)
}
ps:注意死锁,不要频繁更新数据,只是在紧急情况下重新运行时使用,我认为这就是spark不支持这位官员的原因。
遗憾的是没有保存模式。Upsert
模式在Spark中用于upserting等非常常见的情况。
zero322在总体上是正确的,但我认为提供这种替换功能应该是可能的(在性能上有所妥协)。
我还想为这个案例提供一些java代码。当然,它的性能不如spark内置的,但它应该是满足您需求的良好基础。只需根据您的需要进行修改:
myDF.repartition(20); //one connection per partition, see below
myDF.foreachPartition((Iterator<Row> t) -> {
Connection conn = DriverManager.getConnection(
Constants.DB_JDBC_CONN,
Constants.DB_JDBC_USER,
Constants.DB_JDBC_PASS);
conn.setAutoCommit(true);
Statement statement = conn.createStatement();
final int batchSize = 100000;
int i = 0;
while (t.hasNext()) {
Row row = t.next();
try {
// better than REPLACE INTO, less cycles
statement.addBatch(("INSERT INTO mytable " + "VALUES ("
+ "'" + row.getAs("_id") + "',
+ "'" + row.getStruct(1).get(0) + "'
+ "') ON DUPLICATE KEY UPDATE _id='" + row.getAs("_id") + "';"));
//conn.commit();
if (++i % batchSize == 0) {
statement.executeBatch();
}
} catch (SQLIntegrityConstraintViolationException e) {
//should not occur, nevertheless
//conn.commit();
} catch (SQLException e) {
e.printStackTrace();
} finally {
//conn.commit();
statement.executeBatch();
}
}
int[] ret = statement.executeBatch();
System.out.println("Ret val: " + Arrays.toString(ret));
System.out.println("Update count: " + statement.getUpdateCount());
//conn.commit();
statement.close();
conn.close();
这是不可能的。目前(Spark 1.6.0/2.2.0快照)SparkDataFrameWriter
仅支持四种写入模式:
SaveMode。覆盖
:覆盖现有数据
您可以手动插入,例如使用mapPartitions
(因为您希望UPSERT操作应该是幂等的,因此易于实现)、写入临时表并手动执行UPSERT,或者使用触发器。
一般来说,实现批处理操作的upsert行为并保持良好的性能绝非易事。您必须记住,在一般情况下,会有多个并发事务(每个分区一个),因此您必须确保不会有写冲突(通常通过使用特定于应用程序的分区)或提供适当的恢复过程。在实践中,最好执行并批量写入临时表,并直接在数据库中解析upsert部分。
问题内容: 我试图用PDO编写更新查询,但我无法执行代码? 问题答案: 您的语法错误 您可能打算更新一行而不是全部更新,因此您必须使用子句来定位特定行 更改 至
我知道我们可以使用火花jdbc从现有的mysql表中读写数据。但是我们甚至可以创建mysql表并使用数据框插入数据吗?当我尝试加载文件到数据框,并尝试写入不存在的表我面临空指针异常。以下是错误: JAVAorg上的lang.NullPointerException。阿帕奇。火花sql。处决数据源。jdbc。JdbcRelationProvider。在org上创建关系(jdbrelationprov
我试图使用JDBC更新一个MSSQL实例,使用一个预先准备好的语句,在给定列名、要更新的值和更新的值时,我创建了一个方法来更新表中的任何记录。 第一个print语句打印出正确的值,这样我就知道输入是正确的,第二个print语句让我知道1行受影响,这是正确的。 如果我在Management Studio中运行脚本,脚本会运行良好并更新表,但如果我从java项目中运行脚本,则不会更新任何内容,也不会生
在这里,我创建了一个表http://jsbin.com/ojanaji/13/edit和demo:http://jsbin.com/ojanaji/13 因此,当用户单击表上的某些行时,可以自动将表中的值填充到模式窗口中。当点击按钮“编辑行”时,用户打开模式窗口。现在我需要知道如何用列更新mysql表:姓名、性别、年龄、吃过的甜甜圈。 我创建js Ajax: HTML模式窗口和按钮: 那么我现在如
我试图将值插入到本地计算机上运行的现有java DB中。我的代码: 它一直说我已经添加了这里给出的JDBC驱动程序将Java连接到MySQL数据库(使用独立平台),并使用Intellij将其添加为依赖项。我错过了什么?谢谢! 编辑:任何人遇到这个问题,这个代码是罚款。
问题内容: 我找到了一个自动提交表单数据的教程,但我要做的就是添加一个提交按钮以将数据传递给ajax。 我的目标是创建一个具有多个输入的表单,当用户单击“提交”按钮时,它将通过ajax发送并更新页面,而无需重新加载页面。另外,另一个关键是将所有输入发布到数组中的方式,以便在运行更新脚本时,输入字段中的名称属性与数据库中的列匹配。 我想我接近了。我已经搜索过,但没有找到确切的解决方案。提前致谢。 u