我正在尝试编写一个python代码来使用来自Confluent Kafka主题的数据,并作为测试项目的一部分执行数据验证。我能够读取数据,但是消费过程处于无限循环中,如果循环读取所有消息,则寻找退出的决策点。
参见下面的示例代码
conf = {'bootstrap.servers': "server:port", #
'group.id': str(uuid.uuid1),
'auto.offset.reset': 'earliest',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'session.timeout.ms': 6000
}
consumer = Consumer(conf)
consumer.subscribe([topic], on_assign=on_assign)
try:
while True:
msg=consumer.poll(timeout=2.0)
# print(msg)
if msg is None:
print('msg is None')
continue
if msg.error().code() == KafkaError._PARTITION_EOF:
print('End of partition reached {0}/{1}'
.format(msg.topic(), msg.partition()))
print( msg.error().code())
else:
print(msg.value())
except KeyboardInterrupt:
print("Interrupted through kb")
finally:
consumer.close()
请建议决定是否阅读所有消息的最佳方法,以便我可以退出循环并关闭消费者。
根据定义,Apache Kafka 主题是无限的事件流。流没有“结束”,只有您可以选择定义的人为结束。
您需要在您的应用程序逻辑中定义这一点。例如,如果您在超过< code >的时间内没有收到信息
我正在使用@StreamListener(Spring-Cloud-Stream)来使用来自主题(输入通道)的消息,进行一些处理并保存到一些缓存或数据库中。 我的要求是,如果DB在处理消费的消息时停止,我想暂停主消费者(输入通道),并从另一个主题(输入56通道)开始消费,一旦它消费了来自输入56通道的所有消息(没有很多),我想再次恢复主消费者(输入通道)。 这能做到吗??
如何在apache/kafka中使用regex消费所有主题?我尝试了上面的代码,但不起作用。
我们有一个服务器,负责处理消息的生成和消费。我们有4台笔记本电脑,所有带有confluent的Mac都运行相同的命令行。。。 /kafka avro控制台使用者--从一开始--引导服务器0.0.0.0:9092,0.0.0.0:9092--主题主题名称--属性schema.registry.url=http://0.0.0.0:8081 4台笔记本电脑中有3台没有问题使用这些消息,但是第四台不会。
我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。
因为我是新的Kafka,所以我能够从文件中读取记录,并通过生产者将消息发送到Kafka主题,但不能通过消费者消费相同的主题。 注意:您可以从任何文本文件中读取数据,我使用的是Kafka2.11-0.9。0.0版本 这是我的密码: 下面是输出:
我的问题与单个消费者从多个话题消费有关。假设所有主题都加载了1M个记录,一个使用者必须处理这些记录。它将按照什么顺序从主题中读取(我的意思是首先读取哪个主题/分区,等等) Kafka内部资料的任何链接会有帮助吗?