我们有一个非常简单的Kafka Consumer(v 2.6.2)。它是使用者组中唯一的使用者,并且该组是唯一一个阅读主题的组(有6个分区,其中有大约300万个事件)。Broker也是2.6.x版本
由于我们需要实现一个“只有一次”的场景,我们深入研究了一下,如果我们真的只使用一次写入主题的每个事件。不幸的是,我们发现:消费者有时会跳过一个偏移量,有时甚至会跳过一组分区的偏移量。
消费者除了记录之外没有做更多的事情。错误处理程序配置了日志记录。
@KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${topicname}")
public void onMessage(ConsumerRecord<String, BlaBlaEvent> record) throws Exception {
// mostly logging
}
并记录:
consuming Offset=30 Partition=2
consuming Offset=31 Partition=2
consuming Offset=32 Partition=2
consuming Offset=67 Partition=2
(看到Offset=67了吗?)
这是我们的应用程序属性
spring.kafka.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS_URL}
spring.kafka.properties.security.protocol: SSL
spring.kafka.properties.schema.registry.url=${SCHEMA_REGISTRY_URL}
spring.kafka.properties.specific.avro.reader=true
spring.kafka.ssl.trust-store-type=PKCS12
spring.kafka.ssl.key-store-type=PKCS12
spring.kafka.ssl.key-store-password=${KEYSTORE_PASSWORD}
spring.kafka.ssl.trust-store-password=${TRUSTSTORE_PASSWORD}
spring.kafka.ssl.trust-store-location=${TRUSTSTORE_PATH}
spring.kafka.ssl.key-store-location=${KEYSTORE_PATH}
spring.kafka.consumer.client-id=BlablaNotificationReader
spring.kafka.consumer.group-id=blabla-group
spring.kafka.consumer.auto-offset-reset=none
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
logging.level.org.apache.kafka=DEBUG
有人知道如何找到这个问题的根源吗?
由于您正在寻找精确一次语义学,因此您可能有启用事务的生产者-如果是这样,这可能是由于分区具有事务记录。
Kafka消费者
留档声明:
具有事务性消息的分区将包括指示事务结果的提交或中止标记。这些标记不会返回给应用程序,但在日志中有一个偏移量。因此,从具有事务性消息的主题中读取的应用程序将会看到使用的偏移量中的缺口。这些丢失的消息将成为事务标记,在这两个隔离级别中,它们都被消费者过滤掉了。此外,使用read_committed使用者的应用程序也可能会看到由于中止的事务而产生的间隙,因为这些消息不会由使用者返回,但会有有效的偏移量。
我看到实际错过记录的唯一方法是,如果使用者为由于某种原因未被处理的记录/记录提交偏移量或之后提交偏移量。
如果需要有关从代理轮询的记录和正在提交的偏移量的更多信息,可以将 org.springframework.kafka.listener.KafkaMessageListenerContainer
类的日志记录级别设置为 TRACE。
这应该可以让您很好地了解收到的记录和提交的偏移量。如果在处理正在提交的偏移量之前,所有偏移量都已编辑:则您不太可能缺少任何偏移量。
为了检查<code>一次</code>语义是否正确,也许可以尝试通过某些id将生产者的日志与消费者的日志关联起来,特别是在发生此事件时。
您可能还想检查发生这种情况时的生产者日志,因为这种差距可能是由于中止的事务造成的。
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。
在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费
如果我不想使用自动通信模式,sping提供了另一种方法。 spring kafkfa/#提交补偿为我们提供了以下关于提交补偿的信息: -当侦听器在处理记录后返回时提交偏移量 -处理poll()返回的所有记录后提交偏移量 -当poll()返回的所有记录都已被处理时,只要超过自上次提交以来的确认时间,就提交偏移量 -当poll()返回的所有记录都已被处理时,只要自上次提交以来已收到ackCount记录
我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用
我在站点1(3个代理)有两个集群设置cluster-1,在站点2(3个代理)有两个集群设置cluster-2。使用spring kafka(1.3.6)消费者(一台机器)并通过@KafkaListener注释收听消息。我们如何为每个集群(c1和c2)实例化多个KafkaListenerContainerFactory,并同时监听来自这两个集群的数据。 我的侦听器应该同时使用来自这两个集群的消息。