我有一个spring boot应用程序,用于向kafka发送消息。该应用程序在每天1000万个请求的高流量下在6个实例上运行。我也有一款春装Kafka消费应用。但是这个应用程序有2个实例,这些实例不能使用所有的消息,因为这个应用程序运行的是单线程。我的主题有4个分区,我想根据分区数做消费者应用程序多线程。但是我不确定我的代码是否有效。
SpringKafka独立
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.7.RELEASE</version>
</dependency>
配置类
@Configuration
@EnableKafka
public class KafkaListenerConfig {
private final KafkaListenerProperties kafkaListenerProperties;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(4);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(kafkaConsumerProps());
}
@Bean
public Map<String, Object> kafkaConsumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerProperties.getBootstrap());
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaListenerProperties.getGroup());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
根据我主题的分区计数,我将并发字段设置为 4
factory.setConcurrency(4);
主题分区:
Topic:fraudSSCALogs PartitionCount:4 ReplicationFactor:1 Configs:
Topic: fraudSSCALogs Partition: 0 Leader: 62 Replicas: 62 Isr: 62
Topic: fraudSSCALogs Partition: 1 Leader: 166 Replicas: 166 Isr: 166
Topic: fraudSSCALogs Partition: 2 Leader: 270 Replicas: 270 Isr: 270
Topic: fraudSSCALogs Partition: 3 Leader: 167 Replicas: 167 Isr: 167
侦听器类
public class SSCAReqResLogListener {
@KafkaListener(id= "consumer0",topicPartitions = {@TopicPartition(topic="${kafka.consumer.topic}", partitions = {"0"})})
public void receive1(ConsumerRecord<String, String> record) {
proceed(record);
}
@KafkaListener(id= "consumer1",topicPartitions = {@TopicPartition(topic="${kafka.consumer.topic}", partitions = {"1"})})
public void receive2(ConsumerRecord<String, String> record) {
proceed(record);
}
@KafkaListener(id= "consumer2",topicPartitions = {@TopicPartition(topic="${kafka.consumer.topic}", partitions = {"2"})})
public void receive3(ConsumerRecord<String, String> record) {
proceed(record);
}
@KafkaListener(id= "consumer3",topicPartitions = {@TopicPartition(topic="${kafka.consumer.topic}", partitions = {"3"})})
public void receive4(ConsumerRecord<String, String> record) {
proceed(record);
}
private synchronized void proceed(ConsumerRecord<String, String> record) {
log.info("Message Received: Topic: {}, Partition: {}, Value: {}", record.topic(), record.partition(), record.value());
}
}
当我的应用程序启动时,Spring会发出警告;
WARN [main] org.springframework.kafka.listener.ConcurrentMessageListenerContainer: When specific partitions are provided, the concurrency must be less than or equal to the number of partitions; reduced from 4 to 1
我的配置有什么问题?我能做什么来修复这个警告?
当您显式定义分区时,并发
只能设置为小于或等于分区数的数字。
您只为每个主题定义了一个分区,因此并发性只能设置为1
。代码健全性检查这一点,并自动降低并发级别,并向您发出警告。
@KafkaListener(id= "consumer0",topicPartitions = {@TopicPartition(
topic="${kafka.consumer.topic}",
partitions = {"0"} // <-- Only one partition
)})
多个分区将被配置如下:
@KafkaListener(id= "consumer0",topicPartitions = {@TopicPartition(
topic="${kafka.consumer.topic}",
partitions = {"0", "1", "2", "3" }
)})
我有一个包含100个分区的df,在保存到HDFS之前,我想减少分区的数量,因为拼花文件太小了( 它可以工作,但将过程从每个文件 2-3 秒减慢到每个文件 10-20 秒。当我尝试重新分区时: 这个过程一点也不慢,每个文件2-3秒。 为什么?在减少分区数量时,合并不应该总是更快,因为它避免了完全洗牌吗? 背景: 我将文件从本地存储导入spark集群,并将生成的数据帧保存为拼花文件。每个文件大约100
我设置了一个Spring集成流程来处理一个有3个分区的主题,并将侦听器容器的并发性设置为3。正如所料,我看到三个线程处理来自所有3个分区的批处理。然而,我发现在某些情况下,一个侦听器线程可能处理包含来自多个分区的消息的单个批处理。在kafka中,我的数据是按id划分的,因此它可以与其他id同时处理,但不能在另一个线程上与相同的id一起处理(我很惊讶地发现这种情况正在发生)。通过阅读文档,我认为每个
问题内容: 让我解释一下:我并不是在问将特定日期时间的时区存储在数据库中的正确方法。我说的是时区本身。例如: 我有一个名为“用户”的MySQL表。现在,在此表上,我希望有一列包含用户居住地的时区(由用户提供,将从列表中选择)。我正在使用PHP,其中包含类似以下时区字符串的列表: 美国时区列表 现在,显而易见的解决方案(至少对我而言)是在“用户”表中创建VARCHAR列,然后将PHP使用的时区字符串
但是我的接收器只在2个分区中获取数据,但是我配置了20个流线程,并且我验证了我的生产者正在写入所有20个分区,如何知道我的转换节点转发到我的FINAL_TOPIC的所有20个分区
我试图通过使用基于索引的行位置简单地获得df的子集。但是,出于某些原因,以下代码有时返回空数据帧: dist数据帧(帧是索引): 输出: 我一辈子都不明白为什么会发生这种事。
在Master Hazelcast电子书“17.4.1.分区感知操作”下,它指出: 要执行分区感知操作,需要创建操作线程数组。 单个操作线程对多个分区执行操作; 每个分区只属于1个操作线程。 忽略备份和近缓存,当我创建一个IMap实例时,这是否意味着我只能有一个并发的put/get操作在整个集群的每个map分区上执行?进一步说,如果我附加了一个MapStore,这是否意味着我只能对我的后端数据库运