微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink接入apollo简单示例

1、创建配置文件

apollo官网示例显示java api使用配置文件方式如下:

配置文件内容

     app.id=java-apollo-test-20190724
     apollo.Meta=http://apollo-server:10080
     apollo.cacheDir=./cache

2、示例代码

public class StreamingJob2 {

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
		env
				.socketTextStream("localhost", 9999)
				.map(new MapFunction<String, Integer>() {
					@Override
					public Integer map(String value) throws Exception {
						return Integer.parseInt(value);
					}
				})
				.process(new ProcessFunction<Integer, Integer>() {

					private int maxnum;

					@Override
					public void open(Configuration parameters) throws Exception {
						super.open(parameters);
						//config instance is singleton for each namespace and is never null
						Config config = ConfigService.getAppConfig();
						String someKey = "max.num";
						String someDefaultValue = "3";
                        maxnum = Integer.parseInt(config.getProperty(someKey, someDefaultValue));
						System.out.println("apollo maxnum: " + maxnum);

                        config.addchangelistener(new Configchangelistener() {
                            @Override
                            public void onChange(ConfigChangeEvent changeEvent) {
                                for (String key : changeEvent.changedKeys()) {
                                    ConfigChange change = changeEvent.getChange(key);
                                    if (someKey.equalsIgnoreCase(change.getPropertyName())) {
                                        maxnum = Integer.parseInt(change.getNewValue());
                                    }
                                }
                                System.out.println("maxnum is: " + maxnum);
                            }
                        });
					}

					@Override
					public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
					    if (value > maxnum) {
					        out.collect(value);
                        }
					}
				})
				.print("--->");
		env.execute(StreamingJob2.class.getCanonicalName());
	}
}

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐