微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

KafkaSpout没有收到来自Kafka的任何东西

我正在试图搭build一个Kafka-Storm“Hello World”系统。 我已经安装并运行了Kafka,当我向Kafka制作人发送数据时,我可以通过Kafka控制台消费者阅读。

我从“风暴入门”O'Reilly的书中拿出了第二章的例子,并修改它来使用Kafkaspout而不是常规的喷口。

当我运行应用程序时,数据已经在kafka中挂起,Kafkaspout的nextTuple没有得到任何消息 – 它进入,试图迭代协调器下的空pipe理器列表,然后退出

我的环境是一个相当古老的Cloudera虚拟机,Storm 0.9和Kafka-Storm-0.9(最新),Kafka 2.9.2-0.7.0。

运行Apache风暴拓扑时,“无法删除文件stormconf.ser”错误

MAC(UNIX)系统上的PATH是什么?

风暴工作者连接

如何解决java.net.ConnectException:在Storm中拒绝连接

这是我如何定义spoutCon@R_404_6408@和拓扑:

String zookeepers = "localhost:2181"; spoutCon@R_404_6408@ spoutCon@R_404_6408@ = new spoutCon@R_404_6408@(new spoutCon@R_404[email protected](zookeepers,"/brokers"),"gtest","/kafka",// zookeeper root path for offset storing "Kafkaspout"); spoutCon@R_404[email protected](-1); KafkaspoutTester kafkaspout = new KafkaspoutTester(spoutCon@R_404_6408@); //Topology deFinition TopologyBuilder builder = new TopologyBuilder(); builder.setspout("word-reader",kafkaspout,1); builder.setBolt("word-normalizer",new Wordnormalizer()) .shuffleGrouping("word-reader"); builder.setBolt("word-counter",new WordCounter(),1) .fieldsGrouping("word-normalizer",new Fields("word")); //Con@R_404_6408@uration Con@R_404_6408@ conf = new Con@R_404_6408@(); conf.put("wordsFile",args[0]); conf.setDebug(false); //Topology run conf.put(Con@R_404[email protected]_MAX_spout_PENDING,1); cluster = new LocalCluster(); cluster.submitTopology("Getting-Started-Toplogie",conf,builder.createtopology());

有人可以帮我找出为什么我没有收到任何东西?

谢谢,G.

如果你已经消费了这个消息,除非你的生产者产生新的消息,否则它不应该再被读取。 这是因为您的代码中使用了-1的forceStartOffsetTime调用

形成storm-contrib文档:

喷口中另一个非常有用的配置是能够强制喷口倒回到先前的偏移量。 您在spout con@R_404_6408@上执行forceStartOffsetTime,如下所示:

spoutCon@R_404[email protected](-2);

它将选择围绕该时间戳记录的最新偏移量开始消费。 您可以通过传入-1强制喷口总是从最新的偏移量开始,并且可以通过传入-2来强制它从最早的偏移量开始。

你是怎样生产的? 有一个片段是有用的。 你可以用-2替换-1,看看你是否收到任何东西,如果你的制作者没问题,那么你应该可以使用。

spoutCon@R_404_6408@ spoutConf = new spoutCon@R_404_6408@(...) spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

spoutCon@R_404_6408@ spoutCon@R_404_6408@ = new spoutCon@R_404_6408@(new spoutCon@R_404[email protected](zookeepers,// name of topic used by producer & consumer "/kafka",// zookeeper root path for offset storing "Kafkaspout");

您正在使用“gtest”主题接收数据。 确保你是由制作者从这个主题发送数据。

然后在螺栓上打印那个元组

public void execute(Tuple tuple,BasicOutputCollector collector) { System.out.println(tuple); }

它应该在kafka中打印待处理的数据。

我经历了一场风波和Kafka的融合。 这些都是快速发展和相对年轻的项目,所以很难得到工作实例来开始你的发展。

为了帮助其他开发人员(希望能让其他人提供可以使用的有用示例),我开始了一个github项目,以存放与Storm / Kafka(和Esper)开发相关的代码片段。

欢迎您在这里查看> https://github.com/buildlackey/cep

(点击storm + kafka目录下的示例程序,可以启动并运行)。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐