当前位置: 首页 > 知识库问答 >
问题:

通过与kafka streams的联接批量处理数据会导致“跳过过期段的记录”`

欧阳杰
2023-03-14

当通过kafka steams应用程序推送批量数据时,我看到它多次记录以下消息。。。

WARNorg.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore-跳过过期段的记录。

...我希望通过leftJoin步骤连接的数据似乎丢失了。

我在实践中看到过这种情况,或者是当我的应用程序关闭了一段时间后又重新启动时,或者是当我使用类似应用程序重置工具的工具试图让应用程序重新处理过去的数据时。

我能够独立地再现这种行为,方法是:向两个主题生成1000条消息,间隔一小时(按照原始时间戳的顺序),然后让Kafka流为它们选择一个键,并尝试左键连接两个重新设置的流。

该复制的独立源代码可在https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java

实际的Kafka流拓扑看起来像这样。

            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> leftStream = builder.stream(leftTopic);
            final KStream<String, String> rightStream = builder.stream(rightTopic);

            final KStream<String, String> rekeyedLeftStream = leftStream
                    .selectKey((k, v) -> v.substring(0, v.indexOf(":")));

            final KStream<String, String> rekeyedRightStream = rightStream
                    .selectKey((k, v) -> v.substring(0, v.indexOf(":")));

            JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));

            final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
                    rekeyedRightStream,
                    (left, right) -> left + "/" + right,
                    joinWindow
            );

...我生产的最终产品是这样的。。。

...
523 [523,left/null]
524 [524,left/null, 524,left/524,right]
525 [525,left/525,right]
526 [526,left/null]
527 [527,left/null]
528 [528,left/528,right]
529 [529,left/null]
530 [530,left/null]
531 [531,left/null, 531,left/531,right]
532 [532,left/null]
533 [533,left/null]
534 [534,left/null, 534,left/534,right]
535 [535,left/null]
536 [536,left/null]
537 [537,left/null, 537,left/537,right]
538 [538,left/null]
539 [539,left/null]
540 [540,left/null]
541 [541,left/null]
542 [542,left/null]
543 [543,left/null]
...

...其中,给定输入数据,我希望看到每行结束时两个值合并,而不是正确的值为空。

(注意,我们最初得到每个值的left/null值是可以的/应该的——据我所知,这就是kafka streams left join的预期语义。)

我注意到,如果我在连接窗口上设置一个非常大的宽限期值,问题就解决了,但是因为我提供的输入没有乱序,所以我没想到需要这样做,而且我厌倦了这样做的资源需求在一个体积很大的应用程序上练习。

我的怀疑是,当处理一个分区时,会导致流时间向前推到该分区中的最新消息,这意味着当检查下一个分区时,会发现它包含许多与流时间相比“太旧”的记录。然而,我希望有人能为我指出一个改变这种行为的设置,或者其他一些解决方案,以避免在应用程序处理积压数据时产生不准确的结果,而不会造成巨大的性能开销。


共有1个答案

黄骏喆
2023-03-14

您正在发布消息空间1小时,然后它无法加入
现在您正在使用:

JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));

将其更改为更高的数量或添加宽限期将允许您处理更多的消息,如果消息相隔1小时,您将有1000条消息,因此值:

JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5)).grace(Duration.ofDays(42));

这是需要的。(因为1000*5h接近42天)
所以您需要根据数据大小调整此值,以便始终能够对所有预期消息执行此操作。

然后我会得到你期望的结果。。。我认为是这样。因为我不确定这里还有其他的空值,但你似乎说它是预期的-没有分析那个部分。正如某些实体所拥有的,其他实体则没有。


 11 [11:left/null, 11:left/11:right]
 12 [12:left/12:right]
 13 [13:left/null, 13:left/13:right]
 14 [14:left/null, 14:left/14:right]
 15 [15:left/null, 15:left/15:right]
 16 [16:left/null, 16:left/16:right]
 17 [17:left/17:right]
 18 [18:left/null, 18:left/18:right]
 19 [19:left/null, 19:left/19:right]
 20 [20:left/null, 20:left/20:right]
 21 [21:left/null, 21:left/21:right]
 22 [22:left/null, 22:left/22:right]
 23 [23:left/null, 23:left/23:right]
 24 [24:left/null, 24:left/24:right]
 25 [25:left/25:right]
 26 [26:left/null, 26:left/26:right]

但是对于所有1000个结果,总是有有效的对预设。

您需要旧数据,因此必须同意旧数据。

但据我所知,有一个非常大的宽限期将是昂贵的

如果你的宽限期比你需要的宽限期大得多,这可能会很昂贵,但在这种情况下,这恰恰是你需要的。除非你能完全避免这样做
正如您在文档中所看到的,grace确实满足了您的需求(或者更确切地说,满足了您的需求,默认值较低):https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#grace-爪哇。时间持续时间-

拒绝在窗口结束后超过afterWindowEnd到达的延迟事件。延迟被定义为(流时间-记录时间戳)。

Altenative soultion是使用更大的窗口,但这看起来不适合您的情况:

JoinWindows joinWindow = JoinWindows.of(Duration.ofDays(42);
 类似资料:
  • 假设我有一个像这样配置的简单任务: 当抛出MyRetryableException时,我重试了15次,但最后我得到了

  • 我正在开发spring-mvc应用程序。 我需要处理超过10万条数据记录。我不能让它依赖于数据库,所以我必须用java实现所有逻辑。 目前,我正在创建多个线程,并将1000条记录分配给每个要处理的线程。 我正在使用org。springframework。行程安排。同时发生的ThreadPoolTaskExecutor(线程池任务执行器)。 列表项 问题: 建议使用的线程数 我应该在线程之间平均分配

  • 我从某个时候起就被这个问题困扰着。 我使用的是spring batch 3.0.7 问题是在ItemWriter中的一个记录中出现org.springframework.dao.DataIntegrityViolationExcue的情况下,组块(组块大小=10)中的其余记录也不会插入到数据库中,即使在提供了skipPolicy之后(对于所有异常返回true)。 我的理解是,如果在提交整个大块时出

  • 我定义了一个块,提交间隔为10,跳过限制为10。处理器类通过应用一些算术运算来操作字段。其中一条记录(比如第6条记录)在处理器类中发生异常。在此之后,再次处理1到5条记录,跳过第6条记录,处理7到10条记录,并将其写入XML(自定义XML编写器类)。由于处理器处理1-5条记录两次,因此预期字段值计算两次是错误的。您能否建议一种解决方案,让处理器只处理一次记录,只跳过失败的记录,并将处理后的记录写入

  • 我有一个单节点,多(3)代理Zookeeper/Kafka设置。我使用的是Kafka0.10 Java客户端。 我写了以下simple remote(在不同于Kafka的服务器上)Producer(在代码中,我用MYIP替换了我的公共IP地址): 这3个代理的服务器属性如下所示(在3个不同的服务器属性文件中,Broker.ID为0、1、2、listeners为plaintext://:9092、p

  • 我最近一直在思考同一个问题,想知道我的令牌解决方案是否有什么重大缺陷: 将过期时间设置为较低值(约15分钟) 每个生成的JWT也被添加到每个用户的“issuedTokens”集合/表中 在 JWT 验证期间,如果过期已过,将从服务器返回“过期”响应(例如,正文中带有“过期”的 401)。当客户端收到此状态时,它应该启动一个刷新过程,该过程将过期的令牌换成新的令牌。 服务器上的刷新endpoint应