
Learning Flink from Scratch (2): What Data Sources Can Flink Use?

1. File as a data source

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment
        .getExecutionEnvironment();
// read a local text file as the data source
DataStreamSource<String> st = executionEnvironment.readTextFile("C:\\2.txt");
st.print();
executionEnvironment.execute();
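readTextFile works with any file system Flink supports, so the same call can also point at HDFS. The snippet below is only an illustrative variation: the namenode address and path are placeholders, and it assumes the Hadoop file system dependency is on the classpath.

// a minimal variation of the example above, with a placeholder HDFS address and path
DataStreamSource<String> hdfsStream =
        executionEnvironment.readTextFile("hdfs://namenode-host:8020/data/2.txt");
hdfsStream.print();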

2. fromElements: reading from elements

// mixing types fails: all elements passed to fromElements must share the same type
// DataStreamSource<Object> st1 = executionEnvironment.fromElements("eeee", 1, 34, "sss"); // throws an error
DataStreamSource<String> st1 = executionEnvironment.fromElements("eeee", "123", "34", "sss");
st1.print();
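If you really need to carry values of different types, wrap them in a common container type such as a tuple. The snippet below is only a sketch of that idea (it is not from the original post) and needs import org.apache.flink.api.java.tuple.Tuple2.

// a minimal sketch: pack mixed values into Tuple2 so every element has the same type
DataStreamSource<Tuple2<String, Integer>> pairs =
        executionEnvironment.fromElements(Tuple2.of("eeee", 1), Tuple2.of("sss", 34));
pairs.print();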

3. Reading from a collection

List<String> list = new ArrayList<>();
list.add("qwee");
list.add("123");
DataStreamSource<String> st2 = executionEnvironment.fromCollection(list);
st2.print();
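fromCollection also has an overload that takes the element type explicitly, which helps when type inference is not possible (for example with an empty or generic collection). This is a sketch, not part of the original post, and needs import org.apache.flink.api.common.typeinfo.TypeInformation.

// a minimal sketch using the overload with an explicit element type
DataStreamSource<String> st2b = executionEnvironment.fromCollection(list, TypeInformation.of(String.class));
st2b.print();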

4. Reading from Kafka

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment
        .getExecutionEnvironment();
// standard Kafka consumer properties
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "student-group2");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
// consume the topic "TOPIC" as a stream of plain strings
DataStreamSource<String> student = executionEnvironment.addSource(new FlinkKafkaConsumer<>(
        "TOPIC",
        new SimpleStringSchema(),
        props)).setParallelism(1);
student.print();
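FlinkKafkaConsumer also lets you choose the start position explicitly before handing it to addSource. The snippet below is a sketch, not part of the original post; it reuses the props and topic name from above.

// a minimal sketch: build the consumer first so the start position can be configured
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("TOPIC", new SimpleStringSchema(), props);
consumer.setStartFromLatest();       // ignore committed offsets and start from the newest records
// consumer.setStartFromEarliest();  // or replay the topic from the beginning
DataStreamSource<String> student2 = executionEnvironment.addSource(consumer).setParallelism(1);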

5. Custom source

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;


public class MySource implements SourceFunction<String> {

    // volatile so that a cancel() call from another thread is visible in run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (running) {
            Thread.sleep(1000);
            // emit a random number in [0, 99) as a string once per second
            sourceContext.collect(String.valueOf(new Random().nextInt(99)));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

This implements a SourceFunction that emits a string-typed random number every second; sourceContext.collect(...) is what sends each record downstream.

DataStreamSource<String> st3 = executionEnvironment.addSource(new MySource());
st3.print();
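A plain SourceFunction always runs with parallelism 1. If the custom source should run in several parallel instances, implement ParallelSourceFunction instead; the sketch below is the same generator adapted to that interface and is not from the original post.

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Random;

// a minimal sketch: the same random-number generator, but allowed to run with parallelism > 1
public class MyParallelSource implements ParallelSourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            Thread.sleep(1000);
            ctx.collect(String.valueOf(new Random().nextInt(99)));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}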

6. Reading from MQ (RocketMQ)

https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flink
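The rocketmq-flink connector linked above provides a source function that plugs into addSource like any other. The class names, constructor, and property keys below are based on that repository but may differ between versions, so treat this as a rough, unverified sketch and check the repo's README before using it.

// a rough sketch based on the linked rocketmq-flink connector (class names and keys are assumptions)
Properties mqProps = new Properties();
mqProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876");     // placeholder name server
mqProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "flink-consumer-group"); // placeholder group
mqProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "FLINK_TOPIC");          // placeholder topic

DataStreamSource<Map> mqStream = executionEnvironment.addSource(
        new RocketMQSource<>(new SimpleKeyValueDeserializationSchema("key", "value"), mqProps));
mqStream.print();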

7. Reading and writing Hive tables

Flink CDC (to be covered in a dedicated later post)
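For reference, Hive tables are usually read and written through the Table API with a HiveCatalog. The sketch below assumes the flink-connector-hive and Hive client dependencies are on the classpath; the catalog name, database, hive-conf directory, and table names are placeholders, not values from the original post.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveReadWriteSketch {
    public static void main(String[] args) {
        // a minimal sketch: batch Table API environment backed by a Hive catalog
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        HiveCatalog hive = new HiveCatalog("myhive", "default", "/opt/hive/conf"); // placeholder values
        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");

        // read one Hive table and write the result into another (placeholder table names)
        tableEnv.executeSql("INSERT INTO target_table SELECT * FROM source_table");
    }
}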

