当前位置: 首页 > 知识库问答 >
问题:

是否指定了Kafka Streams拓扑的处理顺序?

颜实
2023-03-14

我想知道是否指定了流拓扑处理消息的顺序。

示例:

        // read input messages

        KStream<String, String> inputMessages = builder.stream("demo_input_topic_1");
        inputMessages = inputMessages.peek((k, v) -> System.out.println("TECHN. NEW MESSAGE: key: " + k + ", value: " + v));

        // check if message was already processed

        KTable<String, Long> alreadyProcessedMessages = inputMessages.groupByKey().count();
        KStream<String, String> newMessages =
                inputMessages.leftJoin(alreadyProcessedMessages, (streamValue, tableValue) -> getMessageValueOrNullIfKnownMessage(streamValue, tableValue));
        KStream<String, String> filteredNewMessages =
                newMessages.filter((key, val) -> val != null).peek((k, v) -> System.out.println("FUNC. NEW MESSAGE: key: " + k + ", value: " + v));

        // process the message

        filteredNewMessages.map((key, value) -> KeyValue.pair(key, "processed message: " + value))
                .peek((k, v) -> System.out.println("PROCESSED MESSAGE: key: " + k + ", value: " + v)).to("demo_output_topic_1");

使用GetMessagEvalueOrNullifKnownMessage(...):

    private static String getMessageValueOrNullIfKnownMessage(String newMessageValue, Long messageCounter) {
        if (messageCounter > 1) {
            return null;
        }

        return newMessageValue;
    }

在测试中,该流工作。但我认为这不是保证。它只起作用,因为消息在加入之前首先由计数节点处理。

但是那个订单有保证吗?

就我在所有的文档中所看到的,对这个处理订单没有任何保证。因此,如果收到新消息,也可能发生以下情况:

    null

谢谢你

共有1个答案

融泓
2023-03-14

没有保证。即使在当前的实现中,使用了子节点的list:https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/processorContexTimpl.java#L203-L206--但是,不能保证子节点按照DSL中指定的顺序追加到这个列表中(因为在这之间有一个转换层,它可以按照不同的顺序添加节点)。而且,实现可能在任何时间点发生更改。

我能想到的唯一可行的解决办法(这是相当昂贵的)是,在一个重复主题中发送流端数据:

KStream<String, String> newMessages =
   inputMessages.through(...) // note: as of 2.6.0 release, you could use `repartition()` instead of `through()`
                .leftJoin(alreadyProcessedMessages, ...);

这样,KTable将在连接执行之前更新,因为需要首先回读记录。但是,由于在回读记录时没有任何保证,因此在连接完成之前可能会对表进行多次更新,这可能会使您处于与以前类似的情况。(另外,通过另一个主题重新路由数据也有些昂贵。)

使用处理器API,您将拥有移动控制,因为您可以调用context.forward(...,to.child(...))。但是,对于这种情况,您还需要手动实现聚合和联接:

KStream routing = inputMessages.transform(...);
routing.groupByKey(...);
routing.leftJoin(...);

在本例中,您将在transform()之后获得要避免的重新分区主题:

KStream routing = inputMessages.transform(...);
routing.transform(...); // implement the aggregation
routing.transform(...); // implement the join
 类似资料:
  • 我正在运行一个3节点的Storm集群。我们正在提交一个包含10个工作者的拓扑结构,以下是拓扑结构的详细信息 我们每天处理800万到1000万个数据。问题是topolgy只运行了2到3天,而我们在kafka spout中看到了一些失败的元组,没有处理任何消息。当提交新的topolgy时,它工作良好,但在2到3天后,我们又看到了同样的问题。有人能给我们一个解决方案吗。下面是我的storm配置

  • 自定义拓扑 Mininet 提供了 Python API,可以用来方便的自定义拓扑结构。 在 mininet/custom 目录下给出了几个例子。例如在 topo-2sw-2host.py 文件中定义了一个 mytopo,则可以通过 --topo 选项来指定使用这一拓扑,命令为 sudo mn --custom ~/mininet/custom/topo-2sw-2host.py --topo m

  • 因此,在某种程度上,拓扑描述了一个文件所需要的流,以计数它所拥有的唯一单词。 如果我有两个文件file1和file2,那么一个应该能够调用相同的拓扑并创建该拓扑的两个实例来运行相同的字数。 为了跟踪单词计数是否确实完成,一旦文件处理完毕,单词计数拓扑的实例应该具有完成状态。 对于文件2 更别提使用storm客户端同样上传jar 另一个问题是,一旦文件被处理,拓扑就无法完成。在我们对拓扑发出杀戮之前

  • 问题内容: 我正在尝试在Java项目上运行Dagger 2和Lombok。当然,龙目岛必须首先运行,但是它的确存在取决于机会。起初,我怀疑我可以通过类路径中库jar的相应位置来指定顺序,但是该顺序显然被忽略了。 有没有办法指定它们以某种方式运行的顺序,还是我必须忍受不能合并两个AP的情况? 我制作了一个SSCCE测试用例。 一个简单的&就足以说明问题- 如果在App.java中注释第18行和取消注

  • 我正在尝试在我的Java项目中运行Dagger2和Lombok。当然,龙目岛必须先跑,但它是否真的跑了,似乎要看机会。起初,我怀疑我可以通过库jar在类路径中各自的位置来指定顺序,但这个顺序显然被忽略了。 有没有一种方法可以指定它们运行的顺序,或者我只能忍受不能组合两个AP? 我已经生成了一个SSCCE测试用例。 一个简单的&就足以说明这个问题--如果您在app.java中注释第18行而取消注释第

  • 拓扑排序主要解决的问题是给一个图的所有节点排序。 一、什么是拓扑排序 在图论中,拓扑排序(Topological Sorting)是一个有向无环图(DAG, Directed Acyclic Graph)的所有顶点的线性序列。且该序列必须满足下面两个条件: (1)每个顶点出现且只出现一次。 (2)若存在一条从顶点 A 到顶点 B 的路径,那么在序列中顶点 A 出现在顶点 B 的前面。 有向无环图(