我们正在使用spring kafka 1.2.2。释放
我们想要的
1.一旦消息被消费并成功处理,就会在spring-kafka中提交偏移量。
我正在使用Manaul Commit/Ac认收它,它工作正常。
2.在任何异常的情况下,我们希望spring-kafka重新发送相同的消息。
我们对任何系统误差抛出RunTime异常,它由spring-kafka记录并且从未提交。
这很好,因为我们不希望它提交,但该消息将保留在spring-kafka中并且永远不会回来,除非我们重新启动服务。在重启时,消息会回来并再次执行,然后留在spring-kafka
我们尝试的
1。我已经尝试了ErrorHandler和RetryingMessageListenerAdapter,但在这两种情况下,我们都必须在服务中编写如何再次处理消息的代码
这是我的消费者
public class MyConsumer{
@KafkaListener
public void receive(...){
// application logic to return success/failure
if(success){
acknowledgement.acknowledge();
}else{
throw new RunTimeException();
}
}
}
此外,我有以下集装箱工厂的配置
factory.getContainerProperties().setErrorHandler(new ErrorHandler(){
@Override
public void handle(...){
throw new RunTimeException("");
}
});
在执行流时,控制首先进入这两个模块,以接收然后处理方法。之后,服务将等待新消息。然而,我期待的是,由于我们抛出了一个异常,并且消息未提交,相同的消息应该再次进入receive方法
有没有办法,我们可以告诉spring kafka“不要提交此消息并尽快再次发送?”
不幸的是,我们可以使用的版本是1.3.7。释放
我尝试过实现ConsumerSekAware接口。下面是我如何做到这一点,我可以看到信息传递的再创造
消费者
public class MyConsumer implements ConsumerSeekAware{
private ConsumerSeekCallback consumerSeekCallback;
if(condition) {
acknowledgement.acknowledge();
}else {
consumerSeekCallback.seek((String)headers.get("kafka_receivedTopic"),
(int) headers.get("kafka_receivedPartitionId"),
(int) headers.get("kafka_offset"));
}
}
@Override
public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
this.consumerSeekCallback = consumerSeekCallback;
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> arg0, ConsumerSeekCallback arg1) {
LOGGER.debug("onIdleContainer called");
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> arg0, ConsumerSeekCallback arg1) {
LOGGER.debug("onPartitionsAssigned called");
}
}
配置
public class MyConsumerConfig {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
// Set server, deserializer, group id
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return props;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyModel> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MyModel> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
return factory;
}
@Bean
public MyConsumer receiver() {
return new MyConsumer();
}
}
不再支持1.2. x;建议1. x用户至少升级到1.3. x(当前为1.3.8),因为它的线程模型要简单得多,这要归功于KIP-62。
当前版本为2.2.2。
2.0.1引入了seekToCurInterrorHandler
,它重新查找失败的记录,以便重新交付。
对于早期版本,您必须停止并重新启动容器以重新传递失败的消息,或者向侦听器适配器添加重试。
我建议您升级到最新版本。
假设kafka消息生产者向一个主题发送一条事件消息。然后一个消费者处理这个事件消息。但是,这个消费者进程因为业务错误而抛出异常,所以他想让消息生产者知道它并再次怨恨。 有什么解决办法吗?
如何保持一个连续的流,以“反应”新的丢弃的文件?(或其他事件,如HTTP GET请求或类似的事件)... 例如,如果我不返回PublisherBuilder的实例,而是返回一个整数,那么我的kafka主题将由一个非常巨大的整数值流填充。这就是为什么示例在发送消息时使用一些间隔... 我应该使用一些CompletationStage或CompletableFuture吗?RXJava2?使用哪个li
我知道Kafka制作人会将消息分批处理。每个批属于一个特定的分区。 我的问题是 生产者是否知道每个批次属于哪个分区? 生产者是否知道每个分区的代理地址? 当生产者发送请求时,每个请求包含多个批次还是只包含一个属于目标分区的批次。 如果生产者发送多个批次,接收kafka服务器是否将批次重传到目标分区。
如何在Kafka中发送同步消息 实现这一点的一种方法是设置properties参数 。 但是我想知道是否有一种甚至直接或替代的方式在Kafka中发送同步消息。(比如producer.sync发送(...)等等)。
我有一个工作的WebSocket示例,其中客户端从服务器接收消息。 我不确定当客户端连接时,我应该如何向客户端发送旧消息。 示例: 每个客户端在连接时提供其名称 服务器响应“[名称] 刚刚连接”(对所有客户端) 任何新客户端都不会收到这些消息 我想知道客户端是否有任何方法可以接收旧消息(所有消息或过去 5 分钟内的消息都可以接受)。 我怀疑我可能需要自己捕获这些信息,将其存储在某个地方(如数据库)
如果这是一个重复的问题,我真的很抱歉,但我在其他线程中尝试了许多答案,但没有一个对我有效。 我试图通过使用TLSv1.2协议的SSLSocket向远程服务器发送ISO8583消息,我用密钥库配置了证书,并试图发送一个ISO8583消息示例:08002220010000800000900000011312115000000180105000003 0800:MTI 222001000800000:二