微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

flink 动态广播状态

flink 动态广播状态笔记

1.概要

往往我们在做flink任务计算的时候,需要动态的匹配规则,但是我们又不能每次修改都去重新部署服务,所以我们这里需要借助于flink的动态广播状态机制,来实时的处理我们的规则变化

2.样例

本实例,我们做的是增加动态规则判断,根据实时规则变化来判断是否需要做外部存储校验

代码

package com.kn.broadcast

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.broadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object broadcast01 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //设置自定义广播流:定义广播规则 job1:storageFlag:yes   (jobname:存储flag:开关value)
   val broadcastConfig = env.socketTextStream("localhost", 9001)
     .filter(_.contains("job1"))
     .map(_.split(":")(2))  //获取flag值
      .broadcast(new MapStateDescriptor("configFilter",BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO))

    //定义数据集
    env.socketTextStream("localhost",9000)
      .connect(broadcastConfig) // 并关联广播状态数据
      .process(new broadcastProcessFunction[String,String,String] {
        //定义拦截规则
        var flag:String = _

        override def open(parameters: Configuration): Unit = {
          println("open function")
          flag="no"
          super.open(parameters)
//          getRuntimeContext.getbroadcastvariable("configFilter").toArray.foreach(println(_))
        }

        override def processElement(value: String, ctx: broadcastProcessFunction[String, String, String]#ReadOnlyContext, out: Collector[String]): Unit = {
          println("processElement function: "+value)
          if("yes".equals(flag)){
            out.collect("需要检查存储状态,接收消息为:"+value+" flag:"+flag)
          }else{
            out.collect("不需要检查存储状态,接收消息为:"+value+" flag:"+flag)
          }
        }

        //获取广播更新变量,广播中有新增流数据才会执行,根据并行度次数执行
        override def processbroadcastElement(value: String, ctx: broadcastProcessFunction[String, String, String]#Context, out: Collector[String]): Unit = {
          flag = value
          println("processbroadcastElement function")
        }
      }).print()

    env.execute("flink broadcast job")
  }

}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐