当前位置: 首页 > 知识库问答 >
问题:

初始化状态导致此错误“java.lang.NullPointerException:Keyed state只能在'Keyed stream'上使用”

李嘉胜
2023-03-14

我试图初始化CoProcessFunction中的“ListState”,但它不断抛出这个错误“java.lang.NullPointerException:Keyed state只能用于'Keyed stream'”“。

这是我用的密码

    dataStream1.keyBy(ele->ele.f1).connect(dataStream2).process(
        new CoProcessFunction<Tuple2<Long, String>, String, Object>() {
            ListState stream1ListState;
            
            @Override
            public void open(Configuration parameters) throws Exception {
                LOG.info("inside CoProcess Function Constructor");
                ListStateDescriptor<Tuple2<Long,String>> listStateDescriptor = new
                    ListStateDescriptor<Tuple2<Long,String>>("type1",TypeInformation.of(new TypeHint<Tuple2<Long,String>>(){}));
                stream1ListState = getRuntimeContext().getListState(listStateDescriptor);
            }
            
            @Override
            public void processElement1(Tuple2<Long, String> longStringTuple2, Context context,
                Collector<Object> collector) throws Exception {
                LOG.info("inside stream1 processor");
                LOG.info(longStringTuple2.toString());
                collector.collect(longStringTuple2);
            }
            
            @Override
            public void processElement2(String s, Context context, Collector<Object> collector)
                throws Exception {
                LOG.info("inside stream2 processor");
                LOG.info(s);
            }
        }).print();

引发此错误的行是

stream1ListState = getRuntimeContext().getListState(listStateDescriptor);

错误日志跟踪的其余部分如下所示

Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:232)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:202)

共有1个答案

司马飞鸿
2023-03-14

该错误意味着您只能在KeyedCoProcessFunction内部使用ListStateValueState等,而这些内容只能在两个流都被键控时使用。因此,如果第二个流是正常流,并且可以通过某个键进行键控,那么您可以这样做,否则您可能需要参考这里所述的广播状态模式(Broadcast state pattern)。

 类似资料:
  • 问题内容: 考虑: 如何做到这一点,以便可以从其他方法调用数组? 问题答案: 这意味着您必须像这样初始化它: 如果要使用数组初始化器,则不能拆分声明和赋值。

  • 我正在尝试为大学的一个项目和几个学生一起建立一个Springmvc应用程序,这是一本基于网络的烹饪书。 虽然Hello World工作正常,但我们使用hibernate存储数据的代码出现了一些困难的异常。 域类: 服务类: 控制器类: 我们使用基于eclipse的Spring工具包sdk并在启动过程中获得以下异常: VMware vFabric tc Runtime 2.9.3.Release/7

  • 本文向大家介绍react-native 初始化状态,包括了react-native 初始化状态的使用技巧和注意事项,需要的朋友参考一下 示例 您应该像这样在组件的构造函数内部初始化状态: 使用setState可以更新视图。

  • 问题内容: 我正在使用jQuery AJAX对Web服务进行查询。我的查询如下所示: 执行此操作时,我收到状态错误403。我不明白为什么我的通话导致状态代码为403。我控制着服务的安全性,并将其标记为完全开放。我知道密钥是有效的,因为我在另一个可以使用的电话中使用了它。这是有效的呼叫: 我知道这是两个不同的端点。但是我100%确信这不是服务器端身份验证或权限错误。再一次,一切在服务器端都是开放的。

  • 问题内容: 我是使用log4j软件包的新手,但看不到错误:这是一个非常简单明了的代码示例: 当我尝试编译时,出现此错误: my.package.logging.TestLogger.main(TestLogger.java:15)上的org.apache.logging.log4j.LogManager.getLogger(LogManager.java:129)处的线程“ main”中的java

  • 我有一个EJB无状态会话Bean。我有以下要求: 这个无状态EJB应该在启动时初始化 初始化代码应该对数据库进行事务性访问 问题是: @Startup仅适用于@Singleton EJB @PostConstruct注释(至少在WebSphere上)在这一点上没有事务性上下文,所以初始化代码在这里爆炸! 可能的解决方案? 使用JavaEE定时器,但它似乎是为周期性执行而设计的。我只想在零点执行一次