基于 Apache Hudi 构建增量和无限回放事件流的 OLAP 平台

1. 摘要

在这篇博客中,我们将讨论如何在构建流数据平台时利用 Hudi 最令人难以置信的两项功能。

2. 当前状态2.1 问题描述

对于大多数需要人工干预以通过查看 KPI 和数据趋势以及其他不太实时的用例来决定下一组运营用例的企业来说,我们需要具有成本效益和高性能的近实时系统。

但是我们在数据湖中获得的数据通常以 D -1 的每日批处理粒度来,即使我们每天不止一次运行这些每日批处理数据处理系统以获取当前 D 的最新数据,这些批处理固有系统限制也不能帮助我们处理近乎实时的业务用例。

2.2 挑战

在将批量数据摄取到我们的数据湖中时,我们支持 S3 的数据集在每日更新日期分区上进行分区。即使我们每天多次运行这些批处理系统,我们从上游 Kafka 或 RDBMS 应用程序数据库中提取的最新批处理也会附加到 S3 数据集中当前日期的分区中。

当下游系统想要从我们的 S3 数据集中获取这些最新记录时,它需要重新处理当天的所有记录,因为下游进程在不扫描整个数据分区的情况下无法从增量记录中找出处理过的记录。

此外,如果我们按小时(而不是每日分区)对 S3 数据集进行分区,那么这会将分区粒度设置为每小时间隔。任何试图以低于一小时(例如最后 x 分钟)的粒度获取最新更新的下游作业将不得不在每次运行时再次重新处理每小时数据分区,即这些批处理源将错过解决近乎真实的所需时间-time use case 关键增量数据消耗。

2.3 无限播放事件流

现在回到 Hudi 帮助我们解决这些挑战的特性,让我们首先尝试了解(提交)和(提交时间线)如何影响增量消费和事件流保留/回放。

Hudi 维护了在不同时间对表执行的所有操作的时间线,这些(提交)包含有关插入或重写的文件部分的信息,作为我们所谓的 Hudi 提交时间线的一部分。

对于每个 Hudi 表,我们可以选择指定保留多少历史提交,默认保留 10 次,即 10 次提交后,第 11 次提交将额外运行一个清理服务,该服务将清理第一个 历史。

清理(​​)时,会清理该对应的文件部分的过时版本,相关数据被保留,因为过时文件中的所有数据无论如何都存在于新版本文件中,重要的是这里的事情是我们可以触发快照查询以获取数据的最新状态,但我们将无法对已清理的提交运行增量查询以获取增量数据。

简而言之,如果一个 () 被清理了,我们将失去从该 () 回放事件流的能力,但我们仍然可以从任何尚未清理的 () 回放事件流.

在我们的案例中,我们将 Hudi 表配置为保持 10K 提交,从而为我们提供 10 天的增量读取能力(类似于保持 10 天的 kafka 主题)

我们保留的历史提交越多,我们就越有能力及时回溯并重播事件流。

3. OLAP 每小时

让我快速展示一下我们的端到端消息传递 OLAP 计算管道的架构,其中包含 10 天的事件流

在 kafka 层,我们的 kafka 输入源每个都有 1 天的主题保留期。

在摄取层,我们有 Spark 结构化流作业,从 kafka 源读取数据并将微批处理写入 S3 支持的 Hudi 表。这是我们配置为保持 10k 提交以启用 10 天的事件流的地方。

.option("hoodie.cleaner.commits.retained", 10000)
.option("hoodie.keep.max.commits", 10002)
.option("hoodie.keep.min.commits", 10001)

计算层由我们的批处理 Spark 作业组成,该作业当前每 30 分钟运行一次,并重新处理我们在过去 60 分钟内摄取到 Hudi 表中的所有事件。每小时 OLAP 作业读取两个跨国表和一个可选的 N 维表,并将它们全部连接起来以准备我们的 OLAP 增量。

我们每 30 分钟处理 60 分钟的数据,以增强表连接的一致性。

有趣的是,一般不建议在生产系统中保留 1 天的 kafka 保留时间,但我们能够进行权衡以节省一些 SSD 和 Kafka 代理成本,因为无论如何我们都可以使用 S3 支持的 Hudi 表实现 10 天的事件流式处理能力。

4. 部分记录更新

上面的管道显示了我们如何通过读取和合并两个增量上游数据源来创建每小时增量 OLAP。

然而,这些增量数据处理也有其自身的挑战。可能会发生在两个上游表中,对于主键,我们在其中一个数据源中获得更新但在另一个数据源中没有更新,我们称之为不匹配事务问题。

下图试图帮助我们理解这一挑战并了解我们实施的解决方案。

在这里,表A和B都有一些对应的匹配事务和一些不匹配的事务。使用内部连接将简单地忽略可能永远不会流入我们底层 OLAP 的不匹配事务。使用外连接将不匹配的事务合并到我们每小时的增量数据加载中。但是使用外连接会将缺失的列值添加为空值,现在这些空值需要单独处理。

当使用默认有效负载类将此每小时增量数据更新到底层 Hudi OLAP 时,它将简单地用我们准备的每小时增量数据中的新记录覆盖底层 Hudi OLAP 中的记录。但是这样一来,当我们用传入记录的空列值覆盖现有记录时,就会丢失现有记录中可能已经存在的信息。因此,为了解决这个问题,我们提供了自定义的部分行更新有效负载类,同时将每小时增量数据从外部连接插入到底层 Hudi OLAP。有效负载类定义了控制我们在更新记录时如何合并新旧记录的函数。

我们的自定义有效负载类比较存储和传入记录的所有列,并通过将一条记录中的空列与另一条记录中的非空列重叠来返回一条新记录。

因此,即使只更新了一个上游表,我们的自定义有效负载类也将使用部分可用的新信息,并将返回包含部分更新信息的完全最新记录。

由于存储和部分行更新记录的主键和分区键相同,因此 Hudi 操作会自动更新旧记录,从而为我们提供了基本 OLAP 的去重和一致视图。

有关如何编写自己的有效负载类的更多技术细节。

5. 结论

结合增量消费、增量每小时OLAP处理和自定义部分行更新有效负载类这三个概念,我们为我们的独角兽初创公司构建了一个健壮的流处理平台,以随时扩展成为百角组织。

© 版权声明
THE END
喜欢就支持一下吧
点赞267赞赏 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容