当前位置: 首页 > 知识库问答 >
问题:

Kafka流连接

颛孙森
2023-03-14

我有两个Kafka主题-推荐点击。第一个主题包含由唯一Id(称为recommendationsId)键入的recommendations对象。每个产品都有一个用户可以单击的URL。

clicks主题获取通过单击推荐给用户的产品URL生成的消息。它是如此设置的,这些单击消息也由推荐id键控。

请注意

>

每个单击对象都会有一个相应的推荐对象。

click对象的时间戳将晚于Recommensions对象。

建议和相应的点击之间的间隔可能是几秒钟到几天(最多7天)。

我的目标是使用Kafka streams join连接这两个主题。我不清楚的是我应该使用KStream x KStream连接还是KStream x KTable连接。

我通过加入clicksstreambyrecommendations表来实现KStream x KTablejoin。但是,如果建议是在joiner启动之前生成的,而单击是在joiner启动之后到达的,则我无法看到任何连接的clicks建议对。

我是否使用了正确的连接?我应该使用KStream x KStream连接吗?如果是的话,为了最多过去7天能够加入一个有推荐的点击,我是不是应该把窗口大小设置为7天呢?在这种情况下,我是否也需要设置“保留”期限?

我执行KStream x KTable连接的代码如下。请注意,我已经定义了类建议单击及其相应的serde。单击消息只是普通的字符串(url)。此URL字符串与推荐对象连接,以创建一个单击对象,该对象被发送到jointTopic

public static void main(String[] args){
    if(args.length!=4){
      throw new RuntimeException("Expected 3 params: bootstraplist clickTopic recsTopic jointTopic");
    }

    final String booststrapList = args[0];
    final String clicksTopic = args[1];
    final String recsTopic = args[2];
    final String jointTopic = args[3];

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_joiner_id");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, booststrapList);
    config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, JoinSerdes.CLICK_SERDE.getClass().getName());

    KStreamBuilder builder = new KStreamBuilder();

    // load clicks as KStream
    KStream<String, String> clicksStream = builder.stream(Serdes.String(), Serdes.String(), clicksTopic);

    // load recommendations as KTable
    KTable<String, Recommendations> recsTable = builder.table(Serdes.String(), JoinSerdes.RECS_SERDE, recsTopic);

    // join the two
    KStream<String, Click> join = clicksStream.leftJoin(recsTable, (click, recs) -> new Click(click, recs));

    // emit the join to the jointTopic
    join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic);

    // let the action begin
    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();
  }

只要在joiner(上面的程序)运行后生成了建议和单击,这就可以正常工作。但是,如果在运行joiner之前为其生成建议的单击到达,我看不到任何连接发生。我该如何解决这个问题?

如果解决方案是使用KStream x KStreamjoin,请帮助我了解应选择的窗口大小和保留期。


共有1个答案

阴雪风
2023-03-14

你的总体观察是正确的。从概念上讲,两种方法都可以得到正确的结果。如果使用流表联接,则有两个缺点(不过在未来的Kafka版本中可能会重新讨论和改进)

  • 您已经提到,如果在相应的推荐之前处理了click-get,那么(内部)连接将失败。但是,正如您所知,会有建议,您可以使用左连接而不是内部连接,检查连接结果,并在建议为null(即,您得到重试逻辑)时将单击事件写回输入主题,连续单击单个建议可能会出现问题,您可能需要在应用程序代码中对此进行说明

如果您使用流-流连接,并且在推荐后7天可能会发生单击,则您的窗口大小必须为7天-否则,单击将不会与推荐连接。

  • 这种方法的缺点是,您需要更多的内存/磁盘,因为您将在应用程序中缓冲过去7天的所有单击和所有建议
  • 优点是,订单或处理(即,推荐与点击)不再重要(即,您不需要实施上述重试策略)
  • 此外,旧的建议将自动过期,因此您不需要实现特殊的“过期逻辑”

对于流-流连接,保留时间的答案有点不同。它必须至少7天,因为窗口大小是7天。否则,您将删除您的“运行窗口”的记录。您还可以将保留期设置得更长,以便能够处理“后期数据”。假设用户在窗口时间帧结束时(推荐的7天时间跨度结束前5分钟)点击,但点击仅在1小时后报告给您的应用程序。如果您的保留期为7天,则此延迟到达的记录将无法再处理(因为建议已被删除)。如果您设置了较大的保留期,例如,8天,您仍然可以处理延迟记录。这取决于你的应用程序/语义需要你想要使用什么保留时间。

摘要:从实现的角度来看,使用流连接比使用流表连接简单。但是,内存/磁盘节省是可以预期的,并且可能会很大,具体取决于您的点击流数据速率。

 类似资料:
  • 我想连接两个主题流(左连接),并在连接的流上进行基于窗口的聚合。然而,聚合将某些消息计数两倍,因为在连接期间,根据正确主题中的延迟,某些消息将发出两倍。以下是POC的代码。 它是否可以修复以避免因连接而重复?

  • 我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我

  • 当kafka streams应用程序运行且kafka突然停机时,应用程序进入“等待”模式,发送警告日志的消费者和生产者线程无法连接,当kafka恢复时,一切(理论上)都应该恢复正常。我正在尝试获取有关此情况的警报,但我无法找到捕获该警报的位置并发送日志/度量。我尝试了以下方法: 但这只发生在异常情况下,而不是这里 扩展并将属性更改为我的类,从而扩展了此接口。 我知道Kafka有自己的衡量标准,我可

  • 我正在导入一个DB,其中包含一些表示多对多和一对多关系的链接表。 1-到目前为止,根据我对Kafka流的理解,我似乎需要为每个链接表提供一个流,以便执行聚合。KTable将不可用,因为记录是按键更新的。但是,聚合的结果可能是Ktable中的一个。 2-然后是外键上的连接问题。似乎唯一的方法是通过GlobalKtable。link-table-topic->link-table-stream->li

  • 我有一个我真的无法解决的问题。所以我有一个kafka流,其中包含一些这样的数据: 我想用另一个值“bookingId”替换“adId”。此值位于csv文件中,但我无法真正弄清楚如何使其工作。 这是我的映射csv文件: 所以我的输出最好是这样的 该文件可以每小时至少刷新一次,因此它应该会接收对它的更改。 我目前有一个不适合我的代码: 代码只运行一次,然后停止,因此它不会使用csv文件转换kafka中

  • 我们已经成功地使用了MySQL - 使用jdbc独立连接器的kafka数据摄取,但现在在分布式模式下使用相同的连接器(作为kafka connect服务)时面临问题。 用于独立连接器的命令,工作正常 - 现在,我们已经停止了这一项,并以分布式模式启动了kafka connect服务,如下所示 2 个节点当前正在运行具有相同连接服务。 连接服务已启动并正在运行,但它不会加载 下定义的连接器。 应该对