当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者没有从分区获得任何消息

夏建木
2023-03-14

我刚刚注意到,当我在分区中生成单个消息时,我的使用者不会收到它。只有在我在同一分区中生成了更多的消息之后,使用者才会收到它们。我的提取最小字节数设置为 1。

是否有其他一些配置可能会影响这里?

每个分区都有一个专用的消费者。

相关部件的使用者代码。我的使用者为配置 ['stream'] 定义的不同主题启动多个线程。使用 https://github.com/mmustala/rdkafka-ruby 这是原始消费宝石的叉子。我添加了一个批处理使用方法。以及一种以托管方式关闭使用者的方法。

key = configs['app_key']
consumer = Rdkafka::Config.new(config(configs)).consumer
topic = "#{topic_prefix}#{app_env}_#{configs['stream']}"
consumer.subscribe(topic)

logger.info "#{rand}| Starting consumer for #{key} with topic #{topic}"
begin
  retry_counter = 0
  retries_started_at = nil
  current_assignment = nil
  partitions = []
  consumer.each_batch(configs['max_messages_per_partition'] || 5, 100, rand) do |messages|
    partitions = messages.collect {|m| m.partition}.uniq.sort
    logger.info "#{rand}| Batch started. Received #{messages.length} messages from partitions #{partitions} for app #{key}"
    current_assignment = consumer.assignment.to_h
    values = messages.collect {|m| JSON.parse(m.payload)}
    skip_commit = false
    begin
      values.each_slice((values.length / ((retry_counter * 2) + 1).to_f).ceil) do |slice|
        logger.info "#{rand}| Sending #{slice.length} messages to lambda"
        result = invoke_lambda(key, slice)
        if result.status_code != 200 || result.function_error
          logger.info "#{rand}| Batch finished with error #{result.function_error}"
          raise LambdaError, result.function_error.to_s
        end
      end
    rescue LambdaError => e
      logger.warn "#{rand}| #{e}"
      if consumer.running? && current_assignment == consumer.assignment.to_h
        retry_counter += 1
        retries_started_at ||= Time.now
        if retry_counter <= 5 && Time.now - retries_started_at < 600
          logger.warn "#{rand}| Retrying from: #{e.cause}, app_key: #{key}"
          Rollbar.warning("Retrying from: #{e.cause}", app_key: key, thread: rand, partitions: partitions.join(', '))
          sleep 5
          retry if consumer.running? && current_assignment == consumer.assignment.to_h
        else
          raise e # Raise to exit the retry loop so that consumers are rebalanced.
        end
      end
      skip_commit = true
    end
    retry_counter = 0
    retries_started_at = nil
    if skip_commit
      logger.info "#{rand}| Commit skipped"
    else
      consumer.commit
      logger.info "#{rand}| Batch finished"
    end
  end
  consumer.close
  logger.info "#{rand}| Stopped #{key}"
rescue Rdkafka::RdkafkaError => e
  logger.warn "#{rand}| #{e}"
  logger.info "#{rand}| assignment: #{consumer.assignment.to_h}"
  if e.to_s.index('No offset stored')
    retry
  else
    raise e
  end
end

配置

def config(app_config)
  {
      "bootstrap.servers": brokers,
      "group.id": app_configs['app_key'],
      "enable.auto.commit": false,
      "enable.partition.eof": false,
      "log.connection.close": false,
      "session.timeout.ms": 30*1000,
      "fetch.message.max.bytes": ['sources'].include?(app_configs['stream']) ? 102400 : 10240,
      "queued.max.messages.kbytes": ['sources'].include?(app_configs['stream']) ? 250 : 25,
      "queued.min.messages": (app_configs['max_messages_per_partition'] || 5) * 10,
      "fetch.min.bytes": 1,
      "partition.assignment.strategy": 'roundrobin'
  }
end

生产者代码使用https://github.com/zendesk/ruby-kafka

def to_kafka(stream_name, data, batch_size)
  stream_name_with_env = "#{Rails.env}_#{stream_name}"
  topic = [Rails.application.secrets.kafka_topic_prefix, stream_name_with_env].compact.join
  partitions_count = KAFKA.partitions_for(topic)
  Rails.logger.info "Partition count for #{topic}: #{partitions_count}"
  if @job.active? && @job.partition.blank?
    @job.connect_to_partition
  end
  partition = @job.partition&.number.to_i % partitions_count
  producer = KAFKA.producer 
  if data.is_a?(Array)
    data.each_slice(batch_size) do |slice|
      producer.produce(JSON.generate(slice), topic: topic, partition: partition)
    end
  else
    producer.produce(JSON.generate(data), topic: topic, partition: partition)
  end
  producer.deliver_messages
  Rails.logger.info "records sent to topic #{topic} partition #{partition}"
  producer.shutdown
end

更新:看起来消息的数量无关紧要。我刚刚在一个分区中生成了100多条消息,消费者还没有开始使用这些消息。

更新2:它并没有在晚上开始消耗消息。但当我今天早上在同一分区中生成一组新消息时,它醒来并开始使用我刚刚生成的新消息。它跳过了昨晚产生的消息。

共有1个答案

濮泳
2023-03-14

我认为问题是分区有一段时间没有收到消息,显然它没有保存偏移量。获取偏移量时,将其设置为最大值,这是默认值。在我设置<code>auto.offset之后。reset:“minimum”我从未见过这样的问题,即消息会被跳过。

 类似资料:
  • 我有一个多分区主题,由多个使用者(同一组)使用。我的目标是最大化消费处理,即任何消费者都可以消费来自任何分区的消息。 我知道这看起来是不可能的,因为只有一个消费者可以从一个分区中消费。 有没有可能使用REST代理来实现这一点?例如,轮询所有代理消费者实例。 谢了。

  • null 我在这一页上读到以下内容: 使用者从任何单个分区读取,允许您以与消息生成类似的方式扩展消息消耗的吞吐量。 也可以将使用者组织为给定主题的使用者组-组内的每个使用者从唯一分区读取,并且组作为一个整体使用来自整个主题的所有消息。 如果使用者多于分区,则某些使用者将空闲,因为它们没有可从中读取的分区。 如果分区多于使用者,则使用者将从多个分区接收消息。 如果使用者和分区的数量相等,则每个使用者

  • TL;DR;我试图理解一个被分配了多个分区的单个使用者是如何处理reach分区的消费记录的。 例如: 在移动到下一个分区之前,会完全处理一个分区。 每次处理每个分区中的可用记录块。 从第一个可用分区处理一批N条记录 以循环旋转方式处理来自分区的N条记录 我找到了或分配程序的配置,但这只决定了使用者如何分配分区,而不是它如何从分配给它的分区中使用。 我开始深入研究KafkaConsumer源代码,#

  • 我正在尝试设置一个基本的Java消费者来接收来自Kafka主题的消息。我在-https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例-并具有以下代码: 和 Kafka在有问题的EC2主机上运行,我可以使用kafka-console-producer.sh和kafka-console-consumer.sh工具发送和接收关于主题“测试

  • 消费者使用Spring的JavaConfig类如下: Kafka主题侦听器使用@KafkaListener注释,如下所示: 我的pom包括依赖项: 现在当我打包到war并部署到tomcat时,它不会显示任何错误,即使在调试模式下也不会显示任何错误,只是部署war什么都没有。 请帮助我了解是否缺少触发kafkalistner的某些配置。 谢谢Gary我添加了上下文。xml和web。xml,但我得到了

  • 本文向大家介绍Kafka 消费者是否可以消费指定分区消息?相关面试题,主要包含被问及Kafka 消费者是否可以消费指定分区消息?时的应答技巧和注意事项,需要的朋友参考一下 Kafa consumer消费消息时,向broker发出fetch请求去消费特定分区的消息,consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,customer拥有了offset的控制权,可