Delta Plus
A library based on delta for Spark and MLSQL.
- JianShu How delta works
- Medium How delta works
- Video It's time to change your incremental sync solution
- ZhiHu It's time to change your incremental sync solution
Requirements
This library requires Spark 2.4+ (tested) and Delta 0.5.0 (for Spark 2.4.x) or 0.7.0 (for Spark 3.0.x).
Linking
You can link against this library in your program at the following coordinates:
Scala 2.11/Spark 2.4.3
groupId: tech.mlsql
artifactId: delta-plus_2.11
version: 0.4.0
Scala 2.12/Spark 3.0.0
groupId: tech.mlsql
artifactId: delta-plus_2.12
version: 0.4.0
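For sbt users, the coordinates above could be declared roughly as follows. This is a sketch that assumes the artifact is resolvable from the repositories configured in your build; which repository hosts it is not stated here.

```scala
// build.sbt, for Scala 2.11 / Spark 2.4.3
libraryDependencies += "tech.mlsql" % "delta-plus_2.11" % "0.4.0"

// For Scala 2.12 / Spark 3.0.0, use the _2.12 artifact instead:
// libraryDependencies += "tech.mlsql" % "delta-plus_2.12" % "0.4.0"
```

With `scalaVersion` set in the build, `"tech.mlsql" %% "delta-plus" % "0.4.0"` should select the matching suffix automatically.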
ChangeLog
0.4.0
- Fix an issue where the schema was changed if idCols were not the first columns.
Limitation
- Compaction cannot be applied to a Delta table that is also modified by upsert/delete operations.
Binlog Replay Support
To incrementally sync a MySQL table to Delta Lake, combine delta-plus with the spark-binlog project.
DataFrame:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("Binlog2DeltaTest")
  .getOrCreate()

// Read the MySQL binlog as a stream, starting at the given binlog file index and offset
val df = spark.readStream.
  format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource").
  option("host","127.0.0.1").
  option("port","3306").
  option("userName","root").
  option("password","123456").
  option("databaseNamePattern","test").
  option("tableNamePattern","mlsql_binlog").
  option("bingLogNamePrefix","mysql-bin").
  option("binlogIndex","10").
  option("binlogFileOffset","90840").
  load()

// Replay the binlog events into Delta tables, using id as the upsert key
val query = df.writeStream.
  format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").
  option("__path__","/tmp/datahouse/{db}/{table}").
  option("path","{db}/{table}").
  option("mode","Append").
  option("idCols","id").
  option("duration","3").
  option("syncType","binlog").
  option("checkpointLocation", "/tmp/cpl-binlog2").
  outputMode("append").
  trigger(Trigger.ProcessingTime("3 seconds")).
  start()

query.awaitTermination()
MLSQL Code:
set streamName="binlog";
load binlog.`` where
host="127.0.0.1"
and port="3306"
and userName="xxxx"
and password="xxxxxx"
and bingLogNamePrefix="mysql-bin"
and binlogIndex="4"
and binlogFileOffset="4"
and databaseNamePattern="mlsql_console"
and tableNamePattern="script_file"
as table1;
save append table1
as rate.`mysql_{db}.{table}`
options mode="Append"
and idCols="id"
and duration="5"
and syncType="binlog"
and checkpointLocation="/tmp/cpl-binlog2";
Before you run the streaming application, make sure you have fully synced the table.
MLSQL Code:
connect jdbc where
url="jdbc:mysql://127.0.0.1:3306/mlsql_console?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false"
and driver="com.mysql.jdbc.Driver"
and user="xxxxx"
and password="xxxx"
as db_cool;
load jdbc.`db_cool.script_file` as script_file;
run script_file as TableRepartition.`` where partitionNum="2" and partitionType="range" and partitionCols="id"
as rep_script_file;
save overwrite rep_script_file as delta.`mysql_mlsql_console.script_file` ;
load delta.`mysql_mlsql_console.script_file` as output;
DataFrame Code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("wow")
  .getOrCreate()

val mysqlConf = Map(
  "url" -> "jdbc:mysql://localhost:3306/mlsql_console?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false",
  "driver" -> "com.mysql.jdbc.Driver",
  "user" -> "xxxxx",
  "password" -> "xxxxxx",
  "dbtable" -> "script_file"
)

var df = spark.read.format("jdbc").options(mysqlConf).load()
df = df.repartitionByRange(2, col("id"))
df.write
  .format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource")
  .mode("overwrite")
  .save("/tmp/datahouse/mlsql_console/script_file")

spark.close()
Upsert/Delete Support
DataFrame:
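import org.apache.spark.sql.streaming.OutputMode

df.writeStream.
  format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").
  option("idCols","id"). // with idCols set, the write is executed as an upsert
  option("operation","delete"). // with operation=delete as well, the records in df are deleted
  option("checkpointLocation","/tmp/cpl-delta-table1"). // hypothetical path; a streaming write needs a checkpoint location
  outputMode(OutputMode.Append).
  start("/tmp/delta-table1")

// Read the table back as a stream
df.sparkSession.readStream.format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").load("/tmp/delta-table1")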
When idCols and operation are not configured, a normal Append/Overwrite operation is executed. If idCols is set, an Upsert operation is executed. If both idCols and operation are set and operation equals delete, the records in df are deleted from the table.
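The rules above can be summarized as a small decision function. The following is only an illustrative model in plain Scala, not delta-plus source code: `WriteAction`, `resolveAction`, and the object name are hypothetical, and the case where operation is set without idCols is not described here, so the sketch simply treats it as a normal write.

```scala
object WriteModeSketch {
  // Hypothetical names modelling the three behaviours described in the text
  sealed trait WriteAction
  case object PlainWrite extends WriteAction // normal Append/Overwrite
  case object Upsert extends WriteAction
  case object Delete extends WriteAction

  def resolveAction(options: Map[String, String]): WriteAction = {
    val hasIdCols = options.get("idCols").exists(_.trim.nonEmpty)
    val isDelete = options.get("operation").contains("delete")
    if (hasIdCols && isDelete) Delete   // idCols + operation=delete
    else if (hasIdCols) Upsert          // idCols only
    else PlainWrite                     // assumption: anything else is a normal write
  }
}
```

For example, `WriteModeSketch.resolveAction(Map("idCols" -> "id"))` yields `Upsert` in this model.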
Note that if the data to be written to the Delta table contains duplicate records, delta-plus throws an exception by default. To deduplicate instead, set dropDuplicate to true.
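To make the duplicate handling concrete, here is a minimal model over plain Scala collections. It is a sketch under assumptions, not the library's implementation: `prepare` and `Row` are hypothetical, duplicates are assumed to mean rows sharing the same idCols values, and keeping the last row per key is an arbitrary choice made for the example.

```scala
import scala.collection.mutable

object DuplicateSketch {
  type Row = Map[String, Any]

  // Hypothetical helper: fail on duplicate keys unless dropDuplicate is true
  def prepare(rows: Seq[Row], idCols: Seq[String], dropDuplicate: Boolean = false): Seq[Row] = {
    val byKey = mutable.LinkedHashMap.empty[Seq[Any], Row]
    rows.foreach { row =>
      val key = idCols.map(row) // values of the id columns for this row
      if (byKey.contains(key) && !dropDuplicate)
        throw new IllegalArgumentException(s"Duplicate records for key $key")
      byKey(key) = row // assumption: the last record seen for a key wins
    }
    byKey.values.toSeq
  }
}
```

In this model, a batch with two rows for `id = 1` fails by default and is reduced to one row per key when `dropDuplicate = true` is passed.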
MLSQL:
save append table1
as rate.`mysql_{db}.{table}`
options mode="Append"
and idCols="id"
and duration="5"
and syncType="binlog"
and checkpointLocation="/tmp/cpl-binlog2";
Compaction Support
DataFrame:
val optimizeTableInDelta = CompactTableInDelta(log,
  new DeltaOptions(Map[String, String](), df.sparkSession.sessionState.conf), Seq(), Map(
    CompactTableInDelta.COMPACT_VERSION_OPTION -> "8",
    CompactTableInDelta.COMPACT_NUM_FILE_PER_DIR -> "1",
    CompactTableInDelta.COMPACT_RETRY_TIMES_FOR_LOCK -> "60"
  ))
val items = optimizeTableInDelta.run(df.sparkSession)
MLSQL:
-- compact table1 files before version 10, and make
-- sure every partition has only one file
!delta compact /delta/table1 10 1;
Use !delta history /delta/table1; to get the history of the table.