我正在试图搭build一个Kafka-Storm“Hello World”系统。 我已经安装并运行了Kafka,当我向Kafka制作人发送数据时,我可以通过Kafka控制台消费者阅读。
我从“风暴入门”O'Reilly的书中拿出了第二章的例子,并修改它来使用Kafkaspout而不是常规的喷口。
当我运行应用程序时,数据已经在kafka中挂起,Kafkaspout的nextTuple没有得到任何消息 – 它进入,试图迭代协调器下的空pipe理器列表,然后退出。
我的环境是一个相当古老的Cloudera虚拟机,Storm 0.9和Kafka-Storm-0.9(最新),Kafka 2.9.2-0.7.0。
运行Apache风暴拓扑时,“无法删除文件stormconf.ser”错误
MAC(UNIX)系统上的PATH是什么?
风暴工作者连接
如何解决java.net.ConnectException:在Storm中拒绝连接
这是我如何定义spoutCon@R_404_6408@和拓扑:
String zookeepers = "localhost:2181"; spoutCon@R_404_6408@ spoutCon@R_404_6408@ = new spoutCon@R_404_6408@(new spoutCon@R_404[email protected](zookeepers,"/brokers"),"gtest","/kafka",// zookeeper root path for offset storing "Kafkaspout"); spoutCon@R_404[email protected](-1); KafkaspoutTester kafkaspout = new KafkaspoutTester(spoutCon@R_404_6408@); //Topology deFinition TopologyBuilder builder = new TopologyBuilder(); builder.setspout("word-reader",kafkaspout,1); builder.setBolt("word-normalizer",new Wordnormalizer()) .shuffleGrouping("word-reader"); builder.setBolt("word-counter",new WordCounter(),1) .fieldsGrouping("word-normalizer",new Fields("word")); //Con@R_404_6408@uration Con@R_404_6408@ conf = new Con@R_404_6408@(); conf.put("wordsFile",args[0]); conf.setDebug(false); //Topology run conf.put(Con@R_404[email protected]_MAX_spout_PENDING,1); cluster = new LocalCluster(); cluster.submitTopology("Getting-Started-Toplogie",conf,builder.createtopology());
有人可以帮我找出为什么我没有收到任何东西?
谢谢,G.
如果你已经消费了这个消息,除非你的生产者产生新的消息,否则它不应该再被读取。 这是因为您的代码中使用了-1的forceStartOffsetTime调用。
形成storm-contrib文档:
喷口中另一个非常有用的配置是能够强制喷口倒回到先前的偏移量。 您在spout con@R_404_6408@上执行forceStartOffsetTime,如下所示:
spoutCon@R_404[email protected](-2);
它将选择围绕该时间戳记录的最新偏移量开始消费。 您可以通过传入-1强制喷口总是从最新的偏移量开始,并且可以通过传入-2来强制它从最早的偏移量开始。
你是怎样生产的? 有一个片段是有用的。 你可以用-2替换-1,看看你是否收到任何东西,如果你的制作者没问题,那么你应该可以使用。
spoutCon@R_404_6408@ spoutConf = new spoutCon@R_404_6408@(...) spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
spoutCon@R_404_6408@ spoutCon@R_404_6408@ = new spoutCon@R_404_6408@(new spoutCon@R_404[email protected](zookeepers,// name of topic used by producer & consumer "/kafka",// zookeeper root path for offset storing "Kafkaspout");
您正在使用“gtest”主题接收数据。 确保你是由制作者从这个主题发送数据。
然后在螺栓上打印那个元组
public void execute(Tuple tuple,BasicOutputCollector collector) { System.out.println(tuple); }
它应该在kafka中打印待处理的数据。
我经历了一场风波和Kafka的融合。 这些都是快速发展和相对年轻的项目,所以很难得到工作实例来开始你的发展。
为了帮助其他开发人员(希望能让其他人提供可以使用的有用示例),我开始了一个github项目,以存放与Storm / Kafka(和Esper)开发相关的代码片段。
欢迎您在这里查看> https://github.com/buildlackey/cep
(点击storm + kafka目录下的示例程序,可以启动并运行)。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。