当前位置: 首页 > 知识库问答 >
问题:

Python Kafka:如何从我停止的地方继续消费消息

段弘和
2023-03-14

我正在使用Kafka python版本2.0.2来生成和消费消息:我的生产者:

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='my server')
producer.send('my topic', b'hello word')

我的消费者:

consumer = kafka.KafkaConsumer('my topic',bootstrap_servers=['192.168.2.137:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='name')
    
for message in consumer:    
    print(message.value)

当我运行消费者时,它运行得很好。但是当我在它完成所有消息之前停止它时,它不会从我停止的地方继续,如果我的程序崩溃或笔记本电脑没电了怎么办?我如何解决每个问题?我希望消费者继续阅读未阅读的消息?

共有1个答案

凌联
2023-03-14

尝试将偏移重置配置更改为最新(这是默认值)。

consumer = kafka.KafkaConsumer('my topic',bootstrap_servers=['192.168.2.137:9092'],
auto_offset_reset='latest',
enable_auto_commit=True,
group_id='name')
    
for message in consumer:    
    print(message.value)
 类似资料:
  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我的应用程序使用来自RabbitMQ的一些消息并对其进行处理。我有大约10个队列,每个队列最多有10个消费者(线程)。我有5次预回迁。我正在Heroku中使用CloudAMQP插件(RabbitMQ作为服务)运行安装程序。 我使用默认心跳和连接超时设置(60秒)运行。 我的java应用程序是一个使用sping-Rabbit库的Spring Boot应用程序。 版本: 问题是对于一个特定队列的消费者

  • 我正在使用带有KafkaListener注释的spring kafka v2.5.2。 在运行时,我希望能够向消费者发送停止消费的信号。 我看到了autoStartup参数,但它似乎只对初始化有效,之后无法更改。 我看到了KafkaListenerEndpointRegistry的methode close()。。。 你有什么建议吗? 提前谢谢。

  • 我使用Java的rabbitmq-client(https://mvnrepository.com/artifact/com.rabbitmq/amqp-client),我需要实现以下场景: 在接收Rabbit消息时,如果怀疑内存中不适合所有等待的数据,则可能需要暂停特定队列的Rabbitmq消耗。 处理完一些消息后,需要再次打开以下一组消息的消耗。 根据需要重复。 使用amqp-client J

  • 我正尝试使用Camel以事务方式从JMS队列中消费一条消息。特别是在这样的流程中: 等待消息在JMS队列上发布 尝试消费和处理单个消息 如果处理失败(发生异常),回滚消耗 如果处理通过,确认并停止使用更多消息 在应用程序生命周期的后期,另一个进程触发消费从(1)重新开始 起初,我试图使用轮询消费者,使用ConsumerTemplate来做这件事,但是我不知道是否可以通过事务来做这件事——似乎事务是

  • 我想这个话题发生了什么...偏移坏了还是我不知道... 有人知道会发生什么吗?谢谢