java prettyprint-override">stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
@Override
public long extractAscendingTimestamp(Request req) {
return req.ts;
}
})
.windowAll(TumblingEventTimeWindows.of(Time.days(1)))
.apply((TimeWindow window, Iterable<Request> iterable, Collector<String> collector) -> {
collector.collect("Window: " + window.toString());
for (Request req : iterable) {
collector.collect(req.toString());
}
})
.print()
因此,我将AscendingTimeStampExtractor
替换为BoundedOutoFordernessGenerator
,就像在这个文档示例中一样(具有更高的maxOutOfOrderness延迟),以便处理乱序事件,但我仍然无法获得任何输出。这是为什么?
检查事件时间戳是否正常。必须有13的长度才能在Flink Java Epoch中使用。
更正:1563743505673
错误:1563743505
以下是我从Kafka的一个话题中对消费信息的理解。 使用者组负责从单个主题读取信息。如果一个主题有 5 个分区,并且使用者组中有 5 个使用者,则每个使用者从一个整个分区读取信息。如果我在使用者组中添加另一个使用者,则新添加的使用者将处于空闲状态。 与其将新消费者置于空闲状态,为什么Kafak不允许从已经被不同消费者消费的分区中消费信息?如果发生这种情况,将会有更多的并行性。 所以在jist中:在
我的工作是做以下事情: 根据事件时间使用Kafka主题中的事件 计算7天的窗口大小,以1天的幻灯片显示 将结果放入Redis 我有几个问题: 如果它从最近的记录中消耗Kafka事件,在作业存活1天后,作业关闭窗口并计算7天窗口。问题是作业只有1天的数据,因此结果是错误的。 如果我尝试让它从7天前的时间戳中消耗Kafka事件,当作业开始时,它从第一天开始计算整个窗口,并且花费了很多时间。另外,我只想
我在一个Kafka主题“原始数据”中获取CSV,目标是通过在另一个主题“数据”中发送具有正确时间戳(每行不同)的每行来转换它们。 null 我想通过直接设置时间戳来删除这个“内部”主题的使用,但我找不到一个方法(时间戳提取器只在消耗时间使用)。 我在文档中偶然发现了这一行: 请注意,通过调用#forward()时显式地为输出记录分配时间戳,可以在处理器API中更改description默认行为。
我有一个函数: 输出将为“x 天,0:00:00”。如何只输出没有小时,分钟和秒的天?我可以将日差与整数相乘而不取“天数”吗?*抱歉我的英语不好
这是一个场景:我知道,使用与Spring kafka相关的最新API(如Spring集成kafka 2.10),我们可以执行以下操作: 以及来自与相同kafka主题相关的不同分区的读取。 我想知道我们是否可以使用同样的方法,例如spsping-集成-Kafka1.3.1 我没有找到任何关于如何做到这一点的提示(我对xml版本很感兴趣)。
我的消费者并不是每次都能收到信息。我有3个代理(3个服务器)的Kafka集群,有3个主题和复制因子3的分区。 我有Java中的消费者,我将最大轮询记录设置在50000获取字节上,配置在50MB上。应用程序每分钟都进行轮询。当我向主题“my-topic”发送10条消息时,consumer不会给我所有的消息,而是只给我其中的一部分,其余的将在下一次运行中给我。消息是在applicatin睡眠期间由脚本