微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink实例三十一:状态管理二自定义键控状态一ValueState

 

ValueState[T]保存单个的值,值的类型为T。

  • get操作: ValueState.value()
  • set操作: ValueState.update(value: T)

实例一

scala version

复制代码

val sensorData: DataStream[SensorReading] = ...
val keyedData: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id)

val alerts: DataStream[(String, Double, Double)] = keyedData
  .flatMap(new TemperatureAlertFunction(1.7))

class TemperatureAlertFunction(val threshold: Double)
  extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
  private var lastTempState: ValueState[Double] = _

  override def open(parameters: Configuration): Unit = {
    val lastTempDescriptor = new ValueStateDescriptor[Double](
      "lastTemp", classOf[Double])

    lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor)
  }

  override def flatMap(
    reading: SensorReading,
    out: Collector[(String, Double, Double)]
  ): Unit = {
    val lastTemp = lastTempState.value()
    val tempDiff = (reading.temperature - lastTemp).abs
    if (tempDiff > threshold) {
      out.collect((reading.id, reading.temperature, tempDiff))
    }
    this.lastTempState.update(reading.temperature)
  }
}

复制代码

上面例子中的FlatMapFunction只能访问当前处理的元素所包含的key所对应的状态变量。

不同key对应的keyed state是相互隔离的。

  • 通过RuntimeContext注册StateDescriptor。StateDescriptor以状态state的名字和存储的数据类型为参数。数据类型必须指定,因为Flink需要选择合适的序列化器。
  • open()方法中创建state变量。注意复习之前的RichFunction相关知识。

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐