当前位置: 首页 > 知识库问答 >
问题:

Kafka流处理器API上下文.转发

朱海超
2023-03-14

对于传入记录,我需要验证值,并且基于结果对象,我需要将错误转发到不同的主题,如果成功验证,则使用context.forward()转发相同的错误。可以使用本链接中提供的DSL来完成

    ValidateProcessor.java

    @Override
    public void process(String key, String value) {
        Object result = //validation logic
        if(result.isSuccessful()) {
            context().forward(key, value);
         }else {
            context.forward("error",Object)
        }

}

现在,调用者再次需要检查并根据键来区分接收器主题。我使用processorAPI是因为我需要use头。

编辑:

branch(new predicate{
 business logic 
 if(condition)
   return true
 else
   return false;

当条件为false时,如何推送到不同的流。当前正在创建另一个谓词,该谓词收集链中不满足上述谓词的所有其他记录。在同一个谓词中有什么方法可以做吗?

共有1个答案

刘昌翰
2023-03-14

当您指定拓扑时,您将为所有节点分配名称并连接它们:

java prettyprint-override">Topology topology = new Topology();
topology.addSource("source", ...);
topology.addProcessor("X", ..., "source"); // connect source->X
topology.addSink("Y", ..., "X"); // connect X->Y
topology.addSink("Z", ..., "X"); // connect X->Z

如果处理器“X”连接到下游处理器“Y”和“Z”,则可以使用节点名向“Y”或“Z”发送记录。如果不指定名称,记录将发送到所有下游(“子”)处理器。

// this is `process()` of "X"
public void process(String key, String value) {
    context.forward(newKey, newValue); // send to both Y and Z
    context.forward(newKey, newValue, To.child("Y")); // send it only to Y
    context.forward(newKey, newValue, To.child("Z")); // send it only to Z
}
 类似资料:
  • 使用kafka-stream0.10.0.0,我在转发消息时定期在StreamTask中看到空指针异常。它在10%到50%的调用之间变化。NPE发生在这个方法中: 似乎在某些情况下,thisNode字段为空。知道是什么导致了这种情况吗?堆栈跟踪在下面。

  • 我正在使用kafka处理器API做一些自定义计算。由于某些复杂的处理,DSL并不是最佳的选择。流代码如下所示。 我需要清除一些项目从状态存储基于一个事件来在一个单独的主题。我无法找到正确的方法来使用Processor API连接另一个流,或者通过其他方法来侦听另一个主题中的事件,从而能够触发CustomProcessor类中的清理代码。有没有一种方法可以在处理器API中获取另一个主题中的事件?或者

  • 使用kafka processor API(不是DSL)读取源主题并写入目标主题,对于单个kafka集群设置(也就是说,如果源主题和目标主题都驻留在同一集群上)来说工作很好,但是当源主题和目标主题驻留在不同的kafka集群上时,我将获得目标处理器上下文的NullPointerException 我们如何使用kafka streams处理器API从一个集群中的一个主题写到另一个集群中的另一个主题?

  • 问题内容: 我有一个奇怪的问题,我想使用上下文处理器添加全局查询。这是我通过以下方法做到的: 这样在我的应用中创建了一个processor.py: 并在我的setting.py结尾处添加了以下内容: 最后,我通过以下观点: 在我的index.html模板上: 最后是我的网址: 我的foos显示没有问题,但是我的media_url和其他上下文消失了。可能是什么问题 问题答案: 当你指定时: 在设置文

  • 我知道这里之前有人问过这个问题:Kafka流并发? 但这对我来说很奇怪。根据文档(或者我可能遗漏了什么),每个分区都有一个任务,这意味着不同的处理器实例,每个任务由不同的线程执行。但是当我测试它的时候,我看到不同的线程可以得到不同的处理器实例。因此,如果你想在处理器中保持内存状态(老式的方式),你必须锁定? 线程ID:88 ID:c667e669-9023-494b-9345-236777e9df

  • 我正在尝试使用Spring boot编写一个Kafka流处理器,但当消息产生到主题中时,它不会被调用。 主题消息有不同的类型,并且是Avro格式的。在模式注册表中使用Avro UNION注册模式。 这些是主题 application.yml我正在使用cp-all-in-one-community作为docker-file 但现在我得到以下错误: