重磅!Vertica集成Apache Hudi指南

1.总结

本文演示了如何使用与 Hudi 的外部表集成。在演示中,我们使用 Hudi on Spark 将数据摄取到 S3 并使用外部表来访问这些数据。

2.Hudi 简介

Hudi 是一种变更数据捕获 (CDC) 工具,可跨不同时间线在表中记录事务。 Hudi 代表并且是一个开源框架。 Hudi 提供 ACID 事务、可扩展的元数据处理,并统一流式和批处理数据处理。

以下流程图说明了该过程。使用安装在 Spark 上的 Hudi 将数据处理到 S3,并从外部表中读取 S3 中的数据更改。

3. 环境准备就绪,可以在数据库中运行以下命令设置访问的S3参数:

SELECT SET_CONFIG_PARAMETER('AWSAuth', 'accesskey:secretkey');
SELECT SET_CONFIG_PARAMETER('AWSRegion','us-east-1');
SELECT SET_CONFIG_PARAMETER('AWSEndpoint',’:9000');
SELECT SET_CONFIG_PARAMETER('AWSEnableHttps','0');

可能会有所不同,具体取决于为 S3 存储桶位置选择的 S3 对象存储。

4.与 Hudi 集成

要与 Hudi 集成,您首先需要将 Spark 与 Hudi 集成,配置 jars,并访问 AWS S3 连接。二是连接胡迪。然后对 S3 存储桶执行 、 、 等操作。

按照以下部分中的步骤将数据写入 .

在 Spark 上配置 Hudi 和 AWS S3

配置并与 Hudi 集成

4.1 在 Spark 上配置 Hudi 和 AWS S3

在 Spark 机器上运行以下命令。

这将下载Hudi包,配置jar文件,以及AWS S3读取、写入等所需的包。

/opt/spark/bin/spark-shell 
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"--packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.0.1

导入 Hudi:

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

根据需要使用以下命令配置 Minio 访问密钥、密钥和其他 S3A 算法和路径。

spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "*****")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "*****")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "http://XXXX.9000")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")
sc.hadoopConfiguration.set("fs.s3a.signing-algorithm","S3SignerType")

创建变量来存储 MinIO 的表名和 S3 路径。

val tableName = “Trips”
val basepath = “s3a://apachehudi/vertica/”

准备数据,使用Scala在spark中创建示例数据

val df = Seq(
("aaa","r1","d1",10,"US","20211001"),
("bbb","r2","d2",20,"Europe","20211002"),
("ccc","r3","d3",30,"India","20211003"),
("ddd","r4","d4",40,"Europe","20211004"),
("eee","r5","d5",50,"India","20211005"),
).toDF("uuid", "rider", "driver","fare","partitionpath","ts")

将数据写入 AWS S3 并验证此数据

df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)

使用 Scala 运行以下命令,验证是否从 S3 存储桶中正确读取数据。

spark.read.format("hudi").load(basePath).createOrReplaceTempView("dta")
spark.sql("select _hoodie_commit_time, uuid, rider, driver, fare,ts, partitionpath from  dta order by uuid").show()

4.2 配置和HUDI集成

in 创建一个外部表,其中包含 S3 上 Hudi 表中的数据。我们创建了“”表。

CREATE EXTERNAL TABLE Trips
(
_hoodie_commit_time TimestampTz,
uuid varchar,
rider varchar,
driver varchar,
fare int,
ts varchar,
partitionpath varchar
)
AS COPY FROM
's3a://apachehudi/parquet/vertica/*/*.parquet' PARQUET;

运行以下命令验证是否正在读取外部表:

< @4.3 如何查看更改的数据

以下部分包含为查看更改的数据而执行的一些操作的示例。

4.3.1 写入数据

在本例中,我们使用 Scala 在 spark 中运行以下命令并附加一些数据:

val df2 = Seq(
("fff","r6","d6",50,"India","20211005")
).toDF("uuid", "rider", "driver","fare","partitionpath","ts")

通过运行以下命令将此数据附加到 S3 上的 Hudi 表:

df2.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)

4.3.2 更新数据

在本例中,我们更新 Hudi 表中的一条记录。需要导入数据来触发和更新数据:

val df3 = Seq(
("aaa","r1","d1",100,"US","20211001"),
("eee","r5","d5",500,"India","20211001")
).toDF("uuid", "rider", "driver","fare","partitionpath","ts")

运行以下命令将数据更新到 S3 上的 HUDI 表中:

df3.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)

以下是spark.sql的输出:

以下是输出:

4.3.3 创建和查看数据的历史快照

执行以下指向特定时间戳的 spark 命令:

val dd = spark.read
.format("hudi")
.option("as.of.instant", "20211007092600")
.load(basePath)				

使用以下方法将数据写入 S3:

dd.write.parquet("s3a://apachehudi/parquet/p2")

在本例中,我们读取 Hudi 表截至日期 ‘200’ 的快照。

dd.show

通过在文件上创建外部表来执行命令。

© 版权声明
THE END
喜欢就支持一下吧
点赞75赞赏 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容