当前位置: 首页 > 知识库问答 >
问题:

Flink SQL 不支持 “table.exec.source.idle-timeout” 设置

商骞仕
2023-03-14

我有一个运行FlinkSQL的Flink作业,具有以下设置:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

env.setMaxParallelism(env.getParallelism() * 8);
env.getConfig().setAutoWatermarkInterval(config.autowatermarkInterval());

final TableConfig tConfig = tEnv.getConfig();
tConfig.setIdleStateRetention(Duration.ofMinutes(60));

tConfig.getConfiguration().setString("table.exec.source.idle-timeout", "180000 ms");

为了用Kafka源代码在本地测试这一点,我向Flink作业触发了几个事件。Flink UI显示它生成了一个水印。我等待了3分钟,看看水印是否在没有发送新事件(即空闲分区)的情况下前进。但是,没有出现水印提升。

注意:我在本地使用具有三个分区的Kafka代理。我的测试数据被键入,因此被发送到同一个分区。但是,即使其他分区空闲并且等待3分钟,我也不会看到水印前进。

>

  • JOB UI中的任何位置我都可以看到我设置的3分钟值是否真的被拾取了?我是否使用了正确的单位(秒vs ms)

    我还能检查什么来测试这个设置吗?

    我们正在运行Flink 1.12.1。

    更新:我在我的FlinkSQL作业中的异常下看到这个异常:想知道是否有版本不匹配。

    2021-10-26 16:38:14
    java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest$PartitionData
        at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$null$0(OffsetsForLeaderEpochClient.java:52)
        at java.base/java.util.Optional.ifPresent(Unknown Source)
        at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$prepareRequest$1(OffsetsForLeaderEpochClient.java:51)
        at java.base/java.util.HashMap.forEach(Unknown Source)
        at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:51)
        at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:37)
        at org.apache.kafka.clients.consumer.internals.AsyncClient.sendAsyncRequest(AsyncClient.java:37)
        at org.apache.kafka.clients.consumer.internals.Fetcher.lambda$validateOffsetsAsync$5(Fetcher.java:798)
        at java.base/java.util.HashMap.forEach(Unknown Source)
        at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsAsync(Fetcher.java:774)
        at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsIfNeeded(Fetcher.java:498)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2328)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)
    
  • 共有1个答案

    连厉刚
    2023-03-14

    问题是此设置在Flink 1.12.0或1.12.1中不起作用。我不得不升级到Flink 1.13.2,设置被接受并按预期工作。

    这一例外只是一种转移视线的行为,并不总是可以重复。

     类似资料:
    • 我尝试在Windows10机器上使用MediaFoundation H264硬件编码器将通过DesktopDupplication API捕获的NV12样本编码为视频流,并在局域网上实时流和渲染。 最初,我在编码器面临太多的缓冲,因为编码器在提供输出样本之前要缓冲最多25帧(GOP大小)。经过一些研究,我发现设置CODECAPI_AVLowLatencyMode可以减少延迟,但代价是质量和带宽。

    • DataSource 接口 不支持 timeout 相关操作 Connection 接口 不支持存储过程,函数,游标的操作 不支持执行 native SQL 不支持 savepoint 相关操作 不支持 Schema/Catalog 的操作 不支持自定义类型映射 Statement 和 PreparedStatement 接口 不支持返回多结果集的语句(即存储过程,非 SELECT 多条数据) 不

    • 我正在使用com.android.tools.build:Gradle:3.1.1和最新的Gradle版本(https://services.Gradle.org/distributions-snapshots/gradle-4.8-20180417000132+0000-all.zip)。 当我使用compileOnly依赖项时,其中有些不能编译,有些会编译。例如。 我的印象比任何依赖项都只能编

    • 我是Android编程新手。当我尝试将材料设计库添加到build.gradle它在实现'com.android.support: appcompat-v7:28.0.0'下显示错误。 错误消息是: 使用groupId和androidx*无法组合,但可以找到和不兼容的依赖项较少。。。(Ctrl-F1) 检查信息:有些库、工具和库的组合不兼容,或可能导致错误。其中一个不兼容之处是编译时使用的Andro

    • 为什么我的应用程序显示为新的HTC Droid DNA不支持的设备,我的清单是这个应用程序只适用于具有这些功能的设备,正如您的应用程序清单中所定义的。屏幕密度:正常,LDPI正常,XHDPI正常,HDPI正常,MDPI小,LDPI小,XHDPI小,HDPI小,MDPI X大,LDPI X大,LDPI X大,HDPI X大,MDPI大,LDPI大,XHDPI大,HDPI大,LDPI大,XHDPI大,

    • 我需要在JAX-WS中通过客户端连接到外部服务器。客户端在Wildfly 8上运行。使用Java8连接是正常的。但我在Java7中连接到服务器时遇到了问题(我尝试了u45、67、79)。服务器端安全性的属性为https://www.ssllabs.com/ssltest/analyze.html?d=app.bundesnetzagentur.de 在部分“密码套件”中有四个密码。源代码不应支持J