当前位置: 首页 > 知识库问答 >
问题:

Kafka Streams没有为加入的流触发输出?

乐正心思
2023-03-14

我有来自 3 个 mysql 表、1 个主表和两个子表的原始流。我尝试加入三个原始流并转换为单个输出流。如果父流上有任何更新,但如果子流发生任何变化,则不触发输出,它就可以工作。

    @StreamListener
    public Stream<Long, Output> handleStreams(@Input KStream<Long, Parent> parentStream,
    @Input KStream<Long, Child1> child1Stream,
    @Input KStream<Long, Child2> child2Stream) {

    KTable<Long, Parent> parentTable = convertParent(parentStream);
    KTable<Long, ArrayList<Child1>> child1Table = convertChild1(parentStream);
    KTable<Long, ArrayList<Child2>> child2Table = convertChild2(parentStream);

    parentTable
           .leftJoin(child1Table, (parent, child1List) -> new Output(k, v))
           .leftJoin(child2Table, (output, child2List) -> output.setChild2List(child2List))
           .toStream()
        }

父流上的任何新添加或更新都由处理器拾取,并将其与其他KTable连接,并在输出流上返回。但对child1stream或child2stream的任何添加或更新都不会触发输出流。

我认为将所有输入流设为 KTable,它们将始终html" target="_blank">存储更改,因为它们都具有相同的键,并且父表或子表的任何更新都将被选取连接。但它没有发生,谁能建议我在这里错过了什么?

我已经尝试了KStream-KStream、Stream-KTable、KTable-KTable连接,没有一个能在子更新的情况下工作。

-谢谢

共有2个答案

宋康安
2023-03-14

注意子表是如何从与父表相同的流中创建的:

KTable<Long, ArrayList<Child1>> child1Table = convertChild1(parentStream);
KTable<Long, ArrayList<Child2>> child2Table = convertChild2(parentStream);

不确定convertChild1和convertChild2方法的作用,但它们不应该分别作为参数child1Stream和child2Stream吗?

甘兴学
2023-03-14

您可以显示EnableB的和您要绑定到的处理器接口的位置吗?

这在我看来不太对劲:

@StreamListener
    public Stream<Long, Output> handleStreams(@Input KStream<Long, Parent> parentStream,
    @Input KStream<Long, Child1> child1Stream,
    @Input KStream<Long, Child2> child2Stream) {

您没有在输入上指定绑定。当您有多个输入时,您需要像这样:

@StreamListener
        public Stream<Long, Output> handleStreams(@Input("input1") KStream<Long, Parent> parentStream,
        @Input("input2") KStream<Long, Child1> child1Stream,
        @Input("input3") KStream<Long, Child2> child2Stream) {

每个输入都需要在处理器接口中定义。在此查看示例:https://github . com/spring-cloud/spring-cloud-stream-samples/blob/master/Kafka-streams-samples/Kafka-streams-table-join/src/main/Java/Kafka/streams/table/join/kafkastreamstablejoin . Java # L46

 类似资料:
  • 问题内容: 我正在尝试创建一种控制台/终端,允许用户输入一个字符串,然后将其编入进程并打印出结果。就像普通的控制台一样。但是我在管理输入/输出流时遇到了麻烦。我已经研究了这个线程,但是可悲的是,该解决方案不适用于我的问题。 与标准命令(例如“ ipconfig”和“ cmd.exe”)一起,如果脚本要求输入,我还需要能够运行脚本并使用相同的输入流传递一些参数。 例如,在运行脚本“ python p

  • null 触发器是否支持一次追加模式? 这里有一个最小的应用程序来再现这个问题。要旨

  • 我正在尝试加入apache flink中的两个流以获得一些结果。 我的项目的当前状态是,我正在获取twitter数据并将其映射到一个2元组中,其中保存用户的语言和定义时间窗口中的推文总和。我这样做是为了每种语言的推文数量和每种语言的转发。推文/转发聚合在其他进程中运行良好。 我现在想得到一个时间窗口内转发次数占所有推文次数的百分比。 因此我使用以下代码: 当我打印或时,输出似乎很好。我的问题是我从

  • 问题内容: 我遵循了这里的各种问题和答案来设置我的Android活动以覆盖,以便在软键盘打开和关闭时执行逻辑。这是我的代码的相关摘录。我将其归结为最简单的情况: AndroidManifest.xml SearchActivity.java 当我更改方向时,上面的代码将显示Toast,但在软键盘打开或关闭时,则无任何作用。我已经测试过通过EditText聚焦打开软键盘,并通过长按菜单按钮手动打开它

  • 问题内容: 永远不会在我的Angular控制器中触发。下列: 永远不会碰到断点,永远不会弹出警报。我认为这是非常标准的用法,所以我不确定它会是什么,但绝对不会触发。控制器中的其他所有内容均正常运行。 是什么原因造成的? 问题答案: 根据上面的评论,听起来您有这样的事情: 我不确定ng-view标记中的内容会做什么,但是是从ng-view范围 发出 的。这意味着,它只会从上升,因此您的控制器将永远不

  • 问题内容: 我下面有以下代码示例。你可以在其中输入的命令,即回显结果。但是,先读后。其他输出流不起作用? 为什么会这样或我做错了什么?我的最终目标是创建一个线程计划任务,该任务定期执行对/ bash的命令,因此必须一前一后工作,而不能停止工作。我也一直在经历错误的任何想法? 谢谢。 问题答案: 首先,我建议更换生产线 与线 ProcessBuilder是Java 5中的新增功能,它使运行外部进程更