自从计算机出现以来,我们一直在努力寻找计算机存储某些信息的方法。存储在计算机上的信息(也称为数据)有多种形式。数据变得如此重要,以至于信息现在是触手可及的商品。多年来,数据以多种方式存储在计算机中,包括数据库、blob 存储和其他方法。为了进行有效的业务分析,现代应用程序产生的数据必须经过处理和分析,而产生的数据量是巨大的!有效地存储 PB 级数据并拥有必要的工具来查询数据以便使用这些数据至关重要,这样对数据的分析才能产生有意义的结果。
大数据是处理分析方法、有条不紊地从中提取信息或以其他方式处理对于典型数据处理应用软件而言太大或太复杂的数据量的学科。为了处理现代应用程序产生的数据,大数据的应用是非常有必要的,考虑到这一点,本博客旨在提供一个小教程,介绍如何创建一个数据湖,从应用程序的数据库中读取任何数据的变化和将其写入数据湖中的相关位置,我们将使用的工具如下:
我们将构建的数据湖架构如下:
第一步是使用读取关系数据库中发生的所有更改并将所有更改推送到Kafka集群。
是一个开源的分布式变更数据捕获平台,可以指向任何关系数据库,并且可以实时开始捕获任何数据变更,速度非常快,功能强大,由 Red Hat 维护。
首先我们将使用 – 在我们的机器上安装 MySQL 和 Kafka,您也可以使用它们的独立安装,我们将使用提供给我们的 mysql 映像,因为它已经包含数据,在适当的 Kafka、MySQL 和集群中可以在具有以下文档的任何生产环境中使用:
version: '2'
services:
zookeeper:
image: debezium/zookeeper:${DEBEZIUM_VERSION}
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: debezium/kafka:${DEBEZIUM_VERSION}
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
mysql:
image: debezium/example-mysql:${DEBEZIUM_VERSION}
ports:
- 3307:3306
environment:
- MYSQL_ROOT_PASSWORD=${MYSQL_ROOT_PASS}
- MYSQL_USER=${MYSQL_USER}
- MYSQL_PASSWORD=${MYSQL_USER_PASS}
schema-registry:
image: confluentinc/cp-schema-registry
ports:
- 8181:8181
- 8081:8081
environment:
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
- SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
- SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
links:
- zookeeper
connect:
image: debezium/connect:${DEBEZIUM_VERSION}
ports:
- 8083:8083
links:
- kafka
- mysql
- schema-registry
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
- VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
- INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
- CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
可以设置为 1.8。还要确保设置 、 和 。
在继续之前,我们将通过输入数据库的命令行来查看镜像为我们提供的数据库结构:
docker-compose -f docker-compose-avro-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
在shell里面,我们可以使用show命令。输出应该是这样的:
我们可以使用 * from 命令查看客户表的内容。输出应如下所示:
现在创建容器后我们就可以激活Kafka的源连接器了,我们将使用的数据格式是Avro数据格式,Avro是项目中开发的面向行的远程过程调用和数据序列化框架。它使用 JSON 来定义数据类型和协议,并以紧凑的二进制格式序列化数据。
让我们用连接器的配置创建另一个文件。
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "MYSQL_USER",
"database.password": "MYSQL_PASSWORD",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
如我们所见,我们在哪里配置了数据库详细信息以及我们要从中读取更改的数据库,请确保将 和 的值设置为您之前配置的值,现在我们将运行一个命令向Kafka注册,命令如下:
curl -i -X POST -H "Accept:application/json" -H "Content-type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
现在,它应该能够从 Kafka 数据库中读取更改了。
下一步涉及使用 Spark 和 Hudi 从 Kafka 读取数据并将其以 Hudi 文件格式放入 Cloud。在开始使用它们之前,让我们先了解一下 Hudi 和 Spark 是什么。
Hudi 是一个开源数据管理框架,用于简化增量数据处理和数据管道开发。该框架更有效地管理数据生命周期等业务需求并提高数据质量。 Hudi 使您能够管理基于云的数据湖上的记录级数据,以简化变更数据捕获 (CDC) 和流式数据摄取,并帮助处理需要记录级更新和删除的数据隐私用例。 Hudi 管理的数据集使用开放存储格式存储在云存储桶中,而与 Hive 和/或 Spark 的集成提供了使用熟悉的工具对更新数据的近乎实时的访问
Spark 是专为大规模数据处理而设计的开源统一分析引擎。 Spark 为集群编程提供了一个具有隐式数据并行性和容错性的接口。 Spark 代码库最初是在加州大学伯克利分校开发的,后来捐赠给了一直在维护它的软件基金会。
目前,由于我们正在云端构建我们的解决方案,因此最好的方法是使用云端。云是一种用于处理大型数据集的托管服务,例如在大数据计划中使用的数据集。是 Cloud 的公共云产品的一部分。帮助用户处理、转换和理解大量数据。
在该实例中,Spark 和所有必需的库都已预安装。创建实例后,我们可以在其中运行以下 Spark 作业来完成我们的管道:
spark-submit
--packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:3.1.2
--master yarn --deploy-mode client
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hadoop/hudi-packages/hudi-utilities-bundle_2.12-0.10.1.jar
--table-type COPY_ON_WRITE --op UPSERT
--target-base-path gs://your-data-lake-bucket/hudi/customers
--target-table hudi_customers --continuous
--min-sync-interval-seconds 60
--source-class org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource
--source-ordering-field _event_origin_ts_ms
--hoodie-conf schema.registry.url=http://localhost:8081
--hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/latest
--hoodie-conf hoodie.deltastreamer.source.kafka.topic=dbserver1.inventory.customers
--hoodie-conf bootstrap.servers=localhost:9092
--hoodie-conf auto.offset.reset=earliest
--hoodie-conf hoodie.datasource.write.recordkey.field=id
--hoodie-conf hoodie.datasource.write.partitionpath.field=id
这将运行一个 spark 作业,该作业从我们之前的 Get data from Kafka 推送并将其写入 Cloud。我们必须指定Kafka主题、URL和其他相关配置。
结论
构建数据湖的方法有很多种。我正在尝试展示如何使用 Kafka、Hudi、Spark 和 Cloud 构建数据湖。通过这样的设置,可以轻松扩展管道以管理大型数据工作负载!有关每种技术的更多详细信息,可以访问文档。可以自定义 Spark 作业以进行更细粒度的控制。此处显示的 Hudi 也可以与 Hive 或 Trino 集成。定制的数量是无穷无尽的。本文提供了如何使用上述工具构建基本数据管道的基本介绍!
暂无评论内容