当前位置: 首页 > 知识库问答 >
问题:

为多个实体配置ReplyingKafkaTemplate

丌官嘉良
2023-03-14

我有2个服务(都是在kotlin上的Spring引导)。将其命名为“客户机”和“服务器”,由于某些限制,我必须使用Kafka同步请求-应答模式。所以我尝试使用ReplyingKafkatemplate。我的问题是我需要为多个实体使用。意味着创建多个ReplyingKafkaTemplate,一个用于“foo”,第二个用于“bar”。因此,在我的代码中,我用setup创建了多个KafkaConfig类,每个实体和一个基本配置。

另外,我将KafkaAutoConfig排除在加载之外。下面是“服务器”端的配置(W/O kreplying kafka模板):

@Configuration
@EnableKafka
class KafkaConfig @Autowired constructor(
    @Value("\${kafka.bootstrap-servers}")
    private var bootstrapServers: String,
    @Value("\${kafka.consumer-group.name}")
    private var consumerGroup: String,
    @Value("\${kafka.consumer-group.id}")
    private var groupId: Number
) {

    @Bean("kafkaProducerConfig")
    fun producerConfigs(): MutableMap<String, Any> {
        return mutableMapOf(
            Pair(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9093,kafka3:9094"),
            Pair(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java),
            Pair(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java),
            Pair(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"),
            Pair(ProducerConfig.ACKS_CONFIG, "all"),
            Pair(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"),
            Pair(ProducerConfig.RETRIES_CONFIG, Int.MAX_VALUE.toString()),
            Pair(ProducerConfig.LINGER_MS_CONFIG, "20"),
            Pair(ProducerConfig.BATCH_SIZE_CONFIG, (32 * 1024).toString()),
            Pair(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")
        )
    }

    @Bean("kafkaConsumerConfig")
    fun consumerConfigs(): Map<String, Any> {
        return mutableMapOf(
            Pair(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9093,kafka3:9094"),
            Pair(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer::class.java),
            Pair(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer::class.java),
            Pair(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
            Pair(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup + groupId)
        )
    }

    @Bean("kafkaAdminConfig")
    fun admin(): KafkaAdmin {
        val configs: MutableMap<String, Any> = HashMap()
        configs[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
        return KafkaAdmin(configs)
    }
}

FOO(BAR与FOO相同的配置,而不是其他实体和bean的命名)配置是:

@Configuration
class KafkaConfigForFOO {

    @Value("\${kafka.topic.request-consumable-topic}")
    private lateinit var requestConsumableTopic: String

    @Value("\${kafka.request-reply.timeout-ms}")
    private lateinit var replyTimeout: Number

    @Bean("requestFOOTopicConfig")
    fun requestConsumableTopic(): NewTopic {
        val configs: MutableMap<String, String> = HashMap()
        configs["retention.ms"] = replyTimeout.toString()
        return NewTopic(requestConsumableTopic, 6, 3.toShort()).configs(configs)
    }

    @Bean("producerFactoryForFOO")
    @Autowired
    fun producerFactoryForFOO(@Qualifier("kafkaProducerConfig") producerConfigs: MutableMap<String, Any>):
        ProducerFactory<String, FOO> = DefaultKafkaProducerFactory(producerConfigs)

    @Bean("kafkaTemplateForFOO")
    @Autowired
    fun kafkaTemplateForFOO(@Qualifier("producerFactoryForFOO") producerFactory: ProducerFactory<String, FOO>):
        KafkaTemplate<String, FOO> = KafkaTemplate(producerFactory)

    @Bean("consumerFactoryForFOO")
    @Autowired
    fun consumerFactoryForFOO(@Qualifier("kafkaConsumerConfig") consumerConfigs: MutableMap<String, Any>):
        ConsumerFactory<String, FOO> = DefaultKafkaConsumerFactory(consumerConfigs, StringDeserializer(), JsonDeserializer(FOO::class.java))

    @Bean("kafkaListenerContainerFactoryForFOO")
    @Autowired
    fun kafkaListenerContainerFactoryForFOO(
        @Qualifier("consumerFactoryForFOO") consumerFactory: ConsumerFactory<String, FOO>,
        @Qualifier("kafkaTemplateForFOO") kafkaTemplate: KafkaTemplate<String, FOO>
    ):
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, FOO>> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, FOO>()
        factory.consumerFactory = consumerFactory
        factory.setReplyTemplate(kafkaTemplate)
        return factory
    }
}
@Component
class FOOReplyingKafkaConsumer @Autowired constructor(
    private val fooService: FooService
) {
    @KafkaListener(topics = ["\${kafka.topic.request-FOO-topic}"], containerFactory = "kafkaListenerContainerFactoryForFoo", groupId = "\${spring.kafka.consumer.group-id}")
    @SendTo()
    fun cropListen(request: FOO): FOO{
        return FOO(fooService.getAllByIds(request.ids ?: mutableSetOf()).toMutableSet())
    }
}

共有1个答案

金子平
2023-03-14

你不需要两个消费者工厂;类型擦除意味着它在运行时是不相关的。

引导将一个配置为

ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(

它实际上是 (或者在Kotlin中是 )。

public KafkaTemplate<?, ?> kafkaTemplate
 类似资料:
  • 1-我有一个带有Spring Boot的API,我需要配置两个DBMS(MySQL和Postgres)。 2-对于每个DBMS,我需要配置不同的配置文件。(Dev,Prod) 遵循我的MySQL配置类: 我以为我的出口是: 重要: 当我将配置文件配置为连接到我的MySQL Dev数据库时,我只想连接到它。我想要同样的结果,当它是MySQL的Prod的基础时。 当我将配置文件配置为连接到我的Post

  • 我想使用Spring Boot和JPA将GeoJSON保存到数据库 JSON对象示例: 我试图使用@ElementCollection和@Embedded配置实体。 GeoJson实体: 特点: 属性: 几何: 坐标: 此配置出现错误: 调用init方法失败;嵌套的异常是javax。坚持不懈PersistenceException:[PersistenceUnit:default]无法构建Hibe

  • 在Spring Hibernate XML配置中,我有 我知道SpringJava配置的等价物如下:LocalSessionFactoryBuilder(dataSource())。AddAnnotatedClass(Foo.class)。buildSessionFactory(); 我的问题是,如果我不使用LocalSessionFactoryBuilder类,而是使用HibernateJpaV

  • 我有一个Windows服务器,目前运行两个不同的Tomcat实例作为Windows服务。两者都有自己的目录,并且在它们之间不共享任何文件。通过设置向导安装的第一个Tomcat实例设置了CATALINA_HOME和CATALINA_BASE环境变量。第二个以相同的方式安装。它忽略全局设置并作为独立的实例运行。 问题来了。我需要安装一个自带Tomcat的产品。我已经完成了产品的安装,但是现在我需要配置

  • 问题内容: 我目前正在构建一个库以对我的一些代码进行模块化,并且我遇到了Hibernate的问题。 在我的主应用程序中,我有一个hibernate配置来获取运行所需的信息,但是我的库中也需要hibernate,因为我想要的某些对象可以在其他应用程序中使用。 当我启动两个hibernate设置的tomcat服务器时,出现错误,指出无法解析bean,并且说我的查询中缺少位置参数的bean。但是,当我仅

  • 我有一个XSD文件,我想在其中放置几个图表,以及服务的描述。添加时 我得到错误:“多个根标记” 如果我这样写: 我得到错误: 发现以元素{开头的内容无效http://www.w3.org/2001/XMLSchema“:架构}。其中一个{”http://www.w3.org/2001/XMLSchema“:simpleType,”http://www.w3.org/2001/XMLSchema“: