我想从flink读一个Kafka的题目
package Toletum.pruebas; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; public class LeeKafka { public static void main(String[] args) throws Exception { final ParameterTool parameterTool = ParameterTool.fromArgs(args); // create execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); FlinkKafkaConsumer082 kafkaSrc = new FlinkKafkaConsumer082("test02", new SimpleStringSchema(), parameterTool.getProperties()); DataStream messageStream = env.addSource(kafkaSrc); messageStream.rebalance().map(new MapFunction() { private static final long serialVersionUID = -6867736771747690202L; public String map(String value) throws Exception { return "Kafka and Flink says: " + value; } }).print(); env.execute("LeeKafka"); } }
此代码成功运行:
java -cp Package.jar Toletum.pruebas.LeeKafka --topic test02 --bootstrap.servers kafka:9092 --zookeeper.connect zookeeper:2181 --group.id myGroup
但是,当我尝试使用from flink:
flink run -c Toletum.pruebas.LeeKafka pruebas-0.0.1-SNAPSHOT-jar-with-dependencies.jar --topic test02 --bootstrap.servers kafka:9092 --zookeeper.connect zookeeper:2181 --group.id myGroup
我得到一个错误:
java.lang.NoSuchMethodError: org.apache.flink.util.NetUtils.getHostnamePort(Ljava/lang/String;)Ljava/net/URL; at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:592) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:280) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082.(FlinkKafkaConsumer082.java:49) at Toletum.pruebas.LeeKafka.main(LeeKafka.java:22) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395) at org.apache.flink.client.program.Client.runBlocking(Client.java:252) at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
这个问题是由于使用了旧版本的FLink连接器库。
您可以查看最新的可用库并下载最新的Maven依赖项。
你正在使用的卡夫卡版本也应该被考虑。
尝试使用Flink文档中的最新Maven依赖项用于Kafka Connector
最新的maven依赖项是
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.10</artifactId>
<version>1.3.2</version>
</dependency>
旧版本库.....
正确的pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>0.10.1</version>
</dependency>
Apache Flink Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities. Learn more about Flink at https://flink.apache.org/ Features A streamin
主要内容:1.CDC概述,2.Flink CDC 编码,3.利用自定义格式编码,4.Flink Sql 编码,5.Flink CDC 2.0 的新特性1.CDC概述 1.1 CDC CDC 是 Change Data Capture()的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 1.2 CDC 分类 分为查询CDC 和 Binlog CDC 常见的CDC 方案比较 1.3
Apache Flink 是高效和分布式的通用数据处理平台,是一个流批一体分析引擎。 Apache Flink 声明式的数据分析开源系统,结合了分布式 MapReduce 类平台的高效,灵活的编程和扩展性。同时在并行数据库发现查询优化方案。 要求 Unix 类环境(Linux, Mac OS X, Cygwin) git Maven (at least version 3.0.4) Java 6,
Flink 学习 麻烦路过的各位亲给这个项目点个 star,太不易了,写了这么多,算是对我坚持下来的一种鼓励吧! Stargazers over time 本项目结构 How to build Maybe your Maven conf file settings.xml mirrors can add aliyun central mirror : <mirror> <id>alimaven<
一、Data Sinks 在使用 Flink 进行数据处理时,数据经 Data Source 流入,然后通过系列 Transformations 的转化,最终可以通过 Sink 将计算结果进行输出,Flink Data Sinks 就是用于定义数据流最终的输出位置。Flink 提供了几个较为简单的 Sink API 用于日常的开发,具体如下: 1.1 writeAsText writeAsText
一、Transformations 分类 Flink 的 Transformations 操作主要用于将一个和多个 DataStream 按需转换成新的 DataStream。它主要分为以下三类: DataStream Transformations:进行数据流相关转换操作; Physical partitioning:物理分区。Flink 提供的底层 API ,允许用户定义数据的分区规则; Ta