我能够使用ErrorDesrializationHandler成功处理反序列化错误,但当我重新启动我的消费者时,它再次开始重新处理由于反序列化而导致的所有失败消息。
由于反序列化异常无法到达Kafka Listener,如何确认并提交偏移量?
谢谢。
我正在使用的自定义错误处理程序:
class KafkaErrorHandler implements ConsumerAwareErrorHandler{
private static final Logger LOG = LoggerFactory.getLogger(KafkaErrorHandler.class);
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
if (!records.isEmpty()) {
ConsumerRecord<?, ?> record = records.get(0);
String topic = record.topic();
long offset = record.offset();
int partition = record.partition();
if (thrownException.getClass().equals(DeserializationException.class)) {
DeserializationException exception = (DeserializationException) thrownException;
String malformedMessage = new String(exception.getData());
LOG.info("Skipping message with topic {} and offset {} " +
"- malformed message: {} , exception: {}", topic, offset, malformedMessage, exception.getLocalizedMessage());
} else {
LOG.info("Skipping message with topic {} - offset {} - partition {} - exception {}", topic, offset, partition, thrownException);
}
} else {
LOG.info("Consumer exception - cause: {}", thrownException.getMessage());
}
}
}
使用<代码>确认模式。手动_立即,请参阅
/**
* Set to true to commit the offset for a recovered record.
* The container must be configured with
* {@link org.springframework.kafka.listener.ContainerProperties.AckMode#MANUAL_IMMEDIATE}.
* Whether or not the commit is sync or async depends on the container's syncCommits
* property.
* @param commitRecovered true to commit.
*/
@Override
public void setCommitRecovered(boolean commitRecovered) { // NOSONAR enhanced javadoc
super.setCommitRecovered(commitRecovered);
}
在DefaultErrorHandler
上。
在确认模式下不可能。手册
;对于其他AckMode
s,默认情况下,容器将提交恢复记录的偏移量(基于isAckAfterHandle()
)。
编辑
提交偏移量的逻辑位于SeekUtils.seekOrRecover
(在查找之后)。
if (commitRecovered) {
if (container.getContainerProperties().getAckMode().equals(AckMode.MANUAL_IMMEDIATE)) {
ConsumerRecord<?, ?> record = records.get(0);
Map<TopicPartition, OffsetAndMetadata> offsetToCommit = Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
ListenerUtils.createOffsetAndMetadata(container, record.offset() + 1));
if (container.getContainerProperties().isSyncCommits()) {
consumer.commitSync(offsetToCommit, container.getContainerProperties().getSyncCommitTimeout());
}
else {
OffsetCommitCallback commitCallback = container.getContainerProperties().getCommitCallback();
if (commitCallback == null) {
commitCallback = LOGGING_COMMIT_CALLBACK;
}
consumer.commitAsync(offsetToCommit, commitCallback);
}
}
else {
logger.debug(() -> "'commitRecovered' ignored, container AckMode must be MANUAL_IMMEDIATE, not "
+ container.getContainerProperties().getAckMode());
}
}
我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?
我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。
我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。
我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端
我使用MANUAL_IMMEDIATEack模式,Spring-kafka 1.3.9(不能更改为Java8),并在监听器代码中完成处理时提交偏移量。我使用自定义反序列化器及其工作正常,除非我遇到反序列化异常。然后Kafka卡住了。我已经处理了这个由Deserializer,喜欢而不是抛出异常(当反序列化异常发生)我得到一个反序列化对象的新实例,并设置原始消息(导致反序列化异常)在一个字段(异常数
我有一个spring kafka消费者,它读取记录并将其交给缓存。计划的任务将定期清除缓存中的记录。我只想在批成功保存到数据库中后更新提交偏移量。我尝试将Acknowledgement对象传递给缓存服务以调用acknowledge方法,如下所示。 确认模式设置如下: 自动提交是错误的: 即使调用了确认方法,提交偏移量也不会更新。持久化记录后更新提交偏移量的最佳方法是什么? 我正在使用spring-