当前位置: 首页 > 知识库问答 >
问题:

如何停止发送到Kafka主题当控件去抓块函数Kafkaspring

薛经纶
2023-03-14

你能告诉我,我如何停止发送到我的第三个Kafka主题,当控件到达catch块时,当前消息被发送到错误主题以及在正常处理情况下它应该发送到的主题。一段代码如下所示:

@Component
public class Abc {
private final StreamBridge streamBridge;
public Abc (StreamBridge streamBridge)
this.streamBridge = streamBridge;
@Bean
public Function<KStream<String, KafkaClass>, KStream<String,KafkaClass>> hiProcess() {
return input -> input.map((key,value) -> {
try{
KafkaClass stream = processFunction();
}
catch(Exception e) {
Message<KakfaClass> mess = MessageBuilder.withPayload(value).build();
streamBridge.send("errProcess-out-0". mess);
}
return new KeyValue<>(key, stream);
})
}
}

    

共有1个答案

邓卓
2023-03-14

这可以使用以下模式实现:

KafkaClass stream;
return input -> input
    .branch((k, v) -> {
        try {
            stream = processFunction();
            return true;
        }
        catch (Exception e) {
            Message<KakfaClass> mess = MessageBuilder.withPayload(value).build();
            streamBridge.send("errProcess-out-0". mess);
            return false;
        }
       },
       (k, v) -> true)[0]
    .map((k, v) -> new KeyValue<>(k, stream));

这里,我们使用kstream的分支特性(API)将输入分为两个路径--正常流和导致错误的流。这是通过向branch方法调用提供两个筛选器来实现的。第一个过滤器是调用processfunction方法并返回响应的正常流。如果我们没有得到异常,过滤器返回true,分支操作的结果在输出数组的第一个元素[0]中捕获,该元素在map操作的下游处理,并将最终结果发送给出站主题。

另一方面,如果抛出异常,则使用StreamBridge向错误主题发送所需的内容,过滤器返回false。由于下游map操作只对分支[0]的数组的第一个元素执行,因此不会向外发送任何内容。当第一个筛选器返回false时,它会转到第二个筛选器,后者总是返回true。这是一个完全忽略结果的无操作筛选器。

这种特殊实现的一个缺点是,您需要将来自processfunction的响应存储在实例字段中,然后对每个传入的kstream记录进行变异,以便在发送输出的最终map方法中访问其值。然而,对于这个特定的用例,这可能不是一个问题。

 类似资料:
  • 我试图使用pyspark将每日批次的数据发送到Kafka主题,但我当前收到以下错误: Traceback(最近的最后一次调用): File", line 5, in File"/usr/local/rms/lib/hdp26_c5000/park2/python/pyspark/sql/readwriter.py", line 548, in保存自己。_jwrite.save()File"/usr

  • 我正在搜索框上使用事件 列的值列表,在变量中提到。 问题 什么时候 如果我输入一个字符(比如),筛选器工作正常。但是当我连续输入字符时,浏览器挂起了。原因是筛选器一直在激发筛选器框上的每一个输入(因为am使用事件) 我想要的 我在angular-2和打字。但是这个问题并不只是与或或或相关,因为我想要的是一个想法,而不是解决方案。所以我会为这个问题添加这些标记。不要移除。谢谢

  • 我在sping-boot应用程序中使用sping-kafka发送数据主题。我需要从oracle表中获取数据并发送它。 我从oracle表中获取列表。如何将它们发送到主题? 即。 > 有没有办法将它们作为列表发送?如果是,如何发送?如果是,那么如何在消费者端反序列化它? 是否可以使用spring book和spring kafka以流式方式发送数据?如果是,请提供更多信息或样本/片段plz。。。 如

  • 是否可以验证/筛选发送到Kafka主题的消息?

  • 我在Kafka中配置了3个代理运行在不同的端口上。我用的是春云流Kafka 我正在创建一个获得连续数据流的数据管道。我在kafka topic中存储3个代理运行的数据流。到目前为止没有问题。我担心的是假设3个经纪人倒下了5分钟,然后在那个时候我无法获得关于kafka主题的数据。将会有5分钟的数据丢失。从Spring开机我会得到警告 有没有一种方法可以在所有代理都停机时临时存储数据,并在代理再次启动

  • 问题内容: 我有我的自定义Java对象,希望利用JVM的内置序列化将其发送到Kafka主题,但是序列化失败并出现以下错误 org.apache.kafka.common.errors.SerializationException:无法将com.spring.kafka.Payload类的值转换为value.serializer中指定的org.apache.kafka.common.serializ