Flink 编程基本步骤：
1. 创建流执行环境：通过 StreamExecutionEnvironment.getExecutionEnvironment() 获取流执行环境。
2. 加载数据源 Source
3. 转换操作 Transformation
4. 输出结果 Sink（例如 print / printToErr）
5. 执行流环境 env.execute()
关于 Flink 数据的基本操作 —— 四种分类
- 单条数据的操作：map、filter
- 多条数据的操作：window
- 多个流合并成一个流的操作：connect、union、join
- 一个流拆分成多个流的操作：split（新版本已废弃）/ 侧输出流 side output
Flink 输入数据源 Source
自带的预定义 Source
- 基于本地集合的 Source
  - 应用场景：程序写完之后，用来测试当前功能是否可用，主要用于开发测试。
  - 从元素创建：fromElements
  - 从集合创建：fromCollection
  - 基于序列：generateSequence（新版本中已废弃，建议改用 fromSequence）
  - 基于开始值和结束值生成 DataStream：fromSequence
  - 并行度设置
- 基于文件的 Source
- 基于 Socket 的 Source
// Socket source: read text from host "node1" on port 9999.
env.socketTextStream("node1", 9999);
自定义Source
package cn.itcast.flink.api;
import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
/**
 * Demo of a hand-written, non-parallel Flink source.
 *
 * <p>{@link OrderEmitSource} emits one random {@link Order} per second. Each order carries an
 * order id, user id, amount, timestamp and formatted date-time. The job prints every order to
 * stderr.
 *
 * <p>Author itcast, 2021/11/29 14:56.
 */
public class OrderSource {

    public static void main(String[] args) throws Exception {
        // Execution environment; every operator runs with a single instance.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Attach the custom source and send each emitted order to stderr.
        final DataStreamSource<Order> orders = env.addSource(new OrderEmitSource());
        orders.printToErr();

        env.execute();
    }

    /** Source that keeps producing random orders until {@link #cancel()} clears the run flag. */
    public static class OrderEmitSource implements SourceFunction<Order> {

        /** Loop flag read by run(); volatile so a cancel() from another thread is observed. */
        private volatile boolean isRunning = true;

        /**
         * Emits one freshly generated order through {@code ctx}, then waits one second, repeating
         * while the source has not been cancelled.
         *
         * @param ctx Flink context used to emit records
         * @throws Exception if emitting fails or the sleep is interrupted
         */
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            final Random random = new Random();
            final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            while (isRunning) {
                ctx.collect(nextOrder(random, formatter));
                // Throttle to one order per second.
                TimeUnit.SECONDS.sleep(1);
            }
        }

        /**
         * Builds one random order: UUID order id, user id in [0, 5], amount in [0, 100), and the
         * current wall-clock time both as epoch millis and as formatted text.
         */
        private static Order nextOrder(Random random, SimpleDateFormat formatter) {
            final String orderId = UUID.randomUUID().toString();
            final int userId = random.nextInt(6);
            final double amount = random.nextDouble() * 100;
            final long now = System.currentTimeMillis();
            return new Order(orderId, userId, amount, now, formatter.format(now));
        }

        /** Stops the generation loop in {@link #run} after its current iteration. */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    /** Order payload; Lombok generates accessors, equals/hashCode/toString and both constructors. */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String oid;
        private int uid;
        private double money;
        private long timestamp;
        private String datetime;
    }
}
实现 ParallelSourceFunction 接口的案例
并行生成数据：在 Source 算子上通过 setParallelism(n) 设置并行度。
package cn.itcast.flink.api;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.sourceFunction;
import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * Demo of a parallel Flink source built on {@link ParallelSourceFunction}.
 *
 * <p>Every parallel instance of {@link OrderEmitSource} emits one random {@link Order} per second.
 * Each order carries an order id, user id, amount, timestamp and formatted date-time. The source
 * operator runs with {@link #SOURCE_PARALLELISM} instances, while the rest of the job uses
 * parallelism 1. Every order is printed to stderr.
 *
 * <p>Author itcast, 2021/11/29 14:56.
 */
public class OrderParallelismSource {

    /** Parallelism of the source operator, i.e. how many generator instances run concurrently. */
    private static final int SOURCE_PARALLELISM = 6;

    public static void main(String[] args) throws Exception {
        // 1. Create the stream execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Default parallelism for operators that do not override it (here: the print sink).
        env.setParallelism(1);
        // 3. Custom parallel source; it overrides the default parallelism set above.
        DataStreamSource<Order> source =
                env.addSource(new OrderEmitSource()).setParallelism(SOURCE_PARALLELISM);
        // 4. Print every order to stderr.
        source.printToErr();
        // 5. Execute the job.
        env.execute();
    }

    /**
     * Parallel source: each instance independently generates one random order per second until it
     * is cancelled.
     */
    public static class OrderEmitSource implements ParallelSourceFunction<Order> {

        /** Pause between two orders emitted by the same source instance, in seconds. */
        private static final long EMIT_INTERVAL_SECONDS = 1;

        /** Loop flag read by run(); volatile so a cancel() from another thread is observed. */
        private volatile boolean isRunning = true;

        /**
         * Generates orders and emits them through {@code ctx} while the source is running.
         *
         * @param ctx Flink context used to emit records
         * @throws Exception if emitting fails or the sleep is interrupted
         */
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            Random rm = new Random();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            while (isRunning) {
                // Random UUID as the order id.
                String oid = UUID.randomUUID().toString();
                // User id in [0, 5].
                int uid = rm.nextInt(6);
                // Amount in [0, 100).
                double money = rm.nextDouble() * 100;
                long timestamp = System.currentTimeMillis();
                String datetime = sdf.format(timestamp);
                ctx.collect(new Order(oid, uid, money, timestamp, datetime));
                // One order per second per instance, as documented for this demo.
                TimeUnit.SECONDS.sleep(EMIT_INTERVAL_SECONDS);
            }
        }

        /** Stops the generation loop in {@link #run} after its current iteration. */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    /** Order payload; Lombok generates accessors, equals/hashCode/toString and both constructors. */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String oid;
        private int uid;
        private double money;
        private long timestamp;
        private String datetime;
    }
}
实现 RichSourceFunction 的案例（读取 MySQL 数据）
1:准备 MySQL 数据库和数据表
-- Create the database. The Flink job UserSource connects to database "flink" (see its JDBC URL),
-- so the table must live there. IF NOT EXISTS keeps the script re-runnable.
CREATE DATABASE IF NOT EXISTS flink;
-- Switch to the database.
USE flink;
-- Create the table and load the sample data.
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
    `id` int(11) NOT NULL,
    `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES (10, 'dazhuang', '123456', '大壮');
INSERT INTO `user` VALUES (11, 'erya', '123456', '二丫');
INSERT INTO `user` VALUES (12, 'sanpang', '123456', '三胖');
SET FOREIGN_KEY_CHECKS = 1;
2:Flink 读取 MySQL 的数据源
package cn.itcast.flink.source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
/**
 * Reads the {@code user} table from MySQL with a custom {@link RichSourceFunction} and prints every
 * row to stderr.
 *
 * <p>Steps:
 * <ol>
 *   <li>Create and populate the database and table (database {@code flink}).</li>
 *   <li>Obtain the stream execution environment.</li>
 *   <li>Set the parallelism.</li>
 *   <li>Add a custom source that reads from MySQL by extending RichSourceFunction. "Rich" adds
 *       open/close/getRuntimeContext:
 *       <ul>
 *         <li>open: create the connection and the statement;</li>
 *         <li>run: query the table and map each row to a {@link User};</li>
 *         <li>close: release the statement and the connection.</li>
 *       </ul></li>
 *   <li>Print the results.</li>
 *   <li>Execute the job.</li>
 * </ol>
 *
 * <p>Author itcast, 2022/1/11 16:17.
 */
public class UserSource {
    public static void main(String[] args) throws Exception {
        // 2. Obtain the stream execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 3. Set the parallelism.
        env.setParallelism(1);
        // 4. Custom source that periodically reads the whole user table from MySQL.
        DataStreamSource<User> source = env.addSource(new RichSourceFunction<User>() {
            // JDBC resources are created in open() and released in close(); never serialized.
            private transient Connection conn;
            private transient Statement statement;
            // volatile: cancel() clears this flag while run() is looping on it.
            private volatile boolean isRunning = true;

            /**
             * Initialization performed before run(): loads the MySQL driver, opens the connection
             * and creates the statement used for polling.
             *
             * @param parameters Flink configuration (unused)
             * @throws Exception if the driver cannot be loaded or the connection fails
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                // Class names are case-sensitive; the driver class is com.mysql.jdbc.Driver.
                Class.forName("com.mysql.jdbc.Driver");
                conn = DriverManager.getConnection(
                        "jdbc:mysql://node1:3306/flink?useSSL=false",
                        "root",
                        "123456"
                );
                statement = conn.createStatement();
            }

            /**
             * Re-reads the whole table every 5 minutes and emits one {@link User} per row.
             *
             * @param ctx Flink context used to emit records
             * @throws Exception on SQL errors or if the sleep is interrupted
             */
            @Override
            public void run(SourceContext<User> ctx) throws Exception {
                String sql = "select id,username,password,name from user";
                while (isRunning) {
                    // try-with-resources releases each poll's ResultSet before the next poll.
                    try (ResultSet rs = statement.executeQuery(sql)) {
                        while (rs.next()) {
                            User user = new User();
                            user.setId(rs.getInt("id"));
                            user.setUsername(rs.getString("username"));
                            user.setPassword(rs.getString("password"));
                            user.setName(rs.getString("name"));
                            ctx.collect(user);
                        }
                    }
                    TimeUnit.MINUTES.sleep(5);
                }
            }

            /** Stops the polling loop in run() after its current iteration. */
            @Override
            public void cancel() {
                isRunning = false;
            }

            /**
             * Cleanup after the source finishes: closes the statement, then the connection.
             *
             * @throws Exception if closing a JDBC resource fails
             */
            @Override
            public void close() throws Exception {
                // open() may have failed part-way, so either resource can still be null.
                // finally: the connection is closed even if closing the statement throws.
                try {
                    if (statement != null) {
                        statement.close();
                    }
                } finally {
                    if (conn != null) {
                        conn.close();
                    }
                }
            }
        });
        // 5. Print the results.
        source.printToErr();
        // 6. Execute the job.
        env.execute();
    }

    /**
     * One row of the {@code user} table. Has an implicit public no-arg constructor and a
     * getter/setter for every field.
     */
    public static class User {
        private int id;
        private String username;
        private String password;
        private String name;

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getUsername() {
            return username;
        }

        public void setUsername(String username) {
            this.username = username;
        }

        /** Bean-style getter for {@code password}. */
        public String getPassword() {
            return password;
        }

        /**
         * Misspelled accessor kept so existing callers still compile.
         *
         * @deprecated use {@link #getPassword()} instead.
         */
        @Deprecated
        public String getpassword() {
            return getPassword();
        }

        public void setPassword(String password) {
            this.password = password;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", username='" + username + '\'' +
                    ", password='" + password + '\'' +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。