Delta Lake 所有的更新操作都是在事务中进行的,deltaLog.withNewTransaction 就是一个事务,withNewTransaction 的实现如下:
- def withNewTransaction[T](thunk: OptimisticTransaction => T): T = {
- try {
- // 更新当前表事务日志的快照
- update()
- // 初始化乐观事务锁对象
- val txn = new OptimisticTransaction(this)
- // 开启事务
- OptimisticTransaction.setActive(txn)
- // 执行写数据操作
- thunk(txn)
- } finally {
- // 关闭事务
- OptimisticTransaction.clearActive()
- }
- }
在开启事务之前,需要更新当前表事务的快照,因为在执行写数据之前,这张表可能已经被修改了,执行 update 操作之后,就可以拿到当前表的最新版本,紧接着开启乐观事务锁。thunk(txn) 就是需要执行的事务操作,对应 deltaLog.withNewTransaction 里面的所有代码。
我们回到上面的 run 方法。val actions = write(txn, sparkSession) 就是执行写数据的操作,它的实现如下:
- def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = {
- import sparkSession.implicits._
- // 如果不是第一次往表里面写数据,需要判断写数据的模式是否符合条件
- if (txn.readVersion > -1) {
- // This table already exists, check if the insert is valid.
- if (mode == SaveMode.ErrorIfExists) {
- throw DeltaErrors.pathAlreadyExistsException(deltaLog.dataPath)
- } else if (mode == SaveMode.Ignore) {
- return Nil
- } else if (mode == SaveMode.Overwrite) {
- deltaLog.assertRemovable()
- }
- }
-
- // 更新表的模式,比如是否覆盖现有的模式,是否和现有的模式进行 merge
- updateMetadata(txn, data, partitionColumns, configuration, isOverwriteOperation)
-
- // 是否定义分区过滤条件
- val replaceWhere = options.replaceWhere
- val partitionFilters = if (replaceWhere.isDefined) {
- val predicates = parsePartitionPredicates(sparkSession, replaceWhere.get)
- if (mode == SaveMode.Overwrite) {
- verifyPartitionPredicates(
- sparkSession, txn.metadata.partitionColumns, predicates)
- }
- Some(predicates)
- } else {
- None
- }
-
- // 第一次写数据初始化事务日志的目录
- if (txn.readVersion < 0) {
- // Initialize the log path
- deltaLog.fs.mkdirs(deltaLog.logPath)
- }
-
- // 写数据到文件系统中
- val newFiles = txn.writeFiles(data, Some(options))
-
- val deletedFiles = (mode, partitionFilters) match {
- // 全量覆盖,直接拿出缓存在内存中最新事务日志快照里面的所有 AddFile 文件
- case (SaveMode.Overwrite, None) =>
- txn.filterFiles().map(_.remove)
- // 从事务日志快照中获取对应分区里面的所有 AddFile 文件
- case (SaveMode.Overwrite, Some(predicates)) =>
- // Check to make sure the files we wrote out were actually valid.
- val matchingFiles = DeltaLog.filterFileList(
- txn.metadata.partitionColumns, newFiles.toDF(), predicates).as[AddFile].collect()
- val invalidFiles = newFiles.toSet -- matchingFiles
- if (invalidFiles.nonEmpty) {
- val badPartitions = invalidFiles
- .map(_.partitionValues)
- .map { _.map { case (k, v) => s"$k=$v" }.mkString("/") }
- .mkString(", ")
- throw DeltaErrors.replaceWhereMismatchException(replaceWhere.get, badPartitions)
- }
-
- txn.filterFiles(predicates).map(_.remove)
- case _ => Nil
- }
-
- newFiles ++ deletedFiles
- }
- }
如果 txn.readVersion == -1,说明是第一次写数据到 Delta Lake 表,所以当这个值大于 -1 的时候,需要判断一下写数据的操作是否合法。
由于 Delta Lake 底层使用的是 Parquet 格式,所以 Delta Lake 表也支持模式的增加合并等,这就是 updateMetadata 函数对应的操作。
因为 Delta Lake 表支持分区,所以我们可能在写数据的时候指定某个分区进行覆盖。 (编辑:南京站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|