我们有一个应用程序,它使用来自Kafka主题(3个分区)的消息,丰富数据,并将记录保存在DB(Spring JPA)中,然后将消息发布到另一个Kafka主题(在同一个代理上),所有这些都通过使用Camel 2.4.1和Spring Boot 2.1.7进行编排。释放
我们想为 kafka 消费者-生产者组合实现“exactly-once”语义。
消费者设置:
autoOffsetReset: earliest
autoCommitEnable: false
allowManualCommit: true
breakOnFirstError: true
group.id : CONSUMER.GROUP.ID
count: 3
max.poll.records = 1 # rollback when message processing fails.
生产者设置:
idempotence: true
transactionIdPrefix: txn-prefix-id
豆接线:
@Bean
SpringTransactionPolicy springTransactionPolicy() throws Exception {
SpringTransactionPolicy txRequired = new SpringTransactionPolicy();
txRequired.setTransactionManager(transactionManager());
txRequired.setPropagationBehaviorName("PROPAGATION_REQUIRED");
return txRequired;
}
@Bean
public DefaultKafkaProducerFactory<byte[], byte[]> producerFactory() {
DefaultKafkaProducerFactory<byte[], byte[]> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<byte[], byte[]>(
kafkaConfigs());
// enable transaction manager
defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefix);
return defaultKafkaProducerFactory;
}
@Bean
@Primary
public ChainedKafkaTransactionManager<byte[], byte[]> transactionManager() throws Exception {
return new ChainedKafkaTransactionManager<>(kafkaTransactionManager(),jpaTransactionManager());
}
@Bean
public PlatformTransactionManager kafkaTransactionManager() {
KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager = new KafkaTransactionManager<>(producerFactory);
kafkaTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
kafkaTransactionManager.setRollbackOnCommitFailure(true);
return kafkaTransactionManager;
}
@Bean
JpaTransactionManager jpaTransactionManager() {
JpaTransactionManager transactionManager = new JpaTransactionManager();
transactionManager.setRollbackOnCommitFailure(true);
return transactionManager;
}
骆驼路线:
public RoutesBuilder inboundRoute() {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
//Common error handler
onException(UnsupportedMessageTypeException.class).
maximumRedeliveries(redeliveryCount).
handled(true).
bean(ExceptionPropagatorProcessor.class, "process").
bean(manualCommitProcessor).
end();
onException(AppRuntimeException.class).
maximumRedeliveries(redeliveryCount).
bean(ExceptionPropagatorProcessor.class, "process")
end();
onException(RetryExhaustedException.class).
maximumRedeliveries(0).// No retry for this exception
handled(true).
bean(ExceptionPropagatorProcessor.class, "process").
bean(kafkaManualCommitProcessor).
end();
from("kafka:inboundTopic").
routeId("consume-msg").
transacted("springTransactionPolicy").
bean(transactionBeginProcessor).
//check if this is a retry scenario, the max retry count reached then throw RetryExhaustedException.
bean(retryEvaluationProcessor).
bean(enrichProcessor). // publish kafka messages
bean(persistenceProcessor).
bean(transactionEndProcessor). // publish kafka messages
bean(manualCommitProcessor);
但当出现异常处理场景时,我们无法让Kafka制作人提交消息。我缺少什么,正确的方法是什么?
您似乎正在使用Spring Kafka,而他们的KafkaTransactionManager并不是一个真正支持XA的事务管理器(有关限制,请参阅他们的文档),因此您不能将其用于Kafka和JDBC数据库等的回滚。
并且 camel-kafka 不支持 Kafka 事务(目前)。我已经创建了一张票:https://issues.apache.org/jira/browse/CAMEL-15016
我正在使用Spring boot2.1.7。RELEASE和spring-kafka 2.2.7。RELEASE。我正在使用@KafkaListener注释来创建一个消费者,我正在使用消费者的所有默认设置。 这是我的消费者配置: 由于某些原因,我在同一个应用程序中有多个使用者,如下所示。 尽管如此,根据关于“消费者线程安全”的合流文件 一个线程中不能有多个属于同一组的使用者,也不能有多个线程安全地
问题内容: 因此,以某种方式(玩转),我发现自己使用了regex这样的正则表达式。 从逻辑上讲,对我来说,它应表示: (一个数字正好一次)正好两次,即一个数字正好两次。 但实际上,它似乎仅表示“一个数字仅一次”(因此忽略了)。 使用或相似,可以看到相似的结果。 为什么会这样?它是在regex / Java文档中的某个地方明确声明的,还是只是Java开发人员即时做出的决定,还是一个错误? 还是实际上
当我只打开一次处理时,我会得到以下错误。注意:我们的应用程序非常安全,我们只允许Kafka用户和消费者访问他们明确需要的资源。 只有一次处理kafka流是否在所有流任务中使用每个流任务的消费者组而不是消费者组?
我在一些关于堆栈溢出的答案中看到,通常在web中也看到,Kafka不支持消费确认,或者消费一次就很难实现。 在以下作为示例的条目中,有没有理由使用RabbitMQ而不是Kafka?,我可以读到以下语句: RabbitMQ将保留已消耗/已确认/未确认消息的所有状态,而Kafka则不保留 或 有人能解释一下为什么Kafka的“一次消费保证”很难实现吗?这与Kafka和RabbitMQ等其他更传统的消息
如何提高Kafka消费者的绩效?我有(并且需要)至少一次Kafka消费语义学 我有以下配置。processInDB()需要2分钟才能完成。因此,仅处理10条消息(全部在单个分区中)就需要20分钟(假设每条消息2分钟)。我可以在不同的线程中调用processInDB,但我可能会丢失消息!。如何在2到4分钟的时间窗口内处理所有10条消息? 下面是我的Kafka消费者代码。
本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费