我希望Spark 1.6的新mapWithState API几乎可以立即删除超时的对象,但有一个延迟。
我正在使用下面经过修改的JavaStatefulNetworkWordCount版本测试API:
SparkConf sparkConf = new SparkConf()
.setAppName("JavaStatefulNetworkWordCount")
.setMaster("local[*]");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
ssc.checkpoint("./tmp");
StateSpec<String, Integer, Integer, Tuple2<String, Integer>> mappingFunc =
StateSpec.function((word, one, state) -> {
if (state.isTimingOut())
{
System.out.println("Timing out the word: " + word);
return new Tuple2<String,Integer>(word, state.get());
}
else
{
int sum = one.or(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<String, Integer>(word, sum);
state.update(sum);
return output;
}
});
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER_2)
.flatMap(x -> Arrays.asList(SPACE.split(x)))
.mapToPair(w -> new Tuple2<String, Integer>(w, 1))
.mapWithState(mappingFunc.timeout(Durations.seconds(5)));
stateDstream.stateSnapshots().print();
一起nc(nc-l-p
当我在nc窗口中键入一个单词时,我看到每秒都在控制台中打印元组。但是,根据超时设置,超时消息似乎不会在5s后打印出来。元组过期所需的时间似乎在5秒之间变化
我是否遗漏了一些配置选项,或者超时可能只在检查点同时执行?
我是否遗漏了一些配置选项,或者超时可能只与快照同时执行?
每次调用mapWithState
(使用您的配置,大约每1秒一次),MapWithStateRDD
将在内部检查过期记录并将其超时。您可以在代码中看到:
// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
wrappedState.wrapTimingOutState(state)
val returned = mappingFunction(batchTime, key, None, wrappedState)
mappedData ++= returned
newStateMap.remove(key)
}
}
(除了执行每个作业所需的时间之外,newStateMap.remove(key)
实际上只标记要删除的文件。有关更多信息,请参见“编辑”。)
你必须考虑到每一个阶段被安排所需的时间,以及每一个阶段的执行实际轮流运行所需的时间。这是不准确的,因为它作为一个分布式系统运行,其他因素可能会发挥作用,使您的超时比您预期的更准确/更不准确。
正如@etov正确指出的,newStateMap。remove(key)
实际上并不从OpenHashMapBasedStateMap[K,S]
中删除元素,只需将其标记为删除即可。这也是你看到过期时间增加的原因。
实际相关的代码如下:
// Write the data in the parent state html" target="_blank">map while
// copying the data into a new parent map for compaction (if needed)
val doCompaction = shouldCompact
val newParentSessionStore = if (doCompaction) {
val initCapacity = if (approxSize > 0) approxSize else 64
new OpenHashMapBasedStateMap[K, S](initialCapacity = initCapacity, deltaChainThreshold)
} else { null }
val iterOfActiveSessions = parentStateMap.getAll()
var parentSessionCount = 0
// First write the approximate size of the data to be written, so that readObject can
// allocate appropriately sized OpenHashMap.
outputStream.writeInt(approxSize)
while(iterOfActiveSessions.hasNext) {
parentSessionCount += 1
val (key, state, updateTime) = iterOfActiveSessions.next()
outputStream.writeObject(key)
outputStream.writeObject(state)
outputStream.writeLong(updateTime)
if (doCompaction) {
newParentSessionStore.deltaMap.update(
key, StateInfo(state, updateTime, deleted = false))
}
}
// Write the final limit marking object with the correct count of records written.
val limiterObj = new LimitMarker(parentSessionCount)
outputStream.writeObject(limiterObj)
if (doCompaction) {
parentStateMap = newParentSessionStore
}
如果deltaMap
应该被压缩(用doCompaction
变量标记),那么(并且只有这样)从所有已删除的实例中清除映射。这种情况发生的频率有多高?
val DELTA_CHAIN_LENGTH_THRESHOLD = 20
这意味着增量链的长度超过20个项,并且有一些项已被标记为删除。
一旦事件超时,它不会立即被删除,但只会通过将其保存到“deltaMap”来标记为删除:
override def remove(key: K): Unit = {
val stateInfo = deltaMap(key)
if (stateInfo != null) {
stateInfo.markDeleted()
} else {
val newInfo = new StateInfo[S](deleted = true)
deltaMap.update(key, newInfo)
}
}
然后,超时事件仅在检查点被收集并发送到输出流。也就是说:在批处理t超时的事件仅在下一个检查点出现在输出流中——默认情况下,平均每隔5个批处理间隔,即批处理t 5:
override def checkpoint(): Unit = {
super.checkpoint()
doFullScan = true
}
...
removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
...
// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
...
只有当有足够多的元素时,并且当状态映射被序列化时,元素才会被删除——这目前也只发生在检查点:
/** Whether the delta chain length is long enough that it should be compacted */
def shouldCompact: Boolean = {
deltaChainLength >= deltaChainThreshold
}
// Write the data in the parent state map while copying the data into a new parent map for
// compaction (if needed)
val doCompaction = shouldCompact
...
默认情况下,检查点每10次迭代发生一次,因此在上面的示例中每10秒发生一次;由于超时为5秒,事件预计在5-15秒内发生。
编辑:根据@YuvalItzchakov的评论更正并详细说明了答案
我目前在react native中有一个函数,它执行以下操作: 我运行上述命令,可以确认arrayId和title变量有效并包含数据。arrayId也不是“selectProduct”。我在调试时在那里添加了一个console.log,以确保它运行,事实上确实如此。我期望的行为是状态立即更新。 但是,所选下拉列表的状态不会更新。在this.setState更新之后添加:console.log(th
描述 我有一个小应用程序,它使用hook更新状态,但每次更新时,都会导致页面延迟。我指的是实际的延迟,而不仅仅是“等待异步”延迟。 我的理论是,更新状态会重新呈现太多的组件,因为如果我将状态减少到更少的值,滞后就会消失。 从本质上说,我担心我更新状态的方式没有隔离我想要的值。 密码 我将回购加载到CodeSandbox:https://codesandbox.io/s/long-forest-y9
问题内容: 在一些脚本中,我可以找到例如 而不是简单 立即调用$ timeout的目的是什么? 问题答案: 这是一个hack。:)但通常的目的是等待周期结束,然后设置为。完成所有监视后,将调用超时。
问题内容: 我相信只有两种使用Hibernate加载对象的方法,即延迟加载和一种渴望加载。延迟加载有其自身的优势,它不会加载很多对象,而只是在需要时才加载它们。我还了解到,如果您想强制为一个对象加载所有子代,则只需调用即可。假设我们有以下对象 假设我们有一些客户在我们的系统中有订单,并且该订单可能不止一个甚至为空。所以我的问题是,在这种情况下始终使用渴望加载会更好吗?我们需要与客户相关的订单的大小
问题内容: 我相信只有两种使用Hibernate加载对象的方法,即延迟加载和一种渴望加载。延迟加载有其自身的优势,它不会加载很多对象,而只是在需要时才加载它们。我还了解到,如果您想强制为一个对象加载所有子代,则只需调用即可。假设我们有以下对象 假设我们的客户在我们的系统中有订单,并且该订单可能不止一个甚至为空。所以我的问题是,在这种情况下始终使用渴望加载会更好吗?我们需要与客户相关的订单的大小或一
我是AnyLogic的新手,正在构建一个学生服务模拟,在其中我认为如果学生带着简单的问题来到服务中心,工作人员可以快速解决,否则会花费工作人员更多的时间。我使用statechart来实现学生的问题类型:在此处输入图像描述 我将相应的延迟时间设置为:在此处输入图像描述 一旦我尝试构建,编译器给我一个错误:在这里输入图像描述 这是什么意思?有人能告诉我怎么解决吗?