我有2个服务(都是在kotlin上的Spring引导)。将其命名为“客户机”和“服务器”,由于某些限制,我必须使用Kafka同步请求-应答模式。所以我尝试使用ReplyingKafkatemplate。我的问题是我需要为多个实体使用。意味着创建多个ReplyingKafkaTemplate,一个用于“foo”,第二个用于“bar”。因此,在我的代码中,我用setup创建了多个KafkaConfig类,每个实体和一个基本配置。
另外,我将KafkaAutoConfig排除在加载之外。下面是“服务器”端的配置(W/O kreplying kafka模板):
@Configuration
@EnableKafka
class KafkaConfig @Autowired constructor(
@Value("\${kafka.bootstrap-servers}")
private var bootstrapServers: String,
@Value("\${kafka.consumer-group.name}")
private var consumerGroup: String,
@Value("\${kafka.consumer-group.id}")
private var groupId: Number
) {
@Bean("kafkaProducerConfig")
fun producerConfigs(): MutableMap<String, Any> {
return mutableMapOf(
Pair(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9093,kafka3:9094"),
Pair(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java),
Pair(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java),
Pair(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"),
Pair(ProducerConfig.ACKS_CONFIG, "all"),
Pair(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"),
Pair(ProducerConfig.RETRIES_CONFIG, Int.MAX_VALUE.toString()),
Pair(ProducerConfig.LINGER_MS_CONFIG, "20"),
Pair(ProducerConfig.BATCH_SIZE_CONFIG, (32 * 1024).toString()),
Pair(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")
)
}
@Bean("kafkaConsumerConfig")
fun consumerConfigs(): Map<String, Any> {
return mutableMapOf(
Pair(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9093,kafka3:9094"),
Pair(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer::class.java),
Pair(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer::class.java),
Pair(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
Pair(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup + groupId)
)
}
@Bean("kafkaAdminConfig")
fun admin(): KafkaAdmin {
val configs: MutableMap<String, Any> = HashMap()
configs[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
return KafkaAdmin(configs)
}
}
FOO(BAR与FOO相同的配置,而不是其他实体和bean的命名)配置是:
@Configuration
class KafkaConfigForFOO {
@Value("\${kafka.topic.request-consumable-topic}")
private lateinit var requestConsumableTopic: String
@Value("\${kafka.request-reply.timeout-ms}")
private lateinit var replyTimeout: Number
@Bean("requestFOOTopicConfig")
fun requestConsumableTopic(): NewTopic {
val configs: MutableMap<String, String> = HashMap()
configs["retention.ms"] = replyTimeout.toString()
return NewTopic(requestConsumableTopic, 6, 3.toShort()).configs(configs)
}
@Bean("producerFactoryForFOO")
@Autowired
fun producerFactoryForFOO(@Qualifier("kafkaProducerConfig") producerConfigs: MutableMap<String, Any>):
ProducerFactory<String, FOO> = DefaultKafkaProducerFactory(producerConfigs)
@Bean("kafkaTemplateForFOO")
@Autowired
fun kafkaTemplateForFOO(@Qualifier("producerFactoryForFOO") producerFactory: ProducerFactory<String, FOO>):
KafkaTemplate<String, FOO> = KafkaTemplate(producerFactory)
@Bean("consumerFactoryForFOO")
@Autowired
fun consumerFactoryForFOO(@Qualifier("kafkaConsumerConfig") consumerConfigs: MutableMap<String, Any>):
ConsumerFactory<String, FOO> = DefaultKafkaConsumerFactory(consumerConfigs, StringDeserializer(), JsonDeserializer(FOO::class.java))
@Bean("kafkaListenerContainerFactoryForFOO")
@Autowired
fun kafkaListenerContainerFactoryForFOO(
@Qualifier("consumerFactoryForFOO") consumerFactory: ConsumerFactory<String, FOO>,
@Qualifier("kafkaTemplateForFOO") kafkaTemplate: KafkaTemplate<String, FOO>
):
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, FOO>> {
val factory = ConcurrentKafkaListenerContainerFactory<String, FOO>()
factory.consumerFactory = consumerFactory
factory.setReplyTemplate(kafkaTemplate)
return factory
}
}
@Component
class FOOReplyingKafkaConsumer @Autowired constructor(
private val fooService: FooService
) {
@KafkaListener(topics = ["\${kafka.topic.request-FOO-topic}"], containerFactory = "kafkaListenerContainerFactoryForFoo", groupId = "\${spring.kafka.consumer.group-id}")
@SendTo()
fun cropListen(request: FOO): FOO{
return FOO(fooService.getAllByIds(request.ids ?: mutableSetOf()).toMutableSet())
}
}
你不需要两个消费者工厂;类型擦除意味着它在运行时是不相关的。
引导将一个配置为
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
它实际上是
(或者在Kotlin中是
)。
public KafkaTemplate<?, ?> kafkaTemplate
1-我有一个带有Spring Boot的API,我需要配置两个DBMS(MySQL和Postgres)。 2-对于每个DBMS,我需要配置不同的配置文件。(Dev,Prod) 遵循我的MySQL配置类: 我以为我的出口是: 重要: 当我将配置文件配置为连接到我的MySQL Dev数据库时,我只想连接到它。我想要同样的结果,当它是MySQL的Prod的基础时。 当我将配置文件配置为连接到我的Post
我想使用Spring Boot和JPA将GeoJSON保存到数据库 JSON对象示例: 我试图使用@ElementCollection和@Embedded配置实体。 GeoJson实体: 特点: 属性: 几何: 坐标: 此配置出现错误: 调用init方法失败;嵌套的异常是javax。坚持不懈PersistenceException:[PersistenceUnit:default]无法构建Hibe
在Spring Hibernate XML配置中,我有 我知道SpringJava配置的等价物如下:LocalSessionFactoryBuilder(dataSource())。AddAnnotatedClass(Foo.class)。buildSessionFactory(); 我的问题是,如果我不使用LocalSessionFactoryBuilder类,而是使用HibernateJpaV
我有一个Windows服务器,目前运行两个不同的Tomcat实例作为Windows服务。两者都有自己的目录,并且在它们之间不共享任何文件。通过设置向导安装的第一个Tomcat实例设置了CATALINA_HOME和CATALINA_BASE环境变量。第二个以相同的方式安装。它忽略全局设置并作为独立的实例运行。 问题来了。我需要安装一个自带Tomcat的产品。我已经完成了产品的安装,但是现在我需要配置
问题内容: 我目前正在构建一个库以对我的一些代码进行模块化,并且我遇到了Hibernate的问题。 在我的主应用程序中,我有一个hibernate配置来获取运行所需的信息,但是我的库中也需要hibernate,因为我想要的某些对象可以在其他应用程序中使用。 当我启动两个hibernate设置的tomcat服务器时,出现错误,指出无法解析bean,并且说我的查询中缺少位置参数的bean。但是,当我仅
我有一个XSD文件,我想在其中放置几个图表,以及服务的描述。添加时 我得到错误:“多个根标记” 如果我这样写: 我得到错误: 发现以元素{开头的内容无效http://www.w3.org/2001/XMLSchema“:架构}。其中一个{”http://www.w3.org/2001/XMLSchema“:simpleType,”http://www.w3.org/2001/XMLSchema“: