微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink实例118:flink-sql使用二十四一文了解基于Flink构建流批一体数仓的技术点一

来源:https://mp.weixin.qq.com/s/ECe_bn9HzFzXTlfEnAaLBg

0 背景

基于Flink构建流批一体的实时数仓是目前数据仓库领域比较火的实践方案。随着Flink的不断迭代,其提供的一系列技术特性使得用户构建流批一体的应用变得越来越方便。本文将以Flink1.12为例,一一介绍这些特性的基本使用方式,主要包括以下内容

  • Flink集成Hive
  • Hive Catalog与Hive Dialect
  • Flink读写Hive
  • Flink upsert-kafka连接器
  • Flink CDC的connector

Flink集成Hive

  使用Hive构建数据仓库已经成为了比较普遍的一种解决方案。目前,一些比较常见的大数据处理引擎,都无一例外兼容Hive。Flink从1.9开始支持集成Hive,不过1.9版本为beta版,不推荐在生产环境中使用。在Flink1.10版本中,标志着对 Blink的整合宣告完成,对 Hive 的集成也达到了生产级别的要求。值得注意的是,不同版本的Flink对于Hive的集成有所差异,本文将以最新的Flink1.12版本为例,阐述Flink集成Hive的简单步骤,以下是全文,希望对你有所帮助。

1 Flink集成Hive的基本方式

Flink 与 Hive 的集成主要体现在以下两个方面:

  • 持久化元数据

  Flink利用 Hive 的 metastore 作为持久化的 Catalog,我们可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive metastore 中。例如,我们可以使用HiveCatalog将其 Kafka的数据源表存储在 Hive metastore 中,这样该表的元数据信息会被持久化到Hive的metastore对应的元数据库中,在后续的 sql 查询中,我们可以重复使用它们。

  • 利用 Flink 来读写 Hive 的表。

  Flink打通了与Hive的集成,如同使用Sparksql或者Impala操作Hive中的数据一样,我们可以使用Flink直接读写Hive中的表。

HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以”开箱即用”的访问其已有的 Hive表。不需要修改现有的 Hive metastore,也不需要更改表的数据位置或分区。

2 Flink集成Hive的步骤

Flink支持的Hive版本

大版本V1V2V3V4V5V6V7
1.0 1.0.0 1.0.1          
1.1 1.1.0 1.1.1          
1.2 1.2.0 1.2.1 1.2.2        
2.0 2.0.0 2.0.1          
2.1 2.1.0 2.1.1          
2.2 2.2.0            
2.3 2.3.0 2.3.1 2.3.2 2.3.3 2.3.4 2.3.5 2.3.6
3.1 3.1.0 3.1.1 3.1.2      

值得注意的是,对于不同的Hive版本,可能在功能方面有所差异,这些差异取决于你使用的Hive版本,而不取决于Flink,一些版本的功能差异如下:

  • Hive 内置函数在使用 Hive-1.2.0 及更高版本时支持
  • 列约束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本时支持
  • 更改表的统计信息,在使用 Hive-1.2.0 及更高版本时支持
  • DATE统计信息,在使用 Hive-1.2.0 及更高版时支持
  • 使用 Hive-2.0.x 版本时不支持写入 ORC 表。

依赖项

  本文以Flink1.12为例,集成的Hive版本为Hive2.3.4。集成Hive需要额外添加一些依赖jar包,并将其放置在Flink安装目录下的lib文件夹下,这样我们才能通过 Table API 或 sql Client 与 Hive 进行交互。

  另外,Apache Hive 是基于 Hadoop 之上构建的, 所以还需要 Hadoop 的依赖,配置好HADOOP_CLAsspATH即可。这一点非常重要,否则在使用Flinksql Cli查询Hive中的表时,会报如下错误

java.lang.classNotFoundException: org.apache.hadoop.mapred.JobConf

配置HADOOP_CLAsspATH,需要在/etc/profile文件中配置如下的环境变量

export HADOOP_CLAsspATH=`hadoop classpath`

  Flink官网提供了两种方式添加Hive的依赖项。第一种是使用 Flink 提供的 Hive jar包(根据使用的 metastore 的版本来选择对应的 Hive jar),建议优先使用Flink提供的Hive jar包,这种方式比较简单方便。本文使用的就是此种方式。当然,如果你使用的Hive版本与Flink提供的Hive jar包兼容的版本不一致,你可以选择第二种方式,即别添加每个所需的 jar 包。

  下面列举了可用的jar包及其适用的Hive版本,我们可以根据使用的Hive版本,下载对应的jar包即可。比如本文使用的Hive版本为Hive2.3.4,所以只需要下载flink-sql-connector-hive-2.3.6即可,并将其放置在Flink安装目录的lib文件夹下。

metastore versionMaven dependencysql Client JAR
1.0.0 ~ 1.2.2 flink-sql-connector-hive-1.2.2 Download
2.0.0 ~2.2.0 flink-sql-connector-hive-2.2.0 Download
2.3.0 ~2.3.6 flink-sql-connector-hive-2.3.6 Download
3.0.0 ~ 3.1.2 flink-sql-connector-hive-3.1.2 Download

  上面列举的jar包,是我们在使用Flink sql Cli所需要的jar包,除此之外,根据不同的Hive版本,还需要添加如下jar包。以Hive2.3.4为例,除了上面的一个jar包之外,还需要添加下面两个jar包

  flink-connector-hive_2.11-1.12.0.jarhive-exec-2.3.4.jar。其中hive-exec-2.3.4.jar包存在于Hive安装路径下的lib文件夹。flink-connector-hive_2.11-1.12.0.jar的下载地址为:

https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.11/1.12.0/

NOTE:black_nib::Flink1.12集成Hive只需要添加如下三个jar包,以Hive2.3.4为例,分别为:

flink-sql-connector-hive-2.3.6

flink-connector-hive_2.11-1.12.0.jar

hive-exec-2.3.4.jar

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐