Robinhood基于Apache Hudi的下一代数据湖实践

1.总结

我们的使命是使所有人的金融民主化。内部不同级别的持续数据分析和数据驱动决策是这一使命的基础。我们有各种数据源——OLTP 数据库、事件流和各种第 3 方数据源。需要快速、可靠、安全且以隐私为中心的数据湖摄取服务来支持各种报告、关键业务管道和仪表板。

从最初的数据湖发布开始,我们已经取得了长足的进步,不仅在数据存储规模和查询方面,而且在我们在数据湖中支持的用例方面。在这篇博客中,我们描述了如何使用各种开源工具基于变更数据捕获构建增量摄取,以将我们的核心数据集的数据新鲜延迟从 1 天减少到 15 分钟以下。我们还将描述大容量摄取模型的局限性以及在大规模操作增量摄取管道时的经验教训。

2.数据湖和生态系统

数据湖存储和计算基础架构是支持我们许多数据驱动功能的基石,例如业务分析仪表板和产品改进见解。它也是为业务和临时报告和分析运行大规模数据处理的数据源。此外,生态系统会影响以隐私为中心的原语,例如旨在保护用户隐私的匿名化和访问控制。

主要的OLTP(在线事务处理)数据库由RDS管理; S3 是 Data Lake 存储,它提供了我们的 Data Lake 高性价比和可扩展的存储层;我们主要使用 Spark 来运行生产批处理管道;我们的仪表板由 Trino 分布式 SQL 查询引擎提供支持; Yarn 管理用于运行 Spark 作业的计算集群; Hive 为查询引擎管理和提供表模式;是工作流编排服务。

下图是一个带有计算生态系统的数据湖

在本文中,我们使用指标“数据新鲜度”来比较下面的不同数据摄取架构,该指标提供了一个时间延迟,以使源数据库中的表中发生的更改在相应的 Data Lake 表中可见。

3.大容量摄取的限制

作为数据湖演进的第一步,我们首先使用在线数据库的只读副本对在线数据库进行每日快照。摄取这些表的完整快照会导致数据湖表的写入放大率很高。即使对于一个有数十亿行的表,一天只有几十万行变化,摄取表的完整快照也会导致整个表的读取和写入。此外,在使用实时副本(而不是作为上游数据库备份)时,只读副本 I/O 性能存在瓶颈,这可能导致快照时间较长,从而导致较大的摄取延迟。即使使用诸如通过分区读取并行化 I/O 等技术,这种摄取架构也无法在一小时内交付数据。

确实需要在数据湖中保持低数据新鲜度。许多过去在市场交易时间之后或之前以每日节奏运行的批处理管道必须以每小时或更高的频率运行,以支持不断发展的用例。显然,我们需要更快的摄取管道将在线数据库复制到数据湖。

4.新架构

实现 Data Lake 较低数据新鲜度的更好方法是增量摄取。增量摄取是一种众所周知的技术,用于为数据湖构建高效的摄取管道。在这里,摄取管道不是拍摄快照并将它们作为一个整体转储到 Data Lake,而是以流方式使用 OLTP 数据库的预写日志并将它们摄取到 Data Lake 表中,就像数据库到数据库的复制一样。

从概念上讲,我们有一个两阶段的管道。

下图是增量摄取组件

中间更改日志队列允许分离两个阶段之间的关注点,每个阶段将能够独立运行,并且每个阶段都可以暂停而不影响另一个阶段。队列提供了必要的隔离,因此将数据摄取到数据湖中的任何延迟都不会对 CDC 造成背压。

在第一阶段,我们选择了变更数据捕获 (CDC) 提供商。是一个基于 Kafka 的开源分布式变更数据捕获平台,具有久经考验的一流 CDC 连接器。根据我们的基准测试,我们发现我们期望的负载可以轻松处理,并且我们设置了使用开源将更改记录以 avro 编码写入 Kafka,与 json 编码相比提供了更好的性能。

在第二阶段,我们使用 Hudi 从 Kafka 增量摄取变更日志以创建数据湖表。 Hudi 是一个统一的数据湖平台,用于在数据湖上执行批处理和流处理,Hudi 带有一个功能齐全的基于 Spark 的摄取系统,称为开箱即用,具有一流的 Kafka 集成和一次性写入功能,与不可变数据不同,我们的 CDC 数据具有相当大比例的更新和删除,Hudi 利用其可插入的记录级索引在 Data Lake 表上快速高效地执行,Hudi 自动清理旧文件版本,数据,Hive 表模式同步, 和文件大小自行管理其表以写入适当大小的文件,并且原始表当前以 Hudi 的写时复制模式存储,该模式提供原生列式读取性能。

5.性能总结

我们部署了一个增量摄取管道,将 1000 个表摄取到数据湖中。在新架构之前,由于快照的限制和所涉及的成本,这些表只能保证以每日的节奏进行快照。使用这种新架构,Data Lake 用户很高兴看到关键表的数据新鲜度从 24 小时缩短到 15 分钟以下。

Large Run Time 显示快照表运行了多长时间。请注意,由于只读副本 I/O 瓶颈,其中许多表的快照需要按顺序运行。

显示批量快照的批量快照运行计划每天只运行一次,因为从数据库中为所有表创建快照的周转时间很长。

新的增量摄取数据新鲜度显示新摄取系统的端到端数据新鲜度约为 5 分钟。 ​

6. 经验教训

在本节中,我们将分享您需要了解的有关大规模构建增量摄取管道的经验教训。我们希望这对于希望为他们的数据湖踏上类似旅程的人来说是有价值的。

7.可扩展的初始引导

对数据湖的增量摄取仍需要源表的初始快照。确实提供了初始快照模式,但需要查询 RDS 主实例,我们不想在 RDS 主实例中查询快照,以避免生产 OLTP 查询和初始快照查询之间的资源争用。此外,我们需要通过以无锁方式运行并发分区查询并从数据库备份中获取快照来优化初始快照时间。

出于这些原因,我们在 Hudi 之上提供了一个专用的只读副本,并实现了一个自定义快照器,它利用 Spark 运行并发分区快照查询来获取表的初始快照,Hudi 的可插拔源框架允许我们这样做几行代码就能无缝衔接。

对于带外初始快照,我们需要在增量摄取和快照之间切换时仔细跟踪CDC流中的正确水印,使用Kafka,数据摄取作业的CDC水印转换为Kafka偏移量,这标志着要应用于快照表的变更日志事件的开始,如果我们选择任意的 Kafka 偏移量,我们最终可能会丢失一些应用于数据湖表的变更事件。

从概念上讲,我们需要 3 个阶段来执行正确的快照并过渡到增量摄取:

下图是使用架构的增量摄取架构

来自专用只读副本的快照存在一些限制,例如副本端的 I/O 瓶颈以及 24*7 在线维护只读副本的成本开销。我们正在探索一种方法来按需备份 OLTP 数据库并使用 AWS S3 导出发布到 S3。然后,我们可以依靠大规模处理这些 S3 导出并构建初始快照,这种机制可以实现更快的快照并克服只读副本端的一些 I/O 瓶颈。

8.使用逻辑复制监控背压风险

逻辑复制需要 CDC 连接器直接连接到主 RDS。逻辑复制协议保证 WAL 日志文件被保留,直到它们被完全处理。

如果您卡住或无法跟上消耗 WAL 日志的速度,这可能会导致 WAL 日志文件堆积并耗尽可用磁盘空间,社区建议密切监控滞后消息,我们的负载测试让我们有信心能够应对预计的变化率增加。

9.自动恢复

从每日快照切换到增量摄取的副作用之一是摄取工作流变得有状态。管道可能处于快照或增量摄取状态。

此外,还需要执行架构升级、监控和数据质量验证等操作,并且需要定期添加新表和数据库。

端到端管道涉及不同的系统 – 在线 CDC 世界和数据湖的批处理/流摄取。为 1000 个表执行入职和一般操作需要适当的状态管理和自动化。

我们意识到我们需要在内部构建一流的编排服务来管理摄取管道,跟踪负载和表状态,并自动处理状态转换和其他维护,这将有助于我们大大扩展操作管道。

10. 并非所有表都是平等的

当涉及到这些表对我们的关键用例的重要性时,该原则是有效的,我们有一个小的一些关键表需要 15 分钟内的数据新鲜度,我们采取一种方法,根据表的重要性将表分为不同的层,高度关键的表被标记为第 0 层,对于这些表,我们提供了一个单独的 CDC 复制槽,以将这些关键表的 CDC 通道与其他表的通道隔离开来。此外,我们还为 Hudi 提供了专门的资源来持续获取增量变更日志,并能够在 5-15 分钟内使数据保持最新。对于优先级较低的表,Hudi 配置为以批处理模式每 15 分钟运行一次。

11.管理架构更新

我们的业务是将表从在线 OLTP 世界复制到 Data Lake 世界,复制的数据不是不透明的,而是具有适当的 ,并且复制管道保证将在线表架构转换为数据湖。

鉴于数据湖还能够存储数据更改的整个历史记录,在线和数据湖世界的向后兼容性之间有什么区别。例如,在网络世界中,将不可为空的列添加到 .拥有明确定义的架构演化合约有助于保持数据湖管道更加稳定。

我们发现大多数时候,更改涉及添加新列,我们使用一个函数来冻结我们从表中读取的列集,并依靠重新启动表来处理架构升级,我们计划做端到端的 Side 增加了 兼容性检测机制,以减少重启次数。

12.未来规划

我们看到使用增量摄取来更快地采用原始数据湖表,并且我们正在不断努力提高管道的可靠性。以下是我们正在进行的一些后续步骤:

随着我们开始构建下一代数据湖,这是与数据基础架构团队合作的激动人心的时刻。

© 版权声明
THE END
喜欢就支持一下吧
点赞192 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片