当前位置: 首页 > 知识库问答 >
问题:

Flink1.5中批处理表API的问题--抱怨需要流式API

常哲彦
2023-03-14

我正在尝试使用Flink1.5.0创建一个面向批处理的Flink作业,并希望使用表和SQL API来处理数据。我的问题是尝试创建BatchTableEnviroment时遇到编译错误

final BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.table.sources.TableSource;

import java.util.Date;


public class BatchJob {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        // create a TableEnvironment for batch queries
        final BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
    ... do stuff
    // execute program
        bEnv.execute("MY Batch Jon");
    }

我的pom依赖关系如下所示

<dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>




        <!-- Add connector dependencies here. They must be in the default scope (compile). -->


        <!-- Example:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

    </dependencies>

请有人能帮助我理解流式API的依赖关系是什么,以及为什么我需要它来进行批处理作业?非常感谢您的帮助。奥利弗

共有1个答案

蒋鸿文
2023-03-14

Flink的表API和SQL支持是用于批处理和流处理的统一API。许多内部类在批处理和流执行以及Scala/Java表API和SQL之间共享,因此链接到Flink的批处理和流依赖项。

由于这些通用类,批处理查询还需要flink-streaming依赖项。

 类似资料:
  • 我正在使用Ant构建testng jar。在构建过程中,它抱怨说它需要Java1.7,但我已经安装了Java1.8并且设置了JAVA_HOME。当我执行java版本时,它说我已经安装了1.8。 这是构建xml中的目标,它表示提示消息

  • 我在spark streaming应用程序中看到一些失败的批处理,原因是与内存相关的问题,如 无法计算拆分,找不到块输入-0-1464774108087

  • 我在自己的oracle数据库中使用以下链接创建了spring批处理元数据表 https://github.com/spring-projects/spring-batch/tree/main/spring-batch-core/src/main/resources/org/springframework/batch/core 但是当我运行我的Spring批处理时,它会抛出错误: 目前我使用的是sp

  • 问题内容: 对于Java-JDBC API和Oracle数据库,我有一个稍微独特的要求。我将autoCommit设置为默认值,这对于Oracle是正确的,并且我使用的示例与此链接相似。 但是,当我添加说1000批次时,可以说每个批次都是插入的。并且让我们假设大约20条记录违反了某些约束,我希望其余980条变为COMMITTED(并且以后对使用任何其他连接的任何其他查询都可见)到数据库,并忽略20条

  • 我想使用不同的模式来保存Spring批处理表。我可以在<code>JobRepositoryFactoryBean<code>中看到我的新数据源。但这些表仍然是在另一个shcema中创建的,在那里我有业务表。我在这里读到了可以使用<code>数据源的soemwhere。setValidationQuery来更改模式,但仍然不起作用。我能解决这个问题。下面是<code>JobRepositoryFa

  • 我一直在尝试以多种不同的方式在windows上构建fltk,但最终我总是得到: 配置:错误:C编译器无法创建可执行文件(msys fltk/配置) 或 检查C编译器是否正常工作:C:/MinGW/bin/gcc.exe--已损坏(CMake-gui) 我用fltk 1.3.0和1.3.3试用过它,我甚至用过三个不同的MinGW发行版,GCC版本:。但它无法编译。以下是由生成的一些错误日志/配置文件