当前位置: 首页 > 知识库问答 >
问题:

火花流中的状态函数问题

那利
2023-03-14

我尝试使用Spark Streaming并希望有一个全局状态对象,可以在每个批处理后更新。据我所知,至少有两种选择适合我:1。使用mapwithstate,其中Spark将在处理每个批处理后自动更新状态2。使用updateStateByKey函数,在这里我必须自己调用更新

    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
    new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> call(String word, Optional<Integer> one,
          State<Integer> state) {
        int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
        Tuple2<String, Integer> output = new Tuple2<>(word, sum);
        state.update(sum);
        return output;
      }
    };


    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
    wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));;
               Tuple2<String, Long> output = new Tuple2<>(word, sum);
               state.update(sum);
               return new String("Test");
        }
    });  
JavaPairDStream<String, Long> runningCounts = processed.updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>() {
            public Optional<Long> call(List<Long> values, Optional<Long> state) {
                Long newSum = state.orElse((long)0);
                for(long x : values){
                    newSum +=x;
                }
                return Optional.of(newSum);
              });

类型javapairdStream 中的方法updateStateByKey(Function2 ,optional ,optional >)不适用于参数(new function2 <list,optional ,optional ,optional >(){})

我的导入的快照如下:

  
   
    import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.Function4;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.api.java.function.VoidFunction2;

   
  

我希望有人能帮我找出我的错误。


共有1个答案

胥和悌
2023-03-14

还有一点需要补充,如果您使用的是最新的Spark2.3.0版本,那么使用下面的包导入可选,以解决相同的问题。

Java代码:

import org.apache.spark.api.java.Optional;
 类似资料:
  • 选择无状态滑动窗口操作的一些注意事项是什么(例如,通过updateStateByKey或新mapStateByKey)选择保持状态(例如通过updateStateByKey或新mapStateByKey)时,使用火花流处理连续的有限事件会话流? 例如,考虑以下场景: 一种可穿戴设备跟踪由穿戴者进行的体育锻炼。该装置自动检测何时开始锻炼,并发出信息;在锻炼过程中发出附加信息(如心率);最后,当练习完

  • 它没有任何错误,我得到以下错误时,我运行火花提交,任何帮助都非常感谢。谢谢你抽出时间。 线程“main”java.lang.noClassDeffounderror:org/apache/spark/streaming/kafka/kafkautils在kafkasparkstreaming.sparkstreamingtest(kafkasparkstreaming.java:40)在kafka

  • 我尝试过用这个方法来计算累积值,但是如果日期字段与累积字段中的值相同,那么有人能提出类似于这个问题的解决方案吗

  • 我试图从聚合原理的角度来理解火花流。Spark DF 基于迷你批次,计算在特定时间窗口内出现的迷你批次上完成。 假设我们有数据作为- 然后首先对Window_period_1进行计算,然后对Window_period_2进行计算。如果我需要将新的传入数据与历史数据一起使用,比如说Window_priod_new与Window_pperid_1和Window_perid_2的数据之间的分组函数,我该

  • 我正在尝试使用python库Tweepy来传输twitter数据。我设置了工作环境,谷歌了一下这些东西,但是我不知道它们是如何工作的。我想在python (tweepy)中使用spark streaming(DStream-Batch processing)。我至少经历了以下环节: < li >如何获取tweepy中某个位置的特定标签的推文? < Li > http://spark . Apach

  • 我在使用 Spark 流式处理示例时遇到问题:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala 当我尝试使用 SBT 启动它时 我有这个例外 我确定该目录存在于Hadoop fs上,我甚至在那里复制了一