Flink 动态广播状态（Broadcast State）笔记
1.概要
在做 Flink 任务计算时，我们往往需要动态地匹配规则；但又不能每次修改规则都重新部署服务，因此需要借助 Flink 的广播状态（Broadcast State）机制来实时处理规则的变化。规则流会被广播到下游算子的每个并行实例，并保存在广播状态中，从而随检查点（checkpoint）一起持久化，作业故障恢复后规则依然有效。
2.样例
代码:
package com.kn.broadcast
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
 * Demonstrates Flink broadcast state: a low-volume rule stream (socket port 9001) toggles a
 * "storage check" flag that is applied to every record of the main data stream (port 9000),
 * so rules can change at runtime without redeploying the job.
 *
 * Rule lines have the form `jobName:flagName:value`, e.g. `job1:storageFlag:yes`.
 */
object broadcast01 {

  /** Job name whose rule lines this pipeline reacts to. */
  private val JobName = "job1"

  /** Key under which the latest flag value is stored in the broadcast state. */
  private val FlagKey = "storageFlag"

  /** Flag value assumed until the first rule has been received. */
  private val DefaultFlag = "no"

  /**
   * Descriptor of the broadcast state that holds the current flag. The same descriptor is used
   * both to create the broadcast stream and to access the state inside the process function.
   */
  private val ConfigStateDescriptor = new MapStateDescriptor[String, String](
    "configFilter", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

  /**
   * Extracts the flag value from a rule line.
   *
   * @param line raw rule line, expected as `jobName:flagName:value`
   * @return the trimmed value when the line has at least three `:`-separated fields and its
   *         job name is exactly `JobName`; `None` otherwise, so malformed lines are skipped
   *         instead of throwing and failing the job
   */
  private def parseFlag(line: String): Option[String] =
    line.split(":") match {
      case Array(job, _, value, _*) if job.trim == JobName => Some(value.trim)
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Rule stream, e.g. "job1:storageFlag:yes" (jobName:flagName:value), broadcast to every
    // parallel instance of the downstream operator.
    val broadcastConfig = env.socketTextStream("localhost", 9001)
      .flatMap(line => parseFlag(line).toList)
      .broadcast(ConfigStateDescriptor)

    // Main data stream, connected with the broadcast rules.
    env.socketTextStream("localhost", 9000)
      .connect(broadcastConfig)
      .process(new BroadcastProcessFunction[String, String, String] {

        override def processElement(
            value: String,
            ctx: BroadcastProcessFunction[String, String, String]#ReadOnlyContext,
            out: Collector[String]): Unit = {
          println("processElement function: " + value)
          // Data side has read-only access; get returns null until a rule has arrived.
          val flag = Option(ctx.getBroadcastState(ConfigStateDescriptor).get(FlagKey))
            .getOrElse(DefaultFlag)
          if ("yes".equals(flag)) {
            out.collect("需要检查存储状态,接收消息为:" + value + " flag:" + flag)
          } else {
            out.collect("不需要检查存储状态,接收消息为:" + value + " flag:" + flag)
          }
        }

        // Invoked for each new rule element, once on every parallel instance (the rule stream
        // is broadcast).
        override def processBroadcastElement(
            value: String,
            ctx: BroadcastProcessFunction[String, String, String]#Context,
            out: Collector[String]): Unit = {
          // Keeping the flag in broadcast state (instead of a plain field) includes it in
          // checkpoints, so the latest rule survives failure recovery.
          ctx.getBroadcastState(ConfigStateDescriptor).put(FlagKey, value)
          println("processbroadcastElement function")
        }
      })
      .print()

    env.execute("flink broadcast job")
  }
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。