当前位置: 首页 > 知识库问答 >
问题:

如果在处理步骤中出现故障,如何使Spring cloud stream Kafka streams binder重试处理消息?

魏鸿
2023-03-14

我正在使用Spring Cloud Stream制作Kafka流。在消息处理应用程序中,可能会产生错误。因此,不应再次提交和重试消息。

我的申请方法-

@Bean
public Function<KStream<Object, String>, KStream<String, Long>> process() {
return (input) -> {
KStream<Object, String> kt = input.flatMapValues(v -> Arrays.asList(v.toUpperCase().split("\\W+")));
KGroupedStream<String, String> kgt =kt.map((k, v) -> new KeyValue<>(v, v)).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
KTable<Windowed<String>, Long> ktable = kgt.windowedBy(TimeWindows.of(500)).count();
KStream<String, WordCount> kst =ktable.toStream().map((k,v) -> {
WordCount wc = new WordCount();
wc.setWord(k.key());
wc.setCount(v);
wc.setStart(new Date(k.window().start()));
wc.setEnd(new Date(k.window().end()));

dao.insert(wc);

return new KeyValue<>(k.key(),wc);
});
return kst.map((k,v) -> new KeyValue<>(k, v.getCount()));
};
}

这里,如果DAO insert方法失败,则不应将消息发布到输出主题,并应重试处理同一消息。

我们如何配置kafka streams binder来执行此操作?。非常感谢您的帮助。

共有1个答案

胡元忠
2023-03-14

Spring Cloud Stream Kafka Streams绑定器本身在业务逻辑的执行中不提供这样的重试机制。但是,解决此用例的一种方法可能是将您的关键调用(dao.insert()在本例中)包装在您在本地定义的RetryTemboard中。这是一个可能的实现,它以1秒的退避策略重试10次。如果您正在尝试此解决方案,请确保从您的主要业务逻辑中提取RetryTemboard相关的通用代码。我还没有尝试过这个,但它应该可以工作。

KStream<String, WordCount> kst =ktable.toStream().map((k,v) -> {
  WordCount wc = new WordCount();
  ...

  org.springframework.retry.support.RetryTemplate retryTemplate = new 
   RetryTemplate();

  RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
  FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
  backOffPolicy.setBackOffPeriod(1000);

  retryTemplate.setBackOffPolicy(backOffPolicy);
  retryTemplate.setRetryPolicy(retryPolicy);

  retryTemplate.execute(context -> {
    try {
      dao.insert(wc);
    }
    catch (Exception e) {
      throw new IllegalStateException(..);
   }
  });

  return new KeyValue<>(k.key(),wc);
});

重试道插入操作10次后的事件,如果仍然失败,将抛出异常,该异常将终止应用程序,在这种情况下不会提交偏移量。重新启动时,在修复了底层问题后,您的应用程序仍应从该偏移量继续。

 类似资料:
  • Webpack 的配置比较复杂,很容出现错误,下面是一些通常的故障处理手段。 一般情况下,webpack 如果出问题,会打印一些简单的错误信息,比如模块没有找到。我们还可以通过参数 --display-error-details 来打印错误详情。 $ webpack --display-error-details Hash: a40fbc6d852c51fceadb Version: webpa

  • 有一种方法可以控制作业失败后在Azkaban中发生的事情,我的意思是,如果特定的作业失败,就做特定的事情,假设一个对hive的加载失败了,我想向splank发送错误,这可能吗?或者我应该创建特定的作业来插入并像python那样处理失败 谢谢

  • 我有一个spring批处理作业,预计将根据FIFO顺序处理'N'个作业ID。这个Spring批处理作业有5个步骤。 我们使用DECIDER来确定是否有更多的job-id。如果是,请转到第一步并运行该job-id的所有步骤。 我在spring-batch发出的日志中看到“duplicate step”消息,在第一个作业中的步骤(例如job-id=1)获得未知状态之前,该消息似乎没有问题。在这种情况下

  • 问题内容: 我有一个用于捕获任何分段错误或ctrl- c的应用程序。使用下面的代码,我能够捕获分段错误,但是该处理程序一次又一次地被调用。我该如何阻止他们。供您参考,我不想退出我的申请。我只是可以小心释放所有损坏的缓冲区。 可能吗? 处理程序就是这样。 在这里,对于Segmentation故障信号,处理程序被多次调用,并且很明显MyfreeBuffers()给我释放已释放的内存的错误。我只想释放一

  • 我有一个应用程序,我用它来捕捉任何分割错误或ctrl-c。使用下面的代码,我能够捕获分段错误,但是处理程序被一次又一次地调用。我怎样才能阻止他们。告诉你,我不想退出我的申请。我只是可以小心释放所有损坏的缓冲区。 可能吗? handler是这样的。 这里的分段故障信号,处理程序被多次调用,因为明显的MyFreeBuffers()给我释放已经释放的内存的错误。我只想免费一次,但仍然不想退出应用程序。

  • 处理 new 故障的方法有多种。到目前为止,我们介绍过用宏 assert 测试 new 返回的值。如果返回值为0,则assert宏终止程序。这不是处理new故障的健壮机制,它不允许我们用任何方法从故障恢复。ANSI/ISO C++ 草案标准指定,出现 new 故障时抛出bad_alloc异常(在头文件<new>中定义)。但许多编译器目前还不支持草案标准,仍然在new故障时返回0。本节介绍三个new