我实现了一个Java使用者,它使用来自Kafka主题的消息,然后将这些消息与POST请求一起发送到REST API。
while (true) {
ConsumerRecords<String, Object> records = consumer.poll(200);
for (ConsumerRecord<String, Object> record : records) {
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
Object message = record.value();
JSONObject jsonObj = new JSONObject(message.toString());
try {
HttpPost request = new HttpPost(this.getConsumerProperties().getProperty("api.url"));
StringEntity params = new StringEntity(message.toString());
request.addHeader("content-type", "application/json");
request.addHeader("Accept", "application/json");
request.setEntity(params);
CloseableHttpResponse response = httpClient.execute(request);
HttpEntity entity = response.getEntity();
String responseString = EntityUtils.toString(entity, "UTF-8");
System.out.println(message.toString());
System.out.println(responseString);
} catch(Exception ex) {
ex.printStackTrace();
} finally {
try {
httpClient.close();
} catch(IOException ex) {
ex.printStackTrace();
}
}
}
}
假设一个消息已经被使用,但是Java类未能到达REST API。消息将永远不会被传递,但它将被标记为已消耗。处理这类案件最好的办法是什么?当且仅当来自REST API的响应成功时,我能以某种方式确认消息吗?
在使用者属性中,将enable.auto.commit设置为false。这意味着承担抵消的责任在于消费者。
因此,在上面的示例中,基于Response.StatusCode,您可以选择通过调用consumer.commitAsync()来提交偏移量。
我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用
我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?
我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认
如何使用Apache Kafka生成/使用延迟消息?标准的Kafka(和Java的kafka-client)功能似乎没有这个特性。我知道我自己可以用标准的等待/通知机制来实现它,但是它看起来不是很可靠,所以任何建议和好的实践都很感谢。 找到相关问题,但没有帮助。正如我所看到的:Kafka基于从文件系统的顺序读取,并且只能用于直接读取主题,保持消息的顺序。我说的对吗?
由于它是一个Spring Boot应用程序,默认偏移量设置为Latest。我在这里做错了什么,请帮我弄明白。
我有一个springboot消费者应用程序。当我第一次运行它时,它消耗了来自Kafka主题的信息。但当我再次运行它时,它停止了消耗。在日志中,我看到以下消息。 我知道消费者无法获得偏移量。在这种情况下,消费者将引用自动偏移重置属性。如您所见,我已将其设置为,希望消费者从头开始阅读。但它没有。 应用程序. yml 在我的Java课上 我尝试了一些东西。 我将值设置为。不出所料,它抛出了一个异常,抱怨