当前位置: 首页 > 知识库问答 >
问题:

如何为Flink的批处理执行模式实现有界源?

龚宏壮
2023-03-14

我尝试执行Flink(1.12.1)批处理作业,步骤如下:

    null
    exception in thread "main" java.lang.IllegalStateException: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
        at org.apache.flink.streaming.api.graph.StreamGraphGenerator.shouldExecuteInBatchMode(StreamGraphGenerator.java:335)
        at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:258)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1958)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1943)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
        at com.grupotsk.bigdata.matadatapmexporter.MetadataPMExporter.main(MetadataPMExporter.java:33)
public static StreamExecutionEnvironment getBatch() {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    
    env.addSource(new MongoSource()).print();
    
    return env;
    
}

资料来源:

public class MongoSource extends RichSourceFunction<Document> {

    private static final long serialVersionUID = 8321722349907219802L;
    private MongoClient mongoClient;
    private MongoCollection mc;
    
    
    @Override
    public void open(Configuration con) {
        mongoClient = new MongoClient(
                new MongoClientURI("mongodb://localhost:27017/database"));
        
        mc=mongoClient.getDatabase("database").getCollection("collection");
        
    }
    
    @Override
    public void run(SourceContext<Document> ctx) throws Exception {
        
        MongoCursor<Document> itr=mc.find(Document.class).cursor();
        while(itr.hasNext())
            ctx.collect(itr.next());
        this.cancel();
        
    }

    @Override
    public void cancel() {
        mongoClient.close();
        
    }

谢了!

共有1个答案

宓诚
2023-03-14

RuntimeExecutionMode.Batch一起使用的源代码必须实现Source而不是SourceFunction。接收器应该实现sink而不是sinkfunction

有关这些新接口的介绍,请参见将Flink集成到您的生态系统中--如何从头构建Flink连接器。它们在FLIP-27:重构源接口和FLIP-143:统一接收器API中进行了描述。

 类似资料:
  • 我想在Apache Flink 1.12中使用Kafka源作为有界数据源,我尝试过使用Flink Kafka消费者连接器,但它给了我以下原因 原因:java.lang.IllegalStateException:检测到一个未绑定的源,execution.runtime模式设置为BATCH。不允许此组合,请在org.apache.flink.util.Preconditions.check状态下将e

  • 众所周知,Flink有两个核心API(数据流/数据集),但当我使用Flink Sql客户端提交作业时,我不需要选择流或批处理模式。所以,Flink SQL客户机是如何决定使用批处理模式和流模式的。我在官方文件中没有找到答案。所以,我想知道Flink SQL客户端如何区分批处理模式和流模式?

  • 11.6 执行系统命令 许多批处理作业可能需要一个外部命令调用内部的批处理作业.这样一个过程可以分开调度,但常见的元数据对运行的优势将会丢失.此外,multi-step 作业需要分割成多个作业. 因此通常的 spring batch提供一个tasklet实现调用系统命令: <bean class="org.springframework.batch.core.step.tasklet.System

  • 问题内容: 我有2个文件夹,每个文件夹包含数十个批处理文件()。 批处理文件包含类似于以下内容的文本 要么 在Java中,我列出了每个文件夹中的每个批处理,并循环浏览列表,执行每个批处理文件,如下所示: 方法返回后,删除批处理文件。 问题是,批处理文件中的所有命令均未运行(我请求删除或删除的文件和文件夹均未运行),并且Java进程只是继续并删除了批处理文件本身。 批处理文件位于文件夹中 写下来之后

  • 问题内容: 我想从Java程序执行批处理文件。 我正在使用以下命令。 但是问题是我想提供一个相对路径而不是绝对路径,以便我可以在任何组件上部署该Java项目。 项目的目录结构如下: 我想从“解析器”目录中的“ Main.java”文件中运行“ util”目录中的“ Server.bat”文件。 问题答案: 当Java运行时,您可以将Runtime.exec()与相对路径一起使用,相对方式是相对于当

  • 我有以下情况 有2个虚拟机正在向Kafka发送流,CEP引擎正在接收这些流,当单个流满足特定条件时,会生成警告。 目前,CEP正在检查两条流上的相同情况(当心率 但我想对这两个流使用不同的条件。例如,如果 如何实现这一点?我需要在同一环境中创建多个流环境或多个模式吗?