当前位置: 首页 > 知识库问答 >
问题:

Kafka流控制流

暴乐邦
2023-03-14

我有一个关于kafka流应用程序中的控制流的基本问题。如果有两个源主题

我做了一个非常初步的测试,当记录被消费时,我偷看了一下,然后用一个简单的速溶软件打印了它们被处理的瞬间。现在

KStream<String, String> akStream= builder.stream("A",
        Consumed.with(Serdes.String(), Serdes.String()).withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST))
        .peek((s, string) -> System.out.println("Topic A at " + Instant.now() ));

KStream<String, String> bkStream= builder.stream("B",
        Consumed.with(Serdes.String(), Serdes.String()))
        .peek((s, string) -> System.out.println("Topic B " + Instant.now()));

这些是主题中记录的开始和结束时间戳

A : 2020-03-27 14:36:04 (epoch: 1585316164843) 2020-03-27 14:34:02 (epoch: 1585316042569)
B : 2020-03-30 11:04:17 (epoch: 1585559057167) 2020-03-17 14:44:38 (epoch: 1584452678527)

主题B记录在主题A之前提取。Sysout显示主题B中的所有记录。有人能帮助理解这一点吗?我希望在编写具有多个输入源的流式应用程序时使用这种理解。

提前感谢

共有2个答案

左丘弘致
2023-03-14

看情况。一般来说,不能保证不同主题之间的处理顺序。但是有一个例外:如果一个任务处理来自不同主题的数据,那么记录将按时间戳顺序处理。然而,这是一种尽力而为的方法;从Kafka Streams 2.3开始,这些排序保证得到了改进,您可以使用max.task.idle.ms配置来影响它们。

贾俊艾
2023-03-14

按照您构建流的方式,每个流都是独立存在的,没有顺序保证。

关于根据时间戳处理记录。这只能在一个时间窗口内完成。例如,如果你有两个主题A和主题B,你可以加入它们,通过一个时间窗口,你可以安排活动。

<VO,VR> KStream<K,VR> join​(KStream<K,VO> otherStream,
                           ValueJoiner<? super V,? super VO,? extends VR> joiner,
                           JoinWindows windows)
 类似资料:
  • 截止到现在,在我们所看过的程序中,总是有一系列语句从上到下精确排列,并交由 Python 忠实地执行。如果你想改变这一工作流程,应该怎么做?就像这样的情况:你需要程序作出一些决定,并依据不同的情况去完成不同的事情,例如依据每天时间的不同打印出 ‘早上好’ ‘Good Morning’ 或 ‘晚上好’ ‘Good Evening’? 正如你可能已经猜测到的那番,这是通过控制流语句来实现的。在 Pyt

  • Swift 提供所有多样化的控制流语句。包括 while 循环来多次执行任务; if , guard 和 switch 语句来基于特定的条件执行不同的代码分支;还有比如 break 和 continue 语句来传递执行流到你代码的另一个点上。 Swift 同样添加了 for-in 循环,它让你更简便地遍历数组、字典、范围和其他序列。 Swift 的 switch 语句同样比 C 中的对应语句多了不

  • 本页包含内容: For 循环 While 循环 条件语句 控制转移语句(Control Transfer Statements) Swift提供了类似 C 语言的流程控制结构,包括可以多次执行任务的for和while循环,基于特定条件选择执行不同代码分支的if和switch语句,还有控制流程跳转到其他代码的break和continue语句。 除了 C 语言里面传统的 for 条件递增(for-co

  • 程序主要有顺序、分支和循环几种执行流程。本节主要讨论如何将Go语言的控制流比较直观地转译为汇编程序,或者说如何以汇编思维来编写Go语言代码。 3.5.1 顺序执行 顺序执行是我们比较熟悉的工作模式,类似俗称流水账编程。所有不含分支、循环和goto语句,并且没有递归调用的Go函数一般都是顺序执行的。 比如有如下顺序执行的代码: func main() { var a = 10 pri

  • 任何编程语言都包含的一个必要部分就是改变控制流程:if/else,for等。让我们讲述 Rust 语言中 的这部分内容。

  • 流程控制 和Java、PHP等语言不一样,sh的流程控制不可为空,如: <?php if (isset($_GET["q"])) { search(q); } else { //do nothing } 在sh/bash里可不能这么写,如果else分支没有语句执行,就不要写这个else。 还要注意,sh里的if [ $foo -eq 0 ],这个方括号跟Java/PHP里if后面的