数据湖有新解!Apache Hudi 与 Apache Flink 集成

 联系我们     |      2020-10-26 04:38

原标题:数据湖有新解!Apache Hudi 与 Apache Flink 集成

简介:纵不悦目大数据周围成熟、活跃、有生命力的框架,无一不是设计优雅,能与其他框架相互融相符,彼此借力,各专所长。

Apache Hudi 是由 Uber 开发并开源的数据湖框架,它于 2019 年 1 月进入 Apache 孵化器孵化,次年 5 月份顺当卒业晋升为 Apache 顶级项现在。是现在最为炎门的数据湖框架之一。

1. 为何要解耦

Hudi 自诞生至今不息操纵 Spark 行为其数据处理引擎。倘若用户想操纵 Hudi 行为其数据湖框架,就必须在其平台技术栈中引入 Spark。放在几年前,操纵 Spark 行为大数据处理引擎能够说是很往往甚至是理所自然的事。由于 Spark 既能够进走批处理也能够操纵微批模拟流,流批一体,一套引擎解决流、批题目。然而,近年来,随着大数据技术的发展,同为大数据处理引擎的 Flink 逐渐进入人们的视野,并在计算引擎周围获占有了必定的市场,大数据处理引擎不再是一家独大。在大数据技术社区、论坛等领地,Hudi 是否声援操纵 Flink 计算引擎的的声音最先逐渐展现,并日渐反复。以是使 Hudi 声援 Flink 引擎是个有价值的事情,而集成 Flink 引擎的前挑是 Hudi 与 Spark 解耦。

同时,纵不悦目大数据周围成熟、活跃、有生命力的框架,无一不是设计优雅,能与其他框架相互融相符,彼此借力,各专所长。因此将 Hudi 与 Spark 解耦,将其变成一个引擎无关的数据湖框架,无疑是给 Hudi 与其他组件的融相符创造了更众的能够,使得 Hudi 能更益的融入大数据生态圈。

2. 解耦难点

Hudi 内部操纵 Spark API 像吾们往往开发操纵 List 相通稀松往往。自从数据源读取数据,到最后写出数据到外,无处不是操纵 Spark RDD 行为主要数据组织,甚至连平庸的工具类,都操纵 Spark API 实现,能够说 Hudi 就是用 Spark 实现的一个通用数据湖框架,它与 Spark 的绑定可谓是深入骨髓。

此外,此次解耦后集成的主要引擎是 Flink。而 Flink 与 Spark 在核心抽象上迥异很大。Spark 认为数据是有界的,其核心抽象是一个有限的数据荟萃。而 Flink 则认为数据的内心是流,其核心抽象 DataStream 中包含的是各栽对数据的操作。同时,Hudi 内部还存在众处同时操作众个 RDD,以及将一个 RDD 的处理效果与另一个 RDD 说相符处理的情况,这栽抽象上的区别以及实眼前对于中心效果的复用,使得 Hudi 在解耦抽象上难以操纵联相符的 API 同时操作 RDD 和 DataStream。

3. 解耦思路

理论上,Hudi 操纵 Spark 行为其计算引擎无非是为了操纵 Spark 的分布式计算能力以及 RDD 雄厚的算子能力。抛开分布式计算能力外,Hudi 更众是把 RDD 行为一个数据组织抽象,而 RDD 内心上又是一个有界数据集,因此,把 RDD 换成 List,在理论上十足可走(自然,能够会捐躯些性能)。为了尽能够保证 Hudi Spark 版本的性能和安详性。吾们能够保留将有界数据集行为基本操作单位的设定,Hudi 主要操作 API 不变,将 RDD 抽取为一个泛型,Spark 引擎实现照样操纵 RDD,其他引擎则按照实际情况操纵 List 或者其他有界数据集。

解耦原则:

1)联相符泛型。Spark API 用到的 JavaRDD,JavaRDD,JavaRDD 联相符操纵泛型 I,K,张志军当选长春市市长(图|简历)O 代替;

2)往 Spark 化。抽象层一切 API 必须与 Spark 无关。涉及到详细操作难以在抽象层实现的,改写为抽象手段,引入 Spark 子类实现。

例如:Hudi 内部众处操纵到了 JavaSparkContext#map() 手段,往 Spark 化,则必要将 JavaSparkContext 暗藏,针对该题目吾们引入了 HoodieEngineContext#map() 手段,该手段会屏蔽 map 的详细实现细节,从而在抽象成实现往 Spark 化。

3)抽象层尽量缩短改动,保证 Hudi 原版功能和性能;

4)操纵 HoodieEngineContext 抽象类替换 JavaSparkContext,挑供运走环境上下文。

4.Flink 集成设计

Hudi 的写操作在内心上是批处理,DeltaStreamer 的不息模式是议决循环进走批处理实现的。为操纵联相符 API,Hudi 集成 Flink 时选择攒一批数据后再进走处理,末了联相符进走挑交(这边 Flink 吾们操纵 List 来攒批数据)。

攒批操作最容易想到的是议决操纵时间窗口来实现,然而,操纵窗口,在某个窗口异国数据流时兴,将异国输出数据,Sink 端难以判定联相符批数据是否已经处理完。因此吾们操纵 Flink 的检查点机制来攒批,每两个 Barrier 之间的数据为一个批次,当某个子义务中异国数据时,mock 效果数据凑数。云云在 Sink 端,当每个子义务都有效果数据下发时即可认为一批数据已经处理完善,能够实走 commit。

DAG 如下:

source 授与 Kafka 数据,转换成 List; InstantGeneratorOperator 生成全局唯一的 instant.当上一个 instant 未完善或者现在批次众数据时,不创建新的 instant; KeyBy partitionPath 按照 partitionPath 分区,避免众个子义务写联相符个分区; WriteProcessOperator 实走写操作,当现在分区众数据时,向下游发送空的效果数据凑数; CommitSink 授与上游义务的计算效果,当收到 parallelism 个效果时,认为上游子义务通盘实走完善,实走 commit.

注:InstantGeneratorOperator 和 WriteProcessOperator 均为自定义的 Flink 算子,InstantGeneratorOperator 会在其内部壅塞检查上一个 instant 的状态,保证全局只有一个 inflight(或 requested)状态的 instant.WriteProcessOperator 是实际实走写操作的地方,其写操作在 checkpoint 时触发。

5. 实现示例

1) HoodieTable

/**

* Abstract implementation of a HoodieTable.

* @param <T> Sub type of HoodieRecordPayload

* @param <I> Type of inputs

* @param <K> Type of keys

* @param <O> Type of outputs

*/

public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implements Serializable {

protected final HoodieWriteConfig config;

protected final HoodieTableMetaClient metaClient;

protected final HoodieIndex<T, I, K, O> index;

public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime,

I records);

public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime,

I records);

public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime,

I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);

}

HoodieTable 是 Hudi 的核心抽象之一,其中定义了外声援的 insert,upsert,bulkInsert 等操作。以 upsert 为例,输入数据由原先的 JavaRDD inputRdds 换成了 I records, 运走时 JavaSparkContext jsc 换成了 HoodieEngineContext context.

从类注解能够望到 T,I,K,O 别离代外了 Hudi 操作的负载数据类型、输入数据类型、主键类型以及输出数据类型。这些泛型将贯穿整个抽象层。

2) HoodieEngineContext

/**

* Base class contains the context information needed by the engine at runtime. It will be extended by different

* engine implementation if needed.

*/

public abstract class HoodieEngineContext {

public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);

public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism);

public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism);

}

HoodieEngineContext 扮演了 JavaSparkContext 的角色,它不光能挑供一切 JavaSparkContext 能挑供的新闻,还封装了 map,flatMap,foreach 等诸众手段,暗藏了 JavaSparkContext#map(),JavaSparkContext#flatMap(),JavaSparkContext#foreach() 等手段的详细实现。

以 map 手段为例,在 Spark 的实现类 HoodieSparkEngineContext 中,map 手段如下:

@Override

public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {

return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect();

}

在操作 List 的引擎中其实现能够为(分别手段需仔细线程坦然题目,慎用 parallel()):

@Override

public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {

return data.stream().parallel().map(func::apply).collect(Collectors.toList());

}

注:map 函数中抛出的变态,能够议决包装 SerializableFunction func 解决.

这边简要介绍下 SerializableFunction:

@FunctionalInterface

public interface SerializableFunction<I, O> extends Serializable {

O apply(I v1) throws Exception;

}

该手段实际上是 java.util.function.Function 的变栽,与java.util.function.Function 分别的是 SerializableFunction 能够序列化,能够抛变态。引入该函数是由于 JavaSparkContext#map() 函数能授与的入参必须可序列,同时在hudi的逻辑中,有众处必要抛变态,而在 Lambda 外达式中进走 try catch 代码会略显肥胖,不太优雅。

6.近况和后续计划

6.1 做事时间轴

2020 年 4 月,T3 出走(杨华@vinoyang,王祥虎@wangxianghu)和阿里巴巴的同学(李少锋@leesf)以及若干其他幼友人一首设计、敲定了该解耦方案;

2020 年 4 月,T3 出走(王祥虎@wangxianghu)在内部完善了编码实现,并进走了初步验证,得出方案可走的结论;

2020 年 7 月,T3 出走(王祥虎@wangxianghu)将该设计实现和基于新抽象实现的 Spark 版本推向社区(HUDI-1089);

2020 年 9 月 26 日,顺丰科技基于 T3 内片面支修改完善的版本在 Apache Flink Meetup(深圳站)公开 PR, 使其成为业界第一个在线上操纵 Flink 将数据写 Hudi 的企业。

2020 年 10 月 2 日,HUDI-1089 相符并入 Hudi 主分支,标志着 Hudi-Spark 解耦完善。

6.2 后续计划

1)推进 Hudi 和 Flink 集成

将 Flink 与 Hudi 的集成尽快推向社区,初期该特性能够只声援 Kafka 数据源。

2)性能优化

为保证 Hudi-Spark 版本的安详性和性能,此次解耦异国太众考虑 Flink 版本能够存在的性能题目。

3)类 flink-connector-hudi 第三方包开发

将 Hudi-Flink 的绑定做成第三方包,用户能够在 Flink 行使中以编码手段读取肆意数据源,议决这个第三方包写入 Hudi。

作者:王祥虎

本文为阿里云原创内容,未经批准不得转载。