我尝试使用Spark Streaming并希望有一个全局状态对象,可以在每个批处理后更新。据我所知,至少有两种选择适合我:1。使用mapwithstate
,其中Spark将在处理每个批处理后自动更新状态2。使用updateStateByKey
函数,在这里我必须自己调用更新
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> call(String word, Optional<Integer> one,
State<Integer> state) {
int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<>(word, sum);
state.update(sum);
return output;
}
};
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));;
Tuple2<String, Long> output = new Tuple2<>(word, sum);
state.update(sum);
return new String("Test");
}
});
JavaPairDStream<String, Long> runningCounts = processed.updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>() {
public Optional<Long> call(List<Long> values, Optional<Long> state) {
Long newSum = state.orElse((long)0);
for(long x : values){
newSum +=x;
}
return Optional.of(newSum);
});
类型javapairdStream
,optional
,optional
>)不适用于参数(new function2
<list,optional
我的导入的快照如下:
import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.Function3; import org.apache.spark.api.java.function.Function4; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.api.java.function.VoidFunction2;
我希望有人能帮我找出我的错误。
还有一点需要补充,如果您使用的是最新的Spark2.3.0版本,那么使用下面的包导入可选,以解决相同的问题。
Java代码:
import org.apache.spark.api.java.Optional;
选择无状态滑动窗口操作的一些注意事项是什么(例如,通过updateStateByKey或新mapStateByKey)选择保持状态(例如通过updateStateByKey或新mapStateByKey)时,使用火花流处理连续的有限事件会话流? 例如,考虑以下场景: 一种可穿戴设备跟踪由穿戴者进行的体育锻炼。该装置自动检测何时开始锻炼,并发出信息;在锻炼过程中发出附加信息(如心率);最后,当练习完
它没有任何错误,我得到以下错误时,我运行火花提交,任何帮助都非常感谢。谢谢你抽出时间。 线程“main”java.lang.noClassDeffounderror:org/apache/spark/streaming/kafka/kafkautils在kafkasparkstreaming.sparkstreamingtest(kafkasparkstreaming.java:40)在kafka
我尝试过用这个方法来计算累积值,但是如果日期字段与累积字段中的值相同,那么有人能提出类似于这个问题的解决方案吗
我试图从聚合原理的角度来理解火花流。Spark DF 基于迷你批次,计算在特定时间窗口内出现的迷你批次上完成。 假设我们有数据作为- 然后首先对Window_period_1进行计算,然后对Window_period_2进行计算。如果我需要将新的传入数据与历史数据一起使用,比如说Window_priod_new与Window_pperid_1和Window_perid_2的数据之间的分组函数,我该
我正在尝试使用python库Tweepy来传输twitter数据。我设置了工作环境,谷歌了一下这些东西,但是我不知道它们是如何工作的。我想在python (tweepy)中使用spark streaming(DStream-Batch processing)。我至少经历了以下环节: < li >如何获取tweepy中某个位置的特定标签的推文? < Li > http://spark . Apach
我在使用 Spark 流式处理示例时遇到问题:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala 当我尝试使用 SBT 启动它时 我有这个例外 我确定该目录存在于Hadoop fs上,我甚至在那里复制了一