这是我的Flink工作流程:
DataStream<FlinkEvent> events = env.addSource( consumer ).flatMap(...).assignTimestampsAndWatermarks( new EventTsExtractor() );
DataStream<SessionStatEvent> sessionEvents = events.keyBy(
new KeySelector<FlinkEvent, Tuple2<String, String> >()
{
@Override
public Tuple2<String, String> getKey( FlinkEvent value ) throws Exception {
return(Tuple2.of( value.getF0(), value.getSessionID ) );
}
} )
.window( TumblingEventTimeWindows.of( Time.minutes( 2 ) ) )
.allowedLateness( Time.seconds( 10 ) )
.aggregate( new SessionStatAggregator(), new SessionStatProcessor() );
/* ... */
sessionEvents.addSink( esSinkBuilder.build() );
首先我遇到了
java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Can not forward element to next operator
在< code>flatMap运算符中,任务继续重新启动。我观察到许多重复的结果,相同的键和窗口时间戳具有不同的值。
Q1:我猜重复是因为下游操作员在作业重新启动后重复使用消息。我说得对吗?在修复了异常inchainedOperatorException
问题后,我重新提交了作业。我又在第一个时间窗口中观察到了重复。在那之后,工作似乎进行得很顺利(每个键一个时间窗口中有一个结果)。
Q2:复制品是从哪里来的?
这就是Flink实现精确一次语义学的方法。如果失败,Flink会重放上次成功检查点的事件。重要的是要注意,精确一次意味着影响状态一次,而不是处理/发布事件一次。
回答问题1:是的,每次重新启动都会导致一遍又一遍地处理相同的消息 回答问题2:错误修复后的第一个窗口再次处理这些消息;然后一切都恢复正常。
...一个窗口的每个键应该有一个结果
这不(完全)正确。由于允许的延迟,任何延迟事件(在允许的延迟时间内)将导致相关窗口的延迟(或者换句话说,额外)触发。使用默认的EventTimeTrigger(您似乎正在使用它),每个延迟事件都会导致一个额外的窗口触发,并且会发出一个更新的窗口结果。
问题内容: 我有一个按名称列出的客户表:在SQL中,其中有3列:, 此表中有重复的条目,但 时间戳记 不同。 例如 我想从数据库中消除此问题,并保持第一时间/日期可用。 谢谢。 问题答案: 这有效,请尝试: 在子查询中,它确定哪个记录是每个的第一个记录,然后删除所有其他记录以作重复。我还添加了该子句,该子句返回受该语句影响的行。 您也可以通过使用排名功能来做到这一点: 看看哪一个查询开销较小并使用
我有以下问题。我的应用程序分为两部分:1)第一部分使用AES/CBC(Java)加密一些数据,2)第二部分必须检索数据并解密(Android)。要生成密钥,我使用以下代码 我的程序不需要不同的“源密钥”(字符串密码),但是只要源密钥相同,它就需要计算相同的密钥。不幸的是,程序的两部分生成的密钥不同,解密阶段失败。对如何解决这个问题有什么建议吗?
我找遍了,但没有找到是否可能。 我有一个MySQL查询: 字段id有一个“唯一索引”,所以不能有两个。现在,如果数据库中已经存在相同的id,我想更新它。但我真的必须再次指定所有这些字段吗,比如: 或: 我已经在插入中指定了所有内容。。。 一个额外的注意事项,我想使用周围的工作来获得ID! 我希望有人能告诉我什么是最有效的方法。
我有数据流就像 事件名,事件id,Start_time(时间戳)... 在这里,我想对最后一个带有时间戳的字段<;code>;Start_。 因此,我在flink window中看到的是,所以我猜它需要过去30分钟的事件,但不考虑 我想把数据放在start_ time在最后30分钟内的位置,然后我如何编写转换?我是否需要使用该列使用? 我是Flink的新手。 谢啦
至少一次语义:如果生产者从Kafka代理接收到确认(ack),并且acks=all,则表示消息已准确写入Kafka主题一次。但是,如果生产者确认超时或收到错误,它可能会在假定消息未写入Kafka主题的情况下重试发送消息。如果代理在发送ack之前失败,但在消息成功写入Kafka主题之后失败,则此重试会导致消息被写入两次,并因此多次传递给最终使用者。 我知道时间戳是根据消息从生产者发送的时间设置的。如
问题内容: 我基本上是想将Unix时间戳(time()函数)转换为与过去和将来的日期都兼容的相对日期/时间。因此输出可能是: 2个星期前 1小时60分钟前 15分钟54秒前 10分钟15秒后 首先,我尝试编写此代码,但是做了一个无法维护的巨大功能,然后我在互联网上搜索了几个小时,但我所能找到的只是脚本仅产生一部分时间(例如:“ 1小时前”纪要)。 您是否已经有执行此操作的脚本? 问题答案: 此功能