当前位置: 首页 > 知识库问答 >
问题:

当存在分区时,并发数正在减少

鲍俊杰
2023-03-14

我有一个spring boot应用程序,用于向kafka发送消息。该应用程序在每天1000万个请求的高流量下在6个实例上运行。我也有一款春装Kafka消费应用。但是这个应用程序有2个实例,这些实例不能使用所有的消息,因为这个应用程序运行的是单线程。我的主题有4个分区,我想根据分区数做消费者应用程序多线程。但是我不确定我的代码是否有效。

SpringKafka独立

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.7.RELEASE</version>
</dependency>

配置类

@Configuration
@EnableKafka
public class KafkaListenerConfig {

    private final KafkaListenerProperties kafkaListenerProperties;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =  new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(kafkaConsumerProps());
    }
    @Bean
    public Map<String, Object> kafkaConsumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerProperties.getBootstrap());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaListenerProperties.getGroup());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

}

根据我主题的分区计数,我将并发字段设置为 4

factory.setConcurrency(4);

主题分区:

        Topic:fraudSSCALogs     PartitionCount:4        ReplicationFactor:1     Configs:
        Topic: fraudSSCALogs    Partition: 0    Leader: 62      Replicas: 62    Isr: 62
        Topic: fraudSSCALogs    Partition: 1    Leader: 166     Replicas: 166   Isr: 166
        Topic: fraudSSCALogs    Partition: 2    Leader: 270     Replicas: 270   Isr: 270
        Topic: fraudSSCALogs    Partition: 3    Leader: 167     Replicas: 167   Isr: 167

侦听器类

public class SSCAReqResLogListener {

@KafkaListener(id= "consumer0",topicPartitions = {@TopicPartition(topic="${kafka.consumer.topic}", partitions = {"0"})})
public void receive1(ConsumerRecord<String, String> record) {
    proceed(record);
}

@KafkaListener(id= "consumer1",topicPartitions = {@TopicPartition(topic="${kafka.consumer.topic}", partitions = {"1"})})
public void receive2(ConsumerRecord<String, String> record) {
    proceed(record);
}

@KafkaListener(id= "consumer2",topicPartitions = {@TopicPartition(topic="${kafka.consumer.topic}", partitions = {"2"})})
public void receive3(ConsumerRecord<String, String> record) {
    proceed(record);
}

@KafkaListener(id= "consumer3",topicPartitions = {@TopicPartition(topic="${kafka.consumer.topic}", partitions = {"3"})})
public void receive4(ConsumerRecord<String, String> record) {
    proceed(record);
}

private synchronized void proceed(ConsumerRecord<String, String> record) {
    log.info("Message Received: Topic: {}, Partition: {}, Value: {}", record.topic(), record.partition(), record.value());
}

}

当我的应用程序启动时,Spring会发出警告;

WARN  [main] org.springframework.kafka.listener.ConcurrentMessageListenerContainer: When specific partitions are provided, the concurrency must be less than or equal to the number of partitions; reduced from 4 to 1

我的配置有什么问题?我能做什么来修复这个警告?

共有1个答案

周通
2023-03-14

当您显式定义分区时,并发只能设置为小于或等于分区数的数字。

您只为每个主题定义了一个分区,因此并发性只能设置为1。代码健全性检查这一点,并自动降低并发级别,并向您发出警告。

@KafkaListener(id= "consumer0",topicPartitions = {@TopicPartition(
    topic="${kafka.consumer.topic}",
    partitions = {"0"}   // <-- Only one partition
)})

多个分区将被配置如下:

@KafkaListener(id= "consumer0",topicPartitions = {@TopicPartition(
    topic="${kafka.consumer.topic}",
    partitions = {"0", "1", "2", "3" }
)})
 类似资料:
  • 我有一个包含100个分区的df,在保存到HDFS之前,我想减少分区的数量,因为拼花文件太小了( 它可以工作,但将过程从每个文件 2-3 秒减慢到每个文件 10-20 秒。当我尝试重新分区时: 这个过程一点也不慢,每个文件2-3秒。 为什么?在减少分区数量时,合并不应该总是更快,因为它避免了完全洗牌吗? 背景: 我将文件从本地存储导入spark集群,并将生成的数据帧保存为拼花文件。每个文件大约100

  • 我设置了一个Spring集成流程来处理一个有3个分区的主题,并将侦听器容器的并发性设置为3。正如所料,我看到三个线程处理来自所有3个分区的批处理。然而,我发现在某些情况下,一个侦听器线程可能处理包含来自多个分区的消息的单个批处理。在kafka中,我的数据是按id划分的,因此它可以与其他id同时处理,但不能在另一个线程上与相同的id一起处理(我很惊讶地发现这种情况正在发生)。通过阅读文档,我认为每个

  • 问题内容: 让我解释一下:我并不是在问将特定日期时间的时区存储在数据库中的正确方法。我说的是时区本身。例如: 我有一个名为“用户”的MySQL表。现在,在此表上,我希望有一列包含用户居住地的时区(由用户提供,将从列表中选择)。我正在使用PHP,其中包含类似以下时区字符串的列表: 美国时区列表 现在,显而易见的解决方案(至少对我而言)是在“用户”表中创建VARCHAR列,然后将PHP使用的时区字符串

  • 但是我的接收器只在2个分区中获取数据,但是我配置了20个流线程,并且我验证了我的生产者正在写入所有20个分区,如何知道我的转换节点转发到我的FINAL_TOPIC的所有20个分区

  • 我试图通过使用基于索引的行位置简单地获得df的子集。但是,出于某些原因,以下代码有时返回空数据帧: dist数据帧(帧是索引): 输出: 我一辈子都不明白为什么会发生这种事。

  • 在Master Hazelcast电子书“17.4.1.分区感知操作”下,它指出: 要执行分区感知操作,需要创建操作线程数组。 单个操作线程对多个分区执行操作; 每个分区只属于1个操作线程。 忽略备份和近缓存,当我创建一个IMap实例时,这是否意味着我只能有一个并发的put/get操作在整个集群的每个map分区上执行?进一步说,如果我附加了一个MapStore,这是否意味着我只能对我的后端数据库运