当前位置: 首页 > 知识库问答 >
问题:

Flink使用者连接到具有多个分区的Kafka集群的java.lang.RuntimeException

何涵忍
2023-03-14
java.lang.RuntimeException: topicName
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:80)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

我的消费者代码如下所示:

  def defineKafkaDataStream[A: TypeInformation](topic: String,
                                                env: StreamExecutionEnvironment,
                                                SASL_username:String,
                                                SASL_password:String,
                                                kafkaBootstrapServer: String = "localhost:9092",
                                                zookeeperHost: String = "localhost:2181",
                                                groupId: String = "test"
                                               )(implicit c: JsonConverter[A]): DataStream[A] = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", kafkaBootstrapServer)
    properties.setProperty("security.protocol" , "SASL_SSL")
    properties.setProperty("sasl.mechanism" , "PLAIN")
    val jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";"
    val jaasConfig = String.format(jaasTemplate, SASL_username, SASL_password)
    properties.setProperty("sasl.jaas.config", jaasConfig)
    properties.setProperty("group.id", "MyConsumerGroup")

    env
      .addSource(new FlinkKafkaConsumer(topic, new JSONKeyValueDeserializationSchema(true), properties))
      .map(x => x.convertTo[A](c))
  }

我是否应该设置另一个属性来允许单个作业从多个分区使用?

共有1个答案

龙佐
2023-03-14

在我的过程中,经过四处挖掘和质疑,我发现了问题。

我查看了出现运行时异常的KafkaPartitionDiscoverer函数的Java代码。

我注意到的一节处理了RuntimeException

if (kafkaPartitions == null) {
    throw new RuntimeException("Could not fetch partitions for %s. Make sure that the topic exists.".format(topic));
    }
kafka-topics --describe --zookeeper serverIP:2181 --topic topicName
Error while executing topic command : Topics in [] does not exist
ERROR java.lang.IllegalArgumentException: Topics in [] does not exist
    at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:435)
    at kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:350)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
 类似资料:
  • 我有一个Spring启动应用程序,它使用来自 Kafka 集群中某个主题(例如 topic1)的消息。这就是我的代码目前的样子。 现在我想从另一个Kafka集群中的不同主题开始消费。一种方法是为此创建另一个bean。但是有更好的方法吗?

  • 我的消费者并不是每次都能收到信息。我有3个代理(3个服务器)的Kafka集群,有3个主题和复制因子3的分区。 我有Java中的消费者,我将最大轮询记录设置在50000获取字节上,配置在50MB上。应用程序每分钟都进行轮询。当我向主题“my-topic”发送10条消息时,consumer不会给我所有的消息,而是只给我其中的一部分,其余的将在下一次运行中给我。消息是在applicatin睡眠期间由脚本

  • 我们正在使用Spring kafka来消费消息。我们已经为每个分区创建了接收消息的接收器。现在我们需要多个接收者从单个分区接收消息。 对于例如。假设我们有一个分区0。目前,我们只有一个接收器(接收器1)从这个分区接收消息。现在我想为同一个分区(分区0)添加另一个接收器(接收器2)。 因此,如果生产者向这个分区发送100条消息,接收器1应该接收50条消息,其余50条消息应该在接收器2中接收。我不希望

  • 我使用flink和Kafka创建了一个流媒体程序,用于流媒体mongodb oplog。根据与Flink支持团队的讨论,流的顺序不能通过kafka分区来保证。我已经创建了N个kafka分区,并希望每个分区创建N个flink kafka消费者,所以流的顺序应该至少在特定的分区中保持。请建议我是否可以创建分区特定的flink kafka消费者? 我正在使用env.setParallelism(N)进行

  • 我知道每个分区分配给一个Kafka消费者(在消费者组内),但一个Kafka消费者可以同时使用多个分区。如果每个用户都有一个到分区的开放连接,那么我可以想象每个用户都有成千上万个打开的连接。如果这是真的,那么在决定分区数量时,这似乎是需要注意的,不是吗?

  • 我们希望使用Kafka connect sink连接器将消息从Kafka复制到Mongo DB。在我们的用例中,我们有多个主题,每个主题都有一个分区(主题的名称可以用正则表达式表示,例如topic.XXX.name)。这些主题的数量在不断增加。我想知道Kafka connect架构是否适合这个用例。如果是这样,如何配置它的增益高可缩放性和并行性?任务是什么。最大值?工人数量?