微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

检查Python中是否存在Kafka主题

我想创建一个Kafka主题,如果它尚不存在.我知道如何通过bash创建一个主题,但我不知道如何检查它是否存在.

topic_exists = ??????
if not topic_exists:
    subprocess.call([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--create',  
        '--zookeeper', '{}:2181'.format(KAFKAHOST),
        '--topic', str(self.topic), 
        '--partitions', str(self.partitions),
        '--replication-factor', str(self.replication_factor)])

解决方法:

您可以使用kafka-topics.sh的–list(列出所有可用主题)选项,并查看topics数组中是否存在self.topic,如下所示.

根据您拥有的主题数量,这种方法可能会有点沉重.如果是这种情况,您可能可以使用–describe(给定主题的列表详细信息),如果主题不存在,可能会返回空.我还没有对此进行彻底的测试,所以我不能肯定地说这个解决方案( – 描述)是多么可靠,但是你可能需要进一步研究它.

wanted_topics = ['host_updates_queue', 'foo_bar']

topics = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--list',
        '--zookeeper', '{}:2181'.format(KAFKAHOST)])

for wanted in wanted_topics:
    if wanted in topics:
        print '\'{}\' topic exists!'.format(wanted)
    else:
        print '\'{}\' topic does NOT exist!'.format(wanted)

    topic_desc = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--describe',
        '--topic', wanted,
        '--zookeeper', '{}:2181'.format(KAFKAHOST)])

    if not topic_desc:
        print 'No description found for the topic \'{}\''.format(wanted)

OUTPUT:

root@dev:/opt/kafka/kafka_2.10-0.8.2.1# ./t.py
'host_updates_queue' topic exists!
'foo_bar' topic does NOT exist!
No description found for the topic 'foo_bar'

还有一个Broker Configuration可用,因此您不必采取以下任何步骤:

auto.create.topics.enable | true | Enable auto creation of topic on the server. If this is set to true then attempts to produce data or fetch Metadata for a non-existent topic will automatically create it with the default replication factor and number of partitions.

如果可能的话,我会采用这种方法.

请注意,您应在代理上为num.partitions和default.replication.factor设置主题configs(server.properties)以匹配代码段中的设置.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐