我试图用Kafka不等式运行一个简单的Apache Flink脚本,但在执行过程中一直存在问题。脚本应该阅读来自Kafka制作人的消息,对其进行详细阐述,然后将处理结果再次发送回其他主题。我从这里得到了这个例子:http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Simple-Flink-Kafka-Test-td4828.html
我的错误是:
Exception in thread "main" java.lang.NoSuchFieldError:ALL
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:86)
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:429)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:46)
在org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalS treamEnvironment.jav: 33)
这是我的代码:
public class App {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
//properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "javaflink");
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<String>("test", new SimpleStringSchema(), properties));
System.out.println("Step D");
messageStream.map(new MapFunction<String, String>(){
public String map(String value) throws Exception {
// TODO Auto-generated method stub
return "Blablabla " + value;
}
}).addSink(new FlinkKafkaProducer010("localhost:9092", "demo2", new SimpleStringSchema()));
env.execute();
}
}
这些是pom。xml
依赖项:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java_2.11</artifactId>
<version>0.10.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
<version>0.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.3.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.3.1</version>
</dependency>
什么会导致这种错误?
谢谢卢卡
问题很可能是由您在pom中定义的不同Flink版本的混合造成的。xml
。为了运行该程序,应该包含以下依赖项:
<!-- Streaming API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.3.1</version>
</dependency>
<!-- In order to execute the program from within your IDE -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.3.1</version>
</dependency>
<!-- Kafka connector dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.3.1</version>
</dependency>
从示例中,我看到了下面的代码片段,它运行良好。但问题是:我并不总是需要处理输入流并将其生成到接收器。 如果我有一个应用程序,根据某些事件,我必须只发布到kafka主题,以便下游应用程序可以做出某些决定。这意味着,我实际上没有输入流,但我只知道当我的应用程序中发生某些事情时,我需要向kafka的特定主题发布消息。也就是说,我只需要一个接收器。 我查看了示例,但没有找到符合我要求的任何内容。有没有一种
安装(下载 这是Flink的默认配置。 关于这里发生了什么事,有什么建议吗?
我是spark streaming的新手,我有一个关于其用法的一般性问题。我目前正在实现一个应用程序,它从一个Kafka主题流式传输数据。 使用应用程序只运行一次批处理是一种常见的场景吗,例如,一天结束,收集主题中的所有数据,做一些聚合和转换等等? 这意味着在用spark-submit启动应用程序后,所有这些东西将在一批中执行,然后应用程序将被关闭。或者spark stream build是为了在
我想在Apache Flink中实现以下场景: 给定一个具有4个分区的Kafka主题,我想使用不同的逻辑在Flink中独立处理分区内数据,具体取决于事件的类型。 特别是,假设输入Kafka主题包含前面图像中描述的事件。每个事件具有不同的结构:分区1具有字段“a”作为关键字,分区2具有字段“b”作为关键字,等等。在Flink中,我希望根据事件应用不同的业务逻辑,所以我认为我应该以某种方式分割流。为了
我想在Apache Flink中做流媒体工作来做Kafka- 这应该是流式处理。
问题内容: 我正在尝试运行elasticsearch,并使用以下命令尝试放置数据- 但我收到以下错误- 我试图通过增加队列大小- 但是我仍然遇到同样的错误。 问题答案: 您遇到的问题是因为 批量操作队列 已满。 节点ES具有许多线程池,包括通用线程,搜索线程,索引线程,建议线程,批量线程等。在您的情况下,问题在于批量操作队列已满。 尝试调整批量操作的线程池的队列大小: 或减少一次发送的批量操作数量