2. Flink 的 DataSource 数据源
4) 自定义 Source
当然也可以自定义数据源,有两种方式实现:
- 通过实现 SourceFunction 接口来自定义无并行度(也就是并行度只能为 1)的 Source。
- 通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 来自定义有并行度的数据源。
代码示例:
package com.it.flink.source import java.util.Properties import org.apache.flink.api.common.typeinfo.Type@R_857_4045@ion import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTuple2Type@R_857_4045@ion, createType@R_857_4045@ion} import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer object SourceFromKafkaBykeyvalue { def main(args: Array[String]): Unit = { // 1. 初始化流计算的环境 val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.setParallelism(1) val properties: Properties = new Properties() properties.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092") properties.setProperty("group.id", "fink02") properties.setProperty("key.deserializer", classOf[StringDeserializer].getName) properties.setProperty("value.deserializer", classOf[StringDeserializer].getName) properties.setProperty("auto.offset.reset", "latest") val stream: DataStream[(String, String)] = streamEnv.addSource( new FlinkKafkaConsumer[(String, String)]("topic2", new MyKafkaReader, properties)) stream.print() streamEnv.execute("SourceFromKafkaBykeyvalue") } } class MyKafkaReader extends KafkaDeserializationSchema[(String, String)] { override def isEndOfStream(t: (String, String)): Boolean = false override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = { if (consumerRecord == null) { return ("null", "null") } var key = "" var value = "" if (consumerRecord.key() != null) { key = new String(consumerRecord.key(), "UTF-8") } if (consumerRecord.value() != null) { value = new String(consumerRecord.value(), "UTF-8") } (key, value) } override def getProducedType: Type@R_857_4045@ion[(String, String)] = { createTuple2Type@R_857_4045@ion(createType@R_857_4045@ion[String], createType@R_857_4045@ion[String]) } }
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。