我尝试执行Flink(1.12.1)批处理作业,步骤如下:
exception in thread "main" java.lang.IllegalStateException: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.shouldExecuteInBatchMode(StreamGraphGenerator.java:335)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:258)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1958)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1943)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
at com.grupotsk.bigdata.matadatapmexporter.MetadataPMExporter.main(MetadataPMExporter.java:33)
public static StreamExecutionEnvironment getBatch() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.addSource(new MongoSource()).print();
return env;
}
资料来源:
public class MongoSource extends RichSourceFunction<Document> {
private static final long serialVersionUID = 8321722349907219802L;
private MongoClient mongoClient;
private MongoCollection mc;
@Override
public void open(Configuration con) {
mongoClient = new MongoClient(
new MongoClientURI("mongodb://localhost:27017/database"));
mc=mongoClient.getDatabase("database").getCollection("collection");
}
@Override
public void run(SourceContext<Document> ctx) throws Exception {
MongoCursor<Document> itr=mc.find(Document.class).cursor();
while(itr.hasNext())
ctx.collect(itr.next());
this.cancel();
}
@Override
public void cancel() {
mongoClient.close();
}
谢了!
与RuntimeExecutionMode.Batch
一起使用的源代码必须实现Source
而不是SourceFunction
。接收器应该实现sink
而不是sinkfunction
。
有关这些新接口的介绍,请参见将Flink集成到您的生态系统中--如何从头构建Flink连接器。它们在FLIP-27:重构源接口和FLIP-143:统一接收器API中进行了描述。
我想在Apache Flink 1.12中使用Kafka源作为有界数据源,我尝试过使用Flink Kafka消费者连接器,但它给了我以下原因 原因:java.lang.IllegalStateException:检测到一个未绑定的源,execution.runtime模式设置为BATCH。不允许此组合,请在org.apache.flink.util.Preconditions.check状态下将e
众所周知,Flink有两个核心API(数据流/数据集),但当我使用Flink Sql客户端提交作业时,我不需要选择流或批处理模式。所以,Flink SQL客户机是如何决定使用批处理模式和流模式的。我在官方文件中没有找到答案。所以,我想知道Flink SQL客户端如何区分批处理模式和流模式?
11.6 执行系统命令 许多批处理作业可能需要一个外部命令调用内部的批处理作业.这样一个过程可以分开调度,但常见的元数据对运行的优势将会丢失.此外,multi-step 作业需要分割成多个作业. 因此通常的 spring batch提供一个tasklet实现调用系统命令: <bean class="org.springframework.batch.core.step.tasklet.System
问题内容: 我有2个文件夹,每个文件夹包含数十个批处理文件()。 批处理文件包含类似于以下内容的文本 要么 在Java中,我列出了每个文件夹中的每个批处理,并循环浏览列表,执行每个批处理文件,如下所示: 方法返回后,删除批处理文件。 问题是,批处理文件中的所有命令均未运行(我请求删除或删除的文件和文件夹均未运行),并且Java进程只是继续并删除了批处理文件本身。 批处理文件位于文件夹中 写下来之后
我有以下情况 有2个虚拟机正在向Kafka发送流,CEP引擎正在接收这些流,当单个流满足特定条件时,会生成警告。 目前,CEP正在检查两条流上的相同情况(当心率 但我想对这两个流使用不同的条件。例如,如果 如何实现这一点?我需要在同一环境中创建多个流环境或多个模式吗?
问题内容: 我想从Java程序执行批处理文件。 我正在使用以下命令。 但是问题是我想提供一个相对路径而不是绝对路径,以便我可以在任何组件上部署该Java项目。 项目的目录结构如下: 我想从“解析器”目录中的“ Main.java”文件中运行“ util”目录中的“ Server.bat”文件。 问题答案: 当Java运行时,您可以将Runtime.exec()与相对路径一起使用,相对方式是相对于当