我正在尝试使用Kafka-Python编写一个消费者,以确保精确的once语义。分区中的消息是使用事务感知生成器生成的。我从Kafka文档中了解到,我应该将isolation_level
指定为read_committed
,这样它将只读取提交的消息。问题是,我在Python客户机的文档中没有看到关于如何指定isolation_level
的任何地方。关于如何让我的消费者只阅读提交的消息,有什么想法吗?
预期结果:只需获取transation committed消息实际结果:使用者甚至读取如下所示的中止消息
ConsumerRecord(topic='tweets', partition=0, offset=504, timestamp=1557007360598, timestamp_type=0, key=b'\x00\x00\x00\x01', value='"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000"', checksum=None, serialized_key_size=4, serialized_value_size=6)
根据这个问题,kafka-python
还不支持isolation.level=read_committed
。
我相信这三种类型的确认由于生产者属性仅限于领导者和生产者,我希望生产者在消费者通过kafka broker消费来自存储/队列的消息时收到具体的消息。还请纠正我,如果我在制作人的“acks”属性上有错误,它的默认值是“-1”,它确认所有副本是否已接收/存储消息,但它是否与消费者有关,或者我们是否可以在消费者提交且Kafka向制作人发送确认时创建一个桥梁?
例如,分区有1-10的偏移量。我只想从3-8消费。在消耗了第8条消息后,程序应该退出。
这是一个场景:我知道,使用与Spring kafka相关的最新API(如Spring集成kafka 2.10),我们可以执行以下操作: 以及来自与相同kafka主题相关的不同分区的读取。 我想知道我们是否可以使用同样的方法,例如spsping-集成-Kafka1.3.1 我没有找到任何关于如何做到这一点的提示(我对xml版本很感兴趣)。
现在,我有一个Spring Boot CLI应用程序,当应用程序启动时,它会自动启动Kafka消费者。我的任务是更新提供API的应用程序,允许在特定条件下启动或停止Kafka消费者。所以,我将使用SpringBootStarterWeb创建该API。但我找不到一种方法来手动管理消费过程。我需要的是 在不使用消费者的情况下启动API 关于如何手动管理消费过程的任何建议? 技术细节: 用于创建侦听器
我在kafka文档中读到:kafka还有一个命令行使用者,它将把消息转储到标准输出。 bin/kafka-console-consumer.sh--zookeeper localhost:2181--topic test--从头开始 我想知道如果我想要使用消息并将它们推送到另一个输出,应该向上面的命令添加哪些选项。kafka-console-consumer没有--help选项,我找不到任何参数,
我在使用Kafka时遇到了一些问题。非常感谢任何帮助!我在docker swell中分别有zookeeper和kafka集群3个节点。您可以在下面看到Kafka代理配置。 我的情况: < li > 20x位制片人不断向Kafka主题传达信息 < li>1x消费者读取和记录消息 < li >终止kafka节点(docker容器停止),因此现在群集有2个Kafka代理节点(第3个节点将自动启动并加入群