当前位置: 首页 > 知识库问答 >
问题:

kafka-apache flink执行log4j错误

文国发
2023-03-14

我试图用Kafka不等式运行一个简单的Apache Flink脚本,但在执行过程中一直存在问题。脚本应该阅读来自Kafka制作人的消息,对其进行详细阐述,然后将处理结果再次发送回其他主题。我从这里得到了这个例子:http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Simple-Flink-Kafka-Test-td4828.html

我的错误是:

Exception in thread "main" java.lang.NoSuchFieldError:ALL 
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenera‌tor.createJobGraph(S‌​treamingJobGraphGene‌​rator.java:86) 
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph‌​(StreamGraph.java:42‌​9) 
at org.apache.flink.streaming.api.environment.LocalStreamEnviro‌nment.execute(LocalS‌​treamEnvironment.jav‌​a:46) 

在org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalS treamEnvironment.jav: 33)

这是我的代码:

public class App {
      public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
            Properties properties = new Properties(); 
            properties.setProperty("bootstrap.servers", "localhost:9092"); 

            //properties.setProperty("zookeeper.connect", "localhost:2181"); 
            properties.setProperty("group.id", "javaflink"); 

            DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<String>("test", new SimpleStringSchema(), properties));
            System.out.println("Step D"); 
            messageStream.map(new MapFunction<String, String>(){ 

                    public String map(String value) throws Exception { 
                            // TODO Auto-generated method stub 
                            return "Blablabla " +  value; 
                    } 
            }).addSink(new FlinkKafkaProducer010("localhost:9092", "demo2", new SimpleStringSchema())); 
            env.execute(); 
      }
}

这些是pom。xml依赖项:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java_2.11</artifactId>
    <version>0.10.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.3.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.3.1</version>
</dependency>

什么会导致这种错误?

谢谢卢卡

共有1个答案

林浩漫
2023-03-14

问题很可能是由您在pom中定义的不同Flink版本的混合造成的。xml。为了运行该程序,应该包含以下依赖项:

<!-- Streaming API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.3.1</version>
</dependency>

<!-- In order to execute the program from within your IDE -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.3.1</version>
</dependency>

<!-- Kafka connector dependency -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.3.1</version>
</dependency>
 类似资料:
  • 从示例中,我看到了下面的代码片段,它运行良好。但问题是:我并不总是需要处理输入流并将其生成到接收器。 如果我有一个应用程序,根据某些事件,我必须只发布到kafka主题,以便下游应用程序可以做出某些决定。这意味着,我实际上没有输入流,但我只知道当我的应用程序中发生某些事情时,我需要向kafka的特定主题发布消息。也就是说,我只需要一个接收器。 我查看了示例,但没有找到符合我要求的任何内容。有没有一种

  • 安装(下载 这是Flink的默认配置。 关于这里发生了什么事,有什么建议吗?

  • 我是spark streaming的新手,我有一个关于其用法的一般性问题。我目前正在实现一个应用程序,它从一个Kafka主题流式传输数据。 使用应用程序只运行一次批处理是一种常见的场景吗,例如,一天结束,收集主题中的所有数据,做一些聚合和转换等等? 这意味着在用spark-submit启动应用程序后,所有这些东西将在一批中执行,然后应用程序将被关闭。或者spark stream build是为了在

  • 我想在Apache Flink中实现以下场景: 给定一个具有4个分区的Kafka主题,我想使用不同的逻辑在Flink中独立处理分区内数据,具体取决于事件的类型。 特别是,假设输入Kafka主题包含前面图像中描述的事件。每个事件具有不同的结构:分区1具有字段“a”作为关键字,分区2具有字段“b”作为关键字,等等。在Flink中,我希望根据事件应用不同的业务逻辑,所以我认为我应该以某种方式分割流。为了

  • 问题内容: 我正在尝试运行elasticsearch,并使用以下命令尝试放置数据- 但我收到以下错误- 我试图通过增加队列大小- 但是我仍然遇到同样的错误。 问题答案: 您遇到的问题是因为 批量操作队列 已满。 节点ES具有许多线程池,包括通用线程,搜索线程,索引线程,建议线程,批量线程等。在您的情况下,问题在于批量操作队列已满。 尝试调整批量操作的线程池的队列大小: 或减少一次发送的批量操作数量

  • 当我使用Maven执行mvn将文件复制到远程服务器时。 然而,我得到下面的错误信息。我该怎么解决这个问题? 我正在遵循一个例子,如帖子所示https://jarirajari.wordpress.com/2014/06/11/copy-files-and-execute-command-on-a-remote-host-with-maven-antrun-plugin-without-ant-us