加入收藏 | 设为首页 | 会员中心 | 我要投稿 南京站长网 (https://www.025zz.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 运营中心 > 网站设计 > 教程 > 正文

Apache Spark Delta Lake写数据使用及实现原理代码解析

发布时间:2019-10-03 21:23:58 所属栏目:教程 来源:明惠
导读:副标题#e# Delta Lake 写数据是其最基本的功能,而且其使用和现有的 Spark 写 Parquet 文件基本一致,在介绍 Delta Lake 实现原理之前先来看看如何使用它,具体使用如下: df.write.format(delta).save(/data/yangping.wyp/delta/test/) //数据按照dt分区 d

真正写数据的操作是 txn.writeFiles 函数执行的,具体实现如下:

  1. def writeFiles( 
  2.       data: Dataset[_], 
  3.       writeOptions: Option[DeltaOptions], 
  4.       isOptimize: Boolean): Seq[AddFile] = { 
  5.     hasWritten = true 
  6.   
  7.     val spark = data.sparkSession 
  8.     val partitionSchema = metadata.partitionSchema 
  9.     val outputPath = deltaLog.dataPath 
  10.   
  11.     val (queryExecution, output) = normalizeData(data, metadata.partitionColumns) 
  12.     val partitioningColumns = 
  13.       getPartitioningColumns(partitionSchema, output, output.length < data.schema.size) 
  14.   
  15.     // 获取 DelayedCommitProtocol,里面可以设置写文件的名字, 
  16.     // commitTask 和 commitJob 等做一些事情 
  17.     val committer = getCommitter(outputPath) 
  18.   
  19.     val invariants = Invariants.getFromSchema(metadata.schema, spark) 
  20.   
  21.     SQLExecution.withNewExecutionId(spark, queryExecution) { 
  22.       val outputSpec = FileFormatWriter.OutputSpec( 
  23.         outputPath.toString, 
  24.         Map.empty, 
  25.         output) 
  26.   
  27.       val physicalPlan = DeltaInvariantCheckerExec(queryExecution.executedPlan, invariants) 
  28.   
  29.       FileFormatWriter.write( 
  30.         sparkSession = spark, 
  31.         plan = physicalPlan, 
  32.         fileFormat = snapshot.fileFormat, 
  33.         committer = committer, 
  34.         outputSpec = outputSpec, 
  35.         hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration), 
  36.         partitionColumns = partitioningColumns, 
  37.         bucketSpec = None, 
  38.         statsTrackers = Nil, 
  39.         options = Map.empty) 
  40.     } 
  41.   
  42.     // 返回新增的文件 
  43.     committer.addedStatuses 

Delta Lake 写操作最终调用 Spark 的 FileFormatWriter.write 方法进行的,通过这个方法的复用将我们真正的数据写入到 Delta Lake 表里面去了。

(编辑:南京站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读