当前位置: 首页 > 知识库问答 >
问题:

使用Python从融合的Kafka主题中消费数据并退出

刘高驰
2023-03-14

我正在尝试编写一个python代码来使用来自Confluent Kafka主题的数据,并作为测试项目的一部分执行数据验证。我能够读取数据,但是消费过程处于无限循环中,如果循环读取所有消息,则寻找退出的决策点。

参见下面的示例代码

conf = {'bootstrap.servers': "server:port", #
    'group.id': str(uuid.uuid1),
    'auto.offset.reset': 'earliest',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',        
    'session.timeout.ms': 6000
    }
consumer = Consumer(conf) 
consumer.subscribe([topic], on_assign=on_assign)
try:
while True:
    msg=consumer.poll(timeout=2.0)

    # print(msg)

    if msg is None:
        print('msg is None')
        continue

    if msg.error().code() == KafkaError._PARTITION_EOF:
        print('End of partition reached {0}/{1}'
              .format(msg.topic(), msg.partition()))
        print( msg.error().code())
    else:
        print(msg.value())           

except KeyboardInterrupt:
   print("Interrupted through kb")

finally:
   consumer.close()

请建议决定是否阅读所有消息的最佳方法,以便我可以退出循环并关闭消费者。

共有1个答案

慕河
2023-03-14

根据定义,Apache Kafka 主题是无限的事件流。流没有“结束”,只有您可以选择定义的人为结束。

您需要在您的应用程序逻辑中定义这一点。例如,如果您在超过< code >的时间内没有收到信息

 类似资料:
  • 我正在使用@StreamListener(Spring-Cloud-Stream)来使用来自主题(输入通道)的消息,进行一些处理并保存到一些缓存或数据库中。 我的要求是,如果DB在处理消费的消息时停止,我想暂停主消费者(输入通道),并从另一个主题(输入56通道)开始消费,一旦它消费了来自输入56通道的所有消息(没有很多),我想再次恢复主消费者(输入通道)。 这能做到吗??

  • 如何在apache/kafka中使用regex消费所有主题?我尝试了上面的代码,但不起作用。

  • 我们有一个服务器,负责处理消息的生成和消费。我们有4台笔记本电脑,所有带有confluent的Mac都运行相同的命令行。。。 /kafka avro控制台使用者--从一开始--引导服务器0.0.0.0:9092,0.0.0.0:9092--主题主题名称--属性schema.registry.url=http://0.0.0.0:8081 4台笔记本电脑中有3台没有问题使用这些消息,但是第四台不会。

  • 我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。

  • 因为我是新的Kafka,所以我能够从文件中读取记录,并通过生产者将消息发送到Kafka主题,但不能通过消费者消费相同的主题。 注意:您可以从任何文本文件中读取数据,我使用的是Kafka2.11-0.9。0.0版本 这是我的密码: 下面是输出:

  • 我的问题与单个消费者从多个话题消费有关。假设所有主题都加载了1M个记录,一个使用者必须处理这些记录。它将按照什么顺序从主题中读取(我的意思是首先读取哪个主题/分区,等等) Kafka内部资料的任何链接会有帮助吗?