微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink 流处理 UDF：自定义标量函数（ScalarFunction）示例

package UDFTestScalar

import ceshi.SensorReading
import org.apache.flink.streaming.api._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.functions._
import org.apache.flink.types.Row

/**
 * Demonstrates calling a user-defined scalar function ([[HashCode]]) from Flink SQL
 * over an event-time stream of sensor readings read from a text file.
 */
object ScalarFunctionTest {

  /** Input file used when no path is passed as the first program argument. */
  private val DefaultFilePath = "C:\\代码\\myflink\\src\\main\\resources\\sensor"

  /**
   * Entry point: builds the streaming job and runs it.
   *
   * @param args optional; when present, `args(0)` overrides the input file path
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val blinkStreamSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, blinkStreamSettings)

    // Each input line is "<id> <temperature> <epoch seconds>", e.g.
    //   aa 10 1547718199
    //   aa 11 1547718200
    val filePath = args.headOption.getOrElse(DefaultFilePath)

    val dataStream = env.readTextFile(filePath)
      // Skip blank lines (e.g. a trailing newline) instead of failing to index into them.
      .filter(_.trim.nonEmpty)
      .map { line =>
        // Split on any run of whitespace so repeated spaces/tabs do not yield empty fields.
        val fields = line.trim.split("\\s+")
        SensorReading(fields(0), fields(1).toDouble, fields(2).toLong)
      }
      // The watermark trails the highest timestamp seen so far by 1s, so records that
      // arrive up to 1s out of order are still treated as on time.
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
          // The file stores seconds; Flink event time is expressed in milliseconds.
          override def extractTimestamp(element: SensorReading): Long =
            element.shiJianChuo * 1000L
        })

    // Expose shiJianChuo as the event-time (rowtime) attribute, renamed to ts.
    val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'wendu, 'shiJianChuo.rowtime() as 'ts)

    // Apply the custom hash function to id (multiplier 23).
    val hashCode1 = new HashCode(23)

    // Option 1: Table API
    //    val resultTable1 = sensorTable
    //      .select('id, 'ts, hashCode1('id))
    //    resultTable1.toAppendStream[Row].print("table api")

    // Option 2: SQL — register the table and the function by name, then query.
    tableEnv.createTemporaryView("sensor", sensorTable)
    tableEnv.registerFunction("hashCode1", hashCode1)
    val resultTable2 = tableEnv.sqlQuery(
      """
        |select
        |id,
        |ts,
        |hashCode1(id)
        |from sensor
        |""".stripMargin
    )
    resultTable2.toAppendStream[Row].print("sql api")

    env.execute("table execute")
  }

}

// 自定义标量函数 factor:随机数因子  s:输入元素
/**
 * Scalar UDF that derives an integer code from a string's `hashCode`.
 *
 * @param factor fixed multiplier applied to the string's hash code
 *               (a constructor argument, not a random value)
 */
class HashCode(factor: Int) extends ScalarFunction {

  /**
   * Invoked by Flink once per row.
   *
   * @param s the input string value
   * @return `s.hashCode * factor - 10000`, using 32-bit Int arithmetic (wraps on overflow)
   */
  def eval(s: String): Int = {
    // NOTE(review): a null `s` throws NullPointerException here — confirm the column is never null.
    val scaled = factor * s.hashCode
    scaled - 10000
  }
}

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐