当前位置: 首页 > 知识库问答 >
问题:

KStream和KTable之间的时间语义

姬衡
2023-03-14

我正在尝试构建以下拓扑:

>

  • 使用Debezium连接器,我拉出2个表(我们称它们为表A和表DA)。根据DBZ,存储表行的主题具有{before:“...”,after:“...”}结构。

    在我的拓扑中,第一步是从这两个“表”主题创建“干净的”KStreams。那里的子拓扑大致如下所示:

    private static KStream<String, TABLE_A.Value> getTableARowByIdStream(
        StreamsBuilder builder, Properties streamsConfig) {
      return builder
          .stream("TABLE_A", Consumed.withTimestampExtractor(Application::getRowDate))
          .filter((key, envelope) -> [ some filtering condition ] )
          .map((key, envelope) -> [ maps to TABLE_A.Value ] )
          .through(tableRowByIdTopicName);
    }
    

    请注意,我显式地分配记录时间,因为表行将在它们最初发布后被CDC'ed“年”。该函数目前正在做的是伪造从2010-01-01开始的时间,并使用AtomicInteger为每个消耗的实体添加1毫秒。它对表A这样做,但对DA不这样做(我将在后面解释原因)。

    private static KTable<String, EntityInfoList> getEntityInfoListById(
        KStream<String, TABLE_A.Value> tableAByIdStream) {
      return tableAByIdStream
          .map((key, value) -> [ some mapping ] )
          .groupByKey()
          .aggregate(() -> [ builds up a EntityInfoList object ] ));
    }
    
    private static KStream<String, OutputTopicEntity> getOutputTopicEntityStream(
        KStream<String, Table_DA.Value> tableDAStream,
        KTable<String, EntityInfoList> tableA_KTable) {
    
      KStream<String, Table_DA>[] branches = tableDAStream.branch(
          (key, value) -> [ some logic ],
          (key, value) -> true);
    
      KStream<String, OutputTopicEntity> internalAccountRefStream = branches[0]
          .join(
              tableA_KTable,
              (streamValue, tableValue) -> [ some logic to build a list of OutputTopicEntity ])
          .flatMap((key, listValue) -> [ some logic to flatten it ]));
    
       [ similar logic with branch[1] ]
    }
    

    因此,我没有所有的“加入点击”,我所期待的,这也是不确定的。基于这句话,我的理解是KTable与GlobalKTable和leftJoin()与outerJoin()之间有什么区别?却是相反的:

    对于流表联接,Kafka流对齐基于记录时间戳排序的记录处理。因此,对表的更新与流的记录对齐。

    我的经验是,到目前为止,这还没有发生。我还可以很容易地看到我的应用程序在消耗了Table_DA流中的所有条目(它碰巧是Table_DA流的10倍小)之后是如何继续在Table_A主题中翻腾的。

  • 共有1个答案

    郎鸿雪
    2023-03-14

    时间戳同步是2.1.0版本之前的最大努力(参见https://issues.apache.org/jira/browse/kafka-3514)。

    从2.1.0开始,时间戳是严格同步的。然而,如果一个输入没有任何数据,Kafka流将“强制”处理,如KIP-353中所述,以避免永远阻塞。如果您有突发输入,并且希望在一个输入没有数据的情况下“阻塞”处理一段时间,您可以增加配置参数max.task.idle.ms(默认值为0),如2.1.0中通过KIP-353介绍的。

     类似资料:
    • 我正在尝试以下列方式使用kafka流实现事件源模式。 我在一家安全服务公司工作,处理两个用例: 注册用户,处理 应生成 。 更改用户名,处理 应生成 。 我有两个主题: 命令主题,每个命令都是键控的,密钥是用户的电子邮件。例如: 实现思想可以用以下拓扑表示: 对于这个拓扑,我使用的是。 此拓扑的更显式版本: 我遇到的问题: 在具有现有记录的命令主题上启动流应用程序: 在构建这样的拓扑时,我缺少什么

    • 我正在尝试使用KStream-KTable leftJoin来丰富主题A中的条目和主题B。主题A是我的KStream,主题B是我的KTtable,它有大约2300万条记录。这两个主题中的键都没有计算,所以我必须使用reducer将KStream(主题B)转换为KTable。 下面是我的代码: 1)KTable初始化速度慢。(2000 msg/s左右),这正常吗?我的主题是只有1个分区。有什么方法可

    • 我正在使KStream-KStream连接,其中创建2个内部主题。而KStream-KTable join将创建1个内部主题+1个表。 就性能和其他因素而言,哪个更好?

    • 我试图加入KStream与KTable。如果没有连接,我可以从中间主题“book属性-by-id”中阅读。 KTable的示例消息: KStream的示例消息: “最终聚合”主题的所需输出: 这是密码 加入KStream时出现异常 线程“xxx-StreamThread-1”组织中出现异常。阿帕奇。Kafka。溪流。错误。TopologyBuilderException:无效的拓扑构建:未找到流线

    • 本文向大家介绍IOS 时间和时间戳之间转化示例,包括了IOS 时间和时间戳之间转化示例的使用技巧和注意事项,需要的朋友参考一下 以毫秒为整数值的时间戳转换 时间戳转化为时间NSDate 时间转化为时间戳 通过比较时间与当前时间返回年月日的方法 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持呐喊教程。

    • 问题内容: 我想使用JSON从Python发送序列化形式的datetime.datetime对象,并使用JSON在JavaScript中反序列化。做这个的最好方式是什么? 问题答案: 你可以在中添加参数来处理此问题: 这是格式。 更全面的默认处理程序功能: