The actual data write is carried out by the txn.writeFiles function, whose implementation is as follows:
def writeFiles(
    data: Dataset[_],
    writeOptions: Option[DeltaOptions],
    isOptimize: Boolean): Seq[AddFile] = {
  hasWritten = true

  val spark = data.sparkSession
  val partitionSchema = metadata.partitionSchema
  val outputPath = deltaLog.dataPath

  val (queryExecution, output) = normalizeData(data, metadata.partitionColumns)
  val partitioningColumns =
    getPartitioningColumns(partitionSchema, output, output.length < data.schema.size)

  // Obtain the DelayedCommitProtocol, which controls how output file names are
  // generated and what happens in commitTask and commitJob.
  val committer = getCommitter(outputPath)

  val invariants = Invariants.getFromSchema(metadata.schema, spark)

  SQLExecution.withNewExecutionId(spark, queryExecution) {
    val outputSpec = FileFormatWriter.OutputSpec(
      outputPath.toString,
      Map.empty,
      output)

    val physicalPlan = DeltaInvariantCheckerExec(queryExecution.executedPlan, invariants)

    FileFormatWriter.write(
      sparkSession = spark,
      plan = physicalPlan,
      fileFormat = snapshot.fileFormat,
      committer = committer,
      outputSpec = outputSpec,
      hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration),
      partitionColumns = partitioningColumns,
      bucketSpec = None,
      statsTrackers = Nil,
      options = Map.empty)
  }

  // Return the files added by this write
  committer.addedStatuses
}
The Delta Lake write operation is ultimately performed by Spark's FileFormatWriter.write method; by reusing this method, the actual data is written into the Delta Lake table.
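To see how this code path is reached from user code, here is a minimal sketch of a DataFrame write with format("delta"), which goes through WriteIntoDelta and ends up in the txn.writeFiles shown above. It assumes the delta-core dependency is on the classpath; the table path /tmp/delta/events and the sample data are made up for illustration.

import org.apache.spark.sql.SparkSession

object DeltaWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("delta-write-example")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Writing with format("delta") triggers the Delta data source, which opens a
    // transaction and calls txn.writeFiles, finally reaching FileFormatWriter.write.
    val events = Seq((1L, "click"), (2L, "view")).toDF("id", "event")
    events.write
      .format("delta")
      .mode("append")
      .partitionBy("event")       // exercises the partitioningColumns logic above
      .save("/tmp/delta/events")  // hypothetical table path

    spark.stop()
  }
}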