基于Apache Hudi和Debezium构建CDC入湖管道

从 Hudi v0.10.0 开始,我们很高兴地宣布推出 的源,它提供从 MySQL 数据库到数据湖的变更捕获数据 (CDC) 摄取。有关详细信息,请参阅原始 RFC。

1.背景

当您想要连接来自事务型数据库(例如 MySQL 或 MySQL)的数据以执行分析时,这些数据通常会通过称为 CDC 的过程被引入数据仓库或数据湖等 OLAP 系统。是一个让CDC变得简单的流行工具,它提供了一种通过读取更改日志来捕获数据库中行级更改的方法,这样可以避免增加数据库的CPU负载并确保捕获包括删除所有更改包括。

现在 Hudi 提供了源连接器,CDC 引入数据湖比以往任何时候都容易,因为它具有一些独特的差异化功能。 Hudi 可在数据湖上实现高效的更新、合并和删除事务。与使用 Spark 或 Flink 的典型数据湖编写器相比,Hudi 独特地提供了一种可以显着减少摄取延迟的编写器。最后,Hudi 提供了这样的功能,在从数据库中捕获更改后,它们可以在所有后续 ETL 管道中进行下游增量处理。

2.整体设计

上面展示了一个使用 Hudi 的端到端 CDC 摄取流的架构,首先第一个组件是部署,它由一个 Kafka 集群,(或)和一个不断轮询数据库中的 的连接器组成并将每个数据库行的更改写入 AVRO 消息到每个表的专用 Kafka 主题。

第二个组件是Hudi,它为每个表读取和处理来自Kafka的传入记录,并将Hudi表中的相应行写入(更新)到云存储上。

为了近乎实时地将数据库表中的数据提取到 Hudi 表中,我们实现了两个可插拔类。首先我们实现一个源。以连续模式运行,从给定表的 Kafka 主题中连续读取和处理 Avro 格式的更改记录,并将更新的记录写入目标 Hudi 表。除了数据库表中的列之外,我们还通过将它们添加到目标 Hudi 表来摄取添加到目标 Hudi 表中的一些元字段。元字段帮助我们正确合并更新和删除的记录,并使用表中的最新模式读取记录。

其次,我们实现了一个自定义,该自定义控制在更新或删除同一行时如何合并 Hudi 记录,当接收到现有行的新 Hudi 记录时,有效负载使用相应列的比较。高值(MySQL 中的 和 POS 字段以及 中的 LSN 字段)选择最近的记录,并且在后一个事件是删除记录的情况下,有效负载实现确保该记录被硬删除贮存。被删除的记录由 op 字段标识,其值 d 表示删除。

3.hudi配置

在将源连接器用于 CDC 摄取时,请务必考虑以下 Hudi 部署配置。

3.1 引导现有表

一个重要的用例可能是需要 CDC 提取现有数据库表的地方。在流式传输更改之前,我们可以通过两种方式获取现有数据库数据:

3.2 个例子

以下描述了基于 AWS RDS 实例的部署,基于在 Spark 集群上运行 Hudi 实现端到端 CDC 管道的步骤。

3.3 个数据库

RDS 实例需要进行一些配置更改才能启用逻辑复制。

SET rds.logical_replication to 1 (instead of 0)

图片[1]-基于Apache Hudi和Debezium构建CDC入湖管道-唐朝资源网

psql --host= --port=5432 --username=postgres --password -d ; CREATE PUBLICATION FOR TABLE schema1.table1, schema1.table2; ALTER TABLE schema1.table1 REPLICA IDENTITY FULL;

3.4 个连接器

是在集群上部署和管理 Kafka 连接器的推荐选项,或者您可以选择使用托管连接器。

kubectl create namespace kafka
kubectl create -f https://strimzi.io/install/latest?namespace=kafka -n kafka
kubectl -n kafka apply -f kafka-connector.yaml

一个kafka-.yaml的例子如下所示:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: debezium-kafka-connect
annotations:
strimzi.io/use-connector-resources: "false"
spec:
image: debezium-kafka-connect:latest
replicas: 1

bootstrapServers: localhost:9092
config:
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1

您可以使用以下构建映像包含连接器-kafka-

FROM confluentinc/cp-kafka-connect:6.2.0 as cp
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:6.2.0
FROM strimzi/kafka:0.18.0-kafka-2.5.0
USER root:root
RUN yum -y update
RUN yum -y install git
RUN yum -y install wget
RUN wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.6.1.Final/debezium-connector-postgres-1.6.1.Final-plugin.tar.gz
RUN tar xzf debezium-connector-postgres-1.6.1.Final-plugin.tar.gz
RUN mkdir -p /opt/kafka/plugins/debezium && mkdir -p /opt/kafka/plugins/avro/
RUN mv debezium-connector-postgres /opt/kafka/plugins/debezium/

COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-avro-converter/lib /opt/kafka/plugins/avro/
USER 1001

和 Kafka 连接器部署完成后,我们就可以启动连接器了。

curl -X POST -H "Content-Type:application/json" -d @connect-source.json http://localhost:8083/connectors/

以下是设置连接器以生成两个表和的更改日志的示例配置。

-.json 内容如下

{
  "name": "postgres-debezium-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "database",
    "plugin.name": "pgoutput",
    "database.server.name": "postgres",
    "table.include.list": "schema1.table1,schema1.table2",

    "publication.autocreate.mode": "filtered",
    "tombstones.on.delete":"false",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "",
    "slot.name": "pgslot"
  }
}

3.5 胡迪

接下来我们使用 Spark 运行 Hudi,它将从 kafka 中提取变更日志并将其写入 Hudi 表。下面显示了此类命令的示例,因为它适用于数据库。几个关键配置如下:

spark-submit \
  --jars "/home/hadoop/hudi-utilities-bundle_2.12-0.10.0.jar,/usr/lib/spark/external/lib/spark-avro.jar" \
  --master yarn --deploy-mode client \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /home/hadoop/hudi-packages/hudi-utilities-bundle_2.12-0.10.0-SNAPSHOT.jar \
  --table-type COPY_ON_WRITE --op UPSERT \
  --target-base-path s3://bucket_name/path/for/hudi_table1 \
  --target-table hudi_table1  --continuous \
  --min-sync-interval-seconds 60 \

图片[2]-基于Apache Hudi和Debezium构建CDC入湖管道-唐朝资源网

--source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \ --source-ordering-field _event_lsn \ --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \ --hoodie-conf schema.registry.url=https://localhost:8081 \ --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://localhost:8081/subjects/postgres.schema1.table1-value/versions/latest \ --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer \ --hoodie-conf hoodie.deltastreamer.source.kafka.topic=postgres.schema1.table1 \ --hoodie-conf auto.offset.reset=earliest \ --hoodie-conf hoodie.datasource.write.recordkey.field=”database_primary_key” \ --hoodie-conf hoodie.datasource.write.partitionpath.field=partition_key \ --enable-hive-sync \ --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor \ --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \ --hoodie-conf hoodie.datasource.hive_sync.database=default \ --hoodie-conf hoodie.datasource.hive_sync.table=hudi_table1 \ --hoodie-conf hoodie.datasource.hive_sync.partition_fields=partition_key

4.总结

本文介绍了 Hudi 用于将变更日志提取到 Hudi 表中的源。现在可以将数据库数据提取到数据湖中,以提供一种经济高效的方式来存储和分析数据库数据。

关注此 JIRA 以了解有关此新功能的更多信息。

© 版权声明
THE END
喜欢就支持一下吧
点赞128 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片