当前位置: 首页 > 知识库问答 >
问题:

Flink+Kafka:getHostnamePort

戚翼
2023-03-14

我想从flink读一个Kafka的题目


    package Toletum.pruebas;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class LeeKafka {
        public static void main(String[] args) throws Exception {
            final ParameterTool parameterTool = ParameterTool.fromArgs(args);

            // create execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            FlinkKafkaConsumer082 kafkaSrc = new FlinkKafkaConsumer082("test02", 
          new SimpleStringSchema(), 
          parameterTool.getProperties());

            DataStream messageStream = env.addSource(kafkaSrc);

            messageStream.rebalance().map(new MapFunction() {
                private static final long serialVersionUID = -6867736771747690202L;

                public String map(String value) throws Exception {
                    return "Kafka and Flink says: " + value;
                }
            }).print();

            env.execute("LeeKafka");
        }

    }

此代码成功运行:


    java -cp Package.jar Toletum.pruebas.LeeKafka --topic test02 --bootstrap.servers kafka:9092 --zookeeper.connect zookeeper:2181 --group.id myGroup

但是,当我尝试使用from flink:


    flink run -c Toletum.pruebas.LeeKafka pruebas-0.0.1-SNAPSHOT-jar-with-dependencies.jar --topic test02 --bootstrap.servers kafka:9092 --zookeeper.connect zookeeper:2181 --group.id myGroup

我得到一个错误:

java.lang.NoSuchMethodError: org.apache.flink.util.NetUtils.getHostnamePort(Ljava/lang/String;)Ljava/net/URL;
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:592)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:280)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082.(FlinkKafkaConsumer082.java:49)
        at Toletum.pruebas.LeeKafka.main(LeeKafka.java:22)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)

共有2个答案

顾淳
2023-03-14

这个问题是由于使用了旧版本的FLink连接器库。

您可以查看最新的可用库并下载最新的Maven依赖项。

你正在使用的卡夫卡版本也应该被考虑。

尝试使用Flink文档中的最新Maven依赖项用于Kafka Connector

最新的maven依赖项是

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
  <version>1.3.2</version>
</dependency>
洪育
2023-03-14

旧版本库.....

正确的pom.xml



            &ltdependency>
                    &ltgroupId&gtorg.apache.flink</groupId>
                    &ltartifactId&gtflink-connector-kafka</artifactId>
                    &ltversion&gt0.10.1</version>
            </dependency>

 类似资料:
  • Apache Flink Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities. Learn more about Flink at https://flink.apache.org/ Features A streamin

  • 主要内容:1.CDC概述,2.Flink CDC 编码,3.利用自定义格式编码,4.Flink Sql 编码,5.Flink CDC 2.0 的新特性1.CDC概述 1.1 CDC CDC 是 Change Data Capture()的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 1.2 CDC 分类 分为查询CDC 和 Binlog CDC 常见的CDC 方案比较 1.3

  • Apache Flink 是高效和分布式的通用数据处理平台,是一个流批一体分析引擎。 Apache Flink 声明式的数据分析开源系统,结合了分布式 MapReduce 类平台的高效,灵活的编程和扩展性。同时在并行数据库发现查询优化方案。 要求 Unix 类环境(Linux, Mac OS X, Cygwin) git Maven (at least version 3.0.4) Java 6,

  • Flink 学习 麻烦路过的各位亲给这个项目点个 star,太不易了,写了这么多,算是对我坚持下来的一种鼓励吧! Stargazers over time 本项目结构 How to build Maybe your Maven conf file settings.xml mirrors can add aliyun central mirror : <mirror> <id>alimaven<

  • 一、Data Sinks 在使用 Flink 进行数据处理时,数据经 Data Source 流入,然后通过系列 Transformations 的转化,最终可以通过 Sink 将计算结果进行输出,Flink Data Sinks 就是用于定义数据流最终的输出位置。Flink 提供了几个较为简单的 Sink API 用于日常的开发,具体如下: 1.1 writeAsText writeAsText

  • 一、Transformations 分类 Flink 的 Transformations 操作主要用于将一个和多个 DataStream 按需转换成新的 DataStream。它主要分为以下三类: DataStream Transformations:进行数据流相关转换操作; Physical partitioning:物理分区。Flink 提供的底层 API ,允许用户定义数据的分区规则; Ta