当前位置: 首页 > 知识库问答 >
问题:

kafka-streams:如果缺少单个元素,则重试完成消息连接

景书
2023-03-14
{
  "order_id": 123456789,
  "user_id": 987654,
  "placed_at": "2020-07-20T11:31:00",
  "amount": 5.79,
  "items" : [
     {"item_id": 13579, "quantity": 1, "price": 1.23},
     {"item_id": 24680, "quantity": 1, "price": 4.56}
  ]
}

我当前的方法是从placements获取传入消息,将其拆分为N个消息(其中N是item_id数组的长度),对item_id执行左联接以添加项目描述,然后将结果流分组到order_id(丰富的拆分消息的关键字)上,以重构完整消息。

问题是描述可能会延迟几秒钟到达,所以在一些罕见的情况下,我得到的重构消息的条目比原始未充实的少。

我已经在自定义联接示例中看到了这种方法。对我的案子来说挺好的,可惜不完全适合。的确,在我的案例中,如果缺少单个项目的描述,那么完整的消息应该被延迟。目前我不知道在这种情况下该如何进行。欢迎任何建议。

共有1个答案

漆雕硕
2023-03-14

在仔细分析了自定义联接示例之后,解决方案是稍微改变一下它的逻辑。

下面是示例的节选:

private static final class StreamTableJoinStreamSideLogic
      implements TransformerSupplier<String, Double, KeyValue<String, Pair<Double, Long>>> {

/* ... */

private KeyValue<String, Pair<Double, Long>> sendFullJoinRecordOrWaitForTableSide(final String key,
                                                                                          final Double value,
                                                                                          final long streamRecordTimestamp) {
          final ValueAndTimestamp<Long> tableValue = tableStore.get(key);
          if (tableValue != null &&
              withinAcceptableBounds(Instant.ofEpochMilli(tableValue.timestamp()), Instant.ofEpochMilli(streamRecordTimestamp))) {
            final KeyValue<String, Pair<Double, Long>> joinRecord = KeyValue.pair(key, new Pair<>(value, tableValue.value()));
            LOG.info("Table data available for key {}, sending fully populated join message {}", key, joinRecord);
            return joinRecord;
          } else {
            LOG.info("Table data unavailable for key {}, sending the join result as null", key);
            return KeyValue.pair(key, new Pair<>(value, null));
          }
        }

/* ... */

}

特别是,需要修改SendFullJoinRecordorWaitForTableSide()方法,以便以全或全的方式将相同的逻辑应用于Items

 类似资料:
  • 我有一个集成应用程序,大部分工作,但注意到昨天一个消息丢失了。当时,service-activatorendpoint正忙于处理先前的消息。 以下是适用于该问题的配置。

  • 我的Kafka消费者的代码是这样的 我已经意识到,这种消费者设置无法读取所有信息。我无法再现这一点,因为这是一个间歇性的问题。 当我使用 将最后 100 条消息与此消费者进行比较时,我发现我的消费者间歇性地随机错过了几条消息。我的消费者有什么问题? 在python中使用消息的方法太多了。应该有一种最好只有一种明显的方法来做到这一点。

  • 使用Kafka Streams,我们无法确定在处理写入接收器主题的消息后压缩这些消息所需的配置。 另一方面,使用经典的Kafka Producer,可以通过在KafkaProducer属性上设置配置“compression.type”轻松实现压缩 然而,似乎没有任何记录在案的Kafka Streams压缩处理过的消息的例子。 至于这次(2019年初),有没有一种方法可以使用Kafka流进行压缩?

  • 我有一个詹金斯声纳装置,还有一个maven项目。这个maven项目只有一个pom,但包含java、javascript、html等。所以对于sonar来说,它是一个多模块项目,所以我可以获得每个部分的统计数据。 我们还希望获得项目的代码覆盖率分析,这意味着我们需要在声纳分析之前运行测试并导出它们(至少从我收集的声纳跑步者不能做这种类型的分析)。 因此,我把詹金斯的工作设置为第一步做,然后运行调用独

  • 因此,这里的任务是创建一个LinkedList类,如果已经搜索了节点,它会将节点移动到列表的前面(第一个)。这里的第一张图片是当它被调用时: 第二个图像是当这被称为: 所以目标是在调用find并找到包含数据的节点时:将其搜索布尔值设置为true,并将该节点移动到列表的前面。到目前为止,我的insert只是将新节点放在列表的前面。insert and find中的注释解释了我想让它们做什么。然而,将

  • 我试图使用python selenium实现一些自动化功能,但遇到了一些奇怪的行为。 html的总体布局: 现在,每个iframe实际上都有相同的内部html,网站上的代码似乎是随机选择哪个iframe得到了显示="块"。然而,我找不到任何iframe。 我尝试了一种标准方法: