我正在学习使用Kafka在科特林的SpringKafka。我知道,当一个新主题发布时,如果不存在,它就会被创建。所以,当我向从Spring创建的新/旧主题发送一个值时,默认分区是0,但我想在另一个分区上写一条消息,比如分区1。
当我创建/写一个主题时,它是有效的:
val topicTesteKotlin = "topico-teste-kotlin"
fun sendTopicCallback(@PathVariable message : String) : ResponseEntity<String> {
val msg = Optional.of(message)
return if (msg.isPresent) {
kafkaTemplate.send(topicTesteKotlin, message).addCallback({
println("Sent message=[" + message +
"] with offset=[" + it!!.recordMetadata.offset() + "]")
}, {
println("Unable to send message=["
+ message + "] due to : " + it.message)
})
ResponseEntity.ok(msg.get())
} else {
kafkaTemplate.send(topicTesteKotlin, "GET /send_topic_callback/message BadRequest > $message")
ResponseEntity.badRequest().body("Bad request!")
}
}
但是,当我使用以下选项选择分区和密钥时:
val topicTesteKotlin = "topico-teste-kotlin"
fun sendTopicCallback(@PathVariable message : String) : ResponseEntity<String> {
val msg = Optional.of(message)
return if (msg.isPresent) {
kafkaTemplate.send(topicTesteKotlin, 1, "1", message).addCallback({
println("Sent message=[" + message +
"] with offset=[" + it!!.recordMetadata.offset() + "]")
}, {
println("Unable to send message=["
+ message + "] due to : " + it.message)
})
ResponseEntity.ok(msg.get())
} else {
kafkaTemplate.send(topicTesteKotlin, "GET /send_topic_callback/message BadRequest > $message")
ResponseEntity.badRequest().body("Bad request!")
}
}
我得到了以下错误:
org.apache.kafka.common.KafkaException: Invalid partition given with record: 1 is not in the range [0...1).
我试着把钥匙换成0.1
,但也没用。显然,当我从Spring客户端创建一个主题时,只创建了一个分区,即0
。
Kafka制作人配置
@Configuration
class KafkaProducerConfig {
@Bean
fun producerFactory() : ProducerFactory<String, String> {
val configProps = HashMap<String,Any>()
configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return DefaultKafkaProducerFactory(configProps)
}
@Bean
fun kafkaTemplate() : KafkaTemplate<String, String> {
return KafkaTemplate(producerFactory())
}
}
那么,如何从Spring Kafka客户端创建分区呢?
您可以使用以下代码管理主题创建机制:
@Configuration
public class KafkaTopicConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
private String testTopicName = "topico-teste-kotlin";
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic testTopic() {
// second parameter is a number of partitions
return new NewTopic(testTopicName, 2, (short) 1);
}
}
本文向大家介绍创建topic时如何选择合适的分区数?相关面试题,主要包含被问及创建topic时如何选择合适的分区数?时的应答技巧和注意事项,需要的朋友参考一下 根据集群的机器数量和需要的吞吐量来决定适合的分区数
使用命令时: /usr/local/kafka/bin/kafka-topics.sh--创建--zookeeper localhost:2181--复制-因子1--分区1--主题测试 提前道谢。
我有@KafkaListener使用topicPattern与正则表达式,工作正常(foo。*),但现在我想将侦听器分配给所有匹配主题的所有分区。 https://docs.spring.io/spring-kafka/docs/2.6.1/reference/html/#tip-assign-all-parts并没有真正帮助我,因为我不知道主题名称。
我有以下制表符分隔的示例数据集: 我正在对此数据运行一些转换,最终数据位于spark dataset中。之后,我用“period”分区将该数据集写入s3。因为我也希望在s3文件中使用period,所以我正在从from period列创建另一列“datasetperiod”。 我的scala函数来保存TSV数据集。 在S3上保存数据集的Scala代码。为S3上的分区添加新列datasetPeriod
null camel-kafka中是否有任何配置,我们可以使用它来增加kafka主题分区计数?