1. Introduction
This post records problems encountered while learning Apache Flink, focusing on the basic steps for querying MySQL through Flink's SQL API.
Main contents:
The pom file, where ${flink.version} is 1.13.0:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- Flink-to-Kafka connector, imported directly -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Provides the Table API (Java version) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Enables local execution -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.13.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- JDBC connector, used to connect to databases -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.13</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
2. Preparation
1. Add the Flink-to-MySQL JDBC connector (import the Maven dependency manually): see the official documentation.
2. Prepare a database named flink_test and create a table named t_user in it.
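As a sketch, the t_user table can be created in MySQL with a schema mirroring the Flink DDL used in the demo below; the exact column types and the PRIMARY KEY here are assumptions inferred from that DDL:

```sql
-- Assumed MySQL schema, mirroring the columns declared in the Flink DDL
CREATE DATABASE IF NOT EXISTS flink_test;
USE flink_test;

CREATE TABLE t_user (
    id        INT PRIMARY KEY,
    name      VARCHAR(50),
    age       INT,
    score     DOUBLE,
    className VARCHAR(50)
);
```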
3. Main demo
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

/**
 * @author hy
 * @createTime 2021-06-12 13:56:26
 * @description Basic test of the Table API. Note that the corresponding Maven
 *              dependencies (see the official docs) must be imported, otherwise
 *              compilation fails.
 */
public class TableApiTest {
    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        // Fails to start with useAnyPlanner:
        // EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useAnyPlanner().inStreamingMode().build();
        // Fails to start with useOldPlanner:
        // EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        // Starts with useBlinkPlanner, though it may still report:
        // MiniCluster is not yet running or has already been shut down.
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
        // Register a table backed by MySQL via the JDBC connector
        tableEnv.executeSql("CREATE TABLE t_user (\n" +
                "  id INT,\n" +
                "  name VARCHAR(50),\n" +
                "  age INT,\n" +
                "  score DOUBLE,\n" +
                "  className VARCHAR(50)\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/flink_test?serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8',\n" +
                "  'table-name' = 't_user',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'root'\n" +
                ")");
        Table sqlQuery = tableEnv.sqlQuery("select * from t_user where name = '张三'");
        TableResult tableResult = sqlQuery.execute();
        CloseableIterator<Row> collect = tableResult.collect();
        while (collect.hasNext()) {
            Row row = collect.next();
            System.out.println(row);
        }
    }
}
Execution result: the matching rows are printed to the console.
The key point is that the CREATE TABLE statement must include a WITH clause specifying the connector type and the other connection properties.
4. Common errors
Error 1: Partial inserts are not supported.
For example, only insert into t_user(id,name,age,score,className) values(1,'张三',18,55.5,'201'),(2,'李四',22,59.5,'202') works;
insert into t_user values(1,'张三',18,55.5,'201'),(2,'李四',22,59.5,'202') cannot be used. In other words, you cannot manually choose which columns to insert.
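Written out as plain SQL against the same t_user table, the two statement forms above look like this (the first is accepted, the second is the one that fails):

```sql
-- Accepted: every column of t_user is listed explicitly
INSERT INTO t_user (id, name, age, score, className)
VALUES (1, '张三', 18, 55.5, '201'),
       (2, '李四', 22, 59.5, '202');

-- Rejected with "Partial inserts are not supported":
-- INSERT INTO t_user
-- VALUES (1, '张三', 18, 55.5, '201'), (2, '李四', 22, 59.5, '202');
```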
Error 2: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
This mainly occurs when running in local mode; calling useBlinkPlanner() when building the environment settings resolves it.
With the current Flink version, useBlinkPlanner allows connecting and executing SQL API operations locally, while useAnyPlanner and useOldPlanner do not and lead to error 2.
5. Summary
1. Although Flink provides SQL API connectors for operating on various databases, local testing only works with the Blink planner.
2. The SQL API still requires a schema; without one the operations cannot be executed.