当前位置: 首页 > 知识库问答 >
问题:

Apache Kafka中的消息消费确认

何志业
2023-03-14

我实现了一个Java使用者,它使用来自Kafka主题的消息,然后将这些消息与POST请求一起发送到REST API。

    while (true) {
        ConsumerRecords<String, Object> records = consumer.poll(200);
        for (ConsumerRecord<String, Object> record : records) {
            CloseableHttpClient httpClient = HttpClientBuilder.create().build();  
            Object message = record.value();
            JSONObject jsonObj = new JSONObject(message.toString());
            try {
                HttpPost request = new HttpPost(this.getConsumerProperties().getProperty("api.url"));
                StringEntity params = new StringEntity(message.toString());
                request.addHeader("content-type", "application/json");
                request.addHeader("Accept", "application/json");
                request.setEntity(params);

                CloseableHttpResponse response = httpClient.execute(request);
                HttpEntity entity = response.getEntity();
                String responseString = EntityUtils.toString(entity, "UTF-8");
                System.out.println(message.toString());
                System.out.println(responseString);
            } catch(Exception ex) {
              ex.printStackTrace();
            }  finally {
                try {   
                    httpClient.close(); 
                } catch(IOException ex) {
                    ex.printStackTrace();
                } 
            }                  
        } 
    }   

假设一个消息已经被使用,但是Java类未能到达REST API。消息将永远不会被传递,但它将被标记为已消耗。处理这类案件最好的办法是什么?当且仅当来自REST API的响应成功时,我能以某种方式确认消息吗?

共有1个答案

苏承载
2023-03-14

在使用者属性中,将enable.auto.commit设置为false。这意味着承担抵消的责任在于消费者。

因此,在上面的示例中,基于Response.StatusCode,您可以选择通过调用consumer.commitAsync()来提交偏移量。

 类似资料:
  • 我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用

  • 我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 如何使用Apache Kafka生成/使用延迟消息?标准的Kafka(和Java的kafka-client)功能似乎没有这个特性。我知道我自己可以用标准的等待/通知机制来实现它,但是它看起来不是很可靠,所以任何建议和好的实践都很感谢。 找到相关问题,但没有帮助。正如我所看到的:Kafka基于从文件系统的顺序读取,并且只能用于直接读取主题,保持消息的顺序。我说的对吗?

  • 由于它是一个Spring Boot应用程序,默认偏移量设置为Latest。我在这里做错了什么,请帮我弄明白。

  • 我有一个springboot消费者应用程序。当我第一次运行它时,它消耗了来自Kafka主题的信息。但当我再次运行它时,它停止了消耗。在日志中,我看到以下消息。 我知道消费者无法获得偏移量。在这种情况下,消费者将引用自动偏移重置属性。如您所见,我已将其设置为,希望消费者从头开始阅读。但它没有。 应用程序. yml 在我的Java课上 我尝试了一些东西。 我将值设置为。不出所料,它抛出了一个异常,抱怨