当前位置: 首页 > 知识库问答 >
问题:

Flink可查询状态错误

赵星华
2023-03-14

我试图在Flink(版本1.4.2)上使用可查询状态,但不幸的是,我一直收到以下错误:

INFO  my.test.flink.QueryableState  - Params are a96438fa12879b7598c9cf32684e2669, kafka-cluster_jobmanager_1, 6123
INFO  my.test.flink.QueryableState  - Before the call java.util.concurrent.CompletableFuture@26aa12dd[Not completed]
java.util.concurrent.ExecutionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at my.test.flink.QueryableState.main(QueryableState.java:67)
Caused by: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)
        at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1166)
        at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.readInt(AbstractByteBuf.java:619)
        at org.apache.flink.queryablestate.network.messages.MessageSerializer.deserializeHeader(MessageSerializer.java:231)
        at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:76)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:745)

在客户端,我使用flink-queryable-state-client-java_2_11.jar可查询客户端的相关代码部分是

QueryableStateClient client = new QueryableStateClient(jobManagerHost, jobManagerPort);

TypeInformation<MyEvent> typeInformation = TypeInformation.of(new TypeHint<MyEvent>() {});
ListStateDescriptor<MyEvent> descriptor = new ListStateDescriptor<MyEvent>("myEvents",
                 typeInformation.createSerializer(new ExecutionConfig()));

CompletableFuture<ListState<MyEvent>> resultFuture =
                        client.getKvState(JobID.fromHexString(jobIdParam),"myEvents", "1", 
                        BasicTypeInfo.STRING_TYPE_INFO , descriptor );

logger.info("Before the call " + resultFuture);
try {
         logger.info("Finished"+ resultFuture.get());
 } catch(Exception ex) {
         ex.printStackTrace();
 }

最后,在Flink上运行的作业配置了一个ListState,如下所示。请注意,数据在ListState上由String键控

        TypeInformation<MyEvent> typeInformation = TypeInformation.of(new TypeHint<MyEvent>() {});
        ListStateDescriptor<MyEvent> eventState = 
                new ListStateDescriptor<MyEvent>("myEvents",typeInformation);
        eventState.setQueryable("myEvents");
        eventListState = getRuntimeContext().getListState(eventState);

在我看来,这似乎是一个序列化错误,但我不知道我需要做什么来修复它。有人知道上面的代码可能有什么问题吗?我错过什么了吗?

共有1个答案

郑功
2023-03-14

我在更新Flink 1.4的可查询状态演示时遇到了完全相同的问题。如果我没记错的话,重要的部分是正确处理可完成的未来——你不能直接调用get()。

请参阅工作示例的代码,其关键部分如下所示:

try {

    CompletableFuture<FoldingState<BumpEvent, Long>> resultFuture =
      client.getKvState(jobId, EventCountJob.ITEM_COUNTS, key, 
      BasicTypeInfo.STRING_TYPE_INFO, countingState);

    resultFuture.thenAccept(response -> {
      try {
        Long count = response.get();
        // now we could do something with the value
      } catch (Exception e) {
        e.printStackTrace();
      }
    });

    resultFuture.get(5, TimeUnit.SECONDS);

} catch (Exception e) {
  e.printStackTrace();
}
 类似资料:
  • 2.11 状态码查询 2.11.1 描述 通过调用该接口查询频道在指定时间段内的状态码统计。 2.11.2 请求地址 地址: https://api.bokecs.com/channel/queryHttpCode 2.11.3 请求方式 POST 2.11.4 请求参数 1) 请求入参 { "domainName":"", "beginTime":"", "endT

  • 2.19 刷新状态查询 2.19.1 描述 返回刷新进度 2.19.2 请求地址 地址: https://api.bokecs.com/queryrefresh/{package_id} 2.19.3 请求方式 GET 2.19.4 请求参数 参数名称 是否必须 参数描述 package_id 是 刷新接口中code为200时返回的message 2.19.5 请求格式 json 2.19.6 举

  • 我有一个这样的SQL问题 根据用户输入,我想将botcode='r1'更改为给定输入。在不重新启动作业的情况下说出botcode='r10'。有没有办法做到这一点。我在flink 1.7上使用stream env。我尝试配置流来读取输入。但仍停留在如何动态更改查询上。有人能帮我吗?提前谢谢

  • 本文介绍 TiDB Data Migration (DM) query-status 命令的查询结果、任务状态与子任务状态。 查询结果 » query-status { "result": true, # 查询是否成功 "msg": "", # 查询失败原因描述 "tasks": [ # 迁移 task 列表 {

  • 描述 用于平台方查询申请状态,可以根据返回结果判断是否申请是否满足资金提供方的要求,如果已满足要求(响应参数中can_push=Y时),则可以调用 融资申请推送API 尝试推送申请到资金提供方审核 。 API代码 loan_app:app:sts 请求参数 名称 类型 是否必须 最大长度 描述 示例值 app_id String 是 50 申请ID(融资申请创建API返回的结果) 00927284

  • 用于协议2.6付款码支付、协议2.4二维码支付、2.9多码合一二维码查询支付状态 请求参数说明 参数 描述 必填 示例值 类型 最大长度 action 接口参数组 是 object └action 需要调用的接口名称 是 cash_pay_query string get GET参数组,本组参数需要参与签名 是 object └payid 支付流水号,参数payid/bzid/token三选一 否